// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/sched/isolation.h>
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>
#include <asm/setup.h>

#include "trace.h"

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

#define RING_BUFFER_META_MAGIC	0xBADFEED

struct ring_buffer_meta {
	int		magic;
	int		struct_sizes;
	unsigned long	total_size;
	unsigned long	buffers_offset;
};

struct ring_buffer_cpu_meta {
	unsigned long	first_buffer;
	unsigned long	head_buffer;
	unsigned long	commit_buffer;
	__u32		subbuf_size;
	__u32		nr_subbufs;
	int		buffers[];
};

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}
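/*
 * Illustrative note (not from the original source): with the header layout
 * printed above, a small data event with a 12-byte payload can be encoded
 * with type_len = 3, since 3 * RB_ALIGNMENT (4 bytes) == 12, and the payload
 * then starts at array[0].  A payload larger than RB_MAX_SMALL_DATA instead
 * uses type_len = 0, stores the payload length in array[0], and the payload
 * starts at array[1].  See rb_event_data() and ring_buffer_event_length()
 * below.
 */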
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA	__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}
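/*
 * Illustrative note (not from the original source): time extend and time
 * stamp events carry a value wider than the 27-bit time_delta field.  The
 * low 27 bits live in time_delta and the upper bits in array[0], so
 * rb_event_time_stamp() reassembles them as (array[0] << TS_SHIFT) +
 * time_delta.  For example, a value of 0x100000005 clock counts would be
 * stored as array[0] = 0x20 and time_delta = 5.
 */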
/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_MASK		(3 << 30)

struct buffer_data_page {
	u64 time_stamp;			/* page time stamp */
	local_t commit;			/* write committed index */
	unsigned char data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned order;			/* order of the page */
	struct buffer_data_page *data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t write;			/* index for next write */
	unsigned read;			/* index for next read */
	local_t entries;		/* entries on this page */
	unsigned long real_end;		/* real end of data */
	unsigned order;			/* order of the page */
	u32 id:30;			/* ID for external mapping */
	u32 range:1;			/* Mapped via a range */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	/* Range pages are not to be freed */
	if (!bpage->range)
		free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * For best performance, allocate cpu buffer data cache line sized
 * and per CPU.
 */
#define alloc_cpu_buffer(cpu) (struct ring_buffer_per_cpu *)		\
	kzalloc_node(ALIGN(sizeof(struct ring_buffer_per_cpu),		\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

#define alloc_cpu_page(cpu) (struct buffer_page *)			\
	kzalloc_node(ALIGN(sizeof(struct buffer_page),			\
			   cache_line_size()), GFP_KERNEL, cpu_to_node(cpu));

static struct buffer_data_page *alloc_cpu_data(int cpu, int order)
{
	struct buffer_data_page *dpage;
	struct page *page;
	gfp_t mflags;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL | __GFP_COMP | __GFP_ZERO;

	page = alloc_pages_node(cpu_to_node(cpu), mflags, order);
	if (!page)
		return NULL;

	dpage = page_address(page);
	rb_init_page(dpage);

	return dpage;
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}
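/*
 * Illustrative note (not from the original source): with a nanosecond clock,
 * 27 bits covers deltas up to 2^27 ns, roughly 134 ms.  Deltas that do not
 * fit are flagged via the RB_ADD_STAMP_EXTEND/FORCE values defined below, so
 * the writer emits a time extend or an absolute time stamp instead of a
 * plain delta.
 */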
struct rb_irq_work {
	struct irq_work work;
	wait_queue_head_t waiters;
	wait_queue_head_t full_waiters;
	atomic_t seq;
	bool waiters_pending;
	bool full_waiters_pending;
	bool wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64 ts;
	u64 delta;
	u64 before;
	u64 after;
	unsigned long length;
	struct buffer_page *tail_page;
	int add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE = 0,
	RB_ADD_STAMP_EXTEND = BIT(1),
	RB_ADD_STAMP_ABSOLUTE = BIT(2),
	RB_ADD_STAMP_FORCE = BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

struct rb_time_struct {
	local64_t time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int cpu;
	atomic_t record_disabled;
	atomic_t resize_disabled;
	struct trace_buffer *buffer;
	raw_spinlock_t reader_lock;	/* serialize readers */
	arch_spinlock_t lock;
	struct lock_class_key lock_key;
	struct buffer_data_page *free_page;
	unsigned long nr_pages;
	unsigned int current_context;
	struct list_head *pages;
	/* pages generation counter, incremented when the list changes */
	unsigned long cnt;
	struct buffer_page *head_page;		/* read from head */
	struct buffer_page *tail_page;		/* write to tail */
	struct buffer_page *commit_page;	/* committed pages */
	struct buffer_page *reader_page;
	unsigned long lost_events;
	unsigned long last_overrun;
	unsigned long nest;
	local_t entries_bytes;
	local_t entries;
	local_t overrun;
	local_t commit_overrun;
	local_t dropped_events;
	local_t committing;
	local_t commits;
	local_t pages_touched;
	local_t pages_lost;
	local_t pages_read;
	long last_pages_touch;
	size_t shortest_full;
	unsigned long read;
	unsigned long read_bytes;
	rb_time_t write_stamp;
	rb_time_t before_stamp;
	u64 event_stamp[MAX_NEST];
	u64 read_stamp;
	/* pages removed since last reset */
	unsigned long pages_removed;

	unsigned int mapped;
	unsigned int user_mapped;	/* user space mapping */
	struct mutex mapping_lock;
	unsigned long *subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta *meta_page;
	struct ring_buffer_cpu_meta *ring_meta;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long nr_pages_to_update;
	struct list_head new_pages;	/* new pages to add */
	struct work_struct update_pages_work;
	struct completion update_done;

	struct rb_irq_work irq_work;
};

struct trace_buffer {
	unsigned flags;
	int cpus;
	atomic_t record_disabled;
	atomic_t resizing;
	cpumask_var_t cpumask;

	struct lock_class_key *reader_lock_key;

	struct mutex mutex;

	struct ring_buffer_per_cpu **buffers;

	struct hlist_node node;
	u64 (*clock)(void);

	struct rb_irq_work irq_work;
	bool time_stamp_abs;

	unsigned long range_addr_start;
	unsigned long range_addr_end;

	struct ring_buffer_meta *meta;

	unsigned int subbuf_size;
	unsigned int subbuf_order;
	unsigned int max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu *cpu_buffer;
	unsigned long head;
	unsigned long next_event;
	struct buffer_page *head_page;
	struct buffer_page *cache_reader_page;
	unsigned long cache_read;
	unsigned long cache_pages_removed;
	u64 read_stamp;
	u64 page_stamp;
	struct ring_buffer_event *event;
	size_t event_size;
	int missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}
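/*
 * Illustrative note (not from the original source): suppose the saved full
 * time stamp has bit 59 set and the 59-bit absolute stamp later wraps to a
 * small value.  OR-ing the saved TS_MSB bits back in then yields a result
 * smaller than the saved stamp, so rb_fix_abs_ts() adds 1 << 59, carrying
 * the wrap into the preserved MSBs.
 */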
static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned,
 * otherwise the current time is returned, but really neither of
 * the last two cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}
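/*
 * Illustrative note (not from the original source): with nr_pages = 10 and
 * a waiter asking for full = 50 (percent), full_hit() returns true once
 * (dirty * 100) >= 500, i.e. once the writer's current sub-buffer plus at
 * least four other touched-but-unread sub-buffers hold data.
 */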
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always wait for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

struct rb_wait_data {
	struct rb_irq_work *irq_work;
	int seq;
};

/*
 * The default wait condition for ring_buffer_wait() is to just exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}
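/*
 * Illustrative note (not from the original source): the atomic_read_acquire()
 * above pairs with the atomic_fetch_inc_release() in rb_wake_up_waiters(),
 * so a wake up that arrives between sampling @seq (done in ring_buffer_wait()
 * below) and the first condition check is still detected instead of being
 * slept through.
 */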
/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}
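/*
 * Illustrative usage (hypothetical caller, not from the original source):
 * wait until cpu 0's buffer is at least half full, using the default
 * "woken once" condition:
 *
 *	ret = ring_buffer_wait(buffer, 0, 50, NULL, NULL);
 *
 * Passing RING_BUFFER_ALL_CPUS instead waits for any data and ignores @full.
 */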
/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Although writes only happen on the CPU that they are on,
 * and they only need to worry about interrupts. Reads can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 *  +----+       +-----+        +-----+
 *  |    |------>|  T  |---X--->|  N  |
 *  |    |<------|     |        |     |
 *  +----+       +-----+        +-----+
 *    ^                           ^ |
 *    |          +-----+          | |
 *    +----------|  R  |----------+ |
 *               |     |<-----------+
 *               +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}
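/*
 * Illustrative note (not from the original source): buffer pages are cache
 * line aligned, so the two low bits of a ->next pointer are free.  For a
 * page whose struct sits at, say, 0x...440, a prev->next value of 0x...441
 * (RB_PAGE_HEAD) marks it as the head page and 0x...442 (RB_PAGE_UPDATE)
 * marks a head move in progress; rb_list_head() masks RB_FLAG_MASK off
 * before the pointer is dereferenced.
 */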
/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);

	if (cpu_buffer->ring_meta) {
		struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
		meta->head_buffer = (unsigned long)head->page;
	}
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static inline void rb_dec_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.prev);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the head page behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the head page, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * it only can increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
			   struct list_head *list)
{
	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->next)->prev) != list))
		return false;

	if (RB_WARN_ON(cpu_buffer,
		       rb_list_head(rb_list_head(list->prev)->next) != list))
		return false;

	return true;
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head, *tmp;
	unsigned long buffer_cnt;
	unsigned long flags;
	int nr_loops = 0;

	/*
	 * Walk the linked list underpinning the ring buffer and validate all
	 * its next and prev links.
	 *
	 * The check acquires the reader_lock to avoid concurrent processing
	 * with code that could be modifying the list. However, the lock cannot
	 * be held for the entire duration of the walk, as this would make the
	 * time when interrupts are disabled non-deterministic, dependent on the
	 * ring buffer size. Therefore, the code releases and re-acquires the
	 * lock after checking each page. The ring_buffer_per_cpu.cnt variable
	 * is then used to detect if the list was modified while the lock was
	 * not held, in which case the check needs to be restarted.
	 *
	 * The code attempts to perform the check at most three times before
	 * giving up. This is acceptable because this is only a self-validation
	 * to detect problems early on. In practice, the list modification
	 * operations are fairly spaced, and so this check typically succeeds at
	 * most on the second try.
	 */
again:
	if (++nr_loops > 3)
		return;

	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	head = rb_list_head(cpu_buffer->pages);
	if (!rb_check_links(cpu_buffer, head))
		goto out_locked;
	buffer_cnt = cpu_buffer->cnt;
	tmp = head;
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

	while (true) {
		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

		if (buffer_cnt != cpu_buffer->cnt) {
			/* The list was updated, try again. */
			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
			goto again;
		}

		tmp = rb_list_head(tmp->next);
		if (tmp == head)
			/* The iteration circled back, all is done. */
			goto out_locked;

		if (!rb_check_links(cpu_buffer, tmp))
			goto out_locked;

		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}

out_locked:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
}

/*
 * Take an address, add the meta data size as well as the array of
 * subbuffer indexes, then align it to a subbuffer size.
 *
 * This is used to help find the next per cpu subbuffer within a mapped range.
 */
static unsigned long
rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs)
{
	addr += sizeof(struct ring_buffer_cpu_meta) +
		sizeof(int) * nr_subbufs;
	return ALIGN(addr, subbuf_size);
}

/*
 * Return the ring_buffer_cpu_meta for a given @cpu.
 */
static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu)
{
	int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
	struct ring_buffer_cpu_meta *meta;
	struct ring_buffer_meta *bmeta;
	unsigned long ptr;
	int nr_subbufs;

	bmeta = buffer->meta;
	if (!bmeta)
		return NULL;

	ptr = (unsigned long)bmeta + bmeta->buffers_offset;
	meta = (struct ring_buffer_cpu_meta *)ptr;

	/* When nr_pages passed in is zero, the first meta has already been initialized */
	if (!nr_pages) {
		nr_subbufs = meta->nr_subbufs;
	} else {
		/* Include the reader page */
		nr_subbufs = nr_pages + 1;
	}

	/*
	 * The first chunk may not be subbuffer aligned, whereas
	 * the rest of the chunks are.
	 */
	if (cpu) {
		ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
		ptr += subbuf_size * nr_subbufs;

		/* We can use multiplication to find chunks greater than 1 */
		if (cpu > 1) {
			unsigned long size;
			unsigned long p;

			/* Save the beginning of this CPU chunk */
			p = ptr;
			ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs);
			ptr += subbuf_size * nr_subbufs;

			/* Now all chunks after this are the same size */
			size = ptr - p;
			ptr += size * (cpu - 2);
		}
	}
	return (void *)ptr;
}

/* Return the start of subbufs given the meta pointer */
static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta)
{
	int subbuf_size = meta->subbuf_size;
	unsigned long ptr;

	ptr = (unsigned long)meta;
	ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs);

	return (void *)ptr;
}

/*
 * Return a specific sub-buffer for a given @cpu defined by @idx.
 */
static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx)
{
	struct ring_buffer_cpu_meta *meta;
	unsigned long ptr;
	int subbuf_size;

	meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu);
	if (!meta)
		return NULL;

	if (WARN_ON_ONCE(idx >= meta->nr_subbufs))
		return NULL;

	subbuf_size = meta->subbuf_size;

	/* Map this buffer to the order that's in meta->buffers[] */
	idx = meta->buffers[idx];

	ptr = (unsigned long)rb_subbufs_from_meta(meta);

	ptr += subbuf_size * idx;
	if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end)
		return NULL;

	return (void *)ptr;
}
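/*
 * Illustrative layout (not from the original source): based on
 * rb_range_meta() and rb_range_align_subbuf() above, a boot-mapped range is
 * laid out roughly as:
 *
 *   range_addr_start:
 *     [ ring_buffer_meta | scratch pad | CPU 0 chunk | CPU 1 chunk | ... ]
 *
 * where each CPU chunk is a ring_buffer_cpu_meta plus its buffers[] array,
 * followed (subbuf_size aligned) by nr_subbufs sub-buffers.  Only the first
 * chunk may start unaligned; every later chunk begins on a subbuf_size
 * boundary, which is why rb_range_meta() can locate CPU N (N > 1) by
 * multiplying a fixed chunk size.
 */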
/*
 * See if the existing memory contains a valid meta section.
 * if so, use that, otherwise initialize it.
 */
static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size)
{
	unsigned long ptr = buffer->range_addr_start;
	struct ring_buffer_meta *bmeta;
	unsigned long total_size;
	int struct_sizes;

	bmeta = (struct ring_buffer_meta *)ptr;
	buffer->meta = bmeta;

	total_size = buffer->range_addr_end - buffer->range_addr_start;

	struct_sizes = sizeof(struct ring_buffer_cpu_meta);
	struct_sizes |= sizeof(*bmeta) << 16;

	/* The first buffer will start word size after the meta page */
	ptr += sizeof(*bmeta);
	ptr = ALIGN(ptr, sizeof(long));
	ptr += scratch_size;

	if (bmeta->magic != RING_BUFFER_META_MAGIC) {
		pr_info("Ring buffer boot meta mismatch of magic\n");
		goto init;
	}

	if (bmeta->struct_sizes != struct_sizes) {
		pr_info("Ring buffer boot meta mismatch of struct size\n");
		goto init;
	}

	if (bmeta->total_size != total_size) {
		pr_info("Ring buffer boot meta mismatch of total size\n");
		goto init;
	}

	if (bmeta->buffers_offset > bmeta->total_size) {
		pr_info("Ring buffer boot meta mismatch of offset outside of total size\n");
		goto init;
	}

	if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) {
		pr_info("Ring buffer boot meta mismatch of first buffer offset\n");
		goto init;
	}

	return true;

init:
	bmeta->magic = RING_BUFFER_META_MAGIC;
	bmeta->struct_sizes = struct_sizes;
	bmeta->total_size = total_size;
	bmeta->buffers_offset = (void *)ptr - (void *)bmeta;

	/* Zero out the scratch pad */
	memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta));

	return false;
}
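/*
 * Illustrative note (not from the original source): struct_sizes packs
 * sizeof(struct ring_buffer_cpu_meta) in the low 16 bits and
 * sizeof(struct ring_buffer_meta) in the high 16 bits, so a layout change
 * to either structure makes the comparison above fail and forces the
 * boot-mapped meta data to be re-initialized rather than trusted.
 */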
/*
 * See if the existing memory contains valid ring buffer data.
 * As the previous kernel must be the same as this kernel, all
 * the calculations (size of buffers and number of buffers)
 * must be the same.
 */
static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu,
			      struct trace_buffer *buffer, int nr_pages,
			      unsigned long *subbuf_mask)
{
	int subbuf_size = PAGE_SIZE;
	struct buffer_data_page *subbuf;
	unsigned long buffers_start;
	unsigned long buffers_end;
	int i;

	if (!subbuf_mask)
		return false;

	buffers_start = meta->first_buffer;
	buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs);

	/* Are the head and commit buffers within the range of buffers? */
	if (meta->head_buffer < buffers_start ||
	    meta->head_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu);
		return false;
	}

	if (meta->commit_buffer < buffers_start ||
	    meta->commit_buffer >= buffers_end) {
		pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu);
		return false;
	}

	subbuf = rb_subbufs_from_meta(meta);

	bitmap_clear(subbuf_mask, 0, meta->nr_subbufs);

	/* Do the meta buffers and the subbufs themselves have correct data? */
	for (i = 0; i < meta->nr_subbufs; i++) {
		if (meta->buffers[i] < 0 ||
		    meta->buffers[i] >= meta->nr_subbufs) {
			pr_info("Ring buffer boot meta [%d] array out of range\n", cpu);
			return false;
		}

		if ((unsigned)local_read(&subbuf->commit) > subbuf_size) {
			pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu);
			return false;
		}

		if (test_bit(meta->buffers[i], subbuf_mask)) {
			pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu);
			return false;
		}

		set_bit(meta->buffers[i], subbuf_mask);
		subbuf = (void *)subbuf + subbuf_size;
	}

	return true;
}

static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf);

static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu,
			       unsigned long long *timestamp, u64 *delta_ptr)
{
	struct ring_buffer_event *event;
	u64 ts, delta;
	int events = 0;
	int e;

	*delta_ptr = 0;
	*timestamp = 0;

	ts = dpage->time_stamp;

	for (e = 0; e < tail; e += rb_event_length(event)) {

		event = (struct ring_buffer_event *)(dpage->data + e);

		switch (event->type_len) {

		case RINGBUF_TYPE_TIME_EXTEND:
			delta = rb_event_time_stamp(event);
			ts += delta;
			break;

		case RINGBUF_TYPE_TIME_STAMP:
			delta = rb_event_time_stamp(event);
			delta = rb_fix_abs_ts(delta, ts);
			if (delta < ts) {
				*delta_ptr = delta;
				*timestamp = ts;
				return -1;
			}
			ts = delta;
			break;

		case RINGBUF_TYPE_PADDING:
			if (event->time_delta == 1)
				break;
			fallthrough;
		case RINGBUF_TYPE_DATA:
			events++;
			ts += event->time_delta;
			break;

		default:
			return -1;
		}
	}
	*timestamp = ts;
	return events;
}

static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu)
{
	unsigned long long ts;
	u64 delta;
	int tail;

	tail = local_read(&dpage->commit);
	return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta);
}

/* If the meta data has been validated, now validate the events */
static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	struct buffer_page *head_page, *orig_head;
	unsigned long entry_bytes = 0;
	unsigned long entries = 0;
	int ret;
	u64 ts;
	int i;

	if (!meta || !meta->head_buffer)
		return;

	/* Do the reader page first */
	ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu);
	if (ret < 0) {
		pr_info("Ring buffer reader page is invalid\n");
		goto invalid;
	}
	entries += ret;
	entry_bytes += local_read(&cpu_buffer->reader_page->page->commit);
	local_set(&cpu_buffer->reader_page->entries, ret);

	orig_head = head_page = cpu_buffer->head_page;
	ts = head_page->page->time_stamp;

	/*
	 * Try to rewind the head so that we can read the pages which were
	 * already read in the previous boot.
	 */
	if (head_page == cpu_buffer->tail_page)
		goto skip_rewind;

	rb_dec_page(&head_page);
	for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) {

		/* Rewind until tail (writer) page. */
		if (head_page == cpu_buffer->tail_page)
			break;

		/* Ensure the page has older data than head. */
*/ 1950 if (ts < head_page->page->time_stamp) 1951 break; 1952 1953 ts = head_page->page->time_stamp; 1954 /* Ensure the page has correct timestamp and some data. */ 1955 if (!ts || rb_page_commit(head_page) == 0) 1956 break; 1957 1958 /* Stop rewind if the page is invalid. */ 1959 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1960 if (ret < 0) 1961 break; 1962 1963 /* Recover the number of entries and update stats. */ 1964 local_set(&head_page->entries, ret); 1965 if (ret) 1966 local_inc(&cpu_buffer->pages_touched); 1967 entries += ret; 1968 entry_bytes += rb_page_commit(head_page); 1969 } 1970 if (i) 1971 pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i); 1972 1973 /* The last rewound page must be skipped. */ 1974 if (head_page != orig_head) 1975 rb_inc_page(&head_page); 1976 1977 /* 1978 * If the ring buffer was rewound, then inject the reader page 1979 * into the location just before the original head page. 1980 */ 1981 if (head_page != orig_head) { 1982 struct buffer_page *bpage = orig_head; 1983 1984 rb_dec_page(&bpage); 1985 /* 1986 * Insert the reader_page before the original head page. 1987 * Since the list encode RB_PAGE flags, general list 1988 * operations should be avoided. 1989 */ 1990 cpu_buffer->reader_page->list.next = &orig_head->list; 1991 cpu_buffer->reader_page->list.prev = orig_head->list.prev; 1992 orig_head->list.prev = &cpu_buffer->reader_page->list; 1993 bpage->list.next = &cpu_buffer->reader_page->list; 1994 1995 /* Make the head_page the reader page */ 1996 cpu_buffer->reader_page = head_page; 1997 bpage = head_page; 1998 rb_inc_page(&head_page); 1999 head_page->list.prev = bpage->list.prev; 2000 rb_dec_page(&bpage); 2001 bpage->list.next = &head_page->list; 2002 rb_set_list_to_head(&bpage->list); 2003 cpu_buffer->pages = &head_page->list; 2004 2005 cpu_buffer->head_page = head_page; 2006 meta->head_buffer = (unsigned long)head_page->page; 2007 2008 /* Reset all the indexes */ 2009 bpage = cpu_buffer->reader_page; 2010 meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page); 2011 bpage->id = 0; 2012 2013 for (i = 1, bpage = head_page; i < meta->nr_subbufs; 2014 i++, rb_inc_page(&bpage)) { 2015 meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page); 2016 bpage->id = i; 2017 } 2018 2019 /* We'll restart verifying from orig_head */ 2020 head_page = orig_head; 2021 } 2022 2023 skip_rewind: 2024 /* If the commit_buffer is the reader page, update the commit page */ 2025 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 2026 cpu_buffer->commit_page = cpu_buffer->reader_page; 2027 /* Nothing more to do, the only page is the reader page */ 2028 goto done; 2029 } 2030 2031 /* Iterate until finding the commit page */ 2032 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 2033 2034 /* Reader page has already been done */ 2035 if (head_page == cpu_buffer->reader_page) 2036 continue; 2037 2038 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 2039 if (ret < 0) { 2040 pr_info("Ring buffer meta [%d] invalid buffer page\n", 2041 cpu_buffer->cpu); 2042 goto invalid; 2043 } 2044 2045 /* If the buffer has content, update pages_touched */ 2046 if (ret) 2047 local_inc(&cpu_buffer->pages_touched); 2048 2049 entries += ret; 2050 entry_bytes += local_read(&head_page->page->commit); 2051 local_set(&cpu_buffer->head_page->entries, ret); 2052 2053 if (head_page == cpu_buffer->commit_page) 2054 break; 2055 } 2056 2057 if (head_page != cpu_buffer->commit_page) { 2058 pr_info("Ring buffer meta [%d] commit 
page not found\n", 2059 cpu_buffer->cpu); 2060 goto invalid; 2061 } 2062 done: 2063 local_set(&cpu_buffer->entries, entries); 2064 local_set(&cpu_buffer->entries_bytes, entry_bytes); 2065 2066 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 2067 return; 2068 2069 invalid: 2070 /* The content of the buffers are invalid, reset the meta data */ 2071 meta->head_buffer = 0; 2072 meta->commit_buffer = 0; 2073 2074 /* Reset the reader page */ 2075 local_set(&cpu_buffer->reader_page->entries, 0); 2076 local_set(&cpu_buffer->reader_page->page->commit, 0); 2077 2078 /* Reset all the subbuffers */ 2079 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 2080 local_set(&head_page->entries, 0); 2081 local_set(&head_page->page->commit, 0); 2082 } 2083 } 2084 2085 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size) 2086 { 2087 struct ring_buffer_cpu_meta *meta; 2088 unsigned long *subbuf_mask; 2089 unsigned long delta; 2090 void *subbuf; 2091 bool valid = false; 2092 int cpu; 2093 int i; 2094 2095 /* Create a mask to test the subbuf array */ 2096 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 2097 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 2098 2099 if (rb_meta_init(buffer, scratch_size)) 2100 valid = true; 2101 2102 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 2103 void *next_meta; 2104 2105 meta = rb_range_meta(buffer, nr_pages, cpu); 2106 2107 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 2108 /* Make the mappings match the current address */ 2109 subbuf = rb_subbufs_from_meta(meta); 2110 delta = (unsigned long)subbuf - meta->first_buffer; 2111 meta->first_buffer += delta; 2112 meta->head_buffer += delta; 2113 meta->commit_buffer += delta; 2114 continue; 2115 } 2116 2117 if (cpu < nr_cpu_ids - 1) 2118 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 2119 else 2120 next_meta = (void *)buffer->range_addr_end; 2121 2122 memset(meta, 0, next_meta - (void *)meta); 2123 2124 meta->nr_subbufs = nr_pages + 1; 2125 meta->subbuf_size = PAGE_SIZE; 2126 2127 subbuf = rb_subbufs_from_meta(meta); 2128 2129 meta->first_buffer = (unsigned long)subbuf; 2130 2131 /* 2132 * The buffers[] array holds the order of the sub-buffers 2133 * that are after the meta data. The sub-buffers may 2134 * be swapped out when read and inserted into a different 2135 * location of the ring buffer. Although their addresses 2136 * remain the same, the buffers[] array contains the 2137 * index into the sub-buffers holding their actual order. 
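	 * As a purely illustrative example, after a few reader swaps the
	 * array could read { 3, 0, 1, 2, ... } while the sub-buffers stay
	 * at their original addresses.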
2138 */ 2139 for (i = 0; i < meta->nr_subbufs; i++) { 2140 meta->buffers[i] = i; 2141 rb_init_page(subbuf); 2142 subbuf += meta->subbuf_size; 2143 } 2144 } 2145 bitmap_free(subbuf_mask); 2146 } 2147 2148 static void *rbm_start(struct seq_file *m, loff_t *pos) 2149 { 2150 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2151 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2152 unsigned long val; 2153 2154 if (!meta) 2155 return NULL; 2156 2157 if (*pos > meta->nr_subbufs) 2158 return NULL; 2159 2160 val = *pos; 2161 val++; 2162 2163 return (void *)val; 2164 } 2165 2166 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2167 { 2168 (*pos)++; 2169 2170 return rbm_start(m, pos); 2171 } 2172 2173 static int rbm_show(struct seq_file *m, void *v) 2174 { 2175 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2176 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2177 unsigned long val = (unsigned long)v; 2178 2179 if (val == 1) { 2180 seq_printf(m, "head_buffer: %d\n", 2181 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2182 seq_printf(m, "commit_buffer: %d\n", 2183 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2184 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2185 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2186 return 0; 2187 } 2188 2189 val -= 2; 2190 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2191 2192 return 0; 2193 } 2194 2195 static void rbm_stop(struct seq_file *m, void *p) 2196 { 2197 } 2198 2199 static const struct seq_operations rb_meta_seq_ops = { 2200 .start = rbm_start, 2201 .next = rbm_next, 2202 .show = rbm_show, 2203 .stop = rbm_stop, 2204 }; 2205 2206 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2207 { 2208 struct seq_file *m; 2209 int ret; 2210 2211 ret = seq_open(file, &rb_meta_seq_ops); 2212 if (ret) 2213 return ret; 2214 2215 m = file->private_data; 2216 m->private = buffer->buffers[cpu]; 2217 2218 return 0; 2219 } 2220 2221 /* Map the buffer_pages to the previous head and commit pages */ 2222 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2223 struct buffer_page *bpage) 2224 { 2225 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2226 2227 if (meta->head_buffer == (unsigned long)bpage->page) 2228 cpu_buffer->head_page = bpage; 2229 2230 if (meta->commit_buffer == (unsigned long)bpage->page) { 2231 cpu_buffer->commit_page = bpage; 2232 cpu_buffer->tail_page = bpage; 2233 } 2234 } 2235 2236 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2237 long nr_pages, struct list_head *pages) 2238 { 2239 struct trace_buffer *buffer = cpu_buffer->buffer; 2240 struct ring_buffer_cpu_meta *meta = NULL; 2241 struct buffer_page *bpage, *tmp; 2242 bool user_thread = current->mm != NULL; 2243 long i; 2244 2245 /* 2246 * Check if the available memory is there first. 2247 * Note, si_mem_available() only gives us a rough estimate of available 2248 * memory. It may not be accurate. But we don't care, we just want 2249 * to prevent doing any allocation when it is obvious that it is 2250 * not going to succeed. 2251 */ 2252 i = si_mem_available(); 2253 if (i < nr_pages) 2254 return -ENOMEM; 2255 2256 /* 2257 * If a user thread allocates too much, and si_mem_available() 2258 * reports there's enough memory, even though there is not. 2259 * Make sure the OOM killer kills this thread. 
This can happen 2260 * even with RETRY_MAYFAIL because another task may be doing 2261 * an allocation after this task has taken all memory. 2262 * This is the task the OOM killer needs to take out during this 2263 * loop, even if it was triggered by an allocation somewhere else. 2264 */ 2265 if (user_thread) 2266 set_current_oom_origin(); 2267 2268 if (buffer->range_addr_start) 2269 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2270 2271 for (i = 0; i < nr_pages; i++) { 2272 2273 bpage = alloc_cpu_page(cpu_buffer->cpu); 2274 if (!bpage) 2275 goto free_pages; 2276 2277 rb_check_bpage(cpu_buffer, bpage); 2278 2279 /* 2280 * Append the pages as for mapped buffers we want to keep 2281 * the order 2282 */ 2283 list_add_tail(&bpage->list, pages); 2284 2285 if (meta) { 2286 /* A range was given. Use that for the buffer page */ 2287 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2288 if (!bpage->page) 2289 goto free_pages; 2290 /* If this is valid from a previous boot */ 2291 if (meta->head_buffer) 2292 rb_meta_buffer_update(cpu_buffer, bpage); 2293 bpage->range = 1; 2294 bpage->id = i + 1; 2295 } else { 2296 int order = cpu_buffer->buffer->subbuf_order; 2297 bpage->page = alloc_cpu_data(cpu_buffer->cpu, order); 2298 if (!bpage->page) 2299 goto free_pages; 2300 } 2301 bpage->order = cpu_buffer->buffer->subbuf_order; 2302 2303 if (user_thread && fatal_signal_pending(current)) 2304 goto free_pages; 2305 } 2306 if (user_thread) 2307 clear_current_oom_origin(); 2308 2309 return 0; 2310 2311 free_pages: 2312 list_for_each_entry_safe(bpage, tmp, pages, list) { 2313 list_del_init(&bpage->list); 2314 free_buffer_page(bpage); 2315 } 2316 if (user_thread) 2317 clear_current_oom_origin(); 2318 2319 return -ENOMEM; 2320 } 2321 2322 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2323 unsigned long nr_pages) 2324 { 2325 LIST_HEAD(pages); 2326 2327 WARN_ON(!nr_pages); 2328 2329 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2330 return -ENOMEM; 2331 2332 /* 2333 * The ring buffer page list is a circular list that does not 2334 * start and end with a list head. All page list items point to 2335 * other pages. 
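	 * The temporary list head used during allocation is unlinked just
	 * below, leaving only the buffer pages themselves linked in a ring.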
2336 */ 2337 cpu_buffer->pages = pages.next; 2338 list_del(&pages); 2339 2340 cpu_buffer->nr_pages = nr_pages; 2341 2342 rb_check_pages(cpu_buffer); 2343 2344 return 0; 2345 } 2346 2347 static struct ring_buffer_per_cpu * 2348 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2349 { 2350 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = 2351 alloc_cpu_buffer(cpu); 2352 struct ring_buffer_cpu_meta *meta; 2353 struct buffer_page *bpage; 2354 int ret; 2355 2356 if (!cpu_buffer) 2357 return NULL; 2358 2359 cpu_buffer->cpu = cpu; 2360 cpu_buffer->buffer = buffer; 2361 raw_spin_lock_init(&cpu_buffer->reader_lock); 2362 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2363 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2364 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2365 init_completion(&cpu_buffer->update_done); 2366 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2367 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2368 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2369 mutex_init(&cpu_buffer->mapping_lock); 2370 2371 bpage = alloc_cpu_page(cpu); 2372 if (!bpage) 2373 return NULL; 2374 2375 rb_check_bpage(cpu_buffer, bpage); 2376 2377 cpu_buffer->reader_page = bpage; 2378 2379 if (buffer->range_addr_start) { 2380 /* 2381 * Range mapped buffers have the same restrictions as memory 2382 * mapped ones do. 2383 */ 2384 cpu_buffer->mapped = 1; 2385 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2386 bpage->page = rb_range_buffer(cpu_buffer, 0); 2387 if (!bpage->page) 2388 goto fail_free_reader; 2389 if (cpu_buffer->ring_meta->head_buffer) 2390 rb_meta_buffer_update(cpu_buffer, bpage); 2391 bpage->range = 1; 2392 } else { 2393 int order = cpu_buffer->buffer->subbuf_order; 2394 bpage->page = alloc_cpu_data(cpu, order); 2395 if (!bpage->page) 2396 goto fail_free_reader; 2397 } 2398 2399 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2400 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2401 2402 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2403 if (ret < 0) 2404 goto fail_free_reader; 2405 2406 rb_meta_validate_events(cpu_buffer); 2407 2408 /* If the boot meta was valid then this has already been updated */ 2409 meta = cpu_buffer->ring_meta; 2410 if (!meta || !meta->head_buffer || 2411 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2412 if (meta && meta->head_buffer && 2413 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2414 pr_warn("Ring buffer meta buffers not all mapped\n"); 2415 if (!cpu_buffer->head_page) 2416 pr_warn(" Missing head_page\n"); 2417 if (!cpu_buffer->commit_page) 2418 pr_warn(" Missing commit_page\n"); 2419 if (!cpu_buffer->tail_page) 2420 pr_warn(" Missing tail_page\n"); 2421 } 2422 2423 cpu_buffer->head_page 2424 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2425 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2426 2427 rb_head_page_activate(cpu_buffer); 2428 2429 if (cpu_buffer->ring_meta) 2430 meta->commit_buffer = meta->head_buffer; 2431 } else { 2432 /* The valid meta buffer still needs to activate the head page */ 2433 rb_head_page_activate(cpu_buffer); 2434 } 2435 2436 return_ptr(cpu_buffer); 2437 2438 fail_free_reader: 2439 free_buffer_page(cpu_buffer->reader_page); 2440 2441 return NULL; 2442 } 2443 2444 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2445 { 2446 struct list_head *head = cpu_buffer->pages; 2447 struct 
buffer_page *bpage, *tmp; 2448 2449 irq_work_sync(&cpu_buffer->irq_work.work); 2450 2451 free_buffer_page(cpu_buffer->reader_page); 2452 2453 if (head) { 2454 rb_head_page_deactivate(cpu_buffer); 2455 2456 list_for_each_entry_safe(bpage, tmp, head, list) { 2457 list_del_init(&bpage->list); 2458 free_buffer_page(bpage); 2459 } 2460 bpage = list_entry(head, struct buffer_page, list); 2461 free_buffer_page(bpage); 2462 } 2463 2464 free_page((unsigned long)cpu_buffer->free_page); 2465 2466 kfree(cpu_buffer); 2467 } 2468 2469 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2470 int order, unsigned long start, 2471 unsigned long end, 2472 unsigned long scratch_size, 2473 struct lock_class_key *key) 2474 { 2475 struct trace_buffer *buffer __free(kfree) = NULL; 2476 long nr_pages; 2477 int subbuf_size; 2478 int bsize; 2479 int cpu; 2480 int ret; 2481 2482 /* keep it in its own cache line */ 2483 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2484 GFP_KERNEL); 2485 if (!buffer) 2486 return NULL; 2487 2488 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2489 return NULL; 2490 2491 buffer->subbuf_order = order; 2492 subbuf_size = (PAGE_SIZE << order); 2493 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2494 2495 /* Max payload is buffer page size - header (8bytes) */ 2496 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2497 2498 buffer->flags = flags; 2499 buffer->clock = trace_clock_local; 2500 buffer->reader_lock_key = key; 2501 2502 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2503 init_waitqueue_head(&buffer->irq_work.waiters); 2504 2505 buffer->cpus = nr_cpu_ids; 2506 2507 bsize = sizeof(void *) * nr_cpu_ids; 2508 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2509 GFP_KERNEL); 2510 if (!buffer->buffers) 2511 goto fail_free_cpumask; 2512 2513 /* If start/end are specified, then that overrides size */ 2514 if (start && end) { 2515 unsigned long buffers_start; 2516 unsigned long ptr; 2517 int n; 2518 2519 /* Make sure that start is word aligned */ 2520 start = ALIGN(start, sizeof(long)); 2521 2522 /* scratch_size needs to be aligned too */ 2523 scratch_size = ALIGN(scratch_size, sizeof(long)); 2524 2525 /* Subtract the buffer meta data and word aligned */ 2526 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2527 buffers_start = ALIGN(buffers_start, sizeof(long)); 2528 buffers_start += scratch_size; 2529 2530 /* Calculate the size for the per CPU data */ 2531 size = end - buffers_start; 2532 size = size / nr_cpu_ids; 2533 2534 /* 2535 * The number of sub-buffers (nr_pages) is determined by the 2536 * total size allocated minus the meta data size. 2537 * Then that is divided by the number of per CPU buffers 2538 * needed, plus account for the integer array index that 2539 * will be appended to the meta data. 
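	 * As a rough, purely illustrative example: a 1M per-CPU slice with
	 * 4K sub-buffers and a 4-byte index each yields about
	 * (1M - meta) / (4K + 4), i.e. roughly 255 sub-buffers, one of
	 * which is later set aside as the reader page.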
2540 */ 2541 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2542 (subbuf_size + sizeof(int)); 2543 /* Need at least two pages plus the reader page */ 2544 if (nr_pages < 3) 2545 goto fail_free_buffers; 2546 2547 again: 2548 /* Make sure that the size fits aligned */ 2549 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2550 ptr += sizeof(struct ring_buffer_cpu_meta) + 2551 sizeof(int) * nr_pages; 2552 ptr = ALIGN(ptr, subbuf_size); 2553 ptr += subbuf_size * nr_pages; 2554 } 2555 if (ptr > end) { 2556 if (nr_pages <= 3) 2557 goto fail_free_buffers; 2558 nr_pages--; 2559 goto again; 2560 } 2561 2562 /* nr_pages should not count the reader page */ 2563 nr_pages--; 2564 buffer->range_addr_start = start; 2565 buffer->range_addr_end = end; 2566 2567 rb_range_meta_init(buffer, nr_pages, scratch_size); 2568 } else { 2569 2570 /* need at least two pages */ 2571 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2572 if (nr_pages < 2) 2573 nr_pages = 2; 2574 } 2575 2576 cpu = raw_smp_processor_id(); 2577 cpumask_set_cpu(cpu, buffer->cpumask); 2578 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2579 if (!buffer->buffers[cpu]) 2580 goto fail_free_buffers; 2581 2582 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2583 if (ret < 0) 2584 goto fail_free_buffers; 2585 2586 mutex_init(&buffer->mutex); 2587 2588 return_ptr(buffer); 2589 2590 fail_free_buffers: 2591 for_each_buffer_cpu(buffer, cpu) { 2592 if (buffer->buffers[cpu]) 2593 rb_free_cpu_buffer(buffer->buffers[cpu]); 2594 } 2595 kfree(buffer->buffers); 2596 2597 fail_free_cpumask: 2598 free_cpumask_var(buffer->cpumask); 2599 2600 return NULL; 2601 } 2602 2603 /** 2604 * __ring_buffer_alloc - allocate a new ring_buffer 2605 * @size: the size in bytes per cpu that is needed. 2606 * @flags: attributes to set for the ring buffer. 2607 * @key: ring buffer reader_lock_key. 2608 * 2609 * Currently the only flag that is available is the RB_FL_OVERWRITE 2610 * flag. This flag means that the buffer will overwrite old data 2611 * when the buffer wraps. If this flag is not set, the buffer will 2612 * drop data when the tail hits the head. 2613 */ 2614 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2615 struct lock_class_key *key) 2616 { 2617 /* Default buffer page size - one system page */ 2618 return alloc_buffer(size, flags, 0, 0, 0, 0, key); 2619 2620 } 2621 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2622 2623 /** 2624 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2625 * @size: the size in bytes per cpu that is needed. 2626 * @flags: attributes to set for the ring buffer. 2627 * @order: sub-buffer order 2628 * @start: start of allocated range 2629 * @range_size: size of allocated range 2630 * @scratch_size: size of scratch area (for preallocated memory buffers) 2631 * @key: ring buffer reader_lock_key. 2632 * 2633 * Currently the only flag that is available is the RB_FL_OVERWRITE 2634 * flag. This flag means that the buffer will overwrite old data 2635 * when the buffer wraps. If this flag is not set, the buffer will 2636 * drop data when the tail hits the head. 
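 *
 * Returns: the new trace_buffer on success, NULL on failure.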
2637 */ 2638 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2639 int order, unsigned long start, 2640 unsigned long range_size, 2641 unsigned long scratch_size, 2642 struct lock_class_key *key) 2643 { 2644 return alloc_buffer(size, flags, order, start, start + range_size, 2645 scratch_size, key); 2646 } 2647 2648 void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size) 2649 { 2650 struct ring_buffer_meta *meta; 2651 void *ptr; 2652 2653 if (!buffer || !buffer->meta) 2654 return NULL; 2655 2656 meta = buffer->meta; 2657 2658 ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long)); 2659 2660 if (size) 2661 *size = (void *)meta + meta->buffers_offset - ptr; 2662 2663 return ptr; 2664 } 2665 2666 /** 2667 * ring_buffer_free - free a ring buffer. 2668 * @buffer: the buffer to free. 2669 */ 2670 void 2671 ring_buffer_free(struct trace_buffer *buffer) 2672 { 2673 int cpu; 2674 2675 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2676 2677 irq_work_sync(&buffer->irq_work.work); 2678 2679 for_each_buffer_cpu(buffer, cpu) 2680 rb_free_cpu_buffer(buffer->buffers[cpu]); 2681 2682 kfree(buffer->buffers); 2683 free_cpumask_var(buffer->cpumask); 2684 2685 kfree(buffer); 2686 } 2687 EXPORT_SYMBOL_GPL(ring_buffer_free); 2688 2689 void ring_buffer_set_clock(struct trace_buffer *buffer, 2690 u64 (*clock)(void)) 2691 { 2692 buffer->clock = clock; 2693 } 2694 2695 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2696 { 2697 buffer->time_stamp_abs = abs; 2698 } 2699 2700 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2701 { 2702 return buffer->time_stamp_abs; 2703 } 2704 2705 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2706 { 2707 return local_read(&bpage->entries) & RB_WRITE_MASK; 2708 } 2709 2710 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2711 { 2712 return local_read(&bpage->write) & RB_WRITE_MASK; 2713 } 2714 2715 static bool 2716 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2717 { 2718 struct list_head *tail_page, *to_remove, *next_page; 2719 struct buffer_page *to_remove_page, *tmp_iter_page; 2720 struct buffer_page *last_page, *first_page; 2721 unsigned long nr_removed; 2722 unsigned long head_bit; 2723 int page_entries; 2724 2725 head_bit = 0; 2726 2727 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2728 atomic_inc(&cpu_buffer->record_disabled); 2729 /* 2730 * We don't race with the readers since we have acquired the reader 2731 * lock. We also don't race with writers after disabling recording. 2732 * This makes it easy to figure out the first and the last page to be 2733 * removed from the list. We unlink all the pages in between including 2734 * the first and last pages. This is done in a busy loop so that we 2735 * lose the least number of traces. 2736 * The pages are freed after we restart recording and unlock readers. 
2737 */ 2738 tail_page = &cpu_buffer->tail_page->list; 2739 2740 /* 2741 * tail page might be on reader page, we remove the next page 2742 * from the ring buffer 2743 */ 2744 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2745 tail_page = rb_list_head(tail_page->next); 2746 to_remove = tail_page; 2747 2748 /* start of pages to remove */ 2749 first_page = list_entry(rb_list_head(to_remove->next), 2750 struct buffer_page, list); 2751 2752 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2753 to_remove = rb_list_head(to_remove)->next; 2754 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2755 } 2756 /* Read iterators need to reset themselves when some pages removed */ 2757 cpu_buffer->pages_removed += nr_removed; 2758 2759 next_page = rb_list_head(to_remove)->next; 2760 2761 /* 2762 * Now we remove all pages between tail_page and next_page. 2763 * Make sure that we have head_bit value preserved for the 2764 * next page 2765 */ 2766 tail_page->next = (struct list_head *)((unsigned long)next_page | 2767 head_bit); 2768 next_page = rb_list_head(next_page); 2769 next_page->prev = tail_page; 2770 2771 /* make sure pages points to a valid page in the ring buffer */ 2772 cpu_buffer->pages = next_page; 2773 cpu_buffer->cnt++; 2774 2775 /* update head page */ 2776 if (head_bit) 2777 cpu_buffer->head_page = list_entry(next_page, 2778 struct buffer_page, list); 2779 2780 /* pages are removed, resume tracing and then free the pages */ 2781 atomic_dec(&cpu_buffer->record_disabled); 2782 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2783 2784 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2785 2786 /* last buffer page to remove */ 2787 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2788 list); 2789 tmp_iter_page = first_page; 2790 2791 do { 2792 cond_resched(); 2793 2794 to_remove_page = tmp_iter_page; 2795 rb_inc_page(&tmp_iter_page); 2796 2797 /* update the counters */ 2798 page_entries = rb_page_entries(to_remove_page); 2799 if (page_entries) { 2800 /* 2801 * If something was added to this page, it was full 2802 * since it is not the tail page. So we deduct the 2803 * bytes consumed in ring buffer from here. 2804 * Increment overrun to account for the lost events. 2805 */ 2806 local_add(page_entries, &cpu_buffer->overrun); 2807 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2808 local_inc(&cpu_buffer->pages_lost); 2809 } 2810 2811 /* 2812 * We have already removed references to this list item, just 2813 * free up the buffer_page and its page 2814 */ 2815 free_buffer_page(to_remove_page); 2816 nr_removed--; 2817 2818 } while (to_remove_page != last_page); 2819 2820 RB_WARN_ON(cpu_buffer, nr_removed); 2821 2822 return nr_removed == 0; 2823 } 2824 2825 static bool 2826 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2827 { 2828 struct list_head *pages = &cpu_buffer->new_pages; 2829 unsigned long flags; 2830 bool success; 2831 int retries; 2832 2833 /* Can be called at early boot up, where interrupts must not been enabled */ 2834 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2835 /* 2836 * We are holding the reader lock, so the reader page won't be swapped 2837 * in the ring buffer. Now we are racing with the writer trying to 2838 * move head page and the tail page. 2839 * We are going to adapt the reader page update process where: 2840 * 1. We first splice the start and end of list of new pages between 2841 * the head page and its previous page. 2842 * 2. 
We cmpxchg the prev_page->next to point from head page to the 2843 * start of new pages list. 2844 * 3. Finally, we update the head->prev to the end of new list. 2845 * 2846 * We will try this process 10 times, to make sure that we don't keep 2847 * spinning. 2848 */ 2849 retries = 10; 2850 success = false; 2851 while (retries--) { 2852 struct list_head *head_page, *prev_page; 2853 struct list_head *last_page, *first_page; 2854 struct list_head *head_page_with_bit; 2855 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2856 2857 if (!hpage) 2858 break; 2859 head_page = &hpage->list; 2860 prev_page = head_page->prev; 2861 2862 first_page = pages->next; 2863 last_page = pages->prev; 2864 2865 head_page_with_bit = (struct list_head *) 2866 ((unsigned long)head_page | RB_PAGE_HEAD); 2867 2868 last_page->next = head_page_with_bit; 2869 first_page->prev = prev_page; 2870 2871 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2872 if (try_cmpxchg(&prev_page->next, 2873 &head_page_with_bit, first_page)) { 2874 /* 2875 * yay, we replaced the page pointer to our new list, 2876 * now, we just have to update to head page's prev 2877 * pointer to point to end of list 2878 */ 2879 head_page->prev = last_page; 2880 cpu_buffer->cnt++; 2881 success = true; 2882 break; 2883 } 2884 } 2885 2886 if (success) 2887 INIT_LIST_HEAD(pages); 2888 /* 2889 * If we weren't successful in adding in new pages, warn and stop 2890 * tracing 2891 */ 2892 RB_WARN_ON(cpu_buffer, !success); 2893 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2894 2895 /* free pages if they weren't inserted */ 2896 if (!success) { 2897 struct buffer_page *bpage, *tmp; 2898 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2899 list) { 2900 list_del_init(&bpage->list); 2901 free_buffer_page(bpage); 2902 } 2903 } 2904 return success; 2905 } 2906 2907 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2908 { 2909 bool success; 2910 2911 if (cpu_buffer->nr_pages_to_update > 0) 2912 success = rb_insert_pages(cpu_buffer); 2913 else 2914 success = rb_remove_pages(cpu_buffer, 2915 -cpu_buffer->nr_pages_to_update); 2916 2917 if (success) 2918 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2919 } 2920 2921 static void update_pages_handler(struct work_struct *work) 2922 { 2923 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2924 struct ring_buffer_per_cpu, update_pages_work); 2925 rb_update_pages(cpu_buffer); 2926 complete(&cpu_buffer->update_done); 2927 } 2928 2929 /** 2930 * ring_buffer_resize - resize the ring buffer 2931 * @buffer: the buffer to resize. 2932 * @size: the new size. 2933 * @cpu_id: the cpu buffer to resize 2934 * 2935 * Minimum size is 2 * buffer->subbuf_size. 2936 * 2937 * Returns 0 on success and < 0 on failure. 2938 */ 2939 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2940 int cpu_id) 2941 { 2942 struct ring_buffer_per_cpu *cpu_buffer; 2943 unsigned long nr_pages; 2944 int cpu, err; 2945 2946 /* 2947 * Always succeed at resizing a non-existent buffer: 2948 */ 2949 if (!buffer) 2950 return 0; 2951 2952 /* Make sure the requested buffer exists */ 2953 if (cpu_id != RING_BUFFER_ALL_CPUS && 2954 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2955 return 0; 2956 2957 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2958 2959 /* we need a minimum of two pages */ 2960 if (nr_pages < 2) 2961 nr_pages = 2; 2962 2963 /* 2964 * Keep CPUs from coming online while resizing to synchronize 2965 * with new per CPU buffers being created. 
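	 * The guard below drops the cpus read lock automatically on every
	 * return path of this function.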
2966 */ 2967 guard(cpus_read_lock)(); 2968 2969 /* prevent another thread from changing buffer sizes */ 2970 mutex_lock(&buffer->mutex); 2971 atomic_inc(&buffer->resizing); 2972 2973 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2974 /* 2975 * Don't succeed if resizing is disabled, as a reader might be 2976 * manipulating the ring buffer and is expecting a sane state while 2977 * this is true. 2978 */ 2979 for_each_buffer_cpu(buffer, cpu) { 2980 cpu_buffer = buffer->buffers[cpu]; 2981 if (atomic_read(&cpu_buffer->resize_disabled)) { 2982 err = -EBUSY; 2983 goto out_err_unlock; 2984 } 2985 } 2986 2987 /* calculate the pages to update */ 2988 for_each_buffer_cpu(buffer, cpu) { 2989 cpu_buffer = buffer->buffers[cpu]; 2990 2991 cpu_buffer->nr_pages_to_update = nr_pages - 2992 cpu_buffer->nr_pages; 2993 /* 2994 * nothing more to do for removing pages or no update 2995 */ 2996 if (cpu_buffer->nr_pages_to_update <= 0) 2997 continue; 2998 /* 2999 * to add pages, make sure all new pages can be 3000 * allocated without receiving ENOMEM 3001 */ 3002 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3003 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3004 &cpu_buffer->new_pages)) { 3005 /* not enough memory for new pages */ 3006 err = -ENOMEM; 3007 goto out_err; 3008 } 3009 3010 cond_resched(); 3011 } 3012 3013 /* 3014 * Fire off all the required work handlers 3015 * We can't schedule on offline CPUs, but it's not necessary 3016 * since we can change their buffer sizes without any race. 3017 */ 3018 for_each_buffer_cpu(buffer, cpu) { 3019 cpu_buffer = buffer->buffers[cpu]; 3020 if (!cpu_buffer->nr_pages_to_update) 3021 continue; 3022 3023 /* Can't run something on an offline CPU. */ 3024 if (!cpu_online(cpu)) { 3025 rb_update_pages(cpu_buffer); 3026 cpu_buffer->nr_pages_to_update = 0; 3027 } else { 3028 /* Run directly if possible. */ 3029 migrate_disable(); 3030 if (cpu != smp_processor_id()) { 3031 migrate_enable(); 3032 schedule_work_on(cpu, 3033 &cpu_buffer->update_pages_work); 3034 } else { 3035 update_pages_handler(&cpu_buffer->update_pages_work); 3036 migrate_enable(); 3037 } 3038 } 3039 } 3040 3041 /* wait for all the updates to complete */ 3042 for_each_buffer_cpu(buffer, cpu) { 3043 cpu_buffer = buffer->buffers[cpu]; 3044 if (!cpu_buffer->nr_pages_to_update) 3045 continue; 3046 3047 if (cpu_online(cpu)) 3048 wait_for_completion(&cpu_buffer->update_done); 3049 cpu_buffer->nr_pages_to_update = 0; 3050 } 3051 3052 } else { 3053 cpu_buffer = buffer->buffers[cpu_id]; 3054 3055 if (nr_pages == cpu_buffer->nr_pages) 3056 goto out; 3057 3058 /* 3059 * Don't succeed if resizing is disabled, as a reader might be 3060 * manipulating the ring buffer and is expecting a sane state while 3061 * this is true. 3062 */ 3063 if (atomic_read(&cpu_buffer->resize_disabled)) { 3064 err = -EBUSY; 3065 goto out_err_unlock; 3066 } 3067 3068 cpu_buffer->nr_pages_to_update = nr_pages - 3069 cpu_buffer->nr_pages; 3070 3071 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3072 if (cpu_buffer->nr_pages_to_update > 0 && 3073 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3074 &cpu_buffer->new_pages)) { 3075 err = -ENOMEM; 3076 goto out_err; 3077 } 3078 3079 /* Can't run something on an offline CPU. */ 3080 if (!cpu_online(cpu_id)) 3081 rb_update_pages(cpu_buffer); 3082 else { 3083 /* Run directly if possible. 
*/ 3084 migrate_disable(); 3085 if (cpu_id == smp_processor_id()) { 3086 rb_update_pages(cpu_buffer); 3087 migrate_enable(); 3088 } else { 3089 migrate_enable(); 3090 schedule_work_on(cpu_id, 3091 &cpu_buffer->update_pages_work); 3092 wait_for_completion(&cpu_buffer->update_done); 3093 } 3094 } 3095 3096 cpu_buffer->nr_pages_to_update = 0; 3097 } 3098 3099 out: 3100 /* 3101 * The ring buffer resize can happen with the ring buffer 3102 * enabled, so that the update disturbs the tracing as little 3103 * as possible. But if the buffer is disabled, we do not need 3104 * to worry about that, and we can take the time to verify 3105 * that the buffer is not corrupt. 3106 */ 3107 if (atomic_read(&buffer->record_disabled)) { 3108 atomic_inc(&buffer->record_disabled); 3109 /* 3110 * Even though the buffer was disabled, we must make sure 3111 * that it is truly disabled before calling rb_check_pages. 3112 * There could have been a race between checking 3113 * record_disable and incrementing it. 3114 */ 3115 synchronize_rcu(); 3116 for_each_buffer_cpu(buffer, cpu) { 3117 cpu_buffer = buffer->buffers[cpu]; 3118 rb_check_pages(cpu_buffer); 3119 } 3120 atomic_dec(&buffer->record_disabled); 3121 } 3122 3123 atomic_dec(&buffer->resizing); 3124 mutex_unlock(&buffer->mutex); 3125 return 0; 3126 3127 out_err: 3128 for_each_buffer_cpu(buffer, cpu) { 3129 struct buffer_page *bpage, *tmp; 3130 3131 cpu_buffer = buffer->buffers[cpu]; 3132 cpu_buffer->nr_pages_to_update = 0; 3133 3134 if (list_empty(&cpu_buffer->new_pages)) 3135 continue; 3136 3137 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 3138 list) { 3139 list_del_init(&bpage->list); 3140 free_buffer_page(bpage); 3141 3142 cond_resched(); 3143 } 3144 } 3145 out_err_unlock: 3146 atomic_dec(&buffer->resizing); 3147 mutex_unlock(&buffer->mutex); 3148 return err; 3149 } 3150 EXPORT_SYMBOL_GPL(ring_buffer_resize); 3151 3152 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 3153 { 3154 mutex_lock(&buffer->mutex); 3155 if (val) 3156 buffer->flags |= RB_FL_OVERWRITE; 3157 else 3158 buffer->flags &= ~RB_FL_OVERWRITE; 3159 mutex_unlock(&buffer->mutex); 3160 } 3161 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3162 3163 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3164 { 3165 return bpage->page->data + index; 3166 } 3167 3168 static __always_inline struct ring_buffer_event * 3169 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3170 { 3171 return __rb_page_index(cpu_buffer->reader_page, 3172 cpu_buffer->reader_page->read); 3173 } 3174 3175 static struct ring_buffer_event * 3176 rb_iter_head_event(struct ring_buffer_iter *iter) 3177 { 3178 struct ring_buffer_event *event; 3179 struct buffer_page *iter_head_page = iter->head_page; 3180 unsigned long commit; 3181 unsigned length; 3182 3183 if (iter->head != iter->next_event) 3184 return iter->event; 3185 3186 /* 3187 * When the writer goes across pages, it issues a cmpxchg which 3188 * is a mb(), which will synchronize with the rmb here. 3189 * (see rb_tail_page_update() and __rb_reserve_next()) 3190 */ 3191 commit = rb_page_commit(iter_head_page); 3192 smp_rmb(); 3193 3194 /* An event needs to be at least 8 bytes in size */ 3195 if (iter->head > commit - 8) 3196 goto reset; 3197 3198 event = __rb_page_index(iter_head_page, iter->head); 3199 length = rb_event_length(event); 3200 3201 /* 3202 * READ_ONCE() doesn't work on functions and we don't want the 3203 * compiler doing any crazy optimizations with length. 
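	 * The barrier() below keeps the compiler from re-reading or
	 * reordering the length value around the checks that follow.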
3204 */ 3205 barrier(); 3206 3207 if ((iter->head + length) > commit || length > iter->event_size) 3208 /* Writer corrupted the read? */ 3209 goto reset; 3210 3211 memcpy(iter->event, event, length); 3212 /* 3213 * If the page stamp is still the same after this rmb() then the 3214 * event was safely copied without the writer entering the page. 3215 */ 3216 smp_rmb(); 3217 3218 /* Make sure the page didn't change since we read this */ 3219 if (iter->page_stamp != iter_head_page->page->time_stamp || 3220 commit > rb_page_commit(iter_head_page)) 3221 goto reset; 3222 3223 iter->next_event = iter->head + length; 3224 return iter->event; 3225 reset: 3226 /* Reset to the beginning */ 3227 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3228 iter->head = 0; 3229 iter->next_event = 0; 3230 iter->missed_events = 1; 3231 return NULL; 3232 } 3233 3234 /* Size is determined by what has been committed */ 3235 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3236 { 3237 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3238 } 3239 3240 static __always_inline unsigned 3241 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3242 { 3243 return rb_page_commit(cpu_buffer->commit_page); 3244 } 3245 3246 static __always_inline unsigned 3247 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3248 { 3249 unsigned long addr = (unsigned long)event; 3250 3251 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3252 3253 return addr - BUF_PAGE_HDR_SIZE; 3254 } 3255 3256 static void rb_inc_iter(struct ring_buffer_iter *iter) 3257 { 3258 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3259 3260 /* 3261 * The iterator could be on the reader page (it starts there). 3262 * But the head could have moved, since the reader was 3263 * found. Check for this case and assign the iterator 3264 * to the head page instead of next. 3265 */ 3266 if (iter->head_page == cpu_buffer->reader_page) 3267 iter->head_page = rb_set_head_page(cpu_buffer); 3268 else 3269 rb_inc_page(&iter->head_page); 3270 3271 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3272 iter->head = 0; 3273 iter->next_event = 0; 3274 } 3275 3276 /* Return the index into the sub-buffers for a given sub-buffer */ 3277 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3278 { 3279 void *subbuf_array; 3280 3281 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3282 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3283 return (subbuf - subbuf_array) / meta->subbuf_size; 3284 } 3285 3286 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3287 struct buffer_page *next_page) 3288 { 3289 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3290 unsigned long old_head = (unsigned long)next_page->page; 3291 unsigned long new_head; 3292 3293 rb_inc_page(&next_page); 3294 new_head = (unsigned long)next_page->page; 3295 3296 /* 3297 * Only move it forward once, if something else came in and 3298 * moved it forward, then we don't want to touch it. 
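	 * If the cmpxchg() below fails, head_buffer was already advanced
	 * by someone else and this update is simply skipped.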
3299 */ 3300 (void)cmpxchg(&meta->head_buffer, old_head, new_head); 3301 } 3302 3303 static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer, 3304 struct buffer_page *reader) 3305 { 3306 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3307 void *old_reader = cpu_buffer->reader_page->page; 3308 void *new_reader = reader->page; 3309 int id; 3310 3311 id = reader->id; 3312 cpu_buffer->reader_page->id = id; 3313 reader->id = 0; 3314 3315 meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader); 3316 meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader); 3317 3318 /* The head pointer is the one after the reader */ 3319 rb_update_meta_head(cpu_buffer, reader); 3320 } 3321 3322 /* 3323 * rb_handle_head_page - writer hit the head page 3324 * 3325 * Returns: +1 to retry page 3326 * 0 to continue 3327 * -1 on error 3328 */ 3329 static int 3330 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 3331 struct buffer_page *tail_page, 3332 struct buffer_page *next_page) 3333 { 3334 struct buffer_page *new_head; 3335 int entries; 3336 int type; 3337 int ret; 3338 3339 entries = rb_page_entries(next_page); 3340 3341 /* 3342 * The hard part is here. We need to move the head 3343 * forward, and protect against both readers on 3344 * other CPUs and writers coming in via interrupts. 3345 */ 3346 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 3347 RB_PAGE_HEAD); 3348 3349 /* 3350 * type can be one of four: 3351 * NORMAL - an interrupt already moved it for us 3352 * HEAD - we are the first to get here. 3353 * UPDATE - we are the interrupt interrupting 3354 * a current move. 3355 * MOVED - a reader on another CPU moved the next 3356 * pointer to its reader page. Give up 3357 * and try again. 3358 */ 3359 3360 switch (type) { 3361 case RB_PAGE_HEAD: 3362 /* 3363 * We changed the head to UPDATE, thus 3364 * it is our responsibility to update 3365 * the counters. 3366 */ 3367 local_add(entries, &cpu_buffer->overrun); 3368 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 3369 local_inc(&cpu_buffer->pages_lost); 3370 3371 if (cpu_buffer->ring_meta) 3372 rb_update_meta_head(cpu_buffer, next_page); 3373 /* 3374 * The entries will be zeroed out when we move the 3375 * tail page. 3376 */ 3377 3378 /* still more to do */ 3379 break; 3380 3381 case RB_PAGE_UPDATE: 3382 /* 3383 * This is an interrupt that interrupt the 3384 * previous update. Still more to do. 3385 */ 3386 break; 3387 case RB_PAGE_NORMAL: 3388 /* 3389 * An interrupt came in before the update 3390 * and processed this for us. 3391 * Nothing left to do. 3392 */ 3393 return 1; 3394 case RB_PAGE_MOVED: 3395 /* 3396 * The reader is on another CPU and just did 3397 * a swap with our next_page. 3398 * Try again. 3399 */ 3400 return 1; 3401 default: 3402 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 3403 return -1; 3404 } 3405 3406 /* 3407 * Now that we are here, the old head pointer is 3408 * set to UPDATE. This will keep the reader from 3409 * swapping the head page with the reader page. 3410 * The reader (on another CPU) will spin till 3411 * we are finished. 3412 * 3413 * We just need to protect against interrupts 3414 * doing the job. We will set the next pointer 3415 * to HEAD. After that, we set the old pointer 3416 * to NORMAL, but only if it was HEAD before. 3417 * otherwise we are an interrupt, and only 3418 * want the outer most commit to reset it. 
3419 */ 3420 new_head = next_page; 3421 rb_inc_page(&new_head); 3422 3423 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3424 RB_PAGE_NORMAL); 3425 3426 /* 3427 * Valid returns are: 3428 * HEAD - an interrupt came in and already set it. 3429 * NORMAL - One of two things: 3430 * 1) We really set it. 3431 * 2) A bunch of interrupts came in and moved 3432 * the page forward again. 3433 */ 3434 switch (ret) { 3435 case RB_PAGE_HEAD: 3436 case RB_PAGE_NORMAL: 3437 /* OK */ 3438 break; 3439 default: 3440 RB_WARN_ON(cpu_buffer, 1); 3441 return -1; 3442 } 3443 3444 /* 3445 * It is possible that an interrupt came in, 3446 * set the head up, then more interrupts came in 3447 * and moved it again. When we get back here, 3448 * the page would have been set to NORMAL but we 3449 * just set it back to HEAD. 3450 * 3451 * How do you detect this? Well, if that happened 3452 * the tail page would have moved. 3453 */ 3454 if (ret == RB_PAGE_NORMAL) { 3455 struct buffer_page *buffer_tail_page; 3456 3457 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3458 /* 3459 * If the tail had moved passed next, then we need 3460 * to reset the pointer. 3461 */ 3462 if (buffer_tail_page != tail_page && 3463 buffer_tail_page != next_page) 3464 rb_head_page_set_normal(cpu_buffer, new_head, 3465 next_page, 3466 RB_PAGE_HEAD); 3467 } 3468 3469 /* 3470 * If this was the outer most commit (the one that 3471 * changed the original pointer from HEAD to UPDATE), 3472 * then it is up to us to reset it to NORMAL. 3473 */ 3474 if (type == RB_PAGE_HEAD) { 3475 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3476 tail_page, 3477 RB_PAGE_UPDATE); 3478 if (RB_WARN_ON(cpu_buffer, 3479 ret != RB_PAGE_UPDATE)) 3480 return -1; 3481 } 3482 3483 return 0; 3484 } 3485 3486 static inline void 3487 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3488 unsigned long tail, struct rb_event_info *info) 3489 { 3490 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3491 struct buffer_page *tail_page = info->tail_page; 3492 struct ring_buffer_event *event; 3493 unsigned long length = info->length; 3494 3495 /* 3496 * Only the event that crossed the page boundary 3497 * must fill the old tail_page with padding. 3498 */ 3499 if (tail >= bsize) { 3500 /* 3501 * If the page was filled, then we still need 3502 * to update the real_end. Reset it to zero 3503 * and the reader will ignore it. 3504 */ 3505 if (tail == bsize) 3506 tail_page->real_end = 0; 3507 3508 local_sub(length, &tail_page->write); 3509 return; 3510 } 3511 3512 event = __rb_page_index(tail_page, tail); 3513 3514 /* 3515 * Save the original length to the meta data. 3516 * This will be used by the reader to add lost event 3517 * counter. 3518 */ 3519 tail_page->real_end = tail; 3520 3521 /* 3522 * If this event is bigger than the minimum size, then 3523 * we need to be careful that we don't subtract the 3524 * write counter enough to allow another writer to slip 3525 * in on this page. 3526 * We put in a discarded commit instead, to make sure 3527 * that this space is not used again, and this space will 3528 * not be accounted into 'entries_bytes'. 3529 * 3530 * If we are less than the minimum size, we don't need to 3531 * worry about it. 
3532 */ 3533 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3534 /* No room for any events */ 3535 3536 /* Mark the rest of the page with padding */ 3537 rb_event_set_padding(event); 3538 3539 /* Make sure the padding is visible before the write update */ 3540 smp_wmb(); 3541 3542 /* Set the write back to the previous setting */ 3543 local_sub(length, &tail_page->write); 3544 return; 3545 } 3546 3547 /* Put in a discarded event */ 3548 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3549 event->type_len = RINGBUF_TYPE_PADDING; 3550 /* time delta must be non zero */ 3551 event->time_delta = 1; 3552 3553 /* account for padding bytes */ 3554 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3555 3556 /* Make sure the padding is visible before the tail_page->write update */ 3557 smp_wmb(); 3558 3559 /* Set write to end of buffer */ 3560 length = (tail + length) - bsize; 3561 local_sub(length, &tail_page->write); 3562 } 3563 3564 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3565 3566 /* 3567 * This is the slow path, force gcc not to inline it. 3568 */ 3569 static noinline struct ring_buffer_event * 3570 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3571 unsigned long tail, struct rb_event_info *info) 3572 { 3573 struct buffer_page *tail_page = info->tail_page; 3574 struct buffer_page *commit_page = cpu_buffer->commit_page; 3575 struct trace_buffer *buffer = cpu_buffer->buffer; 3576 struct buffer_page *next_page; 3577 int ret; 3578 3579 next_page = tail_page; 3580 3581 rb_inc_page(&next_page); 3582 3583 /* 3584 * If for some reason, we had an interrupt storm that made 3585 * it all the way around the buffer, bail, and warn 3586 * about it. 3587 */ 3588 if (unlikely(next_page == commit_page)) { 3589 local_inc(&cpu_buffer->commit_overrun); 3590 goto out_reset; 3591 } 3592 3593 /* 3594 * This is where the fun begins! 3595 * 3596 * We are fighting against races between a reader that 3597 * could be on another CPU trying to swap its reader 3598 * page with the buffer head. 3599 * 3600 * We are also fighting against interrupts coming in and 3601 * moving the head or tail on us as well. 3602 * 3603 * If the next page is the head page then we have filled 3604 * the buffer, unless the commit page is still on the 3605 * reader page. 3606 */ 3607 if (rb_is_head_page(next_page, &tail_page->list)) { 3608 3609 /* 3610 * If the commit is not on the reader page, then 3611 * move the header page. 3612 */ 3613 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3614 /* 3615 * If we are not in overwrite mode, 3616 * this is easy, just stop here. 3617 */ 3618 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3619 local_inc(&cpu_buffer->dropped_events); 3620 goto out_reset; 3621 } 3622 3623 ret = rb_handle_head_page(cpu_buffer, 3624 tail_page, 3625 next_page); 3626 if (ret < 0) 3627 goto out_reset; 3628 if (ret) 3629 goto out_again; 3630 } else { 3631 /* 3632 * We need to be careful here too. The 3633 * commit page could still be on the reader 3634 * page. We could have a small buffer, and 3635 * have filled up the buffer with events 3636 * from interrupts and such, and wrapped. 3637 * 3638 * Note, if the tail page is also on the 3639 * reader_page, we let it move out. 
3640 */ 3641 if (unlikely((cpu_buffer->commit_page != 3642 cpu_buffer->tail_page) && 3643 (cpu_buffer->commit_page == 3644 cpu_buffer->reader_page))) { 3645 local_inc(&cpu_buffer->commit_overrun); 3646 goto out_reset; 3647 } 3648 } 3649 } 3650 3651 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3652 3653 out_again: 3654 3655 rb_reset_tail(cpu_buffer, tail, info); 3656 3657 /* Commit what we have for now. */ 3658 rb_end_commit(cpu_buffer); 3659 /* rb_end_commit() decs committing */ 3660 local_inc(&cpu_buffer->committing); 3661 3662 /* fail and let the caller try again */ 3663 return ERR_PTR(-EAGAIN); 3664 3665 out_reset: 3666 /* reset write */ 3667 rb_reset_tail(cpu_buffer, tail, info); 3668 3669 return NULL; 3670 } 3671 3672 /* Slow path */ 3673 static struct ring_buffer_event * 3674 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3675 struct ring_buffer_event *event, u64 delta, bool abs) 3676 { 3677 if (abs) 3678 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3679 else 3680 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3681 3682 /* Not the first event on the page, or not delta? */ 3683 if (abs || rb_event_index(cpu_buffer, event)) { 3684 event->time_delta = delta & TS_MASK; 3685 event->array[0] = delta >> TS_SHIFT; 3686 } else { 3687 /* nope, just zero it */ 3688 event->time_delta = 0; 3689 event->array[0] = 0; 3690 } 3691 3692 return skip_time_extend(event); 3693 } 3694 3695 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3696 static inline bool sched_clock_stable(void) 3697 { 3698 return true; 3699 } 3700 #endif 3701 3702 static void 3703 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3704 struct rb_event_info *info) 3705 { 3706 u64 write_stamp; 3707 3708 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3709 (unsigned long long)info->delta, 3710 (unsigned long long)info->ts, 3711 (unsigned long long)info->before, 3712 (unsigned long long)info->after, 3713 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3714 sched_clock_stable() ? "" : 3715 "If you just came from a suspend/resume,\n" 3716 "please switch to the trace global clock:\n" 3717 " echo global > /sys/kernel/tracing/trace_clock\n" 3718 "or add trace_clock=global to the kernel command line\n"); 3719 } 3720 3721 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3722 struct ring_buffer_event **event, 3723 struct rb_event_info *info, 3724 u64 *delta, 3725 unsigned int *length) 3726 { 3727 bool abs = info->add_timestamp & 3728 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3729 3730 if (unlikely(info->delta > (1ULL << 59))) { 3731 /* 3732 * Some timers can use more than 59 bits, and when a timestamp 3733 * is added to the buffer, it will lose those bits. 3734 */ 3735 if (abs && (info->ts & TS_MSB)) { 3736 info->delta &= ABS_TS_MASK; 3737 3738 /* did the clock go backwards */ 3739 } else if (info->before == info->after && info->before > info->ts) { 3740 /* not interrupted */ 3741 static int once; 3742 3743 /* 3744 * This is possible with a recalibrating of the TSC. 3745 * Do not produce a call stack, but just report it. 
3746 */ 3747 if (!once) { 3748 once++; 3749 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3750 info->before, info->ts); 3751 } 3752 } else 3753 rb_check_timestamp(cpu_buffer, info); 3754 if (!abs) 3755 info->delta = 0; 3756 } 3757 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3758 *length -= RB_LEN_TIME_EXTEND; 3759 *delta = 0; 3760 } 3761 3762 /** 3763 * rb_update_event - update event type and data 3764 * @cpu_buffer: The per cpu buffer of the @event 3765 * @event: the event to update 3766 * @info: The info to update the @event with (contains length and delta) 3767 * 3768 * Update the type and data fields of the @event. The length 3769 * is the actual size that is written to the ring buffer, 3770 * and with this, we can determine what to place into the 3771 * data field. 3772 */ 3773 static void 3774 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3775 struct ring_buffer_event *event, 3776 struct rb_event_info *info) 3777 { 3778 unsigned length = info->length; 3779 u64 delta = info->delta; 3780 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3781 3782 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3783 cpu_buffer->event_stamp[nest] = info->ts; 3784 3785 /* 3786 * If we need to add a timestamp, then we 3787 * add it to the start of the reserved space. 3788 */ 3789 if (unlikely(info->add_timestamp)) 3790 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3791 3792 event->time_delta = delta; 3793 length -= RB_EVNT_HDR_SIZE; 3794 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3795 event->type_len = 0; 3796 event->array[0] = length; 3797 } else 3798 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3799 } 3800 3801 static unsigned rb_calculate_event_length(unsigned length) 3802 { 3803 struct ring_buffer_event event; /* Used only for sizeof array */ 3804 3805 /* zero length can cause confusions */ 3806 if (!length) 3807 length++; 3808 3809 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3810 length += sizeof(event.array[0]); 3811 3812 length += RB_EVNT_HDR_SIZE; 3813 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3814 3815 /* 3816 * In case the time delta is larger than the 27 bits for it 3817 * in the header, we need to add a timestamp. If another 3818 * event comes in when trying to discard this one to increase 3819 * the length, then the timestamp will be added in the allocated 3820 * space of this event. If length is bigger than the size needed 3821 * for the TIME_EXTEND, then padding has to be used. The events 3822 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3823 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3824 * As length is a multiple of 4, we only need to worry if it 3825 * is 12 (RB_LEN_TIME_EXTEND + 4). 
3826 */ 3827 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 3828 length += RB_ALIGNMENT; 3829 3830 return length; 3831 } 3832 3833 static inline bool 3834 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3835 struct ring_buffer_event *event) 3836 { 3837 unsigned long new_index, old_index; 3838 struct buffer_page *bpage; 3839 unsigned long addr; 3840 3841 new_index = rb_event_index(cpu_buffer, event); 3842 old_index = new_index + rb_event_ts_length(event); 3843 addr = (unsigned long)event; 3844 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3845 3846 bpage = READ_ONCE(cpu_buffer->tail_page); 3847 3848 /* 3849 * Make sure the tail_page is still the same and 3850 * the next write location is the end of this event 3851 */ 3852 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3853 unsigned long write_mask = 3854 local_read(&bpage->write) & ~RB_WRITE_MASK; 3855 unsigned long event_length = rb_event_length(event); 3856 3857 /* 3858 * Make the before_stamp different than the write_stamp 3859 * so that the next event adds an absolute 3860 * value and does not rely on the saved write stamp, which 3861 * is now going to be bogus. 3862 * 3863 * By setting the before_stamp to zero, the next event 3864 * is not going to use the write_stamp and will instead 3865 * create an absolute timestamp. This means there's no 3866 * reason to update the write_stamp! 3867 */ 3868 rb_time_set(&cpu_buffer->before_stamp, 0); 3869 3870 /* 3871 * If an event were to come in now, it would see that the 3872 * write_stamp and the before_stamp are different, and assume 3873 * that this event just added itself before updating 3874 * the write stamp. The interrupting event will fix the 3875 * write stamp for us, and use an absolute timestamp. 3876 */ 3877 3878 /* 3879 * This is on the tail page. It is possible that 3880 * a write could come in and move the tail page 3881 * and write to the next page. That is fine 3882 * because we just shorten what is on this page. 3883 */ 3884 old_index += write_mask; 3885 new_index += write_mask; 3886 3887 /* caution: old_index gets updated on cmpxchg failure */ 3888 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 3889 /* update counters */ 3890 local_sub(event_length, &cpu_buffer->entries_bytes); 3891 return true; 3892 } 3893 } 3894 3895 /* could not discard */ 3896 return false; 3897 } 3898 3899 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3900 { 3901 local_inc(&cpu_buffer->committing); 3902 local_inc(&cpu_buffer->commits); 3903 } 3904 3905 static __always_inline void 3906 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3907 { 3908 unsigned long max_count; 3909 3910 /* 3911 * We only race with interrupts and NMIs on this CPU. 3912 * If we own the commit event, then we can commit 3913 * all others that interrupted us, since the interruptions 3914 * are in stack format (they finish before they come 3915 * back to us). This allows us to do a simple loop to 3916 * assign the commit to the tail. 3917 */ 3918 again: 3919 max_count = cpu_buffer->nr_pages * 100; 3920 3921 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3922 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3923 return; 3924 if (RB_WARN_ON(cpu_buffer, 3925 rb_is_reader_page(cpu_buffer->tail_page))) 3926 return; 3927 /* 3928 * No need for a memory barrier here, as the update 3929 * of the tail_page did it for this page.
3930 */ 3931 local_set(&cpu_buffer->commit_page->page->commit, 3932 rb_page_write(cpu_buffer->commit_page)); 3933 rb_inc_page(&cpu_buffer->commit_page); 3934 if (cpu_buffer->ring_meta) { 3935 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3936 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3937 } 3938 /* add barrier to keep gcc from optimizing too much */ 3939 barrier(); 3940 } 3941 while (rb_commit_index(cpu_buffer) != 3942 rb_page_write(cpu_buffer->commit_page)) { 3943 3944 /* Make sure the readers see the content of what is committed. */ 3945 smp_wmb(); 3946 local_set(&cpu_buffer->commit_page->page->commit, 3947 rb_page_write(cpu_buffer->commit_page)); 3948 RB_WARN_ON(cpu_buffer, 3949 local_read(&cpu_buffer->commit_page->page->commit) & 3950 ~RB_WRITE_MASK); 3951 barrier(); 3952 } 3953 3954 /* again, keep gcc from optimizing */ 3955 barrier(); 3956 3957 /* 3958 * If an interrupt came in just after the first while loop 3959 * and pushed the tail page forward, we will be left with 3960 * a dangling commit that will never go forward. 3961 */ 3962 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3963 goto again; 3964 } 3965 3966 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3967 { 3968 unsigned long commits; 3969 3970 if (RB_WARN_ON(cpu_buffer, 3971 !local_read(&cpu_buffer->committing))) 3972 return; 3973 3974 again: 3975 commits = local_read(&cpu_buffer->commits); 3976 /* synchronize with interrupts */ 3977 barrier(); 3978 if (local_read(&cpu_buffer->committing) == 1) 3979 rb_set_commit_to_write(cpu_buffer); 3980 3981 local_dec(&cpu_buffer->committing); 3982 3983 /* synchronize with interrupts */ 3984 barrier(); 3985 3986 /* 3987 * Need to account for interrupts coming in between the 3988 * updating of the commit page and the clearing of the 3989 * committing counter. 3990 */ 3991 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3992 !local_read(&cpu_buffer->committing)) { 3993 local_inc(&cpu_buffer->committing); 3994 goto again; 3995 } 3996 } 3997 3998 static inline void rb_event_discard(struct ring_buffer_event *event) 3999 { 4000 if (extended_time(event)) 4001 event = skip_time_extend(event); 4002 4003 /* array[0] holds the actual length for the discarded event */ 4004 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 4005 event->type_len = RINGBUF_TYPE_PADDING; 4006 /* time delta must be non zero */ 4007 if (!event->time_delta) 4008 event->time_delta = 1; 4009 } 4010 4011 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 4012 { 4013 local_inc(&cpu_buffer->entries); 4014 rb_end_commit(cpu_buffer); 4015 } 4016 4017 static bool 4018 rb_irq_work_queue(struct rb_irq_work *irq_work) 4019 { 4020 int cpu; 4021 4022 /* irq_work_queue_on() is not NMI-safe */ 4023 if (unlikely(in_nmi())) 4024 return irq_work_queue(&irq_work->work); 4025 4026 /* 4027 * If CPU isolation is not active, cpu is always the current 4028 * CPU, and the following is equivalent to irq_work_queue().
4029 */ 4030 cpu = housekeeping_any_cpu(HK_TYPE_KERNEL_NOISE); 4031 return irq_work_queue_on(&irq_work->work, cpu); 4032 } 4033 4034 static __always_inline void 4035 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 4036 { 4037 if (buffer->irq_work.waiters_pending) { 4038 buffer->irq_work.waiters_pending = false; 4039 /* irq_work_queue() supplies its own memory barriers */ 4040 rb_irq_work_queue(&buffer->irq_work); 4041 } 4042 4043 if (cpu_buffer->irq_work.waiters_pending) { 4044 cpu_buffer->irq_work.waiters_pending = false; 4045 /* irq_work_queue() supplies its own memory barriers */ 4046 rb_irq_work_queue(&cpu_buffer->irq_work); 4047 } 4048 4049 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 4050 return; 4051 4052 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 4053 return; 4054 4055 if (!cpu_buffer->irq_work.full_waiters_pending) 4056 return; 4057 4058 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 4059 4060 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 4061 return; 4062 4063 cpu_buffer->irq_work.wakeup_full = true; 4064 cpu_buffer->irq_work.full_waiters_pending = false; 4065 /* irq_work_queue() supplies its own memory barriers */ 4066 rb_irq_work_queue(&cpu_buffer->irq_work); 4067 } 4068 4069 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 4070 # define do_ring_buffer_record_recursion() \ 4071 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 4072 #else 4073 # define do_ring_buffer_record_recursion() do { } while (0) 4074 #endif 4075 4076 /* 4077 * The lock and unlock are done within a preempt disable section. 4078 * The current_context per_cpu variable can only be modified 4079 * by the current task between lock and unlock. But it can 4080 * be modified more than once via an interrupt. To pass this 4081 * information from the lock to the unlock without having to 4082 * access the 'in_interrupt()' functions again (which do show 4083 * a bit of overhead in something as critical as function tracing), 4084 * we use a bitmask trick. 4085 * 4086 * bit 1 = NMI context 4087 * bit 2 = IRQ context 4088 * bit 3 = SoftIRQ context 4089 * bit 4 = normal context. 4090 * 4091 * This works because this is the order of contexts that can 4092 * preempt other contexts. A SoftIRQ never preempts an IRQ 4093 * context. 4094 * 4095 * When the context is determined, the corresponding bit is 4096 * checked and set (if it was set, then a recursion of that context 4097 * happened). 4098 * 4099 * On unlock, we need to clear this bit. To do so, just subtract 4100 * 1 from the current_context and AND it to itself. 4101 * 4102 * (binary) 4103 * 101 - 1 = 100 4104 * 101 & 100 = 100 (clearing bit zero) 4105 * 4106 * 1010 - 1 = 1001 4107 * 1010 & 1001 = 1000 (clearing bit 1) 4108 * 4109 * The least significant bit can be cleared this way, and it 4110 * just so happens that it is the same bit corresponding to 4111 * the current context. 4112 * 4113 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 4114 * is set when a recursion is detected at the current context, and if 4115 * the TRANSITION bit is already set, it will fail the recursion. 4116 * This is needed because there's a lag between the changing of 4117 * interrupt context and updating the preempt count. In this case, 4118 * a false positive will be found. To handle this, one extra recursion 4119 * is allowed, and this is done by the TRANSITION bit.
If the TRANSITION 4120 * bit is already set, then it is considered a recursion and the function 4121 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 4122 * 4123 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 4124 * to be cleared. Even if it wasn't the context that set it. That is, 4125 * if an interrupt comes in while NORMAL bit is set and the ring buffer 4126 * is called before preempt_count() is updated, since the check will 4127 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 4128 * NMI then comes in, it will set the NMI bit, but when the NMI code 4129 * does the trace_recursive_unlock() it will clear the TRANSITION bit 4130 * and leave the NMI bit set. But this is fine, because the interrupt 4131 * code that set the TRANSITION bit will then clear the NMI bit when it 4132 * calls trace_recursive_unlock(). If another NMI comes in, it will 4133 * set the TRANSITION bit and continue. 4134 * 4135 * Note: The TRANSITION bit only handles a single transition between context. 4136 */ 4137 4138 static __always_inline bool 4139 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4140 { 4141 unsigned int val = cpu_buffer->current_context; 4142 int bit = interrupt_context_level(); 4143 4144 bit = RB_CTX_NORMAL - bit; 4145 4146 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4147 /* 4148 * It is possible that this was called by transitioning 4149 * between interrupt context, and preempt_count() has not 4150 * been updated yet. In this case, use the TRANSITION bit. 4151 */ 4152 bit = RB_CTX_TRANSITION; 4153 if (val & (1 << (bit + cpu_buffer->nest))) { 4154 do_ring_buffer_record_recursion(); 4155 return true; 4156 } 4157 } 4158 4159 val |= (1 << (bit + cpu_buffer->nest)); 4160 cpu_buffer->current_context = val; 4161 4162 return false; 4163 } 4164 4165 static __always_inline void 4166 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4167 { 4168 cpu_buffer->current_context &= 4169 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4170 } 4171 4172 /* The recursive locking above uses 5 bits */ 4173 #define NESTED_BITS 5 4174 4175 /** 4176 * ring_buffer_nest_start - Allow to trace while nested 4177 * @buffer: The ring buffer to modify 4178 * 4179 * The ring buffer has a safety mechanism to prevent recursion. 4180 * But there may be a case where a trace needs to be done while 4181 * tracing something else. In this case, calling this function 4182 * will allow this function to nest within a currently active 4183 * ring_buffer_lock_reserve(). 4184 * 4185 * Call this function before calling another ring_buffer_lock_reserve() and 4186 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 4187 */ 4188 void ring_buffer_nest_start(struct trace_buffer *buffer) 4189 { 4190 struct ring_buffer_per_cpu *cpu_buffer; 4191 int cpu; 4192 4193 /* Enabled by ring_buffer_nest_end() */ 4194 preempt_disable_notrace(); 4195 cpu = raw_smp_processor_id(); 4196 cpu_buffer = buffer->buffers[cpu]; 4197 /* This is the shift value for the above recursive locking */ 4198 cpu_buffer->nest += NESTED_BITS; 4199 } 4200 4201 /** 4202 * ring_buffer_nest_end - Allow to trace while nested 4203 * @buffer: The ring buffer to modify 4204 * 4205 * Must be called after ring_buffer_nest_start() and after the 4206 * ring_buffer_unlock_commit(). 
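 *
 * A minimal usage sketch (illustrative only; assumes an outer
 * ring_buffer_lock_reserve() is already active on this CPU, and
 * "event", "data" and "len" are placeholders for the caller's own
 * variables):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), data, len);
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *	ring_buffer_nest_end(buffer);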
4207 */ 4208 void ring_buffer_nest_end(struct trace_buffer *buffer) 4209 { 4210 struct ring_buffer_per_cpu *cpu_buffer; 4211 int cpu; 4212 4213 /* disabled by ring_buffer_nest_start() */ 4214 cpu = raw_smp_processor_id(); 4215 cpu_buffer = buffer->buffers[cpu]; 4216 /* This is the shift value for the above recursive locking */ 4217 cpu_buffer->nest -= NESTED_BITS; 4218 preempt_enable_notrace(); 4219 } 4220 4221 /** 4222 * ring_buffer_unlock_commit - commit a reserved 4223 * @buffer: The buffer to commit to 4224 * 4225 * This commits the data to the ring buffer, and releases any locks held. 4226 * 4227 * Must be paired with ring_buffer_lock_reserve. 4228 */ 4229 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4230 { 4231 struct ring_buffer_per_cpu *cpu_buffer; 4232 int cpu = raw_smp_processor_id(); 4233 4234 cpu_buffer = buffer->buffers[cpu]; 4235 4236 rb_commit(cpu_buffer); 4237 4238 rb_wakeups(buffer, cpu_buffer); 4239 4240 trace_recursive_unlock(cpu_buffer); 4241 4242 preempt_enable_notrace(); 4243 4244 return 0; 4245 } 4246 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4247 4248 /* Special value to validate all deltas on a page. */ 4249 #define CHECK_FULL_PAGE 1L 4250 4251 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4252 4253 static const char *show_irq_str(int bits) 4254 { 4255 static const char * type[] = { 4256 ".", // 0 4257 "s", // 1 4258 "h", // 2 4259 "Hs", // 3 4260 "n", // 4 4261 "Ns", // 5 4262 "Nh", // 6 4263 "NHs", // 7 4264 }; 4265 4266 return type[bits]; 4267 } 4268 4269 /* Assume this is a trace event */ 4270 static const char *show_flags(struct ring_buffer_event *event) 4271 { 4272 struct trace_entry *entry; 4273 int bits = 0; 4274 4275 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4276 return "X"; 4277 4278 entry = ring_buffer_event_data(event); 4279 4280 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4281 bits |= 1; 4282 4283 if (entry->flags & TRACE_FLAG_HARDIRQ) 4284 bits |= 2; 4285 4286 if (entry->flags & TRACE_FLAG_NMI) 4287 bits |= 4; 4288 4289 return show_irq_str(bits); 4290 } 4291 4292 static const char *show_irq(struct ring_buffer_event *event) 4293 { 4294 struct trace_entry *entry; 4295 4296 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4297 return ""; 4298 4299 entry = ring_buffer_event_data(event); 4300 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4301 return "d"; 4302 return ""; 4303 } 4304 4305 static const char *show_interrupt_level(void) 4306 { 4307 unsigned long pc = preempt_count(); 4308 unsigned char level = 0; 4309 4310 if (pc & SOFTIRQ_OFFSET) 4311 level |= 1; 4312 4313 if (pc & HARDIRQ_MASK) 4314 level |= 2; 4315 4316 if (pc & NMI_MASK) 4317 level |= 4; 4318 4319 return show_irq_str(level); 4320 } 4321 4322 static void dump_buffer_page(struct buffer_data_page *bpage, 4323 struct rb_event_info *info, 4324 unsigned long tail) 4325 { 4326 struct ring_buffer_event *event; 4327 u64 ts, delta; 4328 int e; 4329 4330 ts = bpage->time_stamp; 4331 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4332 4333 for (e = 0; e < tail; e += rb_event_length(event)) { 4334 4335 event = (struct ring_buffer_event *)(bpage->data + e); 4336 4337 switch (event->type_len) { 4338 4339 case RINGBUF_TYPE_TIME_EXTEND: 4340 delta = rb_event_time_stamp(event); 4341 ts += delta; 4342 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4343 e, ts, delta); 4344 break; 4345 4346 case RINGBUF_TYPE_TIME_STAMP: 4347 delta = rb_event_time_stamp(event); 4348 ts = rb_fix_abs_ts(delta, ts); 4349 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4350 e, 
ts, delta); 4351 break; 4352 4353 case RINGBUF_TYPE_PADDING: 4354 ts += event->time_delta; 4355 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4356 e, ts, event->time_delta); 4357 break; 4358 4359 case RINGBUF_TYPE_DATA: 4360 ts += event->time_delta; 4361 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4362 e, ts, event->time_delta, 4363 show_flags(event), show_irq(event)); 4364 break; 4365 4366 default: 4367 break; 4368 } 4369 } 4370 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4371 } 4372 4373 static DEFINE_PER_CPU(atomic_t, checking); 4374 static atomic_t ts_dump; 4375 4376 #define buffer_warn_return(fmt, ...) \ 4377 do { \ 4378 /* If another report is happening, ignore this one */ \ 4379 if (atomic_inc_return(&ts_dump) != 1) { \ 4380 atomic_dec(&ts_dump); \ 4381 goto out; \ 4382 } \ 4383 atomic_inc(&cpu_buffer->record_disabled); \ 4384 pr_warn(fmt, ##__VA_ARGS__); \ 4385 dump_buffer_page(bpage, info, tail); \ 4386 atomic_dec(&ts_dump); \ 4387 /* There are some cases in boot up where this can happen */ \ 4388 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4389 /* Do not re-enable checking */ \ 4390 return; \ 4391 } while (0) 4392 4393 /* 4394 * Check if the current event time stamp matches the deltas on 4395 * the buffer page. 4396 */ 4397 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4398 struct rb_event_info *info, 4399 unsigned long tail) 4400 { 4401 struct buffer_data_page *bpage; 4402 u64 ts, delta; 4403 bool full = false; 4404 int ret; 4405 4406 bpage = info->tail_page->page; 4407 4408 if (tail == CHECK_FULL_PAGE) { 4409 full = true; 4410 tail = local_read(&bpage->commit); 4411 } else if (info->add_timestamp & 4412 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4413 /* Ignore events with absolute time stamps */ 4414 return; 4415 } 4416 4417 /* 4418 * Do not check the first event (skip possible extends too). 4419 * Also do not check if previous events have not been committed. 4420 */ 4421 if (tail <= 8 || tail > local_read(&bpage->commit)) 4422 return; 4423 4424 /* 4425 * If this interrupted another event, do not do the check. 4426 */ 4427 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4428 goto out; 4429 4430 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4431 if (ret < 0) { 4432 if (delta < ts) { 4433 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4434 cpu_buffer->cpu, ts, delta); 4435 goto out; 4436 } 4437 } 4438 if ((full && ts > info->ts) || 4439 (!full && ts + info->delta != info->ts)) { 4440 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4441 cpu_buffer->cpu, 4442 ts + info->delta, info->ts, info->delta, 4443 info->before, info->after, 4444 full ?
" (full)" : "", show_interrupt_level()); 4445 } 4446 out: 4447 atomic_dec(this_cpu_ptr(&checking)); 4448 } 4449 #else 4450 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4451 struct rb_event_info *info, 4452 unsigned long tail) 4453 { 4454 } 4455 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4456 4457 static struct ring_buffer_event * 4458 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4459 struct rb_event_info *info) 4460 { 4461 struct ring_buffer_event *event; 4462 struct buffer_page *tail_page; 4463 unsigned long tail, write, w; 4464 4465 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4466 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4467 4468 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4469 barrier(); 4470 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4471 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4472 barrier(); 4473 info->ts = rb_time_stamp(cpu_buffer->buffer); 4474 4475 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4476 info->delta = info->ts; 4477 } else { 4478 /* 4479 * If interrupting an event time update, we may need an 4480 * absolute timestamp. 4481 * Don't bother if this is the start of a new page (w == 0). 4482 */ 4483 if (!w) { 4484 /* Use the sub-buffer timestamp */ 4485 info->delta = 0; 4486 } else if (unlikely(info->before != info->after)) { 4487 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 4488 info->length += RB_LEN_TIME_EXTEND; 4489 } else { 4490 info->delta = info->ts - info->after; 4491 if (unlikely(test_time_stamp(info->delta))) { 4492 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 4493 info->length += RB_LEN_TIME_EXTEND; 4494 } 4495 } 4496 } 4497 4498 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 4499 4500 /*C*/ write = local_add_return(info->length, &tail_page->write); 4501 4502 /* set write to only the index of the write */ 4503 write &= RB_WRITE_MASK; 4504 4505 tail = write - info->length; 4506 4507 /* See if we shot past the end of this buffer page */ 4508 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 4509 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 4510 return rb_move_tail(cpu_buffer, tail, info); 4511 } 4512 4513 if (likely(tail == w)) { 4514 /* Nothing interrupted us between A and C */ 4515 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 4516 /* 4517 * If something came in between C and D, the write stamp 4518 * may now not be in sync. But that's fine as the before_stamp 4519 * will be different and then the next event will just be forced 4520 * to use an absolute timestamp. 4521 */ 4522 if (likely(!(info->add_timestamp & 4523 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4524 /* This did not interrupt any time update */ 4525 info->delta = info->ts - info->after; 4526 else 4527 /* Just use full timestamp for interrupting event */ 4528 info->delta = info->ts; 4529 check_buffer(cpu_buffer, info, tail); 4530 } else { 4531 u64 ts; 4532 /* SLOW PATH - Interrupted between A and C */ 4533 4534 /* Save the old before_stamp */ 4535 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4536 4537 /* 4538 * Read a new timestamp and update the before_stamp to make 4539 * the next event after this one force using an absolute 4540 * timestamp. This is in case an interrupt were to come in 4541 * between E and F.
4542 */ 4543 ts = rb_time_stamp(cpu_buffer->buffer); 4544 rb_time_set(&cpu_buffer->before_stamp, ts); 4545 4546 barrier(); 4547 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 4548 barrier(); 4549 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 4550 info->after == info->before && info->after < ts) { 4551 /* 4552 * Nothing came after this event between C and F, it is 4553 * safe to use info->after for the delta as it 4554 * matched info->before and is still valid. 4555 */ 4556 info->delta = ts - info->after; 4557 } else { 4558 /* 4559 * Interrupted between C and F: 4560 * Lost the previous events time stamp. Just set the 4561 * delta to zero, and this will be the same time as 4562 * the event this event interrupted. And the events that 4563 * came after this will still be correct (as they would 4564 * have built their delta on the previous event. 4565 */ 4566 info->delta = 0; 4567 } 4568 info->ts = ts; 4569 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 4570 } 4571 4572 /* 4573 * If this is the first commit on the page, then it has the same 4574 * timestamp as the page itself. 4575 */ 4576 if (unlikely(!tail && !(info->add_timestamp & 4577 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4578 info->delta = 0; 4579 4580 /* We reserved something on the buffer */ 4581 4582 event = __rb_page_index(tail_page, tail); 4583 rb_update_event(cpu_buffer, event, info); 4584 4585 local_inc(&tail_page->entries); 4586 4587 /* 4588 * If this is the first commit on the page, then update 4589 * its timestamp. 4590 */ 4591 if (unlikely(!tail)) 4592 tail_page->page->time_stamp = info->ts; 4593 4594 /* account for these added bytes */ 4595 local_add(info->length, &cpu_buffer->entries_bytes); 4596 4597 return event; 4598 } 4599 4600 static __always_inline struct ring_buffer_event * 4601 rb_reserve_next_event(struct trace_buffer *buffer, 4602 struct ring_buffer_per_cpu *cpu_buffer, 4603 unsigned long length) 4604 { 4605 struct ring_buffer_event *event; 4606 struct rb_event_info info; 4607 int nr_loops = 0; 4608 int add_ts_default; 4609 4610 /* 4611 * ring buffer does cmpxchg as well as atomic64 operations 4612 * (which some archs use locking for atomic64), make sure this 4613 * is safe in NMI context 4614 */ 4615 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4616 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4617 (unlikely(in_nmi()))) { 4618 return NULL; 4619 } 4620 4621 rb_start_commit(cpu_buffer); 4622 /* The commit page can not change after this */ 4623 4624 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4625 /* 4626 * Due to the ability to swap a cpu buffer from a buffer 4627 * it is possible it was swapped before we committed. 4628 * (committing stops a swap). We check for it here and 4629 * if it happened, we have to fail the write. 4630 */ 4631 barrier(); 4632 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4633 local_dec(&cpu_buffer->committing); 4634 local_dec(&cpu_buffer->commits); 4635 return NULL; 4636 } 4637 #endif 4638 4639 info.length = rb_calculate_event_length(length); 4640 4641 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4642 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4643 info.length += RB_LEN_TIME_EXTEND; 4644 if (info.length > cpu_buffer->buffer->max_data_size) 4645 goto out_fail; 4646 } else { 4647 add_ts_default = RB_ADD_STAMP_NONE; 4648 } 4649 4650 again: 4651 info.add_timestamp = add_ts_default; 4652 info.delta = 0; 4653 4654 /* 4655 * We allow for interrupts to reenter here and do a trace. 
4656 * If one does, it will cause this original code to loop 4657 * back here. Even with heavy interrupts happening, this 4658 * should only happen a few times in a row. If this happens 4659 * 1000 times in a row, there must be either an interrupt 4660 * storm or we have something buggy. 4661 * Bail! 4662 */ 4663 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4664 goto out_fail; 4665 4666 event = __rb_reserve_next(cpu_buffer, &info); 4667 4668 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4669 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4670 info.length -= RB_LEN_TIME_EXTEND; 4671 goto again; 4672 } 4673 4674 if (likely(event)) 4675 return event; 4676 out_fail: 4677 rb_end_commit(cpu_buffer); 4678 return NULL; 4679 } 4680 4681 /** 4682 * ring_buffer_lock_reserve - reserve a part of the buffer 4683 * @buffer: the ring buffer to reserve from 4684 * @length: the length of the data to reserve (excluding event header) 4685 * 4686 * Returns a reserved event on the ring buffer to copy directly to. 4687 * The user of this interface will need to get the body to write into 4688 * and can use the ring_buffer_event_data() interface. 4689 * 4690 * The length is the length of the data needed, not the event length 4691 * which also includes the event header. 4692 * 4693 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4694 * If NULL is returned, then nothing has been allocated or locked. 4695 */ 4696 struct ring_buffer_event * 4697 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4698 { 4699 struct ring_buffer_per_cpu *cpu_buffer; 4700 struct ring_buffer_event *event; 4701 int cpu; 4702 4703 /* If we are tracing schedule, we don't want to recurse */ 4704 preempt_disable_notrace(); 4705 4706 if (unlikely(atomic_read(&buffer->record_disabled))) 4707 goto out; 4708 4709 cpu = raw_smp_processor_id(); 4710 4711 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4712 goto out; 4713 4714 cpu_buffer = buffer->buffers[cpu]; 4715 4716 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4717 goto out; 4718 4719 if (unlikely(length > buffer->max_data_size)) 4720 goto out; 4721 4722 if (unlikely(trace_recursive_lock(cpu_buffer))) 4723 goto out; 4724 4725 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4726 if (!event) 4727 goto out_unlock; 4728 4729 return event; 4730 4731 out_unlock: 4732 trace_recursive_unlock(cpu_buffer); 4733 out: 4734 preempt_enable_notrace(); 4735 return NULL; 4736 } 4737 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4738 4739 /* 4740 * Decrement the entries to the page that an event is on. 4741 * The event does not even need to exist, only the pointer 4742 * to the page it is on. This may only be called before the commit 4743 * takes place. 4744 */ 4745 static inline void 4746 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4747 struct ring_buffer_event *event) 4748 { 4749 unsigned long addr = (unsigned long)event; 4750 struct buffer_page *bpage = cpu_buffer->commit_page; 4751 struct buffer_page *start; 4752 4753 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4754 4755 /* Do the likely case first */ 4756 if (likely(bpage->page == (void *)addr)) { 4757 local_dec(&bpage->entries); 4758 return; 4759 } 4760 4761 /* 4762 * Because the commit page may be on the reader page we 4763 * start with the next page and check the end loop there. 
4764 */ 4765 rb_inc_page(&bpage); 4766 start = bpage; 4767 do { 4768 if (bpage->page == (void *)addr) { 4769 local_dec(&bpage->entries); 4770 return; 4771 } 4772 rb_inc_page(&bpage); 4773 } while (bpage != start); 4774 4775 /* commit not part of this buffer?? */ 4776 RB_WARN_ON(cpu_buffer, 1); 4777 } 4778 4779 /** 4780 * ring_buffer_discard_commit - discard an event that has not been committed 4781 * @buffer: the ring buffer 4782 * @event: non committed event to discard 4783 * 4784 * Sometimes an event that is in the ring buffer needs to be ignored. 4785 * This function lets the user discard an event in the ring buffer 4786 * and then that event will not be read later. 4787 * 4788 * This function only works if it is called before the item has been 4789 * committed. It will try to free the event from the ring buffer 4790 * if another event has not been added behind it. 4791 * 4792 * If another event has been added behind it, it will set the event 4793 * up as discarded, and perform the commit. 4794 * 4795 * If this function is called, do not call ring_buffer_unlock_commit on 4796 * the event. 4797 */ 4798 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4799 struct ring_buffer_event *event) 4800 { 4801 struct ring_buffer_per_cpu *cpu_buffer; 4802 int cpu; 4803 4804 /* The event is discarded regardless */ 4805 rb_event_discard(event); 4806 4807 cpu = smp_processor_id(); 4808 cpu_buffer = buffer->buffers[cpu]; 4809 4810 /* 4811 * This must only be called if the event has not been 4812 * committed yet. Thus we can assume that preemption 4813 * is still disabled. 4814 */ 4815 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4816 4817 rb_decrement_entry(cpu_buffer, event); 4818 rb_try_to_discard(cpu_buffer, event); 4819 rb_end_commit(cpu_buffer); 4820 4821 trace_recursive_unlock(cpu_buffer); 4822 4823 preempt_enable_notrace(); 4824 4825 } 4826 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4827 4828 /** 4829 * ring_buffer_write - write data to the buffer without reserving 4830 * @buffer: The ring buffer to write to. 4831 * @length: The length of the data being written (excluding the event header) 4832 * @data: The data to write to the buffer. 4833 * 4834 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4835 * one function. If you already have the data to write to the buffer, it 4836 * may be easier to simply call this function. 4837 * 4838 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4839 * and not the length of the event which would hold the header. 
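 *
 * A rough sketch of the two equivalent ways to add an event
 * (illustrative only; error handling is omitted and "payload" is just
 * a placeholder for the caller's data):
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(payload));
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), &payload, sizeof(payload));
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *
 *	ring_buffer_write(buffer, sizeof(payload), &payload);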
4840 */ 4841 int ring_buffer_write(struct trace_buffer *buffer, 4842 unsigned long length, 4843 void *data) 4844 { 4845 struct ring_buffer_per_cpu *cpu_buffer; 4846 struct ring_buffer_event *event; 4847 void *body; 4848 int ret = -EBUSY; 4849 int cpu; 4850 4851 guard(preempt_notrace)(); 4852 4853 if (atomic_read(&buffer->record_disabled)) 4854 return -EBUSY; 4855 4856 cpu = raw_smp_processor_id(); 4857 4858 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4859 return -EBUSY; 4860 4861 cpu_buffer = buffer->buffers[cpu]; 4862 4863 if (atomic_read(&cpu_buffer->record_disabled)) 4864 return -EBUSY; 4865 4866 if (length > buffer->max_data_size) 4867 return -EBUSY; 4868 4869 if (unlikely(trace_recursive_lock(cpu_buffer))) 4870 return -EBUSY; 4871 4872 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4873 if (!event) 4874 goto out_unlock; 4875 4876 body = rb_event_data(event); 4877 4878 memcpy(body, data, length); 4879 4880 rb_commit(cpu_buffer); 4881 4882 rb_wakeups(buffer, cpu_buffer); 4883 4884 ret = 0; 4885 4886 out_unlock: 4887 trace_recursive_unlock(cpu_buffer); 4888 return ret; 4889 } 4890 EXPORT_SYMBOL_GPL(ring_buffer_write); 4891 4892 /* 4893 * The total number of entries in the ring buffer is the running counter 4894 * of entries entered into the ring buffer, minus the sum of 4895 * the entries read from the ring buffer and the number of 4896 * entries that were overwritten. 4897 */ 4898 static inline unsigned long 4899 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4900 { 4901 return local_read(&cpu_buffer->entries) - 4902 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4903 } 4904 4905 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4906 { 4907 return !rb_num_of_entries(cpu_buffer); 4908 } 4909 4910 /** 4911 * ring_buffer_record_disable - stop all writes into the buffer 4912 * @buffer: The ring buffer to stop writes to. 4913 * 4914 * This prevents all writes to the buffer. Any attempt to write 4915 * to the buffer after this will fail and return NULL. 4916 * 4917 * The caller should call synchronize_rcu() after this. 4918 */ 4919 void ring_buffer_record_disable(struct trace_buffer *buffer) 4920 { 4921 atomic_inc(&buffer->record_disabled); 4922 } 4923 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4924 4925 /** 4926 * ring_buffer_record_enable - enable writes to the buffer 4927 * @buffer: The ring buffer to enable writes 4928 * 4929 * Note, multiple disables will need the same number of enables 4930 * to truly enable the writing (much like preempt_disable). 4931 */ 4932 void ring_buffer_record_enable(struct trace_buffer *buffer) 4933 { 4934 atomic_dec(&buffer->record_disabled); 4935 } 4936 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4937 4938 /** 4939 * ring_buffer_record_off - stop all writes into the buffer 4940 * @buffer: The ring buffer to stop writes to. 4941 * 4942 * This prevents all writes to the buffer. Any attempt to write 4943 * to the buffer after this will fail and return NULL. 4944 * 4945 * This is different than ring_buffer_record_disable() as 4946 * it works like an on/off switch, whereas the disable() version 4947 * must be paired with an enable().
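 *
 * An illustrative sketch of the difference (not a complete example):
 *
 *	ring_buffer_record_disable(buffer);
 *	ring_buffer_record_disable(buffer);
 *	ring_buffer_record_enable(buffer);
 *	ring_buffer_record_enable(buffer);
 *
 * needs both enables before writes are possible again, while:
 *
 *	ring_buffer_record_off(buffer);
 *	ring_buffer_record_on(buffer);
 *
 * acts as a single switch independent of the disable counter.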
4948 */ 4949 void ring_buffer_record_off(struct trace_buffer *buffer) 4950 { 4951 unsigned int rd; 4952 unsigned int new_rd; 4953 4954 rd = atomic_read(&buffer->record_disabled); 4955 do { 4956 new_rd = rd | RB_BUFFER_OFF; 4957 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4958 } 4959 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4960 4961 /** 4962 * ring_buffer_record_on - restart writes into the buffer 4963 * @buffer: The ring buffer to start writes to. 4964 * 4965 * This enables all writes to the buffer that were disabled by 4966 * ring_buffer_record_off(). 4967 * 4968 * This is different than ring_buffer_record_enable() as 4969 * it works like an on/off switch, whereas the enable() version 4970 * must be paired with a disable(). 4971 */ 4972 void ring_buffer_record_on(struct trace_buffer *buffer) 4973 { 4974 unsigned int rd; 4975 unsigned int new_rd; 4976 4977 rd = atomic_read(&buffer->record_disabled); 4978 do { 4979 new_rd = rd & ~RB_BUFFER_OFF; 4980 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4981 } 4982 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4983 4984 /** 4985 * ring_buffer_record_is_on - return true if the ring buffer can write 4986 * @buffer: The ring buffer to see if write is enabled 4987 * 4988 * Returns true if the ring buffer is in a state that it accepts writes. 4989 */ 4990 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4991 { 4992 return !atomic_read(&buffer->record_disabled); 4993 } 4994 4995 /** 4996 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4997 * @buffer: The ring buffer to see if write is set enabled 4998 * 4999 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 5000 * Note that this does NOT mean it is in a writable state. 5001 * 5002 * It may return true when the ring buffer has been disabled by 5003 * ring_buffer_record_disable(), as that is a temporary disabling of 5004 * the ring buffer. 5005 */ 5006 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 5007 { 5008 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 5009 } 5010 5011 /** 5012 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write 5013 * @buffer: The ring buffer to see if write is enabled 5014 * @cpu: The CPU to test if the ring buffer can write to 5015 * 5016 * Returns true if the ring buffer is in a state that it accepts writes 5017 * for a particular CPU. 5018 */ 5019 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 5020 { 5021 struct ring_buffer_per_cpu *cpu_buffer; 5022 5023 cpu_buffer = buffer->buffers[cpu]; 5024 5025 return ring_buffer_record_is_set_on(buffer) && 5026 !atomic_read(&cpu_buffer->record_disabled); 5027 } 5028 5029 /** 5030 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 5031 * @buffer: The ring buffer to stop writes to. 5032 * @cpu: The CPU buffer to stop 5033 * 5034 * This prevents all writes to the buffer. Any attempt to write 5035 * to the buffer after this will fail and return NULL. 5036 * 5037 * The caller should call synchronize_rcu() after this.
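 *
 * A minimal sketch of the intended pattern (illustrative only):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_rcu();
 *
 * After the synchronize_rcu(), writers that started before the disable
 * (they run with preemption disabled) are expected to have finished.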
5038 */ 5039 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 5040 { 5041 struct ring_buffer_per_cpu *cpu_buffer; 5042 5043 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5044 return; 5045 5046 cpu_buffer = buffer->buffers[cpu]; 5047 atomic_inc(&cpu_buffer->record_disabled); 5048 } 5049 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 5050 5051 /** 5052 * ring_buffer_record_enable_cpu - enable writes to the buffer 5053 * @buffer: The ring buffer to enable writes 5054 * @cpu: The CPU to enable. 5055 * 5056 * Note, multiple disables will need the same number of enables 5057 * to truly enable the writing (much like preempt_disable). 5058 */ 5059 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 5060 { 5061 struct ring_buffer_per_cpu *cpu_buffer; 5062 5063 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5064 return; 5065 5066 cpu_buffer = buffer->buffers[cpu]; 5067 atomic_dec(&cpu_buffer->record_disabled); 5068 } 5069 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 5070 5071 /** 5072 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 5073 * @buffer: The ring buffer 5074 * @cpu: The per CPU buffer to read from. 5075 */ 5076 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 5077 { 5078 unsigned long flags; 5079 struct ring_buffer_per_cpu *cpu_buffer; 5080 struct buffer_page *bpage; 5081 u64 ret = 0; 5082 5083 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5084 return 0; 5085 5086 cpu_buffer = buffer->buffers[cpu]; 5087 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5088 /* 5089 * if the tail is on reader_page, oldest time stamp is on the reader 5090 * page 5091 */ 5092 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 5093 bpage = cpu_buffer->reader_page; 5094 else 5095 bpage = rb_set_head_page(cpu_buffer); 5096 if (bpage) 5097 ret = bpage->page->time_stamp; 5098 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5099 5100 return ret; 5101 } 5102 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 5103 5104 /** 5105 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 5106 * @buffer: The ring buffer 5107 * @cpu: The per CPU buffer to read from. 5108 */ 5109 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 5110 { 5111 struct ring_buffer_per_cpu *cpu_buffer; 5112 unsigned long ret; 5113 5114 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5115 return 0; 5116 5117 cpu_buffer = buffer->buffers[cpu]; 5118 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 5119 5120 return ret; 5121 } 5122 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 5123 5124 /** 5125 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 5126 * @buffer: The ring buffer 5127 * @cpu: The per CPU buffer to get the entries from. 5128 */ 5129 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5130 { 5131 struct ring_buffer_per_cpu *cpu_buffer; 5132 5133 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5134 return 0; 5135 5136 cpu_buffer = buffer->buffers[cpu]; 5137 5138 return rb_num_of_entries(cpu_buffer); 5139 } 5140 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5141 5142 /** 5143 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5144 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
5145 * @buffer: The ring buffer 5146 * @cpu: The per CPU buffer to get the number of overruns from 5147 */ 5148 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5149 { 5150 struct ring_buffer_per_cpu *cpu_buffer; 5151 unsigned long ret; 5152 5153 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5154 return 0; 5155 5156 cpu_buffer = buffer->buffers[cpu]; 5157 ret = local_read(&cpu_buffer->overrun); 5158 5159 return ret; 5160 } 5161 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5162 5163 /** 5164 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5165 * commits failing due to the buffer wrapping around while there are uncommitted 5166 * events, such as during an interrupt storm. 5167 * @buffer: The ring buffer 5168 * @cpu: The per CPU buffer to get the number of overruns from 5169 */ 5170 unsigned long 5171 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5172 { 5173 struct ring_buffer_per_cpu *cpu_buffer; 5174 unsigned long ret; 5175 5176 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5177 return 0; 5178 5179 cpu_buffer = buffer->buffers[cpu]; 5180 ret = local_read(&cpu_buffer->commit_overrun); 5181 5182 return ret; 5183 } 5184 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5185 5186 /** 5187 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5188 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 5189 * @buffer: The ring buffer 5190 * @cpu: The per CPU buffer to get the number of overruns from 5191 */ 5192 unsigned long 5193 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5194 { 5195 struct ring_buffer_per_cpu *cpu_buffer; 5196 unsigned long ret; 5197 5198 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5199 return 0; 5200 5201 cpu_buffer = buffer->buffers[cpu]; 5202 ret = local_read(&cpu_buffer->dropped_events); 5203 5204 return ret; 5205 } 5206 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5207 5208 /** 5209 * ring_buffer_read_events_cpu - get the number of events successfully read 5210 * @buffer: The ring buffer 5211 * @cpu: The per CPU buffer to get the number of events read 5212 */ 5213 unsigned long 5214 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5215 { 5216 struct ring_buffer_per_cpu *cpu_buffer; 5217 5218 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5219 return 0; 5220 5221 cpu_buffer = buffer->buffers[cpu]; 5222 return cpu_buffer->read; 5223 } 5224 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5225 5226 /** 5227 * ring_buffer_entries - get the number of entries in a buffer 5228 * @buffer: The ring buffer 5229 * 5230 * Returns the total number of entries in the ring buffer 5231 * (all CPU entries) 5232 */ 5233 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5234 { 5235 struct ring_buffer_per_cpu *cpu_buffer; 5236 unsigned long entries = 0; 5237 int cpu; 5238 5239 /* if you care about this being correct, lock the buffer */ 5240 for_each_buffer_cpu(buffer, cpu) { 5241 cpu_buffer = buffer->buffers[cpu]; 5242 entries += rb_num_of_entries(cpu_buffer); 5243 } 5244 5245 return entries; 5246 } 5247 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5248 5249 /** 5250 * ring_buffer_overruns - get the number of overruns in buffer 5251 * @buffer: The ring buffer 5252 * 5253 * Returns the total number of overruns in the ring buffer 5254 * (all CPU entries) 5255 */ 5256 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5257 { 5258 struct ring_buffer_per_cpu *cpu_buffer; 5259 unsigned long overruns = 0; 5260 int cpu; 5261 5262 /* 
if you care about this being correct, lock the buffer */ 5263 for_each_buffer_cpu(buffer, cpu) { 5264 cpu_buffer = buffer->buffers[cpu]; 5265 overruns += local_read(&cpu_buffer->overrun); 5266 } 5267 5268 return overruns; 5269 } 5270 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5271 5272 static void rb_iter_reset(struct ring_buffer_iter *iter) 5273 { 5274 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5275 5276 /* Iterator usage is expected to have record disabled */ 5277 iter->head_page = cpu_buffer->reader_page; 5278 iter->head = cpu_buffer->reader_page->read; 5279 iter->next_event = iter->head; 5280 5281 iter->cache_reader_page = iter->head_page; 5282 iter->cache_read = cpu_buffer->read; 5283 iter->cache_pages_removed = cpu_buffer->pages_removed; 5284 5285 if (iter->head) { 5286 iter->read_stamp = cpu_buffer->read_stamp; 5287 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5288 } else { 5289 iter->read_stamp = iter->head_page->page->time_stamp; 5290 iter->page_stamp = iter->read_stamp; 5291 } 5292 } 5293 5294 /** 5295 * ring_buffer_iter_reset - reset an iterator 5296 * @iter: The iterator to reset 5297 * 5298 * Resets the iterator, so that it will start from the beginning 5299 * again. 5300 */ 5301 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5302 { 5303 struct ring_buffer_per_cpu *cpu_buffer; 5304 unsigned long flags; 5305 5306 if (!iter) 5307 return; 5308 5309 cpu_buffer = iter->cpu_buffer; 5310 5311 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5312 rb_iter_reset(iter); 5313 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5314 } 5315 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5316 5317 /** 5318 * ring_buffer_iter_empty - check if an iterator has no more to read 5319 * @iter: The iterator to check 5320 */ 5321 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5322 { 5323 struct ring_buffer_per_cpu *cpu_buffer; 5324 struct buffer_page *reader; 5325 struct buffer_page *head_page; 5326 struct buffer_page *commit_page; 5327 struct buffer_page *curr_commit_page; 5328 unsigned commit; 5329 u64 curr_commit_ts; 5330 u64 commit_ts; 5331 5332 cpu_buffer = iter->cpu_buffer; 5333 reader = cpu_buffer->reader_page; 5334 head_page = cpu_buffer->head_page; 5335 commit_page = READ_ONCE(cpu_buffer->commit_page); 5336 commit_ts = commit_page->page->time_stamp; 5337 5338 /* 5339 * When the writer goes across pages, it issues a cmpxchg which 5340 * is a mb(), which will synchronize with the rmb here. 
5341 * (see rb_tail_page_update()) 5342 */ 5343 smp_rmb(); 5344 commit = rb_page_commit(commit_page); 5345 /* We want to make sure that the commit page doesn't change */ 5346 smp_rmb(); 5347 5348 /* Make sure commit page didn't change */ 5349 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5350 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5351 5352 /* If the commit page changed, then there's more data */ 5353 if (curr_commit_page != commit_page || 5354 curr_commit_ts != commit_ts) 5355 return 0; 5356 5357 /* Still racy, as it may return a false positive, but that's OK */ 5358 return ((iter->head_page == commit_page && iter->head >= commit) || 5359 (iter->head_page == reader && commit_page == head_page && 5360 head_page->read == commit && 5361 iter->head == rb_page_size(cpu_buffer->reader_page))); 5362 } 5363 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5364 5365 static void 5366 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5367 struct ring_buffer_event *event) 5368 { 5369 u64 delta; 5370 5371 switch (event->type_len) { 5372 case RINGBUF_TYPE_PADDING: 5373 return; 5374 5375 case RINGBUF_TYPE_TIME_EXTEND: 5376 delta = rb_event_time_stamp(event); 5377 cpu_buffer->read_stamp += delta; 5378 return; 5379 5380 case RINGBUF_TYPE_TIME_STAMP: 5381 delta = rb_event_time_stamp(event); 5382 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5383 cpu_buffer->read_stamp = delta; 5384 return; 5385 5386 case RINGBUF_TYPE_DATA: 5387 cpu_buffer->read_stamp += event->time_delta; 5388 return; 5389 5390 default: 5391 RB_WARN_ON(cpu_buffer, 1); 5392 } 5393 } 5394 5395 static void 5396 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5397 struct ring_buffer_event *event) 5398 { 5399 u64 delta; 5400 5401 switch (event->type_len) { 5402 case RINGBUF_TYPE_PADDING: 5403 return; 5404 5405 case RINGBUF_TYPE_TIME_EXTEND: 5406 delta = rb_event_time_stamp(event); 5407 iter->read_stamp += delta; 5408 return; 5409 5410 case RINGBUF_TYPE_TIME_STAMP: 5411 delta = rb_event_time_stamp(event); 5412 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5413 iter->read_stamp = delta; 5414 return; 5415 5416 case RINGBUF_TYPE_DATA: 5417 iter->read_stamp += event->time_delta; 5418 return; 5419 5420 default: 5421 RB_WARN_ON(iter->cpu_buffer, 1); 5422 } 5423 } 5424 5425 static struct buffer_page * 5426 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5427 { 5428 struct buffer_page *reader = NULL; 5429 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5430 unsigned long overwrite; 5431 unsigned long flags; 5432 int nr_loops = 0; 5433 bool ret; 5434 5435 local_irq_save(flags); 5436 arch_spin_lock(&cpu_buffer->lock); 5437 5438 again: 5439 /* 5440 * This should normally only loop twice. But because the 5441 * start of the reader inserts an empty page, it causes 5442 * a case where we will loop three times. There should be no 5443 * reason to loop four times (that I know of). 
5444 */ 5445 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5446 reader = NULL; 5447 goto out; 5448 } 5449 5450 reader = cpu_buffer->reader_page; 5451 5452 /* If there's more to read, return this page */ 5453 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5454 goto out; 5455 5456 /* Never should we have an index greater than the size */ 5457 if (RB_WARN_ON(cpu_buffer, 5458 cpu_buffer->reader_page->read > rb_page_size(reader))) 5459 goto out; 5460 5461 /* check if we caught up to the tail */ 5462 reader = NULL; 5463 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5464 goto out; 5465 5466 /* Don't bother swapping if the ring buffer is empty */ 5467 if (rb_num_of_entries(cpu_buffer) == 0) 5468 goto out; 5469 5470 /* 5471 * Reset the reader page to size zero. 5472 */ 5473 local_set(&cpu_buffer->reader_page->write, 0); 5474 local_set(&cpu_buffer->reader_page->entries, 0); 5475 cpu_buffer->reader_page->real_end = 0; 5476 5477 spin: 5478 /* 5479 * Splice the empty reader page into the list around the head. 5480 */ 5481 reader = rb_set_head_page(cpu_buffer); 5482 if (!reader) 5483 goto out; 5484 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5485 cpu_buffer->reader_page->list.prev = reader->list.prev; 5486 5487 /* 5488 * cpu_buffer->pages just needs to point to the buffer, it 5489 * has no specific buffer page to point to. Let's move it out 5490 * of our way so we don't accidentally swap it. 5491 */ 5492 cpu_buffer->pages = reader->list.prev; 5493 5494 /* The reader page will be pointing to the new head */ 5495 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5496 5497 /* 5498 * We want to make sure we read the overruns after we set up our 5499 * pointers to the next object. The writer side does a 5500 * cmpxchg to cross pages which acts as the mb on the writer 5501 * side. Note, the reader will constantly fail the swap 5502 * while the writer is updating the pointers, so this 5503 * guarantees that the overwrite recorded here is the one we 5504 * want to compare with the last_overrun. 5505 */ 5506 smp_mb(); 5507 overwrite = local_read(&(cpu_buffer->overrun)); 5508 5509 /* 5510 * Here's the tricky part. 5511 * 5512 * We need to move the pointer past the header page. 5513 * But we can only do that if a writer is not currently 5514 * moving it. The page before the header page has the 5515 * flag bit '1' set if it is pointing to the page we want, 5516 * but if the writer is in the process of moving it 5517 * then it will be '2', or '0' if it has already been moved. 5518 */ 5519 5520 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5521 5522 /* 5523 * If we did not convert it, then we must try again. 5524 */ 5525 if (!ret) 5526 goto spin; 5527 5528 if (cpu_buffer->ring_meta) 5529 rb_update_meta_reader(cpu_buffer, reader); 5530 5531 /* 5532 * Yay! We succeeded in replacing the page. 5533 * 5534 * Now make the new head point back to the reader page.
5535 */ 5536 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5537 rb_inc_page(&cpu_buffer->head_page); 5538 5539 cpu_buffer->cnt++; 5540 local_inc(&cpu_buffer->pages_read); 5541 5542 /* Finally update the reader page to the new head */ 5543 cpu_buffer->reader_page = reader; 5544 cpu_buffer->reader_page->read = 0; 5545 5546 if (overwrite != cpu_buffer->last_overrun) { 5547 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5548 cpu_buffer->last_overrun = overwrite; 5549 } 5550 5551 goto again; 5552 5553 out: 5554 /* Update the read_stamp on the first event */ 5555 if (reader && reader->read == 0) 5556 cpu_buffer->read_stamp = reader->page->time_stamp; 5557 5558 arch_spin_unlock(&cpu_buffer->lock); 5559 local_irq_restore(flags); 5560 5561 /* 5562 * The writer has preempt disable, wait for it. But not forever 5563 * Although, 1 second is pretty much "forever" 5564 */ 5565 #define USECS_WAIT 1000000 5566 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5567 /* If the write is past the end of page, a writer is still updating it */ 5568 if (likely(!reader || rb_page_write(reader) <= bsize)) 5569 break; 5570 5571 udelay(1); 5572 5573 /* Get the latest version of the reader write value */ 5574 smp_rmb(); 5575 } 5576 5577 /* The writer is not moving forward? Something is wrong */ 5578 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5579 reader = NULL; 5580 5581 /* 5582 * Make sure we see any padding after the write update 5583 * (see rb_reset_tail()). 5584 * 5585 * In addition, a writer may be writing on the reader page 5586 * if the page has not been fully filled, so the read barrier 5587 * is also needed to make sure we see the content of what is 5588 * committed by the writer (see rb_set_commit_to_write()). 5589 */ 5590 smp_rmb(); 5591 5592 5593 return reader; 5594 } 5595 5596 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5597 { 5598 struct ring_buffer_event *event; 5599 struct buffer_page *reader; 5600 unsigned length; 5601 5602 reader = rb_get_reader_page(cpu_buffer); 5603 5604 /* This function should not be called when buffer is empty */ 5605 if (RB_WARN_ON(cpu_buffer, !reader)) 5606 return; 5607 5608 event = rb_reader_event(cpu_buffer); 5609 5610 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5611 cpu_buffer->read++; 5612 5613 rb_update_read_stamp(cpu_buffer, event); 5614 5615 length = rb_event_length(event); 5616 cpu_buffer->reader_page->read += length; 5617 cpu_buffer->read_bytes += length; 5618 } 5619 5620 static void rb_advance_iter(struct ring_buffer_iter *iter) 5621 { 5622 struct ring_buffer_per_cpu *cpu_buffer; 5623 5624 cpu_buffer = iter->cpu_buffer; 5625 5626 /* If head == next_event then we need to jump to the next event */ 5627 if (iter->head == iter->next_event) { 5628 /* If the event gets overwritten again, there's nothing to do */ 5629 if (rb_iter_head_event(iter) == NULL) 5630 return; 5631 } 5632 5633 iter->head = iter->next_event; 5634 5635 /* 5636 * Check if we are at the end of the buffer. 
5637 */ 5638 if (iter->next_event >= rb_page_size(iter->head_page)) { 5639 /* discarded commits can make the page empty */ 5640 if (iter->head_page == cpu_buffer->commit_page) 5641 return; 5642 rb_inc_iter(iter); 5643 return; 5644 } 5645 5646 rb_update_iter_read_stamp(iter, iter->event); 5647 } 5648 5649 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5650 { 5651 return cpu_buffer->lost_events; 5652 } 5653 5654 static struct ring_buffer_event * 5655 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5656 unsigned long *lost_events) 5657 { 5658 struct ring_buffer_event *event; 5659 struct buffer_page *reader; 5660 int nr_loops = 0; 5661 5662 if (ts) 5663 *ts = 0; 5664 again: 5665 /* 5666 * We repeat when a time extend is encountered. 5667 * Since the time extend is always attached to a data event, 5668 * we should never loop more than once. 5669 * (We never hit the following condition more than twice). 5670 */ 5671 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5672 return NULL; 5673 5674 reader = rb_get_reader_page(cpu_buffer); 5675 if (!reader) 5676 return NULL; 5677 5678 event = rb_reader_event(cpu_buffer); 5679 5680 switch (event->type_len) { 5681 case RINGBUF_TYPE_PADDING: 5682 if (rb_null_event(event)) 5683 RB_WARN_ON(cpu_buffer, 1); 5684 /* 5685 * Because the writer could be discarding every 5686 * event it creates (which would probably be bad) 5687 * if we were to go back to "again" then we may never 5688 * catch up, and will trigger the warn on, or lock 5689 * the box. Return the padding, and we will release 5690 * the current locks, and try again. 5691 */ 5692 return event; 5693 5694 case RINGBUF_TYPE_TIME_EXTEND: 5695 /* Internal data, OK to advance */ 5696 rb_advance_reader(cpu_buffer); 5697 goto again; 5698 5699 case RINGBUF_TYPE_TIME_STAMP: 5700 if (ts) { 5701 *ts = rb_event_time_stamp(event); 5702 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5703 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5704 cpu_buffer->cpu, ts); 5705 } 5706 /* Internal data, OK to advance */ 5707 rb_advance_reader(cpu_buffer); 5708 goto again; 5709 5710 case RINGBUF_TYPE_DATA: 5711 if (ts && !(*ts)) { 5712 *ts = cpu_buffer->read_stamp + event->time_delta; 5713 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5714 cpu_buffer->cpu, ts); 5715 } 5716 if (lost_events) 5717 *lost_events = rb_lost_events(cpu_buffer); 5718 return event; 5719 5720 default: 5721 RB_WARN_ON(cpu_buffer, 1); 5722 } 5723 5724 return NULL; 5725 } 5726 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5727 5728 static struct ring_buffer_event * 5729 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5730 { 5731 struct trace_buffer *buffer; 5732 struct ring_buffer_per_cpu *cpu_buffer; 5733 struct ring_buffer_event *event; 5734 int nr_loops = 0; 5735 5736 if (ts) 5737 *ts = 0; 5738 5739 cpu_buffer = iter->cpu_buffer; 5740 buffer = cpu_buffer->buffer; 5741 5742 /* 5743 * Check if someone performed a consuming read to the buffer 5744 * or removed some pages from the buffer. In these cases, 5745 * iterator was invalidated and we need to reset it. 5746 */ 5747 if (unlikely(iter->cache_read != cpu_buffer->read || 5748 iter->cache_reader_page != cpu_buffer->reader_page || 5749 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5750 rb_iter_reset(iter); 5751 5752 again: 5753 if (ring_buffer_iter_empty(iter)) 5754 return NULL; 5755 5756 /* 5757 * As the writer can mess with what the iterator is trying 5758 * to read, just give up if we fail to get an event after 5759 * three tries. 
The iterator is not as reliable when reading 5760 * the ring buffer with an active write as the consumer is. 5761 * Do not warn if three failures are reached. 5762 */ 5763 if (++nr_loops > 3) 5764 return NULL; 5765 5766 if (rb_per_cpu_empty(cpu_buffer)) 5767 return NULL; 5768 5769 if (iter->head >= rb_page_size(iter->head_page)) { 5770 rb_inc_iter(iter); 5771 goto again; 5772 } 5773 5774 event = rb_iter_head_event(iter); 5775 if (!event) 5776 goto again; 5777 5778 switch (event->type_len) { 5779 case RINGBUF_TYPE_PADDING: 5780 if (rb_null_event(event)) { 5781 rb_inc_iter(iter); 5782 goto again; 5783 } 5784 rb_advance_iter(iter); 5785 return event; 5786 5787 case RINGBUF_TYPE_TIME_EXTEND: 5788 /* Internal data, OK to advance */ 5789 rb_advance_iter(iter); 5790 goto again; 5791 5792 case RINGBUF_TYPE_TIME_STAMP: 5793 if (ts) { 5794 *ts = rb_event_time_stamp(event); 5795 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5796 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5797 cpu_buffer->cpu, ts); 5798 } 5799 /* Internal data, OK to advance */ 5800 rb_advance_iter(iter); 5801 goto again; 5802 5803 case RINGBUF_TYPE_DATA: 5804 if (ts && !(*ts)) { 5805 *ts = iter->read_stamp + event->time_delta; 5806 ring_buffer_normalize_time_stamp(buffer, 5807 cpu_buffer->cpu, ts); 5808 } 5809 return event; 5810 5811 default: 5812 RB_WARN_ON(cpu_buffer, 1); 5813 } 5814 5815 return NULL; 5816 } 5817 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 5818 5819 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 5820 { 5821 if (likely(!in_nmi())) { 5822 raw_spin_lock(&cpu_buffer->reader_lock); 5823 return true; 5824 } 5825 5826 /* 5827 * If an NMI die dumps out the content of the ring buffer, 5828 * trylock must be used to prevent a deadlock if the NMI 5829 * preempted a task that holds the ring buffer locks. If 5830 * we get the lock then all is fine; if not, then continue 5831 * to do the read, but this can corrupt the ring buffer, 5832 * so it must be permanently disabled from future writes. 5833 * Reading from NMI is a one-shot deal. 5834 */ 5835 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 5836 return true; 5837 5838 /* Continue without locking, but disable the ring buffer */ 5839 atomic_inc(&cpu_buffer->record_disabled); 5840 return false; 5841 } 5842 5843 static inline void 5844 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 5845 { 5846 if (likely(locked)) 5847 raw_spin_unlock(&cpu_buffer->reader_lock); 5848 } 5849 5850 /** 5851 * ring_buffer_peek - peek at the next event to be read 5852 * @buffer: The ring buffer to read 5853 * @cpu: The cpu to peek at 5854 * @ts: The timestamp counter of this event. 5855 * @lost_events: a variable to store if events were lost (may be NULL) 5856 * 5857 * This will return the event that will be read next, but does 5858 * not consume the data.
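 *
 * A minimal usage sketch (illustrative only; the local variables here are
 * hypothetical and error handling is elided):
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event)
 *		len = ring_buffer_event_length(event);
 *
 * A subsequent ring_buffer_consume() on the same CPU returns the same
 * event, as peeking does not advance the reader.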
5859 */ 5860 struct ring_buffer_event * 5861 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 5862 unsigned long *lost_events) 5863 { 5864 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5865 struct ring_buffer_event *event; 5866 unsigned long flags; 5867 bool dolock; 5868 5869 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5870 return NULL; 5871 5872 again: 5873 local_irq_save(flags); 5874 dolock = rb_reader_lock(cpu_buffer); 5875 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5876 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5877 rb_advance_reader(cpu_buffer); 5878 rb_reader_unlock(cpu_buffer, dolock); 5879 local_irq_restore(flags); 5880 5881 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5882 goto again; 5883 5884 return event; 5885 } 5886 5887 /** ring_buffer_iter_dropped - report if there are dropped events 5888 * @iter: The ring buffer iterator 5889 * 5890 * Returns true if there was dropped events since the last peek. 5891 */ 5892 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 5893 { 5894 bool ret = iter->missed_events != 0; 5895 5896 iter->missed_events = 0; 5897 return ret; 5898 } 5899 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5900 5901 /** 5902 * ring_buffer_iter_peek - peek at the next event to be read 5903 * @iter: The ring buffer iterator 5904 * @ts: The timestamp counter of this event. 5905 * 5906 * This will return the event that will be read next, but does 5907 * not increment the iterator. 5908 */ 5909 struct ring_buffer_event * 5910 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5911 { 5912 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5913 struct ring_buffer_event *event; 5914 unsigned long flags; 5915 5916 again: 5917 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5918 event = rb_iter_peek(iter, ts); 5919 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5920 5921 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5922 goto again; 5923 5924 return event; 5925 } 5926 5927 /** 5928 * ring_buffer_consume - return an event and consume it 5929 * @buffer: The ring buffer to get the next event from 5930 * @cpu: the cpu to read the buffer from 5931 * @ts: a variable to store the timestamp (may be NULL) 5932 * @lost_events: a variable to store if events were lost (may be NULL) 5933 * 5934 * Returns the next event in the ring buffer, and that event is consumed. 5935 * Meaning, that sequential reads will keep returning a different event, 5936 * and eventually empty the ring buffer if the producer is slower. 
5937 */ 5938 struct ring_buffer_event * 5939 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5940 unsigned long *lost_events) 5941 { 5942 struct ring_buffer_per_cpu *cpu_buffer; 5943 struct ring_buffer_event *event = NULL; 5944 unsigned long flags; 5945 bool dolock; 5946 5947 again: 5948 /* might be called in atomic */ 5949 preempt_disable(); 5950 5951 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5952 goto out; 5953 5954 cpu_buffer = buffer->buffers[cpu]; 5955 local_irq_save(flags); 5956 dolock = rb_reader_lock(cpu_buffer); 5957 5958 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5959 if (event) { 5960 cpu_buffer->lost_events = 0; 5961 rb_advance_reader(cpu_buffer); 5962 } 5963 5964 rb_reader_unlock(cpu_buffer, dolock); 5965 local_irq_restore(flags); 5966 5967 out: 5968 preempt_enable(); 5969 5970 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5971 goto again; 5972 5973 return event; 5974 } 5975 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5976 5977 /** 5978 * ring_buffer_read_start - start a non consuming read of the buffer 5979 * @buffer: The ring buffer to read from 5980 * @cpu: The cpu buffer to iterate over 5981 * @flags: gfp flags to use for memory allocation 5982 * 5983 * This creates an iterator to allow non-consuming iteration through 5984 * the buffer. If the buffer is disabled for writing, it will produce 5985 * the same information each time, but if the buffer is still writing 5986 * then the first hit of a write will cause the iteration to stop. 5987 * 5988 * Must be paired with ring_buffer_read_finish. 5989 */ 5990 struct ring_buffer_iter * 5991 ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) 5992 { 5993 struct ring_buffer_per_cpu *cpu_buffer; 5994 struct ring_buffer_iter *iter; 5995 5996 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5997 return NULL; 5998 5999 iter = kzalloc(sizeof(*iter), flags); 6000 if (!iter) 6001 return NULL; 6002 6003 /* Holds the entire event: data and meta data */ 6004 iter->event_size = buffer->subbuf_size; 6005 iter->event = kmalloc(iter->event_size, flags); 6006 if (!iter->event) { 6007 kfree(iter); 6008 return NULL; 6009 } 6010 6011 cpu_buffer = buffer->buffers[cpu]; 6012 6013 iter->cpu_buffer = cpu_buffer; 6014 6015 atomic_inc(&cpu_buffer->resize_disabled); 6016 6017 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6018 arch_spin_lock(&cpu_buffer->lock); 6019 rb_iter_reset(iter); 6020 arch_spin_unlock(&cpu_buffer->lock); 6021 6022 return iter; 6023 } 6024 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 6025 6026 /** 6027 * ring_buffer_read_finish - finish reading the iterator of the buffer 6028 * @iter: The iterator retrieved by ring_buffer_start 6029 * 6030 * This re-enables resizing of the buffer, and frees the iterator. 6031 */ 6032 void 6033 ring_buffer_read_finish(struct ring_buffer_iter *iter) 6034 { 6035 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6036 6037 /* Use this opportunity to check the integrity of the ring buffer. */ 6038 rb_check_pages(cpu_buffer); 6039 6040 atomic_dec(&cpu_buffer->resize_disabled); 6041 kfree(iter->event); 6042 kfree(iter); 6043 } 6044 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 6045 6046 /** 6047 * ring_buffer_iter_advance - advance the iterator to the next location 6048 * @iter: The ring buffer iterator 6049 * 6050 * Move the location of the iterator such that the next read will 6051 * be the next location of the iterator. 
6052 */ 6053 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 6054 { 6055 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6056 unsigned long flags; 6057 6058 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6059 6060 rb_advance_iter(iter); 6061 6062 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6063 } 6064 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 6065 6066 /** 6067 * ring_buffer_size - return the size of the ring buffer (in bytes) 6068 * @buffer: The ring buffer. 6069 * @cpu: The CPU to get ring buffer size from. 6070 */ 6071 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 6072 { 6073 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6074 return 0; 6075 6076 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 6077 } 6078 EXPORT_SYMBOL_GPL(ring_buffer_size); 6079 6080 /** 6081 * ring_buffer_max_event_size - return the max data size of an event 6082 * @buffer: The ring buffer. 6083 * 6084 * Returns the maximum size an event can be. 6085 */ 6086 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6087 { 6088 /* If abs timestamp is requested, events have a timestamp too */ 6089 if (ring_buffer_time_stamp_abs(buffer)) 6090 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6091 return buffer->max_data_size; 6092 } 6093 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6094 6095 static void rb_clear_buffer_page(struct buffer_page *page) 6096 { 6097 local_set(&page->write, 0); 6098 local_set(&page->entries, 0); 6099 rb_init_page(page->page); 6100 page->read = 0; 6101 } 6102 6103 /* 6104 * When the buffer is memory mapped to user space, each sub buffer 6105 * has a unique id that is used by the meta data to tell the user 6106 * where the current reader page is. 6107 * 6108 * For a normal allocated ring buffer, the id is saved in the buffer page 6109 * id field, and updated via this function. 6110 * 6111 * But for a fixed memory mapped buffer, the id is already assigned for 6112 * fixed memory ordering in the memory layout and can not be used. Instead 6113 * the index of where the page lies in the memory layout is used. 6114 * 6115 * For the normal pages, set the buffer page id with the passed in @id 6116 * value and return that. 6117 * 6118 * For fixed memory mapped pages, get the page index in the memory layout 6119 * and return that as the id. 
6120 */ 6121 static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer, 6122 struct buffer_page *bpage, int id) 6123 { 6124 /* 6125 * For boot buffers, the id is the index, 6126 * otherwise, set the buffer page with this id 6127 */ 6128 if (cpu_buffer->ring_meta) 6129 id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page); 6130 else 6131 bpage->id = id; 6132 6133 return id; 6134 } 6135 6136 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6137 { 6138 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6139 6140 if (!meta) 6141 return; 6142 6143 meta->reader.read = cpu_buffer->reader_page->read; 6144 meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, 6145 cpu_buffer->reader_page->id); 6146 6147 meta->reader.lost_events = cpu_buffer->lost_events; 6148 6149 meta->entries = local_read(&cpu_buffer->entries); 6150 meta->overrun = local_read(&cpu_buffer->overrun); 6151 meta->read = cpu_buffer->read; 6152 6153 /* Some archs do not have data cache coherency between kernel and user-space */ 6154 flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE); 6155 } 6156 6157 static void 6158 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6159 { 6160 struct buffer_page *page; 6161 6162 rb_head_page_deactivate(cpu_buffer); 6163 6164 cpu_buffer->head_page 6165 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6166 rb_clear_buffer_page(cpu_buffer->head_page); 6167 list_for_each_entry(page, cpu_buffer->pages, list) { 6168 rb_clear_buffer_page(page); 6169 } 6170 6171 cpu_buffer->tail_page = cpu_buffer->head_page; 6172 cpu_buffer->commit_page = cpu_buffer->head_page; 6173 6174 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6175 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6176 rb_clear_buffer_page(cpu_buffer->reader_page); 6177 6178 local_set(&cpu_buffer->entries_bytes, 0); 6179 local_set(&cpu_buffer->overrun, 0); 6180 local_set(&cpu_buffer->commit_overrun, 0); 6181 local_set(&cpu_buffer->dropped_events, 0); 6182 local_set(&cpu_buffer->entries, 0); 6183 local_set(&cpu_buffer->committing, 0); 6184 local_set(&cpu_buffer->commits, 0); 6185 local_set(&cpu_buffer->pages_touched, 0); 6186 local_set(&cpu_buffer->pages_lost, 0); 6187 local_set(&cpu_buffer->pages_read, 0); 6188 cpu_buffer->last_pages_touch = 0; 6189 cpu_buffer->shortest_full = 0; 6190 cpu_buffer->read = 0; 6191 cpu_buffer->read_bytes = 0; 6192 6193 rb_time_set(&cpu_buffer->write_stamp, 0); 6194 rb_time_set(&cpu_buffer->before_stamp, 0); 6195 6196 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6197 6198 cpu_buffer->lost_events = 0; 6199 cpu_buffer->last_overrun = 0; 6200 6201 rb_head_page_activate(cpu_buffer); 6202 cpu_buffer->pages_removed = 0; 6203 6204 if (cpu_buffer->mapped) { 6205 rb_update_meta_page(cpu_buffer); 6206 if (cpu_buffer->ring_meta) { 6207 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6208 meta->commit_buffer = meta->head_buffer; 6209 } 6210 } 6211 } 6212 6213 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6214 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6215 { 6216 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6217 6218 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6219 return; 6220 6221 arch_spin_lock(&cpu_buffer->lock); 6222 6223 rb_reset_cpu(cpu_buffer); 6224 6225 arch_spin_unlock(&cpu_buffer->lock); 6226 } 6227 6228 /** 6229 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6230 * @buffer: The ring buffer to reset a per cpu buffer of 6231 * @cpu: 
The CPU buffer to be reset 6232 */ 6233 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6234 { 6235 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6236 6237 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6238 return; 6239 6240 /* prevent another thread from changing buffer sizes */ 6241 mutex_lock(&buffer->mutex); 6242 6243 atomic_inc(&cpu_buffer->resize_disabled); 6244 atomic_inc(&cpu_buffer->record_disabled); 6245 6246 /* Make sure all commits have finished */ 6247 synchronize_rcu(); 6248 6249 reset_disabled_cpu_buffer(cpu_buffer); 6250 6251 atomic_dec(&cpu_buffer->record_disabled); 6252 atomic_dec(&cpu_buffer->resize_disabled); 6253 6254 mutex_unlock(&buffer->mutex); 6255 } 6256 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6257 6258 /* Flag to ensure proper resetting of atomic variables */ 6259 #define RESET_BIT (1 << 30) 6260 6261 /** 6262 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6263 * @buffer: The ring buffer to reset a per cpu buffer of 6264 */ 6265 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6266 { 6267 struct ring_buffer_per_cpu *cpu_buffer; 6268 int cpu; 6269 6270 /* prevent another thread from changing buffer sizes */ 6271 mutex_lock(&buffer->mutex); 6272 6273 for_each_online_buffer_cpu(buffer, cpu) { 6274 cpu_buffer = buffer->buffers[cpu]; 6275 6276 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6277 atomic_inc(&cpu_buffer->record_disabled); 6278 } 6279 6280 /* Make sure all commits have finished */ 6281 synchronize_rcu(); 6282 6283 for_each_buffer_cpu(buffer, cpu) { 6284 cpu_buffer = buffer->buffers[cpu]; 6285 6286 /* 6287 * If a CPU came online during the synchronize_rcu(), then 6288 * ignore it. 6289 */ 6290 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6291 continue; 6292 6293 reset_disabled_cpu_buffer(cpu_buffer); 6294 6295 atomic_dec(&cpu_buffer->record_disabled); 6296 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6297 } 6298 6299 mutex_unlock(&buffer->mutex); 6300 } 6301 6302 /** 6303 * ring_buffer_reset - reset a ring buffer 6304 * @buffer: The ring buffer to reset all cpu buffers 6305 */ 6306 void ring_buffer_reset(struct trace_buffer *buffer) 6307 { 6308 struct ring_buffer_per_cpu *cpu_buffer; 6309 int cpu; 6310 6311 /* prevent another thread from changing buffer sizes */ 6312 mutex_lock(&buffer->mutex); 6313 6314 for_each_buffer_cpu(buffer, cpu) { 6315 cpu_buffer = buffer->buffers[cpu]; 6316 6317 atomic_inc(&cpu_buffer->resize_disabled); 6318 atomic_inc(&cpu_buffer->record_disabled); 6319 } 6320 6321 /* Make sure all commits have finished */ 6322 synchronize_rcu(); 6323 6324 for_each_buffer_cpu(buffer, cpu) { 6325 cpu_buffer = buffer->buffers[cpu]; 6326 6327 reset_disabled_cpu_buffer(cpu_buffer); 6328 6329 atomic_dec(&cpu_buffer->record_disabled); 6330 atomic_dec(&cpu_buffer->resize_disabled); 6331 } 6332 6333 mutex_unlock(&buffer->mutex); 6334 } 6335 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6336 6337 /** 6338 * ring_buffer_empty - is the ring buffer empty? 
6339 * @buffer: The ring buffer to test 6340 */ 6341 bool ring_buffer_empty(struct trace_buffer *buffer) 6342 { 6343 struct ring_buffer_per_cpu *cpu_buffer; 6344 unsigned long flags; 6345 bool dolock; 6346 bool ret; 6347 int cpu; 6348 6349 /* yes this is racy, but if you don't like the race, lock the buffer */ 6350 for_each_buffer_cpu(buffer, cpu) { 6351 cpu_buffer = buffer->buffers[cpu]; 6352 local_irq_save(flags); 6353 dolock = rb_reader_lock(cpu_buffer); 6354 ret = rb_per_cpu_empty(cpu_buffer); 6355 rb_reader_unlock(cpu_buffer, dolock); 6356 local_irq_restore(flags); 6357 6358 if (!ret) 6359 return false; 6360 } 6361 6362 return true; 6363 } 6364 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6365 6366 /** 6367 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6368 * @buffer: The ring buffer 6369 * @cpu: The CPU buffer to test 6370 */ 6371 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6372 { 6373 struct ring_buffer_per_cpu *cpu_buffer; 6374 unsigned long flags; 6375 bool dolock; 6376 bool ret; 6377 6378 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6379 return true; 6380 6381 cpu_buffer = buffer->buffers[cpu]; 6382 local_irq_save(flags); 6383 dolock = rb_reader_lock(cpu_buffer); 6384 ret = rb_per_cpu_empty(cpu_buffer); 6385 rb_reader_unlock(cpu_buffer, dolock); 6386 local_irq_restore(flags); 6387 6388 return ret; 6389 } 6390 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6391 6392 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6393 /** 6394 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6395 * @buffer_a: One buffer to swap with 6396 * @buffer_b: The other buffer to swap with 6397 * @cpu: the CPU of the buffers to swap 6398 * 6399 * This function is useful for tracers that want to take a "snapshot" 6400 * of a CPU buffer and has another back up buffer lying around. 6401 * it is expected that the tracer handles the cpu buffer not being 6402 * used at the moment. 6403 */ 6404 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6405 struct trace_buffer *buffer_b, int cpu) 6406 { 6407 struct ring_buffer_per_cpu *cpu_buffer_a; 6408 struct ring_buffer_per_cpu *cpu_buffer_b; 6409 int ret = -EINVAL; 6410 6411 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6412 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6413 return -EINVAL; 6414 6415 cpu_buffer_a = buffer_a->buffers[cpu]; 6416 cpu_buffer_b = buffer_b->buffers[cpu]; 6417 6418 /* It's up to the callers to not try to swap mapped buffers */ 6419 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6420 return -EBUSY; 6421 6422 /* At least make sure the two buffers are somewhat the same */ 6423 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6424 return -EINVAL; 6425 6426 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6427 return -EINVAL; 6428 6429 if (atomic_read(&buffer_a->record_disabled)) 6430 return -EAGAIN; 6431 6432 if (atomic_read(&buffer_b->record_disabled)) 6433 return -EAGAIN; 6434 6435 if (atomic_read(&cpu_buffer_a->record_disabled)) 6436 return -EAGAIN; 6437 6438 if (atomic_read(&cpu_buffer_b->record_disabled)) 6439 return -EAGAIN; 6440 6441 /* 6442 * We can't do a synchronize_rcu here because this 6443 * function can be called in atomic context. 6444 * Normally this will be called from the same CPU as cpu. 6445 * If not it's up to the caller to protect this. 
6446 */ 6447 atomic_inc(&cpu_buffer_a->record_disabled); 6448 atomic_inc(&cpu_buffer_b->record_disabled); 6449 6450 ret = -EBUSY; 6451 if (local_read(&cpu_buffer_a->committing)) 6452 goto out_dec; 6453 if (local_read(&cpu_buffer_b->committing)) 6454 goto out_dec; 6455 6456 /* 6457 * When resize is in progress, we cannot swap it because 6458 * it will mess the state of the cpu buffer. 6459 */ 6460 if (atomic_read(&buffer_a->resizing)) 6461 goto out_dec; 6462 if (atomic_read(&buffer_b->resizing)) 6463 goto out_dec; 6464 6465 buffer_a->buffers[cpu] = cpu_buffer_b; 6466 buffer_b->buffers[cpu] = cpu_buffer_a; 6467 6468 cpu_buffer_b->buffer = buffer_a; 6469 cpu_buffer_a->buffer = buffer_b; 6470 6471 ret = 0; 6472 6473 out_dec: 6474 atomic_dec(&cpu_buffer_a->record_disabled); 6475 atomic_dec(&cpu_buffer_b->record_disabled); 6476 return ret; 6477 } 6478 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6479 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6480 6481 /** 6482 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6483 * @buffer: the buffer to allocate for. 6484 * @cpu: the cpu buffer to allocate. 6485 * 6486 * This function is used in conjunction with ring_buffer_read_page. 6487 * When reading a full page from the ring buffer, these functions 6488 * can be used to speed up the process. The calling function should 6489 * allocate a few pages first with this function. Then when it 6490 * needs to get pages from the ring buffer, it passes the result 6491 * of this function into ring_buffer_read_page, which will swap 6492 * the page that was allocated, with the read page of the buffer. 6493 * 6494 * Returns: 6495 * The page allocated, or ERR_PTR 6496 */ 6497 struct buffer_data_read_page * 6498 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6499 { 6500 struct ring_buffer_per_cpu *cpu_buffer; 6501 struct buffer_data_read_page *bpage = NULL; 6502 unsigned long flags; 6503 6504 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6505 return ERR_PTR(-ENODEV); 6506 6507 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6508 if (!bpage) 6509 return ERR_PTR(-ENOMEM); 6510 6511 bpage->order = buffer->subbuf_order; 6512 cpu_buffer = buffer->buffers[cpu]; 6513 local_irq_save(flags); 6514 arch_spin_lock(&cpu_buffer->lock); 6515 6516 if (cpu_buffer->free_page) { 6517 bpage->data = cpu_buffer->free_page; 6518 cpu_buffer->free_page = NULL; 6519 } 6520 6521 arch_spin_unlock(&cpu_buffer->lock); 6522 local_irq_restore(flags); 6523 6524 if (bpage->data) { 6525 rb_init_page(bpage->data); 6526 } else { 6527 bpage->data = alloc_cpu_data(cpu, cpu_buffer->buffer->subbuf_order); 6528 if (!bpage->data) { 6529 kfree(bpage); 6530 return ERR_PTR(-ENOMEM); 6531 } 6532 } 6533 6534 return bpage; 6535 } 6536 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6537 6538 /** 6539 * ring_buffer_free_read_page - free an allocated read page 6540 * @buffer: the buffer the page was allocate for 6541 * @cpu: the cpu buffer the page came from 6542 * @data_page: the page to free 6543 * 6544 * Free a page allocated from ring_buffer_alloc_read_page. 
6545 */ 6546 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6547 struct buffer_data_read_page *data_page) 6548 { 6549 struct ring_buffer_per_cpu *cpu_buffer; 6550 struct buffer_data_page *bpage = data_page->data; 6551 struct page *page = virt_to_page(bpage); 6552 unsigned long flags; 6553 6554 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6555 return; 6556 6557 cpu_buffer = buffer->buffers[cpu]; 6558 6559 /* 6560 * If the page is still in use someplace else, or order of the page 6561 * is different from the subbuffer order of the buffer - 6562 * we can't reuse it 6563 */ 6564 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6565 goto out; 6566 6567 local_irq_save(flags); 6568 arch_spin_lock(&cpu_buffer->lock); 6569 6570 if (!cpu_buffer->free_page) { 6571 cpu_buffer->free_page = bpage; 6572 bpage = NULL; 6573 } 6574 6575 arch_spin_unlock(&cpu_buffer->lock); 6576 local_irq_restore(flags); 6577 6578 out: 6579 free_pages((unsigned long)bpage, data_page->order); 6580 kfree(data_page); 6581 } 6582 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6583 6584 /** 6585 * ring_buffer_read_page - extract a page from the ring buffer 6586 * @buffer: buffer to extract from 6587 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6588 * @len: amount to extract 6589 * @cpu: the cpu of the buffer to extract 6590 * @full: should the extraction only happen when the page is full. 6591 * 6592 * This function will pull out a page from the ring buffer and consume it. 6593 * @data_page must be the address of the variable that was returned 6594 * from ring_buffer_alloc_read_page. This is because the page might be used 6595 * to swap with a page in the ring buffer. 6596 * 6597 * for example: 6598 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6599 * if (IS_ERR(rpage)) 6600 * return PTR_ERR(rpage); 6601 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6602 * if (ret >= 0) 6603 * process_page(ring_buffer_read_page_data(rpage), ret); 6604 * ring_buffer_free_read_page(buffer, cpu, rpage); 6605 * 6606 * When @full is set, the function will not return true unless 6607 * the writer is off the reader page. 6608 * 6609 * Note: it is up to the calling functions to handle sleeps and wakeups. 6610 * The ring buffer can be used anywhere in the kernel and can not 6611 * blindly call wake_up. The layer that uses the ring buffer must be 6612 * responsible for that. 6613 * 6614 * Returns: 6615 * >=0 if data has been transferred, returns the offset of consumed data. 6616 * <0 if no data has been transferred. 6617 */ 6618 int ring_buffer_read_page(struct trace_buffer *buffer, 6619 struct buffer_data_read_page *data_page, 6620 size_t len, int cpu, int full) 6621 { 6622 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6623 struct ring_buffer_event *event; 6624 struct buffer_data_page *bpage; 6625 struct buffer_page *reader; 6626 unsigned long missed_events; 6627 unsigned int commit; 6628 unsigned int read; 6629 u64 save_timestamp; 6630 6631 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6632 return -1; 6633 6634 /* 6635 * If len is not big enough to hold the page header, then 6636 * we can not copy anything. 
6637 */ 6638 if (len <= BUF_PAGE_HDR_SIZE) 6639 return -1; 6640 6641 len -= BUF_PAGE_HDR_SIZE; 6642 6643 if (!data_page || !data_page->data) 6644 return -1; 6645 6646 if (data_page->order != buffer->subbuf_order) 6647 return -1; 6648 6649 bpage = data_page->data; 6650 if (!bpage) 6651 return -1; 6652 6653 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6654 6655 reader = rb_get_reader_page(cpu_buffer); 6656 if (!reader) 6657 return -1; 6658 6659 event = rb_reader_event(cpu_buffer); 6660 6661 read = reader->read; 6662 commit = rb_page_size(reader); 6663 6664 /* Check if any events were dropped */ 6665 missed_events = cpu_buffer->lost_events; 6666 6667 /* 6668 * If this page has been partially read or 6669 * if len is not big enough to read the rest of the page or 6670 * a writer is still on the page, then 6671 * we must copy the data from the page to the buffer. 6672 * Otherwise, we can simply swap the page with the one passed in. 6673 */ 6674 if (read || (len < (commit - read)) || 6675 cpu_buffer->reader_page == cpu_buffer->commit_page || 6676 cpu_buffer->mapped) { 6677 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6678 unsigned int rpos = read; 6679 unsigned int pos = 0; 6680 unsigned int size; 6681 6682 /* 6683 * If a full page is expected, this can still be returned 6684 * if there's been a previous partial read and the 6685 * rest of the page can be read and the commit page is off 6686 * the reader page. 6687 */ 6688 if (full && 6689 (!read || (len < (commit - read)) || 6690 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6691 return -1; 6692 6693 if (len > (commit - read)) 6694 len = (commit - read); 6695 6696 /* Always keep the time extend and data together */ 6697 size = rb_event_ts_length(event); 6698 6699 if (len < size) 6700 return -1; 6701 6702 /* save the current timestamp, since the user will need it */ 6703 save_timestamp = cpu_buffer->read_stamp; 6704 6705 /* Need to copy one event at a time */ 6706 do { 6707 /* We need the size of one event, because 6708 * rb_advance_reader only advances by one event, 6709 * whereas rb_event_ts_length may include the size of 6710 * one or two events. 6711 * We have already ensured there's enough space if this 6712 * is a time extend. */ 6713 size = rb_event_length(event); 6714 memcpy(bpage->data + pos, rpage->data + rpos, size); 6715 6716 len -= size; 6717 6718 rb_advance_reader(cpu_buffer); 6719 rpos = reader->read; 6720 pos += size; 6721 6722 if (rpos >= commit) 6723 break; 6724 6725 event = rb_reader_event(cpu_buffer); 6726 /* Always keep the time extend and data together */ 6727 size = rb_event_ts_length(event); 6728 } while (len >= size); 6729 6730 /* update bpage */ 6731 local_set(&bpage->commit, pos); 6732 bpage->time_stamp = save_timestamp; 6733 6734 /* we copied everything to the beginning */ 6735 read = 0; 6736 } else { 6737 /* update the entry counter */ 6738 cpu_buffer->read += rb_page_entries(reader); 6739 cpu_buffer->read_bytes += rb_page_size(reader); 6740 6741 /* swap the pages */ 6742 rb_init_page(bpage); 6743 bpage = reader->page; 6744 reader->page = data_page->data; 6745 local_set(&reader->write, 0); 6746 local_set(&reader->entries, 0); 6747 reader->read = 0; 6748 data_page->data = bpage; 6749 6750 /* 6751 * Use the real_end for the data size, 6752 * This gives us a chance to store the lost events 6753 * on the page. 
6754 */ 6755 if (reader->real_end) 6756 local_set(&bpage->commit, reader->real_end); 6757 } 6758 6759 cpu_buffer->lost_events = 0; 6760 6761 commit = local_read(&bpage->commit); 6762 /* 6763 * Set a flag in the commit field if we lost events 6764 */ 6765 if (missed_events) { 6766 /* If there is room at the end of the page to save the 6767 * missed events, then record it there. 6768 */ 6769 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6770 memcpy(&bpage->data[commit], &missed_events, 6771 sizeof(missed_events)); 6772 local_add(RB_MISSED_STORED, &bpage->commit); 6773 commit += sizeof(missed_events); 6774 } 6775 local_add(RB_MISSED_EVENTS, &bpage->commit); 6776 } 6777 6778 /* 6779 * This page may be off to user land. Zero it out here. 6780 */ 6781 if (commit < buffer->subbuf_size) 6782 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6783 6784 return read; 6785 } 6786 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6787 6788 /** 6789 * ring_buffer_read_page_data - get pointer to the data in the page. 6790 * @page: the page to get the data from 6791 * 6792 * Returns pointer to the actual data in this page. 6793 */ 6794 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6795 { 6796 return page->data; 6797 } 6798 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6799 6800 /** 6801 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6802 * @buffer: the buffer to get the sub buffer size from 6803 * 6804 * Returns size of the sub buffer, in bytes. 6805 */ 6806 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 6807 { 6808 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6809 } 6810 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 6811 6812 /** 6813 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 6814 * @buffer: The ring_buffer to get the system sub page order from 6815 * 6816 * By default, one ring buffer sub page equals to one system page. This parameter 6817 * is configurable, per ring buffer. The size of the ring buffer sub page can be 6818 * extended, but must be an order of system page size. 6819 * 6820 * Returns the order of buffer sub page size, in system pages: 6821 * 0 means the sub buffer size is 1 system page and so forth. 6822 * In case of an error < 0 is returned. 6823 */ 6824 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 6825 { 6826 if (!buffer) 6827 return -EINVAL; 6828 6829 return buffer->subbuf_order; 6830 } 6831 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 6832 6833 /** 6834 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 6835 * @buffer: The ring_buffer to set the new page size. 6836 * @order: Order of the system pages in one sub buffer page 6837 * 6838 * By default, one ring buffer pages equals to one system page. This API can be 6839 * used to set new size of the ring buffer page. The size must be order of 6840 * system page size, that's why the input parameter @order is the order of 6841 * system pages that are allocated for one ring buffer page: 6842 * 0 - 1 system page 6843 * 1 - 2 system pages 6844 * 3 - 4 system pages 6845 * ... 6846 * 6847 * Returns 0 on success or < 0 in case of an error. 
6848 */ 6849 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 6850 { 6851 struct ring_buffer_per_cpu *cpu_buffer; 6852 struct buffer_page *bpage, *tmp; 6853 int old_order, old_size; 6854 int nr_pages; 6855 int psize; 6856 int err; 6857 int cpu; 6858 6859 if (!buffer || order < 0) 6860 return -EINVAL; 6861 6862 if (buffer->subbuf_order == order) 6863 return 0; 6864 6865 psize = (1 << order) * PAGE_SIZE; 6866 if (psize <= BUF_PAGE_HDR_SIZE) 6867 return -EINVAL; 6868 6869 /* Size of a subbuf cannot be greater than the write counter */ 6870 if (psize > RB_WRITE_MASK + 1) 6871 return -EINVAL; 6872 6873 old_order = buffer->subbuf_order; 6874 old_size = buffer->subbuf_size; 6875 6876 /* prevent another thread from changing buffer sizes */ 6877 guard(mutex)(&buffer->mutex); 6878 atomic_inc(&buffer->record_disabled); 6879 6880 /* Make sure all commits have finished */ 6881 synchronize_rcu(); 6882 6883 buffer->subbuf_order = order; 6884 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 6885 6886 /* Make sure all new buffers are allocated, before deleting the old ones */ 6887 for_each_buffer_cpu(buffer, cpu) { 6888 6889 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6890 continue; 6891 6892 cpu_buffer = buffer->buffers[cpu]; 6893 6894 if (cpu_buffer->mapped) { 6895 err = -EBUSY; 6896 goto error; 6897 } 6898 6899 /* Update the number of pages to match the new size */ 6900 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6901 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6902 6903 /* we need a minimum of two pages */ 6904 if (nr_pages < 2) 6905 nr_pages = 2; 6906 6907 cpu_buffer->nr_pages_to_update = nr_pages; 6908 6909 /* Include the reader page */ 6910 nr_pages++; 6911 6912 /* Allocate the new size buffer */ 6913 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6914 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6915 &cpu_buffer->new_pages)) { 6916 /* not enough memory for new pages */ 6917 err = -ENOMEM; 6918 goto error; 6919 } 6920 } 6921 6922 for_each_buffer_cpu(buffer, cpu) { 6923 struct buffer_data_page *old_free_data_page; 6924 struct list_head old_pages; 6925 unsigned long flags; 6926 6927 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6928 continue; 6929 6930 cpu_buffer = buffer->buffers[cpu]; 6931 6932 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6933 6934 /* Clear the head bit to make the link list normal to read */ 6935 rb_head_page_deactivate(cpu_buffer); 6936 6937 /* 6938 * Collect buffers from the cpu_buffer pages list and the 6939 * reader_page on old_pages, so they can be freed later when not 6940 * under a spinlock. The pages list is a linked list with no 6941 * head, adding old_pages turns it into a regular list with 6942 * old_pages being the head. 
6943 */ 6944 list_add(&old_pages, cpu_buffer->pages); 6945 list_add(&cpu_buffer->reader_page->list, &old_pages); 6946 6947 /* One page was allocated for the reader page */ 6948 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6949 struct buffer_page, list); 6950 list_del_init(&cpu_buffer->reader_page->list); 6951 6952 /* Install the new pages, remove the head from the list */ 6953 cpu_buffer->pages = cpu_buffer->new_pages.next; 6954 list_del_init(&cpu_buffer->new_pages); 6955 cpu_buffer->cnt++; 6956 6957 cpu_buffer->head_page 6958 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6959 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6960 6961 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6962 cpu_buffer->nr_pages_to_update = 0; 6963 6964 old_free_data_page = cpu_buffer->free_page; 6965 cpu_buffer->free_page = NULL; 6966 6967 rb_head_page_activate(cpu_buffer); 6968 6969 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6970 6971 /* Free old sub buffers */ 6972 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6973 list_del_init(&bpage->list); 6974 free_buffer_page(bpage); 6975 } 6976 free_pages((unsigned long)old_free_data_page, old_order); 6977 6978 rb_check_pages(cpu_buffer); 6979 } 6980 6981 atomic_dec(&buffer->record_disabled); 6982 6983 return 0; 6984 6985 error: 6986 buffer->subbuf_order = old_order; 6987 buffer->subbuf_size = old_size; 6988 6989 atomic_dec(&buffer->record_disabled); 6990 6991 for_each_buffer_cpu(buffer, cpu) { 6992 cpu_buffer = buffer->buffers[cpu]; 6993 6994 if (!cpu_buffer->nr_pages_to_update) 6995 continue; 6996 6997 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6998 list_del_init(&bpage->list); 6999 free_buffer_page(bpage); 7000 } 7001 } 7002 7003 return err; 7004 } 7005 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 7006 7007 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7008 { 7009 struct page *page; 7010 7011 if (cpu_buffer->meta_page) 7012 return 0; 7013 7014 page = alloc_page(GFP_USER | __GFP_ZERO); 7015 if (!page) 7016 return -ENOMEM; 7017 7018 cpu_buffer->meta_page = page_to_virt(page); 7019 7020 return 0; 7021 } 7022 7023 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 7024 { 7025 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 7026 7027 free_page(addr); 7028 cpu_buffer->meta_page = NULL; 7029 } 7030 7031 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 7032 unsigned long *subbuf_ids) 7033 { 7034 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 7035 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 7036 struct buffer_page *first_subbuf, *subbuf; 7037 int cnt = 0; 7038 int id = 0; 7039 7040 id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id); 7041 subbuf_ids[id++] = (unsigned long)cpu_buffer->reader_page->page; 7042 cnt++; 7043 7044 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 7045 do { 7046 id = rb_page_id(cpu_buffer, subbuf, id); 7047 7048 if (WARN_ON(id >= nr_subbufs)) 7049 break; 7050 7051 subbuf_ids[id] = (unsigned long)subbuf->page; 7052 7053 rb_inc_page(&subbuf); 7054 id++; 7055 cnt++; 7056 } while (subbuf != first_subbuf); 7057 7058 WARN_ON(cnt != nr_subbufs); 7059 7060 /* install subbuf ID to kern VA translation */ 7061 cpu_buffer->subbuf_ids = subbuf_ids; 7062 7063 meta->meta_struct_len = sizeof(*meta); 7064 meta->nr_subbufs = nr_subbufs; 7065 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7066 meta->meta_page_size = 
meta->subbuf_size; 7067 7068 rb_update_meta_page(cpu_buffer); 7069 } 7070 7071 static struct ring_buffer_per_cpu * 7072 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 7073 { 7074 struct ring_buffer_per_cpu *cpu_buffer; 7075 7076 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7077 return ERR_PTR(-EINVAL); 7078 7079 cpu_buffer = buffer->buffers[cpu]; 7080 7081 mutex_lock(&cpu_buffer->mapping_lock); 7082 7083 if (!cpu_buffer->user_mapped) { 7084 mutex_unlock(&cpu_buffer->mapping_lock); 7085 return ERR_PTR(-ENODEV); 7086 } 7087 7088 return cpu_buffer; 7089 } 7090 7091 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7092 { 7093 mutex_unlock(&cpu_buffer->mapping_lock); 7094 } 7095 7096 /* 7097 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7098 * to be set-up or torn-down. 7099 */ 7100 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7101 bool inc) 7102 { 7103 unsigned long flags; 7104 7105 lockdep_assert_held(&cpu_buffer->mapping_lock); 7106 7107 /* mapped is always greater or equal to user_mapped */ 7108 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7109 return -EINVAL; 7110 7111 if (inc && cpu_buffer->mapped == UINT_MAX) 7112 return -EBUSY; 7113 7114 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7115 return -EINVAL; 7116 7117 mutex_lock(&cpu_buffer->buffer->mutex); 7118 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7119 7120 if (inc) { 7121 cpu_buffer->user_mapped++; 7122 cpu_buffer->mapped++; 7123 } else { 7124 cpu_buffer->user_mapped--; 7125 cpu_buffer->mapped--; 7126 } 7127 7128 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7129 mutex_unlock(&cpu_buffer->buffer->mutex); 7130 7131 return 0; 7132 } 7133 7134 /* 7135 * +--------------+ pgoff == 0 7136 * | meta page | 7137 * +--------------+ pgoff == 1 7138 * | subbuffer 0 | 7139 * | | 7140 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7141 * | subbuffer 1 | 7142 * | | 7143 * ... 7144 */ 7145 #ifdef CONFIG_MMU 7146 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7147 struct vm_area_struct *vma) 7148 { 7149 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7150 unsigned int subbuf_pages, subbuf_order; 7151 struct page **pages __free(kfree) = NULL; 7152 int p = 0, s = 0; 7153 int err; 7154 7155 /* Refuse MP_PRIVATE or writable mappings */ 7156 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7157 !(vma->vm_flags & VM_MAYSHARE)) 7158 return -EPERM; 7159 7160 subbuf_order = cpu_buffer->buffer->subbuf_order; 7161 subbuf_pages = 1 << subbuf_order; 7162 7163 if (subbuf_order && pgoff % subbuf_pages) 7164 return -EINVAL; 7165 7166 /* 7167 * Make sure the mapping cannot become writable later. Also tell the VM 7168 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 
7169 */ 7170 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7171 VM_MAYWRITE); 7172 7173 lockdep_assert_held(&cpu_buffer->mapping_lock); 7174 7175 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7176 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7177 if (nr_pages <= pgoff) 7178 return -EINVAL; 7179 7180 nr_pages -= pgoff; 7181 7182 nr_vma_pages = vma_pages(vma); 7183 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7184 return -EINVAL; 7185 7186 nr_pages = nr_vma_pages; 7187 7188 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7189 if (!pages) 7190 return -ENOMEM; 7191 7192 if (!pgoff) { 7193 unsigned long meta_page_padding; 7194 7195 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7196 7197 /* 7198 * Pad with the zero-page to align the meta-page with the 7199 * sub-buffers. 7200 */ 7201 meta_page_padding = subbuf_pages - 1; 7202 while (meta_page_padding-- && p < nr_pages) { 7203 unsigned long __maybe_unused zero_addr = 7204 vma->vm_start + (PAGE_SIZE * p); 7205 7206 pages[p++] = ZERO_PAGE(zero_addr); 7207 } 7208 } else { 7209 /* Skip the meta-page */ 7210 pgoff -= subbuf_pages; 7211 7212 s += pgoff / subbuf_pages; 7213 } 7214 7215 while (p < nr_pages) { 7216 struct page *page; 7217 int off = 0; 7218 7219 if (WARN_ON_ONCE(s >= nr_subbufs)) 7220 return -EINVAL; 7221 7222 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7223 7224 for (; off < (1 << (subbuf_order)); off++, page++) { 7225 if (p >= nr_pages) 7226 break; 7227 7228 pages[p++] = page; 7229 } 7230 s++; 7231 } 7232 7233 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7234 7235 return err; 7236 } 7237 #else 7238 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7239 struct vm_area_struct *vma) 7240 { 7241 return -EOPNOTSUPP; 7242 } 7243 #endif 7244 7245 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7246 struct vm_area_struct *vma) 7247 { 7248 struct ring_buffer_per_cpu *cpu_buffer; 7249 unsigned long flags, *subbuf_ids; 7250 int err; 7251 7252 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7253 return -EINVAL; 7254 7255 cpu_buffer = buffer->buffers[cpu]; 7256 7257 guard(mutex)(&cpu_buffer->mapping_lock); 7258 7259 if (cpu_buffer->user_mapped) { 7260 err = __rb_map_vma(cpu_buffer, vma); 7261 if (!err) 7262 err = __rb_inc_dec_mapped(cpu_buffer, true); 7263 return err; 7264 } 7265 7266 /* prevent another thread from changing buffer/sub-buffer sizes */ 7267 guard(mutex)(&buffer->mutex); 7268 7269 err = rb_alloc_meta_page(cpu_buffer); 7270 if (err) 7271 return err; 7272 7273 /* subbuf_ids include the reader while nr_pages does not */ 7274 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7275 if (!subbuf_ids) { 7276 rb_free_meta_page(cpu_buffer); 7277 return -ENOMEM; 7278 } 7279 7280 atomic_inc(&cpu_buffer->resize_disabled); 7281 7282 /* 7283 * Lock all readers to block any subbuf swap until the subbuf IDs are 7284 * assigned. 
7285 */ 7286 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7287 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 7288 7289 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7290 7291 err = __rb_map_vma(cpu_buffer, vma); 7292 if (!err) { 7293 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7294 /* This is the first time it is mapped by user */ 7295 cpu_buffer->mapped++; 7296 cpu_buffer->user_mapped = 1; 7297 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7298 } else { 7299 kfree(cpu_buffer->subbuf_ids); 7300 cpu_buffer->subbuf_ids = NULL; 7301 rb_free_meta_page(cpu_buffer); 7302 atomic_dec(&cpu_buffer->resize_disabled); 7303 } 7304 7305 return err; 7306 } 7307 7308 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 7309 { 7310 struct ring_buffer_per_cpu *cpu_buffer; 7311 unsigned long flags; 7312 7313 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7314 return -EINVAL; 7315 7316 cpu_buffer = buffer->buffers[cpu]; 7317 7318 guard(mutex)(&cpu_buffer->mapping_lock); 7319 7320 if (!cpu_buffer->user_mapped) { 7321 return -ENODEV; 7322 } else if (cpu_buffer->user_mapped > 1) { 7323 __rb_inc_dec_mapped(cpu_buffer, false); 7324 return 0; 7325 } 7326 7327 guard(mutex)(&buffer->mutex); 7328 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7329 7330 /* This is the last user space mapping */ 7331 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7332 cpu_buffer->mapped--; 7333 cpu_buffer->user_mapped = 0; 7334 7335 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7336 7337 kfree(cpu_buffer->subbuf_ids); 7338 cpu_buffer->subbuf_ids = NULL; 7339 rb_free_meta_page(cpu_buffer); 7340 atomic_dec(&cpu_buffer->resize_disabled); 7341 7342 return 0; 7343 } 7344 7345 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 7346 { 7347 struct ring_buffer_per_cpu *cpu_buffer; 7348 struct buffer_page *reader; 7349 unsigned long missed_events; 7350 unsigned long reader_size; 7351 unsigned long flags; 7352 7353 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 7354 if (IS_ERR(cpu_buffer)) 7355 return (int)PTR_ERR(cpu_buffer); 7356 7357 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7358 7359 consume: 7360 if (rb_per_cpu_empty(cpu_buffer)) 7361 goto out; 7362 7363 reader_size = rb_page_size(cpu_buffer->reader_page); 7364 7365 /* 7366 * There are data to be read on the current reader page, we can 7367 * return to the caller. But before that, we assume the latter will read 7368 * everything. Let's update the kernel reader accordingly. 7369 */ 7370 if (cpu_buffer->reader_page->read < reader_size) { 7371 while (cpu_buffer->reader_page->read < reader_size) 7372 rb_advance_reader(cpu_buffer); 7373 goto out; 7374 } 7375 7376 /* Did the reader catch up with the writer? */ 7377 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 7378 goto out; 7379 7380 reader = rb_get_reader_page(cpu_buffer); 7381 if (WARN_ON(!reader)) 7382 goto out; 7383 7384 /* Check if any events were dropped */ 7385 missed_events = cpu_buffer->lost_events; 7386 7387 if (missed_events) { 7388 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 7389 struct buffer_data_page *bpage = reader->page; 7390 unsigned int commit; 7391 /* 7392 * Use the real_end for the data size, 7393 * This gives us a chance to store the lost events 7394 * on the page. 7395 */ 7396 if (reader->real_end) 7397 local_set(&bpage->commit, reader->real_end); 7398 /* 7399 * If there is room at the end of the page to save the 7400 * missed events, then record it there. 
7401 */ 7402 commit = rb_page_size(reader); 7403 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7404 memcpy(&bpage->data[commit], &missed_events, 7405 sizeof(missed_events)); 7406 local_add(RB_MISSED_STORED, &bpage->commit); 7407 } 7408 local_add(RB_MISSED_EVENTS, &bpage->commit); 7409 } else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page, 7410 "Reader on commit with %ld missed events", 7411 missed_events)) { 7412 /* 7413 * There shouldn't be any missed events if the tail_page 7414 * is on the reader page. But if the tail page is not on the 7415 * reader page and the commit_page is, that would mean that 7416 * there's a commit_overrun (an interrupt preempted an 7417 * addition of an event and then filled the buffer 7418 * with new events). In this case it's not an 7419 * error, but it should still be reported. 7420 * 7421 * TODO: Add missed events to the page for user space to know. 7422 */ 7423 pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n", 7424 cpu, missed_events, cpu_buffer->reader_page->page->time_stamp); 7425 } 7426 } 7427 7428 cpu_buffer->lost_events = 0; 7429 7430 goto consume; 7431 7432 out: 7433 /* Some archs do not have data cache coherency between kernel and user-space */ 7434 flush_kernel_vmap_range(cpu_buffer->reader_page->page, 7435 buffer->subbuf_size + BUF_PAGE_HDR_SIZE); 7436 7437 rb_update_meta_page(cpu_buffer); 7438 7439 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7440 rb_put_mapped_buffer(cpu_buffer); 7441 7442 return 0; 7443 } 7444 7445 /* 7446 * We only allocate new buffers, never free them if the CPU goes down. 7447 * If we were to free the buffer, then the user would lose any trace that was in 7448 * the buffer. 7449 */ 7450 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7451 { 7452 struct trace_buffer *buffer; 7453 long nr_pages_same; 7454 int cpu_i; 7455 unsigned long nr_pages; 7456 7457 buffer = container_of(node, struct trace_buffer, node); 7458 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7459 return 0; 7460 7461 nr_pages = 0; 7462 nr_pages_same = 1; 7463 /* check if all cpu sizes are same */ 7464 for_each_buffer_cpu(buffer, cpu_i) { 7465 /* fill in the size from first enabled cpu */ 7466 if (nr_pages == 0) 7467 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7468 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7469 nr_pages_same = 0; 7470 break; 7471 } 7472 } 7473 /* allocate minimum pages, user can later expand it */ 7474 if (!nr_pages_same) 7475 nr_pages = 2; 7476 buffer->buffers[cpu] = 7477 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7478 if (!buffer->buffers[cpu]) { 7479 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7480 cpu); 7481 return -ENOMEM; 7482 } 7483 smp_wmb(); 7484 cpumask_set_cpu(cpu, buffer->cpumask); 7485 return 0; 7486 } 7487 7488 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7489 /* 7490 * This is a basic integrity check of the ring buffer. 7491 * Late in the boot cycle this test will run when configured in. 7492 * It will kick off a thread per CPU that will go into a loop 7493 * writing to the per cpu ring buffer various sizes of data. 7494 * Some of the data will be large items, some small. 7495 * 7496 * Another thread is created that goes into a spin, sending out 7497 * IPIs to the other CPUs to also write into the ring buffer. 7498 * this is to test the nesting ability of the buffer. 7499 * 7500 * Basic stats are recorded and reported. 
If something in the 7501 * ring buffer should happen that's not expected, a big warning 7502 * is displayed and all ring buffers are disabled. 7503 */ 7504 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7505 7506 struct rb_test_data { 7507 struct trace_buffer *buffer; 7508 unsigned long events; 7509 unsigned long bytes_written; 7510 unsigned long bytes_alloc; 7511 unsigned long bytes_dropped; 7512 unsigned long events_nested; 7513 unsigned long bytes_written_nested; 7514 unsigned long bytes_alloc_nested; 7515 unsigned long bytes_dropped_nested; 7516 int min_size_nested; 7517 int max_size_nested; 7518 int max_size; 7519 int min_size; 7520 int cpu; 7521 int cnt; 7522 }; 7523 7524 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7525 7526 /* 1 meg per cpu */ 7527 #define RB_TEST_BUFFER_SIZE 1048576 7528 7529 static char rb_string[] __initdata = 7530 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7531 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7532 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7533 7534 static bool rb_test_started __initdata; 7535 7536 struct rb_item { 7537 int size; 7538 char str[]; 7539 }; 7540 7541 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7542 { 7543 struct ring_buffer_event *event; 7544 struct rb_item *item; 7545 bool started; 7546 int event_len; 7547 int size; 7548 int len; 7549 int cnt; 7550 7551 /* Have nested writes different than what is written */ 7552 cnt = data->cnt + (nested ? 27 : 0); 7553 7554 /* Multiply cnt by ~e, to make some unique increment */ 7555 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7556 7557 len = size + sizeof(struct rb_item); 7558 7559 started = rb_test_started; 7560 /* read rb_test_started before checking buffer enabled */ 7561 smp_rmb(); 7562 7563 event = ring_buffer_lock_reserve(data->buffer, len); 7564 if (!event) { 7565 /* Ignore dropped events before test starts.
static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Sleep for a minimum of 100-300us (depending on cnt) and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but on non-preempt kernels, let others run */
		schedule();
	}

	return 0;
}
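
/*
 * Minimal sketch of the rb_test_started handshake used below (for
 * illustration only; the test itself does this inline): the control
 * thread enables the buffer *before* publishing the "started" flag, and
 * a writer samples the flag *before* its reserve attempt. Any writer that
 * saw the flag set is then guaranteed the buffer was already enabled, so
 * a failed reserve really counts as a dropped event. The function names
 * and the flag pointer here are made up for the example.
 */
static void __init __maybe_unused rb_example_publish(struct trace_buffer *buf, bool *flag)
{
	ring_buffer_record_on(buf);	/* enable writes first */
	smp_wmb();			/* pairs with smp_rmb() in the observer */
	*flag = true;			/* now writers may account their drops */
}

static bool __init __maybe_unused rb_example_observe(bool *flag)
{
	bool started = *flag;

	/* read the flag before checking whether the buffer accepted the write */
	smp_rmb();			/* pairs with smp_wmb() in the publisher */
	return started;
}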
static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show the buffer is enabled before setting rb_test_started.
	 * Yes, there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events. We care about events dropped after
	 * the threads see that the buffer is active.
	 */
	smp_wmb();
	rb_test_started = true;

	set_current_state(TASK_INTERRUPTIBLE);
	/* Just run for 10 seconds */
	schedule_timeout(10 * HZ);

	kthread_stop(rb_hammer);

 out_free:
	for_each_online_cpu(cpu) {
		if (!rb_threads[cpu])
			break;
		kthread_stop(rb_threads[cpu]);
	}
	if (ret) {
		ring_buffer_free(buffer);
		return ret;
	}

	/* Report! */
	pr_info("finished\n");
	for_each_online_cpu(cpu) {
		struct ring_buffer_event *event;
		struct rb_test_data *data = &rb_data[cpu];
		struct rb_item *item;
		unsigned long total_events;
		unsigned long total_dropped;
		unsigned long total_written;
		unsigned long total_alloc;
		unsigned long total_read = 0;
		unsigned long total_size = 0;
		unsigned long total_len = 0;
		unsigned long total_lost = 0;
		unsigned long lost;
		int big_event_size;
		int small_event_size;

		ret = -1;

		total_events = data->events + data->events_nested;
		total_written = data->bytes_written + data->bytes_written_nested;
		total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
		total_dropped = data->bytes_dropped + data->bytes_dropped_nested;

		big_event_size = data->max_size + data->max_size_nested;
		small_event_size = data->min_size + data->min_size_nested;

		pr_info("CPU %d:\n", cpu);
		pr_info("              events:    %ld\n", total_events);
		pr_info("       dropped bytes:    %ld\n", total_dropped);
		pr_info("       alloced bytes:    %ld\n", total_alloc);
		pr_info("       written bytes:    %ld\n", total_written);
		pr_info("       biggest event:    %d\n", big_event_size);
		pr_info("      smallest event:    %d\n", small_event_size);

		if (RB_WARN_ON(buffer, total_dropped))
			break;

		ret = 0;

		while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
			total_lost += lost;
			item = ring_buffer_event_data(event);
			total_len += ring_buffer_event_length(event);
			total_size += item->size + sizeof(struct rb_item);
			if (memcmp(&item->str[0], rb_string, item->size) != 0) {
				pr_info("FAILED!\n");
				pr_info("buffer had: %.*s\n", item->size, item->str);
				pr_info("expected:   %.*s\n", item->size, rb_string);
				RB_WARN_ON(buffer, 1);
				ret = -1;
				break;
			}
			total_read++;
		}
		if (ret)
			break;

		ret = -1;

		pr_info("         read events:   %ld\n", total_read);
		pr_info("         lost events:   %ld\n", total_lost);
		pr_info("        total events:   %ld\n", total_lost + total_read);
		pr_info("  recorded len bytes:   %ld\n", total_len);
		pr_info(" recorded size bytes:   %ld\n", total_size);
		if (total_lost) {
			pr_info(" With dropped events, record len and size may not match\n"
				" alloced and written from above\n");
		} else {
			if (RB_WARN_ON(buffer, total_len != total_alloc ||
				       total_size != total_written))
				break;
		}
		if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
			break;

		ret = 0;
	}
	if (!ret)
		pr_info("Ring buffer PASSED!\n");

	ring_buffer_free(buffer);
	return 0;
}

late_initcall(test_ringbuffer);
#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */