1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * NFS client support for local clients to bypass network stack
4 *
5 * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
6 * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
7 * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
8 * Copyright (C) 2024 NeilBrown <neilb@suse.de>
9 */
10
11 #include <linux/module.h>
12 #include <linux/errno.h>
13 #include <linux/vfs.h>
14 #include <linux/file.h>
15 #include <linux/inet.h>
16 #include <linux/sunrpc/addr.h>
17 #include <linux/inetdevice.h>
18 #include <net/addrconf.h>
19 #include <linux/nfs_common.h>
20 #include <linux/nfslocalio.h>
21 #include <linux/bvec.h>
22
23 #include <linux/nfs.h>
24 #include <linux/nfs_fs.h>
25 #include <linux/nfs_xdr.h>
26
27 #include "internal.h"
28 #include "pnfs.h"
29 #include "nfstrace.h"
30
31 #define NFSDBG_FACILITY NFSDBG_VFS
32
33 struct nfs_local_kiocb {
34 struct kiocb kiocb;
35 struct bio_vec *bvec;
36 struct nfs_pgio_header *hdr;
37 struct work_struct work;
38 struct nfsd_file *localio;
39 };
40
41 struct nfs_local_fsync_ctx {
42 struct nfsd_file *localio;
43 struct nfs_commit_data *data;
44 struct work_struct work;
45 struct kref kref;
46 struct completion *done;
47 };
48 static void nfs_local_fsync_work(struct work_struct *work);
49
50 static bool localio_enabled __read_mostly = true;
51 module_param(localio_enabled, bool, 0644);
52
nfs_client_is_local(const struct nfs_client * clp)53 static inline bool nfs_client_is_local(const struct nfs_client *clp)
54 {
55 return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
56 }
57
nfs_server_is_local(const struct nfs_client * clp)58 bool nfs_server_is_local(const struct nfs_client *clp)
59 {
60 return nfs_client_is_local(clp) && localio_enabled;
61 }
62 EXPORT_SYMBOL_GPL(nfs_server_is_local);
63
64 /*
65 * UUID_IS_LOCAL XDR functions
66 */
67
localio_xdr_enc_uuidargs(struct rpc_rqst * req,struct xdr_stream * xdr,const void * data)68 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
69 struct xdr_stream *xdr,
70 const void *data)
71 {
72 const u8 *uuid = data;
73
74 encode_opaque_fixed(xdr, uuid, UUID_SIZE);
75 }
76
/*
 * Decode the UUID_IS_LOCAL reply. The procedure returns void, so nothing
 * is read from @xdr and @result is left untouched; always returns 0.
 */
static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
				   struct xdr_stream *xdr,
				   void *result)
{
	return 0;
}
84
85 static const struct rpc_procinfo nfs_localio_procedures[] = {
86 [LOCALIOPROC_UUID_IS_LOCAL] = {
87 .p_proc = LOCALIOPROC_UUID_IS_LOCAL,
88 .p_encode = localio_xdr_enc_uuidargs,
89 .p_decode = localio_xdr_dec_uuidres,
90 .p_arglen = XDR_QUADLEN(UUID_SIZE),
91 .p_replen = 0,
92 .p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
93 .p_name = "UUID_IS_LOCAL",
94 },
95 };
96
97 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
98 static const struct rpc_version nfslocalio_version1 = {
99 .number = 1,
100 .nrprocs = ARRAY_SIZE(nfs_localio_procedures),
101 .procs = nfs_localio_procedures,
102 .counts = nfs_localio_counts,
103 };
104
105 static const struct rpc_version *nfslocalio_version[] = {
106 [1] = &nfslocalio_version1,
107 };
108
109 extern const struct rpc_program nfslocalio_program;
110 static struct rpc_stat nfslocalio_rpcstat = { &nfslocalio_program };
111
112 const struct rpc_program nfslocalio_program = {
113 .name = "nfslocalio",
114 .number = NFS_LOCALIO_PROGRAM,
115 .nrvers = ARRAY_SIZE(nfslocalio_version),
116 .version = nfslocalio_version,
117 .stats = &nfslocalio_rpcstat,
118 };
119
120 /*
121 * nfs_local_enable - enable local i/o for an nfs_client
122 */
nfs_local_enable(struct nfs_client * clp)123 static void nfs_local_enable(struct nfs_client *clp)
124 {
125 spin_lock(&clp->cl_localio_lock);
126 set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
127 trace_nfs_local_enable(clp);
128 spin_unlock(&clp->cl_localio_lock);
129 }
130
131 /*
132 * nfs_local_disable - disable local i/o for an nfs_client
133 */
nfs_local_disable(struct nfs_client * clp)134 void nfs_local_disable(struct nfs_client *clp)
135 {
136 spin_lock(&clp->cl_localio_lock);
137 if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) {
138 trace_nfs_local_disable(clp);
139 nfs_uuid_invalidate_one_client(&clp->cl_uuid);
140 }
141 spin_unlock(&clp->cl_localio_lock);
142 }
143
144 /*
145 * nfs_init_localioclient - Initialise an NFS localio client connection
146 */
nfs_init_localioclient(struct nfs_client * clp)147 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
148 {
149 struct rpc_clnt *rpcclient_localio;
150
151 rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
152 &nfslocalio_program, 1);
153
154 dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
155 __func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
156 (IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
157
158 return rpcclient_localio;
159 }
160
nfs_server_uuid_is_local(struct nfs_client * clp)161 static bool nfs_server_uuid_is_local(struct nfs_client *clp)
162 {
163 u8 uuid[UUID_SIZE];
164 struct rpc_message msg = {
165 .rpc_argp = &uuid,
166 };
167 struct rpc_clnt *rpcclient_localio;
168 int status;
169
170 rpcclient_localio = nfs_init_localioclient(clp);
171 if (IS_ERR(rpcclient_localio))
172 return false;
173
174 export_uuid(uuid, &clp->cl_uuid.uuid);
175
176 msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
177 status = rpc_call_sync(rpcclient_localio, &msg, 0);
178 dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
179 __func__, status);
180 rpc_shutdown_client(rpcclient_localio);
181
182 /* Server is only local if it initialized required struct members */
183 if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom)
184 return false;
185
186 return true;
187 }
188
189 /*
190 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
191 * - called after alloc_client and init_client (so cl_rpcclient exists)
192 * - this function is idempotent, it can be called for old or new clients
193 */
nfs_local_probe(struct nfs_client * clp)194 void nfs_local_probe(struct nfs_client *clp)
195 {
196 /* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
197 if (!localio_enabled ||
198 clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
199 nfs_local_disable(clp);
200 return;
201 }
202
203 if (nfs_client_is_local(clp)) {
204 /* If already enabled, disable and re-enable */
205 nfs_local_disable(clp);
206 }
207
208 nfs_uuid_begin(&clp->cl_uuid);
209 if (nfs_server_uuid_is_local(clp))
210 nfs_local_enable(clp);
211 nfs_uuid_end(&clp->cl_uuid);
212 }
213 EXPORT_SYMBOL_GPL(nfs_local_probe);
214
215 /*
216 * nfs_local_open_fh - open a local filehandle in terms of nfsd_file
217 *
218 * Returns a pointer to a struct nfsd_file or NULL
219 */
220 struct nfsd_file *
nfs_local_open_fh(struct nfs_client * clp,const struct cred * cred,struct nfs_fh * fh,const fmode_t mode)221 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
222 struct nfs_fh *fh, const fmode_t mode)
223 {
224 struct nfsd_file *localio;
225 int status;
226
227 if (!nfs_server_is_local(clp))
228 return NULL;
229 if (mode & ~(FMODE_READ | FMODE_WRITE))
230 return NULL;
231
232 localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
233 cred, fh, mode);
234 if (IS_ERR(localio)) {
235 status = PTR_ERR(localio);
236 trace_nfs_local_open_fh(fh, mode, status);
237 switch (status) {
238 case -ENOMEM:
239 case -ENXIO:
240 case -ENOENT:
241 /* Revalidate localio, will disable if unsupported */
242 nfs_local_probe(clp);
243 }
244 return NULL;
245 }
246 return localio;
247 }
248 EXPORT_SYMBOL_GPL(nfs_local_open_fh);
249
250 static struct bio_vec *
nfs_bvec_alloc_and_import_pagevec(struct page ** pagevec,unsigned int npages,gfp_t flags)251 nfs_bvec_alloc_and_import_pagevec(struct page **pagevec,
252 unsigned int npages, gfp_t flags)
253 {
254 struct bio_vec *bvec, *p;
255
256 bvec = kmalloc_array(npages, sizeof(*bvec), flags);
257 if (bvec != NULL) {
258 for (p = bvec; npages > 0; p++, pagevec++, npages--) {
259 p->bv_page = *pagevec;
260 p->bv_len = PAGE_SIZE;
261 p->bv_offset = 0;
262 }
263 }
264 return bvec;
265 }
266
267 static void
nfs_local_iocb_free(struct nfs_local_kiocb * iocb)268 nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
269 {
270 kfree(iocb->bvec);
271 kfree(iocb);
272 }
273
274 static struct nfs_local_kiocb *
nfs_local_iocb_alloc(struct nfs_pgio_header * hdr,struct nfsd_file * localio,gfp_t flags)275 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
276 struct nfsd_file *localio, gfp_t flags)
277 {
278 struct nfs_local_kiocb *iocb;
279
280 iocb = kmalloc(sizeof(*iocb), flags);
281 if (iocb == NULL)
282 return NULL;
283 iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec,
284 hdr->page_array.npages, flags);
285 if (iocb->bvec == NULL) {
286 kfree(iocb);
287 return NULL;
288 }
289 init_sync_kiocb(&iocb->kiocb, nfs_to->nfsd_file_file(localio));
290 iocb->kiocb.ki_pos = hdr->args.offset;
291 iocb->localio = localio;
292 iocb->hdr = hdr;
293 iocb->kiocb.ki_flags &= ~IOCB_APPEND;
294 return iocb;
295 }
296
297 static void
nfs_local_iter_init(struct iov_iter * i,struct nfs_local_kiocb * iocb,int dir)298 nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir)
299 {
300 struct nfs_pgio_header *hdr = iocb->hdr;
301
302 iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages,
303 hdr->args.count + hdr->args.pgbase);
304 if (hdr->args.pgbase != 0)
305 iov_iter_advance(i, hdr->args.pgbase);
306 }
307
308 static void
nfs_local_hdr_release(struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)309 nfs_local_hdr_release(struct nfs_pgio_header *hdr,
310 const struct rpc_call_ops *call_ops)
311 {
312 call_ops->rpc_call_done(&hdr->task, hdr);
313 call_ops->rpc_release(hdr);
314 }
315
316 static void
nfs_local_pgio_init(struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)317 nfs_local_pgio_init(struct nfs_pgio_header *hdr,
318 const struct rpc_call_ops *call_ops)
319 {
320 hdr->task.tk_ops = call_ops;
321 if (!hdr->task.tk_start)
322 hdr->task.tk_start = ktime_get();
323 }
324
325 static void
nfs_local_pgio_done(struct nfs_pgio_header * hdr,long status)326 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
327 {
328 if (status >= 0) {
329 hdr->res.count = status;
330 hdr->res.op_status = NFS4_OK;
331 hdr->task.tk_status = 0;
332 } else {
333 hdr->res.op_status = nfs4_stat_to_errno(status);
334 hdr->task.tk_status = status;
335 }
336 }
337
338 static void
nfs_local_pgio_release(struct nfs_local_kiocb * iocb)339 nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
340 {
341 struct nfs_pgio_header *hdr = iocb->hdr;
342
343 nfs_to_nfsd_file_put_local(iocb->localio);
344 nfs_local_iocb_free(iocb);
345 nfs_local_hdr_release(hdr, hdr->task.tk_ops);
346 }
347
348 static void
nfs_local_read_done(struct nfs_local_kiocb * iocb,long status)349 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
350 {
351 struct nfs_pgio_header *hdr = iocb->hdr;
352 struct file *filp = iocb->kiocb.ki_filp;
353
354 nfs_local_pgio_done(hdr, status);
355
356 if (hdr->res.count != hdr->args.count ||
357 hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
358 hdr->res.eof = true;
359
360 dprintk("%s: read %ld bytes eof %d.\n", __func__,
361 status > 0 ? status : 0, hdr->res.eof);
362 }
363
nfs_local_call_read(struct work_struct * work)364 static void nfs_local_call_read(struct work_struct *work)
365 {
366 struct nfs_local_kiocb *iocb =
367 container_of(work, struct nfs_local_kiocb, work);
368 struct file *filp = iocb->kiocb.ki_filp;
369 const struct cred *save_cred;
370 struct iov_iter iter;
371 ssize_t status;
372
373 save_cred = override_creds(filp->f_cred);
374
375 nfs_local_iter_init(&iter, iocb, READ);
376
377 status = filp->f_op->read_iter(&iocb->kiocb, &iter);
378 WARN_ON_ONCE(status == -EIOCBQUEUED);
379
380 nfs_local_read_done(iocb, status);
381 nfs_local_pgio_release(iocb);
382
383 revert_creds(save_cred);
384 }
385
386 static int
nfs_do_local_read(struct nfs_pgio_header * hdr,struct nfsd_file * localio,const struct rpc_call_ops * call_ops)387 nfs_do_local_read(struct nfs_pgio_header *hdr,
388 struct nfsd_file *localio,
389 const struct rpc_call_ops *call_ops)
390 {
391 struct nfs_local_kiocb *iocb;
392
393 dprintk("%s: vfs_read count=%u pos=%llu\n",
394 __func__, hdr->args.count, hdr->args.offset);
395
396 iocb = nfs_local_iocb_alloc(hdr, localio, GFP_KERNEL);
397 if (iocb == NULL)
398 return -ENOMEM;
399
400 nfs_local_pgio_init(hdr, call_ops);
401 hdr->res.eof = false;
402
403 INIT_WORK(&iocb->work, nfs_local_call_read);
404 queue_work(nfslocaliod_workqueue, &iocb->work);
405
406 return 0;
407 }
408
409 static void
nfs_copy_boot_verifier(struct nfs_write_verifier * verifier,struct inode * inode)410 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
411 {
412 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
413 u32 *verf = (u32 *)verifier->data;
414 int seq = 0;
415
416 do {
417 read_seqbegin_or_lock(&clp->cl_boot_lock, &seq);
418 verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
419 verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
420 } while (need_seqretry(&clp->cl_boot_lock, seq));
421 done_seqretry(&clp->cl_boot_lock, seq);
422 }
423
424 static void
nfs_reset_boot_verifier(struct inode * inode)425 nfs_reset_boot_verifier(struct inode *inode)
426 {
427 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
428
429 write_seqlock(&clp->cl_boot_lock);
430 ktime_get_real_ts64(&clp->cl_nfssvc_boot);
431 write_sequnlock(&clp->cl_boot_lock);
432 }
433
434 static void
nfs_set_local_verifier(struct inode * inode,struct nfs_writeverf * verf,enum nfs3_stable_how how)435 nfs_set_local_verifier(struct inode *inode,
436 struct nfs_writeverf *verf,
437 enum nfs3_stable_how how)
438 {
439 nfs_copy_boot_verifier(&verf->verifier, inode);
440 verf->committed = how;
441 }
442
443 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */
__vfs_getattr(struct path * p,struct kstat * stat,int version)444 static int __vfs_getattr(struct path *p, struct kstat *stat, int version)
445 {
446 u32 request_mask = STATX_BASIC_STATS;
447
448 if (version == 4)
449 request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
450 return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
451 }
452
453 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
__nfsd4_change_attribute(const struct kstat * stat,const struct inode * inode)454 static u64 __nfsd4_change_attribute(const struct kstat *stat,
455 const struct inode *inode)
456 {
457 u64 chattr;
458
459 if (stat->result_mask & STATX_CHANGE_COOKIE) {
460 chattr = stat->change_cookie;
461 if (S_ISREG(inode->i_mode) &&
462 !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
463 chattr += (u64)stat->ctime.tv_sec << 30;
464 chattr += stat->ctime.tv_nsec;
465 }
466 } else {
467 chattr = time_to_chattr(&stat->ctime);
468 }
469 return chattr;
470 }
471
nfs_local_vfs_getattr(struct nfs_local_kiocb * iocb)472 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
473 {
474 struct kstat stat;
475 struct file *filp = iocb->kiocb.ki_filp;
476 struct nfs_pgio_header *hdr = iocb->hdr;
477 struct nfs_fattr *fattr = hdr->res.fattr;
478 int version = NFS_PROTO(hdr->inode)->version;
479
480 if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
481 return;
482
483 fattr->valid = (NFS_ATTR_FATTR_FILEID |
484 NFS_ATTR_FATTR_CHANGE |
485 NFS_ATTR_FATTR_SIZE |
486 NFS_ATTR_FATTR_ATIME |
487 NFS_ATTR_FATTR_MTIME |
488 NFS_ATTR_FATTR_CTIME |
489 NFS_ATTR_FATTR_SPACE_USED);
490
491 fattr->fileid = stat.ino;
492 fattr->size = stat.size;
493 fattr->atime = stat.atime;
494 fattr->mtime = stat.mtime;
495 fattr->ctime = stat.ctime;
496 if (version == 4) {
497 fattr->change_attr =
498 __nfsd4_change_attribute(&stat, file_inode(filp));
499 } else
500 fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
501 fattr->du.nfs3.used = stat.blocks << 9;
502 }
503
504 static void
nfs_local_write_done(struct nfs_local_kiocb * iocb,long status)505 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
506 {
507 struct nfs_pgio_header *hdr = iocb->hdr;
508 struct inode *inode = hdr->inode;
509
510 dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);
511
512 /* Handle short writes as if they are ENOSPC */
513 if (status > 0 && status < hdr->args.count) {
514 hdr->mds_offset += status;
515 hdr->args.offset += status;
516 hdr->args.pgbase += status;
517 hdr->args.count -= status;
518 nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
519 status = -ENOSPC;
520 }
521 if (status < 0)
522 nfs_reset_boot_verifier(inode);
523 else if (nfs_should_remove_suid(inode)) {
524 /* Deal with the suid/sgid bit corner case */
525 spin_lock(&inode->i_lock);
526 nfs_set_cache_invalid(inode, NFS_INO_INVALID_MODE);
527 spin_unlock(&inode->i_lock);
528 }
529 nfs_local_pgio_done(hdr, status);
530 }
531
nfs_local_call_write(struct work_struct * work)532 static void nfs_local_call_write(struct work_struct *work)
533 {
534 struct nfs_local_kiocb *iocb =
535 container_of(work, struct nfs_local_kiocb, work);
536 struct file *filp = iocb->kiocb.ki_filp;
537 unsigned long old_flags = current->flags;
538 const struct cred *save_cred;
539 struct iov_iter iter;
540 ssize_t status;
541
542 current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
543 save_cred = override_creds(filp->f_cred);
544
545 nfs_local_iter_init(&iter, iocb, WRITE);
546
547 file_start_write(filp);
548 status = filp->f_op->write_iter(&iocb->kiocb, &iter);
549 file_end_write(filp);
550 WARN_ON_ONCE(status == -EIOCBQUEUED);
551
552 nfs_local_write_done(iocb, status);
553 nfs_local_vfs_getattr(iocb);
554 nfs_local_pgio_release(iocb);
555
556 revert_creds(save_cred);
557 current->flags = old_flags;
558 }
559
560 static int
nfs_do_local_write(struct nfs_pgio_header * hdr,struct nfsd_file * localio,const struct rpc_call_ops * call_ops)561 nfs_do_local_write(struct nfs_pgio_header *hdr,
562 struct nfsd_file *localio,
563 const struct rpc_call_ops *call_ops)
564 {
565 struct nfs_local_kiocb *iocb;
566
567 dprintk("%s: vfs_write count=%u pos=%llu %s\n",
568 __func__, hdr->args.count, hdr->args.offset,
569 (hdr->args.stable == NFS_UNSTABLE) ? "unstable" : "stable");
570
571 iocb = nfs_local_iocb_alloc(hdr, localio, GFP_NOIO);
572 if (iocb == NULL)
573 return -ENOMEM;
574
575 switch (hdr->args.stable) {
576 default:
577 break;
578 case NFS_DATA_SYNC:
579 iocb->kiocb.ki_flags |= IOCB_DSYNC;
580 break;
581 case NFS_FILE_SYNC:
582 iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
583 }
584 nfs_local_pgio_init(hdr, call_ops);
585
586 nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
587
588 INIT_WORK(&iocb->work, nfs_local_call_write);
589 queue_work(nfslocaliod_workqueue, &iocb->work);
590
591 return 0;
592 }
593
nfs_local_doio(struct nfs_client * clp,struct nfsd_file * localio,struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)594 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
595 struct nfs_pgio_header *hdr,
596 const struct rpc_call_ops *call_ops)
597 {
598 int status = 0;
599 struct file *filp = nfs_to->nfsd_file_file(localio);
600
601 if (!hdr->args.count)
602 return 0;
603 /* Don't support filesystems without read_iter/write_iter */
604 if (!filp->f_op->read_iter || !filp->f_op->write_iter) {
605 nfs_local_disable(clp);
606 status = -EAGAIN;
607 goto out;
608 }
609
610 switch (hdr->rw_mode) {
611 case FMODE_READ:
612 status = nfs_do_local_read(hdr, localio, call_ops);
613 break;
614 case FMODE_WRITE:
615 status = nfs_do_local_write(hdr, localio, call_ops);
616 break;
617 default:
618 dprintk("%s: invalid mode: %d\n", __func__,
619 hdr->rw_mode);
620 status = -EINVAL;
621 }
622 out:
623 if (status != 0) {
624 nfs_to_nfsd_file_put_local(localio);
625 hdr->task.tk_status = status;
626 nfs_local_hdr_release(hdr, call_ops);
627 }
628 return status;
629 }
630
631 static void
nfs_local_init_commit(struct nfs_commit_data * data,const struct rpc_call_ops * call_ops)632 nfs_local_init_commit(struct nfs_commit_data *data,
633 const struct rpc_call_ops *call_ops)
634 {
635 data->task.tk_ops = call_ops;
636 }
637
638 static int
nfs_local_run_commit(struct file * filp,struct nfs_commit_data * data)639 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
640 {
641 loff_t start = data->args.offset;
642 loff_t end = LLONG_MAX;
643
644 if (data->args.count > 0) {
645 end = start + data->args.count - 1;
646 if (end < start)
647 end = LLONG_MAX;
648 }
649
650 dprintk("%s: commit %llu - %llu\n", __func__, start, end);
651 return vfs_fsync_range(filp, start, end, 0);
652 }
653
654 static void
nfs_local_commit_done(struct nfs_commit_data * data,int status)655 nfs_local_commit_done(struct nfs_commit_data *data, int status)
656 {
657 if (status >= 0) {
658 nfs_set_local_verifier(data->inode,
659 data->res.verf,
660 NFS_FILE_SYNC);
661 data->res.op_status = NFS4_OK;
662 data->task.tk_status = 0;
663 } else {
664 nfs_reset_boot_verifier(data->inode);
665 data->res.op_status = nfs4_stat_to_errno(status);
666 data->task.tk_status = status;
667 }
668 }
669
670 static void
nfs_local_release_commit_data(struct nfsd_file * localio,struct nfs_commit_data * data,const struct rpc_call_ops * call_ops)671 nfs_local_release_commit_data(struct nfsd_file *localio,
672 struct nfs_commit_data *data,
673 const struct rpc_call_ops *call_ops)
674 {
675 nfs_to_nfsd_file_put_local(localio);
676 call_ops->rpc_call_done(&data->task, data);
677 call_ops->rpc_release(data);
678 }
679
680 static struct nfs_local_fsync_ctx *
nfs_local_fsync_ctx_alloc(struct nfs_commit_data * data,struct nfsd_file * localio,gfp_t flags)681 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
682 struct nfsd_file *localio, gfp_t flags)
683 {
684 struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
685
686 if (ctx != NULL) {
687 ctx->localio = localio;
688 ctx->data = data;
689 INIT_WORK(&ctx->work, nfs_local_fsync_work);
690 kref_init(&ctx->kref);
691 ctx->done = NULL;
692 }
693 return ctx;
694 }
695
696 static void
nfs_local_fsync_ctx_kref_free(struct kref * kref)697 nfs_local_fsync_ctx_kref_free(struct kref *kref)
698 {
699 kfree(container_of(kref, struct nfs_local_fsync_ctx, kref));
700 }
701
702 static void
nfs_local_fsync_ctx_put(struct nfs_local_fsync_ctx * ctx)703 nfs_local_fsync_ctx_put(struct nfs_local_fsync_ctx *ctx)
704 {
705 kref_put(&ctx->kref, nfs_local_fsync_ctx_kref_free);
706 }
707
708 static void
nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx * ctx)709 nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
710 {
711 nfs_local_release_commit_data(ctx->localio, ctx->data,
712 ctx->data->task.tk_ops);
713 nfs_local_fsync_ctx_put(ctx);
714 }
715
716 static void
nfs_local_fsync_work(struct work_struct * work)717 nfs_local_fsync_work(struct work_struct *work)
718 {
719 struct nfs_local_fsync_ctx *ctx;
720 int status;
721
722 ctx = container_of(work, struct nfs_local_fsync_ctx, work);
723
724 status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
725 ctx->data);
726 nfs_local_commit_done(ctx->data, status);
727 if (ctx->done != NULL)
728 complete(ctx->done);
729 nfs_local_fsync_ctx_free(ctx);
730 }
731
nfs_local_commit(struct nfsd_file * localio,struct nfs_commit_data * data,const struct rpc_call_ops * call_ops,int how)732 int nfs_local_commit(struct nfsd_file *localio,
733 struct nfs_commit_data *data,
734 const struct rpc_call_ops *call_ops, int how)
735 {
736 struct nfs_local_fsync_ctx *ctx;
737
738 ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
739 if (!ctx) {
740 nfs_local_commit_done(data, -ENOMEM);
741 nfs_local_release_commit_data(localio, data, call_ops);
742 return -ENOMEM;
743 }
744
745 nfs_local_init_commit(data, call_ops);
746 kref_get(&ctx->kref);
747 if (how & FLUSH_SYNC) {
748 DECLARE_COMPLETION_ONSTACK(done);
749 ctx->done = &done;
750 queue_work(nfsiod_workqueue, &ctx->work);
751 wait_for_completion(&done);
752 } else
753 queue_work(nfsiod_workqueue, &ctx->work);
754 nfs_local_fsync_ctx_put(ctx);
755 return 0;
756 }
757