xref: /linux/fs/nfs/localio.c (revision 50647a1176b7abd1b4ae55b491eb2fbbeef89db9)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * NFS client support for local clients to bypass network stack
4  *
5  * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
6  * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
7  * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
8  * Copyright (C) 2024 NeilBrown <neilb@suse.de>
9  */
10 
11 #include <linux/module.h>
12 #include <linux/errno.h>
13 #include <linux/vfs.h>
14 #include <linux/file.h>
15 #include <linux/inet.h>
16 #include <linux/sunrpc/addr.h>
17 #include <linux/inetdevice.h>
18 #include <net/addrconf.h>
19 #include <linux/nfs_common.h>
20 #include <linux/nfslocalio.h>
21 #include <linux/bvec.h>
22 
23 #include <linux/nfs.h>
24 #include <linux/nfs_fs.h>
25 #include <linux/nfs_xdr.h>
26 
27 #include "internal.h"
28 #include "pnfs.h"
29 #include "nfstrace.h"
30 
31 #define NFSDBG_FACILITY		NFSDBG_VFS
32 
33 #define NFSLOCAL_MAX_IOS	3
34 
35 struct nfs_local_kiocb {
36 	struct kiocb		kiocb;
37 	struct bio_vec		*bvec;
38 	struct nfs_pgio_header	*hdr;
39 	struct work_struct	work;
40 	void (*aio_complete_work)(struct work_struct *);
41 	struct nfsd_file	*localio;
42 	/* Begin mostly DIO-specific members */
43 	size_t                  end_len;
44 	short int		end_iter_index;
45 	short int		n_iters;
46 	bool			iter_is_dio_aligned[NFSLOCAL_MAX_IOS];
47 	loff_t                  offset[NFSLOCAL_MAX_IOS] ____cacheline_aligned;
48 	struct iov_iter		iters[NFSLOCAL_MAX_IOS];
49 	/* End mostly DIO-specific members */
50 };
51 
52 struct nfs_local_fsync_ctx {
53 	struct nfsd_file	*localio;
54 	struct nfs_commit_data	*data;
55 	struct work_struct	work;
56 	struct completion	*done;
57 };
58 
59 static bool localio_enabled __read_mostly = true;
60 module_param(localio_enabled, bool, 0644);
61 
nfs_client_is_local(const struct nfs_client * clp)62 static inline bool nfs_client_is_local(const struct nfs_client *clp)
63 {
64 	return !!rcu_access_pointer(clp->cl_uuid.net);
65 }
66 
nfs_server_is_local(const struct nfs_client * clp)67 bool nfs_server_is_local(const struct nfs_client *clp)
68 {
69 	return nfs_client_is_local(clp) && localio_enabled;
70 }
71 EXPORT_SYMBOL_GPL(nfs_server_is_local);
72 
73 /*
74  * UUID_IS_LOCAL XDR functions
75  */
76 
localio_xdr_enc_uuidargs(struct rpc_rqst * req,struct xdr_stream * xdr,const void * data)77 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
78 				     struct xdr_stream *xdr,
79 				     const void *data)
80 {
81 	const u8 *uuid = data;
82 
83 	encode_opaque_fixed(xdr, uuid, UUID_SIZE);
84 }
85 
/* UUID_IS_LOCAL has a void reply: nothing to decode */
static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
				   struct xdr_stream *xdr,
				   void *result)
{
	return 0;
}
93 
94 static const struct rpc_procinfo nfs_localio_procedures[] = {
95 	[LOCALIOPROC_UUID_IS_LOCAL] = {
96 		.p_proc = LOCALIOPROC_UUID_IS_LOCAL,
97 		.p_encode = localio_xdr_enc_uuidargs,
98 		.p_decode = localio_xdr_dec_uuidres,
99 		.p_arglen = XDR_QUADLEN(UUID_SIZE),
100 		.p_replen = 0,
101 		.p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
102 		.p_name = "UUID_IS_LOCAL",
103 	},
104 };
105 
106 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
107 static const struct rpc_version nfslocalio_version1 = {
108 	.number			= 1,
109 	.nrprocs		= ARRAY_SIZE(nfs_localio_procedures),
110 	.procs			= nfs_localio_procedures,
111 	.counts			= nfs_localio_counts,
112 };
113 
114 static const struct rpc_version *nfslocalio_version[] = {
115        [1]			= &nfslocalio_version1,
116 };
117 
118 extern const struct rpc_program nfslocalio_program;
119 static struct rpc_stat		nfslocalio_rpcstat = { &nfslocalio_program };
120 
121 const struct rpc_program nfslocalio_program = {
122 	.name			= "nfslocalio",
123 	.number			= NFS_LOCALIO_PROGRAM,
124 	.nrvers			= ARRAY_SIZE(nfslocalio_version),
125 	.version		= nfslocalio_version,
126 	.stats			= &nfslocalio_rpcstat,
127 };
128 
129 /*
130  * nfs_init_localioclient - Initialise an NFS localio client connection
131  */
nfs_init_localioclient(struct nfs_client * clp)132 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
133 {
134 	struct rpc_clnt *rpcclient_localio;
135 
136 	rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
137 						 &nfslocalio_program, 1);
138 
139 	dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
140 		__func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
141 		(IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
142 
143 	return rpcclient_localio;
144 }
145 
nfs_server_uuid_is_local(struct nfs_client * clp)146 static bool nfs_server_uuid_is_local(struct nfs_client *clp)
147 {
148 	u8 uuid[UUID_SIZE];
149 	struct rpc_message msg = {
150 		.rpc_argp = &uuid,
151 	};
152 	struct rpc_clnt *rpcclient_localio;
153 	int status;
154 
155 	rpcclient_localio = nfs_init_localioclient(clp);
156 	if (IS_ERR(rpcclient_localio))
157 		return false;
158 
159 	export_uuid(uuid, &clp->cl_uuid.uuid);
160 
161 	msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
162 	status = rpc_call_sync(rpcclient_localio, &msg, 0);
163 	dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
164 		__func__, status);
165 	rpc_shutdown_client(rpcclient_localio);
166 
167 	/* Server is only local if it initialized required struct members */
168 	if (status || !rcu_access_pointer(clp->cl_uuid.net) || !clp->cl_uuid.dom)
169 		return false;
170 
171 	return true;
172 }
173 
174 /*
175  * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
176  * - called after alloc_client and init_client (so cl_rpcclient exists)
177  * - this function is idempotent, it can be called for old or new clients
178  */
nfs_local_probe(struct nfs_client * clp)179 static void nfs_local_probe(struct nfs_client *clp)
180 {
181 	/* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
182 	if (!localio_enabled ||
183 	    clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
184 		nfs_localio_disable_client(clp);
185 		return;
186 	}
187 
188 	if (nfs_client_is_local(clp))
189 		return;
190 
191 	if (!nfs_uuid_begin(&clp->cl_uuid))
192 		return;
193 	if (nfs_server_uuid_is_local(clp))
194 		nfs_localio_enable_client(clp);
195 	nfs_uuid_end(&clp->cl_uuid);
196 }
197 
nfs_local_probe_async_work(struct work_struct * work)198 void nfs_local_probe_async_work(struct work_struct *work)
199 {
200 	struct nfs_client *clp =
201 		container_of(work, struct nfs_client, cl_local_probe_work);
202 
203 	if (!refcount_inc_not_zero(&clp->cl_count))
204 		return;
205 	nfs_local_probe(clp);
206 	nfs_put_client(clp);
207 }
208 
nfs_local_probe_async(struct nfs_client * clp)209 void nfs_local_probe_async(struct nfs_client *clp)
210 {
211 	queue_work(nfsiod_workqueue, &clp->cl_local_probe_work);
212 }
213 EXPORT_SYMBOL_GPL(nfs_local_probe_async);
214 
nfs_local_file_put(struct nfsd_file * localio)215 static inline void nfs_local_file_put(struct nfsd_file *localio)
216 {
217 	/* nfs_to_nfsd_file_put_local() expects an __rcu pointer
218 	 * but we have a __kernel pointer.  It is always safe
219 	 * to cast a __kernel pointer to an __rcu pointer
220 	 * because the cast only weakens what is known about the pointer.
221 	 */
222 	struct nfsd_file __rcu *nf = (struct nfsd_file __rcu*) localio;
223 
224 	nfs_to_nfsd_file_put_local(&nf);
225 }
226 
227 /*
228  * __nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
229  *
230  * Returns a pointer to a struct nfsd_file or ERR_PTR.
231  * Caller must release returned nfsd_file with nfs_to_nfsd_file_put_local().
232  */
233 static struct nfsd_file *
__nfs_local_open_fh(struct nfs_client * clp,const struct cred * cred,struct nfs_fh * fh,struct nfs_file_localio * nfl,struct nfsd_file __rcu ** pnf,const fmode_t mode)234 __nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
235 		    struct nfs_fh *fh, struct nfs_file_localio *nfl,
236 		    struct nfsd_file __rcu **pnf,
237 		    const fmode_t mode)
238 {
239 	int status = 0;
240 	struct nfsd_file *localio;
241 
242 	localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
243 				    cred, fh, nfl, pnf, mode);
244 	if (IS_ERR(localio)) {
245 		status = PTR_ERR(localio);
246 		switch (status) {
247 		case -ENOMEM:
248 		case -ENXIO:
249 		case -ENOENT:
250 			/* Revalidate localio */
251 			nfs_localio_disable_client(clp);
252 			nfs_local_probe(clp);
253 		}
254 	}
255 	trace_nfs_local_open_fh(fh, mode, status);
256 	return localio;
257 }
258 
259 /*
260  * nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
261  * First checking if the open nfsd_file is already cached, otherwise
262  * must __nfs_local_open_fh and insert the nfsd_file in nfs_file_localio.
263  *
264  * Returns a pointer to a struct nfsd_file or NULL.
265  */
266 struct nfsd_file *
nfs_local_open_fh(struct nfs_client * clp,const struct cred * cred,struct nfs_fh * fh,struct nfs_file_localio * nfl,const fmode_t mode)267 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
268 		  struct nfs_fh *fh, struct nfs_file_localio *nfl,
269 		  const fmode_t mode)
270 {
271 	struct nfsd_file *nf, __rcu **pnf;
272 
273 	if (!nfs_server_is_local(clp))
274 		return NULL;
275 	if (mode & ~(FMODE_READ | FMODE_WRITE))
276 		return NULL;
277 
278 	if (mode & FMODE_WRITE)
279 		pnf = &nfl->rw_file;
280 	else
281 		pnf = &nfl->ro_file;
282 
283 	nf = __nfs_local_open_fh(clp, cred, fh, nfl, pnf, mode);
284 	if (IS_ERR(nf))
285 		return NULL;
286 	return nf;
287 }
288 EXPORT_SYMBOL_GPL(nfs_local_open_fh);
289 
290 static void
nfs_local_iocb_free(struct nfs_local_kiocb * iocb)291 nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
292 {
293 	kfree(iocb->bvec);
294 	kfree(iocb);
295 }
296 
297 static struct nfs_local_kiocb *
nfs_local_iocb_alloc(struct nfs_pgio_header * hdr,struct file * file,gfp_t flags)298 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
299 		     struct file *file, gfp_t flags)
300 {
301 	struct nfs_local_kiocb *iocb;
302 
303 	iocb = kzalloc(sizeof(*iocb), flags);
304 	if (iocb == NULL)
305 		return NULL;
306 
307 	iocb->bvec = kmalloc_array(hdr->page_array.npages,
308 				   sizeof(struct bio_vec), flags);
309 	if (iocb->bvec == NULL) {
310 		kfree(iocb);
311 		return NULL;
312 	}
313 
314 	init_sync_kiocb(&iocb->kiocb, file);
315 
316 	iocb->hdr = hdr;
317 	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
318 	iocb->aio_complete_work = NULL;
319 
320 	iocb->end_iter_index = -1;
321 
322 	return iocb;
323 }
324 
325 static bool
nfs_is_local_dio_possible(struct nfs_local_kiocb * iocb,int rw,size_t len,struct nfs_local_dio * local_dio)326 nfs_is_local_dio_possible(struct nfs_local_kiocb *iocb, int rw,
327 			  size_t len, struct nfs_local_dio *local_dio)
328 {
329 	struct nfs_pgio_header *hdr = iocb->hdr;
330 	loff_t offset = hdr->args.offset;
331 	u32 nf_dio_mem_align, nf_dio_offset_align, nf_dio_read_offset_align;
332 	loff_t start_end, orig_end, middle_end;
333 
334 	nfs_to->nfsd_file_dio_alignment(iocb->localio, &nf_dio_mem_align,
335 			&nf_dio_offset_align, &nf_dio_read_offset_align);
336 	if (rw == ITER_DEST)
337 		nf_dio_offset_align = nf_dio_read_offset_align;
338 
339 	if (unlikely(!nf_dio_mem_align || !nf_dio_offset_align))
340 		return false;
341 	if (unlikely(nf_dio_offset_align > PAGE_SIZE))
342 		return false;
343 	if (unlikely(len < nf_dio_offset_align))
344 		return false;
345 
346 	local_dio->mem_align = nf_dio_mem_align;
347 	local_dio->offset_align = nf_dio_offset_align;
348 
349 	start_end = round_up(offset, nf_dio_offset_align);
350 	orig_end = offset + len;
351 	middle_end = round_down(orig_end, nf_dio_offset_align);
352 
353 	local_dio->middle_offset = start_end;
354 	local_dio->end_offset = middle_end;
355 
356 	local_dio->start_len = start_end - offset;
357 	local_dio->middle_len = middle_end - start_end;
358 	local_dio->end_len = orig_end - middle_end;
359 
360 	if (rw == ITER_DEST)
361 		trace_nfs_local_dio_read(hdr->inode, offset, len, local_dio);
362 	else
363 		trace_nfs_local_dio_write(hdr->inode, offset, len, local_dio);
364 	return true;
365 }
366 
nfs_iov_iter_aligned_bvec(const struct iov_iter * i,unsigned int addr_mask,unsigned int len_mask)367 static bool nfs_iov_iter_aligned_bvec(const struct iov_iter *i,
368 		unsigned int addr_mask, unsigned int len_mask)
369 {
370 	const struct bio_vec *bvec = i->bvec;
371 	size_t skip = i->iov_offset;
372 	size_t size = i->count;
373 
374 	if (size & len_mask)
375 		return false;
376 	do {
377 		size_t len = bvec->bv_len;
378 
379 		if (len > size)
380 			len = size;
381 		if ((unsigned long)(bvec->bv_offset + skip) & addr_mask)
382 			return false;
383 		bvec++;
384 		size -= len;
385 		skip = 0;
386 	} while (size);
387 
388 	return true;
389 }
390 
391 /*
392  * Setup as many as 3 iov_iter based on extents described by @local_dio.
393  * Returns the number of iov_iter that were setup.
394  */
395 static int
nfs_local_iters_setup_dio(struct nfs_local_kiocb * iocb,int rw,unsigned int nvecs,size_t len,struct nfs_local_dio * local_dio)396 nfs_local_iters_setup_dio(struct nfs_local_kiocb *iocb, int rw,
397 			  unsigned int nvecs, size_t len,
398 			  struct nfs_local_dio *local_dio)
399 {
400 	int n_iters = 0;
401 	struct iov_iter *iters = iocb->iters;
402 
403 	/* Setup misaligned start? */
404 	if (local_dio->start_len) {
405 		iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
406 		iters[n_iters].count = local_dio->start_len;
407 		iocb->offset[n_iters] = iocb->hdr->args.offset;
408 		iocb->iter_is_dio_aligned[n_iters] = false;
409 		++n_iters;
410 	}
411 
412 	/* Setup misaligned end?
413 	 * If so, the end is purposely setup to be issued using buffered IO
414 	 * before the middle (which will use DIO, if DIO-aligned, with AIO).
415 	 * This creates problems if/when the end results in a partial write.
416 	 * So must save index and length of end to handle this corner case.
417 	 */
418 	if (local_dio->end_len) {
419 		iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
420 		iocb->offset[n_iters] = local_dio->end_offset;
421 		iov_iter_advance(&iters[n_iters],
422 			local_dio->start_len + local_dio->middle_len);
423 		iocb->iter_is_dio_aligned[n_iters] = false;
424 		/* Save index and length of end */
425 		iocb->end_iter_index = n_iters;
426 		iocb->end_len = local_dio->end_len;
427 		++n_iters;
428 	}
429 
430 	/* Setup DIO-aligned middle to be issued last, to allow for
431 	 * DIO with AIO completion (see nfs_local_call_{read,write}).
432 	 */
433 	iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
434 	if (local_dio->start_len)
435 		iov_iter_advance(&iters[n_iters], local_dio->start_len);
436 	iters[n_iters].count -= local_dio->end_len;
437 	iocb->offset[n_iters] = local_dio->middle_offset;
438 
439 	iocb->iter_is_dio_aligned[n_iters] =
440 		nfs_iov_iter_aligned_bvec(&iters[n_iters],
441 			local_dio->mem_align-1, local_dio->offset_align-1);
442 
443 	if (unlikely(!iocb->iter_is_dio_aligned[n_iters])) {
444 		trace_nfs_local_dio_misaligned(iocb->hdr->inode,
445 			iocb->hdr->args.offset, len, local_dio);
446 		return 0; /* no DIO-aligned IO possible */
447 	}
448 	++n_iters;
449 
450 	iocb->n_iters = n_iters;
451 	return n_iters;
452 }
453 
454 static noinline_for_stack void
nfs_local_iters_init(struct nfs_local_kiocb * iocb,int rw)455 nfs_local_iters_init(struct nfs_local_kiocb *iocb, int rw)
456 {
457 	struct nfs_pgio_header *hdr = iocb->hdr;
458 	struct page **pagevec = hdr->page_array.pagevec;
459 	unsigned long v, total;
460 	unsigned int base;
461 	size_t len;
462 
463 	v = 0;
464 	total = hdr->args.count;
465 	base = hdr->args.pgbase;
466 	while (total && v < hdr->page_array.npages) {
467 		len = min_t(size_t, total, PAGE_SIZE - base);
468 		bvec_set_page(&iocb->bvec[v], *pagevec, len, base);
469 		total -= len;
470 		++pagevec;
471 		++v;
472 		base = 0;
473 	}
474 	len = hdr->args.count - total;
475 
476 	if (test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
477 		struct nfs_local_dio local_dio;
478 
479 		if (nfs_is_local_dio_possible(iocb, rw, len, &local_dio) &&
480 		    nfs_local_iters_setup_dio(iocb, rw, v, len, &local_dio) != 0)
481 			return; /* is DIO-aligned */
482 	}
483 
484 	/* Use buffered IO */
485 	iocb->offset[0] = hdr->args.offset;
486 	iov_iter_bvec(&iocb->iters[0], rw, iocb->bvec, v, len);
487 	iocb->n_iters = 1;
488 }
489 
490 static void
nfs_local_hdr_release(struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)491 nfs_local_hdr_release(struct nfs_pgio_header *hdr,
492 		const struct rpc_call_ops *call_ops)
493 {
494 	call_ops->rpc_call_done(&hdr->task, hdr);
495 	call_ops->rpc_release(hdr);
496 }
497 
498 static void
nfs_local_pgio_init(struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)499 nfs_local_pgio_init(struct nfs_pgio_header *hdr,
500 		const struct rpc_call_ops *call_ops)
501 {
502 	hdr->task.tk_ops = call_ops;
503 	if (!hdr->task.tk_start)
504 		hdr->task.tk_start = ktime_get();
505 }
506 
507 static void
nfs_local_pgio_done(struct nfs_pgio_header * hdr,long status)508 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
509 {
510 	/* Must handle partial completions */
511 	if (status >= 0) {
512 		hdr->res.count += status;
513 		/* @hdr was initialized to 0 (zeroed during allocation) */
514 		if (hdr->task.tk_status == 0)
515 			hdr->res.op_status = NFS4_OK;
516 	} else {
517 		hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
518 		hdr->task.tk_status = status;
519 	}
520 }
521 
522 static void
nfs_local_iocb_release(struct nfs_local_kiocb * iocb)523 nfs_local_iocb_release(struct nfs_local_kiocb *iocb)
524 {
525 	nfs_local_file_put(iocb->localio);
526 	nfs_local_iocb_free(iocb);
527 }
528 
529 static void
nfs_local_pgio_release(struct nfs_local_kiocb * iocb)530 nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
531 {
532 	struct nfs_pgio_header *hdr = iocb->hdr;
533 
534 	nfs_local_iocb_release(iocb);
535 	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
536 }
537 
538 /*
539  * Complete the I/O from iocb->kiocb.ki_complete()
540  *
541  * Note that this function can be called from a bottom half context,
542  * hence we need to queue the rpc_call_done() etc to a workqueue
543  */
nfs_local_pgio_aio_complete(struct nfs_local_kiocb * iocb)544 static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
545 {
546 	INIT_WORK(&iocb->work, iocb->aio_complete_work);
547 	queue_work(nfsiod_workqueue, &iocb->work);
548 }
549 
550 static void
nfs_local_read_done(struct nfs_local_kiocb * iocb,long status)551 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
552 {
553 	struct nfs_pgio_header *hdr = iocb->hdr;
554 	struct file *filp = iocb->kiocb.ki_filp;
555 
556 	if ((iocb->kiocb.ki_flags & IOCB_DIRECT) && status == -EINVAL) {
557 		/* Underlying FS will return -EINVAL if misaligned DIO is attempted. */
558 		pr_info_ratelimited("nfs: Unexpected direct I/O read alignment failure\n");
559 	}
560 
561 	/*
562 	 * Must clear replen otherwise NFSv3 data corruption will occur
563 	 * if/when switching from LOCALIO back to using normal RPC.
564 	 */
565 	hdr->res.replen = 0;
566 
567 	if (hdr->res.count != hdr->args.count ||
568 	    hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
569 		hdr->res.eof = true;
570 
571 	dprintk("%s: read %ld bytes eof %d.\n", __func__,
572 			status > 0 ? status : 0, hdr->res.eof);
573 }
574 
nfs_local_read_aio_complete_work(struct work_struct * work)575 static void nfs_local_read_aio_complete_work(struct work_struct *work)
576 {
577 	struct nfs_local_kiocb *iocb =
578 		container_of(work, struct nfs_local_kiocb, work);
579 
580 	nfs_local_pgio_release(iocb);
581 }
582 
nfs_local_read_aio_complete(struct kiocb * kiocb,long ret)583 static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
584 {
585 	struct nfs_local_kiocb *iocb =
586 		container_of(kiocb, struct nfs_local_kiocb, kiocb);
587 
588 	nfs_local_pgio_done(iocb->hdr, ret);
589 	nfs_local_read_done(iocb, ret);
590 	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
591 }
592 
nfs_local_call_read(struct work_struct * work)593 static void nfs_local_call_read(struct work_struct *work)
594 {
595 	struct nfs_local_kiocb *iocb =
596 		container_of(work, struct nfs_local_kiocb, work);
597 	struct file *filp = iocb->kiocb.ki_filp;
598 	const struct cred *save_cred;
599 	ssize_t status;
600 
601 	save_cred = override_creds(filp->f_cred);
602 
603 	for (int i = 0; i < iocb->n_iters ; i++) {
604 		if (iocb->iter_is_dio_aligned[i]) {
605 			iocb->kiocb.ki_flags |= IOCB_DIRECT;
606 			iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
607 			iocb->aio_complete_work = nfs_local_read_aio_complete_work;
608 		}
609 
610 		iocb->kiocb.ki_pos = iocb->offset[i];
611 		status = filp->f_op->read_iter(&iocb->kiocb, &iocb->iters[i]);
612 		if (status != -EIOCBQUEUED) {
613 			nfs_local_pgio_done(iocb->hdr, status);
614 			if (iocb->hdr->task.tk_status)
615 				break;
616 		}
617 	}
618 
619 	revert_creds(save_cred);
620 
621 	if (status != -EIOCBQUEUED) {
622 		nfs_local_read_done(iocb, status);
623 		nfs_local_pgio_release(iocb);
624 	}
625 }
626 
627 static int
nfs_local_do_read(struct nfs_local_kiocb * iocb,const struct rpc_call_ops * call_ops)628 nfs_local_do_read(struct nfs_local_kiocb *iocb,
629 		  const struct rpc_call_ops *call_ops)
630 {
631 	struct nfs_pgio_header *hdr = iocb->hdr;
632 
633 	dprintk("%s: vfs_read count=%u pos=%llu\n",
634 		__func__, hdr->args.count, hdr->args.offset);
635 
636 	nfs_local_pgio_init(hdr, call_ops);
637 	hdr->res.eof = false;
638 
639 	INIT_WORK(&iocb->work, nfs_local_call_read);
640 	queue_work(nfslocaliod_workqueue, &iocb->work);
641 
642 	return 0;
643 }
644 
645 static void
nfs_copy_boot_verifier(struct nfs_write_verifier * verifier,struct inode * inode)646 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
647 {
648 	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
649 	u32 *verf = (u32 *)verifier->data;
650 	unsigned int seq;
651 
652 	do {
653 		seq = read_seqbegin(&clp->cl_boot_lock);
654 		verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
655 		verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
656 	} while (read_seqretry(&clp->cl_boot_lock, seq));
657 }
658 
659 static void
nfs_reset_boot_verifier(struct inode * inode)660 nfs_reset_boot_verifier(struct inode *inode)
661 {
662 	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
663 
664 	write_seqlock(&clp->cl_boot_lock);
665 	ktime_get_real_ts64(&clp->cl_nfssvc_boot);
666 	write_sequnlock(&clp->cl_boot_lock);
667 }
668 
669 static void
nfs_set_local_verifier(struct inode * inode,struct nfs_writeverf * verf,enum nfs3_stable_how how)670 nfs_set_local_verifier(struct inode *inode,
671 		struct nfs_writeverf *verf,
672 		enum nfs3_stable_how how)
673 {
674 	nfs_copy_boot_verifier(&verf->verifier, inode);
675 	verf->committed = how;
676 }
677 
678 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */
__vfs_getattr(const struct path * p,struct kstat * stat,int version)679 static int __vfs_getattr(const struct path *p, struct kstat *stat, int version)
680 {
681 	u32 request_mask = STATX_BASIC_STATS;
682 
683 	if (version == 4)
684 		request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
685 	return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
686 }
687 
688 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
__nfsd4_change_attribute(const struct kstat * stat,const struct inode * inode)689 static u64 __nfsd4_change_attribute(const struct kstat *stat,
690 				    const struct inode *inode)
691 {
692 	u64 chattr;
693 
694 	if (stat->result_mask & STATX_CHANGE_COOKIE) {
695 		chattr = stat->change_cookie;
696 		if (S_ISREG(inode->i_mode) &&
697 		    !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
698 			chattr += (u64)stat->ctime.tv_sec << 30;
699 			chattr += stat->ctime.tv_nsec;
700 		}
701 	} else {
702 		chattr = time_to_chattr(&stat->ctime);
703 	}
704 	return chattr;
705 }
706 
nfs_local_vfs_getattr(struct nfs_local_kiocb * iocb)707 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
708 {
709 	struct kstat stat;
710 	struct file *filp = iocb->kiocb.ki_filp;
711 	struct nfs_pgio_header *hdr = iocb->hdr;
712 	struct nfs_fattr *fattr = hdr->res.fattr;
713 	int version = NFS_PROTO(hdr->inode)->version;
714 
715 	if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
716 		return;
717 
718 	fattr->valid = (NFS_ATTR_FATTR_FILEID |
719 			NFS_ATTR_FATTR_CHANGE |
720 			NFS_ATTR_FATTR_SIZE |
721 			NFS_ATTR_FATTR_ATIME |
722 			NFS_ATTR_FATTR_MTIME |
723 			NFS_ATTR_FATTR_CTIME |
724 			NFS_ATTR_FATTR_SPACE_USED);
725 
726 	fattr->fileid = stat.ino;
727 	fattr->size = stat.size;
728 	fattr->atime = stat.atime;
729 	fattr->mtime = stat.mtime;
730 	fattr->ctime = stat.ctime;
731 	if (version == 4) {
732 		fattr->change_attr =
733 			__nfsd4_change_attribute(&stat, file_inode(filp));
734 	} else
735 		fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
736 	fattr->du.nfs3.used = stat.blocks << 9;
737 }
738 
739 static void
nfs_local_write_done(struct nfs_local_kiocb * iocb,long status)740 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
741 {
742 	struct nfs_pgio_header *hdr = iocb->hdr;
743 	struct inode *inode = hdr->inode;
744 
745 	dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);
746 
747 	if ((iocb->kiocb.ki_flags & IOCB_DIRECT) && status == -EINVAL) {
748 		/* Underlying FS will return -EINVAL if misaligned DIO is attempted. */
749 		pr_info_ratelimited("nfs: Unexpected direct I/O write alignment failure\n");
750 	}
751 
752 	/* Handle short writes as if they are ENOSPC */
753 	status = hdr->res.count;
754 	if (status > 0 && status < hdr->args.count) {
755 		hdr->mds_offset += status;
756 		hdr->args.offset += status;
757 		hdr->args.pgbase += status;
758 		hdr->args.count -= status;
759 		nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
760 		status = -ENOSPC;
761 		/* record -ENOSPC in terms of nfs_local_pgio_done */
762 		nfs_local_pgio_done(hdr, status);
763 	}
764 	if (hdr->task.tk_status < 0)
765 		nfs_reset_boot_verifier(inode);
766 }
767 
nfs_local_write_aio_complete_work(struct work_struct * work)768 static void nfs_local_write_aio_complete_work(struct work_struct *work)
769 {
770 	struct nfs_local_kiocb *iocb =
771 		container_of(work, struct nfs_local_kiocb, work);
772 
773 	nfs_local_vfs_getattr(iocb);
774 	nfs_local_pgio_release(iocb);
775 }
776 
nfs_local_write_aio_complete(struct kiocb * kiocb,long ret)777 static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
778 {
779 	struct nfs_local_kiocb *iocb =
780 		container_of(kiocb, struct nfs_local_kiocb, kiocb);
781 
782 	nfs_local_pgio_done(iocb->hdr, ret);
783 	nfs_local_write_done(iocb, ret);
784 	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
785 }
786 
nfs_local_call_write(struct work_struct * work)787 static void nfs_local_call_write(struct work_struct *work)
788 {
789 	struct nfs_local_kiocb *iocb =
790 		container_of(work, struct nfs_local_kiocb, work);
791 	struct file *filp = iocb->kiocb.ki_filp;
792 	unsigned long old_flags = current->flags;
793 	const struct cred *save_cred;
794 	ssize_t status;
795 
796 	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
797 	save_cred = override_creds(filp->f_cred);
798 
799 	file_start_write(filp);
800 	for (int i = 0; i < iocb->n_iters ; i++) {
801 		if (iocb->iter_is_dio_aligned[i]) {
802 			iocb->kiocb.ki_flags |= IOCB_DIRECT;
803 			iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
804 			iocb->aio_complete_work = nfs_local_write_aio_complete_work;
805 		}
806 retry:
807 		iocb->kiocb.ki_pos = iocb->offset[i];
808 		status = filp->f_op->write_iter(&iocb->kiocb, &iocb->iters[i]);
809 		if (status != -EIOCBQUEUED) {
810 			if (unlikely(status >= 0 && status < iocb->iters[i].count)) {
811 				/* partial write */
812 				if (i == iocb->end_iter_index) {
813 					/* Must not account partial end, otherwise, due
814 					 * to end being issued before middle: the partial
815 					 * write accounting in nfs_local_write_done()
816 					 * would incorrectly advance hdr->args.offset
817 					 */
818 					status = 0;
819 				} else {
820 					/* Partial write at start or buffered middle,
821 					 * exit early.
822 					 */
823 					nfs_local_pgio_done(iocb->hdr, status);
824 					break;
825 				}
826 			} else if (unlikely(status == -ENOTBLK &&
827 					    (iocb->kiocb.ki_flags & IOCB_DIRECT))) {
828 				/* VFS will return -ENOTBLK if DIO WRITE fails to
829 				 * invalidate the page cache. Retry using buffered IO.
830 				 */
831 				iocb->kiocb.ki_flags &= ~IOCB_DIRECT;
832 				iocb->kiocb.ki_complete = NULL;
833 				iocb->aio_complete_work = NULL;
834 				goto retry;
835 			}
836 			nfs_local_pgio_done(iocb->hdr, status);
837 			if (iocb->hdr->task.tk_status)
838 				break;
839 		}
840 	}
841 	file_end_write(filp);
842 
843 	revert_creds(save_cred);
844 	current->flags = old_flags;
845 
846 	if (status != -EIOCBQUEUED) {
847 		nfs_local_write_done(iocb, status);
848 		nfs_local_vfs_getattr(iocb);
849 		nfs_local_pgio_release(iocb);
850 	}
851 }
852 
853 static int
nfs_local_do_write(struct nfs_local_kiocb * iocb,const struct rpc_call_ops * call_ops)854 nfs_local_do_write(struct nfs_local_kiocb *iocb,
855 		   const struct rpc_call_ops *call_ops)
856 {
857 	struct nfs_pgio_header *hdr = iocb->hdr;
858 
859 	dprintk("%s: vfs_write count=%u pos=%llu %s\n",
860 		__func__, hdr->args.count, hdr->args.offset,
861 		(hdr->args.stable == NFS_UNSTABLE) ?  "unstable" : "stable");
862 
863 	switch (hdr->args.stable) {
864 	default:
865 		break;
866 	case NFS_DATA_SYNC:
867 		iocb->kiocb.ki_flags |= IOCB_DSYNC;
868 		break;
869 	case NFS_FILE_SYNC:
870 		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
871 	}
872 
873 	nfs_local_pgio_init(hdr, call_ops);
874 
875 	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
876 
877 	INIT_WORK(&iocb->work, nfs_local_call_write);
878 	queue_work(nfslocaliod_workqueue, &iocb->work);
879 
880 	return 0;
881 }
882 
883 static struct nfs_local_kiocb *
nfs_local_iocb_init(struct nfs_pgio_header * hdr,struct nfsd_file * localio)884 nfs_local_iocb_init(struct nfs_pgio_header *hdr, struct nfsd_file *localio)
885 {
886 	struct file *file = nfs_to->nfsd_file_file(localio);
887 	struct nfs_local_kiocb *iocb;
888 	gfp_t gfp_mask;
889 	int rw;
890 
891 	if (hdr->rw_mode & FMODE_READ) {
892 		if (!file->f_op->read_iter)
893 			return ERR_PTR(-EOPNOTSUPP);
894 		gfp_mask = GFP_KERNEL;
895 		rw = ITER_DEST;
896 	} else {
897 		if (!file->f_op->write_iter)
898 			return ERR_PTR(-EOPNOTSUPP);
899 		gfp_mask = GFP_NOIO;
900 		rw = ITER_SOURCE;
901 	}
902 
903 	iocb = nfs_local_iocb_alloc(hdr, file, gfp_mask);
904 	if (iocb == NULL)
905 		return ERR_PTR(-ENOMEM);
906 	iocb->hdr = hdr;
907 	iocb->localio = localio;
908 
909 	nfs_local_iters_init(iocb, rw);
910 
911 	return iocb;
912 }
913 
nfs_local_doio(struct nfs_client * clp,struct nfsd_file * localio,struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)914 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
915 		   struct nfs_pgio_header *hdr,
916 		   const struct rpc_call_ops *call_ops)
917 {
918 	struct nfs_local_kiocb *iocb;
919 	int status = 0;
920 
921 	if (!hdr->args.count)
922 		return 0;
923 
924 	iocb = nfs_local_iocb_init(hdr, localio);
925 	if (IS_ERR(iocb))
926 		return PTR_ERR(iocb);
927 
928 	switch (hdr->rw_mode) {
929 	case FMODE_READ:
930 		status = nfs_local_do_read(iocb, call_ops);
931 		break;
932 	case FMODE_WRITE:
933 		status = nfs_local_do_write(iocb, call_ops);
934 		break;
935 	default:
936 		dprintk("%s: invalid mode: %d\n", __func__,
937 			hdr->rw_mode);
938 		status = -EOPNOTSUPP;
939 	}
940 
941 	if (status != 0) {
942 		if (status == -EAGAIN)
943 			nfs_localio_disable_client(clp);
944 		nfs_local_iocb_release(iocb);
945 		hdr->task.tk_status = status;
946 		nfs_local_hdr_release(hdr, call_ops);
947 	}
948 	return status;
949 }
950 
/*
 * Prepare @data for local commit: install the completion callbacks that
 * rpc_run_task() would normally have attached, since the localio path
 * never creates a real RPC task.
 */
static void
nfs_local_init_commit(struct nfs_commit_data *data,
		const struct rpc_call_ops *call_ops)
{
	data->task.tk_ops = call_ops;
}
957 
958 static int
nfs_local_run_commit(struct file * filp,struct nfs_commit_data * data)959 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
960 {
961 	loff_t start = data->args.offset;
962 	loff_t end = LLONG_MAX;
963 
964 	if (data->args.count > 0) {
965 		end = start + data->args.count - 1;
966 		if (end < start)
967 			end = LLONG_MAX;
968 	}
969 
970 	dprintk("%s: commit %llu - %llu\n", __func__, start, end);
971 	return vfs_fsync_range(filp, start, end, 0);
972 }
973 
974 static void
nfs_local_commit_done(struct nfs_commit_data * data,int status)975 nfs_local_commit_done(struct nfs_commit_data *data, int status)
976 {
977 	if (status >= 0) {
978 		nfs_set_local_verifier(data->inode,
979 				data->res.verf,
980 				NFS_FILE_SYNC);
981 		data->res.op_status = NFS4_OK;
982 		data->task.tk_status = 0;
983 	} else {
984 		nfs_reset_boot_verifier(data->inode);
985 		data->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
986 		data->task.tk_status = status;
987 	}
988 }
989 
/*
 * Tear down a local commit: drop the nfsd_file reference, then run the
 * RPC completion and release callbacks directly (no rpc_task was ever
 * scheduled on the localio path, so nothing else will call them).
 */
static void
nfs_local_release_commit_data(struct nfsd_file *localio,
		struct nfs_commit_data *data,
		const struct rpc_call_ops *call_ops)
{
	nfs_local_file_put(localio);
	call_ops->rpc_call_done(&data->task, data);
	call_ops->rpc_release(data);
}
999 
/*
 * Free a fsync context: release the commit data and the local file
 * reference it pins, then the context itself.  tk_ops was installed by
 * nfs_local_init_commit() before the work item was queued.
 */
static void
nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
{
	nfs_local_release_commit_data(ctx->localio, ctx->data,
				      ctx->data->task.tk_ops);
	kfree(ctx);
}
1007 
/*
 * Workqueue handler for a local commit: run the fsync, record its
 * result, wake any synchronous waiter, and free the context.
 */
static void
nfs_local_fsync_work(struct work_struct *work)
{
	struct nfs_local_fsync_ctx *ctx;
	int status;

	ctx = container_of(work, struct nfs_local_fsync_ctx, work);

	status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
				      ctx->data);
	nfs_local_commit_done(ctx->data, status);
	/*
	 * ctx->done (when set) points at a FLUSH_SYNC caller's on-stack
	 * completion; the waiter may return as soon as complete() fires,
	 * so it must not be dereferenced afterwards.
	 */
	if (ctx->done != NULL)
		complete(ctx->done);
	nfs_local_fsync_ctx_free(ctx);
}
1023 
1024 static struct nfs_local_fsync_ctx *
nfs_local_fsync_ctx_alloc(struct nfs_commit_data * data,struct nfsd_file * localio,gfp_t flags)1025 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
1026 			  struct nfsd_file *localio, gfp_t flags)
1027 {
1028 	struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
1029 
1030 	if (ctx != NULL) {
1031 		ctx->localio = localio;
1032 		ctx->data = data;
1033 		INIT_WORK(&ctx->work, nfs_local_fsync_work);
1034 		ctx->done = NULL;
1035 	}
1036 	return ctx;
1037 }
1038 
nfs_local_commit(struct nfsd_file * localio,struct nfs_commit_data * data,const struct rpc_call_ops * call_ops,int how)1039 int nfs_local_commit(struct nfsd_file *localio,
1040 		     struct nfs_commit_data *data,
1041 		     const struct rpc_call_ops *call_ops, int how)
1042 {
1043 	struct nfs_local_fsync_ctx *ctx;
1044 
1045 	ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
1046 	if (!ctx) {
1047 		nfs_local_commit_done(data, -ENOMEM);
1048 		nfs_local_release_commit_data(localio, data, call_ops);
1049 		return -ENOMEM;
1050 	}
1051 
1052 	nfs_local_init_commit(data, call_ops);
1053 
1054 	if (how & FLUSH_SYNC) {
1055 		DECLARE_COMPLETION_ONSTACK(done);
1056 		ctx->done = &done;
1057 		queue_work(nfsiod_workqueue, &ctx->work);
1058 		wait_for_completion(&done);
1059 	} else
1060 		queue_work(nfsiod_workqueue, &ctx->work);
1061 
1062 	return 0;
1063 }
1064