1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/backing-dev.h> 5 #include <linux/fs.h> 6 #include <linux/mm.h> 7 #include <linux/swap.h> 8 #include <linux/pagemap.h> 9 #include <linux/slab.h> 10 #include <linux/pagevec.h> 11 #include <linux/task_io_accounting_ops.h> 12 #include <linux/signal.h> 13 #include <linux/iversion.h> 14 #include <linux/ktime.h> 15 #include <linux/netfs.h> 16 #include <trace/events/netfs.h> 17 18 #include "super.h" 19 #include "mds_client.h" 20 #include "cache.h" 21 #include "metric.h" 22 #include "crypto.h" 23 #include <linux/ceph/osd_client.h> 24 #include <linux/ceph/striper.h> 25 26 /* 27 * Ceph address space ops. 28 * 29 * There are a few funny things going on here. 30 * 31 * The page->private field is used to reference a struct 32 * ceph_snap_context for _every_ dirty page. This indicates which 33 * snapshot the page was logically dirtied in, and thus which snap 34 * context needs to be associated with the osd write during writeback. 35 * 36 * Similarly, struct ceph_inode_info maintains a set of counters to 37 * count dirty pages on the inode. In the absence of snapshots, 38 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count. 39 * 40 * When a snapshot is taken (that is, when the client receives 41 * notification that a snapshot was taken), each inode with caps and 42 * with dirty pages (dirty pages implies there is a cap) gets a new 43 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending 44 * order, new snaps go to the tail). The i_wrbuffer_ref_head count is 45 * moved to capsnap->dirty. (Unless a sync write is currently in 46 * progress. In that case, the capsnap is said to be "pending", new 47 * writes cannot start, and the capsnap isn't "finalized" until the 48 * write completes (or fails) and a final size/mtime for the inode for 49 * that snap can be settled upon.) i_wrbuffer_ref_head is reset to 0. 50 * 51 * On writeback, we must submit writes to the osd IN SNAP ORDER. So, 52 * we look for the first capsnap in i_cap_snaps and write out pages in 53 * that snap context _only_. Then we move on to the next capsnap, 54 * eventually reaching the "live" or "head" context (i.e., pages that 55 * are not yet snapped) and are writing the most recently dirtied 56 * pages. 57 * 58 * Invalidate and so forth must take care to ensure the dirty page 59 * accounting is preserved. 60 */ 61 62 #define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10)) 63 #define CONGESTION_OFF_THRESH(congestion_kb) \ 64 (CONGESTION_ON_THRESH(congestion_kb) - \ 65 (CONGESTION_ON_THRESH(congestion_kb) >> 2)) 66 67 static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len, 68 struct folio **foliop, void **_fsdata); 69 70 static inline struct ceph_snap_context *page_snap_context(struct page *page) 71 { 72 if (PagePrivate(page)) 73 return (void *)page->private; 74 return NULL; 75 } 76 77 /* 78 * Dirty a page. Optimistically adjust accounting, on the assumption 79 * that we won't race with invalidate. If we do, readjust. 
80 */ 81 static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio) 82 { 83 struct inode *inode = mapping->host; 84 struct ceph_client *cl = ceph_inode_to_client(inode); 85 struct ceph_inode_info *ci; 86 struct ceph_snap_context *snapc; 87 88 if (folio_test_dirty(folio)) { 89 doutc(cl, "%llx.%llx %p idx %lu -- already dirty\n", 90 ceph_vinop(inode), folio, folio->index); 91 VM_BUG_ON_FOLIO(!folio_test_private(folio), folio); 92 return false; 93 } 94 95 ci = ceph_inode(inode); 96 97 /* dirty the head */ 98 spin_lock(&ci->i_ceph_lock); 99 BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference 100 if (__ceph_have_pending_cap_snap(ci)) { 101 struct ceph_cap_snap *capsnap = 102 list_last_entry(&ci->i_cap_snaps, 103 struct ceph_cap_snap, 104 ci_item); 105 snapc = ceph_get_snap_context(capsnap->context); 106 capsnap->dirty_pages++; 107 } else { 108 BUG_ON(!ci->i_head_snapc); 109 snapc = ceph_get_snap_context(ci->i_head_snapc); 110 ++ci->i_wrbuffer_ref_head; 111 } 112 if (ci->i_wrbuffer_ref == 0) 113 ihold(inode); 114 ++ci->i_wrbuffer_ref; 115 doutc(cl, "%llx.%llx %p idx %lu head %d/%d -> %d/%d " 116 "snapc %p seq %lld (%d snaps)\n", 117 ceph_vinop(inode), folio, folio->index, 118 ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1, 119 ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head, 120 snapc, snapc->seq, snapc->num_snaps); 121 spin_unlock(&ci->i_ceph_lock); 122 123 /* 124 * Reference snap context in folio->private. Also set 125 * PagePrivate so that we get invalidate_folio callback. 126 */ 127 VM_WARN_ON_FOLIO(folio->private, folio); 128 folio_attach_private(folio, snapc); 129 130 return ceph_fscache_dirty_folio(mapping, folio); 131 } 132 133 /* 134 * If we are truncating the full folio (i.e. offset == 0), adjust the 135 * dirty folio counters appropriately. Only called if there is private 136 * data on the folio. 
137 */ 138 static void ceph_invalidate_folio(struct folio *folio, size_t offset, 139 size_t length) 140 { 141 struct inode *inode = folio->mapping->host; 142 struct ceph_client *cl = ceph_inode_to_client(inode); 143 struct ceph_inode_info *ci = ceph_inode(inode); 144 struct ceph_snap_context *snapc; 145 146 147 if (offset != 0 || length != folio_size(folio)) { 148 doutc(cl, "%llx.%llx idx %lu partial dirty page %zu~%zu\n", 149 ceph_vinop(inode), folio->index, offset, length); 150 return; 151 } 152 153 WARN_ON(!folio_test_locked(folio)); 154 if (folio_test_private(folio)) { 155 doutc(cl, "%llx.%llx idx %lu full dirty page\n", 156 ceph_vinop(inode), folio->index); 157 158 snapc = folio_detach_private(folio); 159 ceph_put_wrbuffer_cap_refs(ci, 1, snapc); 160 ceph_put_snap_context(snapc); 161 } 162 163 netfs_invalidate_folio(folio, offset, length); 164 } 165 166 static void ceph_netfs_expand_readahead(struct netfs_io_request *rreq) 167 { 168 struct inode *inode = rreq->inode; 169 struct ceph_inode_info *ci = ceph_inode(inode); 170 struct ceph_file_layout *lo = &ci->i_layout; 171 unsigned long max_pages = inode->i_sb->s_bdi->ra_pages; 172 loff_t end = rreq->start + rreq->len, new_end; 173 struct ceph_netfs_request_data *priv = rreq->netfs_priv; 174 unsigned long max_len; 175 u32 blockoff; 176 177 if (priv) { 178 /* Readahead is disabled by posix_fadvise POSIX_FADV_RANDOM */ 179 if (priv->file_ra_disabled) 180 max_pages = 0; 181 else 182 max_pages = priv->file_ra_pages; 183 184 } 185 186 /* Readahead is disabled */ 187 if (!max_pages) 188 return; 189 190 max_len = max_pages << PAGE_SHIFT; 191 192 /* 193 * Try to expand the length forward by rounding up it to the next 194 * block, but do not exceed the file size, unless the original 195 * request already exceeds it. 
196 */ 197 new_end = umin(round_up(end, lo->stripe_unit), rreq->i_size); 198 if (new_end > end && new_end <= rreq->start + max_len) 199 rreq->len = new_end - rreq->start; 200 201 /* Try to expand the start downward */ 202 div_u64_rem(rreq->start, lo->stripe_unit, &blockoff); 203 if (rreq->len + blockoff <= max_len) { 204 rreq->start -= blockoff; 205 rreq->len += blockoff; 206 } 207 } 208 209 static void finish_netfs_read(struct ceph_osd_request *req) 210 { 211 struct inode *inode = req->r_inode; 212 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 213 struct ceph_client *cl = fsc->client; 214 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); 215 struct netfs_io_subrequest *subreq = req->r_priv; 216 struct ceph_osd_req_op *op = &req->r_ops[0]; 217 int err = req->r_result; 218 bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ); 219 220 ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency, 221 req->r_end_latency, osd_data->length, err); 222 223 doutc(cl, "result %d subreq->len=%zu i_size=%lld\n", req->r_result, 224 subreq->len, i_size_read(req->r_inode)); 225 226 /* no object means success but no data */ 227 if (err == -ENOENT) 228 err = 0; 229 else if (err == -EBLOCKLISTED) 230 fsc->blocklisted = true; 231 232 if (err >= 0) { 233 if (sparse && err > 0) 234 err = ceph_sparse_ext_map_end(op); 235 if (err < subreq->len && 236 subreq->rreq->origin != NETFS_DIO_READ) 237 __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags); 238 if (IS_ENCRYPTED(inode) && err > 0) { 239 err = ceph_fscrypt_decrypt_extents(inode, 240 osd_data->pages, subreq->start, 241 op->extent.sparse_ext, 242 op->extent.sparse_ext_cnt); 243 if (err > subreq->len) 244 err = subreq->len; 245 } 246 } 247 248 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 249 ceph_put_page_vector(osd_data->pages, 250 calc_pages_for(osd_data->alignment, 251 osd_data->length), false); 252 } 253 if (err > 0) { 254 subreq->transferred = err; 255 err = 0; 256 } 257 trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress); 258 netfs_read_subreq_terminated(subreq, err, false); 259 iput(req->r_inode); 260 ceph_dec_osd_stopping_blocker(fsc->mdsc); 261 } 262 263 static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq) 264 { 265 struct netfs_io_request *rreq = subreq->rreq; 266 struct inode *inode = rreq->inode; 267 struct ceph_mds_reply_info_parsed *rinfo; 268 struct ceph_mds_reply_info_in *iinfo; 269 struct ceph_mds_request *req; 270 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 271 struct ceph_inode_info *ci = ceph_inode(inode); 272 ssize_t err = 0; 273 size_t len; 274 int mode; 275 276 if (rreq->origin != NETFS_DIO_READ) 277 __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags); 278 __clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags); 279 280 if (subreq->start >= inode->i_size) 281 goto out; 282 283 /* We need to fetch the inline data. 
*/ 284 mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA); 285 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode); 286 if (IS_ERR(req)) { 287 err = PTR_ERR(req); 288 goto out; 289 } 290 req->r_ino1 = ci->i_vino; 291 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA); 292 req->r_num_caps = 2; 293 294 trace_netfs_sreq(subreq, netfs_sreq_trace_submit); 295 err = ceph_mdsc_do_request(mdsc, NULL, req); 296 if (err < 0) 297 goto out; 298 299 rinfo = &req->r_reply_info; 300 iinfo = &rinfo->targeti; 301 if (iinfo->inline_version == CEPH_INLINE_NONE) { 302 /* The data got uninlined */ 303 ceph_mdsc_put_request(req); 304 return false; 305 } 306 307 len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len); 308 err = copy_to_iter(iinfo->inline_data + subreq->start, len, &subreq->io_iter); 309 if (err == 0) { 310 err = -EFAULT; 311 } else { 312 subreq->transferred += err; 313 err = 0; 314 } 315 316 ceph_mdsc_put_request(req); 317 out: 318 netfs_read_subreq_terminated(subreq, err, false); 319 return true; 320 } 321 322 static int ceph_netfs_prepare_read(struct netfs_io_subrequest *subreq) 323 { 324 struct netfs_io_request *rreq = subreq->rreq; 325 struct inode *inode = rreq->inode; 326 struct ceph_inode_info *ci = ceph_inode(inode); 327 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 328 u64 objno, objoff; 329 u32 xlen; 330 331 /* Truncate the extent at the end of the current block */ 332 ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len, 333 &objno, &objoff, &xlen); 334 rreq->io_streams[0].sreq_max_len = umin(xlen, fsc->mount_options->rsize); 335 return 0; 336 } 337 338 static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq) 339 { 340 struct netfs_io_request *rreq = subreq->rreq; 341 struct inode *inode = rreq->inode; 342 struct ceph_inode_info *ci = ceph_inode(inode); 343 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 344 struct ceph_client *cl = fsc->client; 345 struct ceph_osd_request *req = NULL; 346 struct ceph_vino vino = ceph_vino(inode); 347 int err; 348 u64 len; 349 bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD); 350 u64 off = subreq->start; 351 int extent_cnt; 352 353 if (ceph_inode_is_shutdown(inode)) { 354 err = -EIO; 355 goto out; 356 } 357 358 if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq)) 359 return; 360 361 // TODO: This rounding here is slightly dodgy. It *should* work, for 362 // now, as the cache only deals in blocks that are a multiple of 363 // PAGE_SIZE and fscrypt blocks are at most PAGE_SIZE. What needs to 364 // happen is for the fscrypt driving to be moved into netfslib and the 365 // data in the cache also to be stored encrypted. 366 len = subreq->len; 367 ceph_fscrypt_adjust_off_and_len(inode, &off, &len); 368 369 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, 370 off, &len, 0, 1, sparse ? 
CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ, 371 CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, 372 ci->i_truncate_size, false); 373 if (IS_ERR(req)) { 374 err = PTR_ERR(req); 375 req = NULL; 376 goto out; 377 } 378 379 if (sparse) { 380 extent_cnt = __ceph_sparse_read_ext_count(inode, len); 381 err = ceph_alloc_sparse_ext_map(&req->r_ops[0], extent_cnt); 382 if (err) 383 goto out; 384 } 385 386 doutc(cl, "%llx.%llx pos=%llu orig_len=%zu len=%llu\n", 387 ceph_vinop(inode), subreq->start, subreq->len, len); 388 389 /* 390 * FIXME: For now, use CEPH_OSD_DATA_TYPE_PAGES instead of _ITER for 391 * encrypted inodes. We'd need infrastructure that handles an iov_iter 392 * instead of page arrays, and we don't have that as of yet. Once the 393 * dust settles on the write helpers and encrypt/decrypt routines for 394 * netfs, we should be able to rework this. 395 */ 396 if (IS_ENCRYPTED(inode)) { 397 struct page **pages; 398 size_t page_off; 399 400 err = iov_iter_get_pages_alloc2(&subreq->io_iter, &pages, len, &page_off); 401 if (err < 0) { 402 doutc(cl, "%llx.%llx failed to allocate pages, %d\n", 403 ceph_vinop(inode), err); 404 goto out; 405 } 406 407 /* should always give us a page-aligned read */ 408 WARN_ON_ONCE(page_off); 409 len = err; 410 err = 0; 411 412 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, 413 false); 414 } else { 415 osd_req_op_extent_osd_iter(req, 0, &subreq->io_iter); 416 } 417 if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { 418 err = -EIO; 419 goto out; 420 } 421 req->r_callback = finish_netfs_read; 422 req->r_priv = subreq; 423 req->r_inode = inode; 424 ihold(inode); 425 426 trace_netfs_sreq(subreq, netfs_sreq_trace_submit); 427 ceph_osdc_start_request(req->r_osdc, req); 428 out: 429 ceph_osdc_put_request(req); 430 if (err) 431 netfs_read_subreq_terminated(subreq, err, false); 432 doutc(cl, "%llx.%llx result %d\n", ceph_vinop(inode), err); 433 } 434 435 static int ceph_init_request(struct netfs_io_request *rreq, struct file *file) 436 { 437 struct inode *inode = rreq->inode; 438 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 439 struct ceph_client *cl = ceph_inode_to_client(inode); 440 int got = 0, want = CEPH_CAP_FILE_CACHE; 441 struct ceph_netfs_request_data *priv; 442 int ret = 0; 443 444 /* [DEPRECATED] Use PG_private_2 to mark folio being written to the cache. */ 445 __set_bit(NETFS_RREQ_USE_PGPRIV2, &rreq->flags); 446 447 if (rreq->origin != NETFS_READAHEAD) 448 return 0; 449 450 priv = kzalloc(sizeof(*priv), GFP_NOFS); 451 if (!priv) 452 return -ENOMEM; 453 454 if (file) { 455 struct ceph_rw_context *rw_ctx; 456 struct ceph_file_info *fi = file->private_data; 457 458 priv->file_ra_pages = file->f_ra.ra_pages; 459 priv->file_ra_disabled = file->f_mode & FMODE_RANDOM; 460 461 rw_ctx = ceph_find_rw_context(fi); 462 if (rw_ctx) { 463 rreq->netfs_priv = priv; 464 return 0; 465 } 466 } 467 468 /* 469 * readahead callers do not necessarily hold Fcb caps 470 * (e.g. fadvise, madvise). 
471 */ 472 ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got); 473 if (ret < 0) { 474 doutc(cl, "%llx.%llx, error getting cap\n", ceph_vinop(inode)); 475 goto out; 476 } 477 478 if (!(got & want)) { 479 doutc(cl, "%llx.%llx, no cache cap\n", ceph_vinop(inode)); 480 ret = -EACCES; 481 goto out; 482 } 483 if (ret == 0) { 484 ret = -EACCES; 485 goto out; 486 } 487 488 priv->caps = got; 489 rreq->netfs_priv = priv; 490 rreq->io_streams[0].sreq_max_len = fsc->mount_options->rsize; 491 492 out: 493 if (ret < 0) 494 kfree(priv); 495 496 return ret; 497 } 498 499 static void ceph_netfs_free_request(struct netfs_io_request *rreq) 500 { 501 struct ceph_netfs_request_data *priv = rreq->netfs_priv; 502 503 if (!priv) 504 return; 505 506 if (priv->caps) 507 ceph_put_cap_refs(ceph_inode(rreq->inode), priv->caps); 508 kfree(priv); 509 rreq->netfs_priv = NULL; 510 } 511 512 const struct netfs_request_ops ceph_netfs_ops = { 513 .init_request = ceph_init_request, 514 .free_request = ceph_netfs_free_request, 515 .prepare_read = ceph_netfs_prepare_read, 516 .issue_read = ceph_netfs_issue_read, 517 .expand_readahead = ceph_netfs_expand_readahead, 518 .check_write_begin = ceph_netfs_check_write_begin, 519 }; 520 521 #ifdef CONFIG_CEPH_FSCACHE 522 static void ceph_set_page_fscache(struct page *page) 523 { 524 folio_start_private_2(page_folio(page)); /* [DEPRECATED] */ 525 } 526 527 static void ceph_fscache_write_terminated(void *priv, ssize_t error, bool was_async) 528 { 529 struct inode *inode = priv; 530 531 if (IS_ERR_VALUE(error) && error != -ENOBUFS) 532 ceph_fscache_invalidate(inode, false); 533 } 534 535 static void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching) 536 { 537 struct ceph_inode_info *ci = ceph_inode(inode); 538 struct fscache_cookie *cookie = ceph_fscache_cookie(ci); 539 540 fscache_write_to_cache(cookie, inode->i_mapping, off, len, i_size_read(inode), 541 ceph_fscache_write_terminated, inode, true, caching); 542 } 543 #else 544 static inline void ceph_set_page_fscache(struct page *page) 545 { 546 } 547 548 static inline void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching) 549 { 550 } 551 #endif /* CONFIG_CEPH_FSCACHE */ 552 553 struct ceph_writeback_ctl 554 { 555 loff_t i_size; 556 u64 truncate_size; 557 u32 truncate_seq; 558 bool size_stable; 559 bool head_snapc; 560 }; 561 562 /* 563 * Get ref for the oldest snapc for an inode with dirty data... that is, the 564 * only snap context we are allowed to write back. 565 */ 566 static struct ceph_snap_context * 567 get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl, 568 struct ceph_snap_context *page_snapc) 569 { 570 struct ceph_inode_info *ci = ceph_inode(inode); 571 struct ceph_client *cl = ceph_inode_to_client(inode); 572 struct ceph_snap_context *snapc = NULL; 573 struct ceph_cap_snap *capsnap = NULL; 574 575 spin_lock(&ci->i_ceph_lock); 576 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 577 doutc(cl, " capsnap %p snapc %p has %d dirty pages\n", 578 capsnap, capsnap->context, capsnap->dirty_pages); 579 if (!capsnap->dirty_pages) 580 continue; 581 582 /* get i_size, truncate_{seq,size} for page_snapc? 
*/ 583 if (snapc && capsnap->context != page_snapc) 584 continue; 585 586 if (ctl) { 587 if (capsnap->writing) { 588 ctl->i_size = i_size_read(inode); 589 ctl->size_stable = false; 590 } else { 591 ctl->i_size = capsnap->size; 592 ctl->size_stable = true; 593 } 594 ctl->truncate_size = capsnap->truncate_size; 595 ctl->truncate_seq = capsnap->truncate_seq; 596 ctl->head_snapc = false; 597 } 598 599 if (snapc) 600 break; 601 602 snapc = ceph_get_snap_context(capsnap->context); 603 if (!page_snapc || 604 page_snapc == snapc || 605 page_snapc->seq > snapc->seq) 606 break; 607 } 608 if (!snapc && ci->i_wrbuffer_ref_head) { 609 snapc = ceph_get_snap_context(ci->i_head_snapc); 610 doutc(cl, " head snapc %p has %d dirty pages\n", snapc, 611 ci->i_wrbuffer_ref_head); 612 if (ctl) { 613 ctl->i_size = i_size_read(inode); 614 ctl->truncate_size = ci->i_truncate_size; 615 ctl->truncate_seq = ci->i_truncate_seq; 616 ctl->size_stable = false; 617 ctl->head_snapc = true; 618 } 619 } 620 spin_unlock(&ci->i_ceph_lock); 621 return snapc; 622 } 623 624 static u64 get_writepages_data_length(struct inode *inode, 625 struct page *page, u64 start) 626 { 627 struct ceph_inode_info *ci = ceph_inode(inode); 628 struct ceph_snap_context *snapc; 629 struct ceph_cap_snap *capsnap = NULL; 630 u64 end = i_size_read(inode); 631 u64 ret; 632 633 snapc = page_snap_context(ceph_fscrypt_pagecache_page(page)); 634 if (snapc != ci->i_head_snapc) { 635 bool found = false; 636 spin_lock(&ci->i_ceph_lock); 637 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 638 if (capsnap->context == snapc) { 639 if (!capsnap->writing) 640 end = capsnap->size; 641 found = true; 642 break; 643 } 644 } 645 spin_unlock(&ci->i_ceph_lock); 646 WARN_ON(!found); 647 } 648 if (end > ceph_fscrypt_page_offset(page) + thp_size(page)) 649 end = ceph_fscrypt_page_offset(page) + thp_size(page); 650 ret = end > start ? end - start : 0; 651 if (ret && fscrypt_is_bounce_page(page)) 652 ret = round_up(ret, CEPH_FSCRYPT_BLOCK_SIZE); 653 return ret; 654 } 655 656 /* 657 * Write a single page, but leave the page locked. 658 * 659 * If we get a write error, mark the mapping for error, but still adjust the 660 * dirty page accounting (i.e., page is no longer dirty). 
661 */ 662 static int writepage_nounlock(struct page *page, struct writeback_control *wbc) 663 { 664 struct folio *folio = page_folio(page); 665 struct inode *inode = page->mapping->host; 666 struct ceph_inode_info *ci = ceph_inode(inode); 667 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 668 struct ceph_client *cl = fsc->client; 669 struct ceph_snap_context *snapc, *oldest; 670 loff_t page_off = page_offset(page); 671 int err; 672 loff_t len = thp_size(page); 673 loff_t wlen; 674 struct ceph_writeback_ctl ceph_wbc; 675 struct ceph_osd_client *osdc = &fsc->client->osdc; 676 struct ceph_osd_request *req; 677 bool caching = ceph_is_cache_enabled(inode); 678 struct page *bounce_page = NULL; 679 680 doutc(cl, "%llx.%llx page %p idx %lu\n", ceph_vinop(inode), page, 681 page->index); 682 683 if (ceph_inode_is_shutdown(inode)) 684 return -EIO; 685 686 /* verify this is a writeable snap context */ 687 snapc = page_snap_context(page); 688 if (!snapc) { 689 doutc(cl, "%llx.%llx page %p not dirty?\n", ceph_vinop(inode), 690 page); 691 return 0; 692 } 693 oldest = get_oldest_context(inode, &ceph_wbc, snapc); 694 if (snapc->seq > oldest->seq) { 695 doutc(cl, "%llx.%llx page %p snapc %p not writeable - noop\n", 696 ceph_vinop(inode), page, snapc); 697 /* we should only noop if called by kswapd */ 698 WARN_ON(!(current->flags & PF_MEMALLOC)); 699 ceph_put_snap_context(oldest); 700 redirty_page_for_writepage(wbc, page); 701 return 0; 702 } 703 ceph_put_snap_context(oldest); 704 705 /* is this a partial page at end of file? */ 706 if (page_off >= ceph_wbc.i_size) { 707 doutc(cl, "%llx.%llx folio at %lu beyond eof %llu\n", 708 ceph_vinop(inode), folio->index, ceph_wbc.i_size); 709 folio_invalidate(folio, 0, folio_size(folio)); 710 return 0; 711 } 712 713 if (ceph_wbc.i_size < page_off + len) 714 len = ceph_wbc.i_size - page_off; 715 716 wlen = IS_ENCRYPTED(inode) ? round_up(len, CEPH_FSCRYPT_BLOCK_SIZE) : len; 717 doutc(cl, "%llx.%llx page %p index %lu on %llu~%llu snapc %p seq %lld\n", 718 ceph_vinop(inode), page, page->index, page_off, wlen, snapc, 719 snapc->seq); 720 721 if (atomic_long_inc_return(&fsc->writeback_count) > 722 CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb)) 723 fsc->write_congested = true; 724 725 req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), 726 page_off, &wlen, 0, 1, CEPH_OSD_OP_WRITE, 727 CEPH_OSD_FLAG_WRITE, snapc, 728 ceph_wbc.truncate_seq, 729 ceph_wbc.truncate_size, true); 730 if (IS_ERR(req)) { 731 redirty_page_for_writepage(wbc, page); 732 return PTR_ERR(req); 733 } 734 735 if (wlen < len) 736 len = wlen; 737 738 set_page_writeback(page); 739 if (caching) 740 ceph_set_page_fscache(page); 741 ceph_fscache_write_to_cache(inode, page_off, len, caching); 742 743 if (IS_ENCRYPTED(inode)) { 744 bounce_page = fscrypt_encrypt_pagecache_blocks(page, 745 CEPH_FSCRYPT_BLOCK_SIZE, 0, 746 GFP_NOFS); 747 if (IS_ERR(bounce_page)) { 748 redirty_page_for_writepage(wbc, page); 749 end_page_writeback(page); 750 ceph_osdc_put_request(req); 751 return PTR_ERR(bounce_page); 752 } 753 } 754 755 /* it may be a short write due to an object boundary */ 756 WARN_ON_ONCE(len > thp_size(page)); 757 osd_req_op_extent_osd_data_pages(req, 0, 758 bounce_page ? &bounce_page : &page, wlen, 0, 759 false, false); 760 doutc(cl, "%llx.%llx %llu~%llu (%llu bytes, %sencrypted)\n", 761 ceph_vinop(inode), page_off, len, wlen, 762 IS_ENCRYPTED(inode) ? 
"" : "not "); 763 764 req->r_mtime = inode_get_mtime(inode); 765 ceph_osdc_start_request(osdc, req); 766 err = ceph_osdc_wait_request(osdc, req); 767 768 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, 769 req->r_end_latency, len, err); 770 fscrypt_free_bounce_page(bounce_page); 771 ceph_osdc_put_request(req); 772 if (err == 0) 773 err = len; 774 775 if (err < 0) { 776 struct writeback_control tmp_wbc; 777 if (!wbc) 778 wbc = &tmp_wbc; 779 if (err == -ERESTARTSYS) { 780 /* killed by SIGKILL */ 781 doutc(cl, "%llx.%llx interrupted page %p\n", 782 ceph_vinop(inode), page); 783 redirty_page_for_writepage(wbc, page); 784 end_page_writeback(page); 785 return err; 786 } 787 if (err == -EBLOCKLISTED) 788 fsc->blocklisted = true; 789 doutc(cl, "%llx.%llx setting page/mapping error %d %p\n", 790 ceph_vinop(inode), err, page); 791 mapping_set_error(&inode->i_data, err); 792 wbc->pages_skipped++; 793 } else { 794 doutc(cl, "%llx.%llx cleaned page %p\n", 795 ceph_vinop(inode), page); 796 err = 0; /* vfs expects us to return 0 */ 797 } 798 oldest = detach_page_private(page); 799 WARN_ON_ONCE(oldest != snapc); 800 end_page_writeback(page); 801 ceph_put_wrbuffer_cap_refs(ci, 1, snapc); 802 ceph_put_snap_context(snapc); /* page's reference */ 803 804 if (atomic_long_dec_return(&fsc->writeback_count) < 805 CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb)) 806 fsc->write_congested = false; 807 808 return err; 809 } 810 811 static int ceph_writepage(struct page *page, struct writeback_control *wbc) 812 { 813 int err; 814 struct inode *inode = page->mapping->host; 815 BUG_ON(!inode); 816 ihold(inode); 817 818 if (wbc->sync_mode == WB_SYNC_NONE && 819 ceph_inode_to_fs_client(inode)->write_congested) { 820 redirty_page_for_writepage(wbc, page); 821 return AOP_WRITEPAGE_ACTIVATE; 822 } 823 824 folio_wait_private_2(page_folio(page)); /* [DEPRECATED] */ 825 826 err = writepage_nounlock(page, wbc); 827 if (err == -ERESTARTSYS) { 828 /* direct memory reclaimer was killed by SIGKILL. return 0 829 * to prevent caller from setting mapping/page error */ 830 err = 0; 831 } 832 unlock_page(page); 833 iput(inode); 834 return err; 835 } 836 837 /* 838 * async writeback completion handler. 839 * 840 * If we get an error, set the mapping error bit, but not the individual 841 * page error bits. 
842 */ 843 static void writepages_finish(struct ceph_osd_request *req) 844 { 845 struct inode *inode = req->r_inode; 846 struct ceph_inode_info *ci = ceph_inode(inode); 847 struct ceph_client *cl = ceph_inode_to_client(inode); 848 struct ceph_osd_data *osd_data; 849 struct page *page; 850 int num_pages, total_pages = 0; 851 int i, j; 852 int rc = req->r_result; 853 struct ceph_snap_context *snapc = req->r_snapc; 854 struct address_space *mapping = inode->i_mapping; 855 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 856 unsigned int len = 0; 857 bool remove_page; 858 859 doutc(cl, "%llx.%llx rc %d\n", ceph_vinop(inode), rc); 860 if (rc < 0) { 861 mapping_set_error(mapping, rc); 862 ceph_set_error_write(ci); 863 if (rc == -EBLOCKLISTED) 864 fsc->blocklisted = true; 865 } else { 866 ceph_clear_error_write(ci); 867 } 868 869 /* 870 * We lost the cache cap, need to truncate the page before 871 * it is unlocked, otherwise we'd truncate it later in the 872 * page truncation thread, possibly losing some data that 873 * raced its way in 874 */ 875 remove_page = !(ceph_caps_issued(ci) & 876 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); 877 878 /* clean all pages */ 879 for (i = 0; i < req->r_num_ops; i++) { 880 if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) { 881 pr_warn_client(cl, 882 "%llx.%llx incorrect op %d req %p index %d tid %llu\n", 883 ceph_vinop(inode), req->r_ops[i].op, req, i, 884 req->r_tid); 885 break; 886 } 887 888 osd_data = osd_req_op_extent_osd_data(req, i); 889 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 890 len += osd_data->length; 891 num_pages = calc_pages_for((u64)osd_data->alignment, 892 (u64)osd_data->length); 893 total_pages += num_pages; 894 for (j = 0; j < num_pages; j++) { 895 page = osd_data->pages[j]; 896 if (fscrypt_is_bounce_page(page)) { 897 page = fscrypt_pagecache_page(page); 898 fscrypt_free_bounce_page(osd_data->pages[j]); 899 osd_data->pages[j] = page; 900 } 901 BUG_ON(!page); 902 WARN_ON(!PageUptodate(page)); 903 904 if (atomic_long_dec_return(&fsc->writeback_count) < 905 CONGESTION_OFF_THRESH( 906 fsc->mount_options->congestion_kb)) 907 fsc->write_congested = false; 908 909 ceph_put_snap_context(detach_page_private(page)); 910 end_page_writeback(page); 911 doutc(cl, "unlocking %p\n", page); 912 913 if (remove_page) 914 generic_error_remove_folio(inode->i_mapping, 915 page_folio(page)); 916 917 unlock_page(page); 918 } 919 doutc(cl, "%llx.%llx wrote %llu bytes cleaned %d pages\n", 920 ceph_vinop(inode), osd_data->length, 921 rc >= 0 ? 
num_pages : 0); 922 923 release_pages(osd_data->pages, num_pages); 924 } 925 926 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, 927 req->r_end_latency, len, rc); 928 929 ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc); 930 931 osd_data = osd_req_op_extent_osd_data(req, 0); 932 if (osd_data->pages_from_pool) 933 mempool_free(osd_data->pages, ceph_wb_pagevec_pool); 934 else 935 kfree(osd_data->pages); 936 ceph_osdc_put_request(req); 937 ceph_dec_osd_stopping_blocker(fsc->mdsc); 938 } 939 940 /* 941 * initiate async writeback 942 */ 943 static int ceph_writepages_start(struct address_space *mapping, 944 struct writeback_control *wbc) 945 { 946 struct inode *inode = mapping->host; 947 struct ceph_inode_info *ci = ceph_inode(inode); 948 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 949 struct ceph_client *cl = fsc->client; 950 struct ceph_vino vino = ceph_vino(inode); 951 pgoff_t index, start_index, end = -1; 952 struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc; 953 struct folio_batch fbatch; 954 int rc = 0; 955 unsigned int wsize = i_blocksize(inode); 956 struct ceph_osd_request *req = NULL; 957 struct ceph_writeback_ctl ceph_wbc; 958 bool should_loop, range_whole = false; 959 bool done = false; 960 bool caching = ceph_is_cache_enabled(inode); 961 xa_mark_t tag; 962 963 if (wbc->sync_mode == WB_SYNC_NONE && 964 fsc->write_congested) 965 return 0; 966 967 doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode), 968 wbc->sync_mode == WB_SYNC_NONE ? "NONE" : 969 (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); 970 971 if (ceph_inode_is_shutdown(inode)) { 972 if (ci->i_wrbuffer_ref > 0) { 973 pr_warn_ratelimited_client(cl, 974 "%llx.%llx %lld forced umount\n", 975 ceph_vinop(inode), ceph_ino(inode)); 976 } 977 mapping_set_error(mapping, -EIO); 978 return -EIO; /* we're in a forced umount, don't write! */ 979 } 980 if (fsc->mount_options->wsize < wsize) 981 wsize = fsc->mount_options->wsize; 982 983 folio_batch_init(&fbatch); 984 985 start_index = wbc->range_cyclic ? mapping->writeback_index : 0; 986 index = start_index; 987 988 if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) { 989 tag = PAGECACHE_TAG_TOWRITE; 990 } else { 991 tag = PAGECACHE_TAG_DIRTY; 992 } 993 retry: 994 /* find oldest snap context with dirty data */ 995 snapc = get_oldest_context(inode, &ceph_wbc, NULL); 996 if (!snapc) { 997 /* hmm, why does writepages get called when there 998 is no dirty data? */ 999 doutc(cl, " no snap context with dirty data?\n"); 1000 goto out; 1001 } 1002 doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", snapc, 1003 snapc->seq, snapc->num_snaps); 1004 1005 should_loop = false; 1006 if (ceph_wbc.head_snapc && snapc != last_snapc) { 1007 /* where to start/end? */ 1008 if (wbc->range_cyclic) { 1009 index = start_index; 1010 end = -1; 1011 if (index > 0) 1012 should_loop = true; 1013 doutc(cl, " cyclic, start at %lu\n", index); 1014 } else { 1015 index = wbc->range_start >> PAGE_SHIFT; 1016 end = wbc->range_end >> PAGE_SHIFT; 1017 if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) 1018 range_whole = true; 1019 doutc(cl, " not cyclic, %lu to %lu\n", index, end); 1020 } 1021 } else if (!ceph_wbc.head_snapc) { 1022 /* Do not respect wbc->range_{start,end}. Dirty pages 1023 * in that range can be associated with newer snapc. 
1024 * They are not writeable until we write all dirty pages 1025 * associated with 'snapc' get written */ 1026 if (index > 0) 1027 should_loop = true; 1028 doutc(cl, " non-head snapc, range whole\n"); 1029 } 1030 1031 if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) 1032 tag_pages_for_writeback(mapping, index, end); 1033 1034 ceph_put_snap_context(last_snapc); 1035 last_snapc = snapc; 1036 1037 while (!done && index <= end) { 1038 int num_ops = 0, op_idx; 1039 unsigned i, nr_folios, max_pages, locked_pages = 0; 1040 struct page **pages = NULL, **data_pages; 1041 struct page *page; 1042 pgoff_t strip_unit_end = 0; 1043 u64 offset = 0, len = 0; 1044 bool from_pool = false; 1045 1046 max_pages = wsize >> PAGE_SHIFT; 1047 1048 get_more_pages: 1049 nr_folios = filemap_get_folios_tag(mapping, &index, 1050 end, tag, &fbatch); 1051 doutc(cl, "pagevec_lookup_range_tag got %d\n", nr_folios); 1052 if (!nr_folios && !locked_pages) 1053 break; 1054 for (i = 0; i < nr_folios && locked_pages < max_pages; i++) { 1055 page = &fbatch.folios[i]->page; 1056 doutc(cl, "? %p idx %lu\n", page, page->index); 1057 if (locked_pages == 0) 1058 lock_page(page); /* first page */ 1059 else if (!trylock_page(page)) 1060 break; 1061 1062 /* only dirty pages, or our accounting breaks */ 1063 if (unlikely(!PageDirty(page)) || 1064 unlikely(page->mapping != mapping)) { 1065 doutc(cl, "!dirty or !mapping %p\n", page); 1066 unlock_page(page); 1067 continue; 1068 } 1069 /* only if matching snap context */ 1070 pgsnapc = page_snap_context(page); 1071 if (pgsnapc != snapc) { 1072 doutc(cl, "page snapc %p %lld != oldest %p %lld\n", 1073 pgsnapc, pgsnapc->seq, snapc, snapc->seq); 1074 if (!should_loop && 1075 !ceph_wbc.head_snapc && 1076 wbc->sync_mode != WB_SYNC_NONE) 1077 should_loop = true; 1078 unlock_page(page); 1079 continue; 1080 } 1081 if (page_offset(page) >= ceph_wbc.i_size) { 1082 struct folio *folio = page_folio(page); 1083 1084 doutc(cl, "folio at %lu beyond eof %llu\n", 1085 folio->index, ceph_wbc.i_size); 1086 if ((ceph_wbc.size_stable || 1087 folio_pos(folio) >= i_size_read(inode)) && 1088 folio_clear_dirty_for_io(folio)) 1089 folio_invalidate(folio, 0, 1090 folio_size(folio)); 1091 folio_unlock(folio); 1092 continue; 1093 } 1094 if (strip_unit_end && (page->index > strip_unit_end)) { 1095 doutc(cl, "end of strip unit %p\n", page); 1096 unlock_page(page); 1097 break; 1098 } 1099 if (PageWriteback(page) || 1100 PagePrivate2(page) /* [DEPRECATED] */) { 1101 if (wbc->sync_mode == WB_SYNC_NONE) { 1102 doutc(cl, "%p under writeback\n", page); 1103 unlock_page(page); 1104 continue; 1105 } 1106 doutc(cl, "waiting on writeback %p\n", page); 1107 wait_on_page_writeback(page); 1108 folio_wait_private_2(page_folio(page)); /* [DEPRECATED] */ 1109 } 1110 1111 if (!clear_page_dirty_for_io(page)) { 1112 doutc(cl, "%p !clear_page_dirty_for_io\n", page); 1113 unlock_page(page); 1114 continue; 1115 } 1116 1117 /* 1118 * We have something to write. 
If this is 1119 * the first locked page this time through, 1120 * calculate max possinle write size and 1121 * allocate a page array 1122 */ 1123 if (locked_pages == 0) { 1124 u64 objnum; 1125 u64 objoff; 1126 u32 xlen; 1127 1128 /* prepare async write request */ 1129 offset = (u64)page_offset(page); 1130 ceph_calc_file_object_mapping(&ci->i_layout, 1131 offset, wsize, 1132 &objnum, &objoff, 1133 &xlen); 1134 len = xlen; 1135 1136 num_ops = 1; 1137 strip_unit_end = page->index + 1138 ((len - 1) >> PAGE_SHIFT); 1139 1140 BUG_ON(pages); 1141 max_pages = calc_pages_for(0, (u64)len); 1142 pages = kmalloc_array(max_pages, 1143 sizeof(*pages), 1144 GFP_NOFS); 1145 if (!pages) { 1146 from_pool = true; 1147 pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); 1148 BUG_ON(!pages); 1149 } 1150 1151 len = 0; 1152 } else if (page->index != 1153 (offset + len) >> PAGE_SHIFT) { 1154 if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS : 1155 CEPH_OSD_MAX_OPS)) { 1156 redirty_page_for_writepage(wbc, page); 1157 unlock_page(page); 1158 break; 1159 } 1160 1161 num_ops++; 1162 offset = (u64)page_offset(page); 1163 len = 0; 1164 } 1165 1166 /* note position of first page in fbatch */ 1167 doutc(cl, "%llx.%llx will write page %p idx %lu\n", 1168 ceph_vinop(inode), page, page->index); 1169 1170 if (atomic_long_inc_return(&fsc->writeback_count) > 1171 CONGESTION_ON_THRESH( 1172 fsc->mount_options->congestion_kb)) 1173 fsc->write_congested = true; 1174 1175 if (IS_ENCRYPTED(inode)) { 1176 pages[locked_pages] = 1177 fscrypt_encrypt_pagecache_blocks(page, 1178 PAGE_SIZE, 0, 1179 locked_pages ? GFP_NOWAIT : GFP_NOFS); 1180 if (IS_ERR(pages[locked_pages])) { 1181 if (PTR_ERR(pages[locked_pages]) == -EINVAL) 1182 pr_err_client(cl, 1183 "inode->i_blkbits=%hhu\n", 1184 inode->i_blkbits); 1185 /* better not fail on first page! */ 1186 BUG_ON(locked_pages == 0); 1187 pages[locked_pages] = NULL; 1188 redirty_page_for_writepage(wbc, page); 1189 unlock_page(page); 1190 break; 1191 } 1192 ++locked_pages; 1193 } else { 1194 pages[locked_pages++] = page; 1195 } 1196 1197 fbatch.folios[i] = NULL; 1198 len += thp_size(page); 1199 } 1200 1201 /* did we get anything? 
*/ 1202 if (!locked_pages) 1203 goto release_folios; 1204 if (i) { 1205 unsigned j, n = 0; 1206 /* shift unused page to beginning of fbatch */ 1207 for (j = 0; j < nr_folios; j++) { 1208 if (!fbatch.folios[j]) 1209 continue; 1210 if (n < j) 1211 fbatch.folios[n] = fbatch.folios[j]; 1212 n++; 1213 } 1214 fbatch.nr = n; 1215 1216 if (nr_folios && i == nr_folios && 1217 locked_pages < max_pages) { 1218 doutc(cl, "reached end fbatch, trying for more\n"); 1219 folio_batch_release(&fbatch); 1220 goto get_more_pages; 1221 } 1222 } 1223 1224 new_request: 1225 offset = ceph_fscrypt_page_offset(pages[0]); 1226 len = wsize; 1227 1228 req = ceph_osdc_new_request(&fsc->client->osdc, 1229 &ci->i_layout, vino, 1230 offset, &len, 0, num_ops, 1231 CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, 1232 snapc, ceph_wbc.truncate_seq, 1233 ceph_wbc.truncate_size, false); 1234 if (IS_ERR(req)) { 1235 req = ceph_osdc_new_request(&fsc->client->osdc, 1236 &ci->i_layout, vino, 1237 offset, &len, 0, 1238 min(num_ops, 1239 CEPH_OSD_SLAB_OPS), 1240 CEPH_OSD_OP_WRITE, 1241 CEPH_OSD_FLAG_WRITE, 1242 snapc, ceph_wbc.truncate_seq, 1243 ceph_wbc.truncate_size, true); 1244 BUG_ON(IS_ERR(req)); 1245 } 1246 BUG_ON(len < ceph_fscrypt_page_offset(pages[locked_pages - 1]) + 1247 thp_size(pages[locked_pages - 1]) - offset); 1248 1249 if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { 1250 rc = -EIO; 1251 goto release_folios; 1252 } 1253 req->r_callback = writepages_finish; 1254 req->r_inode = inode; 1255 1256 /* Format the osd request message and submit the write */ 1257 len = 0; 1258 data_pages = pages; 1259 op_idx = 0; 1260 for (i = 0; i < locked_pages; i++) { 1261 struct page *page = ceph_fscrypt_pagecache_page(pages[i]); 1262 1263 u64 cur_offset = page_offset(page); 1264 /* 1265 * Discontinuity in page range? Ceph can handle that by just passing 1266 * multiple extents in the write op. 1267 */ 1268 if (offset + len != cur_offset) { 1269 /* If it's full, stop here */ 1270 if (op_idx + 1 == req->r_num_ops) 1271 break; 1272 1273 /* Kick off an fscache write with what we have so far. 
*/ 1274 ceph_fscache_write_to_cache(inode, offset, len, caching); 1275 1276 /* Start a new extent */ 1277 osd_req_op_extent_dup_last(req, op_idx, 1278 cur_offset - offset); 1279 doutc(cl, "got pages at %llu~%llu\n", offset, 1280 len); 1281 osd_req_op_extent_osd_data_pages(req, op_idx, 1282 data_pages, len, 0, 1283 from_pool, false); 1284 osd_req_op_extent_update(req, op_idx, len); 1285 1286 len = 0; 1287 offset = cur_offset; 1288 data_pages = pages + i; 1289 op_idx++; 1290 } 1291 1292 set_page_writeback(page); 1293 if (caching) 1294 ceph_set_page_fscache(page); 1295 len += thp_size(page); 1296 } 1297 ceph_fscache_write_to_cache(inode, offset, len, caching); 1298 1299 if (ceph_wbc.size_stable) { 1300 len = min(len, ceph_wbc.i_size - offset); 1301 } else if (i == locked_pages) { 1302 /* writepages_finish() clears writeback pages 1303 * according to the data length, so make sure 1304 * data length covers all locked pages */ 1305 u64 min_len = len + 1 - thp_size(page); 1306 len = get_writepages_data_length(inode, pages[i - 1], 1307 offset); 1308 len = max(len, min_len); 1309 } 1310 if (IS_ENCRYPTED(inode)) 1311 len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE); 1312 1313 doutc(cl, "got pages at %llu~%llu\n", offset, len); 1314 1315 if (IS_ENCRYPTED(inode) && 1316 ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) 1317 pr_warn_client(cl, 1318 "bad encrypted write offset=%lld len=%llu\n", 1319 offset, len); 1320 1321 osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len, 1322 0, from_pool, false); 1323 osd_req_op_extent_update(req, op_idx, len); 1324 1325 BUG_ON(op_idx + 1 != req->r_num_ops); 1326 1327 from_pool = false; 1328 if (i < locked_pages) { 1329 BUG_ON(num_ops <= req->r_num_ops); 1330 num_ops -= req->r_num_ops; 1331 locked_pages -= i; 1332 1333 /* allocate new pages array for next request */ 1334 data_pages = pages; 1335 pages = kmalloc_array(locked_pages, sizeof(*pages), 1336 GFP_NOFS); 1337 if (!pages) { 1338 from_pool = true; 1339 pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); 1340 BUG_ON(!pages); 1341 } 1342 memcpy(pages, data_pages + i, 1343 locked_pages * sizeof(*pages)); 1344 memset(data_pages + i, 0, 1345 locked_pages * sizeof(*pages)); 1346 } else { 1347 BUG_ON(num_ops != req->r_num_ops); 1348 index = pages[i - 1]->index + 1; 1349 /* request message now owns the pages array */ 1350 pages = NULL; 1351 } 1352 1353 req->r_mtime = inode_get_mtime(inode); 1354 ceph_osdc_start_request(&fsc->client->osdc, req); 1355 req = NULL; 1356 1357 wbc->nr_to_write -= i; 1358 if (pages) 1359 goto new_request; 1360 1361 /* 1362 * We stop writing back only if we are not doing 1363 * integrity sync. In case of integrity sync we have to 1364 * keep going until we have written all the pages 1365 * we tagged for writeback prior to entering this loop. 1366 */ 1367 if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE) 1368 done = true; 1369 1370 release_folios: 1371 doutc(cl, "folio_batch release on %d folios (%p)\n", 1372 (int)fbatch.nr, fbatch.nr ? 
fbatch.folios[0] : NULL); 1373 folio_batch_release(&fbatch); 1374 } 1375 1376 if (should_loop && !done) { 1377 /* more to do; loop back to beginning of file */ 1378 doutc(cl, "looping back to beginning of file\n"); 1379 end = start_index - 1; /* OK even when start_index == 0 */ 1380 1381 /* to write dirty pages associated with next snapc, 1382 * we need to wait until current writes complete */ 1383 if (wbc->sync_mode != WB_SYNC_NONE && 1384 start_index == 0 && /* all dirty pages were checked */ 1385 !ceph_wbc.head_snapc) { 1386 struct page *page; 1387 unsigned i, nr; 1388 index = 0; 1389 while ((index <= end) && 1390 (nr = filemap_get_folios_tag(mapping, &index, 1391 (pgoff_t)-1, 1392 PAGECACHE_TAG_WRITEBACK, 1393 &fbatch))) { 1394 for (i = 0; i < nr; i++) { 1395 page = &fbatch.folios[i]->page; 1396 if (page_snap_context(page) != snapc) 1397 continue; 1398 wait_on_page_writeback(page); 1399 } 1400 folio_batch_release(&fbatch); 1401 cond_resched(); 1402 } 1403 } 1404 1405 start_index = 0; 1406 index = 0; 1407 goto retry; 1408 } 1409 1410 if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) 1411 mapping->writeback_index = index; 1412 1413 out: 1414 ceph_osdc_put_request(req); 1415 ceph_put_snap_context(last_snapc); 1416 doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode), 1417 rc); 1418 return rc; 1419 } 1420 1421 1422 1423 /* 1424 * See if a given @snapc is either writeable, or already written. 1425 */ 1426 static int context_is_writeable_or_written(struct inode *inode, 1427 struct ceph_snap_context *snapc) 1428 { 1429 struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL); 1430 int ret = !oldest || snapc->seq <= oldest->seq; 1431 1432 ceph_put_snap_context(oldest); 1433 return ret; 1434 } 1435 1436 /** 1437 * ceph_find_incompatible - find an incompatible context and return it 1438 * @page: page being dirtied 1439 * 1440 * We are only allowed to write into/dirty a page if the page is 1441 * clean, or already dirty within the same snap context. Returns a 1442 * conflicting context if there is one, NULL if there isn't, or a 1443 * negative error code on other errors. 1444 * 1445 * Must be called with page lock held. 1446 */ 1447 static struct ceph_snap_context * 1448 ceph_find_incompatible(struct page *page) 1449 { 1450 struct inode *inode = page->mapping->host; 1451 struct ceph_client *cl = ceph_inode_to_client(inode); 1452 struct ceph_inode_info *ci = ceph_inode(inode); 1453 1454 if (ceph_inode_is_shutdown(inode)) { 1455 doutc(cl, " %llx.%llx page %p is shutdown\n", 1456 ceph_vinop(inode), page); 1457 return ERR_PTR(-ESTALE); 1458 } 1459 1460 for (;;) { 1461 struct ceph_snap_context *snapc, *oldest; 1462 1463 wait_on_page_writeback(page); 1464 1465 snapc = page_snap_context(page); 1466 if (!snapc || snapc == ci->i_head_snapc) 1467 break; 1468 1469 /* 1470 * this page is already dirty in another (older) snap 1471 * context! is it writeable now? 
1472 */ 1473 oldest = get_oldest_context(inode, NULL, NULL); 1474 if (snapc->seq > oldest->seq) { 1475 /* not writeable -- return it for the caller to deal with */ 1476 ceph_put_snap_context(oldest); 1477 doutc(cl, " %llx.%llx page %p snapc %p not current or oldest\n", 1478 ceph_vinop(inode), page, snapc); 1479 return ceph_get_snap_context(snapc); 1480 } 1481 ceph_put_snap_context(oldest); 1482 1483 /* yay, writeable, do it now (without dropping page lock) */ 1484 doutc(cl, " %llx.%llx page %p snapc %p not current, but oldest\n", 1485 ceph_vinop(inode), page, snapc); 1486 if (clear_page_dirty_for_io(page)) { 1487 int r = writepage_nounlock(page, NULL); 1488 if (r < 0) 1489 return ERR_PTR(r); 1490 } 1491 } 1492 return NULL; 1493 } 1494 1495 static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len, 1496 struct folio **foliop, void **_fsdata) 1497 { 1498 struct inode *inode = file_inode(file); 1499 struct ceph_inode_info *ci = ceph_inode(inode); 1500 struct ceph_snap_context *snapc; 1501 1502 snapc = ceph_find_incompatible(folio_page(*foliop, 0)); 1503 if (snapc) { 1504 int r; 1505 1506 folio_unlock(*foliop); 1507 folio_put(*foliop); 1508 *foliop = NULL; 1509 if (IS_ERR(snapc)) 1510 return PTR_ERR(snapc); 1511 1512 ceph_queue_writeback(inode); 1513 r = wait_event_killable(ci->i_cap_wq, 1514 context_is_writeable_or_written(inode, snapc)); 1515 ceph_put_snap_context(snapc); 1516 return r == 0 ? -EAGAIN : r; 1517 } 1518 return 0; 1519 } 1520 1521 /* 1522 * We are only allowed to write into/dirty the page if the page is 1523 * clean, or already dirty within the same snap context. 1524 */ 1525 static int ceph_write_begin(struct file *file, struct address_space *mapping, 1526 loff_t pos, unsigned len, 1527 struct folio **foliop, void **fsdata) 1528 { 1529 struct inode *inode = file_inode(file); 1530 struct ceph_inode_info *ci = ceph_inode(inode); 1531 int r; 1532 1533 r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, foliop, NULL); 1534 if (r < 0) 1535 return r; 1536 1537 folio_wait_private_2(*foliop); /* [DEPRECATED] */ 1538 WARN_ON_ONCE(!folio_test_locked(*foliop)); 1539 return 0; 1540 } 1541 1542 /* 1543 * we don't do anything in here that simple_write_end doesn't do 1544 * except adjust dirty page accounting 1545 */ 1546 static int ceph_write_end(struct file *file, struct address_space *mapping, 1547 loff_t pos, unsigned len, unsigned copied, 1548 struct folio *folio, void *fsdata) 1549 { 1550 struct inode *inode = file_inode(file); 1551 struct ceph_client *cl = ceph_inode_to_client(inode); 1552 bool check_cap = false; 1553 1554 doutc(cl, "%llx.%llx file %p folio %p %d~%d (%d)\n", ceph_vinop(inode), 1555 file, folio, (int)pos, (int)copied, (int)len); 1556 1557 if (!folio_test_uptodate(folio)) { 1558 /* just return that nothing was copied on a short copy */ 1559 if (copied < len) { 1560 copied = 0; 1561 goto out; 1562 } 1563 folio_mark_uptodate(folio); 1564 } 1565 1566 /* did file size increase? 
*/ 1567 if (pos+copied > i_size_read(inode)) 1568 check_cap = ceph_inode_set_size(inode, pos+copied); 1569 1570 folio_mark_dirty(folio); 1571 1572 out: 1573 folio_unlock(folio); 1574 folio_put(folio); 1575 1576 if (check_cap) 1577 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY); 1578 1579 return copied; 1580 } 1581 1582 const struct address_space_operations ceph_aops = { 1583 .read_folio = netfs_read_folio, 1584 .readahead = netfs_readahead, 1585 .writepage = ceph_writepage, 1586 .writepages = ceph_writepages_start, 1587 .write_begin = ceph_write_begin, 1588 .write_end = ceph_write_end, 1589 .dirty_folio = ceph_dirty_folio, 1590 .invalidate_folio = ceph_invalidate_folio, 1591 .release_folio = netfs_release_folio, 1592 .direct_IO = noop_direct_IO, 1593 }; 1594 1595 static void ceph_block_sigs(sigset_t *oldset) 1596 { 1597 sigset_t mask; 1598 siginitsetinv(&mask, sigmask(SIGKILL)); 1599 sigprocmask(SIG_BLOCK, &mask, oldset); 1600 } 1601 1602 static void ceph_restore_sigs(sigset_t *oldset) 1603 { 1604 sigprocmask(SIG_SETMASK, oldset, NULL); 1605 } 1606 1607 /* 1608 * vm ops 1609 */ 1610 static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf) 1611 { 1612 struct vm_area_struct *vma = vmf->vma; 1613 struct inode *inode = file_inode(vma->vm_file); 1614 struct ceph_inode_info *ci = ceph_inode(inode); 1615 struct ceph_client *cl = ceph_inode_to_client(inode); 1616 struct ceph_file_info *fi = vma->vm_file->private_data; 1617 loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT; 1618 int want, got, err; 1619 sigset_t oldset; 1620 vm_fault_t ret = VM_FAULT_SIGBUS; 1621 1622 if (ceph_inode_is_shutdown(inode)) 1623 return ret; 1624 1625 ceph_block_sigs(&oldset); 1626 1627 doutc(cl, "%llx.%llx %llu trying to get caps\n", 1628 ceph_vinop(inode), off); 1629 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1630 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 1631 else 1632 want = CEPH_CAP_FILE_CACHE; 1633 1634 got = 0; 1635 err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got); 1636 if (err < 0) 1637 goto out_restore; 1638 1639 doutc(cl, "%llx.%llx %llu got cap refs on %s\n", ceph_vinop(inode), 1640 off, ceph_cap_string(got)); 1641 1642 if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || 1643 !ceph_has_inline_data(ci)) { 1644 CEPH_DEFINE_RW_CONTEXT(rw_ctx, got); 1645 ceph_add_rw_context(fi, &rw_ctx); 1646 ret = filemap_fault(vmf); 1647 ceph_del_rw_context(fi, &rw_ctx); 1648 doutc(cl, "%llx.%llx %llu drop cap refs %s ret %x\n", 1649 ceph_vinop(inode), off, ceph_cap_string(got), ret); 1650 } else 1651 err = -EAGAIN; 1652 1653 ceph_put_cap_refs(ci, got); 1654 1655 if (err != -EAGAIN) 1656 goto out_restore; 1657 1658 /* read inline data */ 1659 if (off >= PAGE_SIZE) { 1660 /* does not support inline data > PAGE_SIZE */ 1661 ret = VM_FAULT_SIGBUS; 1662 } else { 1663 struct address_space *mapping = inode->i_mapping; 1664 struct page *page; 1665 1666 filemap_invalidate_lock_shared(mapping); 1667 page = find_or_create_page(mapping, 0, 1668 mapping_gfp_constraint(mapping, ~__GFP_FS)); 1669 if (!page) { 1670 ret = VM_FAULT_OOM; 1671 goto out_inline; 1672 } 1673 err = __ceph_do_getattr(inode, page, 1674 CEPH_STAT_CAP_INLINE_DATA, true); 1675 if (err < 0 || off >= i_size_read(inode)) { 1676 unlock_page(page); 1677 put_page(page); 1678 ret = vmf_error(err); 1679 goto out_inline; 1680 } 1681 if (err < PAGE_SIZE) 1682 zero_user_segment(page, err, PAGE_SIZE); 1683 else 1684 flush_dcache_page(page); 1685 SetPageUptodate(page); 1686 vmf->page = page; 1687 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED; 1688 
out_inline: 1689 filemap_invalidate_unlock_shared(mapping); 1690 doutc(cl, "%llx.%llx %llu read inline data ret %x\n", 1691 ceph_vinop(inode), off, ret); 1692 } 1693 out_restore: 1694 ceph_restore_sigs(&oldset); 1695 if (err < 0) 1696 ret = vmf_error(err); 1697 1698 return ret; 1699 } 1700 1701 static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) 1702 { 1703 struct vm_area_struct *vma = vmf->vma; 1704 struct inode *inode = file_inode(vma->vm_file); 1705 struct ceph_client *cl = ceph_inode_to_client(inode); 1706 struct ceph_inode_info *ci = ceph_inode(inode); 1707 struct ceph_file_info *fi = vma->vm_file->private_data; 1708 struct ceph_cap_flush *prealloc_cf; 1709 struct page *page = vmf->page; 1710 loff_t off = page_offset(page); 1711 loff_t size = i_size_read(inode); 1712 size_t len; 1713 int want, got, err; 1714 sigset_t oldset; 1715 vm_fault_t ret = VM_FAULT_SIGBUS; 1716 1717 if (ceph_inode_is_shutdown(inode)) 1718 return ret; 1719 1720 prealloc_cf = ceph_alloc_cap_flush(); 1721 if (!prealloc_cf) 1722 return VM_FAULT_OOM; 1723 1724 sb_start_pagefault(inode->i_sb); 1725 ceph_block_sigs(&oldset); 1726 1727 if (off + thp_size(page) <= size) 1728 len = thp_size(page); 1729 else 1730 len = offset_in_thp(page, size); 1731 1732 doutc(cl, "%llx.%llx %llu~%zd getting caps i_size %llu\n", 1733 ceph_vinop(inode), off, len, size); 1734 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1735 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 1736 else 1737 want = CEPH_CAP_FILE_BUFFER; 1738 1739 got = 0; 1740 err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got); 1741 if (err < 0) 1742 goto out_free; 1743 1744 doutc(cl, "%llx.%llx %llu~%zd got cap refs on %s\n", ceph_vinop(inode), 1745 off, len, ceph_cap_string(got)); 1746 1747 /* Update time before taking page lock */ 1748 file_update_time(vma->vm_file); 1749 inode_inc_iversion_raw(inode); 1750 1751 do { 1752 struct ceph_snap_context *snapc; 1753 1754 lock_page(page); 1755 1756 if (page_mkwrite_check_truncate(page, inode) < 0) { 1757 unlock_page(page); 1758 ret = VM_FAULT_NOPAGE; 1759 break; 1760 } 1761 1762 snapc = ceph_find_incompatible(page); 1763 if (!snapc) { 1764 /* success. we'll keep the page locked. 
*/ 1765 set_page_dirty(page); 1766 ret = VM_FAULT_LOCKED; 1767 break; 1768 } 1769 1770 unlock_page(page); 1771 1772 if (IS_ERR(snapc)) { 1773 ret = VM_FAULT_SIGBUS; 1774 break; 1775 } 1776 1777 ceph_queue_writeback(inode); 1778 err = wait_event_killable(ci->i_cap_wq, 1779 context_is_writeable_or_written(inode, snapc)); 1780 ceph_put_snap_context(snapc); 1781 } while (err == 0); 1782 1783 if (ret == VM_FAULT_LOCKED) { 1784 int dirty; 1785 spin_lock(&ci->i_ceph_lock); 1786 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 1787 &prealloc_cf); 1788 spin_unlock(&ci->i_ceph_lock); 1789 if (dirty) 1790 __mark_inode_dirty(inode, dirty); 1791 } 1792 1793 doutc(cl, "%llx.%llx %llu~%zd dropping cap refs on %s ret %x\n", 1794 ceph_vinop(inode), off, len, ceph_cap_string(got), ret); 1795 ceph_put_cap_refs_async(ci, got); 1796 out_free: 1797 ceph_restore_sigs(&oldset); 1798 sb_end_pagefault(inode->i_sb); 1799 ceph_free_cap_flush(prealloc_cf); 1800 if (err < 0) 1801 ret = vmf_error(err); 1802 return ret; 1803 } 1804 1805 void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, 1806 char *data, size_t len) 1807 { 1808 struct ceph_client *cl = ceph_inode_to_client(inode); 1809 struct address_space *mapping = inode->i_mapping; 1810 struct page *page; 1811 1812 if (locked_page) { 1813 page = locked_page; 1814 } else { 1815 if (i_size_read(inode) == 0) 1816 return; 1817 page = find_or_create_page(mapping, 0, 1818 mapping_gfp_constraint(mapping, 1819 ~__GFP_FS)); 1820 if (!page) 1821 return; 1822 if (PageUptodate(page)) { 1823 unlock_page(page); 1824 put_page(page); 1825 return; 1826 } 1827 } 1828 1829 doutc(cl, "%p %llx.%llx len %zu locked_page %p\n", inode, 1830 ceph_vinop(inode), len, locked_page); 1831 1832 if (len > 0) { 1833 void *kaddr = kmap_atomic(page); 1834 memcpy(kaddr, data, len); 1835 kunmap_atomic(kaddr); 1836 } 1837 1838 if (page != locked_page) { 1839 if (len < PAGE_SIZE) 1840 zero_user_segment(page, len, PAGE_SIZE); 1841 else 1842 flush_dcache_page(page); 1843 1844 SetPageUptodate(page); 1845 unlock_page(page); 1846 put_page(page); 1847 } 1848 } 1849 1850 int ceph_uninline_data(struct file *file) 1851 { 1852 struct inode *inode = file_inode(file); 1853 struct ceph_inode_info *ci = ceph_inode(inode); 1854 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1855 struct ceph_client *cl = fsc->client; 1856 struct ceph_osd_request *req = NULL; 1857 struct ceph_cap_flush *prealloc_cf = NULL; 1858 struct folio *folio = NULL; 1859 u64 inline_version = CEPH_INLINE_NONE; 1860 struct page *pages[1]; 1861 int err = 0; 1862 u64 len; 1863 1864 spin_lock(&ci->i_ceph_lock); 1865 inline_version = ci->i_inline_version; 1866 spin_unlock(&ci->i_ceph_lock); 1867 1868 doutc(cl, "%llx.%llx inline_version %llu\n", ceph_vinop(inode), 1869 inline_version); 1870 1871 if (ceph_inode_is_shutdown(inode)) { 1872 err = -EIO; 1873 goto out; 1874 } 1875 1876 if (inline_version == CEPH_INLINE_NONE) 1877 return 0; 1878 1879 prealloc_cf = ceph_alloc_cap_flush(); 1880 if (!prealloc_cf) 1881 return -ENOMEM; 1882 1883 if (inline_version == 1) /* initial version, no data */ 1884 goto out_uninline; 1885 1886 folio = read_mapping_folio(inode->i_mapping, 0, file); 1887 if (IS_ERR(folio)) { 1888 err = PTR_ERR(folio); 1889 goto out; 1890 } 1891 1892 folio_lock(folio); 1893 1894 len = i_size_read(inode); 1895 if (len > folio_size(folio)) 1896 len = folio_size(folio); 1897 1898 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 1899 ceph_vino(inode), 0, &len, 0, 1, 1900 CEPH_OSD_OP_CREATE, 
int ceph_uninline_data(struct file *file)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *req = NULL;
	struct ceph_cap_flush *prealloc_cf = NULL;
	struct folio *folio = NULL;
	u64 inline_version = CEPH_INLINE_NONE;
	struct page *pages[1];
	int err = 0;
	u64 len;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	doutc(cl, "%llx.%llx inline_version %llu\n", ceph_vinop(inode),
	      inline_version);

	if (ceph_inode_is_shutdown(inode)) {
		err = -EIO;
		goto out;
	}

	if (inline_version == CEPH_INLINE_NONE)
		return 0;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return -ENOMEM;

	if (inline_version == 1) /* initial version, no data */
		goto out_uninline;

	folio = read_mapping_folio(inode->i_mapping, 0, file);
	if (IS_ERR(folio)) {
		err = PTR_ERR(folio);
		goto out;
	}

	folio_lock(folio);

	len = i_size_read(inode);
	if (len > folio_size(folio))
		len = folio_size(folio);

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out_unlock;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	pages[0] = folio_page(folio, 0);
	osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put_req;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put_req;
	}

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

out_uninline:
	if (!err) {
		int dirty;

		/* Set to CAP_INLINE_NONE and dirty the caps */
		down_read(&fsc->mdsc->snap_rwsem);
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		up_read(&fsc->mdsc->snap_rwsem);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}
out_put_req:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out_unlock:
	if (folio) {
		folio_unlock(folio);
		folio_put(folio);
	}
out:
	ceph_free_cap_flush(prealloc_cf);
	doutc(cl, "%llx.%llx inline_version %llu = %d\n",
	      ceph_vinop(inode), inline_version, err);
	return err;
}

static const struct vm_operations_struct ceph_vmops = {
	.fault		= ceph_filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->read_folio)
		return -ENOEXEC;
	vma->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

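/*
 * Determine whether this client may read and/or write in the given data
 * pool (and namespace) by probing it with a dummy STAT read and an
 * exclusive-create write against the inode's first object, then cache the
 * result in mdsc->pool_perm_tree so the OSD round trip is paid only once
 * per pool/namespace.
 */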
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(&ci->netfs.inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		doutc(cl, "pool %lld ns %.*s no perm cached\n", pool,
		      (int)pool_ns->len, pool_ns->str);
	else
		doutc(cl, "pool %lld no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	ceph_osdc_start_request(&fsc->client->osdc, rd_req);

	wr_req->r_mtime = inode_get_mtime(&ci->netfs.inode);
	ceph_osdc_start_request(&fsc->client->osdc, wr_req);

	err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		doutc(cl, "pool %lld ns %.*s result = %d\n", pool,
		      (int)pool_ns->len, pool_ns->str, err);
	else
		doutc(cl, "pool %lld result = %d\n", pool, err);
	return err;
}

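/*
 * Check that this client is allowed to do what 'need' (FILE_RD/FILE_WR
 * caps) implies in the inode's data pool. Uses the CEPH_I_POOL_* flags
 * cached on the inode when available; otherwise probes the pool via
 * __ceph_pool_perm_get() and caches the outcome. Returns 0 if access is
 * allowed, -EPERM if not, or a negative error from the probe.
 */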
int ceph_pool_perm_check(struct inode *inode, int need)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_string *pool_ns;
	s64 pool;
	int ret, flags;

	/* Only need to do this for regular files */
	if (!S_ISREG(inode->i_mode))
		return 0;

	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * Pool permission check needs to write to the first object.
		 * But for a snapshot, the head of the first object may have
		 * already been deleted. Skip the check to avoid creating an
		 * orphan object.
		 */
		return 0;
	}

	if (ceph_test_mount_opt(ceph_inode_to_fs_client(inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			doutc(cl, "pool %lld no read perm\n", pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			doutc(cl, "pool %lld no write perm\n", pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
	} else {
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}