// SPDX-License-Identifier: GPL-2.0-only
/* Network filesystem write subrequest result collection, assessment
 * and retrying.
 *
 * Copyright (C) 2024 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#include <linux/export.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include "internal.h"

/* Notes made in the collector */
#define HIT_PENDING		0x01	/* A front op was still pending */
#define NEED_REASSESS		0x02	/* Need to loop round and reassess */
#define MADE_PROGRESS		0x04	/* Made progress cleaning up a stream or the folio set */
#define NEED_UNLOCK		0x08	/* The pagecache needs unlocking */
#define NEED_RETRY		0x10	/* A front op requests retrying */
#define SAW_FAILURE		0x20	/* One or more streams hit a permanent failure */

static void netfs_dump_request(const struct netfs_io_request *rreq)
{
	pr_err("Request R=%08x r=%d fl=%lx or=%x e=%ld\n",
	       rreq->debug_id, refcount_read(&rreq->ref), rreq->flags,
	       rreq->origin, rreq->error);
	pr_err(" st=%llx tsl=%zx/%llx/%llx\n",
	       rreq->start, rreq->transferred, rreq->submitted, rreq->len);
	pr_err(" cci=%llx/%llx/%llx\n",
	       rreq->cleaned_to, rreq->collected_to, atomic64_read(&rreq->issued_to));
	pr_err(" iw=%pSR\n", rreq->netfs_ops->issue_write);
	for (int i = 0; i < NR_IO_STREAMS; i++) {
		const struct netfs_io_subrequest *sreq;
		const struct netfs_io_stream *s = &rreq->io_streams[i];

		pr_err(" str[%x] s=%x e=%d acnf=%u,%u,%u,%u\n",
		       s->stream_nr, s->source, s->error,
		       s->avail, s->active, s->need_retry, s->failed);
		pr_err(" str[%x] ct=%llx t=%zx\n",
		       s->stream_nr, s->collected_to, s->transferred);
		list_for_each_entry(sreq, &s->subrequests, rreq_link) {
			pr_err(" sreq[%x:%x] sc=%u s=%llx t=%zx/%zx r=%d f=%lx\n",
			       sreq->stream_nr, sreq->debug_index, sreq->source,
			       sreq->start, sreq->transferred, sreq->len,
			       refcount_read(&sreq->ref), sreq->flags);
		}
	}
}

/*
 * Successful completion of write of a folio to the server and/or cache.  Note
 * that we are not allowed to lock the folio here on pain of deadlocking with
 * truncate.
 */
int netfs_folio_written_back(struct folio *folio)
{
	enum netfs_folio_trace why = netfs_folio_trace_clear;
	struct netfs_inode *ictx = netfs_inode(folio->mapping->host);
	struct netfs_folio *finfo;
	struct netfs_group *group = NULL;
	int gcount = 0;

	if ((finfo = netfs_folio_info(folio))) {
		/* Streaming writes cannot be redirtied whilst under writeback,
		 * so discard the streaming record.
		 */
		unsigned long long fend;

		fend = folio_pos(folio) + finfo->dirty_offset + finfo->dirty_len;
		if (fend > ictx->zero_point)
			ictx->zero_point = fend;

		folio_detach_private(folio);
		group = finfo->netfs_group;
		gcount++;
		kfree(finfo);
		why = netfs_folio_trace_clear_s;
		goto end_wb;
	}

	if ((group = netfs_folio_group(folio))) {
		if (group == NETFS_FOLIO_COPY_TO_CACHE) {
			why = netfs_folio_trace_clear_cc;
			folio_detach_private(folio);
			goto end_wb;
		}

		/* Need to detach the group pointer if the page didn't get
		 * redirtied.  If it has been redirtied, then it must be within
		 * the same group.
		 */
		why = netfs_folio_trace_redirtied;
		if (!folio_test_dirty(folio)) {
			folio_detach_private(folio);
			gcount++;
			why = netfs_folio_trace_clear_g;
		}
	}

end_wb:
	trace_netfs_folio(folio, why);
	folio_end_writeback(folio);
	return gcount;
}
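
/*
 * Illustrative sketch only (not built): callers of netfs_folio_written_back()
 * are expected to accumulate the group-ref counts it returns and drop them in
 * a batch, which is what netfs_collect_write_results() does further down via
 * wreq->nr_group_rel and netfs_put_group_many().  The helper name below is
 * hypothetical and only restates that pattern.
 */
#if 0
static void example_release_written_folios(struct netfs_io_request *wreq,
					   struct folio **folios,
					   unsigned int nr)
{
	int gcount = 0;

	for (unsigned int i = 0; i < nr; i++)
		gcount += netfs_folio_written_back(folios[i]);

	/* Drop all of the accumulated refs on the writeback group at once. */
	netfs_put_group_many(wreq->group, gcount);
}
#endif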

/*
 * Unlock any folios we've finished with.
 */
static void netfs_writeback_unlock_folios(struct netfs_io_request *wreq,
					  unsigned int *notes)
{
	struct folio_queue *folioq = wreq->buffer.tail;
	unsigned long long collected_to = wreq->collected_to;
	unsigned int slot = wreq->buffer.first_tail_slot;

	if (WARN_ON_ONCE(!folioq)) {
		pr_err("[!] Writeback unlock found empty rolling buffer!\n");
		netfs_dump_request(wreq);
		return;
	}

	if (wreq->origin == NETFS_PGPRIV2_COPY_TO_CACHE) {
		if (netfs_pgpriv2_unlock_copied_folios(wreq))
			*notes |= MADE_PROGRESS;
		return;
	}

	if (slot >= folioq_nr_slots(folioq)) {
		folioq = rolling_buffer_delete_spent(&wreq->buffer);
		if (!folioq)
			return;
		slot = 0;
	}

	for (;;) {
		struct folio *folio;
		struct netfs_folio *finfo;
		unsigned long long fpos, fend;
		size_t fsize, flen;

		folio = folioq_folio(folioq, slot);
		if (WARN_ONCE(!folio_test_writeback(folio),
			      "R=%08x: folio %lx is not under writeback\n",
			      wreq->debug_id, folio->index))
			trace_netfs_folio(folio, netfs_folio_trace_not_under_wback);

		fpos = folio_pos(folio);
		fsize = folio_size(folio);
		finfo = netfs_folio_info(folio);
		flen = finfo ? finfo->dirty_offset + finfo->dirty_len : fsize;

		fend = min_t(unsigned long long, fpos + flen, wreq->i_size);

		trace_netfs_collect_folio(wreq, folio, fend, collected_to);

		/* Unlock any folio we've transferred all of. */
		if (collected_to < fend)
			break;

		wreq->nr_group_rel += netfs_folio_written_back(folio);
		wreq->cleaned_to = fpos + fsize;
		*notes |= MADE_PROGRESS;

		/* Clean up the head folioq.  If we clear an entire folioq, then
		 * we can get rid of it provided it's not also the tail folioq
		 * being filled by the issuer.
		 */
		folioq_clear(folioq, slot);
		slot++;
		if (slot >= folioq_nr_slots(folioq)) {
			folioq = rolling_buffer_delete_spent(&wreq->buffer);
			if (!folioq)
				goto done;
			slot = 0;
		}

		if (fpos + fsize >= collected_to)
			break;
	}

	wreq->buffer.tail = folioq;
done:
	wreq->buffer.first_tail_slot = slot;
}
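
/*
 * Illustrative sketch only (not built): the unlock decision above boils down
 * to "has the collection point passed the end of this folio's dirty data?".
 * The hypothetical predicate below restates that test using the same fields
 * netfs_writeback_unlock_folios() consults.
 */
#if 0
static bool example_folio_is_collected(const struct netfs_io_request *wreq,
				       struct folio *folio)
{
	struct netfs_folio *finfo = netfs_folio_info(folio);
	unsigned long long fpos = folio_pos(folio);
	size_t flen = finfo ? finfo->dirty_offset + finfo->dirty_len : folio_size(folio);
	unsigned long long fend = min_t(unsigned long long, fpos + flen, wreq->i_size);

	/* The folio can be released once collection has reached its end. */
	return wreq->collected_to >= fend;
}
#endif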

/*
 * Collect and assess the results of various write subrequests.  We may need to
 * retry some of the results - or even do an RMW cycle for content crypto.
 *
 * Note that we have a number of parallel, overlapping lists of subrequests,
 * one to the server and one to the local cache for example, which may not be
 * the same size or starting position and may not even correspond in boundary
 * alignment.
 */
static void netfs_collect_write_results(struct netfs_io_request *wreq)
{
	struct netfs_io_subrequest *front, *remove;
	struct netfs_io_stream *stream;
	unsigned long long collected_to, issued_to;
	unsigned int notes;
	int s;

	_enter("%llx-%llx", wreq->start, wreq->start + wreq->len);
	trace_netfs_collect(wreq);
	trace_netfs_rreq(wreq, netfs_rreq_trace_collect);

reassess_streams:
	issued_to = atomic64_read(&wreq->issued_to);
	smp_rmb();
	collected_to = ULLONG_MAX;
	if (wreq->origin == NETFS_WRITEBACK ||
	    wreq->origin == NETFS_WRITETHROUGH ||
	    wreq->origin == NETFS_PGPRIV2_COPY_TO_CACHE)
		notes = NEED_UNLOCK;
	else
		notes = 0;

	/* Remove completed subrequests from the front of the streams and
	 * advance the completion point on each stream.  We stop when we hit
	 * something that's in progress.  The issuer thread may be adding stuff
	 * to the tail whilst we're doing this.
	 */
	for (s = 0; s < NR_IO_STREAMS; s++) {
		stream = &wreq->io_streams[s];
		/* Read active flag before list pointers */
		if (!smp_load_acquire(&stream->active))
			continue;

		front = stream->front;
		while (front) {
			trace_netfs_collect_sreq(wreq, front);
			//_debug("sreq [%x] %llx %zx/%zx",
			//       front->debug_index, front->start, front->transferred, front->len);

			if (stream->collected_to < front->start) {
				trace_netfs_collect_gap(wreq, stream, issued_to, 'F');
				stream->collected_to = front->start;
			}

			/* Stall if the front is still undergoing I/O. */
			if (test_bit(NETFS_SREQ_IN_PROGRESS, &front->flags)) {
				notes |= HIT_PENDING;
				break;
			}
			smp_rmb(); /* Read counters after I-P flag. */

			if (stream->failed) {
				stream->collected_to = front->start + front->len;
				notes |= MADE_PROGRESS | SAW_FAILURE;
				goto cancel;
			}
			if (front->start + front->transferred > stream->collected_to) {
				stream->collected_to = front->start + front->transferred;
				stream->transferred = stream->collected_to - wreq->start;
				notes |= MADE_PROGRESS;
			}
			if (test_bit(NETFS_SREQ_FAILED, &front->flags)) {
				stream->failed = true;
				stream->error = front->error;
				if (stream->source == NETFS_UPLOAD_TO_SERVER)
					mapping_set_error(wreq->mapping, front->error);
				notes |= NEED_REASSESS | SAW_FAILURE;
				break;
			}
			if (front->transferred < front->len) {
				stream->need_retry = true;
				notes |= NEED_RETRY | MADE_PROGRESS;
				break;
			}

		cancel:
			/* Remove if completely consumed. */
			spin_lock(&wreq->lock);

			remove = front;
			list_del_init(&front->rreq_link);
			front = list_first_entry_or_null(&stream->subrequests,
							 struct netfs_io_subrequest, rreq_link);
			stream->front = front;
			spin_unlock(&wreq->lock);
			netfs_put_subrequest(remove, false,
					     notes & SAW_FAILURE ?
					     netfs_sreq_trace_put_cancel :
					     netfs_sreq_trace_put_done);
		}

		/* If we have an empty stream, we need to jump it forward
		 * otherwise the collection point will never advance.
		 */
		if (!front && issued_to > stream->collected_to) {
			trace_netfs_collect_gap(wreq, stream, issued_to, 'E');
			stream->collected_to = issued_to;
		}

		if (stream->collected_to < collected_to)
			collected_to = stream->collected_to;
	}
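
	/* Adopt the lowest collection point reached by any active stream as
	 * the request's overall collection point; it stays at ULLONG_MAX if
	 * no stream was active.
	 */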
	if (collected_to != ULLONG_MAX && collected_to > wreq->collected_to)
		wreq->collected_to = collected_to;

	for (s = 0; s < NR_IO_STREAMS; s++) {
		stream = &wreq->io_streams[s];
		if (stream->active)
			trace_netfs_collect_stream(wreq, stream);
	}

	trace_netfs_collect_state(wreq, wreq->collected_to, notes);

	/* Unlock any folios that we have now finished with. */
	if (notes & NEED_UNLOCK) {
		if (wreq->cleaned_to < wreq->collected_to)
			netfs_writeback_unlock_folios(wreq, &notes);
	} else {
		wreq->cleaned_to = wreq->collected_to;
	}

	// TODO: Discard encryption buffers

	if (notes & NEED_RETRY)
		goto need_retry;
	if ((notes & MADE_PROGRESS) && test_bit(NETFS_RREQ_PAUSE, &wreq->flags)) {
		trace_netfs_rreq(wreq, netfs_rreq_trace_unpause);
		clear_bit_unlock(NETFS_RREQ_PAUSE, &wreq->flags);
		smp_mb__after_atomic(); /* Clear PAUSE before task state */
		wake_up(&wreq->waitq);
	}

	if (notes & NEED_REASSESS) {
		//cond_resched();
		goto reassess_streams;
	}
	if (notes & MADE_PROGRESS) {
		//cond_resched();
		goto reassess_streams;
	}

out:
	netfs_put_group_many(wreq->group, wreq->nr_group_rel);
	wreq->nr_group_rel = 0;
	_leave(" = %x", notes);
	return;

need_retry:
	/* Okay... We're going to have to retry one or both streams.  Note
	 * that any partially completed op will have had any wholly transferred
	 * folios removed from it.
	 */
	_debug("retry");
	netfs_retry_writes(wreq);
	goto out;
}
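
/*
 * Illustrative sketch only (not built): when a failure pauses the request,
 * the unpause path above clears NETFS_RREQ_PAUSE and then wakes wreq->waitq,
 * with the barrier ordering the clear before the wakeup.  Assuming the
 * issuing thread sleeps on that queue until the flag clears, its wait might
 * look something like this hypothetical helper.
 */
#if 0
static void example_wait_for_unpause(struct netfs_io_request *wreq)
{
	/* Sleep until the collector clears PAUSE and wakes the queue. */
	wait_event(wreq->waitq, !test_bit(NETFS_RREQ_PAUSE, &wreq->flags));
}
#endif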

/*
 * Perform the collection of subrequests, folios and encryption buffers.
 */
void netfs_write_collection_worker(struct work_struct *work)
{
	struct netfs_io_request *wreq = container_of(work, struct netfs_io_request, work);
	struct netfs_inode *ictx = netfs_inode(wreq->inode);
	size_t transferred;
	int s;

	_enter("R=%x", wreq->debug_id);

	netfs_see_request(wreq, netfs_rreq_trace_see_work);
	if (!test_bit(NETFS_RREQ_IN_PROGRESS, &wreq->flags)) {
		netfs_put_request(wreq, false, netfs_rreq_trace_put_work);
		return;
	}

	netfs_collect_write_results(wreq);

	/* We're done when the app thread has finished posting subreqs and all
	 * the queues in all the streams are empty.
	 */
	if (!test_bit(NETFS_RREQ_ALL_QUEUED, &wreq->flags)) {
		netfs_put_request(wreq, false, netfs_rreq_trace_put_work);
		return;
	}
	smp_rmb(); /* Read ALL_QUEUED before lists. */

	transferred = LONG_MAX;
	for (s = 0; s < NR_IO_STREAMS; s++) {
		struct netfs_io_stream *stream = &wreq->io_streams[s];
		if (!stream->active)
			continue;
		if (!list_empty(&stream->subrequests)) {
			netfs_put_request(wreq, false, netfs_rreq_trace_put_work);
			return;
		}
		if (stream->transferred < transferred)
			transferred = stream->transferred;
	}

	/* Okay, declare that all I/O is complete. */
	wreq->transferred = transferred;
	trace_netfs_rreq(wreq, netfs_rreq_trace_write_done);

	if (wreq->io_streams[1].active &&
	    wreq->io_streams[1].failed) {
		/* Cache write failure doesn't prevent writeback completion
		 * unless we're in disconnected mode.
		 */
		ictx->ops->invalidate_cache(wreq);
	}

	if (wreq->cleanup)
		wreq->cleanup(wreq);

	if (wreq->origin == NETFS_DIO_WRITE &&
	    wreq->mapping->nrpages) {
		/* mmap may have got underfoot and we may now have folios
		 * locally covering the region we just wrote.  Attempt to
		 * discard the folios, but leave in place any that have been
		 * modified locally.  ->write_iter() is prevented from
		 * interfering by the DIO counter.
		 */
		pgoff_t first = wreq->start >> PAGE_SHIFT;
		pgoff_t last = (wreq->start + wreq->transferred - 1) >> PAGE_SHIFT;
		invalidate_inode_pages2_range(wreq->mapping, first, last);
	}

	if (wreq->origin == NETFS_DIO_WRITE)
		inode_dio_end(wreq->inode);

	_debug("finished");
	trace_netfs_rreq(wreq, netfs_rreq_trace_wake_ip);
	clear_and_wake_up_bit(NETFS_RREQ_IN_PROGRESS, &wreq->flags);
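
	/* If the write was driven by an iocb (e.g. a DIO write), advance the
	 * file position by the amount actually written and run its completion
	 * with either the error or the byte count, then poison the pointer to
	 * catch accidental reuse.
	 */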
	if (wreq->iocb) {
		size_t written = min(wreq->transferred, wreq->len);
		wreq->iocb->ki_pos += written;
		if (wreq->iocb->ki_complete)
			wreq->iocb->ki_complete(
				wreq->iocb, wreq->error ? wreq->error : written);
		wreq->iocb = VFS_PTR_POISON;
	}

	netfs_clear_subrequests(wreq, false);
	netfs_put_request(wreq, false, netfs_rreq_trace_put_work_complete);
}

/*
 * Wake the collection work item.
 */
void netfs_wake_write_collector(struct netfs_io_request *wreq, bool was_async)
{
	if (!work_pending(&wreq->work)) {
		netfs_get_request(wreq, netfs_rreq_trace_get_work);
		if (!queue_work(system_unbound_wq, &wreq->work))
			netfs_put_request(wreq, was_async, netfs_rreq_trace_put_work_nq);
	}
}

/**
 * netfs_write_subrequest_terminated - Note the termination of a write operation.
 * @_op: The I/O subrequest that has terminated.
 * @transferred_or_error: The amount of data transferred or an error code.
 * @was_async: The termination was asynchronous
 *
 * This tells the library that a contributory write I/O operation has
 * terminated, one way or another, and that it should collect the results.
 *
 * The caller indicates in @transferred_or_error the outcome of the operation,
 * supplying a positive value to indicate the number of bytes transferred or a
 * negative error code.  The library will look after reissuing I/O operations
 * as appropriate and writing the data to the cache.
 *
 * If @was_async is true, the caller might be running in softirq or interrupt
 * context and we can't sleep.
 *
 * When this is called, ownership of the subrequest is transferred back to the
 * library, along with a ref.
 *
 * Note that %_op is a void* so that the function can be passed to
 * kiocb::term_func without the need for a casting wrapper.
 */
void netfs_write_subrequest_terminated(void *_op, ssize_t transferred_or_error,
				       bool was_async)
{
	struct netfs_io_subrequest *subreq = _op;
	struct netfs_io_request *wreq = subreq->rreq;
	struct netfs_io_stream *stream = &wreq->io_streams[subreq->stream_nr];

	_enter("%x[%x] %zd", wreq->debug_id, subreq->debug_index, transferred_or_error);

	switch (subreq->source) {
	case NETFS_UPLOAD_TO_SERVER:
		netfs_stat(&netfs_n_wh_upload_done);
		break;
	case NETFS_WRITE_TO_CACHE:
		netfs_stat(&netfs_n_wh_write_done);
		break;
	case NETFS_INVALID_WRITE:
		break;
	default:
		BUG();
	}

	if (IS_ERR_VALUE(transferred_or_error)) {
		subreq->error = transferred_or_error;
		if (subreq->error == -EAGAIN)
			set_bit(NETFS_SREQ_NEED_RETRY, &subreq->flags);
		else
			set_bit(NETFS_SREQ_FAILED, &subreq->flags);
		trace_netfs_failure(wreq, subreq, transferred_or_error, netfs_fail_write);

		switch (subreq->source) {
		case NETFS_WRITE_TO_CACHE:
			netfs_stat(&netfs_n_wh_write_failed);
			break;
		case NETFS_UPLOAD_TO_SERVER:
			netfs_stat(&netfs_n_wh_upload_failed);
			break;
		default:
			break;
		}
		trace_netfs_rreq(wreq, netfs_rreq_trace_set_pause);
		set_bit(NETFS_RREQ_PAUSE, &wreq->flags);
	} else {
		if (WARN(transferred_or_error > subreq->len - subreq->transferred,
			 "Subreq excess write: R=%x[%x] %zd > %zu - %zu",
			 wreq->debug_id, subreq->debug_index,
			 transferred_or_error, subreq->len, subreq->transferred))
			transferred_or_error = subreq->len - subreq->transferred;

		subreq->error = 0;
		subreq->transferred += transferred_or_error;

		if (subreq->transferred < subreq->len)
			set_bit(NETFS_SREQ_NEED_RETRY, &subreq->flags);
	}

	trace_netfs_sreq(subreq, netfs_sreq_trace_terminated);

	clear_and_wake_up_bit(NETFS_SREQ_IN_PROGRESS, &subreq->flags);

	/* If we are at the head of the queue, wake up the collector,
	 * transferring a ref to it if we were the ones to do so.
	 */
	if (list_is_first(&subreq->rreq_link, &stream->subrequests))
		netfs_wake_write_collector(wreq, was_async);

	netfs_put_subrequest(subreq, was_async, netfs_sreq_trace_put_terminated);
}
EXPORT_SYMBOL(netfs_write_subrequest_terminated);
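
/*
 * Illustrative sketch only (not built): how a network filesystem's write
 * completion handler might report a result back to netfslib.  The "myfs"
 * names are hypothetical; the only real interface used here is
 * netfs_write_subrequest_terminated(), which takes the subrequest, a byte
 * count or negative error, and whether the caller may be in atomic context.
 */
#if 0
static void myfs_write_done(struct myfs_call *call)
{
	struct netfs_io_subrequest *subreq = call->subreq;
	ssize_t result = call->error ? call->error : call->bytes_written;

	/* Ownership of the subrequest (and a ref) passes back to the library,
	 * which will collect the result and retry or complete as needed.
	 */
	netfs_write_subrequest_terminated(subreq, result, true);
}
#endif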