xref: /linux/fs/netfs/rolling_buffer.c (revision ca56a74a31e26d81a481304ed2f631e65883372b)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* Rolling buffer helpers
3  *
4  * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #include <linux/bitops.h>
9 #include <linux/pagemap.h>
10 #include <linux/rolling_buffer.h>
11 #include <linux/slab.h>
12 #include "internal.h"
13 
14 static atomic_t debug_ids;
15 
16 /**
17  * netfs_folioq_alloc - Allocate a folio_queue struct
18  * @rreq_id: Associated debugging ID for tracing purposes
19  * @gfp: Allocation constraints
20  * @trace: Trace tag to indicate the purpose of the allocation
21  *
22  * Allocate, initialise and account the folio_queue struct and log a trace line
23  * to mark the allocation.
24  */
netfs_folioq_alloc(unsigned int rreq_id,gfp_t gfp,unsigned int trace)25 struct folio_queue *netfs_folioq_alloc(unsigned int rreq_id, gfp_t gfp,
26 				       unsigned int /*enum netfs_folioq_trace*/ trace)
27 {
28 	struct folio_queue *fq;
29 
30 	fq = kmalloc(sizeof(*fq), gfp);
31 	if (fq) {
32 		netfs_stat(&netfs_n_folioq);
33 		folioq_init(fq, rreq_id);
34 		fq->debug_id = atomic_inc_return(&debug_ids);
35 		trace_netfs_folioq(fq, trace);
36 	}
37 	return fq;
38 }
39 EXPORT_SYMBOL(netfs_folioq_alloc);
40 
41 /**
42  * netfs_folioq_free - Free a folio_queue struct
43  * @folioq: The object to free
44  * @trace: Trace tag to indicate which free
45  *
46  * Free and unaccount the folio_queue struct.
47  */
netfs_folioq_free(struct folio_queue * folioq,unsigned int trace)48 void netfs_folioq_free(struct folio_queue *folioq,
49 		       unsigned int /*enum netfs_trace_folioq*/ trace)
50 {
51 	trace_netfs_folioq(folioq, trace);
52 	netfs_stat_d(&netfs_n_folioq);
53 	kfree(folioq);
54 }
55 EXPORT_SYMBOL(netfs_folioq_free);
56 
57 /*
58  * Initialise a rolling buffer.  We allocate an empty folio queue struct to so
59  * that the pointers can be independently driven by the producer and the
60  * consumer.
61  */
rolling_buffer_init(struct rolling_buffer * roll,unsigned int rreq_id,unsigned int direction)62 int rolling_buffer_init(struct rolling_buffer *roll, unsigned int rreq_id,
63 			unsigned int direction)
64 {
65 	struct folio_queue *fq;
66 
67 	fq = netfs_folioq_alloc(rreq_id, GFP_NOFS, netfs_trace_folioq_rollbuf_init);
68 	if (!fq)
69 		return -ENOMEM;
70 
71 	roll->head = fq;
72 	roll->tail = fq;
73 	iov_iter_folio_queue(&roll->iter, direction, fq, 0, 0, 0);
74 	return 0;
75 }
76 
77 /*
78  * Add another folio_queue to a rolling buffer if there's no space left.
79  */
rolling_buffer_make_space(struct rolling_buffer * roll)80 int rolling_buffer_make_space(struct rolling_buffer *roll)
81 {
82 	struct folio_queue *fq, *head = roll->head;
83 
84 	if (!folioq_full(head))
85 		return 0;
86 
87 	fq = netfs_folioq_alloc(head->rreq_id, GFP_NOFS, netfs_trace_folioq_make_space);
88 	if (!fq)
89 		return -ENOMEM;
90 	fq->prev = head;
91 
92 	roll->head = fq;
93 	if (folioq_full(head)) {
94 		/* Make sure we don't leave the master iterator pointing to a
95 		 * block that might get immediately consumed.
96 		 */
97 		if (roll->iter.folioq == head &&
98 		    roll->iter.folioq_slot == folioq_nr_slots(head)) {
99 			roll->iter.folioq = fq;
100 			roll->iter.folioq_slot = 0;
101 		}
102 	}
103 
104 	/* Make sure the initialisation is stored before the next pointer.
105 	 *
106 	 * [!] NOTE: After we set head->next, the consumer is at liberty to
107 	 * immediately delete the old head.
108 	 */
109 	smp_store_release(&head->next, fq);
110 	return 0;
111 }
112 
113 /*
114  * Decant the list of folios to read into a rolling buffer.
115  */
rolling_buffer_load_from_ra(struct rolling_buffer * roll,struct readahead_control * ractl,struct folio_batch * put_batch)116 ssize_t rolling_buffer_load_from_ra(struct rolling_buffer *roll,
117 				    struct readahead_control *ractl,
118 				    struct folio_batch *put_batch)
119 {
120 	struct folio_queue *fq;
121 	struct page **vec;
122 	int nr, ix, to;
123 	ssize_t size = 0;
124 
125 	if (rolling_buffer_make_space(roll) < 0)
126 		return -ENOMEM;
127 
128 	fq = roll->head;
129 	vec = (struct page **)fq->vec.folios;
130 	nr = __readahead_batch(ractl, vec + folio_batch_count(&fq->vec),
131 			       folio_batch_space(&fq->vec));
132 	ix = fq->vec.nr;
133 	to = ix + nr;
134 	fq->vec.nr = to;
135 	for (; ix < to; ix++) {
136 		struct folio *folio = folioq_folio(fq, ix);
137 		unsigned int order = folio_order(folio);
138 
139 		fq->orders[ix] = order;
140 		size += PAGE_SIZE << order;
141 		trace_netfs_folio(folio, netfs_folio_trace_read);
142 		if (!folio_batch_add(put_batch, folio))
143 			folio_batch_release(put_batch);
144 	}
145 	WRITE_ONCE(roll->iter.count, roll->iter.count + size);
146 
147 	/* Store the counter after setting the slot. */
148 	smp_store_release(&roll->next_head_slot, to);
149 
150 	for (; ix < folioq_nr_slots(fq); ix++)
151 		folioq_clear(fq, ix);
152 
153 	return size;
154 }
155 
156 /*
157  * Append a folio to the rolling buffer.
158  */
rolling_buffer_append(struct rolling_buffer * roll,struct folio * folio,unsigned int flags)159 ssize_t rolling_buffer_append(struct rolling_buffer *roll, struct folio *folio,
160 			      unsigned int flags)
161 {
162 	ssize_t size = folio_size(folio);
163 	int slot;
164 
165 	if (rolling_buffer_make_space(roll) < 0)
166 		return -ENOMEM;
167 
168 	slot = folioq_append(roll->head, folio);
169 	if (flags & ROLLBUF_MARK_1)
170 		folioq_mark(roll->head, slot);
171 	if (flags & ROLLBUF_MARK_2)
172 		folioq_mark2(roll->head, slot);
173 
174 	WRITE_ONCE(roll->iter.count, roll->iter.count + size);
175 
176 	/* Store the counter after setting the slot. */
177 	smp_store_release(&roll->next_head_slot, slot);
178 	return size;
179 }
180 
181 /*
182  * Delete a spent buffer from a rolling queue and return the next in line.  We
183  * don't return the last buffer to keep the pointers independent, but return
184  * NULL instead.
185  */
rolling_buffer_delete_spent(struct rolling_buffer * roll)186 struct folio_queue *rolling_buffer_delete_spent(struct rolling_buffer *roll)
187 {
188 	struct folio_queue *spent = roll->tail, *next = READ_ONCE(spent->next);
189 
190 	if (!next)
191 		return NULL;
192 	next->prev = NULL;
193 	netfs_folioq_free(spent, netfs_trace_folioq_delete);
194 	roll->tail = next;
195 	return next;
196 }
197 
198 /*
199  * Clear out a rolling queue.  Folios that have mark 1 set are put.
200  */
rolling_buffer_clear(struct rolling_buffer * roll)201 void rolling_buffer_clear(struct rolling_buffer *roll)
202 {
203 	struct folio_batch fbatch;
204 	struct folio_queue *p;
205 
206 	folio_batch_init(&fbatch);
207 
208 	while ((p = roll->tail)) {
209 		roll->tail = p->next;
210 		for (int slot = 0; slot < folioq_count(p); slot++) {
211 			struct folio *folio = folioq_folio(p, slot);
212 
213 			if (!folio)
214 				continue;
215 			if (folioq_is_marked(p, slot)) {
216 				trace_netfs_folio(folio, netfs_folio_trace_put);
217 				if (!folio_batch_add(&fbatch, folio))
218 					folio_batch_release(&fbatch);
219 			}
220 		}
221 
222 		netfs_folioq_free(p, netfs_trace_folioq_clear);
223 	}
224 
225 	folio_batch_release(&fbatch);
226 }
227