/* xref: /linux/fs/nfs/flexfilelayout/flexfilelayout.c (revision 005438a8eef063495ac059d128eea71b58de50e5) */
/*
 * Module for pnfs flexfile layout driver.
 *
 * Copyright (c) 2014, Primary Data, Inc. All rights reserved.
 *
 * Tao Peng <bergwolf@primarydata.com>
 */

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/module.h>

#include <linux/sunrpc/metrics.h>

#include "flexfilelayout.h"
#include "../nfs4session.h"
#include "../nfs4idmap.h"
#include "../internal.h"
#include "../delegation.h"
#include "../nfs4trace.h"
#include "../iostat.h"
#include "../nfs.h"
#include "../nfs42.h"

#define NFSDBG_FACILITY         NFSDBG_PNFS_LD

#define FF_LAYOUT_POLL_RETRY_MAX     (15*HZ)

static struct pnfs_layout_hdr *
ff_layout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
{
	struct nfs4_flexfile_layout *ffl;

	ffl = kzalloc(sizeof(*ffl), gfp_flags);
	if (ffl) {
		INIT_LIST_HEAD(&ffl->error_list);
		return &ffl->generic_hdr;
	} else
		return NULL;
}

static void
ff_layout_free_layout_hdr(struct pnfs_layout_hdr *lo)
{
	struct nfs4_ff_layout_ds_err *err, *n;

	list_for_each_entry_safe(err, n, &FF_LAYOUT_FROM_HDR(lo)->error_list,
				 list) {
		list_del(&err->list);
		kfree(err);
	}
	kfree(FF_LAYOUT_FROM_HDR(lo));
}

static int decode_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, NFS4_STATEID_SIZE);
	if (unlikely(p == NULL))
		return -ENOBUFS;
	memcpy(stateid, p, NFS4_STATEID_SIZE);
	dprintk("%s: stateid id= [%x%x%x%x]\n", __func__,
		p[0], p[1], p[2], p[3]);
	return 0;
}

static int decode_deviceid(struct xdr_stream *xdr, struct nfs4_deviceid *devid)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, NFS4_DEVICEID4_SIZE);
	if (unlikely(!p))
		return -ENOBUFS;
	memcpy(devid, p, NFS4_DEVICEID4_SIZE);
	nfs4_print_deviceid(devid);
	return 0;
}

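/*
 * Decode a variable-length NFS filehandle from the XDR stream, rejecting
 * any filehandle whose claimed size would overflow struct nfs_fh.
 */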
static int decode_nfs_fh(struct xdr_stream *xdr, struct nfs_fh *fh)
{
	__be32 *p;

	p = xdr_inline_decode(xdr, 4);
	if (unlikely(!p))
		return -ENOBUFS;
	fh->size = be32_to_cpup(p++);
	if (fh->size > sizeof(struct nfs_fh)) {
		printk(KERN_ERR "NFS flexfiles: Too big fh received %d\n",
		       fh->size);
		return -EOVERFLOW;
	}
	/* fh.data */
	p = xdr_inline_decode(xdr, fh->size);
	if (unlikely(!p))
		return -ENOBUFS;
	memcpy(&fh->data, p, fh->size);
	dprintk("%s: fh len %d\n", __func__, fh->size);

	return 0;
}

/*
 * Currently only stringified uids and gids are accepted.
 * I.e., kerberos is not supported to the DSes, so no principals.
 *
 * That means that one common function will suffice, but when
 * principals are added, this should be split to accommodate
 * calls to both nfs_map_name_to_uid() and nfs_map_group_to_gid().
 */
static int
decode_name(struct xdr_stream *xdr, u32 *id)
{
	__be32 *p;
	int len;

	/* opaque_length(4) */
	p = xdr_inline_decode(xdr, 4);
	if (unlikely(!p))
		return -ENOBUFS;
	len = be32_to_cpup(p++);
	if (len < 0)
		return -EINVAL;

	dprintk("%s: len %u\n", __func__, len);

	/* opaque body */
	p = xdr_inline_decode(xdr, len);
	if (unlikely(!p))
		return -ENOBUFS;

	if (!nfs_map_string_to_numeric((char *)p, len, id))
		return -EINVAL;

	return 0;
}

static void ff_layout_free_mirror_array(struct nfs4_ff_layout_segment *fls)
{
	int i;

	if (fls->mirror_array) {
		for (i = 0; i < fls->mirror_array_cnt; i++) {
			/* normally mirror_ds is freed in
			 * .free_deviceid_node but we still do it here
			 * for .alloc_lseg error path */
			if (fls->mirror_array[i]) {
				kfree(fls->mirror_array[i]->fh_versions);
				nfs4_ff_layout_put_deviceid(fls->mirror_array[i]->mirror_ds);
				kfree(fls->mirror_array[i]);
			}
		}
		kfree(fls->mirror_array);
		fls->mirror_array = NULL;
	}
}

static int ff_layout_check_layout(struct nfs4_layoutget_res *lgr)
{
	int ret = 0;

	dprintk("--> %s\n", __func__);

	/* FIXME: remove this check when layout segment support is added */
	if (lgr->range.offset != 0 ||
	    lgr->range.length != NFS4_MAX_UINT64) {
		dprintk("%s Only whole file layouts supported. Use MDS i/o\n",
			__func__);
		ret = -EINVAL;
	}

	dprintk("--> %s returns %d\n", __func__, ret);
	return ret;
}

static void _ff_layout_free_lseg(struct nfs4_ff_layout_segment *fls)
{
	if (fls) {
		ff_layout_free_mirror_array(fls);
		kfree(fls);
	}
}

static void ff_layout_sort_mirrors(struct nfs4_ff_layout_segment *fls)
{
	int i, j;

	for (i = 0; i < fls->mirror_array_cnt - 1; i++) {
		for (j = i + 1; j < fls->mirror_array_cnt; j++)
			if (fls->mirror_array[i]->efficiency <
			    fls->mirror_array[j]->efficiency)
				swap(fls->mirror_array[i],
				     fls->mirror_array[j]);
	}
}

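/*
 * Decode an NFSv4.1 flexfile layout from the LAYOUTGET reply: the stripe
 * unit and mirror count, then per mirror the device ID, efficiency,
 * stateid, filehandle array and synthetic uid/gid. Mirrors are sorted by
 * efficiency before the segment is returned.
 */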
static struct pnfs_layout_segment *
ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
		     struct nfs4_layoutget_res *lgr,
		     gfp_t gfp_flags)
{
	struct pnfs_layout_segment *ret;
	struct nfs4_ff_layout_segment *fls = NULL;
	struct xdr_stream stream;
	struct xdr_buf buf;
	struct page *scratch;
	u64 stripe_unit;
	u32 mirror_array_cnt;
	__be32 *p;
	int i, rc;

	dprintk("--> %s\n", __func__);
	scratch = alloc_page(gfp_flags);
	if (!scratch)
		return ERR_PTR(-ENOMEM);

	xdr_init_decode_pages(&stream, &buf, lgr->layoutp->pages,
			      lgr->layoutp->len);
	xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);

	/* stripe unit and mirror_array_cnt */
	rc = -EIO;
	p = xdr_inline_decode(&stream, 8 + 4);
	if (!p)
		goto out_err_free;

	p = xdr_decode_hyper(p, &stripe_unit);
	mirror_array_cnt = be32_to_cpup(p++);
	dprintk("%s: stripe_unit=%llu mirror_array_cnt=%u\n", __func__,
		stripe_unit, mirror_array_cnt);

	if (mirror_array_cnt > NFS4_FLEXFILE_LAYOUT_MAX_MIRROR_CNT ||
	    mirror_array_cnt == 0)
		goto out_err_free;

	rc = -ENOMEM;
	fls = kzalloc(sizeof(*fls), gfp_flags);
	if (!fls)
		goto out_err_free;

	fls->mirror_array_cnt = mirror_array_cnt;
	fls->stripe_unit = stripe_unit;
	fls->mirror_array = kcalloc(fls->mirror_array_cnt,
				    sizeof(fls->mirror_array[0]), gfp_flags);
	if (fls->mirror_array == NULL)
		goto out_err_free;

	for (i = 0; i < fls->mirror_array_cnt; i++) {
		struct nfs4_deviceid devid;
		struct nfs4_deviceid_node *idnode;
		u32 ds_count;
		u32 fh_count;
		int j;

		rc = -EIO;
		p = xdr_inline_decode(&stream, 4);
		if (!p)
			goto out_err_free;
		ds_count = be32_to_cpup(p);

		/* FIXME: allow for striping? */
		if (ds_count != 1)
			goto out_err_free;

		fls->mirror_array[i] =
			kzalloc(sizeof(struct nfs4_ff_layout_mirror),
				gfp_flags);
		if (fls->mirror_array[i] == NULL) {
			rc = -ENOMEM;
			goto out_err_free;
		}

		spin_lock_init(&fls->mirror_array[i]->lock);
		fls->mirror_array[i]->ds_count = ds_count;
		fls->mirror_array[i]->lseg = &fls->generic_hdr;

		/* deviceid */
		rc = decode_deviceid(&stream, &devid);
		if (rc)
			goto out_err_free;

		idnode = nfs4_find_get_deviceid(NFS_SERVER(lh->plh_inode),
						&devid, lh->plh_lc_cred,
						gfp_flags);
		/*
		 * Upon success, mirror_ds was either allocated by a previous
		 * GETDEVICEINFO, or newly by .alloc_deviceid_node. A
		 * nfs4_find_get_deviceid() failure is really a GETDEVICEINFO
		 * failure.
		 */
		if (idnode)
			fls->mirror_array[i]->mirror_ds =
				FF_LAYOUT_MIRROR_DS(idnode);
		else
			goto out_err_free;

		/* efficiency */
		rc = -EIO;
		p = xdr_inline_decode(&stream, 4);
		if (!p)
			goto out_err_free;
		fls->mirror_array[i]->efficiency = be32_to_cpup(p);

		/* stateid */
		rc = decode_stateid(&stream, &fls->mirror_array[i]->stateid);
		if (rc)
			goto out_err_free;

		/* fh */
		p = xdr_inline_decode(&stream, 4);
		if (!p)
			goto out_err_free;
		fh_count = be32_to_cpup(p);

		fls->mirror_array[i]->fh_versions =
			kcalloc(fh_count, sizeof(struct nfs_fh),
				gfp_flags);
		if (fls->mirror_array[i]->fh_versions == NULL) {
			rc = -ENOMEM;
			goto out_err_free;
		}

		for (j = 0; j < fh_count; j++) {
			rc = decode_nfs_fh(&stream,
					   &fls->mirror_array[i]->fh_versions[j]);
			if (rc)
				goto out_err_free;
		}

		fls->mirror_array[i]->fh_versions_cnt = fh_count;

		/* user */
		rc = decode_name(&stream, &fls->mirror_array[i]->uid);
		if (rc)
			goto out_err_free;

		/* group */
		rc = decode_name(&stream, &fls->mirror_array[i]->gid);
		if (rc)
			goto out_err_free;

		dprintk("%s: uid %d gid %d\n", __func__,
			fls->mirror_array[i]->uid,
			fls->mirror_array[i]->gid);
	}

	p = xdr_inline_decode(&stream, 4);
	if (p)
		fls->flags = be32_to_cpup(p);

	ff_layout_sort_mirrors(fls);
	rc = ff_layout_check_layout(lgr);
	if (rc)
		goto out_err_free;

	ret = &fls->generic_hdr;
	dprintk("<-- %s (success)\n", __func__);
out_free_page:
	__free_page(scratch);
	return ret;
out_err_free:
	_ff_layout_free_lseg(fls);
	ret = ERR_PTR(rc);
	dprintk("<-- %s (%d)\n", __func__, rc);
	goto out_free_page;
}

static bool ff_layout_has_rw_segments(struct pnfs_layout_hdr *layout)
{
	struct pnfs_layout_segment *lseg;

	list_for_each_entry(lseg, &layout->plh_segs, pls_list)
		if (lseg->pls_range.iomode == IOMODE_RW)
			return true;

	return false;
}

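/*
 * Release the per-mirror deviceid and credential references, tear down
 * the commit buckets when the last RW segment is freed, then free the
 * segment itself.
 */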
static void
ff_layout_free_lseg(struct pnfs_layout_segment *lseg)
{
	struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
	int i;

	dprintk("--> %s\n", __func__);

	for (i = 0; i < fls->mirror_array_cnt; i++) {
		if (fls->mirror_array[i]) {
			nfs4_ff_layout_put_deviceid(fls->mirror_array[i]->mirror_ds);
			fls->mirror_array[i]->mirror_ds = NULL;
			if (fls->mirror_array[i]->cred) {
				put_rpccred(fls->mirror_array[i]->cred);
				fls->mirror_array[i]->cred = NULL;
			}
		}
	}

	if (lseg->pls_range.iomode == IOMODE_RW) {
		struct nfs4_flexfile_layout *ffl;
		struct inode *inode;

		ffl = FF_LAYOUT_FROM_HDR(lseg->pls_layout);
		inode = ffl->generic_hdr.plh_inode;
		spin_lock(&inode->i_lock);
		if (!ff_layout_has_rw_segments(lseg->pls_layout)) {
			ffl->commit_info.nbuckets = 0;
			kfree(ffl->commit_info.buckets);
			ffl->commit_info.buckets = NULL;
		}
		spin_unlock(&inode->i_lock);
	}
	_ff_layout_free_lseg(fls);
}

/* Return 1 until we have multiple lsegs support */
static int
ff_layout_get_lseg_count(struct nfs4_ff_layout_segment *fls)
{
	return 1;
}

static void
nfs4_ff_start_busy_timer(struct nfs4_ff_busy_timer *timer)
{
	/* first IO request? */
	if (atomic_inc_return(&timer->n_ops) == 1) {
		timer->start_time = ktime_get();
	}
}

static ktime_t
nfs4_ff_end_busy_timer(struct nfs4_ff_busy_timer *timer)
{
	ktime_t start, now;

	if (atomic_dec_return(&timer->n_ops) < 0)
		WARN_ON_ONCE(1);

	now = ktime_get();
	start = timer->start_time;
	timer->start_time = now;
	return ktime_sub(now, start);
}

static ktime_t
nfs4_ff_layout_calc_completion_time(struct rpc_task *task)
{
	return ktime_sub(ktime_get(), task->tk_start);
}

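/*
 * Start the busy timer for this mirror and decide whether it is time to
 * send a LAYOUTSTATS report: returns true at most once per
 * FF_LAYOUTSTATS_REPORT_INTERVAL. Called with mirror->lock held.
 */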
static bool
nfs4_ff_layoutstat_start_io(struct nfs4_ff_layout_mirror *mirror,
			    struct nfs4_ff_layoutstat *layoutstat)
{
	static const ktime_t notime = {0};
	ktime_t now = ktime_get();

	nfs4_ff_start_busy_timer(&layoutstat->busy_timer);
	if (ktime_equal(mirror->start_time, notime))
		mirror->start_time = now;
	if (ktime_equal(mirror->last_report_time, notime))
		mirror->last_report_time = now;
	if (ktime_to_ms(ktime_sub(now, mirror->last_report_time)) >=
			FF_LAYOUTSTATS_REPORT_INTERVAL) {
		mirror->last_report_time = now;
		return true;
	}

	return false;
}

static void
nfs4_ff_layout_stat_io_update_requested(struct nfs4_ff_layoutstat *layoutstat,
		__u64 requested)
{
	struct nfs4_ff_io_stat *iostat = &layoutstat->io_stat;

	iostat->ops_requested++;
	iostat->bytes_requested += requested;
}

static void
nfs4_ff_layout_stat_io_update_completed(struct nfs4_ff_layoutstat *layoutstat,
		__u64 requested,
		__u64 completed,
		ktime_t time_completed)
{
	struct nfs4_ff_io_stat *iostat = &layoutstat->io_stat;
	ktime_t timer;

	iostat->ops_completed++;
	iostat->bytes_completed += completed;
	iostat->bytes_not_delivered += requested - completed;

	timer = nfs4_ff_end_busy_timer(&layoutstat->busy_timer);
	iostat->total_busy_time =
			ktime_add(iostat->total_busy_time, timer);
	iostat->aggregate_completion_time =
			ktime_add(iostat->aggregate_completion_time, time_completed);
}

static void
nfs4_ff_layout_stat_io_start_read(struct nfs4_ff_layout_mirror *mirror,
		__u64 requested)
{
	bool report;

	spin_lock(&mirror->lock);
	report = nfs4_ff_layoutstat_start_io(mirror, &mirror->read_stat);
	nfs4_ff_layout_stat_io_update_requested(&mirror->read_stat, requested);
	spin_unlock(&mirror->lock);

	if (report)
		pnfs_report_layoutstat(mirror->lseg->pls_layout->plh_inode);
}

static void
nfs4_ff_layout_stat_io_end_read(struct rpc_task *task,
		struct nfs4_ff_layout_mirror *mirror,
		__u64 requested,
		__u64 completed)
{
	spin_lock(&mirror->lock);
	nfs4_ff_layout_stat_io_update_completed(&mirror->read_stat,
			requested, completed,
			nfs4_ff_layout_calc_completion_time(task));
	spin_unlock(&mirror->lock);
}

static void
nfs4_ff_layout_stat_io_start_write(struct nfs4_ff_layout_mirror *mirror,
		__u64 requested)
{
	bool report;

	spin_lock(&mirror->lock);
	report = nfs4_ff_layoutstat_start_io(mirror, &mirror->write_stat);
	nfs4_ff_layout_stat_io_update_requested(&mirror->write_stat, requested);
	spin_unlock(&mirror->lock);

	if (report)
		pnfs_report_layoutstat(mirror->lseg->pls_layout->plh_inode);
}

static void
nfs4_ff_layout_stat_io_end_write(struct rpc_task *task,
		struct nfs4_ff_layout_mirror *mirror,
		__u64 requested,
		__u64 completed,
		enum nfs3_stable_how committed)
{
	if (committed == NFS_UNSTABLE)
		requested = completed = 0;

	spin_lock(&mirror->lock);
	nfs4_ff_layout_stat_io_update_completed(&mirror->write_stat,
			requested, completed,
			nfs4_ff_layout_calc_completion_time(task));
	spin_unlock(&mirror->lock);
}

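/*
 * Allocate one commit bucket per mirror for the RW layout segment.
 * Racing allocations are resolved under cinfo->lock: the loser frees its
 * bucket array.
 */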
static int
ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
			    struct nfs_commit_info *cinfo,
			    gfp_t gfp_flags)
{
	struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
	struct pnfs_commit_bucket *buckets;
	int size;

	if (cinfo->ds->nbuckets != 0) {
		/* This assumes there is only one RW lseg per file.
		 * To support multiple lsegs per file, we need to
		 * change struct pnfs_commit_bucket to allow dynamically
		 * increasing nbuckets.
		 */
		return 0;
	}

	size = ff_layout_get_lseg_count(fls) * FF_LAYOUT_MIRROR_COUNT(lseg);

	buckets = kcalloc(size, sizeof(struct pnfs_commit_bucket),
			  gfp_flags);
	if (!buckets)
		return -ENOMEM;
	else {
		int i;

		spin_lock(cinfo->lock);
		if (cinfo->ds->nbuckets != 0)
			kfree(buckets);
		else {
			cinfo->ds->buckets = buckets;
			cinfo->ds->nbuckets = size;
			for (i = 0; i < size; i++) {
				INIT_LIST_HEAD(&buckets[i].written);
				INIT_LIST_HEAD(&buckets[i].committing);
				/* mark direct verifier as unset */
				buckets[i].direct_verf.committed =
					NFS_INVALID_STABLE_HOW;
			}
		}
		spin_unlock(cinfo->lock);
		return 0;
	}
}

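/*
 * Pick the first mirror with a usable DS connection. Since the mirror
 * array is sorted by efficiency, this is the most efficient DS that is
 * currently available.
 */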
static struct nfs4_pnfs_ds *
ff_layout_choose_best_ds_for_read(struct nfs_pageio_descriptor *pgio,
				  int *best_idx)
{
	struct nfs4_ff_layout_segment *fls;
	struct nfs4_pnfs_ds *ds;
	int idx;

	fls = FF_LAYOUT_LSEG(pgio->pg_lseg);
	/* mirrors are sorted by efficiency */
	for (idx = 0; idx < fls->mirror_array_cnt; idx++) {
		ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, idx, false);
		if (ds) {
			*best_idx = idx;
			return ds;
		}
	}

	return NULL;
}

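/*
 * Set up the pageio descriptor for a read: grab a whole-file IOMODE_READ
 * layout, pick the best mirror, and size requests to that DS's rsize.
 * Any failure falls back to reading through the MDS.
 */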
static void
ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
			struct nfs_page *req)
{
	struct nfs_pgio_mirror *pgm;
	struct nfs4_ff_layout_mirror *mirror;
	struct nfs4_pnfs_ds *ds;
	int ds_idx;

	/* Use full layout for now */
	if (!pgio->pg_lseg)
		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
						   req->wb_context,
						   0,
						   NFS4_MAX_UINT64,
						   IOMODE_READ,
						   GFP_KERNEL);
	/* If no lseg, fall back to read through mds */
	if (pgio->pg_lseg == NULL)
		goto out_mds;

	ds = ff_layout_choose_best_ds_for_read(pgio, &ds_idx);
	if (!ds)
		goto out_mds;
	mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx);

	pgio->pg_mirror_idx = ds_idx;

	/* read always uses only one mirror - idx 0 for pgio layer */
	pgm = &pgio->pg_mirrors[0];
	pgm->pg_bsize = mirror->mirror_ds->ds_versions[0].rsize;

	return;
out_mds:
	pnfs_put_lseg(pgio->pg_lseg);
	pgio->pg_lseg = NULL;
	nfs_pageio_reset_read_mds(pgio);
}

static void
ff_layout_pg_init_write(struct nfs_pageio_descriptor *pgio,
			struct nfs_page *req)
{
	struct nfs4_ff_layout_mirror *mirror;
	struct nfs_pgio_mirror *pgm;
	struct nfs_commit_info cinfo;
	struct nfs4_pnfs_ds *ds;
	int i;
	int status;

	if (!pgio->pg_lseg)
		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
						   req->wb_context,
						   0,
						   NFS4_MAX_UINT64,
						   IOMODE_RW,
						   GFP_NOFS);
	/* If no lseg, fall back to write through mds */
	if (pgio->pg_lseg == NULL)
		goto out_mds;

	nfs_init_cinfo(&cinfo, pgio->pg_inode, pgio->pg_dreq);
	status = ff_layout_alloc_commit_info(pgio->pg_lseg, &cinfo, GFP_NOFS);
	if (status < 0)
		goto out_mds;

	/* Use a direct mapping of ds_idx to pgio mirror_idx */
	if (WARN_ON_ONCE(pgio->pg_mirror_count !=
	    FF_LAYOUT_MIRROR_COUNT(pgio->pg_lseg)))
		goto out_mds;

	for (i = 0; i < pgio->pg_mirror_count; i++) {
		ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, i, true);
		if (!ds)
			goto out_mds;
		pgm = &pgio->pg_mirrors[i];
		mirror = FF_LAYOUT_COMP(pgio->pg_lseg, i);
		pgm->pg_bsize = mirror->mirror_ds->ds_versions[0].wsize;
	}

	return;

out_mds:
	pnfs_put_lseg(pgio->pg_lseg);
	pgio->pg_lseg = NULL;
	nfs_pageio_reset_write_mds(pgio);
}

static unsigned int
ff_layout_pg_get_mirror_count_write(struct nfs_pageio_descriptor *pgio,
				    struct nfs_page *req)
{
	if (!pgio->pg_lseg)
		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
						   req->wb_context,
						   0,
						   NFS4_MAX_UINT64,
						   IOMODE_RW,
						   GFP_NOFS);
	if (pgio->pg_lseg)
		return FF_LAYOUT_MIRROR_COUNT(pgio->pg_lseg);

	/* no lseg means that pnfs is not in use, so no mirroring here */
	pnfs_put_lseg(pgio->pg_lseg);
	pgio->pg_lseg = NULL;
	nfs_pageio_reset_write_mds(pgio);
	return 1;
}

static const struct nfs_pageio_ops ff_layout_pg_read_ops = {
	.pg_init = ff_layout_pg_init_read,
	.pg_test = pnfs_generic_pg_test,
	.pg_doio = pnfs_generic_pg_readpages,
	.pg_cleanup = pnfs_generic_pg_cleanup,
};

static const struct nfs_pageio_ops ff_layout_pg_write_ops = {
	.pg_init = ff_layout_pg_init_write,
	.pg_test = pnfs_generic_pg_test,
	.pg_doio = pnfs_generic_pg_writepages,
	.pg_get_mirror_count = ff_layout_pg_get_mirror_count_write,
	.pg_cleanup = pnfs_generic_pg_cleanup,
};

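/*
 * Requeue a failed write, either back through pNFS (retry_pnfs) or
 * through the MDS. Direct I/O is resent by faking an unstable write
 * reply; buffered pages are handed back to the generic resend paths.
 */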
static void ff_layout_reset_write(struct nfs_pgio_header *hdr, bool retry_pnfs)
{
	struct rpc_task *task = &hdr->task;

	pnfs_layoutcommit_inode(hdr->inode, false);

	if (retry_pnfs) {
		dprintk("%s Reset task %5u for i/o through pNFS "
			"(req %s/%llu, %u bytes @ offset %llu)\n", __func__,
			hdr->task.tk_pid,
			hdr->inode->i_sb->s_id,
			(unsigned long long)NFS_FILEID(hdr->inode),
			hdr->args.count,
			(unsigned long long)hdr->args.offset);

		if (!hdr->dreq) {
			struct nfs_open_context *ctx;

			ctx = nfs_list_entry(hdr->pages.next)->wb_context;
			set_bit(NFS_CONTEXT_RESEND_WRITES, &ctx->flags);
			hdr->completion_ops->error_cleanup(&hdr->pages);
		} else {
			nfs_direct_set_resched_writes(hdr->dreq);
			/* fake unstable write to let common nfs resend pages */
			hdr->verf.committed = NFS_UNSTABLE;
			hdr->good_bytes = hdr->args.count;
		}
		return;
	}

	if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
		dprintk("%s Reset task %5u for i/o through MDS "
			"(req %s/%llu, %u bytes @ offset %llu)\n", __func__,
			hdr->task.tk_pid,
			hdr->inode->i_sb->s_id,
			(unsigned long long)NFS_FILEID(hdr->inode),
			hdr->args.count,
			(unsigned long long)hdr->args.offset);

		task->tk_status = pnfs_write_done_resend_to_mds(hdr);
	}
}

static void ff_layout_reset_read(struct nfs_pgio_header *hdr)
{
	struct rpc_task *task = &hdr->task;

	pnfs_layoutcommit_inode(hdr->inode, false);

	if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
		dprintk("%s Reset task %5u for i/o through MDS "
			"(req %s/%llu, %u bytes @ offset %llu)\n", __func__,
			hdr->task.tk_pid,
			hdr->inode->i_sb->s_id,
			(unsigned long long)NFS_FILEID(hdr->inode),
			hdr->args.count,
			(unsigned long long)hdr->args.offset);

		task->tk_status = pnfs_read_done_resend_to_mds(hdr);
	}
}

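/*
 * Map an NFSv4 DS error onto a recovery action: kick off stateid,
 * session or lease recovery on the MDS, delay and retry, invalidate the
 * layout, or mark the deviceid unavailable. Returns 0 to let the caller
 * handle the error, -EAGAIN to restart the RPC, or one of
 * -NFS4ERR_RESET_TO_PNFS / -NFS4ERR_RESET_TO_MDS to redirect the I/O.
 */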
static int ff_layout_async_handle_error_v4(struct rpc_task *task,
					   struct nfs4_state *state,
					   struct nfs_client *clp,
					   struct pnfs_layout_segment *lseg,
					   int idx)
{
	struct pnfs_layout_hdr *lo = lseg->pls_layout;
	struct inode *inode = lo->plh_inode;
	struct nfs_server *mds_server = NFS_SERVER(inode);

	struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);
	struct nfs_client *mds_client = mds_server->nfs_client;
	struct nfs4_slot_table *tbl = &clp->cl_session->fc_slot_table;

	if (task->tk_status >= 0)
		return 0;

	switch (task->tk_status) {
	/* MDS state errors */
	case -NFS4ERR_DELEG_REVOKED:
	case -NFS4ERR_ADMIN_REVOKED:
	case -NFS4ERR_BAD_STATEID:
		if (state == NULL)
			break;
		nfs_remove_bad_delegation(state->inode);
	case -NFS4ERR_OPENMODE:
		if (state == NULL)
			break;
		if (nfs4_schedule_stateid_recovery(mds_server, state) < 0)
			goto out_bad_stateid;
		goto wait_on_recovery;
	case -NFS4ERR_EXPIRED:
		if (state != NULL) {
			if (nfs4_schedule_stateid_recovery(mds_server, state) < 0)
				goto out_bad_stateid;
		}
		nfs4_schedule_lease_recovery(mds_client);
		goto wait_on_recovery;
	/* DS session errors */
	case -NFS4ERR_BADSESSION:
	case -NFS4ERR_BADSLOT:
	case -NFS4ERR_BAD_HIGH_SLOT:
	case -NFS4ERR_DEADSESSION:
	case -NFS4ERR_CONN_NOT_BOUND_TO_SESSION:
	case -NFS4ERR_SEQ_FALSE_RETRY:
	case -NFS4ERR_SEQ_MISORDERED:
		dprintk("%s ERROR %d, Reset session. Exchangeid "
			"flags 0x%x\n", __func__, task->tk_status,
			clp->cl_exchange_flags);
		nfs4_schedule_session_recovery(clp->cl_session, task->tk_status);
		break;
	case -NFS4ERR_DELAY:
	case -NFS4ERR_GRACE:
		rpc_delay(task, FF_LAYOUT_POLL_RETRY_MAX);
		break;
	case -NFS4ERR_RETRY_UNCACHED_REP:
		break;
	/* Invalidate Layout errors */
	case -NFS4ERR_PNFS_NO_LAYOUT:
	case -ESTALE:           /* mapped NFS4ERR_STALE */
	case -EBADHANDLE:       /* mapped NFS4ERR_BADHANDLE */
	case -EISDIR:           /* mapped NFS4ERR_ISDIR */
	case -NFS4ERR_FHEXPIRED:
	case -NFS4ERR_WRONG_TYPE:
		dprintk("%s Invalid layout error %d\n", __func__,
			task->tk_status);
		/*
		 * Destroy layout so new i/o will get a new layout.
		 * Layout will not be destroyed until all current lseg
		 * references are put. Mark layout as invalid to resend failed
		 * i/o and all i/o waiting on the slot table to the MDS until
		 * layout is destroyed and a new valid layout is obtained.
		 */
		pnfs_destroy_layout(NFS_I(inode));
		rpc_wake_up(&tbl->slot_tbl_waitq);
		goto reset;
	/* RPC connection errors */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EIO:
	case -ETIMEDOUT:
	case -EPIPE:
		dprintk("%s DS connection error %d\n", __func__,
			task->tk_status);
		nfs4_mark_deviceid_unavailable(devid);
		rpc_wake_up(&tbl->slot_tbl_waitq);
		/* fall through */
	default:
		if (ff_layout_has_available_ds(lseg))
			return -NFS4ERR_RESET_TO_PNFS;
reset:
		dprintk("%s Retry through MDS. Error %d\n", __func__,
			task->tk_status);
		return -NFS4ERR_RESET_TO_MDS;
	}
out:
	task->tk_status = 0;
	return -EAGAIN;
out_bad_stateid:
	task->tk_status = -EIO;
	return 0;
wait_on_recovery:
	rpc_sleep_on(&mds_client->cl_rpcwaitq, task, NULL);
	if (test_bit(NFS4CLNT_MANAGER_RUNNING, &mds_client->cl_state) == 0)
		rpc_wake_up_queued_task(&mds_client->cl_rpcwaitq, task);
	goto out;
}

/* Retry all errors through either pNFS or MDS except for -EJUKEBOX */
static int ff_layout_async_handle_error_v3(struct rpc_task *task,
					   struct pnfs_layout_segment *lseg,
					   int idx)
{
	struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);

	if (task->tk_status >= 0)
		return 0;

	if (task->tk_status != -EJUKEBOX) {
		dprintk("%s DS connection error %d\n", __func__,
			task->tk_status);
		nfs4_mark_deviceid_unavailable(devid);
		if (ff_layout_has_available_ds(lseg))
			return -NFS4ERR_RESET_TO_PNFS;
		else
			return -NFS4ERR_RESET_TO_MDS;
	}

	if (task->tk_status == -EJUKEBOX)
		nfs_inc_stats(lseg->pls_layout->plh_inode, NFSIOS_DELAY);
	task->tk_status = 0;
	rpc_restart_call(task);
	rpc_delay(task, NFS_JUKEBOX_RETRY_TIME);
	return -EAGAIN;
}

static int ff_layout_async_handle_error(struct rpc_task *task,
					struct nfs4_state *state,
					struct nfs_client *clp,
					struct pnfs_layout_segment *lseg,
					int idx)
{
	int vers = clp->cl_nfs_mod->rpc_vers->number;

	switch (vers) {
	case 3:
		return ff_layout_async_handle_error_v3(task, lseg, idx);
	case 4:
		return ff_layout_async_handle_error_v4(task, state, clp,
						       lseg, idx);
	default:
		/* should never happen */
		WARN_ON_ONCE(1);
		return 0;
	}
}

static void ff_layout_io_track_ds_error(struct pnfs_layout_segment *lseg,
					int idx, u64 offset, u64 length,
					u32 status, int opnum)
{
	struct nfs4_ff_layout_mirror *mirror;
	int err;

	mirror = FF_LAYOUT_COMP(lseg, idx);
	err = ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
				       mirror, offset, length, status, opnum,
				       GFP_NOIO);
	dprintk("%s: err %d op %d status %u\n", __func__, err, opnum, status);
}

/* NFS_PROTO call done callback routines */

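/*
 * Handle completion of a DS READ: record any DS error for the next
 * LAYOUTRETURN, then resend via another mirror, fall back to the MDS,
 * or restart the RPC as directed by ff_layout_async_handle_error().
 */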
static int ff_layout_read_done_cb(struct rpc_task *task,
				struct nfs_pgio_header *hdr)
{
	struct inode *inode;
	int err;

	trace_nfs4_pnfs_read(hdr, task->tk_status);
	if (task->tk_status == -ETIMEDOUT && !hdr->res.op_status)
		hdr->res.op_status = NFS4ERR_NXIO;
	if (task->tk_status < 0 && hdr->res.op_status)
		ff_layout_io_track_ds_error(hdr->lseg, hdr->pgio_mirror_idx,
					    hdr->args.offset, hdr->args.count,
					    hdr->res.op_status, OP_READ);
	err = ff_layout_async_handle_error(task, hdr->args.context->state,
					   hdr->ds_clp, hdr->lseg,
					   hdr->pgio_mirror_idx);

	switch (err) {
	case -NFS4ERR_RESET_TO_PNFS:
		set_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
			&hdr->lseg->pls_layout->plh_flags);
		pnfs_read_resend_pnfs(hdr);
		return task->tk_status;
	case -NFS4ERR_RESET_TO_MDS:
		inode = hdr->lseg->pls_layout->plh_inode;
		pnfs_error_mark_layout_for_return(inode, hdr->lseg);
		ff_layout_reset_read(hdr);
		return task->tk_status;
	case -EAGAIN:
		rpc_restart_call_prepare(task);
		return -EAGAIN;
	}

	return 0;
}

static bool
ff_layout_need_layoutcommit(struct pnfs_layout_segment *lseg)
{
	return !(FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_LAYOUTCOMMIT);
}

/*
 * We reference the rpc_cred of the first WRITE that triggers the need for
 * a LAYOUTCOMMIT, and use it to send the layoutcommit compound.
 * rfc5661 is not clear about which credential should be used.
 *
 * Flexfiles clients should treat a FILE_SYNC reply from a DS as DATA_SYNC;
 * following http://www.rfc-editor.org/errata_search.php?rfc=5661&eid=2751
 * we therefore always send a layoutcommit after DS writes.
 */
static void
ff_layout_set_layoutcommit(struct nfs_pgio_header *hdr)
{
	if (!ff_layout_need_layoutcommit(hdr->lseg))
		return;

	pnfs_set_layoutcommit(hdr->inode, hdr->lseg,
			hdr->mds_offset + hdr->res.count);
	dprintk("%s inode %lu pls_end_pos %lu\n", __func__, hdr->inode->i_ino,
		(unsigned long) NFS_I(hdr->inode)->layout->plh_lwb);
}

static bool
ff_layout_reset_to_mds(struct pnfs_layout_segment *lseg, int idx)
{
	/* No mirroring for now */
	struct nfs4_deviceid_node *node = FF_LAYOUT_DEVID_NODE(lseg, idx);

	return ff_layout_test_devid_unavailable(node);
}

static int ff_layout_read_prepare_common(struct rpc_task *task,
					 struct nfs_pgio_header *hdr)
{
	nfs4_ff_layout_stat_io_start_read(
			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
			hdr->args.count);

	if (unlikely(test_bit(NFS_CONTEXT_BAD, &hdr->args.context->flags))) {
		rpc_exit(task, -EIO);
		return -EIO;
	}
	if (ff_layout_reset_to_mds(hdr->lseg, hdr->pgio_mirror_idx)) {
		dprintk("%s task %u reset io to MDS\n", __func__, task->tk_pid);
		if (ff_layout_has_available_ds(hdr->lseg))
			pnfs_read_resend_pnfs(hdr);
		else
			ff_layout_reset_read(hdr);
		rpc_exit(task, 0);
		return -EAGAIN;
	}
	hdr->pgio_done_cb = ff_layout_read_done_cb;

	return 0;
}

/*
 * Call ops for the async read/write cases
 * In the case of dense layouts, the offset needs to be reset to its
 * original value.
 */
static void ff_layout_read_prepare_v3(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	if (ff_layout_read_prepare_common(task, hdr))
		return;

	rpc_call_start(task);
}

static int ff_layout_setup_sequence(struct nfs_client *ds_clp,
				    struct nfs4_sequence_args *args,
				    struct nfs4_sequence_res *res,
				    struct rpc_task *task)
{
	if (ds_clp->cl_session)
		return nfs41_setup_sequence(ds_clp->cl_session,
					   args,
					   res,
					   task);
	return nfs40_setup_sequence(ds_clp->cl_slot_tbl,
				   args,
				   res,
				   task);
}

static void ff_layout_read_prepare_v4(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	if (ff_layout_setup_sequence(hdr->ds_clp,
				     &hdr->args.seq_args,
				     &hdr->res.seq_res,
				     task))
		return;

	if (ff_layout_read_prepare_common(task, hdr))
		return;

	if (nfs4_set_rw_stateid(&hdr->args.stateid, hdr->args.context,
			hdr->args.lock_context, FMODE_READ) == -EIO)
		rpc_exit(task, -EIO); /* lost lock, terminate I/O */
}

static void ff_layout_read_call_done(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	dprintk("--> %s task->tk_status %d\n", __func__, task->tk_status);

	nfs4_ff_layout_stat_io_end_read(task,
			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
			hdr->args.count, hdr->res.count);

	if (test_bit(NFS_IOHDR_REDO, &hdr->flags) &&
	    task->tk_status == 0) {
		nfs4_sequence_done(task, &hdr->res.seq_res);
		return;
	}

	/* Note this may cause RPC to be resent */
	hdr->mds_ops->rpc_call_done(task, hdr);
}

static void ff_layout_read_count_stats(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	rpc_count_iostats_metrics(task,
	    &NFS_CLIENT(hdr->inode)->cl_metrics[NFSPROC4_CLNT_READ]);
}

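/*
 * Handle completion of a DS WRITE: track DS errors, redirect the write
 * back through pNFS or the MDS on failure, and schedule a layoutcommit
 * when the DS reply was FILE_SYNC or DATA_SYNC.
 */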
static int ff_layout_write_done_cb(struct rpc_task *task,
				struct nfs_pgio_header *hdr)
{
	struct inode *inode;
	int err;

	trace_nfs4_pnfs_write(hdr, task->tk_status);
	if (task->tk_status == -ETIMEDOUT && !hdr->res.op_status)
		hdr->res.op_status = NFS4ERR_NXIO;
	if (task->tk_status < 0 && hdr->res.op_status)
		ff_layout_io_track_ds_error(hdr->lseg, hdr->pgio_mirror_idx,
					    hdr->args.offset, hdr->args.count,
					    hdr->res.op_status, OP_WRITE);
	err = ff_layout_async_handle_error(task, hdr->args.context->state,
					   hdr->ds_clp, hdr->lseg,
					   hdr->pgio_mirror_idx);

	switch (err) {
	case -NFS4ERR_RESET_TO_PNFS:
	case -NFS4ERR_RESET_TO_MDS:
		inode = hdr->lseg->pls_layout->plh_inode;
		pnfs_error_mark_layout_for_return(inode, hdr->lseg);
		if (err == -NFS4ERR_RESET_TO_PNFS) {
			pnfs_set_retry_layoutget(hdr->lseg->pls_layout);
			ff_layout_reset_write(hdr, true);
		} else {
			pnfs_clear_retry_layoutget(hdr->lseg->pls_layout);
			ff_layout_reset_write(hdr, false);
		}
		return task->tk_status;
	case -EAGAIN:
		rpc_restart_call_prepare(task);
		return -EAGAIN;
	}

	if (hdr->res.verf->committed == NFS_FILE_SYNC ||
	    hdr->res.verf->committed == NFS_DATA_SYNC)
		ff_layout_set_layoutcommit(hdr);

	return 0;
}

static int ff_layout_commit_done_cb(struct rpc_task *task,
				     struct nfs_commit_data *data)
{
	struct inode *inode;
	int err;

	trace_nfs4_pnfs_commit_ds(data, task->tk_status);
	if (task->tk_status == -ETIMEDOUT && !data->res.op_status)
		data->res.op_status = NFS4ERR_NXIO;
	if (task->tk_status < 0 && data->res.op_status)
		ff_layout_io_track_ds_error(data->lseg, data->ds_commit_index,
					    data->args.offset, data->args.count,
					    data->res.op_status, OP_COMMIT);
	err = ff_layout_async_handle_error(task, NULL, data->ds_clp,
					   data->lseg, data->ds_commit_index);

	switch (err) {
	case -NFS4ERR_RESET_TO_PNFS:
	case -NFS4ERR_RESET_TO_MDS:
		inode = data->lseg->pls_layout->plh_inode;
		pnfs_error_mark_layout_for_return(inode, data->lseg);
		if (err == -NFS4ERR_RESET_TO_PNFS)
			pnfs_set_retry_layoutget(data->lseg->pls_layout);
		else
			pnfs_clear_retry_layoutget(data->lseg->pls_layout);
		pnfs_generic_prepare_to_resend_writes(data);
		return -EAGAIN;
	case -EAGAIN:
		rpc_restart_call_prepare(task);
		return -EAGAIN;
	}

	if (data->verf.committed == NFS_UNSTABLE
	    && ff_layout_need_layoutcommit(data->lseg))
		pnfs_set_layoutcommit(data->inode, data->lseg, data->lwb);

	return 0;
}

static int ff_layout_write_prepare_common(struct rpc_task *task,
					  struct nfs_pgio_header *hdr)
{
	nfs4_ff_layout_stat_io_start_write(
			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
			hdr->args.count);

	if (unlikely(test_bit(NFS_CONTEXT_BAD, &hdr->args.context->flags))) {
		rpc_exit(task, -EIO);
		return -EIO;
	}

	if (ff_layout_reset_to_mds(hdr->lseg, hdr->pgio_mirror_idx)) {
		bool retry_pnfs;

		retry_pnfs = ff_layout_has_available_ds(hdr->lseg);
		dprintk("%s task %u reset io to %s\n", __func__,
			task->tk_pid, retry_pnfs ? "pNFS" : "MDS");
		ff_layout_reset_write(hdr, retry_pnfs);
		rpc_exit(task, 0);
		return -EAGAIN;
	}

	return 0;
}

static void ff_layout_write_prepare_v3(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	if (ff_layout_write_prepare_common(task, hdr))
		return;

	rpc_call_start(task);
}

static void ff_layout_write_prepare_v4(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	if (ff_layout_setup_sequence(hdr->ds_clp,
				     &hdr->args.seq_args,
				     &hdr->res.seq_res,
				     task))
		return;

	if (ff_layout_write_prepare_common(task, hdr))
		return;

	if (nfs4_set_rw_stateid(&hdr->args.stateid, hdr->args.context,
			hdr->args.lock_context, FMODE_WRITE) == -EIO)
		rpc_exit(task, -EIO); /* lost lock, terminate I/O */
}

static void ff_layout_write_call_done(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	nfs4_ff_layout_stat_io_end_write(task,
			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
			hdr->args.count, hdr->res.count,
			hdr->res.verf->committed);

	if (test_bit(NFS_IOHDR_REDO, &hdr->flags) &&
	    task->tk_status == 0) {
		nfs4_sequence_done(task, &hdr->res.seq_res);
		return;
	}

	/* Note this may cause RPC to be resent */
	hdr->mds_ops->rpc_call_done(task, hdr);
}

static void ff_layout_write_count_stats(struct rpc_task *task, void *data)
{
	struct nfs_pgio_header *hdr = data;

	rpc_count_iostats_metrics(task,
	    &NFS_CLIENT(hdr->inode)->cl_metrics[NFSPROC4_CLNT_WRITE]);
}

static void ff_layout_commit_prepare_common(struct rpc_task *task,
		struct nfs_commit_data *cdata)
{
	nfs4_ff_layout_stat_io_start_write(
			FF_LAYOUT_COMP(cdata->lseg, cdata->ds_commit_index),
			0);
}

static void ff_layout_commit_prepare_v3(struct rpc_task *task, void *data)
{
	ff_layout_commit_prepare_common(task, data);
	rpc_call_start(task);
}

static void ff_layout_commit_prepare_v4(struct rpc_task *task, void *data)
{
	struct nfs_commit_data *wdata = data;

	if (ff_layout_setup_sequence(wdata->ds_clp,
				 &wdata->args.seq_args,
				 &wdata->res.seq_res,
				 task))
		return;
	ff_layout_commit_prepare_common(task, data);
}

static void ff_layout_commit_done(struct rpc_task *task, void *data)
{
	struct nfs_commit_data *cdata = data;
	struct nfs_page *req;
	__u64 count = 0;

	if (task->tk_status == 0) {
		list_for_each_entry(req, &cdata->pages, wb_list)
			count += req->wb_bytes;
	}

	nfs4_ff_layout_stat_io_end_write(task,
			FF_LAYOUT_COMP(cdata->lseg, cdata->ds_commit_index),
			count, count, NFS_FILE_SYNC);

	pnfs_generic_write_commit_done(task, data);
}

static void ff_layout_commit_count_stats(struct rpc_task *task, void *data)
{
	struct nfs_commit_data *cdata = data;

	rpc_count_iostats_metrics(task,
	    &NFS_CLIENT(cdata->inode)->cl_metrics[NFSPROC4_CLNT_COMMIT]);
}

static const struct rpc_call_ops ff_layout_read_call_ops_v3 = {
	.rpc_call_prepare = ff_layout_read_prepare_v3,
	.rpc_call_done = ff_layout_read_call_done,
	.rpc_count_stats = ff_layout_read_count_stats,
	.rpc_release = pnfs_generic_rw_release,
};

static const struct rpc_call_ops ff_layout_read_call_ops_v4 = {
	.rpc_call_prepare = ff_layout_read_prepare_v4,
	.rpc_call_done = ff_layout_read_call_done,
	.rpc_count_stats = ff_layout_read_count_stats,
	.rpc_release = pnfs_generic_rw_release,
};

static const struct rpc_call_ops ff_layout_write_call_ops_v3 = {
	.rpc_call_prepare = ff_layout_write_prepare_v3,
	.rpc_call_done = ff_layout_write_call_done,
	.rpc_count_stats = ff_layout_write_count_stats,
	.rpc_release = pnfs_generic_rw_release,
};

static const struct rpc_call_ops ff_layout_write_call_ops_v4 = {
	.rpc_call_prepare = ff_layout_write_prepare_v4,
	.rpc_call_done = ff_layout_write_call_done,
	.rpc_count_stats = ff_layout_write_count_stats,
	.rpc_release = pnfs_generic_rw_release,
};

static const struct rpc_call_ops ff_layout_commit_call_ops_v3 = {
	.rpc_call_prepare = ff_layout_commit_prepare_v3,
	.rpc_call_done = ff_layout_commit_done,
	.rpc_count_stats = ff_layout_commit_count_stats,
	.rpc_release = pnfs_generic_commit_release,
};

static const struct rpc_call_ops ff_layout_commit_call_ops_v4 = {
	.rpc_call_prepare = ff_layout_commit_prepare_v4,
	.rpc_call_done = ff_layout_commit_done,
	.rpc_count_stats = ff_layout_commit_count_stats,
	.rpc_release = pnfs_generic_commit_release,
};

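/*
 * Set up and fire an asynchronous READ to the DS backing the chosen
 * mirror, using the DS's filehandle, credential and NFS version. If no
 * DS can be used, tell the caller whether a retry makes sense.
 */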
static enum pnfs_try_status
ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
{
	struct pnfs_layout_segment *lseg = hdr->lseg;
	struct nfs4_pnfs_ds *ds;
	struct rpc_clnt *ds_clnt;
	struct rpc_cred *ds_cred;
	loff_t offset = hdr->args.offset;
	u32 idx = hdr->pgio_mirror_idx;
	int vers;
	struct nfs_fh *fh;

	dprintk("--> %s ino %lu pgbase %u req %zu@%llu\n",
		__func__, hdr->inode->i_ino,
		hdr->args.pgbase, (size_t)hdr->args.count, offset);

	ds = nfs4_ff_layout_prepare_ds(lseg, idx, false);
	if (!ds)
		goto out_failed;

	ds_clnt = nfs4_ff_find_or_create_ds_client(lseg, idx, ds->ds_clp,
						   hdr->inode);
	if (IS_ERR(ds_clnt))
		goto out_failed;

	ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred);
	if (IS_ERR(ds_cred))
		goto out_failed;

	vers = nfs4_ff_layout_ds_version(lseg, idx);

	dprintk("%s USE DS: %s cl_count %d vers %d\n", __func__,
		ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count), vers);

	atomic_inc(&ds->ds_clp->cl_count);
	hdr->ds_clp = ds->ds_clp;
	fh = nfs4_ff_layout_select_ds_fh(lseg, idx);
	if (fh)
		hdr->args.fh = fh;
	/*
	 * Note that if we ever decide to split across DSes,
	 * then we may need to handle dense-like offsets.
	 */
	hdr->args.offset = offset;
	hdr->mds_offset = offset;

	/* Perform an asynchronous read to ds */
	nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
			  vers == 3 ? &ff_layout_read_call_ops_v3 :
				      &ff_layout_read_call_ops_v4,
			  0, RPC_TASK_SOFTCONN);

	return PNFS_ATTEMPTED;

out_failed:
	if (ff_layout_has_available_ds(lseg))
		return PNFS_TRY_AGAIN;
	return PNFS_NOT_ATTEMPTED;
}

/* Perform async writes. */
static enum pnfs_try_status
ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
{
	struct pnfs_layout_segment *lseg = hdr->lseg;
	struct nfs4_pnfs_ds *ds;
	struct rpc_clnt *ds_clnt;
	struct rpc_cred *ds_cred;
	loff_t offset = hdr->args.offset;
	int vers;
	struct nfs_fh *fh;
	int idx = hdr->pgio_mirror_idx;

	ds = nfs4_ff_layout_prepare_ds(lseg, idx, true);
	if (!ds)
		return PNFS_NOT_ATTEMPTED;

	ds_clnt = nfs4_ff_find_or_create_ds_client(lseg, idx, ds->ds_clp,
						   hdr->inode);
	if (IS_ERR(ds_clnt))
		return PNFS_NOT_ATTEMPTED;

	ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred);
	if (IS_ERR(ds_cred))
		return PNFS_NOT_ATTEMPTED;

	vers = nfs4_ff_layout_ds_version(lseg, idx);

	dprintk("%s ino %lu sync %d req %zu@%llu DS: %s cl_count %d vers %d\n",
		__func__, hdr->inode->i_ino, sync, (size_t) hdr->args.count,
		offset, ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count),
		vers);

	hdr->pgio_done_cb = ff_layout_write_done_cb;
	atomic_inc(&ds->ds_clp->cl_count);
	hdr->ds_clp = ds->ds_clp;
	hdr->ds_commit_idx = idx;
	fh = nfs4_ff_layout_select_ds_fh(lseg, idx);
	if (fh)
		hdr->args.fh = fh;

	/*
	 * Note that if we ever decide to split across DSes,
	 * then we may need to handle dense-like offsets.
	 */
	hdr->args.offset = offset;

	/* Perform an asynchronous write */
	nfs_initiate_pgio(ds_clnt, hdr, ds_cred, ds->ds_clp->rpc_ops,
			  vers == 3 ? &ff_layout_write_call_ops_v3 :
				      &ff_layout_write_call_ops_v4,
			  sync, RPC_TASK_SOFTCONN);
	return PNFS_ATTEMPTED;
}

static u32 calc_ds_index_from_commit(struct pnfs_layout_segment *lseg, u32 i)
{
	return i;
}

static struct nfs_fh *
select_ds_fh_from_commit(struct pnfs_layout_segment *lseg, u32 i)
{
	struct nfs4_ff_layout_segment *flseg = FF_LAYOUT_LSEG(lseg);

	/* FIXME: Assume that there is only one NFS version available
	 * for the DS.
	 */
	return &flseg->mirror_array[i]->fh_versions[0];
}

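/*
 * Send a COMMIT to the DS that owns this commit bucket. On any setup
 * failure the writes are requeued for resending through the MDS.
 */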
static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
{
	struct pnfs_layout_segment *lseg = data->lseg;
	struct nfs4_pnfs_ds *ds;
	struct rpc_clnt *ds_clnt;
	struct rpc_cred *ds_cred;
	u32 idx;
	int vers;
	struct nfs_fh *fh;

	idx = calc_ds_index_from_commit(lseg, data->ds_commit_index);
	ds = nfs4_ff_layout_prepare_ds(lseg, idx, true);
	if (!ds)
		goto out_err;

	ds_clnt = nfs4_ff_find_or_create_ds_client(lseg, idx, ds->ds_clp,
						   data->inode);
	if (IS_ERR(ds_clnt))
		goto out_err;

	ds_cred = ff_layout_get_ds_cred(lseg, idx, data->cred);
	if (IS_ERR(ds_cred))
		goto out_err;

	vers = nfs4_ff_layout_ds_version(lseg, idx);

	dprintk("%s ino %lu, how %d cl_count %d vers %d\n", __func__,
		data->inode->i_ino, how, atomic_read(&ds->ds_clp->cl_count),
		vers);
	data->commit_done_cb = ff_layout_commit_done_cb;
	data->cred = ds_cred;
	atomic_inc(&ds->ds_clp->cl_count);
	data->ds_clp = ds->ds_clp;
	fh = select_ds_fh_from_commit(lseg, data->ds_commit_index);
	if (fh)
		data->args.fh = fh;

	return nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops,
				   vers == 3 ? &ff_layout_commit_call_ops_v3 :
					       &ff_layout_commit_call_ops_v4,
				   how, RPC_TASK_SOFTCONN);
out_err:
	pnfs_generic_prepare_to_resend_writes(data);
	pnfs_generic_commit_release(data);
	return -EAGAIN;
}

static int
ff_layout_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
			   int how, struct nfs_commit_info *cinfo)
{
	return pnfs_generic_commit_pagelist(inode, mds_pages, how, cinfo,
					    ff_layout_initiate_commit);
}

static struct pnfs_ds_commit_info *
ff_layout_get_ds_info(struct inode *inode)
{
	struct pnfs_layout_hdr *layout = NFS_I(inode)->layout;

	if (layout == NULL)
		return NULL;

	return &FF_LAYOUT_FROM_HDR(layout)->commit_info;
}

static void
ff_layout_free_deviceid_node(struct nfs4_deviceid_node *d)
{
	nfs4_ff_layout_free_deviceid(container_of(d, struct nfs4_ff_layout_ds,
						  id_node));
}

static int ff_layout_encode_ioerr(struct nfs4_flexfile_layout *flo,
				  struct xdr_stream *xdr,
				  const struct nfs4_layoutreturn_args *args)
{
	struct pnfs_layout_hdr *hdr = &flo->generic_hdr;
	__be32 *start;
	int count = 0, ret = 0;

	start = xdr_reserve_space(xdr, 4);
	if (unlikely(!start))
		return -E2BIG;

	/* This assumes we always return _ALL_ layouts */
	spin_lock(&hdr->plh_inode->i_lock);
	ret = ff_layout_encode_ds_ioerr(flo, xdr, &count, &args->range);
	spin_unlock(&hdr->plh_inode->i_lock);

	*start = cpu_to_be32(count);

	return ret;
}

/* report nothing for now */
static void ff_layout_encode_iostats(struct nfs4_flexfile_layout *flo,
				     struct xdr_stream *xdr,
				     const struct nfs4_layoutreturn_args *args)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 4);
	if (likely(p))
		*p = cpu_to_be32(0);
}

static struct nfs4_deviceid_node *
ff_layout_alloc_deviceid_node(struct nfs_server *server,
			      struct pnfs_device *pdev, gfp_t gfp_flags)
{
	struct nfs4_ff_layout_ds *dsaddr;

	dsaddr = nfs4_ff_alloc_deviceid_node(server, pdev, gfp_flags);
	if (!dsaddr)
		return NULL;
	return &dsaddr->id_node;
}

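/*
 * Encode the opaque LAYOUTRETURN body: a list of DS I/O errors followed
 * by an (empty, for now) iostats array, with the total length filled in
 * at the end.
 */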
static void
ff_layout_encode_layoutreturn(struct pnfs_layout_hdr *lo,
			      struct xdr_stream *xdr,
			      const struct nfs4_layoutreturn_args *args)
{
	struct nfs4_flexfile_layout *flo = FF_LAYOUT_FROM_HDR(lo);
	__be32 *start;

	dprintk("%s: Begin\n", __func__);
	start = xdr_reserve_space(xdr, 4);
	BUG_ON(!start);

	if (ff_layout_encode_ioerr(flo, xdr, args))
		goto out;

	ff_layout_encode_iostats(flo, xdr, args);
out:
	*start = cpu_to_be32((xdr->p - start - 1) * 4);
	dprintk("%s: Return\n", __func__);
}

static int
ff_layout_ntop4(const struct sockaddr *sap, char *buf, const size_t buflen)
{
	const struct sockaddr_in *sin = (struct sockaddr_in *)sap;

	return snprintf(buf, buflen, "%pI4", &sin->sin_addr);
}

static size_t
ff_layout_ntop6_noscopeid(const struct sockaddr *sap, char *buf,
			  const int buflen)
{
	const struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
	const struct in6_addr *addr = &sin6->sin6_addr;

	/*
	 * RFC 4291, Section 2.2.2
	 *
	 * Shorthanded ANY address
	 */
	if (ipv6_addr_any(addr))
		return snprintf(buf, buflen, "::");

	/*
	 * RFC 4291, Section 2.2.2
	 *
	 * Shorthanded loopback address
	 */
	if (ipv6_addr_loopback(addr))
		return snprintf(buf, buflen, "::1");

	/*
	 * RFC 4291, Section 2.2.3
	 *
	 * Special presentation address format for mapped v4
	 * addresses.
	 */
	if (ipv6_addr_v4mapped(addr))
		return snprintf(buf, buflen, "::ffff:%pI4",
					&addr->s6_addr32[3]);

	/*
	 * RFC 4291, Section 2.2.1
	 */
	return snprintf(buf, buflen, "%pI6c", addr);
}

/* Derived from rpc_sockaddr2uaddr */
static void
ff_layout_encode_netaddr(struct xdr_stream *xdr, struct nfs4_pnfs_ds_addr *da)
{
	struct sockaddr *sap = (struct sockaddr *)&da->da_addr;
	char portbuf[RPCBIND_MAXUADDRPLEN];
	char addrbuf[RPCBIND_MAXUADDRLEN];
	char *netid;
	unsigned short port;
	int len, netid_len;
	__be32 *p;

	switch (sap->sa_family) {
	case AF_INET:
		if (ff_layout_ntop4(sap, addrbuf, sizeof(addrbuf)) == 0)
			return;
		port = ntohs(((struct sockaddr_in *)sap)->sin_port);
		netid = "tcp";
		netid_len = 3;
		break;
	case AF_INET6:
		if (ff_layout_ntop6_noscopeid(sap, addrbuf, sizeof(addrbuf)) == 0)
			return;
		port = ntohs(((struct sockaddr_in6 *)sap)->sin6_port);
		netid = "tcp6";
		netid_len = 4;
		break;
	default:
		/* we only support tcp and tcp6 */
		WARN_ON_ONCE(1);
		return;
	}

	snprintf(portbuf, sizeof(portbuf), ".%u.%u", port >> 8, port & 0xff);
	len = strlcat(addrbuf, portbuf, sizeof(addrbuf));

	p = xdr_reserve_space(xdr, 4 + netid_len);
	xdr_encode_opaque(p, netid, netid_len);

	p = xdr_reserve_space(xdr, 4 + len);
	xdr_encode_opaque(p, addrbuf, len);
}

static void
ff_layout_encode_nfstime(struct xdr_stream *xdr,
			 ktime_t t)
{
	struct timespec64 ts;
	__be32 *p;

	p = xdr_reserve_space(xdr, 12);
	ts = ktime_to_timespec64(t);
	p = xdr_encode_hyper(p, ts.tv_sec);
	*p++ = cpu_to_be32(ts.tv_nsec);
}

static void
ff_layout_encode_io_latency(struct xdr_stream *xdr,
			    struct nfs4_ff_io_stat *stat)
{
	__be32 *p;

	p = xdr_reserve_space(xdr, 5 * 8);
	p = xdr_encode_hyper(p, stat->ops_requested);
	p = xdr_encode_hyper(p, stat->bytes_requested);
	p = xdr_encode_hyper(p, stat->ops_completed);
	p = xdr_encode_hyper(p, stat->bytes_completed);
	p = xdr_encode_hyper(p, stat->bytes_not_delivered);
	ff_layout_encode_nfstime(xdr, stat->total_busy_time);
	ff_layout_encode_nfstime(xdr, stat->aggregate_completion_time);
}

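/*
 * Encode one per-mirror layoutupdate body for LAYOUTSTATS: the DS
 * netaddr and filehandle, the read and write latency stats, the mirror's
 * total duration, and a final boolean that is always false, with the
 * opaque length filled in last.
 */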
static void
ff_layout_encode_layoutstats(struct xdr_stream *xdr,
			     struct nfs42_layoutstat_args *args,
			     struct nfs42_layoutstat_devinfo *devinfo)
{
	struct nfs4_ff_layout_mirror *mirror = devinfo->layout_private;
	struct nfs4_pnfs_ds_addr *da;
	struct nfs4_pnfs_ds *ds = mirror->mirror_ds->ds;
	struct nfs_fh *fh = &mirror->fh_versions[0];
	__be32 *p, *start;

	da = list_first_entry(&ds->ds_addrs, struct nfs4_pnfs_ds_addr, da_node);
	dprintk("%s: DS %s: encoding address %s\n",
		__func__, ds->ds_remotestr, da->da_remotestr);
	/* layoutupdate length */
	start = xdr_reserve_space(xdr, 4);
	/* netaddr4 */
	ff_layout_encode_netaddr(xdr, da);
	/* nfs_fh4 */
	p = xdr_reserve_space(xdr, 4 + fh->size);
	xdr_encode_opaque(p, fh->data, fh->size);
	/* ff_io_latency4 read */
	spin_lock(&mirror->lock);
	ff_layout_encode_io_latency(xdr, &mirror->read_stat.io_stat);
	/* ff_io_latency4 write */
	ff_layout_encode_io_latency(xdr, &mirror->write_stat.io_stat);
	spin_unlock(&mirror->lock);
	/* nfstime4 */
	ff_layout_encode_nfstime(xdr, ktime_sub(ktime_get(), mirror->start_time));
	/* bool */
	p = xdr_reserve_space(xdr, 4);
	*p = cpu_to_be32(false);

	*start = cpu_to_be32((xdr->p - start - 1) * 4);
}

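/*
 * Fill in one devinfo entry per mirror of this segment, up to dev_limit
 * entries in total. Each entry takes an lseg reference that is dropped
 * in ff_layout_cleanup_layoutstats(). Returns false once the limit is
 * reached.
 */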
static bool
ff_layout_mirror_prepare_stats(struct nfs42_layoutstat_args *args,
			       struct pnfs_layout_segment *pls,
			       int *dev_count, int dev_limit)
{
	struct nfs4_ff_layout_mirror *mirror;
	struct nfs4_deviceid_node *dev;
	struct nfs42_layoutstat_devinfo *devinfo;
	int i;

	for (i = 0; i < FF_LAYOUT_MIRROR_COUNT(pls); i++) {
		if (*dev_count >= dev_limit)
			break;
		mirror = FF_LAYOUT_COMP(pls, i);
		if (!mirror || !mirror->mirror_ds)
			continue;
		dev = FF_LAYOUT_DEVID_NODE(pls, i);
		devinfo = &args->devinfo[*dev_count];
		memcpy(&devinfo->dev_id, &dev->deviceid, NFS4_DEVICEID4_SIZE);
		devinfo->offset = pls->pls_range.offset;
		devinfo->length = pls->pls_range.length;
		/* well, we don't really know if IO is continuous or not! */
		devinfo->read_count = mirror->read_stat.io_stat.ops_completed;
		devinfo->read_bytes = mirror->read_stat.io_stat.bytes_completed;
		devinfo->write_count = mirror->write_stat.io_stat.ops_completed;
		devinfo->write_bytes = mirror->write_stat.io_stat.bytes_completed;
		devinfo->layout_type = LAYOUT_FLEX_FILES;
		devinfo->layoutstats_encode = ff_layout_encode_layoutstats;
		devinfo->layout_private = mirror;
		/* lseg refcount put in cleanup_layoutstats */
		pnfs_get_lseg(pls);

		++(*dev_count);
	}

	return *dev_count < dev_limit;
}

static int
ff_layout_prepare_layoutstats(struct nfs42_layoutstat_args *args)
{
	struct pnfs_layout_segment *pls;
	int dev_count = 0;

	spin_lock(&args->inode->i_lock);
	list_for_each_entry(pls, &NFS_I(args->inode)->layout->plh_segs, pls_list) {
		dev_count += FF_LAYOUT_MIRROR_COUNT(pls);
	}
	spin_unlock(&args->inode->i_lock);
	/* For now, send at most PNFS_LAYOUTSTATS_MAXDEV statistics */
	if (dev_count > PNFS_LAYOUTSTATS_MAXDEV) {
		dprintk("%s: truncating devinfo to limit (%d:%d)\n",
			__func__, dev_count, PNFS_LAYOUTSTATS_MAXDEV);
		dev_count = PNFS_LAYOUTSTATS_MAXDEV;
	}
	args->devinfo = kmalloc_array(dev_count, sizeof(*args->devinfo),
				      GFP_KERNEL);
	if (!args->devinfo)
		return -ENOMEM;

	dev_count = 0;
	spin_lock(&args->inode->i_lock);
	list_for_each_entry(pls, &NFS_I(args->inode)->layout->plh_segs, pls_list) {
		if (!ff_layout_mirror_prepare_stats(args, pls, &dev_count,
						    PNFS_LAYOUTSTATS_MAXDEV)) {
			break;
		}
	}
	spin_unlock(&args->inode->i_lock);
	args->num_dev = dev_count;

	return 0;
}

static void
ff_layout_cleanup_layoutstats(struct nfs42_layoutstat_data *data)
{
	struct nfs4_ff_layout_mirror *mirror;
	int i;

	for (i = 0; i < data->args.num_dev; i++) {
		mirror = data->args.devinfo[i].layout_private;
		data->args.devinfo[i].layout_private = NULL;
		pnfs_put_lseg(mirror->lseg);
	}
}

static struct pnfs_layoutdriver_type flexfilelayout_type = {
	.id			= LAYOUT_FLEX_FILES,
	.name			= "LAYOUT_FLEX_FILES",
	.owner			= THIS_MODULE,
	.alloc_layout_hdr	= ff_layout_alloc_layout_hdr,
	.free_layout_hdr	= ff_layout_free_layout_hdr,
	.alloc_lseg		= ff_layout_alloc_lseg,
	.free_lseg		= ff_layout_free_lseg,
	.pg_read_ops		= &ff_layout_pg_read_ops,
	.pg_write_ops		= &ff_layout_pg_write_ops,
	.get_ds_info		= ff_layout_get_ds_info,
	.free_deviceid_node	= ff_layout_free_deviceid_node,
	.mark_request_commit	= pnfs_layout_mark_request_commit,
	.clear_request_commit	= pnfs_generic_clear_request_commit,
	.scan_commit_lists	= pnfs_generic_scan_commit_lists,
	.recover_commit_reqs	= pnfs_generic_recover_commit_reqs,
	.commit_pagelist	= ff_layout_commit_pagelist,
	.read_pagelist		= ff_layout_read_pagelist,
	.write_pagelist		= ff_layout_write_pagelist,
	.alloc_deviceid_node    = ff_layout_alloc_deviceid_node,
	.encode_layoutreturn    = ff_layout_encode_layoutreturn,
	.sync			= pnfs_nfs_generic_sync,
	.prepare_layoutstats	= ff_layout_prepare_layoutstats,
	.cleanup_layoutstats	= ff_layout_cleanup_layoutstats,
};

static int __init nfs4flexfilelayout_init(void)
{
	printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Registering...\n",
	       __func__);
	return pnfs_register_layoutdriver(&flexfilelayout_type);
}

static void __exit nfs4flexfilelayout_exit(void)
{
	printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Unregistering...\n",
	       __func__);
	pnfs_unregister_layoutdriver(&flexfilelayout_type);
}

MODULE_ALIAS("nfs-layouttype4-4");

MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("The NFSv4 flexfile layout driver");

module_init(nfs4flexfilelayout_init);
module_exit(nfs4flexfilelayout_exit);