xref: /linux/kernel/printk/printk_ringbuffer.c (revision 5d83b9cbe7cf40746948a419c5018f4d617a86fa)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <linux/kernel.h>
4 #include <linux/irqflags.h>
5 #include <linux/string.h>
6 #include <linux/errno.h>
7 #include <linux/bug.h>
8 #include "printk_ringbuffer.h"
9 #include "internal.h"
10 
11 /**
12  * DOC: printk_ringbuffer overview
13  *
14  * Data Structure
15  * --------------
16  * The printk_ringbuffer is made up of 3 internal ringbuffers:
17  *
18  *   desc_ring
19  *     A ring of descriptors and their meta data (such as sequence number,
20  *     timestamp, loglevel, etc.) as well as internal state information about
21  *     the record and logical positions specifying where in the other
22  *     ringbuffer the text strings are located.
23  *
24  *   text_data_ring
25  *     A ring of data blocks. A data block consists of an unsigned long
26  *     integer (ID) that maps to a desc_ring index followed by the text
27  *     string of the record.
28  *
29  * The internal state information of a descriptor is the key element to allow
30  * readers and writers to locklessly synchronize access to the data.
31  *
32  * Implementation
33  * --------------
34  *
35  * Descriptor Ring
36  * ~~~~~~~~~~~~~~~
37  * The descriptor ring is an array of descriptors. A descriptor contains
38  * essential meta data to track the data of a printk record using
39  * blk_lpos structs pointing to associated text data blocks (see
40  * "Data Rings" below). Each descriptor is assigned an ID that maps
41  * directly to index values of the descriptor array and has a state. The ID
42  * and the state are bitwise combined into a single descriptor field named
43  * @state_var, allowing ID and state to be synchronously and atomically
44  * updated.
45  *
46  * Descriptors have four states:
47  *
48  *   reserved
49  *     A writer is modifying the record.
50  *
51  *   committed
52  *     The record and all its data are written. A writer can reopen the
53  *     descriptor (transitioning it back to reserved), but in the committed
54  *     state the data is consistent.
55  *
56  *   finalized
57  *     The record and all its data are complete and available for reading. A
58  *     writer cannot reopen the descriptor.
59  *
60  *   reusable
61  *     The record exists, but its text and/or meta data may no longer be
62  *     available.
63  *
64  * Querying the @state_var of a record requires providing the ID of the
65  * descriptor to query. This can yield a possible fifth (pseudo) state:
66  *
67  *   miss
68  *     The descriptor being queried has an unexpected ID.
69  *
70  * The descriptor ring has a @tail_id that contains the ID of the oldest
71  * descriptor and @head_id that contains the ID of the newest descriptor.
72  *
73  * When a new descriptor should be created (and the ring is full), the tail
74  * descriptor is invalidated by first transitioning to the reusable state and
75  * then invalidating all tail data blocks up to and including the data blocks
76  * associated with the tail descriptor (for the text ring). Then
77  * @tail_id is advanced, followed by advancing @head_id. And finally the
78  * @state_var of the new descriptor is initialized to the new ID and reserved
79  * state.
80  *
81  * The @tail_id can only be advanced if the new @tail_id would be in the
82  * committed or reusable queried state. This makes it possible that a valid
83  * sequence number of the tail is always available.
84  *
85  * Descriptor Finalization
86  * ~~~~~~~~~~~~~~~~~~~~~~~
87  * When a writer calls the commit function prb_commit(), record data is
88  * fully stored and is consistent within the ringbuffer. However, a writer can
89  * reopen that record, claiming exclusive access (as with prb_reserve()), and
90  * modify that record. When finished, the writer must again commit the record.
91  *
92  * In order for a record to be made available to readers (and also become
93  * recyclable for writers), it must be finalized. A finalized record cannot be
94  * reopened and can never become "unfinalized". Record finalization can occur
95  * in three different scenarios:
96  *
97  *   1) A writer can simultaneously commit and finalize its record by calling
98  *      prb_final_commit() instead of prb_commit().
99  *
100  *   2) When a new record is reserved and the previous record has been
101  *      committed via prb_commit(), that previous record is automatically
102  *      finalized.
103  *
104  *   3) When a record is committed via prb_commit() and a newer record
105  *      already exists, the record being committed is automatically finalized.
106  *
107  * Data Ring
108  * ~~~~~~~~~
109  * The text data ring is a byte array composed of data blocks. Data blocks are
110  * referenced by blk_lpos structs that point to the logical position of the
111  * beginning of a data block and the beginning of the next adjacent data
112  * block. Logical positions are mapped directly to index values of the byte
113  * array ringbuffer.
114  *
115  * Each data block consists of an ID followed by the writer data. The ID is
116  * the identifier of a descriptor that is associated with the data block. A
117  * given data block is considered valid if all of the following conditions
118  * are met:
119  *
120  *   1) The descriptor associated with the data block is in the committed
121  *      or finalized queried state.
122  *
123  *   2) The blk_lpos struct within the descriptor associated with the data
124  *      block references back to the same data block.
125  *
126  *   3) The data block is within the head/tail logical position range.
127  *
128  * If the writer data of a data block would extend beyond the end of the
129  * byte array, only the ID of the data block is stored at the logical
130  * position and the full data block (ID and writer data) is stored at the
131  * beginning of the byte array. The referencing blk_lpos will point to the
132  * ID before the wrap and the next data block will be at the logical
133  * position adjacent the full data block after the wrap.
134  *
135  * Data rings have a @tail_lpos that points to the beginning of the oldest
136  * data block and a @head_lpos that points to the logical position of the
137  * next (not yet existing) data block.
138  *
139  * When a new data block should be created (and the ring is full), tail data
140  * blocks will first be invalidated by putting their associated descriptors
141  * into the reusable state and then pushing the @tail_lpos forward beyond
142  * them. Then the @head_lpos is pushed forward and is associated with a new
143  * descriptor. If a data block is not valid, the @tail_lpos cannot be
144  * advanced beyond it.
145  *
146  * Info Array
147  * ~~~~~~~~~~
148  * The general meta data of printk records are stored in printk_info structs,
149  * stored in an array with the same number of elements as the descriptor ring.
150  * Each info corresponds to the descriptor of the same index in the
151  * descriptor ring. Info validity is confirmed by evaluating the corresponding
152  * descriptor before and after loading the info.
153  *
154  * Usage
155  * -----
156  * Here are some simple examples demonstrating writers and readers. For the
157  * examples a global ringbuffer (test_rb) is available (which is not the
158  * actual ringbuffer used by printk)::
159  *
160  *	DEFINE_PRINTKRB(test_rb, 15, 5);
161  *
162  * This ringbuffer allows up to 32768 records (2 ^ 15) and has a size of
163  * 1 MiB (2 ^ (15 + 5)) for text data.
164  *
165  * Sample writer code::
166  *
167  *	const char *textstr = "message text";
168  *	struct prb_reserved_entry e;
169  *	struct printk_record r;
170  *
171  *	// specify how much to allocate
172  *	prb_rec_init_wr(&r, strlen(textstr) + 1);
173  *
174  *	if (prb_reserve(&e, &test_rb, &r)) {
175  *		snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
176  *
177  *		r.info->text_len = strlen(textstr);
178  *		r.info->ts_nsec = local_clock();
179  *		r.info->caller_id = printk_caller_id();
180  *
181  *		// commit and finalize the record
182  *		prb_final_commit(&e);
183  *	}
184  *
185  * Note that additional writer functions are available to extend a record
186  * after it has been committed but not yet finalized. This can be done as
187  * long as no new records have been reserved and the caller is the same.
188  *
189  * Sample writer code (record extending)::
190  *
191  *		// alternate rest of previous example
192  *
193  *		r.info->text_len = strlen(textstr);
194  *		r.info->ts_nsec = local_clock();
195  *		r.info->caller_id = printk_caller_id();
196  *
197  *		// commit the record (but do not finalize yet)
198  *		prb_commit(&e);
199  *	}
200  *
201  *	...
202  *
203  *	// specify additional 5 bytes text space to extend
204  *	prb_rec_init_wr(&r, 5);
205  *
206  *	// try to extend, but only if it does not exceed 32 bytes
207  *	if (prb_reserve_in_last(&e, &test_rb, &r, printk_caller_id(), 32)) {
208  *		snprintf(&r.text_buf[r.info->text_len],
209  *			 r.text_buf_size - r.info->text_len, "hello");
210  *
211  *		r.info->text_len += 5;
212  *
213  *		// commit and finalize the record
214  *		prb_final_commit(&e);
215  *	}
216  *
217  * Sample reader code::
218  *
219  *	struct printk_info info;
220  *	struct printk_record r;
221  *	char text_buf[32];
222  *	u64 seq;
223  *
224  *	prb_rec_init_rd(&r, &info, &text_buf[0], sizeof(text_buf));
225  *
226  *	prb_for_each_record(0, &test_rb, &seq, &r) {
227  *		if (info.seq != seq)
228  *			pr_warn("lost %llu records\n", info.seq - seq);
229  *
230  *		if (info.text_len > r.text_buf_size) {
231  *			pr_warn("record %llu text truncated\n", info.seq);
232  *			text_buf[r.text_buf_size - 1] = 0;
233  *		}
234  *
235  *		pr_info("%llu: %llu: %s\n", info.seq, info.ts_nsec,
236  *			&text_buf[0]);
237  *	}
238  *
239  * Note that additional less convenient reader functions are available to
240  * allow complex record access.
241  *
242  * ABA Issues
243  * ~~~~~~~~~~
244  * To help avoid ABA issues, descriptors are referenced by IDs (array index
245  * values combined with tagged bits counting array wraps) and data blocks are
246  * referenced by logical positions (array index values combined with tagged
247  * bits counting array wraps). However, on 32-bit systems the number of
248  * tagged bits is relatively small such that an ABA incident is (at least
249  * theoretically) possible. For example, if 4 million maximally sized (1KiB)
250  * printk messages were to occur in NMI context on a 32-bit system, the
251  * interrupted context would not be able to recognize that the 32-bit integer
252  * completely wrapped and thus represents a different data block than the one
253  * the interrupted context expects.
254  *
255  * To help combat this possibility, additional state checking is performed
256  * (such as using cmpxchg() even though set() would suffice). These extra
257  * checks are commented as such and will hopefully catch any ABA issue that
258  * a 32-bit system might experience.
259  *
260  * Memory Barriers
261  * ~~~~~~~~~~~~~~~
262  * Multiple memory barriers are used. To simplify proving correctness and
263  * generating litmus tests, lines of code related to memory barriers
264  * (loads, stores, and the associated memory barriers) are labeled::
265  *
266  *	LMM(function:letter)
267  *
268  * Comments reference the labels using only the "function:letter" part.
269  *
270  * The memory barrier pairs and their ordering are:
271  *
272  *   desc_reserve:D / desc_reserve:B
273  *     push descriptor tail (id), then push descriptor head (id)
274  *
275  *   desc_reserve:D / data_push_tail:B
276  *     push data tail (lpos), then set new descriptor reserved (state)
277  *
278  *   desc_reserve:D / desc_push_tail:C
279  *     push descriptor tail (id), then set new descriptor reserved (state)
280  *
281  *   desc_reserve:D / prb_first_seq:C
282  *     push descriptor tail (id), then set new descriptor reserved (state)
283  *
284  *   desc_reserve:F / desc_read:D
285  *     set new descriptor id and reserved (state), then allow writer changes
286  *
287  *   data_alloc:A (or data_realloc:A) / desc_read:D
288  *     set old descriptor reusable (state), then modify new data block area
289  *
290  *   data_alloc:A (or data_realloc:A) / data_push_tail:B
291  *     push data tail (lpos), then modify new data block area
292  *
293  *   _prb_commit:B / desc_read:B
294  *     store writer changes, then set new descriptor committed (state)
295  *
296  *   desc_reopen_last:A / _prb_commit:B
297  *     set descriptor reserved (state), then read descriptor data
298  *
299  *   _prb_commit:B / desc_reserve:D
300  *     set new descriptor committed (state), then check descriptor head (id)
301  *
302  *   data_push_tail:D / data_push_tail:A
303  *     set descriptor reusable (state), then push data tail (lpos)
304  *
305  *   desc_push_tail:B / desc_reserve:D
306  *     set descriptor reusable (state), then push descriptor tail (id)
307  *
308  *   desc_update_last_finalized:A / desc_last_finalized_seq:A
309  *     store finalized record, then set new highest finalized sequence number
310  */
311 
312 #define DATA_SIZE(data_ring)		_DATA_SIZE((data_ring)->size_bits)
313 #define DATA_SIZE_MASK(data_ring)	(DATA_SIZE(data_ring) - 1)
314 
315 #define DESCS_COUNT(desc_ring)		_DESCS_COUNT((desc_ring)->count_bits)
316 #define DESCS_COUNT_MASK(desc_ring)	(DESCS_COUNT(desc_ring) - 1)
317 
318 /* Determine the data array index from a logical position. */
319 #define DATA_INDEX(data_ring, lpos)	((lpos) & DATA_SIZE_MASK(data_ring))
320 
321 /* Determine the desc array index from an ID or sequence number. */
322 #define DESC_INDEX(desc_ring, n)	((n) & DESCS_COUNT_MASK(desc_ring))
323 
324 /* Determine how many times the data array has wrapped. */
325 #define DATA_WRAPS(data_ring, lpos)	((lpos) >> (data_ring)->size_bits)
326 
327 /* Determine if a logical position refers to a data-less block. */
328 #define LPOS_DATALESS(lpos)		((lpos) & 1UL)
329 #define BLK_DATALESS(blk)		(LPOS_DATALESS((blk)->begin) && \
330 					 LPOS_DATALESS((blk)->next))
331 
332 /* Get the logical position at index 0 of the current wrap. */
333 #define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
334 ((lpos) & ~DATA_SIZE_MASK(data_ring))
335 
336 /* Get the ID for the same index of the previous wrap as the given ID. */
337 #define DESC_ID_PREV_WRAP(desc_ring, id) \
338 DESC_ID((id) - DESCS_COUNT(desc_ring))
339 
340 /*
341  * A data block: mapped directly to the beginning of the data block area
342  * specified as a logical position within the data ring.
343  *
344  * @id:   the ID of the associated descriptor
345  * @data: the writer data
346  *
347  * Note that the size of a data block is only known by its associated
348  * descriptor.
349  */
350 struct prb_data_block {
351 	unsigned long	id;
352 	char		data[];
353 };
354 
355 /*
356  * Return the descriptor associated with @n. @n can be either a
357  * descriptor ID or a sequence number.
358  */
359 static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
360 {
361 	return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
362 }
363 
364 /*
365  * Return the printk_info associated with @n. @n can be either a
366  * descriptor ID or a sequence number.
367  */
368 static struct printk_info *to_info(struct prb_desc_ring *desc_ring, u64 n)
369 {
370 	return &desc_ring->infos[DESC_INDEX(desc_ring, n)];
371 }
372 
373 static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
374 				       unsigned long begin_lpos)
375 {
376 	return (void *)&data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
377 }
378 
379 /*
380  * Increase the data size to account for data block meta data plus any
381  * padding so that the adjacent data block is aligned on the ID size.
382  */
383 static unsigned int to_blk_size(unsigned int size)
384 {
385 	struct prb_data_block *db = NULL;
386 
387 	size += sizeof(*db);
388 	size = ALIGN(size, sizeof(db->id));
389 	return size;
390 }
391 
392 /*
393  * Sanity checker for reserve size. The ringbuffer code assumes that a data
394  * block does not exceed the maximum possible size that could fit within the
395  * ringbuffer. This function provides that basic size check so that the
396  * assumption is safe.
397  */
398 static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
399 {
400 	struct prb_data_block *db = NULL;
401 
402 	if (size == 0)
403 		return true;
404 
405 	/*
406 	 * Ensure the alignment padded size could possibly fit in the data
407 	 * array. The largest possible data block must still leave room for
408 	 * at least the ID of the next block.
409 	 */
410 	size = to_blk_size(size);
411 	if (size > DATA_SIZE(data_ring) - sizeof(db->id))
412 		return false;
413 
414 	return true;
415 }
416 
417 /* Query the state of a descriptor. */
418 static enum desc_state get_desc_state(unsigned long id,
419 				      unsigned long state_val)
420 {
421 	if (id != DESC_ID(state_val))
422 		return desc_miss;
423 
424 	return DESC_STATE(state_val);
425 }
426 
427 /*
428  * Get a copy of a specified descriptor and return its queried state. If the
429  * descriptor is in an inconsistent state (miss or reserved), the caller can
430  * only expect the descriptor's @state_var field to be valid.
431  *
432  * The sequence number and caller_id can be optionally retrieved. Like all
433  * non-state_var data, they are only valid if the descriptor is in a
434  * consistent state.
435  */
436 static enum desc_state desc_read(struct prb_desc_ring *desc_ring,
437 				 unsigned long id, struct prb_desc *desc_out,
438 				 u64 *seq_out, u32 *caller_id_out)
439 {
440 	struct printk_info *info = to_info(desc_ring, id);
441 	struct prb_desc *desc = to_desc(desc_ring, id);
442 	atomic_long_t *state_var = &desc->state_var;
443 	enum desc_state d_state;
444 	unsigned long state_val;
445 
446 	/* Check the descriptor state. */
447 	state_val = atomic_long_read(state_var); /* LMM(desc_read:A) */
448 	d_state = get_desc_state(id, state_val);
449 	if (d_state == desc_miss || d_state == desc_reserved) {
450 		/*
451 		 * The descriptor is in an inconsistent state. Set at least
452 		 * @state_var so that the caller can see the details of
453 		 * the inconsistent state.
454 		 */
455 		goto out;
456 	}
457 
458 	/*
459 	 * Guarantee the state is loaded before copying the descriptor
460 	 * content. This avoids copying obsolete descriptor content that might
461 	 * not apply to the descriptor state. This pairs with _prb_commit:B.
462 	 *
463 	 * Memory barrier involvement:
464 	 *
465 	 * If desc_read:A reads from _prb_commit:B, then desc_read:C reads
466 	 * from _prb_commit:A.
467 	 *
468 	 * Relies on:
469 	 *
470 	 * WMB from _prb_commit:A to _prb_commit:B
471 	 *    matching
472 	 * RMB from desc_read:A to desc_read:C
473 	 */
474 	smp_rmb(); /* LMM(desc_read:B) */
475 
476 	/*
477 	 * Copy the descriptor data. The data is not valid until the
478 	 * state has been re-checked. A memcpy() for all of @desc
479 	 * cannot be used because of the atomic_t @state_var field.
480 	 */
481 	if (desc_out) {
482 		memcpy(&desc_out->text_blk_lpos, &desc->text_blk_lpos,
483 		       sizeof(desc_out->text_blk_lpos)); /* LMM(desc_read:C) */
484 	}
485 	if (seq_out)
486 		*seq_out = info->seq; /* also part of desc_read:C */
487 	if (caller_id_out)
488 		*caller_id_out = info->caller_id; /* also part of desc_read:C */
489 
490 	/*
491 	 * 1. Guarantee the descriptor content is loaded before re-checking
492 	 *    the state. This avoids reading an obsolete descriptor state
493 	 *    that may not apply to the copied content. This pairs with
494 	 *    desc_reserve:F.
495 	 *
496 	 *    Memory barrier involvement:
497 	 *
498 	 *    If desc_read:C reads from desc_reserve:G, then desc_read:E
499 	 *    reads from desc_reserve:F.
500 	 *
501 	 *    Relies on:
502 	 *
503 	 *    WMB from desc_reserve:F to desc_reserve:G
504 	 *       matching
505 	 *    RMB from desc_read:C to desc_read:E
506 	 *
507 	 * 2. Guarantee the record data is loaded before re-checking the
508 	 *    state. This avoids reading an obsolete descriptor state that may
509 	 *    not apply to the copied data. This pairs with data_alloc:A and
510 	 *    data_realloc:A.
511 	 *
512 	 *    Memory barrier involvement:
513 	 *
514 	 *    If copy_data:A reads from data_alloc:B, then desc_read:E
515 	 *    reads from desc_make_reusable:A.
516 	 *
517 	 *    Relies on:
518 	 *
519 	 *    MB from desc_make_reusable:A to data_alloc:B
520 	 *       matching
521 	 *    RMB from desc_read:C to desc_read:E
522 	 *
523 	 *    Note: desc_make_reusable:A and data_alloc:B can be different
524 	 *          CPUs. However, the data_alloc:B CPU (which performs the
525 	 *          full memory barrier) must have previously seen
526 	 *          desc_make_reusable:A.
527 	 */
528 	smp_rmb(); /* LMM(desc_read:D) */
529 
530 	/*
531 	 * The data has been copied. Return the current descriptor state,
532 	 * which may have changed since the load above.
533 	 */
534 	state_val = atomic_long_read(state_var); /* LMM(desc_read:E) */
535 	d_state = get_desc_state(id, state_val);
536 out:
537 	if (desc_out)
538 		atomic_long_set(&desc_out->state_var, state_val);
539 	return d_state;
540 }
541 
542 /*
543  * Take a specified descriptor out of the finalized state by attempting
544  * the transition from finalized to reusable. Either this context or some
545  * other context will have been successful.
546  */
547 static void desc_make_reusable(struct prb_desc_ring *desc_ring,
548 			       unsigned long id)
549 {
550 	unsigned long val_finalized = DESC_SV(id, desc_finalized);
551 	unsigned long val_reusable = DESC_SV(id, desc_reusable);
552 	struct prb_desc *desc = to_desc(desc_ring, id);
553 	atomic_long_t *state_var = &desc->state_var;
554 
555 	atomic_long_cmpxchg_relaxed(state_var, val_finalized,
556 				    val_reusable); /* LMM(desc_make_reusable:A) */
557 }
558 
559 /*
560  * Given the text data ring, put the associated descriptor of each
561  * data block from @lpos_begin until @lpos_end into the reusable state.
562  *
563  * If there is any problem making the associated descriptor reusable, either
564  * the descriptor has not yet been finalized or another writer context has
565  * already pushed the tail lpos past the problematic data block. Regardless,
566  * on error the caller can re-load the tail lpos to determine the situation.
567  */
568 static bool data_make_reusable(struct printk_ringbuffer *rb,
569 			       unsigned long lpos_begin,
570 			       unsigned long lpos_end,
571 			       unsigned long *lpos_out)
572 {
573 
574 	struct prb_data_ring *data_ring = &rb->text_data_ring;
575 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
576 	struct prb_data_block *blk;
577 	enum desc_state d_state;
578 	struct prb_desc desc;
579 	struct prb_data_blk_lpos *blk_lpos = &desc.text_blk_lpos;
580 	unsigned long id;
581 
582 	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
583 	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
584 		blk = to_block(data_ring, lpos_begin);
585 
586 		/*
587 		 * Load the block ID from the data block. This is a data race
588 		 * against a writer that may have newly reserved this data
589 		 * area. If the loaded value matches a valid descriptor ID,
590 		 * the blk_lpos of that descriptor will be checked to make
591 		 * sure it points back to this data block. If the check fails,
592 		 * the data area has been recycled by another writer.
593 		 */
594 		id = blk->id; /* LMM(data_make_reusable:A) */
595 
596 		d_state = desc_read(desc_ring, id, &desc,
597 				    NULL, NULL); /* LMM(data_make_reusable:B) */
598 
599 		switch (d_state) {
600 		case desc_miss:
601 		case desc_reserved:
602 		case desc_committed:
603 			return false;
604 		case desc_finalized:
605 			/*
606 			 * This data block is invalid if the descriptor
607 			 * does not point back to it.
608 			 */
609 			if (blk_lpos->begin != lpos_begin)
610 				return false;
611 			desc_make_reusable(desc_ring, id);
612 			break;
613 		case desc_reusable:
614 			/*
615 			 * This data block is invalid if the descriptor
616 			 * does not point back to it.
617 			 */
618 			if (blk_lpos->begin != lpos_begin)
619 				return false;
620 			break;
621 		}
622 
623 		/* Advance @lpos_begin to the next data block. */
624 		lpos_begin = blk_lpos->next;
625 	}
626 
627 	*lpos_out = lpos_begin;
628 	return true;
629 }
630 
631 /*
632  * Advance the data ring tail to at least @lpos. This function puts
633  * descriptors into the reusable state if the tail is pushed beyond
634  * their associated data block.
635  */
636 static bool data_push_tail(struct printk_ringbuffer *rb, unsigned long lpos)
637 {
638 	struct prb_data_ring *data_ring = &rb->text_data_ring;
639 	unsigned long tail_lpos_new;
640 	unsigned long tail_lpos;
641 	unsigned long next_lpos;
642 
643 	/* If @lpos is from a data-less block, there is nothing to do. */
644 	if (LPOS_DATALESS(lpos))
645 		return true;
646 
647 	/*
648 	 * Any descriptor states that have transitioned to reusable due to the
649 	 * data tail being pushed to this loaded value will be visible to this
650 	 * CPU. This pairs with data_push_tail:D.
651 	 *
652 	 * Memory barrier involvement:
653 	 *
654 	 * If data_push_tail:A reads from data_push_tail:D, then this CPU can
655 	 * see desc_make_reusable:A.
656 	 *
657 	 * Relies on:
658 	 *
659 	 * MB from desc_make_reusable:A to data_push_tail:D
660 	 *    matches
661 	 * READFROM from data_push_tail:D to data_push_tail:A
662 	 *    thus
663 	 * READFROM from desc_make_reusable:A to this CPU
664 	 */
665 	tail_lpos = atomic_long_read(&data_ring->tail_lpos); /* LMM(data_push_tail:A) */
666 
667 	/*
668 	 * Loop until the tail lpos is at or beyond @lpos. This condition
669 	 * may already be satisfied, resulting in no full memory barrier
670 	 * from data_push_tail:D being performed. However, since this CPU
671 	 * sees the new tail lpos, any descriptor states that transitioned to
672 	 * the reusable state must already be visible.
673 	 */
674 	while ((lpos - tail_lpos) - 1 < DATA_SIZE(data_ring)) {
675 		/*
676 		 * Make all descriptors reusable that are associated with
677 		 * data blocks before @lpos.
678 		 */
679 		if (!data_make_reusable(rb, tail_lpos, lpos, &next_lpos)) {
680 			/*
681 			 * 1. Guarantee the block ID loaded in
682 			 *    data_make_reusable() is performed before
683 			 *    reloading the tail lpos. The failed
684 			 *    data_make_reusable() may be due to a newly
685 			 *    recycled data area causing the tail lpos to
686 			 *    have been previously pushed. This pairs with
687 			 *    data_alloc:A and data_realloc:A.
688 			 *
689 			 *    Memory barrier involvement:
690 			 *
691 			 *    If data_make_reusable:A reads from data_alloc:B,
692 			 *    then data_push_tail:C reads from
693 			 *    data_push_tail:D.
694 			 *
695 			 *    Relies on:
696 			 *
697 			 *    MB from data_push_tail:D to data_alloc:B
698 			 *       matching
699 			 *    RMB from data_make_reusable:A to
700 			 *    data_push_tail:C
701 			 *
702 			 *    Note: data_push_tail:D and data_alloc:B can be
703 			 *          different CPUs. However, the data_alloc:B
704 			 *          CPU (which performs the full memory
705 			 *          barrier) must have previously seen
706 			 *          data_push_tail:D.
707 			 *
708 			 * 2. Guarantee the descriptor state loaded in
709 			 *    data_make_reusable() is performed before
710 			 *    reloading the tail lpos. The failed
711 			 *    data_make_reusable() may be due to a newly
712 			 *    recycled descriptor causing the tail lpos to
713 			 *    have been previously pushed. This pairs with
714 			 *    desc_reserve:D.
715 			 *
716 			 *    Memory barrier involvement:
717 			 *
718 			 *    If data_make_reusable:B reads from
719 			 *    desc_reserve:F, then data_push_tail:C reads
720 			 *    from data_push_tail:D.
721 			 *
722 			 *    Relies on:
723 			 *
724 			 *    MB from data_push_tail:D to desc_reserve:F
725 			 *       matching
726 			 *    RMB from data_make_reusable:B to
727 			 *    data_push_tail:C
728 			 *
729 			 *    Note: data_push_tail:D and desc_reserve:F can
730 			 *          be different CPUs. However, the
731 			 *          desc_reserve:F CPU (which performs the
732 			 *          full memory barrier) must have previously
733 			 *          seen data_push_tail:D.
734 			 */
735 			smp_rmb(); /* LMM(data_push_tail:B) */
736 
737 			tail_lpos_new = atomic_long_read(&data_ring->tail_lpos
738 							); /* LMM(data_push_tail:C) */
739 			if (tail_lpos_new == tail_lpos)
740 				return false;
741 
742 			/* Another CPU pushed the tail. Try again. */
743 			tail_lpos = tail_lpos_new;
744 			continue;
745 		}
746 
747 		/*
748 		 * Guarantee any descriptor states that have transitioned to
749 		 * reusable are stored before pushing the tail lpos. A full
750 		 * memory barrier is needed since other CPUs may have made
751 		 * the descriptor states reusable. This pairs with
752 		 * data_push_tail:A.
753 		 */
754 		if (atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos,
755 					    next_lpos)) { /* LMM(data_push_tail:D) */
756 			break;
757 		}
758 	}
759 
760 	return true;
761 }
762 
763 /*
764  * Advance the desc ring tail. This function advances the tail by one
765  * descriptor, thus invalidating the oldest descriptor. Before advancing
766  * the tail, the tail descriptor is made reusable and all data blocks up to
767  * and including the descriptor's data block are invalidated (i.e. the data
768  * ring tail is pushed past the data block of the descriptor being made
769  * reusable).
770  */
771 static bool desc_push_tail(struct printk_ringbuffer *rb,
772 			   unsigned long tail_id)
773 {
774 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
775 	enum desc_state d_state;
776 	struct prb_desc desc;
777 
778 	d_state = desc_read(desc_ring, tail_id, &desc, NULL, NULL);
779 
780 	switch (d_state) {
781 	case desc_miss:
782 		/*
783 		 * If the ID is exactly 1 wrap behind the expected, it is
784 		 * in the process of being reserved by another writer and
785 		 * must be considered reserved.
786 		 */
787 		if (DESC_ID(atomic_long_read(&desc.state_var)) ==
788 		    DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
789 			return false;
790 		}
791 
792 		/*
793 		 * The ID has changed. Another writer must have pushed the
794 		 * tail and recycled the descriptor already. Success is
795 		 * returned because the caller is only interested in the
796 		 * specified tail being pushed, which it was.
797 		 */
798 		return true;
799 	case desc_reserved:
800 	case desc_committed:
801 		return false;
802 	case desc_finalized:
803 		desc_make_reusable(desc_ring, tail_id);
804 		break;
805 	case desc_reusable:
806 		break;
807 	}
808 
809 	/*
810 	 * Data blocks must be invalidated before their associated
811 	 * descriptor can be made available for recycling. Invalidating
812 	 * them later is not possible because there is no way to trust
813 	 * data blocks once their associated descriptor is gone.
814 	 */
815 
816 	if (!data_push_tail(rb, desc.text_blk_lpos.next))
817 		return false;
818 
819 	/*
820 	 * Check the next descriptor after @tail_id before pushing the tail
821 	 * to it because the tail must always be in a finalized or reusable
822 	 * state. The implementation of prb_first_seq() relies on this.
823 	 *
824 	 * A successful read implies that the next descriptor is less than or
825 	 * equal to @head_id so there is no risk of pushing the tail past the
826 	 * head.
827 	 */
828 	d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc,
829 			    NULL, NULL); /* LMM(desc_push_tail:A) */
830 
831 	if (d_state == desc_finalized || d_state == desc_reusable) {
832 		/*
833 		 * Guarantee any descriptor states that have transitioned to
834 		 * reusable are stored before pushing the tail ID. This allows
835 		 * verifying the recycled descriptor state. A full memory
836 		 * barrier is needed since other CPUs may have made the
837 		 * descriptor states reusable. This pairs with desc_reserve:D.
838 		 */
839 		atomic_long_cmpxchg(&desc_ring->tail_id, tail_id,
840 				    DESC_ID(tail_id + 1)); /* LMM(desc_push_tail:B) */
841 	} else {
842 		/*
843 		 * Guarantee the last state load from desc_read() is before
844 		 * reloading @tail_id in order to see a new tail ID in the
845 		 * case that the descriptor has been recycled. This pairs
846 		 * with desc_reserve:D.
847 		 *
848 		 * Memory barrier involvement:
849 		 *
850 		 * If desc_push_tail:A reads from desc_reserve:F, then
851 		 * desc_push_tail:D reads from desc_push_tail:B.
852 		 *
853 		 * Relies on:
854 		 *
855 		 * MB from desc_push_tail:B to desc_reserve:F
856 		 *    matching
857 		 * RMB from desc_push_tail:A to desc_push_tail:D
858 		 *
859 		 * Note: desc_push_tail:B and desc_reserve:F can be different
860 		 *       CPUs. However, the desc_reserve:F CPU (which performs
861 		 *       the full memory barrier) must have previously seen
862 		 *       desc_push_tail:B.
863 		 */
864 		smp_rmb(); /* LMM(desc_push_tail:C) */
865 
866 		/*
867 		 * Re-check the tail ID. The descriptor following @tail_id is
868 		 * not in an allowed tail state. But if the tail has since
869 		 * been moved by another CPU, then it does not matter.
870 		 */
871 		if (atomic_long_read(&desc_ring->tail_id) == tail_id) /* LMM(desc_push_tail:D) */
872 			return false;
873 	}
874 
875 	return true;
876 }
877 
878 /* Reserve a new descriptor, invalidating the oldest if necessary. */
879 static bool desc_reserve(struct printk_ringbuffer *rb, unsigned long *id_out)
880 {
881 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
882 	unsigned long prev_state_val;
883 	unsigned long id_prev_wrap;
884 	struct prb_desc *desc;
885 	unsigned long head_id;
886 	unsigned long id;
887 
888 	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(desc_reserve:A) */
889 
890 	do {
891 		id = DESC_ID(head_id + 1);
892 		id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
893 
894 		/*
895 		 * Guarantee the head ID is read before reading the tail ID.
896 		 * Since the tail ID is updated before the head ID, this
897 		 * guarantees that @id_prev_wrap is never ahead of the tail
898 		 * ID. This pairs with desc_reserve:D.
899 		 *
900 		 * Memory barrier involvement:
901 		 *
902 		 * If desc_reserve:A reads from desc_reserve:D, then
903 		 * desc_reserve:C reads from desc_push_tail:B.
904 		 *
905 		 * Relies on:
906 		 *
907 		 * MB from desc_push_tail:B to desc_reserve:D
908 		 *    matching
909 		 * RMB from desc_reserve:A to desc_reserve:C
910 		 *
911 		 * Note: desc_push_tail:B and desc_reserve:D can be different
912 		 *       CPUs. However, the desc_reserve:D CPU (which performs
913 		 *       the full memory barrier) must have previously seen
914 		 *       desc_push_tail:B.
915 		 */
916 		smp_rmb(); /* LMM(desc_reserve:B) */
917 
918 		if (id_prev_wrap == atomic_long_read(&desc_ring->tail_id
919 						    )) { /* LMM(desc_reserve:C) */
920 			/*
921 			 * Make space for the new descriptor by
922 			 * advancing the tail.
923 			 */
924 			if (!desc_push_tail(rb, id_prev_wrap))
925 				return false;
926 		}
927 
928 		/*
929 		 * 1. Guarantee the tail ID is read before validating the
930 		 *    recycled descriptor state. A read memory barrier is
931 		 *    sufficient for this. This pairs with desc_push_tail:B.
932 		 *
933 		 *    Memory barrier involvement:
934 		 *
935 		 *    If desc_reserve:C reads from desc_push_tail:B, then
936 		 *    desc_reserve:E reads from desc_make_reusable:A.
937 		 *
938 		 *    Relies on:
939 		 *
940 		 *    MB from desc_make_reusable:A to desc_push_tail:B
941 		 *       matching
942 		 *    RMB from desc_reserve:C to desc_reserve:E
943 		 *
944 		 *    Note: desc_make_reusable:A and desc_push_tail:B can be
945 		 *          different CPUs. However, the desc_push_tail:B CPU
946 		 *          (which performs the full memory barrier) must have
947 		 *          previously seen desc_make_reusable:A.
948 		 *
949 		 * 2. Guarantee the tail ID is stored before storing the head
950 		 *    ID. This pairs with desc_reserve:B.
951 		 *
952 		 * 3. Guarantee any data ring tail changes are stored before
953 		 *    recycling the descriptor. Data ring tail changes can
954 		 *    happen via desc_push_tail()->data_push_tail(). A full
955 		 *    memory barrier is needed since another CPU may have
956 		 *    pushed the data ring tails. This pairs with
957 		 *    data_push_tail:B.
958 		 *
959 		 * 4. Guarantee a new tail ID is stored before recycling the
960 		 *    descriptor. A full memory barrier is needed since
961 		 *    another CPU may have pushed the tail ID. This pairs
962 		 *    with desc_push_tail:C and this also pairs with
963 		 *    prb_first_seq:C.
964 		 *
965 		 * 5. Guarantee the head ID is stored before trying to
966 		 *    finalize the previous descriptor. This pairs with
967 		 *    _prb_commit:B.
968 		 */
969 	} while (!atomic_long_try_cmpxchg(&desc_ring->head_id, &head_id,
970 					  id)); /* LMM(desc_reserve:D) */
971 
972 	desc = to_desc(desc_ring, id);
973 
974 	/*
975 	 * If the descriptor has been recycled, verify the old state val.
976 	 * See "ABA Issues" about why this verification is performed.
977 	 */
978 	prev_state_val = atomic_long_read(&desc->state_var); /* LMM(desc_reserve:E) */
979 	if (prev_state_val &&
980 	    get_desc_state(id_prev_wrap, prev_state_val) != desc_reusable) {
981 		WARN_ON_ONCE(1);
982 		return false;
983 	}
984 
985 	/*
986 	 * Assign the descriptor a new ID and set its state to reserved.
987 	 * See "ABA Issues" about why cmpxchg() instead of set() is used.
988 	 *
989 	 * Guarantee the new descriptor ID and state is stored before making
990 	 * any other changes. A write memory barrier is sufficient for this.
991 	 * This pairs with desc_read:D.
992 	 */
993 	if (!atomic_long_try_cmpxchg(&desc->state_var, &prev_state_val,
994 			DESC_SV(id, desc_reserved))) { /* LMM(desc_reserve:F) */
995 		WARN_ON_ONCE(1);
996 		return false;
997 	}
998 
999 	/* Now data in @desc can be modified: LMM(desc_reserve:G) */
1000 
1001 	*id_out = id;
1002 	return true;
1003 }
1004 
1005 /* Determine the end of a data block. */
1006 static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
1007 				   unsigned long lpos, unsigned int size)
1008 {
1009 	unsigned long begin_lpos;
1010 	unsigned long next_lpos;
1011 
1012 	begin_lpos = lpos;
1013 	next_lpos = lpos + size;
1014 
1015 	/* First check if the data block does not wrap. */
1016 	if (DATA_WRAPS(data_ring, begin_lpos) == DATA_WRAPS(data_ring, next_lpos))
1017 		return next_lpos;
1018 
1019 	/* Wrapping data blocks store their data at the beginning. */
1020 	return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
1021 }
1022 
1023 /*
1024  * Allocate a new data block, invalidating the oldest data block(s)
1025  * if necessary. This function also associates the data block with
1026  * a specified descriptor.
1027  */
1028 static char *data_alloc(struct printk_ringbuffer *rb, unsigned int size,
1029 			struct prb_data_blk_lpos *blk_lpos, unsigned long id)
1030 {
1031 	struct prb_data_ring *data_ring = &rb->text_data_ring;
1032 	struct prb_data_block *blk;
1033 	unsigned long begin_lpos;
1034 	unsigned long next_lpos;
1035 
1036 	if (size == 0) {
1037 		/*
1038 		 * Data blocks are not created for empty lines. Instead, the
1039 		 * reader will recognize these special lpos values and handle
1040 		 * it appropriately.
1041 		 */
1042 		blk_lpos->begin = EMPTY_LINE_LPOS;
1043 		blk_lpos->next = EMPTY_LINE_LPOS;
1044 		return NULL;
1045 	}
1046 
1047 	size = to_blk_size(size);
1048 
1049 	begin_lpos = atomic_long_read(&data_ring->head_lpos);
1050 
1051 	do {
1052 		next_lpos = get_next_lpos(data_ring, begin_lpos, size);
1053 
1054 		if (!data_push_tail(rb, next_lpos - DATA_SIZE(data_ring))) {
1055 			/* Failed to allocate, specify a data-less block. */
1056 			blk_lpos->begin = FAILED_LPOS;
1057 			blk_lpos->next = FAILED_LPOS;
1058 			return NULL;
1059 		}
1060 
1061 		/*
1062 		 * 1. Guarantee any descriptor states that have transitioned
1063 		 *    to reusable are stored before modifying the newly
1064 		 *    allocated data area. A full memory barrier is needed
1065 		 *    since other CPUs may have made the descriptor states
1066 		 *    reusable. See data_push_tail:A about why the reusable
1067 		 *    states are visible. This pairs with desc_read:D.
1068 		 *
1069 		 * 2. Guarantee any updated tail lpos is stored before
1070 		 *    modifying the newly allocated data area. Another CPU may
1071 		 *    be in data_make_reusable() and is reading a block ID
1072 		 *    from this area. data_make_reusable() can handle reading
1073 		 *    a garbage block ID value, but then it must be able to
1074 		 *    load a new tail lpos. A full memory barrier is needed
1075 		 *    since other CPUs may have updated the tail lpos. This
1076 		 *    pairs with data_push_tail:B.
1077 		 */
1078 	} while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
1079 					  next_lpos)); /* LMM(data_alloc:A) */
1080 
1081 	blk = to_block(data_ring, begin_lpos);
1082 	blk->id = id; /* LMM(data_alloc:B) */
1083 
1084 	if (DATA_WRAPS(data_ring, begin_lpos) != DATA_WRAPS(data_ring, next_lpos)) {
1085 		/* Wrapping data blocks store their data at the beginning. */
1086 		blk = to_block(data_ring, 0);
1087 
1088 		/*
1089 		 * Store the ID on the wrapped block for consistency.
1090 		 * The printk_ringbuffer does not actually use it.
1091 		 */
1092 		blk->id = id;
1093 	}
1094 
1095 	blk_lpos->begin = begin_lpos;
1096 	blk_lpos->next = next_lpos;
1097 
1098 	return &blk->data[0];
1099 }
1100 
1101 /*
1102  * Try to resize an existing data block associated with the descriptor
1103  * specified by @id. If the resized data block should become wrapped, it
1104  * copies the old data to the new data block. If @size yields a data block
1105  * with the same or less size, the data block is left as is.
1106  *
1107  * Fail if this is not the last allocated data block or if there is not
1108  * enough space or it is not possible make enough space.
1109  *
1110  * Return a pointer to the beginning of the entire data buffer or NULL on
1111  * failure.
1112  */
1113 static char *data_realloc(struct printk_ringbuffer *rb, unsigned int size,
1114 			  struct prb_data_blk_lpos *blk_lpos, unsigned long id)
1115 {
1116 	struct prb_data_ring *data_ring = &rb->text_data_ring;
1117 	struct prb_data_block *blk;
1118 	unsigned long head_lpos;
1119 	unsigned long next_lpos;
1120 	bool wrapped;
1121 
1122 	/* Reallocation only works if @blk_lpos is the newest data block. */
1123 	head_lpos = atomic_long_read(&data_ring->head_lpos);
1124 	if (head_lpos != blk_lpos->next)
1125 		return NULL;
1126 
1127 	/* Keep track if @blk_lpos was a wrapping data block. */
1128 	wrapped = (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, blk_lpos->next));
1129 
1130 	size = to_blk_size(size);
1131 
1132 	next_lpos = get_next_lpos(data_ring, blk_lpos->begin, size);
1133 
1134 	/* If the data block does not increase, there is nothing to do. */
1135 	if (head_lpos - next_lpos < DATA_SIZE(data_ring)) {
1136 		if (wrapped)
1137 			blk = to_block(data_ring, 0);
1138 		else
1139 			blk = to_block(data_ring, blk_lpos->begin);
1140 		return &blk->data[0];
1141 	}
1142 
1143 	if (!data_push_tail(rb, next_lpos - DATA_SIZE(data_ring)))
1144 		return NULL;
1145 
1146 	/* The memory barrier involvement is the same as data_alloc:A. */
1147 	if (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &head_lpos,
1148 				     next_lpos)) { /* LMM(data_realloc:A) */
1149 		return NULL;
1150 	}
1151 
1152 	blk = to_block(data_ring, blk_lpos->begin);
1153 
1154 	if (DATA_WRAPS(data_ring, blk_lpos->begin) != DATA_WRAPS(data_ring, next_lpos)) {
1155 		struct prb_data_block *old_blk = blk;
1156 
1157 		/* Wrapping data blocks store their data at the beginning. */
1158 		blk = to_block(data_ring, 0);
1159 
1160 		/*
1161 		 * Store the ID on the wrapped block for consistency.
1162 		 * The printk_ringbuffer does not actually use it.
1163 		 */
1164 		blk->id = id;
1165 
1166 		if (!wrapped) {
1167 			/*
1168 			 * Since the allocated space is now in the newly
1169 			 * created wrapping data block, copy the content
1170 			 * from the old data block.
1171 			 */
1172 			memcpy(&blk->data[0], &old_blk->data[0],
1173 			       (blk_lpos->next - blk_lpos->begin) - sizeof(blk->id));
1174 		}
1175 	}
1176 
1177 	blk_lpos->next = next_lpos;
1178 
1179 	return &blk->data[0];
1180 }
1181 
1182 /* Return the number of bytes used by a data block. */
1183 static unsigned int space_used(struct prb_data_ring *data_ring,
1184 			       struct prb_data_blk_lpos *blk_lpos)
1185 {
1186 	/* Data-less blocks take no space. */
1187 	if (BLK_DATALESS(blk_lpos))
1188 		return 0;
1189 
1190 	if (DATA_WRAPS(data_ring, blk_lpos->begin) == DATA_WRAPS(data_ring, blk_lpos->next)) {
1191 		/* Data block does not wrap. */
1192 		return (DATA_INDEX(data_ring, blk_lpos->next) -
1193 			DATA_INDEX(data_ring, blk_lpos->begin));
1194 	}
1195 
1196 	/*
1197 	 * For wrapping data blocks, the trailing (wasted) space is
1198 	 * also counted.
1199 	 */
1200 	return (DATA_INDEX(data_ring, blk_lpos->next) +
1201 		DATA_SIZE(data_ring) - DATA_INDEX(data_ring, blk_lpos->begin));
1202 }
1203 
1204 /*
1205  * Given @blk_lpos, return a pointer to the writer data from the data block
1206  * and calculate the size of the data part. A NULL pointer is returned if
1207  * @blk_lpos specifies values that could never be legal.
1208  *
1209  * This function (used by readers) performs strict validation on the lpos
1210  * values to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
1211  * triggered if an internal error is detected.
1212  */
1213 static const char *get_data(struct prb_data_ring *data_ring,
1214 			    struct prb_data_blk_lpos *blk_lpos,
1215 			    unsigned int *data_size)
1216 {
1217 	struct prb_data_block *db;
1218 
1219 	/* Data-less data block description. */
1220 	if (BLK_DATALESS(blk_lpos)) {
1221 		/*
1222 		 * Records that are just empty lines are also valid, even
1223 		 * though they do not have a data block. For such records
1224 		 * explicitly return empty string data to signify success.
1225 		 */
1226 		if (blk_lpos->begin == EMPTY_LINE_LPOS &&
1227 		    blk_lpos->next == EMPTY_LINE_LPOS) {
1228 			*data_size = 0;
1229 			return "";
1230 		}
1231 
1232 		/* Data lost, invalid, or otherwise unavailable. */
1233 		return NULL;
1234 	}
1235 
1236 	/* Regular data block: @begin less than @next and in same wrap. */
1237 	if (DATA_WRAPS(data_ring, blk_lpos->begin) == DATA_WRAPS(data_ring, blk_lpos->next) &&
1238 	    blk_lpos->begin < blk_lpos->next) {
1239 		db = to_block(data_ring, blk_lpos->begin);
1240 		*data_size = blk_lpos->next - blk_lpos->begin;
1241 
1242 	/* Wrapping data block: @begin is one wrap behind @next. */
1243 	} else if (DATA_WRAPS(data_ring, blk_lpos->begin + DATA_SIZE(data_ring)) ==
1244 		   DATA_WRAPS(data_ring, blk_lpos->next)) {
1245 		db = to_block(data_ring, 0);
1246 		*data_size = DATA_INDEX(data_ring, blk_lpos->next);
1247 
1248 	/* Illegal block description. */
1249 	} else {
1250 		WARN_ON_ONCE(1);
1251 		return NULL;
1252 	}
1253 
1254 	/* A valid data block will always be aligned to the ID size. */
1255 	if (WARN_ON_ONCE(blk_lpos->begin != ALIGN(blk_lpos->begin, sizeof(db->id))) ||
1256 	    WARN_ON_ONCE(blk_lpos->next != ALIGN(blk_lpos->next, sizeof(db->id)))) {
1257 		return NULL;
1258 	}
1259 
1260 	/* A valid data block will always have at least an ID. */
1261 	if (WARN_ON_ONCE(*data_size < sizeof(db->id)))
1262 		return NULL;
1263 
1264 	/* Subtract block ID space from size to reflect data size. */
1265 	*data_size -= sizeof(db->id);
1266 
1267 	return &db->data[0];
1268 }
1269 
1270 /*
1271  * Attempt to transition the newest descriptor from committed back to reserved
1272  * so that the record can be modified by a writer again. This is only possible
1273  * if the descriptor is not yet finalized and the provided @caller_id matches.
1274  */
1275 static struct prb_desc *desc_reopen_last(struct prb_desc_ring *desc_ring,
1276 					 u32 caller_id, unsigned long *id_out)
1277 {
1278 	unsigned long prev_state_val;
1279 	enum desc_state d_state;
1280 	struct prb_desc desc;
1281 	struct prb_desc *d;
1282 	unsigned long id;
1283 	u32 cid;
1284 
1285 	id = atomic_long_read(&desc_ring->head_id);
1286 
1287 	/*
1288 	 * To reduce unnecessarily reopening, first check if the descriptor
1289 	 * state and caller ID are correct.
1290 	 */
1291 	d_state = desc_read(desc_ring, id, &desc, NULL, &cid);
1292 	if (d_state != desc_committed || cid != caller_id)
1293 		return NULL;
1294 
1295 	d = to_desc(desc_ring, id);
1296 
1297 	prev_state_val = DESC_SV(id, desc_committed);
1298 
1299 	/*
1300 	 * Guarantee the reserved state is stored before reading any
1301 	 * record data. A full memory barrier is needed because @state_var
1302 	 * modification is followed by reading. This pairs with _prb_commit:B.
1303 	 *
1304 	 * Memory barrier involvement:
1305 	 *
1306 	 * If desc_reopen_last:A reads from _prb_commit:B, then
1307 	 * prb_reserve_in_last:A reads from _prb_commit:A.
1308 	 *
1309 	 * Relies on:
1310 	 *
1311 	 * WMB from _prb_commit:A to _prb_commit:B
1312 	 *    matching
1313 	 * MB If desc_reopen_last:A to prb_reserve_in_last:A
1314 	 */
1315 	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
1316 			DESC_SV(id, desc_reserved))) { /* LMM(desc_reopen_last:A) */
1317 		return NULL;
1318 	}
1319 
1320 	*id_out = id;
1321 	return d;
1322 }
1323 
1324 /**
1325  * prb_reserve_in_last() - Re-reserve and extend the space in the ringbuffer
1326  *                         used by the newest record.
1327  *
1328  * @e:         The entry structure to setup.
1329  * @rb:        The ringbuffer to re-reserve and extend data in.
1330  * @r:         The record structure to allocate buffers for.
1331  * @caller_id: The caller ID of the caller (reserving writer).
1332  * @max_size:  Fail if the extended size would be greater than this.
1333  *
1334  * This is the public function available to writers to re-reserve and extend
1335  * data.
1336  *
1337  * The writer specifies the text size to extend (not the new total size) by
1338  * setting the @text_buf_size field of @r. To ensure proper initialization
1339  * of @r, prb_rec_init_wr() should be used.
1340  *
1341  * This function will fail if @caller_id does not match the caller ID of the
1342  * newest record. In that case the caller must reserve new data using
1343  * prb_reserve().
1344  *
1345  * Context: Any context. Disables local interrupts on success.
1346  * Return: true if text data could be extended, otherwise false.
1347  *
1348  * On success:
1349  *
1350  *   - @r->text_buf points to the beginning of the entire text buffer.
1351  *
1352  *   - @r->text_buf_size is set to the new total size of the buffer.
1353  *
1354  *   - @r->info is not touched so that @r->info->text_len could be used
1355  *     to append the text.
1356  *
1357  *   - prb_record_text_space() can be used on @e to query the new
1358  *     actually used space.
1359  *
1360  * Important: All @r->info fields will already be set with the current values
1361  *            for the record. I.e. @r->info->text_len will be less than
1362  *            @text_buf_size. Writers can use @r->info->text_len to know
1363  *            where concatenation begins and writers should update
1364  *            @r->info->text_len after concatenating.
1365  */
1366 bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
1367 			 struct printk_record *r, u32 caller_id, unsigned int max_size)
1368 {
1369 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
1370 	struct printk_info *info;
1371 	unsigned int data_size;
1372 	struct prb_desc *d;
1373 	unsigned long id;
1374 
1375 	local_irq_save(e->irqflags);
1376 
1377 	/* Transition the newest descriptor back to the reserved state. */
1378 	d = desc_reopen_last(desc_ring, caller_id, &id);
1379 	if (!d) {
1380 		local_irq_restore(e->irqflags);
1381 		goto fail_reopen;
1382 	}
1383 
1384 	/* Now the writer has exclusive access: LMM(prb_reserve_in_last:A) */
1385 
1386 	info = to_info(desc_ring, id);
1387 
1388 	/*
1389 	 * Set the @e fields here so that prb_commit() can be used if
1390 	 * anything fails from now on.
1391 	 */
1392 	e->rb = rb;
1393 	e->id = id;
1394 
1395 	/*
1396 	 * desc_reopen_last() checked the caller_id, but there was no
1397 	 * exclusive access at that point. The descriptor may have
1398 	 * changed since then.
1399 	 */
1400 	if (caller_id != info->caller_id)
1401 		goto fail;
1402 
1403 	if (BLK_DATALESS(&d->text_blk_lpos)) {
1404 		if (WARN_ON_ONCE(info->text_len != 0)) {
1405 			pr_warn_once("wrong text_len value (%hu, expecting 0)\n",
1406 				     info->text_len);
1407 			info->text_len = 0;
1408 		}
1409 
1410 		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
1411 			goto fail;
1412 
1413 		if (r->text_buf_size > max_size)
1414 			goto fail;
1415 
1416 		r->text_buf = data_alloc(rb, r->text_buf_size,
1417 					 &d->text_blk_lpos, id);
1418 	} else {
1419 		if (!get_data(&rb->text_data_ring, &d->text_blk_lpos, &data_size))
1420 			goto fail;
1421 
1422 		/*
1423 		 * Increase the buffer size to include the original size. If
1424 		 * the meta data (@text_len) is not sane, use the full data
1425 		 * block size.
1426 		 */
1427 		if (WARN_ON_ONCE(info->text_len > data_size)) {
1428 			pr_warn_once("wrong text_len value (%hu, expecting <=%u)\n",
1429 				     info->text_len, data_size);
1430 			info->text_len = data_size;
1431 		}
1432 		r->text_buf_size += info->text_len;
1433 
1434 		if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
1435 			goto fail;
1436 
1437 		if (r->text_buf_size > max_size)
1438 			goto fail;
1439 
1440 		r->text_buf = data_realloc(rb, r->text_buf_size,
1441 					   &d->text_blk_lpos, id);
1442 	}
1443 	if (r->text_buf_size && !r->text_buf)
1444 		goto fail;
1445 
1446 	r->info = info;
1447 
1448 	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
1449 
1450 	return true;
1451 fail:
1452 	prb_commit(e);
1453 	/* prb_commit() re-enabled interrupts. */
1454 fail_reopen:
1455 	/* Make it clear to the caller that the re-reserve failed. */
1456 	memset(r, 0, sizeof(*r));
1457 	return false;
1458 }
1459 
1460 /*
1461  * @last_finalized_seq value guarantees that all records up to and including
1462  * this sequence number are finalized and can be read. The only exception are
1463  * too old records which have already been overwritten.
1464  *
1465  * It is also guaranteed that @last_finalized_seq only increases.
1466  *
1467  * Be aware that finalized records following non-finalized records are not
1468  * reported because they are not yet available to the reader. For example,
1469  * a new record stored via printk() will not be available to a printer if
1470  * it follows a record that has not been finalized yet. However, once that
1471  * non-finalized record becomes finalized, @last_finalized_seq will be
1472  * appropriately updated and the full set of finalized records will be
1473  * available to the printer. And since each printk() caller will either
1474  * directly print or trigger deferred printing of all available unprinted
1475  * records, all printk() messages will get printed.
1476  */
1477 static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
1478 {
1479 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
1480 	unsigned long ulseq;
1481 
1482 	/*
1483 	 * Guarantee the sequence number is loaded before loading the
1484 	 * associated record in order to guarantee that the record can be
1485 	 * seen by this CPU. This pairs with desc_update_last_finalized:A.
1486 	 */
1487 	ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
1488 					); /* LMM(desc_last_finalized_seq:A) */
1489 
1490 	return __ulseq_to_u64seq(rb, ulseq);
1491 }
1492 
1493 static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
1494 			    struct printk_record *r, unsigned int *line_count);
1495 
1496 /*
1497  * Check if there are records directly following @last_finalized_seq that are
1498  * finalized. If so, update @last_finalized_seq to the latest of these
1499  * records. It is not allowed to skip over records that are not yet finalized.
1500  */
1501 static void desc_update_last_finalized(struct printk_ringbuffer *rb)
1502 {
1503 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
1504 	u64 old_seq = desc_last_finalized_seq(rb);
1505 	unsigned long oldval;
1506 	unsigned long newval;
1507 	u64 finalized_seq;
1508 	u64 try_seq;
1509 
1510 try_again:
1511 	finalized_seq = old_seq;
1512 	try_seq = finalized_seq + 1;
1513 
1514 	/* Try to find later finalized records. */
1515 	while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
1516 		finalized_seq = try_seq;
1517 		try_seq++;
1518 	}
1519 
1520 	/* No update needed if no later finalized record was found. */
1521 	if (finalized_seq == old_seq)
1522 		return;
1523 
1524 	oldval = __u64seq_to_ulseq(old_seq);
1525 	newval = __u64seq_to_ulseq(finalized_seq);
1526 
1527 	/*
1528 	 * Set the sequence number of a later finalized record that has been
1529 	 * seen.
1530 	 *
1531 	 * Guarantee the record data is visible to other CPUs before storing
1532 	 * its sequence number. This pairs with desc_last_finalized_seq:A.
1533 	 *
1534 	 * Memory barrier involvement:
1535 	 *
1536 	 * If desc_last_finalized_seq:A reads from
1537 	 * desc_update_last_finalized:A, then desc_read:A reads from
1538 	 * _prb_commit:B.
1539 	 *
1540 	 * Relies on:
1541 	 *
1542 	 * RELEASE from _prb_commit:B to desc_update_last_finalized:A
1543 	 *    matching
1544 	 * ACQUIRE from desc_last_finalized_seq:A to desc_read:A
1545 	 *
1546 	 * Note: _prb_commit:B and desc_update_last_finalized:A can be
1547 	 *       different CPUs. However, the desc_update_last_finalized:A
1548 	 *       CPU (which performs the release) must have previously seen
1549 	 *       _prb_commit:B.
1550 	 */
1551 	if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
1552 				&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
1553 		old_seq = __ulseq_to_u64seq(rb, oldval);
1554 		goto try_again;
1555 	}
1556 }
1557 
1558 /*
1559  * Attempt to finalize a specified descriptor. If this fails, the descriptor
1560  * is either already final or it will finalize itself when the writer commits.
1561  */
1562 static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
1563 {
1564 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
1565 	unsigned long prev_state_val = DESC_SV(id, desc_committed);
1566 	struct prb_desc *d = to_desc(desc_ring, id);
1567 
1568 	if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
1569 			DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
1570 		desc_update_last_finalized(rb);
1571 	}
1572 }
1573 
1574 /**
1575  * prb_reserve() - Reserve space in the ringbuffer.
1576  *
1577  * @e:  The entry structure to setup.
1578  * @rb: The ringbuffer to reserve data in.
1579  * @r:  The record structure to allocate buffers for.
1580  *
1581  * This is the public function available to writers to reserve data.
1582  *
1583  * The writer specifies the text size to reserve by setting the
1584  * @text_buf_size field of @r. To ensure proper initialization of @r,
1585  * prb_rec_init_wr() should be used.
1586  *
1587  * Context: Any context. Disables local interrupts on success.
1588  * Return: true if at least text data could be allocated, otherwise false.
1589  *
1590  * On success, the fields @info and @text_buf of @r will be set by this
1591  * function and should be filled in by the writer before committing. Also
1592  * on success, prb_record_text_space() can be used on @e to query the actual
1593  * space used for the text data block.
1594  *
1595  * Important: @info->text_len needs to be set correctly by the writer in
1596  *            order for data to be readable and/or extended. Its value
1597  *            is initialized to 0.
1598  */
1599 bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
1600 		 struct printk_record *r)
1601 {
1602 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
1603 	struct printk_info *info;
1604 	struct prb_desc *d;
1605 	unsigned long id;
1606 	u64 seq;
1607 
1608 	if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
1609 		goto fail;
1610 
1611 	/*
1612 	 * Descriptors in the reserved state act as blockers to all further
1613 	 * reservations once the desc_ring has fully wrapped. Disable
1614 	 * interrupts during the reserve/commit window in order to minimize
1615 	 * the likelihood of this happening.
1616 	 */
1617 	local_irq_save(e->irqflags);
1618 
1619 	if (!desc_reserve(rb, &id)) {
1620 		/* Descriptor reservation failures are tracked. */
1621 		atomic_long_inc(&rb->fail);
1622 		local_irq_restore(e->irqflags);
1623 		goto fail;
1624 	}
1625 
1626 	d = to_desc(desc_ring, id);
1627 	info = to_info(desc_ring, id);
1628 
1629 	/*
1630 	 * All @info fields (except @seq) are cleared and must be filled in
1631 	 * by the writer. Save @seq before clearing because it is used to
1632 	 * determine the new sequence number.
1633 	 */
1634 	seq = info->seq;
1635 	memset(info, 0, sizeof(*info));
1636 
1637 	/*
1638 	 * Set the @e fields here so that prb_commit() can be used if
1639 	 * text data allocation fails.
1640 	 */
1641 	e->rb = rb;
1642 	e->id = id;
1643 
1644 	/*
1645 	 * Initialize the sequence number if it has "never been set".
1646 	 * Otherwise just increment it by a full wrap.
1647 	 *
1648 	 * @seq is considered "never been set" if it has a value of 0,
1649 	 * _except_ for @infos[0], which was specially setup by the ringbuffer
1650 	 * initializer and therefore is always considered as set.
1651 	 *
1652 	 * See the "Bootstrap" comment block in printk_ringbuffer.h for
1653 	 * details about how the initializer bootstraps the descriptors.
1654 	 */
1655 	if (seq == 0 && DESC_INDEX(desc_ring, id) != 0)
1656 		info->seq = DESC_INDEX(desc_ring, id);
1657 	else
1658 		info->seq = seq + DESCS_COUNT(desc_ring);
1659 
1660 	/*
1661 	 * New data is about to be reserved. Once that happens, previous
1662 	 * descriptors are no longer able to be extended. Finalize the
1663 	 * previous descriptor now so that it can be made available to
1664 	 * readers. (For seq==0 there is no previous descriptor.)
1665 	 */
1666 	if (info->seq > 0)
1667 		desc_make_final(rb, DESC_ID(id - 1));
1668 
1669 	r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
1670 	/* If text data allocation fails, a data-less record is committed. */
1671 	if (r->text_buf_size && !r->text_buf) {
1672 		prb_commit(e);
1673 		/* prb_commit() re-enabled interrupts. */
1674 		goto fail;
1675 	}
1676 
1677 	r->info = info;
1678 
1679 	/* Record full text space used by record. */
1680 	e->text_space = space_used(&rb->text_data_ring, &d->text_blk_lpos);
1681 
1682 	return true;
1683 fail:
1684 	/* Make it clear to the caller that the reserve failed. */
1685 	memset(r, 0, sizeof(*r));
1686 	return false;
1687 }
1688 
1689 /* Commit the data (possibly finalizing it) and restore interrupts. */
1690 static void _prb_commit(struct prb_reserved_entry *e, unsigned long state_val)
1691 {
1692 	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
1693 	struct prb_desc *d = to_desc(desc_ring, e->id);
1694 	unsigned long prev_state_val = DESC_SV(e->id, desc_reserved);
1695 
1696 	/* Now the writer has finished all writing: LMM(_prb_commit:A) */
1697 
1698 	/*
1699 	 * Set the descriptor as committed. See "ABA Issues" about why
1700 	 * cmpxchg() instead of set() is used.
1701 	 *
1702 	 * 1  Guarantee all record data is stored before the descriptor state
1703 	 *    is stored as committed. A write memory barrier is sufficient
1704 	 *    for this. This pairs with desc_read:B and desc_reopen_last:A.
1705 	 *
1706 	 * 2. Guarantee the descriptor state is stored as committed before
1707 	 *    re-checking the head ID in order to possibly finalize this
1708 	 *    descriptor. This pairs with desc_reserve:D.
1709 	 *
1710 	 *    Memory barrier involvement:
1711 	 *
1712 	 *    If prb_commit:A reads from desc_reserve:D, then
1713 	 *    desc_make_final:A reads from _prb_commit:B.
1714 	 *
1715 	 *    Relies on:
1716 	 *
1717 	 *    MB _prb_commit:B to prb_commit:A
1718 	 *       matching
1719 	 *    MB desc_reserve:D to desc_make_final:A
1720 	 */
1721 	if (!atomic_long_try_cmpxchg(&d->state_var, &prev_state_val,
1722 			DESC_SV(e->id, state_val))) { /* LMM(_prb_commit:B) */
1723 		WARN_ON_ONCE(1);
1724 	}
1725 
1726 	/* Restore interrupts, the reserve/commit window is finished. */
1727 	local_irq_restore(e->irqflags);
1728 }
1729 
1730 /**
1731  * prb_commit() - Commit (previously reserved) data to the ringbuffer.
1732  *
1733  * @e: The entry containing the reserved data information.
1734  *
1735  * This is the public function available to writers to commit data.
1736  *
1737  * Note that the data is not yet available to readers until it is finalized.
1738  * Finalizing happens automatically when space for the next record is
1739  * reserved.
1740  *
1741  * See prb_final_commit() for a version of this function that finalizes
1742  * immediately.
1743  *
1744  * Context: Any context. Enables local interrupts.
1745  */
1746 void prb_commit(struct prb_reserved_entry *e)
1747 {
1748 	struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
1749 	unsigned long head_id;
1750 
1751 	_prb_commit(e, desc_committed);
1752 
1753 	/*
1754 	 * If this descriptor is no longer the head (i.e. a new record has
1755 	 * been allocated), extending the data for this record is no longer
1756 	 * allowed and therefore it must be finalized.
1757 	 */
1758 	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
1759 	if (head_id != e->id)
1760 		desc_make_final(e->rb, e->id);
1761 }
1762 
1763 /**
1764  * prb_final_commit() - Commit and finalize (previously reserved) data to
1765  *                      the ringbuffer.
1766  *
1767  * @e: The entry containing the reserved data information.
1768  *
1769  * This is the public function available to writers to commit+finalize data.
1770  *
1771  * By finalizing, the data is made immediately available to readers.
1772  *
1773  * This function should only be used if there are no intentions of extending
1774  * this data using prb_reserve_in_last().
1775  *
1776  * Context: Any context. Enables local interrupts.
1777  */
1778 void prb_final_commit(struct prb_reserved_entry *e)
1779 {
1780 	_prb_commit(e, desc_finalized);
1781 
1782 	desc_update_last_finalized(e->rb);
1783 }
1784 
1785 /*
1786  * Count the number of lines in provided text. All text has at least 1 line
1787  * (even if @text_size is 0). Each '\n' processed is counted as an additional
1788  * line.
1789  */
1790 static unsigned int count_lines(const char *text, unsigned int text_size)
1791 {
1792 	unsigned int next_size = text_size;
1793 	unsigned int line_count = 1;
1794 	const char *next = text;
1795 
1796 	while (next_size) {
1797 		next = memchr(next, '\n', next_size);
1798 		if (!next)
1799 			break;
1800 		line_count++;
1801 		next++;
1802 		next_size = text_size - (next - text);
1803 	}
1804 
1805 	return line_count;
1806 }
1807 
1808 /*
1809  * Given @blk_lpos, copy an expected @len of data into the provided buffer.
1810  * If @line_count is provided, count the number of lines in the data.
1811  *
1812  * This function (used by readers) performs strict validation on the data
1813  * size to possibly detect bugs in the writer code. A WARN_ON_ONCE() is
1814  * triggered if an internal error is detected.
1815  */
1816 static bool copy_data(struct prb_data_ring *data_ring,
1817 		      struct prb_data_blk_lpos *blk_lpos, u16 len, char *buf,
1818 		      unsigned int buf_size, unsigned int *line_count)
1819 {
1820 	unsigned int data_size;
1821 	const char *data;
1822 
1823 	/* Caller might not want any data. */
1824 	if ((!buf || !buf_size) && !line_count)
1825 		return true;
1826 
1827 	data = get_data(data_ring, blk_lpos, &data_size);
1828 	if (!data)
1829 		return false;
1830 
1831 	/*
1832 	 * Actual cannot be less than expected. It can be more than expected
1833 	 * because of the trailing alignment padding.
1834 	 *
1835 	 * Note that invalid @len values can occur because the caller loads
1836 	 * the value during an allowed data race.
1837 	 */
1838 	if (data_size < (unsigned int)len)
1839 		return false;
1840 
1841 	/* Caller interested in the line count? */
1842 	if (line_count)
1843 		*line_count = count_lines(data, len);
1844 
1845 	/* Caller interested in the data content? */
1846 	if (!buf || !buf_size)
1847 		return true;
1848 
1849 	data_size = min_t(unsigned int, buf_size, len);
1850 
1851 	memcpy(&buf[0], data, data_size); /* LMM(copy_data:A) */
1852 	return true;
1853 }
1854 
1855 /*
1856  * This is an extended version of desc_read(). It gets a copy of a specified
1857  * descriptor. However, it also verifies that the record is finalized and has
1858  * the sequence number @seq. On success, 0 is returned.
1859  *
1860  * Error return values:
1861  * -EINVAL: A finalized record with sequence number @seq does not exist.
1862  * -ENOENT: A finalized record with sequence number @seq exists, but its data
1863  *          is not available. This is a valid record, so readers should
1864  *          continue with the next record.
1865  */
1866 static int desc_read_finalized_seq(struct prb_desc_ring *desc_ring,
1867 				   unsigned long id, u64 seq,
1868 				   struct prb_desc *desc_out)
1869 {
1870 	struct prb_data_blk_lpos *blk_lpos = &desc_out->text_blk_lpos;
1871 	enum desc_state d_state;
1872 	u64 s;
1873 
1874 	d_state = desc_read(desc_ring, id, desc_out, &s, NULL);
1875 
1876 	/*
1877 	 * An unexpected @id (desc_miss) or @seq mismatch means the record
1878 	 * does not exist. A descriptor in the reserved or committed state
1879 	 * means the record does not yet exist for the reader.
1880 	 */
1881 	if (d_state == desc_miss ||
1882 	    d_state == desc_reserved ||
1883 	    d_state == desc_committed ||
1884 	    s != seq) {
1885 		return -EINVAL;
1886 	}
1887 
1888 	/*
1889 	 * A descriptor in the reusable state may no longer have its data
1890 	 * available; report it as existing but with lost data. Or the record
1891 	 * may actually be a record with lost data.
1892 	 */
1893 	if (d_state == desc_reusable ||
1894 	    (blk_lpos->begin == FAILED_LPOS && blk_lpos->next == FAILED_LPOS)) {
1895 		return -ENOENT;
1896 	}
1897 
1898 	return 0;
1899 }
1900 
1901 /*
1902  * Copy the ringbuffer data from the record with @seq to the provided
1903  * @r buffer. On success, 0 is returned.
1904  *
1905  * See desc_read_finalized_seq() for error return values.
1906  */
1907 static int prb_read(struct printk_ringbuffer *rb, u64 seq,
1908 		    struct printk_record *r, unsigned int *line_count)
1909 {
1910 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
1911 	struct printk_info *info = to_info(desc_ring, seq);
1912 	struct prb_desc *rdesc = to_desc(desc_ring, seq);
1913 	atomic_long_t *state_var = &rdesc->state_var;
1914 	struct prb_desc desc;
1915 	unsigned long id;
1916 	int err;
1917 
1918 	/* Extract the ID, used to specify the descriptor to read. */
1919 	id = DESC_ID(atomic_long_read(state_var));
1920 
1921 	/* Get a local copy of the correct descriptor (if available). */
1922 	err = desc_read_finalized_seq(desc_ring, id, seq, &desc);
1923 
1924 	/*
1925 	 * If @r is NULL, the caller is only interested in the availability
1926 	 * of the record.
1927 	 */
1928 	if (err || !r)
1929 		return err;
1930 
1931 	/* If requested, copy meta data. */
1932 	if (r->info)
1933 		memcpy(r->info, info, sizeof(*(r->info)));
1934 
1935 	/* Copy text data. If it fails, this is a data-less record. */
1936 	if (!copy_data(&rb->text_data_ring, &desc.text_blk_lpos, info->text_len,
1937 		       r->text_buf, r->text_buf_size, line_count)) {
1938 		return -ENOENT;
1939 	}
1940 
1941 	/* Ensure the record is still finalized and has the same @seq. */
1942 	return desc_read_finalized_seq(desc_ring, id, seq, &desc);
1943 }
1944 
1945 /* Get the sequence number of the tail descriptor. */
1946 u64 prb_first_seq(struct printk_ringbuffer *rb)
1947 {
1948 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
1949 	enum desc_state d_state;
1950 	struct prb_desc desc;
1951 	unsigned long id;
1952 	u64 seq;
1953 
1954 	for (;;) {
1955 		id = atomic_long_read(&rb->desc_ring.tail_id); /* LMM(prb_first_seq:A) */
1956 
1957 		d_state = desc_read(desc_ring, id, &desc, &seq, NULL); /* LMM(prb_first_seq:B) */
1958 
1959 		/*
1960 		 * This loop will not be infinite because the tail is
1961 		 * _always_ in the finalized or reusable state.
1962 		 */
1963 		if (d_state == desc_finalized || d_state == desc_reusable)
1964 			break;
1965 
1966 		/*
1967 		 * Guarantee the last state load from desc_read() is before
1968 		 * reloading @tail_id in order to see a new tail in the case
1969 		 * that the descriptor has been recycled. This pairs with
1970 		 * desc_reserve:D.
1971 		 *
1972 		 * Memory barrier involvement:
1973 		 *
1974 		 * If prb_first_seq:B reads from desc_reserve:F, then
1975 		 * prb_first_seq:A reads from desc_push_tail:B.
1976 		 *
1977 		 * Relies on:
1978 		 *
1979 		 * MB from desc_push_tail:B to desc_reserve:F
1980 		 *    matching
1981 		 * RMB prb_first_seq:B to prb_first_seq:A
1982 		 */
1983 		smp_rmb(); /* LMM(prb_first_seq:C) */
1984 	}
1985 
1986 	return seq;
1987 }
1988 
1989 /**
1990  * prb_next_reserve_seq() - Get the sequence number after the most recently
1991  *                  reserved record.
1992  *
1993  * @rb:  The ringbuffer to get the sequence number from.
1994  *
1995  * This is the public function available to readers to see what sequence
1996  * number will be assigned to the next reserved record.
1997  *
1998  * Note that depending on the situation, this value can be equal to or
1999  * higher than the sequence number returned by prb_next_seq().
2000  *
2001  * Context: Any context.
2002  * Return: The sequence number that will be assigned to the next record
2003  *         reserved.
2004  */
2005 u64 prb_next_reserve_seq(struct printk_ringbuffer *rb)
2006 {
2007 	struct prb_desc_ring *desc_ring = &rb->desc_ring;
2008 	unsigned long last_finalized_id;
2009 	atomic_long_t *state_var;
2010 	u64 last_finalized_seq;
2011 	unsigned long head_id;
2012 	struct prb_desc desc;
2013 	unsigned long diff;
2014 	struct prb_desc *d;
2015 	int err;
2016 
2017 	/*
2018 	 * It may not be possible to read a sequence number for @head_id.
2019 	 * So the ID of @last_finailzed_seq is used to calculate what the
2020 	 * sequence number of @head_id will be.
2021 	 */
2022 
2023 try_again:
2024 	last_finalized_seq = desc_last_finalized_seq(rb);
2025 
2026 	/*
2027 	 * @head_id is loaded after @last_finalized_seq to ensure that
2028 	 * it points to the record with @last_finalized_seq or newer.
2029 	 *
2030 	 * Memory barrier involvement:
2031 	 *
2032 	 * If desc_last_finalized_seq:A reads from
2033 	 * desc_update_last_finalized:A, then
2034 	 * prb_next_reserve_seq:A reads from desc_reserve:D.
2035 	 *
2036 	 * Relies on:
2037 	 *
2038 	 * RELEASE from desc_reserve:D to desc_update_last_finalized:A
2039 	 *    matching
2040 	 * ACQUIRE from desc_last_finalized_seq:A to prb_next_reserve_seq:A
2041 	 *
2042 	 * Note: desc_reserve:D and desc_update_last_finalized:A can be
2043 	 *       different CPUs. However, the desc_update_last_finalized:A CPU
2044 	 *       (which performs the release) must have previously seen
2045 	 *       desc_read:C, which implies desc_reserve:D can be seen.
2046 	 */
2047 	head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_next_reserve_seq:A) */
2048 
2049 	d = to_desc(desc_ring, last_finalized_seq);
2050 	state_var = &d->state_var;
2051 
2052 	/* Extract the ID, used to specify the descriptor to read. */
2053 	last_finalized_id = DESC_ID(atomic_long_read(state_var));
2054 
2055 	/* Ensure @last_finalized_id is correct. */
2056 	err = desc_read_finalized_seq(desc_ring, last_finalized_id, last_finalized_seq, &desc);
2057 
2058 	if (err == -EINVAL) {
2059 		if (last_finalized_seq == 0) {
2060 			/*
2061 			 * No record has been finalized or even reserved yet.
2062 			 *
2063 			 * The @head_id is initialized such that the first
2064 			 * increment will yield the first record (seq=0).
2065 			 * Handle it separately to avoid a negative @diff
2066 			 * below.
2067 			 */
2068 			if (head_id == DESC0_ID(desc_ring->count_bits))
2069 				return 0;
2070 
2071 			/*
2072 			 * One or more descriptors are already reserved. Use
2073 			 * the descriptor ID of the first one (@seq=0) for
2074 			 * the @diff below.
2075 			 */
2076 			last_finalized_id = DESC0_ID(desc_ring->count_bits) + 1;
2077 		} else {
2078 			/* Record must have been overwritten. Try again. */
2079 			goto try_again;
2080 		}
2081 	}
2082 
2083 	/* Diff of known descriptor IDs to compute related sequence numbers. */
2084 	diff = head_id - last_finalized_id;
2085 
2086 	/*
2087 	 * @head_id points to the most recently reserved record, but this
2088 	 * function returns the sequence number that will be assigned to the
2089 	 * next (not yet reserved) record. Thus +1 is needed.
2090 	 */
2091 	return (last_finalized_seq + diff + 1);
2092 }
2093 
2094 /*
2095  * Non-blocking read of a record.
2096  *
2097  * On success @seq is updated to the record that was read and (if provided)
2098  * @r and @line_count will contain the read/calculated data.
2099  *
2100  * On failure @seq is updated to a record that is not yet available to the
2101  * reader, but it will be the next record available to the reader.
2102  *
2103  * Note: When the current CPU is in panic, this function will skip over any
2104  *       non-existent/non-finalized records in order to allow the panic CPU
2105  *       to print any and all records that have been finalized.
2106  */
2107 static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
2108 			    struct printk_record *r, unsigned int *line_count)
2109 {
2110 	u64 tail_seq;
2111 	int err;
2112 
2113 	while ((err = prb_read(rb, *seq, r, line_count))) {
2114 		tail_seq = prb_first_seq(rb);
2115 
2116 		if (*seq < tail_seq) {
2117 			/*
2118 			 * Behind the tail. Catch up and try again. This
2119 			 * can happen for -ENOENT and -EINVAL cases.
2120 			 */
2121 			*seq = tail_seq;
2122 
2123 		} else if (err == -ENOENT) {
2124 			/* Record exists, but the data was lost. Skip. */
2125 			(*seq)++;
2126 
2127 		} else {
2128 			/*
2129 			 * Non-existent/non-finalized record. Must stop.
2130 			 *
2131 			 * For panic situations it cannot be expected that
2132 			 * non-finalized records will become finalized. But
2133 			 * there may be other finalized records beyond that
2134 			 * need to be printed for a panic situation. If this
2135 			 * is the panic CPU, skip this
2136 			 * non-existent/non-finalized record unless it is
2137 			 * at or beyond the head, in which case it is not
2138 			 * possible to continue.
2139 			 *
2140 			 * Note that new messages printed on panic CPU are
2141 			 * finalized when we are here. The only exception
2142 			 * might be the last message without trailing newline.
2143 			 * But it would have the sequence number returned
2144 			 * by "prb_next_reserve_seq() - 1".
2145 			 */
2146 			if (this_cpu_in_panic() && ((*seq + 1) < prb_next_reserve_seq(rb)))
2147 				(*seq)++;
2148 			else
2149 				return false;
2150 		}
2151 	}
2152 
2153 	return true;
2154 }
2155 
2156 /**
2157  * prb_read_valid() - Non-blocking read of a requested record or (if gone)
2158  *                    the next available record.
2159  *
2160  * @rb:  The ringbuffer to read from.
2161  * @seq: The sequence number of the record to read.
2162  * @r:   A record data buffer to store the read record to.
2163  *
2164  * This is the public function available to readers to read a record.
2165  *
2166  * The reader provides the @info and @text_buf buffers of @r to be
2167  * filled in. Any of the buffer pointers can be set to NULL if the reader
2168  * is not interested in that data. To ensure proper initialization of @r,
2169  * prb_rec_init_rd() should be used.
2170  *
2171  * Context: Any context.
2172  * Return: true if a record was read, otherwise false.
2173  *
2174  * On success, the reader must check r->info.seq to see which record was
2175  * actually read. This allows the reader to detect dropped records.
2176  *
2177  * Failure means @seq refers to a record not yet available to the reader.
2178  */
2179 bool prb_read_valid(struct printk_ringbuffer *rb, u64 seq,
2180 		    struct printk_record *r)
2181 {
2182 	return _prb_read_valid(rb, &seq, r, NULL);
2183 }
2184 
2185 /**
2186  * prb_read_valid_info() - Non-blocking read of meta data for a requested
2187  *                         record or (if gone) the next available record.
2188  *
2189  * @rb:         The ringbuffer to read from.
2190  * @seq:        The sequence number of the record to read.
2191  * @info:       A buffer to store the read record meta data to.
2192  * @line_count: A buffer to store the number of lines in the record text.
2193  *
2194  * This is the public function available to readers to read only the
2195  * meta data of a record.
2196  *
2197  * The reader provides the @info, @line_count buffers to be filled in.
2198  * Either of the buffer pointers can be set to NULL if the reader is not
2199  * interested in that data.
2200  *
2201  * Context: Any context.
2202  * Return: true if a record's meta data was read, otherwise false.
2203  *
2204  * On success, the reader must check info->seq to see which record meta data
2205  * was actually read. This allows the reader to detect dropped records.
2206  *
2207  * Failure means @seq refers to a record not yet available to the reader.
2208  */
2209 bool prb_read_valid_info(struct printk_ringbuffer *rb, u64 seq,
2210 			 struct printk_info *info, unsigned int *line_count)
2211 {
2212 	struct printk_record r;
2213 
2214 	prb_rec_init_rd(&r, info, NULL, 0);
2215 
2216 	return _prb_read_valid(rb, &seq, &r, line_count);
2217 }
2218 
2219 /**
2220  * prb_first_valid_seq() - Get the sequence number of the oldest available
2221  *                         record.
2222  *
2223  * @rb: The ringbuffer to get the sequence number from.
2224  *
2225  * This is the public function available to readers to see what the
2226  * first/oldest valid sequence number is.
2227  *
2228  * This provides readers a starting point to begin iterating the ringbuffer.
2229  *
2230  * Context: Any context.
2231  * Return: The sequence number of the first/oldest record or, if the
2232  *         ringbuffer is empty, 0 is returned.
2233  */
2234 u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
2235 {
2236 	u64 seq = 0;
2237 
2238 	if (!_prb_read_valid(rb, &seq, NULL, NULL))
2239 		return 0;
2240 
2241 	return seq;
2242 }
2243 
2244 /**
2245  * prb_next_seq() - Get the sequence number after the last available record.
2246  *
2247  * @rb:  The ringbuffer to get the sequence number from.
2248  *
2249  * This is the public function available to readers to see what the next
2250  * newest sequence number available to readers will be.
2251  *
2252  * This provides readers a sequence number to jump to if all currently
2253  * available records should be skipped. It is guaranteed that all records
2254  * previous to the returned value have been finalized and are (or were)
2255  * available to the reader.
2256  *
2257  * Context: Any context.
2258  * Return: The sequence number of the next newest (not yet available) record
2259  *         for readers.
2260  */
2261 u64 prb_next_seq(struct printk_ringbuffer *rb)
2262 {
2263 	u64 seq;
2264 
2265 	seq = desc_last_finalized_seq(rb);
2266 
2267 	/*
2268 	 * Begin searching after the last finalized record.
2269 	 *
2270 	 * On 0, the search must begin at 0 because of hack#2
2271 	 * of the bootstrapping phase it is not known if a
2272 	 * record at index 0 exists.
2273 	 */
2274 	if (seq != 0)
2275 		seq++;
2276 
2277 	/*
2278 	 * The information about the last finalized @seq might be inaccurate.
2279 	 * Search forward to find the current one.
2280 	 */
2281 	while (_prb_read_valid(rb, &seq, NULL, NULL))
2282 		seq++;
2283 
2284 	return seq;
2285 }
2286 
2287 /**
2288  * prb_init() - Initialize a ringbuffer to use provided external buffers.
2289  *
2290  * @rb:       The ringbuffer to initialize.
2291  * @text_buf: The data buffer for text data.
2292  * @textbits: The size of @text_buf as a power-of-2 value.
2293  * @descs:    The descriptor buffer for ringbuffer records.
2294  * @descbits: The count of @descs items as a power-of-2 value.
2295  * @infos:    The printk_info buffer for ringbuffer records.
2296  *
2297  * This is the public function available to writers to setup a ringbuffer
2298  * during runtime using provided buffers.
2299  *
2300  * This must match the initialization of DEFINE_PRINTKRB().
2301  *
2302  * Context: Any context.
2303  */
2304 void prb_init(struct printk_ringbuffer *rb,
2305 	      char *text_buf, unsigned int textbits,
2306 	      struct prb_desc *descs, unsigned int descbits,
2307 	      struct printk_info *infos)
2308 {
2309 	memset(descs, 0, _DESCS_COUNT(descbits) * sizeof(descs[0]));
2310 	memset(infos, 0, _DESCS_COUNT(descbits) * sizeof(infos[0]));
2311 
2312 	rb->desc_ring.count_bits = descbits;
2313 	rb->desc_ring.descs = descs;
2314 	rb->desc_ring.infos = infos;
2315 	atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
2316 	atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
2317 	atomic_long_set(&rb->desc_ring.last_finalized_seq, 0);
2318 
2319 	rb->text_data_ring.size_bits = textbits;
2320 	rb->text_data_ring.data = text_buf;
2321 	atomic_long_set(&rb->text_data_ring.head_lpos, BLK0_LPOS(textbits));
2322 	atomic_long_set(&rb->text_data_ring.tail_lpos, BLK0_LPOS(textbits));
2323 
2324 	atomic_long_set(&rb->fail, 0);
2325 
2326 	atomic_long_set(&(descs[_DESCS_COUNT(descbits) - 1].state_var), DESC0_SV(descbits));
2327 	descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.begin = FAILED_LPOS;
2328 	descs[_DESCS_COUNT(descbits) - 1].text_blk_lpos.next = FAILED_LPOS;
2329 
2330 	infos[0].seq = -(u64)_DESCS_COUNT(descbits);
2331 	infos[_DESCS_COUNT(descbits) - 1].seq = 0;
2332 }
2333 
2334 /**
2335  * prb_record_text_space() - Query the full actual used ringbuffer space for
2336  *                           the text data of a reserved entry.
2337  *
2338  * @e: The successfully reserved entry to query.
2339  *
2340  * This is the public function available to writers to see how much actual
2341  * space is used in the ringbuffer to store the text data of the specified
2342  * entry.
2343  *
2344  * This function is only valid if @e has been successfully reserved using
2345  * prb_reserve().
2346  *
2347  * Context: Any context.
2348  * Return: The size in bytes used by the text data of the associated record.
2349  */
2350 unsigned int prb_record_text_space(struct prb_reserved_entry *e)
2351 {
2352 	return e->text_space;
2353 }
2354