xref: /linux/drivers/md/dm-pcache/cache_req.c (revision e3966940559d52aa1800a008dcfeec218dd31f88)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 
3 #include "cache.h"
4 #include "backing_dev.h"
5 #include "cache_dev.h"
6 #include "dm_pcache.h"
7 
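/*
 * cache_data_head_init - point the data head at a freshly reserved cache segment.
 *
 * Grab the next available cache segment, take a reference on it, and reset the
 * data head position to the start of that segment. Returns -EBUSY when no cache
 * segment is currently available.
 */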
8 static int cache_data_head_init(struct pcache_cache *cache)
9 {
10 	struct pcache_cache_segment *next_seg;
11 	struct pcache_cache_data_head *data_head;
12 
13 	data_head = get_data_head(cache);
14 	next_seg = get_cache_segment(cache);
15 	if (!next_seg)
16 		return -EBUSY;
17 
18 	cache_seg_get(next_seg);
19 	data_head->head_pos.cache_seg = next_seg;
20 	data_head->head_pos.seg_off = 0;
21 
22 	return 0;
23 }
24 
25 /**
26  * cache_data_alloc - Allocate data for a cache key.
27  * @cache: Pointer to the cache structure.
28  * @key: Pointer to the cache key to allocate data for.
29  *
30  * This function tries to allocate space from the cache segment specified by the
31  * data head. If the remaining space in the segment is insufficient to allocate
32  * the requested length for the cache key, it will allocate whatever is available
33  * and adjust the key's length accordingly. This function does not allocate
34  * space that crosses segment boundaries.
35  */
36 static int cache_data_alloc(struct pcache_cache *cache, struct pcache_cache_key *key)
37 {
38 	struct pcache_cache_data_head *data_head;
39 	struct pcache_cache_pos *head_pos;
40 	struct pcache_cache_segment *cache_seg;
41 	u32 seg_remain;
42 	u32 allocated = 0, to_alloc;
43 	int ret = 0;
44 
45 	preempt_disable();
46 	data_head = get_data_head(cache);
47 again:
48 	to_alloc = key->len - allocated;
49 	if (!data_head->head_pos.cache_seg) {
50 		seg_remain = 0;
51 	} else {
52 		cache_pos_copy(&key->cache_pos, &data_head->head_pos);
53 		key->seg_gen = key->cache_pos.cache_seg->gen;
54 
55 		head_pos = &data_head->head_pos;
56 		cache_seg = head_pos->cache_seg;
57 		seg_remain = cache_seg_remain(head_pos);
58 	}
59 
60 	if (seg_remain > to_alloc) {
61 		/* If the remaining space in the segment exceeds the requested length, allocate from it. */
62 		cache_pos_advance(head_pos, to_alloc);
63 		allocated += to_alloc;
64 		cache_seg_get(cache_seg);
65 	} else if (seg_remain) {
66 		/* Otherwise allocate whatever remains, adjust the cache key length, and release the data head's segment. */
67 		cache_pos_advance(head_pos, seg_remain);
68 		key->len = seg_remain;
69 
70 		/* Get for key: obtain a reference to the cache segment for the key. */
71 		cache_seg_get(cache_seg);
72 		/* Put for head_pos->cache_seg: release the reference for the current head's segment. */
73 		cache_seg_put(head_pos->cache_seg);
74 		head_pos->cache_seg = NULL;
75 	} else {
76 		/* Initialize a new data head if no segment is available. */
77 		ret = cache_data_head_init(cache);
78 		if (ret)
79 			goto out;
80 
81 		goto again;
82 	}
83 
84 out:
85 	preempt_enable();
86 
87 	return ret;
88 }
89 
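/*
 * Copy key->len bytes from the upper request's bio, starting at bio_off, into
 * the cache segment space referenced by key->cache_pos.
 */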
90 static int cache_copy_from_req_bio(struct pcache_cache *cache, struct pcache_cache_key *key,
91 				struct pcache_request *pcache_req, u32 bio_off)
92 {
93 	struct pcache_cache_pos *pos = &key->cache_pos;
94 	struct pcache_segment *segment;
95 
96 	segment = &pos->cache_seg->segment;
97 
98 	return segment_copy_from_bio(segment, pos->seg_off, key->len, pcache_req->bio, bio_off);
99 }
100 
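/*
 * Copy cached data at @pos back into the upper request's bio. The copy runs
 * under the segment's gen_lock: if the segment generation has advanced past
 * @key_gen, the cached data is no longer valid for this key and -EINVAL is
 * returned so that callers can delete the stale key and research the subtree.
 */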
101 static int cache_copy_to_req_bio(struct pcache_cache *cache, struct pcache_request *pcache_req,
102 			    u32 bio_off, u32 len, struct pcache_cache_pos *pos, u64 key_gen)
103 {
104 	struct pcache_cache_segment *cache_seg = pos->cache_seg;
105 	struct pcache_segment *segment = &cache_seg->segment;
106 	int ret;
107 
108 	spin_lock(&cache_seg->gen_lock);
109 	if (key_gen < cache_seg->gen) {
110 		spin_unlock(&cache_seg->gen_lock);
111 		return -EINVAL;
112 	}
113 
114 	ret = segment_copy_to_bio(segment, pos->seg_off, len, pcache_req->bio, bio_off);
115 	spin_unlock(&cache_seg->gen_lock);
116 
117 	return ret;
118 }
119 
120 /**
121  * miss_read_end_req - Handle the end of a miss read request.
122  * @backing_req: Pointer to the completed backing device request.
123  * @read_ret: Return value of the backing read.
124  *
125  * This function is called when a backing request to read data from
126  * the backing_dev is completed. If the key associated with the request
127  * is empty (a placeholder), it allocates cache space for the key,
128  * copies the data read from the bio into the cache, and updates
129  * the key's status. If the key was overwritten by a write request
130  * during this process, it has already been removed from the cache
131  * tree and no further action is taken here.
132  */
133 static void miss_read_end_req(struct pcache_backing_dev_req *backing_req, int read_ret)
134 {
135 	void *priv_data = backing_req->priv_data;
136 	struct pcache_request *pcache_req = backing_req->req.upper_req;
137 	struct pcache_cache *cache = backing_req->backing_dev->cache;
138 	int ret;
139 
140 	if (priv_data) {
141 		struct pcache_cache_key *key;
142 		struct pcache_cache_subtree *cache_subtree;
143 
144 		key = (struct pcache_cache_key *)priv_data;
145 		cache_subtree = key->cache_subtree;
146 
147 		/* If this key was deleted from the cache_subtree by a write, key->flags would have
148 		 * been cleared, so if cache_key_empty() returns true the key is still in the cache_subtree.
149 		 */
150 		spin_lock(&cache_subtree->tree_lock);
151 		if (cache_key_empty(key)) {
152 			/* Check if the backing request was successful. */
153 			if (read_ret) {
154 				cache_key_delete(key);
155 				goto unlock;
156 			}
157 
158 			/* Allocate cache space for the key and copy data from the backing_dev. */
159 			ret = cache_data_alloc(cache, key);
160 			if (ret) {
161 				cache_key_delete(key);
162 				goto unlock;
163 			}
164 
165 			ret = cache_copy_from_req_bio(cache, key, pcache_req, backing_req->req.bio_off);
166 			if (ret) {
167 				cache_seg_put(key->cache_pos.cache_seg);
168 				cache_key_delete(key);
169 				goto unlock;
170 			}
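			/* Data is cached now: clear the EMPTY placeholder flag and mark the key clean. */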
171 			key->flags &= ~PCACHE_CACHE_KEY_FLAGS_EMPTY;
172 			key->flags |= PCACHE_CACHE_KEY_FLAGS_CLEAN;
173 
174 			/* Append the key to the cache. */
175 			ret = cache_key_append(cache, key, false);
176 			if (ret) {
177 				cache_seg_put(key->cache_pos.cache_seg);
178 				cache_key_delete(key);
179 				goto unlock;
180 			}
181 		}
182 unlock:
183 		spin_unlock(&cache_subtree->tree_lock);
184 		cache_key_put(key);
185 	}
186 }
187 
188 /**
189  * submit_cache_miss_req - Submit a backing request when cache data is missing
190  * @cache: The cache context that manages cache operations
191  * @backing_req: The backing device request created for the missing data range
192  *
193  * This function is used to handle cases where a cache read request cannot locate
194  * the required data in the cache. When such a miss occurs during `cache_subtree_walk`,
195  * it triggers a backing read request to fetch data from the backing storage.
196  *
197  * If `backing_req->priv_data` is set, it points to a `pcache_cache_key` acting as a
198  * placeholder for the missing range. The function calls `cache_key_insert` to add the
199  * key to the request key tree before the backing request is submitted; the key is
200  * filled with the data read from the backing device in miss_read_end_req().
201  */
202 static void submit_cache_miss_req(struct pcache_cache *cache, struct pcache_backing_dev_req *backing_req)
203 {
204 	if (backing_req->priv_data) {
205 		struct pcache_cache_key *key;
206 
207 		/* Attempt to insert the key into the cache if priv_data is set */
208 		key = (struct pcache_cache_key *)backing_req->priv_data;
209 		cache_key_insert(&cache->req_key_tree, key, true);
210 	}
211 	backing_dev_req_submit(backing_req, false);
212 }
213 
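/*
 * Release a miss request that is no longer needed: drop both references held on
 * the placeholder key (one for ->priv_data, one from cache_key_alloc()) and end
 * the backing request.
 */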
214 static void cache_miss_req_free(struct pcache_backing_dev_req *backing_req)
215 {
216 	struct pcache_cache_key *key;
217 
218 	if (backing_req->priv_data) {
219 		key = backing_req->priv_data;
220 		backing_req->priv_data = NULL;
221 		cache_key_put(key); /* for ->priv_data */
222 		cache_key_put(key); /* for init ref in alloc */
223 	}
224 
225 	backing_dev_req_end(backing_req);
226 }
227 
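/*
 * Allocate a backing request for a cache miss together with a placeholder cache
 * key. The key is stored in ->priv_data with an extra reference taken;
 * cache_miss_req_init() decides later whether the key is actually used.
 * Returns NULL if either allocation fails.
 */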
228 static struct pcache_backing_dev_req *cache_miss_req_alloc(struct pcache_cache *cache,
229 							   struct pcache_request *parent,
230 							   gfp_t gfp_mask)
231 {
232 	struct pcache_backing_dev *backing_dev = cache->backing_dev;
233 	struct pcache_backing_dev_req *backing_req;
234 	struct pcache_cache_key *key = NULL;
235 	struct pcache_backing_dev_req_opts req_opts = { 0 };
236 
237 	req_opts.type = BACKING_DEV_REQ_TYPE_REQ;
238 	req_opts.gfp_mask = gfp_mask;
239 	req_opts.req.upper_req = parent;
240 
241 	backing_req = backing_dev_req_alloc(backing_dev, &req_opts);
242 	if (!backing_req)
243 		return NULL;
244 
245 	key = cache_key_alloc(&cache->req_key_tree, gfp_mask);
246 	if (!key)
247 		goto free_backing_req;
248 
249 	cache_key_get(key);
250 	backing_req->priv_data = key;
251 
252 	return backing_req;
253 
254 free_backing_req:
255 	cache_miss_req_free(backing_req);
256 	return NULL;
257 }
258 
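/*
 * Initialize @backing_req to read @len bytes at offset @off of @parent from the
 * backing device, completing through miss_read_end_req(). When @insert_key is
 * true, the pre-allocated key becomes an EMPTY placeholder covering the same
 * logical range; otherwise the key is not needed and both of its references are
 * dropped here.
 */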
259 static void cache_miss_req_init(struct pcache_cache *cache,
260 				struct pcache_backing_dev_req *backing_req,
261 				struct pcache_request *parent,
262 				u32 off, u32 len, bool insert_key)
263 {
264 	struct pcache_cache_key *key;
265 	struct pcache_backing_dev_req_opts req_opts = { 0 };
266 
267 	req_opts.type = BACKING_DEV_REQ_TYPE_REQ;
268 	req_opts.req.upper_req = parent;
269 	req_opts.req.req_off = off;
270 	req_opts.req.len = len;
271 	req_opts.end_fn = miss_read_end_req;
272 
273 	backing_dev_req_init(backing_req, &req_opts);
274 
275 	if (insert_key) {
276 		key = backing_req->priv_data;
277 		key->off = parent->off + off;
278 		key->len = len;
279 		key->flags |= PCACHE_CACHE_KEY_FLAGS_EMPTY;
280 	} else {
281 		key = backing_req->priv_data;
282 		backing_req->priv_data = NULL;
283 		cache_key_put(key);
284 		cache_key_put(key);
285 	}
286 }
287 
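/*
 * Get a backing request for a cache miss: prefer the request pre-allocated with
 * GFP_NOIO by cache_read() (stashed in ctx->pre_alloc_req), otherwise try a
 * non-blocking GFP_NOWAIT allocation. A NULL return makes the walk callbacks
 * return SUBTREE_WALK_RET_NEED_REQ so that cache_read() can pre-allocate a
 * request and retry the remaining range.
 */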
288 static struct pcache_backing_dev_req *get_pre_alloc_req(struct pcache_cache_subtree_walk_ctx *ctx)
289 {
290 	struct pcache_cache *cache = ctx->cache_tree->cache;
291 	struct pcache_request *pcache_req = ctx->pcache_req;
292 	struct pcache_backing_dev_req *backing_req;
293 
294 	if (ctx->pre_alloc_req) {
295 		backing_req = ctx->pre_alloc_req;
296 		ctx->pre_alloc_req = NULL;
297 
298 		return backing_req;
299 	}
300 
301 	return cache_miss_req_alloc(cache, pcache_req, GFP_NOWAIT);
302 }
303 
304 /*
305  * In the process of walking the cache tree to locate cached data, this
306  * function handles the situation where the requested data range lies
307  * entirely before an existing cache node (`key_tmp`). This outcome
308  * signifies that the target data is absent from the cache (cache miss).
309  *
310  * To fulfill this portion of the read request, the function creates a
311  * backing request (`backing_req`) for the missing data range represented
312  * by `key`. It then appends this request to the submission list in the
313  * `ctx`, which will later be processed to retrieve the data from backing
314  * storage. After setting up the backing request, `req_done` in `ctx` is
315  * updated to reflect the length of the handled range, and the range
316  * in `key` is adjusted by trimming off the portion that is now handled.
317  *
318  * The scenario handled here:
319  *
320  *	  |--------|			  key_tmp (existing cached range)
321  * |====|					   key (requested range, preceding key_tmp)
322  *
323  * Since `key` is before `key_tmp`, it signifies that the requested data
324  * range is missing in the cache (cache miss) and needs retrieval from
325  * backing storage.
326  */
327 static int read_before(struct pcache_cache_key *key, struct pcache_cache_key *key_tmp,
328 		struct pcache_cache_subtree_walk_ctx *ctx)
329 {
330 	struct pcache_backing_dev_req *backing_req;
331 	struct pcache_cache *cache = ctx->cache_tree->cache;
332 
333 	/*
334 	 * In this scenario, `key` represents a range that precedes `key_tmp`,
335 	 * meaning the requested data range is missing from the cache tree
336 	 * and must be retrieved from the backing_dev.
337 	 */
338 	backing_req = get_pre_alloc_req(ctx);
339 	if (!backing_req)
340 		return SUBTREE_WALK_RET_NEED_REQ;
341 
342 	cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, key->len, true);
343 
344 	list_add(&backing_req->node, ctx->submit_req_list);
345 	ctx->req_done += key->len;
346 	cache_key_cutfront(key, key->len);
347 
348 	return SUBTREE_WALK_RET_OK;
349 }
350 
351 /*
352  * During cache_subtree_walk, this function manages a scenario where part of the
353  * requested data range overlaps with an existing cache node (`key_tmp`).
354  *
355  *	 |----------------|  key_tmp (existing cached range)
356  * |===========|		   key (requested range; its tail overlaps the head of key_tmp)
357  */
358 static int read_overlap_tail(struct pcache_cache_key *key, struct pcache_cache_key *key_tmp,
359 		struct pcache_cache_subtree_walk_ctx *ctx)
360 {
361 	struct pcache_cache *cache = ctx->cache_tree->cache;
362 	struct pcache_backing_dev_req *backing_req;
363 	u32 io_len;
364 	int ret;
365 
366 	/*
367 	 * Calculate the length of the non-overlapping portion of `key`
368 	 * before `key_tmp`, representing the data missing in the cache.
369 	 */
370 	io_len = cache_key_lstart(key_tmp) - cache_key_lstart(key);
371 	if (io_len) {
372 		backing_req = get_pre_alloc_req(ctx);
373 		if (!backing_req)
374 			return SUBTREE_WALK_RET_NEED_REQ;
375 
376 		cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, io_len, true);
377 
378 		list_add(&backing_req->node, ctx->submit_req_list);
379 		ctx->req_done += io_len;
380 		cache_key_cutfront(key, io_len);
381 	}
382 
383 	/*
384 	 * Handle the overlapping portion by calculating the length of
385 	 * the remaining data in `key` that coincides with `key_tmp`.
386 	 */
387 	io_len = cache_key_lend(key) - cache_key_lstart(key_tmp);
388 	if (cache_key_empty(key_tmp)) {
389 		backing_req = get_pre_alloc_req(ctx);
390 		if (!backing_req)
391 			return SUBTREE_WALK_RET_NEED_REQ;
392 
393 		cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, io_len, false);
394 		submit_cache_miss_req(cache, backing_req);
395 	} else {
396 		ret = cache_copy_to_req_bio(ctx->cache_tree->cache, ctx->pcache_req, ctx->req_done,
397 					io_len, &key_tmp->cache_pos, key_tmp->seg_gen);
398 		if (ret) {
399 			if (ret == -EINVAL) {
400 				cache_key_delete(key_tmp);
401 				return SUBTREE_WALK_RET_RESEARCH;
402 			}
403 
404 			ctx->ret = ret;
405 			return SUBTREE_WALK_RET_ERR;
406 		}
407 	}
408 
409 	ctx->req_done += io_len;
410 	cache_key_cutfront(key, io_len);
411 
412 	return SUBTREE_WALK_RET_OK;
413 }
414 
415 /*
416  *    |----|          key_tmp (existing cached range)
417  * |==========|       key (requested range, fully containing key_tmp)
418  */
419 static int read_overlap_contain(struct pcache_cache_key *key, struct pcache_cache_key *key_tmp,
420 		struct pcache_cache_subtree_walk_ctx *ctx)
421 {
422 	struct pcache_cache *cache = ctx->cache_tree->cache;
423 	struct pcache_backing_dev_req *backing_req;
424 	u32 io_len;
425 	int ret;
426 
427 	/*
428 	 * Calculate the non-overlapping part of `key` before `key_tmp`
429 	 * to identify the missing data length.
430 	 */
431 	io_len = cache_key_lstart(key_tmp) - cache_key_lstart(key);
432 	if (io_len) {
433 		backing_req = get_pre_alloc_req(ctx);
434 		if (!backing_req)
435 			return SUBTREE_WALK_RET_NEED_REQ;
436 
437 		cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, io_len, true);
438 
439 		list_add(&backing_req->node, ctx->submit_req_list);
440 
441 		ctx->req_done += io_len;
442 		cache_key_cutfront(key, io_len);
443 	}
444 
445 	/*
446 	 * Handle the overlapping portion between `key` and `key_tmp`.
447 	 */
448 	io_len = key_tmp->len;
449 	if (cache_key_empty(key_tmp)) {
450 		backing_req = get_pre_alloc_req(ctx);
451 		if (!backing_req)
452 			return SUBTREE_WALK_RET_NEED_REQ;
453 
454 		cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, io_len, false);
455 		submit_cache_miss_req(cache, backing_req);
456 	} else {
457 		ret = cache_copy_to_req_bio(ctx->cache_tree->cache, ctx->pcache_req, ctx->req_done,
458 					io_len, &key_tmp->cache_pos, key_tmp->seg_gen);
459 		if (ret) {
460 			if (ret == -EINVAL) {
461 				cache_key_delete(key_tmp);
462 				return SUBTREE_WALK_RET_RESEARCH;
463 			}
464 
465 			ctx->ret = ret;
466 			return SUBTREE_WALK_RET_ERR;
467 		}
468 	}
469 
470 	ctx->req_done += io_len;
471 	cache_key_cutfront(key, io_len);
472 
473 	return SUBTREE_WALK_RET_OK;
474 }
475 
476 /*
477  *	 |-----------|		key_tmp (existing cached range)
478  *	   |====|			key (requested range, fully within key_tmp)
479  *
480  * If `key_tmp` contains valid cached data, this function copies the relevant
481  * portion to the request's bio. Otherwise, it sends a backing request to
482  * fetch the required data range.
483  */
484 static int read_overlap_contained(struct pcache_cache_key *key, struct pcache_cache_key *key_tmp,
485 		struct pcache_cache_subtree_walk_ctx *ctx)
486 {
487 	struct pcache_cache *cache = ctx->cache_tree->cache;
488 	struct pcache_backing_dev_req *backing_req;
489 	struct pcache_cache_pos pos;
490 	int ret;
491 
492 	/*
493 	 * Check if `key_tmp` is empty, indicating a miss. If so, initiate
494 	 * a backing request to fetch the required data for `key`.
495 	 */
496 	if (cache_key_empty(key_tmp)) {
497 		backing_req = get_pre_alloc_req(ctx);
498 		if (!backing_req)
499 			return SUBTREE_WALK_RET_NEED_REQ;
500 
501 		cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, key->len, false);
502 		submit_cache_miss_req(cache, backing_req);
503 	} else {
504 		cache_pos_copy(&pos, &key_tmp->cache_pos);
505 		cache_pos_advance(&pos, cache_key_lstart(key) - cache_key_lstart(key_tmp));
506 
507 		ret = cache_copy_to_req_bio(ctx->cache_tree->cache, ctx->pcache_req, ctx->req_done,
508 					key->len, &pos, key_tmp->seg_gen);
509 		if (ret) {
510 			if (ret == -EINVAL) {
511 				cache_key_delete(key_tmp);
512 				return SUBTREE_WALK_RET_RESEARCH;
513 			}
514 
515 			ctx->ret = ret;
516 			return SUBTREE_WALK_RET_ERR;
517 		}
518 	}
519 
520 	ctx->req_done += key->len;
521 	cache_key_cutfront(key, key->len);
522 
523 	return SUBTREE_WALK_RET_OK;
524 }
525 
526 /*
527  *	 |--------|		  key_tmp (existing cached range)
528  *	   |==========|	  key (requested range; its head overlaps the tail of key_tmp)
529  */
530 static int read_overlap_head(struct pcache_cache_key *key, struct pcache_cache_key *key_tmp,
531 		struct pcache_cache_subtree_walk_ctx *ctx)
532 {
533 	struct pcache_cache *cache = ctx->cache_tree->cache;
534 	struct pcache_backing_dev_req *backing_req;
535 	struct pcache_cache_pos pos;
536 	u32 io_len;
537 	int ret;
538 
539 	io_len = cache_key_lend(key_tmp) - cache_key_lstart(key);
540 
541 	if (cache_key_empty(key_tmp)) {
542 		backing_req = get_pre_alloc_req(ctx);
543 		if (!backing_req)
544 			return SUBTREE_WALK_RET_NEED_REQ;
545 
546 		cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, io_len, false);
547 		submit_cache_miss_req(cache, backing_req);
548 	} else {
549 		cache_pos_copy(&pos, &key_tmp->cache_pos);
550 		cache_pos_advance(&pos, cache_key_lstart(key) - cache_key_lstart(key_tmp));
551 
552 		ret = cache_copy_to_req_bio(ctx->cache_tree->cache, ctx->pcache_req, ctx->req_done,
553 					io_len, &pos, key_tmp->seg_gen);
554 		if (ret) {
555 			if (ret == -EINVAL) {
556 				cache_key_delete(key_tmp);
557 				return SUBTREE_WALK_RET_RESEARCH;
558 			}
559 
560 			ctx->ret = ret;
561 			return SUBTREE_WALK_RET_ERR;
562 		}
563 	}
564 
565 	ctx->req_done += io_len;
566 	cache_key_cutfront(key, io_len);
567 
568 	return SUBTREE_WALK_RET_OK;
569 }
570 
571 /**
572  * read_walk_finally - Finalizes the cache read tree walk by submitting any
573  *					 remaining backing requests
574  * @ctx:	Context structure holding information about the cache,
575  *		read request, and submission list
576  * @ret:	Walk status so far (a SUBTREE_WALK_RET_* value).
577  *
578  * This function is called at the end of the `cache_subtree_walk` during a
579  * cache read operation. It first iterates through the submission list of
580  * backing requests created during the walk, removing each request from the
581  * list and submitting it. Then, if the walk completed successfully and part
582  * of the range described by `key` was not found in the cache tree, it sends
583  * one more backing request to retrieve that remaining data.
584  *
585  * The scenarios managed here include:
586  * - Iterating through `ctx->submit_req_list` to submit each backing request
587  *   enqueued during the walk.
588  * - Sending a backing request for the remaining length of `key` if it was
589  *   not fulfilled by existing cache entries.
590  *
591  * This ensures all necessary backing requests for cache misses are submitted
592  * to the backing storage to retrieve any data that could not be found in
593  * the cache.
594  */
595 static int read_walk_finally(struct pcache_cache_subtree_walk_ctx *ctx, int ret)
596 {
597 	struct pcache_cache *cache = ctx->cache_tree->cache;
598 	struct pcache_backing_dev_req *backing_req, *next_req;
599 	struct pcache_cache_key *key = ctx->key;
600 
601 	list_for_each_entry_safe(backing_req, next_req, ctx->submit_req_list, node) {
602 		list_del_init(&backing_req->node);
603 		submit_cache_miss_req(ctx->cache_tree->cache, backing_req);
604 	}
605 
606 	if (ret != SUBTREE_WALK_RET_OK)
607 		return ret;
608 
609 	if (key->len) {
610 		backing_req = get_pre_alloc_req(ctx);
611 		if (!backing_req)
612 			return SUBTREE_WALK_RET_NEED_REQ;
613 
614 		cache_miss_req_init(cache, backing_req, ctx->pcache_req, ctx->req_done, key->len, true);
615 		submit_cache_miss_req(cache, backing_req);
616 		ctx->req_done += key->len;
617 	}
618 
619 	return SUBTREE_WALK_RET_OK;
620 }
621 
622 /*
623  * This function is used within `cache_subtree_walk` to determine whether the
624  * read operation has covered the requested data length. It compares the
625  * amount of data processed (`ctx->req_done`) with the total data length
626  * specified in the original request (`ctx->pcache_req->data_len`).
627  *
628  * If `req_done` meets or exceeds the required data length, the function
629  * returns `true`, indicating the walk is complete. Otherwise, it returns `false`,
630  * signaling that additional data processing is needed to fulfill the request.
631  */
632 static bool read_walk_done(struct pcache_cache_subtree_walk_ctx *ctx)
633 {
634 	return (ctx->req_done >= ctx->pcache_req->data_len);
635 }
636 
637 /**
638  * cache_read - Process a read request by traversing the cache tree
639  * @cache:	 Cache structure holding cache trees and related configurations
640  * @pcache_req:   Request structure with information about the data to read
641  *
642  * This function attempts to fulfill a read request by traversing the cache tree(s)
643  * to locate cached data for the requested range. If parts of the data are missing
644  * in the cache, backing requests are generated to retrieve the required segments.
645  *
646  * The function operates by initializing a key for the requested data range and
647  * preparing a context (`walk_ctx`) to manage the cache tree traversal. The context
648  * includes pointers to functions (e.g., `read_before`, `read_overlap_tail`) that handle
649  * specific conditions encountered during the traversal. The `walk_finally` and `walk_done`
650  * functions manage the end stages of the traversal, while the `delete_key_list` and
651  * `submit_req_list` lists track any keys to be deleted or requests to be submitted.
652  *
653  * The function first clamps the requested range so that it does not cross a
654  * cache subtree boundary (each subtree covers a fixed-size, aligned range of
655  * the logical space). It then locks the cache subtree and searches it for
656  * matching keys. If outdated keys are found, they are deleted and the search
657  * is restarted to ensure accurate data retrieval.
658  *
659  * If the requested range spans multiple cache subtrees, the function moves on
660  * to the next subtree until the entire requested data length has been handled.
661  */
662 static int cache_read(struct pcache_cache *cache, struct pcache_request *pcache_req)
663 {
664 	struct pcache_cache_key key_data = { .off = pcache_req->off, .len = pcache_req->data_len };
665 	struct pcache_cache_subtree *cache_subtree;
666 	struct pcache_cache_key *key_tmp = NULL, *key_next;
667 	struct rb_node *prev_node = NULL;
668 	struct pcache_cache_key *key = &key_data;
669 	struct pcache_cache_subtree_walk_ctx walk_ctx = { 0 };
670 	struct pcache_backing_dev_req *backing_req, *next_req;
671 	LIST_HEAD(delete_key_list);
672 	LIST_HEAD(submit_req_list);
673 	int ret;
674 
675 	walk_ctx.cache_tree = &cache->req_key_tree;
676 	walk_ctx.req_done = 0;
677 	walk_ctx.pcache_req = pcache_req;
678 	walk_ctx.before = read_before;
679 	walk_ctx.overlap_tail = read_overlap_tail;
680 	walk_ctx.overlap_head = read_overlap_head;
681 	walk_ctx.overlap_contain = read_overlap_contain;
682 	walk_ctx.overlap_contained = read_overlap_contained;
683 	walk_ctx.walk_finally = read_walk_finally;
684 	walk_ctx.walk_done = read_walk_done;
685 	walk_ctx.delete_key_list = &delete_key_list;
686 	walk_ctx.submit_req_list = &submit_req_list;
687 
688 next:
689 	key->off = pcache_req->off + walk_ctx.req_done;
690 	key->len = pcache_req->data_len - walk_ctx.req_done;
691 	if (key->len > PCACHE_CACHE_SUBTREE_SIZE - (key->off & PCACHE_CACHE_SUBTREE_SIZE_MASK))
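	/* Clamp the key so that it never crosses a cache subtree boundary. */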
692 		key->len = PCACHE_CACHE_SUBTREE_SIZE - (key->off & PCACHE_CACHE_SUBTREE_SIZE_MASK);
693 
694 	cache_subtree = get_subtree(&cache->req_key_tree, key->off);
695 	spin_lock(&cache_subtree->tree_lock);
696 search:
697 	prev_node = cache_subtree_search(cache_subtree, key, NULL, NULL, &delete_key_list);
698 	if (!list_empty(&delete_key_list)) {
699 		list_for_each_entry_safe(key_tmp, key_next, &delete_key_list, list_node) {
700 			list_del_init(&key_tmp->list_node);
701 			cache_key_delete(key_tmp);
702 		}
703 		goto search;
704 	}
705 
706 	walk_ctx.start_node = prev_node;
707 	walk_ctx.key = key;
708 
709 	ret = cache_subtree_walk(&walk_ctx);
710 	if (ret == SUBTREE_WALK_RET_RESEARCH)
711 		goto search;
712 	spin_unlock(&cache_subtree->tree_lock);
713 
714 	if (ret == SUBTREE_WALK_RET_ERR) {
715 		ret = walk_ctx.ret;
716 		goto out;
717 	}
718 
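	/*
	 * The walk ran out of backing requests (the GFP_NOWAIT allocation in
	 * get_pre_alloc_req() failed): pre-allocate one with GFP_NOIO and retry
	 * the remaining range.
	 */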
719 	if (ret == SUBTREE_WALK_RET_NEED_REQ) {
720 		walk_ctx.pre_alloc_req = cache_miss_req_alloc(cache, pcache_req, GFP_NOIO);
721 		pcache_dev_debug(CACHE_TO_PCACHE(cache), "allocate pre_alloc_req with GFP_NOIO");
722 	}
723 
724 	if (walk_ctx.req_done < pcache_req->data_len)
725 		goto next;
726 	ret = 0;
727 out:
728 	if (walk_ctx.pre_alloc_req)
729 		cache_miss_req_free(walk_ctx.pre_alloc_req);
730 
731 	list_for_each_entry_safe(backing_req, next_req, &submit_req_list, node) {
732 		list_del_init(&backing_req->node);
733 		backing_dev_req_end(backing_req);
734 	}
735 
736 	return ret;
737 }
738 
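/*
 * Handle a write request: split it into cache keys that never cross a cache
 * subtree boundary, allocate cache space for each key, copy the data from the
 * bio into the cache, then insert the key into the request key tree and append
 * it to the cache, passing along the bio's REQ_FUA flag.
 */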
739 static int cache_write(struct pcache_cache *cache, struct pcache_request *pcache_req)
740 {
741 	struct pcache_cache_subtree *cache_subtree;
742 	struct pcache_cache_key *key;
743 	u64 offset = pcache_req->off;
744 	u32 length = pcache_req->data_len;
745 	u32 io_done = 0;
746 	int ret;
747 
748 	while (true) {
749 		if (io_done >= length)
750 			break;
751 
752 		key = cache_key_alloc(&cache->req_key_tree, GFP_NOIO);
753 		key->off = offset + io_done;
754 		key->len = length - io_done;
755 		if (key->len > PCACHE_CACHE_SUBTREE_SIZE - (key->off & PCACHE_CACHE_SUBTREE_SIZE_MASK))
756 			key->len = PCACHE_CACHE_SUBTREE_SIZE - (key->off & PCACHE_CACHE_SUBTREE_SIZE_MASK);
757 
758 		ret = cache_data_alloc(cache, key);
759 		if (ret) {
760 			cache_key_put(key);
761 			goto err;
762 		}
763 
764 		ret = cache_copy_from_req_bio(cache, key, pcache_req, io_done);
765 		if (ret) {
766 			cache_seg_put(key->cache_pos.cache_seg);
767 			cache_key_put(key);
768 			goto err;
769 		}
770 
771 		cache_subtree = get_subtree(&cache->req_key_tree, key->off);
772 		spin_lock(&cache_subtree->tree_lock);
773 		cache_key_insert(&cache->req_key_tree, key, true);
774 		ret = cache_key_append(cache, key, pcache_req->bio->bi_opf & REQ_FUA);
775 		if (ret) {
776 			cache_seg_put(key->cache_pos.cache_seg);
777 			cache_key_delete(key);
778 			goto unlock;
779 		}
780 
781 		io_done += key->len;
782 		spin_unlock(&cache_subtree->tree_lock);
783 	}
784 
785 	return 0;
786 unlock:
787 	spin_unlock(&cache_subtree->tree_lock);
788 err:
789 	return ret;
790 }
791 
792 /**
793  * cache_flush - Flush all ksets to persist any pending cache data
794  * @cache: Pointer to the cache structure
795  *
796  * This function iterates through all ksets associated with the provided `cache`
797  * and ensures that any data marked for persistence is written to media. For each
798  * kset, it acquires the kset lock, then invokes `cache_kset_close`, which handles
799  * the persistence logic for that kset.
800  *
801  * If `cache_kset_close` encounters an error, the function exits immediately with
802  * the respective error code, preventing the flush operation from proceeding to
803  * subsequent ksets.
804  */
805 int cache_flush(struct pcache_cache *cache)
806 {
807 	struct pcache_cache_kset *kset;
808 	int ret;
809 	u32 i;
810 
811 	for (i = 0; i < cache->n_ksets; i++) {
812 		kset = get_kset(cache, i);
813 
814 		spin_lock(&kset->kset_lock);
815 		ret = cache_kset_close(cache, kset);
816 		spin_unlock(&kset->kset_lock);
817 
818 		if (ret)
819 			return ret;
820 	}
821 
822 	return 0;
823 }
824 
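/*
 * Entry point for requests coming from dm-pcache: PREFLUSH requests flush all
 * ksets, reads are served through cache_read() and writes through cache_write().
 */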
825 int pcache_cache_handle_req(struct pcache_cache *cache, struct pcache_request *pcache_req)
826 {
827 	struct bio *bio = pcache_req->bio;
828 
829 	if (unlikely(bio->bi_opf & REQ_PREFLUSH))
830 		return cache_flush(cache);
831 
832 	if (bio_data_dir(bio) == READ)
833 		return cache_read(cache, pcache_req);
834 
835 	return cache_write(cache, pcache_req);
836 }
837