1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 6 */ 7 #include <linux/trace_recursion.h> 8 #include <linux/trace_events.h> 9 #include <linux/ring_buffer.h> 10 #include <linux/trace_clock.h> 11 #include <linux/sched/clock.h> 12 #include <linux/cacheflush.h> 13 #include <linux/trace_seq.h> 14 #include <linux/spinlock.h> 15 #include <linux/irq_work.h> 16 #include <linux/security.h> 17 #include <linux/uaccess.h> 18 #include <linux/hardirq.h> 19 #include <linux/kthread.h> /* for self test */ 20 #include <linux/module.h> 21 #include <linux/percpu.h> 22 #include <linux/mutex.h> 23 #include <linux/delay.h> 24 #include <linux/slab.h> 25 #include <linux/init.h> 26 #include <linux/hash.h> 27 #include <linux/list.h> 28 #include <linux/cpu.h> 29 #include <linux/oom.h> 30 #include <linux/mm.h> 31 32 #include <asm/local64.h> 33 #include <asm/local.h> 34 #include <asm/setup.h> 35 36 #include "trace.h" 37 38 /* 39 * The "absolute" timestamp in the buffer is only 59 bits. 40 * If a clock has the 5 MSBs set, it needs to be saved and 41 * reinserted. 42 */ 43 #define TS_MSB (0xf8ULL << 56) 44 #define ABS_TS_MASK (~TS_MSB) 45 46 static void update_pages_handler(struct work_struct *work); 47 48 #define RING_BUFFER_META_MAGIC 0xBADFEED 49 50 struct ring_buffer_meta { 51 int magic; 52 int struct_sizes; 53 unsigned long total_size; 54 unsigned long buffers_offset; 55 }; 56 57 struct ring_buffer_cpu_meta { 58 unsigned long first_buffer; 59 unsigned long head_buffer; 60 unsigned long commit_buffer; 61 __u32 subbuf_size; 62 __u32 nr_subbufs; 63 int buffers[]; 64 }; 65 66 /* 67 * The ring buffer header is special. We must manually up keep it. 68 */ 69 int ring_buffer_print_entry_header(struct trace_seq *s) 70 { 71 trace_seq_puts(s, "# compressed entry header\n"); 72 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 73 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 74 trace_seq_puts(s, "\tarray : 32 bits\n"); 75 trace_seq_putc(s, '\n'); 76 trace_seq_printf(s, "\tpadding : type == %d\n", 77 RINGBUF_TYPE_PADDING); 78 trace_seq_printf(s, "\ttime_extend : type == %d\n", 79 RINGBUF_TYPE_TIME_EXTEND); 80 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 81 RINGBUF_TYPE_TIME_STAMP); 82 trace_seq_printf(s, "\tdata max type_len == %d\n", 83 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 84 85 return !trace_seq_has_overflowed(s); 86 } 87 88 /* 89 * The ring buffer is made up of a list of pages. A separate list of pages is 90 * allocated for each CPU. A writer may only write to a buffer that is 91 * associated with the CPU it is currently executing on. A reader may read 92 * from any per cpu buffer. 93 * 94 * The reader is special. For each per cpu buffer, the reader has its own 95 * reader page. When a reader has read the entire reader page, this reader 96 * page is swapped with another page in the ring buffer. 97 * 98 * Now, as long as the writer is off the reader page, the reader can do what 99 * ever it wants with that page. The writer will never write to that page 100 * again (as long as it is out of the ring buffer). 101 * 102 * Here's some silly ASCII art. 
103 * 104 * +------+ 105 * |reader| RING BUFFER 106 * |page | 107 * +------+ +---+ +---+ +---+ 108 * | |-->| |-->| | 109 * +---+ +---+ +---+ 110 * ^ | 111 * | | 112 * +---------------+ 113 * 114 * 115 * +------+ 116 * |reader| RING BUFFER 117 * |page |------------------v 118 * +------+ +---+ +---+ +---+ 119 * | |-->| |-->| | 120 * +---+ +---+ +---+ 121 * ^ | 122 * | | 123 * +---------------+ 124 * 125 * 126 * +------+ 127 * |reader| RING BUFFER 128 * |page |------------------v 129 * +------+ +---+ +---+ +---+ 130 * ^ | |-->| |-->| | 131 * | +---+ +---+ +---+ 132 * | | 133 * | | 134 * +------------------------------+ 135 * 136 * 137 * +------+ 138 * |buffer| RING BUFFER 139 * |page |------------------v 140 * +------+ +---+ +---+ +---+ 141 * ^ | | | |-->| | 142 * | New +---+ +---+ +---+ 143 * | Reader------^ | 144 * | page | 145 * +------------------------------+ 146 * 147 * 148 * After we make this swap, the reader can hand this page off to the splice 149 * code and be done with it. It can even allocate a new page if it needs to 150 * and swap that into the ring buffer. 151 * 152 * We will be using cmpxchg soon to make all this lockless. 153 * 154 */ 155 156 /* Used for individual buffers (after the counter) */ 157 #define RB_BUFFER_OFF (1 << 20) 158 159 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 160 161 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 162 #define RB_ALIGNMENT 4U 163 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 164 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 165 166 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 167 # define RB_FORCE_8BYTE_ALIGNMENT 0 168 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 169 #else 170 # define RB_FORCE_8BYTE_ALIGNMENT 1 171 # define RB_ARCH_ALIGNMENT 8U 172 #endif 173 174 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 175 176 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 177 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 178 179 enum { 180 RB_LEN_TIME_EXTEND = 8, 181 RB_LEN_TIME_STAMP = 8, 182 }; 183 184 #define skip_time_extend(event) \ 185 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 186 187 #define extended_time(event) \ 188 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 189 190 static inline bool rb_null_event(struct ring_buffer_event *event) 191 { 192 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 193 } 194 195 static void rb_event_set_padding(struct ring_buffer_event *event) 196 { 197 /* padding has a NULL time_delta */ 198 event->type_len = RINGBUF_TYPE_PADDING; 199 event->time_delta = 0; 200 } 201 202 static unsigned 203 rb_event_data_length(struct ring_buffer_event *event) 204 { 205 unsigned length; 206 207 if (event->type_len) 208 length = event->type_len * RB_ALIGNMENT; 209 else 210 length = event->array[0]; 211 return length + RB_EVNT_HDR_SIZE; 212 } 213 214 /* 215 * Return the length of the given event. Will return 216 * the length of the time extend if the event is a 217 * time extend. 
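 *
 * Purely illustrative arithmetic (ignoring RB_FORCE_8BYTE_ALIGNMENT): a data
 * event with a 12-byte payload is stored with type_len == 3, so its total
 * length is 3 * RB_ALIGNMENT + RB_EVNT_HDR_SIZE = 16 bytes.  A payload larger
 * than RB_MAX_SMALL_DATA (112 bytes) is stored with type_len == 0, the length
 * kept in array[0] and the payload starting at array[1].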
218 */ 219 static inline unsigned 220 rb_event_length(struct ring_buffer_event *event) 221 { 222 switch (event->type_len) { 223 case RINGBUF_TYPE_PADDING: 224 if (rb_null_event(event)) 225 /* undefined */ 226 return -1; 227 return event->array[0] + RB_EVNT_HDR_SIZE; 228 229 case RINGBUF_TYPE_TIME_EXTEND: 230 return RB_LEN_TIME_EXTEND; 231 232 case RINGBUF_TYPE_TIME_STAMP: 233 return RB_LEN_TIME_STAMP; 234 235 case RINGBUF_TYPE_DATA: 236 return rb_event_data_length(event); 237 default: 238 WARN_ON_ONCE(1); 239 } 240 /* not hit */ 241 return 0; 242 } 243 244 /* 245 * Return total length of time extend and data, 246 * or just the event length for all other events. 247 */ 248 static inline unsigned 249 rb_event_ts_length(struct ring_buffer_event *event) 250 { 251 unsigned len = 0; 252 253 if (extended_time(event)) { 254 /* time extends include the data event after it */ 255 len = RB_LEN_TIME_EXTEND; 256 event = skip_time_extend(event); 257 } 258 return len + rb_event_length(event); 259 } 260 261 /** 262 * ring_buffer_event_length - return the length of the event 263 * @event: the event to get the length of 264 * 265 * Returns the size of the data load of a data event. 266 * If the event is something other than a data event, it 267 * returns the size of the event itself. With the exception 268 * of a TIME EXTEND, where it still returns the size of the 269 * data load of the data event after it. 270 */ 271 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 272 { 273 unsigned length; 274 275 if (extended_time(event)) 276 event = skip_time_extend(event); 277 278 length = rb_event_length(event); 279 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 280 return length; 281 length -= RB_EVNT_HDR_SIZE; 282 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 283 length -= sizeof(event->array[0]); 284 return length; 285 } 286 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 287 288 /* inline for ring buffer fast paths */ 289 static __always_inline void * 290 rb_event_data(struct ring_buffer_event *event) 291 { 292 if (extended_time(event)) 293 event = skip_time_extend(event); 294 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 295 /* If length is in len field, then array[0] has the data */ 296 if (event->type_len) 297 return (void *)&event->array[0]; 298 /* Otherwise length is in array[0] and array[1] has the data */ 299 return (void *)&event->array[1]; 300 } 301 302 /** 303 * ring_buffer_event_data - return the data of the event 304 * @event: the event to get the data from 305 */ 306 void *ring_buffer_event_data(struct ring_buffer_event *event) 307 { 308 return rb_event_data(event); 309 } 310 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 311 312 #define for_each_buffer_cpu(buffer, cpu) \ 313 for_each_cpu(cpu, buffer->cpumask) 314 315 #define for_each_online_buffer_cpu(buffer, cpu) \ 316 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 317 318 #define TS_SHIFT 27 319 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 320 #define TS_DELTA_TEST (~TS_MASK) 321 322 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 323 { 324 u64 ts; 325 326 ts = event->array[0]; 327 ts <<= TS_SHIFT; 328 ts += event->time_delta; 329 330 return ts; 331 } 332 333 /* Flag when events were overwritten */ 334 #define RB_MISSED_EVENTS (1 << 31) 335 /* Missed count stored at end */ 336 #define RB_MISSED_STORED (1 << 30) 337 338 #define RB_MISSED_MASK (3 << 30) 339 340 struct buffer_data_page { 341 u64 time_stamp; /* page time stamp */ 342 local_t commit; /* write committed index */ 343 
unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 344 }; 345 346 struct buffer_data_read_page { 347 unsigned order; /* order of the page */ 348 struct buffer_data_page *data; /* actual data, stored in this page */ 349 }; 350 351 /* 352 * Note, the buffer_page list must be first. The buffer pages 353 * are allocated in cache lines, which means that each buffer 354 * page will be at the beginning of a cache line, and thus 355 * the least significant bits will be zero. We use this to 356 * add flags in the list struct pointers, to make the ring buffer 357 * lockless. 358 */ 359 struct buffer_page { 360 struct list_head list; /* list of buffer pages */ 361 local_t write; /* index for next write */ 362 unsigned read; /* index for next read */ 363 local_t entries; /* entries on this page */ 364 unsigned long real_end; /* real end of data */ 365 unsigned order; /* order of the page */ 366 u32 id:30; /* ID for external mapping */ 367 u32 range:1; /* Mapped via a range */ 368 struct buffer_data_page *page; /* Actual data page */ 369 }; 370 371 /* 372 * The buffer page counters, write and entries, must be reset 373 * atomically when crossing page boundaries. To synchronize this 374 * update, two counters are inserted into the number. One is 375 * the actual counter for the write position or count on the page. 376 * 377 * The other is a counter of updaters. Before an update happens 378 * the update partition of the counter is incremented. This will 379 * allow the updater to update the counter atomically. 380 * 381 * The counter is 20 bits, and the state data is 12. 382 */ 383 #define RB_WRITE_MASK 0xfffff 384 #define RB_WRITE_INTCNT (1 << 20) 385 386 static void rb_init_page(struct buffer_data_page *bpage) 387 { 388 local_set(&bpage->commit, 0); 389 } 390 391 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage) 392 { 393 return local_read(&bpage->page->commit); 394 } 395 396 static void free_buffer_page(struct buffer_page *bpage) 397 { 398 /* Range pages are not to be freed */ 399 if (!bpage->range) 400 free_pages((unsigned long)bpage->page, bpage->order); 401 kfree(bpage); 402 } 403 404 /* 405 * We need to fit the time_stamp delta into 27 bits. 406 */ 407 static inline bool test_time_stamp(u64 delta) 408 { 409 return !!(delta & TS_DELTA_TEST); 410 } 411 412 struct rb_irq_work { 413 struct irq_work work; 414 wait_queue_head_t waiters; 415 wait_queue_head_t full_waiters; 416 atomic_t seq; 417 bool waiters_pending; 418 bool full_waiters_pending; 419 bool wakeup_full; 420 }; 421 422 /* 423 * Structure to hold event state and handle nested events. 424 */ 425 struct rb_event_info { 426 u64 ts; 427 u64 delta; 428 u64 before; 429 u64 after; 430 unsigned long length; 431 struct buffer_page *tail_page; 432 int add_timestamp; 433 }; 434 435 /* 436 * Used for the add_timestamp 437 * NONE 438 * EXTEND - wants a time extend 439 * ABSOLUTE - the buffer requests all events to have absolute time stamps 440 * FORCE - force a full time stamp. 441 */ 442 enum { 443 RB_ADD_STAMP_NONE = 0, 444 RB_ADD_STAMP_EXTEND = BIT(1), 445 RB_ADD_STAMP_ABSOLUTE = BIT(2), 446 RB_ADD_STAMP_FORCE = BIT(3) 447 }; 448 /* 449 * Used for which event context the event is in. 450 * TRANSITION = 0 451 * NMI = 1 452 * IRQ = 2 453 * SOFTIRQ = 3 454 * NORMAL = 4 455 * 456 * See trace_recursive_lock() comment below for more details. 
457 */ 458 enum { 459 RB_CTX_TRANSITION, 460 RB_CTX_NMI, 461 RB_CTX_IRQ, 462 RB_CTX_SOFTIRQ, 463 RB_CTX_NORMAL, 464 RB_CTX_MAX 465 }; 466 467 struct rb_time_struct { 468 local64_t time; 469 }; 470 typedef struct rb_time_struct rb_time_t; 471 472 #define MAX_NEST 5 473 474 /* 475 * head_page == tail_page && head == tail then buffer is empty. 476 */ 477 struct ring_buffer_per_cpu { 478 int cpu; 479 atomic_t record_disabled; 480 atomic_t resize_disabled; 481 struct trace_buffer *buffer; 482 raw_spinlock_t reader_lock; /* serialize readers */ 483 arch_spinlock_t lock; 484 struct lock_class_key lock_key; 485 struct buffer_data_page *free_page; 486 unsigned long nr_pages; 487 unsigned int current_context; 488 struct list_head *pages; 489 /* pages generation counter, incremented when the list changes */ 490 unsigned long cnt; 491 struct buffer_page *head_page; /* read from head */ 492 struct buffer_page *tail_page; /* write to tail */ 493 struct buffer_page *commit_page; /* committed pages */ 494 struct buffer_page *reader_page; 495 unsigned long lost_events; 496 unsigned long last_overrun; 497 unsigned long nest; 498 local_t entries_bytes; 499 local_t entries; 500 local_t overrun; 501 local_t commit_overrun; 502 local_t dropped_events; 503 local_t committing; 504 local_t commits; 505 local_t pages_touched; 506 local_t pages_lost; 507 local_t pages_read; 508 long last_pages_touch; 509 size_t shortest_full; 510 unsigned long read; 511 unsigned long read_bytes; 512 rb_time_t write_stamp; 513 rb_time_t before_stamp; 514 u64 event_stamp[MAX_NEST]; 515 u64 read_stamp; 516 /* pages removed since last reset */ 517 unsigned long pages_removed; 518 519 unsigned int mapped; 520 unsigned int user_mapped; /* user space mapping */ 521 struct mutex mapping_lock; 522 unsigned long *subbuf_ids; /* ID to subbuf VA */ 523 struct trace_buffer_meta *meta_page; 524 struct ring_buffer_cpu_meta *ring_meta; 525 526 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 527 long nr_pages_to_update; 528 struct list_head new_pages; /* new pages to add */ 529 struct work_struct update_pages_work; 530 struct completion update_done; 531 532 struct rb_irq_work irq_work; 533 }; 534 535 struct trace_buffer { 536 unsigned flags; 537 int cpus; 538 atomic_t record_disabled; 539 atomic_t resizing; 540 cpumask_var_t cpumask; 541 542 struct lock_class_key *reader_lock_key; 543 544 struct mutex mutex; 545 546 struct ring_buffer_per_cpu **buffers; 547 548 struct hlist_node node; 549 u64 (*clock)(void); 550 551 struct rb_irq_work irq_work; 552 bool time_stamp_abs; 553 554 unsigned long range_addr_start; 555 unsigned long range_addr_end; 556 557 struct ring_buffer_meta *meta; 558 559 unsigned int subbuf_size; 560 unsigned int subbuf_order; 561 unsigned int max_data_size; 562 }; 563 564 struct ring_buffer_iter { 565 struct ring_buffer_per_cpu *cpu_buffer; 566 unsigned long head; 567 unsigned long next_event; 568 struct buffer_page *head_page; 569 struct buffer_page *cache_reader_page; 570 unsigned long cache_read; 571 unsigned long cache_pages_removed; 572 u64 read_stamp; 573 u64 page_stamp; 574 struct ring_buffer_event *event; 575 size_t event_size; 576 int missed_events; 577 }; 578 579 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s) 580 { 581 struct buffer_data_page field; 582 583 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 584 "offset:0;\tsize:%u;\tsigned:%u;\n", 585 (unsigned int)sizeof(field.time_stamp), 586 (unsigned int)is_signed_type(u64)); 587 588 trace_seq_printf(s, "\tfield: local_t 
commit;\t" 589 "offset:%u;\tsize:%u;\tsigned:%u;\n", 590 (unsigned int)offsetof(typeof(field), commit), 591 (unsigned int)sizeof(field.commit), 592 (unsigned int)is_signed_type(long)); 593 594 trace_seq_printf(s, "\tfield: int overwrite;\t" 595 "offset:%u;\tsize:%u;\tsigned:%u;\n", 596 (unsigned int)offsetof(typeof(field), commit), 597 1, 598 (unsigned int)is_signed_type(long)); 599 600 trace_seq_printf(s, "\tfield: char data;\t" 601 "offset:%u;\tsize:%u;\tsigned:%u;\n", 602 (unsigned int)offsetof(typeof(field), data), 603 (unsigned int)buffer->subbuf_size, 604 (unsigned int)is_signed_type(char)); 605 606 return !trace_seq_has_overflowed(s); 607 } 608 609 static inline void rb_time_read(rb_time_t *t, u64 *ret) 610 { 611 *ret = local64_read(&t->time); 612 } 613 static void rb_time_set(rb_time_t *t, u64 val) 614 { 615 local64_set(&t->time, val); 616 } 617 618 /* 619 * Enable this to make sure that the event passed to 620 * ring_buffer_event_time_stamp() is not committed and also 621 * is on the buffer that it passed in. 622 */ 623 //#define RB_VERIFY_EVENT 624 #ifdef RB_VERIFY_EVENT 625 static struct list_head *rb_list_head(struct list_head *list); 626 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 627 void *event) 628 { 629 struct buffer_page *page = cpu_buffer->commit_page; 630 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 631 struct list_head *next; 632 long commit, write; 633 unsigned long addr = (unsigned long)event; 634 bool done = false; 635 int stop = 0; 636 637 /* Make sure the event exists and is not committed yet */ 638 do { 639 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 640 done = true; 641 commit = local_read(&page->page->commit); 642 write = local_read(&page->write); 643 if (addr >= (unsigned long)&page->page->data[commit] && 644 addr < (unsigned long)&page->page->data[write]) 645 return; 646 647 next = rb_list_head(page->list.next); 648 page = list_entry(next, struct buffer_page, list); 649 } while (!done); 650 WARN_ON_ONCE(1); 651 } 652 #else 653 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 654 void *event) 655 { 656 } 657 #endif 658 659 /* 660 * The absolute time stamp drops the 5 MSBs and some clocks may 661 * require them. The rb_fix_abs_ts() will take a previous full 662 * time stamp, and add the 5 MSB of that time stamp on to the 663 * saved absolute time stamp. Then they are compared in case of 664 * the unlikely event that the latest time stamp incremented 665 * the 5 MSB. 666 */ 667 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 668 { 669 if (save_ts & TS_MSB) { 670 abs |= save_ts & TS_MSB; 671 /* Check for overflow */ 672 if (unlikely(abs < save_ts)) 673 abs += 1ULL << 59; 674 } 675 return abs; 676 } 677 678 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 679 680 /** 681 * ring_buffer_event_time_stamp - return the event's current time stamp 682 * @buffer: The buffer that the event is on 683 * @event: the event to get the time stamp of 684 * 685 * Note, this must be called after @event is reserved, and before it is 686 * committed to the ring buffer. And must be called from the same 687 * context where the event was reserved (normal, softirq, irq, etc). 688 * 689 * Returns the time stamp associated with the current event. 690 * If the event has an extended time stamp, then that is used as 691 * the time stamp to return. 
692 * In the highly unlikely case that the event was nested more than 693 * the max nesting, then the write_stamp of the buffer is returned, 694 * otherwise current time is returned, but that really neither of 695 * the last two cases should ever happen. 696 */ 697 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 698 struct ring_buffer_event *event) 699 { 700 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 701 unsigned int nest; 702 u64 ts; 703 704 /* If the event includes an absolute time, then just use that */ 705 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 706 ts = rb_event_time_stamp(event); 707 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 708 } 709 710 nest = local_read(&cpu_buffer->committing); 711 verify_event(cpu_buffer, event); 712 if (WARN_ON_ONCE(!nest)) 713 goto fail; 714 715 /* Read the current saved nesting level time stamp */ 716 if (likely(--nest < MAX_NEST)) 717 return cpu_buffer->event_stamp[nest]; 718 719 /* Shouldn't happen, warn if it does */ 720 WARN_ONCE(1, "nest (%d) greater than max", nest); 721 722 fail: 723 rb_time_read(&cpu_buffer->write_stamp, &ts); 724 725 return ts; 726 } 727 728 /** 729 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 730 * @buffer: The ring_buffer to get the number of pages from 731 * @cpu: The cpu of the ring_buffer to get the number of pages from 732 * 733 * Returns the number of pages that have content in the ring buffer. 734 */ 735 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 736 { 737 size_t read; 738 size_t lost; 739 size_t cnt; 740 741 read = local_read(&buffer->buffers[cpu]->pages_read); 742 lost = local_read(&buffer->buffers[cpu]->pages_lost); 743 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 744 745 if (WARN_ON_ONCE(cnt < lost)) 746 return 0; 747 748 cnt -= lost; 749 750 /* The reader can read an empty page, but not more than that */ 751 if (cnt < read) { 752 WARN_ON_ONCE(read > cnt + 1); 753 return 0; 754 } 755 756 return cnt - read; 757 } 758 759 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 760 { 761 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 762 size_t nr_pages; 763 size_t dirty; 764 765 nr_pages = cpu_buffer->nr_pages; 766 if (!nr_pages || !full) 767 return true; 768 769 /* 770 * Add one as dirty will never equal nr_pages, as the sub-buffer 771 * that the writer is on is not counted as dirty. 772 * This is needed if "buffer_percent" is set to 100. 773 */ 774 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1; 775 776 return (dirty * 100) >= (full * nr_pages); 777 } 778 779 /* 780 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 781 * 782 * Schedules a delayed work to wake up any task that is blocked on the 783 * ring buffer waiters queue. 
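 *
 * This is the irq_work callback: writers must not call wake_up() from the
 * tracing hot path, so they only queue the irq_work.  A rough sketch of the
 * writer-side check (the actual helper lives further down in this file):
 *
 *	if (rbwork->waiters_pending) {
 *		rbwork->waiters_pending = false;
 *		irq_work_queue(&rbwork->work);
 *	}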
784 */ 785 static void rb_wake_up_waiters(struct irq_work *work) 786 { 787 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 788 789 /* For waiters waiting for the first wake up */ 790 (void)atomic_fetch_inc_release(&rbwork->seq); 791 792 wake_up_all(&rbwork->waiters); 793 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 794 /* Only cpu_buffer sets the above flags */ 795 struct ring_buffer_per_cpu *cpu_buffer = 796 container_of(rbwork, struct ring_buffer_per_cpu, irq_work); 797 798 /* Called from interrupt context */ 799 raw_spin_lock(&cpu_buffer->reader_lock); 800 rbwork->wakeup_full = false; 801 rbwork->full_waiters_pending = false; 802 803 /* Waking up all waiters, they will reset the shortest full */ 804 cpu_buffer->shortest_full = 0; 805 raw_spin_unlock(&cpu_buffer->reader_lock); 806 807 wake_up_all(&rbwork->full_waiters); 808 } 809 } 810 811 /** 812 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 813 * @buffer: The ring buffer to wake waiters on 814 * @cpu: The CPU buffer to wake waiters on 815 * 816 * In the case of a file that represents a ring buffer is closing, 817 * it is prudent to wake up any waiters that are on this. 818 */ 819 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) 820 { 821 struct ring_buffer_per_cpu *cpu_buffer; 822 struct rb_irq_work *rbwork; 823 824 if (!buffer) 825 return; 826 827 if (cpu == RING_BUFFER_ALL_CPUS) { 828 829 /* Wake up individual ones too. One level recursion */ 830 for_each_buffer_cpu(buffer, cpu) 831 ring_buffer_wake_waiters(buffer, cpu); 832 833 rbwork = &buffer->irq_work; 834 } else { 835 if (WARN_ON_ONCE(!buffer->buffers)) 836 return; 837 if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) 838 return; 839 840 cpu_buffer = buffer->buffers[cpu]; 841 /* The CPU buffer may not have been initialized yet */ 842 if (!cpu_buffer) 843 return; 844 rbwork = &cpu_buffer->irq_work; 845 } 846 847 /* This can be called in any context */ 848 irq_work_queue(&rbwork->work); 849 } 850 851 static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full) 852 { 853 struct ring_buffer_per_cpu *cpu_buffer; 854 bool ret = false; 855 856 /* Reads of all CPUs always waits for any data */ 857 if (cpu == RING_BUFFER_ALL_CPUS) 858 return !ring_buffer_empty(buffer); 859 860 cpu_buffer = buffer->buffers[cpu]; 861 862 if (!ring_buffer_empty_cpu(buffer, cpu)) { 863 unsigned long flags; 864 bool pagebusy; 865 866 if (!full) 867 return true; 868 869 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 870 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 871 ret = !pagebusy && full_hit(buffer, cpu, full); 872 873 if (!ret && (!cpu_buffer->shortest_full || 874 cpu_buffer->shortest_full > full)) { 875 cpu_buffer->shortest_full = full; 876 } 877 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 878 } 879 return ret; 880 } 881 882 static inline bool 883 rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer, 884 int cpu, int full, ring_buffer_cond_fn cond, void *data) 885 { 886 if (rb_watermark_hit(buffer, cpu, full)) 887 return true; 888 889 if (cond(data)) 890 return true; 891 892 /* 893 * The events can happen in critical sections where 894 * checking a work queue can cause deadlocks. 895 * After adding a task to the queue, this flag is set 896 * only to notify events to try to wake up the queue 897 * using irq_work. 898 * 899 * We don't clear it even if the buffer is no longer 900 * empty. 
The flag only causes the next event to run 901 * irq_work to do the work queue wake up. The worse 902 * that can happen if we race with !trace_empty() is that 903 * an event will cause an irq_work to try to wake up 904 * an empty queue. 905 * 906 * There's no reason to protect this flag either, as 907 * the work queue and irq_work logic will do the necessary 908 * synchronization for the wake ups. The only thing 909 * that is necessary is that the wake up happens after 910 * a task has been queued. It's OK for spurious wake ups. 911 */ 912 if (full) 913 rbwork->full_waiters_pending = true; 914 else 915 rbwork->waiters_pending = true; 916 917 return false; 918 } 919 920 struct rb_wait_data { 921 struct rb_irq_work *irq_work; 922 int seq; 923 }; 924 925 /* 926 * The default wait condition for ring_buffer_wait() is to just to exit the 927 * wait loop the first time it is woken up. 928 */ 929 static bool rb_wait_once(void *data) 930 { 931 struct rb_wait_data *rdata = data; 932 struct rb_irq_work *rbwork = rdata->irq_work; 933 934 return atomic_read_acquire(&rbwork->seq) != rdata->seq; 935 } 936 937 /** 938 * ring_buffer_wait - wait for input to the ring buffer 939 * @buffer: buffer to wait on 940 * @cpu: the cpu buffer to wait on 941 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 942 * @cond: condition function to break out of wait (NULL to run once) 943 * @data: the data to pass to @cond. 944 * 945 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 946 * as data is added to any of the @buffer's cpu buffers. Otherwise 947 * it will wait for data to be added to a specific cpu buffer. 948 */ 949 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 950 ring_buffer_cond_fn cond, void *data) 951 { 952 struct ring_buffer_per_cpu *cpu_buffer; 953 struct wait_queue_head *waitq; 954 struct rb_irq_work *rbwork; 955 struct rb_wait_data rdata; 956 int ret = 0; 957 958 /* 959 * Depending on what the caller is waiting for, either any 960 * data in any cpu buffer, or a specific buffer, put the 961 * caller on the appropriate wait queue. 962 */ 963 if (cpu == RING_BUFFER_ALL_CPUS) { 964 rbwork = &buffer->irq_work; 965 /* Full only makes sense on per cpu reads */ 966 full = 0; 967 } else { 968 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 969 return -ENODEV; 970 cpu_buffer = buffer->buffers[cpu]; 971 rbwork = &cpu_buffer->irq_work; 972 } 973 974 if (full) 975 waitq = &rbwork->full_waiters; 976 else 977 waitq = &rbwork->waiters; 978 979 /* Set up to exit loop as soon as it is woken */ 980 if (!cond) { 981 cond = rb_wait_once; 982 rdata.irq_work = rbwork; 983 rdata.seq = atomic_read_acquire(&rbwork->seq); 984 data = &rdata; 985 } 986 987 ret = wait_event_interruptible((*waitq), 988 rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 989 990 return ret; 991 } 992 993 /** 994 * ring_buffer_poll_wait - poll on buffer input 995 * @buffer: buffer to wait on 996 * @cpu: the cpu buffer to wait on 997 * @filp: the file descriptor 998 * @poll_table: The poll descriptor 999 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 1000 * 1001 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 1002 * as data is added to any of the @buffer's cpu buffers. Otherwise 1003 * it will wait for data to be added to a specific cpu buffer. 1004 * 1005 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 1006 * zero otherwise. 
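 *
 * Illustrative caller (names are hypothetical, not from this file): a tracing
 * file's .poll callback typically just forwards to this helper:
 *
 *	static __poll_t my_trace_poll(struct file *filp, poll_table *pt)
 *	{
 *		return ring_buffer_poll_wait(my_buffer, my_cpu, filp, pt, 0);
 *	}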
1007 */ 1008 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 1009 struct file *filp, poll_table *poll_table, int full) 1010 { 1011 struct ring_buffer_per_cpu *cpu_buffer; 1012 struct rb_irq_work *rbwork; 1013 1014 if (cpu == RING_BUFFER_ALL_CPUS) { 1015 rbwork = &buffer->irq_work; 1016 full = 0; 1017 } else { 1018 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 1019 return EPOLLERR; 1020 1021 cpu_buffer = buffer->buffers[cpu]; 1022 rbwork = &cpu_buffer->irq_work; 1023 } 1024 1025 if (full) { 1026 poll_wait(filp, &rbwork->full_waiters, poll_table); 1027 1028 if (rb_watermark_hit(buffer, cpu, full)) 1029 return EPOLLIN | EPOLLRDNORM; 1030 /* 1031 * Only allow full_waiters_pending update to be seen after 1032 * the shortest_full is set (in rb_watermark_hit). If the 1033 * writer sees the full_waiters_pending flag set, it will 1034 * compare the amount in the ring buffer to shortest_full. 1035 * If the amount in the ring buffer is greater than the 1036 * shortest_full percent, it will call the irq_work handler 1037 * to wake up this list. The irq_handler will reset shortest_full 1038 * back to zero. That's done under the reader_lock, but 1039 * the below smp_mb() makes sure that the update to 1040 * full_waiters_pending doesn't leak up into the above. 1041 */ 1042 smp_mb(); 1043 rbwork->full_waiters_pending = true; 1044 return 0; 1045 } 1046 1047 poll_wait(filp, &rbwork->waiters, poll_table); 1048 rbwork->waiters_pending = true; 1049 1050 /* 1051 * There's a tight race between setting the waiters_pending and 1052 * checking if the ring buffer is empty. Once the waiters_pending bit 1053 * is set, the next event will wake the task up, but we can get stuck 1054 * if there's only a single event in. 1055 * 1056 * FIXME: Ideally, we need a memory barrier on the writer side as well, 1057 * but adding a memory barrier to all events will cause too much of a 1058 * performance hit in the fast path. We only need a memory barrier when 1059 * the buffer goes from empty to having content. But as this race is 1060 * extremely small, and it's not a problem if another event comes in, we 1061 * will fix it later. 
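 *
 * The intended pairing, roughly (the reader side is this function):
 *
 *	reader (poll)                   writer (commit)
 *	-------------                   ---------------
 *	waiters_pending = true          write event
 *	smp_mb()                        waiters_pending set?
 *	buffer empty?                   -> queue irq_work wake up
 *
 * The smp_mb() below keeps the waiters_pending store from being reordered
 * after the emptiness check on this side.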
1062 */ 1063 smp_mb(); 1064 1065 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 1066 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 1067 return EPOLLIN | EPOLLRDNORM; 1068 return 0; 1069 } 1070 1071 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 1072 #define RB_WARN_ON(b, cond) \ 1073 ({ \ 1074 int _____ret = unlikely(cond); \ 1075 if (_____ret) { \ 1076 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 1077 struct ring_buffer_per_cpu *__b = \ 1078 (void *)b; \ 1079 atomic_inc(&__b->buffer->record_disabled); \ 1080 } else \ 1081 atomic_inc(&b->record_disabled); \ 1082 WARN_ON(1); \ 1083 } \ 1084 _____ret; \ 1085 }) 1086 1087 /* Up this if you want to test the TIME_EXTENTS and normalization */ 1088 #define DEBUG_SHIFT 0 1089 1090 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 1091 { 1092 u64 ts; 1093 1094 /* Skip retpolines :-( */ 1095 if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 1096 ts = trace_clock_local(); 1097 else 1098 ts = buffer->clock(); 1099 1100 /* shift to debug/test normalization and TIME_EXTENTS */ 1101 return ts << DEBUG_SHIFT; 1102 } 1103 1104 u64 ring_buffer_time_stamp(struct trace_buffer *buffer) 1105 { 1106 u64 time; 1107 1108 preempt_disable_notrace(); 1109 time = rb_time_stamp(buffer); 1110 preempt_enable_notrace(); 1111 1112 return time; 1113 } 1114 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 1115 1116 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1117 int cpu, u64 *ts) 1118 { 1119 /* Just stupid testing the normalize function and deltas */ 1120 *ts >>= DEBUG_SHIFT; 1121 } 1122 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1123 1124 /* 1125 * Making the ring buffer lockless makes things tricky. 1126 * Although writes only happen on the CPU that they are on, 1127 * and they only need to worry about interrupts. Reads can 1128 * happen on any CPU. 1129 * 1130 * The reader page is always off the ring buffer, but when the 1131 * reader finishes with a page, it needs to swap its page with 1132 * a new one from the buffer. The reader needs to take from 1133 * the head (writes go to the tail). But if a writer is in overwrite 1134 * mode and wraps, it must push the head page forward. 1135 * 1136 * Here lies the problem. 1137 * 1138 * The reader must be careful to replace only the head page, and 1139 * not another one. As described at the top of the file in the 1140 * ASCII art, the reader sets its old page to point to the next 1141 * page after head. It then sets the page after head to point to 1142 * the old reader page. But if the writer moves the head page 1143 * during this operation, the reader could end up with the tail. 1144 * 1145 * We use cmpxchg to help prevent this race. We also do something 1146 * special with the page before head. We set the LSB to 1. 1147 * 1148 * When the writer must push the page forward, it will clear the 1149 * bit that points to the head page, move the head, and then set 1150 * the bit that points to the new head page. 1151 * 1152 * We also don't want an interrupt coming in and moving the head 1153 * page on another writer. Thus we use the second LSB to catch 1154 * that too. 
Thus: 1155 * 1156 * head->list->prev->next bit 1 bit 0 1157 * ------- ------- 1158 * Normal page 0 0 1159 * Points to head page 0 1 1160 * New head page 1 0 1161 * 1162 * Note we can not trust the prev pointer of the head page, because: 1163 * 1164 * +----+ +-----+ +-----+ 1165 * | |------>| T |---X--->| N | 1166 * | |<------| | | | 1167 * +----+ +-----+ +-----+ 1168 * ^ ^ | 1169 * | +-----+ | | 1170 * +----------| R |----------+ | 1171 * | |<-----------+ 1172 * +-----+ 1173 * 1174 * Key: ---X--> HEAD flag set in pointer 1175 * T Tail page 1176 * R Reader page 1177 * N Next page 1178 * 1179 * (see __rb_reserve_next() to see where this happens) 1180 * 1181 * What the above shows is that the reader just swapped out 1182 * the reader page with a page in the buffer, but before it 1183 * could make the new header point back to the new page added 1184 * it was preempted by a writer. The writer moved forward onto 1185 * the new page added by the reader and is about to move forward 1186 * again. 1187 * 1188 * You can see, it is legitimate for the previous pointer of 1189 * the head (or any page) not to point back to itself. But only 1190 * temporarily. 1191 */ 1192 1193 #define RB_PAGE_NORMAL 0UL 1194 #define RB_PAGE_HEAD 1UL 1195 #define RB_PAGE_UPDATE 2UL 1196 1197 1198 #define RB_FLAG_MASK 3UL 1199 1200 /* PAGE_MOVED is not part of the mask */ 1201 #define RB_PAGE_MOVED 4UL 1202 1203 /* 1204 * rb_list_head - remove any bit 1205 */ 1206 static struct list_head *rb_list_head(struct list_head *list) 1207 { 1208 unsigned long val = (unsigned long)list; 1209 1210 return (struct list_head *)(val & ~RB_FLAG_MASK); 1211 } 1212 1213 /* 1214 * rb_is_head_page - test if the given page is the head page 1215 * 1216 * Because the reader may move the head_page pointer, we can 1217 * not trust what the head page is (it may be pointing to 1218 * the reader page). But if the next page is a header page, 1219 * its flags will be non zero. 1220 */ 1221 static inline int 1222 rb_is_head_page(struct buffer_page *page, struct list_head *list) 1223 { 1224 unsigned long val; 1225 1226 val = (unsigned long)list->next; 1227 1228 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 1229 return RB_PAGE_MOVED; 1230 1231 return val & RB_FLAG_MASK; 1232 } 1233 1234 /* 1235 * rb_is_reader_page 1236 * 1237 * The unique thing about the reader page, is that, if the 1238 * writer is ever on it, the previous pointer never points 1239 * back to the reader page. 1240 */ 1241 static bool rb_is_reader_page(struct buffer_page *page) 1242 { 1243 struct list_head *list = page->list.prev; 1244 1245 return rb_list_head(list->next) != &page->list; 1246 } 1247 1248 /* 1249 * rb_set_list_to_head - set a list_head to be pointing to head. 1250 */ 1251 static void rb_set_list_to_head(struct list_head *list) 1252 { 1253 unsigned long *ptr; 1254 1255 ptr = (unsigned long *)&list->next; 1256 *ptr |= RB_PAGE_HEAD; 1257 *ptr &= ~RB_PAGE_UPDATE; 1258 } 1259 1260 /* 1261 * rb_head_page_activate - sets up head page 1262 */ 1263 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 1264 { 1265 struct buffer_page *head; 1266 1267 head = cpu_buffer->head_page; 1268 if (!head) 1269 return; 1270 1271 /* 1272 * Set the previous list pointer to have the HEAD flag. 
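 *
 * Illustrative effect (the address is made up): if head->list.prev->next
 * held 0xffff888012345000 it now holds 0xffff888012345001, i.e. bit 0
 * (RB_PAGE_HEAD) set and bit 1 (RB_PAGE_UPDATE) clear, so writers can
 * recognize the head page purely from the pointer leading to it.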
1273 */ 1274 rb_set_list_to_head(head->list.prev); 1275 1276 if (cpu_buffer->ring_meta) { 1277 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1278 meta->head_buffer = (unsigned long)head->page; 1279 } 1280 } 1281 1282 static void rb_list_head_clear(struct list_head *list) 1283 { 1284 unsigned long *ptr = (unsigned long *)&list->next; 1285 1286 *ptr &= ~RB_FLAG_MASK; 1287 } 1288 1289 /* 1290 * rb_head_page_deactivate - clears head page ptr (for free list) 1291 */ 1292 static void 1293 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 1294 { 1295 struct list_head *hd; 1296 1297 /* Go through the whole list and clear any pointers found. */ 1298 rb_list_head_clear(cpu_buffer->pages); 1299 1300 list_for_each(hd, cpu_buffer->pages) 1301 rb_list_head_clear(hd); 1302 } 1303 1304 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 1305 struct buffer_page *head, 1306 struct buffer_page *prev, 1307 int old_flag, int new_flag) 1308 { 1309 struct list_head *list; 1310 unsigned long val = (unsigned long)&head->list; 1311 unsigned long ret; 1312 1313 list = &prev->list; 1314 1315 val &= ~RB_FLAG_MASK; 1316 1317 ret = cmpxchg((unsigned long *)&list->next, 1318 val | old_flag, val | new_flag); 1319 1320 /* check if the reader took the page */ 1321 if ((ret & ~RB_FLAG_MASK) != val) 1322 return RB_PAGE_MOVED; 1323 1324 return ret & RB_FLAG_MASK; 1325 } 1326 1327 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 1328 struct buffer_page *head, 1329 struct buffer_page *prev, 1330 int old_flag) 1331 { 1332 return rb_head_page_set(cpu_buffer, head, prev, 1333 old_flag, RB_PAGE_UPDATE); 1334 } 1335 1336 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 1337 struct buffer_page *head, 1338 struct buffer_page *prev, 1339 int old_flag) 1340 { 1341 return rb_head_page_set(cpu_buffer, head, prev, 1342 old_flag, RB_PAGE_HEAD); 1343 } 1344 1345 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 1346 struct buffer_page *head, 1347 struct buffer_page *prev, 1348 int old_flag) 1349 { 1350 return rb_head_page_set(cpu_buffer, head, prev, 1351 old_flag, RB_PAGE_NORMAL); 1352 } 1353 1354 static inline void rb_inc_page(struct buffer_page **bpage) 1355 { 1356 struct list_head *p = rb_list_head((*bpage)->list.next); 1357 1358 *bpage = list_entry(p, struct buffer_page, list); 1359 } 1360 1361 static inline void rb_dec_page(struct buffer_page **bpage) 1362 { 1363 struct list_head *p = rb_list_head((*bpage)->list.prev); 1364 1365 *bpage = list_entry(p, struct buffer_page, list); 1366 } 1367 1368 static struct buffer_page * 1369 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 1370 { 1371 struct buffer_page *head; 1372 struct buffer_page *page; 1373 struct list_head *list; 1374 int i; 1375 1376 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 1377 return NULL; 1378 1379 /* sanity check */ 1380 list = cpu_buffer->pages; 1381 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 1382 return NULL; 1383 1384 page = head = cpu_buffer->head_page; 1385 /* 1386 * It is possible that the writer moves the header behind 1387 * where we started, and we miss in one loop. 1388 * A second loop should grab the header, but we'll do 1389 * three loops just because I'm paranoid. 
1390 */ 1391 for (i = 0; i < 3; i++) { 1392 do { 1393 if (rb_is_head_page(page, page->list.prev)) { 1394 cpu_buffer->head_page = page; 1395 return page; 1396 } 1397 rb_inc_page(&page); 1398 } while (page != head); 1399 } 1400 1401 RB_WARN_ON(cpu_buffer, 1); 1402 1403 return NULL; 1404 } 1405 1406 static bool rb_head_page_replace(struct buffer_page *old, 1407 struct buffer_page *new) 1408 { 1409 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 1410 unsigned long val; 1411 1412 val = *ptr & ~RB_FLAG_MASK; 1413 val |= RB_PAGE_HEAD; 1414 1415 return try_cmpxchg(ptr, &val, (unsigned long)&new->list); 1416 } 1417 1418 /* 1419 * rb_tail_page_update - move the tail page forward 1420 */ 1421 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1422 struct buffer_page *tail_page, 1423 struct buffer_page *next_page) 1424 { 1425 unsigned long old_entries; 1426 unsigned long old_write; 1427 1428 /* 1429 * The tail page now needs to be moved forward. 1430 * 1431 * We need to reset the tail page, but without messing 1432 * with possible erasing of data brought in by interrupts 1433 * that have moved the tail page and are currently on it. 1434 * 1435 * We add a counter to the write field to denote this. 1436 */ 1437 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1438 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1439 1440 /* 1441 * Just make sure we have seen our old_write and synchronize 1442 * with any interrupts that come in. 1443 */ 1444 barrier(); 1445 1446 /* 1447 * If the tail page is still the same as what we think 1448 * it is, then it is up to us to update the tail 1449 * pointer. 1450 */ 1451 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1452 /* Zero the write counter */ 1453 unsigned long val = old_write & ~RB_WRITE_MASK; 1454 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1455 1456 /* 1457 * This will only succeed if an interrupt did 1458 * not come in and change it. In which case, we 1459 * do not want to modify it. 1460 * 1461 * We add (void) to let the compiler know that we do not care 1462 * about the return value of these functions. We use the 1463 * cmpxchg to only update if an interrupt did not already 1464 * do it for us. If the cmpxchg fails, we don't care. 1465 */ 1466 (void)local_cmpxchg(&next_page->write, old_write, val); 1467 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1468 1469 /* 1470 * No need to worry about races with clearing out the commit. 1471 * it only can increment when a commit takes place. But that 1472 * only happens in the outer most nested commit. 
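 *
 * For reference, the write and entries values read above decompose as
 * described at RB_WRITE_MASK; roughly:
 *
 *	index    = val & RB_WRITE_MASK;   // real write index or entry count
 *	updaters = val >> 20;             // nested updaters that interrupted us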
1473 */ 1474 local_set(&next_page->page->commit, 0); 1475 1476 /* Either we update tail_page or an interrupt does */ 1477 if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page)) 1478 local_inc(&cpu_buffer->pages_touched); 1479 } 1480 } 1481 1482 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1483 struct buffer_page *bpage) 1484 { 1485 unsigned long val = (unsigned long)bpage; 1486 1487 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK); 1488 } 1489 1490 static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer, 1491 struct list_head *list) 1492 { 1493 if (RB_WARN_ON(cpu_buffer, 1494 rb_list_head(rb_list_head(list->next)->prev) != list)) 1495 return false; 1496 1497 if (RB_WARN_ON(cpu_buffer, 1498 rb_list_head(rb_list_head(list->prev)->next) != list)) 1499 return false; 1500 1501 return true; 1502 } 1503 1504 /** 1505 * rb_check_pages - integrity check of buffer pages 1506 * @cpu_buffer: CPU buffer with pages to test 1507 * 1508 * As a safety measure we check to make sure the data pages have not 1509 * been corrupted. 1510 */ 1511 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1512 { 1513 struct list_head *head, *tmp; 1514 unsigned long buffer_cnt; 1515 unsigned long flags; 1516 int nr_loops = 0; 1517 1518 /* 1519 * Walk the linked list underpinning the ring buffer and validate all 1520 * its next and prev links. 1521 * 1522 * The check acquires the reader_lock to avoid concurrent processing 1523 * with code that could be modifying the list. However, the lock cannot 1524 * be held for the entire duration of the walk, as this would make the 1525 * time when interrupts are disabled non-deterministic, dependent on the 1526 * ring buffer size. Therefore, the code releases and re-acquires the 1527 * lock after checking each page. The ring_buffer_per_cpu.cnt variable 1528 * is then used to detect if the list was modified while the lock was 1529 * not held, in which case the check needs to be restarted. 1530 * 1531 * The code attempts to perform the check at most three times before 1532 * giving up. This is acceptable because this is only a self-validation 1533 * to detect problems early on. In practice, the list modification 1534 * operations are fairly spaced, and so this check typically succeeds at 1535 * most on the second try. 1536 */ 1537 again: 1538 if (++nr_loops > 3) 1539 return; 1540 1541 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1542 head = rb_list_head(cpu_buffer->pages); 1543 if (!rb_check_links(cpu_buffer, head)) 1544 goto out_locked; 1545 buffer_cnt = cpu_buffer->cnt; 1546 tmp = head; 1547 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1548 1549 while (true) { 1550 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1551 1552 if (buffer_cnt != cpu_buffer->cnt) { 1553 /* The list was updated, try again. */ 1554 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1555 goto again; 1556 } 1557 1558 tmp = rb_list_head(tmp->next); 1559 if (tmp == head) 1560 /* The iteration circled back, all is done. */ 1561 goto out_locked; 1562 1563 if (!rb_check_links(cpu_buffer, tmp)) 1564 goto out_locked; 1565 1566 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1567 } 1568 1569 out_locked: 1570 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1571 } 1572 1573 /* 1574 * Take an address, add the meta data size as well as the array of 1575 * array subbuffer indexes, then align it to a subbuffer size. 1576 * 1577 * This is used to help find the next per cpu subbuffer within a mapped range. 
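 *
 * Worked example with illustrative numbers: with a 4096-byte sub-buffer and
 * nr_subbufs == 9, a CPU's sub-buffers start at
 *
 *	ALIGN(addr + sizeof(struct ring_buffer_cpu_meta) + 9 * sizeof(int), 4096)
 *
 * i.e. the per-CPU meta and its index array are packed first and the data
 * pages begin on the next sub-buffer boundary.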
1578 */ 1579 static unsigned long 1580 rb_range_align_subbuf(unsigned long addr, int subbuf_size, int nr_subbufs) 1581 { 1582 addr += sizeof(struct ring_buffer_cpu_meta) + 1583 sizeof(int) * nr_subbufs; 1584 return ALIGN(addr, subbuf_size); 1585 } 1586 1587 /* 1588 * Return the ring_buffer_meta for a given @cpu. 1589 */ 1590 static void *rb_range_meta(struct trace_buffer *buffer, int nr_pages, int cpu) 1591 { 1592 int subbuf_size = buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 1593 struct ring_buffer_cpu_meta *meta; 1594 struct ring_buffer_meta *bmeta; 1595 unsigned long ptr; 1596 int nr_subbufs; 1597 1598 bmeta = buffer->meta; 1599 if (!bmeta) 1600 return NULL; 1601 1602 ptr = (unsigned long)bmeta + bmeta->buffers_offset; 1603 meta = (struct ring_buffer_cpu_meta *)ptr; 1604 1605 /* When nr_pages passed in is zero, the first meta has already been initialized */ 1606 if (!nr_pages) { 1607 nr_subbufs = meta->nr_subbufs; 1608 } else { 1609 /* Include the reader page */ 1610 nr_subbufs = nr_pages + 1; 1611 } 1612 1613 /* 1614 * The first chunk may not be subbuffer aligned, where as 1615 * the rest of the chunks are. 1616 */ 1617 if (cpu) { 1618 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1619 ptr += subbuf_size * nr_subbufs; 1620 1621 /* We can use multiplication to find chunks greater than 1 */ 1622 if (cpu > 1) { 1623 unsigned long size; 1624 unsigned long p; 1625 1626 /* Save the beginning of this CPU chunk */ 1627 p = ptr; 1628 ptr = rb_range_align_subbuf(ptr, subbuf_size, nr_subbufs); 1629 ptr += subbuf_size * nr_subbufs; 1630 1631 /* Now all chunks after this are the same size */ 1632 size = ptr - p; 1633 ptr += size * (cpu - 2); 1634 } 1635 } 1636 return (void *)ptr; 1637 } 1638 1639 /* Return the start of subbufs given the meta pointer */ 1640 static void *rb_subbufs_from_meta(struct ring_buffer_cpu_meta *meta) 1641 { 1642 int subbuf_size = meta->subbuf_size; 1643 unsigned long ptr; 1644 1645 ptr = (unsigned long)meta; 1646 ptr = rb_range_align_subbuf(ptr, subbuf_size, meta->nr_subbufs); 1647 1648 return (void *)ptr; 1649 } 1650 1651 /* 1652 * Return a specific sub-buffer for a given @cpu defined by @idx. 1653 */ 1654 static void *rb_range_buffer(struct ring_buffer_per_cpu *cpu_buffer, int idx) 1655 { 1656 struct ring_buffer_cpu_meta *meta; 1657 unsigned long ptr; 1658 int subbuf_size; 1659 1660 meta = rb_range_meta(cpu_buffer->buffer, 0, cpu_buffer->cpu); 1661 if (!meta) 1662 return NULL; 1663 1664 if (WARN_ON_ONCE(idx >= meta->nr_subbufs)) 1665 return NULL; 1666 1667 subbuf_size = meta->subbuf_size; 1668 1669 /* Map this buffer to the order that's in meta->buffers[] */ 1670 idx = meta->buffers[idx]; 1671 1672 ptr = (unsigned long)rb_subbufs_from_meta(meta); 1673 1674 ptr += subbuf_size * idx; 1675 if (ptr + subbuf_size > cpu_buffer->buffer->range_addr_end) 1676 return NULL; 1677 1678 return (void *)ptr; 1679 } 1680 1681 /* 1682 * See if the existing memory contains a valid meta section. 1683 * if so, use that, otherwise initialize it. 
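 *
 * The persistent range is laid out, in order: struct ring_buffer_meta,
 * padding up to a long boundary, the scratch area of @scratch_size bytes,
 * and then (at buffers_offset) the per-CPU ring_buffer_cpu_meta chunks with
 * their sub-buffers.  A stale or foreign region fails the magic and size
 * checks below and is simply re-initialized.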
1684 */ 1685 static bool rb_meta_init(struct trace_buffer *buffer, int scratch_size) 1686 { 1687 unsigned long ptr = buffer->range_addr_start; 1688 struct ring_buffer_meta *bmeta; 1689 unsigned long total_size; 1690 int struct_sizes; 1691 1692 bmeta = (struct ring_buffer_meta *)ptr; 1693 buffer->meta = bmeta; 1694 1695 total_size = buffer->range_addr_end - buffer->range_addr_start; 1696 1697 struct_sizes = sizeof(struct ring_buffer_cpu_meta); 1698 struct_sizes |= sizeof(*bmeta) << 16; 1699 1700 /* The first buffer will start word size after the meta page */ 1701 ptr += sizeof(*bmeta); 1702 ptr = ALIGN(ptr, sizeof(long)); 1703 ptr += scratch_size; 1704 1705 if (bmeta->magic != RING_BUFFER_META_MAGIC) { 1706 pr_info("Ring buffer boot meta mismatch of magic\n"); 1707 goto init; 1708 } 1709 1710 if (bmeta->struct_sizes != struct_sizes) { 1711 pr_info("Ring buffer boot meta mismatch of struct size\n"); 1712 goto init; 1713 } 1714 1715 if (bmeta->total_size != total_size) { 1716 pr_info("Ring buffer boot meta mismatch of total size\n"); 1717 goto init; 1718 } 1719 1720 if (bmeta->buffers_offset > bmeta->total_size) { 1721 pr_info("Ring buffer boot meta mismatch of offset outside of total size\n"); 1722 goto init; 1723 } 1724 1725 if (bmeta->buffers_offset != (void *)ptr - (void *)bmeta) { 1726 pr_info("Ring buffer boot meta mismatch of first buffer offset\n"); 1727 goto init; 1728 } 1729 1730 return true; 1731 1732 init: 1733 bmeta->magic = RING_BUFFER_META_MAGIC; 1734 bmeta->struct_sizes = struct_sizes; 1735 bmeta->total_size = total_size; 1736 bmeta->buffers_offset = (void *)ptr - (void *)bmeta; 1737 1738 /* Zero out the scatch pad */ 1739 memset((void *)bmeta + sizeof(*bmeta), 0, bmeta->buffers_offset - sizeof(*bmeta)); 1740 1741 return false; 1742 } 1743 1744 /* 1745 * See if the existing memory contains valid ring buffer data. 1746 * As the previous kernel must be the same as this kernel, all 1747 * the calculations (size of buffers and number of buffers) 1748 * must be the same. 1749 */ 1750 static bool rb_cpu_meta_valid(struct ring_buffer_cpu_meta *meta, int cpu, 1751 struct trace_buffer *buffer, int nr_pages, 1752 unsigned long *subbuf_mask) 1753 { 1754 int subbuf_size = PAGE_SIZE; 1755 struct buffer_data_page *subbuf; 1756 unsigned long buffers_start; 1757 unsigned long buffers_end; 1758 int i; 1759 1760 if (!subbuf_mask) 1761 return false; 1762 1763 buffers_start = meta->first_buffer; 1764 buffers_end = meta->first_buffer + (subbuf_size * meta->nr_subbufs); 1765 1766 /* Is the head and commit buffers within the range of buffers? */ 1767 if (meta->head_buffer < buffers_start || 1768 meta->head_buffer >= buffers_end) { 1769 pr_info("Ring buffer boot meta [%d] head buffer out of range\n", cpu); 1770 return false; 1771 } 1772 1773 if (meta->commit_buffer < buffers_start || 1774 meta->commit_buffer >= buffers_end) { 1775 pr_info("Ring buffer boot meta [%d] commit buffer out of range\n", cpu); 1776 return false; 1777 } 1778 1779 subbuf = rb_subbufs_from_meta(meta); 1780 1781 bitmap_clear(subbuf_mask, 0, meta->nr_subbufs); 1782 1783 /* Is the meta buffers and the subbufs themselves have correct data? 
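 * Each index must lie in [0, nr_subbufs), appear only once, and its
 * sub-buffer must not record a commit larger than the sub-buffer size.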
*/ 1784 for (i = 0; i < meta->nr_subbufs; i++) { 1785 if (meta->buffers[i] < 0 || 1786 meta->buffers[i] >= meta->nr_subbufs) { 1787 pr_info("Ring buffer boot meta [%d] array out of range\n", cpu); 1788 return false; 1789 } 1790 1791 if ((unsigned)local_read(&subbuf->commit) > subbuf_size) { 1792 pr_info("Ring buffer boot meta [%d] buffer invalid commit\n", cpu); 1793 return false; 1794 } 1795 1796 if (test_bit(meta->buffers[i], subbuf_mask)) { 1797 pr_info("Ring buffer boot meta [%d] array has duplicates\n", cpu); 1798 return false; 1799 } 1800 1801 set_bit(meta->buffers[i], subbuf_mask); 1802 subbuf = (void *)subbuf + subbuf_size; 1803 } 1804 1805 return true; 1806 } 1807 1808 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf); 1809 1810 static int rb_read_data_buffer(struct buffer_data_page *dpage, int tail, int cpu, 1811 unsigned long long *timestamp, u64 *delta_ptr) 1812 { 1813 struct ring_buffer_event *event; 1814 u64 ts, delta; 1815 int events = 0; 1816 int e; 1817 1818 *delta_ptr = 0; 1819 *timestamp = 0; 1820 1821 ts = dpage->time_stamp; 1822 1823 for (e = 0; e < tail; e += rb_event_length(event)) { 1824 1825 event = (struct ring_buffer_event *)(dpage->data + e); 1826 1827 switch (event->type_len) { 1828 1829 case RINGBUF_TYPE_TIME_EXTEND: 1830 delta = rb_event_time_stamp(event); 1831 ts += delta; 1832 break; 1833 1834 case RINGBUF_TYPE_TIME_STAMP: 1835 delta = rb_event_time_stamp(event); 1836 delta = rb_fix_abs_ts(delta, ts); 1837 if (delta < ts) { 1838 *delta_ptr = delta; 1839 *timestamp = ts; 1840 return -1; 1841 } 1842 ts = delta; 1843 break; 1844 1845 case RINGBUF_TYPE_PADDING: 1846 if (event->time_delta == 1) 1847 break; 1848 fallthrough; 1849 case RINGBUF_TYPE_DATA: 1850 events++; 1851 ts += event->time_delta; 1852 break; 1853 1854 default: 1855 return -1; 1856 } 1857 } 1858 *timestamp = ts; 1859 return events; 1860 } 1861 1862 static int rb_validate_buffer(struct buffer_data_page *dpage, int cpu) 1863 { 1864 unsigned long long ts; 1865 u64 delta; 1866 int tail; 1867 1868 tail = local_read(&dpage->commit); 1869 return rb_read_data_buffer(dpage, tail, cpu, &ts, &delta); 1870 } 1871 1872 /* If the meta data has been validated, now validate the events */ 1873 static void rb_meta_validate_events(struct ring_buffer_per_cpu *cpu_buffer) 1874 { 1875 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 1876 struct buffer_page *head_page, *orig_head; 1877 unsigned long entry_bytes = 0; 1878 unsigned long entries = 0; 1879 int ret; 1880 u64 ts; 1881 int i; 1882 1883 if (!meta || !meta->head_buffer) 1884 return; 1885 1886 /* Do the reader page first */ 1887 ret = rb_validate_buffer(cpu_buffer->reader_page->page, cpu_buffer->cpu); 1888 if (ret < 0) { 1889 pr_info("Ring buffer reader page is invalid\n"); 1890 goto invalid; 1891 } 1892 entries += ret; 1893 entry_bytes += local_read(&cpu_buffer->reader_page->page->commit); 1894 local_set(&cpu_buffer->reader_page->entries, ret); 1895 1896 orig_head = head_page = cpu_buffer->head_page; 1897 ts = head_page->page->time_stamp; 1898 1899 /* 1900 * Try to rewind the head so that we can read the pages which already 1901 * read in the previous boot. 1902 */ 1903 if (head_page == cpu_buffer->tail_page) 1904 goto skip_rewind; 1905 1906 rb_dec_page(&head_page); 1907 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_dec_page(&head_page)) { 1908 1909 /* Rewind until tail (writer) page. */ 1910 if (head_page == cpu_buffer->tail_page) 1911 break; 1912 1913 /* Ensure the page has older data than head. 
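 * A page reached by rewinding must not carry a time stamp newer than the
 * page we just stepped back from; if it does, the walk has wrapped past
 * the oldest valid data and the rewind stops here.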
*/ 1914 if (ts < head_page->page->time_stamp) 1915 break; 1916 1917 ts = head_page->page->time_stamp; 1918 /* Ensure the page has correct timestamp and some data. */ 1919 if (!ts || rb_page_commit(head_page) == 0) 1920 break; 1921 1922 /* Stop rewind if the page is invalid. */ 1923 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 1924 if (ret < 0) 1925 break; 1926 1927 /* Recover the number of entries and update stats. */ 1928 local_set(&head_page->entries, ret); 1929 if (ret) 1930 local_inc(&cpu_buffer->pages_touched); 1931 entries += ret; 1932 entry_bytes += rb_page_commit(head_page); 1933 } 1934 if (i) 1935 pr_info("Ring buffer [%d] rewound %d pages\n", cpu_buffer->cpu, i); 1936 1937 /* The last rewound page must be skipped. */ 1938 if (head_page != orig_head) 1939 rb_inc_page(&head_page); 1940 1941 /* 1942 * If the ring buffer was rewound, then inject the reader page 1943 * into the location just before the original head page. 1944 */ 1945 if (head_page != orig_head) { 1946 struct buffer_page *bpage = orig_head; 1947 1948 rb_dec_page(&bpage); 1949 /* 1950 * Insert the reader_page before the original head page. 1951 * Since the list encode RB_PAGE flags, general list 1952 * operations should be avoided. 1953 */ 1954 cpu_buffer->reader_page->list.next = &orig_head->list; 1955 cpu_buffer->reader_page->list.prev = orig_head->list.prev; 1956 orig_head->list.prev = &cpu_buffer->reader_page->list; 1957 bpage->list.next = &cpu_buffer->reader_page->list; 1958 1959 /* Make the head_page the reader page */ 1960 cpu_buffer->reader_page = head_page; 1961 bpage = head_page; 1962 rb_inc_page(&head_page); 1963 head_page->list.prev = bpage->list.prev; 1964 rb_dec_page(&bpage); 1965 bpage->list.next = &head_page->list; 1966 rb_set_list_to_head(&bpage->list); 1967 cpu_buffer->pages = &head_page->list; 1968 1969 cpu_buffer->head_page = head_page; 1970 meta->head_buffer = (unsigned long)head_page->page; 1971 1972 /* Reset all the indexes */ 1973 bpage = cpu_buffer->reader_page; 1974 meta->buffers[0] = rb_meta_subbuf_idx(meta, bpage->page); 1975 bpage->id = 0; 1976 1977 for (i = 1, bpage = head_page; i < meta->nr_subbufs; 1978 i++, rb_inc_page(&bpage)) { 1979 meta->buffers[i] = rb_meta_subbuf_idx(meta, bpage->page); 1980 bpage->id = i; 1981 } 1982 1983 /* We'll restart verifying from orig_head */ 1984 head_page = orig_head; 1985 } 1986 1987 skip_rewind: 1988 /* If the commit_buffer is the reader page, update the commit page */ 1989 if (meta->commit_buffer == (unsigned long)cpu_buffer->reader_page->page) { 1990 cpu_buffer->commit_page = cpu_buffer->reader_page; 1991 /* Nothing more to do, the only page is the reader page */ 1992 goto done; 1993 } 1994 1995 /* Iterate until finding the commit page */ 1996 for (i = 0; i < meta->nr_subbufs + 1; i++, rb_inc_page(&head_page)) { 1997 1998 /* Reader page has already been done */ 1999 if (head_page == cpu_buffer->reader_page) 2000 continue; 2001 2002 ret = rb_validate_buffer(head_page->page, cpu_buffer->cpu); 2003 if (ret < 0) { 2004 pr_info("Ring buffer meta [%d] invalid buffer page\n", 2005 cpu_buffer->cpu); 2006 goto invalid; 2007 } 2008 2009 /* If the buffer has content, update pages_touched */ 2010 if (ret) 2011 local_inc(&cpu_buffer->pages_touched); 2012 2013 entries += ret; 2014 entry_bytes += local_read(&head_page->page->commit); 2015 local_set(&cpu_buffer->head_page->entries, ret); 2016 2017 if (head_page == cpu_buffer->commit_page) 2018 break; 2019 } 2020 2021 if (head_page != cpu_buffer->commit_page) { 2022 pr_info("Ring buffer meta [%d] commit 
page not found\n", 2023 cpu_buffer->cpu); 2024 goto invalid; 2025 } 2026 done: 2027 local_set(&cpu_buffer->entries, entries); 2028 local_set(&cpu_buffer->entries_bytes, entry_bytes); 2029 2030 pr_info("Ring buffer meta [%d] is from previous boot!\n", cpu_buffer->cpu); 2031 return; 2032 2033 invalid: 2034 /* The content of the buffers are invalid, reset the meta data */ 2035 meta->head_buffer = 0; 2036 meta->commit_buffer = 0; 2037 2038 /* Reset the reader page */ 2039 local_set(&cpu_buffer->reader_page->entries, 0); 2040 local_set(&cpu_buffer->reader_page->page->commit, 0); 2041 2042 /* Reset all the subbuffers */ 2043 for (i = 0; i < meta->nr_subbufs - 1; i++, rb_inc_page(&head_page)) { 2044 local_set(&head_page->entries, 0); 2045 local_set(&head_page->page->commit, 0); 2046 } 2047 } 2048 2049 static void rb_range_meta_init(struct trace_buffer *buffer, int nr_pages, int scratch_size) 2050 { 2051 struct ring_buffer_cpu_meta *meta; 2052 unsigned long *subbuf_mask; 2053 unsigned long delta; 2054 void *subbuf; 2055 bool valid = false; 2056 int cpu; 2057 int i; 2058 2059 /* Create a mask to test the subbuf array */ 2060 subbuf_mask = bitmap_alloc(nr_pages + 1, GFP_KERNEL); 2061 /* If subbuf_mask fails to allocate, then rb_meta_valid() will return false */ 2062 2063 if (rb_meta_init(buffer, scratch_size)) 2064 valid = true; 2065 2066 for (cpu = 0; cpu < nr_cpu_ids; cpu++) { 2067 void *next_meta; 2068 2069 meta = rb_range_meta(buffer, nr_pages, cpu); 2070 2071 if (valid && rb_cpu_meta_valid(meta, cpu, buffer, nr_pages, subbuf_mask)) { 2072 /* Make the mappings match the current address */ 2073 subbuf = rb_subbufs_from_meta(meta); 2074 delta = (unsigned long)subbuf - meta->first_buffer; 2075 meta->first_buffer += delta; 2076 meta->head_buffer += delta; 2077 meta->commit_buffer += delta; 2078 continue; 2079 } 2080 2081 if (cpu < nr_cpu_ids - 1) 2082 next_meta = rb_range_meta(buffer, nr_pages, cpu + 1); 2083 else 2084 next_meta = (void *)buffer->range_addr_end; 2085 2086 memset(meta, 0, next_meta - (void *)meta); 2087 2088 meta->nr_subbufs = nr_pages + 1; 2089 meta->subbuf_size = PAGE_SIZE; 2090 2091 subbuf = rb_subbufs_from_meta(meta); 2092 2093 meta->first_buffer = (unsigned long)subbuf; 2094 2095 /* 2096 * The buffers[] array holds the order of the sub-buffers 2097 * that are after the meta data. The sub-buffers may 2098 * be swapped out when read and inserted into a different 2099 * location of the ring buffer. Although their addresses 2100 * remain the same, the buffers[] array contains the 2101 * index into the sub-buffers holding their actual order. 
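		 *
		 * An illustrative example (not taken from any real trace): with
		 * four sub-buffers the array starts out as {0, 1, 2, 3}; if the
		 * reader later swaps its reader page with the sub-buffer in
		 * slot 2, the array becomes {2, 1, 0, 3} while the sub-buffers
		 * themselves keep their original addresses.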
2102 */ 2103 for (i = 0; i < meta->nr_subbufs; i++) { 2104 meta->buffers[i] = i; 2105 rb_init_page(subbuf); 2106 subbuf += meta->subbuf_size; 2107 } 2108 } 2109 bitmap_free(subbuf_mask); 2110 } 2111 2112 static void *rbm_start(struct seq_file *m, loff_t *pos) 2113 { 2114 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2115 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2116 unsigned long val; 2117 2118 if (!meta) 2119 return NULL; 2120 2121 if (*pos > meta->nr_subbufs) 2122 return NULL; 2123 2124 val = *pos; 2125 val++; 2126 2127 return (void *)val; 2128 } 2129 2130 static void *rbm_next(struct seq_file *m, void *v, loff_t *pos) 2131 { 2132 (*pos)++; 2133 2134 return rbm_start(m, pos); 2135 } 2136 2137 static int rbm_show(struct seq_file *m, void *v) 2138 { 2139 struct ring_buffer_per_cpu *cpu_buffer = m->private; 2140 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2141 unsigned long val = (unsigned long)v; 2142 2143 if (val == 1) { 2144 seq_printf(m, "head_buffer: %d\n", 2145 rb_meta_subbuf_idx(meta, (void *)meta->head_buffer)); 2146 seq_printf(m, "commit_buffer: %d\n", 2147 rb_meta_subbuf_idx(meta, (void *)meta->commit_buffer)); 2148 seq_printf(m, "subbuf_size: %d\n", meta->subbuf_size); 2149 seq_printf(m, "nr_subbufs: %d\n", meta->nr_subbufs); 2150 return 0; 2151 } 2152 2153 val -= 2; 2154 seq_printf(m, "buffer[%ld]: %d\n", val, meta->buffers[val]); 2155 2156 return 0; 2157 } 2158 2159 static void rbm_stop(struct seq_file *m, void *p) 2160 { 2161 } 2162 2163 static const struct seq_operations rb_meta_seq_ops = { 2164 .start = rbm_start, 2165 .next = rbm_next, 2166 .show = rbm_show, 2167 .stop = rbm_stop, 2168 }; 2169 2170 int ring_buffer_meta_seq_init(struct file *file, struct trace_buffer *buffer, int cpu) 2171 { 2172 struct seq_file *m; 2173 int ret; 2174 2175 ret = seq_open(file, &rb_meta_seq_ops); 2176 if (ret) 2177 return ret; 2178 2179 m = file->private_data; 2180 m->private = buffer->buffers[cpu]; 2181 2182 return 0; 2183 } 2184 2185 /* Map the buffer_pages to the previous head and commit pages */ 2186 static void rb_meta_buffer_update(struct ring_buffer_per_cpu *cpu_buffer, 2187 struct buffer_page *bpage) 2188 { 2189 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 2190 2191 if (meta->head_buffer == (unsigned long)bpage->page) 2192 cpu_buffer->head_page = bpage; 2193 2194 if (meta->commit_buffer == (unsigned long)bpage->page) { 2195 cpu_buffer->commit_page = bpage; 2196 cpu_buffer->tail_page = bpage; 2197 } 2198 } 2199 2200 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2201 long nr_pages, struct list_head *pages) 2202 { 2203 struct trace_buffer *buffer = cpu_buffer->buffer; 2204 struct ring_buffer_cpu_meta *meta = NULL; 2205 struct buffer_page *bpage, *tmp; 2206 bool user_thread = current->mm != NULL; 2207 gfp_t mflags; 2208 long i; 2209 2210 /* 2211 * Check if the available memory is there first. 2212 * Note, si_mem_available() only gives us a rough estimate of available 2213 * memory. It may not be accurate. But we don't care, we just want 2214 * to prevent doing any allocation when it is obvious that it is 2215 * not going to succeed. 2216 */ 2217 i = si_mem_available(); 2218 if (i < nr_pages) 2219 return -ENOMEM; 2220 2221 /* 2222 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 2223 * gracefully without invoking oom-killer and the system is not 2224 * destabilized. 
2225 */ 2226 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 2227 2228 /* 2229 * If a user thread allocates too much, and si_mem_available() 2230 * reports there's enough memory, even though there is not. 2231 * Make sure the OOM killer kills this thread. This can happen 2232 * even with RETRY_MAYFAIL because another task may be doing 2233 * an allocation after this task has taken all memory. 2234 * This is the task the OOM killer needs to take out during this 2235 * loop, even if it was triggered by an allocation somewhere else. 2236 */ 2237 if (user_thread) 2238 set_current_oom_origin(); 2239 2240 if (buffer->range_addr_start) 2241 meta = rb_range_meta(buffer, nr_pages, cpu_buffer->cpu); 2242 2243 for (i = 0; i < nr_pages; i++) { 2244 struct page *page; 2245 2246 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2247 mflags, cpu_to_node(cpu_buffer->cpu)); 2248 if (!bpage) 2249 goto free_pages; 2250 2251 rb_check_bpage(cpu_buffer, bpage); 2252 2253 /* 2254 * Append the pages as for mapped buffers we want to keep 2255 * the order 2256 */ 2257 list_add_tail(&bpage->list, pages); 2258 2259 if (meta) { 2260 /* A range was given. Use that for the buffer page */ 2261 bpage->page = rb_range_buffer(cpu_buffer, i + 1); 2262 if (!bpage->page) 2263 goto free_pages; 2264 /* If this is valid from a previous boot */ 2265 if (meta->head_buffer) 2266 rb_meta_buffer_update(cpu_buffer, bpage); 2267 bpage->range = 1; 2268 bpage->id = i + 1; 2269 } else { 2270 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), 2271 mflags | __GFP_COMP | __GFP_ZERO, 2272 cpu_buffer->buffer->subbuf_order); 2273 if (!page) 2274 goto free_pages; 2275 bpage->page = page_address(page); 2276 rb_init_page(bpage->page); 2277 } 2278 bpage->order = cpu_buffer->buffer->subbuf_order; 2279 2280 if (user_thread && fatal_signal_pending(current)) 2281 goto free_pages; 2282 } 2283 if (user_thread) 2284 clear_current_oom_origin(); 2285 2286 return 0; 2287 2288 free_pages: 2289 list_for_each_entry_safe(bpage, tmp, pages, list) { 2290 list_del_init(&bpage->list); 2291 free_buffer_page(bpage); 2292 } 2293 if (user_thread) 2294 clear_current_oom_origin(); 2295 2296 return -ENOMEM; 2297 } 2298 2299 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 2300 unsigned long nr_pages) 2301 { 2302 LIST_HEAD(pages); 2303 2304 WARN_ON(!nr_pages); 2305 2306 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 2307 return -ENOMEM; 2308 2309 /* 2310 * The ring buffer page list is a circular list that does not 2311 * start and end with a list head. All page list items point to 2312 * other pages. 
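	 *
	 * E.g. with three allocated pages A, B and C the list is simply
	 * A -> B -> C -> A, and cpu_buffer->pages below points at one of them.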
2313 */ 2314 cpu_buffer->pages = pages.next; 2315 list_del(&pages); 2316 2317 cpu_buffer->nr_pages = nr_pages; 2318 2319 rb_check_pages(cpu_buffer); 2320 2321 return 0; 2322 } 2323 2324 static struct ring_buffer_per_cpu * 2325 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 2326 { 2327 struct ring_buffer_per_cpu *cpu_buffer __free(kfree) = NULL; 2328 struct ring_buffer_cpu_meta *meta; 2329 struct buffer_page *bpage; 2330 struct page *page; 2331 int ret; 2332 2333 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 2334 GFP_KERNEL, cpu_to_node(cpu)); 2335 if (!cpu_buffer) 2336 return NULL; 2337 2338 cpu_buffer->cpu = cpu; 2339 cpu_buffer->buffer = buffer; 2340 raw_spin_lock_init(&cpu_buffer->reader_lock); 2341 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 2342 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 2343 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 2344 init_completion(&cpu_buffer->update_done); 2345 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 2346 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 2347 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 2348 mutex_init(&cpu_buffer->mapping_lock); 2349 2350 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 2351 GFP_KERNEL, cpu_to_node(cpu)); 2352 if (!bpage) 2353 return NULL; 2354 2355 rb_check_bpage(cpu_buffer, bpage); 2356 2357 cpu_buffer->reader_page = bpage; 2358 2359 if (buffer->range_addr_start) { 2360 /* 2361 * Range mapped buffers have the same restrictions as memory 2362 * mapped ones do. 2363 */ 2364 cpu_buffer->mapped = 1; 2365 cpu_buffer->ring_meta = rb_range_meta(buffer, nr_pages, cpu); 2366 bpage->page = rb_range_buffer(cpu_buffer, 0); 2367 if (!bpage->page) 2368 goto fail_free_reader; 2369 if (cpu_buffer->ring_meta->head_buffer) 2370 rb_meta_buffer_update(cpu_buffer, bpage); 2371 bpage->range = 1; 2372 } else { 2373 page = alloc_pages_node(cpu_to_node(cpu), 2374 GFP_KERNEL | __GFP_COMP | __GFP_ZERO, 2375 cpu_buffer->buffer->subbuf_order); 2376 if (!page) 2377 goto fail_free_reader; 2378 bpage->page = page_address(page); 2379 rb_init_page(bpage->page); 2380 } 2381 2382 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 2383 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2384 2385 ret = rb_allocate_pages(cpu_buffer, nr_pages); 2386 if (ret < 0) 2387 goto fail_free_reader; 2388 2389 rb_meta_validate_events(cpu_buffer); 2390 2391 /* If the boot meta was valid then this has already been updated */ 2392 meta = cpu_buffer->ring_meta; 2393 if (!meta || !meta->head_buffer || 2394 !cpu_buffer->head_page || !cpu_buffer->commit_page || !cpu_buffer->tail_page) { 2395 if (meta && meta->head_buffer && 2396 (cpu_buffer->head_page || cpu_buffer->commit_page || cpu_buffer->tail_page)) { 2397 pr_warn("Ring buffer meta buffers not all mapped\n"); 2398 if (!cpu_buffer->head_page) 2399 pr_warn(" Missing head_page\n"); 2400 if (!cpu_buffer->commit_page) 2401 pr_warn(" Missing commit_page\n"); 2402 if (!cpu_buffer->tail_page) 2403 pr_warn(" Missing tail_page\n"); 2404 } 2405 2406 cpu_buffer->head_page 2407 = list_entry(cpu_buffer->pages, struct buffer_page, list); 2408 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 2409 2410 rb_head_page_activate(cpu_buffer); 2411 2412 if (cpu_buffer->ring_meta) 2413 meta->commit_buffer = meta->head_buffer; 2414 } else { 2415 /* The valid meta buffer still needs to activate the head page */ 2416 rb_head_page_activate(cpu_buffer); 2417 } 2418 2419 
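	/*
	 * Hand the fully initialised buffer to the caller; return_ptr()
	 * disarms the __free(kfree) cleanup attached to cpu_buffer above.
	 */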
return_ptr(cpu_buffer); 2420 2421 fail_free_reader: 2422 free_buffer_page(cpu_buffer->reader_page); 2423 2424 return NULL; 2425 } 2426 2427 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 2428 { 2429 struct list_head *head = cpu_buffer->pages; 2430 struct buffer_page *bpage, *tmp; 2431 2432 irq_work_sync(&cpu_buffer->irq_work.work); 2433 2434 free_buffer_page(cpu_buffer->reader_page); 2435 2436 if (head) { 2437 rb_head_page_deactivate(cpu_buffer); 2438 2439 list_for_each_entry_safe(bpage, tmp, head, list) { 2440 list_del_init(&bpage->list); 2441 free_buffer_page(bpage); 2442 } 2443 bpage = list_entry(head, struct buffer_page, list); 2444 free_buffer_page(bpage); 2445 } 2446 2447 free_page((unsigned long)cpu_buffer->free_page); 2448 2449 kfree(cpu_buffer); 2450 } 2451 2452 static struct trace_buffer *alloc_buffer(unsigned long size, unsigned flags, 2453 int order, unsigned long start, 2454 unsigned long end, 2455 unsigned long scratch_size, 2456 struct lock_class_key *key) 2457 { 2458 struct trace_buffer *buffer __free(kfree) = NULL; 2459 long nr_pages; 2460 int subbuf_size; 2461 int bsize; 2462 int cpu; 2463 int ret; 2464 2465 /* keep it in its own cache line */ 2466 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 2467 GFP_KERNEL); 2468 if (!buffer) 2469 return NULL; 2470 2471 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 2472 return NULL; 2473 2474 buffer->subbuf_order = order; 2475 subbuf_size = (PAGE_SIZE << order); 2476 buffer->subbuf_size = subbuf_size - BUF_PAGE_HDR_SIZE; 2477 2478 /* Max payload is buffer page size - header (8bytes) */ 2479 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 2480 2481 buffer->flags = flags; 2482 buffer->clock = trace_clock_local; 2483 buffer->reader_lock_key = key; 2484 2485 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 2486 init_waitqueue_head(&buffer->irq_work.waiters); 2487 2488 buffer->cpus = nr_cpu_ids; 2489 2490 bsize = sizeof(void *) * nr_cpu_ids; 2491 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 2492 GFP_KERNEL); 2493 if (!buffer->buffers) 2494 goto fail_free_cpumask; 2495 2496 /* If start/end are specified, then that overrides size */ 2497 if (start && end) { 2498 unsigned long buffers_start; 2499 unsigned long ptr; 2500 int n; 2501 2502 /* Make sure that start is word aligned */ 2503 start = ALIGN(start, sizeof(long)); 2504 2505 /* scratch_size needs to be aligned too */ 2506 scratch_size = ALIGN(scratch_size, sizeof(long)); 2507 2508 /* Subtract the buffer meta data and word aligned */ 2509 buffers_start = start + sizeof(struct ring_buffer_cpu_meta); 2510 buffers_start = ALIGN(buffers_start, sizeof(long)); 2511 buffers_start += scratch_size; 2512 2513 /* Calculate the size for the per CPU data */ 2514 size = end - buffers_start; 2515 size = size / nr_cpu_ids; 2516 2517 /* 2518 * The number of sub-buffers (nr_pages) is determined by the 2519 * total size allocated minus the meta data size. 2520 * Then that is divided by the number of per CPU buffers 2521 * needed, plus account for the integer array index that 2522 * will be appended to the meta data. 
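		 *
		 * Rough illustration (the numbers are hypothetical, not taken
		 * from the code): with 4096-byte sub-buffers and about 1 MiB
		 * of the range per CPU, this works out to roughly
		 * (1048576 - sizeof(struct ring_buffer_cpu_meta)) /
		 * (4096 + sizeof(int)) ~= 255 sub-buffers, one of which is
		 * then set aside as the reader page below.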
2523 */ 2524 nr_pages = (size - sizeof(struct ring_buffer_cpu_meta)) / 2525 (subbuf_size + sizeof(int)); 2526 /* Need at least two pages plus the reader page */ 2527 if (nr_pages < 3) 2528 goto fail_free_buffers; 2529 2530 again: 2531 /* Make sure that the size fits aligned */ 2532 for (n = 0, ptr = buffers_start; n < nr_cpu_ids; n++) { 2533 ptr += sizeof(struct ring_buffer_cpu_meta) + 2534 sizeof(int) * nr_pages; 2535 ptr = ALIGN(ptr, subbuf_size); 2536 ptr += subbuf_size * nr_pages; 2537 } 2538 if (ptr > end) { 2539 if (nr_pages <= 3) 2540 goto fail_free_buffers; 2541 nr_pages--; 2542 goto again; 2543 } 2544 2545 /* nr_pages should not count the reader page */ 2546 nr_pages--; 2547 buffer->range_addr_start = start; 2548 buffer->range_addr_end = end; 2549 2550 rb_range_meta_init(buffer, nr_pages, scratch_size); 2551 } else { 2552 2553 /* need at least two pages */ 2554 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2555 if (nr_pages < 2) 2556 nr_pages = 2; 2557 } 2558 2559 cpu = raw_smp_processor_id(); 2560 cpumask_set_cpu(cpu, buffer->cpumask); 2561 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 2562 if (!buffer->buffers[cpu]) 2563 goto fail_free_buffers; 2564 2565 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2566 if (ret < 0) 2567 goto fail_free_buffers; 2568 2569 mutex_init(&buffer->mutex); 2570 2571 return_ptr(buffer); 2572 2573 fail_free_buffers: 2574 for_each_buffer_cpu(buffer, cpu) { 2575 if (buffer->buffers[cpu]) 2576 rb_free_cpu_buffer(buffer->buffers[cpu]); 2577 } 2578 kfree(buffer->buffers); 2579 2580 fail_free_cpumask: 2581 free_cpumask_var(buffer->cpumask); 2582 2583 return NULL; 2584 } 2585 2586 /** 2587 * __ring_buffer_alloc - allocate a new ring_buffer 2588 * @size: the size in bytes per cpu that is needed. 2589 * @flags: attributes to set for the ring buffer. 2590 * @key: ring buffer reader_lock_key. 2591 * 2592 * Currently the only flag that is available is the RB_FL_OVERWRITE 2593 * flag. This flag means that the buffer will overwrite old data 2594 * when the buffer wraps. If this flag is not set, the buffer will 2595 * drop data when the tail hits the head. 2596 */ 2597 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 2598 struct lock_class_key *key) 2599 { 2600 /* Default buffer page size - one system page */ 2601 return alloc_buffer(size, flags, 0, 0, 0, 0, key); 2602 2603 } 2604 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 2605 2606 /** 2607 * __ring_buffer_alloc_range - allocate a new ring_buffer from existing memory 2608 * @size: the size in bytes per cpu that is needed. 2609 * @flags: attributes to set for the ring buffer. 2610 * @order: sub-buffer order 2611 * @start: start of allocated range 2612 * @range_size: size of allocated range 2613 * @scratch_size: size of scratch area (for preallocated memory buffers) 2614 * @key: ring buffer reader_lock_key. 2615 * 2616 * Currently the only flag that is available is the RB_FL_OVERWRITE 2617 * flag. This flag means that the buffer will overwrite old data 2618 * when the buffer wraps. If this flag is not set, the buffer will 2619 * drop data when the tail hits the head. 
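 *
 * Return: the allocated trace_buffer on success, NULL on failure.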
2620 */ 2621 struct trace_buffer *__ring_buffer_alloc_range(unsigned long size, unsigned flags, 2622 int order, unsigned long start, 2623 unsigned long range_size, 2624 unsigned long scratch_size, 2625 struct lock_class_key *key) 2626 { 2627 return alloc_buffer(size, flags, order, start, start + range_size, 2628 scratch_size, key); 2629 } 2630 2631 void *ring_buffer_meta_scratch(struct trace_buffer *buffer, unsigned int *size) 2632 { 2633 struct ring_buffer_meta *meta; 2634 void *ptr; 2635 2636 if (!buffer || !buffer->meta) 2637 return NULL; 2638 2639 meta = buffer->meta; 2640 2641 ptr = (void *)ALIGN((unsigned long)meta + sizeof(*meta), sizeof(long)); 2642 2643 if (size) 2644 *size = (void *)meta + meta->buffers_offset - ptr; 2645 2646 return ptr; 2647 } 2648 2649 /** 2650 * ring_buffer_free - free a ring buffer. 2651 * @buffer: the buffer to free. 2652 */ 2653 void 2654 ring_buffer_free(struct trace_buffer *buffer) 2655 { 2656 int cpu; 2657 2658 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 2659 2660 irq_work_sync(&buffer->irq_work.work); 2661 2662 for_each_buffer_cpu(buffer, cpu) 2663 rb_free_cpu_buffer(buffer->buffers[cpu]); 2664 2665 kfree(buffer->buffers); 2666 free_cpumask_var(buffer->cpumask); 2667 2668 kfree(buffer); 2669 } 2670 EXPORT_SYMBOL_GPL(ring_buffer_free); 2671 2672 void ring_buffer_set_clock(struct trace_buffer *buffer, 2673 u64 (*clock)(void)) 2674 { 2675 buffer->clock = clock; 2676 } 2677 2678 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 2679 { 2680 buffer->time_stamp_abs = abs; 2681 } 2682 2683 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 2684 { 2685 return buffer->time_stamp_abs; 2686 } 2687 2688 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 2689 { 2690 return local_read(&bpage->entries) & RB_WRITE_MASK; 2691 } 2692 2693 static inline unsigned long rb_page_write(struct buffer_page *bpage) 2694 { 2695 return local_read(&bpage->write) & RB_WRITE_MASK; 2696 } 2697 2698 static bool 2699 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 2700 { 2701 struct list_head *tail_page, *to_remove, *next_page; 2702 struct buffer_page *to_remove_page, *tmp_iter_page; 2703 struct buffer_page *last_page, *first_page; 2704 unsigned long nr_removed; 2705 unsigned long head_bit; 2706 int page_entries; 2707 2708 head_bit = 0; 2709 2710 raw_spin_lock_irq(&cpu_buffer->reader_lock); 2711 atomic_inc(&cpu_buffer->record_disabled); 2712 /* 2713 * We don't race with the readers since we have acquired the reader 2714 * lock. We also don't race with writers after disabling recording. 2715 * This makes it easy to figure out the first and the last page to be 2716 * removed from the list. We unlink all the pages in between including 2717 * the first and last pages. This is done in a busy loop so that we 2718 * lose the least number of traces. 2719 * The pages are freed after we restart recording and unlock readers. 
2720 */ 2721 tail_page = &cpu_buffer->tail_page->list; 2722 2723 /* 2724 * tail page might be on reader page, we remove the next page 2725 * from the ring buffer 2726 */ 2727 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 2728 tail_page = rb_list_head(tail_page->next); 2729 to_remove = tail_page; 2730 2731 /* start of pages to remove */ 2732 first_page = list_entry(rb_list_head(to_remove->next), 2733 struct buffer_page, list); 2734 2735 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 2736 to_remove = rb_list_head(to_remove)->next; 2737 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 2738 } 2739 /* Read iterators need to reset themselves when some pages removed */ 2740 cpu_buffer->pages_removed += nr_removed; 2741 2742 next_page = rb_list_head(to_remove)->next; 2743 2744 /* 2745 * Now we remove all pages between tail_page and next_page. 2746 * Make sure that we have head_bit value preserved for the 2747 * next page 2748 */ 2749 tail_page->next = (struct list_head *)((unsigned long)next_page | 2750 head_bit); 2751 next_page = rb_list_head(next_page); 2752 next_page->prev = tail_page; 2753 2754 /* make sure pages points to a valid page in the ring buffer */ 2755 cpu_buffer->pages = next_page; 2756 cpu_buffer->cnt++; 2757 2758 /* update head page */ 2759 if (head_bit) 2760 cpu_buffer->head_page = list_entry(next_page, 2761 struct buffer_page, list); 2762 2763 /* pages are removed, resume tracing and then free the pages */ 2764 atomic_dec(&cpu_buffer->record_disabled); 2765 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 2766 2767 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 2768 2769 /* last buffer page to remove */ 2770 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 2771 list); 2772 tmp_iter_page = first_page; 2773 2774 do { 2775 cond_resched(); 2776 2777 to_remove_page = tmp_iter_page; 2778 rb_inc_page(&tmp_iter_page); 2779 2780 /* update the counters */ 2781 page_entries = rb_page_entries(to_remove_page); 2782 if (page_entries) { 2783 /* 2784 * If something was added to this page, it was full 2785 * since it is not the tail page. So we deduct the 2786 * bytes consumed in ring buffer from here. 2787 * Increment overrun to account for the lost events. 2788 */ 2789 local_add(page_entries, &cpu_buffer->overrun); 2790 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 2791 local_inc(&cpu_buffer->pages_lost); 2792 } 2793 2794 /* 2795 * We have already removed references to this list item, just 2796 * free up the buffer_page and its page 2797 */ 2798 free_buffer_page(to_remove_page); 2799 nr_removed--; 2800 2801 } while (to_remove_page != last_page); 2802 2803 RB_WARN_ON(cpu_buffer, nr_removed); 2804 2805 return nr_removed == 0; 2806 } 2807 2808 static bool 2809 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 2810 { 2811 struct list_head *pages = &cpu_buffer->new_pages; 2812 unsigned long flags; 2813 bool success; 2814 int retries; 2815 2816 /* Can be called at early boot up, where interrupts must not been enabled */ 2817 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2818 /* 2819 * We are holding the reader lock, so the reader page won't be swapped 2820 * in the ring buffer. Now we are racing with the writer trying to 2821 * move head page and the tail page. 2822 * We are going to adapt the reader page update process where: 2823 * 1. We first splice the start and end of list of new pages between 2824 * the head page and its previous page. 2825 * 2. 
We cmpxchg the prev_page->next to point from head page to the 2826 * start of new pages list. 2827 * 3. Finally, we update the head->prev to the end of new list. 2828 * 2829 * We will try this process 10 times, to make sure that we don't keep 2830 * spinning. 2831 */ 2832 retries = 10; 2833 success = false; 2834 while (retries--) { 2835 struct list_head *head_page, *prev_page; 2836 struct list_head *last_page, *first_page; 2837 struct list_head *head_page_with_bit; 2838 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 2839 2840 if (!hpage) 2841 break; 2842 head_page = &hpage->list; 2843 prev_page = head_page->prev; 2844 2845 first_page = pages->next; 2846 last_page = pages->prev; 2847 2848 head_page_with_bit = (struct list_head *) 2849 ((unsigned long)head_page | RB_PAGE_HEAD); 2850 2851 last_page->next = head_page_with_bit; 2852 first_page->prev = prev_page; 2853 2854 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 2855 if (try_cmpxchg(&prev_page->next, 2856 &head_page_with_bit, first_page)) { 2857 /* 2858 * yay, we replaced the page pointer to our new list, 2859 * now, we just have to update to head page's prev 2860 * pointer to point to end of list 2861 */ 2862 head_page->prev = last_page; 2863 cpu_buffer->cnt++; 2864 success = true; 2865 break; 2866 } 2867 } 2868 2869 if (success) 2870 INIT_LIST_HEAD(pages); 2871 /* 2872 * If we weren't successful in adding in new pages, warn and stop 2873 * tracing 2874 */ 2875 RB_WARN_ON(cpu_buffer, !success); 2876 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2877 2878 /* free pages if they weren't inserted */ 2879 if (!success) { 2880 struct buffer_page *bpage, *tmp; 2881 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2882 list) { 2883 list_del_init(&bpage->list); 2884 free_buffer_page(bpage); 2885 } 2886 } 2887 return success; 2888 } 2889 2890 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2891 { 2892 bool success; 2893 2894 if (cpu_buffer->nr_pages_to_update > 0) 2895 success = rb_insert_pages(cpu_buffer); 2896 else 2897 success = rb_remove_pages(cpu_buffer, 2898 -cpu_buffer->nr_pages_to_update); 2899 2900 if (success) 2901 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2902 } 2903 2904 static void update_pages_handler(struct work_struct *work) 2905 { 2906 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2907 struct ring_buffer_per_cpu, update_pages_work); 2908 rb_update_pages(cpu_buffer); 2909 complete(&cpu_buffer->update_done); 2910 } 2911 2912 /** 2913 * ring_buffer_resize - resize the ring buffer 2914 * @buffer: the buffer to resize. 2915 * @size: the new size. 2916 * @cpu_id: the cpu buffer to resize 2917 * 2918 * Minimum size is 2 * buffer->subbuf_size. 2919 * 2920 * Returns 0 on success and < 0 on failure. 2921 */ 2922 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2923 int cpu_id) 2924 { 2925 struct ring_buffer_per_cpu *cpu_buffer; 2926 unsigned long nr_pages; 2927 int cpu, err; 2928 2929 /* 2930 * Always succeed at resizing a non-existent buffer: 2931 */ 2932 if (!buffer) 2933 return 0; 2934 2935 /* Make sure the requested buffer exists */ 2936 if (cpu_id != RING_BUFFER_ALL_CPUS && 2937 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2938 return 0; 2939 2940 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2941 2942 /* we need a minimum of two pages */ 2943 if (nr_pages < 2) 2944 nr_pages = 2; 2945 2946 /* 2947 * Keep CPUs from coming online while resizing to synchronize 2948 * with new per CPU buffers being created. 
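	 * (The guard(cpus_read_lock)() below takes cpus_read_lock() and drops
	 *  it automatically when this function returns.)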
2949 */ 2950 guard(cpus_read_lock)(); 2951 2952 /* prevent another thread from changing buffer sizes */ 2953 mutex_lock(&buffer->mutex); 2954 atomic_inc(&buffer->resizing); 2955 2956 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2957 /* 2958 * Don't succeed if resizing is disabled, as a reader might be 2959 * manipulating the ring buffer and is expecting a sane state while 2960 * this is true. 2961 */ 2962 for_each_buffer_cpu(buffer, cpu) { 2963 cpu_buffer = buffer->buffers[cpu]; 2964 if (atomic_read(&cpu_buffer->resize_disabled)) { 2965 err = -EBUSY; 2966 goto out_err_unlock; 2967 } 2968 } 2969 2970 /* calculate the pages to update */ 2971 for_each_buffer_cpu(buffer, cpu) { 2972 cpu_buffer = buffer->buffers[cpu]; 2973 2974 cpu_buffer->nr_pages_to_update = nr_pages - 2975 cpu_buffer->nr_pages; 2976 /* 2977 * nothing more to do for removing pages or no update 2978 */ 2979 if (cpu_buffer->nr_pages_to_update <= 0) 2980 continue; 2981 /* 2982 * to add pages, make sure all new pages can be 2983 * allocated without receiving ENOMEM 2984 */ 2985 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2986 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2987 &cpu_buffer->new_pages)) { 2988 /* not enough memory for new pages */ 2989 err = -ENOMEM; 2990 goto out_err; 2991 } 2992 2993 cond_resched(); 2994 } 2995 2996 /* 2997 * Fire off all the required work handlers 2998 * We can't schedule on offline CPUs, but it's not necessary 2999 * since we can change their buffer sizes without any race. 3000 */ 3001 for_each_buffer_cpu(buffer, cpu) { 3002 cpu_buffer = buffer->buffers[cpu]; 3003 if (!cpu_buffer->nr_pages_to_update) 3004 continue; 3005 3006 /* Can't run something on an offline CPU. */ 3007 if (!cpu_online(cpu)) { 3008 rb_update_pages(cpu_buffer); 3009 cpu_buffer->nr_pages_to_update = 0; 3010 } else { 3011 /* Run directly if possible. */ 3012 migrate_disable(); 3013 if (cpu != smp_processor_id()) { 3014 migrate_enable(); 3015 schedule_work_on(cpu, 3016 &cpu_buffer->update_pages_work); 3017 } else { 3018 update_pages_handler(&cpu_buffer->update_pages_work); 3019 migrate_enable(); 3020 } 3021 } 3022 } 3023 3024 /* wait for all the updates to complete */ 3025 for_each_buffer_cpu(buffer, cpu) { 3026 cpu_buffer = buffer->buffers[cpu]; 3027 if (!cpu_buffer->nr_pages_to_update) 3028 continue; 3029 3030 if (cpu_online(cpu)) 3031 wait_for_completion(&cpu_buffer->update_done); 3032 cpu_buffer->nr_pages_to_update = 0; 3033 } 3034 3035 } else { 3036 cpu_buffer = buffer->buffers[cpu_id]; 3037 3038 if (nr_pages == cpu_buffer->nr_pages) 3039 goto out; 3040 3041 /* 3042 * Don't succeed if resizing is disabled, as a reader might be 3043 * manipulating the ring buffer and is expecting a sane state while 3044 * this is true. 3045 */ 3046 if (atomic_read(&cpu_buffer->resize_disabled)) { 3047 err = -EBUSY; 3048 goto out_err_unlock; 3049 } 3050 3051 cpu_buffer->nr_pages_to_update = nr_pages - 3052 cpu_buffer->nr_pages; 3053 3054 INIT_LIST_HEAD(&cpu_buffer->new_pages); 3055 if (cpu_buffer->nr_pages_to_update > 0 && 3056 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 3057 &cpu_buffer->new_pages)) { 3058 err = -ENOMEM; 3059 goto out_err; 3060 } 3061 3062 /* Can't run something on an offline CPU. */ 3063 if (!cpu_online(cpu_id)) 3064 rb_update_pages(cpu_buffer); 3065 else { 3066 /* Run directly if possible. 
*/ 3067 migrate_disable(); 3068 if (cpu_id == smp_processor_id()) { 3069 rb_update_pages(cpu_buffer); 3070 migrate_enable(); 3071 } else { 3072 migrate_enable(); 3073 schedule_work_on(cpu_id, 3074 &cpu_buffer->update_pages_work); 3075 wait_for_completion(&cpu_buffer->update_done); 3076 } 3077 } 3078 3079 cpu_buffer->nr_pages_to_update = 0; 3080 } 3081 3082 out: 3083 /* 3084 * The ring buffer resize can happen with the ring buffer 3085 * enabled, so that the update disturbs the tracing as little 3086 * as possible. But if the buffer is disabled, we do not need 3087 * to worry about that, and we can take the time to verify 3088 * that the buffer is not corrupt. 3089 */ 3090 if (atomic_read(&buffer->record_disabled)) { 3091 atomic_inc(&buffer->record_disabled); 3092 /* 3093 * Even though the buffer was disabled, we must make sure 3094 * that it is truly disabled before calling rb_check_pages. 3095 * There could have been a race between checking 3096 * record_disable and incrementing it. 3097 */ 3098 synchronize_rcu(); 3099 for_each_buffer_cpu(buffer, cpu) { 3100 cpu_buffer = buffer->buffers[cpu]; 3101 rb_check_pages(cpu_buffer); 3102 } 3103 atomic_dec(&buffer->record_disabled); 3104 } 3105 3106 atomic_dec(&buffer->resizing); 3107 mutex_unlock(&buffer->mutex); 3108 return 0; 3109 3110 out_err: 3111 for_each_buffer_cpu(buffer, cpu) { 3112 struct buffer_page *bpage, *tmp; 3113 3114 cpu_buffer = buffer->buffers[cpu]; 3115 cpu_buffer->nr_pages_to_update = 0; 3116 3117 if (list_empty(&cpu_buffer->new_pages)) 3118 continue; 3119 3120 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 3121 list) { 3122 list_del_init(&bpage->list); 3123 free_buffer_page(bpage); 3124 } 3125 } 3126 out_err_unlock: 3127 atomic_dec(&buffer->resizing); 3128 mutex_unlock(&buffer->mutex); 3129 return err; 3130 } 3131 EXPORT_SYMBOL_GPL(ring_buffer_resize); 3132 3133 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 3134 { 3135 mutex_lock(&buffer->mutex); 3136 if (val) 3137 buffer->flags |= RB_FL_OVERWRITE; 3138 else 3139 buffer->flags &= ~RB_FL_OVERWRITE; 3140 mutex_unlock(&buffer->mutex); 3141 } 3142 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 3143 3144 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 3145 { 3146 return bpage->page->data + index; 3147 } 3148 3149 static __always_inline struct ring_buffer_event * 3150 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 3151 { 3152 return __rb_page_index(cpu_buffer->reader_page, 3153 cpu_buffer->reader_page->read); 3154 } 3155 3156 static struct ring_buffer_event * 3157 rb_iter_head_event(struct ring_buffer_iter *iter) 3158 { 3159 struct ring_buffer_event *event; 3160 struct buffer_page *iter_head_page = iter->head_page; 3161 unsigned long commit; 3162 unsigned length; 3163 3164 if (iter->head != iter->next_event) 3165 return iter->event; 3166 3167 /* 3168 * When the writer goes across pages, it issues a cmpxchg which 3169 * is a mb(), which will synchronize with the rmb here. 3170 * (see rb_tail_page_update() and __rb_reserve_next()) 3171 */ 3172 commit = rb_page_commit(iter_head_page); 3173 smp_rmb(); 3174 3175 /* An event needs to be at least 8 bytes in size */ 3176 if (iter->head > commit - 8) 3177 goto reset; 3178 3179 event = __rb_page_index(iter_head_page, iter->head); 3180 length = rb_event_length(event); 3181 3182 /* 3183 * READ_ONCE() doesn't work on functions and we don't want the 3184 * compiler doing any crazy optimizations with length. 
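	 * A plain barrier() is enough here: it only has to keep the compiler
	 * from reordering or caching 'length'; the smp_rmb() calls in this
	 * function provide the ordering against the writer.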
3185 */ 3186 barrier(); 3187 3188 if ((iter->head + length) > commit || length > iter->event_size) 3189 /* Writer corrupted the read? */ 3190 goto reset; 3191 3192 memcpy(iter->event, event, length); 3193 /* 3194 * If the page stamp is still the same after this rmb() then the 3195 * event was safely copied without the writer entering the page. 3196 */ 3197 smp_rmb(); 3198 3199 /* Make sure the page didn't change since we read this */ 3200 if (iter->page_stamp != iter_head_page->page->time_stamp || 3201 commit > rb_page_commit(iter_head_page)) 3202 goto reset; 3203 3204 iter->next_event = iter->head + length; 3205 return iter->event; 3206 reset: 3207 /* Reset to the beginning */ 3208 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3209 iter->head = 0; 3210 iter->next_event = 0; 3211 iter->missed_events = 1; 3212 return NULL; 3213 } 3214 3215 /* Size is determined by what has been committed */ 3216 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 3217 { 3218 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 3219 } 3220 3221 static __always_inline unsigned 3222 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 3223 { 3224 return rb_page_commit(cpu_buffer->commit_page); 3225 } 3226 3227 static __always_inline unsigned 3228 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 3229 { 3230 unsigned long addr = (unsigned long)event; 3231 3232 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 3233 3234 return addr - BUF_PAGE_HDR_SIZE; 3235 } 3236 3237 static void rb_inc_iter(struct ring_buffer_iter *iter) 3238 { 3239 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 3240 3241 /* 3242 * The iterator could be on the reader page (it starts there). 3243 * But the head could have moved, since the reader was 3244 * found. Check for this case and assign the iterator 3245 * to the head page instead of next. 3246 */ 3247 if (iter->head_page == cpu_buffer->reader_page) 3248 iter->head_page = rb_set_head_page(cpu_buffer); 3249 else 3250 rb_inc_page(&iter->head_page); 3251 3252 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 3253 iter->head = 0; 3254 iter->next_event = 0; 3255 } 3256 3257 /* Return the index into the sub-buffers for a given sub-buffer */ 3258 static int rb_meta_subbuf_idx(struct ring_buffer_cpu_meta *meta, void *subbuf) 3259 { 3260 void *subbuf_array; 3261 3262 subbuf_array = (void *)meta + sizeof(int) * meta->nr_subbufs; 3263 subbuf_array = (void *)ALIGN((unsigned long)subbuf_array, meta->subbuf_size); 3264 return (subbuf - subbuf_array) / meta->subbuf_size; 3265 } 3266 3267 static void rb_update_meta_head(struct ring_buffer_per_cpu *cpu_buffer, 3268 struct buffer_page *next_page) 3269 { 3270 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3271 unsigned long old_head = (unsigned long)next_page->page; 3272 unsigned long new_head; 3273 3274 rb_inc_page(&next_page); 3275 new_head = (unsigned long)next_page->page; 3276 3277 /* 3278 * Only move it forward once, if something else came in and 3279 * moved it forward, then we don't want to touch it. 
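	 * The cmpxchg() below only succeeds if head_buffer still holds the
	 * old head we sampled above; if something else already advanced it,
	 * this is a harmless no-op.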
 */
	(void)cmpxchg(&meta->head_buffer, old_head, new_head);
}

static void rb_update_meta_reader(struct ring_buffer_per_cpu *cpu_buffer,
				  struct buffer_page *reader)
{
	struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta;
	void *old_reader = cpu_buffer->reader_page->page;
	void *new_reader = reader->page;
	int id;

	id = reader->id;
	cpu_buffer->reader_page->id = id;
	reader->id = 0;

	meta->buffers[0] = rb_meta_subbuf_idx(meta, new_reader);
	meta->buffers[id] = rb_meta_subbuf_idx(meta, old_reader);

	/* The head pointer is the one after the reader */
	rb_update_meta_head(cpu_buffer, reader);
}

/*
 * rb_handle_head_page - writer hit the head page
 *
 * Returns: +1 to retry page
 *           0 to continue
 *          -1 on error
 */
static int
rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		    struct buffer_page *tail_page,
		    struct buffer_page *next_page)
{
	struct buffer_page *new_head;
	int entries;
	int type;
	int ret;

	entries = rb_page_entries(next_page);

	/*
	 * The hard part is here. We need to move the head
	 * forward, and protect against both readers on
	 * other CPUs and writers coming in via interrupts.
	 */
	type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
				       RB_PAGE_HEAD);

	/*
	 * type can be one of four:
	 *  NORMAL - an interrupt already moved it for us
	 *  HEAD   - we are the first to get here.
	 *  UPDATE - we are the interrupt interrupting
	 *           a current move.
	 *  MOVED  - a reader on another CPU moved the next
	 *           pointer to its reader page. Give up
	 *           and try again.
	 */

	switch (type) {
	case RB_PAGE_HEAD:
		/*
		 * We changed the head to UPDATE, thus
		 * it is our responsibility to update
		 * the counters.
		 */
		local_add(entries, &cpu_buffer->overrun);
		local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
		local_inc(&cpu_buffer->pages_lost);

		if (cpu_buffer->ring_meta)
			rb_update_meta_head(cpu_buffer, next_page);
		/*
		 * The entries will be zeroed out when we move the
		 * tail page.
		 */

		/* still more to do */
		break;

	case RB_PAGE_UPDATE:
		/*
		 * This is an interrupt that interrupted the
		 * previous update. Still more to do.
		 */
		break;
	case RB_PAGE_NORMAL:
		/*
		 * An interrupt came in before the update
		 * and processed this for us.
		 * Nothing left to do.
		 */
		return 1;
	case RB_PAGE_MOVED:
		/*
		 * The reader is on another CPU and just did
		 * a swap with our next_page.
		 * Try again.
		 */
		return 1;
	default:
		RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
		return -1;
	}

	/*
	 * Now that we are here, the old head pointer is
	 * set to UPDATE. This will keep the reader from
	 * swapping the head page with the reader page.
	 * The reader (on another CPU) will spin till
	 * we are finished.
	 *
	 * We just need to protect against interrupts
	 * doing the job. We will set the next pointer
	 * to HEAD. After that, we set the old pointer
	 * to NORMAL, but only if it was HEAD before.
	 * Otherwise we are an interrupt, and only
	 * want the outermost commit to reset it.
3400 */ 3401 new_head = next_page; 3402 rb_inc_page(&new_head); 3403 3404 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 3405 RB_PAGE_NORMAL); 3406 3407 /* 3408 * Valid returns are: 3409 * HEAD - an interrupt came in and already set it. 3410 * NORMAL - One of two things: 3411 * 1) We really set it. 3412 * 2) A bunch of interrupts came in and moved 3413 * the page forward again. 3414 */ 3415 switch (ret) { 3416 case RB_PAGE_HEAD: 3417 case RB_PAGE_NORMAL: 3418 /* OK */ 3419 break; 3420 default: 3421 RB_WARN_ON(cpu_buffer, 1); 3422 return -1; 3423 } 3424 3425 /* 3426 * It is possible that an interrupt came in, 3427 * set the head up, then more interrupts came in 3428 * and moved it again. When we get back here, 3429 * the page would have been set to NORMAL but we 3430 * just set it back to HEAD. 3431 * 3432 * How do you detect this? Well, if that happened 3433 * the tail page would have moved. 3434 */ 3435 if (ret == RB_PAGE_NORMAL) { 3436 struct buffer_page *buffer_tail_page; 3437 3438 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 3439 /* 3440 * If the tail had moved passed next, then we need 3441 * to reset the pointer. 3442 */ 3443 if (buffer_tail_page != tail_page && 3444 buffer_tail_page != next_page) 3445 rb_head_page_set_normal(cpu_buffer, new_head, 3446 next_page, 3447 RB_PAGE_HEAD); 3448 } 3449 3450 /* 3451 * If this was the outer most commit (the one that 3452 * changed the original pointer from HEAD to UPDATE), 3453 * then it is up to us to reset it to NORMAL. 3454 */ 3455 if (type == RB_PAGE_HEAD) { 3456 ret = rb_head_page_set_normal(cpu_buffer, next_page, 3457 tail_page, 3458 RB_PAGE_UPDATE); 3459 if (RB_WARN_ON(cpu_buffer, 3460 ret != RB_PAGE_UPDATE)) 3461 return -1; 3462 } 3463 3464 return 0; 3465 } 3466 3467 static inline void 3468 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 3469 unsigned long tail, struct rb_event_info *info) 3470 { 3471 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 3472 struct buffer_page *tail_page = info->tail_page; 3473 struct ring_buffer_event *event; 3474 unsigned long length = info->length; 3475 3476 /* 3477 * Only the event that crossed the page boundary 3478 * must fill the old tail_page with padding. 3479 */ 3480 if (tail >= bsize) { 3481 /* 3482 * If the page was filled, then we still need 3483 * to update the real_end. Reset it to zero 3484 * and the reader will ignore it. 3485 */ 3486 if (tail == bsize) 3487 tail_page->real_end = 0; 3488 3489 local_sub(length, &tail_page->write); 3490 return; 3491 } 3492 3493 event = __rb_page_index(tail_page, tail); 3494 3495 /* 3496 * Save the original length to the meta data. 3497 * This will be used by the reader to add lost event 3498 * counter. 3499 */ 3500 tail_page->real_end = tail; 3501 3502 /* 3503 * If this event is bigger than the minimum size, then 3504 * we need to be careful that we don't subtract the 3505 * write counter enough to allow another writer to slip 3506 * in on this page. 3507 * We put in a discarded commit instead, to make sure 3508 * that this space is not used again, and this space will 3509 * not be accounted into 'entries_bytes'. 3510 * 3511 * If we are less than the minimum size, we don't need to 3512 * worry about it. 
3513 */ 3514 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 3515 /* No room for any events */ 3516 3517 /* Mark the rest of the page with padding */ 3518 rb_event_set_padding(event); 3519 3520 /* Make sure the padding is visible before the write update */ 3521 smp_wmb(); 3522 3523 /* Set the write back to the previous setting */ 3524 local_sub(length, &tail_page->write); 3525 return; 3526 } 3527 3528 /* Put in a discarded event */ 3529 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 3530 event->type_len = RINGBUF_TYPE_PADDING; 3531 /* time delta must be non zero */ 3532 event->time_delta = 1; 3533 3534 /* account for padding bytes */ 3535 local_add(bsize - tail, &cpu_buffer->entries_bytes); 3536 3537 /* Make sure the padding is visible before the tail_page->write update */ 3538 smp_wmb(); 3539 3540 /* Set write to end of buffer */ 3541 length = (tail + length) - bsize; 3542 local_sub(length, &tail_page->write); 3543 } 3544 3545 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 3546 3547 /* 3548 * This is the slow path, force gcc not to inline it. 3549 */ 3550 static noinline struct ring_buffer_event * 3551 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 3552 unsigned long tail, struct rb_event_info *info) 3553 { 3554 struct buffer_page *tail_page = info->tail_page; 3555 struct buffer_page *commit_page = cpu_buffer->commit_page; 3556 struct trace_buffer *buffer = cpu_buffer->buffer; 3557 struct buffer_page *next_page; 3558 int ret; 3559 3560 next_page = tail_page; 3561 3562 rb_inc_page(&next_page); 3563 3564 /* 3565 * If for some reason, we had an interrupt storm that made 3566 * it all the way around the buffer, bail, and warn 3567 * about it. 3568 */ 3569 if (unlikely(next_page == commit_page)) { 3570 local_inc(&cpu_buffer->commit_overrun); 3571 goto out_reset; 3572 } 3573 3574 /* 3575 * This is where the fun begins! 3576 * 3577 * We are fighting against races between a reader that 3578 * could be on another CPU trying to swap its reader 3579 * page with the buffer head. 3580 * 3581 * We are also fighting against interrupts coming in and 3582 * moving the head or tail on us as well. 3583 * 3584 * If the next page is the head page then we have filled 3585 * the buffer, unless the commit page is still on the 3586 * reader page. 3587 */ 3588 if (rb_is_head_page(next_page, &tail_page->list)) { 3589 3590 /* 3591 * If the commit is not on the reader page, then 3592 * move the header page. 3593 */ 3594 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 3595 /* 3596 * If we are not in overwrite mode, 3597 * this is easy, just stop here. 3598 */ 3599 if (!(buffer->flags & RB_FL_OVERWRITE)) { 3600 local_inc(&cpu_buffer->dropped_events); 3601 goto out_reset; 3602 } 3603 3604 ret = rb_handle_head_page(cpu_buffer, 3605 tail_page, 3606 next_page); 3607 if (ret < 0) 3608 goto out_reset; 3609 if (ret) 3610 goto out_again; 3611 } else { 3612 /* 3613 * We need to be careful here too. The 3614 * commit page could still be on the reader 3615 * page. We could have a small buffer, and 3616 * have filled up the buffer with events 3617 * from interrupts and such, and wrapped. 3618 * 3619 * Note, if the tail page is also on the 3620 * reader_page, we let it move out. 
3621 */ 3622 if (unlikely((cpu_buffer->commit_page != 3623 cpu_buffer->tail_page) && 3624 (cpu_buffer->commit_page == 3625 cpu_buffer->reader_page))) { 3626 local_inc(&cpu_buffer->commit_overrun); 3627 goto out_reset; 3628 } 3629 } 3630 } 3631 3632 rb_tail_page_update(cpu_buffer, tail_page, next_page); 3633 3634 out_again: 3635 3636 rb_reset_tail(cpu_buffer, tail, info); 3637 3638 /* Commit what we have for now. */ 3639 rb_end_commit(cpu_buffer); 3640 /* rb_end_commit() decs committing */ 3641 local_inc(&cpu_buffer->committing); 3642 3643 /* fail and let the caller try again */ 3644 return ERR_PTR(-EAGAIN); 3645 3646 out_reset: 3647 /* reset write */ 3648 rb_reset_tail(cpu_buffer, tail, info); 3649 3650 return NULL; 3651 } 3652 3653 /* Slow path */ 3654 static struct ring_buffer_event * 3655 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 3656 struct ring_buffer_event *event, u64 delta, bool abs) 3657 { 3658 if (abs) 3659 event->type_len = RINGBUF_TYPE_TIME_STAMP; 3660 else 3661 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 3662 3663 /* Not the first event on the page, or not delta? */ 3664 if (abs || rb_event_index(cpu_buffer, event)) { 3665 event->time_delta = delta & TS_MASK; 3666 event->array[0] = delta >> TS_SHIFT; 3667 } else { 3668 /* nope, just zero it */ 3669 event->time_delta = 0; 3670 event->array[0] = 0; 3671 } 3672 3673 return skip_time_extend(event); 3674 } 3675 3676 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 3677 static inline bool sched_clock_stable(void) 3678 { 3679 return true; 3680 } 3681 #endif 3682 3683 static void 3684 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3685 struct rb_event_info *info) 3686 { 3687 u64 write_stamp; 3688 3689 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 3690 (unsigned long long)info->delta, 3691 (unsigned long long)info->ts, 3692 (unsigned long long)info->before, 3693 (unsigned long long)info->after, 3694 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 3695 sched_clock_stable() ? "" : 3696 "If you just came from a suspend/resume,\n" 3697 "please switch to the trace global clock:\n" 3698 " echo global > /sys/kernel/tracing/trace_clock\n" 3699 "or add trace_clock=global to the kernel command line\n"); 3700 } 3701 3702 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 3703 struct ring_buffer_event **event, 3704 struct rb_event_info *info, 3705 u64 *delta, 3706 unsigned int *length) 3707 { 3708 bool abs = info->add_timestamp & 3709 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 3710 3711 if (unlikely(info->delta > (1ULL << 59))) { 3712 /* 3713 * Some timers can use more than 59 bits, and when a timestamp 3714 * is added to the buffer, it will lose those bits. 3715 */ 3716 if (abs && (info->ts & TS_MSB)) { 3717 info->delta &= ABS_TS_MASK; 3718 3719 /* did the clock go backwards */ 3720 } else if (info->before == info->after && info->before > info->ts) { 3721 /* not interrupted */ 3722 static int once; 3723 3724 /* 3725 * This is possible with a recalibrating of the TSC. 3726 * Do not produce a call stack, but just report it. 
3727 */ 3728 if (!once) { 3729 once++; 3730 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 3731 info->before, info->ts); 3732 } 3733 } else 3734 rb_check_timestamp(cpu_buffer, info); 3735 if (!abs) 3736 info->delta = 0; 3737 } 3738 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 3739 *length -= RB_LEN_TIME_EXTEND; 3740 *delta = 0; 3741 } 3742 3743 /** 3744 * rb_update_event - update event type and data 3745 * @cpu_buffer: The per cpu buffer of the @event 3746 * @event: the event to update 3747 * @info: The info to update the @event with (contains length and delta) 3748 * 3749 * Update the type and data fields of the @event. The length 3750 * is the actual size that is written to the ring buffer, 3751 * and with this, we can determine what to place into the 3752 * data field. 3753 */ 3754 static void 3755 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 3756 struct ring_buffer_event *event, 3757 struct rb_event_info *info) 3758 { 3759 unsigned length = info->length; 3760 u64 delta = info->delta; 3761 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 3762 3763 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 3764 cpu_buffer->event_stamp[nest] = info->ts; 3765 3766 /* 3767 * If we need to add a timestamp, then we 3768 * add it to the start of the reserved space. 3769 */ 3770 if (unlikely(info->add_timestamp)) 3771 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 3772 3773 event->time_delta = delta; 3774 length -= RB_EVNT_HDR_SIZE; 3775 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 3776 event->type_len = 0; 3777 event->array[0] = length; 3778 } else 3779 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 3780 } 3781 3782 static unsigned rb_calculate_event_length(unsigned length) 3783 { 3784 struct ring_buffer_event event; /* Used only for sizeof array */ 3785 3786 /* zero length can cause confusions */ 3787 if (!length) 3788 length++; 3789 3790 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 3791 length += sizeof(event.array[0]); 3792 3793 length += RB_EVNT_HDR_SIZE; 3794 length = ALIGN(length, RB_ARCH_ALIGNMENT); 3795 3796 /* 3797 * In case the time delta is larger than the 27 bits for it 3798 * in the header, we need to add a timestamp. If another 3799 * event comes in when trying to discard this one to increase 3800 * the length, then the timestamp will be added in the allocated 3801 * space of this event. If length is bigger than the size needed 3802 * for the TIME_EXTEND, then padding has to be used. The events 3803 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 3804 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 3805 * As length is a multiple of 4, we only need to worry if it 3806 * is 12 (RB_LEN_TIME_EXTEND + 4). 
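	 *
	 * For example, on a configuration using 4 byte alignment, an 8 byte
	 * payload becomes 8 + RB_EVNT_HDR_SIZE = 12 after rounding, which is
	 * exactly the problematic size, so it is padded up to 16 here.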
3807 */ 3808 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 3809 length += RB_ALIGNMENT; 3810 3811 return length; 3812 } 3813 3814 static inline bool 3815 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 3816 struct ring_buffer_event *event) 3817 { 3818 unsigned long new_index, old_index; 3819 struct buffer_page *bpage; 3820 unsigned long addr; 3821 3822 new_index = rb_event_index(cpu_buffer, event); 3823 old_index = new_index + rb_event_ts_length(event); 3824 addr = (unsigned long)event; 3825 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3826 3827 bpage = READ_ONCE(cpu_buffer->tail_page); 3828 3829 /* 3830 * Make sure the tail_page is still the same and 3831 * the next write location is the end of this event 3832 */ 3833 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 3834 unsigned long write_mask = 3835 local_read(&bpage->write) & ~RB_WRITE_MASK; 3836 unsigned long event_length = rb_event_length(event); 3837 3838 /* 3839 * Make the before_stamp different from the write_stamp 3840 * so that the next event adds an absolute 3841 * value and does not rely on the saved write stamp, which 3842 * is now going to be bogus. 3843 * 3844 * By setting the before_stamp to zero, the next event 3845 * is not going to use the write_stamp and will instead 3846 * create an absolute timestamp. This means there's no 3847 * reason to update the write_stamp! 3848 */ 3849 rb_time_set(&cpu_buffer->before_stamp, 0); 3850 3851 /* 3852 * If an event were to come in now, it would see that the 3853 * write_stamp and the before_stamp are different, and assume 3854 * that this event just added itself before updating 3855 * the write stamp. The interrupting event will fix the 3856 * write stamp for us, and use an absolute timestamp. 3857 */ 3858 3859 /* 3860 * This is on the tail page. It is possible that 3861 * a write could come in and move the tail page 3862 * and write to the next page. That is fine 3863 * because we just shorten what is on this page. 3864 */ 3865 old_index += write_mask; 3866 new_index += write_mask; 3867 3868 /* caution: old_index gets updated on cmpxchg failure */ 3869 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 3870 /* update counters */ 3871 local_sub(event_length, &cpu_buffer->entries_bytes); 3872 return true; 3873 } 3874 } 3875 3876 /* could not discard */ 3877 return false; 3878 } 3879 3880 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 3881 { 3882 local_inc(&cpu_buffer->committing); 3883 local_inc(&cpu_buffer->commits); 3884 } 3885 3886 static __always_inline void 3887 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 3888 { 3889 unsigned long max_count; 3890 3891 /* 3892 * We only race with interrupts and NMIs on this CPU. 3893 * If we own the commit event, then we can commit 3894 * all others that interrupted us, since the interruptions 3895 * are in stack format (they finish before they come 3896 * back to us). This allows us to do a simple loop to 3897 * assign the commit to the tail. 3898 */ 3899 again: 3900 max_count = cpu_buffer->nr_pages * 100; 3901 3902 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 3903 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 3904 return; 3905 if (RB_WARN_ON(cpu_buffer, 3906 rb_is_reader_page(cpu_buffer->tail_page))) 3907 return; 3908 /* 3909 * No need for a memory barrier here, as the update 3910 * of the tail_page did it for this page.
3911 */ 3912 local_set(&cpu_buffer->commit_page->page->commit, 3913 rb_page_write(cpu_buffer->commit_page)); 3914 rb_inc_page(&cpu_buffer->commit_page); 3915 if (cpu_buffer->ring_meta) { 3916 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 3917 meta->commit_buffer = (unsigned long)cpu_buffer->commit_page->page; 3918 } 3919 /* add barrier to keep gcc from optimizing too much */ 3920 barrier(); 3921 } 3922 while (rb_commit_index(cpu_buffer) != 3923 rb_page_write(cpu_buffer->commit_page)) { 3924 3925 /* Make sure the readers see the content of what is committed. */ 3926 smp_wmb(); 3927 local_set(&cpu_buffer->commit_page->page->commit, 3928 rb_page_write(cpu_buffer->commit_page)); 3929 RB_WARN_ON(cpu_buffer, 3930 local_read(&cpu_buffer->commit_page->page->commit) & 3931 ~RB_WRITE_MASK); 3932 barrier(); 3933 } 3934 3935 /* again, keep gcc from optimizing */ 3936 barrier(); 3937 3938 /* 3939 * If an interrupt came in just after the first while loop 3940 * and pushed the tail page forward, we will be left with 3941 * a dangling commit that will never go forward. 3942 */ 3943 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3944 goto again; 3945 } 3946 3947 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3948 { 3949 unsigned long commits; 3950 3951 if (RB_WARN_ON(cpu_buffer, 3952 !local_read(&cpu_buffer->committing))) 3953 return; 3954 3955 again: 3956 commits = local_read(&cpu_buffer->commits); 3957 /* synchronize with interrupts */ 3958 barrier(); 3959 if (local_read(&cpu_buffer->committing) == 1) 3960 rb_set_commit_to_write(cpu_buffer); 3961 3962 local_dec(&cpu_buffer->committing); 3963 3964 /* synchronize with interrupts */ 3965 barrier(); 3966 3967 /* 3968 * Need to account for interrupts coming in between the 3969 * updating of the commit page and the clearing of the 3970 * committing counter. 
3971 */ 3972 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3973 !local_read(&cpu_buffer->committing)) { 3974 local_inc(&cpu_buffer->committing); 3975 goto again; 3976 } 3977 } 3978 3979 static inline void rb_event_discard(struct ring_buffer_event *event) 3980 { 3981 if (extended_time(event)) 3982 event = skip_time_extend(event); 3983 3984 /* array[0] holds the actual length for the discarded event */ 3985 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3986 event->type_len = RINGBUF_TYPE_PADDING; 3987 /* time delta must be non zero */ 3988 if (!event->time_delta) 3989 event->time_delta = 1; 3990 } 3991 3992 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3993 { 3994 local_inc(&cpu_buffer->entries); 3995 rb_end_commit(cpu_buffer); 3996 } 3997 3998 static __always_inline void 3999 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 4000 { 4001 if (buffer->irq_work.waiters_pending) { 4002 buffer->irq_work.waiters_pending = false; 4003 /* irq_work_queue() supplies its own memory barriers */ 4004 irq_work_queue(&buffer->irq_work.work); 4005 } 4006 4007 if (cpu_buffer->irq_work.waiters_pending) { 4008 cpu_buffer->irq_work.waiters_pending = false; 4009 /* irq_work_queue() supplies its own memory barriers */ 4010 irq_work_queue(&cpu_buffer->irq_work.work); 4011 } 4012 4013 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 4014 return; 4015 4016 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 4017 return; 4018 4019 if (!cpu_buffer->irq_work.full_waiters_pending) 4020 return; 4021 4022 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 4023 4024 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 4025 return; 4026 4027 cpu_buffer->irq_work.wakeup_full = true; 4028 cpu_buffer->irq_work.full_waiters_pending = false; 4029 /* irq_work_queue() supplies its own memory barriers */ 4030 irq_work_queue(&cpu_buffer->irq_work.work); 4031 } 4032 4033 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 4034 # define do_ring_buffer_record_recursion() \ 4035 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 4036 #else 4037 # define do_ring_buffer_record_recursion() do { } while (0) 4038 #endif 4039 4040 /* 4041 * The lock and unlock are done within a preempt disable section. 4042 * The current_context per_cpu variable can only be modified 4043 * by the current task between lock and unlock. But it can 4044 * be modified more than once via an interrupt. To pass this 4045 * information from the lock to the unlock without having to 4046 * access the 'in_interrupt()' functions again (which do show 4047 * a bit of overhead in something as critical as function tracing), 4048 * we use a bitmask trick. 4049 * 4050 * bit 1 = NMI context 4051 * bit 2 = IRQ context 4052 * bit 3 = SoftIRQ context 4053 * bit 4 = normal context. 4054 * 4055 * This works because this is the order of contexts that can 4056 * preempt other contexts. A SoftIRQ never preempts an IRQ 4057 * context. 4058 * 4059 * When the context is determined, the corresponding bit is 4060 * checked and set (if it was set, then a recursion of that context 4061 * happened). 4062 * 4063 * On unlock, we need to clear this bit. To do so, just subtract 4064 * 1 from the current_context and AND it to itself.
4065 * 4066 * (binary) 4067 * 101 - 1 = 100 4068 * 101 & 100 = 100 (clearing bit zero) 4069 * 4070 * 1010 - 1 = 1001 4071 * 1010 & 1001 = 1000 (clearing bit 1) 4072 * 4073 * The least significant bit can be cleared this way, and it 4074 * just so happens that it is the same bit corresponding to 4075 * the current context. 4076 * 4077 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 4078 * is set when a recursion is detected at the current context, and if 4079 * the TRANSITION bit is already set, it will fail the recursion. 4080 * This is needed because there's a lag between the changing of 4081 * interrupt context and updating the preempt count. In this case, 4082 * a false positive will be found. To handle this, one extra recursion 4083 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 4084 * bit is already set, then it is considered a recursion and the function 4085 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 4086 * 4087 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 4088 * to be cleared. Even if it wasn't the context that set it. That is, 4089 * if an interrupt comes in while NORMAL bit is set and the ring buffer 4090 * is called before preempt_count() is updated, since the check will 4091 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 4092 * NMI then comes in, it will set the NMI bit, but when the NMI code 4093 * does the trace_recursive_unlock() it will clear the TRANSITION bit 4094 * and leave the NMI bit set. But this is fine, because the interrupt 4095 * code that set the TRANSITION bit will then clear the NMI bit when it 4096 * calls trace_recursive_unlock(). If another NMI comes in, it will 4097 * set the TRANSITION bit and continue. 4098 * 4099 * Note: The TRANSITION bit only handles a single transition between context. 4100 */ 4101 4102 static __always_inline bool 4103 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 4104 { 4105 unsigned int val = cpu_buffer->current_context; 4106 int bit = interrupt_context_level(); 4107 4108 bit = RB_CTX_NORMAL - bit; 4109 4110 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 4111 /* 4112 * It is possible that this was called by transitioning 4113 * between interrupt context, and preempt_count() has not 4114 * been updated yet. In this case, use the TRANSITION bit. 4115 */ 4116 bit = RB_CTX_TRANSITION; 4117 if (val & (1 << (bit + cpu_buffer->nest))) { 4118 do_ring_buffer_record_recursion(); 4119 return true; 4120 } 4121 } 4122 4123 val |= (1 << (bit + cpu_buffer->nest)); 4124 cpu_buffer->current_context = val; 4125 4126 return false; 4127 } 4128 4129 static __always_inline void 4130 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 4131 { 4132 cpu_buffer->current_context &= 4133 cpu_buffer->current_context - (1 << cpu_buffer->nest); 4134 } 4135 4136 /* The recursive locking above uses 5 bits */ 4137 #define NESTED_BITS 5 4138 4139 /** 4140 * ring_buffer_nest_start - Allow to trace while nested 4141 * @buffer: The ring buffer to modify 4142 * 4143 * The ring buffer has a safety mechanism to prevent recursion. 4144 * But there may be a case where a trace needs to be done while 4145 * tracing something else. In this case, calling this function 4146 * will allow this function to nest within a currently active 4147 * ring_buffer_lock_reserve(). 
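 *
 * For example, the intended pairing looks roughly like this (an illustrative
 * sketch only; the outer reserve/commit pair is assumed to already be in
 * progress):
 *
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	...
 *	ring_buffer_nest_start(buffer);
 *	nested = ring_buffer_lock_reserve(buffer, nested_len);
 *	...
 *	ring_buffer_unlock_commit(buffer);	(commits the nested event)
 *	ring_buffer_nest_end(buffer);
 *	...
 *	ring_buffer_unlock_commit(buffer);	(commits the outer event)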
4148 * 4149 * Call this function before calling another ring_buffer_lock_reserve() and 4150 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 4151 */ 4152 void ring_buffer_nest_start(struct trace_buffer *buffer) 4153 { 4154 struct ring_buffer_per_cpu *cpu_buffer; 4155 int cpu; 4156 4157 /* Enabled by ring_buffer_nest_end() */ 4158 preempt_disable_notrace(); 4159 cpu = raw_smp_processor_id(); 4160 cpu_buffer = buffer->buffers[cpu]; 4161 /* This is the shift value for the above recursive locking */ 4162 cpu_buffer->nest += NESTED_BITS; 4163 } 4164 4165 /** 4166 * ring_buffer_nest_end - Allow to trace while nested 4167 * @buffer: The ring buffer to modify 4168 * 4169 * Must be called after ring_buffer_nest_start() and after the 4170 * ring_buffer_unlock_commit(). 4171 */ 4172 void ring_buffer_nest_end(struct trace_buffer *buffer) 4173 { 4174 struct ring_buffer_per_cpu *cpu_buffer; 4175 int cpu; 4176 4177 /* disabled by ring_buffer_nest_start() */ 4178 cpu = raw_smp_processor_id(); 4179 cpu_buffer = buffer->buffers[cpu]; 4180 /* This is the shift value for the above recursive locking */ 4181 cpu_buffer->nest -= NESTED_BITS; 4182 preempt_enable_notrace(); 4183 } 4184 4185 /** 4186 * ring_buffer_unlock_commit - commit a reserved 4187 * @buffer: The buffer to commit to 4188 * 4189 * This commits the data to the ring buffer, and releases any locks held. 4190 * 4191 * Must be paired with ring_buffer_lock_reserve. 4192 */ 4193 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 4194 { 4195 struct ring_buffer_per_cpu *cpu_buffer; 4196 int cpu = raw_smp_processor_id(); 4197 4198 cpu_buffer = buffer->buffers[cpu]; 4199 4200 rb_commit(cpu_buffer); 4201 4202 rb_wakeups(buffer, cpu_buffer); 4203 4204 trace_recursive_unlock(cpu_buffer); 4205 4206 preempt_enable_notrace(); 4207 4208 return 0; 4209 } 4210 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 4211 4212 /* Special value to validate all deltas on a page. 
*/ 4213 #define CHECK_FULL_PAGE 1L 4214 4215 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 4216 4217 static const char *show_irq_str(int bits) 4218 { 4219 static const char * type[] = { 4220 ".", // 0 4221 "s", // 1 4222 "h", // 2 4223 "Hs", // 3 4224 "n", // 4 4225 "Ns", // 5 4226 "Nh", // 6 4227 "NHs", // 7 4228 }; 4229 4230 return type[bits]; 4231 } 4232 4233 /* Assume this is a trace event */ 4234 static const char *show_flags(struct ring_buffer_event *event) 4235 { 4236 struct trace_entry *entry; 4237 int bits = 0; 4238 4239 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4240 return "X"; 4241 4242 entry = ring_buffer_event_data(event); 4243 4244 if (entry->flags & TRACE_FLAG_SOFTIRQ) 4245 bits |= 1; 4246 4247 if (entry->flags & TRACE_FLAG_HARDIRQ) 4248 bits |= 2; 4249 4250 if (entry->flags & TRACE_FLAG_NMI) 4251 bits |= 4; 4252 4253 return show_irq_str(bits); 4254 } 4255 4256 static const char *show_irq(struct ring_buffer_event *event) 4257 { 4258 struct trace_entry *entry; 4259 4260 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 4261 return ""; 4262 4263 entry = ring_buffer_event_data(event); 4264 if (entry->flags & TRACE_FLAG_IRQS_OFF) 4265 return "d"; 4266 return ""; 4267 } 4268 4269 static const char *show_interrupt_level(void) 4270 { 4271 unsigned long pc = preempt_count(); 4272 unsigned char level = 0; 4273 4274 if (pc & SOFTIRQ_OFFSET) 4275 level |= 1; 4276 4277 if (pc & HARDIRQ_MASK) 4278 level |= 2; 4279 4280 if (pc & NMI_MASK) 4281 level |= 4; 4282 4283 return show_irq_str(level); 4284 } 4285 4286 static void dump_buffer_page(struct buffer_data_page *bpage, 4287 struct rb_event_info *info, 4288 unsigned long tail) 4289 { 4290 struct ring_buffer_event *event; 4291 u64 ts, delta; 4292 int e; 4293 4294 ts = bpage->time_stamp; 4295 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 4296 4297 for (e = 0; e < tail; e += rb_event_length(event)) { 4298 4299 event = (struct ring_buffer_event *)(bpage->data + e); 4300 4301 switch (event->type_len) { 4302 4303 case RINGBUF_TYPE_TIME_EXTEND: 4304 delta = rb_event_time_stamp(event); 4305 ts += delta; 4306 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 4307 e, ts, delta); 4308 break; 4309 4310 case RINGBUF_TYPE_TIME_STAMP: 4311 delta = rb_event_time_stamp(event); 4312 ts = rb_fix_abs_ts(delta, ts); 4313 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 4314 e, ts, delta); 4315 break; 4316 4317 case RINGBUF_TYPE_PADDING: 4318 ts += event->time_delta; 4319 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 4320 e, ts, event->time_delta); 4321 break; 4322 4323 case RINGBUF_TYPE_DATA: 4324 ts += event->time_delta; 4325 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 4326 e, ts, event->time_delta, 4327 show_flags(event), show_irq(event)); 4328 break; 4329 4330 default: 4331 break; 4332 } 4333 } 4334 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 4335 } 4336 4337 static DEFINE_PER_CPU(atomic_t, checking); 4338 static atomic_t ts_dump; 4339 4340 #define buffer_warn_return(fmt, ...) 
\ 4341 do { \ 4342 /* If another report is happening, ignore this one */ \ 4343 if (atomic_inc_return(&ts_dump) != 1) { \ 4344 atomic_dec(&ts_dump); \ 4345 goto out; \ 4346 } \ 4347 atomic_inc(&cpu_buffer->record_disabled); \ 4348 pr_warn(fmt, ##__VA_ARGS__); \ 4349 dump_buffer_page(bpage, info, tail); \ 4350 atomic_dec(&ts_dump); \ 4351 /* There's some cases in boot up that this can happen */ \ 4352 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 4353 /* Do not re-enable checking */ \ 4354 return; \ 4355 } while (0) 4356 4357 /* 4358 * Check if the current event time stamp matches the deltas on 4359 * the buffer page. 4360 */ 4361 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4362 struct rb_event_info *info, 4363 unsigned long tail) 4364 { 4365 struct buffer_data_page *bpage; 4366 u64 ts, delta; 4367 bool full = false; 4368 int ret; 4369 4370 bpage = info->tail_page->page; 4371 4372 if (tail == CHECK_FULL_PAGE) { 4373 full = true; 4374 tail = local_read(&bpage->commit); 4375 } else if (info->add_timestamp & 4376 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 4377 /* Ignore events with absolute time stamps */ 4378 return; 4379 } 4380 4381 /* 4382 * Do not check the first event (skip possible extends too). 4383 * Also do not check if previous events have not been committed. 4384 */ 4385 if (tail <= 8 || tail > local_read(&bpage->commit)) 4386 return; 4387 4388 /* 4389 * If this interrupted another event, 4390 */ 4391 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 4392 goto out; 4393 4394 ret = rb_read_data_buffer(bpage, tail, cpu_buffer->cpu, &ts, &delta); 4395 if (ret < 0) { 4396 if (delta < ts) { 4397 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 4398 cpu_buffer->cpu, ts, delta); 4399 goto out; 4400 } 4401 } 4402 if ((full && ts > info->ts) || 4403 (!full && ts + info->delta != info->ts)) { 4404 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 4405 cpu_buffer->cpu, 4406 ts + info->delta, info->ts, info->delta, 4407 info->before, info->after, 4408 full ? " (full)" : "", show_interrupt_level()); 4409 } 4410 out: 4411 atomic_dec(this_cpu_ptr(&checking)); 4412 } 4413 #else 4414 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 4415 struct rb_event_info *info, 4416 unsigned long tail) 4417 { 4418 } 4419 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 4420 4421 static struct ring_buffer_event * 4422 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 4423 struct rb_event_info *info) 4424 { 4425 struct ring_buffer_event *event; 4426 struct buffer_page *tail_page; 4427 unsigned long tail, write, w; 4428 4429 /* Don't let the compiler play games with cpu_buffer->tail_page */ 4430 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 4431 4432 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 4433 barrier(); 4434 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4435 rb_time_read(&cpu_buffer->write_stamp, &info->after); 4436 barrier(); 4437 info->ts = rb_time_stamp(cpu_buffer->buffer); 4438 4439 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 4440 info->delta = info->ts; 4441 } else { 4442 /* 4443 * If interrupting an event time update, we may need an 4444 * absolute timestamp. 4445 * Don't bother if this is the start of a new page (w == 0). 
4446 */ 4447 if (!w) { 4448 /* Use the sub-buffer timestamp */ 4449 info->delta = 0; 4450 } else if (unlikely(info->before != info->after)) { 4451 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 4452 info->length += RB_LEN_TIME_EXTEND; 4453 } else { 4454 info->delta = info->ts - info->after; 4455 if (unlikely(test_time_stamp(info->delta))) { 4456 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 4457 info->length += RB_LEN_TIME_EXTEND; 4458 } 4459 } 4460 } 4461 4462 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 4463 4464 /*C*/ write = local_add_return(info->length, &tail_page->write); 4465 4466 /* set write to only the index of the write */ 4467 write &= RB_WRITE_MASK; 4468 4469 tail = write - info->length; 4470 4471 /* See if we shot past the end of this buffer page */ 4472 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 4473 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 4474 return rb_move_tail(cpu_buffer, tail, info); 4475 } 4476 4477 if (likely(tail == w)) { 4478 /* Nothing interrupted us between A and C */ 4479 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 4480 /* 4481 * If something came in between C and D, the write stamp 4482 * may now not be in sync. But that's fine as the before_stamp 4483 * will be different and then the next event will just be forced 4484 * to use an absolute timestamp. 4485 */ 4486 if (likely(!(info->add_timestamp & 4487 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4488 /* This did not interrupt any time update */ 4489 info->delta = info->ts - info->after; 4490 else 4491 /* Just use full timestamp for interrupting event */ 4492 info->delta = info->ts; 4493 check_buffer(cpu_buffer, info, tail); 4494 } else { 4495 u64 ts; 4496 /* SLOW PATH - Interrupted between A and C */ 4497 4498 /* Save the old before_stamp */ 4499 rb_time_read(&cpu_buffer->before_stamp, &info->before); 4500 4501 /* 4502 * Read a new timestamp and update the before_stamp to make 4503 * the next event after this one force using an absolute 4504 * timestamp. This is in case an interrupt were to come in 4505 * between E and F. 4506 */ 4507 ts = rb_time_stamp(cpu_buffer->buffer); 4508 rb_time_set(&cpu_buffer->before_stamp, ts); 4509 4510 barrier(); 4511 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 4512 barrier(); 4513 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 4514 info->after == info->before && info->after < ts) { 4515 /* 4516 * Nothing came after this event between C and F, it is 4517 * safe to use info->after for the delta as it 4518 * matched info->before and is still valid. 4519 */ 4520 info->delta = ts - info->after; 4521 } else { 4522 /* 4523 * Interrupted between C and F: 4524 * Lost the previous event's time stamp. Just set the 4525 * delta to zero, and this will be the same time as 4526 * the event this event interrupted. And the events that 4527 * came after this will still be correct (as they would 4528 * have built their delta on the previous event). 4529 */ 4530 info->delta = 0; 4531 } 4532 info->ts = ts; 4533 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 4534 } 4535 4536 /* 4537 * If this is the first commit on the page, then it has the same 4538 * timestamp as the page itself.
4539 */ 4540 if (unlikely(!tail && !(info->add_timestamp & 4541 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 4542 info->delta = 0; 4543 4544 /* We reserved something on the buffer */ 4545 4546 event = __rb_page_index(tail_page, tail); 4547 rb_update_event(cpu_buffer, event, info); 4548 4549 local_inc(&tail_page->entries); 4550 4551 /* 4552 * If this is the first commit on the page, then update 4553 * its timestamp. 4554 */ 4555 if (unlikely(!tail)) 4556 tail_page->page->time_stamp = info->ts; 4557 4558 /* account for these added bytes */ 4559 local_add(info->length, &cpu_buffer->entries_bytes); 4560 4561 return event; 4562 } 4563 4564 static __always_inline struct ring_buffer_event * 4565 rb_reserve_next_event(struct trace_buffer *buffer, 4566 struct ring_buffer_per_cpu *cpu_buffer, 4567 unsigned long length) 4568 { 4569 struct ring_buffer_event *event; 4570 struct rb_event_info info; 4571 int nr_loops = 0; 4572 int add_ts_default; 4573 4574 /* 4575 * ring buffer does cmpxchg as well as atomic64 operations 4576 * (which some archs use locking for atomic64), make sure this 4577 * is safe in NMI context 4578 */ 4579 if ((!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) || 4580 IS_ENABLED(CONFIG_GENERIC_ATOMIC64)) && 4581 (unlikely(in_nmi()))) { 4582 return NULL; 4583 } 4584 4585 rb_start_commit(cpu_buffer); 4586 /* The commit page can not change after this */ 4587 4588 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 4589 /* 4590 * Due to the ability to swap a cpu buffer from a buffer 4591 * it is possible it was swapped before we committed. 4592 * (committing stops a swap). We check for it here and 4593 * if it happened, we have to fail the write. 4594 */ 4595 barrier(); 4596 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 4597 local_dec(&cpu_buffer->committing); 4598 local_dec(&cpu_buffer->commits); 4599 return NULL; 4600 } 4601 #endif 4602 4603 info.length = rb_calculate_event_length(length); 4604 4605 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 4606 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 4607 info.length += RB_LEN_TIME_EXTEND; 4608 if (info.length > cpu_buffer->buffer->max_data_size) 4609 goto out_fail; 4610 } else { 4611 add_ts_default = RB_ADD_STAMP_NONE; 4612 } 4613 4614 again: 4615 info.add_timestamp = add_ts_default; 4616 info.delta = 0; 4617 4618 /* 4619 * We allow for interrupts to reenter here and do a trace. 4620 * If one does, it will cause this original code to loop 4621 * back here. Even with heavy interrupts happening, this 4622 * should only happen a few times in a row. If this happens 4623 * 1000 times in a row, there must be either an interrupt 4624 * storm or we have something buggy. 4625 * Bail! 4626 */ 4627 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 4628 goto out_fail; 4629 4630 event = __rb_reserve_next(cpu_buffer, &info); 4631 4632 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 4633 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 4634 info.length -= RB_LEN_TIME_EXTEND; 4635 goto again; 4636 } 4637 4638 if (likely(event)) 4639 return event; 4640 out_fail: 4641 rb_end_commit(cpu_buffer); 4642 return NULL; 4643 } 4644 4645 /** 4646 * ring_buffer_lock_reserve - reserve a part of the buffer 4647 * @buffer: the ring buffer to reserve from 4648 * @length: the length of the data to reserve (excluding event header) 4649 * 4650 * Returns a reserved event on the ring buffer to copy directly to. 4651 * The user of this interface will need to get the body to write into 4652 * and can use the ring_buffer_event_data() interface. 
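 *
 * A minimal sketch of the expected calling pattern (struct my_entry is a
 * hypothetical payload type, used here only for illustration):
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (event) {
 *		entry = ring_buffer_event_data(event);
 *		entry->value = 42;
 *		ring_buffer_unlock_commit(buffer);
 *	}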
4653 * 4654 * The length is the length of the data needed, not the event length 4655 * which also includes the event header. 4656 * 4657 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 4658 * If NULL is returned, then nothing has been allocated or locked. 4659 */ 4660 struct ring_buffer_event * 4661 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 4662 { 4663 struct ring_buffer_per_cpu *cpu_buffer; 4664 struct ring_buffer_event *event; 4665 int cpu; 4666 4667 /* If we are tracing schedule, we don't want to recurse */ 4668 preempt_disable_notrace(); 4669 4670 if (unlikely(atomic_read(&buffer->record_disabled))) 4671 goto out; 4672 4673 cpu = raw_smp_processor_id(); 4674 4675 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 4676 goto out; 4677 4678 cpu_buffer = buffer->buffers[cpu]; 4679 4680 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 4681 goto out; 4682 4683 if (unlikely(length > buffer->max_data_size)) 4684 goto out; 4685 4686 if (unlikely(trace_recursive_lock(cpu_buffer))) 4687 goto out; 4688 4689 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4690 if (!event) 4691 goto out_unlock; 4692 4693 return event; 4694 4695 out_unlock: 4696 trace_recursive_unlock(cpu_buffer); 4697 out: 4698 preempt_enable_notrace(); 4699 return NULL; 4700 } 4701 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 4702 4703 /* 4704 * Decrement the entries to the page that an event is on. 4705 * The event does not even need to exist, only the pointer 4706 * to the page it is on. This may only be called before the commit 4707 * takes place. 4708 */ 4709 static inline void 4710 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 4711 struct ring_buffer_event *event) 4712 { 4713 unsigned long addr = (unsigned long)event; 4714 struct buffer_page *bpage = cpu_buffer->commit_page; 4715 struct buffer_page *start; 4716 4717 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 4718 4719 /* Do the likely case first */ 4720 if (likely(bpage->page == (void *)addr)) { 4721 local_dec(&bpage->entries); 4722 return; 4723 } 4724 4725 /* 4726 * Because the commit page may be on the reader page we 4727 * start with the next page and check the end loop there. 4728 */ 4729 rb_inc_page(&bpage); 4730 start = bpage; 4731 do { 4732 if (bpage->page == (void *)addr) { 4733 local_dec(&bpage->entries); 4734 return; 4735 } 4736 rb_inc_page(&bpage); 4737 } while (bpage != start); 4738 4739 /* commit not part of this buffer?? */ 4740 RB_WARN_ON(cpu_buffer, 1); 4741 } 4742 4743 /** 4744 * ring_buffer_discard_commit - discard an event that has not been committed 4745 * @buffer: the ring buffer 4746 * @event: non committed event to discard 4747 * 4748 * Sometimes an event that is in the ring buffer needs to be ignored. 4749 * This function lets the user discard an event in the ring buffer 4750 * and then that event will not be read later. 4751 * 4752 * This function only works if it is called before the item has been 4753 * committed. It will try to free the event from the ring buffer 4754 * if another event has not been added behind it. 4755 * 4756 * If another event has been added behind it, it will set the event 4757 * up as discarded, and perform the commit. 4758 * 4759 * If this function is called, do not call ring_buffer_unlock_commit on 4760 * the event. 
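 *
 * For example (an illustrative sketch; my_filter_rejects() is a hypothetical
 * predicate, not part of this file):
 *
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	if (event) {
 *		if (my_filter_rejects(ring_buffer_event_data(event)))
 *			ring_buffer_discard_commit(buffer, event);
 *		else
 *			ring_buffer_unlock_commit(buffer);
 *	}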
4761 */ 4762 void ring_buffer_discard_commit(struct trace_buffer *buffer, 4763 struct ring_buffer_event *event) 4764 { 4765 struct ring_buffer_per_cpu *cpu_buffer; 4766 int cpu; 4767 4768 /* The event is discarded regardless */ 4769 rb_event_discard(event); 4770 4771 cpu = smp_processor_id(); 4772 cpu_buffer = buffer->buffers[cpu]; 4773 4774 /* 4775 * This must only be called if the event has not been 4776 * committed yet. Thus we can assume that preemption 4777 * is still disabled. 4778 */ 4779 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 4780 4781 rb_decrement_entry(cpu_buffer, event); 4782 rb_try_to_discard(cpu_buffer, event); 4783 rb_end_commit(cpu_buffer); 4784 4785 trace_recursive_unlock(cpu_buffer); 4786 4787 preempt_enable_notrace(); 4788 4789 } 4790 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 4791 4792 /** 4793 * ring_buffer_write - write data to the buffer without reserving 4794 * @buffer: The ring buffer to write to. 4795 * @length: The length of the data being written (excluding the event header) 4796 * @data: The data to write to the buffer. 4797 * 4798 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 4799 * one function. If you already have the data to write to the buffer, it 4800 * may be easier to simply call this function. 4801 * 4802 * Note, like ring_buffer_lock_reserve, the length is the length of the data 4803 * and not the length of the event which would hold the header. 4804 */ 4805 int ring_buffer_write(struct trace_buffer *buffer, 4806 unsigned long length, 4807 void *data) 4808 { 4809 struct ring_buffer_per_cpu *cpu_buffer; 4810 struct ring_buffer_event *event; 4811 void *body; 4812 int ret = -EBUSY; 4813 int cpu; 4814 4815 guard(preempt_notrace)(); 4816 4817 if (atomic_read(&buffer->record_disabled)) 4818 return -EBUSY; 4819 4820 cpu = raw_smp_processor_id(); 4821 4822 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4823 return -EBUSY; 4824 4825 cpu_buffer = buffer->buffers[cpu]; 4826 4827 if (atomic_read(&cpu_buffer->record_disabled)) 4828 return -EBUSY; 4829 4830 if (length > buffer->max_data_size) 4831 return -EBUSY; 4832 4833 if (unlikely(trace_recursive_lock(cpu_buffer))) 4834 return -EBUSY; 4835 4836 event = rb_reserve_next_event(buffer, cpu_buffer, length); 4837 if (!event) 4838 goto out_unlock; 4839 4840 body = rb_event_data(event); 4841 4842 memcpy(body, data, length); 4843 4844 rb_commit(cpu_buffer); 4845 4846 rb_wakeups(buffer, cpu_buffer); 4847 4848 ret = 0; 4849 4850 out_unlock: 4851 trace_recursive_unlock(cpu_buffer); 4852 return ret; 4853 } 4854 EXPORT_SYMBOL_GPL(ring_buffer_write); 4855 4856 /* 4857 * The total entries in the ring buffer is the running counter 4858 * of entries entered into the ring buffer, minus the sum of 4859 * the entries read from the ring buffer and the number of 4860 * entries that were overwritten. 4861 */ 4862 static inline unsigned long 4863 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4864 { 4865 return local_read(&cpu_buffer->entries) - 4866 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4867 } 4868 4869 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 4870 { 4871 return !rb_num_of_entries(cpu_buffer); 4872 } 4873 4874 /** 4875 * ring_buffer_record_disable - stop all writes into the buffer 4876 * @buffer: The ring buffer to stop writes to. 4877 * 4878 * This prevents all writes to the buffer. Any attempt to write 4879 * to the buffer after this will fail and return NULL. 4880 * 4881 * The caller should call synchronize_rcu() after this. 
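 *
 * For example (an illustrative sketch of the intended pattern):
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_rcu();
 *	(... operate on the buffer while no new writes can start ...)
 *	ring_buffer_record_enable(buffer);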
4882 */ 4883 void ring_buffer_record_disable(struct trace_buffer *buffer) 4884 { 4885 atomic_inc(&buffer->record_disabled); 4886 } 4887 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4888 4889 /** 4890 * ring_buffer_record_enable - enable writes to the buffer 4891 * @buffer: The ring buffer to enable writes 4892 * 4893 * Note, multiple disables will need the same number of enables 4894 * to truly enable the writing (much like preempt_disable). 4895 */ 4896 void ring_buffer_record_enable(struct trace_buffer *buffer) 4897 { 4898 atomic_dec(&buffer->record_disabled); 4899 } 4900 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4901 4902 /** 4903 * ring_buffer_record_off - stop all writes into the buffer 4904 * @buffer: The ring buffer to stop writes to. 4905 * 4906 * This prevents all writes to the buffer. Any attempt to write 4907 * to the buffer after this will fail and return NULL. 4908 * 4909 * This is different from ring_buffer_record_disable() as 4910 * it works like an on/off switch, whereas the disable() version 4911 * must be paired with an enable(). 4912 */ 4913 void ring_buffer_record_off(struct trace_buffer *buffer) 4914 { 4915 unsigned int rd; 4916 unsigned int new_rd; 4917 4918 rd = atomic_read(&buffer->record_disabled); 4919 do { 4920 new_rd = rd | RB_BUFFER_OFF; 4921 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4922 } 4923 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4924 4925 /** 4926 * ring_buffer_record_on - restart writes into the buffer 4927 * @buffer: The ring buffer to start writes to. 4928 * 4929 * This enables all writes to the buffer that was disabled by 4930 * ring_buffer_record_off(). 4931 * 4932 * This is different from ring_buffer_record_enable() as 4933 * it works like an on/off switch, whereas the enable() version 4934 * must be paired with a disable(). 4935 */ 4936 void ring_buffer_record_on(struct trace_buffer *buffer) 4937 { 4938 unsigned int rd; 4939 unsigned int new_rd; 4940 4941 rd = atomic_read(&buffer->record_disabled); 4942 do { 4943 new_rd = rd & ~RB_BUFFER_OFF; 4944 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4945 } 4946 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4947 4948 /** 4949 * ring_buffer_record_is_on - return true if the ring buffer can write 4950 * @buffer: The ring buffer to see if write is enabled 4951 * 4952 * Returns true if the ring buffer is in a state that it accepts writes. 4953 */ 4954 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4955 { 4956 return !atomic_read(&buffer->record_disabled); 4957 } 4958 4959 /** 4960 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4961 * @buffer: The ring buffer to see if write is set enabled 4962 * 4963 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4964 * Note that this does NOT mean it is in a writable state. 4965 * 4966 * It may return true when the ring buffer has been disabled by 4967 * ring_buffer_record_disable(), as that is a temporary disabling of 4968 * the ring buffer. 4969 */ 4970 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4971 { 4972 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4973 } 4974 4975 /** 4976 * ring_buffer_record_is_on_cpu - return true if the ring buffer can write 4977 * @buffer: The ring buffer to see if write is enabled 4978 * @cpu: The CPU to test if the ring buffer can write to 4979 * 4980 * Returns true if the ring buffer is in a state that it accepts writes 4981 * for a particular CPU.
4982 */ 4983 bool ring_buffer_record_is_on_cpu(struct trace_buffer *buffer, int cpu) 4984 { 4985 struct ring_buffer_per_cpu *cpu_buffer; 4986 4987 cpu_buffer = buffer->buffers[cpu]; 4988 4989 return ring_buffer_record_is_set_on(buffer) && 4990 !atomic_read(&cpu_buffer->record_disabled); 4991 } 4992 4993 /** 4994 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4995 * @buffer: The ring buffer to stop writes to. 4996 * @cpu: The CPU buffer to stop 4997 * 4998 * This prevents all writes to the buffer. Any attempt to write 4999 * to the buffer after this will fail and return NULL. 5000 * 5001 * The caller should call synchronize_rcu() after this. 5002 */ 5003 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 5004 { 5005 struct ring_buffer_per_cpu *cpu_buffer; 5006 5007 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5008 return; 5009 5010 cpu_buffer = buffer->buffers[cpu]; 5011 atomic_inc(&cpu_buffer->record_disabled); 5012 } 5013 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 5014 5015 /** 5016 * ring_buffer_record_enable_cpu - enable writes to the buffer 5017 * @buffer: The ring buffer to enable writes 5018 * @cpu: The CPU to enable. 5019 * 5020 * Note, multiple disables will need the same number of enables 5021 * to truly enable the writing (much like preempt_disable). 5022 */ 5023 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 5024 { 5025 struct ring_buffer_per_cpu *cpu_buffer; 5026 5027 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5028 return; 5029 5030 cpu_buffer = buffer->buffers[cpu]; 5031 atomic_dec(&cpu_buffer->record_disabled); 5032 } 5033 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 5034 5035 /** 5036 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 5037 * @buffer: The ring buffer 5038 * @cpu: The per CPU buffer to read from. 5039 */ 5040 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 5041 { 5042 unsigned long flags; 5043 struct ring_buffer_per_cpu *cpu_buffer; 5044 struct buffer_page *bpage; 5045 u64 ret = 0; 5046 5047 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5048 return 0; 5049 5050 cpu_buffer = buffer->buffers[cpu]; 5051 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5052 /* 5053 * if the tail is on reader_page, oldest time stamp is on the reader 5054 * page 5055 */ 5056 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 5057 bpage = cpu_buffer->reader_page; 5058 else 5059 bpage = rb_set_head_page(cpu_buffer); 5060 if (bpage) 5061 ret = bpage->page->time_stamp; 5062 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5063 5064 return ret; 5065 } 5066 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 5067 5068 /** 5069 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 5070 * @buffer: The ring buffer 5071 * @cpu: The per CPU buffer to read from. 5072 */ 5073 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 5074 { 5075 struct ring_buffer_per_cpu *cpu_buffer; 5076 unsigned long ret; 5077 5078 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5079 return 0; 5080 5081 cpu_buffer = buffer->buffers[cpu]; 5082 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 5083 5084 return ret; 5085 } 5086 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 5087 5088 /** 5089 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 5090 * @buffer: The ring buffer 5091 * @cpu: The per CPU buffer to get the entries from. 
5092 */ 5093 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 5094 { 5095 struct ring_buffer_per_cpu *cpu_buffer; 5096 5097 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5098 return 0; 5099 5100 cpu_buffer = buffer->buffers[cpu]; 5101 5102 return rb_num_of_entries(cpu_buffer); 5103 } 5104 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 5105 5106 /** 5107 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 5108 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 5109 * @buffer: The ring buffer 5110 * @cpu: The per CPU buffer to get the number of overruns from 5111 */ 5112 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 5113 { 5114 struct ring_buffer_per_cpu *cpu_buffer; 5115 unsigned long ret; 5116 5117 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5118 return 0; 5119 5120 cpu_buffer = buffer->buffers[cpu]; 5121 ret = local_read(&cpu_buffer->overrun); 5122 5123 return ret; 5124 } 5125 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 5126 5127 /** 5128 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 5129 * commits failing due to the buffer wrapping around while there are uncommitted 5130 * events, such as during an interrupt storm. 5131 * @buffer: The ring buffer 5132 * @cpu: The per CPU buffer to get the number of overruns from 5133 */ 5134 unsigned long 5135 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 5136 { 5137 struct ring_buffer_per_cpu *cpu_buffer; 5138 unsigned long ret; 5139 5140 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5141 return 0; 5142 5143 cpu_buffer = buffer->buffers[cpu]; 5144 ret = local_read(&cpu_buffer->commit_overrun); 5145 5146 return ret; 5147 } 5148 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 5149 5150 /** 5151 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 5152 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 
5153 * @buffer: The ring buffer 5154 * @cpu: The per CPU buffer to get the number of overruns from 5155 */ 5156 unsigned long 5157 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 5158 { 5159 struct ring_buffer_per_cpu *cpu_buffer; 5160 unsigned long ret; 5161 5162 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5163 return 0; 5164 5165 cpu_buffer = buffer->buffers[cpu]; 5166 ret = local_read(&cpu_buffer->dropped_events); 5167 5168 return ret; 5169 } 5170 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 5171 5172 /** 5173 * ring_buffer_read_events_cpu - get the number of events successfully read 5174 * @buffer: The ring buffer 5175 * @cpu: The per CPU buffer to get the number of events read 5176 */ 5177 unsigned long 5178 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 5179 { 5180 struct ring_buffer_per_cpu *cpu_buffer; 5181 5182 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5183 return 0; 5184 5185 cpu_buffer = buffer->buffers[cpu]; 5186 return cpu_buffer->read; 5187 } 5188 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 5189 5190 /** 5191 * ring_buffer_entries - get the number of entries in a buffer 5192 * @buffer: The ring buffer 5193 * 5194 * Returns the total number of entries in the ring buffer 5195 * (all CPU entries) 5196 */ 5197 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 5198 { 5199 struct ring_buffer_per_cpu *cpu_buffer; 5200 unsigned long entries = 0; 5201 int cpu; 5202 5203 /* if you care about this being correct, lock the buffer */ 5204 for_each_buffer_cpu(buffer, cpu) { 5205 cpu_buffer = buffer->buffers[cpu]; 5206 entries += rb_num_of_entries(cpu_buffer); 5207 } 5208 5209 return entries; 5210 } 5211 EXPORT_SYMBOL_GPL(ring_buffer_entries); 5212 5213 /** 5214 * ring_buffer_overruns - get the number of overruns in buffer 5215 * @buffer: The ring buffer 5216 * 5217 * Returns the total number of overruns in the ring buffer 5218 * (all CPU entries) 5219 */ 5220 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 5221 { 5222 struct ring_buffer_per_cpu *cpu_buffer; 5223 unsigned long overruns = 0; 5224 int cpu; 5225 5226 /* if you care about this being correct, lock the buffer */ 5227 for_each_buffer_cpu(buffer, cpu) { 5228 cpu_buffer = buffer->buffers[cpu]; 5229 overruns += local_read(&cpu_buffer->overrun); 5230 } 5231 5232 return overruns; 5233 } 5234 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 5235 5236 static void rb_iter_reset(struct ring_buffer_iter *iter) 5237 { 5238 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5239 5240 /* Iterator usage is expected to have record disabled */ 5241 iter->head_page = cpu_buffer->reader_page; 5242 iter->head = cpu_buffer->reader_page->read; 5243 iter->next_event = iter->head; 5244 5245 iter->cache_reader_page = iter->head_page; 5246 iter->cache_read = cpu_buffer->read; 5247 iter->cache_pages_removed = cpu_buffer->pages_removed; 5248 5249 if (iter->head) { 5250 iter->read_stamp = cpu_buffer->read_stamp; 5251 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 5252 } else { 5253 iter->read_stamp = iter->head_page->page->time_stamp; 5254 iter->page_stamp = iter->read_stamp; 5255 } 5256 } 5257 5258 /** 5259 * ring_buffer_iter_reset - reset an iterator 5260 * @iter: The iterator to reset 5261 * 5262 * Resets the iterator, so that it will start from the beginning 5263 * again. 
5264 */ 5265 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 5266 { 5267 struct ring_buffer_per_cpu *cpu_buffer; 5268 unsigned long flags; 5269 5270 if (!iter) 5271 return; 5272 5273 cpu_buffer = iter->cpu_buffer; 5274 5275 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5276 rb_iter_reset(iter); 5277 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5278 } 5279 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 5280 5281 /** 5282 * ring_buffer_iter_empty - check if an iterator has no more to read 5283 * @iter: The iterator to check 5284 */ 5285 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 5286 { 5287 struct ring_buffer_per_cpu *cpu_buffer; 5288 struct buffer_page *reader; 5289 struct buffer_page *head_page; 5290 struct buffer_page *commit_page; 5291 struct buffer_page *curr_commit_page; 5292 unsigned commit; 5293 u64 curr_commit_ts; 5294 u64 commit_ts; 5295 5296 cpu_buffer = iter->cpu_buffer; 5297 reader = cpu_buffer->reader_page; 5298 head_page = cpu_buffer->head_page; 5299 commit_page = READ_ONCE(cpu_buffer->commit_page); 5300 commit_ts = commit_page->page->time_stamp; 5301 5302 /* 5303 * When the writer goes across pages, it issues a cmpxchg which 5304 * is a mb(), which will synchronize with the rmb here. 5305 * (see rb_tail_page_update()) 5306 */ 5307 smp_rmb(); 5308 commit = rb_page_commit(commit_page); 5309 /* We want to make sure that the commit page doesn't change */ 5310 smp_rmb(); 5311 5312 /* Make sure commit page didn't change */ 5313 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 5314 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 5315 5316 /* If the commit page changed, then there's more data */ 5317 if (curr_commit_page != commit_page || 5318 curr_commit_ts != commit_ts) 5319 return 0; 5320 5321 /* Still racy, as it may return a false positive, but that's OK */ 5322 return ((iter->head_page == commit_page && iter->head >= commit) || 5323 (iter->head_page == reader && commit_page == head_page && 5324 head_page->read == commit && 5325 iter->head == rb_page_size(cpu_buffer->reader_page))); 5326 } 5327 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 5328 5329 static void 5330 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 5331 struct ring_buffer_event *event) 5332 { 5333 u64 delta; 5334 5335 switch (event->type_len) { 5336 case RINGBUF_TYPE_PADDING: 5337 return; 5338 5339 case RINGBUF_TYPE_TIME_EXTEND: 5340 delta = rb_event_time_stamp(event); 5341 cpu_buffer->read_stamp += delta; 5342 return; 5343 5344 case RINGBUF_TYPE_TIME_STAMP: 5345 delta = rb_event_time_stamp(event); 5346 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 5347 cpu_buffer->read_stamp = delta; 5348 return; 5349 5350 case RINGBUF_TYPE_DATA: 5351 cpu_buffer->read_stamp += event->time_delta; 5352 return; 5353 5354 default: 5355 RB_WARN_ON(cpu_buffer, 1); 5356 } 5357 } 5358 5359 static void 5360 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 5361 struct ring_buffer_event *event) 5362 { 5363 u64 delta; 5364 5365 switch (event->type_len) { 5366 case RINGBUF_TYPE_PADDING: 5367 return; 5368 5369 case RINGBUF_TYPE_TIME_EXTEND: 5370 delta = rb_event_time_stamp(event); 5371 iter->read_stamp += delta; 5372 return; 5373 5374 case RINGBUF_TYPE_TIME_STAMP: 5375 delta = rb_event_time_stamp(event); 5376 delta = rb_fix_abs_ts(delta, iter->read_stamp); 5377 iter->read_stamp = delta; 5378 return; 5379 5380 case RINGBUF_TYPE_DATA: 5381 iter->read_stamp += event->time_delta; 5382 return; 5383 5384 default: 5385 RB_WARN_ON(iter->cpu_buffer, 1); 5386 
} 5387 } 5388 5389 static struct buffer_page * 5390 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 5391 { 5392 struct buffer_page *reader = NULL; 5393 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 5394 unsigned long overwrite; 5395 unsigned long flags; 5396 int nr_loops = 0; 5397 bool ret; 5398 5399 local_irq_save(flags); 5400 arch_spin_lock(&cpu_buffer->lock); 5401 5402 again: 5403 /* 5404 * This should normally only loop twice. But because the 5405 * start of the reader inserts an empty page, it causes 5406 * a case where we will loop three times. There should be no 5407 * reason to loop four times (that I know of). 5408 */ 5409 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 5410 reader = NULL; 5411 goto out; 5412 } 5413 5414 reader = cpu_buffer->reader_page; 5415 5416 /* If there's more to read, return this page */ 5417 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 5418 goto out; 5419 5420 /* Never should we have an index greater than the size */ 5421 if (RB_WARN_ON(cpu_buffer, 5422 cpu_buffer->reader_page->read > rb_page_size(reader))) 5423 goto out; 5424 5425 /* check if we caught up to the tail */ 5426 reader = NULL; 5427 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 5428 goto out; 5429 5430 /* Don't bother swapping if the ring buffer is empty */ 5431 if (rb_num_of_entries(cpu_buffer) == 0) 5432 goto out; 5433 5434 /* 5435 * Reset the reader page to size zero. 5436 */ 5437 local_set(&cpu_buffer->reader_page->write, 0); 5438 local_set(&cpu_buffer->reader_page->entries, 0); 5439 cpu_buffer->reader_page->real_end = 0; 5440 5441 spin: 5442 /* 5443 * Splice the empty reader page into the list around the head. 5444 */ 5445 reader = rb_set_head_page(cpu_buffer); 5446 if (!reader) 5447 goto out; 5448 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 5449 cpu_buffer->reader_page->list.prev = reader->list.prev; 5450 5451 /* 5452 * cpu_buffer->pages just needs to point to the buffer, it 5453 * has no specific buffer page to point to. Lets move it out 5454 * of our way so we don't accidentally swap it. 5455 */ 5456 cpu_buffer->pages = reader->list.prev; 5457 5458 /* The reader page will be pointing to the new head */ 5459 rb_set_list_to_head(&cpu_buffer->reader_page->list); 5460 5461 /* 5462 * We want to make sure we read the overruns after we set up our 5463 * pointers to the next object. The writer side does a 5464 * cmpxchg to cross pages which acts as the mb on the writer 5465 * side. Note, the reader will constantly fail the swap 5466 * while the writer is updating the pointers, so this 5467 * guarantees that the overwrite recorded here is the one we 5468 * want to compare with the last_overrun. 5469 */ 5470 smp_mb(); 5471 overwrite = local_read(&(cpu_buffer->overrun)); 5472 5473 /* 5474 * Here's the tricky part. 5475 * 5476 * We need to move the pointer past the header page. 5477 * But we can only do that if a writer is not currently 5478 * moving it. The page before the header page has the 5479 * flag bit '1' set if it is pointing to the page we want. 5480 * but if the writer is in the process of moving it 5481 * then it will be '2' or already moved '0'. 5482 */ 5483 5484 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 5485 5486 /* 5487 * If we did not convert it, then we must try again. 5488 */ 5489 if (!ret) 5490 goto spin; 5491 5492 if (cpu_buffer->ring_meta) 5493 rb_update_meta_reader(cpu_buffer, reader); 5494 5495 /* 5496 * Yay! We succeeded in replacing the page. 
5497 * 5498 * Now make the new head point back to the reader page. 5499 */ 5500 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 5501 rb_inc_page(&cpu_buffer->head_page); 5502 5503 cpu_buffer->cnt++; 5504 local_inc(&cpu_buffer->pages_read); 5505 5506 /* Finally update the reader page to the new head */ 5507 cpu_buffer->reader_page = reader; 5508 cpu_buffer->reader_page->read = 0; 5509 5510 if (overwrite != cpu_buffer->last_overrun) { 5511 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 5512 cpu_buffer->last_overrun = overwrite; 5513 } 5514 5515 goto again; 5516 5517 out: 5518 /* Update the read_stamp on the first event */ 5519 if (reader && reader->read == 0) 5520 cpu_buffer->read_stamp = reader->page->time_stamp; 5521 5522 arch_spin_unlock(&cpu_buffer->lock); 5523 local_irq_restore(flags); 5524 5525 /* 5526 * The writer has preempt disable, wait for it. But not forever 5527 * Although, 1 second is pretty much "forever" 5528 */ 5529 #define USECS_WAIT 1000000 5530 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 5531 /* If the write is past the end of page, a writer is still updating it */ 5532 if (likely(!reader || rb_page_write(reader) <= bsize)) 5533 break; 5534 5535 udelay(1); 5536 5537 /* Get the latest version of the reader write value */ 5538 smp_rmb(); 5539 } 5540 5541 /* The writer is not moving forward? Something is wrong */ 5542 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 5543 reader = NULL; 5544 5545 /* 5546 * Make sure we see any padding after the write update 5547 * (see rb_reset_tail()). 5548 * 5549 * In addition, a writer may be writing on the reader page 5550 * if the page has not been fully filled, so the read barrier 5551 * is also needed to make sure we see the content of what is 5552 * committed by the writer (see rb_set_commit_to_write()). 5553 */ 5554 smp_rmb(); 5555 5556 5557 return reader; 5558 } 5559 5560 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 5561 { 5562 struct ring_buffer_event *event; 5563 struct buffer_page *reader; 5564 unsigned length; 5565 5566 reader = rb_get_reader_page(cpu_buffer); 5567 5568 /* This function should not be called when buffer is empty */ 5569 if (RB_WARN_ON(cpu_buffer, !reader)) 5570 return; 5571 5572 event = rb_reader_event(cpu_buffer); 5573 5574 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 5575 cpu_buffer->read++; 5576 5577 rb_update_read_stamp(cpu_buffer, event); 5578 5579 length = rb_event_length(event); 5580 cpu_buffer->reader_page->read += length; 5581 cpu_buffer->read_bytes += length; 5582 } 5583 5584 static void rb_advance_iter(struct ring_buffer_iter *iter) 5585 { 5586 struct ring_buffer_per_cpu *cpu_buffer; 5587 5588 cpu_buffer = iter->cpu_buffer; 5589 5590 /* If head == next_event then we need to jump to the next event */ 5591 if (iter->head == iter->next_event) { 5592 /* If the event gets overwritten again, there's nothing to do */ 5593 if (rb_iter_head_event(iter) == NULL) 5594 return; 5595 } 5596 5597 iter->head = iter->next_event; 5598 5599 /* 5600 * Check if we are at the end of the buffer. 
5601 */ 5602 if (iter->next_event >= rb_page_size(iter->head_page)) { 5603 /* discarded commits can make the page empty */ 5604 if (iter->head_page == cpu_buffer->commit_page) 5605 return; 5606 rb_inc_iter(iter); 5607 return; 5608 } 5609 5610 rb_update_iter_read_stamp(iter, iter->event); 5611 } 5612 5613 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 5614 { 5615 return cpu_buffer->lost_events; 5616 } 5617 5618 static struct ring_buffer_event * 5619 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 5620 unsigned long *lost_events) 5621 { 5622 struct ring_buffer_event *event; 5623 struct buffer_page *reader; 5624 int nr_loops = 0; 5625 5626 if (ts) 5627 *ts = 0; 5628 again: 5629 /* 5630 * We repeat when a time extend is encountered. 5631 * Since the time extend is always attached to a data event, 5632 * we should never loop more than once. 5633 * (We never hit the following condition more than twice). 5634 */ 5635 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 5636 return NULL; 5637 5638 reader = rb_get_reader_page(cpu_buffer); 5639 if (!reader) 5640 return NULL; 5641 5642 event = rb_reader_event(cpu_buffer); 5643 5644 switch (event->type_len) { 5645 case RINGBUF_TYPE_PADDING: 5646 if (rb_null_event(event)) 5647 RB_WARN_ON(cpu_buffer, 1); 5648 /* 5649 * Because the writer could be discarding every 5650 * event it creates (which would probably be bad) 5651 * if we were to go back to "again" then we may never 5652 * catch up, and will trigger the warn on, or lock 5653 * the box. Return the padding, and we will release 5654 * the current locks, and try again. 5655 */ 5656 return event; 5657 5658 case RINGBUF_TYPE_TIME_EXTEND: 5659 /* Internal data, OK to advance */ 5660 rb_advance_reader(cpu_buffer); 5661 goto again; 5662 5663 case RINGBUF_TYPE_TIME_STAMP: 5664 if (ts) { 5665 *ts = rb_event_time_stamp(event); 5666 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 5667 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5668 cpu_buffer->cpu, ts); 5669 } 5670 /* Internal data, OK to advance */ 5671 rb_advance_reader(cpu_buffer); 5672 goto again; 5673 5674 case RINGBUF_TYPE_DATA: 5675 if (ts && !(*ts)) { 5676 *ts = cpu_buffer->read_stamp + event->time_delta; 5677 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5678 cpu_buffer->cpu, ts); 5679 } 5680 if (lost_events) 5681 *lost_events = rb_lost_events(cpu_buffer); 5682 return event; 5683 5684 default: 5685 RB_WARN_ON(cpu_buffer, 1); 5686 } 5687 5688 return NULL; 5689 } 5690 EXPORT_SYMBOL_GPL(ring_buffer_peek); 5691 5692 static struct ring_buffer_event * 5693 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5694 { 5695 struct trace_buffer *buffer; 5696 struct ring_buffer_per_cpu *cpu_buffer; 5697 struct ring_buffer_event *event; 5698 int nr_loops = 0; 5699 5700 if (ts) 5701 *ts = 0; 5702 5703 cpu_buffer = iter->cpu_buffer; 5704 buffer = cpu_buffer->buffer; 5705 5706 /* 5707 * Check if someone performed a consuming read to the buffer 5708 * or removed some pages from the buffer. In these cases, 5709 * iterator was invalidated and we need to reset it. 5710 */ 5711 if (unlikely(iter->cache_read != cpu_buffer->read || 5712 iter->cache_reader_page != cpu_buffer->reader_page || 5713 iter->cache_pages_removed != cpu_buffer->pages_removed)) 5714 rb_iter_reset(iter); 5715 5716 again: 5717 if (ring_buffer_iter_empty(iter)) 5718 return NULL; 5719 5720 /* 5721 * As the writer can mess with what the iterator is trying 5722 * to read, just give up if we fail to get an event after 5723 * three tries. 
The iterator is not as reliable when reading 5724 * the ring buffer with an active write as the consumer is. 5725 * Do not warn if the three failures is reached. 5726 */ 5727 if (++nr_loops > 3) 5728 return NULL; 5729 5730 if (rb_per_cpu_empty(cpu_buffer)) 5731 return NULL; 5732 5733 if (iter->head >= rb_page_size(iter->head_page)) { 5734 rb_inc_iter(iter); 5735 goto again; 5736 } 5737 5738 event = rb_iter_head_event(iter); 5739 if (!event) 5740 goto again; 5741 5742 switch (event->type_len) { 5743 case RINGBUF_TYPE_PADDING: 5744 if (rb_null_event(event)) { 5745 rb_inc_iter(iter); 5746 goto again; 5747 } 5748 rb_advance_iter(iter); 5749 return event; 5750 5751 case RINGBUF_TYPE_TIME_EXTEND: 5752 /* Internal data, OK to advance */ 5753 rb_advance_iter(iter); 5754 goto again; 5755 5756 case RINGBUF_TYPE_TIME_STAMP: 5757 if (ts) { 5758 *ts = rb_event_time_stamp(event); 5759 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 5760 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 5761 cpu_buffer->cpu, ts); 5762 } 5763 /* Internal data, OK to advance */ 5764 rb_advance_iter(iter); 5765 goto again; 5766 5767 case RINGBUF_TYPE_DATA: 5768 if (ts && !(*ts)) { 5769 *ts = iter->read_stamp + event->time_delta; 5770 ring_buffer_normalize_time_stamp(buffer, 5771 cpu_buffer->cpu, ts); 5772 } 5773 return event; 5774 5775 default: 5776 RB_WARN_ON(cpu_buffer, 1); 5777 } 5778 5779 return NULL; 5780 } 5781 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 5782 5783 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 5784 { 5785 if (likely(!in_nmi())) { 5786 raw_spin_lock(&cpu_buffer->reader_lock); 5787 return true; 5788 } 5789 5790 /* 5791 * If an NMI die dumps out the content of the ring buffer 5792 * trylock must be used to prevent a deadlock if the NMI 5793 * preempted a task that holds the ring buffer locks. If 5794 * we get the lock then all is fine, if not, then continue 5795 * to do the read, but this can corrupt the ring buffer, 5796 * so it must be permanently disabled from future writes. 5797 * Reading from NMI is a oneshot deal. 5798 */ 5799 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 5800 return true; 5801 5802 /* Continue without locking, but disable the ring buffer */ 5803 atomic_inc(&cpu_buffer->record_disabled); 5804 return false; 5805 } 5806 5807 static inline void 5808 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 5809 { 5810 if (likely(locked)) 5811 raw_spin_unlock(&cpu_buffer->reader_lock); 5812 } 5813 5814 /** 5815 * ring_buffer_peek - peek at the next event to be read 5816 * @buffer: The ring buffer to read 5817 * @cpu: The cpu to peak at 5818 * @ts: The timestamp counter of this event. 5819 * @lost_events: a variable to store if events were lost (may be NULL) 5820 * 5821 * This will return the event that will be read next, but does 5822 * not consume the data. 
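 *
 * A minimal usage sketch (not taken from an in-tree caller; error handling
 * is elided and process_event() stands in for whatever the caller does
 * with the data):
 *
 *	u64 ts;
 *	unsigned long lost = 0;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event)
 *		process_event(ring_buffer_event_data(event), ts, lost);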
5823 */ 5824 struct ring_buffer_event * 5825 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 5826 unsigned long *lost_events) 5827 { 5828 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5829 struct ring_buffer_event *event; 5830 unsigned long flags; 5831 bool dolock; 5832 5833 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5834 return NULL; 5835 5836 again: 5837 local_irq_save(flags); 5838 dolock = rb_reader_lock(cpu_buffer); 5839 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5840 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5841 rb_advance_reader(cpu_buffer); 5842 rb_reader_unlock(cpu_buffer, dolock); 5843 local_irq_restore(flags); 5844 5845 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5846 goto again; 5847 5848 return event; 5849 } 5850 5851 /** ring_buffer_iter_dropped - report if there are dropped events 5852 * @iter: The ring buffer iterator 5853 * 5854 * Returns true if there was dropped events since the last peek. 5855 */ 5856 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 5857 { 5858 bool ret = iter->missed_events != 0; 5859 5860 iter->missed_events = 0; 5861 return ret; 5862 } 5863 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 5864 5865 /** 5866 * ring_buffer_iter_peek - peek at the next event to be read 5867 * @iter: The ring buffer iterator 5868 * @ts: The timestamp counter of this event. 5869 * 5870 * This will return the event that will be read next, but does 5871 * not increment the iterator. 5872 */ 5873 struct ring_buffer_event * 5874 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 5875 { 5876 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5877 struct ring_buffer_event *event; 5878 unsigned long flags; 5879 5880 again: 5881 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5882 event = rb_iter_peek(iter, ts); 5883 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5884 5885 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5886 goto again; 5887 5888 return event; 5889 } 5890 5891 /** 5892 * ring_buffer_consume - return an event and consume it 5893 * @buffer: The ring buffer to get the next event from 5894 * @cpu: the cpu to read the buffer from 5895 * @ts: a variable to store the timestamp (may be NULL) 5896 * @lost_events: a variable to store if events were lost (may be NULL) 5897 * 5898 * Returns the next event in the ring buffer, and that event is consumed. 5899 * Meaning, that sequential reads will keep returning a different event, 5900 * and eventually empty the ring buffer if the producer is slower. 
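 *
 * A rough consuming-read loop, much like the self test at the bottom of
 * this file uses (handle_event() is a placeholder for caller code and
 * @cpu is assumed to be a valid CPU of the buffer):
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		handle_event(ring_buffer_event_data(event), ts, lost);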
5901 */ 5902 struct ring_buffer_event * 5903 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5904 unsigned long *lost_events) 5905 { 5906 struct ring_buffer_per_cpu *cpu_buffer; 5907 struct ring_buffer_event *event = NULL; 5908 unsigned long flags; 5909 bool dolock; 5910 5911 again: 5912 /* might be called in atomic */ 5913 preempt_disable(); 5914 5915 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5916 goto out; 5917 5918 cpu_buffer = buffer->buffers[cpu]; 5919 local_irq_save(flags); 5920 dolock = rb_reader_lock(cpu_buffer); 5921 5922 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5923 if (event) { 5924 cpu_buffer->lost_events = 0; 5925 rb_advance_reader(cpu_buffer); 5926 } 5927 5928 rb_reader_unlock(cpu_buffer, dolock); 5929 local_irq_restore(flags); 5930 5931 out: 5932 preempt_enable(); 5933 5934 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5935 goto again; 5936 5937 return event; 5938 } 5939 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5940 5941 /** 5942 * ring_buffer_read_start - start a non consuming read of the buffer 5943 * @buffer: The ring buffer to read from 5944 * @cpu: The cpu buffer to iterate over 5945 * @flags: gfp flags to use for memory allocation 5946 * 5947 * This creates an iterator to allow non-consuming iteration through 5948 * the buffer. If the buffer is disabled for writing, it will produce 5949 * the same information each time, but if the buffer is still writing 5950 * then the first hit of a write will cause the iteration to stop. 5951 * 5952 * Must be paired with ring_buffer_read_finish. 5953 */ 5954 struct ring_buffer_iter * 5955 ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) 5956 { 5957 struct ring_buffer_per_cpu *cpu_buffer; 5958 struct ring_buffer_iter *iter; 5959 5960 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5961 return NULL; 5962 5963 iter = kzalloc(sizeof(*iter), flags); 5964 if (!iter) 5965 return NULL; 5966 5967 /* Holds the entire event: data and meta data */ 5968 iter->event_size = buffer->subbuf_size; 5969 iter->event = kmalloc(iter->event_size, flags); 5970 if (!iter->event) { 5971 kfree(iter); 5972 return NULL; 5973 } 5974 5975 cpu_buffer = buffer->buffers[cpu]; 5976 5977 iter->cpu_buffer = cpu_buffer; 5978 5979 atomic_inc(&cpu_buffer->resize_disabled); 5980 5981 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 5982 arch_spin_lock(&cpu_buffer->lock); 5983 rb_iter_reset(iter); 5984 arch_spin_unlock(&cpu_buffer->lock); 5985 5986 return iter; 5987 } 5988 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5989 5990 /** 5991 * ring_buffer_read_finish - finish reading the iterator of the buffer 5992 * @iter: The iterator retrieved by ring_buffer_start 5993 * 5994 * This re-enables resizing of the buffer, and frees the iterator. 5995 */ 5996 void 5997 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5998 { 5999 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6000 6001 /* Use this opportunity to check the integrity of the ring buffer. */ 6002 rb_check_pages(cpu_buffer); 6003 6004 atomic_dec(&cpu_buffer->resize_disabled); 6005 kfree(iter->event); 6006 kfree(iter); 6007 } 6008 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 6009 6010 /** 6011 * ring_buffer_iter_advance - advance the iterator to the next location 6012 * @iter: The ring buffer iterator 6013 * 6014 * Move the location of the iterator such that the next read will 6015 * be the next location of the iterator. 
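 *
 * A sketch of a complete non-consuming pass over one CPU buffer (assumes
 * a sleepable context; show_event() is a placeholder for caller code):
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		show_event(ring_buffer_event_data(event), ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);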
6016 */ 6017 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 6018 { 6019 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 6020 unsigned long flags; 6021 6022 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6023 6024 rb_advance_iter(iter); 6025 6026 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6027 } 6028 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 6029 6030 /** 6031 * ring_buffer_size - return the size of the ring buffer (in bytes) 6032 * @buffer: The ring buffer. 6033 * @cpu: The CPU to get ring buffer size from. 6034 */ 6035 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 6036 { 6037 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6038 return 0; 6039 6040 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 6041 } 6042 EXPORT_SYMBOL_GPL(ring_buffer_size); 6043 6044 /** 6045 * ring_buffer_max_event_size - return the max data size of an event 6046 * @buffer: The ring buffer. 6047 * 6048 * Returns the maximum size an event can be. 6049 */ 6050 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 6051 { 6052 /* If abs timestamp is requested, events have a timestamp too */ 6053 if (ring_buffer_time_stamp_abs(buffer)) 6054 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 6055 return buffer->max_data_size; 6056 } 6057 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 6058 6059 static void rb_clear_buffer_page(struct buffer_page *page) 6060 { 6061 local_set(&page->write, 0); 6062 local_set(&page->entries, 0); 6063 rb_init_page(page->page); 6064 page->read = 0; 6065 } 6066 6067 /* 6068 * When the buffer is memory mapped to user space, each sub buffer 6069 * has a unique id that is used by the meta data to tell the user 6070 * where the current reader page is. 6071 * 6072 * For a normal allocated ring buffer, the id is saved in the buffer page 6073 * id field, and updated via this function. 6074 * 6075 * But for a fixed memory mapped buffer, the id is already assigned for 6076 * fixed memory ording in the memory layout and can not be used. Instead 6077 * the index of where the page lies in the memory layout is used. 6078 * 6079 * For the normal pages, set the buffer page id with the passed in @id 6080 * value and return that. 6081 * 6082 * For fixed memory mapped pages, get the page index in the memory layout 6083 * and return that as the id. 
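 *
 * For example (illustrative only): rb_page_id(cpu_buffer, bpage, 3) on a
 * normally allocated buffer stores 3 in bpage->id and returns 3, while on
 * a boot-mapped buffer (cpu_buffer->ring_meta is set) the passed-in 3 is
 * ignored and the sub-buffer's index in the fixed memory layout is
 * returned instead.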
6084 */ 6085 static int rb_page_id(struct ring_buffer_per_cpu *cpu_buffer, 6086 struct buffer_page *bpage, int id) 6087 { 6088 /* 6089 * For boot buffers, the id is the index, 6090 * otherwise, set the buffer page with this id 6091 */ 6092 if (cpu_buffer->ring_meta) 6093 id = rb_meta_subbuf_idx(cpu_buffer->ring_meta, bpage->page); 6094 else 6095 bpage->id = id; 6096 6097 return id; 6098 } 6099 6100 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6101 { 6102 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6103 6104 if (!meta) 6105 return; 6106 6107 meta->reader.read = cpu_buffer->reader_page->read; 6108 meta->reader.id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, 6109 cpu_buffer->reader_page->id); 6110 6111 meta->reader.lost_events = cpu_buffer->lost_events; 6112 6113 meta->entries = local_read(&cpu_buffer->entries); 6114 meta->overrun = local_read(&cpu_buffer->overrun); 6115 meta->read = cpu_buffer->read; 6116 6117 /* Some archs do not have data cache coherency between kernel and user-space */ 6118 flush_kernel_vmap_range(cpu_buffer->meta_page, PAGE_SIZE); 6119 } 6120 6121 static void 6122 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 6123 { 6124 struct buffer_page *page; 6125 6126 rb_head_page_deactivate(cpu_buffer); 6127 6128 cpu_buffer->head_page 6129 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6130 rb_clear_buffer_page(cpu_buffer->head_page); 6131 list_for_each_entry(page, cpu_buffer->pages, list) { 6132 rb_clear_buffer_page(page); 6133 } 6134 6135 cpu_buffer->tail_page = cpu_buffer->head_page; 6136 cpu_buffer->commit_page = cpu_buffer->head_page; 6137 6138 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 6139 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6140 rb_clear_buffer_page(cpu_buffer->reader_page); 6141 6142 local_set(&cpu_buffer->entries_bytes, 0); 6143 local_set(&cpu_buffer->overrun, 0); 6144 local_set(&cpu_buffer->commit_overrun, 0); 6145 local_set(&cpu_buffer->dropped_events, 0); 6146 local_set(&cpu_buffer->entries, 0); 6147 local_set(&cpu_buffer->committing, 0); 6148 local_set(&cpu_buffer->commits, 0); 6149 local_set(&cpu_buffer->pages_touched, 0); 6150 local_set(&cpu_buffer->pages_lost, 0); 6151 local_set(&cpu_buffer->pages_read, 0); 6152 cpu_buffer->last_pages_touch = 0; 6153 cpu_buffer->shortest_full = 0; 6154 cpu_buffer->read = 0; 6155 cpu_buffer->read_bytes = 0; 6156 6157 rb_time_set(&cpu_buffer->write_stamp, 0); 6158 rb_time_set(&cpu_buffer->before_stamp, 0); 6159 6160 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 6161 6162 cpu_buffer->lost_events = 0; 6163 cpu_buffer->last_overrun = 0; 6164 6165 rb_head_page_activate(cpu_buffer); 6166 cpu_buffer->pages_removed = 0; 6167 6168 if (cpu_buffer->mapped) { 6169 rb_update_meta_page(cpu_buffer); 6170 if (cpu_buffer->ring_meta) { 6171 struct ring_buffer_cpu_meta *meta = cpu_buffer->ring_meta; 6172 meta->commit_buffer = meta->head_buffer; 6173 } 6174 } 6175 } 6176 6177 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 6178 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6179 { 6180 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6181 6182 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 6183 return; 6184 6185 arch_spin_lock(&cpu_buffer->lock); 6186 6187 rb_reset_cpu(cpu_buffer); 6188 6189 arch_spin_unlock(&cpu_buffer->lock); 6190 } 6191 6192 /** 6193 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 6194 * @buffer: The ring buffer to reset a per cpu buffer of 6195 * @cpu: 
The CPU buffer to be reset 6196 */ 6197 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 6198 { 6199 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6200 6201 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6202 return; 6203 6204 /* prevent another thread from changing buffer sizes */ 6205 mutex_lock(&buffer->mutex); 6206 6207 atomic_inc(&cpu_buffer->resize_disabled); 6208 atomic_inc(&cpu_buffer->record_disabled); 6209 6210 /* Make sure all commits have finished */ 6211 synchronize_rcu(); 6212 6213 reset_disabled_cpu_buffer(cpu_buffer); 6214 6215 atomic_dec(&cpu_buffer->record_disabled); 6216 atomic_dec(&cpu_buffer->resize_disabled); 6217 6218 mutex_unlock(&buffer->mutex); 6219 } 6220 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 6221 6222 /* Flag to ensure proper resetting of atomic variables */ 6223 #define RESET_BIT (1 << 30) 6224 6225 /** 6226 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 6227 * @buffer: The ring buffer to reset a per cpu buffer of 6228 */ 6229 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 6230 { 6231 struct ring_buffer_per_cpu *cpu_buffer; 6232 int cpu; 6233 6234 /* prevent another thread from changing buffer sizes */ 6235 mutex_lock(&buffer->mutex); 6236 6237 for_each_online_buffer_cpu(buffer, cpu) { 6238 cpu_buffer = buffer->buffers[cpu]; 6239 6240 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 6241 atomic_inc(&cpu_buffer->record_disabled); 6242 } 6243 6244 /* Make sure all commits have finished */ 6245 synchronize_rcu(); 6246 6247 for_each_buffer_cpu(buffer, cpu) { 6248 cpu_buffer = buffer->buffers[cpu]; 6249 6250 /* 6251 * If a CPU came online during the synchronize_rcu(), then 6252 * ignore it. 6253 */ 6254 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 6255 continue; 6256 6257 reset_disabled_cpu_buffer(cpu_buffer); 6258 6259 atomic_dec(&cpu_buffer->record_disabled); 6260 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 6261 } 6262 6263 mutex_unlock(&buffer->mutex); 6264 } 6265 6266 /** 6267 * ring_buffer_reset - reset a ring buffer 6268 * @buffer: The ring buffer to reset all cpu buffers 6269 */ 6270 void ring_buffer_reset(struct trace_buffer *buffer) 6271 { 6272 struct ring_buffer_per_cpu *cpu_buffer; 6273 int cpu; 6274 6275 /* prevent another thread from changing buffer sizes */ 6276 mutex_lock(&buffer->mutex); 6277 6278 for_each_buffer_cpu(buffer, cpu) { 6279 cpu_buffer = buffer->buffers[cpu]; 6280 6281 atomic_inc(&cpu_buffer->resize_disabled); 6282 atomic_inc(&cpu_buffer->record_disabled); 6283 } 6284 6285 /* Make sure all commits have finished */ 6286 synchronize_rcu(); 6287 6288 for_each_buffer_cpu(buffer, cpu) { 6289 cpu_buffer = buffer->buffers[cpu]; 6290 6291 reset_disabled_cpu_buffer(cpu_buffer); 6292 6293 atomic_dec(&cpu_buffer->record_disabled); 6294 atomic_dec(&cpu_buffer->resize_disabled); 6295 } 6296 6297 mutex_unlock(&buffer->mutex); 6298 } 6299 EXPORT_SYMBOL_GPL(ring_buffer_reset); 6300 6301 /** 6302 * ring_buffer_empty - is the ring buffer empty? 
6303 * @buffer: The ring buffer to test 6304 */ 6305 bool ring_buffer_empty(struct trace_buffer *buffer) 6306 { 6307 struct ring_buffer_per_cpu *cpu_buffer; 6308 unsigned long flags; 6309 bool dolock; 6310 bool ret; 6311 int cpu; 6312 6313 /* yes this is racy, but if you don't like the race, lock the buffer */ 6314 for_each_buffer_cpu(buffer, cpu) { 6315 cpu_buffer = buffer->buffers[cpu]; 6316 local_irq_save(flags); 6317 dolock = rb_reader_lock(cpu_buffer); 6318 ret = rb_per_cpu_empty(cpu_buffer); 6319 rb_reader_unlock(cpu_buffer, dolock); 6320 local_irq_restore(flags); 6321 6322 if (!ret) 6323 return false; 6324 } 6325 6326 return true; 6327 } 6328 EXPORT_SYMBOL_GPL(ring_buffer_empty); 6329 6330 /** 6331 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 6332 * @buffer: The ring buffer 6333 * @cpu: The CPU buffer to test 6334 */ 6335 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 6336 { 6337 struct ring_buffer_per_cpu *cpu_buffer; 6338 unsigned long flags; 6339 bool dolock; 6340 bool ret; 6341 6342 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6343 return true; 6344 6345 cpu_buffer = buffer->buffers[cpu]; 6346 local_irq_save(flags); 6347 dolock = rb_reader_lock(cpu_buffer); 6348 ret = rb_per_cpu_empty(cpu_buffer); 6349 rb_reader_unlock(cpu_buffer, dolock); 6350 local_irq_restore(flags); 6351 6352 return ret; 6353 } 6354 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 6355 6356 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 6357 /** 6358 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 6359 * @buffer_a: One buffer to swap with 6360 * @buffer_b: The other buffer to swap with 6361 * @cpu: the CPU of the buffers to swap 6362 * 6363 * This function is useful for tracers that want to take a "snapshot" 6364 * of a CPU buffer and has another back up buffer lying around. 6365 * it is expected that the tracer handles the cpu buffer not being 6366 * used at the moment. 6367 */ 6368 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 6369 struct trace_buffer *buffer_b, int cpu) 6370 { 6371 struct ring_buffer_per_cpu *cpu_buffer_a; 6372 struct ring_buffer_per_cpu *cpu_buffer_b; 6373 int ret = -EINVAL; 6374 6375 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 6376 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 6377 return -EINVAL; 6378 6379 cpu_buffer_a = buffer_a->buffers[cpu]; 6380 cpu_buffer_b = buffer_b->buffers[cpu]; 6381 6382 /* It's up to the callers to not try to swap mapped buffers */ 6383 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) 6384 return -EBUSY; 6385 6386 /* At least make sure the two buffers are somewhat the same */ 6387 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 6388 return -EINVAL; 6389 6390 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 6391 return -EINVAL; 6392 6393 if (atomic_read(&buffer_a->record_disabled)) 6394 return -EAGAIN; 6395 6396 if (atomic_read(&buffer_b->record_disabled)) 6397 return -EAGAIN; 6398 6399 if (atomic_read(&cpu_buffer_a->record_disabled)) 6400 return -EAGAIN; 6401 6402 if (atomic_read(&cpu_buffer_b->record_disabled)) 6403 return -EAGAIN; 6404 6405 /* 6406 * We can't do a synchronize_rcu here because this 6407 * function can be called in atomic context. 6408 * Normally this will be called from the same CPU as cpu. 6409 * If not it's up to the caller to protect this. 
6410 */ 6411 atomic_inc(&cpu_buffer_a->record_disabled); 6412 atomic_inc(&cpu_buffer_b->record_disabled); 6413 6414 ret = -EBUSY; 6415 if (local_read(&cpu_buffer_a->committing)) 6416 goto out_dec; 6417 if (local_read(&cpu_buffer_b->committing)) 6418 goto out_dec; 6419 6420 /* 6421 * When resize is in progress, we cannot swap it because 6422 * it will mess the state of the cpu buffer. 6423 */ 6424 if (atomic_read(&buffer_a->resizing)) 6425 goto out_dec; 6426 if (atomic_read(&buffer_b->resizing)) 6427 goto out_dec; 6428 6429 buffer_a->buffers[cpu] = cpu_buffer_b; 6430 buffer_b->buffers[cpu] = cpu_buffer_a; 6431 6432 cpu_buffer_b->buffer = buffer_a; 6433 cpu_buffer_a->buffer = buffer_b; 6434 6435 ret = 0; 6436 6437 out_dec: 6438 atomic_dec(&cpu_buffer_a->record_disabled); 6439 atomic_dec(&cpu_buffer_b->record_disabled); 6440 return ret; 6441 } 6442 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 6443 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 6444 6445 /** 6446 * ring_buffer_alloc_read_page - allocate a page to read from buffer 6447 * @buffer: the buffer to allocate for. 6448 * @cpu: the cpu buffer to allocate. 6449 * 6450 * This function is used in conjunction with ring_buffer_read_page. 6451 * When reading a full page from the ring buffer, these functions 6452 * can be used to speed up the process. The calling function should 6453 * allocate a few pages first with this function. Then when it 6454 * needs to get pages from the ring buffer, it passes the result 6455 * of this function into ring_buffer_read_page, which will swap 6456 * the page that was allocated, with the read page of the buffer. 6457 * 6458 * Returns: 6459 * The page allocated, or ERR_PTR 6460 */ 6461 struct buffer_data_read_page * 6462 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 6463 { 6464 struct ring_buffer_per_cpu *cpu_buffer; 6465 struct buffer_data_read_page *bpage = NULL; 6466 unsigned long flags; 6467 struct page *page; 6468 6469 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6470 return ERR_PTR(-ENODEV); 6471 6472 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 6473 if (!bpage) 6474 return ERR_PTR(-ENOMEM); 6475 6476 bpage->order = buffer->subbuf_order; 6477 cpu_buffer = buffer->buffers[cpu]; 6478 local_irq_save(flags); 6479 arch_spin_lock(&cpu_buffer->lock); 6480 6481 if (cpu_buffer->free_page) { 6482 bpage->data = cpu_buffer->free_page; 6483 cpu_buffer->free_page = NULL; 6484 } 6485 6486 arch_spin_unlock(&cpu_buffer->lock); 6487 local_irq_restore(flags); 6488 6489 if (bpage->data) 6490 goto out; 6491 6492 page = alloc_pages_node(cpu_to_node(cpu), 6493 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 6494 cpu_buffer->buffer->subbuf_order); 6495 if (!page) { 6496 kfree(bpage); 6497 return ERR_PTR(-ENOMEM); 6498 } 6499 6500 bpage->data = page_address(page); 6501 6502 out: 6503 rb_init_page(bpage->data); 6504 6505 return bpage; 6506 } 6507 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 6508 6509 /** 6510 * ring_buffer_free_read_page - free an allocated read page 6511 * @buffer: the buffer the page was allocate for 6512 * @cpu: the cpu buffer the page came from 6513 * @data_page: the page to free 6514 * 6515 * Free a page allocated from ring_buffer_alloc_read_page. 
6516 */ 6517 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 6518 struct buffer_data_read_page *data_page) 6519 { 6520 struct ring_buffer_per_cpu *cpu_buffer; 6521 struct buffer_data_page *bpage = data_page->data; 6522 struct page *page = virt_to_page(bpage); 6523 unsigned long flags; 6524 6525 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 6526 return; 6527 6528 cpu_buffer = buffer->buffers[cpu]; 6529 6530 /* 6531 * If the page is still in use someplace else, or order of the page 6532 * is different from the subbuffer order of the buffer - 6533 * we can't reuse it 6534 */ 6535 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 6536 goto out; 6537 6538 local_irq_save(flags); 6539 arch_spin_lock(&cpu_buffer->lock); 6540 6541 if (!cpu_buffer->free_page) { 6542 cpu_buffer->free_page = bpage; 6543 bpage = NULL; 6544 } 6545 6546 arch_spin_unlock(&cpu_buffer->lock); 6547 local_irq_restore(flags); 6548 6549 out: 6550 free_pages((unsigned long)bpage, data_page->order); 6551 kfree(data_page); 6552 } 6553 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 6554 6555 /** 6556 * ring_buffer_read_page - extract a page from the ring buffer 6557 * @buffer: buffer to extract from 6558 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 6559 * @len: amount to extract 6560 * @cpu: the cpu of the buffer to extract 6561 * @full: should the extraction only happen when the page is full. 6562 * 6563 * This function will pull out a page from the ring buffer and consume it. 6564 * @data_page must be the address of the variable that was returned 6565 * from ring_buffer_alloc_read_page. This is because the page might be used 6566 * to swap with a page in the ring buffer. 6567 * 6568 * for example: 6569 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 6570 * if (IS_ERR(rpage)) 6571 * return PTR_ERR(rpage); 6572 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 6573 * if (ret >= 0) 6574 * process_page(ring_buffer_read_page_data(rpage), ret); 6575 * ring_buffer_free_read_page(buffer, cpu, rpage); 6576 * 6577 * When @full is set, the function will not return true unless 6578 * the writer is off the reader page. 6579 * 6580 * Note: it is up to the calling functions to handle sleeps and wakeups. 6581 * The ring buffer can be used anywhere in the kernel and can not 6582 * blindly call wake_up. The layer that uses the ring buffer must be 6583 * responsible for that. 6584 * 6585 * Returns: 6586 * >=0 if data has been transferred, returns the offset of consumed data. 6587 * <0 if no data has been transferred. 6588 */ 6589 int ring_buffer_read_page(struct trace_buffer *buffer, 6590 struct buffer_data_read_page *data_page, 6591 size_t len, int cpu, int full) 6592 { 6593 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 6594 struct ring_buffer_event *event; 6595 struct buffer_data_page *bpage; 6596 struct buffer_page *reader; 6597 unsigned long missed_events; 6598 unsigned int commit; 6599 unsigned int read; 6600 u64 save_timestamp; 6601 6602 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6603 return -1; 6604 6605 /* 6606 * If len is not big enough to hold the page header, then 6607 * we can not copy anything. 
6608 */ 6609 if (len <= BUF_PAGE_HDR_SIZE) 6610 return -1; 6611 6612 len -= BUF_PAGE_HDR_SIZE; 6613 6614 if (!data_page || !data_page->data) 6615 return -1; 6616 6617 if (data_page->order != buffer->subbuf_order) 6618 return -1; 6619 6620 bpage = data_page->data; 6621 if (!bpage) 6622 return -1; 6623 6624 guard(raw_spinlock_irqsave)(&cpu_buffer->reader_lock); 6625 6626 reader = rb_get_reader_page(cpu_buffer); 6627 if (!reader) 6628 return -1; 6629 6630 event = rb_reader_event(cpu_buffer); 6631 6632 read = reader->read; 6633 commit = rb_page_size(reader); 6634 6635 /* Check if any events were dropped */ 6636 missed_events = cpu_buffer->lost_events; 6637 6638 /* 6639 * If this page has been partially read or 6640 * if len is not big enough to read the rest of the page or 6641 * a writer is still on the page, then 6642 * we must copy the data from the page to the buffer. 6643 * Otherwise, we can simply swap the page with the one passed in. 6644 */ 6645 if (read || (len < (commit - read)) || 6646 cpu_buffer->reader_page == cpu_buffer->commit_page || 6647 cpu_buffer->mapped) { 6648 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 6649 unsigned int rpos = read; 6650 unsigned int pos = 0; 6651 unsigned int size; 6652 6653 /* 6654 * If a full page is expected, this can still be returned 6655 * if there's been a previous partial read and the 6656 * rest of the page can be read and the commit page is off 6657 * the reader page. 6658 */ 6659 if (full && 6660 (!read || (len < (commit - read)) || 6661 cpu_buffer->reader_page == cpu_buffer->commit_page)) 6662 return -1; 6663 6664 if (len > (commit - read)) 6665 len = (commit - read); 6666 6667 /* Always keep the time extend and data together */ 6668 size = rb_event_ts_length(event); 6669 6670 if (len < size) 6671 return -1; 6672 6673 /* save the current timestamp, since the user will need it */ 6674 save_timestamp = cpu_buffer->read_stamp; 6675 6676 /* Need to copy one event at a time */ 6677 do { 6678 /* We need the size of one event, because 6679 * rb_advance_reader only advances by one event, 6680 * whereas rb_event_ts_length may include the size of 6681 * one or two events. 6682 * We have already ensured there's enough space if this 6683 * is a time extend. */ 6684 size = rb_event_length(event); 6685 memcpy(bpage->data + pos, rpage->data + rpos, size); 6686 6687 len -= size; 6688 6689 rb_advance_reader(cpu_buffer); 6690 rpos = reader->read; 6691 pos += size; 6692 6693 if (rpos >= commit) 6694 break; 6695 6696 event = rb_reader_event(cpu_buffer); 6697 /* Always keep the time extend and data together */ 6698 size = rb_event_ts_length(event); 6699 } while (len >= size); 6700 6701 /* update bpage */ 6702 local_set(&bpage->commit, pos); 6703 bpage->time_stamp = save_timestamp; 6704 6705 /* we copied everything to the beginning */ 6706 read = 0; 6707 } else { 6708 /* update the entry counter */ 6709 cpu_buffer->read += rb_page_entries(reader); 6710 cpu_buffer->read_bytes += rb_page_size(reader); 6711 6712 /* swap the pages */ 6713 rb_init_page(bpage); 6714 bpage = reader->page; 6715 reader->page = data_page->data; 6716 local_set(&reader->write, 0); 6717 local_set(&reader->entries, 0); 6718 reader->read = 0; 6719 data_page->data = bpage; 6720 6721 /* 6722 * Use the real_end for the data size, 6723 * This gives us a chance to store the lost events 6724 * on the page. 
6725 */ 6726 if (reader->real_end) 6727 local_set(&bpage->commit, reader->real_end); 6728 } 6729 6730 cpu_buffer->lost_events = 0; 6731 6732 commit = local_read(&bpage->commit); 6733 /* 6734 * Set a flag in the commit field if we lost events 6735 */ 6736 if (missed_events) { 6737 /* If there is room at the end of the page to save the 6738 * missed events, then record it there. 6739 */ 6740 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6741 memcpy(&bpage->data[commit], &missed_events, 6742 sizeof(missed_events)); 6743 local_add(RB_MISSED_STORED, &bpage->commit); 6744 commit += sizeof(missed_events); 6745 } 6746 local_add(RB_MISSED_EVENTS, &bpage->commit); 6747 } 6748 6749 /* 6750 * This page may be off to user land. Zero it out here. 6751 */ 6752 if (commit < buffer->subbuf_size) 6753 memset(&bpage->data[commit], 0, buffer->subbuf_size - commit); 6754 6755 return read; 6756 } 6757 EXPORT_SYMBOL_GPL(ring_buffer_read_page); 6758 6759 /** 6760 * ring_buffer_read_page_data - get pointer to the data in the page. 6761 * @page: the page to get the data from 6762 * 6763 * Returns pointer to the actual data in this page. 6764 */ 6765 void *ring_buffer_read_page_data(struct buffer_data_read_page *page) 6766 { 6767 return page->data; 6768 } 6769 EXPORT_SYMBOL_GPL(ring_buffer_read_page_data); 6770 6771 /** 6772 * ring_buffer_subbuf_size_get - get size of the sub buffer. 6773 * @buffer: the buffer to get the sub buffer size from 6774 * 6775 * Returns size of the sub buffer, in bytes. 6776 */ 6777 int ring_buffer_subbuf_size_get(struct trace_buffer *buffer) 6778 { 6779 return buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6780 } 6781 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get); 6782 6783 /** 6784 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page. 6785 * @buffer: The ring_buffer to get the system sub page order from 6786 * 6787 * By default, one ring buffer sub page equals to one system page. This parameter 6788 * is configurable, per ring buffer. The size of the ring buffer sub page can be 6789 * extended, but must be an order of system page size. 6790 * 6791 * Returns the order of buffer sub page size, in system pages: 6792 * 0 means the sub buffer size is 1 system page and so forth. 6793 * In case of an error < 0 is returned. 6794 */ 6795 int ring_buffer_subbuf_order_get(struct trace_buffer *buffer) 6796 { 6797 if (!buffer) 6798 return -EINVAL; 6799 6800 return buffer->subbuf_order; 6801 } 6802 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get); 6803 6804 /** 6805 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page. 6806 * @buffer: The ring_buffer to set the new page size. 6807 * @order: Order of the system pages in one sub buffer page 6808 * 6809 * By default, one ring buffer pages equals to one system page. This API can be 6810 * used to set new size of the ring buffer page. The size must be order of 6811 * system page size, that's why the input parameter @order is the order of 6812 * system pages that are allocated for one ring buffer page: 6813 * 0 - 1 system page 6814 * 1 - 2 system pages 6815 * 3 - 4 system pages 6816 * ... 6817 * 6818 * Returns 0 on success or < 0 in case of an error. 
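 *
 * For example, to switch to 8K sub buffers on a system with 4K pages
 * (order 1, i.e. two system pages per sub buffer), something like the
 * following sketch would be expected to work, provided the buffer is not
 * currently memory mapped:
 *
 *	if (ring_buffer_subbuf_order_set(buffer, 1))
 *		pr_warn("could not change ring buffer sub-buffer size\n");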
6819 */ 6820 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 6821 { 6822 struct ring_buffer_per_cpu *cpu_buffer; 6823 struct buffer_page *bpage, *tmp; 6824 int old_order, old_size; 6825 int nr_pages; 6826 int psize; 6827 int err; 6828 int cpu; 6829 6830 if (!buffer || order < 0) 6831 return -EINVAL; 6832 6833 if (buffer->subbuf_order == order) 6834 return 0; 6835 6836 psize = (1 << order) * PAGE_SIZE; 6837 if (psize <= BUF_PAGE_HDR_SIZE) 6838 return -EINVAL; 6839 6840 /* Size of a subbuf cannot be greater than the write counter */ 6841 if (psize > RB_WRITE_MASK + 1) 6842 return -EINVAL; 6843 6844 old_order = buffer->subbuf_order; 6845 old_size = buffer->subbuf_size; 6846 6847 /* prevent another thread from changing buffer sizes */ 6848 guard(mutex)(&buffer->mutex); 6849 atomic_inc(&buffer->record_disabled); 6850 6851 /* Make sure all commits have finished */ 6852 synchronize_rcu(); 6853 6854 buffer->subbuf_order = order; 6855 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 6856 6857 /* Make sure all new buffers are allocated, before deleting the old ones */ 6858 for_each_buffer_cpu(buffer, cpu) { 6859 6860 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6861 continue; 6862 6863 cpu_buffer = buffer->buffers[cpu]; 6864 6865 if (cpu_buffer->mapped) { 6866 err = -EBUSY; 6867 goto error; 6868 } 6869 6870 /* Update the number of pages to match the new size */ 6871 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6872 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6873 6874 /* we need a minimum of two pages */ 6875 if (nr_pages < 2) 6876 nr_pages = 2; 6877 6878 cpu_buffer->nr_pages_to_update = nr_pages; 6879 6880 /* Include the reader page */ 6881 nr_pages++; 6882 6883 /* Allocate the new size buffer */ 6884 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6885 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6886 &cpu_buffer->new_pages)) { 6887 /* not enough memory for new pages */ 6888 err = -ENOMEM; 6889 goto error; 6890 } 6891 } 6892 6893 for_each_buffer_cpu(buffer, cpu) { 6894 struct buffer_data_page *old_free_data_page; 6895 struct list_head old_pages; 6896 unsigned long flags; 6897 6898 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6899 continue; 6900 6901 cpu_buffer = buffer->buffers[cpu]; 6902 6903 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6904 6905 /* Clear the head bit to make the link list normal to read */ 6906 rb_head_page_deactivate(cpu_buffer); 6907 6908 /* 6909 * Collect buffers from the cpu_buffer pages list and the 6910 * reader_page on old_pages, so they can be freed later when not 6911 * under a spinlock. The pages list is a linked list with no 6912 * head, adding old_pages turns it into a regular list with 6913 * old_pages being the head. 
6914 */ 6915 list_add(&old_pages, cpu_buffer->pages); 6916 list_add(&cpu_buffer->reader_page->list, &old_pages); 6917 6918 /* One page was allocated for the reader page */ 6919 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6920 struct buffer_page, list); 6921 list_del_init(&cpu_buffer->reader_page->list); 6922 6923 /* Install the new pages, remove the head from the list */ 6924 cpu_buffer->pages = cpu_buffer->new_pages.next; 6925 list_del_init(&cpu_buffer->new_pages); 6926 cpu_buffer->cnt++; 6927 6928 cpu_buffer->head_page 6929 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6930 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6931 6932 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6933 cpu_buffer->nr_pages_to_update = 0; 6934 6935 old_free_data_page = cpu_buffer->free_page; 6936 cpu_buffer->free_page = NULL; 6937 6938 rb_head_page_activate(cpu_buffer); 6939 6940 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6941 6942 /* Free old sub buffers */ 6943 list_for_each_entry_safe(bpage, tmp, &old_pages, list) { 6944 list_del_init(&bpage->list); 6945 free_buffer_page(bpage); 6946 } 6947 free_pages((unsigned long)old_free_data_page, old_order); 6948 6949 rb_check_pages(cpu_buffer); 6950 } 6951 6952 atomic_dec(&buffer->record_disabled); 6953 6954 return 0; 6955 6956 error: 6957 buffer->subbuf_order = old_order; 6958 buffer->subbuf_size = old_size; 6959 6960 atomic_dec(&buffer->record_disabled); 6961 6962 for_each_buffer_cpu(buffer, cpu) { 6963 cpu_buffer = buffer->buffers[cpu]; 6964 6965 if (!cpu_buffer->nr_pages_to_update) 6966 continue; 6967 6968 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6969 list_del_init(&bpage->list); 6970 free_buffer_page(bpage); 6971 } 6972 } 6973 6974 return err; 6975 } 6976 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6977 6978 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6979 { 6980 struct page *page; 6981 6982 if (cpu_buffer->meta_page) 6983 return 0; 6984 6985 page = alloc_page(GFP_USER | __GFP_ZERO); 6986 if (!page) 6987 return -ENOMEM; 6988 6989 cpu_buffer->meta_page = page_to_virt(page); 6990 6991 return 0; 6992 } 6993 6994 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6995 { 6996 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6997 6998 free_page(addr); 6999 cpu_buffer->meta_page = NULL; 7000 } 7001 7002 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 7003 unsigned long *subbuf_ids) 7004 { 7005 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 7006 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 7007 struct buffer_page *first_subbuf, *subbuf; 7008 int cnt = 0; 7009 int id = 0; 7010 7011 id = rb_page_id(cpu_buffer, cpu_buffer->reader_page, id); 7012 subbuf_ids[id++] = (unsigned long)cpu_buffer->reader_page->page; 7013 cnt++; 7014 7015 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 7016 do { 7017 id = rb_page_id(cpu_buffer, subbuf, id); 7018 7019 if (WARN_ON(id >= nr_subbufs)) 7020 break; 7021 7022 subbuf_ids[id] = (unsigned long)subbuf->page; 7023 7024 rb_inc_page(&subbuf); 7025 id++; 7026 cnt++; 7027 } while (subbuf != first_subbuf); 7028 7029 WARN_ON(cnt != nr_subbufs); 7030 7031 /* install subbuf ID to kern VA translation */ 7032 cpu_buffer->subbuf_ids = subbuf_ids; 7033 7034 meta->meta_struct_len = sizeof(*meta); 7035 meta->nr_subbufs = nr_subbufs; 7036 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 7037 meta->meta_page_size = 
meta->subbuf_size; 7038 7039 rb_update_meta_page(cpu_buffer); 7040 } 7041 7042 static struct ring_buffer_per_cpu * 7043 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 7044 { 7045 struct ring_buffer_per_cpu *cpu_buffer; 7046 7047 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7048 return ERR_PTR(-EINVAL); 7049 7050 cpu_buffer = buffer->buffers[cpu]; 7051 7052 mutex_lock(&cpu_buffer->mapping_lock); 7053 7054 if (!cpu_buffer->user_mapped) { 7055 mutex_unlock(&cpu_buffer->mapping_lock); 7056 return ERR_PTR(-ENODEV); 7057 } 7058 7059 return cpu_buffer; 7060 } 7061 7062 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 7063 { 7064 mutex_unlock(&cpu_buffer->mapping_lock); 7065 } 7066 7067 /* 7068 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 7069 * to be set-up or torn-down. 7070 */ 7071 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 7072 bool inc) 7073 { 7074 unsigned long flags; 7075 7076 lockdep_assert_held(&cpu_buffer->mapping_lock); 7077 7078 /* mapped is always greater or equal to user_mapped */ 7079 if (WARN_ON(cpu_buffer->mapped < cpu_buffer->user_mapped)) 7080 return -EINVAL; 7081 7082 if (inc && cpu_buffer->mapped == UINT_MAX) 7083 return -EBUSY; 7084 7085 if (WARN_ON(!inc && cpu_buffer->user_mapped == 0)) 7086 return -EINVAL; 7087 7088 mutex_lock(&cpu_buffer->buffer->mutex); 7089 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 7090 7091 if (inc) { 7092 cpu_buffer->user_mapped++; 7093 cpu_buffer->mapped++; 7094 } else { 7095 cpu_buffer->user_mapped--; 7096 cpu_buffer->mapped--; 7097 } 7098 7099 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7100 mutex_unlock(&cpu_buffer->buffer->mutex); 7101 7102 return 0; 7103 } 7104 7105 /* 7106 * +--------------+ pgoff == 0 7107 * | meta page | 7108 * +--------------+ pgoff == 1 7109 * | subbuffer 0 | 7110 * | | 7111 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 7112 * | subbuffer 1 | 7113 * | | 7114 * ... 7115 */ 7116 #ifdef CONFIG_MMU 7117 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7118 struct vm_area_struct *vma) 7119 { 7120 unsigned long nr_subbufs, nr_pages, nr_vma_pages, pgoff = vma->vm_pgoff; 7121 unsigned int subbuf_pages, subbuf_order; 7122 struct page **pages __free(kfree) = NULL; 7123 int p = 0, s = 0; 7124 int err; 7125 7126 /* Refuse MP_PRIVATE or writable mappings */ 7127 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 7128 !(vma->vm_flags & VM_MAYSHARE)) 7129 return -EPERM; 7130 7131 subbuf_order = cpu_buffer->buffer->subbuf_order; 7132 subbuf_pages = 1 << subbuf_order; 7133 7134 if (subbuf_order && pgoff % subbuf_pages) 7135 return -EINVAL; 7136 7137 /* 7138 * Make sure the mapping cannot become writable later. Also tell the VM 7139 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 
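 * (Clearing VM_MAYWRITE in the vm_flags_mod() call below is what makes a
 * later mprotect(PROT_WRITE) on this mapping fail.)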
7140 */ 7141 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 7142 VM_MAYWRITE); 7143 7144 lockdep_assert_held(&cpu_buffer->mapping_lock); 7145 7146 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 7147 nr_pages = ((nr_subbufs + 1) << subbuf_order); /* + meta-page */ 7148 if (nr_pages <= pgoff) 7149 return -EINVAL; 7150 7151 nr_pages -= pgoff; 7152 7153 nr_vma_pages = vma_pages(vma); 7154 if (!nr_vma_pages || nr_vma_pages > nr_pages) 7155 return -EINVAL; 7156 7157 nr_pages = nr_vma_pages; 7158 7159 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 7160 if (!pages) 7161 return -ENOMEM; 7162 7163 if (!pgoff) { 7164 unsigned long meta_page_padding; 7165 7166 pages[p++] = virt_to_page(cpu_buffer->meta_page); 7167 7168 /* 7169 * Pad with the zero-page to align the meta-page with the 7170 * sub-buffers. 7171 */ 7172 meta_page_padding = subbuf_pages - 1; 7173 while (meta_page_padding-- && p < nr_pages) { 7174 unsigned long __maybe_unused zero_addr = 7175 vma->vm_start + (PAGE_SIZE * p); 7176 7177 pages[p++] = ZERO_PAGE(zero_addr); 7178 } 7179 } else { 7180 /* Skip the meta-page */ 7181 pgoff -= subbuf_pages; 7182 7183 s += pgoff / subbuf_pages; 7184 } 7185 7186 while (p < nr_pages) { 7187 struct page *page; 7188 int off = 0; 7189 7190 if (WARN_ON_ONCE(s >= nr_subbufs)) 7191 return -EINVAL; 7192 7193 page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 7194 7195 for (; off < (1 << (subbuf_order)); off++, page++) { 7196 if (p >= nr_pages) 7197 break; 7198 7199 pages[p++] = page; 7200 } 7201 s++; 7202 } 7203 7204 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 7205 7206 return err; 7207 } 7208 #else 7209 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 7210 struct vm_area_struct *vma) 7211 { 7212 return -EOPNOTSUPP; 7213 } 7214 #endif 7215 7216 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 7217 struct vm_area_struct *vma) 7218 { 7219 struct ring_buffer_per_cpu *cpu_buffer; 7220 unsigned long flags, *subbuf_ids; 7221 int err; 7222 7223 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 7224 return -EINVAL; 7225 7226 cpu_buffer = buffer->buffers[cpu]; 7227 7228 guard(mutex)(&cpu_buffer->mapping_lock); 7229 7230 if (cpu_buffer->user_mapped) { 7231 err = __rb_map_vma(cpu_buffer, vma); 7232 if (!err) 7233 err = __rb_inc_dec_mapped(cpu_buffer, true); 7234 return err; 7235 } 7236 7237 /* prevent another thread from changing buffer/sub-buffer sizes */ 7238 guard(mutex)(&buffer->mutex); 7239 7240 err = rb_alloc_meta_page(cpu_buffer); 7241 if (err) 7242 return err; 7243 7244 /* subbuf_ids include the reader while nr_pages does not */ 7245 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 7246 if (!subbuf_ids) { 7247 rb_free_meta_page(cpu_buffer); 7248 return -ENOMEM; 7249 } 7250 7251 atomic_inc(&cpu_buffer->resize_disabled); 7252 7253 /* 7254 * Lock all readers to block any subbuf swap until the subbuf IDs are 7255 * assigned. 
7256 */
7257 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
7258 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
7259
7260 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
7261
7262 err = __rb_map_vma(cpu_buffer, vma);
7263 if (!err) {
7264 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
7265 /* This is the first time it is mapped by user */
7266 cpu_buffer->mapped++;
7267 cpu_buffer->user_mapped = 1;
7268 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
7269 } else {
7270 kfree(cpu_buffer->subbuf_ids);
7271 cpu_buffer->subbuf_ids = NULL;
7272 rb_free_meta_page(cpu_buffer);
7273 atomic_dec(&cpu_buffer->resize_disabled);
7274 }
7275
7276 return err;
7277 }
7278
7279 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
7280 {
7281 struct ring_buffer_per_cpu *cpu_buffer;
7282 unsigned long flags;
7283
7284 if (!cpumask_test_cpu(cpu, buffer->cpumask))
7285 return -EINVAL;
7286
7287 cpu_buffer = buffer->buffers[cpu];
7288
7289 guard(mutex)(&cpu_buffer->mapping_lock);
7290
7291 if (!cpu_buffer->user_mapped) {
7292 return -ENODEV;
7293 } else if (cpu_buffer->user_mapped > 1) {
7294 __rb_inc_dec_mapped(cpu_buffer, false);
7295 return 0;
7296 }
7297
7298 guard(mutex)(&buffer->mutex);
7299 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
7300
7301 /* This is the last user space mapping */
7302 if (!WARN_ON_ONCE(cpu_buffer->mapped < cpu_buffer->user_mapped))
7303 cpu_buffer->mapped--;
7304 cpu_buffer->user_mapped = 0;
7305
7306 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
7307
7308 kfree(cpu_buffer->subbuf_ids);
7309 cpu_buffer->subbuf_ids = NULL;
7310 rb_free_meta_page(cpu_buffer);
7311 atomic_dec(&cpu_buffer->resize_disabled);
7312
7313 return 0;
7314 }
7315
7316 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
7317 {
7318 struct ring_buffer_per_cpu *cpu_buffer;
7319 struct buffer_page *reader;
7320 unsigned long missed_events;
7321 unsigned long reader_size;
7322 unsigned long flags;
7323
7324 cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
7325 if (IS_ERR(cpu_buffer))
7326 return (int)PTR_ERR(cpu_buffer);
7327
7328 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
7329
7330 consume:
7331 if (rb_per_cpu_empty(cpu_buffer))
7332 goto out;
7333
7334 reader_size = rb_page_size(cpu_buffer->reader_page);
7335
7336 /*
7337 * There is data to be read on the current reader page, we can
7338 * return to the caller. But before that, we assume the latter will read
7339 * everything. Let's update the kernel reader accordingly.
7340 */
7341 if (cpu_buffer->reader_page->read < reader_size) {
7342 while (cpu_buffer->reader_page->read < reader_size)
7343 rb_advance_reader(cpu_buffer);
7344 goto out;
7345 }
7346
7347 reader = rb_get_reader_page(cpu_buffer);
7348 if (WARN_ON(!reader))
7349 goto out;
7350
7351 /* Check if any events were dropped */
7352 missed_events = cpu_buffer->lost_events;
7353
7354 if (missed_events) {
7355 if (cpu_buffer->reader_page != cpu_buffer->commit_page) {
7356 struct buffer_data_page *bpage = reader->page;
7357 unsigned int commit;
7358 /*
7359 * Use the real_end for the data size,
7360 * This gives us a chance to store the lost events
7361 * on the page.
7362 */
7363 if (reader->real_end)
7364 local_set(&bpage->commit, reader->real_end);
7365 /*
7366 * If there is room at the end of the page to save the
7367 * missed events, then record it there.
7368 */ 7369 commit = rb_page_size(reader); 7370 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 7371 memcpy(&bpage->data[commit], &missed_events, 7372 sizeof(missed_events)); 7373 local_add(RB_MISSED_STORED, &bpage->commit); 7374 } 7375 local_add(RB_MISSED_EVENTS, &bpage->commit); 7376 } else if (!WARN_ONCE(cpu_buffer->reader_page == cpu_buffer->tail_page, 7377 "Reader on commit with %ld missed events", 7378 missed_events)) { 7379 /* 7380 * There shouldn't be any missed events if the tail_page 7381 * is on the reader page. But if the tail page is not on the 7382 * reader page and the commit_page is, that would mean that 7383 * there's a commit_overrun (an interrupt preempted an 7384 * addition of an event and then filled the buffer 7385 * with new events). In this case it's not an 7386 * error, but it should still be reported. 7387 * 7388 * TODO: Add missed events to the page for user space to know. 7389 */ 7390 pr_info("Ring buffer [%d] commit overrun lost %ld events at timestamp:%lld\n", 7391 cpu, missed_events, cpu_buffer->reader_page->page->time_stamp); 7392 } 7393 } 7394 7395 cpu_buffer->lost_events = 0; 7396 7397 goto consume; 7398 7399 out: 7400 /* Some archs do not have data cache coherency between kernel and user-space */ 7401 flush_kernel_vmap_range(cpu_buffer->reader_page->page, 7402 buffer->subbuf_size + BUF_PAGE_HDR_SIZE); 7403 7404 rb_update_meta_page(cpu_buffer); 7405 7406 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 7407 rb_put_mapped_buffer(cpu_buffer); 7408 7409 return 0; 7410 } 7411 7412 /* 7413 * We only allocate new buffers, never free them if the CPU goes down. 7414 * If we were to free the buffer, then the user would lose any trace that was in 7415 * the buffer. 7416 */ 7417 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 7418 { 7419 struct trace_buffer *buffer; 7420 long nr_pages_same; 7421 int cpu_i; 7422 unsigned long nr_pages; 7423 7424 buffer = container_of(node, struct trace_buffer, node); 7425 if (cpumask_test_cpu(cpu, buffer->cpumask)) 7426 return 0; 7427 7428 nr_pages = 0; 7429 nr_pages_same = 1; 7430 /* check if all cpu sizes are same */ 7431 for_each_buffer_cpu(buffer, cpu_i) { 7432 /* fill in the size from first enabled cpu */ 7433 if (nr_pages == 0) 7434 nr_pages = buffer->buffers[cpu_i]->nr_pages; 7435 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 7436 nr_pages_same = 0; 7437 break; 7438 } 7439 } 7440 /* allocate minimum pages, user can later expand it */ 7441 if (!nr_pages_same) 7442 nr_pages = 2; 7443 buffer->buffers[cpu] = 7444 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 7445 if (!buffer->buffers[cpu]) { 7446 WARN(1, "failed to allocate ring buffer on CPU %u\n", 7447 cpu); 7448 return -ENOMEM; 7449 } 7450 smp_wmb(); 7451 cpumask_set_cpu(cpu, buffer->cpumask); 7452 return 0; 7453 } 7454 7455 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 7456 /* 7457 * This is a basic integrity check of the ring buffer. 7458 * Late in the boot cycle this test will run when configured in. 7459 * It will kick off a thread per CPU that will go into a loop 7460 * writing to the per cpu ring buffer various sizes of data. 7461 * Some of the data will be large items, some small. 7462 * 7463 * Another thread is created that goes into a spin, sending out 7464 * IPIs to the other CPUs to also write into the ring buffer. 7465 * this is to test the nesting ability of the buffer. 7466 * 7467 * Basic stats are recorded and reported. 
If something in the 7468 * ring buffer should happen that's not expected, a big warning 7469 * is displayed and all ring buffers are disabled. 7470 */ 7471 static struct task_struct *rb_threads[NR_CPUS] __initdata; 7472 7473 struct rb_test_data { 7474 struct trace_buffer *buffer; 7475 unsigned long events; 7476 unsigned long bytes_written; 7477 unsigned long bytes_alloc; 7478 unsigned long bytes_dropped; 7479 unsigned long events_nested; 7480 unsigned long bytes_written_nested; 7481 unsigned long bytes_alloc_nested; 7482 unsigned long bytes_dropped_nested; 7483 int min_size_nested; 7484 int max_size_nested; 7485 int max_size; 7486 int min_size; 7487 int cpu; 7488 int cnt; 7489 }; 7490 7491 static struct rb_test_data rb_data[NR_CPUS] __initdata; 7492 7493 /* 1 meg per cpu */ 7494 #define RB_TEST_BUFFER_SIZE 1048576 7495 7496 static char rb_string[] __initdata = 7497 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 7498 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 7499 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 7500 7501 static bool rb_test_started __initdata; 7502 7503 struct rb_item { 7504 int size; 7505 char str[]; 7506 }; 7507 7508 static __init int rb_write_something(struct rb_test_data *data, bool nested) 7509 { 7510 struct ring_buffer_event *event; 7511 struct rb_item *item; 7512 bool started; 7513 int event_len; 7514 int size; 7515 int len; 7516 int cnt; 7517 7518 /* Have nested writes different that what is written */ 7519 cnt = data->cnt + (nested ? 27 : 0); 7520 7521 /* Multiply cnt by ~e, to make some unique increment */ 7522 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 7523 7524 len = size + sizeof(struct rb_item); 7525 7526 started = rb_test_started; 7527 /* read rb_test_started before checking buffer enabled */ 7528 smp_rmb(); 7529 7530 event = ring_buffer_lock_reserve(data->buffer, len); 7531 if (!event) { 7532 /* Ignore dropped events before test starts. 
*/
7533 if (started) {
7534 if (nested)
7535 data->bytes_dropped_nested += len;
7536 else
7537 data->bytes_dropped += len;
7538 }
7539 return len;
7540 }
7541
7542 event_len = ring_buffer_event_length(event);
7543
7544 if (RB_WARN_ON(data->buffer, event_len < len))
7545 goto out;
7546
7547 item = ring_buffer_event_data(event);
7548 item->size = size;
7549 memcpy(item->str, rb_string, size);
7550
7551 if (nested) {
7552 data->bytes_alloc_nested += event_len;
7553 data->bytes_written_nested += len;
7554 data->events_nested++;
7555 if (!data->min_size_nested || len < data->min_size_nested)
7556 data->min_size_nested = len;
7557 if (len > data->max_size_nested)
7558 data->max_size_nested = len;
7559 } else {
7560 data->bytes_alloc += event_len;
7561 data->bytes_written += len;
7562 data->events++;
7563 if (!data->min_size || len < data->min_size)
7564 data->min_size = len;
7565 if (len > data->max_size)
7566 data->max_size = len;
7567 }
7568
7569 out:
7570 ring_buffer_unlock_commit(data->buffer);
7571
7572 return 0;
7573 }
7574
7575 static __init int rb_test(void *arg)
7576 {
7577 struct rb_test_data *data = arg;
7578
7579 while (!kthread_should_stop()) {
7580 rb_write_something(data, false);
7581 data->cnt++;
7582
7583 set_current_state(TASK_INTERRUPTIBLE);
7584 /* Now sleep between a min of 100-300us and a max of 1ms */
7585 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
7586 }
7587
7588 return 0;
7589 }
7590
7591 static __init void rb_ipi(void *ignore)
7592 {
7593 struct rb_test_data *data;
7594 int cpu = smp_processor_id();
7595
7596 data = &rb_data[cpu];
7597 rb_write_something(data, true);
7598 }
7599
7600 static __init int rb_hammer_test(void *arg)
7601 {
7602 while (!kthread_should_stop()) {
7603
7604 /* Send an IPI to all cpus to write data! */
7605 smp_call_function(rb_ipi, NULL, 1);
7606 /* No sleep, but for non preempt, let others run */
7607 schedule();
7608 }
7609
7610 return 0;
7611 }
7612
7613 static __init int test_ringbuffer(void)
7614 {
7615 struct task_struct *rb_hammer;
7616 struct trace_buffer *buffer;
7617 int cpu;
7618 int ret = 0;
7619
7620 if (security_locked_down(LOCKDOWN_TRACEFS)) {
7621 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
7622 return 0;
7623 }
7624
7625 pr_info("Running ring buffer tests...\n");
7626
7627 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
7628 if (WARN_ON(!buffer))
7629 return 0;
7630
7631 /* Disable buffer so that threads can't write to it yet */
7632 ring_buffer_record_off(buffer);
7633
7634 for_each_online_cpu(cpu) {
7635 rb_data[cpu].buffer = buffer;
7636 rb_data[cpu].cpu = cpu;
7637 rb_data[cpu].cnt = cpu;
7638 rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
7639 cpu, "rbtester/%u");
7640 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
7641 pr_cont("FAILED\n");
7642 ret = PTR_ERR(rb_threads[cpu]);
7643 goto out_free;
7644 }
7645 }
7646
7647 /* Now create the rb hammer! */
7648 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
7649 if (WARN_ON(IS_ERR(rb_hammer))) {
7650 pr_cont("FAILED\n");
7651 ret = PTR_ERR(rb_hammer);
7652 goto out_free;
7653 }
7654
7655 ring_buffer_record_on(buffer);
7656 /*
7657 * Show buffer is enabled before setting rb_test_started.
7658 * Yes there's a small race window where events could be
7659 * dropped and the thread won't catch it. But when a ring
7660 * buffer gets enabled, there will always be some kind of
7661 * delay before other CPUs see it. Thus, we don't care about
7662 * those dropped events.
We care about events dropped after 7663 * the threads see that the buffer is active. 7664 */ 7665 smp_wmb(); 7666 rb_test_started = true; 7667 7668 set_current_state(TASK_INTERRUPTIBLE); 7669 /* Just run for 10 seconds */ 7670 schedule_timeout(10 * HZ); 7671 7672 kthread_stop(rb_hammer); 7673 7674 out_free: 7675 for_each_online_cpu(cpu) { 7676 if (!rb_threads[cpu]) 7677 break; 7678 kthread_stop(rb_threads[cpu]); 7679 } 7680 if (ret) { 7681 ring_buffer_free(buffer); 7682 return ret; 7683 } 7684 7685 /* Report! */ 7686 pr_info("finished\n"); 7687 for_each_online_cpu(cpu) { 7688 struct ring_buffer_event *event; 7689 struct rb_test_data *data = &rb_data[cpu]; 7690 struct rb_item *item; 7691 unsigned long total_events; 7692 unsigned long total_dropped; 7693 unsigned long total_written; 7694 unsigned long total_alloc; 7695 unsigned long total_read = 0; 7696 unsigned long total_size = 0; 7697 unsigned long total_len = 0; 7698 unsigned long total_lost = 0; 7699 unsigned long lost; 7700 int big_event_size; 7701 int small_event_size; 7702 7703 ret = -1; 7704 7705 total_events = data->events + data->events_nested; 7706 total_written = data->bytes_written + data->bytes_written_nested; 7707 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 7708 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 7709 7710 big_event_size = data->max_size + data->max_size_nested; 7711 small_event_size = data->min_size + data->min_size_nested; 7712 7713 pr_info("CPU %d:\n", cpu); 7714 pr_info(" events: %ld\n", total_events); 7715 pr_info(" dropped bytes: %ld\n", total_dropped); 7716 pr_info(" alloced bytes: %ld\n", total_alloc); 7717 pr_info(" written bytes: %ld\n", total_written); 7718 pr_info(" biggest event: %d\n", big_event_size); 7719 pr_info(" smallest event: %d\n", small_event_size); 7720 7721 if (RB_WARN_ON(buffer, total_dropped)) 7722 break; 7723 7724 ret = 0; 7725 7726 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 7727 total_lost += lost; 7728 item = ring_buffer_event_data(event); 7729 total_len += ring_buffer_event_length(event); 7730 total_size += item->size + sizeof(struct rb_item); 7731 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 7732 pr_info("FAILED!\n"); 7733 pr_info("buffer had: %.*s\n", item->size, item->str); 7734 pr_info("expected: %.*s\n", item->size, rb_string); 7735 RB_WARN_ON(buffer, 1); 7736 ret = -1; 7737 break; 7738 } 7739 total_read++; 7740 } 7741 if (ret) 7742 break; 7743 7744 ret = -1; 7745 7746 pr_info(" read events: %ld\n", total_read); 7747 pr_info(" lost events: %ld\n", total_lost); 7748 pr_info(" total events: %ld\n", total_lost + total_read); 7749 pr_info(" recorded len bytes: %ld\n", total_len); 7750 pr_info(" recorded size bytes: %ld\n", total_size); 7751 if (total_lost) { 7752 pr_info(" With dropped events, record len and size may not match\n" 7753 " alloced and written from above\n"); 7754 } else { 7755 if (RB_WARN_ON(buffer, total_len != total_alloc || 7756 total_size != total_written)) 7757 break; 7758 } 7759 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 7760 break; 7761 7762 ret = 0; 7763 } 7764 if (!ret) 7765 pr_info("Ring buffer PASSED!\n"); 7766 7767 ring_buffer_free(buffer); 7768 return 0; 7769 } 7770 7771 late_initcall(test_ringbuffer); 7772 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 7773