1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Generic ring buffer 4 * 5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> 6 */ 7 #include <linux/trace_recursion.h> 8 #include <linux/trace_events.h> 9 #include <linux/ring_buffer.h> 10 #include <linux/trace_clock.h> 11 #include <linux/sched/clock.h> 12 #include <linux/cacheflush.h> 13 #include <linux/trace_seq.h> 14 #include <linux/spinlock.h> 15 #include <linux/irq_work.h> 16 #include <linux/security.h> 17 #include <linux/uaccess.h> 18 #include <linux/hardirq.h> 19 #include <linux/kthread.h> /* for self test */ 20 #include <linux/module.h> 21 #include <linux/percpu.h> 22 #include <linux/mutex.h> 23 #include <linux/delay.h> 24 #include <linux/slab.h> 25 #include <linux/init.h> 26 #include <linux/hash.h> 27 #include <linux/list.h> 28 #include <linux/cpu.h> 29 #include <linux/oom.h> 30 #include <linux/mm.h> 31 32 #include <asm/local64.h> 33 #include <asm/local.h> 34 35 /* 36 * The "absolute" timestamp in the buffer is only 59 bits. 37 * If a clock has the 5 MSBs set, it needs to be saved and 38 * reinserted. 39 */ 40 #define TS_MSB (0xf8ULL << 56) 41 #define ABS_TS_MASK (~TS_MSB) 42 43 static void update_pages_handler(struct work_struct *work); 44 45 /* 46 * The ring buffer header is special. We must manually up keep it. 47 */ 48 int ring_buffer_print_entry_header(struct trace_seq *s) 49 { 50 trace_seq_puts(s, "# compressed entry header\n"); 51 trace_seq_puts(s, "\ttype_len : 5 bits\n"); 52 trace_seq_puts(s, "\ttime_delta : 27 bits\n"); 53 trace_seq_puts(s, "\tarray : 32 bits\n"); 54 trace_seq_putc(s, '\n'); 55 trace_seq_printf(s, "\tpadding : type == %d\n", 56 RINGBUF_TYPE_PADDING); 57 trace_seq_printf(s, "\ttime_extend : type == %d\n", 58 RINGBUF_TYPE_TIME_EXTEND); 59 trace_seq_printf(s, "\ttime_stamp : type == %d\n", 60 RINGBUF_TYPE_TIME_STAMP); 61 trace_seq_printf(s, "\tdata max type_len == %d\n", 62 RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 63 64 return !trace_seq_has_overflowed(s); 65 } 66 67 /* 68 * The ring buffer is made up of a list of pages. A separate list of pages is 69 * allocated for each CPU. A writer may only write to a buffer that is 70 * associated with the CPU it is currently executing on. A reader may read 71 * from any per cpu buffer. 72 * 73 * The reader is special. For each per cpu buffer, the reader has its own 74 * reader page. When a reader has read the entire reader page, this reader 75 * page is swapped with another page in the ring buffer. 76 * 77 * Now, as long as the writer is off the reader page, the reader can do what 78 * ever it wants with that page. The writer will never write to that page 79 * again (as long as it is out of the ring buffer). 80 * 81 * Here's some silly ASCII art. 
82 * 83 * +------+ 84 * |reader| RING BUFFER 85 * |page | 86 * +------+ +---+ +---+ +---+ 87 * | |-->| |-->| | 88 * +---+ +---+ +---+ 89 * ^ | 90 * | | 91 * +---------------+ 92 * 93 * 94 * +------+ 95 * |reader| RING BUFFER 96 * |page |------------------v 97 * +------+ +---+ +---+ +---+ 98 * | |-->| |-->| | 99 * +---+ +---+ +---+ 100 * ^ | 101 * | | 102 * +---------------+ 103 * 104 * 105 * +------+ 106 * |reader| RING BUFFER 107 * |page |------------------v 108 * +------+ +---+ +---+ +---+ 109 * ^ | |-->| |-->| | 110 * | +---+ +---+ +---+ 111 * | | 112 * | | 113 * +------------------------------+ 114 * 115 * 116 * +------+ 117 * |buffer| RING BUFFER 118 * |page |------------------v 119 * +------+ +---+ +---+ +---+ 120 * ^ | | | |-->| | 121 * | New +---+ +---+ +---+ 122 * | Reader------^ | 123 * | page | 124 * +------------------------------+ 125 * 126 * 127 * After we make this swap, the reader can hand this page off to the splice 128 * code and be done with it. It can even allocate a new page if it needs to 129 * and swap that into the ring buffer. 130 * 131 * We will be using cmpxchg soon to make all this lockless. 132 * 133 */ 134 135 /* Used for individual buffers (after the counter) */ 136 #define RB_BUFFER_OFF (1 << 20) 137 138 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data) 139 140 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array)) 141 #define RB_ALIGNMENT 4U 142 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 143 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ 144 145 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS 146 # define RB_FORCE_8BYTE_ALIGNMENT 0 147 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT 148 #else 149 # define RB_FORCE_8BYTE_ALIGNMENT 1 150 # define RB_ARCH_ALIGNMENT 8U 151 #endif 152 153 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) 154 155 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ 156 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX 157 158 enum { 159 RB_LEN_TIME_EXTEND = 8, 160 RB_LEN_TIME_STAMP = 8, 161 }; 162 163 #define skip_time_extend(event) \ 164 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND)) 165 166 #define extended_time(event) \ 167 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND) 168 169 static inline bool rb_null_event(struct ring_buffer_event *event) 170 { 171 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta; 172 } 173 174 static void rb_event_set_padding(struct ring_buffer_event *event) 175 { 176 /* padding has a NULL time_delta */ 177 event->type_len = RINGBUF_TYPE_PADDING; 178 event->time_delta = 0; 179 } 180 181 static unsigned 182 rb_event_data_length(struct ring_buffer_event *event) 183 { 184 unsigned length; 185 186 if (event->type_len) 187 length = event->type_len * RB_ALIGNMENT; 188 else 189 length = event->array[0]; 190 return length + RB_EVNT_HDR_SIZE; 191 } 192 193 /* 194 * Return the length of the given event. Will return 195 * the length of the time extend if the event is a 196 * time extend. 
197 */ 198 static inline unsigned 199 rb_event_length(struct ring_buffer_event *event) 200 { 201 switch (event->type_len) { 202 case RINGBUF_TYPE_PADDING: 203 if (rb_null_event(event)) 204 /* undefined */ 205 return -1; 206 return event->array[0] + RB_EVNT_HDR_SIZE; 207 208 case RINGBUF_TYPE_TIME_EXTEND: 209 return RB_LEN_TIME_EXTEND; 210 211 case RINGBUF_TYPE_TIME_STAMP: 212 return RB_LEN_TIME_STAMP; 213 214 case RINGBUF_TYPE_DATA: 215 return rb_event_data_length(event); 216 default: 217 WARN_ON_ONCE(1); 218 } 219 /* not hit */ 220 return 0; 221 } 222 223 /* 224 * Return total length of time extend and data, 225 * or just the event length for all other events. 226 */ 227 static inline unsigned 228 rb_event_ts_length(struct ring_buffer_event *event) 229 { 230 unsigned len = 0; 231 232 if (extended_time(event)) { 233 /* time extends include the data event after it */ 234 len = RB_LEN_TIME_EXTEND; 235 event = skip_time_extend(event); 236 } 237 return len + rb_event_length(event); 238 } 239 240 /** 241 * ring_buffer_event_length - return the length of the event 242 * @event: the event to get the length of 243 * 244 * Returns the size of the data load of a data event. 245 * If the event is something other than a data event, it 246 * returns the size of the event itself. With the exception 247 * of a TIME EXTEND, where it still returns the size of the 248 * data load of the data event after it. 249 */ 250 unsigned ring_buffer_event_length(struct ring_buffer_event *event) 251 { 252 unsigned length; 253 254 if (extended_time(event)) 255 event = skip_time_extend(event); 256 257 length = rb_event_length(event); 258 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 259 return length; 260 length -= RB_EVNT_HDR_SIZE; 261 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0])) 262 length -= sizeof(event->array[0]); 263 return length; 264 } 265 EXPORT_SYMBOL_GPL(ring_buffer_event_length); 266 267 /* inline for ring buffer fast paths */ 268 static __always_inline void * 269 rb_event_data(struct ring_buffer_event *event) 270 { 271 if (extended_time(event)) 272 event = skip_time_extend(event); 273 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); 274 /* If length is in len field, then array[0] has the data */ 275 if (event->type_len) 276 return (void *)&event->array[0]; 277 /* Otherwise length is in array[0] and array[1] has the data */ 278 return (void *)&event->array[1]; 279 } 280 281 /** 282 * ring_buffer_event_data - return the data of the event 283 * @event: the event to get the data from 284 */ 285 void *ring_buffer_event_data(struct ring_buffer_event *event) 286 { 287 return rb_event_data(event); 288 } 289 EXPORT_SYMBOL_GPL(ring_buffer_event_data); 290 291 #define for_each_buffer_cpu(buffer, cpu) \ 292 for_each_cpu(cpu, buffer->cpumask) 293 294 #define for_each_online_buffer_cpu(buffer, cpu) \ 295 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) 296 297 #define TS_SHIFT 27 298 #define TS_MASK ((1ULL << TS_SHIFT) - 1) 299 #define TS_DELTA_TEST (~TS_MASK) 300 301 static u64 rb_event_time_stamp(struct ring_buffer_event *event) 302 { 303 u64 ts; 304 305 ts = event->array[0]; 306 ts <<= TS_SHIFT; 307 ts += event->time_delta; 308 309 return ts; 310 } 311 312 /* Flag when events were overwritten */ 313 #define RB_MISSED_EVENTS (1 << 31) 314 /* Missed count stored at end */ 315 #define RB_MISSED_STORED (1 << 30) 316 317 #define RB_MISSED_MASK (3 << 30) 318 319 struct buffer_data_page { 320 u64 time_stamp; /* page time stamp */ 321 local_t commit; /* write committed index */ 322 
unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */ 323 }; 324 325 struct buffer_data_read_page { 326 unsigned order; /* order of the page */ 327 struct buffer_data_page *data; /* actual data, stored in this page */ 328 }; 329 330 /* 331 * Note, the buffer_page list must be first. The buffer pages 332 * are allocated in cache lines, which means that each buffer 333 * page will be at the beginning of a cache line, and thus 334 * the least significant bits will be zero. We use this to 335 * add flags in the list struct pointers, to make the ring buffer 336 * lockless. 337 */ 338 struct buffer_page { 339 struct list_head list; /* list of buffer pages */ 340 local_t write; /* index for next write */ 341 unsigned read; /* index for next read */ 342 local_t entries; /* entries on this page */ 343 unsigned long real_end; /* real end of data */ 344 unsigned order; /* order of the page */ 345 u32 id; /* ID for external mapping */ 346 struct buffer_data_page *page; /* Actual data page */ 347 }; 348 349 /* 350 * The buffer page counters, write and entries, must be reset 351 * atomically when crossing page boundaries. To synchronize this 352 * update, two counters are inserted into the number. One is 353 * the actual counter for the write position or count on the page. 354 * 355 * The other is a counter of updaters. Before an update happens 356 * the update partition of the counter is incremented. This will 357 * allow the updater to update the counter atomically. 358 * 359 * The counter is 20 bits, and the state data is 12. 360 */ 361 #define RB_WRITE_MASK 0xfffff 362 #define RB_WRITE_INTCNT (1 << 20) 363 364 static void rb_init_page(struct buffer_data_page *bpage) 365 { 366 local_set(&bpage->commit, 0); 367 } 368 369 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage) 370 { 371 return local_read(&bpage->page->commit); 372 } 373 374 static void free_buffer_page(struct buffer_page *bpage) 375 { 376 free_pages((unsigned long)bpage->page, bpage->order); 377 kfree(bpage); 378 } 379 380 /* 381 * We need to fit the time_stamp delta into 27 bits. 382 */ 383 static inline bool test_time_stamp(u64 delta) 384 { 385 return !!(delta & TS_DELTA_TEST); 386 } 387 388 struct rb_irq_work { 389 struct irq_work work; 390 wait_queue_head_t waiters; 391 wait_queue_head_t full_waiters; 392 atomic_t seq; 393 bool waiters_pending; 394 bool full_waiters_pending; 395 bool wakeup_full; 396 }; 397 398 /* 399 * Structure to hold event state and handle nested events. 400 */ 401 struct rb_event_info { 402 u64 ts; 403 u64 delta; 404 u64 before; 405 u64 after; 406 unsigned long length; 407 struct buffer_page *tail_page; 408 int add_timestamp; 409 }; 410 411 /* 412 * Used for the add_timestamp 413 * NONE 414 * EXTEND - wants a time extend 415 * ABSOLUTE - the buffer requests all events to have absolute time stamps 416 * FORCE - force a full time stamp. 417 */ 418 enum { 419 RB_ADD_STAMP_NONE = 0, 420 RB_ADD_STAMP_EXTEND = BIT(1), 421 RB_ADD_STAMP_ABSOLUTE = BIT(2), 422 RB_ADD_STAMP_FORCE = BIT(3) 423 }; 424 /* 425 * Used for which event context the event is in. 426 * TRANSITION = 0 427 * NMI = 1 428 * IRQ = 2 429 * SOFTIRQ = 3 430 * NORMAL = 4 431 * 432 * See trace_recursive_lock() comment below for more details. 
433 */ 434 enum { 435 RB_CTX_TRANSITION, 436 RB_CTX_NMI, 437 RB_CTX_IRQ, 438 RB_CTX_SOFTIRQ, 439 RB_CTX_NORMAL, 440 RB_CTX_MAX 441 }; 442 443 struct rb_time_struct { 444 local64_t time; 445 }; 446 typedef struct rb_time_struct rb_time_t; 447 448 #define MAX_NEST 5 449 450 /* 451 * head_page == tail_page && head == tail then buffer is empty. 452 */ 453 struct ring_buffer_per_cpu { 454 int cpu; 455 atomic_t record_disabled; 456 atomic_t resize_disabled; 457 struct trace_buffer *buffer; 458 raw_spinlock_t reader_lock; /* serialize readers */ 459 arch_spinlock_t lock; 460 struct lock_class_key lock_key; 461 struct buffer_data_page *free_page; 462 unsigned long nr_pages; 463 unsigned int current_context; 464 struct list_head *pages; 465 struct buffer_page *head_page; /* read from head */ 466 struct buffer_page *tail_page; /* write to tail */ 467 struct buffer_page *commit_page; /* committed pages */ 468 struct buffer_page *reader_page; 469 unsigned long lost_events; 470 unsigned long last_overrun; 471 unsigned long nest; 472 local_t entries_bytes; 473 local_t entries; 474 local_t overrun; 475 local_t commit_overrun; 476 local_t dropped_events; 477 local_t committing; 478 local_t commits; 479 local_t pages_touched; 480 local_t pages_lost; 481 local_t pages_read; 482 long last_pages_touch; 483 size_t shortest_full; 484 unsigned long read; 485 unsigned long read_bytes; 486 rb_time_t write_stamp; 487 rb_time_t before_stamp; 488 u64 event_stamp[MAX_NEST]; 489 u64 read_stamp; 490 /* pages removed since last reset */ 491 unsigned long pages_removed; 492 493 unsigned int mapped; 494 struct mutex mapping_lock; 495 unsigned long *subbuf_ids; /* ID to subbuf VA */ 496 struct trace_buffer_meta *meta_page; 497 498 /* ring buffer pages to update, > 0 to add, < 0 to remove */ 499 long nr_pages_to_update; 500 struct list_head new_pages; /* new pages to add */ 501 struct work_struct update_pages_work; 502 struct completion update_done; 503 504 struct rb_irq_work irq_work; 505 }; 506 507 struct trace_buffer { 508 unsigned flags; 509 int cpus; 510 atomic_t record_disabled; 511 atomic_t resizing; 512 cpumask_var_t cpumask; 513 514 struct lock_class_key *reader_lock_key; 515 516 struct mutex mutex; 517 518 struct ring_buffer_per_cpu **buffers; 519 520 struct hlist_node node; 521 u64 (*clock)(void); 522 523 struct rb_irq_work irq_work; 524 bool time_stamp_abs; 525 526 unsigned int subbuf_size; 527 unsigned int subbuf_order; 528 unsigned int max_data_size; 529 }; 530 531 struct ring_buffer_iter { 532 struct ring_buffer_per_cpu *cpu_buffer; 533 unsigned long head; 534 unsigned long next_event; 535 struct buffer_page *head_page; 536 struct buffer_page *cache_reader_page; 537 unsigned long cache_read; 538 unsigned long cache_pages_removed; 539 u64 read_stamp; 540 u64 page_stamp; 541 struct ring_buffer_event *event; 542 size_t event_size; 543 int missed_events; 544 }; 545 546 int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s) 547 { 548 struct buffer_data_page field; 549 550 trace_seq_printf(s, "\tfield: u64 timestamp;\t" 551 "offset:0;\tsize:%u;\tsigned:%u;\n", 552 (unsigned int)sizeof(field.time_stamp), 553 (unsigned int)is_signed_type(u64)); 554 555 trace_seq_printf(s, "\tfield: local_t commit;\t" 556 "offset:%u;\tsize:%u;\tsigned:%u;\n", 557 (unsigned int)offsetof(typeof(field), commit), 558 (unsigned int)sizeof(field.commit), 559 (unsigned int)is_signed_type(long)); 560 561 trace_seq_printf(s, "\tfield: int overwrite;\t" 562 "offset:%u;\tsize:%u;\tsigned:%u;\n", 563 (unsigned 
int)offsetof(typeof(field), commit), 564 1, 565 (unsigned int)is_signed_type(long)); 566 567 trace_seq_printf(s, "\tfield: char data;\t" 568 "offset:%u;\tsize:%u;\tsigned:%u;\n", 569 (unsigned int)offsetof(typeof(field), data), 570 (unsigned int)buffer->subbuf_size, 571 (unsigned int)is_signed_type(char)); 572 573 return !trace_seq_has_overflowed(s); 574 } 575 576 static inline void rb_time_read(rb_time_t *t, u64 *ret) 577 { 578 *ret = local64_read(&t->time); 579 } 580 static void rb_time_set(rb_time_t *t, u64 val) 581 { 582 local64_set(&t->time, val); 583 } 584 585 /* 586 * Enable this to make sure that the event passed to 587 * ring_buffer_event_time_stamp() is not committed and also 588 * is on the buffer that it passed in. 589 */ 590 //#define RB_VERIFY_EVENT 591 #ifdef RB_VERIFY_EVENT 592 static struct list_head *rb_list_head(struct list_head *list); 593 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 594 void *event) 595 { 596 struct buffer_page *page = cpu_buffer->commit_page; 597 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); 598 struct list_head *next; 599 long commit, write; 600 unsigned long addr = (unsigned long)event; 601 bool done = false; 602 int stop = 0; 603 604 /* Make sure the event exists and is not committed yet */ 605 do { 606 if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) 607 done = true; 608 commit = local_read(&page->page->commit); 609 write = local_read(&page->write); 610 if (addr >= (unsigned long)&page->page->data[commit] && 611 addr < (unsigned long)&page->page->data[write]) 612 return; 613 614 next = rb_list_head(page->list.next); 615 page = list_entry(next, struct buffer_page, list); 616 } while (!done); 617 WARN_ON_ONCE(1); 618 } 619 #else 620 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, 621 void *event) 622 { 623 } 624 #endif 625 626 /* 627 * The absolute time stamp drops the 5 MSBs and some clocks may 628 * require them. The rb_fix_abs_ts() will take a previous full 629 * time stamp, and add the 5 MSB of that time stamp on to the 630 * saved absolute time stamp. Then they are compared in case of 631 * the unlikely event that the latest time stamp incremented 632 * the 5 MSB. 633 */ 634 static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) 635 { 636 if (save_ts & TS_MSB) { 637 abs |= save_ts & TS_MSB; 638 /* Check for overflow */ 639 if (unlikely(abs < save_ts)) 640 abs += 1ULL << 59; 641 } 642 return abs; 643 } 644 645 static inline u64 rb_time_stamp(struct trace_buffer *buffer); 646 647 /** 648 * ring_buffer_event_time_stamp - return the event's current time stamp 649 * @buffer: The buffer that the event is on 650 * @event: the event to get the time stamp of 651 * 652 * Note, this must be called after @event is reserved, and before it is 653 * committed to the ring buffer. And must be called from the same 654 * context where the event was reserved (normal, softirq, irq, etc). 655 * 656 * Returns the time stamp associated with the current event. 657 * If the event has an extended time stamp, then that is used as 658 * the time stamp to return. 659 * In the highly unlikely case that the event was nested more than 660 * the max nesting, then the write_stamp of the buffer is returned, 661 * otherwise current time is returned, but that really neither of 662 * the last two cases should ever happen. 
663 */ 664 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, 665 struct ring_buffer_event *event) 666 { 667 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; 668 unsigned int nest; 669 u64 ts; 670 671 /* If the event includes an absolute time, then just use that */ 672 if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { 673 ts = rb_event_time_stamp(event); 674 return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); 675 } 676 677 nest = local_read(&cpu_buffer->committing); 678 verify_event(cpu_buffer, event); 679 if (WARN_ON_ONCE(!nest)) 680 goto fail; 681 682 /* Read the current saved nesting level time stamp */ 683 if (likely(--nest < MAX_NEST)) 684 return cpu_buffer->event_stamp[nest]; 685 686 /* Shouldn't happen, warn if it does */ 687 WARN_ONCE(1, "nest (%d) greater than max", nest); 688 689 fail: 690 rb_time_read(&cpu_buffer->write_stamp, &ts); 691 692 return ts; 693 } 694 695 /** 696 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer 697 * @buffer: The ring_buffer to get the number of pages from 698 * @cpu: The cpu of the ring_buffer to get the number of pages from 699 * 700 * Returns the number of pages that have content in the ring buffer. 701 */ 702 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu) 703 { 704 size_t read; 705 size_t lost; 706 size_t cnt; 707 708 read = local_read(&buffer->buffers[cpu]->pages_read); 709 lost = local_read(&buffer->buffers[cpu]->pages_lost); 710 cnt = local_read(&buffer->buffers[cpu]->pages_touched); 711 712 if (WARN_ON_ONCE(cnt < lost)) 713 return 0; 714 715 cnt -= lost; 716 717 /* The reader can read an empty page, but not more than that */ 718 if (cnt < read) { 719 WARN_ON_ONCE(read > cnt + 1); 720 return 0; 721 } 722 723 return cnt - read; 724 } 725 726 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full) 727 { 728 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 729 size_t nr_pages; 730 size_t dirty; 731 732 nr_pages = cpu_buffer->nr_pages; 733 if (!nr_pages || !full) 734 return true; 735 736 /* 737 * Add one as dirty will never equal nr_pages, as the sub-buffer 738 * that the writer is on is not counted as dirty. 739 * This is needed if "buffer_percent" is set to 100. 740 */ 741 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1; 742 743 return (dirty * 100) >= (full * nr_pages); 744 } 745 746 /* 747 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input 748 * 749 * Schedules a delayed work to wake up any task that is blocked on the 750 * ring buffer waiters queue. 
751 */ 752 static void rb_wake_up_waiters(struct irq_work *work) 753 { 754 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); 755 756 /* For waiters waiting for the first wake up */ 757 (void)atomic_fetch_inc_release(&rbwork->seq); 758 759 wake_up_all(&rbwork->waiters); 760 if (rbwork->full_waiters_pending || rbwork->wakeup_full) { 761 /* Only cpu_buffer sets the above flags */ 762 struct ring_buffer_per_cpu *cpu_buffer = 763 container_of(rbwork, struct ring_buffer_per_cpu, irq_work); 764 765 /* Called from interrupt context */ 766 raw_spin_lock(&cpu_buffer->reader_lock); 767 rbwork->wakeup_full = false; 768 rbwork->full_waiters_pending = false; 769 770 /* Waking up all waiters, they will reset the shortest full */ 771 cpu_buffer->shortest_full = 0; 772 raw_spin_unlock(&cpu_buffer->reader_lock); 773 774 wake_up_all(&rbwork->full_waiters); 775 } 776 } 777 778 /** 779 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer 780 * @buffer: The ring buffer to wake waiters on 781 * @cpu: The CPU buffer to wake waiters on 782 * 783 * In the case of a file that represents a ring buffer is closing, 784 * it is prudent to wake up any waiters that are on this. 785 */ 786 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) 787 { 788 struct ring_buffer_per_cpu *cpu_buffer; 789 struct rb_irq_work *rbwork; 790 791 if (!buffer) 792 return; 793 794 if (cpu == RING_BUFFER_ALL_CPUS) { 795 796 /* Wake up individual ones too. One level recursion */ 797 for_each_buffer_cpu(buffer, cpu) 798 ring_buffer_wake_waiters(buffer, cpu); 799 800 rbwork = &buffer->irq_work; 801 } else { 802 if (WARN_ON_ONCE(!buffer->buffers)) 803 return; 804 if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) 805 return; 806 807 cpu_buffer = buffer->buffers[cpu]; 808 /* The CPU buffer may not have been initialized yet */ 809 if (!cpu_buffer) 810 return; 811 rbwork = &cpu_buffer->irq_work; 812 } 813 814 /* This can be called in any context */ 815 irq_work_queue(&rbwork->work); 816 } 817 818 static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full) 819 { 820 struct ring_buffer_per_cpu *cpu_buffer; 821 bool ret = false; 822 823 /* Reads of all CPUs always waits for any data */ 824 if (cpu == RING_BUFFER_ALL_CPUS) 825 return !ring_buffer_empty(buffer); 826 827 cpu_buffer = buffer->buffers[cpu]; 828 829 if (!ring_buffer_empty_cpu(buffer, cpu)) { 830 unsigned long flags; 831 bool pagebusy; 832 833 if (!full) 834 return true; 835 836 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 837 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page; 838 ret = !pagebusy && full_hit(buffer, cpu, full); 839 840 if (!ret && (!cpu_buffer->shortest_full || 841 cpu_buffer->shortest_full > full)) { 842 cpu_buffer->shortest_full = full; 843 } 844 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 845 } 846 return ret; 847 } 848 849 static inline bool 850 rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer, 851 int cpu, int full, ring_buffer_cond_fn cond, void *data) 852 { 853 if (rb_watermark_hit(buffer, cpu, full)) 854 return true; 855 856 if (cond(data)) 857 return true; 858 859 /* 860 * The events can happen in critical sections where 861 * checking a work queue can cause deadlocks. 862 * After adding a task to the queue, this flag is set 863 * only to notify events to try to wake up the queue 864 * using irq_work. 865 * 866 * We don't clear it even if the buffer is no longer 867 * empty. 
The flag only causes the next event to run 868 * irq_work to do the work queue wake up. The worse 869 * that can happen if we race with !trace_empty() is that 870 * an event will cause an irq_work to try to wake up 871 * an empty queue. 872 * 873 * There's no reason to protect this flag either, as 874 * the work queue and irq_work logic will do the necessary 875 * synchronization for the wake ups. The only thing 876 * that is necessary is that the wake up happens after 877 * a task has been queued. It's OK for spurious wake ups. 878 */ 879 if (full) 880 rbwork->full_waiters_pending = true; 881 else 882 rbwork->waiters_pending = true; 883 884 return false; 885 } 886 887 struct rb_wait_data { 888 struct rb_irq_work *irq_work; 889 int seq; 890 }; 891 892 /* 893 * The default wait condition for ring_buffer_wait() is to just to exit the 894 * wait loop the first time it is woken up. 895 */ 896 static bool rb_wait_once(void *data) 897 { 898 struct rb_wait_data *rdata = data; 899 struct rb_irq_work *rbwork = rdata->irq_work; 900 901 return atomic_read_acquire(&rbwork->seq) != rdata->seq; 902 } 903 904 /** 905 * ring_buffer_wait - wait for input to the ring buffer 906 * @buffer: buffer to wait on 907 * @cpu: the cpu buffer to wait on 908 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 909 * @cond: condition function to break out of wait (NULL to run once) 910 * @data: the data to pass to @cond. 911 * 912 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 913 * as data is added to any of the @buffer's cpu buffers. Otherwise 914 * it will wait for data to be added to a specific cpu buffer. 915 */ 916 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full, 917 ring_buffer_cond_fn cond, void *data) 918 { 919 struct ring_buffer_per_cpu *cpu_buffer; 920 struct wait_queue_head *waitq; 921 struct rb_irq_work *rbwork; 922 struct rb_wait_data rdata; 923 int ret = 0; 924 925 /* 926 * Depending on what the caller is waiting for, either any 927 * data in any cpu buffer, or a specific buffer, put the 928 * caller on the appropriate wait queue. 929 */ 930 if (cpu == RING_BUFFER_ALL_CPUS) { 931 rbwork = &buffer->irq_work; 932 /* Full only makes sense on per cpu reads */ 933 full = 0; 934 } else { 935 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 936 return -ENODEV; 937 cpu_buffer = buffer->buffers[cpu]; 938 rbwork = &cpu_buffer->irq_work; 939 } 940 941 if (full) 942 waitq = &rbwork->full_waiters; 943 else 944 waitq = &rbwork->waiters; 945 946 /* Set up to exit loop as soon as it is woken */ 947 if (!cond) { 948 cond = rb_wait_once; 949 rdata.irq_work = rbwork; 950 rdata.seq = atomic_read_acquire(&rbwork->seq); 951 data = &rdata; 952 } 953 954 ret = wait_event_interruptible((*waitq), 955 rb_wait_cond(rbwork, buffer, cpu, full, cond, data)); 956 957 return ret; 958 } 959 960 /** 961 * ring_buffer_poll_wait - poll on buffer input 962 * @buffer: buffer to wait on 963 * @cpu: the cpu buffer to wait on 964 * @filp: the file descriptor 965 * @poll_table: The poll descriptor 966 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS 967 * 968 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon 969 * as data is added to any of the @buffer's cpu buffers. Otherwise 970 * it will wait for data to be added to a specific cpu buffer. 971 * 972 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers, 973 * zero otherwise. 
974 */ 975 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, 976 struct file *filp, poll_table *poll_table, int full) 977 { 978 struct ring_buffer_per_cpu *cpu_buffer; 979 struct rb_irq_work *rbwork; 980 981 if (cpu == RING_BUFFER_ALL_CPUS) { 982 rbwork = &buffer->irq_work; 983 full = 0; 984 } else { 985 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 986 return EPOLLERR; 987 988 cpu_buffer = buffer->buffers[cpu]; 989 rbwork = &cpu_buffer->irq_work; 990 } 991 992 if (full) { 993 poll_wait(filp, &rbwork->full_waiters, poll_table); 994 995 if (rb_watermark_hit(buffer, cpu, full)) 996 return EPOLLIN | EPOLLRDNORM; 997 /* 998 * Only allow full_waiters_pending update to be seen after 999 * the shortest_full is set (in rb_watermark_hit). If the 1000 * writer sees the full_waiters_pending flag set, it will 1001 * compare the amount in the ring buffer to shortest_full. 1002 * If the amount in the ring buffer is greater than the 1003 * shortest_full percent, it will call the irq_work handler 1004 * to wake up this list. The irq_handler will reset shortest_full 1005 * back to zero. That's done under the reader_lock, but 1006 * the below smp_mb() makes sure that the update to 1007 * full_waiters_pending doesn't leak up into the above. 1008 */ 1009 smp_mb(); 1010 rbwork->full_waiters_pending = true; 1011 return 0; 1012 } 1013 1014 poll_wait(filp, &rbwork->waiters, poll_table); 1015 rbwork->waiters_pending = true; 1016 1017 /* 1018 * There's a tight race between setting the waiters_pending and 1019 * checking if the ring buffer is empty. Once the waiters_pending bit 1020 * is set, the next event will wake the task up, but we can get stuck 1021 * if there's only a single event in. 1022 * 1023 * FIXME: Ideally, we need a memory barrier on the writer side as well, 1024 * but adding a memory barrier to all events will cause too much of a 1025 * performance hit in the fast path. We only need a memory barrier when 1026 * the buffer goes from empty to having content. But as this race is 1027 * extremely small, and it's not a problem if another event comes in, we 1028 * will fix it later. 
1029 */ 1030 smp_mb(); 1031 1032 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) || 1033 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu))) 1034 return EPOLLIN | EPOLLRDNORM; 1035 return 0; 1036 } 1037 1038 /* buffer may be either ring_buffer or ring_buffer_per_cpu */ 1039 #define RB_WARN_ON(b, cond) \ 1040 ({ \ 1041 int _____ret = unlikely(cond); \ 1042 if (_____ret) { \ 1043 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \ 1044 struct ring_buffer_per_cpu *__b = \ 1045 (void *)b; \ 1046 atomic_inc(&__b->buffer->record_disabled); \ 1047 } else \ 1048 atomic_inc(&b->record_disabled); \ 1049 WARN_ON(1); \ 1050 } \ 1051 _____ret; \ 1052 }) 1053 1054 /* Up this if you want to test the TIME_EXTENTS and normalization */ 1055 #define DEBUG_SHIFT 0 1056 1057 static inline u64 rb_time_stamp(struct trace_buffer *buffer) 1058 { 1059 u64 ts; 1060 1061 /* Skip retpolines :-( */ 1062 if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local)) 1063 ts = trace_clock_local(); 1064 else 1065 ts = buffer->clock(); 1066 1067 /* shift to debug/test normalization and TIME_EXTENTS */ 1068 return ts << DEBUG_SHIFT; 1069 } 1070 1071 u64 ring_buffer_time_stamp(struct trace_buffer *buffer) 1072 { 1073 u64 time; 1074 1075 preempt_disable_notrace(); 1076 time = rb_time_stamp(buffer); 1077 preempt_enable_notrace(); 1078 1079 return time; 1080 } 1081 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp); 1082 1083 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer, 1084 int cpu, u64 *ts) 1085 { 1086 /* Just stupid testing the normalize function and deltas */ 1087 *ts >>= DEBUG_SHIFT; 1088 } 1089 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp); 1090 1091 /* 1092 * Making the ring buffer lockless makes things tricky. 1093 * Although writes only happen on the CPU that they are on, 1094 * and they only need to worry about interrupts. Reads can 1095 * happen on any CPU. 1096 * 1097 * The reader page is always off the ring buffer, but when the 1098 * reader finishes with a page, it needs to swap its page with 1099 * a new one from the buffer. The reader needs to take from 1100 * the head (writes go to the tail). But if a writer is in overwrite 1101 * mode and wraps, it must push the head page forward. 1102 * 1103 * Here lies the problem. 1104 * 1105 * The reader must be careful to replace only the head page, and 1106 * not another one. As described at the top of the file in the 1107 * ASCII art, the reader sets its old page to point to the next 1108 * page after head. It then sets the page after head to point to 1109 * the old reader page. But if the writer moves the head page 1110 * during this operation, the reader could end up with the tail. 1111 * 1112 * We use cmpxchg to help prevent this race. We also do something 1113 * special with the page before head. We set the LSB to 1. 1114 * 1115 * When the writer must push the page forward, it will clear the 1116 * bit that points to the head page, move the head, and then set 1117 * the bit that points to the new head page. 1118 * 1119 * We also don't want an interrupt coming in and moving the head 1120 * page on another writer. Thus we use the second LSB to catch 1121 * that too. 
Thus: 1122 * 1123 * head->list->prev->next bit 1 bit 0 1124 * ------- ------- 1125 * Normal page 0 0 1126 * Points to head page 0 1 1127 * New head page 1 0 1128 * 1129 * Note we can not trust the prev pointer of the head page, because: 1130 * 1131 * +----+ +-----+ +-----+ 1132 * | |------>| T |---X--->| N | 1133 * | |<------| | | | 1134 * +----+ +-----+ +-----+ 1135 * ^ ^ | 1136 * | +-----+ | | 1137 * +----------| R |----------+ | 1138 * | |<-----------+ 1139 * +-----+ 1140 * 1141 * Key: ---X--> HEAD flag set in pointer 1142 * T Tail page 1143 * R Reader page 1144 * N Next page 1145 * 1146 * (see __rb_reserve_next() to see where this happens) 1147 * 1148 * What the above shows is that the reader just swapped out 1149 * the reader page with a page in the buffer, but before it 1150 * could make the new header point back to the new page added 1151 * it was preempted by a writer. The writer moved forward onto 1152 * the new page added by the reader and is about to move forward 1153 * again. 1154 * 1155 * You can see, it is legitimate for the previous pointer of 1156 * the head (or any page) not to point back to itself. But only 1157 * temporarily. 1158 */ 1159 1160 #define RB_PAGE_NORMAL 0UL 1161 #define RB_PAGE_HEAD 1UL 1162 #define RB_PAGE_UPDATE 2UL 1163 1164 1165 #define RB_FLAG_MASK 3UL 1166 1167 /* PAGE_MOVED is not part of the mask */ 1168 #define RB_PAGE_MOVED 4UL 1169 1170 /* 1171 * rb_list_head - remove any bit 1172 */ 1173 static struct list_head *rb_list_head(struct list_head *list) 1174 { 1175 unsigned long val = (unsigned long)list; 1176 1177 return (struct list_head *)(val & ~RB_FLAG_MASK); 1178 } 1179 1180 /* 1181 * rb_is_head_page - test if the given page is the head page 1182 * 1183 * Because the reader may move the head_page pointer, we can 1184 * not trust what the head page is (it may be pointing to 1185 * the reader page). But if the next page is a header page, 1186 * its flags will be non zero. 1187 */ 1188 static inline int 1189 rb_is_head_page(struct buffer_page *page, struct list_head *list) 1190 { 1191 unsigned long val; 1192 1193 val = (unsigned long)list->next; 1194 1195 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list) 1196 return RB_PAGE_MOVED; 1197 1198 return val & RB_FLAG_MASK; 1199 } 1200 1201 /* 1202 * rb_is_reader_page 1203 * 1204 * The unique thing about the reader page, is that, if the 1205 * writer is ever on it, the previous pointer never points 1206 * back to the reader page. 1207 */ 1208 static bool rb_is_reader_page(struct buffer_page *page) 1209 { 1210 struct list_head *list = page->list.prev; 1211 1212 return rb_list_head(list->next) != &page->list; 1213 } 1214 1215 /* 1216 * rb_set_list_to_head - set a list_head to be pointing to head. 1217 */ 1218 static void rb_set_list_to_head(struct list_head *list) 1219 { 1220 unsigned long *ptr; 1221 1222 ptr = (unsigned long *)&list->next; 1223 *ptr |= RB_PAGE_HEAD; 1224 *ptr &= ~RB_PAGE_UPDATE; 1225 } 1226 1227 /* 1228 * rb_head_page_activate - sets up head page 1229 */ 1230 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) 1231 { 1232 struct buffer_page *head; 1233 1234 head = cpu_buffer->head_page; 1235 if (!head) 1236 return; 1237 1238 /* 1239 * Set the previous list pointer to have the HEAD flag. 
1240 */ 1241 rb_set_list_to_head(head->list.prev); 1242 } 1243 1244 static void rb_list_head_clear(struct list_head *list) 1245 { 1246 unsigned long *ptr = (unsigned long *)&list->next; 1247 1248 *ptr &= ~RB_FLAG_MASK; 1249 } 1250 1251 /* 1252 * rb_head_page_deactivate - clears head page ptr (for free list) 1253 */ 1254 static void 1255 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer) 1256 { 1257 struct list_head *hd; 1258 1259 /* Go through the whole list and clear any pointers found. */ 1260 rb_list_head_clear(cpu_buffer->pages); 1261 1262 list_for_each(hd, cpu_buffer->pages) 1263 rb_list_head_clear(hd); 1264 } 1265 1266 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer, 1267 struct buffer_page *head, 1268 struct buffer_page *prev, 1269 int old_flag, int new_flag) 1270 { 1271 struct list_head *list; 1272 unsigned long val = (unsigned long)&head->list; 1273 unsigned long ret; 1274 1275 list = &prev->list; 1276 1277 val &= ~RB_FLAG_MASK; 1278 1279 ret = cmpxchg((unsigned long *)&list->next, 1280 val | old_flag, val | new_flag); 1281 1282 /* check if the reader took the page */ 1283 if ((ret & ~RB_FLAG_MASK) != val) 1284 return RB_PAGE_MOVED; 1285 1286 return ret & RB_FLAG_MASK; 1287 } 1288 1289 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer, 1290 struct buffer_page *head, 1291 struct buffer_page *prev, 1292 int old_flag) 1293 { 1294 return rb_head_page_set(cpu_buffer, head, prev, 1295 old_flag, RB_PAGE_UPDATE); 1296 } 1297 1298 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer, 1299 struct buffer_page *head, 1300 struct buffer_page *prev, 1301 int old_flag) 1302 { 1303 return rb_head_page_set(cpu_buffer, head, prev, 1304 old_flag, RB_PAGE_HEAD); 1305 } 1306 1307 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, 1308 struct buffer_page *head, 1309 struct buffer_page *prev, 1310 int old_flag) 1311 { 1312 return rb_head_page_set(cpu_buffer, head, prev, 1313 old_flag, RB_PAGE_NORMAL); 1314 } 1315 1316 static inline void rb_inc_page(struct buffer_page **bpage) 1317 { 1318 struct list_head *p = rb_list_head((*bpage)->list.next); 1319 1320 *bpage = list_entry(p, struct buffer_page, list); 1321 } 1322 1323 static struct buffer_page * 1324 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) 1325 { 1326 struct buffer_page *head; 1327 struct buffer_page *page; 1328 struct list_head *list; 1329 int i; 1330 1331 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page)) 1332 return NULL; 1333 1334 /* sanity check */ 1335 list = cpu_buffer->pages; 1336 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list)) 1337 return NULL; 1338 1339 page = head = cpu_buffer->head_page; 1340 /* 1341 * It is possible that the writer moves the header behind 1342 * where we started, and we miss in one loop. 1343 * A second loop should grab the header, but we'll do 1344 * three loops just because I'm paranoid. 
1345 */ 1346 for (i = 0; i < 3; i++) { 1347 do { 1348 if (rb_is_head_page(page, page->list.prev)) { 1349 cpu_buffer->head_page = page; 1350 return page; 1351 } 1352 rb_inc_page(&page); 1353 } while (page != head); 1354 } 1355 1356 RB_WARN_ON(cpu_buffer, 1); 1357 1358 return NULL; 1359 } 1360 1361 static bool rb_head_page_replace(struct buffer_page *old, 1362 struct buffer_page *new) 1363 { 1364 unsigned long *ptr = (unsigned long *)&old->list.prev->next; 1365 unsigned long val; 1366 1367 val = *ptr & ~RB_FLAG_MASK; 1368 val |= RB_PAGE_HEAD; 1369 1370 return try_cmpxchg(ptr, &val, (unsigned long)&new->list); 1371 } 1372 1373 /* 1374 * rb_tail_page_update - move the tail page forward 1375 */ 1376 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer, 1377 struct buffer_page *tail_page, 1378 struct buffer_page *next_page) 1379 { 1380 unsigned long old_entries; 1381 unsigned long old_write; 1382 1383 /* 1384 * The tail page now needs to be moved forward. 1385 * 1386 * We need to reset the tail page, but without messing 1387 * with possible erasing of data brought in by interrupts 1388 * that have moved the tail page and are currently on it. 1389 * 1390 * We add a counter to the write field to denote this. 1391 */ 1392 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write); 1393 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries); 1394 1395 /* 1396 * Just make sure we have seen our old_write and synchronize 1397 * with any interrupts that come in. 1398 */ 1399 barrier(); 1400 1401 /* 1402 * If the tail page is still the same as what we think 1403 * it is, then it is up to us to update the tail 1404 * pointer. 1405 */ 1406 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) { 1407 /* Zero the write counter */ 1408 unsigned long val = old_write & ~RB_WRITE_MASK; 1409 unsigned long eval = old_entries & ~RB_WRITE_MASK; 1410 1411 /* 1412 * This will only succeed if an interrupt did 1413 * not come in and change it. In which case, we 1414 * do not want to modify it. 1415 * 1416 * We add (void) to let the compiler know that we do not care 1417 * about the return value of these functions. We use the 1418 * cmpxchg to only update if an interrupt did not already 1419 * do it for us. If the cmpxchg fails, we don't care. 1420 */ 1421 (void)local_cmpxchg(&next_page->write, old_write, val); 1422 (void)local_cmpxchg(&next_page->entries, old_entries, eval); 1423 1424 /* 1425 * No need to worry about races with clearing out the commit. 1426 * it only can increment when a commit takes place. But that 1427 * only happens in the outer most nested commit. 1428 */ 1429 local_set(&next_page->page->commit, 0); 1430 1431 /* Either we update tail_page or an interrupt does */ 1432 if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page)) 1433 local_inc(&cpu_buffer->pages_touched); 1434 } 1435 } 1436 1437 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer, 1438 struct buffer_page *bpage) 1439 { 1440 unsigned long val = (unsigned long)bpage; 1441 1442 RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK); 1443 } 1444 1445 /** 1446 * rb_check_pages - integrity check of buffer pages 1447 * @cpu_buffer: CPU buffer with pages to test 1448 * 1449 * As a safety measure we check to make sure the data pages have not 1450 * been corrupted. 1451 * 1452 * Callers of this function need to guarantee that the list of pages doesn't get 1453 * modified during the check. 
In particular, if it's possible that the function 1454 * is invoked with concurrent readers which can swap in a new reader page then 1455 * the caller should take cpu_buffer->reader_lock. 1456 */ 1457 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) 1458 { 1459 struct list_head *head = rb_list_head(cpu_buffer->pages); 1460 struct list_head *tmp; 1461 1462 if (RB_WARN_ON(cpu_buffer, 1463 rb_list_head(rb_list_head(head->next)->prev) != head)) 1464 return; 1465 1466 if (RB_WARN_ON(cpu_buffer, 1467 rb_list_head(rb_list_head(head->prev)->next) != head)) 1468 return; 1469 1470 for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) { 1471 if (RB_WARN_ON(cpu_buffer, 1472 rb_list_head(rb_list_head(tmp->next)->prev) != tmp)) 1473 return; 1474 1475 if (RB_WARN_ON(cpu_buffer, 1476 rb_list_head(rb_list_head(tmp->prev)->next) != tmp)) 1477 return; 1478 } 1479 } 1480 1481 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1482 long nr_pages, struct list_head *pages) 1483 { 1484 struct buffer_page *bpage, *tmp; 1485 bool user_thread = current->mm != NULL; 1486 gfp_t mflags; 1487 long i; 1488 1489 /* 1490 * Check if the available memory is there first. 1491 * Note, si_mem_available() only gives us a rough estimate of available 1492 * memory. It may not be accurate. But we don't care, we just want 1493 * to prevent doing any allocation when it is obvious that it is 1494 * not going to succeed. 1495 */ 1496 i = si_mem_available(); 1497 if (i < nr_pages) 1498 return -ENOMEM; 1499 1500 /* 1501 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails 1502 * gracefully without invoking oom-killer and the system is not 1503 * destabilized. 1504 */ 1505 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL; 1506 1507 /* 1508 * If a user thread allocates too much, and si_mem_available() 1509 * reports there's enough memory, even though there is not. 1510 * Make sure the OOM killer kills this thread. This can happen 1511 * even with RETRY_MAYFAIL because another task may be doing 1512 * an allocation after this task has taken all memory. 1513 * This is the task the OOM killer needs to take out during this 1514 * loop, even if it was triggered by an allocation somewhere else. 
1515 */ 1516 if (user_thread) 1517 set_current_oom_origin(); 1518 for (i = 0; i < nr_pages; i++) { 1519 struct page *page; 1520 1521 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1522 mflags, cpu_to_node(cpu_buffer->cpu)); 1523 if (!bpage) 1524 goto free_pages; 1525 1526 rb_check_bpage(cpu_buffer, bpage); 1527 1528 list_add(&bpage->list, pages); 1529 1530 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), 1531 mflags | __GFP_COMP | __GFP_ZERO, 1532 cpu_buffer->buffer->subbuf_order); 1533 if (!page) 1534 goto free_pages; 1535 bpage->page = page_address(page); 1536 bpage->order = cpu_buffer->buffer->subbuf_order; 1537 rb_init_page(bpage->page); 1538 1539 if (user_thread && fatal_signal_pending(current)) 1540 goto free_pages; 1541 } 1542 if (user_thread) 1543 clear_current_oom_origin(); 1544 1545 return 0; 1546 1547 free_pages: 1548 list_for_each_entry_safe(bpage, tmp, pages, list) { 1549 list_del_init(&bpage->list); 1550 free_buffer_page(bpage); 1551 } 1552 if (user_thread) 1553 clear_current_oom_origin(); 1554 1555 return -ENOMEM; 1556 } 1557 1558 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, 1559 unsigned long nr_pages) 1560 { 1561 LIST_HEAD(pages); 1562 1563 WARN_ON(!nr_pages); 1564 1565 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) 1566 return -ENOMEM; 1567 1568 /* 1569 * The ring buffer page list is a circular list that does not 1570 * start and end with a list head. All page list items point to 1571 * other pages. 1572 */ 1573 cpu_buffer->pages = pages.next; 1574 list_del(&pages); 1575 1576 cpu_buffer->nr_pages = nr_pages; 1577 1578 rb_check_pages(cpu_buffer); 1579 1580 return 0; 1581 } 1582 1583 static struct ring_buffer_per_cpu * 1584 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) 1585 { 1586 struct ring_buffer_per_cpu *cpu_buffer; 1587 struct buffer_page *bpage; 1588 struct page *page; 1589 int ret; 1590 1591 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()), 1592 GFP_KERNEL, cpu_to_node(cpu)); 1593 if (!cpu_buffer) 1594 return NULL; 1595 1596 cpu_buffer->cpu = cpu; 1597 cpu_buffer->buffer = buffer; 1598 raw_spin_lock_init(&cpu_buffer->reader_lock); 1599 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key); 1600 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED; 1601 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler); 1602 init_completion(&cpu_buffer->update_done); 1603 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); 1604 init_waitqueue_head(&cpu_buffer->irq_work.waiters); 1605 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); 1606 mutex_init(&cpu_buffer->mapping_lock); 1607 1608 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), 1609 GFP_KERNEL, cpu_to_node(cpu)); 1610 if (!bpage) 1611 goto fail_free_buffer; 1612 1613 rb_check_bpage(cpu_buffer, bpage); 1614 1615 cpu_buffer->reader_page = bpage; 1616 1617 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO, 1618 cpu_buffer->buffer->subbuf_order); 1619 if (!page) 1620 goto fail_free_reader; 1621 bpage->page = page_address(page); 1622 rb_init_page(bpage->page); 1623 1624 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 1625 INIT_LIST_HEAD(&cpu_buffer->new_pages); 1626 1627 ret = rb_allocate_pages(cpu_buffer, nr_pages); 1628 if (ret < 0) 1629 goto fail_free_reader; 1630 1631 cpu_buffer->head_page 1632 = list_entry(cpu_buffer->pages, struct buffer_page, list); 1633 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 
1634 1635 rb_head_page_activate(cpu_buffer); 1636 1637 return cpu_buffer; 1638 1639 fail_free_reader: 1640 free_buffer_page(cpu_buffer->reader_page); 1641 1642 fail_free_buffer: 1643 kfree(cpu_buffer); 1644 return NULL; 1645 } 1646 1647 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 1648 { 1649 struct list_head *head = cpu_buffer->pages; 1650 struct buffer_page *bpage, *tmp; 1651 1652 irq_work_sync(&cpu_buffer->irq_work.work); 1653 1654 free_buffer_page(cpu_buffer->reader_page); 1655 1656 if (head) { 1657 rb_head_page_deactivate(cpu_buffer); 1658 1659 list_for_each_entry_safe(bpage, tmp, head, list) { 1660 list_del_init(&bpage->list); 1661 free_buffer_page(bpage); 1662 } 1663 bpage = list_entry(head, struct buffer_page, list); 1664 free_buffer_page(bpage); 1665 } 1666 1667 free_page((unsigned long)cpu_buffer->free_page); 1668 1669 kfree(cpu_buffer); 1670 } 1671 1672 /** 1673 * __ring_buffer_alloc - allocate a new ring_buffer 1674 * @size: the size in bytes per cpu that is needed. 1675 * @flags: attributes to set for the ring buffer. 1676 * @key: ring buffer reader_lock_key. 1677 * 1678 * Currently the only flag that is available is the RB_FL_OVERWRITE 1679 * flag. This flag means that the buffer will overwrite old data 1680 * when the buffer wraps. If this flag is not set, the buffer will 1681 * drop data when the tail hits the head. 1682 */ 1683 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags, 1684 struct lock_class_key *key) 1685 { 1686 struct trace_buffer *buffer; 1687 long nr_pages; 1688 int bsize; 1689 int cpu; 1690 int ret; 1691 1692 /* keep it in its own cache line */ 1693 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()), 1694 GFP_KERNEL); 1695 if (!buffer) 1696 return NULL; 1697 1698 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL)) 1699 goto fail_free_buffer; 1700 1701 /* Default buffer page size - one system page */ 1702 buffer->subbuf_order = 0; 1703 buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE; 1704 1705 /* Max payload is buffer page size - header (8bytes) */ 1706 buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2); 1707 1708 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 1709 buffer->flags = flags; 1710 buffer->clock = trace_clock_local; 1711 buffer->reader_lock_key = key; 1712 1713 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters); 1714 init_waitqueue_head(&buffer->irq_work.waiters); 1715 1716 /* need at least two pages */ 1717 if (nr_pages < 2) 1718 nr_pages = 2; 1719 1720 buffer->cpus = nr_cpu_ids; 1721 1722 bsize = sizeof(void *) * nr_cpu_ids; 1723 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()), 1724 GFP_KERNEL); 1725 if (!buffer->buffers) 1726 goto fail_free_cpumask; 1727 1728 cpu = raw_smp_processor_id(); 1729 cpumask_set_cpu(cpu, buffer->cpumask); 1730 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 1731 if (!buffer->buffers[cpu]) 1732 goto fail_free_buffers; 1733 1734 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1735 if (ret < 0) 1736 goto fail_free_buffers; 1737 1738 mutex_init(&buffer->mutex); 1739 1740 return buffer; 1741 1742 fail_free_buffers: 1743 for_each_buffer_cpu(buffer, cpu) { 1744 if (buffer->buffers[cpu]) 1745 rb_free_cpu_buffer(buffer->buffers[cpu]); 1746 } 1747 kfree(buffer->buffers); 1748 1749 fail_free_cpumask: 1750 free_cpumask_var(buffer->cpumask); 1751 1752 fail_free_buffer: 1753 kfree(buffer); 1754 return NULL; 1755 } 1756 EXPORT_SYMBOL_GPL(__ring_buffer_alloc); 1757 1758 /** 1759 * 
ring_buffer_free - free a ring buffer. 1760 * @buffer: the buffer to free. 1761 */ 1762 void 1763 ring_buffer_free(struct trace_buffer *buffer) 1764 { 1765 int cpu; 1766 1767 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node); 1768 1769 irq_work_sync(&buffer->irq_work.work); 1770 1771 for_each_buffer_cpu(buffer, cpu) 1772 rb_free_cpu_buffer(buffer->buffers[cpu]); 1773 1774 kfree(buffer->buffers); 1775 free_cpumask_var(buffer->cpumask); 1776 1777 kfree(buffer); 1778 } 1779 EXPORT_SYMBOL_GPL(ring_buffer_free); 1780 1781 void ring_buffer_set_clock(struct trace_buffer *buffer, 1782 u64 (*clock)(void)) 1783 { 1784 buffer->clock = clock; 1785 } 1786 1787 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs) 1788 { 1789 buffer->time_stamp_abs = abs; 1790 } 1791 1792 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer) 1793 { 1794 return buffer->time_stamp_abs; 1795 } 1796 1797 static inline unsigned long rb_page_entries(struct buffer_page *bpage) 1798 { 1799 return local_read(&bpage->entries) & RB_WRITE_MASK; 1800 } 1801 1802 static inline unsigned long rb_page_write(struct buffer_page *bpage) 1803 { 1804 return local_read(&bpage->write) & RB_WRITE_MASK; 1805 } 1806 1807 static bool 1808 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) 1809 { 1810 struct list_head *tail_page, *to_remove, *next_page; 1811 struct buffer_page *to_remove_page, *tmp_iter_page; 1812 struct buffer_page *last_page, *first_page; 1813 unsigned long nr_removed; 1814 unsigned long head_bit; 1815 int page_entries; 1816 1817 head_bit = 0; 1818 1819 raw_spin_lock_irq(&cpu_buffer->reader_lock); 1820 atomic_inc(&cpu_buffer->record_disabled); 1821 /* 1822 * We don't race with the readers since we have acquired the reader 1823 * lock. We also don't race with writers after disabling recording. 1824 * This makes it easy to figure out the first and the last page to be 1825 * removed from the list. We unlink all the pages in between including 1826 * the first and last pages. This is done in a busy loop so that we 1827 * lose the least number of traces. 1828 * The pages are freed after we restart recording and unlock readers. 1829 */ 1830 tail_page = &cpu_buffer->tail_page->list; 1831 1832 /* 1833 * tail page might be on reader page, we remove the next page 1834 * from the ring buffer 1835 */ 1836 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 1837 tail_page = rb_list_head(tail_page->next); 1838 to_remove = tail_page; 1839 1840 /* start of pages to remove */ 1841 first_page = list_entry(rb_list_head(to_remove->next), 1842 struct buffer_page, list); 1843 1844 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) { 1845 to_remove = rb_list_head(to_remove)->next; 1846 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD; 1847 } 1848 /* Read iterators need to reset themselves when some pages removed */ 1849 cpu_buffer->pages_removed += nr_removed; 1850 1851 next_page = rb_list_head(to_remove)->next; 1852 1853 /* 1854 * Now we remove all pages between tail_page and next_page. 
1855 * Make sure that we have head_bit value preserved for the 1856 * next page 1857 */ 1858 tail_page->next = (struct list_head *)((unsigned long)next_page | 1859 head_bit); 1860 next_page = rb_list_head(next_page); 1861 next_page->prev = tail_page; 1862 1863 /* make sure pages points to a valid page in the ring buffer */ 1864 cpu_buffer->pages = next_page; 1865 1866 /* update head page */ 1867 if (head_bit) 1868 cpu_buffer->head_page = list_entry(next_page, 1869 struct buffer_page, list); 1870 1871 /* pages are removed, resume tracing and then free the pages */ 1872 atomic_dec(&cpu_buffer->record_disabled); 1873 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1874 1875 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1876 1877 /* last buffer page to remove */ 1878 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1879 list); 1880 tmp_iter_page = first_page; 1881 1882 do { 1883 cond_resched(); 1884 1885 to_remove_page = tmp_iter_page; 1886 rb_inc_page(&tmp_iter_page); 1887 1888 /* update the counters */ 1889 page_entries = rb_page_entries(to_remove_page); 1890 if (page_entries) { 1891 /* 1892 * If something was added to this page, it was full 1893 * since it is not the tail page. So we deduct the 1894 * bytes consumed in ring buffer from here. 1895 * Increment overrun to account for the lost events. 1896 */ 1897 local_add(page_entries, &cpu_buffer->overrun); 1898 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 1899 local_inc(&cpu_buffer->pages_lost); 1900 } 1901 1902 /* 1903 * We have already removed references to this list item, just 1904 * free up the buffer_page and its page 1905 */ 1906 free_buffer_page(to_remove_page); 1907 nr_removed--; 1908 1909 } while (to_remove_page != last_page); 1910 1911 RB_WARN_ON(cpu_buffer, nr_removed); 1912 1913 return nr_removed == 0; 1914 } 1915 1916 static bool 1917 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 1918 { 1919 struct list_head *pages = &cpu_buffer->new_pages; 1920 unsigned long flags; 1921 bool success; 1922 int retries; 1923 1924 /* Can be called at early boot up, where interrupts must not been enabled */ 1925 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1926 /* 1927 * We are holding the reader lock, so the reader page won't be swapped 1928 * in the ring buffer. Now we are racing with the writer trying to 1929 * move head page and the tail page. 1930 * We are going to adapt the reader page update process where: 1931 * 1. We first splice the start and end of list of new pages between 1932 * the head page and its previous page. 1933 * 2. We cmpxchg the prev_page->next to point from head page to the 1934 * start of new pages list. 1935 * 3. Finally, we update the head->prev to the end of new list. 1936 * 1937 * We will try this process 10 times, to make sure that we don't keep 1938 * spinning. 
1939 */ 1940 retries = 10; 1941 success = false; 1942 while (retries--) { 1943 struct list_head *head_page, *prev_page; 1944 struct list_head *last_page, *first_page; 1945 struct list_head *head_page_with_bit; 1946 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 1947 1948 if (!hpage) 1949 break; 1950 head_page = &hpage->list; 1951 prev_page = head_page->prev; 1952 1953 first_page = pages->next; 1954 last_page = pages->prev; 1955 1956 head_page_with_bit = (struct list_head *) 1957 ((unsigned long)head_page | RB_PAGE_HEAD); 1958 1959 last_page->next = head_page_with_bit; 1960 first_page->prev = prev_page; 1961 1962 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 1963 if (try_cmpxchg(&prev_page->next, 1964 &head_page_with_bit, first_page)) { 1965 /* 1966 * yay, we replaced the page pointer to our new list, 1967 * now, we just have to update to head page's prev 1968 * pointer to point to end of list 1969 */ 1970 head_page->prev = last_page; 1971 success = true; 1972 break; 1973 } 1974 } 1975 1976 if (success) 1977 INIT_LIST_HEAD(pages); 1978 /* 1979 * If we weren't successful in adding in new pages, warn and stop 1980 * tracing 1981 */ 1982 RB_WARN_ON(cpu_buffer, !success); 1983 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1984 1985 /* free pages if they weren't inserted */ 1986 if (!success) { 1987 struct buffer_page *bpage, *tmp; 1988 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 1989 list) { 1990 list_del_init(&bpage->list); 1991 free_buffer_page(bpage); 1992 } 1993 } 1994 return success; 1995 } 1996 1997 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 1998 { 1999 bool success; 2000 2001 if (cpu_buffer->nr_pages_to_update > 0) 2002 success = rb_insert_pages(cpu_buffer); 2003 else 2004 success = rb_remove_pages(cpu_buffer, 2005 -cpu_buffer->nr_pages_to_update); 2006 2007 if (success) 2008 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2009 } 2010 2011 static void update_pages_handler(struct work_struct *work) 2012 { 2013 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2014 struct ring_buffer_per_cpu, update_pages_work); 2015 rb_update_pages(cpu_buffer); 2016 complete(&cpu_buffer->update_done); 2017 } 2018 2019 /** 2020 * ring_buffer_resize - resize the ring buffer 2021 * @buffer: the buffer to resize. 2022 * @size: the new size. 2023 * @cpu_id: the cpu buffer to resize 2024 * 2025 * Minimum size is 2 * buffer->subbuf_size. 2026 * 2027 * Returns 0 on success and < 0 on failure. 2028 */ 2029 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2030 int cpu_id) 2031 { 2032 struct ring_buffer_per_cpu *cpu_buffer; 2033 unsigned long nr_pages; 2034 int cpu, err; 2035 2036 /* 2037 * Always succeed at resizing a non-existent buffer: 2038 */ 2039 if (!buffer) 2040 return 0; 2041 2042 /* Make sure the requested buffer exists */ 2043 if (cpu_id != RING_BUFFER_ALL_CPUS && 2044 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2045 return 0; 2046 2047 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2048 2049 /* we need a minimum of two pages */ 2050 if (nr_pages < 2) 2051 nr_pages = 2; 2052 2053 /* prevent another thread from changing buffer sizes */ 2054 mutex_lock(&buffer->mutex); 2055 atomic_inc(&buffer->resizing); 2056 2057 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2058 /* 2059 * Don't succeed if resizing is disabled, as a reader might be 2060 * manipulating the ring buffer and is expecting a sane state while 2061 * this is true. 
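 *
 * A caller that gets -EBUSY out of this path can simply retry the
 * resize later, e.g. (new_size is illustrative):
 *
 *	err = ring_buffer_resize(buffer, new_size, RING_BUFFER_ALL_CPUS);
 *	if (err == -EBUSY)
 *		;	// a reader has resizing disabled, try again later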
2062 */ 2063 for_each_buffer_cpu(buffer, cpu) { 2064 cpu_buffer = buffer->buffers[cpu]; 2065 if (atomic_read(&cpu_buffer->resize_disabled)) { 2066 err = -EBUSY; 2067 goto out_err_unlock; 2068 } 2069 } 2070 2071 /* calculate the pages to update */ 2072 for_each_buffer_cpu(buffer, cpu) { 2073 cpu_buffer = buffer->buffers[cpu]; 2074 2075 cpu_buffer->nr_pages_to_update = nr_pages - 2076 cpu_buffer->nr_pages; 2077 /* 2078 * nothing more to do for removing pages or no update 2079 */ 2080 if (cpu_buffer->nr_pages_to_update <= 0) 2081 continue; 2082 /* 2083 * to add pages, make sure all new pages can be 2084 * allocated without receiving ENOMEM 2085 */ 2086 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2087 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2088 &cpu_buffer->new_pages)) { 2089 /* not enough memory for new pages */ 2090 err = -ENOMEM; 2091 goto out_err; 2092 } 2093 2094 cond_resched(); 2095 } 2096 2097 cpus_read_lock(); 2098 /* 2099 * Fire off all the required work handlers 2100 * We can't schedule on offline CPUs, but it's not necessary 2101 * since we can change their buffer sizes without any race. 2102 */ 2103 for_each_buffer_cpu(buffer, cpu) { 2104 cpu_buffer = buffer->buffers[cpu]; 2105 if (!cpu_buffer->nr_pages_to_update) 2106 continue; 2107 2108 /* Can't run something on an offline CPU. */ 2109 if (!cpu_online(cpu)) { 2110 rb_update_pages(cpu_buffer); 2111 cpu_buffer->nr_pages_to_update = 0; 2112 } else { 2113 /* Run directly if possible. */ 2114 migrate_disable(); 2115 if (cpu != smp_processor_id()) { 2116 migrate_enable(); 2117 schedule_work_on(cpu, 2118 &cpu_buffer->update_pages_work); 2119 } else { 2120 update_pages_handler(&cpu_buffer->update_pages_work); 2121 migrate_enable(); 2122 } 2123 } 2124 } 2125 2126 /* wait for all the updates to complete */ 2127 for_each_buffer_cpu(buffer, cpu) { 2128 cpu_buffer = buffer->buffers[cpu]; 2129 if (!cpu_buffer->nr_pages_to_update) 2130 continue; 2131 2132 if (cpu_online(cpu)) 2133 wait_for_completion(&cpu_buffer->update_done); 2134 cpu_buffer->nr_pages_to_update = 0; 2135 } 2136 2137 cpus_read_unlock(); 2138 } else { 2139 cpu_buffer = buffer->buffers[cpu_id]; 2140 2141 if (nr_pages == cpu_buffer->nr_pages) 2142 goto out; 2143 2144 /* 2145 * Don't succeed if resizing is disabled, as a reader might be 2146 * manipulating the ring buffer and is expecting a sane state while 2147 * this is true. 2148 */ 2149 if (atomic_read(&cpu_buffer->resize_disabled)) { 2150 err = -EBUSY; 2151 goto out_err_unlock; 2152 } 2153 2154 cpu_buffer->nr_pages_to_update = nr_pages - 2155 cpu_buffer->nr_pages; 2156 2157 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2158 if (cpu_buffer->nr_pages_to_update > 0 && 2159 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2160 &cpu_buffer->new_pages)) { 2161 err = -ENOMEM; 2162 goto out_err; 2163 } 2164 2165 cpus_read_lock(); 2166 2167 /* Can't run something on an offline CPU. */ 2168 if (!cpu_online(cpu_id)) 2169 rb_update_pages(cpu_buffer); 2170 else { 2171 /* Run directly if possible. 
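 * migrate_disable() keeps this task on the current CPU just long enough
 * for the smp_processor_id() comparison below to stay meaningful: if we
 * already run on cpu_id the update is done inline, otherwise it is
 * pushed to that CPU with schedule_work_on() and waited for.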
*/ 2172 migrate_disable(); 2173 if (cpu_id == smp_processor_id()) { 2174 rb_update_pages(cpu_buffer); 2175 migrate_enable(); 2176 } else { 2177 migrate_enable(); 2178 schedule_work_on(cpu_id, 2179 &cpu_buffer->update_pages_work); 2180 wait_for_completion(&cpu_buffer->update_done); 2181 } 2182 } 2183 2184 cpu_buffer->nr_pages_to_update = 0; 2185 cpus_read_unlock(); 2186 } 2187 2188 out: 2189 /* 2190 * The ring buffer resize can happen with the ring buffer 2191 * enabled, so that the update disturbs the tracing as little 2192 * as possible. But if the buffer is disabled, we do not need 2193 * to worry about that, and we can take the time to verify 2194 * that the buffer is not corrupt. 2195 */ 2196 if (atomic_read(&buffer->record_disabled)) { 2197 atomic_inc(&buffer->record_disabled); 2198 /* 2199 * Even though the buffer was disabled, we must make sure 2200 * that it is truly disabled before calling rb_check_pages. 2201 * There could have been a race between checking 2202 * record_disable and incrementing it. 2203 */ 2204 synchronize_rcu(); 2205 for_each_buffer_cpu(buffer, cpu) { 2206 unsigned long flags; 2207 2208 cpu_buffer = buffer->buffers[cpu]; 2209 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2210 rb_check_pages(cpu_buffer); 2211 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2212 } 2213 atomic_dec(&buffer->record_disabled); 2214 } 2215 2216 atomic_dec(&buffer->resizing); 2217 mutex_unlock(&buffer->mutex); 2218 return 0; 2219 2220 out_err: 2221 for_each_buffer_cpu(buffer, cpu) { 2222 struct buffer_page *bpage, *tmp; 2223 2224 cpu_buffer = buffer->buffers[cpu]; 2225 cpu_buffer->nr_pages_to_update = 0; 2226 2227 if (list_empty(&cpu_buffer->new_pages)) 2228 continue; 2229 2230 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2231 list) { 2232 list_del_init(&bpage->list); 2233 free_buffer_page(bpage); 2234 } 2235 } 2236 out_err_unlock: 2237 atomic_dec(&buffer->resizing); 2238 mutex_unlock(&buffer->mutex); 2239 return err; 2240 } 2241 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2242 2243 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2244 { 2245 mutex_lock(&buffer->mutex); 2246 if (val) 2247 buffer->flags |= RB_FL_OVERWRITE; 2248 else 2249 buffer->flags &= ~RB_FL_OVERWRITE; 2250 mutex_unlock(&buffer->mutex); 2251 } 2252 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2253 2254 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2255 { 2256 return bpage->page->data + index; 2257 } 2258 2259 static __always_inline struct ring_buffer_event * 2260 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2261 { 2262 return __rb_page_index(cpu_buffer->reader_page, 2263 cpu_buffer->reader_page->read); 2264 } 2265 2266 static struct ring_buffer_event * 2267 rb_iter_head_event(struct ring_buffer_iter *iter) 2268 { 2269 struct ring_buffer_event *event; 2270 struct buffer_page *iter_head_page = iter->head_page; 2271 unsigned long commit; 2272 unsigned length; 2273 2274 if (iter->head != iter->next_event) 2275 return iter->event; 2276 2277 /* 2278 * When the writer goes across pages, it issues a cmpxchg which 2279 * is a mb(), which will synchronize with the rmb here. 
2280 * (see rb_tail_page_update() and __rb_reserve_next()) 2281 */ 2282 commit = rb_page_commit(iter_head_page); 2283 smp_rmb(); 2284 2285 /* An event needs to be at least 8 bytes in size */ 2286 if (iter->head > commit - 8) 2287 goto reset; 2288 2289 event = __rb_page_index(iter_head_page, iter->head); 2290 length = rb_event_length(event); 2291 2292 /* 2293 * READ_ONCE() doesn't work on functions and we don't want the 2294 * compiler doing any crazy optimizations with length. 2295 */ 2296 barrier(); 2297 2298 if ((iter->head + length) > commit || length > iter->event_size) 2299 /* Writer corrupted the read? */ 2300 goto reset; 2301 2302 memcpy(iter->event, event, length); 2303 /* 2304 * If the page stamp is still the same after this rmb() then the 2305 * event was safely copied without the writer entering the page. 2306 */ 2307 smp_rmb(); 2308 2309 /* Make sure the page didn't change since we read this */ 2310 if (iter->page_stamp != iter_head_page->page->time_stamp || 2311 commit > rb_page_commit(iter_head_page)) 2312 goto reset; 2313 2314 iter->next_event = iter->head + length; 2315 return iter->event; 2316 reset: 2317 /* Reset to the beginning */ 2318 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2319 iter->head = 0; 2320 iter->next_event = 0; 2321 iter->missed_events = 1; 2322 return NULL; 2323 } 2324 2325 /* Size is determined by what has been committed */ 2326 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2327 { 2328 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 2329 } 2330 2331 static __always_inline unsigned 2332 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2333 { 2334 return rb_page_commit(cpu_buffer->commit_page); 2335 } 2336 2337 static __always_inline unsigned 2338 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 2339 { 2340 unsigned long addr = (unsigned long)event; 2341 2342 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 2343 2344 return addr - BUF_PAGE_HDR_SIZE; 2345 } 2346 2347 static void rb_inc_iter(struct ring_buffer_iter *iter) 2348 { 2349 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2350 2351 /* 2352 * The iterator could be on the reader page (it starts there). 2353 * But the head could have moved, since the reader was 2354 * found. Check for this case and assign the iterator 2355 * to the head page instead of next. 2356 */ 2357 if (iter->head_page == cpu_buffer->reader_page) 2358 iter->head_page = rb_set_head_page(cpu_buffer); 2359 else 2360 rb_inc_page(&iter->head_page); 2361 2362 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2363 iter->head = 0; 2364 iter->next_event = 0; 2365 } 2366 2367 /* 2368 * rb_handle_head_page - writer hit the head page 2369 * 2370 * Returns: +1 to retry page 2371 * 0 to continue 2372 * -1 on error 2373 */ 2374 static int 2375 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2376 struct buffer_page *tail_page, 2377 struct buffer_page *next_page) 2378 { 2379 struct buffer_page *new_head; 2380 int entries; 2381 int type; 2382 int ret; 2383 2384 entries = rb_page_entries(next_page); 2385 2386 /* 2387 * The hard part is here. We need to move the head 2388 * forward, and protect against both readers on 2389 * other CPUs and writers coming in via interrupts. 
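 *
 * A rough sketch of the uncontended path through this function:
 *
 *	old head:  HEAD   -> UPDATE	// rb_head_page_set_update() below
 *	new head:  NORMAL -> HEAD	// rb_head_page_set_head()
 *	old head:  UPDATE -> NORMAL	// rb_head_page_set_normal() at the end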
2390 */ 2391 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2392 RB_PAGE_HEAD); 2393 2394 /* 2395 * type can be one of four: 2396 * NORMAL - an interrupt already moved it for us 2397 * HEAD - we are the first to get here. 2398 * UPDATE - we are the interrupt interrupting 2399 * a current move. 2400 * MOVED - a reader on another CPU moved the next 2401 * pointer to its reader page. Give up 2402 * and try again. 2403 */ 2404 2405 switch (type) { 2406 case RB_PAGE_HEAD: 2407 /* 2408 * We changed the head to UPDATE, thus 2409 * it is our responsibility to update 2410 * the counters. 2411 */ 2412 local_add(entries, &cpu_buffer->overrun); 2413 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 2414 local_inc(&cpu_buffer->pages_lost); 2415 2416 /* 2417 * The entries will be zeroed out when we move the 2418 * tail page. 2419 */ 2420 2421 /* still more to do */ 2422 break; 2423 2424 case RB_PAGE_UPDATE: 2425 /* 2426 * This is an interrupt that interrupt the 2427 * previous update. Still more to do. 2428 */ 2429 break; 2430 case RB_PAGE_NORMAL: 2431 /* 2432 * An interrupt came in before the update 2433 * and processed this for us. 2434 * Nothing left to do. 2435 */ 2436 return 1; 2437 case RB_PAGE_MOVED: 2438 /* 2439 * The reader is on another CPU and just did 2440 * a swap with our next_page. 2441 * Try again. 2442 */ 2443 return 1; 2444 default: 2445 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2446 return -1; 2447 } 2448 2449 /* 2450 * Now that we are here, the old head pointer is 2451 * set to UPDATE. This will keep the reader from 2452 * swapping the head page with the reader page. 2453 * The reader (on another CPU) will spin till 2454 * we are finished. 2455 * 2456 * We just need to protect against interrupts 2457 * doing the job. We will set the next pointer 2458 * to HEAD. After that, we set the old pointer 2459 * to NORMAL, but only if it was HEAD before. 2460 * otherwise we are an interrupt, and only 2461 * want the outer most commit to reset it. 2462 */ 2463 new_head = next_page; 2464 rb_inc_page(&new_head); 2465 2466 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2467 RB_PAGE_NORMAL); 2468 2469 /* 2470 * Valid returns are: 2471 * HEAD - an interrupt came in and already set it. 2472 * NORMAL - One of two things: 2473 * 1) We really set it. 2474 * 2) A bunch of interrupts came in and moved 2475 * the page forward again. 2476 */ 2477 switch (ret) { 2478 case RB_PAGE_HEAD: 2479 case RB_PAGE_NORMAL: 2480 /* OK */ 2481 break; 2482 default: 2483 RB_WARN_ON(cpu_buffer, 1); 2484 return -1; 2485 } 2486 2487 /* 2488 * It is possible that an interrupt came in, 2489 * set the head up, then more interrupts came in 2490 * and moved it again. When we get back here, 2491 * the page would have been set to NORMAL but we 2492 * just set it back to HEAD. 2493 * 2494 * How do you detect this? Well, if that happened 2495 * the tail page would have moved. 2496 */ 2497 if (ret == RB_PAGE_NORMAL) { 2498 struct buffer_page *buffer_tail_page; 2499 2500 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2501 /* 2502 * If the tail had moved passed next, then we need 2503 * to reset the pointer. 2504 */ 2505 if (buffer_tail_page != tail_page && 2506 buffer_tail_page != next_page) 2507 rb_head_page_set_normal(cpu_buffer, new_head, 2508 next_page, 2509 RB_PAGE_HEAD); 2510 } 2511 2512 /* 2513 * If this was the outer most commit (the one that 2514 * changed the original pointer from HEAD to UPDATE), 2515 * then it is up to us to reset it to NORMAL. 
2516 */ 2517 if (type == RB_PAGE_HEAD) { 2518 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2519 tail_page, 2520 RB_PAGE_UPDATE); 2521 if (RB_WARN_ON(cpu_buffer, 2522 ret != RB_PAGE_UPDATE)) 2523 return -1; 2524 } 2525 2526 return 0; 2527 } 2528 2529 static inline void 2530 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2531 unsigned long tail, struct rb_event_info *info) 2532 { 2533 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 2534 struct buffer_page *tail_page = info->tail_page; 2535 struct ring_buffer_event *event; 2536 unsigned long length = info->length; 2537 2538 /* 2539 * Only the event that crossed the page boundary 2540 * must fill the old tail_page with padding. 2541 */ 2542 if (tail >= bsize) { 2543 /* 2544 * If the page was filled, then we still need 2545 * to update the real_end. Reset it to zero 2546 * and the reader will ignore it. 2547 */ 2548 if (tail == bsize) 2549 tail_page->real_end = 0; 2550 2551 local_sub(length, &tail_page->write); 2552 return; 2553 } 2554 2555 event = __rb_page_index(tail_page, tail); 2556 2557 /* 2558 * Save the original length to the meta data. 2559 * This will be used by the reader to add lost event 2560 * counter. 2561 */ 2562 tail_page->real_end = tail; 2563 2564 /* 2565 * If this event is bigger than the minimum size, then 2566 * we need to be careful that we don't subtract the 2567 * write counter enough to allow another writer to slip 2568 * in on this page. 2569 * We put in a discarded commit instead, to make sure 2570 * that this space is not used again, and this space will 2571 * not be accounted into 'entries_bytes'. 2572 * 2573 * If we are less than the minimum size, we don't need to 2574 * worry about it. 2575 */ 2576 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 2577 /* No room for any events */ 2578 2579 /* Mark the rest of the page with padding */ 2580 rb_event_set_padding(event); 2581 2582 /* Make sure the padding is visible before the write update */ 2583 smp_wmb(); 2584 2585 /* Set the write back to the previous setting */ 2586 local_sub(length, &tail_page->write); 2587 return; 2588 } 2589 2590 /* Put in a discarded event */ 2591 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 2592 event->type_len = RINGBUF_TYPE_PADDING; 2593 /* time delta must be non zero */ 2594 event->time_delta = 1; 2595 2596 /* account for padding bytes */ 2597 local_add(bsize - tail, &cpu_buffer->entries_bytes); 2598 2599 /* Make sure the padding is visible before the tail_page->write update */ 2600 smp_wmb(); 2601 2602 /* Set write to end of buffer */ 2603 length = (tail + length) - bsize; 2604 local_sub(length, &tail_page->write); 2605 } 2606 2607 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2608 2609 /* 2610 * This is the slow path, force gcc not to inline it. 2611 */ 2612 static noinline struct ring_buffer_event * 2613 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2614 unsigned long tail, struct rb_event_info *info) 2615 { 2616 struct buffer_page *tail_page = info->tail_page; 2617 struct buffer_page *commit_page = cpu_buffer->commit_page; 2618 struct trace_buffer *buffer = cpu_buffer->buffer; 2619 struct buffer_page *next_page; 2620 int ret; 2621 2622 next_page = tail_page; 2623 2624 rb_inc_page(&next_page); 2625 2626 /* 2627 * If for some reason, we had an interrupt storm that made 2628 * it all the way around the buffer, bail, and warn 2629 * about it. 
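 * That is the next_page == commit_page check just below: the tail has
 * been pushed all the way around to the page the outermost commit is
 * still working on, so this event is dropped and accounted in
 * commit_overrun.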
2630 */ 2631 if (unlikely(next_page == commit_page)) { 2632 local_inc(&cpu_buffer->commit_overrun); 2633 goto out_reset; 2634 } 2635 2636 /* 2637 * This is where the fun begins! 2638 * 2639 * We are fighting against races between a reader that 2640 * could be on another CPU trying to swap its reader 2641 * page with the buffer head. 2642 * 2643 * We are also fighting against interrupts coming in and 2644 * moving the head or tail on us as well. 2645 * 2646 * If the next page is the head page then we have filled 2647 * the buffer, unless the commit page is still on the 2648 * reader page. 2649 */ 2650 if (rb_is_head_page(next_page, &tail_page->list)) { 2651 2652 /* 2653 * If the commit is not on the reader page, then 2654 * move the header page. 2655 */ 2656 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2657 /* 2658 * If we are not in overwrite mode, 2659 * this is easy, just stop here. 2660 */ 2661 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2662 local_inc(&cpu_buffer->dropped_events); 2663 goto out_reset; 2664 } 2665 2666 ret = rb_handle_head_page(cpu_buffer, 2667 tail_page, 2668 next_page); 2669 if (ret < 0) 2670 goto out_reset; 2671 if (ret) 2672 goto out_again; 2673 } else { 2674 /* 2675 * We need to be careful here too. The 2676 * commit page could still be on the reader 2677 * page. We could have a small buffer, and 2678 * have filled up the buffer with events 2679 * from interrupts and such, and wrapped. 2680 * 2681 * Note, if the tail page is also on the 2682 * reader_page, we let it move out. 2683 */ 2684 if (unlikely((cpu_buffer->commit_page != 2685 cpu_buffer->tail_page) && 2686 (cpu_buffer->commit_page == 2687 cpu_buffer->reader_page))) { 2688 local_inc(&cpu_buffer->commit_overrun); 2689 goto out_reset; 2690 } 2691 } 2692 } 2693 2694 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2695 2696 out_again: 2697 2698 rb_reset_tail(cpu_buffer, tail, info); 2699 2700 /* Commit what we have for now. */ 2701 rb_end_commit(cpu_buffer); 2702 /* rb_end_commit() decs committing */ 2703 local_inc(&cpu_buffer->committing); 2704 2705 /* fail and let the caller try again */ 2706 return ERR_PTR(-EAGAIN); 2707 2708 out_reset: 2709 /* reset write */ 2710 rb_reset_tail(cpu_buffer, tail, info); 2711 2712 return NULL; 2713 } 2714 2715 /* Slow path */ 2716 static struct ring_buffer_event * 2717 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2718 struct ring_buffer_event *event, u64 delta, bool abs) 2719 { 2720 if (abs) 2721 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2722 else 2723 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2724 2725 /* Not the first event on the page, or not delta? */ 2726 if (abs || rb_event_index(cpu_buffer, event)) { 2727 event->time_delta = delta & TS_MASK; 2728 event->array[0] = delta >> TS_SHIFT; 2729 } else { 2730 /* nope, just zero it */ 2731 event->time_delta = 0; 2732 event->array[0] = 0; 2733 } 2734 2735 return skip_time_extend(event); 2736 } 2737 2738 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2739 static inline bool sched_clock_stable(void) 2740 { 2741 return true; 2742 } 2743 #endif 2744 2745 static void 2746 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2747 struct rb_event_info *info) 2748 { 2749 u64 write_stamp; 2750 2751 WARN_ONCE(1, "Delta way too big! 
%llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2752 (unsigned long long)info->delta, 2753 (unsigned long long)info->ts, 2754 (unsigned long long)info->before, 2755 (unsigned long long)info->after, 2756 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 2757 sched_clock_stable() ? "" : 2758 "If you just came from a suspend/resume,\n" 2759 "please switch to the trace global clock:\n" 2760 " echo global > /sys/kernel/tracing/trace_clock\n" 2761 "or add trace_clock=global to the kernel command line\n"); 2762 } 2763 2764 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2765 struct ring_buffer_event **event, 2766 struct rb_event_info *info, 2767 u64 *delta, 2768 unsigned int *length) 2769 { 2770 bool abs = info->add_timestamp & 2771 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2772 2773 if (unlikely(info->delta > (1ULL << 59))) { 2774 /* 2775 * Some timers can use more than 59 bits, and when a timestamp 2776 * is added to the buffer, it will lose those bits. 2777 */ 2778 if (abs && (info->ts & TS_MSB)) { 2779 info->delta &= ABS_TS_MASK; 2780 2781 /* did the clock go backwards */ 2782 } else if (info->before == info->after && info->before > info->ts) { 2783 /* not interrupted */ 2784 static int once; 2785 2786 /* 2787 * This is possible with a recalibrating of the TSC. 2788 * Do not produce a call stack, but just report it. 2789 */ 2790 if (!once) { 2791 once++; 2792 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2793 info->before, info->ts); 2794 } 2795 } else 2796 rb_check_timestamp(cpu_buffer, info); 2797 if (!abs) 2798 info->delta = 0; 2799 } 2800 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 2801 *length -= RB_LEN_TIME_EXTEND; 2802 *delta = 0; 2803 } 2804 2805 /** 2806 * rb_update_event - update event type and data 2807 * @cpu_buffer: The per cpu buffer of the @event 2808 * @event: the event to update 2809 * @info: The info to update the @event with (contains length and delta) 2810 * 2811 * Update the type and data fields of the @event. The length 2812 * is the actual size that is written to the ring buffer, 2813 * and with this, we can determine what to place into the 2814 * data field. 2815 */ 2816 static void 2817 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2818 struct ring_buffer_event *event, 2819 struct rb_event_info *info) 2820 { 2821 unsigned length = info->length; 2822 u64 delta = info->delta; 2823 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2824 2825 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2826 cpu_buffer->event_stamp[nest] = info->ts; 2827 2828 /* 2829 * If we need to add a timestamp, then we 2830 * add it to the start of the reserved space. 
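 *
 * The reserved block then looks roughly like:
 *
 *	| TIME_EXTEND or TIME_STAMP (8 bytes) | event header | data ... |
 *
 * and rb_add_timestamp() hands back the event pointer already advanced
 * past that leading 8-byte word.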
2831 */ 2832 if (unlikely(info->add_timestamp)) 2833 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2834 2835 event->time_delta = delta; 2836 length -= RB_EVNT_HDR_SIZE; 2837 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2838 event->type_len = 0; 2839 event->array[0] = length; 2840 } else 2841 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2842 } 2843 2844 static unsigned rb_calculate_event_length(unsigned length) 2845 { 2846 struct ring_buffer_event event; /* Used only for sizeof array */ 2847 2848 /* zero length can cause confusions */ 2849 if (!length) 2850 length++; 2851 2852 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2853 length += sizeof(event.array[0]); 2854 2855 length += RB_EVNT_HDR_SIZE; 2856 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2857 2858 /* 2859 * In case the time delta is larger than the 27 bits for it 2860 * in the header, we need to add a timestamp. If another 2861 * event comes in when trying to discard this one to increase 2862 * the length, then the timestamp will be added in the allocated 2863 * space of this event. If length is bigger than the size needed 2864 * for the TIME_EXTEND, then padding has to be used. The events 2865 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2866 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2867 * As length is a multiple of 4, we only need to worry if it 2868 * is 12 (RB_LEN_TIME_EXTEND + 4). 2869 */ 2870 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2871 length += RB_ALIGNMENT; 2872 2873 return length; 2874 } 2875 2876 static inline bool 2877 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2878 struct ring_buffer_event *event) 2879 { 2880 unsigned long new_index, old_index; 2881 struct buffer_page *bpage; 2882 unsigned long addr; 2883 2884 new_index = rb_event_index(cpu_buffer, event); 2885 old_index = new_index + rb_event_ts_length(event); 2886 addr = (unsigned long)event; 2887 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 2888 2889 bpage = READ_ONCE(cpu_buffer->tail_page); 2890 2891 /* 2892 * Make sure the tail_page is still the same and 2893 * the next write location is the end of this event 2894 */ 2895 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2896 unsigned long write_mask = 2897 local_read(&bpage->write) & ~RB_WRITE_MASK; 2898 unsigned long event_length = rb_event_length(event); 2899 2900 /* 2901 * For the before_stamp to be different than the write_stamp 2902 * to make sure that the next event adds an absolute 2903 * value and does not rely on the saved write stamp, which 2904 * is now going to be bogus. 2905 * 2906 * By setting the before_stamp to zero, the next event 2907 * is not going to use the write_stamp and will instead 2908 * create an absolute timestamp. This means there's no 2909 * reason to update the wirte_stamp! 2910 */ 2911 rb_time_set(&cpu_buffer->before_stamp, 0); 2912 2913 /* 2914 * If an event were to come in now, it would see that the 2915 * write_stamp and the before_stamp are different, and assume 2916 * that this event just added itself before updating 2917 * the write stamp. The interrupting event will fix the 2918 * write stamp for us, and use an absolute timestamp. 2919 */ 2920 2921 /* 2922 * This is on the tail page. It is possible that 2923 * a write could come in and move the tail page 2924 * and write to the next page. That is fine 2925 * because we just shorten what is on this page. 
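 *
 * Sketch of the index math with made-up numbers: for an event that
 * starts at offset 0x40 and occupies 0x20 bytes,
 *
 *	new_index = 0x40		// start of the event
 *	old_index = 0x40 + 0x20		// its end, i.e. the expected 'write'
 *
 * and the try_cmpxchg() below pulls 'write' back from old_index to
 * new_index only if nothing else was reserved in the meantime.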
2926 */ 2927 old_index += write_mask; 2928 new_index += write_mask; 2929 2930 /* caution: old_index gets updated on cmpxchg failure */ 2931 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 2932 /* update counters */ 2933 local_sub(event_length, &cpu_buffer->entries_bytes); 2934 return true; 2935 } 2936 } 2937 2938 /* could not discard */ 2939 return false; 2940 } 2941 2942 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2943 { 2944 local_inc(&cpu_buffer->committing); 2945 local_inc(&cpu_buffer->commits); 2946 } 2947 2948 static __always_inline void 2949 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 2950 { 2951 unsigned long max_count; 2952 2953 /* 2954 * We only race with interrupts and NMIs on this CPU. 2955 * If we own the commit event, then we can commit 2956 * all others that interrupted us, since the interruptions 2957 * are in stack format (they finish before they come 2958 * back to us). This allows us to do a simple loop to 2959 * assign the commit to the tail. 2960 */ 2961 again: 2962 max_count = cpu_buffer->nr_pages * 100; 2963 2964 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 2965 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 2966 return; 2967 if (RB_WARN_ON(cpu_buffer, 2968 rb_is_reader_page(cpu_buffer->tail_page))) 2969 return; 2970 /* 2971 * No need for a memory barrier here, as the update 2972 * of the tail_page did it for this page. 2973 */ 2974 local_set(&cpu_buffer->commit_page->page->commit, 2975 rb_page_write(cpu_buffer->commit_page)); 2976 rb_inc_page(&cpu_buffer->commit_page); 2977 /* add barrier to keep gcc from optimizing too much */ 2978 barrier(); 2979 } 2980 while (rb_commit_index(cpu_buffer) != 2981 rb_page_write(cpu_buffer->commit_page)) { 2982 2983 /* Make sure the readers see the content of what is committed. */ 2984 smp_wmb(); 2985 local_set(&cpu_buffer->commit_page->page->commit, 2986 rb_page_write(cpu_buffer->commit_page)); 2987 RB_WARN_ON(cpu_buffer, 2988 local_read(&cpu_buffer->commit_page->page->commit) & 2989 ~RB_WRITE_MASK); 2990 barrier(); 2991 } 2992 2993 /* again, keep gcc from optimizing */ 2994 barrier(); 2995 2996 /* 2997 * If an interrupt came in just after the first while loop 2998 * and pushed the tail page forward, we will be left with 2999 * a dangling commit that will never go forward. 3000 */ 3001 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3002 goto again; 3003 } 3004 3005 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3006 { 3007 unsigned long commits; 3008 3009 if (RB_WARN_ON(cpu_buffer, 3010 !local_read(&cpu_buffer->committing))) 3011 return; 3012 3013 again: 3014 commits = local_read(&cpu_buffer->commits); 3015 /* synchronize with interrupts */ 3016 barrier(); 3017 if (local_read(&cpu_buffer->committing) == 1) 3018 rb_set_commit_to_write(cpu_buffer); 3019 3020 local_dec(&cpu_buffer->committing); 3021 3022 /* synchronize with interrupts */ 3023 barrier(); 3024 3025 /* 3026 * Need to account for interrupts coming in between the 3027 * updating of the commit page and the clearing of the 3028 * committing counter. 
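 *
 * Roughly: if 'commits' moved while 'committing' has already dropped
 * to zero, an interrupt squeezed a commit into that window and nobody
 * is left to push the commit page forward, so we briefly take the
 * committer role back and run another pass.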
3029 */ 3030 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3031 !local_read(&cpu_buffer->committing)) { 3032 local_inc(&cpu_buffer->committing); 3033 goto again; 3034 } 3035 } 3036 3037 static inline void rb_event_discard(struct ring_buffer_event *event) 3038 { 3039 if (extended_time(event)) 3040 event = skip_time_extend(event); 3041 3042 /* array[0] holds the actual length for the discarded event */ 3043 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3044 event->type_len = RINGBUF_TYPE_PADDING; 3045 /* time delta must be non zero */ 3046 if (!event->time_delta) 3047 event->time_delta = 1; 3048 } 3049 3050 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3051 { 3052 local_inc(&cpu_buffer->entries); 3053 rb_end_commit(cpu_buffer); 3054 } 3055 3056 static __always_inline void 3057 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3058 { 3059 if (buffer->irq_work.waiters_pending) { 3060 buffer->irq_work.waiters_pending = false; 3061 /* irq_work_queue() supplies it's own memory barriers */ 3062 irq_work_queue(&buffer->irq_work.work); 3063 } 3064 3065 if (cpu_buffer->irq_work.waiters_pending) { 3066 cpu_buffer->irq_work.waiters_pending = false; 3067 /* irq_work_queue() supplies it's own memory barriers */ 3068 irq_work_queue(&cpu_buffer->irq_work.work); 3069 } 3070 3071 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3072 return; 3073 3074 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3075 return; 3076 3077 if (!cpu_buffer->irq_work.full_waiters_pending) 3078 return; 3079 3080 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3081 3082 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3083 return; 3084 3085 cpu_buffer->irq_work.wakeup_full = true; 3086 cpu_buffer->irq_work.full_waiters_pending = false; 3087 /* irq_work_queue() supplies it's own memory barriers */ 3088 irq_work_queue(&cpu_buffer->irq_work.work); 3089 } 3090 3091 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3092 # define do_ring_buffer_record_recursion() \ 3093 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3094 #else 3095 # define do_ring_buffer_record_recursion() do { } while (0) 3096 #endif 3097 3098 /* 3099 * The lock and unlock are done within a preempt disable section. 3100 * The current_context per_cpu variable can only be modified 3101 * by the current task between lock and unlock. But it can 3102 * be modified more than once via an interrupt. To pass this 3103 * information from the lock to the unlock without having to 3104 * access the 'in_interrupt()' functions again (which do show 3105 * a bit of overhead in something as critical as function tracing, 3106 * we use a bitmask trick. 3107 * 3108 * bit 1 = NMI context 3109 * bit 2 = IRQ context 3110 * bit 3 = SoftIRQ context 3111 * bit 4 = normal context. 3112 * 3113 * This works because this is the order of contexts that can 3114 * preempt other contexts. A SoftIRQ never preempts an IRQ 3115 * context. 3116 * 3117 * When the context is determined, the corresponding bit is 3118 * checked and set (if it was set, then a recursion of that context 3119 * happened). 3120 * 3121 * On unlock, we need to clear this bit. To do so, just subtract 3122 * 1 from the current_context and AND it to itself. 
3123 * 3124 * (binary) 3125 * 101 - 1 = 100 3126 * 101 & 100 = 100 (clearing bit zero) 3127 * 3128 * 1010 - 1 = 1001 3129 * 1010 & 1001 = 1000 (clearing bit 1) 3130 * 3131 * The least significant bit can be cleared this way, and it 3132 * just so happens that it is the same bit corresponding to 3133 * the current context. 3134 * 3135 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3136 * is set when a recursion is detected at the current context, and if 3137 * the TRANSITION bit is already set, it will fail the recursion. 3138 * This is needed because there's a lag between the changing of 3139 * interrupt context and updating the preempt count. In this case, 3140 * a false positive will be found. To handle this, one extra recursion 3141 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3142 * bit is already set, then it is considered a recursion and the function 3143 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3144 * 3145 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3146 * to be cleared. Even if it wasn't the context that set it. That is, 3147 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3148 * is called before preempt_count() is updated, since the check will 3149 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3150 * NMI then comes in, it will set the NMI bit, but when the NMI code 3151 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3152 * and leave the NMI bit set. But this is fine, because the interrupt 3153 * code that set the TRANSITION bit will then clear the NMI bit when it 3154 * calls trace_recursive_unlock(). If another NMI comes in, it will 3155 * set the TRANSITION bit and continue. 3156 * 3157 * Note: The TRANSITION bit only handles a single transition between context. 3158 */ 3159 3160 static __always_inline bool 3161 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3162 { 3163 unsigned int val = cpu_buffer->current_context; 3164 int bit = interrupt_context_level(); 3165 3166 bit = RB_CTX_NORMAL - bit; 3167 3168 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3169 /* 3170 * It is possible that this was called by transitioning 3171 * between interrupt context, and preempt_count() has not 3172 * been updated yet. In this case, use the TRANSITION bit. 3173 */ 3174 bit = RB_CTX_TRANSITION; 3175 if (val & (1 << (bit + cpu_buffer->nest))) { 3176 do_ring_buffer_record_recursion(); 3177 return true; 3178 } 3179 } 3180 3181 val |= (1 << (bit + cpu_buffer->nest)); 3182 cpu_buffer->current_context = val; 3183 3184 return false; 3185 } 3186 3187 static __always_inline void 3188 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3189 { 3190 cpu_buffer->current_context &= 3191 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3192 } 3193 3194 /* The recursive locking above uses 5 bits */ 3195 #define NESTED_BITS 5 3196 3197 /** 3198 * ring_buffer_nest_start - Allow to trace while nested 3199 * @buffer: The ring buffer to modify 3200 * 3201 * The ring buffer has a safety mechanism to prevent recursion. 3202 * But there may be a case where a trace needs to be done while 3203 * tracing something else. In this case, calling this function 3204 * will allow this function to nest within a currently active 3205 * ring_buffer_lock_reserve(). 
3206 * 3207 * Call this function before calling another ring_buffer_lock_reserve() and 3208 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3209 */ 3210 void ring_buffer_nest_start(struct trace_buffer *buffer) 3211 { 3212 struct ring_buffer_per_cpu *cpu_buffer; 3213 int cpu; 3214 3215 /* Enabled by ring_buffer_nest_end() */ 3216 preempt_disable_notrace(); 3217 cpu = raw_smp_processor_id(); 3218 cpu_buffer = buffer->buffers[cpu]; 3219 /* This is the shift value for the above recursive locking */ 3220 cpu_buffer->nest += NESTED_BITS; 3221 } 3222 3223 /** 3224 * ring_buffer_nest_end - Allow to trace while nested 3225 * @buffer: The ring buffer to modify 3226 * 3227 * Must be called after ring_buffer_nest_start() and after the 3228 * ring_buffer_unlock_commit(). 3229 */ 3230 void ring_buffer_nest_end(struct trace_buffer *buffer) 3231 { 3232 struct ring_buffer_per_cpu *cpu_buffer; 3233 int cpu; 3234 3235 /* disabled by ring_buffer_nest_start() */ 3236 cpu = raw_smp_processor_id(); 3237 cpu_buffer = buffer->buffers[cpu]; 3238 /* This is the shift value for the above recursive locking */ 3239 cpu_buffer->nest -= NESTED_BITS; 3240 preempt_enable_notrace(); 3241 } 3242 3243 /** 3244 * ring_buffer_unlock_commit - commit a reserved 3245 * @buffer: The buffer to commit to 3246 * 3247 * This commits the data to the ring buffer, and releases any locks held. 3248 * 3249 * Must be paired with ring_buffer_lock_reserve. 3250 */ 3251 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3252 { 3253 struct ring_buffer_per_cpu *cpu_buffer; 3254 int cpu = raw_smp_processor_id(); 3255 3256 cpu_buffer = buffer->buffers[cpu]; 3257 3258 rb_commit(cpu_buffer); 3259 3260 rb_wakeups(buffer, cpu_buffer); 3261 3262 trace_recursive_unlock(cpu_buffer); 3263 3264 preempt_enable_notrace(); 3265 3266 return 0; 3267 } 3268 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3269 3270 /* Special value to validate all deltas on a page. 
*/ 3271 #define CHECK_FULL_PAGE 1L 3272 3273 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3274 3275 static const char *show_irq_str(int bits) 3276 { 3277 const char *type[] = { 3278 ".", // 0 3279 "s", // 1 3280 "h", // 2 3281 "Hs", // 3 3282 "n", // 4 3283 "Ns", // 5 3284 "Nh", // 6 3285 "NHs", // 7 3286 }; 3287 3288 return type[bits]; 3289 } 3290 3291 /* Assume this is an trace event */ 3292 static const char *show_flags(struct ring_buffer_event *event) 3293 { 3294 struct trace_entry *entry; 3295 int bits = 0; 3296 3297 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3298 return "X"; 3299 3300 entry = ring_buffer_event_data(event); 3301 3302 if (entry->flags & TRACE_FLAG_SOFTIRQ) 3303 bits |= 1; 3304 3305 if (entry->flags & TRACE_FLAG_HARDIRQ) 3306 bits |= 2; 3307 3308 if (entry->flags & TRACE_FLAG_NMI) 3309 bits |= 4; 3310 3311 return show_irq_str(bits); 3312 } 3313 3314 static const char *show_irq(struct ring_buffer_event *event) 3315 { 3316 struct trace_entry *entry; 3317 3318 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3319 return ""; 3320 3321 entry = ring_buffer_event_data(event); 3322 if (entry->flags & TRACE_FLAG_IRQS_OFF) 3323 return "d"; 3324 return ""; 3325 } 3326 3327 static const char *show_interrupt_level(void) 3328 { 3329 unsigned long pc = preempt_count(); 3330 unsigned char level = 0; 3331 3332 if (pc & SOFTIRQ_OFFSET) 3333 level |= 1; 3334 3335 if (pc & HARDIRQ_MASK) 3336 level |= 2; 3337 3338 if (pc & NMI_MASK) 3339 level |= 4; 3340 3341 return show_irq_str(level); 3342 } 3343 3344 static void dump_buffer_page(struct buffer_data_page *bpage, 3345 struct rb_event_info *info, 3346 unsigned long tail) 3347 { 3348 struct ring_buffer_event *event; 3349 u64 ts, delta; 3350 int e; 3351 3352 ts = bpage->time_stamp; 3353 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3354 3355 for (e = 0; e < tail; e += rb_event_length(event)) { 3356 3357 event = (struct ring_buffer_event *)(bpage->data + e); 3358 3359 switch (event->type_len) { 3360 3361 case RINGBUF_TYPE_TIME_EXTEND: 3362 delta = rb_event_time_stamp(event); 3363 ts += delta; 3364 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 3365 e, ts, delta); 3366 break; 3367 3368 case RINGBUF_TYPE_TIME_STAMP: 3369 delta = rb_event_time_stamp(event); 3370 ts = rb_fix_abs_ts(delta, ts); 3371 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 3372 e, ts, delta); 3373 break; 3374 3375 case RINGBUF_TYPE_PADDING: 3376 ts += event->time_delta; 3377 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 3378 e, ts, event->time_delta); 3379 break; 3380 3381 case RINGBUF_TYPE_DATA: 3382 ts += event->time_delta; 3383 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 3384 e, ts, event->time_delta, 3385 show_flags(event), show_irq(event)); 3386 break; 3387 3388 default: 3389 break; 3390 } 3391 } 3392 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 3393 } 3394 3395 static DEFINE_PER_CPU(atomic_t, checking); 3396 static atomic_t ts_dump; 3397 3398 #define buffer_warn_return(fmt, ...) 
\ 3399 do { \ 3400 /* If another report is happening, ignore this one */ \ 3401 if (atomic_inc_return(&ts_dump) != 1) { \ 3402 atomic_dec(&ts_dump); \ 3403 goto out; \ 3404 } \ 3405 atomic_inc(&cpu_buffer->record_disabled); \ 3406 pr_warn(fmt, ##__VA_ARGS__); \ 3407 dump_buffer_page(bpage, info, tail); \ 3408 atomic_dec(&ts_dump); \ 3409 /* There's some cases in boot up that this can happen */ \ 3410 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 3411 /* Do not re-enable checking */ \ 3412 return; \ 3413 } while (0) 3414 3415 /* 3416 * Check if the current event time stamp matches the deltas on 3417 * the buffer page. 3418 */ 3419 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3420 struct rb_event_info *info, 3421 unsigned long tail) 3422 { 3423 struct ring_buffer_event *event; 3424 struct buffer_data_page *bpage; 3425 u64 ts, delta; 3426 bool full = false; 3427 int e; 3428 3429 bpage = info->tail_page->page; 3430 3431 if (tail == CHECK_FULL_PAGE) { 3432 full = true; 3433 tail = local_read(&bpage->commit); 3434 } else if (info->add_timestamp & 3435 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 3436 /* Ignore events with absolute time stamps */ 3437 return; 3438 } 3439 3440 /* 3441 * Do not check the first event (skip possible extends too). 3442 * Also do not check if previous events have not been committed. 3443 */ 3444 if (tail <= 8 || tail > local_read(&bpage->commit)) 3445 return; 3446 3447 /* 3448 * If this interrupted another event, 3449 */ 3450 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 3451 goto out; 3452 3453 ts = bpage->time_stamp; 3454 3455 for (e = 0; e < tail; e += rb_event_length(event)) { 3456 3457 event = (struct ring_buffer_event *)(bpage->data + e); 3458 3459 switch (event->type_len) { 3460 3461 case RINGBUF_TYPE_TIME_EXTEND: 3462 delta = rb_event_time_stamp(event); 3463 ts += delta; 3464 break; 3465 3466 case RINGBUF_TYPE_TIME_STAMP: 3467 delta = rb_event_time_stamp(event); 3468 delta = rb_fix_abs_ts(delta, ts); 3469 if (delta < ts) { 3470 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 3471 cpu_buffer->cpu, ts, delta); 3472 } 3473 ts = delta; 3474 break; 3475 3476 case RINGBUF_TYPE_PADDING: 3477 if (event->time_delta == 1) 3478 break; 3479 fallthrough; 3480 case RINGBUF_TYPE_DATA: 3481 ts += event->time_delta; 3482 break; 3483 3484 default: 3485 RB_WARN_ON(cpu_buffer, 1); 3486 } 3487 } 3488 if ((full && ts > info->ts) || 3489 (!full && ts + info->delta != info->ts)) { 3490 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 3491 cpu_buffer->cpu, 3492 ts + info->delta, info->ts, info->delta, 3493 info->before, info->after, 3494 full ? 
" (full)" : "", show_interrupt_level()); 3495 } 3496 out: 3497 atomic_dec(this_cpu_ptr(&checking)); 3498 } 3499 #else 3500 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3501 struct rb_event_info *info, 3502 unsigned long tail) 3503 { 3504 } 3505 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 3506 3507 static struct ring_buffer_event * 3508 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3509 struct rb_event_info *info) 3510 { 3511 struct ring_buffer_event *event; 3512 struct buffer_page *tail_page; 3513 unsigned long tail, write, w; 3514 3515 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3516 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3517 3518 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3519 barrier(); 3520 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3521 rb_time_read(&cpu_buffer->write_stamp, &info->after); 3522 barrier(); 3523 info->ts = rb_time_stamp(cpu_buffer->buffer); 3524 3525 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3526 info->delta = info->ts; 3527 } else { 3528 /* 3529 * If interrupting an event time update, we may need an 3530 * absolute timestamp. 3531 * Don't bother if this is the start of a new page (w == 0). 3532 */ 3533 if (!w) { 3534 /* Use the sub-buffer timestamp */ 3535 info->delta = 0; 3536 } else if (unlikely(info->before != info->after)) { 3537 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3538 info->length += RB_LEN_TIME_EXTEND; 3539 } else { 3540 info->delta = info->ts - info->after; 3541 if (unlikely(test_time_stamp(info->delta))) { 3542 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3543 info->length += RB_LEN_TIME_EXTEND; 3544 } 3545 } 3546 } 3547 3548 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3549 3550 /*C*/ write = local_add_return(info->length, &tail_page->write); 3551 3552 /* set write to only the index of the write */ 3553 write &= RB_WRITE_MASK; 3554 3555 tail = write - info->length; 3556 3557 /* See if we shot pass the end of this buffer page */ 3558 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 3559 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 3560 return rb_move_tail(cpu_buffer, tail, info); 3561 } 3562 3563 if (likely(tail == w)) { 3564 /* Nothing interrupted us between A and C */ 3565 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3566 /* 3567 * If something came in between C and D, the write stamp 3568 * may now not be in sync. But that's fine as the before_stamp 3569 * will be different and then next event will just be forced 3570 * to use an absolute timestamp. 3571 */ 3572 if (likely(!(info->add_timestamp & 3573 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3574 /* This did not interrupt any time update */ 3575 info->delta = info->ts - info->after; 3576 else 3577 /* Just use full timestamp for interrupting event */ 3578 info->delta = info->ts; 3579 check_buffer(cpu_buffer, info, tail); 3580 } else { 3581 u64 ts; 3582 /* SLOW PATH - Interrupted between A and C */ 3583 3584 /* Save the old before_stamp */ 3585 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3586 3587 /* 3588 * Read a new timestamp and update the before_stamp to make 3589 * the next event after this one force using an absolute 3590 * timestamp. This is in case an interrupt were to come in 3591 * between E and F. 
3592 */ 3593 ts = rb_time_stamp(cpu_buffer->buffer); 3594 rb_time_set(&cpu_buffer->before_stamp, ts); 3595 3596 barrier(); 3597 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 3598 barrier(); 3599 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3600 info->after == info->before && info->after < ts) { 3601 /* 3602 * Nothing came after this event between C and F, it is 3603 * safe to use info->after for the delta as it 3604 * matched info->before and is still valid. 3605 */ 3606 info->delta = ts - info->after; 3607 } else { 3608 /* 3609 * Interrupted between C and F: 3610 * Lost the previous events time stamp. Just set the 3611 * delta to zero, and this will be the same time as 3612 * the event this event interrupted. And the events that 3613 * came after this will still be correct (as they would 3614 * have built their delta on the previous event. 3615 */ 3616 info->delta = 0; 3617 } 3618 info->ts = ts; 3619 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3620 } 3621 3622 /* 3623 * If this is the first commit on the page, then it has the same 3624 * timestamp as the page itself. 3625 */ 3626 if (unlikely(!tail && !(info->add_timestamp & 3627 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3628 info->delta = 0; 3629 3630 /* We reserved something on the buffer */ 3631 3632 event = __rb_page_index(tail_page, tail); 3633 rb_update_event(cpu_buffer, event, info); 3634 3635 local_inc(&tail_page->entries); 3636 3637 /* 3638 * If this is the first commit on the page, then update 3639 * its timestamp. 3640 */ 3641 if (unlikely(!tail)) 3642 tail_page->page->time_stamp = info->ts; 3643 3644 /* account for these added bytes */ 3645 local_add(info->length, &cpu_buffer->entries_bytes); 3646 3647 return event; 3648 } 3649 3650 static __always_inline struct ring_buffer_event * 3651 rb_reserve_next_event(struct trace_buffer *buffer, 3652 struct ring_buffer_per_cpu *cpu_buffer, 3653 unsigned long length) 3654 { 3655 struct ring_buffer_event *event; 3656 struct rb_event_info info; 3657 int nr_loops = 0; 3658 int add_ts_default; 3659 3660 /* ring buffer does cmpxchg, make sure it is safe in NMI context */ 3661 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) && 3662 (unlikely(in_nmi()))) { 3663 return NULL; 3664 } 3665 3666 rb_start_commit(cpu_buffer); 3667 /* The commit page can not change after this */ 3668 3669 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3670 /* 3671 * Due to the ability to swap a cpu buffer from a buffer 3672 * it is possible it was swapped before we committed. 3673 * (committing stops a swap). We check for it here and 3674 * if it happened, we have to fail the write. 3675 */ 3676 barrier(); 3677 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3678 local_dec(&cpu_buffer->committing); 3679 local_dec(&cpu_buffer->commits); 3680 return NULL; 3681 } 3682 #endif 3683 3684 info.length = rb_calculate_event_length(length); 3685 3686 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3687 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3688 info.length += RB_LEN_TIME_EXTEND; 3689 if (info.length > cpu_buffer->buffer->max_data_size) 3690 goto out_fail; 3691 } else { 3692 add_ts_default = RB_ADD_STAMP_NONE; 3693 } 3694 3695 again: 3696 info.add_timestamp = add_ts_default; 3697 info.delta = 0; 3698 3699 /* 3700 * We allow for interrupts to reenter here and do a trace. 3701 * If one does, it will cause this original code to loop 3702 * back here. Even with heavy interrupts happening, this 3703 * should only happen a few times in a row. 
If this happens 3704 * 1000 times in a row, there must be either an interrupt 3705 * storm or we have something buggy. 3706 * Bail! 3707 */ 3708 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3709 goto out_fail; 3710 3711 event = __rb_reserve_next(cpu_buffer, &info); 3712 3713 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3714 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3715 info.length -= RB_LEN_TIME_EXTEND; 3716 goto again; 3717 } 3718 3719 if (likely(event)) 3720 return event; 3721 out_fail: 3722 rb_end_commit(cpu_buffer); 3723 return NULL; 3724 } 3725 3726 /** 3727 * ring_buffer_lock_reserve - reserve a part of the buffer 3728 * @buffer: the ring buffer to reserve from 3729 * @length: the length of the data to reserve (excluding event header) 3730 * 3731 * Returns a reserved event on the ring buffer to copy directly to. 3732 * The user of this interface will need to get the body to write into 3733 * and can use the ring_buffer_event_data() interface. 3734 * 3735 * The length is the length of the data needed, not the event length 3736 * which also includes the event header. 3737 * 3738 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3739 * If NULL is returned, then nothing has been allocated or locked. 3740 */ 3741 struct ring_buffer_event * 3742 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3743 { 3744 struct ring_buffer_per_cpu *cpu_buffer; 3745 struct ring_buffer_event *event; 3746 int cpu; 3747 3748 /* If we are tracing schedule, we don't want to recurse */ 3749 preempt_disable_notrace(); 3750 3751 if (unlikely(atomic_read(&buffer->record_disabled))) 3752 goto out; 3753 3754 cpu = raw_smp_processor_id(); 3755 3756 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3757 goto out; 3758 3759 cpu_buffer = buffer->buffers[cpu]; 3760 3761 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3762 goto out; 3763 3764 if (unlikely(length > buffer->max_data_size)) 3765 goto out; 3766 3767 if (unlikely(trace_recursive_lock(cpu_buffer))) 3768 goto out; 3769 3770 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3771 if (!event) 3772 goto out_unlock; 3773 3774 return event; 3775 3776 out_unlock: 3777 trace_recursive_unlock(cpu_buffer); 3778 out: 3779 preempt_enable_notrace(); 3780 return NULL; 3781 } 3782 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3783 3784 /* 3785 * Decrement the entries to the page that an event is on. 3786 * The event does not even need to exist, only the pointer 3787 * to the page it is on. This may only be called before the commit 3788 * takes place. 3789 */ 3790 static inline void 3791 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3792 struct ring_buffer_event *event) 3793 { 3794 unsigned long addr = (unsigned long)event; 3795 struct buffer_page *bpage = cpu_buffer->commit_page; 3796 struct buffer_page *start; 3797 3798 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3799 3800 /* Do the likely case first */ 3801 if (likely(bpage->page == (void *)addr)) { 3802 local_dec(&bpage->entries); 3803 return; 3804 } 3805 3806 /* 3807 * Because the commit page may be on the reader page we 3808 * start with the next page and check the end loop there. 3809 */ 3810 rb_inc_page(&bpage); 3811 start = bpage; 3812 do { 3813 if (bpage->page == (void *)addr) { 3814 local_dec(&bpage->entries); 3815 return; 3816 } 3817 rb_inc_page(&bpage); 3818 } while (bpage != start); 3819 3820 /* commit not part of this buffer?? 
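 * Every event handed to rb_decrement_entry() must live on some page of
 * this cpu_buffer, so falling out of the loop above is a bug.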
*/ 3821 RB_WARN_ON(cpu_buffer, 1); 3822 } 3823 3824 /** 3825 * ring_buffer_discard_commit - discard an event that has not been committed 3826 * @buffer: the ring buffer 3827 * @event: non committed event to discard 3828 * 3829 * Sometimes an event that is in the ring buffer needs to be ignored. 3830 * This function lets the user discard an event in the ring buffer 3831 * and then that event will not be read later. 3832 * 3833 * This function only works if it is called before the item has been 3834 * committed. It will try to free the event from the ring buffer 3835 * if another event has not been added behind it. 3836 * 3837 * If another event has been added behind it, it will set the event 3838 * up as discarded, and perform the commit. 3839 * 3840 * If this function is called, do not call ring_buffer_unlock_commit on 3841 * the event. 3842 */ 3843 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3844 struct ring_buffer_event *event) 3845 { 3846 struct ring_buffer_per_cpu *cpu_buffer; 3847 int cpu; 3848 3849 /* The event is discarded regardless */ 3850 rb_event_discard(event); 3851 3852 cpu = smp_processor_id(); 3853 cpu_buffer = buffer->buffers[cpu]; 3854 3855 /* 3856 * This must only be called if the event has not been 3857 * committed yet. Thus we can assume that preemption 3858 * is still disabled. 3859 */ 3860 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3861 3862 rb_decrement_entry(cpu_buffer, event); 3863 if (rb_try_to_discard(cpu_buffer, event)) 3864 goto out; 3865 3866 out: 3867 rb_end_commit(cpu_buffer); 3868 3869 trace_recursive_unlock(cpu_buffer); 3870 3871 preempt_enable_notrace(); 3872 3873 } 3874 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3875 3876 /** 3877 * ring_buffer_write - write data to the buffer without reserving 3878 * @buffer: The ring buffer to write to. 3879 * @length: The length of the data being written (excluding the event header) 3880 * @data: The data to write to the buffer. 3881 * 3882 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3883 * one function. If you already have the data to write to the buffer, it 3884 * may be easier to simply call this function. 3885 * 3886 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3887 * and not the length of the event which would hold the header. 
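 *
 * A minimal sketch of a hypothetical caller (error handling elided); the
 * ev variable and the buffer pointer are illustrative assumptions, not
 * part of this API. First the reserve/commit pair that this function wraps:
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(ev));
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), &ev, sizeof(ev));
 *		ring_buffer_unlock_commit(buffer);
 *	}
 *
 * And the equivalent one-shot form, which returns 0 on success and
 * -EBUSY when the event could not be written:
 *
 *	if (ring_buffer_write(buffer, sizeof(ev), &ev))
 *		pr_debug("ring buffer event dropped\n");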
3888 */ 3889 int ring_buffer_write(struct trace_buffer *buffer, 3890 unsigned long length, 3891 void *data) 3892 { 3893 struct ring_buffer_per_cpu *cpu_buffer; 3894 struct ring_buffer_event *event; 3895 void *body; 3896 int ret = -EBUSY; 3897 int cpu; 3898 3899 preempt_disable_notrace(); 3900 3901 if (atomic_read(&buffer->record_disabled)) 3902 goto out; 3903 3904 cpu = raw_smp_processor_id(); 3905 3906 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3907 goto out; 3908 3909 cpu_buffer = buffer->buffers[cpu]; 3910 3911 if (atomic_read(&cpu_buffer->record_disabled)) 3912 goto out; 3913 3914 if (length > buffer->max_data_size) 3915 goto out; 3916 3917 if (unlikely(trace_recursive_lock(cpu_buffer))) 3918 goto out; 3919 3920 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3921 if (!event) 3922 goto out_unlock; 3923 3924 body = rb_event_data(event); 3925 3926 memcpy(body, data, length); 3927 3928 rb_commit(cpu_buffer); 3929 3930 rb_wakeups(buffer, cpu_buffer); 3931 3932 ret = 0; 3933 3934 out_unlock: 3935 trace_recursive_unlock(cpu_buffer); 3936 3937 out: 3938 preempt_enable_notrace(); 3939 3940 return ret; 3941 } 3942 EXPORT_SYMBOL_GPL(ring_buffer_write); 3943 3944 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3945 { 3946 struct buffer_page *reader = cpu_buffer->reader_page; 3947 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3948 struct buffer_page *commit = cpu_buffer->commit_page; 3949 3950 /* In case of error, head will be NULL */ 3951 if (unlikely(!head)) 3952 return true; 3953 3954 /* Reader should exhaust content in reader page */ 3955 if (reader->read != rb_page_size(reader)) 3956 return false; 3957 3958 /* 3959 * If writers are committing on the reader page, knowing all 3960 * committed content has been read, the ring buffer is empty. 3961 */ 3962 if (commit == reader) 3963 return true; 3964 3965 /* 3966 * If writers are committing on a page other than reader page 3967 * and head page, there should always be content to read. 3968 */ 3969 if (commit != head) 3970 return false; 3971 3972 /* 3973 * Writers are committing on the head page, we just need 3974 * to care about there're committed data, and the reader will 3975 * swap reader page with head page when it is to read data. 3976 */ 3977 return rb_page_commit(commit) == 0; 3978 } 3979 3980 /** 3981 * ring_buffer_record_disable - stop all writes into the buffer 3982 * @buffer: The ring buffer to stop writes to. 3983 * 3984 * This prevents all writes to the buffer. Any attempt to write 3985 * to the buffer after this will fail and return NULL. 3986 * 3987 * The caller should call synchronize_rcu() after this. 3988 */ 3989 void ring_buffer_record_disable(struct trace_buffer *buffer) 3990 { 3991 atomic_inc(&buffer->record_disabled); 3992 } 3993 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 3994 3995 /** 3996 * ring_buffer_record_enable - enable writes to the buffer 3997 * @buffer: The ring buffer to enable writes 3998 * 3999 * Note, multiple disables will need the same number of enables 4000 * to truly enable the writing (much like preempt_disable). 4001 */ 4002 void ring_buffer_record_enable(struct trace_buffer *buffer) 4003 { 4004 atomic_dec(&buffer->record_disabled); 4005 } 4006 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4007 4008 /** 4009 * ring_buffer_record_off - stop all writes into the buffer 4010 * @buffer: The ring buffer to stop writes to. 4011 * 4012 * This prevents all writes to the buffer. Any attempt to write 4013 * to the buffer after this will fail and return NULL. 
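 *
 * A minimal sketch of using the switch (hypothetical caller; @buffer is
 * assumed to be a valid ring buffer and do_not_trace_this() merely stands
 * in for work whose writes should be dropped):
 *
 *	ring_buffer_record_off(buffer);
 *	do_not_trace_this();
 *	ring_buffer_record_on(buffer);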
4014 * 4015 * This is different than ring_buffer_record_disable() as 4016 * it works like an on/off switch, where as the disable() version 4017 * must be paired with a enable(). 4018 */ 4019 void ring_buffer_record_off(struct trace_buffer *buffer) 4020 { 4021 unsigned int rd; 4022 unsigned int new_rd; 4023 4024 rd = atomic_read(&buffer->record_disabled); 4025 do { 4026 new_rd = rd | RB_BUFFER_OFF; 4027 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4028 } 4029 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4030 4031 /** 4032 * ring_buffer_record_on - restart writes into the buffer 4033 * @buffer: The ring buffer to start writes to. 4034 * 4035 * This enables all writes to the buffer that was disabled by 4036 * ring_buffer_record_off(). 4037 * 4038 * This is different than ring_buffer_record_enable() as 4039 * it works like an on/off switch, where as the enable() version 4040 * must be paired with a disable(). 4041 */ 4042 void ring_buffer_record_on(struct trace_buffer *buffer) 4043 { 4044 unsigned int rd; 4045 unsigned int new_rd; 4046 4047 rd = atomic_read(&buffer->record_disabled); 4048 do { 4049 new_rd = rd & ~RB_BUFFER_OFF; 4050 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4051 } 4052 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4053 4054 /** 4055 * ring_buffer_record_is_on - return true if the ring buffer can write 4056 * @buffer: The ring buffer to see if write is enabled 4057 * 4058 * Returns true if the ring buffer is in a state that it accepts writes. 4059 */ 4060 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4061 { 4062 return !atomic_read(&buffer->record_disabled); 4063 } 4064 4065 /** 4066 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4067 * @buffer: The ring buffer to see if write is set enabled 4068 * 4069 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4070 * Note that this does NOT mean it is in a writable state. 4071 * 4072 * It may return true when the ring buffer has been disabled by 4073 * ring_buffer_record_disable(), as that is a temporary disabling of 4074 * the ring buffer. 4075 */ 4076 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4077 { 4078 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4079 } 4080 4081 /** 4082 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4083 * @buffer: The ring buffer to stop writes to. 4084 * @cpu: The CPU buffer to stop 4085 * 4086 * This prevents all writes to the buffer. Any attempt to write 4087 * to the buffer after this will fail and return NULL. 4088 * 4089 * The caller should call synchronize_rcu() after this. 4090 */ 4091 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4092 { 4093 struct ring_buffer_per_cpu *cpu_buffer; 4094 4095 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4096 return; 4097 4098 cpu_buffer = buffer->buffers[cpu]; 4099 atomic_inc(&cpu_buffer->record_disabled); 4100 } 4101 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4102 4103 /** 4104 * ring_buffer_record_enable_cpu - enable writes to the buffer 4105 * @buffer: The ring buffer to enable writes 4106 * @cpu: The CPU to enable. 4107 * 4108 * Note, multiple disables will need the same number of enables 4109 * to truly enable the writing (much like preempt_disable). 
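 *
 * A minimal sketch of pairing the per-CPU disable/enable calls around a
 * section that must not race with writers on that CPU (hypothetical
 * caller; inspect_cpu_buffer() is illustrative only):
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_rcu();
 *	inspect_cpu_buffer(buffer, cpu);
 *	ring_buffer_record_enable_cpu(buffer, cpu);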
4110 */ 4111 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4112 { 4113 struct ring_buffer_per_cpu *cpu_buffer; 4114 4115 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4116 return; 4117 4118 cpu_buffer = buffer->buffers[cpu]; 4119 atomic_dec(&cpu_buffer->record_disabled); 4120 } 4121 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4122 4123 /* 4124 * The total entries in the ring buffer is the running counter 4125 * of entries entered into the ring buffer, minus the sum of 4126 * the entries read from the ring buffer and the number of 4127 * entries that were overwritten. 4128 */ 4129 static inline unsigned long 4130 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4131 { 4132 return local_read(&cpu_buffer->entries) - 4133 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4134 } 4135 4136 /** 4137 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4138 * @buffer: The ring buffer 4139 * @cpu: The per CPU buffer to read from. 4140 */ 4141 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4142 { 4143 unsigned long flags; 4144 struct ring_buffer_per_cpu *cpu_buffer; 4145 struct buffer_page *bpage; 4146 u64 ret = 0; 4147 4148 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4149 return 0; 4150 4151 cpu_buffer = buffer->buffers[cpu]; 4152 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4153 /* 4154 * if the tail is on reader_page, oldest time stamp is on the reader 4155 * page 4156 */ 4157 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4158 bpage = cpu_buffer->reader_page; 4159 else 4160 bpage = rb_set_head_page(cpu_buffer); 4161 if (bpage) 4162 ret = bpage->page->time_stamp; 4163 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4164 4165 return ret; 4166 } 4167 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4168 4169 /** 4170 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4171 * @buffer: The ring buffer 4172 * @cpu: The per CPU buffer to read from. 4173 */ 4174 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4175 { 4176 struct ring_buffer_per_cpu *cpu_buffer; 4177 unsigned long ret; 4178 4179 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4180 return 0; 4181 4182 cpu_buffer = buffer->buffers[cpu]; 4183 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4184 4185 return ret; 4186 } 4187 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4188 4189 /** 4190 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4191 * @buffer: The ring buffer 4192 * @cpu: The per CPU buffer to get the entries from. 4193 */ 4194 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4195 { 4196 struct ring_buffer_per_cpu *cpu_buffer; 4197 4198 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4199 return 0; 4200 4201 cpu_buffer = buffer->buffers[cpu]; 4202 4203 return rb_num_of_entries(cpu_buffer); 4204 } 4205 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4206 4207 /** 4208 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4209 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4210 * @buffer: The ring buffer 4211 * @cpu: The per CPU buffer to get the number of overruns from 4212 */ 4213 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4214 { 4215 struct ring_buffer_per_cpu *cpu_buffer; 4216 unsigned long ret; 4217 4218 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4219 return 0; 4220 4221 cpu_buffer = buffer->buffers[cpu]; 4222 ret = local_read(&cpu_buffer->overrun); 4223 4224 return ret; 4225 } 4226 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4227 4228 /** 4229 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4230 * commits failing due to the buffer wrapping around while there are uncommitted 4231 * events, such as during an interrupt storm. 4232 * @buffer: The ring buffer 4233 * @cpu: The per CPU buffer to get the number of overruns from 4234 */ 4235 unsigned long 4236 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4237 { 4238 struct ring_buffer_per_cpu *cpu_buffer; 4239 unsigned long ret; 4240 4241 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4242 return 0; 4243 4244 cpu_buffer = buffer->buffers[cpu]; 4245 ret = local_read(&cpu_buffer->commit_overrun); 4246 4247 return ret; 4248 } 4249 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4250 4251 /** 4252 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4253 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4254 * @buffer: The ring buffer 4255 * @cpu: The per CPU buffer to get the number of overruns from 4256 */ 4257 unsigned long 4258 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4259 { 4260 struct ring_buffer_per_cpu *cpu_buffer; 4261 unsigned long ret; 4262 4263 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4264 return 0; 4265 4266 cpu_buffer = buffer->buffers[cpu]; 4267 ret = local_read(&cpu_buffer->dropped_events); 4268 4269 return ret; 4270 } 4271 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4272 4273 /** 4274 * ring_buffer_read_events_cpu - get the number of events successfully read 4275 * @buffer: The ring buffer 4276 * @cpu: The per CPU buffer to get the number of events read 4277 */ 4278 unsigned long 4279 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4280 { 4281 struct ring_buffer_per_cpu *cpu_buffer; 4282 4283 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4284 return 0; 4285 4286 cpu_buffer = buffer->buffers[cpu]; 4287 return cpu_buffer->read; 4288 } 4289 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4290 4291 /** 4292 * ring_buffer_entries - get the number of entries in a buffer 4293 * @buffer: The ring buffer 4294 * 4295 * Returns the total number of entries in the ring buffer 4296 * (all CPU entries) 4297 */ 4298 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4299 { 4300 struct ring_buffer_per_cpu *cpu_buffer; 4301 unsigned long entries = 0; 4302 int cpu; 4303 4304 /* if you care about this being correct, lock the buffer */ 4305 for_each_buffer_cpu(buffer, cpu) { 4306 cpu_buffer = buffer->buffers[cpu]; 4307 entries += rb_num_of_entries(cpu_buffer); 4308 } 4309 4310 return entries; 4311 } 4312 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4313 4314 /** 4315 * ring_buffer_overruns - get the number of overruns in buffer 4316 * @buffer: The ring buffer 4317 * 4318 * Returns the total number of overruns in the ring buffer 4319 * (all CPU entries) 4320 */ 4321 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4322 { 4323 struct ring_buffer_per_cpu *cpu_buffer; 4324 unsigned long overruns = 0; 4325 int cpu; 4326 4327 /* 
if you care about this being correct, lock the buffer */ 4328 for_each_buffer_cpu(buffer, cpu) { 4329 cpu_buffer = buffer->buffers[cpu]; 4330 overruns += local_read(&cpu_buffer->overrun); 4331 } 4332 4333 return overruns; 4334 } 4335 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4336 4337 static void rb_iter_reset(struct ring_buffer_iter *iter) 4338 { 4339 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4340 4341 /* Iterator usage is expected to have record disabled */ 4342 iter->head_page = cpu_buffer->reader_page; 4343 iter->head = cpu_buffer->reader_page->read; 4344 iter->next_event = iter->head; 4345 4346 iter->cache_reader_page = iter->head_page; 4347 iter->cache_read = cpu_buffer->read; 4348 iter->cache_pages_removed = cpu_buffer->pages_removed; 4349 4350 if (iter->head) { 4351 iter->read_stamp = cpu_buffer->read_stamp; 4352 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4353 } else { 4354 iter->read_stamp = iter->head_page->page->time_stamp; 4355 iter->page_stamp = iter->read_stamp; 4356 } 4357 } 4358 4359 /** 4360 * ring_buffer_iter_reset - reset an iterator 4361 * @iter: The iterator to reset 4362 * 4363 * Resets the iterator, so that it will start from the beginning 4364 * again. 4365 */ 4366 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4367 { 4368 struct ring_buffer_per_cpu *cpu_buffer; 4369 unsigned long flags; 4370 4371 if (!iter) 4372 return; 4373 4374 cpu_buffer = iter->cpu_buffer; 4375 4376 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4377 rb_iter_reset(iter); 4378 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4379 } 4380 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4381 4382 /** 4383 * ring_buffer_iter_empty - check if an iterator has no more to read 4384 * @iter: The iterator to check 4385 */ 4386 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4387 { 4388 struct ring_buffer_per_cpu *cpu_buffer; 4389 struct buffer_page *reader; 4390 struct buffer_page *head_page; 4391 struct buffer_page *commit_page; 4392 struct buffer_page *curr_commit_page; 4393 unsigned commit; 4394 u64 curr_commit_ts; 4395 u64 commit_ts; 4396 4397 cpu_buffer = iter->cpu_buffer; 4398 reader = cpu_buffer->reader_page; 4399 head_page = cpu_buffer->head_page; 4400 commit_page = READ_ONCE(cpu_buffer->commit_page); 4401 commit_ts = commit_page->page->time_stamp; 4402 4403 /* 4404 * When the writer goes across pages, it issues a cmpxchg which 4405 * is a mb(), which will synchronize with the rmb here. 
4406 * (see rb_tail_page_update()) 4407 */ 4408 smp_rmb(); 4409 commit = rb_page_commit(commit_page); 4410 /* We want to make sure that the commit page doesn't change */ 4411 smp_rmb(); 4412 4413 /* Make sure commit page didn't change */ 4414 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4415 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4416 4417 /* If the commit page changed, then there's more data */ 4418 if (curr_commit_page != commit_page || 4419 curr_commit_ts != commit_ts) 4420 return 0; 4421 4422 /* Still racy, as it may return a false positive, but that's OK */ 4423 return ((iter->head_page == commit_page && iter->head >= commit) || 4424 (iter->head_page == reader && commit_page == head_page && 4425 head_page->read == commit && 4426 iter->head == rb_page_size(cpu_buffer->reader_page))); 4427 } 4428 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4429 4430 static void 4431 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4432 struct ring_buffer_event *event) 4433 { 4434 u64 delta; 4435 4436 switch (event->type_len) { 4437 case RINGBUF_TYPE_PADDING: 4438 return; 4439 4440 case RINGBUF_TYPE_TIME_EXTEND: 4441 delta = rb_event_time_stamp(event); 4442 cpu_buffer->read_stamp += delta; 4443 return; 4444 4445 case RINGBUF_TYPE_TIME_STAMP: 4446 delta = rb_event_time_stamp(event); 4447 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4448 cpu_buffer->read_stamp = delta; 4449 return; 4450 4451 case RINGBUF_TYPE_DATA: 4452 cpu_buffer->read_stamp += event->time_delta; 4453 return; 4454 4455 default: 4456 RB_WARN_ON(cpu_buffer, 1); 4457 } 4458 } 4459 4460 static void 4461 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4462 struct ring_buffer_event *event) 4463 { 4464 u64 delta; 4465 4466 switch (event->type_len) { 4467 case RINGBUF_TYPE_PADDING: 4468 return; 4469 4470 case RINGBUF_TYPE_TIME_EXTEND: 4471 delta = rb_event_time_stamp(event); 4472 iter->read_stamp += delta; 4473 return; 4474 4475 case RINGBUF_TYPE_TIME_STAMP: 4476 delta = rb_event_time_stamp(event); 4477 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4478 iter->read_stamp = delta; 4479 return; 4480 4481 case RINGBUF_TYPE_DATA: 4482 iter->read_stamp += event->time_delta; 4483 return; 4484 4485 default: 4486 RB_WARN_ON(iter->cpu_buffer, 1); 4487 } 4488 } 4489 4490 static struct buffer_page * 4491 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4492 { 4493 struct buffer_page *reader = NULL; 4494 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 4495 unsigned long overwrite; 4496 unsigned long flags; 4497 int nr_loops = 0; 4498 bool ret; 4499 4500 local_irq_save(flags); 4501 arch_spin_lock(&cpu_buffer->lock); 4502 4503 again: 4504 /* 4505 * This should normally only loop twice. But because the 4506 * start of the reader inserts an empty page, it causes 4507 * a case where we will loop three times. There should be no 4508 * reason to loop four times (that I know of). 
4509 */ 4510 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4511 reader = NULL; 4512 goto out; 4513 } 4514 4515 reader = cpu_buffer->reader_page; 4516 4517 /* If there's more to read, return this page */ 4518 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4519 goto out; 4520 4521 /* Never should we have an index greater than the size */ 4522 if (RB_WARN_ON(cpu_buffer, 4523 cpu_buffer->reader_page->read > rb_page_size(reader))) 4524 goto out; 4525 4526 /* check if we caught up to the tail */ 4527 reader = NULL; 4528 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4529 goto out; 4530 4531 /* Don't bother swapping if the ring buffer is empty */ 4532 if (rb_num_of_entries(cpu_buffer) == 0) 4533 goto out; 4534 4535 /* 4536 * Reset the reader page to size zero. 4537 */ 4538 local_set(&cpu_buffer->reader_page->write, 0); 4539 local_set(&cpu_buffer->reader_page->entries, 0); 4540 local_set(&cpu_buffer->reader_page->page->commit, 0); 4541 cpu_buffer->reader_page->real_end = 0; 4542 4543 spin: 4544 /* 4545 * Splice the empty reader page into the list around the head. 4546 */ 4547 reader = rb_set_head_page(cpu_buffer); 4548 if (!reader) 4549 goto out; 4550 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4551 cpu_buffer->reader_page->list.prev = reader->list.prev; 4552 4553 /* 4554 * cpu_buffer->pages just needs to point to the buffer, it 4555 * has no specific buffer page to point to. Lets move it out 4556 * of our way so we don't accidentally swap it. 4557 */ 4558 cpu_buffer->pages = reader->list.prev; 4559 4560 /* The reader page will be pointing to the new head */ 4561 rb_set_list_to_head(&cpu_buffer->reader_page->list); 4562 4563 /* 4564 * We want to make sure we read the overruns after we set up our 4565 * pointers to the next object. The writer side does a 4566 * cmpxchg to cross pages which acts as the mb on the writer 4567 * side. Note, the reader will constantly fail the swap 4568 * while the writer is updating the pointers, so this 4569 * guarantees that the overwrite recorded here is the one we 4570 * want to compare with the last_overrun. 4571 */ 4572 smp_mb(); 4573 overwrite = local_read(&(cpu_buffer->overrun)); 4574 4575 /* 4576 * Here's the tricky part. 4577 * 4578 * We need to move the pointer past the header page. 4579 * But we can only do that if a writer is not currently 4580 * moving it. The page before the header page has the 4581 * flag bit '1' set if it is pointing to the page we want. 4582 * but if the writer is in the process of moving it 4583 * than it will be '2' or already moved '0'. 4584 */ 4585 4586 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4587 4588 /* 4589 * If we did not convert it, then we must try again. 4590 */ 4591 if (!ret) 4592 goto spin; 4593 4594 /* 4595 * Yay! We succeeded in replacing the page. 4596 * 4597 * Now make the new head point back to the reader page. 
4598 */ 4599 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4600 rb_inc_page(&cpu_buffer->head_page); 4601 4602 local_inc(&cpu_buffer->pages_read); 4603 4604 /* Finally update the reader page to the new head */ 4605 cpu_buffer->reader_page = reader; 4606 cpu_buffer->reader_page->read = 0; 4607 4608 if (overwrite != cpu_buffer->last_overrun) { 4609 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4610 cpu_buffer->last_overrun = overwrite; 4611 } 4612 4613 goto again; 4614 4615 out: 4616 /* Update the read_stamp on the first event */ 4617 if (reader && reader->read == 0) 4618 cpu_buffer->read_stamp = reader->page->time_stamp; 4619 4620 arch_spin_unlock(&cpu_buffer->lock); 4621 local_irq_restore(flags); 4622 4623 /* 4624 * The writer has preempt disable, wait for it. But not forever 4625 * Although, 1 second is pretty much "forever" 4626 */ 4627 #define USECS_WAIT 1000000 4628 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4629 /* If the write is past the end of page, a writer is still updating it */ 4630 if (likely(!reader || rb_page_write(reader) <= bsize)) 4631 break; 4632 4633 udelay(1); 4634 4635 /* Get the latest version of the reader write value */ 4636 smp_rmb(); 4637 } 4638 4639 /* The writer is not moving forward? Something is wrong */ 4640 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4641 reader = NULL; 4642 4643 /* 4644 * Make sure we see any padding after the write update 4645 * (see rb_reset_tail()). 4646 * 4647 * In addition, a writer may be writing on the reader page 4648 * if the page has not been fully filled, so the read barrier 4649 * is also needed to make sure we see the content of what is 4650 * committed by the writer (see rb_set_commit_to_write()). 4651 */ 4652 smp_rmb(); 4653 4654 4655 return reader; 4656 } 4657 4658 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4659 { 4660 struct ring_buffer_event *event; 4661 struct buffer_page *reader; 4662 unsigned length; 4663 4664 reader = rb_get_reader_page(cpu_buffer); 4665 4666 /* This function should not be called when buffer is empty */ 4667 if (RB_WARN_ON(cpu_buffer, !reader)) 4668 return; 4669 4670 event = rb_reader_event(cpu_buffer); 4671 4672 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4673 cpu_buffer->read++; 4674 4675 rb_update_read_stamp(cpu_buffer, event); 4676 4677 length = rb_event_length(event); 4678 cpu_buffer->reader_page->read += length; 4679 cpu_buffer->read_bytes += length; 4680 } 4681 4682 static void rb_advance_iter(struct ring_buffer_iter *iter) 4683 { 4684 struct ring_buffer_per_cpu *cpu_buffer; 4685 4686 cpu_buffer = iter->cpu_buffer; 4687 4688 /* If head == next_event then we need to jump to the next event */ 4689 if (iter->head == iter->next_event) { 4690 /* If the event gets overwritten again, there's nothing to do */ 4691 if (rb_iter_head_event(iter) == NULL) 4692 return; 4693 } 4694 4695 iter->head = iter->next_event; 4696 4697 /* 4698 * Check if we are at the end of the buffer. 
4699 */ 4700 if (iter->next_event >= rb_page_size(iter->head_page)) { 4701 /* discarded commits can make the page empty */ 4702 if (iter->head_page == cpu_buffer->commit_page) 4703 return; 4704 rb_inc_iter(iter); 4705 return; 4706 } 4707 4708 rb_update_iter_read_stamp(iter, iter->event); 4709 } 4710 4711 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4712 { 4713 return cpu_buffer->lost_events; 4714 } 4715 4716 static struct ring_buffer_event * 4717 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4718 unsigned long *lost_events) 4719 { 4720 struct ring_buffer_event *event; 4721 struct buffer_page *reader; 4722 int nr_loops = 0; 4723 4724 if (ts) 4725 *ts = 0; 4726 again: 4727 /* 4728 * We repeat when a time extend is encountered. 4729 * Since the time extend is always attached to a data event, 4730 * we should never loop more than once. 4731 * (We never hit the following condition more than twice). 4732 */ 4733 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4734 return NULL; 4735 4736 reader = rb_get_reader_page(cpu_buffer); 4737 if (!reader) 4738 return NULL; 4739 4740 event = rb_reader_event(cpu_buffer); 4741 4742 switch (event->type_len) { 4743 case RINGBUF_TYPE_PADDING: 4744 if (rb_null_event(event)) 4745 RB_WARN_ON(cpu_buffer, 1); 4746 /* 4747 * Because the writer could be discarding every 4748 * event it creates (which would probably be bad) 4749 * if we were to go back to "again" then we may never 4750 * catch up, and will trigger the warn on, or lock 4751 * the box. Return the padding, and we will release 4752 * the current locks, and try again. 4753 */ 4754 return event; 4755 4756 case RINGBUF_TYPE_TIME_EXTEND: 4757 /* Internal data, OK to advance */ 4758 rb_advance_reader(cpu_buffer); 4759 goto again; 4760 4761 case RINGBUF_TYPE_TIME_STAMP: 4762 if (ts) { 4763 *ts = rb_event_time_stamp(event); 4764 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4765 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4766 cpu_buffer->cpu, ts); 4767 } 4768 /* Internal data, OK to advance */ 4769 rb_advance_reader(cpu_buffer); 4770 goto again; 4771 4772 case RINGBUF_TYPE_DATA: 4773 if (ts && !(*ts)) { 4774 *ts = cpu_buffer->read_stamp + event->time_delta; 4775 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4776 cpu_buffer->cpu, ts); 4777 } 4778 if (lost_events) 4779 *lost_events = rb_lost_events(cpu_buffer); 4780 return event; 4781 4782 default: 4783 RB_WARN_ON(cpu_buffer, 1); 4784 } 4785 4786 return NULL; 4787 } 4788 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4789 4790 static struct ring_buffer_event * 4791 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4792 { 4793 struct trace_buffer *buffer; 4794 struct ring_buffer_per_cpu *cpu_buffer; 4795 struct ring_buffer_event *event; 4796 int nr_loops = 0; 4797 4798 if (ts) 4799 *ts = 0; 4800 4801 cpu_buffer = iter->cpu_buffer; 4802 buffer = cpu_buffer->buffer; 4803 4804 /* 4805 * Check if someone performed a consuming read to the buffer 4806 * or removed some pages from the buffer. In these cases, 4807 * iterator was invalidated and we need to reset it. 4808 */ 4809 if (unlikely(iter->cache_read != cpu_buffer->read || 4810 iter->cache_reader_page != cpu_buffer->reader_page || 4811 iter->cache_pages_removed != cpu_buffer->pages_removed)) 4812 rb_iter_reset(iter); 4813 4814 again: 4815 if (ring_buffer_iter_empty(iter)) 4816 return NULL; 4817 4818 /* 4819 * As the writer can mess with what the iterator is trying 4820 * to read, just give up if we fail to get an event after 4821 * three tries. 
The iterator is not as reliable when reading 4822 * the ring buffer with an active write as the consumer is. 4823 * Do not warn if the three failures is reached. 4824 */ 4825 if (++nr_loops > 3) 4826 return NULL; 4827 4828 if (rb_per_cpu_empty(cpu_buffer)) 4829 return NULL; 4830 4831 if (iter->head >= rb_page_size(iter->head_page)) { 4832 rb_inc_iter(iter); 4833 goto again; 4834 } 4835 4836 event = rb_iter_head_event(iter); 4837 if (!event) 4838 goto again; 4839 4840 switch (event->type_len) { 4841 case RINGBUF_TYPE_PADDING: 4842 if (rb_null_event(event)) { 4843 rb_inc_iter(iter); 4844 goto again; 4845 } 4846 rb_advance_iter(iter); 4847 return event; 4848 4849 case RINGBUF_TYPE_TIME_EXTEND: 4850 /* Internal data, OK to advance */ 4851 rb_advance_iter(iter); 4852 goto again; 4853 4854 case RINGBUF_TYPE_TIME_STAMP: 4855 if (ts) { 4856 *ts = rb_event_time_stamp(event); 4857 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4858 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4859 cpu_buffer->cpu, ts); 4860 } 4861 /* Internal data, OK to advance */ 4862 rb_advance_iter(iter); 4863 goto again; 4864 4865 case RINGBUF_TYPE_DATA: 4866 if (ts && !(*ts)) { 4867 *ts = iter->read_stamp + event->time_delta; 4868 ring_buffer_normalize_time_stamp(buffer, 4869 cpu_buffer->cpu, ts); 4870 } 4871 return event; 4872 4873 default: 4874 RB_WARN_ON(cpu_buffer, 1); 4875 } 4876 4877 return NULL; 4878 } 4879 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4880 4881 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4882 { 4883 if (likely(!in_nmi())) { 4884 raw_spin_lock(&cpu_buffer->reader_lock); 4885 return true; 4886 } 4887 4888 /* 4889 * If an NMI die dumps out the content of the ring buffer 4890 * trylock must be used to prevent a deadlock if the NMI 4891 * preempted a task that holds the ring buffer locks. If 4892 * we get the lock then all is fine, if not, then continue 4893 * to do the read, but this can corrupt the ring buffer, 4894 * so it must be permanently disabled from future writes. 4895 * Reading from NMI is a oneshot deal. 4896 */ 4897 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4898 return true; 4899 4900 /* Continue without locking, but disable the ring buffer */ 4901 atomic_inc(&cpu_buffer->record_disabled); 4902 return false; 4903 } 4904 4905 static inline void 4906 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4907 { 4908 if (likely(locked)) 4909 raw_spin_unlock(&cpu_buffer->reader_lock); 4910 } 4911 4912 /** 4913 * ring_buffer_peek - peek at the next event to be read 4914 * @buffer: The ring buffer to read 4915 * @cpu: The cpu to peak at 4916 * @ts: The timestamp counter of this event. 4917 * @lost_events: a variable to store if events were lost (may be NULL) 4918 * 4919 * This will return the event that will be read next, but does 4920 * not consume the data. 
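 *
 * A minimal sketch (hypothetical caller; inspect() is illustrative only):
 *
 *	u64 ts;
 *	unsigned long lost;
 *	struct ring_buffer_event *event;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event)
 *		inspect(ring_buffer_event_data(event));
 *
 * The same data event will keep being returned by peeks until it is
 * consumed (for example by ring_buffer_consume()).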
4921 */ 4922 struct ring_buffer_event * 4923 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4924 unsigned long *lost_events) 4925 { 4926 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4927 struct ring_buffer_event *event; 4928 unsigned long flags; 4929 bool dolock; 4930 4931 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4932 return NULL; 4933 4934 again: 4935 local_irq_save(flags); 4936 dolock = rb_reader_lock(cpu_buffer); 4937 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4938 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4939 rb_advance_reader(cpu_buffer); 4940 rb_reader_unlock(cpu_buffer, dolock); 4941 local_irq_restore(flags); 4942 4943 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4944 goto again; 4945 4946 return event; 4947 } 4948 4949 /** ring_buffer_iter_dropped - report if there are dropped events 4950 * @iter: The ring buffer iterator 4951 * 4952 * Returns true if there was dropped events since the last peek. 4953 */ 4954 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4955 { 4956 bool ret = iter->missed_events != 0; 4957 4958 iter->missed_events = 0; 4959 return ret; 4960 } 4961 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 4962 4963 /** 4964 * ring_buffer_iter_peek - peek at the next event to be read 4965 * @iter: The ring buffer iterator 4966 * @ts: The timestamp counter of this event. 4967 * 4968 * This will return the event that will be read next, but does 4969 * not increment the iterator. 4970 */ 4971 struct ring_buffer_event * 4972 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4973 { 4974 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4975 struct ring_buffer_event *event; 4976 unsigned long flags; 4977 4978 again: 4979 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4980 event = rb_iter_peek(iter, ts); 4981 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4982 4983 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4984 goto again; 4985 4986 return event; 4987 } 4988 4989 /** 4990 * ring_buffer_consume - return an event and consume it 4991 * @buffer: The ring buffer to get the next event from 4992 * @cpu: the cpu to read the buffer from 4993 * @ts: a variable to store the timestamp (may be NULL) 4994 * @lost_events: a variable to store if events were lost (may be NULL) 4995 * 4996 * Returns the next event in the ring buffer, and that event is consumed. 4997 * Meaning, that sequential reads will keep returning a different event, 4998 * and eventually empty the ring buffer if the producer is slower. 
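 *
 * A minimal sketch of draining one CPU buffer (hypothetical caller;
 * process_event() is illustrative only):
 *
 *	u64 ts;
 *	unsigned long lost;
 *	struct ring_buffer_event *event;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *		process_event(ring_buffer_event_data(event), ts, lost);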
4999 */ 5000 struct ring_buffer_event * 5001 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5002 unsigned long *lost_events) 5003 { 5004 struct ring_buffer_per_cpu *cpu_buffer; 5005 struct ring_buffer_event *event = NULL; 5006 unsigned long flags; 5007 bool dolock; 5008 5009 again: 5010 /* might be called in atomic */ 5011 preempt_disable(); 5012 5013 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5014 goto out; 5015 5016 cpu_buffer = buffer->buffers[cpu]; 5017 local_irq_save(flags); 5018 dolock = rb_reader_lock(cpu_buffer); 5019 5020 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5021 if (event) { 5022 cpu_buffer->lost_events = 0; 5023 rb_advance_reader(cpu_buffer); 5024 } 5025 5026 rb_reader_unlock(cpu_buffer, dolock); 5027 local_irq_restore(flags); 5028 5029 out: 5030 preempt_enable(); 5031 5032 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5033 goto again; 5034 5035 return event; 5036 } 5037 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5038 5039 /** 5040 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5041 * @buffer: The ring buffer to read from 5042 * @cpu: The cpu buffer to iterate over 5043 * @flags: gfp flags to use for memory allocation 5044 * 5045 * This performs the initial preparations necessary to iterate 5046 * through the buffer. Memory is allocated, buffer resizing 5047 * is disabled, and the iterator pointer is returned to the caller. 5048 * 5049 * After a sequence of ring_buffer_read_prepare calls, the user is 5050 * expected to make at least one call to ring_buffer_read_prepare_sync. 5051 * Afterwards, ring_buffer_read_start is invoked to get things going 5052 * for real. 5053 * 5054 * This overall must be paired with ring_buffer_read_finish. 5055 */ 5056 struct ring_buffer_iter * 5057 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5058 { 5059 struct ring_buffer_per_cpu *cpu_buffer; 5060 struct ring_buffer_iter *iter; 5061 5062 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5063 return NULL; 5064 5065 iter = kzalloc(sizeof(*iter), flags); 5066 if (!iter) 5067 return NULL; 5068 5069 /* Holds the entire event: data and meta data */ 5070 iter->event_size = buffer->subbuf_size; 5071 iter->event = kmalloc(iter->event_size, flags); 5072 if (!iter->event) { 5073 kfree(iter); 5074 return NULL; 5075 } 5076 5077 cpu_buffer = buffer->buffers[cpu]; 5078 5079 iter->cpu_buffer = cpu_buffer; 5080 5081 atomic_inc(&cpu_buffer->resize_disabled); 5082 5083 return iter; 5084 } 5085 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5086 5087 /** 5088 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5089 * 5090 * All previously invoked ring_buffer_read_prepare calls to prepare 5091 * iterators will be synchronized. Afterwards, read_buffer_read_start 5092 * calls on those iterators are allowed. 5093 */ 5094 void 5095 ring_buffer_read_prepare_sync(void) 5096 { 5097 synchronize_rcu(); 5098 } 5099 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5100 5101 /** 5102 * ring_buffer_read_start - start a non consuming read of the buffer 5103 * @iter: The iterator returned by ring_buffer_read_prepare 5104 * 5105 * This finalizes the startup of an iteration through the buffer. 5106 * The iterator comes from a call to ring_buffer_read_prepare and 5107 * an intervening ring_buffer_read_prepare_sync must have been 5108 * performed. 5109 * 5110 * Must be paired with ring_buffer_read_finish. 
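 *
 * A minimal sketch of the whole non consuming read sequence (hypothetical
 * caller; process_event() is illustrative, declarations and error handling
 * are elided):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		process_event(ring_buffer_event_data(event), ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);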
5111 */ 5112 void 5113 ring_buffer_read_start(struct ring_buffer_iter *iter) 5114 { 5115 struct ring_buffer_per_cpu *cpu_buffer; 5116 unsigned long flags; 5117 5118 if (!iter) 5119 return; 5120 5121 cpu_buffer = iter->cpu_buffer; 5122 5123 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5124 arch_spin_lock(&cpu_buffer->lock); 5125 rb_iter_reset(iter); 5126 arch_spin_unlock(&cpu_buffer->lock); 5127 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5128 } 5129 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5130 5131 /** 5132 * ring_buffer_read_finish - finish reading the iterator of the buffer 5133 * @iter: The iterator retrieved by ring_buffer_start 5134 * 5135 * This re-enables resizing of the buffer, and frees the iterator. 5136 */ 5137 void 5138 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5139 { 5140 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5141 unsigned long flags; 5142 5143 /* Use this opportunity to check the integrity of the ring buffer. */ 5144 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5145 rb_check_pages(cpu_buffer); 5146 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5147 5148 atomic_dec(&cpu_buffer->resize_disabled); 5149 kfree(iter->event); 5150 kfree(iter); 5151 } 5152 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5153 5154 /** 5155 * ring_buffer_iter_advance - advance the iterator to the next location 5156 * @iter: The ring buffer iterator 5157 * 5158 * Move the location of the iterator such that the next read will 5159 * be the next location of the iterator. 5160 */ 5161 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5162 { 5163 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5164 unsigned long flags; 5165 5166 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5167 5168 rb_advance_iter(iter); 5169 5170 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5171 } 5172 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5173 5174 /** 5175 * ring_buffer_size - return the size of the ring buffer (in bytes) 5176 * @buffer: The ring buffer. 5177 * @cpu: The CPU to get ring buffer size from. 5178 */ 5179 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5180 { 5181 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5182 return 0; 5183 5184 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5185 } 5186 EXPORT_SYMBOL_GPL(ring_buffer_size); 5187 5188 /** 5189 * ring_buffer_max_event_size - return the max data size of an event 5190 * @buffer: The ring buffer. 5191 * 5192 * Returns the maximum size an event can be. 
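 *
 * A minimal sketch of validating a payload size up front (hypothetical
 * caller):
 *
 *	if (size > ring_buffer_max_event_size(buffer))
 *		return -E2BIG;
 *	event = ring_buffer_lock_reserve(buffer, size);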
5193 */ 5194 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5195 { 5196 /* If abs timestamp is requested, events have a timestamp too */ 5197 if (ring_buffer_time_stamp_abs(buffer)) 5198 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5199 return buffer->max_data_size; 5200 } 5201 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 5202 5203 static void rb_clear_buffer_page(struct buffer_page *page) 5204 { 5205 local_set(&page->write, 0); 5206 local_set(&page->entries, 0); 5207 rb_init_page(page->page); 5208 page->read = 0; 5209 } 5210 5211 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5212 { 5213 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 5214 5215 meta->reader.read = cpu_buffer->reader_page->read; 5216 meta->reader.id = cpu_buffer->reader_page->id; 5217 meta->reader.lost_events = cpu_buffer->lost_events; 5218 5219 meta->entries = local_read(&cpu_buffer->entries); 5220 meta->overrun = local_read(&cpu_buffer->overrun); 5221 meta->read = cpu_buffer->read; 5222 5223 /* Some archs do not have data cache coherency between kernel and user-space */ 5224 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page)); 5225 } 5226 5227 static void 5228 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5229 { 5230 struct buffer_page *page; 5231 5232 rb_head_page_deactivate(cpu_buffer); 5233 5234 cpu_buffer->head_page 5235 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5236 rb_clear_buffer_page(cpu_buffer->head_page); 5237 list_for_each_entry(page, cpu_buffer->pages, list) { 5238 rb_clear_buffer_page(page); 5239 } 5240 5241 cpu_buffer->tail_page = cpu_buffer->head_page; 5242 cpu_buffer->commit_page = cpu_buffer->head_page; 5243 5244 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5245 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5246 rb_clear_buffer_page(cpu_buffer->reader_page); 5247 5248 local_set(&cpu_buffer->entries_bytes, 0); 5249 local_set(&cpu_buffer->overrun, 0); 5250 local_set(&cpu_buffer->commit_overrun, 0); 5251 local_set(&cpu_buffer->dropped_events, 0); 5252 local_set(&cpu_buffer->entries, 0); 5253 local_set(&cpu_buffer->committing, 0); 5254 local_set(&cpu_buffer->commits, 0); 5255 local_set(&cpu_buffer->pages_touched, 0); 5256 local_set(&cpu_buffer->pages_lost, 0); 5257 local_set(&cpu_buffer->pages_read, 0); 5258 cpu_buffer->last_pages_touch = 0; 5259 cpu_buffer->shortest_full = 0; 5260 cpu_buffer->read = 0; 5261 cpu_buffer->read_bytes = 0; 5262 5263 rb_time_set(&cpu_buffer->write_stamp, 0); 5264 rb_time_set(&cpu_buffer->before_stamp, 0); 5265 5266 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5267 5268 cpu_buffer->lost_events = 0; 5269 cpu_buffer->last_overrun = 0; 5270 5271 if (cpu_buffer->mapped) 5272 rb_update_meta_page(cpu_buffer); 5273 5274 rb_head_page_activate(cpu_buffer); 5275 cpu_buffer->pages_removed = 0; 5276 } 5277 5278 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5279 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5280 { 5281 unsigned long flags; 5282 5283 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5284 5285 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5286 goto out; 5287 5288 arch_spin_lock(&cpu_buffer->lock); 5289 5290 rb_reset_cpu(cpu_buffer); 5291 5292 arch_spin_unlock(&cpu_buffer->lock); 5293 5294 out: 5295 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5296 } 5297 5298 /** 5299 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5300 * @buffer: The ring buffer to reset a per 
cpu buffer of 5301 * @cpu: The CPU buffer to be reset 5302 */ 5303 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5304 { 5305 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5306 5307 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5308 return; 5309 5310 /* prevent another thread from changing buffer sizes */ 5311 mutex_lock(&buffer->mutex); 5312 5313 atomic_inc(&cpu_buffer->resize_disabled); 5314 atomic_inc(&cpu_buffer->record_disabled); 5315 5316 /* Make sure all commits have finished */ 5317 synchronize_rcu(); 5318 5319 reset_disabled_cpu_buffer(cpu_buffer); 5320 5321 atomic_dec(&cpu_buffer->record_disabled); 5322 atomic_dec(&cpu_buffer->resize_disabled); 5323 5324 mutex_unlock(&buffer->mutex); 5325 } 5326 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5327 5328 /* Flag to ensure proper resetting of atomic variables */ 5329 #define RESET_BIT (1 << 30) 5330 5331 /** 5332 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5333 * @buffer: The ring buffer to reset a per cpu buffer of 5334 */ 5335 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 5336 { 5337 struct ring_buffer_per_cpu *cpu_buffer; 5338 int cpu; 5339 5340 /* prevent another thread from changing buffer sizes */ 5341 mutex_lock(&buffer->mutex); 5342 5343 for_each_online_buffer_cpu(buffer, cpu) { 5344 cpu_buffer = buffer->buffers[cpu]; 5345 5346 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 5347 atomic_inc(&cpu_buffer->record_disabled); 5348 } 5349 5350 /* Make sure all commits have finished */ 5351 synchronize_rcu(); 5352 5353 for_each_buffer_cpu(buffer, cpu) { 5354 cpu_buffer = buffer->buffers[cpu]; 5355 5356 /* 5357 * If a CPU came online during the synchronize_rcu(), then 5358 * ignore it. 5359 */ 5360 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 5361 continue; 5362 5363 reset_disabled_cpu_buffer(cpu_buffer); 5364 5365 atomic_dec(&cpu_buffer->record_disabled); 5366 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 5367 } 5368 5369 mutex_unlock(&buffer->mutex); 5370 } 5371 5372 /** 5373 * ring_buffer_reset - reset a ring buffer 5374 * @buffer: The ring buffer to reset all cpu buffers 5375 */ 5376 void ring_buffer_reset(struct trace_buffer *buffer) 5377 { 5378 struct ring_buffer_per_cpu *cpu_buffer; 5379 int cpu; 5380 5381 /* prevent another thread from changing buffer sizes */ 5382 mutex_lock(&buffer->mutex); 5383 5384 for_each_buffer_cpu(buffer, cpu) { 5385 cpu_buffer = buffer->buffers[cpu]; 5386 5387 atomic_inc(&cpu_buffer->resize_disabled); 5388 atomic_inc(&cpu_buffer->record_disabled); 5389 } 5390 5391 /* Make sure all commits have finished */ 5392 synchronize_rcu(); 5393 5394 for_each_buffer_cpu(buffer, cpu) { 5395 cpu_buffer = buffer->buffers[cpu]; 5396 5397 reset_disabled_cpu_buffer(cpu_buffer); 5398 5399 atomic_dec(&cpu_buffer->record_disabled); 5400 atomic_dec(&cpu_buffer->resize_disabled); 5401 } 5402 5403 mutex_unlock(&buffer->mutex); 5404 } 5405 EXPORT_SYMBOL_GPL(ring_buffer_reset); 5406 5407 /** 5408 * ring_buffer_empty - is the ring buffer empty? 
5409 * @buffer: The ring buffer to test 5410 */ 5411 bool ring_buffer_empty(struct trace_buffer *buffer) 5412 { 5413 struct ring_buffer_per_cpu *cpu_buffer; 5414 unsigned long flags; 5415 bool dolock; 5416 bool ret; 5417 int cpu; 5418 5419 /* yes this is racy, but if you don't like the race, lock the buffer */ 5420 for_each_buffer_cpu(buffer, cpu) { 5421 cpu_buffer = buffer->buffers[cpu]; 5422 local_irq_save(flags); 5423 dolock = rb_reader_lock(cpu_buffer); 5424 ret = rb_per_cpu_empty(cpu_buffer); 5425 rb_reader_unlock(cpu_buffer, dolock); 5426 local_irq_restore(flags); 5427 5428 if (!ret) 5429 return false; 5430 } 5431 5432 return true; 5433 } 5434 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5435 5436 /** 5437 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 5438 * @buffer: The ring buffer 5439 * @cpu: The CPU buffer to test 5440 */ 5441 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5442 { 5443 struct ring_buffer_per_cpu *cpu_buffer; 5444 unsigned long flags; 5445 bool dolock; 5446 bool ret; 5447 5448 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5449 return true; 5450 5451 cpu_buffer = buffer->buffers[cpu]; 5452 local_irq_save(flags); 5453 dolock = rb_reader_lock(cpu_buffer); 5454 ret = rb_per_cpu_empty(cpu_buffer); 5455 rb_reader_unlock(cpu_buffer, dolock); 5456 local_irq_restore(flags); 5457 5458 return ret; 5459 } 5460 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5461 5462 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5463 /** 5464 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5465 * @buffer_a: One buffer to swap with 5466 * @buffer_b: The other buffer to swap with 5467 * @cpu: the CPU of the buffers to swap 5468 * 5469 * This function is useful for tracers that want to take a "snapshot" 5470 * of a CPU buffer and has another back up buffer lying around. 5471 * it is expected that the tracer handles the cpu buffer not being 5472 * used at the moment. 5473 */ 5474 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5475 struct trace_buffer *buffer_b, int cpu) 5476 { 5477 struct ring_buffer_per_cpu *cpu_buffer_a; 5478 struct ring_buffer_per_cpu *cpu_buffer_b; 5479 int ret = -EINVAL; 5480 5481 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5482 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5483 goto out; 5484 5485 cpu_buffer_a = buffer_a->buffers[cpu]; 5486 cpu_buffer_b = buffer_b->buffers[cpu]; 5487 5488 /* It's up to the callers to not try to swap mapped buffers */ 5489 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) { 5490 ret = -EBUSY; 5491 goto out; 5492 } 5493 5494 /* At least make sure the two buffers are somewhat the same */ 5495 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5496 goto out; 5497 5498 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 5499 goto out; 5500 5501 ret = -EAGAIN; 5502 5503 if (atomic_read(&buffer_a->record_disabled)) 5504 goto out; 5505 5506 if (atomic_read(&buffer_b->record_disabled)) 5507 goto out; 5508 5509 if (atomic_read(&cpu_buffer_a->record_disabled)) 5510 goto out; 5511 5512 if (atomic_read(&cpu_buffer_b->record_disabled)) 5513 goto out; 5514 5515 /* 5516 * We can't do a synchronize_rcu here because this 5517 * function can be called in atomic context. 5518 * Normally this will be called from the same CPU as cpu. 5519 * If not it's up to the caller to protect this. 
5520 */ 5521 atomic_inc(&cpu_buffer_a->record_disabled); 5522 atomic_inc(&cpu_buffer_b->record_disabled); 5523 5524 ret = -EBUSY; 5525 if (local_read(&cpu_buffer_a->committing)) 5526 goto out_dec; 5527 if (local_read(&cpu_buffer_b->committing)) 5528 goto out_dec; 5529 5530 /* 5531 * When resize is in progress, we cannot swap it because 5532 * it will mess the state of the cpu buffer. 5533 */ 5534 if (atomic_read(&buffer_a->resizing)) 5535 goto out_dec; 5536 if (atomic_read(&buffer_b->resizing)) 5537 goto out_dec; 5538 5539 buffer_a->buffers[cpu] = cpu_buffer_b; 5540 buffer_b->buffers[cpu] = cpu_buffer_a; 5541 5542 cpu_buffer_b->buffer = buffer_a; 5543 cpu_buffer_a->buffer = buffer_b; 5544 5545 ret = 0; 5546 5547 out_dec: 5548 atomic_dec(&cpu_buffer_a->record_disabled); 5549 atomic_dec(&cpu_buffer_b->record_disabled); 5550 out: 5551 return ret; 5552 } 5553 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5554 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5555 5556 /** 5557 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5558 * @buffer: the buffer to allocate for. 5559 * @cpu: the cpu buffer to allocate. 5560 * 5561 * This function is used in conjunction with ring_buffer_read_page. 5562 * When reading a full page from the ring buffer, these functions 5563 * can be used to speed up the process. The calling function should 5564 * allocate a few pages first with this function. Then when it 5565 * needs to get pages from the ring buffer, it passes the result 5566 * of this function into ring_buffer_read_page, which will swap 5567 * the page that was allocated, with the read page of the buffer. 5568 * 5569 * Returns: 5570 * The page allocated, or ERR_PTR 5571 */ 5572 struct buffer_data_read_page * 5573 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5574 { 5575 struct ring_buffer_per_cpu *cpu_buffer; 5576 struct buffer_data_read_page *bpage = NULL; 5577 unsigned long flags; 5578 struct page *page; 5579 5580 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5581 return ERR_PTR(-ENODEV); 5582 5583 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 5584 if (!bpage) 5585 return ERR_PTR(-ENOMEM); 5586 5587 bpage->order = buffer->subbuf_order; 5588 cpu_buffer = buffer->buffers[cpu]; 5589 local_irq_save(flags); 5590 arch_spin_lock(&cpu_buffer->lock); 5591 5592 if (cpu_buffer->free_page) { 5593 bpage->data = cpu_buffer->free_page; 5594 cpu_buffer->free_page = NULL; 5595 } 5596 5597 arch_spin_unlock(&cpu_buffer->lock); 5598 local_irq_restore(flags); 5599 5600 if (bpage->data) 5601 goto out; 5602 5603 page = alloc_pages_node(cpu_to_node(cpu), 5604 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 5605 cpu_buffer->buffer->subbuf_order); 5606 if (!page) { 5607 kfree(bpage); 5608 return ERR_PTR(-ENOMEM); 5609 } 5610 5611 bpage->data = page_address(page); 5612 5613 out: 5614 rb_init_page(bpage->data); 5615 5616 return bpage; 5617 } 5618 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5619 5620 /** 5621 * ring_buffer_free_read_page - free an allocated read page 5622 * @buffer: the buffer the page was allocate for 5623 * @cpu: the cpu buffer the page came from 5624 * @data_page: the page to free 5625 * 5626 * Free a page allocated from ring_buffer_alloc_read_page. 
5627 */ 5628 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 5629 struct buffer_data_read_page *data_page) 5630 { 5631 struct ring_buffer_per_cpu *cpu_buffer; 5632 struct buffer_data_page *bpage = data_page->data; 5633 struct page *page = virt_to_page(bpage); 5634 unsigned long flags; 5635 5636 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5637 return; 5638 5639 cpu_buffer = buffer->buffers[cpu]; 5640 5641 /* 5642 * If the page is still in use someplace else, or order of the page 5643 * is different from the subbuffer order of the buffer - 5644 * we can't reuse it 5645 */ 5646 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 5647 goto out; 5648 5649 local_irq_save(flags); 5650 arch_spin_lock(&cpu_buffer->lock); 5651 5652 if (!cpu_buffer->free_page) { 5653 cpu_buffer->free_page = bpage; 5654 bpage = NULL; 5655 } 5656 5657 arch_spin_unlock(&cpu_buffer->lock); 5658 local_irq_restore(flags); 5659 5660 out: 5661 free_pages((unsigned long)bpage, data_page->order); 5662 kfree(data_page); 5663 } 5664 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5665 5666 /** 5667 * ring_buffer_read_page - extract a page from the ring buffer 5668 * @buffer: buffer to extract from 5669 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5670 * @len: amount to extract 5671 * @cpu: the cpu of the buffer to extract 5672 * @full: should the extraction only happen when the page is full. 5673 * 5674 * This function will pull out a page from the ring buffer and consume it. 5675 * @data_page must be the address of the variable that was returned 5676 * from ring_buffer_alloc_read_page. This is because the page might be used 5677 * to swap with a page in the ring buffer. 5678 * 5679 * for example: 5680 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5681 * if (IS_ERR(rpage)) 5682 * return PTR_ERR(rpage); 5683 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 5684 * if (ret >= 0) 5685 * process_page(ring_buffer_read_page_data(rpage), ret); 5686 * ring_buffer_free_read_page(buffer, cpu, rpage); 5687 * 5688 * When @full is set, the function will not return true unless 5689 * the writer is off the reader page. 5690 * 5691 * Note: it is up to the calling functions to handle sleeps and wakeups. 5692 * The ring buffer can be used anywhere in the kernel and can not 5693 * blindly call wake_up. The layer that uses the ring buffer must be 5694 * responsible for that. 5695 * 5696 * Returns: 5697 * >=0 if data has been transferred, returns the offset of consumed data. 5698 * <0 if no data has been transferred. 5699 */ 5700 int ring_buffer_read_page(struct trace_buffer *buffer, 5701 struct buffer_data_read_page *data_page, 5702 size_t len, int cpu, int full) 5703 { 5704 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5705 struct ring_buffer_event *event; 5706 struct buffer_data_page *bpage; 5707 struct buffer_page *reader; 5708 unsigned long missed_events; 5709 unsigned long flags; 5710 unsigned int commit; 5711 unsigned int read; 5712 u64 save_timestamp; 5713 int ret = -1; 5714 5715 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5716 goto out; 5717 5718 /* 5719 * If len is not big enough to hold the page header, then 5720 * we can not copy anything. 
5721 */ 5722 if (len <= BUF_PAGE_HDR_SIZE) 5723 goto out; 5724 5725 len -= BUF_PAGE_HDR_SIZE; 5726 5727 if (!data_page || !data_page->data) 5728 goto out; 5729 if (data_page->order != buffer->subbuf_order) 5730 goto out; 5731 5732 bpage = data_page->data; 5733 if (!bpage) 5734 goto out; 5735 5736 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5737 5738 reader = rb_get_reader_page(cpu_buffer); 5739 if (!reader) 5740 goto out_unlock; 5741 5742 event = rb_reader_event(cpu_buffer); 5743 5744 read = reader->read; 5745 commit = rb_page_size(reader); 5746 5747 /* Check if any events were dropped */ 5748 missed_events = cpu_buffer->lost_events; 5749 5750 /* 5751 * If this page has been partially read or 5752 * if len is not big enough to read the rest of the page or 5753 * a writer is still on the page, then 5754 * we must copy the data from the page to the buffer. 5755 * Otherwise, we can simply swap the page with the one passed in. 5756 */ 5757 if (read || (len < (commit - read)) || 5758 cpu_buffer->reader_page == cpu_buffer->commit_page || 5759 cpu_buffer->mapped) { 5760 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5761 unsigned int rpos = read; 5762 unsigned int pos = 0; 5763 unsigned int size; 5764 5765 /* 5766 * If a full page is expected, this can still be returned 5767 * if there's been a previous partial read and the 5768 * rest of the page can be read and the commit page is off 5769 * the reader page. 5770 */ 5771 if (full && 5772 (!read || (len < (commit - read)) || 5773 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5774 goto out_unlock; 5775 5776 if (len > (commit - read)) 5777 len = (commit - read); 5778 5779 /* Always keep the time extend and data together */ 5780 size = rb_event_ts_length(event); 5781 5782 if (len < size) 5783 goto out_unlock; 5784 5785 /* save the current timestamp, since the user will need it */ 5786 save_timestamp = cpu_buffer->read_stamp; 5787 5788 /* Need to copy one event at a time */ 5789 do { 5790 /* We need the size of one event, because 5791 * rb_advance_reader only advances by one event, 5792 * whereas rb_event_ts_length may include the size of 5793 * one or two events. 5794 * We have already ensured there's enough space if this 5795 * is a time extend. */ 5796 size = rb_event_length(event); 5797 memcpy(bpage->data + pos, rpage->data + rpos, size); 5798 5799 len -= size; 5800 5801 rb_advance_reader(cpu_buffer); 5802 rpos = reader->read; 5803 pos += size; 5804 5805 if (rpos >= commit) 5806 break; 5807 5808 event = rb_reader_event(cpu_buffer); 5809 /* Always keep the time extend and data together */ 5810 size = rb_event_ts_length(event); 5811 } while (len >= size); 5812 5813 /* update bpage */ 5814 local_set(&bpage->commit, pos); 5815 bpage->time_stamp = save_timestamp; 5816 5817 /* we copied everything to the beginning */ 5818 read = 0; 5819 } else { 5820 /* update the entry counter */ 5821 cpu_buffer->read += rb_page_entries(reader); 5822 cpu_buffer->read_bytes += rb_page_size(reader); 5823 5824 /* swap the pages */ 5825 rb_init_page(bpage); 5826 bpage = reader->page; 5827 reader->page = data_page->data; 5828 local_set(&reader->write, 0); 5829 local_set(&reader->entries, 0); 5830 reader->read = 0; 5831 data_page->data = bpage; 5832 5833 /* 5834 * Use the real_end for the data size, 5835 * This gives us a chance to store the lost events 5836 * on the page. 
		 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	ret = read;

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < buffer->subbuf_size)
		memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);

/**
 * ring_buffer_read_page_data - get pointer to the data in the page.
 * @page: the page to get the data from
 *
 * Returns pointer to the actual data in this page.
 */
void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
{
	return page->data;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);

/**
 * ring_buffer_subbuf_size_get - get size of the sub buffer.
 * @buffer: the buffer to get the sub buffer size from
 *
 * Returns size of the sub buffer, in bytes.
 */
int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
{
	return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);

/**
 * ring_buffer_subbuf_order_get - get order of system sub pages in one buffer page.
 * @buffer: The ring_buffer to get the system sub page order from
 *
 * By default, one ring buffer sub page equals one system page. This parameter
 * is configurable, per ring buffer. The size of the ring buffer sub page can be
 * extended, but must be a power-of-two multiple of the system page size.
 *
 * Returns the order of the sub buffer size, in system pages:
 * 0 means the sub buffer size is 1 system page, and so forth.
 * In case of an error < 0 is returned.
 */
int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
{
	if (!buffer)
		return -EINVAL;

	return buffer->subbuf_order;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);

/**
 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
 * @buffer: The ring_buffer to set the new page size.
 * @order: Order of the system pages in one sub buffer page
 *
 * By default, one ring buffer page equals one system page. This API can be
 * used to set a new size for the ring buffer pages. The size must be a
 * power-of-two multiple of the system page size, which is why the input
 * parameter @order is the order of system pages that are allocated for one
 * ring buffer page:
 *  0 - 1 system page
 *  1 - 2 system pages
 *  2 - 4 system pages
 *  ...
 *
 * Returns 0 on success or < 0 in case of an error.
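 *
 * For example, on a system with 4K pages, a hypothetical caller that wants
 * 16K sub-buffers would request order 2 (sketch, error handling elided):
 *
 *	err = ring_buffer_subbuf_order_set(buffer, 2);
 *	if (err)
 *		return err;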
5936 */ 5937 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 5938 { 5939 struct ring_buffer_per_cpu *cpu_buffer; 5940 struct buffer_page *bpage, *tmp; 5941 int old_order, old_size; 5942 int nr_pages; 5943 int psize; 5944 int err; 5945 int cpu; 5946 5947 if (!buffer || order < 0) 5948 return -EINVAL; 5949 5950 if (buffer->subbuf_order == order) 5951 return 0; 5952 5953 psize = (1 << order) * PAGE_SIZE; 5954 if (psize <= BUF_PAGE_HDR_SIZE) 5955 return -EINVAL; 5956 5957 /* Size of a subbuf cannot be greater than the write counter */ 5958 if (psize > RB_WRITE_MASK + 1) 5959 return -EINVAL; 5960 5961 old_order = buffer->subbuf_order; 5962 old_size = buffer->subbuf_size; 5963 5964 /* prevent another thread from changing buffer sizes */ 5965 mutex_lock(&buffer->mutex); 5966 atomic_inc(&buffer->record_disabled); 5967 5968 /* Make sure all commits have finished */ 5969 synchronize_rcu(); 5970 5971 buffer->subbuf_order = order; 5972 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 5973 5974 /* Make sure all new buffers are allocated, before deleting the old ones */ 5975 for_each_buffer_cpu(buffer, cpu) { 5976 5977 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5978 continue; 5979 5980 cpu_buffer = buffer->buffers[cpu]; 5981 5982 if (cpu_buffer->mapped) { 5983 err = -EBUSY; 5984 goto error; 5985 } 5986 5987 /* Update the number of pages to match the new size */ 5988 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 5989 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 5990 5991 /* we need a minimum of two pages */ 5992 if (nr_pages < 2) 5993 nr_pages = 2; 5994 5995 cpu_buffer->nr_pages_to_update = nr_pages; 5996 5997 /* Include the reader page */ 5998 nr_pages++; 5999 6000 /* Allocate the new size buffer */ 6001 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6002 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6003 &cpu_buffer->new_pages)) { 6004 /* not enough memory for new pages */ 6005 err = -ENOMEM; 6006 goto error; 6007 } 6008 } 6009 6010 for_each_buffer_cpu(buffer, cpu) { 6011 6012 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6013 continue; 6014 6015 cpu_buffer = buffer->buffers[cpu]; 6016 6017 /* Clear the head bit to make the link list normal to read */ 6018 rb_head_page_deactivate(cpu_buffer); 6019 6020 /* Now walk the list and free all the old sub buffers */ 6021 list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) { 6022 list_del_init(&bpage->list); 6023 free_buffer_page(bpage); 6024 } 6025 /* The above loop stopped an the last page needing to be freed */ 6026 bpage = list_entry(cpu_buffer->pages, struct buffer_page, list); 6027 free_buffer_page(bpage); 6028 6029 /* Free the current reader page */ 6030 free_buffer_page(cpu_buffer->reader_page); 6031 6032 /* One page was allocated for the reader page */ 6033 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6034 struct buffer_page, list); 6035 list_del_init(&cpu_buffer->reader_page->list); 6036 6037 /* The cpu_buffer pages are a link list with no head */ 6038 cpu_buffer->pages = cpu_buffer->new_pages.next; 6039 cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev; 6040 cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next; 6041 6042 /* Clear the new_pages list */ 6043 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6044 6045 cpu_buffer->head_page 6046 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6047 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6048 6049 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6050 
cpu_buffer->nr_pages_to_update = 0; 6051 6052 free_pages((unsigned long)cpu_buffer->free_page, old_order); 6053 cpu_buffer->free_page = NULL; 6054 6055 rb_head_page_activate(cpu_buffer); 6056 6057 rb_check_pages(cpu_buffer); 6058 } 6059 6060 atomic_dec(&buffer->record_disabled); 6061 mutex_unlock(&buffer->mutex); 6062 6063 return 0; 6064 6065 error: 6066 buffer->subbuf_order = old_order; 6067 buffer->subbuf_size = old_size; 6068 6069 atomic_dec(&buffer->record_disabled); 6070 mutex_unlock(&buffer->mutex); 6071 6072 for_each_buffer_cpu(buffer, cpu) { 6073 cpu_buffer = buffer->buffers[cpu]; 6074 6075 if (!cpu_buffer->nr_pages_to_update) 6076 continue; 6077 6078 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6079 list_del_init(&bpage->list); 6080 free_buffer_page(bpage); 6081 } 6082 } 6083 6084 return err; 6085 } 6086 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6087 6088 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6089 { 6090 struct page *page; 6091 6092 if (cpu_buffer->meta_page) 6093 return 0; 6094 6095 page = alloc_page(GFP_USER | __GFP_ZERO); 6096 if (!page) 6097 return -ENOMEM; 6098 6099 cpu_buffer->meta_page = page_to_virt(page); 6100 6101 return 0; 6102 } 6103 6104 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6105 { 6106 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6107 6108 free_page(addr); 6109 cpu_buffer->meta_page = NULL; 6110 } 6111 6112 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6113 unsigned long *subbuf_ids) 6114 { 6115 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6116 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6117 struct buffer_page *first_subbuf, *subbuf; 6118 int id = 0; 6119 6120 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; 6121 cpu_buffer->reader_page->id = id++; 6122 6123 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6124 do { 6125 if (WARN_ON(id >= nr_subbufs)) 6126 break; 6127 6128 subbuf_ids[id] = (unsigned long)subbuf->page; 6129 subbuf->id = id; 6130 6131 rb_inc_page(&subbuf); 6132 id++; 6133 } while (subbuf != first_subbuf); 6134 6135 /* install subbuf ID to kern VA translation */ 6136 cpu_buffer->subbuf_ids = subbuf_ids; 6137 6138 meta->meta_page_size = PAGE_SIZE; 6139 meta->meta_struct_len = sizeof(*meta); 6140 meta->nr_subbufs = nr_subbufs; 6141 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6142 6143 rb_update_meta_page(cpu_buffer); 6144 } 6145 6146 static struct ring_buffer_per_cpu * 6147 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6148 { 6149 struct ring_buffer_per_cpu *cpu_buffer; 6150 6151 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6152 return ERR_PTR(-EINVAL); 6153 6154 cpu_buffer = buffer->buffers[cpu]; 6155 6156 mutex_lock(&cpu_buffer->mapping_lock); 6157 6158 if (!cpu_buffer->mapped) { 6159 mutex_unlock(&cpu_buffer->mapping_lock); 6160 return ERR_PTR(-ENODEV); 6161 } 6162 6163 return cpu_buffer; 6164 } 6165 6166 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6167 { 6168 mutex_unlock(&cpu_buffer->mapping_lock); 6169 } 6170 6171 /* 6172 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 6173 * to be set-up or torn-down. 
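 * That is, another mapping already exists and only the mapped count needs
 * to be adjusted.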
6174 */ 6175 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 6176 bool inc) 6177 { 6178 unsigned long flags; 6179 6180 lockdep_assert_held(&cpu_buffer->mapping_lock); 6181 6182 if (inc && cpu_buffer->mapped == UINT_MAX) 6183 return -EBUSY; 6184 6185 if (WARN_ON(!inc && cpu_buffer->mapped == 0)) 6186 return -EINVAL; 6187 6188 mutex_lock(&cpu_buffer->buffer->mutex); 6189 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6190 6191 if (inc) 6192 cpu_buffer->mapped++; 6193 else 6194 cpu_buffer->mapped--; 6195 6196 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6197 mutex_unlock(&cpu_buffer->buffer->mutex); 6198 6199 return 0; 6200 } 6201 6202 /* 6203 * +--------------+ pgoff == 0 6204 * | meta page | 6205 * +--------------+ pgoff == 1 6206 * | subbuffer 0 | 6207 * | | 6208 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 6209 * | subbuffer 1 | 6210 * | | 6211 * ... 6212 */ 6213 #ifdef CONFIG_MMU 6214 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6215 struct vm_area_struct *vma) 6216 { 6217 unsigned long nr_subbufs, nr_pages, vma_pages, pgoff = vma->vm_pgoff; 6218 unsigned int subbuf_pages, subbuf_order; 6219 struct page **pages; 6220 int p = 0, s = 0; 6221 int err; 6222 6223 /* Refuse MP_PRIVATE or writable mappings */ 6224 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 6225 !(vma->vm_flags & VM_MAYSHARE)) 6226 return -EPERM; 6227 6228 /* 6229 * Make sure the mapping cannot become writable later. Also tell the VM 6230 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 6231 */ 6232 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 6233 VM_MAYWRITE); 6234 6235 lockdep_assert_held(&cpu_buffer->mapping_lock); 6236 6237 subbuf_order = cpu_buffer->buffer->subbuf_order; 6238 subbuf_pages = 1 << subbuf_order; 6239 6240 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 6241 nr_pages = ((nr_subbufs) << subbuf_order) - pgoff + 1; /* + meta-page */ 6242 6243 vma_pages = (vma->vm_end - vma->vm_start) >> PAGE_SHIFT; 6244 if (!vma_pages || vma_pages > nr_pages) 6245 return -EINVAL; 6246 6247 nr_pages = vma_pages; 6248 6249 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 6250 if (!pages) 6251 return -ENOMEM; 6252 6253 if (!pgoff) { 6254 pages[p++] = virt_to_page(cpu_buffer->meta_page); 6255 6256 /* 6257 * TODO: Align sub-buffers on their size, once 6258 * vm_insert_pages() supports the zero-page. 
6259 */ 6260 } else { 6261 /* Skip the meta-page */ 6262 pgoff--; 6263 6264 if (pgoff % subbuf_pages) { 6265 err = -EINVAL; 6266 goto out; 6267 } 6268 6269 s += pgoff / subbuf_pages; 6270 } 6271 6272 while (p < nr_pages) { 6273 struct page *page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 6274 int off = 0; 6275 6276 if (WARN_ON_ONCE(s >= nr_subbufs)) { 6277 err = -EINVAL; 6278 goto out; 6279 } 6280 6281 for (; off < (1 << (subbuf_order)); off++, page++) { 6282 if (p >= nr_pages) 6283 break; 6284 6285 pages[p++] = page; 6286 } 6287 s++; 6288 } 6289 6290 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 6291 6292 out: 6293 kfree(pages); 6294 6295 return err; 6296 } 6297 #else 6298 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6299 struct vm_area_struct *vma) 6300 { 6301 return -EOPNOTSUPP; 6302 } 6303 #endif 6304 6305 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 6306 struct vm_area_struct *vma) 6307 { 6308 struct ring_buffer_per_cpu *cpu_buffer; 6309 unsigned long flags, *subbuf_ids; 6310 int err = 0; 6311 6312 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6313 return -EINVAL; 6314 6315 cpu_buffer = buffer->buffers[cpu]; 6316 6317 mutex_lock(&cpu_buffer->mapping_lock); 6318 6319 if (cpu_buffer->mapped) { 6320 err = __rb_map_vma(cpu_buffer, vma); 6321 if (!err) 6322 err = __rb_inc_dec_mapped(cpu_buffer, true); 6323 mutex_unlock(&cpu_buffer->mapping_lock); 6324 return err; 6325 } 6326 6327 /* prevent another thread from changing buffer/sub-buffer sizes */ 6328 mutex_lock(&buffer->mutex); 6329 6330 err = rb_alloc_meta_page(cpu_buffer); 6331 if (err) 6332 goto unlock; 6333 6334 /* subbuf_ids include the reader while nr_pages does not */ 6335 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 6336 if (!subbuf_ids) { 6337 rb_free_meta_page(cpu_buffer); 6338 err = -ENOMEM; 6339 goto unlock; 6340 } 6341 6342 atomic_inc(&cpu_buffer->resize_disabled); 6343 6344 /* 6345 * Lock all readers to block any subbuf swap until the subbuf IDs are 6346 * assigned. 
6347 */ 6348 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6349 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 6350 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6351 6352 err = __rb_map_vma(cpu_buffer, vma); 6353 if (!err) { 6354 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6355 cpu_buffer->mapped = 1; 6356 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6357 } else { 6358 kfree(cpu_buffer->subbuf_ids); 6359 cpu_buffer->subbuf_ids = NULL; 6360 rb_free_meta_page(cpu_buffer); 6361 } 6362 6363 unlock: 6364 mutex_unlock(&buffer->mutex); 6365 mutex_unlock(&cpu_buffer->mapping_lock); 6366 6367 return err; 6368 } 6369 6370 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 6371 { 6372 struct ring_buffer_per_cpu *cpu_buffer; 6373 unsigned long flags; 6374 int err = 0; 6375 6376 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6377 return -EINVAL; 6378 6379 cpu_buffer = buffer->buffers[cpu]; 6380 6381 mutex_lock(&cpu_buffer->mapping_lock); 6382 6383 if (!cpu_buffer->mapped) { 6384 err = -ENODEV; 6385 goto out; 6386 } else if (cpu_buffer->mapped > 1) { 6387 __rb_inc_dec_mapped(cpu_buffer, false); 6388 goto out; 6389 } 6390 6391 mutex_lock(&buffer->mutex); 6392 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6393 6394 cpu_buffer->mapped = 0; 6395 6396 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6397 6398 kfree(cpu_buffer->subbuf_ids); 6399 cpu_buffer->subbuf_ids = NULL; 6400 rb_free_meta_page(cpu_buffer); 6401 atomic_dec(&cpu_buffer->resize_disabled); 6402 6403 mutex_unlock(&buffer->mutex); 6404 6405 out: 6406 mutex_unlock(&cpu_buffer->mapping_lock); 6407 6408 return err; 6409 } 6410 6411 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 6412 { 6413 struct ring_buffer_per_cpu *cpu_buffer; 6414 struct buffer_page *reader; 6415 unsigned long missed_events; 6416 unsigned long reader_size; 6417 unsigned long flags; 6418 6419 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 6420 if (IS_ERR(cpu_buffer)) 6421 return (int)PTR_ERR(cpu_buffer); 6422 6423 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6424 6425 consume: 6426 if (rb_per_cpu_empty(cpu_buffer)) 6427 goto out; 6428 6429 reader_size = rb_page_size(cpu_buffer->reader_page); 6430 6431 /* 6432 * There are data to be read on the current reader page, we can 6433 * return to the caller. But before that, we assume the latter will read 6434 * everything. Let's update the kernel reader accordingly. 6435 */ 6436 if (cpu_buffer->reader_page->read < reader_size) { 6437 while (cpu_buffer->reader_page->read < reader_size) 6438 rb_advance_reader(cpu_buffer); 6439 goto out; 6440 } 6441 6442 reader = rb_get_reader_page(cpu_buffer); 6443 if (WARN_ON(!reader)) 6444 goto out; 6445 6446 /* Check if any events were dropped */ 6447 missed_events = cpu_buffer->lost_events; 6448 6449 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 6450 if (missed_events) { 6451 struct buffer_data_page *bpage = reader->page; 6452 unsigned int commit; 6453 /* 6454 * Use the real_end for the data size, 6455 * This gives us a chance to store the lost events 6456 * on the page. 6457 */ 6458 if (reader->real_end) 6459 local_set(&bpage->commit, reader->real_end); 6460 /* 6461 * If there is room at the end of the page to save the 6462 * missed events, then record it there. 
6463 */ 6464 commit = rb_page_size(reader); 6465 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6466 memcpy(&bpage->data[commit], &missed_events, 6467 sizeof(missed_events)); 6468 local_add(RB_MISSED_STORED, &bpage->commit); 6469 } 6470 local_add(RB_MISSED_EVENTS, &bpage->commit); 6471 } 6472 } else { 6473 /* 6474 * There really shouldn't be any missed events if the commit 6475 * is on the reader page. 6476 */ 6477 WARN_ON_ONCE(missed_events); 6478 } 6479 6480 cpu_buffer->lost_events = 0; 6481 6482 goto consume; 6483 6484 out: 6485 /* Some archs do not have data cache coherency between kernel and user-space */ 6486 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page)); 6487 6488 rb_update_meta_page(cpu_buffer); 6489 6490 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6491 rb_put_mapped_buffer(cpu_buffer); 6492 6493 return 0; 6494 } 6495 6496 /* 6497 * We only allocate new buffers, never free them if the CPU goes down. 6498 * If we were to free the buffer, then the user would lose any trace that was in 6499 * the buffer. 6500 */ 6501 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 6502 { 6503 struct trace_buffer *buffer; 6504 long nr_pages_same; 6505 int cpu_i; 6506 unsigned long nr_pages; 6507 6508 buffer = container_of(node, struct trace_buffer, node); 6509 if (cpumask_test_cpu(cpu, buffer->cpumask)) 6510 return 0; 6511 6512 nr_pages = 0; 6513 nr_pages_same = 1; 6514 /* check if all cpu sizes are same */ 6515 for_each_buffer_cpu(buffer, cpu_i) { 6516 /* fill in the size from first enabled cpu */ 6517 if (nr_pages == 0) 6518 nr_pages = buffer->buffers[cpu_i]->nr_pages; 6519 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 6520 nr_pages_same = 0; 6521 break; 6522 } 6523 } 6524 /* allocate minimum pages, user can later expand it */ 6525 if (!nr_pages_same) 6526 nr_pages = 2; 6527 buffer->buffers[cpu] = 6528 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 6529 if (!buffer->buffers[cpu]) { 6530 WARN(1, "failed to allocate ring buffer on CPU %u\n", 6531 cpu); 6532 return -ENOMEM; 6533 } 6534 smp_wmb(); 6535 cpumask_set_cpu(cpu, buffer->cpumask); 6536 return 0; 6537 } 6538 6539 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 6540 /* 6541 * This is a basic integrity check of the ring buffer. 6542 * Late in the boot cycle this test will run when configured in. 6543 * It will kick off a thread per CPU that will go into a loop 6544 * writing to the per cpu ring buffer various sizes of data. 6545 * Some of the data will be large items, some small. 6546 * 6547 * Another thread is created that goes into a spin, sending out 6548 * IPIs to the other CPUs to also write into the ring buffer. 6549 * this is to test the nesting ability of the buffer. 6550 * 6551 * Basic stats are recorded and reported. If something in the 6552 * ring buffer should happen that's not expected, a big warning 6553 * is displayed and all ring buffers are disabled. 
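 *
 * (The test is only built when CONFIG_RING_BUFFER_STARTUP_TEST is set and
 * is kicked off from the late_initcall() of test_ringbuffer() below.)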
 */
static struct task_struct *rb_threads[NR_CPUS] __initdata;

struct rb_test_data {
	struct trace_buffer *buffer;
	unsigned long events;
	unsigned long bytes_written;
	unsigned long bytes_alloc;
	unsigned long bytes_dropped;
	unsigned long events_nested;
	unsigned long bytes_written_nested;
	unsigned long bytes_alloc_nested;
	unsigned long bytes_dropped_nested;
	int min_size_nested;
	int max_size_nested;
	int max_size;
	int min_size;
	int cpu;
	int cnt;
};

static struct rb_test_data rb_data[NR_CPUS] __initdata;

/* 1 meg per cpu */
#define RB_TEST_BUFFER_SIZE	1048576

static char rb_string[] __initdata =
	"abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
	"?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
	"!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";

static bool rb_test_started __initdata;

struct rb_item {
	int size;
	char str[];
};

static __init int rb_write_something(struct rb_test_data *data, bool nested)
{
	struct ring_buffer_event *event;
	struct rb_item *item;
	bool started;
	int event_len;
	int size;
	int len;
	int cnt;

	/* Have nested writes different than what is written */
	cnt = data->cnt + (nested ? 27 : 0);

	/* Multiply cnt by ~e, to make some unique increment */
	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);

	len = size + sizeof(struct rb_item);

	started = rb_test_started;
	/* read rb_test_started before checking buffer enabled */
	smp_rmb();

	event = ring_buffer_lock_reserve(data->buffer, len);
	if (!event) {
		/* Ignore dropped events before test starts */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show buffer is enabled before setting rb_test_started.
	 * Yes, there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events.
We care about events dropped after 6747 * the threads see that the buffer is active. 6748 */ 6749 smp_wmb(); 6750 rb_test_started = true; 6751 6752 set_current_state(TASK_INTERRUPTIBLE); 6753 /* Just run for 10 seconds */; 6754 schedule_timeout(10 * HZ); 6755 6756 kthread_stop(rb_hammer); 6757 6758 out_free: 6759 for_each_online_cpu(cpu) { 6760 if (!rb_threads[cpu]) 6761 break; 6762 kthread_stop(rb_threads[cpu]); 6763 } 6764 if (ret) { 6765 ring_buffer_free(buffer); 6766 return ret; 6767 } 6768 6769 /* Report! */ 6770 pr_info("finished\n"); 6771 for_each_online_cpu(cpu) { 6772 struct ring_buffer_event *event; 6773 struct rb_test_data *data = &rb_data[cpu]; 6774 struct rb_item *item; 6775 unsigned long total_events; 6776 unsigned long total_dropped; 6777 unsigned long total_written; 6778 unsigned long total_alloc; 6779 unsigned long total_read = 0; 6780 unsigned long total_size = 0; 6781 unsigned long total_len = 0; 6782 unsigned long total_lost = 0; 6783 unsigned long lost; 6784 int big_event_size; 6785 int small_event_size; 6786 6787 ret = -1; 6788 6789 total_events = data->events + data->events_nested; 6790 total_written = data->bytes_written + data->bytes_written_nested; 6791 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 6792 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 6793 6794 big_event_size = data->max_size + data->max_size_nested; 6795 small_event_size = data->min_size + data->min_size_nested; 6796 6797 pr_info("CPU %d:\n", cpu); 6798 pr_info(" events: %ld\n", total_events); 6799 pr_info(" dropped bytes: %ld\n", total_dropped); 6800 pr_info(" alloced bytes: %ld\n", total_alloc); 6801 pr_info(" written bytes: %ld\n", total_written); 6802 pr_info(" biggest event: %d\n", big_event_size); 6803 pr_info(" smallest event: %d\n", small_event_size); 6804 6805 if (RB_WARN_ON(buffer, total_dropped)) 6806 break; 6807 6808 ret = 0; 6809 6810 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 6811 total_lost += lost; 6812 item = ring_buffer_event_data(event); 6813 total_len += ring_buffer_event_length(event); 6814 total_size += item->size + sizeof(struct rb_item); 6815 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 6816 pr_info("FAILED!\n"); 6817 pr_info("buffer had: %.*s\n", item->size, item->str); 6818 pr_info("expected: %.*s\n", item->size, rb_string); 6819 RB_WARN_ON(buffer, 1); 6820 ret = -1; 6821 break; 6822 } 6823 total_read++; 6824 } 6825 if (ret) 6826 break; 6827 6828 ret = -1; 6829 6830 pr_info(" read events: %ld\n", total_read); 6831 pr_info(" lost events: %ld\n", total_lost); 6832 pr_info(" total events: %ld\n", total_lost + total_read); 6833 pr_info(" recorded len bytes: %ld\n", total_len); 6834 pr_info(" recorded size bytes: %ld\n", total_size); 6835 if (total_lost) { 6836 pr_info(" With dropped events, record len and size may not match\n" 6837 " alloced and written from above\n"); 6838 } else { 6839 if (RB_WARN_ON(buffer, total_len != total_alloc || 6840 total_size != total_written)) 6841 break; 6842 } 6843 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 6844 break; 6845 6846 ret = 0; 6847 } 6848 if (!ret) 6849 pr_info("Ring buffer PASSED!\n"); 6850 6851 ring_buffer_free(buffer); 6852 return 0; 6853 } 6854 6855 late_initcall(test_ringbuffer); 6856 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 6857