xref: /linux/net/ceph/osd_client.c (revision acafe7e30216166a17e6e226aadc3ecb63993242)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <linux/ceph/ceph_debug.h>
4 
5 #include <linux/module.h>
6 #include <linux/err.h>
7 #include <linux/highmem.h>
8 #include <linux/mm.h>
9 #include <linux/pagemap.h>
10 #include <linux/slab.h>
11 #include <linux/uaccess.h>
12 #ifdef CONFIG_BLOCK
13 #include <linux/bio.h>
14 #endif
15 
16 #include <linux/ceph/ceph_features.h>
17 #include <linux/ceph/libceph.h>
18 #include <linux/ceph/osd_client.h>
19 #include <linux/ceph/messenger.h>
20 #include <linux/ceph/decode.h>
21 #include <linux/ceph/auth.h>
22 #include <linux/ceph/pagelist.h>
23 #include <linux/ceph/striper.h>
24 
25 #define OSD_OPREPLY_FRONT_LEN	512
26 
27 static struct kmem_cache	*ceph_osd_request_cache;
28 
29 static const struct ceph_connection_operations osd_con_ops;
30 
31 /*
32  * Implement client access to distributed object storage cluster.
33  *
34  * All data objects are stored within a cluster/cloud of OSDs, or
35  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
36  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
37  * remote daemons serving up and coordinating consistent and safe
38  * access to storage.
39  *
40  * Cluster membership and the mapping of data objects onto storage devices
41  * are described by the osd map.
42  *
43  * We keep track of pending OSD requests (read, write), resubmit
44  * requests to different OSDs when the cluster topology/data layout
45  * change, or retry the affected requests when the communications
46  * channel with an OSD is reset.
47  */
48 
49 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
50 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
51 static void link_linger(struct ceph_osd *osd,
52 			struct ceph_osd_linger_request *lreq);
53 static void unlink_linger(struct ceph_osd *osd,
54 			  struct ceph_osd_linger_request *lreq);
55 static void clear_backoffs(struct ceph_osd *osd);
56 
57 #if 1
58 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
59 {
60 	bool wrlocked = true;
61 
62 	if (unlikely(down_read_trylock(sem))) {
63 		wrlocked = false;
64 		up_read(sem);
65 	}
66 
67 	return wrlocked;
68 }
69 static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
70 {
71 	WARN_ON(!rwsem_is_locked(&osdc->lock));
72 }
73 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
74 {
75 	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
76 }
77 static inline void verify_osd_locked(struct ceph_osd *osd)
78 {
79 	struct ceph_osd_client *osdc = osd->o_osdc;
80 
81 	WARN_ON(!(mutex_is_locked(&osd->lock) &&
82 		  rwsem_is_locked(&osdc->lock)) &&
83 		!rwsem_is_wrlocked(&osdc->lock));
84 }
85 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
86 {
87 	WARN_ON(!mutex_is_locked(&lreq->lock));
88 }
89 #else
90 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
91 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
92 static inline void verify_osd_locked(struct ceph_osd *osd) { }
93 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
94 #endif
95 
96 /*
97  * calculate the mapping of a file extent onto an object, and fill out the
98  * request accordingly.  shorten extent as necessary if it crosses an
99  * object boundary.
100  *
101  * fill osd op in request message.
102  */
103 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
104 			u64 *objnum, u64 *objoff, u64 *objlen)
105 {
106 	u64 orig_len = *plen;
107 	u32 xlen;
108 
109 	/* object extent? */
110 	ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
111 					  objoff, &xlen);
112 	*objlen = xlen;
113 	if (*objlen < orig_len) {
114 		*plen = *objlen;
115 		dout(" skipping last %llu, final file extent %llu~%llu\n",
116 		     orig_len - *plen, off, *plen);
117 	}
118 
119 	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
120 	return 0;
121 }
122 
123 static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
124 {
125 	memset(osd_data, 0, sizeof (*osd_data));
126 	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127 }
128 
129 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
130 			struct page **pages, u64 length, u32 alignment,
131 			bool pages_from_pool, bool own_pages)
132 {
133 	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
134 	osd_data->pages = pages;
135 	osd_data->length = length;
136 	osd_data->alignment = alignment;
137 	osd_data->pages_from_pool = pages_from_pool;
138 	osd_data->own_pages = own_pages;
139 }
140 
141 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
142 			struct ceph_pagelist *pagelist)
143 {
144 	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
145 	osd_data->pagelist = pagelist;
146 }
147 
148 #ifdef CONFIG_BLOCK
149 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
150 				   struct ceph_bio_iter *bio_pos,
151 				   u32 bio_length)
152 {
153 	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
154 	osd_data->bio_pos = *bio_pos;
155 	osd_data->bio_length = bio_length;
156 }
157 #endif /* CONFIG_BLOCK */
158 
159 static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
160 				     struct ceph_bvec_iter *bvec_pos)
161 {
162 	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
163 	osd_data->bvec_pos = *bvec_pos;
164 }
165 
166 #define osd_req_op_data(oreq, whch, typ, fld)				\
167 ({									\
168 	struct ceph_osd_request *__oreq = (oreq);			\
169 	unsigned int __whch = (whch);					\
170 	BUG_ON(__whch >= __oreq->r_num_ops);				\
171 	&__oreq->r_ops[__whch].typ.fld;					\
172 })
173 
174 static struct ceph_osd_data *
175 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
176 {
177 	BUG_ON(which >= osd_req->r_num_ops);
178 
179 	return &osd_req->r_ops[which].raw_data_in;
180 }
181 
182 struct ceph_osd_data *
183 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
184 			unsigned int which)
185 {
186 	return osd_req_op_data(osd_req, which, extent, osd_data);
187 }
188 EXPORT_SYMBOL(osd_req_op_extent_osd_data);
189 
190 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
191 			unsigned int which, struct page **pages,
192 			u64 length, u32 alignment,
193 			bool pages_from_pool, bool own_pages)
194 {
195 	struct ceph_osd_data *osd_data;
196 
197 	osd_data = osd_req_op_raw_data_in(osd_req, which);
198 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
199 				pages_from_pool, own_pages);
200 }
201 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
202 
203 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
204 			unsigned int which, struct page **pages,
205 			u64 length, u32 alignment,
206 			bool pages_from_pool, bool own_pages)
207 {
208 	struct ceph_osd_data *osd_data;
209 
210 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
211 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
212 				pages_from_pool, own_pages);
213 }
214 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
215 
216 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
217 			unsigned int which, struct ceph_pagelist *pagelist)
218 {
219 	struct ceph_osd_data *osd_data;
220 
221 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
222 	ceph_osd_data_pagelist_init(osd_data, pagelist);
223 }
224 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
225 
226 #ifdef CONFIG_BLOCK
227 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
228 				    unsigned int which,
229 				    struct ceph_bio_iter *bio_pos,
230 				    u32 bio_length)
231 {
232 	struct ceph_osd_data *osd_data;
233 
234 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
235 	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
236 }
237 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
238 #endif /* CONFIG_BLOCK */
239 
240 void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
241 					 unsigned int which,
242 					 struct ceph_bvec_iter *bvec_pos)
243 {
244 	struct ceph_osd_data *osd_data;
245 
246 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
247 	ceph_osd_data_bvecs_init(osd_data, bvec_pos);
248 }
249 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
250 
251 static void osd_req_op_cls_request_info_pagelist(
252 			struct ceph_osd_request *osd_req,
253 			unsigned int which, struct ceph_pagelist *pagelist)
254 {
255 	struct ceph_osd_data *osd_data;
256 
257 	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
258 	ceph_osd_data_pagelist_init(osd_data, pagelist);
259 }
260 
261 void osd_req_op_cls_request_data_pagelist(
262 			struct ceph_osd_request *osd_req,
263 			unsigned int which, struct ceph_pagelist *pagelist)
264 {
265 	struct ceph_osd_data *osd_data;
266 
267 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
268 	ceph_osd_data_pagelist_init(osd_data, pagelist);
269 	osd_req->r_ops[which].cls.indata_len += pagelist->length;
270 	osd_req->r_ops[which].indata_len += pagelist->length;
271 }
272 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
273 
274 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
275 			unsigned int which, struct page **pages, u64 length,
276 			u32 alignment, bool pages_from_pool, bool own_pages)
277 {
278 	struct ceph_osd_data *osd_data;
279 
280 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
281 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
282 				pages_from_pool, own_pages);
283 	osd_req->r_ops[which].cls.indata_len += length;
284 	osd_req->r_ops[which].indata_len += length;
285 }
286 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
287 
288 void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
289 				       unsigned int which,
290 				       struct bio_vec *bvecs, u32 bytes)
291 {
292 	struct ceph_osd_data *osd_data;
293 	struct ceph_bvec_iter it = {
294 		.bvecs = bvecs,
295 		.iter = { .bi_size = bytes },
296 	};
297 
298 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
299 	ceph_osd_data_bvecs_init(osd_data, &it);
300 	osd_req->r_ops[which].cls.indata_len += bytes;
301 	osd_req->r_ops[which].indata_len += bytes;
302 }
303 EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
304 
305 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
306 			unsigned int which, struct page **pages, u64 length,
307 			u32 alignment, bool pages_from_pool, bool own_pages)
308 {
309 	struct ceph_osd_data *osd_data;
310 
311 	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
312 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
313 				pages_from_pool, own_pages);
314 }
315 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
316 
317 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
318 {
319 	switch (osd_data->type) {
320 	case CEPH_OSD_DATA_TYPE_NONE:
321 		return 0;
322 	case CEPH_OSD_DATA_TYPE_PAGES:
323 		return osd_data->length;
324 	case CEPH_OSD_DATA_TYPE_PAGELIST:
325 		return (u64)osd_data->pagelist->length;
326 #ifdef CONFIG_BLOCK
327 	case CEPH_OSD_DATA_TYPE_BIO:
328 		return (u64)osd_data->bio_length;
329 #endif /* CONFIG_BLOCK */
330 	case CEPH_OSD_DATA_TYPE_BVECS:
331 		return osd_data->bvec_pos.iter.bi_size;
332 	default:
333 		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
334 		return 0;
335 	}
336 }
337 
338 static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
339 {
340 	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
341 		int num_pages;
342 
343 		num_pages = calc_pages_for((u64)osd_data->alignment,
344 						(u64)osd_data->length);
345 		ceph_release_page_vector(osd_data->pages, num_pages);
346 	}
347 	ceph_osd_data_init(osd_data);
348 }
349 
350 static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
351 			unsigned int which)
352 {
353 	struct ceph_osd_req_op *op;
354 
355 	BUG_ON(which >= osd_req->r_num_ops);
356 	op = &osd_req->r_ops[which];
357 
358 	switch (op->op) {
359 	case CEPH_OSD_OP_READ:
360 	case CEPH_OSD_OP_WRITE:
361 	case CEPH_OSD_OP_WRITEFULL:
362 		ceph_osd_data_release(&op->extent.osd_data);
363 		break;
364 	case CEPH_OSD_OP_CALL:
365 		ceph_osd_data_release(&op->cls.request_info);
366 		ceph_osd_data_release(&op->cls.request_data);
367 		ceph_osd_data_release(&op->cls.response_data);
368 		break;
369 	case CEPH_OSD_OP_SETXATTR:
370 	case CEPH_OSD_OP_CMPXATTR:
371 		ceph_osd_data_release(&op->xattr.osd_data);
372 		break;
373 	case CEPH_OSD_OP_STAT:
374 		ceph_osd_data_release(&op->raw_data_in);
375 		break;
376 	case CEPH_OSD_OP_NOTIFY_ACK:
377 		ceph_osd_data_release(&op->notify_ack.request_data);
378 		break;
379 	case CEPH_OSD_OP_NOTIFY:
380 		ceph_osd_data_release(&op->notify.request_data);
381 		ceph_osd_data_release(&op->notify.response_data);
382 		break;
383 	case CEPH_OSD_OP_LIST_WATCHERS:
384 		ceph_osd_data_release(&op->list_watchers.response_data);
385 		break;
386 	default:
387 		break;
388 	}
389 }
390 
391 /*
392  * Assumes @t is zero-initialized.
393  */
394 static void target_init(struct ceph_osd_request_target *t)
395 {
396 	ceph_oid_init(&t->base_oid);
397 	ceph_oloc_init(&t->base_oloc);
398 	ceph_oid_init(&t->target_oid);
399 	ceph_oloc_init(&t->target_oloc);
400 
401 	ceph_osds_init(&t->acting);
402 	ceph_osds_init(&t->up);
403 	t->size = -1;
404 	t->min_size = -1;
405 
406 	t->osd = CEPH_HOMELESS_OSD;
407 }
408 
409 static void target_copy(struct ceph_osd_request_target *dest,
410 			const struct ceph_osd_request_target *src)
411 {
412 	ceph_oid_copy(&dest->base_oid, &src->base_oid);
413 	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
414 	ceph_oid_copy(&dest->target_oid, &src->target_oid);
415 	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
416 
417 	dest->pgid = src->pgid; /* struct */
418 	dest->spgid = src->spgid; /* struct */
419 	dest->pg_num = src->pg_num;
420 	dest->pg_num_mask = src->pg_num_mask;
421 	ceph_osds_copy(&dest->acting, &src->acting);
422 	ceph_osds_copy(&dest->up, &src->up);
423 	dest->size = src->size;
424 	dest->min_size = src->min_size;
425 	dest->sort_bitwise = src->sort_bitwise;
426 
427 	dest->flags = src->flags;
428 	dest->paused = src->paused;
429 
430 	dest->epoch = src->epoch;
431 	dest->last_force_resend = src->last_force_resend;
432 
433 	dest->osd = src->osd;
434 }
435 
436 static void target_destroy(struct ceph_osd_request_target *t)
437 {
438 	ceph_oid_destroy(&t->base_oid);
439 	ceph_oloc_destroy(&t->base_oloc);
440 	ceph_oid_destroy(&t->target_oid);
441 	ceph_oloc_destroy(&t->target_oloc);
442 }
443 
444 /*
445  * requests
446  */
447 static void request_release_checks(struct ceph_osd_request *req)
448 {
449 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
450 	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
451 	WARN_ON(!list_empty(&req->r_unsafe_item));
452 	WARN_ON(req->r_osd);
453 }
454 
455 static void ceph_osdc_release_request(struct kref *kref)
456 {
457 	struct ceph_osd_request *req = container_of(kref,
458 					    struct ceph_osd_request, r_kref);
459 	unsigned int which;
460 
461 	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
462 	     req->r_request, req->r_reply);
463 	request_release_checks(req);
464 
465 	if (req->r_request)
466 		ceph_msg_put(req->r_request);
467 	if (req->r_reply)
468 		ceph_msg_put(req->r_reply);
469 
470 	for (which = 0; which < req->r_num_ops; which++)
471 		osd_req_op_data_release(req, which);
472 
473 	target_destroy(&req->r_t);
474 	ceph_put_snap_context(req->r_snapc);
475 
476 	if (req->r_mempool)
477 		mempool_free(req, req->r_osdc->req_mempool);
478 	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
479 		kmem_cache_free(ceph_osd_request_cache, req);
480 	else
481 		kfree(req);
482 }
483 
484 void ceph_osdc_get_request(struct ceph_osd_request *req)
485 {
486 	dout("%s %p (was %d)\n", __func__, req,
487 	     kref_read(&req->r_kref));
488 	kref_get(&req->r_kref);
489 }
490 EXPORT_SYMBOL(ceph_osdc_get_request);
491 
492 void ceph_osdc_put_request(struct ceph_osd_request *req)
493 {
494 	if (req) {
495 		dout("%s %p (was %d)\n", __func__, req,
496 		     kref_read(&req->r_kref));
497 		kref_put(&req->r_kref, ceph_osdc_release_request);
498 	}
499 }
500 EXPORT_SYMBOL(ceph_osdc_put_request);
501 
502 static void request_init(struct ceph_osd_request *req)
503 {
504 	/* req only, each op is zeroed in _osd_req_op_init() */
505 	memset(req, 0, sizeof(*req));
506 
507 	kref_init(&req->r_kref);
508 	init_completion(&req->r_completion);
509 	RB_CLEAR_NODE(&req->r_node);
510 	RB_CLEAR_NODE(&req->r_mc_node);
511 	INIT_LIST_HEAD(&req->r_unsafe_item);
512 
513 	target_init(&req->r_t);
514 }
515 
516 /*
517  * This is ugly, but it allows us to reuse linger registration and ping
518  * requests, keeping the structure of the code around send_linger{_ping}()
519  * reasonable.  Setting up a min_nr=2 mempool for each linger request
520  * and dealing with copying ops (this blasts req only, watch op remains
521  * intact) isn't any better.
522  */
523 static void request_reinit(struct ceph_osd_request *req)
524 {
525 	struct ceph_osd_client *osdc = req->r_osdc;
526 	bool mempool = req->r_mempool;
527 	unsigned int num_ops = req->r_num_ops;
528 	u64 snapid = req->r_snapid;
529 	struct ceph_snap_context *snapc = req->r_snapc;
530 	bool linger = req->r_linger;
531 	struct ceph_msg *request_msg = req->r_request;
532 	struct ceph_msg *reply_msg = req->r_reply;
533 
534 	dout("%s req %p\n", __func__, req);
535 	WARN_ON(kref_read(&req->r_kref) != 1);
536 	request_release_checks(req);
537 
538 	WARN_ON(kref_read(&request_msg->kref) != 1);
539 	WARN_ON(kref_read(&reply_msg->kref) != 1);
540 	target_destroy(&req->r_t);
541 
542 	request_init(req);
543 	req->r_osdc = osdc;
544 	req->r_mempool = mempool;
545 	req->r_num_ops = num_ops;
546 	req->r_snapid = snapid;
547 	req->r_snapc = snapc;
548 	req->r_linger = linger;
549 	req->r_request = request_msg;
550 	req->r_reply = reply_msg;
551 }
552 
553 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
554 					       struct ceph_snap_context *snapc,
555 					       unsigned int num_ops,
556 					       bool use_mempool,
557 					       gfp_t gfp_flags)
558 {
559 	struct ceph_osd_request *req;
560 
561 	if (use_mempool) {
562 		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
563 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
564 	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
565 		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
566 	} else {
567 		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
568 		req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
569 	}
570 	if (unlikely(!req))
571 		return NULL;
572 
573 	request_init(req);
574 	req->r_osdc = osdc;
575 	req->r_mempool = use_mempool;
576 	req->r_num_ops = num_ops;
577 	req->r_snapid = CEPH_NOSNAP;
578 	req->r_snapc = ceph_get_snap_context(snapc);
579 
580 	dout("%s req %p\n", __func__, req);
581 	return req;
582 }
583 EXPORT_SYMBOL(ceph_osdc_alloc_request);
584 
585 static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
586 {
587 	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
588 }
589 
590 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
591 {
592 	struct ceph_osd_client *osdc = req->r_osdc;
593 	struct ceph_msg *msg;
594 	int msg_size;
595 
596 	WARN_ON(ceph_oid_empty(&req->r_base_oid));
597 	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
598 
599 	/* create request message */
600 	msg_size = CEPH_ENCODING_START_BLK_LEN +
601 			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
602 	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
603 	msg_size += CEPH_ENCODING_START_BLK_LEN +
604 			sizeof(struct ceph_osd_reqid); /* reqid */
605 	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
606 	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
607 	msg_size += CEPH_ENCODING_START_BLK_LEN +
608 			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
609 	msg_size += 4 + req->r_base_oid.name_len; /* oid */
610 	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
611 	msg_size += 8; /* snapid */
612 	msg_size += 8; /* snap_seq */
613 	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
614 	msg_size += 4 + 8; /* retry_attempt, features */
615 
616 	if (req->r_mempool)
617 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
618 	else
619 		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
620 	if (!msg)
621 		return -ENOMEM;
622 
623 	memset(msg->front.iov_base, 0, msg->front.iov_len);
624 	req->r_request = msg;
625 
626 	/* create reply message */
627 	msg_size = OSD_OPREPLY_FRONT_LEN;
628 	msg_size += req->r_base_oid.name_len;
629 	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
630 
631 	if (req->r_mempool)
632 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
633 	else
634 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
635 	if (!msg)
636 		return -ENOMEM;
637 
638 	req->r_reply = msg;
639 
640 	return 0;
641 }
642 EXPORT_SYMBOL(ceph_osdc_alloc_messages);
643 
644 static bool osd_req_opcode_valid(u16 opcode)
645 {
646 	switch (opcode) {
647 #define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
648 __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
649 #undef GENERATE_CASE
650 	default:
651 		return false;
652 	}
653 }
654 
655 /*
656  * This is an osd op init function for opcodes that have no data or
657  * other information associated with them.  It also serves as a
658  * common init routine for all the other init functions, below.
659  */
660 static struct ceph_osd_req_op *
661 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
662 		 u16 opcode, u32 flags)
663 {
664 	struct ceph_osd_req_op *op;
665 
666 	BUG_ON(which >= osd_req->r_num_ops);
667 	BUG_ON(!osd_req_opcode_valid(opcode));
668 
669 	op = &osd_req->r_ops[which];
670 	memset(op, 0, sizeof (*op));
671 	op->op = opcode;
672 	op->flags = flags;
673 
674 	return op;
675 }
676 
677 void osd_req_op_init(struct ceph_osd_request *osd_req,
678 		     unsigned int which, u16 opcode, u32 flags)
679 {
680 	(void)_osd_req_op_init(osd_req, which, opcode, flags);
681 }
682 EXPORT_SYMBOL(osd_req_op_init);
683 
684 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
685 				unsigned int which, u16 opcode,
686 				u64 offset, u64 length,
687 				u64 truncate_size, u32 truncate_seq)
688 {
689 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
690 						      opcode, 0);
691 	size_t payload_len = 0;
692 
693 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
694 	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
695 	       opcode != CEPH_OSD_OP_TRUNCATE);
696 
697 	op->extent.offset = offset;
698 	op->extent.length = length;
699 	op->extent.truncate_size = truncate_size;
700 	op->extent.truncate_seq = truncate_seq;
701 	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
702 		payload_len += length;
703 
704 	op->indata_len = payload_len;
705 }
706 EXPORT_SYMBOL(osd_req_op_extent_init);
707 
708 void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
709 				unsigned int which, u64 length)
710 {
711 	struct ceph_osd_req_op *op;
712 	u64 previous;
713 
714 	BUG_ON(which >= osd_req->r_num_ops);
715 	op = &osd_req->r_ops[which];
716 	previous = op->extent.length;
717 
718 	if (length == previous)
719 		return;		/* Nothing to do */
720 	BUG_ON(length > previous);
721 
722 	op->extent.length = length;
723 	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
724 		op->indata_len -= previous - length;
725 }
726 EXPORT_SYMBOL(osd_req_op_extent_update);
727 
728 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
729 				unsigned int which, u64 offset_inc)
730 {
731 	struct ceph_osd_req_op *op, *prev_op;
732 
733 	BUG_ON(which + 1 >= osd_req->r_num_ops);
734 
735 	prev_op = &osd_req->r_ops[which];
736 	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
737 	/* dup previous one */
738 	op->indata_len = prev_op->indata_len;
739 	op->outdata_len = prev_op->outdata_len;
740 	op->extent = prev_op->extent;
741 	/* adjust offset */
742 	op->extent.offset += offset_inc;
743 	op->extent.length -= offset_inc;
744 
745 	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
746 		op->indata_len -= offset_inc;
747 }
748 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
749 
750 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
751 			u16 opcode, const char *class, const char *method)
752 {
753 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
754 						      opcode, 0);
755 	struct ceph_pagelist *pagelist;
756 	size_t payload_len = 0;
757 	size_t size;
758 
759 	BUG_ON(opcode != CEPH_OSD_OP_CALL);
760 
761 	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
762 	BUG_ON(!pagelist);
763 	ceph_pagelist_init(pagelist);
764 
765 	op->cls.class_name = class;
766 	size = strlen(class);
767 	BUG_ON(size > (size_t) U8_MAX);
768 	op->cls.class_len = size;
769 	ceph_pagelist_append(pagelist, class, size);
770 	payload_len += size;
771 
772 	op->cls.method_name = method;
773 	size = strlen(method);
774 	BUG_ON(size > (size_t) U8_MAX);
775 	op->cls.method_len = size;
776 	ceph_pagelist_append(pagelist, method, size);
777 	payload_len += size;
778 
779 	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
780 
781 	op->indata_len = payload_len;
782 }
783 EXPORT_SYMBOL(osd_req_op_cls_init);
784 
785 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
786 			  u16 opcode, const char *name, const void *value,
787 			  size_t size, u8 cmp_op, u8 cmp_mode)
788 {
789 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
790 						      opcode, 0);
791 	struct ceph_pagelist *pagelist;
792 	size_t payload_len;
793 
794 	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
795 
796 	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
797 	if (!pagelist)
798 		return -ENOMEM;
799 
800 	ceph_pagelist_init(pagelist);
801 
802 	payload_len = strlen(name);
803 	op->xattr.name_len = payload_len;
804 	ceph_pagelist_append(pagelist, name, payload_len);
805 
806 	op->xattr.value_len = size;
807 	ceph_pagelist_append(pagelist, value, size);
808 	payload_len += size;
809 
810 	op->xattr.cmp_op = cmp_op;
811 	op->xattr.cmp_mode = cmp_mode;
812 
813 	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
814 	op->indata_len = payload_len;
815 	return 0;
816 }
817 EXPORT_SYMBOL(osd_req_op_xattr_init);
818 
819 /*
820  * @watch_opcode: CEPH_OSD_WATCH_OP_*
821  */
822 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
823 				  u64 cookie, u8 watch_opcode)
824 {
825 	struct ceph_osd_req_op *op;
826 
827 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
828 	op->watch.cookie = cookie;
829 	op->watch.op = watch_opcode;
830 	op->watch.gen = 0;
831 }
832 
833 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
834 				unsigned int which,
835 				u64 expected_object_size,
836 				u64 expected_write_size)
837 {
838 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
839 						      CEPH_OSD_OP_SETALLOCHINT,
840 						      0);
841 
842 	op->alloc_hint.expected_object_size = expected_object_size;
843 	op->alloc_hint.expected_write_size = expected_write_size;
844 
845 	/*
846 	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
847 	 * not worth a feature bit.  Set FAILOK per-op flag to make
848 	 * sure older osds don't trip over an unsupported opcode.
849 	 */
850 	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
851 }
852 EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
853 
854 static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
855 				struct ceph_osd_data *osd_data)
856 {
857 	u64 length = ceph_osd_data_length(osd_data);
858 
859 	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
860 		BUG_ON(length > (u64) SIZE_MAX);
861 		if (length)
862 			ceph_msg_data_add_pages(msg, osd_data->pages,
863 					length, osd_data->alignment);
864 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
865 		BUG_ON(!length);
866 		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
867 #ifdef CONFIG_BLOCK
868 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
869 		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
870 #endif
871 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
872 		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
873 	} else {
874 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
875 	}
876 }
877 
878 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
879 			     const struct ceph_osd_req_op *src)
880 {
881 	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
882 		pr_err("unrecognized osd opcode %d\n", src->op);
883 
884 		return 0;
885 	}
886 
887 	switch (src->op) {
888 	case CEPH_OSD_OP_STAT:
889 		break;
890 	case CEPH_OSD_OP_READ:
891 	case CEPH_OSD_OP_WRITE:
892 	case CEPH_OSD_OP_WRITEFULL:
893 	case CEPH_OSD_OP_ZERO:
894 	case CEPH_OSD_OP_TRUNCATE:
895 		dst->extent.offset = cpu_to_le64(src->extent.offset);
896 		dst->extent.length = cpu_to_le64(src->extent.length);
897 		dst->extent.truncate_size =
898 			cpu_to_le64(src->extent.truncate_size);
899 		dst->extent.truncate_seq =
900 			cpu_to_le32(src->extent.truncate_seq);
901 		break;
902 	case CEPH_OSD_OP_CALL:
903 		dst->cls.class_len = src->cls.class_len;
904 		dst->cls.method_len = src->cls.method_len;
905 		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
906 		break;
907 	case CEPH_OSD_OP_WATCH:
908 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
909 		dst->watch.ver = cpu_to_le64(0);
910 		dst->watch.op = src->watch.op;
911 		dst->watch.gen = cpu_to_le32(src->watch.gen);
912 		break;
913 	case CEPH_OSD_OP_NOTIFY_ACK:
914 		break;
915 	case CEPH_OSD_OP_NOTIFY:
916 		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
917 		break;
918 	case CEPH_OSD_OP_LIST_WATCHERS:
919 		break;
920 	case CEPH_OSD_OP_SETALLOCHINT:
921 		dst->alloc_hint.expected_object_size =
922 		    cpu_to_le64(src->alloc_hint.expected_object_size);
923 		dst->alloc_hint.expected_write_size =
924 		    cpu_to_le64(src->alloc_hint.expected_write_size);
925 		break;
926 	case CEPH_OSD_OP_SETXATTR:
927 	case CEPH_OSD_OP_CMPXATTR:
928 		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
929 		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
930 		dst->xattr.cmp_op = src->xattr.cmp_op;
931 		dst->xattr.cmp_mode = src->xattr.cmp_mode;
932 		break;
933 	case CEPH_OSD_OP_CREATE:
934 	case CEPH_OSD_OP_DELETE:
935 		break;
936 	default:
937 		pr_err("unsupported osd opcode %s\n",
938 			ceph_osd_op_name(src->op));
939 		WARN_ON(1);
940 
941 		return 0;
942 	}
943 
944 	dst->op = cpu_to_le16(src->op);
945 	dst->flags = cpu_to_le32(src->flags);
946 	dst->payload_len = cpu_to_le32(src->indata_len);
947 
948 	return src->indata_len;
949 }
950 
951 /*
952  * build new request AND message, calculate layout, and adjust file
953  * extent as needed.
954  *
955  * if the file was recently truncated, we include information about its
956  * old and new size so that the object can be updated appropriately.  (we
957  * avoid synchronously deleting truncated objects because it's slow.)
958  */
959 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
960 					       struct ceph_file_layout *layout,
961 					       struct ceph_vino vino,
962 					       u64 off, u64 *plen,
963 					       unsigned int which, int num_ops,
964 					       int opcode, int flags,
965 					       struct ceph_snap_context *snapc,
966 					       u32 truncate_seq,
967 					       u64 truncate_size,
968 					       bool use_mempool)
969 {
970 	struct ceph_osd_request *req;
971 	u64 objnum = 0;
972 	u64 objoff = 0;
973 	u64 objlen = 0;
974 	int r;
975 
976 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
977 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
978 	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
979 
980 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
981 					GFP_NOFS);
982 	if (!req) {
983 		r = -ENOMEM;
984 		goto fail;
985 	}
986 
987 	/* calculate max write size */
988 	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
989 	if (r)
990 		goto fail;
991 
992 	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
993 		osd_req_op_init(req, which, opcode, 0);
994 	} else {
995 		u32 object_size = layout->object_size;
996 		u32 object_base = off - objoff;
997 		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
998 			if (truncate_size <= object_base) {
999 				truncate_size = 0;
1000 			} else {
1001 				truncate_size -= object_base;
1002 				if (truncate_size > object_size)
1003 					truncate_size = object_size;
1004 			}
1005 		}
1006 		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
1007 				       truncate_size, truncate_seq);
1008 	}
1009 
1010 	req->r_abort_on_full = true;
1011 	req->r_flags = flags;
1012 	req->r_base_oloc.pool = layout->pool_id;
1013 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
1014 	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1015 
1016 	req->r_snapid = vino.snap;
1017 	if (flags & CEPH_OSD_FLAG_WRITE)
1018 		req->r_data_offset = off;
1019 
1020 	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1021 	if (r)
1022 		goto fail;
1023 
1024 	return req;
1025 
1026 fail:
1027 	ceph_osdc_put_request(req);
1028 	return ERR_PTR(r);
1029 }
1030 EXPORT_SYMBOL(ceph_osdc_new_request);
1031 
1032 /*
1033  * We keep osd requests in an rbtree, sorted by ->r_tid.
1034  */
1035 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
1036 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
1037 
1038 static bool osd_homeless(struct ceph_osd *osd)
1039 {
1040 	return osd->o_osd == CEPH_HOMELESS_OSD;
1041 }
1042 
1043 static bool osd_registered(struct ceph_osd *osd)
1044 {
1045 	verify_osdc_locked(osd->o_osdc);
1046 
1047 	return !RB_EMPTY_NODE(&osd->o_node);
1048 }
1049 
1050 /*
1051  * Assumes @osd is zero-initialized.
1052  */
1053 static void osd_init(struct ceph_osd *osd)
1054 {
1055 	refcount_set(&osd->o_ref, 1);
1056 	RB_CLEAR_NODE(&osd->o_node);
1057 	osd->o_requests = RB_ROOT;
1058 	osd->o_linger_requests = RB_ROOT;
1059 	osd->o_backoff_mappings = RB_ROOT;
1060 	osd->o_backoffs_by_id = RB_ROOT;
1061 	INIT_LIST_HEAD(&osd->o_osd_lru);
1062 	INIT_LIST_HEAD(&osd->o_keepalive_item);
1063 	osd->o_incarnation = 1;
1064 	mutex_init(&osd->lock);
1065 }
1066 
1067 static void osd_cleanup(struct ceph_osd *osd)
1068 {
1069 	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1070 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1071 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1072 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1073 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1074 	WARN_ON(!list_empty(&osd->o_osd_lru));
1075 	WARN_ON(!list_empty(&osd->o_keepalive_item));
1076 
1077 	if (osd->o_auth.authorizer) {
1078 		WARN_ON(osd_homeless(osd));
1079 		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1080 	}
1081 }
1082 
1083 /*
1084  * Track open sessions with osds.
1085  */
1086 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1087 {
1088 	struct ceph_osd *osd;
1089 
1090 	WARN_ON(onum == CEPH_HOMELESS_OSD);
1091 
1092 	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1093 	osd_init(osd);
1094 	osd->o_osdc = osdc;
1095 	osd->o_osd = onum;
1096 
1097 	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1098 
1099 	return osd;
1100 }
1101 
1102 static struct ceph_osd *get_osd(struct ceph_osd *osd)
1103 {
1104 	if (refcount_inc_not_zero(&osd->o_ref)) {
1105 		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1106 		     refcount_read(&osd->o_ref));
1107 		return osd;
1108 	} else {
1109 		dout("get_osd %p FAIL\n", osd);
1110 		return NULL;
1111 	}
1112 }
1113 
1114 static void put_osd(struct ceph_osd *osd)
1115 {
1116 	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1117 	     refcount_read(&osd->o_ref) - 1);
1118 	if (refcount_dec_and_test(&osd->o_ref)) {
1119 		osd_cleanup(osd);
1120 		kfree(osd);
1121 	}
1122 }
1123 
1124 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1125 
1126 static void __move_osd_to_lru(struct ceph_osd *osd)
1127 {
1128 	struct ceph_osd_client *osdc = osd->o_osdc;
1129 
1130 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1131 	BUG_ON(!list_empty(&osd->o_osd_lru));
1132 
1133 	spin_lock(&osdc->osd_lru_lock);
1134 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1135 	spin_unlock(&osdc->osd_lru_lock);
1136 
1137 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1138 }
1139 
1140 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1141 {
1142 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1143 	    RB_EMPTY_ROOT(&osd->o_linger_requests))
1144 		__move_osd_to_lru(osd);
1145 }
1146 
1147 static void __remove_osd_from_lru(struct ceph_osd *osd)
1148 {
1149 	struct ceph_osd_client *osdc = osd->o_osdc;
1150 
1151 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1152 
1153 	spin_lock(&osdc->osd_lru_lock);
1154 	if (!list_empty(&osd->o_osd_lru))
1155 		list_del_init(&osd->o_osd_lru);
1156 	spin_unlock(&osdc->osd_lru_lock);
1157 }
1158 
1159 /*
1160  * Close the connection and assign any leftover requests to the
1161  * homeless session.
1162  */
1163 static void close_osd(struct ceph_osd *osd)
1164 {
1165 	struct ceph_osd_client *osdc = osd->o_osdc;
1166 	struct rb_node *n;
1167 
1168 	verify_osdc_wrlocked(osdc);
1169 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1170 
1171 	ceph_con_close(&osd->o_con);
1172 
1173 	for (n = rb_first(&osd->o_requests); n; ) {
1174 		struct ceph_osd_request *req =
1175 		    rb_entry(n, struct ceph_osd_request, r_node);
1176 
1177 		n = rb_next(n); /* unlink_request() */
1178 
1179 		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1180 		unlink_request(osd, req);
1181 		link_request(&osdc->homeless_osd, req);
1182 	}
1183 	for (n = rb_first(&osd->o_linger_requests); n; ) {
1184 		struct ceph_osd_linger_request *lreq =
1185 		    rb_entry(n, struct ceph_osd_linger_request, node);
1186 
1187 		n = rb_next(n); /* unlink_linger() */
1188 
1189 		dout(" reassigning lreq %p linger_id %llu\n", lreq,
1190 		     lreq->linger_id);
1191 		unlink_linger(osd, lreq);
1192 		link_linger(&osdc->homeless_osd, lreq);
1193 	}
1194 	clear_backoffs(osd);
1195 
1196 	__remove_osd_from_lru(osd);
1197 	erase_osd(&osdc->osds, osd);
1198 	put_osd(osd);
1199 }
1200 
1201 /*
1202  * reset osd connect
1203  */
1204 static int reopen_osd(struct ceph_osd *osd)
1205 {
1206 	struct ceph_entity_addr *peer_addr;
1207 
1208 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1209 
1210 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1211 	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1212 		close_osd(osd);
1213 		return -ENODEV;
1214 	}
1215 
1216 	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1217 	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1218 			!ceph_con_opened(&osd->o_con)) {
1219 		struct rb_node *n;
1220 
1221 		dout("osd addr hasn't changed and connection never opened, "
1222 		     "letting msgr retry\n");
1223 		/* touch each r_stamp for handle_timeout()'s benfit */
1224 		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1225 			struct ceph_osd_request *req =
1226 			    rb_entry(n, struct ceph_osd_request, r_node);
1227 			req->r_stamp = jiffies;
1228 		}
1229 
1230 		return -EAGAIN;
1231 	}
1232 
1233 	ceph_con_close(&osd->o_con);
1234 	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1235 	osd->o_incarnation++;
1236 
1237 	return 0;
1238 }
1239 
1240 static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1241 					  bool wrlocked)
1242 {
1243 	struct ceph_osd *osd;
1244 
1245 	if (wrlocked)
1246 		verify_osdc_wrlocked(osdc);
1247 	else
1248 		verify_osdc_locked(osdc);
1249 
1250 	if (o != CEPH_HOMELESS_OSD)
1251 		osd = lookup_osd(&osdc->osds, o);
1252 	else
1253 		osd = &osdc->homeless_osd;
1254 	if (!osd) {
1255 		if (!wrlocked)
1256 			return ERR_PTR(-EAGAIN);
1257 
1258 		osd = create_osd(osdc, o);
1259 		insert_osd(&osdc->osds, osd);
1260 		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1261 			      &osdc->osdmap->osd_addr[osd->o_osd]);
1262 	}
1263 
1264 	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1265 	return osd;
1266 }
1267 
1268 /*
1269  * Create request <-> OSD session relation.
1270  *
1271  * @req has to be assigned a tid, @osd may be homeless.
1272  */
1273 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1274 {
1275 	verify_osd_locked(osd);
1276 	WARN_ON(!req->r_tid || req->r_osd);
1277 	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1278 	     req, req->r_tid);
1279 
1280 	if (!osd_homeless(osd))
1281 		__remove_osd_from_lru(osd);
1282 	else
1283 		atomic_inc(&osd->o_osdc->num_homeless);
1284 
1285 	get_osd(osd);
1286 	insert_request(&osd->o_requests, req);
1287 	req->r_osd = osd;
1288 }
1289 
1290 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1291 {
1292 	verify_osd_locked(osd);
1293 	WARN_ON(req->r_osd != osd);
1294 	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1295 	     req, req->r_tid);
1296 
1297 	req->r_osd = NULL;
1298 	erase_request(&osd->o_requests, req);
1299 	put_osd(osd);
1300 
1301 	if (!osd_homeless(osd))
1302 		maybe_move_osd_to_lru(osd);
1303 	else
1304 		atomic_dec(&osd->o_osdc->num_homeless);
1305 }
1306 
1307 static bool __pool_full(struct ceph_pg_pool_info *pi)
1308 {
1309 	return pi->flags & CEPH_POOL_FLAG_FULL;
1310 }
1311 
1312 static bool have_pool_full(struct ceph_osd_client *osdc)
1313 {
1314 	struct rb_node *n;
1315 
1316 	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1317 		struct ceph_pg_pool_info *pi =
1318 		    rb_entry(n, struct ceph_pg_pool_info, node);
1319 
1320 		if (__pool_full(pi))
1321 			return true;
1322 	}
1323 
1324 	return false;
1325 }
1326 
1327 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1328 {
1329 	struct ceph_pg_pool_info *pi;
1330 
1331 	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1332 	if (!pi)
1333 		return false;
1334 
1335 	return __pool_full(pi);
1336 }
1337 
1338 /*
1339  * Returns whether a request should be blocked from being sent
1340  * based on the current osdmap and osd_client settings.
1341  */
1342 static bool target_should_be_paused(struct ceph_osd_client *osdc,
1343 				    const struct ceph_osd_request_target *t,
1344 				    struct ceph_pg_pool_info *pi)
1345 {
1346 	bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1347 	bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1348 		       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1349 		       __pool_full(pi);
1350 
1351 	WARN_ON(pi->id != t->target_oloc.pool);
1352 	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1353 	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1354 	       (osdc->osdmap->epoch < osdc->epoch_barrier);
1355 }
1356 
1357 enum calc_target_result {
1358 	CALC_TARGET_NO_ACTION = 0,
1359 	CALC_TARGET_NEED_RESEND,
1360 	CALC_TARGET_POOL_DNE,
1361 };
1362 
1363 static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1364 					   struct ceph_osd_request_target *t,
1365 					   struct ceph_connection *con,
1366 					   bool any_change)
1367 {
1368 	struct ceph_pg_pool_info *pi;
1369 	struct ceph_pg pgid, last_pgid;
1370 	struct ceph_osds up, acting;
1371 	bool force_resend = false;
1372 	bool unpaused = false;
1373 	bool legacy_change;
1374 	bool split = false;
1375 	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1376 	bool recovery_deletes = ceph_osdmap_flag(osdc,
1377 						 CEPH_OSDMAP_RECOVERY_DELETES);
1378 	enum calc_target_result ct_res;
1379 	int ret;
1380 
1381 	t->epoch = osdc->osdmap->epoch;
1382 	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1383 	if (!pi) {
1384 		t->osd = CEPH_HOMELESS_OSD;
1385 		ct_res = CALC_TARGET_POOL_DNE;
1386 		goto out;
1387 	}
1388 
1389 	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1390 		if (t->last_force_resend < pi->last_force_request_resend) {
1391 			t->last_force_resend = pi->last_force_request_resend;
1392 			force_resend = true;
1393 		} else if (t->last_force_resend == 0) {
1394 			force_resend = true;
1395 		}
1396 	}
1397 
1398 	/* apply tiering */
1399 	ceph_oid_copy(&t->target_oid, &t->base_oid);
1400 	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1401 	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1402 		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
1403 			t->target_oloc.pool = pi->read_tier;
1404 		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
1405 			t->target_oloc.pool = pi->write_tier;
1406 
1407 		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1408 		if (!pi) {
1409 			t->osd = CEPH_HOMELESS_OSD;
1410 			ct_res = CALC_TARGET_POOL_DNE;
1411 			goto out;
1412 		}
1413 	}
1414 
1415 	ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
1416 					  &pgid);
1417 	if (ret) {
1418 		WARN_ON(ret != -ENOENT);
1419 		t->osd = CEPH_HOMELESS_OSD;
1420 		ct_res = CALC_TARGET_POOL_DNE;
1421 		goto out;
1422 	}
1423 	last_pgid.pool = pgid.pool;
1424 	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1425 
1426 	ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1427 	if (any_change &&
1428 	    ceph_is_new_interval(&t->acting,
1429 				 &acting,
1430 				 &t->up,
1431 				 &up,
1432 				 t->size,
1433 				 pi->size,
1434 				 t->min_size,
1435 				 pi->min_size,
1436 				 t->pg_num,
1437 				 pi->pg_num,
1438 				 t->sort_bitwise,
1439 				 sort_bitwise,
1440 				 t->recovery_deletes,
1441 				 recovery_deletes,
1442 				 &last_pgid))
1443 		force_resend = true;
1444 
1445 	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1446 		t->paused = false;
1447 		unpaused = true;
1448 	}
1449 	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1450 			ceph_osds_changed(&t->acting, &acting, any_change);
1451 	if (t->pg_num)
1452 		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1453 
1454 	if (legacy_change || force_resend || split) {
1455 		t->pgid = pgid; /* struct */
1456 		ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1457 		ceph_osds_copy(&t->acting, &acting);
1458 		ceph_osds_copy(&t->up, &up);
1459 		t->size = pi->size;
1460 		t->min_size = pi->min_size;
1461 		t->pg_num = pi->pg_num;
1462 		t->pg_num_mask = pi->pg_num_mask;
1463 		t->sort_bitwise = sort_bitwise;
1464 		t->recovery_deletes = recovery_deletes;
1465 
1466 		t->osd = acting.primary;
1467 	}
1468 
1469 	if (unpaused || legacy_change || force_resend ||
1470 	    (split && con && CEPH_HAVE_FEATURE(con->peer_features,
1471 					       RESEND_ON_SPLIT)))
1472 		ct_res = CALC_TARGET_NEED_RESEND;
1473 	else
1474 		ct_res = CALC_TARGET_NO_ACTION;
1475 
1476 out:
1477 	dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
1478 	return ct_res;
1479 }
1480 
1481 static struct ceph_spg_mapping *alloc_spg_mapping(void)
1482 {
1483 	struct ceph_spg_mapping *spg;
1484 
1485 	spg = kmalloc(sizeof(*spg), GFP_NOIO);
1486 	if (!spg)
1487 		return NULL;
1488 
1489 	RB_CLEAR_NODE(&spg->node);
1490 	spg->backoffs = RB_ROOT;
1491 	return spg;
1492 }
1493 
1494 static void free_spg_mapping(struct ceph_spg_mapping *spg)
1495 {
1496 	WARN_ON(!RB_EMPTY_NODE(&spg->node));
1497 	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1498 
1499 	kfree(spg);
1500 }
1501 
1502 /*
1503  * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1504  * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
1505  * defined only within a specific spgid; it does not pass anything to
1506  * children on split, or to another primary.
1507  */
1508 DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1509 		 RB_BYPTR, const struct ceph_spg *, node)
1510 
1511 static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1512 {
1513 	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1514 }
1515 
1516 static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1517 				   void **pkey, size_t *pkey_len)
1518 {
1519 	if (hoid->key_len) {
1520 		*pkey = hoid->key;
1521 		*pkey_len = hoid->key_len;
1522 	} else {
1523 		*pkey = hoid->oid;
1524 		*pkey_len = hoid->oid_len;
1525 	}
1526 }
1527 
1528 static int compare_names(const void *name1, size_t name1_len,
1529 			 const void *name2, size_t name2_len)
1530 {
1531 	int ret;
1532 
1533 	ret = memcmp(name1, name2, min(name1_len, name2_len));
1534 	if (!ret) {
1535 		if (name1_len < name2_len)
1536 			ret = -1;
1537 		else if (name1_len > name2_len)
1538 			ret = 1;
1539 	}
1540 	return ret;
1541 }
1542 
1543 static int hoid_compare(const struct ceph_hobject_id *lhs,
1544 			const struct ceph_hobject_id *rhs)
1545 {
1546 	void *effective_key1, *effective_key2;
1547 	size_t effective_key1_len, effective_key2_len;
1548 	int ret;
1549 
1550 	if (lhs->is_max < rhs->is_max)
1551 		return -1;
1552 	if (lhs->is_max > rhs->is_max)
1553 		return 1;
1554 
1555 	if (lhs->pool < rhs->pool)
1556 		return -1;
1557 	if (lhs->pool > rhs->pool)
1558 		return 1;
1559 
1560 	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1561 		return -1;
1562 	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1563 		return 1;
1564 
1565 	ret = compare_names(lhs->nspace, lhs->nspace_len,
1566 			    rhs->nspace, rhs->nspace_len);
1567 	if (ret)
1568 		return ret;
1569 
1570 	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1571 	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1572 	ret = compare_names(effective_key1, effective_key1_len,
1573 			    effective_key2, effective_key2_len);
1574 	if (ret)
1575 		return ret;
1576 
1577 	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1578 	if (ret)
1579 		return ret;
1580 
1581 	if (lhs->snapid < rhs->snapid)
1582 		return -1;
1583 	if (lhs->snapid > rhs->snapid)
1584 		return 1;
1585 
1586 	return 0;
1587 }
1588 
1589 /*
1590  * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1591  * compat stuff here.
1592  *
1593  * Assumes @hoid is zero-initialized.
1594  */
1595 static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1596 {
1597 	u8 struct_v;
1598 	u32 struct_len;
1599 	int ret;
1600 
1601 	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1602 				  &struct_len);
1603 	if (ret)
1604 		return ret;
1605 
1606 	if (struct_v < 4) {
1607 		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1608 		goto e_inval;
1609 	}
1610 
1611 	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1612 						GFP_NOIO);
1613 	if (IS_ERR(hoid->key)) {
1614 		ret = PTR_ERR(hoid->key);
1615 		hoid->key = NULL;
1616 		return ret;
1617 	}
1618 
1619 	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1620 						GFP_NOIO);
1621 	if (IS_ERR(hoid->oid)) {
1622 		ret = PTR_ERR(hoid->oid);
1623 		hoid->oid = NULL;
1624 		return ret;
1625 	}
1626 
1627 	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1628 	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1629 	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1630 
1631 	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1632 						   GFP_NOIO);
1633 	if (IS_ERR(hoid->nspace)) {
1634 		ret = PTR_ERR(hoid->nspace);
1635 		hoid->nspace = NULL;
1636 		return ret;
1637 	}
1638 
1639 	ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1640 
1641 	ceph_hoid_build_hash_cache(hoid);
1642 	return 0;
1643 
1644 e_inval:
1645 	return -EINVAL;
1646 }
1647 
1648 static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1649 {
1650 	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1651 	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1652 }
1653 
1654 static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1655 {
1656 	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1657 	ceph_encode_string(p, end, hoid->key, hoid->key_len);
1658 	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1659 	ceph_encode_64(p, hoid->snapid);
1660 	ceph_encode_32(p, hoid->hash);
1661 	ceph_encode_8(p, hoid->is_max);
1662 	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1663 	ceph_encode_64(p, hoid->pool);
1664 }
1665 
1666 static void free_hoid(struct ceph_hobject_id *hoid)
1667 {
1668 	if (hoid) {
1669 		kfree(hoid->key);
1670 		kfree(hoid->oid);
1671 		kfree(hoid->nspace);
1672 		kfree(hoid);
1673 	}
1674 }
1675 
1676 static struct ceph_osd_backoff *alloc_backoff(void)
1677 {
1678 	struct ceph_osd_backoff *backoff;
1679 
1680 	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1681 	if (!backoff)
1682 		return NULL;
1683 
1684 	RB_CLEAR_NODE(&backoff->spg_node);
1685 	RB_CLEAR_NODE(&backoff->id_node);
1686 	return backoff;
1687 }
1688 
1689 static void free_backoff(struct ceph_osd_backoff *backoff)
1690 {
1691 	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1692 	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1693 
1694 	free_hoid(backoff->begin);
1695 	free_hoid(backoff->end);
1696 	kfree(backoff);
1697 }
1698 
1699 /*
1700  * Within a specific spgid, backoffs are managed by ->begin hoid.
1701  */
1702 DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
1703 			RB_BYVAL, spg_node);
1704 
1705 static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1706 					    const struct ceph_hobject_id *hoid)
1707 {
1708 	struct rb_node *n = root->rb_node;
1709 
1710 	while (n) {
1711 		struct ceph_osd_backoff *cur =
1712 		    rb_entry(n, struct ceph_osd_backoff, spg_node);
1713 		int cmp;
1714 
1715 		cmp = hoid_compare(hoid, cur->begin);
1716 		if (cmp < 0) {
1717 			n = n->rb_left;
1718 		} else if (cmp > 0) {
1719 			if (hoid_compare(hoid, cur->end) < 0)
1720 				return cur;
1721 
1722 			n = n->rb_right;
1723 		} else {
1724 			return cur;
1725 		}
1726 	}
1727 
1728 	return NULL;
1729 }
1730 
1731 /*
1732  * Each backoff has a unique id within its OSD session.
1733  */
1734 DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1735 
1736 static void clear_backoffs(struct ceph_osd *osd)
1737 {
1738 	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1739 		struct ceph_spg_mapping *spg =
1740 		    rb_entry(rb_first(&osd->o_backoff_mappings),
1741 			     struct ceph_spg_mapping, node);
1742 
1743 		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1744 			struct ceph_osd_backoff *backoff =
1745 			    rb_entry(rb_first(&spg->backoffs),
1746 				     struct ceph_osd_backoff, spg_node);
1747 
1748 			erase_backoff(&spg->backoffs, backoff);
1749 			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1750 			free_backoff(backoff);
1751 		}
1752 		erase_spg_mapping(&osd->o_backoff_mappings, spg);
1753 		free_spg_mapping(spg);
1754 	}
1755 }
1756 
1757 /*
1758  * Set up a temporary, non-owning view into @t.
1759  */
1760 static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1761 				  const struct ceph_osd_request_target *t)
1762 {
1763 	hoid->key = NULL;
1764 	hoid->key_len = 0;
1765 	hoid->oid = t->target_oid.name;
1766 	hoid->oid_len = t->target_oid.name_len;
1767 	hoid->snapid = CEPH_NOSNAP;
1768 	hoid->hash = t->pgid.seed;
1769 	hoid->is_max = false;
1770 	if (t->target_oloc.pool_ns) {
1771 		hoid->nspace = t->target_oloc.pool_ns->str;
1772 		hoid->nspace_len = t->target_oloc.pool_ns->len;
1773 	} else {
1774 		hoid->nspace = NULL;
1775 		hoid->nspace_len = 0;
1776 	}
1777 	hoid->pool = t->target_oloc.pool;
1778 	ceph_hoid_build_hash_cache(hoid);
1779 }
1780 
1781 static bool should_plug_request(struct ceph_osd_request *req)
1782 {
1783 	struct ceph_osd *osd = req->r_osd;
1784 	struct ceph_spg_mapping *spg;
1785 	struct ceph_osd_backoff *backoff;
1786 	struct ceph_hobject_id hoid;
1787 
1788 	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
1789 	if (!spg)
1790 		return false;
1791 
1792 	hoid_fill_from_target(&hoid, &req->r_t);
1793 	backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
1794 	if (!backoff)
1795 		return false;
1796 
1797 	dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
1798 	     __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
1799 	     backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
1800 	return true;
1801 }
1802 
1803 static void setup_request_data(struct ceph_osd_request *req,
1804 			       struct ceph_msg *msg)
1805 {
1806 	u32 data_len = 0;
1807 	int i;
1808 
1809 	if (!list_empty(&msg->data))
1810 		return;
1811 
1812 	WARN_ON(msg->data_length);
1813 	for (i = 0; i < req->r_num_ops; i++) {
1814 		struct ceph_osd_req_op *op = &req->r_ops[i];
1815 
1816 		switch (op->op) {
1817 		/* request */
1818 		case CEPH_OSD_OP_WRITE:
1819 		case CEPH_OSD_OP_WRITEFULL:
1820 			WARN_ON(op->indata_len != op->extent.length);
1821 			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
1822 			break;
1823 		case CEPH_OSD_OP_SETXATTR:
1824 		case CEPH_OSD_OP_CMPXATTR:
1825 			WARN_ON(op->indata_len != op->xattr.name_len +
1826 						  op->xattr.value_len);
1827 			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
1828 			break;
1829 		case CEPH_OSD_OP_NOTIFY_ACK:
1830 			ceph_osdc_msg_data_add(msg,
1831 					       &op->notify_ack.request_data);
1832 			break;
1833 
1834 		/* reply */
1835 		case CEPH_OSD_OP_STAT:
1836 			ceph_osdc_msg_data_add(req->r_reply,
1837 					       &op->raw_data_in);
1838 			break;
1839 		case CEPH_OSD_OP_READ:
1840 			ceph_osdc_msg_data_add(req->r_reply,
1841 					       &op->extent.osd_data);
1842 			break;
1843 		case CEPH_OSD_OP_LIST_WATCHERS:
1844 			ceph_osdc_msg_data_add(req->r_reply,
1845 					       &op->list_watchers.response_data);
1846 			break;
1847 
1848 		/* both */
1849 		case CEPH_OSD_OP_CALL:
1850 			WARN_ON(op->indata_len != op->cls.class_len +
1851 						  op->cls.method_len +
1852 						  op->cls.indata_len);
1853 			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
1854 			/* optional, can be NONE */
1855 			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
1856 			/* optional, can be NONE */
1857 			ceph_osdc_msg_data_add(req->r_reply,
1858 					       &op->cls.response_data);
1859 			break;
1860 		case CEPH_OSD_OP_NOTIFY:
1861 			ceph_osdc_msg_data_add(msg,
1862 					       &op->notify.request_data);
1863 			ceph_osdc_msg_data_add(req->r_reply,
1864 					       &op->notify.response_data);
1865 			break;
1866 		}
1867 
1868 		data_len += op->indata_len;
1869 	}
1870 
1871 	WARN_ON(data_len != msg->data_length);
1872 }
1873 
1874 static void encode_pgid(void **p, const struct ceph_pg *pgid)
1875 {
1876 	ceph_encode_8(p, 1);
1877 	ceph_encode_64(p, pgid->pool);
1878 	ceph_encode_32(p, pgid->seed);
1879 	ceph_encode_32(p, -1); /* preferred */
1880 }
1881 
1882 static void encode_spgid(void **p, const struct ceph_spg *spgid)
1883 {
1884 	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
1885 	encode_pgid(p, &spgid->pgid);
1886 	ceph_encode_8(p, spgid->shard);
1887 }
1888 
1889 static void encode_oloc(void **p, void *end,
1890 			const struct ceph_object_locator *oloc)
1891 {
1892 	ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
1893 	ceph_encode_64(p, oloc->pool);
1894 	ceph_encode_32(p, -1); /* preferred */
1895 	ceph_encode_32(p, 0);  /* key len */
1896 	if (oloc->pool_ns)
1897 		ceph_encode_string(p, end, oloc->pool_ns->str,
1898 				   oloc->pool_ns->len);
1899 	else
1900 		ceph_encode_32(p, 0);
1901 }
1902 
1903 static void encode_request_partial(struct ceph_osd_request *req,
1904 				   struct ceph_msg *msg)
1905 {
1906 	void *p = msg->front.iov_base;
1907 	void *const end = p + msg->front_alloc_len;
1908 	u32 data_len = 0;
1909 	int i;
1910 
1911 	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
1912 		/* snapshots aren't writeable */
1913 		WARN_ON(req->r_snapid != CEPH_NOSNAP);
1914 	} else {
1915 		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
1916 			req->r_data_offset || req->r_snapc);
1917 	}
1918 
1919 	setup_request_data(req, msg);
1920 
1921 	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
1922 	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
1923 	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
1924 	ceph_encode_32(&p, req->r_flags);
1925 
1926 	/* reqid */
1927 	ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
1928 	memset(p, 0, sizeof(struct ceph_osd_reqid));
1929 	p += sizeof(struct ceph_osd_reqid);
1930 
1931 	/* trace */
1932 	memset(p, 0, sizeof(struct ceph_blkin_trace_info));
1933 	p += sizeof(struct ceph_blkin_trace_info);
1934 
1935 	ceph_encode_32(&p, 0); /* client_inc, always 0 */
1936 	ceph_encode_timespec(p, &req->r_mtime);
1937 	p += sizeof(struct ceph_timespec);
1938 
1939 	encode_oloc(&p, end, &req->r_t.target_oloc);
1940 	ceph_encode_string(&p, end, req->r_t.target_oid.name,
1941 			   req->r_t.target_oid.name_len);
1942 
1943 	/* ops, can imply data */
1944 	ceph_encode_16(&p, req->r_num_ops);
1945 	for (i = 0; i < req->r_num_ops; i++) {
1946 		data_len += osd_req_encode_op(p, &req->r_ops[i]);
1947 		p += sizeof(struct ceph_osd_op);
1948 	}
1949 
1950 	ceph_encode_64(&p, req->r_snapid); /* snapid */
1951 	if (req->r_snapc) {
1952 		ceph_encode_64(&p, req->r_snapc->seq);
1953 		ceph_encode_32(&p, req->r_snapc->num_snaps);
1954 		for (i = 0; i < req->r_snapc->num_snaps; i++)
1955 			ceph_encode_64(&p, req->r_snapc->snaps[i]);
1956 	} else {
1957 		ceph_encode_64(&p, 0); /* snap_seq */
1958 		ceph_encode_32(&p, 0); /* snaps len */
1959 	}
1960 
1961 	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
1962 	BUG_ON(p > end - 8); /* space for features */
1963 
1964 	msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
1965 	/* front_len is finalized in encode_request_finish() */
1966 	msg->front.iov_len = p - msg->front.iov_base;
1967 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1968 	msg->hdr.data_len = cpu_to_le32(data_len);
1969 	/*
1970 	 * The header "data_off" is a hint to the receiver allowing it
1971 	 * to align received data into its buffers such that there's no
1972 	 * need to re-copy it before writing it to disk (direct I/O).
1973 	 */
1974 	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
1975 
1976 	dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
1977 	     req->r_t.target_oid.name, req->r_t.target_oid.name_len);
1978 }
1979 
1980 static void encode_request_finish(struct ceph_msg *msg)
1981 {
1982 	void *p = msg->front.iov_base;
1983 	void *const partial_end = p + msg->front.iov_len;
1984 	void *const end = p + msg->front_alloc_len;
1985 
1986 	if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
1987 		/* luminous OSD -- encode features and be done */
1988 		p = partial_end;
1989 		ceph_encode_64(&p, msg->con->peer_features);
1990 	} else {
1991 		struct {
1992 			char spgid[CEPH_ENCODING_START_BLK_LEN +
1993 				   CEPH_PGID_ENCODING_LEN + 1];
1994 			__le32 hash;
1995 			__le32 epoch;
1996 			__le32 flags;
1997 			char reqid[CEPH_ENCODING_START_BLK_LEN +
1998 				   sizeof(struct ceph_osd_reqid)];
1999 			char trace[sizeof(struct ceph_blkin_trace_info)];
2000 			__le32 client_inc;
2001 			struct ceph_timespec mtime;
2002 		} __packed head;
2003 		struct ceph_pg pgid;
2004 		void *oloc, *oid, *tail;
2005 		int oloc_len, oid_len, tail_len;
2006 		int len;
2007 
2008 		/*
2009 		 * Pre-luminous OSD -- reencode v8 into v4 using @head
2010 		 * as a temporary buffer.  Encode the raw PG; the rest
2011 		 * is just a matter of moving oloc, oid and tail blobs
2012 		 * around.
2013 		 */
2014 		memcpy(&head, p, sizeof(head));
2015 		p += sizeof(head);
2016 
2017 		oloc = p;
2018 		p += CEPH_ENCODING_START_BLK_LEN;
2019 		pgid.pool = ceph_decode_64(&p);
2020 		p += 4 + 4; /* preferred, key len */
2021 		len = ceph_decode_32(&p);
2022 		p += len;   /* nspace */
2023 		oloc_len = p - oloc;
2024 
2025 		oid = p;
2026 		len = ceph_decode_32(&p);
2027 		p += len;
2028 		oid_len = p - oid;
2029 
2030 		tail = p;
2031 		tail_len = partial_end - p;
2032 
2033 		p = msg->front.iov_base;
2034 		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
2035 		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
2036 		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
2037 		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
2038 
2039 		/* reassert_version */
2040 		memset(p, 0, sizeof(struct ceph_eversion));
2041 		p += sizeof(struct ceph_eversion);
2042 
2043 		BUG_ON(p >= oloc);
2044 		memmove(p, oloc, oloc_len);
2045 		p += oloc_len;
2046 
2047 		pgid.seed = le32_to_cpu(head.hash);
2048 		encode_pgid(&p, &pgid); /* raw pg */
2049 
2050 		BUG_ON(p >= oid);
2051 		memmove(p, oid, oid_len);
2052 		p += oid_len;
2053 
2054 		/* tail -- ops, snapid, snapc, retry_attempt */
2055 		BUG_ON(p >= tail);
2056 		memmove(p, tail, tail_len);
2057 		p += tail_len;
2058 
2059 		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2060 	}
2061 
2062 	BUG_ON(p > end);
2063 	msg->front.iov_len = p - msg->front.iov_base;
2064 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2065 
2066 	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2067 	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2068 	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2069 	     le16_to_cpu(msg->hdr.version));
2070 }
2071 
2072 /*
2073  * @req has to be assigned a tid and registered.
2074  */
2075 static void send_request(struct ceph_osd_request *req)
2076 {
2077 	struct ceph_osd *osd = req->r_osd;
2078 
2079 	verify_osd_locked(osd);
2080 	WARN_ON(osd->o_osd != req->r_t.osd);
2081 
2082 	/* backoff? */
2083 	if (should_plug_request(req))
2084 		return;
2085 
2086 	/*
2087 	 * We may have a previously queued request message hanging
2088 	 * around.  Cancel it to avoid corrupting the msgr.
2089 	 */
2090 	if (req->r_sent)
2091 		ceph_msg_revoke(req->r_request);
2092 
2093 	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2094 	if (req->r_attempts)
2095 		req->r_flags |= CEPH_OSD_FLAG_RETRY;
2096 	else
2097 		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2098 
2099 	encode_request_partial(req, req->r_request);
2100 
2101 	dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2102 	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2103 	     req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2104 	     req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2105 	     req->r_attempts);
2106 
2107 	req->r_t.paused = false;
2108 	req->r_stamp = jiffies;
2109 	req->r_attempts++;
2110 
2111 	req->r_sent = osd->o_incarnation;
2112 	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2113 	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2114 }
2115 
2116 static void maybe_request_map(struct ceph_osd_client *osdc)
2117 {
2118 	bool continuous = false;
2119 
2120 	verify_osdc_locked(osdc);
2121 	WARN_ON(!osdc->osdmap->epoch);
2122 
2123 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2124 	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2125 	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2126 		dout("%s osdc %p continuous\n", __func__, osdc);
2127 		continuous = true;
2128 	} else {
2129 		dout("%s osdc %p onetime\n", __func__, osdc);
2130 	}
2131 
2132 	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2133 			       osdc->osdmap->epoch + 1, continuous))
2134 		ceph_monc_renew_subs(&osdc->client->monc);
2135 }
2136 
2137 static void complete_request(struct ceph_osd_request *req, int err);
2138 static void send_map_check(struct ceph_osd_request *req);
2139 
2140 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2141 {
2142 	struct ceph_osd_client *osdc = req->r_osdc;
2143 	struct ceph_osd *osd;
2144 	enum calc_target_result ct_res;
2145 	bool need_send = false;
2146 	bool promoted = false;
2147 	bool need_abort = false;
2148 
2149 	WARN_ON(req->r_tid);
2150 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2151 
2152 again:
2153 	ct_res = calc_target(osdc, &req->r_t, NULL, false);
2154 	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2155 		goto promote;
2156 
2157 	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2158 	if (IS_ERR(osd)) {
2159 		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2160 		goto promote;
2161 	}
2162 
2163 	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2164 		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2165 		     osdc->epoch_barrier);
2166 		req->r_t.paused = true;
2167 		maybe_request_map(osdc);
2168 	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2169 		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2170 		dout("req %p pausewr\n", req);
2171 		req->r_t.paused = true;
2172 		maybe_request_map(osdc);
2173 	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2174 		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2175 		dout("req %p pauserd\n", req);
2176 		req->r_t.paused = true;
2177 		maybe_request_map(osdc);
2178 	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2179 		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2180 				     CEPH_OSD_FLAG_FULL_FORCE)) &&
2181 		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2182 		    pool_full(osdc, req->r_t.base_oloc.pool))) {
2183 		dout("req %p full/pool_full\n", req);
2184 		pr_warn_ratelimited("FULL or reached pool quota\n");
2185 		req->r_t.paused = true;
2186 		maybe_request_map(osdc);
2187 		if (req->r_abort_on_full)
2188 			need_abort = true;
2189 	} else if (!osd_homeless(osd)) {
2190 		need_send = true;
2191 	} else {
2192 		maybe_request_map(osdc);
2193 	}
2194 
2195 	mutex_lock(&osd->lock);
2196 	/*
2197 	 * Assign the tid atomically with send_request() to protect
2198 	 * multiple writes to the same object from racing with each
2199 	 * other, resulting in out of order ops on the OSDs.
2200 	 */
2201 	req->r_tid = atomic64_inc_return(&osdc->last_tid);
2202 	link_request(osd, req);
2203 	if (need_send)
2204 		send_request(req);
2205 	else if (need_abort)
2206 		complete_request(req, -ENOSPC);
2207 	mutex_unlock(&osd->lock);
2208 
2209 	if (ct_res == CALC_TARGET_POOL_DNE)
2210 		send_map_check(req);
2211 
2212 	if (promoted)
2213 		downgrade_write(&osdc->lock);
2214 	return;
2215 
2216 promote:
2217 	up_read(&osdc->lock);
2218 	down_write(&osdc->lock);
2219 	wrlocked = true;
2220 	promoted = true;
2221 	goto again;
2222 }
2223 
2224 static void account_request(struct ceph_osd_request *req)
2225 {
2226 	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2227 	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2228 
2229 	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2230 	atomic_inc(&req->r_osdc->num_requests);
2231 
2232 	req->r_start_stamp = jiffies;
2233 }
2234 
2235 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2236 {
2237 	ceph_osdc_get_request(req);
2238 	account_request(req);
2239 	__submit_request(req, wrlocked);
2240 }
2241 
2242 static void finish_request(struct ceph_osd_request *req)
2243 {
2244 	struct ceph_osd_client *osdc = req->r_osdc;
2245 
2246 	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2247 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2248 
2249 	if (req->r_osd)
2250 		unlink_request(req->r_osd, req);
2251 	atomic_dec(&osdc->num_requests);
2252 
2253 	/*
2254 	 * If an OSD has failed or returned and a request has been sent
2255 	 * twice, it's possible to get a reply and end up here while the
2256 	 * request message is queued for delivery.  We will ignore the
2257 	 * reply, so not a big deal, but better to try and catch it.
2258 	 */
2259 	ceph_msg_revoke(req->r_request);
2260 	ceph_msg_revoke_incoming(req->r_reply);
2261 }
2262 
2263 static void __complete_request(struct ceph_osd_request *req)
2264 {
2265 	if (req->r_callback) {
2266 		dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
2267 		     req->r_tid, req->r_callback, req->r_result);
2268 		req->r_callback(req);
2269 	}
2270 }
2271 
2272 /*
2273  * This is open-coded in handle_reply().
2274  */
2275 static void complete_request(struct ceph_osd_request *req, int err)
2276 {
2277 	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2278 
2279 	req->r_result = err;
2280 	finish_request(req);
2281 	__complete_request(req);
2282 	complete_all(&req->r_completion);
2283 	ceph_osdc_put_request(req);
2284 }
2285 
2286 static void cancel_map_check(struct ceph_osd_request *req)
2287 {
2288 	struct ceph_osd_client *osdc = req->r_osdc;
2289 	struct ceph_osd_request *lookup_req;
2290 
2291 	verify_osdc_wrlocked(osdc);
2292 
2293 	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2294 	if (!lookup_req)
2295 		return;
2296 
2297 	WARN_ON(lookup_req != req);
2298 	erase_request_mc(&osdc->map_checks, req);
2299 	ceph_osdc_put_request(req);
2300 }
2301 
2302 static void cancel_request(struct ceph_osd_request *req)
2303 {
2304 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2305 
2306 	cancel_map_check(req);
2307 	finish_request(req);
2308 	complete_all(&req->r_completion);
2309 	ceph_osdc_put_request(req);
2310 }
2311 
2312 static void abort_request(struct ceph_osd_request *req, int err)
2313 {
2314 	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2315 
2316 	cancel_map_check(req);
2317 	complete_request(req, err);
2318 }
2319 
2320 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2321 {
2322 	if (likely(eb > osdc->epoch_barrier)) {
2323 		dout("updating epoch_barrier from %u to %u\n",
2324 				osdc->epoch_barrier, eb);
2325 		osdc->epoch_barrier = eb;
2326 		/* Request map if we're not to the barrier yet */
2327 		if (eb > osdc->osdmap->epoch)
2328 			maybe_request_map(osdc);
2329 	}
2330 }
2331 
2332 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2333 {
2334 	down_read(&osdc->lock);
2335 	if (unlikely(eb > osdc->epoch_barrier)) {
2336 		up_read(&osdc->lock);
2337 		down_write(&osdc->lock);
2338 		update_epoch_barrier(osdc, eb);
2339 		up_write(&osdc->lock);
2340 	} else {
2341 		up_read(&osdc->lock);
2342 	}
2343 }
2344 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2345 
2346 /*
2347  * Drop all pending requests that are stalled waiting on a full condition to
2348  * clear, and complete them with ENOSPC as the return code. Set the
2349  * osdc->epoch_barrier to the latest map epoch that we've seen if any were
2350  * cancelled.
2351  */
2352 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2353 {
2354 	struct rb_node *n;
2355 	bool victims = false;
2356 
2357 	dout("enter abort_on_full\n");
2358 
2359 	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
2360 		goto out;
2361 
2362 	/* Scan list and see if there is anything to abort */
2363 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2364 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2365 		struct rb_node *m;
2366 
2367 		m = rb_first(&osd->o_requests);
2368 		while (m) {
2369 			struct ceph_osd_request *req = rb_entry(m,
2370 					struct ceph_osd_request, r_node);
2371 			m = rb_next(m);
2372 
2373 			if (req->r_abort_on_full) {
2374 				victims = true;
2375 				break;
2376 			}
2377 		}
2378 		if (victims)
2379 			break;
2380 	}
2381 
2382 	if (!victims)
2383 		goto out;
2384 
2385 	/*
2386 	 * Update the barrier to current epoch if it's behind that point,
2387 	 * since we know we have some calls to be aborted in the tree.
2388 	 */
2389 	update_epoch_barrier(osdc, osdc->osdmap->epoch);
2390 
2391 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2392 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2393 		struct rb_node *m;
2394 
2395 		m = rb_first(&osd->o_requests);
2396 		while (m) {
2397 			struct ceph_osd_request *req = rb_entry(m,
2398 					struct ceph_osd_request, r_node);
2399 			m = rb_next(m);
2400 
2401 			if (req->r_abort_on_full &&
2402 			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2403 			     pool_full(osdc, req->r_t.target_oloc.pool)))
2404 				abort_request(req, -ENOSPC);
2405 		}
2406 	}
2407 out:
2408 	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
2409 }
2410 
2411 static void check_pool_dne(struct ceph_osd_request *req)
2412 {
2413 	struct ceph_osd_client *osdc = req->r_osdc;
2414 	struct ceph_osdmap *map = osdc->osdmap;
2415 
2416 	verify_osdc_wrlocked(osdc);
2417 	WARN_ON(!map->epoch);
2418 
2419 	if (req->r_attempts) {
2420 		/*
2421 		 * We sent a request earlier, which means that
2422 		 * previously the pool existed, and now it does not
2423 		 * (i.e., it was deleted).
2424 		 */
2425 		req->r_map_dne_bound = map->epoch;
2426 		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2427 		     req->r_tid);
2428 	} else {
2429 		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2430 		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
2431 	}
2432 
2433 	if (req->r_map_dne_bound) {
2434 		if (map->epoch >= req->r_map_dne_bound) {
2435 			/* we had a new enough map */
2436 			pr_info_ratelimited("tid %llu pool does not exist\n",
2437 					    req->r_tid);
2438 			complete_request(req, -ENOENT);
2439 		}
2440 	} else {
2441 		send_map_check(req);
2442 	}
2443 }
2444 
2445 static void map_check_cb(struct ceph_mon_generic_request *greq)
2446 {
2447 	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2448 	struct ceph_osd_request *req;
2449 	u64 tid = greq->private_data;
2450 
2451 	WARN_ON(greq->result || !greq->u.newest);
2452 
2453 	down_write(&osdc->lock);
2454 	req = lookup_request_mc(&osdc->map_checks, tid);
2455 	if (!req) {
2456 		dout("%s tid %llu dne\n", __func__, tid);
2457 		goto out_unlock;
2458 	}
2459 
2460 	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2461 	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2462 	if (!req->r_map_dne_bound)
2463 		req->r_map_dne_bound = greq->u.newest;
2464 	erase_request_mc(&osdc->map_checks, req);
2465 	check_pool_dne(req);
2466 
2467 	ceph_osdc_put_request(req);
2468 out_unlock:
2469 	up_write(&osdc->lock);
2470 }
2471 
2472 static void send_map_check(struct ceph_osd_request *req)
2473 {
2474 	struct ceph_osd_client *osdc = req->r_osdc;
2475 	struct ceph_osd_request *lookup_req;
2476 	int ret;
2477 
2478 	verify_osdc_wrlocked(osdc);
2479 
2480 	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2481 	if (lookup_req) {
2482 		WARN_ON(lookup_req != req);
2483 		return;
2484 	}
2485 
2486 	ceph_osdc_get_request(req);
2487 	insert_request_mc(&osdc->map_checks, req);
2488 	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2489 					  map_check_cb, req->r_tid);
2490 	WARN_ON(ret);
2491 }
2492 
2493 /*
2494  * lingering requests, watch/notify v2 infrastructure
2495  */
2496 static void linger_release(struct kref *kref)
2497 {
2498 	struct ceph_osd_linger_request *lreq =
2499 	    container_of(kref, struct ceph_osd_linger_request, kref);
2500 
2501 	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2502 	     lreq->reg_req, lreq->ping_req);
2503 	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2504 	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2505 	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2506 	WARN_ON(!list_empty(&lreq->scan_item));
2507 	WARN_ON(!list_empty(&lreq->pending_lworks));
2508 	WARN_ON(lreq->osd);
2509 
2510 	if (lreq->reg_req)
2511 		ceph_osdc_put_request(lreq->reg_req);
2512 	if (lreq->ping_req)
2513 		ceph_osdc_put_request(lreq->ping_req);
2514 	target_destroy(&lreq->t);
2515 	kfree(lreq);
2516 }
2517 
2518 static void linger_put(struct ceph_osd_linger_request *lreq)
2519 {
2520 	if (lreq)
2521 		kref_put(&lreq->kref, linger_release);
2522 }
2523 
2524 static struct ceph_osd_linger_request *
2525 linger_get(struct ceph_osd_linger_request *lreq)
2526 {
2527 	kref_get(&lreq->kref);
2528 	return lreq;
2529 }
2530 
2531 static struct ceph_osd_linger_request *
2532 linger_alloc(struct ceph_osd_client *osdc)
2533 {
2534 	struct ceph_osd_linger_request *lreq;
2535 
2536 	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2537 	if (!lreq)
2538 		return NULL;
2539 
2540 	kref_init(&lreq->kref);
2541 	mutex_init(&lreq->lock);
2542 	RB_CLEAR_NODE(&lreq->node);
2543 	RB_CLEAR_NODE(&lreq->osdc_node);
2544 	RB_CLEAR_NODE(&lreq->mc_node);
2545 	INIT_LIST_HEAD(&lreq->scan_item);
2546 	INIT_LIST_HEAD(&lreq->pending_lworks);
2547 	init_completion(&lreq->reg_commit_wait);
2548 	init_completion(&lreq->notify_finish_wait);
2549 
2550 	lreq->osdc = osdc;
2551 	target_init(&lreq->t);
2552 
2553 	dout("%s lreq %p\n", __func__, lreq);
2554 	return lreq;
2555 }
2556 
2557 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2558 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2559 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2560 
2561 /*
2562  * Create linger request <-> OSD session relation.
2563  *
2564  * @lreq has to be registered, @osd may be homeless.
2565  */
2566 static void link_linger(struct ceph_osd *osd,
2567 			struct ceph_osd_linger_request *lreq)
2568 {
2569 	verify_osd_locked(osd);
2570 	WARN_ON(!lreq->linger_id || lreq->osd);
2571 	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2572 	     osd->o_osd, lreq, lreq->linger_id);
2573 
2574 	if (!osd_homeless(osd))
2575 		__remove_osd_from_lru(osd);
2576 	else
2577 		atomic_inc(&osd->o_osdc->num_homeless);
2578 
2579 	get_osd(osd);
2580 	insert_linger(&osd->o_linger_requests, lreq);
2581 	lreq->osd = osd;
2582 }
2583 
2584 static void unlink_linger(struct ceph_osd *osd,
2585 			  struct ceph_osd_linger_request *lreq)
2586 {
2587 	verify_osd_locked(osd);
2588 	WARN_ON(lreq->osd != osd);
2589 	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2590 	     osd->o_osd, lreq, lreq->linger_id);
2591 
2592 	lreq->osd = NULL;
2593 	erase_linger(&osd->o_linger_requests, lreq);
2594 	put_osd(osd);
2595 
2596 	if (!osd_homeless(osd))
2597 		maybe_move_osd_to_lru(osd);
2598 	else
2599 		atomic_dec(&osd->o_osdc->num_homeless);
2600 }
2601 
2602 static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2603 {
2604 	verify_osdc_locked(lreq->osdc);
2605 
2606 	return !RB_EMPTY_NODE(&lreq->osdc_node);
2607 }
2608 
2609 static bool linger_registered(struct ceph_osd_linger_request *lreq)
2610 {
2611 	struct ceph_osd_client *osdc = lreq->osdc;
2612 	bool registered;
2613 
2614 	down_read(&osdc->lock);
2615 	registered = __linger_registered(lreq);
2616 	up_read(&osdc->lock);
2617 
2618 	return registered;
2619 }
2620 
2621 static void linger_register(struct ceph_osd_linger_request *lreq)
2622 {
2623 	struct ceph_osd_client *osdc = lreq->osdc;
2624 
2625 	verify_osdc_wrlocked(osdc);
2626 	WARN_ON(lreq->linger_id);
2627 
2628 	linger_get(lreq);
2629 	lreq->linger_id = ++osdc->last_linger_id;
2630 	insert_linger_osdc(&osdc->linger_requests, lreq);
2631 }
2632 
2633 static void linger_unregister(struct ceph_osd_linger_request *lreq)
2634 {
2635 	struct ceph_osd_client *osdc = lreq->osdc;
2636 
2637 	verify_osdc_wrlocked(osdc);
2638 
2639 	erase_linger_osdc(&osdc->linger_requests, lreq);
2640 	linger_put(lreq);
2641 }
2642 
2643 static void cancel_linger_request(struct ceph_osd_request *req)
2644 {
2645 	struct ceph_osd_linger_request *lreq = req->r_priv;
2646 
2647 	WARN_ON(!req->r_linger);
2648 	cancel_request(req);
2649 	linger_put(lreq);
2650 }
2651 
2652 struct linger_work {
2653 	struct work_struct work;
2654 	struct ceph_osd_linger_request *lreq;
2655 	struct list_head pending_item;
2656 	unsigned long queued_stamp;
2657 
2658 	union {
2659 		struct {
2660 			u64 notify_id;
2661 			u64 notifier_id;
2662 			void *payload; /* points into @msg front */
2663 			size_t payload_len;
2664 
2665 			struct ceph_msg *msg; /* for ceph_msg_put() */
2666 		} notify;
2667 		struct {
2668 			int err;
2669 		} error;
2670 	};
2671 };
2672 
2673 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2674 				       work_func_t workfn)
2675 {
2676 	struct linger_work *lwork;
2677 
2678 	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2679 	if (!lwork)
2680 		return NULL;
2681 
2682 	INIT_WORK(&lwork->work, workfn);
2683 	INIT_LIST_HEAD(&lwork->pending_item);
2684 	lwork->lreq = linger_get(lreq);
2685 
2686 	return lwork;
2687 }
2688 
2689 static void lwork_free(struct linger_work *lwork)
2690 {
2691 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2692 
2693 	mutex_lock(&lreq->lock);
2694 	list_del(&lwork->pending_item);
2695 	mutex_unlock(&lreq->lock);
2696 
2697 	linger_put(lreq);
2698 	kfree(lwork);
2699 }
2700 
2701 static void lwork_queue(struct linger_work *lwork)
2702 {
2703 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2704 	struct ceph_osd_client *osdc = lreq->osdc;
2705 
2706 	verify_lreq_locked(lreq);
2707 	WARN_ON(!list_empty(&lwork->pending_item));
2708 
2709 	lwork->queued_stamp = jiffies;
2710 	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2711 	queue_work(osdc->notify_wq, &lwork->work);
2712 }
2713 
2714 static void do_watch_notify(struct work_struct *w)
2715 {
2716 	struct linger_work *lwork = container_of(w, struct linger_work, work);
2717 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2718 
2719 	if (!linger_registered(lreq)) {
2720 		dout("%s lreq %p not registered\n", __func__, lreq);
2721 		goto out;
2722 	}
2723 
2724 	WARN_ON(!lreq->is_watch);
2725 	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2726 	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
2727 	     lwork->notify.payload_len);
2728 	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
2729 		  lwork->notify.notifier_id, lwork->notify.payload,
2730 		  lwork->notify.payload_len);
2731 
2732 out:
2733 	ceph_msg_put(lwork->notify.msg);
2734 	lwork_free(lwork);
2735 }
2736 
2737 static void do_watch_error(struct work_struct *w)
2738 {
2739 	struct linger_work *lwork = container_of(w, struct linger_work, work);
2740 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2741 
2742 	if (!linger_registered(lreq)) {
2743 		dout("%s lreq %p not registered\n", __func__, lreq);
2744 		goto out;
2745 	}
2746 
2747 	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
2748 	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
2749 
2750 out:
2751 	lwork_free(lwork);
2752 }
2753 
2754 static void queue_watch_error(struct ceph_osd_linger_request *lreq)
2755 {
2756 	struct linger_work *lwork;
2757 
2758 	lwork = lwork_alloc(lreq, do_watch_error);
2759 	if (!lwork) {
2760 		pr_err("failed to allocate error-lwork\n");
2761 		return;
2762 	}
2763 
2764 	lwork->error.err = lreq->last_error;
2765 	lwork_queue(lwork);
2766 }
2767 
2768 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
2769 				       int result)
2770 {
2771 	if (!completion_done(&lreq->reg_commit_wait)) {
2772 		lreq->reg_commit_error = (result <= 0 ? result : 0);
2773 		complete_all(&lreq->reg_commit_wait);
2774 	}
2775 }
2776 
2777 static void linger_commit_cb(struct ceph_osd_request *req)
2778 {
2779 	struct ceph_osd_linger_request *lreq = req->r_priv;
2780 
2781 	mutex_lock(&lreq->lock);
2782 	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
2783 	     lreq->linger_id, req->r_result);
2784 	linger_reg_commit_complete(lreq, req->r_result);
2785 	lreq->committed = true;
2786 
2787 	if (!lreq->is_watch) {
2788 		struct ceph_osd_data *osd_data =
2789 		    osd_req_op_data(req, 0, notify, response_data);
2790 		void *p = page_address(osd_data->pages[0]);
2791 
2792 		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
2793 			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
2794 
2795 		/* make note of the notify_id */
2796 		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
2797 			lreq->notify_id = ceph_decode_64(&p);
2798 			dout("lreq %p notify_id %llu\n", lreq,
2799 			     lreq->notify_id);
2800 		} else {
2801 			dout("lreq %p no notify_id\n", lreq);
2802 		}
2803 	}
2804 
2805 	mutex_unlock(&lreq->lock);
2806 	linger_put(lreq);
2807 }
2808 
2809 static int normalize_watch_error(int err)
2810 {
2811 	/*
2812 	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
2813 	 * notification and a failure to reconnect because we raced with
2814 	 * the delete appear the same to the user.
2815 	 */
2816 	if (err == -ENOENT)
2817 		err = -ENOTCONN;
2818 
2819 	return err;
2820 }
2821 
2822 static void linger_reconnect_cb(struct ceph_osd_request *req)
2823 {
2824 	struct ceph_osd_linger_request *lreq = req->r_priv;
2825 
2826 	mutex_lock(&lreq->lock);
2827 	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
2828 	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
2829 	if (req->r_result < 0) {
2830 		if (!lreq->last_error) {
2831 			lreq->last_error = normalize_watch_error(req->r_result);
2832 			queue_watch_error(lreq);
2833 		}
2834 	}
2835 
2836 	mutex_unlock(&lreq->lock);
2837 	linger_put(lreq);
2838 }
2839 
2840 static void send_linger(struct ceph_osd_linger_request *lreq)
2841 {
2842 	struct ceph_osd_request *req = lreq->reg_req;
2843 	struct ceph_osd_req_op *op = &req->r_ops[0];
2844 
2845 	verify_osdc_wrlocked(req->r_osdc);
2846 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2847 
2848 	if (req->r_osd)
2849 		cancel_linger_request(req);
2850 
2851 	request_reinit(req);
2852 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
2853 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
2854 	req->r_flags = lreq->t.flags;
2855 	req->r_mtime = lreq->mtime;
2856 
2857 	mutex_lock(&lreq->lock);
2858 	if (lreq->is_watch && lreq->committed) {
2859 		WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2860 			op->watch.cookie != lreq->linger_id);
2861 		op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
2862 		op->watch.gen = ++lreq->register_gen;
2863 		dout("lreq %p reconnect register_gen %u\n", lreq,
2864 		     op->watch.gen);
2865 		req->r_callback = linger_reconnect_cb;
2866 	} else {
2867 		if (!lreq->is_watch)
2868 			lreq->notify_id = 0;
2869 		else
2870 			WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
2871 		dout("lreq %p register\n", lreq);
2872 		req->r_callback = linger_commit_cb;
2873 	}
2874 	mutex_unlock(&lreq->lock);
2875 
2876 	req->r_priv = linger_get(lreq);
2877 	req->r_linger = true;
2878 
2879 	submit_request(req, true);
2880 }
2881 
2882 static void linger_ping_cb(struct ceph_osd_request *req)
2883 {
2884 	struct ceph_osd_linger_request *lreq = req->r_priv;
2885 
2886 	mutex_lock(&lreq->lock);
2887 	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
2888 	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
2889 	     lreq->last_error);
2890 	if (lreq->register_gen == req->r_ops[0].watch.gen) {
2891 		if (!req->r_result) {
2892 			lreq->watch_valid_thru = lreq->ping_sent;
2893 		} else if (!lreq->last_error) {
2894 			lreq->last_error = normalize_watch_error(req->r_result);
2895 			queue_watch_error(lreq);
2896 		}
2897 	} else {
2898 		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
2899 		     lreq->register_gen, req->r_ops[0].watch.gen);
2900 	}
2901 
2902 	mutex_unlock(&lreq->lock);
2903 	linger_put(lreq);
2904 }
2905 
2906 static void send_linger_ping(struct ceph_osd_linger_request *lreq)
2907 {
2908 	struct ceph_osd_client *osdc = lreq->osdc;
2909 	struct ceph_osd_request *req = lreq->ping_req;
2910 	struct ceph_osd_req_op *op = &req->r_ops[0];
2911 
2912 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2913 		dout("%s PAUSERD\n", __func__);
2914 		return;
2915 	}
2916 
2917 	lreq->ping_sent = jiffies;
2918 	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
2919 	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
2920 	     lreq->register_gen);
2921 
2922 	if (req->r_osd)
2923 		cancel_linger_request(req);
2924 
2925 	request_reinit(req);
2926 	target_copy(&req->r_t, &lreq->t);
2927 
2928 	WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2929 		op->watch.cookie != lreq->linger_id ||
2930 		op->watch.op != CEPH_OSD_WATCH_OP_PING);
2931 	op->watch.gen = lreq->register_gen;
2932 	req->r_callback = linger_ping_cb;
2933 	req->r_priv = linger_get(lreq);
2934 	req->r_linger = true;
2935 
2936 	ceph_osdc_get_request(req);
2937 	account_request(req);
2938 	req->r_tid = atomic64_inc_return(&osdc->last_tid);
2939 	link_request(lreq->osd, req);
2940 	send_request(req);
2941 }
2942 
2943 static void linger_submit(struct ceph_osd_linger_request *lreq)
2944 {
2945 	struct ceph_osd_client *osdc = lreq->osdc;
2946 	struct ceph_osd *osd;
2947 
2948 	calc_target(osdc, &lreq->t, NULL, false);
2949 	osd = lookup_create_osd(osdc, lreq->t.osd, true);
2950 	link_linger(osd, lreq);
2951 
2952 	send_linger(lreq);
2953 }
2954 
2955 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
2956 {
2957 	struct ceph_osd_client *osdc = lreq->osdc;
2958 	struct ceph_osd_linger_request *lookup_lreq;
2959 
2960 	verify_osdc_wrlocked(osdc);
2961 
2962 	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
2963 				       lreq->linger_id);
2964 	if (!lookup_lreq)
2965 		return;
2966 
2967 	WARN_ON(lookup_lreq != lreq);
2968 	erase_linger_mc(&osdc->linger_map_checks, lreq);
2969 	linger_put(lreq);
2970 }
2971 
2972 /*
2973  * @lreq has to be both registered and linked.
2974  */
2975 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
2976 {
2977 	if (lreq->is_watch && lreq->ping_req->r_osd)
2978 		cancel_linger_request(lreq->ping_req);
2979 	if (lreq->reg_req->r_osd)
2980 		cancel_linger_request(lreq->reg_req);
2981 	cancel_linger_map_check(lreq);
2982 	unlink_linger(lreq->osd, lreq);
2983 	linger_unregister(lreq);
2984 }
2985 
2986 static void linger_cancel(struct ceph_osd_linger_request *lreq)
2987 {
2988 	struct ceph_osd_client *osdc = lreq->osdc;
2989 
2990 	down_write(&osdc->lock);
2991 	if (__linger_registered(lreq))
2992 		__linger_cancel(lreq);
2993 	up_write(&osdc->lock);
2994 }
2995 
2996 static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
2997 
2998 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
2999 {
3000 	struct ceph_osd_client *osdc = lreq->osdc;
3001 	struct ceph_osdmap *map = osdc->osdmap;
3002 
3003 	verify_osdc_wrlocked(osdc);
3004 	WARN_ON(!map->epoch);
3005 
3006 	if (lreq->register_gen) {
3007 		lreq->map_dne_bound = map->epoch;
3008 		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3009 		     lreq, lreq->linger_id);
3010 	} else {
3011 		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3012 		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3013 		     map->epoch);
3014 	}
3015 
3016 	if (lreq->map_dne_bound) {
3017 		if (map->epoch >= lreq->map_dne_bound) {
3018 			/* we had a new enough map */
3019 			pr_info("linger_id %llu pool does not exist\n",
3020 				lreq->linger_id);
3021 			linger_reg_commit_complete(lreq, -ENOENT);
3022 			__linger_cancel(lreq);
3023 		}
3024 	} else {
3025 		send_linger_map_check(lreq);
3026 	}
3027 }
3028 
3029 static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3030 {
3031 	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3032 	struct ceph_osd_linger_request *lreq;
3033 	u64 linger_id = greq->private_data;
3034 
3035 	WARN_ON(greq->result || !greq->u.newest);
3036 
3037 	down_write(&osdc->lock);
3038 	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3039 	if (!lreq) {
3040 		dout("%s linger_id %llu dne\n", __func__, linger_id);
3041 		goto out_unlock;
3042 	}
3043 
3044 	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3045 	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3046 	     greq->u.newest);
3047 	if (!lreq->map_dne_bound)
3048 		lreq->map_dne_bound = greq->u.newest;
3049 	erase_linger_mc(&osdc->linger_map_checks, lreq);
3050 	check_linger_pool_dne(lreq);
3051 
3052 	linger_put(lreq);
3053 out_unlock:
3054 	up_write(&osdc->lock);
3055 }
3056 
3057 static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3058 {
3059 	struct ceph_osd_client *osdc = lreq->osdc;
3060 	struct ceph_osd_linger_request *lookup_lreq;
3061 	int ret;
3062 
3063 	verify_osdc_wrlocked(osdc);
3064 
3065 	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3066 				       lreq->linger_id);
3067 	if (lookup_lreq) {
3068 		WARN_ON(lookup_lreq != lreq);
3069 		return;
3070 	}
3071 
3072 	linger_get(lreq);
3073 	insert_linger_mc(&osdc->linger_map_checks, lreq);
3074 	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3075 					  linger_map_check_cb, lreq->linger_id);
3076 	WARN_ON(ret);
3077 }
3078 
3079 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3080 {
3081 	int ret;
3082 
3083 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3084 	ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
3085 	return ret ?: lreq->reg_commit_error;
3086 }
3087 
3088 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
3089 {
3090 	int ret;
3091 
3092 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3093 	ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
3094 	return ret ?: lreq->notify_finish_error;
3095 }
3096 
3097 /*
3098  * Timeout callback, called every N seconds.  When 1 or more OSD
3099  * requests has been active for more than N seconds, we send a keepalive
3100  * (tag + timestamp) to its OSD to ensure any communications channel
3101  * reset is detected.
3102  */
3103 static void handle_timeout(struct work_struct *work)
3104 {
3105 	struct ceph_osd_client *osdc =
3106 		container_of(work, struct ceph_osd_client, timeout_work.work);
3107 	struct ceph_options *opts = osdc->client->options;
3108 	unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3109 	unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3110 	LIST_HEAD(slow_osds);
3111 	struct rb_node *n, *p;
3112 
3113 	dout("%s osdc %p\n", __func__, osdc);
3114 	down_write(&osdc->lock);
3115 
3116 	/*
3117 	 * ping osds that are a bit slow.  this ensures that if there
3118 	 * is a break in the TCP connection we will notice, and reopen
3119 	 * a connection with that osd (from the fault callback).
3120 	 */
3121 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3122 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3123 		bool found = false;
3124 
3125 		for (p = rb_first(&osd->o_requests); p; ) {
3126 			struct ceph_osd_request *req =
3127 			    rb_entry(p, struct ceph_osd_request, r_node);
3128 
3129 			p = rb_next(p); /* abort_request() */
3130 
3131 			if (time_before(req->r_stamp, cutoff)) {
3132 				dout(" req %p tid %llu on osd%d is laggy\n",
3133 				     req, req->r_tid, osd->o_osd);
3134 				found = true;
3135 			}
3136 			if (opts->osd_request_timeout &&
3137 			    time_before(req->r_start_stamp, expiry_cutoff)) {
3138 				pr_err_ratelimited("tid %llu on osd%d timeout\n",
3139 				       req->r_tid, osd->o_osd);
3140 				abort_request(req, -ETIMEDOUT);
3141 			}
3142 		}
3143 		for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3144 			struct ceph_osd_linger_request *lreq =
3145 			    rb_entry(p, struct ceph_osd_linger_request, node);
3146 
3147 			dout(" lreq %p linger_id %llu is served by osd%d\n",
3148 			     lreq, lreq->linger_id, osd->o_osd);
3149 			found = true;
3150 
3151 			mutex_lock(&lreq->lock);
3152 			if (lreq->is_watch && lreq->committed && !lreq->last_error)
3153 				send_linger_ping(lreq);
3154 			mutex_unlock(&lreq->lock);
3155 		}
3156 
3157 		if (found)
3158 			list_move_tail(&osd->o_keepalive_item, &slow_osds);
3159 	}
3160 
3161 	if (opts->osd_request_timeout) {
3162 		for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3163 			struct ceph_osd_request *req =
3164 			    rb_entry(p, struct ceph_osd_request, r_node);
3165 
3166 			p = rb_next(p); /* abort_request() */
3167 
3168 			if (time_before(req->r_start_stamp, expiry_cutoff)) {
3169 				pr_err_ratelimited("tid %llu on osd%d timeout\n",
3170 				       req->r_tid, osdc->homeless_osd.o_osd);
3171 				abort_request(req, -ETIMEDOUT);
3172 			}
3173 		}
3174 	}
3175 
3176 	if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3177 		maybe_request_map(osdc);
3178 
3179 	while (!list_empty(&slow_osds)) {
3180 		struct ceph_osd *osd = list_first_entry(&slow_osds,
3181 							struct ceph_osd,
3182 							o_keepalive_item);
3183 		list_del_init(&osd->o_keepalive_item);
3184 		ceph_con_keepalive(&osd->o_con);
3185 	}
3186 
3187 	up_write(&osdc->lock);
3188 	schedule_delayed_work(&osdc->timeout_work,
3189 			      osdc->client->options->osd_keepalive_timeout);
3190 }
3191 
3192 static void handle_osds_timeout(struct work_struct *work)
3193 {
3194 	struct ceph_osd_client *osdc =
3195 		container_of(work, struct ceph_osd_client,
3196 			     osds_timeout_work.work);
3197 	unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3198 	struct ceph_osd *osd, *nosd;
3199 
3200 	dout("%s osdc %p\n", __func__, osdc);
3201 	down_write(&osdc->lock);
3202 	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3203 		if (time_before(jiffies, osd->lru_ttl))
3204 			break;
3205 
3206 		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3207 		WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3208 		close_osd(osd);
3209 	}
3210 
3211 	up_write(&osdc->lock);
3212 	schedule_delayed_work(&osdc->osds_timeout_work,
3213 			      round_jiffies_relative(delay));
3214 }
3215 
3216 static int ceph_oloc_decode(void **p, void *end,
3217 			    struct ceph_object_locator *oloc)
3218 {
3219 	u8 struct_v, struct_cv;
3220 	u32 len;
3221 	void *struct_end;
3222 	int ret = 0;
3223 
3224 	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3225 	struct_v = ceph_decode_8(p);
3226 	struct_cv = ceph_decode_8(p);
3227 	if (struct_v < 3) {
3228 		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3229 			struct_v, struct_cv);
3230 		goto e_inval;
3231 	}
3232 	if (struct_cv > 6) {
3233 		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3234 			struct_v, struct_cv);
3235 		goto e_inval;
3236 	}
3237 	len = ceph_decode_32(p);
3238 	ceph_decode_need(p, end, len, e_inval);
3239 	struct_end = *p + len;
3240 
3241 	oloc->pool = ceph_decode_64(p);
3242 	*p += 4; /* skip preferred */
3243 
3244 	len = ceph_decode_32(p);
3245 	if (len > 0) {
3246 		pr_warn("ceph_object_locator::key is set\n");
3247 		goto e_inval;
3248 	}
3249 
3250 	if (struct_v >= 5) {
3251 		bool changed = false;
3252 
3253 		len = ceph_decode_32(p);
3254 		if (len > 0) {
3255 			ceph_decode_need(p, end, len, e_inval);
3256 			if (!oloc->pool_ns ||
3257 			    ceph_compare_string(oloc->pool_ns, *p, len))
3258 				changed = true;
3259 			*p += len;
3260 		} else {
3261 			if (oloc->pool_ns)
3262 				changed = true;
3263 		}
3264 		if (changed) {
3265 			/* redirect changes namespace */
3266 			pr_warn("ceph_object_locator::nspace is changed\n");
3267 			goto e_inval;
3268 		}
3269 	}
3270 
3271 	if (struct_v >= 6) {
3272 		s64 hash = ceph_decode_64(p);
3273 		if (hash != -1) {
3274 			pr_warn("ceph_object_locator::hash is set\n");
3275 			goto e_inval;
3276 		}
3277 	}
3278 
3279 	/* skip the rest */
3280 	*p = struct_end;
3281 out:
3282 	return ret;
3283 
3284 e_inval:
3285 	ret = -EINVAL;
3286 	goto out;
3287 }
3288 
3289 static int ceph_redirect_decode(void **p, void *end,
3290 				struct ceph_request_redirect *redir)
3291 {
3292 	u8 struct_v, struct_cv;
3293 	u32 len;
3294 	void *struct_end;
3295 	int ret;
3296 
3297 	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3298 	struct_v = ceph_decode_8(p);
3299 	struct_cv = ceph_decode_8(p);
3300 	if (struct_cv > 1) {
3301 		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3302 			struct_v, struct_cv);
3303 		goto e_inval;
3304 	}
3305 	len = ceph_decode_32(p);
3306 	ceph_decode_need(p, end, len, e_inval);
3307 	struct_end = *p + len;
3308 
3309 	ret = ceph_oloc_decode(p, end, &redir->oloc);
3310 	if (ret)
3311 		goto out;
3312 
3313 	len = ceph_decode_32(p);
3314 	if (len > 0) {
3315 		pr_warn("ceph_request_redirect::object_name is set\n");
3316 		goto e_inval;
3317 	}
3318 
3319 	len = ceph_decode_32(p);
3320 	*p += len; /* skip osd_instructions */
3321 
3322 	/* skip the rest */
3323 	*p = struct_end;
3324 out:
3325 	return ret;
3326 
3327 e_inval:
3328 	ret = -EINVAL;
3329 	goto out;
3330 }
3331 
3332 struct MOSDOpReply {
3333 	struct ceph_pg pgid;
3334 	u64 flags;
3335 	int result;
3336 	u32 epoch;
3337 	int num_ops;
3338 	u32 outdata_len[CEPH_OSD_MAX_OPS];
3339 	s32 rval[CEPH_OSD_MAX_OPS];
3340 	int retry_attempt;
3341 	struct ceph_eversion replay_version;
3342 	u64 user_version;
3343 	struct ceph_request_redirect redirect;
3344 };
3345 
3346 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3347 {
3348 	void *p = msg->front.iov_base;
3349 	void *const end = p + msg->front.iov_len;
3350 	u16 version = le16_to_cpu(msg->hdr.version);
3351 	struct ceph_eversion bad_replay_version;
3352 	u8 decode_redir;
3353 	u32 len;
3354 	int ret;
3355 	int i;
3356 
3357 	ceph_decode_32_safe(&p, end, len, e_inval);
3358 	ceph_decode_need(&p, end, len, e_inval);
3359 	p += len; /* skip oid */
3360 
3361 	ret = ceph_decode_pgid(&p, end, &m->pgid);
3362 	if (ret)
3363 		return ret;
3364 
3365 	ceph_decode_64_safe(&p, end, m->flags, e_inval);
3366 	ceph_decode_32_safe(&p, end, m->result, e_inval);
3367 	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3368 	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3369 	p += sizeof(bad_replay_version);
3370 	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3371 
3372 	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3373 	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3374 		goto e_inval;
3375 
3376 	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3377 			 e_inval);
3378 	for (i = 0; i < m->num_ops; i++) {
3379 		struct ceph_osd_op *op = p;
3380 
3381 		m->outdata_len[i] = le32_to_cpu(op->payload_len);
3382 		p += sizeof(*op);
3383 	}
3384 
3385 	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3386 	for (i = 0; i < m->num_ops; i++)
3387 		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3388 
3389 	if (version >= 5) {
3390 		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3391 		memcpy(&m->replay_version, p, sizeof(m->replay_version));
3392 		p += sizeof(m->replay_version);
3393 		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3394 	} else {
3395 		m->replay_version = bad_replay_version; /* struct */
3396 		m->user_version = le64_to_cpu(m->replay_version.version);
3397 	}
3398 
3399 	if (version >= 6) {
3400 		if (version >= 7)
3401 			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3402 		else
3403 			decode_redir = 1;
3404 	} else {
3405 		decode_redir = 0;
3406 	}
3407 
3408 	if (decode_redir) {
3409 		ret = ceph_redirect_decode(&p, end, &m->redirect);
3410 		if (ret)
3411 			return ret;
3412 	} else {
3413 		ceph_oloc_init(&m->redirect.oloc);
3414 	}
3415 
3416 	return 0;
3417 
3418 e_inval:
3419 	return -EINVAL;
3420 }
3421 
3422 /*
3423  * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
3424  * specified.
3425  */
3426 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3427 {
3428 	struct ceph_osd_client *osdc = osd->o_osdc;
3429 	struct ceph_osd_request *req;
3430 	struct MOSDOpReply m;
3431 	u64 tid = le64_to_cpu(msg->hdr.tid);
3432 	u32 data_len = 0;
3433 	int ret;
3434 	int i;
3435 
3436 	dout("%s msg %p tid %llu\n", __func__, msg, tid);
3437 
3438 	down_read(&osdc->lock);
3439 	if (!osd_registered(osd)) {
3440 		dout("%s osd%d unknown\n", __func__, osd->o_osd);
3441 		goto out_unlock_osdc;
3442 	}
3443 	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3444 
3445 	mutex_lock(&osd->lock);
3446 	req = lookup_request(&osd->o_requests, tid);
3447 	if (!req) {
3448 		dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3449 		goto out_unlock_session;
3450 	}
3451 
3452 	m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3453 	ret = decode_MOSDOpReply(msg, &m);
3454 	m.redirect.oloc.pool_ns = NULL;
3455 	if (ret) {
3456 		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3457 		       req->r_tid, ret);
3458 		ceph_msg_dump(msg);
3459 		goto fail_request;
3460 	}
3461 	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3462 	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3463 	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3464 	     le64_to_cpu(m.replay_version.version), m.user_version);
3465 
3466 	if (m.retry_attempt >= 0) {
3467 		if (m.retry_attempt != req->r_attempts - 1) {
3468 			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3469 			     req, req->r_tid, m.retry_attempt,
3470 			     req->r_attempts - 1);
3471 			goto out_unlock_session;
3472 		}
3473 	} else {
3474 		WARN_ON(1); /* MOSDOpReply v4 is assumed */
3475 	}
3476 
3477 	if (!ceph_oloc_empty(&m.redirect.oloc)) {
3478 		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3479 		     m.redirect.oloc.pool);
3480 		unlink_request(osd, req);
3481 		mutex_unlock(&osd->lock);
3482 
3483 		/*
3484 		 * Not ceph_oloc_copy() - changing pool_ns is not
3485 		 * supported.
3486 		 */
3487 		req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3488 		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
3489 		req->r_tid = 0;
3490 		__submit_request(req, false);
3491 		goto out_unlock_osdc;
3492 	}
3493 
3494 	if (m.num_ops != req->r_num_ops) {
3495 		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3496 		       req->r_num_ops, req->r_tid);
3497 		goto fail_request;
3498 	}
3499 	for (i = 0; i < req->r_num_ops; i++) {
3500 		dout(" req %p tid %llu op %d rval %d len %u\n", req,
3501 		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
3502 		req->r_ops[i].rval = m.rval[i];
3503 		req->r_ops[i].outdata_len = m.outdata_len[i];
3504 		data_len += m.outdata_len[i];
3505 	}
3506 	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3507 		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3508 		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
3509 		goto fail_request;
3510 	}
3511 	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3512 	     req, req->r_tid, m.result, data_len);
3513 
3514 	/*
3515 	 * Since we only ever request ONDISK, we should only ever get
3516 	 * one (type of) reply back.
3517 	 */
3518 	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3519 	req->r_result = m.result ?: data_len;
3520 	finish_request(req);
3521 	mutex_unlock(&osd->lock);
3522 	up_read(&osdc->lock);
3523 
3524 	__complete_request(req);
3525 	complete_all(&req->r_completion);
3526 	ceph_osdc_put_request(req);
3527 	return;
3528 
3529 fail_request:
3530 	complete_request(req, -EIO);
3531 out_unlock_session:
3532 	mutex_unlock(&osd->lock);
3533 out_unlock_osdc:
3534 	up_read(&osdc->lock);
3535 }
3536 
3537 static void set_pool_was_full(struct ceph_osd_client *osdc)
3538 {
3539 	struct rb_node *n;
3540 
3541 	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3542 		struct ceph_pg_pool_info *pi =
3543 		    rb_entry(n, struct ceph_pg_pool_info, node);
3544 
3545 		pi->was_full = __pool_full(pi);
3546 	}
3547 }
3548 
3549 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3550 {
3551 	struct ceph_pg_pool_info *pi;
3552 
3553 	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3554 	if (!pi)
3555 		return false;
3556 
3557 	return pi->was_full && !__pool_full(pi);
3558 }
3559 
3560 static enum calc_target_result
3561 recalc_linger_target(struct ceph_osd_linger_request *lreq)
3562 {
3563 	struct ceph_osd_client *osdc = lreq->osdc;
3564 	enum calc_target_result ct_res;
3565 
3566 	ct_res = calc_target(osdc, &lreq->t, NULL, true);
3567 	if (ct_res == CALC_TARGET_NEED_RESEND) {
3568 		struct ceph_osd *osd;
3569 
3570 		osd = lookup_create_osd(osdc, lreq->t.osd, true);
3571 		if (osd != lreq->osd) {
3572 			unlink_linger(lreq->osd, lreq);
3573 			link_linger(osd, lreq);
3574 		}
3575 	}
3576 
3577 	return ct_res;
3578 }
3579 
3580 /*
3581  * Requeue requests whose mapping to an OSD has changed.
3582  */
3583 static void scan_requests(struct ceph_osd *osd,
3584 			  bool force_resend,
3585 			  bool cleared_full,
3586 			  bool check_pool_cleared_full,
3587 			  struct rb_root *need_resend,
3588 			  struct list_head *need_resend_linger)
3589 {
3590 	struct ceph_osd_client *osdc = osd->o_osdc;
3591 	struct rb_node *n;
3592 	bool force_resend_writes;
3593 
3594 	for (n = rb_first(&osd->o_linger_requests); n; ) {
3595 		struct ceph_osd_linger_request *lreq =
3596 		    rb_entry(n, struct ceph_osd_linger_request, node);
3597 		enum calc_target_result ct_res;
3598 
3599 		n = rb_next(n); /* recalc_linger_target() */
3600 
3601 		dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3602 		     lreq->linger_id);
3603 		ct_res = recalc_linger_target(lreq);
3604 		switch (ct_res) {
3605 		case CALC_TARGET_NO_ACTION:
3606 			force_resend_writes = cleared_full ||
3607 			    (check_pool_cleared_full &&
3608 			     pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3609 			if (!force_resend && !force_resend_writes)
3610 				break;
3611 
3612 			/* fall through */
3613 		case CALC_TARGET_NEED_RESEND:
3614 			cancel_linger_map_check(lreq);
3615 			/*
3616 			 * scan_requests() for the previous epoch(s)
3617 			 * may have already added it to the list, since
3618 			 * it's not unlinked here.
3619 			 */
3620 			if (list_empty(&lreq->scan_item))
3621 				list_add_tail(&lreq->scan_item, need_resend_linger);
3622 			break;
3623 		case CALC_TARGET_POOL_DNE:
3624 			list_del_init(&lreq->scan_item);
3625 			check_linger_pool_dne(lreq);
3626 			break;
3627 		}
3628 	}
3629 
3630 	for (n = rb_first(&osd->o_requests); n; ) {
3631 		struct ceph_osd_request *req =
3632 		    rb_entry(n, struct ceph_osd_request, r_node);
3633 		enum calc_target_result ct_res;
3634 
3635 		n = rb_next(n); /* unlink_request(), check_pool_dne() */
3636 
3637 		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3638 		ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
3639 				     false);
3640 		switch (ct_res) {
3641 		case CALC_TARGET_NO_ACTION:
3642 			force_resend_writes = cleared_full ||
3643 			    (check_pool_cleared_full &&
3644 			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
3645 			if (!force_resend &&
3646 			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
3647 			     !force_resend_writes))
3648 				break;
3649 
3650 			/* fall through */
3651 		case CALC_TARGET_NEED_RESEND:
3652 			cancel_map_check(req);
3653 			unlink_request(osd, req);
3654 			insert_request(need_resend, req);
3655 			break;
3656 		case CALC_TARGET_POOL_DNE:
3657 			check_pool_dne(req);
3658 			break;
3659 		}
3660 	}
3661 }
3662 
3663 static int handle_one_map(struct ceph_osd_client *osdc,
3664 			  void *p, void *end, bool incremental,
3665 			  struct rb_root *need_resend,
3666 			  struct list_head *need_resend_linger)
3667 {
3668 	struct ceph_osdmap *newmap;
3669 	struct rb_node *n;
3670 	bool skipped_map = false;
3671 	bool was_full;
3672 
3673 	was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3674 	set_pool_was_full(osdc);
3675 
3676 	if (incremental)
3677 		newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
3678 	else
3679 		newmap = ceph_osdmap_decode(&p, end);
3680 	if (IS_ERR(newmap))
3681 		return PTR_ERR(newmap);
3682 
3683 	if (newmap != osdc->osdmap) {
3684 		/*
3685 		 * Preserve ->was_full before destroying the old map.
3686 		 * For pools that weren't in the old map, ->was_full
3687 		 * should be false.
3688 		 */
3689 		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
3690 			struct ceph_pg_pool_info *pi =
3691 			    rb_entry(n, struct ceph_pg_pool_info, node);
3692 			struct ceph_pg_pool_info *old_pi;
3693 
3694 			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
3695 			if (old_pi)
3696 				pi->was_full = old_pi->was_full;
3697 			else
3698 				WARN_ON(pi->was_full);
3699 		}
3700 
3701 		if (osdc->osdmap->epoch &&
3702 		    osdc->osdmap->epoch + 1 < newmap->epoch) {
3703 			WARN_ON(incremental);
3704 			skipped_map = true;
3705 		}
3706 
3707 		ceph_osdmap_destroy(osdc->osdmap);
3708 		osdc->osdmap = newmap;
3709 	}
3710 
3711 	was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3712 	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
3713 		      need_resend, need_resend_linger);
3714 
3715 	for (n = rb_first(&osdc->osds); n; ) {
3716 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3717 
3718 		n = rb_next(n); /* close_osd() */
3719 
3720 		scan_requests(osd, skipped_map, was_full, true, need_resend,
3721 			      need_resend_linger);
3722 		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
3723 		    memcmp(&osd->o_con.peer_addr,
3724 			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
3725 			   sizeof(struct ceph_entity_addr)))
3726 			close_osd(osd);
3727 	}
3728 
3729 	return 0;
3730 }
3731 
3732 static void kick_requests(struct ceph_osd_client *osdc,
3733 			  struct rb_root *need_resend,
3734 			  struct list_head *need_resend_linger)
3735 {
3736 	struct ceph_osd_linger_request *lreq, *nlreq;
3737 	enum calc_target_result ct_res;
3738 	struct rb_node *n;
3739 
3740 	/* make sure need_resend targets reflect latest map */
3741 	for (n = rb_first(need_resend); n; ) {
3742 		struct ceph_osd_request *req =
3743 		    rb_entry(n, struct ceph_osd_request, r_node);
3744 
3745 		n = rb_next(n);
3746 
3747 		if (req->r_t.epoch < osdc->osdmap->epoch) {
3748 			ct_res = calc_target(osdc, &req->r_t, NULL, false);
3749 			if (ct_res == CALC_TARGET_POOL_DNE) {
3750 				erase_request(need_resend, req);
3751 				check_pool_dne(req);
3752 			}
3753 		}
3754 	}
3755 
3756 	for (n = rb_first(need_resend); n; ) {
3757 		struct ceph_osd_request *req =
3758 		    rb_entry(n, struct ceph_osd_request, r_node);
3759 		struct ceph_osd *osd;
3760 
3761 		n = rb_next(n);
3762 		erase_request(need_resend, req); /* before link_request() */
3763 
3764 		osd = lookup_create_osd(osdc, req->r_t.osd, true);
3765 		link_request(osd, req);
3766 		if (!req->r_linger) {
3767 			if (!osd_homeless(osd) && !req->r_t.paused)
3768 				send_request(req);
3769 		} else {
3770 			cancel_linger_request(req);
3771 		}
3772 	}
3773 
3774 	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
3775 		if (!osd_homeless(lreq->osd))
3776 			send_linger(lreq);
3777 
3778 		list_del_init(&lreq->scan_item);
3779 	}
3780 }
3781 
3782 /*
3783  * Process updated osd map.
3784  *
3785  * The message contains any number of incremental and full maps, normally
3786  * indicating some sort of topology change in the cluster.  Kick requests
3787  * off to different OSDs as needed.
3788  */
3789 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
3790 {
3791 	void *p = msg->front.iov_base;
3792 	void *const end = p + msg->front.iov_len;
3793 	u32 nr_maps, maplen;
3794 	u32 epoch;
3795 	struct ceph_fsid fsid;
3796 	struct rb_root need_resend = RB_ROOT;
3797 	LIST_HEAD(need_resend_linger);
3798 	bool handled_incremental = false;
3799 	bool was_pauserd, was_pausewr;
3800 	bool pauserd, pausewr;
3801 	int err;
3802 
3803 	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
3804 	down_write(&osdc->lock);
3805 
3806 	/* verify fsid */
3807 	ceph_decode_need(&p, end, sizeof(fsid), bad);
3808 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
3809 	if (ceph_check_fsid(osdc->client, &fsid) < 0)
3810 		goto bad;
3811 
3812 	was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
3813 	was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
3814 		      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
3815 		      have_pool_full(osdc);
3816 
3817 	/* incremental maps */
3818 	ceph_decode_32_safe(&p, end, nr_maps, bad);
3819 	dout(" %d inc maps\n", nr_maps);
3820 	while (nr_maps > 0) {
3821 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3822 		epoch = ceph_decode_32(&p);
3823 		maplen = ceph_decode_32(&p);
3824 		ceph_decode_need(&p, end, maplen, bad);
3825 		if (osdc->osdmap->epoch &&
3826 		    osdc->osdmap->epoch + 1 == epoch) {
3827 			dout("applying incremental map %u len %d\n",
3828 			     epoch, maplen);
3829 			err = handle_one_map(osdc, p, p + maplen, true,
3830 					     &need_resend, &need_resend_linger);
3831 			if (err)
3832 				goto bad;
3833 			handled_incremental = true;
3834 		} else {
3835 			dout("ignoring incremental map %u len %d\n",
3836 			     epoch, maplen);
3837 		}
3838 		p += maplen;
3839 		nr_maps--;
3840 	}
3841 	if (handled_incremental)
3842 		goto done;
3843 
3844 	/* full maps */
3845 	ceph_decode_32_safe(&p, end, nr_maps, bad);
3846 	dout(" %d full maps\n", nr_maps);
3847 	while (nr_maps) {
3848 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3849 		epoch = ceph_decode_32(&p);
3850 		maplen = ceph_decode_32(&p);
3851 		ceph_decode_need(&p, end, maplen, bad);
3852 		if (nr_maps > 1) {
3853 			dout("skipping non-latest full map %u len %d\n",
3854 			     epoch, maplen);
3855 		} else if (osdc->osdmap->epoch >= epoch) {
3856 			dout("skipping full map %u len %d, "
3857 			     "older than our %u\n", epoch, maplen,
3858 			     osdc->osdmap->epoch);
3859 		} else {
3860 			dout("taking full map %u len %d\n", epoch, maplen);
3861 			err = handle_one_map(osdc, p, p + maplen, false,
3862 					     &need_resend, &need_resend_linger);
3863 			if (err)
3864 				goto bad;
3865 		}
3866 		p += maplen;
3867 		nr_maps--;
3868 	}
3869 
3870 done:
3871 	/*
3872 	 * subscribe to subsequent osdmap updates if full to ensure
3873 	 * we find out when we are no longer full and stop returning
3874 	 * ENOSPC.
3875 	 */
3876 	pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
3877 	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
3878 		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
3879 		  have_pool_full(osdc);
3880 	if (was_pauserd || was_pausewr || pauserd || pausewr ||
3881 	    osdc->osdmap->epoch < osdc->epoch_barrier)
3882 		maybe_request_map(osdc);
3883 
3884 	kick_requests(osdc, &need_resend, &need_resend_linger);
3885 
3886 	ceph_osdc_abort_on_full(osdc);
3887 	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
3888 			  osdc->osdmap->epoch);
3889 	up_write(&osdc->lock);
3890 	wake_up_all(&osdc->client->auth_wq);
3891 	return;
3892 
3893 bad:
3894 	pr_err("osdc handle_map corrupt msg\n");
3895 	ceph_msg_dump(msg);
3896 	up_write(&osdc->lock);
3897 }
3898 
3899 /*
3900  * Resubmit requests pending on the given osd.
3901  */
3902 static void kick_osd_requests(struct ceph_osd *osd)
3903 {
3904 	struct rb_node *n;
3905 
3906 	clear_backoffs(osd);
3907 
3908 	for (n = rb_first(&osd->o_requests); n; ) {
3909 		struct ceph_osd_request *req =
3910 		    rb_entry(n, struct ceph_osd_request, r_node);
3911 
3912 		n = rb_next(n); /* cancel_linger_request() */
3913 
3914 		if (!req->r_linger) {
3915 			if (!req->r_t.paused)
3916 				send_request(req);
3917 		} else {
3918 			cancel_linger_request(req);
3919 		}
3920 	}
3921 	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
3922 		struct ceph_osd_linger_request *lreq =
3923 		    rb_entry(n, struct ceph_osd_linger_request, node);
3924 
3925 		send_linger(lreq);
3926 	}
3927 }
3928 
3929 /*
3930  * If the osd connection drops, we need to resubmit all requests.
3931  */
3932 static void osd_fault(struct ceph_connection *con)
3933 {
3934 	struct ceph_osd *osd = con->private;
3935 	struct ceph_osd_client *osdc = osd->o_osdc;
3936 
3937 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
3938 
3939 	down_write(&osdc->lock);
3940 	if (!osd_registered(osd)) {
3941 		dout("%s osd%d unknown\n", __func__, osd->o_osd);
3942 		goto out_unlock;
3943 	}
3944 
3945 	if (!reopen_osd(osd))
3946 		kick_osd_requests(osd);
3947 	maybe_request_map(osdc);
3948 
3949 out_unlock:
3950 	up_write(&osdc->lock);
3951 }
3952 
3953 struct MOSDBackoff {
3954 	struct ceph_spg spgid;
3955 	u32 map_epoch;
3956 	u8 op;
3957 	u64 id;
3958 	struct ceph_hobject_id *begin;
3959 	struct ceph_hobject_id *end;
3960 };
3961 
3962 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
3963 {
3964 	void *p = msg->front.iov_base;
3965 	void *const end = p + msg->front.iov_len;
3966 	u8 struct_v;
3967 	u32 struct_len;
3968 	int ret;
3969 
3970 	ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
3971 	if (ret)
3972 		return ret;
3973 
3974 	ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
3975 	if (ret)
3976 		return ret;
3977 
3978 	ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
3979 	ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
3980 	ceph_decode_8_safe(&p, end, m->op, e_inval);
3981 	ceph_decode_64_safe(&p, end, m->id, e_inval);
3982 
3983 	m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
3984 	if (!m->begin)
3985 		return -ENOMEM;
3986 
3987 	ret = decode_hoid(&p, end, m->begin);
3988 	if (ret) {
3989 		free_hoid(m->begin);
3990 		return ret;
3991 	}
3992 
3993 	m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
3994 	if (!m->end) {
3995 		free_hoid(m->begin);
3996 		return -ENOMEM;
3997 	}
3998 
3999 	ret = decode_hoid(&p, end, m->end);
4000 	if (ret) {
4001 		free_hoid(m->begin);
4002 		free_hoid(m->end);
4003 		return ret;
4004 	}
4005 
4006 	return 0;
4007 
4008 e_inval:
4009 	return -EINVAL;
4010 }
4011 
4012 static struct ceph_msg *create_backoff_message(
4013 				const struct ceph_osd_backoff *backoff,
4014 				u32 map_epoch)
4015 {
4016 	struct ceph_msg *msg;
4017 	void *p, *end;
4018 	int msg_size;
4019 
4020 	msg_size = CEPH_ENCODING_START_BLK_LEN +
4021 			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4022 	msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4023 	msg_size += CEPH_ENCODING_START_BLK_LEN +
4024 			hoid_encoding_size(backoff->begin);
4025 	msg_size += CEPH_ENCODING_START_BLK_LEN +
4026 			hoid_encoding_size(backoff->end);
4027 
4028 	msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4029 	if (!msg)
4030 		return NULL;
4031 
4032 	p = msg->front.iov_base;
4033 	end = p + msg->front_alloc_len;
4034 
4035 	encode_spgid(&p, &backoff->spgid);
4036 	ceph_encode_32(&p, map_epoch);
4037 	ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4038 	ceph_encode_64(&p, backoff->id);
4039 	encode_hoid(&p, end, backoff->begin);
4040 	encode_hoid(&p, end, backoff->end);
4041 	BUG_ON(p != end);
4042 
4043 	msg->front.iov_len = p - msg->front.iov_base;
4044 	msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4045 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4046 
4047 	return msg;
4048 }
4049 
4050 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4051 {
4052 	struct ceph_spg_mapping *spg;
4053 	struct ceph_osd_backoff *backoff;
4054 	struct ceph_msg *msg;
4055 
4056 	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4057 	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4058 
4059 	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4060 	if (!spg) {
4061 		spg = alloc_spg_mapping();
4062 		if (!spg) {
4063 			pr_err("%s failed to allocate spg\n", __func__);
4064 			return;
4065 		}
4066 		spg->spgid = m->spgid; /* struct */
4067 		insert_spg_mapping(&osd->o_backoff_mappings, spg);
4068 	}
4069 
4070 	backoff = alloc_backoff();
4071 	if (!backoff) {
4072 		pr_err("%s failed to allocate backoff\n", __func__);
4073 		return;
4074 	}
4075 	backoff->spgid = m->spgid; /* struct */
4076 	backoff->id = m->id;
4077 	backoff->begin = m->begin;
4078 	m->begin = NULL; /* backoff now owns this */
4079 	backoff->end = m->end;
4080 	m->end = NULL;   /* ditto */
4081 
4082 	insert_backoff(&spg->backoffs, backoff);
4083 	insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4084 
4085 	/*
4086 	 * Ack with original backoff's epoch so that the OSD can
4087 	 * discard this if there was a PG split.
4088 	 */
4089 	msg = create_backoff_message(backoff, m->map_epoch);
4090 	if (!msg) {
4091 		pr_err("%s failed to allocate msg\n", __func__);
4092 		return;
4093 	}
4094 	ceph_con_send(&osd->o_con, msg);
4095 }
4096 
4097 static bool target_contained_by(const struct ceph_osd_request_target *t,
4098 				const struct ceph_hobject_id *begin,
4099 				const struct ceph_hobject_id *end)
4100 {
4101 	struct ceph_hobject_id hoid;
4102 	int cmp;
4103 
4104 	hoid_fill_from_target(&hoid, t);
4105 	cmp = hoid_compare(&hoid, begin);
4106 	return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4107 }
4108 
4109 static void handle_backoff_unblock(struct ceph_osd *osd,
4110 				   const struct MOSDBackoff *m)
4111 {
4112 	struct ceph_spg_mapping *spg;
4113 	struct ceph_osd_backoff *backoff;
4114 	struct rb_node *n;
4115 
4116 	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4117 	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4118 
4119 	backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4120 	if (!backoff) {
4121 		pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4122 		       __func__, osd->o_osd, m->spgid.pgid.pool,
4123 		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4124 		return;
4125 	}
4126 
4127 	if (hoid_compare(backoff->begin, m->begin) &&
4128 	    hoid_compare(backoff->end, m->end)) {
4129 		pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4130 		       __func__, osd->o_osd, m->spgid.pgid.pool,
4131 		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4132 		/* unblock it anyway... */
4133 	}
4134 
4135 	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4136 	BUG_ON(!spg);
4137 
4138 	erase_backoff(&spg->backoffs, backoff);
4139 	erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4140 	free_backoff(backoff);
4141 
4142 	if (RB_EMPTY_ROOT(&spg->backoffs)) {
4143 		erase_spg_mapping(&osd->o_backoff_mappings, spg);
4144 		free_spg_mapping(spg);
4145 	}
4146 
4147 	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4148 		struct ceph_osd_request *req =
4149 		    rb_entry(n, struct ceph_osd_request, r_node);
4150 
4151 		if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4152 			/*
4153 			 * Match against @m, not @backoff -- the PG may
4154 			 * have split on the OSD.
4155 			 */
4156 			if (target_contained_by(&req->r_t, m->begin, m->end)) {
4157 				/*
4158 				 * If no other installed backoff applies,
4159 				 * resend.
4160 				 */
4161 				send_request(req);
4162 			}
4163 		}
4164 	}
4165 }
4166 
4167 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4168 {
4169 	struct ceph_osd_client *osdc = osd->o_osdc;
4170 	struct MOSDBackoff m;
4171 	int ret;
4172 
4173 	down_read(&osdc->lock);
4174 	if (!osd_registered(osd)) {
4175 		dout("%s osd%d unknown\n", __func__, osd->o_osd);
4176 		up_read(&osdc->lock);
4177 		return;
4178 	}
4179 	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4180 
4181 	mutex_lock(&osd->lock);
4182 	ret = decode_MOSDBackoff(msg, &m);
4183 	if (ret) {
4184 		pr_err("failed to decode MOSDBackoff: %d\n", ret);
4185 		ceph_msg_dump(msg);
4186 		goto out_unlock;
4187 	}
4188 
4189 	switch (m.op) {
4190 	case CEPH_OSD_BACKOFF_OP_BLOCK:
4191 		handle_backoff_block(osd, &m);
4192 		break;
4193 	case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4194 		handle_backoff_unblock(osd, &m);
4195 		break;
4196 	default:
4197 		pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4198 	}
4199 
4200 	free_hoid(m.begin);
4201 	free_hoid(m.end);
4202 
4203 out_unlock:
4204 	mutex_unlock(&osd->lock);
4205 	up_read(&osdc->lock);
4206 }
4207 
4208 /*
4209  * Process osd watch notifications
4210  */
4211 static void handle_watch_notify(struct ceph_osd_client *osdc,
4212 				struct ceph_msg *msg)
4213 {
4214 	void *p = msg->front.iov_base;
4215 	void *const end = p + msg->front.iov_len;
4216 	struct ceph_osd_linger_request *lreq;
4217 	struct linger_work *lwork;
4218 	u8 proto_ver, opcode;
4219 	u64 cookie, notify_id;
4220 	u64 notifier_id = 0;
4221 	s32 return_code = 0;
4222 	void *payload = NULL;
4223 	u32 payload_len = 0;
4224 
4225 	ceph_decode_8_safe(&p, end, proto_ver, bad);
4226 	ceph_decode_8_safe(&p, end, opcode, bad);
4227 	ceph_decode_64_safe(&p, end, cookie, bad);
4228 	p += 8; /* skip ver */
4229 	ceph_decode_64_safe(&p, end, notify_id, bad);
4230 
4231 	if (proto_ver >= 1) {
4232 		ceph_decode_32_safe(&p, end, payload_len, bad);
4233 		ceph_decode_need(&p, end, payload_len, bad);
4234 		payload = p;
4235 		p += payload_len;
4236 	}
4237 
4238 	if (le16_to_cpu(msg->hdr.version) >= 2)
4239 		ceph_decode_32_safe(&p, end, return_code, bad);
4240 
4241 	if (le16_to_cpu(msg->hdr.version) >= 3)
4242 		ceph_decode_64_safe(&p, end, notifier_id, bad);
4243 
4244 	down_read(&osdc->lock);
4245 	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4246 	if (!lreq) {
4247 		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4248 		     cookie);
4249 		goto out_unlock_osdc;
4250 	}
4251 
4252 	mutex_lock(&lreq->lock);
4253 	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4254 	     opcode, cookie, lreq, lreq->is_watch);
4255 	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4256 		if (!lreq->last_error) {
4257 			lreq->last_error = -ENOTCONN;
4258 			queue_watch_error(lreq);
4259 		}
4260 	} else if (!lreq->is_watch) {
4261 		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4262 		if (lreq->notify_id && lreq->notify_id != notify_id) {
4263 			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4264 			     lreq->notify_id, notify_id);
4265 		} else if (!completion_done(&lreq->notify_finish_wait)) {
4266 			struct ceph_msg_data *data =
4267 			    list_first_entry_or_null(&msg->data,
4268 						     struct ceph_msg_data,
4269 						     links);
4270 
4271 			if (data) {
4272 				if (lreq->preply_pages) {
4273 					WARN_ON(data->type !=
4274 							CEPH_MSG_DATA_PAGES);
4275 					*lreq->preply_pages = data->pages;
4276 					*lreq->preply_len = data->length;
4277 				} else {
4278 					ceph_release_page_vector(data->pages,
4279 					       calc_pages_for(0, data->length));
4280 				}
4281 			}
4282 			lreq->notify_finish_error = return_code;
4283 			complete_all(&lreq->notify_finish_wait);
4284 		}
4285 	} else {
4286 		/* CEPH_WATCH_EVENT_NOTIFY */
4287 		lwork = lwork_alloc(lreq, do_watch_notify);
4288 		if (!lwork) {
4289 			pr_err("failed to allocate notify-lwork\n");
4290 			goto out_unlock_lreq;
4291 		}
4292 
4293 		lwork->notify.notify_id = notify_id;
4294 		lwork->notify.notifier_id = notifier_id;
4295 		lwork->notify.payload = payload;
4296 		lwork->notify.payload_len = payload_len;
4297 		lwork->notify.msg = ceph_msg_get(msg);
4298 		lwork_queue(lwork);
4299 	}
4300 
4301 out_unlock_lreq:
4302 	mutex_unlock(&lreq->lock);
4303 out_unlock_osdc:
4304 	up_read(&osdc->lock);
4305 	return;
4306 
4307 bad:
4308 	pr_err("osdc handle_watch_notify corrupt msg\n");
4309 }
4310 
4311 /*
4312  * Register request, send initial attempt.
4313  */
4314 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
4315 			    struct ceph_osd_request *req,
4316 			    bool nofail)
4317 {
4318 	down_read(&osdc->lock);
4319 	submit_request(req, false);
4320 	up_read(&osdc->lock);
4321 
4322 	return 0;
4323 }
4324 EXPORT_SYMBOL(ceph_osdc_start_request);
4325 
4326 /*
4327  * Unregister a registered request.  The request is not completed:
4328  * ->r_result isn't set and __complete_request() isn't called.
4329  */
4330 void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4331 {
4332 	struct ceph_osd_client *osdc = req->r_osdc;
4333 
4334 	down_write(&osdc->lock);
4335 	if (req->r_osd)
4336 		cancel_request(req);
4337 	up_write(&osdc->lock);
4338 }
4339 EXPORT_SYMBOL(ceph_osdc_cancel_request);
4340 
4341 /*
4342  * @timeout: in jiffies, 0 means "wait forever"
4343  */
4344 static int wait_request_timeout(struct ceph_osd_request *req,
4345 				unsigned long timeout)
4346 {
4347 	long left;
4348 
4349 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4350 	left = wait_for_completion_killable_timeout(&req->r_completion,
4351 						ceph_timeout_jiffies(timeout));
4352 	if (left <= 0) {
4353 		left = left ?: -ETIMEDOUT;
4354 		ceph_osdc_cancel_request(req);
4355 	} else {
4356 		left = req->r_result; /* completed */
4357 	}
4358 
4359 	return left;
4360 }
4361 
4362 /*
4363  * wait for a request to complete
4364  */
4365 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
4366 			   struct ceph_osd_request *req)
4367 {
4368 	return wait_request_timeout(req, 0);
4369 }
4370 EXPORT_SYMBOL(ceph_osdc_wait_request);
4371 
4372 /*
4373  * sync - wait for all in-flight requests to flush.  avoid starvation.
4374  */
4375 void ceph_osdc_sync(struct ceph_osd_client *osdc)
4376 {
4377 	struct rb_node *n, *p;
4378 	u64 last_tid = atomic64_read(&osdc->last_tid);
4379 
4380 again:
4381 	down_read(&osdc->lock);
4382 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4383 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4384 
4385 		mutex_lock(&osd->lock);
4386 		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4387 			struct ceph_osd_request *req =
4388 			    rb_entry(p, struct ceph_osd_request, r_node);
4389 
4390 			if (req->r_tid > last_tid)
4391 				break;
4392 
4393 			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4394 				continue;
4395 
4396 			ceph_osdc_get_request(req);
4397 			mutex_unlock(&osd->lock);
4398 			up_read(&osdc->lock);
4399 			dout("%s waiting on req %p tid %llu last_tid %llu\n",
4400 			     __func__, req, req->r_tid, last_tid);
4401 			wait_for_completion(&req->r_completion);
4402 			ceph_osdc_put_request(req);
4403 			goto again;
4404 		}
4405 
4406 		mutex_unlock(&osd->lock);
4407 	}
4408 
4409 	up_read(&osdc->lock);
4410 	dout("%s done last_tid %llu\n", __func__, last_tid);
4411 }
4412 EXPORT_SYMBOL(ceph_osdc_sync);
4413 
4414 static struct ceph_osd_request *
4415 alloc_linger_request(struct ceph_osd_linger_request *lreq)
4416 {
4417 	struct ceph_osd_request *req;
4418 
4419 	req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
4420 	if (!req)
4421 		return NULL;
4422 
4423 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4424 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4425 
4426 	if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
4427 		ceph_osdc_put_request(req);
4428 		return NULL;
4429 	}
4430 
4431 	return req;
4432 }
4433 
4434 /*
4435  * Returns a handle, caller owns a ref.
4436  */
4437 struct ceph_osd_linger_request *
4438 ceph_osdc_watch(struct ceph_osd_client *osdc,
4439 		struct ceph_object_id *oid,
4440 		struct ceph_object_locator *oloc,
4441 		rados_watchcb2_t wcb,
4442 		rados_watcherrcb_t errcb,
4443 		void *data)
4444 {
4445 	struct ceph_osd_linger_request *lreq;
4446 	int ret;
4447 
4448 	lreq = linger_alloc(osdc);
4449 	if (!lreq)
4450 		return ERR_PTR(-ENOMEM);
4451 
4452 	lreq->is_watch = true;
4453 	lreq->wcb = wcb;
4454 	lreq->errcb = errcb;
4455 	lreq->data = data;
4456 	lreq->watch_valid_thru = jiffies;
4457 
4458 	ceph_oid_copy(&lreq->t.base_oid, oid);
4459 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4460 	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4461 	ktime_get_real_ts(&lreq->mtime);
4462 
4463 	lreq->reg_req = alloc_linger_request(lreq);
4464 	if (!lreq->reg_req) {
4465 		ret = -ENOMEM;
4466 		goto err_put_lreq;
4467 	}
4468 
4469 	lreq->ping_req = alloc_linger_request(lreq);
4470 	if (!lreq->ping_req) {
4471 		ret = -ENOMEM;
4472 		goto err_put_lreq;
4473 	}
4474 
4475 	down_write(&osdc->lock);
4476 	linger_register(lreq); /* before osd_req_op_* */
4477 	osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
4478 			      CEPH_OSD_WATCH_OP_WATCH);
4479 	osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
4480 			      CEPH_OSD_WATCH_OP_PING);
4481 	linger_submit(lreq);
4482 	up_write(&osdc->lock);
4483 
4484 	ret = linger_reg_commit_wait(lreq);
4485 	if (ret) {
4486 		linger_cancel(lreq);
4487 		goto err_put_lreq;
4488 	}
4489 
4490 	return lreq;
4491 
4492 err_put_lreq:
4493 	linger_put(lreq);
4494 	return ERR_PTR(ret);
4495 }
4496 EXPORT_SYMBOL(ceph_osdc_watch);
4497 
4498 /*
4499  * Releases a ref.
4500  *
4501  * Times out after mount_timeout to preserve rbd unmap behaviour
4502  * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4503  * with mount_timeout").
4504  */
4505 int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4506 		      struct ceph_osd_linger_request *lreq)
4507 {
4508 	struct ceph_options *opts = osdc->client->options;
4509 	struct ceph_osd_request *req;
4510 	int ret;
4511 
4512 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4513 	if (!req)
4514 		return -ENOMEM;
4515 
4516 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4517 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4518 	req->r_flags = CEPH_OSD_FLAG_WRITE;
4519 	ktime_get_real_ts(&req->r_mtime);
4520 	osd_req_op_watch_init(req, 0, lreq->linger_id,
4521 			      CEPH_OSD_WATCH_OP_UNWATCH);
4522 
4523 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4524 	if (ret)
4525 		goto out_put_req;
4526 
4527 	ceph_osdc_start_request(osdc, req, false);
4528 	linger_cancel(lreq);
4529 	linger_put(lreq);
4530 	ret = wait_request_timeout(req, opts->mount_timeout);
4531 
4532 out_put_req:
4533 	ceph_osdc_put_request(req);
4534 	return ret;
4535 }
4536 EXPORT_SYMBOL(ceph_osdc_unwatch);
4537 
4538 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4539 				      u64 notify_id, u64 cookie, void *payload,
4540 				      size_t payload_len)
4541 {
4542 	struct ceph_osd_req_op *op;
4543 	struct ceph_pagelist *pl;
4544 	int ret;
4545 
4546 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4547 
4548 	pl = kmalloc(sizeof(*pl), GFP_NOIO);
4549 	if (!pl)
4550 		return -ENOMEM;
4551 
4552 	ceph_pagelist_init(pl);
4553 	ret = ceph_pagelist_encode_64(pl, notify_id);
4554 	ret |= ceph_pagelist_encode_64(pl, cookie);
4555 	if (payload) {
4556 		ret |= ceph_pagelist_encode_32(pl, payload_len);
4557 		ret |= ceph_pagelist_append(pl, payload, payload_len);
4558 	} else {
4559 		ret |= ceph_pagelist_encode_32(pl, 0);
4560 	}
4561 	if (ret) {
4562 		ceph_pagelist_release(pl);
4563 		return -ENOMEM;
4564 	}
4565 
4566 	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4567 	op->indata_len = pl->length;
4568 	return 0;
4569 }
4570 
4571 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4572 			 struct ceph_object_id *oid,
4573 			 struct ceph_object_locator *oloc,
4574 			 u64 notify_id,
4575 			 u64 cookie,
4576 			 void *payload,
4577 			 size_t payload_len)
4578 {
4579 	struct ceph_osd_request *req;
4580 	int ret;
4581 
4582 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4583 	if (!req)
4584 		return -ENOMEM;
4585 
4586 	ceph_oid_copy(&req->r_base_oid, oid);
4587 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4588 	req->r_flags = CEPH_OSD_FLAG_READ;
4589 
4590 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4591 	if (ret)
4592 		goto out_put_req;
4593 
4594 	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4595 					 payload_len);
4596 	if (ret)
4597 		goto out_put_req;
4598 
4599 	ceph_osdc_start_request(osdc, req, false);
4600 	ret = ceph_osdc_wait_request(osdc, req);
4601 
4602 out_put_req:
4603 	ceph_osdc_put_request(req);
4604 	return ret;
4605 }
4606 EXPORT_SYMBOL(ceph_osdc_notify_ack);
4607 
4608 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
4609 				  u64 cookie, u32 prot_ver, u32 timeout,
4610 				  void *payload, size_t payload_len)
4611 {
4612 	struct ceph_osd_req_op *op;
4613 	struct ceph_pagelist *pl;
4614 	int ret;
4615 
4616 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
4617 	op->notify.cookie = cookie;
4618 
4619 	pl = kmalloc(sizeof(*pl), GFP_NOIO);
4620 	if (!pl)
4621 		return -ENOMEM;
4622 
4623 	ceph_pagelist_init(pl);
4624 	ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
4625 	ret |= ceph_pagelist_encode_32(pl, timeout);
4626 	ret |= ceph_pagelist_encode_32(pl, payload_len);
4627 	ret |= ceph_pagelist_append(pl, payload, payload_len);
4628 	if (ret) {
4629 		ceph_pagelist_release(pl);
4630 		return -ENOMEM;
4631 	}
4632 
4633 	ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
4634 	op->indata_len = pl->length;
4635 	return 0;
4636 }
4637 
4638 /*
4639  * @timeout: in seconds
4640  *
4641  * @preply_{pages,len} are initialized both on success and error.
4642  * The caller is responsible for:
4643  *
4644  *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4645  */
4646 int ceph_osdc_notify(struct ceph_osd_client *osdc,
4647 		     struct ceph_object_id *oid,
4648 		     struct ceph_object_locator *oloc,
4649 		     void *payload,
4650 		     size_t payload_len,
4651 		     u32 timeout,
4652 		     struct page ***preply_pages,
4653 		     size_t *preply_len)
4654 {
4655 	struct ceph_osd_linger_request *lreq;
4656 	struct page **pages;
4657 	int ret;
4658 
4659 	WARN_ON(!timeout);
4660 	if (preply_pages) {
4661 		*preply_pages = NULL;
4662 		*preply_len = 0;
4663 	}
4664 
4665 	lreq = linger_alloc(osdc);
4666 	if (!lreq)
4667 		return -ENOMEM;
4668 
4669 	lreq->preply_pages = preply_pages;
4670 	lreq->preply_len = preply_len;
4671 
4672 	ceph_oid_copy(&lreq->t.base_oid, oid);
4673 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4674 	lreq->t.flags = CEPH_OSD_FLAG_READ;
4675 
4676 	lreq->reg_req = alloc_linger_request(lreq);
4677 	if (!lreq->reg_req) {
4678 		ret = -ENOMEM;
4679 		goto out_put_lreq;
4680 	}
4681 
4682 	/* for notify_id */
4683 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
4684 	if (IS_ERR(pages)) {
4685 		ret = PTR_ERR(pages);
4686 		goto out_put_lreq;
4687 	}
4688 
4689 	down_write(&osdc->lock);
4690 	linger_register(lreq); /* before osd_req_op_* */
4691 	ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
4692 				     timeout, payload, payload_len);
4693 	if (ret) {
4694 		linger_unregister(lreq);
4695 		up_write(&osdc->lock);
4696 		ceph_release_page_vector(pages, 1);
4697 		goto out_put_lreq;
4698 	}
4699 	ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4700 						 response_data),
4701 				 pages, PAGE_SIZE, 0, false, true);
4702 	linger_submit(lreq);
4703 	up_write(&osdc->lock);
4704 
4705 	ret = linger_reg_commit_wait(lreq);
4706 	if (!ret)
4707 		ret = linger_notify_finish_wait(lreq);
4708 	else
4709 		dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4710 
4711 	linger_cancel(lreq);
4712 out_put_lreq:
4713 	linger_put(lreq);
4714 	return ret;
4715 }
4716 EXPORT_SYMBOL(ceph_osdc_notify);
4717 
4718 /*
4719  * Return the number of milliseconds since the watch was last
4720  * confirmed, or an error.  If there is an error, the watch is no
4721  * longer valid, and should be destroyed with ceph_osdc_unwatch().
4722  */
4723 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
4724 			  struct ceph_osd_linger_request *lreq)
4725 {
4726 	unsigned long stamp, age;
4727 	int ret;
4728 
4729 	down_read(&osdc->lock);
4730 	mutex_lock(&lreq->lock);
4731 	stamp = lreq->watch_valid_thru;
4732 	if (!list_empty(&lreq->pending_lworks)) {
4733 		struct linger_work *lwork =
4734 		    list_first_entry(&lreq->pending_lworks,
4735 				     struct linger_work,
4736 				     pending_item);
4737 
4738 		if (time_before(lwork->queued_stamp, stamp))
4739 			stamp = lwork->queued_stamp;
4740 	}
4741 	age = jiffies - stamp;
4742 	dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
4743 	     lreq, lreq->linger_id, age, lreq->last_error);
4744 	/* we are truncating to msecs, so return a safe upper bound */
4745 	ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
4746 
4747 	mutex_unlock(&lreq->lock);
4748 	up_read(&osdc->lock);
4749 	return ret;
4750 }
4751 
4752 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
4753 {
4754 	u8 struct_v;
4755 	u32 struct_len;
4756 	int ret;
4757 
4758 	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
4759 				  &struct_v, &struct_len);
4760 	if (ret)
4761 		return ret;
4762 
4763 	ceph_decode_copy(p, &item->name, sizeof(item->name));
4764 	item->cookie = ceph_decode_64(p);
4765 	*p += 4; /* skip timeout_seconds */
4766 	if (struct_v >= 2) {
4767 		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
4768 		ceph_decode_addr(&item->addr);
4769 	}
4770 
4771 	dout("%s %s%llu cookie %llu addr %s\n", __func__,
4772 	     ENTITY_NAME(item->name), item->cookie,
4773 	     ceph_pr_addr(&item->addr.in_addr));
4774 	return 0;
4775 }
4776 
4777 static int decode_watchers(void **p, void *end,
4778 			   struct ceph_watch_item **watchers,
4779 			   u32 *num_watchers)
4780 {
4781 	u8 struct_v;
4782 	u32 struct_len;
4783 	int i;
4784 	int ret;
4785 
4786 	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
4787 				  &struct_v, &struct_len);
4788 	if (ret)
4789 		return ret;
4790 
4791 	*num_watchers = ceph_decode_32(p);
4792 	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
4793 	if (!*watchers)
4794 		return -ENOMEM;
4795 
4796 	for (i = 0; i < *num_watchers; i++) {
4797 		ret = decode_watcher(p, end, *watchers + i);
4798 		if (ret) {
4799 			kfree(*watchers);
4800 			return ret;
4801 		}
4802 	}
4803 
4804 	return 0;
4805 }
4806 
4807 /*
4808  * On success, the caller is responsible for:
4809  *
4810  *     kfree(watchers);
4811  */
4812 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4813 			    struct ceph_object_id *oid,
4814 			    struct ceph_object_locator *oloc,
4815 			    struct ceph_watch_item **watchers,
4816 			    u32 *num_watchers)
4817 {
4818 	struct ceph_osd_request *req;
4819 	struct page **pages;
4820 	int ret;
4821 
4822 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4823 	if (!req)
4824 		return -ENOMEM;
4825 
4826 	ceph_oid_copy(&req->r_base_oid, oid);
4827 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4828 	req->r_flags = CEPH_OSD_FLAG_READ;
4829 
4830 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4831 	if (ret)
4832 		goto out_put_req;
4833 
4834 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
4835 	if (IS_ERR(pages)) {
4836 		ret = PTR_ERR(pages);
4837 		goto out_put_req;
4838 	}
4839 
4840 	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
4841 	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
4842 						 response_data),
4843 				 pages, PAGE_SIZE, 0, false, true);
4844 
4845 	ceph_osdc_start_request(osdc, req, false);
4846 	ret = ceph_osdc_wait_request(osdc, req);
4847 	if (ret >= 0) {
4848 		void *p = page_address(pages[0]);
4849 		void *const end = p + req->r_ops[0].outdata_len;
4850 
4851 		ret = decode_watchers(&p, end, watchers, num_watchers);
4852 	}
4853 
4854 out_put_req:
4855 	ceph_osdc_put_request(req);
4856 	return ret;
4857 }
4858 EXPORT_SYMBOL(ceph_osdc_list_watchers);
4859 
4860 /*
4861  * Call all pending notify callbacks - for use after a watch is
4862  * unregistered, to make sure no more callbacks for it will be invoked
4863  */
4864 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
4865 {
4866 	dout("%s osdc %p\n", __func__, osdc);
4867 	flush_workqueue(osdc->notify_wq);
4868 }
4869 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
4870 
4871 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
4872 {
4873 	down_read(&osdc->lock);
4874 	maybe_request_map(osdc);
4875 	up_read(&osdc->lock);
4876 }
4877 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
4878 
4879 /*
4880  * Execute an OSD class method on an object.
4881  *
4882  * @flags: CEPH_OSD_FLAG_*
4883  * @resp_len: in/out param for reply length
4884  */
4885 int ceph_osdc_call(struct ceph_osd_client *osdc,
4886 		   struct ceph_object_id *oid,
4887 		   struct ceph_object_locator *oloc,
4888 		   const char *class, const char *method,
4889 		   unsigned int flags,
4890 		   struct page *req_page, size_t req_len,
4891 		   struct page *resp_page, size_t *resp_len)
4892 {
4893 	struct ceph_osd_request *req;
4894 	int ret;
4895 
4896 	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
4897 		return -E2BIG;
4898 
4899 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4900 	if (!req)
4901 		return -ENOMEM;
4902 
4903 	ceph_oid_copy(&req->r_base_oid, oid);
4904 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4905 	req->r_flags = flags;
4906 
4907 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4908 	if (ret)
4909 		goto out_put_req;
4910 
4911 	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4912 	if (req_page)
4913 		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
4914 						  0, false, false);
4915 	if (resp_page)
4916 		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
4917 						   *resp_len, 0, false, false);
4918 
4919 	ceph_osdc_start_request(osdc, req, false);
4920 	ret = ceph_osdc_wait_request(osdc, req);
4921 	if (ret >= 0) {
4922 		ret = req->r_ops[0].rval;
4923 		if (resp_page)
4924 			*resp_len = req->r_ops[0].outdata_len;
4925 	}
4926 
4927 out_put_req:
4928 	ceph_osdc_put_request(req);
4929 	return ret;
4930 }
4931 EXPORT_SYMBOL(ceph_osdc_call);
4932 
4933 /*
4934  * init, shutdown
4935  */
4936 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
4937 {
4938 	int err;
4939 
4940 	dout("init\n");
4941 	osdc->client = client;
4942 	init_rwsem(&osdc->lock);
4943 	osdc->osds = RB_ROOT;
4944 	INIT_LIST_HEAD(&osdc->osd_lru);
4945 	spin_lock_init(&osdc->osd_lru_lock);
4946 	osd_init(&osdc->homeless_osd);
4947 	osdc->homeless_osd.o_osdc = osdc;
4948 	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
4949 	osdc->last_linger_id = CEPH_LINGER_ID_START;
4950 	osdc->linger_requests = RB_ROOT;
4951 	osdc->map_checks = RB_ROOT;
4952 	osdc->linger_map_checks = RB_ROOT;
4953 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
4954 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
4955 
4956 	err = -ENOMEM;
4957 	osdc->osdmap = ceph_osdmap_alloc();
4958 	if (!osdc->osdmap)
4959 		goto out;
4960 
4961 	osdc->req_mempool = mempool_create_slab_pool(10,
4962 						     ceph_osd_request_cache);
4963 	if (!osdc->req_mempool)
4964 		goto out_map;
4965 
4966 	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
4967 				PAGE_SIZE, 10, true, "osd_op");
4968 	if (err < 0)
4969 		goto out_mempool;
4970 	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
4971 				PAGE_SIZE, 10, true, "osd_op_reply");
4972 	if (err < 0)
4973 		goto out_msgpool;
4974 
4975 	err = -ENOMEM;
4976 	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
4977 	if (!osdc->notify_wq)
4978 		goto out_msgpool_reply;
4979 
4980 	schedule_delayed_work(&osdc->timeout_work,
4981 			      osdc->client->options->osd_keepalive_timeout);
4982 	schedule_delayed_work(&osdc->osds_timeout_work,
4983 	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));
4984 
4985 	return 0;
4986 
4987 out_msgpool_reply:
4988 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
4989 out_msgpool:
4990 	ceph_msgpool_destroy(&osdc->msgpool_op);
4991 out_mempool:
4992 	mempool_destroy(osdc->req_mempool);
4993 out_map:
4994 	ceph_osdmap_destroy(osdc->osdmap);
4995 out:
4996 	return err;
4997 }
4998 
4999 void ceph_osdc_stop(struct ceph_osd_client *osdc)
5000 {
5001 	flush_workqueue(osdc->notify_wq);
5002 	destroy_workqueue(osdc->notify_wq);
5003 	cancel_delayed_work_sync(&osdc->timeout_work);
5004 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
5005 
5006 	down_write(&osdc->lock);
5007 	while (!RB_EMPTY_ROOT(&osdc->osds)) {
5008 		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5009 						struct ceph_osd, o_node);
5010 		close_osd(osd);
5011 	}
5012 	up_write(&osdc->lock);
5013 	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5014 	osd_cleanup(&osdc->homeless_osd);
5015 
5016 	WARN_ON(!list_empty(&osdc->osd_lru));
5017 	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5018 	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5019 	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5020 	WARN_ON(atomic_read(&osdc->num_requests));
5021 	WARN_ON(atomic_read(&osdc->num_homeless));
5022 
5023 	ceph_osdmap_destroy(osdc->osdmap);
5024 	mempool_destroy(osdc->req_mempool);
5025 	ceph_msgpool_destroy(&osdc->msgpool_op);
5026 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5027 }
5028 
5029 /*
5030  * Read some contiguous pages.  If we cross a stripe boundary, shorten
5031  * *plen.  Return number of bytes read, or error.
5032  */
5033 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
5034 			struct ceph_vino vino, struct ceph_file_layout *layout,
5035 			u64 off, u64 *plen,
5036 			u32 truncate_seq, u64 truncate_size,
5037 			struct page **pages, int num_pages, int page_align)
5038 {
5039 	struct ceph_osd_request *req;
5040 	int rc = 0;
5041 
5042 	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
5043 	     vino.snap, off, *plen);
5044 	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
5045 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
5046 				    NULL, truncate_seq, truncate_size,
5047 				    false);
5048 	if (IS_ERR(req))
5049 		return PTR_ERR(req);
5050 
5051 	/* it may be a short read due to an object boundary */
5052 	osd_req_op_extent_osd_data_pages(req, 0,
5053 				pages, *plen, page_align, false, false);
5054 
5055 	dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
5056 	     off, *plen, *plen, page_align);
5057 
5058 	rc = ceph_osdc_start_request(osdc, req, false);
5059 	if (!rc)
5060 		rc = ceph_osdc_wait_request(osdc, req);
5061 
5062 	ceph_osdc_put_request(req);
5063 	dout("readpages result %d\n", rc);
5064 	return rc;
5065 }
5066 EXPORT_SYMBOL(ceph_osdc_readpages);
5067 
5068 /*
5069  * do a synchronous write on N pages
5070  */
5071 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
5072 			 struct ceph_file_layout *layout,
5073 			 struct ceph_snap_context *snapc,
5074 			 u64 off, u64 len,
5075 			 u32 truncate_seq, u64 truncate_size,
5076 			 struct timespec *mtime,
5077 			 struct page **pages, int num_pages)
5078 {
5079 	struct ceph_osd_request *req;
5080 	int rc = 0;
5081 	int page_align = off & ~PAGE_MASK;
5082 
5083 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
5084 				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
5085 				    snapc, truncate_seq, truncate_size,
5086 				    true);
5087 	if (IS_ERR(req))
5088 		return PTR_ERR(req);
5089 
5090 	/* it may be a short write due to an object boundary */
5091 	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
5092 				false, false);
5093 	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
5094 
5095 	req->r_mtime = *mtime;
5096 	rc = ceph_osdc_start_request(osdc, req, true);
5097 	if (!rc)
5098 		rc = ceph_osdc_wait_request(osdc, req);
5099 
5100 	ceph_osdc_put_request(req);
5101 	if (rc == 0)
5102 		rc = len;
5103 	dout("writepages result %d\n", rc);
5104 	return rc;
5105 }
5106 EXPORT_SYMBOL(ceph_osdc_writepages);
5107 
5108 int __init ceph_osdc_setup(void)
5109 {
5110 	size_t size = sizeof(struct ceph_osd_request) +
5111 	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5112 
5113 	BUG_ON(ceph_osd_request_cache);
5114 	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5115 						   0, 0, NULL);
5116 
5117 	return ceph_osd_request_cache ? 0 : -ENOMEM;
5118 }
5119 
5120 void ceph_osdc_cleanup(void)
5121 {
5122 	BUG_ON(!ceph_osd_request_cache);
5123 	kmem_cache_destroy(ceph_osd_request_cache);
5124 	ceph_osd_request_cache = NULL;
5125 }
5126 
5127 /*
5128  * handle incoming message
5129  */
5130 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5131 {
5132 	struct ceph_osd *osd = con->private;
5133 	struct ceph_osd_client *osdc = osd->o_osdc;
5134 	int type = le16_to_cpu(msg->hdr.type);
5135 
5136 	switch (type) {
5137 	case CEPH_MSG_OSD_MAP:
5138 		ceph_osdc_handle_map(osdc, msg);
5139 		break;
5140 	case CEPH_MSG_OSD_OPREPLY:
5141 		handle_reply(osd, msg);
5142 		break;
5143 	case CEPH_MSG_OSD_BACKOFF:
5144 		handle_backoff(osd, msg);
5145 		break;
5146 	case CEPH_MSG_WATCH_NOTIFY:
5147 		handle_watch_notify(osdc, msg);
5148 		break;
5149 
5150 	default:
5151 		pr_err("received unknown message type %d %s\n", type,
5152 		       ceph_msg_type_name(type));
5153 	}
5154 
5155 	ceph_msg_put(msg);
5156 }
5157 
5158 /*
5159  * Lookup and return message for incoming reply.  Don't try to do
5160  * anything about a larger than preallocated data portion of the
5161  * message at the moment - for now, just skip the message.
5162  */
5163 static struct ceph_msg *get_reply(struct ceph_connection *con,
5164 				  struct ceph_msg_header *hdr,
5165 				  int *skip)
5166 {
5167 	struct ceph_osd *osd = con->private;
5168 	struct ceph_osd_client *osdc = osd->o_osdc;
5169 	struct ceph_msg *m = NULL;
5170 	struct ceph_osd_request *req;
5171 	int front_len = le32_to_cpu(hdr->front_len);
5172 	int data_len = le32_to_cpu(hdr->data_len);
5173 	u64 tid = le64_to_cpu(hdr->tid);
5174 
5175 	down_read(&osdc->lock);
5176 	if (!osd_registered(osd)) {
5177 		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5178 		*skip = 1;
5179 		goto out_unlock_osdc;
5180 	}
5181 	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5182 
5183 	mutex_lock(&osd->lock);
5184 	req = lookup_request(&osd->o_requests, tid);
5185 	if (!req) {
5186 		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5187 		     osd->o_osd, tid);
5188 		*skip = 1;
5189 		goto out_unlock_session;
5190 	}
5191 
5192 	ceph_msg_revoke_incoming(req->r_reply);
5193 
5194 	if (front_len > req->r_reply->front_alloc_len) {
5195 		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5196 			__func__, osd->o_osd, req->r_tid, front_len,
5197 			req->r_reply->front_alloc_len);
5198 		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5199 				 false);
5200 		if (!m)
5201 			goto out_unlock_session;
5202 		ceph_msg_put(req->r_reply);
5203 		req->r_reply = m;
5204 	}
5205 
5206 	if (data_len > req->r_reply->data_length) {
5207 		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5208 			__func__, osd->o_osd, req->r_tid, data_len,
5209 			req->r_reply->data_length);
5210 		m = NULL;
5211 		*skip = 1;
5212 		goto out_unlock_session;
5213 	}
5214 
5215 	m = ceph_msg_get(req->r_reply);
5216 	dout("get_reply tid %lld %p\n", tid, m);
5217 
5218 out_unlock_session:
5219 	mutex_unlock(&osd->lock);
5220 out_unlock_osdc:
5221 	up_read(&osdc->lock);
5222 	return m;
5223 }
5224 
5225 /*
5226  * TODO: switch to a msg-owned pagelist
5227  */
5228 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5229 {
5230 	struct ceph_msg *m;
5231 	int type = le16_to_cpu(hdr->type);
5232 	u32 front_len = le32_to_cpu(hdr->front_len);
5233 	u32 data_len = le32_to_cpu(hdr->data_len);
5234 
5235 	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
5236 	if (!m)
5237 		return NULL;
5238 
5239 	if (data_len) {
5240 		struct page **pages;
5241 		struct ceph_osd_data osd_data;
5242 
5243 		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5244 					       GFP_NOIO);
5245 		if (IS_ERR(pages)) {
5246 			ceph_msg_put(m);
5247 			return NULL;
5248 		}
5249 
5250 		ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
5251 					 false);
5252 		ceph_osdc_msg_data_add(m, &osd_data);
5253 	}
5254 
5255 	return m;
5256 }
5257 
5258 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
5259 				  struct ceph_msg_header *hdr,
5260 				  int *skip)
5261 {
5262 	struct ceph_osd *osd = con->private;
5263 	int type = le16_to_cpu(hdr->type);
5264 
5265 	*skip = 0;
5266 	switch (type) {
5267 	case CEPH_MSG_OSD_MAP:
5268 	case CEPH_MSG_OSD_BACKOFF:
5269 	case CEPH_MSG_WATCH_NOTIFY:
5270 		return alloc_msg_with_page_vector(hdr);
5271 	case CEPH_MSG_OSD_OPREPLY:
5272 		return get_reply(con, hdr, skip);
5273 	default:
5274 		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5275 			osd->o_osd, type);
5276 		*skip = 1;
5277 		return NULL;
5278 	}
5279 }
5280 
5281 /*
5282  * Wrappers to refcount containing ceph_osd struct
5283  */
5284 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
5285 {
5286 	struct ceph_osd *osd = con->private;
5287 	if (get_osd(osd))
5288 		return con;
5289 	return NULL;
5290 }
5291 
5292 static void put_osd_con(struct ceph_connection *con)
5293 {
5294 	struct ceph_osd *osd = con->private;
5295 	put_osd(osd);
5296 }
5297 
5298 /*
5299  * authentication
5300  */
5301 /*
5302  * Note: returned pointer is the address of a structure that's
5303  * managed separately.  Caller must *not* attempt to free it.
5304  */
5305 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5306 					int *proto, int force_new)
5307 {
5308 	struct ceph_osd *o = con->private;
5309 	struct ceph_osd_client *osdc = o->o_osdc;
5310 	struct ceph_auth_client *ac = osdc->client->monc.auth;
5311 	struct ceph_auth_handshake *auth = &o->o_auth;
5312 
5313 	if (force_new && auth->authorizer) {
5314 		ceph_auth_destroy_authorizer(auth->authorizer);
5315 		auth->authorizer = NULL;
5316 	}
5317 	if (!auth->authorizer) {
5318 		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5319 						      auth);
5320 		if (ret)
5321 			return ERR_PTR(ret);
5322 	} else {
5323 		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5324 						     auth);
5325 		if (ret)
5326 			return ERR_PTR(ret);
5327 	}
5328 	*proto = ac->protocol;
5329 
5330 	return auth;
5331 }
5332 
5333 
5334 static int verify_authorizer_reply(struct ceph_connection *con)
5335 {
5336 	struct ceph_osd *o = con->private;
5337 	struct ceph_osd_client *osdc = o->o_osdc;
5338 	struct ceph_auth_client *ac = osdc->client->monc.auth;
5339 
5340 	return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
5341 }
5342 
5343 static int invalidate_authorizer(struct ceph_connection *con)
5344 {
5345 	struct ceph_osd *o = con->private;
5346 	struct ceph_osd_client *osdc = o->o_osdc;
5347 	struct ceph_auth_client *ac = osdc->client->monc.auth;
5348 
5349 	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5350 	return ceph_monc_validate_auth(&osdc->client->monc);
5351 }
5352 
5353 static void osd_reencode_message(struct ceph_msg *msg)
5354 {
5355 	int type = le16_to_cpu(msg->hdr.type);
5356 
5357 	if (type == CEPH_MSG_OSD_OP)
5358 		encode_request_finish(msg);
5359 }
5360 
5361 static int osd_sign_message(struct ceph_msg *msg)
5362 {
5363 	struct ceph_osd *o = msg->con->private;
5364 	struct ceph_auth_handshake *auth = &o->o_auth;
5365 
5366 	return ceph_auth_sign_message(auth, msg);
5367 }
5368 
5369 static int osd_check_message_signature(struct ceph_msg *msg)
5370 {
5371 	struct ceph_osd *o = msg->con->private;
5372 	struct ceph_auth_handshake *auth = &o->o_auth;
5373 
5374 	return ceph_auth_check_message_signature(auth, msg);
5375 }
5376 
5377 static const struct ceph_connection_operations osd_con_ops = {
5378 	.get = get_osd_con,
5379 	.put = put_osd_con,
5380 	.dispatch = dispatch,
5381 	.get_authorizer = get_authorizer,
5382 	.verify_authorizer_reply = verify_authorizer_reply,
5383 	.invalidate_authorizer = invalidate_authorizer,
5384 	.alloc_msg = alloc_msg,
5385 	.reencode_message = osd_reencode_message,
5386 	.sign_message = osd_sign_message,
5387 	.check_message_signature = osd_check_message_signature,
5388 	.fault = osd_fault,
5389 };
5390