xref: /linux/drivers/md/dm-vdo/io-submitter.c (revision d358e5254674b70f34c847715ca509e46eb81e6f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "io-submitter.h"
7 
8 #include <linux/bio.h>
9 #include <linux/kernel.h>
10 #include <linux/mutex.h>
11 
12 #include "memory-alloc.h"
13 #include "permassert.h"
14 
15 #include "data-vio.h"
16 #include "logger.h"
17 #include "types.h"
18 #include "vdo.h"
19 #include "vio.h"
20 
21 /*
22  * Submission of bio operations to the underlying storage device will go through a separate work
23  * queue thread (or more than one) to prevent blocking in other threads if the storage device has a
24  * full queue. The plug structure allows that thread to do better batching of requests to make the
25  * I/O more efficient.
26  *
27  * When multiple worker threads are used, a thread is chosen for an I/O operation submission based
28  * on the PBN, so a given PBN will consistently wind up on the same thread. Flush operations are
29  * assigned round-robin.
30  *
31  * The map (protected by the mutex) collects pending I/O operations so that the worker thread can
32  * reorder them to try to encourage I/O request merging in the request queue underneath.
33  */
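
/*
 * Illustrative sketch only (not part of the driver): one plausible way a
 * physical block number could be mapped onto a bio queue index, assuming each
 * queue covers 'rotation_interval' consecutive blocks before rotating to the
 * next. The real zone selection is done by the data_vio code; this helper and
 * its parameters are hypothetical.
 */
static inline unsigned int example_pbn_to_bio_queue(physical_block_number_t pbn,
						    unsigned int rotation_interval,
						    unsigned int queue_count)
{
	return (pbn / rotation_interval) % queue_count;
}
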
34 struct bio_queue_data {
35 	struct vdo_work_queue *queue;
36 	struct blk_plug plug;
37 	struct int_map *map;
38 	struct mutex lock;
39 	unsigned int queue_number;
40 };
41 
42 struct io_submitter {
43 	unsigned int num_bio_queues_used;
44 	unsigned int bio_queue_rotation_interval;
45 	struct bio_queue_data bio_queue_data[];
46 };
47 
48 static void start_bio_queue(void *ptr)
49 {
50 	struct bio_queue_data *bio_queue_data = ptr;
51 
52 	blk_start_plug(&bio_queue_data->plug);
53 }
54 
55 static void finish_bio_queue(void *ptr)
56 {
57 	struct bio_queue_data *bio_queue_data = ptr;
58 
59 	blk_finish_plug(&bio_queue_data->plug);
60 }
61 
62 static const struct vdo_work_queue_type bio_queue_type = {
63 	.start = start_bio_queue,
64 	.finish = finish_bio_queue,
65 	.max_priority = BIO_Q_MAX_PRIORITY,
66 	.default_priority = BIO_Q_DATA_PRIORITY,
67 };
68 
69 /**
70  * count_all_bios() - Count a bio in the relevant statistics counters.
71  * @vio: The vio associated with the bio.
72  * @bio: The bio to count.
73  */
74 static void count_all_bios(struct vio *vio, struct bio *bio)
75 {
76 	struct atomic_statistics *stats = &vio->completion.vdo->stats;
77 
78 	if (is_data_vio(vio)) {
79 		vdo_count_bios(&stats->bios_out, bio);
80 		return;
81 	}
82 
83 	vdo_count_bios(&stats->bios_meta, bio);
84 	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
85 		vdo_count_bios(&stats->bios_journal, bio);
86 	else if (vio->type == VIO_TYPE_BLOCK_MAP)
87 		vdo_count_bios(&stats->bios_page_cache, bio);
88 }
89 
90 /**
91  * assert_in_bio_zone() - Assert that a vio is in the correct bio zone and not in interrupt
92  *                        context.
93  * @vio: The vio to check.
94  */
95 static void assert_in_bio_zone(struct vio *vio)
96 {
97 	VDO_ASSERT_LOG_ONLY(!in_interrupt(), "not in interrupt context");
98 	assert_vio_in_bio_zone(vio);
99 }
100 
101 /**
102  * send_bio_to_device() - Update stats and tracing info, then submit the supplied bio to the OS for
103  *                        processing.
104  * @vio: The vio associated with the bio.
105  * @bio: The bio to submit to the OS.
106  */
107 static void send_bio_to_device(struct vio *vio, struct bio *bio)
108 {
109 	struct vdo *vdo = vio->completion.vdo;
110 
111 	assert_in_bio_zone(vio);
112 	atomic64_inc(&vdo->stats.bios_submitted);
113 	count_all_bios(vio, bio);
114 	bio_set_dev(bio, vdo_get_backing_device(vdo));
115 	submit_bio_noacct(bio);
116 }
117 
118 /**
119  * vdo_submit_vio() - Submit a vio's bio to the underlying block device. May block if the device
120  *		      is busy. This callback should be used by vios which did not attempt to merge.
121  * @completion: The vio to submit.
122  */
123 void vdo_submit_vio(struct vdo_completion *completion)
124 {
125 	struct vio *vio = as_vio(completion);
126 
127 	send_bio_to_device(vio, vio->bio);
128 }
129 
130 /**
131  * get_bio_list() - Extract the list of bios to submit from a vio.
132  * @vio: The vio submitting I/O.
133  *
134  * The list will always contain at least one entry (the bio for the vio on which it is called), but
135  * other bios may have been merged with it as well.
136  *
137  * Return: The head of the bio list to submit.
138  */
139 static struct bio *get_bio_list(struct vio *vio)
140 {
141 	struct bio *bio;
142 	struct io_submitter *submitter = vio->completion.vdo->io_submitter;
143 	struct bio_queue_data *bio_queue_data = &(submitter->bio_queue_data[vio->bio_zone]);
144 
145 	assert_in_bio_zone(vio);
146 
147 	mutex_lock(&bio_queue_data->lock);
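	/* The vio is registered in the map under both its head and tail sectors (possibly the same key); drop both before handing off the list. */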
148 	vdo_int_map_remove(bio_queue_data->map,
149 			   vio->bios_merged.head->bi_iter.bi_sector);
150 	vdo_int_map_remove(bio_queue_data->map,
151 			   vio->bios_merged.tail->bi_iter.bi_sector);
152 	bio = vio->bios_merged.head;
153 	bio_list_init(&vio->bios_merged);
154 	mutex_unlock(&bio_queue_data->lock);
155 
156 	return bio;
157 }
158 
159 /**
160  * submit_data_vio() - Submit a data_vio's bio to the storage below along with
161  *		       any bios that have been merged with it.
162  * @completion: The vio to submit.
163  *
164  * Context: This call may block and so should only be called from a bio thread.
165  */
166 static void submit_data_vio(struct vdo_completion *completion)
167 {
168 	struct bio *bio, *next;
169 	struct vio *vio = as_vio(completion);
170 
171 	assert_in_bio_zone(vio);
172 	for (bio = get_bio_list(vio); bio != NULL; bio = next) {
173 		next = bio->bi_next;
174 		bio->bi_next = NULL;
175 		send_bio_to_device((struct vio *) bio->bi_private, bio);
176 	}
177 }
178 
179 /**
180  * get_mergeable_locked() - Attempt to find an already queued bio that the current bio can be
181  *                          merged with.
182  * @map: The bio map to use for merging.
183  * @vio: The vio we want to merge.
184  * @back_merge: Set to true for a back merge, false for a front merge.
185  *
186  * There are two types of merging possible, forward and backward, which are distinguished by a flag
187  * that uses kernel elevator terminology.
188  *
189  * Return: The vio to merge to, NULL if no merging is possible.
190  */
191 static struct vio *get_mergeable_locked(struct int_map *map, struct vio *vio,
192 					bool back_merge)
193 {
194 	struct bio *bio = vio->bio;
195 	sector_t merge_sector = bio->bi_iter.bi_sector;
196 	struct vio *vio_merge;
197 
198 	if (back_merge)
199 		merge_sector -= VDO_SECTORS_PER_BLOCK;
200 	else
201 		merge_sector += VDO_SECTORS_PER_BLOCK;
202 
203 	vio_merge = vdo_int_map_get(map, merge_sector);
204 
205 	if (vio_merge == NULL)
206 		return NULL;
207 
208 	if (vio->completion.priority != vio_merge->completion.priority)
209 		return NULL;
210 
211 	if (bio_data_dir(bio) != bio_data_dir(vio_merge->bio))
212 		return NULL;
213 
214 	if (bio_list_empty(&vio_merge->bios_merged))
215 		return NULL;
216 
217 	if (back_merge) {
218 		return (vio_merge->bios_merged.tail->bi_iter.bi_sector == merge_sector ?
219 			vio_merge : NULL);
220 	}
221 
222 	return (vio_merge->bios_merged.head->bi_iter.bi_sector == merge_sector ?
223 		vio_merge : NULL);
224 }
225 
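/*
 * Worked example (illustrative only): with vdo's 4K block size,
 * VDO_SECTORS_PER_BLOCK is 8. For a new bio starting at sector S, a back merge
 * looks for a queued vio whose last merged bio starts at sector S - 8 (ending
 * exactly where the new bio begins), while a front merge looks for one whose
 * first merged bio starts at sector S + 8. This hypothetical helper simply
 * restates the candidate-sector arithmetic used above.
 */
static inline sector_t example_merge_candidate_sector(sector_t start, bool back_merge)
{
	return back_merge ? start - VDO_SECTORS_PER_BLOCK : start + VDO_SECTORS_PER_BLOCK;
}
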
226 static int map_merged_vio(struct int_map *bio_map, struct vio *vio)
227 {
228 	int result;
229 	sector_t bio_sector;
230 
231 	bio_sector = vio->bios_merged.head->bi_iter.bi_sector;
232 	result = vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
233 	if (result != VDO_SUCCESS)
234 		return result;
235 
236 	bio_sector = vio->bios_merged.tail->bi_iter.bi_sector;
237 	return vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
238 }
239 
240 static int merge_to_prev_tail(struct int_map *bio_map, struct vio *vio,
241 			      struct vio *prev_vio)
242 {
243 	vdo_int_map_remove(bio_map, prev_vio->bios_merged.tail->bi_iter.bi_sector);
244 	bio_list_merge(&prev_vio->bios_merged, &vio->bios_merged);
245 	return map_merged_vio(bio_map, prev_vio);
246 }
247 
248 static int merge_to_next_head(struct int_map *bio_map, struct vio *vio,
249 			      struct vio *next_vio)
250 {
251 	/*
252 	 * Handle "next merge" and "gap fill" cases the same way so as to reorder bios in a way
253 	 * that's compatible with using funnel queues in work queues. This avoids removing an
254 	 * existing completion.
255 	 */
256 	vdo_int_map_remove(bio_map, next_vio->bios_merged.head->bi_iter.bi_sector);
257 	bio_list_merge_head(&next_vio->bios_merged, &vio->bios_merged);
258 	return map_merged_vio(bio_map, next_vio);
259 }
260 
261 /**
262  * try_bio_map_merge() - Attempt to merge a vio's bio with other pending I/Os.
263  * @vio: The vio to merge.
264  *
265  * Currently this is only used for data_vios, but is broken out for future use with metadata vios.
266  *
267  * Return: Whether or not the vio was merged.
268  */
269 static bool try_bio_map_merge(struct vio *vio)
270 {
271 	int result;
272 	bool merged = true;
273 	struct bio *bio = vio->bio;
274 	struct vio *prev_vio, *next_vio;
275 	struct vdo *vdo = vio->completion.vdo;
276 	struct bio_queue_data *bio_queue_data =
277 		&vdo->io_submitter->bio_queue_data[vio->bio_zone];
278 
279 	bio->bi_next = NULL;
280 	bio_list_init(&vio->bios_merged);
281 	bio_list_add(&vio->bios_merged, bio);
282 
283 	mutex_lock(&bio_queue_data->lock);
284 	prev_vio = get_mergeable_locked(bio_queue_data->map, vio, true);
285 	next_vio = get_mergeable_locked(bio_queue_data->map, vio, false);
286 	if (prev_vio == next_vio)
287 		next_vio = NULL;
288 
289 	if ((prev_vio == NULL) && (next_vio == NULL)) {
290 		/* No merge possible; just add this vio to the map. */
291 		merged = false;
292 		result = vdo_int_map_put(bio_queue_data->map,
293 					 bio->bi_iter.bi_sector,
294 					 vio, true, NULL);
295 	} else if (next_vio == NULL) {
296 		/* Only prev exists; merge to prev's tail. */
297 		result = merge_to_prev_tail(bio_queue_data->map, vio, prev_vio);
298 	} else {
299 		/* Next exists (and possibly prev, the gap fill case); merge to next's head. */
300 		result = merge_to_next_head(bio_queue_data->map, vio, next_vio);
301 	}
302 	mutex_unlock(&bio_queue_data->lock);
303 
304 	/* A failed int_map_put here only means a future merge opportunity may be missed, so just log it. */
305 	VDO_ASSERT_LOG_ONLY(result == VDO_SUCCESS, "bio map insertion succeeds");
306 	return merged;
307 }
308 
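/*
 * Worked example of the branches above (illustrative only, assuming 8-sector
 * blocks): suppose vio A (sector 0) and vio B (sector 16) are already in the
 * map when vio C (sector 8) arrives. The back-merge lookup at sector 0 finds A
 * and the front-merge lookup at sector 16 finds B, so this gap-fill case takes
 * the final branch: C's bio is prepended to B's merged list (which then spans
 * sectors 8-23) and B's map entries are refreshed, while A keeps its own entry
 * and is submitted separately.
 */
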
309 /**
310  * vdo_submit_data_vio() - Submit I/O for a data_vio.
311  * @data_vio: The data_vio for which to issue I/O.
312  *
313  * If possible, this I/O will be merged with other pending I/Os. Otherwise, the data_vio will be sent to
314  * the appropriate bio zone directly.
315  */
316 void vdo_submit_data_vio(struct data_vio *data_vio)
317 {
318 	if (try_bio_map_merge(&data_vio->vio))
319 		return;
320 
321 	launch_data_vio_bio_zone_callback(data_vio, submit_data_vio);
322 }
323 
324 /**
325  * __submit_metadata_vio() - Submit I/O for a metadata vio.
326  * @vio: The vio for which to issue I/O.
327  * @physical: The physical block number to read or write.
328  * @callback: The bio endio function which will be called after the I/O completes.
329  * @error_handler: The handler for submission or I/O errors; may be NULL.
330  * @operation: The type of I/O to perform.
331  * @data: The buffer to read or write; may be NULL.
332  * @size: The I/O amount in bytes.
333  *
334  * The vio is enqueued on a vdo bio queue so that bio submission (which may block) does not block
335  * other vdo threads.
336  *
337  * The error handler is guaranteed to run on the correct thread only as long as the thread
338  * calling this function is the same as the thread set in the endio callback, and no error can
339  * occur on the bio queue. Currently this is true for all callers, but additional care will be
340  * needed if this ever changes.
341  */
342 void __submit_metadata_vio(struct vio *vio, physical_block_number_t physical,
343 			   bio_end_io_t callback, vdo_action_fn error_handler,
344 			   blk_opf_t operation, char *data, int size)
345 {
346 	int result;
347 	struct vdo_completion *completion = &vio->completion;
348 	const struct admin_state_code *code = vdo_get_admin_state(completion->vdo);
349 
350 
351 	VDO_ASSERT_LOG_ONLY(!code->quiescent, "I/O not allowed in state %s", code->name);
352 
353 	vdo_reset_completion(completion);
354 	completion->error_handler = error_handler;
355 	result = vio_reset_bio_with_size(vio, data, size, callback, operation | REQ_META,
356 					 physical);
357 	if (result != VDO_SUCCESS) {
358 		continue_vio(vio, result);
359 		return;
360 	}
361 
362 	vdo_set_completion_callback(completion, vdo_submit_vio,
363 				    get_vio_bio_zone_thread_id(vio));
364 	vdo_launch_completion_with_priority(completion, get_metadata_priority(vio));
365 }
366 
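/*
 * Usage sketch (illustrative only): a hypothetical caller issuing a metadata
 * read of 'size' bytes at 'pbn'. All parameter and function names here are
 * placeholders; real vdo code normally reaches __submit_metadata_vio() through
 * its wrapper helpers rather than calling it directly.
 */
static inline void example_read_metadata(struct vio *vio, physical_block_number_t pbn,
					 char *buffer, int size, bio_end_io_t read_endio,
					 vdo_action_fn handle_read_error)
{
	__submit_metadata_vio(vio, pbn, read_endio, handle_read_error,
			      REQ_OP_READ, buffer, size);
}
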
367 /**
368  * vdo_make_io_submitter() - Create an io_submitter structure.
369  * @thread_count: Number of bio-submission threads to set up.
370  * @rotation_interval: Interval to use when rotating between bio-submission threads when enqueuing
371  *                     completions.
372  * @max_requests_active: The maximum number of concurrent requests, used to size the merge maps.
373  * @vdo: The vdo which will use this submitter.
374  * @io_submitter_ptr: A pointer to receive the new io_submitter.
375  *
376  * Return: VDO_SUCCESS or an error.
377  */
378 int vdo_make_io_submitter(unsigned int thread_count, unsigned int rotation_interval,
379 			  unsigned int max_requests_active, struct vdo *vdo,
380 			  struct io_submitter **io_submitter_ptr)
381 {
382 	unsigned int i;
383 	struct io_submitter *io_submitter;
384 	int result;
385 
386 	result = vdo_allocate_extended(struct io_submitter, thread_count,
387 				       struct bio_queue_data, "bio submission data",
388 				       &io_submitter);
389 	if (result != VDO_SUCCESS)
390 		return result;
391 
392 	io_submitter->bio_queue_rotation_interval = rotation_interval;
393 
394 	/* Set up each bio-submission work queue. */
395 	for (i = 0; i < thread_count; i++) {
396 		struct bio_queue_data *bio_queue_data = &io_submitter->bio_queue_data[i];
397 
398 		mutex_init(&bio_queue_data->lock);
399 		/*
400 		 * Each pending I/O adds two map entries: its first and its last sector number.
401 		 *
402 		 * If requests are assigned to threads round-robin, they should be distributed
403 		 * quite evenly. But if they're assigned based on PBN, things can sometimes be very
404 		 * uneven. So for now, we'll assume that all requests *may* wind up on one thread,
405 		 * and thus all in the same map.
406 		 */
407 		result = vdo_int_map_create(max_requests_active * 2,
408 					    &bio_queue_data->map);
409 		if (result != VDO_SUCCESS) {
410 			/*
411 			 * Clean up the partially initialized bio-queue entirely and indicate that
412 			 * initialization failed.
413 			 */
414 			vdo_log_error("bio map initialization failed %d", result);
415 			vdo_cleanup_io_submitter(io_submitter);
416 			vdo_free_io_submitter(io_submitter);
417 			return result;
418 		}
419 
420 		bio_queue_data->queue_number = i;
421 		result = vdo_make_thread(vdo, vdo->thread_config.bio_threads[i],
422 					 &bio_queue_type, 1, (void **) &bio_queue_data);
423 		if (result != VDO_SUCCESS) {
424 			/*
425 			 * Clean up the partially initialized bio-queue entirely and indicate that
426 			 * initialization failed.
427 			 */
428 			vdo_int_map_free(vdo_forget(bio_queue_data->map));
429 			vdo_log_error("bio queue initialization failed %d", result);
430 			vdo_cleanup_io_submitter(io_submitter);
431 			vdo_free_io_submitter(io_submitter);
432 			return result;
433 		}
434 
435 		bio_queue_data->queue = vdo->threads[vdo->thread_config.bio_threads[i]].queue;
436 		io_submitter->num_bio_queues_used++;
437 	}
438 
439 	*io_submitter_ptr = io_submitter;
440 
441 	return VDO_SUCCESS;
442 }
443 
444 /**
445  * vdo_cleanup_io_submitter() - Tear down the io_submitter's work queues before it is freed.
446  * @io_submitter: The I/O submitter data to tear down; may be NULL.
447  */
448 void vdo_cleanup_io_submitter(struct io_submitter *io_submitter)
449 {
450 	int i;
451 
452 	if (io_submitter == NULL)
453 		return;
454 
455 	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--)
456 		vdo_finish_work_queue(io_submitter->bio_queue_data[i].queue);
457 }
458 
459 /**
460  * vdo_free_io_submitter() - Free the io_submitter fields and structure as needed.
461  * @io_submitter: The I/O submitter data to destroy.
462  *
463  * This must be called after vdo_cleanup_io_submitter(). It is used to release resources late in
464  * the shutdown process to avoid or reduce the chance of race conditions.
465  */
466 void vdo_free_io_submitter(struct io_submitter *io_submitter)
467 {
468 	int i;
469 
470 	if (io_submitter == NULL)
471 		return;
472 
473 	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--) {
474 		io_submitter->num_bio_queues_used--;
475 		/* vdo_destroy() will free the work queue, so just give up our reference to it. */
476 		vdo_forget(io_submitter->bio_queue_data[i].queue);
477 		vdo_int_map_free(vdo_forget(io_submitter->bio_queue_data[i].map));
478 	}
479 	vdo_free(io_submitter);
480 }
481
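
/*
 * Lifecycle sketch (illustrative only): how the submitter entry points in this
 * file fit together for a hypothetical owner. The thread count, rotation
 * interval, and request limit are arbitrary example values, and the
 * surrounding vdo setup and error handling are omitted.
 */
static inline int example_submitter_lifecycle(struct vdo *vdo)
{
	struct io_submitter *submitter;
	int result;

	result = vdo_make_io_submitter(4, 64, 2048, vdo, &submitter);
	if (result != VDO_SUCCESS)
		return result;

	/* ... I/O is submitted through the bio threads while the vdo runs ... */

	/* Finish the bio work queues first, then release the memory. */
	vdo_cleanup_io_submitter(submitter);
	vdo_free_io_submitter(submitter);
	return VDO_SUCCESS;
}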