// SPDX-License-Identifier: GPL-2.0
/*
 * Generic ring buffer
 *
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/trace_recursion.h>
#include <linux/trace_events.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
#include <linux/sched/clock.h>
#include <linux/cacheflush.h>
#include <linux/trace_seq.h>
#include <linux/spinlock.h>
#include <linux/irq_work.h>
#include <linux/security.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
#include <linux/kthread.h>	/* for self test */
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
#include <linux/cpu.h>
#include <linux/oom.h>
#include <linux/mm.h>

#include <asm/local64.h>
#include <asm/local.h>

/*
 * The "absolute" timestamp in the buffer is only 59 bits.
 * If a clock has the 5 MSBs set, it needs to be saved and
 * reinserted.
 */
#define TS_MSB		(0xf8ULL << 56)
#define ABS_TS_MASK	(~TS_MSB)

static void update_pages_handler(struct work_struct *work);

/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
{
	trace_seq_puts(s, "# compressed entry header\n");
	trace_seq_puts(s, "\ttype_len    :    5 bits\n");
	trace_seq_puts(s, "\ttime_delta  :   27 bits\n");
	trace_seq_puts(s, "\tarray       :   32 bits\n");
	trace_seq_putc(s, '\n');
	trace_seq_printf(s, "\tpadding     : type == %d\n",
			 RINGBUF_TYPE_PADDING);
	trace_seq_printf(s, "\ttime_extend : type == %d\n",
			 RINGBUF_TYPE_TIME_EXTEND);
	trace_seq_printf(s, "\ttime_stamp  : type == %d\n",
			 RINGBUF_TYPE_TIME_STAMP);
	trace_seq_printf(s, "\tdata max type_len  == %d\n",
			 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);

	return !trace_seq_has_overflowed(s);
}

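/*
 * Illustrative example (editorial note, not part of the original header
 * description): with RB_ALIGNMENT == 4, a data event carrying a 12-byte
 * payload can encode its size directly in the header as type_len == 3
 * (3 * 4 == 12 bytes), leaving array[0] free for payload. Payloads larger
 * than RB_MAX_SMALL_DATA instead set type_len == 0 and store the length
 * in array[0].
 */
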
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that
 * page again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */

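/*
 * Rough outline of the swap pictured above (illustrative only; the real
 * work is done later in rb_get_reader_page() with cmpxchg on the
 * flag-carrying list pointers):
 *
 *   1. Point the empty reader page's next/prev at the current head page's
 *      neighbours.
 *   2. cmpxchg the page before head so that it points to the reader page
 *      instead of the head page.
 *   3. The old head page is now outside the ring and becomes the new
 *      reader page; the page after it becomes the new head.
 */
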
/* Used for individual buffers (after the counter) */
#define RB_BUFFER_OFF		(1 << 20)

#define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)

#define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
#define RB_ALIGNMENT		4U
#define RB_MAX_SMALL_DATA	(RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
#define RB_EVNT_MIN_SIZE	8U	/* two 32bit words */

#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
# define RB_FORCE_8BYTE_ALIGNMENT	0
# define RB_ARCH_ALIGNMENT		RB_ALIGNMENT
#else
# define RB_FORCE_8BYTE_ALIGNMENT	1
# define RB_ARCH_ALIGNMENT		8U
#endif

#define RB_ALIGN_DATA		__aligned(RB_ARCH_ALIGNMENT)

/* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
#define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 8,
};

#define skip_time_extend(event) \
	((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))

#define extended_time(event) \
	(event->type_len >= RINGBUF_TYPE_TIME_EXTEND)

static inline bool rb_null_event(struct ring_buffer_event *event)
{
	return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
}

static void rb_event_set_padding(struct ring_buffer_event *event)
{
	/* padding has a NULL time_delta */
	event->type_len = RINGBUF_TYPE_PADDING;
	event->time_delta = 0;
}

static unsigned
rb_event_data_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (event->type_len)
		length = event->type_len * RB_ALIGNMENT;
	else
		length = event->array[0];
	return length + RB_EVNT_HDR_SIZE;
}

/*
 * Return the length of the given event. Will return
 * the length of the time extend if the event is a
 * time extend.
 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	switch (event->type_len) {
	case RINGBUF_TYPE_PADDING:
		if (rb_null_event(event))
			/* undefined */
			return -1;
		return event->array[0] + RB_EVNT_HDR_SIZE;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		return rb_event_data_length(event);
	default:
		WARN_ON_ONCE(1);
	}
	/* not hit */
	return 0;
}

/*
 * Return total length of time extend and data,
 * or just the event length for all other events.
 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
{
	unsigned len = 0;

	if (extended_time(event)) {
		/* time extends include the data event after it */
		len = RB_LEN_TIME_EXTEND;
		event = skip_time_extend(event);
	}
	return len + rb_event_length(event);
}

/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data load of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself. With the exception
 * of a TIME EXTEND, where it still returns the size of the
 * data load of the data event after it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	if (extended_time(event))
		event = skip_time_extend(event);

	length = rb_event_length(event);
	if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);

/* inline for ring buffer fast paths */
static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
{
	if (extended_time(event))
		event = skip_time_extend(event);
	WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
	/* If length is in len field, then array[0] has the data */
	if (event->type_len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}

/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);

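/*
 * Typical consumer-side use of the two helpers above (sketch only; error
 * handling and the surrounding iteration are omitted):
 *
 *	struct ring_buffer_event *event;
 *	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
 *	void *payload = ring_buffer_event_data(event);
 *	unsigned int size = ring_buffer_event_length(event);
 */
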
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu(cpu, buffer->cpumask)

#define for_each_online_buffer_cpu(buffer, cpu)		\
	for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)

static u64 rb_event_time_stamp(struct ring_buffer_event *event)
{
	u64 ts;

	ts = event->array[0];
	ts <<= TS_SHIFT;
	ts += event->time_delta;

	return ts;
}

/* Flag when events were overwritten */
#define RB_MISSED_EVENTS	(1 << 31)
/* Missed count stored at end */
#define RB_MISSED_STORED	(1 << 30)

#define RB_MISSED_MASK		(3 << 30)

struct buffer_data_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 commit;	/* write committed index */
	unsigned char	 data[] RB_ALIGN_DATA;	/* data of buffer page */
};

struct buffer_data_read_page {
	unsigned		order;	/* order of the page */
	struct buffer_data_page	*data;	/* actual data, stored in this page */
};

/*
 * Note, the buffer_page list must be first. The buffer pages
 * are allocated in cache lines, which means that each buffer
 * page will be at the beginning of a cache line, and thus
 * the least significant bits will be zero. We use this to
 * add flags in the list struct pointers, to make the ring buffer
 * lockless.
 */
struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	unsigned long	 real_end;	/* real end of data */
	unsigned	 order;		/* order of the page */
	u32		 id;		/* ID for external mapping */
	struct buffer_data_page *page;	/* Actual data page */
};

/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are inserted into the number. One is
 * the actual counter for the write position or count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the update partition of the counter is incremented. This will
 * allow the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the state data is 12.
 */
#define RB_WRITE_MASK		0xfffff
#define RB_WRITE_INTCNT		(1 << 20)

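/*
 * For example (illustrative only): rb_page_write() below masks the raw
 * value with RB_WRITE_MASK, so a raw 'write' value of 0x300014 decodes as
 * a write index of 0x14 with three updaters having each bumped the value
 * by RB_WRITE_INTCNT. rb_tail_page_update() relies on this split to
 * detect interrupts that moved the tail page underneath it.
 */
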
static void rb_init_page(struct buffer_data_page *bpage)
{
	local_set(&bpage->commit, 0);
}

static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
{
	return local_read(&bpage->page->commit);
}

static void free_buffer_page(struct buffer_page *bpage)
{
	free_pages((unsigned long)bpage->page, bpage->order);
	kfree(bpage);
}

/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline bool test_time_stamp(u64 delta)
{
	return !!(delta & TS_DELTA_TEST);
}

struct rb_irq_work {
	struct irq_work			work;
	wait_queue_head_t		waiters;
	wait_queue_head_t		full_waiters;
	atomic_t			seq;
	bool				waiters_pending;
	bool				full_waiters_pending;
	bool				wakeup_full;
};

/*
 * Structure to hold event state and handle nested events.
 */
struct rb_event_info {
	u64			ts;
	u64			delta;
	u64			before;
	u64			after;
	unsigned long		length;
	struct buffer_page	*tail_page;
	int			add_timestamp;
};

/*
 * Used for the add_timestamp
 *  NONE
 *  EXTEND - wants a time extend
 *  ABSOLUTE - the buffer requests all events to have absolute time stamps
 *  FORCE - force a full time stamp.
 */
enum {
	RB_ADD_STAMP_NONE		= 0,
	RB_ADD_STAMP_EXTEND		= BIT(1),
	RB_ADD_STAMP_ABSOLUTE		= BIT(2),
	RB_ADD_STAMP_FORCE		= BIT(3)
};
/*
 * Used for which event context the event is in.
 *  TRANSITION = 0
 *  NMI     = 1
 *  IRQ     = 2
 *  SOFTIRQ = 3
 *  NORMAL  = 4
 *
 * See trace_recursive_lock() comment below for more details.
 */
enum {
	RB_CTX_TRANSITION,
	RB_CTX_NMI,
	RB_CTX_IRQ,
	RB_CTX_SOFTIRQ,
	RB_CTX_NORMAL,
	RB_CTX_MAX
};

struct rb_time_struct {
	local64_t	time;
};
typedef struct rb_time_struct rb_time_t;

#define MAX_NEST	5

/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	atomic_t			record_disabled;
	atomic_t			resize_disabled;
	struct trace_buffer		*buffer;
	raw_spinlock_t			reader_lock;	/* serialize readers */
	arch_spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct buffer_data_page		*free_page;
	unsigned long			nr_pages;
	unsigned int			current_context;
	struct list_head		*pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			lost_events;
	unsigned long			last_overrun;
	unsigned long			nest;
	local_t				entries_bytes;
	local_t				entries;
	local_t				overrun;
	local_t				commit_overrun;
	local_t				dropped_events;
	local_t				committing;
	local_t				commits;
	local_t				pages_touched;
	local_t				pages_lost;
	local_t				pages_read;
	long				last_pages_touch;
	size_t				shortest_full;
	unsigned long			read;
	unsigned long			read_bytes;
	rb_time_t			write_stamp;
	rb_time_t			before_stamp;
	u64				event_stamp[MAX_NEST];
	u64				read_stamp;
	/* pages removed since last reset */
	unsigned long			pages_removed;

	unsigned int			mapped;
	struct mutex			mapping_lock;
	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
	struct trace_buffer_meta	*meta_page;

	/* ring buffer pages to update, > 0 to add, < 0 to remove */
	long				nr_pages_to_update;
	struct list_head		new_pages; /* new pages to add */
	struct work_struct		update_pages_work;
	struct completion		update_done;

	struct rb_irq_work		irq_work;
};

struct trace_buffer {
	unsigned			flags;
	int				cpus;
	atomic_t			record_disabled;
	atomic_t			resizing;
	cpumask_var_t			cpumask;

	struct lock_class_key		*reader_lock_key;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;

	struct hlist_node		node;
	u64				(*clock)(void);

	struct rb_irq_work		irq_work;
	bool				time_stamp_abs;

	unsigned int			subbuf_size;
	unsigned int			subbuf_order;
	unsigned int			max_data_size;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	unsigned long			next_event;
	struct buffer_page		*head_page;
	struct buffer_page		*cache_reader_page;
	unsigned long			cache_read;
	unsigned long			cache_pages_removed;
	u64				read_stamp;
	u64				page_stamp;
	struct ring_buffer_event	*event;
	size_t				event_size;
	int				missed_events;
};

int ring_buffer_print_page_header(struct trace_buffer *buffer, struct trace_seq *s)
{
	struct buffer_data_page field;

	trace_seq_printf(s, "\tfield: u64 timestamp;\t"
			 "offset:0;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)sizeof(field.time_stamp),
			 (unsigned int)is_signed_type(u64));

	trace_seq_printf(s, "\tfield: local_t commit;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 (unsigned int)sizeof(field.commit),
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: int overwrite;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), commit),
			 1,
			 (unsigned int)is_signed_type(long));

	trace_seq_printf(s, "\tfield: char data;\t"
			 "offset:%u;\tsize:%u;\tsigned:%u;\n",
			 (unsigned int)offsetof(typeof(field), data),
			 (unsigned int)buffer->subbuf_size,
			 (unsigned int)is_signed_type(char));

	return !trace_seq_has_overflowed(s);
}

static inline void rb_time_read(rb_time_t *t, u64 *ret)
{
	*ret = local64_read(&t->time);
}
static void rb_time_set(rb_time_t *t, u64 val)
{
	local64_set(&t->time, val);
}

/*
 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that it passed in.
 */
//#define RB_VERIFY_EVENT
#ifdef RB_VERIFY_EVENT
static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
	struct buffer_page *page = cpu_buffer->commit_page;
	struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
	struct list_head *next;
	long commit, write;
	unsigned long addr = (unsigned long)event;
	bool done = false;
	int stop = 0;

	/* Make sure the event exists and is not committed yet */
	do {
		if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
			done = true;
		commit = local_read(&page->page->commit);
		write = local_read(&page->write);
		if (addr >= (unsigned long)&page->page->data[commit] &&
		    addr < (unsigned long)&page->page->data[write])
			return;

		next = rb_list_head(page->list.next);
		page = list_entry(next, struct buffer_page, list);
	} while (!done);
	WARN_ON_ONCE(1);
}
#else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
			 void *event)
{
}
#endif

/*
 * The absolute time stamp drops the 5 MSBs and some clocks may
 * require them. The rb_fix_abs_ts() will take a previous full
 * time stamp, and add the 5 MSB of that time stamp on to the
 * saved absolute time stamp. Then they are compared in case of
 * the unlikely event that the latest time stamp incremented
 * the 5 MSB.
 */
static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts)
{
	if (save_ts & TS_MSB) {
		abs |= save_ts & TS_MSB;
		/* Check for overflow */
		if (unlikely(abs < save_ts))
			abs += 1ULL << 59;
	}
	return abs;
}

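/*
 * Worked example (illustrative only): if the last full time stamp was
 * 0xf800000000001000 (some of the 5 MSBs set) and the 59-bit absolute
 * stamp read from the buffer is 0x2000, rb_fix_abs_ts() ORs the saved
 * MSBs back in to yield 0xf800000000002000. If that result were to fall
 * below the saved stamp, the 59-bit counter has wrapped and 1 << 59 is
 * added to correct it.
 */
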
static inline u64 rb_time_stamp(struct trace_buffer *buffer);

/**
 * ring_buffer_event_time_stamp - return the event's current time stamp
 * @buffer: The buffer that the event is on
 * @event: the event to get the time stamp of
 *
 * Note, this must be called after @event is reserved, and before it is
 * committed to the ring buffer. And must be called from the same
 * context where the event was reserved (normal, softirq, irq, etc).
 *
 * Returns the time stamp associated with the current event.
 * If the event has an extended time stamp, then that is used as
 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned,
 * otherwise the current time is returned, but really neither of
 * the last two cases should ever happen.
 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
				 struct ring_buffer_event *event)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
	unsigned int nest;
	u64 ts;

	/* If the event includes an absolute time, then just use that */
	if (event->type_len == RINGBUF_TYPE_TIME_STAMP) {
		ts = rb_event_time_stamp(event);
		return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp);
	}

	nest = local_read(&cpu_buffer->committing);
	verify_event(cpu_buffer, event);
	if (WARN_ON_ONCE(!nest))
		goto fail;

	/* Read the current saved nesting level time stamp */
	if (likely(--nest < MAX_NEST))
		return cpu_buffer->event_stamp[nest];

	/* Shouldn't happen, warn if it does */
	WARN_ONCE(1, "nest (%d) greater than max", nest);

 fail:
	rb_time_read(&cpu_buffer->write_stamp, &ts);

	return ts;
}

/**
 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
 */
size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
{
	return buffer->buffers[cpu]->nr_pages;
}

/**
 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
 * @buffer: The ring_buffer to get the number of pages from
 * @cpu: The cpu of the ring_buffer to get the number of pages from
 *
 * Returns the number of pages that have content in the ring buffer.
 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
{
	size_t read;
	size_t lost;
	size_t cnt;

	read = local_read(&buffer->buffers[cpu]->pages_read);
	lost = local_read(&buffer->buffers[cpu]->pages_lost);
	cnt = local_read(&buffer->buffers[cpu]->pages_touched);

	if (WARN_ON_ONCE(cnt < lost))
		return 0;

	cnt -= lost;

	/* The reader can read an empty page, but not more than that */
	if (cnt < read) {
		WARN_ON_ONCE(read > cnt + 1);
		return 0;
	}

	return cnt - read;
}

static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
	size_t nr_pages;
	size_t dirty;

	nr_pages = cpu_buffer->nr_pages;
	if (!nr_pages || !full)
		return true;

	/*
	 * Add one as dirty will never equal nr_pages, as the sub-buffer
	 * that the writer is on is not counted as dirty.
	 * This is needed if "buffer_percent" is set to 100.
	 */
	dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;

	return (dirty * 100) >= (full * nr_pages);
}

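/*
 * Worked example (illustrative only): with nr_pages == 10 and a watermark
 * of full == 50 (percent), the check above becomes
 * (nr_dirty + 1) * 100 >= 50 * 10, so the watermark is met once at least
 * four sub-buffers have been filled in addition to the one the writer is
 * currently on.
 */
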
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Schedules a delayed work to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
{
	struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);

	/* For waiters waiting for the first wake up */
	(void)atomic_fetch_inc_release(&rbwork->seq);

	wake_up_all(&rbwork->waiters);
	if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
		/* Only cpu_buffer sets the above flags */
		struct ring_buffer_per_cpu *cpu_buffer =
			container_of(rbwork, struct ring_buffer_per_cpu, irq_work);

		/* Called from interrupt context */
		raw_spin_lock(&cpu_buffer->reader_lock);
		rbwork->wakeup_full = false;
		rbwork->full_waiters_pending = false;

		/* Waking up all waiters, they will reset the shortest full */
		cpu_buffer->shortest_full = 0;
		raw_spin_unlock(&cpu_buffer->reader_lock);

		wake_up_all(&rbwork->full_waiters);
	}
}

/**
 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on
 *
 * In the case a file that represents a ring buffer is closing,
 * it is prudent to wake up any waiters that are on it.
 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (!buffer)
		return;

	if (cpu == RING_BUFFER_ALL_CPUS) {

		/* Wake up individual ones too. One level recursion */
		for_each_buffer_cpu(buffer, cpu)
			ring_buffer_wake_waiters(buffer, cpu);

		rbwork = &buffer->irq_work;
	} else {
		if (WARN_ON_ONCE(!buffer->buffers))
			return;
		if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
			return;

		cpu_buffer = buffer->buffers[cpu];
		/* The CPU buffer may not have been initialized yet */
		if (!cpu_buffer)
			return;
		rbwork = &cpu_buffer->irq_work;
	}

	/* This can be called in any context */
	irq_work_queue(&rbwork->work);
}

static bool rb_watermark_hit(struct trace_buffer *buffer, int cpu, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	bool ret = false;

	/* Reads of all CPUs always wait for any data */
	if (cpu == RING_BUFFER_ALL_CPUS)
		return !ring_buffer_empty(buffer);

	cpu_buffer = buffer->buffers[cpu];

	if (!ring_buffer_empty_cpu(buffer, cpu)) {
		unsigned long flags;
		bool pagebusy;

		if (!full)
			return true;

		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
		pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
		ret = !pagebusy && full_hit(buffer, cpu, full);

		if (!ret && (!cpu_buffer->shortest_full ||
			     cpu_buffer->shortest_full > full)) {
			cpu_buffer->shortest_full = full;
		}
		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
	}
	return ret;
}

static inline bool
rb_wait_cond(struct rb_irq_work *rbwork, struct trace_buffer *buffer,
	     int cpu, int full, ring_buffer_cond_fn cond, void *data)
{
	if (rb_watermark_hit(buffer, cpu, full))
		return true;

	if (cond(data))
		return true;

	/*
	 * The events can happen in critical sections where
	 * checking a work queue can cause deadlocks.
	 * After adding a task to the queue, this flag is set
	 * only to notify events to try to wake up the queue
	 * using irq_work.
	 *
	 * We don't clear it even if the buffer is no longer
	 * empty. The flag only causes the next event to run
	 * irq_work to do the work queue wake up. The worst
	 * that can happen if we race with !trace_empty() is that
	 * an event will cause an irq_work to try to wake up
	 * an empty queue.
	 *
	 * There's no reason to protect this flag either, as
	 * the work queue and irq_work logic will do the necessary
	 * synchronization for the wake ups. The only thing
	 * that is necessary is that the wake up happens after
	 * a task has been queued. It's OK for spurious wake ups.
	 */
	if (full)
		rbwork->full_waiters_pending = true;
	else
		rbwork->waiters_pending = true;

	return false;
}

struct rb_wait_data {
	struct rb_irq_work		*irq_work;
	int				seq;
};

/*
 * The default wait condition for ring_buffer_wait() is just to exit the
 * wait loop the first time it is woken up.
 */
static bool rb_wait_once(void *data)
{
	struct rb_wait_data *rdata = data;
	struct rb_irq_work *rbwork = rdata->irq_work;

	return atomic_read_acquire(&rbwork->seq) != rdata->seq;
}

/**
 * ring_buffer_wait - wait for input to the ring buffer
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 * @cond: condition function to break out of wait (NULL to run once)
 * @data: the data to pass to @cond.
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full,
		     ring_buffer_cond_fn cond, void *data)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct wait_queue_head *waitq;
	struct rb_irq_work *rbwork;
	struct rb_wait_data rdata;
	int ret = 0;

	/*
	 * Depending on what the caller is waiting for, either any
	 * data in any cpu buffer, or a specific buffer, put the
	 * caller on the appropriate wait queue.
	 */
	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		/* Full only makes sense on per cpu reads */
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return -ENODEV;
		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full)
		waitq = &rbwork->full_waiters;
	else
		waitq = &rbwork->waiters;

	/* Set up to exit loop as soon as it is woken */
	if (!cond) {
		cond = rb_wait_once;
		rdata.irq_work = rbwork;
		rdata.seq = atomic_read_acquire(&rbwork->seq);
		data = &rdata;
	}

	ret = wait_event_interruptible((*waitq),
				rb_wait_cond(rbwork, buffer, cpu, full, cond, data));

	return ret;
}

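/*
 * Sketch of a typical caller (illustrative only; the wait helpers in
 * kernel/trace/trace.c are the real examples). Wait until CPU 0's buffer
 * is at least half full, with no custom condition:
 *
 *	int err = ring_buffer_wait(buffer, 0, 50, NULL, NULL);
 *	if (err)	// nonzero if interrupted by a signal
 *		return err;
 */
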
/**
 * ring_buffer_poll_wait - poll on buffer input
 * @buffer: buffer to wait on
 * @cpu: the cpu buffer to wait on
 * @filp: the file descriptor
 * @poll_table: The poll descriptor
 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
 *
 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
 * as data is added to any of the @buffer's cpu buffers. Otherwise
 * it will wait for data to be added to a specific cpu buffer.
 *
 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
 * zero otherwise.
 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
			       struct file *filp, poll_table *poll_table, int full)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct rb_irq_work *rbwork;

	if (cpu == RING_BUFFER_ALL_CPUS) {
		rbwork = &buffer->irq_work;
		full = 0;
	} else {
		if (!cpumask_test_cpu(cpu, buffer->cpumask))
			return EPOLLERR;

		cpu_buffer = buffer->buffers[cpu];
		rbwork = &cpu_buffer->irq_work;
	}

	if (full) {
		poll_wait(filp, &rbwork->full_waiters, poll_table);

		if (rb_watermark_hit(buffer, cpu, full))
			return EPOLLIN | EPOLLRDNORM;
		/*
		 * Only allow full_waiters_pending update to be seen after
		 * the shortest_full is set (in rb_watermark_hit). If the
		 * writer sees the full_waiters_pending flag set, it will
		 * compare the amount in the ring buffer to shortest_full.
		 * If the amount in the ring buffer is greater than the
		 * shortest_full percent, it will call the irq_work handler
		 * to wake up this list. The irq_handler will reset shortest_full
		 * back to zero. That's done under the reader_lock, but
		 * the below smp_mb() makes sure that the update to
		 * full_waiters_pending doesn't leak up into the above.
		 */
		smp_mb();
		rbwork->full_waiters_pending = true;
		return 0;
	}

	poll_wait(filp, &rbwork->waiters, poll_table);
	rbwork->waiters_pending = true;

	/*
	 * There's a tight race between setting the waiters_pending and
	 * checking if the ring buffer is empty. Once the waiters_pending bit
	 * is set, the next event will wake the task up, but we can get stuck
	 * if there's only a single event in.
	 *
	 * FIXME: Ideally, we need a memory barrier on the writer side as well,
	 * but adding a memory barrier to all events will cause too much of a
	 * performance hit in the fast path. We only need a memory barrier when
	 * the buffer goes from empty to having content. But as this race is
	 * extremely small, and it's not a problem if another event comes in, we
	 * will fix it later.
	 */
	smp_mb();

	if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
	    (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
		return EPOLLIN | EPOLLRDNORM;
	return 0;
}

/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond)						\
	({								\
		int _____ret = unlikely(cond);				\
		if (_____ret) {						\
			if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
				struct ring_buffer_per_cpu *__b =	\
					(void *)b;			\
				atomic_inc(&__b->buffer->record_disabled); \
			} else						\
				atomic_inc(&b->record_disabled);	\
			WARN_ON(1);					\
		}							\
		_____ret;						\
	})

/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

static inline u64 rb_time_stamp(struct trace_buffer *buffer)
{
	u64 ts;

	/* Skip retpolines :-( */
	if (IS_ENABLED(CONFIG_MITIGATION_RETPOLINE) && likely(buffer->clock == trace_clock_local))
		ts = trace_clock_local();
	else
		ts = buffer->clock();

	/* shift to debug/test normalization and TIME_EXTENTS */
	return ts << DEBUG_SHIFT;
}

u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
{
	u64 time;

	preempt_disable_notrace();
	time = rb_time_stamp(buffer);
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);

void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
				      int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);

/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, so they
 * only need to worry about interrupts. Reads, however, can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next		bit 1	  bit 0
 *					-------	 -------
 * Normal page				  0	    0
 * Points to head page			  0	    1
 * New head page			  1	    0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 *  What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 *  You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 */

#define RB_PAGE_NORMAL		0UL
#define RB_PAGE_HEAD		1UL
#define RB_PAGE_UPDATE		2UL


#define RB_FLAG_MASK		3UL

/* PAGE_MOVED is not part of the mask */
#define RB_PAGE_MOVED		4UL

/*
 * rb_list_head - remove any bit
 */
static struct list_head *rb_list_head(struct list_head *list)
{
	unsigned long val = (unsigned long)list;

	return (struct list_head *)(val & ~RB_FLAG_MASK);
}

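/*
 * Example (illustrative only): if a buffer_page sits at address
 * 0xffff888012345680, the pointer to it in the previous page's ->next is
 * stored as 0xffff888012345681 while it is the head page (RB_PAGE_HEAD in
 * bit 0), and as 0xffff888012345682 while a writer is in the middle of
 * moving the head (RB_PAGE_UPDATE). rb_list_head() above simply masks off
 * those two low bits to recover the real pointer.
 */
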
/*
 * rb_is_head_page - test if the given page is the head page
 *
 * Because the reader may move the head_page pointer, we can
 * not trust what the head page is (it may be pointing to
 * the reader page). But if the next page is a header page,
 * its flags will be non zero.
 */
static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
{
	unsigned long val;

	val = (unsigned long)list->next;

	if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
		return RB_PAGE_MOVED;

	return val & RB_FLAG_MASK;
}

/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page, is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static bool rb_is_reader_page(struct buffer_page *page)
{
	struct list_head *list = page->list.prev;

	return rb_list_head(list->next) != &page->list;
}

/*
 * rb_set_list_to_head - set a list_head to be pointing to head.
 */
static void rb_set_list_to_head(struct list_head *list)
{
	unsigned long *ptr;

	ptr = (unsigned long *)&list->next;
	*ptr |= RB_PAGE_HEAD;
	*ptr &= ~RB_PAGE_UPDATE;
}

/*
 * rb_head_page_activate - sets up head page
 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;

	head = cpu_buffer->head_page;
	if (!head)
		return;

	/*
	 * Set the previous list pointer to have the HEAD flag.
	 */
	rb_set_list_to_head(head->list.prev);
}

static void rb_list_head_clear(struct list_head *list)
{
	unsigned long *ptr = (unsigned long *)&list->next;

	*ptr &= ~RB_FLAG_MASK;
}

/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *hd;

	/* Go through the whole list and clear any pointers found. */
	rb_list_head_clear(cpu_buffer->pages);

	list_for_each(hd, cpu_buffer->pages)
		rb_list_head_clear(hd);
}

static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
{
	struct list_head *list;
	unsigned long val = (unsigned long)&head->list;
	unsigned long ret;

	list = &prev->list;

	val &= ~RB_FLAG_MASK;

	ret = cmpxchg((unsigned long *)&list->next,
		      val | old_flag, val | new_flag);

	/* check if the reader took the page */
	if ((ret & ~RB_FLAG_MASK) != val)
		return RB_PAGE_MOVED;

	return ret & RB_FLAG_MASK;
}

static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_UPDATE);
}

static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_HEAD);
}

static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
{
	return rb_head_page_set(cpu_buffer, head, prev,
				old_flag, RB_PAGE_NORMAL);
}

static inline void rb_inc_page(struct buffer_page **bpage)
{
	struct list_head *p = rb_list_head((*bpage)->list.next);

	*bpage = list_entry(p, struct buffer_page, list);
}

static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct buffer_page *head;
	struct buffer_page *page;
	struct list_head *list;
	int i;

	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
		return NULL;

	/* sanity check */
	list = cpu_buffer->pages;
	if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
		return NULL;

	page = head = cpu_buffer->head_page;
	/*
	 * It is possible that the writer moves the header behind
	 * where we started, and we miss in one loop.
	 * A second loop should grab the header, but we'll do
	 * three loops just because I'm paranoid.
	 */
	for (i = 0; i < 3; i++) {
		do {
			if (rb_is_head_page(page, page->list.prev)) {
				cpu_buffer->head_page = page;
				return page;
			}
			rb_inc_page(&page);
		} while (page != head);
	}

	RB_WARN_ON(cpu_buffer, 1);

	return NULL;
}

static bool rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
{
	unsigned long *ptr = (unsigned long *)&old->list.prev->next;
	unsigned long val;

	val = *ptr & ~RB_FLAG_MASK;
	val |= RB_PAGE_HEAD;

	return try_cmpxchg(ptr, &val, (unsigned long)&new->list);
}

/*
 * rb_tail_page_update - move the tail page forward
 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
{
	unsigned long old_entries;
	unsigned long old_write;

	/*
	 * The tail page now needs to be moved forward.
	 *
	 * We need to reset the tail page, but without messing
	 * with possible erasing of data brought in by interrupts
	 * that have moved the tail page and are currently on it.
	 *
	 * We add a counter to the write field to denote this.
	 */
	old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
	old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);

	/*
	 * Just make sure we have seen our old_write and synchronize
	 * with any interrupts that come in.
	 */
	barrier();

	/*
	 * If the tail page is still the same as what we think
	 * it is, then it is up to us to update the tail
	 * pointer.
	 */
	if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
		/* Zero the write counter */
		unsigned long val = old_write & ~RB_WRITE_MASK;
		unsigned long eval = old_entries & ~RB_WRITE_MASK;

		/*
		 * This will only succeed if an interrupt did
		 * not come in and change it. In which case, we
		 * do not want to modify it.
		 *
		 * We add (void) to let the compiler know that we do not care
		 * about the return value of these functions. We use the
		 * cmpxchg to only update if an interrupt did not already
		 * do it for us. If the cmpxchg fails, we don't care.
		 */
		(void)local_cmpxchg(&next_page->write, old_write, val);
		(void)local_cmpxchg(&next_page->entries, old_entries, eval);

		/*
		 * No need to worry about races with clearing out the commit.
		 * It only can increment when a commit takes place. But that
		 * only happens in the outer most nested commit.
		 */
		local_set(&next_page->page->commit, 0);

		/* Either we update tail_page or an interrupt does */
		if (try_cmpxchg(&cpu_buffer->tail_page, &tail_page, next_page))
			local_inc(&cpu_buffer->pages_touched);
	}
}

static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
{
	unsigned long val = (unsigned long)bpage;

	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
}

/**
 * rb_check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 *
 * Callers of this function need to guarantee that the list of pages doesn't get
 * modified during the check. In particular, if it's possible that the function
 * is invoked with concurrent readers which can swap in a new reader page then
 * the caller should take cpu_buffer->reader_lock.
 */
static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = rb_list_head(cpu_buffer->pages);
	struct list_head *tmp;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->next)->prev) != head))
		return;

	if (RB_WARN_ON(cpu_buffer,
			rb_list_head(rb_list_head(head->prev)->next) != head))
		return;

	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
			return;

		if (RB_WARN_ON(cpu_buffer,
				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
			return;
	}
}

static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
		long nr_pages, struct list_head *pages)
{
	struct buffer_page *bpage, *tmp;
	bool user_thread = current->mm != NULL;
	gfp_t mflags;
	long i;

	/*
	 * Check if the available memory is there first.
	 * Note, si_mem_available() only gives us a rough estimate of available
	 * memory. It may not be accurate. But we don't care, we just want
	 * to prevent doing any allocation when it is obvious that it is
	 * not going to succeed.
	 */
	i = si_mem_available();
	if (i < nr_pages)
		return -ENOMEM;

	/*
	 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
	 * gracefully without invoking oom-killer and the system is not
	 * destabilized.
	 */
	mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;

	/*
	 * If a user thread allocates too much, si_mem_available() may
	 * report that there's enough memory even though there is not.
	 * Make sure the OOM killer kills this thread. This can happen
	 * even with RETRY_MAYFAIL because another task may be doing
	 * an allocation after this task has taken all memory.
	 * This is the task the OOM killer needs to take out during this
	 * loop, even if it was triggered by an allocation somewhere else.
	 */
	if (user_thread)
		set_current_oom_origin();
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
				    mflags, cpu_to_node(cpu_buffer->cpu));
		if (!bpage)
			goto free_pages;

		rb_check_bpage(cpu_buffer, bpage);

		list_add(&bpage->list, pages);

		page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu),
					mflags | __GFP_COMP | __GFP_ZERO,
					cpu_buffer->buffer->subbuf_order);
		if (!page)
			goto free_pages;
		bpage->page = page_address(page);
		bpage->order = cpu_buffer->buffer->subbuf_order;
		rb_init_page(bpage->page);

		if (user_thread && fatal_signal_pending(current))
			goto free_pages;
	}
	if (user_thread)
		clear_current_oom_origin();

	return 0;

free_pages:
	list_for_each_entry_safe(bpage, tmp, pages, list) {
		list_del_init(&bpage->list);
		free_buffer_page(bpage);
	}
	if (user_thread)
		clear_current_oom_origin();

	return -ENOMEM;
}

static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
{
	LIST_HEAD(pages);

	WARN_ON(!nr_pages);

	if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
		return -ENOMEM;

	/*
	 * The ring buffer page list is a circular list that does not
	 * start and end with a list head. All page list items point to
	 * other pages.
	 */
	cpu_buffer->pages = pages.next;
	list_del(&pages);

	cpu_buffer->nr_pages = nr_pages;

	rb_check_pages(cpu_buffer);

	return 0;
}

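/*
 * Note (added for clarity): after the list_del() above, cpu_buffer->pages
 * is not a conventional list head but a pointer at the list node embedded
 * in the first buffer_page. Walking the ring therefore always goes through
 * rb_list_head() to strip the HEAD/UPDATE flag bits from ->next pointers.
 */
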
static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
{
	struct ring_buffer_per_cpu *cpu_buffer;
	struct buffer_page *bpage;
	struct page *page;
	int ret;

	cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
				  GFP_KERNEL, cpu_to_node(cpu));
	if (!cpu_buffer)
		return NULL;

	cpu_buffer->cpu = cpu;
	cpu_buffer->buffer = buffer;
	raw_spin_lock_init(&cpu_buffer->reader_lock);
	lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
	cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
	INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
	init_completion(&cpu_buffer->update_done);
	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
	mutex_init(&cpu_buffer->mapping_lock);

	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
			    GFP_KERNEL, cpu_to_node(cpu));
	if (!bpage)
		goto fail_free_buffer;

	rb_check_bpage(cpu_buffer, bpage);

	cpu_buffer->reader_page = bpage;

	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
				cpu_buffer->buffer->subbuf_order);
	if (!page)
		goto fail_free_reader;
	bpage->page = page_address(page);
	rb_init_page(bpage->page);

	INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
	INIT_LIST_HEAD(&cpu_buffer->new_pages);

	ret = rb_allocate_pages(cpu_buffer, nr_pages);
	if (ret < 0)
		goto fail_free_reader;

	cpu_buffer->head_page
		= list_entry(cpu_buffer->pages, struct buffer_page, list);
	cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;

	rb_head_page_activate(cpu_buffer);

	return cpu_buffer;

 fail_free_reader:
	free_buffer_page(cpu_buffer->reader_page);

 fail_free_buffer:
	kfree(cpu_buffer);
	return NULL;
}

static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
{
	struct list_head *head = cpu_buffer->pages;
	struct buffer_page *bpage, *tmp;

	irq_work_sync(&cpu_buffer->irq_work.work);

	free_buffer_page(cpu_buffer->reader_page);

	if (head) {
		rb_head_page_deactivate(cpu_buffer);

		list_for_each_entry_safe(bpage, tmp, head, list) {
			list_del_init(&bpage->list);
			free_buffer_page(bpage);
		}
		bpage = list_entry(head, struct buffer_page, list);
		free_buffer_page(bpage);
	}

	free_page((unsigned long)cpu_buffer->free_page);

	kfree(cpu_buffer);
}

/**
 * __ring_buffer_alloc - allocate a new ring_buffer
 * @size: the size in bytes per cpu that is needed.
 * @flags: attributes to set for the ring buffer.
 * @key: ring buffer reader_lock_key.
 *
 * Currently the only flag that is available is the RB_FL_OVERWRITE
 * flag. This flag means that the buffer will overwrite old data
 * when the buffer wraps. If this flag is not set, the buffer will
 * drop data when the tail hits the head.
 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
{
	struct trace_buffer *buffer;
	long nr_pages;
	int bsize;
	int cpu;
	int ret;

	/* keep it in its own cache line */
	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
			 GFP_KERNEL);
	if (!buffer)
		return NULL;

	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
		goto fail_free_buffer;

	/* Default buffer page size - one system page */
	buffer->subbuf_order = 0;
	buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;

	/* Max payload is buffer page size - header (8 bytes) */
	buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);

	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
	buffer->flags = flags;
	buffer->clock = trace_clock_local;
	buffer->reader_lock_key = key;

	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
	init_waitqueue_head(&buffer->irq_work.waiters);

	/* need at least two pages */
	if (nr_pages < 2)
		nr_pages = 2;

	buffer->cpus = nr_cpu_ids;

	bsize = sizeof(void *) * nr_cpu_ids;
	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
				  GFP_KERNEL);
	if (!buffer->buffers)
		goto fail_free_cpumask;

	cpu = raw_smp_processor_id();
	cpumask_set_cpu(cpu, buffer->cpumask);
	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
	if (!buffer->buffers[cpu])
		goto fail_free_buffers;

	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
	if (ret < 0)
		goto fail_free_buffers;

	mutex_init(&buffer->mutex);

	return buffer;

 fail_free_buffers:
	for_each_buffer_cpu(buffer, cpu) {
		if (buffer->buffers[cpu])
			rb_free_cpu_buffer(buffer->buffers[cpu]);
	}
	kfree(buffer->buffers);

 fail_free_cpumask:
	free_cpumask_var(buffer->cpumask);

 fail_free_buffer:
	kfree(buffer);
	return NULL;
}
EXPORT_SYMBOL_GPL(__ring_buffer_alloc);

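/*
 * Sketch of a typical user (illustrative only): allocate a 1 MB per-CPU
 * overwriting buffer through the ring_buffer_alloc() wrapper from
 * <linux/ring_buffer.h>, which supplies the lock class key, and release
 * it again with ring_buffer_free() below:
 *
 *	struct trace_buffer *buf = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *	if (!buf)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(buf);
 */
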
/**
 * ring_buffer_free - free a ring buffer.
 * @buffer: the buffer to free.
 */
void
ring_buffer_free(struct trace_buffer *buffer)
{
	int cpu;

	cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);

	irq_work_sync(&buffer->irq_work.work);

	for_each_buffer_cpu(buffer, cpu)
		rb_free_cpu_buffer(buffer->buffers[cpu]);

	kfree(buffer->buffers);
	free_cpumask_var(buffer->cpumask);

	kfree(buffer);
}
EXPORT_SYMBOL_GPL(ring_buffer_free);

void ring_buffer_set_clock(struct trace_buffer *buffer,
			   u64 (*clock)(void))
{
	buffer->clock = clock;
}

void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
{
	buffer->time_stamp_abs = abs;
}

bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
{
	return buffer->time_stamp_abs;
}

static inline unsigned long rb_page_entries(struct buffer_page *bpage)
{
	return local_read(&bpage->entries) & RB_WRITE_MASK;
}

static inline unsigned long rb_page_write(struct buffer_page *bpage)
{
	return local_read(&bpage->write) & RB_WRITE_MASK;
}

static bool
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
{
	struct list_head *tail_page, *to_remove, *next_page;
	struct buffer_page *to_remove_page, *tmp_iter_page;
	struct buffer_page *last_page, *first_page;
	unsigned long nr_removed;
	unsigned long head_bit;
	int page_entries;

	head_bit = 0;

	raw_spin_lock_irq(&cpu_buffer->reader_lock);
	atomic_inc(&cpu_buffer->record_disabled);
	/*
	 * We don't race with the readers since we have acquired the reader
	 * lock. We also don't race with writers after disabling recording.
	 * This makes it easy to figure out the first and the last page to be
	 * removed from the list. We unlink all the pages in between including
	 * the first and last pages. This is done in a busy loop so that we
	 * lose the least number of traces.
	 * The pages are freed after we restart recording and unlock readers.
	 */
	tail_page = &cpu_buffer->tail_page->list;

	/*
	 * tail page might be on reader page, so we remove the next page
	 * from the ring buffer
	 */
	if (cpu_buffer->tail_page == cpu_buffer->reader_page)
		tail_page = rb_list_head(tail_page->next);
	to_remove = tail_page;

	/* start of pages to remove */
	first_page = list_entry(rb_list_head(to_remove->next),
				struct buffer_page, list);

	for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
		to_remove = rb_list_head(to_remove)->next;
		head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
	}
	/* Read iterators need to reset themselves when some pages are removed */
	cpu_buffer->pages_removed += nr_removed;

	next_page = rb_list_head(to_remove)->next;

	/*
	 * Now we remove all pages between tail_page and next_page.
1867 * Make sure that we have head_bit value preserved for the 1868 * next page 1869 */ 1870 tail_page->next = (struct list_head *)((unsigned long)next_page | 1871 head_bit); 1872 next_page = rb_list_head(next_page); 1873 next_page->prev = tail_page; 1874 1875 /* make sure pages points to a valid page in the ring buffer */ 1876 cpu_buffer->pages = next_page; 1877 1878 /* update head page */ 1879 if (head_bit) 1880 cpu_buffer->head_page = list_entry(next_page, 1881 struct buffer_page, list); 1882 1883 /* pages are removed, resume tracing and then free the pages */ 1884 atomic_dec(&cpu_buffer->record_disabled); 1885 raw_spin_unlock_irq(&cpu_buffer->reader_lock); 1886 1887 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)); 1888 1889 /* last buffer page to remove */ 1890 last_page = list_entry(rb_list_head(to_remove), struct buffer_page, 1891 list); 1892 tmp_iter_page = first_page; 1893 1894 do { 1895 cond_resched(); 1896 1897 to_remove_page = tmp_iter_page; 1898 rb_inc_page(&tmp_iter_page); 1899 1900 /* update the counters */ 1901 page_entries = rb_page_entries(to_remove_page); 1902 if (page_entries) { 1903 /* 1904 * If something was added to this page, it was full 1905 * since it is not the tail page. So we deduct the 1906 * bytes consumed in ring buffer from here. 1907 * Increment overrun to account for the lost events. 1908 */ 1909 local_add(page_entries, &cpu_buffer->overrun); 1910 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes); 1911 local_inc(&cpu_buffer->pages_lost); 1912 } 1913 1914 /* 1915 * We have already removed references to this list item, just 1916 * free up the buffer_page and its page 1917 */ 1918 free_buffer_page(to_remove_page); 1919 nr_removed--; 1920 1921 } while (to_remove_page != last_page); 1922 1923 RB_WARN_ON(cpu_buffer, nr_removed); 1924 1925 return nr_removed == 0; 1926 } 1927 1928 static bool 1929 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer) 1930 { 1931 struct list_head *pages = &cpu_buffer->new_pages; 1932 unsigned long flags; 1933 bool success; 1934 int retries; 1935 1936 /* Can be called at early boot up, where interrupts must not been enabled */ 1937 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 1938 /* 1939 * We are holding the reader lock, so the reader page won't be swapped 1940 * in the ring buffer. Now we are racing with the writer trying to 1941 * move head page and the tail page. 1942 * We are going to adapt the reader page update process where: 1943 * 1. We first splice the start and end of list of new pages between 1944 * the head page and its previous page. 1945 * 2. We cmpxchg the prev_page->next to point from head page to the 1946 * start of new pages list. 1947 * 3. Finally, we update the head->prev to the end of new list. 1948 * 1949 * We will try this process 10 times, to make sure that we don't keep 1950 * spinning. 
1951 */ 1952 retries = 10; 1953 success = false; 1954 while (retries--) { 1955 struct list_head *head_page, *prev_page; 1956 struct list_head *last_page, *first_page; 1957 struct list_head *head_page_with_bit; 1958 struct buffer_page *hpage = rb_set_head_page(cpu_buffer); 1959 1960 if (!hpage) 1961 break; 1962 head_page = &hpage->list; 1963 prev_page = head_page->prev; 1964 1965 first_page = pages->next; 1966 last_page = pages->prev; 1967 1968 head_page_with_bit = (struct list_head *) 1969 ((unsigned long)head_page | RB_PAGE_HEAD); 1970 1971 last_page->next = head_page_with_bit; 1972 first_page->prev = prev_page; 1973 1974 /* caution: head_page_with_bit gets updated on cmpxchg failure */ 1975 if (try_cmpxchg(&prev_page->next, 1976 &head_page_with_bit, first_page)) { 1977 /* 1978 * yay, we replaced the page pointer to our new list, 1979 * now, we just have to update to head page's prev 1980 * pointer to point to end of list 1981 */ 1982 head_page->prev = last_page; 1983 success = true; 1984 break; 1985 } 1986 } 1987 1988 if (success) 1989 INIT_LIST_HEAD(pages); 1990 /* 1991 * If we weren't successful in adding in new pages, warn and stop 1992 * tracing 1993 */ 1994 RB_WARN_ON(cpu_buffer, !success); 1995 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 1996 1997 /* free pages if they weren't inserted */ 1998 if (!success) { 1999 struct buffer_page *bpage, *tmp; 2000 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2001 list) { 2002 list_del_init(&bpage->list); 2003 free_buffer_page(bpage); 2004 } 2005 } 2006 return success; 2007 } 2008 2009 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer) 2010 { 2011 bool success; 2012 2013 if (cpu_buffer->nr_pages_to_update > 0) 2014 success = rb_insert_pages(cpu_buffer); 2015 else 2016 success = rb_remove_pages(cpu_buffer, 2017 -cpu_buffer->nr_pages_to_update); 2018 2019 if (success) 2020 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update; 2021 } 2022 2023 static void update_pages_handler(struct work_struct *work) 2024 { 2025 struct ring_buffer_per_cpu *cpu_buffer = container_of(work, 2026 struct ring_buffer_per_cpu, update_pages_work); 2027 rb_update_pages(cpu_buffer); 2028 complete(&cpu_buffer->update_done); 2029 } 2030 2031 /** 2032 * ring_buffer_resize - resize the ring buffer 2033 * @buffer: the buffer to resize. 2034 * @size: the new size. 2035 * @cpu_id: the cpu buffer to resize 2036 * 2037 * Minimum size is 2 * buffer->subbuf_size. 2038 * 2039 * Returns 0 on success and < 0 on failure. 2040 */ 2041 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, 2042 int cpu_id) 2043 { 2044 struct ring_buffer_per_cpu *cpu_buffer; 2045 unsigned long nr_pages; 2046 int cpu, err; 2047 2048 /* 2049 * Always succeed at resizing a non-existent buffer: 2050 */ 2051 if (!buffer) 2052 return 0; 2053 2054 /* Make sure the requested buffer exists */ 2055 if (cpu_id != RING_BUFFER_ALL_CPUS && 2056 !cpumask_test_cpu(cpu_id, buffer->cpumask)) 2057 return 0; 2058 2059 nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size); 2060 2061 /* we need a minimum of two pages */ 2062 if (nr_pages < 2) 2063 nr_pages = 2; 2064 2065 /* prevent another thread from changing buffer sizes */ 2066 mutex_lock(&buffer->mutex); 2067 atomic_inc(&buffer->resizing); 2068 2069 if (cpu_id == RING_BUFFER_ALL_CPUS) { 2070 /* 2071 * Don't succeed if resizing is disabled, as a reader might be 2072 * manipulating the ring buffer and is expecting a sane state while 2073 * this is true. 
2074 */ 2075 for_each_buffer_cpu(buffer, cpu) { 2076 cpu_buffer = buffer->buffers[cpu]; 2077 if (atomic_read(&cpu_buffer->resize_disabled)) { 2078 err = -EBUSY; 2079 goto out_err_unlock; 2080 } 2081 } 2082 2083 /* calculate the pages to update */ 2084 for_each_buffer_cpu(buffer, cpu) { 2085 cpu_buffer = buffer->buffers[cpu]; 2086 2087 cpu_buffer->nr_pages_to_update = nr_pages - 2088 cpu_buffer->nr_pages; 2089 /* 2090 * nothing more to do for removing pages or no update 2091 */ 2092 if (cpu_buffer->nr_pages_to_update <= 0) 2093 continue; 2094 /* 2095 * to add pages, make sure all new pages can be 2096 * allocated without receiving ENOMEM 2097 */ 2098 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2099 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2100 &cpu_buffer->new_pages)) { 2101 /* not enough memory for new pages */ 2102 err = -ENOMEM; 2103 goto out_err; 2104 } 2105 2106 cond_resched(); 2107 } 2108 2109 cpus_read_lock(); 2110 /* 2111 * Fire off all the required work handlers 2112 * We can't schedule on offline CPUs, but it's not necessary 2113 * since we can change their buffer sizes without any race. 2114 */ 2115 for_each_buffer_cpu(buffer, cpu) { 2116 cpu_buffer = buffer->buffers[cpu]; 2117 if (!cpu_buffer->nr_pages_to_update) 2118 continue; 2119 2120 /* Can't run something on an offline CPU. */ 2121 if (!cpu_online(cpu)) { 2122 rb_update_pages(cpu_buffer); 2123 cpu_buffer->nr_pages_to_update = 0; 2124 } else { 2125 /* Run directly if possible. */ 2126 migrate_disable(); 2127 if (cpu != smp_processor_id()) { 2128 migrate_enable(); 2129 schedule_work_on(cpu, 2130 &cpu_buffer->update_pages_work); 2131 } else { 2132 update_pages_handler(&cpu_buffer->update_pages_work); 2133 migrate_enable(); 2134 } 2135 } 2136 } 2137 2138 /* wait for all the updates to complete */ 2139 for_each_buffer_cpu(buffer, cpu) { 2140 cpu_buffer = buffer->buffers[cpu]; 2141 if (!cpu_buffer->nr_pages_to_update) 2142 continue; 2143 2144 if (cpu_online(cpu)) 2145 wait_for_completion(&cpu_buffer->update_done); 2146 cpu_buffer->nr_pages_to_update = 0; 2147 } 2148 2149 cpus_read_unlock(); 2150 } else { 2151 cpu_buffer = buffer->buffers[cpu_id]; 2152 2153 if (nr_pages == cpu_buffer->nr_pages) 2154 goto out; 2155 2156 /* 2157 * Don't succeed if resizing is disabled, as a reader might be 2158 * manipulating the ring buffer and is expecting a sane state while 2159 * this is true. 2160 */ 2161 if (atomic_read(&cpu_buffer->resize_disabled)) { 2162 err = -EBUSY; 2163 goto out_err_unlock; 2164 } 2165 2166 cpu_buffer->nr_pages_to_update = nr_pages - 2167 cpu_buffer->nr_pages; 2168 2169 INIT_LIST_HEAD(&cpu_buffer->new_pages); 2170 if (cpu_buffer->nr_pages_to_update > 0 && 2171 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, 2172 &cpu_buffer->new_pages)) { 2173 err = -ENOMEM; 2174 goto out_err; 2175 } 2176 2177 cpus_read_lock(); 2178 2179 /* Can't run something on an offline CPU. */ 2180 if (!cpu_online(cpu_id)) 2181 rb_update_pages(cpu_buffer); 2182 else { 2183 /* Run directly if possible. 
*/ 2184 migrate_disable(); 2185 if (cpu_id == smp_processor_id()) { 2186 rb_update_pages(cpu_buffer); 2187 migrate_enable(); 2188 } else { 2189 migrate_enable(); 2190 schedule_work_on(cpu_id, 2191 &cpu_buffer->update_pages_work); 2192 wait_for_completion(&cpu_buffer->update_done); 2193 } 2194 } 2195 2196 cpu_buffer->nr_pages_to_update = 0; 2197 cpus_read_unlock(); 2198 } 2199 2200 out: 2201 /* 2202 * The ring buffer resize can happen with the ring buffer 2203 * enabled, so that the update disturbs the tracing as little 2204 * as possible. But if the buffer is disabled, we do not need 2205 * to worry about that, and we can take the time to verify 2206 * that the buffer is not corrupt. 2207 */ 2208 if (atomic_read(&buffer->record_disabled)) { 2209 atomic_inc(&buffer->record_disabled); 2210 /* 2211 * Even though the buffer was disabled, we must make sure 2212 * that it is truly disabled before calling rb_check_pages. 2213 * There could have been a race between checking 2214 * record_disable and incrementing it. 2215 */ 2216 synchronize_rcu(); 2217 for_each_buffer_cpu(buffer, cpu) { 2218 unsigned long flags; 2219 2220 cpu_buffer = buffer->buffers[cpu]; 2221 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 2222 rb_check_pages(cpu_buffer); 2223 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 2224 } 2225 atomic_dec(&buffer->record_disabled); 2226 } 2227 2228 atomic_dec(&buffer->resizing); 2229 mutex_unlock(&buffer->mutex); 2230 return 0; 2231 2232 out_err: 2233 for_each_buffer_cpu(buffer, cpu) { 2234 struct buffer_page *bpage, *tmp; 2235 2236 cpu_buffer = buffer->buffers[cpu]; 2237 cpu_buffer->nr_pages_to_update = 0; 2238 2239 if (list_empty(&cpu_buffer->new_pages)) 2240 continue; 2241 2242 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, 2243 list) { 2244 list_del_init(&bpage->list); 2245 free_buffer_page(bpage); 2246 } 2247 } 2248 out_err_unlock: 2249 atomic_dec(&buffer->resizing); 2250 mutex_unlock(&buffer->mutex); 2251 return err; 2252 } 2253 EXPORT_SYMBOL_GPL(ring_buffer_resize); 2254 2255 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val) 2256 { 2257 mutex_lock(&buffer->mutex); 2258 if (val) 2259 buffer->flags |= RB_FL_OVERWRITE; 2260 else 2261 buffer->flags &= ~RB_FL_OVERWRITE; 2262 mutex_unlock(&buffer->mutex); 2263 } 2264 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite); 2265 2266 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index) 2267 { 2268 return bpage->page->data + index; 2269 } 2270 2271 static __always_inline struct ring_buffer_event * 2272 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) 2273 { 2274 return __rb_page_index(cpu_buffer->reader_page, 2275 cpu_buffer->reader_page->read); 2276 } 2277 2278 static struct ring_buffer_event * 2279 rb_iter_head_event(struct ring_buffer_iter *iter) 2280 { 2281 struct ring_buffer_event *event; 2282 struct buffer_page *iter_head_page = iter->head_page; 2283 unsigned long commit; 2284 unsigned length; 2285 2286 if (iter->head != iter->next_event) 2287 return iter->event; 2288 2289 /* 2290 * When the writer goes across pages, it issues a cmpxchg which 2291 * is a mb(), which will synchronize with the rmb here. 
2292 * (see rb_tail_page_update() and __rb_reserve_next()) 2293 */ 2294 commit = rb_page_commit(iter_head_page); 2295 smp_rmb(); 2296 2297 /* An event needs to be at least 8 bytes in size */ 2298 if (iter->head > commit - 8) 2299 goto reset; 2300 2301 event = __rb_page_index(iter_head_page, iter->head); 2302 length = rb_event_length(event); 2303 2304 /* 2305 * READ_ONCE() doesn't work on functions and we don't want the 2306 * compiler doing any crazy optimizations with length. 2307 */ 2308 barrier(); 2309 2310 if ((iter->head + length) > commit || length > iter->event_size) 2311 /* Writer corrupted the read? */ 2312 goto reset; 2313 2314 memcpy(iter->event, event, length); 2315 /* 2316 * If the page stamp is still the same after this rmb() then the 2317 * event was safely copied without the writer entering the page. 2318 */ 2319 smp_rmb(); 2320 2321 /* Make sure the page didn't change since we read this */ 2322 if (iter->page_stamp != iter_head_page->page->time_stamp || 2323 commit > rb_page_commit(iter_head_page)) 2324 goto reset; 2325 2326 iter->next_event = iter->head + length; 2327 return iter->event; 2328 reset: 2329 /* Reset to the beginning */ 2330 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2331 iter->head = 0; 2332 iter->next_event = 0; 2333 iter->missed_events = 1; 2334 return NULL; 2335 } 2336 2337 /* Size is determined by what has been committed */ 2338 static __always_inline unsigned rb_page_size(struct buffer_page *bpage) 2339 { 2340 return rb_page_commit(bpage) & ~RB_MISSED_MASK; 2341 } 2342 2343 static __always_inline unsigned 2344 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer) 2345 { 2346 return rb_page_commit(cpu_buffer->commit_page); 2347 } 2348 2349 static __always_inline unsigned 2350 rb_event_index(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) 2351 { 2352 unsigned long addr = (unsigned long)event; 2353 2354 addr &= (PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1; 2355 2356 return addr - BUF_PAGE_HDR_SIZE; 2357 } 2358 2359 static void rb_inc_iter(struct ring_buffer_iter *iter) 2360 { 2361 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 2362 2363 /* 2364 * The iterator could be on the reader page (it starts there). 2365 * But the head could have moved, since the reader was 2366 * found. Check for this case and assign the iterator 2367 * to the head page instead of next. 2368 */ 2369 if (iter->head_page == cpu_buffer->reader_page) 2370 iter->head_page = rb_set_head_page(cpu_buffer); 2371 else 2372 rb_inc_page(&iter->head_page); 2373 2374 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; 2375 iter->head = 0; 2376 iter->next_event = 0; 2377 } 2378 2379 /* 2380 * rb_handle_head_page - writer hit the head page 2381 * 2382 * Returns: +1 to retry page 2383 * 0 to continue 2384 * -1 on error 2385 */ 2386 static int 2387 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, 2388 struct buffer_page *tail_page, 2389 struct buffer_page *next_page) 2390 { 2391 struct buffer_page *new_head; 2392 int entries; 2393 int type; 2394 int ret; 2395 2396 entries = rb_page_entries(next_page); 2397 2398 /* 2399 * The hard part is here. We need to move the head 2400 * forward, and protect against both readers on 2401 * other CPUs and writers coming in via interrupts. 
2402 */ 2403 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page, 2404 RB_PAGE_HEAD); 2405 2406 /* 2407 * type can be one of four: 2408 * NORMAL - an interrupt already moved it for us 2409 * HEAD - we are the first to get here. 2410 * UPDATE - we are the interrupt interrupting 2411 * a current move. 2412 * MOVED - a reader on another CPU moved the next 2413 * pointer to its reader page. Give up 2414 * and try again. 2415 */ 2416 2417 switch (type) { 2418 case RB_PAGE_HEAD: 2419 /* 2420 * We changed the head to UPDATE, thus 2421 * it is our responsibility to update 2422 * the counters. 2423 */ 2424 local_add(entries, &cpu_buffer->overrun); 2425 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes); 2426 local_inc(&cpu_buffer->pages_lost); 2427 2428 /* 2429 * The entries will be zeroed out when we move the 2430 * tail page. 2431 */ 2432 2433 /* still more to do */ 2434 break; 2435 2436 case RB_PAGE_UPDATE: 2437 /* 2438 * This is an interrupt that interrupt the 2439 * previous update. Still more to do. 2440 */ 2441 break; 2442 case RB_PAGE_NORMAL: 2443 /* 2444 * An interrupt came in before the update 2445 * and processed this for us. 2446 * Nothing left to do. 2447 */ 2448 return 1; 2449 case RB_PAGE_MOVED: 2450 /* 2451 * The reader is on another CPU and just did 2452 * a swap with our next_page. 2453 * Try again. 2454 */ 2455 return 1; 2456 default: 2457 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */ 2458 return -1; 2459 } 2460 2461 /* 2462 * Now that we are here, the old head pointer is 2463 * set to UPDATE. This will keep the reader from 2464 * swapping the head page with the reader page. 2465 * The reader (on another CPU) will spin till 2466 * we are finished. 2467 * 2468 * We just need to protect against interrupts 2469 * doing the job. We will set the next pointer 2470 * to HEAD. After that, we set the old pointer 2471 * to NORMAL, but only if it was HEAD before. 2472 * otherwise we are an interrupt, and only 2473 * want the outer most commit to reset it. 2474 */ 2475 new_head = next_page; 2476 rb_inc_page(&new_head); 2477 2478 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, 2479 RB_PAGE_NORMAL); 2480 2481 /* 2482 * Valid returns are: 2483 * HEAD - an interrupt came in and already set it. 2484 * NORMAL - One of two things: 2485 * 1) We really set it. 2486 * 2) A bunch of interrupts came in and moved 2487 * the page forward again. 2488 */ 2489 switch (ret) { 2490 case RB_PAGE_HEAD: 2491 case RB_PAGE_NORMAL: 2492 /* OK */ 2493 break; 2494 default: 2495 RB_WARN_ON(cpu_buffer, 1); 2496 return -1; 2497 } 2498 2499 /* 2500 * It is possible that an interrupt came in, 2501 * set the head up, then more interrupts came in 2502 * and moved it again. When we get back here, 2503 * the page would have been set to NORMAL but we 2504 * just set it back to HEAD. 2505 * 2506 * How do you detect this? Well, if that happened 2507 * the tail page would have moved. 2508 */ 2509 if (ret == RB_PAGE_NORMAL) { 2510 struct buffer_page *buffer_tail_page; 2511 2512 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page); 2513 /* 2514 * If the tail had moved passed next, then we need 2515 * to reset the pointer. 2516 */ 2517 if (buffer_tail_page != tail_page && 2518 buffer_tail_page != next_page) 2519 rb_head_page_set_normal(cpu_buffer, new_head, 2520 next_page, 2521 RB_PAGE_HEAD); 2522 } 2523 2524 /* 2525 * If this was the outer most commit (the one that 2526 * changed the original pointer from HEAD to UPDATE), 2527 * then it is up to us to reset it to NORMAL. 
2528 */ 2529 if (type == RB_PAGE_HEAD) { 2530 ret = rb_head_page_set_normal(cpu_buffer, next_page, 2531 tail_page, 2532 RB_PAGE_UPDATE); 2533 if (RB_WARN_ON(cpu_buffer, 2534 ret != RB_PAGE_UPDATE)) 2535 return -1; 2536 } 2537 2538 return 0; 2539 } 2540 2541 static inline void 2542 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, 2543 unsigned long tail, struct rb_event_info *info) 2544 { 2545 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 2546 struct buffer_page *tail_page = info->tail_page; 2547 struct ring_buffer_event *event; 2548 unsigned long length = info->length; 2549 2550 /* 2551 * Only the event that crossed the page boundary 2552 * must fill the old tail_page with padding. 2553 */ 2554 if (tail >= bsize) { 2555 /* 2556 * If the page was filled, then we still need 2557 * to update the real_end. Reset it to zero 2558 * and the reader will ignore it. 2559 */ 2560 if (tail == bsize) 2561 tail_page->real_end = 0; 2562 2563 local_sub(length, &tail_page->write); 2564 return; 2565 } 2566 2567 event = __rb_page_index(tail_page, tail); 2568 2569 /* 2570 * Save the original length to the meta data. 2571 * This will be used by the reader to add lost event 2572 * counter. 2573 */ 2574 tail_page->real_end = tail; 2575 2576 /* 2577 * If this event is bigger than the minimum size, then 2578 * we need to be careful that we don't subtract the 2579 * write counter enough to allow another writer to slip 2580 * in on this page. 2581 * We put in a discarded commit instead, to make sure 2582 * that this space is not used again, and this space will 2583 * not be accounted into 'entries_bytes'. 2584 * 2585 * If we are less than the minimum size, we don't need to 2586 * worry about it. 2587 */ 2588 if (tail > (bsize - RB_EVNT_MIN_SIZE)) { 2589 /* No room for any events */ 2590 2591 /* Mark the rest of the page with padding */ 2592 rb_event_set_padding(event); 2593 2594 /* Make sure the padding is visible before the write update */ 2595 smp_wmb(); 2596 2597 /* Set the write back to the previous setting */ 2598 local_sub(length, &tail_page->write); 2599 return; 2600 } 2601 2602 /* Put in a discarded event */ 2603 event->array[0] = (bsize - tail) - RB_EVNT_HDR_SIZE; 2604 event->type_len = RINGBUF_TYPE_PADDING; 2605 /* time delta must be non zero */ 2606 event->time_delta = 1; 2607 2608 /* account for padding bytes */ 2609 local_add(bsize - tail, &cpu_buffer->entries_bytes); 2610 2611 /* Make sure the padding is visible before the tail_page->write update */ 2612 smp_wmb(); 2613 2614 /* Set write to end of buffer */ 2615 length = (tail + length) - bsize; 2616 local_sub(length, &tail_page->write); 2617 } 2618 2619 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer); 2620 2621 /* 2622 * This is the slow path, force gcc not to inline it. 2623 */ 2624 static noinline struct ring_buffer_event * 2625 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, 2626 unsigned long tail, struct rb_event_info *info) 2627 { 2628 struct buffer_page *tail_page = info->tail_page; 2629 struct buffer_page *commit_page = cpu_buffer->commit_page; 2630 struct trace_buffer *buffer = cpu_buffer->buffer; 2631 struct buffer_page *next_page; 2632 int ret; 2633 2634 next_page = tail_page; 2635 2636 rb_inc_page(&next_page); 2637 2638 /* 2639 * If for some reason, we had an interrupt storm that made 2640 * it all the way around the buffer, bail, and warn 2641 * about it. 
2642 */ 2643 if (unlikely(next_page == commit_page)) { 2644 local_inc(&cpu_buffer->commit_overrun); 2645 goto out_reset; 2646 } 2647 2648 /* 2649 * This is where the fun begins! 2650 * 2651 * We are fighting against races between a reader that 2652 * could be on another CPU trying to swap its reader 2653 * page with the buffer head. 2654 * 2655 * We are also fighting against interrupts coming in and 2656 * moving the head or tail on us as well. 2657 * 2658 * If the next page is the head page then we have filled 2659 * the buffer, unless the commit page is still on the 2660 * reader page. 2661 */ 2662 if (rb_is_head_page(next_page, &tail_page->list)) { 2663 2664 /* 2665 * If the commit is not on the reader page, then 2666 * move the header page. 2667 */ 2668 if (!rb_is_reader_page(cpu_buffer->commit_page)) { 2669 /* 2670 * If we are not in overwrite mode, 2671 * this is easy, just stop here. 2672 */ 2673 if (!(buffer->flags & RB_FL_OVERWRITE)) { 2674 local_inc(&cpu_buffer->dropped_events); 2675 goto out_reset; 2676 } 2677 2678 ret = rb_handle_head_page(cpu_buffer, 2679 tail_page, 2680 next_page); 2681 if (ret < 0) 2682 goto out_reset; 2683 if (ret) 2684 goto out_again; 2685 } else { 2686 /* 2687 * We need to be careful here too. The 2688 * commit page could still be on the reader 2689 * page. We could have a small buffer, and 2690 * have filled up the buffer with events 2691 * from interrupts and such, and wrapped. 2692 * 2693 * Note, if the tail page is also on the 2694 * reader_page, we let it move out. 2695 */ 2696 if (unlikely((cpu_buffer->commit_page != 2697 cpu_buffer->tail_page) && 2698 (cpu_buffer->commit_page == 2699 cpu_buffer->reader_page))) { 2700 local_inc(&cpu_buffer->commit_overrun); 2701 goto out_reset; 2702 } 2703 } 2704 } 2705 2706 rb_tail_page_update(cpu_buffer, tail_page, next_page); 2707 2708 out_again: 2709 2710 rb_reset_tail(cpu_buffer, tail, info); 2711 2712 /* Commit what we have for now. */ 2713 rb_end_commit(cpu_buffer); 2714 /* rb_end_commit() decs committing */ 2715 local_inc(&cpu_buffer->committing); 2716 2717 /* fail and let the caller try again */ 2718 return ERR_PTR(-EAGAIN); 2719 2720 out_reset: 2721 /* reset write */ 2722 rb_reset_tail(cpu_buffer, tail, info); 2723 2724 return NULL; 2725 } 2726 2727 /* Slow path */ 2728 static struct ring_buffer_event * 2729 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer, 2730 struct ring_buffer_event *event, u64 delta, bool abs) 2731 { 2732 if (abs) 2733 event->type_len = RINGBUF_TYPE_TIME_STAMP; 2734 else 2735 event->type_len = RINGBUF_TYPE_TIME_EXTEND; 2736 2737 /* Not the first event on the page, or not delta? */ 2738 if (abs || rb_event_index(cpu_buffer, event)) { 2739 event->time_delta = delta & TS_MASK; 2740 event->array[0] = delta >> TS_SHIFT; 2741 } else { 2742 /* nope, just zero it */ 2743 event->time_delta = 0; 2744 event->array[0] = 0; 2745 } 2746 2747 return skip_time_extend(event); 2748 } 2749 2750 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK 2751 static inline bool sched_clock_stable(void) 2752 { 2753 return true; 2754 } 2755 #endif 2756 2757 static void 2758 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2759 struct rb_event_info *info) 2760 { 2761 u64 write_stamp; 2762 2763 WARN_ONCE(1, "Delta way too big! 
%llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", 2764 (unsigned long long)info->delta, 2765 (unsigned long long)info->ts, 2766 (unsigned long long)info->before, 2767 (unsigned long long)info->after, 2768 (unsigned long long)({rb_time_read(&cpu_buffer->write_stamp, &write_stamp); write_stamp;}), 2769 sched_clock_stable() ? "" : 2770 "If you just came from a suspend/resume,\n" 2771 "please switch to the trace global clock:\n" 2772 " echo global > /sys/kernel/tracing/trace_clock\n" 2773 "or add trace_clock=global to the kernel command line\n"); 2774 } 2775 2776 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, 2777 struct ring_buffer_event **event, 2778 struct rb_event_info *info, 2779 u64 *delta, 2780 unsigned int *length) 2781 { 2782 bool abs = info->add_timestamp & 2783 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); 2784 2785 if (unlikely(info->delta > (1ULL << 59))) { 2786 /* 2787 * Some timers can use more than 59 bits, and when a timestamp 2788 * is added to the buffer, it will lose those bits. 2789 */ 2790 if (abs && (info->ts & TS_MSB)) { 2791 info->delta &= ABS_TS_MASK; 2792 2793 /* did the clock go backwards */ 2794 } else if (info->before == info->after && info->before > info->ts) { 2795 /* not interrupted */ 2796 static int once; 2797 2798 /* 2799 * This is possible with a recalibrating of the TSC. 2800 * Do not produce a call stack, but just report it. 2801 */ 2802 if (!once) { 2803 once++; 2804 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", 2805 info->before, info->ts); 2806 } 2807 } else 2808 rb_check_timestamp(cpu_buffer, info); 2809 if (!abs) 2810 info->delta = 0; 2811 } 2812 *event = rb_add_time_stamp(cpu_buffer, *event, info->delta, abs); 2813 *length -= RB_LEN_TIME_EXTEND; 2814 *delta = 0; 2815 } 2816 2817 /** 2818 * rb_update_event - update event type and data 2819 * @cpu_buffer: The per cpu buffer of the @event 2820 * @event: the event to update 2821 * @info: The info to update the @event with (contains length and delta) 2822 * 2823 * Update the type and data fields of the @event. The length 2824 * is the actual size that is written to the ring buffer, 2825 * and with this, we can determine what to place into the 2826 * data field. 2827 */ 2828 static void 2829 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, 2830 struct ring_buffer_event *event, 2831 struct rb_event_info *info) 2832 { 2833 unsigned length = info->length; 2834 u64 delta = info->delta; 2835 unsigned int nest = local_read(&cpu_buffer->committing) - 1; 2836 2837 if (!WARN_ON_ONCE(nest >= MAX_NEST)) 2838 cpu_buffer->event_stamp[nest] = info->ts; 2839 2840 /* 2841 * If we need to add a timestamp, then we 2842 * add it to the start of the reserved space. 
2843 */ 2844 if (unlikely(info->add_timestamp)) 2845 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); 2846 2847 event->time_delta = delta; 2848 length -= RB_EVNT_HDR_SIZE; 2849 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { 2850 event->type_len = 0; 2851 event->array[0] = length; 2852 } else 2853 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT); 2854 } 2855 2856 static unsigned rb_calculate_event_length(unsigned length) 2857 { 2858 struct ring_buffer_event event; /* Used only for sizeof array */ 2859 2860 /* zero length can cause confusions */ 2861 if (!length) 2862 length++; 2863 2864 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) 2865 length += sizeof(event.array[0]); 2866 2867 length += RB_EVNT_HDR_SIZE; 2868 length = ALIGN(length, RB_ARCH_ALIGNMENT); 2869 2870 /* 2871 * In case the time delta is larger than the 27 bits for it 2872 * in the header, we need to add a timestamp. If another 2873 * event comes in when trying to discard this one to increase 2874 * the length, then the timestamp will be added in the allocated 2875 * space of this event. If length is bigger than the size needed 2876 * for the TIME_EXTEND, then padding has to be used. The events 2877 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal 2878 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding. 2879 * As length is a multiple of 4, we only need to worry if it 2880 * is 12 (RB_LEN_TIME_EXTEND + 4). 2881 */ 2882 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT) 2883 length += RB_ALIGNMENT; 2884 2885 return length; 2886 } 2887 2888 static inline bool 2889 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, 2890 struct ring_buffer_event *event) 2891 { 2892 unsigned long new_index, old_index; 2893 struct buffer_page *bpage; 2894 unsigned long addr; 2895 2896 new_index = rb_event_index(cpu_buffer, event); 2897 old_index = new_index + rb_event_ts_length(event); 2898 addr = (unsigned long)event; 2899 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 2900 2901 bpage = READ_ONCE(cpu_buffer->tail_page); 2902 2903 /* 2904 * Make sure the tail_page is still the same and 2905 * the next write location is the end of this event 2906 */ 2907 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { 2908 unsigned long write_mask = 2909 local_read(&bpage->write) & ~RB_WRITE_MASK; 2910 unsigned long event_length = rb_event_length(event); 2911 2912 /* 2913 * For the before_stamp to be different than the write_stamp 2914 * to make sure that the next event adds an absolute 2915 * value and does not rely on the saved write stamp, which 2916 * is now going to be bogus. 2917 * 2918 * By setting the before_stamp to zero, the next event 2919 * is not going to use the write_stamp and will instead 2920 * create an absolute timestamp. This means there's no 2921 * reason to update the wirte_stamp! 2922 */ 2923 rb_time_set(&cpu_buffer->before_stamp, 0); 2924 2925 /* 2926 * If an event were to come in now, it would see that the 2927 * write_stamp and the before_stamp are different, and assume 2928 * that this event just added itself before updating 2929 * the write stamp. The interrupting event will fix the 2930 * write stamp for us, and use an absolute timestamp. 2931 */ 2932 2933 /* 2934 * This is on the tail page. It is possible that 2935 * a write could come in and move the tail page 2936 * and write to the next page. That is fine 2937 * because we just shorten what is on this page. 
2938 */ 2939 old_index += write_mask; 2940 new_index += write_mask; 2941 2942 /* caution: old_index gets updated on cmpxchg failure */ 2943 if (local_try_cmpxchg(&bpage->write, &old_index, new_index)) { 2944 /* update counters */ 2945 local_sub(event_length, &cpu_buffer->entries_bytes); 2946 return true; 2947 } 2948 } 2949 2950 /* could not discard */ 2951 return false; 2952 } 2953 2954 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer) 2955 { 2956 local_inc(&cpu_buffer->committing); 2957 local_inc(&cpu_buffer->commits); 2958 } 2959 2960 static __always_inline void 2961 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) 2962 { 2963 unsigned long max_count; 2964 2965 /* 2966 * We only race with interrupts and NMIs on this CPU. 2967 * If we own the commit event, then we can commit 2968 * all others that interrupted us, since the interruptions 2969 * are in stack format (they finish before they come 2970 * back to us). This allows us to do a simple loop to 2971 * assign the commit to the tail. 2972 */ 2973 again: 2974 max_count = cpu_buffer->nr_pages * 100; 2975 2976 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) { 2977 if (RB_WARN_ON(cpu_buffer, !(--max_count))) 2978 return; 2979 if (RB_WARN_ON(cpu_buffer, 2980 rb_is_reader_page(cpu_buffer->tail_page))) 2981 return; 2982 /* 2983 * No need for a memory barrier here, as the update 2984 * of the tail_page did it for this page. 2985 */ 2986 local_set(&cpu_buffer->commit_page->page->commit, 2987 rb_page_write(cpu_buffer->commit_page)); 2988 rb_inc_page(&cpu_buffer->commit_page); 2989 /* add barrier to keep gcc from optimizing too much */ 2990 barrier(); 2991 } 2992 while (rb_commit_index(cpu_buffer) != 2993 rb_page_write(cpu_buffer->commit_page)) { 2994 2995 /* Make sure the readers see the content of what is committed. */ 2996 smp_wmb(); 2997 local_set(&cpu_buffer->commit_page->page->commit, 2998 rb_page_write(cpu_buffer->commit_page)); 2999 RB_WARN_ON(cpu_buffer, 3000 local_read(&cpu_buffer->commit_page->page->commit) & 3001 ~RB_WRITE_MASK); 3002 barrier(); 3003 } 3004 3005 /* again, keep gcc from optimizing */ 3006 barrier(); 3007 3008 /* 3009 * If an interrupt came in just after the first while loop 3010 * and pushed the tail page forward, we will be left with 3011 * a dangling commit that will never go forward. 3012 */ 3013 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page))) 3014 goto again; 3015 } 3016 3017 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer) 3018 { 3019 unsigned long commits; 3020 3021 if (RB_WARN_ON(cpu_buffer, 3022 !local_read(&cpu_buffer->committing))) 3023 return; 3024 3025 again: 3026 commits = local_read(&cpu_buffer->commits); 3027 /* synchronize with interrupts */ 3028 barrier(); 3029 if (local_read(&cpu_buffer->committing) == 1) 3030 rb_set_commit_to_write(cpu_buffer); 3031 3032 local_dec(&cpu_buffer->committing); 3033 3034 /* synchronize with interrupts */ 3035 barrier(); 3036 3037 /* 3038 * Need to account for interrupts coming in between the 3039 * updating of the commit page and the clearing of the 3040 * committing counter. 
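 *
 * For example (an illustrative interleaving): the outermost commit sees
 * committing == 1 and moves the commit page forward, but before it can
 * decrement committing an interrupt comes in, reserves and commits its
 * own event (incrementing commits), and the interrupt's rb_end_commit()
 * sees committing > 1, so it does not move the commit page itself.
 * Without the re-check below, that event would never become visible to
 * readers; so if commits changed while committing dropped to zero,
 * re-take committing and do another pass.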
3041 */ 3042 if (unlikely(local_read(&cpu_buffer->commits) != commits) && 3043 !local_read(&cpu_buffer->committing)) { 3044 local_inc(&cpu_buffer->committing); 3045 goto again; 3046 } 3047 } 3048 3049 static inline void rb_event_discard(struct ring_buffer_event *event) 3050 { 3051 if (extended_time(event)) 3052 event = skip_time_extend(event); 3053 3054 /* array[0] holds the actual length for the discarded event */ 3055 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE; 3056 event->type_len = RINGBUF_TYPE_PADDING; 3057 /* time delta must be non zero */ 3058 if (!event->time_delta) 3059 event->time_delta = 1; 3060 } 3061 3062 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer) 3063 { 3064 local_inc(&cpu_buffer->entries); 3065 rb_end_commit(cpu_buffer); 3066 } 3067 3068 static __always_inline void 3069 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) 3070 { 3071 if (buffer->irq_work.waiters_pending) { 3072 buffer->irq_work.waiters_pending = false; 3073 /* irq_work_queue() supplies it's own memory barriers */ 3074 irq_work_queue(&buffer->irq_work.work); 3075 } 3076 3077 if (cpu_buffer->irq_work.waiters_pending) { 3078 cpu_buffer->irq_work.waiters_pending = false; 3079 /* irq_work_queue() supplies it's own memory barriers */ 3080 irq_work_queue(&cpu_buffer->irq_work.work); 3081 } 3082 3083 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched)) 3084 return; 3085 3086 if (cpu_buffer->reader_page == cpu_buffer->commit_page) 3087 return; 3088 3089 if (!cpu_buffer->irq_work.full_waiters_pending) 3090 return; 3091 3092 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched); 3093 3094 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full)) 3095 return; 3096 3097 cpu_buffer->irq_work.wakeup_full = true; 3098 cpu_buffer->irq_work.full_waiters_pending = false; 3099 /* irq_work_queue() supplies it's own memory barriers */ 3100 irq_work_queue(&cpu_buffer->irq_work.work); 3101 } 3102 3103 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION 3104 # define do_ring_buffer_record_recursion() \ 3105 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) 3106 #else 3107 # define do_ring_buffer_record_recursion() do { } while (0) 3108 #endif 3109 3110 /* 3111 * The lock and unlock are done within a preempt disable section. 3112 * The current_context per_cpu variable can only be modified 3113 * by the current task between lock and unlock. But it can 3114 * be modified more than once via an interrupt. To pass this 3115 * information from the lock to the unlock without having to 3116 * access the 'in_interrupt()' functions again (which do show 3117 * a bit of overhead in something as critical as function tracing, 3118 * we use a bitmask trick. 3119 * 3120 * bit 1 = NMI context 3121 * bit 2 = IRQ context 3122 * bit 3 = SoftIRQ context 3123 * bit 4 = normal context. 3124 * 3125 * This works because this is the order of contexts that can 3126 * preempt other contexts. A SoftIRQ never preempts an IRQ 3127 * context. 3128 * 3129 * When the context is determined, the corresponding bit is 3130 * checked and set (if it was set, then a recursion of that context 3131 * happened). 3132 * 3133 * On unlock, we need to clear this bit. To do so, just subtract 3134 * 1 from the current_context and AND it to itself. 
3135 * 3136 * (binary) 3137 * 101 - 1 = 100 3138 * 101 & 100 = 100 (clearing bit zero) 3139 * 3140 * 1010 - 1 = 1001 3141 * 1010 & 1001 = 1000 (clearing bit 1) 3142 * 3143 * The least significant bit can be cleared this way, and it 3144 * just so happens that it is the same bit corresponding to 3145 * the current context. 3146 * 3147 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit 3148 * is set when a recursion is detected at the current context, and if 3149 * the TRANSITION bit is already set, it will fail the recursion. 3150 * This is needed because there's a lag between the changing of 3151 * interrupt context and updating the preempt count. In this case, 3152 * a false positive will be found. To handle this, one extra recursion 3153 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION 3154 * bit is already set, then it is considered a recursion and the function 3155 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. 3156 * 3157 * On the trace_recursive_unlock(), the TRANSITION bit will be the first 3158 * to be cleared. Even if it wasn't the context that set it. That is, 3159 * if an interrupt comes in while NORMAL bit is set and the ring buffer 3160 * is called before preempt_count() is updated, since the check will 3161 * be on the NORMAL bit, the TRANSITION bit will then be set. If an 3162 * NMI then comes in, it will set the NMI bit, but when the NMI code 3163 * does the trace_recursive_unlock() it will clear the TRANSITION bit 3164 * and leave the NMI bit set. But this is fine, because the interrupt 3165 * code that set the TRANSITION bit will then clear the NMI bit when it 3166 * calls trace_recursive_unlock(). If another NMI comes in, it will 3167 * set the TRANSITION bit and continue. 3168 * 3169 * Note: The TRANSITION bit only handles a single transition between context. 3170 */ 3171 3172 static __always_inline bool 3173 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) 3174 { 3175 unsigned int val = cpu_buffer->current_context; 3176 int bit = interrupt_context_level(); 3177 3178 bit = RB_CTX_NORMAL - bit; 3179 3180 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { 3181 /* 3182 * It is possible that this was called by transitioning 3183 * between interrupt context, and preempt_count() has not 3184 * been updated yet. In this case, use the TRANSITION bit. 3185 */ 3186 bit = RB_CTX_TRANSITION; 3187 if (val & (1 << (bit + cpu_buffer->nest))) { 3188 do_ring_buffer_record_recursion(); 3189 return true; 3190 } 3191 } 3192 3193 val |= (1 << (bit + cpu_buffer->nest)); 3194 cpu_buffer->current_context = val; 3195 3196 return false; 3197 } 3198 3199 static __always_inline void 3200 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) 3201 { 3202 cpu_buffer->current_context &= 3203 cpu_buffer->current_context - (1 << cpu_buffer->nest); 3204 } 3205 3206 /* The recursive locking above uses 5 bits */ 3207 #define NESTED_BITS 5 3208 3209 /** 3210 * ring_buffer_nest_start - Allow to trace while nested 3211 * @buffer: The ring buffer to modify 3212 * 3213 * The ring buffer has a safety mechanism to prevent recursion. 3214 * But there may be a case where a trace needs to be done while 3215 * tracing something else. In this case, calling this function 3216 * will allow this function to nest within a currently active 3217 * ring_buffer_lock_reserve(). 
3218 * 3219 * Call this function before calling another ring_buffer_lock_reserve() and 3220 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit(). 3221 */ 3222 void ring_buffer_nest_start(struct trace_buffer *buffer) 3223 { 3224 struct ring_buffer_per_cpu *cpu_buffer; 3225 int cpu; 3226 3227 /* Enabled by ring_buffer_nest_end() */ 3228 preempt_disable_notrace(); 3229 cpu = raw_smp_processor_id(); 3230 cpu_buffer = buffer->buffers[cpu]; 3231 /* This is the shift value for the above recursive locking */ 3232 cpu_buffer->nest += NESTED_BITS; 3233 } 3234 3235 /** 3236 * ring_buffer_nest_end - Allow to trace while nested 3237 * @buffer: The ring buffer to modify 3238 * 3239 * Must be called after ring_buffer_nest_start() and after the 3240 * ring_buffer_unlock_commit(). 3241 */ 3242 void ring_buffer_nest_end(struct trace_buffer *buffer) 3243 { 3244 struct ring_buffer_per_cpu *cpu_buffer; 3245 int cpu; 3246 3247 /* disabled by ring_buffer_nest_start() */ 3248 cpu = raw_smp_processor_id(); 3249 cpu_buffer = buffer->buffers[cpu]; 3250 /* This is the shift value for the above recursive locking */ 3251 cpu_buffer->nest -= NESTED_BITS; 3252 preempt_enable_notrace(); 3253 } 3254 3255 /** 3256 * ring_buffer_unlock_commit - commit a reserved 3257 * @buffer: The buffer to commit to 3258 * 3259 * This commits the data to the ring buffer, and releases any locks held. 3260 * 3261 * Must be paired with ring_buffer_lock_reserve. 3262 */ 3263 int ring_buffer_unlock_commit(struct trace_buffer *buffer) 3264 { 3265 struct ring_buffer_per_cpu *cpu_buffer; 3266 int cpu = raw_smp_processor_id(); 3267 3268 cpu_buffer = buffer->buffers[cpu]; 3269 3270 rb_commit(cpu_buffer); 3271 3272 rb_wakeups(buffer, cpu_buffer); 3273 3274 trace_recursive_unlock(cpu_buffer); 3275 3276 preempt_enable_notrace(); 3277 3278 return 0; 3279 } 3280 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); 3281 3282 /* Special value to validate all deltas on a page. 
*/ 3283 #define CHECK_FULL_PAGE 1L 3284 3285 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS 3286 3287 static const char *show_irq_str(int bits) 3288 { 3289 const char *type[] = { 3290 ".", // 0 3291 "s", // 1 3292 "h", // 2 3293 "Hs", // 3 3294 "n", // 4 3295 "Ns", // 5 3296 "Nh", // 6 3297 "NHs", // 7 3298 }; 3299 3300 return type[bits]; 3301 } 3302 3303 /* Assume this is an trace event */ 3304 static const char *show_flags(struct ring_buffer_event *event) 3305 { 3306 struct trace_entry *entry; 3307 int bits = 0; 3308 3309 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3310 return "X"; 3311 3312 entry = ring_buffer_event_data(event); 3313 3314 if (entry->flags & TRACE_FLAG_SOFTIRQ) 3315 bits |= 1; 3316 3317 if (entry->flags & TRACE_FLAG_HARDIRQ) 3318 bits |= 2; 3319 3320 if (entry->flags & TRACE_FLAG_NMI) 3321 bits |= 4; 3322 3323 return show_irq_str(bits); 3324 } 3325 3326 static const char *show_irq(struct ring_buffer_event *event) 3327 { 3328 struct trace_entry *entry; 3329 3330 if (rb_event_data_length(event) - RB_EVNT_HDR_SIZE < sizeof(*entry)) 3331 return ""; 3332 3333 entry = ring_buffer_event_data(event); 3334 if (entry->flags & TRACE_FLAG_IRQS_OFF) 3335 return "d"; 3336 return ""; 3337 } 3338 3339 static const char *show_interrupt_level(void) 3340 { 3341 unsigned long pc = preempt_count(); 3342 unsigned char level = 0; 3343 3344 if (pc & SOFTIRQ_OFFSET) 3345 level |= 1; 3346 3347 if (pc & HARDIRQ_MASK) 3348 level |= 2; 3349 3350 if (pc & NMI_MASK) 3351 level |= 4; 3352 3353 return show_irq_str(level); 3354 } 3355 3356 static void dump_buffer_page(struct buffer_data_page *bpage, 3357 struct rb_event_info *info, 3358 unsigned long tail) 3359 { 3360 struct ring_buffer_event *event; 3361 u64 ts, delta; 3362 int e; 3363 3364 ts = bpage->time_stamp; 3365 pr_warn(" [%lld] PAGE TIME STAMP\n", ts); 3366 3367 for (e = 0; e < tail; e += rb_event_length(event)) { 3368 3369 event = (struct ring_buffer_event *)(bpage->data + e); 3370 3371 switch (event->type_len) { 3372 3373 case RINGBUF_TYPE_TIME_EXTEND: 3374 delta = rb_event_time_stamp(event); 3375 ts += delta; 3376 pr_warn(" 0x%x: [%lld] delta:%lld TIME EXTEND\n", 3377 e, ts, delta); 3378 break; 3379 3380 case RINGBUF_TYPE_TIME_STAMP: 3381 delta = rb_event_time_stamp(event); 3382 ts = rb_fix_abs_ts(delta, ts); 3383 pr_warn(" 0x%x: [%lld] absolute:%lld TIME STAMP\n", 3384 e, ts, delta); 3385 break; 3386 3387 case RINGBUF_TYPE_PADDING: 3388 ts += event->time_delta; 3389 pr_warn(" 0x%x: [%lld] delta:%d PADDING\n", 3390 e, ts, event->time_delta); 3391 break; 3392 3393 case RINGBUF_TYPE_DATA: 3394 ts += event->time_delta; 3395 pr_warn(" 0x%x: [%lld] delta:%d %s%s\n", 3396 e, ts, event->time_delta, 3397 show_flags(event), show_irq(event)); 3398 break; 3399 3400 default: 3401 break; 3402 } 3403 } 3404 pr_warn("expected end:0x%lx last event actually ended at:0x%x\n", tail, e); 3405 } 3406 3407 static DEFINE_PER_CPU(atomic_t, checking); 3408 static atomic_t ts_dump; 3409 3410 #define buffer_warn_return(fmt, ...) 
\ 3411 do { \ 3412 /* If another report is happening, ignore this one */ \ 3413 if (atomic_inc_return(&ts_dump) != 1) { \ 3414 atomic_dec(&ts_dump); \ 3415 goto out; \ 3416 } \ 3417 atomic_inc(&cpu_buffer->record_disabled); \ 3418 pr_warn(fmt, ##__VA_ARGS__); \ 3419 dump_buffer_page(bpage, info, tail); \ 3420 atomic_dec(&ts_dump); \ 3421 /* There's some cases in boot up that this can happen */ \ 3422 if (WARN_ON_ONCE(system_state != SYSTEM_BOOTING)) \ 3423 /* Do not re-enable checking */ \ 3424 return; \ 3425 } while (0) 3426 3427 /* 3428 * Check if the current event time stamp matches the deltas on 3429 * the buffer page. 3430 */ 3431 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3432 struct rb_event_info *info, 3433 unsigned long tail) 3434 { 3435 struct ring_buffer_event *event; 3436 struct buffer_data_page *bpage; 3437 u64 ts, delta; 3438 bool full = false; 3439 int e; 3440 3441 bpage = info->tail_page->page; 3442 3443 if (tail == CHECK_FULL_PAGE) { 3444 full = true; 3445 tail = local_read(&bpage->commit); 3446 } else if (info->add_timestamp & 3447 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { 3448 /* Ignore events with absolute time stamps */ 3449 return; 3450 } 3451 3452 /* 3453 * Do not check the first event (skip possible extends too). 3454 * Also do not check if previous events have not been committed. 3455 */ 3456 if (tail <= 8 || tail > local_read(&bpage->commit)) 3457 return; 3458 3459 /* 3460 * If this interrupted another event, 3461 */ 3462 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) 3463 goto out; 3464 3465 ts = bpage->time_stamp; 3466 3467 for (e = 0; e < tail; e += rb_event_length(event)) { 3468 3469 event = (struct ring_buffer_event *)(bpage->data + e); 3470 3471 switch (event->type_len) { 3472 3473 case RINGBUF_TYPE_TIME_EXTEND: 3474 delta = rb_event_time_stamp(event); 3475 ts += delta; 3476 break; 3477 3478 case RINGBUF_TYPE_TIME_STAMP: 3479 delta = rb_event_time_stamp(event); 3480 delta = rb_fix_abs_ts(delta, ts); 3481 if (delta < ts) { 3482 buffer_warn_return("[CPU: %d]ABSOLUTE TIME WENT BACKWARDS: last ts: %lld absolute ts: %lld\n", 3483 cpu_buffer->cpu, ts, delta); 3484 } 3485 ts = delta; 3486 break; 3487 3488 case RINGBUF_TYPE_PADDING: 3489 if (event->time_delta == 1) 3490 break; 3491 fallthrough; 3492 case RINGBUF_TYPE_DATA: 3493 ts += event->time_delta; 3494 break; 3495 3496 default: 3497 RB_WARN_ON(cpu_buffer, 1); 3498 } 3499 } 3500 if ((full && ts > info->ts) || 3501 (!full && ts + info->delta != info->ts)) { 3502 buffer_warn_return("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s context:%s\n", 3503 cpu_buffer->cpu, 3504 ts + info->delta, info->ts, info->delta, 3505 info->before, info->after, 3506 full ? 
" (full)" : "", show_interrupt_level()); 3507 } 3508 out: 3509 atomic_dec(this_cpu_ptr(&checking)); 3510 } 3511 #else 3512 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, 3513 struct rb_event_info *info, 3514 unsigned long tail) 3515 { 3516 } 3517 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ 3518 3519 static struct ring_buffer_event * 3520 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, 3521 struct rb_event_info *info) 3522 { 3523 struct ring_buffer_event *event; 3524 struct buffer_page *tail_page; 3525 unsigned long tail, write, w; 3526 3527 /* Don't let the compiler play games with cpu_buffer->tail_page */ 3528 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); 3529 3530 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; 3531 barrier(); 3532 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3533 rb_time_read(&cpu_buffer->write_stamp, &info->after); 3534 barrier(); 3535 info->ts = rb_time_stamp(cpu_buffer->buffer); 3536 3537 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { 3538 info->delta = info->ts; 3539 } else { 3540 /* 3541 * If interrupting an event time update, we may need an 3542 * absolute timestamp. 3543 * Don't bother if this is the start of a new page (w == 0). 3544 */ 3545 if (!w) { 3546 /* Use the sub-buffer timestamp */ 3547 info->delta = 0; 3548 } else if (unlikely(info->before != info->after)) { 3549 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; 3550 info->length += RB_LEN_TIME_EXTEND; 3551 } else { 3552 info->delta = info->ts - info->after; 3553 if (unlikely(test_time_stamp(info->delta))) { 3554 info->add_timestamp |= RB_ADD_STAMP_EXTEND; 3555 info->length += RB_LEN_TIME_EXTEND; 3556 } 3557 } 3558 } 3559 3560 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); 3561 3562 /*C*/ write = local_add_return(info->length, &tail_page->write); 3563 3564 /* set write to only the index of the write */ 3565 write &= RB_WRITE_MASK; 3566 3567 tail = write - info->length; 3568 3569 /* See if we shot pass the end of this buffer page */ 3570 if (unlikely(write > cpu_buffer->buffer->subbuf_size)) { 3571 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); 3572 return rb_move_tail(cpu_buffer, tail, info); 3573 } 3574 3575 if (likely(tail == w)) { 3576 /* Nothing interrupted us between A and C */ 3577 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); 3578 /* 3579 * If something came in between C and D, the write stamp 3580 * may now not be in sync. But that's fine as the before_stamp 3581 * will be different and then next event will just be forced 3582 * to use an absolute timestamp. 3583 */ 3584 if (likely(!(info->add_timestamp & 3585 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3586 /* This did not interrupt any time update */ 3587 info->delta = info->ts - info->after; 3588 else 3589 /* Just use full timestamp for interrupting event */ 3590 info->delta = info->ts; 3591 check_buffer(cpu_buffer, info, tail); 3592 } else { 3593 u64 ts; 3594 /* SLOW PATH - Interrupted between A and C */ 3595 3596 /* Save the old before_stamp */ 3597 rb_time_read(&cpu_buffer->before_stamp, &info->before); 3598 3599 /* 3600 * Read a new timestamp and update the before_stamp to make 3601 * the next event after this one force using an absolute 3602 * timestamp. This is in case an interrupt were to come in 3603 * between E and F. 
3604 */ 3605 ts = rb_time_stamp(cpu_buffer->buffer); 3606 rb_time_set(&cpu_buffer->before_stamp, ts); 3607 3608 barrier(); 3609 /*E*/ rb_time_read(&cpu_buffer->write_stamp, &info->after); 3610 barrier(); 3611 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && 3612 info->after == info->before && info->after < ts) { 3613 /* 3614 * Nothing came after this event between C and F, it is 3615 * safe to use info->after for the delta as it 3616 * matched info->before and is still valid. 3617 */ 3618 info->delta = ts - info->after; 3619 } else { 3620 /* 3621 * Interrupted between C and F: 3622 * Lost the previous events time stamp. Just set the 3623 * delta to zero, and this will be the same time as 3624 * the event this event interrupted. And the events that 3625 * came after this will still be correct (as they would 3626 * have built their delta on the previous event. 3627 */ 3628 info->delta = 0; 3629 } 3630 info->ts = ts; 3631 info->add_timestamp &= ~RB_ADD_STAMP_FORCE; 3632 } 3633 3634 /* 3635 * If this is the first commit on the page, then it has the same 3636 * timestamp as the page itself. 3637 */ 3638 if (unlikely(!tail && !(info->add_timestamp & 3639 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) 3640 info->delta = 0; 3641 3642 /* We reserved something on the buffer */ 3643 3644 event = __rb_page_index(tail_page, tail); 3645 rb_update_event(cpu_buffer, event, info); 3646 3647 local_inc(&tail_page->entries); 3648 3649 /* 3650 * If this is the first commit on the page, then update 3651 * its timestamp. 3652 */ 3653 if (unlikely(!tail)) 3654 tail_page->page->time_stamp = info->ts; 3655 3656 /* account for these added bytes */ 3657 local_add(info->length, &cpu_buffer->entries_bytes); 3658 3659 return event; 3660 } 3661 3662 static __always_inline struct ring_buffer_event * 3663 rb_reserve_next_event(struct trace_buffer *buffer, 3664 struct ring_buffer_per_cpu *cpu_buffer, 3665 unsigned long length) 3666 { 3667 struct ring_buffer_event *event; 3668 struct rb_event_info info; 3669 int nr_loops = 0; 3670 int add_ts_default; 3671 3672 /* ring buffer does cmpxchg, make sure it is safe in NMI context */ 3673 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) && 3674 (unlikely(in_nmi()))) { 3675 return NULL; 3676 } 3677 3678 rb_start_commit(cpu_buffer); 3679 /* The commit page can not change after this */ 3680 3681 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 3682 /* 3683 * Due to the ability to swap a cpu buffer from a buffer 3684 * it is possible it was swapped before we committed. 3685 * (committing stops a swap). We check for it here and 3686 * if it happened, we have to fail the write. 3687 */ 3688 barrier(); 3689 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) { 3690 local_dec(&cpu_buffer->committing); 3691 local_dec(&cpu_buffer->commits); 3692 return NULL; 3693 } 3694 #endif 3695 3696 info.length = rb_calculate_event_length(length); 3697 3698 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { 3699 add_ts_default = RB_ADD_STAMP_ABSOLUTE; 3700 info.length += RB_LEN_TIME_EXTEND; 3701 if (info.length > cpu_buffer->buffer->max_data_size) 3702 goto out_fail; 3703 } else { 3704 add_ts_default = RB_ADD_STAMP_NONE; 3705 } 3706 3707 again: 3708 info.add_timestamp = add_ts_default; 3709 info.delta = 0; 3710 3711 /* 3712 * We allow for interrupts to reenter here and do a trace. 3713 * If one does, it will cause this original code to loop 3714 * back here. Even with heavy interrupts happening, this 3715 * should only happen a few times in a row. 
If this happens 3716 * 1000 times in a row, there must be either an interrupt 3717 * storm or we have something buggy. 3718 * Bail! 3719 */ 3720 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) 3721 goto out_fail; 3722 3723 event = __rb_reserve_next(cpu_buffer, &info); 3724 3725 if (unlikely(PTR_ERR(event) == -EAGAIN)) { 3726 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) 3727 info.length -= RB_LEN_TIME_EXTEND; 3728 goto again; 3729 } 3730 3731 if (likely(event)) 3732 return event; 3733 out_fail: 3734 rb_end_commit(cpu_buffer); 3735 return NULL; 3736 } 3737 3738 /** 3739 * ring_buffer_lock_reserve - reserve a part of the buffer 3740 * @buffer: the ring buffer to reserve from 3741 * @length: the length of the data to reserve (excluding event header) 3742 * 3743 * Returns a reserved event on the ring buffer to copy directly to. 3744 * The user of this interface will need to get the body to write into 3745 * and can use the ring_buffer_event_data() interface. 3746 * 3747 * The length is the length of the data needed, not the event length 3748 * which also includes the event header. 3749 * 3750 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned. 3751 * If NULL is returned, then nothing has been allocated or locked. 3752 */ 3753 struct ring_buffer_event * 3754 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length) 3755 { 3756 struct ring_buffer_per_cpu *cpu_buffer; 3757 struct ring_buffer_event *event; 3758 int cpu; 3759 3760 /* If we are tracing schedule, we don't want to recurse */ 3761 preempt_disable_notrace(); 3762 3763 if (unlikely(atomic_read(&buffer->record_disabled))) 3764 goto out; 3765 3766 cpu = raw_smp_processor_id(); 3767 3768 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask))) 3769 goto out; 3770 3771 cpu_buffer = buffer->buffers[cpu]; 3772 3773 if (unlikely(atomic_read(&cpu_buffer->record_disabled))) 3774 goto out; 3775 3776 if (unlikely(length > buffer->max_data_size)) 3777 goto out; 3778 3779 if (unlikely(trace_recursive_lock(cpu_buffer))) 3780 goto out; 3781 3782 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3783 if (!event) 3784 goto out_unlock; 3785 3786 return event; 3787 3788 out_unlock: 3789 trace_recursive_unlock(cpu_buffer); 3790 out: 3791 preempt_enable_notrace(); 3792 return NULL; 3793 } 3794 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve); 3795 3796 /* 3797 * Decrement the entries to the page that an event is on. 3798 * The event does not even need to exist, only the pointer 3799 * to the page it is on. This may only be called before the commit 3800 * takes place. 3801 */ 3802 static inline void 3803 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, 3804 struct ring_buffer_event *event) 3805 { 3806 unsigned long addr = (unsigned long)event; 3807 struct buffer_page *bpage = cpu_buffer->commit_page; 3808 struct buffer_page *start; 3809 3810 addr &= ~((PAGE_SIZE << cpu_buffer->buffer->subbuf_order) - 1); 3811 3812 /* Do the likely case first */ 3813 if (likely(bpage->page == (void *)addr)) { 3814 local_dec(&bpage->entries); 3815 return; 3816 } 3817 3818 /* 3819 * Because the commit page may be on the reader page we 3820 * start with the next page and check the end loop there. 3821 */ 3822 rb_inc_page(&bpage); 3823 start = bpage; 3824 do { 3825 if (bpage->page == (void *)addr) { 3826 local_dec(&bpage->entries); 3827 return; 3828 } 3829 rb_inc_page(&bpage); 3830 } while (bpage != start); 3831 3832 /* commit not part of this buffer?? 
*/ 3833 RB_WARN_ON(cpu_buffer, 1); 3834 } 3835 3836 /** 3837 * ring_buffer_discard_commit - discard an event that has not been committed 3838 * @buffer: the ring buffer 3839 * @event: non committed event to discard 3840 * 3841 * Sometimes an event that is in the ring buffer needs to be ignored. 3842 * This function lets the user discard an event in the ring buffer 3843 * and then that event will not be read later. 3844 * 3845 * This function only works if it is called before the item has been 3846 * committed. It will try to free the event from the ring buffer 3847 * if another event has not been added behind it. 3848 * 3849 * If another event has been added behind it, it will set the event 3850 * up as discarded, and perform the commit. 3851 * 3852 * If this function is called, do not call ring_buffer_unlock_commit on 3853 * the event. 3854 */ 3855 void ring_buffer_discard_commit(struct trace_buffer *buffer, 3856 struct ring_buffer_event *event) 3857 { 3858 struct ring_buffer_per_cpu *cpu_buffer; 3859 int cpu; 3860 3861 /* The event is discarded regardless */ 3862 rb_event_discard(event); 3863 3864 cpu = smp_processor_id(); 3865 cpu_buffer = buffer->buffers[cpu]; 3866 3867 /* 3868 * This must only be called if the event has not been 3869 * committed yet. Thus we can assume that preemption 3870 * is still disabled. 3871 */ 3872 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing)); 3873 3874 rb_decrement_entry(cpu_buffer, event); 3875 if (rb_try_to_discard(cpu_buffer, event)) 3876 goto out; 3877 3878 out: 3879 rb_end_commit(cpu_buffer); 3880 3881 trace_recursive_unlock(cpu_buffer); 3882 3883 preempt_enable_notrace(); 3884 3885 } 3886 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit); 3887 3888 /** 3889 * ring_buffer_write - write data to the buffer without reserving 3890 * @buffer: The ring buffer to write to. 3891 * @length: The length of the data being written (excluding the event header) 3892 * @data: The data to write to the buffer. 3893 * 3894 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as 3895 * one function. If you already have the data to write to the buffer, it 3896 * may be easier to simply call this function. 3897 * 3898 * Note, like ring_buffer_lock_reserve, the length is the length of the data 3899 * and not the length of the event which would hold the header. 
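 *
 * For example, a caller that already has its payload assembled could do
 * something like this (an illustrative sketch only; "struct my_entry" and
 * its fields are hypothetical, not an in-tree user):
 *
 *	struct my_entry entry = { .pid = current->pid };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		pr_debug("ring buffer write failed or recording is disabled\n");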
3900 */ 3901 int ring_buffer_write(struct trace_buffer *buffer, 3902 unsigned long length, 3903 void *data) 3904 { 3905 struct ring_buffer_per_cpu *cpu_buffer; 3906 struct ring_buffer_event *event; 3907 void *body; 3908 int ret = -EBUSY; 3909 int cpu; 3910 3911 preempt_disable_notrace(); 3912 3913 if (atomic_read(&buffer->record_disabled)) 3914 goto out; 3915 3916 cpu = raw_smp_processor_id(); 3917 3918 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 3919 goto out; 3920 3921 cpu_buffer = buffer->buffers[cpu]; 3922 3923 if (atomic_read(&cpu_buffer->record_disabled)) 3924 goto out; 3925 3926 if (length > buffer->max_data_size) 3927 goto out; 3928 3929 if (unlikely(trace_recursive_lock(cpu_buffer))) 3930 goto out; 3931 3932 event = rb_reserve_next_event(buffer, cpu_buffer, length); 3933 if (!event) 3934 goto out_unlock; 3935 3936 body = rb_event_data(event); 3937 3938 memcpy(body, data, length); 3939 3940 rb_commit(cpu_buffer); 3941 3942 rb_wakeups(buffer, cpu_buffer); 3943 3944 ret = 0; 3945 3946 out_unlock: 3947 trace_recursive_unlock(cpu_buffer); 3948 3949 out: 3950 preempt_enable_notrace(); 3951 3952 return ret; 3953 } 3954 EXPORT_SYMBOL_GPL(ring_buffer_write); 3955 3956 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) 3957 { 3958 struct buffer_page *reader = cpu_buffer->reader_page; 3959 struct buffer_page *head = rb_set_head_page(cpu_buffer); 3960 struct buffer_page *commit = cpu_buffer->commit_page; 3961 3962 /* In case of error, head will be NULL */ 3963 if (unlikely(!head)) 3964 return true; 3965 3966 /* Reader should exhaust content in reader page */ 3967 if (reader->read != rb_page_size(reader)) 3968 return false; 3969 3970 /* 3971 * If writers are committing on the reader page, knowing all 3972 * committed content has been read, the ring buffer is empty. 3973 */ 3974 if (commit == reader) 3975 return true; 3976 3977 /* 3978 * If writers are committing on a page other than reader page 3979 * and head page, there should always be content to read. 3980 */ 3981 if (commit != head) 3982 return false; 3983 3984 /* 3985 * Writers are committing on the head page, we just need 3986 * to care about there're committed data, and the reader will 3987 * swap reader page with head page when it is to read data. 3988 */ 3989 return rb_page_commit(commit) == 0; 3990 } 3991 3992 /** 3993 * ring_buffer_record_disable - stop all writes into the buffer 3994 * @buffer: The ring buffer to stop writes to. 3995 * 3996 * This prevents all writes to the buffer. Any attempt to write 3997 * to the buffer after this will fail and return NULL. 3998 * 3999 * The caller should call synchronize_rcu() after this. 4000 */ 4001 void ring_buffer_record_disable(struct trace_buffer *buffer) 4002 { 4003 atomic_inc(&buffer->record_disabled); 4004 } 4005 EXPORT_SYMBOL_GPL(ring_buffer_record_disable); 4006 4007 /** 4008 * ring_buffer_record_enable - enable writes to the buffer 4009 * @buffer: The ring buffer to enable writes 4010 * 4011 * Note, multiple disables will need the same number of enables 4012 * to truly enable the writing (much like preempt_disable). 4013 */ 4014 void ring_buffer_record_enable(struct trace_buffer *buffer) 4015 { 4016 atomic_dec(&buffer->record_disabled); 4017 } 4018 EXPORT_SYMBOL_GPL(ring_buffer_record_enable); 4019 4020 /** 4021 * ring_buffer_record_off - stop all writes into the buffer 4022 * @buffer: The ring buffer to stop writes to. 4023 * 4024 * This prevents all writes to the buffer. Any attempt to write 4025 * to the buffer after this will fail and return NULL. 
4026 * 4027 * This is different than ring_buffer_record_disable() as 4028 * it works like an on/off switch, where as the disable() version 4029 * must be paired with a enable(). 4030 */ 4031 void ring_buffer_record_off(struct trace_buffer *buffer) 4032 { 4033 unsigned int rd; 4034 unsigned int new_rd; 4035 4036 rd = atomic_read(&buffer->record_disabled); 4037 do { 4038 new_rd = rd | RB_BUFFER_OFF; 4039 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4040 } 4041 EXPORT_SYMBOL_GPL(ring_buffer_record_off); 4042 4043 /** 4044 * ring_buffer_record_on - restart writes into the buffer 4045 * @buffer: The ring buffer to start writes to. 4046 * 4047 * This enables all writes to the buffer that was disabled by 4048 * ring_buffer_record_off(). 4049 * 4050 * This is different than ring_buffer_record_enable() as 4051 * it works like an on/off switch, where as the enable() version 4052 * must be paired with a disable(). 4053 */ 4054 void ring_buffer_record_on(struct trace_buffer *buffer) 4055 { 4056 unsigned int rd; 4057 unsigned int new_rd; 4058 4059 rd = atomic_read(&buffer->record_disabled); 4060 do { 4061 new_rd = rd & ~RB_BUFFER_OFF; 4062 } while (!atomic_try_cmpxchg(&buffer->record_disabled, &rd, new_rd)); 4063 } 4064 EXPORT_SYMBOL_GPL(ring_buffer_record_on); 4065 4066 /** 4067 * ring_buffer_record_is_on - return true if the ring buffer can write 4068 * @buffer: The ring buffer to see if write is enabled 4069 * 4070 * Returns true if the ring buffer is in a state that it accepts writes. 4071 */ 4072 bool ring_buffer_record_is_on(struct trace_buffer *buffer) 4073 { 4074 return !atomic_read(&buffer->record_disabled); 4075 } 4076 4077 /** 4078 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable 4079 * @buffer: The ring buffer to see if write is set enabled 4080 * 4081 * Returns true if the ring buffer is set writable by ring_buffer_record_on(). 4082 * Note that this does NOT mean it is in a writable state. 4083 * 4084 * It may return true when the ring buffer has been disabled by 4085 * ring_buffer_record_disable(), as that is a temporary disabling of 4086 * the ring buffer. 4087 */ 4088 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer) 4089 { 4090 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF); 4091 } 4092 4093 /** 4094 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer 4095 * @buffer: The ring buffer to stop writes to. 4096 * @cpu: The CPU buffer to stop 4097 * 4098 * This prevents all writes to the buffer. Any attempt to write 4099 * to the buffer after this will fail and return NULL. 4100 * 4101 * The caller should call synchronize_rcu() after this. 4102 */ 4103 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu) 4104 { 4105 struct ring_buffer_per_cpu *cpu_buffer; 4106 4107 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4108 return; 4109 4110 cpu_buffer = buffer->buffers[cpu]; 4111 atomic_inc(&cpu_buffer->record_disabled); 4112 } 4113 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu); 4114 4115 /** 4116 * ring_buffer_record_enable_cpu - enable writes to the buffer 4117 * @buffer: The ring buffer to enable writes 4118 * @cpu: The CPU to enable. 4119 * 4120 * Note, multiple disables will need the same number of enables 4121 * to truly enable the writing (much like preempt_disable). 
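 *
 * A typical pairing with ring_buffer_record_disable_cpu() (an illustrative
 * sketch, mirroring the sequence the reset code further below uses) is:
 *
 *	ring_buffer_record_disable_cpu(buffer, cpu);
 *	synchronize_rcu();
 *	... inspect the now quiescent per CPU buffer ...
 *	ring_buffer_record_enable_cpu(buffer, cpu);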
4122 */ 4123 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu) 4124 { 4125 struct ring_buffer_per_cpu *cpu_buffer; 4126 4127 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4128 return; 4129 4130 cpu_buffer = buffer->buffers[cpu]; 4131 atomic_dec(&cpu_buffer->record_disabled); 4132 } 4133 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu); 4134 4135 /* 4136 * The total entries in the ring buffer is the running counter 4137 * of entries entered into the ring buffer, minus the sum of 4138 * the entries read from the ring buffer and the number of 4139 * entries that were overwritten. 4140 */ 4141 static inline unsigned long 4142 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer) 4143 { 4144 return local_read(&cpu_buffer->entries) - 4145 (local_read(&cpu_buffer->overrun) + cpu_buffer->read); 4146 } 4147 4148 /** 4149 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer 4150 * @buffer: The ring buffer 4151 * @cpu: The per CPU buffer to read from. 4152 */ 4153 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu) 4154 { 4155 unsigned long flags; 4156 struct ring_buffer_per_cpu *cpu_buffer; 4157 struct buffer_page *bpage; 4158 u64 ret = 0; 4159 4160 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4161 return 0; 4162 4163 cpu_buffer = buffer->buffers[cpu]; 4164 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4165 /* 4166 * if the tail is on reader_page, oldest time stamp is on the reader 4167 * page 4168 */ 4169 if (cpu_buffer->tail_page == cpu_buffer->reader_page) 4170 bpage = cpu_buffer->reader_page; 4171 else 4172 bpage = rb_set_head_page(cpu_buffer); 4173 if (bpage) 4174 ret = bpage->page->time_stamp; 4175 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4176 4177 return ret; 4178 } 4179 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts); 4180 4181 /** 4182 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer 4183 * @buffer: The ring buffer 4184 * @cpu: The per CPU buffer to read from. 4185 */ 4186 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu) 4187 { 4188 struct ring_buffer_per_cpu *cpu_buffer; 4189 unsigned long ret; 4190 4191 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4192 return 0; 4193 4194 cpu_buffer = buffer->buffers[cpu]; 4195 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes; 4196 4197 return ret; 4198 } 4199 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu); 4200 4201 /** 4202 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer 4203 * @buffer: The ring buffer 4204 * @cpu: The per CPU buffer to get the entries from. 4205 */ 4206 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu) 4207 { 4208 struct ring_buffer_per_cpu *cpu_buffer; 4209 4210 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4211 return 0; 4212 4213 cpu_buffer = buffer->buffers[cpu]; 4214 4215 return rb_num_of_entries(cpu_buffer); 4216 } 4217 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu); 4218 4219 /** 4220 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring 4221 * buffer wrapping around (only if RB_FL_OVERWRITE is on). 
4222 * @buffer: The ring buffer 4223 * @cpu: The per CPU buffer to get the number of overruns from 4224 */ 4225 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu) 4226 { 4227 struct ring_buffer_per_cpu *cpu_buffer; 4228 unsigned long ret; 4229 4230 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4231 return 0; 4232 4233 cpu_buffer = buffer->buffers[cpu]; 4234 ret = local_read(&cpu_buffer->overrun); 4235 4236 return ret; 4237 } 4238 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu); 4239 4240 /** 4241 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by 4242 * commits failing due to the buffer wrapping around while there are uncommitted 4243 * events, such as during an interrupt storm. 4244 * @buffer: The ring buffer 4245 * @cpu: The per CPU buffer to get the number of overruns from 4246 */ 4247 unsigned long 4248 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu) 4249 { 4250 struct ring_buffer_per_cpu *cpu_buffer; 4251 unsigned long ret; 4252 4253 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4254 return 0; 4255 4256 cpu_buffer = buffer->buffers[cpu]; 4257 ret = local_read(&cpu_buffer->commit_overrun); 4258 4259 return ret; 4260 } 4261 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu); 4262 4263 /** 4264 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by 4265 * the ring buffer filling up (only if RB_FL_OVERWRITE is off). 4266 * @buffer: The ring buffer 4267 * @cpu: The per CPU buffer to get the number of overruns from 4268 */ 4269 unsigned long 4270 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu) 4271 { 4272 struct ring_buffer_per_cpu *cpu_buffer; 4273 unsigned long ret; 4274 4275 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4276 return 0; 4277 4278 cpu_buffer = buffer->buffers[cpu]; 4279 ret = local_read(&cpu_buffer->dropped_events); 4280 4281 return ret; 4282 } 4283 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu); 4284 4285 /** 4286 * ring_buffer_read_events_cpu - get the number of events successfully read 4287 * @buffer: The ring buffer 4288 * @cpu: The per CPU buffer to get the number of events read 4289 */ 4290 unsigned long 4291 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu) 4292 { 4293 struct ring_buffer_per_cpu *cpu_buffer; 4294 4295 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4296 return 0; 4297 4298 cpu_buffer = buffer->buffers[cpu]; 4299 return cpu_buffer->read; 4300 } 4301 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu); 4302 4303 /** 4304 * ring_buffer_entries - get the number of entries in a buffer 4305 * @buffer: The ring buffer 4306 * 4307 * Returns the total number of entries in the ring buffer 4308 * (all CPU entries) 4309 */ 4310 unsigned long ring_buffer_entries(struct trace_buffer *buffer) 4311 { 4312 struct ring_buffer_per_cpu *cpu_buffer; 4313 unsigned long entries = 0; 4314 int cpu; 4315 4316 /* if you care about this being correct, lock the buffer */ 4317 for_each_buffer_cpu(buffer, cpu) { 4318 cpu_buffer = buffer->buffers[cpu]; 4319 entries += rb_num_of_entries(cpu_buffer); 4320 } 4321 4322 return entries; 4323 } 4324 EXPORT_SYMBOL_GPL(ring_buffer_entries); 4325 4326 /** 4327 * ring_buffer_overruns - get the number of overruns in buffer 4328 * @buffer: The ring buffer 4329 * 4330 * Returns the total number of overruns in the ring buffer 4331 * (all CPU entries) 4332 */ 4333 unsigned long ring_buffer_overruns(struct trace_buffer *buffer) 4334 { 4335 struct ring_buffer_per_cpu *cpu_buffer; 4336 unsigned long overruns = 0; 4337 int cpu; 4338 4339 /* 
if you care about this being correct, lock the buffer */ 4340 for_each_buffer_cpu(buffer, cpu) { 4341 cpu_buffer = buffer->buffers[cpu]; 4342 overruns += local_read(&cpu_buffer->overrun); 4343 } 4344 4345 return overruns; 4346 } 4347 EXPORT_SYMBOL_GPL(ring_buffer_overruns); 4348 4349 static void rb_iter_reset(struct ring_buffer_iter *iter) 4350 { 4351 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4352 4353 /* Iterator usage is expected to have record disabled */ 4354 iter->head_page = cpu_buffer->reader_page; 4355 iter->head = cpu_buffer->reader_page->read; 4356 iter->next_event = iter->head; 4357 4358 iter->cache_reader_page = iter->head_page; 4359 iter->cache_read = cpu_buffer->read; 4360 iter->cache_pages_removed = cpu_buffer->pages_removed; 4361 4362 if (iter->head) { 4363 iter->read_stamp = cpu_buffer->read_stamp; 4364 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; 4365 } else { 4366 iter->read_stamp = iter->head_page->page->time_stamp; 4367 iter->page_stamp = iter->read_stamp; 4368 } 4369 } 4370 4371 /** 4372 * ring_buffer_iter_reset - reset an iterator 4373 * @iter: The iterator to reset 4374 * 4375 * Resets the iterator, so that it will start from the beginning 4376 * again. 4377 */ 4378 void ring_buffer_iter_reset(struct ring_buffer_iter *iter) 4379 { 4380 struct ring_buffer_per_cpu *cpu_buffer; 4381 unsigned long flags; 4382 4383 if (!iter) 4384 return; 4385 4386 cpu_buffer = iter->cpu_buffer; 4387 4388 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4389 rb_iter_reset(iter); 4390 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4391 } 4392 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset); 4393 4394 /** 4395 * ring_buffer_iter_empty - check if an iterator has no more to read 4396 * @iter: The iterator to check 4397 */ 4398 int ring_buffer_iter_empty(struct ring_buffer_iter *iter) 4399 { 4400 struct ring_buffer_per_cpu *cpu_buffer; 4401 struct buffer_page *reader; 4402 struct buffer_page *head_page; 4403 struct buffer_page *commit_page; 4404 struct buffer_page *curr_commit_page; 4405 unsigned commit; 4406 u64 curr_commit_ts; 4407 u64 commit_ts; 4408 4409 cpu_buffer = iter->cpu_buffer; 4410 reader = cpu_buffer->reader_page; 4411 head_page = cpu_buffer->head_page; 4412 commit_page = READ_ONCE(cpu_buffer->commit_page); 4413 commit_ts = commit_page->page->time_stamp; 4414 4415 /* 4416 * When the writer goes across pages, it issues a cmpxchg which 4417 * is a mb(), which will synchronize with the rmb here. 
4418 * (see rb_tail_page_update()) 4419 */ 4420 smp_rmb(); 4421 commit = rb_page_commit(commit_page); 4422 /* We want to make sure that the commit page doesn't change */ 4423 smp_rmb(); 4424 4425 /* Make sure commit page didn't change */ 4426 curr_commit_page = READ_ONCE(cpu_buffer->commit_page); 4427 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); 4428 4429 /* If the commit page changed, then there's more data */ 4430 if (curr_commit_page != commit_page || 4431 curr_commit_ts != commit_ts) 4432 return 0; 4433 4434 /* Still racy, as it may return a false positive, but that's OK */ 4435 return ((iter->head_page == commit_page && iter->head >= commit) || 4436 (iter->head_page == reader && commit_page == head_page && 4437 head_page->read == commit && 4438 iter->head == rb_page_size(cpu_buffer->reader_page))); 4439 } 4440 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty); 4441 4442 static void 4443 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, 4444 struct ring_buffer_event *event) 4445 { 4446 u64 delta; 4447 4448 switch (event->type_len) { 4449 case RINGBUF_TYPE_PADDING: 4450 return; 4451 4452 case RINGBUF_TYPE_TIME_EXTEND: 4453 delta = rb_event_time_stamp(event); 4454 cpu_buffer->read_stamp += delta; 4455 return; 4456 4457 case RINGBUF_TYPE_TIME_STAMP: 4458 delta = rb_event_time_stamp(event); 4459 delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); 4460 cpu_buffer->read_stamp = delta; 4461 return; 4462 4463 case RINGBUF_TYPE_DATA: 4464 cpu_buffer->read_stamp += event->time_delta; 4465 return; 4466 4467 default: 4468 RB_WARN_ON(cpu_buffer, 1); 4469 } 4470 } 4471 4472 static void 4473 rb_update_iter_read_stamp(struct ring_buffer_iter *iter, 4474 struct ring_buffer_event *event) 4475 { 4476 u64 delta; 4477 4478 switch (event->type_len) { 4479 case RINGBUF_TYPE_PADDING: 4480 return; 4481 4482 case RINGBUF_TYPE_TIME_EXTEND: 4483 delta = rb_event_time_stamp(event); 4484 iter->read_stamp += delta; 4485 return; 4486 4487 case RINGBUF_TYPE_TIME_STAMP: 4488 delta = rb_event_time_stamp(event); 4489 delta = rb_fix_abs_ts(delta, iter->read_stamp); 4490 iter->read_stamp = delta; 4491 return; 4492 4493 case RINGBUF_TYPE_DATA: 4494 iter->read_stamp += event->time_delta; 4495 return; 4496 4497 default: 4498 RB_WARN_ON(iter->cpu_buffer, 1); 4499 } 4500 } 4501 4502 static struct buffer_page * 4503 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) 4504 { 4505 struct buffer_page *reader = NULL; 4506 unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size); 4507 unsigned long overwrite; 4508 unsigned long flags; 4509 int nr_loops = 0; 4510 bool ret; 4511 4512 local_irq_save(flags); 4513 arch_spin_lock(&cpu_buffer->lock); 4514 4515 again: 4516 /* 4517 * This should normally only loop twice. But because the 4518 * start of the reader inserts an empty page, it causes 4519 * a case where we will loop three times. There should be no 4520 * reason to loop four times (that I know of). 
4521 */ 4522 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) { 4523 reader = NULL; 4524 goto out; 4525 } 4526 4527 reader = cpu_buffer->reader_page; 4528 4529 /* If there's more to read, return this page */ 4530 if (cpu_buffer->reader_page->read < rb_page_size(reader)) 4531 goto out; 4532 4533 /* Never should we have an index greater than the size */ 4534 if (RB_WARN_ON(cpu_buffer, 4535 cpu_buffer->reader_page->read > rb_page_size(reader))) 4536 goto out; 4537 4538 /* check if we caught up to the tail */ 4539 reader = NULL; 4540 if (cpu_buffer->commit_page == cpu_buffer->reader_page) 4541 goto out; 4542 4543 /* Don't bother swapping if the ring buffer is empty */ 4544 if (rb_num_of_entries(cpu_buffer) == 0) 4545 goto out; 4546 4547 /* 4548 * Reset the reader page to size zero. 4549 */ 4550 local_set(&cpu_buffer->reader_page->write, 0); 4551 local_set(&cpu_buffer->reader_page->entries, 0); 4552 local_set(&cpu_buffer->reader_page->page->commit, 0); 4553 cpu_buffer->reader_page->real_end = 0; 4554 4555 spin: 4556 /* 4557 * Splice the empty reader page into the list around the head. 4558 */ 4559 reader = rb_set_head_page(cpu_buffer); 4560 if (!reader) 4561 goto out; 4562 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next); 4563 cpu_buffer->reader_page->list.prev = reader->list.prev; 4564 4565 /* 4566 * cpu_buffer->pages just needs to point to the buffer, it 4567 * has no specific buffer page to point to. Lets move it out 4568 * of our way so we don't accidentally swap it. 4569 */ 4570 cpu_buffer->pages = reader->list.prev; 4571 4572 /* The reader page will be pointing to the new head */ 4573 rb_set_list_to_head(&cpu_buffer->reader_page->list); 4574 4575 /* 4576 * We want to make sure we read the overruns after we set up our 4577 * pointers to the next object. The writer side does a 4578 * cmpxchg to cross pages which acts as the mb on the writer 4579 * side. Note, the reader will constantly fail the swap 4580 * while the writer is updating the pointers, so this 4581 * guarantees that the overwrite recorded here is the one we 4582 * want to compare with the last_overrun. 4583 */ 4584 smp_mb(); 4585 overwrite = local_read(&(cpu_buffer->overrun)); 4586 4587 /* 4588 * Here's the tricky part. 4589 * 4590 * We need to move the pointer past the header page. 4591 * But we can only do that if a writer is not currently 4592 * moving it. The page before the header page has the 4593 * flag bit '1' set if it is pointing to the page we want. 4594 * but if the writer is in the process of moving it 4595 * than it will be '2' or already moved '0'. 4596 */ 4597 4598 ret = rb_head_page_replace(reader, cpu_buffer->reader_page); 4599 4600 /* 4601 * If we did not convert it, then we must try again. 4602 */ 4603 if (!ret) 4604 goto spin; 4605 4606 /* 4607 * Yay! We succeeded in replacing the page. 4608 * 4609 * Now make the new head point back to the reader page. 
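 *
 * (rb_head_page_replace() has already atomically redirected the HEAD
 * flag pointer from the old head page to our reader page; all that is
 * left is to fix the ->prev link of the page that follows the old head
 * and to advance head_page itself.)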
4610 */ 4611 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; 4612 rb_inc_page(&cpu_buffer->head_page); 4613 4614 local_inc(&cpu_buffer->pages_read); 4615 4616 /* Finally update the reader page to the new head */ 4617 cpu_buffer->reader_page = reader; 4618 cpu_buffer->reader_page->read = 0; 4619 4620 if (overwrite != cpu_buffer->last_overrun) { 4621 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun; 4622 cpu_buffer->last_overrun = overwrite; 4623 } 4624 4625 goto again; 4626 4627 out: 4628 /* Update the read_stamp on the first event */ 4629 if (reader && reader->read == 0) 4630 cpu_buffer->read_stamp = reader->page->time_stamp; 4631 4632 arch_spin_unlock(&cpu_buffer->lock); 4633 local_irq_restore(flags); 4634 4635 /* 4636 * The writer has preempt disable, wait for it. But not forever 4637 * Although, 1 second is pretty much "forever" 4638 */ 4639 #define USECS_WAIT 1000000 4640 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { 4641 /* If the write is past the end of page, a writer is still updating it */ 4642 if (likely(!reader || rb_page_write(reader) <= bsize)) 4643 break; 4644 4645 udelay(1); 4646 4647 /* Get the latest version of the reader write value */ 4648 smp_rmb(); 4649 } 4650 4651 /* The writer is not moving forward? Something is wrong */ 4652 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) 4653 reader = NULL; 4654 4655 /* 4656 * Make sure we see any padding after the write update 4657 * (see rb_reset_tail()). 4658 * 4659 * In addition, a writer may be writing on the reader page 4660 * if the page has not been fully filled, so the read barrier 4661 * is also needed to make sure we see the content of what is 4662 * committed by the writer (see rb_set_commit_to_write()). 4663 */ 4664 smp_rmb(); 4665 4666 4667 return reader; 4668 } 4669 4670 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) 4671 { 4672 struct ring_buffer_event *event; 4673 struct buffer_page *reader; 4674 unsigned length; 4675 4676 reader = rb_get_reader_page(cpu_buffer); 4677 4678 /* This function should not be called when buffer is empty */ 4679 if (RB_WARN_ON(cpu_buffer, !reader)) 4680 return; 4681 4682 event = rb_reader_event(cpu_buffer); 4683 4684 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX) 4685 cpu_buffer->read++; 4686 4687 rb_update_read_stamp(cpu_buffer, event); 4688 4689 length = rb_event_length(event); 4690 cpu_buffer->reader_page->read += length; 4691 cpu_buffer->read_bytes += length; 4692 } 4693 4694 static void rb_advance_iter(struct ring_buffer_iter *iter) 4695 { 4696 struct ring_buffer_per_cpu *cpu_buffer; 4697 4698 cpu_buffer = iter->cpu_buffer; 4699 4700 /* If head == next_event then we need to jump to the next event */ 4701 if (iter->head == iter->next_event) { 4702 /* If the event gets overwritten again, there's nothing to do */ 4703 if (rb_iter_head_event(iter) == NULL) 4704 return; 4705 } 4706 4707 iter->head = iter->next_event; 4708 4709 /* 4710 * Check if we are at the end of the buffer. 
4711 */ 4712 if (iter->next_event >= rb_page_size(iter->head_page)) { 4713 /* discarded commits can make the page empty */ 4714 if (iter->head_page == cpu_buffer->commit_page) 4715 return; 4716 rb_inc_iter(iter); 4717 return; 4718 } 4719 4720 rb_update_iter_read_stamp(iter, iter->event); 4721 } 4722 4723 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) 4724 { 4725 return cpu_buffer->lost_events; 4726 } 4727 4728 static struct ring_buffer_event * 4729 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, 4730 unsigned long *lost_events) 4731 { 4732 struct ring_buffer_event *event; 4733 struct buffer_page *reader; 4734 int nr_loops = 0; 4735 4736 if (ts) 4737 *ts = 0; 4738 again: 4739 /* 4740 * We repeat when a time extend is encountered. 4741 * Since the time extend is always attached to a data event, 4742 * we should never loop more than once. 4743 * (We never hit the following condition more than twice). 4744 */ 4745 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2)) 4746 return NULL; 4747 4748 reader = rb_get_reader_page(cpu_buffer); 4749 if (!reader) 4750 return NULL; 4751 4752 event = rb_reader_event(cpu_buffer); 4753 4754 switch (event->type_len) { 4755 case RINGBUF_TYPE_PADDING: 4756 if (rb_null_event(event)) 4757 RB_WARN_ON(cpu_buffer, 1); 4758 /* 4759 * Because the writer could be discarding every 4760 * event it creates (which would probably be bad) 4761 * if we were to go back to "again" then we may never 4762 * catch up, and will trigger the warn on, or lock 4763 * the box. Return the padding, and we will release 4764 * the current locks, and try again. 4765 */ 4766 return event; 4767 4768 case RINGBUF_TYPE_TIME_EXTEND: 4769 /* Internal data, OK to advance */ 4770 rb_advance_reader(cpu_buffer); 4771 goto again; 4772 4773 case RINGBUF_TYPE_TIME_STAMP: 4774 if (ts) { 4775 *ts = rb_event_time_stamp(event); 4776 *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); 4777 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4778 cpu_buffer->cpu, ts); 4779 } 4780 /* Internal data, OK to advance */ 4781 rb_advance_reader(cpu_buffer); 4782 goto again; 4783 4784 case RINGBUF_TYPE_DATA: 4785 if (ts && !(*ts)) { 4786 *ts = cpu_buffer->read_stamp + event->time_delta; 4787 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4788 cpu_buffer->cpu, ts); 4789 } 4790 if (lost_events) 4791 *lost_events = rb_lost_events(cpu_buffer); 4792 return event; 4793 4794 default: 4795 RB_WARN_ON(cpu_buffer, 1); 4796 } 4797 4798 return NULL; 4799 } 4800 EXPORT_SYMBOL_GPL(ring_buffer_peek); 4801 4802 static struct ring_buffer_event * 4803 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4804 { 4805 struct trace_buffer *buffer; 4806 struct ring_buffer_per_cpu *cpu_buffer; 4807 struct ring_buffer_event *event; 4808 int nr_loops = 0; 4809 4810 if (ts) 4811 *ts = 0; 4812 4813 cpu_buffer = iter->cpu_buffer; 4814 buffer = cpu_buffer->buffer; 4815 4816 /* 4817 * Check if someone performed a consuming read to the buffer 4818 * or removed some pages from the buffer. In these cases, 4819 * iterator was invalidated and we need to reset it. 4820 */ 4821 if (unlikely(iter->cache_read != cpu_buffer->read || 4822 iter->cache_reader_page != cpu_buffer->reader_page || 4823 iter->cache_pages_removed != cpu_buffer->pages_removed)) 4824 rb_iter_reset(iter); 4825 4826 again: 4827 if (ring_buffer_iter_empty(iter)) 4828 return NULL; 4829 4830 /* 4831 * As the writer can mess with what the iterator is trying 4832 * to read, just give up if we fail to get an event after 4833 * three tries. 
The iterator is not as reliable when reading 4834 * the ring buffer with an active write as the consumer is. 4835 * Do not warn if the three failures is reached. 4836 */ 4837 if (++nr_loops > 3) 4838 return NULL; 4839 4840 if (rb_per_cpu_empty(cpu_buffer)) 4841 return NULL; 4842 4843 if (iter->head >= rb_page_size(iter->head_page)) { 4844 rb_inc_iter(iter); 4845 goto again; 4846 } 4847 4848 event = rb_iter_head_event(iter); 4849 if (!event) 4850 goto again; 4851 4852 switch (event->type_len) { 4853 case RINGBUF_TYPE_PADDING: 4854 if (rb_null_event(event)) { 4855 rb_inc_iter(iter); 4856 goto again; 4857 } 4858 rb_advance_iter(iter); 4859 return event; 4860 4861 case RINGBUF_TYPE_TIME_EXTEND: 4862 /* Internal data, OK to advance */ 4863 rb_advance_iter(iter); 4864 goto again; 4865 4866 case RINGBUF_TYPE_TIME_STAMP: 4867 if (ts) { 4868 *ts = rb_event_time_stamp(event); 4869 *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); 4870 ring_buffer_normalize_time_stamp(cpu_buffer->buffer, 4871 cpu_buffer->cpu, ts); 4872 } 4873 /* Internal data, OK to advance */ 4874 rb_advance_iter(iter); 4875 goto again; 4876 4877 case RINGBUF_TYPE_DATA: 4878 if (ts && !(*ts)) { 4879 *ts = iter->read_stamp + event->time_delta; 4880 ring_buffer_normalize_time_stamp(buffer, 4881 cpu_buffer->cpu, ts); 4882 } 4883 return event; 4884 4885 default: 4886 RB_WARN_ON(cpu_buffer, 1); 4887 } 4888 4889 return NULL; 4890 } 4891 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek); 4892 4893 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer) 4894 { 4895 if (likely(!in_nmi())) { 4896 raw_spin_lock(&cpu_buffer->reader_lock); 4897 return true; 4898 } 4899 4900 /* 4901 * If an NMI die dumps out the content of the ring buffer 4902 * trylock must be used to prevent a deadlock if the NMI 4903 * preempted a task that holds the ring buffer locks. If 4904 * we get the lock then all is fine, if not, then continue 4905 * to do the read, but this can corrupt the ring buffer, 4906 * so it must be permanently disabled from future writes. 4907 * Reading from NMI is a oneshot deal. 4908 */ 4909 if (raw_spin_trylock(&cpu_buffer->reader_lock)) 4910 return true; 4911 4912 /* Continue without locking, but disable the ring buffer */ 4913 atomic_inc(&cpu_buffer->record_disabled); 4914 return false; 4915 } 4916 4917 static inline void 4918 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked) 4919 { 4920 if (likely(locked)) 4921 raw_spin_unlock(&cpu_buffer->reader_lock); 4922 } 4923 4924 /** 4925 * ring_buffer_peek - peek at the next event to be read 4926 * @buffer: The ring buffer to read 4927 * @cpu: The cpu to peak at 4928 * @ts: The timestamp counter of this event. 4929 * @lost_events: a variable to store if events were lost (may be NULL) 4930 * 4931 * This will return the event that will be read next, but does 4932 * not consume the data. 
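 *
 * For example (an illustrative sketch only):
 *
 *	u64 ts;
 *	unsigned long lost;
 *
 *	event = ring_buffer_peek(buffer, cpu, &ts, &lost);
 *	if (event)
 *		pr_info("next event at %llu, %lu events lost\n", ts, lost);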
4933 */ 4934 struct ring_buffer_event * 4935 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, 4936 unsigned long *lost_events) 4937 { 4938 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 4939 struct ring_buffer_event *event; 4940 unsigned long flags; 4941 bool dolock; 4942 4943 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 4944 return NULL; 4945 4946 again: 4947 local_irq_save(flags); 4948 dolock = rb_reader_lock(cpu_buffer); 4949 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 4950 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4951 rb_advance_reader(cpu_buffer); 4952 rb_reader_unlock(cpu_buffer, dolock); 4953 local_irq_restore(flags); 4954 4955 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4956 goto again; 4957 4958 return event; 4959 } 4960 4961 /** ring_buffer_iter_dropped - report if there are dropped events 4962 * @iter: The ring buffer iterator 4963 * 4964 * Returns true if there was dropped events since the last peek. 4965 */ 4966 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) 4967 { 4968 bool ret = iter->missed_events != 0; 4969 4970 iter->missed_events = 0; 4971 return ret; 4972 } 4973 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); 4974 4975 /** 4976 * ring_buffer_iter_peek - peek at the next event to be read 4977 * @iter: The ring buffer iterator 4978 * @ts: The timestamp counter of this event. 4979 * 4980 * This will return the event that will be read next, but does 4981 * not increment the iterator. 4982 */ 4983 struct ring_buffer_event * 4984 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts) 4985 { 4986 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 4987 struct ring_buffer_event *event; 4988 unsigned long flags; 4989 4990 again: 4991 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 4992 event = rb_iter_peek(iter, ts); 4993 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 4994 4995 if (event && event->type_len == RINGBUF_TYPE_PADDING) 4996 goto again; 4997 4998 return event; 4999 } 5000 5001 /** 5002 * ring_buffer_consume - return an event and consume it 5003 * @buffer: The ring buffer to get the next event from 5004 * @cpu: the cpu to read the buffer from 5005 * @ts: a variable to store the timestamp (may be NULL) 5006 * @lost_events: a variable to store if events were lost (may be NULL) 5007 * 5008 * Returns the next event in the ring buffer, and that event is consumed. 5009 * Meaning, that sequential reads will keep returning a different event, 5010 * and eventually empty the ring buffer if the producer is slower. 
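 *
 * A consuming read loop therefore looks roughly like this (an illustrative
 * sketch; "struct my_entry" and process() stand in for the caller's own
 * payload type and handler):
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, NULL))) {
 *		struct my_entry *entry = ring_buffer_event_data(event);
 *
 *		process(entry, ts);
 *	}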
5011 */ 5012 struct ring_buffer_event * 5013 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts, 5014 unsigned long *lost_events) 5015 { 5016 struct ring_buffer_per_cpu *cpu_buffer; 5017 struct ring_buffer_event *event = NULL; 5018 unsigned long flags; 5019 bool dolock; 5020 5021 again: 5022 /* might be called in atomic */ 5023 preempt_disable(); 5024 5025 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5026 goto out; 5027 5028 cpu_buffer = buffer->buffers[cpu]; 5029 local_irq_save(flags); 5030 dolock = rb_reader_lock(cpu_buffer); 5031 5032 event = rb_buffer_peek(cpu_buffer, ts, lost_events); 5033 if (event) { 5034 cpu_buffer->lost_events = 0; 5035 rb_advance_reader(cpu_buffer); 5036 } 5037 5038 rb_reader_unlock(cpu_buffer, dolock); 5039 local_irq_restore(flags); 5040 5041 out: 5042 preempt_enable(); 5043 5044 if (event && event->type_len == RINGBUF_TYPE_PADDING) 5045 goto again; 5046 5047 return event; 5048 } 5049 EXPORT_SYMBOL_GPL(ring_buffer_consume); 5050 5051 /** 5052 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer 5053 * @buffer: The ring buffer to read from 5054 * @cpu: The cpu buffer to iterate over 5055 * @flags: gfp flags to use for memory allocation 5056 * 5057 * This performs the initial preparations necessary to iterate 5058 * through the buffer. Memory is allocated, buffer resizing 5059 * is disabled, and the iterator pointer is returned to the caller. 5060 * 5061 * After a sequence of ring_buffer_read_prepare calls, the user is 5062 * expected to make at least one call to ring_buffer_read_prepare_sync. 5063 * Afterwards, ring_buffer_read_start is invoked to get things going 5064 * for real. 5065 * 5066 * This overall must be paired with ring_buffer_read_finish. 5067 */ 5068 struct ring_buffer_iter * 5069 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) 5070 { 5071 struct ring_buffer_per_cpu *cpu_buffer; 5072 struct ring_buffer_iter *iter; 5073 5074 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5075 return NULL; 5076 5077 iter = kzalloc(sizeof(*iter), flags); 5078 if (!iter) 5079 return NULL; 5080 5081 /* Holds the entire event: data and meta data */ 5082 iter->event_size = buffer->subbuf_size; 5083 iter->event = kmalloc(iter->event_size, flags); 5084 if (!iter->event) { 5085 kfree(iter); 5086 return NULL; 5087 } 5088 5089 cpu_buffer = buffer->buffers[cpu]; 5090 5091 iter->cpu_buffer = cpu_buffer; 5092 5093 atomic_inc(&cpu_buffer->resize_disabled); 5094 5095 return iter; 5096 } 5097 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare); 5098 5099 /** 5100 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls 5101 * 5102 * All previously invoked ring_buffer_read_prepare calls to prepare 5103 * iterators will be synchronized. Afterwards, read_buffer_read_start 5104 * calls on those iterators are allowed. 5105 */ 5106 void 5107 ring_buffer_read_prepare_sync(void) 5108 { 5109 synchronize_rcu(); 5110 } 5111 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync); 5112 5113 /** 5114 * ring_buffer_read_start - start a non consuming read of the buffer 5115 * @iter: The iterator returned by ring_buffer_read_prepare 5116 * 5117 * This finalizes the startup of an iteration through the buffer. 5118 * The iterator comes from a call to ring_buffer_read_prepare and 5119 * an intervening ring_buffer_read_prepare_sync must have been 5120 * performed. 5121 * 5122 * Must be paired with ring_buffer_read_finish. 
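 *
 * The complete non consuming sequence therefore looks like this (an
 * illustrative sketch only):
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return -ENOMEM;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *	while ((event = ring_buffer_iter_peek(iter, &ts))) {
 *		... examine the event ...
 *		ring_buffer_iter_advance(iter);
 *	}
 *	ring_buffer_read_finish(iter);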
5123 */ 5124 void 5125 ring_buffer_read_start(struct ring_buffer_iter *iter) 5126 { 5127 struct ring_buffer_per_cpu *cpu_buffer; 5128 unsigned long flags; 5129 5130 if (!iter) 5131 return; 5132 5133 cpu_buffer = iter->cpu_buffer; 5134 5135 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5136 arch_spin_lock(&cpu_buffer->lock); 5137 rb_iter_reset(iter); 5138 arch_spin_unlock(&cpu_buffer->lock); 5139 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5140 } 5141 EXPORT_SYMBOL_GPL(ring_buffer_read_start); 5142 5143 /** 5144 * ring_buffer_read_finish - finish reading the iterator of the buffer 5145 * @iter: The iterator retrieved by ring_buffer_start 5146 * 5147 * This re-enables resizing of the buffer, and frees the iterator. 5148 */ 5149 void 5150 ring_buffer_read_finish(struct ring_buffer_iter *iter) 5151 { 5152 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5153 unsigned long flags; 5154 5155 /* Use this opportunity to check the integrity of the ring buffer. */ 5156 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5157 rb_check_pages(cpu_buffer); 5158 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5159 5160 atomic_dec(&cpu_buffer->resize_disabled); 5161 kfree(iter->event); 5162 kfree(iter); 5163 } 5164 EXPORT_SYMBOL_GPL(ring_buffer_read_finish); 5165 5166 /** 5167 * ring_buffer_iter_advance - advance the iterator to the next location 5168 * @iter: The ring buffer iterator 5169 * 5170 * Move the location of the iterator such that the next read will 5171 * be the next location of the iterator. 5172 */ 5173 void ring_buffer_iter_advance(struct ring_buffer_iter *iter) 5174 { 5175 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; 5176 unsigned long flags; 5177 5178 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5179 5180 rb_advance_iter(iter); 5181 5182 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5183 } 5184 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); 5185 5186 /** 5187 * ring_buffer_size - return the size of the ring buffer (in bytes) 5188 * @buffer: The ring buffer. 5189 * @cpu: The CPU to get ring buffer size from. 5190 */ 5191 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu) 5192 { 5193 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5194 return 0; 5195 5196 return buffer->subbuf_size * buffer->buffers[cpu]->nr_pages; 5197 } 5198 EXPORT_SYMBOL_GPL(ring_buffer_size); 5199 5200 /** 5201 * ring_buffer_max_event_size - return the max data size of an event 5202 * @buffer: The ring buffer. 5203 * 5204 * Returns the maximum size an event can be. 
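 *
 * A caller with a variable sized payload can use this to reject oversized
 * requests up front (an illustrative sketch only):
 *
 *	if (len > ring_buffer_max_event_size(buffer))
 *		return -E2BIG;
 *	event = ring_buffer_lock_reserve(buffer, len);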
5205 */ 5206 unsigned long ring_buffer_max_event_size(struct trace_buffer *buffer) 5207 { 5208 /* If abs timestamp is requested, events have a timestamp too */ 5209 if (ring_buffer_time_stamp_abs(buffer)) 5210 return buffer->max_data_size - RB_LEN_TIME_EXTEND; 5211 return buffer->max_data_size; 5212 } 5213 EXPORT_SYMBOL_GPL(ring_buffer_max_event_size); 5214 5215 static void rb_clear_buffer_page(struct buffer_page *page) 5216 { 5217 local_set(&page->write, 0); 5218 local_set(&page->entries, 0); 5219 rb_init_page(page->page); 5220 page->read = 0; 5221 } 5222 5223 static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 5224 { 5225 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 5226 5227 meta->reader.read = cpu_buffer->reader_page->read; 5228 meta->reader.id = cpu_buffer->reader_page->id; 5229 meta->reader.lost_events = cpu_buffer->lost_events; 5230 5231 meta->entries = local_read(&cpu_buffer->entries); 5232 meta->overrun = local_read(&cpu_buffer->overrun); 5233 meta->read = cpu_buffer->read; 5234 5235 /* Some archs do not have data cache coherency between kernel and user-space */ 5236 flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page)); 5237 } 5238 5239 static void 5240 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) 5241 { 5242 struct buffer_page *page; 5243 5244 rb_head_page_deactivate(cpu_buffer); 5245 5246 cpu_buffer->head_page 5247 = list_entry(cpu_buffer->pages, struct buffer_page, list); 5248 rb_clear_buffer_page(cpu_buffer->head_page); 5249 list_for_each_entry(page, cpu_buffer->pages, list) { 5250 rb_clear_buffer_page(page); 5251 } 5252 5253 cpu_buffer->tail_page = cpu_buffer->head_page; 5254 cpu_buffer->commit_page = cpu_buffer->head_page; 5255 5256 INIT_LIST_HEAD(&cpu_buffer->reader_page->list); 5257 INIT_LIST_HEAD(&cpu_buffer->new_pages); 5258 rb_clear_buffer_page(cpu_buffer->reader_page); 5259 5260 local_set(&cpu_buffer->entries_bytes, 0); 5261 local_set(&cpu_buffer->overrun, 0); 5262 local_set(&cpu_buffer->commit_overrun, 0); 5263 local_set(&cpu_buffer->dropped_events, 0); 5264 local_set(&cpu_buffer->entries, 0); 5265 local_set(&cpu_buffer->committing, 0); 5266 local_set(&cpu_buffer->commits, 0); 5267 local_set(&cpu_buffer->pages_touched, 0); 5268 local_set(&cpu_buffer->pages_lost, 0); 5269 local_set(&cpu_buffer->pages_read, 0); 5270 cpu_buffer->last_pages_touch = 0; 5271 cpu_buffer->shortest_full = 0; 5272 cpu_buffer->read = 0; 5273 cpu_buffer->read_bytes = 0; 5274 5275 rb_time_set(&cpu_buffer->write_stamp, 0); 5276 rb_time_set(&cpu_buffer->before_stamp, 0); 5277 5278 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); 5279 5280 cpu_buffer->lost_events = 0; 5281 cpu_buffer->last_overrun = 0; 5282 5283 if (cpu_buffer->mapped) 5284 rb_update_meta_page(cpu_buffer); 5285 5286 rb_head_page_activate(cpu_buffer); 5287 cpu_buffer->pages_removed = 0; 5288 } 5289 5290 /* Must have disabled the cpu buffer then done a synchronize_rcu */ 5291 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) 5292 { 5293 unsigned long flags; 5294 5295 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5296 5297 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) 5298 goto out; 5299 5300 arch_spin_lock(&cpu_buffer->lock); 5301 5302 rb_reset_cpu(cpu_buffer); 5303 5304 arch_spin_unlock(&cpu_buffer->lock); 5305 5306 out: 5307 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 5308 } 5309 5310 /** 5311 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer 5312 * @buffer: The ring buffer to reset a per 
cpu buffer of 5313 * @cpu: The CPU buffer to be reset 5314 */ 5315 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) 5316 { 5317 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5318 5319 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5320 return; 5321 5322 /* prevent another thread from changing buffer sizes */ 5323 mutex_lock(&buffer->mutex); 5324 5325 atomic_inc(&cpu_buffer->resize_disabled); 5326 atomic_inc(&cpu_buffer->record_disabled); 5327 5328 /* Make sure all commits have finished */ 5329 synchronize_rcu(); 5330 5331 reset_disabled_cpu_buffer(cpu_buffer); 5332 5333 atomic_dec(&cpu_buffer->record_disabled); 5334 atomic_dec(&cpu_buffer->resize_disabled); 5335 5336 mutex_unlock(&buffer->mutex); 5337 } 5338 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); 5339 5340 /* Flag to ensure proper resetting of atomic variables */ 5341 #define RESET_BIT (1 << 30) 5342 5343 /** 5344 * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer 5345 * @buffer: The ring buffer to reset a per cpu buffer of 5346 */ 5347 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) 5348 { 5349 struct ring_buffer_per_cpu *cpu_buffer; 5350 int cpu; 5351 5352 /* prevent another thread from changing buffer sizes */ 5353 mutex_lock(&buffer->mutex); 5354 5355 for_each_online_buffer_cpu(buffer, cpu) { 5356 cpu_buffer = buffer->buffers[cpu]; 5357 5358 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled); 5359 atomic_inc(&cpu_buffer->record_disabled); 5360 } 5361 5362 /* Make sure all commits have finished */ 5363 synchronize_rcu(); 5364 5365 for_each_buffer_cpu(buffer, cpu) { 5366 cpu_buffer = buffer->buffers[cpu]; 5367 5368 /* 5369 * If a CPU came online during the synchronize_rcu(), then 5370 * ignore it. 5371 */ 5372 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT)) 5373 continue; 5374 5375 reset_disabled_cpu_buffer(cpu_buffer); 5376 5377 atomic_dec(&cpu_buffer->record_disabled); 5378 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled); 5379 } 5380 5381 mutex_unlock(&buffer->mutex); 5382 } 5383 5384 /** 5385 * ring_buffer_reset - reset a ring buffer 5386 * @buffer: The ring buffer to reset all cpu buffers 5387 */ 5388 void ring_buffer_reset(struct trace_buffer *buffer) 5389 { 5390 struct ring_buffer_per_cpu *cpu_buffer; 5391 int cpu; 5392 5393 /* prevent another thread from changing buffer sizes */ 5394 mutex_lock(&buffer->mutex); 5395 5396 for_each_buffer_cpu(buffer, cpu) { 5397 cpu_buffer = buffer->buffers[cpu]; 5398 5399 atomic_inc(&cpu_buffer->resize_disabled); 5400 atomic_inc(&cpu_buffer->record_disabled); 5401 } 5402 5403 /* Make sure all commits have finished */ 5404 synchronize_rcu(); 5405 5406 for_each_buffer_cpu(buffer, cpu) { 5407 cpu_buffer = buffer->buffers[cpu]; 5408 5409 reset_disabled_cpu_buffer(cpu_buffer); 5410 5411 atomic_dec(&cpu_buffer->record_disabled); 5412 atomic_dec(&cpu_buffer->resize_disabled); 5413 } 5414 5415 mutex_unlock(&buffer->mutex); 5416 } 5417 EXPORT_SYMBOL_GPL(ring_buffer_reset); 5418 5419 /** 5420 * ring_buffer_empty - is the ring buffer empty? 
5421 * @buffer: The ring buffer to test 5422 */ 5423 bool ring_buffer_empty(struct trace_buffer *buffer) 5424 { 5425 struct ring_buffer_per_cpu *cpu_buffer; 5426 unsigned long flags; 5427 bool dolock; 5428 bool ret; 5429 int cpu; 5430 5431 /* yes this is racy, but if you don't like the race, lock the buffer */ 5432 for_each_buffer_cpu(buffer, cpu) { 5433 cpu_buffer = buffer->buffers[cpu]; 5434 local_irq_save(flags); 5435 dolock = rb_reader_lock(cpu_buffer); 5436 ret = rb_per_cpu_empty(cpu_buffer); 5437 rb_reader_unlock(cpu_buffer, dolock); 5438 local_irq_restore(flags); 5439 5440 if (!ret) 5441 return false; 5442 } 5443 5444 return true; 5445 } 5446 EXPORT_SYMBOL_GPL(ring_buffer_empty); 5447 5448 /** 5449 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty? 5450 * @buffer: The ring buffer 5451 * @cpu: The CPU buffer to test 5452 */ 5453 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu) 5454 { 5455 struct ring_buffer_per_cpu *cpu_buffer; 5456 unsigned long flags; 5457 bool dolock; 5458 bool ret; 5459 5460 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5461 return true; 5462 5463 cpu_buffer = buffer->buffers[cpu]; 5464 local_irq_save(flags); 5465 dolock = rb_reader_lock(cpu_buffer); 5466 ret = rb_per_cpu_empty(cpu_buffer); 5467 rb_reader_unlock(cpu_buffer, dolock); 5468 local_irq_restore(flags); 5469 5470 return ret; 5471 } 5472 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu); 5473 5474 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP 5475 /** 5476 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers 5477 * @buffer_a: One buffer to swap with 5478 * @buffer_b: The other buffer to swap with 5479 * @cpu: the CPU of the buffers to swap 5480 * 5481 * This function is useful for tracers that want to take a "snapshot" 5482 * of a CPU buffer and has another back up buffer lying around. 5483 * it is expected that the tracer handles the cpu buffer not being 5484 * used at the moment. 5485 */ 5486 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, 5487 struct trace_buffer *buffer_b, int cpu) 5488 { 5489 struct ring_buffer_per_cpu *cpu_buffer_a; 5490 struct ring_buffer_per_cpu *cpu_buffer_b; 5491 int ret = -EINVAL; 5492 5493 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) || 5494 !cpumask_test_cpu(cpu, buffer_b->cpumask)) 5495 goto out; 5496 5497 cpu_buffer_a = buffer_a->buffers[cpu]; 5498 cpu_buffer_b = buffer_b->buffers[cpu]; 5499 5500 /* It's up to the callers to not try to swap mapped buffers */ 5501 if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) { 5502 ret = -EBUSY; 5503 goto out; 5504 } 5505 5506 /* At least make sure the two buffers are somewhat the same */ 5507 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) 5508 goto out; 5509 5510 if (buffer_a->subbuf_order != buffer_b->subbuf_order) 5511 goto out; 5512 5513 ret = -EAGAIN; 5514 5515 if (atomic_read(&buffer_a->record_disabled)) 5516 goto out; 5517 5518 if (atomic_read(&buffer_b->record_disabled)) 5519 goto out; 5520 5521 if (atomic_read(&cpu_buffer_a->record_disabled)) 5522 goto out; 5523 5524 if (atomic_read(&cpu_buffer_b->record_disabled)) 5525 goto out; 5526 5527 /* 5528 * We can't do a synchronize_rcu here because this 5529 * function can be called in atomic context. 5530 * Normally this will be called from the same CPU as cpu. 5531 * If not it's up to the caller to protect this. 
5532 */ 5533 atomic_inc(&cpu_buffer_a->record_disabled); 5534 atomic_inc(&cpu_buffer_b->record_disabled); 5535 5536 ret = -EBUSY; 5537 if (local_read(&cpu_buffer_a->committing)) 5538 goto out_dec; 5539 if (local_read(&cpu_buffer_b->committing)) 5540 goto out_dec; 5541 5542 /* 5543 * When resize is in progress, we cannot swap it because 5544 * it will mess the state of the cpu buffer. 5545 */ 5546 if (atomic_read(&buffer_a->resizing)) 5547 goto out_dec; 5548 if (atomic_read(&buffer_b->resizing)) 5549 goto out_dec; 5550 5551 buffer_a->buffers[cpu] = cpu_buffer_b; 5552 buffer_b->buffers[cpu] = cpu_buffer_a; 5553 5554 cpu_buffer_b->buffer = buffer_a; 5555 cpu_buffer_a->buffer = buffer_b; 5556 5557 ret = 0; 5558 5559 out_dec: 5560 atomic_dec(&cpu_buffer_a->record_disabled); 5561 atomic_dec(&cpu_buffer_b->record_disabled); 5562 out: 5563 return ret; 5564 } 5565 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu); 5566 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */ 5567 5568 /** 5569 * ring_buffer_alloc_read_page - allocate a page to read from buffer 5570 * @buffer: the buffer to allocate for. 5571 * @cpu: the cpu buffer to allocate. 5572 * 5573 * This function is used in conjunction with ring_buffer_read_page. 5574 * When reading a full page from the ring buffer, these functions 5575 * can be used to speed up the process. The calling function should 5576 * allocate a few pages first with this function. Then when it 5577 * needs to get pages from the ring buffer, it passes the result 5578 * of this function into ring_buffer_read_page, which will swap 5579 * the page that was allocated, with the read page of the buffer. 5580 * 5581 * Returns: 5582 * The page allocated, or ERR_PTR 5583 */ 5584 struct buffer_data_read_page * 5585 ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu) 5586 { 5587 struct ring_buffer_per_cpu *cpu_buffer; 5588 struct buffer_data_read_page *bpage = NULL; 5589 unsigned long flags; 5590 struct page *page; 5591 5592 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5593 return ERR_PTR(-ENODEV); 5594 5595 bpage = kzalloc(sizeof(*bpage), GFP_KERNEL); 5596 if (!bpage) 5597 return ERR_PTR(-ENOMEM); 5598 5599 bpage->order = buffer->subbuf_order; 5600 cpu_buffer = buffer->buffers[cpu]; 5601 local_irq_save(flags); 5602 arch_spin_lock(&cpu_buffer->lock); 5603 5604 if (cpu_buffer->free_page) { 5605 bpage->data = cpu_buffer->free_page; 5606 cpu_buffer->free_page = NULL; 5607 } 5608 5609 arch_spin_unlock(&cpu_buffer->lock); 5610 local_irq_restore(flags); 5611 5612 if (bpage->data) 5613 goto out; 5614 5615 page = alloc_pages_node(cpu_to_node(cpu), 5616 GFP_KERNEL | __GFP_NORETRY | __GFP_COMP | __GFP_ZERO, 5617 cpu_buffer->buffer->subbuf_order); 5618 if (!page) { 5619 kfree(bpage); 5620 return ERR_PTR(-ENOMEM); 5621 } 5622 5623 bpage->data = page_address(page); 5624 5625 out: 5626 rb_init_page(bpage->data); 5627 5628 return bpage; 5629 } 5630 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page); 5631 5632 /** 5633 * ring_buffer_free_read_page - free an allocated read page 5634 * @buffer: the buffer the page was allocate for 5635 * @cpu: the cpu buffer the page came from 5636 * @data_page: the page to free 5637 * 5638 * Free a page allocated from ring_buffer_alloc_read_page. 
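 *
 * Note that a page handed back here is not necessarily returned to the
 * page allocator: each per CPU buffer keeps one spare page
 * (cpu_buffer->free_page), so a later ring_buffer_alloc_read_page() on
 * the same CPU can reuse it without allocating.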
5639 */ 5640 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, 5641 struct buffer_data_read_page *data_page) 5642 { 5643 struct ring_buffer_per_cpu *cpu_buffer; 5644 struct buffer_data_page *bpage = data_page->data; 5645 struct page *page = virt_to_page(bpage); 5646 unsigned long flags; 5647 5648 if (!buffer || !buffer->buffers || !buffer->buffers[cpu]) 5649 return; 5650 5651 cpu_buffer = buffer->buffers[cpu]; 5652 5653 /* 5654 * If the page is still in use someplace else, or order of the page 5655 * is different from the subbuffer order of the buffer - 5656 * we can't reuse it 5657 */ 5658 if (page_ref_count(page) > 1 || data_page->order != buffer->subbuf_order) 5659 goto out; 5660 5661 local_irq_save(flags); 5662 arch_spin_lock(&cpu_buffer->lock); 5663 5664 if (!cpu_buffer->free_page) { 5665 cpu_buffer->free_page = bpage; 5666 bpage = NULL; 5667 } 5668 5669 arch_spin_unlock(&cpu_buffer->lock); 5670 local_irq_restore(flags); 5671 5672 out: 5673 free_pages((unsigned long)bpage, data_page->order); 5674 kfree(data_page); 5675 } 5676 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page); 5677 5678 /** 5679 * ring_buffer_read_page - extract a page from the ring buffer 5680 * @buffer: buffer to extract from 5681 * @data_page: the page to use allocated from ring_buffer_alloc_read_page 5682 * @len: amount to extract 5683 * @cpu: the cpu of the buffer to extract 5684 * @full: should the extraction only happen when the page is full. 5685 * 5686 * This function will pull out a page from the ring buffer and consume it. 5687 * @data_page must be the address of the variable that was returned 5688 * from ring_buffer_alloc_read_page. This is because the page might be used 5689 * to swap with a page in the ring buffer. 5690 * 5691 * for example: 5692 * rpage = ring_buffer_alloc_read_page(buffer, cpu); 5693 * if (IS_ERR(rpage)) 5694 * return PTR_ERR(rpage); 5695 * ret = ring_buffer_read_page(buffer, rpage, len, cpu, 0); 5696 * if (ret >= 0) 5697 * process_page(ring_buffer_read_page_data(rpage), ret); 5698 * ring_buffer_free_read_page(buffer, cpu, rpage); 5699 * 5700 * When @full is set, the function will not return true unless 5701 * the writer is off the reader page. 5702 * 5703 * Note: it is up to the calling functions to handle sleeps and wakeups. 5704 * The ring buffer can be used anywhere in the kernel and can not 5705 * blindly call wake_up. The layer that uses the ring buffer must be 5706 * responsible for that. 5707 * 5708 * Returns: 5709 * >=0 if data has been transferred, returns the offset of consumed data. 5710 * <0 if no data has been transferred. 5711 */ 5712 int ring_buffer_read_page(struct trace_buffer *buffer, 5713 struct buffer_data_read_page *data_page, 5714 size_t len, int cpu, int full) 5715 { 5716 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; 5717 struct ring_buffer_event *event; 5718 struct buffer_data_page *bpage; 5719 struct buffer_page *reader; 5720 unsigned long missed_events; 5721 unsigned long flags; 5722 unsigned int commit; 5723 unsigned int read; 5724 u64 save_timestamp; 5725 int ret = -1; 5726 5727 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5728 goto out; 5729 5730 /* 5731 * If len is not big enough to hold the page header, then 5732 * we can not copy anything. 
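 * (BUF_PAGE_HDR_SIZE is the size of the buffer_data_page header, i.e. the
 * time_stamp and commit fields that sit in front of the data array.)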
5733 */ 5734 if (len <= BUF_PAGE_HDR_SIZE) 5735 goto out; 5736 5737 len -= BUF_PAGE_HDR_SIZE; 5738 5739 if (!data_page || !data_page->data) 5740 goto out; 5741 if (data_page->order != buffer->subbuf_order) 5742 goto out; 5743 5744 bpage = data_page->data; 5745 if (!bpage) 5746 goto out; 5747 5748 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 5749 5750 reader = rb_get_reader_page(cpu_buffer); 5751 if (!reader) 5752 goto out_unlock; 5753 5754 event = rb_reader_event(cpu_buffer); 5755 5756 read = reader->read; 5757 commit = rb_page_size(reader); 5758 5759 /* Check if any events were dropped */ 5760 missed_events = cpu_buffer->lost_events; 5761 5762 /* 5763 * If this page has been partially read or 5764 * if len is not big enough to read the rest of the page or 5765 * a writer is still on the page, then 5766 * we must copy the data from the page to the buffer. 5767 * Otherwise, we can simply swap the page with the one passed in. 5768 */ 5769 if (read || (len < (commit - read)) || 5770 cpu_buffer->reader_page == cpu_buffer->commit_page || 5771 cpu_buffer->mapped) { 5772 struct buffer_data_page *rpage = cpu_buffer->reader_page->page; 5773 unsigned int rpos = read; 5774 unsigned int pos = 0; 5775 unsigned int size; 5776 5777 /* 5778 * If a full page is expected, this can still be returned 5779 * if there's been a previous partial read and the 5780 * rest of the page can be read and the commit page is off 5781 * the reader page. 5782 */ 5783 if (full && 5784 (!read || (len < (commit - read)) || 5785 cpu_buffer->reader_page == cpu_buffer->commit_page)) 5786 goto out_unlock; 5787 5788 if (len > (commit - read)) 5789 len = (commit - read); 5790 5791 /* Always keep the time extend and data together */ 5792 size = rb_event_ts_length(event); 5793 5794 if (len < size) 5795 goto out_unlock; 5796 5797 /* save the current timestamp, since the user will need it */ 5798 save_timestamp = cpu_buffer->read_stamp; 5799 5800 /* Need to copy one event at a time */ 5801 do { 5802 /* We need the size of one event, because 5803 * rb_advance_reader only advances by one event, 5804 * whereas rb_event_ts_length may include the size of 5805 * one or two events. 5806 * We have already ensured there's enough space if this 5807 * is a time extend. */ 5808 size = rb_event_length(event); 5809 memcpy(bpage->data + pos, rpage->data + rpos, size); 5810 5811 len -= size; 5812 5813 rb_advance_reader(cpu_buffer); 5814 rpos = reader->read; 5815 pos += size; 5816 5817 if (rpos >= commit) 5818 break; 5819 5820 event = rb_reader_event(cpu_buffer); 5821 /* Always keep the time extend and data together */ 5822 size = rb_event_ts_length(event); 5823 } while (len >= size); 5824 5825 /* update bpage */ 5826 local_set(&bpage->commit, pos); 5827 bpage->time_stamp = save_timestamp; 5828 5829 /* we copied everything to the beginning */ 5830 read = 0; 5831 } else { 5832 /* update the entry counter */ 5833 cpu_buffer->read += rb_page_entries(reader); 5834 cpu_buffer->read_bytes += rb_page_size(reader); 5835 5836 /* swap the pages */ 5837 rb_init_page(bpage); 5838 bpage = reader->page; 5839 reader->page = data_page->data; 5840 local_set(&reader->write, 0); 5841 local_set(&reader->entries, 0); 5842 reader->read = 0; 5843 data_page->data = bpage; 5844 5845 /* 5846 * Use the real_end for the data size, 5847 * This gives us a chance to store the lost events 5848 * on the page. 
 */
		if (reader->real_end)
			local_set(&bpage->commit, reader->real_end);
	}
	ret = read;

	cpu_buffer->lost_events = 0;

	commit = local_read(&bpage->commit);
	/*
	 * Set a flag in the commit field if we lost events
	 */
	if (missed_events) {
		/* If there is room at the end of the page to save the
		 * missed events, then record it there.
		 */
		if (buffer->subbuf_size - commit >= sizeof(missed_events)) {
			memcpy(&bpage->data[commit], &missed_events,
			       sizeof(missed_events));
			local_add(RB_MISSED_STORED, &bpage->commit);
			commit += sizeof(missed_events);
		}
		local_add(RB_MISSED_EVENTS, &bpage->commit);
	}

	/*
	 * This page may be off to user land. Zero it out here.
	 */
	if (commit < buffer->subbuf_size)
		memset(&bpage->data[commit], 0, buffer->subbuf_size - commit);

 out_unlock:
	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);

 out:
	return ret;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page);

/**
 * ring_buffer_read_page_data - get pointer to the data in the page.
 * @page: the page to get the data from
 *
 * Returns pointer to the actual data in this page.
 */
void *ring_buffer_read_page_data(struct buffer_data_read_page *page)
{
	return page->data;
}
EXPORT_SYMBOL_GPL(ring_buffer_read_page_data);

/**
 * ring_buffer_subbuf_size_get - get size of the sub buffer.
 * @buffer: the buffer to get the sub buffer size from
 *
 * Returns size of the sub buffer, in bytes.
 */
int ring_buffer_subbuf_size_get(struct trace_buffer *buffer)
{
	return buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_size_get);

/**
 * ring_buffer_subbuf_order_get - get the order of system pages in one sub buffer page.
 * @buffer: The ring_buffer to get the sub buffer page order from
 *
 * By default, one ring buffer sub page equals one system page. This parameter
 * is configurable, per ring buffer. The size of the ring buffer sub page can be
 * extended, but must be a power-of-two number of system pages.
 *
 * Returns the order of the sub buffer size, in system pages:
 * 0 means the sub buffer size is 1 system page and so forth.
 * In case of an error < 0 is returned.
 */
int ring_buffer_subbuf_order_get(struct trace_buffer *buffer)
{
	if (!buffer)
		return -EINVAL;

	return buffer->subbuf_order;
}
EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_get);

/**
 * ring_buffer_subbuf_order_set - set the size of ring buffer sub page.
 * @buffer: The ring_buffer to set the new page size.
 * @order: Order of the system pages in one sub buffer page
 *
 * By default, one ring buffer page equals one system page. This API can be
 * used to set a new size of the ring buffer page. The size must be a
 * power-of-two number of system pages, which is why the input parameter
 * @order is the order of the system pages that are allocated for one
 * ring buffer page:
 *  0 - 1 system page
 *  1 - 2 system pages
 *  2 - 4 system pages
 *  ...
 *
 * Returns 0 on success or < 0 in case of an error.
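 *
 * For example, a sketch of temporarily switching a buffer to 8K sub
 * buffers on a system with 4K pages (the err variable is illustrative
 * only):
 *
 *	int old_order = ring_buffer_subbuf_order_get(buffer);
 *
 *	err = ring_buffer_subbuf_order_set(buffer, 1);
 *	if (err)
 *		return err;
 *	...
 *	ring_buffer_subbuf_order_set(buffer, old_order);
 *
 * Note that changing the order reallocates the sub buffers, so any trace
 * data in the buffer is lost, and the call fails with -EBUSY while a CPU
 * buffer is mapped to user-space.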
5948 */ 5949 int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) 5950 { 5951 struct ring_buffer_per_cpu *cpu_buffer; 5952 struct buffer_page *bpage, *tmp; 5953 int old_order, old_size; 5954 int nr_pages; 5955 int psize; 5956 int err; 5957 int cpu; 5958 5959 if (!buffer || order < 0) 5960 return -EINVAL; 5961 5962 if (buffer->subbuf_order == order) 5963 return 0; 5964 5965 psize = (1 << order) * PAGE_SIZE; 5966 if (psize <= BUF_PAGE_HDR_SIZE) 5967 return -EINVAL; 5968 5969 /* Size of a subbuf cannot be greater than the write counter */ 5970 if (psize > RB_WRITE_MASK + 1) 5971 return -EINVAL; 5972 5973 old_order = buffer->subbuf_order; 5974 old_size = buffer->subbuf_size; 5975 5976 /* prevent another thread from changing buffer sizes */ 5977 mutex_lock(&buffer->mutex); 5978 atomic_inc(&buffer->record_disabled); 5979 5980 /* Make sure all commits have finished */ 5981 synchronize_rcu(); 5982 5983 buffer->subbuf_order = order; 5984 buffer->subbuf_size = psize - BUF_PAGE_HDR_SIZE; 5985 5986 /* Make sure all new buffers are allocated, before deleting the old ones */ 5987 for_each_buffer_cpu(buffer, cpu) { 5988 5989 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 5990 continue; 5991 5992 cpu_buffer = buffer->buffers[cpu]; 5993 5994 if (cpu_buffer->mapped) { 5995 err = -EBUSY; 5996 goto error; 5997 } 5998 5999 /* Update the number of pages to match the new size */ 6000 nr_pages = old_size * buffer->buffers[cpu]->nr_pages; 6001 nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); 6002 6003 /* we need a minimum of two pages */ 6004 if (nr_pages < 2) 6005 nr_pages = 2; 6006 6007 cpu_buffer->nr_pages_to_update = nr_pages; 6008 6009 /* Include the reader page */ 6010 nr_pages++; 6011 6012 /* Allocate the new size buffer */ 6013 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6014 if (__rb_allocate_pages(cpu_buffer, nr_pages, 6015 &cpu_buffer->new_pages)) { 6016 /* not enough memory for new pages */ 6017 err = -ENOMEM; 6018 goto error; 6019 } 6020 } 6021 6022 for_each_buffer_cpu(buffer, cpu) { 6023 6024 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6025 continue; 6026 6027 cpu_buffer = buffer->buffers[cpu]; 6028 6029 /* Clear the head bit to make the link list normal to read */ 6030 rb_head_page_deactivate(cpu_buffer); 6031 6032 /* Now walk the list and free all the old sub buffers */ 6033 list_for_each_entry_safe(bpage, tmp, cpu_buffer->pages, list) { 6034 list_del_init(&bpage->list); 6035 free_buffer_page(bpage); 6036 } 6037 /* The above loop stopped an the last page needing to be freed */ 6038 bpage = list_entry(cpu_buffer->pages, struct buffer_page, list); 6039 free_buffer_page(bpage); 6040 6041 /* Free the current reader page */ 6042 free_buffer_page(cpu_buffer->reader_page); 6043 6044 /* One page was allocated for the reader page */ 6045 cpu_buffer->reader_page = list_entry(cpu_buffer->new_pages.next, 6046 struct buffer_page, list); 6047 list_del_init(&cpu_buffer->reader_page->list); 6048 6049 /* The cpu_buffer pages are a link list with no head */ 6050 cpu_buffer->pages = cpu_buffer->new_pages.next; 6051 cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev; 6052 cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next; 6053 6054 /* Clear the new_pages list */ 6055 INIT_LIST_HEAD(&cpu_buffer->new_pages); 6056 6057 cpu_buffer->head_page 6058 = list_entry(cpu_buffer->pages, struct buffer_page, list); 6059 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page; 6060 6061 cpu_buffer->nr_pages = cpu_buffer->nr_pages_to_update; 6062 
cpu_buffer->nr_pages_to_update = 0; 6063 6064 free_pages((unsigned long)cpu_buffer->free_page, old_order); 6065 cpu_buffer->free_page = NULL; 6066 6067 rb_head_page_activate(cpu_buffer); 6068 6069 rb_check_pages(cpu_buffer); 6070 } 6071 6072 atomic_dec(&buffer->record_disabled); 6073 mutex_unlock(&buffer->mutex); 6074 6075 return 0; 6076 6077 error: 6078 buffer->subbuf_order = old_order; 6079 buffer->subbuf_size = old_size; 6080 6081 atomic_dec(&buffer->record_disabled); 6082 mutex_unlock(&buffer->mutex); 6083 6084 for_each_buffer_cpu(buffer, cpu) { 6085 cpu_buffer = buffer->buffers[cpu]; 6086 6087 if (!cpu_buffer->nr_pages_to_update) 6088 continue; 6089 6090 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages, list) { 6091 list_del_init(&bpage->list); 6092 free_buffer_page(bpage); 6093 } 6094 } 6095 6096 return err; 6097 } 6098 EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); 6099 6100 static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6101 { 6102 struct page *page; 6103 6104 if (cpu_buffer->meta_page) 6105 return 0; 6106 6107 page = alloc_page(GFP_USER | __GFP_ZERO); 6108 if (!page) 6109 return -ENOMEM; 6110 6111 cpu_buffer->meta_page = page_to_virt(page); 6112 6113 return 0; 6114 } 6115 6116 static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) 6117 { 6118 unsigned long addr = (unsigned long)cpu_buffer->meta_page; 6119 6120 free_page(addr); 6121 cpu_buffer->meta_page = NULL; 6122 } 6123 6124 static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, 6125 unsigned long *subbuf_ids) 6126 { 6127 struct trace_buffer_meta *meta = cpu_buffer->meta_page; 6128 unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; 6129 struct buffer_page *first_subbuf, *subbuf; 6130 int id = 0; 6131 6132 subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; 6133 cpu_buffer->reader_page->id = id++; 6134 6135 first_subbuf = subbuf = rb_set_head_page(cpu_buffer); 6136 do { 6137 if (WARN_ON(id >= nr_subbufs)) 6138 break; 6139 6140 subbuf_ids[id] = (unsigned long)subbuf->page; 6141 subbuf->id = id; 6142 6143 rb_inc_page(&subbuf); 6144 id++; 6145 } while (subbuf != first_subbuf); 6146 6147 /* install subbuf ID to kern VA translation */ 6148 cpu_buffer->subbuf_ids = subbuf_ids; 6149 6150 meta->meta_page_size = PAGE_SIZE; 6151 meta->meta_struct_len = sizeof(*meta); 6152 meta->nr_subbufs = nr_subbufs; 6153 meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; 6154 6155 rb_update_meta_page(cpu_buffer); 6156 } 6157 6158 static struct ring_buffer_per_cpu * 6159 rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) 6160 { 6161 struct ring_buffer_per_cpu *cpu_buffer; 6162 6163 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6164 return ERR_PTR(-EINVAL); 6165 6166 cpu_buffer = buffer->buffers[cpu]; 6167 6168 mutex_lock(&cpu_buffer->mapping_lock); 6169 6170 if (!cpu_buffer->mapped) { 6171 mutex_unlock(&cpu_buffer->mapping_lock); 6172 return ERR_PTR(-ENODEV); 6173 } 6174 6175 return cpu_buffer; 6176 } 6177 6178 static void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) 6179 { 6180 mutex_unlock(&cpu_buffer->mapping_lock); 6181 } 6182 6183 /* 6184 * Fast-path for rb_buffer_(un)map(). Called whenever the meta-page doesn't need 6185 * to be set-up or torn-down. 
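 *
 * The 'mapped' count acts as a reference count: mapping an already mapped
 * CPU buffer only installs the pages into the new VMA and bumps the count,
 * and ring_buffer_unmap() drops the count until the last unmap tears the
 * meta-page down.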
6186 */ 6187 static int __rb_inc_dec_mapped(struct ring_buffer_per_cpu *cpu_buffer, 6188 bool inc) 6189 { 6190 unsigned long flags; 6191 6192 lockdep_assert_held(&cpu_buffer->mapping_lock); 6193 6194 if (inc && cpu_buffer->mapped == UINT_MAX) 6195 return -EBUSY; 6196 6197 if (WARN_ON(!inc && cpu_buffer->mapped == 0)) 6198 return -EINVAL; 6199 6200 mutex_lock(&cpu_buffer->buffer->mutex); 6201 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6202 6203 if (inc) 6204 cpu_buffer->mapped++; 6205 else 6206 cpu_buffer->mapped--; 6207 6208 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6209 mutex_unlock(&cpu_buffer->buffer->mutex); 6210 6211 return 0; 6212 } 6213 6214 /* 6215 * +--------------+ pgoff == 0 6216 * | meta page | 6217 * +--------------+ pgoff == 1 6218 * | subbuffer 0 | 6219 * | | 6220 * +--------------+ pgoff == (1 + (1 << subbuf_order)) 6221 * | subbuffer 1 | 6222 * | | 6223 * ... 6224 */ 6225 #ifdef CONFIG_MMU 6226 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6227 struct vm_area_struct *vma) 6228 { 6229 unsigned long nr_subbufs, nr_pages, vma_pages, pgoff = vma->vm_pgoff; 6230 unsigned int subbuf_pages, subbuf_order; 6231 struct page **pages; 6232 int p = 0, s = 0; 6233 int err; 6234 6235 /* Refuse MP_PRIVATE or writable mappings */ 6236 if (vma->vm_flags & VM_WRITE || vma->vm_flags & VM_EXEC || 6237 !(vma->vm_flags & VM_MAYSHARE)) 6238 return -EPERM; 6239 6240 /* 6241 * Make sure the mapping cannot become writable later. Also tell the VM 6242 * to not touch these pages (VM_DONTCOPY | VM_DONTEXPAND). 6243 */ 6244 vm_flags_mod(vma, VM_DONTCOPY | VM_DONTEXPAND | VM_DONTDUMP, 6245 VM_MAYWRITE); 6246 6247 lockdep_assert_held(&cpu_buffer->mapping_lock); 6248 6249 subbuf_order = cpu_buffer->buffer->subbuf_order; 6250 subbuf_pages = 1 << subbuf_order; 6251 6252 nr_subbufs = cpu_buffer->nr_pages + 1; /* + reader-subbuf */ 6253 nr_pages = ((nr_subbufs) << subbuf_order) - pgoff + 1; /* + meta-page */ 6254 6255 vma_pages = (vma->vm_end - vma->vm_start) >> PAGE_SHIFT; 6256 if (!vma_pages || vma_pages > nr_pages) 6257 return -EINVAL; 6258 6259 nr_pages = vma_pages; 6260 6261 pages = kcalloc(nr_pages, sizeof(*pages), GFP_KERNEL); 6262 if (!pages) 6263 return -ENOMEM; 6264 6265 if (!pgoff) { 6266 pages[p++] = virt_to_page(cpu_buffer->meta_page); 6267 6268 /* 6269 * TODO: Align sub-buffers on their size, once 6270 * vm_insert_pages() supports the zero-page. 
6271 */ 6272 } else { 6273 /* Skip the meta-page */ 6274 pgoff--; 6275 6276 if (pgoff % subbuf_pages) { 6277 err = -EINVAL; 6278 goto out; 6279 } 6280 6281 s += pgoff / subbuf_pages; 6282 } 6283 6284 while (p < nr_pages) { 6285 struct page *page = virt_to_page((void *)cpu_buffer->subbuf_ids[s]); 6286 int off = 0; 6287 6288 if (WARN_ON_ONCE(s >= nr_subbufs)) { 6289 err = -EINVAL; 6290 goto out; 6291 } 6292 6293 for (; off < (1 << (subbuf_order)); off++, page++) { 6294 if (p >= nr_pages) 6295 break; 6296 6297 pages[p++] = page; 6298 } 6299 s++; 6300 } 6301 6302 err = vm_insert_pages(vma, vma->vm_start, pages, &nr_pages); 6303 6304 out: 6305 kfree(pages); 6306 6307 return err; 6308 } 6309 #else 6310 static int __rb_map_vma(struct ring_buffer_per_cpu *cpu_buffer, 6311 struct vm_area_struct *vma) 6312 { 6313 return -EOPNOTSUPP; 6314 } 6315 #endif 6316 6317 int ring_buffer_map(struct trace_buffer *buffer, int cpu, 6318 struct vm_area_struct *vma) 6319 { 6320 struct ring_buffer_per_cpu *cpu_buffer; 6321 unsigned long flags, *subbuf_ids; 6322 int err = 0; 6323 6324 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6325 return -EINVAL; 6326 6327 cpu_buffer = buffer->buffers[cpu]; 6328 6329 mutex_lock(&cpu_buffer->mapping_lock); 6330 6331 if (cpu_buffer->mapped) { 6332 err = __rb_map_vma(cpu_buffer, vma); 6333 if (!err) 6334 err = __rb_inc_dec_mapped(cpu_buffer, true); 6335 mutex_unlock(&cpu_buffer->mapping_lock); 6336 return err; 6337 } 6338 6339 /* prevent another thread from changing buffer/sub-buffer sizes */ 6340 mutex_lock(&buffer->mutex); 6341 6342 err = rb_alloc_meta_page(cpu_buffer); 6343 if (err) 6344 goto unlock; 6345 6346 /* subbuf_ids include the reader while nr_pages does not */ 6347 subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL); 6348 if (!subbuf_ids) { 6349 rb_free_meta_page(cpu_buffer); 6350 err = -ENOMEM; 6351 goto unlock; 6352 } 6353 6354 atomic_inc(&cpu_buffer->resize_disabled); 6355 6356 /* 6357 * Lock all readers to block any subbuf swap until the subbuf IDs are 6358 * assigned. 
6359 */ 6360 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6361 rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); 6362 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6363 6364 err = __rb_map_vma(cpu_buffer, vma); 6365 if (!err) { 6366 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6367 cpu_buffer->mapped = 1; 6368 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6369 } else { 6370 kfree(cpu_buffer->subbuf_ids); 6371 cpu_buffer->subbuf_ids = NULL; 6372 rb_free_meta_page(cpu_buffer); 6373 } 6374 6375 unlock: 6376 mutex_unlock(&buffer->mutex); 6377 mutex_unlock(&cpu_buffer->mapping_lock); 6378 6379 return err; 6380 } 6381 6382 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) 6383 { 6384 struct ring_buffer_per_cpu *cpu_buffer; 6385 unsigned long flags; 6386 int err = 0; 6387 6388 if (!cpumask_test_cpu(cpu, buffer->cpumask)) 6389 return -EINVAL; 6390 6391 cpu_buffer = buffer->buffers[cpu]; 6392 6393 mutex_lock(&cpu_buffer->mapping_lock); 6394 6395 if (!cpu_buffer->mapped) { 6396 err = -ENODEV; 6397 goto out; 6398 } else if (cpu_buffer->mapped > 1) { 6399 __rb_inc_dec_mapped(cpu_buffer, false); 6400 goto out; 6401 } 6402 6403 mutex_lock(&buffer->mutex); 6404 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6405 6406 cpu_buffer->mapped = 0; 6407 6408 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6409 6410 kfree(cpu_buffer->subbuf_ids); 6411 cpu_buffer->subbuf_ids = NULL; 6412 rb_free_meta_page(cpu_buffer); 6413 atomic_dec(&cpu_buffer->resize_disabled); 6414 6415 mutex_unlock(&buffer->mutex); 6416 6417 out: 6418 mutex_unlock(&cpu_buffer->mapping_lock); 6419 6420 return err; 6421 } 6422 6423 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) 6424 { 6425 struct ring_buffer_per_cpu *cpu_buffer; 6426 struct buffer_page *reader; 6427 unsigned long missed_events; 6428 unsigned long reader_size; 6429 unsigned long flags; 6430 6431 cpu_buffer = rb_get_mapped_buffer(buffer, cpu); 6432 if (IS_ERR(cpu_buffer)) 6433 return (int)PTR_ERR(cpu_buffer); 6434 6435 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); 6436 6437 consume: 6438 if (rb_per_cpu_empty(cpu_buffer)) 6439 goto out; 6440 6441 reader_size = rb_page_size(cpu_buffer->reader_page); 6442 6443 /* 6444 * There are data to be read on the current reader page, we can 6445 * return to the caller. But before that, we assume the latter will read 6446 * everything. Let's update the kernel reader accordingly. 6447 */ 6448 if (cpu_buffer->reader_page->read < reader_size) { 6449 while (cpu_buffer->reader_page->read < reader_size) 6450 rb_advance_reader(cpu_buffer); 6451 goto out; 6452 } 6453 6454 reader = rb_get_reader_page(cpu_buffer); 6455 if (WARN_ON(!reader)) 6456 goto out; 6457 6458 /* Check if any events were dropped */ 6459 missed_events = cpu_buffer->lost_events; 6460 6461 if (cpu_buffer->reader_page != cpu_buffer->commit_page) { 6462 if (missed_events) { 6463 struct buffer_data_page *bpage = reader->page; 6464 unsigned int commit; 6465 /* 6466 * Use the real_end for the data size, 6467 * This gives us a chance to store the lost events 6468 * on the page. 6469 */ 6470 if (reader->real_end) 6471 local_set(&bpage->commit, reader->real_end); 6472 /* 6473 * If there is room at the end of the page to save the 6474 * missed events, then record it there. 
6475 */ 6476 commit = rb_page_size(reader); 6477 if (buffer->subbuf_size - commit >= sizeof(missed_events)) { 6478 memcpy(&bpage->data[commit], &missed_events, 6479 sizeof(missed_events)); 6480 local_add(RB_MISSED_STORED, &bpage->commit); 6481 } 6482 local_add(RB_MISSED_EVENTS, &bpage->commit); 6483 } 6484 } else { 6485 /* 6486 * There really shouldn't be any missed events if the commit 6487 * is on the reader page. 6488 */ 6489 WARN_ON_ONCE(missed_events); 6490 } 6491 6492 cpu_buffer->lost_events = 0; 6493 6494 goto consume; 6495 6496 out: 6497 /* Some archs do not have data cache coherency between kernel and user-space */ 6498 flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page)); 6499 6500 rb_update_meta_page(cpu_buffer); 6501 6502 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); 6503 rb_put_mapped_buffer(cpu_buffer); 6504 6505 return 0; 6506 } 6507 6508 /* 6509 * We only allocate new buffers, never free them if the CPU goes down. 6510 * If we were to free the buffer, then the user would lose any trace that was in 6511 * the buffer. 6512 */ 6513 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node) 6514 { 6515 struct trace_buffer *buffer; 6516 long nr_pages_same; 6517 int cpu_i; 6518 unsigned long nr_pages; 6519 6520 buffer = container_of(node, struct trace_buffer, node); 6521 if (cpumask_test_cpu(cpu, buffer->cpumask)) 6522 return 0; 6523 6524 nr_pages = 0; 6525 nr_pages_same = 1; 6526 /* check if all cpu sizes are same */ 6527 for_each_buffer_cpu(buffer, cpu_i) { 6528 /* fill in the size from first enabled cpu */ 6529 if (nr_pages == 0) 6530 nr_pages = buffer->buffers[cpu_i]->nr_pages; 6531 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) { 6532 nr_pages_same = 0; 6533 break; 6534 } 6535 } 6536 /* allocate minimum pages, user can later expand it */ 6537 if (!nr_pages_same) 6538 nr_pages = 2; 6539 buffer->buffers[cpu] = 6540 rb_allocate_cpu_buffer(buffer, nr_pages, cpu); 6541 if (!buffer->buffers[cpu]) { 6542 WARN(1, "failed to allocate ring buffer on CPU %u\n", 6543 cpu); 6544 return -ENOMEM; 6545 } 6546 smp_wmb(); 6547 cpumask_set_cpu(cpu, buffer->cpumask); 6548 return 0; 6549 } 6550 6551 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST 6552 /* 6553 * This is a basic integrity check of the ring buffer. 6554 * Late in the boot cycle this test will run when configured in. 6555 * It will kick off a thread per CPU that will go into a loop 6556 * writing to the per cpu ring buffer various sizes of data. 6557 * Some of the data will be large items, some small. 6558 * 6559 * Another thread is created that goes into a spin, sending out 6560 * IPIs to the other CPUs to also write into the ring buffer. 6561 * this is to test the nesting ability of the buffer. 6562 * 6563 * Basic stats are recorded and reported. If something in the 6564 * ring buffer should happen that's not expected, a big warning 6565 * is displayed and all ring buffers are disabled. 
6566 */ 6567 static struct task_struct *rb_threads[NR_CPUS] __initdata; 6568 6569 struct rb_test_data { 6570 struct trace_buffer *buffer; 6571 unsigned long events; 6572 unsigned long bytes_written; 6573 unsigned long bytes_alloc; 6574 unsigned long bytes_dropped; 6575 unsigned long events_nested; 6576 unsigned long bytes_written_nested; 6577 unsigned long bytes_alloc_nested; 6578 unsigned long bytes_dropped_nested; 6579 int min_size_nested; 6580 int max_size_nested; 6581 int max_size; 6582 int min_size; 6583 int cpu; 6584 int cnt; 6585 }; 6586 6587 static struct rb_test_data rb_data[NR_CPUS] __initdata; 6588 6589 /* 1 meg per cpu */ 6590 #define RB_TEST_BUFFER_SIZE 1048576 6591 6592 static char rb_string[] __initdata = 6593 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\" 6594 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890" 6595 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv"; 6596 6597 static bool rb_test_started __initdata; 6598 6599 struct rb_item { 6600 int size; 6601 char str[]; 6602 }; 6603 6604 static __init int rb_write_something(struct rb_test_data *data, bool nested) 6605 { 6606 struct ring_buffer_event *event; 6607 struct rb_item *item; 6608 bool started; 6609 int event_len; 6610 int size; 6611 int len; 6612 int cnt; 6613 6614 /* Have nested writes different that what is written */ 6615 cnt = data->cnt + (nested ? 27 : 0); 6616 6617 /* Multiply cnt by ~e, to make some unique increment */ 6618 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1); 6619 6620 len = size + sizeof(struct rb_item); 6621 6622 started = rb_test_started; 6623 /* read rb_test_started before checking buffer enabled */ 6624 smp_rmb(); 6625 6626 event = ring_buffer_lock_reserve(data->buffer, len); 6627 if (!event) { 6628 /* Ignore dropped events before test starts. 
 */
		if (started) {
			if (nested)
				data->bytes_dropped_nested += len;
			else
				data->bytes_dropped += len;
		}
		return len;
	}

	event_len = ring_buffer_event_length(event);

	if (RB_WARN_ON(data->buffer, event_len < len))
		goto out;

	item = ring_buffer_event_data(event);
	item->size = size;
	memcpy(item->str, rb_string, size);

	if (nested) {
		data->bytes_alloc_nested += event_len;
		data->bytes_written_nested += len;
		data->events_nested++;
		if (!data->min_size_nested || len < data->min_size_nested)
			data->min_size_nested = len;
		if (len > data->max_size_nested)
			data->max_size_nested = len;
	} else {
		data->bytes_alloc += event_len;
		data->bytes_written += len;
		data->events++;
		if (!data->min_size || len < data->min_size)
			data->min_size = len;
		if (len > data->max_size)
			data->max_size = len;
	}

 out:
	ring_buffer_unlock_commit(data->buffer);

	return 0;
}

static __init int rb_test(void *arg)
{
	struct rb_test_data *data = arg;

	while (!kthread_should_stop()) {
		rb_write_something(data, false);
		data->cnt++;

		set_current_state(TASK_INTERRUPTIBLE);
		/* Now sleep between a min of 100-300us and a max of 1ms */
		usleep_range(((data->cnt % 3) + 1) * 100, 1000);
	}

	return 0;
}

static __init void rb_ipi(void *ignore)
{
	struct rb_test_data *data;
	int cpu = smp_processor_id();

	data = &rb_data[cpu];
	rb_write_something(data, true);
}

static __init int rb_hammer_test(void *arg)
{
	while (!kthread_should_stop()) {

		/* Send an IPI to all cpus to write data! */
		smp_call_function(rb_ipi, NULL, 1);
		/* No sleep, but for non-preempt, let others run */
		schedule();
	}

	return 0;
}

static __init int test_ringbuffer(void)
{
	struct task_struct *rb_hammer;
	struct trace_buffer *buffer;
	int cpu;
	int ret = 0;

	if (security_locked_down(LOCKDOWN_TRACEFS)) {
		pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
		return 0;
	}

	pr_info("Running ring buffer tests...\n");

	buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
	if (WARN_ON(!buffer))
		return 0;

	/* Disable buffer so that threads can't write to it yet */
	ring_buffer_record_off(buffer);

	for_each_online_cpu(cpu) {
		rb_data[cpu].buffer = buffer;
		rb_data[cpu].cpu = cpu;
		rb_data[cpu].cnt = cpu;
		rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu],
						     cpu, "rbtester/%u");
		if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
			pr_cont("FAILED\n");
			ret = PTR_ERR(rb_threads[cpu]);
			goto out_free;
		}
	}

	/* Now create the rb hammer! */
	rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
	if (WARN_ON(IS_ERR(rb_hammer))) {
		pr_cont("FAILED\n");
		ret = PTR_ERR(rb_hammer);
		goto out_free;
	}

	ring_buffer_record_on(buffer);
	/*
	 * Show the buffer is enabled before setting rb_test_started.
	 * Yes, there's a small race window where events could be
	 * dropped and the thread won't catch it. But when a ring
	 * buffer gets enabled, there will always be some kind of
	 * delay before other CPUs see it. Thus, we don't care about
	 * those dropped events.
We care about events dropped after 6759 * the threads see that the buffer is active. 6760 */ 6761 smp_wmb(); 6762 rb_test_started = true; 6763 6764 set_current_state(TASK_INTERRUPTIBLE); 6765 /* Just run for 10 seconds */; 6766 schedule_timeout(10 * HZ); 6767 6768 kthread_stop(rb_hammer); 6769 6770 out_free: 6771 for_each_online_cpu(cpu) { 6772 if (!rb_threads[cpu]) 6773 break; 6774 kthread_stop(rb_threads[cpu]); 6775 } 6776 if (ret) { 6777 ring_buffer_free(buffer); 6778 return ret; 6779 } 6780 6781 /* Report! */ 6782 pr_info("finished\n"); 6783 for_each_online_cpu(cpu) { 6784 struct ring_buffer_event *event; 6785 struct rb_test_data *data = &rb_data[cpu]; 6786 struct rb_item *item; 6787 unsigned long total_events; 6788 unsigned long total_dropped; 6789 unsigned long total_written; 6790 unsigned long total_alloc; 6791 unsigned long total_read = 0; 6792 unsigned long total_size = 0; 6793 unsigned long total_len = 0; 6794 unsigned long total_lost = 0; 6795 unsigned long lost; 6796 int big_event_size; 6797 int small_event_size; 6798 6799 ret = -1; 6800 6801 total_events = data->events + data->events_nested; 6802 total_written = data->bytes_written + data->bytes_written_nested; 6803 total_alloc = data->bytes_alloc + data->bytes_alloc_nested; 6804 total_dropped = data->bytes_dropped + data->bytes_dropped_nested; 6805 6806 big_event_size = data->max_size + data->max_size_nested; 6807 small_event_size = data->min_size + data->min_size_nested; 6808 6809 pr_info("CPU %d:\n", cpu); 6810 pr_info(" events: %ld\n", total_events); 6811 pr_info(" dropped bytes: %ld\n", total_dropped); 6812 pr_info(" alloced bytes: %ld\n", total_alloc); 6813 pr_info(" written bytes: %ld\n", total_written); 6814 pr_info(" biggest event: %d\n", big_event_size); 6815 pr_info(" smallest event: %d\n", small_event_size); 6816 6817 if (RB_WARN_ON(buffer, total_dropped)) 6818 break; 6819 6820 ret = 0; 6821 6822 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) { 6823 total_lost += lost; 6824 item = ring_buffer_event_data(event); 6825 total_len += ring_buffer_event_length(event); 6826 total_size += item->size + sizeof(struct rb_item); 6827 if (memcmp(&item->str[0], rb_string, item->size) != 0) { 6828 pr_info("FAILED!\n"); 6829 pr_info("buffer had: %.*s\n", item->size, item->str); 6830 pr_info("expected: %.*s\n", item->size, rb_string); 6831 RB_WARN_ON(buffer, 1); 6832 ret = -1; 6833 break; 6834 } 6835 total_read++; 6836 } 6837 if (ret) 6838 break; 6839 6840 ret = -1; 6841 6842 pr_info(" read events: %ld\n", total_read); 6843 pr_info(" lost events: %ld\n", total_lost); 6844 pr_info(" total events: %ld\n", total_lost + total_read); 6845 pr_info(" recorded len bytes: %ld\n", total_len); 6846 pr_info(" recorded size bytes: %ld\n", total_size); 6847 if (total_lost) { 6848 pr_info(" With dropped events, record len and size may not match\n" 6849 " alloced and written from above\n"); 6850 } else { 6851 if (RB_WARN_ON(buffer, total_len != total_alloc || 6852 total_size != total_written)) 6853 break; 6854 } 6855 if (RB_WARN_ON(buffer, total_lost + total_read != total_events)) 6856 break; 6857 6858 ret = 0; 6859 } 6860 if (!ret) 6861 pr_info("Ring buffer PASSED!\n"); 6862 6863 ring_buffer_free(buffer); 6864 return 0; 6865 } 6866 6867 late_initcall(test_ringbuffer); 6868 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */ 6869