xref: /linux/fs/ceph/mds_client.c (revision a4f174dee4ae842e07cab7eeec194a3e60925c8d)
1 #include <linux/ceph/ceph_debug.h>
2 
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/gfp.h>
7 #include <linux/sched.h>
8 #include <linux/debugfs.h>
9 #include <linux/seq_file.h>
10 #include <linux/utsname.h>
11 
12 #include "super.h"
13 #include "mds_client.h"
14 
15 #include <linux/ceph/ceph_features.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/pagelist.h>
19 #include <linux/ceph/auth.h>
20 #include <linux/ceph/debugfs.h>
21 
22 /*
23  * A cluster of MDS (metadata server) daemons is responsible for
24  * managing the file system namespace (the directory hierarchy and
25  * inodes) and for coordinating shared access to storage.  Metadata is
26  * partitioned hierarchically across a number of servers, and that
27  * partition varies over time as the cluster adjusts the distribution
28  * in order to balance load.
29  *
30  * The MDS client is primarily responsible for managing synchronous
31  * metadata requests for operations like open, unlink, and so forth.
32  * If there is an MDS failure, we find out about it when we (possibly
33  * request and) receive a new MDS map, and can resubmit affected
34  * requests.
35  *
36  * For the most part, though, we take advantage of a lossless
37  * communications channel to the MDS, and do not need to worry about
38  * timing out or resubmitting requests.
39  *
40  * We maintain a stateful "session" with each MDS we interact with.
41  * Within each session, we send periodic heartbeat messages to ensure
42  * any capabilities or leases we have been issued remain valid.  If
43  * the session times out and goes stale, our leases and capabilities
44  * are no longer valid.
45  */
46 
47 struct ceph_reconnect_state {
48 	int nr_caps;
49 	struct ceph_pagelist *pagelist;
50 	bool flock;
51 };
52 
53 static void __wake_requests(struct ceph_mds_client *mdsc,
54 			    struct list_head *head);
55 
56 static const struct ceph_connection_operations mds_con_ops;
57 
58 
59 /*
60  * mds reply parsing
61  */
62 
63 /*
64  * parse individual inode info
65  */
66 static int parse_reply_info_in(void **p, void *end,
67 			       struct ceph_mds_reply_info_in *info,
68 			       u64 features)
69 {
70 	int err = -EIO;
71 
72 	info->in = *p;
73 	*p += sizeof(struct ceph_mds_reply_inode) +
74 		sizeof(*info->in->fragtree.splits) *
75 		le32_to_cpu(info->in->fragtree.nsplits);
76 
77 	ceph_decode_32_safe(p, end, info->symlink_len, bad);
78 	ceph_decode_need(p, end, info->symlink_len, bad);
79 	info->symlink = *p;
80 	*p += info->symlink_len;
81 
82 	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
83 		ceph_decode_copy_safe(p, end, &info->dir_layout,
84 				      sizeof(info->dir_layout), bad);
85 	else
86 		memset(&info->dir_layout, 0, sizeof(info->dir_layout));
87 
88 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
89 	ceph_decode_need(p, end, info->xattr_len, bad);
90 	info->xattr_data = *p;
91 	*p += info->xattr_len;
92 	return 0;
93 bad:
94 	return err;
95 }
96 
97 /*
98  * parse a normal reply, which may contain a (dir+)dentry and/or a
99  * target inode.
100  */
101 static int parse_reply_info_trace(void **p, void *end,
102 				  struct ceph_mds_reply_info_parsed *info,
103 				  u64 features)
104 {
105 	int err;
106 
107 	if (info->head->is_dentry) {
108 		err = parse_reply_info_in(p, end, &info->diri, features);
109 		if (err < 0)
110 			goto out_bad;
111 
112 		if (unlikely(*p + sizeof(*info->dirfrag) > end))
113 			goto bad;
114 		info->dirfrag = *p;
115 		*p += sizeof(*info->dirfrag) +
116 			sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
117 		if (unlikely(*p > end))
118 			goto bad;
119 
120 		ceph_decode_32_safe(p, end, info->dname_len, bad);
121 		ceph_decode_need(p, end, info->dname_len, bad);
122 		info->dname = *p;
123 		*p += info->dname_len;
124 		info->dlease = *p;
125 		*p += sizeof(*info->dlease);
126 	}
127 
128 	if (info->head->is_target) {
129 		err = parse_reply_info_in(p, end, &info->targeti, features);
130 		if (err < 0)
131 			goto out_bad;
132 	}
133 
134 	if (unlikely(*p != end))
135 		goto bad;
136 	return 0;
137 
138 bad:
139 	err = -EIO;
140 out_bad:
141 	pr_err("problem parsing mds trace %d\n", err);
142 	return err;
143 }
144 
145 /*
146  * parse readdir results
147  */
148 static int parse_reply_info_dir(void **p, void *end,
149 				struct ceph_mds_reply_info_parsed *info,
150 				u64 features)
151 {
152 	u32 num, i = 0;
153 	int err;
154 
155 	info->dir_dir = *p;
156 	if (*p + sizeof(*info->dir_dir) > end)
157 		goto bad;
158 	*p += sizeof(*info->dir_dir) +
159 		sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
160 	if (*p > end)
161 		goto bad;
162 
163 	ceph_decode_need(p, end, sizeof(num) + 2, bad);
164 	num = ceph_decode_32(p);
165 	info->dir_end = ceph_decode_8(p);
166 	info->dir_complete = ceph_decode_8(p);
167 	if (num == 0)
168 		goto done;
169 
170 	BUG_ON(!info->dir_in);
171 	info->dir_dname = (void *)(info->dir_in + num);
172 	info->dir_dname_len = (void *)(info->dir_dname + num);
173 	info->dir_dlease = (void *)(info->dir_dname_len + num);
174 	if ((unsigned long)(info->dir_dlease + num) >
175 	    (unsigned long)info->dir_in + info->dir_buf_size) {
176 		pr_err("dir contents are larger than expected\n");
177 		WARN_ON(1);
178 		goto bad;
179 	}
180 
181 	info->dir_nr = num;
182 	while (num) {
183 		/* dentry */
184 		ceph_decode_need(p, end, sizeof(u32)*2, bad);
185 		info->dir_dname_len[i] = ceph_decode_32(p);
186 		ceph_decode_need(p, end, info->dir_dname_len[i], bad);
187 		info->dir_dname[i] = *p;
188 		*p += info->dir_dname_len[i];
189 		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
190 		     info->dir_dname[i]);
191 		info->dir_dlease[i] = *p;
192 		*p += sizeof(struct ceph_mds_reply_lease);
193 
194 		/* inode */
195 		err = parse_reply_info_in(p, end, &info->dir_in[i], features);
196 		if (err < 0)
197 			goto out_bad;
198 		i++;
199 		num--;
200 	}
201 
202 done:
203 	if (*p != end)
204 		goto bad;
205 	return 0;
206 
207 bad:
208 	err = -EIO;
209 out_bad:
210 	pr_err("problem parsing dir contents %d\n", err);
211 	return err;
212 }
213 
214 /*
215  * parse fcntl F_GETLK results
216  */
217 static int parse_reply_info_filelock(void **p, void *end,
218 				     struct ceph_mds_reply_info_parsed *info,
219 				     u64 features)
220 {
221 	if (*p + sizeof(*info->filelock_reply) > end)
222 		goto bad;
223 
224 	info->filelock_reply = *p;
225 	*p += sizeof(*info->filelock_reply);
226 
227 	if (unlikely(*p != end))
228 		goto bad;
229 	return 0;
230 
231 bad:
232 	return -EIO;
233 }
234 
235 /*
236  * parse create results
237  */
238 static int parse_reply_info_create(void **p, void *end,
239 				  struct ceph_mds_reply_info_parsed *info,
240 				  u64 features)
241 {
242 	if (features & CEPH_FEATURE_REPLY_CREATE_INODE) {
243 		if (*p == end) {
244 			info->has_create_ino = false;
245 		} else {
246 			info->has_create_ino = true;
247 			info->ino = ceph_decode_64(p);
248 		}
249 	}
250 
251 	if (unlikely(*p != end))
252 		goto bad;
253 	return 0;
254 
255 bad:
256 	return -EIO;
257 }
258 
259 /*
260  * parse extra results
261  */
262 static int parse_reply_info_extra(void **p, void *end,
263 				  struct ceph_mds_reply_info_parsed *info,
264 				  u64 features)
265 {
266 	if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
267 		return parse_reply_info_filelock(p, end, info, features);
268 	else if (info->head->op == CEPH_MDS_OP_READDIR ||
269 		 info->head->op == CEPH_MDS_OP_LSSNAP)
270 		return parse_reply_info_dir(p, end, info, features);
271 	else if (info->head->op == CEPH_MDS_OP_CREATE)
272 		return parse_reply_info_create(p, end, info, features);
273 	else
274 		return -EIO;
275 }
276 
277 /*
278  * parse entire mds reply
279  */
280 static int parse_reply_info(struct ceph_msg *msg,
281 			    struct ceph_mds_reply_info_parsed *info,
282 			    u64 features)
283 {
284 	void *p, *end;
285 	u32 len;
286 	int err;
287 
288 	info->head = msg->front.iov_base;
289 	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
290 	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
291 
292 	/* trace */
293 	ceph_decode_32_safe(&p, end, len, bad);
294 	if (len > 0) {
295 		ceph_decode_need(&p, end, len, bad);
296 		err = parse_reply_info_trace(&p, p+len, info, features);
297 		if (err < 0)
298 			goto out_bad;
299 	}
300 
301 	/* extra */
302 	ceph_decode_32_safe(&p, end, len, bad);
303 	if (len > 0) {
304 		ceph_decode_need(&p, end, len, bad);
305 		err = parse_reply_info_extra(&p, p+len, info, features);
306 		if (err < 0)
307 			goto out_bad;
308 	}
309 
310 	/* snap blob */
311 	ceph_decode_32_safe(&p, end, len, bad);
312 	info->snapblob_len = len;
313 	info->snapblob = p;
314 	p += len;
315 
316 	if (p != end)
317 		goto bad;
318 	return 0;
319 
320 bad:
321 	err = -EIO;
322 out_bad:
323 	pr_err("mds parse_reply err %d\n", err);
324 	return err;
325 }
326 
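/* free the readdir reply buffer (dir_in), if one was allocated */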
327 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
328 {
329 	if (!info->dir_in)
330 		return;
331 	free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size));
332 }
333 
334 
335 /*
336  * sessions
337  */
338 const char *ceph_session_state_name(int s)
339 {
340 	switch (s) {
341 	case CEPH_MDS_SESSION_NEW: return "new";
342 	case CEPH_MDS_SESSION_OPENING: return "opening";
343 	case CEPH_MDS_SESSION_OPEN: return "open";
344 	case CEPH_MDS_SESSION_HUNG: return "hung";
345 	case CEPH_MDS_SESSION_CLOSING: return "closing";
346 	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
347 	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
348 	default: return "???";
349 	}
350 }
351 
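/*
 * take a session reference, unless the session is already being
 * torn down (s_ref has hit zero)
 */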
352 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
353 {
354 	if (atomic_inc_not_zero(&s->s_ref)) {
355 		dout("mdsc get_session %p %d -> %d\n", s,
356 		     atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
357 		return s;
358 	} else {
359 		dout("mdsc get_session %p 0 -- FAIL", s);
360 		return NULL;
361 	}
362 }
363 
364 void ceph_put_mds_session(struct ceph_mds_session *s)
365 {
366 	dout("mdsc put_session %p %d -> %d\n", s,
367 	     atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
368 	if (atomic_dec_and_test(&s->s_ref)) {
369 		if (s->s_auth.authorizer)
370 			ceph_auth_destroy_authorizer(
371 				s->s_mdsc->fsc->client->monc.auth,
372 				s->s_auth.authorizer);
373 		kfree(s);
374 	}
375 }
376 
377 /*
378  * called under mdsc->mutex
379  */
380 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
381 						   int mds)
382 {
383 	struct ceph_mds_session *session;
384 
385 	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
386 		return NULL;
387 	session = mdsc->sessions[mds];
388 	dout("lookup_mds_session %p %d\n", session,
389 	     atomic_read(&session->s_ref));
390 	get_session(session);
391 	return session;
392 }
393 
394 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
395 {
396 	if (mds >= mdsc->max_sessions)
397 		return false;
398 	return mdsc->sessions[mds];
399 }
400 
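/* check that the session is still registered in mdsc->sessions[] */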
401 static int __verify_registered_session(struct ceph_mds_client *mdsc,
402 				       struct ceph_mds_session *s)
403 {
404 	if (s->s_mds >= mdsc->max_sessions ||
405 	    mdsc->sessions[s->s_mds] != s)
406 		return -ENOENT;
407 	return 0;
408 }
409 
410 /*
411  * create+register a new session for given mds.
412  * called under mdsc->mutex.
413  */
414 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
415 						 int mds)
416 {
417 	struct ceph_mds_session *s;
418 
419 	if (mds >= mdsc->mdsmap->m_max_mds)
420 		return ERR_PTR(-EINVAL);
421 
422 	s = kzalloc(sizeof(*s), GFP_NOFS);
423 	if (!s)
424 		return ERR_PTR(-ENOMEM);
425 	s->s_mdsc = mdsc;
426 	s->s_mds = mds;
427 	s->s_state = CEPH_MDS_SESSION_NEW;
428 	s->s_ttl = 0;
429 	s->s_seq = 0;
430 	mutex_init(&s->s_mutex);
431 
432 	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
433 
434 	spin_lock_init(&s->s_gen_ttl_lock);
435 	s->s_cap_gen = 0;
436 	s->s_cap_ttl = jiffies - 1;
437 
438 	spin_lock_init(&s->s_cap_lock);
439 	s->s_renew_requested = 0;
440 	s->s_renew_seq = 0;
441 	INIT_LIST_HEAD(&s->s_caps);
442 	s->s_nr_caps = 0;
443 	s->s_trim_caps = 0;
444 	atomic_set(&s->s_ref, 1);
445 	INIT_LIST_HEAD(&s->s_waiting);
446 	INIT_LIST_HEAD(&s->s_unsafe);
447 	s->s_num_cap_releases = 0;
448 	s->s_cap_reconnect = 0;
449 	s->s_cap_iterator = NULL;
450 	INIT_LIST_HEAD(&s->s_cap_releases);
451 	INIT_LIST_HEAD(&s->s_cap_releases_done);
452 	INIT_LIST_HEAD(&s->s_cap_flushing);
453 	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
454 
455 	dout("register_session mds%d\n", mds);
456 	if (mds >= mdsc->max_sessions) {
457 		int newmax = 1 << get_count_order(mds+1);
458 		struct ceph_mds_session **sa;
459 
460 		dout("register_session realloc to %d\n", newmax);
461 		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
462 		if (sa == NULL)
463 			goto fail_realloc;
464 		if (mdsc->sessions) {
465 			memcpy(sa, mdsc->sessions,
466 			       mdsc->max_sessions * sizeof(void *));
467 			kfree(mdsc->sessions);
468 		}
469 		mdsc->sessions = sa;
470 		mdsc->max_sessions = newmax;
471 	}
472 	mdsc->sessions[mds] = s;
473 	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
474 
475 	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
476 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
477 
478 	return s;
479 
480 fail_realloc:
481 	kfree(s);
482 	return ERR_PTR(-ENOMEM);
483 }
484 
485 /*
486  * called under mdsc->mutex
487  */
488 static void __unregister_session(struct ceph_mds_client *mdsc,
489 			       struct ceph_mds_session *s)
490 {
491 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
492 	BUG_ON(mdsc->sessions[s->s_mds] != s);
493 	mdsc->sessions[s->s_mds] = NULL;
494 	ceph_con_close(&s->s_con);
495 	ceph_put_mds_session(s);
496 }
497 
498 /*
499  * drop session refs in request.
500  *
501  * should be last request ref, or hold mdsc->mutex
502  */
503 static void put_request_session(struct ceph_mds_request *req)
504 {
505 	if (req->r_session) {
506 		ceph_put_mds_session(req->r_session);
507 		req->r_session = NULL;
508 	}
509 }
510 
511 void ceph_mdsc_release_request(struct kref *kref)
512 {
513 	struct ceph_mds_request *req = container_of(kref,
514 						    struct ceph_mds_request,
515 						    r_kref);
516 	destroy_reply_info(&req->r_reply_info);
517 	if (req->r_request)
518 		ceph_msg_put(req->r_request);
519 	if (req->r_reply)
520 		ceph_msg_put(req->r_reply);
521 	if (req->r_inode) {
522 		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
523 		iput(req->r_inode);
524 	}
525 	if (req->r_locked_dir)
526 		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
527 	if (req->r_target_inode)
528 		iput(req->r_target_inode);
529 	if (req->r_dentry)
530 		dput(req->r_dentry);
531 	if (req->r_old_dentry)
532 		dput(req->r_old_dentry);
533 	if (req->r_old_dentry_dir) {
534 		/*
535 		 * track (and drop pins for) r_old_dentry_dir
536 		 * separately, since r_old_dentry's d_parent may have
537 		 * changed between the dir mutex being dropped and
538 		 * this request being freed.
539 		 */
540 		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
541 				  CEPH_CAP_PIN);
542 		iput(req->r_old_dentry_dir);
543 	}
544 	kfree(req->r_path1);
545 	kfree(req->r_path2);
546 	if (req->r_pagelist)
547 		ceph_pagelist_release(req->r_pagelist);
548 	put_request_session(req);
549 	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
550 	kfree(req);
551 }
552 
553 /*
554  * lookup session, bump ref if found.
555  *
556  * called under mdsc->mutex.
557  */
558 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
559 					     u64 tid)
560 {
561 	struct ceph_mds_request *req;
562 	struct rb_node *n = mdsc->request_tree.rb_node;
563 
564 	while (n) {
565 		req = rb_entry(n, struct ceph_mds_request, r_node);
566 		if (tid < req->r_tid)
567 			n = n->rb_left;
568 		else if (tid > req->r_tid)
569 			n = n->rb_right;
570 		else {
571 			ceph_mdsc_get_request(req);
572 			return req;
573 		}
574 	}
575 	return NULL;
576 }
577 
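/* insert a request into the tid-ordered rbtree of in-flight requests */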
578 static void __insert_request(struct ceph_mds_client *mdsc,
579 			     struct ceph_mds_request *new)
580 {
581 	struct rb_node **p = &mdsc->request_tree.rb_node;
582 	struct rb_node *parent = NULL;
583 	struct ceph_mds_request *req = NULL;
584 
585 	while (*p) {
586 		parent = *p;
587 		req = rb_entry(parent, struct ceph_mds_request, r_node);
588 		if (new->r_tid < req->r_tid)
589 			p = &(*p)->rb_left;
590 		else if (new->r_tid > req->r_tid)
591 			p = &(*p)->rb_right;
592 		else
593 			BUG();
594 	}
595 
596 	rb_link_node(&new->r_node, parent, p);
597 	rb_insert_color(&new->r_node, &mdsc->request_tree);
598 }
599 
600 /*
601  * Register an in-flight request, and assign a tid.  Link to the directory
602  * we are modifying (if any).
603  *
604  * Called under mdsc->mutex.
605  */
606 static void __register_request(struct ceph_mds_client *mdsc,
607 			       struct ceph_mds_request *req,
608 			       struct inode *dir)
609 {
610 	req->r_tid = ++mdsc->last_tid;
611 	if (req->r_num_caps)
612 		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
613 				  req->r_num_caps);
614 	dout("__register_request %p tid %lld\n", req, req->r_tid);
615 	ceph_mdsc_get_request(req);
616 	__insert_request(mdsc, req);
617 
618 	req->r_uid = current_fsuid();
619 	req->r_gid = current_fsgid();
620 
621 	if (dir) {
622 		struct ceph_inode_info *ci = ceph_inode(dir);
623 
624 		ihold(dir);
625 		spin_lock(&ci->i_unsafe_lock);
626 		req->r_unsafe_dir = dir;
627 		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
628 		spin_unlock(&ci->i_unsafe_lock);
629 	}
630 }
631 
632 static void __unregister_request(struct ceph_mds_client *mdsc,
633 				 struct ceph_mds_request *req)
634 {
635 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
636 	rb_erase(&req->r_node, &mdsc->request_tree);
637 	RB_CLEAR_NODE(&req->r_node);
638 
639 	if (req->r_unsafe_dir) {
640 		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
641 
642 		spin_lock(&ci->i_unsafe_lock);
643 		list_del_init(&req->r_unsafe_dir_item);
644 		spin_unlock(&ci->i_unsafe_lock);
645 
646 		iput(req->r_unsafe_dir);
647 		req->r_unsafe_dir = NULL;
648 	}
649 
650 	complete_all(&req->r_safe_completion);
651 
652 	ceph_mdsc_put_request(req);
653 }
654 
655 /*
656  * Choose mds to send request to next.  If there is a hint set in the
657  * request (e.g., due to a prior forward hint from the mds), use that.
658  * Otherwise, consult frag tree and/or caps to identify the
659  * appropriate mds.  If all else fails, choose randomly.
660  *
661  * Called under mdsc->mutex.
662  */
663 static struct dentry *get_nonsnap_parent(struct dentry *dentry)
664 {
665 	/*
666 	 * we don't need to worry about protecting the d_parent access
667 	 * here because we never rename inside the snapped namespace
668 	 * except to resplice to another snapdir, and either the old or new
669 	 * result is a valid result.
670 	 */
671 	while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
672 		dentry = dentry->d_parent;
673 	return dentry;
674 }
675 
676 static int __choose_mds(struct ceph_mds_client *mdsc,
677 			struct ceph_mds_request *req)
678 {
679 	struct inode *inode;
680 	struct ceph_inode_info *ci;
681 	struct ceph_cap *cap;
682 	int mode = req->r_direct_mode;
683 	int mds = -1;
684 	u32 hash = req->r_direct_hash;
685 	bool is_hash = req->r_direct_is_hash;
686 
687 	/*
688 	 * is there a specific mds we should try?  ignore hint if we have
689 	 * no session and the mds is not up (active or recovering).
690 	 */
691 	if (req->r_resend_mds >= 0 &&
692 	    (__have_session(mdsc, req->r_resend_mds) ||
693 	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
694 		dout("choose_mds using resend_mds mds%d\n",
695 		     req->r_resend_mds);
696 		return req->r_resend_mds;
697 	}
698 
699 	if (mode == USE_RANDOM_MDS)
700 		goto random;
701 
702 	inode = NULL;
703 	if (req->r_inode) {
704 		inode = req->r_inode;
705 	} else if (req->r_dentry) {
706 		/* ignore race with rename; old or new d_parent is okay */
707 		struct dentry *parent = req->r_dentry->d_parent;
708 		struct inode *dir = parent->d_inode;
709 
710 		if (dir->i_sb != mdsc->fsc->sb) {
711 			/* not this fs! */
712 			inode = req->r_dentry->d_inode;
713 		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
714 			/* direct snapped/virtual snapdir requests
715 			 * based on parent dir inode */
716 			struct dentry *dn = get_nonsnap_parent(parent);
717 			inode = dn->d_inode;
718 			dout("__choose_mds using nonsnap parent %p\n", inode);
719 		} else {
720 			/* dentry target */
721 			inode = req->r_dentry->d_inode;
722 			if (!inode || mode == USE_AUTH_MDS) {
723 				/* dir + name */
724 				inode = dir;
725 				hash = ceph_dentry_hash(dir, req->r_dentry);
726 				is_hash = true;
727 			}
728 		}
729 	}
730 
731 	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
732 	     (int)hash, mode);
733 	if (!inode)
734 		goto random;
735 	ci = ceph_inode(inode);
736 
737 	if (is_hash && S_ISDIR(inode->i_mode)) {
738 		struct ceph_inode_frag frag;
739 		int found;
740 
741 		ceph_choose_frag(ci, hash, &frag, &found);
742 		if (found) {
743 			if (mode == USE_ANY_MDS && frag.ndist > 0) {
744 				u8 r;
745 
746 				/* choose a random replica */
747 				get_random_bytes(&r, 1);
748 				r %= frag.ndist;
749 				mds = frag.dist[r];
750 				dout("choose_mds %p %llx.%llx "
751 				     "frag %u mds%d (%d/%d)\n",
752 				     inode, ceph_vinop(inode),
753 				     frag.frag, mds,
754 				     (int)r, frag.ndist);
755 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
756 				    CEPH_MDS_STATE_ACTIVE)
757 					return mds;
758 			}
759 
760 			/* since this file/dir wasn't known to be
761 			 * replicated, we want to look for the
762 			 * authoritative mds. */
763 			mode = USE_AUTH_MDS;
764 			if (frag.mds >= 0) {
765 				/* choose auth mds */
766 				mds = frag.mds;
767 				dout("choose_mds %p %llx.%llx "
768 				     "frag %u mds%d (auth)\n",
769 				     inode, ceph_vinop(inode), frag.frag, mds);
770 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
771 				    CEPH_MDS_STATE_ACTIVE)
772 					return mds;
773 			}
774 		}
775 	}
776 
777 	spin_lock(&ci->i_ceph_lock);
778 	cap = NULL;
779 	if (mode == USE_AUTH_MDS)
780 		cap = ci->i_auth_cap;
781 	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
782 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
783 	if (!cap) {
784 		spin_unlock(&ci->i_ceph_lock);
785 		goto random;
786 	}
787 	mds = cap->session->s_mds;
788 	dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
789 	     inode, ceph_vinop(inode), mds,
790 	     cap == ci->i_auth_cap ? "auth " : "", cap);
791 	spin_unlock(&ci->i_ceph_lock);
792 	return mds;
793 
794 random:
795 	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
796 	dout("choose_mds chose random mds%d\n", mds);
797 	return mds;
798 }
799 
800 
801 /*
802  * session messages
803  */
804 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
805 {
806 	struct ceph_msg *msg;
807 	struct ceph_mds_session_head *h;
808 
809 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
810 			   false);
811 	if (!msg) {
812 		pr_err("create_session_msg ENOMEM creating msg\n");
813 		return NULL;
814 	}
815 	h = msg->front.iov_base;
816 	h->op = cpu_to_le32(op);
817 	h->seq = cpu_to_le64(seq);
818 
819 	return msg;
820 }
821 
822 /*
823  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
824  * to include additional client metadata fields.
825  */
826 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
827 {
828 	struct ceph_msg *msg;
829 	struct ceph_mds_session_head *h;
830 	int i = -1;
831 	int metadata_bytes = 0;
832 	int metadata_key_count = 0;
833 	struct ceph_options *opt = mdsc->fsc->client->options;
834 	void *p;
835 
836 	const char* metadata[3][2] = {
837 		{"hostname", utsname()->nodename},
838 		{"entity_id", opt->name ? opt->name : ""},
839 		{NULL, NULL}
840 	};
841 
842 	/* Calculate serialized length of metadata */
843 	metadata_bytes = 4;  /* map length */
844 	for (i = 0; metadata[i][0] != NULL; ++i) {
845 		metadata_bytes += 8 + strlen(metadata[i][0]) +
846 			strlen(metadata[i][1]);
847 		metadata_key_count++;
848 	}
849 
850 	/* Allocate the message */
851 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
852 			   GFP_NOFS, false);
853 	if (!msg) {
854 		pr_err("create_session_msg ENOMEM creating msg\n");
855 		return NULL;
856 	}
857 	h = msg->front.iov_base;
858 	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
859 	h->seq = cpu_to_le64(seq);
860 
861 	/*
862 	 * Serialize client metadata into waiting buffer space, using
863 	 * the format that userspace expects for map<string, string>
864 	 */
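	/* wire format: u32 entry count, then u32-length-prefixed key and value strings per entry */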
865 	msg->hdr.version = 2;  /* ClientSession messages with metadata are v2 */
866 
867 	/* The write pointer, following the session_head structure */
868 	p = msg->front.iov_base + sizeof(*h);
869 
870 	/* Number of entries in the map */
871 	ceph_encode_32(&p, metadata_key_count);
872 
873 	/* Two length-prefixed strings for each entry in the map */
874 	for (i = 0; metadata[i][0] != NULL; ++i) {
875 		size_t const key_len = strlen(metadata[i][0]);
876 		size_t const val_len = strlen(metadata[i][1]);
877 
878 		ceph_encode_32(&p, key_len);
879 		memcpy(p, metadata[i][0], key_len);
880 		p += key_len;
881 		ceph_encode_32(&p, val_len);
882 		memcpy(p, metadata[i][1], val_len);
883 		p += val_len;
884 	}
885 
886 	return msg;
887 }
888 
889 /*
890  * send session open request.
891  *
892  * called under mdsc->mutex
893  */
894 static int __open_session(struct ceph_mds_client *mdsc,
895 			  struct ceph_mds_session *session)
896 {
897 	struct ceph_msg *msg;
898 	int mstate;
899 	int mds = session->s_mds;
900 
901 	/* wait for mds to go active? */
902 	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
903 	dout("open_session to mds%d (%s)\n", mds,
904 	     ceph_mds_state_name(mstate));
905 	session->s_state = CEPH_MDS_SESSION_OPENING;
906 	session->s_renew_requested = jiffies;
907 
908 	/* send connect message */
909 	msg = create_session_open_msg(mdsc, session->s_seq);
910 	if (!msg)
911 		return -ENOMEM;
912 	ceph_con_send(&session->s_con, msg);
913 	return 0;
914 }
915 
916 /*
917  * open a session to the given export target mds, registering it if needed
918  *
919  * called under mdsc->mutex
920  */
921 static struct ceph_mds_session *
922 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
923 {
924 	struct ceph_mds_session *session;
925 
926 	session = __ceph_lookup_mds_session(mdsc, target);
927 	if (!session) {
928 		session = register_session(mdsc, target);
929 		if (IS_ERR(session))
930 			return session;
931 	}
932 	if (session->s_state == CEPH_MDS_SESSION_NEW ||
933 	    session->s_state == CEPH_MDS_SESSION_CLOSING)
934 		__open_session(mdsc, session);
935 
936 	return session;
937 }
938 
939 struct ceph_mds_session *
940 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
941 {
942 	struct ceph_mds_session *session;
943 
944 	dout("open_export_target_session to mds%d\n", target);
945 
946 	mutex_lock(&mdsc->mutex);
947 	session = __open_export_target_session(mdsc, target);
948 	mutex_unlock(&mdsc->mutex);
949 
950 	return session;
951 }
952 
953 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
954 					  struct ceph_mds_session *session)
955 {
956 	struct ceph_mds_info *mi;
957 	struct ceph_mds_session *ts;
958 	int i, mds = session->s_mds;
959 
960 	if (mds >= mdsc->mdsmap->m_max_mds)
961 		return;
962 
963 	mi = &mdsc->mdsmap->m_info[mds];
964 	dout("open_export_target_sessions for mds%d (%d targets)\n",
965 	     session->s_mds, mi->num_export_targets);
966 
967 	for (i = 0; i < mi->num_export_targets; i++) {
968 		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
969 		if (!IS_ERR(ts))
970 			ceph_put_mds_session(ts);
971 	}
972 }
973 
974 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
975 					   struct ceph_mds_session *session)
976 {
977 	mutex_lock(&mdsc->mutex);
978 	__open_export_target_sessions(mdsc, session);
979 	mutex_unlock(&mdsc->mutex);
980 }
981 
982 /*
983  * session caps
984  */
985 
986 /*
987  * Free preallocated cap messages assigned to this session
988  */
989 static void cleanup_cap_releases(struct ceph_mds_session *session)
990 {
991 	struct ceph_msg *msg;
992 
993 	spin_lock(&session->s_cap_lock);
994 	while (!list_empty(&session->s_cap_releases)) {
995 		msg = list_first_entry(&session->s_cap_releases,
996 				       struct ceph_msg, list_head);
997 		list_del_init(&msg->list_head);
998 		ceph_msg_put(msg);
999 	}
1000 	while (!list_empty(&session->s_cap_releases_done)) {
1001 		msg = list_first_entry(&session->s_cap_releases_done,
1002 				       struct ceph_msg, list_head);
1003 		list_del_init(&msg->list_head);
1004 		ceph_msg_put(msg);
1005 	}
1006 	spin_unlock(&session->s_cap_lock);
1007 }
1008 
1009 /*
1010  * Helper to safely iterate over all caps associated with a session, with
1011  * special care taken to handle a racing __ceph_remove_cap().
1012  *
1013  * Caller must hold session s_mutex.
1014  */
1015 static int iterate_session_caps(struct ceph_mds_session *session,
1016 				 int (*cb)(struct inode *, struct ceph_cap *,
1017 					    void *), void *arg)
1018 {
1019 	struct list_head *p;
1020 	struct ceph_cap *cap;
1021 	struct inode *inode, *last_inode = NULL;
1022 	struct ceph_cap *old_cap = NULL;
1023 	int ret;
1024 
1025 	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1026 	spin_lock(&session->s_cap_lock);
1027 	p = session->s_caps.next;
1028 	while (p != &session->s_caps) {
1029 		cap = list_entry(p, struct ceph_cap, session_caps);
1030 		inode = igrab(&cap->ci->vfs_inode);
1031 		if (!inode) {
1032 			p = p->next;
1033 			continue;
1034 		}
1035 		session->s_cap_iterator = cap;
1036 		spin_unlock(&session->s_cap_lock);
1037 
1038 		if (last_inode) {
1039 			iput(last_inode);
1040 			last_inode = NULL;
1041 		}
1042 		if (old_cap) {
1043 			ceph_put_cap(session->s_mdsc, old_cap);
1044 			old_cap = NULL;
1045 		}
1046 
1047 		ret = cb(inode, cap, arg);
1048 		last_inode = inode;
1049 
1050 		spin_lock(&session->s_cap_lock);
1051 		p = p->next;
1052 		if (cap->ci == NULL) {
1053 			dout("iterate_session_caps  finishing cap %p removal\n",
1054 			     cap);
1055 			BUG_ON(cap->session != session);
1056 			list_del_init(&cap->session_caps);
1057 			session->s_nr_caps--;
1058 			cap->session = NULL;
1059 			old_cap = cap;  /* put_cap it w/o locks held */
1060 		}
1061 		if (ret < 0)
1062 			goto out;
1063 	}
1064 	ret = 0;
1065 out:
1066 	session->s_cap_iterator = NULL;
1067 	spin_unlock(&session->s_cap_lock);
1068 
1069 	if (last_inode)
1070 		iput(last_inode);
1071 	if (old_cap)
1072 		ceph_put_cap(session->s_mdsc, old_cap);
1073 
1074 	return ret;
1075 }
1076 
1077 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1078 				  void *arg)
1079 {
1080 	struct ceph_inode_info *ci = ceph_inode(inode);
1081 	int drop = 0;
1082 
1083 	dout("removing cap %p, ci is %p, inode is %p\n",
1084 	     cap, ci, &ci->vfs_inode);
1085 	spin_lock(&ci->i_ceph_lock);
1086 	__ceph_remove_cap(cap, false);
1087 	if (!__ceph_is_any_real_caps(ci)) {
1088 		struct ceph_mds_client *mdsc =
1089 			ceph_sb_to_client(inode->i_sb)->mdsc;
1090 
1091 		spin_lock(&mdsc->cap_dirty_lock);
1092 		if (!list_empty(&ci->i_dirty_item)) {
1093 			pr_info(" dropping dirty %s state for %p %lld\n",
1094 				ceph_cap_string(ci->i_dirty_caps),
1095 				inode, ceph_ino(inode));
1096 			ci->i_dirty_caps = 0;
1097 			list_del_init(&ci->i_dirty_item);
1098 			drop = 1;
1099 		}
1100 		if (!list_empty(&ci->i_flushing_item)) {
1101 			pr_info(" dropping dirty+flushing %s state for %p %lld\n",
1102 				ceph_cap_string(ci->i_flushing_caps),
1103 				inode, ceph_ino(inode));
1104 			ci->i_flushing_caps = 0;
1105 			list_del_init(&ci->i_flushing_item);
1106 			mdsc->num_cap_flushing--;
1107 			drop = 1;
1108 		}
1109 		if (drop && ci->i_wrbuffer_ref) {
1110 			pr_info(" dropping dirty data for %p %lld\n",
1111 				inode, ceph_ino(inode));
1112 			ci->i_wrbuffer_ref = 0;
1113 			ci->i_wrbuffer_ref_head = 0;
1114 			drop++;
1115 		}
1116 		spin_unlock(&mdsc->cap_dirty_lock);
1117 	}
1118 	spin_unlock(&ci->i_ceph_lock);
1119 	while (drop--)
1120 		iput(inode);
1121 	return 0;
1122 }
1123 
1124 /*
1125  * caller must hold session s_mutex
1126  */
1127 static void remove_session_caps(struct ceph_mds_session *session)
1128 {
1129 	dout("remove_session_caps on %p\n", session);
1130 	iterate_session_caps(session, remove_session_caps_cb, NULL);
1131 
1132 	spin_lock(&session->s_cap_lock);
1133 	if (session->s_nr_caps > 0) {
1134 		struct super_block *sb = session->s_mdsc->fsc->sb;
1135 		struct inode *inode;
1136 		struct ceph_cap *cap, *prev = NULL;
1137 		struct ceph_vino vino;
1138 		/*
1139 		 * iterate_session_caps() skips inodes that are being
1140 		 * deleted, so we need to wait until deletions are complete.
1141 		 * __wait_on_freeing_inode() is designed for the job,
1142 		 * but it is not exported, so use lookup inode function
1143 		 * to access it.
1144 		 */
1145 		while (!list_empty(&session->s_caps)) {
1146 			cap = list_entry(session->s_caps.next,
1147 					 struct ceph_cap, session_caps);
1148 			if (cap == prev)
1149 				break;
1150 			prev = cap;
1151 			vino = cap->ci->i_vino;
1152 			spin_unlock(&session->s_cap_lock);
1153 
1154 			inode = ceph_find_inode(sb, vino);
1155 			iput(inode);
1156 
1157 			spin_lock(&session->s_cap_lock);
1158 		}
1159 	}
1160 	spin_unlock(&session->s_cap_lock);
1161 
1162 	BUG_ON(session->s_nr_caps > 0);
1163 	BUG_ON(!list_empty(&session->s_cap_flushing));
1164 	cleanup_cap_releases(session);
1165 }
1166 
1167 /*
1168  * wake up any threads waiting on this session's caps.  if we are
1169  * reconnecting, also reset each inode's wanted/requested max size.
1170  *
1171  * caller must hold s_mutex.
1172  */
1173 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1174 			      void *arg)
1175 {
1176 	struct ceph_inode_info *ci = ceph_inode(inode);
1177 
1178 	wake_up_all(&ci->i_cap_wq);
1179 	if (arg) {
1180 		spin_lock(&ci->i_ceph_lock);
1181 		ci->i_wanted_max_size = 0;
1182 		ci->i_requested_max_size = 0;
1183 		spin_unlock(&ci->i_ceph_lock);
1184 	}
1185 	return 0;
1186 }
1187 
1188 static void wake_up_session_caps(struct ceph_mds_session *session,
1189 				 int reconnect)
1190 {
1191 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1192 	iterate_session_caps(session, wake_up_session_cb,
1193 			     (void *)(unsigned long)reconnect);
1194 }
1195 
1196 /*
1197  * Send periodic message to MDS renewing all currently held caps.  The
1198  * ack will reset the expiration for all caps from this session.
1199  *
1200  * caller holds s_mutex
1201  */
1202 static int send_renew_caps(struct ceph_mds_client *mdsc,
1203 			   struct ceph_mds_session *session)
1204 {
1205 	struct ceph_msg *msg;
1206 	int state;
1207 
1208 	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1209 	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1210 		pr_info("mds%d caps stale\n", session->s_mds);
1211 	session->s_renew_requested = jiffies;
1212 
1213 	/* do not try to renew caps until a recovering mds has reconnected
1214 	 * with its clients. */
1215 	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1216 	if (state < CEPH_MDS_STATE_RECONNECT) {
1217 		dout("send_renew_caps ignoring mds%d (%s)\n",
1218 		     session->s_mds, ceph_mds_state_name(state));
1219 		return 0;
1220 	}
1221 
1222 	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1223 		ceph_mds_state_name(state));
1224 	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1225 				 ++session->s_renew_seq);
1226 	if (!msg)
1227 		return -ENOMEM;
1228 	ceph_con_send(&session->s_con, msg);
1229 	return 0;
1230 }
1231 
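/* ack an MDS flushmsg by echoing back its sequence number */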
1232 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1233 			     struct ceph_mds_session *session, u64 seq)
1234 {
1235 	struct ceph_msg *msg;
1236 
1237 	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1238 	     session->s_mds, ceph_session_state_name(session->s_state), seq);
1239 	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1240 	if (!msg)
1241 		return -ENOMEM;
1242 	ceph_con_send(&session->s_con, msg);
1243 	return 0;
1244 }
1245 
1246 
1247 /*
1248  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1249  *
1250  * Called under session->s_mutex
1251  */
1252 static void renewed_caps(struct ceph_mds_client *mdsc,
1253 			 struct ceph_mds_session *session, int is_renew)
1254 {
1255 	int was_stale;
1256 	int wake = 0;
1257 
1258 	spin_lock(&session->s_cap_lock);
1259 	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1260 
1261 	session->s_cap_ttl = session->s_renew_requested +
1262 		mdsc->mdsmap->m_session_timeout*HZ;
1263 
1264 	if (was_stale) {
1265 		if (time_before(jiffies, session->s_cap_ttl)) {
1266 			pr_info("mds%d caps renewed\n", session->s_mds);
1267 			wake = 1;
1268 		} else {
1269 			pr_info("mds%d caps still stale\n", session->s_mds);
1270 		}
1271 	}
1272 	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1273 	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1274 	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1275 	spin_unlock(&session->s_cap_lock);
1276 
1277 	if (wake)
1278 		wake_up_session_caps(session, 0);
1279 }
1280 
1281 /*
1282  * send a session close request
1283  */
1284 static int request_close_session(struct ceph_mds_client *mdsc,
1285 				 struct ceph_mds_session *session)
1286 {
1287 	struct ceph_msg *msg;
1288 
1289 	dout("request_close_session mds%d state %s seq %lld\n",
1290 	     session->s_mds, ceph_session_state_name(session->s_state),
1291 	     session->s_seq);
1292 	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1293 	if (!msg)
1294 		return -ENOMEM;
1295 	ceph_con_send(&session->s_con, msg);
1296 	return 0;
1297 }
1298 
1299 /*
1300  * Called with s_mutex held.
1301  */
1302 static int __close_session(struct ceph_mds_client *mdsc,
1303 			 struct ceph_mds_session *session)
1304 {
1305 	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1306 		return 0;
1307 	session->s_state = CEPH_MDS_SESSION_CLOSING;
1308 	return request_close_session(mdsc, session);
1309 }
1310 
1311 /*
1312  * Trim old(er) caps.
1313  *
1314  * Because we can't cache an inode without one or more caps, we do
1315  * this indirectly: if a cap is unused, we prune its aliases, at which
1316  * point the inode will hopefully get dropped too.
1317  *
1318  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1319  * memory pressure from the MDS, though, so it needn't be perfect.
1320  */
1321 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1322 {
1323 	struct ceph_mds_session *session = arg;
1324 	struct ceph_inode_info *ci = ceph_inode(inode);
1325 	int used, wanted, oissued, mine;
1326 
1327 	if (session->s_trim_caps <= 0)
1328 		return -1;
1329 
1330 	spin_lock(&ci->i_ceph_lock);
1331 	mine = cap->issued | cap->implemented;
1332 	used = __ceph_caps_used(ci);
1333 	wanted = __ceph_caps_file_wanted(ci);
1334 	oissued = __ceph_caps_issued_other(ci, cap);
1335 
1336 	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1337 	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1338 	     ceph_cap_string(used), ceph_cap_string(wanted));
1339 	if (cap == ci->i_auth_cap) {
1340 		if (ci->i_dirty_caps | ci->i_flushing_caps)
1341 			goto out;
1342 		if ((used | wanted) & CEPH_CAP_ANY_WR)
1343 			goto out;
1344 	}
1345 	if ((used | wanted) & ~oissued & mine)
1346 		goto out;   /* we need these caps */
1347 
1348 	session->s_trim_caps--;
1349 	if (oissued) {
1350 		/* we aren't the only cap.. just remove us */
1351 		__ceph_remove_cap(cap, true);
1352 	} else {
1353 		/* try to drop referring dentries */
1354 		spin_unlock(&ci->i_ceph_lock);
1355 		d_prune_aliases(inode);
1356 		dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1357 		     inode, cap, atomic_read(&inode->i_count));
1358 		return 0;
1359 	}
1360 
1361 out:
1362 	spin_unlock(&ci->i_ceph_lock);
1363 	return 0;
1364 }
1365 
1366 /*
1367  * Trim session cap count down to some max number.
1368  */
1369 static int trim_caps(struct ceph_mds_client *mdsc,
1370 		     struct ceph_mds_session *session,
1371 		     int max_caps)
1372 {
1373 	int trim_caps = session->s_nr_caps - max_caps;
1374 
1375 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
1376 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1377 	if (trim_caps > 0) {
1378 		session->s_trim_caps = trim_caps;
1379 		iterate_session_caps(session, trim_caps_cb, session);
1380 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1381 		     session->s_mds, session->s_nr_caps, max_caps,
1382 			trim_caps - session->s_trim_caps);
1383 		session->s_trim_caps = 0;
1384 	}
1385 
1386 	ceph_add_cap_releases(mdsc, session);
1387 	ceph_send_cap_releases(mdsc, session);
1388 	return 0;
1389 }
1390 
1391 /*
1392  * Allocate cap_release messages.  If there is a partially full message
1393  * in the queue, try to allocate enough to cover its remainder, so that
1394  * we can send it immediately.
1395  *
1396  * Called under s_mutex.
1397  */
1398 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1399 			  struct ceph_mds_session *session)
1400 {
1401 	struct ceph_msg *msg, *partial = NULL;
1402 	struct ceph_mds_cap_release *head;
1403 	int err = -ENOMEM;
1404 	int extra = mdsc->fsc->mount_options->cap_release_safety;
1405 	int num;
1406 
1407 	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1408 	     extra);
1409 
1410 	spin_lock(&session->s_cap_lock);
1411 
1412 	if (!list_empty(&session->s_cap_releases)) {
1413 		msg = list_first_entry(&session->s_cap_releases,
1414 				       struct ceph_msg,
1415 				 list_head);
1416 		head = msg->front.iov_base;
1417 		num = le32_to_cpu(head->num);
1418 		if (num) {
1419 			dout(" partial %p with (%d/%d)\n", msg, num,
1420 			     (int)CEPH_CAPS_PER_RELEASE);
1421 			extra += CEPH_CAPS_PER_RELEASE - num;
1422 			partial = msg;
1423 		}
1424 	}
1425 	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1426 		spin_unlock(&session->s_cap_lock);
1427 		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1428 				   GFP_NOFS, false);
1429 		if (!msg)
1430 			goto out_unlocked;
1431 		dout("add_cap_releases %p msg %p now %d\n", session, msg,
1432 		     (int)msg->front.iov_len);
1433 		head = msg->front.iov_base;
1434 		head->num = cpu_to_le32(0);
1435 		msg->front.iov_len = sizeof(*head);
1436 		spin_lock(&session->s_cap_lock);
1437 		list_add(&msg->list_head, &session->s_cap_releases);
1438 		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1439 	}
1440 
1441 	if (partial) {
1442 		head = partial->front.iov_base;
1443 		num = le32_to_cpu(head->num);
1444 		dout(" queueing partial %p with %d/%d\n", partial, num,
1445 		     (int)CEPH_CAPS_PER_RELEASE);
1446 		list_move_tail(&partial->list_head,
1447 			       &session->s_cap_releases_done);
1448 		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1449 	}
1450 	err = 0;
1451 	spin_unlock(&session->s_cap_lock);
1452 out_unlocked:
1453 	return err;
1454 }
1455 
1456 /*
1457  * check progress of flushing dirty inode data to disk.
1458  *
1459  * returns true if we've flushed through want_flush_seq
1460  */
1461 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1462 {
1463 	int mds, ret = 1;
1464 
1465 	dout("check_cap_flush want %lld\n", want_flush_seq);
1466 	mutex_lock(&mdsc->mutex);
1467 	for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1468 		struct ceph_mds_session *session = mdsc->sessions[mds];
1469 
1470 		if (!session)
1471 			continue;
1472 		get_session(session);
1473 		mutex_unlock(&mdsc->mutex);
1474 
1475 		mutex_lock(&session->s_mutex);
1476 		if (!list_empty(&session->s_cap_flushing)) {
1477 			struct ceph_inode_info *ci =
1478 				list_entry(session->s_cap_flushing.next,
1479 					   struct ceph_inode_info,
1480 					   i_flushing_item);
1481 			struct inode *inode = &ci->vfs_inode;
1482 
1483 			spin_lock(&ci->i_ceph_lock);
1484 			if (ci->i_cap_flush_seq <= want_flush_seq) {
1485 				dout("check_cap_flush still flushing %p "
1486 				     "seq %lld <= %lld to mds%d\n", inode,
1487 				     ci->i_cap_flush_seq, want_flush_seq,
1488 				     session->s_mds);
1489 				ret = 0;
1490 			}
1491 			spin_unlock(&ci->i_ceph_lock);
1492 		}
1493 		mutex_unlock(&session->s_mutex);
1494 		ceph_put_mds_session(session);
1495 
1496 		if (!ret)
1497 			return ret;
1498 		mutex_lock(&mdsc->mutex);
1499 	}
1500 
1501 	mutex_unlock(&mdsc->mutex);
1502 	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1503 	return ret;
1504 }
1505 
1506 /*
1507  * called under s_mutex
1508  */
1509 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1510 			    struct ceph_mds_session *session)
1511 {
1512 	struct ceph_msg *msg;
1513 
1514 	dout("send_cap_releases mds%d\n", session->s_mds);
1515 	spin_lock(&session->s_cap_lock);
1516 	while (!list_empty(&session->s_cap_releases_done)) {
1517 		msg = list_first_entry(&session->s_cap_releases_done,
1518 				 struct ceph_msg, list_head);
1519 		list_del_init(&msg->list_head);
1520 		spin_unlock(&session->s_cap_lock);
1521 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1522 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1523 		ceph_con_send(&session->s_con, msg);
1524 		spin_lock(&session->s_cap_lock);
1525 	}
1526 	spin_unlock(&session->s_cap_lock);
1527 }
1528 
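/*
 * Discard pending cap releases: zero out the in-progress release message
 * and recycle completed messages back onto s_cap_releases, re-crediting
 * their slots to s_num_cap_releases so the releases can be rebuilt.
 */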
1529 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1530 				 struct ceph_mds_session *session)
1531 {
1532 	struct ceph_msg *msg;
1533 	struct ceph_mds_cap_release *head;
1534 	unsigned num;
1535 
1536 	dout("discard_cap_releases mds%d\n", session->s_mds);
1537 
1538 	if (!list_empty(&session->s_cap_releases)) {
1539 		/* zero out the in-progress message */
1540 		msg = list_first_entry(&session->s_cap_releases,
1541 					struct ceph_msg, list_head);
1542 		head = msg->front.iov_base;
1543 		num = le32_to_cpu(head->num);
1544 		dout("discard_cap_releases mds%d %p %u\n",
1545 		     session->s_mds, msg, num);
1546 		head->num = cpu_to_le32(0);
1547 		msg->front.iov_len = sizeof(*head);
1548 		session->s_num_cap_releases += num;
1549 	}
1550 
1551 	/* requeue completed messages */
1552 	while (!list_empty(&session->s_cap_releases_done)) {
1553 		msg = list_first_entry(&session->s_cap_releases_done,
1554 				 struct ceph_msg, list_head);
1555 		list_del_init(&msg->list_head);
1556 
1557 		head = msg->front.iov_base;
1558 		num = le32_to_cpu(head->num);
1559 		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1560 		     num);
1561 		session->s_num_cap_releases += num;
1562 		head->num = cpu_to_le32(0);
1563 		msg->front.iov_len = sizeof(*head);
1564 		list_add(&msg->list_head, &session->s_cap_releases);
1565 	}
1566 }
1567 
1568 /*
1569  * requests
1570  */
1571 
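/*
 * Allocate the buffer that readdir replies are parsed into.  Size it for
 * the expected number of entries (i_files + i_subdirs, clamped to the
 * max_readdir mount option), falling back to a smaller allocation order
 * if high-order pages are unavailable.
 */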
1572 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
1573 				    struct inode *dir)
1574 {
1575 	struct ceph_inode_info *ci = ceph_inode(dir);
1576 	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
1577 	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
1578 	size_t size = sizeof(*rinfo->dir_in) + sizeof(*rinfo->dir_dname_len) +
1579 		      sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease);
1580 	int order, num_entries;
1581 
1582 	spin_lock(&ci->i_ceph_lock);
1583 	num_entries = ci->i_files + ci->i_subdirs;
1584 	spin_unlock(&ci->i_ceph_lock);
1585 	num_entries = max(num_entries, 1);
1586 	num_entries = min(num_entries, opt->max_readdir);
1587 
1588 	order = get_order(size * num_entries);
1589 	while (order >= 0) {
1590 		rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
1591 							order);
1592 		if (rinfo->dir_in)
1593 			break;
1594 		order--;
1595 	}
1596 	if (!rinfo->dir_in)
1597 		return -ENOMEM;
1598 
1599 	num_entries = (PAGE_SIZE << order) / size;
1600 	num_entries = min(num_entries, opt->max_readdir);
1601 
1602 	rinfo->dir_buf_size = PAGE_SIZE << order;
1603 	req->r_num_caps = num_entries + 1;
1604 	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
1605 	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
1606 	return 0;
1607 }
1608 
1609 /*
1610  * Create an mds request.
1611  */
1612 struct ceph_mds_request *
1613 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1614 {
1615 	struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1616 
1617 	if (!req)
1618 		return ERR_PTR(-ENOMEM);
1619 
1620 	mutex_init(&req->r_fill_mutex);
1621 	req->r_mdsc = mdsc;
1622 	req->r_started = jiffies;
1623 	req->r_resend_mds = -1;
1624 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1625 	req->r_fmode = -1;
1626 	kref_init(&req->r_kref);
1627 	INIT_LIST_HEAD(&req->r_wait);
1628 	init_completion(&req->r_completion);
1629 	init_completion(&req->r_safe_completion);
1630 	INIT_LIST_HEAD(&req->r_unsafe_item);
1631 
1632 	req->r_stamp = CURRENT_TIME;
1633 
1634 	req->r_op = op;
1635 	req->r_direct_mode = mode;
1636 	return req;
1637 }
1638 
1639 /*
1640  * return the oldest (lowest tid) request in the request tree, or NULL (tid 0) if none.
1641  *
1642  * called under mdsc->mutex.
1643  */
1644 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1645 {
1646 	if (RB_EMPTY_ROOT(&mdsc->request_tree))
1647 		return NULL;
1648 	return rb_entry(rb_first(&mdsc->request_tree),
1649 			struct ceph_mds_request, r_node);
1650 }
1651 
1652 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1653 {
1654 	struct ceph_mds_request *req = __get_oldest_req(mdsc);
1655 
1656 	if (req)
1657 		return req->r_tid;
1658 	return 0;
1659 }
1660 
1661 /*
1662  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1663  * on build_path_from_dentry in fs/cifs/dir.c.
1664  *
1665  * If @stop_on_nosnap, generate path relative to the first non-snapped
1666  * inode.
1667  *
1668  * Encode hidden .snap dirs as a double /, i.e.
1669  *   foo/.snap/bar -> foo//bar
1670  */
1671 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1672 			   int stop_on_nosnap)
1673 {
1674 	struct dentry *temp;
1675 	char *path;
1676 	int len, pos;
1677 	unsigned seq;
1678 
1679 	if (dentry == NULL)
1680 		return ERR_PTR(-EINVAL);
1681 
1682 retry:
1683 	len = 0;
1684 	seq = read_seqbegin(&rename_lock);
1685 	rcu_read_lock();
1686 	for (temp = dentry; !IS_ROOT(temp);) {
1687 		struct inode *inode = temp->d_inode;
1688 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1689 			len++;  /* slash only */
1690 		else if (stop_on_nosnap && inode &&
1691 			 ceph_snap(inode) == CEPH_NOSNAP)
1692 			break;
1693 		else
1694 			len += 1 + temp->d_name.len;
1695 		temp = temp->d_parent;
1696 	}
1697 	rcu_read_unlock();
1698 	if (len)
1699 		len--;  /* no leading '/' */
1700 
1701 	path = kmalloc(len+1, GFP_NOFS);
1702 	if (path == NULL)
1703 		return ERR_PTR(-ENOMEM);
1704 	pos = len;
1705 	path[pos] = 0;	/* trailing null */
1706 	rcu_read_lock();
1707 	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1708 		struct inode *inode;
1709 
1710 		spin_lock(&temp->d_lock);
1711 		inode = temp->d_inode;
1712 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1713 			dout("build_path path+%d: %p SNAPDIR\n",
1714 			     pos, temp);
1715 		} else if (stop_on_nosnap && inode &&
1716 			   ceph_snap(inode) == CEPH_NOSNAP) {
1717 			spin_unlock(&temp->d_lock);
1718 			break;
1719 		} else {
1720 			pos -= temp->d_name.len;
1721 			if (pos < 0) {
1722 				spin_unlock(&temp->d_lock);
1723 				break;
1724 			}
1725 			strncpy(path + pos, temp->d_name.name,
1726 				temp->d_name.len);
1727 		}
1728 		spin_unlock(&temp->d_lock);
1729 		if (pos)
1730 			path[--pos] = '/';
1731 		temp = temp->d_parent;
1732 	}
1733 	rcu_read_unlock();
1734 	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
1735 		pr_err("build_path did not end path lookup where "
1736 		       "expected, namelen is %d, pos is %d\n", len, pos);
1737 		/* presumably this is only possible if racing with a
1738 		   rename of one of the parent directories (we can not
1739 		   lock the dentries above us to prevent this, but
1740 		   retrying should be harmless) */
1741 		kfree(path);
1742 		goto retry;
1743 	}
1744 
1745 	*base = ceph_ino(temp->d_inode);
1746 	*plen = len;
1747 	dout("build_path on %p %d built %llx '%.*s'\n",
1748 	     dentry, d_count(dentry), *base, len, path);
1749 	return path;
1750 }
1751 
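/*
 * encode a dentry as parent ino + name when the parent is not snapped;
 * otherwise build a path relative to the nearest non-snapped ancestor
 */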
1752 static int build_dentry_path(struct dentry *dentry,
1753 			     const char **ppath, int *ppathlen, u64 *pino,
1754 			     int *pfreepath)
1755 {
1756 	char *path;
1757 
1758 	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1759 		*pino = ceph_ino(dentry->d_parent->d_inode);
1760 		*ppath = dentry->d_name.name;
1761 		*ppathlen = dentry->d_name.len;
1762 		return 0;
1763 	}
1764 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1765 	if (IS_ERR(path))
1766 		return PTR_ERR(path);
1767 	*ppath = path;
1768 	*pfreepath = 1;
1769 	return 0;
1770 }
1771 
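/*
 * encode an inode as a bare ino when it is not snapped; otherwise build
 * a path from one of its aliases
 */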
1772 static int build_inode_path(struct inode *inode,
1773 			    const char **ppath, int *ppathlen, u64 *pino,
1774 			    int *pfreepath)
1775 {
1776 	struct dentry *dentry;
1777 	char *path;
1778 
1779 	if (ceph_snap(inode) == CEPH_NOSNAP) {
1780 		*pino = ceph_ino(inode);
1781 		*ppathlen = 0;
1782 		return 0;
1783 	}
1784 	dentry = d_find_alias(inode);
1785 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1786 	dput(dentry);
1787 	if (IS_ERR(path))
1788 		return PTR_ERR(path);
1789 	*ppath = path;
1790 	*pfreepath = 1;
1791 	return 0;
1792 }
1793 
1794 /*
1795  * request arguments may be specified via an inode *, a dentry *, or
1796  * an explicit ino+path.
1797  */
1798 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1799 				  const char *rpath, u64 rino,
1800 				  const char **ppath, int *pathlen,
1801 				  u64 *ino, int *freepath)
1802 {
1803 	int r = 0;
1804 
1805 	if (rinode) {
1806 		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1807 		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1808 		     ceph_snap(rinode));
1809 	} else if (rdentry) {
1810 		r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1811 		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1812 		     *ppath);
1813 	} else if (rpath || rino) {
1814 		*ino = rino;
1815 		*ppath = rpath;
1816 		*pathlen = rpath ? strlen(rpath) : 0;
1817 		dout(" path %.*s\n", *pathlen, rpath);
1818 	}
1819 
1820 	return r;
1821 }
1822 
1823 /*
1824  * called under mdsc->mutex
1825  */
1826 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1827 					       struct ceph_mds_request *req,
1828 					       int mds)
1829 {
1830 	struct ceph_msg *msg;
1831 	struct ceph_mds_request_head *head;
1832 	const char *path1 = NULL;
1833 	const char *path2 = NULL;
1834 	u64 ino1 = 0, ino2 = 0;
1835 	int pathlen1 = 0, pathlen2 = 0;
1836 	int freepath1 = 0, freepath2 = 0;
1837 	int len;
1838 	u16 releases;
1839 	void *p, *end;
1840 	int ret;
1841 
1842 	ret = set_request_path_attr(req->r_inode, req->r_dentry,
1843 			      req->r_path1, req->r_ino1.ino,
1844 			      &path1, &pathlen1, &ino1, &freepath1);
1845 	if (ret < 0) {
1846 		msg = ERR_PTR(ret);
1847 		goto out;
1848 	}
1849 
1850 	ret = set_request_path_attr(NULL, req->r_old_dentry,
1851 			      req->r_path2, req->r_ino2.ino,
1852 			      &path2, &pathlen2, &ino2, &freepath2);
1853 	if (ret < 0) {
1854 		msg = ERR_PTR(ret);
1855 		goto out_free1;
1856 	}
1857 
1858 	len = sizeof(*head) +
1859 		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
1860 		sizeof(struct timespec);
1861 
1862 	/* calculate (max) length for cap releases */
1863 	len += sizeof(struct ceph_mds_request_release) *
1864 		(!!req->r_inode_drop + !!req->r_dentry_drop +
1865 		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1866 	if (req->r_dentry_drop)
1867 		len += req->r_dentry->d_name.len;
1868 	if (req->r_old_dentry_drop)
1869 		len += req->r_old_dentry->d_name.len;
1870 
1871 	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
1872 	if (!msg) {
1873 		msg = ERR_PTR(-ENOMEM);
1874 		goto out_free2;
1875 	}
1876 
1877 	msg->hdr.version = 2;
1878 	msg->hdr.tid = cpu_to_le64(req->r_tid);
1879 
1880 	head = msg->front.iov_base;
1881 	p = msg->front.iov_base + sizeof(*head);
1882 	end = msg->front.iov_base + msg->front.iov_len;
1883 
1884 	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1885 	head->op = cpu_to_le32(req->r_op);
1886 	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
1887 	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
1888 	head->args = req->r_args;
1889 
1890 	ceph_encode_filepath(&p, end, ino1, path1);
1891 	ceph_encode_filepath(&p, end, ino2, path2);
1892 
1893 	/* make note of release offset, in case we need to replay */
1894 	req->r_request_release_offset = p - msg->front.iov_base;
1895 
1896 	/* cap releases */
1897 	releases = 0;
1898 	if (req->r_inode_drop)
1899 		releases += ceph_encode_inode_release(&p,
1900 		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1901 		      mds, req->r_inode_drop, req->r_inode_unless, 0);
1902 	if (req->r_dentry_drop)
1903 		releases += ceph_encode_dentry_release(&p, req->r_dentry,
1904 		       mds, req->r_dentry_drop, req->r_dentry_unless);
1905 	if (req->r_old_dentry_drop)
1906 		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1907 		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1908 	if (req->r_old_inode_drop)
1909 		releases += ceph_encode_inode_release(&p,
1910 		      req->r_old_dentry->d_inode,
1911 		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1912 	head->num_releases = cpu_to_le16(releases);
1913 
1914 	/* time stamp */
1915 	ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
1916 
1917 	BUG_ON(p > end);
1918 	msg->front.iov_len = p - msg->front.iov_base;
1919 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1920 
1921 	if (req->r_pagelist) {
1922 		struct ceph_pagelist *pagelist = req->r_pagelist;
1923 		atomic_inc(&pagelist->refcnt);
1924 		ceph_msg_data_add_pagelist(msg, pagelist);
1925 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
1926 	} else {
1927 		msg->hdr.data_len = 0;
1928 	}
1929 
1930 	msg->hdr.data_off = cpu_to_le16(0);
1931 
1932 out_free2:
1933 	if (freepath2)
1934 		kfree((char *)path2);
1935 out_free1:
1936 	if (freepath1)
1937 		kfree((char *)path1);
1938 out:
1939 	return msg;
1940 }
1941 
1942 /*
1943  * called under mdsc->mutex if error, under no mutex if
1944  * success.
1945  */
1946 static void complete_request(struct ceph_mds_client *mdsc,
1947 			     struct ceph_mds_request *req)
1948 {
1949 	if (req->r_callback)
1950 		req->r_callback(mdsc, req);
1951 	else
1952 		complete_all(&req->r_completion);
1953 }
1954 
1955 /*
1956  * called under mdsc->mutex
1957  */
1958 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1959 				  struct ceph_mds_request *req,
1960 				  int mds)
1961 {
1962 	struct ceph_mds_request_head *rhead;
1963 	struct ceph_msg *msg;
1964 	int flags = 0;
1965 
1966 	req->r_attempts++;
1967 	if (req->r_inode) {
1968 		struct ceph_cap *cap =
1969 			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1970 
1971 		if (cap)
1972 			req->r_sent_on_mseq = cap->mseq;
1973 		else
1974 			req->r_sent_on_mseq = -1;
1975 	}
1976 	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1977 	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1978 
1979 	if (req->r_got_unsafe) {
1980 		void *p;
1981 		/*
1982 		 * Replay.  Do not regenerate message (and rebuild
1983 		 * paths, etc.); just use the original message.
1984 		 * Rebuilding paths will break for renames because
1985 		 * d_move mangles the src name.
1986 		 */
1987 		msg = req->r_request;
1988 		rhead = msg->front.iov_base;
1989 
1990 		flags = le32_to_cpu(rhead->flags);
1991 		flags |= CEPH_MDS_FLAG_REPLAY;
1992 		rhead->flags = cpu_to_le32(flags);
1993 
1994 		if (req->r_target_inode)
1995 			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1996 
1997 		rhead->num_retry = req->r_attempts - 1;
1998 
1999 		/* remove cap/dentry releases from message */
2000 		rhead->num_releases = 0;
2001 
2002 		/* re-encode the timestamp where the (dropped) releases were */
2003 		p = msg->front.iov_base + req->r_request_release_offset;
2004 		ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
2005 
2006 		msg->front.iov_len = p - msg->front.iov_base;
2007 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2008 		return 0;
2009 	}
2010 
2011 	if (req->r_request) {
2012 		ceph_msg_put(req->r_request);
2013 		req->r_request = NULL;
2014 	}
2015 	msg = create_request_message(mdsc, req, mds);
2016 	if (IS_ERR(msg)) {
2017 		req->r_err = PTR_ERR(msg);
2018 		complete_request(mdsc, req);
2019 		return PTR_ERR(msg);
2020 	}
2021 	req->r_request = msg;
2022 
2023 	rhead = msg->front.iov_base;
2024 	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2025 	if (req->r_got_unsafe)
2026 		flags |= CEPH_MDS_FLAG_REPLAY;
2027 	if (req->r_locked_dir)
2028 		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2029 	rhead->flags = cpu_to_le32(flags);
2030 	rhead->num_fwd = req->r_num_fwd;
2031 	rhead->num_retry = req->r_attempts - 1;
2032 	rhead->ino = 0;
2033 
2034 	dout(" r_locked_dir = %p\n", req->r_locked_dir);
2035 	return 0;
2036 }
2037 
2038 /*
2039  * send request, or put it on the appropriate wait list.
2040  */
2041 static int __do_request(struct ceph_mds_client *mdsc,
2042 			struct ceph_mds_request *req)
2043 {
2044 	struct ceph_mds_session *session = NULL;
2045 	int mds = -1;
2046 	int err = -EAGAIN;
2047 
2048 	if (req->r_err || req->r_got_result) {
2049 		if (req->r_aborted)
2050 			__unregister_request(mdsc, req);
2051 		goto out;
2052 	}
2053 
2054 	if (req->r_timeout &&
2055 	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2056 		dout("do_request timed out\n");
2057 		err = -EIO;
2058 		goto finish;
2059 	}
2060 
2061 	put_request_session(req);
2062 
2063 	mds = __choose_mds(mdsc, req);
2064 	if (mds < 0 ||
2065 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2066 		dout("do_request no mds or not active, waiting for map\n");
2067 		list_add(&req->r_wait, &mdsc->waiting_for_map);
2068 		goto out;
2069 	}
2070 
2071 	/* get, open session */
2072 	session = __ceph_lookup_mds_session(mdsc, mds);
2073 	if (!session) {
2074 		session = register_session(mdsc, mds);
2075 		if (IS_ERR(session)) {
2076 			err = PTR_ERR(session);
2077 			goto finish;
2078 		}
2079 	}
2080 	req->r_session = get_session(session);
2081 
2082 	dout("do_request mds%d session %p state %s\n", mds, session,
2083 	     ceph_session_state_name(session->s_state));
2084 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2085 	    session->s_state != CEPH_MDS_SESSION_HUNG) {
2086 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
2087 		    session->s_state == CEPH_MDS_SESSION_CLOSING)
2088 			__open_session(mdsc, session);
2089 		list_add(&req->r_wait, &session->s_waiting);
2090 		goto out_session;
2091 	}
2092 
2093 	/* send request */
2094 	req->r_resend_mds = -1;   /* forget any previous mds hint */
2095 
2096 	if (req->r_request_started == 0)   /* note request start time */
2097 		req->r_request_started = jiffies;
2098 
2099 	err = __prepare_send_request(mdsc, req, mds);
2100 	if (!err) {
2101 		ceph_msg_get(req->r_request);
2102 		ceph_con_send(&session->s_con, req->r_request);
2103 	}
2104 
2105 out_session:
2106 	ceph_put_mds_session(session);
2107 out:
2108 	return err;
2109 
2110 finish:
2111 	req->r_err = err;
2112 	complete_request(mdsc, req);
2113 	goto out;
2114 }
2115 
2116 /*
2117  * called under mdsc->mutex
2118  */
2119 static void __wake_requests(struct ceph_mds_client *mdsc,
2120 			    struct list_head *head)
2121 {
2122 	struct ceph_mds_request *req;
2123 	LIST_HEAD(tmp_list);
2124 
2125 	list_splice_init(head, &tmp_list);
2126 
2127 	while (!list_empty(&tmp_list)) {
2128 		req = list_entry(tmp_list.next,
2129 				 struct ceph_mds_request, r_wait);
2130 		list_del_init(&req->r_wait);
2131 		dout(" wake request %p tid %llu\n", req, req->r_tid);
2132 		__do_request(mdsc, req);
2133 	}
2134 }
2135 
2136 /*
2137  * Wake up threads with requests pending for @mds, so that they can
2138  * resubmit their requests to a possibly different mds.
2139  */
2140 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2141 {
2142 	struct ceph_mds_request *req;
2143 	struct rb_node *p = rb_first(&mdsc->request_tree);
2144 
2145 	dout("kick_requests mds%d\n", mds);
2146 	while (p) {
2147 		req = rb_entry(p, struct ceph_mds_request, r_node);
2148 		p = rb_next(p);
2149 		if (req->r_got_unsafe)
2150 			continue;
2151 		if (req->r_session &&
2152 		    req->r_session->s_mds == mds) {
2153 			dout(" kicking tid %llu\n", req->r_tid);
2154 			list_del_init(&req->r_wait);
2155 			__do_request(mdsc, req);
2156 		}
2157 	}
2158 }
2159 
2160 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
2161 			      struct ceph_mds_request *req)
2162 {
2163 	dout("submit_request on %p\n", req);
2164 	mutex_lock(&mdsc->mutex);
2165 	__register_request(mdsc, req, NULL);
2166 	__do_request(mdsc, req);
2167 	mutex_unlock(&mdsc->mutex);
2168 }
2169 
2170 /*
2171  * Synchronously perform an mds request.  Take care of all of the
2172  * session setup, forwarding, and retry details.
2173  */
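/*
 * Rough sketch of a typical caller (the real callers live in dir.c,
 * file.c, etc. and usually set more fields, e.g. r_num_caps):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_locked_dir = dir;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);
 */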
2174 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2175 			 struct inode *dir,
2176 			 struct ceph_mds_request *req)
2177 {
2178 	int err;
2179 
2180 	dout("do_request on %p\n", req);
2181 
2182 	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
2183 	if (req->r_inode)
2184 		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2185 	if (req->r_locked_dir)
2186 		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
2187 	if (req->r_old_dentry_dir)
2188 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2189 				  CEPH_CAP_PIN);
2190 
2191 	/* issue */
2192 	mutex_lock(&mdsc->mutex);
2193 	__register_request(mdsc, req, dir);
2194 	__do_request(mdsc, req);
2195 
2196 	if (req->r_err) {
2197 		err = req->r_err;
2198 		__unregister_request(mdsc, req);
2199 		dout("do_request early error %d\n", err);
2200 		goto out;
2201 	}
2202 
2203 	/* wait */
2204 	mutex_unlock(&mdsc->mutex);
2205 	dout("do_request waiting\n");
2206 	if (req->r_timeout) {
2207 		err = (long)wait_for_completion_killable_timeout(
2208 			&req->r_completion, req->r_timeout);
2209 		if (err == 0)
2210 			err = -EIO;
2211 	} else {
2212 		err = wait_for_completion_killable(&req->r_completion);
2213 	}
2214 	dout("do_request waited, got %d\n", err);
2215 	mutex_lock(&mdsc->mutex);
2216 
2217 	/* only abort if we didn't race with a real reply */
2218 	if (req->r_got_result) {
2219 		err = le32_to_cpu(req->r_reply_info.head->result);
2220 	} else if (err < 0) {
2221 		dout("aborted request %lld with %d\n", req->r_tid, err);
2222 
2223 		/*
2224 		 * ensure we aren't running concurrently with
2225 		 * ceph_fill_trace or ceph_readdir_prepopulate, which
2226 		 * rely on locks (dir mutex) held by our caller.
2227 		 */
2228 		mutex_lock(&req->r_fill_mutex);
2229 		req->r_err = err;
2230 		req->r_aborted = true;
2231 		mutex_unlock(&req->r_fill_mutex);
2232 
2233 		if (req->r_locked_dir &&
2234 		    (req->r_op & CEPH_MDS_OP_WRITE))
2235 			ceph_invalidate_dir_request(req);
2236 	} else {
2237 		err = req->r_err;
2238 	}
2239 
2240 out:
2241 	mutex_unlock(&mdsc->mutex);
2242 	dout("do_request %p done, result %d\n", req, err);
2243 	return err;
2244 }
2245 
2246 /*
2247  * Invalidate the dir's completeness and dentry lease state on an aborted
2248  * MDS namespace request.
2249  */
2250 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
2251 {
2252 	struct inode *inode = req->r_locked_dir;
2253 
2254 	dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
2255 
2256 	ceph_dir_clear_complete(inode);
2257 	if (req->r_dentry)
2258 		ceph_invalidate_dentry_lease(req->r_dentry);
2259 	if (req->r_old_dentry)
2260 		ceph_invalidate_dentry_lease(req->r_old_dentry);
2261 }
2262 
2263 /*
2264  * Handle mds reply.
2265  *
2266  * We take the session mutex and parse and process the reply immediately.
2267  * This preserves the logical ordering of replies, capabilities, etc., sent
2268  * by the MDS as they are applied to our local cache.
2269  */
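/*
 * The MDS may send an initial "unsafe" reply (the change is applied but not
 * yet committed durably) followed later by a "safe" reply once it is; both
 * arrive here and are matched to the request by tid.
 */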
2270 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2271 {
2272 	struct ceph_mds_client *mdsc = session->s_mdsc;
2273 	struct ceph_mds_request *req;
2274 	struct ceph_mds_reply_head *head = msg->front.iov_base;
2275 	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2276 	u64 tid;
2277 	int err, result;
2278 	int mds = session->s_mds;
2279 
2280 	if (msg->front.iov_len < sizeof(*head)) {
2281 		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2282 		ceph_msg_dump(msg);
2283 		return;
2284 	}
2285 
2286 	/* get request, session */
2287 	tid = le64_to_cpu(msg->hdr.tid);
2288 	mutex_lock(&mdsc->mutex);
2289 	req = __lookup_request(mdsc, tid);
2290 	if (!req) {
2291 		dout("handle_reply on unknown tid %llu\n", tid);
2292 		mutex_unlock(&mdsc->mutex);
2293 		return;
2294 	}
2295 	dout("handle_reply %p\n", req);
2296 
2297 	/* correct session? */
2298 	if (req->r_session != session) {
2299 		pr_err("mdsc_handle_reply got %llu on session mds%d"
2300 		       " not mds%d\n", tid, session->s_mds,
2301 		       req->r_session ? req->r_session->s_mds : -1);
2302 		mutex_unlock(&mdsc->mutex);
2303 		goto out;
2304 	}
2305 
2306 	/* dup? */
2307 	if ((req->r_got_unsafe && !head->safe) ||
2308 	    (req->r_got_safe && head->safe)) {
2309 		pr_warn("got a dup %s reply on %llu from mds%d\n",
2310 			   head->safe ? "safe" : "unsafe", tid, mds);
2311 		mutex_unlock(&mdsc->mutex);
2312 		goto out;
2313 	}
2314 	if (req->r_got_safe && !head->safe) {
2315 		pr_warn("got unsafe after safe on %llu from mds%d\n",
2316 			   tid, mds);
2317 		mutex_unlock(&mdsc->mutex);
2318 		goto out;
2319 	}
2320 
2321 	result = le32_to_cpu(head->result);
2322 
2323 	/*
2324 	 * Handle an ESTALE:
2325 	 * if we're not talking to the authority, resend to it;
2326 	 * if the authority has changed while we weren't looking,
2327 	 * resend to the new authority.
2328 	 * Otherwise we just have to return an ESTALE.
2329 	 */
2330 	if (result == -ESTALE) {
2331 		dout("got ESTALE on request %llu", req->r_tid);
2332 		req->r_resend_mds = -1;
2333 		if (req->r_direct_mode != USE_AUTH_MDS) {
2334 			dout("not using auth, setting for that now");
2335 			req->r_direct_mode = USE_AUTH_MDS;
2336 			__do_request(mdsc, req);
2337 			mutex_unlock(&mdsc->mutex);
2338 			goto out;
2339 		} else  {
2340 			int mds = __choose_mds(mdsc, req);
2341 			if (mds >= 0 && mds != req->r_session->s_mds) {
2342 				dout("but auth changed, so resending");
2343 				__do_request(mdsc, req);
2344 				mutex_unlock(&mdsc->mutex);
2345 				goto out;
2346 			}
2347 		}
2348 		dout("have to return ESTALE on request %llu", req->r_tid);
2349 	}
2350 
2351 
2352 	if (head->safe) {
2353 		req->r_got_safe = true;
2354 		__unregister_request(mdsc, req);
2355 
2356 		if (req->r_got_unsafe) {
2357 			/*
2358 			 * We already handled the unsafe response, now do the
2359 			 * cleanup.  No need to examine the response; the MDS
2360 			 * doesn't include any result info in the safe
2361 			 * response.  And even if it did, there is nothing
2362 			 * useful we could do with a revised return value.
2363 			 */
2364 			dout("got safe reply %llu, mds%d\n", tid, mds);
2365 			list_del_init(&req->r_unsafe_item);
2366 
2367 			/* last unsafe request during umount? */
2368 			if (mdsc->stopping && !__get_oldest_req(mdsc))
2369 				complete_all(&mdsc->safe_umount_waiters);
2370 			mutex_unlock(&mdsc->mutex);
2371 			goto out;
2372 		}
2373 	} else {
2374 		req->r_got_unsafe = true;
2375 		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2376 	}
2377 
2378 	dout("handle_reply tid %lld result %d\n", tid, result);
2379 	rinfo = &req->r_reply_info;
2380 	err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2381 	mutex_unlock(&mdsc->mutex);
2382 
2383 	mutex_lock(&session->s_mutex);
2384 	if (err < 0) {
2385 		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2386 		ceph_msg_dump(msg);
2387 		goto out_err;
2388 	}
2389 
2390 	/* snap trace */
2391 	if (rinfo->snapblob_len) {
2392 		down_write(&mdsc->snap_rwsem);
2393 		ceph_update_snap_trace(mdsc, rinfo->snapblob,
2394 			       rinfo->snapblob + rinfo->snapblob_len,
2395 			       le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2396 		downgrade_write(&mdsc->snap_rwsem);
2397 	} else {
2398 		down_read(&mdsc->snap_rwsem);
2399 	}
2400 
2401 	/* insert trace into our cache */
2402 	mutex_lock(&req->r_fill_mutex);
2403 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2404 	if (err == 0) {
2405 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
2406 				    req->r_op == CEPH_MDS_OP_LSSNAP))
2407 			ceph_readdir_prepopulate(req, req->r_session);
2408 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2409 	}
2410 	mutex_unlock(&req->r_fill_mutex);
2411 
2412 	up_read(&mdsc->snap_rwsem);
2413 out_err:
2414 	mutex_lock(&mdsc->mutex);
2415 	if (!req->r_aborted) {
2416 		if (err) {
2417 			req->r_err = err;
2418 		} else {
2419 			req->r_reply = msg;
2420 			ceph_msg_get(msg);
2421 			req->r_got_result = true;
2422 		}
2423 	} else {
2424 		dout("reply arrived after request %lld was aborted\n", tid);
2425 	}
2426 	mutex_unlock(&mdsc->mutex);
2427 
2428 	ceph_add_cap_releases(mdsc, req->r_session);
2429 	mutex_unlock(&session->s_mutex);
2430 
2431 	/* kick calling process */
2432 	complete_request(mdsc, req);
2433 out:
2434 	ceph_mdsc_put_request(req);
2435 	return;
2436 }
2437 
2438 
2439 
2440 /*
2441  * handle mds notification that our request has been forwarded.
2442  */
2443 static void handle_forward(struct ceph_mds_client *mdsc,
2444 			   struct ceph_mds_session *session,
2445 			   struct ceph_msg *msg)
2446 {
2447 	struct ceph_mds_request *req;
2448 	u64 tid = le64_to_cpu(msg->hdr.tid);
2449 	u32 next_mds;
2450 	u32 fwd_seq;
2451 	int err = -EINVAL;
2452 	void *p = msg->front.iov_base;
2453 	void *end = p + msg->front.iov_len;
2454 
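	/* body: u32 next_mds, u32 fwd_seq */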
2455 	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2456 	next_mds = ceph_decode_32(&p);
2457 	fwd_seq = ceph_decode_32(&p);
2458 
2459 	mutex_lock(&mdsc->mutex);
2460 	req = __lookup_request(mdsc, tid);
2461 	if (!req) {
2462 		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2463 		goto out;  /* dup reply? */
2464 	}
2465 
2466 	if (req->r_aborted) {
2467 		dout("forward tid %llu aborted, unregistering\n", tid);
2468 		__unregister_request(mdsc, req);
2469 	} else if (fwd_seq <= req->r_num_fwd) {
2470 		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2471 		     tid, next_mds, req->r_num_fwd, fwd_seq);
2472 	} else {
2473 		/* resend. forward race not possible; mds would drop */
2474 		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2475 		BUG_ON(req->r_err);
2476 		BUG_ON(req->r_got_result);
2477 		req->r_num_fwd = fwd_seq;
2478 		req->r_resend_mds = next_mds;
2479 		put_request_session(req);
2480 		__do_request(mdsc, req);
2481 	}
2482 	ceph_mdsc_put_request(req);
2483 out:
2484 	mutex_unlock(&mdsc->mutex);
2485 	return;
2486 
2487 bad:
2488 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
2489 }
2490 
2491 /*
2492  * handle a mds session control message
2493  */
2494 static void handle_session(struct ceph_mds_session *session,
2495 			   struct ceph_msg *msg)
2496 {
2497 	struct ceph_mds_client *mdsc = session->s_mdsc;
2498 	u32 op;
2499 	u64 seq;
2500 	int mds = session->s_mds;
2501 	struct ceph_mds_session_head *h = msg->front.iov_base;
2502 	int wake = 0;
2503 
2504 	/* decode */
2505 	if (msg->front.iov_len != sizeof(*h))
2506 		goto bad;
2507 	op = le32_to_cpu(h->op);
2508 	seq = le64_to_cpu(h->seq);
2509 
2510 	mutex_lock(&mdsc->mutex);
2511 	if (op == CEPH_SESSION_CLOSE)
2512 		__unregister_session(mdsc, session);
2513 	/* FIXME: this ttl calculation is generous */
2514 	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2515 	mutex_unlock(&mdsc->mutex);
2516 
2517 	mutex_lock(&session->s_mutex);
2518 
2519 	dout("handle_session mds%d %s %p state %s seq %llu\n",
2520 	     mds, ceph_session_op_name(op), session,
2521 	     ceph_session_state_name(session->s_state), seq);
2522 
2523 	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2524 		session->s_state = CEPH_MDS_SESSION_OPEN;
2525 		pr_info("mds%d came back\n", session->s_mds);
2526 	}
2527 
2528 	switch (op) {
2529 	case CEPH_SESSION_OPEN:
2530 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2531 			pr_info("mds%d reconnect success\n", session->s_mds);
2532 		session->s_state = CEPH_MDS_SESSION_OPEN;
2533 		renewed_caps(mdsc, session, 0);
2534 		wake = 1;
2535 		if (mdsc->stopping)
2536 			__close_session(mdsc, session);
2537 		break;
2538 
2539 	case CEPH_SESSION_RENEWCAPS:
2540 		if (session->s_renew_seq == seq)
2541 			renewed_caps(mdsc, session, 1);
2542 		break;
2543 
2544 	case CEPH_SESSION_CLOSE:
2545 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2546 			pr_info("mds%d reconnect denied\n", session->s_mds);
2547 		remove_session_caps(session);
2548 		wake = 2; /* for good measure */
2549 		wake_up_all(&mdsc->session_close_wq);
2550 		break;
2551 
2552 	case CEPH_SESSION_STALE:
2553 		pr_info("mds%d caps went stale, renewing\n",
2554 			session->s_mds);
2555 		spin_lock(&session->s_gen_ttl_lock);
2556 		session->s_cap_gen++;
2557 		session->s_cap_ttl = jiffies - 1;
2558 		spin_unlock(&session->s_gen_ttl_lock);
2559 		send_renew_caps(mdsc, session);
2560 		break;
2561 
2562 	case CEPH_SESSION_RECALL_STATE:
2563 		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2564 		break;
2565 
2566 	case CEPH_SESSION_FLUSHMSG:
2567 		send_flushmsg_ack(mdsc, session, seq);
2568 		break;
2569 
2570 	default:
2571 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2572 		WARN_ON(1);
2573 	}
2574 
2575 	mutex_unlock(&session->s_mutex);
2576 	if (wake) {
2577 		mutex_lock(&mdsc->mutex);
2578 		__wake_requests(mdsc, &session->s_waiting);
2579 		if (wake == 2)
2580 			kick_requests(mdsc, mds);
2581 		mutex_unlock(&mdsc->mutex);
2582 	}
2583 	return;
2584 
2585 bad:
2586 	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2587 	       (int)msg->front.iov_len);
2588 	ceph_msg_dump(msg);
2589 	return;
2590 }
2591 
2592 
2593 /*
2594  * called under session->s_mutex.
2595  */
2596 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2597 				   struct ceph_mds_session *session)
2598 {
2599 	struct ceph_mds_request *req, *nreq;
2600 	int err;
2601 
2602 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
2603 
2604 	mutex_lock(&mdsc->mutex);
2605 	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2606 		err = __prepare_send_request(mdsc, req, session->s_mds);
2607 		if (!err) {
2608 			ceph_msg_get(req->r_request);
2609 			ceph_con_send(&session->s_con, req->r_request);
2610 		}
2611 	}
2612 	mutex_unlock(&mdsc->mutex);
2613 }
2614 
2615 /*
2616  * Encode information about a cap for a reconnect with the MDS.
2617  */
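/*
 * Two record formats are used below: v2 (struct ceph_mds_cap_reconnect,
 * chosen when the peer advertises CEPH_FEATURE_FLOCK and followed by the
 * encoded file locks) and the legacy v1 (struct ceph_mds_cap_reconnect_v1).
 */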
2618 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2619 			  void *arg)
2620 {
2621 	union {
2622 		struct ceph_mds_cap_reconnect v2;
2623 		struct ceph_mds_cap_reconnect_v1 v1;
2624 	} rec;
2625 	size_t reclen;
2626 	struct ceph_inode_info *ci;
2627 	struct ceph_reconnect_state *recon_state = arg;
2628 	struct ceph_pagelist *pagelist = recon_state->pagelist;
2629 	char *path;
2630 	int pathlen, err;
2631 	u64 pathbase;
2632 	struct dentry *dentry;
2633 
2634 	ci = cap->ci;
2635 
2636 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2637 	     inode, ceph_vinop(inode), cap, cap->cap_id,
2638 	     ceph_cap_string(cap->issued));
2639 	err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2640 	if (err)
2641 		return err;
2642 
2643 	dentry = d_find_alias(inode);
2644 	if (dentry) {
2645 		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2646 		if (IS_ERR(path)) {
2647 			err = PTR_ERR(path);
2648 			goto out_dput;
2649 		}
2650 	} else {
2651 		path = NULL;
2652 		pathlen = 0;
2653 	}
2654 	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2655 	if (err)
2656 		goto out_free;
2657 
2658 	spin_lock(&ci->i_ceph_lock);
2659 	cap->seq = 0;        /* reset cap seq */
2660 	cap->issue_seq = 0;  /* and issue_seq */
2661 	cap->mseq = 0;       /* and migrate_seq */
2662 	cap->cap_gen = cap->session->s_cap_gen;
2663 
2664 	if (recon_state->flock) {
2665 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2666 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2667 		rec.v2.issued = cpu_to_le32(cap->issued);
2668 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2669 		rec.v2.pathbase = cpu_to_le64(pathbase);
2670 		rec.v2.flock_len = 0;
2671 		reclen = sizeof(rec.v2);
2672 	} else {
2673 		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2674 		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2675 		rec.v1.issued = cpu_to_le32(cap->issued);
2676 		rec.v1.size = cpu_to_le64(inode->i_size);
2677 		ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2678 		ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2679 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2680 		rec.v1.pathbase = cpu_to_le64(pathbase);
2681 		reclen = sizeof(rec.v1);
2682 	}
2683 	spin_unlock(&ci->i_ceph_lock);
2684 
2685 	if (recon_state->flock) {
2686 		int num_fcntl_locks, num_flock_locks;
2687 		struct ceph_filelock *flocks;
2688 
2689 encode_again:
2690 		spin_lock(&inode->i_lock);
2691 		ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
2692 		spin_unlock(&inode->i_lock);
2693 		flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
2694 				 sizeof(struct ceph_filelock), GFP_NOFS);
2695 		if (!flocks) {
2696 			err = -ENOMEM;
2697 			goto out_free;
2698 		}
2699 		spin_lock(&inode->i_lock);
2700 		err = ceph_encode_locks_to_buffer(inode, flocks,
2701 						  num_fcntl_locks,
2702 						  num_flock_locks);
2703 		spin_unlock(&inode->i_lock);
2704 		if (err) {
2705 			kfree(flocks);
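			/*
			 * -ENOSPC means more locks appeared after we counted
			 * them; recount and retry.
			 */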
2706 			if (err == -ENOSPC)
2707 				goto encode_again;
2708 			goto out_free;
2709 		}
2710 		/*
2711 		 * number of encoded locks is stable, so copy to pagelist
2712 		 */
2713 		rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
2714 				    (num_fcntl_locks+num_flock_locks) *
2715 				    sizeof(struct ceph_filelock));
2716 		err = ceph_pagelist_append(pagelist, &rec, reclen);
2717 		if (!err)
2718 			err = ceph_locks_to_pagelist(flocks, pagelist,
2719 						     num_fcntl_locks,
2720 						     num_flock_locks);
2721 		kfree(flocks);
2722 	} else {
2723 		err = ceph_pagelist_append(pagelist, &rec, reclen);
2724 	}
2725 
2726 	recon_state->nr_caps++;
2727 out_free:
2728 	kfree(path);
2729 out_dput:
2730 	dput(dentry);
2731 	return err;
2732 }
2733 
2734 
2735 /*
2736  * If an MDS fails and recovers, clients need to reconnect in order to
2737  * reestablish shared state.  This includes all caps issued through
2738  * this session _and_ the snap_realm hierarchy.  Because it's not
2739  * clear which snap realms the mds cares about, we send everything we
2740  * know about, which ensures we'll then get any new info the
2741  * recovering MDS might have.
2742  *
2743  * This is a relatively heavyweight operation, but it's rare.
2744  *
2745  * called with mdsc->mutex held.
2746  */
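/*
 * The reconnect payload built below is, roughly: a u32 cap count, one
 * record per cap (ino, path string, cap reconnect record, and for v2 the
 * encoded file locks), then one ceph_mds_snaprealm_reconnect per realm.
 */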
2747 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2748 			       struct ceph_mds_session *session)
2749 {
2750 	struct ceph_msg *reply;
2751 	struct rb_node *p;
2752 	int mds = session->s_mds;
2753 	int err = -ENOMEM;
2754 	int s_nr_caps;
2755 	struct ceph_pagelist *pagelist;
2756 	struct ceph_reconnect_state recon_state;
2757 
2758 	pr_info("mds%d reconnect start\n", mds);
2759 
2760 	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2761 	if (!pagelist)
2762 		goto fail_nopagelist;
2763 	ceph_pagelist_init(pagelist);
2764 
2765 	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
2766 	if (!reply)
2767 		goto fail_nomsg;
2768 
2769 	mutex_lock(&session->s_mutex);
2770 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2771 	session->s_seq = 0;
2772 
2773 	dout("session %p state %s\n", session,
2774 	     ceph_session_state_name(session->s_state));
2775 
2776 	spin_lock(&session->s_gen_ttl_lock);
2777 	session->s_cap_gen++;
2778 	spin_unlock(&session->s_gen_ttl_lock);
2779 
2780 	spin_lock(&session->s_cap_lock);
2781 	/*
2782 	 * notify __ceph_remove_cap() that we are composing cap reconnect.
2783 	 * If a cap gets released before being added to the cap reconnect,
2784 	 * __ceph_remove_cap() should skip queuing cap release.
2785 	 */
2786 	session->s_cap_reconnect = 1;
2787 	/* drop old cap expires; we're about to reestablish that state */
2788 	discard_cap_releases(mdsc, session);
2789 	spin_unlock(&session->s_cap_lock);
2790 
2791 	/* trim unused caps to reduce MDS's cache rejoin time */
2792 	shrink_dcache_parent(mdsc->fsc->sb->s_root);
2793 
2794 	ceph_con_close(&session->s_con);
2795 	ceph_con_open(&session->s_con,
2796 		      CEPH_ENTITY_TYPE_MDS, mds,
2797 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2798 
2799 	/* replay unsafe requests */
2800 	replay_unsafe_requests(mdsc, session);
2801 
2802 	down_read(&mdsc->snap_rwsem);
2803 
2804 	/* traverse this session's caps */
2805 	s_nr_caps = session->s_nr_caps;
2806 	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
2807 	if (err)
2808 		goto fail;
2809 
2810 	recon_state.nr_caps = 0;
2811 	recon_state.pagelist = pagelist;
2812 	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2813 	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2814 	if (err < 0)
2815 		goto fail;
2816 
2817 	spin_lock(&session->s_cap_lock);
2818 	session->s_cap_reconnect = 0;
2819 	spin_unlock(&session->s_cap_lock);
2820 
2821 	/*
2822 	 * snaprealms.  we provide the mds with the ino, seq (version), and
2823 	 * parent for all of our realms.  If the mds has any newer info,
2824 	 * it will tell us.
2825 	 */
2826 	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2827 		struct ceph_snap_realm *realm =
2828 			rb_entry(p, struct ceph_snap_realm, node);
2829 		struct ceph_mds_snaprealm_reconnect sr_rec;
2830 
2831 		dout(" adding snap realm %llx seq %lld parent %llx\n",
2832 		     realm->ino, realm->seq, realm->parent_ino);
2833 		sr_rec.ino = cpu_to_le64(realm->ino);
2834 		sr_rec.seq = cpu_to_le64(realm->seq);
2835 		sr_rec.parent = cpu_to_le64(realm->parent_ino);
2836 		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2837 		if (err)
2838 			goto fail;
2839 	}
2840 
2841 	if (recon_state.flock)
2842 		reply->hdr.version = cpu_to_le16(2);
2843 
2844 	/* raced with cap release?  if so, patch the count at the pagelist head */
2845 	if (s_nr_caps != recon_state.nr_caps) {
2846 		struct page *page = list_first_entry(&pagelist->head,
2847 						     struct page, lru);
2848 		__le32 *addr = kmap_atomic(page);
2849 		*addr = cpu_to_le32(recon_state.nr_caps);
2850 		kunmap_atomic(addr);
2851 	}
2852 
2853 	reply->hdr.data_len = cpu_to_le32(pagelist->length);
2854 	ceph_msg_data_add_pagelist(reply, pagelist);
2855 	ceph_con_send(&session->s_con, reply);
2856 
2857 	mutex_unlock(&session->s_mutex);
2858 
2859 	mutex_lock(&mdsc->mutex);
2860 	__wake_requests(mdsc, &session->s_waiting);
2861 	mutex_unlock(&mdsc->mutex);
2862 
2863 	up_read(&mdsc->snap_rwsem);
2864 	return;
2865 
2866 fail:
2867 	ceph_msg_put(reply);
2868 	up_read(&mdsc->snap_rwsem);
2869 	mutex_unlock(&session->s_mutex);
2870 fail_nomsg:
2871 	ceph_pagelist_release(pagelist);
2872 fail_nopagelist:
2873 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2874 	return;
2875 }
2876 
2877 
2878 /*
2879  * compare old and new mdsmaps, kicking requests
2880  * and closing out old connections as necessary
2881  *
2882  * called under mdsc->mutex.
2883  */
2884 static void check_new_map(struct ceph_mds_client *mdsc,
2885 			  struct ceph_mdsmap *newmap,
2886 			  struct ceph_mdsmap *oldmap)
2887 {
2888 	int i;
2889 	int oldstate, newstate;
2890 	struct ceph_mds_session *s;
2891 
2892 	dout("check_new_map new %u old %u\n",
2893 	     newmap->m_epoch, oldmap->m_epoch);
2894 
2895 	for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2896 		if (mdsc->sessions[i] == NULL)
2897 			continue;
2898 		s = mdsc->sessions[i];
2899 		oldstate = ceph_mdsmap_get_state(oldmap, i);
2900 		newstate = ceph_mdsmap_get_state(newmap, i);
2901 
2902 		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2903 		     i, ceph_mds_state_name(oldstate),
2904 		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2905 		     ceph_mds_state_name(newstate),
2906 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2907 		     ceph_session_state_name(s->s_state));
2908 
2909 		if (i >= newmap->m_max_mds ||
2910 		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
2911 			   ceph_mdsmap_get_addr(newmap, i),
2912 			   sizeof(struct ceph_entity_addr))) {
2913 			if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2914 				/* the session never opened, just close it
2915 				 * out now */
2916 				__wake_requests(mdsc, &s->s_waiting);
2917 				__unregister_session(mdsc, s);
2918 			} else {
2919 				/* just close it */
2920 				mutex_unlock(&mdsc->mutex);
2921 				mutex_lock(&s->s_mutex);
2922 				mutex_lock(&mdsc->mutex);
2923 				ceph_con_close(&s->s_con);
2924 				mutex_unlock(&s->s_mutex);
2925 				s->s_state = CEPH_MDS_SESSION_RESTARTING;
2926 			}
2927 
2928 			/* kick any requests waiting on the recovering mds */
2929 			kick_requests(mdsc, i);
2930 		} else if (oldstate == newstate) {
2931 			continue;  /* nothing new with this mds */
2932 		}
2933 
2934 		/*
2935 		 * send reconnect?
2936 		 */
2937 		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2938 		    newstate >= CEPH_MDS_STATE_RECONNECT) {
2939 			mutex_unlock(&mdsc->mutex);
2940 			send_mds_reconnect(mdsc, s);
2941 			mutex_lock(&mdsc->mutex);
2942 		}
2943 
2944 		/*
2945 		 * kick requests on any mds that has gone active.
2946 		 */
2947 		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2948 		    newstate >= CEPH_MDS_STATE_ACTIVE) {
2949 			if (oldstate != CEPH_MDS_STATE_CREATING &&
2950 			    oldstate != CEPH_MDS_STATE_STARTING)
2951 				pr_info("mds%d recovery completed\n", s->s_mds);
2952 			kick_requests(mdsc, i);
2953 			ceph_kick_flushing_caps(mdsc, s);
2954 			wake_up_session_caps(s, 1);
2955 		}
2956 	}
2957 
2958 	for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2959 		s = mdsc->sessions[i];
2960 		if (!s)
2961 			continue;
2962 		if (!ceph_mdsmap_is_laggy(newmap, i))
2963 			continue;
2964 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2965 		    s->s_state == CEPH_MDS_SESSION_HUNG ||
2966 		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
2967 			dout(" connecting to export targets of laggy mds%d\n",
2968 			     i);
2969 			__open_export_target_sessions(mdsc, s);
2970 		}
2971 	}
2972 }
2973 
2974 
2975 
2976 /*
2977  * leases
2978  */
2979 
2980 /*
2981  * caller must hold session s_mutex, dentry->d_lock
2982  */
2983 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2984 {
2985 	struct ceph_dentry_info *di = ceph_dentry(dentry);
2986 
2987 	ceph_put_mds_session(di->lease_session);
2988 	di->lease_session = NULL;
2989 }
2990 
2991 static void handle_lease(struct ceph_mds_client *mdsc,
2992 			 struct ceph_mds_session *session,
2993 			 struct ceph_msg *msg)
2994 {
2995 	struct super_block *sb = mdsc->fsc->sb;
2996 	struct inode *inode;
2997 	struct dentry *parent, *dentry;
2998 	struct ceph_dentry_info *di;
2999 	int mds = session->s_mds;
3000 	struct ceph_mds_lease *h = msg->front.iov_base;
3001 	u32 seq;
3002 	struct ceph_vino vino;
3003 	struct qstr dname;
3004 	int release = 0;
3005 
3006 	dout("handle_lease from mds%d\n", mds);
3007 
3008 	/* decode */
3009 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
3010 		goto bad;
3011 	vino.ino = le64_to_cpu(h->ino);
3012 	vino.snap = CEPH_NOSNAP;
3013 	seq = le32_to_cpu(h->seq);
3014 	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
3015 	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
3016 	if (dname.len != get_unaligned_le32(h+1))
3017 		goto bad;
3018 
3019 	/* lookup inode */
3020 	inode = ceph_find_inode(sb, vino);
3021 	dout("handle_lease %s, ino %llx %p %.*s\n",
3022 	     ceph_lease_op_name(h->action), vino.ino, inode,
3023 	     dname.len, dname.name);
3024 
3025 	mutex_lock(&session->s_mutex);
3026 	session->s_seq++;
3027 
3028 	if (inode == NULL) {
3029 		dout("handle_lease no inode %llx\n", vino.ino);
3030 		goto release;
3031 	}
3032 
3033 	/* dentry */
3034 	parent = d_find_alias(inode);
3035 	if (!parent) {
3036 		dout("no parent dentry on inode %p\n", inode);
3037 		WARN_ON(1);
3038 		goto release;  /* hrm... */
3039 	}
3040 	dname.hash = full_name_hash(dname.name, dname.len);
3041 	dentry = d_lookup(parent, &dname);
3042 	dput(parent);
3043 	if (!dentry)
3044 		goto release;
3045 
3046 	spin_lock(&dentry->d_lock);
3047 	di = ceph_dentry(dentry);
3048 	switch (h->action) {
3049 	case CEPH_MDS_LEASE_REVOKE:
3050 		if (di->lease_session == session) {
3051 			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
3052 				h->seq = cpu_to_le32(di->lease_seq);
3053 			__ceph_mdsc_drop_dentry_lease(dentry);
3054 		}
3055 		release = 1;
3056 		break;
3057 
3058 	case CEPH_MDS_LEASE_RENEW:
3059 		if (di->lease_session == session &&
3060 		    di->lease_gen == session->s_cap_gen &&
3061 		    di->lease_renew_from &&
3062 		    di->lease_renew_after == 0) {
3063 			unsigned long duration =
3064 				le32_to_cpu(h->duration_ms) * HZ / 1000;
3065 
3066 			di->lease_seq = seq;
3067 			dentry->d_time = di->lease_renew_from + duration;
3068 			di->lease_renew_after = di->lease_renew_from +
3069 				(duration >> 1);
3070 			di->lease_renew_from = 0;
3071 		}
3072 		break;
3073 	}
3074 	spin_unlock(&dentry->d_lock);
3075 	dput(dentry);
3076 
3077 	if (!release)
3078 		goto out;
3079 
3080 release:
3081 	/* let's just reuse the same message */
3082 	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
3083 	ceph_msg_get(msg);
3084 	ceph_con_send(&session->s_con, msg);
3085 
3086 out:
3087 	iput(inode);
3088 	mutex_unlock(&session->s_mutex);
3089 	return;
3090 
3091 bad:
3092 	pr_err("corrupt lease message\n");
3093 	ceph_msg_dump(msg);
3094 }
3095 
3096 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
3097 			      struct inode *inode,
3098 			      struct dentry *dentry, char action,
3099 			      u32 seq)
3100 {
3101 	struct ceph_msg *msg;
3102 	struct ceph_mds_lease *lease;
3103 	int len = sizeof(*lease) + sizeof(u32);
3104 	int dnamelen = 0;
3105 
3106 	dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
3107 	     inode, dentry, ceph_lease_op_name(action), session->s_mds);
3108 	dnamelen = dentry->d_name.len;
3109 	len += dnamelen;
3110 
3111 	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
3112 	if (!msg)
3113 		return;
3114 	lease = msg->front.iov_base;
3115 	lease->action = action;
3116 	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
3117 	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
3118 	lease->seq = cpu_to_le32(seq);
3119 	put_unaligned_le32(dnamelen, lease + 1);
3120 	memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
3121 
3122 	/*
3123 	 * if this is a preemptive lease RELEASE, no need to
3124 	 * flush request stream, since the actual request will
3125 	 * soon follow.
3126 	 */
3127 	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
3128 
3129 	ceph_con_send(&session->s_con, msg);
3130 }
3131 
3132 /*
3133  * Preemptively release a lease we expect to invalidate anyway.
3134  * Both @inode and @dentry are required.
3135  */
3136 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
3137 			     struct dentry *dentry)
3138 {
3139 	struct ceph_dentry_info *di;
3140 	struct ceph_mds_session *session;
3141 	u32 seq;
3142 
3143 	BUG_ON(inode == NULL);
3144 	BUG_ON(dentry == NULL);
3145 
3146 	/* is dentry lease valid? */
3147 	spin_lock(&dentry->d_lock);
3148 	di = ceph_dentry(dentry);
3149 	if (!di || !di->lease_session ||
3150 	    di->lease_session->s_mds < 0 ||
3151 	    di->lease_gen != di->lease_session->s_cap_gen ||
3152 	    !time_before(jiffies, dentry->d_time)) {
3153 		dout("lease_release inode %p dentry %p -- "
3154 		     "no lease\n",
3155 		     inode, dentry);
3156 		spin_unlock(&dentry->d_lock);
3157 		return;
3158 	}
3159 
3160 	/* we do have a lease on this dentry; note mds and seq */
3161 	session = ceph_get_mds_session(di->lease_session);
3162 	seq = di->lease_seq;
3163 	__ceph_mdsc_drop_dentry_lease(dentry);
3164 	spin_unlock(&dentry->d_lock);
3165 
3166 	dout("lease_release inode %p dentry %p to mds%d\n",
3167 	     inode, dentry, session->s_mds);
3168 	ceph_mdsc_lease_send_msg(session, inode, dentry,
3169 				 CEPH_MDS_LEASE_RELEASE, seq);
3170 	ceph_put_mds_session(session);
3171 }
3172 
3173 /*
3174  * drop all leases (and dentry refs) in preparation for umount
3175  */
3176 static void drop_leases(struct ceph_mds_client *mdsc)
3177 {
3178 	int i;
3179 
3180 	dout("drop_leases\n");
3181 	mutex_lock(&mdsc->mutex);
3182 	for (i = 0; i < mdsc->max_sessions; i++) {
3183 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3184 		if (!s)
3185 			continue;
3186 		mutex_unlock(&mdsc->mutex);
3187 		mutex_lock(&s->s_mutex);
3188 		mutex_unlock(&s->s_mutex);
3189 		ceph_put_mds_session(s);
3190 		mutex_lock(&mdsc->mutex);
3191 	}
3192 	mutex_unlock(&mdsc->mutex);
3193 }
3194 
3195 
3196 
3197 /*
3198  * delayed work -- periodically trim expired leases, renew caps with mds
3199  */
3200 static void schedule_delayed(struct ceph_mds_client *mdsc)
3201 {
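	/* queue the next run ~5 seconds out; rounding batches timer wakeups */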
3202 	int delay = 5;
3203 	unsigned hz = round_jiffies_relative(HZ * delay);
3204 	schedule_delayed_work(&mdsc->delayed_work, hz);
3205 }
3206 
3207 static void delayed_work(struct work_struct *work)
3208 {
3209 	int i;
3210 	struct ceph_mds_client *mdsc =
3211 		container_of(work, struct ceph_mds_client, delayed_work.work);
3212 	int renew_interval;
3213 	int renew_caps;
3214 
3215 	dout("mdsc delayed_work\n");
3216 	ceph_check_delayed_caps(mdsc);
3217 
3218 	mutex_lock(&mdsc->mutex);
3219 	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
3220 	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
3221 				   mdsc->last_renew_caps);
3222 	if (renew_caps)
3223 		mdsc->last_renew_caps = jiffies;
3224 
3225 	for (i = 0; i < mdsc->max_sessions; i++) {
3226 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
3227 		if (s == NULL)
3228 			continue;
3229 		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
3230 			dout("resending session close request for mds%d\n",
3231 			     s->s_mds);
3232 			request_close_session(mdsc, s);
3233 			ceph_put_mds_session(s);
3234 			continue;
3235 		}
3236 		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
3237 			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
3238 				s->s_state = CEPH_MDS_SESSION_HUNG;
3239 				pr_info("mds%d hung\n", s->s_mds);
3240 			}
3241 		}
3242 		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
3243 			/* this mds is failed or recovering, just wait */
3244 			ceph_put_mds_session(s);
3245 			continue;
3246 		}
3247 		mutex_unlock(&mdsc->mutex);
3248 
3249 		mutex_lock(&s->s_mutex);
3250 		if (renew_caps)
3251 			send_renew_caps(mdsc, s);
3252 		else
3253 			ceph_con_keepalive(&s->s_con);
3254 		ceph_add_cap_releases(mdsc, s);
3255 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
3256 		    s->s_state == CEPH_MDS_SESSION_HUNG)
3257 			ceph_send_cap_releases(mdsc, s);
3258 		mutex_unlock(&s->s_mutex);
3259 		ceph_put_mds_session(s);
3260 
3261 		mutex_lock(&mdsc->mutex);
3262 	}
3263 	mutex_unlock(&mdsc->mutex);
3264 
3265 	schedule_delayed(mdsc);
3266 }
3267 
3268 int ceph_mdsc_init(struct ceph_fs_client *fsc)
3269 
3270 {
3271 	struct ceph_mds_client *mdsc;
3272 
3273 	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
3274 	if (!mdsc)
3275 		return -ENOMEM;
3276 	mdsc->fsc = fsc;
3277 	fsc->mdsc = mdsc;
3278 	mutex_init(&mdsc->mutex);
3279 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
3280 	if (mdsc->mdsmap == NULL) {
3281 		kfree(mdsc);
3282 		return -ENOMEM;
3283 	}
3284 
3285 	init_completion(&mdsc->safe_umount_waiters);
3286 	init_waitqueue_head(&mdsc->session_close_wq);
3287 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
3288 	mdsc->sessions = NULL;
3289 	mdsc->max_sessions = 0;
3290 	mdsc->stopping = 0;
3291 	init_rwsem(&mdsc->snap_rwsem);
3292 	mdsc->snap_realms = RB_ROOT;
3293 	INIT_LIST_HEAD(&mdsc->snap_empty);
3294 	spin_lock_init(&mdsc->snap_empty_lock);
3295 	mdsc->last_tid = 0;
3296 	mdsc->request_tree = RB_ROOT;
3297 	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
3298 	mdsc->last_renew_caps = jiffies;
3299 	INIT_LIST_HEAD(&mdsc->cap_delay_list);
3300 	spin_lock_init(&mdsc->cap_delay_lock);
3301 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
3302 	spin_lock_init(&mdsc->snap_flush_lock);
3303 	mdsc->cap_flush_seq = 0;
3304 	INIT_LIST_HEAD(&mdsc->cap_dirty);
3305 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
3306 	mdsc->num_cap_flushing = 0;
3307 	spin_lock_init(&mdsc->cap_dirty_lock);
3308 	init_waitqueue_head(&mdsc->cap_flushing_wq);
3309 	spin_lock_init(&mdsc->dentry_lru_lock);
3310 	INIT_LIST_HEAD(&mdsc->dentry_lru);
3311 
3312 	ceph_caps_init(mdsc);
3313 	ceph_adjust_min_caps(mdsc, fsc->min_caps);
3314 
3315 	return 0;
3316 }
3317 
3318 /*
3319  * Wait for safe replies on open mds requests.  If we time out, drop
3320  * all requests from the tree to avoid dangling dentry refs.
3321  */
3322 static void wait_requests(struct ceph_mds_client *mdsc)
3323 {
3324 	struct ceph_mds_request *req;
3325 	struct ceph_fs_client *fsc = mdsc->fsc;
3326 
3327 	mutex_lock(&mdsc->mutex);
3328 	if (__get_oldest_req(mdsc)) {
3329 		mutex_unlock(&mdsc->mutex);
3330 
3331 		dout("wait_requests waiting for requests\n");
3332 		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3333 				    fsc->client->options->mount_timeout * HZ);
3334 
3335 		/* tear down remaining requests */
3336 		mutex_lock(&mdsc->mutex);
3337 		while ((req = __get_oldest_req(mdsc))) {
3338 			dout("wait_requests timed out on tid %llu\n",
3339 			     req->r_tid);
3340 			__unregister_request(mdsc, req);
3341 		}
3342 	}
3343 	mutex_unlock(&mdsc->mutex);
3344 	dout("wait_requests done\n");
3345 }
3346 
3347 /*
3348  * called before mount is ro, and before dentries are torn down.
3349  * (hmm, does this still race with new lookups?)
3350  */
3351 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3352 {
3353 	dout("pre_umount\n");
3354 	mdsc->stopping = 1;
3355 
3356 	drop_leases(mdsc);
3357 	ceph_flush_dirty_caps(mdsc);
3358 	wait_requests(mdsc);
3359 
3360 	/*
3361 	 * wait for reply handlers to drop their request refs and
3362 	 * their inode/dcache refs
3363 	 */
3364 	ceph_msgr_flush();
3365 }
3366 
3367 /*
3368  * wait for all write mds requests to flush.
3369  */
3370 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3371 {
3372 	struct ceph_mds_request *req = NULL, *nextreq;
3373 	struct rb_node *n;
3374 
3375 	mutex_lock(&mdsc->mutex);
3376 	dout("wait_unsafe_requests want %lld\n", want_tid);
3377 restart:
3378 	req = __get_oldest_req(mdsc);
3379 	while (req && req->r_tid <= want_tid) {
3380 		/* find next request */
3381 		n = rb_next(&req->r_node);
3382 		if (n)
3383 			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3384 		else
3385 			nextreq = NULL;
3386 		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3387 			/* write op */
3388 			ceph_mdsc_get_request(req);
3389 			if (nextreq)
3390 				ceph_mdsc_get_request(nextreq);
3391 			mutex_unlock(&mdsc->mutex);
3392 			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3393 			     req->r_tid, want_tid);
3394 			wait_for_completion(&req->r_safe_completion);
3395 			mutex_lock(&mdsc->mutex);
3396 			ceph_mdsc_put_request(req);
3397 			if (!nextreq)
3398 				break;  /* next dne before, so we're done! */
3399 			if (RB_EMPTY_NODE(&nextreq->r_node)) {
3400 				/* next request was removed from tree */
3401 				ceph_mdsc_put_request(nextreq);
3402 				goto restart;
3403 			}
3404 			ceph_mdsc_put_request(nextreq);  /* won't go away */
3405 		}
3406 		req = nextreq;
3407 	}
3408 	mutex_unlock(&mdsc->mutex);
3409 	dout("wait_unsafe_requests done\n");
3410 }
3411 
3412 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3413 {
3414 	u64 want_tid, want_flush;
3415 
3416 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3417 		return;
3418 
3419 	dout("sync\n");
3420 	mutex_lock(&mdsc->mutex);
3421 	want_tid = mdsc->last_tid;
3422 	want_flush = mdsc->cap_flush_seq;
3423 	mutex_unlock(&mdsc->mutex);
3424 	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3425 
3426 	ceph_flush_dirty_caps(mdsc);
3427 
3428 	wait_unsafe_requests(mdsc, want_tid);
3429 	wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3430 }
3431 
3432 /*
3433  * true if all sessions are closed, or we force unmount
3434  */
3435 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
3436 {
3437 	int i, n = 0;
3438 
3439 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3440 		return true;
3441 
3442 	mutex_lock(&mdsc->mutex);
3443 	for (i = 0; i < mdsc->max_sessions; i++)
3444 		if (mdsc->sessions[i])
3445 			n++;
3446 	mutex_unlock(&mdsc->mutex);
3447 	return n == 0;
3448 }
3449 
3450 /*
3451  * called after sb is ro.
3452  */
3453 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3454 {
3455 	struct ceph_mds_session *session;
3456 	int i;
3457 	struct ceph_fs_client *fsc = mdsc->fsc;
3458 	unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3459 
3460 	dout("close_sessions\n");
3461 
3462 	/* close sessions */
3463 	mutex_lock(&mdsc->mutex);
3464 	for (i = 0; i < mdsc->max_sessions; i++) {
3465 		session = __ceph_lookup_mds_session(mdsc, i);
3466 		if (!session)
3467 			continue;
3468 		mutex_unlock(&mdsc->mutex);
3469 		mutex_lock(&session->s_mutex);
3470 		__close_session(mdsc, session);
3471 		mutex_unlock(&session->s_mutex);
3472 		ceph_put_mds_session(session);
3473 		mutex_lock(&mdsc->mutex);
3474 	}
3475 	mutex_unlock(&mdsc->mutex);
3476 
3477 	dout("waiting for sessions to close\n");
3478 	wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3479 			   timeout);
3480 
3481 	/* tear down remaining sessions */
3482 	mutex_lock(&mdsc->mutex);
3483 	for (i = 0; i < mdsc->max_sessions; i++) {
3484 		if (mdsc->sessions[i]) {
3485 			session = get_session(mdsc->sessions[i]);
3486 			__unregister_session(mdsc, session);
3487 			mutex_unlock(&mdsc->mutex);
3488 			mutex_lock(&session->s_mutex);
3489 			remove_session_caps(session);
3490 			mutex_unlock(&session->s_mutex);
3491 			ceph_put_mds_session(session);
3492 			mutex_lock(&mdsc->mutex);
3493 		}
3494 	}
3495 	WARN_ON(!list_empty(&mdsc->cap_delay_list));
3496 	mutex_unlock(&mdsc->mutex);
3497 
3498 	ceph_cleanup_empty_realms(mdsc);
3499 
3500 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3501 
3502 	dout("stopped\n");
3503 }
3504 
3505 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3506 {
3507 	dout("stop\n");
3508 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3509 	if (mdsc->mdsmap)
3510 		ceph_mdsmap_destroy(mdsc->mdsmap);
3511 	kfree(mdsc->sessions);
3512 	ceph_caps_finalize(mdsc);
3513 }
3514 
3515 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3516 {
3517 	struct ceph_mds_client *mdsc = fsc->mdsc;
3518 
3519 	dout("mdsc_destroy %p\n", mdsc);
3520 	ceph_mdsc_stop(mdsc);
3521 
3522 	/* flush out any connection work with references to us */
3523 	ceph_msgr_flush();
3524 
3525 	fsc->mdsc = NULL;
3526 	kfree(mdsc);
3527 	dout("mdsc_destroy %p done\n", mdsc);
3528 }
3529 
3530 
3531 /*
3532  * handle mds map update.
3533  */
3534 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3535 {
3536 	u32 epoch;
3537 	u32 maplen;
3538 	void *p = msg->front.iov_base;
3539 	void *end = p + msg->front.iov_len;
3540 	struct ceph_mdsmap *newmap, *oldmap;
3541 	struct ceph_fsid fsid;
3542 	int err = -EINVAL;
3543 
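	/* message: fsid, map epoch (u32), map length (u32), map payload */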
3544 	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3545 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
3546 	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3547 		return;
3548 	epoch = ceph_decode_32(&p);
3549 	maplen = ceph_decode_32(&p);
3550 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3551 
3552 	/* do we need it? */
3553 	ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3554 	mutex_lock(&mdsc->mutex);
3555 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3556 		dout("handle_map epoch %u <= our %u\n",
3557 		     epoch, mdsc->mdsmap->m_epoch);
3558 		mutex_unlock(&mdsc->mutex);
3559 		return;
3560 	}
3561 
3562 	newmap = ceph_mdsmap_decode(&p, end);
3563 	if (IS_ERR(newmap)) {
3564 		err = PTR_ERR(newmap);
3565 		goto bad_unlock;
3566 	}
3567 
3568 	/* swap into place */
3569 	if (mdsc->mdsmap) {
3570 		oldmap = mdsc->mdsmap;
3571 		mdsc->mdsmap = newmap;
3572 		check_new_map(mdsc, newmap, oldmap);
3573 		ceph_mdsmap_destroy(oldmap);
3574 	} else {
3575 		mdsc->mdsmap = newmap;  /* first mds map */
3576 	}
3577 	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3578 
3579 	__wake_requests(mdsc, &mdsc->waiting_for_map);
3580 
3581 	mutex_unlock(&mdsc->mutex);
3582 	schedule_delayed(mdsc);
3583 	return;
3584 
3585 bad_unlock:
3586 	mutex_unlock(&mdsc->mutex);
3587 bad:
3588 	pr_err("error decoding mdsmap %d\n", err);
3589 	return;
3590 }
3591 
3592 static struct ceph_connection *con_get(struct ceph_connection *con)
3593 {
3594 	struct ceph_mds_session *s = con->private;
3595 
3596 	if (get_session(s)) {
3597 		dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3598 		return con;
3599 	}
3600 	dout("mdsc con_get %p FAIL\n", s);
3601 	return NULL;
3602 }
3603 
3604 static void con_put(struct ceph_connection *con)
3605 {
3606 	struct ceph_mds_session *s = con->private;
3607 
3608 	dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1);
3609 	ceph_put_mds_session(s);
3610 }
3611 
3612 /*
3613  * if the client is unresponsive for long enough, the mds will kill
3614  * the session entirely.
3615  */
3616 static void peer_reset(struct ceph_connection *con)
3617 {
3618 	struct ceph_mds_session *s = con->private;
3619 	struct ceph_mds_client *mdsc = s->s_mdsc;
3620 
3621 	pr_warn("mds%d closed our session\n", s->s_mds);
3622 	send_mds_reconnect(mdsc, s);
3623 }
3624 
3625 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3626 {
3627 	struct ceph_mds_session *s = con->private;
3628 	struct ceph_mds_client *mdsc = s->s_mdsc;
3629 	int type = le16_to_cpu(msg->hdr.type);
3630 
3631 	mutex_lock(&mdsc->mutex);
3632 	if (__verify_registered_session(mdsc, s) < 0) {
3633 		mutex_unlock(&mdsc->mutex);
3634 		goto out;
3635 	}
3636 	mutex_unlock(&mdsc->mutex);
3637 
3638 	switch (type) {
3639 	case CEPH_MSG_MDS_MAP:
3640 		ceph_mdsc_handle_map(mdsc, msg);
3641 		break;
3642 	case CEPH_MSG_CLIENT_SESSION:
3643 		handle_session(s, msg);
3644 		break;
3645 	case CEPH_MSG_CLIENT_REPLY:
3646 		handle_reply(s, msg);
3647 		break;
3648 	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3649 		handle_forward(mdsc, s, msg);
3650 		break;
3651 	case CEPH_MSG_CLIENT_CAPS:
3652 		ceph_handle_caps(s, msg);
3653 		break;
3654 	case CEPH_MSG_CLIENT_SNAP:
3655 		ceph_handle_snap(mdsc, s, msg);
3656 		break;
3657 	case CEPH_MSG_CLIENT_LEASE:
3658 		handle_lease(mdsc, s, msg);
3659 		break;
3660 
3661 	default:
3662 		pr_err("received unknown message type %d %s\n", type,
3663 		       ceph_msg_type_name(type));
3664 	}
3665 out:
3666 	ceph_msg_put(msg);
3667 }
3668 
3669 /*
3670  * authentication
3671  */
3672 
3673 /*
3674  * Note: returned pointer is the address of a structure that's
3675  * managed separately.  Caller must *not* attempt to free it.
3676  */
3677 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
3678 					int *proto, int force_new)
3679 {
3680 	struct ceph_mds_session *s = con->private;
3681 	struct ceph_mds_client *mdsc = s->s_mdsc;
3682 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3683 	struct ceph_auth_handshake *auth = &s->s_auth;
3684 
3685 	if (force_new && auth->authorizer) {
3686 		ceph_auth_destroy_authorizer(ac, auth->authorizer);
3687 		auth->authorizer = NULL;
3688 	}
3689 	if (!auth->authorizer) {
3690 		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3691 						      auth);
3692 		if (ret)
3693 			return ERR_PTR(ret);
3694 	} else {
3695 		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
3696 						      auth);
3697 		if (ret)
3698 			return ERR_PTR(ret);
3699 	}
3700 	*proto = ac->protocol;
3701 
3702 	return auth;
3703 }
3704 
3705 
3706 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3707 {
3708 	struct ceph_mds_session *s = con->private;
3709 	struct ceph_mds_client *mdsc = s->s_mdsc;
3710 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3711 
3712 	return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len);
3713 }
3714 
3715 static int invalidate_authorizer(struct ceph_connection *con)
3716 {
3717 	struct ceph_mds_session *s = con->private;
3718 	struct ceph_mds_client *mdsc = s->s_mdsc;
3719 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3720 
3721 	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3722 
3723 	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3724 }
3725 
3726 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
3727 				struct ceph_msg_header *hdr, int *skip)
3728 {
3729 	struct ceph_msg *msg;
3730 	int type = (int) le16_to_cpu(hdr->type);
3731 	int front_len = (int) le32_to_cpu(hdr->front_len);
3732 
3733 	if (con->in_msg)
3734 		return con->in_msg;
3735 
3736 	*skip = 0;
3737 	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
3738 	if (!msg) {
3739 		pr_err("unable to allocate msg type %d len %d\n",
3740 		       type, front_len);
3741 		return NULL;
3742 	}
3743 
3744 	return msg;
3745 }
3746 
3747 static const struct ceph_connection_operations mds_con_ops = {
3748 	.get = con_get,
3749 	.put = con_put,
3750 	.dispatch = dispatch,
3751 	.get_authorizer = get_authorizer,
3752 	.verify_authorizer_reply = verify_authorizer_reply,
3753 	.invalidate_authorizer = invalidate_authorizer,
3754 	.peer_reset = peer_reset,
3755 	.alloc_msg = mds_alloc_msg,
3756 };
3757 
3758 /* eof */
3759