xref: /linux/fs/netfs/write_collect.c (revision 1c75adb22d49ca9389333ca5e6939052a7203111)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /* Network filesystem write subrequest result collection, assessment
3  * and retrying.
4  *
5  * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
6  * Written by David Howells (dhowells@redhat.com)
7  */
8 
9 #include <linux/export.h>
10 #include <linux/fs.h>
11 #include <linux/mm.h>
12 #include <linux/pagemap.h>
13 #include <linux/slab.h>
14 #include "internal.h"
15 
16 /* Notes made in the collector */
17 #define HIT_PENDING		0x01	/* A front op was still pending */
18 #define SOME_EMPTY		0x02	/* One of more streams are empty */
19 #define ALL_EMPTY		0x04	/* All streams are empty */
20 #define MAYBE_DISCONTIG		0x08	/* A front op may be discontiguous (rounded to PAGE_SIZE) */
21 #define NEED_REASSESS		0x10	/* Need to loop round and reassess */
22 #define REASSESS_DISCONTIG	0x20	/* Reassess discontiguity if contiguity advances */
23 #define MADE_PROGRESS		0x40	/* Made progress cleaning up a stream or the folio set */
24 #define BUFFERED		0x80	/* The pagecache needs cleaning up */
25 #define NEED_RETRY		0x100	/* A front op requests retrying */
26 #define SAW_FAILURE		0x200	/* One stream or hit a permanent failure */
27 
28 /*
29  * Successful completion of write of a folio to the server and/or cache.  Note
30  * that we are not allowed to lock the folio here on pain of deadlocking with
31  * truncate.
32  */
33 int netfs_folio_written_back(struct folio *folio)
34 {
35 	enum netfs_folio_trace why = netfs_folio_trace_clear;
36 	struct netfs_folio *finfo;
37 	struct netfs_group *group = NULL;
38 	int gcount = 0;
39 
40 	if ((finfo = netfs_folio_info(folio))) {
41 		/* Streaming writes cannot be redirtied whilst under writeback,
42 		 * so discard the streaming record.
43 		 */
44 		folio_detach_private(folio);
45 		group = finfo->netfs_group;
46 		gcount++;
47 		kfree(finfo);
48 		why = netfs_folio_trace_clear_s;
49 		goto end_wb;
50 	}
51 
52 	if ((group = netfs_folio_group(folio))) {
53 		if (group == NETFS_FOLIO_COPY_TO_CACHE) {
54 			why = netfs_folio_trace_clear_cc;
55 			folio_detach_private(folio);
56 			goto end_wb;
57 		}
58 
59 		/* Need to detach the group pointer if the page didn't get
60 		 * redirtied.  If it has been redirtied, then it must be within
61 		 * the same group.
62 		 */
63 		why = netfs_folio_trace_redirtied;
64 		if (!folio_test_dirty(folio)) {
65 			folio_detach_private(folio);
66 			gcount++;
67 			why = netfs_folio_trace_clear_g;
68 		}
69 	}
70 
71 end_wb:
72 	trace_netfs_folio(folio, why);
73 	folio_end_writeback(folio);
74 	return gcount;
75 }
76 
77 /*
78  * Get hold of a folio we have under writeback.  We don't want to get the
79  * refcount on it.
80  */
81 static struct folio *netfs_writeback_lookup_folio(struct netfs_io_request *wreq, loff_t pos)
82 {
83 	XA_STATE(xas, &wreq->mapping->i_pages, pos / PAGE_SIZE);
84 	struct folio *folio;
85 
86 	rcu_read_lock();
87 
88 	for (;;) {
89 		xas_reset(&xas);
90 		folio = xas_load(&xas);
91 		if (xas_retry(&xas, folio))
92 			continue;
93 
94 		if (!folio || xa_is_value(folio))
95 			kdebug("R=%08x: folio %lx (%llx) not present",
96 			       wreq->debug_id, xas.xa_index, pos / PAGE_SIZE);
97 		BUG_ON(!folio || xa_is_value(folio));
98 
99 		if (folio == xas_reload(&xas))
100 			break;
101 	}
102 
103 	rcu_read_unlock();
104 
105 	if (WARN_ONCE(!folio_test_writeback(folio),
106 		      "R=%08x: folio %lx is not under writeback\n",
107 		      wreq->debug_id, folio->index)) {
108 		trace_netfs_folio(folio, netfs_folio_trace_not_under_wback);
109 	}
110 	return folio;
111 }
112 
113 /*
114  * Unlock any folios we've finished with.
115  */
116 static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
117 					  unsigned long long collected_to,
118 					  unsigned int *notes)
119 {
120 	for (;;) {
121 		struct folio *folio;
122 		struct netfs_folio *finfo;
123 		unsigned long long fpos, fend;
124 		size_t fsize, flen;
125 
126 		folio = netfs_writeback_lookup_folio(wreq, wreq->cleaned_to);
127 
128 		fpos = folio_pos(folio);
129 		fsize = folio_size(folio);
130 		finfo = netfs_folio_info(folio);
131 		flen = finfo ? finfo->dirty_offset + finfo->dirty_len : fsize;
132 
133 		fend = min_t(unsigned long long, fpos + flen, wreq->i_size);
134 
135 		trace_netfs_collect_folio(wreq, folio, fend, collected_to);
136 
137 		if (fpos + fsize > wreq->contiguity) {
138 			trace_netfs_collect_contig(wreq, fpos + fsize,
139 						   netfs_contig_trace_unlock);
140 			wreq->contiguity = fpos + fsize;
141 		}
142 
143 		/* Unlock any folio we've transferred all of. */
144 		if (collected_to < fend)
145 			break;
146 
147 		wreq->nr_group_rel += netfs_folio_written_back(folio);
148 		wreq->cleaned_to = fpos + fsize;
149 		*notes |= MADE_PROGRESS;
150 
151 		if (fpos + fsize >= collected_to)
152 			break;
153 	}
154 }
155 
156 /*
157  * Perform retries on the streams that need it.
158  */
159 static void netfs_retry_write_stream(struct netfs_io_request *wreq,
160 				     struct netfs_io_stream *stream)
161 {
162 	struct list_head *next;
163 
164 	_enter("R=%x[%x:]", wreq->debug_id, stream->stream_nr);
165 
166 	if (list_empty(&stream->subrequests))
167 		return;
168 
169 	if (stream->source == NETFS_UPLOAD_TO_SERVER &&
170 	    wreq->netfs_ops->retry_request)
171 		wreq->netfs_ops->retry_request(wreq, stream);
172 
173 	if (unlikely(stream->failed))
174 		return;
175 
176 	/* If there's no renegotiation to do, just resend each failed subreq. */
177 	if (!stream->prepare_write) {
178 		struct netfs_io_subrequest *subreq;
179 
180 		list_for_each_entry(subreq, &stream->subrequests, rreq_link) {
181 			if (test_bit(NETFS_SREQ_FAILED, &subreq->flags))
182 				break;
183 			if (__test_and_clear_bit(NETFS_SREQ_NEED_RETRY, &subreq->flags)) {
184 				__set_bit(NETFS_SREQ_RETRYING, &subreq->flags);
185 				netfs_get_subrequest(subreq, netfs_sreq_trace_get_resubmit);
186 				netfs_reissue_write(stream, subreq);
187 			}
188 		}
189 		return;
190 	}
191 
192 	next = stream->subrequests.next;
193 
194 	do {
195 		struct netfs_io_subrequest *subreq = NULL, *from, *to, *tmp;
196 		unsigned long long start, len;
197 		size_t part;
198 		bool boundary = false;
199 
200 		/* Go through the stream and find the next span of contiguous
201 		 * data that we then rejig (cifs, for example, needs the wsize
202 		 * renegotiating) and reissue.
203 		 */
204 		from = list_entry(next, struct netfs_io_subrequest, rreq_link);
205 		to = from;
206 		start = from->start + from->transferred;
207 		len   = from->len   - from->transferred;
208 
209 		if (test_bit(NETFS_SREQ_FAILED, &from->flags) ||
210 		    !test_bit(NETFS_SREQ_NEED_RETRY, &from->flags))
211 			return;
212 
213 		list_for_each_continue(next, &stream->subrequests) {
214 			subreq = list_entry(next, struct netfs_io_subrequest, rreq_link);
215 			if (subreq->start + subreq->transferred != start + len ||
216 			    test_bit(NETFS_SREQ_BOUNDARY, &subreq->flags) ||
217 			    !test_bit(NETFS_SREQ_NEED_RETRY, &subreq->flags))
218 				break;
219 			to = subreq;
220 			len += to->len;
221 		}
222 
223 		/* Work through the sublist. */
224 		subreq = from;
225 		list_for_each_entry_from(subreq, &stream->subrequests, rreq_link) {
226 			if (!len)
227 				break;
228 			/* Renegotiate max_len (wsize) */
229 			trace_netfs_sreq(subreq, netfs_sreq_trace_retry);
230 			__clear_bit(NETFS_SREQ_NEED_RETRY, &subreq->flags);
231 			__set_bit(NETFS_SREQ_RETRYING, &subreq->flags);
232 			stream->prepare_write(subreq);
233 
234 			part = min(len, subreq->max_len);
235 			subreq->len = part;
236 			subreq->start = start;
237 			subreq->transferred = 0;
238 			len -= part;
239 			start += part;
240 			if (len && subreq == to &&
241 			    __test_and_clear_bit(NETFS_SREQ_BOUNDARY, &to->flags))
242 				boundary = true;
243 
244 			netfs_get_subrequest(subreq, netfs_sreq_trace_get_resubmit);
245 			netfs_reissue_write(stream, subreq);
246 			if (subreq == to)
247 				break;
248 		}
249 
250 		/* If we managed to use fewer subreqs, we can discard the
251 		 * excess; if we used the same number, then we're done.
252 		 */
253 		if (!len) {
254 			if (subreq == to)
255 				continue;
256 			list_for_each_entry_safe_from(subreq, tmp,
257 						      &stream->subrequests, rreq_link) {
258 				trace_netfs_sreq(subreq, netfs_sreq_trace_discard);
259 				list_del(&subreq->rreq_link);
260 				netfs_put_subrequest(subreq, false, netfs_sreq_trace_put_done);
261 				if (subreq == to)
262 					break;
263 			}
264 			continue;
265 		}
266 
267 		/* We ran out of subrequests, so we need to allocate some more
268 		 * and insert them after.
269 		 */
270 		do {
271 			subreq = netfs_alloc_subrequest(wreq);
272 			subreq->source		= to->source;
273 			subreq->start		= start;
274 			subreq->max_len		= len;
275 			subreq->max_nr_segs	= INT_MAX;
276 			subreq->debug_index	= atomic_inc_return(&wreq->subreq_counter);
277 			subreq->stream_nr	= to->stream_nr;
278 			__set_bit(NETFS_SREQ_RETRYING, &subreq->flags);
279 
280 			trace_netfs_sreq_ref(wreq->debug_id, subreq->debug_index,
281 					     refcount_read(&subreq->ref),
282 					     netfs_sreq_trace_new);
283 			netfs_get_subrequest(subreq, netfs_sreq_trace_get_resubmit);
284 
285 			list_add(&subreq->rreq_link, &to->rreq_link);
286 			to = list_next_entry(to, rreq_link);
287 			trace_netfs_sreq(subreq, netfs_sreq_trace_retry);
288 
289 			switch (stream->source) {
290 			case NETFS_UPLOAD_TO_SERVER:
291 				netfs_stat(&netfs_n_wh_upload);
292 				subreq->max_len = min(len, wreq->wsize);
293 				break;
294 			case NETFS_WRITE_TO_CACHE:
295 				netfs_stat(&netfs_n_wh_write);
296 				break;
297 			default:
298 				WARN_ON_ONCE(1);
299 			}
300 
301 			stream->prepare_write(subreq);
302 
303 			part = min(len, subreq->max_len);
304 			subreq->len = subreq->transferred + part;
305 			len -= part;
306 			start += part;
307 			if (!len && boundary) {
308 				__set_bit(NETFS_SREQ_BOUNDARY, &to->flags);
309 				boundary = false;
310 			}
311 
312 			netfs_reissue_write(stream, subreq);
313 			if (!len)
314 				break;
315 
316 		} while (len);
317 
318 	} while (!list_is_head(next, &stream->subrequests));
319 }
320 
321 /*
322  * Perform retries on the streams that need it.  If we're doing content
323  * encryption and the server copy changed due to a third-party write, we may
324  * need to do an RMW cycle and also rewrite the data to the cache.
325  */
326 static void netfs_retry_writes(struct netfs_io_request *wreq)
327 {
328 	struct netfs_io_subrequest *subreq;
329 	struct netfs_io_stream *stream;
330 	int s;
331 
332 	/* Wait for all outstanding I/O to quiesce before performing retries as
333 	 * we may need to renegotiate the I/O sizes.
334 	 */
335 	for (s = 0; s < NR_IO_STREAMS; s++) {
336 		stream = &wreq->io_streams[s];
337 		if (!stream->active)
338 			continue;
339 
340 		list_for_each_entry(subreq, &stream->subrequests, rreq_link) {
341 			wait_on_bit(&subreq->flags, NETFS_SREQ_IN_PROGRESS,
342 				    TASK_UNINTERRUPTIBLE);
343 		}
344 	}
345 
346 	// TODO: Enc: Fetch changed partial pages
347 	// TODO: Enc: Reencrypt content if needed.
348 	// TODO: Enc: Wind back transferred point.
349 	// TODO: Enc: Mark cache pages for retry.
350 
351 	for (s = 0; s < NR_IO_STREAMS; s++) {
352 		stream = &wreq->io_streams[s];
353 		if (stream->need_retry) {
354 			stream->need_retry = false;
355 			netfs_retry_write_stream(wreq, stream);
356 		}
357 	}
358 }
359 
360 /*
361  * Collect and assess the results of various write subrequests.  We may need to
362  * retry some of the results - or even do an RMW cycle for content crypto.
363  *
364  * Note that we have a number of parallel, overlapping lists of subrequests,
365  * one to the server and one to the local cache for example, which may not be
366  * the same size or starting position and may not even correspond in boundary
367  * alignment.
368  */
369 static void netfs_collect_write_results(struct netfs_io_request *wreq)
370 {
371 	struct netfs_io_subrequest *front, *remove;
372 	struct netfs_io_stream *stream;
373 	unsigned long long collected_to;
374 	unsigned int notes;
375 	int s;
376 
377 	_enter("%llx-%llx", wreq->start, wreq->start + wreq->len);
378 	trace_netfs_collect(wreq);
379 	trace_netfs_rreq(wreq, netfs_rreq_trace_collect);
380 
381 reassess_streams:
382 	smp_rmb();
383 	collected_to = ULLONG_MAX;
384 	if (wreq->origin == NETFS_WRITEBACK)
385 		notes = ALL_EMPTY | BUFFERED | MAYBE_DISCONTIG;
386 	else if (wreq->origin == NETFS_WRITETHROUGH)
387 		notes = ALL_EMPTY | BUFFERED;
388 	else
389 		notes = ALL_EMPTY;
390 
391 	/* Remove completed subrequests from the front of the streams and
392 	 * advance the completion point on each stream.  We stop when we hit
393 	 * something that's in progress.  The issuer thread may be adding stuff
394 	 * to the tail whilst we're doing this.
395 	 *
396 	 * We must not, however, merge in discontiguities that span whole
397 	 * folios that aren't under writeback.  This is made more complicated
398 	 * by the folios in the gap being of unpredictable sizes - if they even
399 	 * exist - but we don't want to look them up.
400 	 */
401 	for (s = 0; s < NR_IO_STREAMS; s++) {
402 		loff_t rstart, rend;
403 
404 		stream = &wreq->io_streams[s];
405 		/* Read active flag before list pointers */
406 		if (!smp_load_acquire(&stream->active))
407 			continue;
408 
409 		front = stream->front;
410 		while (front) {
411 			trace_netfs_collect_sreq(wreq, front);
412 			//_debug("sreq [%x] %llx %zx/%zx",
413 			//       front->debug_index, front->start, front->transferred, front->len);
414 
415 			/* Stall if there may be a discontinuity. */
416 			rstart = round_down(front->start, PAGE_SIZE);
417 			if (rstart > wreq->contiguity) {
418 				if (wreq->contiguity > stream->collected_to) {
419 					trace_netfs_collect_gap(wreq, stream,
420 								wreq->contiguity, 'D');
421 					stream->collected_to = wreq->contiguity;
422 				}
423 				notes |= REASSESS_DISCONTIG;
424 				break;
425 			}
426 			rend = round_up(front->start + front->len, PAGE_SIZE);
427 			if (rend > wreq->contiguity) {
428 				trace_netfs_collect_contig(wreq, rend,
429 							   netfs_contig_trace_collect);
430 				wreq->contiguity = rend;
431 				if (notes & REASSESS_DISCONTIG)
432 					notes |= NEED_REASSESS;
433 			}
434 			notes &= ~MAYBE_DISCONTIG;
435 
436 			/* Stall if the front is still undergoing I/O. */
437 			if (test_bit(NETFS_SREQ_IN_PROGRESS, &front->flags)) {
438 				notes |= HIT_PENDING;
439 				break;
440 			}
441 			smp_rmb(); /* Read counters after I-P flag. */
442 
443 			if (stream->failed) {
444 				stream->collected_to = front->start + front->len;
445 				notes |= MADE_PROGRESS | SAW_FAILURE;
446 				goto cancel;
447 			}
448 			if (front->start + front->transferred > stream->collected_to) {
449 				stream->collected_to = front->start + front->transferred;
450 				stream->transferred = stream->collected_to - wreq->start;
451 				notes |= MADE_PROGRESS;
452 			}
453 			if (test_bit(NETFS_SREQ_FAILED, &front->flags)) {
454 				stream->failed = true;
455 				stream->error = front->error;
456 				if (stream->source == NETFS_UPLOAD_TO_SERVER)
457 					mapping_set_error(wreq->mapping, front->error);
458 				notes |= NEED_REASSESS | SAW_FAILURE;
459 				break;
460 			}
461 			if (front->transferred < front->len) {
462 				stream->need_retry = true;
463 				notes |= NEED_RETRY | MADE_PROGRESS;
464 				break;
465 			}
466 
467 		cancel:
468 			/* Remove if completely consumed. */
469 			spin_lock(&wreq->lock);
470 
471 			remove = front;
472 			list_del_init(&front->rreq_link);
473 			front = list_first_entry_or_null(&stream->subrequests,
474 							 struct netfs_io_subrequest, rreq_link);
475 			stream->front = front;
476 			if (!front) {
477 				unsigned long long jump_to = atomic64_read(&wreq->issued_to);
478 
479 				if (stream->collected_to < jump_to) {
480 					trace_netfs_collect_gap(wreq, stream, jump_to, 'A');
481 					stream->collected_to = jump_to;
482 				}
483 			}
484 
485 			spin_unlock(&wreq->lock);
486 			netfs_put_subrequest(remove, false,
487 					     notes & SAW_FAILURE ?
488 					     netfs_sreq_trace_put_cancel :
489 					     netfs_sreq_trace_put_done);
490 		}
491 
492 		if (front)
493 			notes &= ~ALL_EMPTY;
494 		else
495 			notes |= SOME_EMPTY;
496 
497 		if (stream->collected_to < collected_to)
498 			collected_to = stream->collected_to;
499 	}
500 
501 	if (collected_to != ULLONG_MAX && collected_to > wreq->collected_to)
502 		wreq->collected_to = collected_to;
503 
504 	/* If we have an empty stream, we need to jump it forward over any gap
505 	 * otherwise the collection point will never advance.
506 	 *
507 	 * Note that the issuer always adds to the stream with the lowest
508 	 * so-far submitted start, so if we see two consecutive subreqs in one
509 	 * stream with nothing between then in another stream, then the second
510 	 * stream has a gap that can be jumped.
511 	 */
512 	if (notes & SOME_EMPTY) {
513 		unsigned long long jump_to = wreq->start + READ_ONCE(wreq->submitted);
514 
515 		for (s = 0; s < NR_IO_STREAMS; s++) {
516 			stream = &wreq->io_streams[s];
517 			if (stream->active &&
518 			    stream->front &&
519 			    stream->front->start < jump_to)
520 				jump_to = stream->front->start;
521 		}
522 
523 		for (s = 0; s < NR_IO_STREAMS; s++) {
524 			stream = &wreq->io_streams[s];
525 			if (stream->active &&
526 			    !stream->front &&
527 			    stream->collected_to < jump_to) {
528 				trace_netfs_collect_gap(wreq, stream, jump_to, 'B');
529 				stream->collected_to = jump_to;
530 			}
531 		}
532 	}
533 
534 	for (s = 0; s < NR_IO_STREAMS; s++) {
535 		stream = &wreq->io_streams[s];
536 		if (stream->active)
537 			trace_netfs_collect_stream(wreq, stream);
538 	}
539 
540 	trace_netfs_collect_state(wreq, wreq->collected_to, notes);
541 
542 	/* Unlock any folios that we have now finished with. */
543 	if (notes & BUFFERED) {
544 		unsigned long long clean_to = min(wreq->collected_to, wreq->contiguity);
545 
546 		if (wreq->cleaned_to < clean_to)
547 			netfs_writeback_unlock_folios(wreq, clean_to, &notes);
548 	} else {
549 		wreq->cleaned_to = wreq->collected_to;
550 	}
551 
552 	// TODO: Discard encryption buffers
553 
554 	/* If all streams are discontiguous with the last folio we cleared, we
555 	 * may need to skip a set of folios.
556 	 */
557 	if ((notes & (MAYBE_DISCONTIG | ALL_EMPTY)) == MAYBE_DISCONTIG) {
558 		unsigned long long jump_to = ULLONG_MAX;
559 
560 		for (s = 0; s < NR_IO_STREAMS; s++) {
561 			stream = &wreq->io_streams[s];
562 			if (stream->active && stream->front &&
563 			    stream->front->start < jump_to)
564 				jump_to = stream->front->start;
565 		}
566 
567 		trace_netfs_collect_contig(wreq, jump_to, netfs_contig_trace_jump);
568 		wreq->contiguity = jump_to;
569 		wreq->cleaned_to = jump_to;
570 		wreq->collected_to = jump_to;
571 		for (s = 0; s < NR_IO_STREAMS; s++) {
572 			stream = &wreq->io_streams[s];
573 			if (stream->collected_to < jump_to)
574 				stream->collected_to = jump_to;
575 		}
576 		//cond_resched();
577 		notes |= MADE_PROGRESS;
578 		goto reassess_streams;
579 	}
580 
581 	if (notes & NEED_RETRY)
582 		goto need_retry;
583 	if ((notes & MADE_PROGRESS) && test_bit(NETFS_RREQ_PAUSE, &wreq->flags)) {
584 		trace_netfs_rreq(wreq, netfs_rreq_trace_unpause);
585 		clear_bit_unlock(NETFS_RREQ_PAUSE, &wreq->flags);
586 		wake_up_bit(&wreq->flags, NETFS_RREQ_PAUSE);
587 	}
588 
589 	if (notes & NEED_REASSESS) {
590 		//cond_resched();
591 		goto reassess_streams;
592 	}
593 	if (notes & MADE_PROGRESS) {
594 		//cond_resched();
595 		goto reassess_streams;
596 	}
597 
598 out:
599 	netfs_put_group_many(wreq->group, wreq->nr_group_rel);
600 	wreq->nr_group_rel = 0;
601 	_leave(" = %x", notes);
602 	return;
603 
604 need_retry:
605 	/* Okay...  We're going to have to retry one or both streams.  Note
606 	 * that any partially completed op will have had any wholly transferred
607 	 * folios removed from it.
608 	 */
609 	_debug("retry");
610 	netfs_retry_writes(wreq);
611 	goto out;
612 }
613 
614 /*
615  * Perform the collection of subrequests, folios and encryption buffers.
616  */
617 void netfs_write_collection_worker(struct work_struct *work)
618 {
619 	struct netfs_io_request *wreq = container_of(work, struct netfs_io_request, work);
620 	struct netfs_inode *ictx = netfs_inode(wreq->inode);
621 	size_t transferred;
622 	int s;
623 
624 	_enter("R=%x", wreq->debug_id);
625 
626 	netfs_see_request(wreq, netfs_rreq_trace_see_work);
627 	if (!test_bit(NETFS_RREQ_IN_PROGRESS, &wreq->flags)) {
628 		netfs_put_request(wreq, false, netfs_rreq_trace_put_work);
629 		return;
630 	}
631 
632 	netfs_collect_write_results(wreq);
633 
634 	/* We're done when the app thread has finished posting subreqs and all
635 	 * the queues in all the streams are empty.
636 	 */
637 	if (!test_bit(NETFS_RREQ_ALL_QUEUED, &wreq->flags)) {
638 		netfs_put_request(wreq, false, netfs_rreq_trace_put_work);
639 		return;
640 	}
641 	smp_rmb(); /* Read ALL_QUEUED before lists. */
642 
643 	transferred = LONG_MAX;
644 	for (s = 0; s < NR_IO_STREAMS; s++) {
645 		struct netfs_io_stream *stream = &wreq->io_streams[s];
646 		if (!stream->active)
647 			continue;
648 		if (!list_empty(&stream->subrequests)) {
649 			netfs_put_request(wreq, false, netfs_rreq_trace_put_work);
650 			return;
651 		}
652 		if (stream->transferred < transferred)
653 			transferred = stream->transferred;
654 	}
655 
656 	/* Okay, declare that all I/O is complete. */
657 	wreq->transferred = transferred;
658 	trace_netfs_rreq(wreq, netfs_rreq_trace_write_done);
659 
660 	if (wreq->io_streams[1].active &&
661 	    wreq->io_streams[1].failed) {
662 		/* Cache write failure doesn't prevent writeback completion
663 		 * unless we're in disconnected mode.
664 		 */
665 		ictx->ops->invalidate_cache(wreq);
666 	}
667 
668 	if (wreq->cleanup)
669 		wreq->cleanup(wreq);
670 
671 	if (wreq->origin == NETFS_DIO_WRITE &&
672 	    wreq->mapping->nrpages) {
673 		/* mmap may have got underfoot and we may now have folios
674 		 * locally covering the region we just wrote.  Attempt to
675 		 * discard the folios, but leave in place any modified locally.
676 		 * ->write_iter() is prevented from interfering by the DIO
677 		 * counter.
678 		 */
679 		pgoff_t first = wreq->start >> PAGE_SHIFT;
680 		pgoff_t last = (wreq->start + wreq->transferred - 1) >> PAGE_SHIFT;
681 		invalidate_inode_pages2_range(wreq->mapping, first, last);
682 	}
683 
684 	if (wreq->origin == NETFS_DIO_WRITE)
685 		inode_dio_end(wreq->inode);
686 
687 	_debug("finished");
688 	trace_netfs_rreq(wreq, netfs_rreq_trace_wake_ip);
689 	clear_bit_unlock(NETFS_RREQ_IN_PROGRESS, &wreq->flags);
690 	wake_up_bit(&wreq->flags, NETFS_RREQ_IN_PROGRESS);
691 
692 	if (wreq->iocb) {
693 		size_t written = min(wreq->transferred, wreq->len);
694 		wreq->iocb->ki_pos += written;
695 		if (wreq->iocb->ki_complete)
696 			wreq->iocb->ki_complete(
697 				wreq->iocb, wreq->error ? wreq->error : written);
698 		wreq->iocb = VFS_PTR_POISON;
699 	}
700 
701 	netfs_clear_subrequests(wreq, false);
702 	netfs_put_request(wreq, false, netfs_rreq_trace_put_work_complete);
703 }
704 
705 /*
706  * Wake the collection work item.
707  */
708 void netfs_wake_write_collector(struct netfs_io_request *wreq, bool was_async)
709 {
710 	if (!work_pending(&wreq->work)) {
711 		netfs_get_request(wreq, netfs_rreq_trace_get_work);
712 		if (!queue_work(system_unbound_wq, &wreq->work))
713 			netfs_put_request(wreq, was_async, netfs_rreq_trace_put_work_nq);
714 	}
715 }
716 
717 /**
718  * netfs_write_subrequest_terminated - Note the termination of a write operation.
719  * @_op: The I/O request that has terminated.
720  * @transferred_or_error: The amount of data transferred or an error code.
721  * @was_async: The termination was asynchronous
722  *
723  * This tells the library that a contributory write I/O operation has
724  * terminated, one way or another, and that it should collect the results.
725  *
726  * The caller indicates in @transferred_or_error the outcome of the operation,
727  * supplying a positive value to indicate the number of bytes transferred or a
728  * negative error code.  The library will look after reissuing I/O operations
729  * as appropriate and writing downloaded data to the cache.
730  *
731  * If @was_async is true, the caller might be running in softirq or interrupt
732  * context and we can't sleep.
733  *
734  * When this is called, ownership of the subrequest is transferred back to the
735  * library, along with a ref.
736  *
737  * Note that %_op is a void* so that the function can be passed to
738  * kiocb::term_func without the need for a casting wrapper.
739  */
740 void netfs_write_subrequest_terminated(void *_op, ssize_t transferred_or_error,
741 				       bool was_async)
742 {
743 	struct netfs_io_subrequest *subreq = _op;
744 	struct netfs_io_request *wreq = subreq->rreq;
745 	struct netfs_io_stream *stream = &wreq->io_streams[subreq->stream_nr];
746 
747 	_enter("%x[%x] %zd", wreq->debug_id, subreq->debug_index, transferred_or_error);
748 
749 	switch (subreq->source) {
750 	case NETFS_UPLOAD_TO_SERVER:
751 		netfs_stat(&netfs_n_wh_upload_done);
752 		break;
753 	case NETFS_WRITE_TO_CACHE:
754 		netfs_stat(&netfs_n_wh_write_done);
755 		break;
756 	case NETFS_INVALID_WRITE:
757 		break;
758 	default:
759 		BUG();
760 	}
761 
762 	if (IS_ERR_VALUE(transferred_or_error)) {
763 		subreq->error = transferred_or_error;
764 		if (subreq->error == -EAGAIN)
765 			set_bit(NETFS_SREQ_NEED_RETRY, &subreq->flags);
766 		else
767 			set_bit(NETFS_SREQ_FAILED, &subreq->flags);
768 		trace_netfs_failure(wreq, subreq, transferred_or_error, netfs_fail_write);
769 
770 		switch (subreq->source) {
771 		case NETFS_WRITE_TO_CACHE:
772 			netfs_stat(&netfs_n_wh_write_failed);
773 			break;
774 		case NETFS_UPLOAD_TO_SERVER:
775 			netfs_stat(&netfs_n_wh_upload_failed);
776 			break;
777 		default:
778 			break;
779 		}
780 		trace_netfs_rreq(wreq, netfs_rreq_trace_set_pause);
781 		set_bit(NETFS_RREQ_PAUSE, &wreq->flags);
782 	} else {
783 		if (WARN(transferred_or_error > subreq->len - subreq->transferred,
784 			 "Subreq excess write: R=%x[%x] %zd > %zu - %zu",
785 			 wreq->debug_id, subreq->debug_index,
786 			 transferred_or_error, subreq->len, subreq->transferred))
787 			transferred_or_error = subreq->len - subreq->transferred;
788 
789 		subreq->error = 0;
790 		subreq->transferred += transferred_or_error;
791 
792 		if (subreq->transferred < subreq->len)
793 			set_bit(NETFS_SREQ_NEED_RETRY, &subreq->flags);
794 	}
795 
796 	trace_netfs_sreq(subreq, netfs_sreq_trace_terminated);
797 
798 	clear_bit_unlock(NETFS_SREQ_IN_PROGRESS, &subreq->flags);
799 	wake_up_bit(&subreq->flags, NETFS_SREQ_IN_PROGRESS);
800 
801 	/* If we are at the head of the queue, wake up the collector,
802 	 * transferring a ref to it if we were the ones to do so.
803 	 */
804 	if (list_is_first(&subreq->rreq_link, &stream->subrequests))
805 		netfs_wake_write_collector(wreq, was_async);
806 
807 	netfs_put_subrequest(subreq, was_async, netfs_sreq_trace_put_terminated);
808 }
809 EXPORT_SYMBOL(netfs_write_subrequest_terminated);
810