// SPDX-License-Identifier: GPL-2.0

#include <kunit/visibility.h>
#include <linux/kernel.h>
#include <linux/irqflags.h>
#include <linux/string.h>
#include <linux/errno.h>
#include <linux/bug.h>
#include "printk_ringbuffer.h"
#include "internal.h"

/**
 * DOC: printk_ringbuffer overview
 *
 * Data Structure
 * --------------
 * The printk_ringbuffer is made up of two internal ringbuffers:
 *
 *   desc_ring
 *     A ring of descriptors and their meta data (such as sequence number,
 *     timestamp, loglevel, etc.) as well as internal state information about
 *     the record and logical positions specifying where in the other
 *     ringbuffer the text strings are located.
 *
 *   text_data_ring
 *     A ring of data blocks. A data block consists of an unsigned long
 *     integer (ID) that maps to a desc_ring index followed by the text
 *     string of the record.
 *
 * The internal state information of a descriptor is the key element to allow
 * readers and writers to locklessly synchronize access to the data.
 *
 * Implementation
 * --------------
 *
 * Descriptor Ring
 * ~~~~~~~~~~~~~~~
 * The descriptor ring is an array of descriptors. A descriptor contains
 * essential meta data to track the data of a printk record using a
 * blk_lpos struct pointing to the associated text data block (see
 * "Data Ring" below). Each descriptor is assigned an ID that maps
 * directly to index values of the descriptor array and has a state. The ID
 * and the state are bitwise combined into a single descriptor field named
 * @state_var, allowing ID and state to be synchronously and atomically
 * updated.
 *
 * Descriptors have four states:
 *
 *   reserved
 *     A writer is modifying the record.
 *
 *   committed
 *     The record and all its data are written. A writer can reopen the
 *     descriptor (transitioning it back to reserved), but in the committed
 *     state the data is consistent.
 *
 *   finalized
 *     The record and all its data are complete and available for reading. A
 *     writer cannot reopen the descriptor.
 *
 *   reusable
 *     The record exists, but its text and/or meta data may no longer be
 *     available.
 *
 * Querying the @state_var of a record requires providing the ID of the
 * descriptor to query. This can yield a possible fifth (pseudo) state:
 *
 *   miss
 *     The descriptor being queried has an unexpected ID.
 *
 * The descriptor ring has a @tail_id that contains the ID of the oldest
 * descriptor and @head_id that contains the ID of the newest descriptor.
 *
 * When a new descriptor should be created (and the ring is full), the tail
 * descriptor is invalidated by first transitioning to the reusable state and
 * then invalidating all tail data blocks up to and including the data blocks
 * associated with the tail descriptor (for the text ring). Then
 * @tail_id is advanced, followed by advancing @head_id. And finally the
 * @state_var of the new descriptor is initialized to the new ID and reserved
 * state.
 *
 * The @tail_id can only be advanced if the new @tail_id would be in the
 * committed or reusable queried state. This ensures that a valid sequence
 * number for the tail is always available.
 *
 * Descriptor Finalization
 * ~~~~~~~~~~~~~~~~~~~~~~~
 * When a writer calls the commit function prb_commit(), record data is
 * fully stored and is consistent within the ringbuffer.
 * However, a writer can reopen that record, claiming exclusive access (as
 * with prb_reserve()), and modify that record. When finished, the writer
 * must again commit the record.
 *
 * In order for a record to be made available to readers (and also become
 * recyclable for writers), it must be finalized. A finalized record cannot be
 * reopened and can never become "unfinalized". Record finalization can occur
 * in three different scenarios:
 *
 *   1) A writer can simultaneously commit and finalize its record by calling
 *      prb_final_commit() instead of prb_commit().
 *
 *   2) When a new record is reserved and the previous record has been
 *      committed via prb_commit(), that previous record is automatically
 *      finalized.
 *
 *   3) When a record is committed via prb_commit() and a newer record
 *      already exists, the record being committed is automatically finalized.
 *
 * Data Ring
 * ~~~~~~~~~
 * The text data ring is a byte array composed of data blocks. Data blocks are
 * referenced by blk_lpos structs that point to the logical position of the
 * beginning of a data block and the beginning of the next adjacent data
 * block. Logical positions are mapped directly to index values of the byte
 * array ringbuffer.
 *
 * Each data block consists of an ID followed by the writer data. The ID is
 * the identifier of a descriptor that is associated with the data block. A
 * given data block is considered valid if all of the following conditions
 * are met:
 *
 *   1) The descriptor associated with the data block is in the committed
 *      or finalized queried state.
 *
 *   2) The blk_lpos struct within the descriptor associated with the data
 *      block references back to the same data block.
 *
 *   3) The data block is within the head/tail logical position range.
 *
 * If the writer data of a data block would extend beyond the end of the
 * byte array, only the ID of the data block is stored at the logical
 * position and the full data block (ID and writer data) is stored at the
 * beginning of the byte array. The referencing blk_lpos will point to the
 * ID before the wrap and the next data block will be at the logical
 * position adjacent to the full data block after the wrap.
 *
 * Data rings have a @tail_lpos that points to the beginning of the oldest
 * data block and a @head_lpos that points to the logical position of the
 * next (not yet existing) data block.
 *
 * When a new data block should be created (and the ring is full), tail data
 * blocks will first be invalidated by putting their associated descriptors
 * into the reusable state and then pushing the @tail_lpos forward beyond
 * them. Then the @head_lpos is pushed forward and is associated with a new
 * descriptor. If a data block is not valid, the @tail_lpos cannot be
 * advanced beyond it.
 *
 * Info Array
 * ~~~~~~~~~~
 * The general meta data of printk records are stored in printk_info structs,
 * which are kept in an array with the same number of elements as the
 * descriptor ring. Each info corresponds to the descriptor of the same index
 * in the descriptor ring. Info validity is confirmed by evaluating the
 * corresponding descriptor before and after loading the info.
 *
 * Usage
 * -----
 * Here are some simple examples demonstrating writers and readers.
 * For the examples a global ringbuffer (test_rb) is available (which is not
 * the actual ringbuffer used by printk)::
 *
 *	DEFINE_PRINTKRB(test_rb, 15, 5);
 *
 * This ringbuffer allows up to 32768 records (2 ^ 15) and has a size of
 * 1 MiB (2 ^ (15 + 5)) for text data.
 *
 * Sample writer code::
 *
 *	const char *textstr = "message text";
 *	struct prb_reserved_entry e;
 *	struct printk_record r;
 *
 *	// specify how much to allocate
 *	prb_rec_init_wr(&r, strlen(textstr) + 1);
 *
 *	if (prb_reserve(&e, &test_rb, &r)) {
 *		snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
 *
 *		r.info->text_len = strlen(textstr);
 *		r.info->ts_nsec = local_clock();
 *		r.info->caller_id = printk_caller_id();
 *
 *		// commit and finalize the record
 *		prb_final_commit(&e);
 *	}
 *
 * Note that additional writer functions are available to extend a record
 * after it has been committed but not yet finalized. This can be done as
 * long as no new records have been reserved and the caller is the same.
 *
 * Sample writer code (record extending)::
 *
 *		// alternate rest of previous example
 *
 *		r.info->text_len = strlen(textstr);
 *		r.info->ts_nsec = local_clock();
 *		r.info->caller_id = printk_caller_id();
 *
 *		// commit the record (but do not finalize yet)
 *		prb_commit(&e);
 *	}
 *
 *	...
 *
 *	// specify additional 5 bytes text space to extend
 *	prb_rec_init_wr(&r, 5);
 *
 *	// try to extend, but only if it does not exceed 32 bytes
 *	if (prb_reserve_in_last(&e, &test_rb, &r, printk_caller_id(), 32)) {
 *		snprintf(&r.text_buf[r.info->text_len],
 *			 r.text_buf_size - r.info->text_len, "hello");
 *
 *		r.info->text_len += 5;
 *
 *		// commit and finalize the record
 *		prb_final_commit(&e);
 *	}
 *
 * Sample reader code::
 *
 *	struct printk_info info;
 *	struct printk_record r;
 *	char text_buf[32];
 *	u64 seq;
 *
 *	prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf));
 *
 *	prb_for_each_record(0, &test_rb, &seq, &r) {
 *		if (info.seq != seq)
 *			pr_warn("lost %llu records\n", info.seq - seq);
 *
 *		if (info.text_len > r.text_buf_size) {
 *			pr_warn("record %llu text truncated\n", info.seq);
 *			text_buf[r.text_buf_size - 1] = 0;
 *		}
 *
 *		pr_info("%llu: %llu: %s\n", info.seq, info.ts_nsec,
 *			&text_buf[0]);
 *	}
 *
 * Note that additional less convenient reader functions are available to
 * allow complex record access.
 *
 * ABA Issues
 * ~~~~~~~~~~
 * To help avoid ABA issues, descriptors are referenced by IDs (array index
 * values combined with tagged bits counting array wraps) and data blocks are
 * referenced by logical positions (array index values combined with tagged
 * bits counting array wraps). However, on 32-bit systems the number of
 * tagged bits is relatively small such that an ABA incident is (at least
 * theoretically) possible. For example, if 4 million maximally sized (1KiB)
 * printk messages were to occur in NMI context on a 32-bit system, the
 * interrupted context would not be able to recognize that the 32-bit integer
 * completely wrapped and thus represents a different data block than the one
 * the interrupted context expects.
 *
 * To help combat this possibility, additional state checking is performed
 * (such as using cmpxchg() even though set() would suffice).
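 *
 * For example, when desc_reserve() takes over a recycled descriptor it
 * re-initializes @state_var with cmpxchg() against the expected old value
 * rather than blindly storing the new value. A simplified sketch of that
 * pattern (not a copy of the actual code)::
 *
 *	unsigned long prev = atomic_long_read(&desc->state_var);
 *
 *	if (prev && get_desc_state(id_prev_wrap, prev) != desc_reusable)
 *		return false;	// unexpected state: do not reuse
 *	if (!atomic_long_try_cmpxchg(&desc->state_var, &prev,
 *				     DESC_SV(id, desc_reserved)))
 *		return false;	// concurrent modification detected
 *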
 * These extra checks are commented as such and will hopefully catch any ABA
 * issue that a 32-bit system might experience.
 *
 * Memory Barriers
 * ~~~~~~~~~~~~~~~
 * Multiple memory barriers are used. To simplify proving correctness and
 * generating litmus tests, lines of code related to memory barriers
 * (loads, stores, and the associated memory barriers) are labeled::
 *
 *	LMM(function:letter)
 *
 * Comments reference the labels using only the "function:letter" part.
 *
 * The memory barrier pairs and their ordering are:
 *
 *   desc_reserve:D / desc_reserve:B
 *     push descriptor tail (id), then push descriptor head (id)
 *
 *   desc_reserve:D / data_push_tail:B
 *     push data tail (lpos), then set new descriptor reserved (state)
 *
 *   desc_reserve:D / desc_push_tail:C
 *     push descriptor tail (id), then set new descriptor reserved (state)
 *
 *   desc_reserve:D / prb_first_seq:C
 *     push descriptor tail (id), then set new descriptor reserved (state)
 *
 *   desc_reserve:F / desc_read:D
 *     set new descriptor id and reserved (state), then allow writer changes
 *
 *   data_alloc:A (or data_realloc:A) / desc_read:D
 *     set old descriptor reusable (state), then modify new data block area
 *
 *   data_alloc:A (or data_realloc:A) / data_push_tail:B
 *     push data tail (lpos), then modify new data block area
 *
 *   _prb_commit:B / desc_read:B
 *     store writer changes, then set new descriptor committed (state)
 *
 *   desc_reopen_last:A / _prb_commit:B
 *     set descriptor reserved (state), then read descriptor data
 *
 *   _prb_commit:B / desc_reserve:D
 *     set new descriptor committed (state), then check descriptor head (id)
 *
 *   data_push_tail:D / data_push_tail:A
 *     set descriptor reusable (state), then push data tail (lpos)
 *
 *   desc_push_tail:B / desc_reserve:D
 *     set descriptor reusable (state), then push descriptor tail (id)
 *
 *   desc_update_last_finalized:A / desc_last_finalized_seq:A
 *     store finalized record, then set new highest finalized sequence number
 */

#define DATA_SIZE(data_ring)		_DATA_SIZE((data_ring)->size_bits)
#define DATA_SIZE_MASK(data_ring)	(DATA_SIZE(data_ring) - 1)

#define DESCS_COUNT(desc_ring)		_DESCS_COUNT((desc_ring)->count_bits)
#define DESCS_COUNT_MASK(desc_ring)	(DESCS_COUNT(desc_ring) - 1)

/* Determine the data array index from a logical position. */
#define DATA_INDEX(data_ring, lpos)	((lpos) & DATA_SIZE_MASK(data_ring))

/* Determine the desc array index from an ID or sequence number. */
#define DESC_INDEX(desc_ring, n)	((n) & DESCS_COUNT_MASK(desc_ring))

/* Determine how many times the data array has wrapped. */
#define DATA_WRAPS(data_ring, lpos)	((lpos) >> (data_ring)->size_bits)

/* Determine if a logical position refers to a data-less block. */
#define LPOS_DATALESS(lpos)		((lpos) & 1UL)
#define BLK_DATALESS(blk)		(LPOS_DATALESS((blk)->begin) && \
					 LPOS_DATALESS((blk)->next))

/* Get the logical position at index 0 of the current wrap. */
#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
	((lpos) & ~DATA_SIZE_MASK(data_ring))

/* Get the ID for the same index of the previous wrap as the given ID.
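 * For example, with 2^3 = 8 descriptors, the ID one wrap before ID 10 is
 * ID 2.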
*/ 338 #define DESC_ID_PREV_WRAP(desc_ring, id) \ 339 DESC_ID((id) - DESCS_COUNT(desc_ring)) 340 341 /* 342 * A data block: mapped directly to the beginning of the data block area 343 * specified as a logical position within the data ring. 344 * 345 * @id: the ID of the associated descriptor 346 * @data: the writer data 347 * 348 * Note that the size of a data block is only known by its associated 349 * descriptor. 350 */ 351 struct prb_data_block { 352 unsigned long id; 353 char data[]; 354 }; 355 356 /* 357 * Return the descriptor associated with @n. @n can be either a 358 * descriptor ID or a sequence number. 359 */ 360 static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n) 361 { 362 return &desc_ring->descs[DESC_INDEX(desc_ring, n)]; 363 } 364 365 /* 366 * Return the printk_info associated with @n. @n can be either a 367 * descriptor ID or a sequence number. 368 */ 369 static struct printk_info *to_info(struct prb_desc_ring *desc_ring, u64 n) 370 { 371 return &desc_ring->infos[DESC_INDEX(desc_ring, n)]; 372 } 373 374 static struct prb_data_block *to_block(struct prb_data_ring *data_ring, 375 unsigned long begin_lpos) 376 { 377 return (void *)&data_ring->data[DATA_INDEX(data_ring, begin_lpos)]; 378 } 379 380 /* 381 * Increase the data size to account for data block meta data plus any 382 * padding so that the adjacent data block is aligned on the ID size. 383 */ 384 static unsigned int to_blk_size(unsigned int size) 385 { 386 struct prb_data_block *db = NULL; 387 388 size += sizeof(*db); 389 size = ALIGN(size, sizeof(db->id)); 390 return size; 391 } 392 393 /* 394 * Sanity checker for reserve size. The ringbuffer code assumes that a data 395 * block does not exceed the maximum possible size that could fit within the 396 * ringbuffer. This function provides that basic size check so that the 397 * assumption is safe. In particular, it guarantees that data_push_tail() will 398 * never attempt to push the tail beyond the head. 399 */ 400 static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size) 401 { 402 /* Data-less blocks take no space. */ 403 if (size == 0) 404 return true; 405 406 /* 407 * If data blocks were allowed to be larger than half the data ring 408 * size, a wrapping data block could require more space than the full 409 * ringbuffer. 410 */ 411 return to_blk_size(size) <= DATA_SIZE(data_ring) / 2; 412 } 413 414 /* 415 * Compare the current and requested logical position and decide 416 * whether more space is needed. 417 * 418 * Return false when @lpos_current is already at or beyond @lpos_target. 419 * 420 * Also return false when the difference between the positions is bigger 421 * than the size of the data buffer. It might happen only when the caller 422 * raced with another CPU(s) which already made and used the space. 423 */ 424 static bool need_more_space(struct prb_data_ring *data_ring, 425 unsigned long lpos_current, 426 unsigned long lpos_target) 427 { 428 return lpos_target - lpos_current - 1 < DATA_SIZE(data_ring); 429 } 430 431 /* Query the state of a descriptor. */ 432 static enum desc_state get_desc_state(unsigned long id, 433 unsigned long state_val) 434 { 435 if (id != DESC_ID(state_val)) 436 return desc_miss; 437 438 return DESC_STATE(state_val); 439 } 440 441 /* 442 * Get a copy of a specified descriptor and return its queried state. If the 443 * descriptor is in an inconsistent state (miss or reserved), the caller can 444 * only expect the descriptor's @state_var field to be valid. 
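 *
 * A typical calling pattern looks roughly like this (sketch only, not a
 * copy of any one caller):
 *
 *	d_state = desc_read(desc_ring, id, &desc, &seq, NULL);
 *	if (d_state == desc_committed || d_state == desc_finalized) {
 *		// @desc and @seq hold consistent values and may be used
 *	}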
445 * 446 * The sequence number and caller_id can be optionally retrieved. Like all 447 * non-state_var data, they are only valid if the descriptor is in a 448 * consistent state. 449 */ 450 static enum desc_state desc_read(struct prb_desc_ring *desc_ring, 451 unsigned long id, struct prb_desc *desc_out, 452 u64 *seq_out, u32 *caller_id_out) 453 { 454 struct printk_info *info = to_info(desc_ring, id); 455 struct prb_desc *desc = to_desc(desc_ring, id); 456 atomic_long_t *state_var = &desc->state_var; 457 enum desc_state d_state; 458 unsigned long state_val; 459 460 /* Check the descriptor state. */ 461 state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */ 462 d_state = get_desc_state(id, state_val); 463 if (d_state == desc_miss || d_state == desc_reserved) { 464 /* 465 * The descriptor is in an inconsistent state. Set at least 466 * @state_var so that the caller can see the details of 467 * the inconsistent state. 468 */ 469 goto out; 470 } 471 472 /* 473 * Guarantee the state is loaded before copying the descriptor 474 * content. This avoids copying obsolete descriptor content that might 475 * not apply to the descriptor state. This pairs with _prb_commit:B. 476 * 477 * Memory barrier involvement: 478 * 479 * If desc_read:A reads from _prb_commit:B, then desc_read:C reads 480 * from _prb_commit:A. 481 * 482 * Relies on: 483 * 484 * WMB from _prb_commit:A to _prb_commit:B 485 * matching 486 * RMB from desc_read:A to desc_read:C 487 */ 488 smp_rmb(); /* LMM(desc_read:B) */ 489 490 /* 491 * Copy the descriptor data. The data is not valid until the 492 * state has been re-checked. A memcpy() for all of @desc 493 * cannot be used because of the atomic_t @state_var field. 494 */ 495 if (desc_out) { 496 memcpy(&desc_out->text_blk_lpos, &desc->text_blk_lpos, 497 sizeof(desc_out->text_blk_lpos)); /* LMM(desc_read:C) */ 498 } 499 if (seq_out) 500 *seq_out = info->seq; /* also part of desc_read:C */ 501 if (caller_id_out) 502 *caller_id_out = info->caller_id; /* also part of desc_read:C */ 503 504 /* 505 * 1. Guarantee the descriptor content is loaded before re-checking 506 * the state. This avoids reading an obsolete descriptor state 507 * that may not apply to the copied content. This pairs with 508 * desc_reserve:F. 509 * 510 * Memory barrier involvement: 511 * 512 * If desc_read:C reads from desc_reserve:G, then desc_read:E 513 * reads from desc_reserve:F. 514 * 515 * Relies on: 516 * 517 * WMB from desc_reserve:F to desc_reserve:G 518 * matching 519 * RMB from desc_read:C to desc_read:E 520 * 521 * 2. Guarantee the record data is loaded before re-checking the 522 * state. This avoids reading an obsolete descriptor state that may 523 * not apply to the copied data. This pairs with data_alloc:A and 524 * data_realloc:A. 525 * 526 * Memory barrier involvement: 527 * 528 * If copy_data:A reads from data_alloc:B, then desc_read:E 529 * reads from desc_make_reusable:A. 530 * 531 * Relies on: 532 * 533 * MB from desc_make_reusable:A to data_alloc:B 534 * matching 535 * RMB from desc_read:C to desc_read:E 536 * 537 * Note: desc_make_reusable:A and data_alloc:B can be different 538 * CPUs. However, the data_alloc:B CPU (which performs the 539 * full memory barrier) must have previously seen 540 * desc_make_reusable:A. 541 */ 542 smp_rmb(); /* LMM(desc_read:D) */ 543 544 /* 545 * The data has been copied. Return the current descriptor state, 546 * which may have changed since the load above. 
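 * Callers must check the returned state and disregard the copied data if
 * the descriptor is not in a consistent state.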
547 */ 548 state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */ 549 d_state = get_desc_state(id, state_val); 550 out: 551 if (desc_out) 552 atomic_long_set(&desc_out->state_var, state_val); 553 return d_state; 554 } 555 556 /* 557 * Take a specified descriptor out of the finalized state by attempting 558 * the transition from finalized to reusable. Either this context or some 559 * other context will have been successful. 560 */ 561 static void desc_make_reusable(struct prb_desc_ring *desc_ring, 562 unsigned long id) 563 { 564 unsigned long val_finalized = DESC_SV(id, desc_finalized); 565 unsigned long val_reusable = DESC_SV(id, desc_reusable); 566 struct prb_desc *desc = to_desc(desc_ring, id); 567 atomic_long_t *state_var = &desc->state_var; 568 569 atomic_long_cmpxchg_relaxed(state_var, val_finalized, 570 val_reusable); /* LMM(desc_make_reusable:A) */ 571 } 572 573 /* 574 * Given the text data ring, put the associated descriptor of each 575 * data block from @lpos_begin until @lpos_end into the reusable state. 576 * 577 * If there is any problem making the associated descriptor reusable, either 578 * the descriptor has not yet been finalized or another writer context has 579 * already pushed the tail lpos past the problematic data block. Regardless, 580 * on error the caller can re-load the tail lpos to determine the situation. 581 */ 582 static bool data_make_reusable(struct printk_ringbuffer *rb, 583 unsigned long lpos_begin, 584 unsigned long lpos_end, 585 unsigned long *lpos_out) 586 { 587 588 struct prb_data_ring *data_ring = &rb->text_data_ring; 589 struct prb_desc_ring *desc_ring = &rb->desc_ring; 590 struct prb_data_block *blk; 591 enum desc_state d_state; 592 struct prb_desc desc; 593 struct prb_data_blk_lpos *blk_lpos = &desc.text_blk_lpos; 594 unsigned long id; 595 596 /* Loop until @lpos_begin has advanced to or beyond @lpos_end. */ 597 while (need_more_space(data_ring, lpos_begin, lpos_end)) { 598 blk = to_block(data_ring, lpos_begin); 599 600 /* 601 * Load the block ID from the data block. This is a data race 602 * against a writer that may have newly reserved this data 603 * area. If the loaded value matches a valid descriptor ID, 604 * the blk_lpos of that descriptor will be checked to make 605 * sure it points back to this data block. If the check fails, 606 * the data area has been recycled by another writer. 607 */ 608 id = blk->id; /* LMM(data_make_reusable:A) */ 609 610 d_state = desc_read(desc_ring, id, &desc, 611 NULL, NULL); /* LMM(data_make_reusable:B) */ 612 613 switch (d_state) { 614 case desc_miss: 615 case desc_reserved: 616 case desc_committed: 617 return false; 618 case desc_finalized: 619 /* 620 * This data block is invalid if the descriptor 621 * does not point back to it. 622 */ 623 if (blk_lpos->begin != lpos_begin) 624 return false; 625 desc_make_reusable(desc_ring, id); 626 break; 627 case desc_reusable: 628 /* 629 * This data block is invalid if the descriptor 630 * does not point back to it. 631 */ 632 if (blk_lpos->begin != lpos_begin) 633 return false; 634 break; 635 } 636 637 /* Advance @lpos_begin to the next data block. */ 638 lpos_begin = blk_lpos->next; 639 } 640 641 *lpos_out = lpos_begin; 642 return true; 643 } 644 645 /* 646 * Advance the data ring tail to at least @lpos. This function puts 647 * descriptors into the reusable state if the tail is pushed beyond 648 * their associated data block. 
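 * Return true if @lpos refers to a data-less block or if the data ring
 * tail is at or beyond @lpos on return. Return false if the tail could not
 * be pushed that far, typically because a tail data block's descriptor is
 * not yet finalized.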
649 */ 650 static bool data_push_tail(struct printk_ringbuffer *rb, unsigned long lpos) 651 { 652 struct prb_data_ring *data_ring = &rb->text_data_ring; 653 unsigned long tail_lpos_new; 654 unsigned long tail_lpos; 655 unsigned long next_lpos; 656 657 /* If @lpos is from a data-less block, there is nothing to do. */ 658 if (LPOS_DATALESS(lpos)) 659 return true; 660 661 /* 662 * Any descriptor states that have transitioned to reusable due to the 663 * data tail being pushed to this loaded value will be visible to this 664 * CPU. This pairs with data_push_tail:D. 665 * 666 * Memory barrier involvement: 667 * 668 * If data_push_tail:A reads from data_push_tail:D, then this CPU can 669 * see desc_make_reusable:A. 670 * 671 * Relies on: 672 * 673 * MB from desc_make_reusable:A to data_push_tail:D 674 * matches 675 * READFROM from data_push_tail:D to data_push_tail:A 676 * thus 677 * READFROM from desc_make_reusable:A to this CPU 678 */ 679 tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:A) */ 680 681 /* 682 * Loop until the tail lpos is at or beyond @lpos. This condition 683 * may already be satisfied, resulting in no full memory barrier 684 * from data_push_tail:D being performed. However, since this CPU 685 * sees the new tail lpos, any descriptor states that transitioned to 686 * the reusable state must already be visible. 687 */ 688 while (need_more_space(data_ring, tail_lpos, lpos)) { 689 /* 690 * Make all descriptors reusable that are associated with 691 * data blocks before @lpos. 692 */ 693 if (!data_make_reusable(rb, tail_lpos, lpos, &next_lpos)) { 694 /* 695 * 1. Guarantee the block ID loaded in 696 * data_make_reusable() is performed before 697 * reloading the tail lpos. The failed 698 * data_make_reusable() may be due to a newly 699 * recycled data area causing the tail lpos to 700 * have been previously pushed. This pairs with 701 * data_alloc:A and data_realloc:A. 702 * 703 * Memory barrier involvement: 704 * 705 * If data_make_reusable:A reads from data_alloc:B, 706 * then data_push_tail:C reads from 707 * data_push_tail:D. 708 * 709 * Relies on: 710 * 711 * MB from data_push_tail:D to data_alloc:B 712 * matching 713 * RMB from data_make_reusable:A to 714 * data_push_tail:C 715 * 716 * Note: data_push_tail:D and data_alloc:B can be 717 * different CPUs. However, the data_alloc:B 718 * CPU (which performs the full memory 719 * barrier) must have previously seen 720 * data_push_tail:D. 721 * 722 * 2. Guarantee the descriptor state loaded in 723 * data_make_reusable() is performed before 724 * reloading the tail lpos. The failed 725 * data_make_reusable() may be due to a newly 726 * recycled descriptor causing the tail lpos to 727 * have been previously pushed. This pairs with 728 * desc_reserve:D. 729 * 730 * Memory barrier involvement: 731 * 732 * If data_make_reusable:B reads from 733 * desc_reserve:F, then data_push_tail:C reads 734 * from data_push_tail:D. 735 * 736 * Relies on: 737 * 738 * MB from data_push_tail:D to desc_reserve:F 739 * matching 740 * RMB from data_make_reusable:B to 741 * data_push_tail:C 742 * 743 * Note: data_push_tail:D and desc_reserve:F can 744 * be different CPUs. However, the 745 * desc_reserve:F CPU (which performs the 746 * full memory barrier) must have previously 747 * seen data_push_tail:D. 
748 */ 749 smp_rmb(); /* LMM(data_push_tail:B) */ 750 751 tail_lpos_new = atomic_long_read(&data_ring->tail_lpos 752 ); /* LMM(data_push_tail:C) */ 753 if (tail_lpos_new == tail_lpos) 754 return false; 755 756 /* Another CPU pushed the tail. Try again. */ 757 tail_lpos = tail_lpos_new; 758 continue; 759 } 760 761 /* 762 * Guarantee any descriptor states that have transitioned to 763 * reusable are stored before pushing the tail lpos. A full 764 * memory barrier is needed since other CPUs may have made 765 * the descriptor states reusable. This pairs with 766 * data_push_tail:A. 767 */ 768 if (atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos, 769 next_lpos)) { /* LMM(data_push_tail:D) */ 770 break; 771 } 772 } 773 774 return true; 775 } 776 777 /* 778 * Advance the desc ring tail. This function advances the tail by one 779 * descriptor, thus invalidating the oldest descriptor. Before advancing 780 * the tail, the tail descriptor is made reusable and all data blocks up to 781 * and including the descriptor's data block are invalidated (i.e. the data 782 * ring tail is pushed past the data block of the descriptor being made 783 * reusable). 784 */ 785 static bool desc_push_tail(struct printk_ringbuffer *rb, 786 unsigned long tail_id) 787 { 788 struct prb_desc_ring *desc_ring = &rb->desc_ring; 789 enum desc_state d_state; 790 struct prb_desc desc; 791 792 d_state = desc_read(desc_ring, tail_id, &desc, NULL, NULL); 793 794 switch (d_state) { 795 case desc_miss: 796 /* 797 * If the ID is exactly 1 wrap behind the expected, it is 798 * in the process of being reserved by another writer and 799 * must be considered reserved. 800 */ 801 if (DESC_ID(atomic_long_read(&desc.state_var)) == 802 DESC_ID_PREV_WRAP(desc_ring, tail_id)) { 803 return false; 804 } 805 806 /* 807 * The ID has changed. Another writer must have pushed the 808 * tail and recycled the descriptor already. Success is 809 * returned because the caller is only interested in the 810 * specified tail being pushed, which it was. 811 */ 812 return true; 813 case desc_reserved: 814 case desc_committed: 815 return false; 816 case desc_finalized: 817 desc_make_reusable(desc_ring, tail_id); 818 break; 819 case desc_reusable: 820 break; 821 } 822 823 /* 824 * Data blocks must be invalidated before their associated 825 * descriptor can be made available for recycling. Invalidating 826 * them later is not possible because there is no way to trust 827 * data blocks once their associated descriptor is gone. 828 */ 829 830 if (!data_push_tail(rb, desc.text_blk_lpos.next)) 831 return false; 832 833 /* 834 * Check the next descriptor after @tail_id before pushing the tail 835 * to it because the tail must always be in a finalized or reusable 836 * state. The implementation of prb_first_seq() relies on this. 837 * 838 * A successful read implies that the next descriptor is less than or 839 * equal to @head_id so there is no risk of pushing the tail past the 840 * head. 841 */ 842 d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc, 843 NULL, NULL); /* LMM(desc_push_tail:A) */ 844 845 if (d_state == desc_finalized || d_state == desc_reusable) { 846 /* 847 * Guarantee any descriptor states that have transitioned to 848 * reusable are stored before pushing the tail ID. This allows 849 * verifying the recycled descriptor state. A full memory 850 * barrier is needed since other CPUs may have made the 851 * descriptor states reusable. This pairs with desc_reserve:D. 
852 */ 853 atomic_long_cmpxchg(&desc_ring->tail_id, tail_id, 854 DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */ 855 } else { 856 /* 857 * Guarantee the last state load from desc_read() is before 858 * reloading @tail_id in order to see a new tail ID in the 859 * case that the descriptor has been recycled. This pairs 860 * with desc_reserve:D. 861 * 862 * Memory barrier involvement: 863 * 864 * If desc_push_tail:A reads from desc_reserve:F, then 865 * desc_push_tail:D reads from desc_push_tail:B. 866 * 867 * Relies on: 868 * 869 * MB from desc_push_tail:B to desc_reserve:F 870 * matching 871 * RMB from desc_push_tail:A to desc_push_tail:D 872 * 873 * Note: desc_push_tail:B and desc_reserve:F can be different 874 * CPUs. However, the desc_reserve:F CPU (which performs 875 * the full memory barrier) must have previously seen 876 * desc_push_tail:B. 877 */ 878 smp_rmb(); /* LMM(desc_push_tail:C) */ 879 880 /* 881 * Re-check the tail ID. The descriptor following @tail_id is 882 * not in an allowed tail state. But if the tail has since 883 * been moved by another CPU, then it does not matter. 884 */ 885 if (atomic_long_read(&desc_ring->tail_id) == tail_id) /* LMM(desc_push_tail:D) */ 886 return false; 887 } 888 889 return true; 890 } 891 892 /* Reserve a new descriptor, invalidating the oldest if necessary. */ 893 static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out) 894 { 895 struct prb_desc_ring *desc_ring = &rb->desc_ring; 896 unsigned long prev_state_val; 897 unsigned long id_prev_wrap; 898 struct prb_desc *desc; 899 unsigned long head_id; 900 unsigned long id; 901 902 head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */ 903 904 do { 905 id = DESC_ID(head_id + 1); 906 id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id); 907 908 /* 909 * Guarantee the head ID is read before reading the tail ID. 910 * Since the tail ID is updated before the head ID, this 911 * guarantees that @id_prev_wrap is never ahead of the tail 912 * ID. This pairs with desc_reserve:D. 913 * 914 * Memory barrier involvement: 915 * 916 * If desc_reserve:A reads from desc_reserve:D, then 917 * desc_reserve:C reads from desc_push_tail:B. 918 * 919 * Relies on: 920 * 921 * MB from desc_push_tail:B to desc_reserve:D 922 * matching 923 * RMB from desc_reserve:A to desc_reserve:C 924 * 925 * Note: desc_push_tail:B and desc_reserve:D can be different 926 * CPUs. However, the desc_reserve:D CPU (which performs 927 * the full memory barrier) must have previously seen 928 * desc_push_tail:B. 929 */ 930 smp_rmb(); /* LMM(desc_reserve:B) */ 931 932 if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id 933 )) { /* LMM(desc_reserve:C) */ 934 /* 935 * Make space for the new descriptor by 936 * advancing the tail. 937 */ 938 if (!desc_push_tail(rb, id_prev_wrap)) 939 return false; 940 } 941 942 /* 943 * 1. Guarantee the tail ID is read before validating the 944 * recycled descriptor state. A read memory barrier is 945 * sufficient for this. This pairs with desc_push_tail:B. 946 * 947 * Memory barrier involvement: 948 * 949 * If desc_reserve:C reads from desc_push_tail:B, then 950 * desc_reserve:E reads from desc_make_reusable:A. 951 * 952 * Relies on: 953 * 954 * MB from desc_make_reusable:A to desc_push_tail:B 955 * matching 956 * RMB from desc_reserve:C to desc_reserve:E 957 * 958 * Note: desc_make_reusable:A and desc_push_tail:B can be 959 * different CPUs. 
However, the desc_push_tail:B CPU 960 * (which performs the full memory barrier) must have 961 * previously seen desc_make_reusable:A. 962 * 963 * 2. Guarantee the tail ID is stored before storing the head 964 * ID. This pairs with desc_reserve:B. 965 * 966 * 3. Guarantee any data ring tail changes are stored before 967 * recycling the descriptor. Data ring tail changes can 968 * happen via desc_push_tail()->data_push_tail(). A full 969 * memory barrier is needed since another CPU may have 970 * pushed the data ring tails. This pairs with 971 * data_push_tail:B. 972 * 973 * 4. Guarantee a new tail ID is stored before recycling the 974 * descriptor. A full memory barrier is needed since 975 * another CPU may have pushed the tail ID. This pairs 976 * with desc_push_tail:C and this also pairs with 977 * prb_first_seq:C. 978 * 979 * 5. Guarantee the head ID is stored before trying to 980 * finalize the previous descriptor. This pairs with 981 * _prb_commit:B. 982 */ 983 } while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id, 984 id)); /* LMM(desc_reserve:D) */ 985 986 desc = to_desc(desc_ring, id); 987 988 /* 989 * If the descriptor has been recycled, verify the old state val. 990 * See "ABA Issues" about why this verification is performed. 991 */ 992 prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */ 993 if (prev_state_val && 994 get_desc_state(id_prev_wrap, prev_state_val) != desc_reusable) { 995 WARN_ON_ONCE(1); 996 return false; 997 } 998 999 /* 1000 * Assign the descriptor a new ID and set its state to reserved. 1001 * See "ABA Issues" about why cmpxchg() instead of set() is used. 1002 * 1003 * Guarantee the new descriptor ID and state is stored before making 1004 * any other changes. A write memory barrier is sufficient for this. 1005 * This pairs with desc_read:D. 1006 */ 1007 if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val, 1008 DESC_SV(id, desc_reserved))) { /* LMM(desc_reserve:F) */ 1009 WARN_ON_ONCE(1); 1010 return false; 1011 } 1012 1013 /* Now data in @desc can be modified: LMM(desc_reserve:G) */ 1014 1015 *id_out = id; 1016 return true; 1017 } 1018 1019 static bool is_blk_wrapped(struct prb_data_ring *data_ring, 1020 unsigned long begin_lpos, unsigned long next_lpos) 1021 { 1022 /* 1023 * Subtract one from next_lpos since it's not actually part of this data 1024 * block. This allows perfectly fitting records to not wrap. 1025 */ 1026 return DATA_WRAPS(data_ring, begin_lpos) != 1027 DATA_WRAPS(data_ring, next_lpos - 1); 1028 } 1029 1030 /* Determine the end of a data block. */ 1031 static unsigned long get_next_lpos(struct prb_data_ring *data_ring, 1032 unsigned long lpos, unsigned int size) 1033 { 1034 unsigned long begin_lpos; 1035 unsigned long next_lpos; 1036 1037 begin_lpos = lpos; 1038 next_lpos = lpos + size; 1039 1040 /* First check if the data block does not wrap. */ 1041 if (!is_blk_wrapped(data_ring, begin_lpos, next_lpos)) 1042 return next_lpos; 1043 1044 /* Wrapping data blocks store their data at the beginning. */ 1045 return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size); 1046 } 1047 1048 /* 1049 * Allocate a new data block, invalidating the oldest data block(s) 1050 * if necessary. This function also associates the data block with 1051 * a specified descriptor. 
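 * Return a pointer to the beginning of the writer data area, or NULL if no
 * data block was allocated (data-less record or failure). In all cases
 * @blk_lpos is set to describe the result.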
1052 */ 1053 static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size, 1054 struct prb_data_blk_lpos *blk_lpos, unsigned long id) 1055 { 1056 struct prb_data_ring *data_ring = &rb->text_data_ring; 1057 struct prb_data_block *blk; 1058 unsigned long begin_lpos; 1059 unsigned long next_lpos; 1060 1061 if (size == 0) { 1062 /* 1063 * Data blocks are not created for empty lines. Instead, the 1064 * reader will recognize these special lpos values and handle 1065 * it appropriately. 1066 */ 1067 blk_lpos->begin = EMPTY_LINE_LPOS; 1068 blk_lpos->next = EMPTY_LINE_LPOS; 1069 return NULL; 1070 } 1071 1072 size = to_blk_size(size); 1073 1074 begin_lpos = atomic_long_read(&data_ring->head_lpos); 1075 1076 do { 1077 next_lpos = get_next_lpos(data_ring, begin_lpos, size); 1078 1079 /* 1080 * data_check_size() prevents data block allocation that could 1081 * cause illegal ringbuffer states. But double check that the 1082 * used space will not be bigger than the ring buffer. Wrapped 1083 * messages need to reserve more space, see get_next_lpos(). 1084 * 1085 * Specify a data-less block when the check or the allocation 1086 * fails. 1087 */ 1088 if (WARN_ON_ONCE(next_lpos - begin_lpos > DATA_SIZE(data_ring)) || 1089 !data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) { 1090 blk_lpos->begin = FAILED_LPOS; 1091 blk_lpos->next = FAILED_LPOS; 1092 return NULL; 1093 } 1094 1095 /* 1096 * 1. Guarantee any descriptor states that have transitioned 1097 * to reusable are stored before modifying the newly 1098 * allocated data area. A full memory barrier is needed 1099 * since other CPUs may have made the descriptor states 1100 * reusable. See data_push_tail:A about why the reusable 1101 * states are visible. This pairs with desc_read:D. 1102 * 1103 * 2. Guarantee any updated tail lpos is stored before 1104 * modifying the newly allocated data area. Another CPU may 1105 * be in data_make_reusable() and is reading a block ID 1106 * from this area. data_make_reusable() can handle reading 1107 * a garbage block ID value, but then it must be able to 1108 * load a new tail lpos. A full memory barrier is needed 1109 * since other CPUs may have updated the tail lpos. This 1110 * pairs with data_push_tail:B. 1111 */ 1112 } while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos, 1113 next_lpos)); /* LMM(data_alloc:A) */ 1114 1115 blk = to_block(data_ring, begin_lpos); 1116 blk->id = id; /* LMM(data_alloc:B) */ 1117 1118 if (is_blk_wrapped(data_ring, begin_lpos, next_lpos)) { 1119 /* Wrapping data blocks store their data at the beginning. */ 1120 blk = to_block(data_ring, 0); 1121 1122 /* 1123 * Store the ID on the wrapped block for consistency. 1124 * The printk_ringbuffer does not actually use it. 1125 */ 1126 blk->id = id; 1127 } 1128 1129 blk_lpos->begin = begin_lpos; 1130 blk_lpos->next = next_lpos; 1131 1132 return &blk->data[0]; 1133 } 1134 1135 /* 1136 * Try to resize an existing data block associated with the descriptor 1137 * specified by @id. If the resized data block should become wrapped, it 1138 * copies the old data to the new data block. If @size yields a data block 1139 * with the same or less size, the data block is left as is. 1140 * 1141 * Fail if this is not the last allocated data block or if there is not 1142 * enough space or it is not possible make enough space. 1143 * 1144 * Return a pointer to the beginning of the entire data buffer or NULL on 1145 * failure. 
1146 */ 1147 static char *data_realloc(struct printk_ringbuffer *rb, unsigned int size, 1148 struct prb_data_blk_lpos *blk_lpos, unsigned long id) 1149 { 1150 struct prb_data_ring *data_ring = &rb->text_data_ring; 1151 struct prb_data_block *blk; 1152 unsigned long head_lpos; 1153 unsigned long next_lpos; 1154 bool wrapped; 1155 1156 /* Reallocation only works if @blk_lpos is the newest data block. */ 1157 head_lpos = atomic_long_read(&data_ring->head_lpos); 1158 if (head_lpos != blk_lpos->next) 1159 return NULL; 1160 1161 /* Keep track if @blk_lpos was a wrapping data block. */ 1162 wrapped = is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next); 1163 1164 size = to_blk_size(size); 1165 1166 next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size); 1167 1168 /* 1169 * Use the current data block when the size does not increase, i.e. 1170 * when @head_lpos is already able to accommodate the new @next_lpos. 1171 * 1172 * Note that need_more_space() could never return false here because 1173 * the difference between the positions was bigger than the data 1174 * buffer size. The data block is reopened and can't get reused. 1175 */ 1176 if (!need_more_space(data_ring, head_lpos, next_lpos)) { 1177 if (wrapped) 1178 blk = to_block(data_ring, 0); 1179 else 1180 blk = to_block(data_ring, blk_lpos->begin); 1181 return &blk->data[0]; 1182 } 1183 1184 /* 1185 * data_check_size() prevents data block reallocation that could 1186 * cause illegal ringbuffer states. But double check that the 1187 * new used space will not be bigger than the ring buffer. Wrapped 1188 * messages need to reserve more space, see get_next_lpos(). 1189 * 1190 * Specify failure when the check or the allocation fails. 1191 */ 1192 if (WARN_ON_ONCE(next_lpos - blk_lpos->begin > DATA_SIZE(data_ring)) || 1193 !data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) { 1194 return NULL; 1195 } 1196 1197 /* The memory barrier involvement is the same as data_alloc:A. */ 1198 if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos, 1199 next_lpos)) { /* LMM(data_realloc:A) */ 1200 return NULL; 1201 } 1202 1203 blk = to_block(data_ring, blk_lpos->begin); 1204 1205 if (is_blk_wrapped(data_ring, blk_lpos->begin, next_lpos)) { 1206 struct prb_data_block *old_blk = blk; 1207 1208 /* Wrapping data blocks store their data at the beginning. */ 1209 blk = to_block(data_ring, 0); 1210 1211 /* 1212 * Store the ID on the wrapped block for consistency. 1213 * The printk_ringbuffer does not actually use it. 1214 */ 1215 blk->id = id; 1216 1217 if (!wrapped) { 1218 /* 1219 * Since the allocated space is now in the newly 1220 * created wrapping data block, copy the content 1221 * from the old data block. 1222 */ 1223 memcpy(&blk->data[0], &old_blk->data[0], 1224 (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id)); 1225 } 1226 } 1227 1228 blk_lpos->next = next_lpos; 1229 1230 return &blk->data[0]; 1231 } 1232 1233 /* Return the number of bytes used by a data block. */ 1234 static unsigned int space_used(struct prb_data_ring *data_ring, 1235 struct prb_data_blk_lpos *blk_lpos) 1236 { 1237 /* Data-less blocks take no space. */ 1238 if (BLK_DATALESS(blk_lpos)) 1239 return 0; 1240 1241 if (!is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next)) { 1242 /* Data block does not wrap. */ 1243 return (DATA_INDEX(data_ring, blk_lpos->next) - 1244 DATA_INDEX(data_ring, blk_lpos->begin)); 1245 } 1246 1247 /* 1248 * For wrapping data blocks, the trailing (wasted) space is 1249 * also counted. 
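 * For example, in a 1 KiB data ring, a data block beginning 100 bytes
 * before the end of the ring that continues with 60 bytes at the start of
 * the next wrap uses 100 + 60 = 160 bytes.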
1250 */ 1251 return (DATA_INDEX(data_ring, blk_lpos->next) + 1252 DATA_SIZE(data_ring) - DATA_INDEX(data_ring, blk_lpos->begin)); 1253 } 1254 1255 /* 1256 * Given @blk_lpos, return a pointer to the writer data from the data block 1257 * and calculate the size of the data part. A NULL pointer is returned if 1258 * @blk_lpos specifies values that could never be legal. 1259 * 1260 * This function (used by readers) performs strict validation on the lpos 1261 * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is 1262 * triggered if an internal error is detected. 1263 */ 1264 static const char *get_data(struct prb_data_ring *data_ring, 1265 struct prb_data_blk_lpos *blk_lpos, 1266 unsigned int *data_size) 1267 { 1268 struct prb_data_block *db; 1269 1270 /* Data-less data block description. */ 1271 if (BLK_DATALESS(blk_lpos)) { 1272 /* 1273 * Records that are just empty lines are also valid, even 1274 * though they do not have a data block. For such records 1275 * explicitly return empty string data to signify success. 1276 */ 1277 if (blk_lpos->begin == EMPTY_LINE_LPOS && 1278 blk_lpos->next == EMPTY_LINE_LPOS) { 1279 *data_size = 0; 1280 return ""; 1281 } 1282 1283 /* Data lost, invalid, or otherwise unavailable. */ 1284 return NULL; 1285 } 1286 1287 /* Regular data block: @begin and @next in the same wrap. */ 1288 if (!is_blk_wrapped(data_ring, blk_lpos->begin, blk_lpos->next)) { 1289 db = to_block(data_ring, blk_lpos->begin); 1290 *data_size = blk_lpos->next - blk_lpos->begin; 1291 1292 /* Wrapping data block: @begin is one wrap behind @next. */ 1293 } else if (!is_blk_wrapped(data_ring, 1294 blk_lpos->begin + DATA_SIZE(data_ring), 1295 blk_lpos->next)) { 1296 db = to_block(data_ring, 0); 1297 *data_size = DATA_INDEX(data_ring, blk_lpos->next); 1298 1299 /* Illegal block description. */ 1300 } else { 1301 WARN_ON_ONCE(1); 1302 return NULL; 1303 } 1304 1305 /* Sanity check. Data-less blocks were handled earlier. */ 1306 if (WARN_ON_ONCE(!data_check_size(data_ring, *data_size) || !*data_size)) 1307 return NULL; 1308 1309 /* A valid data block will always be aligned to the ID size. */ 1310 if (WARN_ON_ONCE(blk_lpos->begin != ALIGN(blk_lpos->begin, sizeof(db->id))) || 1311 WARN_ON_ONCE(blk_lpos->next != ALIGN(blk_lpos->next, sizeof(db->id)))) { 1312 return NULL; 1313 } 1314 1315 /* A valid data block will always have at least an ID. */ 1316 if (WARN_ON_ONCE(*data_size < sizeof(db->id))) 1317 return NULL; 1318 1319 /* Subtract block ID space from size to reflect data size. */ 1320 *data_size -= sizeof(db->id); 1321 1322 return &db->data[0]; 1323 } 1324 1325 /* 1326 * Attempt to transition the newest descriptor from committed back to reserved 1327 * so that the record can be modified by a writer again. This is only possible 1328 * if the descriptor is not yet finalized and the provided @caller_id matches. 1329 */ 1330 static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring, 1331 u32 caller_id, unsigned long *id_out) 1332 { 1333 unsigned long prev_state_val; 1334 enum desc_state d_state; 1335 struct prb_desc desc; 1336 struct prb_desc *d; 1337 unsigned long id; 1338 u32 cid; 1339 1340 id = atomic_long_read(&desc_ring->head_id); 1341 1342 /* 1343 * To reduce unnecessarily reopening, first check if the descriptor 1344 * state and caller ID are correct. 
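 * This check is only an optimization; exclusive access is gained only by
 * the state cmpxchg below, which is why prb_reserve_in_last() re-checks
 * the caller ID after reopening.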
1345 */ 1346 d_state = desc_read(desc_ring, id, &desc, NULL, &cid); 1347 if (d_state != desc_committed || cid != caller_id) 1348 return NULL; 1349 1350 d = to_desc(desc_ring, id); 1351 1352 prev_state_val = DESC_SV(id, desc_committed); 1353 1354 /* 1355 * Guarantee the reserved state is stored before reading any 1356 * record data. A full memory barrier is needed because @state_var 1357 * modification is followed by reading. This pairs with _prb_commit:B. 1358 * 1359 * Memory barrier involvement: 1360 * 1361 * If desc_reopen_last:A reads from _prb_commit:B, then 1362 * prb_reserve_in_last:A reads from _prb_commit:A. 1363 * 1364 * Relies on: 1365 * 1366 * WMB from _prb_commit:A to _prb_commit:B 1367 * matching 1368 * MB If desc_reopen_last:A to prb_reserve_in_last:A 1369 */ 1370 if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val, 1371 DESC_SV(id, desc_reserved))) { /* LMM(desc_reopen_last:A) */ 1372 return NULL; 1373 } 1374 1375 *id_out = id; 1376 return d; 1377 } 1378 1379 /** 1380 * prb_reserve_in_last() - Re-reserve and extend the space in the ringbuffer 1381 * used by the newest record. 1382 * 1383 * @e: The entry structure to setup. 1384 * @rb: The ringbuffer to re-reserve and extend data in. 1385 * @r: The record structure to allocate buffers for. 1386 * @caller_id: The caller ID of the caller (reserving writer). 1387 * @max_size: Fail if the extended size would be greater than this. 1388 * 1389 * This is the public function available to writers to re-reserve and extend 1390 * data. 1391 * 1392 * The writer specifies the text size to extend (not the new total size) by 1393 * setting the @text_buf_size field of @r. To ensure proper initialization 1394 * of @r, prb_rec_init_wr() should be used. 1395 * 1396 * This function will fail if @caller_id does not match the caller ID of the 1397 * newest record. In that case the caller must reserve new data using 1398 * prb_reserve(). 1399 * 1400 * Context: Any context. Disables local interrupts on success. 1401 * Return: true if text data could be extended, otherwise false. 1402 * 1403 * On success: 1404 * 1405 * - @r->text_buf points to the beginning of the entire text buffer. 1406 * 1407 * - @r->text_buf_size is set to the new total size of the buffer. 1408 * 1409 * - @r->info is not touched so that @r->info->text_len could be used 1410 * to append the text. 1411 * 1412 * - prb_record_text_space() can be used on @e to query the new 1413 * actually used space. 1414 * 1415 * Important: All @r->info fields will already be set with the current values 1416 * for the record. I.e. @r->info->text_len will be less than 1417 * @text_buf_size. Writers can use @r->info->text_len to know 1418 * where concatenation begins and writers should update 1419 * @r->info->text_len after concatenating. 1420 */ 1421 bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb, 1422 struct printk_record *r, u32 caller_id, unsigned int max_size) 1423 { 1424 struct prb_desc_ring *desc_ring = &rb->desc_ring; 1425 struct printk_info *info; 1426 unsigned int data_size; 1427 struct prb_desc *d; 1428 unsigned long id; 1429 1430 local_irq_save(e->irqflags); 1431 1432 /* Transition the newest descriptor back to the reserved state. 
*/ 1433 d = desc_reopen_last(desc_ring, caller_id, &id); 1434 if (!d) { 1435 local_irq_restore(e->irqflags); 1436 goto fail_reopen; 1437 } 1438 1439 /* Now the writer has exclusive access: LMM(prb_reserve_in_last:A) */ 1440 1441 info = to_info(desc_ring, id); 1442 1443 /* 1444 * Set the @e fields here so that prb_commit() can be used if 1445 * anything fails from now on. 1446 */ 1447 e->rb = rb; 1448 e->id = id; 1449 1450 /* 1451 * desc_reopen_last() checked the caller_id, but there was no 1452 * exclusive access at that point. The descriptor may have 1453 * changed since then. 1454 */ 1455 if (caller_id != info->caller_id) 1456 goto fail; 1457 1458 if (BLK_DATALESS(&d->text_blk_lpos)) { 1459 if (WARN_ON_ONCE(info->text_len != 0)) { 1460 pr_warn_once("wrong text_len value (%hu, expecting 0)\n", 1461 info->text_len); 1462 info->text_len = 0; 1463 } 1464 1465 if (!data_check_size(&rb->text_data_ring, r->text_buf_size)) 1466 goto fail; 1467 1468 if (r->text_buf_size > max_size) 1469 goto fail; 1470 1471 r->text_buf = data_alloc(rb, r->text_buf_size, 1472 &d->text_blk_lpos, id); 1473 } else { 1474 if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size)) 1475 goto fail; 1476 1477 /* 1478 * Increase the buffer size to include the original size. If 1479 * the meta data (@text_len) is not sane, use the full data 1480 * block size. 1481 */ 1482 if (WARN_ON_ONCE(info->text_len > data_size)) { 1483 pr_warn_once("wrong text_len value (%hu, expecting <=%u)\n", 1484 info->text_len, data_size); 1485 info->text_len = data_size; 1486 } 1487 r->text_buf_size += info->text_len; 1488 1489 if (!data_check_size(&rb->text_data_ring, r->text_buf_size)) 1490 goto fail; 1491 1492 if (r->text_buf_size > max_size) 1493 goto fail; 1494 1495 r->text_buf = data_realloc(rb, r->text_buf_size, 1496 &d->text_blk_lpos, id); 1497 } 1498 if (r->text_buf_size && !r->text_buf) 1499 goto fail; 1500 1501 r->info = info; 1502 1503 e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos); 1504 1505 return true; 1506 fail: 1507 prb_commit(e); 1508 /* prb_commit() re-enabled interrupts. */ 1509 fail_reopen: 1510 /* Make it clear to the caller that the re-reserve failed. */ 1511 memset(r, 0, sizeof(*r)); 1512 return false; 1513 } 1514 1515 /* 1516 * @last_finalized_seq value guarantees that all records up to and including 1517 * this sequence number are finalized and can be read. The only exception are 1518 * too old records which have already been overwritten. 1519 * 1520 * It is also guaranteed that @last_finalized_seq only increases. 1521 * 1522 * Be aware that finalized records following non-finalized records are not 1523 * reported because they are not yet available to the reader. For example, 1524 * a new record stored via printk() will not be available to a printer if 1525 * it follows a record that has not been finalized yet. However, once that 1526 * non-finalized record becomes finalized, @last_finalized_seq will be 1527 * appropriately updated and the full set of finalized records will be 1528 * available to the printer. And since each printk() caller will either 1529 * directly print or trigger deferred printing of all available unprinted 1530 * records, all printk() messages will get printed. 
1531 */ 1532 static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb) 1533 { 1534 struct prb_desc_ring *desc_ring = &rb->desc_ring; 1535 unsigned long ulseq; 1536 1537 /* 1538 * Guarantee the sequence number is loaded before loading the 1539 * associated record in order to guarantee that the record can be 1540 * seen by this CPU. This pairs with desc_update_last_finalized:A. 1541 */ 1542 ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq 1543 ); /* LMM(desc_last_finalized_seq:A) */ 1544 1545 return __ulseq_to_u64seq(rb, ulseq); 1546 } 1547 1548 static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq, 1549 struct printk_record *r, unsigned int *line_count); 1550 1551 /* 1552 * Check if there are records directly following @last_finalized_seq that are 1553 * finalized. If so, update @last_finalized_seq to the latest of these 1554 * records. It is not allowed to skip over records that are not yet finalized. 1555 */ 1556 static void desc_update_last_finalized(struct printk_ringbuffer *rb) 1557 { 1558 struct prb_desc_ring *desc_ring = &rb->desc_ring; 1559 u64 old_seq = desc_last_finalized_seq(rb); 1560 unsigned long oldval; 1561 unsigned long newval; 1562 u64 finalized_seq; 1563 u64 try_seq; 1564 1565 try_again: 1566 finalized_seq = old_seq; 1567 try_seq = finalized_seq + 1; 1568 1569 /* Try to find later finalized records. */ 1570 while (_prb_read_valid(rb, &try_seq, NULL, NULL)) { 1571 finalized_seq = try_seq; 1572 try_seq++; 1573 } 1574 1575 /* No update needed if no later finalized record was found. */ 1576 if (finalized_seq == old_seq) 1577 return; 1578 1579 oldval = __u64seq_to_ulseq(old_seq); 1580 newval = __u64seq_to_ulseq(finalized_seq); 1581 1582 /* 1583 * Set the sequence number of a later finalized record that has been 1584 * seen. 1585 * 1586 * Guarantee the record data is visible to other CPUs before storing 1587 * its sequence number. This pairs with desc_last_finalized_seq:A. 1588 * 1589 * Memory barrier involvement: 1590 * 1591 * If desc_last_finalized_seq:A reads from 1592 * desc_update_last_finalized:A, then desc_read:A reads from 1593 * _prb_commit:B. 1594 * 1595 * Relies on: 1596 * 1597 * RELEASE from _prb_commit:B to desc_update_last_finalized:A 1598 * matching 1599 * ACQUIRE from desc_last_finalized_seq:A to desc_read:A 1600 * 1601 * Note: _prb_commit:B and desc_update_last_finalized:A can be 1602 * different CPUs. However, the desc_update_last_finalized:A 1603 * CPU (which performs the release) must have previously seen 1604 * _prb_commit:B. 1605 */ 1606 if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq, 1607 &oldval, newval)) { /* LMM(desc_update_last_finalized:A) */ 1608 old_seq = __ulseq_to_u64seq(rb, oldval); 1609 goto try_again; 1610 } 1611 } 1612 1613 /* 1614 * Attempt to finalize a specified descriptor. If this fails, the descriptor 1615 * is either already final or it will finalize itself when the writer commits. 1616 */ 1617 static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id) 1618 { 1619 struct prb_desc_ring *desc_ring = &rb->desc_ring; 1620 unsigned long prev_state_val = DESC_SV(id, desc_committed); 1621 struct prb_desc *d = to_desc(desc_ring, id); 1622 1623 if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val, 1624 DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */ 1625 desc_update_last_finalized(rb); 1626 } 1627 } 1628 1629 /** 1630 * prb_reserve() - Reserve space in the ringbuffer. 1631 * 1632 * @e: The entry structure to setup. 
/**
 * prb_reserve() - Reserve space in the ringbuffer.
 *
 * @e:  The entry structure to setup.
 * @rb: The ringbuffer to reserve data in.
 * @r:  The record structure to allocate buffers for.
 *
 * This is the public function available to writers to reserve data.
 *
 * The writer specifies the text size to reserve by setting the
 * @text_buf_size field of @r. To ensure proper initialization of @r,
 * prb_rec_init_wr() should be used.
 *
 * Context: Any context. Disables local interrupts on success.
 * Return: true if at least text data could be allocated, otherwise false.
 *
 * On success, the fields @info and @text_buf of @r will be set by this
 * function and should be filled in by the writer before committing. Also
 * on success, prb_record_text_space() can be used on @e to query the actual
 * space used for the text data block.
 *
 * Important: @info->text_len needs to be set correctly by the writer in
 *            order for data to be readable and/or extended. Its value
 *            is initialized to 0.
 */
bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
		 struct printk_record *r)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	struct printk_info *info;
	struct prb_desc *d;
	unsigned long id;
	u64 seq;

	if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
		goto fail;

	/*
	 * Descriptors in the reserved state act as blockers to all further
	 * reservations once the desc_ring has fully wrapped. Disable
	 * interrupts during the reserve/commit window in order to minimize
	 * the likelihood of this happening.
	 */
	local_irq_save(e->irqflags);

	if (!desc_reserve(rb, &id)) {
		/* Descriptor reservation failures are tracked. */
		atomic_long_inc(&rb->fail);
		local_irq_restore(e->irqflags);
		goto fail;
	}

	d = to_desc(desc_ring, id);
	info = to_info(desc_ring, id);

	/*
	 * All @info fields (except @seq) are cleared and must be filled in
	 * by the writer. Save @seq before clearing because it is used to
	 * determine the new sequence number.
	 */
	seq = info->seq;
	memset(info, 0, sizeof(*info));

	/*
	 * Set the @e fields here so that prb_commit() can be used if
	 * text data allocation fails.
	 */
	e->rb = rb;
	e->id = id;

	/*
	 * Initialize the sequence number if it has "never been set".
	 * Otherwise just increment it by a full wrap.
	 *
	 * @seq is considered "never been set" if it has a value of 0,
	 * _except_ for @infos[0], which was specially setup by the ringbuffer
	 * initializer and therefore is always considered as set.
	 *
	 * See the "Bootstrap" comment block in printk_ringbuffer.h for
	 * details about how the initializer bootstraps the descriptors.
	 */
	if (seq == 0 && DESC_INDEX(desc_ring, id) != 0)
		info->seq = DESC_INDEX(desc_ring, id);
	else
		info->seq = seq + DESCS_COUNT(desc_ring);

	/*
	 * New data is about to be reserved. Once that happens, previous
	 * descriptors are no longer able to be extended. Finalize the
	 * previous descriptor now so that it can be made available to
	 * readers. (For seq==0 there is no previous descriptor.)
	 */
	if (info->seq > 0)
		desc_make_final(rb, DESC_ID(id - 1));

	r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
	/* If text data allocation fails, a data-less record is committed. */
	if (r->text_buf_size && !r->text_buf) {
		prb_commit(e);
		/* prb_commit() re-enabled interrupts. */
		goto fail;
	}

	r->info = info;

	/* Record full text space used by record. */
	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);

	return true;
fail:
	/* Make it clear to the caller that the reserve failed. */
	memset(r, 0, sizeof(*r));
	return false;
}
EXPORT_SYMBOL_IF_KUNIT(prb_reserve);
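/*
 * A minimal writer sketch (illustrative only; assumes @rb points to an
 * initialized ringbuffer). Note that @info->text_len must be set by the
 * writer before committing, otherwise readers and prb_reserve_in_last()
 * cannot make use of the text data:
 *
 *	struct prb_reserved_entry e;
 *	struct printk_record r;
 *
 *	prb_rec_init_wr(&r, 16);
 *	if (prb_reserve(&e, rb, &r)) {
 *		memcpy(r.text_buf, "hello", 5);
 *		r.info->text_len = 5;
 *		prb_commit(&e);
 *	}
 */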
/* Commit the data (possibly finalizing it) and restore interrupts. */
static void _prb_commit(struct prb_reserved_entry *e, unsigned long state_val)
{
	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
	struct prb_desc *d = to_desc(desc_ring, e->id);
	unsigned long prev_state_val = DESC_SV(e->id, desc_reserved);

	/* Now the writer has finished all writing: LMM(_prb_commit:A) */

	/*
	 * Set the descriptor as committed. See "ABA Issues" about why
	 * cmpxchg() instead of set() is used.
	 *
	 * 1. Guarantee all record data is stored before the descriptor state
	 *    is stored as committed. A write memory barrier is sufficient
	 *    for this. This pairs with desc_read:B and desc_reopen_last:A.
	 *
	 * 2. Guarantee the descriptor state is stored as committed before
	 *    re-checking the head ID in order to possibly finalize this
	 *    descriptor. This pairs with desc_reserve:D.
	 *
	 *    Memory barrier involvement:
	 *
	 *    If prb_commit:A reads from desc_reserve:D, then
	 *    desc_make_final:A reads from _prb_commit:B.
	 *
	 *    Relies on:
	 *
	 *    MB _prb_commit:B to prb_commit:A
	 *       matching
	 *    MB desc_reserve:D to desc_make_final:A
	 */
	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
			DESC_SV(e->id, state_val))) { /* LMM(_prb_commit:B) */
		WARN_ON_ONCE(1);
	}

	/* Restore interrupts, the reserve/commit window is finished. */
	local_irq_restore(e->irqflags);
}

/**
 * prb_commit() - Commit (previously reserved) data to the ringbuffer.
 *
 * @e: The entry containing the reserved data information.
 *
 * This is the public function available to writers to commit data.
 *
 * Note that the data is not yet available to readers until it is finalized.
 * Finalizing happens automatically when space for the next record is
 * reserved.
 *
 * See prb_final_commit() for a version of this function that finalizes
 * immediately.
 *
 * Context: Any context. Enables local interrupts.
 */
void prb_commit(struct prb_reserved_entry *e)
{
	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
	unsigned long head_id;

	_prb_commit(e, desc_committed);

	/*
	 * If this descriptor is no longer the head (i.e. a new record has
	 * been allocated), extending the data for this record is no longer
	 * allowed and therefore it must be finalized.
	 */
	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
	if (head_id != e->id)
		desc_make_final(e->rb, e->id);
}
EXPORT_SYMBOL_IF_KUNIT(prb_commit);

/**
 * prb_final_commit() - Commit and finalize (previously reserved) data to
 *                      the ringbuffer.
 *
 * @e: The entry containing the reserved data information.
 *
 * This is the public function available to writers to commit+finalize data.
 *
 * By finalizing, the data is made immediately available to readers.
 *
 * This function should only be used if there are no intentions of extending
 * this data using prb_reserve_in_last().
 *
 * Context: Any context. Enables local interrupts.
 */
void prb_final_commit(struct prb_reserved_entry *e)
{
	_prb_commit(e, desc_finalized);

	desc_update_last_finalized(e->rb);
}

/*
 * Count the number of lines in provided text. All text has at least 1 line
 * (even if @text_size is 0). Each '\n' processed is counted as an additional
 * line.
 */
static unsigned int count_lines(const char *text, unsigned int text_size)
{
	unsigned int next_size = text_size;
	unsigned int line_count = 1;
	const char *next = text;

	while (next_size) {
		next = memchr(next, '\n', next_size);
		if (!next)
			break;
		line_count++;
		next++;
		next_size = text_size - (next - text);
	}

	return line_count;
}
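/*
 * Worked examples for count_lines() (illustrative only):
 *
 *	count_lines("", 0)           == 1  (empty text is still one line)
 *	count_lines("one\ntwo", 7)   == 2
 *	count_lines("one\ntwo\n", 8) == 3  (a trailing '\n' starts a new line)
 */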
/*
 * Given @blk_lpos, copy an expected @len of data into the provided buffer.
 * If @line_count is provided, count the number of lines in the data.
 *
 * This function (used by readers) performs strict validation on the data
 * size to possibly detect bugs in the writer code. If an inconsistency is
 * detected, the copy is refused and false is returned.
 */
static bool copy_data(struct prb_data_ring *data_ring,
		      struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
		      unsigned int buf_size, unsigned int *line_count)
{
	unsigned int data_size;
	const char *data;

	/* Caller might not want any data. */
	if ((!buf || !buf_size) && !line_count)
		return true;

	data = get_data(data_ring, blk_lpos, &data_size);
	if (!data)
		return false;

	/*
	 * Actual cannot be less than expected. It can be more than expected
	 * because of the trailing alignment padding.
	 *
	 * Note that invalid @len values can occur because the caller loads
	 * the value during an allowed data race.
	 */
	if (data_size < (unsigned int)len)
		return false;

	/* Caller interested in the line count? */
	if (line_count)
		*line_count = count_lines(data, len);

	/* Caller interested in the data content? */
	if (!buf || !buf_size)
		return true;

	data_size = min_t(unsigned int, buf_size, len);

	memcpy(&buf[0], data, data_size); /* LMM(copy_data:A) */
	return true;
}

/*
 * This is an extended version of desc_read(). It gets a copy of a specified
 * descriptor. However, it also verifies that the record is finalized and has
 * the sequence number @seq. On success, 0 is returned.
 *
 * Error return values:
 * -EINVAL: A finalized record with sequence number @seq does not exist.
 * -ENOENT: A finalized record with sequence number @seq exists, but its data
 *          is not available. This is a valid record, so readers should
 *          continue with the next record.
 */
static int desc_read_finalized_seq(struct prb_desc_ring *desc_ring,
				   unsigned long id, u64 seq,
				   struct prb_desc *desc_out)
{
	struct prb_data_blk_lpos *blk_lpos = &desc_out->text_blk_lpos;
	enum desc_state d_state;
	u64 s;

	d_state = desc_read(desc_ring, id, desc_out, &s, NULL);

	/*
	 * An unexpected @id (desc_miss) or @seq mismatch means the record
	 * does not exist. A descriptor in the reserved or committed state
	 * means the record does not yet exist for the reader.
	 */
	if (d_state == desc_miss ||
	    d_state == desc_reserved ||
	    d_state == desc_committed ||
	    s != seq) {
		return -EINVAL;
	}

	/*
	 * A descriptor in the reusable state may no longer have its data
	 * available; report it as existing but with lost data. Or the record
	 * may actually be a record with lost data.
	 */
	if (d_state == desc_reusable ||
	    (blk_lpos->begin == FAILED_LPOS && blk_lpos->next == FAILED_LPOS)) {
		return -ENOENT;
	}

	return 0;
}
/*
 * Copy the ringbuffer data from the record with @seq to the provided
 * @r buffer. On success, 0 is returned.
 *
 * See desc_read_finalized_seq() for error return values.
 */
static int prb_read(struct printk_ringbuffer *rb, u64 seq,
		    struct printk_record *r, unsigned int *line_count)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	struct printk_info *info = to_info(desc_ring, seq);
	struct prb_desc *rdesc = to_desc(desc_ring, seq);
	atomic_long_t *state_var = &rdesc->state_var;
	struct prb_desc desc;
	unsigned long id;
	int err;

	/* Extract the ID, used to specify the descriptor to read. */
	id = DESC_ID(atomic_long_read(state_var));

	/* Get a local copy of the correct descriptor (if available). */
	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);

	/*
	 * If @r is NULL, the caller is only interested in the availability
	 * of the record.
	 */
	if (err || !r)
		return err;

	/* If requested, copy meta data. */
	if (r->info)
		memcpy(r->info, info, sizeof(*(r->info)));

	/* Copy text data. If it fails, this is a data-less record. */
	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
		       r->text_buf, r->text_buf_size, line_count)) {
		return -ENOENT;
	}

	/* Ensure the record is still finalized and has the same @seq. */
	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
}

/* Get the sequence number of the tail descriptor. */
u64 prb_first_seq(struct printk_ringbuffer *rb)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	enum desc_state d_state;
	struct prb_desc desc;
	unsigned long id;
	u64 seq;

	for (;;) {
		id = atomic_long_read(&rb->desc_ring.tail_id); /* LMM(prb_first_seq:A) */

		d_state = desc_read(desc_ring, id, &desc, &seq, NULL); /* LMM(prb_first_seq:B) */

		/*
		 * This loop will not be infinite because the tail is
		 * _always_ in the finalized or reusable state.
		 */
		if (d_state == desc_finalized || d_state == desc_reusable)
			break;

		/*
		 * Guarantee the last state load from desc_read() is before
		 * reloading @tail_id in order to see a new tail in the case
		 * that the descriptor has been recycled. This pairs with
		 * desc_reserve:D.
		 *
		 * Memory barrier involvement:
		 *
		 * If prb_first_seq:B reads from desc_reserve:F, then
		 * prb_first_seq:A reads from desc_push_tail:B.
		 *
		 * Relies on:
		 *
		 * MB from desc_push_tail:B to desc_reserve:F
		 *    matching
		 * RMB prb_first_seq:B to prb_first_seq:A
		 */
		smp_rmb(); /* LMM(prb_first_seq:C) */
	}

	return seq;
}
/**
 * prb_next_reserve_seq() - Get the sequence number after the most recently
 *                          reserved record.
 *
 * @rb: The ringbuffer to get the sequence number from.
 *
 * This is the public function available to readers to see what sequence
 * number will be assigned to the next reserved record.
 *
 * Note that depending on the situation, this value can be equal to or
 * higher than the sequence number returned by prb_next_seq().
 *
 * Context: Any context.
 * Return: The sequence number that will be assigned to the next record
 *         reserved.
 */
u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
{
	struct prb_desc_ring *desc_ring = &rb->desc_ring;
	unsigned long last_finalized_id;
	atomic_long_t *state_var;
	u64 last_finalized_seq;
	unsigned long head_id;
	struct prb_desc desc;
	unsigned long diff;
	struct prb_desc *d;
	int err;

	/*
	 * It may not be possible to read a sequence number for @head_id.
	 * So the ID of @last_finalized_seq is used to calculate what the
	 * sequence number of @head_id will be.
	 */

try_again:
	last_finalized_seq = desc_last_finalized_seq(rb);

	/*
	 * @head_id is loaded after @last_finalized_seq to ensure that
	 * it points to the record with @last_finalized_seq or newer.
	 *
	 * Memory barrier involvement:
	 *
	 * If desc_last_finalized_seq:A reads from
	 * desc_update_last_finalized:A, then
	 * prb_next_reserve_seq:A reads from desc_reserve:D.
	 *
	 * Relies on:
	 *
	 * RELEASE from desc_reserve:D to desc_update_last_finalized:A
	 *    matching
	 * ACQUIRE from desc_last_finalized_seq:A to prb_next_reserve_seq:A
	 *
	 * Note: desc_reserve:D and desc_update_last_finalized:A can be
	 *       different CPUs. However, the desc_update_last_finalized:A CPU
	 *       (which performs the release) must have previously seen
	 *       desc_read:C, which implies desc_reserve:D can be seen.
	 */
	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_next_reserve_seq:A) */

	d = to_desc(desc_ring, last_finalized_seq);
	state_var = &d->state_var;

	/* Extract the ID, used to specify the descriptor to read. */
	last_finalized_id = DESC_ID(atomic_long_read(state_var));

	/* Ensure @last_finalized_id is correct. */
	err = desc_read_finalized_seq(desc_ring, last_finalized_id, last_finalized_seq, &desc);

	if (err == -EINVAL) {
		if (last_finalized_seq == 0) {
			/*
			 * No record has been finalized or even reserved yet.
			 *
			 * The @head_id is initialized such that the first
			 * increment will yield the first record (seq=0).
			 * Handle it separately to avoid a negative @diff
			 * below.
			 */
			if (head_id == DESC0_ID(desc_ring->count_bits))
				return 0;

			/*
			 * One or more descriptors are already reserved. Use
			 * the descriptor ID of the first one (@seq=0) for
			 * the @diff below.
			 */
			last_finalized_id = DESC0_ID(desc_ring->count_bits) + 1;
		} else {
			/* Record must have been overwritten. Try again. */
			goto try_again;
		}
	}

	/* Diff of known descriptor IDs to compute related sequence numbers. */
	diff = head_id - last_finalized_id;

	/*
	 * @head_id points to the most recently reserved record, but this
	 * function returns the sequence number that will be assigned to the
	 * next (not yet reserved) record. Thus +1 is needed.
	 */
	return (last_finalized_seq + diff + 1);
}
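/*
 * Worked example for the computation at the end of prb_next_reserve_seq()
 * above (illustrative values only): if @last_finalized_seq is 5 and its
 * descriptor has ID X, and @head_id is X + 3, then records with seq 6, 7
 * and 8 have already been reserved, so the next record to be reserved will
 * get seq 5 + 3 + 1 = 9.
 */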
/*
 * Non-blocking read of a record.
 *
 * On success @seq is updated to the record that was read and (if provided)
 * @r and @line_count will contain the read/calculated data.
 *
 * On failure @seq is updated to a record that is not yet available to the
 * reader, but it will be the next record available to the reader.
 *
 * Note: When the current CPU is in panic, this function will skip over any
 *       non-existent/non-finalized records in order to allow the panic CPU
 *       to print any and all records that have been finalized.
 */
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
			    struct printk_record *r, unsigned int *line_count)
{
	u64 tail_seq;
	int err;

	while ((err = prb_read(rb, *seq, r, line_count))) {
		tail_seq = prb_first_seq(rb);

		if (*seq < tail_seq) {
			/*
			 * Behind the tail. Catch up and try again. This
			 * can happen for -ENOENT and -EINVAL cases.
			 */
			*seq = tail_seq;

		} else if (err == -ENOENT) {
			/* Record exists, but the data was lost. Skip. */
			(*seq)++;

		} else {
			/*
			 * Non-existent/non-finalized record. Must stop.
			 *
			 * For panic situations it cannot be expected that
			 * non-finalized records will become finalized. But
			 * there may be other finalized records beyond them
			 * that need to be printed for a panic situation. If
			 * this is the panic CPU, skip this
			 * non-existent/non-finalized record unless non-panic
			 * CPUs are still running and their debugging is
			 * explicitly enabled.
			 *
			 * Note that new messages printed on the panic CPU are
			 * finalized by the time we get here. The only
			 * exception might be the last message without a
			 * trailing newline. But it would have the sequence
			 * number returned by "prb_next_reserve_seq() - 1".
			 */
			if (panic_on_this_cpu() &&
			    (!debug_non_panic_cpus || legacy_allow_panic_sync) &&
			    ((*seq + 1) < prb_next_reserve_seq(rb))) {
				(*seq)++;
			} else {
				return false;
			}
		}
	}

	return true;
}

/**
 * prb_read_valid() - Non-blocking read of a requested record or (if gone)
 *                    the next available record.
 *
 * @rb:  The ringbuffer to read from.
 * @seq: The sequence number of the record to read.
 * @r:   A record data buffer to store the read record to.
 *
 * This is the public function available to readers to read a record.
 *
 * The reader provides the @info and @text_buf buffers of @r to be
 * filled in. Any of the buffer pointers can be set to NULL if the reader
 * is not interested in that data. To ensure proper initialization of @r,
 * prb_rec_init_rd() should be used.
 *
 * Context: Any context.
 * Return: true if a record was read, otherwise false.
 *
 * On success, the reader must check r->info->seq to see which record was
 * actually read. This allows the reader to detect dropped records.
 *
 * Failure means @seq refers to a record not yet available to the reader.
 */
bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
		    struct printk_record *r)
{
	return _prb_read_valid(rb, &seq, r, NULL);
}
EXPORT_SYMBOL_IF_KUNIT(prb_read_valid);
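/*
 * A minimal reader sketch (illustrative only; assumes @rb points to an
 * initialized ringbuffer). The gap between the requested @seq and
 * r.info->seq reveals how many records were dropped, i.e. overwritten
 * before they could be read:
 *
 *	struct printk_info info;
 *	struct printk_record r;
 *	char text[128];
 *	u64 seq;
 *
 *	prb_rec_init_rd(&r, &info, text, sizeof(text));
 *
 *	for (seq = 0; prb_read_valid(rb, seq, &r); seq = r.info->seq + 1) {
 *		if (seq != r.info->seq)
 *			pr_info("dropped %llu records\n", r.info->seq - seq);
 *
 *		pr_info("%llu: %.*s\n", r.info->seq,
 *			(int)min_t(unsigned int, r.info->text_len, r.text_buf_size),
 *			r.text_buf);
 *	}
 */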
2238 */ 2239 bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq, 2240 struct printk_record *r) 2241 { 2242 return _prb_read_valid(rb, &seq, r, NULL); 2243 } 2244 EXPORT_SYMBOL_IF_KUNIT(prb_read_valid); 2245 2246 /** 2247 * prb_read_valid_info() - Non-blocking read of meta data for a requested 2248 * record or (if gone) the next available record. 2249 * 2250 * @rb: The ringbuffer to read from. 2251 * @seq: The sequence number of the record to read. 2252 * @info: A buffer to store the read record meta data to. 2253 * @line_count: A buffer to store the number of lines in the record text. 2254 * 2255 * This is the public function available to readers to read only the 2256 * meta data of a record. 2257 * 2258 * The reader provides the @info, @line_count buffers to be filled in. 2259 * Either of the buffer pointers can be set to NULL if the reader is not 2260 * interested in that data. 2261 * 2262 * Context: Any context. 2263 * Return: true if a record's meta data was read, otherwise false. 2264 * 2265 * On success, the reader must check info->seq to see which record meta data 2266 * was actually read. This allows the reader to detect dropped records. 2267 * 2268 * Failure means @seq refers to a record not yet available to the reader. 2269 */ 2270 bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq, 2271 struct printk_info *info, unsigned int *line_count) 2272 { 2273 struct printk_record r; 2274 2275 prb_rec_init_rd(&r, info, NULL, 0); 2276 2277 return _prb_read_valid(rb, &seq, &r, line_count); 2278 } 2279 2280 /** 2281 * prb_first_valid_seq() - Get the sequence number of the oldest available 2282 * record. 2283 * 2284 * @rb: The ringbuffer to get the sequence number from. 2285 * 2286 * This is the public function available to readers to see what the 2287 * first/oldest valid sequence number is. 2288 * 2289 * This provides readers a starting point to begin iterating the ringbuffer. 2290 * 2291 * Context: Any context. 2292 * Return: The sequence number of the first/oldest record or, if the 2293 * ringbuffer is empty, 0 is returned. 2294 */ 2295 u64 prb_first_valid_seq(struct printk_ringbuffer *rb) 2296 { 2297 u64 seq = 0; 2298 2299 if (!_prb_read_valid(rb, &seq, NULL, NULL)) 2300 return 0; 2301 2302 return seq; 2303 } 2304 2305 /** 2306 * prb_next_seq() - Get the sequence number after the last available record. 2307 * 2308 * @rb: The ringbuffer to get the sequence number from. 2309 * 2310 * This is the public function available to readers to see what the next 2311 * newest sequence number available to readers will be. 2312 * 2313 * This provides readers a sequence number to jump to if all currently 2314 * available records should be skipped. It is guaranteed that all records 2315 * previous to the returned value have been finalized and are (or were) 2316 * available to the reader. 2317 * 2318 * Context: Any context. 2319 * Return: The sequence number of the next newest (not yet available) record 2320 * for readers. 2321 */ 2322 u64 prb_next_seq(struct printk_ringbuffer *rb) 2323 { 2324 u64 seq; 2325 2326 seq = desc_last_finalized_seq(rb); 2327 2328 /* 2329 * Begin searching after the last finalized record. 2330 * 2331 * On 0, the search must begin at 0 because of hack#2 2332 * of the bootstrapping phase it is not known if a 2333 * record at index 0 exists. 2334 */ 2335 if (seq != 0) 2336 seq++; 2337 2338 /* 2339 * The information about the last finalized @seq might be inaccurate. 2340 * Search forward to find the current one. 
2341 */ 2342 while (_prb_read_valid(rb, &seq, NULL, NULL)) 2343 seq++; 2344 2345 return seq; 2346 } 2347 2348 /** 2349 * prb_init() - Initialize a ringbuffer to use provided external buffers. 2350 * 2351 * @rb: The ringbuffer to initialize. 2352 * @text_buf: The data buffer for text data. 2353 * @textbits: The size of @text_buf as a power-of-2 value. 2354 * @descs: The descriptor buffer for ringbuffer records. 2355 * @descbits: The count of @descs items as a power-of-2 value. 2356 * @infos: The printk_info buffer for ringbuffer records. 2357 * 2358 * This is the public function available to writers to setup a ringbuffer 2359 * during runtime using provided buffers. 2360 * 2361 * This must match the initialization of DEFINE_PRINTKRB(). 2362 * 2363 * Context: Any context. 2364 */ 2365 void prb_init(struct printk_ringbuffer *rb, 2366 char *text_buf, unsigned int textbits, 2367 struct prb_desc *descs, unsigned int descbits, 2368 struct printk_info *infos) 2369 { 2370 memset(descs, 0, _DESCS_COUNT(descbits) * sizeof(descs[0])); 2371 memset(infos, 0, _DESCS_COUNT(descbits) * sizeof(infos[0])); 2372 2373 rb->desc_ring.count_bits = descbits; 2374 rb->desc_ring.descs = descs; 2375 rb->desc_ring.infos = infos; 2376 atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits)); 2377 atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits)); 2378 atomic_long_set(&rb->desc_ring.last_finalized_seq, 0); 2379 2380 rb->text_data_ring.size_bits = textbits; 2381 rb->text_data_ring.data = text_buf; 2382 atomic_long_set(&rb->text_data_ring.head_lpos, BLK0_LPOS(textbits)); 2383 atomic_long_set(&rb->text_data_ring.tail_lpos, BLK0_LPOS(textbits)); 2384 2385 atomic_long_set(&rb->fail, 0); 2386 2387 atomic_long_set(&(descs[_DESCS_COUNT(descbits) - 1].state_var), DESC0_SV(descbits)); 2388 descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.begin = FAILED_LPOS; 2389 descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.next = FAILED_LPOS; 2390 2391 infos[0].seq = -(u64)_DESCS_COUNT(descbits); 2392 infos[_DESCS_COUNT(descbits) - 1].seq = 0; 2393 } 2394 EXPORT_SYMBOL_IF_KUNIT(prb_init); 2395 2396 /** 2397 * prb_record_text_space() - Query the full actual used ringbuffer space for 2398 * the text data of a reserved entry. 2399 * 2400 * @e: The successfully reserved entry to query. 2401 * 2402 * This is the public function available to writers to see how much actual 2403 * space is used in the ringbuffer to store the text data of the specified 2404 * entry. 2405 * 2406 * This function is only valid if @e has been successfully reserved using 2407 * prb_reserve(). 2408 * 2409 * Context: Any context. 2410 * Return: The size in bytes used by the text data of the associated record. 2411 */ 2412 unsigned int prb_record_text_space(struct prb_reserved_entry *e) 2413 { 2414 return e->text_space; 2415 } 2416