xref: /linux/fs/ceph/mds_client.c (revision 2600d2dd5085ab6fb09540226138a60055abf335)
1 #include "ceph_debug.h"
2 
3 #include <linux/wait.h>
4 #include <linux/sched.h>
5 
6 #include "mds_client.h"
7 #include "mon_client.h"
8 #include "super.h"
9 #include "messenger.h"
10 #include "decode.h"
11 #include "auth.h"
12 #include "pagelist.h"
13 
14 /*
15  * A cluster of MDS (metadata server) daemons is responsible for
16  * managing the file system namespace (the directory hierarchy and
17  * inodes) and for coordinating shared access to storage.  Metadata is
18  * partitioned hierarchically across a number of servers, and that
19  * partition varies over time as the cluster adjusts the distribution
20  * in order to balance load.
21  *
22  * The MDS client is primarily responsible for managing synchronous
23  * metadata requests for operations like open, unlink, and so forth.
24  * If there is an MDS failure, we find out about it when we (possibly
25  * request and) receive a new MDS map, and can resubmit affected
26  * requests.
27  *
28  * For the most part, though, we take advantage of a lossless
29  * communications channel to the MDS, and do not need to worry about
30  * timing out or resubmitting requests.
31  *
32  * We maintain a stateful "session" with each MDS we interact with.
33  * Within each session, we send periodic heartbeat messages to ensure
34  * any capabilities or leases we have been issued remain valid.  If
35  * the session times out and goes stale, our leases and capabilities
36  * are no longer valid.
37  */
38 
39 static void __wake_requests(struct ceph_mds_client *mdsc,
40 			    struct list_head *head);
41 
42 static const struct ceph_connection_operations mds_con_ops;
43 
44 
45 /*
46  * mds reply parsing
47  */
48 
49 /*
50  * parse individual inode info
51  */
52 static int parse_reply_info_in(void **p, void *end,
53 			       struct ceph_mds_reply_info_in *info)
54 {
55 	int err = -EIO;
56 
57 	info->in = *p;
58 	*p += sizeof(struct ceph_mds_reply_inode) +
59 		sizeof(*info->in->fragtree.splits) *
60 		le32_to_cpu(info->in->fragtree.nsplits);
61 
62 	ceph_decode_32_safe(p, end, info->symlink_len, bad);
63 	ceph_decode_need(p, end, info->symlink_len, bad);
64 	info->symlink = *p;
65 	*p += info->symlink_len;
66 
67 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
68 	ceph_decode_need(p, end, info->xattr_len, bad);
69 	info->xattr_data = *p;
70 	*p += info->xattr_len;
71 	return 0;
72 bad:
73 	return err;
74 }
75 
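/*
 * The ceph_decode_need()/ceph_decode_*_safe() helpers used throughout
 * this file come from decode.h.  Roughly (a sketch, not the exact macro
 * text), each one bounds-checks against 'end' and bails out to the given
 * label on a short buffer:
 *
 *	#define ceph_decode_need(p, end, n, bad)		\
 *		do {						\
 *			if (unlikely(*(p) + (n) > (end)))	\
 *				goto bad;			\
 *		} while (0)
 *
 *	#define ceph_decode_32_safe(p, end, v, bad)		\
 *		do {						\
 *			ceph_decode_need(p, end, sizeof(u32), bad); \
 *			v = ceph_decode_32(p);			\
 *		} while (0)
 */
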
76 /*
77  * parse a normal reply, which may contain a (dir+)dentry and/or a
78  * target inode.
79  */
80 static int parse_reply_info_trace(void **p, void *end,
81 				  struct ceph_mds_reply_info_parsed *info)
82 {
83 	int err;
84 
85 	if (info->head->is_dentry) {
86 		err = parse_reply_info_in(p, end, &info->diri);
87 		if (err < 0)
88 			goto out_bad;
89 
90 		if (unlikely(*p + sizeof(*info->dirfrag) > end))
91 			goto bad;
92 		info->dirfrag = *p;
93 		*p += sizeof(*info->dirfrag) +
94 			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
95 		if (unlikely(*p > end))
96 			goto bad;
97 
98 		ceph_decode_32_safe(p, end, info->dname_len, bad);
99 		ceph_decode_need(p, end, info->dname_len, bad);
100 		info->dname = *p;
101 		*p += info->dname_len;
102 		info->dlease = *p;
103 		*p += sizeof(*info->dlease);
104 	}
105 
106 	if (info->head->is_target) {
107 		err = parse_reply_info_in(p, end, &info->targeti);
108 		if (err < 0)
109 			goto out_bad;
110 	}
111 
112 	if (unlikely(*p != end))
113 		goto bad;
114 	return 0;
115 
116 bad:
117 	err = -EIO;
118 out_bad:
119 	pr_err("problem parsing mds trace %d\n", err);
120 	return err;
121 }
122 
123 /*
124  * parse readdir results
125  */
126 static int parse_reply_info_dir(void **p, void *end,
127 				struct ceph_mds_reply_info_parsed *info)
128 {
129 	u32 num, i = 0;
130 	int err;
131 
132 	info->dir_dir = *p;
133 	if (*p + sizeof(*info->dir_dir) > end)
134 		goto bad;
135 	*p += sizeof(*info->dir_dir) +
136 		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
137 	if (*p > end)
138 		goto bad;
139 
140 	ceph_decode_need(p, end, sizeof(num) + 2, bad);
141 	num = ceph_decode_32(p);
142 	info->dir_end = ceph_decode_8(p);
143 	info->dir_complete = ceph_decode_8(p);
144 	if (num == 0)
145 		goto done;
146 
147 	/* alloc large array */
148 	info->dir_nr = num;
149 	info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
150 			       sizeof(*info->dir_dname) +
151 			       sizeof(*info->dir_dname_len) +
152 			       sizeof(*info->dir_dlease),
153 			       GFP_NOFS);
154 	if (info->dir_in == NULL) {
155 		err = -ENOMEM;
156 		goto out_bad;
157 	}
158 	info->dir_dname = (void *)(info->dir_in + num);
159 	info->dir_dname_len = (void *)(info->dir_dname + num);
160 	info->dir_dlease = (void *)(info->dir_dname_len + num);
161 
162 	while (num) {
163 		/* dentry */
164 		ceph_decode_need(p, end, sizeof(u32)*2, bad);
165 		info->dir_dname_len[i] = ceph_decode_32(p);
166 		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
167 		info->dir_dname[i] = *p;
168 		*p += info->dir_dname_len[i];
169 		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
170 		     info->dir_dname[i]);
171 		info->dir_dlease[i] = *p;
172 		*p += sizeof(struct ceph_mds_reply_lease);
173 
174 		/* inode */
175 		err = parse_reply_info_in(p, end, &info->dir_in[i]);
176 		if (err < 0)
177 			goto out_bad;
178 		i++;
179 		num--;
180 	}
181 
182 done:
183 	if (*p != end)
184 		goto bad;
185 	return 0;
186 
187 bad:
188 	err = -EIO;
189 out_bad:
190 	pr_err("problem parsing dir contents %d\n", err);
191 	return err;
192 }
193 
194 /*
195  * parse entire mds reply
196  */
197 static int parse_reply_info(struct ceph_msg *msg,
198 			    struct ceph_mds_reply_info_parsed *info)
199 {
200 	void *p, *end;
201 	u32 len;
202 	int err;
203 
204 	info->head = msg->front.iov_base;
205 	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
206 	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
207 
208 	/* trace */
209 	ceph_decode_32_safe(&p, end, len, bad);
210 	if (len > 0) {
211 		err = parse_reply_info_trace(&p, p+len, info);
212 		if (err < 0)
213 			goto out_bad;
214 	}
215 
216 	/* dir content */
217 	ceph_decode_32_safe(&p, end, len, bad);
218 	if (len > 0) {
219 		err = parse_reply_info_dir(&p, p+len, info);
220 		if (err < 0)
221 			goto out_bad;
222 	}
223 
224 	/* snap blob */
225 	ceph_decode_32_safe(&p, end, len, bad);
226 	info->snapblob_len = len;
227 	info->snapblob = p;
228 	p += len;
229 
230 	if (p != end)
231 		goto bad;
232 	return 0;
233 
234 bad:
235 	err = -EIO;
236 out_bad:
237 	pr_err("mds parse_reply err %d\n", err);
238 	return err;
239 }
240 
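/*
 * For reference, the reply front parsed above is laid out as follows
 * (a summary of the decoding code; all lengths are little-endian u32s):
 *
 *	struct ceph_mds_reply_head
 *	len, trace blob      -> parse_reply_info_trace()
 *	len, dir blob        -> parse_reply_info_dir()
 *	len, snap blob       -> kept raw for ceph_update_snap_trace()
 */
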
241 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
242 {
243 	kfree(info->dir_in);
244 }
245 
246 
247 /*
248  * sessions
249  */
250 static const char *session_state_name(int s)
251 {
252 	switch (s) {
253 	case CEPH_MDS_SESSION_NEW: return "new";
254 	case CEPH_MDS_SESSION_OPENING: return "opening";
255 	case CEPH_MDS_SESSION_OPEN: return "open";
256 	case CEPH_MDS_SESSION_HUNG: return "hung";
257 	case CEPH_MDS_SESSION_CLOSING: return "closing";
258 	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
259 	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
260 	default: return "???";
261 	}
262 }
263 
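/*
 * A rough sketch of the session state machine (handle_session() and the
 * mdsmap handling code are authoritative):
 *
 *	NEW -> OPENING -> OPEN <-> HUNG              normal operation
 *	any  -> CLOSING                              close requested
 *	OPEN -> RESTARTING -> RECONNECTING -> OPEN   mds failover
 */
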
264 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
265 {
266 	if (atomic_inc_not_zero(&s->s_ref)) {
267 		dout("mdsc get_session %p %d -> %d\n", s,
268 		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
269 		return s;
270 	} else {
271 		dout("mdsc get_session %p 0 -- FAIL\n", s);
272 		return NULL;
273 	}
274 }
275 
276 void ceph_put_mds_session(struct ceph_mds_session *s)
277 {
278 	dout("mdsc put_session %p %d -> %d\n", s,
279 	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
280 	if (atomic_dec_and_test(&s->s_ref)) {
281 		if (s->s_authorizer)
282 			s->s_mdsc->client->monc.auth->ops->destroy_authorizer(
283 				s->s_mdsc->client->monc.auth, s->s_authorizer);
284 		kfree(s);
285 	}
286 }
287 
288 /*
289  * called under mdsc->mutex
290  */
291 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
292 						   int mds)
293 {
294 	struct ceph_mds_session *session;
295 
296 	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
297 		return NULL;
298 	session = mdsc->sessions[mds];
299 	dout("lookup_mds_session %p %d\n", session,
300 	     atomic_read(&session->s_ref));
301 	get_session(session);
302 	return session;
303 }
304 
305 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
306 {
307 	if (mds >= mdsc->max_sessions)
308 		return false;
309 	return mdsc->sessions[mds];
310 }
311 
312 static int __verify_registered_session(struct ceph_mds_client *mdsc,
313 				       struct ceph_mds_session *s)
314 {
315 	if (s->s_mds >= mdsc->max_sessions ||
316 	    mdsc->sessions[s->s_mds] != s)
317 		return -ENOENT;
318 	return 0;
319 }
320 
321 /*
322  * create+register a new session for given mds.
323  * called under mdsc->mutex.
324  */
325 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
326 						 int mds)
327 {
328 	struct ceph_mds_session *s;
329 
330 	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (s == NULL)
		return ERR_PTR(-ENOMEM);
331 	s->s_mdsc = mdsc;
332 	s->s_mds = mds;
333 	s->s_state = CEPH_MDS_SESSION_NEW;
334 	s->s_ttl = 0;
335 	s->s_seq = 0;
336 	mutex_init(&s->s_mutex);
337 
338 	ceph_con_init(mdsc->client->msgr, &s->s_con);
339 	s->s_con.private = s;
340 	s->s_con.ops = &mds_con_ops;
341 	s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
342 	s->s_con.peer_name.num = cpu_to_le64(mds);
343 
344 	spin_lock_init(&s->s_cap_lock);
345 	s->s_cap_gen = 0;
346 	s->s_cap_ttl = 0;
347 	s->s_renew_requested = 0;
348 	s->s_renew_seq = 0;
349 	INIT_LIST_HEAD(&s->s_caps);
350 	s->s_nr_caps = 0;
351 	s->s_trim_caps = 0;
352 	atomic_set(&s->s_ref, 1);
353 	INIT_LIST_HEAD(&s->s_waiting);
354 	INIT_LIST_HEAD(&s->s_unsafe);
355 	s->s_num_cap_releases = 0;
356 	s->s_cap_iterator = NULL;
357 	INIT_LIST_HEAD(&s->s_cap_releases);
358 	INIT_LIST_HEAD(&s->s_cap_releases_done);
359 	INIT_LIST_HEAD(&s->s_cap_flushing);
360 	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
361 
362 	dout("register_session mds%d\n", mds);
363 	if (mds >= mdsc->max_sessions) {
364 		int newmax = 1 << get_count_order(mds+1);
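		/* e.g. mds 4 -> newmax 8; sessions[] sizes stay powers of two */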
365 		struct ceph_mds_session **sa;
366 
367 		dout("register_session realloc to %d\n", newmax);
368 		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
369 		if (sa == NULL)
370 			goto fail_realloc;
371 		if (mdsc->sessions) {
372 			memcpy(sa, mdsc->sessions,
373 			       mdsc->max_sessions * sizeof(void *));
374 			kfree(mdsc->sessions);
375 		}
376 		mdsc->sessions = sa;
377 		mdsc->max_sessions = newmax;
378 	}
379 	mdsc->sessions[mds] = s;
380 	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
381 
382 	ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
383 
384 	return s;
385 
386 fail_realloc:
387 	kfree(s);
388 	return ERR_PTR(-ENOMEM);
389 }
390 
391 /*
392  * called under mdsc->mutex
393  */
394 static void __unregister_session(struct ceph_mds_client *mdsc,
395 			       struct ceph_mds_session *s)
396 {
397 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
398 	BUG_ON(mdsc->sessions[s->s_mds] != s);
399 	mdsc->sessions[s->s_mds] = NULL;
400 	ceph_con_close(&s->s_con);
401 	ceph_put_mds_session(s);
402 }
403 
404 /*
405  * drop session refs in request.
406  *
407  * should be last request ref, or hold mdsc->mutex
408  */
409 static void put_request_session(struct ceph_mds_request *req)
410 {
411 	if (req->r_session) {
412 		ceph_put_mds_session(req->r_session);
413 		req->r_session = NULL;
414 	}
415 }
416 
417 void ceph_mdsc_release_request(struct kref *kref)
418 {
419 	struct ceph_mds_request *req = container_of(kref,
420 						    struct ceph_mds_request,
421 						    r_kref);
422 	if (req->r_request)
423 		ceph_msg_put(req->r_request);
424 	if (req->r_reply) {
425 		ceph_msg_put(req->r_reply);
426 		destroy_reply_info(&req->r_reply_info);
427 	}
428 	if (req->r_inode) {
429 		ceph_put_cap_refs(ceph_inode(req->r_inode),
430 				  CEPH_CAP_PIN);
431 		iput(req->r_inode);
432 	}
433 	if (req->r_locked_dir)
434 		ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
435 				  CEPH_CAP_PIN);
436 	if (req->r_target_inode)
437 		iput(req->r_target_inode);
438 	if (req->r_dentry)
439 		dput(req->r_dentry);
440 	if (req->r_old_dentry) {
441 		ceph_put_cap_refs(
442 			ceph_inode(req->r_old_dentry->d_parent->d_inode),
443 			CEPH_CAP_PIN);
444 		dput(req->r_old_dentry);
445 	}
446 	kfree(req->r_path1);
447 	kfree(req->r_path2);
448 	put_request_session(req);
449 	ceph_unreserve_caps(&req->r_caps_reservation);
450 	kfree(req);
451 }
452 
453 /*
454  * lookup request, bump ref if found.
455  *
456  * called under mdsc->mutex.
457  */
458 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
459 					     u64 tid)
460 {
461 	struct ceph_mds_request *req;
462 	struct rb_node *n = mdsc->request_tree.rb_node;
463 
464 	while (n) {
465 		req = rb_entry(n, struct ceph_mds_request, r_node);
466 		if (tid < req->r_tid)
467 			n = n->rb_left;
468 		else if (tid > req->r_tid)
469 			n = n->rb_right;
470 		else {
471 			ceph_mdsc_get_request(req);
472 			return req;
473 		}
474 	}
475 	return NULL;
476 }
477 
478 static void __insert_request(struct ceph_mds_client *mdsc,
479 			     struct ceph_mds_request *new)
480 {
481 	struct rb_node **p = &mdsc->request_tree.rb_node;
482 	struct rb_node *parent = NULL;
483 	struct ceph_mds_request *req = NULL;
484 
485 	while (*p) {
486 		parent = *p;
487 		req = rb_entry(parent, struct ceph_mds_request, r_node);
488 		if (new->r_tid < req->r_tid)
489 			p = &(*p)->rb_left;
490 		else if (new->r_tid > req->r_tid)
491 			p = &(*p)->rb_right;
492 		else
493 			BUG();
494 	}
495 
496 	rb_link_node(&new->r_node, parent, p);
497 	rb_insert_color(&new->r_node, &mdsc->request_tree);
498 }
499 
500 /*
501  * Register an in-flight request, and assign a tid.  Link to the directory
502  * we are modifying (if any).
503  *
504  * Called under mdsc->mutex.
505  */
506 static void __register_request(struct ceph_mds_client *mdsc,
507 			       struct ceph_mds_request *req,
508 			       struct inode *dir)
509 {
510 	req->r_tid = ++mdsc->last_tid;
511 	if (req->r_num_caps)
512 		ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
513 	dout("__register_request %p tid %lld\n", req, req->r_tid);
514 	ceph_mdsc_get_request(req);
515 	__insert_request(mdsc, req);
516 
517 	if (dir) {
518 		struct ceph_inode_info *ci = ceph_inode(dir);
519 
520 		spin_lock(&ci->i_unsafe_lock);
521 		req->r_unsafe_dir = dir;
522 		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
523 		spin_unlock(&ci->i_unsafe_lock);
524 	}
525 }
526 
527 static void __unregister_request(struct ceph_mds_client *mdsc,
528 				 struct ceph_mds_request *req)
529 {
530 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
531 	rb_erase(&req->r_node, &mdsc->request_tree);
532 	ceph_mdsc_put_request(req);
533 
534 	if (req->r_unsafe_dir) {
535 		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
536 
537 		spin_lock(&ci->i_unsafe_lock);
538 		list_del_init(&req->r_unsafe_dir_item);
539 		spin_unlock(&ci->i_unsafe_lock);
540 	}
541 }
542 
543 /*
544  * Choose which mds to send this request to.  If there is a hint set in the
545  * request (e.g., due to a prior forward hint from the mds), use that.
546  * Otherwise, consult frag tree and/or caps to identify the
547  * appropriate mds.  If all else fails, choose randomly.
548  *
549  * Called under mdsc->mutex.
550  */
551 static int __choose_mds(struct ceph_mds_client *mdsc,
552 			struct ceph_mds_request *req)
553 {
554 	struct inode *inode;
555 	struct ceph_inode_info *ci;
556 	struct ceph_cap *cap;
557 	int mode = req->r_direct_mode;
558 	int mds = -1;
559 	u32 hash = req->r_direct_hash;
560 	bool is_hash = req->r_direct_is_hash;
561 
562 	/*
563 	 * is there a specific mds we should try?  ignore hint if we have
564 	 * no session and the mds is not up (active or recovering).
565 	 */
566 	if (req->r_resend_mds >= 0 &&
567 	    (__have_session(mdsc, req->r_resend_mds) ||
568 	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
569 		dout("choose_mds using resend_mds mds%d\n",
570 		     req->r_resend_mds);
571 		return req->r_resend_mds;
572 	}
573 
574 	if (mode == USE_RANDOM_MDS)
575 		goto random;
576 
577 	inode = NULL;
578 	if (req->r_inode) {
579 		inode = req->r_inode;
580 	} else if (req->r_dentry) {
581 		if (req->r_dentry->d_inode) {
582 			inode = req->r_dentry->d_inode;
583 		} else {
584 			inode = req->r_dentry->d_parent->d_inode;
585 			hash = req->r_dentry->d_name.hash;
586 			is_hash = true;
587 		}
588 	}
589 	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
590 	     (int)hash, mode);
591 	if (!inode)
592 		goto random;
593 	ci = ceph_inode(inode);
594 
595 	if (is_hash && S_ISDIR(inode->i_mode)) {
596 		struct ceph_inode_frag frag;
597 		int found;
598 
599 		ceph_choose_frag(ci, hash, &frag, &found);
600 		if (found) {
601 			if (mode == USE_ANY_MDS && frag.ndist > 0) {
602 				u8 r;
603 
604 				/* choose a random replica */
605 				get_random_bytes(&r, 1);
606 				r %= frag.ndist;
607 				mds = frag.dist[r];
608 				dout("choose_mds %p %llx.%llx "
609 				     "frag %u mds%d (%d/%d)\n",
610 				     inode, ceph_vinop(inode),
611 				     frag.frag, frag.mds,
612 				     (int)r, frag.ndist);
613 				return mds;
614 			}
615 
616 			/* since this file/dir wasn't known to be
617 			 * replicated, we want to look for the
618 			 * authoritative mds. */
619 			mode = USE_AUTH_MDS;
620 			if (frag.mds >= 0) {
621 				/* choose auth mds */
622 				mds = frag.mds;
623 				dout("choose_mds %p %llx.%llx "
624 				     "frag %u mds%d (auth)\n",
625 				     inode, ceph_vinop(inode), frag.frag, mds);
626 				return mds;
627 			}
628 		}
629 	}
630 
631 	spin_lock(&inode->i_lock);
632 	cap = NULL;
633 	if (mode == USE_AUTH_MDS)
634 		cap = ci->i_auth_cap;
635 	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
636 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
637 	if (!cap) {
638 		spin_unlock(&inode->i_lock);
639 		goto random;
640 	}
641 	mds = cap->session->s_mds;
642 	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
643 	     inode, ceph_vinop(inode), mds,
644 	     cap == ci->i_auth_cap ? "auth " : "", cap);
645 	spin_unlock(&inode->i_lock);
646 	return mds;
647 
648 random:
649 	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
650 	dout("choose_mds chose random mds%d\n", mds);
651 	return mds;
652 }
653 
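/*
 * To summarize the order of preference above: an explicit resend/forward
 * hint, then (for hashed dir operations) a replicated or authoritative
 * mds from the frag tree, then the mds behind the auth cap (or any cap)
 * we hold on the inode, and finally a random mds that is up.
 */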
654 
655 /*
656  * session messages
657  */
658 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
659 {
660 	struct ceph_msg *msg;
661 	struct ceph_mds_session_head *h;
662 
663 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
664 	if (IS_ERR(msg)) {
665 		pr_err("create_session_msg ENOMEM creating msg\n");
666 		return ERR_PTR(PTR_ERR(msg));
667 	}
668 	h = msg->front.iov_base;
669 	h->op = cpu_to_le32(op);
670 	h->seq = cpu_to_le64(seq);
671 	return msg;
672 }
673 
674 /*
675  * send session open request.
676  *
677  * called under mdsc->mutex
678  */
679 static int __open_session(struct ceph_mds_client *mdsc,
680 			  struct ceph_mds_session *session)
681 {
682 	struct ceph_msg *msg;
683 	int mstate;
684 	int mds = session->s_mds;
685 	int err = 0;
686 
687 	/* wait for mds to go active? */
688 	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
689 	dout("open_session to mds%d (%s)\n", mds,
690 	     ceph_mds_state_name(mstate));
691 	session->s_state = CEPH_MDS_SESSION_OPENING;
692 	session->s_renew_requested = jiffies;
693 
694 	/* send connect message */
695 	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
696 	if (IS_ERR(msg)) {
697 		err = PTR_ERR(msg);
698 		goto out;
699 	}
700 	ceph_con_send(&session->s_con, msg);
701 
702 out:
703 	return err;
704 }
705 
706 /*
707  * session caps
708  */
709 
710 /*
711  * Free preallocated cap messages assigned to this session
712  */
713 static void cleanup_cap_releases(struct ceph_mds_session *session)
714 {
715 	struct ceph_msg *msg;
716 
717 	spin_lock(&session->s_cap_lock);
718 	while (!list_empty(&session->s_cap_releases)) {
719 		msg = list_first_entry(&session->s_cap_releases,
720 				       struct ceph_msg, list_head);
721 		list_del_init(&msg->list_head);
722 		ceph_msg_put(msg);
723 	}
724 	while (!list_empty(&session->s_cap_releases_done)) {
725 		msg = list_first_entry(&session->s_cap_releases_done,
726 				       struct ceph_msg, list_head);
727 		list_del_init(&msg->list_head);
728 		ceph_msg_put(msg);
729 	}
730 	spin_unlock(&session->s_cap_lock);
731 }
732 
733 /*
734  * Helper to safely iterate over all caps associated with a session.
 * s_cap_iterator pins our position in the cap list while s_cap_lock is
 * dropped for the callback; the previous inode and any removed cap are
 * released only after the lock is dropped, since iput() may block.
735  *
736  * caller must hold session s_mutex
737  */
738 static int iterate_session_caps(struct ceph_mds_session *session,
739 				 int (*cb)(struct inode *, struct ceph_cap *,
740 					    void *), void *arg)
741 {
742 	struct list_head *p;
743 	struct ceph_cap *cap;
744 	struct inode *inode, *last_inode = NULL;
745 	struct ceph_cap *old_cap = NULL;
746 	int ret;
747 
748 	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
749 	spin_lock(&session->s_cap_lock);
750 	p = session->s_caps.next;
751 	while (p != &session->s_caps) {
752 		cap = list_entry(p, struct ceph_cap, session_caps);
753 		inode = igrab(&cap->ci->vfs_inode);
754 		if (!inode) {
755 			p = p->next;
756 			continue;
757 		}
758 		session->s_cap_iterator = cap;
759 		spin_unlock(&session->s_cap_lock);
760 
761 		if (last_inode) {
762 			iput(last_inode);
763 			last_inode = NULL;
764 		}
765 		if (old_cap) {
766 			ceph_put_cap(old_cap);
767 			old_cap = NULL;
768 		}
769 
770 		ret = cb(inode, cap, arg);
771 		last_inode = inode;
772 
773 		spin_lock(&session->s_cap_lock);
774 		p = p->next;
775 		if (cap->ci == NULL) {
776 			dout("iterate_session_caps  finishing cap %p removal\n",
777 			     cap);
778 			BUG_ON(cap->session != session);
779 			list_del_init(&cap->session_caps);
780 			session->s_nr_caps--;
781 			cap->session = NULL;
782 			old_cap = cap;  /* put_cap it w/o locks held */
783 		}
784 		if (ret < 0)
785 			goto out;
786 	}
787 	ret = 0;
788 out:
789 	session->s_cap_iterator = NULL;
790 	spin_unlock(&session->s_cap_lock);
791 
792 	if (last_inode)
793 		iput(last_inode);
794 	if (old_cap)
795 		ceph_put_cap(old_cap);
796 
797 	return ret;
798 }
799 
800 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
801 				   void *arg)
802 {
803 	struct ceph_inode_info *ci = ceph_inode(inode);
804 	dout("removing cap %p, ci is %p, inode is %p\n",
805 	     cap, ci, &ci->vfs_inode);
806 	ceph_remove_cap(cap);
807 	return 0;
808 }
809 
810 /*
811  * caller must hold session s_mutex
812  */
813 static void remove_session_caps(struct ceph_mds_session *session)
814 {
815 	dout("remove_session_caps on %p\n", session);
816 	iterate_session_caps(session, remove_session_caps_cb, NULL);
817 	BUG_ON(session->s_nr_caps > 0);
818 	cleanup_cap_releases(session);
819 }
820 
821 /*
822  * wake up any threads waiting on this session's caps.  if the cap is
823  * old (didn't get renewed on the client reconnect), remove it now.
824  *
825  * caller must hold s_mutex.
826  */
827 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
828 			      void *arg)
829 {
830 	struct ceph_inode_info *ci = ceph_inode(inode);
831 
832 	wake_up(&ci->i_cap_wq);
833 	if (arg) {
834 		spin_lock(&inode->i_lock);
835 		ci->i_wanted_max_size = 0;
836 		ci->i_requested_max_size = 0;
837 		spin_unlock(&inode->i_lock);
838 	}
839 	return 0;
840 }
841 
842 static void wake_up_session_caps(struct ceph_mds_session *session,
843 				 int reconnect)
844 {
845 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
846 	iterate_session_caps(session, wake_up_session_cb,
847 			     (void *)(unsigned long)reconnect);
848 }
849 
850 /*
851  * Send periodic message to MDS renewing all currently held caps.  The
852  * ack will reset the expiration for all caps from this session.
853  *
854  * caller holds s_mutex
855  */
856 static int send_renew_caps(struct ceph_mds_client *mdsc,
857 			   struct ceph_mds_session *session)
858 {
859 	struct ceph_msg *msg;
860 	int state;
861 
862 	if (time_after_eq(jiffies, session->s_cap_ttl) &&
863 	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
864 		pr_info("mds%d caps stale\n", session->s_mds);
865 
866 	/* do not try to renew caps until a recovering mds has reconnected
867 	 * with its clients. */
868 	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
869 	if (state < CEPH_MDS_STATE_RECONNECT) {
870 		dout("send_renew_caps ignoring mds%d (%s)\n",
871 		     session->s_mds, ceph_mds_state_name(state));
872 		return 0;
873 	}
874 
875 	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
876 		ceph_mds_state_name(state));
877 	session->s_renew_requested = jiffies;
878 	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
879 				 ++session->s_renew_seq);
880 	if (IS_ERR(msg))
881 		return PTR_ERR(msg);
882 	ceph_con_send(&session->s_con, msg);
883 	return 0;
884 }
885 
886 /*
887  * Note the new cap ttl, and any transition from stale to fresh.
888  *
889  * Called under session->s_mutex
890  */
891 static void renewed_caps(struct ceph_mds_client *mdsc,
892 			 struct ceph_mds_session *session, int is_renew)
893 {
894 	int was_stale;
895 	int wake = 0;
896 
897 	spin_lock(&session->s_cap_lock);
898 	was_stale = is_renew && (session->s_cap_ttl == 0 ||
899 				 time_after_eq(jiffies, session->s_cap_ttl));
900 
901 	session->s_cap_ttl = session->s_renew_requested +
902 		mdsc->mdsmap->m_session_timeout*HZ;
903 
904 	if (was_stale) {
905 		if (time_before(jiffies, session->s_cap_ttl)) {
906 			pr_info("mds%d caps renewed\n", session->s_mds);
907 			wake = 1;
908 		} else {
909 			pr_info("mds%d caps still stale\n", session->s_mds);
910 		}
911 	}
912 	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
913 	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
914 	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
915 	spin_unlock(&session->s_cap_lock);
916 
917 	if (wake)
918 		wake_up_session_caps(session, 0);
919 }
920 
921 /*
922  * send a session close request
923  */
924 static int request_close_session(struct ceph_mds_client *mdsc,
925 				 struct ceph_mds_session *session)
926 {
927 	struct ceph_msg *msg;
928 	int err = 0;
929 
930 	dout("request_close_session mds%d state %s seq %lld\n",
931 	     session->s_mds, session_state_name(session->s_state),
932 	     session->s_seq);
933 	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
934 	if (IS_ERR(msg))
935 		err = PTR_ERR(msg);
936 	else
937 		ceph_con_send(&session->s_con, msg);
938 	return err;
939 }
940 
941 /*
942  * Called with s_mutex held.
943  */
944 static int __close_session(struct ceph_mds_client *mdsc,
945 			 struct ceph_mds_session *session)
946 {
947 	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
948 		return 0;
949 	session->s_state = CEPH_MDS_SESSION_CLOSING;
950 	return request_close_session(mdsc, session);
951 }
952 
953 /*
954  * Trim old(er) caps.
955  *
956  * Because we can't cache an inode without one or more caps, we do
957  * this indirectly: if a cap is unused, we prune its aliases, at which
958  * point the inode will hopefully get dropped too.
959  *
960  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
961  * memory pressure from the MDS, though, so it needn't be perfect.
962  */
963 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
964 {
965 	struct ceph_mds_session *session = arg;
966 	struct ceph_inode_info *ci = ceph_inode(inode);
967 	int used, oissued, mine;
968 
969 	if (session->s_trim_caps <= 0)
970 		return -1;
971 
972 	spin_lock(&inode->i_lock);
973 	mine = cap->issued | cap->implemented;
974 	used = __ceph_caps_used(ci);
975 	oissued = __ceph_caps_issued_other(ci, cap);
976 
977 	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
978 	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
979 	     ceph_cap_string(used));
980 	if (ci->i_dirty_caps)
981 		goto out;   /* dirty caps */
982 	if ((used & ~oissued) & mine)
983 		goto out;   /* we need these caps */
984 
985 	session->s_trim_caps--;
986 	if (oissued) {
987 		/* we aren't the only cap.. just remove us */
988 		__ceph_remove_cap(cap);
989 	} else {
990 		/* try to drop referring dentries */
991 		spin_unlock(&inode->i_lock);
992 		d_prune_aliases(inode);
993 		dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
994 		     inode, cap, atomic_read(&inode->i_count));
995 		return 0;
996 	}
997 
998 out:
999 	spin_unlock(&inode->i_lock);
1000 	return 0;
1001 }
1002 
1003 /*
1004  * Trim session cap count down to some max number.
1005  */
1006 static int trim_caps(struct ceph_mds_client *mdsc,
1007 		     struct ceph_mds_session *session,
1008 		     int max_caps)
1009 {
1010 	int trim_caps = session->s_nr_caps - max_caps;
1011 
1012 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
1013 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1014 	if (trim_caps > 0) {
1015 		session->s_trim_caps = trim_caps;
1016 		iterate_session_caps(session, trim_caps_cb, session);
1017 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1018 		     session->s_mds, session->s_nr_caps, max_caps,
1019 			trim_caps - session->s_trim_caps);
1020 		session->s_trim_caps = 0;
1021 	}
1022 	return 0;
1023 }
1024 
1025 /*
1026  * Allocate cap_release messages.  If there is a partially full message
1027  * in the queue, try to allocate enough to cover its remainder, so that
1028  * we can send it immediately.
1029  *
1030  * Called under s_mutex.
1031  */
1032 static int add_cap_releases(struct ceph_mds_client *mdsc,
1033 			    struct ceph_mds_session *session,
1034 			    int extra)
1035 {
1036 	struct ceph_msg *msg;
1037 	struct ceph_mds_cap_release *head;
1038 	int err = -ENOMEM;
1039 
1040 	if (extra < 0)
1041 		extra = mdsc->client->mount_args->cap_release_safety;
1042 
1043 	spin_lock(&session->s_cap_lock);
1044 
1045 	if (!list_empty(&session->s_cap_releases)) {
1046 		msg = list_first_entry(&session->s_cap_releases,
1047 				       struct ceph_msg,
1048 				 list_head);
1049 		head = msg->front.iov_base;
1050 		extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
1051 	}
1052 
1053 	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1054 		spin_unlock(&session->s_cap_lock);
1055 		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1056 				   0, 0, NULL);
1057 		if (!msg)
1058 			goto out_unlocked;
1059 		dout("add_cap_releases %p msg %p now %d\n", session, msg,
1060 		     (int)msg->front.iov_len);
1061 		head = msg->front.iov_base;
1062 		head->num = cpu_to_le32(0);
1063 		msg->front.iov_len = sizeof(*head);
1064 		spin_lock(&session->s_cap_lock);
1065 		list_add(&msg->list_head, &session->s_cap_releases);
1066 		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1067 	}
1068 
1069 	if (!list_empty(&session->s_cap_releases)) {
1070 		msg = list_first_entry(&session->s_cap_releases,
1071 				       struct ceph_msg,
1072 				       list_head);
1073 		head = msg->front.iov_base;
1074 		if (head->num) {
1075 			dout(" queueing non-full %p (%d)\n", msg,
1076 			     le32_to_cpu(head->num));
1077 			list_move_tail(&msg->list_head,
1078 				      &session->s_cap_releases_done);
1079 			session->s_num_cap_releases -=
1080 				CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
1081 		}
1082 	}
1083 	err = 0;
1084 	spin_unlock(&session->s_cap_lock);
1085 out_unlocked:
1086 	return err;
1087 }
1088 
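/*
 * In brief, the release-message lifecycle: add_cap_releases() (above)
 * keeps enough preallocated messages on s_cap_releases to cover every
 * cap the session holds; the cap-dropping code fills the current message
 * as caps are released, completed messages migrate to
 * s_cap_releases_done, and send_cap_releases() (below) pushes those to
 * the mds.
 */
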
1089 /*
1090  * flush all dirty inode data to disk.
1091  *
1092  * returns true if we've flushed through want_flush_seq
1093  */
1094 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1095 {
1096 	int mds, ret = 1;
1097 
1098 	dout("check_cap_flush want %lld\n", want_flush_seq);
1099 	mutex_lock(&mdsc->mutex);
1100 	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1101 		struct ceph_mds_session *session = mdsc->sessions[mds];
1102 
1103 		if (!session)
1104 			continue;
1105 		get_session(session);
1106 		mutex_unlock(&mdsc->mutex);
1107 
1108 		mutex_lock(&session->s_mutex);
1109 		if (!list_empty(&session->s_cap_flushing)) {
1110 			struct ceph_inode_info *ci =
1111 				list_entry(session->s_cap_flushing.next,
1112 					   struct ceph_inode_info,
1113 					   i_flushing_item);
1114 			struct inode *inode = &ci->vfs_inode;
1115 
1116 			spin_lock(&inode->i_lock);
1117 			if (ci->i_cap_flush_seq <= want_flush_seq) {
1118 				dout("check_cap_flush still flushing %p "
1119 				     "seq %lld <= %lld to mds%d\n", inode,
1120 				     ci->i_cap_flush_seq, want_flush_seq,
1121 				     session->s_mds);
1122 				ret = 0;
1123 			}
1124 			spin_unlock(&inode->i_lock);
1125 		}
1126 		mutex_unlock(&session->s_mutex);
1127 		ceph_put_mds_session(session);
1128 
1129 		if (!ret)
1130 			return ret;
1131 		mutex_lock(&mdsc->mutex);
1132 	}
1133 
1134 	mutex_unlock(&mdsc->mutex);
1135 	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1136 	return ret;
1137 }
1138 
1139 /*
1140  * called under s_mutex
1141  */
1142 static void send_cap_releases(struct ceph_mds_client *mdsc,
1143 		       struct ceph_mds_session *session)
1144 {
1145 	struct ceph_msg *msg;
1146 
1147 	dout("send_cap_releases mds%d\n", session->s_mds);
1148 	while (1) {
1149 		spin_lock(&session->s_cap_lock);
1150 		if (list_empty(&session->s_cap_releases_done))
1151 			break;
1152 		msg = list_first_entry(&session->s_cap_releases_done,
1153 				 struct ceph_msg, list_head);
1154 		list_del_init(&msg->list_head);
1155 		spin_unlock(&session->s_cap_lock);
1156 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1157 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1158 		ceph_con_send(&session->s_con, msg);
1159 	}
1160 	spin_unlock(&session->s_cap_lock);
1161 }
1162 
1163 /*
1164  * requests
1165  */
1166 
1167 /*
1168  * Create an mds request.
1169  */
1170 struct ceph_mds_request *
1171 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1172 {
1173 	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1174 
1175 	if (!req)
1176 		return ERR_PTR(-ENOMEM);
1177 
1178 	req->r_started = jiffies;
1179 	req->r_resend_mds = -1;
1180 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1181 	req->r_fmode = -1;
1182 	kref_init(&req->r_kref);
1183 	INIT_LIST_HEAD(&req->r_wait);
1184 	init_completion(&req->r_completion);
1185 	init_completion(&req->r_safe_completion);
1186 	INIT_LIST_HEAD(&req->r_unsafe_item);
1187 
1188 	req->r_op = op;
1189 	req->r_direct_mode = mode;
1190 	return req;
1191 }
1192 
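/*
 * Typical caller pattern, as a sketch (the real callers in dir.c and
 * file.c are authoritative):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP,
 *				       USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 */
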
1193 /*
1194  * return the oldest (lowest-tid) request in the request tree (NULL if
 * none), or, via __get_oldest_tid(), its tid (0 if none).
1195  *
1196  * called under mdsc->mutex.
1197  */
1198 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1199 {
1200 	if (RB_EMPTY_ROOT(&mdsc->request_tree))
1201 		return NULL;
1202 	return rb_entry(rb_first(&mdsc->request_tree),
1203 			struct ceph_mds_request, r_node);
1204 }
1205 
1206 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1207 {
1208 	struct ceph_mds_request *req = __get_oldest_req(mdsc);
1209 
1210 	if (req)
1211 		return req->r_tid;
1212 	return 0;
1213 }
1214 
1215 /*
1216  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1217  * on build_path_from_dentry in fs/cifs/dir.c.
1218  *
1219  * If @stop_on_nosnap, generate path relative to the first non-snapped
1220  * inode.
1221  *
1222  * Encode hidden .snap dirs as a double /, i.e.
1223  *   foo/.snap/bar -> foo//bar
1224  */
1225 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1226 			   int stop_on_nosnap)
1227 {
1228 	struct dentry *temp;
1229 	char *path;
1230 	int len, pos;
1231 
1232 	if (dentry == NULL)
1233 		return ERR_PTR(-EINVAL);
1234 
1235 retry:
1236 	len = 0;
1237 	for (temp = dentry; !IS_ROOT(temp);) {
1238 		struct inode *inode = temp->d_inode;
1239 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1240 			len++;  /* slash only */
1241 		else if (stop_on_nosnap && inode &&
1242 			 ceph_snap(inode) == CEPH_NOSNAP)
1243 			break;
1244 		else
1245 			len += 1 + temp->d_name.len;
1246 		temp = temp->d_parent;
1247 		if (temp == NULL) {
1248 			pr_err("build_path_dentry corrupt dentry %p\n", dentry);
1249 			return ERR_PTR(-EINVAL);
1250 		}
1251 	}
1252 	if (len)
1253 		len--;  /* no leading '/' */
1254 
1255 	path = kmalloc(len+1, GFP_NOFS);
1256 	if (path == NULL)
1257 		return ERR_PTR(-ENOMEM);
1258 	pos = len;
1259 	path[pos] = 0;	/* trailing null */
1260 	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1261 		struct inode *inode = temp->d_inode;
1262 
1263 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1264 			dout("build_path_dentry path+%d: %p SNAPDIR\n",
1265 			     pos, temp);
1266 		} else if (stop_on_nosnap && inode &&
1267 			   ceph_snap(inode) == CEPH_NOSNAP) {
1268 			break;
1269 		} else {
1270 			pos -= temp->d_name.len;
1271 			if (pos < 0)
1272 				break;
1273 			strncpy(path + pos, temp->d_name.name,
1274 				temp->d_name.len);
1275 			dout("build_path_dentry path+%d: %p '%.*s'\n",
1276 			     pos, temp, temp->d_name.len, path + pos);
1277 		}
1278 		if (pos)
1279 			path[--pos] = '/';
1280 		temp = temp->d_parent;
1281 		if (temp == NULL) {
1282 			pr_err("build_path_dentry corrupt dentry\n");
1283 			kfree(path);
1284 			return ERR_PTR(-EINVAL);
1285 		}
1286 	}
1287 	if (pos != 0) {
1288 		pr_err("build_path_dentry did not end path lookup where "
1289 		       "expected, namelen is %d, pos is %d\n", len, pos);
1290 		/* presumably this is only possible if racing with a
1291 		   rename of one of the parent directories (we can not
1292 		   lock the dentries above us to prevent this, but
1293 		   retrying should be harmless) */
1294 		kfree(path);
1295 		goto retry;
1296 	}
1297 
1298 	*base = ceph_ino(temp->d_inode);
1299 	*plen = len;
1300 	dout("build_path_dentry on %p %d built %llx '%.*s'\n",
1301 	     dentry, atomic_read(&dentry->d_count), *base, len, path);
1302 	return path;
1303 }
1304 
1305 static int build_dentry_path(struct dentry *dentry,
1306 			     const char **ppath, int *ppathlen, u64 *pino,
1307 			     int *pfreepath)
1308 {
1309 	char *path;
1310 
1311 	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1312 		*pino = ceph_ino(dentry->d_parent->d_inode);
1313 		*ppath = dentry->d_name.name;
1314 		*ppathlen = dentry->d_name.len;
1315 		return 0;
1316 	}
1317 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1318 	if (IS_ERR(path))
1319 		return PTR_ERR(path);
1320 	*ppath = path;
1321 	*pfreepath = 1;
1322 	return 0;
1323 }
1324 
1325 static int build_inode_path(struct inode *inode,
1326 			    const char **ppath, int *ppathlen, u64 *pino,
1327 			    int *pfreepath)
1328 {
1329 	struct dentry *dentry;
1330 	char *path;
1331 
1332 	if (ceph_snap(inode) == CEPH_NOSNAP) {
1333 		*pino = ceph_ino(inode);
1334 		*ppathlen = 0;
1335 		return 0;
1336 	}
1337 	dentry = d_find_alias(inode);
1338 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1339 	dput(dentry);
1340 	if (IS_ERR(path))
1341 		return PTR_ERR(path);
1342 	*ppath = path;
1343 	*pfreepath = 1;
1344 	return 0;
1345 }
1346 
1347 /*
1348  * request arguments may be specified via an inode *, a dentry *, or
1349  * an explicit ino+path.
1350  */
1351 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1352 				  const char *rpath, u64 rino,
1353 				  const char **ppath, int *pathlen,
1354 				  u64 *ino, int *freepath)
1355 {
1356 	int r = 0;
1357 
1358 	if (rinode) {
1359 		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1360 		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1361 		     ceph_snap(rinode));
1362 	} else if (rdentry) {
1363 		r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1364 		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1365 		     *ppath);
1366 	} else if (rpath) {
1367 		*ino = rino;
1368 		*ppath = rpath;
1369 		*pathlen = strlen(rpath);
1370 		dout(" path %.*s\n", *pathlen, rpath);
1371 	}
1372 
1373 	return r;
1374 }
1375 
1376 /*
1377  * called under mdsc->mutex
1378  */
1379 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1380 					       struct ceph_mds_request *req,
1381 					       int mds)
1382 {
1383 	struct ceph_msg *msg;
1384 	struct ceph_mds_request_head *head;
1385 	const char *path1 = NULL;
1386 	const char *path2 = NULL;
1387 	u64 ino1 = 0, ino2 = 0;
1388 	int pathlen1 = 0, pathlen2 = 0;
1389 	int freepath1 = 0, freepath2 = 0;
1390 	int len;
1391 	u16 releases;
1392 	void *p, *end;
1393 	int ret;
1394 
1395 	ret = set_request_path_attr(req->r_inode, req->r_dentry,
1396 			      req->r_path1, req->r_ino1.ino,
1397 			      &path1, &pathlen1, &ino1, &freepath1);
1398 	if (ret < 0) {
1399 		msg = ERR_PTR(ret);
1400 		goto out;
1401 	}
1402 
1403 	ret = set_request_path_attr(NULL, req->r_old_dentry,
1404 			      req->r_path2, req->r_ino2.ino,
1405 			      &path2, &pathlen2, &ino2, &freepath2);
1406 	if (ret < 0) {
1407 		msg = ERR_PTR(ret);
1408 		goto out_free1;
1409 	}
1410 
1411 	len = sizeof(*head) +
1412 		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1413 
1414 	/* calculate (max) length for cap releases */
1415 	len += sizeof(struct ceph_mds_request_release) *
1416 		(!!req->r_inode_drop + !!req->r_dentry_drop +
1417 		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1418 	if (req->r_dentry_drop)
1419 		len += req->r_dentry->d_name.len;
1420 	if (req->r_old_dentry_drop)
1421 		len += req->r_old_dentry->d_name.len;
1422 
1423 	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
1424 	if (IS_ERR(msg))
1425 		goto out_free2;
1426 
1427 	msg->hdr.tid = cpu_to_le64(req->r_tid);
1428 
1429 	head = msg->front.iov_base;
1430 	p = msg->front.iov_base + sizeof(*head);
1431 	end = msg->front.iov_base + msg->front.iov_len;
1432 
1433 	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1434 	head->op = cpu_to_le32(req->r_op);
1435 	head->caller_uid = cpu_to_le32(current_fsuid());
1436 	head->caller_gid = cpu_to_le32(current_fsgid());
1437 	head->args = req->r_args;
1438 
1439 	ceph_encode_filepath(&p, end, ino1, path1);
1440 	ceph_encode_filepath(&p, end, ino2, path2);
1441 
1442 	/* cap releases */
1443 	releases = 0;
1444 	if (req->r_inode_drop)
1445 		releases += ceph_encode_inode_release(&p,
1446 		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1447 		      mds, req->r_inode_drop, req->r_inode_unless, 0);
1448 	if (req->r_dentry_drop)
1449 		releases += ceph_encode_dentry_release(&p, req->r_dentry,
1450 		       mds, req->r_dentry_drop, req->r_dentry_unless);
1451 	if (req->r_old_dentry_drop)
1452 		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1453 		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1454 	if (req->r_old_inode_drop)
1455 		releases += ceph_encode_inode_release(&p,
1456 		      req->r_old_dentry->d_inode,
1457 		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1458 	head->num_releases = cpu_to_le16(releases);
1459 
1460 	BUG_ON(p > end);
1461 	msg->front.iov_len = p - msg->front.iov_base;
1462 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1463 
1464 	msg->pages = req->r_pages;
1465 	msg->nr_pages = req->r_num_pages;
1466 	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1467 	msg->hdr.data_off = cpu_to_le16(0);
1468 
1469 out_free2:
1470 	if (freepath2)
1471 		kfree((char *)path2);
1472 out_free1:
1473 	if (freepath1)
1474 		kfree((char *)path1);
1475 out:
1476 	return msg;
1477 }
1478 
1479 /*
1480  * called under mdsc->mutex if error, under no mutex if
1481  * success.
1482  */
1483 static void complete_request(struct ceph_mds_client *mdsc,
1484 			     struct ceph_mds_request *req)
1485 {
1486 	if (req->r_callback)
1487 		req->r_callback(mdsc, req);
1488 	else
1489 		complete(&req->r_completion);
1490 }
1491 
1492 /*
1493  * called under mdsc->mutex
1494  */
1495 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1496 				  struct ceph_mds_request *req,
1497 				  int mds)
1498 {
1499 	struct ceph_mds_request_head *rhead;
1500 	struct ceph_msg *msg;
1501 	int flags = 0;
1502 
1503 	req->r_mds = mds;
1504 	req->r_attempts++;
1505 	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1506 	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1507 
1508 	if (req->r_request) {
1509 		ceph_msg_put(req->r_request);
1510 		req->r_request = NULL;
1511 	}
1512 	msg = create_request_message(mdsc, req, mds);
1513 	if (IS_ERR(msg)) {
1514 		req->r_reply = ERR_PTR(PTR_ERR(msg));
1515 		complete_request(mdsc, req);
1516 		return PTR_ERR(msg);
1517 	}
1518 	req->r_request = msg;
1519 
1520 	rhead = msg->front.iov_base;
1521 	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1522 	if (req->r_got_unsafe)
1523 		flags |= CEPH_MDS_FLAG_REPLAY;
1524 	if (req->r_locked_dir)
1525 		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1526 	rhead->flags = cpu_to_le32(flags);
1527 	rhead->num_fwd = req->r_num_fwd;
1528 	rhead->num_retry = req->r_attempts - 1;
1529 
1530 	dout(" r_locked_dir = %p\n", req->r_locked_dir);
1531 
1532 	if (req->r_target_inode && req->r_got_unsafe)
1533 		rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1534 	else
1535 		rhead->ino = 0;
1536 	return 0;
1537 }
1538 
1539 /*
1540  * send request, or put it on the appropriate wait list.
1541  */
1542 static int __do_request(struct ceph_mds_client *mdsc,
1543 			struct ceph_mds_request *req)
1544 {
1545 	struct ceph_mds_session *session = NULL;
1546 	int mds = -1;
1547 	int err = -EAGAIN;
1548 
1549 	if (req->r_reply)
1550 		goto out;
1551 
1552 	if (req->r_timeout &&
1553 	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1554 		dout("do_request timed out\n");
1555 		err = -EIO;
1556 		goto finish;
1557 	}
1558 
1559 	mds = __choose_mds(mdsc, req);
1560 	if (mds < 0 ||
1561 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1562 		dout("do_request no mds or not active, waiting for map\n");
1563 		list_add(&req->r_wait, &mdsc->waiting_for_map);
1564 		goto out;
1565 	}
1566 
1567 	/* get, open session */
1568 	session = __ceph_lookup_mds_session(mdsc, mds);
1569 	if (!session) {
1570 		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
1571 	dout("do_request mds%d session %p state %s\n", mds, session,
1572 	     session_state_name(session->s_state));
1573 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1574 	    session->s_state != CEPH_MDS_SESSION_HUNG) {
1575 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
1576 		    session->s_state == CEPH_MDS_SESSION_CLOSING)
1577 			__open_session(mdsc, session);
1578 		list_add(&req->r_wait, &session->s_waiting);
1579 		goto out_session;
1580 	}
1581 
1582 	/* send request */
1583 	req->r_session = get_session(session);
1584 	req->r_resend_mds = -1;   /* forget any previous mds hint */
1585 
1586 	if (req->r_request_started == 0)   /* note request start time */
1587 		req->r_request_started = jiffies;
1588 
1589 	err = __prepare_send_request(mdsc, req, mds);
1590 	if (!err) {
1591 		ceph_msg_get(req->r_request);
1592 		ceph_con_send(&session->s_con, req->r_request);
1593 	}
1594 
1595 out_session:
1596 	ceph_put_mds_session(session);
1597 out:
1598 	return err;
1599 
1600 finish:
1601 	req->r_reply = ERR_PTR(err);
1602 	complete_request(mdsc, req);
1603 	goto out;
1604 }
1605 
1606 /*
1607  * called under mdsc->mutex
1608  */
1609 static void __wake_requests(struct ceph_mds_client *mdsc,
1610 			    struct list_head *head)
1611 {
1612 	struct ceph_mds_request *req, *nreq;
1613 
1614 	list_for_each_entry_safe(req, nreq, head, r_wait) {
1615 		list_del_init(&req->r_wait);
1616 		__do_request(mdsc, req);
1617 	}
1618 }
1619 
1620 /*
1621  * Wake up threads with requests pending for @mds, so that they can
1622  * resubmit their requests to a possibly different mds.  If @all is set,
1623  * wake up if their requests have been forwarded to @mds, too.
1624  */
1625 static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
1626 {
1627 	struct ceph_mds_request *req;
1628 	struct rb_node *p;
1629 
1630 	dout("kick_requests mds%d\n", mds);
1631 	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1632 		req = rb_entry(p, struct ceph_mds_request, r_node);
1633 		if (req->r_got_unsafe)
1634 			continue;
1635 		if (req->r_session &&
1636 		    req->r_session->s_mds == mds) {
1637 			dout(" kicking tid %llu\n", req->r_tid);
1638 			put_request_session(req);
1639 			__do_request(mdsc, req);
1640 		}
1641 	}
1642 }
1643 
1644 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1645 			      struct ceph_mds_request *req)
1646 {
1647 	dout("submit_request on %p\n", req);
1648 	mutex_lock(&mdsc->mutex);
1649 	__register_request(mdsc, req, NULL);
1650 	__do_request(mdsc, req);
1651 	mutex_unlock(&mdsc->mutex);
1652 }
1653 
1654 /*
1655  * Synchronously perform an mds request.  Take care of all of the
1656  * session setup, forwarding, retry details.
1657  */
1658 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1659 			 struct inode *dir,
1660 			 struct ceph_mds_request *req)
1661 {
1662 	int err;
1663 
1664 	dout("do_request on %p\n", req);
1665 
1666 	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1667 	if (req->r_inode)
1668 		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1669 	if (req->r_locked_dir)
1670 		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1671 	if (req->r_old_dentry)
1672 		ceph_get_cap_refs(
1673 			ceph_inode(req->r_old_dentry->d_parent->d_inode),
1674 			CEPH_CAP_PIN);
1675 
1676 	/* issue */
1677 	mutex_lock(&mdsc->mutex);
1678 	__register_request(mdsc, req, dir);
1679 	__do_request(mdsc, req);
1680 
1681 	/* wait */
1682 	if (!req->r_reply) {
1683 		mutex_unlock(&mdsc->mutex);
1684 		if (req->r_timeout) {
1685 			err = (long)wait_for_completion_interruptible_timeout(
1686 				&req->r_completion, req->r_timeout);
1687 			if (err == 0)
1688 				req->r_reply = ERR_PTR(-EIO);
1689 			else if (err < 0)
1690 				req->r_reply = ERR_PTR(err);
1691 		} else {
1692 			err = wait_for_completion_interruptible(
1693 				&req->r_completion);
1694 			if (err)
1695 				req->r_reply = ERR_PTR(err);
1696 		}
1697 		mutex_lock(&mdsc->mutex);
1698 	}
1699 
1700 	if (IS_ERR(req->r_reply)) {
1701 		err = PTR_ERR(req->r_reply);
1702 		req->r_reply = NULL;
1703 
1704 		if (err == -ERESTARTSYS) {
1705 			/* aborted */
1706 			req->r_aborted = true;
1707 
1708 			if (req->r_locked_dir &&
1709 			    (req->r_op & CEPH_MDS_OP_WRITE)) {
1710 				struct ceph_inode_info *ci =
1711 					ceph_inode(req->r_locked_dir);
1712 
1713 				dout("aborted, clearing I_COMPLETE on %p\n",
1714 				     req->r_locked_dir);
1715 				spin_lock(&req->r_locked_dir->i_lock);
1716 				ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1717 				ci->i_release_count++;
1718 				spin_unlock(&req->r_locked_dir->i_lock);
1719 			}
1720 		} else {
1721 			/* clean up this request */
1722 			__unregister_request(mdsc, req);
1723 			if (!list_empty(&req->r_unsafe_item))
1724 				list_del_init(&req->r_unsafe_item);
1725 			complete(&req->r_safe_completion);
1726 		}
1727 	} else if (req->r_err) {
1728 		err = req->r_err;
1729 	} else {
1730 		err = le32_to_cpu(req->r_reply_info.head->result);
1731 	}
1732 	mutex_unlock(&mdsc->mutex);
1733 
1734 	dout("do_request %p done, result %d\n", req, err);
1735 	return err;
1736 }
1737 
1738 /*
1739  * Handle mds reply.
1740  *
1741  * We take the session mutex and parse and process the reply immediately.
1742  * This preserves the logical ordering of replies, capabilities, etc., sent
1743  * by the MDS as they are applied to our local cache.
1744  */
1745 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1746 {
1747 	struct ceph_mds_client *mdsc = session->s_mdsc;
1748 	struct ceph_mds_request *req;
1749 	struct ceph_mds_reply_head *head = msg->front.iov_base;
1750 	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
1751 	u64 tid;
1752 	int err, result;
1753 	int mds = session->s_mds;
1754 
1755 	if (msg->front.iov_len < sizeof(*head)) {
1756 		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
1757 		ceph_msg_dump(msg);
1758 		return;
1759 	}
1760 
1761 	/* get request, session */
1762 	tid = le64_to_cpu(msg->hdr.tid);
1763 	mutex_lock(&mdsc->mutex);
1764 	req = __lookup_request(mdsc, tid);
1765 	if (!req) {
1766 		dout("handle_reply on unknown tid %llu\n", tid);
1767 		mutex_unlock(&mdsc->mutex);
1768 		return;
1769 	}
1770 	dout("handle_reply %p\n", req);
1771 
1772 	/* correct session? */
1773 	if (req->r_session != session) {
1774 		pr_err("mdsc_handle_reply got %llu on session mds%d"
1775 		       " not mds%d\n", tid, session->s_mds,
1776 		       req->r_session ? req->r_session->s_mds : -1);
1777 		mutex_unlock(&mdsc->mutex);
1778 		goto out;
1779 	}
1780 
1781 	/* dup? */
1782 	if ((req->r_got_unsafe && !head->safe) ||
1783 	    (req->r_got_safe && head->safe)) {
1784 		pr_warning("got a dup %s reply on %llu from mds%d\n",
1785 			   head->safe ? "safe" : "unsafe", tid, mds);
1786 		mutex_unlock(&mdsc->mutex);
1787 		goto out;
1788 	}
1789 
1790 	result = le32_to_cpu(head->result);
1791 
1792 	/*
1793 	 * Tolerate 2 consecutive ESTALEs from the same mds.
1794 	 * FIXME: we should be looking at the cap migrate_seq.
1795 	 */
1796 	if (result == -ESTALE) {
1797 		req->r_direct_mode = USE_AUTH_MDS;
1798 		req->r_num_stale++;
1799 		if (req->r_num_stale <= 2) {
1800 			__do_request(mdsc, req);
1801 			mutex_unlock(&mdsc->mutex);
1802 			goto out;
1803 		}
1804 	} else {
1805 		req->r_num_stale = 0;
1806 	}
1807 
1808 	if (head->safe) {
1809 		req->r_got_safe = true;
1810 		__unregister_request(mdsc, req);
1811 		complete(&req->r_safe_completion);
1812 
1813 		if (req->r_got_unsafe) {
1814 			/*
1815 			 * We already handled the unsafe response, now do the
1816 			 * cleanup.  No need to examine the response; the MDS
1817 			 * doesn't include any result info in the safe
1818 			 * response.  And even if it did, there is nothing
1819 			 * useful we could do with a revised return value.
1820 			 */
1821 			dout("got safe reply %llu, mds%d\n", tid, mds);
1822 			list_del_init(&req->r_unsafe_item);
1823 
1824 			/* last unsafe request during umount? */
1825 			if (mdsc->stopping && !__get_oldest_req(mdsc))
1826 				complete(&mdsc->safe_umount_waiters);
1827 			mutex_unlock(&mdsc->mutex);
1828 			goto out;
1829 		}
1830 	}
1831 
1832 	BUG_ON(req->r_reply);
1833 
1834 	if (!head->safe) {
1835 		req->r_got_unsafe = true;
1836 		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
1837 	}
1838 
1839 	dout("handle_reply tid %lld result %d\n", tid, result);
1840 	rinfo = &req->r_reply_info;
1841 	err = parse_reply_info(msg, rinfo);
1842 	mutex_unlock(&mdsc->mutex);
1843 
1844 	mutex_lock(&session->s_mutex);
1845 	if (err < 0) {
1846 		pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
1847 		ceph_msg_dump(msg);
1848 		goto out_err;
1849 	}
1850 
1851 	/* snap trace */
1852 	if (rinfo->snapblob_len) {
1853 		down_write(&mdsc->snap_rwsem);
1854 		ceph_update_snap_trace(mdsc, rinfo->snapblob,
1855 			       rinfo->snapblob + rinfo->snapblob_len,
1856 			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
1857 		downgrade_write(&mdsc->snap_rwsem);
1858 	} else {
1859 		down_read(&mdsc->snap_rwsem);
1860 	}
1861 
1862 	/* insert trace into our cache */
1863 	err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
1864 	if (err == 0) {
1865 		if (result == 0 && rinfo->dir_nr)
1866 			ceph_readdir_prepopulate(req, req->r_session);
1867 		ceph_unreserve_caps(&req->r_caps_reservation);
1868 	}
1869 
1870 	up_read(&mdsc->snap_rwsem);
1871 out_err:
1872 	if (err) {
1873 		req->r_err = err;
1874 	} else {
1875 		req->r_reply = msg;
1876 		ceph_msg_get(msg);
1877 	}
1878 
1879 	add_cap_releases(mdsc, req->r_session, -1);
1880 	mutex_unlock(&session->s_mutex);
1881 
1882 	/* kick calling process */
1883 	complete_request(mdsc, req);
1884 out:
1885 	ceph_mdsc_put_request(req);
1886 	return;
1887 }
1888 
1889 
1890 
1891 /*
1892  * handle mds notification that our request has been forwarded.
1893  */
1894 static void handle_forward(struct ceph_mds_client *mdsc,
1895 			   struct ceph_mds_session *session,
1896 			   struct ceph_msg *msg)
1897 {
1898 	struct ceph_mds_request *req;
1899 	u64 tid;
1900 	u32 next_mds;
1901 	u32 fwd_seq;
1902 	u8 must_resend;
1903 	int err = -EINVAL;
1904 	void *p = msg->front.iov_base;
1905 	void *end = p + msg->front.iov_len;
1907 
	ceph_decode_need(&p, end, sizeof(u64) + 2*sizeof(u32) + sizeof(u8), bad);
1909 	tid = ceph_decode_64(&p);
1910 	next_mds = ceph_decode_32(&p);
1911 	fwd_seq = ceph_decode_32(&p);
1912 	must_resend = ceph_decode_8(&p);
1913 
1914 	WARN_ON(must_resend);  /* shouldn't happen. */
1915 
1916 	mutex_lock(&mdsc->mutex);
1917 	req = __lookup_request(mdsc, tid);
1918 	if (!req) {
1919 		dout("forward %llu dne\n", tid);
1920 		goto out;  /* dup reply? */
1921 	}
1922 
	if (next_mds >= mdsc->max_sessions) {
		/* don't leak the ref taken by __lookup_request */
		ceph_mdsc_put_request(req);
		goto out;
	}

1927 	if (fwd_seq <= req->r_num_fwd) {
1928 		dout("forward %llu to mds%d - old seq %d <= %d\n",
1929 		     tid, next_mds, req->r_num_fwd, fwd_seq);
1930 	} else {
1931 		/* resend. forward race not possible; mds would drop */
1932 		dout("forward %llu to mds%d (we resend)\n", tid, next_mds);
1933 		req->r_num_fwd = fwd_seq;
1934 		req->r_resend_mds = next_mds;
1935 		put_request_session(req);
1936 		__do_request(mdsc, req);
1937 	}
1938 	ceph_mdsc_put_request(req);
1939 out:
1940 	mutex_unlock(&mdsc->mutex);
1941 	return;
1942 
1943 bad:
1944 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
1945 }
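
/*
 * A minimal sketch, not built, of the bounds-checked decode pattern
 * handle_forward() uses: ceph_decode_need() reserves the fixed-size
 * region (jumping to the bad label on a short message) before the
 * individual fields are pulled off the front.  The two-field layout
 * here is hypothetical.
 */
#if 0
static int example_decode(void **p, void *end, u64 *tid, u32 *seq)
{
	ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);
	*tid = ceph_decode_64(p);
	*seq = ceph_decode_32(p);
	return 0;
bad:
	return -EINVAL;
}
#endif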
1946 
1947 /*
1948  * handle a mds session control message
1949  */
1950 static void handle_session(struct ceph_mds_session *session,
1951 			   struct ceph_msg *msg)
1952 {
1953 	struct ceph_mds_client *mdsc = session->s_mdsc;
1954 	u32 op;
1955 	u64 seq;
1956 	int mds = session->s_mds;
1957 	struct ceph_mds_session_head *h = msg->front.iov_base;
1958 	int wake = 0;
1959 
1960 	/* decode */
1961 	if (msg->front.iov_len != sizeof(*h))
1962 		goto bad;
1963 	op = le32_to_cpu(h->op);
1964 	seq = le64_to_cpu(h->seq);
1965 
1966 	mutex_lock(&mdsc->mutex);
1967 	if (op == CEPH_SESSION_CLOSE)
1968 		__unregister_session(mdsc, session);
1969 	/* FIXME: this ttl calculation is generous */
1970 	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
1971 	mutex_unlock(&mdsc->mutex);
1972 
1973 	mutex_lock(&session->s_mutex);
1974 
1975 	dout("handle_session mds%d %s %p state %s seq %llu\n",
1976 	     mds, ceph_session_op_name(op), session,
1977 	     session_state_name(session->s_state), seq);
1978 
1979 	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
1980 		session->s_state = CEPH_MDS_SESSION_OPEN;
1981 		pr_info("mds%d came back\n", session->s_mds);
1982 	}
1983 
1984 	switch (op) {
1985 	case CEPH_SESSION_OPEN:
1986 		session->s_state = CEPH_MDS_SESSION_OPEN;
1987 		renewed_caps(mdsc, session, 0);
1988 		wake = 1;
1989 		if (mdsc->stopping)
1990 			__close_session(mdsc, session);
1991 		break;
1992 
1993 	case CEPH_SESSION_RENEWCAPS:
1994 		if (session->s_renew_seq == seq)
1995 			renewed_caps(mdsc, session, 1);
1996 		break;
1997 
1998 	case CEPH_SESSION_CLOSE:
1999 		remove_session_caps(session);
2000 		wake = 1; /* for good measure */
2001 		complete(&mdsc->session_close_waiters);
2002 		kick_requests(mdsc, mds, 0);      /* cur only */
2003 		break;
2004 
2005 	case CEPH_SESSION_STALE:
2006 		pr_info("mds%d caps went stale, renewing\n",
2007 			session->s_mds);
2008 		spin_lock(&session->s_cap_lock);
2009 		session->s_cap_gen++;
2010 		session->s_cap_ttl = 0;
2011 		spin_unlock(&session->s_cap_lock);
2012 		send_renew_caps(mdsc, session);
2013 		break;
2014 
2015 	case CEPH_SESSION_RECALL_STATE:
2016 		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2017 		break;
2018 
2019 	default:
2020 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2021 		WARN_ON(1);
2022 	}
2023 
2024 	mutex_unlock(&session->s_mutex);
2025 	if (wake) {
2026 		mutex_lock(&mdsc->mutex);
2027 		__wake_requests(mdsc, &session->s_waiting);
2028 		mutex_unlock(&mdsc->mutex);
2029 	}
2030 	return;
2031 
2032 bad:
2033 	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2034 	       (int)msg->front.iov_len);
2035 	ceph_msg_dump(msg);
2036 	return;
2037 }
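
/*
 * In short, the session ops handled above are: OPEN acks our session
 * open (and is immediately closed again if we are unmounting),
 * RENEWCAPS acks a cap renewal we requested, CLOSE tears down the
 * session's caps and wakes waiters, STALE means the MDS timed out our
 * caps and we must renew, and RECALL_STATE asks us to trim our cap
 * cache down to h->max_caps.
 */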
2038 
2039 
2040 /*
 * called under session->s_mutex.
2042  */
2043 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2044 				   struct ceph_mds_session *session)
2045 {
2046 	struct ceph_mds_request *req, *nreq;
2047 	int err;
2048 
2049 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
2050 
2051 	mutex_lock(&mdsc->mutex);
2052 	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2053 		err = __prepare_send_request(mdsc, req, session->s_mds);
2054 		if (!err) {
2055 			ceph_msg_get(req->r_request);
2056 			ceph_con_send(&session->s_con, req->r_request);
2057 		}
2058 	}
2059 	mutex_unlock(&mdsc->mutex);
2060 }
2061 
2062 /*
2063  * Encode information about a cap for a reconnect with the MDS.
2064  */
2065 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2066 			  void *arg)
2067 {
2068 	struct ceph_mds_cap_reconnect rec;
2069 	struct ceph_inode_info *ci;
2070 	struct ceph_pagelist *pagelist = arg;
2071 	char *path;
2072 	int pathlen, err;
2073 	u64 pathbase;
2074 	struct dentry *dentry;
2075 
2076 	ci = cap->ci;
2077 
2078 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2079 	     inode, ceph_vinop(inode), cap, cap->cap_id,
2080 	     ceph_cap_string(cap->issued));
2081 	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2082 	if (err)
2083 		return err;
2084 
2085 	dentry = d_find_alias(inode);
2086 	if (dentry) {
2087 		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2088 		if (IS_ERR(path)) {
2089 			err = PTR_ERR(path);
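			/* a path build failure here is unexpected; die loudly */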
2090 			BUG_ON(err);
2091 		}
2092 	} else {
2093 		path = NULL;
2094 		pathlen = 0;
2095 	}
2096 	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2097 	if (err)
2098 		goto out;
2099 
2100 	spin_lock(&inode->i_lock);
2101 	cap->seq = 0;        /* reset cap seq */
2102 	cap->issue_seq = 0;  /* and issue_seq */
2103 	rec.cap_id = cpu_to_le64(cap->cap_id);
2104 	rec.pathbase = cpu_to_le64(pathbase);
2105 	rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2106 	rec.issued = cpu_to_le32(cap->issued);
2107 	rec.size = cpu_to_le64(inode->i_size);
2108 	ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
2109 	ceph_encode_timespec(&rec.atime, &inode->i_atime);
2110 	rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2111 	spin_unlock(&inode->i_lock);
2112 
2113 	err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
2114 
2115 out:
2116 	kfree(path);
2117 	dput(dentry);
2118 	return err;
2119 }
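
/*
 * For reference, each cap record appended by encode_caps_cb() lays
 * out on the wire as
 *
 *	u64	ino
 *	string	path		(u32 length + bytes, rooted at pathbase)
 *	struct	ceph_mds_cap_reconnect rec
 *
 * with all integer fields little-endian.
 */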
2120 
2121 
2122 /*
2123  * If an MDS fails and recovers, clients need to reconnect in order to
2124  * reestablish shared state.  This includes all caps issued through
2125  * this session _and_ the snap_realm hierarchy.  Because it's not
2126  * clear which snap realms the mds cares about, we send everything we
 * know about; that ensures we'll then get any new info the
2128  * recovering MDS might have.
2129  *
2130  * This is a relatively heavyweight operation, but it's rare.
2131  *
2132  * called with mdsc->mutex held.
2133  */
2134 static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
2135 {
2136 	struct ceph_mds_session *session = NULL;
2137 	struct ceph_msg *reply;
2138 	struct rb_node *p;
2139 	int err;
2140 	struct ceph_pagelist *pagelist;
2141 
2142 	pr_info("reconnect to recovering mds%d\n", mds);
2143 
2144 	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2145 	if (!pagelist)
2146 		goto fail_nopagelist;
2147 	ceph_pagelist_init(pagelist);
2148 
2149 	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
2150 	if (IS_ERR(reply)) {
2151 		err = PTR_ERR(reply);
2152 		goto fail_nomsg;
2153 	}
2154 
2155 	/* find session */
2156 	session = __ceph_lookup_mds_session(mdsc, mds);
2157 	mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
2158 
2159 	if (session) {
2160 		mutex_lock(&session->s_mutex);
2161 
2162 		session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2163 		session->s_seq = 0;
2164 
2165 		ceph_con_open(&session->s_con,
2166 			      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2167 
2168 		/* replay unsafe requests */
2169 		replay_unsafe_requests(mdsc, session);
2170 	} else {
2171 		dout("no session for mds%d, will send short reconnect\n",
2172 		     mds);
2173 	}
2174 
2175 	down_read(&mdsc->snap_rwsem);
2176 
2177 	if (!session)
2178 		goto send;
2179 	dout("session %p state %s\n", session,
2180 	     session_state_name(session->s_state));
2181 
2182 	/* traverse this session's caps */
2183 	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2184 	if (err)
2185 		goto fail;
2186 	err = iterate_session_caps(session, encode_caps_cb, pagelist);
2187 	if (err < 0)
		goto fail;
2189 
2190 	/*
2191 	 * snaprealms.  we provide mds with the ino, seq (version), and
2192 	 * parent for all of our realms.  If the mds has any newer info,
2193 	 * it will tell us.
2194 	 */
2195 	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2196 		struct ceph_snap_realm *realm =
2197 			rb_entry(p, struct ceph_snap_realm, node);
2198 		struct ceph_mds_snaprealm_reconnect sr_rec;
2199 
2200 		dout(" adding snap realm %llx seq %lld parent %llx\n",
2201 		     realm->ino, realm->seq, realm->parent_ino);
2202 		sr_rec.ino = cpu_to_le64(realm->ino);
2203 		sr_rec.seq = cpu_to_le64(realm->seq);
2204 		sr_rec.parent = cpu_to_le64(realm->parent_ino);
2205 		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2206 		if (err)
2207 			goto fail;
2208 	}
2209 
send:
	reply->pagelist = pagelist;
	reply->hdr.data_len = cpu_to_le32(pagelist->length);
	reply->nr_pages = calc_pages_for(0, pagelist->length);
	if (session) {
		ceph_con_send(&session->s_con, reply);
		session->s_state = CEPH_MDS_SESSION_OPEN;
	} else {
		/* no session means no connection to send on; drop the
		 * message rather than dereferencing a NULL session */
		ceph_msg_put(reply);
	}

out:
	up_read(&mdsc->snap_rwsem);
	if (session)
		mutex_unlock(&session->s_mutex);
	mutex_lock(&mdsc->mutex);    /* our caller expects this held */
	if (session) {
		/* __wake_requests needs mdsc->mutex, which we now hold */
		__wake_requests(mdsc, &session->s_waiting);
		ceph_put_mds_session(session);
	}
	return;
2229 
fail:
	/*
	 * late failure: we hold snap_rwsem, session->s_mutex, and a
	 * session ref; release them all, then retake mdsc->mutex for
	 * our caller.
	 */
	ceph_msg_put(reply);
	up_read(&mdsc->snap_rwsem);
	if (session) {
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
	}
	ceph_pagelist_release(pagelist);
	kfree(pagelist);
	pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
	mutex_lock(&mdsc->mutex);
	return;

fail_nomsg:
	ceph_pagelist_release(pagelist);
	kfree(pagelist);
fail_nopagelist:
	/* early failure: mdsc->mutex is still held, nothing else is */
	pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
2238 }
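
/*
 * The reconnect payload assembled above is thus
 *
 *	u32	nr_caps
 *	nr_caps	cap records	(see encode_caps_cb())
 *	one	ceph_mds_snaprealm_reconnect per known realm
 *
 * and rides in the message's pagelist rather than its front,
 * presumably because the cap count can be large.
 */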
2239 
2240 
2241 /*
2242  * compare old and new mdsmaps, kicking requests
2243  * and closing out old connections as necessary
2244  *
2245  * called under mdsc->mutex.
2246  */
2247 static void check_new_map(struct ceph_mds_client *mdsc,
2248 			  struct ceph_mdsmap *newmap,
2249 			  struct ceph_mdsmap *oldmap)
2250 {
2251 	int i;
2252 	int oldstate, newstate;
2253 	struct ceph_mds_session *s;
2254 
2255 	dout("check_new_map new %u old %u\n",
2256 	     newmap->m_epoch, oldmap->m_epoch);
2257 
2258 	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2259 		if (mdsc->sessions[i] == NULL)
2260 			continue;
2261 		s = mdsc->sessions[i];
2262 		oldstate = ceph_mdsmap_get_state(oldmap, i);
2263 		newstate = ceph_mdsmap_get_state(newmap, i);
2264 
2265 		dout("check_new_map mds%d state %s -> %s (session %s)\n",
2266 		     i, ceph_mds_state_name(oldstate),
2267 		     ceph_mds_state_name(newstate),
2268 		     session_state_name(s->s_state));
2269 
2270 		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2271 			   ceph_mdsmap_get_addr(newmap, i),
2272 			   sizeof(struct ceph_entity_addr))) {
2273 			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2274 				/* the session never opened, just close it
2275 				 * out now */
2276 				__wake_requests(mdsc, &s->s_waiting);
2277 				__unregister_session(mdsc, s);
2278 			} else {
2279 				/* just close it */
2280 				mutex_unlock(&mdsc->mutex);
2281 				mutex_lock(&s->s_mutex);
2282 				mutex_lock(&mdsc->mutex);
2283 				ceph_con_close(&s->s_con);
2284 				mutex_unlock(&s->s_mutex);
2285 				s->s_state = CEPH_MDS_SESSION_RESTARTING;
2286 			}
2287 
2288 			/* kick any requests waiting on the recovering mds */
2289 			kick_requests(mdsc, i, 1);
2290 		} else if (oldstate == newstate) {
2291 			continue;  /* nothing new with this mds */
2292 		}
2293 
2294 		/*
2295 		 * send reconnect?
2296 		 */
2297 		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2298 		    newstate >= CEPH_MDS_STATE_RECONNECT)
2299 			send_mds_reconnect(mdsc, i);
2300 
2301 		/*
2302 		 * kick requests on any mds that has gone active.
2303 		 *
2304 		 * kick requests on cur or forwarder: we may have sent
2305 		 * the request to mds1, mds1 told us it forwarded it
2306 		 * to mds2, but then we learn mds1 failed and can't be
2307 		 * sure it successfully forwarded our request before
2308 		 * it died.
2309 		 */
2310 		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2311 		    newstate >= CEPH_MDS_STATE_ACTIVE) {
2312 			pr_info("mds%d reconnect completed\n", s->s_mds);
2313 			kick_requests(mdsc, i, 1);
2314 			ceph_kick_flushing_caps(mdsc, s);
2315 			wake_up_session_caps(s, 1);
2316 		}
2317 	}
2318 }
2319 
2320 
2321 
2322 /*
2323  * leases
2324  */
2325 
2326 /*
2327  * caller must hold session s_mutex, dentry->d_lock
2328  */
2329 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2330 {
2331 	struct ceph_dentry_info *di = ceph_dentry(dentry);
2332 
2333 	ceph_put_mds_session(di->lease_session);
2334 	di->lease_session = NULL;
2335 }
2336 
2337 static void handle_lease(struct ceph_mds_client *mdsc,
2338 			 struct ceph_mds_session *session,
2339 			 struct ceph_msg *msg)
2340 {
2341 	struct super_block *sb = mdsc->client->sb;
2342 	struct inode *inode;
2343 	struct ceph_inode_info *ci;
2344 	struct dentry *parent, *dentry;
2345 	struct ceph_dentry_info *di;
2346 	int mds = session->s_mds;
2347 	struct ceph_mds_lease *h = msg->front.iov_base;
2348 	struct ceph_vino vino;
2349 	int mask;
2350 	struct qstr dname;
2351 	int release = 0;
2352 
2353 	dout("handle_lease from mds%d\n", mds);
2354 
2355 	/* decode */
2356 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2357 		goto bad;
2358 	vino.ino = le64_to_cpu(h->ino);
2359 	vino.snap = CEPH_NOSNAP;
2360 	mask = le16_to_cpu(h->mask);
2361 	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2362 	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2363 	if (dname.len != get_unaligned_le32(h+1))
2364 		goto bad;
2365 
2366 	mutex_lock(&session->s_mutex);
2367 	session->s_seq++;
2368 
2369 	/* lookup inode */
2370 	inode = ceph_find_inode(sb, vino);
2371 	dout("handle_lease '%s', mask %d, ino %llx %p\n",
2372 	     ceph_lease_op_name(h->action), mask, vino.ino, inode);
2373 	if (inode == NULL) {
2374 		dout("handle_lease no inode %llx\n", vino.ino);
2375 		goto release;
2376 	}
2377 	ci = ceph_inode(inode);
2378 
2379 	/* dentry */
2380 	parent = d_find_alias(inode);
2381 	if (!parent) {
2382 		dout("no parent dentry on inode %p\n", inode);
2383 		WARN_ON(1);
2384 		goto release;  /* hrm... */
2385 	}
2386 	dname.hash = full_name_hash(dname.name, dname.len);
2387 	dentry = d_lookup(parent, &dname);
2388 	dput(parent);
2389 	if (!dentry)
2390 		goto release;
2391 
2392 	spin_lock(&dentry->d_lock);
2393 	di = ceph_dentry(dentry);
2394 	switch (h->action) {
2395 	case CEPH_MDS_LEASE_REVOKE:
2396 		if (di && di->lease_session == session) {
2397 			h->seq = cpu_to_le32(di->lease_seq);
2398 			__ceph_mdsc_drop_dentry_lease(dentry);
2399 		}
2400 		release = 1;
2401 		break;
2402 
2403 	case CEPH_MDS_LEASE_RENEW:
2404 		if (di && di->lease_session == session &&
2405 		    di->lease_gen == session->s_cap_gen &&
2406 		    di->lease_renew_from &&
2407 		    di->lease_renew_after == 0) {
2408 			unsigned long duration =
2409 				le32_to_cpu(h->duration_ms) * HZ / 1000;
2410 
2411 			di->lease_seq = le32_to_cpu(h->seq);
2412 			dentry->d_time = di->lease_renew_from + duration;
2413 			di->lease_renew_after = di->lease_renew_from +
2414 				(duration >> 1);
2415 			di->lease_renew_from = 0;
2416 		}
2417 		break;
2418 	}
2419 	spin_unlock(&dentry->d_lock);
2420 	dput(dentry);
2421 
2422 	if (!release)
2423 		goto out;
2424 
2425 release:
2426 	/* let's just reuse the same message */
2427 	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2428 	ceph_msg_get(msg);
2429 	ceph_con_send(&session->s_con, msg);
2430 
2431 out:
2432 	iput(inode);
2433 	mutex_unlock(&session->s_mutex);
2434 	return;
2435 
2436 bad:
2437 	pr_err("corrupt lease message\n");
2438 	ceph_msg_dump(msg);
2439 }
2440 
2441 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2442 			      struct inode *inode,
2443 			      struct dentry *dentry, char action,
2444 			      u32 seq)
2445 {
2446 	struct ceph_msg *msg;
2447 	struct ceph_mds_lease *lease;
2448 	int len = sizeof(*lease) + sizeof(u32);
2449 	int dnamelen = 0;
2450 
2451 	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2452 	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
2453 	dnamelen = dentry->d_name.len;
2454 	len += dnamelen;
2455 
2456 	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
2457 	if (IS_ERR(msg))
2458 		return;
2459 	lease = msg->front.iov_base;
2460 	lease->action = action;
2461 	lease->mask = cpu_to_le16(CEPH_LOCK_DN);
2462 	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2463 	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2464 	lease->seq = cpu_to_le32(seq);
2465 	put_unaligned_le32(dnamelen, lease + 1);
2466 	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2467 
2468 	/*
2469 	 * if this is a preemptive lease RELEASE, no need to
2470 	 * flush request stream, since the actual request will
2471 	 * soon follow.
2472 	 */
2473 	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2474 
2475 	ceph_con_send(&session->s_con, msg);
2476 }
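
/*
 * The lease message built above is a struct ceph_mds_lease followed
 * by a u32 name length and the dentry name bytes; handle_lease()
 * decodes the same layout on the way in.
 */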
2477 
2478 /*
2479  * Preemptively release a lease we expect to invalidate anyway.
 * Both @inode and @dentry are required.
2481  */
2482 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2483 			     struct dentry *dentry, int mask)
2484 {
2485 	struct ceph_dentry_info *di;
2486 	struct ceph_mds_session *session;
2487 	u32 seq;
2488 
2489 	BUG_ON(inode == NULL);
2490 	BUG_ON(dentry == NULL);
2491 	BUG_ON(mask != CEPH_LOCK_DN);
2492 
2493 	/* is dentry lease valid? */
2494 	spin_lock(&dentry->d_lock);
2495 	di = ceph_dentry(dentry);
2496 	if (!di || !di->lease_session ||
2497 	    di->lease_session->s_mds < 0 ||
2498 	    di->lease_gen != di->lease_session->s_cap_gen ||
2499 	    !time_before(jiffies, dentry->d_time)) {
2500 		dout("lease_release inode %p dentry %p -- "
2501 		     "no lease on %d\n",
2502 		     inode, dentry, mask);
2503 		spin_unlock(&dentry->d_lock);
2504 		return;
2505 	}
2506 
2507 	/* we do have a lease on this dentry; note mds and seq */
2508 	session = ceph_get_mds_session(di->lease_session);
2509 	seq = di->lease_seq;
2510 	__ceph_mdsc_drop_dentry_lease(dentry);
2511 	spin_unlock(&dentry->d_lock);
2512 
2513 	dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2514 	     inode, dentry, mask, session->s_mds);
2515 	ceph_mdsc_lease_send_msg(session, inode, dentry,
2516 				 CEPH_MDS_LEASE_RELEASE, seq);
2517 	ceph_put_mds_session(session);
2518 }
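
/*
 * Usage sketch (hypothetical call site): an unlink or rename path can
 * preemptively give up the dentry lease before issuing the request
 * that would invalidate it anyway, e.g.
 *
 *	ceph_mdsc_lease_release(mdsc, dir, dentry, CEPH_LOCK_DN);
 */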
2519 
2520 /*
2521  * drop all leases (and dentry refs) in preparation for umount
2522  */
2523 static void drop_leases(struct ceph_mds_client *mdsc)
2524 {
2525 	int i;
2526 
2527 	dout("drop_leases\n");
2528 	mutex_lock(&mdsc->mutex);
2529 	for (i = 0; i < mdsc->max_sessions; i++) {
2530 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2531 		if (!s)
2532 			continue;
2533 		mutex_unlock(&mdsc->mutex);
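		/*
		 * cycling s_mutex acts as a barrier: anyone currently
		 * holding the session mutex finishes before we move on
		 */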
2534 		mutex_lock(&s->s_mutex);
2535 		mutex_unlock(&s->s_mutex);
2536 		ceph_put_mds_session(s);
2537 		mutex_lock(&mdsc->mutex);
2538 	}
2539 	mutex_unlock(&mdsc->mutex);
2540 }
2541 
2542 
2543 
2544 /*
2545  * delayed work -- periodically trim expired leases, renew caps with mds
2546  */
2547 static void schedule_delayed(struct ceph_mds_client *mdsc)
2548 {
2549 	int delay = 5;
2550 	unsigned hz = round_jiffies_relative(HZ * delay);
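
	/* round to a whole jiffy so this timer can batch with others */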
2551 	schedule_delayed_work(&mdsc->delayed_work, hz);
2552 }
2553 
2554 static void delayed_work(struct work_struct *work)
2555 {
2556 	int i;
2557 	struct ceph_mds_client *mdsc =
2558 		container_of(work, struct ceph_mds_client, delayed_work.work);
2559 	int renew_interval;
2560 	int renew_caps;
2561 
2562 	dout("mdsc delayed_work\n");
2563 	ceph_check_delayed_caps(mdsc);
2564 
2565 	mutex_lock(&mdsc->mutex);
2566 	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2567 	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2568 				   mdsc->last_renew_caps);
2569 	if (renew_caps)
2570 		mdsc->last_renew_caps = jiffies;
2571 
2572 	for (i = 0; i < mdsc->max_sessions; i++) {
2573 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2574 		if (s == NULL)
2575 			continue;
2576 		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2577 			dout("resending session close request for mds%d\n",
2578 			     s->s_mds);
2579 			request_close_session(mdsc, s);
2580 			ceph_put_mds_session(s);
2581 			continue;
2582 		}
2583 		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2584 			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2585 				s->s_state = CEPH_MDS_SESSION_HUNG;
2586 				pr_info("mds%d hung\n", s->s_mds);
2587 			}
2588 		}
2589 		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2590 			/* this mds is failed or recovering, just wait */
2591 			ceph_put_mds_session(s);
2592 			continue;
2593 		}
2594 		mutex_unlock(&mdsc->mutex);
2595 
2596 		mutex_lock(&s->s_mutex);
2597 		if (renew_caps)
2598 			send_renew_caps(mdsc, s);
2599 		else
2600 			ceph_con_keepalive(&s->s_con);
2601 		add_cap_releases(mdsc, s, -1);
2602 		send_cap_releases(mdsc, s);
2603 		mutex_unlock(&s->s_mutex);
2604 		ceph_put_mds_session(s);
2605 
2606 		mutex_lock(&mdsc->mutex);
2607 	}
2608 	mutex_unlock(&mdsc->mutex);
2609 
2610 	schedule_delayed(mdsc);
2611 }
2612 
2613 
2614 int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
2615 {
2616 	mdsc->client = client;
2617 	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (mdsc->mdsmap == NULL)
		return -ENOMEM;
2619 	init_completion(&mdsc->safe_umount_waiters);
2620 	init_completion(&mdsc->session_close_waiters);
2621 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
2622 	mdsc->sessions = NULL;
2623 	mdsc->max_sessions = 0;
2624 	mdsc->stopping = 0;
2625 	init_rwsem(&mdsc->snap_rwsem);
2626 	mdsc->snap_realms = RB_ROOT;
2627 	INIT_LIST_HEAD(&mdsc->snap_empty);
2628 	spin_lock_init(&mdsc->snap_empty_lock);
2629 	mdsc->last_tid = 0;
2630 	mdsc->request_tree = RB_ROOT;
2631 	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2632 	mdsc->last_renew_caps = jiffies;
2633 	INIT_LIST_HEAD(&mdsc->cap_delay_list);
2634 	spin_lock_init(&mdsc->cap_delay_lock);
2635 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
2636 	spin_lock_init(&mdsc->snap_flush_lock);
2637 	mdsc->cap_flush_seq = 0;
2638 	INIT_LIST_HEAD(&mdsc->cap_dirty);
2639 	mdsc->num_cap_flushing = 0;
2640 	spin_lock_init(&mdsc->cap_dirty_lock);
2641 	init_waitqueue_head(&mdsc->cap_flushing_wq);
2642 	spin_lock_init(&mdsc->dentry_lru_lock);
2643 	INIT_LIST_HEAD(&mdsc->dentry_lru);
2644 	return 0;
2645 }
2646 
2647 /*
2648  * Wait for safe replies on open mds requests.  If we time out, drop
2649  * all requests from the tree to avoid dangling dentry refs.
2650  */
2651 static void wait_requests(struct ceph_mds_client *mdsc)
2652 {
2653 	struct ceph_mds_request *req;
2654 	struct ceph_client *client = mdsc->client;
2655 
2656 	mutex_lock(&mdsc->mutex);
2657 	if (__get_oldest_req(mdsc)) {
2658 		mutex_unlock(&mdsc->mutex);
2659 
2660 		dout("wait_requests waiting for requests\n");
2661 		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2662 				    client->mount_args->mount_timeout * HZ);
2663 
2664 		/* tear down remaining requests */
2665 		mutex_lock(&mdsc->mutex);
2666 		while ((req = __get_oldest_req(mdsc))) {
2667 			dout("wait_requests timed out on tid %llu\n",
2668 			     req->r_tid);
2669 			__unregister_request(mdsc, req);
2670 		}
2671 	}
2672 	mutex_unlock(&mdsc->mutex);
2673 	dout("wait_requests done\n");
2674 }
2675 
2676 /*
2677  * called before mount is ro, and before dentries are torn down.
2678  * (hmm, does this still race with new lookups?)
2679  */
2680 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
2681 {
2682 	dout("pre_umount\n");
2683 	mdsc->stopping = 1;
2684 
2685 	drop_leases(mdsc);
2686 	ceph_flush_dirty_caps(mdsc);
2687 	wait_requests(mdsc);
2688 }
2689 
2690 /*
2691  * wait for all write mds requests to flush.
2692  */
2693 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
2694 {
2695 	struct ceph_mds_request *req = NULL;
2696 	struct rb_node *n;
2697 
2698 	mutex_lock(&mdsc->mutex);
2699 	dout("wait_unsafe_requests want %lld\n", want_tid);
2700 	req = __get_oldest_req(mdsc);
2701 	while (req && req->r_tid <= want_tid) {
2702 		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
2703 			/* write op */
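			/* take a ref so req survives dropping mdsc->mutex */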
2704 			ceph_mdsc_get_request(req);
2705 			mutex_unlock(&mdsc->mutex);
2706 			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
2707 			     req->r_tid, want_tid);
2708 			wait_for_completion(&req->r_safe_completion);
2709 			mutex_lock(&mdsc->mutex);
2710 			n = rb_next(&req->r_node);
2711 			ceph_mdsc_put_request(req);
2712 		} else {
2713 			n = rb_next(&req->r_node);
2714 		}
2715 		if (!n)
2716 			break;
2717 		req = rb_entry(n, struct ceph_mds_request, r_node);
2718 	}
2719 	mutex_unlock(&mdsc->mutex);
2720 	dout("wait_unsafe_requests done\n");
2721 }
2722 
2723 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
2724 {
2725 	u64 want_tid, want_flush;
2726 
2727 	dout("sync\n");
2728 	mutex_lock(&mdsc->mutex);
2729 	want_tid = mdsc->last_tid;
2730 	want_flush = mdsc->cap_flush_seq;
2731 	mutex_unlock(&mdsc->mutex);
2732 	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
2733 
2734 	ceph_flush_dirty_caps(mdsc);
2735 
2736 	wait_unsafe_requests(mdsc, want_tid);
2737 	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
2738 }
2739 
2740 
2741 /*
2742  * called after sb is ro.
2743  */
2744 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
2745 {
2746 	struct ceph_mds_session *session;
2747 	int i;
2748 	int n;
2749 	struct ceph_client *client = mdsc->client;
2750 	unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
2751 
2752 	dout("close_sessions\n");
2753 
2754 	mutex_lock(&mdsc->mutex);
2755 
2756 	/* close sessions */
2757 	started = jiffies;
2758 	while (time_before(jiffies, started + timeout)) {
2759 		dout("closing sessions\n");
2760 		n = 0;
2761 		for (i = 0; i < mdsc->max_sessions; i++) {
2762 			session = __ceph_lookup_mds_session(mdsc, i);
2763 			if (!session)
2764 				continue;
2765 			mutex_unlock(&mdsc->mutex);
2766 			mutex_lock(&session->s_mutex);
2767 			__close_session(mdsc, session);
2768 			mutex_unlock(&session->s_mutex);
2769 			ceph_put_mds_session(session);
2770 			mutex_lock(&mdsc->mutex);
2771 			n++;
2772 		}
2773 		if (n == 0)
2774 			break;
2775 
2776 		if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
2777 			break;
2778 
2779 		dout("waiting for sessions to close\n");
2780 		mutex_unlock(&mdsc->mutex);
2781 		wait_for_completion_timeout(&mdsc->session_close_waiters,
2782 					    timeout);
2783 		mutex_lock(&mdsc->mutex);
2784 	}
2785 
2786 	/* tear down remaining sessions */
2787 	for (i = 0; i < mdsc->max_sessions; i++) {
2788 		if (mdsc->sessions[i]) {
2789 			session = get_session(mdsc->sessions[i]);
2790 			__unregister_session(mdsc, session);
2791 			mutex_unlock(&mdsc->mutex);
2792 			mutex_lock(&session->s_mutex);
2793 			remove_session_caps(session);
2794 			mutex_unlock(&session->s_mutex);
2795 			ceph_put_mds_session(session);
2796 			mutex_lock(&mdsc->mutex);
2797 		}
2798 	}
2799 
2800 	WARN_ON(!list_empty(&mdsc->cap_delay_list));
2801 
2802 	mutex_unlock(&mdsc->mutex);
2803 
2804 	ceph_cleanup_empty_realms(mdsc);
2805 
2806 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2807 
2808 	dout("stopped\n");
2809 }
2810 
2811 void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
2812 {
2813 	dout("stop\n");
2814 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
2815 	if (mdsc->mdsmap)
2816 		ceph_mdsmap_destroy(mdsc->mdsmap);
2817 	kfree(mdsc->sessions);
2818 }
2819 
2820 
2821 /*
2822  * handle mds map update.
2823  */
2824 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
2825 {
2826 	u32 epoch;
2827 	u32 maplen;
2828 	void *p = msg->front.iov_base;
2829 	void *end = p + msg->front.iov_len;
2830 	struct ceph_mdsmap *newmap, *oldmap;
2831 	struct ceph_fsid fsid;
2832 	int err = -EINVAL;
2833 
2834 	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
2835 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
2836 	if (ceph_check_fsid(mdsc->client, &fsid) < 0)
2837 		return;
2838 	epoch = ceph_decode_32(&p);
2839 	maplen = ceph_decode_32(&p);
2840 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
2841 
2842 	/* do we need it? */
2843 	ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
2844 	mutex_lock(&mdsc->mutex);
2845 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
2846 		dout("handle_map epoch %u <= our %u\n",
2847 		     epoch, mdsc->mdsmap->m_epoch);
2848 		mutex_unlock(&mdsc->mutex);
2849 		return;
2850 	}
2851 
2852 	newmap = ceph_mdsmap_decode(&p, end);
2853 	if (IS_ERR(newmap)) {
2854 		err = PTR_ERR(newmap);
2855 		goto bad_unlock;
2856 	}
2857 
2858 	/* swap into place */
2859 	if (mdsc->mdsmap) {
2860 		oldmap = mdsc->mdsmap;
2861 		mdsc->mdsmap = newmap;
2862 		check_new_map(mdsc, newmap, oldmap);
2863 		ceph_mdsmap_destroy(oldmap);
2864 	} else {
2865 		mdsc->mdsmap = newmap;  /* first mds map */
2866 	}
2867 	mdsc->client->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
2868 
2869 	__wake_requests(mdsc, &mdsc->waiting_for_map);
2870 
2871 	mutex_unlock(&mdsc->mutex);
2872 	schedule_delayed(mdsc);
2873 	return;
2874 
2875 bad_unlock:
2876 	mutex_unlock(&mdsc->mutex);
2877 bad:
2878 	pr_err("error decoding mdsmap %d\n", err);
2879 	return;
2880 }
2881 
2882 static struct ceph_connection *con_get(struct ceph_connection *con)
2883 {
2884 	struct ceph_mds_session *s = con->private;
2885 
2886 	if (get_session(s)) {
2887 		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
2888 		return con;
2889 	}
2890 	dout("mdsc con_get %p FAIL\n", s);
2891 	return NULL;
2892 }
2893 
2894 static void con_put(struct ceph_connection *con)
2895 {
2896 	struct ceph_mds_session *s = con->private;
2897 
2898 	ceph_put_mds_session(s);
2899 	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
2900 }
2901 
2902 /*
2903  * if the client is unresponsive for long enough, the mds will kill
2904  * the session entirely.
2905  */
2906 static void peer_reset(struct ceph_connection *con)
2907 {
2908 	struct ceph_mds_session *s = con->private;
2909 
2910 	pr_err("mds%d gave us the boot.  IMPLEMENT RECONNECT.\n",
2911 	       s->s_mds);
2912 }
2913 
2914 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2915 {
2916 	struct ceph_mds_session *s = con->private;
2917 	struct ceph_mds_client *mdsc = s->s_mdsc;
2918 	int type = le16_to_cpu(msg->hdr.type);
2919 
2920 	mutex_lock(&mdsc->mutex);
2921 	if (__verify_registered_session(mdsc, s) < 0) {
2922 		mutex_unlock(&mdsc->mutex);
2923 		goto out;
2924 	}
2925 	mutex_unlock(&mdsc->mutex);
2926 
2927 	switch (type) {
2928 	case CEPH_MSG_MDS_MAP:
2929 		ceph_mdsc_handle_map(mdsc, msg);
2930 		break;
2931 	case CEPH_MSG_CLIENT_SESSION:
2932 		handle_session(s, msg);
2933 		break;
2934 	case CEPH_MSG_CLIENT_REPLY:
2935 		handle_reply(s, msg);
2936 		break;
2937 	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
2938 		handle_forward(mdsc, s, msg);
2939 		break;
2940 	case CEPH_MSG_CLIENT_CAPS:
2941 		ceph_handle_caps(s, msg);
2942 		break;
2943 	case CEPH_MSG_CLIENT_SNAP:
2944 		ceph_handle_snap(mdsc, s, msg);
2945 		break;
2946 	case CEPH_MSG_CLIENT_LEASE:
2947 		handle_lease(mdsc, s, msg);
2948 		break;
2949 
2950 	default:
2951 		pr_err("received unknown message type %d %s\n", type,
2952 		       ceph_msg_type_name(type));
2953 	}
2954 out:
2955 	ceph_msg_put(msg);
2956 }
2957 
2958 /*
2959  * authentication
2960  */
2961 static int get_authorizer(struct ceph_connection *con,
2962 			  void **buf, int *len, int *proto,
2963 			  void **reply_buf, int *reply_len, int force_new)
2964 {
2965 	struct ceph_mds_session *s = con->private;
2966 	struct ceph_mds_client *mdsc = s->s_mdsc;
2967 	struct ceph_auth_client *ac = mdsc->client->monc.auth;
2968 	int ret = 0;
2969 
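	/* a previously-rejected authorizer is dropped and rebuilt below */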
2970 	if (force_new && s->s_authorizer) {
2971 		ac->ops->destroy_authorizer(ac, s->s_authorizer);
2972 		s->s_authorizer = NULL;
2973 	}
2974 	if (s->s_authorizer == NULL) {
2975 		if (ac->ops->create_authorizer) {
2976 			ret = ac->ops->create_authorizer(
2977 				ac, CEPH_ENTITY_TYPE_MDS,
2978 				&s->s_authorizer,
2979 				&s->s_authorizer_buf,
2980 				&s->s_authorizer_buf_len,
2981 				&s->s_authorizer_reply_buf,
2982 				&s->s_authorizer_reply_buf_len);
2983 			if (ret)
2984 				return ret;
2985 		}
2986 	}
2987 
2988 	*proto = ac->protocol;
2989 	*buf = s->s_authorizer_buf;
2990 	*len = s->s_authorizer_buf_len;
2991 	*reply_buf = s->s_authorizer_reply_buf;
2992 	*reply_len = s->s_authorizer_reply_buf_len;
2993 	return 0;
2994 }
2995 
2996 
2997 static int verify_authorizer_reply(struct ceph_connection *con, int len)
2998 {
2999 	struct ceph_mds_session *s = con->private;
3000 	struct ceph_mds_client *mdsc = s->s_mdsc;
3001 	struct ceph_auth_client *ac = mdsc->client->monc.auth;
3002 
3003 	return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3004 }
3005 
3006 static int invalidate_authorizer(struct ceph_connection *con)
3007 {
3008 	struct ceph_mds_session *s = con->private;
3009 	struct ceph_mds_client *mdsc = s->s_mdsc;
3010 	struct ceph_auth_client *ac = mdsc->client->monc.auth;
3011 
3012 	if (ac->ops->invalidate_authorizer)
3013 		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3014 
3015 	return ceph_monc_validate_auth(&mdsc->client->monc);
3016 }
3017 
static const struct ceph_connection_operations mds_con_ops = {
3019 	.get = con_get,
3020 	.put = con_put,
3021 	.dispatch = dispatch,
3022 	.get_authorizer = get_authorizer,
3023 	.verify_authorizer_reply = verify_authorizer_reply,
3024 	.invalidate_authorizer = invalidate_authorizer,
3025 	.peer_reset = peer_reset,
3026 };
3027 
3028 
3029 
3030 
3031 /* eof */
3032