1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/fs.h>
5 #include <linux/wait.h>
6 #include <linux/slab.h>
7 #include <linux/gfp.h>
8 #include <linux/sched.h>
9 #include <linux/debugfs.h>
10 #include <linux/seq_file.h>
11 #include <linux/ratelimit.h>
12 #include <linux/bits.h>
13 #include <linux/ktime.h>
14 #include <linux/bitmap.h>
15 #include <linux/mnt_idmapping.h>
16
17 #include "super.h"
18 #include "mds_client.h"
19 #include "crypto.h"
20
21 #include <linux/ceph/ceph_features.h>
22 #include <linux/ceph/messenger.h>
23 #include <linux/ceph/decode.h>
24 #include <linux/ceph/pagelist.h>
25 #include <linux/ceph/auth.h>
26 #include <linux/ceph/debugfs.h>
27 #include <trace/events/ceph.h>
28
29 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
30
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
55
/*
 * State carried while encoding a session's reconnect message
 * (presumably by the cap/realm reconnect code elsewhere in this file
 * -- the users are outside this chunk).
 */
struct ceph_reconnect_state {
	struct ceph_mds_session *session;	/* session being reconnected */
	int nr_caps, nr_realms;			/* entries encoded so far -- TODO confirm */
	struct ceph_pagelist *pagelist;		/* destination for the encoded payload */
	unsigned msg_version;			/* reconnect message encoding version */
	bool allow_multi;			/* presumably: payload may span multiple messages */
};
63
64 static void __wake_requests(struct ceph_mds_client *mdsc,
65 struct list_head *head);
66 static void ceph_cap_release_work(struct work_struct *work);
67 static void ceph_cap_reclaim_work(struct work_struct *work);
68
69 static const struct ceph_connection_operations mds_con_ops;
70
71
72 /*
73 * mds reply parsing
74 */
75
/*
 * Parse the encoded quota block (max_bytes/max_files) from an MDS
 * reply into @info.  On success *p is advanced past the whole
 * versioned struct (including fields from newer versions we don't
 * understand).  Returns 0 on success, -EIO on a short or malformed
 * buffer.
 */
static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	/* bound further decoding to this struct's payload */
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	/* skip any fields added by newer encoding versions */
	*p = end;
	return 0;
bad:
	return -EIO;
}
98
/*
 * parse individual inode info
 *
 * Decode one encoded inode record from an MDS reply into @info.
 *
 * @p:        in/out cursor into the reply buffer; advanced past the record
 * @end:      end of the decodable region
 * @info:     most parsed fields point directly into the message buffer
 *            (no copies); fscrypt_auth/fscrypt_file are kmalloc'd and
 *            are freed in destroy_reply_info()
 * @features: negotiated feature bits; (u64)-1 selects the new versioned
 *            (struct_v/struct_compat) encoding, anything else the
 *            legacy feature-bit-gated encoding
 *
 * Returns 0 on success, -EIO on malformed/short input, -ENOMEM on
 * allocation failure.
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		/* bound all further decoding to this record's payload */
		end = *p + struct_len;
	}

	/* fixed-size inode body plus its variable-length fragtree splits */
	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	/* symlink target (empty for non-symlinks) */
	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	/* raw xattr blob; decoded later by the xattr code */
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		if (struct_v >= 5) {
			u32 alen;

			ceph_decode_32_safe(p, end, alen, bad);

			/* skip over a key/value map we don't use */
			while (alen--) {
				u32 len;

				/* key */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
				/* value */
				ceph_decode_32_safe(p, end, len, bad);
				ceph_decode_skip_n(p, end, len, bad);
			}
		}

		/* fscrypt flag -- ignore */
		if (struct_v >= 6)
			ceph_decode_skip_8(p, end, bad);

		info->fscrypt_auth = NULL;
		info->fscrypt_auth_len = 0;
		info->fscrypt_file = NULL;
		info->fscrypt_file_len = 0;
		if (struct_v >= 7) {
			/* these two blobs are copied, not referenced, so
			 * they outlive the reply message */
			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
			if (info->fscrypt_auth_len) {
				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
							     GFP_KERNEL);
				if (!info->fscrypt_auth)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
						      info->fscrypt_auth_len, bad);
			}
			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
			if (info->fscrypt_file_len) {
				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
							     GFP_KERNEL);
				if (!info->fscrypt_file)
					return -ENOMEM;
				ceph_decode_copy_safe(p, end, info->fscrypt_file,
						      info->fscrypt_file_len, bad);
			}
		}
		/* skip fields added by versions newer than we understand */
		*p = end;
	} else {
		/* legacy (unversioned) struct */
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}
282
/*
 * Parse an encoded dirfrag record from an MDS reply.  On success
 * *dirfrag points into the message buffer and *p is advanced past the
 * record, including its trailing per-replica dist entries.  Returns 0
 * on success, -EIO on short/malformed input.
 */
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		/* bound further decoding to this record's payload */
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	/* the ndist u32 entries follow the fixed struct */
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;	/* skip fields from newer encoding versions */
	return 0;
bad:
	return -EIO;
}
312
/*
 * Parse an encoded dentry lease from an MDS reply.  *lease points into
 * the message buffer.  For the versioned encoding (features == (u64)-1)
 * with struct_v >= 2, an optional encrypted alternate name is returned
 * via *altname/*altname_len; in all other cases they are cleared.
 * Returns 0 on success, -EIO on short/malformed input.
 */
static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features, u32 *altname_len, u8 **altname)
{
	u8 struct_v;
	u32 struct_len;
	void *lend;

	if (features == (u64)-1) {
		u8 struct_compat;

		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);

		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;

		ceph_decode_32_safe(p, end, struct_len, bad);
	} else {
		/* legacy encoding: just the bare lease struct */
		struct_len = sizeof(**lease);
		*altname_len = 0;
		*altname = NULL;
	}

	lend = *p + struct_len;
	ceph_decode_need(p, end, struct_len, bad);
	*lease = *p;
	*p += sizeof(**lease);

	if (features == (u64)-1) {
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, *altname_len, bad);
			ceph_decode_need(p, end, *altname_len, bad);
			*altname = *p;
			*p += *altname_len;
		} else {
			*altname = NULL;
			*altname_len = 0;
		}
	}
	/* skip any fields added by newer encoding versions */
	*p = lend;
	return 0;
bad:
	return -EIO;
}
360
/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.  The trace section must be consumed exactly; any
 * leftover bytes are treated as corruption.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		/* parent directory inode */
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		/* dirfrag the dentry lives in */
		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		/* dentry name (points into the message buffer) */
		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		/* dentry lease and optional encrypted alternate name */
		err = parse_reply_info_lease(p, end, &info->dlease, features,
					     &info->altname_len, &info->altname);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}
407
/*
 * parse readdir results
 *
 * Decodes the containing dirfrag, the entry count, the frag flags
 * (end/complete/hash-order/offset-hash), then one (name, lease, inode)
 * record per entry into req->r_reply_info.dir_entries, which must have
 * been preallocated (bounds-checked against dir_buf_size here).
 * Encrypted names are decoded/decrypted in place via ceph_fname_to_usr().
 *
 * Returns 0 on success, -EIO on malformed input, or the error from
 * name decryption.
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_request *req,
				    u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = req->r_mdsc->fsc->client;
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	/* entry count (u32) followed by the flags word (u16) */
	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	/* the entry array must have been preallocated and large enough */
	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err_client(cl, "dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct inode *inode = d_inode(req->r_dentry);
		struct ceph_inode_info *ci = ceph_inode(inode);
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
		struct ceph_fname fname;
		u32 altname_len, _name_len;
		u8 *altname, *_name;

		/* dentry */
		ceph_decode_32_safe(p, end, _name_len, bad);
		ceph_decode_need(p, end, _name_len, bad);
		_name = *p;
		*p += _name_len;
		doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);

		/* hash the on-wire (possibly encrypted) name */
		if (info->hash_order)
			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
						      _name, _name_len);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features,
					     &altname_len, &altname);
		if (err)
			goto out_bad;

		/*
		 * Try to decrypt the dentry names and update them
		 * in the ceph_mds_reply_dir_entry struct.
		 */
		fname.dir = inode;
		fname.name = _name;
		fname.name_len = _name_len;
		fname.ctext = altname;
		fname.ctext_len = altname_len;
		/*
		 * The _name_len may be larger than altname_len, such as
		 * when the human readable name length is in range of
		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
		 * then the copy in ceph_fname_to_usr will corrupt the
		 * data if there is no encryption key.
		 *
		 * Just set the no_copy flag and then if there is no
		 * encryption key the oname.name will be assigned to
		 * _name always.
		 */
		fname.no_copy = true;
		if (altname_len == 0) {
			/*
			 * Set tname to _name, and this will be used
			 * to do the base64_decode in-place. It's
			 * safe because the decoded string should
			 * always be shorter, which is 3/4 of origin
			 * string.
			 */
			tname.name = _name;

			/*
			 * Set oname to _name too, and this will be
			 * used to do the decryption in-place.
			 */
			oname.name = _name;
			oname.len = _name_len;
		} else {
			/*
			 * This will do the decryption only in-place
			 * from altname cryptext directly.
			 */
			oname.name = altname;
			oname.len = altname_len;
		}
		rde->is_nokey = false;
		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
		if (err) {
			pr_err_client(cl, "unable to decode %.*s, got %d\n",
				      _name_len, _name, err);
			goto out_bad;
		}
		rde->name = oname.name;
		rde->name_len = oname.len;

		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "problem parsing dir contents %d\n", err);
	return err;
}
548
549 /*
550 * parse fcntl F_GETLK results
551 */
parse_reply_info_filelock(void ** p,void * end,struct ceph_mds_reply_info_parsed * info,u64 features)552 static int parse_reply_info_filelock(void **p, void *end,
553 struct ceph_mds_reply_info_parsed *info,
554 u64 features)
555 {
556 if (*p + sizeof(*info->filelock_reply) > end)
557 goto bad;
558
559 info->filelock_reply = *p;
560
561 /* Skip over any unrecognized fields */
562 *p = end;
563 return 0;
564 bad:
565 return -EIO;
566 }
567
568
569 #if BITS_PER_LONG == 64
570
571 #define DELEGATED_INO_AVAILABLE xa_mk_value(1)
572
/*
 * Record the inode numbers the MDS has delegated to this session.
 * The wire format is a count of (start, len) u64 ranges; every ino in
 * each range is inserted into s->s_delegated_inos marked available.
 * Ranges starting below CEPH_INO_SYSTEM_BASE are refused (with a
 * ratelimited warning); duplicates only warn.
 *
 * Returns 0 on success, -EIO on a short buffer, or an xa_insert()
 * error other than -EBUSY.
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	doutc(cl, "got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited_client(cl,
			"ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
					start, len);
			continue;
		}
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				doutc(cl, "added delegated inode 0x%llx\n", start - 1);
			} else if (err == -EBUSY) {
				pr_warn_client(cl,
				"MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}
613
/*
 * Claim one delegated inode number from the session, removing it from
 * the set.  Returns the ino, or 0 if none are available.
 */
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		/* only use the ino if our erase removed an available
		 * entry (xa_erase returns the previous value, or NULL
		 * if someone else already removed it) */
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}
626
/*
 * Return an unused delegated inode number to the session's set.
 * Returns 0 on success or an xa_insert() error (-EBUSY if the ino is
 * already present).
 */
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
632 #else /* BITS_PER_LONG == 64 */
633 /*
634 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
635 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
636 * and bottom words?
637 */
/*
 * 32-bit build: parse and discard the delegated inode ranges (see the
 * FIXME above -- xarray can't index by a 64-bit ino here).
 *
 * NOTE(review): 'sets * 2 * sizeof(__le64)' is a 32-bit multiply on
 * these arches and could wrap for an absurdly large (bogus) 'sets',
 * making the skip too short -- worth confirming against upstream.
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}
650
/* 32-bit build: inode delegation is not tracked; always report none */
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}
655
/* 32-bit build: nothing is ever delegated, so restoring is a no-op */
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
660 #endif /* BITS_PER_LONG == 64 */
661
/*
 * parse create results
 *
 * Three wire formats are handled:
 *  - sessions with CEPHFS_FEATURE_DELEG_INO: a versioned payload with
 *    the created ino followed by delegated inode ranges;
 *  - older REPLY_CREATE_INODE servers: just the bare ino;
 *  - pre-REPLY_CREATE_INODE servers: no payload at all.
 * An empty payload from a server that should have sent one is treated
 * as "no create ino" rather than an error.
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		/* this server should not have sent any payload */
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}
700
/*
 * Parse a GETVXATTR reply.  The xattr value must occupy exactly the
 * remaining payload; on success it is recorded (by reference) in
 * info->xattr_info and the non-negative value length is returned.
 * Returns -EIO on short or inconsistent input.
 */
static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
		info->xattr_info.xattr_value = *p;
		info->xattr_info.xattr_value_len = value_len;
		*p = end;
		return value_len;
	}
bad:
	return -EIO;
}
722
723 /*
724 * parse extra results
725 */
parse_reply_info_extra(void ** p,void * end,struct ceph_mds_request * req,u64 features,struct ceph_mds_session * s)726 static int parse_reply_info_extra(void **p, void *end,
727 struct ceph_mds_request *req,
728 u64 features, struct ceph_mds_session *s)
729 {
730 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
731 u32 op = le32_to_cpu(info->head->op);
732
733 if (op == CEPH_MDS_OP_GETFILELOCK)
734 return parse_reply_info_filelock(p, end, info, features);
735 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
736 return parse_reply_info_readdir(p, end, req, features);
737 else if (op == CEPH_MDS_OP_CREATE)
738 return parse_reply_info_create(p, end, info, features, s);
739 else if (op == CEPH_MDS_OP_GETVXATTR)
740 return parse_reply_info_getvxattr(p, end, info, features);
741 else
742 return -EIO;
743 }
744
/*
 * parse entire mds reply
 *
 * Split the message front into its three length-prefixed sections --
 * trace, op-specific extra, and snap blob -- and hand each to its
 * parser.  Parsed pointers reference the message buffer, so the
 * message must outlive req->r_reply_info.  Returns 0 or a negative
 * error (logged, with a message dump).
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_request *req, u64 features)
{
	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, req, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	/* the three sections must account for the whole message front */
	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err_client(cl, "mds parse_reply err %d\n", err);
	ceph_msg_dump(msg);
	return err;
}
796
destroy_reply_info(struct ceph_mds_reply_info_parsed * info)797 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
798 {
799 int i;
800
801 kfree(info->diri.fscrypt_auth);
802 kfree(info->diri.fscrypt_file);
803 kfree(info->targeti.fscrypt_auth);
804 kfree(info->targeti.fscrypt_file);
805 if (!info->dir_entries)
806 return;
807
808 for (i = 0; i < info->dir_nr; i++) {
809 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
810
811 kfree(rde->inode.fscrypt_auth);
812 kfree(rde->inode.fscrypt_file);
813 }
814 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
815 }
816
/*
 * In the async unlink case the kclient won't wait for the first reply
 * from the MDS; it just drops all the links, unhashes the dentry, and
 * succeeds immediately.
 *
 * For any new create/link/rename/etc. requests that reuse the same
 * file names, we must wait for the first reply of the inflight unlink
 * request, or the MDS may fail these following requests with -EEXIST
 * if the inflight async unlink request was delayed for some reason.
 *
 * The worst case is that a non-async openc request will successfully
 * open the file if the CDentry hasn't been unlinked yet, but later the
 * previously delayed async unlink request will remove the CDentry.
 * That means the just-created file may be deleted by accident.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories using the same file names.
 */
/*
 * Wait for any inflight async unlink of a same-named dentry under the
 * same parent as @dentry to complete.
 *
 * Scans fsc->async_unlink_conflict under RCU for a still-hashed entry
 * whose parent and name match; if found, pins it and sleeps (killably)
 * until its CEPH_DENTRY_ASYNC_UNLINK_BIT clears.
 *
 * Returns 0 when there is no conflict (or it already finished), or the
 * wait_on_bit() error (e.g. the task was killed while waiting).
 */
int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
	struct ceph_client *cl = fsc->client;
	struct dentry *pdentry = dentry->d_parent;
	struct dentry *udentry, *found = NULL;
	struct ceph_dentry_info *di;
	struct qstr dname;
	u32 hash = dentry->d_name.hash;
	int err;

	dname.name = dentry->d_name.name;
	dname.len = dentry->d_name.len;

	rcu_read_lock();
	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
				   hnode, hash) {
		udentry = di->dentry;

		spin_lock(&udentry->d_lock);
		/* cheap mismatch checks before the full name compare */
		if (udentry->d_name.hash != hash)
			goto next;
		if (unlikely(udentry->d_parent != pdentry))
			goto next;
		if (!hash_hashed(&di->hnode))
			goto next;

		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
			pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
				       dentry, dentry);

		if (!d_same_name(udentry, pdentry, &dname))
			goto next;

		/* pin the conflicting dentry before leaving RCU */
		found = dget_dlock(udentry);
		spin_unlock(&udentry->d_lock);
		break;
next:
		spin_unlock(&udentry->d_lock);
	}
	rcu_read_unlock();

	if (likely(!found))
		return 0;

	doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
	      found, found);

	/* di still refers to 'found' (we hold a ref); wait for its
	 * async unlink to complete */
	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
			  TASK_KILLABLE);
	dput(found);
	return err;
}
890
891
892 /*
893 * sessions
894 */
ceph_session_state_name(int s)895 const char *ceph_session_state_name(int s)
896 {
897 switch (s) {
898 case CEPH_MDS_SESSION_NEW: return "new";
899 case CEPH_MDS_SESSION_OPENING: return "opening";
900 case CEPH_MDS_SESSION_OPEN: return "open";
901 case CEPH_MDS_SESSION_HUNG: return "hung";
902 case CEPH_MDS_SESSION_CLOSING: return "closing";
903 case CEPH_MDS_SESSION_CLOSED: return "closed";
904 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
905 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
906 case CEPH_MDS_SESSION_REJECTED: return "rejected";
907 default: return "???";
908 }
909 }
910
ceph_get_mds_session(struct ceph_mds_session * s)911 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
912 {
913 if (refcount_inc_not_zero(&s->s_ref))
914 return s;
915 return NULL;
916 }
917
ceph_put_mds_session(struct ceph_mds_session * s)918 void ceph_put_mds_session(struct ceph_mds_session *s)
919 {
920 if (IS_ERR_OR_NULL(s))
921 return;
922
923 if (refcount_dec_and_test(&s->s_ref)) {
924 if (s->s_auth.authorizer)
925 ceph_auth_destroy_authorizer(s->s_auth.authorizer);
926 WARN_ON(mutex_is_locked(&s->s_mutex));
927 xa_destroy(&s->s_delegated_inos);
928 kfree(s);
929 }
930 }
931
932 /*
933 * called under mdsc->mutex
934 */
__ceph_lookup_mds_session(struct ceph_mds_client * mdsc,int mds)935 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
936 int mds)
937 {
938 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
939 return NULL;
940 return ceph_get_mds_session(mdsc->sessions[mds]);
941 }
942
__have_session(struct ceph_mds_client * mdsc,int mds)943 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
944 {
945 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
946 return false;
947 else
948 return true;
949 }
950
__verify_registered_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * s)951 static int __verify_registered_session(struct ceph_mds_client *mdsc,
952 struct ceph_mds_session *s)
953 {
954 if (s->s_mds >= mdsc->max_sessions ||
955 mdsc->sessions[s->s_mds] != s)
956 return -ENOENT;
957 return 0;
958 }
959
/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 *
 * Allocates the session, grows the sessions[] array if needed, wires
 * up the connection, and starts opening it.  Returns the new session
 * with a reference held for the caller (a second reference is owned by
 * sessions[]), or an ERR_PTR on failure.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *s;

	/* don't open new sessions once I/O has been fenced */
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return ERR_PTR(-EIO);

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc_obj(*s, GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	/* grow sessions[] to the next power of two that fits mds */
	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;
		size_t ptr_size = sizeof(struct ceph_mds_session *);

		doutc(cl, "realloc to %d\n", newmax);
		sa = kcalloc(newmax, ptr_size, GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * ptr_size);
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	doutc(cl, "mds%d\n", mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	/* s_cap_ttl = jiffies - 1: the cap TTL starts out already expired */
	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}
1034
/*
 * called under mdsc->mutex
 *
 * Remove @s from mdsc->sessions[], close its connection, and drop the
 * reference that the sessions[] slot held.
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);	/* drop the sessions[] reference */
	atomic_dec(&mdsc->num_sessions);
}
1048
1049 /*
1050 * drop session refs in request.
1051 *
1052 * should be last request ref, or hold mdsc->mutex
1053 */
put_request_session(struct ceph_mds_request * req)1054 static void put_request_session(struct ceph_mds_request *req)
1055 {
1056 if (req->r_session) {
1057 ceph_put_mds_session(req->r_session);
1058 req->r_session = NULL;
1059 }
1060 }
1061
/*
 * Invoke @cb on every registered session; when @check_state is set,
 * skip sessions that check_session_state() rejects.  mdsc->mutex is
 * dropped around each callback (so the session set may change while
 * iterating); each session is pinned with a reference across its
 * callback.
 */
void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		/* drop the mutex so the callback may sleep/take locks */
		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}
1088
/*
 * Final kref release for an MDS request: drop every reference and
 * pinned resource the request holds (messages, inodes and their
 * CAP_PIN refs, dentries, creds, idmap, pagelist, crypto buffers,
 * session), then free the request itself.
 */
void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		/* drop the CAP_PIN ref held on the inode, then the inode */
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	/* iput(NULL) is a no-op, so no NULL checks needed here */
	iput(req->r_target_inode);
	iput(req->r_new_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	/* a freed request must not still be linked on a wait list */
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}
1139
/* generate rb-tree insert/lookup/erase helpers for requests, keyed by r_tid */
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
1141
1142 /*
1143 * lookup session, bump ref if found.
1144 *
1145 * called under mdsc->mutex.
1146 */
1147 static struct ceph_mds_request *
1148 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
1149 {
1150 struct ceph_mds_request *req;
1151
1152 req = lookup_request(&mdsc->request_tree, tid);
1153 if (req)
1154 ceph_mdsc_get_request(req);
1155
1156 return req;
1157 }
1158
/*
 * Register an in-flight request: assign a tid, reserve caps, take a
 * request reference and insert it into the request tree.  If @dir is
 * non-NULL, also link the request onto the directory's unsafe-dirops
 * list and hold a reference on @dir.
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err_client(cl, "%p failed to reserve caps: %d\n",
				      req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	doutc(cl, "%p tid %lld\n", req, req->r_tid);
	/* ref for the request tree, dropped in __unregister_request() */
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_cred = get_current_cred();
	if (!req->r_mnt_idmap)
		req->r_mnt_idmap = &nop_mnt_idmap;

	/* SETFILELOCK requests are excluded from oldest_tid tracking */
	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		req->r_unsafe_dir = dir;
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}
1205
/*
 * Remove a request from the request tree, drop all of its unsafe-list
 * linkage and the r_unsafe_dir reference, advance mdsc->oldest_tid if
 * this was the oldest tracked request, signal r_safe_completion, and
 * drop the tree's reference on the request.
 *
 * Called under mdsc->mutex (pairs with __register_request).
 */
static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		/* advance oldest_tid to the next non-SETFILELOCK request */
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	/* wake anyone waiting for this request to become safe */
	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}
1253
1254 /*
1255 * Walk back up the dentry tree until we hit a dentry representing a
1256 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1257 * when calling this) to ensure that the objects won't disappear while we're
1258 * working with them. Once we hit a candidate dentry, we attempt to take a
1259 * reference to it, and return that as the result.
1260 */
get_nonsnap_parent(struct dentry * dentry)1261 static struct inode *get_nonsnap_parent(struct dentry *dentry)
1262 {
1263 struct inode *inode = NULL;
1264
1265 while (dentry && !IS_ROOT(dentry)) {
1266 inode = d_inode_rcu(dentry);
1267 if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1268 break;
1269 dentry = dentry->d_parent;
1270 }
1271 if (inode)
1272 inode = igrab(inode);
1273 return inode;
1274 }
1275
/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * If @random is non-NULL, *random is set to true when the result was
 * picked at random rather than derived from an inode's frags/caps.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
	struct ceph_client *cl = mdsc->fsc->client;

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/* pick the inode whose frag tree / caps will steer the choice */
	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			doutc(cl, "using snapdir's parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			doutc(cl, "using nonsnap parent %p %llx.%llx\n",
			      inode, ceph_vinop(inode));
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	if (!inode)
		goto random;

	doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
	      ceph_vinop(inode), (int)is_hash, hash, mode);
	ci = ceph_inode(inode);

	/* try to map the name hash to an mds via the dir's frag tree */
	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
				      inode, ceph_vinop(inode), frag.frag,
				      mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
				      inode, ceph_vinop(inode), frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	/* fall back to an mds that holds a cap on this inode */
	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
	      ceph_vinop(inode), mds,
	      cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	doutc(cl, "chose random mds%d\n", mds);
	return mds;
}
1438
1439
1440 /*
1441 * session messages
1442 */
ceph_create_session_msg(u32 op,u64 seq)1443 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1444 {
1445 struct ceph_msg *msg;
1446 struct ceph_mds_session_head *h;
1447
1448 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1449 false);
1450 if (!msg) {
1451 pr_err("ENOMEM creating session %s msg\n",
1452 ceph_session_op_name(op));
1453 return NULL;
1454 }
1455 h = msg->front.iov_base;
1456 h->op = cpu_to_le32(op);
1457 h->seq = cpu_to_le64(seq);
1458
1459 return msg;
1460 }
1461
/* bit numbers of the CephFS features this client supports */
static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
/*
 * Bytes needed for a bitmap that holds bits 0..feature_bits[c-1],
 * rounded up to whole 64-bit words (feature_bits[] is assumed sorted,
 * so the last entry is the highest bit).
 */
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
/*
 * Encode the client's supported-feature bitmap at *p as a u32 length
 * followed by the bitmap bytes, advancing *p past what was written.
 * Returns 0, or -ERANGE if the buffer ending at @end is too small.
 */
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);
		unsigned long bit;

		if (WARN_ON_ONCE(*p + 4 + size > end))
			return -ERANGE;

		ceph_encode_32(p, size);
		memset(*p, 0, size);
		/* set one bit per supported feature, indexed by bit number */
		for (i = 0; i < count; i++) {
			bit = feature_bits[i];
			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
		}
		*p += size;
	} else {
		/* no features: encode an empty (zero-length) bitmap */
		if (WARN_ON_ONCE(*p + 4 > end))
			return -ERANGE;

		ceph_encode_32(p, 0);
	}

	return 0;
}
1492
/* bit numbers of the metrics this client can report to the MDS */
static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
/* bitmap bytes for bits 0..metric_bits[cnt-1], rounded to 64-bit words */
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
encode_metric_spec(void ** p,void * end)1495 static int encode_metric_spec(void **p, void *end)
1496 {
1497 static const size_t count = ARRAY_SIZE(metric_bits);
1498
1499 /* header */
1500 if (WARN_ON_ONCE(*p + 2 > end))
1501 return -ERANGE;
1502
1503 ceph_encode_8(p, 1); /* version */
1504 ceph_encode_8(p, 1); /* compat */
1505
1506 if (count > 0) {
1507 size_t i;
1508 size_t size = METRIC_BYTES(count);
1509
1510 if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1511 return -ERANGE;
1512
1513 /* metric spec info length */
1514 ceph_encode_32(p, 4 + size);
1515
1516 /* metric spec */
1517 ceph_encode_32(p, size);
1518 memset(*p, 0, size);
1519 for (i = 0; i < count; i++)
1520 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1521 *p += size;
1522 } else {
1523 if (WARN_ON_ONCE(*p + 4 + 4 > end))
1524 return -ERANGE;
1525
1526 /* metric spec info length */
1527 ceph_encode_32(p, 4);
1528 /* metric spec */
1529 ceph_encode_32(p, 0);
1530 }
1531
1532 return 0;
1533 }
1534
/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 *
 * Wire layout: session_head, map<string,string> of client metadata,
 * supported-features bitmap, metric spec, then (v5..v7) flags, mds
 * auth caps and oldest_client_tid.  Returns the message or an ERR_PTR.
 */
static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	struct ceph_client *cl = mdsc->fsc->client;
	size_t size, count;
	void *p, *end;
	int ret;

	/* NULL-terminated table of key/value metadata strings */
	const char* metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		/* 8 = two u32 length prefixes (key and value) */
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* metric spec */
	size = 0;
	count = ARRAY_SIZE(metric_bits);
	if (count > 0)
		size = METRIC_BYTES(count);
	extra_bytes += 2 + 4 + 4 + size;

	/* flags, mds auth caps and oldest_client_tid */
	extra_bytes += 4 + 4 + 8;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err_client(cl, "ENOMEM creating session open msg\n");
		return ERR_PTR(-ENOMEM);
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v7
	 */
	msg->hdr.version = cpu_to_le16(7);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	ret = encode_supported_features(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_supported_features failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	ret = encode_metric_spec(&p, end);
	if (ret) {
		pr_err_client(cl, "encode_metric_spec failed!\n");
		ceph_msg_put(msg);
		return ERR_PTR(ret);
	}

	/* version == 5, flags */
	ceph_encode_32(&p, 0);

	/* version == 6, mds auth caps */
	ceph_encode_32(&p, 0);

	/* version == 7, oldest_client_tid */
	ceph_encode_64(&p, mdsc->oldest_tid);

	/* trim front to what was actually written */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
1657
/*
 * send session open request.
 *
 * Moves the session to OPENING state and sends the full (metadata-
 * carrying) open message.  Returns 0, or a negative error if I/O is
 * fenced or the message could not be built.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	/* don't open new sessions once client I/O has been fenced */
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
		return -EIO;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
	      ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
				      session->s_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}
1688
1689 /*
1690 * open sessions for any export targets for the given mds
1691 *
1692 * called under mdsc->mutex
1693 */
1694 static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client * mdsc,int target)1695 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1696 {
1697 struct ceph_mds_session *session;
1698 int ret;
1699
1700 session = __ceph_lookup_mds_session(mdsc, target);
1701 if (!session) {
1702 session = register_session(mdsc, target);
1703 if (IS_ERR(session))
1704 return session;
1705 }
1706 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1707 session->s_state == CEPH_MDS_SESSION_CLOSING) {
1708 ret = __open_session(mdsc, session);
1709 if (ret)
1710 return ERR_PTR(ret);
1711 }
1712
1713 return session;
1714 }
1715
1716 struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client * mdsc,int target)1717 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1718 {
1719 struct ceph_mds_session *session;
1720 struct ceph_client *cl = mdsc->fsc->client;
1721
1722 doutc(cl, "to mds%d\n", target);
1723
1724 mutex_lock(&mdsc->mutex);
1725 session = __open_export_target_session(mdsc, target);
1726 mutex_unlock(&mdsc->mutex);
1727
1728 return session;
1729 }
1730
/*
 * Open sessions to every export target the mdsmap lists for this
 * session's mds rank.
 *
 * called under mdsc->mutex
 */
static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;
	struct ceph_client *cl = mdsc->fsc->client;

	/* rank not present in the current map: nothing to open */
	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
	      mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		/*
		 * NOTE(review): ts may be an ERR_PTR on failure;
		 * presumably ceph_put_mds_session() tolerates that —
		 * confirm against its definition.
		 */
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		ceph_put_mds_session(ts);
	}
}
1751
1752 /*
1753 * session caps
1754 */
1755
detach_cap_releases(struct ceph_mds_session * session,struct list_head * target)1756 static void detach_cap_releases(struct ceph_mds_session *session,
1757 struct list_head *target)
1758 {
1759 struct ceph_client *cl = session->s_mdsc->fsc->client;
1760
1761 lockdep_assert_held(&session->s_cap_lock);
1762
1763 list_splice_init(&session->s_cap_releases, target);
1764 session->s_num_cap_releases = 0;
1765 doutc(cl, "mds%d\n", session->s_mds);
1766 }
1767
dispose_cap_releases(struct ceph_mds_client * mdsc,struct list_head * dispose)1768 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1769 struct list_head *dispose)
1770 {
1771 while (!list_empty(dispose)) {
1772 struct ceph_cap *cap;
1773 /* zero out the in-progress message */
1774 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1775 list_del(&cap->session_caps);
1776 ceph_put_cap(mdsc, cap);
1777 }
1778 }
1779
/*
 * Drop all unsafe (not-yet-committed) requests queued on @session,
 * marking the affected inode mappings with -EIO, then zero r_attempts
 * on every remaining request bound to this mds so kick_requests() will
 * re-send them.
 */
static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p;

	doutc(cl, "mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
					   req->r_tid);
		/* the request's effects may never be committed: surface EIO */
		if (req->r_target_inode)
			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
		if (req->r_unsafe_dir)
			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}
1811
/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * The callback runs with no locks held (an inode reference keeps the
 * inode alive); the previous iteration's iput()/ceph_put_cap() are
 * deferred until s_cap_lock has been dropped again.  A negative return
 * from @cb aborts the walk and is returned to the caller.
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, int mds, void *),
			      void *arg)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	doutc(cl, "%p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		int mds;

		cap = list_entry(p, struct ceph_cap, session_caps);
		/* skip inodes that are already being torn down */
		inode = igrab(&cap->ci->netfs.inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		/* mark the cap we're visiting so a racing removal defers
		 * its cleanup to us (see the !cap->ci handling below) */
		session->s_cap_iterator = cap;
		mds = cap->mds;
		spin_unlock(&session->s_cap_lock);

		/* release the previous iteration's deferred refs */
		if (last_inode) {
			iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, mds, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			/* the cap was removed while the callback ran */
			doutc(cl, "finishing cap %p removal\n", cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			atomic64_dec(&session->s_mdsc->metric.total_caps);
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap; /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}
1885
/*
 * ceph_iterate_session_caps() callback for remove_session_caps():
 * purge this mds's cap from the inode, wake cap waiters, queue any
 * needed pagecache invalidation, and drop however many inode
 * references ceph_purge_inode_cap() reported.
 */
static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	bool invalidate = false;
	struct ceph_cap *cap;
	int iputs = 0;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (cap) {
		doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
		      cap, ci, &ci->netfs.inode);

		/* returns the number of inode refs we must drop below */
		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
	}
	spin_unlock(&ci->i_ceph_lock);

	if (cap)
		wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	while (iputs--)
		iput(inode);
	return 0;
}
1912
/*
 * Remove every cap held by this session, waiting out any concurrent
 * inode deletions so the session cap list is truly empty, then dispose
 * of the queued cap releases.
 *
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	doutc(fsc->client, "on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job,
		 * but it is not exported, so use lookup inode function
		 * to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			/* same cap at the head as last pass: no progress */
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	/* move queued cap releases off the session; disposed of below */
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}
1963
/* events passed to wake_up_session_cb() via wake_up_session_caps() */
enum {
	RECONNECT,	/* session reconnected: reset max_size negotiation */
	RENEWCAPS,	/* caps renewed: downgrade caps the mds didn't re-issue */
	FORCE_RO,	/* no per-cap work; just wake i_cap_wq waiters */
};
1969
/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;	/* RECONNECT/RENEWCAPS/FORCE_RO */

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		/* forget pre-reconnect max_size negotiation state */
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		struct ceph_cap *cap;

		spin_lock(&ci->i_ceph_lock);
		cap = __get_cap_for_mds(ci, mds);
		/* mds did not re-issue stale cap */
		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
			cap->issued = cap->implemented = CEPH_CAP_PIN;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == FORCE_RO) {
		/* nothing per-cap to do; just wake the waiters below */
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}
2000
wake_up_session_caps(struct ceph_mds_session * session,int ev)2001 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
2002 {
2003 struct ceph_client *cl = session->s_mdsc->fsc->client;
2004
2005 doutc(cl, "session %p mds%d\n", session, session->s_mds);
2006 ceph_iterate_session_caps(session, wake_up_session_cb,
2007 (void *)(unsigned long)ev);
2008 }
2009
/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;
	int state;

	/* log when caps have gone stale since our last renew request */
	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info_client(cl, "mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
		      ceph_mds_state_name(state));
		return 0;
	}

	doutc(cl, "to mds%d (%s)\n", session->s_mds,
	      ceph_mds_state_name(state));
	msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS,
				      ++session->s_renew_seq);
	if (IS_ERR(msg))
		return PTR_ERR(msg);
	ceph_con_send(&session->s_con, msg);
	return 0;
}
2046
/*
 * Acknowledge a CEPH_SESSION_FLUSHMSG from the MDS by echoing back the
 * sequence number.  Returns 0 on success or -ENOMEM if the ack message
 * cannot be allocated.
 */
static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg;

	/* (fixed a stray 's' after the state name in this debug output) */
	doutc(cl, "to mds%d (%s) seq %lld\n", session->s_mds,
	      ceph_session_state_name(session->s_state), seq);
	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}
2061
2062
2063 /*
2064 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
2065 *
2066 * Called under session->s_mutex
2067 */
renewed_caps(struct ceph_mds_client * mdsc,struct ceph_mds_session * session,int is_renew)2068 static void renewed_caps(struct ceph_mds_client *mdsc,
2069 struct ceph_mds_session *session, int is_renew)
2070 {
2071 struct ceph_client *cl = mdsc->fsc->client;
2072 int was_stale;
2073 int wake = 0;
2074
2075 spin_lock(&session->s_cap_lock);
2076 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
2077
2078 session->s_cap_ttl = session->s_renew_requested +
2079 mdsc->mdsmap->m_session_timeout*HZ;
2080
2081 if (was_stale) {
2082 if (time_before(jiffies, session->s_cap_ttl)) {
2083 pr_info_client(cl, "mds%d caps renewed\n",
2084 session->s_mds);
2085 wake = 1;
2086 } else {
2087 pr_info_client(cl, "mds%d caps still stale\n",
2088 session->s_mds);
2089 }
2090 }
2091 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds,
2092 session->s_cap_ttl, was_stale ? "stale" : "fresh",
2093 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
2094 spin_unlock(&session->s_cap_lock);
2095
2096 if (wake)
2097 wake_up_session_caps(session, RENEWCAPS);
2098 }
2099
2100 /*
2101 * send a session close request
2102 */
request_close_session(struct ceph_mds_session * session)2103 static int request_close_session(struct ceph_mds_session *session)
2104 {
2105 struct ceph_client *cl = session->s_mdsc->fsc->client;
2106 struct ceph_msg *msg;
2107
2108 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds,
2109 ceph_session_state_name(session->s_state), session->s_seq);
2110 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
2111 session->s_seq);
2112 if (!msg)
2113 return -ENOMEM;
2114 ceph_con_send(&session->s_con, msg);
2115 return 1;
2116 }
2117
2118 /*
2119 * Called with s_mutex held.
2120 */
__close_session(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)2121 static int __close_session(struct ceph_mds_client *mdsc,
2122 struct ceph_mds_session *session)
2123 {
2124 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
2125 return 0;
2126 session->s_state = CEPH_MDS_SESSION_CLOSING;
2127 return request_close_session(session);
2128 }
2129
drop_negative_children(struct dentry * dentry)2130 static bool drop_negative_children(struct dentry *dentry)
2131 {
2132 struct dentry *child;
2133 bool all_negative = true;
2134
2135 if (!d_is_dir(dentry))
2136 goto out;
2137
2138 spin_lock(&dentry->d_lock);
2139 hlist_for_each_entry(child, &dentry->d_children, d_sib) {
2140 if (d_really_is_positive(child)) {
2141 all_negative = false;
2142 break;
2143 }
2144 }
2145 spin_unlock(&dentry->d_lock);
2146
2147 if (all_negative)
2148 shrink_dcache_parent(dentry);
2149 out:
2150 return all_negative;
2151 }
2152
/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped to.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 *
 * Callback for ceph_iterate_session_caps(); @arg points at the number
 * of caps still to trim.  Returning -1 stops the iteration.
 */
static int trim_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = mdsc->fsc->client;
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;
	struct ceph_cap *cap;

	/* trimmed enough already? tell the iterator to stop */
	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		return 0;
	}
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
	      inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
	      ceph_cap_string(oissued), ceph_cap_string(used),
	      ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		/* never drop the auth cap while dirty/flushing state remains */
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out; /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(mdsc, cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			/* count == 1 means only our igrab-like ref remains */
			count = icount_read(inode);
			if (count == 1)
				(*remaining)--;
			doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
			      inode, ceph_vinop(inode), cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}
2241
2242 /*
2243 * Trim session cap count down to some max number.
2244 */
ceph_trim_caps(struct ceph_mds_client * mdsc,struct ceph_mds_session * session,int max_caps)2245 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2246 struct ceph_mds_session *session,
2247 int max_caps)
2248 {
2249 struct ceph_client *cl = mdsc->fsc->client;
2250 int trim_caps = session->s_nr_caps - max_caps;
2251
2252 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
2253 session->s_nr_caps, max_caps, trim_caps);
2254 if (trim_caps > 0) {
2255 int remaining = trim_caps;
2256
2257 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2258 doutc(cl, "mds%d done: %d / %d, trimmed %d\n",
2259 session->s_mds, session->s_nr_caps, max_caps,
2260 trim_caps - remaining);
2261 }
2262
2263 ceph_flush_session_cap_releases(mdsc, session);
2264 return 0;
2265 }
2266
/*
 * Return nonzero once no cap flush with tid <= @want_flush_tid is
 * still pending on mdsc->cap_flush_list (used as a wait_event condition).
 */
static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_cap_flush *cf = NULL;
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list))
		cf = list_first_entry(&mdsc->cap_flush_list,
				      struct ceph_cap_flush, g_list);
	if (cf && cf->tid <= want_flush_tid) {
		doutc(cl, "still flushing tid %llu <= %llu\n",
		      cf->tid, want_flush_tid);
		ret = 0;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	return ret;
}
2287
/*
 * flush all dirty inode data to disk.
 *
 * returns true if we've flushed through want_flush_tid
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "want %llu\n", want_flush_tid);

	/* woken via mdsc->cap_flushing_wq as cap flushes complete */
	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	doutc(cl, "ok, flushed thru %llu\n", want_flush_tid);
}
2305
/*
 * called under s_mutex
 *
 * Drain session->s_cap_releases into one or more CAPRELEASE messages
 * (up to CEPH_CAPS_PER_RELEASE cap items each) and send them to the MDS.
 * Every message is terminated with the current OSD epoch barrier.
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	/* snapshot the epoch barrier once, before we start building */
	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	/* move pending releases to a private list so we can work unlocked */
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		/* append one release item and bump the count in the header */
		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->issue_seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		/* message full: terminate with the barrier and send it */
		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			doutc(cl, "mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	/* more releases may have been queued while we were unlocked */
	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	/* send any final, partially filled message */
	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		doutc(cl, "mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err_client(cl, "mds%d, failed to allocate message\n",
		      session->s_mds);
	/* put the unsent caps back on the session so they are not lost */
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}
2403
ceph_cap_release_work(struct work_struct * work)2404 static void ceph_cap_release_work(struct work_struct *work)
2405 {
2406 struct ceph_mds_session *session =
2407 container_of(work, struct ceph_mds_session, s_cap_release_work);
2408
2409 mutex_lock(&session->s_mutex);
2410 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2411 session->s_state == CEPH_MDS_SESSION_HUNG)
2412 ceph_send_cap_releases(session->s_mdsc, session);
2413 mutex_unlock(&session->s_mutex);
2414 ceph_put_mds_session(session);
2415 }
2416
ceph_flush_session_cap_releases(struct ceph_mds_client * mdsc,struct ceph_mds_session * session)2417 void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc,
2418 struct ceph_mds_session *session)
2419 {
2420 struct ceph_client *cl = mdsc->fsc->client;
2421 if (mdsc->stopping)
2422 return;
2423
2424 ceph_get_mds_session(session);
2425 if (queue_work(mdsc->fsc->cap_wq,
2426 &session->s_cap_release_work)) {
2427 doutc(cl, "cap release work queued\n");
2428 } else {
2429 ceph_put_mds_session(session);
2430 doutc(cl, "failed to queue cap release work\n");
2431 }
2432 }
2433
/*
 * caller holds session->s_cap_lock
 *
 * Park @cap on the session's release list; once a full message worth
 * (CEPH_CAPS_PER_RELEASE) has accumulated, kick the release worker.
 */
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_session_cap_releases(session->s_mdsc, session);
}
2446
ceph_cap_reclaim_work(struct work_struct * work)2447 static void ceph_cap_reclaim_work(struct work_struct *work)
2448 {
2449 struct ceph_mds_client *mdsc =
2450 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2451 int ret = ceph_trim_dentries(mdsc);
2452 if (ret == -EAGAIN)
2453 ceph_queue_cap_reclaim_work(mdsc);
2454 }
2455
ceph_queue_cap_reclaim_work(struct ceph_mds_client * mdsc)2456 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2457 {
2458 struct ceph_client *cl = mdsc->fsc->client;
2459 if (mdsc->stopping)
2460 return;
2461
2462 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2463 doutc(cl, "caps reclaim work queued\n");
2464 } else {
2465 doutc(cl, "failed to queue caps release work\n");
2466 }
2467 }
2468
/*
 * Account @nr newly reclaimable caps; once roughly
 * CEPH_CAPS_PER_RELEASE of them have accumulated, reset the counter
 * and kick the reclaim worker.
 */
void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;

	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	/*
	 * True exactly when adding @nr crossed a multiple of
	 * CEPH_CAPS_PER_RELEASE (the remainder wrapped below nr).
	 */
	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}
2480
ceph_queue_cap_unlink_work(struct ceph_mds_client * mdsc)2481 void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
2482 {
2483 struct ceph_client *cl = mdsc->fsc->client;
2484 if (mdsc->stopping)
2485 return;
2486
2487 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) {
2488 doutc(cl, "caps unlink work queued\n");
2489 } else {
2490 doutc(cl, "failed to queue caps unlink work\n");
2491 }
2492 }
2493
/*
 * Workqueue callback: for each inode queued on cap_unlink_delay_list,
 * run ceph_check_caps(CHECK_CAPS_FLUSH) to flush its caps.
 */
static void ceph_cap_unlink_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_unlink_work);
	struct ceph_client *cl = mdsc->fsc->client;

	doutc(cl, "begin\n");
	spin_lock(&mdsc->cap_delay_lock);
	while (!list_empty(&mdsc->cap_unlink_delay_list)) {
		struct ceph_inode_info *ci;
		struct inode *inode;

		ci = list_first_entry(&mdsc->cap_unlink_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		list_del_init(&ci->i_cap_delay_list);

		/* skip inodes that are being evicted (igrab() fails) */
		inode = igrab(&ci->netfs.inode);
		if (inode) {
			/* drop the lock across ceph_check_caps()/iput() */
			spin_unlock(&mdsc->cap_delay_lock);
			doutc(cl, "on %p %llx.%llx\n", inode,
			      ceph_vinop(inode));
			ceph_check_caps(ci, CHECK_CAPS_FLUSH);
			iput(inode);
			spin_lock(&mdsc->cap_delay_lock);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);
	doutc(cl, "done\n");
}
2524
2525 /*
2526 * requests
2527 */
2528
/*
 * Size and allocate the buffer that will hold readdir reply entries
 * for @req, based on the directory's current entry count (i_files +
 * i_subdirs) capped by the max_readdir mount option.  Falls back to
 * smaller allocation orders under memory pressure, then re-caps the
 * requested entry count to what actually fits in the buffer.
 *
 * Returns 0 on success, -ENOMEM if no buffer could be allocated.
 */
int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
				    struct inode *dir)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
	unsigned int num_entries;
	u64 bytes_count;
	int order;

	spin_lock(&ci->i_ceph_lock);
	num_entries = ci->i_files + ci->i_subdirs;
	spin_unlock(&ci->i_ceph_lock);
	num_entries = max(num_entries, 1U);
	num_entries = min(num_entries, opt->max_readdir);

	/* clamp so the cast to unsigned long below cannot truncate */
	bytes_count = (u64)size * num_entries;
	if (unlikely(bytes_count > ULONG_MAX))
		bytes_count = ULONG_MAX;

	/* retry with progressively smaller orders if allocation fails */
	order = get_order((unsigned long)bytes_count);
	while (order >= 0) {
		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
							     __GFP_NOWARN |
							     __GFP_ZERO,
							     order);
		if (rinfo->dir_entries)
			break;
		order--;
	}
	if (!rinfo->dir_entries || unlikely(order < 0))
		return -ENOMEM;

	num_entries = (PAGE_SIZE << order) / size;
	num_entries = min(num_entries, opt->max_readdir);

	rinfo->dir_buf_size = PAGE_SIZE << order;
	/* one cap per entry plus one extra (presumably the dir itself — see callers) */
	req->r_num_caps = num_entries + 1;
	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
	return 0;
}
2572
/*
 * Create an mds request.
 *
 * Allocates a zeroed request, initializes its lists/completions, and
 * records the op and direct mode.  Returns the request (caller holds
 * the initial kref) or ERR_PTR(-ENOMEM).
 */
struct ceph_mds_request *
ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
{
	struct ceph_mds_request *req;

	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
	if (!req)
		return ERR_PTR(-ENOMEM);

	/* all fields not set below start out zeroed by kmem_cache_zalloc() */
	mutex_init(&req->r_fill_mutex);
	req->r_mdsc = mdsc;
	req->r_started = jiffies;
	req->r_start_latency = ktime_get();
	req->r_resend_mds = -1;		/* -1: no forced resend target */
	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
	INIT_LIST_HEAD(&req->r_unsafe_target_item);
	req->r_fmode = -1;
	req->r_feature_needed = -1;
	kref_init(&req->r_kref);	/* initial reference owned by caller */
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_wait);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	ktime_get_coarse_real_ts64(&req->r_stamp);

	req->r_op = op;
	req->r_direct_mode = mode;
	return req;
}
2607
2608 /*
2609 * return oldest (lowest) request, tid in request tree, 0 if none.
2610 *
2611 * called under mdsc->mutex.
2612 */
__get_oldest_req(struct ceph_mds_client * mdsc)2613 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2614 {
2615 if (RB_EMPTY_ROOT(&mdsc->request_tree))
2616 return NULL;
2617 return rb_entry(rb_first(&mdsc->request_tree),
2618 struct ceph_mds_request, r_node);
2619 }
2620
/* Return the cached tid of the oldest outstanding request (see __get_oldest_req). */
static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
{
	return mdsc->oldest_tid;
}
2625
2626 #if IS_ENABLED(CONFIG_FS_ENCRYPTION)
/*
 * Build the encrypted "altname" for the request's dentry, used when the
 * full ciphertext name is too long (> CEPH_NOHASH_NAME_MAX) to travel
 * inline in the path.
 *
 * Returns a kmalloc'd ciphertext buffer (length in *plen) owned by the
 * caller, NULL with *plen == 0 when no altname is needed, or an
 * ERR_PTR() on failure.
 */
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	struct inode *dir = req->r_parent;
	struct dentry *dentry = req->r_dentry;
	const struct qstr *name = req->r_dname;
	u8 *cryptbuf = NULL;
	u32 len = 0;
	int ret = 0;

	/* only encode if we have parent and dentry */
	if (!dir || !dentry)
		goto success;

	/* No-op unless this is encrypted */
	if (!IS_ENCRYPTED(dir))
		goto success;

	ret = ceph_fscrypt_prepare_readdir(dir);
	if (ret < 0)
		return ERR_PTR(ret);

	/* No key? Just ignore it. */
	if (!fscrypt_has_encryption_key(dir))
		goto success;

	/* r_dname overrides the dentry name when set */
	if (!name)
		name = &dentry->d_name;

	if (!fscrypt_fname_encrypted_size(dir, name->len, NAME_MAX, &len)) {
		WARN_ON_ONCE(1);
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* No need to append altname if name is short enough */
	if (len <= CEPH_NOHASH_NAME_MAX) {
		len = 0;
		goto success;
	}

	cryptbuf = kmalloc(len, GFP_KERNEL);
	if (!cryptbuf)
		return ERR_PTR(-ENOMEM);

	ret = fscrypt_fname_encrypt(dir, name, cryptbuf, len);
	if (ret) {
		kfree(cryptbuf);
		return ERR_PTR(ret);
	}
success:
	*plen = len;
	return cryptbuf;
}
2679 #else
/* !CONFIG_FS_ENCRYPTION stub: no altname is ever generated. */
static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
{
	*plen = 0;
	return NULL;
}
2685 #endif
2686
/**
 * ceph_mdsc_build_path - build a path string to a given dentry
 * @mdsc: mds client
 * @dentry: dentry to which path should be built
 * @path_info: output path, length, base ino+snap, and freepath ownership flag
 * @for_wire: is this path going to be sent to the MDS?
 *
 * Build a string that represents the path to the dentry. This is mostly called
 * for two different purposes:
 *
 * 1) we need to build a path string to send to the MDS (for_wire == true)
 * 2) we need a path string for local presentation (e.g. debugfs)
 *    (for_wire == false)
 *
 * The path is built in reverse, starting with the dentry. Walk back up toward
 * the root, building the path until the first non-snapped inode is reached
 * (for_wire) or the root inode is reached (!for_wire).
 *
 * Encode hidden .snap dirs as a double /, i.e.
 *   foo/.snap/bar -> foo//bar
 *
 * Returns the path (pointing into a __getname() buffer that the caller
 * owns, per path_info->freepath) or an ERR_PTR().
 */
char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			   struct ceph_path_info *path_info, int for_wire)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct dentry *cur;
	struct inode *inode;
	char *path;
	int pos;
	unsigned seq;
	u64 base;

	if (!dentry)
		return ERR_PTR(-EINVAL);

	path = __getname();
	if (!path)
		return ERR_PTR(-ENOMEM);
retry:
	/* fill the buffer from the end toward the front */
	pos = PATH_MAX - 1;
	path[pos] = '\0';

	/* the walk is validated against concurrent renames via rename_lock */
	seq = read_seqbegin(&rename_lock);
	cur = dget(dentry);
	for (;;) {
		struct dentry *parent;

		spin_lock(&cur->d_lock);
		inode = d_inode(cur);
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			/* hidden .snap dir contributes no name -> double '/' */
			doutc(cl, "path+%d: %p SNAPDIR\n", pos, cur);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else if (for_wire && inode && dentry != cur &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			/* first non-snapped ancestor: wire paths stop here */
			spin_unlock(&cur->d_lock);
			pos++; /* get rid of any prepended '/' */
			break;
		} else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
			pos -= cur->d_name.len;
			if (pos < 0) {
				spin_unlock(&cur->d_lock);
				break;
			}
			memcpy(path + pos, cur->d_name.name, cur->d_name.len);
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);
		} else {
			int len, ret;
			char buf[NAME_MAX];

			/*
			 * Proactively copy name into buf, in case we need to
			 * present it as-is.
			 */
			memcpy(buf, cur->d_name.name, cur->d_name.len);
			len = cur->d_name.len;
			spin_unlock(&cur->d_lock);
			parent = dget_parent(cur);

			ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
			if (ret < 0) {
				dput(parent);
				dput(cur);
				__putname(path);
				return ERR_PTR(ret);
			}

			if (fscrypt_has_encryption_key(d_inode(parent))) {
				len = ceph_encode_encrypted_dname(d_inode(parent),
								  buf, len);
				if (len < 0) {
					dput(parent);
					dput(cur);
					__putname(path);
					return ERR_PTR(len);
				}
			}
			pos -= len;
			if (pos < 0) {
				dput(parent);
				break;
			}
			memcpy(path + pos, buf, len);
		}
		dput(cur);
		cur = parent;

		/* Are we at the root? */
		if (IS_ROOT(cur))
			break;

		/* Are we out of buffer? */
		if (--pos < 0)
			break;

		path[pos] = '/';
	}
	inode = d_inode(cur);
	base = inode ? ceph_ino(inode) : 0;
	dput(cur);

	/* a concurrent rename invalidated our walk; start over */
	if (read_seqretry(&rename_lock, seq))
		goto retry;

	if (pos < 0) {
		/*
		 * The path is longer than PATH_MAX and this function
		 * cannot ever succeed. Creating paths that long is
		 * possible with Ceph, but Linux cannot use them.
		 */
		__putname(path);
		return ERR_PTR(-ENAMETOOLONG);
	}

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	path_info->vino.ino = base;
	path_info->pathlen = PATH_MAX - 1 - pos;
	path_info->path = path + pos;
	path_info->freepath = true;	/* caller owns the __getname() buffer */

	/* Set snap from dentry if available */
	if (d_inode(dentry))
		path_info->vino.snap = ceph_snap(d_inode(dentry));
	else
		path_info->vino.snap = CEPH_NOSNAP;

	doutc(cl, "on %p %d built %llx '%.*s'\n", dentry, d_count(dentry),
	      base, PATH_MAX - 1 - pos, path + pos);
	return path + pos;
}
2840
/*
 * Fill @path_info for a dentry-based request.  Fast path: when the
 * parent is pinned by the caller (parent_locked), not snapped, and not
 * encrypted, just borrow the dentry name; otherwise build a full path
 * string via ceph_mdsc_build_path().
 */
static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
			     struct inode *dir, struct ceph_path_info *path_info,
			     bool parent_locked)
{
	char *path;

	rcu_read_lock();
	if (!dir)
		dir = d_inode_rcu(dentry->d_parent);
	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
	    !IS_ENCRYPTED(dir)) {
		path_info->vino.ino = ceph_ino(dir);
		path_info->vino.snap = ceph_snap(dir);
		rcu_read_unlock();
		/* name borrowed from the dentry; nothing to free */
		path_info->path = dentry->d_name.name;
		path_info->pathlen = dentry->d_name.len;
		path_info->freepath = false;
		return 0;
	}
	rcu_read_unlock();
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap handling.
	 */
	return 0;
}
2869
/*
 * Fill @path_info for an inode-based request.  Non-snapped inodes are
 * addressed by ino alone (empty path); snapped inodes need a path
 * built from one of their dentry aliases.
 */
static int build_inode_path(struct inode *inode, struct ceph_path_info *path_info)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		path_info->vino.ino = ceph_ino(inode);
		path_info->vino.snap = ceph_snap(inode);
		path_info->pathlen = 0;
		path_info->freepath = false;
		return 0;
	}
	/* NULL alias is handled by ceph_mdsc_build_path (returns -EINVAL) */
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(mdsc, dentry, path_info, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	/*
	 * ceph_mdsc_build_path already fills path_info, including snap from dentry.
	 * Override with inode's snap since that's what this function is for.
	 */
	path_info->vino.snap = ceph_snap(inode);
	return 0;
}
2895
/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 *
 * Fills @path_info from whichever of @rinode / @rdentry / @rpath+@rino
 * is provided (in that priority order).  Returns 0 or a negative errno.
 */
static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
				 struct dentry *rdentry, struct inode *rdiri,
				 const char *rpath, u64 rino,
				 struct ceph_path_info *path_info,
				 bool parent_locked)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int r = 0;

	/* Initialize the output structure */
	memset(path_info, 0, sizeof(*path_info));

	if (rinode) {
		r = build_inode_path(rinode, path_info);
		doutc(cl, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		      ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(mdsc, rdentry, rdiri, path_info, parent_locked);
		doutc(cl, " dentry %p %llx/%.*s\n", rdentry, path_info->vino.ino,
		      path_info->pathlen, path_info->path);
	} else if (rpath || rino) {
		/* caller-supplied ino and/or path string, used verbatim */
		path_info->vino.ino = rino;
		path_info->vino.snap = CEPH_NOSNAP;
		path_info->path = rpath;
		path_info->pathlen = rpath ? strlen(rpath) : 0;
		path_info->freepath = false;

		doutc(cl, " path %.*s\n", path_info->pathlen, rpath);
	}

	return r;
}
2932
/*
 * Encode the variable-length tail of an MClientRequest: timestamp,
 * gid list (v4), alternate name (v5) and fscrypt auth/file (v6).
 * @p is advanced past everything written; the caller must have sized
 * the buffer for these fields.
 */
static void encode_mclientrequest_tail(void **p,
				       const struct ceph_mds_request *req)
{
	struct ceph_timespec ts;
	int i;

	ceph_encode_timespec64(&ts, &req->r_stamp);
	ceph_encode_copy(p, &ts, sizeof(ts));

	/* v4: gid_list */
	ceph_encode_32(p, req->r_cred->group_info->ngroups);
	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
		ceph_encode_64(p, from_kgid(&init_user_ns,
					    req->r_cred->group_info->gid[i]));

	/* v5: altname */
	ceph_encode_32(p, req->r_altname_len);
	ceph_encode_copy(p, req->r_altname, req->r_altname_len);

	/* v6: fscrypt_auth and fscrypt_file */
	if (req->r_fscrypt_auth) {
		u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);

		ceph_encode_32(p, authlen);
		ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
	} else {
		ceph_encode_32(p, 0);	/* no fscrypt_auth */
	}
	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
		ceph_encode_32(p, sizeof(__le64));
		ceph_encode_64(p, req->r_fscrypt_file);
	} else {
		ceph_encode_32(p, 0);	/* no fscrypt_file */
	}
}
2968
mds_supported_head_version(struct ceph_mds_session * session)2969 static inline u16 mds_supported_head_version(struct ceph_mds_session *session)
2970 {
2971 if (!test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD, &session->s_features))
2972 return 1;
2973
2974 if (!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features))
2975 return 2;
2976
2977 return CEPH_MDS_REQUEST_HEAD_VERSION;
2978 }
2979
2980 static struct ceph_mds_request_head_legacy *
find_legacy_request_head(void * p,u64 features)2981 find_legacy_request_head(void *p, u64 features)
2982 {
2983 bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2984 struct ceph_mds_request_head *head;
2985
2986 if (legacy)
2987 return (struct ceph_mds_request_head_legacy *)p;
2988 head = (struct ceph_mds_request_head *)p;
2989 return (struct ceph_mds_request_head_legacy *)&head->oldest_client_tid;
2990 }
2991
2992 /*
2993 * called under mdsc->mutex
2994 */
create_request_message(struct ceph_mds_session * session,struct ceph_mds_request * req,bool drop_cap_releases)2995 static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2996 struct ceph_mds_request *req,
2997 bool drop_cap_releases)
2998 {
2999 int mds = session->s_mds;
3000 struct ceph_mds_client *mdsc = session->s_mdsc;
3001 struct ceph_client *cl = mdsc->fsc->client;
3002 struct ceph_msg *msg;
3003 struct ceph_mds_request_head_legacy *lhead;
3004 struct ceph_path_info path_info1 = {0};
3005 struct ceph_path_info path_info2 = {0};
3006 struct dentry *old_dentry = NULL;
3007 int len;
3008 u16 releases;
3009 void *p, *end;
3010 int ret;
3011 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
3012 u16 request_head_version = mds_supported_head_version(session);
3013 kuid_t caller_fsuid = req->r_cred->fsuid;
3014 kgid_t caller_fsgid = req->r_cred->fsgid;
3015 bool parent_locked = test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
3016
3017 ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
3018 req->r_parent, req->r_path1, req->r_ino1.ino,
3019 &path_info1, parent_locked);
3020 if (ret < 0) {
3021 msg = ERR_PTR(ret);
3022 goto out;
3023 }
3024
3025 /*
3026 * When the parent directory's i_rwsem is *not* locked, req->r_parent may
3027 * have become stale (e.g. after a concurrent rename) between the time the
3028 * dentry was looked up and now. If we detect that the stored r_parent
3029 * does not match the inode number we just encoded for the request, switch
3030 * to the correct inode so that the MDS receives a valid parent reference.
3031 */
3032 if (!parent_locked && req->r_parent && path_info1.vino.ino &&
3033 ceph_ino(req->r_parent) != path_info1.vino.ino) {
3034 struct inode *old_parent = req->r_parent;
3035 struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL);
3036 if (!IS_ERR(correct_dir)) {
3037 WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
3038 ceph_ino(old_parent), path_info1.vino.ino);
3039 /*
3040 * Transfer CEPH_CAP_PIN from the old parent to the new one.
3041 * The pin was taken earlier in ceph_mdsc_submit_request().
3042 */
3043 ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
3044 iput(old_parent);
3045 req->r_parent = correct_dir;
3046 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
3047 }
3048 }
3049
3050 /* If r_old_dentry is set, then assume that its parent is locked */
3051 if (req->r_old_dentry &&
3052 !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
3053 old_dentry = req->r_old_dentry;
3054 ret = set_request_path_attr(mdsc, NULL, old_dentry,
3055 req->r_old_dentry_dir,
3056 req->r_path2, req->r_ino2.ino,
3057 &path_info2, true);
3058 if (ret < 0) {
3059 msg = ERR_PTR(ret);
3060 goto out_free1;
3061 }
3062
3063 req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
3064 if (IS_ERR(req->r_altname)) {
3065 msg = ERR_CAST(req->r_altname);
3066 req->r_altname = NULL;
3067 goto out_free2;
3068 }
3069
3070 /*
3071 * For old cephs without supporting the 32bit retry/fwd feature
3072 * it will copy the raw memories directly when decoding the
3073 * requests. While new cephs will decode the head depending the
3074 * version member, so we need to make sure it will be compatible
3075 * with them both.
3076 */
3077 if (legacy)
3078 len = sizeof(struct ceph_mds_request_head_legacy);
3079 else if (request_head_version == 1)
3080 len = offsetofend(struct ceph_mds_request_head, args);
3081 else if (request_head_version == 2)
3082 len = offsetofend(struct ceph_mds_request_head, ext_num_fwd);
3083 else
3084 len = sizeof(struct ceph_mds_request_head);
3085
3086 /* filepaths */
3087 len += 2 * (1 + sizeof(u32) + sizeof(u64));
3088 len += path_info1.pathlen + path_info2.pathlen;
3089
3090 /* cap releases */
3091 len += sizeof(struct ceph_mds_request_release) *
3092 (!!req->r_inode_drop + !!req->r_dentry_drop +
3093 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
3094
3095 if (req->r_dentry_drop)
3096 len += path_info1.pathlen;
3097 if (req->r_old_dentry_drop)
3098 len += path_info2.pathlen;
3099
3100 /* MClientRequest tail */
3101
3102 /* req->r_stamp */
3103 len += sizeof(struct ceph_timespec);
3104
3105 /* gid list */
3106 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
3107
3108 /* alternate name */
3109 len += sizeof(u32) + req->r_altname_len;
3110
3111 /* fscrypt_auth */
3112 len += sizeof(u32); // fscrypt_auth
3113 if (req->r_fscrypt_auth)
3114 len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
3115
3116 /* fscrypt_file */
3117 len += sizeof(u32);
3118 if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
3119 len += sizeof(__le64);
3120
3121 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
3122 if (!msg) {
3123 msg = ERR_PTR(-ENOMEM);
3124 goto out_free2;
3125 }
3126
3127 msg->hdr.tid = cpu_to_le64(req->r_tid);
3128
3129 lhead = find_legacy_request_head(msg->front.iov_base,
3130 session->s_con.peer_features);
3131
3132 if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
3133 !test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
3134 WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));
3135
3136 if (enable_unsafe_idmap) {
3137 pr_warn_once_client(cl,
3138 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
3139 " is not supported by MDS. UID/GID-based restrictions may"
3140 " not work properly.\n");
3141
3142 caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
3143 VFSUIDT_INIT(req->r_cred->fsuid));
3144 caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
3145 VFSGIDT_INIT(req->r_cred->fsgid));
3146 } else {
3147 pr_err_ratelimited_client(cl,
3148 "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID"
3149 " is not supported by MDS. Fail request with -EIO.\n");
3150
3151 ret = -EIO;
3152 goto out_err;
3153 }
3154 }
3155
3156 /*
3157 * The ceph_mds_request_head_legacy didn't contain a version field, and
3158 * one was added when we moved the message version from 3->4.
3159 */
3160 if (legacy) {
3161 msg->hdr.version = cpu_to_le16(3);
3162 p = msg->front.iov_base + sizeof(*lhead);
3163 } else if (request_head_version == 1) {
3164 struct ceph_mds_request_head *nhead = msg->front.iov_base;
3165
3166 msg->hdr.version = cpu_to_le16(4);
3167 nhead->version = cpu_to_le16(1);
3168 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, args);
3169 } else if (request_head_version == 2) {
3170 struct ceph_mds_request_head *nhead = msg->front.iov_base;
3171
3172 msg->hdr.version = cpu_to_le16(6);
3173 nhead->version = cpu_to_le16(2);
3174
3175 p = msg->front.iov_base + offsetofend(struct ceph_mds_request_head, ext_num_fwd);
3176 } else {
3177 struct ceph_mds_request_head *nhead = msg->front.iov_base;
3178 kuid_t owner_fsuid;
3179 kgid_t owner_fsgid;
3180
3181 msg->hdr.version = cpu_to_le16(6);
3182 nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
3183 nhead->struct_len = cpu_to_le32(sizeof(struct ceph_mds_request_head));
3184
3185 if (IS_CEPH_MDS_OP_NEWINODE(req->r_op)) {
3186 owner_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
3187 VFSUIDT_INIT(req->r_cred->fsuid));
3188 owner_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
3189 VFSGIDT_INIT(req->r_cred->fsgid));
3190 nhead->owner_uid = cpu_to_le32(from_kuid(&init_user_ns, owner_fsuid));
3191 nhead->owner_gid = cpu_to_le32(from_kgid(&init_user_ns, owner_fsgid));
3192 } else {
3193 nhead->owner_uid = cpu_to_le32(-1);
3194 nhead->owner_gid = cpu_to_le32(-1);
3195 }
3196
3197 p = msg->front.iov_base + sizeof(*nhead);
3198 }
3199
3200 end = msg->front.iov_base + msg->front.iov_len;
3201
3202 lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
3203 lhead->op = cpu_to_le32(req->r_op);
3204 lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
3205 caller_fsuid));
3206 lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
3207 caller_fsgid));
3208 lhead->ino = cpu_to_le64(req->r_deleg_ino);
3209 lhead->args = req->r_args;
3210
3211 ceph_encode_filepath(&p, end, path_info1.vino.ino, path_info1.path);
3212 ceph_encode_filepath(&p, end, path_info2.vino.ino, path_info2.path);
3213
3214 /* make note of release offset, in case we need to replay */
3215 req->r_request_release_offset = p - msg->front.iov_base;
3216
3217 /* cap releases */
3218 releases = 0;
3219 if (req->r_inode_drop)
3220 releases += ceph_encode_inode_release(&p,
3221 req->r_inode ? req->r_inode : d_inode(req->r_dentry),
3222 mds, req->r_inode_drop, req->r_inode_unless,
3223 req->r_op == CEPH_MDS_OP_READDIR);
3224 if (req->r_dentry_drop) {
3225 ret = ceph_encode_dentry_release(&p, req->r_dentry,
3226 req->r_parent, mds, req->r_dentry_drop,
3227 req->r_dentry_unless);
3228 if (ret < 0)
3229 goto out_err;
3230 releases += ret;
3231 }
3232 if (req->r_old_dentry_drop) {
3233 ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
3234 req->r_old_dentry_dir, mds,
3235 req->r_old_dentry_drop,
3236 req->r_old_dentry_unless);
3237 if (ret < 0)
3238 goto out_err;
3239 releases += ret;
3240 }
3241 if (req->r_old_inode_drop)
3242 releases += ceph_encode_inode_release(&p,
3243 d_inode(req->r_old_dentry),
3244 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
3245
3246 if (drop_cap_releases) {
3247 releases = 0;
3248 p = msg->front.iov_base + req->r_request_release_offset;
3249 }
3250
3251 lhead->num_releases = cpu_to_le16(releases);
3252
3253 encode_mclientrequest_tail(&p, req);
3254
3255 if (WARN_ON_ONCE(p > end)) {
3256 ceph_msg_put(msg);
3257 msg = ERR_PTR(-ERANGE);
3258 goto out_free2;
3259 }
3260
3261 msg->front.iov_len = p - msg->front.iov_base;
3262 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3263
3264 if (req->r_pagelist) {
3265 struct ceph_pagelist *pagelist = req->r_pagelist;
3266 ceph_msg_data_add_pagelist(msg, pagelist);
3267 msg->hdr.data_len = cpu_to_le32(pagelist->length);
3268 } else {
3269 msg->hdr.data_len = 0;
3270 }
3271
3272 msg->hdr.data_off = cpu_to_le16(0);
3273
3274 out_free2:
3275 ceph_mdsc_free_path_info(&path_info2);
3276 out_free1:
3277 ceph_mdsc_free_path_info(&path_info1);
3278 out:
3279 return msg;
3280 out_err:
3281 ceph_msg_put(msg);
3282 msg = ERR_PTR(ret);
3283 goto out_free2;
3284 }
3285
3286 /*
3287 * called under mdsc->mutex if error, under no mutex if
3288 * success.
3289 */
complete_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req)3290 static void complete_request(struct ceph_mds_client *mdsc,
3291 struct ceph_mds_request *req)
3292 {
 	/* stamp end time for request latency metrics */
3293 req->r_end_latency = ktime_get();
3294 
3295 trace_ceph_mdsc_complete_request(mdsc, req);
3296 
 	/* run the submitter's callback (if any) before waking waiters */
3297 if (req->r_callback)
3298 req->r_callback(mdsc, req);
3299 complete_all(&req->r_completion);
3300 }
3301
3302 /*
3303 * called under mdsc->mutex
3304 */
__prepare_send_request(struct ceph_mds_session * session,struct ceph_mds_request * req,bool drop_cap_releases)3305 static int __prepare_send_request(struct ceph_mds_session *session,
3306 struct ceph_mds_request *req,
3307 bool drop_cap_releases)
3308 {
3309 int mds = session->s_mds;
3310 struct ceph_mds_client *mdsc = session->s_mdsc;
3311 struct ceph_client *cl = mdsc->fsc->client;
3312 struct ceph_mds_request_head_legacy *lhead;
3313 struct ceph_mds_request_head *nhead;
3314 struct ceph_msg *msg;
3315 int flags = 0, old_max_retry;
3316 bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
3317 &session->s_features);
3318 
3319 /*
3320 * Avoid infinite retrying after the retry counter overflows.
3321 * Old MDSes carry the retry count in an 8-bit head field, so
3322 * limit retries to at most 256 in that case.
3323 */
3324 if (req->r_attempts) {
3325 old_max_retry = sizeof_field(struct ceph_mds_request_head,
3326 num_retry);
3327 old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
 		/* second clause only fires once r_attempts hits U32_MAX */
3328 if ((old_version && req->r_attempts >= old_max_retry) ||
3329 ((uint32_t)req->r_attempts >= U32_MAX)) {
3330 pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n",
3331 req->r_tid);
3332 return -EMULTIHOP;
3333 }
3334 }
3335 
3336 req->r_attempts++;
 	/* remember the cap migration seq this attempt was sent under */
3337 if (req->r_inode) {
3338 struct ceph_cap *cap =
3339 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
3340 
3341 if (cap)
3342 req->r_sent_on_mseq = cap->mseq;
3343 else
3344 req->r_sent_on_mseq = -1;
3345 }
3346 doutc(cl, "%p tid %lld %s (attempt %d)\n", req, req->r_tid,
3347 ceph_mds_op_name(req->r_op), req->r_attempts);
3348 
3349 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3350 void *p;
3351 
3352 /*
3353 * Replay. Do not regenerate message (and rebuild
3354 * paths, etc.); just use the original message.
3355 * Rebuilding paths will break for renames because
3356 * d_move mangles the src name.
3357 */
3358 msg = req->r_request;
3359 lhead = find_legacy_request_head(msg->front.iov_base,
3360 session->s_con.peer_features);
3361 
3362 flags = le32_to_cpu(lhead->flags);
3363 flags |= CEPH_MDS_FLAG_REPLAY;
3364 lhead->flags = cpu_to_le32(flags);
3365 
3366 if (req->r_target_inode)
3367 lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
3368 
 		/* keep the 8-bit and 32-bit retry counters in sync */
3369 lhead->num_retry = req->r_attempts - 1;
3370 if (!old_version) {
3371 nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3372 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3373 }
3374 
3375 /* remove cap/dentry releases from message */
3376 lhead->num_releases = 0;
3377 
 		/* re-encode the tail in place over the old releases */
3378 p = msg->front.iov_base + req->r_request_release_offset;
3379 encode_mclientrequest_tail(&p, req);
3380 
3381 msg->front.iov_len = p - msg->front.iov_base;
3382 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3383 return 0;
3384 }
3385 
 	/* not a replay: drop any previous message and build a fresh one */
3386 if (req->r_request) {
3387 ceph_msg_put(req->r_request);
3388 req->r_request = NULL;
3389 }
3390 msg = create_request_message(session, req, drop_cap_releases);
3391 if (IS_ERR(msg)) {
3392 req->r_err = PTR_ERR(msg);
3393 return PTR_ERR(msg);
3394 }
3395 req->r_request = msg;
3396 
3397 lhead = find_legacy_request_head(msg->front.iov_base,
3398 session->s_con.peer_features);
3399 lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
3400 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3401 flags |= CEPH_MDS_FLAG_REPLAY;
3402 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
3403 flags |= CEPH_MDS_FLAG_ASYNC;
3404 if (req->r_parent)
3405 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
3406 lhead->flags = cpu_to_le32(flags);
3407 lhead->num_fwd = req->r_num_fwd;
3408 lhead->num_retry = req->r_attempts - 1;
 	/* newer MDSes also read the 32-bit extended counters */
3409 if (!old_version) {
3410 nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3411 nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
3412 nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3413 }
3414 
3415 doutc(cl, " r_parent = %p\n", req->r_parent);
3416 return 0;
3417 }
3418
3419 /*
3420 * called under mdsc->mutex
3421 */
static int __send_request(struct ceph_mds_session *session,
			  struct ceph_mds_request *req,
			  bool drop_cap_releases)
{
	int ret;

	trace_ceph_mdsc_send_request(session, req);

	ret = __prepare_send_request(session, req, drop_cap_releases);
	if (ret)
		return ret;

	/* the connection layer consumes a reference on the message */
	ceph_msg_get(req->r_request);
	ceph_con_send(&session->s_con, req->r_request);
	return 0;
}
3438
3439 /*
3440 * send request, or put it on the appropriate wait list.
3441 */
__do_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req)3442 static void __do_request(struct ceph_mds_client *mdsc,
3443 struct ceph_mds_request *req)
3444 {
3445 struct ceph_client *cl = mdsc->fsc->client;
3446 struct ceph_mds_session *session = NULL;
3447 int mds = -1;
3448 int err = 0;
3449 bool random;
3450 
 	/* already finished or aborted?  nothing to send */
3451 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3452 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
3453 __unregister_request(mdsc, req);
3454 return;
3455 }
3456 
3457 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
3458 doutc(cl, "metadata corrupted\n");
3459 err = -EIO;
3460 goto finish;
3461 }
3462 if (req->r_timeout &&
3463 time_after_eq(jiffies, req->r_started + req->r_timeout)) {
3464 doutc(cl, "timed out\n");
3465 err = -ETIMEDOUT;
3466 goto finish;
3467 }
3468 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
3469 doutc(cl, "forced umount\n");
3470 err = -EIO;
3471 goto finish;
3472 }
3473 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
3474 if (mdsc->mdsmap_err) {
3475 err = mdsc->mdsmap_err;
3476 doutc(cl, "mdsmap err %d\n", err);
3477 goto finish;
3478 }
3479 if (mdsc->mdsmap->m_epoch == 0) {
3480 doutc(cl, "no mdsmap, waiting for map\n");
3481 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3482 ceph_mdsc_suspend_reason_no_mdsmap);
3483 list_add(&req->r_wait, &mdsc->waiting_for_map);
3484 return;
3485 }
3486 if (!(mdsc->fsc->mount_options->flags &
3487 CEPH_MOUNT_OPT_MOUNTWAIT) &&
3488 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
3489 err = -EHOSTUNREACH;
3490 goto finish;
3491 }
3492 }
3493 
 	/* drop the session ref from any previous attempt before re-choosing */
3494 put_request_session(req);
3495 
3496 mds = __choose_mds(mdsc, req, &random);
3497 if (mds < 0 ||
3498 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
 		/* async requests cannot be parked; tell caller to retry sync */
3499 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3500 err = -EJUKEBOX;
3501 goto finish;
3502 }
3503 doutc(cl, "no mds or not active, waiting for map\n");
3504 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3505 ceph_mdsc_suspend_reason_no_active_mds);
3506 list_add(&req->r_wait, &mdsc->waiting_for_map);
3507 return;
3508 }
3509 
3510 /* get, open session */
3511 session = __ceph_lookup_mds_session(mdsc, mds);
3512 if (!session) {
3513 session = register_session(mdsc, mds);
3514 if (IS_ERR(session)) {
3515 err = PTR_ERR(session);
3516 goto finish;
3517 }
3518 }
3519 req->r_session = ceph_get_mds_session(session);
3520 
3521 doutc(cl, "mds%d session %p state %s\n", mds, session,
3522 ceph_session_state_name(session->s_state));
3523 
3524 /*
3525 * The old ceph will crash the MDSs when see unknown OPs
3526 */
3527 if (req->r_feature_needed > 0 &&
3528 !test_bit(req->r_feature_needed, &session->s_features)) {
3529 err = -EOPNOTSUPP;
3530 goto out_session;
3531 }
3532 
3533 if (session->s_state != CEPH_MDS_SESSION_OPEN &&
3534 session->s_state != CEPH_MDS_SESSION_HUNG) {
3535 /*
3536 * We cannot queue async requests since the caps and delegated
3537 * inodes are bound to the session. Just return -EJUKEBOX and
3538 * let the caller retry a sync request in that case.
3539 */
3540 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3541 err = -EJUKEBOX;
3542 goto out_session;
3543 }
3544 
3545 /*
3546 * If the session has been REJECTED, then return a hard error,
3547 * unless it's a CLEANRECOVER mount, in which case we'll queue
3548 * it to the mdsc queue.
3549 */
3550 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
3551 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) {
3552 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3553 ceph_mdsc_suspend_reason_rejected);
3554 list_add(&req->r_wait, &mdsc->waiting_for_map);
3555 } else
3556 err = -EACCES;
3557 goto out_session;
3558 }
3559 
3560 if (session->s_state == CEPH_MDS_SESSION_NEW ||
3561 session->s_state == CEPH_MDS_SESSION_CLOSING) {
3562 err = __open_session(mdsc, session);
3563 if (err)
3564 goto out_session;
3565 /* retry the same mds later */
3566 if (random)
3567 req->r_resend_mds = mds;
3568 }
 		/* park the request until the session becomes usable */
3569 trace_ceph_mdsc_suspend_request(mdsc, session, req,
3570 ceph_mdsc_suspend_reason_session);
3571 list_add(&req->r_wait, &session->s_waiting);
3572 goto out_session;
3573 }
3574 
3575 /* send request */
3576 req->r_resend_mds = -1; /* forget any previous mds hint */
3577 
3578 if (req->r_request_started == 0) /* note request start time */
3579 req->r_request_started = jiffies;
3580 
3581 /*
3582 * For async create we will choose the auth MDS of frag in parent
3583 * directory to send the request and usually this works fine, but
3584 * if the directory is migrated to another MDS before it could handle
3585 * it, the request will be forwarded.
3586 *
3587 * And then the auth cap will be changed.
3588 */
3589 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3590 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3591 struct ceph_inode_info *ci;
3592 struct ceph_cap *cap;
3593 
3594 /*
3595 * The request maybe handled very fast and the new inode
3596 * hasn't been linked to the dentry yet. We need to wait
3597 * for the ceph_finish_async_create(), which shouldn't be
3598 * stuck too long or fail in theory, to finish when forwarding
3599 * the request.
3600 */
3601 if (!d_inode(req->r_dentry)) {
3602 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3603 TASK_KILLABLE);
3604 if (err) {
3605 mutex_lock(&req->r_fill_mutex);
3606 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3607 mutex_unlock(&req->r_fill_mutex);
3608 goto out_session;
3609 }
3610 }
3611 
3612 ci = ceph_inode(d_inode(req->r_dentry));
3613 
3614 spin_lock(&ci->i_ceph_lock);
3615 cap = ci->i_auth_cap;
3616 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3617 doutc(cl, "session changed for auth cap %d -> %d\n",
3618 cap->session->s_mds, session->s_mds);
3619 
3620 /* Remove the auth cap from old session */
3621 spin_lock(&cap->session->s_cap_lock);
3622 cap->session->s_nr_caps--;
3623 list_del_init(&cap->session_caps);
3624 spin_unlock(&cap->session->s_cap_lock);
3625 
3626 /* Add the auth cap to the new session */
3627 cap->mds = mds;
3628 cap->session = session;
3629 spin_lock(&session->s_cap_lock);
3630 session->s_nr_caps++;
3631 list_add_tail(&cap->session_caps, &session->s_caps);
3632 spin_unlock(&session->s_cap_lock);
3633 
3634 change_auth_cap_ses(ci, session);
3635 }
3636 spin_unlock(&ci->i_ceph_lock);
3637 }
3638 
3639 err = __send_request(session, req, false);
3640 
3641 out_session:
3642 ceph_put_mds_session(session);
3643 finish:
 	/* on early error, record it and wake the submitter */
3644 if (err) {
3645 doutc(cl, "early error %d\n", err);
3646 req->r_err = err;
3647 complete_request(mdsc, req);
3648 __unregister_request(mdsc, req);
3649 }
3650 return;
3651 }
3652
3653 /*
3654 * called under mdsc->mutex
3655 */
__wake_requests(struct ceph_mds_client * mdsc,struct list_head * head)3656 static void __wake_requests(struct ceph_mds_client *mdsc,
3657 struct list_head *head)
3658 {
3659 struct ceph_client *cl = mdsc->fsc->client;
3660 struct ceph_mds_request *req;
3661 LIST_HEAD(tmp_list);
3662
3663 list_splice_init(head, &tmp_list);
3664
3665 while (!list_empty(&tmp_list)) {
3666 req = list_entry(tmp_list.next,
3667 struct ceph_mds_request, r_wait);
3668 list_del_init(&req->r_wait);
3669 doutc(cl, " wake request %p tid %llu\n", req,
3670 req->r_tid);
3671 trace_ceph_mdsc_resume_request(mdsc, req);
3672 __do_request(mdsc, req);
3673 }
3674 }
3675
3676 /*
3677 * Wake up threads with requests pending for @mds, so that they can
3678 * resubmit their requests to a possibly different mds.
3679 */
kick_requests(struct ceph_mds_client * mdsc,int mds)3680 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3681 {
3682 struct ceph_client *cl = mdsc->fsc->client;
3683 struct ceph_mds_request *req;
3684 struct rb_node *p = rb_first(&mdsc->request_tree);
3685
3686 doutc(cl, "kick_requests mds%d\n", mds);
3687 while (p) {
3688 req = rb_entry(p, struct ceph_mds_request, r_node);
3689 p = rb_next(p);
3690 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3691 continue;
3692 if (req->r_attempts > 0)
3693 continue; /* only new requests */
3694 if (req->r_session &&
3695 req->r_session->s_mds == mds) {
3696 doutc(cl, " kicking tid %llu\n", req->r_tid);
3697 list_del_init(&req->r_wait);
3698 trace_ceph_mdsc_resume_request(mdsc, req);
3699 __do_request(mdsc, req);
3700 }
3701 }
3702 }
3703
ceph_mdsc_submit_request(struct ceph_mds_client * mdsc,struct inode * dir,struct ceph_mds_request * req)3704 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3705 struct ceph_mds_request *req)
3706 {
3707 struct ceph_client *cl = mdsc->fsc->client;
3708 int err = 0;
3709 
3710 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3711 if (req->r_inode)
3712 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3713 if (req->r_parent) {
3714 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
 		/* write ops open the dir for write, everything else for read */
3715 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3716 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
 		/* pin and touch the fmode atomically under i_ceph_lock */
3717 spin_lock(&ci->i_ceph_lock);
3718 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3719 __ceph_touch_fmode(ci, mdsc, fmode);
3720 spin_unlock(&ci->i_ceph_lock);
3721 }
3722 if (req->r_old_dentry_dir)
3723 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3724 CEPH_CAP_PIN);
3725 
 	/*
 	 * NOTE(review): the early returns below leave the CAP_PIN refs taken
 	 * above in place — presumably they are dropped when the caller puts
 	 * the request; confirm against ceph_mdsc_release_request().
 	 */
3726 if (req->r_inode) {
3727 err = ceph_wait_on_async_create(req->r_inode);
3728 if (err) {
3729 doutc(cl, "wait for async create returned: %d\n", err);
3730 return err;
3731 }
3732 }
3733 
3734 if (!err && req->r_old_inode) {
3735 err = ceph_wait_on_async_create(req->r_old_inode);
3736 if (err) {
3737 doutc(cl, "wait for async create returned: %d\n", err);
3738 return err;
3739 }
3740 }
3741 
3742 doutc(cl, "submit_request on %p for inode %p\n", req, dir);
3743 mutex_lock(&mdsc->mutex);
3744 __register_request(mdsc, req, dir);
3745 trace_ceph_mdsc_submit_request(mdsc, req);
3746 __do_request(mdsc, req);
 	/* r_err reflects any early failure from __do_request() */
3747 err = req->r_err;
3748 mutex_unlock(&mdsc->mutex);
3749 return err;
3750 }
3751
ceph_mdsc_wait_request(struct ceph_mds_client * mdsc,struct ceph_mds_request * req,ceph_mds_request_wait_callback_t wait_func)3752 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3753 struct ceph_mds_request *req,
3754 ceph_mds_request_wait_callback_t wait_func)
3755 {
3756 struct ceph_client *cl = mdsc->fsc->client;
3757 int err;
3758 
3759 /* wait */
3760 doutc(cl, "do_request waiting\n");
 	/* a caller-supplied wait_func overrides the default killable wait */
3761 if (wait_func) {
3762 err = wait_func(mdsc, req);
3763 } else {
3764 long timeleft = wait_for_completion_killable_timeout(
3765 &req->r_completion,
3766 ceph_timeout_jiffies(req->r_timeout));
3767 if (timeleft > 0)
3768 err = 0;
3769 else if (!timeleft)
3770 err = -ETIMEDOUT; /* timed out */
3771 else
3772 err = timeleft; /* killed */
3773 }
3774 doutc(cl, "do_request waited, got %d\n", err);
3775 mutex_lock(&mdsc->mutex);
3776 
3777 /* only abort if we didn't race with a real reply */
3778 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
 		/* a reply arrived; return the MDS's result, not the wait error */
3779 err = le32_to_cpu(req->r_reply_info.head->result);
3780 } else if (err < 0) {
3781 doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);
3782 
3783 /*
3784 * ensure we aren't running concurrently with
3785 * ceph_fill_trace or ceph_readdir_prepopulate, which
3786 * rely on locks (dir mutex) held by our caller.
3787 */
3788 mutex_lock(&req->r_fill_mutex);
3789 req->r_err = err;
3790 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3791 mutex_unlock(&req->r_fill_mutex);
3792 
 		/* an aborted namespace op leaves the dir state untrustworthy */
3793 if (req->r_parent &&
3794 (req->r_op & CEPH_MDS_OP_WRITE))
3795 ceph_invalidate_dir_request(req);
3796 } else {
3797 err = req->r_err;
3798 }
3799 
3800 mutex_unlock(&mdsc->mutex);
3801 return err;
3802 }
3803
3804 /*
3805 * Synchronously perform an mds request. Take care of all of the
3806 * session setup, forwarding, retry details.
3807 */
ceph_mdsc_do_request(struct ceph_mds_client * mdsc,struct inode * dir,struct ceph_mds_request * req)3808 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3809 struct inode *dir,
3810 struct ceph_mds_request *req)
3811 {
3812 struct ceph_client *cl = mdsc->fsc->client;
3813 int err;
3814
3815 doutc(cl, "do_request on %p\n", req);
3816
3817 /* issue */
3818 err = ceph_mdsc_submit_request(mdsc, dir, req);
3819 if (!err)
3820 err = ceph_mdsc_wait_request(mdsc, req, NULL);
3821 doutc(cl, "do_request %p done, result %d\n", req, err);
3822 return err;
3823 }
3824
3825 /*
3826 * Invalidate dir's completeness, dentry lease state on an aborted MDS
3827 * namespace request.
3828 */
ceph_invalidate_dir_request(struct ceph_mds_request * req)3829 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3830 {
3831 struct inode *dir = req->r_parent;
3832 struct inode *old_dir = req->r_old_dentry_dir;
3833 struct ceph_client *cl = req->r_mdsc->fsc->client;
3834
3835 doutc(cl, "invalidate_dir_request %p %p (complete, lease(s))\n",
3836 dir, old_dir);
3837
3838 ceph_dir_clear_complete(dir);
3839 if (old_dir)
3840 ceph_dir_clear_complete(old_dir);
3841 if (req->r_dentry)
3842 ceph_invalidate_dentry_lease(req->r_dentry);
3843 if (req->r_old_dentry)
3844 ceph_invalidate_dentry_lease(req->r_old_dentry);
3845 }
3846
3847 /*
3848 * Handle mds reply.
3849 *
3850 * We take the session mutex and parse and process the reply immediately.
3851 * This preserves the logical ordering of replies, capabilities, etc., sent
3852 * by the MDS as they are applied to our local cache.
3853 */
handle_reply(struct ceph_mds_session * session,struct ceph_msg * msg)3854 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3855 {
3856 struct ceph_mds_client *mdsc = session->s_mdsc;
3857 struct ceph_client *cl = mdsc->fsc->client;
3858 struct ceph_mds_request *req;
3859 struct ceph_mds_reply_head *head = msg->front.iov_base;
3860 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
3861 struct ceph_snap_realm *realm;
3862 u64 tid;
3863 int err, result;
3864 int mds = session->s_mds;
3865 bool close_sessions = false;
3866 
 	/* sanity: the front must at least hold the fixed reply head */
3867 if (msg->front.iov_len < sizeof(*head)) {
3868 pr_err_client(cl, "got corrupt (short) reply\n");
3869 ceph_msg_dump(msg);
3870 return;
3871 }
3872 
3873 /* get request, session */
3874 tid = le64_to_cpu(msg->hdr.tid);
3875 mutex_lock(&mdsc->mutex);
3876 req = lookup_get_request(mdsc, tid);
3877 if (!req) {
3878 doutc(cl, "on unknown tid %llu\n", tid);
3879 mutex_unlock(&mdsc->mutex);
3880 return;
3881 }
3882 doutc(cl, "handle_reply %p\n", req);
3883 
3884 /* correct session? */
3885 if (req->r_session != session) {
3886 pr_err_client(cl, "got %llu on session mds%d not mds%d\n",
3887 tid, session->s_mds,
3888 req->r_session ? req->r_session->s_mds : -1);
3889 mutex_unlock(&mdsc->mutex);
3890 goto out;
3891 }
3892 
3893 /* dup? */
3894 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3895 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3896 pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n",
3897 head->safe ? "safe" : "unsafe", tid, mds);
3898 mutex_unlock(&mdsc->mutex);
3899 goto out;
3900 }
 	/* an unsafe reply must never follow a safe one for the same tid */
3901 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3902 pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n",
3903 tid, mds);
3904 mutex_unlock(&mdsc->mutex);
3905 goto out;
3906 }
3907 
3908 result = le32_to_cpu(head->result);
3909 
3910 if (head->safe) {
3911 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3912 __unregister_request(mdsc, req);
3913 
3914 /* last request during umount? */
3915 if (mdsc->stopping && !__get_oldest_req(mdsc))
3916 complete_all(&mdsc->safe_umount_waiters);
3917 
3918 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3919 /*
3920 * We already handled the unsafe response, now do the
3921 * cleanup. No need to examine the response; the MDS
3922 * doesn't include any result info in the safe
3923 * response. And even if it did, there is nothing
3924 * useful we could do with a revised return value.
3925 */
3926 doutc(cl, "got safe reply %llu, mds%d\n", tid, mds);
3927 
3928 mutex_unlock(&mdsc->mutex);
3929 goto out;
3930 }
3931 } else {
3932 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3933 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3934 }
3935 
3936 doutc(cl, "tid %lld result %d\n", tid, result);
 	/* newer MDSes self-describe the encoding; older ones need the
 	   connection's feature bits to pick the right decoder */
3937 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3938 err = parse_reply_info(session, msg, req, (u64)-1);
3939 else
3940 err = parse_reply_info(session, msg, req,
3941 session->s_con.peer_features);
3942 mutex_unlock(&mdsc->mutex);
3943 
3944 /* Must find target inode outside of mutexes to avoid deadlocks */
3945 rinfo = &req->r_reply_info;
3946 if ((err >= 0) && rinfo->head->is_target) {
3947 struct inode *in = xchg(&req->r_new_inode, NULL);
3948 struct ceph_vino tvino = {
3949 .ino = le64_to_cpu(rinfo->targeti.in->ino),
3950 .snap = le64_to_cpu(rinfo->targeti.in->snapid)
3951 };
3952 
3953 /*
3954 * If we ended up opening an existing inode, discard
3955 * r_new_inode
3956 */
3957 if (req->r_op == CEPH_MDS_OP_CREATE &&
3958 !req->r_reply_info.has_create_ino) {
3959 /* This should never happen on an async create */
3960 WARN_ON_ONCE(req->r_deleg_ino);
3961 iput(in);
3962 in = NULL;
3963 }
3964 
3965 in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
3966 if (IS_ERR(in)) {
3967 err = PTR_ERR(in);
 			/* out_err expects s_mutex held */
3968 mutex_lock(&session->s_mutex);
3969 goto out_err;
3970 }
3971 req->r_target_inode = in;
3972 }
3973 
3974 mutex_lock(&session->s_mutex);
3975 if (err < 0) {
3976 pr_err_client(cl, "got corrupt reply mds%d(tid:%lld)\n",
3977 mds, tid);
3978 ceph_msg_dump(msg);
3979 goto out_err;
3980 }
3981 
3982 /* snap trace */
3983 realm = NULL;
3984 if (rinfo->snapblob_len) {
3985 down_write(&mdsc->snap_rwsem);
3986 err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
3987 rinfo->snapblob + rinfo->snapblob_len,
3988 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3989 &realm);
3990 if (err) {
3991 up_write(&mdsc->snap_rwsem);
3992 close_sessions = true;
3993 if (err == -EIO)
3994 ceph_msg_dump(msg);
3995 goto out_err;
3996 }
 		/* keep snap_rwsem held (read) across the trace fill below */
3997 downgrade_write(&mdsc->snap_rwsem);
3998 } else {
3999 down_read(&mdsc->snap_rwsem);
4000 }
4001 
4002 /* insert trace into our cache */
4003 mutex_lock(&req->r_fill_mutex);
4004 current->journal_info = req;
4005 err = ceph_fill_trace(mdsc->fsc->sb, req);
4006 if (err == 0) {
4007 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
4008 req->r_op == CEPH_MDS_OP_LSSNAP))
4009 err = ceph_readdir_prepopulate(req, req->r_session);
4010 }
4011 current->journal_info = NULL;
4012 mutex_unlock(&req->r_fill_mutex);
4013 
4014 up_read(&mdsc->snap_rwsem);
4015 if (realm)
4016 ceph_put_snap_realm(mdsc, realm);
4017 
4018 if (err == 0) {
 		/* track unsafe ops against the target inode until the safe
 		   reply arrives */
4019 if (req->r_target_inode &&
4020 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
4021 struct ceph_inode_info *ci =
4022 ceph_inode(req->r_target_inode);
4023 spin_lock(&ci->i_unsafe_lock);
4024 list_add_tail(&req->r_unsafe_target_item,
4025 &ci->i_unsafe_iops);
4026 spin_unlock(&ci->i_unsafe_lock);
4027 }
4028 
4029 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
4030 }
4031 out_err:
4032 mutex_lock(&mdsc->mutex);
 	/* record the outcome only if the waiter hasn't already aborted */
4033 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4034 if (err) {
4035 req->r_err = err;
4036 } else {
4037 req->r_reply = ceph_msg_get(msg);
4038 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
4039 }
4040 } else {
4041 doutc(cl, "reply arrived after request %lld was aborted\n", tid);
4042 }
4043 mutex_unlock(&mdsc->mutex);
4044 
4045 mutex_unlock(&session->s_mutex);
4046 
4047 /* kick calling process */
4048 complete_request(mdsc, req);
4049 
4050 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
4051 req->r_end_latency, err);
4052 out:
4053 ceph_mdsc_put_request(req);
4054 
4055 /* Defer closing the sessions after s_mutex lock being released */
4056 if (close_sessions)
4057 ceph_mdsc_close_sessions(mdsc);
4058 return;
4059 }
4060
4061
4062
4063 /*
4064 * handle mds notification that our request has been forwarded.
4065 */
handle_forward(struct ceph_mds_client * mdsc,struct ceph_mds_session * session,struct ceph_msg * msg)4066 static void handle_forward(struct ceph_mds_client *mdsc,
4067 struct ceph_mds_session *session,
4068 struct ceph_msg *msg)
4069 {
4070 struct ceph_client *cl = mdsc->fsc->client;
4071 struct ceph_mds_request *req;
4072 u64 tid = le64_to_cpu(msg->hdr.tid);
4073 u32 next_mds;
4074 u32 fwd_seq;
4075 int err = -EINVAL;
4076 void *p = msg->front.iov_base;
4077 void *end = p + msg->front.iov_len;
4078 bool aborted = false;
4079 
 	/* payload: target mds rank followed by the forward sequence number */
4080 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4081 next_mds = ceph_decode_32(&p);
4082 fwd_seq = ceph_decode_32(&p);
4083 
4084 mutex_lock(&mdsc->mutex);
4085 req = lookup_get_request(mdsc, tid);
4086 if (!req) {
4087 mutex_unlock(&mdsc->mutex);
4088 doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds);
4089 return; /* dup reply? */
4090 }
4091 
4092 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
4093 doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
4094 __unregister_request(mdsc, req);
 	/* NOTE: the second clause only fires when fwd_seq == U32_MAX */
4095 } else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
4096 /*
4097 * Avoid infinite retrying after overflow.
4098 *
4099 * The MDS will increase the fwd count and in client side
4100 * if the num_fwd is less than the one saved in request
4101 * that means the MDS is an old version and overflowed of
4102 * 8 bits.
4103 */
4104 mutex_lock(&req->r_fill_mutex);
4105 req->r_err = -EMULTIHOP;
4106 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
4107 mutex_unlock(&req->r_fill_mutex);
4108 aborted = true;
4109 pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
4110 tid);
4111 } else {
4112 /* resend. forward race not possible; mds would drop */
4113 doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds);
4114 BUG_ON(req->r_err);
4115 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
 		/* start over against the new mds */
4116 req->r_attempts = 0;
4117 req->r_num_fwd = fwd_seq;
4118 req->r_resend_mds = next_mds;
4119 put_request_session(req);
4120 __do_request(mdsc, req);
4121 }
4122 mutex_unlock(&mdsc->mutex);
4123 
4124 /* kick calling process */
4125 if (aborted)
4126 complete_request(mdsc, req);
4127 ceph_mdsc_put_request(req);
4128 return;
4129 
4130 bad:
4131 pr_err_client(cl, "decode error err=%d\n", err);
4132 ceph_msg_dump(msg);
4133 }
4134
/*
 * Decode the session metadata map<string,string>, advancing *p past it.
 *
 * We only care about one entry: if the "error_string" value mentions
 * "blacklisted" (sent as "blocklisted (blacklisted)" by newer MDSes, or
 * plain "blacklisted" by older ones), set *blocklisted.
 *
 * Returns 0 on success, -1 if the buffer is truncated/corrupt.
 */
static int __decode_session_metadata(void **p, void *end,
				     bool *blocklisted)
{
	/* map<string,string> */
	u32 count;

	ceph_decode_32_safe(p, end, count, bad);
	for (; count > 0; count--) {
		u32 klen, vlen;
		bool is_err_key;

		/* key */
		ceph_decode_32_safe(p, end, klen, bad);
		ceph_decode_need(p, end, klen, bad);
		is_err_key = !strncmp(*p, "error_string", klen);
		*p += klen;

		/* value */
		ceph_decode_32_safe(p, end, vlen, bad);
		ceph_decode_need(p, end, vlen, bad);
		if (is_err_key && strnstr(*p, "blacklisted", vlen))
			*blocklisted = true;
		*p += vlen;
	}
	return 0;
bad:
	return -1;
}
4162
4163 /*
4164 * handle a mds session control message
4165 */
/*
 * handle a mds session control message
 *
 * Decodes the message header plus version-dependent trailers (metadata
 * map, feature bits, metric spec/flags, cap auth entries), then acts on
 * the session op under session->s_mutex.  On decode failure the partially
 * built cap_auths array is freed at the fail: label.
 */
static void handle_session(struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_client *cl = mdsc->fsc->client;
	int mds = session->s_mds;
	int msg_version = le16_to_cpu(msg->hdr.version);
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mds_session_head *h;
	struct ceph_mds_cap_auth *cap_auths = NULL;
	u32 op, cap_auths_num = 0;
	u64 seq, features = 0;
	int wake = 0;	/* 1: wake waiters, 2: also kick requests */
	bool blocklisted = false;
	u32 i;


	/* decode */
	ceph_decode_need(&p, end, sizeof(*h), bad);
	h = p;
	p += sizeof(*h);

	op = le32_to_cpu(h->op);
	seq = le64_to_cpu(h->seq);

	if (msg_version >= 3) {
		u32 len;
		/* version >= 2 and < 5, decode metadata, skip otherwise
		 * as it's handled via flags.
		 */
		if (msg_version >= 5)
			ceph_decode_skip_map(&p, end, string, string, bad);
		else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
			goto bad;

		/* version >= 3, feature bits */
		ceph_decode_32_safe(&p, end, len, bad);
		if (len) {
			ceph_decode_64_safe(&p, end, features, bad);
			/* skip any feature bytes beyond the first u64 */
			p += len - sizeof(features);
		}
	}

	if (msg_version >= 5) {
		u32 flags, len;

		/* version >= 4 */
		ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
		ceph_decode_32_safe(&p, end, len, bad); /* len */
		ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */

		/* version >= 5, flags */
		ceph_decode_32_safe(&p, end, flags, bad);
		if (flags & CEPH_SESSION_BLOCKLISTED) {
			pr_warn_client(cl, "mds%d session blocklisted\n",
				       session->s_mds);
			blocklisted = true;
		}
	}

	if (msg_version >= 6) {
		ceph_decode_32_safe(&p, end, cap_auths_num, bad);
		doutc(cl, "cap_auths_num %d\n", cap_auths_num);

		/* cap auths are only expected on session open */
		if (cap_auths_num && op != CEPH_SESSION_OPEN) {
			WARN_ON_ONCE(op != CEPH_SESSION_OPEN);
			goto skip_cap_auths;
		}

		cap_auths = kzalloc_objs(struct ceph_mds_cap_auth,
					 cap_auths_num);
		if (!cap_auths) {
			pr_err_client(cl, "No memory for cap_auths\n");
			return;
		}

		for (i = 0; i < cap_auths_num; i++) {
			u32 _len, j;

			/* struct_v, struct_compat, and struct_len in MDSCapAuth */
			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);

			/* struct_v, struct_compat, and struct_len in MDSCapMatch */
			ceph_decode_skip_n(&p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(&p, end, cap_auths[i].match.uid, bad);
			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				cap_auths[i].match.gids = kcalloc(_len, sizeof(u32),
								  GFP_KERNEL);
				if (!cap_auths[i].match.gids) {
					pr_err_client(cl, "No memory for gids\n");
					goto fail;
				}

				cap_auths[i].match.num_gids = _len;
				for (j = 0; j < _len; j++)
					ceph_decode_32_safe(&p, end,
							    cap_auths[i].match.gids[j],
							    bad);
			}

			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				/* +1 for the NUL terminator */
				cap_auths[i].match.path = kcalloc(_len + 1, sizeof(char),
								  GFP_KERNEL);
				if (!cap_auths[i].match.path) {
					pr_err_client(cl, "No memory for path\n");
					goto fail;
				}
				ceph_decode_copy(&p, cap_auths[i].match.path, _len);

				/* Remove the tailing '/' */
				while (_len && cap_auths[i].match.path[_len - 1] == '/') {
					cap_auths[i].match.path[_len - 1] = '\0';
					_len -= 1;
				}
			}

			ceph_decode_32_safe(&p, end, _len, bad);
			if (_len) {
				/* +1 for the NUL terminator */
				cap_auths[i].match.fs_name = kcalloc(_len + 1, sizeof(char),
								     GFP_KERNEL);
				if (!cap_auths[i].match.fs_name) {
					pr_err_client(cl, "No memory for fs_name\n");
					goto fail;
				}
				ceph_decode_copy(&p, cap_auths[i].match.fs_name, _len);
			}

			ceph_decode_8_safe(&p, end, cap_auths[i].match.root_squash, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].readable, bad);
			ceph_decode_8_safe(&p, end, cap_auths[i].writeable, bad);
			doutc(cl, "uid %lld, num_gids %u, path %s, fs_name %s, root_squash %d, readable %d, writeable %d\n",
			      cap_auths[i].match.uid, cap_auths[i].match.num_gids,
			      cap_auths[i].match.path, cap_auths[i].match.fs_name,
			      cap_auths[i].match.root_squash,
			      cap_auths[i].readable, cap_auths[i].writeable);
		}
	}

skip_cap_auths:
	mutex_lock(&mdsc->mutex);
	if (op == CEPH_SESSION_OPEN) {
		/* replace any previously stored cap auths with the new set */
		if (mdsc->s_cap_auths) {
			for (i = 0; i < mdsc->s_cap_auths_num; i++) {
				kfree(mdsc->s_cap_auths[i].match.gids);
				kfree(mdsc->s_cap_auths[i].match.path);
				kfree(mdsc->s_cap_auths[i].match.fs_name);
			}
			kfree(mdsc->s_cap_auths);
		}
		mdsc->s_cap_auths_num = cap_auths_num;
		mdsc->s_cap_auths = cap_auths;
	}
	if (op == CEPH_SESSION_CLOSE) {
		/* hold a ref across the unregister; dropped at the end */
		ceph_get_mds_session(session);
		__unregister_session(mdsc, session);
	}
	/* FIXME: this ttl calculation is generous */
	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
	mutex_unlock(&mdsc->mutex);

	mutex_lock(&session->s_mutex);

	doutc(cl, "mds%d %s %p state %s seq %llu\n", mds,
	      ceph_session_op_name(op), session,
	      ceph_session_state_name(session->s_state), seq);

	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
		session->s_state = CEPH_MDS_SESSION_OPEN;
		pr_info_client(cl, "mds%d came back\n", session->s_mds);
	}

	switch (op) {
	case CEPH_SESSION_OPEN:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect success\n",
				       session->s_mds);

		session->s_features = features;
		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
			pr_notice_client(cl, "mds%d is already opened\n",
					 session->s_mds);
		} else {
			session->s_state = CEPH_MDS_SESSION_OPEN;
			renewed_caps(mdsc, session, 0);
			if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
				     &session->s_features))
				metric_schedule_delayed(&mdsc->metric);
		}

		/*
		 * The connection maybe broken and the session in client
		 * side has been reinitialized, need to update the seq
		 * anyway.
		 */
		if (!session->s_seq && seq)
			session->s_seq = seq;

		wake = 1;
		if (mdsc->stopping)
			__close_session(mdsc, session);
		break;

	case CEPH_SESSION_RENEWCAPS:
		/* only accept the renewal that matches our last request */
		if (session->s_renew_seq == seq)
			renewed_caps(mdsc, session, 1);
		break;

	case CEPH_SESSION_CLOSE:
		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
			pr_info_client(cl, "mds%d reconnect denied\n",
				       session->s_mds);
		session->s_state = CEPH_MDS_SESSION_CLOSED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		wake = 2; /* for good measure */
		wake_up_all(&mdsc->session_close_wq);
		break;

	case CEPH_SESSION_STALE:
		pr_info_client(cl, "mds%d caps went stale, renewing\n",
			       session->s_mds);
		/* invalidate all caps issued under the old gen */
		atomic_inc(&session->s_cap_gen);
		session->s_cap_ttl = jiffies - 1;
		send_renew_caps(mdsc, session);
		break;

	case CEPH_SESSION_RECALL_STATE:
		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
		break;

	case CEPH_SESSION_FLUSHMSG:
		/* flush cap releases */
		spin_lock(&session->s_cap_lock);
		if (session->s_num_cap_releases)
			ceph_flush_session_cap_releases(mdsc, session);
		spin_unlock(&session->s_cap_lock);

		send_flushmsg_ack(mdsc, session, seq);
		break;

	case CEPH_SESSION_FORCE_RO:
		doutc(cl, "force_session_readonly %p\n", session);
		spin_lock(&session->s_cap_lock);
		session->s_readonly = true;
		spin_unlock(&session->s_cap_lock);
		wake_up_session_caps(session, FORCE_RO);
		break;

	case CEPH_SESSION_REJECT:
		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
		pr_info_client(cl, "mds%d rejected session\n",
			       session->s_mds);
		session->s_state = CEPH_MDS_SESSION_REJECTED;
		cleanup_session_requests(mdsc, session);
		remove_session_caps(session);
		if (blocklisted)
			mdsc->fsc->blocklisted = true;
		wake = 2; /* for good measure */
		break;

	default:
		pr_err_client(cl, "bad op %d mds%d\n", op, mds);
		WARN_ON(1);
	}

	mutex_unlock(&session->s_mutex);
	if (wake) {
		mutex_lock(&mdsc->mutex);
		__wake_requests(mdsc, &session->s_waiting);
		if (wake == 2)
			kick_requests(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
	}
	if (op == CEPH_SESSION_CLOSE)
		ceph_put_mds_session(session);
	return;

bad:
	pr_err_client(cl, "corrupt message mds%d len %d\n", mds,
		      (int)msg->front.iov_len);
	ceph_msg_dump(msg);
fail:
	/* free whatever was allocated before the decode/alloc failure */
	for (i = 0; i < cap_auths_num; i++) {
		kfree(cap_auths[i].match.gids);
		kfree(cap_auths[i].match.path);
		kfree(cap_auths[i].match.fs_name);
	}
	kfree(cap_auths);
	return;
}
4459
ceph_mdsc_release_dir_caps(struct ceph_mds_request * req)4460 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
4461 {
4462 struct ceph_client *cl = req->r_mdsc->fsc->client;
4463 int dcaps;
4464
4465 dcaps = xchg(&req->r_dir_caps, 0);
4466 if (dcaps) {
4467 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4468 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
4469 }
4470 }
4471
ceph_mdsc_release_dir_caps_async(struct ceph_mds_request * req)4472 void ceph_mdsc_release_dir_caps_async(struct ceph_mds_request *req)
4473 {
4474 struct ceph_client *cl = req->r_mdsc->fsc->client;
4475 int dcaps;
4476
4477 dcaps = xchg(&req->r_dir_caps, 0);
4478 if (dcaps) {
4479 doutc(cl, "releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4480 ceph_put_cap_refs_async(ceph_inode(req->r_parent), dcaps);
4481 }
4482 }
4483
4484 /*
4485 * called under session->mutex.
4486 */
/*
 * Resend requests to a reconnecting MDS: first all still-unsafe
 * requests on this session, then any previously attempted (old)
 * requests so the MDS can replay them in its clientreplay stage.
 *
 * called under session->mutex; takes mdsc->mutex for the walk.
 */
static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_mds_request *req, *nreq;
	struct rb_node *p;

	doutc(mdsc->fsc->client, "mds%d\n", session->s_mds);

	mutex_lock(&mdsc->mutex);
	/* _safe: __send_request may detach req from s_unsafe */
	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
		__send_request(session, req, true);

	/*
	 * also re-send old requests when MDS enters reconnect stage. So that MDS
	 * can process completed request in clientreplay stage.
	 */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);	/* advance before possibly touching req */
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;	/* already sent above via s_unsafe */
		if (req->r_attempts == 0)
			continue; /* only old requests */
		if (!req->r_session)
			continue;
		if (req->r_session->s_mds != session->s_mds)
			continue;

		ceph_mdsc_release_dir_caps_async(req);

		__send_request(session, req, true);
	}
	mutex_unlock(&mdsc->mutex);
}
4522
/*
 * Flush the current (full) reconnect pagelist as a partial v5 reconnect
 * message and install a fresh pagelist in recon_state so encoding can
 * continue.  Only valid when the MDS supports multi-message reconnect
 * (recon_state->allow_multi); returns -ENOSPC otherwise.
 *
 * The outgoing pagelist's leading placeholder(s) are patched in place
 * (via kmap of its first page) with the actual nr_caps/nr_realms counts,
 * and the new pagelist gets zeroed placeholders for the next chunk.
 */
static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
{
	struct ceph_msg *reply;
	struct ceph_pagelist *_pagelist;
	struct page *page;
	__le32 *addr;
	int err = -ENOMEM;

	if (!recon_state->allow_multi)
		return -ENOSPC;

	/* can't handle message that contains both caps and realm */
	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);

	/* pre-allocate new pagelist */
	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!_pagelist)
		return -ENOMEM;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_msg;

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(_pagelist, 0);
	if (err < 0)
		goto fail;

	if (recon_state->nr_caps) {
		/* currently encoding caps */
		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
		if (err)
			goto fail;
	} else {
		/* placeholder for nr_realms (currently encoding relams) */
		err = ceph_pagelist_encode_32(_pagelist, 0);
		if (err < 0)
			goto fail;
	}

	/* trailing flag: 1 == more messages follow */
	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
	if (err)
		goto fail;

	/* patch the real count over the placeholder at the pagelist head */
	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
	addr = kmap_atomic(page);
	if (recon_state->nr_caps) {
		/* currently encoding caps */
		*addr = cpu_to_le32(recon_state->nr_caps);
	} else {
		/* currently encoding relams */
		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
	}
	kunmap_atomic(addr);

	reply->hdr.version = cpu_to_le16(5);
	reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);

	ceph_con_send(&recon_state->session->s_con, reply);
	ceph_pagelist_release(recon_state->pagelist);

	/* continue encoding into the fresh pagelist, as protocol v5 */
	recon_state->pagelist = _pagelist;
	recon_state->nr_caps = 0;
	recon_state->nr_realms = 0;
	recon_state->msg_version = 5;
	return 0;
fail:
	ceph_msg_put(reply);
fail_msg:
	ceph_pagelist_release(_pagelist);
	return err;
}
4598
/*
 * Find the "primary" dentry alias for an inode: for a directory, its
 * (single, non-root) alias; for other inodes, the first hashed alias
 * flagged CEPH_DENTRY_PRIMARY_LINK.  Returns a referenced dentry
 * (caller must dput()) or NULL.
 */
static struct dentry* d_find_primary(struct inode *inode)
{
	struct dentry *alias, *dn = NULL;

	/* cheap unlocked check first; re-checked under i_lock below */
	if (hlist_empty(&inode->i_dentry))
		return NULL;

	spin_lock(&inode->i_lock);
	if (hlist_empty(&inode->i_dentry))
		goto out_unlock;

	if (S_ISDIR(inode->i_mode)) {
		/* directories have at most one alias */
		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
		if (!IS_ROOT(alias))
			dn = dget(alias);
		goto out_unlock;
	}

	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
		/* d_lock nests inside i_lock */
		spin_lock(&alias->d_lock);
		if (!d_unhashed(alias) &&
		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
			dn = dget_dlock(alias);
		}
		spin_unlock(&alias->d_lock);
		if (dn)
			break;
	}
out_unlock:
	spin_unlock(&inode->i_lock);
	return dn;
}
4631
4632 /*
4633 * Encode information about a cap for a reconnect with the MDS.
4634 */
/*
 * Encode information about a cap for a reconnect with the MDS.
 *
 * Per-cap callback for ceph_iterate_session_caps(): builds the path for
 * the inode, resets the cap's sequence state, and appends a v1 or v2
 * cap reconnect record (plus file locks and snap_follows for v2+) to
 * recon_state->pagelist.  May flush a partial reconnect message if the
 * pagelist would exceed RECONNECT_MAX_SIZE.  Returns 0 on success.
 */
static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct dentry *dentry;
	struct ceph_cap *cap;
	struct ceph_path_info path_info = {0};
	int err;
	u64 snap_follows;

	dentry = d_find_primary(inode);
	if (dentry) {
		/* set pathbase to parent dir when msg_version >= 2 */
		char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info,
						  recon_state->msg_version >= 2);
		dput(dentry);
		if (IS_ERR(path)) {
			err = PTR_ERR(path);
			goto out_err;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		/* no cap for this mds -- nothing to encode, not an error */
		spin_unlock(&ci->i_ceph_lock);
		err = 0;
		goto out_err;
	}
	doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode,
	      ceph_vinop(inode), cap, cap->cap_id,
	      ceph_cap_string(cap->issued));

	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);

	/* These are lost when the session goes away */
	if (S_ISDIR(inode->i_mode)) {
		if (cap->issued & CEPH_CAP_DIR_CREATE) {
			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
		}
		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
	}

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(path_info.vino.ino);
		/* temporarily a 0/1 flag; replaced by the real length below */
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		struct timespec64 ts;

		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(i_size_read(inode));
		ts = inode_get_mtime(inode);
		ceph_encode_timespec64(&rec.v1.mtime, &ts);
		ts = inode_get_atime(inode);
		ceph_encode_timespec64(&rec.v1.atime, &ts);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = cpu_to_le64(path_info.vino.ino);
	}

	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);
		u8 struct_v = 0;

encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_objs(struct ceph_filelock,
					      num_fcntl_locks + num_flock_locks,
					      GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				/* locks changed under us: re-count and retry */
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			     (num_fcntl_locks + num_flock_locks) *
			     sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		struct_len += sizeof(u32) + path_info.pathlen + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			/* message full: flush it and start a new one */
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			pagelist = recon_state->pagelist;
		}

		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    path_info.pathlen + sizeof(rec.v1));
		if (err)
			goto out_err;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, (char *)path_info.path, path_info.pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
	}

out_err:
	ceph_mdsc_free_path_info(&path_info);
	if (!err)
		recon_state->nr_caps++;
	return err;
}
4821
/*
 * Append all known snap realms (ino, seq, parent) to the reconnect
 * pagelist.  For msg_version >= 4, each record is wrapped in a
 * version/compat/len envelope and the list is preceded by a count;
 * if the pagelist would overflow, a partial message is flushed first.
 */
static int encode_snap_realms(struct ceph_mds_client *mdsc,
			      struct ceph_reconnect_state *recon_state)
{
	struct rb_node *p;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct ceph_client *cl = mdsc->fsc->client;
	int err = 0;

	if (recon_state->msg_version >= 4) {
		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
		if (err < 0)
			goto fail;
	}

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
		       rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		if (recon_state->msg_version >= 4) {
			size_t need = sizeof(u8) * 2 + sizeof(u32) +
				      sizeof(sr_rec);

			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
				err = send_reconnect_partial(recon_state);
				if (err)
					goto fail;
				/* partial send swapped in a fresh pagelist */
				pagelist = recon_state->pagelist;
			}

			err = ceph_pagelist_reserve(pagelist, need);
			if (err)
				goto fail;

			/* struct_v, struct_compat, struct_len */
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
		}

		doutc(cl, " adding snap realm %llx seq %lld parent %llx\n",
		      realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);

		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;

		recon_state->nr_realms++;
	}
fail:
	return err;
}
4881
4882
4883 /*
4884 * If an MDS fails and recovers, clients need to reconnect in order to
4885 * reestablish shared state. This includes all caps issued through
4886 * this session _and_ the snap_realm hierarchy. Because it's not
4887 * clear which snap realms the mds cares about, we send everything we
4888 * know about.. that ensures we'll then get any new info the
4889 * recovering MDS might have.
4890 *
4891 * This is a relatively heavyweight operation, but it's rare.
4892 */
/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * Builds the reconnect pagelist (caps, then snap realms), possibly
 * splitting it across multiple messages when the MDS supports
 * multi-reconnect, and sends it on a freshly reopened connection.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_msg *reply;
	int mds = session->s_mds;
	int err = -ENOMEM;
	struct ceph_reconnect_state recon_state = {
		.session = session,
	};
	LIST_HEAD(dispose);

	pr_info_client(cl, "mds%d reconnect start\n", mds);

	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!recon_state.pagelist)
		goto fail_nopagelist;

	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	/* drop any inodes the old session had delegated to us */
	xa_destroy(&session->s_delegated_inos);

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	doutc(cl, "session %p state %s\n", session,
	      ceph_session_state_name(session->s_state));

	/* invalidate caps issued under the previous generation */
	atomic_inc(&session->s_cap_gen);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing cap reconnect.
	 * If a cap get released before being added to the cap reconnect,
	 * __ceph_remove_cap() should skip queuing cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	ceph_early_kick_flushing_caps(mdsc, session);

	/* held across cap/realm encoding; released before return */
	down_read(&mdsc->snap_rwsem);

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (err)
		goto fail;

	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
	} else {
		recon_state.msg_version = 2;
	}
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	if (err < 0)
		goto fail;

	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
		size_t total_len =
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		}
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
				err = -ENOSPC;
				goto fail;
			}
			if (recon_state.nr_caps) {
				/* send the caps chunk before the realms */
				err = send_reconnect_partial(&recon_state);
				if (err)
					goto fail;
			}
			recon_state.msg_version = 5;
		}
	}

	err = encode_snap_realms(mdsc, &recon_state);
	if (err < 0)
		goto fail;

	if (recon_state.msg_version >= 5) {
		/* trailing flag: 0 == no more reconnect messages follow */
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
		if (err < 0)
			goto fail;
	}

	if (recon_state.nr_caps || recon_state.nr_realms) {
		/* patch the real counts over the leading placeholders */
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					 struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	return;

fail:
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	mutex_unlock(&session->s_mutex);
fail_nomsg:
	ceph_pagelist_release(recon_state.pagelist);
fail_nopagelist:
	pr_err_client(cl, "error %d preparing reconnect for mds%d\n",
		      err, mds);
	return;
}
5060
5061
5062 /*
5063 * compare old and new mdsmaps, kicking requests
5064 * and closing out old connections as necessary
5065 *
5066 * called under mdsc->mutex.
5067 */
check_new_map(struct ceph_mds_client * mdsc,struct ceph_mdsmap * newmap,struct ceph_mdsmap * oldmap)5068 static void check_new_map(struct ceph_mds_client *mdsc,
5069 struct ceph_mdsmap *newmap,
5070 struct ceph_mdsmap *oldmap)
5071 {
5072 int i, j, err;
5073 int oldstate, newstate;
5074 struct ceph_mds_session *s;
5075 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
5076 struct ceph_client *cl = mdsc->fsc->client;
5077
5078 doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch);
5079
5080 if (newmap->m_info) {
5081 for (i = 0; i < newmap->possible_max_rank; i++) {
5082 for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
5083 set_bit(newmap->m_info[i].export_targets[j], targets);
5084 }
5085 }
5086
5087 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
5088 if (!mdsc->sessions[i])
5089 continue;
5090 s = mdsc->sessions[i];
5091 oldstate = ceph_mdsmap_get_state(oldmap, i);
5092 newstate = ceph_mdsmap_get_state(newmap, i);
5093
5094 doutc(cl, "mds%d state %s%s -> %s%s (session %s)\n",
5095 i, ceph_mds_state_name(oldstate),
5096 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
5097 ceph_mds_state_name(newstate),
5098 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
5099 ceph_session_state_name(s->s_state));
5100
5101 if (i >= newmap->possible_max_rank) {
5102 /* force close session for stopped mds */
5103 ceph_get_mds_session(s);
5104 __unregister_session(mdsc, s);
5105 __wake_requests(mdsc, &s->s_waiting);
5106 mutex_unlock(&mdsc->mutex);
5107
5108 mutex_lock(&s->s_mutex);
5109 cleanup_session_requests(mdsc, s);
5110 remove_session_caps(s);
5111 mutex_unlock(&s->s_mutex);
5112
5113 ceph_put_mds_session(s);
5114
5115 mutex_lock(&mdsc->mutex);
5116 kick_requests(mdsc, i);
5117 continue;
5118 }
5119
5120 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
5121 ceph_mdsmap_get_addr(newmap, i),
5122 sizeof(struct ceph_entity_addr))) {
5123 /* just close it */
5124 mutex_unlock(&mdsc->mutex);
5125 mutex_lock(&s->s_mutex);
5126 mutex_lock(&mdsc->mutex);
5127 ceph_con_close(&s->s_con);
5128 mutex_unlock(&s->s_mutex);
5129 s->s_state = CEPH_MDS_SESSION_RESTARTING;
5130 } else if (oldstate == newstate) {
5131 continue; /* nothing new with this mds */
5132 }
5133
5134 /*
5135 * send reconnect?
5136 */
5137 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
5138 newstate >= CEPH_MDS_STATE_RECONNECT) {
5139 mutex_unlock(&mdsc->mutex);
5140 clear_bit(i, targets);
5141 send_mds_reconnect(mdsc, s);
5142 mutex_lock(&mdsc->mutex);
5143 }
5144
5145 /*
5146 * kick request on any mds that has gone active.
5147 */
5148 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
5149 newstate >= CEPH_MDS_STATE_ACTIVE) {
5150 if (oldstate != CEPH_MDS_STATE_CREATING &&
5151 oldstate != CEPH_MDS_STATE_STARTING)
5152 pr_info_client(cl, "mds%d recovery completed\n",
5153 s->s_mds);
5154 kick_requests(mdsc, i);
5155 mutex_unlock(&mdsc->mutex);
5156 mutex_lock(&s->s_mutex);
5157 mutex_lock(&mdsc->mutex);
5158 ceph_kick_flushing_caps(mdsc, s);
5159 mutex_unlock(&s->s_mutex);
5160 wake_up_session_caps(s, RECONNECT);
5161 }
5162 }
5163
5164 /*
5165 * Only open and reconnect sessions that don't exist yet.
5166 */
5167 for (i = 0; i < newmap->possible_max_rank; i++) {
5168 /*
5169 * In case the import MDS is crashed just after
5170 * the EImportStart journal is flushed, so when
5171 * a standby MDS takes over it and is replaying
5172 * the EImportStart journal the new MDS daemon
5173 * will wait the client to reconnect it, but the
5174 * client may never register/open the session yet.
5175 *
5176 * Will try to reconnect that MDS daemon if the
5177 * rank number is in the export targets array and
5178 * is the up:reconnect state.
5179 */
5180 newstate = ceph_mdsmap_get_state(newmap, i);
5181 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
5182 continue;
5183
5184 /*
5185 * The session maybe registered and opened by some
5186 * requests which were choosing random MDSes during
5187 * the mdsc->mutex's unlock/lock gap below in rare
5188 * case. But the related MDS daemon will just queue
5189 * that requests and be still waiting for the client's
5190 * reconnection request in up:reconnect state.
5191 */
5192 s = __ceph_lookup_mds_session(mdsc, i);
5193 if (likely(!s)) {
5194 s = __open_export_target_session(mdsc, i);
5195 if (IS_ERR(s)) {
5196 err = PTR_ERR(s);
5197 pr_err_client(cl,
5198 "failed to open export target session, err %d\n",
5199 err);
5200 continue;
5201 }
5202 }
5203 doutc(cl, "send reconnect to export target mds.%d\n", i);
5204 mutex_unlock(&mdsc->mutex);
5205 send_mds_reconnect(mdsc, s);
5206 ceph_put_mds_session(s);
5207 mutex_lock(&mdsc->mutex);
5208 }
5209
5210 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
5211 s = mdsc->sessions[i];
5212 if (!s)
5213 continue;
5214 if (!ceph_mdsmap_is_laggy(newmap, i))
5215 continue;
5216 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5217 s->s_state == CEPH_MDS_SESSION_HUNG ||
5218 s->s_state == CEPH_MDS_SESSION_CLOSING) {
5219 doutc(cl, " connecting to export targets of laggy mds%d\n", i);
5220 __open_export_target_sessions(mdsc, s);
5221 }
5222 }
5223 }
5224
5225
5226
5227 /*
5228 * leases
5229 */
5230
5231 /*
5232 * caller must hold session s_mutex, dentry->d_lock
5233 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	/* drop our ref on the session that issued the lease and clear it */
	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}
5241
/*
 * Handle an incoming CEPH_MSG_CLIENT_LEASE message: revoke or renew a
 * dentry lease previously issued by this session.  Revokes are acked
 * back to the MDS by reusing the incoming message.
 */
static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	doutc(cl, "from mds%d\n", mds);

	/* bail out if the mdsc is shutting down */
	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
		return;

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	/* a length-prefixed dentry name follows the fixed header */
	dname.len = get_unaligned_le32(h + 1);
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
		goto bad;
	dname.name = (void *)(h + 1) + sizeof(u32);

	/* lookup inode */
	inode = ceph_find_inode(sb, vino);
	doutc(cl, "%s, ino %llx %p %.*s\n", ceph_lease_op_name(h->action),
	      vino.ino, inode, dname.len, dname.name);

	mutex_lock(&session->s_mutex);
	if (!inode) {
		/* no such inode cached: nothing to drop, just ack */
		doutc(cl, "no inode %llx\n", vino.ino);
		goto release;
	}

	/* dentry */
	parent = d_find_alias(inode);
	if (!parent) {
		doutc(cl, "no parent dentry on inode %p\n", inode);
		WARN_ON(1);
		goto release;  /* hrm... */
	}
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			/*
			 * Ack with our newer seq if it is ahead of the one
			 * in the revoke, so the MDS knows what we had.
			 */
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		/*
		 * Only apply if the lease is ours, still from the current
		 * cap generation, and a renew request is outstanding
		 * (lease_renew_from set, lease_renew_after cleared).
		 */
		if (di->lease_session == session &&
		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			/* schedule the next renew at half the lease period */
			di->lease_renew_after = di->lease_renew_from +
				(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	mutex_unlock(&session->s_mutex);
	/* iput(NULL) is a no-op, so this is safe on the !inode path */
	iput(inode);

	ceph_dec_mds_stopping_blocker(mdsc);
	return;

bad:
	ceph_dec_mds_stopping_blocker(mdsc);

	pr_err_client(cl, "corrupt lease message\n");
	ceph_msg_dump(msg);
}
5351
/*
 * Send a lease message (e.g. a release) for @dentry to the given
 * session.  The dentry name is copied into the message under d_lock so
 * it cannot change underneath us.  Best-effort: allocation failure is
 * silently ignored.
 */
void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_client *cl = session->s_mdsc->fsc->client;
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	struct inode *dir;
	/* fixed header + name-length prefix + worst-case name */
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

	doutc(cl, "identry %p %s to mds%d\n", dentry, ceph_lease_op_name(action),
	      session->s_mds);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);

	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));

	put_unaligned_le32(dentry->d_name.len, lease + 1);
	/* the name follows its u32 length prefix (was a magic '4') */
	memcpy((void *)(lease + 1) + sizeof(u32),
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);

	ceph_con_send(&session->s_con, msg);
}
5384
5385 /*
5386 * lock unlock the session, to wait ongoing session activities
5387 */
static void lock_unlock_session(struct ceph_mds_session *s)
{
	/*
	 * Taking and immediately dropping s_mutex acts as a barrier: any
	 * session activity that held the mutex when we were called has
	 * finished by the time the lock becomes ours.
	 */
	mutex_lock(&s->s_mutex);
	mutex_unlock(&s->s_mutex);
}
5393
maybe_recover_session(struct ceph_mds_client * mdsc)5394 static void maybe_recover_session(struct ceph_mds_client *mdsc)
5395 {
5396 struct ceph_client *cl = mdsc->fsc->client;
5397 struct ceph_fs_client *fsc = mdsc->fsc;
5398
5399 if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
5400 return;
5401
5402 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
5403 return;
5404
5405 if (!READ_ONCE(fsc->blocklisted))
5406 return;
5407
5408 pr_info_client(cl, "auto reconnect after blocklisted\n");
5409 ceph_force_reconnect(fsc->sb);
5410 }
5411
check_session_state(struct ceph_mds_session * s)5412 bool check_session_state(struct ceph_mds_session *s)
5413 {
5414 struct ceph_client *cl = s->s_mdsc->fsc->client;
5415
5416 switch (s->s_state) {
5417 case CEPH_MDS_SESSION_OPEN:
5418 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
5419 s->s_state = CEPH_MDS_SESSION_HUNG;
5420 pr_info_client(cl, "mds%d hung\n", s->s_mds);
5421 }
5422 break;
5423 case CEPH_MDS_SESSION_CLOSING:
5424 case CEPH_MDS_SESSION_NEW:
5425 case CEPH_MDS_SESSION_RESTARTING:
5426 case CEPH_MDS_SESSION_CLOSED:
5427 case CEPH_MDS_SESSION_REJECTED:
5428 return false;
5429 }
5430
5431 return true;
5432 }
5433
5434 /*
5435 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
5436 * then we need to retransmit that request.
5437 */
inc_session_sequence(struct ceph_mds_session * s)5438 void inc_session_sequence(struct ceph_mds_session *s)
5439 {
5440 struct ceph_client *cl = s->s_mdsc->fsc->client;
5441
5442 lockdep_assert_held(&s->s_mutex);
5443
5444 s->s_seq++;
5445
5446 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
5447 int ret;
5448
5449 doutc(cl, "resending session close request for mds%d\n", s->s_mds);
5450 ret = request_close_session(s);
5451 if (ret < 0)
5452 pr_err_client(cl, "unable to close session to mds%d: %d\n",
5453 s->s_mds, ret);
5454 }
5455 }
5456
5457 /*
5458 * delayed work -- periodically trim expired leases, renew caps with mds. If
5459 * the @delay parameter is set to 0 or if it's more than 5 secs, the default
5460 * workqueue delay value of 5 secs will be used.
5461 */
schedule_delayed(struct ceph_mds_client * mdsc,unsigned long delay)5462 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
5463 {
5464 unsigned long max_delay = HZ * 5;
5465
5466 /* 5 secs default delay */
5467 if (!delay || (delay > max_delay))
5468 delay = max_delay;
5469 schedule_delayed_work(&mdsc->delayed_work,
5470 round_jiffies_relative(delay));
5471 }
5472
/*
 * Periodic housekeeping: renew caps / keepalive each session, flush
 * cap releases, trim delayed caps and snapid map, then re-arm itself.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	unsigned long delay;
	int renew_interval;
	int renew_caps;
	int i;

	doutc(mdsc->fsc->client, "mdsc delayed_work\n");

	/* don't touch sessions or re-arm once teardown has flushed us */
	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
		return;

	mutex_lock(&mdsc->mutex);
	/* renew caps once every quarter of the MDS session timeout */
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;

		if (!check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}
		/* drop mdsc->mutex before taking the session mutex */
		mutex_unlock(&mdsc->mutex);

		ceph_flush_session_cap_releases(mdsc, s);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	delay = ceph_check_delayed_caps(mdsc);

	ceph_queue_cap_reclaim_work(mdsc);

	ceph_trim_snapid_map(mdsc);

	maybe_recover_session(mdsc);

	/* re-arm; delay == 0 means use the default interval */
	schedule_delayed(mdsc, delay);
}
5532
/*
 * Allocate and initialize the MDS client state for @fsc.  Returns 0 on
 * success or a negative errno; on failure nothing is left allocated.
 */
int ceph_mdsc_init(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc;
	int err;

	mdsc = kzalloc_obj(struct ceph_mds_client, GFP_NOFS);
	if (!mdsc)
		return -ENOMEM;
	mdsc->fsc = fsc;
	mutex_init(&mdsc->mutex);
	/* an empty placeholder map until the first real mdsmap arrives */
	mdsc->mdsmap = kzalloc_obj(*mdsc->mdsmap, GFP_NOFS);
	if (!mdsc->mdsmap) {
		err = -ENOMEM;
		goto err_mdsc;
	}

	init_completion(&mdsc->safe_umount_waiters);
	spin_lock_init(&mdsc->stopping_lock);
	atomic_set(&mdsc->stopping_blockers, 0);
	init_completion(&mdsc->stopping_waiter);
	atomic64_set(&mdsc->dirty_folios, 0);
	init_waitqueue_head(&mdsc->flush_end_wq);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->quotarealms_inodes = RB_ROOT;
	mutex_init(&mdsc->quotarealms_inodes_mutex);
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
#ifdef CONFIG_DEBUG_FS
	INIT_LIST_HEAD(&mdsc->cap_wait_list);
#endif
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	/* tid 0 is reserved; cap flush tids start at 1 */
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work);
	err = ceph_metric_init(&mdsc->metric);
	if (err)
		goto err_mdsmap;

	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

	ceph_caps_init(mdsc);
	/* honor any caps_max/min from the mount options */
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snapid_map_lru);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	strscpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename));

	/* publish only after full initialization */
	fsc->mdsc = mdsc;
	return 0;

err_mdsmap:
	kfree(mdsc->mdsmap);
err_mdsc:
	kfree(mdsc);
	return err;
}
5612
5613 /*
5614 * Wait for safe replies on open mds requests. If we time out, drop
5615 * all requests from the tree to avoid dangling dentry refs.
5616 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		/* give in-flight requests up to mount_timeout to finish */
		doutc(cl, "waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			doutc(cl, "timed out on tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	doutc(cl, "done\n");
}
5642
send_flush_mdlog(struct ceph_mds_session * s)5643 void send_flush_mdlog(struct ceph_mds_session *s)
5644 {
5645 struct ceph_client *cl = s->s_mdsc->fsc->client;
5646 struct ceph_msg *msg;
5647
5648 /*
5649 * Pre-luminous MDS crashes when it sees an unknown session request
5650 */
5651 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
5652 return;
5653
5654 mutex_lock(&s->s_mutex);
5655 doutc(cl, "request mdlog flush to mds%d (%s)s seq %lld\n",
5656 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5657 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
5658 s->s_seq);
5659 if (!msg) {
5660 pr_err_client(cl, "failed to request mdlog flush to mds%d (%s) seq %lld\n",
5661 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5662 } else {
5663 ceph_con_send(&s->s_con, msg);
5664 }
5665 mutex_unlock(&s->s_mutex);
5666 }
5667
/*
 * Match one MDS auth cap entry against the caller's credentials and the
 * target path.  Returns 1 on match, 0 on mismatch (caller tries the
 * next entry), or -ENOMEM.
 */
static int ceph_mds_auth_match(struct ceph_mds_client *mdsc,
			       struct ceph_mds_cap_auth *auth,
			       const struct cred *cred,
			       char *tpath)
{
	u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
	u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
	struct ceph_client *cl = mdsc->fsc->client;
	const char *fs_name = mdsc->mdsmap->m_fs_name;
	const char *spath = mdsc->fsc->mount_options->server_path;
	bool gid_matched = false;
	u32 gid, tlen, len;
	int i, j;

	doutc(cl, "fsname check fs_name=%s match.fs_name=%s\n",
	      fs_name, auth->match.fs_name ? auth->match.fs_name : "");

	if (!ceph_namespace_match(auth->match.fs_name, fs_name)) {
		/* fsname mismatch, try next one */
		return 0;
	}

	doutc(cl, "match.uid %lld\n", auth->match.uid);
	if (auth->match.uid != MDS_AUTH_UID_ANY) {
		if (auth->match.uid != caller_uid)
			return 0;
		if (auth->match.num_gids) {
			/* first check the caller's primary fsgid ... */
			for (i = 0; i < auth->match.num_gids; i++) {
				if (caller_gid == auth->match.gids[i])
					gid_matched = true;
			}
			/* ... then fall back to supplementary groups */
			if (!gid_matched && cred->group_info->ngroups) {
				for (i = 0; i < cred->group_info->ngroups; i++) {
					gid = from_kgid(&init_user_ns,
							cred->group_info->gid[i]);
					for (j = 0; j < auth->match.num_gids; j++) {
						if (gid == auth->match.gids[j]) {
							gid_matched = true;
							break;
						}
					}
					if (gid_matched)
						break;
				}
			}
			if (!gid_matched)
				return 0;
		}
	}

	/* path match */
	if (auth->match.path) {
		if (!tpath)
			return 0;

		tlen = strlen(tpath);
		len = strlen(auth->match.path);
		if (len) {
			char *_tpath = tpath;
			bool free_tpath = false;
			int m, n;

			doutc(cl, "server path %s, tpath %s, match.path %s\n",
			      spath, tpath, auth->match.path);
			/*
			 * A one-character server path is just "/"; otherwise
			 * prepend the mount's server path so we compare the
			 * full path within the filesystem.
			 */
			if (spath && (m = strlen(spath)) != 1) {
				/* mount path + '/' + tpath + an extra space */
				n = m + 1 + tlen + 1;
				_tpath = kmalloc(n, GFP_NOFS);
				if (!_tpath)
					return -ENOMEM;
				/* remove the leading '/' */
				snprintf(_tpath, n, "%s/%s", spath + 1, tpath);
				free_tpath = true;
				tlen = strlen(_tpath);
			}

			/*
			 * Please note the tailing '/' for match.path has already
			 * been removed when parsing.
			 *
			 * Remove the tailing '/' for the target path.
			 */
			while (tlen && _tpath[tlen - 1] == '/') {
				_tpath[tlen - 1] = '\0';
				tlen -= 1;
			}
			doutc(cl, "_tpath %s\n", _tpath);

			/*
			 * In case first == _tpath && tlen == len:
			 * match.path=/foo --> /foo _path=/foo --> match
			 * match.path=/foo/ --> /foo _path=/foo --> match
			 *
			 * In case first == _tmatch.path && tlen > len:
			 * match.path=/foo/ --> /foo _path=/foo/ --> match
			 * match.path=/foo --> /foo _path=/foo/ --> match
			 * match.path=/foo/ --> /foo _path=/foo/d --> match
			 * match.path=/foo --> /foo _path=/food --> mismatch
			 *
			 * All the other cases --> mismatch
			 */
			bool path_matched = true;
			/* match.path must be a whole-component prefix of _tpath */
			char *first = strstr(_tpath, auth->match.path);
			if (first != _tpath ||
			    (tlen > len && _tpath[len] != '/')) {
				path_matched = false;
			}

			if (free_tpath)
				kfree(_tpath);

			if (!path_matched)
				return 0;
		}
	}

	doutc(cl, "matched\n");
	return 1;
}
5787
ceph_mds_check_access(struct ceph_mds_client * mdsc,char * tpath,int mask)5788 int ceph_mds_check_access(struct ceph_mds_client *mdsc, char *tpath, int mask)
5789 {
5790 const struct cred *cred = get_current_cred();
5791 u32 caller_uid = from_kuid(&init_user_ns, cred->fsuid);
5792 u32 caller_gid = from_kgid(&init_user_ns, cred->fsgid);
5793 struct ceph_mds_cap_auth *rw_perms_s = NULL;
5794 struct ceph_client *cl = mdsc->fsc->client;
5795 bool root_squash_perms = true;
5796 int i, err;
5797
5798 doutc(cl, "tpath '%s', mask %d, caller_uid %d, caller_gid %d\n",
5799 tpath, mask, caller_uid, caller_gid);
5800
5801 for (i = 0; i < mdsc->s_cap_auths_num; i++) {
5802 struct ceph_mds_cap_auth *s = &mdsc->s_cap_auths[i];
5803
5804 err = ceph_mds_auth_match(mdsc, s, cred, tpath);
5805 if (err < 0) {
5806 put_cred(cred);
5807 return err;
5808 } else if (err > 0) {
5809 /* always follow the last auth caps' permission */
5810 root_squash_perms = true;
5811 rw_perms_s = NULL;
5812 if ((mask & MAY_WRITE) && s->writeable &&
5813 s->match.root_squash && (!caller_uid || !caller_gid))
5814 root_squash_perms = false;
5815
5816 if (((mask & MAY_WRITE) && !s->writeable) ||
5817 ((mask & MAY_READ) && !s->readable))
5818 rw_perms_s = s;
5819 }
5820 }
5821
5822 put_cred(cred);
5823
5824 doutc(cl, "root_squash_perms %d, rw_perms_s %p\n", root_squash_perms,
5825 rw_perms_s);
5826 if (root_squash_perms && rw_perms_s == NULL) {
5827 doutc(cl, "access allowed\n");
5828 return 0;
5829 }
5830
5831 if (!root_squash_perms) {
5832 doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write",
5833 caller_uid, caller_gid);
5834 }
5835 if (rw_perms_s) {
5836 doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d",
5837 rw_perms_s->readable, rw_perms_s->writeable,
5838 !!(mask & MAY_READ), !!(mask & MAY_WRITE));
5839 }
5840 doutc(cl, "access denied\n");
5841 return -EACCES;
5842 }
5843
5844 /*
5845 * called before mount is ro, and before dentries are torn down.
5846 * (hmm, does this still race with new lookups?)
5847 */
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "begin\n");
	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;

	/* flush MDS journals first so safe replies arrive sooner */
	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	/* wait out any in-progress session activity */
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
	doutc(mdsc->fsc->client, "done\n");
}
5867
5868 /*
5869 * flush the mdlog and wait for all write mds requests to flush.
5870 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						      u64 want_tid)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	doutc(cl, "want %lld\n", want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			/* pin req and nextreq across the mutex drop below */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			/* skip duplicate flushes to the same session */
			if (last_session != s) {
				send_flush_mdlog(s);
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				ceph_put_mds_session(s);
			}
			doutc(cl, "wait on %llu (want %llu)\n",
			      req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break; /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq); /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	/* ceph_put_mds_session() tolerates NULL */
	ceph_put_mds_session(last_session);
	doutc(cl, "done\n");
}
5935
/*
 * Flush all dirty metadata to the MDSes: wait for every unsafe write
 * request up to the current last_tid and for all dirty caps to flush.
 */
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u64 want_tid, want_flush;

	/* nothing to sync on a dying or dead mount */
	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	doutc(cl, "sync\n");
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	ceph_flush_cap_releases(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		/* wake waiters once the newest queued flush completes */
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	doutc(cl, "sync want tid %lld flush_seq %lld\n", want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}
5966
5967 /*
5968 * true if all sessions are closed, or we force unmount
5969 */
done_closing_sessions(struct ceph_mds_client * mdsc,int skipped)5970 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
5971 {
5972 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
5973 return true;
5974 return atomic_read(&mdsc->num_sessions) <= skipped;
5975 }
5976
5977 /*
5978 * called after sb is ro or when metadata corrupted.
5979 */
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	doutc(cl, "begin\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		/* drop mdsc->mutex before taking the session mutex */
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		/* sessions we couldn't ask to close won't be counted below */
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	doutc(cl, "waiting for sessions to close\n");
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_work_sync(&mdsc->cap_unlink_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	doutc(cl, "done\n");
}
6037
/*
 * Forcibly tear down all sessions and kick every waiting request; used
 * when the mount is being aborted.
 */
void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	doutc(mdsc->fsc->client, "force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		/* drop mdsc->mutex before taking the session mutex */
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
6071
/*
 * Release all mdsc-owned resources; the delayed work must be fully
 * stopped before anything is freed.
 */
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	doutc(mdsc->fsc->client, "stop\n");
	/*
	 * Make sure the delayed work stopped before releasing
	 * the resources.
	 *
	 * Because the cancel_delayed_work_sync() will only
	 * guarantee that the work finishes executing. But the
	 * delayed work will re-arm itself again after that.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);

	/* free the parsed MDS auth caps and their sub-allocations */
	if (mdsc->s_cap_auths) {
		int i;

		for (i = 0; i < mdsc->s_cap_auths_num; i++) {
			kfree(mdsc->s_cap_auths[i].match.gids);
			kfree(mdsc->s_cap_auths[i].match.path);
			kfree(mdsc->s_cap_auths[i].match.fs_name);
		}
		kfree(mdsc->s_cap_auths);
	}

	ceph_pool_perm_destroy(mdsc);
}
6103
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;
	doutc(fsc->client, "%p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	/*
	 * NOTE(review): mdsc was just freed; only the stale pointer
	 * value is printed here, never dereferenced.
	 */
	doutc(fsc->client, "%p done\n", mdsc);
}
6123
/*
 * Handle an FSMap message: find the fscid matching our mds_namespace
 * mount option and subscribe to that filesystem's mdsmap.  A decode
 * error shuts the mount down; a missing namespace fails waiting
 * requests with -ENOENT.
 */
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	struct ceph_client *cl = fsc->client;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;	/* sentinel: not found */
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	doutc(cl, "epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		// info_v, info_cv
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		/* bound decoding of this entry to its own length */
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		/* name is not NUL-terminated on the wire; compare by length */
		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		/* found it: switch subscription to that fs's mdsmap */
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err_client(cl, "error decoding fsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
6194
/*
 * Handle an mds map update.
 *
 * Decode the new map and, if its epoch is newer than the one we hold,
 * swap it into place under mdsc->mutex; check_new_map() then reconciles
 * the existing sessions against it.  A decode failure is fatal and
 * shuts the mount down.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_client *cl = mdsc->fsc->client;
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;		/* not our cluster: silently ignore */
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	doutc(cl, "epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it?  (old or equal epoch means we already have it) */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		doutc(cl, "epoch %u <= our %u\n", epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap; /* first mds map */
	}
	/* cap the max file size to what the VFS can represent */
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
					MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err_client(cl, "error decoding mdsmap %d. Shutting down mount.\n",
		      err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
	return;
}
6260
mds_get_con(struct ceph_connection * con)6261 static struct ceph_connection *mds_get_con(struct ceph_connection *con)
6262 {
6263 struct ceph_mds_session *s = con->private;
6264
6265 if (ceph_get_mds_session(s))
6266 return con;
6267 return NULL;
6268 }
6269
mds_put_con(struct ceph_connection * con)6270 static void mds_put_con(struct ceph_connection *con)
6271 {
6272 struct ceph_mds_session *s = con->private;
6273
6274 ceph_put_mds_session(s);
6275 }
6276
6277 /*
6278 * if the client is unresponsive for long enough, the mds will kill
6279 * the session entirely.
6280 */
mds_peer_reset(struct ceph_connection * con)6281 static void mds_peer_reset(struct ceph_connection *con)
6282 {
6283 struct ceph_mds_session *s = con->private;
6284 struct ceph_mds_client *mdsc = s->s_mdsc;
6285
6286 pr_warn_client(mdsc->fsc->client, "mds%d closed our session\n",
6287 s->s_mds);
6288 if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO &&
6289 ceph_mdsmap_get_state(mdsc->mdsmap, s->s_mds) >= CEPH_MDS_STATE_RECONNECT)
6290 send_mds_reconnect(mdsc, s);
6291 }
6292
mds_dispatch(struct ceph_connection * con,struct ceph_msg * msg)6293 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
6294 {
6295 struct ceph_mds_session *s = con->private;
6296 struct ceph_mds_client *mdsc = s->s_mdsc;
6297 struct ceph_client *cl = mdsc->fsc->client;
6298 int type = le16_to_cpu(msg->hdr.type);
6299
6300 mutex_lock(&mdsc->mutex);
6301 if (__verify_registered_session(mdsc, s) < 0) {
6302 mutex_unlock(&mdsc->mutex);
6303 goto out;
6304 }
6305 mutex_unlock(&mdsc->mutex);
6306
6307 switch (type) {
6308 case CEPH_MSG_MDS_MAP:
6309 ceph_mdsc_handle_mdsmap(mdsc, msg);
6310 break;
6311 case CEPH_MSG_FS_MAP_USER:
6312 ceph_mdsc_handle_fsmap(mdsc, msg);
6313 break;
6314 case CEPH_MSG_CLIENT_SESSION:
6315 handle_session(s, msg);
6316 break;
6317 case CEPH_MSG_CLIENT_REPLY:
6318 handle_reply(s, msg);
6319 break;
6320 case CEPH_MSG_CLIENT_REQUEST_FORWARD:
6321 handle_forward(mdsc, s, msg);
6322 break;
6323 case CEPH_MSG_CLIENT_CAPS:
6324 ceph_handle_caps(s, msg);
6325 break;
6326 case CEPH_MSG_CLIENT_SNAP:
6327 ceph_handle_snap(mdsc, s, msg);
6328 break;
6329 case CEPH_MSG_CLIENT_LEASE:
6330 handle_lease(mdsc, s, msg);
6331 break;
6332 case CEPH_MSG_CLIENT_QUOTA:
6333 ceph_handle_quota(mdsc, s, msg);
6334 break;
6335
6336 default:
6337 pr_err_client(cl, "received unknown message type %d %s\n",
6338 type, ceph_msg_type_name(type));
6339 }
6340 out:
6341 ceph_msg_put(msg);
6342 }
6343
6344 /*
6345 * authentication
6346 */
6347
6348 /*
6349 * Note: returned pointer is the address of a structure that's
6350 * managed separately. Caller must *not* attempt to free it.
6351 */
6352 static struct ceph_auth_handshake *
mds_get_authorizer(struct ceph_connection * con,int * proto,int force_new)6353 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
6354 {
6355 struct ceph_mds_session *s = con->private;
6356 struct ceph_mds_client *mdsc = s->s_mdsc;
6357 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6358 struct ceph_auth_handshake *auth = &s->s_auth;
6359 int ret;
6360
6361 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
6362 force_new, proto, NULL, NULL);
6363 if (ret)
6364 return ERR_PTR(ret);
6365
6366 return auth;
6367 }
6368
mds_add_authorizer_challenge(struct ceph_connection * con,void * challenge_buf,int challenge_buf_len)6369 static int mds_add_authorizer_challenge(struct ceph_connection *con,
6370 void *challenge_buf, int challenge_buf_len)
6371 {
6372 struct ceph_mds_session *s = con->private;
6373 struct ceph_mds_client *mdsc = s->s_mdsc;
6374 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6375
6376 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
6377 challenge_buf, challenge_buf_len);
6378 }
6379
mds_verify_authorizer_reply(struct ceph_connection * con)6380 static int mds_verify_authorizer_reply(struct ceph_connection *con)
6381 {
6382 struct ceph_mds_session *s = con->private;
6383 struct ceph_mds_client *mdsc = s->s_mdsc;
6384 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6385 struct ceph_auth_handshake *auth = &s->s_auth;
6386
6387 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
6388 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
6389 NULL, NULL, NULL, NULL);
6390 }
6391
mds_invalidate_authorizer(struct ceph_connection * con)6392 static int mds_invalidate_authorizer(struct ceph_connection *con)
6393 {
6394 struct ceph_mds_session *s = con->private;
6395 struct ceph_mds_client *mdsc = s->s_mdsc;
6396 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
6397
6398 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
6399
6400 return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
6401 }
6402
mds_get_auth_request(struct ceph_connection * con,void * buf,int * buf_len,void ** authorizer,int * authorizer_len)6403 static int mds_get_auth_request(struct ceph_connection *con,
6404 void *buf, int *buf_len,
6405 void **authorizer, int *authorizer_len)
6406 {
6407 struct ceph_mds_session *s = con->private;
6408 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
6409 struct ceph_auth_handshake *auth = &s->s_auth;
6410 int ret;
6411
6412 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
6413 buf, buf_len);
6414 if (ret)
6415 return ret;
6416
6417 *authorizer = auth->authorizer_buf;
6418 *authorizer_len = auth->authorizer_buf_len;
6419 return 0;
6420 }
6421
mds_handle_auth_reply_more(struct ceph_connection * con,void * reply,int reply_len,void * buf,int * buf_len,void ** authorizer,int * authorizer_len)6422 static int mds_handle_auth_reply_more(struct ceph_connection *con,
6423 void *reply, int reply_len,
6424 void *buf, int *buf_len,
6425 void **authorizer, int *authorizer_len)
6426 {
6427 struct ceph_mds_session *s = con->private;
6428 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
6429 struct ceph_auth_handshake *auth = &s->s_auth;
6430 int ret;
6431
6432 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
6433 buf, buf_len);
6434 if (ret)
6435 return ret;
6436
6437 *authorizer = auth->authorizer_buf;
6438 *authorizer_len = auth->authorizer_buf_len;
6439 return 0;
6440 }
6441
/* Final step of the auth exchange: extract session key and con secret. */
static int mds_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_mds_session *session = con->private;
	struct ceph_auth_handshake *auth = &session->s_auth;
	struct ceph_auth_client *ac =
		session->s_mdsc->fsc->client->monc.auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}
6455
mds_handle_auth_bad_method(struct ceph_connection * con,int used_proto,int result,const int * allowed_protos,int proto_cnt,const int * allowed_modes,int mode_cnt)6456 static int mds_handle_auth_bad_method(struct ceph_connection *con,
6457 int used_proto, int result,
6458 const int *allowed_protos, int proto_cnt,
6459 const int *allowed_modes, int mode_cnt)
6460 {
6461 struct ceph_mds_session *s = con->private;
6462 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
6463 int ret;
6464
6465 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
6466 used_proto, result,
6467 allowed_protos, proto_cnt,
6468 allowed_modes, mode_cnt)) {
6469 ret = ceph_monc_validate_auth(monc);
6470 if (ret)
6471 return ret;
6472 }
6473
6474 return -EACCES;
6475 }
6476
mds_alloc_msg(struct ceph_connection * con,struct ceph_msg_header * hdr,int * skip)6477 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
6478 struct ceph_msg_header *hdr, int *skip)
6479 {
6480 struct ceph_msg *msg;
6481 int type = (int) le16_to_cpu(hdr->type);
6482 int front_len = (int) le32_to_cpu(hdr->front_len);
6483
6484 if (con->in_msg)
6485 return con->in_msg;
6486
6487 *skip = 0;
6488 msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
6489 if (!msg) {
6490 pr_err("unable to allocate msg type %d len %d\n",
6491 type, front_len);
6492 return NULL;
6493 }
6494
6495 return msg;
6496 }
6497
mds_sign_message(struct ceph_msg * msg)6498 static int mds_sign_message(struct ceph_msg *msg)
6499 {
6500 struct ceph_mds_session *s = msg->con->private;
6501 struct ceph_auth_handshake *auth = &s->s_auth;
6502
6503 return ceph_auth_sign_message(auth, msg);
6504 }
6505
mds_check_message_signature(struct ceph_msg * msg)6506 static int mds_check_message_signature(struct ceph_msg *msg)
6507 {
6508 struct ceph_mds_session *s = msg->con->private;
6509 struct ceph_auth_handshake *auth = &s->s_auth;
6510
6511 return ceph_auth_check_message_signature(auth, msg);
6512 }
6513
6514 static const struct ceph_connection_operations mds_con_ops = {
6515 .get = mds_get_con,
6516 .put = mds_put_con,
6517 .alloc_msg = mds_alloc_msg,
6518 .dispatch = mds_dispatch,
6519 .peer_reset = mds_peer_reset,
6520 .get_authorizer = mds_get_authorizer,
6521 .add_authorizer_challenge = mds_add_authorizer_challenge,
6522 .verify_authorizer_reply = mds_verify_authorizer_reply,
6523 .invalidate_authorizer = mds_invalidate_authorizer,
6524 .sign_message = mds_sign_message,
6525 .check_message_signature = mds_check_message_signature,
6526 .get_auth_request = mds_get_auth_request,
6527 .handle_auth_reply_more = mds_handle_auth_reply_more,
6528 .handle_auth_done = mds_handle_auth_done,
6529 .handle_auth_bad_method = mds_handle_auth_bad_method,
6530 };
6531
6532 /* eof */
6533