xref: /linux/drivers/md/dm-vdo/indexer/io-factory.c (revision eb01fe7abbe2d0b38824d2a93fdb4cc3eaf2ccc1)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "io-factory.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/blkdev.h>
10 #include <linux/err.h>
11 #include <linux/mount.h>
12 
13 #include "logger.h"
14 #include "memory-alloc.h"
15 #include "numeric.h"
16 
17 /*
18  * The I/O factory object manages access to index storage, which is a contiguous range of blocks on
19  * a block device.
20  *
21  * The factory holds the open device and is responsible for closing it. The factory has methods to
22  * make helper structures that can be used to access sections of the index.
23  */
24 struct io_factory {
25 	struct block_device *bdev;
26 	atomic_t ref_count;
27 };
28 
29 /* The buffered reader allows efficient I/O by reading page-sized segments into a buffer. */
30 struct buffered_reader {
31 	struct io_factory *factory;
32 	struct dm_bufio_client *client;
33 	struct dm_buffer *buffer;
34 	sector_t limit;
35 	sector_t block_number;
36 	u8 *start;
37 	u8 *end;
38 };
39 
/* The largest number of blocks requested from dm-bufio in a single prefetch. */
#define MAX_READ_AHEAD_BLOCKS 4
41 
42 /*
43  * The buffered writer allows efficient I/O by buffering writes and committing page-sized segments
44  * to storage.
45  */
46 struct buffered_writer {
47 	struct io_factory *factory;
48 	struct dm_bufio_client *client;
49 	struct dm_buffer *buffer;
50 	sector_t limit;
51 	sector_t block_number;
52 	u8 *start;
53 	u8 *end;
54 	int error;
55 };
56 
57 static void uds_get_io_factory(struct io_factory *factory)
58 {
59 	atomic_inc(&factory->ref_count);
60 }
61 
62 int uds_make_io_factory(struct block_device *bdev, struct io_factory **factory_ptr)
63 {
64 	int result;
65 	struct io_factory *factory;
66 
67 	result = vdo_allocate(1, struct io_factory, __func__, &factory);
68 	if (result != VDO_SUCCESS)
69 		return result;
70 
71 	factory->bdev = bdev;
72 	atomic_set_release(&factory->ref_count, 1);
73 
74 	*factory_ptr = factory;
75 	return UDS_SUCCESS;
76 }
77 
78 int uds_replace_storage(struct io_factory *factory, struct block_device *bdev)
79 {
80 	factory->bdev = bdev;
81 	return UDS_SUCCESS;
82 }
83 
84 /* Free an I/O factory once all references have been released. */
85 void uds_put_io_factory(struct io_factory *factory)
86 {
87 	if (atomic_add_return(-1, &factory->ref_count) <= 0)
88 		vdo_free(factory);
89 }
90 
91 size_t uds_get_writable_size(struct io_factory *factory)
92 {
93 	return i_size_read(factory->bdev->bd_inode);
94 }
95 
96 /* Create a struct dm_bufio_client for an index region starting at offset. */
97 int uds_make_bufio(struct io_factory *factory, off_t block_offset, size_t block_size,
98 		   unsigned int reserved_buffers, struct dm_bufio_client **client_ptr)
99 {
100 	struct dm_bufio_client *client;
101 
102 	client = dm_bufio_client_create(factory->bdev, block_size, reserved_buffers, 0,
103 					NULL, NULL, 0);
104 	if (IS_ERR(client))
105 		return -PTR_ERR(client);
106 
107 	dm_bufio_set_sector_offset(client, block_offset * SECTORS_PER_BLOCK);
108 	*client_ptr = client;
109 	return UDS_SUCCESS;
110 }
111 
112 static void read_ahead(struct buffered_reader *reader, sector_t block_number)
113 {
114 	if (block_number < reader->limit) {
115 		sector_t read_ahead = min((sector_t) MAX_READ_AHEAD_BLOCKS,
116 					  reader->limit - block_number);
117 
118 		dm_bufio_prefetch(reader->client, block_number, read_ahead);
119 	}
120 }
121 
122 void uds_free_buffered_reader(struct buffered_reader *reader)
123 {
124 	if (reader == NULL)
125 		return;
126 
127 	if (reader->buffer != NULL)
128 		dm_bufio_release(reader->buffer);
129 
130 	dm_bufio_client_destroy(reader->client);
131 	uds_put_io_factory(reader->factory);
132 	vdo_free(reader);
133 }
134 
135 /* Create a buffered reader for an index region starting at offset. */
136 int uds_make_buffered_reader(struct io_factory *factory, off_t offset, u64 block_count,
137 			     struct buffered_reader **reader_ptr)
138 {
139 	int result;
140 	struct dm_bufio_client *client = NULL;
141 	struct buffered_reader *reader = NULL;
142 
143 	result = uds_make_bufio(factory, offset, UDS_BLOCK_SIZE, 1, &client);
144 	if (result != UDS_SUCCESS)
145 		return result;
146 
147 	result = vdo_allocate(1, struct buffered_reader, "buffered reader", &reader);
148 	if (result != VDO_SUCCESS) {
149 		dm_bufio_client_destroy(client);
150 		return result;
151 	}
152 
153 	*reader = (struct buffered_reader) {
154 		.factory = factory,
155 		.client = client,
156 		.buffer = NULL,
157 		.limit = block_count,
158 		.block_number = 0,
159 		.start = NULL,
160 		.end = NULL,
161 	};
162 
163 	read_ahead(reader, 0);
164 	uds_get_io_factory(factory);
165 	*reader_ptr = reader;
166 	return UDS_SUCCESS;
167 }
168 
169 static int position_reader(struct buffered_reader *reader, sector_t block_number,
170 			   off_t offset)
171 {
172 	struct dm_buffer *buffer = NULL;
173 	void *data;
174 
175 	if ((reader->end == NULL) || (block_number != reader->block_number)) {
176 		if (block_number >= reader->limit)
177 			return UDS_OUT_OF_RANGE;
178 
179 		if (reader->buffer != NULL)
180 			dm_bufio_release(vdo_forget(reader->buffer));
181 
182 		data = dm_bufio_read(reader->client, block_number, &buffer);
183 		if (IS_ERR(data))
184 			return -PTR_ERR(data);
185 
186 		reader->buffer = buffer;
187 		reader->start = data;
188 		if (block_number == reader->block_number + 1)
189 			read_ahead(reader, block_number + 1);
190 	}
191 
192 	reader->block_number = block_number;
193 	reader->end = reader->start + offset;
194 	return UDS_SUCCESS;
195 }
196 
197 static size_t bytes_remaining_in_read_buffer(struct buffered_reader *reader)
198 {
199 	return (reader->end == NULL) ? 0 : reader->start + UDS_BLOCK_SIZE - reader->end;
200 }
201 
202 static int reset_reader(struct buffered_reader *reader)
203 {
204 	sector_t block_number;
205 
206 	if (bytes_remaining_in_read_buffer(reader) > 0)
207 		return UDS_SUCCESS;
208 
209 	block_number = reader->block_number;
210 	if (reader->end != NULL)
211 		block_number++;
212 
213 	return position_reader(reader, block_number, 0);
214 }
215 
216 int uds_read_from_buffered_reader(struct buffered_reader *reader, u8 *data,
217 				  size_t length)
218 {
219 	int result = UDS_SUCCESS;
220 	size_t chunk_size;
221 
222 	while (length > 0) {
223 		result = reset_reader(reader);
224 		if (result != UDS_SUCCESS)
225 			return result;
226 
227 		chunk_size = min(length, bytes_remaining_in_read_buffer(reader));
228 		memcpy(data, reader->end, chunk_size);
229 		length -= chunk_size;
230 		data += chunk_size;
231 		reader->end += chunk_size;
232 	}
233 
234 	return UDS_SUCCESS;
235 }
236 
237 /*
238  * Verify that the next data on the reader matches the required value. If the value matches, the
239  * matching contents are consumed. If the value does not match, the reader state is unchanged.
240  */
241 int uds_verify_buffered_data(struct buffered_reader *reader, const u8 *value,
242 			     size_t length)
243 {
244 	int result = UDS_SUCCESS;
245 	size_t chunk_size;
246 	sector_t start_block_number = reader->block_number;
247 	int start_offset = reader->end - reader->start;
248 
249 	while (length > 0) {
250 		result = reset_reader(reader);
251 		if (result != UDS_SUCCESS) {
252 			result = UDS_CORRUPT_DATA;
253 			break;
254 		}
255 
256 		chunk_size = min(length, bytes_remaining_in_read_buffer(reader));
257 		if (memcmp(value, reader->end, chunk_size) != 0) {
258 			result = UDS_CORRUPT_DATA;
259 			break;
260 		}
261 
262 		length -= chunk_size;
263 		value += chunk_size;
264 		reader->end += chunk_size;
265 	}
266 
267 	if (result != UDS_SUCCESS)
268 		position_reader(reader, start_block_number, start_offset);
269 
270 	return result;
271 }
272 
273 /* Create a buffered writer for an index region starting at offset. */
274 int uds_make_buffered_writer(struct io_factory *factory, off_t offset, u64 block_count,
275 			     struct buffered_writer **writer_ptr)
276 {
277 	int result;
278 	struct dm_bufio_client *client = NULL;
279 	struct buffered_writer *writer;
280 
281 	result = uds_make_bufio(factory, offset, UDS_BLOCK_SIZE, 1, &client);
282 	if (result != UDS_SUCCESS)
283 		return result;
284 
285 	result = vdo_allocate(1, struct buffered_writer, "buffered writer", &writer);
286 	if (result != VDO_SUCCESS) {
287 		dm_bufio_client_destroy(client);
288 		return result;
289 	}
290 
291 	*writer = (struct buffered_writer) {
292 		.factory = factory,
293 		.client = client,
294 		.buffer = NULL,
295 		.limit = block_count,
296 		.start = NULL,
297 		.end = NULL,
298 		.block_number = 0,
299 		.error = UDS_SUCCESS,
300 	};
301 
302 	uds_get_io_factory(factory);
303 	*writer_ptr = writer;
304 	return UDS_SUCCESS;
305 }
306 
307 static size_t get_remaining_write_space(struct buffered_writer *writer)
308 {
309 	return writer->start + UDS_BLOCK_SIZE - writer->end;
310 }
311 
312 static int __must_check prepare_next_buffer(struct buffered_writer *writer)
313 {
314 	struct dm_buffer *buffer = NULL;
315 	void *data;
316 
317 	if (writer->block_number >= writer->limit) {
318 		writer->error = UDS_OUT_OF_RANGE;
319 		return UDS_OUT_OF_RANGE;
320 	}
321 
322 	data = dm_bufio_new(writer->client, writer->block_number, &buffer);
323 	if (IS_ERR(data)) {
324 		writer->error = -PTR_ERR(data);
325 		return writer->error;
326 	}
327 
328 	writer->buffer = buffer;
329 	writer->start = data;
330 	writer->end = data;
331 	return UDS_SUCCESS;
332 }
333 
334 static int flush_previous_buffer(struct buffered_writer *writer)
335 {
336 	size_t available;
337 
338 	if (writer->buffer == NULL)
339 		return writer->error;
340 
341 	if (writer->error == UDS_SUCCESS) {
342 		available = get_remaining_write_space(writer);
343 
344 		if (available > 0)
345 			memset(writer->end, 0, available);
346 
347 		dm_bufio_mark_buffer_dirty(writer->buffer);
348 	}
349 
350 	dm_bufio_release(writer->buffer);
351 	writer->buffer = NULL;
352 	writer->start = NULL;
353 	writer->end = NULL;
354 	writer->block_number++;
355 	return writer->error;
356 }
357 
358 void uds_free_buffered_writer(struct buffered_writer *writer)
359 {
360 	int result;
361 
362 	if (writer == NULL)
363 		return;
364 
365 	flush_previous_buffer(writer);
366 	result = -dm_bufio_write_dirty_buffers(writer->client);
367 	if (result != UDS_SUCCESS)
368 		vdo_log_warning_strerror(result, "%s: failed to sync storage", __func__);
369 
370 	dm_bufio_client_destroy(writer->client);
371 	uds_put_io_factory(writer->factory);
372 	vdo_free(writer);
373 }
374 
375 /*
376  * Append data to the buffer, writing as needed. If no data is provided, zeros are written instead.
377  * If a write error occurs, it is recorded and returned on every subsequent write attempt.
378  */
379 int uds_write_to_buffered_writer(struct buffered_writer *writer, const u8 *data,
380 				 size_t length)
381 {
382 	int result = writer->error;
383 	size_t chunk_size;
384 
385 	while ((length > 0) && (result == UDS_SUCCESS)) {
386 		if (writer->buffer == NULL) {
387 			result = prepare_next_buffer(writer);
388 			continue;
389 		}
390 
391 		chunk_size = min(length, get_remaining_write_space(writer));
392 		if (data == NULL) {
393 			memset(writer->end, 0, chunk_size);
394 		} else {
395 			memcpy(writer->end, data, chunk_size);
396 			data += chunk_size;
397 		}
398 
399 		length -= chunk_size;
400 		writer->end += chunk_size;
401 
402 		if (get_remaining_write_space(writer) == 0)
403 			result = uds_flush_buffered_writer(writer);
404 	}
405 
406 	return result;
407 }
408 
409 int uds_flush_buffered_writer(struct buffered_writer *writer)
410 {
411 	if (writer->error != UDS_SUCCESS)
412 		return writer->error;
413 
414 	return flush_previous_buffer(writer);
415 }
416