1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * NFS client support for local clients to bypass network stack 4 * 5 * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com> 6 * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com> 7 * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com> 8 * Copyright (C) 2024 NeilBrown <neilb@suse.de> 9 */ 10 11 #include <linux/module.h> 12 #include <linux/errno.h> 13 #include <linux/vfs.h> 14 #include <linux/file.h> 15 #include <linux/inet.h> 16 #include <linux/sunrpc/addr.h> 17 #include <linux/inetdevice.h> 18 #include <net/addrconf.h> 19 #include <linux/nfs_common.h> 20 #include <linux/nfslocalio.h> 21 #include <linux/bvec.h> 22 23 #include <linux/nfs.h> 24 #include <linux/nfs_fs.h> 25 #include <linux/nfs_xdr.h> 26 27 #include "internal.h" 28 #include "pnfs.h" 29 #include "nfstrace.h" 30 31 #define NFSDBG_FACILITY NFSDBG_VFS 32 33 struct nfs_local_kiocb { 34 struct kiocb kiocb; 35 struct bio_vec *bvec; 36 struct nfs_pgio_header *hdr; 37 struct work_struct work; 38 void (*aio_complete_work)(struct work_struct *); 39 struct nfsd_file *localio; 40 }; 41 42 struct nfs_local_fsync_ctx { 43 struct nfsd_file *localio; 44 struct nfs_commit_data *data; 45 struct work_struct work; 46 struct completion *done; 47 }; 48 49 static bool localio_enabled __read_mostly = true; 50 module_param(localio_enabled, bool, 0644); 51 52 static bool localio_O_DIRECT_semantics __read_mostly = false; 53 module_param(localio_O_DIRECT_semantics, bool, 0644); 54 MODULE_PARM_DESC(localio_O_DIRECT_semantics, 55 "LOCALIO will use O_DIRECT semantics to filesystem."); 56 57 static inline bool nfs_client_is_local(const struct nfs_client *clp) 58 { 59 return !!rcu_access_pointer(clp->cl_uuid.net); 60 } 61 62 bool nfs_server_is_local(const struct nfs_client *clp) 63 { 64 return nfs_client_is_local(clp) && localio_enabled; 65 } 66 EXPORT_SYMBOL_GPL(nfs_server_is_local); 67 68 /* 69 * UUID_IS_LOCAL XDR functions 70 */ 71 72 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req, 73 struct xdr_stream *xdr, 74 const void *data) 75 { 76 const u8 *uuid = data; 77 78 encode_opaque_fixed(xdr, uuid, UUID_SIZE); 79 } 80 81 static int localio_xdr_dec_uuidres(struct rpc_rqst *req, 82 struct xdr_stream *xdr, 83 void *result) 84 { 85 /* void return */ 86 return 0; 87 } 88 89 static const struct rpc_procinfo nfs_localio_procedures[] = { 90 [LOCALIOPROC_UUID_IS_LOCAL] = { 91 .p_proc = LOCALIOPROC_UUID_IS_LOCAL, 92 .p_encode = localio_xdr_enc_uuidargs, 93 .p_decode = localio_xdr_dec_uuidres, 94 .p_arglen = XDR_QUADLEN(UUID_SIZE), 95 .p_replen = 0, 96 .p_statidx = LOCALIOPROC_UUID_IS_LOCAL, 97 .p_name = "UUID_IS_LOCAL", 98 }, 99 }; 100 101 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)]; 102 static const struct rpc_version nfslocalio_version1 = { 103 .number = 1, 104 .nrprocs = ARRAY_SIZE(nfs_localio_procedures), 105 .procs = nfs_localio_procedures, 106 .counts = nfs_localio_counts, 107 }; 108 109 static const struct rpc_version *nfslocalio_version[] = { 110 [1] = &nfslocalio_version1, 111 }; 112 113 extern const struct rpc_program nfslocalio_program; 114 static struct rpc_stat nfslocalio_rpcstat = { &nfslocalio_program }; 115 116 const struct rpc_program nfslocalio_program = { 117 .name = "nfslocalio", 118 .number = NFS_LOCALIO_PROGRAM, 119 .nrvers = ARRAY_SIZE(nfslocalio_version), 120 .version = nfslocalio_version, 121 .stats = &nfslocalio_rpcstat, 122 }; 123 124 /* 125 * nfs_init_localioclient - Initialise an NFS localio client connection 126 */ 127 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp) 128 { 129 struct rpc_clnt *rpcclient_localio; 130 131 rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient, 132 &nfslocalio_program, 1); 133 134 dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n", 135 __func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR), 136 (IS_ERR(rpcclient_localio) ? "does not support" : "supports")); 137 138 return rpcclient_localio; 139 } 140 141 static bool nfs_server_uuid_is_local(struct nfs_client *clp) 142 { 143 u8 uuid[UUID_SIZE]; 144 struct rpc_message msg = { 145 .rpc_argp = &uuid, 146 }; 147 struct rpc_clnt *rpcclient_localio; 148 int status; 149 150 rpcclient_localio = nfs_init_localioclient(clp); 151 if (IS_ERR(rpcclient_localio)) 152 return false; 153 154 export_uuid(uuid, &clp->cl_uuid.uuid); 155 156 msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL]; 157 status = rpc_call_sync(rpcclient_localio, &msg, 0); 158 dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n", 159 __func__, status); 160 rpc_shutdown_client(rpcclient_localio); 161 162 /* Server is only local if it initialized required struct members */ 163 if (status || !rcu_access_pointer(clp->cl_uuid.net) || !clp->cl_uuid.dom) 164 return false; 165 166 return true; 167 } 168 169 /* 170 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client 171 * - called after alloc_client and init_client (so cl_rpcclient exists) 172 * - this function is idempotent, it can be called for old or new clients 173 */ 174 void nfs_local_probe(struct nfs_client *clp) 175 { 176 /* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */ 177 if (!localio_enabled || 178 clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) { 179 nfs_localio_disable_client(clp); 180 return; 181 } 182 183 if (nfs_client_is_local(clp)) { 184 /* If already enabled, disable and re-enable */ 185 nfs_localio_disable_client(clp); 186 } 187 188 if (!nfs_uuid_begin(&clp->cl_uuid)) 189 return; 190 if (nfs_server_uuid_is_local(clp)) 191 nfs_localio_enable_client(clp); 192 nfs_uuid_end(&clp->cl_uuid); 193 } 194 EXPORT_SYMBOL_GPL(nfs_local_probe); 195 196 void nfs_local_probe_async_work(struct work_struct *work) 197 { 198 struct nfs_client *clp = 199 container_of(work, struct nfs_client, cl_local_probe_work); 200 201 nfs_local_probe(clp); 202 } 203 204 void nfs_local_probe_async(struct nfs_client *clp) 205 { 206 queue_work(nfsiod_workqueue, &clp->cl_local_probe_work); 207 } 208 EXPORT_SYMBOL_GPL(nfs_local_probe_async); 209 210 static inline struct nfsd_file *nfs_local_file_get(struct nfsd_file *nf) 211 { 212 return nfs_to->nfsd_file_get(nf); 213 } 214 215 static inline void nfs_local_file_put(struct nfsd_file *nf) 216 { 217 nfs_to->nfsd_file_put(nf); 218 } 219 220 /* 221 * __nfs_local_open_fh - open a local filehandle in terms of nfsd_file. 222 * 223 * Returns a pointer to a struct nfsd_file or ERR_PTR. 224 * Caller must release returned nfsd_file with nfs_to_nfsd_file_put_local(). 225 */ 226 static struct nfsd_file * 227 __nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred, 228 struct nfs_fh *fh, struct nfs_file_localio *nfl, 229 const fmode_t mode) 230 { 231 struct nfsd_file *localio; 232 233 localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient, 234 cred, fh, nfl, mode); 235 if (IS_ERR(localio)) { 236 int status = PTR_ERR(localio); 237 trace_nfs_local_open_fh(fh, mode, status); 238 switch (status) { 239 case -ENOMEM: 240 case -ENXIO: 241 case -ENOENT: 242 /* Revalidate localio, will disable if unsupported */ 243 nfs_local_probe(clp); 244 } 245 } 246 return localio; 247 } 248 249 /* 250 * nfs_local_open_fh - open a local filehandle in terms of nfsd_file. 251 * First checking if the open nfsd_file is already cached, otherwise 252 * must __nfs_local_open_fh and insert the nfsd_file in nfs_file_localio. 253 * 254 * Returns a pointer to a struct nfsd_file or NULL. 255 */ 256 struct nfsd_file * 257 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred, 258 struct nfs_fh *fh, struct nfs_file_localio *nfl, 259 const fmode_t mode) 260 { 261 struct nfsd_file *nf, *new, __rcu **pnf; 262 263 if (!nfs_server_is_local(clp)) 264 return NULL; 265 if (mode & ~(FMODE_READ | FMODE_WRITE)) 266 return NULL; 267 268 if (mode & FMODE_WRITE) 269 pnf = &nfl->rw_file; 270 else 271 pnf = &nfl->ro_file; 272 273 new = NULL; 274 rcu_read_lock(); 275 nf = rcu_dereference(*pnf); 276 if (!nf) { 277 rcu_read_unlock(); 278 new = __nfs_local_open_fh(clp, cred, fh, nfl, mode); 279 if (IS_ERR(new)) 280 return NULL; 281 /* try to swap in the pointer */ 282 spin_lock(&clp->cl_uuid.lock); 283 nf = rcu_dereference_protected(*pnf, 1); 284 if (!nf) { 285 nf = new; 286 new = NULL; 287 rcu_assign_pointer(*pnf, nf); 288 } 289 spin_unlock(&clp->cl_uuid.lock); 290 rcu_read_lock(); 291 } 292 nf = nfs_local_file_get(nf); 293 rcu_read_unlock(); 294 if (new) 295 nfs_to_nfsd_file_put_local(new); 296 return nf; 297 } 298 EXPORT_SYMBOL_GPL(nfs_local_open_fh); 299 300 static struct bio_vec * 301 nfs_bvec_alloc_and_import_pagevec(struct page **pagevec, 302 unsigned int npages, gfp_t flags) 303 { 304 struct bio_vec *bvec, *p; 305 306 bvec = kmalloc_array(npages, sizeof(*bvec), flags); 307 if (bvec != NULL) { 308 for (p = bvec; npages > 0; p++, pagevec++, npages--) { 309 p->bv_page = *pagevec; 310 p->bv_len = PAGE_SIZE; 311 p->bv_offset = 0; 312 } 313 } 314 return bvec; 315 } 316 317 static void 318 nfs_local_iocb_free(struct nfs_local_kiocb *iocb) 319 { 320 kfree(iocb->bvec); 321 kfree(iocb); 322 } 323 324 static struct nfs_local_kiocb * 325 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr, 326 struct file *file, gfp_t flags) 327 { 328 struct nfs_local_kiocb *iocb; 329 330 iocb = kmalloc(sizeof(*iocb), flags); 331 if (iocb == NULL) 332 return NULL; 333 iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec, 334 hdr->page_array.npages, flags); 335 if (iocb->bvec == NULL) { 336 kfree(iocb); 337 return NULL; 338 } 339 340 if (localio_O_DIRECT_semantics && 341 test_bit(NFS_IOHDR_ODIRECT, &hdr->flags)) { 342 iocb->kiocb.ki_filp = file; 343 iocb->kiocb.ki_flags = IOCB_DIRECT; 344 } else 345 init_sync_kiocb(&iocb->kiocb, file); 346 347 iocb->kiocb.ki_pos = hdr->args.offset; 348 iocb->hdr = hdr; 349 iocb->kiocb.ki_flags &= ~IOCB_APPEND; 350 iocb->aio_complete_work = NULL; 351 352 return iocb; 353 } 354 355 static void 356 nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir) 357 { 358 struct nfs_pgio_header *hdr = iocb->hdr; 359 360 iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages, 361 hdr->args.count + hdr->args.pgbase); 362 if (hdr->args.pgbase != 0) 363 iov_iter_advance(i, hdr->args.pgbase); 364 } 365 366 static void 367 nfs_local_hdr_release(struct nfs_pgio_header *hdr, 368 const struct rpc_call_ops *call_ops) 369 { 370 call_ops->rpc_call_done(&hdr->task, hdr); 371 call_ops->rpc_release(hdr); 372 } 373 374 static void 375 nfs_local_pgio_init(struct nfs_pgio_header *hdr, 376 const struct rpc_call_ops *call_ops) 377 { 378 hdr->task.tk_ops = call_ops; 379 if (!hdr->task.tk_start) 380 hdr->task.tk_start = ktime_get(); 381 } 382 383 static void 384 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status) 385 { 386 if (status >= 0) { 387 hdr->res.count = status; 388 hdr->res.op_status = NFS4_OK; 389 hdr->task.tk_status = 0; 390 } else { 391 hdr->res.op_status = nfs_localio_errno_to_nfs4_stat(status); 392 hdr->task.tk_status = status; 393 } 394 } 395 396 static void 397 nfs_local_pgio_release(struct nfs_local_kiocb *iocb) 398 { 399 struct nfs_pgio_header *hdr = iocb->hdr; 400 401 nfs_local_file_put(iocb->localio); 402 nfs_local_iocb_free(iocb); 403 nfs_local_hdr_release(hdr, hdr->task.tk_ops); 404 } 405 406 /* 407 * Complete the I/O from iocb->kiocb.ki_complete() 408 * 409 * Note that this function can be called from a bottom half context, 410 * hence we need to queue the rpc_call_done() etc to a workqueue 411 */ 412 static inline void nfs_local_pgio_aio_complete(struct nfs_local_kiocb *iocb) 413 { 414 INIT_WORK(&iocb->work, iocb->aio_complete_work); 415 queue_work(nfsiod_workqueue, &iocb->work); 416 } 417 418 static void 419 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status) 420 { 421 struct nfs_pgio_header *hdr = iocb->hdr; 422 struct file *filp = iocb->kiocb.ki_filp; 423 424 nfs_local_pgio_done(hdr, status); 425 426 /* 427 * Must clear replen otherwise NFSv3 data corruption will occur 428 * if/when switching from LOCALIO back to using normal RPC. 429 */ 430 hdr->res.replen = 0; 431 432 if (hdr->res.count != hdr->args.count || 433 hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp))) 434 hdr->res.eof = true; 435 436 dprintk("%s: read %ld bytes eof %d.\n", __func__, 437 status > 0 ? status : 0, hdr->res.eof); 438 } 439 440 static void nfs_local_read_aio_complete_work(struct work_struct *work) 441 { 442 struct nfs_local_kiocb *iocb = 443 container_of(work, struct nfs_local_kiocb, work); 444 445 nfs_local_pgio_release(iocb); 446 } 447 448 static void nfs_local_read_aio_complete(struct kiocb *kiocb, long ret) 449 { 450 struct nfs_local_kiocb *iocb = 451 container_of(kiocb, struct nfs_local_kiocb, kiocb); 452 453 nfs_local_read_done(iocb, ret); 454 nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_read_aio_complete_work */ 455 } 456 457 static void nfs_local_call_read(struct work_struct *work) 458 { 459 struct nfs_local_kiocb *iocb = 460 container_of(work, struct nfs_local_kiocb, work); 461 struct file *filp = iocb->kiocb.ki_filp; 462 const struct cred *save_cred; 463 struct iov_iter iter; 464 ssize_t status; 465 466 save_cred = override_creds(filp->f_cred); 467 468 nfs_local_iter_init(&iter, iocb, READ); 469 470 status = filp->f_op->read_iter(&iocb->kiocb, &iter); 471 if (status != -EIOCBQUEUED) { 472 nfs_local_read_done(iocb, status); 473 nfs_local_pgio_release(iocb); 474 } 475 476 revert_creds(save_cred); 477 } 478 479 static int 480 nfs_do_local_read(struct nfs_pgio_header *hdr, 481 struct nfsd_file *localio, 482 const struct rpc_call_ops *call_ops) 483 { 484 struct nfs_local_kiocb *iocb; 485 struct file *file = nfs_to->nfsd_file_file(localio); 486 487 /* Don't support filesystems without read_iter */ 488 if (!file->f_op->read_iter) 489 return -EAGAIN; 490 491 dprintk("%s: vfs_read count=%u pos=%llu\n", 492 __func__, hdr->args.count, hdr->args.offset); 493 494 iocb = nfs_local_iocb_alloc(hdr, file, GFP_KERNEL); 495 if (iocb == NULL) 496 return -ENOMEM; 497 iocb->localio = localio; 498 499 nfs_local_pgio_init(hdr, call_ops); 500 hdr->res.eof = false; 501 502 if (iocb->kiocb.ki_flags & IOCB_DIRECT) { 503 iocb->kiocb.ki_complete = nfs_local_read_aio_complete; 504 iocb->aio_complete_work = nfs_local_read_aio_complete_work; 505 } 506 507 INIT_WORK(&iocb->work, nfs_local_call_read); 508 queue_work(nfslocaliod_workqueue, &iocb->work); 509 510 return 0; 511 } 512 513 static void 514 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode) 515 { 516 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client; 517 u32 *verf = (u32 *)verifier->data; 518 int seq = 0; 519 520 do { 521 read_seqbegin_or_lock(&clp->cl_boot_lock, &seq); 522 verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec; 523 verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec; 524 } while (need_seqretry(&clp->cl_boot_lock, seq)); 525 done_seqretry(&clp->cl_boot_lock, seq); 526 } 527 528 static void 529 nfs_reset_boot_verifier(struct inode *inode) 530 { 531 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client; 532 533 write_seqlock(&clp->cl_boot_lock); 534 ktime_get_real_ts64(&clp->cl_nfssvc_boot); 535 write_sequnlock(&clp->cl_boot_lock); 536 } 537 538 static void 539 nfs_set_local_verifier(struct inode *inode, 540 struct nfs_writeverf *verf, 541 enum nfs3_stable_how how) 542 { 543 nfs_copy_boot_verifier(&verf->verifier, inode); 544 verf->committed = how; 545 } 546 547 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */ 548 static int __vfs_getattr(struct path *p, struct kstat *stat, int version) 549 { 550 u32 request_mask = STATX_BASIC_STATS; 551 552 if (version == 4) 553 request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE); 554 return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT); 555 } 556 557 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */ 558 static u64 __nfsd4_change_attribute(const struct kstat *stat, 559 const struct inode *inode) 560 { 561 u64 chattr; 562 563 if (stat->result_mask & STATX_CHANGE_COOKIE) { 564 chattr = stat->change_cookie; 565 if (S_ISREG(inode->i_mode) && 566 !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) { 567 chattr += (u64)stat->ctime.tv_sec << 30; 568 chattr += stat->ctime.tv_nsec; 569 } 570 } else { 571 chattr = time_to_chattr(&stat->ctime); 572 } 573 return chattr; 574 } 575 576 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb) 577 { 578 struct kstat stat; 579 struct file *filp = iocb->kiocb.ki_filp; 580 struct nfs_pgio_header *hdr = iocb->hdr; 581 struct nfs_fattr *fattr = hdr->res.fattr; 582 int version = NFS_PROTO(hdr->inode)->version; 583 584 if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version)) 585 return; 586 587 fattr->valid = (NFS_ATTR_FATTR_FILEID | 588 NFS_ATTR_FATTR_CHANGE | 589 NFS_ATTR_FATTR_SIZE | 590 NFS_ATTR_FATTR_ATIME | 591 NFS_ATTR_FATTR_MTIME | 592 NFS_ATTR_FATTR_CTIME | 593 NFS_ATTR_FATTR_SPACE_USED); 594 595 fattr->fileid = stat.ino; 596 fattr->size = stat.size; 597 fattr->atime = stat.atime; 598 fattr->mtime = stat.mtime; 599 fattr->ctime = stat.ctime; 600 if (version == 4) { 601 fattr->change_attr = 602 __nfsd4_change_attribute(&stat, file_inode(filp)); 603 } else 604 fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime); 605 fattr->du.nfs3.used = stat.blocks << 9; 606 } 607 608 static void 609 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status) 610 { 611 struct nfs_pgio_header *hdr = iocb->hdr; 612 struct inode *inode = hdr->inode; 613 614 dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0); 615 616 /* Handle short writes as if they are ENOSPC */ 617 if (status > 0 && status < hdr->args.count) { 618 hdr->mds_offset += status; 619 hdr->args.offset += status; 620 hdr->args.pgbase += status; 621 hdr->args.count -= status; 622 nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset); 623 status = -ENOSPC; 624 } 625 if (status < 0) 626 nfs_reset_boot_verifier(inode); 627 628 nfs_local_pgio_done(hdr, status); 629 } 630 631 static void nfs_local_write_aio_complete_work(struct work_struct *work) 632 { 633 struct nfs_local_kiocb *iocb = 634 container_of(work, struct nfs_local_kiocb, work); 635 636 nfs_local_vfs_getattr(iocb); 637 nfs_local_pgio_release(iocb); 638 } 639 640 static void nfs_local_write_aio_complete(struct kiocb *kiocb, long ret) 641 { 642 struct nfs_local_kiocb *iocb = 643 container_of(kiocb, struct nfs_local_kiocb, kiocb); 644 645 nfs_local_write_done(iocb, ret); 646 nfs_local_pgio_aio_complete(iocb); /* Calls nfs_local_write_aio_complete_work */ 647 } 648 649 static void nfs_local_call_write(struct work_struct *work) 650 { 651 struct nfs_local_kiocb *iocb = 652 container_of(work, struct nfs_local_kiocb, work); 653 struct file *filp = iocb->kiocb.ki_filp; 654 unsigned long old_flags = current->flags; 655 const struct cred *save_cred; 656 struct iov_iter iter; 657 ssize_t status; 658 659 current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO; 660 save_cred = override_creds(filp->f_cred); 661 662 nfs_local_iter_init(&iter, iocb, WRITE); 663 664 file_start_write(filp); 665 status = filp->f_op->write_iter(&iocb->kiocb, &iter); 666 file_end_write(filp); 667 if (status != -EIOCBQUEUED) { 668 nfs_local_write_done(iocb, status); 669 nfs_local_vfs_getattr(iocb); 670 nfs_local_pgio_release(iocb); 671 } 672 673 revert_creds(save_cred); 674 current->flags = old_flags; 675 } 676 677 static int 678 nfs_do_local_write(struct nfs_pgio_header *hdr, 679 struct nfsd_file *localio, 680 const struct rpc_call_ops *call_ops) 681 { 682 struct nfs_local_kiocb *iocb; 683 struct file *file = nfs_to->nfsd_file_file(localio); 684 685 /* Don't support filesystems without write_iter */ 686 if (!file->f_op->write_iter) 687 return -EAGAIN; 688 689 dprintk("%s: vfs_write count=%u pos=%llu %s\n", 690 __func__, hdr->args.count, hdr->args.offset, 691 (hdr->args.stable == NFS_UNSTABLE) ? "unstable" : "stable"); 692 693 iocb = nfs_local_iocb_alloc(hdr, file, GFP_NOIO); 694 if (iocb == NULL) 695 return -ENOMEM; 696 iocb->localio = localio; 697 698 switch (hdr->args.stable) { 699 default: 700 break; 701 case NFS_DATA_SYNC: 702 iocb->kiocb.ki_flags |= IOCB_DSYNC; 703 break; 704 case NFS_FILE_SYNC: 705 iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC; 706 } 707 708 nfs_local_pgio_init(hdr, call_ops); 709 710 nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable); 711 712 if (iocb->kiocb.ki_flags & IOCB_DIRECT) { 713 iocb->kiocb.ki_complete = nfs_local_write_aio_complete; 714 iocb->aio_complete_work = nfs_local_write_aio_complete_work; 715 } 716 717 INIT_WORK(&iocb->work, nfs_local_call_write); 718 queue_work(nfslocaliod_workqueue, &iocb->work); 719 720 return 0; 721 } 722 723 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio, 724 struct nfs_pgio_header *hdr, 725 const struct rpc_call_ops *call_ops) 726 { 727 int status = 0; 728 729 if (!hdr->args.count) 730 return 0; 731 732 switch (hdr->rw_mode) { 733 case FMODE_READ: 734 status = nfs_do_local_read(hdr, localio, call_ops); 735 break; 736 case FMODE_WRITE: 737 status = nfs_do_local_write(hdr, localio, call_ops); 738 break; 739 default: 740 dprintk("%s: invalid mode: %d\n", __func__, 741 hdr->rw_mode); 742 status = -EINVAL; 743 } 744 745 if (status != 0) { 746 if (status == -EAGAIN) 747 nfs_localio_disable_client(clp); 748 nfs_local_file_put(localio); 749 hdr->task.tk_status = status; 750 nfs_local_hdr_release(hdr, call_ops); 751 } 752 return status; 753 } 754 755 static void 756 nfs_local_init_commit(struct nfs_commit_data *data, 757 const struct rpc_call_ops *call_ops) 758 { 759 data->task.tk_ops = call_ops; 760 } 761 762 static int 763 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data) 764 { 765 loff_t start = data->args.offset; 766 loff_t end = LLONG_MAX; 767 768 if (data->args.count > 0) { 769 end = start + data->args.count - 1; 770 if (end < start) 771 end = LLONG_MAX; 772 } 773 774 dprintk("%s: commit %llu - %llu\n", __func__, start, end); 775 return vfs_fsync_range(filp, start, end, 0); 776 } 777 778 static void 779 nfs_local_commit_done(struct nfs_commit_data *data, int status) 780 { 781 if (status >= 0) { 782 nfs_set_local_verifier(data->inode, 783 data->res.verf, 784 NFS_FILE_SYNC); 785 data->res.op_status = NFS4_OK; 786 data->task.tk_status = 0; 787 } else { 788 nfs_reset_boot_verifier(data->inode); 789 data->res.op_status = nfs_localio_errno_to_nfs4_stat(status); 790 data->task.tk_status = status; 791 } 792 } 793 794 static void 795 nfs_local_release_commit_data(struct nfsd_file *localio, 796 struct nfs_commit_data *data, 797 const struct rpc_call_ops *call_ops) 798 { 799 nfs_local_file_put(localio); 800 call_ops->rpc_call_done(&data->task, data); 801 call_ops->rpc_release(data); 802 } 803 804 static void 805 nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx) 806 { 807 nfs_local_release_commit_data(ctx->localio, ctx->data, 808 ctx->data->task.tk_ops); 809 kfree(ctx); 810 } 811 812 static void 813 nfs_local_fsync_work(struct work_struct *work) 814 { 815 struct nfs_local_fsync_ctx *ctx; 816 int status; 817 818 ctx = container_of(work, struct nfs_local_fsync_ctx, work); 819 820 status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio), 821 ctx->data); 822 nfs_local_commit_done(ctx->data, status); 823 if (ctx->done != NULL) 824 complete(ctx->done); 825 nfs_local_fsync_ctx_free(ctx); 826 } 827 828 static struct nfs_local_fsync_ctx * 829 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data, 830 struct nfsd_file *localio, gfp_t flags) 831 { 832 struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags); 833 834 if (ctx != NULL) { 835 ctx->localio = localio; 836 ctx->data = data; 837 INIT_WORK(&ctx->work, nfs_local_fsync_work); 838 ctx->done = NULL; 839 } 840 return ctx; 841 } 842 843 int nfs_local_commit(struct nfsd_file *localio, 844 struct nfs_commit_data *data, 845 const struct rpc_call_ops *call_ops, int how) 846 { 847 struct nfs_local_fsync_ctx *ctx; 848 849 ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL); 850 if (!ctx) { 851 nfs_local_commit_done(data, -ENOMEM); 852 nfs_local_release_commit_data(localio, data, call_ops); 853 return -ENOMEM; 854 } 855 856 nfs_local_init_commit(data, call_ops); 857 858 if (how & FLUSH_SYNC) { 859 DECLARE_COMPLETION_ONSTACK(done); 860 ctx->done = &done; 861 queue_work(nfsiod_workqueue, &ctx->work); 862 wait_for_completion(&done); 863 } else 864 queue_work(nfsiod_workqueue, &ctx->work); 865 866 return 0; 867 } 868