1 // SPDX-License-Identifier: GPL-2.0 2 3 #include <kunit/visibility.h> 4 #include <linux/kernel.h> 5 #include <linux/irqflags.h> 6 #include <linux/string.h> 7 #include <linux/errno.h> 8 #include <linux/bug.h> 9 #include "printk_ringbuffer.h" 10 #include "internal.h" 11 12 /** 13 * DOC: printk_ringbuffer overview 14 * 15 * Data Structure 16 * -------------- 17 * The printk_ringbuffer is made up of 2 internal ringbuffers: 18 * 19 * desc_ring 20 * A ring of descriptors and their meta data (such as sequence number, 21 * timestamp, loglevel, etc.) as well as internal state information about 22 * the record and logical positions specifying where in the other 23 * ringbuffer the text strings are located. 24 * 25 * text_data_ring 26 * A ring of data blocks. A data block consists of an unsigned long 27 * integer (ID) that maps to a desc_ring index followed by the text 28 * string of the record. 29 * 30 * The internal state information of a descriptor is the key element to allow 31 * readers and writers to locklessly synchronize access to the data. 32 * 33 * Implementation 34 * -------------- 35 * 36 * Descriptor Ring 37 * ~~~~~~~~~~~~~~~ 38 * The descriptor ring is an array of descriptors. A descriptor contains 39 * essential meta data to track the data of a printk record using 40 * blk_lpos structs pointing to associated text data blocks (see 41 * "Data Rings" below). Each descriptor is assigned an ID that maps 42 * directly to index values of the descriptor array and has a state. The ID 43 * and the state are bitwise combined into a single descriptor field named 44 * @state_var, allowing ID and state to be synchronously and atomically 45 * updated. 46 * 47 * Descriptors have four states: 48 * 49 * reserved 50 * A writer is modifying the record. 51 * 52 * committed 53 * The record and all its data are written. A writer can reopen the 54 * descriptor (transitioning it back to reserved), but in the committed 55 * state the data is consistent. 
56 * 57 * finalized 58 * The record and all its data are complete and available for reading. A 59 * writer cannot reopen the descriptor. 60 * 61 * reusable 62 * The record exists, but its text and/or meta data may no longer be 63 * available. 64 * 65 * Querying the @state_var of a record requires providing the ID of the 66 * descriptor to query. This can yield a possible fifth (pseudo) state: 67 * 68 * miss 69 * The descriptor being queried has an unexpected ID. 70 * 71 * The descriptor ring has a @tail_id that contains the ID of the oldest 72 * descriptor and @head_id that contains the ID of the newest descriptor. 73 * 74 * When a new descriptor should be created (and the ring is full), the tail 75 * descriptor is invalidated by first transitioning to the reusable state and 76 * then invalidating all tail data blocks up to and including the data blocks 77 * associated with the tail descriptor (for the text ring). Then 78 * @tail_id is advanced, followed by advancing @head_id. And finally the 79 * @state_var of the new descriptor is initialized to the new ID and reserved 80 * state. 81 * 82 * The @tail_id can only be advanced if the new @tail_id would be in the 83 * committed or reusable queried state. This makes it possible that a valid 84 * sequence number of the tail is always available. 85 * 86 * Descriptor Finalization 87 * ~~~~~~~~~~~~~~~~~~~~~~~ 88 * When a writer calls the commit function prb_commit(), record data is 89 * fully stored and is consistent within the ringbuffer. However, a writer can 90 * reopen that record, claiming exclusive access (as with prb_reserve()), and 91 * modify that record. When finished, the writer must again commit the record. 92 * 93 * In order for a record to be made available to readers (and also become 94 * recyclable for writers), it must be finalized. A finalized record cannot be 95 * reopened and can never become "unfinalized". 
Record finalization can occur 96 * in three different scenarios: 97 * 98 * 1) A writer can simultaneously commit and finalize its record by calling 99 * prb_final_commit() instead of prb_commit(). 100 * 101 * 2) When a new record is reserved and the previous record has been 102 * committed via prb_commit(), that previous record is automatically 103 * finalized. 104 * 105 * 3) When a record is committed via prb_commit() and a newer record 106 * already exists, the record being committed is automatically finalized. 107 * 108 * Data Ring 109 * ~~~~~~~~~ 110 * The text data ring is a byte array composed of data blocks. Data blocks are 111 * referenced by blk_lpos structs that point to the logical position of the 112 * beginning of a data block and the beginning of the next adjacent data 113 * block. Logical positions are mapped directly to index values of the byte 114 * array ringbuffer. 115 * 116 * Each data block consists of an ID followed by the writer data. The ID is 117 * the identifier of a descriptor that is associated with the data block. A 118 * given data block is considered valid if all of the following conditions 119 * are met: 120 * 121 * 1) The descriptor associated with the data block is in the committed 122 * or finalized queried state. 123 * 124 * 2) The blk_lpos struct within the descriptor associated with the data 125 * block references back to the same data block. 126 * 127 * 3) The data block is within the head/tail logical position range. 128 * 129 * If the writer data of a data block would extend beyond the end of the 130 * byte array, only the ID of the data block is stored at the logical 131 * position and the full data block (ID and writer data) is stored at the 132 * beginning of the byte array. The referencing blk_lpos will point to the 133 * ID before the wrap and the next data block will be at the logical 134 * position adjacent the full data block after the wrap. 
135 * 136 * Data rings have a @tail_lpos that points to the beginning of the oldest 137 * data block and a @head_lpos that points to the logical position of the 138 * next (not yet existing) data block. 139 * 140 * When a new data block should be created (and the ring is full), tail data 141 * blocks will first be invalidated by putting their associated descriptors 142 * into the reusable state and then pushing the @tail_lpos forward beyond 143 * them. Then the @head_lpos is pushed forward and is associated with a new 144 * descriptor. If a data block is not valid, the @tail_lpos cannot be 145 * advanced beyond it. 146 * 147 * Info Array 148 * ~~~~~~~~~~ 149 * The general meta data of printk records are stored in printk_info structs, 150 * stored in an array with the same number of elements as the descriptor ring. 151 * Each info corresponds to the descriptor of the same index in the 152 * descriptor ring. Info validity is confirmed by evaluating the corresponding 153 * descriptor before and after loading the info. 154 * 155 * Usage 156 * ----- 157 * Here are some simple examples demonstrating writers and readers. For the 158 * examples a global ringbuffer (test_rb) is available (which is not the 159 * actual ringbuffer used by printk):: 160 * 161 * DEFINE_PRINTKRB(test_rb, 15, 5); 162 * 163 * This ringbuffer allows up to 32768 records (2 ^ 15) and has a size of 164 * 1 MiB (2 ^ (15 + 5)) for text data. 
165 * 166 * Sample writer code:: 167 * 168 * const char *textstr = "message text"; 169 * struct prb_reserved_entry e; 170 * struct printk_record r; 171 * 172 * // specify how much to allocate 173 * prb_rec_init_wr(&r, strlen(textstr) + 1); 174 * 175 * if (prb_reserve(&e, &test_rb, &r)) { 176 * snprintf(r.text_buf, r.text_buf_size, "%s", textstr); 177 * 178 * r.info->text_len = strlen(textstr); 179 * r.info->ts_nsec = local_clock(); 180 * r.info->caller_id = printk_caller_id(); 181 * 182 * // commit and finalize the record 183 * prb_final_commit(&e); 184 * } 185 * 186 * Note that additional writer functions are available to extend a record 187 * after it has been committed but not yet finalized. This can be done as 188 * long as no new records have been reserved and the caller is the same. 189 * 190 * Sample writer code (record extending):: 191 * 192 * // alternate rest of previous example 193 * 194 * r.info->text_len = strlen(textstr); 195 * r.info->ts_nsec = local_clock(); 196 * r.info->caller_id = printk_caller_id(); 197 * 198 * // commit the record (but do not finalize yet) 199 * prb_commit(&e); 200 * } 201 * 202 * ... 
203 * 204 * // specify additional 5 bytes text space to extend 205 * prb_rec_init_wr(&r, 5); 206 * 207 * // try to extend, but only if it does not exceed 32 bytes 208 * if (prb_reserve_in_last(&e, &test_rb, &r, printk_caller_id(), 32)) { 209 * snprintf(&r.text_buf[r.info->text_len], 210 * r.text_buf_size - r.info->text_len, "hello"); 211 * 212 * r.info->text_len += 5; 213 * 214 * // commit and finalize the record 215 * prb_final_commit(&e); 216 * } 217 * 218 * Sample reader code:: 219 * 220 * struct printk_info info; 221 * struct printk_record r; 222 * char text_buf[32]; 223 * u64 seq; 224 * 225 * prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf)); 226 * 227 * prb_for_each_record(0, &test_rb, seq, &r) { 228 * if (info.seq != seq) 229 * pr_warn("lost %llu records\n", info.seq - seq); 230 * 231 * if (info.text_len > r.text_buf_size) { 232 * pr_warn("record %llu text truncated\n", info.seq); 233 * text_buf[r.text_buf_size - 1] = 0; 234 * } 235 * 236 * pr_info("%llu: %llu: %s\n", info.seq, info.ts_nsec, 237 * &text_buf[0]); 238 * } 239 * 240 * Note that additional less convenient reader functions are available to 241 * allow complex record access. 242 * 243 * ABA Issues 244 * ~~~~~~~~~~ 245 * To help avoid ABA issues, descriptors are referenced by IDs (array index 246 * values combined with tagged bits counting array wraps) and data blocks are 247 * referenced by logical positions (array index values combined with tagged 248 * bits counting array wraps). However, on 32-bit systems the number of 249 * tagged bits is relatively small such that an ABA incident is (at least 250 * theoretically) possible. For example, if 4 million maximally sized (1KiB) 251 * printk messages were to occur in NMI context on a 32-bit system, the 252 * interrupted context would not be able to recognize that the 32-bit integer 253 * completely wrapped and thus represents a different data block than the one 254 * the interrupted context expects. 
255 * 256 * To help combat this possibility, additional state checking is performed 257 * (such as using cmpxchg() even though set() would suffice). These extra 258 * checks are commented as such and will hopefully catch any ABA issue that 259 * a 32-bit system might experience. 260 * 261 * Memory Barriers 262 * ~~~~~~~~~~~~~~~ 263 * Multiple memory barriers are used. To simplify proving correctness and 264 * generating litmus tests, lines of code related to memory barriers 265 * (loads, stores, and the associated memory barriers) are labeled:: 266 * 267 * LMM(function:letter) 268 * 269 * Comments reference the labels using only the "function:letter" part. 270 * 271 * The memory barrier pairs and their ordering are: 272 * 273 * desc_reserve:D / desc_reserve:B 274 * push descriptor tail (id), then push descriptor head (id) 275 * 276 * desc_reserve:D / data_push_tail:B 277 * push data tail (lpos), then set new descriptor reserved (state) 278 * 279 * desc_reserve:D / desc_push_tail:C 280 * push descriptor tail (id), then set new descriptor reserved (state) 281 * 282 * desc_reserve:D / prb_first_seq:C 283 * push descriptor tail (id), then set new descriptor reserved (state) 284 * 285 * desc_reserve:F / desc_read:D 286 * set new descriptor id and reserved (state), then allow writer changes 287 * 288 * data_alloc:A (or data_realloc:A) / desc_read:D 289 * set old descriptor reusable (state), then modify new data block area 290 * 291 * data_alloc:A (or data_realloc:A) / data_push_tail:B 292 * push data tail (lpos), then modify new data block area 293 * 294 * _prb_commit:B / desc_read:B 295 * store writer changes, then set new descriptor committed (state) 296 * 297 * desc_reopen_last:A / _prb_commit:B 298 * set descriptor reserved (state), then read descriptor data 299 * 300 * _prb_commit:B / desc_reserve:D 301 * set new descriptor committed (state), then check descriptor head (id) 302 * 303 * data_push_tail:D / data_push_tail:A 304 * set descriptor reusable (state), then 
push data tail (lpos)
 *
 *   desc_push_tail:B / desc_reserve:D
 *     set descriptor reusable (state), then push descriptor tail (id)
 *
 *   desc_update_last_finalized:A / desc_last_finalized_seq:A
 *     store finalized record, then set new highest finalized sequence number
 */

/* Total byte size of the text data ring. */
#define DATA_SIZE(data_ring)		_DATA_SIZE((data_ring)->size_bits)
#define DATA_SIZE_MASK(data_ring)	(DATA_SIZE(data_ring) - 1)

/* Number of descriptors in the descriptor ring. */
#define DESCS_COUNT(desc_ring)		_DESCS_COUNT((desc_ring)->count_bits)
#define DESCS_COUNT_MASK(desc_ring)	(DESCS_COUNT(desc_ring) - 1)

/* Determine the data array index from a logical position. */
#define DATA_INDEX(data_ring, lpos)	((lpos) & DATA_SIZE_MASK(data_ring))

/* Determine the desc array index from an ID or sequence number. */
#define DESC_INDEX(desc_ring, n)	((n) & DESCS_COUNT_MASK(desc_ring))

/* Determine how many times the data array has wrapped. */
#define DATA_WRAPS(data_ring, lpos)	((lpos) >> (data_ring)->size_bits)

/* Determine if a logical position refers to a data-less block. */
#define LPOS_DATALESS(lpos)		((lpos) & 1UL)
#define BLK_DATALESS(blk)		(LPOS_DATALESS((blk)->begin) && \
					 LPOS_DATALESS((blk)->next))

/* Get the logical position at index 0 of the current wrap. */
#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
	((lpos) & ~DATA_SIZE_MASK(data_ring))

/* Get the ID for the same index of the previous wrap as the given ID. */
#define DESC_ID_PREV_WRAP(desc_ring, id) \
	DESC_ID((id) - DESCS_COUNT(desc_ring))

/*
 * A data block: mapped directly to the beginning of the data block area
 * specified as a logical position within the data ring.
 *
 * @id:   the ID of the associated descriptor
 * @data: the writer data
 *
 * Note that the size of a data block is only known by its associated
 * descriptor.
 */
struct prb_data_block {
	unsigned long	id;
	char		data[];
};

/*
 * Return the descriptor associated with @n. @n can be either a
 * descriptor ID or a sequence number.
 */
static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
{
	return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
}

/*
 * Return the printk_info associated with @n. @n can be either a
 * descriptor ID or a sequence number.
 */
static struct printk_info *to_info(struct prb_desc_ring *desc_ring, u64 n)
{
	return &desc_ring->infos[DESC_INDEX(desc_ring, n)];
}

/* Return the data block mapped at the data-array index of @begin_lpos. */
static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
				       unsigned long begin_lpos)
{
	return (void *)&data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
}

/*
 * Increase the data size to account for data block meta data plus any
 * padding so that the adjacent data block is aligned on the ID size.
 */
static unsigned int to_blk_size(unsigned int size)
{
	struct prb_data_block *db = NULL;

	size += sizeof(*db);
	size = ALIGN(size, sizeof(db->id));
	return size;
}

/*
 * Sanity checker for reserve size. The ringbuffer code assumes that a data
 * block does not exceed the maximum possible size that could fit within the
 * ringbuffer. This function provides that basic size check so that the
 * assumption is safe. In particular, it guarantees that data_push_tail() will
 * never attempt to push the tail beyond the head.
 */
static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
{
	/* Data-less blocks take no space. */
	if (size == 0)
		return true;

	/*
	 * If data blocks were allowed to be larger than half the data ring
	 * size, a wrapping data block could require more space than the full
	 * ringbuffer.
	 */
	return to_blk_size(size) <= DATA_SIZE(data_ring) / 2;
}

/*
 * Compare the current and requested logical position and decide
 * whether more space is needed.
 *
 * Return false when @lpos_current is already at or beyond @lpos_target.
 *
 * Also return false when the difference between the positions is bigger
 * than the size of the data buffer. It might happen only when the caller
 * raced with another CPU(s) which already made and used the space.
 */
static bool need_more_space(struct prb_data_ring *data_ring,
			    unsigned long lpos_current,
			    unsigned long lpos_target)
{
	/* Unsigned subtraction handles lpos wrap-around correctly. */
	return lpos_target - lpos_current - 1 < DATA_SIZE(data_ring);
}

/* Query the state of a descriptor. */
static enum desc_state get_desc_state(unsigned long id,
				      unsigned long state_val)
{
	if (id != DESC_ID(state_val))
		return desc_miss;

	return DESC_STATE(state_val);
}

/*
 * Get a copy of a specified descriptor and return its queried state. If the
 * descriptor is in an inconsistent state (miss or reserved), the caller can
 * only expect the descriptor's @state_var field to be valid.
 *
 * The sequence number and caller_id can be optionally retrieved. Like all
 * non-state_var data, they are only valid if the descriptor is in a
 * consistent state.
 */
static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
				 unsigned long id, struct prb_desc *desc_out,
				 u64 *seq_out, u32 *caller_id_out)
{
	struct printk_info *info = to_info(desc_ring, id);
	struct prb_desc *desc = to_desc(desc_ring, id);
	atomic_long_t *state_var = &desc->state_var;
	enum desc_state d_state;
	unsigned long state_val;

	/* Check the descriptor state. */
	state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
	d_state = get_desc_state(id, state_val);
	if (d_state == desc_miss || d_state == desc_reserved) {
		/*
		 * The descriptor is in an inconsistent state. Set at least
		 * @state_var so that the caller can see the details of
		 * the inconsistent state.
		 */
		goto out;
	}

	/*
	 * Guarantee the state is loaded before copying the descriptor
	 * content. This avoids copying obsolete descriptor content that might
	 * not apply to the descriptor state. This pairs with _prb_commit:B.
	 *
	 * Memory barrier involvement:
	 *
	 * If desc_read:A reads from _prb_commit:B, then desc_read:C reads
	 * from _prb_commit:A.
	 *
	 * Relies on:
	 *
	 * WMB from _prb_commit:A to _prb_commit:B
	 *    matching
	 * RMB from desc_read:A to desc_read:C
	 */
	smp_rmb(); /* LMM(desc_read:B) */

	/*
	 * Copy the descriptor data. The data is not valid until the
	 * state has been re-checked. A memcpy() for all of @desc
	 * cannot be used because of the atomic_t @state_var field.
	 */
	if (desc_out) {
		memcpy(&desc_out->text_blk_lpos, &desc->text_blk_lpos,
		       sizeof(desc_out->text_blk_lpos)); /* LMM(desc_read:C) */
	}
	if (seq_out)
		*seq_out = info->seq; /* also part of desc_read:C */
	if (caller_id_out)
		*caller_id_out = info->caller_id; /* also part of desc_read:C */

	/*
	 * 1. Guarantee the descriptor content is loaded before re-checking
	 *    the state. This avoids reading an obsolete descriptor state
	 *    that may not apply to the copied content. This pairs with
	 *    desc_reserve:F.
	 *
	 *    Memory barrier involvement:
	 *
	 *    If desc_read:C reads from desc_reserve:G, then desc_read:E
	 *    reads from desc_reserve:F.
	 *
	 *    Relies on:
	 *
	 *    WMB from desc_reserve:F to desc_reserve:G
	 *       matching
	 *    RMB from desc_read:C to desc_read:E
	 *
	 * 2. Guarantee the record data is loaded before re-checking the
	 *    state. This avoids reading an obsolete descriptor state that may
	 *    not apply to the copied data. This pairs with data_alloc:A and
	 *    data_realloc:A.
	 *
	 *    Memory barrier involvement:
	 *
	 *    If copy_data:A reads from data_alloc:B, then desc_read:E
	 *    reads from desc_make_reusable:A.
	 *
	 *    Relies on:
	 *
	 *    MB from desc_make_reusable:A to data_alloc:B
	 *       matching
	 *    RMB from desc_read:C to desc_read:E
	 *
	 *    Note: desc_make_reusable:A and data_alloc:B can be different
	 *          CPUs. However, the data_alloc:B CPU (which performs the
	 *          full memory barrier) must have previously seen
	 *          desc_make_reusable:A.
	 */
	smp_rmb(); /* LMM(desc_read:D) */

	/*
	 * The data has been copied. Return the current descriptor state,
	 * which may have changed since the load above.
	 */
	state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
	d_state = get_desc_state(id, state_val);
out:
	if (desc_out)
		atomic_long_set(&desc_out->state_var, state_val);
	return d_state;
}

/*
 * Take a specified descriptor out of the finalized state by attempting
 * the transition from finalized to reusable. Either this context or some
 * other context will have been successful.
 */
static void desc_make_reusable(struct prb_desc_ring *desc_ring,
			       unsigned long id)
{
	unsigned long val_finalized = DESC_SV(id, desc_finalized);
	unsigned long val_reusable = DESC_SV(id, desc_reusable);
	struct prb_desc *desc = to_desc(desc_ring, id);
	atomic_long_t *state_var = &desc->state_var;

	atomic_long_cmpxchg_relaxed(state_var, val_finalized,
				    val_reusable); /* LMM(desc_make_reusable:A) */
}

/*
 * Given the text data ring, put the associated descriptor of each
 * data block from @lpos_begin until @lpos_end into the reusable state.
 *
 * If there is any problem making the associated descriptor reusable, either
 * the descriptor has not yet been finalized or another writer context has
 * already pushed the tail lpos past the problematic data block. Regardless,
 * on error the caller can re-load the tail lpos to determine the situation.
 */
static bool data_make_reusable(struct printk_ringbuffer *rb,
			       unsigned long lpos_begin,
			       unsigned long lpos_end,
			       unsigned long *lpos_out)
{

	struct prb_data_ring *data_ring = &rb->text_data_ring;
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	struct prb_data_block *blk;
	enum desc_state d_state;
	struct prb_desc desc;
	struct prb_data_blk_lpos *blk_lpos = &desc.text_blk_lpos;
	unsigned long id;

	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
	while (need_more_space(data_ring, lpos_begin, lpos_end)) {
		blk = to_block(data_ring, lpos_begin);

		/*
		 * Load the block ID from the data block. This is a data race
		 * against a writer that may have newly reserved this data
		 * area. If the loaded value matches a valid descriptor ID,
		 * the blk_lpos of that descriptor will be checked to make
		 * sure it points back to this data block. If the check fails,
		 * the data area has been recycled by another writer.
		 */
		id = blk->id; /* LMM(data_make_reusable:A) */

		d_state = desc_read(desc_ring, id, &desc,
				    NULL, NULL); /* LMM(data_make_reusable:B) */

		switch (d_state) {
		case desc_miss:
		case desc_reserved:
		case desc_committed:
			return false;
		case desc_finalized:
			/*
			 * This data block is invalid if the descriptor
			 * does not point back to it.
			 */
			if (blk_lpos->begin != lpos_begin)
				return false;
			desc_make_reusable(desc_ring, id);
			break;
		case desc_reusable:
			/*
			 * This data block is invalid if the descriptor
			 * does not point back to it.
			 */
			if (blk_lpos->begin != lpos_begin)
				return false;
			break;
		}

		/* Advance @lpos_begin to the next data block. */
		lpos_begin = blk_lpos->next;
	}

	*lpos_out = lpos_begin;
	return true;
}

/*
 * Advance the data ring tail to at least @lpos. This function puts
 * descriptors into the reusable state if the tail is pushed beyond
 * their associated data block.
 */
static bool data_push_tail(struct printk_ringbuffer *rb, unsigned long lpos)
{
	struct prb_data_ring *data_ring = &rb->text_data_ring;
	unsigned long tail_lpos_new;
	unsigned long tail_lpos;
	unsigned long next_lpos;

	/* If @lpos is from a data-less block, there is nothing to do. */
	if (LPOS_DATALESS(lpos))
		return true;

	/*
	 * Any descriptor states that have transitioned to reusable due to the
	 * data tail being pushed to this loaded value will be visible to this
	 * CPU. This pairs with data_push_tail:D.
	 *
	 * Memory barrier involvement:
	 *
	 * If data_push_tail:A reads from data_push_tail:D, then this CPU can
	 * see desc_make_reusable:A.
	 *
	 * Relies on:
	 *
	 * MB from desc_make_reusable:A to data_push_tail:D
	 *    matches
	 * READFROM from data_push_tail:D to data_push_tail:A
	 *    thus
	 * READFROM from desc_make_reusable:A to this CPU
	 */
	tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:A) */

	/*
	 * Loop until the tail lpos is at or beyond @lpos. This condition
	 * may already be satisfied, resulting in no full memory barrier
	 * from data_push_tail:D being performed. However, since this CPU
	 * sees the new tail lpos, any descriptor states that transitioned to
	 * the reusable state must already be visible.
	 */
	while (need_more_space(data_ring, tail_lpos, lpos)) {
		/*
		 * Make all descriptors reusable that are associated with
		 * data blocks before @lpos.
		 */
		if (!data_make_reusable(rb, tail_lpos, lpos, &next_lpos)) {
			/*
			 * 1. Guarantee the block ID loaded in
			 *    data_make_reusable() is performed before
			 *    reloading the tail lpos. The failed
			 *    data_make_reusable() may be due to a newly
			 *    recycled data area causing the tail lpos to
			 *    have been previously pushed. This pairs with
			 *    data_alloc:A and data_realloc:A.
			 *
			 *    Memory barrier involvement:
			 *
			 *    If data_make_reusable:A reads from data_alloc:B,
			 *    then data_push_tail:C reads from
			 *    data_push_tail:D.
			 *
			 *    Relies on:
			 *
			 *    MB from data_push_tail:D to data_alloc:B
			 *       matching
			 *    RMB from data_make_reusable:A to
			 *    data_push_tail:C
			 *
			 *    Note: data_push_tail:D and data_alloc:B can be
			 *          different CPUs. However, the data_alloc:B
			 *          CPU (which performs the full memory
			 *          barrier) must have previously seen
			 *          data_push_tail:D.
			 *
			 * 2. Guarantee the descriptor state loaded in
			 *    data_make_reusable() is performed before
			 *    reloading the tail lpos. The failed
			 *    data_make_reusable() may be due to a newly
			 *    recycled descriptor causing the tail lpos to
			 *    have been previously pushed. This pairs with
			 *    desc_reserve:D.
			 *
			 *    Memory barrier involvement:
			 *
			 *    If data_make_reusable:B reads from
			 *    desc_reserve:F, then data_push_tail:C reads
			 *    from data_push_tail:D.
			 *
			 *    Relies on:
			 *
			 *    MB from data_push_tail:D to desc_reserve:F
			 *       matching
			 *    RMB from data_make_reusable:B to
			 *    data_push_tail:C
			 *
			 *    Note: data_push_tail:D and desc_reserve:F can
			 *          be different CPUs. However, the
			 *          desc_reserve:F CPU (which performs the
			 *          full memory barrier) must have previously
			 *          seen data_push_tail:D.
			 */
			smp_rmb(); /* LMM(data_push_tail:B) */

			tail_lpos_new = atomic_long_read(&data_ring->tail_lpos
							); /* LMM(data_push_tail:C) */
			if (tail_lpos_new == tail_lpos)
				return false;

			/* Another CPU pushed the tail. Try again. */
			tail_lpos = tail_lpos_new;
			continue;
		}

		/*
		 * Guarantee any descriptor states that have transitioned to
		 * reusable are stored before pushing the tail lpos. A full
		 * memory barrier is needed since other CPUs may have made
		 * the descriptor states reusable. This pairs with
		 * data_push_tail:A.
		 */
		if (atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos,
					    next_lpos)) { /* LMM(data_push_tail:D) */
			break;
		}
	}

	return true;
}

/*
 * Advance the desc ring tail. This function advances the tail by one
 * descriptor, thus invalidating the oldest descriptor. Before advancing
 * the tail, the tail descriptor is made reusable and all data blocks up to
 * and including the descriptor's data block are invalidated (i.e. the data
 * ring tail is pushed past the data block of the descriptor being made
 * reusable).
 */
static bool desc_push_tail(struct printk_ringbuffer *rb,
			   unsigned long tail_id)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	enum desc_state d_state;
	struct prb_desc desc;

	d_state = desc_read(desc_ring, tail_id, &desc, NULL, NULL);

	switch (d_state) {
	case desc_miss:
		/*
		 * If the ID is exactly 1 wrap behind the expected, it is
		 * in the process of being reserved by another writer and
		 * must be considered reserved.
		 */
		if (DESC_ID(atomic_long_read(&desc.state_var)) ==
		    DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
			return false;
		}

		/*
		 * The ID has changed. Another writer must have pushed the
		 * tail and recycled the descriptor already. Success is
		 * returned because the caller is only interested in the
		 * specified tail being pushed, which it was.
		 */
		return true;
	case desc_reserved:
	case desc_committed:
		return false;
	case desc_finalized:
		desc_make_reusable(desc_ring, tail_id);
		break;
	case desc_reusable:
		break;
	}

	/*
	 * Data blocks must be invalidated before their associated
	 * descriptor can be made available for recycling. Invalidating
	 * them later is not possible because there is no way to trust
	 * data blocks once their associated descriptor is gone.
	 */

	if (!data_push_tail(rb, desc.text_blk_lpos.next))
		return false;

	/*
	 * Check the next descriptor after @tail_id before pushing the tail
	 * to it because the tail must always be in a finalized or reusable
	 * state. The implementation of prb_first_seq() relies on this.
	 *
	 * A successful read implies that the next descriptor is less than or
	 * equal to @head_id so there is no risk of pushing the tail past the
	 * head.
	 */
	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc,
			    NULL, NULL); /* LMM(desc_push_tail:A) */

	if (d_state == desc_finalized || d_state == desc_reusable) {
		/*
		 * Guarantee any descriptor states that have transitioned to
		 * reusable are stored before pushing the tail ID. This allows
		 * verifying the recycled descriptor state. A full memory
		 * barrier is needed since other CPUs may have made the
		 * descriptor states reusable. This pairs with desc_reserve:D.
		 */
		atomic_long_cmpxchg(&desc_ring->tail_id, tail_id,
				    DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
	} else {
		/*
		 * Guarantee the last state load from desc_read() is before
		 * reloading @tail_id in order to see a new tail ID in the
		 * case that the descriptor has been recycled. This pairs
		 * with desc_reserve:D.
		 *
		 * Memory barrier involvement:
		 *
		 * If desc_push_tail:A reads from desc_reserve:F, then
		 * desc_push_tail:D reads from desc_push_tail:B.
		 *
		 * Relies on:
		 *
		 * MB from desc_push_tail:B to desc_reserve:F
		 *    matching
		 * RMB from desc_push_tail:A to desc_push_tail:D
		 *
		 * Note: desc_push_tail:B and desc_reserve:F can be different
		 *       CPUs. However, the desc_reserve:F CPU (which performs
		 *       the full memory barrier) must have previously seen
		 *       desc_push_tail:B.
		 */
		smp_rmb(); /* LMM(desc_push_tail:C) */

		/*
		 * Re-check the tail ID. The descriptor following @tail_id is
		 * not in an allowed tail state. But if the tail has since
		 * been moved by another CPU, then it does not matter.
		 */
		if (atomic_long_read(&desc_ring->tail_id) == tail_id) /* LMM(desc_push_tail:D) */
			return false;
	}

	return true;
}

/* Reserve a new descriptor, invalidating the oldest if necessary. */
static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	unsigned long prev_state_val;
	unsigned long id_prev_wrap;
	struct prb_desc *desc;
	unsigned long head_id;
	unsigned long id;

	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */

	do {
		id = DESC_ID(head_id + 1);
		id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);

		/*
		 * Guarantee the head ID is read before reading the tail ID.
		 * Since the tail ID is updated before the head ID, this
		 * guarantees that @id_prev_wrap is never ahead of the tail
		 * ID. This pairs with desc_reserve:D.
		 *
		 * Memory barrier involvement:
		 *
		 * If desc_reserve:A reads from desc_reserve:D, then
		 * desc_reserve:C reads from desc_push_tail:B.
		 *
		 * Relies on:
		 *
		 * MB from desc_push_tail:B to desc_reserve:D
		 *    matching
		 * RMB from desc_reserve:A to desc_reserve:C
		 *
		 * Note: desc_push_tail:B and desc_reserve:D can be different
		 *       CPUs. However, the desc_reserve:D CPU (which performs
		 *       the full memory barrier) must have previously seen
		 *       desc_push_tail:B.
		 */
		smp_rmb(); /* LMM(desc_reserve:B) */

		if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id
						    )) { /* LMM(desc_reserve:C) */
			/*
			 * Make space for the new descriptor by
			 * advancing the tail.
			 */
			if (!desc_push_tail(rb, id_prev_wrap))
				return false;
		}

		/*
		 * 1. Guarantee the tail ID is read before validating the
		 *    recycled descriptor state. A read memory barrier is
		 *    sufficient for this. This pairs with desc_push_tail:B.
		 *
		 *    Memory barrier involvement:
		 *
		 *    If desc_reserve:C reads from desc_push_tail:B, then
		 *    desc_reserve:E reads from desc_make_reusable:A.
		 *
		 *    Relies on:
		 *
		 *    MB from desc_make_reusable:A to desc_push_tail:B
		 *       matching
		 *    RMB from desc_reserve:C to desc_reserve:E
		 *
		 *    Note: desc_make_reusable:A and desc_push_tail:B can be
		 *          different CPUs. However, the desc_push_tail:B CPU
		 *          (which performs the full memory barrier) must have
		 *          previously seen desc_make_reusable:A.
		 *
		 * 2. Guarantee the tail ID is stored before storing the head
		 *    ID. This pairs with desc_reserve:B.
		 *
		 * 3. Guarantee any data ring tail changes are stored before
		 *    recycling the descriptor. Data ring tail changes can
		 *    happen via desc_push_tail()->data_push_tail(). A full
		 *    memory barrier is needed since another CPU may have
		 *    pushed the data ring tails. This pairs with
		 *    data_push_tail:B.
		 *
		 * 4. Guarantee a new tail ID is stored before recycling the
		 *    descriptor. A full memory barrier is needed since
		 *    another CPU may have pushed the tail ID. This pairs
		 *    with desc_push_tail:C and this also pairs with
		 *    prb_first_seq:C.
		 *
		 * 5. Guarantee the head ID is stored before trying to
		 *    finalize the previous descriptor. This pairs with
		 *    _prb_commit:B.
		 */
	} while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id,
					  id)); /* LMM(desc_reserve:D) */

	desc = to_desc(desc_ring, id);

	/*
	 * If the descriptor has been recycled, verify the old state val.
	 * See "ABA Issues" about why this verification is performed.
	 */
	prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
	if (prev_state_val &&
	    get_desc_state(id_prev_wrap, prev_state_val) != desc_reusable) {
		WARN_ON_ONCE(1);
		return false;
	}

	/*
	 * Assign the descriptor a new ID and set its state to reserved.
	 * See "ABA Issues" about why cmpxchg() instead of set() is used.
	 *
	 * Guarantee the new descriptor ID and state is stored before making
	 * any other changes. A write memory barrier is sufficient for this.
	 * This pairs with desc_read:D.
	 */
	if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
				     DESC_SV(id, desc_reserved))) { /* LMM(desc_reserve:F) */
		WARN_ON_ONCE(1);
		return false;
	}

	/* Now data in @desc can be modified: LMM(desc_reserve:G) */

	*id_out = id;
	return true;
}

/* Return true if the data block @begin_lpos/@next_lpos wraps the data array. */
static bool is_blk_wrapped(struct prb_data_ring *data_ring,
			   unsigned long begin_lpos, unsigned long next_lpos)
{
	/*
	 * Subtract one from next_lpos since it's not actually part of this data
	 * block. This allows perfectly fitting records to not wrap.
	 */
	return DATA_WRAPS(data_ring, begin_lpos) !=
	       DATA_WRAPS(data_ring, next_lpos - 1);
}

/* Determine the end of a data block.
 */
static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
				   unsigned long lpos, unsigned int size)
{
	unsigned long begin_lpos;
	unsigned long next_lpos;

	begin_lpos = lpos;
	next_lpos = lpos + size;

	/* First check if the data block does not wrap. */
	if (!is_blk_wrapped(data_ring, begin_lpos, next_lpos))
		return next_lpos;

	/* Wrapping data blocks store their data at the beginning. */
	return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
}

/*
 * Allocate a new data block, invalidating the oldest data block(s)
 * if necessary. This function also associates the data block with
 * a specified descriptor.
 *
 * Return a pointer to the beginning of the data area on success. Return
 * NULL either when a data-less block was created for an empty line
 * (@blk_lpos set to EMPTY_LINE_LPOS) or when allocation failed (@blk_lpos
 * set to FAILED_LPOS); the @blk_lpos values distinguish the two cases.
 */
static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size,
			struct prb_data_blk_lpos *blk_lpos, unsigned long id)
{
	struct prb_data_ring *data_ring = &rb->text_data_ring;
	struct prb_data_block *blk;
	unsigned long begin_lpos;
	unsigned long next_lpos;

	if (size == 0) {
		/*
		 * Data blocks are not created for empty lines. Instead, the
		 * reader will recognize these special lpos values and handle
		 * it appropriately.
		 */
		blk_lpos->begin = EMPTY_LINE_LPOS;
		blk_lpos->next = EMPTY_LINE_LPOS;
		return NULL;
	}

	size = to_blk_size(size);

	begin_lpos = atomic_long_read(&data_ring->head_lpos);

	do {
		next_lpos = get_next_lpos(data_ring, begin_lpos, size);

		/*
		 * data_check_size() prevents data block allocation that could
		 * cause illegal ringbuffer states. But double check that the
		 * used space will not be bigger than the ring buffer. Wrapped
		 * messages need to reserve more space, see get_next_lpos().
		 *
		 * Specify a data-less block when the check or the allocation
		 * fails.
		 */
		if (WARN_ON_ONCE(next_lpos - begin_lpos > DATA_SIZE(data_ring)) ||
		    !data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) {
			blk_lpos->begin = FAILED_LPOS;
			blk_lpos->next = FAILED_LPOS;
			return NULL;
		}

		/*
		 * 1. Guarantee any descriptor states that have transitioned
		 *    to reusable are stored before modifying the newly
		 *    allocated data area. A full memory barrier is needed
		 *    since other CPUs may have made the descriptor states
		 *    reusable. See data_push_tail:A about why the reusable
		 *    states are visible. This pairs with desc_read:D.
		 *
		 * 2. Guarantee any updated tail lpos is stored before
		 *    modifying the newly allocated data area. Another CPU may
		 *    be in data_make_reusable() and is reading a block ID
		 *    from this area. data_make_reusable() can handle reading
		 *    a garbage block ID value, but then it must be able to
		 *    load a new tail lpos. A full memory barrier is needed
		 *    since other CPUs may have updated the tail lpos. This
		 *    pairs with data_push_tail:B.
		 */
	} while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
					  next_lpos)); /* LMM(data_alloc:A) */

	blk = to_block(data_ring, begin_lpos);
	blk->id = id; /* LMM(data_alloc:B) */

	if (is_blk_wrapped(data_ring, begin_lpos, next_lpos)) {
		/* Wrapping data blocks store their data at the beginning. */
		blk = to_block(data_ring, 0);

		/*
		 * Store the ID on the wrapped block for consistency.
		 * The printk_ringbuffer does not actually use it.
		 */
		blk->id = id;
	}

	blk_lpos->begin = begin_lpos;
	blk_lpos->next = next_lpos;

	return &blk->data[0];
}

/*
 * Try to resize an existing data block associated with the descriptor
 * specified by @id. If the resized data block should become wrapped, it
 * copies the old data to the new data block. 
If @size yields a data block
 * with the same or less size, the data block is left as is.
 *
 * Fail if this is not the last allocated data block or if there is not
 * enough space or it is not possible make enough space.
 *
 * Return a pointer to the beginning of the entire data buffer or NULL on
 * failure.
 */
static char *data_realloc(struct printk_ringbuffer *rb, unsigned int size,
			  struct prb_data_blk_lpos *blk_lpos, unsigned long id)
{
	struct prb_data_ring *data_ring = &rb->text_data_ring;
	struct prb_data_block *blk;
	unsigned long head_lpos;
	unsigned long next_lpos;
	bool wrapped;

	/* Reallocation only works if @blk_lpos is the newest data block. */
	head_lpos = atomic_long_read(&data_ring->head_lpos);
	if (head_lpos != blk_lpos->next)
		return NULL;

	/* Keep track if @blk_lpos was a wrapping data block. */
	wrapped = is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next);

	size = to_blk_size(size);

	next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);

	/*
	 * Use the current data block when the size does not increase, i.e.
	 * when @head_lpos is already able to accommodate the new @next_lpos.
	 *
	 * Note that need_more_space() could never return false here because
	 * the difference between the positions was bigger than the data
	 * buffer size. The data block is reopened and can't get reused.
	 */
	if (!need_more_space(data_ring, head_lpos, next_lpos)) {
		if (wrapped)
			blk = to_block(data_ring, 0);
		else
			blk = to_block(data_ring, blk_lpos->begin);
		return &blk->data[0];
	}

	/*
	 * data_check_size() prevents data block reallocation that could
	 * cause illegal ringbuffer states. But double check that the
	 * new used space will not be bigger than the ring buffer. Wrapped
	 * messages need to reserve more space, see get_next_lpos().
	 *
	 * Specify failure when the check or the allocation fails.
	 */
	if (WARN_ON_ONCE(next_lpos - blk_lpos->begin > DATA_SIZE(data_ring)) ||
	    !data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) {
		return NULL;
	}

	/* The memory barrier involvement is the same as data_alloc:A. */
	if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
				     next_lpos)) { /* LMM(data_realloc:A) */
		return NULL;
	}

	blk = to_block(data_ring, blk_lpos->begin);

	if (is_blk_wrapped(data_ring, blk_lpos->begin, next_lpos)) {
		struct prb_data_block *old_blk = blk;

		/* Wrapping data blocks store their data at the beginning. */
		blk = to_block(data_ring, 0);

		/*
		 * Store the ID on the wrapped block for consistency.
		 * The printk_ringbuffer does not actually use it.
		 */
		blk->id = id;

		if (!wrapped) {
			/*
			 * Since the allocated space is now in the newly
			 * created wrapping data block, copy the content
			 * from the old data block. Only the data portion
			 * is copied; the old block's leading ID word is
			 * excluded from the size.
			 */
			memcpy(&blk->data[0], &old_blk->data[0],
			       (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
		}
	}

	blk_lpos->next = next_lpos;

	return &blk->data[0];
}

/* Return the number of bytes used by a data block. */
static unsigned int space_used(struct prb_data_ring *data_ring,
			       struct prb_data_blk_lpos *blk_lpos)
{
	/* Data-less blocks take no space. */
	if (BLK_DATALESS(blk_lpos))
		return 0;

	if (!is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next)) {
		/* Data block does not wrap. */
		return (DATA_INDEX(data_ring, blk_lpos->next) -
			DATA_INDEX(data_ring, blk_lpos->begin));
	}

	/*
	 * For wrapping data blocks, the trailing (wasted) space is
	 * also counted.
	 */
	return (DATA_INDEX(data_ring, blk_lpos->next) +
		DATA_SIZE(data_ring) - DATA_INDEX(data_ring, blk_lpos->begin));
}

/*
 * Given @blk_lpos, return a pointer to the writer data from the data block
 * and calculate the size of the data part. A NULL pointer is returned if
 * @blk_lpos specifies values that could never be legal.
 *
 * This function (used by readers) performs strict validation on the lpos
 * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
 * triggered if an internal error is detected.
 */
static const char *get_data(struct prb_data_ring *data_ring,
			    struct prb_data_blk_lpos *blk_lpos,
			    unsigned int *data_size)
{
	struct prb_data_block *db;

	/* Data-less data block description. */
	if (BLK_DATALESS(blk_lpos)) {
		/*
		 * Records that are just empty lines are also valid, even
		 * though they do not have a data block. For such records
		 * explicitly return empty string data to signify success.
		 */
		if (blk_lpos->begin == EMPTY_LINE_LPOS &&
		    blk_lpos->next == EMPTY_LINE_LPOS) {
			*data_size = 0;
			return "";
		}

		/* Data lost, invalid, or otherwise unavailable. */
		return NULL;
	}

	/* Regular data block: @begin and @next in the same wrap. */
	if (!is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next)) {
		db = to_block(data_ring, blk_lpos->begin);
		*data_size = blk_lpos->next - blk_lpos->begin;

	/* Wrapping data block: @begin is one wrap behind @next. */
	} else if (!is_blk_wrapped(data_ring,
				   blk_lpos->begin + DATA_SIZE(data_ring),
				   blk_lpos->next)) {
		/* Wrapping data blocks store their data at the beginning. */
		db = to_block(data_ring, 0);
		*data_size = DATA_INDEX(data_ring, blk_lpos->next);

	/* Illegal block description. */
	} else {
		WARN_ON_ONCE(1);
		return NULL;
	}

	/* A valid data block will always be aligned to the ID size. */
	if (WARN_ON_ONCE(blk_lpos->begin != ALIGN(blk_lpos->begin, sizeof(db->id))) ||
	    WARN_ON_ONCE(blk_lpos->next != ALIGN(blk_lpos->next, sizeof(db->id)))) {
		return NULL;
	}

	/*
	 * A regular data block will always have an ID and at least
	 * 1 byte of data. Data-less blocks were handled earlier.
	 */
	if (WARN_ON_ONCE(*data_size <= sizeof(db->id)))
		return NULL;

	/* Subtract block ID space from size to reflect data size. */
	*data_size -= sizeof(db->id);

	/* Sanity check the max size of the regular data block. */
	if (WARN_ON_ONCE(!data_check_size(data_ring, *data_size)))
		return NULL;

	return &db->data[0];
}

/*
 * Attempt to transition the newest descriptor from committed back to reserved
 * so that the record can be modified by a writer again. This is only possible
 * if the descriptor is not yet finalized and the provided @caller_id matches.
 */
static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
					 u32 caller_id, unsigned long *id_out)
{
	unsigned long prev_state_val;
	enum desc_state d_state;
	struct prb_desc desc;
	struct prb_desc *d;
	unsigned long id;
	u32 cid;

	id = atomic_long_read(&desc_ring->head_id);

	/*
	 * To reduce unnecessarily reopening, first check if the descriptor
	 * state and caller ID are correct.
	 */
	d_state = desc_read(desc_ring, id, &desc, NULL, &cid);
	if (d_state != desc_committed || cid != caller_id)
		return NULL;

	d = to_desc(desc_ring, id);

	prev_state_val = DESC_SV(id, desc_committed);

	/*
	 * Guarantee the reserved state is stored before reading any
	 * record data. A full memory barrier is needed because @state_var
	 * modification is followed by reading. This pairs with _prb_commit:B.
	 *
	 * Memory barrier involvement:
	 *
	 * If desc_reopen_last:A reads from _prb_commit:B, then
	 * prb_reserve_in_last:A reads from _prb_commit:A.
	 *
	 * Relies on:
	 *
	 * WMB from _prb_commit:A to _prb_commit:B
	 *    matching
	 * MB from desc_reopen_last:A to prb_reserve_in_last:A
	 */
	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
				     DESC_SV(id, desc_reserved))) { /* LMM(desc_reopen_last:A) */
		return NULL;
	}

	*id_out = id;
	return d;
}

/**
 * prb_reserve_in_last() - Re-reserve and extend the space in the ringbuffer
 *                         used by the newest record.
 *
 * @e:         The entry structure to setup.
 * @rb:        The ringbuffer to re-reserve and extend data in.
 * @r:         The record structure to allocate buffers for.
 * @caller_id: The caller ID of the caller (reserving writer).
 * @max_size:  Fail if the extended size would be greater than this.
 *
 * This is the public function available to writers to re-reserve and extend
 * data.
 *
 * The writer specifies the text size to extend (not the new total size) by
 * setting the @text_buf_size field of @r. To ensure proper initialization
 * of @r, prb_rec_init_wr() should be used.
 *
 * This function will fail if @caller_id does not match the caller ID of the
 * newest record. In that case the caller must reserve new data using
 * prb_reserve().
 *
 * Context: Any context. Disables local interrupts on success.
 * Return: true if text data could be extended, otherwise false.
 *
 * On success:
 *
 *   - @r->text_buf points to the beginning of the entire text buffer.
 *
 *   - @r->text_buf_size is set to the new total size of the buffer.
 *
 *   - @r->info is not touched so that @r->info->text_len could be used
 *     to append the text.
 *
 *   - prb_record_text_space() can be used on @e to query the new
 *     actually used space.
 *
 * Important: All @r->info fields will already be set with the current values
 *            for the record. I.e. @r->info->text_len will be less than
 *            @text_buf_size. Writers can use @r->info->text_len to know
 *            where concatenation begins and writers should update
 *            @r->info->text_len after concatenating.
 */
bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
			 struct printk_record *r, u32 caller_id, unsigned int max_size)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	struct printk_info *info;
	unsigned int data_size;
	struct prb_desc *d;
	unsigned long id;

	local_irq_save(e->irqflags);

	/* Transition the newest descriptor back to the reserved state. */
	d = desc_reopen_last(desc_ring, caller_id, &id);
	if (!d) {
		local_irq_restore(e->irqflags);
		goto fail_reopen;
	}

	/* Now the writer has exclusive access: LMM(prb_reserve_in_last:A) */

	info = to_info(desc_ring, id);

	/*
	 * Set the @e fields here so that prb_commit() can be used if
	 * anything fails from now on.
	 */
	e->rb = rb;
	e->id = id;

	/*
	 * desc_reopen_last() checked the caller_id, but there was no
	 * exclusive access at that point. The descriptor may have
	 * changed since then.
	 */
	if (caller_id != info->caller_id)
		goto fail;

	if (BLK_DATALESS(&d->text_blk_lpos)) {
		if (WARN_ON_ONCE(info->text_len != 0)) {
			pr_warn_once("wrong text_len value (%hu, expecting 0)\n",
				     info->text_len);
			info->text_len = 0;
		}

		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
			goto fail;

		if (r->text_buf_size > max_size)
			goto fail;

		r->text_buf = data_alloc(rb, r->text_buf_size,
					 &d->text_blk_lpos, id);
	} else {
		if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size))
			goto fail;

		/*
		 * Increase the buffer size to include the original size. If
		 * the meta data (@text_len) is not sane, use the full data
		 * block size.
		 */
		if (WARN_ON_ONCE(info->text_len > data_size)) {
			pr_warn_once("wrong text_len value (%hu, expecting <=%u)\n",
				     info->text_len, data_size);
			info->text_len = data_size;
		}
		r->text_buf_size += info->text_len;

		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
			goto fail;

		if (r->text_buf_size > max_size)
			goto fail;

		r->text_buf = data_realloc(rb, r->text_buf_size,
					   &d->text_blk_lpos, id);
	}
	/* A data-less record (size 0) legitimately has no buffer. */
	if (r->text_buf_size && !r->text_buf)
		goto fail;

	r->info = info;

	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);

	return true;
fail:
	prb_commit(e);
	/* prb_commit() re-enabled interrupts. */
fail_reopen:
	/* Make it clear to the caller that the re-reserve failed. */
	memset(r, 0, sizeof(*r));
	return false;
}

/*
 * @last_finalized_seq value guarantees that all records up to and including
 * this sequence number are finalized and can be read. The only exception are
 * too old records which have already been overwritten.
 *
 * It is also guaranteed that @last_finalized_seq only increases.
 *
 * Be aware that finalized records following non-finalized records are not
 * reported because they are not yet available to the reader. For example,
 * a new record stored via printk() will not be available to a printer if
 * it follows a record that has not been finalized yet. However, once that
 * non-finalized record becomes finalized, @last_finalized_seq will be
 * appropriately updated and the full set of finalized records will be
 * available to the printer. And since each printk() caller will either
 * directly print or trigger deferred printing of all available unprinted
 * records, all printk() messages will get printed.
 */
static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	unsigned long ulseq;

	/*
	 * Guarantee the sequence number is loaded before loading the
	 * associated record in order to guarantee that the record can be
	 * seen by this CPU. This pairs with desc_update_last_finalized:A.
	 */
	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
					); /* LMM(desc_last_finalized_seq:A) */

	return __ulseq_to_u64seq(rb, ulseq);
}

static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
			    struct printk_record *r, unsigned int *line_count);

/*
 * Check if there are records directly following @last_finalized_seq that are
 * finalized. If so, update @last_finalized_seq to the latest of these
 * records. It is not allowed to skip over records that are not yet finalized.
 */
static void desc_update_last_finalized(struct printk_ringbuffer *rb)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	u64 old_seq = desc_last_finalized_seq(rb);
	unsigned long oldval;
	unsigned long newval;
	u64 finalized_seq;
	u64 try_seq;

try_again:
	finalized_seq = old_seq;
	try_seq = finalized_seq + 1;

	/* Try to find later finalized records. */
	while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
		finalized_seq = try_seq;
		try_seq++;
	}

	/* No update needed if no later finalized record was found. */
	if (finalized_seq == old_seq)
		return;

	oldval = __u64seq_to_ulseq(old_seq);
	newval = __u64seq_to_ulseq(finalized_seq);

	/*
	 * Set the sequence number of a later finalized record that has been
	 * seen.
	 *
	 * Guarantee the record data is visible to other CPUs before storing
	 * its sequence number. This pairs with desc_last_finalized_seq:A.
	 *
	 * Memory barrier involvement:
	 *
	 * If desc_last_finalized_seq:A reads from
	 * desc_update_last_finalized:A, then desc_read:A reads from
	 * _prb_commit:B.
	 *
	 * Relies on:
	 *
	 * RELEASE from _prb_commit:B to desc_update_last_finalized:A
	 *    matching
	 * ACQUIRE from desc_last_finalized_seq:A to desc_read:A
	 *
	 * Note: _prb_commit:B and desc_update_last_finalized:A can be
	 *       different CPUs. However, the desc_update_last_finalized:A
	 *       CPU (which performs the release) must have previously seen
	 *       _prb_commit:B.
	 */
	if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
				&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
		/*
		 * Another CPU updated @last_finalized_seq. Restart from the
		 * value it stored (now in @oldval) so that the sequence
		 * number only ever moves forward.
		 */
		old_seq = __ulseq_to_u64seq(rb, oldval);
		goto try_again;
	}
}

/*
 * Attempt to finalize a specified descriptor. If this fails, the descriptor
 * is either already final or it will finalize itself when the writer commits.
 */
static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	unsigned long prev_state_val = DESC_SV(id, desc_committed);
	struct prb_desc *d = to_desc(desc_ring, id);

	/*
	 * Only the CPU that wins the committed -> finalized transition
	 * updates @last_finalized_seq.
	 */
	if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
			DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
		desc_update_last_finalized(rb);
	}
}

/**
 * prb_reserve() - Reserve space in the ringbuffer.
 *
 * @e:  The entry structure to setup.
 * @rb: The ringbuffer to reserve data in.
 * @r:  The record structure to allocate buffers for.
 *
 * This is the public function available to writers to reserve data.
 *
 * The writer specifies the text size to reserve by setting the
 * @text_buf_size field of @r. To ensure proper initialization of @r,
 * prb_rec_init_wr() should be used.
 *
 * Context: Any context. Disables local interrupts on success.
 * Return: true if at least text data could be allocated, otherwise false.
 *
 * On success, the fields @info and @text_buf of @r will be set by this
 * function and should be filled in by the writer before committing. Also
 * on success, prb_record_text_space() can be used on @e to query the actual
 * space used for the text data block.
 *
 * Important: @info->text_len needs to be set correctly by the writer in
 *            order for data to be readable and/or extended. Its value
 *            is initialized to 0.
 */
bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
		 struct printk_record *r)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	struct printk_info *info;
	struct prb_desc *d;
	unsigned long id;
	u64 seq;

	if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
		goto fail;

	/*
	 * Descriptors in the reserved state act as blockers to all further
	 * reservations once the desc_ring has fully wrapped. Disable
	 * interrupts during the reserve/commit window in order to minimize
	 * the likelihood of this happening.
	 */
	local_irq_save(e->irqflags);

	if (!desc_reserve(rb, &id)) {
		/* Descriptor reservation failures are tracked. */
		atomic_long_inc(&rb->fail);
		local_irq_restore(e->irqflags);
		goto fail;
	}

	d = to_desc(desc_ring, id);
	info = to_info(desc_ring, id);

	/*
	 * All @info fields (except @seq) are cleared and must be filled in
	 * by the writer. Save @seq before clearing because it is used to
	 * determine the new sequence number.
	 */
	seq = info->seq;
	memset(info, 0, sizeof(*info));

	/*
	 * Set the @e fields here so that prb_commit() can be used if
	 * text data allocation fails.
	 */
	e->rb = rb;
	e->id = id;

	/*
	 * Initialize the sequence number if it has "never been set".
	 * Otherwise just increment it by a full wrap.
	 *
	 * @seq is considered "never been set" if it has a value of 0,
	 * _except_ for @infos[0], which was specially setup by the ringbuffer
	 * initializer and therefore is always considered as set.
	 *
	 * See the "Bootstrap" comment block in printk_ringbuffer.h for
	 * details about how the initializer bootstraps the descriptors.
	 */
	if (seq == 0 && DESC_INDEX(desc_ring, id) != 0)
		info->seq = DESC_INDEX(desc_ring, id);
	else
		info->seq = seq + DESCS_COUNT(desc_ring);

	/*
	 * New data is about to be reserved. Once that happens, previous
	 * descriptors are no longer able to be extended. Finalize the
	 * previous descriptor now so that it can be made available to
	 * readers. (For seq==0 there is no previous descriptor.)
	 */
	if (info->seq > 0)
		desc_make_final(rb, DESC_ID(id - 1));

	r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
	/* If text data allocation fails, a data-less record is committed. */
	if (r->text_buf_size && !r->text_buf) {
		prb_commit(e);
		/* prb_commit() re-enabled interrupts. */
		goto fail;
	}

	r->info = info;

	/* Record full text space used by record. */
	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);

	return true;
fail:
	/* Make it clear to the caller that the reserve failed. */
	memset(r, 0, sizeof(*r));
	return false;
}
EXPORT_SYMBOL_IF_KUNIT(prb_reserve);

/* Commit the data (possibly finalizing it) and restore interrupts. */
static void _prb_commit(struct prb_reserved_entry *e, unsigned long state_val)
{
	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
	struct prb_desc *d = to_desc(desc_ring, e->id);
	unsigned long prev_state_val = DESC_SV(e->id, desc_reserved);

	/* Now the writer has finished all writing: LMM(_prb_commit:A) */

	/*
	 * Set the descriptor as committed. See "ABA Issues" about why
	 * cmpxchg() instead of set() is used.
	 *
	 * 1. Guarantee all record data is stored before the descriptor state
	 *    is stored as committed. A write memory barrier is sufficient
	 *    for this. This pairs with desc_read:B and desc_reopen_last:A.
	 *
	 * 2. 
Guarantee the descriptor state is stored as committed before
	 *    re-checking the head ID in order to possibly finalize this
	 *    descriptor. This pairs with desc_reserve:D.
	 *
	 *    Memory barrier involvement:
	 *
	 *    If prb_commit:A reads from desc_reserve:D, then
	 *    desc_make_final:A reads from _prb_commit:B.
	 *
	 *    Relies on:
	 *
	 *    MB from _prb_commit:B to prb_commit:A
	 *       matching
	 *    MB from desc_reserve:D to desc_make_final:A
	 */
	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
				     DESC_SV(e->id, state_val))) { /* LMM(_prb_commit:B) */
		WARN_ON_ONCE(1);
	}

	/* Restore interrupts, the reserve/commit window is finished. */
	local_irq_restore(e->irqflags);
}

/**
 * prb_commit() - Commit (previously reserved) data to the ringbuffer.
 *
 * @e: The entry containing the reserved data information.
 *
 * This is the public function available to writers to commit data.
 *
 * Note that the data is not yet available to readers until it is finalized.
 * Finalizing happens automatically when space for the next record is
 * reserved.
 *
 * See prb_final_commit() for a version of this function that finalizes
 * immediately.
 *
 * Context: Any context. Enables local interrupts.
 */
void prb_commit(struct prb_reserved_entry *e)
{
	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
	unsigned long head_id;

	_prb_commit(e, desc_committed);

	/*
	 * If this descriptor is no longer the head (i.e. a new record has
	 * been allocated), extending the data for this record is no longer
	 * allowed and therefore it must be finalized.
	 */
	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
	if (head_id != e->id)
		desc_make_final(e->rb, e->id);
}
EXPORT_SYMBOL_IF_KUNIT(prb_commit);

/**
 * prb_final_commit() - Commit and finalize (previously reserved) data to
 *                      the ringbuffer.
 *
 * @e: The entry containing the reserved data information.
 *
 * This is the public function available to writers to commit+finalize data.
 *
 * By finalizing, the data is made immediately available to readers.
 *
 * This function should only be used if there are no intentions of extending
 * this data using prb_reserve_in_last().
 *
 * Context: Any context. Enables local interrupts.
 */
void prb_final_commit(struct prb_reserved_entry *e)
{
	_prb_commit(e, desc_finalized);

	desc_update_last_finalized(e->rb);
}

/*
 * Count the number of lines in provided text. All text has at least 1 line
 * (even if @text_size is 0). Each '\n' processed is counted as an additional
 * line.
 */
static unsigned int count_lines(const char *text, unsigned int text_size)
{
	unsigned int next_size = text_size;
	unsigned int line_count = 1;
	const char *next = text;

	while (next_size) {
		next = memchr(next, '\n', next_size);
		if (!next)
			break;
		line_count++;
		next++;
		/* Bytes remaining after the newline just found. */
		next_size = text_size - (next - text);
	}

	return line_count;
}

/*
 * Given @blk_lpos, copy an expected @len of data into the provided buffer.
 * If @line_count is provided, count the number of lines in the data.
 *
 * This function (used by readers) performs strict validation on the data
 * size to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
 * triggered if an internal error is detected.
 */
static bool copy_data(struct prb_data_ring *data_ring,
		      struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
		      unsigned int buf_size, unsigned int *line_count)
{
	unsigned int data_size;
	const char *data;

	/* Caller might not want any data. */
	if ((!buf || !buf_size) && !line_count)
		return true;

	data = get_data(data_ring, blk_lpos, &data_size);
	if (!data)
		return false;

	/*
	 * Actual cannot be less than expected. It can be more than expected
	 * because of the trailing alignment padding.
	 *
	 * Note that invalid @len values can occur because the caller loads
	 * the value during an allowed data race.
	 */
	if (data_size < (unsigned int)len)
		return false;

	/* Caller interested in the line count? */
	if (line_count)
		*line_count = count_lines(data, len);

	/* Caller interested in the data content? */
	if (!buf || !buf_size)
		return true;

	/* Truncate to the caller's buffer size, if necessary. */
	data_size = min_t(unsigned int, buf_size, len);

	memcpy(&buf[0], data, data_size); /* LMM(copy_data:A) */
	return true;
}

/*
 * This is an extended version of desc_read(). It gets a copy of a specified
 * descriptor. However, it also verifies that the record is finalized and has
 * the sequence number @seq. On success, 0 is returned.
 *
 * Error return values:
 * -EINVAL: A finalized record with sequence number @seq does not exist.
 * -ENOENT: A finalized record with sequence number @seq exists, but its data
 *          is not available. This is a valid record, so readers should
 *          continue with the next record.
 */
static int desc_read_finalized_seq(struct prb_desc_ring *desc_ring,
				   unsigned long id, u64 seq,
				   struct prb_desc *desc_out)
{
	struct prb_data_blk_lpos *blk_lpos = &desc_out->text_blk_lpos;
	enum desc_state d_state;
	u64 s;

	d_state = desc_read(desc_ring, id, desc_out, &s, NULL);

	/*
	 * An unexpected @id (desc_miss) or @seq mismatch means the record
	 * does not exist. A descriptor in the reserved or committed state
	 * means the record does not yet exist for the reader.
	 */
	if (d_state == desc_miss ||
	    d_state == desc_reserved ||
	    d_state == desc_committed ||
	    s != seq) {
		return -EINVAL;
	}

	/*
	 * A descriptor in the reusable state may no longer have its data
	 * available; report it as existing but with lost data. Or the record
	 * may actually be a record with lost data.
	 */
	if (d_state == desc_reusable ||
	    (blk_lpos->begin == FAILED_LPOS && blk_lpos->next == FAILED_LPOS)) {
		return -ENOENT;
	}

	return 0;
}

/*
 * Copy the ringbuffer data from the record with @seq to the provided
 * @r buffer. On success, 0 is returned.
 *
 * See desc_read_finalized_seq() for error return values.
 */
static int prb_read(struct printk_ringbuffer *rb, u64 seq,
		    struct printk_record *r, unsigned int *line_count)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	struct printk_info *info = to_info(desc_ring, seq);
	struct prb_desc *rdesc = to_desc(desc_ring, seq);
	atomic_long_t *state_var = &rdesc->state_var;
	struct prb_desc desc;
	unsigned long id;
	int err;

	/* Extract the ID, used to specify the descriptor to read. */
	id = DESC_ID(atomic_long_read(state_var));

	/* Get a local copy of the correct descriptor (if available). */
	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);

	/*
	 * If @r is NULL, the caller is only interested in the availability
	 * of the record.
	 */
	if (err || !r)
		return err;

	/* If requested, copy meta data. */
	if (r->info)
		memcpy(r->info, info, sizeof(*(r->info)));

	/* Copy text data. If it fails, this is a data-less record. */
	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
		       r->text_buf, r->text_buf_size, line_count)) {
		return -ENOENT;
	}

	/*
	 * Ensure the record is still finalized and has the same @seq.
	 * Otherwise the copies made above cannot be trusted (the
	 * descriptor may have been recycled while copying).
	 */
	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
}

/* Get the sequence number of the tail descriptor. */
u64 prb_first_seq(struct printk_ringbuffer *rb)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	enum desc_state d_state;
	struct prb_desc desc;
	unsigned long id;
	u64 seq;

	for (;;) {
		id = atomic_long_read(&rb->desc_ring.tail_id); /* LMM(prb_first_seq:A) */

		d_state = desc_read(desc_ring, id, &desc, &seq, NULL); /* LMM(prb_first_seq:B) */

		/*
		 * This loop will not be infinite because the tail is
		 * _always_ in the finalized or reusable state.
		 */
		if (d_state == desc_finalized || d_state == desc_reusable)
			break;

		/*
		 * Guarantee the last state load from desc_read() is before
		 * reloading @tail_id in order to see a new tail in the case
		 * that the descriptor has been recycled. This pairs with
		 * desc_reserve:D.
		 *
		 * Memory barrier involvement:
		 *
		 * If prb_first_seq:B reads from desc_reserve:F, then
		 * prb_first_seq:A reads from desc_push_tail:B.
		 *
		 * Relies on:
		 *
		 * MB from desc_push_tail:B to desc_reserve:F
		 *    matching
		 * RMB from prb_first_seq:B to prb_first_seq:A
		 */
		smp_rmb(); /* LMM(prb_first_seq:C) */
	}

	return seq;
}

/**
 * prb_next_reserve_seq() - Get the sequence number after the most recently
 *                          reserved record.
 *
 * @rb: The ringbuffer to get the sequence number from.
 *
 * This is the public function available to readers to see what sequence
 * number will be assigned to the next reserved record.
 *
 * Note that depending on the situation, this value can be equal to or
 * higher than the sequence number returned by prb_next_seq().
 *
 * Context: Any context.
 * Return: The sequence number that will be assigned to the next record
 *         reserved.
 */
u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	unsigned long last_finalized_id;
	atomic_long_t *state_var;
	u64 last_finalized_seq;
	unsigned long head_id;
	struct prb_desc desc;
	unsigned long diff;
	struct prb_desc *d;
	int err;

	/*
	 * It may not be possible to read a sequence number for @head_id.
	 * So the ID of @last_finalized_seq is used to calculate what the
	 * sequence number of @head_id will be.
	 */

try_again:
	last_finalized_seq = desc_last_finalized_seq(rb);

	/*
	 * @head_id is loaded after @last_finalized_seq to ensure that
	 * it points to the record with @last_finalized_seq or newer.
	 *
	 * Memory barrier involvement:
	 *
	 * If desc_last_finalized_seq:A reads from
	 * desc_update_last_finalized:A, then
	 * prb_next_reserve_seq:A reads from desc_reserve:D.
	 *
	 * Relies on:
	 *
	 * RELEASE from desc_reserve:D to desc_update_last_finalized:A
	 *    matching
	 * ACQUIRE from desc_last_finalized_seq:A to prb_next_reserve_seq:A
	 *
	 * Note: desc_reserve:D and desc_update_last_finalized:A can be
	 * different CPUs. However, the desc_update_last_finalized:A CPU
	 * (which performs the release) must have previously seen
	 * desc_read:C, which implies desc_reserve:D can be seen.
	 */
	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_next_reserve_seq:A) */

	d = to_desc(desc_ring, last_finalized_seq);
	state_var = &d->state_var;

	/* Extract the ID, used to specify the descriptor to read. */
	last_finalized_id = DESC_ID(atomic_long_read(state_var));

	/* Ensure @last_finalized_id is correct. */
	err = desc_read_finalized_seq(desc_ring, last_finalized_id, last_finalized_seq, &desc);

	if (err == -EINVAL) {
		if (last_finalized_seq == 0) {
			/*
			 * No record has been finalized or even reserved yet.
			 *
			 * The @head_id is initialized such that the first
			 * increment will yield the first record (seq=0).
			 * Handle it separately to avoid a negative @diff
			 * below.
			 */
			if (head_id == DESC0_ID(desc_ring->count_bits))
				return 0;

			/*
			 * One or more descriptors are already reserved. Use
			 * the descriptor ID of the first one (@seq=0) for
			 * the @diff below.
			 */
			last_finalized_id = DESC0_ID(desc_ring->count_bits) + 1;
		} else {
			/* Record must have been overwritten. Try again. */
			goto try_again;
		}
	}

	/*
	 * Diff of known descriptor IDs to compute related sequence numbers.
	 * (Unsigned subtraction yields the correct distance even if the ID
	 * counter has wrapped.)
	 */
	diff = head_id - last_finalized_id;

	/*
	 * @head_id points to the most recently reserved record, but this
	 * function returns the sequence number that will be assigned to the
	 * next (not yet reserved) record. Thus +1 is needed.
	 */
	return (last_finalized_seq + diff + 1);
}

/*
 * Non-blocking read of a record.
 *
 * On success @seq is updated to the record that was read and (if provided)
 * @r and @line_count will contain the read/calculated data.
 *
 * On failure @seq is updated to a record that is not yet available to the
 * reader, but it will be the next record available to the reader.
 *
 * Note: When the current CPU is in panic, this function will skip over any
 *       non-existent/non-finalized records in order to allow the panic CPU
 *       to print any and all records that have been finalized (unless
 *       debugging of non-panic CPUs is explicitly enabled).
 */
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
			    struct printk_record *r, unsigned int *line_count)
{
	u64 tail_seq;
	int err;

	while ((err = prb_read(rb, *seq, r, line_count))) {
		tail_seq = prb_first_seq(rb);

		if (*seq < tail_seq) {
			/*
			 * Behind the tail. Catch up and try again. This
			 * can happen for -ENOENT and -EINVAL cases.
			 */
			*seq = tail_seq;

		} else if (err == -ENOENT) {
			/* Record exists, but the data was lost. Skip. */
			(*seq)++;

		} else {
			/*
			 * Non-existent/non-finalized record. Must stop.
			 *
			 * For panic situations it cannot be expected that
			 * non-finalized records will become finalized. But
			 * there may be other finalized records beyond that
			 * need to be printed for a panic situation. If this
			 * is the panic CPU, skip this
			 * non-existent/non-finalized record unless non-panic
			 * CPUs are still running and their debugging is
			 * explicitly enabled.
			 *
			 * Note that new messages printed on panic CPU are
			 * finalized when we are here. The only exception
			 * might be the last message without trailing newline.
			 * But it would have the sequence number returned
			 * by "prb_next_reserve_seq() - 1".
			 */
			if (panic_on_this_cpu() &&
			    (!debug_non_panic_cpus || legacy_allow_panic_sync) &&
			    ((*seq + 1) < prb_next_reserve_seq(rb))) {
				/* More records were reserved; skip this one. */
				(*seq)++;
			} else {
				return false;
			}
		}
	}

	return true;
}

/**
 * prb_read_valid() - Non-blocking read of a requested record or (if gone)
 *                    the next available record.
 *
 * @rb: The ringbuffer to read from.
 * @seq: The sequence number of the record to read.
 * @r: A record data buffer to store the read record to.
 *
 * This is the public function available to readers to read a record.
 *
 * The reader provides the @info and @text_buf buffers of @r to be
 * filled in. Any of the buffer pointers can be set to NULL if the reader
 * is not interested in that data. To ensure proper initialization of @r,
 * prb_rec_init_rd() should be used.
 *
 * Context: Any context.
 * Return: true if a record was read, otherwise false.
 *
 * On success, the reader must check r->info.seq to see which record was
 * actually read. This allows the reader to detect dropped records.
 *
 * Failure means @seq refers to a record not yet available to the reader.
 */
bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
		    struct printk_record *r)
{
	return _prb_read_valid(rb, &seq, r, NULL);
}
EXPORT_SYMBOL_IF_KUNIT(prb_read_valid);

/**
 * prb_read_valid_info() - Non-blocking read of meta data for a requested
 *                         record or (if gone) the next available record.
 *
 * @rb: The ringbuffer to read from.
 * @seq: The sequence number of the record to read.
 * @info: A buffer to store the read record meta data to.
 * @line_count: A buffer to store the number of lines in the record text.
 *
 * This is the public function available to readers to read only the
 * meta data of a record.
 *
 * The reader provides the @info, @line_count buffers to be filled in.
 * Either of the buffer pointers can be set to NULL if the reader is not
 * interested in that data.
 *
 * Context: Any context.
 * Return: true if a record's meta data was read, otherwise false.
 *
 * On success, the reader must check info->seq to see which record meta data
 * was actually read. This allows the reader to detect dropped records.
 *
 * Failure means @seq refers to a record not yet available to the reader.
 */
bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
			 struct printk_info *info, unsigned int *line_count)
{
	struct printk_record r;

	/* Set up a record that stores no text data. */
	prb_rec_init_rd(&r, info, NULL, 0);

	return _prb_read_valid(rb, &seq, &r, line_count);
}

/**
 * prb_first_valid_seq() - Get the sequence number of the oldest available
 *                         record.
 *
 * @rb: The ringbuffer to get the sequence number from.
 *
 * This is the public function available to readers to see what the
 * first/oldest valid sequence number is.
 *
 * This provides readers a starting point to begin iterating the ringbuffer.
 *
 * Context: Any context.
 * Return: The sequence number of the first/oldest record or, if the
 *         ringbuffer is empty, 0 is returned.
 */
u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
{
	u64 seq = 0;

	if (!_prb_read_valid(rb, &seq, NULL, NULL))
		return 0;

	return seq;
}

/**
 * prb_next_seq() - Get the sequence number after the last available record.
 *
 * @rb: The ringbuffer to get the sequence number from.
 *
 * This is the public function available to readers to see what the next
 * newest sequence number available to readers will be.
 *
 * This provides readers a sequence number to jump to if all currently
 * available records should be skipped. It is guaranteed that all records
 * previous to the returned value have been finalized and are (or were)
 * available to the reader.
 *
 * Context: Any context.
 * Return: The sequence number of the next newest (not yet available) record
 *         for readers.
 */
u64 prb_next_seq(struct printk_ringbuffer *rb)
{
	u64 seq;

	seq = desc_last_finalized_seq(rb);

	/*
	 * Begin searching after the last finalized record.
	 *
	 * On 0, the search must begin at 0 because, due to hack#2 of the
	 * bootstrapping phase, it is not known if a record at index 0
	 * exists.
	 */
	if (seq != 0)
		seq++;

	/*
	 * The information about the last finalized @seq might be inaccurate.
	 * Search forward to find the current one.
	 */
	while (_prb_read_valid(rb, &seq, NULL, NULL))
		seq++;

	return seq;
}

/**
 * prb_init() - Initialize a ringbuffer to use provided external buffers.
 *
 * @rb: The ringbuffer to initialize.
 * @text_buf: The data buffer for text data.
 * @textbits: The size of @text_buf as a power-of-2 value.
 * @descs: The descriptor buffer for ringbuffer records.
 * @descbits: The count of @descs items as a power-of-2 value.
 * @infos: The printk_info buffer for ringbuffer records.
 *
 * This is the public function available to writers to setup a ringbuffer
 * during runtime using provided buffers.
 *
 * This must match the initialization of DEFINE_PRINTKRB().
 *
 * Context: Any context.
2367 */ 2368 void prb_init(struct printk_ringbuffer *rb, 2369 char *text_buf, unsigned int textbits, 2370 struct prb_desc *descs, unsigned int descbits, 2371 struct printk_info *infos) 2372 { 2373 memset(descs, 0, _DESCS_COUNT(descbits) * sizeof(descs[0])); 2374 memset(infos, 0, _DESCS_COUNT(descbits) * sizeof(infos[0])); 2375 2376 rb->desc_ring.count_bits = descbits; 2377 rb->desc_ring.descs = descs; 2378 rb->desc_ring.infos = infos; 2379 atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits)); 2380 atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits)); 2381 atomic_long_set(&rb->desc_ring.last_finalized_seq, 0); 2382 2383 rb->text_data_ring.size_bits = textbits; 2384 rb->text_data_ring.data = text_buf; 2385 atomic_long_set(&rb->text_data_ring.head_lpos, BLK0_LPOS(textbits)); 2386 atomic_long_set(&rb->text_data_ring.tail_lpos, BLK0_LPOS(textbits)); 2387 2388 atomic_long_set(&rb->fail, 0); 2389 2390 atomic_long_set(&(descs[_DESCS_COUNT(descbits) - 1].state_var), DESC0_SV(descbits)); 2391 descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.begin = FAILED_LPOS; 2392 descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.next = FAILED_LPOS; 2393 2394 infos[0].seq = -(u64)_DESCS_COUNT(descbits); 2395 infos[_DESCS_COUNT(descbits) - 1].seq = 0; 2396 } 2397 EXPORT_SYMBOL_IF_KUNIT(prb_init); 2398 2399 /** 2400 * prb_record_text_space() - Query the full actual used ringbuffer space for 2401 * the text data of a reserved entry. 2402 * 2403 * @e: The successfully reserved entry to query. 2404 * 2405 * This is the public function available to writers to see how much actual 2406 * space is used in the ringbuffer to store the text data of the specified 2407 * entry. 2408 * 2409 * This function is only valid if @e has been successfully reserved using 2410 * prb_reserve(). 2411 * 2412 * Context: Any context. 2413 * Return: The size in bytes used by the text data of the associated record. 
2414 */ 2415 unsigned int prb_record_text_space(struct prb_reserved_entry *e) 2416 { 2417 return e->text_space; 2418 } 2419