// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>
#include <asm/setup.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

#define RING_BUFFER_META_MAGIC	0xBADFEED

struct ring_buffer_meta {
	int		magic;
	int		struct_sizes;
	unsigned long	total_size;
	unsigned long	buffers_offset;
};

struct ring_buffer_cpu_meta {
	unsigned long	first_buffer;
	unsigned long	head_buffer;
	unsigned long	commit_buffer;
	__u32		subbuf_size;
	__u32		nr_subbufs;
	int		buffers[];
};

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}
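/*
 * Example (assuming the struct ring_buffer_event layout from
 * <linux/ring_buffer.h> that the header above describes): the first
 * word of every event packs a 5 bit type_len and a 27 bit time_delta,
 * followed by 32 bit array words. A small data event therefore spends
 * only 4 bytes of overhead: type_len encodes the payload size in
 * 4 byte units, time_delta holds the offset from the previous event's
 * time stamp, and the payload starts at array[0]. The three reserved
 * type values printed above (padding, time_extend, time_stamp) mark
 * non-data events.
 */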
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA	__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}
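/*
 * Example (a worked case for the encoding handled above): RB_ALIGNMENT
 * is 4, so a data event with type_len = 5 carries 5 * 4 = 20 bytes of
 * payload and rb_event_data_length() returns 20 + RB_EVNT_HDR_SIZE.
 * Payloads larger than RB_MAX_SMALL_DATA cannot be described by the
 * 5 bit type_len, so type_len is 0 and the length lives in array[0];
 * that is also why ring_buffer_event_length() below subtracts
 * sizeof(event->array[0]) for such events, as the stored length
 * accounts for the array[0] slot itself.
 */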
/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_MASK		(3 << 30)

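/*
 * Example (worked numbers for rb_event_time_stamp() above): a
 * TIME_EXTEND or TIME_STAMP event splits its value across the 27 bit
 * time_delta field and array[0]. For a value of 0x12345678 (about
 * 305 ms when the clock counts nanoseconds):
 *
 *	time_delta = 0x12345678 & TS_MASK;	// low 27 bits
 *	array[0]   = 0x12345678 >> TS_SHIFT;	// remaining high bits
 *
 * rb_event_time_stamp() reverses this: (array[0] << 27) + time_delta.
 * A normal data event only carries the 27 bit delta; deltas that do
 * not fit (see test_time_stamp() below) force the writer to emit one
 * of these extend events first.
 */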
struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id:30;		/* ID for external mapping */
	u32		 range:1;	/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * For best performance, allocate cpu buffer data cache line sized
 * and per CPU.
 */
#define alloc_cpu_buffer(cpu) (struct ring_buffer_per_cpu *)		\
	kzalloc_node(ALIGN(sizeof(struct ring_buffer_per_cpu),		\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

#define alloc_cpu_page(cpu) (struct buffer_page *)			\
	kzalloc_node(ALIGN(sizeof(struct buffer_page),			\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

static struct buffer_data_page *alloc_cpu_data(int cpu, int order)
{
	struct buffer_data_page *dpage;
	struct page *page;
	gfp_t mflags;

	/*
	 * The __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking the oom-killer, so the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_COMP | __GFP_ZERO;

	page = alloc_pages_node(cpu_to_node(cpu), mflags, order);
	if (!page)
		return NULL;

	dpage = page_address(page);
	rb_init_page(dpage);

	return dpage;
}

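/*
 * Example (assuming a 4 KiB PAGE_SIZE): the "order" passed to
 * alloc_cpu_data() above is a power of two page count, so order 0
 * allocates a single 4 KiB sub-buffer, order 1 an 8 KiB one, and
 * order 2 a 16 KiB one. The usable data area is that size minus
 * BUF_PAGE_HDR_SIZE, since struct buffer_data_page places its
 * time_stamp and commit fields in front of data[].
 */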
/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	atomic_t			seq;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE	= 0,
	RB_ADD_STAMP_EXTEND	= BIT(1),
	RB_ADD_STAMP_ABSOLUTE	= BIT(2),
	RB_ADD_STAMP_FORCE	= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	/* pages generation counter, incremented when the list changes */
	unsigned long			cnt;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	unsigned int			user_mapped;	/* user space mapping */
	struct mutex			mapping_lock;
	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;
	struct ring_buffer_cpu_meta	*ring_meta;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages;	/* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned long			range_addr_start;
	unsigned long			range_addr_end;

	struct ring_buffer_meta		*meta;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned,
 * otherwise the current time is returned, though really neither of
 * the last two cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}

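/*
 * Example (worked numbers for rb_fix_abs_ts() above): absolute time
 * stamp events can only store 59 bits, so the 5 MSBs of the clock are
 * dropped on write. If a previously saved full time stamp had, say,
 * bit 59 set (save_ts = (1ULL << 59) | 0x100) and the event stores
 * abs = 0x200, then rb_fix_abs_ts() ORs the saved TS_MSB bits back in
 * and returns (1ULL << 59) | 0x200. If the restored value ends up
 * smaller than save_ts, the 59 bit portion must have wrapped since the
 * save, so another 1ULL << 59 is added to keep the clock monotonic.
 */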
/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on this.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always waits for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

struct rb_wait_data {
	struct rb_irq_work		*irq_work;
	int				seq;
};

/*
 * The default wait condition for ring_buffer_wait() is to just exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

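/*
 * Example (how the seq counter above is meant to pair up, based on
 * rb_wake_up_waiters() and rb_wait_once()): a waiter snapshots
 * rbwork->seq with atomic_read_acquire() before sleeping, and every
 * wakeup bumps it with atomic_fetch_inc_release(). If an irq_work
 * wakeup lands between the snapshot and the call to
 * wait_event_interruptible() below, the next evaluation of
 * rb_wait_once() sees seq != rdata->seq and the wait exits instead of
 * sleeping through the wakeup it just missed.
 */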
/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, and they only need
 * to worry about interrupts; reads can happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next	bit 1	  bit 0
 *				-------	  -------
 * Normal page			0	  0
 * Points to head page		0	  1
 * New head page		1	  0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

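/*
 * Example (worked case for the flag bits above, with a made up
 * address): buffer pages are allocated cache line aligned, so the two
 * low bits of a list_head pointer are always zero and can carry flags.
 * If the head page's list_head lives at 0x1000, the next pointer of
 * the page before it stores 0x1001 (RB_PAGE_HEAD). While a writer is
 * pushing the head forward it switches that pointer to 0x1002
 * (RB_PAGE_UPDATE), and rb_list_head() masks with ~RB_FLAG_MASK to
 * recover the plain 0x1000 for normal list walking.
 */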
/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static inline void rb_dec_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.prev);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_cpu_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
 */
static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu)
{
	int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
	struct ring_buffer_cpu_meta *meta;
	struct ring_buffer_meta *bmeta;
	unsigned long ptr;
	int nr_subbufs;

	bmeta = buffer->meta;
	if (!bmeta)
		return NULL;

	ptr = (unsigned long)bmeta + bmeta->buffers_offset;
	meta = (struct ring_buffer_cpu_meta *)ptr;

	/* When nr_pages passed in is zero, the first meta has already been initialized */
	if (!nr_pages) {
		nr_subbufs = meta->nr_subbufs;
	} else {
		/* Include the reader page */
		nr_subbufs = nr_pages + 1;
	}

	/*
	 * The first chunk may not be subbuffer aligned, whereas
	 * the rest of the chunks are.
	 */
	if (cpu) {
		ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
		ptr += subbuf_size * nr_subbufs;

		/* We can use multiplication to find chunks greater than 1 */
		if (cpu > 1) {
			unsigned long size;
			unsigned long p;

			/* Save the beginning of this CPU chunk */
			p = ptr;
			ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
			ptr += subbuf_size * nr_subbufs;

			/* Now all chunks after this are the same size */
			size = ptr - p;
			ptr += size * (cpu - 2);
		}
	}
	return (void *)ptr;
}

/* Return the start of subbufs given the meta pointer */
static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta)
{
	int subbuf_size = meta->subbuf_size;
	unsigned long ptr;

	ptr = (unsigned long)meta;
	ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs);

	return (void *)ptr;
}

/*
 * Return a specific sub-buffer for a given @cpu defined by @idx.
 */
static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx)
{
	struct ring_buffer_cpu_meta *meta;
	unsigned long ptr;
	int subbuf_size;

	meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu);
	if (!meta)
		return NULL;

	if (WARN_ON_ONCE(idx >= meta->nr_subbufs))
		return NULL;

	subbuf_size = meta->subbuf_size;

	/* Map this buffer to the order that's in meta->buffers[] */
	idx = meta->buffers[idx];

	ptr = (unsigned long)rb_subbufs_from_meta(meta);

	ptr += subbuf_size * idx;
	if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end)
		return NULL;

	return (void *)ptr;
}

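/*
 * Example (layout arithmetic for rb_range_meta() above, with made up
 * numbers): with a 4 KiB sub-buffer and nr_subbufs = 9 (8 pages plus
 * the reader page), each per CPU chunk is a ring_buffer_cpu_meta
 * header plus a 9 entry int array, rounded up to the next 4 KiB
 * boundary, followed by 9 * 4 KiB of sub-buffers. Only the chunk for
 * CPU 0 can start unaligned (right after the global meta and scratch
 * area); every later chunk has the same size, which is why the code
 * can compute the size of the CPU 1 chunk once and then multiply for
 * cpu - 2 additional chunks.
 */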
/*
 * See if the existing memory contains a valid meta section.
 * If so, use that, otherwise initialize it.
 */
static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size)
{
	unsigned long ptr = buffer->range_addr_start;
	struct ring_buffer_meta *bmeta;
	unsigned long total_size;
	int struct_sizes;

	bmeta = (struct ring_buffer_meta *)ptr;
	buffer->meta = bmeta;

	total_size = buffer->range_addr_end - buffer->range_addr_start;

	struct_sizes = sizeof(struct ring_buffer_cpu_meta);
	struct_sizes |= sizeof(*bmeta) << 16;

	/* The first buffer will start word size after the meta page */
	ptr += sizeof(*bmeta);
	ptr = ALIGN(ptr, sizeof(long));
	ptr += scratch_size;

	if (bmeta->magic != RING_BUFFER_META_MAGIC) {
		pr_info("Ring buffer boot meta mismatch of magic\n");
		goto init;
	}

	if (bmeta->struct_sizes != struct_sizes) {
		pr_info("Ring buffer boot meta mismatch of struct size\n");
		goto init;
	}

	if (bmeta->total_size != total_size) {
		pr_info("Ring buffer boot meta mismatch of total size\n");
		goto init;
	}

	if (bmeta->buffers_offset > bmeta->total_size) {
		pr_info("Ring buffer boot meta mismatch of offset outside of total size\n");
		goto init;
	}

	if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) {
		pr_info("Ring buffer boot meta mismatch of first buffer offset\n");
		goto init;
	}

	return true;

 init:
	bmeta->magic = RING_BUFFER_META_MAGIC;
	bmeta->struct_sizes = struct_sizes;
	bmeta->total_size = total_size;
	bmeta->buffers_offset = (void *)ptr - (void *)bmeta;

	/* Zero out the scratch pad */
	memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta));

	return false;
}

/*
 * See if the existing memory contains valid ring buffer data.
 * As the previous kernel must be the same as this kernel, all
 * the calculations (size of buffers and number of buffers)
 * must be the same.
 */
static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu,
			      struct trace_buffer *buffer, int nr_pages,
			      unsigned long *subbuf_mask)
{
	int subbuf_size = PAGE_SIZE;
	struct buffer_data_page *subbuf;
	unsigned long buffers_start;
	unsigned long buffers_end;
	int i;

	if (!subbuf_mask)
		return false;

	buffers_start = meta->first_buffer;
	buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs);

	/* Are the head and commit buffers within the range of buffers? */
	if (meta->head_buffer < buffers_start ||
	    meta->head_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu);
		return false;
	}

	if (meta->commit_buffer < buffers_start ||
	    meta->commit_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu);
		return false;
	}

	subbuf = rb_subbufs_from_meta(meta);

	bitmap_clear(subbuf_mask, 0, meta->nr_subbufs);

	/* Do the meta buffers and the subbufs themselves have correct data? */
	for (i = 0; i < meta->nr_subbufs; i++) {
		if (meta->buffers[i] < 0 ||
		    meta->buffers[i] >= meta->nr_subbufs) {
			pr_info("Ring buffer boot meta [%d] array out of range\n", cpu);
			return false;
		}

		if ((unsigned)local_read(&subbuf->commit) > subbuf_size) {
			pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu);
			return false;
		}

		if (test_bit(meta->buffers[i], subbuf_mask)) {
			pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu);
			return false;
		}

		set_bit(meta->buffers[i], subbuf_mask);
		subbuf = (void *)subbuf + subbuf_size;
	}

	return true;
}

static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf);

static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu,
			       unsigned long long *timestamp, u64 *delta_ptr)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int events = 0;
	int e;

	*delta_ptr = 0;
	*timestamp = 0;

	ts = dpage->time_stamp;

	for (e = 0; e < tail; e += rb_event_length(event)) {

		event = (struct ring_buffer_event *)(dpage->data + e);

		switch (event->type_len) {

		case RINGBUF_TYPE_TIME_EXTEND:
			delta = rb_event_time_stamp(event);
			ts += delta;
			break;

		case RINGBUF_TYPE_TIME_STAMP:
			delta = rb_event_time_stamp(event);
			delta = rb_fix_abs_ts(delta, ts);
			if (delta < ts) {
				*delta_ptr = delta;
				*timestamp = ts;
				return -1;
			}
			ts = delta;
			break;

		case RINGBUF_TYPE_PADDING:
			if (event->time_delta == 1)
				break;
			fallthrough;
		case RINGBUF_TYPE_DATA:
			events++;
			ts += event->time_delta;
			break;

		default:
			return -1;
		}
	}
	*timestamp = ts;
	return events;
}

static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu)
{
	unsigned long long ts;
	u64 delta;
	int tail;

	tail = local_read(&dpage->commit);
	return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta);
}

/* If the meta data has been validated, now validate the events */
static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	struct buffer_page *head_page, *orig_head;
	unsigned long entry_bytes = 0;
	unsigned long entries = 0;
	int ret;
	u64 ts;
	int i;

	if (!meta || !meta->head_buffer)
		return;

	/* Do the reader page first */
	ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu);
	if (ret < 0) {
		pr_info("Ring buffer reader page is invalid\n");
		goto invalid;
	}
	entries += ret;
	entry_bytes += local_read(&cpu_buffer->reader_page->page->commit);
	local_set(&cpu_buffer->reader_page->entries, ret);

	orig_head = head_page = cpu_buffer->head_page;
	ts = head_page->page->time_stamp;

	/*
	 * Try to rewind the head so that we can read the pages which were
	 * already read in the previous boot.
	 */
	if (head_page == cpu_buffer->tail_page)
		goto skip_rewind;

	rb_dec_page(&head_page);
	for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) {

		/* Rewind until tail (writer) page. */
		if (head_page == cpu_buffer->tail_page)
			break;

		/* Ensure the page has older data than head. */
*/ 1949 if (ts < head_page->page->time_stamp) 1950 break; 1951 1952 ts = head_page->page->time_stamp; 1953 /* Ensure the page has correct timestamp and some data. */ 1954 if (!ts || rb_page_commit(head_page) == 0) 1955 break; 1956 1957 /* Stop rewind if the page is invalid. */ 1958 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1959 if (ret < 0) 1960 break; 1961 1962 /* Recover the number of entries and update stats. */ 1963 local_set(&head_page->entries, ret); 1964 if (ret) 1965 local_inc(&cpu_buffer->pages_touched); 1966 entries += ret; 1967 entry_bytes += rb_page_commit(head_page); 1968 } 1969 if (i) 1970 pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i); 1971 1972 /* The last rewound page must be skipped. */ 1973 if (head_page != orig_head) 1974 rb_inc_page(&head_page); 1975 1976 /* 1977 * If the ring buffer was rewound, then inject the reader page 1978 * into the location just before the original head page. 1979 */ 1980 if (head_page != orig_head) { 1981 struct buffer_page *bpage = orig_head; 1982 1983 rb_dec_page(&bpage); 1984 /* 1985 * Insert the reader_page before the original head page. 1986 * Since the list encode RB_PAGE flags, general list 1987 * operations should be avoided. 1988 */ 1989 cpu_buffer->reader_page->list.next = &orig_head->list; 1990 cpu_buffer->reader_page->list.prev = orig_head->list.prev; 1991 orig_head->list.prev = &cpu_buffer->reader_page->list; 1992 bpage->list.next = &cpu_buffer->reader_page->list; 1993 1994 /* Make the head_page the reader page */ 1995 cpu_buffer->reader_page = head_page; 1996 bpage = head_page; 1997 rb_inc_page(&head_page); 1998 head_page->list.prev = bpage->list.prev; 1999 rb_dec_page(&bpage); 2000 bpage->list.next = &head_page->list; 2001 rb_set_list_to_head(&bpage->list); 2002 cpu_buffer->pages = &head_page->list; 2003 2004 cpu_buffer->head_page = head_page; 2005 meta->head_buffer = (unsigned long)head_page->page; 2006 2007 /* Reset all the indexes */ 2008 bpage = cpu_buffer->reader_page; 2009 meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page); 2010 bpage->id = 0; 2011 2012 for (i = 1, bpage = head_page; i < meta->nr_subbufs; 2013 i++, rb_inc_page(&bpage)) { 2014 meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page); 2015 bpage->id = i; 2016 } 2017 2018 /* We'll restart verifying from orig_head */ 2019 head_page = orig_head; 2020 } 2021 2022 skip_rewind: 2023 /* If the commit_buffer is the reader page, update the commit page */ 2024 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 2025 cpu_buffer->commit_page = cpu_buffer->reader_page; 2026 /* Nothing more to do, the only page is the reader page */ 2027 goto done; 2028 } 2029 2030 /* Iterate until finding the commit page */ 2031 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 2032 2033 /* Reader page has already been done */ 2034 if (head_page == cpu_buffer->reader_page) 2035 continue; 2036 2037 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 2038 if (ret < 0) { 2039 pr_info("Ring buffer meta [%d] invalid buffer page\n", 2040 cpu_buffer->cpu); 2041 goto invalid; 2042 } 2043 2044 /* If the buffer has content, update pages_touched */ 2045 if (ret) 2046 local_inc(&cpu_buffer->pages_touched); 2047 2048 entries += ret; 2049 entry_bytes += local_read(&head_page->page->commit); 2050 local_set(&cpu_buffer->head_page->entries, ret); 2051 2052 if (head_page == cpu_buffer->commit_page) 2053 break; 2054 } 2055 2056 if (head_page != cpu_buffer->commit_page) { 2057 pr_info("Ring buffer meta [%d] commit 
page not found\n",
2058 cpu_buffer->cpu);
2059 goto invalid;
2060 }
2061 done:
2062 local_set(&cpu_buffer->entries, entries);
2063 local_set(&cpu_buffer->entries_bytes, entry_bytes);
2064 
2065 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu);
2066 return;
2067 
2068 invalid:
2069 /* The content of the buffers is invalid, reset the meta data */
2070 meta->head_buffer = 0;
2071 meta->commit_buffer = 0;
2072 
2073 /* Reset the reader page */
2074 local_set(&cpu_buffer->reader_page->entries, 0);
2075 local_set(&cpu_buffer->reader_page->page->commit, 0);
2076 
2077 /* Reset all the subbuffers */
2078 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) {
2079 local_set(&head_page->entries, 0);
2080 local_set(&head_page->page->commit, 0);
2081 }
2082 }
2083 
2084 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size)
2085 {
2086 struct ring_buffer_cpu_meta *meta;
2087 unsigned long *subbuf_mask;
2088 unsigned long delta;
2089 void *subbuf;
2090 bool valid = false;
2091 int cpu;
2092 int i;
2093 
2094 /* Create a mask to test the subbuf array */
2095 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL);
2096 /* If subbuf_mask fails to allocate, then rb_cpu_meta_valid() will return false */
2097 
2098 if (rb_meta_init(buffer, scratch_size))
2099 valid = true;
2100 
2101 for (cpu = 0; cpu < nr_cpu_ids; cpu++) {
2102 void *next_meta;
2103 
2104 meta = rb_range_meta(buffer, nr_pages, cpu);
2105 
2106 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) {
2107 /* Make the mappings match the current address */
2108 subbuf = rb_subbufs_from_meta(meta);
2109 delta = (unsigned long)subbuf - meta->first_buffer;
2110 meta->first_buffer += delta;
2111 meta->head_buffer += delta;
2112 meta->commit_buffer += delta;
2113 continue;
2114 }
2115 
2116 if (cpu < nr_cpu_ids - 1)
2117 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1);
2118 else
2119 next_meta = (void *)buffer->range_addr_end;
2120 
2121 memset(meta, 0, next_meta - (void *)meta);
2122 
2123 meta->nr_subbufs = nr_pages + 1;
2124 meta->subbuf_size = PAGE_SIZE;
2125 
2126 subbuf = rb_subbufs_from_meta(meta);
2127 
2128 meta->first_buffer = (unsigned long)subbuf;
2129 
2130 /*
2131 * The buffers[] array holds the order of the sub-buffers
2132 * that are after the meta data. The sub-buffers may
2133 * be swapped out when read and inserted into a different
2134 * location of the ring buffer. Although their addresses
2135 * remain the same, the buffers[] array contains the
2136 * index into the sub-buffers holding their actual order.
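*
* For example, with nr_subbufs == 4 a buffers[] of {2, 0, 3, 1}
* means logical slot 0 maps to physical sub-buffer 2, slot 1 to
* sub-buffer 0, and so on. The physical pages never move; only this
* mapping is updated when sub-buffers are swapped.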
2137 */ 2138 for (i = 0; i < meta->nr_subbufs; i++) { 2139 meta->buffers[i] = i; 2140 rb_init_page(subbuf); 2141 subbuf += meta->subbuf_size; 2142 } 2143 } 2144 bitmap_free(subbuf_mask); 2145 } 2146 2147 static void *rbm_start(struct seq_file *m, loff_t *pos) 2148 { 2149 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2150 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2151 unsigned long val; 2152 2153 if (!meta) 2154 return NULL; 2155 2156 if (*pos > meta->nr_subbufs) 2157 return NULL; 2158 2159 val = *pos; 2160 val++; 2161 2162 return (void *)val; 2163 } 2164 2165 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2166 { 2167 (*pos)++; 2168 2169 return rbm_start(m, pos); 2170 } 2171 2172 static int rbm_show(struct seq_file *m, void *v) 2173 { 2174 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2175 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2176 unsigned long val = (unsigned long)v; 2177 2178 if (val == 1) { 2179 seq_printf(m, "head_buffer: %d\n", 2180 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2181 seq_printf(m, "commit_buffer: %d\n", 2182 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2183 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2184 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2185 return 0; 2186 } 2187 2188 val -= 2; 2189 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2190 2191 return 0; 2192 } 2193 2194 static void rbm_stop(struct seq_file *m, void *p) 2195 { 2196 } 2197 2198 static const struct seq_operations rb_meta_seq_ops = { 2199 .start = rbm_start, 2200 .next = rbm_next, 2201 .show = rbm_show, 2202 .stop = rbm_stop, 2203 }; 2204 2205 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2206 { 2207 struct seq_file *m; 2208 int ret; 2209 2210 ret = seq_open(file, &rb_meta_seq_ops); 2211 if (ret) 2212 return ret; 2213 2214 m = file->private_data; 2215 m->private = buffer->buffers[cpu]; 2216 2217 return 0; 2218 } 2219 2220 /* Map the buffer_pages to the previous head and commit pages */ 2221 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2222 struct buffer_page *bpage) 2223 { 2224 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2225 2226 if (meta->head_buffer == (unsigned long)bpage->page) 2227 cpu_buffer->head_page = bpage; 2228 2229 if (meta->commit_buffer == (unsigned long)bpage->page) { 2230 cpu_buffer->commit_page = bpage; 2231 cpu_buffer->tail_page = bpage; 2232 } 2233 } 2234 2235 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2236 long nr_pages, struct list_head *pages) 2237 { 2238 struct trace_buffer *buffer = cpu_buffer->buffer; 2239 struct ring_buffer_cpu_meta *meta = NULL; 2240 struct buffer_page *bpage, *tmp; 2241 bool user_thread = current->mm != NULL; 2242 long i; 2243 2244 /* 2245 * Check if the available memory is there first. 2246 * Note, si_mem_available() only gives us a rough estimate of available 2247 * memory. It may not be accurate. But we don't care, we just want 2248 * to prevent doing any allocation when it is obvious that it is 2249 * not going to succeed. 2250 */ 2251 i = si_mem_available(); 2252 if (i < nr_pages) 2253 return -ENOMEM; 2254 2255 /* 2256 * If a user thread allocates too much, and si_mem_available() 2257 * reports there's enough memory, even though there is not. 2258 * Make sure the OOM killer kills this thread. 
This can happen 2259 * even with RETRY_MAYFAIL because another task may be doing 2260 * an allocation after this task has taken all memory. 2261 * This is the task the OOM killer needs to take out during this 2262 * loop, even if it was triggered by an allocation somewhere else. 2263 */ 2264 if (user_thread) 2265 set_current_oom_origin(); 2266 2267 if (buffer->range_addr_start) 2268 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2269 2270 for (i = 0; i < nr_pages; i++) { 2271 2272 bpage = alloc_cpu_page(cpu_buffer->cpu); 2273 if (!bpage) 2274 goto free_pages; 2275 2276 rb_check_bpage(cpu_buffer, bpage); 2277 2278 /* 2279 * Append the pages as for mapped buffers we want to keep 2280 * the order 2281 */ 2282 list_add_tail(&bpage->list, pages); 2283 2284 if (meta) { 2285 /* A range was given. Use that for the buffer page */ 2286 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2287 if (!bpage->page) 2288 goto free_pages; 2289 /* If this is valid from a previous boot */ 2290 if (meta->head_buffer) 2291 rb_meta_buffer_update(cpu_buffer, bpage); 2292 bpage->range = 1; 2293 bpage->id = i + 1; 2294 } else { 2295 int order = cpu_buffer->buffer->subbuf_order; 2296 bpage->page = alloc_cpu_data(cpu_buffer->cpu, order); 2297 if (!bpage->page) 2298 goto free_pages; 2299 } 2300 bpage->order = cpu_buffer->buffer->subbuf_order; 2301 2302 if (user_thread && fatal_signal_pending(current)) 2303 goto free_pages; 2304 } 2305 if (user_thread) 2306 clear_current_oom_origin(); 2307 2308 return 0; 2309 2310 free_pages: 2311 list_for_each_entry_safe(bpage, tmp, pages, list) { 2312 list_del_init(&bpage->list); 2313 free_buffer_page(bpage); 2314 } 2315 if (user_thread) 2316 clear_current_oom_origin(); 2317 2318 return -ENOMEM; 2319 } 2320 2321 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2322 unsigned long nr_pages) 2323 { 2324 LIST_HEAD(pages); 2325 2326 WARN_ON(!nr_pages); 2327 2328 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2329 return -ENOMEM; 2330 2331 /* 2332 * The ring buffer page list is a circular list that does not 2333 * start and end with a list head. All page list items point to 2334 * other pages. 
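*
* i.e. with three buffer pages A, B and C the result is simply:
*
*	A -> B -> C -> A
*
* with no list_head sentinel left in the loop; cpu_buffer->pages
* below just points at one of the buffer_page entries.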
2335 */ 2336 cpu_buffer->pages = pages.next; 2337 list_del(&pages); 2338 2339 cpu_buffer->nr_pages = nr_pages; 2340 2341 rb_check_pages(cpu_buffer); 2342 2343 return 0; 2344 } 2345 2346 static struct ring_buffer_per_cpu * 2347 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2348 { 2349 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = 2350 alloc_cpu_buffer(cpu); 2351 struct ring_buffer_cpu_meta *meta; 2352 struct buffer_page *bpage; 2353 int ret; 2354 2355 if (!cpu_buffer) 2356 return NULL; 2357 2358 cpu_buffer->cpu = cpu; 2359 cpu_buffer->buffer = buffer; 2360 raw_spin_lock_init(&cpu_buffer->reader_lock); 2361 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2362 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2363 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2364 init_completion(&cpu_buffer->update_done); 2365 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2366 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2367 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2368 mutex_init(&cpu_buffer->mapping_lock); 2369 2370 bpage = alloc_cpu_page(cpu); 2371 if (!bpage) 2372 return NULL; 2373 2374 rb_check_bpage(cpu_buffer, bpage); 2375 2376 cpu_buffer->reader_page = bpage; 2377 2378 if (buffer->range_addr_start) { 2379 /* 2380 * Range mapped buffers have the same restrictions as memory 2381 * mapped ones do. 2382 */ 2383 cpu_buffer->mapped = 1; 2384 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2385 bpage->page = rb_range_buffer(cpu_buffer, 0); 2386 if (!bpage->page) 2387 goto fail_free_reader; 2388 if (cpu_buffer->ring_meta->head_buffer) 2389 rb_meta_buffer_update(cpu_buffer, bpage); 2390 bpage->range = 1; 2391 } else { 2392 int order = cpu_buffer->buffer->subbuf_order; 2393 bpage->page = alloc_cpu_data(cpu, order); 2394 if (!bpage->page) 2395 goto fail_free_reader; 2396 } 2397 2398 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2399 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2400 2401 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2402 if (ret < 0) 2403 goto fail_free_reader; 2404 2405 rb_meta_validate_events(cpu_buffer); 2406 2407 /* If the boot meta was valid then this has already been updated */ 2408 meta = cpu_buffer->ring_meta; 2409 if (!meta || !meta->head_buffer || 2410 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2411 if (meta && meta->head_buffer && 2412 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2413 pr_warn("Ring buffer meta buffers not all mapped\n"); 2414 if (!cpu_buffer->head_page) 2415 pr_warn(" Missing head_page\n"); 2416 if (!cpu_buffer->commit_page) 2417 pr_warn(" Missing commit_page\n"); 2418 if (!cpu_buffer->tail_page) 2419 pr_warn(" Missing tail_page\n"); 2420 } 2421 2422 cpu_buffer->head_page 2423 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2424 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2425 2426 rb_head_page_activate(cpu_buffer); 2427 2428 if (cpu_buffer->ring_meta) 2429 meta->commit_buffer = meta->head_buffer; 2430 } else { 2431 /* The valid meta buffer still needs to activate the head page */ 2432 rb_head_page_activate(cpu_buffer); 2433 } 2434 2435 return_ptr(cpu_buffer); 2436 2437 fail_free_reader: 2438 free_buffer_page(cpu_buffer->reader_page); 2439 2440 return NULL; 2441 } 2442 2443 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2444 { 2445 struct list_head *head = cpu_buffer->pages; 2446 struct 
buffer_page *bpage, *tmp; 2447 2448 irq_work_sync(&cpu_buffer->irq_work.work); 2449 2450 free_buffer_page(cpu_buffer->reader_page); 2451 2452 if (head) { 2453 rb_head_page_deactivate(cpu_buffer); 2454 2455 list_for_each_entry_safe(bpage, tmp, head, list) { 2456 list_del_init(&bpage->list); 2457 free_buffer_page(bpage); 2458 } 2459 bpage = list_entry(head, struct buffer_page, list); 2460 free_buffer_page(bpage); 2461 } 2462 2463 free_page((unsigned long)cpu_buffer->free_page); 2464 2465 kfree(cpu_buffer); 2466 } 2467 2468 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2469 int order, unsigned long start, 2470 unsigned long end, 2471 unsigned long scratch_size, 2472 struct lock_class_key *key) 2473 { 2474 struct trace_buffer *buffer __free(kfree) = NULL; 2475 long nr_pages; 2476 int subbuf_size; 2477 int bsize; 2478 int cpu; 2479 int ret; 2480 2481 /* keep it in its own cache line */ 2482 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2483 GFP_KERNEL); 2484 if (!buffer) 2485 return NULL; 2486 2487 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2488 return NULL; 2489 2490 buffer->subbuf_order = order; 2491 subbuf_size = (PAGE_SIZE << order); 2492 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2493 2494 /* Max payload is buffer page size - header (8bytes) */ 2495 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2496 2497 buffer->flags = flags; 2498 buffer->clock = trace_clock_local; 2499 buffer->reader_lock_key = key; 2500 2501 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2502 init_waitqueue_head(&buffer->irq_work.waiters); 2503 2504 buffer->cpus = nr_cpu_ids; 2505 2506 bsize = sizeof(void *) * nr_cpu_ids; 2507 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2508 GFP_KERNEL); 2509 if (!buffer->buffers) 2510 goto fail_free_cpumask; 2511 2512 /* If start/end are specified, then that overrides size */ 2513 if (start && end) { 2514 unsigned long buffers_start; 2515 unsigned long ptr; 2516 int n; 2517 2518 /* Make sure that start is word aligned */ 2519 start = ALIGN(start, sizeof(long)); 2520 2521 /* scratch_size needs to be aligned too */ 2522 scratch_size = ALIGN(scratch_size, sizeof(long)); 2523 2524 /* Subtract the buffer meta data and word aligned */ 2525 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2526 buffers_start = ALIGN(buffers_start, sizeof(long)); 2527 buffers_start += scratch_size; 2528 2529 /* Calculate the size for the per CPU data */ 2530 size = end - buffers_start; 2531 size = size / nr_cpu_ids; 2532 2533 /* 2534 * The number of sub-buffers (nr_pages) is determined by the 2535 * total size allocated minus the meta data size. 2536 * Then that is divided by the number of per CPU buffers 2537 * needed, plus account for the integer array index that 2538 * will be appended to the meta data. 
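*
* As a rough example (64-bit kernel, 4K sub-buffers), a 1M per-CPU
* slice works out to:
*
*	nr_pages = (1M - sizeof(cpu_meta)) / (4096 + 4) ~= 255
*
* one of which is then set aside as the reader page.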
2539 */ 2540 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2541 (subbuf_size + sizeof(int)); 2542 /* Need at least two pages plus the reader page */ 2543 if (nr_pages < 3) 2544 goto fail_free_buffers; 2545 2546 again: 2547 /* Make sure that the size fits aligned */ 2548 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2549 ptr += sizeof(struct ring_buffer_cpu_meta) + 2550 sizeof(int) * nr_pages; 2551 ptr = ALIGN(ptr, subbuf_size); 2552 ptr += subbuf_size * nr_pages; 2553 } 2554 if (ptr > end) { 2555 if (nr_pages <= 3) 2556 goto fail_free_buffers; 2557 nr_pages--; 2558 goto again; 2559 } 2560 2561 /* nr_pages should not count the reader page */ 2562 nr_pages--; 2563 buffer->range_addr_start = start; 2564 buffer->range_addr_end = end; 2565 2566 rb_range_meta_init(buffer, nr_pages, scratch_size); 2567 } else { 2568 2569 /* need at least two pages */ 2570 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2571 if (nr_pages < 2) 2572 nr_pages = 2; 2573 } 2574 2575 cpu = raw_smp_processor_id(); 2576 cpumask_set_cpu(cpu, buffer->cpumask); 2577 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2578 if (!buffer->buffers[cpu]) 2579 goto fail_free_buffers; 2580 2581 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2582 if (ret < 0) 2583 goto fail_free_buffers; 2584 2585 mutex_init(&buffer->mutex); 2586 2587 return_ptr(buffer); 2588 2589 fail_free_buffers: 2590 for_each_buffer_cpu(buffer, cpu) { 2591 if (buffer->buffers[cpu]) 2592 rb_free_cpu_buffer(buffer->buffers[cpu]); 2593 } 2594 kfree(buffer->buffers); 2595 2596 fail_free_cpumask: 2597 free_cpumask_var(buffer->cpumask); 2598 2599 return NULL; 2600 } 2601 2602 /** 2603 * __ring_buffer_alloc - allocate a new ring_buffer 2604 * @size: the size in bytes per cpu that is needed. 2605 * @flags: attributes to set for the ring buffer. 2606 * @key: ring buffer reader_lock_key. 2607 * 2608 * Currently the only flag that is available is the RB_FL_OVERWRITE 2609 * flag. This flag means that the buffer will overwrite old data 2610 * when the buffer wraps. If this flag is not set, the buffer will 2611 * drop data when the tail hits the head. 2612 */ 2613 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2614 struct lock_class_key *key) 2615 { 2616 /* Default buffer page size - one system page */ 2617 return alloc_buffer(size, flags, 0, 0, 0, 0, key); 2618 2619 } 2620 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2621 2622 /** 2623 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2624 * @size: the size in bytes per cpu that is needed. 2625 * @flags: attributes to set for the ring buffer. 2626 * @order: sub-buffer order 2627 * @start: start of allocated range 2628 * @range_size: size of allocated range 2629 * @scratch_size: size of scratch area (for preallocated memory buffers) 2630 * @key: ring buffer reader_lock_key. 2631 * 2632 * Currently the only flag that is available is the RB_FL_OVERWRITE 2633 * flag. This flag means that the buffer will overwrite old data 2634 * when the buffer wraps. If this flag is not set, the buffer will 2635 * drop data when the tail hits the head. 
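*
* A minimal illustrative call, assuming @start is the base of an
* already reserved and mapped region of @range_size bytes:
*
*	static struct lock_class_key key;
*	struct trace_buffer *buf;
*
*	buf = __ring_buffer_alloc_range(0, RB_FL_OVERWRITE, 0, start,
*					range_size, 0, &key);
*
* (@size may be 0 here, as a given start/range overrides it.)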
2636 */ 2637 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2638 int order, unsigned long start, 2639 unsigned long range_size, 2640 unsigned long scratch_size, 2641 struct lock_class_key *key) 2642 { 2643 return alloc_buffer(size, flags, order, start, start + range_size, 2644 scratch_size, key); 2645 } 2646 2647 void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size) 2648 { 2649 struct ring_buffer_meta *meta; 2650 void *ptr; 2651 2652 if (!buffer || !buffer->meta) 2653 return NULL; 2654 2655 meta = buffer->meta; 2656 2657 ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long)); 2658 2659 if (size) 2660 *size = (void *)meta + meta->buffers_offset - ptr; 2661 2662 return ptr; 2663 } 2664 2665 /** 2666 * ring_buffer_free - free a ring buffer. 2667 * @buffer: the buffer to free. 2668 */ 2669 void 2670 ring_buffer_free(struct trace_buffer *buffer) 2671 { 2672 int cpu; 2673 2674 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2675 2676 irq_work_sync(&buffer->irq_work.work); 2677 2678 for_each_buffer_cpu(buffer, cpu) 2679 rb_free_cpu_buffer(buffer->buffers[cpu]); 2680 2681 kfree(buffer->buffers); 2682 free_cpumask_var(buffer->cpumask); 2683 2684 kfree(buffer); 2685 } 2686 EXPORT_SYMBOL_GPL(ring_buffer_free); 2687 2688 void ring_buffer_set_clock(struct trace_buffer *buffer, 2689 u64 (*clock)(void)) 2690 { 2691 buffer->clock = clock; 2692 } 2693 2694 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2695 { 2696 buffer->time_stamp_abs = abs; 2697 } 2698 2699 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2700 { 2701 return buffer->time_stamp_abs; 2702 } 2703 2704 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2705 { 2706 return local_read(&bpage->entries) & RB_WRITE_MASK; 2707 } 2708 2709 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2710 { 2711 return local_read(&bpage->write) & RB_WRITE_MASK; 2712 } 2713 2714 static bool 2715 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2716 { 2717 struct list_head *tail_page, *to_remove, *next_page; 2718 struct buffer_page *to_remove_page, *tmp_iter_page; 2719 struct buffer_page *last_page, *first_page; 2720 unsigned long nr_removed; 2721 unsigned long head_bit; 2722 int page_entries; 2723 2724 head_bit = 0; 2725 2726 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2727 atomic_inc(&cpu_buffer->record_disabled); 2728 /* 2729 * We don't race with the readers since we have acquired the reader 2730 * lock. We also don't race with writers after disabling recording. 2731 * This makes it easy to figure out the first and the last page to be 2732 * removed from the list. We unlink all the pages in between including 2733 * the first and last pages. This is done in a busy loop so that we 2734 * lose the least number of traces. 2735 * The pages are freed after we restart recording and unlock readers. 
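*
* Roughly, removing two pages from
*
*	tail -> A -> B -> C -> ...
*
* leaves
*
*	tail -> C -> ...
*
* where A and B are only freed once recording is re-enabled and the
* reader lock has been dropped.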
2736 */ 2737 tail_page = &cpu_buffer->tail_page->list; 2738 2739 /* 2740 * tail page might be on reader page, we remove the next page 2741 * from the ring buffer 2742 */ 2743 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2744 tail_page = rb_list_head(tail_page->next); 2745 to_remove = tail_page; 2746 2747 /* start of pages to remove */ 2748 first_page = list_entry(rb_list_head(to_remove->next), 2749 struct buffer_page, list); 2750 2751 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2752 to_remove = rb_list_head(to_remove)->next; 2753 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2754 } 2755 /* Read iterators need to reset themselves when some pages removed */ 2756 cpu_buffer->pages_removed += nr_removed; 2757 2758 next_page = rb_list_head(to_remove)->next; 2759 2760 /* 2761 * Now we remove all pages between tail_page and next_page. 2762 * Make sure that we have head_bit value preserved for the 2763 * next page 2764 */ 2765 tail_page->next = (struct list_head *)((unsigned long)next_page | 2766 head_bit); 2767 next_page = rb_list_head(next_page); 2768 next_page->prev = tail_page; 2769 2770 /* make sure pages points to a valid page in the ring buffer */ 2771 cpu_buffer->pages = next_page; 2772 cpu_buffer->cnt++; 2773 2774 /* update head page */ 2775 if (head_bit) 2776 cpu_buffer->head_page = list_entry(next_page, 2777 struct buffer_page, list); 2778 2779 /* pages are removed, resume tracing and then free the pages */ 2780 atomic_dec(&cpu_buffer->record_disabled); 2781 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2782 2783 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2784 2785 /* last buffer page to remove */ 2786 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2787 list); 2788 tmp_iter_page = first_page; 2789 2790 do { 2791 cond_resched(); 2792 2793 to_remove_page = tmp_iter_page; 2794 rb_inc_page(&tmp_iter_page); 2795 2796 /* update the counters */ 2797 page_entries = rb_page_entries(to_remove_page); 2798 if (page_entries) { 2799 /* 2800 * If something was added to this page, it was full 2801 * since it is not the tail page. So we deduct the 2802 * bytes consumed in ring buffer from here. 2803 * Increment overrun to account for the lost events. 2804 */ 2805 local_add(page_entries, &cpu_buffer->overrun); 2806 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2807 local_inc(&cpu_buffer->pages_lost); 2808 } 2809 2810 /* 2811 * We have already removed references to this list item, just 2812 * free up the buffer_page and its page 2813 */ 2814 free_buffer_page(to_remove_page); 2815 nr_removed--; 2816 2817 } while (to_remove_page != last_page); 2818 2819 RB_WARN_ON(cpu_buffer, nr_removed); 2820 2821 return nr_removed == 0; 2822 } 2823 2824 static bool 2825 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2826 { 2827 struct list_head *pages = &cpu_buffer->new_pages; 2828 unsigned long flags; 2829 bool success; 2830 int retries; 2831 2832 /* Can be called at early boot up, where interrupts must not been enabled */ 2833 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2834 /* 2835 * We are holding the reader lock, so the reader page won't be swapped 2836 * in the ring buffer. Now we are racing with the writer trying to 2837 * move head page and the tail page. 2838 * We are going to adapt the reader page update process where: 2839 * 1. We first splice the start and end of list of new pages between 2840 * the head page and its previous page. 2841 * 2. 
We cmpxchg the prev_page->next to point from head page to the 2842 * start of new pages list. 2843 * 3. Finally, we update the head->prev to the end of new list. 2844 * 2845 * We will try this process 10 times, to make sure that we don't keep 2846 * spinning. 2847 */ 2848 retries = 10; 2849 success = false; 2850 while (retries--) { 2851 struct list_head *head_page, *prev_page; 2852 struct list_head *last_page, *first_page; 2853 struct list_head *head_page_with_bit; 2854 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2855 2856 if (!hpage) 2857 break; 2858 head_page = &hpage->list; 2859 prev_page = head_page->prev; 2860 2861 first_page = pages->next; 2862 last_page = pages->prev; 2863 2864 head_page_with_bit = (struct list_head *) 2865 ((unsigned long)head_page | RB_PAGE_HEAD); 2866 2867 last_page->next = head_page_with_bit; 2868 first_page->prev = prev_page; 2869 2870 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2871 if (try_cmpxchg(&prev_page->next, 2872 &head_page_with_bit, first_page)) { 2873 /* 2874 * yay, we replaced the page pointer to our new list, 2875 * now, we just have to update to head page's prev 2876 * pointer to point to end of list 2877 */ 2878 head_page->prev = last_page; 2879 cpu_buffer->cnt++; 2880 success = true; 2881 break; 2882 } 2883 } 2884 2885 if (success) 2886 INIT_LIST_HEAD(pages); 2887 /* 2888 * If we weren't successful in adding in new pages, warn and stop 2889 * tracing 2890 */ 2891 RB_WARN_ON(cpu_buffer, !success); 2892 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2893 2894 /* free pages if they weren't inserted */ 2895 if (!success) { 2896 struct buffer_page *bpage, *tmp; 2897 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2898 list) { 2899 list_del_init(&bpage->list); 2900 free_buffer_page(bpage); 2901 } 2902 } 2903 return success; 2904 } 2905 2906 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2907 { 2908 bool success; 2909 2910 if (cpu_buffer->nr_pages_to_update > 0) 2911 success = rb_insert_pages(cpu_buffer); 2912 else 2913 success = rb_remove_pages(cpu_buffer, 2914 -cpu_buffer->nr_pages_to_update); 2915 2916 if (success) 2917 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2918 } 2919 2920 static void update_pages_handler(struct work_struct *work) 2921 { 2922 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2923 struct ring_buffer_per_cpu, update_pages_work); 2924 rb_update_pages(cpu_buffer); 2925 complete(&cpu_buffer->update_done); 2926 } 2927 2928 /** 2929 * ring_buffer_resize - resize the ring buffer 2930 * @buffer: the buffer to resize. 2931 * @size: the new size. 2932 * @cpu_id: the cpu buffer to resize 2933 * 2934 * Minimum size is 2 * buffer->subbuf_size. 2935 * 2936 * Returns 0 on success and < 0 on failure. 2937 */ 2938 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2939 int cpu_id) 2940 { 2941 struct ring_buffer_per_cpu *cpu_buffer; 2942 unsigned long nr_pages; 2943 int cpu, err; 2944 2945 /* 2946 * Always succeed at resizing a non-existent buffer: 2947 */ 2948 if (!buffer) 2949 return 0; 2950 2951 /* Make sure the requested buffer exists */ 2952 if (cpu_id != RING_BUFFER_ALL_CPUS && 2953 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2954 return 0; 2955 2956 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2957 2958 /* we need a minimum of two pages */ 2959 if (nr_pages < 2) 2960 nr_pages = 2; 2961 2962 /* 2963 * Keep CPUs from coming online while resizing to synchronize 2964 * with new per CPU buffers being created. 
2965 */ 2966 guard(cpus_read_lock)(); 2967 2968 /* prevent another thread from changing buffer sizes */ 2969 mutex_lock(&buffer->mutex); 2970 atomic_inc(&buffer->resizing); 2971 2972 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2973 /* 2974 * Don't succeed if resizing is disabled, as a reader might be 2975 * manipulating the ring buffer and is expecting a sane state while 2976 * this is true. 2977 */ 2978 for_each_buffer_cpu(buffer, cpu) { 2979 cpu_buffer = buffer->buffers[cpu]; 2980 if (atomic_read(&cpu_buffer->resize_disabled)) { 2981 err = -EBUSY; 2982 goto out_err_unlock; 2983 } 2984 } 2985 2986 /* calculate the pages to update */ 2987 for_each_buffer_cpu(buffer, cpu) { 2988 cpu_buffer = buffer->buffers[cpu]; 2989 2990 cpu_buffer->nr_pages_to_update = nr_pages - 2991 cpu_buffer->nr_pages; 2992 /* 2993 * nothing more to do for removing pages or no update 2994 */ 2995 if (cpu_buffer->nr_pages_to_update <= 0) 2996 continue; 2997 /* 2998 * to add pages, make sure all new pages can be 2999 * allocated without receiving ENOMEM 3000 */ 3001 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3002 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3003 &cpu_buffer->new_pages)) { 3004 /* not enough memory for new pages */ 3005 err = -ENOMEM; 3006 goto out_err; 3007 } 3008 3009 cond_resched(); 3010 } 3011 3012 /* 3013 * Fire off all the required work handlers 3014 * We can't schedule on offline CPUs, but it's not necessary 3015 * since we can change their buffer sizes without any race. 3016 */ 3017 for_each_buffer_cpu(buffer, cpu) { 3018 cpu_buffer = buffer->buffers[cpu]; 3019 if (!cpu_buffer->nr_pages_to_update) 3020 continue; 3021 3022 /* Can't run something on an offline CPU. */ 3023 if (!cpu_online(cpu)) { 3024 rb_update_pages(cpu_buffer); 3025 cpu_buffer->nr_pages_to_update = 0; 3026 } else { 3027 /* Run directly if possible. */ 3028 migrate_disable(); 3029 if (cpu != smp_processor_id()) { 3030 migrate_enable(); 3031 schedule_work_on(cpu, 3032 &cpu_buffer->update_pages_work); 3033 } else { 3034 update_pages_handler(&cpu_buffer->update_pages_work); 3035 migrate_enable(); 3036 } 3037 } 3038 } 3039 3040 /* wait for all the updates to complete */ 3041 for_each_buffer_cpu(buffer, cpu) { 3042 cpu_buffer = buffer->buffers[cpu]; 3043 if (!cpu_buffer->nr_pages_to_update) 3044 continue; 3045 3046 if (cpu_online(cpu)) 3047 wait_for_completion(&cpu_buffer->update_done); 3048 cpu_buffer->nr_pages_to_update = 0; 3049 } 3050 3051 } else { 3052 cpu_buffer = buffer->buffers[cpu_id]; 3053 3054 if (nr_pages == cpu_buffer->nr_pages) 3055 goto out; 3056 3057 /* 3058 * Don't succeed if resizing is disabled, as a reader might be 3059 * manipulating the ring buffer and is expecting a sane state while 3060 * this is true. 3061 */ 3062 if (atomic_read(&cpu_buffer->resize_disabled)) { 3063 err = -EBUSY; 3064 goto out_err_unlock; 3065 } 3066 3067 cpu_buffer->nr_pages_to_update = nr_pages - 3068 cpu_buffer->nr_pages; 3069 3070 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3071 if (cpu_buffer->nr_pages_to_update > 0 && 3072 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3073 &cpu_buffer->new_pages)) { 3074 err = -ENOMEM; 3075 goto out_err; 3076 } 3077 3078 /* Can't run something on an offline CPU. */ 3079 if (!cpu_online(cpu_id)) 3080 rb_update_pages(cpu_buffer); 3081 else { 3082 /* Run directly if possible. 
*/ 3083 migrate_disable(); 3084 if (cpu_id == smp_processor_id()) { 3085 rb_update_pages(cpu_buffer); 3086 migrate_enable(); 3087 } else { 3088 migrate_enable(); 3089 schedule_work_on(cpu_id, 3090 &cpu_buffer->update_pages_work); 3091 wait_for_completion(&cpu_buffer->update_done); 3092 } 3093 } 3094 3095 cpu_buffer->nr_pages_to_update = 0; 3096 } 3097 3098 out: 3099 /* 3100 * The ring buffer resize can happen with the ring buffer 3101 * enabled, so that the update disturbs the tracing as little 3102 * as possible. But if the buffer is disabled, we do not need 3103 * to worry about that, and we can take the time to verify 3104 * that the buffer is not corrupt. 3105 */ 3106 if (atomic_read(&buffer->record_disabled)) { 3107 atomic_inc(&buffer->record_disabled); 3108 /* 3109 * Even though the buffer was disabled, we must make sure 3110 * that it is truly disabled before calling rb_check_pages. 3111 * There could have been a race between checking 3112 * record_disable and incrementing it. 3113 */ 3114 synchronize_rcu(); 3115 for_each_buffer_cpu(buffer, cpu) { 3116 cpu_buffer = buffer->buffers[cpu]; 3117 rb_check_pages(cpu_buffer); 3118 } 3119 atomic_dec(&buffer->record_disabled); 3120 } 3121 3122 atomic_dec(&buffer->resizing); 3123 mutex_unlock(&buffer->mutex); 3124 return 0; 3125 3126 out_err: 3127 for_each_buffer_cpu(buffer, cpu) { 3128 struct buffer_page *bpage, *tmp; 3129 3130 cpu_buffer = buffer->buffers[cpu]; 3131 cpu_buffer->nr_pages_to_update = 0; 3132 3133 if (list_empty(&cpu_buffer->new_pages)) 3134 continue; 3135 3136 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 3137 list) { 3138 list_del_init(&bpage->list); 3139 free_buffer_page(bpage); 3140 } 3141 } 3142 out_err_unlock: 3143 atomic_dec(&buffer->resizing); 3144 mutex_unlock(&buffer->mutex); 3145 return err; 3146 } 3147 EXPORT_SYMBOL_GPL(ring_buffer_resize); 3148 3149 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 3150 { 3151 mutex_lock(&buffer->mutex); 3152 if (val) 3153 buffer->flags |= RB_FL_OVERWRITE; 3154 else 3155 buffer->flags &= ~RB_FL_OVERWRITE; 3156 mutex_unlock(&buffer->mutex); 3157 } 3158 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3159 3160 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3161 { 3162 return bpage->page->data + index; 3163 } 3164 3165 static __always_inline struct ring_buffer_event * 3166 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3167 { 3168 return __rb_page_index(cpu_buffer->reader_page, 3169 cpu_buffer->reader_page->read); 3170 } 3171 3172 static struct ring_buffer_event * 3173 rb_iter_head_event(struct ring_buffer_iter *iter) 3174 { 3175 struct ring_buffer_event *event; 3176 struct buffer_page *iter_head_page = iter->head_page; 3177 unsigned long commit; 3178 unsigned length; 3179 3180 if (iter->head != iter->next_event) 3181 return iter->event; 3182 3183 /* 3184 * When the writer goes across pages, it issues a cmpxchg which 3185 * is a mb(), which will synchronize with the rmb here. 3186 * (see rb_tail_page_update() and __rb_reserve_next()) 3187 */ 3188 commit = rb_page_commit(iter_head_page); 3189 smp_rmb(); 3190 3191 /* An event needs to be at least 8 bytes in size */ 3192 if (iter->head > commit - 8) 3193 goto reset; 3194 3195 event = __rb_page_index(iter_head_page, iter->head); 3196 length = rb_event_length(event); 3197 3198 /* 3199 * READ_ONCE() doesn't work on functions and we don't want the 3200 * compiler doing any crazy optimizations with length. 
3201 */ 3202 barrier(); 3203 3204 if ((iter->head + length) > commit || length > iter->event_size) 3205 /* Writer corrupted the read? */ 3206 goto reset; 3207 3208 memcpy(iter->event, event, length); 3209 /* 3210 * If the page stamp is still the same after this rmb() then the 3211 * event was safely copied without the writer entering the page. 3212 */ 3213 smp_rmb(); 3214 3215 /* Make sure the page didn't change since we read this */ 3216 if (iter->page_stamp != iter_head_page->page->time_stamp || 3217 commit > rb_page_commit(iter_head_page)) 3218 goto reset; 3219 3220 iter->next_event = iter->head + length; 3221 return iter->event; 3222 reset: 3223 /* Reset to the beginning */ 3224 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3225 iter->head = 0; 3226 iter->next_event = 0; 3227 iter->missed_events = 1; 3228 return NULL; 3229 } 3230 3231 /* Size is determined by what has been committed */ 3232 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3233 { 3234 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3235 } 3236 3237 static __always_inline unsigned 3238 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3239 { 3240 return rb_page_commit(cpu_buffer->commit_page); 3241 } 3242 3243 static __always_inline unsigned 3244 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3245 { 3246 unsigned long addr = (unsigned long)event; 3247 3248 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3249 3250 return addr - BUF_PAGE_HDR_SIZE; 3251 } 3252 3253 static void rb_inc_iter(struct ring_buffer_iter *iter) 3254 { 3255 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3256 3257 /* 3258 * The iterator could be on the reader page (it starts there). 3259 * But the head could have moved, since the reader was 3260 * found. Check for this case and assign the iterator 3261 * to the head page instead of next. 3262 */ 3263 if (iter->head_page == cpu_buffer->reader_page) 3264 iter->head_page = rb_set_head_page(cpu_buffer); 3265 else 3266 rb_inc_page(&iter->head_page); 3267 3268 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3269 iter->head = 0; 3270 iter->next_event = 0; 3271 } 3272 3273 /* Return the index into the sub-buffers for a given sub-buffer */ 3274 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3275 { 3276 void *subbuf_array; 3277 3278 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3279 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3280 return (subbuf - subbuf_array) / meta->subbuf_size; 3281 } 3282 3283 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3284 struct buffer_page *next_page) 3285 { 3286 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3287 unsigned long old_head = (unsigned long)next_page->page; 3288 unsigned long new_head; 3289 3290 rb_inc_page(&next_page); 3291 new_head = (unsigned long)next_page->page; 3292 3293 /* 3294 * Only move it forward once, if something else came in and 3295 * moved it forward, then we don't want to touch it. 
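*
* The cmpxchg() below only succeeds while meta->head_buffer still
* equals old_head; if a nested update already advanced it, the
* value is left alone and the result is intentionally ignored.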
3296 */ 3297 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3298 } 3299 3300 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3301 struct buffer_page *reader) 3302 { 3303 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3304 void *old_reader = cpu_buffer->reader_page->page; 3305 void *new_reader = reader->page; 3306 int id; 3307 3308 id = reader->id; 3309 cpu_buffer->reader_page->id = id; 3310 reader->id = 0; 3311 3312 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3313 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3314 3315 /* The head pointer is the one after the reader */ 3316 rb_update_meta_head(cpu_buffer, reader); 3317 } 3318 3319 /* 3320 * rb_handle_head_page - writer hit the head page 3321 * 3322 * Returns: +1 to retry page 3323 * 0 to continue 3324 * -1 on error 3325 */ 3326 static int 3327 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3328 struct buffer_page *tail_page, 3329 struct buffer_page *next_page) 3330 { 3331 struct buffer_page *new_head; 3332 int entries; 3333 int type; 3334 int ret; 3335 3336 entries = rb_page_entries(next_page); 3337 3338 /* 3339 * The hard part is here. We need to move the head 3340 * forward, and protect against both readers on 3341 * other CPUs and writers coming in via interrupts. 3342 */ 3343 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3344 RB_PAGE_HEAD); 3345 3346 /* 3347 * type can be one of four: 3348 * NORMAL - an interrupt already moved it for us 3349 * HEAD - we are the first to get here. 3350 * UPDATE - we are the interrupt interrupting 3351 * a current move. 3352 * MOVED - a reader on another CPU moved the next 3353 * pointer to its reader page. Give up 3354 * and try again. 3355 */ 3356 3357 switch (type) { 3358 case RB_PAGE_HEAD: 3359 /* 3360 * We changed the head to UPDATE, thus 3361 * it is our responsibility to update 3362 * the counters. 3363 */ 3364 local_add(entries, &cpu_buffer->overrun); 3365 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3366 local_inc(&cpu_buffer->pages_lost); 3367 3368 if (cpu_buffer->ring_meta) 3369 rb_update_meta_head(cpu_buffer, next_page); 3370 /* 3371 * The entries will be zeroed out when we move the 3372 * tail page. 3373 */ 3374 3375 /* still more to do */ 3376 break; 3377 3378 case RB_PAGE_UPDATE: 3379 /* 3380 * This is an interrupt that interrupt the 3381 * previous update. Still more to do. 3382 */ 3383 break; 3384 case RB_PAGE_NORMAL: 3385 /* 3386 * An interrupt came in before the update 3387 * and processed this for us. 3388 * Nothing left to do. 3389 */ 3390 return 1; 3391 case RB_PAGE_MOVED: 3392 /* 3393 * The reader is on another CPU and just did 3394 * a swap with our next_page. 3395 * Try again. 3396 */ 3397 return 1; 3398 default: 3399 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3400 return -1; 3401 } 3402 3403 /* 3404 * Now that we are here, the old head pointer is 3405 * set to UPDATE. This will keep the reader from 3406 * swapping the head page with the reader page. 3407 * The reader (on another CPU) will spin till 3408 * we are finished. 3409 * 3410 * We just need to protect against interrupts 3411 * doing the job. We will set the next pointer 3412 * to HEAD. After that, we set the old pointer 3413 * to NORMAL, but only if it was HEAD before. 3414 * otherwise we are an interrupt, and only 3415 * want the outer most commit to reset it. 
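*
* In short, for the outermost commit the flag transitions are:
*
*	old head pointer: HEAD   -> UPDATE	(done above)
*	new head pointer: NORMAL -> HEAD	(set just below)
*	old head pointer: UPDATE -> NORMAL	(reset at the end)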
3416 */ 3417 new_head = next_page; 3418 rb_inc_page(&new_head); 3419 3420 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3421 RB_PAGE_NORMAL); 3422 3423 /* 3424 * Valid returns are: 3425 * HEAD - an interrupt came in and already set it. 3426 * NORMAL - One of two things: 3427 * 1) We really set it. 3428 * 2) A bunch of interrupts came in and moved 3429 * the page forward again. 3430 */ 3431 switch (ret) { 3432 case RB_PAGE_HEAD: 3433 case RB_PAGE_NORMAL: 3434 /* OK */ 3435 break; 3436 default: 3437 RB_WARN_ON(cpu_buffer, 1); 3438 return -1; 3439 } 3440 3441 /* 3442 * It is possible that an interrupt came in, 3443 * set the head up, then more interrupts came in 3444 * and moved it again. When we get back here, 3445 * the page would have been set to NORMAL but we 3446 * just set it back to HEAD. 3447 * 3448 * How do you detect this? Well, if that happened 3449 * the tail page would have moved. 3450 */ 3451 if (ret == RB_PAGE_NORMAL) { 3452 struct buffer_page *buffer_tail_page; 3453 3454 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3455 /* 3456 * If the tail had moved passed next, then we need 3457 * to reset the pointer. 3458 */ 3459 if (buffer_tail_page != tail_page && 3460 buffer_tail_page != next_page) 3461 rb_head_page_set_normal(cpu_buffer, new_head, 3462 next_page, 3463 RB_PAGE_HEAD); 3464 } 3465 3466 /* 3467 * If this was the outer most commit (the one that 3468 * changed the original pointer from HEAD to UPDATE), 3469 * then it is up to us to reset it to NORMAL. 3470 */ 3471 if (type == RB_PAGE_HEAD) { 3472 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3473 tail_page, 3474 RB_PAGE_UPDATE); 3475 if (RB_WARN_ON(cpu_buffer, 3476 ret != RB_PAGE_UPDATE)) 3477 return -1; 3478 } 3479 3480 return 0; 3481 } 3482 3483 static inline void 3484 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3485 unsigned long tail, struct rb_event_info *info) 3486 { 3487 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3488 struct buffer_page *tail_page = info->tail_page; 3489 struct ring_buffer_event *event; 3490 unsigned long length = info->length; 3491 3492 /* 3493 * Only the event that crossed the page boundary 3494 * must fill the old tail_page with padding. 3495 */ 3496 if (tail >= bsize) { 3497 /* 3498 * If the page was filled, then we still need 3499 * to update the real_end. Reset it to zero 3500 * and the reader will ignore it. 3501 */ 3502 if (tail == bsize) 3503 tail_page->real_end = 0; 3504 3505 local_sub(length, &tail_page->write); 3506 return; 3507 } 3508 3509 event = __rb_page_index(tail_page, tail); 3510 3511 /* 3512 * Save the original length to the meta data. 3513 * This will be used by the reader to add lost event 3514 * counter. 3515 */ 3516 tail_page->real_end = tail; 3517 3518 /* 3519 * If this event is bigger than the minimum size, then 3520 * we need to be careful that we don't subtract the 3521 * write counter enough to allow another writer to slip 3522 * in on this page. 3523 * We put in a discarded commit instead, to make sure 3524 * that this space is not used again, and this space will 3525 * not be accounted into 'entries_bytes'. 3526 * 3527 * If we are less than the minimum size, we don't need to 3528 * worry about it. 
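*
* The discarded commit written further below is simply:
*
*	type_len   = RINGBUF_TYPE_PADDING
*	time_delta = 1		(must be non zero)
*	array[0]   = bytes left on the page minus the event header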
3529 */ 3530 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3531 /* No room for any events */ 3532 3533 /* Mark the rest of the page with padding */ 3534 rb_event_set_padding(event); 3535 3536 /* Make sure the padding is visible before the write update */ 3537 smp_wmb(); 3538 3539 /* Set the write back to the previous setting */ 3540 local_sub(length, &tail_page->write); 3541 return; 3542 } 3543 3544 /* Put in a discarded event */ 3545 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3546 event->type_len = RINGBUF_TYPE_PADDING; 3547 /* time delta must be non zero */ 3548 event->time_delta = 1; 3549 3550 /* account for padding bytes */ 3551 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3552 3553 /* Make sure the padding is visible before the tail_page->write update */ 3554 smp_wmb(); 3555 3556 /* Set write to end of buffer */ 3557 length = (tail + length) - bsize; 3558 local_sub(length, &tail_page->write); 3559 } 3560 3561 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3562 3563 /* 3564 * This is the slow path, force gcc not to inline it. 3565 */ 3566 static noinline struct ring_buffer_event * 3567 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3568 unsigned long tail, struct rb_event_info *info) 3569 { 3570 struct buffer_page *tail_page = info->tail_page; 3571 struct buffer_page *commit_page = cpu_buffer->commit_page; 3572 struct trace_buffer *buffer = cpu_buffer->buffer; 3573 struct buffer_page *next_page; 3574 int ret; 3575 3576 next_page = tail_page; 3577 3578 rb_inc_page(&next_page); 3579 3580 /* 3581 * If for some reason, we had an interrupt storm that made 3582 * it all the way around the buffer, bail, and warn 3583 * about it. 3584 */ 3585 if (unlikely(next_page == commit_page)) { 3586 local_inc(&cpu_buffer->commit_overrun); 3587 goto out_reset; 3588 } 3589 3590 /* 3591 * This is where the fun begins! 3592 * 3593 * We are fighting against races between a reader that 3594 * could be on another CPU trying to swap its reader 3595 * page with the buffer head. 3596 * 3597 * We are also fighting against interrupts coming in and 3598 * moving the head or tail on us as well. 3599 * 3600 * If the next page is the head page then we have filled 3601 * the buffer, unless the commit page is still on the 3602 * reader page. 3603 */ 3604 if (rb_is_head_page(next_page, &tail_page->list)) { 3605 3606 /* 3607 * If the commit is not on the reader page, then 3608 * move the header page. 3609 */ 3610 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3611 /* 3612 * If we are not in overwrite mode, 3613 * this is easy, just stop here. 3614 */ 3615 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3616 local_inc(&cpu_buffer->dropped_events); 3617 goto out_reset; 3618 } 3619 3620 ret = rb_handle_head_page(cpu_buffer, 3621 tail_page, 3622 next_page); 3623 if (ret < 0) 3624 goto out_reset; 3625 if (ret) 3626 goto out_again; 3627 } else { 3628 /* 3629 * We need to be careful here too. The 3630 * commit page could still be on the reader 3631 * page. We could have a small buffer, and 3632 * have filled up the buffer with events 3633 * from interrupts and such, and wrapped. 3634 * 3635 * Note, if the tail page is also on the 3636 * reader_page, we let it move out. 
3637 */ 3638 if (unlikely((cpu_buffer->commit_page != 3639 cpu_buffer->tail_page) && 3640 (cpu_buffer->commit_page == 3641 cpu_buffer->reader_page))) { 3642 local_inc(&cpu_buffer->commit_overrun); 3643 goto out_reset; 3644 } 3645 } 3646 } 3647 3648 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3649 3650 out_again: 3651 3652 rb_reset_tail(cpu_buffer, tail, info); 3653 3654 /* Commit what we have for now. */ 3655 rb_end_commit(cpu_buffer); 3656 /* rb_end_commit() decs committing */ 3657 local_inc(&cpu_buffer->committing); 3658 3659 /* fail and let the caller try again */ 3660 return ERR_PTR(-EAGAIN); 3661 3662 out_reset: 3663 /* reset write */ 3664 rb_reset_tail(cpu_buffer, tail, info); 3665 3666 return NULL; 3667 } 3668 3669 /* Slow path */ 3670 static struct ring_buffer_event * 3671 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3672 struct ring_buffer_event *event, u64 delta, bool abs) 3673 { 3674 if (abs) 3675 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3676 else 3677 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3678 3679 /* Not the first event on the page, or not delta? */ 3680 if (abs || rb_event_index(cpu_buffer, event)) { 3681 event->time_delta = delta & TS_MASK; 3682 event->array[0] = delta >> TS_SHIFT; 3683 } else { 3684 /* nope, just zero it */ 3685 event->time_delta = 0; 3686 event->array[0] = 0; 3687 } 3688 3689 return skip_time_extend(event); 3690 } 3691 3692 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3693 static inline bool sched_clock_stable(void) 3694 { 3695 return true; 3696 } 3697 #endif 3698 3699 static void 3700 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3701 struct rb_event_info *info) 3702 { 3703 u64 write_stamp; 3704 3705 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3706 (unsigned long long)info->delta, 3707 (unsigned long long)info->ts, 3708 (unsigned long long)info->before, 3709 (unsigned long long)info->after, 3710 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3711 sched_clock_stable() ? "" : 3712 "If you just came from a suspend/resume,\n" 3713 "please switch to the trace global clock:\n" 3714 " echo global > /sys/kernel/tracing/trace_clock\n" 3715 "or add trace_clock=global to the kernel command line\n"); 3716 } 3717 3718 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3719 struct ring_buffer_event **event, 3720 struct rb_event_info *info, 3721 u64 *delta, 3722 unsigned int *length) 3723 { 3724 bool abs = info->add_timestamp & 3725 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3726 3727 if (unlikely(info->delta > (1ULL << 59))) { 3728 /* 3729 * Some timers can use more than 59 bits, and when a timestamp 3730 * is added to the buffer, it will lose those bits. 3731 */ 3732 if (abs && (info->ts & TS_MSB)) { 3733 info->delta &= ABS_TS_MASK; 3734 3735 /* did the clock go backwards */ 3736 } else if (info->before == info->after && info->before > info->ts) { 3737 /* not interrupted */ 3738 static int once; 3739 3740 /* 3741 * This is possible with a recalibrating of the TSC. 3742 * Do not produce a call stack, but just report it. 
3743 */ 3744 if (!once) { 3745 once++; 3746 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3747 info->before, info->ts); 3748 } 3749 } else 3750 rb_check_timestamp(cpu_buffer, info); 3751 if (!abs) 3752 info->delta = 0; 3753 } 3754 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3755 *length -= RB_LEN_TIME_EXTEND; 3756 *delta = 0; 3757 } 3758 3759 /** 3760 * rb_update_event - update event type and data 3761 * @cpu_buffer: The per cpu buffer of the @event 3762 * @event: the event to update 3763 * @info: The info to update the @event with (contains length and delta) 3764 * 3765 * Update the type and data fields of the @event. The length 3766 * is the actual size that is written to the ring buffer, 3767 * and with this, we can determine what to place into the 3768 * data field. 3769 */ 3770 static void 3771 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3772 struct ring_buffer_event *event, 3773 struct rb_event_info *info) 3774 { 3775 unsigned length = info->length; 3776 u64 delta = info->delta; 3777 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3778 3779 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3780 cpu_buffer->event_stamp[nest] = info->ts; 3781 3782 /* 3783 * If we need to add a timestamp, then we 3784 * add it to the start of the reserved space. 3785 */ 3786 if (unlikely(info->add_timestamp)) 3787 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3788 3789 event->time_delta = delta; 3790 length -= RB_EVNT_HDR_SIZE; 3791 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3792 event->type_len = 0; 3793 event->array[0] = length; 3794 } else 3795 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3796 } 3797 3798 static unsigned rb_calculate_event_length(unsigned length) 3799 { 3800 struct ring_buffer_event event; /* Used only for sizeof array */ 3801 3802 /* zero length can cause confusions */ 3803 if (!length) 3804 length++; 3805 3806 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3807 length += sizeof(event.array[0]); 3808 3809 length += RB_EVNT_HDR_SIZE; 3810 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3811 3812 /* 3813 * In case the time delta is larger than the 27 bits for it 3814 * in the header, we need to add a timestamp. If another 3815 * event comes in when trying to discard this one to increase 3816 * the length, then the timestamp will be added in the allocated 3817 * space of this event. If length is bigger than the size needed 3818 * for the TIME_EXTEND, then padding has to be used. The events 3819 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3820 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3821 * As length is a multiple of 4, we only need to worry if it 3822 * is 12 (RB_LEN_TIME_EXTEND + 4). 
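*
* For example (with 4 byte alignment), a 7 byte payload becomes
* 7 + 4 (header) = 11, aligned up to 12. A 12 byte reservation can
* neither be exactly a TIME_EXTEND (8) nor hold a TIME_EXTEND plus a
* minimal 8 byte padding event, so it is bumped to 16.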
*/
3824 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
3825 length += RB_ALIGNMENT;
3826
3827 return length;
3828 }
3829
3830 static inline bool
3831 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
3832 struct ring_buffer_event *event)
3833 {
3834 unsigned long new_index, old_index;
3835 struct buffer_page *bpage;
3836 unsigned long addr;
3837
3838 new_index = rb_event_index(cpu_buffer, event);
3839 old_index = new_index + rb_event_ts_length(event);
3840 addr = (unsigned long)event;
3841 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
3842
3843 bpage = READ_ONCE(cpu_buffer->tail_page);
3844
3845 /*
3846 * Make sure the tail_page is still the same and
3847 * the next write location is the end of this event
3848 */
3849 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
3850 unsigned long write_mask =
3851 local_read(&bpage->write) & ~RB_WRITE_MASK;
3852 unsigned long event_length = rb_event_length(event);
3853
3854 /*
3855 * Make the before_stamp different than the write_stamp
3856 * so that the next event adds an absolute
3857 * value and does not rely on the saved write stamp, which
3858 * is now going to be bogus.
3859 *
3860 * By setting the before_stamp to zero, the next event
3861 * is not going to use the write_stamp and will instead
3862 * create an absolute timestamp. This means there's no
3863 * reason to update the write_stamp!
3864 */
3865 rb_time_set(&cpu_buffer->before_stamp, 0);
3866
3867 /*
3868 * If an event were to come in now, it would see that the
3869 * write_stamp and the before_stamp are different, and assume
3870 * that this event just added itself before updating
3871 * the write stamp. The interrupting event will fix the
3872 * write stamp for us, and use an absolute timestamp.
3873 */
3874
3875 /*
3876 * This is on the tail page. It is possible that
3877 * a write could come in and move the tail page
3878 * and write to the next page. That is fine
3879 * because we just shorten what is on this page.
3880 */
3881 old_index += write_mask;
3882 new_index += write_mask;
3883
3884 /* caution: old_index gets updated on cmpxchg failure */
3885 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
3886 /* update counters */
3887 local_sub(event_length, &cpu_buffer->entries_bytes);
3888 return true;
3889 }
3890 }
3891
3892 /* could not discard */
3893 return false;
3894 }
3895
3896 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
3897 {
3898 local_inc(&cpu_buffer->committing);
3899 local_inc(&cpu_buffer->commits);
3900 }
3901
3902 static __always_inline void
3903 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
3904 {
3905 unsigned long max_count;
3906
3907 /*
3908 * We only race with interrupts and NMIs on this CPU.
3909 * If we own the commit event, then we can commit
3910 * all others that interrupted us, since the interruptions
3911 * are in stack format (they finish before they come
3912 * back to us). This allows us to do a simple loop to
3913 * assign the commit to the tail.
3914 */
3915 again:
3916 max_count = cpu_buffer->nr_pages * 100;
3917
3918 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
3919 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
3920 return;
3921 if (RB_WARN_ON(cpu_buffer,
3922 rb_is_reader_page(cpu_buffer->tail_page)))
3923 return;
3924 /*
3925 * No need for a memory barrier here, as the update
3926 * of the tail_page did it for this page.
3927
3927 */ 3928 local_set(&cpu_buffer->commit_page->page->commit, 3929 rb_page_write(cpu_buffer->commit_page)); 3930 rb_inc_page(&cpu_buffer->commit_page); 3931 if (cpu_buffer->ring_meta) { 3932 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3933 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3934 } 3935 /* add barrier to keep gcc from optimizing too much */ 3936 barrier(); 3937 } 3938 while (rb_commit_index(cpu_buffer) != 3939 rb_page_write(cpu_buffer->commit_page)) { 3940 3941 /* Make sure the readers see the content of what is committed. */ 3942 smp_wmb(); 3943 local_set(&cpu_buffer->commit_page->page->commit, 3944 rb_page_write(cpu_buffer->commit_page)); 3945 RB_WARN_ON(cpu_buffer, 3946 local_read(&cpu_buffer->commit_page->page->commit) & 3947 ~RB_WRITE_MASK); 3948 barrier(); 3949 } 3950 3951 /* again, keep gcc from optimizing */ 3952 barrier(); 3953 3954 /* 3955 * If an interrupt came in just after the first while loop 3956 * and pushed the tail page forward, we will be left with 3957 * a dangling commit that will never go forward. 3958 */ 3959 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3960 goto again; 3961 } 3962 3963 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3964 { 3965 unsigned long commits; 3966 3967 if (RB_WARN_ON(cpu_buffer, 3968 !local_read(&cpu_buffer->committing))) 3969 return; 3970 3971 again: 3972 commits = local_read(&cpu_buffer->commits); 3973 /* synchronize with interrupts */ 3974 barrier(); 3975 if (local_read(&cpu_buffer->committing) == 1) 3976 rb_set_commit_to_write(cpu_buffer); 3977 3978 local_dec(&cpu_buffer->committing); 3979 3980 /* synchronize with interrupts */ 3981 barrier(); 3982 3983 /* 3984 * Need to account for interrupts coming in between the 3985 * updating of the commit page and the clearing of the 3986 * committing counter. 
*/
3988 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3989 !local_read(&cpu_buffer->committing)) {
3990 local_inc(&cpu_buffer->committing);
3991 goto again;
3992 }
3993 }
3994
3995 static inline void rb_event_discard(struct ring_buffer_event *event)
3996 {
3997 if (extended_time(event))
3998 event = skip_time_extend(event);
3999
4000 /* array[0] holds the actual length for the discarded event */
4001 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
4002 event->type_len = RINGBUF_TYPE_PADDING;
4003 /* time delta must be non zero */
4004 if (!event->time_delta)
4005 event->time_delta = 1;
4006 }
4007
4008 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
4009 {
4010 local_inc(&cpu_buffer->entries);
4011 rb_end_commit(cpu_buffer);
4012 }
4013
4014 static __always_inline void
4015 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
4016 {
4017 if (buffer->irq_work.waiters_pending) {
4018 buffer->irq_work.waiters_pending = false;
4019 /* irq_work_queue() supplies its own memory barriers */
4020 irq_work_queue(&buffer->irq_work.work);
4021 }
4022
4023 if (cpu_buffer->irq_work.waiters_pending) {
4024 cpu_buffer->irq_work.waiters_pending = false;
4025 /* irq_work_queue() supplies its own memory barriers */
4026 irq_work_queue(&cpu_buffer->irq_work.work);
4027 }
4028
4029 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
4030 return;
4031
4032 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
4033 return;
4034
4035 if (!cpu_buffer->irq_work.full_waiters_pending)
4036 return;
4037
4038 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
4039
4040 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
4041 return;
4042
4043 cpu_buffer->irq_work.wakeup_full = true;
4044 cpu_buffer->irq_work.full_waiters_pending = false;
4045 /* irq_work_queue() supplies its own memory barriers */
4046 irq_work_queue(&cpu_buffer->irq_work.work);
4047 }
4048
4049 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
4050 # define do_ring_buffer_record_recursion() \
4051 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
4052 #else
4053 # define do_ring_buffer_record_recursion() do { } while (0)
4054 #endif
4055
4056 /*
4057 * The lock and unlock are done within a preempt disable section.
4058 * The current_context per_cpu variable can only be modified
4059 * by the current task between lock and unlock. But it can
4060 * be modified more than once via an interrupt. To pass this
4061 * information from the lock to the unlock without having to
4062 * access the 'in_interrupt()' functions again (which do show
4063 * a bit of overhead in something as critical as function tracing),
4064 * we use a bitmask trick.
4065 *
4066 * bit 1 = NMI context
4067 * bit 2 = IRQ context
4068 * bit 3 = SoftIRQ context
4069 * bit 4 = normal context.
4070 *
4071 * This works because this is the order of contexts that can
4072 * preempt other contexts. A SoftIRQ never preempts an IRQ
4073 * context.
4074 *
4075 * When the context is determined, the corresponding bit is
4076 * checked and set (if it was set, then a recursion of that context
4077 * happened).
4078 *
4079 * On unlock, we need to clear this bit. To do so, just subtract
4080 * 1 from the current_context and AND it to itself.
4081
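 *
 * In C terms this is the classic "clear the lowest set bit" idiom,
 * here applied at an offset of cpu_buffer->nest bits:
 *
 *	val &= val - 1;
 *
 * which is what trace_recursive_unlock() below does. For example: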
4081 * 4082 * (binary) 4083 * 101 - 1 = 100 4084 * 101 & 100 = 100 (clearing bit zero) 4085 * 4086 * 1010 - 1 = 1001 4087 * 1010 & 1001 = 1000 (clearing bit 1) 4088 * 4089 * The least significant bit can be cleared this way, and it 4090 * just so happens that it is the same bit corresponding to 4091 * the current context. 4092 * 4093 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 4094 * is set when a recursion is detected at the current context, and if 4095 * the TRANSITION bit is already set, it will fail the recursion. 4096 * This is needed because there's a lag between the changing of 4097 * interrupt context and updating the preempt count. In this case, 4098 * a false positive will be found. To handle this, one extra recursion 4099 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 4100 * bit is already set, then it is considered a recursion and the function 4101 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 4102 * 4103 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 4104 * to be cleared. Even if it wasn't the context that set it. That is, 4105 * if an interrupt comes in while NORMAL bit is set and the ring buffer 4106 * is called before preempt_count() is updated, since the check will 4107 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 4108 * NMI then comes in, it will set the NMI bit, but when the NMI code 4109 * does the trace_recursive_unlock() it will clear the TRANSITION bit 4110 * and leave the NMI bit set. But this is fine, because the interrupt 4111 * code that set the TRANSITION bit will then clear the NMI bit when it 4112 * calls trace_recursive_unlock(). If another NMI comes in, it will 4113 * set the TRANSITION bit and continue. 4114 * 4115 * Note: The TRANSITION bit only handles a single transition between context. 4116 */ 4117 4118 static __always_inline bool 4119 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4120 { 4121 unsigned int val = cpu_buffer->current_context; 4122 int bit = interrupt_context_level(); 4123 4124 bit = RB_CTX_NORMAL - bit; 4125 4126 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4127 /* 4128 * It is possible that this was called by transitioning 4129 * between interrupt context, and preempt_count() has not 4130 * been updated yet. In this case, use the TRANSITION bit. 4131 */ 4132 bit = RB_CTX_TRANSITION; 4133 if (val & (1 << (bit + cpu_buffer->nest))) { 4134 do_ring_buffer_record_recursion(); 4135 return true; 4136 } 4137 } 4138 4139 val |= (1 << (bit + cpu_buffer->nest)); 4140 cpu_buffer->current_context = val; 4141 4142 return false; 4143 } 4144 4145 static __always_inline void 4146 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4147 { 4148 cpu_buffer->current_context &= 4149 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4150 } 4151 4152 /* The recursive locking above uses 5 bits */ 4153 #define NESTED_BITS 5 4154 4155 /** 4156 * ring_buffer_nest_start - Allow to trace while nested 4157 * @buffer: The ring buffer to modify 4158 * 4159 * The ring buffer has a safety mechanism to prevent recursion. 4160 * But there may be a case where a trace needs to be done while 4161 * tracing something else. In this case, calling this function 4162 * will allow this function to nest within a currently active 4163 * ring_buffer_lock_reserve(). 
4164 * 4165 * Call this function before calling another ring_buffer_lock_reserve() and 4166 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 4167 */ 4168 void ring_buffer_nest_start(struct trace_buffer *buffer) 4169 { 4170 struct ring_buffer_per_cpu *cpu_buffer; 4171 int cpu; 4172 4173 /* Enabled by ring_buffer_nest_end() */ 4174 preempt_disable_notrace(); 4175 cpu = raw_smp_processor_id(); 4176 cpu_buffer = buffer->buffers[cpu]; 4177 /* This is the shift value for the above recursive locking */ 4178 cpu_buffer->nest += NESTED_BITS; 4179 } 4180 4181 /** 4182 * ring_buffer_nest_end - Allow to trace while nested 4183 * @buffer: The ring buffer to modify 4184 * 4185 * Must be called after ring_buffer_nest_start() and after the 4186 * ring_buffer_unlock_commit(). 4187 */ 4188 void ring_buffer_nest_end(struct trace_buffer *buffer) 4189 { 4190 struct ring_buffer_per_cpu *cpu_buffer; 4191 int cpu; 4192 4193 /* disabled by ring_buffer_nest_start() */ 4194 cpu = raw_smp_processor_id(); 4195 cpu_buffer = buffer->buffers[cpu]; 4196 /* This is the shift value for the above recursive locking */ 4197 cpu_buffer->nest -= NESTED_BITS; 4198 preempt_enable_notrace(); 4199 } 4200 4201 /** 4202 * ring_buffer_unlock_commit - commit a reserved 4203 * @buffer: The buffer to commit to 4204 * 4205 * This commits the data to the ring buffer, and releases any locks held. 4206 * 4207 * Must be paired with ring_buffer_lock_reserve. 4208 */ 4209 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4210 { 4211 struct ring_buffer_per_cpu *cpu_buffer; 4212 int cpu = raw_smp_processor_id(); 4213 4214 cpu_buffer = buffer->buffers[cpu]; 4215 4216 rb_commit(cpu_buffer); 4217 4218 rb_wakeups(buffer, cpu_buffer); 4219 4220 trace_recursive_unlock(cpu_buffer); 4221 4222 preempt_enable_notrace(); 4223 4224 return 0; 4225 } 4226 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4227 4228 /* Special value to validate all deltas on a page. 
*/ 4229 #define CHECK_FULL_PAGE 1L 4230 4231 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4232 4233 static const char *show_irq_str(int bits) 4234 { 4235 static const char * type[] = { 4236 ".", // 0 4237 "s", // 1 4238 "h", // 2 4239 "Hs", // 3 4240 "n", // 4 4241 "Ns", // 5 4242 "Nh", // 6 4243 "NHs", // 7 4244 }; 4245 4246 return type[bits]; 4247 } 4248 4249 /* Assume this is a trace event */ 4250 static const char *show_flags(struct ring_buffer_event *event) 4251 { 4252 struct trace_entry *entry; 4253 int bits = 0; 4254 4255 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4256 return "X"; 4257 4258 entry = ring_buffer_event_data(event); 4259 4260 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4261 bits |= 1; 4262 4263 if (entry->flags & TRACE_FLAG_HARDIRQ) 4264 bits |= 2; 4265 4266 if (entry->flags & TRACE_FLAG_NMI) 4267 bits |= 4; 4268 4269 return show_irq_str(bits); 4270 } 4271 4272 static const char *show_irq(struct ring_buffer_event *event) 4273 { 4274 struct trace_entry *entry; 4275 4276 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4277 return ""; 4278 4279 entry = ring_buffer_event_data(event); 4280 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4281 return "d"; 4282 return ""; 4283 } 4284 4285 static const char *show_interrupt_level(void) 4286 { 4287 unsigned long pc = preempt_count(); 4288 unsigned char level = 0; 4289 4290 if (pc & SOFTIRQ_OFFSET) 4291 level |= 1; 4292 4293 if (pc & HARDIRQ_MASK) 4294 level |= 2; 4295 4296 if (pc & NMI_MASK) 4297 level |= 4; 4298 4299 return show_irq_str(level); 4300 } 4301 4302 static void dump_buffer_page(struct buffer_data_page *bpage, 4303 struct rb_event_info *info, 4304 unsigned long tail) 4305 { 4306 struct ring_buffer_event *event; 4307 u64 ts, delta; 4308 int e; 4309 4310 ts = bpage->time_stamp; 4311 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4312 4313 for (e = 0; e < tail; e += rb_event_length(event)) { 4314 4315 event = (struct ring_buffer_event *)(bpage->data + e); 4316 4317 switch (event->type_len) { 4318 4319 case RINGBUF_TYPE_TIME_EXTEND: 4320 delta = rb_event_time_stamp(event); 4321 ts += delta; 4322 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4323 e, ts, delta); 4324 break; 4325 4326 case RINGBUF_TYPE_TIME_STAMP: 4327 delta = rb_event_time_stamp(event); 4328 ts = rb_fix_abs_ts(delta, ts); 4329 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4330 e, ts, delta); 4331 break; 4332 4333 case RINGBUF_TYPE_PADDING: 4334 ts += event->time_delta; 4335 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4336 e, ts, event->time_delta); 4337 break; 4338 4339 case RINGBUF_TYPE_DATA: 4340 ts += event->time_delta; 4341 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4342 e, ts, event->time_delta, 4343 show_flags(event), show_irq(event)); 4344 break; 4345 4346 default: 4347 break; 4348 } 4349 } 4350 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4351 } 4352 4353 static DEFINE_PER_CPU(atomic_t, checking); 4354 static atomic_t ts_dump; 4355 4356 #define buffer_warn_return(fmt, ...) 
\ 4357 do { \ 4358 /* If another report is happening, ignore this one */ \ 4359 if (atomic_inc_return(&ts_dump) != 1) { \ 4360 atomic_dec(&ts_dump); \ 4361 goto out; \ 4362 } \ 4363 atomic_inc(&cpu_buffer->record_disabled); \ 4364 pr_warn(fmt, ##__VA_ARGS__); \ 4365 dump_buffer_page(bpage, info, tail); \ 4366 atomic_dec(&ts_dump); \ 4367 /* There's some cases in boot up that this can happen */ \ 4368 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4369 /* Do not re-enable checking */ \ 4370 return; \ 4371 } while (0) 4372 4373 /* 4374 * Check if the current event time stamp matches the deltas on 4375 * the buffer page. 4376 */ 4377 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4378 struct rb_event_info *info, 4379 unsigned long tail) 4380 { 4381 struct buffer_data_page *bpage; 4382 u64 ts, delta; 4383 bool full = false; 4384 int ret; 4385 4386 bpage = info->tail_page->page; 4387 4388 if (tail == CHECK_FULL_PAGE) { 4389 full = true; 4390 tail = local_read(&bpage->commit); 4391 } else if (info->add_timestamp & 4392 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4393 /* Ignore events with absolute time stamps */ 4394 return; 4395 } 4396 4397 /* 4398 * Do not check the first event (skip possible extends too). 4399 * Also do not check if previous events have not been committed. 4400 */ 4401 if (tail <= 8 || tail > local_read(&bpage->commit)) 4402 return; 4403 4404 /* 4405 * If this interrupted another event, 4406 */ 4407 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4408 goto out; 4409 4410 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4411 if (ret < 0) { 4412 if (delta < ts) { 4413 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4414 cpu_buffer->cpu, ts, delta); 4415 goto out; 4416 } 4417 } 4418 if ((full && ts > info->ts) || 4419 (!full && ts + info->delta != info->ts)) { 4420 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4421 cpu_buffer->cpu, 4422 ts + info->delta, info->ts, info->delta, 4423 info->before, info->after, 4424 full ? " (full)" : "", show_interrupt_level()); 4425 } 4426 out: 4427 atomic_dec(this_cpu_ptr(&checking)); 4428 } 4429 #else 4430 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4431 struct rb_event_info *info, 4432 unsigned long tail) 4433 { 4434 } 4435 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4436 4437 static struct ring_buffer_event * 4438 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4439 struct rb_event_info *info) 4440 { 4441 struct ring_buffer_event *event; 4442 struct buffer_page *tail_page; 4443 unsigned long tail, write, w; 4444 4445 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4446 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4447 4448 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4449 barrier(); 4450 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4451 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4452 barrier(); 4453 info->ts = rb_time_stamp(cpu_buffer->buffer); 4454 4455 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4456 info->delta = info->ts; 4457 } else { 4458 /* 4459 * If interrupting an event time update, we may need an 4460 * absolute timestamp. 4461 * Don't bother if this is the start of a new page (w == 0). 
4462 */ 4463 if (!w) { 4464 /* Use the sub-buffer timestamp */ 4465 info->delta = 0; 4466 } else if (unlikely(info->before != info->after)) { 4467 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 4468 info->length += RB_LEN_TIME_EXTEND; 4469 } else { 4470 info->delta = info->ts - info->after; 4471 if (unlikely(test_time_stamp(info->delta))) { 4472 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 4473 info->length += RB_LEN_TIME_EXTEND; 4474 } 4475 } 4476 } 4477 4478 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 4479 4480 /*C*/ write = local_add_return(info->length, &tail_page->write); 4481 4482 /* set write to only the index of the write */ 4483 write &= RB_WRITE_MASK; 4484 4485 tail = write - info->length; 4486 4487 /* See if we shot pass the end of this buffer page */ 4488 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 4489 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 4490 return rb_move_tail(cpu_buffer, tail, info); 4491 } 4492 4493 if (likely(tail == w)) { 4494 /* Nothing interrupted us between A and C */ 4495 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 4496 /* 4497 * If something came in between C and D, the write stamp 4498 * may now not be in sync. But that's fine as the before_stamp 4499 * will be different and then next event will just be forced 4500 * to use an absolute timestamp. 4501 */ 4502 if (likely(!(info->add_timestamp & 4503 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4504 /* This did not interrupt any time update */ 4505 info->delta = info->ts - info->after; 4506 else 4507 /* Just use full timestamp for interrupting event */ 4508 info->delta = info->ts; 4509 check_buffer(cpu_buffer, info, tail); 4510 } else { 4511 u64 ts; 4512 /* SLOW PATH - Interrupted between A and C */ 4513 4514 /* Save the old before_stamp */ 4515 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4516 4517 /* 4518 * Read a new timestamp and update the before_stamp to make 4519 * the next event after this one force using an absolute 4520 * timestamp. This is in case an interrupt were to come in 4521 * between E and F. 4522 */ 4523 ts = rb_time_stamp(cpu_buffer->buffer); 4524 rb_time_set(&cpu_buffer->before_stamp, ts); 4525 4526 barrier(); 4527 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 4528 barrier(); 4529 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 4530 info->after == info->before && info->after < ts) { 4531 /* 4532 * Nothing came after this event between C and F, it is 4533 * safe to use info->after for the delta as it 4534 * matched info->before and is still valid. 4535 */ 4536 info->delta = ts - info->after; 4537 } else { 4538 /* 4539 * Interrupted between C and F: 4540 * Lost the previous events time stamp. Just set the 4541 * delta to zero, and this will be the same time as 4542 * the event this event interrupted. And the events that 4543 * came after this will still be correct (as they would 4544 * have built their delta on the previous event. 4545 */ 4546 info->delta = 0; 4547 } 4548 info->ts = ts; 4549 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 4550 } 4551 4552 /* 4553 * If this is the first commit on the page, then it has the same 4554 * timestamp as the page itself. 
4555 */ 4556 if (unlikely(!tail && !(info->add_timestamp & 4557 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4558 info->delta = 0; 4559 4560 /* We reserved something on the buffer */ 4561 4562 event = __rb_page_index(tail_page, tail); 4563 rb_update_event(cpu_buffer, event, info); 4564 4565 local_inc(&tail_page->entries); 4566 4567 /* 4568 * If this is the first commit on the page, then update 4569 * its timestamp. 4570 */ 4571 if (unlikely(!tail)) 4572 tail_page->page->time_stamp = info->ts; 4573 4574 /* account for these added bytes */ 4575 local_add(info->length, &cpu_buffer->entries_bytes); 4576 4577 return event; 4578 } 4579 4580 static __always_inline struct ring_buffer_event * 4581 rb_reserve_next_event(struct trace_buffer *buffer, 4582 struct ring_buffer_per_cpu *cpu_buffer, 4583 unsigned long length) 4584 { 4585 struct ring_buffer_event *event; 4586 struct rb_event_info info; 4587 int nr_loops = 0; 4588 int add_ts_default; 4589 4590 /* 4591 * ring buffer does cmpxchg as well as atomic64 operations 4592 * (which some archs use locking for atomic64), make sure this 4593 * is safe in NMI context 4594 */ 4595 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4596 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4597 (unlikely(in_nmi()))) { 4598 return NULL; 4599 } 4600 4601 rb_start_commit(cpu_buffer); 4602 /* The commit page can not change after this */ 4603 4604 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4605 /* 4606 * Due to the ability to swap a cpu buffer from a buffer 4607 * it is possible it was swapped before we committed. 4608 * (committing stops a swap). We check for it here and 4609 * if it happened, we have to fail the write. 4610 */ 4611 barrier(); 4612 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4613 local_dec(&cpu_buffer->committing); 4614 local_dec(&cpu_buffer->commits); 4615 return NULL; 4616 } 4617 #endif 4618 4619 info.length = rb_calculate_event_length(length); 4620 4621 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4622 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4623 info.length += RB_LEN_TIME_EXTEND; 4624 if (info.length > cpu_buffer->buffer->max_data_size) 4625 goto out_fail; 4626 } else { 4627 add_ts_default = RB_ADD_STAMP_NONE; 4628 } 4629 4630 again: 4631 info.add_timestamp = add_ts_default; 4632 info.delta = 0; 4633 4634 /* 4635 * We allow for interrupts to reenter here and do a trace. 4636 * If one does, it will cause this original code to loop 4637 * back here. Even with heavy interrupts happening, this 4638 * should only happen a few times in a row. If this happens 4639 * 1000 times in a row, there must be either an interrupt 4640 * storm or we have something buggy. 4641 * Bail! 4642 */ 4643 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4644 goto out_fail; 4645 4646 event = __rb_reserve_next(cpu_buffer, &info); 4647 4648 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4649 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4650 info.length -= RB_LEN_TIME_EXTEND; 4651 goto again; 4652 } 4653 4654 if (likely(event)) 4655 return event; 4656 out_fail: 4657 rb_end_commit(cpu_buffer); 4658 return NULL; 4659 } 4660 4661 /** 4662 * ring_buffer_lock_reserve - reserve a part of the buffer 4663 * @buffer: the ring buffer to reserve from 4664 * @length: the length of the data to reserve (excluding event header) 4665 * 4666 * Returns a reserved event on the ring buffer to copy directly to. 4667 * The user of this interface will need to get the body to write into 4668 * and can use the ring_buffer_event_data() interface. 
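 *
 * A typical reserve/commit cycle looks roughly like this (sketch
 * only; struct my_entry and its fields are hypothetical):
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = value;
 *	ring_buffer_unlock_commit(buffer);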
4669 * 4670 * The length is the length of the data needed, not the event length 4671 * which also includes the event header. 4672 * 4673 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4674 * If NULL is returned, then nothing has been allocated or locked. 4675 */ 4676 struct ring_buffer_event * 4677 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4678 { 4679 struct ring_buffer_per_cpu *cpu_buffer; 4680 struct ring_buffer_event *event; 4681 int cpu; 4682 4683 /* If we are tracing schedule, we don't want to recurse */ 4684 preempt_disable_notrace(); 4685 4686 if (unlikely(atomic_read(&buffer->record_disabled))) 4687 goto out; 4688 4689 cpu = raw_smp_processor_id(); 4690 4691 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4692 goto out; 4693 4694 cpu_buffer = buffer->buffers[cpu]; 4695 4696 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4697 goto out; 4698 4699 if (unlikely(length > buffer->max_data_size)) 4700 goto out; 4701 4702 if (unlikely(trace_recursive_lock(cpu_buffer))) 4703 goto out; 4704 4705 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4706 if (!event) 4707 goto out_unlock; 4708 4709 return event; 4710 4711 out_unlock: 4712 trace_recursive_unlock(cpu_buffer); 4713 out: 4714 preempt_enable_notrace(); 4715 return NULL; 4716 } 4717 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4718 4719 /* 4720 * Decrement the entries to the page that an event is on. 4721 * The event does not even need to exist, only the pointer 4722 * to the page it is on. This may only be called before the commit 4723 * takes place. 4724 */ 4725 static inline void 4726 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4727 struct ring_buffer_event *event) 4728 { 4729 unsigned long addr = (unsigned long)event; 4730 struct buffer_page *bpage = cpu_buffer->commit_page; 4731 struct buffer_page *start; 4732 4733 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4734 4735 /* Do the likely case first */ 4736 if (likely(bpage->page == (void *)addr)) { 4737 local_dec(&bpage->entries); 4738 return; 4739 } 4740 4741 /* 4742 * Because the commit page may be on the reader page we 4743 * start with the next page and check the end loop there. 4744 */ 4745 rb_inc_page(&bpage); 4746 start = bpage; 4747 do { 4748 if (bpage->page == (void *)addr) { 4749 local_dec(&bpage->entries); 4750 return; 4751 } 4752 rb_inc_page(&bpage); 4753 } while (bpage != start); 4754 4755 /* commit not part of this buffer?? */ 4756 RB_WARN_ON(cpu_buffer, 1); 4757 } 4758 4759 /** 4760 * ring_buffer_discard_commit - discard an event that has not been committed 4761 * @buffer: the ring buffer 4762 * @event: non committed event to discard 4763 * 4764 * Sometimes an event that is in the ring buffer needs to be ignored. 4765 * This function lets the user discard an event in the ring buffer 4766 * and then that event will not be read later. 4767 * 4768 * This function only works if it is called before the item has been 4769 * committed. It will try to free the event from the ring buffer 4770 * if another event has not been added behind it. 4771 * 4772 * If another event has been added behind it, it will set the event 4773 * up as discarded, and perform the commit. 4774 * 4775 * If this function is called, do not call ring_buffer_unlock_commit on 4776 * the event. 
4777 */ 4778 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4779 struct ring_buffer_event *event) 4780 { 4781 struct ring_buffer_per_cpu *cpu_buffer; 4782 int cpu; 4783 4784 /* The event is discarded regardless */ 4785 rb_event_discard(event); 4786 4787 cpu = smp_processor_id(); 4788 cpu_buffer = buffer->buffers[cpu]; 4789 4790 /* 4791 * This must only be called if the event has not been 4792 * committed yet. Thus we can assume that preemption 4793 * is still disabled. 4794 */ 4795 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4796 4797 rb_decrement_entry(cpu_buffer, event); 4798 rb_try_to_discard(cpu_buffer, event); 4799 rb_end_commit(cpu_buffer); 4800 4801 trace_recursive_unlock(cpu_buffer); 4802 4803 preempt_enable_notrace(); 4804 4805 } 4806 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4807 4808 /** 4809 * ring_buffer_write - write data to the buffer without reserving 4810 * @buffer: The ring buffer to write to. 4811 * @length: The length of the data being written (excluding the event header) 4812 * @data: The data to write to the buffer. 4813 * 4814 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4815 * one function. If you already have the data to write to the buffer, it 4816 * may be easier to simply call this function. 4817 * 4818 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4819 * and not the length of the event which would hold the header. 4820 */ 4821 int ring_buffer_write(struct trace_buffer *buffer, 4822 unsigned long length, 4823 void *data) 4824 { 4825 struct ring_buffer_per_cpu *cpu_buffer; 4826 struct ring_buffer_event *event; 4827 void *body; 4828 int ret = -EBUSY; 4829 int cpu; 4830 4831 guard(preempt_notrace)(); 4832 4833 if (atomic_read(&buffer->record_disabled)) 4834 return -EBUSY; 4835 4836 cpu = raw_smp_processor_id(); 4837 4838 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4839 return -EBUSY; 4840 4841 cpu_buffer = buffer->buffers[cpu]; 4842 4843 if (atomic_read(&cpu_buffer->record_disabled)) 4844 return -EBUSY; 4845 4846 if (length > buffer->max_data_size) 4847 return -EBUSY; 4848 4849 if (unlikely(trace_recursive_lock(cpu_buffer))) 4850 return -EBUSY; 4851 4852 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4853 if (!event) 4854 goto out_unlock; 4855 4856 body = rb_event_data(event); 4857 4858 memcpy(body, data, length); 4859 4860 rb_commit(cpu_buffer); 4861 4862 rb_wakeups(buffer, cpu_buffer); 4863 4864 ret = 0; 4865 4866 out_unlock: 4867 trace_recursive_unlock(cpu_buffer); 4868 return ret; 4869 } 4870 EXPORT_SYMBOL_GPL(ring_buffer_write); 4871 4872 /* 4873 * The total entries in the ring buffer is the running counter 4874 * of entries entered into the ring buffer, minus the sum of 4875 * the entries read from the ring buffer and the number of 4876 * entries that were overwritten. 4877 */ 4878 static inline unsigned long 4879 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4880 { 4881 return local_read(&cpu_buffer->entries) - 4882 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4883 } 4884 4885 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4886 { 4887 return !rb_num_of_entries(cpu_buffer); 4888 } 4889 4890 /** 4891 * ring_buffer_record_disable - stop all writes into the buffer 4892 * @buffer: The ring buffer to stop writes to. 4893 * 4894 * This prevents all writes to the buffer. Any attempt to write 4895 * to the buffer after this will fail and return NULL. 4896 * 4897 * The caller should call synchronize_rcu() after this. 
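 *
 * For example, to get a quiescent view of the buffer (sketch only):
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_rcu();
 *	... read or inspect the buffer ...
 *	ring_buffer_record_enable(buffer);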
*/
4899 void ring_buffer_record_disable(struct trace_buffer *buffer)
4900 {
4901 atomic_inc(&buffer->record_disabled);
4902 }
4903 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
4904
4905 /**
4906 * ring_buffer_record_enable - enable writes to the buffer
4907 * @buffer: The ring buffer to enable writes
4908 *
4909 * Note, multiple disables will need the same number of enables
4910 * to truly enable the writing (much like preempt_disable).
4911 */
4912 void ring_buffer_record_enable(struct trace_buffer *buffer)
4913 {
4914 atomic_dec(&buffer->record_disabled);
4915 }
4916 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
4917
4918 /**
4919 * ring_buffer_record_off - stop all writes into the buffer
4920 * @buffer: The ring buffer to stop writes to.
4921 *
4922 * This prevents all writes to the buffer. Any attempt to write
4923 * to the buffer after this will fail and return NULL.
4924 *
4925 * This is different than ring_buffer_record_disable() as
4926 * it works like an on/off switch, whereas the disable() version
4927 * must be paired with an enable().
4928 */
4929 void ring_buffer_record_off(struct trace_buffer *buffer)
4930 {
4931 unsigned int rd;
4932 unsigned int new_rd;
4933
4934 rd = atomic_read(&buffer->record_disabled);
4935 do {
4936 new_rd = rd | RB_BUFFER_OFF;
4937 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4938 }
4939 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
4940
4941 /**
4942 * ring_buffer_record_on - restart writes into the buffer
4943 * @buffer: The ring buffer to start writes to.
4944 *
4945 * This enables all writes to the buffer that were disabled by
4946 * ring_buffer_record_off().
4947 *
4948 * This is different than ring_buffer_record_enable() as
4949 * it works like an on/off switch, whereas the enable() version
4950 * must be paired with a disable().
4951 */
4952 void ring_buffer_record_on(struct trace_buffer *buffer)
4953 {
4954 unsigned int rd;
4955 unsigned int new_rd;
4956
4957 rd = atomic_read(&buffer->record_disabled);
4958 do {
4959 new_rd = rd & ~RB_BUFFER_OFF;
4960 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4961 }
4962 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
4963
4964 /**
4965 * ring_buffer_record_is_on - return true if the ring buffer can write
4966 * @buffer: The ring buffer to see if write is enabled
4967 *
4968 * Returns true if the ring buffer is in a state that it accepts writes.
4969 */
4970 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4971 {
4972 return !atomic_read(&buffer->record_disabled);
4973 }
4974
4975 /**
4976 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4977 * @buffer: The ring buffer to see if write is set enabled
4978 *
4979 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4980 * Note that this does NOT mean it is in a writable state.
4981 *
4982 * It may return true when the ring buffer has been disabled by
4983 * ring_buffer_record_disable(), as that is a temporary disabling of
4984 * the ring buffer.
4985 */
4986 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
4987 {
4988 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4989 }
4990
4991 /**
4992 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write
4993 * @buffer: The ring buffer to see if write is enabled
4994 * @cpu: The CPU to test if the ring buffer can write to
4995 *
4996 * Returns true if the ring buffer is in a state that it accepts writes
4997 * for a particular CPU.
4998
4998 */ 4999 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 5000 { 5001 struct ring_buffer_per_cpu *cpu_buffer; 5002 5003 cpu_buffer = buffer->buffers[cpu]; 5004 5005 return ring_buffer_record_is_set_on(buffer) && 5006 !atomic_read(&cpu_buffer->record_disabled); 5007 } 5008 5009 /** 5010 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 5011 * @buffer: The ring buffer to stop writes to. 5012 * @cpu: The CPU buffer to stop 5013 * 5014 * This prevents all writes to the buffer. Any attempt to write 5015 * to the buffer after this will fail and return NULL. 5016 * 5017 * The caller should call synchronize_rcu() after this. 5018 */ 5019 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 5020 { 5021 struct ring_buffer_per_cpu *cpu_buffer; 5022 5023 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5024 return; 5025 5026 cpu_buffer = buffer->buffers[cpu]; 5027 atomic_inc(&cpu_buffer->record_disabled); 5028 } 5029 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 5030 5031 /** 5032 * ring_buffer_record_enable_cpu - enable writes to the buffer 5033 * @buffer: The ring buffer to enable writes 5034 * @cpu: The CPU to enable. 5035 * 5036 * Note, multiple disables will need the same number of enables 5037 * to truly enable the writing (much like preempt_disable). 5038 */ 5039 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 5040 { 5041 struct ring_buffer_per_cpu *cpu_buffer; 5042 5043 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5044 return; 5045 5046 cpu_buffer = buffer->buffers[cpu]; 5047 atomic_dec(&cpu_buffer->record_disabled); 5048 } 5049 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 5050 5051 /** 5052 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 5053 * @buffer: The ring buffer 5054 * @cpu: The per CPU buffer to read from. 5055 */ 5056 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 5057 { 5058 unsigned long flags; 5059 struct ring_buffer_per_cpu *cpu_buffer; 5060 struct buffer_page *bpage; 5061 u64 ret = 0; 5062 5063 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5064 return 0; 5065 5066 cpu_buffer = buffer->buffers[cpu]; 5067 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5068 /* 5069 * if the tail is on reader_page, oldest time stamp is on the reader 5070 * page 5071 */ 5072 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 5073 bpage = cpu_buffer->reader_page; 5074 else 5075 bpage = rb_set_head_page(cpu_buffer); 5076 if (bpage) 5077 ret = bpage->page->time_stamp; 5078 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5079 5080 return ret; 5081 } 5082 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 5083 5084 /** 5085 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 5086 * @buffer: The ring buffer 5087 * @cpu: The per CPU buffer to read from. 5088 */ 5089 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 5090 { 5091 struct ring_buffer_per_cpu *cpu_buffer; 5092 unsigned long ret; 5093 5094 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5095 return 0; 5096 5097 cpu_buffer = buffer->buffers[cpu]; 5098 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 5099 5100 return ret; 5101 } 5102 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 5103 5104 /** 5105 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 5106 * @buffer: The ring buffer 5107 * @cpu: The per CPU buffer to get the entries from. 
5108 */ 5109 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5110 { 5111 struct ring_buffer_per_cpu *cpu_buffer; 5112 5113 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5114 return 0; 5115 5116 cpu_buffer = buffer->buffers[cpu]; 5117 5118 return rb_num_of_entries(cpu_buffer); 5119 } 5120 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5121 5122 /** 5123 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5124 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5125 * @buffer: The ring buffer 5126 * @cpu: The per CPU buffer to get the number of overruns from 5127 */ 5128 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5129 { 5130 struct ring_buffer_per_cpu *cpu_buffer; 5131 unsigned long ret; 5132 5133 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5134 return 0; 5135 5136 cpu_buffer = buffer->buffers[cpu]; 5137 ret = local_read(&cpu_buffer->overrun); 5138 5139 return ret; 5140 } 5141 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5142 5143 /** 5144 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5145 * commits failing due to the buffer wrapping around while there are uncommitted 5146 * events, such as during an interrupt storm. 5147 * @buffer: The ring buffer 5148 * @cpu: The per CPU buffer to get the number of overruns from 5149 */ 5150 unsigned long 5151 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5152 { 5153 struct ring_buffer_per_cpu *cpu_buffer; 5154 unsigned long ret; 5155 5156 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5157 return 0; 5158 5159 cpu_buffer = buffer->buffers[cpu]; 5160 ret = local_read(&cpu_buffer->commit_overrun); 5161 5162 return ret; 5163 } 5164 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5165 5166 /** 5167 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5168 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
5169 * @buffer: The ring buffer 5170 * @cpu: The per CPU buffer to get the number of overruns from 5171 */ 5172 unsigned long 5173 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5174 { 5175 struct ring_buffer_per_cpu *cpu_buffer; 5176 unsigned long ret; 5177 5178 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5179 return 0; 5180 5181 cpu_buffer = buffer->buffers[cpu]; 5182 ret = local_read(&cpu_buffer->dropped_events); 5183 5184 return ret; 5185 } 5186 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5187 5188 /** 5189 * ring_buffer_read_events_cpu - get the number of events successfully read 5190 * @buffer: The ring buffer 5191 * @cpu: The per CPU buffer to get the number of events read 5192 */ 5193 unsigned long 5194 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5195 { 5196 struct ring_buffer_per_cpu *cpu_buffer; 5197 5198 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5199 return 0; 5200 5201 cpu_buffer = buffer->buffers[cpu]; 5202 return cpu_buffer->read; 5203 } 5204 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5205 5206 /** 5207 * ring_buffer_entries - get the number of entries in a buffer 5208 * @buffer: The ring buffer 5209 * 5210 * Returns the total number of entries in the ring buffer 5211 * (all CPU entries) 5212 */ 5213 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5214 { 5215 struct ring_buffer_per_cpu *cpu_buffer; 5216 unsigned long entries = 0; 5217 int cpu; 5218 5219 /* if you care about this being correct, lock the buffer */ 5220 for_each_buffer_cpu(buffer, cpu) { 5221 cpu_buffer = buffer->buffers[cpu]; 5222 entries += rb_num_of_entries(cpu_buffer); 5223 } 5224 5225 return entries; 5226 } 5227 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5228 5229 /** 5230 * ring_buffer_overruns - get the number of overruns in buffer 5231 * @buffer: The ring buffer 5232 * 5233 * Returns the total number of overruns in the ring buffer 5234 * (all CPU entries) 5235 */ 5236 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5237 { 5238 struct ring_buffer_per_cpu *cpu_buffer; 5239 unsigned long overruns = 0; 5240 int cpu; 5241 5242 /* if you care about this being correct, lock the buffer */ 5243 for_each_buffer_cpu(buffer, cpu) { 5244 cpu_buffer = buffer->buffers[cpu]; 5245 overruns += local_read(&cpu_buffer->overrun); 5246 } 5247 5248 return overruns; 5249 } 5250 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5251 5252 static void rb_iter_reset(struct ring_buffer_iter *iter) 5253 { 5254 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5255 5256 /* Iterator usage is expected to have record disabled */ 5257 iter->head_page = cpu_buffer->reader_page; 5258 iter->head = cpu_buffer->reader_page->read; 5259 iter->next_event = iter->head; 5260 5261 iter->cache_reader_page = iter->head_page; 5262 iter->cache_read = cpu_buffer->read; 5263 iter->cache_pages_removed = cpu_buffer->pages_removed; 5264 5265 if (iter->head) { 5266 iter->read_stamp = cpu_buffer->read_stamp; 5267 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5268 } else { 5269 iter->read_stamp = iter->head_page->page->time_stamp; 5270 iter->page_stamp = iter->read_stamp; 5271 } 5272 } 5273 5274 /** 5275 * ring_buffer_iter_reset - reset an iterator 5276 * @iter: The iterator to reset 5277 * 5278 * Resets the iterator, so that it will start from the beginning 5279 * again. 
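 *
 * A sketch of a non-consuming walk, assuming @iter was set up
 * elsewhere (e.g. via ring_buffer_read_start(), not shown here):
 *
 *	ring_buffer_iter_reset(iter);
 *	while (!ring_buffer_iter_empty(iter)) {
 *		event = ring_buffer_iter_peek(iter, &ts);
 *		if (!event)
 *			break;
 *		... process the event ...
 *		ring_buffer_iter_advance(iter);
 *	}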
5280 */ 5281 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5282 { 5283 struct ring_buffer_per_cpu *cpu_buffer; 5284 unsigned long flags; 5285 5286 if (!iter) 5287 return; 5288 5289 cpu_buffer = iter->cpu_buffer; 5290 5291 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5292 rb_iter_reset(iter); 5293 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5294 } 5295 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5296 5297 /** 5298 * ring_buffer_iter_empty - check if an iterator has no more to read 5299 * @iter: The iterator to check 5300 */ 5301 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5302 { 5303 struct ring_buffer_per_cpu *cpu_buffer; 5304 struct buffer_page *reader; 5305 struct buffer_page *head_page; 5306 struct buffer_page *commit_page; 5307 struct buffer_page *curr_commit_page; 5308 unsigned commit; 5309 u64 curr_commit_ts; 5310 u64 commit_ts; 5311 5312 cpu_buffer = iter->cpu_buffer; 5313 reader = cpu_buffer->reader_page; 5314 head_page = cpu_buffer->head_page; 5315 commit_page = READ_ONCE(cpu_buffer->commit_page); 5316 commit_ts = commit_page->page->time_stamp; 5317 5318 /* 5319 * When the writer goes across pages, it issues a cmpxchg which 5320 * is a mb(), which will synchronize with the rmb here. 5321 * (see rb_tail_page_update()) 5322 */ 5323 smp_rmb(); 5324 commit = rb_page_commit(commit_page); 5325 /* We want to make sure that the commit page doesn't change */ 5326 smp_rmb(); 5327 5328 /* Make sure commit page didn't change */ 5329 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5330 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5331 5332 /* If the commit page changed, then there's more data */ 5333 if (curr_commit_page != commit_page || 5334 curr_commit_ts != commit_ts) 5335 return 0; 5336 5337 /* Still racy, as it may return a false positive, but that's OK */ 5338 return ((iter->head_page == commit_page && iter->head >= commit) || 5339 (iter->head_page == reader && commit_page == head_page && 5340 head_page->read == commit && 5341 iter->head == rb_page_size(cpu_buffer->reader_page))); 5342 } 5343 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5344 5345 static void 5346 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5347 struct ring_buffer_event *event) 5348 { 5349 u64 delta; 5350 5351 switch (event->type_len) { 5352 case RINGBUF_TYPE_PADDING: 5353 return; 5354 5355 case RINGBUF_TYPE_TIME_EXTEND: 5356 delta = rb_event_time_stamp(event); 5357 cpu_buffer->read_stamp += delta; 5358 return; 5359 5360 case RINGBUF_TYPE_TIME_STAMP: 5361 delta = rb_event_time_stamp(event); 5362 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5363 cpu_buffer->read_stamp = delta; 5364 return; 5365 5366 case RINGBUF_TYPE_DATA: 5367 cpu_buffer->read_stamp += event->time_delta; 5368 return; 5369 5370 default: 5371 RB_WARN_ON(cpu_buffer, 1); 5372 } 5373 } 5374 5375 static void 5376 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5377 struct ring_buffer_event *event) 5378 { 5379 u64 delta; 5380 5381 switch (event->type_len) { 5382 case RINGBUF_TYPE_PADDING: 5383 return; 5384 5385 case RINGBUF_TYPE_TIME_EXTEND: 5386 delta = rb_event_time_stamp(event); 5387 iter->read_stamp += delta; 5388 return; 5389 5390 case RINGBUF_TYPE_TIME_STAMP: 5391 delta = rb_event_time_stamp(event); 5392 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5393 iter->read_stamp = delta; 5394 return; 5395 5396 case RINGBUF_TYPE_DATA: 5397 iter->read_stamp += event->time_delta; 5398 return; 5399 5400 default: 5401 RB_WARN_ON(iter->cpu_buffer, 1); 5402 
} 5403 } 5404 5405 static struct buffer_page * 5406 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5407 { 5408 struct buffer_page *reader = NULL; 5409 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5410 unsigned long overwrite; 5411 unsigned long flags; 5412 int nr_loops = 0; 5413 bool ret; 5414 5415 local_irq_save(flags); 5416 arch_spin_lock(&cpu_buffer->lock); 5417 5418 again: 5419 /* 5420 * This should normally only loop twice. But because the 5421 * start of the reader inserts an empty page, it causes 5422 * a case where we will loop three times. There should be no 5423 * reason to loop four times (that I know of). 5424 */ 5425 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5426 reader = NULL; 5427 goto out; 5428 } 5429 5430 reader = cpu_buffer->reader_page; 5431 5432 /* If there's more to read, return this page */ 5433 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5434 goto out; 5435 5436 /* Never should we have an index greater than the size */ 5437 if (RB_WARN_ON(cpu_buffer, 5438 cpu_buffer->reader_page->read > rb_page_size(reader))) 5439 goto out; 5440 5441 /* check if we caught up to the tail */ 5442 reader = NULL; 5443 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5444 goto out; 5445 5446 /* Don't bother swapping if the ring buffer is empty */ 5447 if (rb_num_of_entries(cpu_buffer) == 0) 5448 goto out; 5449 5450 /* 5451 * Reset the reader page to size zero. 5452 */ 5453 local_set(&cpu_buffer->reader_page->write, 0); 5454 local_set(&cpu_buffer->reader_page->entries, 0); 5455 cpu_buffer->reader_page->real_end = 0; 5456 5457 spin: 5458 /* 5459 * Splice the empty reader page into the list around the head. 5460 */ 5461 reader = rb_set_head_page(cpu_buffer); 5462 if (!reader) 5463 goto out; 5464 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5465 cpu_buffer->reader_page->list.prev = reader->list.prev; 5466 5467 /* 5468 * cpu_buffer->pages just needs to point to the buffer, it 5469 * has no specific buffer page to point to. Lets move it out 5470 * of our way so we don't accidentally swap it. 5471 */ 5472 cpu_buffer->pages = reader->list.prev; 5473 5474 /* The reader page will be pointing to the new head */ 5475 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5476 5477 /* 5478 * We want to make sure we read the overruns after we set up our 5479 * pointers to the next object. The writer side does a 5480 * cmpxchg to cross pages which acts as the mb on the writer 5481 * side. Note, the reader will constantly fail the swap 5482 * while the writer is updating the pointers, so this 5483 * guarantees that the overwrite recorded here is the one we 5484 * want to compare with the last_overrun. 5485 */ 5486 smp_mb(); 5487 overwrite = local_read(&(cpu_buffer->overrun)); 5488 5489 /* 5490 * Here's the tricky part. 5491 * 5492 * We need to move the pointer past the header page. 5493 * But we can only do that if a writer is not currently 5494 * moving it. The page before the header page has the 5495 * flag bit '1' set if it is pointing to the page we want. 5496 * but if the writer is in the process of moving it 5497 * then it will be '2' or already moved '0'. 5498 */ 5499 5500 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5501 5502 /* 5503 * If we did not convert it, then we must try again. 5504 */ 5505 if (!ret) 5506 goto spin; 5507 5508 if (cpu_buffer->ring_meta) 5509 rb_update_meta_reader(cpu_buffer, reader); 5510 5511 /* 5512 * Yay! We succeeded in replacing the page. 
5513 * 5514 * Now make the new head point back to the reader page. 5515 */ 5516 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5517 rb_inc_page(&cpu_buffer->head_page); 5518 5519 cpu_buffer->cnt++; 5520 local_inc(&cpu_buffer->pages_read); 5521 5522 /* Finally update the reader page to the new head */ 5523 cpu_buffer->reader_page = reader; 5524 cpu_buffer->reader_page->read = 0; 5525 5526 if (overwrite != cpu_buffer->last_overrun) { 5527 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5528 cpu_buffer->last_overrun = overwrite; 5529 } 5530 5531 goto again; 5532 5533 out: 5534 /* Update the read_stamp on the first event */ 5535 if (reader && reader->read == 0) 5536 cpu_buffer->read_stamp = reader->page->time_stamp; 5537 5538 arch_spin_unlock(&cpu_buffer->lock); 5539 local_irq_restore(flags); 5540 5541 /* 5542 * The writer has preempt disable, wait for it. But not forever 5543 * Although, 1 second is pretty much "forever" 5544 */ 5545 #define USECS_WAIT 1000000 5546 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5547 /* If the write is past the end of page, a writer is still updating it */ 5548 if (likely(!reader || rb_page_write(reader) <= bsize)) 5549 break; 5550 5551 udelay(1); 5552 5553 /* Get the latest version of the reader write value */ 5554 smp_rmb(); 5555 } 5556 5557 /* The writer is not moving forward? Something is wrong */ 5558 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5559 reader = NULL; 5560 5561 /* 5562 * Make sure we see any padding after the write update 5563 * (see rb_reset_tail()). 5564 * 5565 * In addition, a writer may be writing on the reader page 5566 * if the page has not been fully filled, so the read barrier 5567 * is also needed to make sure we see the content of what is 5568 * committed by the writer (see rb_set_commit_to_write()). 5569 */ 5570 smp_rmb(); 5571 5572 5573 return reader; 5574 } 5575 5576 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5577 { 5578 struct ring_buffer_event *event; 5579 struct buffer_page *reader; 5580 unsigned length; 5581 5582 reader = rb_get_reader_page(cpu_buffer); 5583 5584 /* This function should not be called when buffer is empty */ 5585 if (RB_WARN_ON(cpu_buffer, !reader)) 5586 return; 5587 5588 event = rb_reader_event(cpu_buffer); 5589 5590 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5591 cpu_buffer->read++; 5592 5593 rb_update_read_stamp(cpu_buffer, event); 5594 5595 length = rb_event_length(event); 5596 cpu_buffer->reader_page->read += length; 5597 cpu_buffer->read_bytes += length; 5598 } 5599 5600 static void rb_advance_iter(struct ring_buffer_iter *iter) 5601 { 5602 struct ring_buffer_per_cpu *cpu_buffer; 5603 5604 cpu_buffer = iter->cpu_buffer; 5605 5606 /* If head == next_event then we need to jump to the next event */ 5607 if (iter->head == iter->next_event) { 5608 /* If the event gets overwritten again, there's nothing to do */ 5609 if (rb_iter_head_event(iter) == NULL) 5610 return; 5611 } 5612 5613 iter->head = iter->next_event; 5614 5615 /* 5616 * Check if we are at the end of the buffer. 
5617 */ 5618 if (iter->next_event >= rb_page_size(iter->head_page)) { 5619 /* discarded commits can make the page empty */ 5620 if (iter->head_page == cpu_buffer->commit_page) 5621 return; 5622 rb_inc_iter(iter); 5623 return; 5624 } 5625 5626 rb_update_iter_read_stamp(iter, iter->event); 5627 } 5628 5629 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5630 { 5631 return cpu_buffer->lost_events; 5632 } 5633 5634 static struct ring_buffer_event * 5635 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5636 unsigned long *lost_events) 5637 { 5638 struct ring_buffer_event *event; 5639 struct buffer_page *reader; 5640 int nr_loops = 0; 5641 5642 if (ts) 5643 *ts = 0; 5644 again: 5645 /* 5646 * We repeat when a time extend is encountered. 5647 * Since the time extend is always attached to a data event, 5648 * we should never loop more than once. 5649 * (We never hit the following condition more than twice). 5650 */ 5651 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5652 return NULL; 5653 5654 reader = rb_get_reader_page(cpu_buffer); 5655 if (!reader) 5656 return NULL; 5657 5658 event = rb_reader_event(cpu_buffer); 5659 5660 switch (event->type_len) { 5661 case RINGBUF_TYPE_PADDING: 5662 if (rb_null_event(event)) 5663 RB_WARN_ON(cpu_buffer, 1); 5664 /* 5665 * Because the writer could be discarding every 5666 * event it creates (which would probably be bad) 5667 * if we were to go back to "again" then we may never 5668 * catch up, and will trigger the warn on, or lock 5669 * the box. Return the padding, and we will release 5670 * the current locks, and try again. 5671 */ 5672 return event; 5673 5674 case RINGBUF_TYPE_TIME_EXTEND: 5675 /* Internal data, OK to advance */ 5676 rb_advance_reader(cpu_buffer); 5677 goto again; 5678 5679 case RINGBUF_TYPE_TIME_STAMP: 5680 if (ts) { 5681 *ts = rb_event_time_stamp(event); 5682 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5683 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5684 cpu_buffer->cpu, ts); 5685 } 5686 /* Internal data, OK to advance */ 5687 rb_advance_reader(cpu_buffer); 5688 goto again; 5689 5690 case RINGBUF_TYPE_DATA: 5691 if (ts && !(*ts)) { 5692 *ts = cpu_buffer->read_stamp + event->time_delta; 5693 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5694 cpu_buffer->cpu, ts); 5695 } 5696 if (lost_events) 5697 *lost_events = rb_lost_events(cpu_buffer); 5698 return event; 5699 5700 default: 5701 RB_WARN_ON(cpu_buffer, 1); 5702 } 5703 5704 return NULL; 5705 } 5706 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5707 5708 static struct ring_buffer_event * 5709 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5710 { 5711 struct trace_buffer *buffer; 5712 struct ring_buffer_per_cpu *cpu_buffer; 5713 struct ring_buffer_event *event; 5714 int nr_loops = 0; 5715 5716 if (ts) 5717 *ts = 0; 5718 5719 cpu_buffer = iter->cpu_buffer; 5720 buffer = cpu_buffer->buffer; 5721 5722 /* 5723 * Check if someone performed a consuming read to the buffer 5724 * or removed some pages from the buffer. In these cases, 5725 * iterator was invalidated and we need to reset it. 5726 */ 5727 if (unlikely(iter->cache_read != cpu_buffer->read || 5728 iter->cache_reader_page != cpu_buffer->reader_page || 5729 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5730 rb_iter_reset(iter); 5731 5732 again: 5733 if (ring_buffer_iter_empty(iter)) 5734 return NULL; 5735 5736 /* 5737 * As the writer can mess with what the iterator is trying 5738 * to read, just give up if we fail to get an event after 5739 * three tries. 
The iterator is not as reliable when reading 5740 * the ring buffer with an active write as the consumer is. 5741 * Do not warn if the three failures is reached. 5742 */ 5743 if (++nr_loops > 3) 5744 return NULL; 5745 5746 if (rb_per_cpu_empty(cpu_buffer)) 5747 return NULL; 5748 5749 if (iter->head >= rb_page_size(iter->head_page)) { 5750 rb_inc_iter(iter); 5751 goto again; 5752 } 5753 5754 event = rb_iter_head_event(iter); 5755 if (!event) 5756 goto again; 5757 5758 switch (event->type_len) { 5759 case RINGBUF_TYPE_PADDING: 5760 if (rb_null_event(event)) { 5761 rb_inc_iter(iter); 5762 goto again; 5763 } 5764 rb_advance_iter(iter); 5765 return event; 5766 5767 case RINGBUF_TYPE_TIME_EXTEND: 5768 /* Internal data, OK to advance */ 5769 rb_advance_iter(iter); 5770 goto again; 5771 5772 case RINGBUF_TYPE_TIME_STAMP: 5773 if (ts) { 5774 *ts = rb_event_time_stamp(event); 5775 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5776 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5777 cpu_buffer->cpu, ts); 5778 } 5779 /* Internal data, OK to advance */ 5780 rb_advance_iter(iter); 5781 goto again; 5782 5783 case RINGBUF_TYPE_DATA: 5784 if (ts && !(*ts)) { 5785 *ts = iter->read_stamp + event->time_delta; 5786 ring_buffer_normalize_time_stamp(buffer, 5787 cpu_buffer->cpu, ts); 5788 } 5789 return event; 5790 5791 default: 5792 RB_WARN_ON(cpu_buffer, 1); 5793 } 5794 5795 return NULL; 5796 } 5797 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 5798 5799 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 5800 { 5801 if (likely(!in_nmi())) { 5802 raw_spin_lock(&cpu_buffer->reader_lock); 5803 return true; 5804 } 5805 5806 /* 5807 * If an NMI die dumps out the content of the ring buffer 5808 * trylock must be used to prevent a deadlock if the NMI 5809 * preempted a task that holds the ring buffer locks. If 5810 * we get the lock then all is fine, if not, then continue 5811 * to do the read, but this can corrupt the ring buffer, 5812 * so it must be permanently disabled from future writes. 5813 * Reading from NMI is a oneshot deal. 5814 */ 5815 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 5816 return true; 5817 5818 /* Continue without locking, but disable the ring buffer */ 5819 atomic_inc(&cpu_buffer->record_disabled); 5820 return false; 5821 } 5822 5823 static inline void 5824 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 5825 { 5826 if (likely(locked)) 5827 raw_spin_unlock(&cpu_buffer->reader_lock); 5828 } 5829 5830 /** 5831 * ring_buffer_peek - peek at the next event to be read 5832 * @buffer: The ring buffer to read 5833 * @cpu: The cpu to peak at 5834 * @ts: The timestamp counter of this event. 5835 * @lost_events: a variable to store if events were lost (may be NULL) 5836 * 5837 * This will return the event that will be read next, but does 5838 * not consume the data. 
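 *
 * Illustration only (this snippet is not part of the original comment):
 * a caller could use it to check for pending data and only then do a
 * consuming read:
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event)
 *		event = ring_buffer_consume(buffer, cpu, &ts, NULL);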
5839 */ 5840 struct ring_buffer_event * 5841 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 5842 unsigned long *lost_events) 5843 { 5844 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5845 struct ring_buffer_event *event; 5846 unsigned long flags; 5847 bool dolock; 5848 5849 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5850 return NULL; 5851 5852 again: 5853 local_irq_save(flags); 5854 dolock = rb_reader_lock(cpu_buffer); 5855 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5856 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5857 rb_advance_reader(cpu_buffer); 5858 rb_reader_unlock(cpu_buffer, dolock); 5859 local_irq_restore(flags); 5860 5861 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5862 goto again; 5863 5864 return event; 5865 } 5866 5867 /** ring_buffer_iter_dropped - report if there are dropped events 5868 * @iter: The ring buffer iterator 5869 * 5870 * Returns true if there was dropped events since the last peek. 5871 */ 5872 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 5873 { 5874 bool ret = iter->missed_events != 0; 5875 5876 iter->missed_events = 0; 5877 return ret; 5878 } 5879 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5880 5881 /** 5882 * ring_buffer_iter_peek - peek at the next event to be read 5883 * @iter: The ring buffer iterator 5884 * @ts: The timestamp counter of this event. 5885 * 5886 * This will return the event that will be read next, but does 5887 * not increment the iterator. 5888 */ 5889 struct ring_buffer_event * 5890 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5891 { 5892 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5893 struct ring_buffer_event *event; 5894 unsigned long flags; 5895 5896 again: 5897 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5898 event = rb_iter_peek(iter, ts); 5899 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5900 5901 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5902 goto again; 5903 5904 return event; 5905 } 5906 5907 /** 5908 * ring_buffer_consume - return an event and consume it 5909 * @buffer: The ring buffer to get the next event from 5910 * @cpu: the cpu to read the buffer from 5911 * @ts: a variable to store the timestamp (may be NULL) 5912 * @lost_events: a variable to store if events were lost (may be NULL) 5913 * 5914 * Returns the next event in the ring buffer, and that event is consumed. 5915 * Meaning, that sequential reads will keep returning a different event, 5916 * and eventually empty the ring buffer if the producer is slower. 
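 *
 * A minimal sketch of a consuming read loop (illustrative only, not part
 * of the original comment; handle() is a placeholder for the caller's
 * processing of the event payload):
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
 *		if (lost)
 *			pr_info("lost %lu events\n", lost);
 *		handle(ring_buffer_event_data(event),
 *		       ring_buffer_event_length(event));
 *	}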
5917 */ 5918 struct ring_buffer_event * 5919 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5920 unsigned long *lost_events) 5921 { 5922 struct ring_buffer_per_cpu *cpu_buffer; 5923 struct ring_buffer_event *event = NULL; 5924 unsigned long flags; 5925 bool dolock; 5926 5927 again: 5928 /* might be called in atomic */ 5929 preempt_disable(); 5930 5931 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5932 goto out; 5933 5934 cpu_buffer = buffer->buffers[cpu]; 5935 local_irq_save(flags); 5936 dolock = rb_reader_lock(cpu_buffer); 5937 5938 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5939 if (event) { 5940 cpu_buffer->lost_events = 0; 5941 rb_advance_reader(cpu_buffer); 5942 } 5943 5944 rb_reader_unlock(cpu_buffer, dolock); 5945 local_irq_restore(flags); 5946 5947 out: 5948 preempt_enable(); 5949 5950 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5951 goto again; 5952 5953 return event; 5954 } 5955 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5956 5957 /** 5958 * ring_buffer_read_start - start a non consuming read of the buffer 5959 * @buffer: The ring buffer to read from 5960 * @cpu: The cpu buffer to iterate over 5961 * @flags: gfp flags to use for memory allocation 5962 * 5963 * This creates an iterator to allow non-consuming iteration through 5964 * the buffer. If the buffer is disabled for writing, it will produce 5965 * the same information each time, but if the buffer is still writing 5966 * then the first hit of a write will cause the iteration to stop. 5967 * 5968 * Must be paired with ring_buffer_read_finish. 5969 */ 5970 struct ring_buffer_iter * 5971 ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) 5972 { 5973 struct ring_buffer_per_cpu *cpu_buffer; 5974 struct ring_buffer_iter *iter; 5975 5976 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5977 return NULL; 5978 5979 iter = kzalloc(sizeof(*iter), flags); 5980 if (!iter) 5981 return NULL; 5982 5983 /* Holds the entire event: data and meta data */ 5984 iter->event_size = buffer->subbuf_size; 5985 iter->event = kmalloc(iter->event_size, flags); 5986 if (!iter->event) { 5987 kfree(iter); 5988 return NULL; 5989 } 5990 5991 cpu_buffer = buffer->buffers[cpu]; 5992 5993 iter->cpu_buffer = cpu_buffer; 5994 5995 atomic_inc(&cpu_buffer->resize_disabled); 5996 5997 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 5998 arch_spin_lock(&cpu_buffer->lock); 5999 rb_iter_reset(iter); 6000 arch_spin_unlock(&cpu_buffer->lock); 6001 6002 return iter; 6003 } 6004 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 6005 6006 /** 6007 * ring_buffer_read_finish - finish reading the iterator of the buffer 6008 * @iter: The iterator retrieved by ring_buffer_start 6009 * 6010 * This re-enables resizing of the buffer, and frees the iterator. 6011 */ 6012 void 6013 ring_buffer_read_finish(struct ring_buffer_iter *iter) 6014 { 6015 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6016 6017 /* Use this opportunity to check the integrity of the ring buffer. */ 6018 rb_check_pages(cpu_buffer); 6019 6020 atomic_dec(&cpu_buffer->resize_disabled); 6021 kfree(iter->event); 6022 kfree(iter); 6023 } 6024 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 6025 6026 /** 6027 * ring_buffer_iter_advance - advance the iterator to the next location 6028 * @iter: The ring buffer iterator 6029 * 6030 * Move the location of the iterator such that the next read will 6031 * be the next location of the iterator. 
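 *
 * A minimal sketch of a full non-consuming walk (illustrative only, not
 * part of the original comment):
 *
 *	iter = ring_buffer_read_start(buffer, cpu, GFP_KERNEL);
 *	if (iter) {
 *		while ((event = ring_buffer_iter_peek(iter, &ts)))
 *			ring_buffer_iter_advance(iter);
 *		ring_buffer_read_finish(iter);
 *	}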
6032 */ 6033 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 6034 { 6035 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6036 unsigned long flags; 6037 6038 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6039 6040 rb_advance_iter(iter); 6041 6042 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6043 } 6044 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 6045 6046 /** 6047 * ring_buffer_size - return the size of the ring buffer (in bytes) 6048 * @buffer: The ring buffer. 6049 * @cpu: The CPU to get ring buffer size from. 6050 */ 6051 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 6052 { 6053 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6054 return 0; 6055 6056 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 6057 } 6058 EXPORT_SYMBOL_GPL(ring_buffer_size); 6059 6060 /** 6061 * ring_buffer_max_event_size - return the max data size of an event 6062 * @buffer: The ring buffer. 6063 * 6064 * Returns the maximum size an event can be. 6065 */ 6066 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6067 { 6068 /* If abs timestamp is requested, events have a timestamp too */ 6069 if (ring_buffer_time_stamp_abs(buffer)) 6070 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6071 return buffer->max_data_size; 6072 } 6073 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6074 6075 static void rb_clear_buffer_page(struct buffer_page *page) 6076 { 6077 local_set(&page->write, 0); 6078 local_set(&page->entries, 0); 6079 rb_init_page(page->page); 6080 page->read = 0; 6081 } 6082 6083 /* 6084 * When the buffer is memory mapped to user space, each sub buffer 6085 * has a unique id that is used by the meta data to tell the user 6086 * where the current reader page is. 6087 * 6088 * For a normal allocated ring buffer, the id is saved in the buffer page 6089 * id field, and updated via this function. 6090 * 6091 * But for a fixed memory mapped buffer, the id is already assigned for 6092 * fixed memory ordering in the memory layout and can not be used. Instead 6093 * the index of where the page lies in the memory layout is used. 6094 * 6095 * For the normal pages, set the buffer page id with the passed in @id 6096 * value and return that. 6097 * 6098 * For fixed memory mapped pages, get the page index in the memory layout 6099 * and return that as the id. 
6100 */ 6101 static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer, 6102 struct buffer_page *bpage, int id) 6103 { 6104 /* 6105 * For boot buffers, the id is the index, 6106 * otherwise, set the buffer page with this id 6107 */ 6108 if (cpu_buffer->ring_meta) 6109 id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page); 6110 else 6111 bpage->id = id; 6112 6113 return id; 6114 } 6115 6116 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6117 { 6118 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6119 6120 if (!meta) 6121 return; 6122 6123 meta->reader.read = cpu_buffer->reader_page->read; 6124 meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, 6125 cpu_buffer->reader_page->id); 6126 6127 meta->reader.lost_events = cpu_buffer->lost_events; 6128 6129 meta->entries = local_read(&cpu_buffer->entries); 6130 meta->overrun = local_read(&cpu_buffer->overrun); 6131 meta->read = cpu_buffer->read; 6132 6133 /* Some archs do not have data cache coherency between kernel and user-space */ 6134 flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE); 6135 } 6136 6137 static void 6138 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6139 { 6140 struct buffer_page *page; 6141 6142 rb_head_page_deactivate(cpu_buffer); 6143 6144 cpu_buffer->head_page 6145 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6146 rb_clear_buffer_page(cpu_buffer->head_page); 6147 list_for_each_entry(page, cpu_buffer->pages, list) { 6148 rb_clear_buffer_page(page); 6149 } 6150 6151 cpu_buffer->tail_page = cpu_buffer->head_page; 6152 cpu_buffer->commit_page = cpu_buffer->head_page; 6153 6154 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6155 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6156 rb_clear_buffer_page(cpu_buffer->reader_page); 6157 6158 local_set(&cpu_buffer->entries_bytes, 0); 6159 local_set(&cpu_buffer->overrun, 0); 6160 local_set(&cpu_buffer->commit_overrun, 0); 6161 local_set(&cpu_buffer->dropped_events, 0); 6162 local_set(&cpu_buffer->entries, 0); 6163 local_set(&cpu_buffer->committing, 0); 6164 local_set(&cpu_buffer->commits, 0); 6165 local_set(&cpu_buffer->pages_touched, 0); 6166 local_set(&cpu_buffer->pages_lost, 0); 6167 local_set(&cpu_buffer->pages_read, 0); 6168 cpu_buffer->last_pages_touch = 0; 6169 cpu_buffer->shortest_full = 0; 6170 cpu_buffer->read = 0; 6171 cpu_buffer->read_bytes = 0; 6172 6173 rb_time_set(&cpu_buffer->write_stamp, 0); 6174 rb_time_set(&cpu_buffer->before_stamp, 0); 6175 6176 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6177 6178 cpu_buffer->lost_events = 0; 6179 cpu_buffer->last_overrun = 0; 6180 6181 rb_head_page_activate(cpu_buffer); 6182 cpu_buffer->pages_removed = 0; 6183 6184 if (cpu_buffer->mapped) { 6185 rb_update_meta_page(cpu_buffer); 6186 if (cpu_buffer->ring_meta) { 6187 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6188 meta->commit_buffer = meta->head_buffer; 6189 } 6190 } 6191 } 6192 6193 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6194 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6195 { 6196 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6197 6198 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6199 return; 6200 6201 arch_spin_lock(&cpu_buffer->lock); 6202 6203 rb_reset_cpu(cpu_buffer); 6204 6205 arch_spin_unlock(&cpu_buffer->lock); 6206 } 6207 6208 /** 6209 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6210 * @buffer: The ring buffer to reset a per cpu buffer of 6211 * @cpu: 
The CPU buffer to be reset 6212 */ 6213 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6214 { 6215 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6216 6217 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6218 return; 6219 6220 /* prevent another thread from changing buffer sizes */ 6221 mutex_lock(&buffer->mutex); 6222 6223 atomic_inc(&cpu_buffer->resize_disabled); 6224 atomic_inc(&cpu_buffer->record_disabled); 6225 6226 /* Make sure all commits have finished */ 6227 synchronize_rcu(); 6228 6229 reset_disabled_cpu_buffer(cpu_buffer); 6230 6231 atomic_dec(&cpu_buffer->record_disabled); 6232 atomic_dec(&cpu_buffer->resize_disabled); 6233 6234 mutex_unlock(&buffer->mutex); 6235 } 6236 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6237 6238 /* Flag to ensure proper resetting of atomic variables */ 6239 #define RESET_BIT (1 << 30) 6240 6241 /** 6242 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6243 * @buffer: The ring buffer to reset a per cpu buffer of 6244 */ 6245 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6246 { 6247 struct ring_buffer_per_cpu *cpu_buffer; 6248 int cpu; 6249 6250 /* prevent another thread from changing buffer sizes */ 6251 mutex_lock(&buffer->mutex); 6252 6253 for_each_online_buffer_cpu(buffer, cpu) { 6254 cpu_buffer = buffer->buffers[cpu]; 6255 6256 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6257 atomic_inc(&cpu_buffer->record_disabled); 6258 } 6259 6260 /* Make sure all commits have finished */ 6261 synchronize_rcu(); 6262 6263 for_each_buffer_cpu(buffer, cpu) { 6264 cpu_buffer = buffer->buffers[cpu]; 6265 6266 /* 6267 * If a CPU came online during the synchronize_rcu(), then 6268 * ignore it. 6269 */ 6270 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6271 continue; 6272 6273 reset_disabled_cpu_buffer(cpu_buffer); 6274 6275 atomic_dec(&cpu_buffer->record_disabled); 6276 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6277 } 6278 6279 mutex_unlock(&buffer->mutex); 6280 } 6281 6282 /** 6283 * ring_buffer_reset - reset a ring buffer 6284 * @buffer: The ring buffer to reset all cpu buffers 6285 */ 6286 void ring_buffer_reset(struct trace_buffer *buffer) 6287 { 6288 struct ring_buffer_per_cpu *cpu_buffer; 6289 int cpu; 6290 6291 /* prevent another thread from changing buffer sizes */ 6292 mutex_lock(&buffer->mutex); 6293 6294 for_each_buffer_cpu(buffer, cpu) { 6295 cpu_buffer = buffer->buffers[cpu]; 6296 6297 atomic_inc(&cpu_buffer->resize_disabled); 6298 atomic_inc(&cpu_buffer->record_disabled); 6299 } 6300 6301 /* Make sure all commits have finished */ 6302 synchronize_rcu(); 6303 6304 for_each_buffer_cpu(buffer, cpu) { 6305 cpu_buffer = buffer->buffers[cpu]; 6306 6307 reset_disabled_cpu_buffer(cpu_buffer); 6308 6309 atomic_dec(&cpu_buffer->record_disabled); 6310 atomic_dec(&cpu_buffer->resize_disabled); 6311 } 6312 6313 mutex_unlock(&buffer->mutex); 6314 } 6315 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6316 6317 /** 6318 * ring_buffer_empty - is the ring buffer empty? 
6319 * @buffer: The ring buffer to test 6320 */ 6321 bool ring_buffer_empty(struct trace_buffer *buffer) 6322 { 6323 struct ring_buffer_per_cpu *cpu_buffer; 6324 unsigned long flags; 6325 bool dolock; 6326 bool ret; 6327 int cpu; 6328 6329 /* yes this is racy, but if you don't like the race, lock the buffer */ 6330 for_each_buffer_cpu(buffer, cpu) { 6331 cpu_buffer = buffer->buffers[cpu]; 6332 local_irq_save(flags); 6333 dolock = rb_reader_lock(cpu_buffer); 6334 ret = rb_per_cpu_empty(cpu_buffer); 6335 rb_reader_unlock(cpu_buffer, dolock); 6336 local_irq_restore(flags); 6337 6338 if (!ret) 6339 return false; 6340 } 6341 6342 return true; 6343 } 6344 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6345 6346 /** 6347 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6348 * @buffer: The ring buffer 6349 * @cpu: The CPU buffer to test 6350 */ 6351 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6352 { 6353 struct ring_buffer_per_cpu *cpu_buffer; 6354 unsigned long flags; 6355 bool dolock; 6356 bool ret; 6357 6358 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6359 return true; 6360 6361 cpu_buffer = buffer->buffers[cpu]; 6362 local_irq_save(flags); 6363 dolock = rb_reader_lock(cpu_buffer); 6364 ret = rb_per_cpu_empty(cpu_buffer); 6365 rb_reader_unlock(cpu_buffer, dolock); 6366 local_irq_restore(flags); 6367 6368 return ret; 6369 } 6370 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6371 6372 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6373 /** 6374 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6375 * @buffer_a: One buffer to swap with 6376 * @buffer_b: The other buffer to swap with 6377 * @cpu: the CPU of the buffers to swap 6378 * 6379 * This function is useful for tracers that want to take a "snapshot" 6380 * of a CPU buffer and has another back up buffer lying around. 6381 * it is expected that the tracer handles the cpu buffer not being 6382 * used at the moment. 6383 */ 6384 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6385 struct trace_buffer *buffer_b, int cpu) 6386 { 6387 struct ring_buffer_per_cpu *cpu_buffer_a; 6388 struct ring_buffer_per_cpu *cpu_buffer_b; 6389 int ret = -EINVAL; 6390 6391 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6392 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6393 return -EINVAL; 6394 6395 cpu_buffer_a = buffer_a->buffers[cpu]; 6396 cpu_buffer_b = buffer_b->buffers[cpu]; 6397 6398 /* It's up to the callers to not try to swap mapped buffers */ 6399 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6400 return -EBUSY; 6401 6402 /* At least make sure the two buffers are somewhat the same */ 6403 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6404 return -EINVAL; 6405 6406 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6407 return -EINVAL; 6408 6409 if (atomic_read(&buffer_a->record_disabled)) 6410 return -EAGAIN; 6411 6412 if (atomic_read(&buffer_b->record_disabled)) 6413 return -EAGAIN; 6414 6415 if (atomic_read(&cpu_buffer_a->record_disabled)) 6416 return -EAGAIN; 6417 6418 if (atomic_read(&cpu_buffer_b->record_disabled)) 6419 return -EAGAIN; 6420 6421 /* 6422 * We can't do a synchronize_rcu here because this 6423 * function can be called in atomic context. 6424 * Normally this will be called from the same CPU as cpu. 6425 * If not it's up to the caller to protect this. 
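	 * Disabling recording on both per-CPU buffers below keeps new
	 * writes out while the buffer pointers are being exchanged.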
6426 */ 6427 atomic_inc(&cpu_buffer_a->record_disabled); 6428 atomic_inc(&cpu_buffer_b->record_disabled); 6429 6430 ret = -EBUSY; 6431 if (local_read(&cpu_buffer_a->committing)) 6432 goto out_dec; 6433 if (local_read(&cpu_buffer_b->committing)) 6434 goto out_dec; 6435 6436 /* 6437 * When resize is in progress, we cannot swap it because 6438 * it will mess the state of the cpu buffer. 6439 */ 6440 if (atomic_read(&buffer_a->resizing)) 6441 goto out_dec; 6442 if (atomic_read(&buffer_b->resizing)) 6443 goto out_dec; 6444 6445 buffer_a->buffers[cpu] = cpu_buffer_b; 6446 buffer_b->buffers[cpu] = cpu_buffer_a; 6447 6448 cpu_buffer_b->buffer = buffer_a; 6449 cpu_buffer_a->buffer = buffer_b; 6450 6451 ret = 0; 6452 6453 out_dec: 6454 atomic_dec(&cpu_buffer_a->record_disabled); 6455 atomic_dec(&cpu_buffer_b->record_disabled); 6456 return ret; 6457 } 6458 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6459 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6460 6461 /** 6462 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6463 * @buffer: the buffer to allocate for. 6464 * @cpu: the cpu buffer to allocate. 6465 * 6466 * This function is used in conjunction with ring_buffer_read_page. 6467 * When reading a full page from the ring buffer, these functions 6468 * can be used to speed up the process. The calling function should 6469 * allocate a few pages first with this function. Then when it 6470 * needs to get pages from the ring buffer, it passes the result 6471 * of this function into ring_buffer_read_page, which will swap 6472 * the page that was allocated, with the read page of the buffer. 6473 * 6474 * Returns: 6475 * The page allocated, or ERR_PTR 6476 */ 6477 struct buffer_data_read_page * 6478 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6479 { 6480 struct ring_buffer_per_cpu *cpu_buffer; 6481 struct buffer_data_read_page *bpage = NULL; 6482 unsigned long flags; 6483 6484 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6485 return ERR_PTR(-ENODEV); 6486 6487 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6488 if (!bpage) 6489 return ERR_PTR(-ENOMEM); 6490 6491 bpage->order = buffer->subbuf_order; 6492 cpu_buffer = buffer->buffers[cpu]; 6493 local_irq_save(flags); 6494 arch_spin_lock(&cpu_buffer->lock); 6495 6496 if (cpu_buffer->free_page) { 6497 bpage->data = cpu_buffer->free_page; 6498 cpu_buffer->free_page = NULL; 6499 } 6500 6501 arch_spin_unlock(&cpu_buffer->lock); 6502 local_irq_restore(flags); 6503 6504 if (bpage->data) { 6505 rb_init_page(bpage->data); 6506 } else { 6507 bpage->data = alloc_cpu_data(cpu, cpu_buffer->buffer->subbuf_order); 6508 if (!bpage->data) { 6509 kfree(bpage); 6510 return ERR_PTR(-ENOMEM); 6511 } 6512 } 6513 6514 return bpage; 6515 } 6516 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6517 6518 /** 6519 * ring_buffer_free_read_page - free an allocated read page 6520 * @buffer: the buffer the page was allocate for 6521 * @cpu: the cpu buffer the page came from 6522 * @data_page: the page to free 6523 * 6524 * Free a page allocated from ring_buffer_alloc_read_page. 
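 *
 * If the page still matches the buffer's sub-buffer order and no one
 * else holds a reference to it, it may be kept as the per-CPU free_page
 * for reuse by a later ring_buffer_alloc_read_page(); otherwise it is
 * freed here.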
6525 */ 6526 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6527 struct buffer_data_read_page *data_page) 6528 { 6529 struct ring_buffer_per_cpu *cpu_buffer; 6530 struct buffer_data_page *bpage = data_page->data; 6531 struct page *page = virt_to_page(bpage); 6532 unsigned long flags; 6533 6534 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6535 return; 6536 6537 cpu_buffer = buffer->buffers[cpu]; 6538 6539 /* 6540 * If the page is still in use someplace else, or order of the page 6541 * is different from the subbuffer order of the buffer - 6542 * we can't reuse it 6543 */ 6544 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6545 goto out; 6546 6547 local_irq_save(flags); 6548 arch_spin_lock(&cpu_buffer->lock); 6549 6550 if (!cpu_buffer->free_page) { 6551 cpu_buffer->free_page = bpage; 6552 bpage = NULL; 6553 } 6554 6555 arch_spin_unlock(&cpu_buffer->lock); 6556 local_irq_restore(flags); 6557 6558 out: 6559 free_pages((unsigned long)bpage, data_page->order); 6560 kfree(data_page); 6561 } 6562 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6563 6564 /** 6565 * ring_buffer_read_page - extract a page from the ring buffer 6566 * @buffer: buffer to extract from 6567 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6568 * @len: amount to extract 6569 * @cpu: the cpu of the buffer to extract 6570 * @full: should the extraction only happen when the page is full. 6571 * 6572 * This function will pull out a page from the ring buffer and consume it. 6573 * @data_page must be the address of the variable that was returned 6574 * from ring_buffer_alloc_read_page. This is because the page might be used 6575 * to swap with a page in the ring buffer. 6576 * 6577 * for example: 6578 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6579 * if (IS_ERR(rpage)) 6580 * return PTR_ERR(rpage); 6581 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6582 * if (ret >= 0) 6583 * process_page(ring_buffer_read_page_data(rpage), ret); 6584 * ring_buffer_free_read_page(buffer, cpu, rpage); 6585 * 6586 * When @full is set, the function will not return true unless 6587 * the writer is off the reader page. 6588 * 6589 * Note: it is up to the calling functions to handle sleeps and wakeups. 6590 * The ring buffer can be used anywhere in the kernel and can not 6591 * blindly call wake_up. The layer that uses the ring buffer must be 6592 * responsible for that. 6593 * 6594 * Returns: 6595 * >=0 if data has been transferred, returns the offset of consumed data. 6596 * <0 if no data has been transferred. 6597 */ 6598 int ring_buffer_read_page(struct trace_buffer *buffer, 6599 struct buffer_data_read_page *data_page, 6600 size_t len, int cpu, int full) 6601 { 6602 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6603 struct ring_buffer_event *event; 6604 struct buffer_data_page *bpage; 6605 struct buffer_page *reader; 6606 unsigned long missed_events; 6607 unsigned int commit; 6608 unsigned int read; 6609 u64 save_timestamp; 6610 6611 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6612 return -1; 6613 6614 /* 6615 * If len is not big enough to hold the page header, then 6616 * we can not copy anything. 
6617 */ 6618 if (len <= BUF_PAGE_HDR_SIZE) 6619 return -1; 6620 6621 len -= BUF_PAGE_HDR_SIZE; 6622 6623 if (!data_page || !data_page->data) 6624 return -1; 6625 6626 if (data_page->order != buffer->subbuf_order) 6627 return -1; 6628 6629 bpage = data_page->data; 6630 if (!bpage) 6631 return -1; 6632 6633 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6634 6635 reader = rb_get_reader_page(cpu_buffer); 6636 if (!reader) 6637 return -1; 6638 6639 event = rb_reader_event(cpu_buffer); 6640 6641 read = reader->read; 6642 commit = rb_page_size(reader); 6643 6644 /* Check if any events were dropped */ 6645 missed_events = cpu_buffer->lost_events; 6646 6647 /* 6648 * If this page has been partially read or 6649 * if len is not big enough to read the rest of the page or 6650 * a writer is still on the page, then 6651 * we must copy the data from the page to the buffer. 6652 * Otherwise, we can simply swap the page with the one passed in. 6653 */ 6654 if (read || (len < (commit - read)) || 6655 cpu_buffer->reader_page == cpu_buffer->commit_page || 6656 cpu_buffer->mapped) { 6657 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6658 unsigned int rpos = read; 6659 unsigned int pos = 0; 6660 unsigned int size; 6661 6662 /* 6663 * If a full page is expected, this can still be returned 6664 * if there's been a previous partial read and the 6665 * rest of the page can be read and the commit page is off 6666 * the reader page. 6667 */ 6668 if (full && 6669 (!read || (len < (commit - read)) || 6670 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6671 return -1; 6672 6673 if (len > (commit - read)) 6674 len = (commit - read); 6675 6676 /* Always keep the time extend and data together */ 6677 size = rb_event_ts_length(event); 6678 6679 if (len < size) 6680 return -1; 6681 6682 /* save the current timestamp, since the user will need it */ 6683 save_timestamp = cpu_buffer->read_stamp; 6684 6685 /* Need to copy one event at a time */ 6686 do { 6687 /* We need the size of one event, because 6688 * rb_advance_reader only advances by one event, 6689 * whereas rb_event_ts_length may include the size of 6690 * one or two events. 6691 * We have already ensured there's enough space if this 6692 * is a time extend. */ 6693 size = rb_event_length(event); 6694 memcpy(bpage->data + pos, rpage->data + rpos, size); 6695 6696 len -= size; 6697 6698 rb_advance_reader(cpu_buffer); 6699 rpos = reader->read; 6700 pos += size; 6701 6702 if (rpos >= commit) 6703 break; 6704 6705 event = rb_reader_event(cpu_buffer); 6706 /* Always keep the time extend and data together */ 6707 size = rb_event_ts_length(event); 6708 } while (len >= size); 6709 6710 /* update bpage */ 6711 local_set(&bpage->commit, pos); 6712 bpage->time_stamp = save_timestamp; 6713 6714 /* we copied everything to the beginning */ 6715 read = 0; 6716 } else { 6717 /* update the entry counter */ 6718 cpu_buffer->read += rb_page_entries(reader); 6719 cpu_buffer->read_bytes += rb_page_size(reader); 6720 6721 /* swap the pages */ 6722 rb_init_page(bpage); 6723 bpage = reader->page; 6724 reader->page = data_page->data; 6725 local_set(&reader->write, 0); 6726 local_set(&reader->entries, 0); 6727 reader->read = 0; 6728 data_page->data = bpage; 6729 6730 /* 6731 * Use the real_end for the data size, 6732 * This gives us a chance to store the lost events 6733 * on the page. 
6734 */ 6735 if (reader->real_end) 6736 local_set(&bpage->commit, reader->real_end); 6737 } 6738 6739 cpu_buffer->lost_events = 0; 6740 6741 commit = local_read(&bpage->commit); 6742 /* 6743 * Set a flag in the commit field if we lost events 6744 */ 6745 if (missed_events) { 6746 /* If there is room at the end of the page to save the 6747 * missed events, then record it there. 6748 */ 6749 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6750 memcpy(&bpage->data[commit], &missed_events, 6751 sizeof(missed_events)); 6752 local_add(RB_MISSED_STORED, &bpage->commit); 6753 commit += sizeof(missed_events); 6754 } 6755 local_add(RB_MISSED_EVENTS, &bpage->commit); 6756 } 6757 6758 /* 6759 * This page may be off to user land. Zero it out here. 6760 */ 6761 if (commit < buffer->subbuf_size) 6762 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6763 6764 return read; 6765 } 6766 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6767 6768 /** 6769 * ring_buffer_read_page_data - get pointer to the data in the page. 6770 * @page: the page to get the data from 6771 * 6772 * Returns pointer to the actual data in this page. 6773 */ 6774 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6775 { 6776 return page->data; 6777 } 6778 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6779 6780 /** 6781 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6782 * @buffer: the buffer to get the sub buffer size from 6783 * 6784 * Returns size of the sub buffer, in bytes. 6785 */ 6786 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 6787 { 6788 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6789 } 6790 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 6791 6792 /** 6793 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 6794 * @buffer: The ring_buffer to get the system sub page order from 6795 * 6796 * By default, one ring buffer sub page equals to one system page. This parameter 6797 * is configurable, per ring buffer. The size of the ring buffer sub page can be 6798 * extended, but must be an order of system page size. 6799 * 6800 * Returns the order of buffer sub page size, in system pages: 6801 * 0 means the sub buffer size is 1 system page and so forth. 6802 * In case of an error < 0 is returned. 6803 */ 6804 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 6805 { 6806 if (!buffer) 6807 return -EINVAL; 6808 6809 return buffer->subbuf_order; 6810 } 6811 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 6812 6813 /** 6814 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 6815 * @buffer: The ring_buffer to set the new page size. 6816 * @order: Order of the system pages in one sub buffer page 6817 * 6818 * By default, one ring buffer pages equals to one system page. This API can be 6819 * used to set new size of the ring buffer page. The size must be order of 6820 * system page size, that's why the input parameter @order is the order of 6821 * system pages that are allocated for one ring buffer page: 6822 * 0 - 1 system page 6823 * 1 - 2 system pages 6824 * 3 - 4 system pages 6825 * ... 6826 * 6827 * Returns 0 on success or < 0 in case of an error. 
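 *
 * As a worked illustration (not part of the original comment): a sub
 * buffer spans 2^@order system pages, so with 4096-byte system pages,
 * order 2 yields 16384-byte sub buffers:
 *
 *	if (ring_buffer_subbuf_order_set(buffer, 2) == 0)
 *		pr_debug("sub-buffer size is now %d\n",
 *			 ring_buffer_subbuf_size_get(buffer));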
6828 */ 6829 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 6830 { 6831 struct ring_buffer_per_cpu *cpu_buffer; 6832 struct buffer_page *bpage, *tmp; 6833 int old_order, old_size; 6834 int nr_pages; 6835 int psize; 6836 int err; 6837 int cpu; 6838 6839 if (!buffer || order < 0) 6840 return -EINVAL; 6841 6842 if (buffer->subbuf_order == order) 6843 return 0; 6844 6845 psize = (1 << order) * PAGE_SIZE; 6846 if (psize <= BUF_PAGE_HDR_SIZE) 6847 return -EINVAL; 6848 6849 /* Size of a subbuf cannot be greater than the write counter */ 6850 if (psize > RB_WRITE_MASK + 1) 6851 return -EINVAL; 6852 6853 old_order = buffer->subbuf_order; 6854 old_size = buffer->subbuf_size; 6855 6856 /* prevent another thread from changing buffer sizes */ 6857 guard(mutex)(&buffer->mutex); 6858 atomic_inc(&buffer->record_disabled); 6859 6860 /* Make sure all commits have finished */ 6861 synchronize_rcu(); 6862 6863 buffer->subbuf_order = order; 6864 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 6865 6866 /* Make sure all new buffers are allocated, before deleting the old ones */ 6867 for_each_buffer_cpu(buffer, cpu) { 6868 6869 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6870 continue; 6871 6872 cpu_buffer = buffer->buffers[cpu]; 6873 6874 if (cpu_buffer->mapped) { 6875 err = -EBUSY; 6876 goto error; 6877 } 6878 6879 /* Update the number of pages to match the new size */ 6880 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6881 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6882 6883 /* we need a minimum of two pages */ 6884 if (nr_pages < 2) 6885 nr_pages = 2; 6886 6887 cpu_buffer->nr_pages_to_update = nr_pages; 6888 6889 /* Include the reader page */ 6890 nr_pages++; 6891 6892 /* Allocate the new size buffer */ 6893 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6894 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6895 &cpu_buffer->new_pages)) { 6896 /* not enough memory for new pages */ 6897 err = -ENOMEM; 6898 goto error; 6899 } 6900 } 6901 6902 for_each_buffer_cpu(buffer, cpu) { 6903 struct buffer_data_page *old_free_data_page; 6904 struct list_head old_pages; 6905 unsigned long flags; 6906 6907 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6908 continue; 6909 6910 cpu_buffer = buffer->buffers[cpu]; 6911 6912 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6913 6914 /* Clear the head bit to make the link list normal to read */ 6915 rb_head_page_deactivate(cpu_buffer); 6916 6917 /* 6918 * Collect buffers from the cpu_buffer pages list and the 6919 * reader_page on old_pages, so they can be freed later when not 6920 * under a spinlock. The pages list is a linked list with no 6921 * head, adding old_pages turns it into a regular list with 6922 * old_pages being the head. 
6923 */ 6924 list_add(&old_pages, cpu_buffer->pages); 6925 list_add(&cpu_buffer->reader_page->list, &old_pages); 6926 6927 /* One page was allocated for the reader page */ 6928 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6929 struct buffer_page, list); 6930 list_del_init(&cpu_buffer->reader_page->list); 6931 6932 /* Install the new pages, remove the head from the list */ 6933 cpu_buffer->pages = cpu_buffer->new_pages.next; 6934 list_del_init(&cpu_buffer->new_pages); 6935 cpu_buffer->cnt++; 6936 6937 cpu_buffer->head_page 6938 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6939 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6940 6941 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6942 cpu_buffer->nr_pages_to_update = 0; 6943 6944 old_free_data_page = cpu_buffer->free_page; 6945 cpu_buffer->free_page = NULL; 6946 6947 rb_head_page_activate(cpu_buffer); 6948 6949 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6950 6951 /* Free old sub buffers */ 6952 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6953 list_del_init(&bpage->list); 6954 free_buffer_page(bpage); 6955 } 6956 free_pages((unsigned long)old_free_data_page, old_order); 6957 6958 rb_check_pages(cpu_buffer); 6959 } 6960 6961 atomic_dec(&buffer->record_disabled); 6962 6963 return 0; 6964 6965 error: 6966 buffer->subbuf_order = old_order; 6967 buffer->subbuf_size = old_size; 6968 6969 atomic_dec(&buffer->record_disabled); 6970 6971 for_each_buffer_cpu(buffer, cpu) { 6972 cpu_buffer = buffer->buffers[cpu]; 6973 6974 if (!cpu_buffer->nr_pages_to_update) 6975 continue; 6976 6977 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6978 list_del_init(&bpage->list); 6979 free_buffer_page(bpage); 6980 } 6981 } 6982 6983 return err; 6984 } 6985 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6986 6987 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6988 { 6989 struct page *page; 6990 6991 if (cpu_buffer->meta_page) 6992 return 0; 6993 6994 page = alloc_page(GFP_USER | __GFP_ZERO); 6995 if (!page) 6996 return -ENOMEM; 6997 6998 cpu_buffer->meta_page = page_to_virt(page); 6999 7000 return 0; 7001 } 7002 7003 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7004 { 7005 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 7006 7007 free_page(addr); 7008 cpu_buffer->meta_page = NULL; 7009 } 7010 7011 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 7012 unsigned long *subbuf_ids) 7013 { 7014 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 7015 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 7016 struct buffer_page *first_subbuf, *subbuf; 7017 int cnt = 0; 7018 int id = 0; 7019 7020 id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id); 7021 subbuf_ids[id++] = (unsigned long)cpu_buffer->reader_page->page; 7022 cnt++; 7023 7024 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 7025 do { 7026 id = rb_page_id(cpu_buffer, subbuf, id); 7027 7028 if (WARN_ON(id >= nr_subbufs)) 7029 break; 7030 7031 subbuf_ids[id] = (unsigned long)subbuf->page; 7032 7033 rb_inc_page(&subbuf); 7034 id++; 7035 cnt++; 7036 } while (subbuf != first_subbuf); 7037 7038 WARN_ON(cnt != nr_subbufs); 7039 7040 /* install subbuf ID to kern VA translation */ 7041 cpu_buffer->subbuf_ids = subbuf_ids; 7042 7043 meta->meta_struct_len = sizeof(*meta); 7044 meta->nr_subbufs = nr_subbufs; 7045 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7046 meta->meta_page_size = 
meta->subbuf_size; 7047 7048 rb_update_meta_page(cpu_buffer); 7049 } 7050 7051 static struct ring_buffer_per_cpu * 7052 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 7053 { 7054 struct ring_buffer_per_cpu *cpu_buffer; 7055 7056 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7057 return ERR_PTR(-EINVAL); 7058 7059 cpu_buffer = buffer->buffers[cpu]; 7060 7061 mutex_lock(&cpu_buffer->mapping_lock); 7062 7063 if (!cpu_buffer->user_mapped) { 7064 mutex_unlock(&cpu_buffer->mapping_lock); 7065 return ERR_PTR(-ENODEV); 7066 } 7067 7068 return cpu_buffer; 7069 } 7070 7071 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7072 { 7073 mutex_unlock(&cpu_buffer->mapping_lock); 7074 } 7075 7076 /* 7077 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7078 * to be set-up or torn-down. 7079 */ 7080 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7081 bool inc) 7082 { 7083 unsigned long flags; 7084 7085 lockdep_assert_held(&cpu_buffer->mapping_lock); 7086 7087 /* mapped is always greater or equal to user_mapped */ 7088 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7089 return -EINVAL; 7090 7091 if (inc && cpu_buffer->mapped == UINT_MAX) 7092 return -EBUSY; 7093 7094 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7095 return -EINVAL; 7096 7097 mutex_lock(&cpu_buffer->buffer->mutex); 7098 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7099 7100 if (inc) { 7101 cpu_buffer->user_mapped++; 7102 cpu_buffer->mapped++; 7103 } else { 7104 cpu_buffer->user_mapped--; 7105 cpu_buffer->mapped--; 7106 } 7107 7108 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7109 mutex_unlock(&cpu_buffer->buffer->mutex); 7110 7111 return 0; 7112 } 7113 7114 /* 7115 * +--------------+ pgoff == 0 7116 * | meta page | 7117 * +--------------+ pgoff == 1 7118 * | subbuffer 0 | 7119 * | | 7120 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7121 * | subbuffer 1 | 7122 * | | 7123 * ... 7124 */ 7125 #ifdef CONFIG_MMU 7126 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7127 struct vm_area_struct *vma) 7128 { 7129 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7130 unsigned int subbuf_pages, subbuf_order; 7131 struct page **pages __free(kfree) = NULL; 7132 int p = 0, s = 0; 7133 int err; 7134 7135 /* Refuse MP_PRIVATE or writable mappings */ 7136 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7137 !(vma->vm_flags & VM_MAYSHARE)) 7138 return -EPERM; 7139 7140 subbuf_order = cpu_buffer->buffer->subbuf_order; 7141 subbuf_pages = 1 << subbuf_order; 7142 7143 if (subbuf_order && pgoff % subbuf_pages) 7144 return -EINVAL; 7145 7146 /* 7147 * Make sure the mapping cannot become writable later. Also tell the VM 7148 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 
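	 * Clearing VM_MAYWRITE below also makes a later mprotect(PROT_WRITE)
	 * on this mapping fail.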
7149 */ 7150 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7151 VM_MAYWRITE); 7152 7153 lockdep_assert_held(&cpu_buffer->mapping_lock); 7154 7155 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7156 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7157 if (nr_pages <= pgoff) 7158 return -EINVAL; 7159 7160 nr_pages -= pgoff; 7161 7162 nr_vma_pages = vma_pages(vma); 7163 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7164 return -EINVAL; 7165 7166 nr_pages = nr_vma_pages; 7167 7168 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7169 if (!pages) 7170 return -ENOMEM; 7171 7172 if (!pgoff) { 7173 unsigned long meta_page_padding; 7174 7175 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7176 7177 /* 7178 * Pad with the zero-page to align the meta-page with the 7179 * sub-buffers. 7180 */ 7181 meta_page_padding = subbuf_pages - 1; 7182 while (meta_page_padding-- && p < nr_pages) { 7183 unsigned long __maybe_unused zero_addr = 7184 vma->vm_start + (PAGE_SIZE * p); 7185 7186 pages[p++] = ZERO_PAGE(zero_addr); 7187 } 7188 } else { 7189 /* Skip the meta-page */ 7190 pgoff -= subbuf_pages; 7191 7192 s += pgoff / subbuf_pages; 7193 } 7194 7195 while (p < nr_pages) { 7196 struct page *page; 7197 int off = 0; 7198 7199 if (WARN_ON_ONCE(s >= nr_subbufs)) 7200 return -EINVAL; 7201 7202 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7203 7204 for (; off < (1 << (subbuf_order)); off++, page++) { 7205 if (p >= nr_pages) 7206 break; 7207 7208 pages[p++] = page; 7209 } 7210 s++; 7211 } 7212 7213 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7214 7215 return err; 7216 } 7217 #else 7218 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7219 struct vm_area_struct *vma) 7220 { 7221 return -EOPNOTSUPP; 7222 } 7223 #endif 7224 7225 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7226 struct vm_area_struct *vma) 7227 { 7228 struct ring_buffer_per_cpu *cpu_buffer; 7229 unsigned long flags, *subbuf_ids; 7230 int err; 7231 7232 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7233 return -EINVAL; 7234 7235 cpu_buffer = buffer->buffers[cpu]; 7236 7237 guard(mutex)(&cpu_buffer->mapping_lock); 7238 7239 if (cpu_buffer->user_mapped) { 7240 err = __rb_map_vma(cpu_buffer, vma); 7241 if (!err) 7242 err = __rb_inc_dec_mapped(cpu_buffer, true); 7243 return err; 7244 } 7245 7246 /* prevent another thread from changing buffer/sub-buffer sizes */ 7247 guard(mutex)(&buffer->mutex); 7248 7249 err = rb_alloc_meta_page(cpu_buffer); 7250 if (err) 7251 return err; 7252 7253 /* subbuf_ids include the reader while nr_pages does not */ 7254 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7255 if (!subbuf_ids) { 7256 rb_free_meta_page(cpu_buffer); 7257 return -ENOMEM; 7258 } 7259 7260 atomic_inc(&cpu_buffer->resize_disabled); 7261 7262 /* 7263 * Lock all readers to block any subbuf swap until the subbuf IDs are 7264 * assigned. 
7265 */ 7266 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7267 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7268 7269 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7270 7271 err = __rb_map_vma(cpu_buffer, vma); 7272 if (!err) { 7273 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7274 /* This is the first time it is mapped by user */ 7275 cpu_buffer->mapped++; 7276 cpu_buffer->user_mapped = 1; 7277 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7278 } else { 7279 kfree(cpu_buffer->subbuf_ids); 7280 cpu_buffer->subbuf_ids = NULL; 7281 rb_free_meta_page(cpu_buffer); 7282 atomic_dec(&cpu_buffer->resize_disabled); 7283 } 7284 7285 return err; 7286 } 7287 7288 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7289 { 7290 struct ring_buffer_per_cpu *cpu_buffer; 7291 unsigned long flags; 7292 7293 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7294 return -EINVAL; 7295 7296 cpu_buffer = buffer->buffers[cpu]; 7297 7298 guard(mutex)(&cpu_buffer->mapping_lock); 7299 7300 if (!cpu_buffer->user_mapped) { 7301 return -ENODEV; 7302 } else if (cpu_buffer->user_mapped > 1) { 7303 __rb_inc_dec_mapped(cpu_buffer, false); 7304 return 0; 7305 } 7306 7307 guard(mutex)(&buffer->mutex); 7308 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7309 7310 /* This is the last user space mapping */ 7311 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7312 cpu_buffer->mapped--; 7313 cpu_buffer->user_mapped = 0; 7314 7315 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7316 7317 kfree(cpu_buffer->subbuf_ids); 7318 cpu_buffer->subbuf_ids = NULL; 7319 rb_free_meta_page(cpu_buffer); 7320 atomic_dec(&cpu_buffer->resize_disabled); 7321 7322 return 0; 7323 } 7324 7325 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7326 { 7327 struct ring_buffer_per_cpu *cpu_buffer; 7328 struct buffer_page *reader; 7329 unsigned long missed_events; 7330 unsigned long reader_size; 7331 unsigned long flags; 7332 7333 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7334 if (IS_ERR(cpu_buffer)) 7335 return (int)PTR_ERR(cpu_buffer); 7336 7337 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7338 7339 consume: 7340 if (rb_per_cpu_empty(cpu_buffer)) 7341 goto out; 7342 7343 reader_size = rb_page_size(cpu_buffer->reader_page); 7344 7345 /* 7346 * There are data to be read on the current reader page, we can 7347 * return to the caller. But before that, we assume the latter will read 7348 * everything. Let's update the kernel reader accordingly. 7349 */ 7350 if (cpu_buffer->reader_page->read < reader_size) { 7351 while (cpu_buffer->reader_page->read < reader_size) 7352 rb_advance_reader(cpu_buffer); 7353 goto out; 7354 } 7355 7356 /* Did the reader catch up with the writer? */ 7357 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 7358 goto out; 7359 7360 reader = rb_get_reader_page(cpu_buffer); 7361 if (WARN_ON(!reader)) 7362 goto out; 7363 7364 /* Check if any events were dropped */ 7365 missed_events = cpu_buffer->lost_events; 7366 7367 if (missed_events) { 7368 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7369 struct buffer_data_page *bpage = reader->page; 7370 unsigned int commit; 7371 /* 7372 * Use the real_end for the data size, 7373 * This gives us a chance to store the lost events 7374 * on the page. 7375 */ 7376 if (reader->real_end) 7377 local_set(&bpage->commit, reader->real_end); 7378 /* 7379 * If there is room at the end of the page to save the 7380 * missed events, then record it there. 
7381 */ 7382 commit = rb_page_size(reader); 7383 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7384 memcpy(&bpage->data[commit], &missed_events, 7385 sizeof(missed_events)); 7386 local_add(RB_MISSED_STORED, &bpage->commit); 7387 } 7388 local_add(RB_MISSED_EVENTS, &bpage->commit); 7389 } else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page, 7390 "Reader on commit with %ld missed events", 7391 missed_events)) { 7392 /* 7393 * There shouldn't be any missed events if the tail_page 7394 * is on the reader page. But if the tail page is not on the 7395 * reader page and the commit_page is, that would mean that 7396 * there's a commit_overrun (an interrupt preempted an 7397 * addition of an event and then filled the buffer 7398 * with new events). In this case it's not an 7399 * error, but it should still be reported. 7400 * 7401 * TODO: Add missed events to the page for user space to know. 7402 */ 7403 pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n", 7404 cpu, missed_events, cpu_buffer->reader_page->page->time_stamp); 7405 } 7406 } 7407 7408 cpu_buffer->lost_events = 0; 7409 7410 goto consume; 7411 7412 out: 7413 /* Some archs do not have data cache coherency between kernel and user-space */ 7414 flush_kernel_vmap_range(cpu_buffer->reader_page->page, 7415 buffer->subbuf_size + BUF_PAGE_HDR_SIZE); 7416 7417 rb_update_meta_page(cpu_buffer); 7418 7419 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7420 rb_put_mapped_buffer(cpu_buffer); 7421 7422 return 0; 7423 } 7424 7425 /* 7426 * We only allocate new buffers, never free them if the CPU goes down. 7427 * If we were to free the buffer, then the user would lose any trace that was in 7428 * the buffer. 7429 */ 7430 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7431 { 7432 struct trace_buffer *buffer; 7433 long nr_pages_same; 7434 int cpu_i; 7435 unsigned long nr_pages; 7436 7437 buffer = container_of(node, struct trace_buffer, node); 7438 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7439 return 0; 7440 7441 nr_pages = 0; 7442 nr_pages_same = 1; 7443 /* check if all cpu sizes are same */ 7444 for_each_buffer_cpu(buffer, cpu_i) { 7445 /* fill in the size from first enabled cpu */ 7446 if (nr_pages == 0) 7447 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7448 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7449 nr_pages_same = 0; 7450 break; 7451 } 7452 } 7453 /* allocate minimum pages, user can later expand it */ 7454 if (!nr_pages_same) 7455 nr_pages = 2; 7456 buffer->buffers[cpu] = 7457 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7458 if (!buffer->buffers[cpu]) { 7459 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7460 cpu); 7461 return -ENOMEM; 7462 } 7463 smp_wmb(); 7464 cpumask_set_cpu(cpu, buffer->cpumask); 7465 return 0; 7466 } 7467 7468 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7469 /* 7470 * This is a basic integrity check of the ring buffer. 7471 * Late in the boot cycle this test will run when configured in. 7472 * It will kick off a thread per CPU that will go into a loop 7473 * writing to the per cpu ring buffer various sizes of data. 7474 * Some of the data will be large items, some small. 7475 * 7476 * Another thread is created that goes into a spin, sending out 7477 * IPIs to the other CPUs to also write into the ring buffer. 7478 * this is to test the nesting ability of the buffer. 7479 * 7480 * Basic stats are recorded and reported. 
If something in the 7481 * ring buffer should happen that's not expected, a big warning 7482 * is displayed and all ring buffers are disabled. 7483 */ 7484 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7485 7486 struct rb_test_data { 7487 struct trace_buffer *buffer; 7488 unsigned long events; 7489 unsigned long bytes_written; 7490 unsigned long bytes_alloc; 7491 unsigned long bytes_dropped; 7492 unsigned long events_nested; 7493 unsigned long bytes_written_nested; 7494 unsigned long bytes_alloc_nested; 7495 unsigned long bytes_dropped_nested; 7496 int min_size_nested; 7497 int max_size_nested; 7498 int max_size; 7499 int min_size; 7500 int cpu; 7501 int cnt; 7502 }; 7503 7504 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7505 7506 /* 1 meg per cpu */ 7507 #define RB_TEST_BUFFER_SIZE 1048576 7508 7509 static char rb_string[] __initdata = 7510 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7511 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7512 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7513 7514 static bool rb_test_started __initdata; 7515 7516 struct rb_item { 7517 int size; 7518 char str[]; 7519 }; 7520 7521 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7522 { 7523 struct ring_buffer_event *event; 7524 struct rb_item *item; 7525 bool started; 7526 int event_len; 7527 int size; 7528 int len; 7529 int cnt; 7530 7531 /* Have nested writes different that what is written */ 7532 cnt = data->cnt + (nested ? 27 : 0); 7533 7534 /* Multiply cnt by ~e, to make some unique increment */ 7535 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7536 7537 len = size + sizeof(struct rb_item); 7538 7539 started = rb_test_started; 7540 /* read rb_test_started before checking buffer enabled */ 7541 smp_rmb(); 7542 7543 event = ring_buffer_lock_reserve(data->buffer, len); 7544 if (!event) { 7545 /* Ignore dropped events before test starts. 
		 */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show buffer is enabled before setting rb_test_started.
	 * Yes there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events.
We care about events dropped after 7676 * the threads see that the buffer is active. 7677 */ 7678 smp_wmb(); 7679 rb_test_started = true; 7680 7681 set_current_state(TASK_INTERRUPTIBLE); 7682 /* Just run for 10 seconds */ 7683 schedule_timeout(10 * HZ); 7684 7685 kthread_stop(rb_hammer); 7686 7687 out_free: 7688 for_each_online_cpu(cpu) { 7689 if (!rb_threads[cpu]) 7690 break; 7691 kthread_stop(rb_threads[cpu]); 7692 } 7693 if (ret) { 7694 ring_buffer_free(buffer); 7695 return ret; 7696 } 7697 7698 /* Report! */ 7699 pr_info("finished\n"); 7700 for_each_online_cpu(cpu) { 7701 struct ring_buffer_event *event; 7702 struct rb_test_data *data = &rb_data[cpu]; 7703 struct rb_item *item; 7704 unsigned long total_events; 7705 unsigned long total_dropped; 7706 unsigned long total_written; 7707 unsigned long total_alloc; 7708 unsigned long total_read = 0; 7709 unsigned long total_size = 0; 7710 unsigned long total_len = 0; 7711 unsigned long total_lost = 0; 7712 unsigned long lost; 7713 int big_event_size; 7714 int small_event_size; 7715 7716 ret = -1; 7717 7718 total_events = data->events + data->events_nested; 7719 total_written = data->bytes_written + data->bytes_written_nested; 7720 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7721 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7722 7723 big_event_size = data->max_size + data->max_size_nested; 7724 small_event_size = data->min_size + data->min_size_nested; 7725 7726 pr_info("CPU %d:\n", cpu); 7727 pr_info(" events: %ld\n", total_events); 7728 pr_info(" dropped bytes: %ld\n", total_dropped); 7729 pr_info(" alloced bytes: %ld\n", total_alloc); 7730 pr_info(" written bytes: %ld\n", total_written); 7731 pr_info(" biggest event: %d\n", big_event_size); 7732 pr_info(" smallest event: %d\n", small_event_size); 7733 7734 if (RB_WARN_ON(buffer, total_dropped)) 7735 break; 7736 7737 ret = 0; 7738 7739 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7740 total_lost += lost; 7741 item = ring_buffer_event_data(event); 7742 total_len += ring_buffer_event_length(event); 7743 total_size += item->size + sizeof(struct rb_item); 7744 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7745 pr_info("FAILED!\n"); 7746 pr_info("buffer had: %.*s\n", item->size, item->str); 7747 pr_info("expected: %.*s\n", item->size, rb_string); 7748 RB_WARN_ON(buffer, 1); 7749 ret = -1; 7750 break; 7751 } 7752 total_read++; 7753 } 7754 if (ret) 7755 break; 7756 7757 ret = -1; 7758 7759 pr_info(" read events: %ld\n", total_read); 7760 pr_info(" lost events: %ld\n", total_lost); 7761 pr_info(" total events: %ld\n", total_lost + total_read); 7762 pr_info(" recorded len bytes: %ld\n", total_len); 7763 pr_info(" recorded size bytes: %ld\n", total_size); 7764 if (total_lost) { 7765 pr_info(" With dropped events, record len and size may not match\n" 7766 " alloced and written from above\n"); 7767 } else { 7768 if (RB_WARN_ON(buffer, total_len != total_alloc || 7769 total_size != total_written)) 7770 break; 7771 } 7772 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7773 break; 7774 7775 ret = 0; 7776 } 7777 if (!ret) 7778 pr_info("Ring buffer PASSED!\n"); 7779 7780 ring_buffer_free(buffer); 7781 return 0; 7782 } 7783 7784 late_initcall(test_ringbuffer); 7785 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7786