// SPDX-License-Identifier: GPL-2.0-only
/*
 * NFS client support for local clients to bypass network stack
 *
 * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com>
 * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com>
 * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com>
 * Copyright (C) 2024 NeilBrown <neilb@suse.de>
 */

#include <linux/module.h>
#include <linux/errno.h>
#include <linux/vfs.h>
#include <linux/file.h>
#include <linux/inet.h>
#include <linux/sunrpc/addr.h>
#include <linux/inetdevice.h>
#include <net/addrconf.h>
#include <linux/nfs_common.h>
#include <linux/nfslocalio.h>
#include <linux/bvec.h>

#include <linux/nfs.h>
#include <linux/nfs_fs.h>
#include <linux/nfs_xdr.h>

#include "internal.h"
#include "pnfs.h"
#include "nfstrace.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

#define NFSLOCAL_MAX_IOS	3

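/*
 * Per-I/O state for a LOCALIO read or write: wraps the kiocb issued to
 * the local filesystem, the bio_vec array built from the pgio header's
 * page vector, and up to NFSLOCAL_MAX_IOS iov_iters (a possibly
 * misaligned start and end issued with buffered I/O, and a DIO-aligned
 * middle issued last, optionally completing via AIO).
 */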
struct nfs_local_kiocb {
	struct kiocb		kiocb;
	struct bio_vec		*bvec;
	struct nfs_pgio_header	*hdr;
	struct work_struct	work;
	void (*aio_complete_work)(struct work_struct *);
	struct nfsd_file	*localio;
	/* Begin mostly DIO-specific members */
	size_t			end_len;
	short int		end_iter_index;
	short int		n_iters;
	bool			iter_is_dio_aligned[NFSLOCAL_MAX_IOS];
	loff_t			offset[NFSLOCAL_MAX_IOS] ____cacheline_aligned;
	struct iov_iter		iters[NFSLOCAL_MAX_IOS];
	/* End mostly DIO-specific members */
};

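/*
 * State for a LOCALIO commit: the open nfsd_file, the commit data to
 * complete, the work item that runs vfs_fsync_range(), and an optional
 * completion for synchronous (FLUSH_SYNC) callers.
 */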
struct nfs_local_fsync_ctx {
	struct nfsd_file	*localio;
	struct nfs_commit_data	*data;
	struct work_struct	work;
	struct completion	*done;
};

static bool localio_enabled __read_mostly = true;
module_param(localio_enabled, bool, 0644);

static inline bool nfs_client_is_local(const struct nfs_client *clp)
{
	return !!rcu_access_pointer(clp->cl_uuid.net);
}

bool nfs_server_is_local(const struct nfs_client *clp)
{
	return nfs_client_is_local(clp) && localio_enabled;
}
EXPORT_SYMBOL_GPL(nfs_server_is_local);

/*
 * UUID_IS_LOCAL XDR functions
 */

static void localio_xdr_enc_uuidargs(struct rpc_rqst *req,
				     struct xdr_stream *xdr,
				     const void *data)
{
	const u8 *uuid = data;

	encode_opaque_fixed(xdr, uuid, UUID_SIZE);
}

static int localio_xdr_dec_uuidres(struct rpc_rqst *req,
				   struct xdr_stream *xdr,
				   void *result)
{
	/* void return */
	return 0;
}

static const struct rpc_procinfo nfs_localio_procedures[] = {
	[LOCALIOPROC_UUID_IS_LOCAL] = {
		.p_proc = LOCALIOPROC_UUID_IS_LOCAL,
		.p_encode = localio_xdr_enc_uuidargs,
		.p_decode = localio_xdr_dec_uuidres,
		.p_arglen = XDR_QUADLEN(UUID_SIZE),
		.p_replen = 0,
		.p_statidx = LOCALIOPROC_UUID_IS_LOCAL,
		.p_name = "UUID_IS_LOCAL",
	},
};

static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)];
static const struct rpc_version nfslocalio_version1 = {
	.number			= 1,
	.nrprocs		= ARRAY_SIZE(nfs_localio_procedures),
	.procs			= nfs_localio_procedures,
	.counts			= nfs_localio_counts,
};

static const struct rpc_version *nfslocalio_version[] = {
	[1]			= &nfslocalio_version1,
};

extern const struct rpc_program nfslocalio_program;
static struct rpc_stat		nfslocalio_rpcstat = { &nfslocalio_program };

const struct rpc_program nfslocalio_program = {
	.name			= "nfslocalio",
	.number			= NFS_LOCALIO_PROGRAM,
	.nrvers			= ARRAY_SIZE(nfslocalio_version),
	.version		= nfslocalio_version,
	.stats			= &nfslocalio_rpcstat,
};

/*
 * nfs_init_localioclient - Initialise an NFS localio client connection
 */
static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp)
{
	struct rpc_clnt *rpcclient_localio;

	rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient,
						 &nfslocalio_program, 1);

	dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n",
		__func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
		(IS_ERR(rpcclient_localio) ? "does not support" : "supports"));

	return rpcclient_localio;
}

static bool nfs_server_uuid_is_local(struct nfs_client *clp)
{
	u8 uuid[UUID_SIZE];
	struct rpc_message msg = {
		.rpc_argp = &uuid,
	};
	struct rpc_clnt *rpcclient_localio;
	int status;

	rpcclient_localio = nfs_init_localioclient(clp);
	if (IS_ERR(rpcclient_localio))
		return false;

	export_uuid(uuid, &clp->cl_uuid.uuid);

	msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL];
	status = rpc_call_sync(rpcclient_localio, &msg, 0);
	dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n",
		__func__, status);
	rpc_shutdown_client(rpcclient_localio);

	/* Server is only local if it initialized required struct members */
	if (status || !rcu_access_pointer(clp->cl_uuid.net) || !clp->cl_uuid.dom)
		return false;

	return true;
}

/*
 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client
 * - called after alloc_client and init_client (so cl_rpcclient exists)
 * - this function is idempotent, it can be called for old or new clients
 */
static void nfs_local_probe(struct nfs_client *clp)
{
	/* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */
	if (!localio_enabled ||
	    clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) {
		nfs_localio_disable_client(clp);
		return;
	}

	if (nfs_client_is_local(clp))
		return;

	if (!nfs_uuid_begin(&clp->cl_uuid))
		return;
	if (nfs_server_uuid_is_local(clp))
		nfs_localio_enable_client(clp);
	nfs_uuid_end(&clp->cl_uuid);
}

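/*
 * Workqueue handler for nfs_local_probe_async(): take a reference on
 * the nfs_client so it cannot go away mid-probe, run the (idempotent)
 * probe, then drop the reference.
 */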
void nfs_local_probe_async_work(struct work_struct *work)
{
	struct nfs_client *clp =
		container_of(work, struct nfs_client, cl_local_probe_work);

	if (!refcount_inc_not_zero(&clp->cl_count))
		return;
	nfs_local_probe(clp);
	nfs_put_client(clp);
}

void nfs_local_probe_async(struct nfs_client *clp)
{
	queue_work(nfsiod_workqueue, &clp->cl_local_probe_work);
}
EXPORT_SYMBOL_GPL(nfs_local_probe_async);

static inline void nfs_local_file_put(struct nfsd_file *localio)
{
	/* nfs_to_nfsd_file_put_local() expects an __rcu pointer
	 * but we have a __kernel pointer.  It is always safe
	 * to cast a __kernel pointer to an __rcu pointer
	 * because the cast only weakens what is known about the pointer.
	 */
	struct nfsd_file __rcu *nf = (struct nfsd_file __rcu*) localio;

	nfs_to_nfsd_file_put_local(&nf);
}

/*
 * __nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
 *
 * Returns a pointer to a struct nfsd_file or ERR_PTR.
 * Caller must release returned nfsd_file with nfs_to_nfsd_file_put_local().
 */
static struct nfsd_file *
__nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
		    struct nfs_fh *fh, struct nfs_file_localio *nfl,
		    struct nfsd_file __rcu **pnf,
		    const fmode_t mode)
{
	int status = 0;
	struct nfsd_file *localio;

	localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient,
				    cred, fh, nfl, pnf, mode);
	if (IS_ERR(localio)) {
		status = PTR_ERR(localio);
		switch (status) {
		case -ENOMEM:
		case -ENXIO:
		case -ENOENT:
			/* Revalidate localio */
			nfs_localio_disable_client(clp);
			nfs_local_probe(clp);
		}
	}
	trace_nfs_local_open_fh(fh, mode, status);
	return localio;
}

/*
 * nfs_local_open_fh - open a local filehandle in terms of nfsd_file.
 * First check if the open nfsd_file is already cached, otherwise call
 * __nfs_local_open_fh and cache the resulting nfsd_file in nfs_file_localio.
 *
 * Returns a pointer to a struct nfsd_file or NULL.
 */
struct nfsd_file *
nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred,
		  struct nfs_fh *fh, struct nfs_file_localio *nfl,
		  const fmode_t mode)
{
	struct nfsd_file *nf, __rcu **pnf;

	if (!nfs_server_is_local(clp))
		return NULL;
	if (mode & ~(FMODE_READ | FMODE_WRITE))
		return NULL;

	if (mode & FMODE_WRITE)
		pnf = &nfl->rw_file;
	else
		pnf = &nfl->ro_file;

	nf = __nfs_local_open_fh(clp, cred, fh, nfl, pnf, mode);
	if (IS_ERR(nf))
		return NULL;
	return nf;
}
EXPORT_SYMBOL_GPL(nfs_local_open_fh);

static void
nfs_local_iocb_free(struct nfs_local_kiocb *iocb)
{
	kfree(iocb->bvec);
	kfree(iocb);
}

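/*
 * Allocate a zeroed nfs_local_kiocb plus a bio_vec array sized for every
 * page in the pgio header, and initialize the embedded kiocb against the
 * locally opened file.  Returns NULL on allocation failure.
 */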
static struct nfs_local_kiocb *
nfs_local_iocb_alloc(struct nfs_pgio_header *hdr,
		     struct file *file, gfp_t flags)
{
	struct nfs_local_kiocb *iocb;

	iocb = kzalloc(sizeof(*iocb), flags);
	if (iocb == NULL)
		return NULL;

	iocb->bvec = kmalloc_array(hdr->page_array.npages,
				   sizeof(struct bio_vec), flags);
	if (iocb->bvec == NULL) {
		kfree(iocb);
		return NULL;
	}

	init_sync_kiocb(&iocb->kiocb, file);

	iocb->hdr = hdr;
	iocb->kiocb.ki_flags &= ~IOCB_APPEND;
	iocb->aio_complete_work = NULL;

	iocb->end_iter_index = -1;

	return iocb;
}

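/*
 * Check whether this request can be issued (at least partially) using
 * direct I/O, given the alignment constraints advertised by the
 * underlying nfsd_file.  On success, split the request into a possibly
 * misaligned start, a DIO-aligned middle and a possibly misaligned end,
 * recorded in @local_dio.
 */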
static bool
nfs_is_local_dio_possible(struct nfs_local_kiocb *iocb, int rw,
			  size_t len, struct nfs_local_dio *local_dio)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	loff_t offset = hdr->args.offset;
	u32 nf_dio_mem_align, nf_dio_offset_align, nf_dio_read_offset_align;
	loff_t start_end, orig_end, middle_end;

	nfs_to->nfsd_file_dio_alignment(iocb->localio, &nf_dio_mem_align,
			&nf_dio_offset_align, &nf_dio_read_offset_align);
	if (rw == ITER_DEST)
		nf_dio_offset_align = nf_dio_read_offset_align;

	if (unlikely(!nf_dio_mem_align || !nf_dio_offset_align))
		return false;
	if (unlikely(nf_dio_offset_align > PAGE_SIZE))
		return false;
	if (unlikely(len < nf_dio_offset_align))
		return false;

	local_dio->mem_align = nf_dio_mem_align;
	local_dio->offset_align = nf_dio_offset_align;

	start_end = round_up(offset, nf_dio_offset_align);
	orig_end = offset + len;
	middle_end = round_down(orig_end, nf_dio_offset_align);

	local_dio->middle_offset = start_end;
	local_dio->end_offset = middle_end;

	local_dio->start_len = start_end - offset;
	local_dio->middle_len = middle_end - start_end;
	local_dio->end_len = orig_end - middle_end;

	if (rw == ITER_DEST)
		trace_nfs_local_dio_read(hdr->inode, offset, len, local_dio);
	else
		trace_nfs_local_dio_write(hdr->inode, offset, len, local_dio);
	return true;
}

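/*
 * Return true if a bvec-backed iov_iter satisfies the given DIO
 * constraints: the total count must satisfy @len_mask and each
 * segment's offset must satisfy @addr_mask.
 */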
static bool nfs_iov_iter_aligned_bvec(const struct iov_iter *i,
				unsigned int addr_mask, unsigned int len_mask)
{
	const struct bio_vec *bvec = i->bvec;
	size_t skip = i->iov_offset;
	size_t size = i->count;

	if (size & len_mask)
		return false;
	do {
		size_t len = bvec->bv_len;

		if (len > size)
			len = size;
		if ((unsigned long)(bvec->bv_offset + skip) & addr_mask)
			return false;
		bvec++;
		size -= len;
		skip = 0;
	} while (size);

	return true;
}

/*
 * Setup as many as 3 iov_iter based on extents described by @local_dio.
 * Returns the number of iov_iter that were setup.
 */
static int
nfs_local_iters_setup_dio(struct nfs_local_kiocb *iocb, int rw,
			  unsigned int nvecs, size_t len,
			  struct nfs_local_dio *local_dio)
{
	int n_iters = 0;
	struct iov_iter *iters = iocb->iters;

	/* Setup misaligned start? */
	if (local_dio->start_len) {
		iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
		iters[n_iters].count = local_dio->start_len;
		iocb->offset[n_iters] = iocb->hdr->args.offset;
		iocb->iter_is_dio_aligned[n_iters] = false;
		++n_iters;
	}

	/* Setup misaligned end?
	 * If so, the end is purposely setup to be issued using buffered IO
	 * before the middle (which will use DIO, if DIO-aligned, with AIO).
	 * This creates problems if/when the end results in a partial write.
	 * So must save index and length of end to handle this corner case.
	 */
	if (local_dio->end_len) {
		iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
		iocb->offset[n_iters] = local_dio->end_offset;
		iov_iter_advance(&iters[n_iters],
				 local_dio->start_len + local_dio->middle_len);
		iocb->iter_is_dio_aligned[n_iters] = false;
		/* Save index and length of end */
		iocb->end_iter_index = n_iters;
		iocb->end_len = local_dio->end_len;
		++n_iters;
	}

	/* Setup DIO-aligned middle to be issued last, to allow for
	 * DIO with AIO completion (see nfs_local_call_{read,write}).
	 */
	iov_iter_bvec(&iters[n_iters], rw, iocb->bvec, nvecs, len);
	if (local_dio->start_len)
		iov_iter_advance(&iters[n_iters], local_dio->start_len);
	iters[n_iters].count -= local_dio->end_len;
	iocb->offset[n_iters] = local_dio->middle_offset;

	iocb->iter_is_dio_aligned[n_iters] =
		nfs_iov_iter_aligned_bvec(&iters[n_iters],
			local_dio->mem_align-1, local_dio->offset_align-1);

	if (unlikely(!iocb->iter_is_dio_aligned[n_iters])) {
		trace_nfs_local_dio_misaligned(iocb->hdr->inode,
				iocb->hdr->args.offset, len, local_dio);
		return 0; /* no DIO-aligned IO possible */
	}
	++n_iters;

	iocb->n_iters = n_iters;
	return n_iters;
}

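/*
 * Build iocb->bvec from the pgio header's page vector and set up the
 * iov_iter(s) to issue: an O_DIRECT request that can be split into
 * DIO-aligned extents goes through nfs_local_iters_setup_dio(),
 * otherwise a single iov_iter is issued using buffered IO.
 */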
static noinline_for_stack void
nfs_local_iters_init(struct nfs_local_kiocb *iocb, int rw)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct page **pagevec = hdr->page_array.pagevec;
	unsigned long v, total;
	unsigned int base;
	size_t len;

	v = 0;
	total = hdr->args.count;
	base = hdr->args.pgbase;
	while (total && v < hdr->page_array.npages) {
		len = min_t(size_t, total, PAGE_SIZE - base);
		bvec_set_page(&iocb->bvec[v], *pagevec, len, base);
		total -= len;
		++pagevec;
		++v;
		base = 0;
	}
	len = hdr->args.count - total;

	if (test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) {
		struct nfs_local_dio local_dio;

		if (nfs_is_local_dio_possible(iocb, rw, len, &local_dio) &&
		    nfs_local_iters_setup_dio(iocb, rw, v, len, &local_dio) != 0)
			return; /* is DIO-aligned */
	}

	/* Use buffered IO */
	iocb->offset[0] = hdr->args.offset;
	iov_iter_bvec(&iocb->iters[0], rw, iocb->bvec, v, len);
	iocb->n_iters = 1;
}

static void
nfs_local_hdr_release(struct nfs_pgio_header *hdr,
		const struct rpc_call_ops *call_ops)
{
	call_ops->rpc_call_done(&hdr->task, hdr);
	call_ops->rpc_release(hdr);
}

static void
nfs_local_pgio_init(struct nfs_pgio_header *hdr,
		const struct rpc_call_ops *call_ops)
{
	hdr->task.tk_ops = call_ops;
	if (!hdr->task.tk_start)
		hdr->task.tk_start = ktime_get();
}

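/*
 * Fold the result of one iov_iter's worth of I/O into the pgio header:
 * a non-negative status adds to res.count, a negative status records
 * the errno in the task and the corresponding NFSv4 status in res.
 */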
static void
nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status)
{
	/* Must handle partial completions */
	if (status >= 0) {
		hdr->res.count += status;
		/* @hdr was initialized to 0 (zeroed during allocation) */
		if (hdr->task.tk_status == 0)
			hdr->res.op_status = NFS4_OK;
	} else {
		hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
		hdr->task.tk_status = status;
	}
}

static void
nfs_local_iocb_release(struct nfs_local_kiocb *iocb)
{
	nfs_local_file_put(iocb->localio);
	nfs_local_iocb_free(iocb);
}

static void
nfs_local_pgio_release(struct nfs_local_kiocb *iocb)
{
	struct nfs_pgio_header *hdr = iocb->hdr;

	nfs_local_iocb_release(iocb);
	nfs_local_hdr_release(hdr, hdr->task.tk_ops);
}

/*
 * Complete the I/O from iocb->kiocb.ki_complete()
 *
 * Note that this function can be called from a bottom half context,
 * hence we need to queue the rpc_call_done() etc to a workqueue
 */
static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb)
{
	INIT_WORK(&iocb->work, iocb->aio_complete_work);
	queue_work(nfsiod_workqueue, &iocb->work);
}

static void
nfs_local_read_done(struct nfs_local_kiocb *iocb, long status)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct file *filp = iocb->kiocb.ki_filp;

	if ((iocb->kiocb.ki_flags & IOCB_DIRECT) && status == -EINVAL) {
		/* Underlying FS will return -EINVAL if misaligned DIO is attempted. */
		pr_info_ratelimited("nfs: Unexpected direct I/O read alignment failure\n");
	}

	/*
	 * Must clear replen otherwise NFSv3 data corruption will occur
	 * if/when switching from LOCALIO back to using normal RPC.
	 */
	hdr->res.replen = 0;

	if (hdr->res.count != hdr->args.count ||
	    hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp)))
		hdr->res.eof = true;

	dprintk("%s: read %ld bytes eof %d.\n", __func__,
			status > 0 ? status : 0, hdr->res.eof);
}

static void nfs_local_read_aio_complete_work(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);

	nfs_local_pgio_release(iocb);
}

static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret)
{
	struct nfs_local_kiocb *iocb =
		container_of(kiocb, struct nfs_local_kiocb, kiocb);

	nfs_local_pgio_done(iocb->hdr, ret);
	nfs_local_read_done(iocb, ret);
	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */
}

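/*
 * Issue the read(s) from workqueue context with the file's credentials.
 * Buffered iov_iters complete synchronously here; the DIO-aligned middle
 * (always issued last) may instead return -EIOCBQUEUED and complete
 * asynchronously via nfs_local_read_aio_complete(), in which case
 * release of the iocb is deferred to the AIO completion work.
 */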
static void nfs_local_call_read(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);
	struct file *filp = iocb->kiocb.ki_filp;
	const struct cred *save_cred;
	ssize_t status;

	save_cred = override_creds(filp->f_cred);

	for (int i = 0; i < iocb->n_iters ; i++) {
		if (iocb->iter_is_dio_aligned[i]) {
			iocb->kiocb.ki_flags |= IOCB_DIRECT;
			iocb->kiocb.ki_complete = nfs_local_read_aio_complete;
			iocb->aio_complete_work = nfs_local_read_aio_complete_work;
		}

		iocb->kiocb.ki_pos = iocb->offset[i];
		status = filp->f_op->read_iter(&iocb->kiocb, &iocb->iters[i]);
		if (status != -EIOCBQUEUED) {
			nfs_local_pgio_done(iocb->hdr, status);
			if (iocb->hdr->task.tk_status)
				break;
		}
	}

	revert_creds(save_cred);

	if (status != -EIOCBQUEUED) {
		nfs_local_read_done(iocb, status);
		nfs_local_pgio_release(iocb);
	}
}

static int
nfs_local_do_read(struct nfs_local_kiocb *iocb,
		  const struct rpc_call_ops *call_ops)
{
	struct nfs_pgio_header *hdr = iocb->hdr;

	dprintk("%s: vfs_read count=%u pos=%llu\n",
		__func__, hdr->args.count, hdr->args.offset);

	nfs_local_pgio_init(hdr, call_ops);
	hdr->res.eof = false;

	INIT_WORK(&iocb->work, nfs_local_call_read);
	queue_work(nfslocaliod_workqueue, &iocb->work);

	return 0;
}

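/*
 * The LOCALIO write verifier is derived from cl_nfssvc_boot, copied
 * under the cl_boot_lock seqlock so a concurrent
 * nfs_reset_boot_verifier() is never observed half-way through.
 */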
static void
nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode)
{
	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;
	u32 *verf = (u32 *)verifier->data;
	unsigned int seq;

	do {
		seq = read_seqbegin(&clp->cl_boot_lock);
		verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec;
		verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec;
	} while (read_seqretry(&clp->cl_boot_lock, seq));
}

static void
nfs_reset_boot_verifier(struct inode *inode)
{
	struct nfs_client *clp = NFS_SERVER(inode)->nfs_client;

	write_seqlock(&clp->cl_boot_lock);
	ktime_get_real_ts64(&clp->cl_nfssvc_boot);
	write_sequnlock(&clp->cl_boot_lock);
}

static void
nfs_set_local_verifier(struct inode *inode,
		struct nfs_writeverf *verf,
		enum nfs3_stable_how how)
{
	nfs_copy_boot_verifier(&verf->verifier, inode);
	verf->committed = how;
}

/* Factored out from fs/nfsd/vfs.h:fh_getattr() */
static int __vfs_getattr(const struct path *p, struct kstat *stat, int version)
{
	u32 request_mask = STATX_BASIC_STATS;

	if (version == 4)
		request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE);
	return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT);
}

/* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */
static u64 __nfsd4_change_attribute(const struct kstat *stat,
				    const struct inode *inode)
{
	u64 chattr;

	if (stat->result_mask & STATX_CHANGE_COOKIE) {
		chattr = stat->change_cookie;
		if (S_ISREG(inode->i_mode) &&
		    !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) {
			chattr += (u64)stat->ctime.tv_sec << 30;
			chattr += stat->ctime.tv_nsec;
		}
	} else {
		chattr = time_to_chattr(&stat->ctime);
	}
	return chattr;
}

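/*
 * Refresh the pgio header's fattr directly from the local file via
 * vfs_getattr(), filling in the subset of attributes a server reply
 * would normally carry.
 */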
static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb)
{
	struct kstat stat;
	struct file *filp = iocb->kiocb.ki_filp;
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct nfs_fattr *fattr = hdr->res.fattr;
	int version = NFS_PROTO(hdr->inode)->version;

	if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version))
		return;

	fattr->valid = (NFS_ATTR_FATTR_FILEID |
			NFS_ATTR_FATTR_CHANGE |
			NFS_ATTR_FATTR_SIZE |
			NFS_ATTR_FATTR_ATIME |
			NFS_ATTR_FATTR_MTIME |
			NFS_ATTR_FATTR_CTIME |
			NFS_ATTR_FATTR_SPACE_USED);

	fattr->fileid = stat.ino;
	fattr->size = stat.size;
	fattr->atime = stat.atime;
	fattr->mtime = stat.mtime;
	fattr->ctime = stat.ctime;
	if (version == 4) {
		fattr->change_attr =
			__nfsd4_change_attribute(&stat, file_inode(filp));
	} else
		fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime);
	fattr->du.nfs3.used = stat.blocks << 9;
}

static void
nfs_local_write_done(struct nfs_local_kiocb *iocb, long status)
{
	struct nfs_pgio_header *hdr = iocb->hdr;
	struct inode *inode = hdr->inode;

	dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0);

	if ((iocb->kiocb.ki_flags & IOCB_DIRECT) && status == -EINVAL) {
		/* Underlying FS will return -EINVAL if misaligned DIO is attempted. */
		pr_info_ratelimited("nfs: Unexpected direct I/O write alignment failure\n");
	}

	/* Handle short writes as if they are ENOSPC */
	status = hdr->res.count;
	if (status > 0 && status < hdr->args.count) {
		hdr->mds_offset += status;
		hdr->args.offset += status;
		hdr->args.pgbase += status;
		hdr->args.count -= status;
		nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset);
		status = -ENOSPC;
		/* record the -ENOSPC via nfs_local_pgio_done() */
		nfs_local_pgio_done(hdr, status);
	}
	if (hdr->task.tk_status < 0)
		nfs_reset_boot_verifier(inode);
}

static void nfs_local_write_aio_complete_work(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);

	nfs_local_vfs_getattr(iocb);
	nfs_local_pgio_release(iocb);
}

static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret)
{
	struct nfs_local_kiocb *iocb =
		container_of(kiocb, struct nfs_local_kiocb, kiocb);

	nfs_local_pgio_done(iocb->hdr, ret);
	nfs_local_write_done(iocb, ret);
	nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */
}

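/*
 * Issue the write(s) from workqueue context with the file's credentials
 * and with PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO set for the duration.
 * Buffered iov_iters complete synchronously; the DIO-aligned middle
 * (always issued last) may complete asynchronously via
 * nfs_local_write_aio_complete().  A DIO write that fails with -ENOTBLK
 * is retried using buffered IO.
 */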
static void nfs_local_call_write(struct work_struct *work)
{
	struct nfs_local_kiocb *iocb =
		container_of(work, struct nfs_local_kiocb, work);
	struct file *filp = iocb->kiocb.ki_filp;
	unsigned long old_flags = current->flags;
	const struct cred *save_cred;
	ssize_t status;

	current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO;
	save_cred = override_creds(filp->f_cred);

	file_start_write(filp);
	for (int i = 0; i < iocb->n_iters ; i++) {
		if (iocb->iter_is_dio_aligned[i]) {
			iocb->kiocb.ki_flags |= IOCB_DIRECT;
			iocb->kiocb.ki_complete = nfs_local_write_aio_complete;
			iocb->aio_complete_work = nfs_local_write_aio_complete_work;
		}
retry:
		iocb->kiocb.ki_pos = iocb->offset[i];
		status = filp->f_op->write_iter(&iocb->kiocb, &iocb->iters[i]);
		if (status != -EIOCBQUEUED) {
			if (unlikely(status >= 0 && status < iocb->iters[i].count)) {
				/* partial write */
				if (i == iocb->end_iter_index) {
					/* Must not account partial end, otherwise, due
					 * to end being issued before middle: the partial
					 * write accounting in nfs_local_write_done()
					 * would incorrectly advance hdr->args.offset
					 */
					status = 0;
				} else {
					/* Partial write at start or buffered middle,
					 * exit early.
					 */
					nfs_local_pgio_done(iocb->hdr, status);
					break;
				}
			} else if (unlikely(status == -ENOTBLK &&
					    (iocb->kiocb.ki_flags & IOCB_DIRECT))) {
				/* VFS will return -ENOTBLK if DIO WRITE fails to
				 * invalidate the page cache.  Retry using buffered IO.
				 */
				iocb->kiocb.ki_flags &= ~IOCB_DIRECT;
				iocb->kiocb.ki_complete = NULL;
				iocb->aio_complete_work = NULL;
				goto retry;
			}
			nfs_local_pgio_done(iocb->hdr, status);
			if (iocb->hdr->task.tk_status)
				break;
		}
	}
	file_end_write(filp);

	revert_creds(save_cred);
	current->flags = old_flags;

	if (status != -EIOCBQUEUED) {
		nfs_local_write_done(iocb, status);
		nfs_local_vfs_getattr(iocb);
		nfs_local_pgio_release(iocb);
	}
}

static int
nfs_local_do_write(struct nfs_local_kiocb *iocb,
		const struct rpc_call_ops *call_ops)
{
	struct nfs_pgio_header *hdr = iocb->hdr;

	dprintk("%s: vfs_write count=%u pos=%llu %s\n",
		__func__, hdr->args.count, hdr->args.offset,
		(hdr->args.stable == NFS_UNSTABLE) ? "unstable" : "stable");

	switch (hdr->args.stable) {
	default:
		break;
	case NFS_DATA_SYNC:
		iocb->kiocb.ki_flags |= IOCB_DSYNC;
		break;
	case NFS_FILE_SYNC:
		iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC;
	}

	nfs_local_pgio_init(hdr, call_ops);

	nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable);

	INIT_WORK(&iocb->work, nfs_local_call_write);
	queue_work(nfslocaliod_workqueue, &iocb->work);

	return 0;
}

static struct nfs_local_kiocb *
nfs_local_iocb_init(struct nfs_pgio_header *hdr, struct nfsd_file *localio)
{
	struct file *file = nfs_to->nfsd_file_file(localio);
	struct nfs_local_kiocb *iocb;
	gfp_t gfp_mask;
	int rw;

	if (hdr->rw_mode & FMODE_READ) {
		if (!file->f_op->read_iter)
			return ERR_PTR(-EOPNOTSUPP);
		gfp_mask = GFP_KERNEL;
		rw = ITER_DEST;
	} else {
		if (!file->f_op->write_iter)
			return ERR_PTR(-EOPNOTSUPP);
		gfp_mask = GFP_NOIO;
		rw = ITER_SOURCE;
	}

	iocb = nfs_local_iocb_alloc(hdr, file, gfp_mask);
	if (iocb == NULL)
		return ERR_PTR(-ENOMEM);
	iocb->hdr = hdr;
	iocb->localio = localio;

	nfs_local_iters_init(iocb, rw);

	return iocb;
}

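/*
 * nfs_local_doio - issue an NFS READ or WRITE through the local
 * nfsd_file instead of over the wire.  On a setup error the iocb is
 * released, the error is recorded in the pgio header and the normal
 * rpc_call_ops completion path runs; -EAGAIN additionally disables
 * LOCALIO for this client.
 */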
int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio,
		   struct nfs_pgio_header *hdr,
		   const struct rpc_call_ops *call_ops)
{
	struct nfs_local_kiocb *iocb;
	int status = 0;

	if (!hdr->args.count)
		return 0;

	iocb = nfs_local_iocb_init(hdr, localio);
	if (IS_ERR(iocb))
		return PTR_ERR(iocb);

	switch (hdr->rw_mode) {
	case FMODE_READ:
		status = nfs_local_do_read(iocb, call_ops);
		break;
	case FMODE_WRITE:
		status = nfs_local_do_write(iocb, call_ops);
		break;
	default:
		dprintk("%s: invalid mode: %d\n", __func__,
			hdr->rw_mode);
		status = -EOPNOTSUPP;
	}

	if (status != 0) {
		if (status == -EAGAIN)
			nfs_localio_disable_client(clp);
		nfs_local_iocb_release(iocb);
		hdr->task.tk_status = status;
		nfs_local_hdr_release(hdr, call_ops);
	}
	return status;
}

static void
nfs_local_init_commit(struct nfs_commit_data *data,
		const struct rpc_call_ops *call_ops)
{
	data->task.tk_ops = call_ops;
}

static int
nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data)
{
	loff_t start = data->args.offset;
	loff_t end = LLONG_MAX;

	if (data->args.count > 0) {
		end = start + data->args.count - 1;
		if (end < start)
			end = LLONG_MAX;
	}

	dprintk("%s: commit %llu - %llu\n", __func__, start, end);
	return vfs_fsync_range(filp, start, end, 0);
}

static void
nfs_local_commit_done(struct nfs_commit_data *data, int status)
{
	if (status >= 0) {
		nfs_set_local_verifier(data->inode,
				data->res.verf,
				NFS_FILE_SYNC);
		data->res.op_status = NFS4_OK;
		data->task.tk_status = 0;
	} else {
		nfs_reset_boot_verifier(data->inode);
		data->res.op_status = nfs_localio_errno_to_nfs4_stat(status);
		data->task.tk_status = status;
	}
}

static void
nfs_local_release_commit_data(struct nfsd_file *localio,
			      struct nfs_commit_data *data,
			      const struct rpc_call_ops *call_ops)
{
	nfs_local_file_put(localio);
	call_ops->rpc_call_done(&data->task, data);
	call_ops->rpc_release(data);
}

static void
nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx)
{
	nfs_local_release_commit_data(ctx->localio, ctx->data,
				      ctx->data->task.tk_ops);
	kfree(ctx);
}

static void
nfs_local_fsync_work(struct work_struct *work)
{
	struct nfs_local_fsync_ctx *ctx;
	int status;

	ctx = container_of(work, struct nfs_local_fsync_ctx, work);

	status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio),
				      ctx->data);
	nfs_local_commit_done(ctx->data, status);
	if (ctx->done != NULL)
		complete(ctx->done);
	nfs_local_fsync_ctx_free(ctx);
}

static struct nfs_local_fsync_ctx *
nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data,
			  struct nfsd_file *localio, gfp_t flags)
{
	struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags);

	if (ctx != NULL) {
		ctx->localio = localio;
		ctx->data = data;
		INIT_WORK(&ctx->work, nfs_local_fsync_work);
		ctx->done = NULL;
	}
	return ctx;
}

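/*
 * nfs_local_commit - handle an NFS COMMIT by running vfs_fsync_range()
 * on the local file from a workqueue.  FLUSH_SYNC callers wait on an
 * on-stack completion; otherwise the commit completes asynchronously
 * through the rpc_call_ops once the work item finishes.
 */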
int nfs_local_commit(struct nfsd_file *localio,
		     struct nfs_commit_data *data,
		     const struct rpc_call_ops *call_ops, int how)
{
	struct nfs_local_fsync_ctx *ctx;

	ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL);
	if (!ctx) {
		nfs_local_commit_done(data, -ENOMEM);
		nfs_local_release_commit_data(localio, data, call_ops);
		return -ENOMEM;
	}

	nfs_local_init_commit(data, call_ops);

	if (how & FLUSH_SYNC) {
		DECLARE_COMPLETION_ONSTACK(done);
		ctx->done = &done;
		queue_work(nfsiod_workqueue, &ctx->work);
		wait_for_completion(&done);
	} else
		queue_work(nfsiod_workqueue, &ctx->work);

	return 0;
}