xref: /linux/fs/nfs/localio.c (revision c8b90d40d5bba8e6fba457b8a7c10d3c0d467e37)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * NFS client support for local clients to bypass network stack
4  *
5  * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
6  * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
7  * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
8  * Copyright (C) 2024 NeilBrown <neilb@suse.de>
9  */
10 
11 #include <linux/module.h>
12 #include <linux/errno.h>
13 #include <linux/vfs.h>
14 #include <linux/file.h>
15 #include <linux/inet.h>
16 #include <linux/sunrpc/addr.h>
17 #include <linux/inetdevice.h>
18 #include <net/addrconf.h>
19 #include <linux/nfs_common.h>
20 #include <linux/nfslocalio.h>
21 #include <linux/bvec.h>
22 
23 #include <linux/nfs.h>
24 #include <linux/nfs_fs.h>
25 #include <linux/nfs_xdr.h>
26 
27 #include "internal.h"
28 #include "pnfs.h"
29 #include "nfstrace.h"
30 
31 #define NFSDBG_FACILITY		NFSDBG_VFS
32 
33 struct nfs_local_kiocb {
34 	struct kiocb		kiocb;
35 	struct bio_vec		*bvec;
36 	struct nfs_pgio_header	*hdr;
37 	struct work_struct	work;
38 	struct nfsd_file	*localio;
39 };
40 
41 struct nfs_local_fsync_ctx {
42 	struct nfsd_file	*localio;
43 	struct nfs_commit_data	*data;
44 	struct work_struct	work;
45 	struct completion	*done;
46 };
47 
48 static bool localio_enabled __read_mostly = true;
49 module_param(localio_enabled, bool, 0644);
50 
51 static inline bool nfs_client_is_local(const struct nfs_client *clp)
52 {
53 	return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
54 }
55 
56 bool nfs_server_is_local(const struct nfs_client *clp)
57 {
58 	return nfs_client_is_local(clp) && localio_enabled;
59 }
60 EXPORT_SYMBOL_GPL(nfs_server_is_local);
61 
62 /*
63  * UUID_IS_LOCAL XDR functions
64  */
65 
66 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
67 				     struct xdr_stream *xdr,
68 				     const void *data)
69 {
70 	const u8 *uuid = data;
71 
72 	encode_opaque_fixed(xdr, uuid, UUID_SIZE);
73 }
74 
/*
 * Decode the UUID_IS_LOCAL reply.  The procedure returns void, so there
 * is nothing to pull from @xdr and decoding always succeeds.
 */
static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
				   struct xdr_stream *xdr,
				   void *result)
{
	return 0;
}
82 
83 static const struct rpc_procinfo nfs_localio_procedures[] = {
84 	[LOCALIOPROC_UUID_IS_LOCAL] = {
85 		.p_proc = LOCALIOPROC_UUID_IS_LOCAL,
86 		.p_encode = localio_xdr_enc_uuidargs,
87 		.p_decode = localio_xdr_dec_uuidres,
88 		.p_arglen = XDR_QUADLEN(UUID_SIZE),
89 		.p_replen = 0,
90 		.p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
91 		.p_name = "UUID_IS_LOCAL",
92 	},
93 };
94 
95 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
96 static const struct rpc_version nfslocalio_version1 = {
97 	.number			= 1,
98 	.nrprocs		= ARRAY_SIZE(nfs_localio_procedures),
99 	.procs			= nfs_localio_procedures,
100 	.counts			= nfs_localio_counts,
101 };
102 
103 static const struct rpc_version *nfslocalio_version[] = {
104        [1]			= &nfslocalio_version1,
105 };
106 
107 extern const struct rpc_program nfslocalio_program;
108 static struct rpc_stat		nfslocalio_rpcstat = { &nfslocalio_program };
109 
110 const struct rpc_program nfslocalio_program = {
111 	.name			= "nfslocalio",
112 	.number			= NFS_LOCALIO_PROGRAM,
113 	.nrvers			= ARRAY_SIZE(nfslocalio_version),
114 	.version		= nfslocalio_version,
115 	.stats			= &nfslocalio_rpcstat,
116 };
117 
118 /*
119  * nfs_local_enable - enable local i/o for an nfs_client
120  */
121 static void nfs_local_enable(struct nfs_client *clp)
122 {
123 	spin_lock(&clp->cl_localio_lock);
124 	set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
125 	trace_nfs_local_enable(clp);
126 	spin_unlock(&clp->cl_localio_lock);
127 }
128 
129 /*
130  * nfs_local_disable - disable local i/o for an nfs_client
131  */
132 void nfs_local_disable(struct nfs_client *clp)
133 {
134 	spin_lock(&clp->cl_localio_lock);
135 	if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) {
136 		trace_nfs_local_disable(clp);
137 		nfs_uuid_invalidate_one_client(&clp->cl_uuid);
138 	}
139 	spin_unlock(&clp->cl_localio_lock);
140 }
141 
142 /*
143  * nfs_init_localioclient - Initialise an NFS localio client connection
144  */
145 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
146 {
147 	struct rpc_clnt *rpcclient_localio;
148 
149 	rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
150 						 &nfslocalio_program, 1);
151 
152 	dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
153 		__func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
154 		(IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
155 
156 	return rpcclient_localio;
157 }
158 
159 static bool nfs_server_uuid_is_local(struct nfs_client *clp)
160 {
161 	u8 uuid[UUID_SIZE];
162 	struct rpc_message msg = {
163 		.rpc_argp = &uuid,
164 	};
165 	struct rpc_clnt *rpcclient_localio;
166 	int status;
167 
168 	rpcclient_localio = nfs_init_localioclient(clp);
169 	if (IS_ERR(rpcclient_localio))
170 		return false;
171 
172 	export_uuid(uuid, &clp->cl_uuid.uuid);
173 
174 	msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
175 	status = rpc_call_sync(rpcclient_localio, &msg, 0);
176 	dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
177 		__func__, status);
178 	rpc_shutdown_client(rpcclient_localio);
179 
180 	/* Server is only local if it initialized required struct members */
181 	if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom)
182 		return false;
183 
184 	return true;
185 }
186 
187 /*
188  * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
189  * - called after alloc_client and init_client (so cl_rpcclient exists)
190  * - this function is idempotent, it can be called for old or new clients
191  */
192 void nfs_local_probe(struct nfs_client *clp)
193 {
194 	/* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
195 	if (!localio_enabled ||
196 	    clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
197 		nfs_local_disable(clp);
198 		return;
199 	}
200 
201 	if (nfs_client_is_local(clp)) {
202 		/* If already enabled, disable and re-enable */
203 		nfs_local_disable(clp);
204 	}
205 
206 	if (!nfs_uuid_begin(&clp->cl_uuid))
207 		return;
208 	if (nfs_server_uuid_is_local(clp))
209 		nfs_local_enable(clp);
210 	nfs_uuid_end(&clp->cl_uuid);
211 }
212 EXPORT_SYMBOL_GPL(nfs_local_probe);
213 
214 /*
215  * nfs_local_open_fh - open a local filehandle in terms of nfsd_file
216  *
217  * Returns a pointer to a struct nfsd_file or NULL
218  */
219 struct nfsd_file *
220 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
221 		  struct nfs_fh *fh, const fmode_t mode)
222 {
223 	struct nfsd_file *localio;
224 	int status;
225 
226 	if (!nfs_server_is_local(clp))
227 		return NULL;
228 	if (mode & ~(FMODE_READ | FMODE_WRITE))
229 		return NULL;
230 
231 	localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
232 				    cred, fh, mode);
233 	if (IS_ERR(localio)) {
234 		status = PTR_ERR(localio);
235 		trace_nfs_local_open_fh(fh, mode, status);
236 		switch (status) {
237 		case -ENOMEM:
238 		case -ENXIO:
239 		case -ENOENT:
240 			/* Revalidate localio, will disable if unsupported */
241 			nfs_local_probe(clp);
242 		}
243 		return NULL;
244 	}
245 	return localio;
246 }
247 EXPORT_SYMBOL_GPL(nfs_local_open_fh);
248 
249 static struct bio_vec *
250 nfs_bvec_alloc_and_import_pagevec(struct page **pagevec,
251 		unsigned int npages, gfp_t flags)
252 {
253 	struct bio_vec *bvec, *p;
254 
255 	bvec = kmalloc_array(npages, sizeof(*bvec), flags);
256 	if (bvec != NULL) {
257 		for (p = bvec; npages > 0; p++, pagevec++, npages--) {
258 			p->bv_page = *pagevec;
259 			p->bv_len = PAGE_SIZE;
260 			p->bv_offset = 0;
261 		}
262 	}
263 	return bvec;
264 }
265 
266 static void
267 nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
268 {
269 	kfree(iocb->bvec);
270 	kfree(iocb);
271 }
272 
273 static struct nfs_local_kiocb *
274 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
275 		     struct file *file, gfp_t flags)
276 {
277 	struct nfs_local_kiocb *iocb;
278 
279 	iocb = kmalloc(sizeof(*iocb), flags);
280 	if (iocb == NULL)
281 		return NULL;
282 	iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec,
283 			hdr->page_array.npages, flags);
284 	if (iocb->bvec == NULL) {
285 		kfree(iocb);
286 		return NULL;
287 	}
288 	init_sync_kiocb(&iocb->kiocb, file);
289 	iocb->kiocb.ki_pos = hdr->args.offset;
290 	iocb->hdr = hdr;
291 	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
292 	return iocb;
293 }
294 
295 static void
296 nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir)
297 {
298 	struct nfs_pgio_header *hdr = iocb->hdr;
299 
300 	iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages,
301 		      hdr->args.count + hdr->args.pgbase);
302 	if (hdr->args.pgbase != 0)
303 		iov_iter_advance(i, hdr->args.pgbase);
304 }
305 
306 static void
307 nfs_local_hdr_release(struct nfs_pgio_header *hdr,
308 		const struct rpc_call_ops *call_ops)
309 {
310 	call_ops->rpc_call_done(&hdr->task, hdr);
311 	call_ops->rpc_release(hdr);
312 }
313 
314 static void
315 nfs_local_pgio_init(struct nfs_pgio_header *hdr,
316 		const struct rpc_call_ops *call_ops)
317 {
318 	hdr->task.tk_ops = call_ops;
319 	if (!hdr->task.tk_start)
320 		hdr->task.tk_start = ktime_get();
321 }
322 
323 static void
324 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
325 {
326 	if (status >= 0) {
327 		hdr->res.count = status;
328 		hdr->res.op_status = NFS4_OK;
329 		hdr->task.tk_status = 0;
330 	} else {
331 		hdr->res.op_status = nfs4_stat_to_errno(status);
332 		hdr->task.tk_status = status;
333 	}
334 }
335 
336 static void
337 nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
338 {
339 	struct nfs_pgio_header *hdr = iocb->hdr;
340 
341 	nfs_to_nfsd_file_put_local(iocb->localio);
342 	nfs_local_iocb_free(iocb);
343 	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
344 }
345 
346 static void
347 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
348 {
349 	struct nfs_pgio_header *hdr = iocb->hdr;
350 	struct file *filp = iocb->kiocb.ki_filp;
351 
352 	nfs_local_pgio_done(hdr, status);
353 
354 	/*
355 	 * Must clear replen otherwise NFSv3 data corruption will occur
356 	 * if/when switching from LOCALIO back to using normal RPC.
357 	 */
358 	hdr->res.replen = 0;
359 
360 	if (hdr->res.count != hdr->args.count ||
361 	    hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
362 		hdr->res.eof = true;
363 
364 	dprintk("%s: read %ld bytes eof %d.\n", __func__,
365 			status > 0 ? status : 0, hdr->res.eof);
366 }
367 
368 static void nfs_local_call_read(struct work_struct *work)
369 {
370 	struct nfs_local_kiocb *iocb =
371 		container_of(work, struct nfs_local_kiocb, work);
372 	struct file *filp = iocb->kiocb.ki_filp;
373 	const struct cred *save_cred;
374 	struct iov_iter iter;
375 	ssize_t status;
376 
377 	save_cred = override_creds(filp->f_cred);
378 
379 	nfs_local_iter_init(&iter, iocb, READ);
380 
381 	status = filp->f_op->read_iter(&iocb->kiocb, &iter);
382 	WARN_ON_ONCE(status == -EIOCBQUEUED);
383 
384 	nfs_local_read_done(iocb, status);
385 	nfs_local_pgio_release(iocb);
386 
387 	revert_creds(save_cred);
388 }
389 
390 static int
391 nfs_do_local_read(struct nfs_pgio_header *hdr,
392 		  struct nfsd_file *localio,
393 		  const struct rpc_call_ops *call_ops)
394 {
395 	struct nfs_local_kiocb *iocb;
396 	struct file *file = nfs_to->nfsd_file_file(localio);
397 
398 	/* Don't support filesystems without read_iter */
399 	if (!file->f_op->read_iter)
400 		return -EAGAIN;
401 
402 	dprintk("%s: vfs_read count=%u pos=%llu\n",
403 		__func__, hdr->args.count, hdr->args.offset);
404 
405 	iocb = nfs_local_iocb_alloc(hdr, file, GFP_KERNEL);
406 	if (iocb == NULL)
407 		return -ENOMEM;
408 	iocb->localio = localio;
409 
410 	nfs_local_pgio_init(hdr, call_ops);
411 	hdr->res.eof = false;
412 
413 	INIT_WORK(&iocb->work, nfs_local_call_read);
414 	queue_work(nfslocaliod_workqueue, &iocb->work);
415 
416 	return 0;
417 }
418 
419 static void
420 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
421 {
422 	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
423 	u32 *verf = (u32 *)verifier->data;
424 	int seq = 0;
425 
426 	do {
427 		read_seqbegin_or_lock(&clp->cl_boot_lock, &seq);
428 		verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
429 		verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
430 	} while (need_seqretry(&clp->cl_boot_lock, seq));
431 	done_seqretry(&clp->cl_boot_lock, seq);
432 }
433 
434 static void
435 nfs_reset_boot_verifier(struct inode *inode)
436 {
437 	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
438 
439 	write_seqlock(&clp->cl_boot_lock);
440 	ktime_get_real_ts64(&clp->cl_nfssvc_boot);
441 	write_sequnlock(&clp->cl_boot_lock);
442 }
443 
444 static void
445 nfs_set_local_verifier(struct inode *inode,
446 		struct nfs_writeverf *verf,
447 		enum nfs3_stable_how how)
448 {
449 	nfs_copy_boot_verifier(&verf->verifier, inode);
450 	verf->committed = how;
451 }
452 
453 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */
454 static int __vfs_getattr(struct path *p, struct kstat *stat, int version)
455 {
456 	u32 request_mask = STATX_BASIC_STATS;
457 
458 	if (version == 4)
459 		request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
460 	return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
461 }
462 
463 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
464 static u64 __nfsd4_change_attribute(const struct kstat *stat,
465 				    const struct inode *inode)
466 {
467 	u64 chattr;
468 
469 	if (stat->result_mask & STATX_CHANGE_COOKIE) {
470 		chattr = stat->change_cookie;
471 		if (S_ISREG(inode->i_mode) &&
472 		    !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
473 			chattr += (u64)stat->ctime.tv_sec << 30;
474 			chattr += stat->ctime.tv_nsec;
475 		}
476 	} else {
477 		chattr = time_to_chattr(&stat->ctime);
478 	}
479 	return chattr;
480 }
481 
482 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
483 {
484 	struct kstat stat;
485 	struct file *filp = iocb->kiocb.ki_filp;
486 	struct nfs_pgio_header *hdr = iocb->hdr;
487 	struct nfs_fattr *fattr = hdr->res.fattr;
488 	int version = NFS_PROTO(hdr->inode)->version;
489 
490 	if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
491 		return;
492 
493 	fattr->valid = (NFS_ATTR_FATTR_FILEID |
494 			NFS_ATTR_FATTR_CHANGE |
495 			NFS_ATTR_FATTR_SIZE |
496 			NFS_ATTR_FATTR_ATIME |
497 			NFS_ATTR_FATTR_MTIME |
498 			NFS_ATTR_FATTR_CTIME |
499 			NFS_ATTR_FATTR_SPACE_USED);
500 
501 	fattr->fileid = stat.ino;
502 	fattr->size = stat.size;
503 	fattr->atime = stat.atime;
504 	fattr->mtime = stat.mtime;
505 	fattr->ctime = stat.ctime;
506 	if (version == 4) {
507 		fattr->change_attr =
508 			__nfsd4_change_attribute(&stat, file_inode(filp));
509 	} else
510 		fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
511 	fattr->du.nfs3.used = stat.blocks << 9;
512 }
513 
514 static void
515 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
516 {
517 	struct nfs_pgio_header *hdr = iocb->hdr;
518 	struct inode *inode = hdr->inode;
519 
520 	dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);
521 
522 	/* Handle short writes as if they are ENOSPC */
523 	if (status > 0 && status < hdr->args.count) {
524 		hdr->mds_offset += status;
525 		hdr->args.offset += status;
526 		hdr->args.pgbase += status;
527 		hdr->args.count -= status;
528 		nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
529 		status = -ENOSPC;
530 	}
531 	if (status < 0)
532 		nfs_reset_boot_verifier(inode);
533 
534 	nfs_local_pgio_done(hdr, status);
535 }
536 
537 static void nfs_local_call_write(struct work_struct *work)
538 {
539 	struct nfs_local_kiocb *iocb =
540 		container_of(work, struct nfs_local_kiocb, work);
541 	struct file *filp = iocb->kiocb.ki_filp;
542 	unsigned long old_flags = current->flags;
543 	const struct cred *save_cred;
544 	struct iov_iter iter;
545 	ssize_t status;
546 
547 	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
548 	save_cred = override_creds(filp->f_cred);
549 
550 	nfs_local_iter_init(&iter, iocb, WRITE);
551 
552 	file_start_write(filp);
553 	status = filp->f_op->write_iter(&iocb->kiocb, &iter);
554 	file_end_write(filp);
555 	WARN_ON_ONCE(status == -EIOCBQUEUED);
556 
557 	nfs_local_write_done(iocb, status);
558 	nfs_local_vfs_getattr(iocb);
559 	nfs_local_pgio_release(iocb);
560 
561 	revert_creds(save_cred);
562 	current->flags = old_flags;
563 }
564 
565 static int
566 nfs_do_local_write(struct nfs_pgio_header *hdr,
567 		   struct nfsd_file *localio,
568 		   const struct rpc_call_ops *call_ops)
569 {
570 	struct nfs_local_kiocb *iocb;
571 	struct file *file = nfs_to->nfsd_file_file(localio);
572 
573 	/* Don't support filesystems without write_iter */
574 	if (!file->f_op->write_iter)
575 		return -EAGAIN;
576 
577 	dprintk("%s: vfs_write count=%u pos=%llu %s\n",
578 		__func__, hdr->args.count, hdr->args.offset,
579 		(hdr->args.stable == NFS_UNSTABLE) ?  "unstable" : "stable");
580 
581 	iocb = nfs_local_iocb_alloc(hdr, file, GFP_NOIO);
582 	if (iocb == NULL)
583 		return -ENOMEM;
584 	iocb->localio = localio;
585 
586 	switch (hdr->args.stable) {
587 	default:
588 		break;
589 	case NFS_DATA_SYNC:
590 		iocb->kiocb.ki_flags |= IOCB_DSYNC;
591 		break;
592 	case NFS_FILE_SYNC:
593 		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
594 	}
595 	nfs_local_pgio_init(hdr, call_ops);
596 
597 	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
598 
599 	INIT_WORK(&iocb->work, nfs_local_call_write);
600 	queue_work(nfslocaliod_workqueue, &iocb->work);
601 
602 	return 0;
603 }
604 
605 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
606 		   struct nfs_pgio_header *hdr,
607 		   const struct rpc_call_ops *call_ops)
608 {
609 	int status = 0;
610 
611 	if (!hdr->args.count)
612 		return 0;
613 
614 	switch (hdr->rw_mode) {
615 	case FMODE_READ:
616 		status = nfs_do_local_read(hdr, localio, call_ops);
617 		break;
618 	case FMODE_WRITE:
619 		status = nfs_do_local_write(hdr, localio, call_ops);
620 		break;
621 	default:
622 		dprintk("%s: invalid mode: %d\n", __func__,
623 			hdr->rw_mode);
624 		status = -EINVAL;
625 	}
626 
627 	if (status != 0) {
628 		if (status == -EAGAIN)
629 			nfs_local_disable(clp);
630 		nfs_to_nfsd_file_put_local(localio);
631 		hdr->task.tk_status = status;
632 		nfs_local_hdr_release(hdr, call_ops);
633 	}
634 	return status;
635 }
636 
637 static void
638 nfs_local_init_commit(struct nfs_commit_data *data,
639 		const struct rpc_call_ops *call_ops)
640 {
641 	data->task.tk_ops = call_ops;
642 }
643 
644 static int
645 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
646 {
647 	loff_t start = data->args.offset;
648 	loff_t end = LLONG_MAX;
649 
650 	if (data->args.count > 0) {
651 		end = start + data->args.count - 1;
652 		if (end < start)
653 			end = LLONG_MAX;
654 	}
655 
656 	dprintk("%s: commit %llu - %llu\n", __func__, start, end);
657 	return vfs_fsync_range(filp, start, end, 0);
658 }
659 
660 static void
661 nfs_local_commit_done(struct nfs_commit_data *data, int status)
662 {
663 	if (status >= 0) {
664 		nfs_set_local_verifier(data->inode,
665 				data->res.verf,
666 				NFS_FILE_SYNC);
667 		data->res.op_status = NFS4_OK;
668 		data->task.tk_status = 0;
669 	} else {
670 		nfs_reset_boot_verifier(data->inode);
671 		data->res.op_status = nfs4_stat_to_errno(status);
672 		data->task.tk_status = status;
673 	}
674 }
675 
676 static void
677 nfs_local_release_commit_data(struct nfsd_file *localio,
678 		struct nfs_commit_data *data,
679 		const struct rpc_call_ops *call_ops)
680 {
681 	nfs_to_nfsd_file_put_local(localio);
682 	call_ops->rpc_call_done(&data->task, data);
683 	call_ops->rpc_release(data);
684 }
685 
686 static void
687 nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
688 {
689 	nfs_local_release_commit_data(ctx->localio, ctx->data,
690 				      ctx->data->task.tk_ops);
691 	kfree(ctx);
692 }
693 
694 static void
695 nfs_local_fsync_work(struct work_struct *work)
696 {
697 	struct nfs_local_fsync_ctx *ctx;
698 	int status;
699 
700 	ctx = container_of(work, struct nfs_local_fsync_ctx, work);
701 
702 	status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
703 				      ctx->data);
704 	nfs_local_commit_done(ctx->data, status);
705 	if (ctx->done != NULL)
706 		complete(ctx->done);
707 	nfs_local_fsync_ctx_free(ctx);
708 }
709 
710 static struct nfs_local_fsync_ctx *
711 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
712 			  struct nfsd_file *localio, gfp_t flags)
713 {
714 	struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
715 
716 	if (ctx != NULL) {
717 		ctx->localio = localio;
718 		ctx->data = data;
719 		INIT_WORK(&ctx->work, nfs_local_fsync_work);
720 		ctx->done = NULL;
721 	}
722 	return ctx;
723 }
724 
725 int nfs_local_commit(struct nfsd_file *localio,
726 		     struct nfs_commit_data *data,
727 		     const struct rpc_call_ops *call_ops, int how)
728 {
729 	struct nfs_local_fsync_ctx *ctx;
730 
731 	ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
732 	if (!ctx) {
733 		nfs_local_commit_done(data, -ENOMEM);
734 		nfs_local_release_commit_data(localio, data, call_ops);
735 		return -ENOMEM;
736 	}
737 
738 	nfs_local_init_commit(data, call_ops);
739 
740 	if (how & FLUSH_SYNC) {
741 		DECLARE_COMPLETION_ONSTACK(done);
742 		ctx->done = &done;
743 		queue_work(nfsiod_workqueue, &ctx->work);
744 		wait_for_completion(&done);
745 	} else
746 		queue_work(nfsiod_workqueue, &ctx->work);
747 
748 	return 0;
749 }
750