1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 #include <linux/ceph/striper.h> 4 5 #include <linux/module.h> 6 #include <linux/sched.h> 7 #include <linux/slab.h> 8 #include <linux/file.h> 9 #include <linux/mount.h> 10 #include <linux/namei.h> 11 #include <linux/writeback.h> 12 #include <linux/falloc.h> 13 #include <linux/iversion.h> 14 #include <linux/ktime.h> 15 #include <linux/splice.h> 16 17 #include "super.h" 18 #include "mds_client.h" 19 #include "cache.h" 20 #include "io.h" 21 #include "metric.h" 22 23 static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags) 24 { 25 struct ceph_client *cl = mdsc->fsc->client; 26 u32 wire_flags = 0; 27 28 switch (flags & O_ACCMODE) { 29 case O_RDONLY: 30 wire_flags |= CEPH_O_RDONLY; 31 break; 32 case O_WRONLY: 33 wire_flags |= CEPH_O_WRONLY; 34 break; 35 case O_RDWR: 36 wire_flags |= CEPH_O_RDWR; 37 break; 38 } 39 40 flags &= ~O_ACCMODE; 41 42 #define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; } 43 44 ceph_sys2wire(O_CREAT); 45 ceph_sys2wire(O_EXCL); 46 ceph_sys2wire(O_TRUNC); 47 ceph_sys2wire(O_DIRECTORY); 48 ceph_sys2wire(O_NOFOLLOW); 49 50 #undef ceph_sys2wire 51 52 if (flags) 53 doutc(cl, "unused open flags: %x\n", flags); 54 55 return cpu_to_le32(wire_flags); 56 } 57 58 /* 59 * Ceph file operations 60 * 61 * Implement basic open/close functionality, and implement 62 * read/write. 63 * 64 * We implement three modes of file I/O: 65 * - buffered uses the generic_file_aio_{read,write} helpers 66 * 67 * - synchronous is used when there is multi-client read/write 68 * sharing, avoids the page cache, and synchronously waits for an 69 * ack from the OSD. 70 * 71 * - direct io takes the variant of the sync path that references 72 * user pages directly. 73 * 74 * fsync() flushes and waits on dirty pages, but just queues metadata 75 * for writeback: since the MDS can recover size and mtime there is no 76 * need to wait for MDS acknowledgement. 77 */ 78 79 /* 80 * How many pages to get in one call to iov_iter_get_pages(). This 81 * determines the size of the on-stack array used as a buffer. 82 */ 83 #define ITER_GET_BVECS_PAGES 64 84 85 static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize, 86 struct bio_vec *bvecs) 87 { 88 size_t size = 0; 89 int bvec_idx = 0; 90 91 if (maxsize > iov_iter_count(iter)) 92 maxsize = iov_iter_count(iter); 93 94 while (size < maxsize) { 95 struct page *pages[ITER_GET_BVECS_PAGES]; 96 ssize_t bytes; 97 size_t start; 98 int idx = 0; 99 100 bytes = iov_iter_get_pages2(iter, pages, maxsize - size, 101 ITER_GET_BVECS_PAGES, &start); 102 if (bytes < 0) 103 return size ?: bytes; 104 105 size += bytes; 106 107 for ( ; bytes; idx++, bvec_idx++) { 108 int len = min_t(int, bytes, PAGE_SIZE - start); 109 110 bvec_set_page(&bvecs[bvec_idx], pages[idx], len, start); 111 bytes -= len; 112 start = 0; 113 } 114 } 115 116 return size; 117 } 118 119 /* 120 * iov_iter_get_pages() only considers one iov_iter segment, no matter 121 * what maxsize or maxpages are given. For ITER_BVEC that is a single 122 * page. 123 * 124 * Attempt to get up to @maxsize bytes worth of pages from @iter. 125 * Return the number of bytes in the created bio_vec array, or an error. 
126 */ 127 static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize, 128 struct bio_vec **bvecs, int *num_bvecs) 129 { 130 struct bio_vec *bv; 131 size_t orig_count = iov_iter_count(iter); 132 ssize_t bytes; 133 int npages; 134 135 iov_iter_truncate(iter, maxsize); 136 npages = iov_iter_npages(iter, INT_MAX); 137 iov_iter_reexpand(iter, orig_count); 138 139 /* 140 * __iter_get_bvecs() may populate only part of the array -- zero it 141 * out. 142 */ 143 bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO); 144 if (!bv) 145 return -ENOMEM; 146 147 bytes = __iter_get_bvecs(iter, maxsize, bv); 148 if (bytes < 0) { 149 /* 150 * No pages were pinned -- just free the array. 151 */ 152 kvfree(bv); 153 return bytes; 154 } 155 156 *bvecs = bv; 157 *num_bvecs = npages; 158 return bytes; 159 } 160 161 static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty) 162 { 163 int i; 164 165 for (i = 0; i < num_bvecs; i++) { 166 if (bvecs[i].bv_page) { 167 if (should_dirty) 168 set_page_dirty_lock(bvecs[i].bv_page); 169 put_page(bvecs[i].bv_page); 170 } 171 } 172 kvfree(bvecs); 173 } 174 175 /* 176 * Prepare an open request. Preallocate ceph_cap to avoid an 177 * inopportune ENOMEM later. 178 */ 179 static struct ceph_mds_request * 180 prepare_open_request(struct super_block *sb, int flags, int create_mode) 181 { 182 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(sb); 183 struct ceph_mds_request *req; 184 int want_auth = USE_ANY_MDS; 185 int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN; 186 187 if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC)) 188 want_auth = USE_AUTH_MDS; 189 190 req = ceph_mdsc_create_request(mdsc, op, want_auth); 191 if (IS_ERR(req)) 192 goto out; 193 req->r_fmode = ceph_flags_to_mode(flags); 194 req->r_args.open.flags = ceph_flags_sys2wire(mdsc, flags); 195 req->r_args.open.mode = cpu_to_le32(create_mode); 196 out: 197 return req; 198 } 199 200 static int ceph_init_file_info(struct inode *inode, struct file *file, 201 int fmode, bool isdir) 202 { 203 struct ceph_inode_info *ci = ceph_inode(inode); 204 struct ceph_mount_options *opt = 205 ceph_inode_to_fs_client(&ci->netfs.inode)->mount_options; 206 struct ceph_client *cl = ceph_inode_to_client(inode); 207 struct ceph_file_info *fi; 208 int ret; 209 210 doutc(cl, "%p %llx.%llx %p 0%o (%s)\n", inode, ceph_vinop(inode), 211 file, inode->i_mode, isdir ? 
"dir" : "regular"); 212 BUG_ON(inode->i_fop->release != ceph_release); 213 214 if (isdir) { 215 struct ceph_dir_file_info *dfi = 216 kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL); 217 if (!dfi) 218 return -ENOMEM; 219 220 file->private_data = dfi; 221 fi = &dfi->file_info; 222 dfi->next_offset = 2; 223 dfi->readdir_cache_idx = -1; 224 } else { 225 fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL); 226 if (!fi) 227 return -ENOMEM; 228 229 if (opt->flags & CEPH_MOUNT_OPT_NOPAGECACHE) 230 fi->flags |= CEPH_F_SYNC; 231 232 file->private_data = fi; 233 } 234 235 ceph_get_fmode(ci, fmode, 1); 236 fi->fmode = fmode; 237 238 spin_lock_init(&fi->rw_contexts_lock); 239 INIT_LIST_HEAD(&fi->rw_contexts); 240 fi->filp_gen = READ_ONCE(ceph_inode_to_fs_client(inode)->filp_gen); 241 242 if ((file->f_mode & FMODE_WRITE) && ceph_has_inline_data(ci)) { 243 ret = ceph_uninline_data(file); 244 if (ret < 0) 245 goto error; 246 } 247 248 return 0; 249 250 error: 251 ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE); 252 ceph_put_fmode(ci, fi->fmode, 1); 253 kmem_cache_free(ceph_file_cachep, fi); 254 /* wake up anyone waiting for caps on this inode */ 255 wake_up_all(&ci->i_cap_wq); 256 return ret; 257 } 258 259 /* 260 * initialize private struct file data. 261 * if we fail, clean up by dropping fmode reference on the ceph_inode 262 */ 263 static int ceph_init_file(struct inode *inode, struct file *file, int fmode) 264 { 265 struct ceph_client *cl = ceph_inode_to_client(inode); 266 int ret = 0; 267 268 switch (inode->i_mode & S_IFMT) { 269 case S_IFREG: 270 ceph_fscache_use_cookie(inode, file->f_mode & FMODE_WRITE); 271 fallthrough; 272 case S_IFDIR: 273 ret = ceph_init_file_info(inode, file, fmode, 274 S_ISDIR(inode->i_mode)); 275 break; 276 277 case S_IFLNK: 278 doutc(cl, "%p %llx.%llx %p 0%o (symlink)\n", inode, 279 ceph_vinop(inode), file, inode->i_mode); 280 break; 281 282 default: 283 doutc(cl, "%p %llx.%llx %p 0%o (special)\n", inode, 284 ceph_vinop(inode), file, inode->i_mode); 285 /* 286 * we need to drop the open ref now, since we don't 287 * have .release set to ceph_release. 288 */ 289 BUG_ON(inode->i_fop->release == ceph_release); 290 291 /* call the proper open fop */ 292 ret = inode->i_fop->open(inode, file); 293 } 294 return ret; 295 } 296 297 /* 298 * try renew caps after session gets killed. 
299 */ 300 int ceph_renew_caps(struct inode *inode, int fmode) 301 { 302 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); 303 struct ceph_client *cl = mdsc->fsc->client; 304 struct ceph_inode_info *ci = ceph_inode(inode); 305 struct ceph_mds_request *req; 306 int err, flags, wanted; 307 308 spin_lock(&ci->i_ceph_lock); 309 __ceph_touch_fmode(ci, mdsc, fmode); 310 wanted = __ceph_caps_file_wanted(ci); 311 if (__ceph_is_any_real_caps(ci) && 312 (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) { 313 int issued = __ceph_caps_issued(ci, NULL); 314 spin_unlock(&ci->i_ceph_lock); 315 doutc(cl, "%p %llx.%llx want %s issued %s updating mds_wanted\n", 316 inode, ceph_vinop(inode), ceph_cap_string(wanted), 317 ceph_cap_string(issued)); 318 ceph_check_caps(ci, 0); 319 return 0; 320 } 321 spin_unlock(&ci->i_ceph_lock); 322 323 flags = 0; 324 if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR)) 325 flags = O_RDWR; 326 else if (wanted & CEPH_CAP_FILE_RD) 327 flags = O_RDONLY; 328 else if (wanted & CEPH_CAP_FILE_WR) 329 flags = O_WRONLY; 330 #ifdef O_LAZY 331 if (wanted & CEPH_CAP_FILE_LAZYIO) 332 flags |= O_LAZY; 333 #endif 334 335 req = prepare_open_request(inode->i_sb, flags, 0); 336 if (IS_ERR(req)) { 337 err = PTR_ERR(req); 338 goto out; 339 } 340 341 req->r_inode = inode; 342 ihold(inode); 343 req->r_num_caps = 1; 344 345 err = ceph_mdsc_do_request(mdsc, NULL, req); 346 ceph_mdsc_put_request(req); 347 out: 348 doutc(cl, "%p %llx.%llx open result=%d\n", inode, ceph_vinop(inode), 349 err); 350 return err < 0 ? err : 0; 351 } 352 353 /* 354 * If we already have the requisite capabilities, we can satisfy 355 * the open request locally (no need to request new caps from the 356 * MDS). We do, however, need to inform the MDS (asynchronously) 357 * if our wanted caps set expands. 358 */ 359 int ceph_open(struct inode *inode, struct file *file) 360 { 361 struct ceph_inode_info *ci = ceph_inode(inode); 362 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(inode->i_sb); 363 struct ceph_client *cl = fsc->client; 364 struct ceph_mds_client *mdsc = fsc->mdsc; 365 struct ceph_mds_request *req; 366 struct ceph_file_info *fi = file->private_data; 367 int err; 368 int flags, fmode, wanted; 369 struct dentry *dentry; 370 char *path; 371 bool do_sync = false; 372 int mask = MAY_READ; 373 374 if (fi) { 375 doutc(cl, "file %p is already opened\n", file); 376 return 0; 377 } 378 379 /* filter out O_CREAT|O_EXCL; vfs did that already. yuck. 
*/ 380 flags = file->f_flags & ~(O_CREAT|O_EXCL); 381 if (S_ISDIR(inode->i_mode)) { 382 flags = O_DIRECTORY; /* mds likes to know */ 383 } else if (S_ISREG(inode->i_mode)) { 384 err = fscrypt_file_open(inode, file); 385 if (err) 386 return err; 387 } 388 389 doutc(cl, "%p %llx.%llx file %p flags %d (%d)\n", inode, 390 ceph_vinop(inode), file, flags, file->f_flags); 391 fmode = ceph_flags_to_mode(flags); 392 wanted = ceph_caps_for_mode(fmode); 393 394 if (fmode & CEPH_FILE_MODE_WR) 395 mask |= MAY_WRITE; 396 dentry = d_find_alias(inode); 397 if (!dentry) { 398 do_sync = true; 399 } else { 400 struct ceph_path_info path_info; 401 path = ceph_mdsc_build_path(mdsc, dentry, &path_info, 0); 402 if (IS_ERR(path)) { 403 do_sync = true; 404 err = 0; 405 } else { 406 err = ceph_mds_check_access(mdsc, path, mask); 407 } 408 ceph_mdsc_free_path_info(&path_info); 409 dput(dentry); 410 411 /* For none EACCES cases will let the MDS do the mds auth check */ 412 if (err == -EACCES) { 413 return err; 414 } else if (err < 0) { 415 do_sync = true; 416 err = 0; 417 } 418 } 419 420 /* snapped files are read-only */ 421 if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE)) 422 return -EROFS; 423 424 /* trivially open snapdir */ 425 if (ceph_snap(inode) == CEPH_SNAPDIR) { 426 return ceph_init_file(inode, file, fmode); 427 } 428 429 /* 430 * No need to block if we have caps on the auth MDS (for 431 * write) or any MDS (for read). Update wanted set 432 * asynchronously. 433 */ 434 spin_lock(&ci->i_ceph_lock); 435 if (!do_sync && __ceph_is_any_real_caps(ci) && 436 (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) { 437 int mds_wanted = __ceph_caps_mds_wanted(ci, true); 438 int issued = __ceph_caps_issued(ci, NULL); 439 440 doutc(cl, "open %p fmode %d want %s issued %s using existing\n", 441 inode, fmode, ceph_cap_string(wanted), 442 ceph_cap_string(issued)); 443 __ceph_touch_fmode(ci, mdsc, fmode); 444 spin_unlock(&ci->i_ceph_lock); 445 446 /* adjust wanted? 
*/ 447 if ((issued & wanted) != wanted && 448 (mds_wanted & wanted) != wanted && 449 ceph_snap(inode) != CEPH_SNAPDIR) 450 ceph_check_caps(ci, 0); 451 452 return ceph_init_file(inode, file, fmode); 453 } else if (!do_sync && ceph_snap(inode) != CEPH_NOSNAP && 454 (ci->i_snap_caps & wanted) == wanted) { 455 __ceph_touch_fmode(ci, mdsc, fmode); 456 spin_unlock(&ci->i_ceph_lock); 457 return ceph_init_file(inode, file, fmode); 458 } 459 460 spin_unlock(&ci->i_ceph_lock); 461 462 doutc(cl, "open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); 463 req = prepare_open_request(inode->i_sb, flags, 0); 464 if (IS_ERR(req)) { 465 err = PTR_ERR(req); 466 goto out; 467 } 468 req->r_inode = inode; 469 ihold(inode); 470 471 req->r_num_caps = 1; 472 err = ceph_mdsc_do_request(mdsc, NULL, req); 473 if (!err) 474 err = ceph_init_file(inode, file, req->r_fmode); 475 ceph_mdsc_put_request(req); 476 doutc(cl, "open result=%d on %llx.%llx\n", err, ceph_vinop(inode)); 477 out: 478 return err; 479 } 480 481 /* Clone the layout from a synchronous create, if the dir now has Dc caps */ 482 static void 483 cache_file_layout(struct inode *dst, struct inode *src) 484 { 485 struct ceph_inode_info *cdst = ceph_inode(dst); 486 struct ceph_inode_info *csrc = ceph_inode(src); 487 488 spin_lock(&cdst->i_ceph_lock); 489 if ((__ceph_caps_issued(cdst, NULL) & CEPH_CAP_DIR_CREATE) && 490 !ceph_file_layout_is_valid(&cdst->i_cached_layout)) { 491 memcpy(&cdst->i_cached_layout, &csrc->i_layout, 492 sizeof(cdst->i_cached_layout)); 493 rcu_assign_pointer(cdst->i_cached_layout.pool_ns, 494 ceph_try_get_string(csrc->i_layout.pool_ns)); 495 } 496 spin_unlock(&cdst->i_ceph_lock); 497 } 498 499 /* 500 * Try to set up an async create. We need caps, a file layout, and inode number, 501 * and either a lease on the dentry or complete dir info. If any of those 502 * criteria are not satisfied, then return false and the caller can go 503 * synchronous. 504 */ 505 static int try_prep_async_create(struct inode *dir, struct dentry *dentry, 506 struct ceph_file_layout *lo, u64 *pino) 507 { 508 struct ceph_inode_info *ci = ceph_inode(dir); 509 struct ceph_dentry_info *di = ceph_dentry(dentry); 510 int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_CREATE; 511 u64 ino; 512 513 spin_lock(&ci->i_ceph_lock); 514 /* No auth cap means no chance for Dc caps */ 515 if (!ci->i_auth_cap) 516 goto no_async; 517 518 /* Any delegated inos? 
*/ 519 if (xa_empty(&ci->i_auth_cap->session->s_delegated_inos)) 520 goto no_async; 521 522 if (!ceph_file_layout_is_valid(&ci->i_cached_layout)) 523 goto no_async; 524 525 if ((__ceph_caps_issued(ci, NULL) & want) != want) 526 goto no_async; 527 528 if (d_in_lookup(dentry)) { 529 if (!__ceph_dir_is_complete(ci)) 530 goto no_async; 531 spin_lock(&dentry->d_lock); 532 di->lease_shared_gen = atomic_read(&ci->i_shared_gen); 533 spin_unlock(&dentry->d_lock); 534 } else if (atomic_read(&ci->i_shared_gen) != 535 READ_ONCE(di->lease_shared_gen)) { 536 goto no_async; 537 } 538 539 ino = ceph_get_deleg_ino(ci->i_auth_cap->session); 540 if (!ino) 541 goto no_async; 542 543 *pino = ino; 544 ceph_take_cap_refs(ci, want, false); 545 memcpy(lo, &ci->i_cached_layout, sizeof(*lo)); 546 rcu_assign_pointer(lo->pool_ns, 547 ceph_try_get_string(ci->i_cached_layout.pool_ns)); 548 got = want; 549 no_async: 550 spin_unlock(&ci->i_ceph_lock); 551 return got; 552 } 553 554 static void restore_deleg_ino(struct inode *dir, u64 ino) 555 { 556 struct ceph_client *cl = ceph_inode_to_client(dir); 557 struct ceph_inode_info *ci = ceph_inode(dir); 558 struct ceph_mds_session *s = NULL; 559 560 spin_lock(&ci->i_ceph_lock); 561 if (ci->i_auth_cap) 562 s = ceph_get_mds_session(ci->i_auth_cap->session); 563 spin_unlock(&ci->i_ceph_lock); 564 if (s) { 565 int err = ceph_restore_deleg_ino(s, ino); 566 if (err) 567 pr_warn_client(cl, 568 "unable to restore delegated ino 0x%llx to session: %d\n", 569 ino, err); 570 ceph_put_mds_session(s); 571 } 572 } 573 574 static void wake_async_create_waiters(struct inode *inode, 575 struct ceph_mds_session *session) 576 { 577 struct ceph_inode_info *ci = ceph_inode(inode); 578 bool check_cap = false; 579 580 spin_lock(&ci->i_ceph_lock); 581 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) { 582 clear_and_wake_up_bit(CEPH_ASYNC_CREATE_BIT, &ci->i_ceph_flags); 583 584 if (ci->i_ceph_flags & CEPH_I_ASYNC_CHECK_CAPS) { 585 ci->i_ceph_flags &= ~CEPH_I_ASYNC_CHECK_CAPS; 586 check_cap = true; 587 } 588 } 589 ceph_kick_flushing_inode_caps(session, ci); 590 spin_unlock(&ci->i_ceph_lock); 591 592 if (check_cap) 593 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 594 } 595 596 static void ceph_async_create_cb(struct ceph_mds_client *mdsc, 597 struct ceph_mds_request *req) 598 { 599 struct ceph_client *cl = mdsc->fsc->client; 600 struct dentry *dentry = req->r_dentry; 601 struct inode *dinode = d_inode(dentry); 602 struct inode *tinode = req->r_target_inode; 603 int result = req->r_err ? req->r_err : 604 le32_to_cpu(req->r_reply_info.head->result); 605 606 WARN_ON_ONCE(dinode && tinode && dinode != tinode); 607 608 /* MDS changed -- caller must resubmit */ 609 if (result == -EJUKEBOX) 610 goto out; 611 612 mapping_set_error(req->r_parent->i_mapping, result); 613 614 if (result) { 615 struct ceph_path_info path_info = {0}; 616 char *path = ceph_mdsc_build_path(mdsc, req->r_dentry, &path_info, 0); 617 618 pr_warn_client(cl, 619 "async create failure path=(%llx)%s result=%d!\n", 620 path_info.vino.ino, IS_ERR(path) ? "<<bad>>" : path, result); 621 ceph_mdsc_free_path_info(&path_info); 622 623 ceph_dir_clear_complete(req->r_parent); 624 if (!d_unhashed(dentry)) 625 d_drop(dentry); 626 627 if (dinode) { 628 mapping_set_error(dinode->i_mapping, result); 629 ceph_inode_shutdown(dinode); 630 wake_async_create_waiters(dinode, req->r_session); 631 } 632 } 633 634 if (tinode) { 635 u64 ino = ceph_vino(tinode).ino; 636 637 if (req->r_deleg_ino != ino) 638 pr_warn_client(cl, 639 "inode number mismatch! 
err=%d deleg_ino=0x%llx target=0x%llx\n", 640 req->r_err, req->r_deleg_ino, ino); 641 642 mapping_set_error(tinode->i_mapping, result); 643 wake_async_create_waiters(tinode, req->r_session); 644 } else if (!result) { 645 pr_warn_client(cl, "no req->r_target_inode for 0x%llx\n", 646 req->r_deleg_ino); 647 } 648 out: 649 ceph_mdsc_release_dir_caps(req); 650 } 651 652 static int ceph_finish_async_create(struct inode *dir, struct inode *inode, 653 struct dentry *dentry, 654 struct file *file, umode_t mode, 655 struct ceph_mds_request *req, 656 struct ceph_acl_sec_ctx *as_ctx, 657 struct ceph_file_layout *lo) 658 { 659 int ret; 660 char xattr_buf[4]; 661 struct ceph_mds_reply_inode in = { }; 662 struct ceph_mds_reply_info_in iinfo = { .in = &in }; 663 struct ceph_inode_info *ci = ceph_inode(dir); 664 struct ceph_dentry_info *di = ceph_dentry(dentry); 665 struct timespec64 now; 666 struct ceph_string *pool_ns; 667 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); 668 struct ceph_client *cl = mdsc->fsc->client; 669 struct ceph_vino vino = { .ino = req->r_deleg_ino, 670 .snap = CEPH_NOSNAP }; 671 672 ktime_get_real_ts64(&now); 673 674 iinfo.inline_version = CEPH_INLINE_NONE; 675 iinfo.change_attr = 1; 676 ceph_encode_timespec64(&iinfo.btime, &now); 677 678 if (req->r_pagelist) { 679 iinfo.xattr_len = req->r_pagelist->length; 680 iinfo.xattr_data = req->r_pagelist->mapped_tail; 681 } else { 682 /* fake it */ 683 iinfo.xattr_len = ARRAY_SIZE(xattr_buf); 684 iinfo.xattr_data = xattr_buf; 685 memset(iinfo.xattr_data, 0, iinfo.xattr_len); 686 } 687 688 in.ino = cpu_to_le64(vino.ino); 689 in.snapid = cpu_to_le64(CEPH_NOSNAP); 690 in.version = cpu_to_le64(1); // ??? 691 in.cap.caps = in.cap.wanted = cpu_to_le32(CEPH_CAP_ALL_FILE); 692 in.cap.cap_id = cpu_to_le64(1); 693 in.cap.realm = cpu_to_le64(ci->i_snap_realm->ino); 694 in.cap.flags = CEPH_CAP_FLAG_AUTH; 695 in.ctime = in.mtime = in.atime = iinfo.btime; 696 in.truncate_seq = cpu_to_le32(1); 697 in.truncate_size = cpu_to_le64(-1ULL); 698 in.xattr_version = cpu_to_le64(1); 699 in.uid = cpu_to_le32(from_kuid(&init_user_ns, 700 mapped_fsuid(req->r_mnt_idmap, 701 &init_user_ns))); 702 if (dir->i_mode & S_ISGID) { 703 in.gid = cpu_to_le32(from_kgid(&init_user_ns, dir->i_gid)); 704 705 /* Directories always inherit the setgid bit. */ 706 if (S_ISDIR(mode)) 707 mode |= S_ISGID; 708 } else { 709 in.gid = cpu_to_le32(from_kgid(&init_user_ns, 710 mapped_fsgid(req->r_mnt_idmap, 711 &init_user_ns))); 712 } 713 in.mode = cpu_to_le32((u32)mode); 714 715 in.nlink = cpu_to_le32(1); 716 in.max_size = cpu_to_le64(lo->stripe_unit); 717 718 ceph_file_layout_to_legacy(lo, &in.layout); 719 /* lo is private, so pool_ns can't change */ 720 pool_ns = rcu_dereference_raw(lo->pool_ns); 721 if (pool_ns) { 722 iinfo.pool_ns_len = pool_ns->len; 723 iinfo.pool_ns_data = pool_ns->str; 724 } 725 726 down_read(&mdsc->snap_rwsem); 727 ret = ceph_fill_inode(inode, NULL, &iinfo, NULL, req->r_session, 728 req->r_fmode, NULL); 729 up_read(&mdsc->snap_rwsem); 730 if (ret) { 731 doutc(cl, "failed to fill inode: %d\n", ret); 732 ceph_dir_clear_complete(dir); 733 if (!d_unhashed(dentry)) 734 d_drop(dentry); 735 discard_new_inode(inode); 736 } else { 737 struct dentry *dn; 738 739 doutc(cl, "d_adding new inode 0x%llx to 0x%llx/%s\n", 740 vino.ino, ceph_ino(dir), dentry->d_name.name); 741 ceph_dir_clear_ordered(dir); 742 ceph_init_inode_acls(inode, as_ctx); 743 if (inode->i_state & I_NEW) { 744 /* 745 * If it's not I_NEW, then someone created this before 746 * we got here. 
Assume the server is aware of it at 747 * that point and don't worry about setting 748 * CEPH_I_ASYNC_CREATE. 749 */ 750 ceph_inode(inode)->i_ceph_flags = CEPH_I_ASYNC_CREATE; 751 unlock_new_inode(inode); 752 } 753 if (d_in_lookup(dentry) || d_really_is_negative(dentry)) { 754 if (!d_unhashed(dentry)) 755 d_drop(dentry); 756 dn = d_splice_alias(inode, dentry); 757 WARN_ON_ONCE(dn && dn != dentry); 758 } 759 file->f_mode |= FMODE_CREATED; 760 ret = finish_open(file, dentry, ceph_open); 761 } 762 763 spin_lock(&dentry->d_lock); 764 clear_and_wake_up_bit(CEPH_DENTRY_ASYNC_CREATE_BIT, &di->flags); 765 spin_unlock(&dentry->d_lock); 766 767 return ret; 768 } 769 770 /* 771 * Do a lookup + open with a single request. If we get a non-existent 772 * file or symlink, return 1 so the VFS can retry. 773 */ 774 int ceph_atomic_open(struct inode *dir, struct dentry *dentry, 775 struct file *file, unsigned flags, umode_t mode) 776 { 777 struct mnt_idmap *idmap = file_mnt_idmap(file); 778 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dir->i_sb); 779 struct ceph_client *cl = fsc->client; 780 struct ceph_mds_client *mdsc = fsc->mdsc; 781 struct ceph_mds_request *req; 782 struct inode *new_inode = NULL; 783 struct dentry *dn; 784 struct ceph_acl_sec_ctx as_ctx = {}; 785 bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS); 786 int mask; 787 int err; 788 char *path; 789 790 doutc(cl, "%p %llx.%llx dentry %p '%pd' %s flags %d mode 0%o\n", 791 dir, ceph_vinop(dir), dentry, dentry, 792 d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode); 793 794 if (dentry->d_name.len > NAME_MAX) 795 return -ENAMETOOLONG; 796 797 err = ceph_wait_on_conflict_unlink(dentry); 798 if (err) 799 return err; 800 /* 801 * Do not truncate the file, since atomic_open is called before the 802 * permission check. The caller will do the truncation afterward. 
803 */ 804 flags &= ~O_TRUNC; 805 806 dn = d_find_alias(dir); 807 if (!dn) { 808 try_async = false; 809 } else { 810 struct ceph_path_info path_info; 811 path = ceph_mdsc_build_path(mdsc, dn, &path_info, 0); 812 if (IS_ERR(path)) { 813 try_async = false; 814 err = 0; 815 } else { 816 int fmode = ceph_flags_to_mode(flags); 817 818 mask = MAY_READ; 819 if (fmode & CEPH_FILE_MODE_WR) 820 mask |= MAY_WRITE; 821 err = ceph_mds_check_access(mdsc, path, mask); 822 } 823 ceph_mdsc_free_path_info(&path_info); 824 dput(dn); 825 826 /* For none EACCES cases will let the MDS do the mds auth check */ 827 if (err == -EACCES) { 828 return err; 829 } else if (err < 0) { 830 try_async = false; 831 err = 0; 832 } 833 } 834 835 retry: 836 if (flags & O_CREAT) { 837 if (ceph_quota_is_max_files_exceeded(dir)) 838 return -EDQUOT; 839 840 new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx); 841 if (IS_ERR(new_inode)) { 842 err = PTR_ERR(new_inode); 843 goto out_ctx; 844 } 845 /* Async create can't handle more than a page of xattrs */ 846 if (as_ctx.pagelist && 847 !list_is_singular(&as_ctx.pagelist->head)) 848 try_async = false; 849 } else if (!d_in_lookup(dentry)) { 850 /* If it's not being looked up, it's negative */ 851 return -ENOENT; 852 } 853 854 /* do the open */ 855 req = prepare_open_request(dir->i_sb, flags, mode); 856 if (IS_ERR(req)) { 857 err = PTR_ERR(req); 858 goto out_ctx; 859 } 860 req->r_dentry = dget(dentry); 861 req->r_num_caps = 2; 862 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; 863 if (ceph_security_xattr_wanted(dir)) 864 mask |= CEPH_CAP_XATTR_SHARED; 865 req->r_args.open.mask = cpu_to_le32(mask); 866 req->r_parent = dir; 867 if (req->r_op == CEPH_MDS_OP_CREATE) 868 req->r_mnt_idmap = mnt_idmap_get(idmap); 869 ihold(dir); 870 if (IS_ENCRYPTED(dir)) { 871 set_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags); 872 err = fscrypt_prepare_lookup_partial(dir, dentry); 873 if (err < 0) 874 goto out_req; 875 } 876 877 if (flags & O_CREAT) { 878 struct ceph_file_layout lo; 879 880 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL | 881 CEPH_CAP_XATTR_EXCL; 882 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 883 884 ceph_as_ctx_to_req(req, &as_ctx); 885 886 if (try_async && (req->r_dir_caps = 887 try_prep_async_create(dir, dentry, &lo, 888 &req->r_deleg_ino))) { 889 struct ceph_vino vino = { .ino = req->r_deleg_ino, 890 .snap = CEPH_NOSNAP }; 891 struct ceph_dentry_info *di = ceph_dentry(dentry); 892 893 set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags); 894 req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL); 895 req->r_callback = ceph_async_create_cb; 896 897 /* Hash inode before RPC */ 898 new_inode = ceph_get_inode(dir->i_sb, vino, new_inode); 899 if (IS_ERR(new_inode)) { 900 err = PTR_ERR(new_inode); 901 new_inode = NULL; 902 goto out_req; 903 } 904 WARN_ON_ONCE(!(new_inode->i_state & I_NEW)); 905 906 spin_lock(&dentry->d_lock); 907 di->flags |= CEPH_DENTRY_ASYNC_CREATE; 908 spin_unlock(&dentry->d_lock); 909 910 err = ceph_mdsc_submit_request(mdsc, dir, req); 911 if (!err) { 912 err = ceph_finish_async_create(dir, new_inode, 913 dentry, file, 914 mode, req, 915 &as_ctx, &lo); 916 new_inode = NULL; 917 } else if (err == -EJUKEBOX) { 918 restore_deleg_ino(dir, req->r_deleg_ino); 919 ceph_mdsc_put_request(req); 920 discard_new_inode(new_inode); 921 ceph_release_acl_sec_ctx(&as_ctx); 922 memset(&as_ctx, 0, sizeof(as_ctx)); 923 new_inode = NULL; 924 try_async = false; 925 ceph_put_string(rcu_dereference_raw(lo.pool_ns)); 926 goto retry; 927 } 928 
ceph_put_string(rcu_dereference_raw(lo.pool_ns)); 929 goto out_req; 930 } 931 } 932 933 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 934 req->r_new_inode = new_inode; 935 new_inode = NULL; 936 err = ceph_mdsc_do_request(mdsc, (flags & O_CREAT) ? dir : NULL, req); 937 if (err == -ENOENT) { 938 dentry = ceph_handle_snapdir(req, dentry); 939 if (IS_ERR(dentry)) { 940 err = PTR_ERR(dentry); 941 goto out_req; 942 } 943 err = 0; 944 } 945 946 if (!err && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 947 err = ceph_handle_notrace_create(dir, dentry); 948 949 if (d_in_lookup(dentry)) { 950 dn = ceph_finish_lookup(req, dentry, err); 951 if (IS_ERR(dn)) 952 err = PTR_ERR(dn); 953 } else { 954 /* we were given a hashed negative dentry */ 955 dn = NULL; 956 } 957 if (err) 958 goto out_req; 959 if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) { 960 /* make vfs retry on splice, ENOENT, or symlink */ 961 doutc(cl, "finish_no_open on dn %p\n", dn); 962 err = finish_no_open(file, dn); 963 } else { 964 if (IS_ENCRYPTED(dir) && 965 !fscrypt_has_permitted_context(dir, d_inode(dentry))) { 966 pr_warn_client(cl, 967 "Inconsistent encryption context (parent %llx:%llx child %llx:%llx)\n", 968 ceph_vinop(dir), ceph_vinop(d_inode(dentry))); 969 goto out_req; 970 } 971 972 doutc(cl, "finish_open on dn %p\n", dn); 973 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { 974 struct inode *newino = d_inode(dentry); 975 976 cache_file_layout(dir, newino); 977 ceph_init_inode_acls(newino, &as_ctx); 978 file->f_mode |= FMODE_CREATED; 979 } 980 err = finish_open(file, dentry, ceph_open); 981 } 982 out_req: 983 ceph_mdsc_put_request(req); 984 iput(new_inode); 985 out_ctx: 986 ceph_release_acl_sec_ctx(&as_ctx); 987 doutc(cl, "result=%d\n", err); 988 return err; 989 } 990 991 int ceph_release(struct inode *inode, struct file *file) 992 { 993 struct ceph_client *cl = ceph_inode_to_client(inode); 994 struct ceph_inode_info *ci = ceph_inode(inode); 995 996 if (S_ISDIR(inode->i_mode)) { 997 struct ceph_dir_file_info *dfi = file->private_data; 998 doutc(cl, "%p %llx.%llx dir file %p\n", inode, 999 ceph_vinop(inode), file); 1000 WARN_ON(!list_empty(&dfi->file_info.rw_contexts)); 1001 1002 ceph_put_fmode(ci, dfi->file_info.fmode, 1); 1003 1004 if (dfi->last_readdir) 1005 ceph_mdsc_put_request(dfi->last_readdir); 1006 kfree(dfi->last_name); 1007 kfree(dfi->dir_info); 1008 kmem_cache_free(ceph_dir_file_cachep, dfi); 1009 } else { 1010 struct ceph_file_info *fi = file->private_data; 1011 doutc(cl, "%p %llx.%llx regular file %p\n", inode, 1012 ceph_vinop(inode), file); 1013 WARN_ON(!list_empty(&fi->rw_contexts)); 1014 1015 ceph_fscache_unuse_cookie(inode, file->f_mode & FMODE_WRITE); 1016 ceph_put_fmode(ci, fi->fmode, 1); 1017 1018 kmem_cache_free(ceph_file_cachep, fi); 1019 } 1020 1021 /* wake up anyone waiting for caps on this inode */ 1022 wake_up_all(&ci->i_cap_wq); 1023 return 0; 1024 } 1025 1026 enum { 1027 HAVE_RETRIED = 1, 1028 CHECK_EOF = 2, 1029 READ_INLINE = 3, 1030 }; 1031 1032 /* 1033 * Completely synchronous read and write methods. Direct from __user 1034 * buffer to osd, or directly to user pages (if O_DIRECT). 1035 * 1036 * If the read spans object boundary, just do multiple reads. (That's not 1037 * atomic, but good enough for now.) 1038 * 1039 * If we get a short result from the OSD, check against i_size; we need to 1040 * only return a short read to the caller if we hit EOF. 
1041 */ 1042 ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos, 1043 struct iov_iter *to, int *retry_op, 1044 u64 *last_objver) 1045 { 1046 struct ceph_inode_info *ci = ceph_inode(inode); 1047 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1048 struct ceph_client *cl = fsc->client; 1049 struct ceph_osd_client *osdc = &fsc->client->osdc; 1050 ssize_t ret; 1051 u64 off = *ki_pos; 1052 u64 len = iov_iter_count(to); 1053 u64 i_size = i_size_read(inode); 1054 bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD); 1055 u64 objver = 0; 1056 1057 doutc(cl, "on inode %p %llx.%llx %llx~%llx\n", inode, 1058 ceph_vinop(inode), *ki_pos, len); 1059 1060 if (ceph_inode_is_shutdown(inode)) 1061 return -EIO; 1062 1063 if (!len || !i_size) 1064 return 0; 1065 /* 1066 * flush any page cache pages in this range. this 1067 * will make concurrent normal and sync io slow, 1068 * but it will at least behave sensibly when they are 1069 * in sequence. 1070 */ 1071 ret = filemap_write_and_wait_range(inode->i_mapping, 1072 off, off + len - 1); 1073 if (ret < 0) 1074 return ret; 1075 1076 ret = 0; 1077 while ((len = iov_iter_count(to)) > 0) { 1078 struct ceph_osd_request *req; 1079 struct page **pages; 1080 int num_pages; 1081 size_t page_off; 1082 bool more; 1083 int idx = 0; 1084 size_t left; 1085 struct ceph_osd_req_op *op; 1086 u64 read_off = off; 1087 u64 read_len = len; 1088 int extent_cnt; 1089 1090 /* determine new offset/length if encrypted */ 1091 ceph_fscrypt_adjust_off_and_len(inode, &read_off, &read_len); 1092 1093 doutc(cl, "orig %llu~%llu reading %llu~%llu", off, len, 1094 read_off, read_len); 1095 1096 req = ceph_osdc_new_request(osdc, &ci->i_layout, 1097 ci->i_vino, read_off, &read_len, 0, 1, 1098 sparse ? CEPH_OSD_OP_SPARSE_READ : 1099 CEPH_OSD_OP_READ, 1100 CEPH_OSD_FLAG_READ, 1101 NULL, ci->i_truncate_seq, 1102 ci->i_truncate_size, false); 1103 if (IS_ERR(req)) { 1104 ret = PTR_ERR(req); 1105 break; 1106 } 1107 1108 /* adjust len downward if the request truncated the len */ 1109 if (off + len > read_off + read_len) 1110 len = read_off + read_len - off; 1111 more = len < iov_iter_count(to); 1112 1113 op = &req->r_ops[0]; 1114 if (sparse) { 1115 extent_cnt = __ceph_sparse_read_ext_count(inode, read_len); 1116 ret = ceph_alloc_sparse_ext_map(op, extent_cnt); 1117 if (ret) { 1118 ceph_osdc_put_request(req); 1119 break; 1120 } 1121 } 1122 1123 num_pages = calc_pages_for(read_off, read_len); 1124 page_off = offset_in_page(off); 1125 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1126 if (IS_ERR(pages)) { 1127 ceph_osdc_put_request(req); 1128 ret = PTR_ERR(pages); 1129 break; 1130 } 1131 1132 osd_req_op_extent_osd_data_pages(req, 0, pages, read_len, 1133 offset_in_page(read_off), 1134 false, true); 1135 1136 ceph_osdc_start_request(osdc, req); 1137 ret = ceph_osdc_wait_request(osdc, req); 1138 1139 ceph_update_read_metrics(&fsc->mdsc->metric, 1140 req->r_start_latency, 1141 req->r_end_latency, 1142 read_len, ret); 1143 1144 if (ret > 0) 1145 objver = req->r_version; 1146 1147 i_size = i_size_read(inode); 1148 doutc(cl, "%llu~%llu got %zd i_size %llu%s\n", off, len, 1149 ret, i_size, (more ? 
" MORE" : "")); 1150 1151 /* Fix it to go to end of extent map */ 1152 if (sparse && ret >= 0) 1153 ret = ceph_sparse_ext_map_end(op); 1154 else if (ret == -ENOENT) 1155 ret = 0; 1156 1157 if (ret < 0) { 1158 ceph_osdc_put_request(req); 1159 if (ret == -EBLOCKLISTED) 1160 fsc->blocklisted = true; 1161 break; 1162 } 1163 1164 if (IS_ENCRYPTED(inode)) { 1165 int fret; 1166 1167 fret = ceph_fscrypt_decrypt_extents(inode, pages, 1168 read_off, op->extent.sparse_ext, 1169 op->extent.sparse_ext_cnt); 1170 if (fret < 0) { 1171 ret = fret; 1172 ceph_osdc_put_request(req); 1173 break; 1174 } 1175 1176 /* account for any partial block at the beginning */ 1177 fret -= (off - read_off); 1178 1179 /* 1180 * Short read after big offset adjustment? 1181 * Nothing is usable, just call it a zero 1182 * len read. 1183 */ 1184 fret = max(fret, 0); 1185 1186 /* account for partial block at the end */ 1187 ret = min_t(ssize_t, fret, len); 1188 } 1189 1190 /* Short read but not EOF? Zero out the remainder. */ 1191 if (ret < len && (off + ret < i_size)) { 1192 int zlen = min(len - ret, i_size - off - ret); 1193 int zoff = page_off + ret; 1194 1195 doutc(cl, "zero gap %llu~%llu\n", off + ret, 1196 off + ret + zlen); 1197 ceph_zero_page_vector_range(zoff, zlen, pages); 1198 ret += zlen; 1199 } 1200 1201 if (off + ret > i_size) 1202 left = (i_size > off) ? i_size - off : 0; 1203 else 1204 left = ret; 1205 1206 while (left > 0) { 1207 size_t plen, copied; 1208 1209 plen = min_t(size_t, left, PAGE_SIZE - page_off); 1210 SetPageUptodate(pages[idx]); 1211 copied = copy_page_to_iter(pages[idx++], 1212 page_off, plen, to); 1213 off += copied; 1214 left -= copied; 1215 page_off = 0; 1216 if (copied < plen) { 1217 ret = -EFAULT; 1218 break; 1219 } 1220 } 1221 1222 ceph_osdc_put_request(req); 1223 1224 if (off >= i_size || !more) 1225 break; 1226 } 1227 1228 if (ret > 0) { 1229 if (off >= i_size) { 1230 *retry_op = CHECK_EOF; 1231 ret = i_size - *ki_pos; 1232 *ki_pos = i_size; 1233 } else { 1234 ret = off - *ki_pos; 1235 *ki_pos = off; 1236 } 1237 1238 if (last_objver) 1239 *last_objver = objver; 1240 } 1241 doutc(cl, "result %zd retry_op %d\n", ret, *retry_op); 1242 return ret; 1243 } 1244 1245 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, 1246 int *retry_op) 1247 { 1248 struct file *file = iocb->ki_filp; 1249 struct inode *inode = file_inode(file); 1250 struct ceph_client *cl = ceph_inode_to_client(inode); 1251 1252 doutc(cl, "on file %p %llx~%zx %s\n", file, iocb->ki_pos, 1253 iov_iter_count(to), 1254 (file->f_flags & O_DIRECT) ? 
"O_DIRECT" : ""); 1255 1256 return __ceph_sync_read(inode, &iocb->ki_pos, to, retry_op, NULL); 1257 } 1258 1259 struct ceph_aio_request { 1260 struct kiocb *iocb; 1261 size_t total_len; 1262 bool write; 1263 bool should_dirty; 1264 int error; 1265 struct list_head osd_reqs; 1266 unsigned num_reqs; 1267 atomic_t pending_reqs; 1268 struct timespec64 mtime; 1269 struct ceph_cap_flush *prealloc_cf; 1270 }; 1271 1272 struct ceph_aio_work { 1273 struct work_struct work; 1274 struct ceph_osd_request *req; 1275 }; 1276 1277 static void ceph_aio_retry_work(struct work_struct *work); 1278 1279 static void ceph_aio_complete(struct inode *inode, 1280 struct ceph_aio_request *aio_req) 1281 { 1282 struct ceph_client *cl = ceph_inode_to_client(inode); 1283 struct ceph_inode_info *ci = ceph_inode(inode); 1284 int ret; 1285 1286 if (!atomic_dec_and_test(&aio_req->pending_reqs)) 1287 return; 1288 1289 if (aio_req->iocb->ki_flags & IOCB_DIRECT) 1290 inode_dio_end(inode); 1291 1292 ret = aio_req->error; 1293 if (!ret) 1294 ret = aio_req->total_len; 1295 1296 doutc(cl, "%p %llx.%llx rc %d\n", inode, ceph_vinop(inode), ret); 1297 1298 if (ret >= 0 && aio_req->write) { 1299 int dirty; 1300 1301 loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len; 1302 if (endoff > i_size_read(inode)) { 1303 if (ceph_inode_set_size(inode, endoff)) 1304 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY); 1305 } 1306 1307 spin_lock(&ci->i_ceph_lock); 1308 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 1309 &aio_req->prealloc_cf); 1310 spin_unlock(&ci->i_ceph_lock); 1311 if (dirty) 1312 __mark_inode_dirty(inode, dirty); 1313 1314 } 1315 1316 ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR : 1317 CEPH_CAP_FILE_RD)); 1318 1319 aio_req->iocb->ki_complete(aio_req->iocb, ret); 1320 1321 ceph_free_cap_flush(aio_req->prealloc_cf); 1322 kfree(aio_req); 1323 } 1324 1325 static void ceph_aio_complete_req(struct ceph_osd_request *req) 1326 { 1327 int rc = req->r_result; 1328 struct inode *inode = req->r_inode; 1329 struct ceph_aio_request *aio_req = req->r_priv; 1330 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); 1331 struct ceph_osd_req_op *op = &req->r_ops[0]; 1332 struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric; 1333 unsigned int len = osd_data->bvec_pos.iter.bi_size; 1334 bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ); 1335 struct ceph_client *cl = ceph_inode_to_client(inode); 1336 1337 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); 1338 BUG_ON(!osd_data->num_bvecs); 1339 1340 doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %u\n", req, 1341 inode, ceph_vinop(inode), rc, len); 1342 1343 if (rc == -EOLDSNAPC) { 1344 struct ceph_aio_work *aio_work; 1345 BUG_ON(!aio_req->write); 1346 1347 aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS); 1348 if (aio_work) { 1349 INIT_WORK(&aio_work->work, ceph_aio_retry_work); 1350 aio_work->req = req; 1351 queue_work(ceph_inode_to_fs_client(inode)->inode_wq, 1352 &aio_work->work); 1353 return; 1354 } 1355 rc = -ENOMEM; 1356 } else if (!aio_req->write) { 1357 if (sparse && rc >= 0) 1358 rc = ceph_sparse_ext_map_end(op); 1359 if (rc == -ENOENT) 1360 rc = 0; 1361 if (rc >= 0 && len > rc) { 1362 struct iov_iter i; 1363 int zlen = len - rc; 1364 1365 /* 1366 * If read is satisfied by single OSD request, 1367 * it can pass EOF. Otherwise read is within 1368 * i_size. 
1369 */ 1370 if (aio_req->num_reqs == 1) { 1371 loff_t i_size = i_size_read(inode); 1372 loff_t endoff = aio_req->iocb->ki_pos + rc; 1373 if (endoff < i_size) 1374 zlen = min_t(size_t, zlen, 1375 i_size - endoff); 1376 aio_req->total_len = rc + zlen; 1377 } 1378 1379 iov_iter_bvec(&i, ITER_DEST, osd_data->bvec_pos.bvecs, 1380 osd_data->num_bvecs, len); 1381 iov_iter_advance(&i, rc); 1382 iov_iter_zero(zlen, &i); 1383 } 1384 } 1385 1386 /* r_start_latency == 0 means the request was not submitted */ 1387 if (req->r_start_latency) { 1388 if (aio_req->write) 1389 ceph_update_write_metrics(metric, req->r_start_latency, 1390 req->r_end_latency, len, rc); 1391 else 1392 ceph_update_read_metrics(metric, req->r_start_latency, 1393 req->r_end_latency, len, rc); 1394 } 1395 1396 put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs, 1397 aio_req->should_dirty); 1398 ceph_osdc_put_request(req); 1399 1400 if (rc < 0) 1401 cmpxchg(&aio_req->error, 0, rc); 1402 1403 ceph_aio_complete(inode, aio_req); 1404 return; 1405 } 1406 1407 static void ceph_aio_retry_work(struct work_struct *work) 1408 { 1409 struct ceph_aio_work *aio_work = 1410 container_of(work, struct ceph_aio_work, work); 1411 struct ceph_osd_request *orig_req = aio_work->req; 1412 struct ceph_aio_request *aio_req = orig_req->r_priv; 1413 struct inode *inode = orig_req->r_inode; 1414 struct ceph_inode_info *ci = ceph_inode(inode); 1415 struct ceph_snap_context *snapc; 1416 struct ceph_osd_request *req; 1417 int ret; 1418 1419 spin_lock(&ci->i_ceph_lock); 1420 if (__ceph_have_pending_cap_snap(ci)) { 1421 struct ceph_cap_snap *capsnap = 1422 list_last_entry(&ci->i_cap_snaps, 1423 struct ceph_cap_snap, 1424 ci_item); 1425 snapc = ceph_get_snap_context(capsnap->context); 1426 } else { 1427 BUG_ON(!ci->i_head_snapc); 1428 snapc = ceph_get_snap_context(ci->i_head_snapc); 1429 } 1430 spin_unlock(&ci->i_ceph_lock); 1431 1432 req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1, 1433 false, GFP_NOFS); 1434 if (!req) { 1435 ret = -ENOMEM; 1436 req = orig_req; 1437 goto out; 1438 } 1439 1440 req->r_flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE; 1441 ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); 1442 ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); 1443 1444 req->r_ops[0] = orig_req->r_ops[0]; 1445 1446 req->r_mtime = aio_req->mtime; 1447 req->r_data_offset = req->r_ops[0].extent.offset; 1448 1449 ret = ceph_osdc_alloc_messages(req, GFP_NOFS); 1450 if (ret) { 1451 ceph_osdc_put_request(req); 1452 req = orig_req; 1453 goto out; 1454 } 1455 1456 ceph_osdc_put_request(orig_req); 1457 1458 req->r_callback = ceph_aio_complete_req; 1459 req->r_inode = inode; 1460 req->r_priv = aio_req; 1461 1462 ceph_osdc_start_request(req->r_osdc, req); 1463 out: 1464 if (ret < 0) { 1465 req->r_result = ret; 1466 ceph_aio_complete_req(req); 1467 } 1468 1469 ceph_put_snap_context(snapc); 1470 kfree(aio_work); 1471 } 1472 1473 static ssize_t 1474 ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, 1475 struct ceph_snap_context *snapc, 1476 struct ceph_cap_flush **pcf) 1477 { 1478 struct file *file = iocb->ki_filp; 1479 struct inode *inode = file_inode(file); 1480 struct ceph_inode_info *ci = ceph_inode(inode); 1481 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1482 struct ceph_client *cl = fsc->client; 1483 struct ceph_client_metric *metric = &fsc->mdsc->metric; 1484 struct ceph_vino vino; 1485 struct ceph_osd_request *req; 1486 struct bio_vec *bvecs; 1487 struct ceph_aio_request *aio_req = NULL; 1488 int 
num_pages = 0; 1489 int flags; 1490 int ret = 0; 1491 struct timespec64 mtime = current_time(inode); 1492 size_t count = iov_iter_count(iter); 1493 loff_t pos = iocb->ki_pos; 1494 bool write = iov_iter_rw(iter) == WRITE; 1495 bool should_dirty = !write && user_backed_iter(iter); 1496 bool sparse = ceph_test_mount_opt(fsc, SPARSEREAD); 1497 1498 if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP) 1499 return -EROFS; 1500 1501 doutc(cl, "sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n", 1502 (write ? "write" : "read"), file, pos, (unsigned)count, 1503 snapc, snapc ? snapc->seq : 0); 1504 1505 if (write) { 1506 int ret2; 1507 1508 ceph_fscache_invalidate(inode, true); 1509 1510 ret2 = invalidate_inode_pages2_range(inode->i_mapping, 1511 pos >> PAGE_SHIFT, 1512 (pos + count - 1) >> PAGE_SHIFT); 1513 if (ret2 < 0) 1514 doutc(cl, "invalidate_inode_pages2_range returned %d\n", 1515 ret2); 1516 1517 flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE; 1518 } else { 1519 flags = CEPH_OSD_FLAG_READ; 1520 } 1521 1522 while (iov_iter_count(iter) > 0) { 1523 u64 size = iov_iter_count(iter); 1524 ssize_t len; 1525 struct ceph_osd_req_op *op; 1526 int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ; 1527 int extent_cnt; 1528 1529 if (write) 1530 size = min_t(u64, size, fsc->mount_options->wsize); 1531 else 1532 size = min_t(u64, size, fsc->mount_options->rsize); 1533 1534 vino = ceph_vino(inode); 1535 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 1536 vino, pos, &size, 0, 1537 1, 1538 write ? CEPH_OSD_OP_WRITE : readop, 1539 flags, snapc, 1540 ci->i_truncate_seq, 1541 ci->i_truncate_size, 1542 false); 1543 if (IS_ERR(req)) { 1544 ret = PTR_ERR(req); 1545 break; 1546 } 1547 1548 op = &req->r_ops[0]; 1549 if (!write && sparse) { 1550 extent_cnt = __ceph_sparse_read_ext_count(inode, size); 1551 ret = ceph_alloc_sparse_ext_map(op, extent_cnt); 1552 if (ret) { 1553 ceph_osdc_put_request(req); 1554 break; 1555 } 1556 } 1557 1558 len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages); 1559 if (len < 0) { 1560 ceph_osdc_put_request(req); 1561 ret = len; 1562 break; 1563 } 1564 if (len != size) 1565 osd_req_op_extent_update(req, 0, len); 1566 1567 osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len); 1568 1569 /* 1570 * To simplify error handling, allow AIO when IO within i_size 1571 * or IO can be satisfied by single OSD request. 1572 */ 1573 if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) && 1574 (len == count || pos + count <= i_size_read(inode))) { 1575 aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL); 1576 if (aio_req) { 1577 aio_req->iocb = iocb; 1578 aio_req->write = write; 1579 aio_req->should_dirty = should_dirty; 1580 INIT_LIST_HEAD(&aio_req->osd_reqs); 1581 if (write) { 1582 aio_req->mtime = mtime; 1583 swap(aio_req->prealloc_cf, *pcf); 1584 } 1585 } 1586 /* ignore error */ 1587 } 1588 1589 if (write) { 1590 /* 1591 * throw out any page cache pages in this range. this 1592 * may block. 
1593 */ 1594 truncate_inode_pages_range(inode->i_mapping, pos, 1595 PAGE_ALIGN(pos + len) - 1); 1596 1597 req->r_mtime = mtime; 1598 } 1599 1600 if (aio_req) { 1601 aio_req->total_len += len; 1602 aio_req->num_reqs++; 1603 atomic_inc(&aio_req->pending_reqs); 1604 1605 req->r_callback = ceph_aio_complete_req; 1606 req->r_inode = inode; 1607 req->r_priv = aio_req; 1608 list_add_tail(&req->r_private_item, &aio_req->osd_reqs); 1609 1610 pos += len; 1611 continue; 1612 } 1613 1614 ceph_osdc_start_request(req->r_osdc, req); 1615 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1616 1617 if (write) 1618 ceph_update_write_metrics(metric, req->r_start_latency, 1619 req->r_end_latency, len, ret); 1620 else 1621 ceph_update_read_metrics(metric, req->r_start_latency, 1622 req->r_end_latency, len, ret); 1623 1624 size = i_size_read(inode); 1625 if (!write) { 1626 if (sparse && ret >= 0) 1627 ret = ceph_sparse_ext_map_end(op); 1628 else if (ret == -ENOENT) 1629 ret = 0; 1630 1631 if (ret >= 0 && ret < len && pos + ret < size) { 1632 struct iov_iter i; 1633 int zlen = min_t(size_t, len - ret, 1634 size - pos - ret); 1635 1636 iov_iter_bvec(&i, ITER_DEST, bvecs, num_pages, len); 1637 iov_iter_advance(&i, ret); 1638 iov_iter_zero(zlen, &i); 1639 ret += zlen; 1640 } 1641 if (ret >= 0) 1642 len = ret; 1643 } 1644 1645 put_bvecs(bvecs, num_pages, should_dirty); 1646 ceph_osdc_put_request(req); 1647 if (ret < 0) 1648 break; 1649 1650 pos += len; 1651 if (!write && pos >= size) 1652 break; 1653 1654 if (write && pos > size) { 1655 if (ceph_inode_set_size(inode, pos)) 1656 ceph_check_caps(ceph_inode(inode), 1657 CHECK_CAPS_AUTHONLY); 1658 } 1659 } 1660 1661 if (aio_req) { 1662 LIST_HEAD(osd_reqs); 1663 1664 if (aio_req->num_reqs == 0) { 1665 kfree(aio_req); 1666 return ret; 1667 } 1668 1669 ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR : 1670 CEPH_CAP_FILE_RD); 1671 1672 list_splice(&aio_req->osd_reqs, &osd_reqs); 1673 inode_dio_begin(inode); 1674 while (!list_empty(&osd_reqs)) { 1675 req = list_first_entry(&osd_reqs, 1676 struct ceph_osd_request, 1677 r_private_item); 1678 list_del_init(&req->r_private_item); 1679 if (ret >= 0) 1680 ceph_osdc_start_request(req->r_osdc, req); 1681 if (ret < 0) { 1682 req->r_result = ret; 1683 ceph_aio_complete_req(req); 1684 } 1685 } 1686 return -EIOCBQUEUED; 1687 } 1688 1689 if (ret != -EOLDSNAPC && pos > iocb->ki_pos) { 1690 ret = pos - iocb->ki_pos; 1691 iocb->ki_pos = pos; 1692 } 1693 return ret; 1694 } 1695 1696 /* 1697 * Synchronous write, straight from __user pointer or user pages. 1698 * 1699 * If write spans object boundary, just do multiple writes. (For a 1700 * correct atomic write, we should e.g. take write locks on all 1701 * objects, rollback on failure, etc.) 
1702 */ 1703 static ssize_t 1704 ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, 1705 struct ceph_snap_context *snapc) 1706 { 1707 struct file *file = iocb->ki_filp; 1708 struct inode *inode = file_inode(file); 1709 struct ceph_inode_info *ci = ceph_inode(inode); 1710 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 1711 struct ceph_client *cl = fsc->client; 1712 struct ceph_osd_client *osdc = &fsc->client->osdc; 1713 struct ceph_osd_request *req; 1714 struct page **pages; 1715 u64 len; 1716 int num_pages; 1717 int written = 0; 1718 int ret; 1719 bool check_caps = false; 1720 struct timespec64 mtime = current_time(inode); 1721 size_t count = iov_iter_count(from); 1722 1723 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 1724 return -EROFS; 1725 1726 doutc(cl, "on file %p %lld~%u snapc %p seq %lld\n", file, pos, 1727 (unsigned)count, snapc, snapc->seq); 1728 1729 ret = filemap_write_and_wait_range(inode->i_mapping, 1730 pos, pos + count - 1); 1731 if (ret < 0) 1732 return ret; 1733 1734 ceph_fscache_invalidate(inode, false); 1735 1736 while ((len = iov_iter_count(from)) > 0) { 1737 size_t left; 1738 int n; 1739 u64 write_pos = pos; 1740 u64 write_len = len; 1741 u64 objnum, objoff; 1742 u32 xlen; 1743 u64 assert_ver = 0; 1744 bool rmw; 1745 bool first, last; 1746 struct iov_iter saved_iter = *from; 1747 size_t off; 1748 1749 ceph_fscrypt_adjust_off_and_len(inode, &write_pos, &write_len); 1750 1751 /* clamp the length to the end of first object */ 1752 ceph_calc_file_object_mapping(&ci->i_layout, write_pos, 1753 write_len, &objnum, &objoff, 1754 &xlen); 1755 write_len = xlen; 1756 1757 /* adjust len downward if it goes beyond current object */ 1758 if (pos + len > write_pos + write_len) 1759 len = write_pos + write_len - pos; 1760 1761 /* 1762 * If we had to adjust the length or position to align with a 1763 * crypto block, then we must do a read/modify/write cycle. We 1764 * use a version assertion to redrive the thing if something 1765 * changes in between. 1766 */ 1767 first = pos != write_pos; 1768 last = (pos + len) != (write_pos + write_len); 1769 rmw = first || last; 1770 1771 doutc(cl, "ino %llx %lld~%llu adjusted %lld~%llu -- %srmw\n", 1772 ci->i_vino.ino, pos, len, write_pos, write_len, 1773 rmw ? "" : "no "); 1774 1775 /* 1776 * The data is emplaced into the page as it would be if it were 1777 * in an array of pagecache pages. 1778 */ 1779 num_pages = calc_pages_for(write_pos, write_len); 1780 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1781 if (IS_ERR(pages)) { 1782 ret = PTR_ERR(pages); 1783 break; 1784 } 1785 1786 /* Do we need to preload the pages? */ 1787 if (rmw) { 1788 u64 first_pos = write_pos; 1789 u64 last_pos = (write_pos + write_len) - CEPH_FSCRYPT_BLOCK_SIZE; 1790 u64 read_len = CEPH_FSCRYPT_BLOCK_SIZE; 1791 struct ceph_osd_req_op *op; 1792 1793 /* We should only need to do this for encrypted inodes */ 1794 WARN_ON_ONCE(!IS_ENCRYPTED(inode)); 1795 1796 /* No need to do two reads if first and last blocks are same */ 1797 if (first && last_pos == first_pos) 1798 last = false; 1799 1800 /* 1801 * Allocate a read request for one or two extents, 1802 * depending on how the request was aligned. 1803 */ 1804 req = ceph_osdc_new_request(osdc, &ci->i_layout, 1805 ci->i_vino, first ? first_pos : last_pos, 1806 &read_len, 0, (first && last) ? 
2 : 1, 1807 CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ, 1808 NULL, ci->i_truncate_seq, 1809 ci->i_truncate_size, false); 1810 if (IS_ERR(req)) { 1811 ceph_release_page_vector(pages, num_pages); 1812 ret = PTR_ERR(req); 1813 break; 1814 } 1815 1816 /* Something is misaligned! */ 1817 if (read_len != CEPH_FSCRYPT_BLOCK_SIZE) { 1818 ceph_osdc_put_request(req); 1819 ceph_release_page_vector(pages, num_pages); 1820 ret = -EIO; 1821 break; 1822 } 1823 1824 /* Add extent for first block? */ 1825 op = &req->r_ops[0]; 1826 1827 if (first) { 1828 osd_req_op_extent_osd_data_pages(req, 0, pages, 1829 CEPH_FSCRYPT_BLOCK_SIZE, 1830 offset_in_page(first_pos), 1831 false, false); 1832 /* We only expect a single extent here */ 1833 ret = __ceph_alloc_sparse_ext_map(op, 1); 1834 if (ret) { 1835 ceph_osdc_put_request(req); 1836 ceph_release_page_vector(pages, num_pages); 1837 break; 1838 } 1839 } 1840 1841 /* Add extent for last block */ 1842 if (last) { 1843 /* Init the other extent if first extent has been used */ 1844 if (first) { 1845 op = &req->r_ops[1]; 1846 osd_req_op_extent_init(req, 1, 1847 CEPH_OSD_OP_SPARSE_READ, 1848 last_pos, CEPH_FSCRYPT_BLOCK_SIZE, 1849 ci->i_truncate_size, 1850 ci->i_truncate_seq); 1851 } 1852 1853 ret = __ceph_alloc_sparse_ext_map(op, 1); 1854 if (ret) { 1855 ceph_osdc_put_request(req); 1856 ceph_release_page_vector(pages, num_pages); 1857 break; 1858 } 1859 1860 osd_req_op_extent_osd_data_pages(req, first ? 1 : 0, 1861 &pages[num_pages - 1], 1862 CEPH_FSCRYPT_BLOCK_SIZE, 1863 offset_in_page(last_pos), 1864 false, false); 1865 } 1866 1867 ceph_osdc_start_request(osdc, req); 1868 ret = ceph_osdc_wait_request(osdc, req); 1869 1870 /* FIXME: length field is wrong if there are 2 extents */ 1871 ceph_update_read_metrics(&fsc->mdsc->metric, 1872 req->r_start_latency, 1873 req->r_end_latency, 1874 read_len, ret); 1875 1876 /* Ok if object is not already present */ 1877 if (ret == -ENOENT) { 1878 /* 1879 * If there is no object, then we can't assert 1880 * on its version. Set it to 0, and we'll use an 1881 * exclusive create instead. 1882 */ 1883 ceph_osdc_put_request(req); 1884 ret = 0; 1885 1886 /* 1887 * zero out the soon-to-be uncopied parts of the 1888 * first and last pages. 1889 */ 1890 if (first) 1891 zero_user_segment(pages[0], 0, 1892 offset_in_page(first_pos)); 1893 if (last) 1894 zero_user_segment(pages[num_pages - 1], 1895 offset_in_page(last_pos), 1896 PAGE_SIZE); 1897 } else { 1898 if (ret < 0) { 1899 ceph_osdc_put_request(req); 1900 ceph_release_page_vector(pages, num_pages); 1901 break; 1902 } 1903 1904 op = &req->r_ops[0]; 1905 if (op->extent.sparse_ext_cnt == 0) { 1906 if (first) 1907 zero_user_segment(pages[0], 0, 1908 offset_in_page(first_pos)); 1909 else 1910 zero_user_segment(pages[num_pages - 1], 1911 offset_in_page(last_pos), 1912 PAGE_SIZE); 1913 } else if (op->extent.sparse_ext_cnt != 1 || 1914 ceph_sparse_ext_map_end(op) != 1915 CEPH_FSCRYPT_BLOCK_SIZE) { 1916 ret = -EIO; 1917 ceph_osdc_put_request(req); 1918 ceph_release_page_vector(pages, num_pages); 1919 break; 1920 } 1921 1922 if (first && last) { 1923 op = &req->r_ops[1]; 1924 if (op->extent.sparse_ext_cnt == 0) { 1925 zero_user_segment(pages[num_pages - 1], 1926 offset_in_page(last_pos), 1927 PAGE_SIZE); 1928 } else if (op->extent.sparse_ext_cnt != 1 || 1929 ceph_sparse_ext_map_end(op) != 1930 CEPH_FSCRYPT_BLOCK_SIZE) { 1931 ret = -EIO; 1932 ceph_osdc_put_request(req); 1933 ceph_release_page_vector(pages, num_pages); 1934 break; 1935 } 1936 } 1937 1938 /* Grab assert version. It must be non-zero. 
*/ 1939 assert_ver = req->r_version; 1940 WARN_ON_ONCE(ret > 0 && assert_ver == 0); 1941 1942 ceph_osdc_put_request(req); 1943 if (first) { 1944 ret = ceph_fscrypt_decrypt_block_inplace(inode, 1945 pages[0], CEPH_FSCRYPT_BLOCK_SIZE, 1946 offset_in_page(first_pos), 1947 first_pos >> CEPH_FSCRYPT_BLOCK_SHIFT); 1948 if (ret < 0) { 1949 ceph_release_page_vector(pages, num_pages); 1950 break; 1951 } 1952 } 1953 if (last) { 1954 ret = ceph_fscrypt_decrypt_block_inplace(inode, 1955 pages[num_pages - 1], 1956 CEPH_FSCRYPT_BLOCK_SIZE, 1957 offset_in_page(last_pos), 1958 last_pos >> CEPH_FSCRYPT_BLOCK_SHIFT); 1959 if (ret < 0) { 1960 ceph_release_page_vector(pages, num_pages); 1961 break; 1962 } 1963 } 1964 } 1965 } 1966 1967 left = len; 1968 off = offset_in_page(pos); 1969 for (n = 0; n < num_pages; n++) { 1970 size_t plen = min_t(size_t, left, PAGE_SIZE - off); 1971 1972 /* copy the data */ 1973 ret = copy_page_from_iter(pages[n], off, plen, from); 1974 if (ret != plen) { 1975 ret = -EFAULT; 1976 break; 1977 } 1978 off = 0; 1979 left -= ret; 1980 } 1981 if (ret < 0) { 1982 doutc(cl, "write failed with %d\n", ret); 1983 ceph_release_page_vector(pages, num_pages); 1984 break; 1985 } 1986 1987 if (IS_ENCRYPTED(inode)) { 1988 ret = ceph_fscrypt_encrypt_pages(inode, pages, 1989 write_pos, write_len); 1990 if (ret < 0) { 1991 doutc(cl, "encryption failed with %d\n", ret); 1992 ceph_release_page_vector(pages, num_pages); 1993 break; 1994 } 1995 } 1996 1997 req = ceph_osdc_new_request(osdc, &ci->i_layout, 1998 ci->i_vino, write_pos, &write_len, 1999 rmw ? 1 : 0, rmw ? 2 : 1, 2000 CEPH_OSD_OP_WRITE, 2001 CEPH_OSD_FLAG_WRITE, 2002 snapc, ci->i_truncate_seq, 2003 ci->i_truncate_size, false); 2004 if (IS_ERR(req)) { 2005 ret = PTR_ERR(req); 2006 ceph_release_page_vector(pages, num_pages); 2007 break; 2008 } 2009 2010 doutc(cl, "write op %lld~%llu\n", write_pos, write_len); 2011 osd_req_op_extent_osd_data_pages(req, rmw ? 1 : 0, pages, write_len, 2012 offset_in_page(write_pos), false, 2013 true); 2014 req->r_inode = inode; 2015 req->r_mtime = mtime; 2016 2017 /* Set up the assertion */ 2018 if (rmw) { 2019 /* 2020 * Set up the assertion. If we don't have a version 2021 * number, then the object doesn't exist yet. Use an 2022 * exclusive create instead of a version assertion in 2023 * that case. 2024 */ 2025 if (assert_ver) { 2026 osd_req_op_init(req, 0, CEPH_OSD_OP_ASSERT_VER, 0); 2027 req->r_ops[0].assert_ver.ver = assert_ver; 2028 } else { 2029 osd_req_op_init(req, 0, CEPH_OSD_OP_CREATE, 2030 CEPH_OSD_OP_FLAG_EXCL); 2031 } 2032 } 2033 2034 ceph_osdc_start_request(osdc, req); 2035 ret = ceph_osdc_wait_request(osdc, req); 2036 2037 ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency, 2038 req->r_end_latency, len, ret); 2039 ceph_osdc_put_request(req); 2040 if (ret != 0) { 2041 doutc(cl, "osd write returned %d\n", ret); 2042 /* Version changed! Must re-do the rmw cycle */ 2043 if ((assert_ver && (ret == -ERANGE || ret == -EOVERFLOW)) || 2044 (!assert_ver && ret == -EEXIST)) { 2045 /* We should only ever see this on a rmw */ 2046 WARN_ON_ONCE(!rmw); 2047 2048 /* The version should never go backward */ 2049 WARN_ON_ONCE(ret == -EOVERFLOW); 2050 2051 *from = saved_iter; 2052 2053 /* FIXME: limit number of times we loop? */ 2054 continue; 2055 } 2056 ceph_set_error_write(ci); 2057 break; 2058 } 2059 2060 ceph_clear_error_write(ci); 2061 2062 /* 2063 * We successfully wrote to a range of the file. Declare 2064 * that region of the pagecache invalid. 
2065 */ 2066 ret = invalidate_inode_pages2_range( 2067 inode->i_mapping, 2068 pos >> PAGE_SHIFT, 2069 (pos + len - 1) >> PAGE_SHIFT); 2070 if (ret < 0) { 2071 doutc(cl, "invalidate_inode_pages2_range returned %d\n", 2072 ret); 2073 ret = 0; 2074 } 2075 pos += len; 2076 written += len; 2077 doutc(cl, "written %d\n", written); 2078 if (pos > i_size_read(inode)) { 2079 check_caps = ceph_inode_set_size(inode, pos); 2080 if (check_caps) 2081 ceph_check_caps(ceph_inode(inode), 2082 CHECK_CAPS_AUTHONLY); 2083 } 2084 2085 } 2086 2087 if (ret != -EOLDSNAPC && written > 0) { 2088 ret = written; 2089 iocb->ki_pos = pos; 2090 } 2091 doutc(cl, "returning %d\n", ret); 2092 return ret; 2093 } 2094 2095 /* 2096 * Wrap generic_file_aio_read with checks for cap bits on the inode. 2097 * Atomically grab references, so that those bits are not released 2098 * back to the MDS mid-read. 2099 * 2100 * Hmm, the sync read case isn't actually async... should it be? 2101 */ 2102 static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) 2103 { 2104 struct file *filp = iocb->ki_filp; 2105 struct ceph_file_info *fi = filp->private_data; 2106 size_t len = iov_iter_count(to); 2107 struct inode *inode = file_inode(filp); 2108 struct ceph_inode_info *ci = ceph_inode(inode); 2109 bool direct_lock = iocb->ki_flags & IOCB_DIRECT; 2110 struct ceph_client *cl = ceph_inode_to_client(inode); 2111 ssize_t ret; 2112 int want = 0, got = 0; 2113 int retry_op = 0, read = 0; 2114 2115 again: 2116 doutc(cl, "%llu~%u trying to get caps on %p %llx.%llx\n", 2117 iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode)); 2118 2119 if (ceph_inode_is_shutdown(inode)) 2120 return -ESTALE; 2121 2122 ret = direct_lock ? ceph_start_io_direct(inode) : 2123 ceph_start_io_read(inode); 2124 if (ret) 2125 return ret; 2126 2127 if (!(fi->flags & CEPH_F_SYNC) && !direct_lock) 2128 want |= CEPH_CAP_FILE_CACHE; 2129 if (fi->fmode & CEPH_FILE_MODE_LAZY) 2130 want |= CEPH_CAP_FILE_LAZYIO; 2131 2132 ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1, &got); 2133 if (ret < 0) { 2134 if (direct_lock) 2135 ceph_end_io_direct(inode); 2136 else 2137 ceph_end_io_read(inode); 2138 return ret; 2139 } 2140 2141 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 2142 (iocb->ki_flags & IOCB_DIRECT) || 2143 (fi->flags & CEPH_F_SYNC)) { 2144 2145 doutc(cl, "sync %p %llx.%llx %llu~%u got cap refs on %s\n", 2146 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 2147 ceph_cap_string(got)); 2148 2149 if (!ceph_has_inline_data(ci)) { 2150 if (!retry_op && 2151 (iocb->ki_flags & IOCB_DIRECT) && 2152 !IS_ENCRYPTED(inode)) { 2153 ret = ceph_direct_read_write(iocb, to, 2154 NULL, NULL); 2155 if (ret >= 0 && ret < len) 2156 retry_op = CHECK_EOF; 2157 } else { 2158 ret = ceph_sync_read(iocb, to, &retry_op); 2159 } 2160 } else { 2161 retry_op = READ_INLINE; 2162 } 2163 } else { 2164 CEPH_DEFINE_RW_CONTEXT(rw_ctx, got); 2165 doutc(cl, "async %p %llx.%llx %llu~%u got cap refs on %s\n", 2166 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 2167 ceph_cap_string(got)); 2168 ceph_add_rw_context(fi, &rw_ctx); 2169 ret = generic_file_read_iter(iocb, to); 2170 ceph_del_rw_context(fi, &rw_ctx); 2171 } 2172 2173 doutc(cl, "%p %llx.%llx dropping cap refs on %s = %d\n", 2174 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 2175 ceph_put_cap_refs(ci, got); 2176 2177 if (direct_lock) 2178 ceph_end_io_direct(inode); 2179 else 2180 ceph_end_io_read(inode); 2181 2182 if (retry_op > HAVE_RETRIED && ret >= 0) { 2183 int statret; 2184 struct page *page = NULL; 
2185 loff_t i_size; 2186 int mask = CEPH_STAT_CAP_SIZE; 2187 if (retry_op == READ_INLINE) { 2188 page = __page_cache_alloc(GFP_KERNEL); 2189 if (!page) 2190 return -ENOMEM; 2191 2192 mask = CEPH_STAT_CAP_INLINE_DATA; 2193 } 2194 2195 statret = __ceph_do_getattr(inode, page, mask, !!page); 2196 if (statret < 0) { 2197 if (page) 2198 __free_page(page); 2199 if (statret == -ENODATA) { 2200 BUG_ON(retry_op != READ_INLINE); 2201 goto again; 2202 } 2203 return statret; 2204 } 2205 2206 i_size = i_size_read(inode); 2207 if (retry_op == READ_INLINE) { 2208 BUG_ON(ret > 0 || read > 0); 2209 if (iocb->ki_pos < i_size && 2210 iocb->ki_pos < PAGE_SIZE) { 2211 loff_t end = min_t(loff_t, i_size, 2212 iocb->ki_pos + len); 2213 end = min_t(loff_t, end, PAGE_SIZE); 2214 if (statret < end) 2215 zero_user_segment(page, statret, end); 2216 ret = copy_page_to_iter(page, 2217 iocb->ki_pos & ~PAGE_MASK, 2218 end - iocb->ki_pos, to); 2219 iocb->ki_pos += ret; 2220 read += ret; 2221 } 2222 if (iocb->ki_pos < i_size && read < len) { 2223 size_t zlen = min_t(size_t, len - read, 2224 i_size - iocb->ki_pos); 2225 ret = iov_iter_zero(zlen, to); 2226 iocb->ki_pos += ret; 2227 read += ret; 2228 } 2229 __free_pages(page, 0); 2230 return read; 2231 } 2232 2233 /* hit EOF or hole? */ 2234 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && 2235 ret < len) { 2236 doutc(cl, "may hit hole, ppos %lld < size %lld, reading more\n", 2237 iocb->ki_pos, i_size); 2238 2239 read += ret; 2240 len -= ret; 2241 retry_op = HAVE_RETRIED; 2242 goto again; 2243 } 2244 } 2245 2246 if (ret >= 0) 2247 ret += read; 2248 2249 return ret; 2250 } 2251 2252 /* 2253 * Wrap filemap_splice_read with checks for cap bits on the inode. 2254 * Atomically grab references, so that those bits are not released 2255 * back to the MDS mid-read. 
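* If the inode has inline data, the file was opened with CEPH_F_SYNC, or no FILE_CACHE/LAZYIO caps are granted, fall back to copy_splice_read() rather than going through the page cache.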
2256 */ 2257 static ssize_t ceph_splice_read(struct file *in, loff_t *ppos, 2258 struct pipe_inode_info *pipe, 2259 size_t len, unsigned int flags) 2260 { 2261 struct ceph_file_info *fi = in->private_data; 2262 struct inode *inode = file_inode(in); 2263 struct ceph_inode_info *ci = ceph_inode(inode); 2264 ssize_t ret; 2265 int want = 0, got = 0; 2266 CEPH_DEFINE_RW_CONTEXT(rw_ctx, 0); 2267 2268 dout("splice_read %p %llx.%llx %llu~%zu trying to get caps on %p\n", 2269 inode, ceph_vinop(inode), *ppos, len, inode); 2270 2271 if (ceph_inode_is_shutdown(inode)) 2272 return -ESTALE; 2273 2274 if (ceph_has_inline_data(ci) || 2275 (fi->flags & CEPH_F_SYNC)) 2276 return copy_splice_read(in, ppos, pipe, len, flags); 2277 2278 ret = ceph_start_io_read(inode); 2279 if (ret) 2280 return ret; 2281 2282 want = CEPH_CAP_FILE_CACHE; 2283 if (fi->fmode & CEPH_FILE_MODE_LAZY) 2284 want |= CEPH_CAP_FILE_LAZYIO; 2285 2286 ret = ceph_get_caps(in, CEPH_CAP_FILE_RD, want, -1, &got); 2287 if (ret < 0) 2288 goto out_end; 2289 2290 if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) == 0) { 2291 dout("splice_read/sync %p %llx.%llx %llu~%zu got cap refs on %s\n", 2292 inode, ceph_vinop(inode), *ppos, len, 2293 ceph_cap_string(got)); 2294 2295 ceph_put_cap_refs(ci, got); 2296 ceph_end_io_read(inode); 2297 return copy_splice_read(in, ppos, pipe, len, flags); 2298 } 2299 2300 dout("splice_read %p %llx.%llx %llu~%zu got cap refs on %s\n", 2301 inode, ceph_vinop(inode), *ppos, len, ceph_cap_string(got)); 2302 2303 rw_ctx.caps = got; 2304 ceph_add_rw_context(fi, &rw_ctx); 2305 ret = filemap_splice_read(in, ppos, pipe, len, flags); 2306 ceph_del_rw_context(fi, &rw_ctx); 2307 2308 dout("splice_read %p %llx.%llx dropping cap refs on %s = %zd\n", 2309 inode, ceph_vinop(inode), ceph_cap_string(got), ret); 2310 2311 ceph_put_cap_refs(ci, got); 2312 out_end: 2313 ceph_end_io_read(inode); 2314 return ret; 2315 } 2316 2317 /* 2318 * Take cap references to avoid releasing caps to MDS mid-write. 2319 * 2320 * If we are synchronous, and write with an old snap context, the OSD 2321 * may return EOLDSNAPC. In that case, retry the write.. _after_ 2322 * dropping our cap refs and allowing the pending snap to logically 2323 * complete _before_ this write occurs. 2324 * 2325 * If we are near ENOSPC, write synchronously. 2326 */ 2327 static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) 2328 { 2329 struct file *file = iocb->ki_filp; 2330 struct ceph_file_info *fi = file->private_data; 2331 struct inode *inode = file_inode(file); 2332 struct ceph_inode_info *ci = ceph_inode(inode); 2333 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 2334 struct ceph_client *cl = fsc->client; 2335 struct ceph_osd_client *osdc = &fsc->client->osdc; 2336 struct ceph_cap_flush *prealloc_cf; 2337 ssize_t count, written = 0; 2338 int err, want = 0, got; 2339 bool direct_lock = false; 2340 u32 map_flags; 2341 u64 pool_flags; 2342 loff_t pos; 2343 loff_t limit = max(i_size_read(inode), fsc->max_file_size); 2344 2345 if (ceph_inode_is_shutdown(inode)) 2346 return -ESTALE; 2347 2348 if (ceph_snap(inode) != CEPH_NOSNAP) 2349 return -EROFS; 2350 2351 prealloc_cf = ceph_alloc_cap_flush(); 2352 if (!prealloc_cf) 2353 return -ENOMEM; 2354 2355 if ((iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND)) == IOCB_DIRECT) 2356 direct_lock = true; 2357 2358 retry_snap: 2359 err = direct_lock ? 
ceph_start_io_direct(inode) : 2360 ceph_start_io_write(inode); 2361 if (err) 2362 goto out_unlocked; 2363 2364 if (iocb->ki_flags & IOCB_APPEND) { 2365 err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); 2366 if (err < 0) 2367 goto out; 2368 } 2369 2370 err = generic_write_checks(iocb, from); 2371 if (err <= 0) 2372 goto out; 2373 2374 pos = iocb->ki_pos; 2375 if (unlikely(pos >= limit)) { 2376 err = -EFBIG; 2377 goto out; 2378 } else { 2379 iov_iter_truncate(from, limit - pos); 2380 } 2381 2382 count = iov_iter_count(from); 2383 if (ceph_quota_is_max_bytes_exceeded(inode, pos + count)) { 2384 err = -EDQUOT; 2385 goto out; 2386 } 2387 2388 down_read(&osdc->lock); 2389 map_flags = osdc->osdmap->flags; 2390 pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id); 2391 up_read(&osdc->lock); 2392 if ((map_flags & CEPH_OSDMAP_FULL) || 2393 (pool_flags & CEPH_POOL_FLAG_FULL)) { 2394 err = -ENOSPC; 2395 goto out; 2396 } 2397 2398 err = file_remove_privs(file); 2399 if (err) 2400 goto out; 2401 2402 doutc(cl, "%p %llx.%llx %llu~%zd getting caps. i_size %llu\n", 2403 inode, ceph_vinop(inode), pos, count, 2404 i_size_read(inode)); 2405 if (!(fi->flags & CEPH_F_SYNC) && !direct_lock) 2406 want |= CEPH_CAP_FILE_BUFFER; 2407 if (fi->fmode & CEPH_FILE_MODE_LAZY) 2408 want |= CEPH_CAP_FILE_LAZYIO; 2409 got = 0; 2410 err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count, &got); 2411 if (err < 0) 2412 goto out; 2413 2414 err = file_update_time(file); 2415 if (err) 2416 goto out_caps; 2417 2418 inode_inc_iversion_raw(inode); 2419 2420 doutc(cl, "%p %llx.%llx %llu~%zd got cap refs on %s\n", 2421 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 2422 2423 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 2424 (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) || 2425 (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) { 2426 struct ceph_snap_context *snapc; 2427 struct iov_iter data; 2428 2429 spin_lock(&ci->i_ceph_lock); 2430 if (__ceph_have_pending_cap_snap(ci)) { 2431 struct ceph_cap_snap *capsnap = 2432 list_last_entry(&ci->i_cap_snaps, 2433 struct ceph_cap_snap, 2434 ci_item); 2435 snapc = ceph_get_snap_context(capsnap->context); 2436 } else { 2437 BUG_ON(!ci->i_head_snapc); 2438 snapc = ceph_get_snap_context(ci->i_head_snapc); 2439 } 2440 spin_unlock(&ci->i_ceph_lock); 2441 2442 /* we might need to revert back to that point */ 2443 data = *from; 2444 if ((iocb->ki_flags & IOCB_DIRECT) && !IS_ENCRYPTED(inode)) 2445 written = ceph_direct_read_write(iocb, &data, snapc, 2446 &prealloc_cf); 2447 else 2448 written = ceph_sync_write(iocb, &data, pos, snapc); 2449 if (direct_lock) 2450 ceph_end_io_direct(inode); 2451 else 2452 ceph_end_io_write(inode); 2453 if (written > 0) 2454 iov_iter_advance(from, written); 2455 ceph_put_snap_context(snapc); 2456 } else { 2457 /* 2458 * No need to acquire the i_truncate_mutex. Because 2459 * the MDS revokes Fwb caps before sending truncate 2460 * message to us. We can't get Fwb cap while there 2461 * are pending vmtruncate. 
So write and vmtruncate 2462 * can not run at the same time 2463 */ 2464 written = generic_perform_write(iocb, from); 2465 ceph_end_io_write(inode); 2466 } 2467 2468 if (written >= 0) { 2469 int dirty; 2470 2471 spin_lock(&ci->i_ceph_lock); 2472 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 2473 &prealloc_cf); 2474 spin_unlock(&ci->i_ceph_lock); 2475 if (dirty) 2476 __mark_inode_dirty(inode, dirty); 2477 if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos)) 2478 ceph_check_caps(ci, CHECK_CAPS_FLUSH); 2479 } 2480 2481 doutc(cl, "%p %llx.%llx %llu~%u dropping cap refs on %s\n", 2482 inode, ceph_vinop(inode), pos, (unsigned)count, 2483 ceph_cap_string(got)); 2484 ceph_put_cap_refs(ci, got); 2485 2486 if (written == -EOLDSNAPC) { 2487 doutc(cl, "%p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n", 2488 inode, ceph_vinop(inode), pos, (unsigned)count); 2489 goto retry_snap; 2490 } 2491 2492 if (written >= 0) { 2493 if ((map_flags & CEPH_OSDMAP_NEARFULL) || 2494 (pool_flags & CEPH_POOL_FLAG_NEARFULL)) 2495 iocb->ki_flags |= IOCB_DSYNC; 2496 written = generic_write_sync(iocb, written); 2497 } 2498 2499 goto out_unlocked; 2500 out_caps: 2501 ceph_put_cap_refs(ci, got); 2502 out: 2503 if (direct_lock) 2504 ceph_end_io_direct(inode); 2505 else 2506 ceph_end_io_write(inode); 2507 out_unlocked: 2508 ceph_free_cap_flush(prealloc_cf); 2509 return written ? written : err; 2510 } 2511 2512 /* 2513 * llseek. be sure to verify file size on SEEK_END. 2514 */ 2515 static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) 2516 { 2517 if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { 2518 struct inode *inode = file_inode(file); 2519 int ret; 2520 2521 ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); 2522 if (ret < 0) 2523 return ret; 2524 } 2525 return generic_file_llseek(file, offset, whence); 2526 } 2527 2528 static inline void ceph_zero_partial_page(struct inode *inode, 2529 loff_t offset, size_t size) 2530 { 2531 struct folio *folio; 2532 2533 folio = filemap_lock_folio(inode->i_mapping, offset >> PAGE_SHIFT); 2534 if (IS_ERR(folio)) 2535 return; 2536 2537 folio_wait_writeback(folio); 2538 folio_zero_range(folio, offset_in_folio(folio, offset), size); 2539 folio_unlock(folio); 2540 folio_put(folio); 2541 } 2542 2543 static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset, 2544 loff_t length) 2545 { 2546 loff_t nearly = round_up(offset, PAGE_SIZE); 2547 if (offset < nearly) { 2548 loff_t size = nearly - offset; 2549 if (length < size) 2550 size = length; 2551 ceph_zero_partial_page(inode, offset, size); 2552 offset += size; 2553 length -= size; 2554 } 2555 if (length >= PAGE_SIZE) { 2556 loff_t size = round_down(length, PAGE_SIZE); 2557 truncate_pagecache_range(inode, offset, offset + size - 1); 2558 offset += size; 2559 length -= size; 2560 } 2561 if (length) 2562 ceph_zero_partial_page(inode, offset, length); 2563 } 2564 2565 static int ceph_zero_partial_object(struct inode *inode, 2566 loff_t offset, loff_t *length) 2567 { 2568 struct ceph_inode_info *ci = ceph_inode(inode); 2569 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); 2570 struct ceph_osd_request *req; 2571 int ret = 0; 2572 loff_t zero = 0; 2573 int op; 2574 2575 if (ceph_inode_is_shutdown(inode)) 2576 return -EIO; 2577 2578 if (!length) { 2579 op = offset ? 
CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE; 2580 length = &zero; 2581 } else { 2582 op = CEPH_OSD_OP_ZERO; 2583 } 2584 2585 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 2586 ceph_vino(inode), 2587 offset, length, 2588 0, 1, op, 2589 CEPH_OSD_FLAG_WRITE, 2590 NULL, 0, 0, false); 2591 if (IS_ERR(req)) { 2592 ret = PTR_ERR(req); 2593 goto out; 2594 } 2595 2596 req->r_mtime = inode_get_mtime(inode); 2597 ceph_osdc_start_request(&fsc->client->osdc, req); 2598 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 2599 if (ret == -ENOENT) 2600 ret = 0; 2601 ceph_osdc_put_request(req); 2602 2603 out: 2604 return ret; 2605 } 2606 2607 static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length) 2608 { 2609 int ret = 0; 2610 struct ceph_inode_info *ci = ceph_inode(inode); 2611 s32 stripe_unit = ci->i_layout.stripe_unit; 2612 s32 stripe_count = ci->i_layout.stripe_count; 2613 s32 object_size = ci->i_layout.object_size; 2614 u64 object_set_size = (u64) object_size * stripe_count; 2615 u64 nearly, t; 2616 2617 /* round offset up to next period boundary */ 2618 nearly = offset + object_set_size - 1; 2619 t = nearly; 2620 nearly -= do_div(t, object_set_size); 2621 2622 while (length && offset < nearly) { 2623 loff_t size = length; 2624 ret = ceph_zero_partial_object(inode, offset, &size); 2625 if (ret < 0) 2626 return ret; 2627 offset += size; 2628 length -= size; 2629 } 2630 while (length >= object_set_size) { 2631 int i; 2632 loff_t pos = offset; 2633 for (i = 0; i < stripe_count; ++i) { 2634 ret = ceph_zero_partial_object(inode, pos, NULL); 2635 if (ret < 0) 2636 return ret; 2637 pos += stripe_unit; 2638 } 2639 offset += object_set_size; 2640 length -= object_set_size; 2641 } 2642 while (length) { 2643 loff_t size = length; 2644 ret = ceph_zero_partial_object(inode, offset, &size); 2645 if (ret < 0) 2646 return ret; 2647 offset += size; 2648 length -= size; 2649 } 2650 return ret; 2651 } 2652 2653 static long ceph_fallocate(struct file *file, int mode, 2654 loff_t offset, loff_t length) 2655 { 2656 struct ceph_file_info *fi = file->private_data; 2657 struct inode *inode = file_inode(file); 2658 struct ceph_inode_info *ci = ceph_inode(inode); 2659 struct ceph_cap_flush *prealloc_cf; 2660 struct ceph_client *cl = ceph_inode_to_client(inode); 2661 int want, got = 0; 2662 int dirty; 2663 int ret = 0; 2664 loff_t endoff = 0; 2665 loff_t size; 2666 2667 doutc(cl, "%p %llx.%llx mode %x, offset %llu length %llu\n", 2668 inode, ceph_vinop(inode), mode, offset, length); 2669 2670 if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE)) 2671 return -EOPNOTSUPP; 2672 2673 if (!S_ISREG(inode->i_mode)) 2674 return -EOPNOTSUPP; 2675 2676 if (IS_ENCRYPTED(inode)) 2677 return -EOPNOTSUPP; 2678 2679 prealloc_cf = ceph_alloc_cap_flush(); 2680 if (!prealloc_cf) 2681 return -ENOMEM; 2682 2683 inode_lock(inode); 2684 2685 if (ceph_snap(inode) != CEPH_NOSNAP) { 2686 ret = -EROFS; 2687 goto unlock; 2688 } 2689 2690 size = i_size_read(inode); 2691 2692 /* Are we punching a hole beyond EOF? 
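If so there is nothing to do; if the range merely extends past EOF, clamp it so only existing data is zeroed.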
*/ 2693 if (offset >= size) 2694 goto unlock; 2695 if ((offset + length) > size) 2696 length = size - offset; 2697 2698 if (fi->fmode & CEPH_FILE_MODE_LAZY) 2699 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 2700 else 2701 want = CEPH_CAP_FILE_BUFFER; 2702 2703 ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got); 2704 if (ret < 0) 2705 goto unlock; 2706 2707 ret = file_modified(file); 2708 if (ret) 2709 goto put_caps; 2710 2711 filemap_invalidate_lock(inode->i_mapping); 2712 ceph_fscache_invalidate(inode, false); 2713 ceph_zero_pagecache_range(inode, offset, length); 2714 ret = ceph_zero_objects(inode, offset, length); 2715 2716 if (!ret) { 2717 spin_lock(&ci->i_ceph_lock); 2718 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 2719 &prealloc_cf); 2720 spin_unlock(&ci->i_ceph_lock); 2721 if (dirty) 2722 __mark_inode_dirty(inode, dirty); 2723 } 2724 filemap_invalidate_unlock(inode->i_mapping); 2725 2726 put_caps: 2727 ceph_put_cap_refs(ci, got); 2728 unlock: 2729 inode_unlock(inode); 2730 ceph_free_cap_flush(prealloc_cf); 2731 return ret; 2732 } 2733 2734 /* 2735 * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for 2736 * src_ci. Two attempts are made to obtain both caps, and an error is returned if 2737 this fails; zero is returned on success. 2738 */ 2739 static int get_rd_wr_caps(struct file *src_filp, int *src_got, 2740 struct file *dst_filp, 2741 loff_t dst_endoff, int *dst_got) 2742 { 2743 int ret = 0; 2744 bool retrying = false; 2745 2746 retry_caps: 2747 ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, 2748 dst_endoff, dst_got); 2749 if (ret < 0) 2750 return ret; 2751 2752 /* 2753 * Since we're already holding the FILE_WR capability for the dst file, 2754 * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some 2755 * retry dance instead to try to get both capabilities. 2756 */ 2757 ret = ceph_try_get_caps(file_inode(src_filp), 2758 CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED, 2759 false, src_got); 2760 if (ret <= 0) { 2761 /* Start by dropping dst_ci caps and getting src_ci caps */ 2762 ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got); 2763 if (retrying) { 2764 if (!ret) 2765 /* ceph_try_get_caps masks EAGAIN */ 2766 ret = -EAGAIN; 2767 return ret; 2768 } 2769 ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD, 2770 CEPH_CAP_FILE_SHARED, -1, src_got); 2771 if (ret < 0) 2772 return ret; 2773 /*... drop src_ci caps too, and retry */ 2774 ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got); 2775 retrying = true; 2776 goto retry_caps; 2777 } 2778 return ret; 2779 } 2780 2781 static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got, 2782 struct ceph_inode_info *dst_ci, int dst_got) 2783 { 2784 ceph_put_cap_refs(src_ci, src_got); 2785 ceph_put_cap_refs(dst_ci, dst_got); 2786 } 2787 2788 /* 2789 * This function does several size-related checks, returning an error if: 2790 * - source file is smaller than off+len 2791 * - destination file size is not OK (inode_newsize_ok()) 2792 * - max bytes quota is exceeded 2793 */ 2794 static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode, 2795 loff_t src_off, loff_t dst_off, size_t len) 2796 { 2797 struct ceph_client *cl = ceph_inode_to_client(src_inode); 2798 loff_t size, endoff; 2799 2800 size = i_size_read(src_inode); 2801 /* 2802 * Don't copy beyond source file EOF.
Instead of simply setting length 2803 * to (size - src_off), just drop to VFS default implementation, as the 2804 * local i_size may be stale due to other clients writing to the source 2805 * inode. 2806 */ 2807 if (src_off + len > size) { 2808 doutc(cl, "Copy beyond EOF (%llu + %zu > %llu)\n", src_off, 2809 len, size); 2810 return -EOPNOTSUPP; 2811 } 2812 size = i_size_read(dst_inode); 2813 2814 endoff = dst_off + len; 2815 if (inode_newsize_ok(dst_inode, endoff)) 2816 return -EOPNOTSUPP; 2817 2818 if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) 2819 return -EDQUOT; 2820 2821 return 0; 2822 } 2823 2824 static struct ceph_osd_request * 2825 ceph_alloc_copyfrom_request(struct ceph_osd_client *osdc, 2826 u64 src_snapid, 2827 struct ceph_object_id *src_oid, 2828 struct ceph_object_locator *src_oloc, 2829 struct ceph_object_id *dst_oid, 2830 struct ceph_object_locator *dst_oloc, 2831 u32 truncate_seq, u64 truncate_size) 2832 { 2833 struct ceph_osd_request *req; 2834 int ret; 2835 u32 src_fadvise_flags = 2836 CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | 2837 CEPH_OSD_OP_FLAG_FADVISE_NOCACHE; 2838 u32 dst_fadvise_flags = 2839 CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | 2840 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED; 2841 2842 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); 2843 if (!req) 2844 return ERR_PTR(-ENOMEM); 2845 2846 req->r_flags = CEPH_OSD_FLAG_WRITE; 2847 2848 ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc); 2849 ceph_oid_copy(&req->r_t.base_oid, dst_oid); 2850 2851 ret = osd_req_op_copy_from_init(req, src_snapid, 0, 2852 src_oid, src_oloc, 2853 src_fadvise_flags, 2854 dst_fadvise_flags, 2855 truncate_seq, 2856 truncate_size, 2857 CEPH_OSD_COPY_FROM_FLAG_TRUNCATE_SEQ); 2858 if (ret) 2859 goto out; 2860 2861 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); 2862 if (ret) 2863 goto out; 2864 2865 return req; 2866 2867 out: 2868 ceph_osdc_put_request(req); 2869 return ERR_PTR(ret); 2870 } 2871 2872 static ssize_t ceph_do_objects_copy(struct ceph_inode_info *src_ci, u64 *src_off, 2873 struct ceph_inode_info *dst_ci, u64 *dst_off, 2874 struct ceph_fs_client *fsc, 2875 size_t len, unsigned int flags) 2876 { 2877 struct ceph_object_locator src_oloc, dst_oloc; 2878 struct ceph_object_id src_oid, dst_oid; 2879 struct ceph_osd_client *osdc; 2880 struct ceph_osd_request *req; 2881 ssize_t bytes = 0; 2882 u64 src_objnum, src_objoff, dst_objnum, dst_objoff; 2883 u32 src_objlen, dst_objlen; 2884 u32 object_size = src_ci->i_layout.object_size; 2885 struct ceph_client *cl = fsc->client; 2886 int ret; 2887 2888 src_oloc.pool = src_ci->i_layout.pool_id; 2889 src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); 2890 dst_oloc.pool = dst_ci->i_layout.pool_id; 2891 dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); 2892 osdc = &fsc->client->osdc; 2893 2894 while (len >= object_size) { 2895 ceph_calc_file_object_mapping(&src_ci->i_layout, *src_off, 2896 object_size, &src_objnum, 2897 &src_objoff, &src_objlen); 2898 ceph_calc_file_object_mapping(&dst_ci->i_layout, *dst_off, 2899 object_size, &dst_objnum, 2900 &dst_objoff, &dst_objlen); 2901 ceph_oid_init(&src_oid); 2902 ceph_oid_printf(&src_oid, "%llx.%08llx", 2903 src_ci->i_vino.ino, src_objnum); 2904 ceph_oid_init(&dst_oid); 2905 ceph_oid_printf(&dst_oid, "%llx.%08llx", 2906 dst_ci->i_vino.ino, dst_objnum); 2907 /* Do an object remote copy */ 2908 req = ceph_alloc_copyfrom_request(osdc, src_ci->i_vino.snap, 2909 &src_oid, &src_oloc, 2910 &dst_oid, &dst_oloc, 2911 dst_ci->i_truncate_seq, 2912 dst_ci->i_truncate_size); 2913 if 
(IS_ERR(req)) 2914 ret = PTR_ERR(req); 2915 else { 2916 ceph_osdc_start_request(osdc, req); 2917 ret = ceph_osdc_wait_request(osdc, req); 2918 ceph_update_copyfrom_metrics(&fsc->mdsc->metric, 2919 req->r_start_latency, 2920 req->r_end_latency, 2921 object_size, ret); 2922 ceph_osdc_put_request(req); 2923 } 2924 if (ret) { 2925 if (ret == -EOPNOTSUPP) { 2926 fsc->have_copy_from2 = false; 2927 pr_notice_client(cl, 2928 "OSDs don't support copy-from2; disabling copy offload\n"); 2929 } 2930 doutc(cl, "returned %d\n", ret); 2931 if (bytes <= 0) 2932 bytes = ret; 2933 goto out; 2934 } 2935 len -= object_size; 2936 bytes += object_size; 2937 *src_off += object_size; 2938 *dst_off += object_size; 2939 } 2940 2941 out: 2942 ceph_oloc_destroy(&src_oloc); 2943 ceph_oloc_destroy(&dst_oloc); 2944 return bytes; 2945 } 2946 2947 static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off, 2948 struct file *dst_file, loff_t dst_off, 2949 size_t len, unsigned int flags) 2950 { 2951 struct inode *src_inode = file_inode(src_file); 2952 struct inode *dst_inode = file_inode(dst_file); 2953 struct ceph_inode_info *src_ci = ceph_inode(src_inode); 2954 struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); 2955 struct ceph_cap_flush *prealloc_cf; 2956 struct ceph_fs_client *src_fsc = ceph_inode_to_fs_client(src_inode); 2957 struct ceph_client *cl = src_fsc->client; 2958 loff_t size; 2959 ssize_t ret = -EIO, bytes; 2960 u64 src_objnum, dst_objnum, src_objoff, dst_objoff; 2961 u32 src_objlen, dst_objlen; 2962 int src_got = 0, dst_got = 0, err, dirty; 2963 2964 if (src_inode->i_sb != dst_inode->i_sb) { 2965 struct ceph_fs_client *dst_fsc = ceph_inode_to_fs_client(dst_inode); 2966 2967 if (ceph_fsid_compare(&src_fsc->client->fsid, 2968 &dst_fsc->client->fsid)) { 2969 dout("Copying files across clusters: src: %pU dst: %pU\n", 2970 &src_fsc->client->fsid, &dst_fsc->client->fsid); 2971 return -EXDEV; 2972 } 2973 } 2974 if (ceph_snap(dst_inode) != CEPH_NOSNAP) 2975 return -EROFS; 2976 2977 /* 2978 * Some of the checks below will return -EOPNOTSUPP, which will force a 2979 * fallback to the default VFS copy_file_range implementation. This is 2980 * desirable in several cases (for ex, the 'len' is smaller than the 2981 * size of the objects, or in cases where that would be more 2982 * efficient). 2983 */ 2984 2985 if (ceph_test_mount_opt(src_fsc, NOCOPYFROM)) 2986 return -EOPNOTSUPP; 2987 2988 if (!src_fsc->have_copy_from2) 2989 return -EOPNOTSUPP; 2990 2991 /* 2992 * Striped file layouts require that we copy partial objects, but the 2993 * OSD copy-from operation only supports full-object copies. Limit 2994 * this to non-striped file layouts for now. 
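* The object-by-object remote copy below therefore requires identical, non-striped layouts (stripe_count == 1 and matching object sizes) on both files, and returns -EOPNOTSUPP otherwise so the VFS fallback is used.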
2995 */ 2996 if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || 2997 (src_ci->i_layout.stripe_count != 1) || 2998 (dst_ci->i_layout.stripe_count != 1) || 2999 (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) { 3000 doutc(cl, "Invalid src/dst files layout\n"); 3001 return -EOPNOTSUPP; 3002 } 3003 3004 /* Every encrypted inode gets its own key, so we can't offload them */ 3005 if (IS_ENCRYPTED(src_inode) || IS_ENCRYPTED(dst_inode)) 3006 return -EOPNOTSUPP; 3007 3008 if (len < src_ci->i_layout.object_size) 3009 return -EOPNOTSUPP; /* no remote copy will be done */ 3010 3011 prealloc_cf = ceph_alloc_cap_flush(); 3012 if (!prealloc_cf) 3013 return -ENOMEM; 3014 3015 /* Start by sync'ing the source and destination files */ 3016 ret = file_write_and_wait_range(src_file, src_off, (src_off + len)); 3017 if (ret < 0) { 3018 doutc(cl, "failed to write src file (%zd)\n", ret); 3019 goto out; 3020 } 3021 ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len)); 3022 if (ret < 0) { 3023 doutc(cl, "failed to write dst file (%zd)\n", ret); 3024 goto out; 3025 } 3026 3027 /* 3028 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other 3029 * clients may have dirty data in their caches. And OSDs know nothing 3030 * about caps, so they can't safely do the remote object copies. 3031 */ 3032 err = get_rd_wr_caps(src_file, &src_got, 3033 dst_file, (dst_off + len), &dst_got); 3034 if (err < 0) { 3035 doutc(cl, "get_rd_wr_caps returned %d\n", err); 3036 ret = -EOPNOTSUPP; 3037 goto out; 3038 } 3039 3040 ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); 3041 if (ret < 0) 3042 goto out_caps; 3043 3044 /* Drop dst file cached pages */ 3045 ceph_fscache_invalidate(dst_inode, false); 3046 ret = invalidate_inode_pages2_range(dst_inode->i_mapping, 3047 dst_off >> PAGE_SHIFT, 3048 (dst_off + len) >> PAGE_SHIFT); 3049 if (ret < 0) { 3050 doutc(cl, "Failed to invalidate inode pages (%zd)\n", 3051 ret); 3052 ret = 0; /* XXX */ 3053 } 3054 ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, 3055 src_ci->i_layout.object_size, 3056 &src_objnum, &src_objoff, &src_objlen); 3057 ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, 3058 dst_ci->i_layout.object_size, 3059 &dst_objnum, &dst_objoff, &dst_objlen); 3060 /* object-level offsets need to be the same */ 3061 if (src_objoff != dst_objoff) { 3062 ret = -EOPNOTSUPP; 3063 goto out_caps; 3064 } 3065 3066 /* 3067 * Do a manual copy if the object offset isn't object aligned. 3068 * 'src_objlen' contains the bytes left until the end of the object, 3069 * starting at src_off 3070 */ 3071 if (src_objoff) { 3072 doutc(cl, "Initial partial copy of %u bytes\n", src_objlen); 3073 3074 /* 3075 * we need to temporarily drop all caps as we'll be calling 3076 * {read,write}_iter, which will get caps again.
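* splice_file_range() goes through the normal read/write paths, which take cap references of their own; the caps are re-acquired with get_rd_wr_caps() once the partial copy is done.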
3077 */ 3078 put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); 3079 ret = splice_file_range(src_file, &src_off, dst_file, &dst_off, 3080 src_objlen); 3081 /* Abort on short copies or on error */ 3082 if (ret < (long)src_objlen) { 3083 doutc(cl, "Failed partial copy (%zd)\n", ret); 3084 goto out; 3085 } 3086 len -= ret; 3087 err = get_rd_wr_caps(src_file, &src_got, 3088 dst_file, (dst_off + len), &dst_got); 3089 if (err < 0) 3090 goto out; 3091 err = is_file_size_ok(src_inode, dst_inode, 3092 src_off, dst_off, len); 3093 if (err < 0) 3094 goto out_caps; 3095 } 3096 3097 size = i_size_read(dst_inode); 3098 bytes = ceph_do_objects_copy(src_ci, &src_off, dst_ci, &dst_off, 3099 src_fsc, len, flags); 3100 if (bytes <= 0) { 3101 if (!ret) 3102 ret = bytes; 3103 goto out_caps; 3104 } 3105 doutc(cl, "Copied %zu bytes out of %zu\n", bytes, len); 3106 len -= bytes; 3107 ret += bytes; 3108 3109 file_update_time(dst_file); 3110 inode_inc_iversion_raw(dst_inode); 3111 3112 if (dst_off > size) { 3113 /* Let the MDS know about dst file size change */ 3114 if (ceph_inode_set_size(dst_inode, dst_off) || 3115 ceph_quota_is_max_bytes_approaching(dst_inode, dst_off)) 3116 ceph_check_caps(dst_ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_FLUSH); 3117 } 3118 /* Mark Fw dirty */ 3119 spin_lock(&dst_ci->i_ceph_lock); 3120 dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); 3121 spin_unlock(&dst_ci->i_ceph_lock); 3122 if (dirty) 3123 __mark_inode_dirty(dst_inode, dirty); 3124 3125 out_caps: 3126 put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); 3127 3128 /* 3129 * Do the final manual copy if we still have some bytes left, unless 3130 * there were errors in remote object copies (len >= object_size). 3131 */ 3132 if (len && (len < src_ci->i_layout.object_size)) { 3133 doutc(cl, "Final partial copy of %zu bytes\n", len); 3134 bytes = splice_file_range(src_file, &src_off, dst_file, 3135 &dst_off, len); 3136 if (bytes > 0) 3137 ret += bytes; 3138 else 3139 doutc(cl, "Failed partial copy (%zd)\n", bytes); 3140 } 3141 3142 out: 3143 ceph_free_cap_flush(prealloc_cf); 3144 3145 return ret; 3146 } 3147 3148 static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, 3149 struct file *dst_file, loff_t dst_off, 3150 size_t len, unsigned int flags) 3151 { 3152 ssize_t ret; 3153 3154 ret = __ceph_copy_file_range(src_file, src_off, dst_file, dst_off, 3155 len, flags); 3156 3157 if (ret == -EOPNOTSUPP || ret == -EXDEV) 3158 ret = splice_copy_file_range(src_file, src_off, dst_file, 3159 dst_off, len); 3160 return ret; 3161 } 3162 3163 const struct file_operations ceph_file_fops = { 3164 .open = ceph_open, 3165 .release = ceph_release, 3166 .llseek = ceph_llseek, 3167 .read_iter = ceph_read_iter, 3168 .write_iter = ceph_write_iter, 3169 .mmap_prepare = ceph_mmap_prepare, 3170 .fsync = ceph_fsync, 3171 .lock = ceph_lock, 3172 .setlease = simple_nosetlease, 3173 .flock = ceph_flock, 3174 .splice_read = ceph_splice_read, 3175 .splice_write = iter_file_splice_write, 3176 .unlocked_ioctl = ceph_ioctl, 3177 .compat_ioctl = compat_ptr_ioctl, 3178 .fallocate = ceph_fallocate, 3179 .copy_file_range = ceph_copy_file_range, 3180 }; 3181