// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len : 5 bits\n");
	trace_seq_puts(s, "\ttime_delta : 27 bits\n");
	trace_seq_puts(s, "\tarray : 32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page       |               |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP  = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
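 *
 * For example (illustrative arithmetic only): a data event that stores
 * its size in the type_len field, say type_len == 3, reports
 * 3 * RB_ALIGNMENT + RB_EVNT_HDR_SIZE = 12 + 4 = 16 bytes, i.e. the
 * 4 byte event header plus a payload rounded up to RB_ALIGNMENT.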
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)	\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id;		/* ID for external mapping */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	atomic_t			seq;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE	= 0,
	RB_ADD_STAMP_EXTEND	= BIT(1),
	RB_ADD_STAMP_ABSOLUTE	= BIT(2),
	RB_ADD_STAMP_FORCE	= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	struct mutex			mapping_lock;
	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages;	/* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
				void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned;
 * otherwise the current time is returned. Really, neither of the
 * last two cases should ever happen.
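 *
 * A minimal sketch of the expected call pattern (illustrative only, using
 * the usual reserve/commit helpers declared in <linux/ring_buffer.h>):
 *
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	... fill in the event payload ...
 *	ts = ring_buffer_event_time_stamp(buffer, event);
 *	ring_buffer_unlock_commit(buffer);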
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}

/**
 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 */
size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
{
	return buffer->buffers[cpu]->nr_pages;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}

/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * When a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* A read of all CPUs always waits for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty.
	 * The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

struct rb_wait_data {
	struct rb_irq_work		*irq_work;
	int				seq;
};

/*
 * The default wait condition for ring_buffer_wait() is to just exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}

/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
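 *
 * For example (per the full_hit() check above): with @full set to 50 on
 * a CPU buffer of 8 sub-buffers, the poll will not report readable until
 * the dirty sub-buffers, plus the one the writer is currently on,
 * account for at least 50% of the buffer.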
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, so they
 * only need to worry about interrupts. Reads, however, can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too.
 * Thus:
 *
 *  head->list->prev->next	bit 1	bit 0
 *  -------------------		-----	-----
 *  Normal page			  0	  0
 *  Points to head page		  0	  1
 *  New head page		  1	  0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				 struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
				struct buffer_page *tail_page,
				struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit:
		 * it can only increment when a commit takes place. But that
		 * only happens in the outermost nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			   struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
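 *
 * Concretely, the check below walks the page list and verifies that,
 * once the HEAD/UPDATE flag bits are masked off, every page's
 * next->prev and prev->next pointers point back at that page.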
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *tmp;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->next)->prev) != head))
		return;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->prev)->next) != head))
		return;

	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
			return;

		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
			return;
	}
}

static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
		long nr_pages, struct list_head *pages)
{
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

	/*
	 * A user thread can allocate too much memory when si_mem_available()
	 * reports there's enough, even though there is not. In that case,
	 * make sure the OOM killer kills this thread. This can happen
	 * even with RETRY_MAYFAIL because another task may be doing
	 * an allocation after this task has taken all memory.
	 * This is the task the OOM killer needs to take out during this
	 * loop, even if it was triggered by an allocation somewhere else.
	 */
	if (user_thread)
		set_current_oom_origin();
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				     mflags, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
					mflags | __GFP_COMP | __GFP_ZERO,
					cpu_buffer->buffer->subbuf_order);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		bpage->order = cpu_buffer->buffer->subbuf_order;
		rb_init_page(bpage->page);

		if (user_thread && fatal_signal_pending(current))
			goto free_pages;
	}
	if (user_thread)
		clear_current_oom_origin();

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (user_thread)
		clear_current_oom_origin();

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
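	 *
	 * That is, once the temporary list_head used for the allocation is
	 * unlinked by the list_del() below, cpu_buffer->pages simply points
	 * at one buffer_page's list entry inside that circular list.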
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
	mutex_init(&cpu_buffer->mapping_lock);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			     GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;

	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
				cpu_buffer->buffer->subbuf_order);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	irq_work_sync(&cpu_buffer->irq_work.work);

	free_buffer_page(cpu_buffer->reader_page);

	if (head) {
		rb_head_page_deactivate(cpu_buffer);

		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	free_page((unsigned long)cpu_buffer->free_page);

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
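 *
 * Most callers use the ring_buffer_alloc() wrapper macro from
 * <linux/ring_buffer.h>, which supplies a static lock class key for
 * @key. An illustrative call (sizes are just an example):
 *
 *	buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);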
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					 struct lock_class_key *key)
{
	struct trace_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	/* Default buffer page size - one system page */
	buffer->subbuf_order = 0;
	buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;

	/* Max payload is buffer page size - header (8bytes) */
	buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);

	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct trace_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	irq_work_sync(&buffer->irq_work.work);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct trace_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
	 * Make sure that we have head_bit value preserved for the
	 * next page
	 */
	tail_page->next = (struct list_head *)((unsigned long)next_page |
						head_bit);
	next_page = rb_list_head(next_page);
	next_page->prev = tail_page;

	/* make sure pages points to a valid page in the ring buffer */
	cpu_buffer->pages = next_page;

	/* update head page */
	if (head_bit)
		cpu_buffer->head_page = list_entry(next_page,
						struct buffer_page, list);

	/* pages are removed, resume tracing and then free the pages */
	atomic_dec(&cpu_buffer->record_disabled);
	raw_spin_unlock_irq(&cpu_buffer->reader_lock);

	RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));

	/* last buffer page to remove */
	last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
				list);
	tmp_iter_page = first_page;

	do {
		cond_resched();

		to_remove_page = tmp_iter_page;
		rb_inc_page(&tmp_iter_page);

		/* update the counters */
		page_entries = rb_page_entries(to_remove_page);
		if (page_entries) {
			/*
			 * If something was added to this page, it was full
			 * since it is not the tail page. So we deduct the
			 * bytes consumed in ring buffer from here.
			 * Increment overrun to account for the lost events.
			 */
			local_add(page_entries, &cpu_buffer->overrun);
			local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
			local_inc(&cpu_buffer->pages_lost);
		}

		/*
		 * We have already removed references to this list item, just
		 * free up the buffer_page and its page
		 */
		free_buffer_page(to_remove_page);
		nr_removed--;

	} while (to_remove_page != last_page);

	RB_WARN_ON(cpu_buffer, nr_removed);

	return nr_removed == 0;
}

static bool
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *pages = &cpu_buffer->new_pages;
	unsigned long flags;
	bool success;
	int retries;

	/* Can be called at early boot up, where interrupts must not be enabled */
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
	/*
	 * We are holding the reader lock, so the reader page won't be swapped
	 * in the ring buffer. Now we are racing with the writer trying to
	 * move head page and the tail page.
	 * We are going to adapt the reader page update process where:
	 * 1. We first splice the start and end of list of new pages between
	 *    the head page and its previous page.
	 * 2. We cmpxchg the prev_page->next to point from head page to the
	 *    start of new pages list.
	 * 3. Finally, we update the head->prev to the end of new list.
	 *
	 * We will try this process 10 times, to make sure that we don't keep
	 * spinning.
1944 */ 1945 retries = 10; 1946 success = false; 1947 while (retries--) { 1948 struct list_head *head_page, *prev_page; 1949 struct list_head *last_page, *first_page; 1950 struct list_head *head_page_with_bit; 1951 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 1952 1953 if (!hpage) 1954 break; 1955 head_page = &hpage->list; 1956 prev_page = head_page->prev; 1957 1958 first_page = pages->next; 1959 last_page = pages->prev; 1960 1961 head_page_with_bit = (struct list_head *) 1962 ((unsigned long)head_page | RB_PAGE_HEAD); 1963 1964 last_page->next = head_page_with_bit; 1965 first_page->prev = prev_page; 1966 1967 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 1968 if (try_cmpxchg(&prev_page->next, 1969 &head_page_with_bit, first_page)) { 1970 /* 1971 * yay, we replaced the page pointer to our new list, 1972 * now, we just have to update to head page's prev 1973 * pointer to point to end of list 1974 */ 1975 head_page->prev = last_page; 1976 success = true; 1977 break; 1978 } 1979 } 1980 1981 if (success) 1982 INIT_LIST_HEAD(pages); 1983 /* 1984 * If we weren't successful in adding in new pages, warn and stop 1985 * tracing 1986 */ 1987 RB_WARN_ON(cpu_buffer, !success); 1988 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1989 1990 /* free pages if they weren't inserted */ 1991 if (!success) { 1992 struct buffer_page *bpage, *tmp; 1993 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 1994 list) { 1995 list_del_init(&bpage->list); 1996 free_buffer_page(bpage); 1997 } 1998 } 1999 return success; 2000 } 2001 2002 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2003 { 2004 bool success; 2005 2006 if (cpu_buffer->nr_pages_to_update > 0) 2007 success = rb_insert_pages(cpu_buffer); 2008 else 2009 success = rb_remove_pages(cpu_buffer, 2010 -cpu_buffer->nr_pages_to_update); 2011 2012 if (success) 2013 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2014 } 2015 2016 static void update_pages_handler(struct work_struct *work) 2017 { 2018 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2019 struct ring_buffer_per_cpu, update_pages_work); 2020 rb_update_pages(cpu_buffer); 2021 complete(&cpu_buffer->update_done); 2022 } 2023 2024 /** 2025 * ring_buffer_resize - resize the ring buffer 2026 * @buffer: the buffer to resize. 2027 * @size: the new size. 2028 * @cpu_id: the cpu buffer to resize 2029 * 2030 * Minimum size is 2 * buffer->subbuf_size. 2031 * 2032 * Returns 0 on success and < 0 on failure. 2033 */ 2034 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2035 int cpu_id) 2036 { 2037 struct ring_buffer_per_cpu *cpu_buffer; 2038 unsigned long nr_pages; 2039 int cpu, err; 2040 2041 /* 2042 * Always succeed at resizing a non-existent buffer: 2043 */ 2044 if (!buffer) 2045 return 0; 2046 2047 /* Make sure the requested buffer exists */ 2048 if (cpu_id != RING_BUFFER_ALL_CPUS && 2049 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2050 return 0; 2051 2052 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2053 2054 /* we need a minimum of two pages */ 2055 if (nr_pages < 2) 2056 nr_pages = 2; 2057 2058 /* prevent another thread from changing buffer sizes */ 2059 mutex_lock(&buffer->mutex); 2060 atomic_inc(&buffer->resizing); 2061 2062 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2063 /* 2064 * Don't succeed if resizing is disabled, as a reader might be 2065 * manipulating the ring buffer and is expecting a sane state while 2066 * this is true. 
2067 */ 2068 for_each_buffer_cpu(buffer, cpu) { 2069 cpu_buffer = buffer->buffers[cpu]; 2070 if (atomic_read(&cpu_buffer->resize_disabled)) { 2071 err = -EBUSY; 2072 goto out_err_unlock; 2073 } 2074 } 2075 2076 /* calculate the pages to update */ 2077 for_each_buffer_cpu(buffer, cpu) { 2078 cpu_buffer = buffer->buffers[cpu]; 2079 2080 cpu_buffer->nr_pages_to_update = nr_pages - 2081 cpu_buffer->nr_pages; 2082 /* 2083 * nothing more to do for removing pages or no update 2084 */ 2085 if (cpu_buffer->nr_pages_to_update <= 0) 2086 continue; 2087 /* 2088 * to add pages, make sure all new pages can be 2089 * allocated without receiving ENOMEM 2090 */ 2091 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2092 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2093 &cpu_buffer->new_pages)) { 2094 /* not enough memory for new pages */ 2095 err = -ENOMEM; 2096 goto out_err; 2097 } 2098 2099 cond_resched(); 2100 } 2101 2102 cpus_read_lock(); 2103 /* 2104 * Fire off all the required work handlers 2105 * We can't schedule on offline CPUs, but it's not necessary 2106 * since we can change their buffer sizes without any race. 2107 */ 2108 for_each_buffer_cpu(buffer, cpu) { 2109 cpu_buffer = buffer->buffers[cpu]; 2110 if (!cpu_buffer->nr_pages_to_update) 2111 continue; 2112 2113 /* Can't run something on an offline CPU. */ 2114 if (!cpu_online(cpu)) { 2115 rb_update_pages(cpu_buffer); 2116 cpu_buffer->nr_pages_to_update = 0; 2117 } else { 2118 /* Run directly if possible. */ 2119 migrate_disable(); 2120 if (cpu != smp_processor_id()) { 2121 migrate_enable(); 2122 schedule_work_on(cpu, 2123 &cpu_buffer->update_pages_work); 2124 } else { 2125 update_pages_handler(&cpu_buffer->update_pages_work); 2126 migrate_enable(); 2127 } 2128 } 2129 } 2130 2131 /* wait for all the updates to complete */ 2132 for_each_buffer_cpu(buffer, cpu) { 2133 cpu_buffer = buffer->buffers[cpu]; 2134 if (!cpu_buffer->nr_pages_to_update) 2135 continue; 2136 2137 if (cpu_online(cpu)) 2138 wait_for_completion(&cpu_buffer->update_done); 2139 cpu_buffer->nr_pages_to_update = 0; 2140 } 2141 2142 cpus_read_unlock(); 2143 } else { 2144 cpu_buffer = buffer->buffers[cpu_id]; 2145 2146 if (nr_pages == cpu_buffer->nr_pages) 2147 goto out; 2148 2149 /* 2150 * Don't succeed if resizing is disabled, as a reader might be 2151 * manipulating the ring buffer and is expecting a sane state while 2152 * this is true. 2153 */ 2154 if (atomic_read(&cpu_buffer->resize_disabled)) { 2155 err = -EBUSY; 2156 goto out_err_unlock; 2157 } 2158 2159 cpu_buffer->nr_pages_to_update = nr_pages - 2160 cpu_buffer->nr_pages; 2161 2162 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2163 if (cpu_buffer->nr_pages_to_update > 0 && 2164 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2165 &cpu_buffer->new_pages)) { 2166 err = -ENOMEM; 2167 goto out_err; 2168 } 2169 2170 cpus_read_lock(); 2171 2172 /* Can't run something on an offline CPU. */ 2173 if (!cpu_online(cpu_id)) 2174 rb_update_pages(cpu_buffer); 2175 else { 2176 /* Run directly if possible. 
*/ 2177 migrate_disable(); 2178 if (cpu_id == smp_processor_id()) { 2179 rb_update_pages(cpu_buffer); 2180 migrate_enable(); 2181 } else { 2182 migrate_enable(); 2183 schedule_work_on(cpu_id, 2184 &cpu_buffer->update_pages_work); 2185 wait_for_completion(&cpu_buffer->update_done); 2186 } 2187 } 2188 2189 cpu_buffer->nr_pages_to_update = 0; 2190 cpus_read_unlock(); 2191 } 2192 2193 out: 2194 /* 2195 * The ring buffer resize can happen with the ring buffer 2196 * enabled, so that the update disturbs the tracing as little 2197 * as possible. But if the buffer is disabled, we do not need 2198 * to worry about that, and we can take the time to verify 2199 * that the buffer is not corrupt. 2200 */ 2201 if (atomic_read(&buffer->record_disabled)) { 2202 atomic_inc(&buffer->record_disabled); 2203 /* 2204 * Even though the buffer was disabled, we must make sure 2205 * that it is truly disabled before calling rb_check_pages. 2206 * There could have been a race between checking 2207 * record_disable and incrementing it. 2208 */ 2209 synchronize_rcu(); 2210 for_each_buffer_cpu(buffer, cpu) { 2211 cpu_buffer = buffer->buffers[cpu]; 2212 rb_check_pages(cpu_buffer); 2213 } 2214 atomic_dec(&buffer->record_disabled); 2215 } 2216 2217 atomic_dec(&buffer->resizing); 2218 mutex_unlock(&buffer->mutex); 2219 return 0; 2220 2221 out_err: 2222 for_each_buffer_cpu(buffer, cpu) { 2223 struct buffer_page *bpage, *tmp; 2224 2225 cpu_buffer = buffer->buffers[cpu]; 2226 cpu_buffer->nr_pages_to_update = 0; 2227 2228 if (list_empty(&cpu_buffer->new_pages)) 2229 continue; 2230 2231 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2232 list) { 2233 list_del_init(&bpage->list); 2234 free_buffer_page(bpage); 2235 } 2236 } 2237 out_err_unlock: 2238 atomic_dec(&buffer->resizing); 2239 mutex_unlock(&buffer->mutex); 2240 return err; 2241 } 2242 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2243 2244 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2245 { 2246 mutex_lock(&buffer->mutex); 2247 if (val) 2248 buffer->flags |= RB_FL_OVERWRITE; 2249 else 2250 buffer->flags &= ~RB_FL_OVERWRITE; 2251 mutex_unlock(&buffer->mutex); 2252 } 2253 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2254 2255 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2256 { 2257 return bpage->page->data + index; 2258 } 2259 2260 static __always_inline struct ring_buffer_event * 2261 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2262 { 2263 return __rb_page_index(cpu_buffer->reader_page, 2264 cpu_buffer->reader_page->read); 2265 } 2266 2267 static struct ring_buffer_event * 2268 rb_iter_head_event(struct ring_buffer_iter *iter) 2269 { 2270 struct ring_buffer_event *event; 2271 struct buffer_page *iter_head_page = iter->head_page; 2272 unsigned long commit; 2273 unsigned length; 2274 2275 if (iter->head != iter->next_event) 2276 return iter->event; 2277 2278 /* 2279 * When the writer goes across pages, it issues a cmpxchg which 2280 * is a mb(), which will synchronize with the rmb here. 2281 * (see rb_tail_page_update() and __rb_reserve_next()) 2282 */ 2283 commit = rb_page_commit(iter_head_page); 2284 smp_rmb(); 2285 2286 /* An event needs to be at least 8 bytes in size */ 2287 if (iter->head > commit - 8) 2288 goto reset; 2289 2290 event = __rb_page_index(iter_head_page, iter->head); 2291 length = rb_event_length(event); 2292 2293 /* 2294 * READ_ONCE() doesn't work on functions and we don't want the 2295 * compiler doing any crazy optimizations with length. 
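 *
 * (For reference: barrier() is only a compiler barrier, typically along
 * the lines of
 *
 *	#define barrier()	__asm__ __volatile__("" : : : "memory")
 *
 * so it stops the compiler from caching or re-deriving 'length' across
 * this point but emits no CPU fence; the smp_rmb() above provides the
 * ordering against the writer.)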
2296 */ 2297 barrier(); 2298 2299 if ((iter->head + length) > commit || length > iter->event_size) 2300 /* Writer corrupted the read? */ 2301 goto reset; 2302 2303 memcpy(iter->event, event, length); 2304 /* 2305 * If the page stamp is still the same after this rmb() then the 2306 * event was safely copied without the writer entering the page. 2307 */ 2308 smp_rmb(); 2309 2310 /* Make sure the page didn't change since we read this */ 2311 if (iter->page_stamp != iter_head_page->page->time_stamp || 2312 commit > rb_page_commit(iter_head_page)) 2313 goto reset; 2314 2315 iter->next_event = iter->head + length; 2316 return iter->event; 2317 reset: 2318 /* Reset to the beginning */ 2319 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2320 iter->head = 0; 2321 iter->next_event = 0; 2322 iter->missed_events = 1; 2323 return NULL; 2324 } 2325 2326 /* Size is determined by what has been committed */ 2327 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2328 { 2329 return rb_page_commit(bpage); 2330 } 2331 2332 static __always_inline unsigned 2333 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2334 { 2335 return rb_page_commit(cpu_buffer->commit_page); 2336 } 2337 2338 static __always_inline unsigned 2339 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 2340 { 2341 unsigned long addr = (unsigned long)event; 2342 2343 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 2344 2345 return addr - BUF_PAGE_HDR_SIZE; 2346 } 2347 2348 static void rb_inc_iter(struct ring_buffer_iter *iter) 2349 { 2350 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2351 2352 /* 2353 * The iterator could be on the reader page (it starts there). 2354 * But the head could have moved, since the reader was 2355 * found. Check for this case and assign the iterator 2356 * to the head page instead of next. 2357 */ 2358 if (iter->head_page == cpu_buffer->reader_page) 2359 iter->head_page = rb_set_head_page(cpu_buffer); 2360 else 2361 rb_inc_page(&iter->head_page); 2362 2363 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2364 iter->head = 0; 2365 iter->next_event = 0; 2366 } 2367 2368 /* 2369 * rb_handle_head_page - writer hit the head page 2370 * 2371 * Returns: +1 to retry page 2372 * 0 to continue 2373 * -1 on error 2374 */ 2375 static int 2376 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2377 struct buffer_page *tail_page, 2378 struct buffer_page *next_page) 2379 { 2380 struct buffer_page *new_head; 2381 int entries; 2382 int type; 2383 int ret; 2384 2385 entries = rb_page_entries(next_page); 2386 2387 /* 2388 * The hard part is here. We need to move the head 2389 * forward, and protect against both readers on 2390 * other CPUs and writers coming in via interrupts. 2391 */ 2392 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2393 RB_PAGE_HEAD); 2394 2395 /* 2396 * type can be one of four: 2397 * NORMAL - an interrupt already moved it for us 2398 * HEAD - we are the first to get here. 2399 * UPDATE - we are the interrupt interrupting 2400 * a current move. 2401 * MOVED - a reader on another CPU moved the next 2402 * pointer to its reader page. Give up 2403 * and try again. 2404 */ 2405 2406 switch (type) { 2407 case RB_PAGE_HEAD: 2408 /* 2409 * We changed the head to UPDATE, thus 2410 * it is our responsibility to update 2411 * the counters. 
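 *
 * For orientation, the flag transitions performed by this function are,
 * in order (a summary of the code here, not new states):
 *
 *	next_page: HEAD   -> UPDATE	(rb_head_page_set_update() above)
 *	new_head:  NORMAL -> HEAD	(rb_head_page_set_head() below)
 *	next_page: UPDATE -> NORMAL	(only by the outermost commit)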
2412 */ 2413 local_add(entries, &cpu_buffer->overrun); 2414 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 2415 local_inc(&cpu_buffer->pages_lost); 2416 2417 /* 2418 * The entries will be zeroed out when we move the 2419 * tail page. 2420 */ 2421 2422 /* still more to do */ 2423 break; 2424 2425 case RB_PAGE_UPDATE: 2426 /* 2427 * This is an interrupt that interrupt the 2428 * previous update. Still more to do. 2429 */ 2430 break; 2431 case RB_PAGE_NORMAL: 2432 /* 2433 * An interrupt came in before the update 2434 * and processed this for us. 2435 * Nothing left to do. 2436 */ 2437 return 1; 2438 case RB_PAGE_MOVED: 2439 /* 2440 * The reader is on another CPU and just did 2441 * a swap with our next_page. 2442 * Try again. 2443 */ 2444 return 1; 2445 default: 2446 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2447 return -1; 2448 } 2449 2450 /* 2451 * Now that we are here, the old head pointer is 2452 * set to UPDATE. This will keep the reader from 2453 * swapping the head page with the reader page. 2454 * The reader (on another CPU) will spin till 2455 * we are finished. 2456 * 2457 * We just need to protect against interrupts 2458 * doing the job. We will set the next pointer 2459 * to HEAD. After that, we set the old pointer 2460 * to NORMAL, but only if it was HEAD before. 2461 * otherwise we are an interrupt, and only 2462 * want the outer most commit to reset it. 2463 */ 2464 new_head = next_page; 2465 rb_inc_page(&new_head); 2466 2467 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2468 RB_PAGE_NORMAL); 2469 2470 /* 2471 * Valid returns are: 2472 * HEAD - an interrupt came in and already set it. 2473 * NORMAL - One of two things: 2474 * 1) We really set it. 2475 * 2) A bunch of interrupts came in and moved 2476 * the page forward again. 2477 */ 2478 switch (ret) { 2479 case RB_PAGE_HEAD: 2480 case RB_PAGE_NORMAL: 2481 /* OK */ 2482 break; 2483 default: 2484 RB_WARN_ON(cpu_buffer, 1); 2485 return -1; 2486 } 2487 2488 /* 2489 * It is possible that an interrupt came in, 2490 * set the head up, then more interrupts came in 2491 * and moved it again. When we get back here, 2492 * the page would have been set to NORMAL but we 2493 * just set it back to HEAD. 2494 * 2495 * How do you detect this? Well, if that happened 2496 * the tail page would have moved. 2497 */ 2498 if (ret == RB_PAGE_NORMAL) { 2499 struct buffer_page *buffer_tail_page; 2500 2501 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2502 /* 2503 * If the tail had moved passed next, then we need 2504 * to reset the pointer. 2505 */ 2506 if (buffer_tail_page != tail_page && 2507 buffer_tail_page != next_page) 2508 rb_head_page_set_normal(cpu_buffer, new_head, 2509 next_page, 2510 RB_PAGE_HEAD); 2511 } 2512 2513 /* 2514 * If this was the outer most commit (the one that 2515 * changed the original pointer from HEAD to UPDATE), 2516 * then it is up to us to reset it to NORMAL. 
2517 */ 2518 if (type == RB_PAGE_HEAD) { 2519 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2520 tail_page, 2521 RB_PAGE_UPDATE); 2522 if (RB_WARN_ON(cpu_buffer, 2523 ret != RB_PAGE_UPDATE)) 2524 return -1; 2525 } 2526 2527 return 0; 2528 } 2529 2530 static inline void 2531 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2532 unsigned long tail, struct rb_event_info *info) 2533 { 2534 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 2535 struct buffer_page *tail_page = info->tail_page; 2536 struct ring_buffer_event *event; 2537 unsigned long length = info->length; 2538 2539 /* 2540 * Only the event that crossed the page boundary 2541 * must fill the old tail_page with padding. 2542 */ 2543 if (tail >= bsize) { 2544 /* 2545 * If the page was filled, then we still need 2546 * to update the real_end. Reset it to zero 2547 * and the reader will ignore it. 2548 */ 2549 if (tail == bsize) 2550 tail_page->real_end = 0; 2551 2552 local_sub(length, &tail_page->write); 2553 return; 2554 } 2555 2556 event = __rb_page_index(tail_page, tail); 2557 2558 /* 2559 * Save the original length to the meta data. 2560 * This will be used by the reader to add lost event 2561 * counter. 2562 */ 2563 tail_page->real_end = tail; 2564 2565 /* 2566 * If this event is bigger than the minimum size, then 2567 * we need to be careful that we don't subtract the 2568 * write counter enough to allow another writer to slip 2569 * in on this page. 2570 * We put in a discarded commit instead, to make sure 2571 * that this space is not used again, and this space will 2572 * not be accounted into 'entries_bytes'. 2573 * 2574 * If we are less than the minimum size, we don't need to 2575 * worry about it. 2576 */ 2577 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 2578 /* No room for any events */ 2579 2580 /* Mark the rest of the page with padding */ 2581 rb_event_set_padding(event); 2582 2583 /* Make sure the padding is visible before the write update */ 2584 smp_wmb(); 2585 2586 /* Set the write back to the previous setting */ 2587 local_sub(length, &tail_page->write); 2588 return; 2589 } 2590 2591 /* Put in a discarded event */ 2592 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 2593 event->type_len = RINGBUF_TYPE_PADDING; 2594 /* time delta must be non zero */ 2595 event->time_delta = 1; 2596 2597 /* account for padding bytes */ 2598 local_add(bsize - tail, &cpu_buffer->entries_bytes); 2599 2600 /* Make sure the padding is visible before the tail_page->write update */ 2601 smp_wmb(); 2602 2603 /* Set write to end of buffer */ 2604 length = (tail + length) - bsize; 2605 local_sub(length, &tail_page->write); 2606 } 2607 2608 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2609 2610 /* 2611 * This is the slow path, force gcc not to inline it. 2612 */ 2613 static noinline struct ring_buffer_event * 2614 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2615 unsigned long tail, struct rb_event_info *info) 2616 { 2617 struct buffer_page *tail_page = info->tail_page; 2618 struct buffer_page *commit_page = cpu_buffer->commit_page; 2619 struct trace_buffer *buffer = cpu_buffer->buffer; 2620 struct buffer_page *next_page; 2621 int ret; 2622 2623 next_page = tail_page; 2624 2625 rb_inc_page(&next_page); 2626 2627 /* 2628 * If for some reason, we had an interrupt storm that made 2629 * it all the way around the buffer, bail, and warn 2630 * about it. 
2631 */ 2632 if (unlikely(next_page == commit_page)) { 2633 local_inc(&cpu_buffer->commit_overrun); 2634 goto out_reset; 2635 } 2636 2637 /* 2638 * This is where the fun begins! 2639 * 2640 * We are fighting against races between a reader that 2641 * could be on another CPU trying to swap its reader 2642 * page with the buffer head. 2643 * 2644 * We are also fighting against interrupts coming in and 2645 * moving the head or tail on us as well. 2646 * 2647 * If the next page is the head page then we have filled 2648 * the buffer, unless the commit page is still on the 2649 * reader page. 2650 */ 2651 if (rb_is_head_page(next_page, &tail_page->list)) { 2652 2653 /* 2654 * If the commit is not on the reader page, then 2655 * move the header page. 2656 */ 2657 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2658 /* 2659 * If we are not in overwrite mode, 2660 * this is easy, just stop here. 2661 */ 2662 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2663 local_inc(&cpu_buffer->dropped_events); 2664 goto out_reset; 2665 } 2666 2667 ret = rb_handle_head_page(cpu_buffer, 2668 tail_page, 2669 next_page); 2670 if (ret < 0) 2671 goto out_reset; 2672 if (ret) 2673 goto out_again; 2674 } else { 2675 /* 2676 * We need to be careful here too. The 2677 * commit page could still be on the reader 2678 * page. We could have a small buffer, and 2679 * have filled up the buffer with events 2680 * from interrupts and such, and wrapped. 2681 * 2682 * Note, if the tail page is also on the 2683 * reader_page, we let it move out. 2684 */ 2685 if (unlikely((cpu_buffer->commit_page != 2686 cpu_buffer->tail_page) && 2687 (cpu_buffer->commit_page == 2688 cpu_buffer->reader_page))) { 2689 local_inc(&cpu_buffer->commit_overrun); 2690 goto out_reset; 2691 } 2692 } 2693 } 2694 2695 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2696 2697 out_again: 2698 2699 rb_reset_tail(cpu_buffer, tail, info); 2700 2701 /* Commit what we have for now. */ 2702 rb_end_commit(cpu_buffer); 2703 /* rb_end_commit() decs committing */ 2704 local_inc(&cpu_buffer->committing); 2705 2706 /* fail and let the caller try again */ 2707 return ERR_PTR(-EAGAIN); 2708 2709 out_reset: 2710 /* reset write */ 2711 rb_reset_tail(cpu_buffer, tail, info); 2712 2713 return NULL; 2714 } 2715 2716 /* Slow path */ 2717 static struct ring_buffer_event * 2718 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2719 struct ring_buffer_event *event, u64 delta, bool abs) 2720 { 2721 if (abs) 2722 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2723 else 2724 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2725 2726 /* Not the first event on the page, or not delta? */ 2727 if (abs || rb_event_index(cpu_buffer, event)) { 2728 event->time_delta = delta & TS_MASK; 2729 event->array[0] = delta >> TS_SHIFT; 2730 } else { 2731 /* nope, just zero it */ 2732 event->time_delta = 0; 2733 event->array[0] = 0; 2734 } 2735 2736 return skip_time_extend(event); 2737 } 2738 2739 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2740 static inline bool sched_clock_stable(void) 2741 { 2742 return true; 2743 } 2744 #endif 2745 2746 static void 2747 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2748 struct rb_event_info *info) 2749 { 2750 u64 write_stamp; 2751 2752 WARN_ONCE(1, "Delta way too big! 
%llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2753 (unsigned long long)info->delta, 2754 (unsigned long long)info->ts, 2755 (unsigned long long)info->before, 2756 (unsigned long long)info->after, 2757 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 2758 sched_clock_stable() ? "" : 2759 "If you just came from a suspend/resume,\n" 2760 "please switch to the trace global clock:\n" 2761 " echo global > /sys/kernel/tracing/trace_clock\n" 2762 "or add trace_clock=global to the kernel command line\n"); 2763 } 2764 2765 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2766 struct ring_buffer_event **event, 2767 struct rb_event_info *info, 2768 u64 *delta, 2769 unsigned int *length) 2770 { 2771 bool abs = info->add_timestamp & 2772 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2773 2774 if (unlikely(info->delta > (1ULL << 59))) { 2775 /* 2776 * Some timers can use more than 59 bits, and when a timestamp 2777 * is added to the buffer, it will lose those bits. 2778 */ 2779 if (abs && (info->ts & TS_MSB)) { 2780 info->delta &= ABS_TS_MASK; 2781 2782 /* did the clock go backwards */ 2783 } else if (info->before == info->after && info->before > info->ts) { 2784 /* not interrupted */ 2785 static int once; 2786 2787 /* 2788 * This is possible with a recalibrating of the TSC. 2789 * Do not produce a call stack, but just report it. 2790 */ 2791 if (!once) { 2792 once++; 2793 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2794 info->before, info->ts); 2795 } 2796 } else 2797 rb_check_timestamp(cpu_buffer, info); 2798 if (!abs) 2799 info->delta = 0; 2800 } 2801 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 2802 *length -= RB_LEN_TIME_EXTEND; 2803 *delta = 0; 2804 } 2805 2806 /** 2807 * rb_update_event - update event type and data 2808 * @cpu_buffer: The per cpu buffer of the @event 2809 * @event: the event to update 2810 * @info: The info to update the @event with (contains length and delta) 2811 * 2812 * Update the type and data fields of the @event. The length 2813 * is the actual size that is written to the ring buffer, 2814 * and with this, we can determine what to place into the 2815 * data field. 2816 */ 2817 static void 2818 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2819 struct ring_buffer_event *event, 2820 struct rb_event_info *info) 2821 { 2822 unsigned length = info->length; 2823 u64 delta = info->delta; 2824 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2825 2826 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2827 cpu_buffer->event_stamp[nest] = info->ts; 2828 2829 /* 2830 * If we need to add a timestamp, then we 2831 * add it to the start of the reserved space. 
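 *
 * In that case the reserved space looks roughly like this:
 *
 *	|<------------------- info->length ------------------->|
 *	+----------------------------+--------------------------+
 *	| TIME_EXTEND / TIME_STAMP   | event header + payload   |
 *	| (RB_LEN_TIME_EXTEND bytes) |                          |
 *	+----------------------------+--------------------------+
 *
 * rb_add_timestamp() fills in the first part, advances the event
 * pointer past it, and shrinks the length accordingly.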
2832 */ 2833 if (unlikely(info->add_timestamp)) 2834 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2835 2836 event->time_delta = delta; 2837 length -= RB_EVNT_HDR_SIZE; 2838 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2839 event->type_len = 0; 2840 event->array[0] = length; 2841 } else 2842 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2843 } 2844 2845 static unsigned rb_calculate_event_length(unsigned length) 2846 { 2847 struct ring_buffer_event event; /* Used only for sizeof array */ 2848 2849 /* zero length can cause confusions */ 2850 if (!length) 2851 length++; 2852 2853 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2854 length += sizeof(event.array[0]); 2855 2856 length += RB_EVNT_HDR_SIZE; 2857 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2858 2859 /* 2860 * In case the time delta is larger than the 27 bits for it 2861 * in the header, we need to add a timestamp. If another 2862 * event comes in when trying to discard this one to increase 2863 * the length, then the timestamp will be added in the allocated 2864 * space of this event. If length is bigger than the size needed 2865 * for the TIME_EXTEND, then padding has to be used. The events 2866 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2867 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2868 * As length is a multiple of 4, we only need to worry if it 2869 * is 12 (RB_LEN_TIME_EXTEND + 4). 2870 */ 2871 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2872 length += RB_ALIGNMENT; 2873 2874 return length; 2875 } 2876 2877 static inline bool 2878 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2879 struct ring_buffer_event *event) 2880 { 2881 unsigned long new_index, old_index; 2882 struct buffer_page *bpage; 2883 unsigned long addr; 2884 2885 new_index = rb_event_index(cpu_buffer, event); 2886 old_index = new_index + rb_event_ts_length(event); 2887 addr = (unsigned long)event; 2888 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 2889 2890 bpage = READ_ONCE(cpu_buffer->tail_page); 2891 2892 /* 2893 * Make sure the tail_page is still the same and 2894 * the next write location is the end of this event 2895 */ 2896 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2897 unsigned long write_mask = 2898 local_read(&bpage->write) & ~RB_WRITE_MASK; 2899 unsigned long event_length = rb_event_length(event); 2900 2901 /* 2902 * For the before_stamp to be different than the write_stamp 2903 * to make sure that the next event adds an absolute 2904 * value and does not rely on the saved write stamp, which 2905 * is now going to be bogus. 2906 * 2907 * By setting the before_stamp to zero, the next event 2908 * is not going to use the write_stamp and will instead 2909 * create an absolute timestamp. This means there's no 2910 * reason to update the wirte_stamp! 2911 */ 2912 rb_time_set(&cpu_buffer->before_stamp, 0); 2913 2914 /* 2915 * If an event were to come in now, it would see that the 2916 * write_stamp and the before_stamp are different, and assume 2917 * that this event just added itself before updating 2918 * the write stamp. The interrupting event will fix the 2919 * write stamp for us, and use an absolute timestamp. 2920 */ 2921 2922 /* 2923 * This is on the tail page. It is possible that 2924 * a write could come in and move the tail page 2925 * and write to the next page. That is fine 2926 * because we just shorten what is on this page. 
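 *
 * The write field packs two things (sketch):
 *
 *	[ upper bits: updater count ][ RB_WRITE_MASK bits: byte index ]
 *
 * where the updater count is bumped when the tail moves onto a page.
 * Folding the current upper bits (write_mask) into both old_index and
 * new_index makes the cmpxchg below compare the whole word, so the
 * discard only succeeds if neither the index nor that count changed
 * underneath us.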
2927 */ 2928 old_index += write_mask; 2929 new_index += write_mask; 2930 2931 /* caution: old_index gets updated on cmpxchg failure */ 2932 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 2933 /* update counters */ 2934 local_sub(event_length, &cpu_buffer->entries_bytes); 2935 return true; 2936 } 2937 } 2938 2939 /* could not discard */ 2940 return false; 2941 } 2942 2943 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2944 { 2945 local_inc(&cpu_buffer->committing); 2946 local_inc(&cpu_buffer->commits); 2947 } 2948 2949 static __always_inline void 2950 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 2951 { 2952 unsigned long max_count; 2953 2954 /* 2955 * We only race with interrupts and NMIs on this CPU. 2956 * If we own the commit event, then we can commit 2957 * all others that interrupted us, since the interruptions 2958 * are in stack format (they finish before they come 2959 * back to us). This allows us to do a simple loop to 2960 * assign the commit to the tail. 2961 */ 2962 again: 2963 max_count = cpu_buffer->nr_pages * 100; 2964 2965 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 2966 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 2967 return; 2968 if (RB_WARN_ON(cpu_buffer, 2969 rb_is_reader_page(cpu_buffer->tail_page))) 2970 return; 2971 /* 2972 * No need for a memory barrier here, as the update 2973 * of the tail_page did it for this page. 2974 */ 2975 local_set(&cpu_buffer->commit_page->page->commit, 2976 rb_page_write(cpu_buffer->commit_page)); 2977 rb_inc_page(&cpu_buffer->commit_page); 2978 /* add barrier to keep gcc from optimizing too much */ 2979 barrier(); 2980 } 2981 while (rb_commit_index(cpu_buffer) != 2982 rb_page_write(cpu_buffer->commit_page)) { 2983 2984 /* Make sure the readers see the content of what is committed. */ 2985 smp_wmb(); 2986 local_set(&cpu_buffer->commit_page->page->commit, 2987 rb_page_write(cpu_buffer->commit_page)); 2988 RB_WARN_ON(cpu_buffer, 2989 local_read(&cpu_buffer->commit_page->page->commit) & 2990 ~RB_WRITE_MASK); 2991 barrier(); 2992 } 2993 2994 /* again, keep gcc from optimizing */ 2995 barrier(); 2996 2997 /* 2998 * If an interrupt came in just after the first while loop 2999 * and pushed the tail page forward, we will be left with 3000 * a dangling commit that will never go forward. 3001 */ 3002 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3003 goto again; 3004 } 3005 3006 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3007 { 3008 unsigned long commits; 3009 3010 if (RB_WARN_ON(cpu_buffer, 3011 !local_read(&cpu_buffer->committing))) 3012 return; 3013 3014 again: 3015 commits = local_read(&cpu_buffer->commits); 3016 /* synchronize with interrupts */ 3017 barrier(); 3018 if (local_read(&cpu_buffer->committing) == 1) 3019 rb_set_commit_to_write(cpu_buffer); 3020 3021 local_dec(&cpu_buffer->committing); 3022 3023 /* synchronize with interrupts */ 3024 barrier(); 3025 3026 /* 3027 * Need to account for interrupts coming in between the 3028 * updating of the commit page and the clearing of the 3029 * committing counter. 
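 *
 * Roughly, the race being closed below is:
 *
 *	commits = read(cpu_buffer->commits)
 *	committing == 1  ->  rb_set_commit_to_write()
 *		<interrupt reserves and commits an event; it sees
 *		 committing > 1 and leaves the commit page to us>
 *	dec(committing)		// now 0, but that commit was never pushed
 *
 * This is detected because 'commits' changed while 'committing' dropped
 * to zero, so we take the commit back and loop.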
3030 */ 3031 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3032 !local_read(&cpu_buffer->committing)) { 3033 local_inc(&cpu_buffer->committing); 3034 goto again; 3035 } 3036 } 3037 3038 static inline void rb_event_discard(struct ring_buffer_event *event) 3039 { 3040 if (extended_time(event)) 3041 event = skip_time_extend(event); 3042 3043 /* array[0] holds the actual length for the discarded event */ 3044 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3045 event->type_len = RINGBUF_TYPE_PADDING; 3046 /* time delta must be non zero */ 3047 if (!event->time_delta) 3048 event->time_delta = 1; 3049 } 3050 3051 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3052 { 3053 local_inc(&cpu_buffer->entries); 3054 rb_end_commit(cpu_buffer); 3055 } 3056 3057 static __always_inline void 3058 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3059 { 3060 if (buffer->irq_work.waiters_pending) { 3061 buffer->irq_work.waiters_pending = false; 3062 /* irq_work_queue() supplies it's own memory barriers */ 3063 irq_work_queue(&buffer->irq_work.work); 3064 } 3065 3066 if (cpu_buffer->irq_work.waiters_pending) { 3067 cpu_buffer->irq_work.waiters_pending = false; 3068 /* irq_work_queue() supplies it's own memory barriers */ 3069 irq_work_queue(&cpu_buffer->irq_work.work); 3070 } 3071 3072 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3073 return; 3074 3075 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3076 return; 3077 3078 if (!cpu_buffer->irq_work.full_waiters_pending) 3079 return; 3080 3081 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3082 3083 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3084 return; 3085 3086 cpu_buffer->irq_work.wakeup_full = true; 3087 cpu_buffer->irq_work.full_waiters_pending = false; 3088 /* irq_work_queue() supplies it's own memory barriers */ 3089 irq_work_queue(&cpu_buffer->irq_work.work); 3090 } 3091 3092 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3093 # define do_ring_buffer_record_recursion() \ 3094 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3095 #else 3096 # define do_ring_buffer_record_recursion() do { } while (0) 3097 #endif 3098 3099 /* 3100 * The lock and unlock are done within a preempt disable section. 3101 * The current_context per_cpu variable can only be modified 3102 * by the current task between lock and unlock. But it can 3103 * be modified more than once via an interrupt. To pass this 3104 * information from the lock to the unlock without having to 3105 * access the 'in_interrupt()' functions again (which do show 3106 * a bit of overhead in something as critical as function tracing, 3107 * we use a bitmask trick. 3108 * 3109 * bit 1 = NMI context 3110 * bit 2 = IRQ context 3111 * bit 3 = SoftIRQ context 3112 * bit 4 = normal context. 3113 * 3114 * This works because this is the order of contexts that can 3115 * preempt other contexts. A SoftIRQ never preempts an IRQ 3116 * context. 3117 * 3118 * When the context is determined, the corresponding bit is 3119 * checked and set (if it was set, then a recursion of that context 3120 * happened). 3121 * 3122 * On unlock, we need to clear this bit. To do so, just subtract 3123 * 1 from the current_context and AND it to itself. 
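 *
 * With cpu_buffer->nest == 0 this is exactly val & (val - 1), which
 * clears the lowest set bit.  A standalone sketch (illustrative only):
 *
 *	static unsigned long clear_lowest_set_bit(unsigned long val)
 *	{
 *		return val & (val - 1);
 *	}
 *
 *	// clear_lowest_set_bit(0x5) == 0x4
 *	// clear_lowest_set_bit(0xa) == 0x8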
3124 * 3125 * (binary) 3126 * 101 - 1 = 100 3127 * 101 & 100 = 100 (clearing bit zero) 3128 * 3129 * 1010 - 1 = 1001 3130 * 1010 & 1001 = 1000 (clearing bit 1) 3131 * 3132 * The least significant bit can be cleared this way, and it 3133 * just so happens that it is the same bit corresponding to 3134 * the current context. 3135 * 3136 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3137 * is set when a recursion is detected at the current context, and if 3138 * the TRANSITION bit is already set, it will fail the recursion. 3139 * This is needed because there's a lag between the changing of 3140 * interrupt context and updating the preempt count. In this case, 3141 * a false positive will be found. To handle this, one extra recursion 3142 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3143 * bit is already set, then it is considered a recursion and the function 3144 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3145 * 3146 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3147 * to be cleared. Even if it wasn't the context that set it. That is, 3148 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3149 * is called before preempt_count() is updated, since the check will 3150 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3151 * NMI then comes in, it will set the NMI bit, but when the NMI code 3152 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3153 * and leave the NMI bit set. But this is fine, because the interrupt 3154 * code that set the TRANSITION bit will then clear the NMI bit when it 3155 * calls trace_recursive_unlock(). If another NMI comes in, it will 3156 * set the TRANSITION bit and continue. 3157 * 3158 * Note: The TRANSITION bit only handles a single transition between context. 3159 */ 3160 3161 static __always_inline bool 3162 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3163 { 3164 unsigned int val = cpu_buffer->current_context; 3165 int bit = interrupt_context_level(); 3166 3167 bit = RB_CTX_NORMAL - bit; 3168 3169 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3170 /* 3171 * It is possible that this was called by transitioning 3172 * between interrupt context, and preempt_count() has not 3173 * been updated yet. In this case, use the TRANSITION bit. 3174 */ 3175 bit = RB_CTX_TRANSITION; 3176 if (val & (1 << (bit + cpu_buffer->nest))) { 3177 do_ring_buffer_record_recursion(); 3178 return true; 3179 } 3180 } 3181 3182 val |= (1 << (bit + cpu_buffer->nest)); 3183 cpu_buffer->current_context = val; 3184 3185 return false; 3186 } 3187 3188 static __always_inline void 3189 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3190 { 3191 cpu_buffer->current_context &= 3192 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3193 } 3194 3195 /* The recursive locking above uses 5 bits */ 3196 #define NESTED_BITS 5 3197 3198 /** 3199 * ring_buffer_nest_start - Allow to trace while nested 3200 * @buffer: The ring buffer to modify 3201 * 3202 * The ring buffer has a safety mechanism to prevent recursion. 3203 * But there may be a case where a trace needs to be done while 3204 * tracing something else. In this case, calling this function 3205 * will allow this function to nest within a currently active 3206 * ring_buffer_lock_reserve(). 
3207 * 3208 * Call this function before calling another ring_buffer_lock_reserve() and 3209 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3210 */ 3211 void ring_buffer_nest_start(struct trace_buffer *buffer) 3212 { 3213 struct ring_buffer_per_cpu *cpu_buffer; 3214 int cpu; 3215 3216 /* Enabled by ring_buffer_nest_end() */ 3217 preempt_disable_notrace(); 3218 cpu = raw_smp_processor_id(); 3219 cpu_buffer = buffer->buffers[cpu]; 3220 /* This is the shift value for the above recursive locking */ 3221 cpu_buffer->nest += NESTED_BITS; 3222 } 3223 3224 /** 3225 * ring_buffer_nest_end - Allow to trace while nested 3226 * @buffer: The ring buffer to modify 3227 * 3228 * Must be called after ring_buffer_nest_start() and after the 3229 * ring_buffer_unlock_commit(). 3230 */ 3231 void ring_buffer_nest_end(struct trace_buffer *buffer) 3232 { 3233 struct ring_buffer_per_cpu *cpu_buffer; 3234 int cpu; 3235 3236 /* disabled by ring_buffer_nest_start() */ 3237 cpu = raw_smp_processor_id(); 3238 cpu_buffer = buffer->buffers[cpu]; 3239 /* This is the shift value for the above recursive locking */ 3240 cpu_buffer->nest -= NESTED_BITS; 3241 preempt_enable_notrace(); 3242 } 3243 3244 /** 3245 * ring_buffer_unlock_commit - commit a reserved 3246 * @buffer: The buffer to commit to 3247 * 3248 * This commits the data to the ring buffer, and releases any locks held. 3249 * 3250 * Must be paired with ring_buffer_lock_reserve. 3251 */ 3252 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3253 { 3254 struct ring_buffer_per_cpu *cpu_buffer; 3255 int cpu = raw_smp_processor_id(); 3256 3257 cpu_buffer = buffer->buffers[cpu]; 3258 3259 rb_commit(cpu_buffer); 3260 3261 rb_wakeups(buffer, cpu_buffer); 3262 3263 trace_recursive_unlock(cpu_buffer); 3264 3265 preempt_enable_notrace(); 3266 3267 return 0; 3268 } 3269 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3270 3271 /* Special value to validate all deltas on a page. 
*/ 3272 #define CHECK_FULL_PAGE 1L 3273 3274 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3275 3276 static const char *show_irq_str(int bits) 3277 { 3278 const char *type[] = { 3279 ".", // 0 3280 "s", // 1 3281 "h", // 2 3282 "Hs", // 3 3283 "n", // 4 3284 "Ns", // 5 3285 "Nh", // 6 3286 "NHs", // 7 3287 }; 3288 3289 return type[bits]; 3290 } 3291 3292 /* Assume this is an trace event */ 3293 static const char *show_flags(struct ring_buffer_event *event) 3294 { 3295 struct trace_entry *entry; 3296 int bits = 0; 3297 3298 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3299 return "X"; 3300 3301 entry = ring_buffer_event_data(event); 3302 3303 if (entry->flags & TRACE_FLAG_SOFTIRQ) 3304 bits |= 1; 3305 3306 if (entry->flags & TRACE_FLAG_HARDIRQ) 3307 bits |= 2; 3308 3309 if (entry->flags & TRACE_FLAG_NMI) 3310 bits |= 4; 3311 3312 return show_irq_str(bits); 3313 } 3314 3315 static const char *show_irq(struct ring_buffer_event *event) 3316 { 3317 struct trace_entry *entry; 3318 3319 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3320 return ""; 3321 3322 entry = ring_buffer_event_data(event); 3323 if (entry->flags & TRACE_FLAG_IRQS_OFF) 3324 return "d"; 3325 return ""; 3326 } 3327 3328 static const char *show_interrupt_level(void) 3329 { 3330 unsigned long pc = preempt_count(); 3331 unsigned char level = 0; 3332 3333 if (pc & SOFTIRQ_OFFSET) 3334 level |= 1; 3335 3336 if (pc & HARDIRQ_MASK) 3337 level |= 2; 3338 3339 if (pc & NMI_MASK) 3340 level |= 4; 3341 3342 return show_irq_str(level); 3343 } 3344 3345 static void dump_buffer_page(struct buffer_data_page *bpage, 3346 struct rb_event_info *info, 3347 unsigned long tail) 3348 { 3349 struct ring_buffer_event *event; 3350 u64 ts, delta; 3351 int e; 3352 3353 ts = bpage->time_stamp; 3354 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3355 3356 for (e = 0; e < tail; e += rb_event_length(event)) { 3357 3358 event = (struct ring_buffer_event *)(bpage->data + e); 3359 3360 switch (event->type_len) { 3361 3362 case RINGBUF_TYPE_TIME_EXTEND: 3363 delta = rb_event_time_stamp(event); 3364 ts += delta; 3365 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 3366 e, ts, delta); 3367 break; 3368 3369 case RINGBUF_TYPE_TIME_STAMP: 3370 delta = rb_event_time_stamp(event); 3371 ts = rb_fix_abs_ts(delta, ts); 3372 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 3373 e, ts, delta); 3374 break; 3375 3376 case RINGBUF_TYPE_PADDING: 3377 ts += event->time_delta; 3378 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 3379 e, ts, event->time_delta); 3380 break; 3381 3382 case RINGBUF_TYPE_DATA: 3383 ts += event->time_delta; 3384 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 3385 e, ts, event->time_delta, 3386 show_flags(event), show_irq(event)); 3387 break; 3388 3389 default: 3390 break; 3391 } 3392 } 3393 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 3394 } 3395 3396 static DEFINE_PER_CPU(atomic_t, checking); 3397 static atomic_t ts_dump; 3398 3399 #define buffer_warn_return(fmt, ...) 
\ 3400 do { \ 3401 /* If another report is happening, ignore this one */ \ 3402 if (atomic_inc_return(&ts_dump) != 1) { \ 3403 atomic_dec(&ts_dump); \ 3404 goto out; \ 3405 } \ 3406 atomic_inc(&cpu_buffer->record_disabled); \ 3407 pr_warn(fmt, ##__VA_ARGS__); \ 3408 dump_buffer_page(bpage, info, tail); \ 3409 atomic_dec(&ts_dump); \ 3410 /* There's some cases in boot up that this can happen */ \ 3411 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 3412 /* Do not re-enable checking */ \ 3413 return; \ 3414 } while (0) 3415 3416 /* 3417 * Check if the current event time stamp matches the deltas on 3418 * the buffer page. 3419 */ 3420 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3421 struct rb_event_info *info, 3422 unsigned long tail) 3423 { 3424 struct ring_buffer_event *event; 3425 struct buffer_data_page *bpage; 3426 u64 ts, delta; 3427 bool full = false; 3428 int e; 3429 3430 bpage = info->tail_page->page; 3431 3432 if (tail == CHECK_FULL_PAGE) { 3433 full = true; 3434 tail = local_read(&bpage->commit); 3435 } else if (info->add_timestamp & 3436 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 3437 /* Ignore events with absolute time stamps */ 3438 return; 3439 } 3440 3441 /* 3442 * Do not check the first event (skip possible extends too). 3443 * Also do not check if previous events have not been committed. 3444 */ 3445 if (tail <= 8 || tail > local_read(&bpage->commit)) 3446 return; 3447 3448 /* 3449 * If this interrupted another event, 3450 */ 3451 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 3452 goto out; 3453 3454 ts = bpage->time_stamp; 3455 3456 for (e = 0; e < tail; e += rb_event_length(event)) { 3457 3458 event = (struct ring_buffer_event *)(bpage->data + e); 3459 3460 switch (event->type_len) { 3461 3462 case RINGBUF_TYPE_TIME_EXTEND: 3463 delta = rb_event_time_stamp(event); 3464 ts += delta; 3465 break; 3466 3467 case RINGBUF_TYPE_TIME_STAMP: 3468 delta = rb_event_time_stamp(event); 3469 delta = rb_fix_abs_ts(delta, ts); 3470 if (delta < ts) { 3471 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 3472 cpu_buffer->cpu, ts, delta); 3473 } 3474 ts = delta; 3475 break; 3476 3477 case RINGBUF_TYPE_PADDING: 3478 if (event->time_delta == 1) 3479 break; 3480 fallthrough; 3481 case RINGBUF_TYPE_DATA: 3482 ts += event->time_delta; 3483 break; 3484 3485 default: 3486 RB_WARN_ON(cpu_buffer, 1); 3487 } 3488 } 3489 if ((full && ts > info->ts) || 3490 (!full && ts + info->delta != info->ts)) { 3491 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 3492 cpu_buffer->cpu, 3493 ts + info->delta, info->ts, info->delta, 3494 info->before, info->after, 3495 full ? 
" (full)" : "", show_interrupt_level()); 3496 } 3497 out: 3498 atomic_dec(this_cpu_ptr(&checking)); 3499 } 3500 #else 3501 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3502 struct rb_event_info *info, 3503 unsigned long tail) 3504 { 3505 } 3506 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 3507 3508 static struct ring_buffer_event * 3509 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3510 struct rb_event_info *info) 3511 { 3512 struct ring_buffer_event *event; 3513 struct buffer_page *tail_page; 3514 unsigned long tail, write, w; 3515 3516 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3517 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3518 3519 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3520 barrier(); 3521 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3522 rb_time_read(&cpu_buffer->write_stamp, &info->after); 3523 barrier(); 3524 info->ts = rb_time_stamp(cpu_buffer->buffer); 3525 3526 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3527 info->delta = info->ts; 3528 } else { 3529 /* 3530 * If interrupting an event time update, we may need an 3531 * absolute timestamp. 3532 * Don't bother if this is the start of a new page (w == 0). 3533 */ 3534 if (!w) { 3535 /* Use the sub-buffer timestamp */ 3536 info->delta = 0; 3537 } else if (unlikely(info->before != info->after)) { 3538 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3539 info->length += RB_LEN_TIME_EXTEND; 3540 } else { 3541 info->delta = info->ts - info->after; 3542 if (unlikely(test_time_stamp(info->delta))) { 3543 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3544 info->length += RB_LEN_TIME_EXTEND; 3545 } 3546 } 3547 } 3548 3549 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3550 3551 /*C*/ write = local_add_return(info->length, &tail_page->write); 3552 3553 /* set write to only the index of the write */ 3554 write &= RB_WRITE_MASK; 3555 3556 tail = write - info->length; 3557 3558 /* See if we shot pass the end of this buffer page */ 3559 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 3560 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 3561 return rb_move_tail(cpu_buffer, tail, info); 3562 } 3563 3564 if (likely(tail == w)) { 3565 /* Nothing interrupted us between A and C */ 3566 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3567 /* 3568 * If something came in between C and D, the write stamp 3569 * may now not be in sync. But that's fine as the before_stamp 3570 * will be different and then next event will just be forced 3571 * to use an absolute timestamp. 3572 */ 3573 if (likely(!(info->add_timestamp & 3574 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3575 /* This did not interrupt any time update */ 3576 info->delta = info->ts - info->after; 3577 else 3578 /* Just use full timestamp for interrupting event */ 3579 info->delta = info->ts; 3580 check_buffer(cpu_buffer, info, tail); 3581 } else { 3582 u64 ts; 3583 /* SLOW PATH - Interrupted between A and C */ 3584 3585 /* Save the old before_stamp */ 3586 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3587 3588 /* 3589 * Read a new timestamp and update the before_stamp to make 3590 * the next event after this one force using an absolute 3591 * timestamp. This is in case an interrupt were to come in 3592 * between E and F. 
3593 */ 3594 ts = rb_time_stamp(cpu_buffer->buffer); 3595 rb_time_set(&cpu_buffer->before_stamp, ts); 3596 3597 barrier(); 3598 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 3599 barrier(); 3600 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3601 info->after == info->before && info->after < ts) { 3602 /* 3603 * Nothing came after this event between C and F, it is 3604 * safe to use info->after for the delta as it 3605 * matched info->before and is still valid. 3606 */ 3607 info->delta = ts - info->after; 3608 } else { 3609 /* 3610 * Interrupted between C and F: 3611 * Lost the previous events time stamp. Just set the 3612 * delta to zero, and this will be the same time as 3613 * the event this event interrupted. And the events that 3614 * came after this will still be correct (as they would 3615 * have built their delta on the previous event. 3616 */ 3617 info->delta = 0; 3618 } 3619 info->ts = ts; 3620 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3621 } 3622 3623 /* 3624 * If this is the first commit on the page, then it has the same 3625 * timestamp as the page itself. 3626 */ 3627 if (unlikely(!tail && !(info->add_timestamp & 3628 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3629 info->delta = 0; 3630 3631 /* We reserved something on the buffer */ 3632 3633 event = __rb_page_index(tail_page, tail); 3634 rb_update_event(cpu_buffer, event, info); 3635 3636 local_inc(&tail_page->entries); 3637 3638 /* 3639 * If this is the first commit on the page, then update 3640 * its timestamp. 3641 */ 3642 if (unlikely(!tail)) 3643 tail_page->page->time_stamp = info->ts; 3644 3645 /* account for these added bytes */ 3646 local_add(info->length, &cpu_buffer->entries_bytes); 3647 3648 return event; 3649 } 3650 3651 static __always_inline struct ring_buffer_event * 3652 rb_reserve_next_event(struct trace_buffer *buffer, 3653 struct ring_buffer_per_cpu *cpu_buffer, 3654 unsigned long length) 3655 { 3656 struct ring_buffer_event *event; 3657 struct rb_event_info info; 3658 int nr_loops = 0; 3659 int add_ts_default; 3660 3661 /* ring buffer does cmpxchg, make sure it is safe in NMI context */ 3662 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) && 3663 (unlikely(in_nmi()))) { 3664 return NULL; 3665 } 3666 3667 rb_start_commit(cpu_buffer); 3668 /* The commit page can not change after this */ 3669 3670 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3671 /* 3672 * Due to the ability to swap a cpu buffer from a buffer 3673 * it is possible it was swapped before we committed. 3674 * (committing stops a swap). We check for it here and 3675 * if it happened, we have to fail the write. 3676 */ 3677 barrier(); 3678 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3679 local_dec(&cpu_buffer->committing); 3680 local_dec(&cpu_buffer->commits); 3681 return NULL; 3682 } 3683 #endif 3684 3685 info.length = rb_calculate_event_length(length); 3686 3687 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3688 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3689 info.length += RB_LEN_TIME_EXTEND; 3690 if (info.length > cpu_buffer->buffer->max_data_size) 3691 goto out_fail; 3692 } else { 3693 add_ts_default = RB_ADD_STAMP_NONE; 3694 } 3695 3696 again: 3697 info.add_timestamp = add_ts_default; 3698 info.delta = 0; 3699 3700 /* 3701 * We allow for interrupts to reenter here and do a trace. 3702 * If one does, it will cause this original code to loop 3703 * back here. Even with heavy interrupts happening, this 3704 * should only happen a few times in a row. 
If this happens 3705 * 1000 times in a row, there must be either an interrupt 3706 * storm or we have something buggy. 3707 * Bail! 3708 */ 3709 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3710 goto out_fail; 3711 3712 event = __rb_reserve_next(cpu_buffer, &info); 3713 3714 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3715 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3716 info.length -= RB_LEN_TIME_EXTEND; 3717 goto again; 3718 } 3719 3720 if (likely(event)) 3721 return event; 3722 out_fail: 3723 rb_end_commit(cpu_buffer); 3724 return NULL; 3725 } 3726 3727 /** 3728 * ring_buffer_lock_reserve - reserve a part of the buffer 3729 * @buffer: the ring buffer to reserve from 3730 * @length: the length of the data to reserve (excluding event header) 3731 * 3732 * Returns a reserved event on the ring buffer to copy directly to. 3733 * The user of this interface will need to get the body to write into 3734 * and can use the ring_buffer_event_data() interface. 3735 * 3736 * The length is the length of the data needed, not the event length 3737 * which also includes the event header. 3738 * 3739 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3740 * If NULL is returned, then nothing has been allocated or locked. 3741 */ 3742 struct ring_buffer_event * 3743 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3744 { 3745 struct ring_buffer_per_cpu *cpu_buffer; 3746 struct ring_buffer_event *event; 3747 int cpu; 3748 3749 /* If we are tracing schedule, we don't want to recurse */ 3750 preempt_disable_notrace(); 3751 3752 if (unlikely(atomic_read(&buffer->record_disabled))) 3753 goto out; 3754 3755 cpu = raw_smp_processor_id(); 3756 3757 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3758 goto out; 3759 3760 cpu_buffer = buffer->buffers[cpu]; 3761 3762 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3763 goto out; 3764 3765 if (unlikely(length > buffer->max_data_size)) 3766 goto out; 3767 3768 if (unlikely(trace_recursive_lock(cpu_buffer))) 3769 goto out; 3770 3771 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3772 if (!event) 3773 goto out_unlock; 3774 3775 return event; 3776 3777 out_unlock: 3778 trace_recursive_unlock(cpu_buffer); 3779 out: 3780 preempt_enable_notrace(); 3781 return NULL; 3782 } 3783 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3784 3785 /* 3786 * Decrement the entries to the page that an event is on. 3787 * The event does not even need to exist, only the pointer 3788 * to the page it is on. This may only be called before the commit 3789 * takes place. 3790 */ 3791 static inline void 3792 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3793 struct ring_buffer_event *event) 3794 { 3795 unsigned long addr = (unsigned long)event; 3796 struct buffer_page *bpage = cpu_buffer->commit_page; 3797 struct buffer_page *start; 3798 3799 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3800 3801 /* Do the likely case first */ 3802 if (likely(bpage->page == (void *)addr)) { 3803 local_dec(&bpage->entries); 3804 return; 3805 } 3806 3807 /* 3808 * Because the commit page may be on the reader page we 3809 * start with the next page and check the end loop there. 3810 */ 3811 rb_inc_page(&bpage); 3812 start = bpage; 3813 do { 3814 if (bpage->page == (void *)addr) { 3815 local_dec(&bpage->entries); 3816 return; 3817 } 3818 rb_inc_page(&bpage); 3819 } while (bpage != start); 3820 3821 /* commit not part of this buffer?? 
*/ 3822 RB_WARN_ON(cpu_buffer, 1); 3823 } 3824 3825 /** 3826 * ring_buffer_discard_commit - discard an event that has not been committed 3827 * @buffer: the ring buffer 3828 * @event: non committed event to discard 3829 * 3830 * Sometimes an event that is in the ring buffer needs to be ignored. 3831 * This function lets the user discard an event in the ring buffer 3832 * and then that event will not be read later. 3833 * 3834 * This function only works if it is called before the item has been 3835 * committed. It will try to free the event from the ring buffer 3836 * if another event has not been added behind it. 3837 * 3838 * If another event has been added behind it, it will set the event 3839 * up as discarded, and perform the commit. 3840 * 3841 * If this function is called, do not call ring_buffer_unlock_commit on 3842 * the event. 3843 */ 3844 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3845 struct ring_buffer_event *event) 3846 { 3847 struct ring_buffer_per_cpu *cpu_buffer; 3848 int cpu; 3849 3850 /* The event is discarded regardless */ 3851 rb_event_discard(event); 3852 3853 cpu = smp_processor_id(); 3854 cpu_buffer = buffer->buffers[cpu]; 3855 3856 /* 3857 * This must only be called if the event has not been 3858 * committed yet. Thus we can assume that preemption 3859 * is still disabled. 3860 */ 3861 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3862 3863 rb_decrement_entry(cpu_buffer, event); 3864 if (rb_try_to_discard(cpu_buffer, event)) 3865 goto out; 3866 3867 out: 3868 rb_end_commit(cpu_buffer); 3869 3870 trace_recursive_unlock(cpu_buffer); 3871 3872 preempt_enable_notrace(); 3873 3874 } 3875 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3876 3877 /** 3878 * ring_buffer_write - write data to the buffer without reserving 3879 * @buffer: The ring buffer to write to. 3880 * @length: The length of the data being written (excluding the event header) 3881 * @data: The data to write to the buffer. 3882 * 3883 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3884 * one function. If you already have the data to write to the buffer, it 3885 * may be easier to simply call this function. 3886 * 3887 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3888 * and not the length of the event which would hold the header. 
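 *
 * A minimal usage sketch (illustrative only; "struct my_payload" and
 * handle_drop() are hypothetical caller-side names):
 *
 *	struct my_payload data = { .val = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(data), &data))
 *		handle_drop();
 *
 * A non-zero return means the event was not recorded (for example,
 * recording is disabled or the data exceeds the maximum event size).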
3889 */ 3890 int ring_buffer_write(struct trace_buffer *buffer, 3891 unsigned long length, 3892 void *data) 3893 { 3894 struct ring_buffer_per_cpu *cpu_buffer; 3895 struct ring_buffer_event *event; 3896 void *body; 3897 int ret = -EBUSY; 3898 int cpu; 3899 3900 preempt_disable_notrace(); 3901 3902 if (atomic_read(&buffer->record_disabled)) 3903 goto out; 3904 3905 cpu = raw_smp_processor_id(); 3906 3907 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3908 goto out; 3909 3910 cpu_buffer = buffer->buffers[cpu]; 3911 3912 if (atomic_read(&cpu_buffer->record_disabled)) 3913 goto out; 3914 3915 if (length > buffer->max_data_size) 3916 goto out; 3917 3918 if (unlikely(trace_recursive_lock(cpu_buffer))) 3919 goto out; 3920 3921 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3922 if (!event) 3923 goto out_unlock; 3924 3925 body = rb_event_data(event); 3926 3927 memcpy(body, data, length); 3928 3929 rb_commit(cpu_buffer); 3930 3931 rb_wakeups(buffer, cpu_buffer); 3932 3933 ret = 0; 3934 3935 out_unlock: 3936 trace_recursive_unlock(cpu_buffer); 3937 3938 out: 3939 preempt_enable_notrace(); 3940 3941 return ret; 3942 } 3943 EXPORT_SYMBOL_GPL(ring_buffer_write); 3944 3945 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3946 { 3947 struct buffer_page *reader = cpu_buffer->reader_page; 3948 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3949 struct buffer_page *commit = cpu_buffer->commit_page; 3950 3951 /* In case of error, head will be NULL */ 3952 if (unlikely(!head)) 3953 return true; 3954 3955 /* Reader should exhaust content in reader page */ 3956 if (reader->read != rb_page_commit(reader)) 3957 return false; 3958 3959 /* 3960 * If writers are committing on the reader page, knowing all 3961 * committed content has been read, the ring buffer is empty. 3962 */ 3963 if (commit == reader) 3964 return true; 3965 3966 /* 3967 * If writers are committing on a page other than reader page 3968 * and head page, there should always be content to read. 3969 */ 3970 if (commit != head) 3971 return false; 3972 3973 /* 3974 * Writers are committing on the head page, we just need 3975 * to care about whether there is committed data, and the reader will 3976 * swap reader page with head page when it is to read data. 3977 */ 3978 return rb_page_commit(commit) == 0; 3979 } 3980 3981 /** 3982 * ring_buffer_record_disable - stop all writes into the buffer 3983 * @buffer: The ring buffer to stop writes to. 3984 * 3985 * This prevents all writes to the buffer. Any attempt to write 3986 * to the buffer after this will fail and return NULL. 3987 * 3988 * The caller should call synchronize_rcu() after this. 3989 */ 3990 void ring_buffer_record_disable(struct trace_buffer *buffer) 3991 { 3992 atomic_inc(&buffer->record_disabled); 3993 } 3994 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 3995 3996 /** 3997 * ring_buffer_record_enable - enable writes to the buffer 3998 * @buffer: The ring buffer to enable writes 3999 * 4000 * Note, multiple disables will need the same number of enables 4001 * to truly enable the writing (much like preempt_disable). 4002 */ 4003 void ring_buffer_record_enable(struct trace_buffer *buffer) 4004 { 4005 atomic_dec(&buffer->record_disabled); 4006 } 4007 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4008 4009 /** 4010 * ring_buffer_record_off - stop all writes into the buffer 4011 * @buffer: The ring buffer to stop writes to. 4012 * 4013 * This prevents all writes to the buffer. Any attempt to write 4014 * to the buffer after this will fail and return NULL.
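 *
 * A minimal sketch of the intended on/off use (illustrative only; the
 * surrounding code is hypothetical):
 *
 *	ring_buffer_record_off(buffer);
 *	... every write attempt now fails ...
 *	ring_buffer_record_on(buffer);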
4015 * 4016 * This is different from ring_buffer_record_disable() as 4017 * it works like an on/off switch, whereas the disable() version 4018 * must be paired with an enable(). 4019 */ 4020 void ring_buffer_record_off(struct trace_buffer *buffer) 4021 { 4022 unsigned int rd; 4023 unsigned int new_rd; 4024 4025 rd = atomic_read(&buffer->record_disabled); 4026 do { 4027 new_rd = rd | RB_BUFFER_OFF; 4028 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4029 } 4030 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4031 4032 /** 4033 * ring_buffer_record_on - restart writes into the buffer 4034 * @buffer: The ring buffer to start writes to. 4035 * 4036 * This enables all writes to the buffer that was disabled by 4037 * ring_buffer_record_off(). 4038 * 4039 * This is different from ring_buffer_record_enable() as 4040 * it works like an on/off switch, whereas the enable() version 4041 * must be paired with a disable(). 4042 */ 4043 void ring_buffer_record_on(struct trace_buffer *buffer) 4044 { 4045 unsigned int rd; 4046 unsigned int new_rd; 4047 4048 rd = atomic_read(&buffer->record_disabled); 4049 do { 4050 new_rd = rd & ~RB_BUFFER_OFF; 4051 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4052 } 4053 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4054 4055 /** 4056 * ring_buffer_record_is_on - return true if the ring buffer can write 4057 * @buffer: The ring buffer to see if write is enabled 4058 * 4059 * Returns true if the ring buffer is in a state that it accepts writes. 4060 */ 4061 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4062 { 4063 return !atomic_read(&buffer->record_disabled); 4064 } 4065 4066 /** 4067 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4068 * @buffer: The ring buffer to see if write is set enabled 4069 * 4070 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4071 * Note that this does NOT mean it is in a writable state. 4072 * 4073 * It may return true when the ring buffer has been disabled by 4074 * ring_buffer_record_disable(), as that is a temporary disabling of 4075 * the ring buffer. 4076 */ 4077 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4078 { 4079 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4080 } 4081 4082 /** 4083 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4084 * @buffer: The ring buffer to stop writes to. 4085 * @cpu: The CPU buffer to stop 4086 * 4087 * This prevents all writes to the buffer. Any attempt to write 4088 * to the buffer after this will fail and return NULL. 4089 * 4090 * The caller should call synchronize_rcu() after this. 4091 */ 4092 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4093 { 4094 struct ring_buffer_per_cpu *cpu_buffer; 4095 4096 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4097 return; 4098 4099 cpu_buffer = buffer->buffers[cpu]; 4100 atomic_inc(&cpu_buffer->record_disabled); 4101 } 4102 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4103 4104 /** 4105 * ring_buffer_record_enable_cpu - enable writes to the buffer 4106 * @buffer: The ring buffer to enable writes 4107 * @cpu: The CPU to enable. 4108 * 4109 * Note, multiple disables will need the same number of enables 4110 * to truly enable the writing (much like preempt_disable).
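 *
 * For example (a sketch; the disables must balance before writes on
 * @cpu resume):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	ring_buffer_record_enable_cpu(buffer, cpu);
 *	... writes on @cpu are still disabled here ...
 *	ring_buffer_record_enable_cpu(buffer, cpu);
 *	... writes on @cpu are allowed again ...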
4111 */ 4112 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4113 { 4114 struct ring_buffer_per_cpu *cpu_buffer; 4115 4116 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4117 return; 4118 4119 cpu_buffer = buffer->buffers[cpu]; 4120 atomic_dec(&cpu_buffer->record_disabled); 4121 } 4122 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4123 4124 /* 4125 * The total entries in the ring buffer is the running counter 4126 * of entries entered into the ring buffer, minus the sum of 4127 * the entries read from the ring buffer and the number of 4128 * entries that were overwritten. 4129 */ 4130 static inline unsigned long 4131 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4132 { 4133 return local_read(&cpu_buffer->entries) - 4134 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4135 } 4136 4137 /** 4138 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4139 * @buffer: The ring buffer 4140 * @cpu: The per CPU buffer to read from. 4141 */ 4142 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4143 { 4144 unsigned long flags; 4145 struct ring_buffer_per_cpu *cpu_buffer; 4146 struct buffer_page *bpage; 4147 u64 ret = 0; 4148 4149 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4150 return 0; 4151 4152 cpu_buffer = buffer->buffers[cpu]; 4153 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4154 /* 4155 * if the tail is on reader_page, oldest time stamp is on the reader 4156 * page 4157 */ 4158 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4159 bpage = cpu_buffer->reader_page; 4160 else 4161 bpage = rb_set_head_page(cpu_buffer); 4162 if (bpage) 4163 ret = bpage->page->time_stamp; 4164 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4165 4166 return ret; 4167 } 4168 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4169 4170 /** 4171 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4172 * @buffer: The ring buffer 4173 * @cpu: The per CPU buffer to read from. 4174 */ 4175 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4176 { 4177 struct ring_buffer_per_cpu *cpu_buffer; 4178 unsigned long ret; 4179 4180 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4181 return 0; 4182 4183 cpu_buffer = buffer->buffers[cpu]; 4184 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4185 4186 return ret; 4187 } 4188 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4189 4190 /** 4191 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4192 * @buffer: The ring buffer 4193 * @cpu: The per CPU buffer to get the entries from. 4194 */ 4195 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4196 { 4197 struct ring_buffer_per_cpu *cpu_buffer; 4198 4199 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4200 return 0; 4201 4202 cpu_buffer = buffer->buffers[cpu]; 4203 4204 return rb_num_of_entries(cpu_buffer); 4205 } 4206 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4207 4208 /** 4209 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4210 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4211 * @buffer: The ring buffer 4212 * @cpu: The per CPU buffer to get the number of overruns from 4213 */ 4214 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4215 { 4216 struct ring_buffer_per_cpu *cpu_buffer; 4217 unsigned long ret; 4218 4219 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4220 return 0; 4221 4222 cpu_buffer = buffer->buffers[cpu]; 4223 ret = local_read(&cpu_buffer->overrun); 4224 4225 return ret; 4226 } 4227 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4228 4229 /** 4230 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4231 * commits failing due to the buffer wrapping around while there are uncommitted 4232 * events, such as during an interrupt storm. 4233 * @buffer: The ring buffer 4234 * @cpu: The per CPU buffer to get the number of overruns from 4235 */ 4236 unsigned long 4237 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4238 { 4239 struct ring_buffer_per_cpu *cpu_buffer; 4240 unsigned long ret; 4241 4242 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4243 return 0; 4244 4245 cpu_buffer = buffer->buffers[cpu]; 4246 ret = local_read(&cpu_buffer->commit_overrun); 4247 4248 return ret; 4249 } 4250 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4251 4252 /** 4253 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4254 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4255 * @buffer: The ring buffer 4256 * @cpu: The per CPU buffer to get the number of overruns from 4257 */ 4258 unsigned long 4259 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4260 { 4261 struct ring_buffer_per_cpu *cpu_buffer; 4262 unsigned long ret; 4263 4264 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4265 return 0; 4266 4267 cpu_buffer = buffer->buffers[cpu]; 4268 ret = local_read(&cpu_buffer->dropped_events); 4269 4270 return ret; 4271 } 4272 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4273 4274 /** 4275 * ring_buffer_read_events_cpu - get the number of events successfully read 4276 * @buffer: The ring buffer 4277 * @cpu: The per CPU buffer to get the number of events read 4278 */ 4279 unsigned long 4280 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4281 { 4282 struct ring_buffer_per_cpu *cpu_buffer; 4283 4284 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4285 return 0; 4286 4287 cpu_buffer = buffer->buffers[cpu]; 4288 return cpu_buffer->read; 4289 } 4290 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4291 4292 /** 4293 * ring_buffer_entries - get the number of entries in a buffer 4294 * @buffer: The ring buffer 4295 * 4296 * Returns the total number of entries in the ring buffer 4297 * (all CPU entries) 4298 */ 4299 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4300 { 4301 struct ring_buffer_per_cpu *cpu_buffer; 4302 unsigned long entries = 0; 4303 int cpu; 4304 4305 /* if you care about this being correct, lock the buffer */ 4306 for_each_buffer_cpu(buffer, cpu) { 4307 cpu_buffer = buffer->buffers[cpu]; 4308 entries += rb_num_of_entries(cpu_buffer); 4309 } 4310 4311 return entries; 4312 } 4313 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4314 4315 /** 4316 * ring_buffer_overruns - get the number of overruns in buffer 4317 * @buffer: The ring buffer 4318 * 4319 * Returns the total number of overruns in the ring buffer 4320 * (all CPU entries) 4321 */ 4322 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4323 { 4324 struct ring_buffer_per_cpu *cpu_buffer; 4325 unsigned long overruns = 0; 4326 int cpu; 4327 4328 /* 
if you care about this being correct, lock the buffer */ 4329 for_each_buffer_cpu(buffer, cpu) { 4330 cpu_buffer = buffer->buffers[cpu]; 4331 overruns += local_read(&cpu_buffer->overrun); 4332 } 4333 4334 return overruns; 4335 } 4336 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4337 4338 static void rb_iter_reset(struct ring_buffer_iter *iter) 4339 { 4340 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4341 4342 /* Iterator usage is expected to have record disabled */ 4343 iter->head_page = cpu_buffer->reader_page; 4344 iter->head = cpu_buffer->reader_page->read; 4345 iter->next_event = iter->head; 4346 4347 iter->cache_reader_page = iter->head_page; 4348 iter->cache_read = cpu_buffer->read; 4349 iter->cache_pages_removed = cpu_buffer->pages_removed; 4350 4351 if (iter->head) { 4352 iter->read_stamp = cpu_buffer->read_stamp; 4353 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4354 } else { 4355 iter->read_stamp = iter->head_page->page->time_stamp; 4356 iter->page_stamp = iter->read_stamp; 4357 } 4358 } 4359 4360 /** 4361 * ring_buffer_iter_reset - reset an iterator 4362 * @iter: The iterator to reset 4363 * 4364 * Resets the iterator, so that it will start from the beginning 4365 * again. 4366 */ 4367 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4368 { 4369 struct ring_buffer_per_cpu *cpu_buffer; 4370 unsigned long flags; 4371 4372 if (!iter) 4373 return; 4374 4375 cpu_buffer = iter->cpu_buffer; 4376 4377 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4378 rb_iter_reset(iter); 4379 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4380 } 4381 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4382 4383 /** 4384 * ring_buffer_iter_empty - check if an iterator has no more to read 4385 * @iter: The iterator to check 4386 */ 4387 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4388 { 4389 struct ring_buffer_per_cpu *cpu_buffer; 4390 struct buffer_page *reader; 4391 struct buffer_page *head_page; 4392 struct buffer_page *commit_page; 4393 struct buffer_page *curr_commit_page; 4394 unsigned commit; 4395 u64 curr_commit_ts; 4396 u64 commit_ts; 4397 4398 cpu_buffer = iter->cpu_buffer; 4399 reader = cpu_buffer->reader_page; 4400 head_page = cpu_buffer->head_page; 4401 commit_page = READ_ONCE(cpu_buffer->commit_page); 4402 commit_ts = commit_page->page->time_stamp; 4403 4404 /* 4405 * When the writer goes across pages, it issues a cmpxchg which 4406 * is a mb(), which will synchronize with the rmb here. 
4407 * (see rb_tail_page_update()) 4408 */ 4409 smp_rmb(); 4410 commit = rb_page_commit(commit_page); 4411 /* We want to make sure that the commit page doesn't change */ 4412 smp_rmb(); 4413 4414 /* Make sure commit page didn't change */ 4415 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4416 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4417 4418 /* If the commit page changed, then there's more data */ 4419 if (curr_commit_page != commit_page || 4420 curr_commit_ts != commit_ts) 4421 return 0; 4422 4423 /* Still racy, as it may return a false positive, but that's OK */ 4424 return ((iter->head_page == commit_page && iter->head >= commit) || 4425 (iter->head_page == reader && commit_page == head_page && 4426 head_page->read == commit && 4427 iter->head == rb_page_commit(cpu_buffer->reader_page))); 4428 } 4429 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4430 4431 static void 4432 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4433 struct ring_buffer_event *event) 4434 { 4435 u64 delta; 4436 4437 switch (event->type_len) { 4438 case RINGBUF_TYPE_PADDING: 4439 return; 4440 4441 case RINGBUF_TYPE_TIME_EXTEND: 4442 delta = rb_event_time_stamp(event); 4443 cpu_buffer->read_stamp += delta; 4444 return; 4445 4446 case RINGBUF_TYPE_TIME_STAMP: 4447 delta = rb_event_time_stamp(event); 4448 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4449 cpu_buffer->read_stamp = delta; 4450 return; 4451 4452 case RINGBUF_TYPE_DATA: 4453 cpu_buffer->read_stamp += event->time_delta; 4454 return; 4455 4456 default: 4457 RB_WARN_ON(cpu_buffer, 1); 4458 } 4459 } 4460 4461 static void 4462 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4463 struct ring_buffer_event *event) 4464 { 4465 u64 delta; 4466 4467 switch (event->type_len) { 4468 case RINGBUF_TYPE_PADDING: 4469 return; 4470 4471 case RINGBUF_TYPE_TIME_EXTEND: 4472 delta = rb_event_time_stamp(event); 4473 iter->read_stamp += delta; 4474 return; 4475 4476 case RINGBUF_TYPE_TIME_STAMP: 4477 delta = rb_event_time_stamp(event); 4478 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4479 iter->read_stamp = delta; 4480 return; 4481 4482 case RINGBUF_TYPE_DATA: 4483 iter->read_stamp += event->time_delta; 4484 return; 4485 4486 default: 4487 RB_WARN_ON(iter->cpu_buffer, 1); 4488 } 4489 } 4490 4491 static struct buffer_page * 4492 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4493 { 4494 struct buffer_page *reader = NULL; 4495 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 4496 unsigned long overwrite; 4497 unsigned long flags; 4498 int nr_loops = 0; 4499 bool ret; 4500 4501 local_irq_save(flags); 4502 arch_spin_lock(&cpu_buffer->lock); 4503 4504 again: 4505 /* 4506 * This should normally only loop twice. But because the 4507 * start of the reader inserts an empty page, it causes 4508 * a case where we will loop three times. There should be no 4509 * reason to loop four times (that I know of). 
4510 */ 4511 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4512 reader = NULL; 4513 goto out; 4514 } 4515 4516 reader = cpu_buffer->reader_page; 4517 4518 /* If there's more to read, return this page */ 4519 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4520 goto out; 4521 4522 /* Never should we have an index greater than the size */ 4523 if (RB_WARN_ON(cpu_buffer, 4524 cpu_buffer->reader_page->read > rb_page_size(reader))) 4525 goto out; 4526 4527 /* check if we caught up to the tail */ 4528 reader = NULL; 4529 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4530 goto out; 4531 4532 /* Don't bother swapping if the ring buffer is empty */ 4533 if (rb_num_of_entries(cpu_buffer) == 0) 4534 goto out; 4535 4536 /* 4537 * Reset the reader page to size zero. 4538 */ 4539 local_set(&cpu_buffer->reader_page->write, 0); 4540 local_set(&cpu_buffer->reader_page->entries, 0); 4541 local_set(&cpu_buffer->reader_page->page->commit, 0); 4542 cpu_buffer->reader_page->real_end = 0; 4543 4544 spin: 4545 /* 4546 * Splice the empty reader page into the list around the head. 4547 */ 4548 reader = rb_set_head_page(cpu_buffer); 4549 if (!reader) 4550 goto out; 4551 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4552 cpu_buffer->reader_page->list.prev = reader->list.prev; 4553 4554 /* 4555 * cpu_buffer->pages just needs to point to the buffer, it 4556 * has no specific buffer page to point to. Let's move it out 4557 * of our way so we don't accidentally swap it. 4558 */ 4559 cpu_buffer->pages = reader->list.prev; 4560 4561 /* The reader page will be pointing to the new head */ 4562 rb_set_list_to_head(&cpu_buffer->reader_page->list); 4563 4564 /* 4565 * We want to make sure we read the overruns after we set up our 4566 * pointers to the next object. The writer side does a 4567 * cmpxchg to cross pages which acts as the mb on the writer 4568 * side. Note, the reader will constantly fail the swap 4569 * while the writer is updating the pointers, so this 4570 * guarantees that the overwrite recorded here is the one we 4571 * want to compare with the last_overrun. 4572 */ 4573 smp_mb(); 4574 overwrite = local_read(&(cpu_buffer->overrun)); 4575 4576 /* 4577 * Here's the tricky part. 4578 * 4579 * We need to move the pointer past the header page. 4580 * But we can only do that if a writer is not currently 4581 * moving it. The page before the header page has the 4582 * flag bit '1' set if it is pointing to the page we want. 4583 * But if the writer is in the process of moving it 4584 * then it will be '2' or already moved '0'. 4585 */ 4586 4587 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4588 4589 /* 4590 * If we did not convert it, then we must try again. 4591 */ 4592 if (!ret) 4593 goto spin; 4594 4595 /* 4596 * Yay! We succeeded in replacing the page. 4597 * 4598 * Now make the new head point back to the reader page.
4599 */ 4600 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4601 rb_inc_page(&cpu_buffer->head_page); 4602 4603 local_inc(&cpu_buffer->pages_read); 4604 4605 /* Finally update the reader page to the new head */ 4606 cpu_buffer->reader_page = reader; 4607 cpu_buffer->reader_page->read = 0; 4608 4609 if (overwrite != cpu_buffer->last_overrun) { 4610 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4611 cpu_buffer->last_overrun = overwrite; 4612 } 4613 4614 goto again; 4615 4616 out: 4617 /* Update the read_stamp on the first event */ 4618 if (reader && reader->read == 0) 4619 cpu_buffer->read_stamp = reader->page->time_stamp; 4620 4621 arch_spin_unlock(&cpu_buffer->lock); 4622 local_irq_restore(flags); 4623 4624 /* 4625 * The writer has preemption disabled; wait for it, but not forever. 4626 * Although, 1 second is pretty much "forever". 4627 */ 4628 #define USECS_WAIT 1000000 4629 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4630 /* If the write is past the end of page, a writer is still updating it */ 4631 if (likely(!reader || rb_page_write(reader) <= bsize)) 4632 break; 4633 4634 udelay(1); 4635 4636 /* Get the latest version of the reader write value */ 4637 smp_rmb(); 4638 } 4639 4640 /* The writer is not moving forward? Something is wrong */ 4641 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4642 reader = NULL; 4643 4644 /* 4645 * Make sure we see any padding after the write update 4646 * (see rb_reset_tail()). 4647 * 4648 * In addition, a writer may be writing on the reader page 4649 * if the page has not been fully filled, so the read barrier 4650 * is also needed to make sure we see the content of what is 4651 * committed by the writer (see rb_set_commit_to_write()). 4652 */ 4653 smp_rmb(); 4654 4655 4656 return reader; 4657 } 4658 4659 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4660 { 4661 struct ring_buffer_event *event; 4662 struct buffer_page *reader; 4663 unsigned length; 4664 4665 reader = rb_get_reader_page(cpu_buffer); 4666 4667 /* This function should not be called when buffer is empty */ 4668 if (RB_WARN_ON(cpu_buffer, !reader)) 4669 return; 4670 4671 event = rb_reader_event(cpu_buffer); 4672 4673 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4674 cpu_buffer->read++; 4675 4676 rb_update_read_stamp(cpu_buffer, event); 4677 4678 length = rb_event_length(event); 4679 cpu_buffer->reader_page->read += length; 4680 cpu_buffer->read_bytes += length; 4681 } 4682 4683 static void rb_advance_iter(struct ring_buffer_iter *iter) 4684 { 4685 struct ring_buffer_per_cpu *cpu_buffer; 4686 4687 cpu_buffer = iter->cpu_buffer; 4688 4689 /* If head == next_event then we need to jump to the next event */ 4690 if (iter->head == iter->next_event) { 4691 /* If the event gets overwritten again, there's nothing to do */ 4692 if (rb_iter_head_event(iter) == NULL) 4693 return; 4694 } 4695 4696 iter->head = iter->next_event; 4697 4698 /* 4699 * Check if we are at the end of the buffer.
4700 */ 4701 if (iter->next_event >= rb_page_size(iter->head_page)) { 4702 /* discarded commits can make the page empty */ 4703 if (iter->head_page == cpu_buffer->commit_page) 4704 return; 4705 rb_inc_iter(iter); 4706 return; 4707 } 4708 4709 rb_update_iter_read_stamp(iter, iter->event); 4710 } 4711 4712 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4713 { 4714 return cpu_buffer->lost_events; 4715 } 4716 4717 static struct ring_buffer_event * 4718 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4719 unsigned long *lost_events) 4720 { 4721 struct ring_buffer_event *event; 4722 struct buffer_page *reader; 4723 int nr_loops = 0; 4724 4725 if (ts) 4726 *ts = 0; 4727 again: 4728 /* 4729 * We repeat when a time extend is encountered. 4730 * Since the time extend is always attached to a data event, 4731 * we should never loop more than once. 4732 * (We never hit the following condition more than twice). 4733 */ 4734 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4735 return NULL; 4736 4737 reader = rb_get_reader_page(cpu_buffer); 4738 if (!reader) 4739 return NULL; 4740 4741 event = rb_reader_event(cpu_buffer); 4742 4743 switch (event->type_len) { 4744 case RINGBUF_TYPE_PADDING: 4745 if (rb_null_event(event)) 4746 RB_WARN_ON(cpu_buffer, 1); 4747 /* 4748 * Because the writer could be discarding every 4749 * event it creates (which would probably be bad) 4750 * if we were to go back to "again" then we may never 4751 * catch up, and will trigger the warn on, or lock 4752 * the box. Return the padding, and we will release 4753 * the current locks, and try again. 4754 */ 4755 return event; 4756 4757 case RINGBUF_TYPE_TIME_EXTEND: 4758 /* Internal data, OK to advance */ 4759 rb_advance_reader(cpu_buffer); 4760 goto again; 4761 4762 case RINGBUF_TYPE_TIME_STAMP: 4763 if (ts) { 4764 *ts = rb_event_time_stamp(event); 4765 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4766 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4767 cpu_buffer->cpu, ts); 4768 } 4769 /* Internal data, OK to advance */ 4770 rb_advance_reader(cpu_buffer); 4771 goto again; 4772 4773 case RINGBUF_TYPE_DATA: 4774 if (ts && !(*ts)) { 4775 *ts = cpu_buffer->read_stamp + event->time_delta; 4776 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4777 cpu_buffer->cpu, ts); 4778 } 4779 if (lost_events) 4780 *lost_events = rb_lost_events(cpu_buffer); 4781 return event; 4782 4783 default: 4784 RB_WARN_ON(cpu_buffer, 1); 4785 } 4786 4787 return NULL; 4788 } 4789 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4790 4791 static struct ring_buffer_event * 4792 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4793 { 4794 struct trace_buffer *buffer; 4795 struct ring_buffer_per_cpu *cpu_buffer; 4796 struct ring_buffer_event *event; 4797 int nr_loops = 0; 4798 4799 if (ts) 4800 *ts = 0; 4801 4802 cpu_buffer = iter->cpu_buffer; 4803 buffer = cpu_buffer->buffer; 4804 4805 /* 4806 * Check if someone performed a consuming read to the buffer 4807 * or removed some pages from the buffer. In these cases, 4808 * iterator was invalidated and we need to reset it. 4809 */ 4810 if (unlikely(iter->cache_read != cpu_buffer->read || 4811 iter->cache_reader_page != cpu_buffer->reader_page || 4812 iter->cache_pages_removed != cpu_buffer->pages_removed)) 4813 rb_iter_reset(iter); 4814 4815 again: 4816 if (ring_buffer_iter_empty(iter)) 4817 return NULL; 4818 4819 /* 4820 * As the writer can mess with what the iterator is trying 4821 * to read, just give up if we fail to get an event after 4822 * three tries. 
The iterator is not as reliable when reading 4823 * the ring buffer with an active write as the consumer is. 4824 * Do not warn if three failures are reached. 4825 */ 4826 if (++nr_loops > 3) 4827 return NULL; 4828 4829 if (rb_per_cpu_empty(cpu_buffer)) 4830 return NULL; 4831 4832 if (iter->head >= rb_page_size(iter->head_page)) { 4833 rb_inc_iter(iter); 4834 goto again; 4835 } 4836 4837 event = rb_iter_head_event(iter); 4838 if (!event) 4839 goto again; 4840 4841 switch (event->type_len) { 4842 case RINGBUF_TYPE_PADDING: 4843 if (rb_null_event(event)) { 4844 rb_inc_iter(iter); 4845 goto again; 4846 } 4847 rb_advance_iter(iter); 4848 return event; 4849 4850 case RINGBUF_TYPE_TIME_EXTEND: 4851 /* Internal data, OK to advance */ 4852 rb_advance_iter(iter); 4853 goto again; 4854 4855 case RINGBUF_TYPE_TIME_STAMP: 4856 if (ts) { 4857 *ts = rb_event_time_stamp(event); 4858 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4859 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4860 cpu_buffer->cpu, ts); 4861 } 4862 /* Internal data, OK to advance */ 4863 rb_advance_iter(iter); 4864 goto again; 4865 4866 case RINGBUF_TYPE_DATA: 4867 if (ts && !(*ts)) { 4868 *ts = iter->read_stamp + event->time_delta; 4869 ring_buffer_normalize_time_stamp(buffer, 4870 cpu_buffer->cpu, ts); 4871 } 4872 return event; 4873 4874 default: 4875 RB_WARN_ON(cpu_buffer, 1); 4876 } 4877 4878 return NULL; 4879 } 4880 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4881 4882 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4883 { 4884 if (likely(!in_nmi())) { 4885 raw_spin_lock(&cpu_buffer->reader_lock); 4886 return true; 4887 } 4888 4889 /* 4890 * If an NMI die dumps out the content of the ring buffer, 4891 * trylock must be used to prevent a deadlock if the NMI 4892 * preempted a task that holds the ring buffer locks. If 4893 * we get the lock then all is fine, if not, then continue 4894 * to do the read, but this can corrupt the ring buffer, 4895 * so it must be permanently disabled from future writes. 4896 * Reading from NMI is a one-shot deal. 4897 */ 4898 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4899 return true; 4900 4901 /* Continue without locking, but disable the ring buffer */ 4902 atomic_inc(&cpu_buffer->record_disabled); 4903 return false; 4904 } 4905 4906 static inline void 4907 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4908 { 4909 if (likely(locked)) 4910 raw_spin_unlock(&cpu_buffer->reader_lock); 4911 } 4912 4913 /** 4914 * ring_buffer_peek - peek at the next event to be read 4915 * @buffer: The ring buffer to read 4916 * @cpu: The cpu to peek at 4917 * @ts: The timestamp counter of this event. 4918 * @lost_events: a variable to store if events were lost (may be NULL) 4919 * 4920 * This will return the event that will be read next, but does 4921 * not consume the data.
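 *
 * A minimal sketch (illustrative only; handle_event() is a hypothetical
 * caller-side helper):
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, NULL);
 *	if (event)
 *		handle_event(ring_buffer_event_data(event), ts);
 *
 * A following ring_buffer_consume() on the same cpu would return this
 * same event, as peeking does not advance the reader.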
4922 */ 4923 struct ring_buffer_event * 4924 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4925 unsigned long *lost_events) 4926 { 4927 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4928 struct ring_buffer_event *event; 4929 unsigned long flags; 4930 bool dolock; 4931 4932 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4933 return NULL; 4934 4935 again: 4936 local_irq_save(flags); 4937 dolock = rb_reader_lock(cpu_buffer); 4938 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4939 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4940 rb_advance_reader(cpu_buffer); 4941 rb_reader_unlock(cpu_buffer, dolock); 4942 local_irq_restore(flags); 4943 4944 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4945 goto again; 4946 4947 return event; 4948 } 4949 4950 /** ring_buffer_iter_dropped - report if there are dropped events 4951 * @iter: The ring buffer iterator 4952 * 4953 * Returns true if there were dropped events since the last peek. 4954 */ 4955 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4956 { 4957 bool ret = iter->missed_events != 0; 4958 4959 iter->missed_events = 0; 4960 return ret; 4961 } 4962 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 4963 4964 /** 4965 * ring_buffer_iter_peek - peek at the next event to be read 4966 * @iter: The ring buffer iterator 4967 * @ts: The timestamp counter of this event. 4968 * 4969 * This will return the event that will be read next, but does 4970 * not increment the iterator. 4971 */ 4972 struct ring_buffer_event * 4973 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4974 { 4975 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4976 struct ring_buffer_event *event; 4977 unsigned long flags; 4978 4979 again: 4980 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4981 event = rb_iter_peek(iter, ts); 4982 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4983 4984 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4985 goto again; 4986 4987 return event; 4988 } 4989 4990 /** 4991 * ring_buffer_consume - return an event and consume it 4992 * @buffer: The ring buffer to get the next event from 4993 * @cpu: the cpu to read the buffer from 4994 * @ts: a variable to store the timestamp (may be NULL) 4995 * @lost_events: a variable to store if events were lost (may be NULL) 4996 * 4997 * Returns the next event in the ring buffer, and that event is consumed. 4998 * Meaning that sequential reads will keep returning a different event, 4999 * and eventually empty the ring buffer if the producer is slower.
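 *
 * A typical draining loop, sketched (process_event() is a hypothetical
 * caller-side helper):
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process_event(ring_buffer_event_data(event), ts, lost);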
5000 */ 5001 struct ring_buffer_event * 5002 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5003 unsigned long *lost_events) 5004 { 5005 struct ring_buffer_per_cpu *cpu_buffer; 5006 struct ring_buffer_event *event = NULL; 5007 unsigned long flags; 5008 bool dolock; 5009 5010 again: 5011 /* might be called in atomic */ 5012 preempt_disable(); 5013 5014 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5015 goto out; 5016 5017 cpu_buffer = buffer->buffers[cpu]; 5018 local_irq_save(flags); 5019 dolock = rb_reader_lock(cpu_buffer); 5020 5021 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5022 if (event) { 5023 cpu_buffer->lost_events = 0; 5024 rb_advance_reader(cpu_buffer); 5025 } 5026 5027 rb_reader_unlock(cpu_buffer, dolock); 5028 local_irq_restore(flags); 5029 5030 out: 5031 preempt_enable(); 5032 5033 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5034 goto again; 5035 5036 return event; 5037 } 5038 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5039 5040 /** 5041 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5042 * @buffer: The ring buffer to read from 5043 * @cpu: The cpu buffer to iterate over 5044 * @flags: gfp flags to use for memory allocation 5045 * 5046 * This performs the initial preparations necessary to iterate 5047 * through the buffer. Memory is allocated, buffer recording 5048 * is disabled, and the iterator pointer is returned to the caller. 5049 * 5050 * Disabling buffer recording prevents the reading from being 5051 * corrupted. This is not a consuming read, so a producer is not 5052 * expected. 5053 * 5054 * After a sequence of ring_buffer_read_prepare calls, the user is 5055 * expected to make at least one call to ring_buffer_read_prepare_sync. 5056 * Afterwards, ring_buffer_read_start is invoked to get things going 5057 * for real. 5058 * 5059 * This overall must be paired with ring_buffer_read_finish. 5060 */ 5061 struct ring_buffer_iter * 5062 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5063 { 5064 struct ring_buffer_per_cpu *cpu_buffer; 5065 struct ring_buffer_iter *iter; 5066 5067 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5068 return NULL; 5069 5070 iter = kzalloc(sizeof(*iter), flags); 5071 if (!iter) 5072 return NULL; 5073 5074 /* Holds the entire event: data and meta data */ 5075 iter->event_size = buffer->subbuf_size; 5076 iter->event = kmalloc(iter->event_size, flags); 5077 if (!iter->event) { 5078 kfree(iter); 5079 return NULL; 5080 } 5081 5082 cpu_buffer = buffer->buffers[cpu]; 5083 5084 iter->cpu_buffer = cpu_buffer; 5085 5086 atomic_inc(&cpu_buffer->resize_disabled); 5087 5088 return iter; 5089 } 5090 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5091 5092 /** 5093 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5094 * 5095 * All previously invoked ring_buffer_read_prepare calls to prepare 5096 * iterators will be synchronized. Afterwards, ring_buffer_read_start 5097 * calls on those iterators are allowed. 5098 */ 5099 void 5100 ring_buffer_read_prepare_sync(void) 5101 { 5102 synchronize_rcu(); 5103 } 5104 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5105 5106 /** 5107 * ring_buffer_read_start - start a non consuming read of the buffer 5108 * @iter: The iterator returned by ring_buffer_read_prepare 5109 * 5110 * This finalizes the startup of an iteration through the buffer. 5111 * The iterator comes from a call to ring_buffer_read_prepare and 5112 * an intervening ring_buffer_read_prepare_sync must have been 5113 * performed.
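 *
 * The expected call sequence, sketched with error handling omitted
 * (process_event() stands in for whatever the caller does with each
 * event):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		process_event(event);
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);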
5114 * 5115 * Must be paired with ring_buffer_read_finish. 5116 */ 5117 void 5118 ring_buffer_read_start(struct ring_buffer_iter *iter) 5119 { 5120 struct ring_buffer_per_cpu *cpu_buffer; 5121 unsigned long flags; 5122 5123 if (!iter) 5124 return; 5125 5126 cpu_buffer = iter->cpu_buffer; 5127 5128 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5129 arch_spin_lock(&cpu_buffer->lock); 5130 rb_iter_reset(iter); 5131 arch_spin_unlock(&cpu_buffer->lock); 5132 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5133 } 5134 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5135 5136 /** 5137 * ring_buffer_read_finish - finish reading the iterator of the buffer 5138 * @iter: The iterator retrieved by ring_buffer_read_prepare 5139 * 5140 * This re-enables the recording to the buffer, and frees the 5141 * iterator. 5142 */ 5143 void 5144 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5145 { 5146 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5147 unsigned long flags; 5148 5149 /* 5150 * Ring buffer is disabled from recording, here's a good place 5151 * to check the integrity of the ring buffer. 5152 * Must prevent readers from trying to read, as the check 5153 * clears the HEAD page and readers require it. 5154 */ 5155 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5156 rb_check_pages(cpu_buffer); 5157 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5158 5159 atomic_dec(&cpu_buffer->resize_disabled); 5160 kfree(iter->event); 5161 kfree(iter); 5162 } 5163 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5164 5165 /** 5166 * ring_buffer_iter_advance - advance the iterator to the next location 5167 * @iter: The ring buffer iterator 5168 * 5169 * Move the location of the iterator such that the next read will 5170 * be the next location of the iterator. 5171 */ 5172 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5173 { 5174 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5175 unsigned long flags; 5176 5177 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5178 5179 rb_advance_iter(iter); 5180 5181 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5182 } 5183 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5184 5185 /** 5186 * ring_buffer_size - return the size of the ring buffer (in bytes) 5187 * @buffer: The ring buffer. 5188 * @cpu: The CPU to get ring buffer size from. 5189 */ 5190 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5191 { 5192 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5193 return 0; 5194 5195 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5196 } 5197 EXPORT_SYMBOL_GPL(ring_buffer_size); 5198 5199 /** 5200 * ring_buffer_max_event_size - return the max data size of an event 5201 * @buffer: The ring buffer. 5202 * 5203 * Returns the maximum size an event can be.
5204 */ 5205 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5206 { 5207 /* If abs timestamp is requested, events have a timestamp too */ 5208 if (ring_buffer_time_stamp_abs(buffer)) 5209 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5210 return buffer->max_data_size; 5211 } 5212 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 5213 5214 static void rb_clear_buffer_page(struct buffer_page *page) 5215 { 5216 local_set(&page->write, 0); 5217 local_set(&page->entries, 0); 5218 rb_init_page(page->page); 5219 page->read = 0; 5220 } 5221 5222 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5223 { 5224 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 5225 5226 meta->reader.read = cpu_buffer->reader_page->read; 5227 meta->reader.id = cpu_buffer->reader_page->id; 5228 meta->reader.lost_events = cpu_buffer->lost_events; 5229 5230 meta->entries = local_read(&cpu_buffer->entries); 5231 meta->overrun = local_read(&cpu_buffer->overrun); 5232 meta->read = cpu_buffer->read; 5233 5234 /* Some archs do not have data cache coherency between kernel and user-space */ 5235 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page)); 5236 } 5237 5238 static void 5239 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5240 { 5241 struct buffer_page *page; 5242 5243 rb_head_page_deactivate(cpu_buffer); 5244 5245 cpu_buffer->head_page 5246 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5247 rb_clear_buffer_page(cpu_buffer->head_page); 5248 list_for_each_entry(page, cpu_buffer->pages, list) { 5249 rb_clear_buffer_page(page); 5250 } 5251 5252 cpu_buffer->tail_page = cpu_buffer->head_page; 5253 cpu_buffer->commit_page = cpu_buffer->head_page; 5254 5255 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5256 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5257 rb_clear_buffer_page(cpu_buffer->reader_page); 5258 5259 local_set(&cpu_buffer->entries_bytes, 0); 5260 local_set(&cpu_buffer->overrun, 0); 5261 local_set(&cpu_buffer->commit_overrun, 0); 5262 local_set(&cpu_buffer->dropped_events, 0); 5263 local_set(&cpu_buffer->entries, 0); 5264 local_set(&cpu_buffer->committing, 0); 5265 local_set(&cpu_buffer->commits, 0); 5266 local_set(&cpu_buffer->pages_touched, 0); 5267 local_set(&cpu_buffer->pages_lost, 0); 5268 local_set(&cpu_buffer->pages_read, 0); 5269 cpu_buffer->last_pages_touch = 0; 5270 cpu_buffer->shortest_full = 0; 5271 cpu_buffer->read = 0; 5272 cpu_buffer->read_bytes = 0; 5273 5274 rb_time_set(&cpu_buffer->write_stamp, 0); 5275 rb_time_set(&cpu_buffer->before_stamp, 0); 5276 5277 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5278 5279 cpu_buffer->lost_events = 0; 5280 cpu_buffer->last_overrun = 0; 5281 5282 if (cpu_buffer->mapped) 5283 rb_update_meta_page(cpu_buffer); 5284 5285 rb_head_page_activate(cpu_buffer); 5286 cpu_buffer->pages_removed = 0; 5287 } 5288 5289 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5290 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5291 { 5292 unsigned long flags; 5293 5294 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5295 5296 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5297 goto out; 5298 5299 arch_spin_lock(&cpu_buffer->lock); 5300 5301 rb_reset_cpu(cpu_buffer); 5302 5303 arch_spin_unlock(&cpu_buffer->lock); 5304 5305 out: 5306 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5307 } 5308 5309 /** 5310 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5311 * @buffer: The ring buffer to reset a per 
cpu buffer of 5312 * @cpu: The CPU buffer to be reset 5313 */ 5314 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5315 { 5316 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5317 5318 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5319 return; 5320 5321 /* prevent another thread from changing buffer sizes */ 5322 mutex_lock(&buffer->mutex); 5323 5324 atomic_inc(&cpu_buffer->resize_disabled); 5325 atomic_inc(&cpu_buffer->record_disabled); 5326 5327 /* Make sure all commits have finished */ 5328 synchronize_rcu(); 5329 5330 reset_disabled_cpu_buffer(cpu_buffer); 5331 5332 atomic_dec(&cpu_buffer->record_disabled); 5333 atomic_dec(&cpu_buffer->resize_disabled); 5334 5335 mutex_unlock(&buffer->mutex); 5336 } 5337 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5338 5339 /* Flag to ensure proper resetting of atomic variables */ 5340 #define RESET_BIT (1 << 30) 5341 5342 /** 5343 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5344 * @buffer: The ring buffer to reset a per cpu buffer of 5345 */ 5346 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 5347 { 5348 struct ring_buffer_per_cpu *cpu_buffer; 5349 int cpu; 5350 5351 /* prevent another thread from changing buffer sizes */ 5352 mutex_lock(&buffer->mutex); 5353 5354 for_each_online_buffer_cpu(buffer, cpu) { 5355 cpu_buffer = buffer->buffers[cpu]; 5356 5357 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 5358 atomic_inc(&cpu_buffer->record_disabled); 5359 } 5360 5361 /* Make sure all commits have finished */ 5362 synchronize_rcu(); 5363 5364 for_each_buffer_cpu(buffer, cpu) { 5365 cpu_buffer = buffer->buffers[cpu]; 5366 5367 /* 5368 * If a CPU came online during the synchronize_rcu(), then 5369 * ignore it. 5370 */ 5371 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 5372 continue; 5373 5374 reset_disabled_cpu_buffer(cpu_buffer); 5375 5376 atomic_dec(&cpu_buffer->record_disabled); 5377 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 5378 } 5379 5380 mutex_unlock(&buffer->mutex); 5381 } 5382 5383 /** 5384 * ring_buffer_reset - reset a ring buffer 5385 * @buffer: The ring buffer to reset all cpu buffers 5386 */ 5387 void ring_buffer_reset(struct trace_buffer *buffer) 5388 { 5389 struct ring_buffer_per_cpu *cpu_buffer; 5390 int cpu; 5391 5392 /* prevent another thread from changing buffer sizes */ 5393 mutex_lock(&buffer->mutex); 5394 5395 for_each_buffer_cpu(buffer, cpu) { 5396 cpu_buffer = buffer->buffers[cpu]; 5397 5398 atomic_inc(&cpu_buffer->resize_disabled); 5399 atomic_inc(&cpu_buffer->record_disabled); 5400 } 5401 5402 /* Make sure all commits have finished */ 5403 synchronize_rcu(); 5404 5405 for_each_buffer_cpu(buffer, cpu) { 5406 cpu_buffer = buffer->buffers[cpu]; 5407 5408 reset_disabled_cpu_buffer(cpu_buffer); 5409 5410 atomic_dec(&cpu_buffer->record_disabled); 5411 atomic_dec(&cpu_buffer->resize_disabled); 5412 } 5413 5414 mutex_unlock(&buffer->mutex); 5415 } 5416 EXPORT_SYMBOL_GPL(ring_buffer_reset); 5417 5418 /** 5419 * ring_buffer_empty - is the ring buffer empty? 
5420 * @buffer: The ring buffer to test 5421 */ 5422 bool ring_buffer_empty(struct trace_buffer *buffer) 5423 { 5424 struct ring_buffer_per_cpu *cpu_buffer; 5425 unsigned long flags; 5426 bool dolock; 5427 bool ret; 5428 int cpu; 5429 5430 /* yes this is racy, but if you don't like the race, lock the buffer */ 5431 for_each_buffer_cpu(buffer, cpu) { 5432 cpu_buffer = buffer->buffers[cpu]; 5433 local_irq_save(flags); 5434 dolock = rb_reader_lock(cpu_buffer); 5435 ret = rb_per_cpu_empty(cpu_buffer); 5436 rb_reader_unlock(cpu_buffer, dolock); 5437 local_irq_restore(flags); 5438 5439 if (!ret) 5440 return false; 5441 } 5442 5443 return true; 5444 } 5445 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5446 5447 /** 5448 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 5449 * @buffer: The ring buffer 5450 * @cpu: The CPU buffer to test 5451 */ 5452 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5453 { 5454 struct ring_buffer_per_cpu *cpu_buffer; 5455 unsigned long flags; 5456 bool dolock; 5457 bool ret; 5458 5459 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5460 return true; 5461 5462 cpu_buffer = buffer->buffers[cpu]; 5463 local_irq_save(flags); 5464 dolock = rb_reader_lock(cpu_buffer); 5465 ret = rb_per_cpu_empty(cpu_buffer); 5466 rb_reader_unlock(cpu_buffer, dolock); 5467 local_irq_restore(flags); 5468 5469 return ret; 5470 } 5471 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5472 5473 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5474 /** 5475 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5476 * @buffer_a: One buffer to swap with 5477 * @buffer_b: The other buffer to swap with 5478 * @cpu: the CPU of the buffers to swap 5479 * 5480 * This function is useful for tracers that want to take a "snapshot" 5481 * of a CPU buffer and have another backup buffer lying around. 5482 * It is expected that the tracer handles the cpu buffer not being 5483 * used at the moment. 5484 */ 5485 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5486 struct trace_buffer *buffer_b, int cpu) 5487 { 5488 struct ring_buffer_per_cpu *cpu_buffer_a; 5489 struct ring_buffer_per_cpu *cpu_buffer_b; 5490 int ret = -EINVAL; 5491 5492 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5493 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5494 goto out; 5495 5496 cpu_buffer_a = buffer_a->buffers[cpu]; 5497 cpu_buffer_b = buffer_b->buffers[cpu]; 5498 5499 /* It's up to the callers to not try to swap mapped buffers */ 5500 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) { 5501 ret = -EBUSY; 5502 goto out; 5503 } 5504 5505 /* At least make sure the two buffers are somewhat the same */ 5506 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5507 goto out; 5508 5509 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 5510 goto out; 5511 5512 ret = -EAGAIN; 5513 5514 if (atomic_read(&buffer_a->record_disabled)) 5515 goto out; 5516 5517 if (atomic_read(&buffer_b->record_disabled)) 5518 goto out; 5519 5520 if (atomic_read(&cpu_buffer_a->record_disabled)) 5521 goto out; 5522 5523 if (atomic_read(&cpu_buffer_b->record_disabled)) 5524 goto out; 5525 5526 /* 5527 * We can't do a synchronize_rcu here because this 5528 * function can be called in atomic context. 5529 * Normally this will be called from the same CPU as cpu. 5530 * If not it's up to the caller to protect this.
5531 */ 5532 atomic_inc(&cpu_buffer_a->record_disabled); 5533 atomic_inc(&cpu_buffer_b->record_disabled); 5534 5535 ret = -EBUSY; 5536 if (local_read(&cpu_buffer_a->committing)) 5537 goto out_dec; 5538 if (local_read(&cpu_buffer_b->committing)) 5539 goto out_dec; 5540 5541 /* 5542 * When resize is in progress, we cannot swap it because 5543 * it will mess the state of the cpu buffer. 5544 */ 5545 if (atomic_read(&buffer_a->resizing)) 5546 goto out_dec; 5547 if (atomic_read(&buffer_b->resizing)) 5548 goto out_dec; 5549 5550 buffer_a->buffers[cpu] = cpu_buffer_b; 5551 buffer_b->buffers[cpu] = cpu_buffer_a; 5552 5553 cpu_buffer_b->buffer = buffer_a; 5554 cpu_buffer_a->buffer = buffer_b; 5555 5556 ret = 0; 5557 5558 out_dec: 5559 atomic_dec(&cpu_buffer_a->record_disabled); 5560 atomic_dec(&cpu_buffer_b->record_disabled); 5561 out: 5562 return ret; 5563 } 5564 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5565 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5566 5567 /** 5568 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5569 * @buffer: the buffer to allocate for. 5570 * @cpu: the cpu buffer to allocate. 5571 * 5572 * This function is used in conjunction with ring_buffer_read_page. 5573 * When reading a full page from the ring buffer, these functions 5574 * can be used to speed up the process. The calling function should 5575 * allocate a few pages first with this function. Then when it 5576 * needs to get pages from the ring buffer, it passes the result 5577 * of this function into ring_buffer_read_page, which will swap 5578 * the page that was allocated with the read page of the buffer. 5579 * 5580 * Returns: 5581 * The page allocated, or ERR_PTR 5582 */ 5583 struct buffer_data_read_page * 5584 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5585 { 5586 struct ring_buffer_per_cpu *cpu_buffer; 5587 struct buffer_data_read_page *bpage = NULL; 5588 unsigned long flags; 5589 struct page *page; 5590 5591 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5592 return ERR_PTR(-ENODEV); 5593 5594 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 5595 if (!bpage) 5596 return ERR_PTR(-ENOMEM); 5597 5598 bpage->order = buffer->subbuf_order; 5599 cpu_buffer = buffer->buffers[cpu]; 5600 local_irq_save(flags); 5601 arch_spin_lock(&cpu_buffer->lock); 5602 5603 if (cpu_buffer->free_page) { 5604 bpage->data = cpu_buffer->free_page; 5605 cpu_buffer->free_page = NULL; 5606 } 5607 5608 arch_spin_unlock(&cpu_buffer->lock); 5609 local_irq_restore(flags); 5610 5611 if (bpage->data) 5612 goto out; 5613 5614 page = alloc_pages_node(cpu_to_node(cpu), 5615 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 5616 cpu_buffer->buffer->subbuf_order); 5617 if (!page) { 5618 kfree(bpage); 5619 return ERR_PTR(-ENOMEM); 5620 } 5621 5622 bpage->data = page_address(page); 5623 5624 out: 5625 rb_init_page(bpage->data); 5626 5627 return bpage; 5628 } 5629 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5630 5631 /** 5632 * ring_buffer_free_read_page - free an allocated read page 5633 * @buffer: the buffer the page was allocated for 5634 * @cpu: the cpu buffer the page came from 5635 * @data_page: the page to free 5636 * 5637 * Free a page allocated from ring_buffer_alloc_read_page.
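 *
 * A minimal sketch of the allocate/free pairing (the read step in the
 * middle is shown in full in the ring_buffer_read_page() example below):
 *
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (IS_ERR(rpage))
 *		return PTR_ERR(rpage);
 *	...
 *	ring_buffer_free_read_page(buffer, cpu, rpage);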
5638 */ 5639 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 5640 struct buffer_data_read_page *data_page) 5641 { 5642 struct ring_buffer_per_cpu *cpu_buffer; 5643 struct buffer_data_page *bpage = data_page->data; 5644 struct page *page = virt_to_page(bpage); 5645 unsigned long flags; 5646 5647 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5648 return; 5649 5650 cpu_buffer = buffer->buffers[cpu]; 5651 5652 /* 5653 * If the page is still in use someplace else, or order of the page 5654 * is different from the subbuffer order of the buffer - 5655 * we can't reuse it 5656 */ 5657 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 5658 goto out; 5659 5660 local_irq_save(flags); 5661 arch_spin_lock(&cpu_buffer->lock); 5662 5663 if (!cpu_buffer->free_page) { 5664 cpu_buffer->free_page = bpage; 5665 bpage = NULL; 5666 } 5667 5668 arch_spin_unlock(&cpu_buffer->lock); 5669 local_irq_restore(flags); 5670 5671 out: 5672 free_pages((unsigned long)bpage, data_page->order); 5673 kfree(data_page); 5674 } 5675 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5676 5677 /** 5678 * ring_buffer_read_page - extract a page from the ring buffer 5679 * @buffer: buffer to extract from 5680 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5681 * @len: amount to extract 5682 * @cpu: the cpu of the buffer to extract 5683 * @full: should the extraction only happen when the page is full. 5684 * 5685 * This function will pull out a page from the ring buffer and consume it. 5686 * @data_page must be the address of the variable that was returned 5687 * from ring_buffer_alloc_read_page. This is because the page might be used 5688 * to swap with a page in the ring buffer. 5689 * 5690 * for example: 5691 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5692 * if (IS_ERR(rpage)) 5693 * return PTR_ERR(rpage); 5694 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 5695 * if (ret >= 0) 5696 * process_page(ring_buffer_read_page_data(rpage), ret); 5697 * ring_buffer_free_read_page(buffer, cpu, rpage); 5698 * 5699 * When @full is set, the function will not return true unless 5700 * the writer is off the reader page. 5701 * 5702 * Note: it is up to the calling functions to handle sleeps and wakeups. 5703 * The ring buffer can be used anywhere in the kernel and can not 5704 * blindly call wake_up. The layer that uses the ring buffer must be 5705 * responsible for that. 5706 * 5707 * Returns: 5708 * >=0 if data has been transferred, returns the offset of consumed data. 5709 * <0 if no data has been transferred. 5710 */ 5711 int ring_buffer_read_page(struct trace_buffer *buffer, 5712 struct buffer_data_read_page *data_page, 5713 size_t len, int cpu, int full) 5714 { 5715 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5716 struct ring_buffer_event *event; 5717 struct buffer_data_page *bpage; 5718 struct buffer_page *reader; 5719 unsigned long missed_events; 5720 unsigned long flags; 5721 unsigned int commit; 5722 unsigned int read; 5723 u64 save_timestamp; 5724 int ret = -1; 5725 5726 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5727 goto out; 5728 5729 /* 5730 * If len is not big enough to hold the page header, then 5731 * we can not copy anything. 
5732 */ 5733 if (len <= BUF_PAGE_HDR_SIZE) 5734 goto out; 5735 5736 len -= BUF_PAGE_HDR_SIZE; 5737 5738 if (!data_page || !data_page->data) 5739 goto out; 5740 if (data_page->order != buffer->subbuf_order) 5741 goto out; 5742 5743 bpage = data_page->data; 5744 if (!bpage) 5745 goto out; 5746 5747 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5748 5749 reader = rb_get_reader_page(cpu_buffer); 5750 if (!reader) 5751 goto out_unlock; 5752 5753 event = rb_reader_event(cpu_buffer); 5754 5755 read = reader->read; 5756 commit = rb_page_commit(reader); 5757 5758 /* Check if any events were dropped */ 5759 missed_events = cpu_buffer->lost_events; 5760 5761 /* 5762 * If this page has been partially read or 5763 * if len is not big enough to read the rest of the page or 5764 * a writer is still on the page, then 5765 * we must copy the data from the page to the buffer. 5766 * Otherwise, we can simply swap the page with the one passed in. 5767 */ 5768 if (read || (len < (commit - read)) || 5769 cpu_buffer->reader_page == cpu_buffer->commit_page || 5770 cpu_buffer->mapped) { 5771 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5772 unsigned int rpos = read; 5773 unsigned int pos = 0; 5774 unsigned int size; 5775 5776 /* 5777 * If a full page is expected, this can still be returned 5778 * if there's been a previous partial read and the 5779 * rest of the page can be read and the commit page is off 5780 * the reader page. 5781 */ 5782 if (full && 5783 (!read || (len < (commit - read)) || 5784 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5785 goto out_unlock; 5786 5787 if (len > (commit - read)) 5788 len = (commit - read); 5789 5790 /* Always keep the time extend and data together */ 5791 size = rb_event_ts_length(event); 5792 5793 if (len < size) 5794 goto out_unlock; 5795 5796 /* save the current timestamp, since the user will need it */ 5797 save_timestamp = cpu_buffer->read_stamp; 5798 5799 /* Need to copy one event at a time */ 5800 do { 5801 /* We need the size of one event, because 5802 * rb_advance_reader only advances by one event, 5803 * whereas rb_event_ts_length may include the size of 5804 * one or two events. 5805 * We have already ensured there's enough space if this 5806 * is a time extend. */ 5807 size = rb_event_length(event); 5808 memcpy(bpage->data + pos, rpage->data + rpos, size); 5809 5810 len -= size; 5811 5812 rb_advance_reader(cpu_buffer); 5813 rpos = reader->read; 5814 pos += size; 5815 5816 if (rpos >= commit) 5817 break; 5818 5819 event = rb_reader_event(cpu_buffer); 5820 /* Always keep the time extend and data together */ 5821 size = rb_event_ts_length(event); 5822 } while (len >= size); 5823 5824 /* update bpage */ 5825 local_set(&bpage->commit, pos); 5826 bpage->time_stamp = save_timestamp; 5827 5828 /* we copied everything to the beginning */ 5829 read = 0; 5830 } else { 5831 /* update the entry counter */ 5832 cpu_buffer->read += rb_page_entries(reader); 5833 cpu_buffer->read_bytes += rb_page_commit(reader); 5834 5835 /* swap the pages */ 5836 rb_init_page(bpage); 5837 bpage = reader->page; 5838 reader->page = data_page->data; 5839 local_set(&reader->write, 0); 5840 local_set(&reader->entries, 0); 5841 reader->read = 0; 5842 data_page->data = bpage; 5843 5844 /* 5845 * Use the real_end for the data size, 5846 * This gives us a chance to store the lost events 5847 * on the page. 
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	ret = read;

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < buffer->subbuf_size)
		memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);

/**
 * ring_buffer_read_page_data - get pointer to the data in the page.
 * @page: the page to get the data from
 *
 * Returns pointer to the actual data in this page.
 */
void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
{
	return page->data;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);

/**
 * ring_buffer_subbuf_size_get - get size of the sub buffer.
 * @buffer: the buffer to get the sub buffer size from
 *
 * Returns size of the sub buffer, in bytes.
 */
int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
{
	return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);

/**
 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
 * @buffer: The ring_buffer to get the system sub page order from
 *
 * By default, one ring buffer sub page equals one system page. This parameter
 * is configurable, per ring buffer. The size of the ring buffer sub page can
 * be extended, but must be a power-of-two number of system pages.
 *
 * Returns the order of the buffer sub page size, in system pages:
 * 0 means the sub buffer size is 1 system page and so forth.
 * In case of an error < 0 is returned.
 */
int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
{
	if (!buffer)
		return -EINVAL;

	return buffer->subbuf_order;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);

/**
 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
 * @buffer: The ring_buffer to set the new page size.
 * @order: Order of the system pages in one sub buffer page
 *
 * By default, one ring buffer page equals one system page. This API can be
 * used to set a new size of the ring buffer page. The size must be a
 * power-of-two number of system pages, which is why the input parameter
 * @order is the order of system pages that are allocated for one ring
 * buffer page:
 *   0 - 1 system page
 *   1 - 2 system pages
 *   2 - 4 system pages
 *   ...
 *
 * Returns 0 on success or < 0 in case of an error.
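 *
 * A minimal usage sketch (the caller context and error handling here are
 * illustrative only): on a system with 4K pages, order 2 selects sub buffers
 * of four system pages (16K) each:
 *
 *	err = ring_buffer_subbuf_order_set(buffer, 2);
 *	if (err)
 *		return err;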
5947 */ 5948 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 5949 { 5950 struct ring_buffer_per_cpu *cpu_buffer; 5951 struct buffer_page *bpage, *tmp; 5952 int old_order, old_size; 5953 int nr_pages; 5954 int psize; 5955 int err; 5956 int cpu; 5957 5958 if (!buffer || order < 0) 5959 return -EINVAL; 5960 5961 if (buffer->subbuf_order == order) 5962 return 0; 5963 5964 psize = (1 << order) * PAGE_SIZE; 5965 if (psize <= BUF_PAGE_HDR_SIZE) 5966 return -EINVAL; 5967 5968 /* Size of a subbuf cannot be greater than the write counter */ 5969 if (psize > RB_WRITE_MASK + 1) 5970 return -EINVAL; 5971 5972 old_order = buffer->subbuf_order; 5973 old_size = buffer->subbuf_size; 5974 5975 /* prevent another thread from changing buffer sizes */ 5976 mutex_lock(&buffer->mutex); 5977 atomic_inc(&buffer->record_disabled); 5978 5979 /* Make sure all commits have finished */ 5980 synchronize_rcu(); 5981 5982 buffer->subbuf_order = order; 5983 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 5984 5985 /* Make sure all new buffers are allocated, before deleting the old ones */ 5986 for_each_buffer_cpu(buffer, cpu) { 5987 5988 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5989 continue; 5990 5991 cpu_buffer = buffer->buffers[cpu]; 5992 5993 if (cpu_buffer->mapped) { 5994 err = -EBUSY; 5995 goto error; 5996 } 5997 5998 /* Update the number of pages to match the new size */ 5999 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6000 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6001 6002 /* we need a minimum of two pages */ 6003 if (nr_pages < 2) 6004 nr_pages = 2; 6005 6006 cpu_buffer->nr_pages_to_update = nr_pages; 6007 6008 /* Include the reader page */ 6009 nr_pages++; 6010 6011 /* Allocate the new size buffer */ 6012 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6013 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6014 &cpu_buffer->new_pages)) { 6015 /* not enough memory for new pages */ 6016 err = -ENOMEM; 6017 goto error; 6018 } 6019 } 6020 6021 for_each_buffer_cpu(buffer, cpu) { 6022 6023 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6024 continue; 6025 6026 cpu_buffer = buffer->buffers[cpu]; 6027 6028 /* Clear the head bit to make the link list normal to read */ 6029 rb_head_page_deactivate(cpu_buffer); 6030 6031 /* Now walk the list and free all the old sub buffers */ 6032 list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) { 6033 list_del_init(&bpage->list); 6034 free_buffer_page(bpage); 6035 } 6036 /* The above loop stopped an the last page needing to be freed */ 6037 bpage = list_entry(cpu_buffer->pages, struct buffer_page, list); 6038 free_buffer_page(bpage); 6039 6040 /* Free the current reader page */ 6041 free_buffer_page(cpu_buffer->reader_page); 6042 6043 /* One page was allocated for the reader page */ 6044 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6045 struct buffer_page, list); 6046 list_del_init(&cpu_buffer->reader_page->list); 6047 6048 /* The cpu_buffer pages are a link list with no head */ 6049 cpu_buffer->pages = cpu_buffer->new_pages.next; 6050 cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev; 6051 cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next; 6052 6053 /* Clear the new_pages list */ 6054 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6055 6056 cpu_buffer->head_page 6057 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6058 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6059 6060 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6061 
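		/*
		 * This CPU is now fully switched over: clear the pending
		 * update count and drop the spare free_page below, since it
		 * was allocated with the old sub-buffer order and can no
		 * longer be recycled.
		 */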
cpu_buffer->nr_pages_to_update = 0; 6062 6063 free_pages((unsigned long)cpu_buffer->free_page, old_order); 6064 cpu_buffer->free_page = NULL; 6065 6066 rb_head_page_activate(cpu_buffer); 6067 6068 rb_check_pages(cpu_buffer); 6069 } 6070 6071 atomic_dec(&buffer->record_disabled); 6072 mutex_unlock(&buffer->mutex); 6073 6074 return 0; 6075 6076 error: 6077 buffer->subbuf_order = old_order; 6078 buffer->subbuf_size = old_size; 6079 6080 atomic_dec(&buffer->record_disabled); 6081 mutex_unlock(&buffer->mutex); 6082 6083 for_each_buffer_cpu(buffer, cpu) { 6084 cpu_buffer = buffer->buffers[cpu]; 6085 6086 if (!cpu_buffer->nr_pages_to_update) 6087 continue; 6088 6089 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6090 list_del_init(&bpage->list); 6091 free_buffer_page(bpage); 6092 } 6093 } 6094 6095 return err; 6096 } 6097 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6098 6099 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6100 { 6101 struct page *page; 6102 6103 if (cpu_buffer->meta_page) 6104 return 0; 6105 6106 page = alloc_page(GFP_USER | __GFP_ZERO); 6107 if (!page) 6108 return -ENOMEM; 6109 6110 cpu_buffer->meta_page = page_to_virt(page); 6111 6112 return 0; 6113 } 6114 6115 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6116 { 6117 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6118 6119 free_page(addr); 6120 cpu_buffer->meta_page = NULL; 6121 } 6122 6123 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6124 unsigned long *subbuf_ids) 6125 { 6126 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6127 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6128 struct buffer_page *first_subbuf, *subbuf; 6129 int id = 0; 6130 6131 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; 6132 cpu_buffer->reader_page->id = id++; 6133 6134 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6135 do { 6136 if (WARN_ON(id >= nr_subbufs)) 6137 break; 6138 6139 subbuf_ids[id] = (unsigned long)subbuf->page; 6140 subbuf->id = id; 6141 6142 rb_inc_page(&subbuf); 6143 id++; 6144 } while (subbuf != first_subbuf); 6145 6146 /* install subbuf ID to kern VA translation */ 6147 cpu_buffer->subbuf_ids = subbuf_ids; 6148 6149 meta->meta_page_size = PAGE_SIZE; 6150 meta->meta_struct_len = sizeof(*meta); 6151 meta->nr_subbufs = nr_subbufs; 6152 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6153 6154 rb_update_meta_page(cpu_buffer); 6155 } 6156 6157 static struct ring_buffer_per_cpu * 6158 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6159 { 6160 struct ring_buffer_per_cpu *cpu_buffer; 6161 6162 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6163 return ERR_PTR(-EINVAL); 6164 6165 cpu_buffer = buffer->buffers[cpu]; 6166 6167 mutex_lock(&cpu_buffer->mapping_lock); 6168 6169 if (!cpu_buffer->mapped) { 6170 mutex_unlock(&cpu_buffer->mapping_lock); 6171 return ERR_PTR(-ENODEV); 6172 } 6173 6174 return cpu_buffer; 6175 } 6176 6177 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6178 { 6179 mutex_unlock(&cpu_buffer->mapping_lock); 6180 } 6181 6182 /* 6183 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 6184 * to be set-up or torn-down. 
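 * Only the mapped counter is adjusted here, while holding both the buffer
 * mutex and the reader_lock, so readers and buffer resizes see a consistent
 * count.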
 */
static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer,
			       bool inc)
{
	unsigned long flags;

	lockdep_assert_held(&cpu_buffer->mapping_lock);

	if (inc && cpu_buffer->mapped == UINT_MAX)
		return -EBUSY;

	if (WARN_ON(!inc && cpu_buffer->mapped == 0))
		return -EINVAL;

	mutex_lock(&cpu_buffer->buffer->mutex);
	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);

	if (inc)
		cpu_buffer->mapped++;
	else
		cpu_buffer->mapped--;

	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	mutex_unlock(&cpu_buffer->buffer->mutex);

	return 0;
}

/*
 *   +--------------+  pgoff == 0
 *   |   meta page  |
 *   +--------------+  pgoff == 1
 *   | subbuffer 0  |
 *   |              |
 *   +--------------+  pgoff == (1 + (1 << subbuf_order))
 *   | subbuffer 1  |
 *   |              |
 *         ...
 */
#ifdef CONFIG_MMU
static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer,
			struct vm_area_struct *vma)
{
	unsigned long nr_subbufs, nr_pages, vma_pages, pgoff = vma->vm_pgoff;
	unsigned int subbuf_pages, subbuf_order;
	struct page **pages;
	int p = 0, s = 0;
	int err;

	/* Refuse MAP_PRIVATE or writable mappings */
	if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC ||
	    !(vma->vm_flags & VM_MAYSHARE))
		return -EPERM;

	/*
	 * Make sure the mapping cannot become writable later. Also tell the VM
	 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND).
	 */
	vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP,
		     VM_MAYWRITE);

	lockdep_assert_held(&cpu_buffer->mapping_lock);

	subbuf_order = cpu_buffer->buffer->subbuf_order;
	subbuf_pages = 1 << subbuf_order;

	nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */
	nr_pages = ((nr_subbufs) << subbuf_order) - pgoff + 1; /* + meta-page */

	vma_pages = (vma->vm_end - vma->vm_start) >> PAGE_SHIFT;
	if (!vma_pages || vma_pages > nr_pages)
		return -EINVAL;

	nr_pages = vma_pages;

	pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL);
	if (!pages)
		return -ENOMEM;

	if (!pgoff) {
		pages[p++] = virt_to_page(cpu_buffer->meta_page);

		/*
		 * TODO: Align sub-buffers on their size, once
		 * vm_insert_pages() supports the zero-page.
6270 */ 6271 } else { 6272 /* Skip the meta-page */ 6273 pgoff--; 6274 6275 if (pgoff % subbuf_pages) { 6276 err = -EINVAL; 6277 goto out; 6278 } 6279 6280 s += pgoff / subbuf_pages; 6281 } 6282 6283 while (p < nr_pages) { 6284 struct page *page = virt_to_page(cpu_buffer->subbuf_ids[s]); 6285 int off = 0; 6286 6287 if (WARN_ON_ONCE(s >= nr_subbufs)) { 6288 err = -EINVAL; 6289 goto out; 6290 } 6291 6292 for (; off < (1 << (subbuf_order)); off++, page++) { 6293 if (p >= nr_pages) 6294 break; 6295 6296 pages[p++] = page; 6297 } 6298 s++; 6299 } 6300 6301 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 6302 6303 out: 6304 kfree(pages); 6305 6306 return err; 6307 } 6308 #else 6309 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6310 struct vm_area_struct *vma) 6311 { 6312 return -EOPNOTSUPP; 6313 } 6314 #endif 6315 6316 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 6317 struct vm_area_struct *vma) 6318 { 6319 struct ring_buffer_per_cpu *cpu_buffer; 6320 unsigned long flags, *subbuf_ids; 6321 int err = 0; 6322 6323 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6324 return -EINVAL; 6325 6326 cpu_buffer = buffer->buffers[cpu]; 6327 6328 mutex_lock(&cpu_buffer->mapping_lock); 6329 6330 if (cpu_buffer->mapped) { 6331 err = __rb_map_vma(cpu_buffer, vma); 6332 if (!err) 6333 err = __rb_inc_dec_mapped(cpu_buffer, true); 6334 mutex_unlock(&cpu_buffer->mapping_lock); 6335 return err; 6336 } 6337 6338 /* prevent another thread from changing buffer/sub-buffer sizes */ 6339 mutex_lock(&buffer->mutex); 6340 6341 err = rb_alloc_meta_page(cpu_buffer); 6342 if (err) 6343 goto unlock; 6344 6345 /* subbuf_ids include the reader while nr_pages does not */ 6346 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 6347 if (!subbuf_ids) { 6348 rb_free_meta_page(cpu_buffer); 6349 err = -ENOMEM; 6350 goto unlock; 6351 } 6352 6353 atomic_inc(&cpu_buffer->resize_disabled); 6354 6355 /* 6356 * Lock all readers to block any subbuf swap until the subbuf IDs are 6357 * assigned. 
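	 * Otherwise a reader could swap the reader page via
	 * rb_get_reader_page() while rb_setup_ids_meta_page() is still
	 * walking the sub-buffer list.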
6358 */ 6359 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6360 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 6361 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6362 6363 err = __rb_map_vma(cpu_buffer, vma); 6364 if (!err) { 6365 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6366 cpu_buffer->mapped = 1; 6367 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6368 } else { 6369 kfree(cpu_buffer->subbuf_ids); 6370 cpu_buffer->subbuf_ids = NULL; 6371 rb_free_meta_page(cpu_buffer); 6372 } 6373 6374 unlock: 6375 mutex_unlock(&buffer->mutex); 6376 mutex_unlock(&cpu_buffer->mapping_lock); 6377 6378 return err; 6379 } 6380 6381 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 6382 { 6383 struct ring_buffer_per_cpu *cpu_buffer; 6384 unsigned long flags; 6385 int err = 0; 6386 6387 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6388 return -EINVAL; 6389 6390 cpu_buffer = buffer->buffers[cpu]; 6391 6392 mutex_lock(&cpu_buffer->mapping_lock); 6393 6394 if (!cpu_buffer->mapped) { 6395 err = -ENODEV; 6396 goto out; 6397 } else if (cpu_buffer->mapped > 1) { 6398 __rb_inc_dec_mapped(cpu_buffer, false); 6399 goto out; 6400 } 6401 6402 mutex_lock(&buffer->mutex); 6403 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6404 6405 cpu_buffer->mapped = 0; 6406 6407 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6408 6409 kfree(cpu_buffer->subbuf_ids); 6410 cpu_buffer->subbuf_ids = NULL; 6411 rb_free_meta_page(cpu_buffer); 6412 atomic_dec(&cpu_buffer->resize_disabled); 6413 6414 mutex_unlock(&buffer->mutex); 6415 6416 out: 6417 mutex_unlock(&cpu_buffer->mapping_lock); 6418 6419 return err; 6420 } 6421 6422 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 6423 { 6424 struct ring_buffer_per_cpu *cpu_buffer; 6425 unsigned long reader_size; 6426 unsigned long flags; 6427 6428 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 6429 if (IS_ERR(cpu_buffer)) 6430 return (int)PTR_ERR(cpu_buffer); 6431 6432 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6433 6434 consume: 6435 if (rb_per_cpu_empty(cpu_buffer)) 6436 goto out; 6437 6438 reader_size = rb_page_size(cpu_buffer->reader_page); 6439 6440 /* 6441 * There are data to be read on the current reader page, we can 6442 * return to the caller. But before that, we assume the latter will read 6443 * everything. Let's update the kernel reader accordingly. 6444 */ 6445 if (cpu_buffer->reader_page->read < reader_size) { 6446 while (cpu_buffer->reader_page->read < reader_size) 6447 rb_advance_reader(cpu_buffer); 6448 goto out; 6449 } 6450 6451 if (WARN_ON(!rb_get_reader_page(cpu_buffer))) 6452 goto out; 6453 6454 goto consume; 6455 6456 out: 6457 /* Some archs do not have data cache coherency between kernel and user-space */ 6458 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page)); 6459 6460 rb_update_meta_page(cpu_buffer); 6461 6462 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6463 rb_put_mapped_buffer(cpu_buffer); 6464 6465 return 0; 6466 } 6467 6468 /* 6469 * We only allocate new buffers, never free them if the CPU goes down. 6470 * If we were to free the buffer, then the user would lose any trace that was in 6471 * the buffer. 
6472 */ 6473 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 6474 { 6475 struct trace_buffer *buffer; 6476 long nr_pages_same; 6477 int cpu_i; 6478 unsigned long nr_pages; 6479 6480 buffer = container_of(node, struct trace_buffer, node); 6481 if (cpumask_test_cpu(cpu, buffer->cpumask)) 6482 return 0; 6483 6484 nr_pages = 0; 6485 nr_pages_same = 1; 6486 /* check if all cpu sizes are same */ 6487 for_each_buffer_cpu(buffer, cpu_i) { 6488 /* fill in the size from first enabled cpu */ 6489 if (nr_pages == 0) 6490 nr_pages = buffer->buffers[cpu_i]->nr_pages; 6491 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 6492 nr_pages_same = 0; 6493 break; 6494 } 6495 } 6496 /* allocate minimum pages, user can later expand it */ 6497 if (!nr_pages_same) 6498 nr_pages = 2; 6499 buffer->buffers[cpu] = 6500 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 6501 if (!buffer->buffers[cpu]) { 6502 WARN(1, "failed to allocate ring buffer on CPU %u\n", 6503 cpu); 6504 return -ENOMEM; 6505 } 6506 smp_wmb(); 6507 cpumask_set_cpu(cpu, buffer->cpumask); 6508 return 0; 6509 } 6510 6511 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 6512 /* 6513 * This is a basic integrity check of the ring buffer. 6514 * Late in the boot cycle this test will run when configured in. 6515 * It will kick off a thread per CPU that will go into a loop 6516 * writing to the per cpu ring buffer various sizes of data. 6517 * Some of the data will be large items, some small. 6518 * 6519 * Another thread is created that goes into a spin, sending out 6520 * IPIs to the other CPUs to also write into the ring buffer. 6521 * this is to test the nesting ability of the buffer. 6522 * 6523 * Basic stats are recorded and reported. If something in the 6524 * ring buffer should happen that's not expected, a big warning 6525 * is displayed and all ring buffers are disabled. 6526 */ 6527 static struct task_struct *rb_threads[NR_CPUS] __initdata; 6528 6529 struct rb_test_data { 6530 struct trace_buffer *buffer; 6531 unsigned long events; 6532 unsigned long bytes_written; 6533 unsigned long bytes_alloc; 6534 unsigned long bytes_dropped; 6535 unsigned long events_nested; 6536 unsigned long bytes_written_nested; 6537 unsigned long bytes_alloc_nested; 6538 unsigned long bytes_dropped_nested; 6539 int min_size_nested; 6540 int max_size_nested; 6541 int max_size; 6542 int min_size; 6543 int cpu; 6544 int cnt; 6545 }; 6546 6547 static struct rb_test_data rb_data[NR_CPUS] __initdata; 6548 6549 /* 1 meg per cpu */ 6550 #define RB_TEST_BUFFER_SIZE 1048576 6551 6552 static char rb_string[] __initdata = 6553 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 6554 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 6555 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 6556 6557 static bool rb_test_started __initdata; 6558 6559 struct rb_item { 6560 int size; 6561 char str[]; 6562 }; 6563 6564 static __init int rb_write_something(struct rb_test_data *data, bool nested) 6565 { 6566 struct ring_buffer_event *event; 6567 struct rb_item *item; 6568 bool started; 6569 int event_len; 6570 int size; 6571 int len; 6572 int cnt; 6573 6574 /* Have nested writes different that what is written */ 6575 cnt = data->cnt + (nested ? 
27 : 0);

	/* Multiply cnt by ~e, to make some unique increment */
	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);

	len = size + sizeof(struct rb_item);

	started = rb_test_started;
	/* read rb_test_started before checking buffer enabled */
	smp_rmb();

	event = ring_buffer_lock_reserve(data->buffer, len);
	if (!event) {
		/* Ignore dropped events before test starts. */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer!
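	 * It busy-loops sending IPIs to all other CPUs so that rb_ipi() adds
	 * nested writes on every CPU while the rb_test threads write from
	 * task context.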
*/ 6704 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer"); 6705 if (WARN_ON(IS_ERR(rb_hammer))) { 6706 pr_cont("FAILED\n"); 6707 ret = PTR_ERR(rb_hammer); 6708 goto out_free; 6709 } 6710 6711 ring_buffer_record_on(buffer); 6712 /* 6713 * Show buffer is enabled before setting rb_test_started. 6714 * Yes there's a small race window where events could be 6715 * dropped and the thread wont catch it. But when a ring 6716 * buffer gets enabled, there will always be some kind of 6717 * delay before other CPUs see it. Thus, we don't care about 6718 * those dropped events. We care about events dropped after 6719 * the threads see that the buffer is active. 6720 */ 6721 smp_wmb(); 6722 rb_test_started = true; 6723 6724 set_current_state(TASK_INTERRUPTIBLE); 6725 /* Just run for 10 seconds */; 6726 schedule_timeout(10 * HZ); 6727 6728 kthread_stop(rb_hammer); 6729 6730 out_free: 6731 for_each_online_cpu(cpu) { 6732 if (!rb_threads[cpu]) 6733 break; 6734 kthread_stop(rb_threads[cpu]); 6735 } 6736 if (ret) { 6737 ring_buffer_free(buffer); 6738 return ret; 6739 } 6740 6741 /* Report! */ 6742 pr_info("finished\n"); 6743 for_each_online_cpu(cpu) { 6744 struct ring_buffer_event *event; 6745 struct rb_test_data *data = &rb_data[cpu]; 6746 struct rb_item *item; 6747 unsigned long total_events; 6748 unsigned long total_dropped; 6749 unsigned long total_written; 6750 unsigned long total_alloc; 6751 unsigned long total_read = 0; 6752 unsigned long total_size = 0; 6753 unsigned long total_len = 0; 6754 unsigned long total_lost = 0; 6755 unsigned long lost; 6756 int big_event_size; 6757 int small_event_size; 6758 6759 ret = -1; 6760 6761 total_events = data->events + data->events_nested; 6762 total_written = data->bytes_written + data->bytes_written_nested; 6763 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 6764 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 6765 6766 big_event_size = data->max_size + data->max_size_nested; 6767 small_event_size = data->min_size + data->min_size_nested; 6768 6769 pr_info("CPU %d:\n", cpu); 6770 pr_info(" events: %ld\n", total_events); 6771 pr_info(" dropped bytes: %ld\n", total_dropped); 6772 pr_info(" alloced bytes: %ld\n", total_alloc); 6773 pr_info(" written bytes: %ld\n", total_written); 6774 pr_info(" biggest event: %d\n", big_event_size); 6775 pr_info(" smallest event: %d\n", small_event_size); 6776 6777 if (RB_WARN_ON(buffer, total_dropped)) 6778 break; 6779 6780 ret = 0; 6781 6782 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 6783 total_lost += lost; 6784 item = ring_buffer_event_data(event); 6785 total_len += ring_buffer_event_length(event); 6786 total_size += item->size + sizeof(struct rb_item); 6787 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 6788 pr_info("FAILED!\n"); 6789 pr_info("buffer had: %.*s\n", item->size, item->str); 6790 pr_info("expected: %.*s\n", item->size, rb_string); 6791 RB_WARN_ON(buffer, 1); 6792 ret = -1; 6793 break; 6794 } 6795 total_read++; 6796 } 6797 if (ret) 6798 break; 6799 6800 ret = -1; 6801 6802 pr_info(" read events: %ld\n", total_read); 6803 pr_info(" lost events: %ld\n", total_lost); 6804 pr_info(" total events: %ld\n", total_lost + total_read); 6805 pr_info(" recorded len bytes: %ld\n", total_len); 6806 pr_info(" recorded size bytes: %ld\n", total_size); 6807 if (total_lost) { 6808 pr_info(" With dropped events, record len and size may not match\n" 6809 " alloced and written from above\n"); 6810 } else { 6811 if (RB_WARN_ON(buffer, total_len != total_alloc || 6812 
total_size != total_written)) 6813 break; 6814 } 6815 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 6816 break; 6817 6818 ret = 0; 6819 } 6820 if (!ret) 6821 pr_info("Ring buffer PASSED!\n"); 6822 6823 ring_buffer_free(buffer); 6824 return 0; 6825 } 6826 6827 late_initcall(test_ringbuffer); 6828 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 6829