xref: /linux/fs/nfs/localio.c (revision 4037e28cd47e5a860ea23214024bcbe8a7585d81)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * NFS client support for local clients to bypass network stack
4  *
5  * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
6  * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
7  * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
8  * Copyright (C) 2024 NeilBrown <neilb@suse.de>
9  */
10 
11 #include <linux/module.h>
12 #include <linux/errno.h>
13 #include <linux/vfs.h>
14 #include <linux/file.h>
15 #include <linux/inet.h>
16 #include <linux/sunrpc/addr.h>
17 #include <linux/inetdevice.h>
18 #include <net/addrconf.h>
19 #include <linux/nfs_common.h>
20 #include <linux/nfslocalio.h>
21 #include <linux/bvec.h>
22 
23 #include <linux/nfs.h>
24 #include <linux/nfs_fs.h>
25 #include <linux/nfs_xdr.h>
26 
27 #include "internal.h"
28 #include "pnfs.h"
29 #include "nfstrace.h"
30 
31 #define NFSDBG_FACILITY		NFSDBG_VFS
32 
33 #define NFSLOCAL_MAX_IOS	3
34 
/*
 * Per-request state for a local (network-bypassing) READ or WRITE.
 * A request is expressed as up to NFSLOCAL_MAX_IOS iov_iter extents so
 * an O_DIRECT request can mix DIO-aligned and buffered extents.
 */
struct nfs_local_kiocb {
	struct kiocb		kiocb;
	struct bio_vec		*bvec;	/* maps hdr->page_array pages */
	struct nfs_pgio_header	*hdr;
	struct work_struct	work;
	void (*aio_complete_work)(struct work_struct *); /* deferred AIO completion body */
	struct nfsd_file	*localio;
	/* Begin mostly DIO-specific members */
	size_t                  end_len;	/* length of misaligned end extent */
	short int		end_iter_index;	/* iters[] index of end extent, -1 if none */
	short int		n_iters;	/* number of iters[] entries in use */
	bool			iter_is_dio_aligned[NFSLOCAL_MAX_IOS];
	loff_t                  offset[NFSLOCAL_MAX_IOS] ____cacheline_aligned;
	struct iov_iter		iters[NFSLOCAL_MAX_IOS];
	/* End mostly DIO-specific members */
};

/* State carried by a queued local COMMIT (vfs fsync) request */
struct nfs_local_fsync_ctx {
	struct nfsd_file	*localio;
	struct nfs_commit_data	*data;
	struct work_struct	work;
	struct completion	*done;	/* non-NULL only for FLUSH_SYNC callers */
};
58 
/* Global on/off switch, runtime-writable as a module parameter */
static bool localio_enabled __read_mostly = true;
module_param(localio_enabled, bool, 0644);

/* True once a successful probe recorded the server's net namespace */
static inline bool nfs_client_is_local(const struct nfs_client *clp)
{
	return !!rcu_access_pointer(clp->cl_uuid.net);
}

/* True when localio is usable for @clp: probed local AND not disabled */
bool nfs_server_is_local(const struct nfs_client *clp)
{
	return nfs_client_is_local(clp) && localio_enabled;
}
EXPORT_SYMBOL_GPL(nfs_server_is_local);
72 
73 /*
74  * UUID_IS_LOCAL XDR functions
75  */
76 
77 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
78 				     struct xdr_stream *xdr,
79 				     const void *data)
80 {
81 	const u8 *uuid = data;
82 
83 	encode_opaque_fixed(xdr, uuid, UUID_SIZE);
84 }
85 
86 static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
87 				   struct xdr_stream *xdr,
88 				   void *result)
89 {
90 	/* void return */
91 	return 0;
92 }
93 
/* Procedure table for the LOCALIO sideband RPC program */
static const struct rpc_procinfo nfs_localio_procedures[] = {
	[LOCALIOPROC_UUID_IS_LOCAL] = {
		.p_proc = LOCALIOPROC_UUID_IS_LOCAL,
		.p_encode = localio_xdr_enc_uuidargs,
		.p_decode = localio_xdr_dec_uuidres,
		.p_arglen = XDR_QUADLEN(UUID_SIZE),
		.p_replen = 0,	/* void reply */
		.p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
		.p_name = "UUID_IS_LOCAL",
	},
};

/* Per-procedure call counters consumed by the SUNRPC stats code */
static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
static const struct rpc_version nfslocalio_version1 = {
	.number			= 1,
	.nrprocs		= ARRAY_SIZE(nfs_localio_procedures),
	.procs			= nfs_localio_procedures,
	.counts			= nfs_localio_counts,
};

static const struct rpc_version *nfslocalio_version[] = {
       [1]			= &nfslocalio_version1,
};

/* Forward declaration: the program and its rpc_stat reference each other */
extern const struct rpc_program nfslocalio_program;
static struct rpc_stat		nfslocalio_rpcstat = { &nfslocalio_program };

const struct rpc_program nfslocalio_program = {
	.name			= "nfslocalio",
	.number			= NFS_LOCALIO_PROGRAM,
	.nrvers			= ARRAY_SIZE(nfslocalio_version),
	.version		= nfslocalio_version,
	.stats			= &nfslocalio_rpcstat,
};
128 
129 /*
130  * nfs_init_localioclient - Initialise an NFS localio client connection
131  */
132 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
133 {
134 	struct rpc_clnt *rpcclient_localio;
135 
136 	rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
137 						 &nfslocalio_program, 1);
138 
139 	dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
140 		__func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
141 		(IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
142 
143 	return rpcclient_localio;
144 }
145 
146 static bool nfs_server_uuid_is_local(struct nfs_client *clp)
147 {
148 	u8 uuid[UUID_SIZE];
149 	struct rpc_message msg = {
150 		.rpc_argp = &uuid,
151 	};
152 	struct rpc_clnt *rpcclient_localio;
153 	int status;
154 
155 	rpcclient_localio = nfs_init_localioclient(clp);
156 	if (IS_ERR(rpcclient_localio))
157 		return false;
158 
159 	export_uuid(uuid, &clp->cl_uuid.uuid);
160 
161 	msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
162 	status = rpc_call_sync(rpcclient_localio, &msg, 0);
163 	dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
164 		__func__, status);
165 	rpc_shutdown_client(rpcclient_localio);
166 
167 	/* Server is only local if it initialized required struct members */
168 	if (status || !rcu_access_pointer(clp->cl_uuid.net) || !clp->cl_uuid.dom)
169 		return false;
170 
171 	return true;
172 }
173 
/*
 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
 * - called after alloc_client and init_client (so cl_rpcclient exists)
 * - this function is idempotent, it can be called for old or new clients
 */
static void nfs_local_probe(struct nfs_client *clp)
{
	/* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
	if (!localio_enabled ||
	    clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
		nfs_localio_disable_client(clp);
		return;
	}

	/* Already known to be local: nothing more to do */
	if (nfs_client_is_local(clp))
		return;

	/* NOTE(review): nfs_uuid_begin() presumably fails when another
	 * probe of the same client is already in flight -- confirm.
	 */
	if (!nfs_uuid_begin(&clp->cl_uuid))
		return;
	if (nfs_server_uuid_is_local(clp))
		nfs_localio_enable_client(clp);
	nfs_uuid_end(&clp->cl_uuid);
}
197 
198 void nfs_local_probe_async_work(struct work_struct *work)
199 {
200 	struct nfs_client *clp =
201 		container_of(work, struct nfs_client, cl_local_probe_work);
202 
203 	if (!refcount_inc_not_zero(&clp->cl_count))
204 		return;
205 	nfs_local_probe(clp);
206 	nfs_put_client(clp);
207 }
208 
/*
 * Kick an asynchronous LOCALIO probe for @clp; the work item runs
 * nfs_local_probe_async_work() on the nfsiod workqueue.
 */
void nfs_local_probe_async(struct nfs_client *clp)
{
	queue_work(nfsiod_workqueue, &clp->cl_local_probe_work);
}
EXPORT_SYMBOL_GPL(nfs_local_probe_async);
214 
/* Drop a reference on a local nfsd_file obtained via nfs_local_open_fh() */
static inline void nfs_local_file_put(struct nfsd_file *localio)
{
	/* nfs_to_nfsd_file_put_local() expects an __rcu pointer
	 * but we have a __kernel pointer.  It is always safe
	 * to cast a __kernel pointer to an __rcu pointer
	 * because the cast only weakens what is known about the pointer.
	 */
	struct nfsd_file __rcu *nf = (struct nfsd_file __rcu*) localio;

	nfs_to_nfsd_file_put_local(&nf);
}
226 
227 /*
228  * __nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
229  *
230  * Returns a pointer to a struct nfsd_file or ERR_PTR.
231  * Caller must release returned nfsd_file with nfs_to_nfsd_file_put_local().
232  */
233 static struct nfsd_file *
234 __nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
235 		    struct nfs_fh *fh, struct nfs_file_localio *nfl,
236 		    struct nfsd_file __rcu **pnf,
237 		    const fmode_t mode)
238 {
239 	int status = 0;
240 	struct nfsd_file *localio;
241 
242 	localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
243 				    cred, fh, nfl, pnf, mode);
244 	if (IS_ERR(localio)) {
245 		status = PTR_ERR(localio);
246 		switch (status) {
247 		case -ENOMEM:
248 		case -ENXIO:
249 		case -ENOENT:
250 			/* Revalidate localio */
251 			nfs_localio_disable_client(clp);
252 			nfs_local_probe(clp);
253 		}
254 	}
255 	trace_nfs_local_open_fh(fh, mode, status);
256 	return localio;
257 }
258 
259 /*
260  * nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
261  * First checking if the open nfsd_file is already cached, otherwise
262  * must __nfs_local_open_fh and insert the nfsd_file in nfs_file_localio.
263  *
264  * Returns a pointer to a struct nfsd_file or NULL.
265  */
266 struct nfsd_file *
267 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
268 		  struct nfs_fh *fh, struct nfs_file_localio *nfl,
269 		  const fmode_t mode)
270 {
271 	struct nfsd_file *nf, __rcu **pnf;
272 
273 	if (!nfs_server_is_local(clp))
274 		return NULL;
275 	if (mode & ~(FMODE_READ | FMODE_WRITE))
276 		return NULL;
277 
278 	if (mode & FMODE_WRITE)
279 		pnf = &nfl->rw_file;
280 	else
281 		pnf = &nfl->ro_file;
282 
283 	nf = __nfs_local_open_fh(clp, cred, fh, nfl, pnf, mode);
284 	if (IS_ERR(nf))
285 		return NULL;
286 	return nf;
287 }
288 EXPORT_SYMBOL_GPL(nfs_local_open_fh);
289 
/* Free an iocb together with its page-vector mapping array */
static void
nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
{
	kfree(iocb->bvec);
	kfree(iocb);
}
296 
297 static struct nfs_local_kiocb *
298 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
299 		     struct file *file, gfp_t flags)
300 {
301 	struct nfs_local_kiocb *iocb;
302 
303 	iocb = kzalloc(sizeof(*iocb), flags);
304 	if (iocb == NULL)
305 		return NULL;
306 
307 	iocb->bvec = kmalloc_array(hdr->page_array.npages,
308 				   sizeof(struct bio_vec), flags);
309 	if (iocb->bvec == NULL) {
310 		kfree(iocb);
311 		return NULL;
312 	}
313 
314 	init_sync_kiocb(&iocb->kiocb, file);
315 
316 	iocb->hdr = hdr;
317 	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
318 	iocb->aio_complete_work = NULL;
319 
320 	iocb->end_iter_index = -1;
321 
322 	return iocb;
323 }
324 
/*
 * Decide whether this request can (at least partly) use direct I/O,
 * based on the alignment constraints reported by the underlying nfsd
 * file.  On success, @local_dio describes up to three extents: a
 * misaligned start, a DIO-capable middle, and a misaligned end.
 */
static bool
nfs_is_local_dio_possible(struct nfs_local_kiocb *iocb, int rw,
			  size_t len, struct nfs_local_dio *local_dio)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	loff_t offset = hdr->args.offset;
	u32 nf_dio_mem_align, nf_dio_offset_align, nf_dio_read_offset_align;
	loff_t start_end, orig_end, middle_end;

	nfs_to->nfsd_file_dio_alignment(iocb->localio, &nf_dio_mem_align,
			&nf_dio_offset_align, &nf_dio_read_offset_align);
	/* Reads may carry a different offset-alignment constraint */
	if (rw == ITER_DEST)
		nf_dio_offset_align = nf_dio_read_offset_align;

	/* No DIO when the file reports no alignment at all */
	if (unlikely(!nf_dio_mem_align || !nf_dio_offset_align))
		return false;
	if (unlikely(nf_dio_offset_align > PAGE_SIZE))
		return false;
	/* Too short to contain even one aligned unit */
	if (unlikely(len < nf_dio_offset_align))
		return false;

	local_dio->mem_align = nf_dio_mem_align;
	local_dio->offset_align = nf_dio_offset_align;

	/* [offset, start_end)     = misaligned head
	 * [start_end, middle_end) = aligned middle
	 * [middle_end, orig_end)  = misaligned tail
	 */
	start_end = round_up(offset, nf_dio_offset_align);
	orig_end = offset + len;
	middle_end = round_down(orig_end, nf_dio_offset_align);

	local_dio->middle_offset = start_end;
	local_dio->end_offset = middle_end;

	local_dio->start_len = start_end - offset;
	local_dio->middle_len = middle_end - start_end;
	local_dio->end_len = orig_end - middle_end;

	if (rw == ITER_DEST)
		trace_nfs_local_dio_read(hdr->inode, offset, len, local_dio);
	else
		trace_nfs_local_dio_write(hdr->inode, offset, len, local_dio);
	return true;
}
366 
367 static bool nfs_iov_iter_aligned_bvec(const struct iov_iter *i,
368 		unsigned int addr_mask, unsigned int len_mask)
369 {
370 	const struct bio_vec *bvec = i->bvec;
371 	size_t skip = i->iov_offset;
372 	size_t size = i->count;
373 
374 	if (size & len_mask)
375 		return false;
376 	do {
377 		size_t len = bvec->bv_len;
378 
379 		if (len > size)
380 			len = size;
381 		if ((unsigned long)(bvec->bv_offset + skip) & addr_mask)
382 			return false;
383 		bvec++;
384 		size -= len;
385 		skip = 0;
386 	} while (size);
387 
388 	return true;
389 }
390 
/*
 * Setup as many as 3 iov_iter based on extents described by @local_dio.
 * Returns the number of iov_iter that were setup.
 */
static int
nfs_local_iters_setup_dio(struct nfs_local_kiocb *iocb, int rw,
			  unsigned int nvecs, size_t len,
			  struct nfs_local_dio *local_dio)
{
	int n_iters = 0;
	struct iov_iter *iters = iocb->iters;

	/* Setup misaligned start? */
	if (local_dio->start_len) {
		/* Iter over the whole bvec, then clamp to the head extent */
		iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
		iters[n_iters].count = local_dio->start_len;
		iocb->offset[n_iters] = iocb->hdr->args.offset;
		iocb->iter_is_dio_aligned[n_iters] = false;
		++n_iters;
	}

	/* Setup misaligned end?
	 * If so, the end is purposely setup to be issued using buffered IO
	 * before the middle (which will use DIO, if DIO-aligned, with AIO).
	 * This creates problems if/when the end results in a partial write.
	 * So must save index and length of end to handle this corner case.
	 */
	if (local_dio->end_len) {
		iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
		iocb->offset[n_iters] = local_dio->end_offset;
		/* Skip past head+middle so only the tail remains */
		iov_iter_advance(&iters[n_iters],
			local_dio->start_len + local_dio->middle_len);
		iocb->iter_is_dio_aligned[n_iters] = false;
		/* Save index and length of end */
		iocb->end_iter_index = n_iters;
		iocb->end_len = local_dio->end_len;
		++n_iters;
	}

	/* Setup DIO-aligned middle to be issued last, to allow for
	 * DIO with AIO completion (see nfs_local_call_{read,write}).
	 */
	iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
	if (local_dio->start_len)
		iov_iter_advance(&iters[n_iters], local_dio->start_len);
	iters[n_iters].count -= local_dio->end_len;
	iocb->offset[n_iters] = local_dio->middle_offset;

	/* Memory alignment of the middle decides whether DIO is usable */
	iocb->iter_is_dio_aligned[n_iters] =
		nfs_iov_iter_aligned_bvec(&iters[n_iters],
			local_dio->mem_align-1, local_dio->offset_align-1);

	if (unlikely(!iocb->iter_is_dio_aligned[n_iters])) {
		trace_nfs_local_dio_misaligned(iocb->hdr->inode,
			iocb->hdr->args.offset, len, local_dio);
		return 0; /* no DIO-aligned IO possible */
	}
	++n_iters;

	iocb->n_iters = n_iters;
	return n_iters;
}
453 
/*
 * Map the pgio header's page array into iocb->bvec and initialize the
 * request's iov_iter(s).  O_DIRECT requests are split into up to three
 * extents when alignment permits; otherwise one buffered iter covers
 * the whole range.
 */
static noinline_for_stack void
nfs_local_iters_init(struct nfs_local_kiocb *iocb, int rw)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct page **pagevec = hdr->page_array.pagevec;
	unsigned long v, total;
	unsigned int base;
	size_t len;

	/* Fill one bvec per (partial) page of the request */
	v = 0;
	total = hdr->args.count;
	base = hdr->args.pgbase;
	while (total && v < hdr->page_array.npages) {
		len = min_t(size_t, total, PAGE_SIZE - base);
		bvec_set_page(&iocb->bvec[v], *pagevec, len, base);
		total -= len;
		++pagevec;
		++v;
		base = 0;	/* only the first page carries pgbase */
	}
	/* len = bytes actually mapped (can be < args.count if pages ran out) */
	len = hdr->args.count - total;

	if (test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
		struct nfs_local_dio local_dio;

		if (nfs_is_local_dio_possible(iocb, rw, len, &local_dio) &&
		    nfs_local_iters_setup_dio(iocb, rw, v, len, &local_dio) != 0)
			return; /* is DIO-aligned */
	}

	/* Use buffered IO */
	iocb->offset[0] = hdr->args.offset;
	iov_iter_bvec(&iocb->iters[0], rw, iocb->bvec, v, len);
	iocb->n_iters = 1;
}
489 
/* Complete the request: invoke the RPC done and release callbacks */
static void
nfs_local_hdr_release(struct nfs_pgio_header *hdr,
		const struct rpc_call_ops *call_ops)
{
	call_ops->rpc_call_done(&hdr->task, hdr);
	call_ops->rpc_release(hdr);
}

/* Prepare @hdr's embedded task to masquerade as an RPC for @call_ops */
static void
nfs_local_pgio_init(struct nfs_pgio_header *hdr,
		const struct rpc_call_ops *call_ops)
{
	hdr->task.tk_ops = call_ops;
	if (!hdr->task.tk_start)
		hdr->task.tk_start = ktime_get();
}

/* Fold one iter's result into @hdr; called once per issued extent */
static void
nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
{
	/* Must handle partial completions */
	if (status >= 0) {
		hdr->res.count += status;
		/* @hdr was initialized to 0 (zeroed during allocation) */
		if (hdr->task.tk_status == 0)
			hdr->res.op_status = NFS4_OK;
	} else {
		hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
		hdr->task.tk_status = status;
	}
}
521 
/* Drop the nfsd_file reference and free the iocb */
static void
nfs_local_iocb_release(struct nfs_local_kiocb *iocb)
{
	nfs_local_file_put(iocb->localio);
	nfs_local_iocb_free(iocb);
}

/* Tear down the iocb, then run the pgio completion callbacks */
static void
nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
{
	struct nfs_pgio_header *hdr = iocb->hdr;

	nfs_local_iocb_release(iocb);
	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
}

/*
 * Complete the I/O from iocb->kiocb.ki_complete()
 *
 * Note that this function can be called from a bottom half context,
 * hence we need to queue the rpc_call_done() etc to a workqueue
 */
static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
{
	INIT_WORK(&iocb->work, iocb->aio_complete_work);
	queue_work(nfsiod_workqueue, &iocb->work);
}
549 
/* Post-process a completed local READ: fix up replen and compute EOF */
static void
nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct file *filp = iocb->kiocb.ki_filp;

	if ((iocb->kiocb.ki_flags & IOCB_DIRECT) && status == -EINVAL) {
		/* Underlying FS will return -EINVAL if misaligned DIO is attempted. */
		pr_info_ratelimited("nfs: Unexpected direct I/O read alignment failure\n");
	}

	/*
	 * Must clear replen otherwise NFSv3 data corruption will occur
	 * if/when switching from LOCALIO back to using normal RPC.
	 */
	hdr->res.replen = 0;

	/* A short read, or a read that reaches i_size, implies EOF */
	if (hdr->res.count != hdr->args.count ||
	    hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
		hdr->res.eof = true;

	dprintk("%s: read %ld bytes eof %d.\n", __func__,
			status > 0 ? status : 0, hdr->res.eof);
}
574 
/* Deferred (workqueue) half of AIO read completion */
static void nfs_local_read_aio_complete_work(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);

	nfs_local_pgio_release(iocb);
}

/* ki_complete callback for direct reads; may run in bottom-half context */
static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
{
	struct nfs_local_kiocb *iocb =
		container_of(kiocb, struct nfs_local_kiocb, kiocb);

	nfs_local_pgio_done(iocb->hdr, ret);
	nfs_local_read_done(iocb, ret);
	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
}
592 
/*
 * Issue the request's read iters in order.  A DIO-aligned iter (always
 * last, per nfs_local_iters_setup_dio()) switches to IOCB_DIRECT with
 * AIO completion: if it returns -EIOCBQUEUED, completion and release
 * happen via nfs_local_read_aio_complete() instead of here.
 */
static void nfs_local_call_read(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);
	struct file *filp = iocb->kiocb.ki_filp;
	ssize_t status;	/* n_iters >= 1, so the loop always sets this */

	scoped_with_creds(filp->f_cred) {
		for (int i = 0; i < iocb->n_iters ; i++) {
			if (iocb->iter_is_dio_aligned[i]) {
				iocb->kiocb.ki_flags |= IOCB_DIRECT;
				iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
				iocb->aio_complete_work = nfs_local_read_aio_complete_work;
			}

			iocb->kiocb.ki_pos = iocb->offset[i];
			status = filp->f_op->read_iter(&iocb->kiocb, &iocb->iters[i]);
			if (status != -EIOCBQUEUED) {
				nfs_local_pgio_done(iocb->hdr, status);
				if (iocb->hdr->task.tk_status)
					break;
			}
		}
	}

	if (status != -EIOCBQUEUED) {
		nfs_local_read_done(iocb, status);
		nfs_local_pgio_release(iocb);
	}
}
623 
624 static int
625 nfs_local_do_read(struct nfs_local_kiocb *iocb,
626 		  const struct rpc_call_ops *call_ops)
627 {
628 	struct nfs_pgio_header *hdr = iocb->hdr;
629 
630 	dprintk("%s: vfs_read count=%u pos=%llu\n",
631 		__func__, hdr->args.count, hdr->args.offset);
632 
633 	nfs_local_pgio_init(hdr, call_ops);
634 	hdr->res.eof = false;
635 
636 	INIT_WORK(&iocb->work, nfs_local_call_read);
637 	queue_work(nfslocaliod_workqueue, &iocb->work);
638 
639 	return 0;
640 }
641 
/*
 * Copy the client's boot verifier under the cl_boot_lock seqlock,
 * retrying if a concurrent nfs_reset_boot_verifier() raced with us.
 */
static void
nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
{
	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
	u32 *verf = (u32 *)verifier->data;
	unsigned int seq;

	do {
		seq = read_seqbegin(&clp->cl_boot_lock);
		verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
		verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
	} while (read_seqretry(&clp->cl_boot_lock, seq));
}

/*
 * Change the boot verifier (called on failed writes/commits) so that
 * previously-unstable data is known to need resending.
 */
static void
nfs_reset_boot_verifier(struct inode *inode)
{
	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;

	write_seqlock(&clp->cl_boot_lock);
	ktime_get_real_ts64(&clp->cl_nfssvc_boot);
	write_sequnlock(&clp->cl_boot_lock);
}

/* Fill a write verifier and record the achieved stability level */
static void
nfs_set_local_verifier(struct inode *inode,
		struct nfs_writeverf *verf,
		enum nfs3_stable_how how)
{
	nfs_copy_boot_verifier(&verf->verifier, inode);
	verf->committed = how;
}
674 
675 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */
676 static int __vfs_getattr(const struct path *p, struct kstat *stat, int version)
677 {
678 	u32 request_mask = STATX_BASIC_STATS;
679 
680 	if (version == 4)
681 		request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
682 	return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
683 }
684 
685 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
686 static u64 __nfsd4_change_attribute(const struct kstat *stat,
687 				    const struct inode *inode)
688 {
689 	u64 chattr;
690 
691 	if (stat->result_mask & STATX_CHANGE_COOKIE) {
692 		chattr = stat->change_cookie;
693 		if (S_ISREG(inode->i_mode) &&
694 		    !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
695 			chattr += (u64)stat->ctime.tv_sec << 30;
696 			chattr += stat->ctime.tv_nsec;
697 		}
698 	} else {
699 		chattr = time_to_chattr(&stat->ctime);
700 	}
701 	return chattr;
702 }
703 
/* Refresh hdr->res.fattr from the local file via vfs_getattr() */
static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
{
	struct kstat stat;
	struct file *filp = iocb->kiocb.ki_filp;
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct nfs_fattr *fattr = hdr->res.fattr;
	int version = NFS_PROTO(hdr->inode)->version;

	/* Best-effort: silently skip if there is no fattr or getattr fails */
	if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
		return;

	fattr->valid = (NFS_ATTR_FATTR_FILEID |
			NFS_ATTR_FATTR_CHANGE |
			NFS_ATTR_FATTR_SIZE |
			NFS_ATTR_FATTR_ATIME |
			NFS_ATTR_FATTR_MTIME |
			NFS_ATTR_FATTR_CTIME |
			NFS_ATTR_FATTR_SPACE_USED);

	fattr->fileid = stat.ino;
	fattr->size = stat.size;
	fattr->atime = stat.atime;
	fattr->mtime = stat.mtime;
	fattr->ctime = stat.ctime;
	if (version == 4) {
		fattr->change_attr =
			__nfsd4_change_attribute(&stat, file_inode(filp));
	} else
		fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
	/* stat.blocks is in 512-byte units; convert to bytes */
	fattr->du.nfs3.used = stat.blocks << 9;
}
735 
/*
 * Post-process a completed local WRITE.  A short write is turned into
 * -ENOSPC after advancing the request past the bytes that did land, so
 * higher layers retry/report correctly; any failure resets the boot
 * verifier so unstable writes get resent.
 */
static void
nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct inode *inode = hdr->inode;

	dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);

	if ((iocb->kiocb.ki_flags & IOCB_DIRECT) && status == -EINVAL) {
		/* Underlying FS will return -EINVAL if misaligned DIO is attempted. */
		pr_info_ratelimited("nfs: Unexpected direct I/O write alignment failure\n");
	}

	/* Handle short writes as if they are ENOSPC */
	status = hdr->res.count;
	if (status > 0 && status < hdr->args.count) {
		hdr->mds_offset += status;
		hdr->args.offset += status;
		hdr->args.pgbase += status;
		hdr->args.count -= status;
		nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
		status = -ENOSPC;
		/* record -ENOSPC in terms of nfs_local_pgio_done */
		nfs_local_pgio_done(hdr, status);
	}
	if (hdr->task.tk_status < 0)
		nfs_reset_boot_verifier(inode);
}
764 
/* Deferred (workqueue) half of AIO write completion: getattr + release */
static void nfs_local_write_aio_complete_work(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);

	nfs_local_vfs_getattr(iocb);
	nfs_local_pgio_release(iocb);
}

/* ki_complete callback for direct writes; may run in bottom-half context */
static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
{
	struct nfs_local_kiocb *iocb =
		container_of(kiocb, struct nfs_local_kiocb, kiocb);

	nfs_local_pgio_done(iocb->hdr, ret);
	nfs_local_write_done(iocb, ret);
	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
}
783 
/*
 * Issue the request's write iters in order, under file_start_write().
 * Non-trivial cases:
 * - a partial write of the end extent is deliberately NOT accounted
 *   (status forced to 0): the end is issued before the middle, so
 *   accounting it would make nfs_local_write_done() advance
 *   hdr->args.offset past data that was never written;
 * - a partial write elsewhere stops the loop early;
 * - -ENOTBLK from a DIO write retries the same extent using buffered IO.
 * Returns -EIOCBQUEUED when the final DIO extent completes via AIO.
 */
static ssize_t do_nfs_local_call_write(struct nfs_local_kiocb *iocb,
				       struct file *filp)
{
	ssize_t status;

	file_start_write(filp);
	for (int i = 0; i < iocb->n_iters ; i++) {
		if (iocb->iter_is_dio_aligned[i]) {
			iocb->kiocb.ki_flags |= IOCB_DIRECT;
			iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
			iocb->aio_complete_work = nfs_local_write_aio_complete_work;
		}
retry:
		iocb->kiocb.ki_pos = iocb->offset[i];
		status = filp->f_op->write_iter(&iocb->kiocb, &iocb->iters[i]);
		if (status != -EIOCBQUEUED) {
			if (unlikely(status >= 0 && status < iocb->iters[i].count)) {
				/* partial write */
				if (i == iocb->end_iter_index) {
					/* Must not account partial end, otherwise, due
					 * to end being issued before middle: the partial
					 * write accounting in nfs_local_write_done()
					 * would incorrectly advance hdr->args.offset
					 */
					status = 0;
				} else {
					/* Partial write at start or buffered middle,
					 * exit early.
					 */
					nfs_local_pgio_done(iocb->hdr, status);
					break;
				}
			} else if (unlikely(status == -ENOTBLK &&
					    (iocb->kiocb.ki_flags & IOCB_DIRECT))) {
				/* VFS will return -ENOTBLK if DIO WRITE fails to
				 * invalidate the page cache. Retry using buffered IO.
				 */
				iocb->kiocb.ki_flags &= ~IOCB_DIRECT;
				iocb->kiocb.ki_complete = NULL;
				iocb->aio_complete_work = NULL;
				goto retry;
			}
			nfs_local_pgio_done(iocb->hdr, status);
			if (iocb->hdr->task.tk_status)
				break;
		}
	}
	file_end_write(filp);

	return status;
}
835 
/*
 * Workqueue entry for local writes.  PF_LOCAL_THROTTLE and
 * PF_MEMALLOC_NOIO are set for the duration of the write so reclaim
 * and writeback throttling cannot recurse into this IO path
 * (NOTE(review): presumably the same scheme nfsd/loop use -- confirm).
 */
static void nfs_local_call_write(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);
	struct file *filp = iocb->kiocb.ki_filp;
	unsigned long old_flags = current->flags;
	ssize_t status;

	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;

	scoped_with_creds(filp->f_cred)
		status = do_nfs_local_call_write(iocb, filp);

	current->flags = old_flags;

	/* -EIOCBQUEUED: a DIO extent will complete via the AIO callbacks */
	if (status != -EIOCBQUEUED) {
		nfs_local_write_done(iocb, status);
		nfs_local_vfs_getattr(iocb);
		nfs_local_pgio_release(iocb);
	}
}
857 
858 static int
859 nfs_local_do_write(struct nfs_local_kiocb *iocb,
860 		   const struct rpc_call_ops *call_ops)
861 {
862 	struct nfs_pgio_header *hdr = iocb->hdr;
863 
864 	dprintk("%s: vfs_write count=%u pos=%llu %s\n",
865 		__func__, hdr->args.count, hdr->args.offset,
866 		(hdr->args.stable == NFS_UNSTABLE) ?  "unstable" : "stable");
867 
868 	switch (hdr->args.stable) {
869 	default:
870 		break;
871 	case NFS_DATA_SYNC:
872 		iocb->kiocb.ki_flags |= IOCB_DSYNC;
873 		break;
874 	case NFS_FILE_SYNC:
875 		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
876 	}
877 
878 	nfs_local_pgio_init(hdr, call_ops);
879 
880 	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
881 
882 	INIT_WORK(&iocb->work, nfs_local_call_write);
883 	queue_work(nfslocaliod_workqueue, &iocb->work);
884 
885 	return 0;
886 }
887 
888 static struct nfs_local_kiocb *
889 nfs_local_iocb_init(struct nfs_pgio_header *hdr, struct nfsd_file *localio)
890 {
891 	struct file *file = nfs_to->nfsd_file_file(localio);
892 	struct nfs_local_kiocb *iocb;
893 	gfp_t gfp_mask;
894 	int rw;
895 
896 	if (hdr->rw_mode & FMODE_READ) {
897 		if (!file->f_op->read_iter)
898 			return ERR_PTR(-EOPNOTSUPP);
899 		gfp_mask = GFP_KERNEL;
900 		rw = ITER_DEST;
901 	} else {
902 		if (!file->f_op->write_iter)
903 			return ERR_PTR(-EOPNOTSUPP);
904 		gfp_mask = GFP_NOIO;
905 		rw = ITER_SOURCE;
906 	}
907 
908 	iocb = nfs_local_iocb_alloc(hdr, file, gfp_mask);
909 	if (iocb == NULL)
910 		return ERR_PTR(-ENOMEM);
911 	iocb->hdr = hdr;
912 	iocb->localio = localio;
913 
914 	nfs_local_iters_init(iocb, rw);
915 
916 	return iocb;
917 }
918 
/*
 * nfs_local_doio - issue a localio READ or WRITE described by @hdr.
 *
 * A zero-length request completes immediately with 0.  On error the
 * iocb is released here, @call_ops completion is invoked, and the
 * status is returned; -EAGAIN additionally disables localio for @clp
 * (forcing revalidation / fallback to normal RPC).
 */
int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
		   struct nfs_pgio_header *hdr,
		   const struct rpc_call_ops *call_ops)
{
	struct nfs_local_kiocb *iocb;
	int status = 0;

	if (!hdr->args.count)
		return 0;

	iocb = nfs_local_iocb_init(hdr, localio);
	if (IS_ERR(iocb))
		return PTR_ERR(iocb);

	switch (hdr->rw_mode) {
	case FMODE_READ:
		status = nfs_local_do_read(iocb, call_ops);
		break;
	case FMODE_WRITE:
		status = nfs_local_do_write(iocb, call_ops);
		break;
	default:
		dprintk("%s: invalid mode: %d\n", __func__,
			hdr->rw_mode);
		status = -EOPNOTSUPP;
	}

	if (status != 0) {
		if (status == -EAGAIN)
			nfs_localio_disable_client(clp);
		nfs_local_iocb_release(iocb);
		hdr->task.tk_status = status;
		nfs_local_hdr_release(hdr, call_ops);
	}
	return status;
}
955 
/* Attach @call_ops so commit completion mirrors an RPC task */
static void
nfs_local_init_commit(struct nfs_commit_data *data,
		const struct rpc_call_ops *call_ops)
{
	data->task.tk_ops = call_ops;
}

/* fsync the byte range the COMMIT args describe (count 0 => to EOF) */
static int
nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
{
	loff_t start = data->args.offset;
	loff_t end = LLONG_MAX;

	if (data->args.count > 0) {
		end = start + data->args.count - 1;
		/* offset+count overflowed: treat as "to end of file" */
		if (end < start)
			end = LLONG_MAX;
	}

	dprintk("%s: commit %llu - %llu\n", __func__, start, end);
	return vfs_fsync_range(filp, start, end, 0);
}

/* Translate the fsync result into NFS commit results and verifier */
static void
nfs_local_commit_done(struct nfs_commit_data *data, int status)
{
	if (status >= 0) {
		nfs_set_local_verifier(data->inode,
				data->res.verf,
				NFS_FILE_SYNC);
		data->res.op_status = NFS4_OK;
		data->task.tk_status = 0;
	} else {
		/* Failed commit: bump the boot verifier to force resends */
		nfs_reset_boot_verifier(data->inode);
		data->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
		data->task.tk_status = status;
	}
}

/* Drop the file reference and run the commit completion callbacks */
static void
nfs_local_release_commit_data(struct nfsd_file *localio,
		struct nfs_commit_data *data,
		const struct rpc_call_ops *call_ops)
{
	nfs_local_file_put(localio);
	call_ops->rpc_call_done(&data->task, data);
	call_ops->rpc_release(data);
}

/* Release everything a commit context holds, then free it */
static void
nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
{
	nfs_local_release_commit_data(ctx->localio, ctx->data,
				      ctx->data->task.tk_ops);
	kfree(ctx);
}
1012 
/* Workqueue entry for a local COMMIT: fsync, complete, signal, free */
static void
nfs_local_fsync_work(struct work_struct *work)
{
	struct nfs_local_fsync_ctx *ctx;
	int status;

	ctx = container_of(work, struct nfs_local_fsync_ctx, work);

	status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
				      ctx->data);
	nfs_local_commit_done(ctx->data, status);
	/* ctx->done is only set by FLUSH_SYNC callers; signal before free */
	if (ctx->done != NULL)
		complete(ctx->done);
	nfs_local_fsync_ctx_free(ctx);
}
1028 
1029 static struct nfs_local_fsync_ctx *
1030 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
1031 			  struct nfsd_file *localio, gfp_t flags)
1032 {
1033 	struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
1034 
1035 	if (ctx != NULL) {
1036 		ctx->localio = localio;
1037 		ctx->data = data;
1038 		INIT_WORK(&ctx->work, nfs_local_fsync_work);
1039 		ctx->done = NULL;
1040 	}
1041 	return ctx;
1042 }
1043 
/*
 * nfs_local_commit - perform a COMMIT as a local vfs_fsync_range().
 *
 * The fsync runs from the nfsiod workqueue.  For FLUSH_SYNC the caller
 * blocks on an on-stack completion until the work finishes (the work
 * signals ctx->done before freeing ctx, so the stack lifetime is safe).
 * On allocation failure the commit is completed with -ENOMEM.
 */
int nfs_local_commit(struct nfsd_file *localio,
		     struct nfs_commit_data *data,
		     const struct rpc_call_ops *call_ops, int how)
{
	struct nfs_local_fsync_ctx *ctx;

	ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
	if (!ctx) {
		nfs_local_commit_done(data, -ENOMEM);
		nfs_local_release_commit_data(localio, data, call_ops);
		return -ENOMEM;
	}

	nfs_local_init_commit(data, call_ops);

	if (how & FLUSH_SYNC) {
		DECLARE_COMPLETION_ONSTACK(done);
		ctx->done = &done;
		queue_work(nfsiod_workqueue, &ctx->work);
		wait_for_completion(&done);
	} else
		queue_work(nfsiod_workqueue, &ctx->work);

	return 0;
}
1069