1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * NFS client support for local clients to bypass network stack
4 *
5 * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
6 * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
7 * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
8 * Copyright (C) 2024 NeilBrown <neilb@suse.de>
9 */
10
11 #include <linux/module.h>
12 #include <linux/errno.h>
13 #include <linux/vfs.h>
14 #include <linux/file.h>
15 #include <linux/inet.h>
16 #include <linux/sunrpc/addr.h>
17 #include <linux/inetdevice.h>
18 #include <net/addrconf.h>
19 #include <linux/nfs_common.h>
20 #include <linux/nfslocalio.h>
21 #include <linux/bvec.h>
22
23 #include <linux/nfs.h>
24 #include <linux/nfs_fs.h>
25 #include <linux/nfs_xdr.h>
26
27 #include "internal.h"
28 #include "pnfs.h"
29 #include "nfstrace.h"
30
31 #define NFSDBG_FACILITY NFSDBG_VFS
32
33 struct nfs_local_kiocb {
34 struct kiocb kiocb;
35 struct bio_vec *bvec;
36 struct nfs_pgio_header *hdr;
37 struct work_struct work;
38 struct nfsd_file *localio;
39 };
40
41 struct nfs_local_fsync_ctx {
42 struct nfsd_file *localio;
43 struct nfs_commit_data *data;
44 struct work_struct work;
45 struct completion *done;
46 };
47
48 static bool localio_enabled __read_mostly = true;
49 module_param(localio_enabled, bool, 0644);
50
nfs_client_is_local(const struct nfs_client * clp)51 static inline bool nfs_client_is_local(const struct nfs_client *clp)
52 {
53 return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
54 }
55
nfs_server_is_local(const struct nfs_client * clp)56 bool nfs_server_is_local(const struct nfs_client *clp)
57 {
58 return nfs_client_is_local(clp) && localio_enabled;
59 }
60 EXPORT_SYMBOL_GPL(nfs_server_is_local);
61
62 /*
63 * UUID_IS_LOCAL XDR functions
64 */
65
localio_xdr_enc_uuidargs(struct rpc_rqst * req,struct xdr_stream * xdr,const void * data)66 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
67 struct xdr_stream *xdr,
68 const void *data)
69 {
70 const u8 *uuid = data;
71
72 encode_opaque_fixed(xdr, uuid, UUID_SIZE);
73 }
74
localio_xdr_dec_uuidres(struct rpc_rqst * req,struct xdr_stream * xdr,void * result)75 static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
76 struct xdr_stream *xdr,
77 void *result)
78 {
79 /* void return */
80 return 0;
81 }
82
83 static const struct rpc_procinfo nfs_localio_procedures[] = {
84 [LOCALIOPROC_UUID_IS_LOCAL] = {
85 .p_proc = LOCALIOPROC_UUID_IS_LOCAL,
86 .p_encode = localio_xdr_enc_uuidargs,
87 .p_decode = localio_xdr_dec_uuidres,
88 .p_arglen = XDR_QUADLEN(UUID_SIZE),
89 .p_replen = 0,
90 .p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
91 .p_name = "UUID_IS_LOCAL",
92 },
93 };
94
95 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
96 static const struct rpc_version nfslocalio_version1 = {
97 .number = 1,
98 .nrprocs = ARRAY_SIZE(nfs_localio_procedures),
99 .procs = nfs_localio_procedures,
100 .counts = nfs_localio_counts,
101 };
102
103 static const struct rpc_version *nfslocalio_version[] = {
104 [1] = &nfslocalio_version1,
105 };
106
107 extern const struct rpc_program nfslocalio_program;
108 static struct rpc_stat nfslocalio_rpcstat = { &nfslocalio_program };
109
110 const struct rpc_program nfslocalio_program = {
111 .name = "nfslocalio",
112 .number = NFS_LOCALIO_PROGRAM,
113 .nrvers = ARRAY_SIZE(nfslocalio_version),
114 .version = nfslocalio_version,
115 .stats = &nfslocalio_rpcstat,
116 };
117
118 /*
119 * nfs_local_enable - enable local i/o for an nfs_client
120 */
nfs_local_enable(struct nfs_client * clp)121 static void nfs_local_enable(struct nfs_client *clp)
122 {
123 spin_lock(&clp->cl_localio_lock);
124 set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags);
125 trace_nfs_local_enable(clp);
126 spin_unlock(&clp->cl_localio_lock);
127 }
128
129 /*
130 * nfs_local_disable - disable local i/o for an nfs_client
131 */
nfs_local_disable(struct nfs_client * clp)132 void nfs_local_disable(struct nfs_client *clp)
133 {
134 spin_lock(&clp->cl_localio_lock);
135 if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) {
136 trace_nfs_local_disable(clp);
137 nfs_uuid_invalidate_one_client(&clp->cl_uuid);
138 }
139 spin_unlock(&clp->cl_localio_lock);
140 }
141
142 /*
143 * nfs_init_localioclient - Initialise an NFS localio client connection
144 */
nfs_init_localioclient(struct nfs_client * clp)145 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
146 {
147 struct rpc_clnt *rpcclient_localio;
148
149 rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
150 &nfslocalio_program, 1);
151
152 dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
153 __func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
154 (IS_ERR(rpcclient_localio) ? "does not support" : "supports"));
155
156 return rpcclient_localio;
157 }
158
nfs_server_uuid_is_local(struct nfs_client * clp)159 static bool nfs_server_uuid_is_local(struct nfs_client *clp)
160 {
161 u8 uuid[UUID_SIZE];
162 struct rpc_message msg = {
163 .rpc_argp = &uuid,
164 };
165 struct rpc_clnt *rpcclient_localio;
166 int status;
167
168 rpcclient_localio = nfs_init_localioclient(clp);
169 if (IS_ERR(rpcclient_localio))
170 return false;
171
172 export_uuid(uuid, &clp->cl_uuid.uuid);
173
174 msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
175 status = rpc_call_sync(rpcclient_localio, &msg, 0);
176 dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
177 __func__, status);
178 rpc_shutdown_client(rpcclient_localio);
179
180 /* Server is only local if it initialized required struct members */
181 if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom)
182 return false;
183
184 return true;
185 }
186
187 /*
188 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
189 * - called after alloc_client and init_client (so cl_rpcclient exists)
190 * - this function is idempotent, it can be called for old or new clients
191 */
nfs_local_probe(struct nfs_client * clp)192 void nfs_local_probe(struct nfs_client *clp)
193 {
194 /* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
195 if (!localio_enabled ||
196 clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
197 nfs_local_disable(clp);
198 return;
199 }
200
201 if (nfs_client_is_local(clp)) {
202 /* If already enabled, disable and re-enable */
203 nfs_local_disable(clp);
204 }
205
206 if (!nfs_uuid_begin(&clp->cl_uuid))
207 return;
208 if (nfs_server_uuid_is_local(clp))
209 nfs_local_enable(clp);
210 nfs_uuid_end(&clp->cl_uuid);
211 }
212 EXPORT_SYMBOL_GPL(nfs_local_probe);
213
214 /*
215 * nfs_local_open_fh - open a local filehandle in terms of nfsd_file
216 *
217 * Returns a pointer to a struct nfsd_file or NULL
218 */
219 struct nfsd_file *
nfs_local_open_fh(struct nfs_client * clp,const struct cred * cred,struct nfs_fh * fh,const fmode_t mode)220 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
221 struct nfs_fh *fh, const fmode_t mode)
222 {
223 struct nfsd_file *localio;
224 int status;
225
226 if (!nfs_server_is_local(clp))
227 return NULL;
228 if (mode & ~(FMODE_READ | FMODE_WRITE))
229 return NULL;
230
231 localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
232 cred, fh, mode);
233 if (IS_ERR(localio)) {
234 status = PTR_ERR(localio);
235 trace_nfs_local_open_fh(fh, mode, status);
236 switch (status) {
237 case -ENOMEM:
238 case -ENXIO:
239 case -ENOENT:
240 /* Revalidate localio, will disable if unsupported */
241 nfs_local_probe(clp);
242 }
243 return NULL;
244 }
245 return localio;
246 }
247 EXPORT_SYMBOL_GPL(nfs_local_open_fh);
248
249 static struct bio_vec *
nfs_bvec_alloc_and_import_pagevec(struct page ** pagevec,unsigned int npages,gfp_t flags)250 nfs_bvec_alloc_and_import_pagevec(struct page **pagevec,
251 unsigned int npages, gfp_t flags)
252 {
253 struct bio_vec *bvec, *p;
254
255 bvec = kmalloc_array(npages, sizeof(*bvec), flags);
256 if (bvec != NULL) {
257 for (p = bvec; npages > 0; p++, pagevec++, npages--) {
258 p->bv_page = *pagevec;
259 p->bv_len = PAGE_SIZE;
260 p->bv_offset = 0;
261 }
262 }
263 return bvec;
264 }
265
266 static void
nfs_local_iocb_free(struct nfs_local_kiocb * iocb)267 nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
268 {
269 kfree(iocb->bvec);
270 kfree(iocb);
271 }
272
273 static struct nfs_local_kiocb *
nfs_local_iocb_alloc(struct nfs_pgio_header * hdr,struct file * file,gfp_t flags)274 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
275 struct file *file, gfp_t flags)
276 {
277 struct nfs_local_kiocb *iocb;
278
279 iocb = kmalloc(sizeof(*iocb), flags);
280 if (iocb == NULL)
281 return NULL;
282 iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec,
283 hdr->page_array.npages, flags);
284 if (iocb->bvec == NULL) {
285 kfree(iocb);
286 return NULL;
287 }
288 init_sync_kiocb(&iocb->kiocb, file);
289 iocb->kiocb.ki_pos = hdr->args.offset;
290 iocb->hdr = hdr;
291 iocb->kiocb.ki_flags &= ~IOCB_APPEND;
292 return iocb;
293 }
294
295 static void
nfs_local_iter_init(struct iov_iter * i,struct nfs_local_kiocb * iocb,int dir)296 nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir)
297 {
298 struct nfs_pgio_header *hdr = iocb->hdr;
299
300 iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages,
301 hdr->args.count + hdr->args.pgbase);
302 if (hdr->args.pgbase != 0)
303 iov_iter_advance(i, hdr->args.pgbase);
304 }
305
306 static void
nfs_local_hdr_release(struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)307 nfs_local_hdr_release(struct nfs_pgio_header *hdr,
308 const struct rpc_call_ops *call_ops)
309 {
310 call_ops->rpc_call_done(&hdr->task, hdr);
311 call_ops->rpc_release(hdr);
312 }
313
314 static void
nfs_local_pgio_init(struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)315 nfs_local_pgio_init(struct nfs_pgio_header *hdr,
316 const struct rpc_call_ops *call_ops)
317 {
318 hdr->task.tk_ops = call_ops;
319 if (!hdr->task.tk_start)
320 hdr->task.tk_start = ktime_get();
321 }
322
323 static void
nfs_local_pgio_done(struct nfs_pgio_header * hdr,long status)324 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
325 {
326 if (status >= 0) {
327 hdr->res.count = status;
328 hdr->res.op_status = NFS4_OK;
329 hdr->task.tk_status = 0;
330 } else {
331 hdr->res.op_status = nfs4_stat_to_errno(status);
332 hdr->task.tk_status = status;
333 }
334 }
335
336 static void
nfs_local_pgio_release(struct nfs_local_kiocb * iocb)337 nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
338 {
339 struct nfs_pgio_header *hdr = iocb->hdr;
340
341 nfs_to_nfsd_file_put_local(iocb->localio);
342 nfs_local_iocb_free(iocb);
343 nfs_local_hdr_release(hdr, hdr->task.tk_ops);
344 }
345
346 static void
nfs_local_read_done(struct nfs_local_kiocb * iocb,long status)347 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
348 {
349 struct nfs_pgio_header *hdr = iocb->hdr;
350 struct file *filp = iocb->kiocb.ki_filp;
351
352 nfs_local_pgio_done(hdr, status);
353
354 /*
355 * Must clear replen otherwise NFSv3 data corruption will occur
356 * if/when switching from LOCALIO back to using normal RPC.
357 */
358 hdr->res.replen = 0;
359
360 if (hdr->res.count != hdr->args.count ||
361 hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
362 hdr->res.eof = true;
363
364 dprintk("%s: read %ld bytes eof %d.\n", __func__,
365 status > 0 ? status : 0, hdr->res.eof);
366 }
367
nfs_local_call_read(struct work_struct * work)368 static void nfs_local_call_read(struct work_struct *work)
369 {
370 struct nfs_local_kiocb *iocb =
371 container_of(work, struct nfs_local_kiocb, work);
372 struct file *filp = iocb->kiocb.ki_filp;
373 const struct cred *save_cred;
374 struct iov_iter iter;
375 ssize_t status;
376
377 save_cred = override_creds(filp->f_cred);
378
379 nfs_local_iter_init(&iter, iocb, READ);
380
381 status = filp->f_op->read_iter(&iocb->kiocb, &iter);
382 WARN_ON_ONCE(status == -EIOCBQUEUED);
383
384 nfs_local_read_done(iocb, status);
385 nfs_local_pgio_release(iocb);
386
387 revert_creds(save_cred);
388 }
389
390 static int
nfs_do_local_read(struct nfs_pgio_header * hdr,struct nfsd_file * localio,const struct rpc_call_ops * call_ops)391 nfs_do_local_read(struct nfs_pgio_header *hdr,
392 struct nfsd_file *localio,
393 const struct rpc_call_ops *call_ops)
394 {
395 struct nfs_local_kiocb *iocb;
396 struct file *file = nfs_to->nfsd_file_file(localio);
397
398 /* Don't support filesystems without read_iter */
399 if (!file->f_op->read_iter)
400 return -EAGAIN;
401
402 dprintk("%s: vfs_read count=%u pos=%llu\n",
403 __func__, hdr->args.count, hdr->args.offset);
404
405 iocb = nfs_local_iocb_alloc(hdr, file, GFP_KERNEL);
406 if (iocb == NULL)
407 return -ENOMEM;
408 iocb->localio = localio;
409
410 nfs_local_pgio_init(hdr, call_ops);
411 hdr->res.eof = false;
412
413 INIT_WORK(&iocb->work, nfs_local_call_read);
414 queue_work(nfslocaliod_workqueue, &iocb->work);
415
416 return 0;
417 }
418
419 static void
nfs_copy_boot_verifier(struct nfs_write_verifier * verifier,struct inode * inode)420 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
421 {
422 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
423 u32 *verf = (u32 *)verifier->data;
424 int seq = 0;
425
426 do {
427 read_seqbegin_or_lock(&clp->cl_boot_lock, &seq);
428 verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
429 verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
430 } while (need_seqretry(&clp->cl_boot_lock, seq));
431 done_seqretry(&clp->cl_boot_lock, seq);
432 }
433
434 static void
nfs_reset_boot_verifier(struct inode * inode)435 nfs_reset_boot_verifier(struct inode *inode)
436 {
437 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
438
439 write_seqlock(&clp->cl_boot_lock);
440 ktime_get_real_ts64(&clp->cl_nfssvc_boot);
441 write_sequnlock(&clp->cl_boot_lock);
442 }
443
444 static void
nfs_set_local_verifier(struct inode * inode,struct nfs_writeverf * verf,enum nfs3_stable_how how)445 nfs_set_local_verifier(struct inode *inode,
446 struct nfs_writeverf *verf,
447 enum nfs3_stable_how how)
448 {
449 nfs_copy_boot_verifier(&verf->verifier, inode);
450 verf->committed = how;
451 }
452
453 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */
__vfs_getattr(struct path * p,struct kstat * stat,int version)454 static int __vfs_getattr(struct path *p, struct kstat *stat, int version)
455 {
456 u32 request_mask = STATX_BASIC_STATS;
457
458 if (version == 4)
459 request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
460 return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
461 }
462
463 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
__nfsd4_change_attribute(const struct kstat * stat,const struct inode * inode)464 static u64 __nfsd4_change_attribute(const struct kstat *stat,
465 const struct inode *inode)
466 {
467 u64 chattr;
468
469 if (stat->result_mask & STATX_CHANGE_COOKIE) {
470 chattr = stat->change_cookie;
471 if (S_ISREG(inode->i_mode) &&
472 !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
473 chattr += (u64)stat->ctime.tv_sec << 30;
474 chattr += stat->ctime.tv_nsec;
475 }
476 } else {
477 chattr = time_to_chattr(&stat->ctime);
478 }
479 return chattr;
480 }
481
nfs_local_vfs_getattr(struct nfs_local_kiocb * iocb)482 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
483 {
484 struct kstat stat;
485 struct file *filp = iocb->kiocb.ki_filp;
486 struct nfs_pgio_header *hdr = iocb->hdr;
487 struct nfs_fattr *fattr = hdr->res.fattr;
488 int version = NFS_PROTO(hdr->inode)->version;
489
490 if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
491 return;
492
493 fattr->valid = (NFS_ATTR_FATTR_FILEID |
494 NFS_ATTR_FATTR_CHANGE |
495 NFS_ATTR_FATTR_SIZE |
496 NFS_ATTR_FATTR_ATIME |
497 NFS_ATTR_FATTR_MTIME |
498 NFS_ATTR_FATTR_CTIME |
499 NFS_ATTR_FATTR_SPACE_USED);
500
501 fattr->fileid = stat.ino;
502 fattr->size = stat.size;
503 fattr->atime = stat.atime;
504 fattr->mtime = stat.mtime;
505 fattr->ctime = stat.ctime;
506 if (version == 4) {
507 fattr->change_attr =
508 __nfsd4_change_attribute(&stat, file_inode(filp));
509 } else
510 fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
511 fattr->du.nfs3.used = stat.blocks << 9;
512 }
513
514 static void
nfs_local_write_done(struct nfs_local_kiocb * iocb,long status)515 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
516 {
517 struct nfs_pgio_header *hdr = iocb->hdr;
518 struct inode *inode = hdr->inode;
519
520 dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);
521
522 /* Handle short writes as if they are ENOSPC */
523 if (status > 0 && status < hdr->args.count) {
524 hdr->mds_offset += status;
525 hdr->args.offset += status;
526 hdr->args.pgbase += status;
527 hdr->args.count -= status;
528 nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
529 status = -ENOSPC;
530 }
531 if (status < 0)
532 nfs_reset_boot_verifier(inode);
533
534 nfs_local_pgio_done(hdr, status);
535 }
536
nfs_local_call_write(struct work_struct * work)537 static void nfs_local_call_write(struct work_struct *work)
538 {
539 struct nfs_local_kiocb *iocb =
540 container_of(work, struct nfs_local_kiocb, work);
541 struct file *filp = iocb->kiocb.ki_filp;
542 unsigned long old_flags = current->flags;
543 const struct cred *save_cred;
544 struct iov_iter iter;
545 ssize_t status;
546
547 current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
548 save_cred = override_creds(filp->f_cred);
549
550 nfs_local_iter_init(&iter, iocb, WRITE);
551
552 file_start_write(filp);
553 status = filp->f_op->write_iter(&iocb->kiocb, &iter);
554 file_end_write(filp);
555 WARN_ON_ONCE(status == -EIOCBQUEUED);
556
557 nfs_local_write_done(iocb, status);
558 nfs_local_vfs_getattr(iocb);
559 nfs_local_pgio_release(iocb);
560
561 revert_creds(save_cred);
562 current->flags = old_flags;
563 }
564
565 static int
nfs_do_local_write(struct nfs_pgio_header * hdr,struct nfsd_file * localio,const struct rpc_call_ops * call_ops)566 nfs_do_local_write(struct nfs_pgio_header *hdr,
567 struct nfsd_file *localio,
568 const struct rpc_call_ops *call_ops)
569 {
570 struct nfs_local_kiocb *iocb;
571 struct file *file = nfs_to->nfsd_file_file(localio);
572
573 /* Don't support filesystems without write_iter */
574 if (!file->f_op->write_iter)
575 return -EAGAIN;
576
577 dprintk("%s: vfs_write count=%u pos=%llu %s\n",
578 __func__, hdr->args.count, hdr->args.offset,
579 (hdr->args.stable == NFS_UNSTABLE) ? "unstable" : "stable");
580
581 iocb = nfs_local_iocb_alloc(hdr, file, GFP_NOIO);
582 if (iocb == NULL)
583 return -ENOMEM;
584 iocb->localio = localio;
585
586 switch (hdr->args.stable) {
587 default:
588 break;
589 case NFS_DATA_SYNC:
590 iocb->kiocb.ki_flags |= IOCB_DSYNC;
591 break;
592 case NFS_FILE_SYNC:
593 iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
594 }
595 nfs_local_pgio_init(hdr, call_ops);
596
597 nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);
598
599 INIT_WORK(&iocb->work, nfs_local_call_write);
600 queue_work(nfslocaliod_workqueue, &iocb->work);
601
602 return 0;
603 }
604
nfs_local_doio(struct nfs_client * clp,struct nfsd_file * localio,struct nfs_pgio_header * hdr,const struct rpc_call_ops * call_ops)605 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
606 struct nfs_pgio_header *hdr,
607 const struct rpc_call_ops *call_ops)
608 {
609 int status = 0;
610
611 if (!hdr->args.count)
612 return 0;
613
614 switch (hdr->rw_mode) {
615 case FMODE_READ:
616 status = nfs_do_local_read(hdr, localio, call_ops);
617 break;
618 case FMODE_WRITE:
619 status = nfs_do_local_write(hdr, localio, call_ops);
620 break;
621 default:
622 dprintk("%s: invalid mode: %d\n", __func__,
623 hdr->rw_mode);
624 status = -EINVAL;
625 }
626
627 if (status != 0) {
628 if (status == -EAGAIN)
629 nfs_local_disable(clp);
630 nfs_to_nfsd_file_put_local(localio);
631 hdr->task.tk_status = status;
632 nfs_local_hdr_release(hdr, call_ops);
633 }
634 return status;
635 }
636
637 static void
nfs_local_init_commit(struct nfs_commit_data * data,const struct rpc_call_ops * call_ops)638 nfs_local_init_commit(struct nfs_commit_data *data,
639 const struct rpc_call_ops *call_ops)
640 {
641 data->task.tk_ops = call_ops;
642 }
643
644 static int
nfs_local_run_commit(struct file * filp,struct nfs_commit_data * data)645 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
646 {
647 loff_t start = data->args.offset;
648 loff_t end = LLONG_MAX;
649
650 if (data->args.count > 0) {
651 end = start + data->args.count - 1;
652 if (end < start)
653 end = LLONG_MAX;
654 }
655
656 dprintk("%s: commit %llu - %llu\n", __func__, start, end);
657 return vfs_fsync_range(filp, start, end, 0);
658 }
659
660 static void
nfs_local_commit_done(struct nfs_commit_data * data,int status)661 nfs_local_commit_done(struct nfs_commit_data *data, int status)
662 {
663 if (status >= 0) {
664 nfs_set_local_verifier(data->inode,
665 data->res.verf,
666 NFS_FILE_SYNC);
667 data->res.op_status = NFS4_OK;
668 data->task.tk_status = 0;
669 } else {
670 nfs_reset_boot_verifier(data->inode);
671 data->res.op_status = nfs4_stat_to_errno(status);
672 data->task.tk_status = status;
673 }
674 }
675
676 static void
nfs_local_release_commit_data(struct nfsd_file * localio,struct nfs_commit_data * data,const struct rpc_call_ops * call_ops)677 nfs_local_release_commit_data(struct nfsd_file *localio,
678 struct nfs_commit_data *data,
679 const struct rpc_call_ops *call_ops)
680 {
681 nfs_to_nfsd_file_put_local(localio);
682 call_ops->rpc_call_done(&data->task, data);
683 call_ops->rpc_release(data);
684 }
685
686 static void
nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx * ctx)687 nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
688 {
689 nfs_local_release_commit_data(ctx->localio, ctx->data,
690 ctx->data->task.tk_ops);
691 kfree(ctx);
692 }
693
694 static void
nfs_local_fsync_work(struct work_struct * work)695 nfs_local_fsync_work(struct work_struct *work)
696 {
697 struct nfs_local_fsync_ctx *ctx;
698 int status;
699
700 ctx = container_of(work, struct nfs_local_fsync_ctx, work);
701
702 status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
703 ctx->data);
704 nfs_local_commit_done(ctx->data, status);
705 if (ctx->done != NULL)
706 complete(ctx->done);
707 nfs_local_fsync_ctx_free(ctx);
708 }
709
710 static struct nfs_local_fsync_ctx *
nfs_local_fsync_ctx_alloc(struct nfs_commit_data * data,struct nfsd_file * localio,gfp_t flags)711 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
712 struct nfsd_file *localio, gfp_t flags)
713 {
714 struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);
715
716 if (ctx != NULL) {
717 ctx->localio = localio;
718 ctx->data = data;
719 INIT_WORK(&ctx->work, nfs_local_fsync_work);
720 ctx->done = NULL;
721 }
722 return ctx;
723 }
724
nfs_local_commit(struct nfsd_file * localio,struct nfs_commit_data * data,const struct rpc_call_ops * call_ops,int how)725 int nfs_local_commit(struct nfsd_file *localio,
726 struct nfs_commit_data *data,
727 const struct rpc_call_ops *call_ops, int how)
728 {
729 struct nfs_local_fsync_ctx *ctx;
730
731 ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
732 if (!ctx) {
733 nfs_local_commit_done(data, -ENOMEM);
734 nfs_local_release_commit_data(localio, data, call_ops);
735 return -ENOMEM;
736 }
737
738 nfs_local_init_commit(data, call_ops);
739
740 if (how & FLUSH_SYNC) {
741 DECLARE_COMPLETION_ONSTACK(done);
742 ctx->done = &done;
743 queue_work(nfsiod_workqueue, &ctx->work);
744 wait_for_completion(&done);
745 } else
746 queue_work(nfsiod_workqueue, &ctx->work);
747
748 return 0;
749 }
750