xref: /linux/fs/orangefs/orangefs-bufmap.c (revision e61bc5e4d87433c8759e7dc92bb640ef71a8970c)
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * (C) 2001 Clemson University and The University of Chicago
4  *
5  * See COPYING in top-level directory.
6  */
7 #include "protocol.h"
8 #include "orangefs-kernel.h"
9 #include "orangefs-bufmap.h"
10 
struct slot_map {
	int c;			/* free-slot count; -1 = no map installed; < -1 = dying */
	wait_queue_head_t q;	/* waiters for a free slot; q.lock guards this struct */
	int count;		/* total number of slots covered by @map */
	unsigned long *map;	/* slot usage bitmap, one bit per slot */
};
17 
/* Slot pool for regular read/write shared buffers. */
static struct slot_map rw_map = {
	.c = -1,
	.q = __WAIT_QUEUE_HEAD_INITIALIZER(rw_map.q)
};
/* Slot pool for readdir shared buffers. */
static struct slot_map readdir_map = {
	.c = -1,
	.q = __WAIT_QUEUE_HEAD_INITIALIZER(readdir_map.q)
};
26 
27 
28 static void install(struct slot_map *m, int count, unsigned long *map)
29 {
30 	spin_lock(&m->q.lock);
31 	m->c = m->count = count;
32 	m->map = map;
33 	wake_up_all_locked(&m->q);
34 	spin_unlock(&m->q.lock);
35 }
36 
/*
 * Begin tearing a slot map down: bias ->c by -(count + 1) so it goes
 * negative.  Each slot still held by a user contributes +1 on its way
 * back through put(); once every slot is returned ->c reaches -1 again
 * and the final put() wakes whoever sleeps in run_down().
 */
static void mark_killed(struct slot_map *m)
{
	spin_lock(&m->q.lock);
	m->c -= m->count + 1;
	spin_unlock(&m->q.lock);
}
43 
/*
 * Wait until every slot handed out by get() has been returned, i.e.
 * ->c has climbed back to -1 ("no map installed"), then forget the
 * bitmap.  Must follow mark_killed(); sleeps uninterruptibly.
 */
static void run_down(struct slot_map *m)
{
	DEFINE_WAIT(wait);
	spin_lock(&m->q.lock);
	if (m->c != -1) {
		for (;;) {
			/* (re)queue ourselves; put() wakes us at c == -1 */
			if (likely(list_empty(&wait.entry)))
				__add_wait_queue_entry_tail(&m->q, &wait);
			set_current_state(TASK_UNINTERRUPTIBLE);

			if (m->c == -1)
				break;

			spin_unlock(&m->q.lock);
			schedule();
			spin_lock(&m->q.lock);
		}
		__remove_wait_queue(&m->q, &wait);
		__set_current_state(TASK_RUNNING);
	}
	m->map = NULL;
	spin_unlock(&m->q.lock);
}
67 
68 static void put(struct slot_map *m, int slot)
69 {
70 	int v;
71 	spin_lock(&m->q.lock);
72 	__clear_bit(slot, m->map);
73 	v = ++m->c;
74 	if (v > 0)
75 		wake_up_locked(&m->q);
76 	if (unlikely(v == -1))     /* finished dying */
77 		wake_up_all_locked(&m->q);
78 	spin_unlock(&m->q.lock);
79 }
80 
/*
 * Sleep until a slot becomes free (->c > 0) or the time budget runs
 * out.  Called and returns with m->q.lock held.
 *
 * @left is the remaining budget in jiffies.  While no map is installed
 * at all (->c < 0), each individual sleep is capped at
 * ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS so we give up quickly if the
 * client-core never installs one.
 *
 * Returns 0 when a slot is available, -EINTR on signal, -ETIMEDOUT
 * when the budget is exhausted.
 */
static int wait_for_free(struct slot_map *m)
{
	long left = slot_timeout_secs * HZ;
	DEFINE_WAIT(wait);

	do {
		long n = left, t;
		if (likely(list_empty(&wait.entry)))
			__add_wait_queue_entry_tail_exclusive(&m->q, &wait);
		set_current_state(TASK_INTERRUPTIBLE);

		if (m->c > 0)
			break;

		if (m->c < 0) {
			/* we are waiting for map to be installed */
			/* it would better be there soon, or we go away */
			if (n > ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ)
				n = ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ;
		}
		spin_unlock(&m->q.lock);
		t = schedule_timeout(n);
		spin_lock(&m->q.lock);
		/*
		 * If the capped wait fully expired (t == 0, n != left) and
		 * there is still no map, treat it as a hard timeout;
		 * otherwise fold the unused jiffies back into the budget.
		 */
		if (unlikely(!t) && n != left && m->c < 0)
			left = t;
		else
			left = t + (left - n);
		if (signal_pending(current))
			left = -EINTR;
	} while (left > 0);

	if (!list_empty(&wait.entry))
		list_del(&wait.entry);
	else if (left <= 0 && waitqueue_active(&m->q))
		/* we consumed a wakeup but are bailing out: pass it on */
		__wake_up_locked_key(&m->q, TASK_INTERRUPTIBLE, NULL);
	__set_current_state(TASK_RUNNING);

	if (likely(left > 0))
		return 0;

	return left < 0 ? -EINTR : -ETIMEDOUT;
}
123 
124 static int get(struct slot_map *m)
125 {
126 	int res = 0;
127 	spin_lock(&m->q.lock);
128 	if (unlikely(m->c <= 0))
129 		res = wait_for_free(m);
130 	if (likely(!res)) {
131 		m->c--;
132 		res = find_first_zero_bit(m->map, m->count);
133 		__set_bit(res, m->map);
134 	}
135 	spin_unlock(&m->q.lock);
136 	return res;
137 }
138 
/* used to describe mapped buffers */
struct orangefs_bufmap_desc {
	void __user *uaddr;		/* user space address pointer */
	struct folio **folio_array;	/* folios backing this descriptor */
	/*
	 * folio_offsets could be needed when userspace sets custom
	 * sizes in user_desc, or when folios aren't all backed by
	 * 2MB THPs.
	 */
	size_t *folio_offsets;		/* start offset within each folio */
	int folio_count;		/* valid entries in folio_array */
	bool is_two_2mib_chunks;	/* fast path: exactly two 2MiB folios */
};
152 
static struct orangefs_bufmap {
	int desc_size;		/* bytes per descriptor (user_desc->size) */
	int desc_shift;		/* ilog2(desc_size) */
	int desc_count;		/* number of descriptors */
	int total_size;		/* desc_size * desc_count, in bytes */
	int page_count;		/* total_size / PAGE_SIZE pinned pages */
	int folio_count;	/* folios the pinned pages group into */

	struct page **page_array;	/* pinned user pages, in order */
	struct folio **folio_array;	/* same memory, grouped as folios */
	struct orangefs_bufmap_desc *desc_array;

	/* array to track usage of buffer descriptors */
	unsigned long *buffer_index_array;

	/* array to track usage of buffer descriptors for readdir */
#define N DIV_ROUND_UP(ORANGEFS_READDIR_DEFAULT_DESC_COUNT, BITS_PER_LONG)
	unsigned long readdir_index_array[N];
#undef N
} *__orangefs_bufmap;
173 
/* Serializes installation and teardown of __orangefs_bufmap. */
static DEFINE_SPINLOCK(orangefs_bufmap_lock);
175 
/* Drop the page pins taken by orangefs_bufmap_map(). */
static void
orangefs_bufmap_unmap(struct orangefs_bufmap *bufmap)
{
	unpin_user_pages(bufmap->page_array, bufmap->page_count);
}
181 
182 static void
183 orangefs_bufmap_free(struct orangefs_bufmap *bufmap)
184 {
185 	int i;
186 
187 	if (!bufmap)
188 		return;
189 
190 	for (i = 0; i < bufmap->desc_count; i++) {
191 		kfree(bufmap->desc_array[i].folio_array);
192 		kfree(bufmap->desc_array[i].folio_offsets);
193 		bufmap->desc_array[i].folio_array = NULL;
194 		bufmap->desc_array[i].folio_offsets = NULL;
195 	}
196 	kfree(bufmap->page_array);
197 	kfree(bufmap->desc_array);
198 	bitmap_free(bufmap->buffer_index_array);
199 	kfree(bufmap);
200 }
201 
202 /*
203  * XXX: Can the size and shift change while the caller gives up the
204  * XXX: lock between calling this and doing something useful?
205  */
206 
207 int orangefs_bufmap_size_query(void)
208 {
209 	struct orangefs_bufmap *bufmap;
210 	int size = 0;
211 	spin_lock(&orangefs_bufmap_lock);
212 	bufmap = __orangefs_bufmap;
213 	if (bufmap)
214 		size = bufmap->desc_size;
215 	spin_unlock(&orangefs_bufmap_lock);
216 	return size;
217 }
218 
/*
 * NOTE(review): neither wait queue head below is referenced anywhere
 * in this file — likely leftovers from the pre-slot_map scheme.
 * Confirm nothing external uses them before removing.
 */
static DECLARE_WAIT_QUEUE_HEAD(bufmap_waitq);
static DECLARE_WAIT_QUEUE_HEAD(readdir_waitq);
221 
/*
 * Allocate the bufmap bookkeeping structure and its arrays from the
 * user-supplied geometry (size/count/total_size pre-validated by
 * orangefs_bufmap_initialize()).  Does not pin any user memory; that
 * happens in orangefs_bufmap_map().  Returns NULL on any allocation
 * failure, unwinding earlier allocations via the goto chain.
 */
static struct orangefs_bufmap *
orangefs_bufmap_alloc(struct ORANGEFS_dev_map_desc *user_desc)
{
	struct orangefs_bufmap *bufmap;

	bufmap = kzalloc_obj(*bufmap);
	if (!bufmap)
		goto out;

	bufmap->total_size = user_desc->total_size;
	bufmap->desc_count = user_desc->count;
	bufmap->desc_size = user_desc->size;
	bufmap->desc_shift = ilog2(bufmap->desc_size);
	bufmap->page_count = bufmap->total_size / PAGE_SIZE;

	/* one bit per descriptor, tracking which slots are in use */
	bufmap->buffer_index_array =
		bitmap_zalloc(bufmap->desc_count, GFP_KERNEL);
	if (!bufmap->buffer_index_array)
		goto out_free_bufmap;

	bufmap->desc_array =
		kzalloc_objs(struct orangefs_bufmap_desc, bufmap->desc_count);
	if (!bufmap->desc_array)
		goto out_free_index_array;

	/* allocate storage to track our page mappings */
	bufmap->page_array =
		kzalloc_objs(struct page *, bufmap->page_count);
	if (!bufmap->page_array)
		goto out_free_desc_array;

	/* allocate folio array. */
	bufmap->folio_array = kzalloc_objs(struct folio *, bufmap->page_count);
	if (!bufmap->folio_array)
		goto out_free_page_array;

	return bufmap;

out_free_page_array:
	kfree(bufmap->page_array);
out_free_desc_array:
	kfree(bufmap->desc_array);
out_free_index_array:
	bitmap_free(bufmap->buffer_index_array);
out_free_bufmap:
	kfree(bufmap);
out:
	return NULL;
}
271 
272 static int orangefs_bufmap_group_folios(struct orangefs_bufmap *bufmap)
273 {
274 	int i = 0;
275 	int f = 0;
276 	int k;
277 	int num_pages;
278 	struct page *page;
279 	struct folio *folio;
280 
281 	while (i < bufmap->page_count) {
282 		page = bufmap->page_array[i];
283 		folio = page_folio(page);
284 		num_pages = folio_nr_pages(folio);
285 		gossip_debug(GOSSIP_BUFMAP_DEBUG,
286 			"%s: i:%d: num_pages:%d: \n", __func__, i, num_pages);
287 
288 		for (k = 1; k < num_pages; k++) {
289 			if (bufmap->page_array[i + k] != folio_page(folio, k)) {
290 				gossip_err("%s: bad match,  i:%d: k:%d:\n",
291 					__func__, i, k);
292 				return -EINVAL;
293 			}
294 		}
295 
296 		bufmap->folio_array[f++] = folio;
297 		i += num_pages;
298 	}
299 
300 	bufmap->folio_count = f;
301 	pr_info("%s: Grouped %d folios from %d pages.\n",
302 		__func__,
303 		bufmap->folio_count,
304 		bufmap->page_count);
305 	return 0;
306 }
307 
/*
 * Pin the userspace buffer region, group its pages into folios, and
 * carve the folio list into desc_count descriptors of desc_size bytes
 * each.  Returns 0 on success or a negative errno; on failure all page
 * pins are dropped and any fully-built descriptor arrays are freed
 * (partially-built ones are left for orangefs_bufmap_free(), which
 * kfrees them NULL-safely).
 */
static int orangefs_bufmap_map(struct orangefs_bufmap *bufmap,
				struct ORANGEFS_dev_map_desc *user_desc)
{
	int pages_per_desc = bufmap->desc_size / PAGE_SIZE;
	int ret;
	int i;
	int j;
	int current_folio;
	int desc_pages_needed;
	int desc_folio_count;
	int remaining_pages;
	int need_avail_min;
	int pages_assigned_to_this_desc;
	int allocated_descs = 0;
	size_t current_offset;
	size_t adjust_offset;
	struct folio *folio;

	/* map the pages */
	ret = pin_user_pages_fast((unsigned long)user_desc->ptr,
		bufmap->page_count,
		FOLL_WRITE,
		bufmap->page_array);

	if (ret < 0)
		return ret;

	if (ret != bufmap->page_count) {
		gossip_err("orangefs error: asked for %d pages, only got %d.\n",
				bufmap->page_count, ret);
		/* only the first ret entries are valid; unpin those */
		for (i = 0; i < ret; i++)
			unpin_user_page(bufmap->page_array[i]);
		return -ENOMEM;
	}

	/*
	 * ideally we want to get kernel space pointers for each page, but
	 * we can't kmap that many pages at once if highmem is being used.
	 * so instead, we just kmap/kunmap the page address each time the
	 * kaddr is needed.
	 */
	for (i = 0; i < bufmap->page_count; i++)
		flush_dcache_page(bufmap->page_array[i]);

	/*
	 * Group pages into folios.
	 */
	ret = orangefs_bufmap_group_folios(bufmap);
	if (ret)
		goto unpin;

	pr_info("%s: desc_size=%d bytes (%d pages per desc), total folios=%d\n",
			__func__, bufmap->desc_size, pages_per_desc,
			bufmap->folio_count);

	/* cursor state: walk folio_array once, slicing it into descs */
	current_folio = 0;
	remaining_pages = 0;	/* pages left in the folio under the cursor */
	current_offset = 0;	/* byte offset into that folio */
	for (i = 0; i < bufmap->desc_count; i++) {
		desc_pages_needed = pages_per_desc;
		desc_folio_count = 0;
		pages_assigned_to_this_desc = 0;
		bufmap->desc_array[i].is_two_2mib_chunks = false;

		/*
		 * We hope there was enough memory that each desc is
		 * covered by two THPs/folios, if not we want to keep on
		 * working even if there's only one page per folio.
		 */
		bufmap->desc_array[i].folio_array =
			kzalloc_objs(struct folio *, pages_per_desc);
		if (!bufmap->desc_array[i].folio_array) {
			ret = -ENOMEM;
			goto unpin;
		}

		bufmap->desc_array[i].folio_offsets =
			kzalloc_objs(size_t, pages_per_desc);
		if (!bufmap->desc_array[i].folio_offsets) {
			ret = -ENOMEM;
			kfree(bufmap->desc_array[i].folio_array);
			bufmap->desc_array[i].folio_array = NULL;
			goto unpin;
		}

		bufmap->desc_array[i].uaddr =
			user_desc->ptr + (size_t)i * bufmap->desc_size;

		/*
		 * Accumulate folios until desc is full.
		 */
		while (desc_pages_needed > 0) {
			if (remaining_pages == 0) {
				/* shouldn't happen. */
				if (current_folio >= bufmap->folio_count) {
					ret = -EINVAL;
					goto unpin;
				}
				folio = bufmap->folio_array[current_folio++];
				remaining_pages = folio_nr_pages(folio);
				current_offset = 0;
			} else {
				/* continue in the partially-consumed folio */
				folio = bufmap->folio_array[current_folio - 1];
			}

			/* take as many pages as this desc needs and the
			 * current folio can still provide */
			need_avail_min =
				min(desc_pages_needed, remaining_pages);
			adjust_offset = need_avail_min * PAGE_SIZE;

			bufmap->desc_array[i].folio_array[desc_folio_count] =
				folio;
			bufmap->desc_array[i].folio_offsets[desc_folio_count] =
				current_offset;
			desc_folio_count++;
			pages_assigned_to_this_desc += need_avail_min;
			desc_pages_needed -= need_avail_min;
			remaining_pages -= need_avail_min;
			current_offset += adjust_offset;
		}

		/* Detect optimal case: two 2MiB folios per 4MiB slot. */
		if (desc_folio_count == 2 &&
		  folio_nr_pages(bufmap->desc_array[i].folio_array[0]) == 512 &&
		  folio_nr_pages(bufmap->desc_array[i].folio_array[1]) == 512) {
			bufmap->desc_array[i].is_two_2mib_chunks = true;
			gossip_debug(GOSSIP_BUFMAP_DEBUG, "%s: descriptor :%d: "
				"optimal folio/page ratio.\n", __func__, i);
		}

		bufmap->desc_array[i].folio_count = desc_folio_count;
		gossip_debug(GOSSIP_BUFMAP_DEBUG,
			" descriptor %d: folio_count=%d, "
			"pages_assigned=%d (should be %d)\n",
			i, desc_folio_count, pages_assigned_to_this_desc,
			pages_per_desc);

		allocated_descs = i + 1;
	}

	return 0;
unpin:
	/*
	 * rollback any allocations we got so far...
	 * Memory pressure, like in generic/340, led me
	 * to write the rollback this way.
	 * Note: a desc allocated in the iteration that failed is not in
	 * [0, allocated_descs); its arrays are reclaimed later by
	 * orangefs_bufmap_free(), which is why freed entries are NULLed
	 * here (prevents double-free there).
	 */
	for (j = 0; j < allocated_descs; j++) {
		if (bufmap->desc_array[j].folio_array) {
			kfree(bufmap->desc_array[j].folio_array);
			bufmap->desc_array[j].folio_array = NULL;
		}
		if (bufmap->desc_array[j].folio_offsets) {
			kfree(bufmap->desc_array[j].folio_offsets);
			bufmap->desc_array[j].folio_offsets = NULL;
		}
	}
	unpin_user_pages(bufmap->page_array, bufmap->page_count);
	return ret;
}
467 
468 /*
469  * orangefs_bufmap_initialize()
470  *
471  * initializes the mapped buffer interface
472  *
473  * user_desc is the parameters provided by userspace for the bufmap.
474  *
475  * returns 0 on success, -errno on failure
476  */
int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc)
{
	struct orangefs_bufmap *bufmap;
	int ret = -EINVAL;

	gossip_debug(GOSSIP_BUFMAP_DEBUG,
		     "%s: called (ptr (" "%p) sz (%d) cnt(%d).\n",
		     __func__,
		     user_desc->ptr,
		     user_desc->size,
		     user_desc->count);

	/* reject negative geometry before any arithmetic on it */
	if (user_desc->total_size < 0 ||
	    user_desc->size < 0 ||
	    user_desc->count < 0)
		goto out;

	/*
	 * sanity check alignment and size of buffer that caller wants to
	 * work with
	 */
	if (PAGE_ALIGN((unsigned long)user_desc->ptr) !=
	    (unsigned long)user_desc->ptr) {
		gossip_err("orangefs error: memory alignment (front). %p\n",
			   user_desc->ptr);
		goto out;
	}

	if (PAGE_ALIGN(((unsigned long)user_desc->ptr + user_desc->total_size))
	    != (unsigned long)(user_desc->ptr + user_desc->total_size)) {
		gossip_err("orangefs error: memory alignment (back).(%p + %d)\n",
			   user_desc->ptr,
			   user_desc->total_size);
		goto out;
	}

	/* total_size must be exactly size * count */
	if (user_desc->total_size != (user_desc->size * user_desc->count)) {
		gossip_err("orangefs error: user provided an oddly sized buffer: (%d, %d, %d)\n",
			   user_desc->total_size,
			   user_desc->size,
			   user_desc->count);
		goto out;
	}

	if ((user_desc->size % PAGE_SIZE) != 0) {
		gossip_err("orangefs error: bufmap size not page size divisible (%d).\n",
			   user_desc->size);
		goto out;
	}

	ret = -ENOMEM;
	bufmap = orangefs_bufmap_alloc(user_desc);
	if (!bufmap)
		goto out;

	ret = orangefs_bufmap_map(bufmap, user_desc);
	if (ret)
		goto out_free_bufmap;


	/*
	 * Publish the bufmap and open both slot pools.  A second
	 * initialization attempt is rejected here under the lock.
	 */
	spin_lock(&orangefs_bufmap_lock);
	if (__orangefs_bufmap) {
		spin_unlock(&orangefs_bufmap_lock);
		gossip_err("orangefs: error: bufmap already initialized.\n");
		ret = -EINVAL;
		goto out_unmap_bufmap;
	}
	__orangefs_bufmap = bufmap;
	install(&rw_map,
		bufmap->desc_count,
		bufmap->buffer_index_array);
	install(&readdir_map,
		ORANGEFS_READDIR_DEFAULT_DESC_COUNT,
		bufmap->readdir_index_array);
	spin_unlock(&orangefs_bufmap_lock);

	gossip_debug(GOSSIP_BUFMAP_DEBUG,
		     "%s: exiting normally\n", __func__);
	return 0;

out_unmap_bufmap:
	orangefs_bufmap_unmap(bufmap);
out_free_bufmap:
	orangefs_bufmap_free(bufmap);
out:
	return ret;
}
564 
565 /*
566  * orangefs_bufmap_finalize()
567  *
568  * shuts down the mapped buffer interface and releases any resources
569  * associated with it
570  *
571  * no return value
572  */
void orangefs_bufmap_finalize(void)
{
	struct orangefs_bufmap *bufmap = __orangefs_bufmap;
	if (!bufmap)
		return;
	gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: called\n");
	/*
	 * Only flag both pools as dying here; the actual wait for
	 * outstanding slots and the freeing happen later in
	 * orangefs_bufmap_run_down().
	 */
	mark_killed(&rw_map);
	mark_killed(&readdir_map);
	gossip_debug(GOSSIP_BUFMAP_DEBUG,
		     "orangefs_bufmap_finalize: exiting normally\n");
}
584 
/*
 * Second half of teardown: wait (uninterruptibly) until all slots of
 * both pools have been returned, unpublish the bufmap, then unpin and
 * free it.  Must follow orangefs_bufmap_finalize().
 */
void orangefs_bufmap_run_down(void)
{
	struct orangefs_bufmap *bufmap = __orangefs_bufmap;
	if (!bufmap)
		return;
	run_down(&rw_map);
	run_down(&readdir_map);
	spin_lock(&orangefs_bufmap_lock);
	__orangefs_bufmap = NULL;
	spin_unlock(&orangefs_bufmap_lock);
	orangefs_bufmap_unmap(bufmap);
	orangefs_bufmap_free(bufmap);
}
598 
599 /*
600  * orangefs_bufmap_get()
601  *
602  * gets a free mapped buffer descriptor, will sleep until one becomes
603  * available if necessary
604  *
605  * returns slot on success, -errno on failure
606  */
int orangefs_bufmap_get(void)
{
	/* may sleep in get(); returns slot index or -EINTR/-ETIMEDOUT */
	return get(&rw_map);
}
611 
612 /*
613  * orangefs_bufmap_put()
614  *
615  * returns a mapped buffer descriptor to the collection
616  *
617  * no return value
618  */
void orangefs_bufmap_put(int buffer_index)
{
	/* release a slot obtained from orangefs_bufmap_get() */
	put(&rw_map, buffer_index);
}
623 
624 /*
625  * orangefs_readdir_index_get()
626  *
627  * gets a free descriptor, will sleep until one becomes
628  * available if necessary.
629  * Although the readdir buffers are not mapped into kernel space
630  * we could do that at a later point of time. Regardless, these
631  * indices are used by the client-core.
632  *
633  * returns slot on success, -errno on failure
634  */
int orangefs_readdir_index_get(void)
{
	/* may sleep in get(); returns slot index or -EINTR/-ETIMEDOUT */
	return get(&readdir_map);
}
639 
void orangefs_readdir_index_put(int buffer_index)
{
	/* release a slot obtained from orangefs_readdir_index_get() */
	put(&readdir_map, buffer_index);
}
644 
645 /*
646  * we've been handed an iovec, we need to copy it to
647  * the shared memory descriptor at "buffer_index".
648  */
/*
 * Copy @size bytes from @iter into the shared-memory descriptor at
 * @buffer_index.  Returns 0 on success, -EFAULT on a short copy or an
 * inconsistent folio layout.  The caller must hold the slot, so the
 * descriptor cannot disappear underneath us.
 */
int orangefs_bufmap_copy_from_iovec(struct iov_iter *iter,
				int buffer_index,
				size_t size)
{
	struct orangefs_bufmap_desc *to;
	size_t remaining = size;
	int folio_index = 0;
	struct folio *folio;
	size_t folio_offset;
	size_t folio_avail;
	size_t copy_amount;
	size_t copied;
	void *kaddr;
	size_t half;
	size_t first;
	size_t second;

	to = &__orangefs_bufmap->desc_array[buffer_index];

	/* shouldn't happen... */
	if (size > 4194304)
		pr_info("%s: size:%zu\n", __func__, size);

	gossip_debug(GOSSIP_BUFMAP_DEBUG,
		"%s: buffer_index:%d size:%zu folio_count:%d\n",
		__func__,
		buffer_index,
		size,
		to->folio_count);

	/* Fast path: exactly two 2 MiB folios */
	if (to->is_two_2mib_chunks && size <= 4194304) {
		/*
		 * NOTE(review): kmap_local_folio() maps a single page
		 * under CONFIG_HIGHMEM; copying up to 2 MiB through one
		 * mapping assumes a !HIGHMEM direct map — confirm.
		 */
		gossip_debug(GOSSIP_BUFMAP_DEBUG,
			"%s: fastpath hit.\n", __func__);
		half = 2097152;		/* 2 MiB */
		first = min(size, half);
		second = (size > half) ? size - half : 0;

		/* First 2 MiB chunk */
		kaddr = kmap_local_folio(to->folio_array[0], 0);
		copied = copy_from_iter(kaddr, first, iter);
		kunmap_local(kaddr);
		if (copied != first)
			return -EFAULT;

		if (second == 0)
			return 0;

		/* Second 2 MiB chunk */
		kaddr = kmap_local_folio(to->folio_array[1], 0);
		copied = copy_from_iter(kaddr, second, iter);
		kunmap_local(kaddr);
		if (copied != second)
			return -EFAULT;

		return 0;
	}

	/* Slow path: walk the descriptor's folio list piecewise. */
	while (remaining > 0) {

		if (unlikely(folio_index >= to->folio_count ||
			to->folio_array[folio_index] == NULL)) {
				gossip_err("%s: "
				   "folio_index:%d: >= folio_count:%d: "
		                   "(size %zu, buffer %d)\n",
					__func__,
					folio_index,
					to->folio_count,
					size,
					buffer_index);
				return -EFAULT;
		}

		folio = to->folio_array[folio_index];
		folio_offset = to->folio_offsets[folio_index];
		/* bytes this desc may use in this folio, from its offset */
		folio_avail = folio_nr_pages(folio) * PAGE_SIZE - folio_offset;
		copy_amount = min(remaining, folio_avail);
		kaddr = kmap_local_folio(folio, folio_offset);
		copied = copy_from_iter(kaddr, copy_amount, iter);
		kunmap_local(kaddr);

		if (copied != copy_amount)
			return -EFAULT;

		remaining -= copied;
		folio_index++;
	}

	return 0;
}
739 
740 /*
741  * we've been handed an iovec, we need to fill it from
742  * the shared memory descriptor at "buffer_index".
743  */
744 int orangefs_bufmap_copy_to_iovec(struct iov_iter *iter,
745 				    int buffer_index,
746 				    size_t size)
747 {
748 	struct orangefs_bufmap_desc *from;
749 	size_t remaining = size;
750 	int folio_index = 0;
751 	struct folio *folio;
752 	size_t folio_offset;
753 	size_t folio_avail;
754 	size_t copy_amount;
755 	size_t copied;
756 	void *kaddr;
757 	size_t half;
758 	size_t first;
759 	size_t second;
760 
761 	from = &__orangefs_bufmap->desc_array[buffer_index];
762 
763 	/* shouldn't happen... */
764 	if (size > 4194304)
765 		pr_info("%s: size:%zu\n", __func__, size);
766 
767 	gossip_debug(GOSSIP_BUFMAP_DEBUG,
768 		"%s: buffer_index:%d size:%zu folio_count:%d\n",
769 		__func__,
770 		buffer_index,
771 		size,
772 		from->folio_count);
773 
774 	/* Fast path: exactly two 2 MiB folios */
775 	if (from->is_two_2mib_chunks && size <= 4194304) {
776 		gossip_debug(GOSSIP_BUFMAP_DEBUG,
777 			"%s: fastpath hit.\n", __func__);
778 		half = 2097152;		/* 2 MiB */
779 		first = min(size, half);
780 		second = (size > half) ? size - half : 0;
781 		void *kaddr;
782 		size_t copied;
783 
784 		/* First 2 MiB chunk */
785 		kaddr = kmap_local_folio(from->folio_array[0], 0);
786 		copied = copy_to_iter(kaddr, first, iter);
787 		kunmap_local(kaddr);
788 		if (copied != first)
789 			return -EFAULT;
790 
791 		if (second == 0)
792 			return 0;
793 
794 		/* Second 2 MiB chunk */
795 		kaddr = kmap_local_folio(from->folio_array[1], 0);
796 		copied = copy_to_iter(kaddr, second, iter);
797 		kunmap_local(kaddr);
798 		if (copied != second)
799 			return -EFAULT;
800 
801 		return 0;
802 	}
803 
804 	while (remaining > 0) {
805 
806 		if (unlikely(folio_index >= from->folio_count ||
807 			from->folio_array[folio_index] == NULL)) {
808 				gossip_err("%s: "
809 				   "folio_index:%d: >= folio_count:%d: "
810 		                   "(size %zu, buffer %d)\n",
811 					__func__,
812 					folio_index,
813 					from->folio_count,
814 					size,
815 					buffer_index);
816 				return -EFAULT;
817 		}
818 
819 		folio = from->folio_array[folio_index];
820 		folio_offset = from->folio_offsets[folio_index];
821 		folio_avail = folio_nr_pages(folio) * PAGE_SIZE - folio_offset;
822 		copy_amount = min(remaining, folio_avail);
823 
824 		kaddr = kmap_local_folio(folio, folio_offset);
825 		copied = copy_to_iter(kaddr, copy_amount, iter);
826 		kunmap_local(kaddr);
827 
828 		if (copied != copy_amount)
829 			return -EFAULT;
830 
831 		remaining -= copied;
832 		folio_index++;
833 	}
834 
835 	return 0;
836 }
837