// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/spinlock.h>
#include <linux/fs_struct.h>
#include <linux/namei.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/xattr.h>

#include "super.h"
#include "mds_client.h"

/*
 * Directory operations: readdir, lookup, create, link, unlink,
 * rename, etc.
 */

/*
 * Ceph MDS operations are specified in terms of a base ino and
 * relative path.  Thus, the client can specify an operation on a
 * specific inode (e.g., a getattr due to fstat(2)), or as a path
 * relative to, say, the root directory.
 *
 * Normally, we limit ourselves to strict inode ops (no path component)
 * or dentry operations (a single path component relative to an ino).  The
 * exception to this is open_root_dentry(), which will open the mount
 * point by name.
 */

const struct dentry_operations ceph_dentry_ops;

/*
 * Initialize ceph dentry state.
 */
static int ceph_d_init(struct dentry *dentry)
{
	struct ceph_dentry_info *di;

	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
	if (!di)
		return -ENOMEM;          /* oh well */

	di->dentry = dentry;
	di->lease_session = NULL;
	di->time = jiffies;
	dentry->d_fsdata = di;
	ceph_dentry_lru_add(dentry);
	return 0;
}

/*
 * f_pos layout for readdir:
 * - hash order:
 *	(0xff << 52) | ((24 bits hash) << 28) |
 *	(the nth entry within that hash's collision chain);
 * - frag+name order:
 *	((frag value) << 28) | (the nth entry in frag);
 */
#define OFFSET_BITS	28
#define OFFSET_MASK	((1 << OFFSET_BITS) - 1)
#define HASH_ORDER	(0xffull << (OFFSET_BITS + 24))
loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
{
	loff_t fpos = ((loff_t)high << OFFSET_BITS) | (loff_t)off;
	if (hash_order)
		fpos |= HASH_ORDER;
	return fpos;
}

static bool is_hash_order(loff_t p)
{
	return (p & HASH_ORDER) == HASH_ORDER;
}

static unsigned fpos_frag(loff_t p)
{
	return p >> OFFSET_BITS;
}

static unsigned fpos_hash(loff_t p)
{
	return ceph_frag_value(fpos_frag(p));
}

static unsigned fpos_off(loff_t p)
{
	return p & OFFSET_MASK;
}

static int fpos_cmp(loff_t l, loff_t r)
{
	int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r));
	if (v)
		return v;
	return (int)(fpos_off(l) - fpos_off(r));
}
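
/*
 * Worked example of the encoding above (illustrative only; the frag
 * values are made up): in frag+name order, frag 0x01800000 (one split
 * bit, value 0x800000, as built by ceph_frag_make()) with the 3rd
 * entry in the frag encodes as
 *
 *	ceph_make_fpos(0x01800000, 3, false)
 *		== ((loff_t)0x01800000 << OFFSET_BITS) | 3
 *		== 0x18000000000003
 *
 * and fpos_frag()/fpos_off() recover 0x01800000 and 3.  In hash order
 * the 0xff marker occupies the byte where the frag's "bits" field
 * would sit, so fpos_frag() yields 0xff000000|hash and fpos_hash()
 * strips the marker back off via ceph_frag_value().
 */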

/*
 * Make a note of the last dentry we read, so we can
 * continue at the same lexicographical point,
 * regardless of what dir changes take place on the
 * server.
 */
static int note_last_dentry(struct ceph_file_info *fi, const char *name,
			    int len, unsigned next_offset)
{
	char *buf = kmalloc(len+1, GFP_KERNEL);
	if (!buf)
		return -ENOMEM;
	kfree(fi->last_name);
	fi->last_name = buf;
	memcpy(fi->last_name, name, len);
	fi->last_name[len] = 0;
	fi->next_offset = next_offset;
	dout("note_last_dentry '%s'\n", fi->last_name);
	return 0;
}


static struct dentry *
__dcache_find_get_entry(struct dentry *parent, u64 idx,
			struct ceph_readdir_cache_control *cache_ctl)
{
	struct inode *dir = d_inode(parent);
	struct dentry *dentry;
	unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1;
	loff_t ptr_pos = idx * sizeof(struct dentry *);
	pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT;

	if (ptr_pos >= i_size_read(dir))
		return NULL;

	if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) {
		ceph_readdir_cache_release(cache_ctl);
		cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff);
		if (!cache_ctl->page) {
			dout(" page %lu not found\n", ptr_pgoff);
			return ERR_PTR(-EAGAIN);
		}
		/* reading/filling the cache are serialized by
		 * i_mutex, no need to hold the page lock */
		unlock_page(cache_ctl->page);
		cache_ctl->dentries = kmap(cache_ctl->page);
	}

	cache_ctl->index = idx & idx_mask;

	rcu_read_lock();
	spin_lock(&parent->d_lock);
	/* check i_size again here, because an empty directory can be
	 * marked as complete while not holding the i_mutex. */
	if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir))
		dentry = cache_ctl->dentries[cache_ctl->index];
	else
		dentry = NULL;
	spin_unlock(&parent->d_lock);
	if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
		dentry = NULL;
	rcu_read_unlock();
	return dentry ? : ERR_PTR(-EAGAIN);
}
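
/*
 * Worked example of the pointer-array math above (illustrative only,
 * assuming 4 KiB pages and 8-byte pointers): each page of the dir's
 * readdir cache then holds 512 dentry pointers, so idx 1000 gives
 * ptr_pos 8000, ptr_pgoff 8000 >> PAGE_SHIFT == 1 (the second page),
 * and slot 1000 & 511 == 488 within that page.
 */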

/*
 * When possible, we try to satisfy a readdir by peeking at the
 * dcache.  We make this work by carefully ordering dentries on
 * d_child when we initially get results back from the MDS, and
 * falling back to a "normal" sync readdir if any dentries in the dir
 * are dropped.
 *
 * Complete dir indicates that we have all dentries in the dir.  It is
 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by
 * the MDS if/when the directory is modified).
 */
static int __dcache_readdir(struct file *file, struct dir_context *ctx,
			    int shared_gen)
{
	struct ceph_file_info *fi = file->private_data;
	struct dentry *parent = file->f_path.dentry;
	struct inode *dir = d_inode(parent);
	struct dentry *dentry, *last = NULL;
	struct ceph_dentry_info *di;
	struct ceph_readdir_cache_control cache_ctl = {};
	u64 idx = 0;
	int err = 0;

	dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos);

	/* search start position */
	if (ctx->pos > 2) {
		u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *));
		while (count > 0) {
			u64 step = count >> 1;
			dentry = __dcache_find_get_entry(parent, idx + step,
							 &cache_ctl);
			if (!dentry) {
				/* use linear search */
				idx = 0;
				break;
			}
			if (IS_ERR(dentry)) {
				err = PTR_ERR(dentry);
				goto out;
			}
			di = ceph_dentry(dentry);
			spin_lock(&dentry->d_lock);
			if (fpos_cmp(di->offset, ctx->pos) < 0) {
				idx += step + 1;
				count -= step + 1;
			} else {
				count = step;
			}
			spin_unlock(&dentry->d_lock);
			dput(dentry);
		}

		dout("__dcache_readdir %p cache idx %llu\n", dir, idx);
	}


	for (;;) {
		bool emit_dentry = false;
		dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl);
		if (!dentry) {
			fi->flags |= CEPH_F_ATEND;
			err = 0;
			break;
		}
		if (IS_ERR(dentry)) {
			err = PTR_ERR(dentry);
			goto out;
		}

		spin_lock(&dentry->d_lock);
		di = ceph_dentry(dentry);
		if (d_unhashed(dentry) ||
		    d_really_is_negative(dentry) ||
		    di->lease_shared_gen != shared_gen) {
			spin_unlock(&dentry->d_lock);
			dput(dentry);
			err = -EAGAIN;
			goto out;
		}
		if (fpos_cmp(ctx->pos, di->offset) <= 0) {
			emit_dentry = true;
		}
		spin_unlock(&dentry->d_lock);

		if (emit_dentry) {
			dout(" %llx dentry %p %pd %p\n", di->offset,
			     dentry, dentry, d_inode(dentry));
			ctx->pos = di->offset;
			if (!dir_emit(ctx, dentry->d_name.name,
				      dentry->d_name.len,
				      ceph_translate_ino(dentry->d_sb,
							 d_inode(dentry)->i_ino),
				      d_inode(dentry)->i_mode >> 12)) {
				dput(dentry);
				err = 0;
				break;
			}
			ctx->pos++;

			if (last)
				dput(last);
			last = dentry;
		} else {
			dput(dentry);
		}
	}
out:
	ceph_readdir_cache_release(&cache_ctl);
	if (last) {
		int ret;
		di = ceph_dentry(last);
		ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
				       fpos_off(di->offset) + 1);
		if (ret < 0)
			err = ret;
		dput(last);
		/* last_name no longer matches the cache index */
		if (fi->readdir_cache_idx >= 0) {
			fi->readdir_cache_idx = -1;
			fi->dir_release_count = 0;
		}
	}
	return err;
}

static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
{
	if (!fi->last_readdir)
		return true;
	if (is_hash_order(pos))
		return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
	else
		return fi->frag != fpos_frag(pos);
}
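
/*
 * Illustrative example (hypothetical frag values): if the buffered
 * reply covers frag 0x01000000 and the caller now reads at an fpos
 * whose frag field differs (or, in hash order, whose hash falls
 * outside fi->frag per ceph_frag_contains_value()), the function
 * above returns true and ceph_readdir() below must issue a fresh
 * CEPH_MDS_OP_READDIR request.
 */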

static int ceph_readdir(struct file *file, struct dir_context *ctx)
{
	struct ceph_file_info *fi = file->private_data;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	int i;
	int err;
	unsigned frag = -1;
	struct ceph_mds_reply_info_parsed *rinfo;

	dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
	if (fi->flags & CEPH_F_ATEND)
		return 0;

	/* always start with . and .. */
	if (ctx->pos == 0) {
		dout("readdir off 0 -> '.'\n");
		if (!dir_emit(ctx, ".", 1,
			    ceph_translate_ino(inode->i_sb, inode->i_ino),
			    inode->i_mode >> 12))
			return 0;
		ctx->pos = 1;
	}
	if (ctx->pos == 1) {
		ino_t ino = parent_ino(file->f_path.dentry);
		dout("readdir off 1 -> '..'\n");
		if (!dir_emit(ctx, "..", 2,
			    ceph_translate_ino(inode->i_sb, ino),
			    inode->i_mode >> 12))
			return 0;
		ctx->pos = 2;
	}

	/* can we use the dcache? */
	spin_lock(&ci->i_ceph_lock);
	if (ceph_test_mount_opt(fsc, DCACHE) &&
	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
	    ceph_snap(inode) != CEPH_SNAPDIR &&
	    __ceph_dir_is_complete_ordered(ci) &&
	    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
		int shared_gen = atomic_read(&ci->i_shared_gen);
		spin_unlock(&ci->i_ceph_lock);
		err = __dcache_readdir(file, ctx, shared_gen);
		if (err != -EAGAIN)
			return err;
	} else {
		spin_unlock(&ci->i_ceph_lock);
	}

	/* proceed with a normal readdir */
more:
	/* do we have the correct frag content buffered? */
	if (need_send_readdir(fi, ctx->pos)) {
		struct ceph_mds_request *req;
		int op = ceph_snap(inode) == CEPH_SNAPDIR ?
			CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;

		/* discard old result, if any */
		if (fi->last_readdir) {
			ceph_mdsc_put_request(fi->last_readdir);
			fi->last_readdir = NULL;
		}

		if (is_hash_order(ctx->pos)) {
			/* the fragtree isn't always accurate; choose the
			 * frag based on the previous reply when possible. */
			if (frag == (unsigned)-1)
				frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
							NULL, NULL);
		} else {
			frag = fpos_frag(ctx->pos);
		}

		dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
		     ceph_vinop(inode), frag, fi->last_name);
		req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
		if (IS_ERR(req))
			return PTR_ERR(req);
		err = ceph_alloc_readdir_reply_buffer(req, inode);
		if (err) {
			ceph_mdsc_put_request(req);
			return err;
		}
		/* hints to request -> mds selection code */
		req->r_direct_mode = USE_AUTH_MDS;
		if (op == CEPH_MDS_OP_READDIR) {
			req->r_direct_hash = ceph_frag_value(frag);
			__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
			req->r_inode_drop = CEPH_CAP_FILE_EXCL;
		}
		if (fi->last_name) {
			req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
			if (!req->r_path2) {
				ceph_mdsc_put_request(req);
				return -ENOMEM;
			}
		} else if (is_hash_order(ctx->pos)) {
			req->r_args.readdir.offset_hash =
				cpu_to_le32(fpos_hash(ctx->pos));
		}

		req->r_dir_release_cnt = fi->dir_release_count;
		req->r_dir_ordered_cnt = fi->dir_ordered_count;
		req->r_readdir_cache_idx = fi->readdir_cache_idx;
		req->r_readdir_offset = fi->next_offset;
		req->r_args.readdir.frag = cpu_to_le32(frag);
		req->r_args.readdir.flags =
				cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);

		req->r_inode = inode;
		ihold(inode);
		req->r_dentry = dget(file->f_path.dentry);
		err = ceph_mdsc_do_request(mdsc, NULL, req);
		if (err < 0) {
			ceph_mdsc_put_request(req);
			return err;
		}
		dout("readdir got and parsed readdir result=%d on "
		     "frag %x, end=%d, complete=%d, hash_order=%d\n",
		     err, frag,
		     (int)req->r_reply_info.dir_end,
		     (int)req->r_reply_info.dir_complete,
		     (int)req->r_reply_info.hash_order);

		rinfo = &req->r_reply_info;
		if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
			frag = le32_to_cpu(rinfo->dir_dir->frag);
			if (!rinfo->hash_order) {
				fi->next_offset = req->r_readdir_offset;
				/* adjust ctx->pos to beginning of frag */
				ctx->pos = ceph_make_fpos(frag,
							  fi->next_offset,
							  false);
			}
		}

		fi->frag = frag;
		fi->last_readdir = req;

		if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) {
			fi->readdir_cache_idx = req->r_readdir_cache_idx;
			if (fi->readdir_cache_idx < 0) {
				/* preclude marking dir ordered */
				fi->dir_ordered_count = 0;
			} else if (ceph_frag_is_leftmost(frag) &&
				   fi->next_offset == 2) {
				/* note dir version at start of readdir so
				 * we can tell if any dentries get dropped */
				fi->dir_release_count = req->r_dir_release_cnt;
				fi->dir_ordered_count = req->r_dir_ordered_cnt;
			}
		} else {
			dout("readdir !did_prepopulate\n");
			/* disable readdir cache */
			fi->readdir_cache_idx = -1;
			/* preclude marking dir complete */
			fi->dir_release_count = 0;
		}

		/* note next offset and last dentry name */
		if (rinfo->dir_nr > 0) {
			struct ceph_mds_reply_dir_entry *rde =
				rinfo->dir_entries + (rinfo->dir_nr-1);
			unsigned next_offset = req->r_reply_info.dir_end ?
				2 : (fpos_off(rde->offset) + 1);
			err = note_last_dentry(fi, rde->name, rde->name_len,
					       next_offset);
			if (err)
				return err;
		} else if (req->r_reply_info.dir_end) {
			fi->next_offset = 2;
			/* keep last name */
		}
	}

	rinfo = &fi->last_readdir->r_reply_info;
	dout("readdir frag %x num %d pos %llx chunk first %llx\n",
	     fi->frag, rinfo->dir_nr, ctx->pos,
	     rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);

	i = 0;
	/* search start position */
	if (rinfo->dir_nr > 0) {
		int step, nr = rinfo->dir_nr;
		while (nr > 0) {
			step = nr >> 1;
			if (rinfo->dir_entries[i + step].offset < ctx->pos) {
				i += step + 1;
				nr -= step + 1;
			} else {
				nr = step;
			}
		}
	}
	for (; i < rinfo->dir_nr; i++) {
		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
		struct ceph_vino vino;
		ino_t ino;
		u32 ftype;

		BUG_ON(rde->offset < ctx->pos);

		ctx->pos = rde->offset;
		dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
		     i, rinfo->dir_nr, ctx->pos,
		     rde->name_len, rde->name, &rde->inode.in);

		BUG_ON(!rde->inode.in);
		ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
		vino.ino = le64_to_cpu(rde->inode.in->ino);
		vino.snap = le64_to_cpu(rde->inode.in->snapid);
		ino = ceph_vino_to_ino(vino);

		if (!dir_emit(ctx, rde->name, rde->name_len,
			      ceph_translate_ino(inode->i_sb, ino), ftype)) {
			dout("filldir stopping us...\n");
			return 0;
		}
		ctx->pos++;
	}

	ceph_mdsc_put_request(fi->last_readdir);
	fi->last_readdir = NULL;

	if (fi->next_offset > 2) {
		frag = fi->frag;
		goto more;
	}
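
	/*
	 * Illustrative walk-through (hypothetical frags): for a dir
	 * split into frags 0x01000000 and 0x01800000, the goto above
	 * re-enters 'more' while next_offset > 2 to fetch the rest of
	 * the current frag chunk by chunk; only once the frag is
	 * exhausted does the code below advance to the next frag via
	 * ceph_frag_next().
	 */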

	/* more frags? */
	if (!ceph_frag_is_rightmost(fi->frag)) {
		frag = ceph_frag_next(fi->frag);
		if (is_hash_order(ctx->pos)) {
			loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
							fi->next_offset, true);
			if (new_pos > ctx->pos)
				ctx->pos = new_pos;
			/* keep last_name */
		} else {
			ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
			kfree(fi->last_name);
			fi->last_name = NULL;
		}
		dout("readdir next frag is %x\n", frag);
		goto more;
	}
	fi->flags |= CEPH_F_ATEND;

	/*
	 * if dir_release_count still matches the dir, no dentries
	 * were released during the whole readdir, and we should have
	 * the complete dir contents in our cache.
	 */
	if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
		spin_lock(&ci->i_ceph_lock);
		if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
			dout(" marking %p complete and ordered\n", inode);
			/* use i_size to track number of entries in
			 * readdir cache */
			BUG_ON(fi->readdir_cache_idx < 0);
			i_size_write(inode, fi->readdir_cache_idx *
				     sizeof(struct dentry *));
		} else {
			dout(" marking %p complete\n", inode);
		}
		__ceph_dir_set_complete(ci, fi->dir_release_count,
					fi->dir_ordered_count);
		spin_unlock(&ci->i_ceph_lock);
	}

	dout("readdir %p file %p done.\n", inode, file);
	return 0;
}

static void reset_readdir(struct ceph_file_info *fi)
{
	if (fi->last_readdir) {
		ceph_mdsc_put_request(fi->last_readdir);
		fi->last_readdir = NULL;
	}
	kfree(fi->last_name);
	fi->last_name = NULL;
	fi->dir_release_count = 0;
	fi->readdir_cache_idx = -1;
	fi->next_offset = 2;  /* compensate for . and .. */
	fi->flags &= ~CEPH_F_ATEND;
}
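
/*
 * Illustrative trace of the reset path: a userspace rewinddir(3)
 * issues llseek(fd, 0, SEEK_SET), need_reset_readdir() below returns
 * true for new_pos == 0, and ceph_dir_llseek() then calls
 * reset_readdir() to drop the buffered chunk, last_name and the
 * readdir cache position.
 */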

/*
 * Discard buffered readdir content on seekdir(0), on a seek to a new
 * frag, or on a seek prior to the current chunk.
 */
static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
{
	struct ceph_mds_reply_info_parsed *rinfo;
	loff_t chunk_offset;
	if (new_pos == 0)
		return true;
	if (is_hash_order(new_pos)) {
		/* no need to reset last_name for a forward seek when
		 * dentries are sorted in hash order */
	} else if (fi->frag != fpos_frag(new_pos)) {
		return true;
	}
	rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
	if (!rinfo || !rinfo->dir_nr)
		return true;
	chunk_offset = rinfo->dir_entries[0].offset;
	return new_pos < chunk_offset ||
	       is_hash_order(new_pos) != is_hash_order(chunk_offset);
}

static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
{
	struct ceph_file_info *fi = file->private_data;
	struct inode *inode = file->f_mapping->host;
	loff_t retval;

	inode_lock(inode);
	retval = -EINVAL;
	switch (whence) {
	case SEEK_CUR:
		offset += file->f_pos;
		/* fall through */
	case SEEK_SET:
		break;
	case SEEK_END:
		retval = -EOPNOTSUPP;
		/* fall through */
	default:
		goto out;
	}

	if (offset >= 0) {
		if (need_reset_readdir(fi, offset)) {
			dout("dir_llseek dropping %p content\n", file);
			reset_readdir(fi);
		} else if (is_hash_order(offset) && offset > file->f_pos) {
			/* for a hash offset, we don't know if a forward seek
			 * is within the same frag */
			fi->dir_release_count = 0;
			fi->readdir_cache_idx = -1;
		}

		if (offset != file->f_pos) {
			file->f_pos = offset;
			file->f_version = 0;
			fi->flags &= ~CEPH_F_ATEND;
		}
		retval = offset;
	}
out:
	inode_unlock(inode);
	return retval;
}

/*
 * Handle lookups for the hidden .snap directory.
 */
int ceph_handle_snapdir(struct ceph_mds_request *req,
			struct dentry *dentry, int err)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
	struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */

	/* .snap dir? */
	if (err == -ENOENT &&
	    ceph_snap(parent) == CEPH_NOSNAP &&
	    strcmp(dentry->d_name.name,
		   fsc->mount_options->snapdir_name) == 0) {
		struct inode *inode = ceph_get_snapdir(parent);
		dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n",
		     dentry, dentry, inode);
		BUG_ON(!d_unhashed(dentry));
		d_add(dentry, inode);
		err = 0;
	}
	return err;
}

/*
 * Figure out the final result of a lookup/open request.
 *
 * Mainly, make sure we return the final req->r_dentry (if it already
 * existed) in place of the original VFS-provided dentry when they
 * differ.
 *
 * Gracefully handle the case where the MDS replies with -ENOENT and
 * no trace (which it may do, at its discretion, e.g., if it doesn't
 * care to issue a lease on the negative dentry).
 */
struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
				  struct dentry *dentry, int err)
{
	if (err == -ENOENT) {
		/* no trace? */
		err = 0;
		if (!req->r_reply_info.head->is_dentry) {
			dout("ENOENT and no trace, dentry %p inode %p\n",
			     dentry, d_inode(dentry));
			if (d_really_is_positive(dentry)) {
				d_drop(dentry);
				err = -ENOENT;
			} else {
				d_add(dentry, NULL);
			}
		}
	}
	if (err)
		dentry = ERR_PTR(err);
	else if (dentry != req->r_dentry)
		dentry = dget(req->r_dentry);   /* we got spliced */
	else
		dentry = NULL;
	return dentry;
}

static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)
{
	return ceph_ino(inode) == CEPH_INO_ROOT &&
	       strncmp(dentry->d_name.name, ".ceph", 5) == 0;
}

/*
 * Look up a single dir entry.  If there is a lookup intent, inform
 * the MDS so that it gets our 'caps wanted' value in a single op.
 */
static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
				  unsigned int flags)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int op;
	int mask;
	int err;

	dout("lookup %p dentry %p '%pd'\n",
	     dir, dentry, dentry);

	if (dentry->d_name.len > NAME_MAX)
		return ERR_PTR(-ENAMETOOLONG);

	/* can we conclude ENOENT locally? */
	if (d_really_is_negative(dentry)) {
		struct ceph_inode_info *ci = ceph_inode(dir);
		struct ceph_dentry_info *di = ceph_dentry(dentry);

		spin_lock(&ci->i_ceph_lock);
		dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags);
		if (strncmp(dentry->d_name.name,
			    fsc->mount_options->snapdir_name,
			    dentry->d_name.len) &&
		    !is_root_ceph_dentry(dir, dentry) &&
		    ceph_test_mount_opt(fsc, DCACHE) &&
		    __ceph_dir_is_complete(ci) &&
		    (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
			spin_unlock(&ci->i_ceph_lock);
			dout(" dir %p complete, -ENOENT\n", dir);
			d_add(dentry, NULL);
			di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
			return NULL;
		}
		spin_unlock(&ci->i_ceph_lock);
	}

	op = ceph_snap(dir) == CEPH_SNAPDIR ?
		CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
	req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
	if (IS_ERR(req))
		return ERR_CAST(req);
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;

	mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
	if (ceph_security_xattr_wanted(dir))
		mask |= CEPH_CAP_XATTR_SHARED;
	req->r_args.getattr.mask = cpu_to_le32(mask);

	req->r_parent = dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	err = ceph_handle_snapdir(req, dentry, err);
	dentry = ceph_finish_lookup(req, dentry, err);
	ceph_mdsc_put_request(req);  /* will dput(dentry) */
	dout("lookup result=%p\n", dentry);
	return dentry;
}
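
/*
 * Illustrative consequence of the "conclude ENOENT locally" branch
 * above: with '-o dcache' on a complete directory covered by
 * CEPH_CAP_FILE_SHARED, looking up a name that is not in the dcache
 * can instantiate a negative dentry and return without any MDS round
 * trip at all.
 */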

/*
 * If we do a create but get no trace back from the MDS, follow up with
 * a lookup (the VFS expects us to link up the provided dentry).
 */
int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry)
{
	struct dentry *result = ceph_lookup(dir, dentry, 0);

	if (result && !IS_ERR(result)) {
		/*
		 * We created the item, then did a lookup, and found
		 * it was already linked to another inode we already
		 * had in our cache (and thus got spliced).  To not
		 * confuse the VFS (especially when the inode is a
		 * directory), we don't link our dentry to that inode,
		 * and return an error instead.
		 *
		 * This event should be rare and happens only when we
		 * talk to an old MDS.  Recent MDS versions do not send
		 * a traceless reply for a request that creates a new
		 * inode.
		 */
		d_drop(result);
		return -ESTALE;
	}
	return PTR_ERR(result);
}

static int ceph_mknod(struct inode *dir, struct dentry *dentry,
		      umode_t mode, dev_t rdev)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	struct ceph_acls_info acls = {};
	int err;

	if (ceph_snap(dir) != CEPH_NOSNAP)
		return -EROFS;

	err = ceph_pre_init_acls(dir, &mode, &acls);
	if (err < 0)
		return err;

	dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
	     dir, dentry, mode, rdev);
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	req->r_parent = dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	req->r_args.mknod.mode = cpu_to_le32(mode);
	req->r_args.mknod.rdev = cpu_to_le32(rdev);
	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
	if (acls.pagelist) {
		req->r_pagelist = acls.pagelist;
		acls.pagelist = NULL;
	}
	err = ceph_mdsc_do_request(mdsc, dir, req);
	if (!err && !req->r_reply_info.head->is_dentry)
		err = ceph_handle_notrace_create(dir, dentry);
	ceph_mdsc_put_request(req);
out:
	if (!err)
		ceph_init_inode_acls(d_inode(dentry), &acls);
	else
		d_drop(dentry);
	ceph_release_acls_info(&acls);
	return err;
}

static int ceph_create(struct inode *dir, struct dentry *dentry, umode_t mode,
		       bool excl)
{
	return ceph_mknod(dir, dentry, mode, 0);
}

static int ceph_symlink(struct inode *dir, struct dentry *dentry,
			const char *dest)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int err;

	if (ceph_snap(dir) != CEPH_NOSNAP)
		return -EROFS;

	dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_path2 = kstrdup(dest, GFP_KERNEL);
	if (!req->r_path2) {
		err = -ENOMEM;
		ceph_mdsc_put_request(req);
		goto out;
	}
	req->r_parent = dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
	err = ceph_mdsc_do_request(mdsc, dir, req);
	if (!err && !req->r_reply_info.head->is_dentry)
		err = ceph_handle_notrace_create(dir, dentry);
	ceph_mdsc_put_request(req);
out:
	if (err)
		d_drop(dentry);
	return err;
}

static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	struct ceph_acls_info acls = {};
	int err = -EROFS;
	int op;

	if (ceph_snap(dir) == CEPH_SNAPDIR) {
		/* mkdir .snap/foo is a MKSNAP */
		op = CEPH_MDS_OP_MKSNAP;
		dout("mksnap dir %p snap '%pd' dn %p\n", dir,
		     dentry, dentry);
	} else if (ceph_snap(dir) == CEPH_NOSNAP) {
		dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
		op = CEPH_MDS_OP_MKDIR;
	} else {
		goto out;
	}

	mode |= S_IFDIR;
	err = ceph_pre_init_acls(dir, &mode, &acls);
	if (err < 0)
		goto out;

	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	req->r_parent = dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	req->r_args.mkdir.mode = cpu_to_le32(mode);
	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
	if (acls.pagelist) {
		req->r_pagelist = acls.pagelist;
		acls.pagelist = NULL;
	}
	err = ceph_mdsc_do_request(mdsc, dir, req);
	if (!err &&
	    !req->r_reply_info.head->is_target &&
	    !req->r_reply_info.head->is_dentry)
		err = ceph_handle_notrace_create(dir, dentry);
	ceph_mdsc_put_request(req);
out:
	if (!err)
		ceph_init_inode_acls(d_inode(dentry), &acls);
	else
		d_drop(dentry);
	ceph_release_acls_info(&acls);
	return err;
}

static int ceph_link(struct dentry *old_dentry, struct inode *dir,
		     struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int err;

	if (ceph_snap(dir) != CEPH_NOSNAP)
		return -EROFS;

	dout("link in dir %p old_dentry %p dentry %p\n", dir,
	     old_dentry, dentry);
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS);
	if (IS_ERR(req)) {
		d_drop(dentry);
		return PTR_ERR(req);
	}
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	req->r_old_dentry = dget(old_dentry);
	req->r_parent = dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
	/* release LINK_SHARED on source inode (mds will lock it) */
	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
	err = ceph_mdsc_do_request(mdsc, dir, req);
	if (err) {
		d_drop(dentry);
	} else if (!req->r_reply_info.head->is_dentry) {
		ihold(d_inode(old_dentry));
		d_instantiate(dentry, d_inode(old_dentry));
	}
	ceph_mdsc_put_request(req);
	return err;
}

/*
 * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps.  If it
 * looks like the link count will hit 0, drop any other caps (other
 * than PIN) we don't specifically want (due to the file still being
 * open).
 */
static int drop_caps_for_unlink(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;

	spin_lock(&ci->i_ceph_lock);
	if (inode->i_nlink == 1) {
		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
		ci->i_ceph_flags |= CEPH_I_NODELAY;
	}
	spin_unlock(&ci->i_ceph_lock);
	return drop;
}

/*
 * rmdir and unlink differ only by the metadata op code
 */
static int ceph_unlink(struct inode *dir, struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct inode *inode = d_inode(dentry);
	struct ceph_mds_request *req;
	int err = -EROFS;
	int op;

	if (ceph_snap(dir) == CEPH_SNAPDIR) {
		/* rmdir .snap/foo is RMSNAP */
		dout("rmsnap dir %p '%pd' dn %p\n", dir, dentry, dentry);
		op = CEPH_MDS_OP_RMSNAP;
	} else if (ceph_snap(dir) == CEPH_NOSNAP) {
		dout("unlink/rmdir dir %p dn %p inode %p\n",
		     dir, dentry, inode);
		op = d_is_dir(dentry) ?
			CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK;
	} else
		goto out;
	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	req->r_parent = dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
	req->r_inode_drop = drop_caps_for_unlink(inode);
	err = ceph_mdsc_do_request(mdsc, dir, req);
	if (!err && !req->r_reply_info.head->is_dentry)
		d_delete(dentry);
	ceph_mdsc_put_request(req);
out:
	return err;
}

static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
		       struct inode *new_dir, struct dentry *new_dentry,
		       unsigned int flags)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int op = CEPH_MDS_OP_RENAME;
	int err;

	if (flags)
		return -EINVAL;

	if (ceph_snap(old_dir) != ceph_snap(new_dir))
		return -EXDEV;
	if (ceph_snap(old_dir) != CEPH_NOSNAP) {
		if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR)
			op = CEPH_MDS_OP_RENAMESNAP;
		else
			return -EROFS;
	}
	dout("rename dir %p dentry %p to dir %p dentry %p\n",
	     old_dir, old_dentry, new_dir, new_dentry);
	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);
	ihold(old_dir);
	req->r_dentry = dget(new_dentry);
	req->r_num_caps = 2;
	req->r_old_dentry = dget(old_dentry);
	req->r_old_dentry_dir = old_dir;
	req->r_parent = new_dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
	req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
	/* release LINK_RDCACHE on source inode (mds will lock it) */
	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
	if (d_really_is_positive(new_dentry))
		req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry));
	err = ceph_mdsc_do_request(mdsc, old_dir, req);
	if (!err && !req->r_reply_info.head->is_dentry) {
		/*
		 * Normally d_move() is done by fill_trace (called by
		 * do_request, above).  If there is no trace, we need
		 * to do it here.
		 */
		d_move(old_dentry, new_dentry);
	}
	ceph_mdsc_put_request(req);
	return err;
}

/*
 * Ensure a dentry lease will no longer revalidate.
 */
void ceph_invalidate_dentry_lease(struct dentry *dentry)
{
	spin_lock(&dentry->d_lock);
	ceph_dentry(dentry)->time = jiffies;
	ceph_dentry(dentry)->lease_shared_gen = 0;
	spin_unlock(&dentry->d_lock);
}

/*
 * Check if the dentry lease is valid.  If not, delete the lease.  Try
 * to renew it if the lease is more than half up.
 */
static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
				 struct inode *dir)
{
	struct ceph_dentry_info *di;
	struct ceph_mds_session *s;
	int valid = 0;
	u32 gen;
	unsigned long ttl;
	struct ceph_mds_session *session = NULL;
	u32 seq = 0;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	if (di && di->lease_session) {
		s = di->lease_session;
		spin_lock(&s->s_gen_ttl_lock);
		gen = s->s_cap_gen;
		ttl = s->s_cap_ttl;
		spin_unlock(&s->s_gen_ttl_lock);

		if (di->lease_gen == gen &&
		    time_before(jiffies, di->time) &&
		    time_before(jiffies, ttl)) {
			valid = 1;
			if (di->lease_renew_after &&
			    time_after(jiffies, di->lease_renew_after)) {
				/*
				 * We should renew.  If we're in RCU walk mode
				 * though, we can't do that so just return
				 * -ECHILD.
				 */
				if (flags & LOOKUP_RCU) {
					valid = -ECHILD;
				} else {
					session = ceph_get_mds_session(s);
					seq = di->lease_seq;
					di->lease_renew_after = 0;
					di->lease_renew_from = jiffies;
				}
			}
		}
	}
	spin_unlock(&dentry->d_lock);

	if (session) {
		ceph_mdsc_lease_send_msg(session, dir, dentry,
					 CEPH_MDS_LEASE_RENEW, seq);
		ceph_put_mds_session(session);
	}
	dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid);
	return valid;
}

/*
 * Check if the directory-wide content lease/cap is valid.
 */
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	int valid = 0;

	spin_lock(&ci->i_ceph_lock);
	if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen)
		valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
	spin_unlock(&ci->i_ceph_lock);
	dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
	     dir, (unsigned)atomic_read(&ci->i_shared_gen),
	     dentry, (unsigned)di->lease_shared_gen, valid);
	return valid;
}
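
/*
 * Illustrative timeline (assuming lease_renew_after is set to roughly
 * the lease half-life when the lease is installed): for a 30 s dentry
 * lease issued at t=0, a revalidation at t=20 s still sees a valid
 * lease but is past lease_renew_after, so dentry_lease_is_valid()
 * sends CEPH_MDS_LEASE_RENEW to the issuing session (unless we are in
 * RCU walk mode, where it returns -ECHILD instead).
 */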

/*
 * Check if a cached dentry can be trusted.
 */
static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
{
	int valid = 0;
	struct dentry *parent;
	struct inode *dir;

	if (flags & LOOKUP_RCU) {
		parent = READ_ONCE(dentry->d_parent);
		dir = d_inode_rcu(parent);
		if (!dir)
			return -ECHILD;
	} else {
		parent = dget_parent(dentry);
		dir = d_inode(parent);
	}

	dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
	     dentry, d_inode(dentry), ceph_dentry(dentry)->offset);

	/* always trust cached snapped dentries, snapdir dentry */
	if (ceph_snap(dir) != CEPH_NOSNAP) {
		dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
		     dentry, d_inode(dentry));
		valid = 1;
	} else if (d_really_is_positive(dentry) &&
		   ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
		valid = 1;
	} else {
		valid = dentry_lease_is_valid(dentry, flags, dir);
		if (valid == -ECHILD)
			return valid;
		if (valid || dir_lease_is_valid(dir, dentry)) {
			if (d_really_is_positive(dentry))
				valid = ceph_is_any_caps(d_inode(dentry));
			else
				valid = 1;
		}
	}

	if (!valid) {
		struct ceph_mds_client *mdsc =
			ceph_sb_to_client(dir->i_sb)->mdsc;
		struct ceph_mds_request *req;
		int op, err;
		u32 mask;

		if (flags & LOOKUP_RCU)
			return -ECHILD;

		op = ceph_snap(dir) == CEPH_SNAPDIR ?
			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
		if (!IS_ERR(req)) {
			req->r_dentry = dget(dentry);
			req->r_num_caps = 2;
			req->r_parent = dir;

			mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
			if (ceph_security_xattr_wanted(dir))
				mask |= CEPH_CAP_XATTR_SHARED;
			req->r_args.getattr.mask = cpu_to_le32(mask);

			err = ceph_mdsc_do_request(mdsc, NULL, req);
			switch (err) {
			case 0:
				if (d_really_is_positive(dentry) &&
				    d_inode(dentry) == req->r_target_inode)
					valid = 1;
				break;
			case -ENOENT:
				if (d_really_is_negative(dentry))
					valid = 1;
				/* Fallthrough */
			default:
				break;
			}
			ceph_mdsc_put_request(req);
			dout("d_revalidate %p lookup result=%d\n",
			     dentry, err);
		}
	}

	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
	if (valid) {
		ceph_dentry_lru_touch(dentry);
	} else {
		ceph_dir_clear_complete(dir);
	}

	if (!(flags & LOOKUP_RCU))
		dput(parent);
	return valid;
}

/*
 * Release our ceph_dentry_info.
 */
static void ceph_d_release(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	dout("d_release %p\n", dentry);
	ceph_dentry_lru_del(dentry);

	spin_lock(&dentry->d_lock);
	dentry->d_fsdata = NULL;
	spin_unlock(&dentry->d_lock);

	if (di->lease_session)
		ceph_put_mds_session(di->lease_session);
	kmem_cache_free(ceph_dentry_cachep, di);
}

/*
 * When the VFS prunes a dentry from the cache, we need to clear the
 * complete flag on the parent directory.
 *
 * Called under dentry->d_lock.
 */
static void ceph_d_prune(struct dentry *dentry)
{
	struct ceph_inode_info *dir_ci;
	struct ceph_dentry_info *di;

	dout("ceph_d_prune %pd %p\n", dentry, dentry);

	/* do we have a valid parent? */
	if (IS_ROOT(dentry))
		return;

	/* we hold d_lock, so d_parent is stable */
	dir_ci = ceph_inode(d_inode(dentry->d_parent));
	if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
		return;

	/* whoever calls d_delete() should also disable dcache readdir */
	if (d_really_is_negative(dentry))
		return;

	/* d_fsdata does not get cleared until d_release */
	if (!d_unhashed(dentry)) {
		__ceph_dir_clear_complete(dir_ci);
		return;
	}

	/* Disable dcache readdir just in case someone called d_drop()
	 * or d_invalidate(), but the MDS didn't revoke CEPH_CAP_FILE_SHARED
	 * properly (dcache readdir is still enabled) */
	di = ceph_dentry(dentry);
	if (di->offset > 0 &&
	    di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
		__ceph_dir_clear_ordered(dir_ci);
}

/*
 * read() on a dir.  This weird interface hack only works if mounted
 * with '-o dirstat'.
 */
static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
			     loff_t *ppos)
{
	struct ceph_file_info *cf = file->private_data;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	int left;
	const int bufsize = 1024;

	if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT))
		return -EISDIR;

	if (!cf->dir_info) {
		cf->dir_info = kmalloc(bufsize, GFP_KERNEL);
		if (!cf->dir_info)
			return -ENOMEM;
		cf->dir_info_len =
			snprintf(cf->dir_info, bufsize,
				 "entries:   %20lld\n"
				 " files:    %20lld\n"
				 " subdirs:  %20lld\n"
				 "rentries:  %20lld\n"
				 " rfiles:   %20lld\n"
				 " rsubdirs: %20lld\n"
				 "rbytes:    %20lld\n"
				 "rctime:    %10ld.%09ld\n",
				 ci->i_files + ci->i_subdirs,
				 ci->i_files,
				 ci->i_subdirs,
				 ci->i_rfiles + ci->i_rsubdirs,
				 ci->i_rfiles,
				 ci->i_rsubdirs,
				 ci->i_rbytes,
				 (long)ci->i_rctime.tv_sec,
				 (long)ci->i_rctime.tv_nsec);
	}

	if (*ppos >= cf->dir_info_len)
		return 0;
	size = min_t(unsigned, size, cf->dir_info_len-*ppos);
	left = copy_to_user(buf, cf->dir_info + *ppos, size);
	if (left == size)
		return -EFAULT;
	*ppos += (size - left);
	return size - left;
}
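
/*
 * Example use of the dirstat hack above (illustrative; the numbers
 * are made up):
 *
 *	$ mount -t ceph mon:/ /mnt/ceph -o dirstat
 *	$ cat /mnt/ceph/some_dir
 *	entries:                     42
 *	 files:                      40
 *	 subdirs:                     2
 *	...
 */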

/*
 * We maintain a private dentry LRU.
 *
 * FIXME: this needs to be changed to a per-mds lru to be useful.
 */
void ceph_dentry_lru_add(struct dentry *dn)
{
	struct ceph_dentry_info *di = ceph_dentry(dn);
	struct ceph_mds_client *mdsc;

	dout("dentry_lru_add %p %p '%pd'\n", di, dn, dn);
	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
	spin_lock(&mdsc->dentry_lru_lock);
	list_add_tail(&di->lru, &mdsc->dentry_lru);
	mdsc->num_dentry++;
	spin_unlock(&mdsc->dentry_lru_lock);
}

void ceph_dentry_lru_touch(struct dentry *dn)
{
	struct ceph_dentry_info *di = ceph_dentry(dn);
	struct ceph_mds_client *mdsc;

	dout("dentry_lru_touch %p %p '%pd' (offset %lld)\n", di, dn, dn,
	     di->offset);
	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
	spin_lock(&mdsc->dentry_lru_lock);
	list_move_tail(&di->lru, &mdsc->dentry_lru);
	spin_unlock(&mdsc->dentry_lru_lock);
}

void ceph_dentry_lru_del(struct dentry *dn)
{
	struct ceph_dentry_info *di = ceph_dentry(dn);
	struct ceph_mds_client *mdsc;

	dout("dentry_lru_del %p %p '%pd'\n", di, dn, dn);
	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
	spin_lock(&mdsc->dentry_lru_lock);
	list_del_init(&di->lru);
	mdsc->num_dentry--;
	spin_unlock(&mdsc->dentry_lru_lock);
}

/*
 * Return the name hash for a given dentry.  This is dependent on
 * the parent directory's hash function.
 */
unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
{
	struct ceph_inode_info *dci = ceph_inode(dir);

	switch (dci->i_dir_layout.dl_dir_hash) {
	case 0:	/* for backward compat */
	case CEPH_STR_HASH_LINUX:
		return dn->d_name.hash;

	default:
		return ceph_str_hash(dci->i_dir_layout.dl_dir_hash,
				     dn->d_name.name, dn->d_name.len);
	}
}

const struct file_operations ceph_dir_fops = {
	.read = ceph_read_dir,
	.iterate = ceph_readdir,
	.llseek = ceph_dir_llseek,
	.open = ceph_open,
	.release = ceph_release,
	.unlocked_ioctl = ceph_ioctl,
	.fsync = ceph_fsync,
};

const struct file_operations ceph_snapdir_fops = {
	.iterate = ceph_readdir,
	.llseek = ceph_dir_llseek,
	.open = ceph_open,
	.release = ceph_release,
};

const struct inode_operations ceph_dir_iops = {
	.lookup = ceph_lookup,
	.permission = ceph_permission,
	.getattr = ceph_getattr,
	.setattr = ceph_setattr,
	.listxattr = ceph_listxattr,
	.get_acl = ceph_get_acl,
	.set_acl = ceph_set_acl,
	.mknod = ceph_mknod,
	.symlink = ceph_symlink,
	.mkdir = ceph_mkdir,
	.link = ceph_link,
	.unlink = ceph_unlink,
	.rmdir = ceph_unlink,
	.rename = ceph_rename,
	.create = ceph_create,
	.atomic_open = ceph_atomic_open,
};

const struct inode_operations ceph_snapdir_iops = {
	.lookup = ceph_lookup,
	.permission = ceph_permission,
	.getattr = ceph_getattr,
	.mkdir = ceph_mkdir,
	.rmdir = ceph_unlink,
	.rename = ceph_rename,
};

const struct dentry_operations ceph_dentry_ops = {
	.d_revalidate = ceph_d_revalidate,
	.d_release = ceph_d_release,
	.d_prune = ceph_d_prune,
	.d_init = ceph_d_init,
};