1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/backing-dev.h> 5 #include <linux/fs.h> 6 #include <linux/mm.h> 7 #include <linux/swap.h> 8 #include <linux/pagemap.h> 9 #include <linux/slab.h> 10 #include <linux/folio_batch.h> 11 #include <linux/task_io_accounting_ops.h> 12 #include <linux/signal.h> 13 #include <linux/iversion.h> 14 #include <linux/ktime.h> 15 #include <linux/netfs.h> 16 #include <trace/events/netfs.h> 17 18 #include "super.h" 19 #include "mds_client.h" 20 #include "cache.h" 21 #include "metric.h" 22 #include "subvolume_metrics.h" 23 #include "crypto.h" 24 #include <linux/ceph/osd_client.h> 25 #include <linux/ceph/striper.h> 26 27 /* 28 * Ceph address space ops. 29 * 30 * There are a few funny things going on here. 31 * 32 * The page->private field is used to reference a struct 33 * ceph_snap_context for _every_ dirty page. This indicates which 34 * snapshot the page was logically dirtied in, and thus which snap 35 * context needs to be associated with the osd write during writeback. 36 * 37 * Similarly, struct ceph_inode_info maintains a set of counters to 38 * count dirty pages on the inode. In the absence of snapshots, 39 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count. 40 * 41 * When a snapshot is taken (that is, when the client receives 42 * notification that a snapshot was taken), each inode with caps and 43 * with dirty pages (dirty pages implies there is a cap) gets a new 44 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending 45 * order, new snaps go to the tail). The i_wrbuffer_ref_head count is 46 * moved to capsnap->dirty. (Unless a sync write is currently in 47 * progress. In that case, the capsnap is said to be "pending", new 48 * writes cannot start, and the capsnap isn't "finalized" until the 49 * write completes (or fails) and a final size/mtime for the inode for 50 * that snap can be settled upon.) i_wrbuffer_ref_head is reset to 0. 51 * 52 * On writeback, we must submit writes to the osd IN SNAP ORDER. So, 53 * we look for the first capsnap in i_cap_snaps and write out pages in 54 * that snap context _only_. Then we move on to the next capsnap, 55 * eventually reaching the "live" or "head" context (i.e., pages that 56 * are not yet snapped) and are writing the most recently dirtied 57 * pages. 58 * 59 * Invalidate and so forth must take care to ensure the dirty page 60 * accounting is preserved. 61 */ 62 63 #define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10)) 64 #define CONGESTION_OFF_THRESH(congestion_kb) \ 65 (CONGESTION_ON_THRESH(congestion_kb) - \ 66 (CONGESTION_ON_THRESH(congestion_kb) >> 2)) 67 68 static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len, 69 struct folio **foliop, void **_fsdata); 70 71 static inline struct ceph_snap_context *page_snap_context(struct page *page) 72 { 73 if (PagePrivate(page)) 74 return (void *)page->private; 75 return NULL; 76 } 77 78 /* 79 * Dirty a page. Optimistically adjust accounting, on the assumption 80 * that we won't race with invalidate. If we do, readjust. 81 */ 82 static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio) 83 { 84 struct inode *inode = mapping->host; 85 struct ceph_client *cl = ceph_inode_to_client(inode); 86 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 87 struct ceph_inode_info *ci; 88 struct ceph_snap_context *snapc; 89 90 if (folio_test_dirty(folio)) { 91 doutc(cl, "%llx.%llx %p idx %lu -- already dirty\n", 92 ceph_vinop(inode), folio, folio->index); 93 VM_BUG_ON_FOLIO(!folio_test_private(folio), folio); 94 return false; 95 } 96 97 atomic64_inc(&mdsc->dirty_folios); 98 99 ci = ceph_inode(inode); 100 101 /* dirty the head */ 102 spin_lock(&ci->i_ceph_lock); 103 if (__ceph_have_pending_cap_snap(ci)) { 104 struct ceph_cap_snap *capsnap = 105 list_last_entry(&ci->i_cap_snaps, 106 struct ceph_cap_snap, 107 ci_item); 108 snapc = ceph_get_snap_context(capsnap->context); 109 capsnap->dirty_pages++; 110 } else { 111 BUG_ON(!ci->i_head_snapc); 112 snapc = ceph_get_snap_context(ci->i_head_snapc); 113 ++ci->i_wrbuffer_ref_head; 114 } 115 if (ci->i_wrbuffer_ref == 0) 116 ihold(inode); 117 ++ci->i_wrbuffer_ref; 118 doutc(cl, "%llx.%llx %p idx %lu head %d/%d -> %d/%d " 119 "snapc %p seq %lld (%d snaps)\n", 120 ceph_vinop(inode), folio, folio->index, 121 ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1, 122 ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head, 123 snapc, snapc->seq, snapc->num_snaps); 124 spin_unlock(&ci->i_ceph_lock); 125 126 /* 127 * Reference snap context in folio->private. Also set 128 * PagePrivate so that we get invalidate_folio callback. 129 */ 130 VM_WARN_ON_FOLIO(folio->private, folio); 131 folio_attach_private(folio, snapc); 132 133 return ceph_fscache_dirty_folio(mapping, folio); 134 } 135 136 /* 137 * If we are truncating the full folio (i.e. offset == 0), adjust the 138 * dirty folio counters appropriately. Only called if there is private 139 * data on the folio. 140 */ 141 static void ceph_invalidate_folio(struct folio *folio, size_t offset, 142 size_t length) 143 { 144 struct inode *inode = folio->mapping->host; 145 struct ceph_client *cl = ceph_inode_to_client(inode); 146 struct ceph_inode_info *ci = ceph_inode(inode); 147 struct ceph_snap_context *snapc; 148 149 150 if (offset != 0 || length != folio_size(folio)) { 151 doutc(cl, "%llx.%llx idx %lu partial dirty page %zu~%zu\n", 152 ceph_vinop(inode), folio->index, offset, length); 153 return; 154 } 155 156 WARN_ON(!folio_test_locked(folio)); 157 if (folio_test_private(folio)) { 158 doutc(cl, "%llx.%llx idx %lu full dirty page\n", 159 ceph_vinop(inode), folio->index); 160 161 snapc = folio_detach_private(folio); 162 ceph_put_wrbuffer_cap_refs(ci, 1, snapc); 163 ceph_put_snap_context(snapc); 164 } 165 166 netfs_invalidate_folio(folio, offset, length); 167 } 168 169 static void ceph_netfs_expand_readahead(struct netfs_io_request *rreq) 170 { 171 struct inode *inode = rreq->inode; 172 struct ceph_inode_info *ci = ceph_inode(inode); 173 struct ceph_file_layout *lo = &ci->i_layout; 174 unsigned long max_pages = inode->i_sb->s_bdi->ra_pages; 175 loff_t end = rreq->start + rreq->len, new_end; 176 struct ceph_netfs_request_data *priv = rreq->netfs_priv; 177 unsigned long max_len; 178 u32 blockoff; 179 180 if (priv) { 181 /* Readahead is disabled by posix_fadvise POSIX_FADV_RANDOM */ 182 if (priv->file_ra_disabled) 183 max_pages = 0; 184 else 185 max_pages = priv->file_ra_pages; 186 187 } 188 189 /* Readahead is disabled */ 190 if (!max_pages) 191 return; 192 193 max_len = max_pages << PAGE_SHIFT; 194 195 /* 196 * Try to expand the length forward by rounding up it to the next 197 * block, but do not exceed the file size, unless the original 198 * request already exceeds it. 199 */ 200 new_end = umin(round_up(end, lo->stripe_unit), rreq->i_size); 201 if (new_end > end && new_end <= rreq->start + max_len) 202 rreq->len = new_end - rreq->start; 203 204 /* Try to expand the start downward */ 205 div_u64_rem(rreq->start, lo->stripe_unit, &blockoff); 206 if (rreq->len + blockoff <= max_len) { 207 rreq->start -= blockoff; 208 rreq->len += blockoff; 209 } 210 } 211 212 static void finish_netfs_read(struct ceph_osd_request *req) 213 { 214 struct inode *inode = req->r_inode; 215 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 216 struct ceph_client *cl = fsc->client; 217 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); 218 struct netfs_io_subrequest *subreq = req->r_priv; 219 struct ceph_osd_req_op *op = &req->r_ops[0]; 220 int err = req->r_result; 221 bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ); 222 223 ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency, 224 req->r_end_latency, osd_data->length, err); 225 226 doutc(cl, "result %d subreq->len=%zu i_size=%lld\n", req->r_result, 227 subreq->len, i_size_read(req->r_inode)); 228 229 /* no object means success but no data */ 230 if (err == -ENOENT) { 231 __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags); 232 __set_bit(NETFS_SREQ_MADE_PROGRESS, &subreq->flags); 233 err = 0; 234 } else if (err == -EBLOCKLISTED) { 235 fsc->blocklisted = true; 236 } 237 238 if (err >= 0) { 239 if (sparse && err > 0) 240 err = ceph_sparse_ext_map_end(op); 241 if (err < subreq->len && 242 subreq->rreq->origin != NETFS_UNBUFFERED_READ && 243 subreq->rreq->origin != NETFS_DIO_READ) 244 __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags); 245 if (IS_ENCRYPTED(inode) && err > 0) { 246 err = ceph_fscrypt_decrypt_extents(inode, 247 osd_data->pages, subreq->start, 248 op->extent.sparse_ext, 249 op->extent.sparse_ext_cnt); 250 if (err > subreq->len) 251 err = subreq->len; 252 } 253 if (err > 0) 254 __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags); 255 } 256 257 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 258 ceph_put_page_vector(osd_data->pages, 259 calc_pages_for(osd_data->alignment, 260 osd_data->length), false); 261 } 262 if (err > 0) { 263 ceph_subvolume_metrics_record_io(fsc->mdsc, ceph_inode(inode), 264 false, err, 265 req->r_start_latency, 266 req->r_end_latency); 267 subreq->transferred = err; 268 err = 0; 269 } 270 subreq->error = err; 271 trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress); 272 netfs_read_subreq_terminated(subreq); 273 iput(req->r_inode); 274 ceph_dec_osd_stopping_blocker(fsc->mdsc); 275 } 276 277 static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq) 278 { 279 struct netfs_io_request *rreq = subreq->rreq; 280 struct inode *inode = rreq->inode; 281 struct ceph_mds_reply_info_parsed *rinfo; 282 struct ceph_mds_reply_info_in *iinfo; 283 struct ceph_mds_request *req; 284 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 285 struct ceph_inode_info *ci = ceph_inode(inode); 286 ssize_t err = 0; 287 size_t len; 288 int mode; 289 290 if (rreq->origin != NETFS_UNBUFFERED_READ && 291 rreq->origin != NETFS_DIO_READ) 292 __set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags); 293 __clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags); 294 295 if (subreq->start >= inode->i_size) 296 goto out; 297 298 /* We need to fetch the inline data. */ 299 mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA); 300 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode); 301 if (IS_ERR(req)) { 302 err = PTR_ERR(req); 303 goto out; 304 } 305 req->r_ino1 = ci->i_vino; 306 req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA); 307 req->r_num_caps = 2; 308 309 trace_netfs_sreq(subreq, netfs_sreq_trace_submit); 310 err = ceph_mdsc_do_request(mdsc, NULL, req); 311 if (err < 0) 312 goto out; 313 314 rinfo = &req->r_reply_info; 315 iinfo = &rinfo->targeti; 316 if (iinfo->inline_version == CEPH_INLINE_NONE) { 317 /* The data got uninlined */ 318 ceph_mdsc_put_request(req); 319 return false; 320 } 321 322 len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len); 323 err = copy_to_iter(iinfo->inline_data + subreq->start, len, &subreq->io_iter); 324 if (err == 0) { 325 err = -EFAULT; 326 } else { 327 subreq->transferred += err; 328 err = 0; 329 } 330 331 ceph_mdsc_put_request(req); 332 out: 333 subreq->error = err; 334 trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress); 335 netfs_read_subreq_terminated(subreq); 336 return true; 337 } 338 339 static int ceph_netfs_prepare_read(struct netfs_io_subrequest *subreq) 340 { 341 struct netfs_io_request *rreq = subreq->rreq; 342 struct inode *inode = rreq->inode; 343 struct ceph_inode_info *ci = ceph_inode(inode); 344 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 345 u64 objno, objoff; 346 u32 xlen; 347 348 /* Truncate the extent at the end of the current block */ 349 ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len, 350 &objno, &objoff, &xlen); 351 rreq->io_streams[0].sreq_max_len = umin(xlen, fsc->mount_options->rsize); 352 return 0; 353 } 354 355 static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq) 356 { 357 struct netfs_io_request *rreq = subreq->rreq; 358 struct inode *inode = rreq->inode; 359 struct ceph_inode_info *ci = ceph_inode(inode); 360 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 361 struct ceph_client *cl = fsc->client; 362 struct ceph_osd_request *req = NULL; 363 struct ceph_vino vino = ceph_vino(inode); 364 int err; 365 u64 len; 366 bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD); 367 u64 off = subreq->start; 368 int extent_cnt; 369 370 if (ceph_inode_is_shutdown(inode)) { 371 err = -EIO; 372 goto out; 373 } 374 375 if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq)) 376 return; 377 378 // TODO: This rounding here is slightly dodgy. It *should* work, for 379 // now, as the cache only deals in blocks that are a multiple of 380 // PAGE_SIZE and fscrypt blocks are at most PAGE_SIZE. What needs to 381 // happen is for the fscrypt driving to be moved into netfslib and the 382 // data in the cache also to be stored encrypted. 383 len = subreq->len; 384 ceph_fscrypt_adjust_off_and_len(inode, &off, &len); 385 386 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, 387 off, &len, 0, 1, sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ, 388 CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, 389 ci->i_truncate_size, false); 390 if (IS_ERR(req)) { 391 err = PTR_ERR(req); 392 req = NULL; 393 goto out; 394 } 395 396 if (sparse) { 397 extent_cnt = __ceph_sparse_read_ext_count(inode, len); 398 err = ceph_alloc_sparse_ext_map(&req->r_ops[0], extent_cnt); 399 if (err) 400 goto out; 401 } 402 403 doutc(cl, "%llx.%llx pos=%llu orig_len=%zu len=%llu\n", 404 ceph_vinop(inode), subreq->start, subreq->len, len); 405 406 /* 407 * FIXME: For now, use CEPH_OSD_DATA_TYPE_PAGES instead of _ITER for 408 * encrypted inodes. We'd need infrastructure that handles an iov_iter 409 * instead of page arrays, and we don't have that as of yet. Once the 410 * dust settles on the write helpers and encrypt/decrypt routines for 411 * netfs, we should be able to rework this. 412 */ 413 if (IS_ENCRYPTED(inode)) { 414 struct page **pages; 415 size_t page_off; 416 417 /* 418 * FIXME: io_iter.count needs to be corrected to aligned 419 * length. Otherwise, iov_iter_get_pages_alloc2() operates 420 * with the initial unaligned length value. As a result, 421 * ceph_msg_data_cursor_init() triggers BUG_ON() in the case 422 * if msg->sparse_read_total > msg->data_length. 423 */ 424 subreq->io_iter.count = len; 425 426 err = iov_iter_get_pages_alloc2(&subreq->io_iter, &pages, len, &page_off); 427 if (err < 0) { 428 doutc(cl, "%llx.%llx failed to allocate pages, %d\n", 429 ceph_vinop(inode), err); 430 goto out; 431 } 432 433 /* should always give us a page-aligned read */ 434 WARN_ON_ONCE(page_off); 435 len = err; 436 err = 0; 437 438 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, 439 false); 440 } else { 441 osd_req_op_extent_osd_iter(req, 0, &subreq->io_iter); 442 } 443 if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { 444 err = -EIO; 445 goto out; 446 } 447 req->r_callback = finish_netfs_read; 448 req->r_priv = subreq; 449 req->r_inode = inode; 450 ihold(inode); 451 452 trace_netfs_sreq(subreq, netfs_sreq_trace_submit); 453 ceph_osdc_start_request(req->r_osdc, req); 454 out: 455 ceph_osdc_put_request(req); 456 if (err) { 457 subreq->error = err; 458 netfs_read_subreq_terminated(subreq); 459 } 460 doutc(cl, "%llx.%llx result %d\n", ceph_vinop(inode), err); 461 } 462 463 static int ceph_init_request(struct netfs_io_request *rreq, struct file *file) 464 { 465 struct inode *inode = rreq->inode; 466 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 467 struct ceph_client *cl = ceph_inode_to_client(inode); 468 int got = 0, want = CEPH_CAP_FILE_CACHE; 469 struct ceph_netfs_request_data *priv; 470 int ret = 0; 471 472 /* [DEPRECATED] Use PG_private_2 to mark folio being written to the cache. */ 473 __set_bit(NETFS_RREQ_USE_PGPRIV2, &rreq->flags); 474 475 if (rreq->origin != NETFS_READAHEAD) 476 return 0; 477 478 priv = kzalloc_obj(*priv, GFP_NOFS); 479 if (!priv) 480 return -ENOMEM; 481 482 if (file) { 483 struct ceph_rw_context *rw_ctx; 484 struct ceph_file_info *fi = file->private_data; 485 486 priv->file_ra_pages = file->f_ra.ra_pages; 487 priv->file_ra_disabled = file->f_mode & FMODE_RANDOM; 488 489 rw_ctx = ceph_find_rw_context(fi); 490 if (rw_ctx) { 491 rreq->netfs_priv = priv; 492 return 0; 493 } 494 } 495 496 /* 497 * readahead callers do not necessarily hold Fcb caps 498 * (e.g. fadvise, madvise). 499 */ 500 ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got); 501 if (ret < 0) { 502 doutc(cl, "%llx.%llx, error getting cap\n", ceph_vinop(inode)); 503 goto out; 504 } 505 506 if (!(got & want)) { 507 doutc(cl, "%llx.%llx, no cache cap\n", ceph_vinop(inode)); 508 ret = -EACCES; 509 goto out; 510 } 511 if (ret == 0) { 512 ret = -EACCES; 513 goto out; 514 } 515 516 priv->caps = got; 517 rreq->netfs_priv = priv; 518 rreq->io_streams[0].sreq_max_len = fsc->mount_options->rsize; 519 520 out: 521 if (ret < 0) { 522 if (got) 523 ceph_put_cap_refs(ceph_inode(inode), got); 524 kfree(priv); 525 } 526 527 return ret; 528 } 529 530 static void ceph_netfs_free_request(struct netfs_io_request *rreq) 531 { 532 struct ceph_netfs_request_data *priv = rreq->netfs_priv; 533 534 if (!priv) 535 return; 536 537 if (priv->caps) 538 ceph_put_cap_refs(ceph_inode(rreq->inode), priv->caps); 539 kfree(priv); 540 rreq->netfs_priv = NULL; 541 } 542 543 const struct netfs_request_ops ceph_netfs_ops = { 544 .init_request = ceph_init_request, 545 .free_request = ceph_netfs_free_request, 546 .prepare_read = ceph_netfs_prepare_read, 547 .issue_read = ceph_netfs_issue_read, 548 .expand_readahead = ceph_netfs_expand_readahead, 549 .check_write_begin = ceph_netfs_check_write_begin, 550 }; 551 552 #ifdef CONFIG_CEPH_FSCACHE 553 static void ceph_set_page_fscache(struct page *page) 554 { 555 folio_start_private_2(page_folio(page)); /* [DEPRECATED] */ 556 } 557 558 static void ceph_fscache_write_terminated(void *priv, ssize_t error) 559 { 560 struct inode *inode = priv; 561 562 if (IS_ERR_VALUE(error) && error != -ENOBUFS) 563 ceph_fscache_invalidate(inode, false); 564 } 565 566 static void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching) 567 { 568 struct ceph_inode_info *ci = ceph_inode(inode); 569 struct fscache_cookie *cookie = ceph_fscache_cookie(ci); 570 571 fscache_write_to_cache(cookie, inode->i_mapping, off, len, i_size_read(inode), 572 ceph_fscache_write_terminated, inode, true, caching); 573 } 574 #else 575 static inline void ceph_set_page_fscache(struct page *page) 576 { 577 } 578 579 static inline void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching) 580 { 581 } 582 #endif /* CONFIG_CEPH_FSCACHE */ 583 584 struct ceph_writeback_ctl 585 { 586 loff_t i_size; 587 u64 truncate_size; 588 u32 truncate_seq; 589 bool size_stable; 590 591 bool head_snapc; 592 struct ceph_snap_context *snapc; 593 struct ceph_snap_context *last_snapc; 594 595 bool done; 596 bool should_loop; 597 bool range_whole; 598 pgoff_t start_index; 599 pgoff_t index; 600 pgoff_t end; 601 xa_mark_t tag; 602 603 pgoff_t strip_unit_end; 604 unsigned int wsize; 605 unsigned int nr_folios; 606 unsigned int max_pages; 607 unsigned int locked_pages; 608 609 int op_idx; 610 int num_ops; 611 u64 offset; 612 u64 len; 613 614 struct folio_batch fbatch; 615 unsigned int processed_in_fbatch; 616 617 bool from_pool; 618 struct page **pages; 619 struct page **data_pages; 620 }; 621 622 /* 623 * Get ref for the oldest snapc for an inode with dirty data... that is, the 624 * only snap context we are allowed to write back. 625 */ 626 static struct ceph_snap_context * 627 get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl, 628 struct ceph_snap_context *page_snapc) 629 { 630 struct ceph_inode_info *ci = ceph_inode(inode); 631 struct ceph_client *cl = ceph_inode_to_client(inode); 632 struct ceph_snap_context *snapc = NULL; 633 struct ceph_cap_snap *capsnap = NULL; 634 635 spin_lock(&ci->i_ceph_lock); 636 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 637 doutc(cl, " capsnap %p snapc %p has %d dirty pages\n", 638 capsnap, capsnap->context, capsnap->dirty_pages); 639 if (!capsnap->dirty_pages) 640 continue; 641 642 /* get i_size, truncate_{seq,size} for page_snapc? */ 643 if (snapc && capsnap->context != page_snapc) 644 continue; 645 646 if (ctl) { 647 if (capsnap->writing) { 648 ctl->i_size = i_size_read(inode); 649 ctl->size_stable = false; 650 } else { 651 ctl->i_size = capsnap->size; 652 ctl->size_stable = true; 653 } 654 ctl->truncate_size = capsnap->truncate_size; 655 ctl->truncate_seq = capsnap->truncate_seq; 656 ctl->head_snapc = false; 657 } 658 659 if (snapc) 660 break; 661 662 snapc = ceph_get_snap_context(capsnap->context); 663 if (!page_snapc || 664 page_snapc == snapc || 665 page_snapc->seq > snapc->seq) 666 break; 667 } 668 if (!snapc && ci->i_wrbuffer_ref_head) { 669 snapc = ceph_get_snap_context(ci->i_head_snapc); 670 doutc(cl, " head snapc %p has %d dirty pages\n", snapc, 671 ci->i_wrbuffer_ref_head); 672 if (ctl) { 673 ctl->i_size = i_size_read(inode); 674 ctl->truncate_size = ci->i_truncate_size; 675 ctl->truncate_seq = ci->i_truncate_seq; 676 ctl->size_stable = false; 677 ctl->head_snapc = true; 678 } 679 } 680 spin_unlock(&ci->i_ceph_lock); 681 return snapc; 682 } 683 684 static u64 get_writepages_data_length(struct inode *inode, 685 struct page *page, u64 start) 686 { 687 struct ceph_inode_info *ci = ceph_inode(inode); 688 struct ceph_snap_context *snapc; 689 struct ceph_cap_snap *capsnap = NULL; 690 u64 end = i_size_read(inode); 691 u64 ret; 692 693 snapc = page_snap_context(ceph_fscrypt_pagecache_page(page)); 694 if (snapc != ci->i_head_snapc) { 695 bool found = false; 696 spin_lock(&ci->i_ceph_lock); 697 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 698 if (capsnap->context == snapc) { 699 if (!capsnap->writing) 700 end = capsnap->size; 701 found = true; 702 break; 703 } 704 } 705 spin_unlock(&ci->i_ceph_lock); 706 WARN_ON(!found); 707 } 708 if (end > ceph_fscrypt_page_offset(page) + thp_size(page)) 709 end = ceph_fscrypt_page_offset(page) + thp_size(page); 710 ret = end > start ? end - start : 0; 711 if (ret && fscrypt_is_bounce_page(page)) 712 ret = round_up(ret, CEPH_FSCRYPT_BLOCK_SIZE); 713 return ret; 714 } 715 716 /* 717 * Write a folio, but leave it locked. 718 * 719 * If we get a write error, mark the mapping for error, but still adjust the 720 * dirty page accounting (i.e., folio is no longer dirty). 721 */ 722 static int write_folio_nounlock(struct folio *folio, 723 struct writeback_control *wbc) 724 { 725 struct page *page = &folio->page; 726 struct inode *inode = folio->mapping->host; 727 struct ceph_inode_info *ci = ceph_inode(inode); 728 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 729 struct ceph_client *cl = fsc->client; 730 struct ceph_snap_context *snapc, *oldest; 731 loff_t page_off = folio_pos(folio); 732 int err; 733 loff_t len = folio_size(folio); 734 loff_t wlen; 735 struct ceph_writeback_ctl ceph_wbc; 736 struct ceph_osd_client *osdc = &fsc->client->osdc; 737 struct ceph_osd_request *req; 738 bool caching = ceph_is_cache_enabled(inode); 739 struct page *bounce_page = NULL; 740 741 doutc(cl, "%llx.%llx folio %p idx %lu\n", ceph_vinop(inode), folio, 742 folio->index); 743 744 if (ceph_inode_is_shutdown(inode)) 745 return -EIO; 746 747 /* verify this is a writeable snap context */ 748 snapc = page_snap_context(&folio->page); 749 if (!snapc) { 750 doutc(cl, "%llx.%llx folio %p not dirty?\n", ceph_vinop(inode), 751 folio); 752 return 0; 753 } 754 oldest = get_oldest_context(inode, &ceph_wbc, snapc); 755 if (snapc->seq > oldest->seq) { 756 doutc(cl, "%llx.%llx folio %p snapc %p not writeable - noop\n", 757 ceph_vinop(inode), folio, snapc); 758 /* we should only noop if called by kswapd */ 759 WARN_ON(!(current->flags & PF_MEMALLOC)); 760 ceph_put_snap_context(oldest); 761 folio_redirty_for_writepage(wbc, folio); 762 return 0; 763 } 764 ceph_put_snap_context(oldest); 765 766 /* is this a partial page at end of file? */ 767 if (page_off >= ceph_wbc.i_size) { 768 doutc(cl, "%llx.%llx folio at %lu beyond eof %llu\n", 769 ceph_vinop(inode), folio->index, ceph_wbc.i_size); 770 folio_invalidate(folio, 0, folio_size(folio)); 771 return 0; 772 } 773 774 if (ceph_wbc.i_size < page_off + len) 775 len = ceph_wbc.i_size - page_off; 776 777 wlen = IS_ENCRYPTED(inode) ? round_up(len, CEPH_FSCRYPT_BLOCK_SIZE) : len; 778 doutc(cl, "%llx.%llx folio %p index %lu on %llu~%llu snapc %p seq %lld\n", 779 ceph_vinop(inode), folio, folio->index, page_off, wlen, snapc, 780 snapc->seq); 781 782 if (atomic_long_inc_return(&fsc->writeback_count) > 783 CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb)) 784 fsc->write_congested = true; 785 786 req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), 787 page_off, &wlen, 0, 1, CEPH_OSD_OP_WRITE, 788 CEPH_OSD_FLAG_WRITE, snapc, 789 ceph_wbc.truncate_seq, 790 ceph_wbc.truncate_size, true); 791 if (IS_ERR(req)) { 792 folio_redirty_for_writepage(wbc, folio); 793 return PTR_ERR(req); 794 } 795 796 if (wlen < len) 797 len = wlen; 798 799 folio_start_writeback(folio); 800 if (caching) 801 ceph_set_page_fscache(&folio->page); 802 ceph_fscache_write_to_cache(inode, page_off, len, caching); 803 804 if (IS_ENCRYPTED(inode)) { 805 bounce_page = fscrypt_encrypt_pagecache_blocks(folio, 806 CEPH_FSCRYPT_BLOCK_SIZE, 0, 807 GFP_NOFS); 808 if (IS_ERR(bounce_page)) { 809 folio_redirty_for_writepage(wbc, folio); 810 folio_end_writeback(folio); 811 ceph_osdc_put_request(req); 812 return PTR_ERR(bounce_page); 813 } 814 } 815 816 /* it may be a short write due to an object boundary */ 817 WARN_ON_ONCE(len > folio_size(folio)); 818 osd_req_op_extent_osd_data_pages(req, 0, 819 bounce_page ? &bounce_page : &page, wlen, 0, 820 false, false); 821 doutc(cl, "%llx.%llx %llu~%llu (%llu bytes, %sencrypted)\n", 822 ceph_vinop(inode), page_off, len, wlen, 823 IS_ENCRYPTED(inode) ? "" : "not "); 824 825 req->r_mtime = inode_get_mtime(inode); 826 ceph_osdc_start_request(osdc, req); 827 err = ceph_osdc_wait_request(osdc, req); 828 829 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, 830 req->r_end_latency, len, err); 831 if (err >= 0 && len > 0) 832 ceph_subvolume_metrics_record_io(fsc->mdsc, ci, true, len, 833 req->r_start_latency, 834 req->r_end_latency); 835 fscrypt_free_bounce_page(bounce_page); 836 ceph_osdc_put_request(req); 837 if (err == 0) 838 err = len; 839 840 if (err < 0) { 841 struct writeback_control tmp_wbc; 842 if (!wbc) 843 wbc = &tmp_wbc; 844 if (err == -ERESTARTSYS) { 845 /* killed by SIGKILL */ 846 doutc(cl, "%llx.%llx interrupted page %p\n", 847 ceph_vinop(inode), folio); 848 folio_redirty_for_writepage(wbc, folio); 849 folio_end_writeback(folio); 850 return err; 851 } 852 if (err == -EBLOCKLISTED) 853 fsc->blocklisted = true; 854 doutc(cl, "%llx.%llx setting mapping error %d %p\n", 855 ceph_vinop(inode), err, folio); 856 mapping_set_error(&inode->i_data, err); 857 wbc->pages_skipped++; 858 } else { 859 doutc(cl, "%llx.%llx cleaned page %p\n", 860 ceph_vinop(inode), folio); 861 err = 0; /* vfs expects us to return 0 */ 862 } 863 oldest = folio_detach_private(folio); 864 WARN_ON_ONCE(oldest != snapc); 865 folio_end_writeback(folio); 866 ceph_put_wrbuffer_cap_refs(ci, 1, snapc); 867 ceph_put_snap_context(snapc); /* page's reference */ 868 869 if (atomic_long_dec_return(&fsc->writeback_count) < 870 CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb)) 871 fsc->write_congested = false; 872 873 return err; 874 } 875 876 /* 877 * async writeback completion handler. 878 * 879 * If we get an error, set the mapping error bit, but not the individual 880 * page error bits. 881 */ 882 static void writepages_finish(struct ceph_osd_request *req) 883 { 884 struct inode *inode = req->r_inode; 885 struct ceph_inode_info *ci = ceph_inode(inode); 886 struct ceph_client *cl = ceph_inode_to_client(inode); 887 struct ceph_osd_data *osd_data; 888 struct page *page; 889 int num_pages, total_pages = 0; 890 int i, j; 891 int rc = req->r_result; 892 struct ceph_snap_context *snapc = req->r_snapc; 893 struct address_space *mapping = inode->i_mapping; 894 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 895 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 896 unsigned int len = 0; 897 bool remove_page; 898 899 doutc(cl, "%llx.%llx rc %d\n", ceph_vinop(inode), rc); 900 if (rc < 0) { 901 mapping_set_error(mapping, rc); 902 ceph_set_error_write(ci); 903 if (rc == -EBLOCKLISTED) 904 fsc->blocklisted = true; 905 } else { 906 ceph_clear_error_write(ci); 907 } 908 909 /* 910 * We lost the cache cap, need to truncate the page before 911 * it is unlocked, otherwise we'd truncate it later in the 912 * page truncation thread, possibly losing some data that 913 * raced its way in 914 */ 915 remove_page = !(ceph_caps_issued(ci) & 916 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); 917 918 /* clean all pages */ 919 for (i = 0; i < req->r_num_ops; i++) { 920 if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) { 921 pr_warn_client(cl, 922 "%llx.%llx incorrect op %d req %p index %d tid %llu\n", 923 ceph_vinop(inode), req->r_ops[i].op, req, i, 924 req->r_tid); 925 break; 926 } 927 928 osd_data = osd_req_op_extent_osd_data(req, i); 929 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 930 len += osd_data->length; 931 num_pages = calc_pages_for((u64)osd_data->alignment, 932 (u64)osd_data->length); 933 total_pages += num_pages; 934 for (j = 0; j < num_pages; j++) { 935 page = osd_data->pages[j]; 936 if (fscrypt_is_bounce_page(page)) { 937 page = fscrypt_pagecache_page(page); 938 fscrypt_free_bounce_page(osd_data->pages[j]); 939 osd_data->pages[j] = page; 940 } 941 BUG_ON(!page); 942 WARN_ON(!PageUptodate(page)); 943 944 if (atomic_long_dec_return(&fsc->writeback_count) < 945 CONGESTION_OFF_THRESH( 946 fsc->mount_options->congestion_kb)) 947 fsc->write_congested = false; 948 949 ceph_put_snap_context(detach_page_private(page)); 950 end_page_writeback(page); 951 952 if (atomic64_dec_return(&mdsc->dirty_folios) <= 0) { 953 wake_up_all(&mdsc->flush_end_wq); 954 WARN_ON(atomic64_read(&mdsc->dirty_folios) < 0); 955 } 956 957 doutc(cl, "unlocking %p\n", page); 958 959 if (remove_page) 960 generic_error_remove_folio(inode->i_mapping, 961 page_folio(page)); 962 963 unlock_page(page); 964 } 965 doutc(cl, "%llx.%llx wrote %llu bytes cleaned %d pages\n", 966 ceph_vinop(inode), osd_data->length, 967 rc >= 0 ? num_pages : 0); 968 969 release_pages(osd_data->pages, num_pages); 970 } 971 972 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, 973 req->r_end_latency, len, rc); 974 975 if (rc >= 0 && len > 0) 976 ceph_subvolume_metrics_record_io(mdsc, ci, true, len, 977 req->r_start_latency, 978 req->r_end_latency); 979 980 ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc); 981 982 osd_data = osd_req_op_extent_osd_data(req, 0); 983 if (osd_data->pages_from_pool) 984 mempool_free(osd_data->pages, ceph_wb_pagevec_pool); 985 else 986 kfree(osd_data->pages); 987 ceph_osdc_put_request(req); 988 ceph_dec_osd_stopping_blocker(fsc->mdsc); 989 } 990 991 static inline 992 bool is_forced_umount(struct address_space *mapping) 993 { 994 struct inode *inode = mapping->host; 995 struct ceph_inode_info *ci = ceph_inode(inode); 996 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 997 struct ceph_client *cl = fsc->client; 998 999 if (ceph_inode_is_shutdown(inode)) { 1000 if (ci->i_wrbuffer_ref > 0) { 1001 pr_warn_ratelimited_client(cl, 1002 "%llx.%llx %lld forced umount\n", 1003 ceph_vinop(inode), ceph_ino(inode)); 1004 } 1005 mapping_set_error(mapping, -EIO); 1006 return true; 1007 } 1008 1009 return false; 1010 } 1011 1012 static inline 1013 unsigned int ceph_define_write_size(struct address_space *mapping) 1014 { 1015 struct inode *inode = mapping->host; 1016 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1017 struct ceph_inode_info *ci = ceph_inode(inode); 1018 unsigned int wsize = ci->i_layout.stripe_unit; 1019 1020 if (fsc->mount_options->wsize < wsize) 1021 wsize = fsc->mount_options->wsize; 1022 1023 return wsize; 1024 } 1025 1026 static inline 1027 void ceph_folio_batch_init(struct ceph_writeback_ctl *ceph_wbc) 1028 { 1029 folio_batch_init(&ceph_wbc->fbatch); 1030 ceph_wbc->processed_in_fbatch = 0; 1031 } 1032 1033 static inline 1034 void ceph_folio_batch_reinit(struct ceph_writeback_ctl *ceph_wbc) 1035 { 1036 folio_batch_release(&ceph_wbc->fbatch); 1037 ceph_folio_batch_init(ceph_wbc); 1038 } 1039 1040 static inline 1041 void ceph_init_writeback_ctl(struct address_space *mapping, 1042 struct writeback_control *wbc, 1043 struct ceph_writeback_ctl *ceph_wbc) 1044 { 1045 ceph_wbc->snapc = NULL; 1046 ceph_wbc->last_snapc = NULL; 1047 1048 ceph_wbc->strip_unit_end = 0; 1049 ceph_wbc->wsize = ceph_define_write_size(mapping); 1050 1051 ceph_wbc->nr_folios = 0; 1052 ceph_wbc->max_pages = 0; 1053 ceph_wbc->locked_pages = 0; 1054 1055 ceph_wbc->done = false; 1056 ceph_wbc->should_loop = false; 1057 ceph_wbc->range_whole = false; 1058 1059 ceph_wbc->start_index = wbc->range_cyclic ? mapping->writeback_index : 0; 1060 ceph_wbc->index = ceph_wbc->start_index; 1061 ceph_wbc->end = -1; 1062 1063 ceph_wbc->tag = wbc_to_tag(wbc); 1064 1065 ceph_wbc->op_idx = -1; 1066 ceph_wbc->num_ops = 0; 1067 ceph_wbc->offset = 0; 1068 ceph_wbc->len = 0; 1069 ceph_wbc->from_pool = false; 1070 1071 ceph_folio_batch_init(ceph_wbc); 1072 1073 ceph_wbc->pages = NULL; 1074 ceph_wbc->data_pages = NULL; 1075 } 1076 1077 static inline 1078 int ceph_define_writeback_range(struct address_space *mapping, 1079 struct writeback_control *wbc, 1080 struct ceph_writeback_ctl *ceph_wbc) 1081 { 1082 struct inode *inode = mapping->host; 1083 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1084 struct ceph_client *cl = fsc->client; 1085 1086 /* find oldest snap context with dirty data */ 1087 ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL); 1088 if (!ceph_wbc->snapc) { 1089 /* hmm, why does writepages get called when there 1090 is no dirty data? */ 1091 doutc(cl, " no snap context with dirty data?\n"); 1092 return -ENODATA; 1093 } 1094 1095 doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", 1096 ceph_wbc->snapc, ceph_wbc->snapc->seq, 1097 ceph_wbc->snapc->num_snaps); 1098 1099 ceph_wbc->should_loop = false; 1100 1101 if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) { 1102 /* where to start/end? */ 1103 if (wbc->range_cyclic) { 1104 ceph_wbc->index = ceph_wbc->start_index; 1105 ceph_wbc->end = -1; 1106 if (ceph_wbc->index > 0) 1107 ceph_wbc->should_loop = true; 1108 doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index); 1109 } else { 1110 ceph_wbc->index = wbc->range_start >> PAGE_SHIFT; 1111 ceph_wbc->end = wbc->range_end >> PAGE_SHIFT; 1112 if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX) 1113 ceph_wbc->range_whole = true; 1114 doutc(cl, " not cyclic, %lu to %lu\n", 1115 ceph_wbc->index, ceph_wbc->end); 1116 } 1117 } else if (!ceph_wbc->head_snapc) { 1118 /* Do not respect wbc->range_{start,end}. Dirty pages 1119 * in that range can be associated with newer snapc. 1120 * They are not writeable until we write all dirty pages 1121 * associated with 'snapc' get written */ 1122 if (ceph_wbc->index > 0) 1123 ceph_wbc->should_loop = true; 1124 doutc(cl, " non-head snapc, range whole\n"); 1125 } 1126 1127 ceph_put_snap_context(ceph_wbc->last_snapc); 1128 ceph_wbc->last_snapc = ceph_wbc->snapc; 1129 1130 return 0; 1131 } 1132 1133 static inline 1134 bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc) 1135 { 1136 return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end; 1137 } 1138 1139 static inline 1140 bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc, 1141 unsigned index) 1142 { 1143 return index < ceph_wbc->nr_folios && 1144 ceph_wbc->locked_pages < ceph_wbc->max_pages; 1145 } 1146 1147 static 1148 int ceph_check_page_before_write(struct address_space *mapping, 1149 struct writeback_control *wbc, 1150 struct ceph_writeback_ctl *ceph_wbc, 1151 struct folio *folio) 1152 { 1153 struct inode *inode = mapping->host; 1154 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1155 struct ceph_client *cl = fsc->client; 1156 struct ceph_snap_context *pgsnapc; 1157 1158 /* only dirty folios, or our accounting breaks */ 1159 if (unlikely(!folio_test_dirty(folio) || folio->mapping != mapping)) { 1160 doutc(cl, "!dirty or !mapping %p\n", folio); 1161 return -ENODATA; 1162 } 1163 1164 /* only if matching snap context */ 1165 pgsnapc = page_snap_context(&folio->page); 1166 if (pgsnapc != ceph_wbc->snapc) { 1167 doutc(cl, "folio snapc %p %lld != oldest %p %lld\n", 1168 pgsnapc, pgsnapc->seq, 1169 ceph_wbc->snapc, ceph_wbc->snapc->seq); 1170 1171 if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc && 1172 wbc->sync_mode != WB_SYNC_NONE) 1173 ceph_wbc->should_loop = true; 1174 1175 return -ENODATA; 1176 } 1177 1178 if (folio_pos(folio) >= ceph_wbc->i_size) { 1179 doutc(cl, "folio at %lu beyond eof %llu\n", 1180 folio->index, ceph_wbc->i_size); 1181 1182 if ((ceph_wbc->size_stable || 1183 folio_pos(folio) >= i_size_read(inode)) && 1184 folio_clear_dirty_for_io(folio)) 1185 folio_invalidate(folio, 0, folio_size(folio)); 1186 1187 return -ENODATA; 1188 } 1189 1190 if (ceph_wbc->strip_unit_end && 1191 (folio->index > ceph_wbc->strip_unit_end)) { 1192 doutc(cl, "end of strip unit %p\n", folio); 1193 return -E2BIG; 1194 } 1195 1196 return 0; 1197 } 1198 1199 static inline 1200 void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc, 1201 unsigned int max_pages) 1202 { 1203 ceph_wbc->pages = kmalloc_objs(*ceph_wbc->pages, max_pages, GFP_NOFS); 1204 if (!ceph_wbc->pages) { 1205 ceph_wbc->from_pool = true; 1206 ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS); 1207 BUG_ON(!ceph_wbc->pages); 1208 } 1209 } 1210 1211 static inline 1212 void ceph_allocate_page_array(struct address_space *mapping, 1213 struct ceph_writeback_ctl *ceph_wbc, 1214 struct folio *folio) 1215 { 1216 struct inode *inode = mapping->host; 1217 struct ceph_inode_info *ci = ceph_inode(inode); 1218 u64 objnum; 1219 u64 objoff; 1220 u32 xlen; 1221 1222 /* prepare async write request */ 1223 ceph_wbc->offset = (u64)folio_pos(folio); 1224 ceph_calc_file_object_mapping(&ci->i_layout, 1225 ceph_wbc->offset, ceph_wbc->wsize, 1226 &objnum, &objoff, &xlen); 1227 1228 ceph_wbc->num_ops = 1; 1229 ceph_wbc->strip_unit_end = folio->index + ((xlen - 1) >> PAGE_SHIFT); 1230 1231 BUG_ON(ceph_wbc->pages); 1232 ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen); 1233 __ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages); 1234 1235 ceph_wbc->len = 0; 1236 } 1237 1238 static inline 1239 bool is_folio_index_contiguous(const struct ceph_writeback_ctl *ceph_wbc, 1240 const struct folio *folio) 1241 { 1242 return folio->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT; 1243 } 1244 1245 static inline 1246 bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc) 1247 { 1248 return ceph_wbc->num_ops >= 1249 (ceph_wbc->from_pool ? CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS); 1250 } 1251 1252 static inline 1253 bool is_write_congestion_happened(struct ceph_fs_client *fsc) 1254 { 1255 return atomic_long_inc_return(&fsc->writeback_count) > 1256 CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb); 1257 } 1258 1259 static inline int move_dirty_folio_in_page_array(struct address_space *mapping, 1260 struct writeback_control *wbc, 1261 struct ceph_writeback_ctl *ceph_wbc, struct folio *folio) 1262 { 1263 struct inode *inode = mapping->host; 1264 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1265 struct ceph_client *cl = fsc->client; 1266 struct page **pages = ceph_wbc->pages; 1267 unsigned int index = ceph_wbc->locked_pages; 1268 gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS; 1269 1270 if (IS_ENCRYPTED(inode)) { 1271 pages[index] = fscrypt_encrypt_pagecache_blocks(folio, 1272 PAGE_SIZE, 1273 0, 1274 gfp_flags); 1275 if (IS_ERR(pages[index])) { 1276 int err = PTR_ERR(pages[index]); 1277 1278 if (err == -EINVAL) { 1279 pr_err_client(cl, "inode->i_blkbits=%hhu\n", 1280 inode->i_blkbits); 1281 } 1282 1283 /* better not fail on first page! */ 1284 BUG_ON(ceph_wbc->locked_pages == 0); 1285 1286 pages[index] = NULL; 1287 return err; 1288 } 1289 } else { 1290 pages[index] = &folio->page; 1291 } 1292 1293 ceph_wbc->locked_pages++; 1294 1295 return 0; 1296 } 1297 1298 static 1299 void ceph_process_folio_batch(struct address_space *mapping, 1300 struct writeback_control *wbc, 1301 struct ceph_writeback_ctl *ceph_wbc) 1302 { 1303 struct inode *inode = mapping->host; 1304 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1305 struct ceph_client *cl = fsc->client; 1306 struct folio *folio = NULL; 1307 unsigned i; 1308 int rc; 1309 1310 for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) { 1311 folio = ceph_wbc->fbatch.folios[i]; 1312 1313 if (!folio) 1314 continue; 1315 1316 doutc(cl, "? %p idx %lu, folio_test_writeback %#x, " 1317 "folio_test_dirty %#x, folio_test_locked %#x\n", 1318 folio, folio->index, folio_test_writeback(folio), 1319 folio_test_dirty(folio), 1320 folio_test_locked(folio)); 1321 1322 if (folio_test_writeback(folio) || 1323 folio_test_private_2(folio) /* [DEPRECATED] */) { 1324 doutc(cl, "waiting on writeback %p\n", folio); 1325 folio_wait_writeback(folio); 1326 folio_wait_private_2(folio); /* [DEPRECATED] */ 1327 continue; 1328 } 1329 1330 if (ceph_wbc->locked_pages == 0) 1331 folio_lock(folio); 1332 else if (!folio_trylock(folio)) 1333 break; 1334 1335 rc = ceph_check_page_before_write(mapping, wbc, 1336 ceph_wbc, folio); 1337 if (rc == -ENODATA) { 1338 folio_unlock(folio); 1339 folio_put(folio); 1340 ceph_wbc->fbatch.folios[i] = NULL; 1341 continue; 1342 } else if (rc == -E2BIG) { 1343 folio_unlock(folio); 1344 break; 1345 } 1346 1347 if (!folio_clear_dirty_for_io(folio)) { 1348 doutc(cl, "%p !folio_clear_dirty_for_io\n", folio); 1349 folio_unlock(folio); 1350 folio_put(folio); 1351 ceph_wbc->fbatch.folios[i] = NULL; 1352 continue; 1353 } 1354 1355 /* 1356 * We have something to write. If this is 1357 * the first locked page this time through, 1358 * calculate max possible write size and 1359 * allocate a page array 1360 */ 1361 if (ceph_wbc->locked_pages == 0) { 1362 ceph_allocate_page_array(mapping, ceph_wbc, folio); 1363 } else if (!is_folio_index_contiguous(ceph_wbc, folio)) { 1364 if (is_num_ops_too_big(ceph_wbc)) { 1365 folio_redirty_for_writepage(wbc, folio); 1366 folio_unlock(folio); 1367 break; 1368 } 1369 1370 ceph_wbc->num_ops++; 1371 ceph_wbc->offset = (u64)folio_pos(folio); 1372 ceph_wbc->len = 0; 1373 } 1374 1375 /* note position of first page in fbatch */ 1376 doutc(cl, "%llx.%llx will write folio %p idx %lu\n", 1377 ceph_vinop(inode), folio, folio->index); 1378 1379 fsc->write_congested = is_write_congestion_happened(fsc); 1380 1381 rc = move_dirty_folio_in_page_array(mapping, wbc, ceph_wbc, 1382 folio); 1383 if (rc) { 1384 /* Did we just begin a new contiguous op? Nevermind! */ 1385 if (ceph_wbc->len == 0) 1386 ceph_wbc->num_ops--; 1387 1388 folio_redirty_for_writepage(wbc, folio); 1389 folio_unlock(folio); 1390 break; 1391 } 1392 1393 ceph_wbc->fbatch.folios[i] = NULL; 1394 ceph_wbc->len += folio_size(folio); 1395 } 1396 1397 ceph_wbc->processed_in_fbatch = i; 1398 } 1399 1400 static inline 1401 void ceph_shift_unused_folios_left(struct folio_batch *fbatch) 1402 { 1403 unsigned j, n = 0; 1404 1405 /* shift unused page to beginning of fbatch */ 1406 for (j = 0; j < folio_batch_count(fbatch); j++) { 1407 if (!fbatch->folios[j]) 1408 continue; 1409 1410 if (n < j) { 1411 fbatch->folios[n] = fbatch->folios[j]; 1412 } 1413 1414 n++; 1415 } 1416 1417 fbatch->nr = n; 1418 } 1419 1420 static 1421 int ceph_submit_write(struct address_space *mapping, 1422 struct writeback_control *wbc, 1423 struct ceph_writeback_ctl *ceph_wbc) 1424 { 1425 struct inode *inode = mapping->host; 1426 struct ceph_inode_info *ci = ceph_inode(inode); 1427 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1428 struct ceph_client *cl = fsc->client; 1429 struct ceph_vino vino = ceph_vino(inode); 1430 struct ceph_osd_request *req = NULL; 1431 struct page *page = NULL; 1432 bool caching = ceph_is_cache_enabled(inode); 1433 u64 offset; 1434 u64 len; 1435 unsigned i; 1436 1437 new_request: 1438 offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]); 1439 len = ceph_wbc->wsize; 1440 1441 req = ceph_osdc_new_request(&fsc->client->osdc, 1442 &ci->i_layout, vino, 1443 offset, &len, 0, ceph_wbc->num_ops, 1444 CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, 1445 ceph_wbc->snapc, ceph_wbc->truncate_seq, 1446 ceph_wbc->truncate_size, false); 1447 if (IS_ERR(req)) { 1448 req = ceph_osdc_new_request(&fsc->client->osdc, 1449 &ci->i_layout, vino, 1450 offset, &len, 0, 1451 min(ceph_wbc->num_ops, 1452 CEPH_OSD_SLAB_OPS), 1453 CEPH_OSD_OP_WRITE, 1454 CEPH_OSD_FLAG_WRITE, 1455 ceph_wbc->snapc, 1456 ceph_wbc->truncate_seq, 1457 ceph_wbc->truncate_size, 1458 true); 1459 BUG_ON(IS_ERR(req)); 1460 } 1461 1462 page = ceph_wbc->pages[ceph_wbc->locked_pages - 1]; 1463 BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset); 1464 1465 if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { 1466 for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) { 1467 struct folio *folio = ceph_wbc->fbatch.folios[i]; 1468 1469 if (!folio) 1470 continue; 1471 1472 page = &folio->page; 1473 redirty_page_for_writepage(wbc, page); 1474 unlock_page(page); 1475 } 1476 1477 for (i = 0; i < ceph_wbc->locked_pages; i++) { 1478 page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]); 1479 1480 if (!page) 1481 continue; 1482 1483 redirty_page_for_writepage(wbc, page); 1484 unlock_page(page); 1485 } 1486 1487 ceph_osdc_put_request(req); 1488 return -EIO; 1489 } 1490 1491 req->r_callback = writepages_finish; 1492 req->r_inode = inode; 1493 1494 /* Format the osd request message and submit the write */ 1495 len = 0; 1496 ceph_wbc->data_pages = ceph_wbc->pages; 1497 ceph_wbc->op_idx = 0; 1498 for (i = 0; i < ceph_wbc->locked_pages; i++) { 1499 u64 cur_offset; 1500 1501 page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]); 1502 cur_offset = page_offset(page); 1503 1504 /* 1505 * Discontinuity in page range? Ceph can handle that by just passing 1506 * multiple extents in the write op. 1507 */ 1508 if (offset + len != cur_offset) { 1509 /* If it's full, stop here */ 1510 if (ceph_wbc->op_idx + 1 == req->r_num_ops) 1511 break; 1512 1513 /* Kick off an fscache write with what we have so far. */ 1514 ceph_fscache_write_to_cache(inode, offset, len, caching); 1515 1516 /* Start a new extent */ 1517 osd_req_op_extent_dup_last(req, ceph_wbc->op_idx, 1518 cur_offset - offset); 1519 1520 doutc(cl, "got pages at %llu~%llu\n", offset, len); 1521 1522 osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx, 1523 ceph_wbc->data_pages, 1524 len, 0, 1525 ceph_wbc->from_pool, 1526 false); 1527 osd_req_op_extent_update(req, ceph_wbc->op_idx, len); 1528 1529 len = 0; 1530 offset = cur_offset; 1531 ceph_wbc->data_pages = ceph_wbc->pages + i; 1532 ceph_wbc->op_idx++; 1533 } 1534 1535 set_page_writeback(page); 1536 1537 if (caching) 1538 ceph_set_page_fscache(page); 1539 1540 len += thp_size(page); 1541 } 1542 1543 ceph_fscache_write_to_cache(inode, offset, len, caching); 1544 1545 if (ceph_wbc->size_stable) { 1546 len = min(len, ceph_wbc->i_size - offset); 1547 } else if (i == ceph_wbc->locked_pages) { 1548 /* writepages_finish() clears writeback pages 1549 * according to the data length, so make sure 1550 * data length covers all locked pages */ 1551 u64 min_len = len + 1 - thp_size(page); 1552 len = get_writepages_data_length(inode, 1553 ceph_wbc->pages[i - 1], 1554 offset); 1555 len = max(len, min_len); 1556 } 1557 1558 if (IS_ENCRYPTED(inode)) 1559 len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE); 1560 1561 doutc(cl, "got pages at %llu~%llu\n", offset, len); 1562 1563 if (IS_ENCRYPTED(inode) && 1564 ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) { 1565 pr_warn_client(cl, 1566 "bad encrypted write offset=%lld len=%llu\n", 1567 offset, len); 1568 } 1569 1570 osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx, 1571 ceph_wbc->data_pages, len, 1572 0, ceph_wbc->from_pool, false); 1573 osd_req_op_extent_update(req, ceph_wbc->op_idx, len); 1574 1575 BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops); 1576 1577 ceph_wbc->from_pool = false; 1578 if (i < ceph_wbc->locked_pages) { 1579 BUG_ON(ceph_wbc->num_ops <= req->r_num_ops); 1580 ceph_wbc->num_ops -= req->r_num_ops; 1581 ceph_wbc->locked_pages -= i; 1582 1583 /* allocate new pages array for next request */ 1584 ceph_wbc->data_pages = ceph_wbc->pages; 1585 __ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages); 1586 memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i, 1587 ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages)); 1588 memset(ceph_wbc->data_pages + i, 0, 1589 ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages)); 1590 } else { 1591 BUG_ON(ceph_wbc->num_ops != req->r_num_ops); 1592 /* request message now owns the pages array */ 1593 ceph_wbc->pages = NULL; 1594 } 1595 1596 req->r_mtime = inode_get_mtime(inode); 1597 ceph_osdc_start_request(&fsc->client->osdc, req); 1598 req = NULL; 1599 1600 wbc->nr_to_write -= i; 1601 if (ceph_wbc->pages) 1602 goto new_request; 1603 1604 return 0; 1605 } 1606 1607 static 1608 void ceph_wait_until_current_writes_complete(struct address_space *mapping, 1609 struct writeback_control *wbc, 1610 struct ceph_writeback_ctl *ceph_wbc) 1611 { 1612 struct page *page; 1613 unsigned i, nr; 1614 1615 if (wbc->sync_mode != WB_SYNC_NONE && 1616 ceph_wbc->start_index == 0 && /* all dirty pages were checked */ 1617 !ceph_wbc->head_snapc) { 1618 ceph_wbc->index = 0; 1619 1620 while ((ceph_wbc->index <= ceph_wbc->end) && 1621 (nr = filemap_get_folios_tag(mapping, 1622 &ceph_wbc->index, 1623 (pgoff_t)-1, 1624 PAGECACHE_TAG_WRITEBACK, 1625 &ceph_wbc->fbatch))) { 1626 for (i = 0; i < nr; i++) { 1627 page = &ceph_wbc->fbatch.folios[i]->page; 1628 if (page_snap_context(page) != ceph_wbc->snapc) 1629 continue; 1630 wait_on_page_writeback(page); 1631 } 1632 1633 folio_batch_release(&ceph_wbc->fbatch); 1634 cond_resched(); 1635 } 1636 } 1637 } 1638 1639 /* 1640 * initiate async writeback 1641 */ 1642 static int ceph_writepages_start(struct address_space *mapping, 1643 struct writeback_control *wbc) 1644 { 1645 struct inode *inode = mapping->host; 1646 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1647 struct ceph_client *cl = fsc->client; 1648 struct ceph_writeback_ctl ceph_wbc; 1649 int rc = 0; 1650 1651 if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested) 1652 return 0; 1653 1654 doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode), 1655 wbc->sync_mode == WB_SYNC_NONE ? "NONE" : 1656 (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); 1657 1658 if (is_forced_umount(mapping)) { 1659 /* we're in a forced umount, don't write! */ 1660 return -EIO; 1661 } 1662 1663 ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc); 1664 1665 if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) { 1666 rc = -EIO; 1667 goto out; 1668 } 1669 1670 retry: 1671 rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc); 1672 if (rc == -ENODATA) { 1673 /* hmm, why does writepages get called when there 1674 is no dirty data? */ 1675 rc = 0; 1676 goto dec_osd_stopping_blocker; 1677 } 1678 1679 if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) 1680 tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end); 1681 1682 while (!has_writeback_done(&ceph_wbc)) { 1683 BUG_ON(ceph_wbc.locked_pages); 1684 BUG_ON(ceph_wbc.pages); 1685 1686 ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT; 1687 1688 get_more_pages: 1689 ceph_folio_batch_reinit(&ceph_wbc); 1690 1691 ceph_wbc.nr_folios = filemap_get_folios_tag(mapping, 1692 &ceph_wbc.index, 1693 ceph_wbc.end, 1694 ceph_wbc.tag, 1695 &ceph_wbc.fbatch); 1696 doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n", 1697 ceph_wbc.tag, ceph_wbc.nr_folios); 1698 1699 if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages) 1700 break; 1701 1702 process_folio_batch: 1703 ceph_process_folio_batch(mapping, wbc, &ceph_wbc); 1704 ceph_shift_unused_folios_left(&ceph_wbc.fbatch); 1705 1706 /* did we get anything? */ 1707 if (!ceph_wbc.locked_pages) 1708 goto release_folios; 1709 1710 if (ceph_wbc.processed_in_fbatch) { 1711 if (folio_batch_count(&ceph_wbc.fbatch) == 0 && 1712 ceph_wbc.locked_pages < ceph_wbc.max_pages) { 1713 doutc(cl, "reached end fbatch, trying for more\n"); 1714 goto get_more_pages; 1715 } 1716 } 1717 1718 rc = ceph_submit_write(mapping, wbc, &ceph_wbc); 1719 if (rc) 1720 goto release_folios; 1721 1722 ceph_wbc.locked_pages = 0; 1723 ceph_wbc.strip_unit_end = 0; 1724 1725 if (folio_batch_count(&ceph_wbc.fbatch) > 0) { 1726 ceph_wbc.nr_folios = 1727 folio_batch_count(&ceph_wbc.fbatch); 1728 goto process_folio_batch; 1729 } 1730 1731 /* 1732 * We stop writing back only if we are not doing 1733 * integrity sync. In case of integrity sync we have to 1734 * keep going until we have written all the pages 1735 * we tagged for writeback prior to entering this loop. 1736 */ 1737 if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE) 1738 ceph_wbc.done = true; 1739 1740 release_folios: 1741 doutc(cl, "folio_batch release on %d folios (%p)\n", 1742 (int)ceph_wbc.fbatch.nr, 1743 ceph_wbc.fbatch.nr ? ceph_wbc.fbatch.folios[0] : NULL); 1744 folio_batch_release(&ceph_wbc.fbatch); 1745 } 1746 1747 if (ceph_wbc.should_loop && !ceph_wbc.done) { 1748 /* more to do; loop back to beginning of file */ 1749 doutc(cl, "looping back to beginning of file\n"); 1750 /* OK even when start_index == 0 */ 1751 ceph_wbc.end = ceph_wbc.start_index - 1; 1752 1753 /* to write dirty pages associated with next snapc, 1754 * we need to wait until current writes complete */ 1755 ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc); 1756 1757 ceph_wbc.start_index = 0; 1758 ceph_wbc.index = 0; 1759 goto retry; 1760 } 1761 1762 if (wbc->range_cyclic || (ceph_wbc.range_whole && wbc->nr_to_write > 0)) 1763 mapping->writeback_index = ceph_wbc.index; 1764 1765 dec_osd_stopping_blocker: 1766 ceph_dec_osd_stopping_blocker(fsc->mdsc); 1767 1768 out: 1769 ceph_put_snap_context(ceph_wbc.last_snapc); 1770 doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode), 1771 rc); 1772 1773 return rc; 1774 } 1775 1776 /* 1777 * See if a given @snapc is either writeable, or already written. 1778 */ 1779 static int context_is_writeable_or_written(struct inode *inode, 1780 struct ceph_snap_context *snapc) 1781 { 1782 struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL); 1783 int ret = !oldest || snapc->seq <= oldest->seq; 1784 1785 ceph_put_snap_context(oldest); 1786 return ret; 1787 } 1788 1789 /** 1790 * ceph_find_incompatible - find an incompatible context and return it 1791 * @folio: folio being dirtied 1792 * 1793 * We are only allowed to write into/dirty a folio if the folio is 1794 * clean, or already dirty within the same snap context. Returns a 1795 * conflicting context if there is one, NULL if there isn't, or a 1796 * negative error code on other errors. 1797 * 1798 * Must be called with folio lock held. 1799 */ 1800 static struct ceph_snap_context * 1801 ceph_find_incompatible(struct folio *folio) 1802 { 1803 struct inode *inode = folio->mapping->host; 1804 struct ceph_client *cl = ceph_inode_to_client(inode); 1805 struct ceph_inode_info *ci = ceph_inode(inode); 1806 1807 if (ceph_inode_is_shutdown(inode)) { 1808 doutc(cl, " %llx.%llx folio %p is shutdown\n", 1809 ceph_vinop(inode), folio); 1810 return ERR_PTR(-ESTALE); 1811 } 1812 1813 for (;;) { 1814 struct ceph_snap_context *snapc, *oldest; 1815 1816 folio_wait_writeback(folio); 1817 1818 snapc = page_snap_context(&folio->page); 1819 if (!snapc || snapc == ci->i_head_snapc) 1820 break; 1821 1822 /* 1823 * this folio is already dirty in another (older) snap 1824 * context! is it writeable now? 1825 */ 1826 oldest = get_oldest_context(inode, NULL, NULL); 1827 if (snapc->seq > oldest->seq) { 1828 /* not writeable -- return it for the caller to deal with */ 1829 ceph_put_snap_context(oldest); 1830 doutc(cl, " %llx.%llx folio %p snapc %p not current or oldest\n", 1831 ceph_vinop(inode), folio, snapc); 1832 return ceph_get_snap_context(snapc); 1833 } 1834 ceph_put_snap_context(oldest); 1835 1836 /* yay, writeable, do it now (without dropping folio lock) */ 1837 doutc(cl, " %llx.%llx folio %p snapc %p not current, but oldest\n", 1838 ceph_vinop(inode), folio, snapc); 1839 if (folio_clear_dirty_for_io(folio)) { 1840 int r = write_folio_nounlock(folio, NULL); 1841 if (r < 0) 1842 return ERR_PTR(r); 1843 } 1844 } 1845 return NULL; 1846 } 1847 1848 static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len, 1849 struct folio **foliop, void **_fsdata) 1850 { 1851 struct inode *inode = file_inode(file); 1852 struct ceph_inode_info *ci = ceph_inode(inode); 1853 struct ceph_snap_context *snapc; 1854 1855 snapc = ceph_find_incompatible(*foliop); 1856 if (snapc) { 1857 int r; 1858 1859 folio_unlock(*foliop); 1860 folio_put(*foliop); 1861 *foliop = NULL; 1862 if (IS_ERR(snapc)) 1863 return PTR_ERR(snapc); 1864 1865 ceph_queue_writeback(inode); 1866 r = wait_event_killable(ci->i_cap_wq, 1867 context_is_writeable_or_written(inode, snapc)); 1868 ceph_put_snap_context(snapc); 1869 return r == 0 ? -EAGAIN : r; 1870 } 1871 return 0; 1872 } 1873 1874 /* 1875 * We are only allowed to write into/dirty the page if the page is 1876 * clean, or already dirty within the same snap context. 1877 */ 1878 static int ceph_write_begin(const struct kiocb *iocb, 1879 struct address_space *mapping, 1880 loff_t pos, unsigned len, 1881 struct folio **foliop, void **fsdata) 1882 { 1883 struct file *file = iocb->ki_filp; 1884 struct inode *inode = file_inode(file); 1885 struct ceph_inode_info *ci = ceph_inode(inode); 1886 int r; 1887 1888 r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, foliop, NULL); 1889 if (r < 0) 1890 return r; 1891 1892 folio_wait_private_2(*foliop); /* [DEPRECATED] */ 1893 WARN_ON_ONCE(!folio_test_locked(*foliop)); 1894 return 0; 1895 } 1896 1897 /* 1898 * we don't do anything in here that simple_write_end doesn't do 1899 * except adjust dirty page accounting 1900 */ 1901 static int ceph_write_end(const struct kiocb *iocb, 1902 struct address_space *mapping, loff_t pos, 1903 unsigned len, unsigned copied, 1904 struct folio *folio, void *fsdata) 1905 { 1906 struct file *file = iocb->ki_filp; 1907 struct inode *inode = file_inode(file); 1908 struct ceph_client *cl = ceph_inode_to_client(inode); 1909 bool check_cap = false; 1910 1911 doutc(cl, "%llx.%llx file %p folio %p %d~%d (%d)\n", ceph_vinop(inode), 1912 file, folio, (int)pos, (int)copied, (int)len); 1913 1914 if (!folio_test_uptodate(folio)) { 1915 /* just return that nothing was copied on a short copy */ 1916 if (copied < len) { 1917 copied = 0; 1918 goto out; 1919 } 1920 folio_mark_uptodate(folio); 1921 } 1922 1923 /* did file size increase? */ 1924 if (pos+copied > i_size_read(inode)) 1925 check_cap = ceph_inode_set_size(inode, pos+copied); 1926 1927 folio_mark_dirty(folio); 1928 1929 out: 1930 folio_unlock(folio); 1931 folio_put(folio); 1932 1933 if (check_cap) 1934 ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY); 1935 1936 return copied; 1937 } 1938 1939 const struct address_space_operations ceph_aops = { 1940 .read_folio = netfs_read_folio, 1941 .readahead = netfs_readahead, 1942 .writepages = ceph_writepages_start, 1943 .write_begin = ceph_write_begin, 1944 .write_end = ceph_write_end, 1945 .dirty_folio = ceph_dirty_folio, 1946 .invalidate_folio = ceph_invalidate_folio, 1947 .release_folio = netfs_release_folio, 1948 .direct_IO = noop_direct_IO, 1949 .migrate_folio = filemap_migrate_folio, 1950 }; 1951 1952 static void ceph_block_sigs(sigset_t *oldset) 1953 { 1954 sigset_t mask; 1955 siginitsetinv(&mask, sigmask(SIGKILL)); 1956 sigprocmask(SIG_BLOCK, &mask, oldset); 1957 } 1958 1959 static void ceph_restore_sigs(sigset_t *oldset) 1960 { 1961 sigprocmask(SIG_SETMASK, oldset, NULL); 1962 } 1963 1964 /* 1965 * vm ops 1966 */ 1967 static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf) 1968 { 1969 struct vm_area_struct *vma = vmf->vma; 1970 struct inode *inode = file_inode(vma->vm_file); 1971 struct ceph_inode_info *ci = ceph_inode(inode); 1972 struct ceph_client *cl = ceph_inode_to_client(inode); 1973 struct ceph_file_info *fi = vma->vm_file->private_data; 1974 loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT; 1975 int want, got, err; 1976 sigset_t oldset; 1977 vm_fault_t ret = VM_FAULT_SIGBUS; 1978 1979 if (ceph_inode_is_shutdown(inode)) 1980 return ret; 1981 1982 ceph_block_sigs(&oldset); 1983 1984 doutc(cl, "%llx.%llx %llu trying to get caps\n", 1985 ceph_vinop(inode), off); 1986 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1987 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 1988 else 1989 want = CEPH_CAP_FILE_CACHE; 1990 1991 got = 0; 1992 err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got); 1993 if (err < 0) 1994 goto out_restore; 1995 1996 doutc(cl, "%llx.%llx %llu got cap refs on %s\n", ceph_vinop(inode), 1997 off, ceph_cap_string(got)); 1998 1999 if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || 2000 !ceph_has_inline_data(ci)) { 2001 CEPH_DEFINE_RW_CONTEXT(rw_ctx, got); 2002 ceph_add_rw_context(fi, &rw_ctx); 2003 ret = filemap_fault(vmf); 2004 ceph_del_rw_context(fi, &rw_ctx); 2005 doutc(cl, "%llx.%llx %llu drop cap refs %s ret %x\n", 2006 ceph_vinop(inode), off, ceph_cap_string(got), ret); 2007 } else 2008 err = -EAGAIN; 2009 2010 ceph_put_cap_refs(ci, got); 2011 2012 if (err != -EAGAIN) 2013 goto out_restore; 2014 2015 /* read inline data */ 2016 if (off >= PAGE_SIZE) { 2017 /* does not support inline data > PAGE_SIZE */ 2018 ret = VM_FAULT_SIGBUS; 2019 } else { 2020 struct address_space *mapping = inode->i_mapping; 2021 struct page *page; 2022 2023 filemap_invalidate_lock_shared(mapping); 2024 page = find_or_create_page(mapping, 0, 2025 mapping_gfp_constraint(mapping, ~__GFP_FS)); 2026 if (!page) { 2027 ret = VM_FAULT_OOM; 2028 goto out_inline; 2029 } 2030 err = __ceph_do_getattr(inode, page, 2031 CEPH_STAT_CAP_INLINE_DATA, true); 2032 if (err < 0 || off >= i_size_read(inode)) { 2033 unlock_page(page); 2034 put_page(page); 2035 ret = vmf_error(err); 2036 goto out_inline; 2037 } 2038 if (err < PAGE_SIZE) 2039 zero_user_segment(page, err, PAGE_SIZE); 2040 else 2041 flush_dcache_page(page); 2042 SetPageUptodate(page); 2043 vmf->page = page; 2044 ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED; 2045 out_inline: 2046 filemap_invalidate_unlock_shared(mapping); 2047 doutc(cl, "%llx.%llx %llu read inline data ret %x\n", 2048 ceph_vinop(inode), off, ret); 2049 } 2050 out_restore: 2051 ceph_restore_sigs(&oldset); 2052 if (err < 0) 2053 ret = vmf_error(err); 2054 2055 return ret; 2056 } 2057 2058 static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf) 2059 { 2060 struct vm_area_struct *vma = vmf->vma; 2061 struct inode *inode = file_inode(vma->vm_file); 2062 struct ceph_client *cl = ceph_inode_to_client(inode); 2063 struct ceph_inode_info *ci = ceph_inode(inode); 2064 struct ceph_file_info *fi = vma->vm_file->private_data; 2065 struct ceph_cap_flush *prealloc_cf; 2066 struct folio *folio = page_folio(vmf->page); 2067 loff_t off = folio_pos(folio); 2068 loff_t size = i_size_read(inode); 2069 size_t len; 2070 int want, got, err; 2071 sigset_t oldset; 2072 vm_fault_t ret = VM_FAULT_SIGBUS; 2073 2074 if (ceph_inode_is_shutdown(inode)) 2075 return ret; 2076 2077 prealloc_cf = ceph_alloc_cap_flush(); 2078 if (!prealloc_cf) 2079 return VM_FAULT_OOM; 2080 2081 sb_start_pagefault(inode->i_sb); 2082 ceph_block_sigs(&oldset); 2083 2084 if (off + folio_size(folio) <= size) 2085 len = folio_size(folio); 2086 else 2087 len = offset_in_folio(folio, size); 2088 2089 doutc(cl, "%llx.%llx %llu~%zd getting caps i_size %llu\n", 2090 ceph_vinop(inode), off, len, size); 2091 if (fi->fmode & CEPH_FILE_MODE_LAZY) 2092 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 2093 else 2094 want = CEPH_CAP_FILE_BUFFER; 2095 2096 got = 0; 2097 err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got); 2098 if (err < 0) 2099 goto out_free; 2100 2101 doutc(cl, "%llx.%llx %llu~%zd got cap refs on %s\n", ceph_vinop(inode), 2102 off, len, ceph_cap_string(got)); 2103 2104 /* Update time before taking folio lock */ 2105 file_update_time(vma->vm_file); 2106 inode_inc_iversion_raw(inode); 2107 2108 do { 2109 struct ceph_snap_context *snapc; 2110 2111 folio_lock(folio); 2112 2113 if (folio_mkwrite_check_truncate(folio, inode) < 0) { 2114 folio_unlock(folio); 2115 ret = VM_FAULT_NOPAGE; 2116 break; 2117 } 2118 2119 snapc = ceph_find_incompatible(folio); 2120 if (!snapc) { 2121 /* success. we'll keep the folio locked. */ 2122 folio_mark_dirty(folio); 2123 ret = VM_FAULT_LOCKED; 2124 break; 2125 } 2126 2127 folio_unlock(folio); 2128 2129 if (IS_ERR(snapc)) { 2130 ret = VM_FAULT_SIGBUS; 2131 break; 2132 } 2133 2134 ceph_queue_writeback(inode); 2135 err = wait_event_killable(ci->i_cap_wq, 2136 context_is_writeable_or_written(inode, snapc)); 2137 ceph_put_snap_context(snapc); 2138 } while (err == 0); 2139 2140 if (ret == VM_FAULT_LOCKED) { 2141 int dirty; 2142 spin_lock(&ci->i_ceph_lock); 2143 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 2144 &prealloc_cf); 2145 spin_unlock(&ci->i_ceph_lock); 2146 if (dirty) 2147 __mark_inode_dirty(inode, dirty); 2148 } 2149 2150 doutc(cl, "%llx.%llx %llu~%zd dropping cap refs on %s ret %x\n", 2151 ceph_vinop(inode), off, len, ceph_cap_string(got), ret); 2152 ceph_put_cap_refs_async(ci, got); 2153 out_free: 2154 ceph_restore_sigs(&oldset); 2155 sb_end_pagefault(inode->i_sb); 2156 ceph_free_cap_flush(prealloc_cf); 2157 if (err < 0) 2158 ret = vmf_error(err); 2159 return ret; 2160 } 2161 2162 void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, 2163 char *data, size_t len) 2164 { 2165 struct ceph_client *cl = ceph_inode_to_client(inode); 2166 struct address_space *mapping = inode->i_mapping; 2167 struct page *page; 2168 2169 if (locked_page) { 2170 page = locked_page; 2171 } else { 2172 if (i_size_read(inode) == 0) 2173 return; 2174 page = find_or_create_page(mapping, 0, 2175 mapping_gfp_constraint(mapping, 2176 ~__GFP_FS)); 2177 if (!page) 2178 return; 2179 if (PageUptodate(page)) { 2180 unlock_page(page); 2181 put_page(page); 2182 return; 2183 } 2184 } 2185 2186 doutc(cl, "%p %llx.%llx len %zu locked_page %p\n", inode, 2187 ceph_vinop(inode), len, locked_page); 2188 2189 if (len > 0) { 2190 void *kaddr = kmap_atomic(page); 2191 memcpy(kaddr, data, len); 2192 kunmap_atomic(kaddr); 2193 } 2194 2195 if (page != locked_page) { 2196 if (len < PAGE_SIZE) 2197 zero_user_segment(page, len, PAGE_SIZE); 2198 else 2199 flush_dcache_page(page); 2200 2201 SetPageUptodate(page); 2202 unlock_page(page); 2203 put_page(page); 2204 } 2205 } 2206 2207 int ceph_uninline_data(struct file *file) 2208 { 2209 struct inode *inode = file_inode(file); 2210 struct ceph_inode_info *ci = ceph_inode(inode); 2211 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 2212 struct ceph_client *cl = fsc->client; 2213 struct ceph_osd_request *req = NULL; 2214 struct ceph_cap_flush *prealloc_cf = NULL; 2215 struct folio *folio = NULL; 2216 struct ceph_snap_context *snapc = NULL; 2217 u64 inline_version = CEPH_INLINE_NONE; 2218 struct page *pages[1]; 2219 int err = 0; 2220 u64 len; 2221 2222 spin_lock(&ci->i_ceph_lock); 2223 inline_version = ci->i_inline_version; 2224 spin_unlock(&ci->i_ceph_lock); 2225 2226 doutc(cl, "%llx.%llx inline_version %llu\n", ceph_vinop(inode), 2227 inline_version); 2228 2229 if (ceph_inode_is_shutdown(inode)) { 2230 err = -EIO; 2231 goto out; 2232 } 2233 2234 if (inline_version == CEPH_INLINE_NONE) 2235 return 0; 2236 2237 prealloc_cf = ceph_alloc_cap_flush(); 2238 if (!prealloc_cf) 2239 return -ENOMEM; 2240 2241 if (inline_version == 1) /* initial version, no data */ 2242 goto out_uninline; 2243 2244 down_read(&fsc->mdsc->snap_rwsem); 2245 spin_lock(&ci->i_ceph_lock); 2246 if (__ceph_have_pending_cap_snap(ci)) { 2247 struct ceph_cap_snap *capsnap = 2248 list_last_entry(&ci->i_cap_snaps, 2249 struct ceph_cap_snap, 2250 ci_item); 2251 snapc = ceph_get_snap_context(capsnap->context); 2252 } else { 2253 if (!ci->i_head_snapc) { 2254 ci->i_head_snapc = ceph_get_snap_context( 2255 ci->i_snap_realm->cached_context); 2256 } 2257 snapc = ceph_get_snap_context(ci->i_head_snapc); 2258 } 2259 spin_unlock(&ci->i_ceph_lock); 2260 up_read(&fsc->mdsc->snap_rwsem); 2261 2262 folio = read_mapping_folio(inode->i_mapping, 0, file); 2263 if (IS_ERR(folio)) { 2264 err = PTR_ERR(folio); 2265 goto out; 2266 } 2267 2268 folio_lock(folio); 2269 2270 len = i_size_read(inode); 2271 if (len > folio_size(folio)) 2272 len = folio_size(folio); 2273 2274 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 2275 ceph_vino(inode), 0, &len, 0, 1, 2276 CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE, 2277 snapc, 0, 0, false); 2278 if (IS_ERR(req)) { 2279 err = PTR_ERR(req); 2280 goto out_unlock; 2281 } 2282 2283 req->r_mtime = inode_get_mtime(inode); 2284 ceph_osdc_start_request(&fsc->client->osdc, req); 2285 err = ceph_osdc_wait_request(&fsc->client->osdc, req); 2286 ceph_osdc_put_request(req); 2287 if (err < 0) 2288 goto out_unlock; 2289 2290 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 2291 ceph_vino(inode), 0, &len, 1, 3, 2292 CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, 2293 snapc, ci->i_truncate_seq, 2294 ci->i_truncate_size, false); 2295 if (IS_ERR(req)) { 2296 err = PTR_ERR(req); 2297 goto out_unlock; 2298 } 2299 2300 pages[0] = folio_page(folio, 0); 2301 osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false); 2302 2303 { 2304 __le64 xattr_buf = cpu_to_le64(inline_version); 2305 err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR, 2306 "inline_version", &xattr_buf, 2307 sizeof(xattr_buf), 2308 CEPH_OSD_CMPXATTR_OP_GT, 2309 CEPH_OSD_CMPXATTR_MODE_U64); 2310 if (err) 2311 goto out_put_req; 2312 } 2313 2314 { 2315 char xattr_buf[32]; 2316 int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf), 2317 "%llu", inline_version); 2318 err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR, 2319 "inline_version", 2320 xattr_buf, xattr_len, 0, 0); 2321 if (err) 2322 goto out_put_req; 2323 } 2324 2325 req->r_mtime = inode_get_mtime(inode); 2326 ceph_osdc_start_request(&fsc->client->osdc, req); 2327 err = ceph_osdc_wait_request(&fsc->client->osdc, req); 2328 2329 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, 2330 req->r_end_latency, len, err); 2331 2332 out_uninline: 2333 if (!err) { 2334 int dirty; 2335 2336 /* Set to CAP_INLINE_NONE and dirty the caps */ 2337 down_read(&fsc->mdsc->snap_rwsem); 2338 spin_lock(&ci->i_ceph_lock); 2339 ci->i_inline_version = CEPH_INLINE_NONE; 2340 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf); 2341 spin_unlock(&ci->i_ceph_lock); 2342 up_read(&fsc->mdsc->snap_rwsem); 2343 if (dirty) 2344 __mark_inode_dirty(inode, dirty); 2345 } 2346 out_put_req: 2347 ceph_osdc_put_request(req); 2348 if (err == -ECANCELED) 2349 err = 0; 2350 out_unlock: 2351 if (folio) { 2352 folio_unlock(folio); 2353 folio_put(folio); 2354 } 2355 out: 2356 ceph_put_snap_context(snapc); 2357 ceph_free_cap_flush(prealloc_cf); 2358 doutc(cl, "%llx.%llx inline_version %llu = %d\n", 2359 ceph_vinop(inode), inline_version, err); 2360 return err; 2361 } 2362 2363 static const struct vm_operations_struct ceph_vmops = { 2364 .fault = ceph_filemap_fault, 2365 .page_mkwrite = ceph_page_mkwrite, 2366 }; 2367 2368 int ceph_mmap_prepare(struct vm_area_desc *desc) 2369 { 2370 struct address_space *mapping = desc->file->f_mapping; 2371 2372 if (!mapping->a_ops->read_folio) 2373 return -ENOEXEC; 2374 desc->vm_ops = &ceph_vmops; 2375 return 0; 2376 } 2377 2378 enum { 2379 POOL_READ = 1, 2380 POOL_WRITE = 2, 2381 }; 2382 2383 static int __ceph_pool_perm_get(struct ceph_inode_info *ci, 2384 s64 pool, struct ceph_string *pool_ns) 2385 { 2386 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(&ci->netfs.inode); 2387 struct ceph_mds_client *mdsc = fsc->mdsc; 2388 struct ceph_client *cl = fsc->client; 2389 struct ceph_osd_request *rd_req = NULL, *wr_req = NULL; 2390 struct rb_node **p, *parent; 2391 struct ceph_pool_perm *perm; 2392 struct page **pages; 2393 size_t pool_ns_len; 2394 int err = 0, err2 = 0, have = 0; 2395 2396 down_read(&mdsc->pool_perm_rwsem); 2397 p = &mdsc->pool_perm_tree.rb_node; 2398 while (*p) { 2399 perm = rb_entry(*p, struct ceph_pool_perm, node); 2400 if (pool < perm->pool) 2401 p = &(*p)->rb_left; 2402 else if (pool > perm->pool) 2403 p = &(*p)->rb_right; 2404 else { 2405 int ret = ceph_compare_string(pool_ns, 2406 perm->pool_ns, 2407 perm->pool_ns_len); 2408 if (ret < 0) 2409 p = &(*p)->rb_left; 2410 else if (ret > 0) 2411 p = &(*p)->rb_right; 2412 else { 2413 have = perm->perm; 2414 break; 2415 } 2416 } 2417 } 2418 up_read(&mdsc->pool_perm_rwsem); 2419 if (*p) 2420 goto out; 2421 2422 if (pool_ns) 2423 doutc(cl, "pool %lld ns %.*s no perm cached\n", pool, 2424 (int)pool_ns->len, pool_ns->str); 2425 else 2426 doutc(cl, "pool %lld no perm cached\n", pool); 2427 2428 down_write(&mdsc->pool_perm_rwsem); 2429 p = &mdsc->pool_perm_tree.rb_node; 2430 parent = NULL; 2431 while (*p) { 2432 parent = *p; 2433 perm = rb_entry(parent, struct ceph_pool_perm, node); 2434 if (pool < perm->pool) 2435 p = &(*p)->rb_left; 2436 else if (pool > perm->pool) 2437 p = &(*p)->rb_right; 2438 else { 2439 int ret = ceph_compare_string(pool_ns, 2440 perm->pool_ns, 2441 perm->pool_ns_len); 2442 if (ret < 0) 2443 p = &(*p)->rb_left; 2444 else if (ret > 0) 2445 p = &(*p)->rb_right; 2446 else { 2447 have = perm->perm; 2448 break; 2449 } 2450 } 2451 } 2452 if (*p) { 2453 up_write(&mdsc->pool_perm_rwsem); 2454 goto out; 2455 } 2456 2457 rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL, 2458 1, false, GFP_NOFS); 2459 if (!rd_req) { 2460 err = -ENOMEM; 2461 goto out_unlock; 2462 } 2463 2464 rd_req->r_flags = CEPH_OSD_FLAG_READ; 2465 osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); 2466 rd_req->r_base_oloc.pool = pool; 2467 if (pool_ns) 2468 rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns); 2469 ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino); 2470 2471 err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS); 2472 if (err) 2473 goto out_unlock; 2474 2475 wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL, 2476 1, false, GFP_NOFS); 2477 if (!wr_req) { 2478 err = -ENOMEM; 2479 goto out_unlock; 2480 } 2481 2482 wr_req->r_flags = CEPH_OSD_FLAG_WRITE; 2483 osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); 2484 ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc); 2485 ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid); 2486 2487 err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS); 2488 if (err) 2489 goto out_unlock; 2490 2491 /* one page should be large enough for STAT data */ 2492 pages = ceph_alloc_page_vector(1, GFP_KERNEL); 2493 if (IS_ERR(pages)) { 2494 err = PTR_ERR(pages); 2495 goto out_unlock; 2496 } 2497 2498 osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE, 2499 0, false, true); 2500 ceph_osdc_start_request(&fsc->client->osdc, rd_req); 2501 2502 wr_req->r_mtime = inode_get_mtime(&ci->netfs.inode); 2503 ceph_osdc_start_request(&fsc->client->osdc, wr_req); 2504 2505 err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req); 2506 err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req); 2507 2508 if (err >= 0 || err == -ENOENT) 2509 have |= POOL_READ; 2510 else if (err != -EPERM) { 2511 if (err == -EBLOCKLISTED) 2512 fsc->blocklisted = true; 2513 goto out_unlock; 2514 } 2515 2516 if (err2 == 0 || err2 == -EEXIST) 2517 have |= POOL_WRITE; 2518 else if (err2 != -EPERM) { 2519 if (err2 == -EBLOCKLISTED) 2520 fsc->blocklisted = true; 2521 err = err2; 2522 goto out_unlock; 2523 } 2524 2525 pool_ns_len = pool_ns ? pool_ns->len : 0; 2526 perm = kmalloc_flex(*perm, pool_ns, pool_ns_len + 1, GFP_NOFS); 2527 if (!perm) { 2528 err = -ENOMEM; 2529 goto out_unlock; 2530 } 2531 2532 perm->pool = pool; 2533 perm->perm = have; 2534 perm->pool_ns_len = pool_ns_len; 2535 if (pool_ns_len > 0) 2536 memcpy(perm->pool_ns, pool_ns->str, pool_ns_len); 2537 perm->pool_ns[pool_ns_len] = 0; 2538 2539 rb_link_node(&perm->node, parent, p); 2540 rb_insert_color(&perm->node, &mdsc->pool_perm_tree); 2541 err = 0; 2542 out_unlock: 2543 up_write(&mdsc->pool_perm_rwsem); 2544 2545 ceph_osdc_put_request(rd_req); 2546 ceph_osdc_put_request(wr_req); 2547 out: 2548 if (!err) 2549 err = have; 2550 if (pool_ns) 2551 doutc(cl, "pool %lld ns %.*s result = %d\n", pool, 2552 (int)pool_ns->len, pool_ns->str, err); 2553 else 2554 doutc(cl, "pool %lld result = %d\n", pool, err); 2555 return err; 2556 } 2557 2558 int ceph_pool_perm_check(struct inode *inode, int need) 2559 { 2560 struct ceph_client *cl = ceph_inode_to_client(inode); 2561 struct ceph_inode_info *ci = ceph_inode(inode); 2562 struct ceph_string *pool_ns; 2563 s64 pool; 2564 int ret, flags; 2565 2566 /* Only need to do this for regular files */ 2567 if (!S_ISREG(inode->i_mode)) 2568 return 0; 2569 2570 if (ci->i_vino.snap != CEPH_NOSNAP) { 2571 /* 2572 * Pool permission check needs to write to the first object. 2573 * But for snapshot, head of the first object may have already 2574 * been deleted. Skip check to avoid creating orphan object. 2575 */ 2576 return 0; 2577 } 2578 2579 if (ceph_test_mount_opt(ceph_inode_to_fs_client(inode), 2580 NOPOOLPERM)) 2581 return 0; 2582 2583 spin_lock(&ci->i_ceph_lock); 2584 flags = ci->i_ceph_flags; 2585 pool = ci->i_layout.pool_id; 2586 spin_unlock(&ci->i_ceph_lock); 2587 check: 2588 if (flags & CEPH_I_POOL_PERM) { 2589 if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) { 2590 doutc(cl, "pool %lld no read perm\n", pool); 2591 return -EPERM; 2592 } 2593 if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) { 2594 doutc(cl, "pool %lld no write perm\n", pool); 2595 return -EPERM; 2596 } 2597 return 0; 2598 } 2599 2600 pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); 2601 ret = __ceph_pool_perm_get(ci, pool, pool_ns); 2602 ceph_put_string(pool_ns); 2603 if (ret < 0) 2604 return ret; 2605 2606 flags = CEPH_I_POOL_PERM; 2607 if (ret & POOL_READ) 2608 flags |= CEPH_I_POOL_RD; 2609 if (ret & POOL_WRITE) 2610 flags |= CEPH_I_POOL_WR; 2611 2612 spin_lock(&ci->i_ceph_lock); 2613 if (pool == ci->i_layout.pool_id && 2614 pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) { 2615 ci->i_ceph_flags |= flags; 2616 } else { 2617 pool = ci->i_layout.pool_id; 2618 flags = ci->i_ceph_flags; 2619 } 2620 spin_unlock(&ci->i_ceph_lock); 2621 goto check; 2622 } 2623 2624 void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc) 2625 { 2626 struct ceph_pool_perm *perm; 2627 struct rb_node *n; 2628 2629 while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) { 2630 n = rb_first(&mdsc->pool_perm_tree); 2631 perm = rb_entry(n, struct ceph_pool_perm, node); 2632 rb_erase(n, &mdsc->pool_perm_tree); 2633 kfree(perm); 2634 } 2635 } 2636