// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>
#include <asm/setup.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

#define RING_BUFFER_META_MAGIC	0xBADFEED

struct ring_buffer_meta {
	int		magic;
	int		struct_sizes;
	unsigned long	total_size;
	unsigned long	buffers_offset;
};

struct ring_buffer_cpu_meta {
	unsigned long	first_buffer;
	unsigned long	head_buffer;
	unsigned long	commit_buffer;
	__u32		subbuf_size;
	__u32		nr_subbufs;
	int		buffers[];
};

/*
 * The ring buffer header is special. We must keep it up to date manually.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}
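/*
 * A worked example of the compressed entry header above (illustrative
 * numbers): a data event carrying a 12 byte payload, 100ns after the
 * previous event, is encoded with type_len = 3 (3 * RB_ALIGNMENT = 12
 * bytes) and time_delta = 100, with the payload starting at array[0].
 * Payloads larger than RB_MAX_SMALL_DATA instead store their length in
 * array[0] with the payload starting at array[1] (see
 * rb_event_data_length() and rb_event_data() below).
 */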
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do what
 * ever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |    New     +---+   +---+   +---+
 *      |   Reader------^               |
 *      |    page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}
/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)	\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_MASK		(3 << 30)
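/*
 * A worked example of the reconstruction done by rb_event_time_stamp()
 * above (illustrative numbers): an event with array[0] == 2 and
 * time_delta == 5 yields ts = (2 << 27) + 5 = 268435461, i.e. the low
 * 27 bits come from time_delta and everything above bit 26 comes from
 * array[0].
 */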
struct buffer_data_page {
	u64		time_stamp;	/* page time stamp */
	local_t		commit;		/* write committed index */
	unsigned char	data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id:30;		/* ID for external mapping */
	u32		 range:1;	/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * For best performance, allocate cpu buffer data cache line sized
 * and per CPU.
 */
#define alloc_cpu_buffer(cpu) (struct ring_buffer_per_cpu *)		\
	kzalloc_node(ALIGN(sizeof(struct ring_buffer_per_cpu),		\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

#define alloc_cpu_page(cpu) (struct buffer_page *)			\
	kzalloc_node(ALIGN(sizeof(struct buffer_page),			\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

static struct buffer_data_page *alloc_cpu_data(int cpu, int order)
{
	struct buffer_data_page *dpage;
	struct page *page;
	gfp_t mflags;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_COMP | __GFP_ZERO;

	page = alloc_pages_node(cpu_to_node(cpu), mflags, order);
	if (!page)
		return NULL;

	dpage = page_address(page);
	rb_init_page(dpage);

	return dpage;
}
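/*
 * To illustrate the write/updater split described above RB_WRITE_MASK
 * (illustrative numbers): if bpage->write holds 0x14 and an updater does
 * local_add_return(RB_WRITE_INTCNT, &bpage->write), the result is
 * 0x100014. Bits 0-19 (0x14, recovered with RB_WRITE_MASK) are still the
 * write index, while the bits above bit 19 count one in-progress updater.
 */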
/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	atomic_t			seq;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE		= 0,
	RB_ADD_STAMP_EXTEND		= BIT(1),
	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
	RB_ADD_STAMP_FORCE		= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	/* pages generation counter, incremented when the list changes */
	unsigned long			cnt;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	unsigned int			user_mapped;	/* user space mapping */
	struct mutex			mapping_lock;
	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;
	struct ring_buffer_cpu_meta	*ring_meta;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages;	/* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};
struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned long			range_addr_start;
	unsigned long			range_addr_end;

	struct ring_buffer_meta		*meta;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that was passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif
/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned;
 * otherwise the current time is returned. Neither of those last two
 * cases should ever really happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}
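/*
 * A concrete case for rb_fix_abs_ts() above: if the previous full time
 * stamp had any of the TS_MSB bits (bits 59-63) set, those bits are OR'd
 * back into the 59-bit absolute stamp taken from the buffer. If the
 * result is still smaller than the saved stamp, the 59-bit part must have
 * rolled over since the MSBs were saved, so one extra 1 << 59 is added to
 * land in the next 59-bit epoch.
 */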
/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}
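/*
 * A worked example of the full_hit() watermark check (illustrative
 * numbers): with nr_pages == 100 and a waiter asking for full == 75
 * (75%), 74 dirty pages plus the one added for the writer's sub-buffer
 * gives dirty == 75, and 75 * 100 >= 75 * 100 is true, so the waiter is
 * satisfied. With only 70 dirty pages, 71 * 100 < 75 * 100 and the
 * waiter keeps waiting.
 */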
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always wait for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worse
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

struct rb_wait_data {
	struct rb_irq_work		*irq_work;
	int				seq;
};

/*
 * The default wait condition for ring_buffer_wait() is just to exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}
/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}
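/*
 * A minimal usage sketch for ring_buffer_wait(): the caller below is
 * hypothetical and kept out of the build; it simply blocks until the
 * given CPU buffer has any data, using the default wake-once condition.
 */
#if 0
static int example_wait_for_data(struct trace_buffer *buffer, int cpu)
{
	/* full == 0: no watermark; cond == NULL: return on the first wake up */
	return ring_buffer_wait(buffer, cpu, 0, NULL, NULL);
}
#endif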
/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, so they
 * only need to worry about interrupts. Reads can happen
 * on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}
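/*
 * Because buffer_page structures are cache line aligned, the two low bits
 * of a list pointer are always free for the flags above. As an
 * illustration with a made-up address: if &page->list sits at
 * 0xffff888012345600, a next pointer stored as 0xffff888012345601
 * (RB_PAGE_HEAD set) says "the page this points to is the head page",
 * and rb_list_head() masks it back to 0xffff888012345600 before it is
 * dereferenced.
 */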
/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static inline void rb_dec_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.prev);

	*bpage = list_entry(p, struct buffer_page, list);
}
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}
static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_cpu_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_meta for a given @cpu.
 */
static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu)
{
	int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
	struct ring_buffer_cpu_meta *meta;
	struct ring_buffer_meta *bmeta;
	unsigned long ptr;
	int nr_subbufs;

	bmeta = buffer->meta;
	if (!bmeta)
		return NULL;

	ptr = (unsigned long)bmeta + bmeta->buffers_offset;
	meta = (struct ring_buffer_cpu_meta *)ptr;

	/* When nr_pages passed in is zero, the first meta has already been initialized */
	if (!nr_pages) {
		nr_subbufs = meta->nr_subbufs;
	} else {
		/* Include the reader page */
		nr_subbufs = nr_pages + 1;
	}

	/*
	 * The first chunk may not be subbuffer aligned, whereas
	 * the rest of the chunks are.
	 */
	if (cpu) {
		ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
		ptr += subbuf_size * nr_subbufs;

		/* We can use multiplication to find chunks greater than 1 */
		if (cpu > 1) {
			unsigned long size;
			unsigned long p;

			/* Save the beginning of this CPU chunk */
			p = ptr;
			ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
			ptr += subbuf_size * nr_subbufs;

			/* Now all chunks after this are the same size */
			size = ptr - p;
			ptr += size * (cpu - 2);
		}
	}
	return (void *)ptr;
}

/* Return the start of subbufs given the meta pointer */
static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta)
{
	int subbuf_size = meta->subbuf_size;
	unsigned long ptr;

	ptr = (unsigned long)meta;
	ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs);

	return (void *)ptr;
}

/*
 * Return a specific sub-buffer for a given @cpu defined by @idx.
 */
static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx)
{
	struct ring_buffer_cpu_meta *meta;
	unsigned long ptr;
	int subbuf_size;

	meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu);
	if (!meta)
		return NULL;

	if (WARN_ON_ONCE(idx >= meta->nr_subbufs))
		return NULL;

	subbuf_size = meta->subbuf_size;

	/* Map this buffer to the order that's in meta->buffers[] */
	idx = meta->buffers[idx];

	ptr = (unsigned long)rb_subbufs_from_meta(meta);

	ptr += subbuf_size * idx;
	if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end)
		return NULL;

	return (void *)ptr;
}
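/*
 * To make the chunk layout above concrete (illustrative numbers only):
 * with a 4096 byte subbuffer size and nr_subbufs == 9 (eight data pages
 * plus the reader page), a CPU chunk starts with its
 * ring_buffer_cpu_meta followed by the 9-entry int array,
 * rb_range_align_subbuf() rounds that header up to the next 4096 byte
 * boundary, and the nine subbuffers follow back to back, so the next
 * CPU's chunk begins 9 * 4096 bytes after that boundary.
 */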
/*
 * See if the existing memory contains a valid meta section.
 * If so, use that, otherwise initialize it.
 */
static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size)
{
	unsigned long ptr = buffer->range_addr_start;
	struct ring_buffer_meta *bmeta;
	unsigned long total_size;
	int struct_sizes;

	bmeta = (struct ring_buffer_meta *)ptr;
	buffer->meta = bmeta;

	total_size = buffer->range_addr_end - buffer->range_addr_start;

	struct_sizes = sizeof(struct ring_buffer_cpu_meta);
	struct_sizes |= sizeof(*bmeta) << 16;

	/* The first buffer will start word size after the meta page */
	ptr += sizeof(*bmeta);
	ptr = ALIGN(ptr, sizeof(long));
	ptr += scratch_size;

	if (bmeta->magic != RING_BUFFER_META_MAGIC) {
		pr_info("Ring buffer boot meta mismatch of magic\n");
		goto init;
	}

	if (bmeta->struct_sizes != struct_sizes) {
		pr_info("Ring buffer boot meta mismatch of struct size\n");
		goto init;
	}

	if (bmeta->total_size != total_size) {
		pr_info("Ring buffer boot meta mismatch of total size\n");
		goto init;
	}

	if (bmeta->buffers_offset > bmeta->total_size) {
		pr_info("Ring buffer boot meta mismatch of offset outside of total size\n");
		goto init;
	}

	if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) {
		pr_info("Ring buffer boot meta mismatch of first buffer offset\n");
		goto init;
	}

	return true;

 init:
	bmeta->magic = RING_BUFFER_META_MAGIC;
	bmeta->struct_sizes = struct_sizes;
	bmeta->total_size = total_size;
	bmeta->buffers_offset = (void *)ptr - (void *)bmeta;

	/* Zero out the scratch pad */
	memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta));

	return false;
}

/*
 * See if the existing memory contains valid ring buffer data.
 * As the previous kernel must be the same as this kernel, all
 * the calculations (size of buffers and number of buffers)
 * must be the same.
 */
static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu,
			      struct trace_buffer *buffer, int nr_pages,
			      unsigned long *subbuf_mask)
{
	int subbuf_size = PAGE_SIZE;
	struct buffer_data_page *subbuf;
	unsigned long buffers_start;
	unsigned long buffers_end;
	int i;

	if (!subbuf_mask)
		return false;

	buffers_start = meta->first_buffer;
	buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs);

	/* Are the head and commit buffers within the range of buffers? */
	if (meta->head_buffer < buffers_start ||
	    meta->head_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu);
		return false;
	}

	if (meta->commit_buffer < buffers_start ||
	    meta->commit_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu);
		return false;
	}

	subbuf = rb_subbufs_from_meta(meta);

	bitmap_clear(subbuf_mask, 0, meta->nr_subbufs);

	/* Do the meta buffers and the subbufs themselves have correct data? */
	for (i = 0; i < meta->nr_subbufs; i++) {
		if (meta->buffers[i] < 0 ||
		    meta->buffers[i] >= meta->nr_subbufs) {
			pr_info("Ring buffer boot meta [%d] array out of range\n", cpu);
			return false;
		}

		if ((unsigned)local_read(&subbuf->commit) > subbuf_size) {
			pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu);
			return false;
		}

		if (test_bit(meta->buffers[i], subbuf_mask)) {
			pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu);
			return false;
		}

		set_bit(meta->buffers[i], subbuf_mask);
		subbuf = (void *)subbuf + subbuf_size;
	}

	return true;
}

static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf);

static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu,
			       unsigned long long *timestamp, u64 *delta_ptr)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int events = 0;
	int e;

	*delta_ptr = 0;
	*timestamp = 0;

	ts = dpage->time_stamp;

	for (e = 0; e < tail; e += rb_event_length(event)) {

		event = (struct ring_buffer_event *)(dpage->data + e);

		switch (event->type_len) {

		case RINGBUF_TYPE_TIME_EXTEND:
			delta = rb_event_time_stamp(event);
			ts += delta;
			break;

		case RINGBUF_TYPE_TIME_STAMP:
			delta = rb_event_time_stamp(event);
			delta = rb_fix_abs_ts(delta, ts);
			if (delta < ts) {
				*delta_ptr = delta;
				*timestamp = ts;
				return -1;
			}
			ts = delta;
			break;

		case RINGBUF_TYPE_PADDING:
			if (event->time_delta == 1)
				break;
			fallthrough;
		case RINGBUF_TYPE_DATA:
			events++;
			ts += event->time_delta;
			break;

		default:
			return -1;
		}
	}
	*timestamp = ts;
	return events;
}

static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu)
{
	unsigned long long ts;
	u64 delta;
	int tail;

	tail = local_read(&dpage->commit);
	return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta);
}

/* If the meta data has been validated, now validate the events */
static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	struct buffer_page *head_page, *orig_head;
	unsigned long entry_bytes = 0;
	unsigned long entries = 0;
	int ret;
	u64 ts;
	int i;

	if (!meta || !meta->head_buffer)
		return;

	/* Do the reader page first */
	ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu);
	if (ret < 0) {
		pr_info("Ring buffer reader page is invalid\n");
		goto invalid;
	}
	entries += ret;
	entry_bytes += local_read(&cpu_buffer->reader_page->page->commit);
	local_set(&cpu_buffer->reader_page->entries, ret);

	orig_head = head_page = cpu_buffer->head_page;
	ts = head_page->page->time_stamp;

	/*
	 * Try to rewind the head so that we can read the pages which were
	 * already read in the previous boot.
	 */
	if (head_page == cpu_buffer->tail_page)
		goto skip_rewind;

	rb_dec_page(&head_page);
	for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) {

		/* Rewind until tail (writer) page. */
		if (head_page == cpu_buffer->tail_page)
			break;

		/* Ensure the page has older data than head. */
*/ 1949 if (ts < head_page->page->time_stamp) 1950 break; 1951 1952 ts = head_page->page->time_stamp; 1953 /* Ensure the page has correct timestamp and some data. */ 1954 if (!ts || rb_page_commit(head_page) == 0) 1955 break; 1956 1957 /* Stop rewind if the page is invalid. */ 1958 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1959 if (ret < 0) 1960 break; 1961 1962 /* Recover the number of entries and update stats. */ 1963 local_set(&head_page->entries, ret); 1964 if (ret) 1965 local_inc(&cpu_buffer->pages_touched); 1966 entries += ret; 1967 entry_bytes += rb_page_commit(head_page); 1968 } 1969 if (i) 1970 pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i); 1971 1972 /* The last rewound page must be skipped. */ 1973 if (head_page != orig_head) 1974 rb_inc_page(&head_page); 1975 1976 /* 1977 * If the ring buffer was rewound, then inject the reader page 1978 * into the location just before the original head page. 1979 */ 1980 if (head_page != orig_head) { 1981 struct buffer_page *bpage = orig_head; 1982 1983 rb_dec_page(&bpage); 1984 /* 1985 * Insert the reader_page before the original head page. 1986 * Since the list encode RB_PAGE flags, general list 1987 * operations should be avoided. 1988 */ 1989 cpu_buffer->reader_page->list.next = &orig_head->list; 1990 cpu_buffer->reader_page->list.prev = orig_head->list.prev; 1991 orig_head->list.prev = &cpu_buffer->reader_page->list; 1992 bpage->list.next = &cpu_buffer->reader_page->list; 1993 1994 /* Make the head_page the reader page */ 1995 cpu_buffer->reader_page = head_page; 1996 bpage = head_page; 1997 rb_inc_page(&head_page); 1998 head_page->list.prev = bpage->list.prev; 1999 rb_dec_page(&bpage); 2000 bpage->list.next = &head_page->list; 2001 rb_set_list_to_head(&bpage->list); 2002 cpu_buffer->pages = &head_page->list; 2003 2004 cpu_buffer->head_page = head_page; 2005 meta->head_buffer = (unsigned long)head_page->page; 2006 2007 /* Reset all the indexes */ 2008 bpage = cpu_buffer->reader_page; 2009 meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page); 2010 bpage->id = 0; 2011 2012 for (i = 1, bpage = head_page; i < meta->nr_subbufs; 2013 i++, rb_inc_page(&bpage)) { 2014 meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page); 2015 bpage->id = i; 2016 } 2017 2018 /* We'll restart verifying from orig_head */ 2019 head_page = orig_head; 2020 } 2021 2022 skip_rewind: 2023 /* If the commit_buffer is the reader page, update the commit page */ 2024 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 2025 cpu_buffer->commit_page = cpu_buffer->reader_page; 2026 /* Nothing more to do, the only page is the reader page */ 2027 goto done; 2028 } 2029 2030 /* Iterate until finding the commit page */ 2031 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 2032 2033 /* Reader page has already been done */ 2034 if (head_page == cpu_buffer->reader_page) 2035 continue; 2036 2037 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 2038 if (ret < 0) { 2039 pr_info("Ring buffer meta [%d] invalid buffer page\n", 2040 cpu_buffer->cpu); 2041 goto invalid; 2042 } 2043 2044 /* If the buffer has content, update pages_touched */ 2045 if (ret) 2046 local_inc(&cpu_buffer->pages_touched); 2047 2048 entries += ret; 2049 entry_bytes += local_read(&head_page->page->commit); 2050 local_set(&cpu_buffer->head_page->entries, ret); 2051 2052 if (head_page == cpu_buffer->commit_page) 2053 break; 2054 } 2055 2056 if (head_page != cpu_buffer->commit_page) { 2057 pr_info("Ring buffer meta [%d] commit 
page not found\n", 2058 cpu_buffer->cpu); 2059 goto invalid; 2060 } 2061 done: 2062 local_set(&cpu_buffer->entries, entries); 2063 local_set(&cpu_buffer->entries_bytes, entry_bytes); 2064 2065 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 2066 return; 2067 2068 invalid: 2069 /* The content of the buffers are invalid, reset the meta data */ 2070 meta->head_buffer = 0; 2071 meta->commit_buffer = 0; 2072 2073 /* Reset the reader page */ 2074 local_set(&cpu_buffer->reader_page->entries, 0); 2075 local_set(&cpu_buffer->reader_page->page->commit, 0); 2076 2077 /* Reset all the subbuffers */ 2078 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 2079 local_set(&head_page->entries, 0); 2080 local_set(&head_page->page->commit, 0); 2081 } 2082 } 2083 2084 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size) 2085 { 2086 struct ring_buffer_cpu_meta *meta; 2087 unsigned long *subbuf_mask; 2088 unsigned long delta; 2089 void *subbuf; 2090 bool valid = false; 2091 int cpu; 2092 int i; 2093 2094 /* Create a mask to test the subbuf array */ 2095 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 2096 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 2097 2098 if (rb_meta_init(buffer, scratch_size)) 2099 valid = true; 2100 2101 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 2102 void *next_meta; 2103 2104 meta = rb_range_meta(buffer, nr_pages, cpu); 2105 2106 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 2107 /* Make the mappings match the current address */ 2108 subbuf = rb_subbufs_from_meta(meta); 2109 delta = (unsigned long)subbuf - meta->first_buffer; 2110 meta->first_buffer += delta; 2111 meta->head_buffer += delta; 2112 meta->commit_buffer += delta; 2113 continue; 2114 } 2115 2116 if (cpu < nr_cpu_ids - 1) 2117 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 2118 else 2119 next_meta = (void *)buffer->range_addr_end; 2120 2121 memset(meta, 0, next_meta - (void *)meta); 2122 2123 meta->nr_subbufs = nr_pages + 1; 2124 meta->subbuf_size = PAGE_SIZE; 2125 2126 subbuf = rb_subbufs_from_meta(meta); 2127 2128 meta->first_buffer = (unsigned long)subbuf; 2129 2130 /* 2131 * The buffers[] array holds the order of the sub-buffers 2132 * that are after the meta data. The sub-buffers may 2133 * be swapped out when read and inserted into a different 2134 * location of the ring buffer. Although their addresses 2135 * remain the same, the buffers[] array contains the 2136 * index into the sub-buffers holding their actual order. 
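 *
 * As a purely hypothetical example with nr_subbufs == 4, the array
 * could end up looking like:
 *
 *	meta->buffers[] = { 2, 0, 3, 1 }
 *
 * meaning that logical slot 0 of the ring currently lives in physical
 * sub-buffer 2, slot 1 in physical sub-buffer 0, and so on. On a fresh
 * boot the loop below starts it off as the identity mapping
 * { 0, 1, 2, 3 }.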
2137 */ 2138 for (i = 0; i < meta->nr_subbufs; i++) { 2139 meta->buffers[i] = i; 2140 rb_init_page(subbuf); 2141 subbuf += meta->subbuf_size; 2142 } 2143 } 2144 bitmap_free(subbuf_mask); 2145 } 2146 2147 static void *rbm_start(struct seq_file *m, loff_t *pos) 2148 { 2149 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2150 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2151 unsigned long val; 2152 2153 if (!meta) 2154 return NULL; 2155 2156 if (*pos > meta->nr_subbufs) 2157 return NULL; 2158 2159 val = *pos; 2160 val++; 2161 2162 return (void *)val; 2163 } 2164 2165 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2166 { 2167 (*pos)++; 2168 2169 return rbm_start(m, pos); 2170 } 2171 2172 static int rbm_show(struct seq_file *m, void *v) 2173 { 2174 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2175 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2176 unsigned long val = (unsigned long)v; 2177 2178 if (val == 1) { 2179 seq_printf(m, "head_buffer: %d\n", 2180 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2181 seq_printf(m, "commit_buffer: %d\n", 2182 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2183 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2184 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2185 return 0; 2186 } 2187 2188 val -= 2; 2189 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2190 2191 return 0; 2192 } 2193 2194 static void rbm_stop(struct seq_file *m, void *p) 2195 { 2196 } 2197 2198 static const struct seq_operations rb_meta_seq_ops = { 2199 .start = rbm_start, 2200 .next = rbm_next, 2201 .show = rbm_show, 2202 .stop = rbm_stop, 2203 }; 2204 2205 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2206 { 2207 struct seq_file *m; 2208 int ret; 2209 2210 ret = seq_open(file, &rb_meta_seq_ops); 2211 if (ret) 2212 return ret; 2213 2214 m = file->private_data; 2215 m->private = buffer->buffers[cpu]; 2216 2217 return 0; 2218 } 2219 2220 /* Map the buffer_pages to the previous head and commit pages */ 2221 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2222 struct buffer_page *bpage) 2223 { 2224 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2225 2226 if (meta->head_buffer == (unsigned long)bpage->page) 2227 cpu_buffer->head_page = bpage; 2228 2229 if (meta->commit_buffer == (unsigned long)bpage->page) { 2230 cpu_buffer->commit_page = bpage; 2231 cpu_buffer->tail_page = bpage; 2232 } 2233 } 2234 2235 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2236 long nr_pages, struct list_head *pages) 2237 { 2238 struct trace_buffer *buffer = cpu_buffer->buffer; 2239 struct ring_buffer_cpu_meta *meta = NULL; 2240 struct buffer_page *bpage, *tmp; 2241 bool user_thread = current->mm != NULL; 2242 long i; 2243 2244 /* 2245 * Check if the available memory is there first. 2246 * Note, si_mem_available() only gives us a rough estimate of available 2247 * memory. It may not be accurate. But we don't care, we just want 2248 * to prevent doing any allocation when it is obvious that it is 2249 * not going to succeed. 2250 */ 2251 i = si_mem_available(); 2252 if (i < nr_pages) 2253 return -ENOMEM; 2254 2255 /* 2256 * If a user thread allocates too much, and si_mem_available() 2257 * reports there's enough memory, even though there is not. 2258 * Make sure the OOM killer kills this thread. 
This can happen 2259 * even with RETRY_MAYFAIL because another task may be doing 2260 * an allocation after this task has taken all memory. 2261 * This is the task the OOM killer needs to take out during this 2262 * loop, even if it was triggered by an allocation somewhere else. 2263 */ 2264 if (user_thread) 2265 set_current_oom_origin(); 2266 2267 if (buffer->range_addr_start) 2268 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2269 2270 for (i = 0; i < nr_pages; i++) { 2271 2272 bpage = alloc_cpu_page(cpu_buffer->cpu); 2273 if (!bpage) 2274 goto free_pages; 2275 2276 rb_check_bpage(cpu_buffer, bpage); 2277 2278 /* 2279 * Append the pages as for mapped buffers we want to keep 2280 * the order 2281 */ 2282 list_add_tail(&bpage->list, pages); 2283 2284 if (meta) { 2285 /* A range was given. Use that for the buffer page */ 2286 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2287 if (!bpage->page) 2288 goto free_pages; 2289 /* If this is valid from a previous boot */ 2290 if (meta->head_buffer) 2291 rb_meta_buffer_update(cpu_buffer, bpage); 2292 bpage->range = 1; 2293 bpage->id = i + 1; 2294 } else { 2295 int order = cpu_buffer->buffer->subbuf_order; 2296 bpage->page = alloc_cpu_data(cpu_buffer->cpu, order); 2297 if (!bpage->page) 2298 goto free_pages; 2299 } 2300 bpage->order = cpu_buffer->buffer->subbuf_order; 2301 2302 if (user_thread && fatal_signal_pending(current)) 2303 goto free_pages; 2304 } 2305 if (user_thread) 2306 clear_current_oom_origin(); 2307 2308 return 0; 2309 2310 free_pages: 2311 list_for_each_entry_safe(bpage, tmp, pages, list) { 2312 list_del_init(&bpage->list); 2313 free_buffer_page(bpage); 2314 } 2315 if (user_thread) 2316 clear_current_oom_origin(); 2317 2318 return -ENOMEM; 2319 } 2320 2321 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2322 unsigned long nr_pages) 2323 { 2324 LIST_HEAD(pages); 2325 2326 WARN_ON(!nr_pages); 2327 2328 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2329 return -ENOMEM; 2330 2331 /* 2332 * The ring buffer page list is a circular list that does not 2333 * start and end with a list head. All page list items point to 2334 * other pages. 
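 *
 * With three pages A, B and C (names are only for illustration) the
 * list looks like:
 *
 *	A -> B -> C -> A
 *
 * cpu_buffer->pages simply points at one of the pages, which is why the
 * temporary list head "pages" is unlinked below with list_del() rather
 * than kept as the anchor of the list.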
2335 */ 2336 cpu_buffer->pages = pages.next; 2337 list_del(&pages); 2338 2339 cpu_buffer->nr_pages = nr_pages; 2340 2341 rb_check_pages(cpu_buffer); 2342 2343 return 0; 2344 } 2345 2346 static struct ring_buffer_per_cpu * 2347 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2348 { 2349 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = 2350 alloc_cpu_buffer(cpu); 2351 struct ring_buffer_cpu_meta *meta; 2352 struct buffer_page *bpage; 2353 int ret; 2354 2355 if (!cpu_buffer) 2356 return NULL; 2357 2358 cpu_buffer->cpu = cpu; 2359 cpu_buffer->buffer = buffer; 2360 raw_spin_lock_init(&cpu_buffer->reader_lock); 2361 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2362 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2363 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2364 init_completion(&cpu_buffer->update_done); 2365 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2366 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2367 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2368 mutex_init(&cpu_buffer->mapping_lock); 2369 2370 bpage = alloc_cpu_page(cpu); 2371 if (!bpage) 2372 return NULL; 2373 2374 rb_check_bpage(cpu_buffer, bpage); 2375 2376 cpu_buffer->reader_page = bpage; 2377 2378 if (buffer->range_addr_start) { 2379 /* 2380 * Range mapped buffers have the same restrictions as memory 2381 * mapped ones do. 2382 */ 2383 cpu_buffer->mapped = 1; 2384 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2385 bpage->page = rb_range_buffer(cpu_buffer, 0); 2386 if (!bpage->page) 2387 goto fail_free_reader; 2388 if (cpu_buffer->ring_meta->head_buffer) 2389 rb_meta_buffer_update(cpu_buffer, bpage); 2390 bpage->range = 1; 2391 } else { 2392 int order = cpu_buffer->buffer->subbuf_order; 2393 bpage->page = alloc_cpu_data(cpu, order); 2394 if (!bpage->page) 2395 goto fail_free_reader; 2396 } 2397 2398 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2399 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2400 2401 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2402 if (ret < 0) 2403 goto fail_free_reader; 2404 2405 rb_meta_validate_events(cpu_buffer); 2406 2407 /* If the boot meta was valid then this has already been updated */ 2408 meta = cpu_buffer->ring_meta; 2409 if (!meta || !meta->head_buffer || 2410 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2411 if (meta && meta->head_buffer && 2412 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2413 pr_warn("Ring buffer meta buffers not all mapped\n"); 2414 if (!cpu_buffer->head_page) 2415 pr_warn(" Missing head_page\n"); 2416 if (!cpu_buffer->commit_page) 2417 pr_warn(" Missing commit_page\n"); 2418 if (!cpu_buffer->tail_page) 2419 pr_warn(" Missing tail_page\n"); 2420 } 2421 2422 cpu_buffer->head_page 2423 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2424 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2425 2426 rb_head_page_activate(cpu_buffer); 2427 2428 if (cpu_buffer->ring_meta) 2429 meta->commit_buffer = meta->head_buffer; 2430 } else { 2431 /* The valid meta buffer still needs to activate the head page */ 2432 rb_head_page_activate(cpu_buffer); 2433 } 2434 2435 return_ptr(cpu_buffer); 2436 2437 fail_free_reader: 2438 free_buffer_page(cpu_buffer->reader_page); 2439 2440 return NULL; 2441 } 2442 2443 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2444 { 2445 struct list_head *head = cpu_buffer->pages; 2446 struct 
buffer_page *bpage, *tmp; 2447 2448 irq_work_sync(&cpu_buffer->irq_work.work); 2449 2450 free_buffer_page(cpu_buffer->reader_page); 2451 2452 if (head) { 2453 rb_head_page_deactivate(cpu_buffer); 2454 2455 list_for_each_entry_safe(bpage, tmp, head, list) { 2456 list_del_init(&bpage->list); 2457 free_buffer_page(bpage); 2458 } 2459 bpage = list_entry(head, struct buffer_page, list); 2460 free_buffer_page(bpage); 2461 } 2462 2463 free_page((unsigned long)cpu_buffer->free_page); 2464 2465 kfree(cpu_buffer); 2466 } 2467 2468 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2469 int order, unsigned long start, 2470 unsigned long end, 2471 unsigned long scratch_size, 2472 struct lock_class_key *key) 2473 { 2474 struct trace_buffer *buffer __free(kfree) = NULL; 2475 long nr_pages; 2476 int subbuf_size; 2477 int bsize; 2478 int cpu; 2479 int ret; 2480 2481 /* keep it in its own cache line */ 2482 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2483 GFP_KERNEL); 2484 if (!buffer) 2485 return NULL; 2486 2487 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2488 return NULL; 2489 2490 buffer->subbuf_order = order; 2491 subbuf_size = (PAGE_SIZE << order); 2492 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2493 2494 /* Max payload is buffer page size - header (8bytes) */ 2495 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2496 2497 buffer->flags = flags; 2498 buffer->clock = trace_clock_local; 2499 buffer->reader_lock_key = key; 2500 2501 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2502 init_waitqueue_head(&buffer->irq_work.waiters); 2503 2504 buffer->cpus = nr_cpu_ids; 2505 2506 bsize = sizeof(void *) * nr_cpu_ids; 2507 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2508 GFP_KERNEL); 2509 if (!buffer->buffers) 2510 goto fail_free_cpumask; 2511 2512 /* If start/end are specified, then that overrides size */ 2513 if (start && end) { 2514 unsigned long buffers_start; 2515 unsigned long ptr; 2516 int n; 2517 2518 /* Make sure that start is word aligned */ 2519 start = ALIGN(start, sizeof(long)); 2520 2521 /* scratch_size needs to be aligned too */ 2522 scratch_size = ALIGN(scratch_size, sizeof(long)); 2523 2524 /* Subtract the buffer meta data and word aligned */ 2525 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2526 buffers_start = ALIGN(buffers_start, sizeof(long)); 2527 buffers_start += scratch_size; 2528 2529 /* Calculate the size for the per CPU data */ 2530 size = end - buffers_start; 2531 size = size / nr_cpu_ids; 2532 2533 /* 2534 * The number of sub-buffers (nr_pages) is determined by the 2535 * total size allocated minus the meta data size. 2536 * Then that is divided by the number of per CPU buffers 2537 * needed, plus account for the integer array index that 2538 * will be appended to the meta data. 
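 *
 * A rough worked example (all numbers are hypothetical): with
 * 4096-byte sub-buffers and about 512 KB of range left for one CPU,
 *
 *	nr_pages = (524288 - sizeof(struct ring_buffer_cpu_meta))
 *			/ (4096 + sizeof(int))
 *		 ~= 127
 *
 * one of which is later taken back as the reader page when nr_pages is
 * decremented further below.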
2539 */ 2540 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2541 (subbuf_size + sizeof(int)); 2542 /* Need at least two pages plus the reader page */ 2543 if (nr_pages < 3) 2544 goto fail_free_buffers; 2545 2546 again: 2547 /* Make sure that the size fits aligned */ 2548 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2549 ptr += sizeof(struct ring_buffer_cpu_meta) + 2550 sizeof(int) * nr_pages; 2551 ptr = ALIGN(ptr, subbuf_size); 2552 ptr += subbuf_size * nr_pages; 2553 } 2554 if (ptr > end) { 2555 if (nr_pages <= 3) 2556 goto fail_free_buffers; 2557 nr_pages--; 2558 goto again; 2559 } 2560 2561 /* nr_pages should not count the reader page */ 2562 nr_pages--; 2563 buffer->range_addr_start = start; 2564 buffer->range_addr_end = end; 2565 2566 rb_range_meta_init(buffer, nr_pages, scratch_size); 2567 } else { 2568 2569 /* need at least two pages */ 2570 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2571 if (nr_pages < 2) 2572 nr_pages = 2; 2573 } 2574 2575 cpu = raw_smp_processor_id(); 2576 cpumask_set_cpu(cpu, buffer->cpumask); 2577 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2578 if (!buffer->buffers[cpu]) 2579 goto fail_free_buffers; 2580 2581 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2582 if (ret < 0) 2583 goto fail_free_buffers; 2584 2585 mutex_init(&buffer->mutex); 2586 2587 return_ptr(buffer); 2588 2589 fail_free_buffers: 2590 for_each_buffer_cpu(buffer, cpu) { 2591 if (buffer->buffers[cpu]) 2592 rb_free_cpu_buffer(buffer->buffers[cpu]); 2593 } 2594 kfree(buffer->buffers); 2595 2596 fail_free_cpumask: 2597 free_cpumask_var(buffer->cpumask); 2598 2599 return NULL; 2600 } 2601 2602 /** 2603 * __ring_buffer_alloc - allocate a new ring_buffer 2604 * @size: the size in bytes per cpu that is needed. 2605 * @flags: attributes to set for the ring buffer. 2606 * @key: ring buffer reader_lock_key. 2607 * 2608 * Currently the only flag that is available is the RB_FL_OVERWRITE 2609 * flag. This flag means that the buffer will overwrite old data 2610 * when the buffer wraps. If this flag is not set, the buffer will 2611 * drop data when the tail hits the head. 2612 */ 2613 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2614 struct lock_class_key *key) 2615 { 2616 /* Default buffer page size - one system page */ 2617 return alloc_buffer(size, flags, 0, 0, 0, 0, key); 2618 2619 } 2620 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2621 2622 /** 2623 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2624 * @size: the size in bytes per cpu that is needed. 2625 * @flags: attributes to set for the ring buffer. 2626 * @order: sub-buffer order 2627 * @start: start of allocated range 2628 * @range_size: size of allocated range 2629 * @scratch_size: size of scratch area (for preallocated memory buffers) 2630 * @key: ring buffer reader_lock_key. 2631 * 2632 * Currently the only flag that is available is the RB_FL_OVERWRITE 2633 * flag. This flag means that the buffer will overwrite old data 2634 * when the buffer wraps. If this flag is not set, the buffer will 2635 * drop data when the tail hits the head. 
2636 */ 2637 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2638 int order, unsigned long start, 2639 unsigned long range_size, 2640 unsigned long scratch_size, 2641 struct lock_class_key *key) 2642 { 2643 return alloc_buffer(size, flags, order, start, start + range_size, 2644 scratch_size, key); 2645 } 2646 2647 void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size) 2648 { 2649 struct ring_buffer_meta *meta; 2650 void *ptr; 2651 2652 if (!buffer || !buffer->meta) 2653 return NULL; 2654 2655 meta = buffer->meta; 2656 2657 ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long)); 2658 2659 if (size) 2660 *size = (void *)meta + meta->buffers_offset - ptr; 2661 2662 return ptr; 2663 } 2664 2665 /** 2666 * ring_buffer_free - free a ring buffer. 2667 * @buffer: the buffer to free. 2668 */ 2669 void 2670 ring_buffer_free(struct trace_buffer *buffer) 2671 { 2672 int cpu; 2673 2674 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2675 2676 irq_work_sync(&buffer->irq_work.work); 2677 2678 for_each_buffer_cpu(buffer, cpu) 2679 rb_free_cpu_buffer(buffer->buffers[cpu]); 2680 2681 kfree(buffer->buffers); 2682 free_cpumask_var(buffer->cpumask); 2683 2684 kfree(buffer); 2685 } 2686 EXPORT_SYMBOL_GPL(ring_buffer_free); 2687 2688 void ring_buffer_set_clock(struct trace_buffer *buffer, 2689 u64 (*clock)(void)) 2690 { 2691 buffer->clock = clock; 2692 } 2693 2694 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2695 { 2696 buffer->time_stamp_abs = abs; 2697 } 2698 2699 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2700 { 2701 return buffer->time_stamp_abs; 2702 } 2703 2704 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2705 { 2706 return local_read(&bpage->entries) & RB_WRITE_MASK; 2707 } 2708 2709 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2710 { 2711 return local_read(&bpage->write) & RB_WRITE_MASK; 2712 } 2713 2714 static bool 2715 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2716 { 2717 struct list_head *tail_page, *to_remove, *next_page; 2718 struct buffer_page *to_remove_page, *tmp_iter_page; 2719 struct buffer_page *last_page, *first_page; 2720 unsigned long nr_removed; 2721 unsigned long head_bit; 2722 int page_entries; 2723 2724 head_bit = 0; 2725 2726 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2727 atomic_inc(&cpu_buffer->record_disabled); 2728 /* 2729 * We don't race with the readers since we have acquired the reader 2730 * lock. We also don't race with writers after disabling recording. 2731 * This makes it easy to figure out the first and the last page to be 2732 * removed from the list. We unlink all the pages in between including 2733 * the first and last pages. This is done in a busy loop so that we 2734 * lose the least number of traces. 2735 * The pages are freed after we restart recording and unlock readers. 
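 * Schematically (illustration only), removing the pages turns
 *
 *	tail_page -> first_page -> ... -> last_page -> next_page
 *
 * into
 *
 *	tail_page -> next_page
 *
 * with the RB_PAGE_HEAD flag carried over onto the new link if the head
 * page happened to be among the removed ones.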
2736 */ 2737 tail_page = &cpu_buffer->tail_page->list; 2738 2739 /* 2740 * tail page might be on reader page, we remove the next page 2741 * from the ring buffer 2742 */ 2743 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2744 tail_page = rb_list_head(tail_page->next); 2745 to_remove = tail_page; 2746 2747 /* start of pages to remove */ 2748 first_page = list_entry(rb_list_head(to_remove->next), 2749 struct buffer_page, list); 2750 2751 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2752 to_remove = rb_list_head(to_remove)->next; 2753 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2754 } 2755 /* Read iterators need to reset themselves when some pages removed */ 2756 cpu_buffer->pages_removed += nr_removed; 2757 2758 next_page = rb_list_head(to_remove)->next; 2759 2760 /* 2761 * Now we remove all pages between tail_page and next_page. 2762 * Make sure that we have head_bit value preserved for the 2763 * next page 2764 */ 2765 tail_page->next = (struct list_head *)((unsigned long)next_page | 2766 head_bit); 2767 next_page = rb_list_head(next_page); 2768 next_page->prev = tail_page; 2769 2770 /* make sure pages points to a valid page in the ring buffer */ 2771 cpu_buffer->pages = next_page; 2772 cpu_buffer->cnt++; 2773 2774 /* update head page */ 2775 if (head_bit) 2776 cpu_buffer->head_page = list_entry(next_page, 2777 struct buffer_page, list); 2778 2779 /* pages are removed, resume tracing and then free the pages */ 2780 atomic_dec(&cpu_buffer->record_disabled); 2781 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2782 2783 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2784 2785 /* last buffer page to remove */ 2786 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2787 list); 2788 tmp_iter_page = first_page; 2789 2790 do { 2791 cond_resched(); 2792 2793 to_remove_page = tmp_iter_page; 2794 rb_inc_page(&tmp_iter_page); 2795 2796 /* update the counters */ 2797 page_entries = rb_page_entries(to_remove_page); 2798 if (page_entries) { 2799 /* 2800 * If something was added to this page, it was full 2801 * since it is not the tail page. So we deduct the 2802 * bytes consumed in ring buffer from here. 2803 * Increment overrun to account for the lost events. 2804 */ 2805 local_add(page_entries, &cpu_buffer->overrun); 2806 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2807 local_inc(&cpu_buffer->pages_lost); 2808 } 2809 2810 /* 2811 * We have already removed references to this list item, just 2812 * free up the buffer_page and its page 2813 */ 2814 free_buffer_page(to_remove_page); 2815 nr_removed--; 2816 2817 } while (to_remove_page != last_page); 2818 2819 RB_WARN_ON(cpu_buffer, nr_removed); 2820 2821 return nr_removed == 0; 2822 } 2823 2824 static bool 2825 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2826 { 2827 struct list_head *pages = &cpu_buffer->new_pages; 2828 unsigned long flags; 2829 bool success; 2830 int retries; 2831 2832 /* Can be called at early boot up, where interrupts must not been enabled */ 2833 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2834 /* 2835 * We are holding the reader lock, so the reader page won't be swapped 2836 * in the ring buffer. Now we are racing with the writer trying to 2837 * move head page and the tail page. 2838 * We are going to adapt the reader page update process where: 2839 * 1. We first splice the start and end of list of new pages between 2840 * the head page and its previous page. 2841 * 2. 
We cmpxchg the prev_page->next to point from head page to the 2842 * start of new pages list. 2843 * 3. Finally, we update the head->prev to the end of new list. 2844 * 2845 * We will try this process 10 times, to make sure that we don't keep 2846 * spinning. 2847 */ 2848 retries = 10; 2849 success = false; 2850 while (retries--) { 2851 struct list_head *head_page, *prev_page; 2852 struct list_head *last_page, *first_page; 2853 struct list_head *head_page_with_bit; 2854 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2855 2856 if (!hpage) 2857 break; 2858 head_page = &hpage->list; 2859 prev_page = head_page->prev; 2860 2861 first_page = pages->next; 2862 last_page = pages->prev; 2863 2864 head_page_with_bit = (struct list_head *) 2865 ((unsigned long)head_page | RB_PAGE_HEAD); 2866 2867 last_page->next = head_page_with_bit; 2868 first_page->prev = prev_page; 2869 2870 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2871 if (try_cmpxchg(&prev_page->next, 2872 &head_page_with_bit, first_page)) { 2873 /* 2874 * yay, we replaced the page pointer to our new list, 2875 * now, we just have to update to head page's prev 2876 * pointer to point to end of list 2877 */ 2878 head_page->prev = last_page; 2879 cpu_buffer->cnt++; 2880 success = true; 2881 break; 2882 } 2883 } 2884 2885 if (success) 2886 INIT_LIST_HEAD(pages); 2887 /* 2888 * If we weren't successful in adding in new pages, warn and stop 2889 * tracing 2890 */ 2891 RB_WARN_ON(cpu_buffer, !success); 2892 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2893 2894 /* free pages if they weren't inserted */ 2895 if (!success) { 2896 struct buffer_page *bpage, *tmp; 2897 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2898 list) { 2899 list_del_init(&bpage->list); 2900 free_buffer_page(bpage); 2901 } 2902 } 2903 return success; 2904 } 2905 2906 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2907 { 2908 bool success; 2909 2910 if (cpu_buffer->nr_pages_to_update > 0) 2911 success = rb_insert_pages(cpu_buffer); 2912 else 2913 success = rb_remove_pages(cpu_buffer, 2914 -cpu_buffer->nr_pages_to_update); 2915 2916 if (success) 2917 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2918 } 2919 2920 static void update_pages_handler(struct work_struct *work) 2921 { 2922 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2923 struct ring_buffer_per_cpu, update_pages_work); 2924 rb_update_pages(cpu_buffer); 2925 complete(&cpu_buffer->update_done); 2926 } 2927 2928 /** 2929 * ring_buffer_resize - resize the ring buffer 2930 * @buffer: the buffer to resize. 2931 * @size: the new size. 2932 * @cpu_id: the cpu buffer to resize 2933 * 2934 * Minimum size is 2 * buffer->subbuf_size. 2935 * 2936 * Returns 0 on success and < 0 on failure. 2937 */ 2938 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2939 int cpu_id) 2940 { 2941 struct ring_buffer_per_cpu *cpu_buffer; 2942 unsigned long nr_pages; 2943 int cpu, err; 2944 2945 /* 2946 * Always succeed at resizing a non-existent buffer: 2947 */ 2948 if (!buffer) 2949 return 0; 2950 2951 /* Make sure the requested buffer exists */ 2952 if (cpu_id != RING_BUFFER_ALL_CPUS && 2953 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2954 return 0; 2955 2956 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2957 2958 /* we need a minimum of two pages */ 2959 if (nr_pages < 2) 2960 nr_pages = 2; 2961 2962 /* 2963 * Keep CPUs from coming online while resizing to synchronize 2964 * with new per CPU buffers being created. 
2965 */ 2966 guard(cpus_read_lock)(); 2967 2968 /* prevent another thread from changing buffer sizes */ 2969 mutex_lock(&buffer->mutex); 2970 atomic_inc(&buffer->resizing); 2971 2972 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2973 /* 2974 * Don't succeed if resizing is disabled, as a reader might be 2975 * manipulating the ring buffer and is expecting a sane state while 2976 * this is true. 2977 */ 2978 for_each_buffer_cpu(buffer, cpu) { 2979 cpu_buffer = buffer->buffers[cpu]; 2980 if (atomic_read(&cpu_buffer->resize_disabled)) { 2981 err = -EBUSY; 2982 goto out_err_unlock; 2983 } 2984 } 2985 2986 /* calculate the pages to update */ 2987 for_each_buffer_cpu(buffer, cpu) { 2988 cpu_buffer = buffer->buffers[cpu]; 2989 2990 cpu_buffer->nr_pages_to_update = nr_pages - 2991 cpu_buffer->nr_pages; 2992 /* 2993 * nothing more to do for removing pages or no update 2994 */ 2995 if (cpu_buffer->nr_pages_to_update <= 0) 2996 continue; 2997 /* 2998 * to add pages, make sure all new pages can be 2999 * allocated without receiving ENOMEM 3000 */ 3001 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3002 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3003 &cpu_buffer->new_pages)) { 3004 /* not enough memory for new pages */ 3005 err = -ENOMEM; 3006 goto out_err; 3007 } 3008 3009 cond_resched(); 3010 } 3011 3012 /* 3013 * Fire off all the required work handlers 3014 * We can't schedule on offline CPUs, but it's not necessary 3015 * since we can change their buffer sizes without any race. 3016 */ 3017 for_each_buffer_cpu(buffer, cpu) { 3018 cpu_buffer = buffer->buffers[cpu]; 3019 if (!cpu_buffer->nr_pages_to_update) 3020 continue; 3021 3022 /* Can't run something on an offline CPU. */ 3023 if (!cpu_online(cpu)) { 3024 rb_update_pages(cpu_buffer); 3025 cpu_buffer->nr_pages_to_update = 0; 3026 } else { 3027 /* Run directly if possible. */ 3028 migrate_disable(); 3029 if (cpu != smp_processor_id()) { 3030 migrate_enable(); 3031 schedule_work_on(cpu, 3032 &cpu_buffer->update_pages_work); 3033 } else { 3034 update_pages_handler(&cpu_buffer->update_pages_work); 3035 migrate_enable(); 3036 } 3037 } 3038 } 3039 3040 /* wait for all the updates to complete */ 3041 for_each_buffer_cpu(buffer, cpu) { 3042 cpu_buffer = buffer->buffers[cpu]; 3043 if (!cpu_buffer->nr_pages_to_update) 3044 continue; 3045 3046 if (cpu_online(cpu)) 3047 wait_for_completion(&cpu_buffer->update_done); 3048 cpu_buffer->nr_pages_to_update = 0; 3049 } 3050 3051 } else { 3052 cpu_buffer = buffer->buffers[cpu_id]; 3053 3054 if (nr_pages == cpu_buffer->nr_pages) 3055 goto out; 3056 3057 /* 3058 * Don't succeed if resizing is disabled, as a reader might be 3059 * manipulating the ring buffer and is expecting a sane state while 3060 * this is true. 3061 */ 3062 if (atomic_read(&cpu_buffer->resize_disabled)) { 3063 err = -EBUSY; 3064 goto out_err_unlock; 3065 } 3066 3067 cpu_buffer->nr_pages_to_update = nr_pages - 3068 cpu_buffer->nr_pages; 3069 3070 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3071 if (cpu_buffer->nr_pages_to_update > 0 && 3072 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3073 &cpu_buffer->new_pages)) { 3074 err = -ENOMEM; 3075 goto out_err; 3076 } 3077 3078 /* Can't run something on an offline CPU. */ 3079 if (!cpu_online(cpu_id)) 3080 rb_update_pages(cpu_buffer); 3081 else { 3082 /* Run directly if possible. 
*/ 3083 migrate_disable(); 3084 if (cpu_id == smp_processor_id()) { 3085 rb_update_pages(cpu_buffer); 3086 migrate_enable(); 3087 } else { 3088 migrate_enable(); 3089 schedule_work_on(cpu_id, 3090 &cpu_buffer->update_pages_work); 3091 wait_for_completion(&cpu_buffer->update_done); 3092 } 3093 } 3094 3095 cpu_buffer->nr_pages_to_update = 0; 3096 } 3097 3098 out: 3099 /* 3100 * The ring buffer resize can happen with the ring buffer 3101 * enabled, so that the update disturbs the tracing as little 3102 * as possible. But if the buffer is disabled, we do not need 3103 * to worry about that, and we can take the time to verify 3104 * that the buffer is not corrupt. 3105 */ 3106 if (atomic_read(&buffer->record_disabled)) { 3107 atomic_inc(&buffer->record_disabled); 3108 /* 3109 * Even though the buffer was disabled, we must make sure 3110 * that it is truly disabled before calling rb_check_pages. 3111 * There could have been a race between checking 3112 * record_disable and incrementing it. 3113 */ 3114 synchronize_rcu(); 3115 for_each_buffer_cpu(buffer, cpu) { 3116 cpu_buffer = buffer->buffers[cpu]; 3117 rb_check_pages(cpu_buffer); 3118 } 3119 atomic_dec(&buffer->record_disabled); 3120 } 3121 3122 atomic_dec(&buffer->resizing); 3123 mutex_unlock(&buffer->mutex); 3124 return 0; 3125 3126 out_err: 3127 for_each_buffer_cpu(buffer, cpu) { 3128 struct buffer_page *bpage, *tmp; 3129 3130 cpu_buffer = buffer->buffers[cpu]; 3131 cpu_buffer->nr_pages_to_update = 0; 3132 3133 if (list_empty(&cpu_buffer->new_pages)) 3134 continue; 3135 3136 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 3137 list) { 3138 list_del_init(&bpage->list); 3139 free_buffer_page(bpage); 3140 3141 cond_resched(); 3142 } 3143 } 3144 out_err_unlock: 3145 atomic_dec(&buffer->resizing); 3146 mutex_unlock(&buffer->mutex); 3147 return err; 3148 } 3149 EXPORT_SYMBOL_GPL(ring_buffer_resize); 3150 3151 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 3152 { 3153 mutex_lock(&buffer->mutex); 3154 if (val) 3155 buffer->flags |= RB_FL_OVERWRITE; 3156 else 3157 buffer->flags &= ~RB_FL_OVERWRITE; 3158 mutex_unlock(&buffer->mutex); 3159 } 3160 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3161 3162 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3163 { 3164 return bpage->page->data + index; 3165 } 3166 3167 static __always_inline struct ring_buffer_event * 3168 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3169 { 3170 return __rb_page_index(cpu_buffer->reader_page, 3171 cpu_buffer->reader_page->read); 3172 } 3173 3174 static struct ring_buffer_event * 3175 rb_iter_head_event(struct ring_buffer_iter *iter) 3176 { 3177 struct ring_buffer_event *event; 3178 struct buffer_page *iter_head_page = iter->head_page; 3179 unsigned long commit; 3180 unsigned length; 3181 3182 if (iter->head != iter->next_event) 3183 return iter->event; 3184 3185 /* 3186 * When the writer goes across pages, it issues a cmpxchg which 3187 * is a mb(), which will synchronize with the rmb here. 3188 * (see rb_tail_page_update() and __rb_reserve_next()) 3189 */ 3190 commit = rb_page_commit(iter_head_page); 3191 smp_rmb(); 3192 3193 /* An event needs to be at least 8 bytes in size */ 3194 if (iter->head > commit - 8) 3195 goto reset; 3196 3197 event = __rb_page_index(iter_head_page, iter->head); 3198 length = rb_event_length(event); 3199 3200 /* 3201 * READ_ONCE() doesn't work on functions and we don't want the 3202 * compiler doing any crazy optimizations with length. 
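	 * The copy that follows works like a seqlock-style read (a summary
	 * of the code below, not extra logic): copy the event out, issue
	 * smp_rmb(), then re-check the page timestamp and commit index; if
	 * the writer has touched the page in the meantime, fall through to
	 * the reset path and flag missed events instead of returning a
	 * possibly torn event.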
3203 */ 3204 barrier(); 3205 3206 if ((iter->head + length) > commit || length > iter->event_size) 3207 /* Writer corrupted the read? */ 3208 goto reset; 3209 3210 memcpy(iter->event, event, length); 3211 /* 3212 * If the page stamp is still the same after this rmb() then the 3213 * event was safely copied without the writer entering the page. 3214 */ 3215 smp_rmb(); 3216 3217 /* Make sure the page didn't change since we read this */ 3218 if (iter->page_stamp != iter_head_page->page->time_stamp || 3219 commit > rb_page_commit(iter_head_page)) 3220 goto reset; 3221 3222 iter->next_event = iter->head + length; 3223 return iter->event; 3224 reset: 3225 /* Reset to the beginning */ 3226 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3227 iter->head = 0; 3228 iter->next_event = 0; 3229 iter->missed_events = 1; 3230 return NULL; 3231 } 3232 3233 /* Size is determined by what has been committed */ 3234 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3235 { 3236 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3237 } 3238 3239 static __always_inline unsigned 3240 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3241 { 3242 return rb_page_commit(cpu_buffer->commit_page); 3243 } 3244 3245 static __always_inline unsigned 3246 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3247 { 3248 unsigned long addr = (unsigned long)event; 3249 3250 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3251 3252 return addr - BUF_PAGE_HDR_SIZE; 3253 } 3254 3255 static void rb_inc_iter(struct ring_buffer_iter *iter) 3256 { 3257 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3258 3259 /* 3260 * The iterator could be on the reader page (it starts there). 3261 * But the head could have moved, since the reader was 3262 * found. Check for this case and assign the iterator 3263 * to the head page instead of next. 3264 */ 3265 if (iter->head_page == cpu_buffer->reader_page) 3266 iter->head_page = rb_set_head_page(cpu_buffer); 3267 else 3268 rb_inc_page(&iter->head_page); 3269 3270 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3271 iter->head = 0; 3272 iter->next_event = 0; 3273 } 3274 3275 /* Return the index into the sub-buffers for a given sub-buffer */ 3276 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3277 { 3278 void *subbuf_array; 3279 3280 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3281 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3282 return (subbuf - subbuf_array) / meta->subbuf_size; 3283 } 3284 3285 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3286 struct buffer_page *next_page) 3287 { 3288 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3289 unsigned long old_head = (unsigned long)next_page->page; 3290 unsigned long new_head; 3291 3292 rb_inc_page(&next_page); 3293 new_head = (unsigned long)next_page->page; 3294 3295 /* 3296 * Only move it forward once, if something else came in and 3297 * moved it forward, then we don't want to touch it. 
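	 * In other words, the cmpxchg() below only stores new_head if
	 * meta->head_buffer still equals old_head; if an interrupt or
	 * another writer already advanced it, the compare fails and the
	 * update is silently skipped, which is exactly the behaviour wanted
	 * here.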
3298 */ 3299 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3300 } 3301 3302 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3303 struct buffer_page *reader) 3304 { 3305 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3306 void *old_reader = cpu_buffer->reader_page->page; 3307 void *new_reader = reader->page; 3308 int id; 3309 3310 id = reader->id; 3311 cpu_buffer->reader_page->id = id; 3312 reader->id = 0; 3313 3314 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3315 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3316 3317 /* The head pointer is the one after the reader */ 3318 rb_update_meta_head(cpu_buffer, reader); 3319 } 3320 3321 /* 3322 * rb_handle_head_page - writer hit the head page 3323 * 3324 * Returns: +1 to retry page 3325 * 0 to continue 3326 * -1 on error 3327 */ 3328 static int 3329 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3330 struct buffer_page *tail_page, 3331 struct buffer_page *next_page) 3332 { 3333 struct buffer_page *new_head; 3334 int entries; 3335 int type; 3336 int ret; 3337 3338 entries = rb_page_entries(next_page); 3339 3340 /* 3341 * The hard part is here. We need to move the head 3342 * forward, and protect against both readers on 3343 * other CPUs and writers coming in via interrupts. 3344 */ 3345 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3346 RB_PAGE_HEAD); 3347 3348 /* 3349 * type can be one of four: 3350 * NORMAL - an interrupt already moved it for us 3351 * HEAD - we are the first to get here. 3352 * UPDATE - we are the interrupt interrupting 3353 * a current move. 3354 * MOVED - a reader on another CPU moved the next 3355 * pointer to its reader page. Give up 3356 * and try again. 3357 */ 3358 3359 switch (type) { 3360 case RB_PAGE_HEAD: 3361 /* 3362 * We changed the head to UPDATE, thus 3363 * it is our responsibility to update 3364 * the counters. 3365 */ 3366 local_add(entries, &cpu_buffer->overrun); 3367 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3368 local_inc(&cpu_buffer->pages_lost); 3369 3370 if (cpu_buffer->ring_meta) 3371 rb_update_meta_head(cpu_buffer, next_page); 3372 /* 3373 * The entries will be zeroed out when we move the 3374 * tail page. 3375 */ 3376 3377 /* still more to do */ 3378 break; 3379 3380 case RB_PAGE_UPDATE: 3381 /* 3382 * This is an interrupt that interrupt the 3383 * previous update. Still more to do. 3384 */ 3385 break; 3386 case RB_PAGE_NORMAL: 3387 /* 3388 * An interrupt came in before the update 3389 * and processed this for us. 3390 * Nothing left to do. 3391 */ 3392 return 1; 3393 case RB_PAGE_MOVED: 3394 /* 3395 * The reader is on another CPU and just did 3396 * a swap with our next_page. 3397 * Try again. 3398 */ 3399 return 1; 3400 default: 3401 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3402 return -1; 3403 } 3404 3405 /* 3406 * Now that we are here, the old head pointer is 3407 * set to UPDATE. This will keep the reader from 3408 * swapping the head page with the reader page. 3409 * The reader (on another CPU) will spin till 3410 * we are finished. 3411 * 3412 * We just need to protect against interrupts 3413 * doing the job. We will set the next pointer 3414 * to HEAD. After that, we set the old pointer 3415 * to NORMAL, but only if it was HEAD before. 3416 * otherwise we are an interrupt, and only 3417 * want the outer most commit to reset it. 
 */
	new_head = next_page;
	rb_inc_page(&new_head);

	ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
				    RB_PAGE_NORMAL);

	/*
	 * Valid returns are:
	 *  HEAD   - an interrupt came in and already set it.
	 *  NORMAL - One of two things:
	 *            1) We really set it.
	 *            2) A bunch of interrupts came in and moved
	 *               the page forward again.
	 */
	switch (ret) {
	case RB_PAGE_HEAD:
	case RB_PAGE_NORMAL:
		/* OK */
		break;
	default:
		RB_WARN_ON(cpu_buffer, 1);
		return -1;
	}

	/*
	 * It is possible that an interrupt came in,
	 * set the head up, then more interrupts came in
	 * and moved it again. When we get back here,
	 * the page would have been set to NORMAL but we
	 * just set it back to HEAD.
	 *
	 * How do you detect this? Well, if that happened
	 * the tail page would have moved.
	 */
	if (ret == RB_PAGE_NORMAL) {
		struct buffer_page *buffer_tail_page;

		buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
		/*
		 * If the tail had moved past next, then we need
		 * to reset the pointer.
		 */
		if (buffer_tail_page != tail_page &&
		    buffer_tail_page != next_page)
			rb_head_page_set_normal(cpu_buffer, new_head,
						next_page,
						RB_PAGE_HEAD);
	}

	/*
	 * If this was the outermost commit (the one that
	 * changed the original pointer from HEAD to UPDATE),
	 * then it is up to us to reset it to NORMAL.
	 */
	if (type == RB_PAGE_HEAD) {
		ret = rb_head_page_set_normal(cpu_buffer, next_page,
					      tail_page,
					      RB_PAGE_UPDATE);
		if (RB_WARN_ON(cpu_buffer,
			       ret != RB_PAGE_UPDATE))
			return -1;
	}

	return 0;
}

static inline void
rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
	      unsigned long tail, struct rb_event_info *info)
{
	unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
	struct buffer_page *tail_page = info->tail_page;
	struct ring_buffer_event *event;
	unsigned long length = info->length;

	/*
	 * Only the event that crossed the page boundary
	 * must fill the old tail_page with padding.
	 */
	if (tail >= bsize) {
		/*
		 * If the page was filled, then we still need
		 * to update the real_end. Reset it to zero
		 * and the reader will ignore it.
		 */
		if (tail == bsize)
			tail_page->real_end = 0;

		local_sub(length, &tail_page->write);
		return;
	}

	event = __rb_page_index(tail_page, tail);

	/*
	 * Save the original length to the meta data.
	 * This will be used by the reader to add to the lost event
	 * counter.
	 */
	tail_page->real_end = tail;

	/*
	 * If this event is bigger than the minimum size, then
	 * we need to be careful that we don't subtract the
	 * write counter enough to allow another writer to slip
	 * in on this page.
	 * We put in a discarded commit instead, to make sure
	 * that this space is not used again, and this space will
	 * not be accounted into 'entries_bytes'.
	 *
	 * If we are less than the minimum size, we don't need to
	 * worry about it.
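	 *
	 * A rough example with made-up numbers: if bsize is 4080 and this
	 * event started at tail == 4000, the 80 bytes left on the page
	 * become one discarded padding event below:
	 *
	 *	event->array[0]   = (4080 - 4000) - RB_EVNT_HDR_SIZE
	 *	event->type_len   = RINGBUF_TYPE_PADDING
	 *	event->time_delta = 1	(must be non-zero so it is not
	 *				 mistaken for a null event)
	 *
	 * and the write index is pulled back so that it ends exactly at the
	 * page boundary.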
3531 */ 3532 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3533 /* No room for any events */ 3534 3535 /* Mark the rest of the page with padding */ 3536 rb_event_set_padding(event); 3537 3538 /* Make sure the padding is visible before the write update */ 3539 smp_wmb(); 3540 3541 /* Set the write back to the previous setting */ 3542 local_sub(length, &tail_page->write); 3543 return; 3544 } 3545 3546 /* Put in a discarded event */ 3547 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3548 event->type_len = RINGBUF_TYPE_PADDING; 3549 /* time delta must be non zero */ 3550 event->time_delta = 1; 3551 3552 /* account for padding bytes */ 3553 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3554 3555 /* Make sure the padding is visible before the tail_page->write update */ 3556 smp_wmb(); 3557 3558 /* Set write to end of buffer */ 3559 length = (tail + length) - bsize; 3560 local_sub(length, &tail_page->write); 3561 } 3562 3563 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3564 3565 /* 3566 * This is the slow path, force gcc not to inline it. 3567 */ 3568 static noinline struct ring_buffer_event * 3569 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3570 unsigned long tail, struct rb_event_info *info) 3571 { 3572 struct buffer_page *tail_page = info->tail_page; 3573 struct buffer_page *commit_page = cpu_buffer->commit_page; 3574 struct trace_buffer *buffer = cpu_buffer->buffer; 3575 struct buffer_page *next_page; 3576 int ret; 3577 3578 next_page = tail_page; 3579 3580 rb_inc_page(&next_page); 3581 3582 /* 3583 * If for some reason, we had an interrupt storm that made 3584 * it all the way around the buffer, bail, and warn 3585 * about it. 3586 */ 3587 if (unlikely(next_page == commit_page)) { 3588 local_inc(&cpu_buffer->commit_overrun); 3589 goto out_reset; 3590 } 3591 3592 /* 3593 * This is where the fun begins! 3594 * 3595 * We are fighting against races between a reader that 3596 * could be on another CPU trying to swap its reader 3597 * page with the buffer head. 3598 * 3599 * We are also fighting against interrupts coming in and 3600 * moving the head or tail on us as well. 3601 * 3602 * If the next page is the head page then we have filled 3603 * the buffer, unless the commit page is still on the 3604 * reader page. 3605 */ 3606 if (rb_is_head_page(next_page, &tail_page->list)) { 3607 3608 /* 3609 * If the commit is not on the reader page, then 3610 * move the header page. 3611 */ 3612 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3613 /* 3614 * If we are not in overwrite mode, 3615 * this is easy, just stop here. 3616 */ 3617 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3618 local_inc(&cpu_buffer->dropped_events); 3619 goto out_reset; 3620 } 3621 3622 ret = rb_handle_head_page(cpu_buffer, 3623 tail_page, 3624 next_page); 3625 if (ret < 0) 3626 goto out_reset; 3627 if (ret) 3628 goto out_again; 3629 } else { 3630 /* 3631 * We need to be careful here too. The 3632 * commit page could still be on the reader 3633 * page. We could have a small buffer, and 3634 * have filled up the buffer with events 3635 * from interrupts and such, and wrapped. 3636 * 3637 * Note, if the tail page is also on the 3638 * reader_page, we let it move out. 
3639 */ 3640 if (unlikely((cpu_buffer->commit_page != 3641 cpu_buffer->tail_page) && 3642 (cpu_buffer->commit_page == 3643 cpu_buffer->reader_page))) { 3644 local_inc(&cpu_buffer->commit_overrun); 3645 goto out_reset; 3646 } 3647 } 3648 } 3649 3650 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3651 3652 out_again: 3653 3654 rb_reset_tail(cpu_buffer, tail, info); 3655 3656 /* Commit what we have for now. */ 3657 rb_end_commit(cpu_buffer); 3658 /* rb_end_commit() decs committing */ 3659 local_inc(&cpu_buffer->committing); 3660 3661 /* fail and let the caller try again */ 3662 return ERR_PTR(-EAGAIN); 3663 3664 out_reset: 3665 /* reset write */ 3666 rb_reset_tail(cpu_buffer, tail, info); 3667 3668 return NULL; 3669 } 3670 3671 /* Slow path */ 3672 static struct ring_buffer_event * 3673 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3674 struct ring_buffer_event *event, u64 delta, bool abs) 3675 { 3676 if (abs) 3677 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3678 else 3679 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3680 3681 /* Not the first event on the page, or not delta? */ 3682 if (abs || rb_event_index(cpu_buffer, event)) { 3683 event->time_delta = delta & TS_MASK; 3684 event->array[0] = delta >> TS_SHIFT; 3685 } else { 3686 /* nope, just zero it */ 3687 event->time_delta = 0; 3688 event->array[0] = 0; 3689 } 3690 3691 return skip_time_extend(event); 3692 } 3693 3694 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3695 static inline bool sched_clock_stable(void) 3696 { 3697 return true; 3698 } 3699 #endif 3700 3701 static void 3702 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3703 struct rb_event_info *info) 3704 { 3705 u64 write_stamp; 3706 3707 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3708 (unsigned long long)info->delta, 3709 (unsigned long long)info->ts, 3710 (unsigned long long)info->before, 3711 (unsigned long long)info->after, 3712 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3713 sched_clock_stable() ? "" : 3714 "If you just came from a suspend/resume,\n" 3715 "please switch to the trace global clock:\n" 3716 " echo global > /sys/kernel/tracing/trace_clock\n" 3717 "or add trace_clock=global to the kernel command line\n"); 3718 } 3719 3720 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3721 struct ring_buffer_event **event, 3722 struct rb_event_info *info, 3723 u64 *delta, 3724 unsigned int *length) 3725 { 3726 bool abs = info->add_timestamp & 3727 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3728 3729 if (unlikely(info->delta > (1ULL << 59))) { 3730 /* 3731 * Some timers can use more than 59 bits, and when a timestamp 3732 * is added to the buffer, it will lose those bits. 3733 */ 3734 if (abs && (info->ts & TS_MSB)) { 3735 info->delta &= ABS_TS_MASK; 3736 3737 /* did the clock go backwards */ 3738 } else if (info->before == info->after && info->before > info->ts) { 3739 /* not interrupted */ 3740 static int once; 3741 3742 /* 3743 * This is possible with a recalibrating of the TSC. 3744 * Do not produce a call stack, but just report it. 
3745 */ 3746 if (!once) { 3747 once++; 3748 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3749 info->before, info->ts); 3750 } 3751 } else 3752 rb_check_timestamp(cpu_buffer, info); 3753 if (!abs) 3754 info->delta = 0; 3755 } 3756 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3757 *length -= RB_LEN_TIME_EXTEND; 3758 *delta = 0; 3759 } 3760 3761 /** 3762 * rb_update_event - update event type and data 3763 * @cpu_buffer: The per cpu buffer of the @event 3764 * @event: the event to update 3765 * @info: The info to update the @event with (contains length and delta) 3766 * 3767 * Update the type and data fields of the @event. The length 3768 * is the actual size that is written to the ring buffer, 3769 * and with this, we can determine what to place into the 3770 * data field. 3771 */ 3772 static void 3773 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3774 struct ring_buffer_event *event, 3775 struct rb_event_info *info) 3776 { 3777 unsigned length = info->length; 3778 u64 delta = info->delta; 3779 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3780 3781 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3782 cpu_buffer->event_stamp[nest] = info->ts; 3783 3784 /* 3785 * If we need to add a timestamp, then we 3786 * add it to the start of the reserved space. 3787 */ 3788 if (unlikely(info->add_timestamp)) 3789 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3790 3791 event->time_delta = delta; 3792 length -= RB_EVNT_HDR_SIZE; 3793 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3794 event->type_len = 0; 3795 event->array[0] = length; 3796 } else 3797 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3798 } 3799 3800 static unsigned rb_calculate_event_length(unsigned length) 3801 { 3802 struct ring_buffer_event event; /* Used only for sizeof array */ 3803 3804 /* zero length can cause confusions */ 3805 if (!length) 3806 length++; 3807 3808 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3809 length += sizeof(event.array[0]); 3810 3811 length += RB_EVNT_HDR_SIZE; 3812 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3813 3814 /* 3815 * In case the time delta is larger than the 27 bits for it 3816 * in the header, we need to add a timestamp. If another 3817 * event comes in when trying to discard this one to increase 3818 * the length, then the timestamp will be added in the allocated 3819 * space of this event. If length is bigger than the size needed 3820 * for the TIME_EXTEND, then padding has to be used. The events 3821 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3822 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3823 * As length is a multiple of 4, we only need to worry if it 3824 * is 12 (RB_LEN_TIME_EXTEND + 4). 
3825 */
3826 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
3827 length += RB_ALIGNMENT;
3828
3829 return length;
3830 }
3831
3832 static inline bool
3833 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
3834 struct ring_buffer_event *event)
3835 {
3836 unsigned long new_index, old_index;
3837 struct buffer_page *bpage;
3838 unsigned long addr;
3839
3840 new_index = rb_event_index(cpu_buffer, event);
3841 old_index = new_index + rb_event_ts_length(event);
3842 addr = (unsigned long)event;
3843 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1);
3844
3845 bpage = READ_ONCE(cpu_buffer->tail_page);
3846
3847 /*
3848 * Make sure the tail_page is still the same and
3849 * the next write location is the end of this event
3850 */
3851 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
3852 unsigned long write_mask =
3853 local_read(&bpage->write) & ~RB_WRITE_MASK;
3854 unsigned long event_length = rb_event_length(event);
3855
3856 /*
3857 * Force the before_stamp to be different than the write_stamp
3858 * to make sure that the next event adds an absolute
3859 * value and does not rely on the saved write stamp, which
3860 * is now going to be bogus.
3861 *
3862 * By setting the before_stamp to zero, the next event
3863 * is not going to use the write_stamp and will instead
3864 * create an absolute timestamp. This means there's no
3865 * reason to update the write_stamp!
3866 */
3867 rb_time_set(&cpu_buffer->before_stamp, 0);
3868
3869 /*
3870 * If an event were to come in now, it would see that the
3871 * write_stamp and the before_stamp are different, and assume
3872 * that this event just added itself before updating
3873 * the write stamp. The interrupting event will fix the
3874 * write stamp for us, and use an absolute timestamp.
3875 */
3876
3877 /*
3878 * This is on the tail page. It is possible that
3879 * a write could come in and move the tail page
3880 * and write to the next page. That is fine
3881 * because we just shorten what is on this page.
3882 */
3883 old_index += write_mask;
3884 new_index += write_mask;
3885
3886 /* caution: old_index gets updated on cmpxchg failure */
3887 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) {
3888 /* update counters */
3889 local_sub(event_length, &cpu_buffer->entries_bytes);
3890 return true;
3891 }
3892 }
3893
3894 /* could not discard */
3895 return false;
3896 }
3897
3898 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
3899 {
3900 local_inc(&cpu_buffer->committing);
3901 local_inc(&cpu_buffer->commits);
3902 }
3903
3904 static __always_inline void
3905 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
3906 {
3907 unsigned long max_count;
3908
3909 /*
3910 * We only race with interrupts and NMIs on this CPU.
3911 * If we own the commit event, then we can commit
3912 * all others that interrupted us, since the interruptions
3913 * are in stack format (they finish before they come
3914 * back to us). This allows us to do a simple loop to
3915 * assign the commit to the tail.
3916 */
3917 again:
3918 max_count = cpu_buffer->nr_pages * 100;
3919
3920 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
3921 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
3922 return;
3923 if (RB_WARN_ON(cpu_buffer,
3924 rb_is_reader_page(cpu_buffer->tail_page)))
3925 return;
3926 /*
3927 * No need for a memory barrier here, as the update
3928 * of the tail_page did it for this page.
3929 */ 3930 local_set(&cpu_buffer->commit_page->page->commit, 3931 rb_page_write(cpu_buffer->commit_page)); 3932 rb_inc_page(&cpu_buffer->commit_page); 3933 if (cpu_buffer->ring_meta) { 3934 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3935 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3936 } 3937 /* add barrier to keep gcc from optimizing too much */ 3938 barrier(); 3939 } 3940 while (rb_commit_index(cpu_buffer) != 3941 rb_page_write(cpu_buffer->commit_page)) { 3942 3943 /* Make sure the readers see the content of what is committed. */ 3944 smp_wmb(); 3945 local_set(&cpu_buffer->commit_page->page->commit, 3946 rb_page_write(cpu_buffer->commit_page)); 3947 RB_WARN_ON(cpu_buffer, 3948 local_read(&cpu_buffer->commit_page->page->commit) & 3949 ~RB_WRITE_MASK); 3950 barrier(); 3951 } 3952 3953 /* again, keep gcc from optimizing */ 3954 barrier(); 3955 3956 /* 3957 * If an interrupt came in just after the first while loop 3958 * and pushed the tail page forward, we will be left with 3959 * a dangling commit that will never go forward. 3960 */ 3961 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3962 goto again; 3963 } 3964 3965 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3966 { 3967 unsigned long commits; 3968 3969 if (RB_WARN_ON(cpu_buffer, 3970 !local_read(&cpu_buffer->committing))) 3971 return; 3972 3973 again: 3974 commits = local_read(&cpu_buffer->commits); 3975 /* synchronize with interrupts */ 3976 barrier(); 3977 if (local_read(&cpu_buffer->committing) == 1) 3978 rb_set_commit_to_write(cpu_buffer); 3979 3980 local_dec(&cpu_buffer->committing); 3981 3982 /* synchronize with interrupts */ 3983 barrier(); 3984 3985 /* 3986 * Need to account for interrupts coming in between the 3987 * updating of the commit page and the clearing of the 3988 * committing counter. 
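	 *
	 * Such an interrupt saw 'committing' still set and therefore
	 * did not move the commit page itself. If 'commits' has
	 * changed and nobody is committing any more, take the commit
	 * back (bump 'committing' again) and loop so the interrupting
	 * events get committed as well.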
*/
3990 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3991 !local_read(&cpu_buffer->committing)) {
3992 local_inc(&cpu_buffer->committing);
3993 goto again;
3994 }
3995 }
3996
3997 static inline void rb_event_discard(struct ring_buffer_event *event)
3998 {
3999 if (extended_time(event))
4000 event = skip_time_extend(event);
4001
4002 /* array[0] holds the actual length for the discarded event */
4003 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
4004 event->type_len = RINGBUF_TYPE_PADDING;
4005 /* time delta must be non zero */
4006 if (!event->time_delta)
4007 event->time_delta = 1;
4008 }
4009
4010 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
4011 {
4012 local_inc(&cpu_buffer->entries);
4013 rb_end_commit(cpu_buffer);
4014 }
4015
4016 static __always_inline void
4017 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
4018 {
4019 if (buffer->irq_work.waiters_pending) {
4020 buffer->irq_work.waiters_pending = false;
4021 /* irq_work_queue() supplies its own memory barriers */
4022 irq_work_queue(&buffer->irq_work.work);
4023 }
4024
4025 if (cpu_buffer->irq_work.waiters_pending) {
4026 cpu_buffer->irq_work.waiters_pending = false;
4027 /* irq_work_queue() supplies its own memory barriers */
4028 irq_work_queue(&cpu_buffer->irq_work.work);
4029 }
4030
4031 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
4032 return;
4033
4034 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
4035 return;
4036
4037 if (!cpu_buffer->irq_work.full_waiters_pending)
4038 return;
4039
4040 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
4041
4042 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
4043 return;
4044
4045 cpu_buffer->irq_work.wakeup_full = true;
4046 cpu_buffer->irq_work.full_waiters_pending = false;
4047 /* irq_work_queue() supplies its own memory barriers */
4048 irq_work_queue(&cpu_buffer->irq_work.work);
4049 }
4050
4051 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
4052 # define do_ring_buffer_record_recursion() \
4053 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
4054 #else
4055 # define do_ring_buffer_record_recursion() do { } while (0)
4056 #endif
4057
4058 /*
4059 * The lock and unlock are done within a preempt disable section.
4060 * The current_context per_cpu variable can only be modified
4061 * by the current task between lock and unlock. But it can
4062 * be modified more than once via an interrupt. To pass this
4063 * information from the lock to the unlock without having to
4064 * access the 'in_interrupt()' functions again (which do show
4065 * a bit of overhead in something as critical as function tracing),
4066 * we use a bitmask trick.
4067 *
4068 * bit 1 = NMI context
4069 * bit 2 = IRQ context
4070 * bit 3 = SoftIRQ context
4071 * bit 4 = normal context.
4072 *
4073 * This works because this is the order of contexts that can
4074 * preempt other contexts. A SoftIRQ never preempts an IRQ
4075 * context.
4076 *
4077 * When the context is determined, the corresponding bit is
4078 * checked and set (if it was set, then a recursion of that context
4079 * happened).
4080 *
4081 * On unlock, we need to clear this bit. To do so, just subtract
4082 * 1 from the current_context and AND it to itself.
4083 * 4084 * (binary) 4085 * 101 - 1 = 100 4086 * 101 & 100 = 100 (clearing bit zero) 4087 * 4088 * 1010 - 1 = 1001 4089 * 1010 & 1001 = 1000 (clearing bit 1) 4090 * 4091 * The least significant bit can be cleared this way, and it 4092 * just so happens that it is the same bit corresponding to 4093 * the current context. 4094 * 4095 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 4096 * is set when a recursion is detected at the current context, and if 4097 * the TRANSITION bit is already set, it will fail the recursion. 4098 * This is needed because there's a lag between the changing of 4099 * interrupt context and updating the preempt count. In this case, 4100 * a false positive will be found. To handle this, one extra recursion 4101 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 4102 * bit is already set, then it is considered a recursion and the function 4103 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 4104 * 4105 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 4106 * to be cleared. Even if it wasn't the context that set it. That is, 4107 * if an interrupt comes in while NORMAL bit is set and the ring buffer 4108 * is called before preempt_count() is updated, since the check will 4109 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 4110 * NMI then comes in, it will set the NMI bit, but when the NMI code 4111 * does the trace_recursive_unlock() it will clear the TRANSITION bit 4112 * and leave the NMI bit set. But this is fine, because the interrupt 4113 * code that set the TRANSITION bit will then clear the NMI bit when it 4114 * calls trace_recursive_unlock(). If another NMI comes in, it will 4115 * set the TRANSITION bit and continue. 4116 * 4117 * Note: The TRANSITION bit only handles a single transition between context. 4118 */ 4119 4120 static __always_inline bool 4121 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4122 { 4123 unsigned int val = cpu_buffer->current_context; 4124 int bit = interrupt_context_level(); 4125 4126 bit = RB_CTX_NORMAL - bit; 4127 4128 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4129 /* 4130 * It is possible that this was called by transitioning 4131 * between interrupt context, and preempt_count() has not 4132 * been updated yet. In this case, use the TRANSITION bit. 4133 */ 4134 bit = RB_CTX_TRANSITION; 4135 if (val & (1 << (bit + cpu_buffer->nest))) { 4136 do_ring_buffer_record_recursion(); 4137 return true; 4138 } 4139 } 4140 4141 val |= (1 << (bit + cpu_buffer->nest)); 4142 cpu_buffer->current_context = val; 4143 4144 return false; 4145 } 4146 4147 static __always_inline void 4148 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4149 { 4150 cpu_buffer->current_context &= 4151 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4152 } 4153 4154 /* The recursive locking above uses 5 bits */ 4155 #define NESTED_BITS 5 4156 4157 /** 4158 * ring_buffer_nest_start - Allow to trace while nested 4159 * @buffer: The ring buffer to modify 4160 * 4161 * The ring buffer has a safety mechanism to prevent recursion. 4162 * But there may be a case where a trace needs to be done while 4163 * tracing something else. In this case, calling this function 4164 * will allow this function to nest within a currently active 4165 * ring_buffer_lock_reserve(). 
*
4167 * Call this function before calling another ring_buffer_lock_reserve() and
4168 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
4169 */
4170 void ring_buffer_nest_start(struct trace_buffer *buffer)
4171 {
4172 struct ring_buffer_per_cpu *cpu_buffer;
4173 int cpu;
4174
4175 /* Enabled by ring_buffer_nest_end() */
4176 preempt_disable_notrace();
4177 cpu = raw_smp_processor_id();
4178 cpu_buffer = buffer->buffers[cpu];
4179 /* This is the shift value for the above recursive locking */
4180 cpu_buffer->nest += NESTED_BITS;
4181 }
4182
4183 /**
4184 * ring_buffer_nest_end - Allow to trace while nested
4185 * @buffer: The ring buffer to modify
4186 *
4187 * Must be called after ring_buffer_nest_start() and after the
4188 * ring_buffer_unlock_commit().
4189 */
4190 void ring_buffer_nest_end(struct trace_buffer *buffer)
4191 {
4192 struct ring_buffer_per_cpu *cpu_buffer;
4193 int cpu;
4194
4195 /* disabled by ring_buffer_nest_start() */
4196 cpu = raw_smp_processor_id();
4197 cpu_buffer = buffer->buffers[cpu];
4198 /* This is the shift value for the above recursive locking */
4199 cpu_buffer->nest -= NESTED_BITS;
4200 preempt_enable_notrace();
4201 }
4202
4203 /**
4204 * ring_buffer_unlock_commit - commit a reserved event
4205 * @buffer: The buffer to commit to
4206 *
4207 * This commits the data to the ring buffer, and releases any locks held.
4208 *
4209 * Must be paired with ring_buffer_lock_reserve.
4210 */
4211 int ring_buffer_unlock_commit(struct trace_buffer *buffer)
4212 {
4213 struct ring_buffer_per_cpu *cpu_buffer;
4214 int cpu = raw_smp_processor_id();
4215
4216 cpu_buffer = buffer->buffers[cpu];
4217
4218 rb_commit(cpu_buffer);
4219
4220 rb_wakeups(buffer, cpu_buffer);
4221
4222 trace_recursive_unlock(cpu_buffer);
4223
4224 preempt_enable_notrace();
4225
4226 return 0;
4227 }
4228 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
4229
4230 /* Special value to validate all deltas on a page.
*/ 4231 #define CHECK_FULL_PAGE 1L 4232 4233 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4234 4235 static const char *show_irq_str(int bits) 4236 { 4237 static const char * type[] = { 4238 ".", // 0 4239 "s", // 1 4240 "h", // 2 4241 "Hs", // 3 4242 "n", // 4 4243 "Ns", // 5 4244 "Nh", // 6 4245 "NHs", // 7 4246 }; 4247 4248 return type[bits]; 4249 } 4250 4251 /* Assume this is a trace event */ 4252 static const char *show_flags(struct ring_buffer_event *event) 4253 { 4254 struct trace_entry *entry; 4255 int bits = 0; 4256 4257 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4258 return "X"; 4259 4260 entry = ring_buffer_event_data(event); 4261 4262 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4263 bits |= 1; 4264 4265 if (entry->flags & TRACE_FLAG_HARDIRQ) 4266 bits |= 2; 4267 4268 if (entry->flags & TRACE_FLAG_NMI) 4269 bits |= 4; 4270 4271 return show_irq_str(bits); 4272 } 4273 4274 static const char *show_irq(struct ring_buffer_event *event) 4275 { 4276 struct trace_entry *entry; 4277 4278 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4279 return ""; 4280 4281 entry = ring_buffer_event_data(event); 4282 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4283 return "d"; 4284 return ""; 4285 } 4286 4287 static const char *show_interrupt_level(void) 4288 { 4289 unsigned long pc = preempt_count(); 4290 unsigned char level = 0; 4291 4292 if (pc & SOFTIRQ_OFFSET) 4293 level |= 1; 4294 4295 if (pc & HARDIRQ_MASK) 4296 level |= 2; 4297 4298 if (pc & NMI_MASK) 4299 level |= 4; 4300 4301 return show_irq_str(level); 4302 } 4303 4304 static void dump_buffer_page(struct buffer_data_page *bpage, 4305 struct rb_event_info *info, 4306 unsigned long tail) 4307 { 4308 struct ring_buffer_event *event; 4309 u64 ts, delta; 4310 int e; 4311 4312 ts = bpage->time_stamp; 4313 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4314 4315 for (e = 0; e < tail; e += rb_event_length(event)) { 4316 4317 event = (struct ring_buffer_event *)(bpage->data + e); 4318 4319 switch (event->type_len) { 4320 4321 case RINGBUF_TYPE_TIME_EXTEND: 4322 delta = rb_event_time_stamp(event); 4323 ts += delta; 4324 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4325 e, ts, delta); 4326 break; 4327 4328 case RINGBUF_TYPE_TIME_STAMP: 4329 delta = rb_event_time_stamp(event); 4330 ts = rb_fix_abs_ts(delta, ts); 4331 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4332 e, ts, delta); 4333 break; 4334 4335 case RINGBUF_TYPE_PADDING: 4336 ts += event->time_delta; 4337 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4338 e, ts, event->time_delta); 4339 break; 4340 4341 case RINGBUF_TYPE_DATA: 4342 ts += event->time_delta; 4343 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4344 e, ts, event->time_delta, 4345 show_flags(event), show_irq(event)); 4346 break; 4347 4348 default: 4349 break; 4350 } 4351 } 4352 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4353 } 4354 4355 static DEFINE_PER_CPU(atomic_t, checking); 4356 static atomic_t ts_dump; 4357 4358 #define buffer_warn_return(fmt, ...) 
\ 4359 do { \ 4360 /* If another report is happening, ignore this one */ \ 4361 if (atomic_inc_return(&ts_dump) != 1) { \ 4362 atomic_dec(&ts_dump); \ 4363 goto out; \ 4364 } \ 4365 atomic_inc(&cpu_buffer->record_disabled); \ 4366 pr_warn(fmt, ##__VA_ARGS__); \ 4367 dump_buffer_page(bpage, info, tail); \ 4368 atomic_dec(&ts_dump); \ 4369 /* There's some cases in boot up that this can happen */ \ 4370 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4371 /* Do not re-enable checking */ \ 4372 return; \ 4373 } while (0) 4374 4375 /* 4376 * Check if the current event time stamp matches the deltas on 4377 * the buffer page. 4378 */ 4379 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4380 struct rb_event_info *info, 4381 unsigned long tail) 4382 { 4383 struct buffer_data_page *bpage; 4384 u64 ts, delta; 4385 bool full = false; 4386 int ret; 4387 4388 bpage = info->tail_page->page; 4389 4390 if (tail == CHECK_FULL_PAGE) { 4391 full = true; 4392 tail = local_read(&bpage->commit); 4393 } else if (info->add_timestamp & 4394 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4395 /* Ignore events with absolute time stamps */ 4396 return; 4397 } 4398 4399 /* 4400 * Do not check the first event (skip possible extends too). 4401 * Also do not check if previous events have not been committed. 4402 */ 4403 if (tail <= 8 || tail > local_read(&bpage->commit)) 4404 return; 4405 4406 /* 4407 * If this interrupted another event, 4408 */ 4409 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4410 goto out; 4411 4412 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4413 if (ret < 0) { 4414 if (delta < ts) { 4415 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4416 cpu_buffer->cpu, ts, delta); 4417 goto out; 4418 } 4419 } 4420 if ((full && ts > info->ts) || 4421 (!full && ts + info->delta != info->ts)) { 4422 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4423 cpu_buffer->cpu, 4424 ts + info->delta, info->ts, info->delta, 4425 info->before, info->after, 4426 full ? " (full)" : "", show_interrupt_level()); 4427 } 4428 out: 4429 atomic_dec(this_cpu_ptr(&checking)); 4430 } 4431 #else 4432 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4433 struct rb_event_info *info, 4434 unsigned long tail) 4435 { 4436 } 4437 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4438 4439 static struct ring_buffer_event * 4440 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4441 struct rb_event_info *info) 4442 { 4443 struct ring_buffer_event *event; 4444 struct buffer_page *tail_page; 4445 unsigned long tail, write, w; 4446 4447 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4448 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4449 4450 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4451 barrier(); 4452 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4453 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4454 barrier(); 4455 info->ts = rb_time_stamp(cpu_buffer->buffer); 4456 4457 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4458 info->delta = info->ts; 4459 } else { 4460 /* 4461 * If interrupting an event time update, we may need an 4462 * absolute timestamp. 4463 * Don't bother if this is the start of a new page (w == 0). 
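	 *
	 * A mismatch between before_stamp and write_stamp here means
	 * some writer has published a new before_stamp (B below) but
	 * has not yet updated write_stamp (D below), so the saved
	 * write_stamp cannot be used as a delta base. In that case
	 * room for a forced absolute/extended time stamp is reserved
	 * up front.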
*/
4465 if (!w) {
4466 /* Use the sub-buffer timestamp */
4467 info->delta = 0;
4468 } else if (unlikely(info->before != info->after)) {
4469 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
4470 info->length += RB_LEN_TIME_EXTEND;
4471 } else {
4472 info->delta = info->ts - info->after;
4473 if (unlikely(test_time_stamp(info->delta))) {
4474 info->add_timestamp |= RB_ADD_STAMP_EXTEND;
4475 info->length += RB_LEN_TIME_EXTEND;
4476 }
4477 }
4478 }
4479
4480 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);
4481
4482 /*C*/ write = local_add_return(info->length, &tail_page->write);
4483
4484 /* set write to only the index of the write */
4485 write &= RB_WRITE_MASK;
4486
4487 tail = write - info->length;
4488
4489 /* See if we shot past the end of this buffer page */
4490 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) {
4491 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
4492 return rb_move_tail(cpu_buffer, tail, info);
4493 }
4494
4495 if (likely(tail == w)) {
4496 /* Nothing interrupted us between A and C */
4497 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts);
4498 /*
4499 * If something came in between C and D, the write stamp
4500 * may now not be in sync. But that's fine as the before_stamp
4501 * will be different and then the next event will just be forced
4502 * to use an absolute timestamp.
4503 */
4504 if (likely(!(info->add_timestamp &
4505 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
4506 /* This did not interrupt any time update */
4507 info->delta = info->ts - info->after;
4508 else
4509 /* Just use full timestamp for interrupting event */
4510 info->delta = info->ts;
4511 check_buffer(cpu_buffer, info, tail);
4512 } else {
4513 u64 ts;
4514 /* SLOW PATH - Interrupted between A and C */
4515
4516 /* Save the old before_stamp */
4517 rb_time_read(&cpu_buffer->before_stamp, &info->before);
4518
4519 /*
4520 * Read a new timestamp and update the before_stamp to make
4521 * the next event after this one force using an absolute
4522 * timestamp. This is in case an interrupt were to come in
4523 * between E and F.
4524 */
4525 ts = rb_time_stamp(cpu_buffer->buffer);
4526 rb_time_set(&cpu_buffer->before_stamp, ts);
4527
4528 barrier();
4529 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after);
4530 barrier();
4531 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
4532 info->after == info->before && info->after < ts) {
4533 /*
4534 * Nothing came after this event between C and F, it is
4535 * safe to use info->after for the delta as it
4536 * matched info->before and is still valid.
4537 */
4538 info->delta = ts - info->after;
4539 } else {
4540 /*
4541 * Interrupted between C and F:
4542 * Lost the previous event's time stamp. Just set the
4543 * delta to zero, and this will be the same time as
4544 * the event this event interrupted. And the events that
4545 * came after this will still be correct (as they would
4546 * have built their delta on the previous event).
4547 */
4548 info->delta = 0;
4549 }
4550 info->ts = ts;
4551 info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
4552 }
4553
4554 /*
4555 * If this is the first commit on the page, then it has the same
4556 * timestamp as the page itself.
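	 *
	 * In that case the delta is simply cleared below and readers
	 * take the time from the sub-buffer's time_stamp, which is
	 * set to info->ts by the !tail check further down.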
4557 */ 4558 if (unlikely(!tail && !(info->add_timestamp & 4559 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4560 info->delta = 0; 4561 4562 /* We reserved something on the buffer */ 4563 4564 event = __rb_page_index(tail_page, tail); 4565 rb_update_event(cpu_buffer, event, info); 4566 4567 local_inc(&tail_page->entries); 4568 4569 /* 4570 * If this is the first commit on the page, then update 4571 * its timestamp. 4572 */ 4573 if (unlikely(!tail)) 4574 tail_page->page->time_stamp = info->ts; 4575 4576 /* account for these added bytes */ 4577 local_add(info->length, &cpu_buffer->entries_bytes); 4578 4579 return event; 4580 } 4581 4582 static __always_inline struct ring_buffer_event * 4583 rb_reserve_next_event(struct trace_buffer *buffer, 4584 struct ring_buffer_per_cpu *cpu_buffer, 4585 unsigned long length) 4586 { 4587 struct ring_buffer_event *event; 4588 struct rb_event_info info; 4589 int nr_loops = 0; 4590 int add_ts_default; 4591 4592 /* 4593 * ring buffer does cmpxchg as well as atomic64 operations 4594 * (which some archs use locking for atomic64), make sure this 4595 * is safe in NMI context 4596 */ 4597 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4598 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4599 (unlikely(in_nmi()))) { 4600 return NULL; 4601 } 4602 4603 rb_start_commit(cpu_buffer); 4604 /* The commit page can not change after this */ 4605 4606 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4607 /* 4608 * Due to the ability to swap a cpu buffer from a buffer 4609 * it is possible it was swapped before we committed. 4610 * (committing stops a swap). We check for it here and 4611 * if it happened, we have to fail the write. 4612 */ 4613 barrier(); 4614 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4615 local_dec(&cpu_buffer->committing); 4616 local_dec(&cpu_buffer->commits); 4617 return NULL; 4618 } 4619 #endif 4620 4621 info.length = rb_calculate_event_length(length); 4622 4623 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4624 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4625 info.length += RB_LEN_TIME_EXTEND; 4626 if (info.length > cpu_buffer->buffer->max_data_size) 4627 goto out_fail; 4628 } else { 4629 add_ts_default = RB_ADD_STAMP_NONE; 4630 } 4631 4632 again: 4633 info.add_timestamp = add_ts_default; 4634 info.delta = 0; 4635 4636 /* 4637 * We allow for interrupts to reenter here and do a trace. 4638 * If one does, it will cause this original code to loop 4639 * back here. Even with heavy interrupts happening, this 4640 * should only happen a few times in a row. If this happens 4641 * 1000 times in a row, there must be either an interrupt 4642 * storm or we have something buggy. 4643 * Bail! 4644 */ 4645 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4646 goto out_fail; 4647 4648 event = __rb_reserve_next(cpu_buffer, &info); 4649 4650 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4651 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4652 info.length -= RB_LEN_TIME_EXTEND; 4653 goto again; 4654 } 4655 4656 if (likely(event)) 4657 return event; 4658 out_fail: 4659 rb_end_commit(cpu_buffer); 4660 return NULL; 4661 } 4662 4663 /** 4664 * ring_buffer_lock_reserve - reserve a part of the buffer 4665 * @buffer: the ring buffer to reserve from 4666 * @length: the length of the data to reserve (excluding event header) 4667 * 4668 * Returns a reserved event on the ring buffer to copy directly to. 4669 * The user of this interface will need to get the body to write into 4670 * and can use the ring_buffer_event_data() interface. 
4671 * 4672 * The length is the length of the data needed, not the event length 4673 * which also includes the event header. 4674 * 4675 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4676 * If NULL is returned, then nothing has been allocated or locked. 4677 */ 4678 struct ring_buffer_event * 4679 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4680 { 4681 struct ring_buffer_per_cpu *cpu_buffer; 4682 struct ring_buffer_event *event; 4683 int cpu; 4684 4685 /* If we are tracing schedule, we don't want to recurse */ 4686 preempt_disable_notrace(); 4687 4688 if (unlikely(atomic_read(&buffer->record_disabled))) 4689 goto out; 4690 4691 cpu = raw_smp_processor_id(); 4692 4693 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4694 goto out; 4695 4696 cpu_buffer = buffer->buffers[cpu]; 4697 4698 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4699 goto out; 4700 4701 if (unlikely(length > buffer->max_data_size)) 4702 goto out; 4703 4704 if (unlikely(trace_recursive_lock(cpu_buffer))) 4705 goto out; 4706 4707 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4708 if (!event) 4709 goto out_unlock; 4710 4711 return event; 4712 4713 out_unlock: 4714 trace_recursive_unlock(cpu_buffer); 4715 out: 4716 preempt_enable_notrace(); 4717 return NULL; 4718 } 4719 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4720 4721 /* 4722 * Decrement the entries to the page that an event is on. 4723 * The event does not even need to exist, only the pointer 4724 * to the page it is on. This may only be called before the commit 4725 * takes place. 4726 */ 4727 static inline void 4728 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4729 struct ring_buffer_event *event) 4730 { 4731 unsigned long addr = (unsigned long)event; 4732 struct buffer_page *bpage = cpu_buffer->commit_page; 4733 struct buffer_page *start; 4734 4735 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4736 4737 /* Do the likely case first */ 4738 if (likely(bpage->page == (void *)addr)) { 4739 local_dec(&bpage->entries); 4740 return; 4741 } 4742 4743 /* 4744 * Because the commit page may be on the reader page we 4745 * start with the next page and check the end loop there. 4746 */ 4747 rb_inc_page(&bpage); 4748 start = bpage; 4749 do { 4750 if (bpage->page == (void *)addr) { 4751 local_dec(&bpage->entries); 4752 return; 4753 } 4754 rb_inc_page(&bpage); 4755 } while (bpage != start); 4756 4757 /* commit not part of this buffer?? */ 4758 RB_WARN_ON(cpu_buffer, 1); 4759 } 4760 4761 /** 4762 * ring_buffer_discard_commit - discard an event that has not been committed 4763 * @buffer: the ring buffer 4764 * @event: non committed event to discard 4765 * 4766 * Sometimes an event that is in the ring buffer needs to be ignored. 4767 * This function lets the user discard an event in the ring buffer 4768 * and then that event will not be read later. 4769 * 4770 * This function only works if it is called before the item has been 4771 * committed. It will try to free the event from the ring buffer 4772 * if another event has not been added behind it. 4773 * 4774 * If another event has been added behind it, it will set the event 4775 * up as discarded, and perform the commit. 4776 * 4777 * If this function is called, do not call ring_buffer_unlock_commit on 4778 * the event. 
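 *
 * A typical usage pattern (sketch only; 'keep', 'data' and 'len'
 * stand in for the caller's filtering decision and payload):
 *
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), data, len);
 *		if (keep)
 *			ring_buffer_unlock_commit(buffer);
 *		else
 *			ring_buffer_discard_commit(buffer, event);
 *	}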
4779 */ 4780 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4781 struct ring_buffer_event *event) 4782 { 4783 struct ring_buffer_per_cpu *cpu_buffer; 4784 int cpu; 4785 4786 /* The event is discarded regardless */ 4787 rb_event_discard(event); 4788 4789 cpu = smp_processor_id(); 4790 cpu_buffer = buffer->buffers[cpu]; 4791 4792 /* 4793 * This must only be called if the event has not been 4794 * committed yet. Thus we can assume that preemption 4795 * is still disabled. 4796 */ 4797 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4798 4799 rb_decrement_entry(cpu_buffer, event); 4800 rb_try_to_discard(cpu_buffer, event); 4801 rb_end_commit(cpu_buffer); 4802 4803 trace_recursive_unlock(cpu_buffer); 4804 4805 preempt_enable_notrace(); 4806 4807 } 4808 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4809 4810 /** 4811 * ring_buffer_write - write data to the buffer without reserving 4812 * @buffer: The ring buffer to write to. 4813 * @length: The length of the data being written (excluding the event header) 4814 * @data: The data to write to the buffer. 4815 * 4816 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4817 * one function. If you already have the data to write to the buffer, it 4818 * may be easier to simply call this function. 4819 * 4820 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4821 * and not the length of the event which would hold the header. 4822 */ 4823 int ring_buffer_write(struct trace_buffer *buffer, 4824 unsigned long length, 4825 void *data) 4826 { 4827 struct ring_buffer_per_cpu *cpu_buffer; 4828 struct ring_buffer_event *event; 4829 void *body; 4830 int ret = -EBUSY; 4831 int cpu; 4832 4833 guard(preempt_notrace)(); 4834 4835 if (atomic_read(&buffer->record_disabled)) 4836 return -EBUSY; 4837 4838 cpu = raw_smp_processor_id(); 4839 4840 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4841 return -EBUSY; 4842 4843 cpu_buffer = buffer->buffers[cpu]; 4844 4845 if (atomic_read(&cpu_buffer->record_disabled)) 4846 return -EBUSY; 4847 4848 if (length > buffer->max_data_size) 4849 return -EBUSY; 4850 4851 if (unlikely(trace_recursive_lock(cpu_buffer))) 4852 return -EBUSY; 4853 4854 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4855 if (!event) 4856 goto out_unlock; 4857 4858 body = rb_event_data(event); 4859 4860 memcpy(body, data, length); 4861 4862 rb_commit(cpu_buffer); 4863 4864 rb_wakeups(buffer, cpu_buffer); 4865 4866 ret = 0; 4867 4868 out_unlock: 4869 trace_recursive_unlock(cpu_buffer); 4870 return ret; 4871 } 4872 EXPORT_SYMBOL_GPL(ring_buffer_write); 4873 4874 /* 4875 * The total entries in the ring buffer is the running counter 4876 * of entries entered into the ring buffer, minus the sum of 4877 * the entries read from the ring buffer and the number of 4878 * entries that were overwritten. 4879 */ 4880 static inline unsigned long 4881 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4882 { 4883 return local_read(&cpu_buffer->entries) - 4884 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4885 } 4886 4887 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4888 { 4889 return !rb_num_of_entries(cpu_buffer); 4890 } 4891 4892 /** 4893 * ring_buffer_record_disable - stop all writes into the buffer 4894 * @buffer: The ring buffer to stop writes to. 4895 * 4896 * This prevents all writes to the buffer. Any attempt to write 4897 * to the buffer after this will fail and return NULL. 4898 * 4899 * The caller should call synchronize_rcu() after this. 
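 *
 * (Writers run with preemption disabled, see
 * ring_buffer_lock_reserve(), so the RCU grace period is what
 * guarantees that any writer that sampled the old value of
 * record_disabled has finished by the time synchronize_rcu()
 * returns.)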
*/
4901 void ring_buffer_record_disable(struct trace_buffer *buffer)
4902 {
4903 atomic_inc(&buffer->record_disabled);
4904 }
4905 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
4906
4907 /**
4908 * ring_buffer_record_enable - enable writes to the buffer
4909 * @buffer: The ring buffer to enable writes
4910 *
4911 * Note, multiple disables will need the same number of enables
4912 * to truly enable the writing (much like preempt_disable).
4913 */
4914 void ring_buffer_record_enable(struct trace_buffer *buffer)
4915 {
4916 atomic_dec(&buffer->record_disabled);
4917 }
4918 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
4919
4920 /**
4921 * ring_buffer_record_off - stop all writes into the buffer
4922 * @buffer: The ring buffer to stop writes to.
4923 *
4924 * This prevents all writes to the buffer. Any attempt to write
4925 * to the buffer after this will fail and return NULL.
4926 *
4927 * This is different than ring_buffer_record_disable() as
4928 * it works like an on/off switch, whereas the disable() version
4929 * must be paired with an enable().
4930 */
4931 void ring_buffer_record_off(struct trace_buffer *buffer)
4932 {
4933 unsigned int rd;
4934 unsigned int new_rd;
4935
4936 rd = atomic_read(&buffer->record_disabled);
4937 do {
4938 new_rd = rd | RB_BUFFER_OFF;
4939 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4940 }
4941 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
4942
4943 /**
4944 * ring_buffer_record_on - restart writes into the buffer
4945 * @buffer: The ring buffer to start writes to.
4946 *
4947 * This enables all writes to the buffer that were disabled by
4948 * ring_buffer_record_off().
4949 *
4950 * This is different than ring_buffer_record_enable() as
4951 * it works like an on/off switch, whereas the enable() version
4952 * must be paired with a disable().
4953 */
4954 void ring_buffer_record_on(struct trace_buffer *buffer)
4955 {
4956 unsigned int rd;
4957 unsigned int new_rd;
4958
4959 rd = atomic_read(&buffer->record_disabled);
4960 do {
4961 new_rd = rd & ~RB_BUFFER_OFF;
4962 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd));
4963 }
4964 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
4965
4966 /**
4967 * ring_buffer_record_is_on - return true if the ring buffer can write
4968 * @buffer: The ring buffer to see if write is enabled
4969 *
4970 * Returns true if the ring buffer is in a state that it accepts writes.
4971 */
4972 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4973 {
4974 return !atomic_read(&buffer->record_disabled);
4975 }
4976
4977 /**
4978 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4979 * @buffer: The ring buffer to see if write is set enabled
4980 *
4981 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4982 * Note that this does NOT mean it is in a writable state.
4983 *
4984 * It may return true when the ring buffer has been disabled by
4985 * ring_buffer_record_disable(), as that is a temporary disabling of
4986 * the ring buffer.
4987 */
4988 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
4989 {
4990 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4991 }
4992
4993 /**
4994 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write
4995 * @buffer: The ring buffer to see if write is enabled
4996 * @cpu: The CPU to test if the ring buffer can write to
4997 *
4998 * Returns true if the ring buffer is in a state that it accepts writes
4999 * for a particular CPU.
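 *
 * Both the global RB_BUFFER_OFF switch and the per CPU disable
 * counter are consulted here, so a buffer turned off with
 * ring_buffer_record_off() or a CPU stopped with
 * ring_buffer_record_disable_cpu() both make this return false.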
5000 */ 5001 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 5002 { 5003 struct ring_buffer_per_cpu *cpu_buffer; 5004 5005 cpu_buffer = buffer->buffers[cpu]; 5006 5007 return ring_buffer_record_is_set_on(buffer) && 5008 !atomic_read(&cpu_buffer->record_disabled); 5009 } 5010 5011 /** 5012 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 5013 * @buffer: The ring buffer to stop writes to. 5014 * @cpu: The CPU buffer to stop 5015 * 5016 * This prevents all writes to the buffer. Any attempt to write 5017 * to the buffer after this will fail and return NULL. 5018 * 5019 * The caller should call synchronize_rcu() after this. 5020 */ 5021 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 5022 { 5023 struct ring_buffer_per_cpu *cpu_buffer; 5024 5025 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5026 return; 5027 5028 cpu_buffer = buffer->buffers[cpu]; 5029 atomic_inc(&cpu_buffer->record_disabled); 5030 } 5031 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 5032 5033 /** 5034 * ring_buffer_record_enable_cpu - enable writes to the buffer 5035 * @buffer: The ring buffer to enable writes 5036 * @cpu: The CPU to enable. 5037 * 5038 * Note, multiple disables will need the same number of enables 5039 * to truly enable the writing (much like preempt_disable). 5040 */ 5041 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 5042 { 5043 struct ring_buffer_per_cpu *cpu_buffer; 5044 5045 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5046 return; 5047 5048 cpu_buffer = buffer->buffers[cpu]; 5049 atomic_dec(&cpu_buffer->record_disabled); 5050 } 5051 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 5052 5053 /** 5054 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 5055 * @buffer: The ring buffer 5056 * @cpu: The per CPU buffer to read from. 5057 */ 5058 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 5059 { 5060 unsigned long flags; 5061 struct ring_buffer_per_cpu *cpu_buffer; 5062 struct buffer_page *bpage; 5063 u64 ret = 0; 5064 5065 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5066 return 0; 5067 5068 cpu_buffer = buffer->buffers[cpu]; 5069 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5070 /* 5071 * if the tail is on reader_page, oldest time stamp is on the reader 5072 * page 5073 */ 5074 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 5075 bpage = cpu_buffer->reader_page; 5076 else 5077 bpage = rb_set_head_page(cpu_buffer); 5078 if (bpage) 5079 ret = bpage->page->time_stamp; 5080 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5081 5082 return ret; 5083 } 5084 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 5085 5086 /** 5087 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 5088 * @buffer: The ring buffer 5089 * @cpu: The per CPU buffer to read from. 5090 */ 5091 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 5092 { 5093 struct ring_buffer_per_cpu *cpu_buffer; 5094 unsigned long ret; 5095 5096 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5097 return 0; 5098 5099 cpu_buffer = buffer->buffers[cpu]; 5100 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 5101 5102 return ret; 5103 } 5104 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 5105 5106 /** 5107 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 5108 * @buffer: The ring buffer 5109 * @cpu: The per CPU buffer to get the entries from. 
5110 */ 5111 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5112 { 5113 struct ring_buffer_per_cpu *cpu_buffer; 5114 5115 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5116 return 0; 5117 5118 cpu_buffer = buffer->buffers[cpu]; 5119 5120 return rb_num_of_entries(cpu_buffer); 5121 } 5122 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5123 5124 /** 5125 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5126 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5127 * @buffer: The ring buffer 5128 * @cpu: The per CPU buffer to get the number of overruns from 5129 */ 5130 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5131 { 5132 struct ring_buffer_per_cpu *cpu_buffer; 5133 unsigned long ret; 5134 5135 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5136 return 0; 5137 5138 cpu_buffer = buffer->buffers[cpu]; 5139 ret = local_read(&cpu_buffer->overrun); 5140 5141 return ret; 5142 } 5143 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5144 5145 /** 5146 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5147 * commits failing due to the buffer wrapping around while there are uncommitted 5148 * events, such as during an interrupt storm. 5149 * @buffer: The ring buffer 5150 * @cpu: The per CPU buffer to get the number of overruns from 5151 */ 5152 unsigned long 5153 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5154 { 5155 struct ring_buffer_per_cpu *cpu_buffer; 5156 unsigned long ret; 5157 5158 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5159 return 0; 5160 5161 cpu_buffer = buffer->buffers[cpu]; 5162 ret = local_read(&cpu_buffer->commit_overrun); 5163 5164 return ret; 5165 } 5166 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5167 5168 /** 5169 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5170 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
5171 * @buffer: The ring buffer 5172 * @cpu: The per CPU buffer to get the number of overruns from 5173 */ 5174 unsigned long 5175 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5176 { 5177 struct ring_buffer_per_cpu *cpu_buffer; 5178 unsigned long ret; 5179 5180 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5181 return 0; 5182 5183 cpu_buffer = buffer->buffers[cpu]; 5184 ret = local_read(&cpu_buffer->dropped_events); 5185 5186 return ret; 5187 } 5188 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5189 5190 /** 5191 * ring_buffer_read_events_cpu - get the number of events successfully read 5192 * @buffer: The ring buffer 5193 * @cpu: The per CPU buffer to get the number of events read 5194 */ 5195 unsigned long 5196 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5197 { 5198 struct ring_buffer_per_cpu *cpu_buffer; 5199 5200 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5201 return 0; 5202 5203 cpu_buffer = buffer->buffers[cpu]; 5204 return cpu_buffer->read; 5205 } 5206 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5207 5208 /** 5209 * ring_buffer_entries - get the number of entries in a buffer 5210 * @buffer: The ring buffer 5211 * 5212 * Returns the total number of entries in the ring buffer 5213 * (all CPU entries) 5214 */ 5215 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5216 { 5217 struct ring_buffer_per_cpu *cpu_buffer; 5218 unsigned long entries = 0; 5219 int cpu; 5220 5221 /* if you care about this being correct, lock the buffer */ 5222 for_each_buffer_cpu(buffer, cpu) { 5223 cpu_buffer = buffer->buffers[cpu]; 5224 entries += rb_num_of_entries(cpu_buffer); 5225 } 5226 5227 return entries; 5228 } 5229 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5230 5231 /** 5232 * ring_buffer_overruns - get the number of overruns in buffer 5233 * @buffer: The ring buffer 5234 * 5235 * Returns the total number of overruns in the ring buffer 5236 * (all CPU entries) 5237 */ 5238 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5239 { 5240 struct ring_buffer_per_cpu *cpu_buffer; 5241 unsigned long overruns = 0; 5242 int cpu; 5243 5244 /* if you care about this being correct, lock the buffer */ 5245 for_each_buffer_cpu(buffer, cpu) { 5246 cpu_buffer = buffer->buffers[cpu]; 5247 overruns += local_read(&cpu_buffer->overrun); 5248 } 5249 5250 return overruns; 5251 } 5252 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5253 5254 static void rb_iter_reset(struct ring_buffer_iter *iter) 5255 { 5256 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5257 5258 /* Iterator usage is expected to have record disabled */ 5259 iter->head_page = cpu_buffer->reader_page; 5260 iter->head = cpu_buffer->reader_page->read; 5261 iter->next_event = iter->head; 5262 5263 iter->cache_reader_page = iter->head_page; 5264 iter->cache_read = cpu_buffer->read; 5265 iter->cache_pages_removed = cpu_buffer->pages_removed; 5266 5267 if (iter->head) { 5268 iter->read_stamp = cpu_buffer->read_stamp; 5269 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5270 } else { 5271 iter->read_stamp = iter->head_page->page->time_stamp; 5272 iter->page_stamp = iter->read_stamp; 5273 } 5274 } 5275 5276 /** 5277 * ring_buffer_iter_reset - reset an iterator 5278 * @iter: The iterator to reset 5279 * 5280 * Resets the iterator, so that it will start from the beginning 5281 * again. 
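 *
 * "Beginning" means the current reader page: the iterator is
 * re-seeded from cpu_buffer->reader_page, and the read count and
 * pages_removed values are cached so that a later consuming read
 * or page removal can be detected and the iterator reset again
 * (see rb_iter_peek()).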
5282 */ 5283 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5284 { 5285 struct ring_buffer_per_cpu *cpu_buffer; 5286 unsigned long flags; 5287 5288 if (!iter) 5289 return; 5290 5291 cpu_buffer = iter->cpu_buffer; 5292 5293 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5294 rb_iter_reset(iter); 5295 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5296 } 5297 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5298 5299 /** 5300 * ring_buffer_iter_empty - check if an iterator has no more to read 5301 * @iter: The iterator to check 5302 */ 5303 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5304 { 5305 struct ring_buffer_per_cpu *cpu_buffer; 5306 struct buffer_page *reader; 5307 struct buffer_page *head_page; 5308 struct buffer_page *commit_page; 5309 struct buffer_page *curr_commit_page; 5310 unsigned commit; 5311 u64 curr_commit_ts; 5312 u64 commit_ts; 5313 5314 cpu_buffer = iter->cpu_buffer; 5315 reader = cpu_buffer->reader_page; 5316 head_page = cpu_buffer->head_page; 5317 commit_page = READ_ONCE(cpu_buffer->commit_page); 5318 commit_ts = commit_page->page->time_stamp; 5319 5320 /* 5321 * When the writer goes across pages, it issues a cmpxchg which 5322 * is a mb(), which will synchronize with the rmb here. 5323 * (see rb_tail_page_update()) 5324 */ 5325 smp_rmb(); 5326 commit = rb_page_commit(commit_page); 5327 /* We want to make sure that the commit page doesn't change */ 5328 smp_rmb(); 5329 5330 /* Make sure commit page didn't change */ 5331 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5332 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5333 5334 /* If the commit page changed, then there's more data */ 5335 if (curr_commit_page != commit_page || 5336 curr_commit_ts != commit_ts) 5337 return 0; 5338 5339 /* Still racy, as it may return a false positive, but that's OK */ 5340 return ((iter->head_page == commit_page && iter->head >= commit) || 5341 (iter->head_page == reader && commit_page == head_page && 5342 head_page->read == commit && 5343 iter->head == rb_page_size(cpu_buffer->reader_page))); 5344 } 5345 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5346 5347 static void 5348 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5349 struct ring_buffer_event *event) 5350 { 5351 u64 delta; 5352 5353 switch (event->type_len) { 5354 case RINGBUF_TYPE_PADDING: 5355 return; 5356 5357 case RINGBUF_TYPE_TIME_EXTEND: 5358 delta = rb_event_time_stamp(event); 5359 cpu_buffer->read_stamp += delta; 5360 return; 5361 5362 case RINGBUF_TYPE_TIME_STAMP: 5363 delta = rb_event_time_stamp(event); 5364 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5365 cpu_buffer->read_stamp = delta; 5366 return; 5367 5368 case RINGBUF_TYPE_DATA: 5369 cpu_buffer->read_stamp += event->time_delta; 5370 return; 5371 5372 default: 5373 RB_WARN_ON(cpu_buffer, 1); 5374 } 5375 } 5376 5377 static void 5378 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5379 struct ring_buffer_event *event) 5380 { 5381 u64 delta; 5382 5383 switch (event->type_len) { 5384 case RINGBUF_TYPE_PADDING: 5385 return; 5386 5387 case RINGBUF_TYPE_TIME_EXTEND: 5388 delta = rb_event_time_stamp(event); 5389 iter->read_stamp += delta; 5390 return; 5391 5392 case RINGBUF_TYPE_TIME_STAMP: 5393 delta = rb_event_time_stamp(event); 5394 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5395 iter->read_stamp = delta; 5396 return; 5397 5398 case RINGBUF_TYPE_DATA: 5399 iter->read_stamp += event->time_delta; 5400 return; 5401 5402 default: 5403 RB_WARN_ON(iter->cpu_buffer, 1); 5404 
} 5405 } 5406 5407 static struct buffer_page * 5408 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5409 { 5410 struct buffer_page *reader = NULL; 5411 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5412 unsigned long overwrite; 5413 unsigned long flags; 5414 int nr_loops = 0; 5415 bool ret; 5416 5417 local_irq_save(flags); 5418 arch_spin_lock(&cpu_buffer->lock); 5419 5420 again: 5421 /* 5422 * This should normally only loop twice. But because the 5423 * start of the reader inserts an empty page, it causes 5424 * a case where we will loop three times. There should be no 5425 * reason to loop four times (that I know of). 5426 */ 5427 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5428 reader = NULL; 5429 goto out; 5430 } 5431 5432 reader = cpu_buffer->reader_page; 5433 5434 /* If there's more to read, return this page */ 5435 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5436 goto out; 5437 5438 /* Never should we have an index greater than the size */ 5439 if (RB_WARN_ON(cpu_buffer, 5440 cpu_buffer->reader_page->read > rb_page_size(reader))) 5441 goto out; 5442 5443 /* check if we caught up to the tail */ 5444 reader = NULL; 5445 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5446 goto out; 5447 5448 /* Don't bother swapping if the ring buffer is empty */ 5449 if (rb_num_of_entries(cpu_buffer) == 0) 5450 goto out; 5451 5452 /* 5453 * Reset the reader page to size zero. 5454 */ 5455 local_set(&cpu_buffer->reader_page->write, 0); 5456 local_set(&cpu_buffer->reader_page->entries, 0); 5457 cpu_buffer->reader_page->real_end = 0; 5458 5459 spin: 5460 /* 5461 * Splice the empty reader page into the list around the head. 5462 */ 5463 reader = rb_set_head_page(cpu_buffer); 5464 if (!reader) 5465 goto out; 5466 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5467 cpu_buffer->reader_page->list.prev = reader->list.prev; 5468 5469 /* 5470 * cpu_buffer->pages just needs to point to the buffer, it 5471 * has no specific buffer page to point to. Lets move it out 5472 * of our way so we don't accidentally swap it. 5473 */ 5474 cpu_buffer->pages = reader->list.prev; 5475 5476 /* The reader page will be pointing to the new head */ 5477 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5478 5479 /* 5480 * We want to make sure we read the overruns after we set up our 5481 * pointers to the next object. The writer side does a 5482 * cmpxchg to cross pages which acts as the mb on the writer 5483 * side. Note, the reader will constantly fail the swap 5484 * while the writer is updating the pointers, so this 5485 * guarantees that the overwrite recorded here is the one we 5486 * want to compare with the last_overrun. 5487 */ 5488 smp_mb(); 5489 overwrite = local_read(&(cpu_buffer->overrun)); 5490 5491 /* 5492 * Here's the tricky part. 5493 * 5494 * We need to move the pointer past the header page. 5495 * But we can only do that if a writer is not currently 5496 * moving it. The page before the header page has the 5497 * flag bit '1' set if it is pointing to the page we want. 5498 * but if the writer is in the process of moving it 5499 * then it will be '2' or already moved '0'. 5500 */ 5501 5502 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5503 5504 /* 5505 * If we did not convert it, then we must try again. 5506 */ 5507 if (!ret) 5508 goto spin; 5509 5510 if (cpu_buffer->ring_meta) 5511 rb_update_meta_reader(cpu_buffer, reader); 5512 5513 /* 5514 * Yay! We succeeded in replacing the page. 
5515 * 5516 * Now make the new head point back to the reader page. 5517 */ 5518 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5519 rb_inc_page(&cpu_buffer->head_page); 5520 5521 cpu_buffer->cnt++; 5522 local_inc(&cpu_buffer->pages_read); 5523 5524 /* Finally update the reader page to the new head */ 5525 cpu_buffer->reader_page = reader; 5526 cpu_buffer->reader_page->read = 0; 5527 5528 if (overwrite != cpu_buffer->last_overrun) { 5529 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5530 cpu_buffer->last_overrun = overwrite; 5531 } 5532 5533 goto again; 5534 5535 out: 5536 /* Update the read_stamp on the first event */ 5537 if (reader && reader->read == 0) 5538 cpu_buffer->read_stamp = reader->page->time_stamp; 5539 5540 arch_spin_unlock(&cpu_buffer->lock); 5541 local_irq_restore(flags); 5542 5543 /* 5544 * The writer has preempt disable, wait for it. But not forever 5545 * Although, 1 second is pretty much "forever" 5546 */ 5547 #define USECS_WAIT 1000000 5548 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5549 /* If the write is past the end of page, a writer is still updating it */ 5550 if (likely(!reader || rb_page_write(reader) <= bsize)) 5551 break; 5552 5553 udelay(1); 5554 5555 /* Get the latest version of the reader write value */ 5556 smp_rmb(); 5557 } 5558 5559 /* The writer is not moving forward? Something is wrong */ 5560 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5561 reader = NULL; 5562 5563 /* 5564 * Make sure we see any padding after the write update 5565 * (see rb_reset_tail()). 5566 * 5567 * In addition, a writer may be writing on the reader page 5568 * if the page has not been fully filled, so the read barrier 5569 * is also needed to make sure we see the content of what is 5570 * committed by the writer (see rb_set_commit_to_write()). 5571 */ 5572 smp_rmb(); 5573 5574 5575 return reader; 5576 } 5577 5578 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5579 { 5580 struct ring_buffer_event *event; 5581 struct buffer_page *reader; 5582 unsigned length; 5583 5584 reader = rb_get_reader_page(cpu_buffer); 5585 5586 /* This function should not be called when buffer is empty */ 5587 if (RB_WARN_ON(cpu_buffer, !reader)) 5588 return; 5589 5590 event = rb_reader_event(cpu_buffer); 5591 5592 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5593 cpu_buffer->read++; 5594 5595 rb_update_read_stamp(cpu_buffer, event); 5596 5597 length = rb_event_length(event); 5598 cpu_buffer->reader_page->read += length; 5599 cpu_buffer->read_bytes += length; 5600 } 5601 5602 static void rb_advance_iter(struct ring_buffer_iter *iter) 5603 { 5604 struct ring_buffer_per_cpu *cpu_buffer; 5605 5606 cpu_buffer = iter->cpu_buffer; 5607 5608 /* If head == next_event then we need to jump to the next event */ 5609 if (iter->head == iter->next_event) { 5610 /* If the event gets overwritten again, there's nothing to do */ 5611 if (rb_iter_head_event(iter) == NULL) 5612 return; 5613 } 5614 5615 iter->head = iter->next_event; 5616 5617 /* 5618 * Check if we are at the end of the buffer. 
5619 */ 5620 if (iter->next_event >= rb_page_size(iter->head_page)) { 5621 /* discarded commits can make the page empty */ 5622 if (iter->head_page == cpu_buffer->commit_page) 5623 return; 5624 rb_inc_iter(iter); 5625 return; 5626 } 5627 5628 rb_update_iter_read_stamp(iter, iter->event); 5629 } 5630 5631 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5632 { 5633 return cpu_buffer->lost_events; 5634 } 5635 5636 static struct ring_buffer_event * 5637 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5638 unsigned long *lost_events) 5639 { 5640 struct ring_buffer_event *event; 5641 struct buffer_page *reader; 5642 int nr_loops = 0; 5643 5644 if (ts) 5645 *ts = 0; 5646 again: 5647 /* 5648 * We repeat when a time extend is encountered. 5649 * Since the time extend is always attached to a data event, 5650 * we should never loop more than once. 5651 * (We never hit the following condition more than twice). 5652 */ 5653 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5654 return NULL; 5655 5656 reader = rb_get_reader_page(cpu_buffer); 5657 if (!reader) 5658 return NULL; 5659 5660 event = rb_reader_event(cpu_buffer); 5661 5662 switch (event->type_len) { 5663 case RINGBUF_TYPE_PADDING: 5664 if (rb_null_event(event)) 5665 RB_WARN_ON(cpu_buffer, 1); 5666 /* 5667 * Because the writer could be discarding every 5668 * event it creates (which would probably be bad) 5669 * if we were to go back to "again" then we may never 5670 * catch up, and will trigger the warn on, or lock 5671 * the box. Return the padding, and we will release 5672 * the current locks, and try again. 5673 */ 5674 return event; 5675 5676 case RINGBUF_TYPE_TIME_EXTEND: 5677 /* Internal data, OK to advance */ 5678 rb_advance_reader(cpu_buffer); 5679 goto again; 5680 5681 case RINGBUF_TYPE_TIME_STAMP: 5682 if (ts) { 5683 *ts = rb_event_time_stamp(event); 5684 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5685 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5686 cpu_buffer->cpu, ts); 5687 } 5688 /* Internal data, OK to advance */ 5689 rb_advance_reader(cpu_buffer); 5690 goto again; 5691 5692 case RINGBUF_TYPE_DATA: 5693 if (ts && !(*ts)) { 5694 *ts = cpu_buffer->read_stamp + event->time_delta; 5695 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5696 cpu_buffer->cpu, ts); 5697 } 5698 if (lost_events) 5699 *lost_events = rb_lost_events(cpu_buffer); 5700 return event; 5701 5702 default: 5703 RB_WARN_ON(cpu_buffer, 1); 5704 } 5705 5706 return NULL; 5707 } 5708 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5709 5710 static struct ring_buffer_event * 5711 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5712 { 5713 struct trace_buffer *buffer; 5714 struct ring_buffer_per_cpu *cpu_buffer; 5715 struct ring_buffer_event *event; 5716 int nr_loops = 0; 5717 5718 if (ts) 5719 *ts = 0; 5720 5721 cpu_buffer = iter->cpu_buffer; 5722 buffer = cpu_buffer->buffer; 5723 5724 /* 5725 * Check if someone performed a consuming read to the buffer 5726 * or removed some pages from the buffer. In these cases, 5727 * iterator was invalidated and we need to reset it. 5728 */ 5729 if (unlikely(iter->cache_read != cpu_buffer->read || 5730 iter->cache_reader_page != cpu_buffer->reader_page || 5731 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5732 rb_iter_reset(iter); 5733 5734 again: 5735 if (ring_buffer_iter_empty(iter)) 5736 return NULL; 5737 5738 /* 5739 * As the writer can mess with what the iterator is trying 5740 * to read, just give up if we fail to get an event after 5741 * three tries. 
The iterator is not as reliable when reading 5742 * the ring buffer with an active write as the consumer is. 5743 * Do not warn if the three failures is reached. 5744 */ 5745 if (++nr_loops > 3) 5746 return NULL; 5747 5748 if (rb_per_cpu_empty(cpu_buffer)) 5749 return NULL; 5750 5751 if (iter->head >= rb_page_size(iter->head_page)) { 5752 rb_inc_iter(iter); 5753 goto again; 5754 } 5755 5756 event = rb_iter_head_event(iter); 5757 if (!event) 5758 goto again; 5759 5760 switch (event->type_len) { 5761 case RINGBUF_TYPE_PADDING: 5762 if (rb_null_event(event)) { 5763 rb_inc_iter(iter); 5764 goto again; 5765 } 5766 rb_advance_iter(iter); 5767 return event; 5768 5769 case RINGBUF_TYPE_TIME_EXTEND: 5770 /* Internal data, OK to advance */ 5771 rb_advance_iter(iter); 5772 goto again; 5773 5774 case RINGBUF_TYPE_TIME_STAMP: 5775 if (ts) { 5776 *ts = rb_event_time_stamp(event); 5777 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5778 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5779 cpu_buffer->cpu, ts); 5780 } 5781 /* Internal data, OK to advance */ 5782 rb_advance_iter(iter); 5783 goto again; 5784 5785 case RINGBUF_TYPE_DATA: 5786 if (ts && !(*ts)) { 5787 *ts = iter->read_stamp + event->time_delta; 5788 ring_buffer_normalize_time_stamp(buffer, 5789 cpu_buffer->cpu, ts); 5790 } 5791 return event; 5792 5793 default: 5794 RB_WARN_ON(cpu_buffer, 1); 5795 } 5796 5797 return NULL; 5798 } 5799 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 5800 5801 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 5802 { 5803 if (likely(!in_nmi())) { 5804 raw_spin_lock(&cpu_buffer->reader_lock); 5805 return true; 5806 } 5807 5808 /* 5809 * If an NMI die dumps out the content of the ring buffer 5810 * trylock must be used to prevent a deadlock if the NMI 5811 * preempted a task that holds the ring buffer locks. If 5812 * we get the lock then all is fine, if not, then continue 5813 * to do the read, but this can corrupt the ring buffer, 5814 * so it must be permanently disabled from future writes. 5815 * Reading from NMI is a oneshot deal. 5816 */ 5817 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 5818 return true; 5819 5820 /* Continue without locking, but disable the ring buffer */ 5821 atomic_inc(&cpu_buffer->record_disabled); 5822 return false; 5823 } 5824 5825 static inline void 5826 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 5827 { 5828 if (likely(locked)) 5829 raw_spin_unlock(&cpu_buffer->reader_lock); 5830 } 5831 5832 /** 5833 * ring_buffer_peek - peek at the next event to be read 5834 * @buffer: The ring buffer to read 5835 * @cpu: The cpu to peak at 5836 * @ts: The timestamp counter of this event. 5837 * @lost_events: a variable to store if events were lost (may be NULL) 5838 * 5839 * This will return the event that will be read next, but does 5840 * not consume the data. 
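 *
 * A minimal usage sketch; process_event() is a hypothetical caller-side
 * handler and declarations/error handling are omitted:
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event)
 *		process_event(ring_buffer_event_data(event), ts, lost);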
5841 */ 5842 struct ring_buffer_event * 5843 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 5844 unsigned long *lost_events) 5845 { 5846 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5847 struct ring_buffer_event *event; 5848 unsigned long flags; 5849 bool dolock; 5850 5851 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5852 return NULL; 5853 5854 again: 5855 local_irq_save(flags); 5856 dolock = rb_reader_lock(cpu_buffer); 5857 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5858 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5859 rb_advance_reader(cpu_buffer); 5860 rb_reader_unlock(cpu_buffer, dolock); 5861 local_irq_restore(flags); 5862 5863 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5864 goto again; 5865 5866 return event; 5867 } 5868 5869 /** ring_buffer_iter_dropped - report if there are dropped events 5870 * @iter: The ring buffer iterator 5871 * 5872 * Returns true if there was dropped events since the last peek. 5873 */ 5874 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 5875 { 5876 bool ret = iter->missed_events != 0; 5877 5878 iter->missed_events = 0; 5879 return ret; 5880 } 5881 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5882 5883 /** 5884 * ring_buffer_iter_peek - peek at the next event to be read 5885 * @iter: The ring buffer iterator 5886 * @ts: The timestamp counter of this event. 5887 * 5888 * This will return the event that will be read next, but does 5889 * not increment the iterator. 5890 */ 5891 struct ring_buffer_event * 5892 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5893 { 5894 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5895 struct ring_buffer_event *event; 5896 unsigned long flags; 5897 5898 again: 5899 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5900 event = rb_iter_peek(iter, ts); 5901 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5902 5903 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5904 goto again; 5905 5906 return event; 5907 } 5908 5909 /** 5910 * ring_buffer_consume - return an event and consume it 5911 * @buffer: The ring buffer to get the next event from 5912 * @cpu: the cpu to read the buffer from 5913 * @ts: a variable to store the timestamp (may be NULL) 5914 * @lost_events: a variable to store if events were lost (may be NULL) 5915 * 5916 * Returns the next event in the ring buffer, and that event is consumed. 5917 * Meaning, that sequential reads will keep returning a different event, 5918 * and eventually empty the ring buffer if the producer is slower. 
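 *
 * A minimal drain loop might look like this; handle_event() is a
 * hypothetical callback and declarations/error handling are omitted:
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		handle_event(ring_buffer_event_data(event), ts, lost);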
5919 */ 5920 struct ring_buffer_event * 5921 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5922 unsigned long *lost_events) 5923 { 5924 struct ring_buffer_per_cpu *cpu_buffer; 5925 struct ring_buffer_event *event = NULL; 5926 unsigned long flags; 5927 bool dolock; 5928 5929 again: 5930 /* might be called in atomic */ 5931 preempt_disable(); 5932 5933 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5934 goto out; 5935 5936 cpu_buffer = buffer->buffers[cpu]; 5937 local_irq_save(flags); 5938 dolock = rb_reader_lock(cpu_buffer); 5939 5940 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5941 if (event) { 5942 cpu_buffer->lost_events = 0; 5943 rb_advance_reader(cpu_buffer); 5944 } 5945 5946 rb_reader_unlock(cpu_buffer, dolock); 5947 local_irq_restore(flags); 5948 5949 out: 5950 preempt_enable(); 5951 5952 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5953 goto again; 5954 5955 return event; 5956 } 5957 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5958 5959 /** 5960 * ring_buffer_read_start - start a non consuming read of the buffer 5961 * @buffer: The ring buffer to read from 5962 * @cpu: The cpu buffer to iterate over 5963 * @flags: gfp flags to use for memory allocation 5964 * 5965 * This creates an iterator to allow non-consuming iteration through 5966 * the buffer. If the buffer is disabled for writing, it will produce 5967 * the same information each time, but if the buffer is still writing 5968 * then the first hit of a write will cause the iteration to stop. 5969 * 5970 * Must be paired with ring_buffer_read_finish. 5971 */ 5972 struct ring_buffer_iter * 5973 ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) 5974 { 5975 struct ring_buffer_per_cpu *cpu_buffer; 5976 struct ring_buffer_iter *iter; 5977 5978 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5979 return NULL; 5980 5981 iter = kzalloc(sizeof(*iter), flags); 5982 if (!iter) 5983 return NULL; 5984 5985 /* Holds the entire event: data and meta data */ 5986 iter->event_size = buffer->subbuf_size; 5987 iter->event = kmalloc(iter->event_size, flags); 5988 if (!iter->event) { 5989 kfree(iter); 5990 return NULL; 5991 } 5992 5993 cpu_buffer = buffer->buffers[cpu]; 5994 5995 iter->cpu_buffer = cpu_buffer; 5996 5997 atomic_inc(&cpu_buffer->resize_disabled); 5998 5999 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6000 arch_spin_lock(&cpu_buffer->lock); 6001 rb_iter_reset(iter); 6002 arch_spin_unlock(&cpu_buffer->lock); 6003 6004 return iter; 6005 } 6006 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 6007 6008 /** 6009 * ring_buffer_read_finish - finish reading the iterator of the buffer 6010 * @iter: The iterator retrieved by ring_buffer_start 6011 * 6012 * This re-enables resizing of the buffer, and frees the iterator. 6013 */ 6014 void 6015 ring_buffer_read_finish(struct ring_buffer_iter *iter) 6016 { 6017 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6018 6019 /* Use this opportunity to check the integrity of the ring buffer. */ 6020 rb_check_pages(cpu_buffer); 6021 6022 atomic_dec(&cpu_buffer->resize_disabled); 6023 kfree(iter->event); 6024 kfree(iter); 6025 } 6026 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 6027 6028 /** 6029 * ring_buffer_iter_advance - advance the iterator to the next location 6030 * @iter: The ring buffer iterator 6031 * 6032 * Move the location of the iterator such that the next read will 6033 * be the next location of the iterator. 
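 *
 * Typically used together with ring_buffer_read_start() and
 * ring_buffer_iter_peek() to walk the buffer without consuming it.
 * A sketch; visit() is hypothetical and error handling is omitted:
 *
 *	iter = ring_buffer_read_start(buffer, cpu, GFP_KERNEL);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		visit(ring_buffer_event_data(event), ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);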
6034 */ 6035 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 6036 { 6037 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6038 unsigned long flags; 6039 6040 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6041 6042 rb_advance_iter(iter); 6043 6044 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6045 } 6046 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 6047 6048 /** 6049 * ring_buffer_size - return the size of the ring buffer (in bytes) 6050 * @buffer: The ring buffer. 6051 * @cpu: The CPU to get ring buffer size from. 6052 */ 6053 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 6054 { 6055 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6056 return 0; 6057 6058 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 6059 } 6060 EXPORT_SYMBOL_GPL(ring_buffer_size); 6061 6062 /** 6063 * ring_buffer_max_event_size - return the max data size of an event 6064 * @buffer: The ring buffer. 6065 * 6066 * Returns the maximum size an event can be. 6067 */ 6068 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6069 { 6070 /* If abs timestamp is requested, events have a timestamp too */ 6071 if (ring_buffer_time_stamp_abs(buffer)) 6072 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6073 return buffer->max_data_size; 6074 } 6075 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6076 6077 static void rb_clear_buffer_page(struct buffer_page *page) 6078 { 6079 local_set(&page->write, 0); 6080 local_set(&page->entries, 0); 6081 rb_init_page(page->page); 6082 page->read = 0; 6083 } 6084 6085 /* 6086 * When the buffer is memory mapped to user space, each sub buffer 6087 * has a unique id that is used by the meta data to tell the user 6088 * where the current reader page is. 6089 * 6090 * For a normal allocated ring buffer, the id is saved in the buffer page 6091 * id field, and updated via this function. 6092 * 6093 * But for a fixed memory mapped buffer, the id is already assigned for 6094 * fixed memory ordering in the memory layout and can not be used. Instead 6095 * the index of where the page lies in the memory layout is used. 6096 * 6097 * For the normal pages, set the buffer page id with the passed in @id 6098 * value and return that. 6099 * 6100 * For fixed memory mapped pages, get the page index in the memory layout 6101 * and return that as the id. 
6102 */ 6103 static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer, 6104 struct buffer_page *bpage, int id) 6105 { 6106 /* 6107 * For boot buffers, the id is the index, 6108 * otherwise, set the buffer page with this id 6109 */ 6110 if (cpu_buffer->ring_meta) 6111 id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page); 6112 else 6113 bpage->id = id; 6114 6115 return id; 6116 } 6117 6118 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6119 { 6120 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6121 6122 if (!meta) 6123 return; 6124 6125 meta->reader.read = cpu_buffer->reader_page->read; 6126 meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, 6127 cpu_buffer->reader_page->id); 6128 6129 meta->reader.lost_events = cpu_buffer->lost_events; 6130 6131 meta->entries = local_read(&cpu_buffer->entries); 6132 meta->overrun = local_read(&cpu_buffer->overrun); 6133 meta->read = cpu_buffer->read; 6134 6135 /* Some archs do not have data cache coherency between kernel and user-space */ 6136 flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE); 6137 } 6138 6139 static void 6140 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6141 { 6142 struct buffer_page *page; 6143 6144 rb_head_page_deactivate(cpu_buffer); 6145 6146 cpu_buffer->head_page 6147 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6148 rb_clear_buffer_page(cpu_buffer->head_page); 6149 list_for_each_entry(page, cpu_buffer->pages, list) { 6150 rb_clear_buffer_page(page); 6151 } 6152 6153 cpu_buffer->tail_page = cpu_buffer->head_page; 6154 cpu_buffer->commit_page = cpu_buffer->head_page; 6155 6156 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6157 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6158 rb_clear_buffer_page(cpu_buffer->reader_page); 6159 6160 local_set(&cpu_buffer->entries_bytes, 0); 6161 local_set(&cpu_buffer->overrun, 0); 6162 local_set(&cpu_buffer->commit_overrun, 0); 6163 local_set(&cpu_buffer->dropped_events, 0); 6164 local_set(&cpu_buffer->entries, 0); 6165 local_set(&cpu_buffer->committing, 0); 6166 local_set(&cpu_buffer->commits, 0); 6167 local_set(&cpu_buffer->pages_touched, 0); 6168 local_set(&cpu_buffer->pages_lost, 0); 6169 local_set(&cpu_buffer->pages_read, 0); 6170 cpu_buffer->last_pages_touch = 0; 6171 cpu_buffer->shortest_full = 0; 6172 cpu_buffer->read = 0; 6173 cpu_buffer->read_bytes = 0; 6174 6175 rb_time_set(&cpu_buffer->write_stamp, 0); 6176 rb_time_set(&cpu_buffer->before_stamp, 0); 6177 6178 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6179 6180 cpu_buffer->lost_events = 0; 6181 cpu_buffer->last_overrun = 0; 6182 6183 rb_head_page_activate(cpu_buffer); 6184 cpu_buffer->pages_removed = 0; 6185 6186 if (cpu_buffer->mapped) { 6187 rb_update_meta_page(cpu_buffer); 6188 if (cpu_buffer->ring_meta) { 6189 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6190 meta->commit_buffer = meta->head_buffer; 6191 } 6192 } 6193 } 6194 6195 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6196 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6197 { 6198 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6199 6200 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6201 return; 6202 6203 arch_spin_lock(&cpu_buffer->lock); 6204 6205 rb_reset_cpu(cpu_buffer); 6206 6207 arch_spin_unlock(&cpu_buffer->lock); 6208 } 6209 6210 /** 6211 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6212 * @buffer: The ring buffer to reset a per cpu buffer of 6213 * @cpu: 
The CPU buffer to be reset 6214 */ 6215 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6216 { 6217 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6218 6219 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6220 return; 6221 6222 /* prevent another thread from changing buffer sizes */ 6223 mutex_lock(&buffer->mutex); 6224 6225 atomic_inc(&cpu_buffer->resize_disabled); 6226 atomic_inc(&cpu_buffer->record_disabled); 6227 6228 /* Make sure all commits have finished */ 6229 synchronize_rcu(); 6230 6231 reset_disabled_cpu_buffer(cpu_buffer); 6232 6233 atomic_dec(&cpu_buffer->record_disabled); 6234 atomic_dec(&cpu_buffer->resize_disabled); 6235 6236 mutex_unlock(&buffer->mutex); 6237 } 6238 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6239 6240 /* Flag to ensure proper resetting of atomic variables */ 6241 #define RESET_BIT (1 << 30) 6242 6243 /** 6244 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6245 * @buffer: The ring buffer to reset a per cpu buffer of 6246 */ 6247 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6248 { 6249 struct ring_buffer_per_cpu *cpu_buffer; 6250 int cpu; 6251 6252 /* prevent another thread from changing buffer sizes */ 6253 mutex_lock(&buffer->mutex); 6254 6255 for_each_online_buffer_cpu(buffer, cpu) { 6256 cpu_buffer = buffer->buffers[cpu]; 6257 6258 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6259 atomic_inc(&cpu_buffer->record_disabled); 6260 } 6261 6262 /* Make sure all commits have finished */ 6263 synchronize_rcu(); 6264 6265 for_each_buffer_cpu(buffer, cpu) { 6266 cpu_buffer = buffer->buffers[cpu]; 6267 6268 /* 6269 * If a CPU came online during the synchronize_rcu(), then 6270 * ignore it. 6271 */ 6272 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6273 continue; 6274 6275 reset_disabled_cpu_buffer(cpu_buffer); 6276 6277 atomic_dec(&cpu_buffer->record_disabled); 6278 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6279 } 6280 6281 mutex_unlock(&buffer->mutex); 6282 } 6283 6284 /** 6285 * ring_buffer_reset - reset a ring buffer 6286 * @buffer: The ring buffer to reset all cpu buffers 6287 */ 6288 void ring_buffer_reset(struct trace_buffer *buffer) 6289 { 6290 struct ring_buffer_per_cpu *cpu_buffer; 6291 int cpu; 6292 6293 /* prevent another thread from changing buffer sizes */ 6294 mutex_lock(&buffer->mutex); 6295 6296 for_each_buffer_cpu(buffer, cpu) { 6297 cpu_buffer = buffer->buffers[cpu]; 6298 6299 atomic_inc(&cpu_buffer->resize_disabled); 6300 atomic_inc(&cpu_buffer->record_disabled); 6301 } 6302 6303 /* Make sure all commits have finished */ 6304 synchronize_rcu(); 6305 6306 for_each_buffer_cpu(buffer, cpu) { 6307 cpu_buffer = buffer->buffers[cpu]; 6308 6309 reset_disabled_cpu_buffer(cpu_buffer); 6310 6311 atomic_dec(&cpu_buffer->record_disabled); 6312 atomic_dec(&cpu_buffer->resize_disabled); 6313 } 6314 6315 mutex_unlock(&buffer->mutex); 6316 } 6317 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6318 6319 /** 6320 * ring_buffer_empty - is the ring buffer empty? 
6321 * @buffer: The ring buffer to test 6322 */ 6323 bool ring_buffer_empty(struct trace_buffer *buffer) 6324 { 6325 struct ring_buffer_per_cpu *cpu_buffer; 6326 unsigned long flags; 6327 bool dolock; 6328 bool ret; 6329 int cpu; 6330 6331 /* yes this is racy, but if you don't like the race, lock the buffer */ 6332 for_each_buffer_cpu(buffer, cpu) { 6333 cpu_buffer = buffer->buffers[cpu]; 6334 local_irq_save(flags); 6335 dolock = rb_reader_lock(cpu_buffer); 6336 ret = rb_per_cpu_empty(cpu_buffer); 6337 rb_reader_unlock(cpu_buffer, dolock); 6338 local_irq_restore(flags); 6339 6340 if (!ret) 6341 return false; 6342 } 6343 6344 return true; 6345 } 6346 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6347 6348 /** 6349 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6350 * @buffer: The ring buffer 6351 * @cpu: The CPU buffer to test 6352 */ 6353 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6354 { 6355 struct ring_buffer_per_cpu *cpu_buffer; 6356 unsigned long flags; 6357 bool dolock; 6358 bool ret; 6359 6360 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6361 return true; 6362 6363 cpu_buffer = buffer->buffers[cpu]; 6364 local_irq_save(flags); 6365 dolock = rb_reader_lock(cpu_buffer); 6366 ret = rb_per_cpu_empty(cpu_buffer); 6367 rb_reader_unlock(cpu_buffer, dolock); 6368 local_irq_restore(flags); 6369 6370 return ret; 6371 } 6372 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6373 6374 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6375 /** 6376 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6377 * @buffer_a: One buffer to swap with 6378 * @buffer_b: The other buffer to swap with 6379 * @cpu: the CPU of the buffers to swap 6380 * 6381 * This function is useful for tracers that want to take a "snapshot" 6382 * of a CPU buffer and has another back up buffer lying around. 6383 * it is expected that the tracer handles the cpu buffer not being 6384 * used at the moment. 6385 */ 6386 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6387 struct trace_buffer *buffer_b, int cpu) 6388 { 6389 struct ring_buffer_per_cpu *cpu_buffer_a; 6390 struct ring_buffer_per_cpu *cpu_buffer_b; 6391 int ret = -EINVAL; 6392 6393 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6394 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6395 return -EINVAL; 6396 6397 cpu_buffer_a = buffer_a->buffers[cpu]; 6398 cpu_buffer_b = buffer_b->buffers[cpu]; 6399 6400 /* It's up to the callers to not try to swap mapped buffers */ 6401 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6402 return -EBUSY; 6403 6404 /* At least make sure the two buffers are somewhat the same */ 6405 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6406 return -EINVAL; 6407 6408 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6409 return -EINVAL; 6410 6411 if (atomic_read(&buffer_a->record_disabled)) 6412 return -EAGAIN; 6413 6414 if (atomic_read(&buffer_b->record_disabled)) 6415 return -EAGAIN; 6416 6417 if (atomic_read(&cpu_buffer_a->record_disabled)) 6418 return -EAGAIN; 6419 6420 if (atomic_read(&cpu_buffer_b->record_disabled)) 6421 return -EAGAIN; 6422 6423 /* 6424 * We can't do a synchronize_rcu here because this 6425 * function can be called in atomic context. 6426 * Normally this will be called from the same CPU as cpu. 6427 * If not it's up to the caller to protect this. 
6428 */ 6429 atomic_inc(&cpu_buffer_a->record_disabled); 6430 atomic_inc(&cpu_buffer_b->record_disabled); 6431 6432 ret = -EBUSY; 6433 if (local_read(&cpu_buffer_a->committing)) 6434 goto out_dec; 6435 if (local_read(&cpu_buffer_b->committing)) 6436 goto out_dec; 6437 6438 /* 6439 * When resize is in progress, we cannot swap it because 6440 * it will mess the state of the cpu buffer. 6441 */ 6442 if (atomic_read(&buffer_a->resizing)) 6443 goto out_dec; 6444 if (atomic_read(&buffer_b->resizing)) 6445 goto out_dec; 6446 6447 buffer_a->buffers[cpu] = cpu_buffer_b; 6448 buffer_b->buffers[cpu] = cpu_buffer_a; 6449 6450 cpu_buffer_b->buffer = buffer_a; 6451 cpu_buffer_a->buffer = buffer_b; 6452 6453 ret = 0; 6454 6455 out_dec: 6456 atomic_dec(&cpu_buffer_a->record_disabled); 6457 atomic_dec(&cpu_buffer_b->record_disabled); 6458 return ret; 6459 } 6460 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6461 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6462 6463 /** 6464 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6465 * @buffer: the buffer to allocate for. 6466 * @cpu: the cpu buffer to allocate. 6467 * 6468 * This function is used in conjunction with ring_buffer_read_page. 6469 * When reading a full page from the ring buffer, these functions 6470 * can be used to speed up the process. The calling function should 6471 * allocate a few pages first with this function. Then when it 6472 * needs to get pages from the ring buffer, it passes the result 6473 * of this function into ring_buffer_read_page, which will swap 6474 * the page that was allocated, with the read page of the buffer. 6475 * 6476 * Returns: 6477 * The page allocated, or ERR_PTR 6478 */ 6479 struct buffer_data_read_page * 6480 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6481 { 6482 struct ring_buffer_per_cpu *cpu_buffer; 6483 struct buffer_data_read_page *bpage = NULL; 6484 unsigned long flags; 6485 6486 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6487 return ERR_PTR(-ENODEV); 6488 6489 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6490 if (!bpage) 6491 return ERR_PTR(-ENOMEM); 6492 6493 bpage->order = buffer->subbuf_order; 6494 cpu_buffer = buffer->buffers[cpu]; 6495 local_irq_save(flags); 6496 arch_spin_lock(&cpu_buffer->lock); 6497 6498 if (cpu_buffer->free_page) { 6499 bpage->data = cpu_buffer->free_page; 6500 cpu_buffer->free_page = NULL; 6501 } 6502 6503 arch_spin_unlock(&cpu_buffer->lock); 6504 local_irq_restore(flags); 6505 6506 if (bpage->data) { 6507 rb_init_page(bpage->data); 6508 } else { 6509 bpage->data = alloc_cpu_data(cpu, cpu_buffer->buffer->subbuf_order); 6510 if (!bpage->data) { 6511 kfree(bpage); 6512 return ERR_PTR(-ENOMEM); 6513 } 6514 } 6515 6516 return bpage; 6517 } 6518 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6519 6520 /** 6521 * ring_buffer_free_read_page - free an allocated read page 6522 * @buffer: the buffer the page was allocate for 6523 * @cpu: the cpu buffer the page came from 6524 * @data_page: the page to free 6525 * 6526 * Free a page allocated from ring_buffer_alloc_read_page. 
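 *
 * If the page is not referenced elsewhere and still matches the
 * buffer's current sub buffer order, it is stashed as the per-CPU
 * free page for reuse instead of being freed right away.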
6527 */ 6528 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6529 struct buffer_data_read_page *data_page) 6530 { 6531 struct ring_buffer_per_cpu *cpu_buffer; 6532 struct buffer_data_page *bpage = data_page->data; 6533 struct page *page = virt_to_page(bpage); 6534 unsigned long flags; 6535 6536 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6537 return; 6538 6539 cpu_buffer = buffer->buffers[cpu]; 6540 6541 /* 6542 * If the page is still in use someplace else, or order of the page 6543 * is different from the subbuffer order of the buffer - 6544 * we can't reuse it 6545 */ 6546 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6547 goto out; 6548 6549 local_irq_save(flags); 6550 arch_spin_lock(&cpu_buffer->lock); 6551 6552 if (!cpu_buffer->free_page) { 6553 cpu_buffer->free_page = bpage; 6554 bpage = NULL; 6555 } 6556 6557 arch_spin_unlock(&cpu_buffer->lock); 6558 local_irq_restore(flags); 6559 6560 out: 6561 free_pages((unsigned long)bpage, data_page->order); 6562 kfree(data_page); 6563 } 6564 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6565 6566 /** 6567 * ring_buffer_read_page - extract a page from the ring buffer 6568 * @buffer: buffer to extract from 6569 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6570 * @len: amount to extract 6571 * @cpu: the cpu of the buffer to extract 6572 * @full: should the extraction only happen when the page is full. 6573 * 6574 * This function will pull out a page from the ring buffer and consume it. 6575 * @data_page must be the address of the variable that was returned 6576 * from ring_buffer_alloc_read_page. This is because the page might be used 6577 * to swap with a page in the ring buffer. 6578 * 6579 * for example: 6580 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6581 * if (IS_ERR(rpage)) 6582 * return PTR_ERR(rpage); 6583 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6584 * if (ret >= 0) 6585 * process_page(ring_buffer_read_page_data(rpage), ret); 6586 * ring_buffer_free_read_page(buffer, cpu, rpage); 6587 * 6588 * When @full is set, the function will not return true unless 6589 * the writer is off the reader page. 6590 * 6591 * Note: it is up to the calling functions to handle sleeps and wakeups. 6592 * The ring buffer can be used anywhere in the kernel and can not 6593 * blindly call wake_up. The layer that uses the ring buffer must be 6594 * responsible for that. 6595 * 6596 * Returns: 6597 * >=0 if data has been transferred, returns the offset of consumed data. 6598 * <0 if no data has been transferred. 6599 */ 6600 int ring_buffer_read_page(struct trace_buffer *buffer, 6601 struct buffer_data_read_page *data_page, 6602 size_t len, int cpu, int full) 6603 { 6604 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6605 struct ring_buffer_event *event; 6606 struct buffer_data_page *bpage; 6607 struct buffer_page *reader; 6608 unsigned long missed_events; 6609 unsigned int commit; 6610 unsigned int read; 6611 u64 save_timestamp; 6612 6613 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6614 return -1; 6615 6616 /* 6617 * If len is not big enough to hold the page header, then 6618 * we can not copy anything. 
6619 */ 6620 if (len <= BUF_PAGE_HDR_SIZE) 6621 return -1; 6622 6623 len -= BUF_PAGE_HDR_SIZE; 6624 6625 if (!data_page || !data_page->data) 6626 return -1; 6627 6628 if (data_page->order != buffer->subbuf_order) 6629 return -1; 6630 6631 bpage = data_page->data; 6632 if (!bpage) 6633 return -1; 6634 6635 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6636 6637 reader = rb_get_reader_page(cpu_buffer); 6638 if (!reader) 6639 return -1; 6640 6641 event = rb_reader_event(cpu_buffer); 6642 6643 read = reader->read; 6644 commit = rb_page_size(reader); 6645 6646 /* Check if any events were dropped */ 6647 missed_events = cpu_buffer->lost_events; 6648 6649 /* 6650 * If this page has been partially read or 6651 * if len is not big enough to read the rest of the page or 6652 * a writer is still on the page, then 6653 * we must copy the data from the page to the buffer. 6654 * Otherwise, we can simply swap the page with the one passed in. 6655 */ 6656 if (read || (len < (commit - read)) || 6657 cpu_buffer->reader_page == cpu_buffer->commit_page || 6658 cpu_buffer->mapped) { 6659 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6660 unsigned int rpos = read; 6661 unsigned int pos = 0; 6662 unsigned int size; 6663 6664 /* 6665 * If a full page is expected, this can still be returned 6666 * if there's been a previous partial read and the 6667 * rest of the page can be read and the commit page is off 6668 * the reader page. 6669 */ 6670 if (full && 6671 (!read || (len < (commit - read)) || 6672 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6673 return -1; 6674 6675 if (len > (commit - read)) 6676 len = (commit - read); 6677 6678 /* Always keep the time extend and data together */ 6679 size = rb_event_ts_length(event); 6680 6681 if (len < size) 6682 return -1; 6683 6684 /* save the current timestamp, since the user will need it */ 6685 save_timestamp = cpu_buffer->read_stamp; 6686 6687 /* Need to copy one event at a time */ 6688 do { 6689 /* We need the size of one event, because 6690 * rb_advance_reader only advances by one event, 6691 * whereas rb_event_ts_length may include the size of 6692 * one or two events. 6693 * We have already ensured there's enough space if this 6694 * is a time extend. */ 6695 size = rb_event_length(event); 6696 memcpy(bpage->data + pos, rpage->data + rpos, size); 6697 6698 len -= size; 6699 6700 rb_advance_reader(cpu_buffer); 6701 rpos = reader->read; 6702 pos += size; 6703 6704 if (rpos >= commit) 6705 break; 6706 6707 event = rb_reader_event(cpu_buffer); 6708 /* Always keep the time extend and data together */ 6709 size = rb_event_ts_length(event); 6710 } while (len >= size); 6711 6712 /* update bpage */ 6713 local_set(&bpage->commit, pos); 6714 bpage->time_stamp = save_timestamp; 6715 6716 /* we copied everything to the beginning */ 6717 read = 0; 6718 } else { 6719 /* update the entry counter */ 6720 cpu_buffer->read += rb_page_entries(reader); 6721 cpu_buffer->read_bytes += rb_page_size(reader); 6722 6723 /* swap the pages */ 6724 rb_init_page(bpage); 6725 bpage = reader->page; 6726 reader->page = data_page->data; 6727 local_set(&reader->write, 0); 6728 local_set(&reader->entries, 0); 6729 reader->read = 0; 6730 data_page->data = bpage; 6731 6732 /* 6733 * Use the real_end for the data size, 6734 * This gives us a chance to store the lost events 6735 * on the page. 
6736 */ 6737 if (reader->real_end) 6738 local_set(&bpage->commit, reader->real_end); 6739 } 6740 6741 cpu_buffer->lost_events = 0; 6742 6743 commit = local_read(&bpage->commit); 6744 /* 6745 * Set a flag in the commit field if we lost events 6746 */ 6747 if (missed_events) { 6748 /* If there is room at the end of the page to save the 6749 * missed events, then record it there. 6750 */ 6751 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6752 memcpy(&bpage->data[commit], &missed_events, 6753 sizeof(missed_events)); 6754 local_add(RB_MISSED_STORED, &bpage->commit); 6755 commit += sizeof(missed_events); 6756 } 6757 local_add(RB_MISSED_EVENTS, &bpage->commit); 6758 } 6759 6760 /* 6761 * This page may be off to user land. Zero it out here. 6762 */ 6763 if (commit < buffer->subbuf_size) 6764 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6765 6766 return read; 6767 } 6768 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6769 6770 /** 6771 * ring_buffer_read_page_data - get pointer to the data in the page. 6772 * @page: the page to get the data from 6773 * 6774 * Returns pointer to the actual data in this page. 6775 */ 6776 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6777 { 6778 return page->data; 6779 } 6780 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6781 6782 /** 6783 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6784 * @buffer: the buffer to get the sub buffer size from 6785 * 6786 * Returns size of the sub buffer, in bytes. 6787 */ 6788 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 6789 { 6790 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6791 } 6792 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 6793 6794 /** 6795 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 6796 * @buffer: The ring_buffer to get the system sub page order from 6797 * 6798 * By default, one ring buffer sub page equals to one system page. This parameter 6799 * is configurable, per ring buffer. The size of the ring buffer sub page can be 6800 * extended, but must be an order of system page size. 6801 * 6802 * Returns the order of buffer sub page size, in system pages: 6803 * 0 means the sub buffer size is 1 system page and so forth. 6804 * In case of an error < 0 is returned. 6805 */ 6806 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 6807 { 6808 if (!buffer) 6809 return -EINVAL; 6810 6811 return buffer->subbuf_order; 6812 } 6813 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 6814 6815 /** 6816 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 6817 * @buffer: The ring_buffer to set the new page size. 6818 * @order: Order of the system pages in one sub buffer page 6819 * 6820 * By default, one ring buffer pages equals to one system page. This API can be 6821 * used to set new size of the ring buffer page. The size must be order of 6822 * system page size, that's why the input parameter @order is the order of 6823 * system pages that are allocated for one ring buffer page: 6824 * 0 - 1 system page 6825 * 1 - 2 system pages 6826 * 3 - 4 system pages 6827 * ... 6828 * 6829 * Returns 0 on success or < 0 in case of an error. 
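 *
 * For example, on a system with 4K pages, an order of 2 selects 16K
 * sub buffers (illustrative only; callers must check the return value):
 *
 *	err = ring_buffer_subbuf_order_set(buffer, 2);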
6830 */ 6831 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 6832 { 6833 struct ring_buffer_per_cpu *cpu_buffer; 6834 struct buffer_page *bpage, *tmp; 6835 int old_order, old_size; 6836 int nr_pages; 6837 int psize; 6838 int err; 6839 int cpu; 6840 6841 if (!buffer || order < 0) 6842 return -EINVAL; 6843 6844 if (buffer->subbuf_order == order) 6845 return 0; 6846 6847 psize = (1 << order) * PAGE_SIZE; 6848 if (psize <= BUF_PAGE_HDR_SIZE) 6849 return -EINVAL; 6850 6851 /* Size of a subbuf cannot be greater than the write counter */ 6852 if (psize > RB_WRITE_MASK + 1) 6853 return -EINVAL; 6854 6855 old_order = buffer->subbuf_order; 6856 old_size = buffer->subbuf_size; 6857 6858 /* prevent another thread from changing buffer sizes */ 6859 guard(mutex)(&buffer->mutex); 6860 atomic_inc(&buffer->record_disabled); 6861 6862 /* Make sure all commits have finished */ 6863 synchronize_rcu(); 6864 6865 buffer->subbuf_order = order; 6866 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 6867 6868 /* Make sure all new buffers are allocated, before deleting the old ones */ 6869 for_each_buffer_cpu(buffer, cpu) { 6870 6871 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6872 continue; 6873 6874 cpu_buffer = buffer->buffers[cpu]; 6875 6876 if (cpu_buffer->mapped) { 6877 err = -EBUSY; 6878 goto error; 6879 } 6880 6881 /* Update the number of pages to match the new size */ 6882 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6883 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6884 6885 /* we need a minimum of two pages */ 6886 if (nr_pages < 2) 6887 nr_pages = 2; 6888 6889 cpu_buffer->nr_pages_to_update = nr_pages; 6890 6891 /* Include the reader page */ 6892 nr_pages++; 6893 6894 /* Allocate the new size buffer */ 6895 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6896 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6897 &cpu_buffer->new_pages)) { 6898 /* not enough memory for new pages */ 6899 err = -ENOMEM; 6900 goto error; 6901 } 6902 } 6903 6904 for_each_buffer_cpu(buffer, cpu) { 6905 struct buffer_data_page *old_free_data_page; 6906 struct list_head old_pages; 6907 unsigned long flags; 6908 6909 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6910 continue; 6911 6912 cpu_buffer = buffer->buffers[cpu]; 6913 6914 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6915 6916 /* Clear the head bit to make the link list normal to read */ 6917 rb_head_page_deactivate(cpu_buffer); 6918 6919 /* 6920 * Collect buffers from the cpu_buffer pages list and the 6921 * reader_page on old_pages, so they can be freed later when not 6922 * under a spinlock. The pages list is a linked list with no 6923 * head, adding old_pages turns it into a regular list with 6924 * old_pages being the head. 
6925 */ 6926 list_add(&old_pages, cpu_buffer->pages); 6927 list_add(&cpu_buffer->reader_page->list, &old_pages); 6928 6929 /* One page was allocated for the reader page */ 6930 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6931 struct buffer_page, list); 6932 list_del_init(&cpu_buffer->reader_page->list); 6933 6934 /* Install the new pages, remove the head from the list */ 6935 cpu_buffer->pages = cpu_buffer->new_pages.next; 6936 list_del_init(&cpu_buffer->new_pages); 6937 cpu_buffer->cnt++; 6938 6939 cpu_buffer->head_page 6940 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6941 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6942 6943 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6944 cpu_buffer->nr_pages_to_update = 0; 6945 6946 old_free_data_page = cpu_buffer->free_page; 6947 cpu_buffer->free_page = NULL; 6948 6949 rb_head_page_activate(cpu_buffer); 6950 6951 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6952 6953 /* Free old sub buffers */ 6954 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6955 list_del_init(&bpage->list); 6956 free_buffer_page(bpage); 6957 } 6958 free_pages((unsigned long)old_free_data_page, old_order); 6959 6960 rb_check_pages(cpu_buffer); 6961 } 6962 6963 atomic_dec(&buffer->record_disabled); 6964 6965 return 0; 6966 6967 error: 6968 buffer->subbuf_order = old_order; 6969 buffer->subbuf_size = old_size; 6970 6971 atomic_dec(&buffer->record_disabled); 6972 6973 for_each_buffer_cpu(buffer, cpu) { 6974 cpu_buffer = buffer->buffers[cpu]; 6975 6976 if (!cpu_buffer->nr_pages_to_update) 6977 continue; 6978 6979 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6980 list_del_init(&bpage->list); 6981 free_buffer_page(bpage); 6982 } 6983 } 6984 6985 return err; 6986 } 6987 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6988 6989 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6990 { 6991 struct page *page; 6992 6993 if (cpu_buffer->meta_page) 6994 return 0; 6995 6996 page = alloc_page(GFP_USER | __GFP_ZERO); 6997 if (!page) 6998 return -ENOMEM; 6999 7000 cpu_buffer->meta_page = page_to_virt(page); 7001 7002 return 0; 7003 } 7004 7005 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7006 { 7007 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 7008 7009 free_page(addr); 7010 cpu_buffer->meta_page = NULL; 7011 } 7012 7013 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 7014 unsigned long *subbuf_ids) 7015 { 7016 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 7017 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 7018 struct buffer_page *first_subbuf, *subbuf; 7019 int cnt = 0; 7020 int id = 0; 7021 7022 id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id); 7023 subbuf_ids[id++] = (unsigned long)cpu_buffer->reader_page->page; 7024 cnt++; 7025 7026 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 7027 do { 7028 id = rb_page_id(cpu_buffer, subbuf, id); 7029 7030 if (WARN_ON(id >= nr_subbufs)) 7031 break; 7032 7033 subbuf_ids[id] = (unsigned long)subbuf->page; 7034 7035 rb_inc_page(&subbuf); 7036 id++; 7037 cnt++; 7038 } while (subbuf != first_subbuf); 7039 7040 WARN_ON(cnt != nr_subbufs); 7041 7042 /* install subbuf ID to kern VA translation */ 7043 cpu_buffer->subbuf_ids = subbuf_ids; 7044 7045 meta->meta_struct_len = sizeof(*meta); 7046 meta->nr_subbufs = nr_subbufs; 7047 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7048 meta->meta_page_size = 
meta->subbuf_size; 7049 7050 rb_update_meta_page(cpu_buffer); 7051 } 7052 7053 static struct ring_buffer_per_cpu * 7054 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 7055 { 7056 struct ring_buffer_per_cpu *cpu_buffer; 7057 7058 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7059 return ERR_PTR(-EINVAL); 7060 7061 cpu_buffer = buffer->buffers[cpu]; 7062 7063 mutex_lock(&cpu_buffer->mapping_lock); 7064 7065 if (!cpu_buffer->user_mapped) { 7066 mutex_unlock(&cpu_buffer->mapping_lock); 7067 return ERR_PTR(-ENODEV); 7068 } 7069 7070 return cpu_buffer; 7071 } 7072 7073 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7074 { 7075 mutex_unlock(&cpu_buffer->mapping_lock); 7076 } 7077 7078 /* 7079 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7080 * to be set-up or torn-down. 7081 */ 7082 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7083 bool inc) 7084 { 7085 unsigned long flags; 7086 7087 lockdep_assert_held(&cpu_buffer->mapping_lock); 7088 7089 /* mapped is always greater or equal to user_mapped */ 7090 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7091 return -EINVAL; 7092 7093 if (inc && cpu_buffer->mapped == UINT_MAX) 7094 return -EBUSY; 7095 7096 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7097 return -EINVAL; 7098 7099 mutex_lock(&cpu_buffer->buffer->mutex); 7100 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7101 7102 if (inc) { 7103 cpu_buffer->user_mapped++; 7104 cpu_buffer->mapped++; 7105 } else { 7106 cpu_buffer->user_mapped--; 7107 cpu_buffer->mapped--; 7108 } 7109 7110 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7111 mutex_unlock(&cpu_buffer->buffer->mutex); 7112 7113 return 0; 7114 } 7115 7116 /* 7117 * +--------------+ pgoff == 0 7118 * | meta page | 7119 * +--------------+ pgoff == 1 7120 * | subbuffer 0 | 7121 * | | 7122 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7123 * | subbuffer 1 | 7124 * | | 7125 * ... 7126 */ 7127 #ifdef CONFIG_MMU 7128 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7129 struct vm_area_struct *vma) 7130 { 7131 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7132 unsigned int subbuf_pages, subbuf_order; 7133 struct page **pages __free(kfree) = NULL; 7134 int p = 0, s = 0; 7135 int err; 7136 7137 /* Refuse MP_PRIVATE or writable mappings */ 7138 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7139 !(vma->vm_flags & VM_MAYSHARE)) 7140 return -EPERM; 7141 7142 subbuf_order = cpu_buffer->buffer->subbuf_order; 7143 subbuf_pages = 1 << subbuf_order; 7144 7145 if (subbuf_order && pgoff % subbuf_pages) 7146 return -EINVAL; 7147 7148 /* 7149 * Make sure the mapping cannot become writable later. Also tell the VM 7150 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 
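	 * VM_DONTDUMP is set as well, and clearing VM_MAYWRITE prevents a
	 * later mprotect() from making the mapping writable.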
7151 */ 7152 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7153 VM_MAYWRITE); 7154 7155 lockdep_assert_held(&cpu_buffer->mapping_lock); 7156 7157 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7158 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7159 if (nr_pages <= pgoff) 7160 return -EINVAL; 7161 7162 nr_pages -= pgoff; 7163 7164 nr_vma_pages = vma_pages(vma); 7165 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7166 return -EINVAL; 7167 7168 nr_pages = nr_vma_pages; 7169 7170 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7171 if (!pages) 7172 return -ENOMEM; 7173 7174 if (!pgoff) { 7175 unsigned long meta_page_padding; 7176 7177 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7178 7179 /* 7180 * Pad with the zero-page to align the meta-page with the 7181 * sub-buffers. 7182 */ 7183 meta_page_padding = subbuf_pages - 1; 7184 while (meta_page_padding-- && p < nr_pages) { 7185 unsigned long __maybe_unused zero_addr = 7186 vma->vm_start + (PAGE_SIZE * p); 7187 7188 pages[p++] = ZERO_PAGE(zero_addr); 7189 } 7190 } else { 7191 /* Skip the meta-page */ 7192 pgoff -= subbuf_pages; 7193 7194 s += pgoff / subbuf_pages; 7195 } 7196 7197 while (p < nr_pages) { 7198 struct page *page; 7199 int off = 0; 7200 7201 if (WARN_ON_ONCE(s >= nr_subbufs)) 7202 return -EINVAL; 7203 7204 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7205 7206 for (; off < (1 << (subbuf_order)); off++, page++) { 7207 if (p >= nr_pages) 7208 break; 7209 7210 pages[p++] = page; 7211 } 7212 s++; 7213 } 7214 7215 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7216 7217 return err; 7218 } 7219 #else 7220 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7221 struct vm_area_struct *vma) 7222 { 7223 return -EOPNOTSUPP; 7224 } 7225 #endif 7226 7227 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7228 struct vm_area_struct *vma) 7229 { 7230 struct ring_buffer_per_cpu *cpu_buffer; 7231 unsigned long flags, *subbuf_ids; 7232 int err; 7233 7234 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7235 return -EINVAL; 7236 7237 cpu_buffer = buffer->buffers[cpu]; 7238 7239 guard(mutex)(&cpu_buffer->mapping_lock); 7240 7241 if (cpu_buffer->user_mapped) { 7242 err = __rb_map_vma(cpu_buffer, vma); 7243 if (!err) 7244 err = __rb_inc_dec_mapped(cpu_buffer, true); 7245 return err; 7246 } 7247 7248 /* prevent another thread from changing buffer/sub-buffer sizes */ 7249 guard(mutex)(&buffer->mutex); 7250 7251 err = rb_alloc_meta_page(cpu_buffer); 7252 if (err) 7253 return err; 7254 7255 /* subbuf_ids include the reader while nr_pages does not */ 7256 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7257 if (!subbuf_ids) { 7258 rb_free_meta_page(cpu_buffer); 7259 return -ENOMEM; 7260 } 7261 7262 atomic_inc(&cpu_buffer->resize_disabled); 7263 7264 /* 7265 * Lock all readers to block any subbuf swap until the subbuf IDs are 7266 * assigned. 
7267 */ 7268 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7269 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7270 7271 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7272 7273 err = __rb_map_vma(cpu_buffer, vma); 7274 if (!err) { 7275 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7276 /* This is the first time it is mapped by user */ 7277 cpu_buffer->mapped++; 7278 cpu_buffer->user_mapped = 1; 7279 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7280 } else { 7281 kfree(cpu_buffer->subbuf_ids); 7282 cpu_buffer->subbuf_ids = NULL; 7283 rb_free_meta_page(cpu_buffer); 7284 atomic_dec(&cpu_buffer->resize_disabled); 7285 } 7286 7287 return err; 7288 } 7289 7290 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7291 { 7292 struct ring_buffer_per_cpu *cpu_buffer; 7293 unsigned long flags; 7294 7295 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7296 return -EINVAL; 7297 7298 cpu_buffer = buffer->buffers[cpu]; 7299 7300 guard(mutex)(&cpu_buffer->mapping_lock); 7301 7302 if (!cpu_buffer->user_mapped) { 7303 return -ENODEV; 7304 } else if (cpu_buffer->user_mapped > 1) { 7305 __rb_inc_dec_mapped(cpu_buffer, false); 7306 return 0; 7307 } 7308 7309 guard(mutex)(&buffer->mutex); 7310 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7311 7312 /* This is the last user space mapping */ 7313 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7314 cpu_buffer->mapped--; 7315 cpu_buffer->user_mapped = 0; 7316 7317 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7318 7319 kfree(cpu_buffer->subbuf_ids); 7320 cpu_buffer->subbuf_ids = NULL; 7321 rb_free_meta_page(cpu_buffer); 7322 atomic_dec(&cpu_buffer->resize_disabled); 7323 7324 return 0; 7325 } 7326 7327 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7328 { 7329 struct ring_buffer_per_cpu *cpu_buffer; 7330 struct buffer_page *reader; 7331 unsigned long missed_events; 7332 unsigned long reader_size; 7333 unsigned long flags; 7334 7335 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7336 if (IS_ERR(cpu_buffer)) 7337 return (int)PTR_ERR(cpu_buffer); 7338 7339 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7340 7341 consume: 7342 if (rb_per_cpu_empty(cpu_buffer)) 7343 goto out; 7344 7345 reader_size = rb_page_size(cpu_buffer->reader_page); 7346 7347 /* 7348 * There are data to be read on the current reader page, we can 7349 * return to the caller. But before that, we assume the latter will read 7350 * everything. Let's update the kernel reader accordingly. 7351 */ 7352 if (cpu_buffer->reader_page->read < reader_size) { 7353 while (cpu_buffer->reader_page->read < reader_size) 7354 rb_advance_reader(cpu_buffer); 7355 goto out; 7356 } 7357 7358 /* Did the reader catch up with the writer? */ 7359 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 7360 goto out; 7361 7362 reader = rb_get_reader_page(cpu_buffer); 7363 if (WARN_ON(!reader)) 7364 goto out; 7365 7366 /* Check if any events were dropped */ 7367 missed_events = cpu_buffer->lost_events; 7368 7369 if (missed_events) { 7370 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7371 struct buffer_data_page *bpage = reader->page; 7372 unsigned int commit; 7373 /* 7374 * Use the real_end for the data size, 7375 * This gives us a chance to store the lost events 7376 * on the page. 7377 */ 7378 if (reader->real_end) 7379 local_set(&bpage->commit, reader->real_end); 7380 /* 7381 * If there is room at the end of the page to save the 7382 * missed events, then record it there. 
7383 */ 7384 commit = rb_page_size(reader); 7385 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7386 memcpy(&bpage->data[commit], &missed_events, 7387 sizeof(missed_events)); 7388 local_add(RB_MISSED_STORED, &bpage->commit); 7389 } 7390 local_add(RB_MISSED_EVENTS, &bpage->commit); 7391 } else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page, 7392 "Reader on commit with %ld missed events", 7393 missed_events)) { 7394 /* 7395 * There shouldn't be any missed events if the tail_page 7396 * is on the reader page. But if the tail page is not on the 7397 * reader page and the commit_page is, that would mean that 7398 * there's a commit_overrun (an interrupt preempted an 7399 * addition of an event and then filled the buffer 7400 * with new events). In this case it's not an 7401 * error, but it should still be reported. 7402 * 7403 * TODO: Add missed events to the page for user space to know. 7404 */ 7405 pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n", 7406 cpu, missed_events, cpu_buffer->reader_page->page->time_stamp); 7407 } 7408 } 7409 7410 cpu_buffer->lost_events = 0; 7411 7412 goto consume; 7413 7414 out: 7415 /* Some archs do not have data cache coherency between kernel and user-space */ 7416 flush_kernel_vmap_range(cpu_buffer->reader_page->page, 7417 buffer->subbuf_size + BUF_PAGE_HDR_SIZE); 7418 7419 rb_update_meta_page(cpu_buffer); 7420 7421 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7422 rb_put_mapped_buffer(cpu_buffer); 7423 7424 return 0; 7425 } 7426 7427 /* 7428 * We only allocate new buffers, never free them if the CPU goes down. 7429 * If we were to free the buffer, then the user would lose any trace that was in 7430 * the buffer. 7431 */ 7432 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7433 { 7434 struct trace_buffer *buffer; 7435 long nr_pages_same; 7436 int cpu_i; 7437 unsigned long nr_pages; 7438 7439 buffer = container_of(node, struct trace_buffer, node); 7440 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7441 return 0; 7442 7443 nr_pages = 0; 7444 nr_pages_same = 1; 7445 /* check if all cpu sizes are same */ 7446 for_each_buffer_cpu(buffer, cpu_i) { 7447 /* fill in the size from first enabled cpu */ 7448 if (nr_pages == 0) 7449 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7450 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7451 nr_pages_same = 0; 7452 break; 7453 } 7454 } 7455 /* allocate minimum pages, user can later expand it */ 7456 if (!nr_pages_same) 7457 nr_pages = 2; 7458 buffer->buffers[cpu] = 7459 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7460 if (!buffer->buffers[cpu]) { 7461 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7462 cpu); 7463 return -ENOMEM; 7464 } 7465 smp_wmb(); 7466 cpumask_set_cpu(cpu, buffer->cpumask); 7467 return 0; 7468 } 7469 7470 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7471 /* 7472 * This is a basic integrity check of the ring buffer. 7473 * Late in the boot cycle this test will run when configured in. 7474 * It will kick off a thread per CPU that will go into a loop 7475 * writing to the per cpu ring buffer various sizes of data. 7476 * Some of the data will be large items, some small. 7477 * 7478 * Another thread is created that goes into a spin, sending out 7479 * IPIs to the other CPUs to also write into the ring buffer. 7480 * this is to test the nesting ability of the buffer. 7481 * 7482 * Basic stats are recorded and reported. 
If something in the 7483 * ring buffer should happen that's not expected, a big warning 7484 * is displayed and all ring buffers are disabled. 7485 */ 7486 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7487 7488 struct rb_test_data { 7489 struct trace_buffer *buffer; 7490 unsigned long events; 7491 unsigned long bytes_written; 7492 unsigned long bytes_alloc; 7493 unsigned long bytes_dropped; 7494 unsigned long events_nested; 7495 unsigned long bytes_written_nested; 7496 unsigned long bytes_alloc_nested; 7497 unsigned long bytes_dropped_nested; 7498 int min_size_nested; 7499 int max_size_nested; 7500 int max_size; 7501 int min_size; 7502 int cpu; 7503 int cnt; 7504 }; 7505 7506 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7507 7508 /* 1 meg per cpu */ 7509 #define RB_TEST_BUFFER_SIZE 1048576 7510 7511 static char rb_string[] __initdata = 7512 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7513 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7514 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7515 7516 static bool rb_test_started __initdata; 7517 7518 struct rb_item { 7519 int size; 7520 char str[]; 7521 }; 7522 7523 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7524 { 7525 struct ring_buffer_event *event; 7526 struct rb_item *item; 7527 bool started; 7528 int event_len; 7529 int size; 7530 int len; 7531 int cnt; 7532 7533 /* Have nested writes different that what is written */ 7534 cnt = data->cnt + (nested ? 27 : 0); 7535 7536 /* Multiply cnt by ~e, to make some unique increment */ 7537 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7538 7539 len = size + sizeof(struct rb_item); 7540 7541 started = rb_test_started; 7542 /* read rb_test_started before checking buffer enabled */ 7543 smp_rmb(); 7544 7545 event = ring_buffer_lock_reserve(data->buffer, len); 7546 if (!event) { 7547 /* Ignore dropped events before test starts. 
*/ 7548 if (started) { 7549 if (nested) 7550 data->bytes_dropped_nested += len; 7551 else 7552 data->bytes_dropped += len; 7553 } 7554 return len; 7555 } 7556 7557 event_len = ring_buffer_event_length(event); 7558 7559 if (RB_WARN_ON(data->buffer, event_len < len)) 7560 goto out; 7561 7562 item = ring_buffer_event_data(event); 7563 item->size = size; 7564 memcpy(item->str, rb_string, size); 7565 7566 if (nested) { 7567 data->bytes_alloc_nested += event_len; 7568 data->bytes_written_nested += len; 7569 data->events_nested++; 7570 if (!data->min_size_nested || len < data->min_size_nested) 7571 data->min_size_nested = len; 7572 if (len > data->max_size_nested) 7573 data->max_size_nested = len; 7574 } else { 7575 data->bytes_alloc += event_len; 7576 data->bytes_written += len; 7577 data->events++; 7578 if (!data->min_size || len < data->min_size) 7579 data->min_size = len; 7580 if (len > data->max_size) 7581 data->max_size = len; 7582 } 7583 7584 out: 7585 ring_buffer_unlock_commit(data->buffer); 7586 7587 return 0; 7588 } 7589 7590 static __init int rb_test(void *arg) 7591 { 7592 struct rb_test_data *data = arg; 7593 7594 while (!kthread_should_stop()) { 7595 rb_write_something(data, false); 7596 data->cnt++; 7597 7598 set_current_state(TASK_INTERRUPTIBLE); 7599 /* Now sleep between a min of 100-300us and a max of 1ms */ 7600 usleep_range(((data->cnt % 3) + 1) * 100, 1000); 7601 } 7602 7603 return 0; 7604 } 7605 7606 static __init void rb_ipi(void *ignore) 7607 { 7608 struct rb_test_data *data; 7609 int cpu = smp_processor_id(); 7610 7611 data = &rb_data[cpu]; 7612 rb_write_something(data, true); 7613 } 7614 7615 static __init int rb_hammer_test(void *arg) 7616 { 7617 while (!kthread_should_stop()) { 7618 7619 /* Send an IPI to all cpus to write data! */ 7620 smp_call_function(rb_ipi, NULL, 1); 7621 /* No sleep, but for non preempt, let others run */ 7622 schedule(); 7623 } 7624 7625 return 0; 7626 } 7627 7628 static __init int test_ringbuffer(void) 7629 { 7630 struct task_struct *rb_hammer; 7631 struct trace_buffer *buffer; 7632 int cpu; 7633 int ret = 0; 7634 7635 if (security_locked_down(LOCKDOWN_TRACEFS)) { 7636 pr_warn("Lockdown is enabled, skipping ring buffer tests\n"); 7637 return 0; 7638 } 7639 7640 pr_info("Running ring buffer tests...\n"); 7641 7642 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE); 7643 if (WARN_ON(!buffer)) 7644 return 0; 7645 7646 /* Disable buffer so that threads can't write to it yet */ 7647 ring_buffer_record_off(buffer); 7648 7649 for_each_online_cpu(cpu) { 7650 rb_data[cpu].buffer = buffer; 7651 rb_data[cpu].cpu = cpu; 7652 rb_data[cpu].cnt = cpu; 7653 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], 7654 cpu, "rbtester/%u"); 7655 if (WARN_ON(IS_ERR(rb_threads[cpu]))) { 7656 pr_cont("FAILED\n"); 7657 ret = PTR_ERR(rb_threads[cpu]); 7658 goto out_free; 7659 } 7660 } 7661 7662 /* Now create the rb hammer! */ 7663 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 7664 if (WARN_ON(IS_ERR(rb_hammer))) { 7665 pr_cont("FAILED\n"); 7666 ret = PTR_ERR(rb_hammer); 7667 goto out_free; 7668 } 7669 7670 ring_buffer_record_on(buffer); 7671 /* 7672 * Show buffer is enabled before setting rb_test_started. 7673 * Yes there's a small race window where events could be 7674 * dropped and the thread won't catch it. But when a ring 7675 * buffer gets enabled, there will always be some kind of 7676 * delay before other CPUs see it. Thus, we don't care about 7677 * those dropped events.
We care about events dropped after 7678 * the threads see that the buffer is active. 7679 */ 7680 smp_wmb(); 7681 rb_test_started = true; 7682 7683 set_current_state(TASK_INTERRUPTIBLE); 7684 /* Just run for 10 seconds */ 7685 schedule_timeout(10 * HZ); 7686 7687 kthread_stop(rb_hammer); 7688 7689 out_free: 7690 for_each_online_cpu(cpu) { 7691 if (!rb_threads[cpu]) 7692 break; 7693 kthread_stop(rb_threads[cpu]); 7694 } 7695 if (ret) { 7696 ring_buffer_free(buffer); 7697 return ret; 7698 } 7699 7700 /* Report! */ 7701 pr_info("finished\n"); 7702 for_each_online_cpu(cpu) { 7703 struct ring_buffer_event *event; 7704 struct rb_test_data *data = &rb_data[cpu]; 7705 struct rb_item *item; 7706 unsigned long total_events; 7707 unsigned long total_dropped; 7708 unsigned long total_written; 7709 unsigned long total_alloc; 7710 unsigned long total_read = 0; 7711 unsigned long total_size = 0; 7712 unsigned long total_len = 0; 7713 unsigned long total_lost = 0; 7714 unsigned long lost; 7715 int big_event_size; 7716 int small_event_size; 7717 7718 ret = -1; 7719 7720 total_events = data->events + data->events_nested; 7721 total_written = data->bytes_written + data->bytes_written_nested; 7722 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7723 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7724 7725 big_event_size = data->max_size + data->max_size_nested; 7726 small_event_size = data->min_size + data->min_size_nested; 7727 7728 pr_info("CPU %d:\n", cpu); 7729 pr_info(" events: %ld\n", total_events); 7730 pr_info(" dropped bytes: %ld\n", total_dropped); 7731 pr_info(" alloced bytes: %ld\n", total_alloc); 7732 pr_info(" written bytes: %ld\n", total_written); 7733 pr_info(" biggest event: %d\n", big_event_size); 7734 pr_info(" smallest event: %d\n", small_event_size); 7735 7736 if (RB_WARN_ON(buffer, total_dropped)) 7737 break; 7738 7739 ret = 0; 7740 7741 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7742 total_lost += lost; 7743 item = ring_buffer_event_data(event); 7744 total_len += ring_buffer_event_length(event); 7745 total_size += item->size + sizeof(struct rb_item); 7746 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7747 pr_info("FAILED!\n"); 7748 pr_info("buffer had: %.*s\n", item->size, item->str); 7749 pr_info("expected: %.*s\n", item->size, rb_string); 7750 RB_WARN_ON(buffer, 1); 7751 ret = -1; 7752 break; 7753 } 7754 total_read++; 7755 } 7756 if (ret) 7757 break; 7758 7759 ret = -1; 7760 7761 pr_info(" read events: %ld\n", total_read); 7762 pr_info(" lost events: %ld\n", total_lost); 7763 pr_info(" total events: %ld\n", total_lost + total_read); 7764 pr_info(" recorded len bytes: %ld\n", total_len); 7765 pr_info(" recorded size bytes: %ld\n", total_size); 7766 if (total_lost) { 7767 pr_info(" With dropped events, record len and size may not match\n" 7768 " alloced and written from above\n"); 7769 } else { 7770 if (RB_WARN_ON(buffer, total_len != total_alloc || 7771 total_size != total_written)) 7772 break; 7773 } 7774 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7775 break; 7776 7777 ret = 0; 7778 } 7779 if (!ret) 7780 pr_info("Ring buffer PASSED!\n"); 7781 7782 ring_buffer_free(buffer); 7783 return 0; 7784 } 7785 7786 late_initcall(test_ringbuffer); 7787 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7788