// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/swap.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/folio_batch.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
#include <linux/ktime.h>
#include <linux/netfs.h>
#include <trace/events/netfs.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "metric.h"
#include "subvolume_metrics.h"
#include "crypto.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
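
/*
 * Worked example (illustrative numbers, not taken from this file): with
 * congestion_kb == 8192 and 4 KiB pages (PAGE_SHIFT == 12),
 * CONGESTION_ON_THRESH == 8192 >> 2 == 2048 dirty/writeback pages, and
 * CONGESTION_OFF_THRESH == 2048 - 512 == 1536.  Writeback is not
 * considered uncongested again until the count falls back below 75% of
 * the "on" threshold, giving the congestion flag some hysteresis.
 */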

static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata);

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
{
	struct inode *inode = mapping->host;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	if (folio_test_dirty(folio)) {
		doutc(cl, "%llx.%llx %p idx %lu -- already dirty\n",
		      ceph_vinop(inode), folio, folio->index);
		VM_BUG_ON_FOLIO(!folio_test_private(folio), folio);
		return false;
	}

	atomic64_inc(&mdsc->dirty_folios);

	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	doutc(cl, "%llx.%llx %p idx %lu head %d/%d -> %d/%d "
	      "snapc %p seq %lld (%d snaps)\n",
	      ceph_vinop(inode), folio, folio->index,
	      ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	      ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	      snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in folio->private.  Also set
	 * PagePrivate so that we get invalidate_folio callback.
	 */
	VM_WARN_ON_FOLIO(folio->private, folio);
	folio_attach_private(folio, snapc);

	return ceph_fscache_dirty_folio(mapping, folio);
}
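
/*
 * The references taken in ceph_dirty_folio() are dropped in exactly one
 * place per folio: either ceph_invalidate_folio() below (full-folio
 * truncation), or one of the writeback completion paths
 * (write_folio_nounlock(), writepages_finish()).  Each of those
 * detaches folio->private and pairs ceph_put_wrbuffer_cap_refs() /
 * ceph_put_snap_context() with the gets above.
 */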

/*
 * If we are truncating the full folio (i.e. offset == 0), adjust the
 * dirty folio counters appropriately.  Only called if there is private
 * data on the folio.
 */
static void ceph_invalidate_folio(struct folio *folio, size_t offset,
				  size_t length)
{
	struct inode *inode = folio->mapping->host;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	if (offset != 0 || length != folio_size(folio)) {
		doutc(cl, "%llx.%llx idx %lu partial dirty page %zu~%zu\n",
		      ceph_vinop(inode), folio->index, offset, length);
		return;
	}

	WARN_ON(!folio_test_locked(folio));
	if (folio_test_private(folio)) {
		doutc(cl, "%llx.%llx idx %lu full dirty page\n",
		      ceph_vinop(inode), folio->index);

		snapc = folio_detach_private(folio);
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
		ceph_put_snap_context(snapc);
	}

	netfs_invalidate_folio(folio, offset, length);
}
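
/*
 * Worked example for the expansion below (illustrative layout, not a
 * default this file relies on): with a 4 MiB stripe_unit, a request for
 * [6 MiB, 6 MiB + 64 KiB) is rounded forward to the 8 MiB boundary
 * (capped at i_size) and backward to 4 MiB, provided the resulting
 * length still fits within the readahead window (max_pages).  Whole
 * stripe units are thus read whenever the window allows.
 */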
static void ceph_netfs_expand_readahead(struct netfs_io_request *rreq)
{
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_layout *lo = &ci->i_layout;
	unsigned long max_pages = inode->i_sb->s_bdi->ra_pages;
	loff_t end = rreq->start + rreq->len, new_end;
	struct ceph_netfs_request_data *priv = rreq->netfs_priv;
	unsigned long max_len;
	u32 blockoff;

	if (priv) {
		/* Readahead is disabled by posix_fadvise POSIX_FADV_RANDOM */
		if (priv->file_ra_disabled)
			max_pages = 0;
		else
			max_pages = priv->file_ra_pages;
	}

	/* Readahead is disabled */
	if (!max_pages)
		return;

	max_len = max_pages << PAGE_SHIFT;

	/*
	 * Try to expand the length forward by rounding it up to the next
	 * block, but do not exceed the file size, unless the original
	 * request already exceeds it.
	 */
	new_end = umin(round_up(end, lo->stripe_unit), rreq->i_size);
	if (new_end > end && new_end <= rreq->start + max_len)
		rreq->len = new_end - rreq->start;

	/* Try to expand the start downward */
	div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
	if (rreq->len + blockoff <= max_len) {
		rreq->start -= blockoff;
		rreq->len += blockoff;
	}
}

static void finish_netfs_read(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
	struct netfs_io_subrequest *subreq = req->r_priv;
	struct ceph_osd_req_op *op = &req->r_ops[0];
	int err = req->r_result;
	bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);

	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, osd_data->length, err);

	doutc(cl, "result %d subreq->len=%zu i_size=%lld\n", req->r_result,
	      subreq->len, i_size_read(req->r_inode));

	/* no object means success but no data */
	if (err == -ENOENT) {
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
		__set_bit(NETFS_SREQ_MADE_PROGRESS, &subreq->flags);
		err = 0;
	} else if (err == -EBLOCKLISTED) {
		fsc->blocklisted = true;
	}

	if (err >= 0) {
		if (sparse && err > 0)
			err = ceph_sparse_ext_map_end(op);
		if (err < subreq->len &&
		    subreq->rreq->origin != NETFS_UNBUFFERED_READ &&
		    subreq->rreq->origin != NETFS_DIO_READ)
			__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
		if (IS_ENCRYPTED(inode) && err > 0) {
			err = ceph_fscrypt_decrypt_extents(inode,
					osd_data->pages, subreq->start,
					op->extent.sparse_ext,
					op->extent.sparse_ext_cnt);
			if (err > subreq->len)
				err = subreq->len;
		}
		if (err > 0)
			__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
	}

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		ceph_put_page_vector(osd_data->pages,
				     calc_pages_for(osd_data->alignment,
						    osd_data->length), false);
	}
	if (err > 0) {
		ceph_subvolume_metrics_record_io(fsc->mdsc, ceph_inode(inode),
						 false, err,
						 req->r_start_latency,
						 req->r_end_latency);
		subreq->transferred = err;
		err = 0;
	}
	subreq->error = err;
	trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress);
	netfs_read_subreq_terminated(subreq);
	iput(req->r_inode);
	ceph_dec_osd_stopping_blocker(fsc->mdsc);
}

static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_mds_reply_info_parsed *rinfo;
	struct ceph_mds_reply_info_in *iinfo;
	struct ceph_mds_request *req;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_inode_info *ci = ceph_inode(inode);
	ssize_t err = 0;
	size_t len;
	int mode;

	if (rreq->origin != NETFS_UNBUFFERED_READ &&
	    rreq->origin != NETFS_DIO_READ)
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
	__clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags);

	if (subreq->start >= inode->i_size)
		goto out;

	/* We need to fetch the inline data. */
	mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA);
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_ino1 = ci->i_vino;
	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA);
	req->r_num_caps = 2;

	trace_netfs_sreq(subreq, netfs_sreq_trace_submit);
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (err < 0)
		goto out;

	rinfo = &req->r_reply_info;
	iinfo = &rinfo->targeti;
	if (iinfo->inline_version == CEPH_INLINE_NONE) {
		/* The data got uninlined */
		ceph_mdsc_put_request(req);
		return false;
	}

	len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len);
	err = copy_to_iter(iinfo->inline_data + subreq->start, len, &subreq->io_iter);
	if (err == 0) {
		err = -EFAULT;
	} else {
		subreq->transferred += err;
		err = 0;
	}

	ceph_mdsc_put_request(req);
out:
	subreq->error = err;
	trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress);
	netfs_read_subreq_terminated(subreq);
	return true;
}
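
/*
 * Illustrative example for the clamp below (assumes a common layout,
 * e.g. 4 MiB objects): a subrequest starting at file offset 3.5 MiB
 * lands at objoff 3.5 MiB within object 0, leaving at most 0.5 MiB
 * (xlen) before the object boundary; sreq_max_len is then the smaller
 * of xlen and the rsize mount option.
 */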
static int ceph_netfs_prepare_read(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	u64 objno, objoff;
	u32 xlen;

	/* Truncate the extent at the end of the current block */
	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
				      &objno, &objoff, &xlen);
	rreq->io_streams[0].sreq_max_len = umin(xlen, fsc->mount_options->rsize);
	return 0;
}

static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *req = NULL;
	struct ceph_vino vino = ceph_vino(inode);
	int err;
	u64 len;
	bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD);
	u64 off = subreq->start;
	int extent_cnt;

	if (ceph_inode_is_shutdown(inode)) {
		err = -EIO;
		goto out;
	}

	if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq))
		return;

	// TODO: This rounding here is slightly dodgy. It *should* work, for
	// now, as the cache only deals in blocks that are a multiple of
	// PAGE_SIZE and fscrypt blocks are at most PAGE_SIZE. What needs to
	// happen is for the fscrypt driving to be moved into netfslib and the
	// data in the cache also to be stored encrypted.
	len = subreq->len;
	ceph_fscrypt_adjust_off_and_len(inode, &off, &len);

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino,
			off, &len, 0, 1,
			sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ,
			CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq,
			ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		req = NULL;
		goto out;
	}

	if (sparse) {
		extent_cnt = __ceph_sparse_read_ext_count(inode, len);
		err = ceph_alloc_sparse_ext_map(&req->r_ops[0], extent_cnt);
		if (err)
			goto out;
	}

	doutc(cl, "%llx.%llx pos=%llu orig_len=%zu len=%llu\n",
	      ceph_vinop(inode), subreq->start, subreq->len, len);

	/*
	 * FIXME: For now, use CEPH_OSD_DATA_TYPE_PAGES instead of _ITER for
	 * encrypted inodes. We'd need infrastructure that handles an iov_iter
	 * instead of page arrays, and we don't have that as of yet. Once the
	 * dust settles on the write helpers and encrypt/decrypt routines for
	 * netfs, we should be able to rework this.
	 */
	if (IS_ENCRYPTED(inode)) {
		struct page **pages;
		size_t page_off;

		/*
		 * FIXME: io_iter.count needs to be corrected to aligned
		 * length. Otherwise, iov_iter_get_pages_alloc2() operates
		 * with the initial unaligned length value. As a result,
		 * ceph_msg_data_cursor_init() triggers BUG_ON() in the case
		 * if msg->sparse_read_total > msg->data_length.
		 */
		subreq->io_iter.count = len;

		err = iov_iter_get_pages_alloc2(&subreq->io_iter, &pages, len, &page_off);
		if (err < 0) {
			doutc(cl, "%llx.%llx failed to allocate pages, %d\n",
			      ceph_vinop(inode), err);
			goto out;
		}

		/* should always give us a page-aligned read */
		WARN_ON_ONCE(page_off);
		len = err;
		err = 0;

		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false,
						 false);
	} else {
		osd_req_op_extent_osd_iter(req, 0, &subreq->io_iter);
	}
	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
		err = -EIO;
		goto out;
	}
	req->r_callback = finish_netfs_read;
	req->r_priv = subreq;
	req->r_inode = inode;
	ihold(inode);

	trace_netfs_sreq(subreq, netfs_sreq_trace_submit);
	ceph_osdc_start_request(req->r_osdc, req);
out:
	ceph_osdc_put_request(req);
	if (err) {
		subreq->error = err;
		netfs_read_subreq_terminated(subreq);
	}
	doutc(cl, "%llx.%llx result %d\n", ceph_vinop(inode), err);
}

static int ceph_init_request(struct netfs_io_request *rreq, struct file *file)
{
	struct inode *inode = rreq->inode;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int got = 0, want = CEPH_CAP_FILE_CACHE;
	struct ceph_netfs_request_data *priv;
	int ret = 0;

	/* [DEPRECATED] Use PG_private_2 to mark folio being written to the cache. */
	__set_bit(NETFS_RREQ_USE_PGPRIV2, &rreq->flags);

	if (rreq->origin != NETFS_READAHEAD)
		return 0;

	priv = kzalloc_obj(*priv, GFP_NOFS);
	if (!priv)
		return -ENOMEM;

	if (file) {
		struct ceph_rw_context *rw_ctx;
		struct ceph_file_info *fi = file->private_data;

		priv->file_ra_pages = file->f_ra.ra_pages;
		priv->file_ra_disabled = file->f_mode & FMODE_RANDOM;

		rw_ctx = ceph_find_rw_context(fi);
		if (rw_ctx) {
			rreq->netfs_priv = priv;
			return 0;
		}
	}

	/*
	 * readahead callers do not necessarily hold Fcb caps
	 * (e.g. fadvise, madvise).
	 */
	ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
	if (ret < 0) {
		doutc(cl, "%llx.%llx, error getting cap\n", ceph_vinop(inode));
		goto out;
	}

	if (!(got & want)) {
		doutc(cl, "%llx.%llx, no cache cap\n", ceph_vinop(inode));
		ret = -EACCES;
		goto out;
	}
	if (ret == 0) {
		ret = -EACCES;
		goto out;
	}

	priv->caps = got;
	rreq->netfs_priv = priv;
	rreq->io_streams[0].sreq_max_len = fsc->mount_options->rsize;

out:
	if (ret < 0) {
		if (got)
			ceph_put_cap_refs(ceph_inode(inode), got);
		kfree(priv);
	}

	return ret;
}

static void ceph_netfs_free_request(struct netfs_io_request *rreq)
{
	struct ceph_netfs_request_data *priv = rreq->netfs_priv;

	if (!priv)
		return;

	if (priv->caps)
		ceph_put_cap_refs(ceph_inode(rreq->inode), priv->caps);
	kfree(priv);
	rreq->netfs_priv = NULL;
}

const struct netfs_request_ops ceph_netfs_ops = {
	.init_request		= ceph_init_request,
	.free_request		= ceph_netfs_free_request,
	.prepare_read		= ceph_netfs_prepare_read,
	.issue_read		= ceph_netfs_issue_read,
	.expand_readahead	= ceph_netfs_expand_readahead,
	.check_write_begin	= ceph_netfs_check_write_begin,
};

#ifdef CONFIG_CEPH_FSCACHE
static void ceph_set_page_fscache(struct page *page)
{
	folio_start_private_2(page_folio(page)); /* [DEPRECATED] */
}

static void ceph_fscache_write_terminated(void *priv, ssize_t error)
{
	struct inode *inode = priv;

	if (IS_ERR_VALUE(error) && error != -ENOBUFS)
		ceph_fscache_invalidate(inode, false);
}

static void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct fscache_cookie *cookie = ceph_fscache_cookie(ci);

	fscache_write_to_cache(cookie, inode->i_mapping, off, len, i_size_read(inode),
			       ceph_fscache_write_terminated, inode, true, caching);
}
#else
static inline void ceph_set_page_fscache(struct page *page)
{
}

static inline void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
}
#endif /* CONFIG_CEPH_FSCACHE */

struct ceph_writeback_ctl
{
	loff_t i_size;
	u64 truncate_size;
	u32 truncate_seq;
	bool size_stable;

	bool head_snapc;
	struct ceph_snap_context *snapc;
	struct ceph_snap_context *last_snapc;

	bool done;
	bool should_loop;
	bool range_whole;
	pgoff_t start_index;
	pgoff_t index;
	pgoff_t end;
	xa_mark_t tag;

	pgoff_t strip_unit_end;
	unsigned int wsize;
	unsigned int nr_folios;
	unsigned int max_pages;
	unsigned int locked_pages;

	int op_idx;
	int num_ops;
	u64 offset;
	u64 len;

	struct folio_batch fbatch;
	unsigned int processed_in_fbatch;

	bool from_pool;
	struct page **pages;
	struct page **data_pages;
};

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		doutc(cl, " capsnap %p snapc %p has %d dirty pages\n",
		      capsnap, capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		doutc(cl, " head snapc %p has %d dirty pages\n", snapc,
		      ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);
	u64 ret;

	snapc = page_snap_context(ceph_fscrypt_pagecache_page(page));
	if (snapc != ci->i_head_snapc) {
		bool found = false;

		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				if (!capsnap->writing)
					end = capsnap->size;
				found = true;
				break;
			}
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	if (end > ceph_fscrypt_page_offset(page) + thp_size(page))
		end = ceph_fscrypt_page_offset(page) + thp_size(page);
	ret = end > start ? end - start : 0;
	if (ret && fscrypt_is_bounce_page(page))
		ret = round_up(ret, CEPH_FSCRYPT_BLOCK_SIZE);
	return ret;
}

/*
 * Write a folio, but leave it locked.
 *
 * If we get a write error, mark the mapping for error, but still adjust the
 * dirty page accounting (i.e., folio is no longer dirty).
 */
static int write_folio_nounlock(struct folio *folio,
				struct writeback_control *wbc)
{
	struct page *page = &folio->page;
	struct inode *inode = folio->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = folio_pos(folio);
	int err;
	loff_t len = folio_size(folio);
	loff_t wlen;
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	bool caching = ceph_is_cache_enabled(inode);
	struct page *bounce_page = NULL;

	doutc(cl, "%llx.%llx folio %p idx %lu\n", ceph_vinop(inode), folio,
	      folio->index);

	if (ceph_inode_is_shutdown(inode))
		return -EIO;

	/* verify this is a writeable snap context */
	snapc = page_snap_context(&folio->page);
	if (!snapc) {
		doutc(cl, "%llx.%llx folio %p not dirty?\n", ceph_vinop(inode),
		      folio);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		doutc(cl, "%llx.%llx folio %p snapc %p not writeable - noop\n",
		      ceph_vinop(inode), folio, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		folio_redirty_for_writepage(wbc, folio);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		doutc(cl, "%llx.%llx folio at %lu beyond eof %llu\n",
		      ceph_vinop(inode), folio->index, ceph_wbc.i_size);
		folio_invalidate(folio, 0, folio_size(folio));
		return 0;
	}

	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	wlen = IS_ENCRYPTED(inode) ? round_up(len, CEPH_FSCRYPT_BLOCK_SIZE) : len;
	doutc(cl, "%llx.%llx folio %p index %lu on %llu~%llu snapc %p seq %lld\n",
	      ceph_vinop(inode), folio, folio->index, page_off, wlen, snapc,
	      snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		fsc->write_congested = true;

	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
				    page_off, &wlen, 0, 1, CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq,
				    ceph_wbc.truncate_size, true);
	if (IS_ERR(req)) {
		folio_redirty_for_writepage(wbc, folio);
		return PTR_ERR(req);
	}

	if (wlen < len)
		len = wlen;

	folio_start_writeback(folio);
	if (caching)
		ceph_set_page_fscache(&folio->page);
	ceph_fscache_write_to_cache(inode, page_off, len, caching);

	if (IS_ENCRYPTED(inode)) {
		bounce_page = fscrypt_encrypt_pagecache_blocks(folio,
						CEPH_FSCRYPT_BLOCK_SIZE, 0,
						GFP_NOFS);
		if (IS_ERR(bounce_page)) {
			folio_redirty_for_writepage(wbc, folio);
			folio_end_writeback(folio);
			ceph_osdc_put_request(req);
			return PTR_ERR(bounce_page);
		}
	}

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > folio_size(folio));
	osd_req_op_extent_osd_data_pages(req, 0,
			bounce_page ? &bounce_page : &page, wlen, 0,
			false, false);
	doutc(cl, "%llx.%llx %llu~%llu (%llu bytes, %sencrypted)\n",
	      ceph_vinop(inode), page_off, len, wlen,
	      IS_ENCRYPTED(inode) ? "" : "not ");
"" : "not "); 824 825 req->r_mtime = inode_get_mtime(inode); 826 ceph_osdc_start_request(osdc, req); 827 err = ceph_osdc_wait_request(osdc, req); 828 829 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, 830 req->r_end_latency, len, err); 831 if (err >= 0 && len > 0) 832 ceph_subvolume_metrics_record_io(fsc->mdsc, ci, true, len, 833 req->r_start_latency, 834 req->r_end_latency); 835 fscrypt_free_bounce_page(bounce_page); 836 ceph_osdc_put_request(req); 837 if (err == 0) 838 err = len; 839 840 if (err < 0) { 841 struct writeback_control tmp_wbc; 842 if (!wbc) 843 wbc = &tmp_wbc; 844 if (err == -ERESTARTSYS) { 845 /* killed by SIGKILL */ 846 doutc(cl, "%llx.%llx interrupted page %p\n", 847 ceph_vinop(inode), folio); 848 folio_redirty_for_writepage(wbc, folio); 849 folio_end_writeback(folio); 850 return err; 851 } 852 if (err == -EBLOCKLISTED) 853 fsc->blocklisted = true; 854 doutc(cl, "%llx.%llx setting mapping error %d %p\n", 855 ceph_vinop(inode), err, folio); 856 mapping_set_error(&inode->i_data, err); 857 wbc->pages_skipped++; 858 } else { 859 doutc(cl, "%llx.%llx cleaned page %p\n", 860 ceph_vinop(inode), folio); 861 err = 0; /* vfs expects us to return 0 */ 862 } 863 oldest = folio_detach_private(folio); 864 WARN_ON_ONCE(oldest != snapc); 865 folio_end_writeback(folio); 866 ceph_put_wrbuffer_cap_refs(ci, 1, snapc); 867 ceph_put_snap_context(snapc); /* page's reference */ 868 869 if (atomic_long_dec_return(&fsc->writeback_count) < 870 CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb)) 871 fsc->write_congested = false; 872 873 return err; 874 } 875 876 /* 877 * async writeback completion handler. 878 * 879 * If we get an error, set the mapping error bit, but not the individual 880 * page error bits. 881 */ 882 static void writepages_finish(struct ceph_osd_request *req) 883 { 884 struct inode *inode = req->r_inode; 885 struct ceph_inode_info *ci = ceph_inode(inode); 886 struct ceph_client *cl = ceph_inode_to_client(inode); 887 struct ceph_osd_data *osd_data; 888 struct page *page; 889 int num_pages, total_pages = 0; 890 int i, j; 891 int rc = req->r_result; 892 struct ceph_snap_context *snapc = req->r_snapc; 893 struct address_space *mapping = inode->i_mapping; 894 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 895 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 896 unsigned int len = 0; 897 bool remove_page; 898 899 doutc(cl, "%llx.%llx rc %d\n", ceph_vinop(inode), rc); 900 if (rc < 0) { 901 mapping_set_error(mapping, rc); 902 ceph_set_error_write(ci); 903 if (rc == -EBLOCKLISTED) 904 fsc->blocklisted = true; 905 } else { 906 ceph_clear_error_write(ci); 907 } 908 909 /* 910 * We lost the cache cap, need to truncate the page before 911 * it is unlocked, otherwise we'd truncate it later in the 912 * page truncation thread, possibly losing some data that 913 * raced its way in 914 */ 915 remove_page = !(ceph_caps_issued(ci) & 916 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); 917 918 /* clean all pages */ 919 for (i = 0; i < req->r_num_ops; i++) { 920 if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) { 921 pr_warn_client(cl, 922 "%llx.%llx incorrect op %d req %p index %d tid %llu\n", 923 ceph_vinop(inode), req->r_ops[i].op, req, i, 924 req->r_tid); 925 break; 926 } 927 928 osd_data = osd_req_op_extent_osd_data(req, i); 929 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 930 len += osd_data->length; 931 num_pages = calc_pages_for((u64)osd_data->alignment, 932 (u64)osd_data->length); 933 total_pages += num_pages; 934 for (j = 0; j < num_pages; 
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			if (fscrypt_is_bounce_page(page)) {
				page = fscrypt_pagecache_page(page);
				fscrypt_free_bounce_page(osd_data->pages[j]);
				osd_data->pages[j] = page;
			}
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			     CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				fsc->write_congested = false;

			ceph_put_snap_context(detach_page_private(page));
			end_page_writeback(page);

			if (atomic64_dec_return(&mdsc->dirty_folios) <= 0) {
				wake_up_all(&mdsc->flush_end_wq);
				WARN_ON(atomic64_read(&mdsc->dirty_folios) < 0);
			}

			doutc(cl, "unlocking %p\n", page);

			if (remove_page)
				generic_error_remove_folio(inode->i_mapping,
							   page_folio(page));

			unlock_page(page);
		}
		doutc(cl, "%llx.%llx wrote %llu bytes cleaned %d pages\n",
		      ceph_vinop(inode), osd_data->length,
		      rc >= 0 ? num_pages : 0);

		release_pages(osd_data->pages, num_pages);
	}

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, rc);

	if (rc >= 0 && len > 0)
		ceph_subvolume_metrics_record_io(mdsc, ci, true, len,
						 req->r_start_latency,
						 req->r_end_latency);

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
	ceph_dec_osd_stopping_blocker(fsc->mdsc);
}

static inline
bool is_forced_umount(struct address_space *mapping)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;

	if (ceph_inode_is_shutdown(inode)) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited_client(cl,
				"%llx.%llx %lld forced umount\n",
				ceph_vinop(inode), ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return true;
	}

	return false;
}

static inline
unsigned int ceph_define_write_size(struct address_space *mapping)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned int wsize = ci->i_layout.stripe_unit;

	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;

	return wsize;
}

static inline
void ceph_folio_batch_init(struct ceph_writeback_ctl *ceph_wbc)
{
	folio_batch_init(&ceph_wbc->fbatch);
	ceph_wbc->processed_in_fbatch = 0;
}

static inline
void ceph_folio_batch_reinit(struct ceph_writeback_ctl *ceph_wbc)
{
	folio_batch_release(&ceph_wbc->fbatch);
	ceph_folio_batch_init(ceph_wbc);
}

static inline
void ceph_init_writeback_ctl(struct address_space *mapping,
			     struct writeback_control *wbc,
			     struct ceph_writeback_ctl *ceph_wbc)
{
	ceph_wbc->snapc = NULL;
	ceph_wbc->last_snapc = NULL;

	ceph_wbc->strip_unit_end = 0;
	ceph_wbc->wsize = ceph_define_write_size(mapping);

	ceph_wbc->nr_folios = 0;
	ceph_wbc->max_pages = 0;
	ceph_wbc->locked_pages = 0;

	ceph_wbc->done = false;
	ceph_wbc->should_loop = false;
	ceph_wbc->range_whole = false;
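
	/*
	 * For cyclic writeback, resume the scan where the previous pass
	 * over this mapping stopped (mapping->writeback_index); a
	 * non-cyclic sync instead derives index/end from
	 * wbc->range_start/range_end in ceph_define_writeback_range().
	 */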
	ceph_wbc->start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	ceph_wbc->index = ceph_wbc->start_index;
	ceph_wbc->end = -1;

	ceph_wbc->tag = wbc_to_tag(wbc);

	ceph_wbc->op_idx = -1;
	ceph_wbc->num_ops = 0;
	ceph_wbc->offset = 0;
	ceph_wbc->len = 0;
	ceph_wbc->from_pool = false;

	ceph_folio_batch_init(ceph_wbc);

	ceph_wbc->pages = NULL;
	ceph_wbc->data_pages = NULL;
}

static inline
int ceph_define_writeback_range(struct address_space *mapping,
				struct writeback_control *wbc,
				struct ceph_writeback_ctl *ceph_wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;

	/* find oldest snap context with dirty data */
	ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL);
	if (!ceph_wbc->snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		doutc(cl, " no snap context with dirty data?\n");
		return -ENODATA;
	}

	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n",
	      ceph_wbc->snapc, ceph_wbc->snapc->seq,
	      ceph_wbc->snapc->num_snaps);

	ceph_wbc->should_loop = false;

	if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			ceph_wbc->index = ceph_wbc->start_index;
			ceph_wbc->end = -1;
			if (ceph_wbc->index > 0)
				ceph_wbc->should_loop = true;
			doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index);
		} else {
			ceph_wbc->index = wbc->range_start >> PAGE_SHIFT;
			ceph_wbc->end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				ceph_wbc->range_whole = true;
			doutc(cl, " not cyclic, %lu to %lu\n",
			      ceph_wbc->index, ceph_wbc->end);
		}
	} else if (!ceph_wbc->head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until all dirty pages
		 * associated with 'snapc' get written */
		if (ceph_wbc->index > 0)
			ceph_wbc->should_loop = true;
		doutc(cl, " non-head snapc, range whole\n");
	}

	ceph_put_snap_context(ceph_wbc->last_snapc);
	ceph_wbc->last_snapc = ceph_wbc->snapc;

	return 0;
}

static inline
bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc)
{
	return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end;
}

static inline
bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc,
				unsigned index)
{
	return index < ceph_wbc->nr_folios &&
		ceph_wbc->locked_pages < ceph_wbc->max_pages;
}

static
int ceph_check_page_before_write(struct address_space *mapping,
				 struct writeback_control *wbc,
				 struct ceph_writeback_ctl *ceph_wbc,
				 struct folio *folio)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_snap_context *pgsnapc;

	/* only dirty folios, or our accounting breaks */
	if (unlikely(!folio_test_dirty(folio) || folio->mapping != mapping)) {
		doutc(cl, "!dirty or !mapping %p\n", folio);
		return -ENODATA;
	}

	/* only if matching snap context */
	pgsnapc = page_snap_context(&folio->page);
	if (pgsnapc != ceph_wbc->snapc) {
		doutc(cl, "folio snapc %p %lld != oldest %p %lld\n",
		      pgsnapc, pgsnapc->seq,
		      ceph_wbc->snapc, ceph_wbc->snapc->seq);

		if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc &&
		    wbc->sync_mode != WB_SYNC_NONE)
			ceph_wbc->should_loop = true;

		return -ENODATA;
	}

	if (folio_pos(folio) >= ceph_wbc->i_size) {
		doutc(cl, "folio at %lu beyond eof %llu\n",
		      folio->index, ceph_wbc->i_size);

		if ((ceph_wbc->size_stable ||
		     folio_pos(folio) >= i_size_read(inode)) &&
		    folio_clear_dirty_for_io(folio))
			folio_invalidate(folio, 0, folio_size(folio));

		return -ENODATA;
	}

	if (ceph_wbc->strip_unit_end &&
	    (folio->index > ceph_wbc->strip_unit_end)) {
		doutc(cl, "end of strip unit %p\n", folio);
		return -E2BIG;
	}

	return 0;
}

static inline
void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc,
				unsigned int max_pages)
{
	ceph_wbc->pages = kmalloc_objs(*ceph_wbc->pages, max_pages, GFP_NOFS);
	if (!ceph_wbc->pages) {
		ceph_wbc->from_pool = true;
		ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
		BUG_ON(!ceph_wbc->pages);
	}
}

static inline
void ceph_allocate_page_array(struct address_space *mapping,
			      struct ceph_writeback_ctl *ceph_wbc,
			      struct folio *folio)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 objnum;
	u64 objoff;
	u32 xlen;

	/* prepare async write request */
	ceph_wbc->offset = (u64)folio_pos(folio);
	ceph_calc_file_object_mapping(&ci->i_layout,
				      ceph_wbc->offset, ceph_wbc->wsize,
				      &objnum, &objoff, &xlen);

	ceph_wbc->num_ops = 1;
	ceph_wbc->strip_unit_end = folio->index + ((xlen - 1) >> PAGE_SHIFT);

	BUG_ON(ceph_wbc->pages);
	ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen);
	__ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages);

	ceph_wbc->len = 0;
}
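
/*
 * Example for the contiguity test below (illustrative, 4 KiB pages):
 * if the current extent covers offset == 0 and len == 8192, i.e. two
 * pages have been collected so far, the next folio extends the run
 * only if its index is (0 + 8192) >> PAGE_SHIFT == 2.  Anything else
 * starts a new extent (or a new request).
 */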
static inline
bool is_folio_index_contiguous(const struct ceph_writeback_ctl *ceph_wbc,
			       const struct folio *folio)
{
	return folio->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT;
}

static inline
bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc)
{
	return ceph_wbc->num_ops >=
		(ceph_wbc->from_pool ? CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS);
}

static inline
bool is_write_congestion_happened(struct ceph_fs_client *fsc)
{
	return atomic_long_inc_return(&fsc->writeback_count) >
		CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb);
}

static inline int move_dirty_folio_in_page_array(struct address_space *mapping,
		struct writeback_control *wbc,
		struct ceph_writeback_ctl *ceph_wbc, struct folio *folio)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct page **pages = ceph_wbc->pages;
	unsigned int index = ceph_wbc->locked_pages;
	gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS;

	if (IS_ENCRYPTED(inode)) {
		pages[index] = fscrypt_encrypt_pagecache_blocks(folio,
								PAGE_SIZE,
								0,
								gfp_flags);
		if (IS_ERR(pages[index])) {
			int err = PTR_ERR(pages[index]);

			if (err == -EINVAL) {
				pr_err_client(cl, "inode->i_blkbits=%hhu\n",
					      inode->i_blkbits);
			}

			/* better not fail on first page! */
			BUG_ON(ceph_wbc->locked_pages == 0);

			pages[index] = NULL;
			return err;
		}
	} else {
		pages[index] = &folio->page;
	}

	ceph_wbc->locked_pages++;

	return 0;
}

static
void ceph_process_folio_batch(struct address_space *mapping,
			      struct writeback_control *wbc,
			      struct ceph_writeback_ctl *ceph_wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct folio *folio = NULL;
	unsigned i;
	int rc;

	for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) {
		folio = ceph_wbc->fbatch.folios[i];

		if (!folio)
			continue;

		doutc(cl, "? %p idx %lu, folio_test_writeback %#x, "
		      "folio_test_dirty %#x, folio_test_locked %#x\n",
		      folio, folio->index, folio_test_writeback(folio),
		      folio_test_dirty(folio),
		      folio_test_locked(folio));

		if (folio_test_writeback(folio) ||
		    folio_test_private_2(folio) /* [DEPRECATED] */) {
			doutc(cl, "waiting on writeback %p\n", folio);
			folio_wait_writeback(folio);
			folio_wait_private_2(folio); /* [DEPRECATED] */
			continue;
		}

		if (ceph_wbc->locked_pages == 0)
			folio_lock(folio);
		else if (!folio_trylock(folio))
			break;

		rc = ceph_check_page_before_write(mapping, wbc,
						  ceph_wbc, folio);
		if (rc == -ENODATA) {
			folio_unlock(folio);
			ceph_wbc->fbatch.folios[i] = NULL;
			continue;
		} else if (rc == -E2BIG) {
			folio_unlock(folio);
			break;
		}

		if (!folio_clear_dirty_for_io(folio)) {
			doutc(cl, "%p !folio_clear_dirty_for_io\n", folio);
			folio_unlock(folio);
			ceph_wbc->fbatch.folios[i] = NULL;
			continue;
		}

		/*
		 * We have something to write.  If this is
		 * the first locked page this time through,
		 * calculate max possible write size and
		 * allocate a page array
		 */
		if (ceph_wbc->locked_pages == 0) {
			ceph_allocate_page_array(mapping, ceph_wbc, folio);
		} else if (!is_folio_index_contiguous(ceph_wbc, folio)) {
			if (is_num_ops_too_big(ceph_wbc)) {
				folio_redirty_for_writepage(wbc, folio);
				folio_unlock(folio);
				break;
			}

			ceph_wbc->num_ops++;
			ceph_wbc->offset = (u64)folio_pos(folio);
			ceph_wbc->len = 0;
		}

		/* note position of first page in fbatch */
		doutc(cl, "%llx.%llx will write folio %p idx %lu\n",
		      ceph_vinop(inode), folio, folio->index);

		fsc->write_congested = is_write_congestion_happened(fsc);

		rc = move_dirty_folio_in_page_array(mapping, wbc, ceph_wbc,
						    folio);
		if (rc) {
			/* Did we just begin a new contiguous op?  Never mind! */
			if (ceph_wbc->len == 0)
				ceph_wbc->num_ops--;

			folio_redirty_for_writepage(wbc, folio);
			folio_unlock(folio);
			break;
		}

		ceph_wbc->fbatch.folios[i] = NULL;
		ceph_wbc->len += folio_size(folio);
	}

	ceph_wbc->processed_in_fbatch = i;
}
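
/*
 * Illustrative example (not from the original source): given a batch
 * of [A, NULL, B, NULL, C] after ceph_process_folio_batch() has
 * consumed some entries, the helper below compacts it to [A, B, C]
 * and sets fbatch.nr = 3, so the remaining folios can be retried on
 * the next pass without refilling the batch.
 */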
static inline
void ceph_shift_unused_folios_left(struct folio_batch *fbatch)
{
	unsigned j, n = 0;

	/* shift unused pages to beginning of fbatch */
	for (j = 0; j < folio_batch_count(fbatch); j++) {
		if (!fbatch->folios[j])
			continue;

		if (n < j)
			fbatch->folios[n] = fbatch->folios[j];

		n++;
	}

	fbatch->nr = n;
}

static
int ceph_submit_write(struct address_space *mapping,
		      struct writeback_control *wbc,
		      struct ceph_writeback_ctl *ceph_wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_vino vino = ceph_vino(inode);
	struct ceph_osd_request *req = NULL;
	struct page *page = NULL;
	bool caching = ceph_is_cache_enabled(inode);
	u64 offset;
	u64 len;
	unsigned i;

new_request:
	offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]);
	len = ceph_wbc->wsize;

	req = ceph_osdc_new_request(&fsc->client->osdc,
				    &ci->i_layout, vino,
				    offset, &len, 0, ceph_wbc->num_ops,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    ceph_wbc->snapc, ceph_wbc->truncate_seq,
				    ceph_wbc->truncate_size, false);
	if (IS_ERR(req)) {
		req = ceph_osdc_new_request(&fsc->client->osdc,
					    &ci->i_layout, vino,
					    offset, &len, 0,
					    min(ceph_wbc->num_ops,
						CEPH_OSD_SLAB_OPS),
					    CEPH_OSD_OP_WRITE,
					    CEPH_OSD_FLAG_WRITE,
					    ceph_wbc->snapc,
					    ceph_wbc->truncate_seq,
					    ceph_wbc->truncate_size,
					    true);
		BUG_ON(IS_ERR(req));
	}

	page = ceph_wbc->pages[ceph_wbc->locked_pages - 1];
	BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset);

	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
		for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) {
			struct folio *folio = ceph_wbc->fbatch.folios[i];

			if (!folio)
				continue;

			page = &folio->page;
			redirty_page_for_writepage(wbc, page);
			unlock_page(page);
		}

		for (i = 0; i < ceph_wbc->locked_pages; i++) {
			page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);

			if (!page)
				continue;

			redirty_page_for_writepage(wbc, page);
			unlock_page(page);
		}

		ceph_osdc_put_request(req);
		return -EIO;
	}

	req->r_callback = writepages_finish;
	req->r_inode = inode;

	/* Format the osd request message and submit the write */
	len = 0;
	ceph_wbc->data_pages = ceph_wbc->pages;
	ceph_wbc->op_idx = 0;
	for (i = 0; i < ceph_wbc->locked_pages; i++) {
		u64 cur_offset;

		page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
		cur_offset = page_offset(page);

		/*
		 * Discontinuity in page range?  Ceph can handle that by just
		 * passing multiple extents in the write op.
		 */
		if (offset + len != cur_offset) {
			/* If it's full, stop here */
			if (ceph_wbc->op_idx + 1 == req->r_num_ops)
				break;

			/* Kick off an fscache write with what we have so far. */
			ceph_fscache_write_to_cache(inode, offset, len, caching);

			/* Start a new extent */
			osd_req_op_extent_dup_last(req, ceph_wbc->op_idx,
						   cur_offset - offset);

			doutc(cl, "got pages at %llu~%llu\n", offset, len);

			osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
							 ceph_wbc->data_pages,
							 len, 0,
							 ceph_wbc->from_pool,
							 false);
			osd_req_op_extent_update(req, ceph_wbc->op_idx, len);

			len = 0;
			offset = cur_offset;
			ceph_wbc->data_pages = ceph_wbc->pages + i;
			ceph_wbc->op_idx++;
		}

		set_page_writeback(page);

		if (caching)
			ceph_set_page_fscache(page);

		len += thp_size(page);
	}

	ceph_fscache_write_to_cache(inode, offset, len, caching);

	if (ceph_wbc->size_stable) {
		len = min(len, ceph_wbc->i_size - offset);
	} else if (i == ceph_wbc->locked_pages) {
		/* writepages_finish() clears writeback pages
		 * according to the data length, so make sure
		 * data length covers all locked pages */
		u64 min_len = len + 1 - thp_size(page);

		len = get_writepages_data_length(inode,
						 ceph_wbc->pages[i - 1],
						 offset);
		len = max(len, min_len);
	}

	if (IS_ENCRYPTED(inode))
		len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);

	doutc(cl, "got pages at %llu~%llu\n", offset, len);

	if (IS_ENCRYPTED(inode) &&
	    ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) {
		pr_warn_client(cl,
			"bad encrypted write offset=%lld len=%llu\n",
			offset, len);
	}

	osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
					 ceph_wbc->data_pages, len,
					 0, ceph_wbc->from_pool, false);
	osd_req_op_extent_update(req, ceph_wbc->op_idx, len);

	BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops);

	ceph_wbc->from_pool = false;
	if (i < ceph_wbc->locked_pages) {
		BUG_ON(ceph_wbc->num_ops <= req->r_num_ops);
		ceph_wbc->num_ops -= req->r_num_ops;
		ceph_wbc->locked_pages -= i;

		/* allocate new pages array for next request */
		ceph_wbc->data_pages = ceph_wbc->pages;
		__ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages);
		memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i,
		       ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
		memset(ceph_wbc->data_pages + i, 0,
		       ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
	} else {
		BUG_ON(ceph_wbc->num_ops != req->r_num_ops);
		/* request message now owns the pages array */
		ceph_wbc->pages = NULL;
	}

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(&fsc->client->osdc, req);
	req = NULL;
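
	/*
	 * Accounting note: i pages were submitted in this request, so
	 * charge them against wbc->nr_to_write below.  If a strip unit
	 * boundary split the collected pages across requests (i <
	 * locked_pages above), ceph_wbc->pages is still non-NULL and we
	 * immediately build the next request for the remainder.
	 */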
	wbc->nr_to_write -= i;
	if (ceph_wbc->pages)
		goto new_request;

	return 0;
}

static
void ceph_wait_until_current_writes_complete(struct address_space *mapping,
					     struct writeback_control *wbc,
					     struct ceph_writeback_ctl *ceph_wbc)
{
	struct page *page;
	unsigned i, nr;

	if (wbc->sync_mode != WB_SYNC_NONE &&
	    ceph_wbc->start_index == 0 && /* all dirty pages were checked */
	    !ceph_wbc->head_snapc) {
		ceph_wbc->index = 0;

		while ((ceph_wbc->index <= ceph_wbc->end) &&
			(nr = filemap_get_folios_tag(mapping,
						     &ceph_wbc->index,
						     (pgoff_t)-1,
						     PAGECACHE_TAG_WRITEBACK,
						     &ceph_wbc->fbatch))) {
			for (i = 0; i < nr; i++) {
				page = &ceph_wbc->fbatch.folios[i]->page;
				if (page_snap_context(page) != ceph_wbc->snapc)
					continue;
				wait_on_page_writeback(page);
			}

			folio_batch_release(&ceph_wbc->fbatch);
			cond_resched();
		}
	}
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_writeback_ctl ceph_wbc;
	int rc = 0;

	if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested)
		return 0;

	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
	      wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	      (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (is_forced_umount(mapping)) {
		/* we're in a forced umount, don't write! */
		return -EIO;
	}

	ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc);

	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
		rc = -EIO;
		goto out;
	}

retry:
	rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc);
	if (rc == -ENODATA) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		rc = 0;
		goto dec_osd_stopping_blocker;
	}

	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
		tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);

	while (!has_writeback_done(&ceph_wbc)) {
		BUG_ON(ceph_wbc.locked_pages);
		BUG_ON(ceph_wbc.pages);

		ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;

get_more_pages:
		ceph_folio_batch_reinit(&ceph_wbc);

		ceph_wbc.nr_folios = filemap_get_folios_tag(mapping,
							    &ceph_wbc.index,
							    ceph_wbc.end,
							    ceph_wbc.tag,
							    &ceph_wbc.fbatch);
		doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n",
		      ceph_wbc.tag, ceph_wbc.nr_folios);

		if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
			break;

process_folio_batch:
		ceph_process_folio_batch(mapping, wbc, &ceph_wbc);
		ceph_shift_unused_folios_left(&ceph_wbc.fbatch);

		/* did we get anything? */
		if (!ceph_wbc.locked_pages)
			goto release_folios;

		if (ceph_wbc.processed_in_fbatch) {
			if (folio_batch_count(&ceph_wbc.fbatch) == 0 &&
			    ceph_wbc.locked_pages < ceph_wbc.max_pages) {
				doutc(cl, "reached end fbatch, trying for more\n");
				goto get_more_pages;
			}
		}

		rc = ceph_submit_write(mapping, wbc, &ceph_wbc);
		if (rc)
			goto release_folios;

		ceph_wbc.locked_pages = 0;
		ceph_wbc.strip_unit_end = 0;

		if (folio_batch_count(&ceph_wbc.fbatch) > 0) {
			ceph_wbc.nr_folios =
				folio_batch_count(&ceph_wbc.fbatch);
			goto process_folio_batch;
		}

		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			ceph_wbc.done = true;

release_folios:
		doutc(cl, "folio_batch release on %d folios (%p)\n",
		      (int)ceph_wbc.fbatch.nr,
		      ceph_wbc.fbatch.nr ? ceph_wbc.fbatch.folios[0] : NULL);
		folio_batch_release(&ceph_wbc.fbatch);
	}

	if (ceph_wbc.should_loop && !ceph_wbc.done) {
		/* more to do; loop back to beginning of file */
		doutc(cl, "looping back to beginning of file\n");
		/* OK even when start_index == 0 */
		ceph_wbc.end = ceph_wbc.start_index - 1;

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc);

		ceph_wbc.start_index = 0;
		ceph_wbc.index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (ceph_wbc.range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = ceph_wbc.index;

dec_osd_stopping_blocker:
	ceph_dec_osd_stopping_blocker(fsc->mdsc);

out:
	ceph_put_snap_context(ceph_wbc.last_snapc);
	doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode),
	      rc);

	return rc;
}

/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @folio: folio being dirtied
 *
 * We are only allowed to write into/dirty a folio if the folio is
 * clean, or already dirty within the same snap context.  Returns a
 * conflicting context if there is one, NULL if there isn't, or an
 * ERR_PTR-encoded error on other errors.
 *
 * Must be called with folio lock held.
 */
static struct ceph_snap_context *
ceph_find_incompatible(struct folio *folio)
{
	struct inode *inode = folio->mapping->host;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (ceph_inode_is_shutdown(inode)) {
		doutc(cl, " %llx.%llx folio %p is shutdown\n",
		      ceph_vinop(inode), folio);
		return ERR_PTR(-ESTALE);
	}

	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		folio_wait_writeback(folio);

		snapc = page_snap_context(&folio->page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this folio is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			doutc(cl, " %llx.%llx folio %p snapc %p not current or oldest\n",
			      ceph_vinop(inode), folio, snapc);
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping folio lock) */
		doutc(cl, " %llx.%llx folio %p snapc %p not current, but oldest\n",
		      ceph_vinop(inode), folio, snapc);
		if (folio_clear_dirty_for_io(folio)) {
			int r = write_folio_nounlock(folio, NULL);

			if (r < 0)
				return ERR_PTR(r);
		}
	}
	return NULL;
}

static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	snapc = ceph_find_incompatible(*foliop);
	if (snapc) {
		int r;

		folio_unlock(*foliop);
		folio_put(*foliop);
		*foliop = NULL;
		if (IS_ERR(snapc))
			return PTR_ERR(snapc);

		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		return r == 0 ? -EAGAIN : r;
	}
	return 0;
}
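
/*
 * A note on the return convention above: wait_event_killable() returns
 * 0 once the conflicting snap context has become writeable or written,
 * in which case -EAGAIN asks the caller to restart the begin-write
 * sequence against the folio; a nonzero r means the wait was
 * interrupted by a fatal signal and is propagated as-is.  (How the
 * retry is driven lives in the netfs write helpers, not in this file.)
 */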
/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(const struct kiocb *iocb,
			    struct address_space *mapping,
			    loff_t pos, unsigned len,
			    struct folio **foliop, void **fsdata)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	int r;

	r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, foliop, NULL);
	if (r < 0)
		return r;

	folio_wait_private_2(*foliop); /* [DEPRECATED] */
	WARN_ON_ONCE(!folio_test_locked(*foliop));
	return 0;
}

/*
 * We don't do anything in here that simple_write_end doesn't do,
 * except adjust dirty page accounting.
 */
static int ceph_write_end(const struct kiocb *iocb,
			  struct address_space *mapping, loff_t pos,
			  unsigned len, unsigned copied,
			  struct folio *folio, void *fsdata)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool check_cap = false;

	doutc(cl, "%llx.%llx file %p folio %p %d~%d (%d)\n", ceph_vinop(inode),
	      file, folio, (int)pos, (int)copied, (int)len);

	if (!folio_test_uptodate(folio)) {
		/* just return that nothing was copied on a short copy */
		if (copied < len) {
			copied = 0;
			goto out;
		}
		folio_mark_uptodate(folio);
	}

	/* did file size increase? */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	folio_mark_dirty(folio);

out:
	folio_unlock(folio);
	folio_put(folio);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY);

	return copied;
}

const struct address_space_operations ceph_aops = {
	.read_folio = netfs_read_folio,
	.readahead = netfs_readahead,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.dirty_folio = ceph_dirty_folio,
	.invalidate_folio = ceph_invalidate_folio,
	.release_folio = netfs_release_folio,
	.direct_IO = noop_direct_IO,
	.migrate_folio = filemap_migrate_folio,
};

static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;

	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}
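/*
 * The mask built above is the inverse of sigmask(SIGKILL), so the fault
 * handlers below block every signal except SIGKILL while waiting for
 * caps; a fatal signal can still interrupt the killable waits. An
 * illustrative sketch of the bracketing pattern (not compiled; it
 * mirrors ceph_filemap_fault() below):
 */
#if 0
	sigset_t oldset;

	ceph_block_sigs(&oldset);	/* block everything but SIGKILL */
	err = ceph_get_caps(file, CEPH_CAP_FILE_RD, want, -1, &got);
	ceph_restore_sigs(&oldset);	/* restore the original mask */
#endif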
/*
 * vm ops
 */
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	ceph_block_sigs(&oldset);

	doutc(cl, "%llx.%llx %llu trying to get caps\n",
	      ceph_vinop(inode), off);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
	if (err < 0)
		goto out_restore;

	doutc(cl, "%llx.%llx %llu got cap refs on %s\n", ceph_vinop(inode),
	      off, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    !ceph_has_inline_data(ci)) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
		ceph_del_rw_context(fi, &rw_ctx);
		doutc(cl, "%llx.%llx %llu drop cap refs %s ret %x\n",
		      ceph_vinop(inode), off, ceph_cap_string(got), ret);
	} else
		err = -EAGAIN;

	ceph_put_cap_refs(ci, got);

	if (err != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page;

		filemap_invalidate_lock_shared(mapping);
		page = find_or_create_page(mapping, 0,
				mapping_gfp_constraint(mapping, ~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			ret = vmf_error(err);
			goto out_inline;
		}
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		filemap_invalidate_unlock_shared(mapping);
		doutc(cl, "%llx.%llx %llu read inline data ret %x\n",
		      ceph_vinop(inode), off, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (err < 0)
		ret = vmf_error(err);

	return ret;
}
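/*
 * Note: the inline-data fallback above only ever fills page 0: inline
 * file data lives in the inode on the MDS and is at most PAGE_SIZE bytes,
 * so a fault beyond the first page of an inline file gets VM_FAULT_SIGBUS.
 */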
static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct folio *folio = page_folio(vmf->page);
	loff_t off = folio_pos(folio);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	sb_start_pagefault(inode->i_sb);
	ceph_block_sigs(&oldset);

	if (off + folio_size(folio) <= size)
		len = folio_size(folio);
	else
		len = offset_in_folio(folio, size);

	doutc(cl, "%llx.%llx %llu~%zd getting caps i_size %llu\n",
	      ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
	if (err < 0)
		goto out_free;

	doutc(cl, "%llx.%llx %llu~%zd got cap refs on %s\n", ceph_vinop(inode),
	      off, len, ceph_cap_string(got));

	/* Update time before taking folio lock */
	file_update_time(vma->vm_file);
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		folio_lock(folio);

		if (folio_mkwrite_check_truncate(folio, inode) < 0) {
			folio_unlock(folio);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		snapc = ceph_find_incompatible(folio);
		if (!snapc) {
			/* success. we'll keep the folio locked. */
			folio_mark_dirty(folio);
			ret = VM_FAULT_LOCKED;
			break;
		}

		folio_unlock(folio);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED) {
		int dirty;

		spin_lock(&ci->i_ceph_lock);
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	doutc(cl, "%llx.%llx %llu~%zd dropping cap refs on %s ret %x\n",
	      ceph_vinop(inode), off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs_async(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	sb_end_pagefault(inode->i_sb);
	ceph_free_cap_flush(prealloc_cf);
	if (err < 0)
		ret = vmf_error(err);
	return ret;
}

void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char *data, size_t len)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
								  ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	doutc(cl, "%p %llx.%llx len %zu locked_page %p\n", inode,
	      ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);

		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}
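/*
 * Note: ceph_uninline_data() below migrates inline data out of the inode
 * and into the first RADOS object using two OSD requests: a CREATE to
 * instantiate the object, then a WRITE guarded by a CMPXATTR on the
 * "inline_version" xattr so that a stale racer cannot clobber newer data
 * (losing that race yields -ECANCELED, which is treated as success).
 */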
int ceph_uninline_data(struct file *file)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *req = NULL;
	struct ceph_cap_flush *prealloc_cf = NULL;
	struct folio *folio = NULL;
	struct ceph_snap_context *snapc = NULL;
	u64 inline_version = CEPH_INLINE_NONE;
	struct page *pages[1];
	int err = 0;
	u64 len;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	doutc(cl, "%llx.%llx inline_version %llu\n", ceph_vinop(inode),
	      inline_version);

	if (ceph_inode_is_shutdown(inode)) {
		err = -EIO;
		goto out;
	}

	if (inline_version == CEPH_INLINE_NONE)
		return 0;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return -ENOMEM;

	if (inline_version == 1) /* initial version, no data */
		goto out_uninline;

	down_read(&fsc->mdsc->snap_rwsem);
	spin_lock(&ci->i_ceph_lock);
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
			list_last_entry(&ci->i_cap_snaps,
					struct ceph_cap_snap,
					ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
	} else {
		if (!ci->i_head_snapc) {
			ci->i_head_snapc = ceph_get_snap_context(
				ci->i_snap_realm->cached_context);
		}
		snapc = ceph_get_snap_context(ci->i_head_snapc);
	}
	spin_unlock(&ci->i_ceph_lock);
	up_read(&fsc->mdsc->snap_rwsem);

	folio = read_mapping_folio(inode->i_mapping, 0, file);
	if (IS_ERR(folio)) {
		err = PTR_ERR(folio);
		goto out;
	}

	folio_lock(folio);

	len = i_size_read(inode);
	if (len > folio_size(folio))
		len = folio_size(folio);

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    snapc, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out_unlock;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    snapc, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	pages[0] = folio_page(folio, 0);
	osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);

		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put_req;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);

		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put_req;
	}

	req->r_mtime = inode_get_mtime(inode);
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

out_uninline:
	if (!err) {
		int dirty;

		/* Set to CAP_INLINE_NONE and dirty the caps */
		down_read(&fsc->mdsc->snap_rwsem);
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		up_read(&fsc->mdsc->snap_rwsem);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}
out_put_req:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out_unlock:
	if (folio) {
		folio_unlock(folio);
		folio_put(folio);
	}
out:
	ceph_put_snap_context(snapc);
	ceph_free_cap_flush(prealloc_cf);
	doutc(cl, "%llx.%llx inline_version %llu = %d\n",
	      ceph_vinop(inode), inline_version, err);
	return err;
}

static const struct vm_operations_struct ceph_vmops = {
	.fault = ceph_filemap_fault,
	.page_mkwrite = ceph_page_mkwrite,
};
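/*
 * Note: mmap requires ->read_folio so that faults can populate pages;
 * without it the mapping is refused with -ENOEXEC, matching the check
 * done by generic_file_mmap().
 */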
int ceph_mmap_prepare(struct vm_area_desc *desc)
{
	struct address_space *mapping = desc->file->f_mapping;

	if (!mapping->a_ops->read_folio)
		return -ENOEXEC;
	desc->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};
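/*
 * Pool permissions are probed with real OSD ops on a per-inode object: a
 * STAT to test read access (-ENOENT still proves readability) and an
 * exclusive CREATE to test write access (-EEXIST still proves
 * writeability). Results are cached in mdsc->pool_perm_tree, keyed by
 * pool id and namespace. An illustrative sketch (not compiled) of how a
 * caller interprets the return value:
 */
#if 0
	int have = __ceph_pool_perm_get(ci, pool, pool_ns);

	if (have < 0)
		return have;		/* the probe itself failed */
	if ((need & CEPH_CAP_FILE_WR) && !(have & POOL_WRITE))
		return -EPERM;		/* pool is not writeable by us */
#endif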
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(&ci->netfs.inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_client *cl = fsc->client;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		doutc(cl, "pool %lld ns %.*s no perm cached\n", pool,
		      (int)pool_ns->len, pool_ns->str);
	else
		doutc(cl, "pool %lld no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	ceph_osdc_start_request(&fsc->client->osdc, rd_req);

	wr_req->r_mtime = inode_get_mtime(&ci->netfs.inode);
	ceph_osdc_start_request(&fsc->client->osdc, wr_req);

	err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc_flex(*perm, pool_ns, pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		doutc(cl, "pool %lld ns %.*s result = %d\n", pool,
		      (int)pool_ns->len, pool_ns->str, err);
	else
		doutc(cl, "pool %lld result = %d\n", pool, err);
	return err;
}
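/*
 * Note: the cache lookup above is double-checked: a search under the read
 * lock first, then a second search under the write lock before probing
 * the OSDs and inserting the result, so concurrent first-time checks of
 * the same pool and namespace trigger only one pair of probe requests.
 */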
2573 */ 2574 return 0; 2575 } 2576 2577 if (ceph_test_mount_opt(ceph_inode_to_fs_client(inode), 2578 NOPOOLPERM)) 2579 return 0; 2580 2581 spin_lock(&ci->i_ceph_lock); 2582 flags = ci->i_ceph_flags; 2583 pool = ci->i_layout.pool_id; 2584 spin_unlock(&ci->i_ceph_lock); 2585 check: 2586 if (flags & CEPH_I_POOL_PERM) { 2587 if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) { 2588 doutc(cl, "pool %lld no read perm\n", pool); 2589 return -EPERM; 2590 } 2591 if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) { 2592 doutc(cl, "pool %lld no write perm\n", pool); 2593 return -EPERM; 2594 } 2595 return 0; 2596 } 2597 2598 pool_ns = ceph_try_get_string(ci->i_layout.pool_ns); 2599 ret = __ceph_pool_perm_get(ci, pool, pool_ns); 2600 ceph_put_string(pool_ns); 2601 if (ret < 0) 2602 return ret; 2603 2604 flags = CEPH_I_POOL_PERM; 2605 if (ret & POOL_READ) 2606 flags |= CEPH_I_POOL_RD; 2607 if (ret & POOL_WRITE) 2608 flags |= CEPH_I_POOL_WR; 2609 2610 spin_lock(&ci->i_ceph_lock); 2611 if (pool == ci->i_layout.pool_id && 2612 pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) { 2613 ci->i_ceph_flags |= flags; 2614 } else { 2615 pool = ci->i_layout.pool_id; 2616 flags = ci->i_ceph_flags; 2617 } 2618 spin_unlock(&ci->i_ceph_lock); 2619 goto check; 2620 } 2621 2622 void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc) 2623 { 2624 struct ceph_pool_perm *perm; 2625 struct rb_node *n; 2626 2627 while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) { 2628 n = rb_first(&mdsc->pool_perm_tree); 2629 perm = rb_entry(n, struct ceph_pool_perm, node); 2630 rb_erase(n, &mdsc->pool_perm_tree); 2631 kfree(perm); 2632 } 2633 } 2634