1 // SPDX-License-Identifier: GPL-2.0
2
3 #include <linux/ceph/ceph_debug.h>
4
5 #include <linux/module.h>
6 #include <linux/err.h>
7 #include <linux/highmem.h>
8 #include <linux/mm.h>
9 #include <linux/pagemap.h>
10 #include <linux/slab.h>
11 #include <linux/uaccess.h>
12 #ifdef CONFIG_BLOCK
13 #include <linux/bio.h>
14 #endif
15
16 #include <linux/ceph/ceph_features.h>
17 #include <linux/ceph/libceph.h>
18 #include <linux/ceph/osd_client.h>
19 #include <linux/ceph/messenger.h>
20 #include <linux/ceph/decode.h>
21 #include <linux/ceph/auth.h>
22 #include <linux/ceph/pagelist.h>
23 #include <linux/ceph/striper.h>
24
25 #define OSD_OPREPLY_FRONT_LEN 512
26
27 static struct kmem_cache *ceph_osd_request_cache;
28
29 static const struct ceph_connection_operations osd_con_ops;
30
31 /*
32 * Implement client access to distributed object storage cluster.
33 *
34 * All data objects are stored within a cluster/cloud of OSDs, or
35 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
36 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
37 * remote daemons serving up and coordinating consistent and safe
38 * access to storage.
39 *
40 * Cluster membership and the mapping of data objects onto storage devices
41 * are described by the osd map.
42 *
43 * We keep track of pending OSD requests (read, write), resubmit
44 * requests to different OSDs when the cluster topology/data layout
45 * change, or retry the affected requests when the communications
46 * channel with an OSD is reset.
47 */
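
/*
 * Illustrative lifecycle sketch (not taken from a specific caller;
 * ceph_osdc_start_request() and ceph_osdc_wait_request() are defined
 * further down in this file, error handling omitted):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, 0, false);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 *	ceph_osdc_start_request(osdc, req);
 *	ret = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */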
48
49 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
50 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
51 static void link_linger(struct ceph_osd *osd,
52 struct ceph_osd_linger_request *lreq);
53 static void unlink_linger(struct ceph_osd *osd,
54 struct ceph_osd_linger_request *lreq);
55 static void clear_backoffs(struct ceph_osd *osd);
56
57 #if 1
58 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
59 {
60 bool wrlocked = true;
61
62 if (unlikely(down_read_trylock(sem))) {
63 wrlocked = false;
64 up_read(sem);
65 }
66
67 return wrlocked;
68 }
69 static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
70 {
71 WARN_ON(!rwsem_is_locked(&osdc->lock));
72 }
73 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
74 {
75 WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
76 }
77 static inline void verify_osd_locked(struct ceph_osd *osd)
78 {
79 struct ceph_osd_client *osdc = osd->o_osdc;
80
81 WARN_ON(!(mutex_is_locked(&osd->lock) &&
82 rwsem_is_locked(&osdc->lock)) &&
83 !rwsem_is_wrlocked(&osdc->lock));
84 }
85 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
86 {
87 WARN_ON(!mutex_is_locked(&lreq->lock));
88 }
89 #else
90 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
91 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
92 static inline void verify_osd_locked(struct ceph_osd *osd) { }
93 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
94 #endif
95
96 /*
97 * calculate the mapping of a file extent onto an object, and fill out the
98 * request accordingly. shorten extent as necessary if it crosses an
99 * object boundary.
100 *
101 * fill osd op in request message.
102 */
103 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
104 u64 *objnum, u64 *objoff, u64 *objlen)
105 {
106 u64 orig_len = *plen;
107 u32 xlen;
108
109 /* object extent? */
110 ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
111 objoff, &xlen);
112 *objlen = xlen;
113 if (*objlen < orig_len) {
114 *plen = *objlen;
115 dout(" skipping last %llu, final file extent %llu~%llu\n",
116 orig_len - *plen, off, *plen);
117 }
118
119 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
120 return 0;
121 }
122
123 static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
124 {
125 memset(osd_data, 0, sizeof (*osd_data));
126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127 }
128
129 /*
130 * Consumes @pages if @own_pages is true.
131 */
132 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
133 struct page **pages, u64 length, u32 alignment,
134 bool pages_from_pool, bool own_pages)
135 {
136 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
137 osd_data->pages = pages;
138 osd_data->length = length;
139 osd_data->alignment = alignment;
140 osd_data->pages_from_pool = pages_from_pool;
141 osd_data->own_pages = own_pages;
142 }
143
144 /*
145 * Consumes a ref on @pagelist.
146 */
147 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
148 struct ceph_pagelist *pagelist)
149 {
150 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
151 osd_data->pagelist = pagelist;
152 }
153
154 #ifdef CONFIG_BLOCK
155 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
156 struct ceph_bio_iter *bio_pos,
157 u32 bio_length)
158 {
159 osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
160 osd_data->bio_pos = *bio_pos;
161 osd_data->bio_length = bio_length;
162 }
163 #endif /* CONFIG_BLOCK */
164
165 static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
166 struct ceph_bvec_iter *bvec_pos,
167 u32 num_bvecs)
168 {
169 osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
170 osd_data->bvec_pos = *bvec_pos;
171 osd_data->num_bvecs = num_bvecs;
172 }
173
174 static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
175 struct iov_iter *iter)
176 {
177 osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
178 osd_data->iter = *iter;
179 }
180
181 static struct ceph_osd_data *
182 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
183 {
184 BUG_ON(which >= osd_req->r_num_ops);
185
186 return &osd_req->r_ops[which].raw_data_in;
187 }
188
189 struct ceph_osd_data *
190 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
191 unsigned int which)
192 {
193 return osd_req_op_data(osd_req, which, extent, osd_data);
194 }
195 EXPORT_SYMBOL(osd_req_op_extent_osd_data);
196
197 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
198 unsigned int which, struct page **pages,
199 u64 length, u32 alignment,
200 bool pages_from_pool, bool own_pages)
201 {
202 struct ceph_osd_data *osd_data;
203
204 osd_data = osd_req_op_raw_data_in(osd_req, which);
205 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
206 pages_from_pool, own_pages);
207 }
208 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
209
210 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
211 unsigned int which, struct page **pages,
212 u64 length, u32 alignment,
213 bool pages_from_pool, bool own_pages)
214 {
215 struct ceph_osd_data *osd_data;
216
217 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
218 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
219 pages_from_pool, own_pages);
220 }
221 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
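
/*
 * Illustrative sketch: a read/write payload is commonly attached as a
 * page vector.  With own_pages == true the vector is released by
 * ceph_osd_data_release() when the request is dropped (page vector
 * helpers are from libceph, error handling omitted):
 *
 *	pages = ceph_alloc_page_vector(calc_pages_for(0, len), GFP_NOFS);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, true);
 */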
222
223 #ifdef CONFIG_BLOCK
224 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
225 unsigned int which,
226 struct ceph_bio_iter *bio_pos,
227 u32 bio_length)
228 {
229 struct ceph_osd_data *osd_data;
230
231 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
232 ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
233 }
234 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
235 #endif /* CONFIG_BLOCK */
236
237 void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
238 unsigned int which,
239 struct bio_vec *bvecs, u32 num_bvecs,
240 u32 bytes)
241 {
242 struct ceph_osd_data *osd_data;
243 struct ceph_bvec_iter it = {
244 .bvecs = bvecs,
245 .iter = { .bi_size = bytes },
246 };
247
248 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
249 ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
250 }
251 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);
252
253 void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
254 unsigned int which,
255 struct ceph_bvec_iter *bvec_pos)
256 {
257 struct ceph_osd_data *osd_data;
258
259 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
260 ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
261 }
262 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
263
264 /**
265 * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
266 * @osd_req: The request to set up
267 * @which: Index of the operation in which to set the iter
268 * @iter: The buffer iterator
269 */
270 void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
271 unsigned int which, struct iov_iter *iter)
272 {
273 struct ceph_osd_data *osd_data;
274
275 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
276 ceph_osd_iter_init(osd_data, iter);
277 }
278 EXPORT_SYMBOL(osd_req_op_extent_osd_iter);
279
280 static void osd_req_op_cls_request_info_pagelist(
281 struct ceph_osd_request *osd_req,
282 unsigned int which, struct ceph_pagelist *pagelist)
283 {
284 struct ceph_osd_data *osd_data;
285
286 osd_data = osd_req_op_data(osd_req, which, cls, request_info);
287 ceph_osd_data_pagelist_init(osd_data, pagelist);
288 }
289
290 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
291 unsigned int which, struct page **pages, u64 length,
292 u32 alignment, bool pages_from_pool, bool own_pages)
293 {
294 struct ceph_osd_data *osd_data;
295
296 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
297 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
298 pages_from_pool, own_pages);
299 osd_req->r_ops[which].cls.indata_len += length;
300 osd_req->r_ops[which].indata_len += length;
301 }
302 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
303
304 void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
305 unsigned int which,
306 struct bio_vec *bvecs, u32 num_bvecs,
307 u32 bytes)
308 {
309 struct ceph_osd_data *osd_data;
310 struct ceph_bvec_iter it = {
311 .bvecs = bvecs,
312 .iter = { .bi_size = bytes },
313 };
314
315 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
316 ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
317 osd_req->r_ops[which].cls.indata_len += bytes;
318 osd_req->r_ops[which].indata_len += bytes;
319 }
320 EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
321
322 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
323 unsigned int which, struct page **pages, u64 length,
324 u32 alignment, bool pages_from_pool, bool own_pages)
325 {
326 struct ceph_osd_data *osd_data;
327
328 osd_data = osd_req_op_data(osd_req, which, cls, response_data);
329 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
330 pages_from_pool, own_pages);
331 }
332 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
333
334 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
335 {
336 switch (osd_data->type) {
337 case CEPH_OSD_DATA_TYPE_NONE:
338 return 0;
339 case CEPH_OSD_DATA_TYPE_PAGES:
340 return osd_data->length;
341 case CEPH_OSD_DATA_TYPE_PAGELIST:
342 return (u64)osd_data->pagelist->length;
343 #ifdef CONFIG_BLOCK
344 case CEPH_OSD_DATA_TYPE_BIO:
345 return (u64)osd_data->bio_length;
346 #endif /* CONFIG_BLOCK */
347 case CEPH_OSD_DATA_TYPE_BVECS:
348 return osd_data->bvec_pos.iter.bi_size;
349 case CEPH_OSD_DATA_TYPE_ITER:
350 return iov_iter_count(&osd_data->iter);
351 default:
352 WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
353 return 0;
354 }
355 }
356
357 static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
358 {
359 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
360 int num_pages;
361
362 num_pages = calc_pages_for((u64)osd_data->alignment,
363 (u64)osd_data->length);
364 ceph_release_page_vector(osd_data->pages, num_pages);
365 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
366 ceph_pagelist_release(osd_data->pagelist);
367 }
368 ceph_osd_data_init(osd_data);
369 }
370
371 static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
372 unsigned int which)
373 {
374 struct ceph_osd_req_op *op;
375
376 BUG_ON(which >= osd_req->r_num_ops);
377 op = &osd_req->r_ops[which];
378
379 switch (op->op) {
380 case CEPH_OSD_OP_READ:
381 case CEPH_OSD_OP_SPARSE_READ:
382 case CEPH_OSD_OP_WRITE:
383 case CEPH_OSD_OP_WRITEFULL:
384 kfree(op->extent.sparse_ext);
385 ceph_osd_data_release(&op->extent.osd_data);
386 break;
387 case CEPH_OSD_OP_CALL:
388 ceph_osd_data_release(&op->cls.request_info);
389 ceph_osd_data_release(&op->cls.request_data);
390 ceph_osd_data_release(&op->cls.response_data);
391 break;
392 case CEPH_OSD_OP_SETXATTR:
393 case CEPH_OSD_OP_CMPXATTR:
394 ceph_osd_data_release(&op->xattr.osd_data);
395 break;
396 case CEPH_OSD_OP_STAT:
397 ceph_osd_data_release(&op->raw_data_in);
398 break;
399 case CEPH_OSD_OP_NOTIFY_ACK:
400 ceph_osd_data_release(&op->notify_ack.request_data);
401 break;
402 case CEPH_OSD_OP_NOTIFY:
403 ceph_osd_data_release(&op->notify.request_data);
404 ceph_osd_data_release(&op->notify.response_data);
405 break;
406 case CEPH_OSD_OP_LIST_WATCHERS:
407 ceph_osd_data_release(&op->list_watchers.response_data);
408 break;
409 case CEPH_OSD_OP_COPY_FROM2:
410 ceph_osd_data_release(&op->copy_from.osd_data);
411 break;
412 default:
413 break;
414 }
415 }
416
417 /*
418 * Assumes @t is zero-initialized.
419 */
420 static void target_init(struct ceph_osd_request_target *t)
421 {
422 ceph_oid_init(&t->base_oid);
423 ceph_oloc_init(&t->base_oloc);
424 ceph_oid_init(&t->target_oid);
425 ceph_oloc_init(&t->target_oloc);
426
427 ceph_osds_init(&t->acting);
428 ceph_osds_init(&t->up);
429 t->size = -1;
430 t->min_size = -1;
431
432 t->osd = CEPH_HOMELESS_OSD;
433 }
434
435 static void target_copy(struct ceph_osd_request_target *dest,
436 const struct ceph_osd_request_target *src)
437 {
438 ceph_oid_copy(&dest->base_oid, &src->base_oid);
439 ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
440 ceph_oid_copy(&dest->target_oid, &src->target_oid);
441 ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
442
443 dest->pgid = src->pgid; /* struct */
444 dest->spgid = src->spgid; /* struct */
445 dest->pg_num = src->pg_num;
446 dest->pg_num_mask = src->pg_num_mask;
447 ceph_osds_copy(&dest->acting, &src->acting);
448 ceph_osds_copy(&dest->up, &src->up);
449 dest->size = src->size;
450 dest->min_size = src->min_size;
451 dest->sort_bitwise = src->sort_bitwise;
452 dest->recovery_deletes = src->recovery_deletes;
453
454 dest->flags = src->flags;
455 dest->used_replica = src->used_replica;
456 dest->paused = src->paused;
457
458 dest->epoch = src->epoch;
459 dest->last_force_resend = src->last_force_resend;
460
461 dest->osd = src->osd;
462 }
463
464 static void target_destroy(struct ceph_osd_request_target *t)
465 {
466 ceph_oid_destroy(&t->base_oid);
467 ceph_oloc_destroy(&t->base_oloc);
468 ceph_oid_destroy(&t->target_oid);
469 ceph_oloc_destroy(&t->target_oloc);
470 }
471
472 /*
473 * requests
474 */
475 static void request_release_checks(struct ceph_osd_request *req)
476 {
477 WARN_ON(!RB_EMPTY_NODE(&req->r_node));
478 WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
479 WARN_ON(!list_empty(&req->r_private_item));
480 WARN_ON(req->r_osd);
481 }
482
483 static void ceph_osdc_release_request(struct kref *kref)
484 {
485 struct ceph_osd_request *req = container_of(kref,
486 struct ceph_osd_request, r_kref);
487 unsigned int which;
488
489 dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
490 req->r_request, req->r_reply);
491 request_release_checks(req);
492
493 if (req->r_request)
494 ceph_msg_put(req->r_request);
495 if (req->r_reply)
496 ceph_msg_put(req->r_reply);
497
498 for (which = 0; which < req->r_num_ops; which++)
499 osd_req_op_data_release(req, which);
500
501 target_destroy(&req->r_t);
502 ceph_put_snap_context(req->r_snapc);
503
504 if (req->r_mempool)
505 mempool_free(req, req->r_osdc->req_mempool);
506 else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
507 kmem_cache_free(ceph_osd_request_cache, req);
508 else
509 kfree(req);
510 }
511
512 void ceph_osdc_get_request(struct ceph_osd_request *req)
513 {
514 dout("%s %p (was %d)\n", __func__, req,
515 kref_read(&req->r_kref));
516 kref_get(&req->r_kref);
517 }
518 EXPORT_SYMBOL(ceph_osdc_get_request);
519
520 void ceph_osdc_put_request(struct ceph_osd_request *req)
521 {
522 if (req) {
523 dout("%s %p (was %d)\n", __func__, req,
524 kref_read(&req->r_kref));
525 kref_put(&req->r_kref, ceph_osdc_release_request);
526 }
527 }
528 EXPORT_SYMBOL(ceph_osdc_put_request);
529
530 static void request_init(struct ceph_osd_request *req)
531 {
532 /* req only, each op is zeroed in osd_req_op_init() */
533 memset(req, 0, sizeof(*req));
534
535 kref_init(&req->r_kref);
536 init_completion(&req->r_completion);
537 RB_CLEAR_NODE(&req->r_node);
538 RB_CLEAR_NODE(&req->r_mc_node);
539 INIT_LIST_HEAD(&req->r_private_item);
540
541 target_init(&req->r_t);
542 }
543
544 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
545 struct ceph_snap_context *snapc,
546 unsigned int num_ops,
547 bool use_mempool,
548 gfp_t gfp_flags)
549 {
550 struct ceph_osd_request *req;
551
552 if (use_mempool) {
553 BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
554 req = mempool_alloc(osdc->req_mempool, gfp_flags);
555 } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
556 req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
557 } else {
558 BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
559 req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
560 }
561 if (unlikely(!req))
562 return NULL;
563
564 request_init(req);
565 req->r_osdc = osdc;
566 req->r_mempool = use_mempool;
567 req->r_num_ops = num_ops;
568 req->r_snapid = CEPH_NOSNAP;
569 req->r_snapc = ceph_get_snap_context(snapc);
570
571 dout("%s req %p\n", __func__, req);
572 return req;
573 }
574 EXPORT_SYMBOL(ceph_osdc_alloc_request);
575
576 static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
577 {
578 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
579 }
580
581 static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
582 int num_request_data_items,
583 int num_reply_data_items)
584 {
585 struct ceph_osd_client *osdc = req->r_osdc;
586 struct ceph_msg *msg;
587 int msg_size;
588
589 WARN_ON(req->r_request || req->r_reply);
590 WARN_ON(ceph_oid_empty(&req->r_base_oid));
591 WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
592
593 /* create request message */
594 msg_size = CEPH_ENCODING_START_BLK_LEN +
595 CEPH_PGID_ENCODING_LEN + 1; /* spgid */
596 msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
597 msg_size += CEPH_ENCODING_START_BLK_LEN +
598 sizeof(struct ceph_osd_reqid); /* reqid */
599 msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
600 msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
601 msg_size += CEPH_ENCODING_START_BLK_LEN +
602 ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
603 msg_size += 4 + req->r_base_oid.name_len; /* oid */
604 msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
605 msg_size += 8; /* snapid */
606 msg_size += 8; /* snap_seq */
607 msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
608 msg_size += 4 + 8; /* retry_attempt, features */
609
610 if (req->r_mempool)
611 msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
612 num_request_data_items);
613 else
614 msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
615 num_request_data_items, gfp, true);
616 if (!msg)
617 return -ENOMEM;
618
619 memset(msg->front.iov_base, 0, msg->front.iov_len);
620 req->r_request = msg;
621
622 /* create reply message */
623 msg_size = OSD_OPREPLY_FRONT_LEN;
624 msg_size += req->r_base_oid.name_len;
625 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
626
627 if (req->r_mempool)
628 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
629 num_reply_data_items);
630 else
631 msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
632 num_reply_data_items, gfp, true);
633 if (!msg)
634 return -ENOMEM;
635
636 req->r_reply = msg;
637
638 return 0;
639 }
640
641 static bool osd_req_opcode_valid(u16 opcode)
642 {
643 switch (opcode) {
644 #define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
645 __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
646 #undef GENERATE_CASE
647 default:
648 return false;
649 }
650 }
651
652 static void get_num_data_items(struct ceph_osd_request *req,
653 int *num_request_data_items,
654 int *num_reply_data_items)
655 {
656 struct ceph_osd_req_op *op;
657
658 *num_request_data_items = 0;
659 *num_reply_data_items = 0;
660
661 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
662 switch (op->op) {
663 /* request */
664 case CEPH_OSD_OP_WRITE:
665 case CEPH_OSD_OP_WRITEFULL:
666 case CEPH_OSD_OP_SETXATTR:
667 case CEPH_OSD_OP_CMPXATTR:
668 case CEPH_OSD_OP_NOTIFY_ACK:
669 case CEPH_OSD_OP_COPY_FROM2:
670 *num_request_data_items += 1;
671 break;
672
673 /* reply */
674 case CEPH_OSD_OP_STAT:
675 case CEPH_OSD_OP_READ:
676 case CEPH_OSD_OP_SPARSE_READ:
677 case CEPH_OSD_OP_LIST_WATCHERS:
678 *num_reply_data_items += 1;
679 break;
680
681 /* both */
682 case CEPH_OSD_OP_NOTIFY:
683 *num_request_data_items += 1;
684 *num_reply_data_items += 1;
685 break;
686 case CEPH_OSD_OP_CALL:
687 *num_request_data_items += 2;
688 *num_reply_data_items += 1;
689 break;
690
691 default:
692 WARN_ON(!osd_req_opcode_valid(op->op));
693 break;
694 }
695 }
696 }
697
698 /*
699 * oid, oloc and OSD op opcode(s) must be filled in before this function
700 * is called.
701 */
702 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
703 {
704 int num_request_data_items, num_reply_data_items;
705
706 get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
707 return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
708 num_reply_data_items);
709 }
710 EXPORT_SYMBOL(ceph_osdc_alloc_messages);
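
/*
 * Illustrative ordering sketch for the low-level allocation path
 * (object name and pool id are hypothetical, error handling omitted):
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
 *	req->r_base_oloc.pool = pool_id;
 *	ceph_oid_printf(&req->r_base_oid, "%s", "some_object");
 *	osd_req_op_init(req, 0, CEPH_OSD_OP_STAT, 0);
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
 */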
711
712 /*
713 * This is an osd op init function for opcodes that have no data or
714 * other information associated with them. It also serves as a
715 * common init routine for all the other init functions, below.
716 */
717 struct ceph_osd_req_op *
718 osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
719 u16 opcode, u32 flags)
720 {
721 struct ceph_osd_req_op *op;
722
723 BUG_ON(which >= osd_req->r_num_ops);
724 BUG_ON(!osd_req_opcode_valid(opcode));
725
726 op = &osd_req->r_ops[which];
727 memset(op, 0, sizeof (*op));
728 op->op = opcode;
729 op->flags = flags;
730
731 return op;
732 }
733 EXPORT_SYMBOL(osd_req_op_init);
734
735 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
736 unsigned int which, u16 opcode,
737 u64 offset, u64 length,
738 u64 truncate_size, u32 truncate_seq)
739 {
740 struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
741 opcode, 0);
742 size_t payload_len = 0;
743
744 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
745 opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
746 opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
747
748 op->extent.offset = offset;
749 op->extent.length = length;
750 op->extent.truncate_size = truncate_size;
751 op->extent.truncate_seq = truncate_seq;
752 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
753 payload_len += length;
754
755 op->indata_len = payload_len;
756 }
757 EXPORT_SYMBOL(osd_req_op_extent_init);
758
759 void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
760 unsigned int which, u64 length)
761 {
762 struct ceph_osd_req_op *op;
763 u64 previous;
764
765 BUG_ON(which >= osd_req->r_num_ops);
766 op = &osd_req->r_ops[which];
767 previous = op->extent.length;
768
769 if (length == previous)
770 return; /* Nothing to do */
771 BUG_ON(length > previous);
772
773 op->extent.length = length;
774 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
775 op->indata_len -= previous - length;
776 }
777 EXPORT_SYMBOL(osd_req_op_extent_update);
778
779 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
780 unsigned int which, u64 offset_inc)
781 {
782 struct ceph_osd_req_op *op, *prev_op;
783
784 BUG_ON(which + 1 >= osd_req->r_num_ops);
785
786 prev_op = &osd_req->r_ops[which];
787 op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
788 /* dup previous one */
789 op->indata_len = prev_op->indata_len;
790 op->outdata_len = prev_op->outdata_len;
791 op->extent = prev_op->extent;
792 /* adjust offset */
793 op->extent.offset += offset_inc;
794 op->extent.length -= offset_inc;
795
796 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
797 op->indata_len -= offset_inc;
798 }
799 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
800
801 int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
802 const char *class, const char *method)
803 {
804 struct ceph_osd_req_op *op;
805 struct ceph_pagelist *pagelist;
806 size_t payload_len = 0;
807 size_t size;
808 int ret;
809
810 op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
811
812 pagelist = ceph_pagelist_alloc(GFP_NOFS);
813 if (!pagelist)
814 return -ENOMEM;
815
816 op->cls.class_name = class;
817 size = strlen(class);
818 BUG_ON(size > (size_t) U8_MAX);
819 op->cls.class_len = size;
820 ret = ceph_pagelist_append(pagelist, class, size);
821 if (ret)
822 goto err_pagelist_free;
823 payload_len += size;
824
825 op->cls.method_name = method;
826 size = strlen(method);
827 BUG_ON(size > (size_t) U8_MAX);
828 op->cls.method_len = size;
829 ret = ceph_pagelist_append(pagelist, method, size);
830 if (ret)
831 goto err_pagelist_free;
832 payload_len += size;
833
834 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
835 op->indata_len = payload_len;
836 return 0;
837
838 err_pagelist_free:
839 ceph_pagelist_release(pagelist);
840 return ret;
841 }
842 EXPORT_SYMBOL(osd_req_op_cls_init);
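
/*
 * Illustrative CEPH_OSD_OP_CALL sketch (class/method names are
 * hypothetical): request_info carries the class and method names set up
 * above, request_data carries the input blob and response_data receives
 * the method output (error handling omitted):
 *
 *	ret = osd_req_op_cls_init(req, 0, "myclass", "mymethod");
 *	osd_req_op_cls_request_data_pages(req, 0, in_pages, in_len,
 *					  0, false, false);
 *	osd_req_op_cls_response_data_pages(req, 0, out_pages, out_len,
 *					   0, false, false);
 */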
843
844 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
845 u16 opcode, const char *name, const void *value,
846 size_t size, u8 cmp_op, u8 cmp_mode)
847 {
848 struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
849 opcode, 0);
850 struct ceph_pagelist *pagelist;
851 size_t payload_len;
852 int ret;
853
854 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
855
856 pagelist = ceph_pagelist_alloc(GFP_NOFS);
857 if (!pagelist)
858 return -ENOMEM;
859
860 payload_len = strlen(name);
861 op->xattr.name_len = payload_len;
862 ret = ceph_pagelist_append(pagelist, name, payload_len);
863 if (ret)
864 goto err_pagelist_free;
865
866 op->xattr.value_len = size;
867 ret = ceph_pagelist_append(pagelist, value, size);
868 if (ret)
869 goto err_pagelist_free;
870 payload_len += size;
871
872 op->xattr.cmp_op = cmp_op;
873 op->xattr.cmp_mode = cmp_mode;
874
875 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
876 op->indata_len = payload_len;
877 return 0;
878
879 err_pagelist_free:
880 ceph_pagelist_release(pagelist);
881 return ret;
882 }
883 EXPORT_SYMBOL(osd_req_op_xattr_init);
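
/*
 * Illustrative sketch: a guarded request can assert an xattr value with
 * CEPH_OSD_OP_CMPXATTR ahead of a write op (CEPH_OSD_CMPXATTR_* values
 * come from rados.h; the xattr name and value here are hypothetical):
 *
 *	ret = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR, "user.ver",
 *				    val, val_len, CEPH_OSD_CMPXATTR_OP_EQ,
 *				    CEPH_OSD_CMPXATTR_MODE_STRING);
 */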
884
885 /*
886 * @watch_opcode: CEPH_OSD_WATCH_OP_*
887 */
888 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
889 u8 watch_opcode, u64 cookie, u32 gen)
890 {
891 struct ceph_osd_req_op *op;
892
893 op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
894 op->watch.cookie = cookie;
895 op->watch.op = watch_opcode;
896 op->watch.gen = gen;
897 }
898
899 /*
900 * prot_ver, timeout and notify payload (may be empty) should already be
901 * encoded in @request_pl
902 */
903 static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
904 u64 cookie, struct ceph_pagelist *request_pl)
905 {
906 struct ceph_osd_req_op *op;
907
908 op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
909 op->notify.cookie = cookie;
910
911 ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
912 op->indata_len = request_pl->length;
913 }
914
915 /*
916 * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
917 */
918 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
919 unsigned int which,
920 u64 expected_object_size,
921 u64 expected_write_size,
922 u32 flags)
923 {
924 struct ceph_osd_req_op *op;
925
926 op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
927 op->alloc_hint.expected_object_size = expected_object_size;
928 op->alloc_hint.expected_write_size = expected_write_size;
929 op->alloc_hint.flags = flags;
930
931 /*
932 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
933 * not worth a feature bit. Set FAILOK per-op flag to make
934 * sure older osds don't trip over an unsupported opcode.
935 */
936 op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
937 }
938 EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
939
940 static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
941 struct ceph_osd_data *osd_data)
942 {
943 u64 length = ceph_osd_data_length(osd_data);
944
945 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
946 BUG_ON(length > (u64) SIZE_MAX);
947 if (length)
948 ceph_msg_data_add_pages(msg, osd_data->pages,
949 length, osd_data->alignment, false);
950 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
951 BUG_ON(!length);
952 ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
953 #ifdef CONFIG_BLOCK
954 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
955 ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
956 #endif
957 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
958 ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
959 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
960 ceph_msg_data_add_iter(msg, &osd_data->iter);
961 } else {
962 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
963 }
964 }
965
966 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
967 const struct ceph_osd_req_op *src)
968 {
969 switch (src->op) {
970 case CEPH_OSD_OP_STAT:
971 break;
972 case CEPH_OSD_OP_READ:
973 case CEPH_OSD_OP_SPARSE_READ:
974 case CEPH_OSD_OP_WRITE:
975 case CEPH_OSD_OP_WRITEFULL:
976 case CEPH_OSD_OP_ZERO:
977 case CEPH_OSD_OP_TRUNCATE:
978 dst->extent.offset = cpu_to_le64(src->extent.offset);
979 dst->extent.length = cpu_to_le64(src->extent.length);
980 dst->extent.truncate_size =
981 cpu_to_le64(src->extent.truncate_size);
982 dst->extent.truncate_seq =
983 cpu_to_le32(src->extent.truncate_seq);
984 break;
985 case CEPH_OSD_OP_CALL:
986 dst->cls.class_len = src->cls.class_len;
987 dst->cls.method_len = src->cls.method_len;
988 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
989 break;
990 case CEPH_OSD_OP_WATCH:
991 dst->watch.cookie = cpu_to_le64(src->watch.cookie);
992 dst->watch.ver = cpu_to_le64(0);
993 dst->watch.op = src->watch.op;
994 dst->watch.gen = cpu_to_le32(src->watch.gen);
995 break;
996 case CEPH_OSD_OP_NOTIFY_ACK:
997 break;
998 case CEPH_OSD_OP_NOTIFY:
999 dst->notify.cookie = cpu_to_le64(src->notify.cookie);
1000 break;
1001 case CEPH_OSD_OP_LIST_WATCHERS:
1002 break;
1003 case CEPH_OSD_OP_SETALLOCHINT:
1004 dst->alloc_hint.expected_object_size =
1005 cpu_to_le64(src->alloc_hint.expected_object_size);
1006 dst->alloc_hint.expected_write_size =
1007 cpu_to_le64(src->alloc_hint.expected_write_size);
1008 dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
1009 break;
1010 case CEPH_OSD_OP_SETXATTR:
1011 case CEPH_OSD_OP_CMPXATTR:
1012 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
1013 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
1014 dst->xattr.cmp_op = src->xattr.cmp_op;
1015 dst->xattr.cmp_mode = src->xattr.cmp_mode;
1016 break;
1017 case CEPH_OSD_OP_CREATE:
1018 case CEPH_OSD_OP_DELETE:
1019 break;
1020 case CEPH_OSD_OP_COPY_FROM2:
1021 dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
1022 dst->copy_from.src_version =
1023 cpu_to_le64(src->copy_from.src_version);
1024 dst->copy_from.flags = src->copy_from.flags;
1025 dst->copy_from.src_fadvise_flags =
1026 cpu_to_le32(src->copy_from.src_fadvise_flags);
1027 break;
1028 case CEPH_OSD_OP_ASSERT_VER:
1029 dst->assert_ver.unused = cpu_to_le64(0);
1030 dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver);
1031 break;
1032 default:
1033 pr_err("unsupported osd opcode %s\n",
1034 ceph_osd_op_name(src->op));
1035 WARN_ON(1);
1036
1037 return 0;
1038 }
1039
1040 dst->op = cpu_to_le16(src->op);
1041 dst->flags = cpu_to_le32(src->flags);
1042 dst->payload_len = cpu_to_le32(src->indata_len);
1043
1044 return src->indata_len;
1045 }
1046
1047 /*
1048 * build new request AND message, calculate layout, and adjust file
1049 * extent as needed.
1050 *
1051 * if the file was recently truncated, we include information about its
1052 * old and new size so that the object can be updated appropriately. (we
1053 * avoid synchronously deleting truncated objects because it's slow.)
1054 */
1055 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1056 struct ceph_file_layout *layout,
1057 struct ceph_vino vino,
1058 u64 off, u64 *plen,
1059 unsigned int which, int num_ops,
1060 int opcode, int flags,
1061 struct ceph_snap_context *snapc,
1062 u32 truncate_seq,
1063 u64 truncate_size,
1064 bool use_mempool)
1065 {
1066 struct ceph_osd_request *req;
1067 u64 objnum = 0;
1068 u64 objoff = 0;
1069 u64 objlen = 0;
1070 int r;
1071
1072 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
1073 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
1074 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
1075 opcode != CEPH_OSD_OP_SPARSE_READ);
1076
1077 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
1078 GFP_NOFS);
1079 if (!req) {
1080 r = -ENOMEM;
1081 goto fail;
1082 }
1083
1084 /* calculate max write size */
1085 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
1086 if (r)
1087 goto fail;
1088
1089 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
1090 osd_req_op_init(req, which, opcode, 0);
1091 } else {
1092 u32 object_size = layout->object_size;
1093 u32 object_base = off - objoff;
1094 if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
1095 if (truncate_size <= object_base) {
1096 truncate_size = 0;
1097 } else {
1098 truncate_size -= object_base;
1099 if (truncate_size > object_size)
1100 truncate_size = object_size;
1101 }
1102 }
1103 osd_req_op_extent_init(req, which, opcode, objoff, objlen,
1104 truncate_size, truncate_seq);
1105 }
1106
1107 req->r_base_oloc.pool = layout->pool_id;
1108 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
1109 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1110 req->r_flags = flags | osdc->client->options->read_from_replica;
1111
1112 req->r_snapid = vino.snap;
1113 if (flags & CEPH_OSD_FLAG_WRITE)
1114 req->r_data_offset = off;
1115
1116 if (num_ops > 1) {
1117 int num_req_ops, num_rep_ops;
1118
1119 /*
1120 * If this is a multi-op write request, assume that we'll need
1121 * request ops. If it's a multi-op read then assume we'll need
1122 * reply ops. Anything else and call it -EINVAL.
1123 */
1124 if (flags & CEPH_OSD_FLAG_WRITE) {
1125 num_req_ops = num_ops;
1126 num_rep_ops = 0;
1127 } else if (flags & CEPH_OSD_FLAG_READ) {
1128 num_req_ops = 0;
1129 num_rep_ops = num_ops;
1130 } else {
1131 r = -EINVAL;
1132 goto fail;
1133 }
1134
1135 r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_req_ops,
1136 num_rep_ops);
1137 } else {
1138 r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1139 }
1140 if (r)
1141 goto fail;
1142
1143 return req;
1144
1145 fail:
1146 ceph_osdc_put_request(req);
1147 return ERR_PTR(r);
1148 }
1149 EXPORT_SYMBOL(ceph_osdc_new_request);
1150
1151 int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt)
1152 {
1153 WARN_ON(op->op != CEPH_OSD_OP_SPARSE_READ);
1154
1155 op->extent.sparse_ext_cnt = cnt;
1156 op->extent.sparse_ext = kmalloc_array(cnt,
1157 sizeof(*op->extent.sparse_ext),
1158 GFP_NOFS);
1159 if (!op->extent.sparse_ext)
1160 return -ENOMEM;
1161 return 0;
1162 }
1163 EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map);
1164
1165 /*
1166 * We keep osd requests in an rbtree, sorted by ->r_tid.
1167 */
1168 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
1169 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
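
/*
 * DEFINE_RB_FUNCS() (linux/ceph/libceph.h) expands to static
 * insert_request()/erase_request()/lookup_request() helpers keyed by
 * r_tid, plus the *_request_mc() variants for the map-check tree
 * hanging off r_mc_node.
 */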
1170
1171 /*
1172 * Call @fn on each OSD request as long as @fn returns 0.
1173 */
1174 static void for_each_request(struct ceph_osd_client *osdc,
1175 int (*fn)(struct ceph_osd_request *req, void *arg),
1176 void *arg)
1177 {
1178 struct rb_node *n, *p;
1179
1180 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
1181 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1182
1183 for (p = rb_first(&osd->o_requests); p; ) {
1184 struct ceph_osd_request *req =
1185 rb_entry(p, struct ceph_osd_request, r_node);
1186
1187 p = rb_next(p);
1188 if (fn(req, arg))
1189 return;
1190 }
1191 }
1192
1193 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
1194 struct ceph_osd_request *req =
1195 rb_entry(p, struct ceph_osd_request, r_node);
1196
1197 p = rb_next(p);
1198 if (fn(req, arg))
1199 return;
1200 }
1201 }
1202
1203 static bool osd_homeless(struct ceph_osd *osd)
1204 {
1205 return osd->o_osd == CEPH_HOMELESS_OSD;
1206 }
1207
1208 static bool osd_registered(struct ceph_osd *osd)
1209 {
1210 verify_osdc_locked(osd->o_osdc);
1211
1212 return !RB_EMPTY_NODE(&osd->o_node);
1213 }
1214
1215 /*
1216 * Assumes @osd is zero-initialized.
1217 */
1218 static void osd_init(struct ceph_osd *osd)
1219 {
1220 refcount_set(&osd->o_ref, 1);
1221 RB_CLEAR_NODE(&osd->o_node);
1222 spin_lock_init(&osd->o_requests_lock);
1223 osd->o_requests = RB_ROOT;
1224 osd->o_linger_requests = RB_ROOT;
1225 osd->o_backoff_mappings = RB_ROOT;
1226 osd->o_backoffs_by_id = RB_ROOT;
1227 INIT_LIST_HEAD(&osd->o_osd_lru);
1228 INIT_LIST_HEAD(&osd->o_keepalive_item);
1229 osd->o_incarnation = 1;
1230 mutex_init(&osd->lock);
1231 }
1232
1233 static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
1234 {
1235 kfree(sr->sr_extent);
1236 memset(sr, '\0', sizeof(*sr));
1237 sr->sr_state = CEPH_SPARSE_READ_HDR;
1238 }
1239
1240 static void osd_cleanup(struct ceph_osd *osd)
1241 {
1242 WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1243 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1244 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1245 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1246 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1247 WARN_ON(!list_empty(&osd->o_osd_lru));
1248 WARN_ON(!list_empty(&osd->o_keepalive_item));
1249
1250 ceph_init_sparse_read(&osd->o_sparse_read);
1251
1252 if (osd->o_auth.authorizer) {
1253 WARN_ON(osd_homeless(osd));
1254 ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1255 }
1256 }
1257
1258 /*
1259 * Track open sessions with osds.
1260 */
1261 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1262 {
1263 struct ceph_osd *osd;
1264
1265 WARN_ON(onum == CEPH_HOMELESS_OSD);
1266
1267 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1268 osd_init(osd);
1269 osd->o_osdc = osdc;
1270 osd->o_osd = onum;
1271 osd->o_sparse_op_idx = -1;
1272
1273 ceph_init_sparse_read(&osd->o_sparse_read);
1274
1275 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1276
1277 return osd;
1278 }
1279
1280 static struct ceph_osd *get_osd(struct ceph_osd *osd)
1281 {
1282 if (refcount_inc_not_zero(&osd->o_ref)) {
1283 dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1284 refcount_read(&osd->o_ref));
1285 return osd;
1286 } else {
1287 dout("get_osd %p FAIL\n", osd);
1288 return NULL;
1289 }
1290 }
1291
1292 static void put_osd(struct ceph_osd *osd)
1293 {
1294 dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1295 refcount_read(&osd->o_ref) - 1);
1296 if (refcount_dec_and_test(&osd->o_ref)) {
1297 osd_cleanup(osd);
1298 kfree(osd);
1299 }
1300 }
1301
1302 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1303
1304 static void __move_osd_to_lru(struct ceph_osd *osd)
1305 {
1306 struct ceph_osd_client *osdc = osd->o_osdc;
1307
1308 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1309 BUG_ON(!list_empty(&osd->o_osd_lru));
1310
1311 spin_lock(&osdc->osd_lru_lock);
1312 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1313 spin_unlock(&osdc->osd_lru_lock);
1314
1315 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1316 }
1317
1318 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1319 {
1320 if (RB_EMPTY_ROOT(&osd->o_requests) &&
1321 RB_EMPTY_ROOT(&osd->o_linger_requests))
1322 __move_osd_to_lru(osd);
1323 }
1324
1325 static void __remove_osd_from_lru(struct ceph_osd *osd)
1326 {
1327 struct ceph_osd_client *osdc = osd->o_osdc;
1328
1329 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1330
1331 spin_lock(&osdc->osd_lru_lock);
1332 if (!list_empty(&osd->o_osd_lru))
1333 list_del_init(&osd->o_osd_lru);
1334 spin_unlock(&osdc->osd_lru_lock);
1335 }
1336
1337 /*
1338 * Close the connection and assign any leftover requests to the
1339 * homeless session.
1340 */
1341 static void close_osd(struct ceph_osd *osd)
1342 {
1343 struct ceph_osd_client *osdc = osd->o_osdc;
1344 struct rb_node *n;
1345
1346 verify_osdc_wrlocked(osdc);
1347 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1348
1349 ceph_con_close(&osd->o_con);
1350
1351 for (n = rb_first(&osd->o_requests); n; ) {
1352 struct ceph_osd_request *req =
1353 rb_entry(n, struct ceph_osd_request, r_node);
1354
1355 n = rb_next(n); /* unlink_request() */
1356
1357 dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1358 unlink_request(osd, req);
1359 link_request(&osdc->homeless_osd, req);
1360 }
1361 for (n = rb_first(&osd->o_linger_requests); n; ) {
1362 struct ceph_osd_linger_request *lreq =
1363 rb_entry(n, struct ceph_osd_linger_request, node);
1364
1365 n = rb_next(n); /* unlink_linger() */
1366
1367 dout(" reassigning lreq %p linger_id %llu\n", lreq,
1368 lreq->linger_id);
1369 unlink_linger(osd, lreq);
1370 link_linger(&osdc->homeless_osd, lreq);
1371 }
1372 clear_backoffs(osd);
1373
1374 __remove_osd_from_lru(osd);
1375 erase_osd(&osdc->osds, osd);
1376 put_osd(osd);
1377 }
1378
1379 /*
1380 * reset osd connect
1381 */
1382 static int reopen_osd(struct ceph_osd *osd)
1383 {
1384 struct ceph_entity_addr *peer_addr;
1385
1386 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1387
1388 if (RB_EMPTY_ROOT(&osd->o_requests) &&
1389 RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1390 close_osd(osd);
1391 return -ENODEV;
1392 }
1393
1394 peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1395 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1396 !ceph_con_opened(&osd->o_con)) {
1397 struct rb_node *n;
1398
1399 dout("osd addr hasn't changed and connection never opened, "
1400 "letting msgr retry\n");
1401 /* touch each r_stamp for handle_timeout()'s benefit */
1402 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1403 struct ceph_osd_request *req =
1404 rb_entry(n, struct ceph_osd_request, r_node);
1405 req->r_stamp = jiffies;
1406 }
1407
1408 return -EAGAIN;
1409 }
1410
1411 ceph_con_close(&osd->o_con);
1412 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1413 osd->o_incarnation++;
1414
1415 return 0;
1416 }
1417
1418 static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1419 bool wrlocked)
1420 {
1421 struct ceph_osd *osd;
1422
1423 if (wrlocked)
1424 verify_osdc_wrlocked(osdc);
1425 else
1426 verify_osdc_locked(osdc);
1427
1428 if (o != CEPH_HOMELESS_OSD)
1429 osd = lookup_osd(&osdc->osds, o);
1430 else
1431 osd = &osdc->homeless_osd;
1432 if (!osd) {
1433 if (!wrlocked)
1434 return ERR_PTR(-EAGAIN);
1435
1436 osd = create_osd(osdc, o);
1437 insert_osd(&osdc->osds, osd);
1438 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1439 &osdc->osdmap->osd_addr[osd->o_osd]);
1440 }
1441
1442 dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1443 return osd;
1444 }
1445
1446 /*
1447 * Create request <-> OSD session relation.
1448 *
1449 * @req has to be assigned a tid, @osd may be homeless.
1450 */
1451 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1452 {
1453 verify_osd_locked(osd);
1454 WARN_ON(!req->r_tid || req->r_osd);
1455 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1456 req, req->r_tid);
1457
1458 if (!osd_homeless(osd))
1459 __remove_osd_from_lru(osd);
1460 else
1461 atomic_inc(&osd->o_osdc->num_homeless);
1462
1463 get_osd(osd);
1464 spin_lock(&osd->o_requests_lock);
1465 insert_request(&osd->o_requests, req);
1466 spin_unlock(&osd->o_requests_lock);
1467 req->r_osd = osd;
1468 }
1469
1470 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1471 {
1472 verify_osd_locked(osd);
1473 WARN_ON(req->r_osd != osd);
1474 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1475 req, req->r_tid);
1476
1477 req->r_osd = NULL;
1478 spin_lock(&osd->o_requests_lock);
1479 erase_request(&osd->o_requests, req);
1480 spin_unlock(&osd->o_requests_lock);
1481 put_osd(osd);
1482
1483 if (!osd_homeless(osd))
1484 maybe_move_osd_to_lru(osd);
1485 else
1486 atomic_dec(&osd->o_osdc->num_homeless);
1487 }
1488
1489 static bool __pool_full(struct ceph_pg_pool_info *pi)
1490 {
1491 return pi->flags & CEPH_POOL_FLAG_FULL;
1492 }
1493
1494 static bool have_pool_full(struct ceph_osd_client *osdc)
1495 {
1496 struct rb_node *n;
1497
1498 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1499 struct ceph_pg_pool_info *pi =
1500 rb_entry(n, struct ceph_pg_pool_info, node);
1501
1502 if (__pool_full(pi))
1503 return true;
1504 }
1505
1506 return false;
1507 }
1508
1509 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1510 {
1511 struct ceph_pg_pool_info *pi;
1512
1513 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1514 if (!pi)
1515 return false;
1516
1517 return __pool_full(pi);
1518 }
1519
1520 /*
1521 * Returns whether a request should be blocked from being sent
1522 * based on the current osdmap and osd_client settings.
1523 */
1524 static bool target_should_be_paused(struct ceph_osd_client *osdc,
1525 const struct ceph_osd_request_target *t,
1526 struct ceph_pg_pool_info *pi)
1527 {
1528 bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1529 bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1530 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1531 __pool_full(pi);
1532
1533 WARN_ON(pi->id != t->target_oloc.pool);
1534 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1535 ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1536 (osdc->osdmap->epoch < osdc->epoch_barrier);
1537 }
1538
1539 static int pick_random_replica(const struct ceph_osds *acting)
1540 {
1541 int i = get_random_u32_below(acting->size);
1542
1543 dout("%s picked osd%d, primary osd%d\n", __func__,
1544 acting->osds[i], acting->primary);
1545 return i;
1546 }
1547
1548 /*
1549 * Picks the closest replica based on client's location given by
1550 * crush_location option. Prefers the primary if the locality is
1551 * the same.
1552 */
1553 static int pick_closest_replica(struct ceph_osd_client *osdc,
1554 const struct ceph_osds *acting)
1555 {
1556 struct ceph_options *opt = osdc->client->options;
1557 int best_i, best_locality;
1558 int i = 0, locality;
1559
1560 do {
1561 locality = ceph_get_crush_locality(osdc->osdmap,
1562 acting->osds[i],
1563 &opt->crush_locs);
1564 if (i == 0 ||
1565 (locality >= 0 && best_locality < 0) ||
1566 (locality >= 0 && best_locality >= 0 &&
1567 locality < best_locality)) {
1568 best_i = i;
1569 best_locality = locality;
1570 }
1571 } while (++i < acting->size);
1572
1573 dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1574 acting->osds[best_i], best_locality, acting->primary);
1575 return best_i;
1576 }
1577
1578 enum calc_target_result {
1579 CALC_TARGET_NO_ACTION = 0,
1580 CALC_TARGET_NEED_RESEND,
1581 CALC_TARGET_POOL_DNE,
1582 };
1583
1584 static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1585 struct ceph_osd_request_target *t,
1586 bool any_change)
1587 {
1588 struct ceph_pg_pool_info *pi;
1589 struct ceph_pg pgid, last_pgid;
1590 struct ceph_osds up, acting;
1591 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
1592 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
1593 bool force_resend = false;
1594 bool unpaused = false;
1595 bool legacy_change = false;
1596 bool split = false;
1597 bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1598 bool recovery_deletes = ceph_osdmap_flag(osdc,
1599 CEPH_OSDMAP_RECOVERY_DELETES);
1600 enum calc_target_result ct_res;
1601
1602 t->epoch = osdc->osdmap->epoch;
1603 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1604 if (!pi) {
1605 t->osd = CEPH_HOMELESS_OSD;
1606 ct_res = CALC_TARGET_POOL_DNE;
1607 goto out;
1608 }
1609
1610 if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1611 if (t->last_force_resend < pi->last_force_request_resend) {
1612 t->last_force_resend = pi->last_force_request_resend;
1613 force_resend = true;
1614 } else if (t->last_force_resend == 0) {
1615 force_resend = true;
1616 }
1617 }
1618
1619 /* apply tiering */
1620 ceph_oid_copy(&t->target_oid, &t->base_oid);
1621 ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1622 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1623 if (is_read && pi->read_tier >= 0)
1624 t->target_oloc.pool = pi->read_tier;
1625 if (is_write && pi->write_tier >= 0)
1626 t->target_oloc.pool = pi->write_tier;
1627
1628 pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1629 if (!pi) {
1630 t->osd = CEPH_HOMELESS_OSD;
1631 ct_res = CALC_TARGET_POOL_DNE;
1632 goto out;
1633 }
1634 }
1635
1636 __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
1637 last_pgid.pool = pgid.pool;
1638 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1639
1640 ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1641 if (any_change &&
1642 ceph_is_new_interval(&t->acting,
1643 &acting,
1644 &t->up,
1645 &up,
1646 t->size,
1647 pi->size,
1648 t->min_size,
1649 pi->min_size,
1650 t->pg_num,
1651 pi->pg_num,
1652 t->sort_bitwise,
1653 sort_bitwise,
1654 t->recovery_deletes,
1655 recovery_deletes,
1656 &last_pgid))
1657 force_resend = true;
1658
1659 if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1660 t->paused = false;
1661 unpaused = true;
1662 }
1663 legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1664 ceph_osds_changed(&t->acting, &acting,
1665 t->used_replica || any_change);
1666 if (t->pg_num)
1667 split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1668
1669 if (legacy_change || force_resend || split) {
1670 t->pgid = pgid; /* struct */
1671 ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1672 ceph_osds_copy(&t->acting, &acting);
1673 ceph_osds_copy(&t->up, &up);
1674 t->size = pi->size;
1675 t->min_size = pi->min_size;
1676 t->pg_num = pi->pg_num;
1677 t->pg_num_mask = pi->pg_num_mask;
1678 t->sort_bitwise = sort_bitwise;
1679 t->recovery_deletes = recovery_deletes;
1680
1681 if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
1682 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
1683 !is_write && pi->type == CEPH_POOL_TYPE_REP &&
1684 acting.size > 1) {
1685 int pos;
1686
1687 WARN_ON(!is_read || acting.osds[0] != acting.primary);
1688 if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
1689 pos = pick_random_replica(&acting);
1690 } else {
1691 pos = pick_closest_replica(osdc, &acting);
1692 }
1693 t->osd = acting.osds[pos];
1694 t->used_replica = pos > 0;
1695 } else {
1696 t->osd = acting.primary;
1697 t->used_replica = false;
1698 }
1699 }
1700
1701 if (unpaused || legacy_change || force_resend || split)
1702 ct_res = CALC_TARGET_NEED_RESEND;
1703 else
1704 ct_res = CALC_TARGET_NO_ACTION;
1705
1706 out:
1707 dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
1708 legacy_change, force_resend, split, ct_res, t->osd);
1709 return ct_res;
1710 }
1711
1712 static struct ceph_spg_mapping *alloc_spg_mapping(void)
1713 {
1714 struct ceph_spg_mapping *spg;
1715
1716 spg = kmalloc(sizeof(*spg), GFP_NOIO);
1717 if (!spg)
1718 return NULL;
1719
1720 RB_CLEAR_NODE(&spg->node);
1721 spg->backoffs = RB_ROOT;
1722 return spg;
1723 }
1724
1725 static void free_spg_mapping(struct ceph_spg_mapping *spg)
1726 {
1727 WARN_ON(!RB_EMPTY_NODE(&spg->node));
1728 WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1729
1730 kfree(spg);
1731 }
1732
1733 /*
1734 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1735 * ceph_pg_mapping. Used to track OSD backoffs -- a backoff [range] is
1736 * defined only within a specific spgid; it does not pass anything to
1737 * children on split, or to another primary.
1738 */
1739 DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1740 RB_BYPTR, const struct ceph_spg *, node)
1741
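/*
 * MAX hobjects sort after everything else, so give them a key just
 * beyond the 32-bit reversed-hash range.
 */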
1742 static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1743 {
1744 return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1745 }
1746
1747 static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1748 void **pkey, size_t *pkey_len)
1749 {
1750 if (hoid->key_len) {
1751 *pkey = hoid->key;
1752 *pkey_len = hoid->key_len;
1753 } else {
1754 *pkey = hoid->oid;
1755 *pkey_len = hoid->oid_len;
1756 }
1757 }
1758
1759 static int compare_names(const void *name1, size_t name1_len,
1760 const void *name2, size_t name2_len)
1761 {
1762 int ret;
1763
1764 ret = memcmp(name1, name2, min(name1_len, name2_len));
1765 if (!ret) {
1766 if (name1_len < name2_len)
1767 ret = -1;
1768 else if (name1_len > name2_len)
1769 ret = 1;
1770 }
1771 return ret;
1772 }
1773
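/*
 * Bitwise comparison of hobjects: is_max, pool, reversed hash,
 * namespace, effective key (key if present, otherwise oid), oid and
 * finally snapid.
 */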
1774 static int hoid_compare(const struct ceph_hobject_id *lhs,
1775 const struct ceph_hobject_id *rhs)
1776 {
1777 void *effective_key1, *effective_key2;
1778 size_t effective_key1_len, effective_key2_len;
1779 int ret;
1780
1781 if (lhs->is_max < rhs->is_max)
1782 return -1;
1783 if (lhs->is_max > rhs->is_max)
1784 return 1;
1785
1786 if (lhs->pool < rhs->pool)
1787 return -1;
1788 if (lhs->pool > rhs->pool)
1789 return 1;
1790
1791 if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1792 return -1;
1793 if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1794 return 1;
1795
1796 ret = compare_names(lhs->nspace, lhs->nspace_len,
1797 rhs->nspace, rhs->nspace_len);
1798 if (ret)
1799 return ret;
1800
1801 hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1802 hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1803 ret = compare_names(effective_key1, effective_key1_len,
1804 effective_key2, effective_key2_len);
1805 if (ret)
1806 return ret;
1807
1808 ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1809 if (ret)
1810 return ret;
1811
1812 if (lhs->snapid < rhs->snapid)
1813 return -1;
1814 if (lhs->snapid > rhs->snapid)
1815 return 1;
1816
1817 return 0;
1818 }
1819
1820 /*
1821 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1822 * compat stuff here.
1823 *
1824 * Assumes @hoid is zero-initialized.
1825 */
1826 static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1827 {
1828 u8 struct_v;
1829 u32 struct_len;
1830 int ret;
1831
1832 ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1833 &struct_len);
1834 if (ret)
1835 return ret;
1836
1837 if (struct_v < 4) {
1838 pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1839 goto e_inval;
1840 }
1841
1842 hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1843 GFP_NOIO);
1844 if (IS_ERR(hoid->key)) {
1845 ret = PTR_ERR(hoid->key);
1846 hoid->key = NULL;
1847 return ret;
1848 }
1849
1850 hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1851 GFP_NOIO);
1852 if (IS_ERR(hoid->oid)) {
1853 ret = PTR_ERR(hoid->oid);
1854 hoid->oid = NULL;
1855 return ret;
1856 }
1857
1858 ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1859 ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1860 ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1861
1862 hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1863 GFP_NOIO);
1864 if (IS_ERR(hoid->nspace)) {
1865 ret = PTR_ERR(hoid->nspace);
1866 hoid->nspace = NULL;
1867 return ret;
1868 }
1869
1870 ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1871
1872 ceph_hoid_build_hash_cache(hoid);
1873 return 0;
1874
1875 e_inval:
1876 return -EINVAL;
1877 }
1878
1879 static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1880 {
1881 return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1882 4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1883 }
1884
1885 static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1886 {
1887 ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1888 ceph_encode_string(p, end, hoid->key, hoid->key_len);
1889 ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1890 ceph_encode_64(p, hoid->snapid);
1891 ceph_encode_32(p, hoid->hash);
1892 ceph_encode_8(p, hoid->is_max);
1893 ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1894 ceph_encode_64(p, hoid->pool);
1895 }
1896
1897 static void free_hoid(struct ceph_hobject_id *hoid)
1898 {
1899 if (hoid) {
1900 kfree(hoid->key);
1901 kfree(hoid->oid);
1902 kfree(hoid->nspace);
1903 kfree(hoid);
1904 }
1905 }
1906
1907 static struct ceph_osd_backoff *alloc_backoff(void)
1908 {
1909 struct ceph_osd_backoff *backoff;
1910
1911 backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1912 if (!backoff)
1913 return NULL;
1914
1915 RB_CLEAR_NODE(&backoff->spg_node);
1916 RB_CLEAR_NODE(&backoff->id_node);
1917 return backoff;
1918 }
1919
1920 static void free_backoff(struct ceph_osd_backoff *backoff)
1921 {
1922 WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1923 WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1924
1925 free_hoid(backoff->begin);
1926 free_hoid(backoff->end);
1927 kfree(backoff);
1928 }
1929
1930 /*
1931 * Within a specific spgid, backoffs are managed by ->begin hoid.
1932 */
1933 DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
1934 RB_BYVAL, spg_node);
1935
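/*
 * Find a backoff whose [begin, end) range contains @hoid, if any.
 */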
1936 static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1937 const struct ceph_hobject_id *hoid)
1938 {
1939 struct rb_node *n = root->rb_node;
1940
1941 while (n) {
1942 struct ceph_osd_backoff *cur =
1943 rb_entry(n, struct ceph_osd_backoff, spg_node);
1944 int cmp;
1945
1946 cmp = hoid_compare(hoid, cur->begin);
1947 if (cmp < 0) {
1948 n = n->rb_left;
1949 } else if (cmp > 0) {
1950 if (hoid_compare(hoid, cur->end) < 0)
1951 return cur;
1952
1953 n = n->rb_right;
1954 } else {
1955 return cur;
1956 }
1957 }
1958
1959 return NULL;
1960 }
1961
1962 /*
1963 * Each backoff has a unique id within its OSD session.
1964 */
1965 DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1966
1967 static void clear_backoffs(struct ceph_osd *osd)
1968 {
1969 while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1970 struct ceph_spg_mapping *spg =
1971 rb_entry(rb_first(&osd->o_backoff_mappings),
1972 struct ceph_spg_mapping, node);
1973
1974 while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1975 struct ceph_osd_backoff *backoff =
1976 rb_entry(rb_first(&spg->backoffs),
1977 struct ceph_osd_backoff, spg_node);
1978
1979 erase_backoff(&spg->backoffs, backoff);
1980 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1981 free_backoff(backoff);
1982 }
1983 erase_spg_mapping(&osd->o_backoff_mappings, spg);
1984 free_spg_mapping(spg);
1985 }
1986 }
1987
1988 /*
1989 * Set up a temporary, non-owning view into @t.
1990 */
1991 static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1992 const struct ceph_osd_request_target *t)
1993 {
1994 hoid->key = NULL;
1995 hoid->key_len = 0;
1996 hoid->oid = t->target_oid.name;
1997 hoid->oid_len = t->target_oid.name_len;
1998 hoid->snapid = CEPH_NOSNAP;
1999 hoid->hash = t->pgid.seed;
2000 hoid->is_max = false;
2001 if (t->target_oloc.pool_ns) {
2002 hoid->nspace = t->target_oloc.pool_ns->str;
2003 hoid->nspace_len = t->target_oloc.pool_ns->len;
2004 } else {
2005 hoid->nspace = NULL;
2006 hoid->nspace_len = 0;
2007 }
2008 hoid->pool = t->target_oloc.pool;
2009 ceph_hoid_build_hash_cache(hoid);
2010 }
2011
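/*
 * Check whether @req falls within an OSD backoff installed on its
 * session -- if so, it must not be sent until the backoff is released.
 */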
2012 static bool should_plug_request(struct ceph_osd_request *req)
2013 {
2014 struct ceph_osd *osd = req->r_osd;
2015 struct ceph_spg_mapping *spg;
2016 struct ceph_osd_backoff *backoff;
2017 struct ceph_hobject_id hoid;
2018
2019 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
2020 if (!spg)
2021 return false;
2022
2023 hoid_fill_from_target(&hoid, &req->r_t);
2024 backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
2025 if (!backoff)
2026 return false;
2027
2028 dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
2029 __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
2030 backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
2031 return true;
2032 }
2033
2034 /*
2035 * Keep get_num_data_items() in sync with this function.
2036 */
2037 static void setup_request_data(struct ceph_osd_request *req)
2038 {
2039 struct ceph_msg *request_msg = req->r_request;
2040 struct ceph_msg *reply_msg = req->r_reply;
2041 struct ceph_osd_req_op *op;
2042
2043 if (req->r_request->num_data_items || req->r_reply->num_data_items)
2044 return;
2045
2046 WARN_ON(request_msg->data_length || reply_msg->data_length);
2047 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
2048 switch (op->op) {
2049 /* request */
2050 case CEPH_OSD_OP_WRITE:
2051 case CEPH_OSD_OP_WRITEFULL:
2052 WARN_ON(op->indata_len != op->extent.length);
2053 ceph_osdc_msg_data_add(request_msg,
2054 &op->extent.osd_data);
2055 break;
2056 case CEPH_OSD_OP_SETXATTR:
2057 case CEPH_OSD_OP_CMPXATTR:
2058 WARN_ON(op->indata_len != op->xattr.name_len +
2059 op->xattr.value_len);
2060 ceph_osdc_msg_data_add(request_msg,
2061 &op->xattr.osd_data);
2062 break;
2063 case CEPH_OSD_OP_NOTIFY_ACK:
2064 ceph_osdc_msg_data_add(request_msg,
2065 &op->notify_ack.request_data);
2066 break;
2067 case CEPH_OSD_OP_COPY_FROM2:
2068 ceph_osdc_msg_data_add(request_msg,
2069 &op->copy_from.osd_data);
2070 break;
2071
2072 /* reply */
2073 case CEPH_OSD_OP_STAT:
2074 ceph_osdc_msg_data_add(reply_msg,
2075 &op->raw_data_in);
2076 break;
2077 case CEPH_OSD_OP_READ:
2078 case CEPH_OSD_OP_SPARSE_READ:
2079 ceph_osdc_msg_data_add(reply_msg,
2080 &op->extent.osd_data);
2081 break;
2082 case CEPH_OSD_OP_LIST_WATCHERS:
2083 ceph_osdc_msg_data_add(reply_msg,
2084 &op->list_watchers.response_data);
2085 break;
2086
2087 /* both */
2088 case CEPH_OSD_OP_CALL:
2089 WARN_ON(op->indata_len != op->cls.class_len +
2090 op->cls.method_len +
2091 op->cls.indata_len);
2092 ceph_osdc_msg_data_add(request_msg,
2093 &op->cls.request_info);
2094 /* optional, can be NONE */
2095 ceph_osdc_msg_data_add(request_msg,
2096 &op->cls.request_data);
2097 /* optional, can be NONE */
2098 ceph_osdc_msg_data_add(reply_msg,
2099 &op->cls.response_data);
2100 break;
2101 case CEPH_OSD_OP_NOTIFY:
2102 ceph_osdc_msg_data_add(request_msg,
2103 &op->notify.request_data);
2104 ceph_osdc_msg_data_add(reply_msg,
2105 &op->notify.response_data);
2106 break;
2107 }
2108 }
2109 }
2110
2111 static void encode_pgid(void **p, const struct ceph_pg *pgid)
2112 {
2113 ceph_encode_8(p, 1);
2114 ceph_encode_64(p, pgid->pool);
2115 ceph_encode_32(p, pgid->seed);
2116 ceph_encode_32(p, -1); /* preferred */
2117 }
2118
2119 static void encode_spgid(void **p, const struct ceph_spg *spgid)
2120 {
2121 ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
2122 encode_pgid(p, &spgid->pgid);
2123 ceph_encode_8(p, spgid->shard);
2124 }
2125
2126 static void encode_oloc(void **p, void *end,
2127 const struct ceph_object_locator *oloc)
2128 {
2129 ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
2130 ceph_encode_64(p, oloc->pool);
2131 ceph_encode_32(p, -1); /* preferred */
2132 ceph_encode_32(p, 0); /* key len */
2133 if (oloc->pool_ns)
2134 ceph_encode_string(p, end, oloc->pool_ns->str,
2135 oloc->pool_ns->len);
2136 else
2137 ceph_encode_32(p, 0);
2138 }
2139
2140 static void encode_request_partial(struct ceph_osd_request *req,
2141 struct ceph_msg *msg)
2142 {
2143 void *p = msg->front.iov_base;
2144 void *const end = p + msg->front_alloc_len;
2145 u32 data_len = 0;
2146 int i;
2147
2148 if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
2149 /* snapshots aren't writeable */
2150 WARN_ON(req->r_snapid != CEPH_NOSNAP);
2151 } else {
2152 WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
2153 req->r_data_offset || req->r_snapc);
2154 }
2155
2156 setup_request_data(req);
2157
2158 encode_spgid(&p, &req->r_t.spgid); /* actual spg */
2159 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
2160 ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
2161 ceph_encode_32(&p, req->r_flags);
2162
2163 /* reqid */
2164 ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
2165 memset(p, 0, sizeof(struct ceph_osd_reqid));
2166 p += sizeof(struct ceph_osd_reqid);
2167
2168 /* trace */
2169 memset(p, 0, sizeof(struct ceph_blkin_trace_info));
2170 p += sizeof(struct ceph_blkin_trace_info);
2171
2172 ceph_encode_32(&p, 0); /* client_inc, always 0 */
2173 ceph_encode_timespec64(p, &req->r_mtime);
2174 p += sizeof(struct ceph_timespec);
2175
2176 encode_oloc(&p, end, &req->r_t.target_oloc);
2177 ceph_encode_string(&p, end, req->r_t.target_oid.name,
2178 req->r_t.target_oid.name_len);
2179
2180 /* ops, can imply data */
2181 ceph_encode_16(&p, req->r_num_ops);
2182 for (i = 0; i < req->r_num_ops; i++) {
2183 data_len += osd_req_encode_op(p, &req->r_ops[i]);
2184 p += sizeof(struct ceph_osd_op);
2185 }
2186
2187 ceph_encode_64(&p, req->r_snapid); /* snapid */
2188 if (req->r_snapc) {
2189 ceph_encode_64(&p, req->r_snapc->seq);
2190 ceph_encode_32(&p, req->r_snapc->num_snaps);
2191 for (i = 0; i < req->r_snapc->num_snaps; i++)
2192 ceph_encode_64(&p, req->r_snapc->snaps[i]);
2193 } else {
2194 ceph_encode_64(&p, 0); /* snap_seq */
2195 ceph_encode_32(&p, 0); /* snaps len */
2196 }
2197
2198 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
2199 BUG_ON(p > end - 8); /* space for features */
2200
2201 msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
2202 /* front_len is finalized in encode_request_finish() */
2203 msg->front.iov_len = p - msg->front.iov_base;
2204 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2205 msg->hdr.data_len = cpu_to_le32(data_len);
2206 /*
2207 * The header "data_off" is a hint to the receiver allowing it
2208 * to align received data into its buffers such that there's no
2209 * need to re-copy it before writing it to disk (direct I/O).
2210 */
2211 msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
2212
2213 dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
2214 req->r_t.target_oid.name, req->r_t.target_oid.name_len);
2215 }
2216
2217 static void encode_request_finish(struct ceph_msg *msg)
2218 {
2219 void *p = msg->front.iov_base;
2220 void *const partial_end = p + msg->front.iov_len;
2221 void *const end = p + msg->front_alloc_len;
2222
2223 if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
2224 /* luminous OSD -- encode features and be done */
2225 p = partial_end;
2226 ceph_encode_64(&p, msg->con->peer_features);
2227 } else {
2228 struct {
2229 char spgid[CEPH_ENCODING_START_BLK_LEN +
2230 CEPH_PGID_ENCODING_LEN + 1];
2231 __le32 hash;
2232 __le32 epoch;
2233 __le32 flags;
2234 char reqid[CEPH_ENCODING_START_BLK_LEN +
2235 sizeof(struct ceph_osd_reqid)];
2236 char trace[sizeof(struct ceph_blkin_trace_info)];
2237 __le32 client_inc;
2238 struct ceph_timespec mtime;
2239 } __packed head;
2240 struct ceph_pg pgid;
2241 void *oloc, *oid, *tail;
2242 int oloc_len, oid_len, tail_len;
2243 int len;
2244
2245 /*
2246 * Pre-luminous OSD -- reencode v8 into v4 using @head
2247 * as a temporary buffer. Encode the raw PG; the rest
2248 * is just a matter of moving oloc, oid and tail blobs
2249 * around.
2250 */
2251 memcpy(&head, p, sizeof(head));
2252 p += sizeof(head);
2253
2254 oloc = p;
2255 p += CEPH_ENCODING_START_BLK_LEN;
2256 pgid.pool = ceph_decode_64(&p);
2257 p += 4 + 4; /* preferred, key len */
2258 len = ceph_decode_32(&p);
2259 p += len; /* nspace */
2260 oloc_len = p - oloc;
2261
2262 oid = p;
2263 len = ceph_decode_32(&p);
2264 p += len;
2265 oid_len = p - oid;
2266
2267 tail = p;
2268 tail_len = partial_end - p;
2269
2270 p = msg->front.iov_base;
2271 ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
2272 ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
2273 ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
2274 ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
2275
2276 /* reassert_version */
2277 memset(p, 0, sizeof(struct ceph_eversion));
2278 p += sizeof(struct ceph_eversion);
2279
2280 BUG_ON(p >= oloc);
2281 memmove(p, oloc, oloc_len);
2282 p += oloc_len;
2283
2284 pgid.seed = le32_to_cpu(head.hash);
2285 encode_pgid(&p, &pgid); /* raw pg */
2286
2287 BUG_ON(p >= oid);
2288 memmove(p, oid, oid_len);
2289 p += oid_len;
2290
2291 /* tail -- ops, snapid, snapc, retry_attempt */
2292 BUG_ON(p >= tail);
2293 memmove(p, tail, tail_len);
2294 p += tail_len;
2295
2296 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2297 }
2298
2299 BUG_ON(p > end);
2300 msg->front.iov_len = p - msg->front.iov_base;
2301 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2302
2303 dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2304 le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2305 le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2306 le16_to_cpu(msg->hdr.version));
2307 }
2308
2309 /*
2310 * @req has to be assigned a tid and registered.
2311 */
2312 static void send_request(struct ceph_osd_request *req)
2313 {
2314 struct ceph_osd *osd = req->r_osd;
2315
2316 verify_osd_locked(osd);
2317 WARN_ON(osd->o_osd != req->r_t.osd);
2318
2319 /* backoff? */
2320 if (should_plug_request(req))
2321 return;
2322
2323 /*
2324 * We may have a previously queued request message hanging
2325 * around. Cancel it to avoid corrupting the msgr.
2326 */
2327 if (req->r_sent)
2328 ceph_msg_revoke(req->r_request);
2329
2330 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2331 if (req->r_attempts)
2332 req->r_flags |= CEPH_OSD_FLAG_RETRY;
2333 else
2334 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2335
2336 encode_request_partial(req, req->r_request);
2337
2338 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2339 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2340 req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2341 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2342 req->r_attempts);
2343
2344 req->r_t.paused = false;
2345 req->r_stamp = jiffies;
2346 req->r_attempts++;
2347
2348 req->r_sent = osd->o_incarnation;
2349 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2350 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2351 }
2352
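/*
 * Request a newer osdmap from the monitors.  Subscribe continuously
 * while the cluster is full or paused so we learn promptly when those
 * flags clear; otherwise a one-shot subscription is enough.
 */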
2353 static void maybe_request_map(struct ceph_osd_client *osdc)
2354 {
2355 bool continuous = false;
2356
2357 verify_osdc_locked(osdc);
2358 WARN_ON(!osdc->osdmap->epoch);
2359
2360 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2361 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2362 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2363 dout("%s osdc %p continuous\n", __func__, osdc);
2364 continuous = true;
2365 } else {
2366 dout("%s osdc %p onetime\n", __func__, osdc);
2367 }
2368
2369 if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2370 osdc->osdmap->epoch + 1, continuous))
2371 ceph_monc_renew_subs(&osdc->client->monc);
2372 }
2373
2374 static void complete_request(struct ceph_osd_request *req, int err);
2375 static void send_map_check(struct ceph_osd_request *req);
2376
2377 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2378 {
2379 struct ceph_osd_client *osdc = req->r_osdc;
2380 struct ceph_osd *osd;
2381 enum calc_target_result ct_res;
2382 int err = 0;
2383 bool need_send = false;
2384 bool promoted = false;
2385
2386 WARN_ON(req->r_tid);
2387 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2388
2389 again:
2390 ct_res = calc_target(osdc, &req->r_t, false);
2391 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2392 goto promote;
2393
2394 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2395 if (IS_ERR(osd)) {
2396 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2397 goto promote;
2398 }
2399
2400 if (osdc->abort_err) {
2401 dout("req %p abort_err %d\n", req, osdc->abort_err);
2402 err = osdc->abort_err;
2403 } else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2404 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2405 osdc->epoch_barrier);
2406 req->r_t.paused = true;
2407 maybe_request_map(osdc);
2408 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2409 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2410 dout("req %p pausewr\n", req);
2411 req->r_t.paused = true;
2412 maybe_request_map(osdc);
2413 } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2414 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2415 dout("req %p pauserd\n", req);
2416 req->r_t.paused = true;
2417 maybe_request_map(osdc);
2418 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2419 !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2420 CEPH_OSD_FLAG_FULL_FORCE)) &&
2421 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2422 pool_full(osdc, req->r_t.base_oloc.pool))) {
2423 dout("req %p full/pool_full\n", req);
2424 if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
2425 err = -ENOSPC;
2426 } else {
2427 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL))
2428 pr_warn_ratelimited("cluster is full (osdmap FULL)\n");
2429 else
2430 pr_warn_ratelimited("pool %lld is full or reached quota\n",
2431 req->r_t.base_oloc.pool);
2432 req->r_t.paused = true;
2433 maybe_request_map(osdc);
2434 }
2435 } else if (!osd_homeless(osd)) {
2436 need_send = true;
2437 } else {
2438 maybe_request_map(osdc);
2439 }
2440
2441 mutex_lock(&osd->lock);
2442 /*
2443 * Assign the tid atomically with send_request() to protect
2444 * multiple writes to the same object from racing with each
2445 * other, resulting in out of order ops on the OSDs.
2446 */
2447 req->r_tid = atomic64_inc_return(&osdc->last_tid);
2448 link_request(osd, req);
2449 if (need_send)
2450 send_request(req);
2451 else if (err)
2452 complete_request(req, err);
2453 mutex_unlock(&osd->lock);
2454
2455 if (!err && ct_res == CALC_TARGET_POOL_DNE)
2456 send_map_check(req);
2457
2458 if (promoted)
2459 downgrade_write(&osdc->lock);
2460 return;
2461
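	/*
	 * Promote to the write lock: creating an OSD session or sending
	 * a pool-existence map check requires it.  The osdmap may have
	 * changed while the lock was dropped, so recompute the target
	 * and retry from the top.
	 */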
2462 promote:
2463 up_read(&osdc->lock);
2464 down_write(&osdc->lock);
2465 wrlocked = true;
2466 promoted = true;
2467 goto again;
2468 }
2469
2470 static void account_request(struct ceph_osd_request *req)
2471 {
2472 WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2473 WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2474
2475 req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2476 atomic_inc(&req->r_osdc->num_requests);
2477
2478 req->r_start_stamp = jiffies;
2479 req->r_start_latency = ktime_get();
2480 }
2481
2482 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2483 {
2484 ceph_osdc_get_request(req);
2485 account_request(req);
2486 __submit_request(req, wrlocked);
2487 }
2488
2489 static void finish_request(struct ceph_osd_request *req)
2490 {
2491 struct ceph_osd_client *osdc = req->r_osdc;
2492
2493 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2494 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2495
2496 req->r_end_latency = ktime_get();
2497
2498 if (req->r_osd) {
2499 ceph_init_sparse_read(&req->r_osd->o_sparse_read);
2500 unlink_request(req->r_osd, req);
2501 }
2502 atomic_dec(&osdc->num_requests);
2503
2504 /*
2505 * If an OSD has failed or returned and a request has been sent
2506 * twice, it's possible to get a reply and end up here while the
2507 * request message is queued for delivery. We will ignore the
2508 * reply, so not a big deal, but better to try and catch it.
2509 */
2510 ceph_msg_revoke(req->r_request);
2511 ceph_msg_revoke_incoming(req->r_reply);
2512 }
2513
2514 static void __complete_request(struct ceph_osd_request *req)
2515 {
2516 dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
2517 req->r_tid, req->r_callback, req->r_result);
2518
2519 if (req->r_callback)
2520 req->r_callback(req);
2521 complete_all(&req->r_completion);
2522 ceph_osdc_put_request(req);
2523 }
2524
2525 static void complete_request_workfn(struct work_struct *work)
2526 {
2527 struct ceph_osd_request *req =
2528 container_of(work, struct ceph_osd_request, r_complete_work);
2529
2530 __complete_request(req);
2531 }
2532
2533 /*
2534 * This is open-coded in handle_reply().
2535 */
2536 static void complete_request(struct ceph_osd_request *req, int err)
2537 {
2538 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2539
2540 req->r_result = err;
2541 finish_request(req);
2542
2543 INIT_WORK(&req->r_complete_work, complete_request_workfn);
2544 queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
2545 }
2546
2547 static void cancel_map_check(struct ceph_osd_request *req)
2548 {
2549 struct ceph_osd_client *osdc = req->r_osdc;
2550 struct ceph_osd_request *lookup_req;
2551
2552 verify_osdc_wrlocked(osdc);
2553
2554 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2555 if (!lookup_req)
2556 return;
2557
2558 WARN_ON(lookup_req != req);
2559 erase_request_mc(&osdc->map_checks, req);
2560 ceph_osdc_put_request(req);
2561 }
2562
2563 static void cancel_request(struct ceph_osd_request *req)
2564 {
2565 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2566
2567 cancel_map_check(req);
2568 finish_request(req);
2569 complete_all(&req->r_completion);
2570 ceph_osdc_put_request(req);
2571 }
2572
2573 static void abort_request(struct ceph_osd_request *req, int err)
2574 {
2575 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2576
2577 cancel_map_check(req);
2578 complete_request(req, err);
2579 }
2580
2581 static int abort_fn(struct ceph_osd_request *req, void *arg)
2582 {
2583 int err = *(int *)arg;
2584
2585 abort_request(req, err);
2586 return 0; /* continue iteration */
2587 }
2588
2589 /*
2590 * Abort all in-flight requests with @err and arrange for all future
2591 * requests to be failed immediately.
2592 */
2593 void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
2594 {
2595 dout("%s osdc %p err %d\n", __func__, osdc, err);
2596 down_write(&osdc->lock);
2597 for_each_request(osdc, abort_fn, &err);
2598 osdc->abort_err = err;
2599 up_write(&osdc->lock);
2600 }
2601 EXPORT_SYMBOL(ceph_osdc_abort_requests);
2602
2603 void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
2604 {
2605 down_write(&osdc->lock);
2606 osdc->abort_err = 0;
2607 up_write(&osdc->lock);
2608 }
2609 EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
2610
2611 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2612 {
2613 if (likely(eb > osdc->epoch_barrier)) {
2614 dout("updating epoch_barrier from %u to %u\n",
2615 osdc->epoch_barrier, eb);
2616 osdc->epoch_barrier = eb;
2617 /* Request map if we're not to the barrier yet */
2618 if (eb > osdc->osdmap->epoch)
2619 maybe_request_map(osdc);
2620 }
2621 }
2622
2623 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2624 {
2625 down_read(&osdc->lock);
2626 if (unlikely(eb > osdc->epoch_barrier)) {
2627 up_read(&osdc->lock);
2628 down_write(&osdc->lock);
2629 update_epoch_barrier(osdc, eb);
2630 up_write(&osdc->lock);
2631 } else {
2632 up_read(&osdc->lock);
2633 }
2634 }
2635 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2636
2637 /*
2638 * We can end up releasing caps as a result of abort_request().
2639 * In that case, we probably want to ensure that the cap release message
2640 * has an updated epoch barrier in it, so set the epoch barrier prior to
2641 * aborting the first request.
2642 */
2643 static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
2644 {
2645 struct ceph_osd_client *osdc = req->r_osdc;
2646 bool *victims = arg;
2647
2648 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2649 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2650 pool_full(osdc, req->r_t.base_oloc.pool))) {
2651 if (!*victims) {
2652 update_epoch_barrier(osdc, osdc->osdmap->epoch);
2653 *victims = true;
2654 }
2655 abort_request(req, -ENOSPC);
2656 }
2657
2658 return 0; /* continue iteration */
2659 }
2660
2661 /*
2662 * Drop all pending requests that are stalled waiting on a full condition to
2663 * clear, and complete them with ENOSPC as the return code. Set the
2664 * osdc->epoch_barrier to the latest map epoch that we've seen if any were
2665 * cancelled.
2666 */
2667 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2668 {
2669 bool victims = false;
2670
2671 if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
2672 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
2673 for_each_request(osdc, abort_on_full_fn, &victims);
2674 }
2675
2676 static void check_pool_dne(struct ceph_osd_request *req)
2677 {
2678 struct ceph_osd_client *osdc = req->r_osdc;
2679 struct ceph_osdmap *map = osdc->osdmap;
2680
2681 verify_osdc_wrlocked(osdc);
2682 WARN_ON(!map->epoch);
2683
2684 if (req->r_attempts) {
2685 /*
2686 * We sent a request earlier, which means that
2687 * previously the pool existed, and now it does not
2688 * (i.e., it was deleted).
2689 */
2690 req->r_map_dne_bound = map->epoch;
2691 dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2692 req->r_tid);
2693 } else {
2694 dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2695 req, req->r_tid, req->r_map_dne_bound, map->epoch);
2696 }
2697
2698 if (req->r_map_dne_bound) {
2699 if (map->epoch >= req->r_map_dne_bound) {
2700 /* we had a new enough map */
2701 pr_info_ratelimited("tid %llu pool does not exist\n",
2702 req->r_tid);
2703 complete_request(req, -ENOENT);
2704 }
2705 } else {
2706 send_map_check(req);
2707 }
2708 }
2709
2710 static void map_check_cb(struct ceph_mon_generic_request *greq)
2711 {
2712 struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2713 struct ceph_osd_request *req;
2714 u64 tid = greq->private_data;
2715
2716 WARN_ON(greq->result || !greq->u.newest);
2717
2718 down_write(&osdc->lock);
2719 req = lookup_request_mc(&osdc->map_checks, tid);
2720 if (!req) {
2721 dout("%s tid %llu dne\n", __func__, tid);
2722 goto out_unlock;
2723 }
2724
2725 dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2726 req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2727 if (!req->r_map_dne_bound)
2728 req->r_map_dne_bound = greq->u.newest;
2729 erase_request_mc(&osdc->map_checks, req);
2730 check_pool_dne(req);
2731
2732 ceph_osdc_put_request(req);
2733 out_unlock:
2734 up_write(&osdc->lock);
2735 }
2736
2737 static void send_map_check(struct ceph_osd_request *req)
2738 {
2739 struct ceph_osd_client *osdc = req->r_osdc;
2740 struct ceph_osd_request *lookup_req;
2741 int ret;
2742
2743 verify_osdc_wrlocked(osdc);
2744
2745 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2746 if (lookup_req) {
2747 WARN_ON(lookup_req != req);
2748 return;
2749 }
2750
2751 ceph_osdc_get_request(req);
2752 insert_request_mc(&osdc->map_checks, req);
2753 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2754 map_check_cb, req->r_tid);
2755 WARN_ON(ret);
2756 }
2757
2758 /*
2759 * lingering requests, watch/notify v2 infrastructure
2760 */
2761 static void linger_release(struct kref *kref)
2762 {
2763 struct ceph_osd_linger_request *lreq =
2764 container_of(kref, struct ceph_osd_linger_request, kref);
2765
2766 dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2767 lreq->reg_req, lreq->ping_req);
2768 WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2769 WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2770 WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2771 WARN_ON(!list_empty(&lreq->scan_item));
2772 WARN_ON(!list_empty(&lreq->pending_lworks));
2773 WARN_ON(lreq->osd);
2774
2775 if (lreq->request_pl)
2776 ceph_pagelist_release(lreq->request_pl);
2777 if (lreq->notify_id_pages)
2778 ceph_release_page_vector(lreq->notify_id_pages, 1);
2779
2780 ceph_osdc_put_request(lreq->reg_req);
2781 ceph_osdc_put_request(lreq->ping_req);
2782 target_destroy(&lreq->t);
2783 kfree(lreq);
2784 }
2785
2786 static void linger_put(struct ceph_osd_linger_request *lreq)
2787 {
2788 if (lreq)
2789 kref_put(&lreq->kref, linger_release);
2790 }
2791
2792 static struct ceph_osd_linger_request *
2793 linger_get(struct ceph_osd_linger_request *lreq)
2794 {
2795 kref_get(&lreq->kref);
2796 return lreq;
2797 }
2798
2799 static struct ceph_osd_linger_request *
2800 linger_alloc(struct ceph_osd_client *osdc)
2801 {
2802 struct ceph_osd_linger_request *lreq;
2803
2804 lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2805 if (!lreq)
2806 return NULL;
2807
2808 kref_init(&lreq->kref);
2809 mutex_init(&lreq->lock);
2810 RB_CLEAR_NODE(&lreq->node);
2811 RB_CLEAR_NODE(&lreq->osdc_node);
2812 RB_CLEAR_NODE(&lreq->mc_node);
2813 INIT_LIST_HEAD(&lreq->scan_item);
2814 INIT_LIST_HEAD(&lreq->pending_lworks);
2815 init_completion(&lreq->reg_commit_wait);
2816 init_completion(&lreq->notify_finish_wait);
2817
2818 lreq->osdc = osdc;
2819 target_init(&lreq->t);
2820
2821 dout("%s lreq %p\n", __func__, lreq);
2822 return lreq;
2823 }
2824
2825 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2826 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2827 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2828
2829 /*
2830 * Create linger request <-> OSD session relation.
2831 *
2832 * @lreq has to be registered, @osd may be homeless.
2833 */
2834 static void link_linger(struct ceph_osd *osd,
2835 struct ceph_osd_linger_request *lreq)
2836 {
2837 verify_osd_locked(osd);
2838 WARN_ON(!lreq->linger_id || lreq->osd);
2839 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2840 osd->o_osd, lreq, lreq->linger_id);
2841
2842 if (!osd_homeless(osd))
2843 __remove_osd_from_lru(osd);
2844 else
2845 atomic_inc(&osd->o_osdc->num_homeless);
2846
2847 get_osd(osd);
2848 insert_linger(&osd->o_linger_requests, lreq);
2849 lreq->osd = osd;
2850 }
2851
2852 static void unlink_linger(struct ceph_osd *osd,
2853 struct ceph_osd_linger_request *lreq)
2854 {
2855 verify_osd_locked(osd);
2856 WARN_ON(lreq->osd != osd);
2857 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2858 osd->o_osd, lreq, lreq->linger_id);
2859
2860 lreq->osd = NULL;
2861 erase_linger(&osd->o_linger_requests, lreq);
2862 put_osd(osd);
2863
2864 if (!osd_homeless(osd))
2865 maybe_move_osd_to_lru(osd);
2866 else
2867 atomic_dec(&osd->o_osdc->num_homeless);
2868 }
2869
2870 static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2871 {
2872 verify_osdc_locked(lreq->osdc);
2873
2874 return !RB_EMPTY_NODE(&lreq->osdc_node);
2875 }
2876
2877 static bool linger_registered(struct ceph_osd_linger_request *lreq)
2878 {
2879 struct ceph_osd_client *osdc = lreq->osdc;
2880 bool registered;
2881
2882 down_read(&osdc->lock);
2883 registered = __linger_registered(lreq);
2884 up_read(&osdc->lock);
2885
2886 return registered;
2887 }
2888
2889 static void linger_register(struct ceph_osd_linger_request *lreq)
2890 {
2891 struct ceph_osd_client *osdc = lreq->osdc;
2892
2893 verify_osdc_wrlocked(osdc);
2894 WARN_ON(lreq->linger_id);
2895
2896 linger_get(lreq);
2897 lreq->linger_id = ++osdc->last_linger_id;
2898 insert_linger_osdc(&osdc->linger_requests, lreq);
2899 }
2900
2901 static void linger_unregister(struct ceph_osd_linger_request *lreq)
2902 {
2903 struct ceph_osd_client *osdc = lreq->osdc;
2904
2905 verify_osdc_wrlocked(osdc);
2906
2907 erase_linger_osdc(&osdc->linger_requests, lreq);
2908 linger_put(lreq);
2909 }
2910
2911 static void cancel_linger_request(struct ceph_osd_request *req)
2912 {
2913 struct ceph_osd_linger_request *lreq = req->r_priv;
2914
2915 WARN_ON(!req->r_linger);
2916 cancel_request(req);
2917 linger_put(lreq);
2918 }
2919
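/*
 * Deferred delivery of watch/notify events and watch errors.  Each
 * event is packaged into a linger_work item and run via osdc->notify_wq
 * so that user callbacks are invoked from a workqueue instead of
 * directly from message handling.
 */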
2920 struct linger_work {
2921 struct work_struct work;
2922 struct ceph_osd_linger_request *lreq;
2923 struct list_head pending_item;
2924 unsigned long queued_stamp;
2925
2926 union {
2927 struct {
2928 u64 notify_id;
2929 u64 notifier_id;
2930 void *payload; /* points into @msg front */
2931 size_t payload_len;
2932
2933 struct ceph_msg *msg; /* for ceph_msg_put() */
2934 } notify;
2935 struct {
2936 int err;
2937 } error;
2938 };
2939 };
2940
2941 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2942 work_func_t workfn)
2943 {
2944 struct linger_work *lwork;
2945
2946 lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2947 if (!lwork)
2948 return NULL;
2949
2950 INIT_WORK(&lwork->work, workfn);
2951 INIT_LIST_HEAD(&lwork->pending_item);
2952 lwork->lreq = linger_get(lreq);
2953
2954 return lwork;
2955 }
2956
2957 static void lwork_free(struct linger_work *lwork)
2958 {
2959 struct ceph_osd_linger_request *lreq = lwork->lreq;
2960
2961 mutex_lock(&lreq->lock);
2962 list_del(&lwork->pending_item);
2963 mutex_unlock(&lreq->lock);
2964
2965 linger_put(lreq);
2966 kfree(lwork);
2967 }
2968
2969 static void lwork_queue(struct linger_work *lwork)
2970 {
2971 struct ceph_osd_linger_request *lreq = lwork->lreq;
2972 struct ceph_osd_client *osdc = lreq->osdc;
2973
2974 verify_lreq_locked(lreq);
2975 WARN_ON(!list_empty(&lwork->pending_item));
2976
2977 lwork->queued_stamp = jiffies;
2978 list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2979 queue_work(osdc->notify_wq, &lwork->work);
2980 }
2981
2982 static void do_watch_notify(struct work_struct *w)
2983 {
2984 struct linger_work *lwork = container_of(w, struct linger_work, work);
2985 struct ceph_osd_linger_request *lreq = lwork->lreq;
2986
2987 if (!linger_registered(lreq)) {
2988 dout("%s lreq %p not registered\n", __func__, lreq);
2989 goto out;
2990 }
2991
2992 WARN_ON(!lreq->is_watch);
2993 dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2994 __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
2995 lwork->notify.payload_len);
2996 lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
2997 lwork->notify.notifier_id, lwork->notify.payload,
2998 lwork->notify.payload_len);
2999
3000 out:
3001 ceph_msg_put(lwork->notify.msg);
3002 lwork_free(lwork);
3003 }
3004
3005 static void do_watch_error(struct work_struct *w)
3006 {
3007 struct linger_work *lwork = container_of(w, struct linger_work, work);
3008 struct ceph_osd_linger_request *lreq = lwork->lreq;
3009
3010 if (!linger_registered(lreq)) {
3011 dout("%s lreq %p not registered\n", __func__, lreq);
3012 goto out;
3013 }
3014
3015 dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
3016 lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
3017
3018 out:
3019 lwork_free(lwork);
3020 }
3021
3022 static void queue_watch_error(struct ceph_osd_linger_request *lreq)
3023 {
3024 struct linger_work *lwork;
3025
3026 lwork = lwork_alloc(lreq, do_watch_error);
3027 if (!lwork) {
3028 pr_err("failed to allocate error-lwork\n");
3029 return;
3030 }
3031
3032 lwork->error.err = lreq->last_error;
3033 lwork_queue(lwork);
3034 }
3035
3036 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
3037 int result)
3038 {
3039 if (!completion_done(&lreq->reg_commit_wait)) {
3040 lreq->reg_commit_error = (result <= 0 ? result : 0);
3041 complete_all(&lreq->reg_commit_wait);
3042 }
3043 }
3044
3045 static void linger_commit_cb(struct ceph_osd_request *req)
3046 {
3047 struct ceph_osd_linger_request *lreq = req->r_priv;
3048
3049 mutex_lock(&lreq->lock);
3050 if (req != lreq->reg_req) {
3051 dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3052 __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3053 goto out;
3054 }
3055
3056 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
3057 lreq->linger_id, req->r_result);
3058 linger_reg_commit_complete(lreq, req->r_result);
3059 lreq->committed = true;
3060
3061 if (!lreq->is_watch) {
3062 struct ceph_osd_data *osd_data =
3063 osd_req_op_data(req, 0, notify, response_data);
3064 void *p = page_address(osd_data->pages[0]);
3065
3066 WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
3067 osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
3068
3069 /* make note of the notify_id */
3070 if (req->r_ops[0].outdata_len >= sizeof(u64)) {
3071 lreq->notify_id = ceph_decode_64(&p);
3072 dout("lreq %p notify_id %llu\n", lreq,
3073 lreq->notify_id);
3074 } else {
3075 dout("lreq %p no notify_id\n", lreq);
3076 }
3077 }
3078
3079 out:
3080 mutex_unlock(&lreq->lock);
3081 linger_put(lreq);
3082 }
3083
3084 static int normalize_watch_error(int err)
3085 {
3086 /*
3087 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
3088 * notification and a failure to reconnect because we raced with
3089 * the delete appear the same to the user.
3090 */
3091 if (err == -ENOENT)
3092 err = -ENOTCONN;
3093
3094 return err;
3095 }
3096
3097 static void linger_reconnect_cb(struct ceph_osd_request *req)
3098 {
3099 struct ceph_osd_linger_request *lreq = req->r_priv;
3100
3101 mutex_lock(&lreq->lock);
3102 if (req != lreq->reg_req) {
3103 dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3104 __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3105 goto out;
3106 }
3107
3108 dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
3109 lreq, lreq->linger_id, req->r_result, lreq->last_error);
3110 if (req->r_result < 0) {
3111 if (!lreq->last_error) {
3112 lreq->last_error = normalize_watch_error(req->r_result);
3113 queue_watch_error(lreq);
3114 }
3115 }
3116
3117 out:
3118 mutex_unlock(&lreq->lock);
3119 linger_put(lreq);
3120 }
3121
3122 static void send_linger(struct ceph_osd_linger_request *lreq)
3123 {
3124 struct ceph_osd_client *osdc = lreq->osdc;
3125 struct ceph_osd_request *req;
3126 int ret;
3127
3128 verify_osdc_wrlocked(osdc);
3129 mutex_lock(&lreq->lock);
3130 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3131
3132 if (lreq->reg_req) {
3133 if (lreq->reg_req->r_osd)
3134 cancel_linger_request(lreq->reg_req);
3135 ceph_osdc_put_request(lreq->reg_req);
3136 }
3137
3138 req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3139 BUG_ON(!req);
3140
3141 target_copy(&req->r_t, &lreq->t);
3142 req->r_mtime = lreq->mtime;
3143
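	/*
	 * An already-committed watch is re-established with
	 * CEPH_OSD_WATCH_OP_RECONNECT and a bumped register_gen;
	 * otherwise this is the initial WATCH or NOTIFY registration.
	 */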
3144 if (lreq->is_watch && lreq->committed) {
3145 osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_RECONNECT,
3146 lreq->linger_id, ++lreq->register_gen);
3147 dout("lreq %p reconnect register_gen %u\n", lreq,
3148 req->r_ops[0].watch.gen);
3149 req->r_callback = linger_reconnect_cb;
3150 } else {
3151 if (lreq->is_watch) {
3152 osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_WATCH,
3153 lreq->linger_id, 0);
3154 } else {
3155 lreq->notify_id = 0;
3156
3157 refcount_inc(&lreq->request_pl->refcnt);
3158 osd_req_op_notify_init(req, 0, lreq->linger_id,
3159 lreq->request_pl);
3160 ceph_osd_data_pages_init(
3161 osd_req_op_data(req, 0, notify, response_data),
3162 lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
3163 }
3164 dout("lreq %p register\n", lreq);
3165 req->r_callback = linger_commit_cb;
3166 }
3167
3168 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3169 BUG_ON(ret);
3170
3171 req->r_priv = linger_get(lreq);
3172 req->r_linger = true;
3173 lreq->reg_req = req;
3174 mutex_unlock(&lreq->lock);
3175
3176 submit_request(req, true);
3177 }
3178
3179 static void linger_ping_cb(struct ceph_osd_request *req)
3180 {
3181 struct ceph_osd_linger_request *lreq = req->r_priv;
3182
3183 mutex_lock(&lreq->lock);
3184 if (req != lreq->ping_req) {
3185 dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3186 __func__, lreq, lreq->linger_id, req, lreq->ping_req);
3187 goto out;
3188 }
3189
3190 dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
3191 __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
3192 lreq->last_error);
3193 if (lreq->register_gen == req->r_ops[0].watch.gen) {
3194 if (!req->r_result) {
3195 lreq->watch_valid_thru = lreq->ping_sent;
3196 } else if (!lreq->last_error) {
3197 lreq->last_error = normalize_watch_error(req->r_result);
3198 queue_watch_error(lreq);
3199 }
3200 } else {
3201 dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
3202 lreq->register_gen, req->r_ops[0].watch.gen);
3203 }
3204
3205 out:
3206 mutex_unlock(&lreq->lock);
3207 linger_put(lreq);
3208 }
3209
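/*
 * Ping the OSD serving a committed watch to verify that the watch is
 * still connected.  Pings are not sent while the osdmap has PAUSERD
 * set.
 */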
3210 static void send_linger_ping(struct ceph_osd_linger_request *lreq)
3211 {
3212 struct ceph_osd_client *osdc = lreq->osdc;
3213 struct ceph_osd_request *req;
3214 int ret;
3215
3216 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
3217 dout("%s PAUSERD\n", __func__);
3218 return;
3219 }
3220
3221 lreq->ping_sent = jiffies;
3222 dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
3223 __func__, lreq, lreq->linger_id, lreq->ping_sent,
3224 lreq->register_gen);
3225
3226 if (lreq->ping_req) {
3227 if (lreq->ping_req->r_osd)
3228 cancel_linger_request(lreq->ping_req);
3229 ceph_osdc_put_request(lreq->ping_req);
3230 }
3231
3232 req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3233 BUG_ON(!req);
3234
3235 target_copy(&req->r_t, &lreq->t);
3236 osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_PING, lreq->linger_id,
3237 lreq->register_gen);
3238 req->r_callback = linger_ping_cb;
3239
3240 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3241 BUG_ON(ret);
3242
3243 req->r_priv = linger_get(lreq);
3244 req->r_linger = true;
3245 lreq->ping_req = req;
3246
3247 ceph_osdc_get_request(req);
3248 account_request(req);
3249 req->r_tid = atomic64_inc_return(&osdc->last_tid);
3250 link_request(lreq->osd, req);
3251 send_request(req);
3252 }
3253
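/*
 * Register @lreq, pick its target OSD under the write lock and send
 * the initial watch/notify request.
 */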
3254 static void linger_submit(struct ceph_osd_linger_request *lreq)
3255 {
3256 struct ceph_osd_client *osdc = lreq->osdc;
3257 struct ceph_osd *osd;
3258
3259 down_write(&osdc->lock);
3260 linger_register(lreq);
3261
3262 calc_target(osdc, &lreq->t, false);
3263 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3264 link_linger(osd, lreq);
3265
3266 send_linger(lreq);
3267 up_write(&osdc->lock);
3268 }
3269
3270 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
3271 {
3272 struct ceph_osd_client *osdc = lreq->osdc;
3273 struct ceph_osd_linger_request *lookup_lreq;
3274
3275 verify_osdc_wrlocked(osdc);
3276
3277 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3278 lreq->linger_id);
3279 if (!lookup_lreq)
3280 return;
3281
3282 WARN_ON(lookup_lreq != lreq);
3283 erase_linger_mc(&osdc->linger_map_checks, lreq);
3284 linger_put(lreq);
3285 }
3286
3287 /*
3288 * @lreq has to be both registered and linked.
3289 */
3290 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
3291 {
3292 if (lreq->ping_req && lreq->ping_req->r_osd)
3293 cancel_linger_request(lreq->ping_req);
3294 if (lreq->reg_req && lreq->reg_req->r_osd)
3295 cancel_linger_request(lreq->reg_req);
3296 cancel_linger_map_check(lreq);
3297 unlink_linger(lreq->osd, lreq);
3298 linger_unregister(lreq);
3299 }
3300
3301 static void linger_cancel(struct ceph_osd_linger_request *lreq)
3302 {
3303 struct ceph_osd_client *osdc = lreq->osdc;
3304
3305 down_write(&osdc->lock);
3306 if (__linger_registered(lreq))
3307 __linger_cancel(lreq);
3308 up_write(&osdc->lock);
3309 }
3310
3311 static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
3312
3313 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3314 {
3315 struct ceph_osd_client *osdc = lreq->osdc;
3316 struct ceph_osdmap *map = osdc->osdmap;
3317
3318 verify_osdc_wrlocked(osdc);
3319 WARN_ON(!map->epoch);
3320
3321 if (lreq->register_gen) {
3322 lreq->map_dne_bound = map->epoch;
3323 dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3324 lreq, lreq->linger_id);
3325 } else {
3326 dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3327 __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3328 map->epoch);
3329 }
3330
3331 if (lreq->map_dne_bound) {
3332 if (map->epoch >= lreq->map_dne_bound) {
3333 /* we had a new enough map */
3334 pr_info("linger_id %llu pool does not exist\n",
3335 lreq->linger_id);
3336 linger_reg_commit_complete(lreq, -ENOENT);
3337 __linger_cancel(lreq);
3338 }
3339 } else {
3340 send_linger_map_check(lreq);
3341 }
3342 }
3343
3344 static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3345 {
3346 struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3347 struct ceph_osd_linger_request *lreq;
3348 u64 linger_id = greq->private_data;
3349
3350 WARN_ON(greq->result || !greq->u.newest);
3351
3352 down_write(&osdc->lock);
3353 lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3354 if (!lreq) {
3355 dout("%s linger_id %llu dne\n", __func__, linger_id);
3356 goto out_unlock;
3357 }
3358
3359 dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3360 __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3361 greq->u.newest);
3362 if (!lreq->map_dne_bound)
3363 lreq->map_dne_bound = greq->u.newest;
3364 erase_linger_mc(&osdc->linger_map_checks, lreq);
3365 check_linger_pool_dne(lreq);
3366
3367 linger_put(lreq);
3368 out_unlock:
3369 up_write(&osdc->lock);
3370 }
3371
3372 static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3373 {
3374 struct ceph_osd_client *osdc = lreq->osdc;
3375 struct ceph_osd_linger_request *lookup_lreq;
3376 int ret;
3377
3378 verify_osdc_wrlocked(osdc);
3379
3380 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3381 lreq->linger_id);
3382 if (lookup_lreq) {
3383 WARN_ON(lookup_lreq != lreq);
3384 return;
3385 }
3386
3387 linger_get(lreq);
3388 insert_linger_mc(&osdc->linger_map_checks, lreq);
3389 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3390 linger_map_check_cb, lreq->linger_id);
3391 WARN_ON(ret);
3392 }
3393
3394 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3395 {
3396 int ret;
3397
3398 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3399 ret = wait_for_completion_killable(&lreq->reg_commit_wait);
3400 return ret ?: lreq->reg_commit_error;
3401 }
3402
3403 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq,
3404 unsigned long timeout)
3405 {
3406 long left;
3407
3408 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3409 left = wait_for_completion_killable_timeout(&lreq->notify_finish_wait,
3410 ceph_timeout_jiffies(timeout));
3411 if (left <= 0)
3412 left = left ?: -ETIMEDOUT;
3413 else
3414 left = lreq->notify_finish_error; /* completed */
3415
3416 return left;
3417 }
3418
3419 /*
3420 * Timeout callback, called every N seconds. When 1 or more OSD
3421 * requests has been active for more than N seconds, we send a keepalive
3422 * (tag + timestamp) to its OSD to ensure any communications channel
3423 * reset is detected.
3424 */
3425 static void handle_timeout(struct work_struct *work)
3426 {
3427 struct ceph_osd_client *osdc =
3428 container_of(work, struct ceph_osd_client, timeout_work.work);
3429 struct ceph_options *opts = osdc->client->options;
3430 unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3431 unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3432 LIST_HEAD(slow_osds);
3433 struct rb_node *n, *p;
3434
3435 dout("%s osdc %p\n", __func__, osdc);
3436 down_write(&osdc->lock);
3437
3438 /*
3439 * ping osds that are a bit slow. this ensures that if there
3440 * is a break in the TCP connection we will notice, and reopen
3441 * a connection with that osd (from the fault callback).
3442 */
3443 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3444 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3445 bool found = false;
3446
3447 for (p = rb_first(&osd->o_requests); p; ) {
3448 struct ceph_osd_request *req =
3449 rb_entry(p, struct ceph_osd_request, r_node);
3450
3451 p = rb_next(p); /* abort_request() */
3452
3453 if (time_before(req->r_stamp, cutoff)) {
3454 dout(" req %p tid %llu on osd%d is laggy\n",
3455 req, req->r_tid, osd->o_osd);
3456 found = true;
3457 }
3458 if (opts->osd_request_timeout &&
3459 time_before(req->r_start_stamp, expiry_cutoff)) {
3460 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3461 req->r_tid, osd->o_osd);
3462 abort_request(req, -ETIMEDOUT);
3463 }
3464 }
3465 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3466 struct ceph_osd_linger_request *lreq =
3467 rb_entry(p, struct ceph_osd_linger_request, node);
3468
3469 dout(" lreq %p linger_id %llu is served by osd%d\n",
3470 lreq, lreq->linger_id, osd->o_osd);
3471 found = true;
3472
3473 mutex_lock(&lreq->lock);
3474 if (lreq->is_watch && lreq->committed && !lreq->last_error)
3475 send_linger_ping(lreq);
3476 mutex_unlock(&lreq->lock);
3477 }
3478
3479 if (found)
3480 list_move_tail(&osd->o_keepalive_item, &slow_osds);
3481 }
3482
3483 if (opts->osd_request_timeout) {
3484 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3485 struct ceph_osd_request *req =
3486 rb_entry(p, struct ceph_osd_request, r_node);
3487
3488 p = rb_next(p); /* abort_request() */
3489
3490 if (time_before(req->r_start_stamp, expiry_cutoff)) {
3491 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3492 req->r_tid, osdc->homeless_osd.o_osd);
3493 abort_request(req, -ETIMEDOUT);
3494 }
3495 }
3496 }
3497
3498 if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3499 maybe_request_map(osdc);
3500
3501 while (!list_empty(&slow_osds)) {
3502 struct ceph_osd *osd = list_first_entry(&slow_osds,
3503 struct ceph_osd,
3504 o_keepalive_item);
3505 list_del_init(&osd->o_keepalive_item);
3506 ceph_con_keepalive(&osd->o_con);
3507 }
3508
3509 up_write(&osdc->lock);
3510 schedule_delayed_work(&osdc->timeout_work,
3511 osdc->client->options->osd_keepalive_timeout);
3512 }
3513
3514 static void handle_osds_timeout(struct work_struct *work)
3515 {
3516 struct ceph_osd_client *osdc =
3517 container_of(work, struct ceph_osd_client,
3518 osds_timeout_work.work);
3519 unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3520 struct ceph_osd *osd, *nosd;
3521
3522 dout("%s osdc %p\n", __func__, osdc);
3523 down_write(&osdc->lock);
3524 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3525 if (time_before(jiffies, osd->lru_ttl))
3526 break;
3527
3528 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3529 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3530 close_osd(osd);
3531 }
3532
3533 up_write(&osdc->lock);
3534 schedule_delayed_work(&osdc->osds_timeout_work,
3535 round_jiffies_relative(delay));
3536 }
3537
3538 static int ceph_oloc_decode(void **p, void *end,
3539 struct ceph_object_locator *oloc)
3540 {
3541 u8 struct_v, struct_cv;
3542 u32 len;
3543 void *struct_end;
3544 int ret = 0;
3545
3546 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3547 struct_v = ceph_decode_8(p);
3548 struct_cv = ceph_decode_8(p);
3549 if (struct_v < 3) {
3550 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3551 struct_v, struct_cv);
3552 goto e_inval;
3553 }
3554 if (struct_cv > 6) {
3555 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3556 struct_v, struct_cv);
3557 goto e_inval;
3558 }
3559 len = ceph_decode_32(p);
3560 ceph_decode_need(p, end, len, e_inval);
3561 struct_end = *p + len;
3562
3563 oloc->pool = ceph_decode_64(p);
3564 *p += 4; /* skip preferred */
3565
3566 len = ceph_decode_32(p);
3567 if (len > 0) {
3568 pr_warn("ceph_object_locator::key is set\n");
3569 goto e_inval;
3570 }
3571
3572 if (struct_v >= 5) {
3573 bool changed = false;
3574
3575 len = ceph_decode_32(p);
3576 if (len > 0) {
3577 ceph_decode_need(p, end, len, e_inval);
3578 if (!oloc->pool_ns ||
3579 ceph_compare_string(oloc->pool_ns, *p, len))
3580 changed = true;
3581 *p += len;
3582 } else {
3583 if (oloc->pool_ns)
3584 changed = true;
3585 }
3586 if (changed) {
3587 /* redirect changes namespace */
3588 pr_warn("ceph_object_locator::nspace is changed\n");
3589 goto e_inval;
3590 }
3591 }
3592
3593 if (struct_v >= 6) {
3594 s64 hash = ceph_decode_64(p);
3595 if (hash != -1) {
3596 pr_warn("ceph_object_locator::hash is set\n");
3597 goto e_inval;
3598 }
3599 }
3600
3601 /* skip the rest */
3602 *p = struct_end;
3603 out:
3604 return ret;
3605
3606 e_inval:
3607 ret = -EINVAL;
3608 goto out;
3609 }
3610
3611 static int ceph_redirect_decode(void **p, void *end,
3612 struct ceph_request_redirect *redir)
3613 {
3614 u8 struct_v, struct_cv;
3615 u32 len;
3616 void *struct_end;
3617 int ret;
3618
3619 ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3620 struct_v = ceph_decode_8(p);
3621 struct_cv = ceph_decode_8(p);
3622 if (struct_cv > 1) {
3623 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3624 struct_v, struct_cv);
3625 goto e_inval;
3626 }
3627 len = ceph_decode_32(p);
3628 ceph_decode_need(p, end, len, e_inval);
3629 struct_end = *p + len;
3630
3631 ret = ceph_oloc_decode(p, end, &redir->oloc);
3632 if (ret)
3633 goto out;
3634
3635 len = ceph_decode_32(p);
3636 if (len > 0) {
3637 pr_warn("ceph_request_redirect::object_name is set\n");
3638 goto e_inval;
3639 }
3640
3641 /* skip the rest */
3642 *p = struct_end;
3643 out:
3644 return ret;
3645
3646 e_inval:
3647 ret = -EINVAL;
3648 goto out;
3649 }
3650
3651 struct MOSDOpReply {
3652 struct ceph_pg pgid;
3653 u64 flags;
3654 int result;
3655 u32 epoch;
3656 int num_ops;
3657 u32 outdata_len[CEPH_OSD_MAX_OPS];
3658 s32 rval[CEPH_OSD_MAX_OPS];
3659 int retry_attempt;
3660 struct ceph_eversion replay_version;
3661 u64 user_version;
3662 struct ceph_request_redirect redirect;
3663 };
3664
3665 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3666 {
3667 void *p = msg->front.iov_base;
3668 void *const end = p + msg->front.iov_len;
3669 u16 version = le16_to_cpu(msg->hdr.version);
3670 struct ceph_eversion bad_replay_version;
3671 u8 decode_redir;
3672 u32 len;
3673 int ret;
3674 int i;
3675
3676 ceph_decode_32_safe(&p, end, len, e_inval);
3677 ceph_decode_need(&p, end, len, e_inval);
3678 p += len; /* skip oid */
3679
3680 ret = ceph_decode_pgid(&p, end, &m->pgid);
3681 if (ret)
3682 return ret;
3683
3684 ceph_decode_64_safe(&p, end, m->flags, e_inval);
3685 ceph_decode_32_safe(&p, end, m->result, e_inval);
3686 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3687 memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3688 p += sizeof(bad_replay_version);
3689 ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3690
3691 ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3692 if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3693 goto e_inval;
3694
3695 ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3696 e_inval);
3697 for (i = 0; i < m->num_ops; i++) {
3698 struct ceph_osd_op *op = p;
3699
3700 m->outdata_len[i] = le32_to_cpu(op->payload_len);
3701 p += sizeof(*op);
3702 }
3703
3704 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3705 for (i = 0; i < m->num_ops; i++)
3706 ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3707
3708 if (version >= 5) {
3709 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3710 memcpy(&m->replay_version, p, sizeof(m->replay_version));
3711 p += sizeof(m->replay_version);
3712 ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3713 } else {
3714 m->replay_version = bad_replay_version; /* struct */
3715 m->user_version = le64_to_cpu(m->replay_version.version);
3716 }
3717
3718 if (version >= 6) {
3719 if (version >= 7)
3720 ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3721 else
3722 decode_redir = 1;
3723 } else {
3724 decode_redir = 0;
3725 }
3726
3727 if (decode_redir) {
3728 ret = ceph_redirect_decode(&p, end, &m->redirect);
3729 if (ret)
3730 return ret;
3731 } else {
3732 ceph_oloc_init(&m->redirect.oloc);
3733 }
3734
3735 return 0;
3736
3737 e_inval:
3738 return -EINVAL;
3739 }
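
/*
 * For reference, the front section of an MOSDOpReply is consumed above in
 * roughly this order.  This is only a sketch derived from the decoding
 * code above, not a normative wire-format description:
 *
 *	oid (length-prefixed string, skipped)
 *	pgid
 *	flags (64)		-> m->flags
 *	result (32)		-> m->result
 *	bad_replay_version	(eversion, used as a fallback for old replies)
 *	epoch (32)		-> m->epoch
 *	num_ops (32), then one struct ceph_osd_op per op  -> m->outdata_len[]
 *	retry_attempt (32), then one s32 rval per op	   -> m->rval[]
 *	replay_version + user_version	(hdr.version >= 5)
 *	[decode_redir flag (>= 7)] + request_redirect	(>= 6)
 */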
3740
3741 /*
3742 * Handle MOSDOpReply. Set ->r_result and call the callback if it is
3743 * specified.
3744 */
3745 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3746 {
3747 struct ceph_osd_client *osdc = osd->o_osdc;
3748 struct ceph_osd_request *req;
3749 struct MOSDOpReply m;
3750 u64 tid = le64_to_cpu(msg->hdr.tid);
3751 u32 data_len = 0;
3752 int ret;
3753 int i;
3754
3755 dout("%s msg %p tid %llu\n", __func__, msg, tid);
3756
3757 down_read(&osdc->lock);
3758 if (!osd_registered(osd)) {
3759 dout("%s osd%d unknown\n", __func__, osd->o_osd);
3760 goto out_unlock_osdc;
3761 }
3762 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3763
3764 mutex_lock(&osd->lock);
3765 req = lookup_request(&osd->o_requests, tid);
3766 if (!req) {
3767 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3768 goto out_unlock_session;
3769 }
3770
3771 m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3772 ret = decode_MOSDOpReply(msg, &m);
3773 m.redirect.oloc.pool_ns = NULL;
3774 if (ret) {
3775 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3776 req->r_tid, ret);
3777 ceph_msg_dump(msg);
3778 goto fail_request;
3779 }
3780 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3781 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3782 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3783 le64_to_cpu(m.replay_version.version), m.user_version);
3784
3785 if (m.retry_attempt >= 0) {
3786 if (m.retry_attempt != req->r_attempts - 1) {
3787 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3788 req, req->r_tid, m.retry_attempt,
3789 req->r_attempts - 1);
3790 goto out_unlock_session;
3791 }
3792 } else {
3793 WARN_ON(1); /* MOSDOpReply v4 is assumed */
3794 }
3795
3796 if (!ceph_oloc_empty(&m.redirect.oloc)) {
3797 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3798 m.redirect.oloc.pool);
3799 unlink_request(osd, req);
3800 mutex_unlock(&osd->lock);
3801
3802 /*
3803 * Not ceph_oloc_copy() - changing pool_ns is not
3804 * supported.
3805 */
3806 req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3807 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED |
3808 CEPH_OSD_FLAG_IGNORE_OVERLAY |
3809 CEPH_OSD_FLAG_IGNORE_CACHE;
3810 req->r_tid = 0;
3811 __submit_request(req, false);
3812 goto out_unlock_osdc;
3813 }
3814
3815 if (m.result == -EAGAIN) {
3816 dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
3817 unlink_request(osd, req);
3818 mutex_unlock(&osd->lock);
3819
3820 /*
3821 * The object is missing on the replica or not (yet)
3822 * readable. Clear pgid to force a resend to the primary
3823 * via legacy_change.
3824 */
3825 req->r_t.pgid.pool = 0;
3826 req->r_t.pgid.seed = 0;
3827 WARN_ON(!req->r_t.used_replica);
3828 req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3829 CEPH_OSD_FLAG_LOCALIZE_READS);
3830 req->r_tid = 0;
3831 __submit_request(req, false);
3832 goto out_unlock_osdc;
3833 }
3834
3835 if (m.num_ops != req->r_num_ops) {
3836 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3837 req->r_num_ops, req->r_tid);
3838 goto fail_request;
3839 }
3840 for (i = 0; i < req->r_num_ops; i++) {
3841 dout(" req %p tid %llu op %d rval %d len %u\n", req,
3842 req->r_tid, i, m.rval[i], m.outdata_len[i]);
3843 req->r_ops[i].rval = m.rval[i];
3844 req->r_ops[i].outdata_len = m.outdata_len[i];
3845 data_len += m.outdata_len[i];
3846 }
3847 if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3848 pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3849 le32_to_cpu(msg->hdr.data_len), req->r_tid);
3850 goto fail_request;
3851 }
3852 dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3853 req, req->r_tid, m.result, data_len);
3854
3855 /*
3856 * Since we only ever request ONDISK, we should only ever get
3857 * one (type of) reply back.
3858 */
3859 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3860 req->r_version = m.user_version;
3861 req->r_result = m.result ?: data_len;
3862 finish_request(req);
3863 mutex_unlock(&osd->lock);
3864 up_read(&osdc->lock);
3865
3866 __complete_request(req);
3867 return;
3868
3869 fail_request:
3870 complete_request(req, -EIO);
3871 out_unlock_session:
3872 mutex_unlock(&osd->lock);
3873 out_unlock_osdc:
3874 up_read(&osdc->lock);
3875 }
3876
3877 static void set_pool_was_full(struct ceph_osd_client *osdc)
3878 {
3879 struct rb_node *n;
3880
3881 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3882 struct ceph_pg_pool_info *pi =
3883 rb_entry(n, struct ceph_pg_pool_info, node);
3884
3885 pi->was_full = __pool_full(pi);
3886 }
3887 }
3888
3889 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3890 {
3891 struct ceph_pg_pool_info *pi;
3892
3893 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3894 if (!pi)
3895 return false;
3896
3897 return pi->was_full && !__pool_full(pi);
3898 }
3899
3900 static enum calc_target_result
3901 recalc_linger_target(struct ceph_osd_linger_request *lreq)
3902 {
3903 struct ceph_osd_client *osdc = lreq->osdc;
3904 enum calc_target_result ct_res;
3905
3906 ct_res = calc_target(osdc, &lreq->t, true);
3907 if (ct_res == CALC_TARGET_NEED_RESEND) {
3908 struct ceph_osd *osd;
3909
3910 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3911 if (osd != lreq->osd) {
3912 unlink_linger(lreq->osd, lreq);
3913 link_linger(osd, lreq);
3914 }
3915 }
3916
3917 return ct_res;
3918 }
3919
3920 /*
3921 * Requeue requests whose mapping to an OSD has changed.
3922 */
3923 static void scan_requests(struct ceph_osd *osd,
3924 bool force_resend,
3925 bool cleared_full,
3926 bool check_pool_cleared_full,
3927 struct rb_root *need_resend,
3928 struct list_head *need_resend_linger)
3929 {
3930 struct ceph_osd_client *osdc = osd->o_osdc;
3931 struct rb_node *n;
3932 bool force_resend_writes;
3933
3934 for (n = rb_first(&osd->o_linger_requests); n; ) {
3935 struct ceph_osd_linger_request *lreq =
3936 rb_entry(n, struct ceph_osd_linger_request, node);
3937 enum calc_target_result ct_res;
3938
3939 n = rb_next(n); /* recalc_linger_target() */
3940
3941 dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3942 lreq->linger_id);
3943 ct_res = recalc_linger_target(lreq);
3944 switch (ct_res) {
3945 case CALC_TARGET_NO_ACTION:
3946 force_resend_writes = cleared_full ||
3947 (check_pool_cleared_full &&
3948 pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3949 if (!force_resend && !force_resend_writes)
3950 break;
3951
3952 fallthrough;
3953 case CALC_TARGET_NEED_RESEND:
3954 cancel_linger_map_check(lreq);
3955 /*
3956 * scan_requests() for the previous epoch(s)
3957 * may have already added it to the list, since
3958 * it's not unlinked here.
3959 */
3960 if (list_empty(&lreq->scan_item))
3961 list_add_tail(&lreq->scan_item, need_resend_linger);
3962 break;
3963 case CALC_TARGET_POOL_DNE:
3964 list_del_init(&lreq->scan_item);
3965 check_linger_pool_dne(lreq);
3966 break;
3967 }
3968 }
3969
3970 for (n = rb_first(&osd->o_requests); n; ) {
3971 struct ceph_osd_request *req =
3972 rb_entry(n, struct ceph_osd_request, r_node);
3973 enum calc_target_result ct_res;
3974
3975 n = rb_next(n); /* unlink_request(), check_pool_dne() */
3976
3977 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3978 ct_res = calc_target(osdc, &req->r_t, false);
3979 switch (ct_res) {
3980 case CALC_TARGET_NO_ACTION:
3981 force_resend_writes = cleared_full ||
3982 (check_pool_cleared_full &&
3983 pool_cleared_full(osdc, req->r_t.base_oloc.pool));
3984 if (!force_resend &&
3985 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
3986 !force_resend_writes))
3987 break;
3988
3989 fallthrough;
3990 case CALC_TARGET_NEED_RESEND:
3991 cancel_map_check(req);
3992 unlink_request(osd, req);
3993 insert_request(need_resend, req);
3994 break;
3995 case CALC_TARGET_POOL_DNE:
3996 check_pool_dne(req);
3997 break;
3998 }
3999 }
4000 }
4001
4002 static int handle_one_map(struct ceph_osd_client *osdc,
4003 void *p, void *end, bool incremental,
4004 struct rb_root *need_resend,
4005 struct list_head *need_resend_linger)
4006 {
4007 struct ceph_osdmap *newmap;
4008 struct rb_node *n;
4009 bool skipped_map = false;
4010 bool was_full;
4011
4012 was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4013 set_pool_was_full(osdc);
4014
4015 if (incremental)
4016 newmap = osdmap_apply_incremental(&p, end,
4017 ceph_msgr2(osdc->client),
4018 osdc->osdmap);
4019 else
4020 newmap = ceph_osdmap_decode(&p, end, ceph_msgr2(osdc->client));
4021 if (IS_ERR(newmap))
4022 return PTR_ERR(newmap);
4023
4024 if (newmap != osdc->osdmap) {
4025 /*
4026 * Preserve ->was_full before destroying the old map.
4027 * For pools that weren't in the old map, ->was_full
4028 * should be false.
4029 */
4030 for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
4031 struct ceph_pg_pool_info *pi =
4032 rb_entry(n, struct ceph_pg_pool_info, node);
4033 struct ceph_pg_pool_info *old_pi;
4034
4035 old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
4036 if (old_pi)
4037 pi->was_full = old_pi->was_full;
4038 else
4039 WARN_ON(pi->was_full);
4040 }
4041
4042 if (osdc->osdmap->epoch &&
4043 osdc->osdmap->epoch + 1 < newmap->epoch) {
4044 WARN_ON(incremental);
4045 skipped_map = true;
4046 }
4047
4048 ceph_osdmap_destroy(osdc->osdmap);
4049 osdc->osdmap = newmap;
4050 }
4051
4052 was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4053 scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
4054 need_resend, need_resend_linger);
4055
4056 for (n = rb_first(&osdc->osds); n; ) {
4057 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4058
4059 n = rb_next(n); /* close_osd() */
4060
4061 scan_requests(osd, skipped_map, was_full, true, need_resend,
4062 need_resend_linger);
4063 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
4064 memcmp(&osd->o_con.peer_addr,
4065 ceph_osd_addr(osdc->osdmap, osd->o_osd),
4066 sizeof(struct ceph_entity_addr)))
4067 close_osd(osd);
4068 }
4069
4070 return 0;
4071 }
4072
4073 static void kick_requests(struct ceph_osd_client *osdc,
4074 struct rb_root *need_resend,
4075 struct list_head *need_resend_linger)
4076 {
4077 struct ceph_osd_linger_request *lreq, *nlreq;
4078 enum calc_target_result ct_res;
4079 struct rb_node *n;
4080
4081 /* make sure need_resend targets reflect latest map */
4082 for (n = rb_first(need_resend); n; ) {
4083 struct ceph_osd_request *req =
4084 rb_entry(n, struct ceph_osd_request, r_node);
4085
4086 n = rb_next(n);
4087
4088 if (req->r_t.epoch < osdc->osdmap->epoch) {
4089 ct_res = calc_target(osdc, &req->r_t, false);
4090 if (ct_res == CALC_TARGET_POOL_DNE) {
4091 erase_request(need_resend, req);
4092 check_pool_dne(req);
4093 }
4094 }
4095 }
4096
4097 for (n = rb_first(need_resend); n; ) {
4098 struct ceph_osd_request *req =
4099 rb_entry(n, struct ceph_osd_request, r_node);
4100 struct ceph_osd *osd;
4101
4102 n = rb_next(n);
4103 erase_request(need_resend, req); /* before link_request() */
4104
4105 osd = lookup_create_osd(osdc, req->r_t.osd, true);
4106 link_request(osd, req);
4107 if (!req->r_linger) {
4108 if (!osd_homeless(osd) && !req->r_t.paused)
4109 send_request(req);
4110 } else {
4111 cancel_linger_request(req);
4112 }
4113 }
4114
4115 list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
4116 if (!osd_homeless(lreq->osd))
4117 send_linger(lreq);
4118
4119 list_del_init(&lreq->scan_item);
4120 }
4121 }
4122
4123 /*
4124 * Process updated osd map.
4125 *
4126 * The message contains any number of incremental and full maps, normally
4127 * indicating some sort of topology change in the cluster. Kick requests
4128 * off to different OSDs as needed.
4129 */
4130 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
4131 {
4132 void *p = msg->front.iov_base;
4133 void *const end = p + msg->front.iov_len;
4134 u32 nr_maps, maplen;
4135 u32 epoch;
4136 struct ceph_fsid fsid;
4137 struct rb_root need_resend = RB_ROOT;
4138 LIST_HEAD(need_resend_linger);
4139 bool handled_incremental = false;
4140 bool was_pauserd, was_pausewr;
4141 bool pauserd, pausewr;
4142 int err;
4143
4144 dout("%s have %u\n", __func__, osdc->osdmap->epoch);
4145 down_write(&osdc->lock);
4146
4147 /* verify fsid */
4148 ceph_decode_need(&p, end, sizeof(fsid), bad);
4149 ceph_decode_copy(&p, &fsid, sizeof(fsid));
4150 if (ceph_check_fsid(osdc->client, &fsid) < 0)
4151 goto bad;
4152
4153 was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4154 was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4155 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4156 have_pool_full(osdc);
4157
4158 /* incremental maps */
4159 ceph_decode_32_safe(&p, end, nr_maps, bad);
4160 dout(" %d inc maps\n", nr_maps);
4161 while (nr_maps > 0) {
4162 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4163 epoch = ceph_decode_32(&p);
4164 maplen = ceph_decode_32(&p);
4165 ceph_decode_need(&p, end, maplen, bad);
4166 if (osdc->osdmap->epoch &&
4167 osdc->osdmap->epoch + 1 == epoch) {
4168 dout("applying incremental map %u len %d\n",
4169 epoch, maplen);
4170 err = handle_one_map(osdc, p, p + maplen, true,
4171 &need_resend, &need_resend_linger);
4172 if (err)
4173 goto bad;
4174 handled_incremental = true;
4175 } else {
4176 dout("ignoring incremental map %u len %d\n",
4177 epoch, maplen);
4178 }
4179 p += maplen;
4180 nr_maps--;
4181 }
4182 if (handled_incremental)
4183 goto done;
4184
4185 /* full maps */
4186 ceph_decode_32_safe(&p, end, nr_maps, bad);
4187 dout(" %d full maps\n", nr_maps);
4188 while (nr_maps) {
4189 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4190 epoch = ceph_decode_32(&p);
4191 maplen = ceph_decode_32(&p);
4192 ceph_decode_need(&p, end, maplen, bad);
4193 if (nr_maps > 1) {
4194 dout("skipping non-latest full map %u len %d\n",
4195 epoch, maplen);
4196 } else if (osdc->osdmap->epoch >= epoch) {
4197 dout("skipping full map %u len %d, "
4198 "older than our %u\n", epoch, maplen,
4199 osdc->osdmap->epoch);
4200 } else {
4201 dout("taking full map %u len %d\n", epoch, maplen);
4202 err = handle_one_map(osdc, p, p + maplen, false,
4203 &need_resend, &need_resend_linger);
4204 if (err)
4205 goto bad;
4206 }
4207 p += maplen;
4208 nr_maps--;
4209 }
4210
4211 done:
4212 /*
4213 * subscribe to subsequent osdmap updates if full to ensure
4214 * we find out when we are no longer full and stop returning
4215 * ENOSPC.
4216 */
4217 pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4218 pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4219 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4220 have_pool_full(osdc);
4221 if (was_pauserd || was_pausewr || pauserd || pausewr ||
4222 osdc->osdmap->epoch < osdc->epoch_barrier)
4223 maybe_request_map(osdc);
4224
4225 kick_requests(osdc, &need_resend, &need_resend_linger);
4226
4227 ceph_osdc_abort_on_full(osdc);
4228 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
4229 osdc->osdmap->epoch);
4230 up_write(&osdc->lock);
4231 wake_up_all(&osdc->client->auth_wq);
4232 return;
4233
4234 bad:
4235 pr_err("osdc handle_map corrupt msg\n");
4236 ceph_msg_dump(msg);
4237 up_write(&osdc->lock);
4238 }
4239
4240 /*
4241 * Resubmit requests pending on the given osd.
4242 */
4243 static void kick_osd_requests(struct ceph_osd *osd)
4244 {
4245 struct rb_node *n;
4246
4247 clear_backoffs(osd);
4248
4249 for (n = rb_first(&osd->o_requests); n; ) {
4250 struct ceph_osd_request *req =
4251 rb_entry(n, struct ceph_osd_request, r_node);
4252
4253 n = rb_next(n); /* cancel_linger_request() */
4254
4255 if (!req->r_linger) {
4256 if (!req->r_t.paused)
4257 send_request(req);
4258 } else {
4259 cancel_linger_request(req);
4260 }
4261 }
4262 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4263 struct ceph_osd_linger_request *lreq =
4264 rb_entry(n, struct ceph_osd_linger_request, node);
4265
4266 send_linger(lreq);
4267 }
4268 }
4269
4270 /*
4271 * If the osd connection drops, we need to resubmit all requests.
4272 */
4273 static void osd_fault(struct ceph_connection *con)
4274 {
4275 struct ceph_osd *osd = con->private;
4276 struct ceph_osd_client *osdc = osd->o_osdc;
4277
4278 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4279
4280 down_write(&osdc->lock);
4281 if (!osd_registered(osd)) {
4282 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4283 goto out_unlock;
4284 }
4285
4286 if (!reopen_osd(osd))
4287 kick_osd_requests(osd);
4288 maybe_request_map(osdc);
4289
4290 out_unlock:
4291 up_write(&osdc->lock);
4292 }
4293
4294 struct MOSDBackoff {
4295 struct ceph_spg spgid;
4296 u32 map_epoch;
4297 u8 op;
4298 u64 id;
4299 struct ceph_hobject_id *begin;
4300 struct ceph_hobject_id *end;
4301 };
4302
4303 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
4304 {
4305 void *p = msg->front.iov_base;
4306 void *const end = p + msg->front.iov_len;
4307 u8 struct_v;
4308 u32 struct_len;
4309 int ret;
4310
4311 ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
4312 if (ret)
4313 return ret;
4314
4315 ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
4316 if (ret)
4317 return ret;
4318
4319 ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
4320 ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
4321 ceph_decode_8_safe(&p, end, m->op, e_inval);
4322 ceph_decode_64_safe(&p, end, m->id, e_inval);
4323
4324 m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
4325 if (!m->begin)
4326 return -ENOMEM;
4327
4328 ret = decode_hoid(&p, end, m->begin);
4329 if (ret) {
4330 free_hoid(m->begin);
4331 return ret;
4332 }
4333
4334 m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
4335 if (!m->end) {
4336 free_hoid(m->begin);
4337 return -ENOMEM;
4338 }
4339
4340 ret = decode_hoid(&p, end, m->end);
4341 if (ret) {
4342 free_hoid(m->begin);
4343 free_hoid(m->end);
4344 return ret;
4345 }
4346
4347 return 0;
4348
4349 e_inval:
4350 return -EINVAL;
4351 }
4352
4353 static struct ceph_msg *create_backoff_message(
4354 const struct ceph_osd_backoff *backoff,
4355 u32 map_epoch)
4356 {
4357 struct ceph_msg *msg;
4358 void *p, *end;
4359 int msg_size;
4360
4361 msg_size = CEPH_ENCODING_START_BLK_LEN +
4362 CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4363 msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4364 msg_size += CEPH_ENCODING_START_BLK_LEN +
4365 hoid_encoding_size(backoff->begin);
4366 msg_size += CEPH_ENCODING_START_BLK_LEN +
4367 hoid_encoding_size(backoff->end);
4368
4369 msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4370 if (!msg)
4371 return NULL;
4372
4373 p = msg->front.iov_base;
4374 end = p + msg->front_alloc_len;
4375
4376 encode_spgid(&p, &backoff->spgid);
4377 ceph_encode_32(&p, map_epoch);
4378 ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4379 ceph_encode_64(&p, backoff->id);
4380 encode_hoid(&p, end, backoff->begin);
4381 encode_hoid(&p, end, backoff->end);
4382 BUG_ON(p != end);
4383
4384 msg->front.iov_len = p - msg->front.iov_base;
4385 msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4386 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4387
4388 return msg;
4389 }
4390
4391 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4392 {
4393 struct ceph_spg_mapping *spg;
4394 struct ceph_osd_backoff *backoff;
4395 struct ceph_msg *msg;
4396
4397 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4398 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4399
4400 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4401 if (!spg) {
4402 spg = alloc_spg_mapping();
4403 if (!spg) {
4404 pr_err("%s failed to allocate spg\n", __func__);
4405 return;
4406 }
4407 spg->spgid = m->spgid; /* struct */
4408 insert_spg_mapping(&osd->o_backoff_mappings, spg);
4409 }
4410
4411 backoff = alloc_backoff();
4412 if (!backoff) {
4413 pr_err("%s failed to allocate backoff\n", __func__);
4414 return;
4415 }
4416 backoff->spgid = m->spgid; /* struct */
4417 backoff->id = m->id;
4418 backoff->begin = m->begin;
4419 m->begin = NULL; /* backoff now owns this */
4420 backoff->end = m->end;
4421 m->end = NULL; /* ditto */
4422
4423 insert_backoff(&spg->backoffs, backoff);
4424 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4425
4426 /*
4427 * Ack with original backoff's epoch so that the OSD can
4428 * discard this if there was a PG split.
4429 */
4430 msg = create_backoff_message(backoff, m->map_epoch);
4431 if (!msg) {
4432 pr_err("%s failed to allocate msg\n", __func__);
4433 return;
4434 }
4435 ceph_con_send(&osd->o_con, msg);
4436 }
4437
4438 static bool target_contained_by(const struct ceph_osd_request_target *t,
4439 const struct ceph_hobject_id *begin,
4440 const struct ceph_hobject_id *end)
4441 {
4442 struct ceph_hobject_id hoid;
4443 int cmp;
4444
4445 hoid_fill_from_target(&hoid, t);
4446 cmp = hoid_compare(&hoid, begin);
4447 return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4448 }
4449
4450 static void handle_backoff_unblock(struct ceph_osd *osd,
4451 const struct MOSDBackoff *m)
4452 {
4453 struct ceph_spg_mapping *spg;
4454 struct ceph_osd_backoff *backoff;
4455 struct rb_node *n;
4456
4457 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4458 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4459
4460 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4461 if (!backoff) {
4462 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4463 __func__, osd->o_osd, m->spgid.pgid.pool,
4464 m->spgid.pgid.seed, m->spgid.shard, m->id);
4465 return;
4466 }
4467
4468 if (hoid_compare(backoff->begin, m->begin) &&
4469 hoid_compare(backoff->end, m->end)) {
4470 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4471 __func__, osd->o_osd, m->spgid.pgid.pool,
4472 m->spgid.pgid.seed, m->spgid.shard, m->id);
4473 /* unblock it anyway... */
4474 }
4475
4476 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4477 BUG_ON(!spg);
4478
4479 erase_backoff(&spg->backoffs, backoff);
4480 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4481 free_backoff(backoff);
4482
4483 if (RB_EMPTY_ROOT(&spg->backoffs)) {
4484 erase_spg_mapping(&osd->o_backoff_mappings, spg);
4485 free_spg_mapping(spg);
4486 }
4487
4488 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4489 struct ceph_osd_request *req =
4490 rb_entry(n, struct ceph_osd_request, r_node);
4491
4492 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4493 /*
4494 * Match against @m, not @backoff -- the PG may
4495 * have split on the OSD.
4496 */
4497 if (target_contained_by(&req->r_t, m->begin, m->end)) {
4498 /*
4499 * If no other installed backoff applies,
4500 * resend.
4501 */
4502 send_request(req);
4503 }
4504 }
4505 }
4506 }
4507
4508 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4509 {
4510 struct ceph_osd_client *osdc = osd->o_osdc;
4511 struct MOSDBackoff m;
4512 int ret;
4513
4514 down_read(&osdc->lock);
4515 if (!osd_registered(osd)) {
4516 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4517 up_read(&osdc->lock);
4518 return;
4519 }
4520 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4521
4522 mutex_lock(&osd->lock);
4523 ret = decode_MOSDBackoff(msg, &m);
4524 if (ret) {
4525 pr_err("failed to decode MOSDBackoff: %d\n", ret);
4526 ceph_msg_dump(msg);
4527 goto out_unlock;
4528 }
4529
4530 switch (m.op) {
4531 case CEPH_OSD_BACKOFF_OP_BLOCK:
4532 handle_backoff_block(osd, &m);
4533 break;
4534 case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4535 handle_backoff_unblock(osd, &m);
4536 break;
4537 default:
4538 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4539 }
4540
4541 free_hoid(m.begin);
4542 free_hoid(m.end);
4543
4544 out_unlock:
4545 mutex_unlock(&osd->lock);
4546 up_read(&osdc->lock);
4547 }
4548
4549 /*
4550 * Process osd watch notifications
4551 */
4552 static void handle_watch_notify(struct ceph_osd_client *osdc,
4553 struct ceph_msg *msg)
4554 {
4555 void *p = msg->front.iov_base;
4556 void *const end = p + msg->front.iov_len;
4557 struct ceph_osd_linger_request *lreq;
4558 struct linger_work *lwork;
4559 u8 proto_ver, opcode;
4560 u64 cookie, notify_id;
4561 u64 notifier_id = 0;
4562 s32 return_code = 0;
4563 void *payload = NULL;
4564 u32 payload_len = 0;
4565
4566 ceph_decode_8_safe(&p, end, proto_ver, bad);
4567 ceph_decode_8_safe(&p, end, opcode, bad);
4568 ceph_decode_64_safe(&p, end, cookie, bad);
4569 p += 8; /* skip ver */
4570 ceph_decode_64_safe(&p, end, notify_id, bad);
4571
4572 if (proto_ver >= 1) {
4573 ceph_decode_32_safe(&p, end, payload_len, bad);
4574 ceph_decode_need(&p, end, payload_len, bad);
4575 payload = p;
4576 p += payload_len;
4577 }
4578
4579 if (le16_to_cpu(msg->hdr.version) >= 2)
4580 ceph_decode_32_safe(&p, end, return_code, bad);
4581
4582 if (le16_to_cpu(msg->hdr.version) >= 3)
4583 ceph_decode_64_safe(&p, end, notifier_id, bad);
4584
4585 down_read(&osdc->lock);
4586 lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4587 if (!lreq) {
4588 dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4589 cookie);
4590 goto out_unlock_osdc;
4591 }
4592
4593 mutex_lock(&lreq->lock);
4594 dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4595 opcode, cookie, lreq, lreq->is_watch);
4596 if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4597 if (!lreq->last_error) {
4598 lreq->last_error = -ENOTCONN;
4599 queue_watch_error(lreq);
4600 }
4601 } else if (!lreq->is_watch) {
4602 /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4603 if (lreq->notify_id && lreq->notify_id != notify_id) {
4604 dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4605 lreq->notify_id, notify_id);
4606 } else if (!completion_done(&lreq->notify_finish_wait)) {
4607 struct ceph_msg_data *data =
4608 msg->num_data_items ? &msg->data[0] : NULL;
4609
4610 if (data) {
4611 if (lreq->preply_pages) {
4612 WARN_ON(data->type !=
4613 CEPH_MSG_DATA_PAGES);
4614 *lreq->preply_pages = data->pages;
4615 *lreq->preply_len = data->length;
4616 data->own_pages = false;
4617 }
4618 }
4619 lreq->notify_finish_error = return_code;
4620 complete_all(&lreq->notify_finish_wait);
4621 }
4622 } else {
4623 /* CEPH_WATCH_EVENT_NOTIFY */
4624 lwork = lwork_alloc(lreq, do_watch_notify);
4625 if (!lwork) {
4626 pr_err("failed to allocate notify-lwork\n");
4627 goto out_unlock_lreq;
4628 }
4629
4630 lwork->notify.notify_id = notify_id;
4631 lwork->notify.notifier_id = notifier_id;
4632 lwork->notify.payload = payload;
4633 lwork->notify.payload_len = payload_len;
4634 lwork->notify.msg = ceph_msg_get(msg);
4635 lwork_queue(lwork);
4636 }
4637
4638 out_unlock_lreq:
4639 mutex_unlock(&lreq->lock);
4640 out_unlock_osdc:
4641 up_read(&osdc->lock);
4642 return;
4643
4644 bad:
4645 pr_err("osdc handle_watch_notify corrupt msg\n");
4646 }
4647
4648 /*
4649 * Register request, send initial attempt.
4650 */
4651 void ceph_osdc_start_request(struct ceph_osd_client *osdc,
4652 struct ceph_osd_request *req)
4653 {
4654 down_read(&osdc->lock);
4655 submit_request(req, false);
4656 up_read(&osdc->lock);
4657 }
4658 EXPORT_SYMBOL(ceph_osdc_start_request);
4659
4660 /*
4661 * Unregister request. If @req was registered, it isn't completed:
4662 * r_result isn't set and __complete_request() isn't invoked.
4663 *
4664 * If @req wasn't registered, this call may have raced with
4665 * handle_reply(), in which case r_result would already be set and
4666 * __complete_request() would be getting invoked, possibly even
4667 * concurrently with this call.
4668 */
4669 void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4670 {
4671 struct ceph_osd_client *osdc = req->r_osdc;
4672
4673 down_write(&osdc->lock);
4674 if (req->r_osd)
4675 cancel_request(req);
4676 up_write(&osdc->lock);
4677 }
4678 EXPORT_SYMBOL(ceph_osdc_cancel_request);
4679
4680 /*
4681 * @timeout: in jiffies, 0 means "wait forever"
4682 */
4683 static int wait_request_timeout(struct ceph_osd_request *req,
4684 unsigned long timeout)
4685 {
4686 long left;
4687
4688 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4689 left = wait_for_completion_killable_timeout(&req->r_completion,
4690 ceph_timeout_jiffies(timeout));
4691 if (left <= 0) {
4692 left = left ?: -ETIMEDOUT;
4693 ceph_osdc_cancel_request(req);
4694 } else {
4695 left = req->r_result; /* completed */
4696 }
4697
4698 return left;
4699 }
4700
4701 /*
4702 * wait for a request to complete
4703 */
4704 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
4705 struct ceph_osd_request *req)
4706 {
4707 return wait_request_timeout(req, 0);
4708 }
4709 EXPORT_SYMBOL(ceph_osdc_wait_request);
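
/*
 * Typical synchronous use of the request API, mirroring what
 * ceph_osdc_notify_ack() and ceph_osdc_call() below do.  This is only an
 * illustrative sketch; my_oid/my_oloc stand in for caller state:
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
 *	if (!req)
 *		return -ENOMEM;
 *	ceph_oid_copy(&req->r_base_oid, my_oid);
 *	ceph_oloc_copy(&req->r_base_oloc, my_oloc);
 *	req->r_flags = CEPH_OSD_FLAG_READ;
 *	osd_req_op_init(req, 0, CEPH_OSD_OP_STAT, 0);
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
 *	if (ret)
 *		goto out_put_req;
 *	ceph_osdc_start_request(osdc, req);
 *	ret = ceph_osdc_wait_request(osdc, req);
 *	...
 * out_put_req:
 *	ceph_osdc_put_request(req);
 */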
4710
4711 /*
4712 * sync - wait for all in-flight requests to flush. avoid starvation.
4713 */
4714 void ceph_osdc_sync(struct ceph_osd_client *osdc)
4715 {
4716 struct rb_node *n, *p;
4717 u64 last_tid = atomic64_read(&osdc->last_tid);
4718
4719 again:
4720 down_read(&osdc->lock);
4721 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4722 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4723
4724 mutex_lock(&osd->lock);
4725 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4726 struct ceph_osd_request *req =
4727 rb_entry(p, struct ceph_osd_request, r_node);
4728
4729 if (req->r_tid > last_tid)
4730 break;
4731
4732 if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4733 continue;
4734
4735 ceph_osdc_get_request(req);
4736 mutex_unlock(&osd->lock);
4737 up_read(&osdc->lock);
4738 dout("%s waiting on req %p tid %llu last_tid %llu\n",
4739 __func__, req, req->r_tid, last_tid);
4740 wait_for_completion(&req->r_completion);
4741 ceph_osdc_put_request(req);
4742 goto again;
4743 }
4744
4745 mutex_unlock(&osd->lock);
4746 }
4747
4748 up_read(&osdc->lock);
4749 dout("%s done last_tid %llu\n", __func__, last_tid);
4750 }
4751 EXPORT_SYMBOL(ceph_osdc_sync);
4752
4753 /*
4754 * Returns a handle, caller owns a ref.
4755 */
4756 struct ceph_osd_linger_request *
4757 ceph_osdc_watch(struct ceph_osd_client *osdc,
4758 struct ceph_object_id *oid,
4759 struct ceph_object_locator *oloc,
4760 rados_watchcb2_t wcb,
4761 rados_watcherrcb_t errcb,
4762 void *data)
4763 {
4764 struct ceph_osd_linger_request *lreq;
4765 int ret;
4766
4767 lreq = linger_alloc(osdc);
4768 if (!lreq)
4769 return ERR_PTR(-ENOMEM);
4770
4771 lreq->is_watch = true;
4772 lreq->wcb = wcb;
4773 lreq->errcb = errcb;
4774 lreq->data = data;
4775 lreq->watch_valid_thru = jiffies;
4776
4777 ceph_oid_copy(&lreq->t.base_oid, oid);
4778 ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4779 lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4780 ktime_get_real_ts64(&lreq->mtime);
4781
4782 linger_submit(lreq);
4783 ret = linger_reg_commit_wait(lreq);
4784 if (ret) {
4785 linger_cancel(lreq);
4786 goto err_put_lreq;
4787 }
4788
4789 return lreq;
4790
4791 err_put_lreq:
4792 linger_put(lreq);
4793 return ERR_PTR(ret);
4794 }
4795 EXPORT_SYMBOL(ceph_osdc_watch);
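
/*
 * Illustrative only -- a watcher is typically set up and torn down along
 * these lines (my_watch_cb, my_watch_errcb and my_private are hypothetical
 * caller-side names; see rbd for a real user):
 *
 *	handle = ceph_osdc_watch(osdc, &oid, &oloc, my_watch_cb,
 *				 my_watch_errcb, my_private);
 *	if (IS_ERR(handle))
 *		return PTR_ERR(handle);
 *	...
 *	ret = ceph_osdc_unwatch(osdc, handle);
 *
 * Notify events are delivered to my_watch_cb() from the notify workqueue;
 * watch errors (e.g. -ENOTCONN after a disconnect event) are reported via
 * my_watch_errcb().  ceph_osdc_unwatch() releases the ref returned here.
 */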
4796
4797 /*
4798 * Releases a ref.
4799 *
4800 * Times out after mount_timeout to preserve rbd unmap behaviour
4801 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4802 * with mount_timeout").
4803 */
4804 int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4805 struct ceph_osd_linger_request *lreq)
4806 {
4807 struct ceph_options *opts = osdc->client->options;
4808 struct ceph_osd_request *req;
4809 int ret;
4810
4811 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4812 if (!req)
4813 return -ENOMEM;
4814
4815 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4816 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4817 req->r_flags = CEPH_OSD_FLAG_WRITE;
4818 ktime_get_real_ts64(&req->r_mtime);
4819 osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_UNWATCH,
4820 lreq->linger_id, 0);
4821
4822 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4823 if (ret)
4824 goto out_put_req;
4825
4826 ceph_osdc_start_request(osdc, req);
4827 linger_cancel(lreq);
4828 linger_put(lreq);
4829 ret = wait_request_timeout(req, opts->mount_timeout);
4830
4831 out_put_req:
4832 ceph_osdc_put_request(req);
4833 return ret;
4834 }
4835 EXPORT_SYMBOL(ceph_osdc_unwatch);
4836
4837 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4838 u64 notify_id, u64 cookie, void *payload,
4839 u32 payload_len)
4840 {
4841 struct ceph_osd_req_op *op;
4842 struct ceph_pagelist *pl;
4843 int ret;
4844
4845 op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4846
4847 pl = ceph_pagelist_alloc(GFP_NOIO);
4848 if (!pl)
4849 return -ENOMEM;
4850
4851 ret = ceph_pagelist_encode_64(pl, notify_id);
4852 ret |= ceph_pagelist_encode_64(pl, cookie);
4853 if (payload) {
4854 ret |= ceph_pagelist_encode_32(pl, payload_len);
4855 ret |= ceph_pagelist_append(pl, payload, payload_len);
4856 } else {
4857 ret |= ceph_pagelist_encode_32(pl, 0);
4858 }
4859 if (ret) {
4860 ceph_pagelist_release(pl);
4861 return -ENOMEM;
4862 }
4863
4864 ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4865 op->indata_len = pl->length;
4866 return 0;
4867 }
4868
4869 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4870 struct ceph_object_id *oid,
4871 struct ceph_object_locator *oloc,
4872 u64 notify_id,
4873 u64 cookie,
4874 void *payload,
4875 u32 payload_len)
4876 {
4877 struct ceph_osd_request *req;
4878 int ret;
4879
4880 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4881 if (!req)
4882 return -ENOMEM;
4883
4884 ceph_oid_copy(&req->r_base_oid, oid);
4885 ceph_oloc_copy(&req->r_base_oloc, oloc);
4886 req->r_flags = CEPH_OSD_FLAG_READ;
4887
4888 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4889 payload_len);
4890 if (ret)
4891 goto out_put_req;
4892
4893 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4894 if (ret)
4895 goto out_put_req;
4896
4897 ceph_osdc_start_request(osdc, req);
4898 ret = ceph_osdc_wait_request(osdc, req);
4899
4900 out_put_req:
4901 ceph_osdc_put_request(req);
4902 return ret;
4903 }
4904 EXPORT_SYMBOL(ceph_osdc_notify_ack);
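
/*
 * A watch callback normally acknowledges each CEPH_WATCH_EVENT_NOTIFY it
 * receives so that the notifier's ceph_osdc_notify() can complete.  A
 * hedged sketch of what such a callback might do (the callback name and
 * the my_dev fields are hypothetical):
 *
 *	static void my_watch_cb(void *arg, u64 notify_id, u64 cookie,
 *				u64 notifier_id, void *data, size_t data_len)
 *	{
 *		struct my_dev *dev = arg;
 *
 *		... handle the payload ...
 *		ceph_osdc_notify_ack(dev->osdc, &dev->oid, &dev->oloc,
 *				     notify_id, cookie, NULL, 0);
 *	}
 */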
4905
4906 /*
4907 * @timeout: in seconds
4908 *
4909 * @preply_{pages,len} are initialized both on success and error.
4910 * The caller is responsible for:
4911 *
4912 * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4913 */
4914 int ceph_osdc_notify(struct ceph_osd_client *osdc,
4915 struct ceph_object_id *oid,
4916 struct ceph_object_locator *oloc,
4917 void *payload,
4918 u32 payload_len,
4919 u32 timeout,
4920 struct page ***preply_pages,
4921 size_t *preply_len)
4922 {
4923 struct ceph_osd_linger_request *lreq;
4924 int ret;
4925
4926 WARN_ON(!timeout);
4927 if (preply_pages) {
4928 *preply_pages = NULL;
4929 *preply_len = 0;
4930 }
4931
4932 lreq = linger_alloc(osdc);
4933 if (!lreq)
4934 return -ENOMEM;
4935
4936 lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
4937 if (!lreq->request_pl) {
4938 ret = -ENOMEM;
4939 goto out_put_lreq;
4940 }
4941
4942 ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
4943 ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
4944 ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
4945 ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
4946 if (ret) {
4947 ret = -ENOMEM;
4948 goto out_put_lreq;
4949 }
4950
4951 /* for notify_id */
4952 lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
4953 if (IS_ERR(lreq->notify_id_pages)) {
4954 ret = PTR_ERR(lreq->notify_id_pages);
4955 lreq->notify_id_pages = NULL;
4956 goto out_put_lreq;
4957 }
4958
4959 lreq->preply_pages = preply_pages;
4960 lreq->preply_len = preply_len;
4961
4962 ceph_oid_copy(&lreq->t.base_oid, oid);
4963 ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4964 lreq->t.flags = CEPH_OSD_FLAG_READ;
4965
4966 linger_submit(lreq);
4967 ret = linger_reg_commit_wait(lreq);
4968 if (!ret)
4969 ret = linger_notify_finish_wait(lreq,
4970 msecs_to_jiffies(2 * timeout * MSEC_PER_SEC));
4971 else
4972 dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4973
4974 linger_cancel(lreq);
4975 out_put_lreq:
4976 linger_put(lreq);
4977 return ret;
4978 }
4979 EXPORT_SYMBOL(ceph_osdc_notify);
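
/*
 * Sketch of a notify with a reply buffer (illustrative only; error
 * handling abbreviated).  Per the comment above, the caller releases the
 * reply pages whether or not the notify succeeded:
 *
 *	struct page **reply_pages;
 *	size_t reply_len;
 *
 *	ret = ceph_osdc_notify(osdc, &oid, &oloc, payload, payload_len,
 *			       10, &reply_pages, &reply_len);
 *	... inspect the reply on success ...
 *	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
 */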
4980
4981 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
4982 {
4983 u8 struct_v;
4984 u32 struct_len;
4985 int ret;
4986
4987 ret = ceph_start_decoding(p, end, 2, "watch_item_t",
4988 &struct_v, &struct_len);
4989 if (ret)
4990 goto bad;
4991
4992 ret = -EINVAL;
4993 ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
4994 ceph_decode_64_safe(p, end, item->cookie, bad);
4995 ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
4996
4997 if (struct_v >= 2) {
4998 ret = ceph_decode_entity_addr(p, end, &item->addr);
4999 if (ret)
5000 goto bad;
5001 } else {
5002 ret = 0;
5003 }
5004
5005 dout("%s %s%llu cookie %llu addr %s\n", __func__,
5006 ENTITY_NAME(item->name), item->cookie,
5007 ceph_pr_addr(&item->addr));
5008 bad:
5009 return ret;
5010 }
5011
5012 static int decode_watchers(void **p, void *end,
5013 struct ceph_watch_item **watchers,
5014 u32 *num_watchers)
5015 {
5016 u8 struct_v;
5017 u32 struct_len;
5018 int i;
5019 int ret;
5020
5021 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
5022 &struct_v, &struct_len);
5023 if (ret)
5024 return ret;
5025
5026 *num_watchers = ceph_decode_32(p);
5027 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
5028 if (!*watchers)
5029 return -ENOMEM;
5030
5031 for (i = 0; i < *num_watchers; i++) {
5032 ret = decode_watcher(p, end, *watchers + i);
5033 if (ret) {
5034 kfree(*watchers);
5035 return ret;
5036 }
5037 }
5038
5039 return 0;
5040 }
5041
5042 /*
5043 * On success, the caller is responsible for:
5044 *
5045 * kfree(watchers);
5046 */
5047 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
5048 struct ceph_object_id *oid,
5049 struct ceph_object_locator *oloc,
5050 struct ceph_watch_item **watchers,
5051 u32 *num_watchers)
5052 {
5053 struct ceph_osd_request *req;
5054 struct page **pages;
5055 int ret;
5056
5057 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5058 if (!req)
5059 return -ENOMEM;
5060
5061 ceph_oid_copy(&req->r_base_oid, oid);
5062 ceph_oloc_copy(&req->r_base_oloc, oloc);
5063 req->r_flags = CEPH_OSD_FLAG_READ;
5064
5065 pages = ceph_alloc_page_vector(1, GFP_NOIO);
5066 if (IS_ERR(pages)) {
5067 ret = PTR_ERR(pages);
5068 goto out_put_req;
5069 }
5070
5071 osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
5072 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
5073 response_data),
5074 pages, PAGE_SIZE, 0, false, true);
5075
5076 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5077 if (ret)
5078 goto out_put_req;
5079
5080 ceph_osdc_start_request(osdc, req);
5081 ret = ceph_osdc_wait_request(osdc, req);
5082 if (ret >= 0) {
5083 void *p = page_address(pages[0]);
5084 void *const end = p + req->r_ops[0].outdata_len;
5085
5086 ret = decode_watchers(&p, end, watchers, num_watchers);
5087 }
5088
5089 out_put_req:
5090 ceph_osdc_put_request(req);
5091 return ret;
5092 }
5093 EXPORT_SYMBOL(ceph_osdc_list_watchers);
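
/*
 * Illustrative use (error handling abbreviated); on success the array is
 * allocated here and, as noted above, must be kfree'd by the caller:
 *
 *	struct ceph_watch_item *watchers;
 *	u32 num_watchers;
 *
 *	ret = ceph_osdc_list_watchers(osdc, &oid, &oloc, &watchers,
 *				      &num_watchers);
 *	if (ret)
 *		return ret;
 *	for (i = 0; i < num_watchers; i++)
 *		... inspect watchers[i].name / .cookie / .addr ...
 *	kfree(watchers);
 */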
5094
5095 /*
5096 * Call all pending notify callbacks - for use after a watch is
5097 * unregistered, to make sure no more callbacks for it will be invoked
5098 */
5099 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
5100 {
5101 dout("%s osdc %p\n", __func__, osdc);
5102 flush_workqueue(osdc->notify_wq);
5103 }
5104 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
5105
5106 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
5107 {
5108 down_read(&osdc->lock);
5109 maybe_request_map(osdc);
5110 up_read(&osdc->lock);
5111 }
5112 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
5113
5114 /*
5115 * Execute an OSD class method on an object.
5116 *
5117 * @flags: CEPH_OSD_FLAG_*
5118 * @resp_len: in/out param for reply length
5119 */
5120 int ceph_osdc_call(struct ceph_osd_client *osdc,
5121 struct ceph_object_id *oid,
5122 struct ceph_object_locator *oloc,
5123 const char *class, const char *method,
5124 unsigned int flags,
5125 struct page *req_page, size_t req_len,
5126 struct page **resp_pages, size_t *resp_len)
5127 {
5128 struct ceph_osd_request *req;
5129 int ret;
5130
5131 if (req_len > PAGE_SIZE)
5132 return -E2BIG;
5133
5134 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5135 if (!req)
5136 return -ENOMEM;
5137
5138 ceph_oid_copy(&req->r_base_oid, oid);
5139 ceph_oloc_copy(&req->r_base_oloc, oloc);
5140 req->r_flags = flags;
5141
5142 ret = osd_req_op_cls_init(req, 0, class, method);
5143 if (ret)
5144 goto out_put_req;
5145
5146 if (req_page)
5147 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
5148 0, false, false);
5149 if (resp_pages)
5150 osd_req_op_cls_response_data_pages(req, 0, resp_pages,
5151 *resp_len, 0, false, false);
5152
5153 ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5154 if (ret)
5155 goto out_put_req;
5156
5157 ceph_osdc_start_request(osdc, req);
5158 ret = ceph_osdc_wait_request(osdc, req);
5159 if (ret >= 0) {
5160 ret = req->r_ops[0].rval;
5161 if (resp_pages)
5162 *resp_len = req->r_ops[0].outdata_len;
5163 }
5164
5165 out_put_req:
5166 ceph_osdc_put_request(req);
5167 return ret;
5168 }
5169 EXPORT_SYMBOL(ceph_osdc_call);
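
/*
 * Example of invoking an OSD class method.  A sketch only -- the class and
 * method names are hypothetical, and the request payload must fit in a
 * single page (-E2BIG otherwise):
 *
 *	struct page *reply_page = alloc_page(GFP_NOIO);
 *	size_t reply_len = PAGE_SIZE;
 *
 *	ret = ceph_osdc_call(osdc, &oid, &oloc, "myclass", "mymethod",
 *			     CEPH_OSD_FLAG_READ, NULL, 0,
 *			     &reply_page, &reply_len);
 *	if (ret >= 0)
 *		... reply_len bytes of output in reply_page ...
 *	__free_page(reply_page);
 */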
5170
5171 /*
5172 * reset all osd connections
5173 */
5174 void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
5175 {
5176 struct rb_node *n;
5177
5178 down_write(&osdc->lock);
5179 for (n = rb_first(&osdc->osds); n; ) {
5180 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5181
5182 n = rb_next(n);
5183 if (!reopen_osd(osd))
5184 kick_osd_requests(osd);
5185 }
5186 up_write(&osdc->lock);
5187 }
5188
5189 /*
5190 * init, shutdown
5191 */
5192 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5193 {
5194 int err;
5195
5196 dout("init\n");
5197 osdc->client = client;
5198 init_rwsem(&osdc->lock);
5199 osdc->osds = RB_ROOT;
5200 INIT_LIST_HEAD(&osdc->osd_lru);
5201 spin_lock_init(&osdc->osd_lru_lock);
5202 osd_init(&osdc->homeless_osd);
5203 osdc->homeless_osd.o_osdc = osdc;
5204 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
5205 osdc->last_linger_id = CEPH_LINGER_ID_START;
5206 osdc->linger_requests = RB_ROOT;
5207 osdc->map_checks = RB_ROOT;
5208 osdc->linger_map_checks = RB_ROOT;
5209 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
5210 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
5211
5212 err = -ENOMEM;
5213 osdc->osdmap = ceph_osdmap_alloc();
5214 if (!osdc->osdmap)
5215 goto out;
5216
5217 osdc->req_mempool = mempool_create_slab_pool(10,
5218 ceph_osd_request_cache);
5219 if (!osdc->req_mempool)
5220 goto out_map;
5221
5222 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5223 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5224 if (err < 0)
5225 goto out_mempool;
5226 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5227 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5228 "osd_op_reply");
5229 if (err < 0)
5230 goto out_msgpool;
5231
5232 err = -ENOMEM;
5233 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
5234 if (!osdc->notify_wq)
5235 goto out_msgpool_reply;
5236
5237 osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
5238 if (!osdc->completion_wq)
5239 goto out_notify_wq;
5240
5241 schedule_delayed_work(&osdc->timeout_work,
5242 osdc->client->options->osd_keepalive_timeout);
5243 schedule_delayed_work(&osdc->osds_timeout_work,
5244 round_jiffies_relative(osdc->client->options->osd_idle_ttl));
5245
5246 return 0;
5247
5248 out_notify_wq:
5249 destroy_workqueue(osdc->notify_wq);
5250 out_msgpool_reply:
5251 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5252 out_msgpool:
5253 ceph_msgpool_destroy(&osdc->msgpool_op);
5254 out_mempool:
5255 mempool_destroy(osdc->req_mempool);
5256 out_map:
5257 ceph_osdmap_destroy(osdc->osdmap);
5258 out:
5259 return err;
5260 }
5261
5262 void ceph_osdc_stop(struct ceph_osd_client *osdc)
5263 {
5264 destroy_workqueue(osdc->completion_wq);
5265 destroy_workqueue(osdc->notify_wq);
5266 cancel_delayed_work_sync(&osdc->timeout_work);
5267 cancel_delayed_work_sync(&osdc->osds_timeout_work);
5268
5269 down_write(&osdc->lock);
5270 while (!RB_EMPTY_ROOT(&osdc->osds)) {
5271 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5272 struct ceph_osd, o_node);
5273 close_osd(osd);
5274 }
5275 up_write(&osdc->lock);
5276 WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5277 osd_cleanup(&osdc->homeless_osd);
5278
5279 WARN_ON(!list_empty(&osdc->osd_lru));
5280 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5281 WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5282 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5283 WARN_ON(atomic_read(&osdc->num_requests));
5284 WARN_ON(atomic_read(&osdc->num_homeless));
5285
5286 ceph_osdmap_destroy(osdc->osdmap);
5287 mempool_destroy(osdc->req_mempool);
5288 ceph_msgpool_destroy(&osdc->msgpool_op);
5289 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5290 }
5291
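/*
 * Set up a COPY_FROM2 op.  The source oid, oloc and truncate
 * parameters are encoded into a single page that is attached to the
 * op as outgoing request data.
 */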
int osd_req_op_copy_from_init(struct ceph_osd_request *req,
			      u64 src_snapid, u64 src_version,
			      struct ceph_object_id *src_oid,
			      struct ceph_object_locator *src_oloc,
			      u32 src_fadvise_flags,
			      u32 dst_fadvise_flags,
			      u32 truncate_seq, u64 truncate_size,
			      u8 copy_from_flags)
{
	struct ceph_osd_req_op *op;
	struct page **pages;
	void *p, *end;

	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
			     dst_fadvise_flags);
	op->copy_from.snapid = src_snapid;
	op->copy_from.src_version = src_version;
	op->copy_from.flags = copy_from_flags;
	op->copy_from.src_fadvise_flags = src_fadvise_flags;

	p = page_address(pages[0]);
	end = p + PAGE_SIZE;
	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
	encode_oloc(&p, end, src_oloc);
	ceph_encode_32(&p, truncate_seq);
	ceph_encode_64(&p, truncate_size);
	op->indata_len = PAGE_SIZE - (end - p);

	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
				 op->indata_len, 0, false, true);
	return 0;
}
EXPORT_SYMBOL(osd_req_op_copy_from_init);

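/*
 * Create the slab cache used for OSD requests.  Each object is sized
 * to hold a request with up to CEPH_OSD_SLAB_OPS inline ops.
 */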
int __init ceph_osdc_setup(void)
{
	size_t size = sizeof(struct ceph_osd_request) +
	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);

	BUG_ON(ceph_osd_request_cache);
	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
						   0, 0, NULL);

	return ceph_osd_request_cache ? 0 : -ENOMEM;
}

void ceph_osdc_cleanup(void)
{
	BUG_ON(!ceph_osd_request_cache);
	kmem_cache_destroy(ceph_osd_request_cache);
	ceph_osd_request_cache = NULL;
}

/*
 * handle incoming message
 */
static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osd, msg);
		break;
	case CEPH_MSG_OSD_BACKOFF:
		handle_backoff(osd, msg);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}

	ceph_msg_put(msg);
}

/* How much sparse data was requested? */
static u64 sparse_data_requested(struct ceph_osd_request *req)
{
	u64 len = 0;

	if (req->r_flags & CEPH_OSD_FLAG_READ) {
		int i;

		for (i = 0; i < req->r_num_ops; ++i) {
			struct ceph_osd_req_op *op = &req->r_ops[i];

			if (op->op == CEPH_OSD_OP_SPARSE_READ)
				len += op->extent.length;
		}
	}
	return len;
}

/*
 * Lookup and return message for incoming reply.  Don't try to do
 * anything about a larger than preallocated data portion of the
 * message at the moment - for now, just skip the message.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m = NULL;
	struct ceph_osd_request *req;
	int front_len = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid = le64_to_cpu(hdr->tid);
	u64 srlen;

	down_read(&osdc->lock);
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
		*skip = 1;
		goto out_unlock_osdc;
	}
	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));

	mutex_lock(&osd->lock);
	req = lookup_request(&osd->o_requests, tid);
	if (!req) {
		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
		     osd->o_osd, tid);
		*skip = 1;
		goto out_unlock_session;
	}

	ceph_msg_revoke_incoming(req->r_reply);

	if (front_len > req->r_reply->front_alloc_len) {
		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
			__func__, osd->o_osd, req->r_tid, front_len,
			req->r_reply->front_alloc_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
				 false);
		if (!m)
			goto out_unlock_session;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}

	srlen = sparse_data_requested(req);
	if (!srlen && data_len > req->r_reply->data_length) {
		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
			__func__, osd->o_osd, req->r_tid, data_len,
			req->r_reply->data_length);
		m = NULL;
		*skip = 1;
		goto out_unlock_session;
	}

	m = ceph_msg_get(req->r_reply);
	m->sparse_read_total = srlen;

	dout("get_reply tid %lld %p\n", tid, m);

out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
	return m;
}

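/*
 * Allocate a message for an incoming frame, backing the data portion
 * advertised in the header with a freshly allocated page vector.
 */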
static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
{
	struct ceph_msg *m;
	int type = le16_to_cpu(hdr->type);
	u32 front_len = le32_to_cpu(hdr->front_len);
	u32 data_len = le32_to_cpu(hdr->data_len);

	m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
	if (!m)
		return NULL;

	if (data_len) {
		struct page **pages;

		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
					       GFP_NOIO);
		if (IS_ERR(pages)) {
			ceph_msg_put(m);
			return NULL;
		}

		ceph_msg_data_add_pages(m, pages, data_len, 0, true);
	}

	return m;
}

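/*
 * Allocate (or look up) the message to receive an incoming frame
 * into, based on its type.  Unknown types are skipped.
 */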
static struct ceph_msg *osd_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_OSD_BACKOFF:
	case CEPH_MSG_WATCH_NOTIFY:
		return alloc_msg_with_page_vector(hdr);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
			osd->o_osd, type);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *osd_get_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void osd_put_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *
osd_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;
	int ret;

	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
					 force_new, proto, NULL, NULL);
	if (ret)
		return ERR_PTR(ret);

	return auth;
}

static int osd_add_authorizer_challenge(struct ceph_connection *con,
					void *challenge_buf,
					int challenge_buf_len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer,
						  challenge_buf,
						  challenge_buf_len);
}

static int osd_verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
		NULL, NULL, NULL, NULL);
}

static int osd_invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
	return ceph_monc_validate_auth(&osdc->client->monc);
}

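/*
 * Authentication hooks used by the v2 messenger; all of them operate
 * on the ceph_auth_handshake embedded in the OSD session.
 */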
static int osd_get_auth_request(struct ceph_connection *con,
				void *buf, int *buf_len,
				void **authorizer, int *authorizer_len)
{
	struct ceph_osd *o = con->private;
	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;
	int ret;

	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
				       buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int osd_handle_auth_reply_more(struct ceph_connection *con,
				      void *reply, int reply_len,
				      void *buf, int *buf_len,
				      void **authorizer, int *authorizer_len)
{
	struct ceph_osd *o = con->private;
	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;
	int ret;

	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
					      buf, buf_len);
	if (ret)
		return ret;

	*authorizer = auth->authorizer_buf;
	*authorizer_len = auth->authorizer_buf_len;
	return 0;
}

static int osd_handle_auth_done(struct ceph_connection *con,
				u64 global_id, void *reply, int reply_len,
				u8 *session_key, int *session_key_len,
				u8 *con_secret, int *con_secret_len)
{
	struct ceph_osd *o = con->private;
	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
					       session_key, session_key_len,
					       con_secret, con_secret_len);
}

static int osd_handle_auth_bad_method(struct ceph_connection *con,
				      int used_proto, int result,
				      const int *allowed_protos, int proto_cnt,
				      const int *allowed_modes, int mode_cnt)
{
	struct ceph_osd *o = con->private;
	struct ceph_mon_client *monc = &o->o_osdc->client->monc;
	int ret;

	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_OSD,
					    used_proto, result,
					    allowed_protos, proto_cnt,
					    allowed_modes, mode_cnt)) {
		ret = ceph_monc_validate_auth(monc);
		if (ret)
			return ret;
	}

	return -EACCES;
}

static void osd_reencode_message(struct ceph_msg *msg)
{
	int type = le16_to_cpu(msg->hdr.type);

	if (type == CEPH_MSG_OSD_OP)
		encode_request_finish(msg);
}

static int osd_sign_message(struct ceph_msg *msg)
{
	struct ceph_osd *o = msg->con->private;
	struct ceph_auth_handshake *auth = &o->o_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int osd_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_osd *o = msg->con->private;
	struct ceph_auth_handshake *auth = &o->o_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

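/*
 * Advance the data cursor by @len bytes, optionally zero-filling the
 * page ranges that are skipped over.
 */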
static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len,
			   bool zero)
{
	while (len) {
		struct page *page;
		size_t poff, plen;

		page = ceph_msg_data_next(cursor, &poff, &plen);
		if (plen > len)
			plen = len;
		if (zero)
			zero_user_segment(page, poff, poff + plen);
		len -= plen;
		ceph_msg_data_advance(cursor, plen);
	}
}

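/*
 * Finish off the current sparse read op, handing its extent map back
 * to the request, and look for the next SPARSE_READ op in the same
 * request.  Returns 1 if another sparse read op was found, 0 if not,
 * or a negative error if the request can't be located.
 */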
static int prep_next_sparse_read(struct ceph_connection *con,
				 struct ceph_msg_data_cursor *cursor)
{
	struct ceph_osd *o = con->private;
	struct ceph_sparse_read *sr = &o->o_sparse_read;
	struct ceph_osd_request *req;
	struct ceph_osd_req_op *op;

	spin_lock(&o->o_requests_lock);
	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
	if (!req) {
		spin_unlock(&o->o_requests_lock);
		return -EBADR;
	}

	if (o->o_sparse_op_idx < 0) {
		dout("%s: [%d] starting new sparse read req\n",
		     __func__, o->o_osd);
	} else {
		u64 end;

		op = &req->r_ops[o->o_sparse_op_idx];

		WARN_ON_ONCE(op->extent.sparse_ext);

		/* hand back buffer we took earlier */
		op->extent.sparse_ext = sr->sr_extent;
		sr->sr_extent = NULL;
		op->extent.sparse_ext_cnt = sr->sr_count;
		sr->sr_ext_len = 0;
		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
		     __func__, o->o_osd, op->extent.sparse_ext_cnt, cursor->resid);
		/* Advance to end of data for this operation */
		end = ceph_sparse_ext_map_end(op);
		if (end < sr->sr_req_len)
			advance_cursor(cursor, sr->sr_req_len - end, false);
	}

	ceph_init_sparse_read(sr);

	/* find next op in this request (if any) */
	while (++o->o_sparse_op_idx < req->r_num_ops) {
		op = &req->r_ops[o->o_sparse_op_idx];
		if (op->op == CEPH_OSD_OP_SPARSE_READ)
			goto found;
	}

	/* reset for next sparse read request */
	spin_unlock(&o->o_requests_lock);
	o->o_sparse_op_idx = -1;
	return 0;
found:
	sr->sr_req_off = op->extent.offset;
	sr->sr_req_len = op->extent.length;
	sr->sr_pos = sr->sr_req_off;
	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
	     o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);

	/* hand off request's sparse extent map buffer */
	sr->sr_ext_len = op->extent.sparse_ext_cnt;
	op->extent.sparse_ext_cnt = 0;
	sr->sr_extent = op->extent.sparse_ext;
	op->extent.sparse_ext = NULL;

	spin_unlock(&o->o_requests_lock);
	return 1;
}

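/*
 * Extent maps arrive from the OSD in little-endian format; on
 * big-endian hosts convert them in place.
 */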
#ifdef __BIG_ENDIAN
static inline void convert_extent_map(struct ceph_sparse_read *sr)
{
	int i;

	for (i = 0; i < sr->sr_count; i++) {
		struct ceph_sparse_extent *ext = &sr->sr_extent[i];

		ext->off = le64_to_cpu((__force __le64)ext->off);
		ext->len = le64_to_cpu((__force __le64)ext->len);
	}
}
#else
static inline void convert_extent_map(struct ceph_sparse_read *sr)
{
}
#endif

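/*
 * State machine driving a sparse read reply: read the extent count,
 * then the extent array, verify the advertised data length against
 * the extents, and finally hand each extent's byte range back to the
 * messenger, zero-filling the holes in between.
 */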
static int osd_sparse_read(struct ceph_connection *con,
			   struct ceph_msg_data_cursor *cursor,
			   char **pbuf)
{
	struct ceph_osd *o = con->private;
	struct ceph_sparse_read *sr = &o->o_sparse_read;
	u32 count = sr->sr_count;
	u64 eoff, elen, len = 0;
	int i, ret;

	switch (sr->sr_state) {
	case CEPH_SPARSE_READ_HDR:
next_op:
		ret = prep_next_sparse_read(con, cursor);
		if (ret <= 0)
			return ret;

		/* number of extents */
		ret = sizeof(sr->sr_count);
		*pbuf = (char *)&sr->sr_count;
		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
		break;
	case CEPH_SPARSE_READ_EXTENTS:
		/* Convert sr_count to host-endian */
		count = le32_to_cpu((__force __le32)sr->sr_count);
		sr->sr_count = count;
		dout("[%d] got %u extents\n", o->o_osd, count);

		if (count > 0) {
			if (!sr->sr_extent || count > sr->sr_ext_len) {
				/* no extent array provided, or too short */
				kfree(sr->sr_extent);
				sr->sr_extent = kmalloc_array(count,
							      sizeof(*sr->sr_extent),
							      GFP_NOIO);
				if (!sr->sr_extent) {
					pr_err("%s: failed to allocate %u extents\n",
					       __func__, count);
					return -ENOMEM;
				}
				sr->sr_ext_len = count;
			}
			ret = count * sizeof(*sr->sr_extent);
			*pbuf = (char *)sr->sr_extent;
			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
			break;
		}
		/* No extents? Read data len */
		fallthrough;
	case CEPH_SPARSE_READ_DATA_LEN:
		convert_extent_map(sr);
		ret = sizeof(sr->sr_datalen);
		*pbuf = (char *)&sr->sr_datalen;
		sr->sr_state = CEPH_SPARSE_READ_DATA_PRE;
		break;
	case CEPH_SPARSE_READ_DATA_PRE:
		/* Convert sr_datalen to host-endian */
		sr->sr_datalen = le32_to_cpu((__force __le32)sr->sr_datalen);
		for (i = 0; i < count; i++)
			len += sr->sr_extent[i].len;
		if (sr->sr_datalen != len) {
			pr_warn_ratelimited("data len %u != extent len %llu\n",
					    sr->sr_datalen, len);
			return -EREMOTEIO;
		}
		sr->sr_state = CEPH_SPARSE_READ_DATA;
		fallthrough;
	case CEPH_SPARSE_READ_DATA:
		if (sr->sr_index >= count) {
			sr->sr_state = CEPH_SPARSE_READ_HDR;
			goto next_op;
		}

		eoff = sr->sr_extent[sr->sr_index].off;
		elen = sr->sr_extent[sr->sr_index].len;

		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
		     o->o_osd, sr->sr_index, eoff, elen);

		if (elen > INT_MAX) {
			dout("Sparse read extent length too long (0x%llx)\n",
			     elen);
			return -EREMOTEIO;
		}

		/* zero out anything from sr_pos to start of extent */
		if (sr->sr_pos < eoff)
			advance_cursor(cursor, eoff - sr->sr_pos, true);

		/* Set position to end of extent */
		sr->sr_pos = eoff + elen;

		/* send back the new length and nullify the ptr */
		cursor->sr_resid = elen;
		ret = elen;
		*pbuf = NULL;

		/* Bump the array index */
		++sr->sr_index;
		break;
	}
	return ret;
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = osd_get_con,
	.put = osd_put_con,
	.sparse_read = osd_sparse_read,
	.alloc_msg = osd_alloc_msg,
	.dispatch = osd_dispatch,
	.fault = osd_fault,
	.reencode_message = osd_reencode_message,
	.get_authorizer = osd_get_authorizer,
	.add_authorizer_challenge = osd_add_authorizer_challenge,
	.verify_authorizer_reply = osd_verify_authorizer_reply,
	.invalidate_authorizer = osd_invalidate_authorizer,
	.sign_message = osd_sign_message,
	.check_message_signature = osd_check_message_signature,
	.get_auth_request = osd_get_auth_request,
	.handle_auth_reply_more = osd_handle_auth_reply_more,
	.handle_auth_done = osd_handle_auth_done,
	.handle_auth_bad_method = osd_handle_auth_bad_method,
};
