1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * NFS client support for local clients to bypass network stack 4 * 5 * Copyright (C) 2014 Weston Andros Adamson <dros@primarydata.com> 6 * Copyright (C) 2019 Trond Myklebust <trond.myklebust@hammerspace.com> 7 * Copyright (C) 2024 Mike Snitzer <snitzer@hammerspace.com> 8 * Copyright (C) 2024 NeilBrown <neilb@suse.de> 9 */ 10 11 #include <linux/module.h> 12 #include <linux/errno.h> 13 #include <linux/vfs.h> 14 #include <linux/file.h> 15 #include <linux/inet.h> 16 #include <linux/sunrpc/addr.h> 17 #include <linux/inetdevice.h> 18 #include <net/addrconf.h> 19 #include <linux/nfs_common.h> 20 #include <linux/nfslocalio.h> 21 #include <linux/bvec.h> 22 23 #include <linux/nfs.h> 24 #include <linux/nfs_fs.h> 25 #include <linux/nfs_xdr.h> 26 27 #include "internal.h" 28 #include "pnfs.h" 29 #include "nfstrace.h" 30 31 #define NFSDBG_FACILITY NFSDBG_VFS 32 33 struct nfs_local_kiocb { 34 struct kiocb kiocb; 35 struct bio_vec *bvec; 36 struct nfs_pgio_header *hdr; 37 struct work_struct work; 38 struct nfsd_file *localio; 39 }; 40 41 struct nfs_local_fsync_ctx { 42 struct nfsd_file *localio; 43 struct nfs_commit_data *data; 44 struct work_struct work; 45 struct completion *done; 46 }; 47 48 static bool localio_enabled __read_mostly = true; 49 module_param(localio_enabled, bool, 0644); 50 51 static inline bool nfs_client_is_local(const struct nfs_client *clp) 52 { 53 return !!test_bit(NFS_CS_LOCAL_IO, &clp->cl_flags); 54 } 55 56 bool nfs_server_is_local(const struct nfs_client *clp) 57 { 58 return nfs_client_is_local(clp) && localio_enabled; 59 } 60 EXPORT_SYMBOL_GPL(nfs_server_is_local); 61 62 /* 63 * UUID_IS_LOCAL XDR functions 64 */ 65 66 static void localio_xdr_enc_uuidargs(struct rpc_rqst *req, 67 struct xdr_stream *xdr, 68 const void *data) 69 { 70 const u8 *uuid = data; 71 72 encode_opaque_fixed(xdr, uuid, UUID_SIZE); 73 } 74 75 static int localio_xdr_dec_uuidres(struct rpc_rqst *req, 76 struct xdr_stream *xdr, 77 void *result) 78 { 79 /* void return */ 80 return 0; 81 } 82 83 static const struct rpc_procinfo nfs_localio_procedures[] = { 84 [LOCALIOPROC_UUID_IS_LOCAL] = { 85 .p_proc = LOCALIOPROC_UUID_IS_LOCAL, 86 .p_encode = localio_xdr_enc_uuidargs, 87 .p_decode = localio_xdr_dec_uuidres, 88 .p_arglen = XDR_QUADLEN(UUID_SIZE), 89 .p_replen = 0, 90 .p_statidx = LOCALIOPROC_UUID_IS_LOCAL, 91 .p_name = "UUID_IS_LOCAL", 92 }, 93 }; 94 95 static unsigned int nfs_localio_counts[ARRAY_SIZE(nfs_localio_procedures)]; 96 static const struct rpc_version nfslocalio_version1 = { 97 .number = 1, 98 .nrprocs = ARRAY_SIZE(nfs_localio_procedures), 99 .procs = nfs_localio_procedures, 100 .counts = nfs_localio_counts, 101 }; 102 103 static const struct rpc_version *nfslocalio_version[] = { 104 [1] = &nfslocalio_version1, 105 }; 106 107 extern const struct rpc_program nfslocalio_program; 108 static struct rpc_stat nfslocalio_rpcstat = { &nfslocalio_program }; 109 110 const struct rpc_program nfslocalio_program = { 111 .name = "nfslocalio", 112 .number = NFS_LOCALIO_PROGRAM, 113 .nrvers = ARRAY_SIZE(nfslocalio_version), 114 .version = nfslocalio_version, 115 .stats = &nfslocalio_rpcstat, 116 }; 117 118 /* 119 * nfs_local_enable - enable local i/o for an nfs_client 120 */ 121 static void nfs_local_enable(struct nfs_client *clp) 122 { 123 spin_lock(&clp->cl_localio_lock); 124 set_bit(NFS_CS_LOCAL_IO, &clp->cl_flags); 125 trace_nfs_local_enable(clp); 126 spin_unlock(&clp->cl_localio_lock); 127 } 128 129 /* 130 * nfs_local_disable - disable local i/o for an nfs_client 131 */ 132 void nfs_local_disable(struct nfs_client *clp) 133 { 134 spin_lock(&clp->cl_localio_lock); 135 if (test_and_clear_bit(NFS_CS_LOCAL_IO, &clp->cl_flags)) { 136 trace_nfs_local_disable(clp); 137 nfs_uuid_invalidate_one_client(&clp->cl_uuid); 138 } 139 spin_unlock(&clp->cl_localio_lock); 140 } 141 142 /* 143 * nfs_init_localioclient - Initialise an NFS localio client connection 144 */ 145 static struct rpc_clnt *nfs_init_localioclient(struct nfs_client *clp) 146 { 147 struct rpc_clnt *rpcclient_localio; 148 149 rpcclient_localio = rpc_bind_new_program(clp->cl_rpcclient, 150 &nfslocalio_program, 1); 151 152 dprintk_rcu("%s: server (%s) %s NFS LOCALIO.\n", 153 __func__, rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR), 154 (IS_ERR(rpcclient_localio) ? "does not support" : "supports")); 155 156 return rpcclient_localio; 157 } 158 159 static bool nfs_server_uuid_is_local(struct nfs_client *clp) 160 { 161 u8 uuid[UUID_SIZE]; 162 struct rpc_message msg = { 163 .rpc_argp = &uuid, 164 }; 165 struct rpc_clnt *rpcclient_localio; 166 int status; 167 168 rpcclient_localio = nfs_init_localioclient(clp); 169 if (IS_ERR(rpcclient_localio)) 170 return false; 171 172 export_uuid(uuid, &clp->cl_uuid.uuid); 173 174 msg.rpc_proc = &nfs_localio_procedures[LOCALIOPROC_UUID_IS_LOCAL]; 175 status = rpc_call_sync(rpcclient_localio, &msg, 0); 176 dprintk("%s: NFS reply UUID_IS_LOCAL: status=%d\n", 177 __func__, status); 178 rpc_shutdown_client(rpcclient_localio); 179 180 /* Server is only local if it initialized required struct members */ 181 if (status || !clp->cl_uuid.net || !clp->cl_uuid.dom) 182 return false; 183 184 return true; 185 } 186 187 /* 188 * nfs_local_probe - probe local i/o support for an nfs_server and nfs_client 189 * - called after alloc_client and init_client (so cl_rpcclient exists) 190 * - this function is idempotent, it can be called for old or new clients 191 */ 192 void nfs_local_probe(struct nfs_client *clp) 193 { 194 /* Disallow localio if disabled via sysfs or AUTH_SYS isn't used */ 195 if (!localio_enabled || 196 clp->cl_rpcclient->cl_auth->au_flavor != RPC_AUTH_UNIX) { 197 nfs_local_disable(clp); 198 return; 199 } 200 201 if (nfs_client_is_local(clp)) { 202 /* If already enabled, disable and re-enable */ 203 nfs_local_disable(clp); 204 } 205 206 if (!nfs_uuid_begin(&clp->cl_uuid)) 207 return; 208 if (nfs_server_uuid_is_local(clp)) 209 nfs_local_enable(clp); 210 nfs_uuid_end(&clp->cl_uuid); 211 } 212 EXPORT_SYMBOL_GPL(nfs_local_probe); 213 214 /* 215 * nfs_local_open_fh - open a local filehandle in terms of nfsd_file 216 * 217 * Returns a pointer to a struct nfsd_file or NULL 218 */ 219 struct nfsd_file * 220 nfs_local_open_fh(struct nfs_client *clp, const struct cred *cred, 221 struct nfs_fh *fh, const fmode_t mode) 222 { 223 struct nfsd_file *localio; 224 int status; 225 226 if (!nfs_server_is_local(clp)) 227 return NULL; 228 if (mode & ~(FMODE_READ | FMODE_WRITE)) 229 return NULL; 230 231 localio = nfs_open_local_fh(&clp->cl_uuid, clp->cl_rpcclient, 232 cred, fh, mode); 233 if (IS_ERR(localio)) { 234 status = PTR_ERR(localio); 235 trace_nfs_local_open_fh(fh, mode, status); 236 switch (status) { 237 case -ENOMEM: 238 case -ENXIO: 239 case -ENOENT: 240 /* Revalidate localio, will disable if unsupported */ 241 nfs_local_probe(clp); 242 } 243 return NULL; 244 } 245 return localio; 246 } 247 EXPORT_SYMBOL_GPL(nfs_local_open_fh); 248 249 static struct bio_vec * 250 nfs_bvec_alloc_and_import_pagevec(struct page **pagevec, 251 unsigned int npages, gfp_t flags) 252 { 253 struct bio_vec *bvec, *p; 254 255 bvec = kmalloc_array(npages, sizeof(*bvec), flags); 256 if (bvec != NULL) { 257 for (p = bvec; npages > 0; p++, pagevec++, npages--) { 258 p->bv_page = *pagevec; 259 p->bv_len = PAGE_SIZE; 260 p->bv_offset = 0; 261 } 262 } 263 return bvec; 264 } 265 266 static void 267 nfs_local_iocb_free(struct nfs_local_kiocb *iocb) 268 { 269 kfree(iocb->bvec); 270 kfree(iocb); 271 } 272 273 static struct nfs_local_kiocb * 274 nfs_local_iocb_alloc(struct nfs_pgio_header *hdr, 275 struct file *file, gfp_t flags) 276 { 277 struct nfs_local_kiocb *iocb; 278 279 iocb = kmalloc(sizeof(*iocb), flags); 280 if (iocb == NULL) 281 return NULL; 282 iocb->bvec = nfs_bvec_alloc_and_import_pagevec(hdr->page_array.pagevec, 283 hdr->page_array.npages, flags); 284 if (iocb->bvec == NULL) { 285 kfree(iocb); 286 return NULL; 287 } 288 init_sync_kiocb(&iocb->kiocb, file); 289 iocb->kiocb.ki_pos = hdr->args.offset; 290 iocb->hdr = hdr; 291 iocb->kiocb.ki_flags &= ~IOCB_APPEND; 292 return iocb; 293 } 294 295 static void 296 nfs_local_iter_init(struct iov_iter *i, struct nfs_local_kiocb *iocb, int dir) 297 { 298 struct nfs_pgio_header *hdr = iocb->hdr; 299 300 iov_iter_bvec(i, dir, iocb->bvec, hdr->page_array.npages, 301 hdr->args.count + hdr->args.pgbase); 302 if (hdr->args.pgbase != 0) 303 iov_iter_advance(i, hdr->args.pgbase); 304 } 305 306 static void 307 nfs_local_hdr_release(struct nfs_pgio_header *hdr, 308 const struct rpc_call_ops *call_ops) 309 { 310 call_ops->rpc_call_done(&hdr->task, hdr); 311 call_ops->rpc_release(hdr); 312 } 313 314 static void 315 nfs_local_pgio_init(struct nfs_pgio_header *hdr, 316 const struct rpc_call_ops *call_ops) 317 { 318 hdr->task.tk_ops = call_ops; 319 if (!hdr->task.tk_start) 320 hdr->task.tk_start = ktime_get(); 321 } 322 323 static void 324 nfs_local_pgio_done(struct nfs_pgio_header *hdr, long status) 325 { 326 if (status >= 0) { 327 hdr->res.count = status; 328 hdr->res.op_status = NFS4_OK; 329 hdr->task.tk_status = 0; 330 } else { 331 hdr->res.op_status = nfs4_stat_to_errno(status); 332 hdr->task.tk_status = status; 333 } 334 } 335 336 static void 337 nfs_local_pgio_release(struct nfs_local_kiocb *iocb) 338 { 339 struct nfs_pgio_header *hdr = iocb->hdr; 340 341 nfs_to_nfsd_file_put_local(iocb->localio); 342 nfs_local_iocb_free(iocb); 343 nfs_local_hdr_release(hdr, hdr->task.tk_ops); 344 } 345 346 static void 347 nfs_local_read_done(struct nfs_local_kiocb *iocb, long status) 348 { 349 struct nfs_pgio_header *hdr = iocb->hdr; 350 struct file *filp = iocb->kiocb.ki_filp; 351 352 nfs_local_pgio_done(hdr, status); 353 354 /* 355 * Must clear replen otherwise NFSv3 data corruption will occur 356 * if/when switching from LOCALIO back to using normal RPC. 357 */ 358 hdr->res.replen = 0; 359 360 if (hdr->res.count != hdr->args.count || 361 hdr->args.offset + hdr->res.count >= i_size_read(file_inode(filp))) 362 hdr->res.eof = true; 363 364 dprintk("%s: read %ld bytes eof %d.\n", __func__, 365 status > 0 ? status : 0, hdr->res.eof); 366 } 367 368 static void nfs_local_call_read(struct work_struct *work) 369 { 370 struct nfs_local_kiocb *iocb = 371 container_of(work, struct nfs_local_kiocb, work); 372 struct file *filp = iocb->kiocb.ki_filp; 373 const struct cred *save_cred; 374 struct iov_iter iter; 375 ssize_t status; 376 377 save_cred = override_creds(filp->f_cred); 378 379 nfs_local_iter_init(&iter, iocb, READ); 380 381 status = filp->f_op->read_iter(&iocb->kiocb, &iter); 382 WARN_ON_ONCE(status == -EIOCBQUEUED); 383 384 nfs_local_read_done(iocb, status); 385 nfs_local_pgio_release(iocb); 386 387 revert_creds(save_cred); 388 } 389 390 static int 391 nfs_do_local_read(struct nfs_pgio_header *hdr, 392 struct nfsd_file *localio, 393 const struct rpc_call_ops *call_ops) 394 { 395 struct nfs_local_kiocb *iocb; 396 struct file *file = nfs_to->nfsd_file_file(localio); 397 398 /* Don't support filesystems without read_iter */ 399 if (!file->f_op->read_iter) 400 return -EAGAIN; 401 402 dprintk("%s: vfs_read count=%u pos=%llu\n", 403 __func__, hdr->args.count, hdr->args.offset); 404 405 iocb = nfs_local_iocb_alloc(hdr, file, GFP_KERNEL); 406 if (iocb == NULL) 407 return -ENOMEM; 408 iocb->localio = localio; 409 410 nfs_local_pgio_init(hdr, call_ops); 411 hdr->res.eof = false; 412 413 INIT_WORK(&iocb->work, nfs_local_call_read); 414 queue_work(nfslocaliod_workqueue, &iocb->work); 415 416 return 0; 417 } 418 419 static void 420 nfs_copy_boot_verifier(struct nfs_write_verifier *verifier, struct inode *inode) 421 { 422 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client; 423 u32 *verf = (u32 *)verifier->data; 424 int seq = 0; 425 426 do { 427 read_seqbegin_or_lock(&clp->cl_boot_lock, &seq); 428 verf[0] = (u32)clp->cl_nfssvc_boot.tv_sec; 429 verf[1] = (u32)clp->cl_nfssvc_boot.tv_nsec; 430 } while (need_seqretry(&clp->cl_boot_lock, seq)); 431 done_seqretry(&clp->cl_boot_lock, seq); 432 } 433 434 static void 435 nfs_reset_boot_verifier(struct inode *inode) 436 { 437 struct nfs_client *clp = NFS_SERVER(inode)->nfs_client; 438 439 write_seqlock(&clp->cl_boot_lock); 440 ktime_get_real_ts64(&clp->cl_nfssvc_boot); 441 write_sequnlock(&clp->cl_boot_lock); 442 } 443 444 static void 445 nfs_set_local_verifier(struct inode *inode, 446 struct nfs_writeverf *verf, 447 enum nfs3_stable_how how) 448 { 449 nfs_copy_boot_verifier(&verf->verifier, inode); 450 verf->committed = how; 451 } 452 453 /* Factored out from fs/nfsd/vfs.h:fh_getattr() */ 454 static int __vfs_getattr(struct path *p, struct kstat *stat, int version) 455 { 456 u32 request_mask = STATX_BASIC_STATS; 457 458 if (version == 4) 459 request_mask |= (STATX_BTIME | STATX_CHANGE_COOKIE); 460 return vfs_getattr(p, stat, request_mask, AT_STATX_SYNC_AS_STAT); 461 } 462 463 /* Copied from fs/nfsd/nfsfh.c:nfsd4_change_attribute() */ 464 static u64 __nfsd4_change_attribute(const struct kstat *stat, 465 const struct inode *inode) 466 { 467 u64 chattr; 468 469 if (stat->result_mask & STATX_CHANGE_COOKIE) { 470 chattr = stat->change_cookie; 471 if (S_ISREG(inode->i_mode) && 472 !(stat->attributes & STATX_ATTR_CHANGE_MONOTONIC)) { 473 chattr += (u64)stat->ctime.tv_sec << 30; 474 chattr += stat->ctime.tv_nsec; 475 } 476 } else { 477 chattr = time_to_chattr(&stat->ctime); 478 } 479 return chattr; 480 } 481 482 static void nfs_local_vfs_getattr(struct nfs_local_kiocb *iocb) 483 { 484 struct kstat stat; 485 struct file *filp = iocb->kiocb.ki_filp; 486 struct nfs_pgio_header *hdr = iocb->hdr; 487 struct nfs_fattr *fattr = hdr->res.fattr; 488 int version = NFS_PROTO(hdr->inode)->version; 489 490 if (unlikely(!fattr) || __vfs_getattr(&filp->f_path, &stat, version)) 491 return; 492 493 fattr->valid = (NFS_ATTR_FATTR_FILEID | 494 NFS_ATTR_FATTR_CHANGE | 495 NFS_ATTR_FATTR_SIZE | 496 NFS_ATTR_FATTR_ATIME | 497 NFS_ATTR_FATTR_MTIME | 498 NFS_ATTR_FATTR_CTIME | 499 NFS_ATTR_FATTR_SPACE_USED); 500 501 fattr->fileid = stat.ino; 502 fattr->size = stat.size; 503 fattr->atime = stat.atime; 504 fattr->mtime = stat.mtime; 505 fattr->ctime = stat.ctime; 506 if (version == 4) { 507 fattr->change_attr = 508 __nfsd4_change_attribute(&stat, file_inode(filp)); 509 } else 510 fattr->change_attr = nfs_timespec_to_change_attr(&fattr->ctime); 511 fattr->du.nfs3.used = stat.blocks << 9; 512 } 513 514 static void 515 nfs_local_write_done(struct nfs_local_kiocb *iocb, long status) 516 { 517 struct nfs_pgio_header *hdr = iocb->hdr; 518 struct inode *inode = hdr->inode; 519 520 dprintk("%s: wrote %ld bytes.\n", __func__, status > 0 ? status : 0); 521 522 /* Handle short writes as if they are ENOSPC */ 523 if (status > 0 && status < hdr->args.count) { 524 hdr->mds_offset += status; 525 hdr->args.offset += status; 526 hdr->args.pgbase += status; 527 hdr->args.count -= status; 528 nfs_set_pgio_error(hdr, -ENOSPC, hdr->args.offset); 529 status = -ENOSPC; 530 } 531 if (status < 0) 532 nfs_reset_boot_verifier(inode); 533 534 nfs_local_pgio_done(hdr, status); 535 } 536 537 static void nfs_local_call_write(struct work_struct *work) 538 { 539 struct nfs_local_kiocb *iocb = 540 container_of(work, struct nfs_local_kiocb, work); 541 struct file *filp = iocb->kiocb.ki_filp; 542 unsigned long old_flags = current->flags; 543 const struct cred *save_cred; 544 struct iov_iter iter; 545 ssize_t status; 546 547 current->flags |= PF_LOCAL_THROTTLE | PF_MEMALLOC_NOIO; 548 save_cred = override_creds(filp->f_cred); 549 550 nfs_local_iter_init(&iter, iocb, WRITE); 551 552 file_start_write(filp); 553 status = filp->f_op->write_iter(&iocb->kiocb, &iter); 554 file_end_write(filp); 555 WARN_ON_ONCE(status == -EIOCBQUEUED); 556 557 nfs_local_write_done(iocb, status); 558 nfs_local_vfs_getattr(iocb); 559 nfs_local_pgio_release(iocb); 560 561 revert_creds(save_cred); 562 current->flags = old_flags; 563 } 564 565 static int 566 nfs_do_local_write(struct nfs_pgio_header *hdr, 567 struct nfsd_file *localio, 568 const struct rpc_call_ops *call_ops) 569 { 570 struct nfs_local_kiocb *iocb; 571 struct file *file = nfs_to->nfsd_file_file(localio); 572 573 /* Don't support filesystems without write_iter */ 574 if (!file->f_op->write_iter) 575 return -EAGAIN; 576 577 dprintk("%s: vfs_write count=%u pos=%llu %s\n", 578 __func__, hdr->args.count, hdr->args.offset, 579 (hdr->args.stable == NFS_UNSTABLE) ? "unstable" : "stable"); 580 581 iocb = nfs_local_iocb_alloc(hdr, file, GFP_NOIO); 582 if (iocb == NULL) 583 return -ENOMEM; 584 iocb->localio = localio; 585 586 switch (hdr->args.stable) { 587 default: 588 break; 589 case NFS_DATA_SYNC: 590 iocb->kiocb.ki_flags |= IOCB_DSYNC; 591 break; 592 case NFS_FILE_SYNC: 593 iocb->kiocb.ki_flags |= IOCB_DSYNC|IOCB_SYNC; 594 } 595 nfs_local_pgio_init(hdr, call_ops); 596 597 nfs_set_local_verifier(hdr->inode, hdr->res.verf, hdr->args.stable); 598 599 INIT_WORK(&iocb->work, nfs_local_call_write); 600 queue_work(nfslocaliod_workqueue, &iocb->work); 601 602 return 0; 603 } 604 605 int nfs_local_doio(struct nfs_client *clp, struct nfsd_file *localio, 606 struct nfs_pgio_header *hdr, 607 const struct rpc_call_ops *call_ops) 608 { 609 int status = 0; 610 611 if (!hdr->args.count) 612 return 0; 613 614 switch (hdr->rw_mode) { 615 case FMODE_READ: 616 status = nfs_do_local_read(hdr, localio, call_ops); 617 break; 618 case FMODE_WRITE: 619 status = nfs_do_local_write(hdr, localio, call_ops); 620 break; 621 default: 622 dprintk("%s: invalid mode: %d\n", __func__, 623 hdr->rw_mode); 624 status = -EINVAL; 625 } 626 627 if (status != 0) { 628 if (status == -EAGAIN) 629 nfs_local_disable(clp); 630 nfs_to_nfsd_file_put_local(localio); 631 hdr->task.tk_status = status; 632 nfs_local_hdr_release(hdr, call_ops); 633 } 634 return status; 635 } 636 637 static void 638 nfs_local_init_commit(struct nfs_commit_data *data, 639 const struct rpc_call_ops *call_ops) 640 { 641 data->task.tk_ops = call_ops; 642 } 643 644 static int 645 nfs_local_run_commit(struct file *filp, struct nfs_commit_data *data) 646 { 647 loff_t start = data->args.offset; 648 loff_t end = LLONG_MAX; 649 650 if (data->args.count > 0) { 651 end = start + data->args.count - 1; 652 if (end < start) 653 end = LLONG_MAX; 654 } 655 656 dprintk("%s: commit %llu - %llu\n", __func__, start, end); 657 return vfs_fsync_range(filp, start, end, 0); 658 } 659 660 static void 661 nfs_local_commit_done(struct nfs_commit_data *data, int status) 662 { 663 if (status >= 0) { 664 nfs_set_local_verifier(data->inode, 665 data->res.verf, 666 NFS_FILE_SYNC); 667 data->res.op_status = NFS4_OK; 668 data->task.tk_status = 0; 669 } else { 670 nfs_reset_boot_verifier(data->inode); 671 data->res.op_status = nfs4_stat_to_errno(status); 672 data->task.tk_status = status; 673 } 674 } 675 676 static void 677 nfs_local_release_commit_data(struct nfsd_file *localio, 678 struct nfs_commit_data *data, 679 const struct rpc_call_ops *call_ops) 680 { 681 nfs_to_nfsd_file_put_local(localio); 682 call_ops->rpc_call_done(&data->task, data); 683 call_ops->rpc_release(data); 684 } 685 686 static void 687 nfs_local_fsync_ctx_free(struct nfs_local_fsync_ctx *ctx) 688 { 689 nfs_local_release_commit_data(ctx->localio, ctx->data, 690 ctx->data->task.tk_ops); 691 kfree(ctx); 692 } 693 694 static void 695 nfs_local_fsync_work(struct work_struct *work) 696 { 697 struct nfs_local_fsync_ctx *ctx; 698 int status; 699 700 ctx = container_of(work, struct nfs_local_fsync_ctx, work); 701 702 status = nfs_local_run_commit(nfs_to->nfsd_file_file(ctx->localio), 703 ctx->data); 704 nfs_local_commit_done(ctx->data, status); 705 if (ctx->done != NULL) 706 complete(ctx->done); 707 nfs_local_fsync_ctx_free(ctx); 708 } 709 710 static struct nfs_local_fsync_ctx * 711 nfs_local_fsync_ctx_alloc(struct nfs_commit_data *data, 712 struct nfsd_file *localio, gfp_t flags) 713 { 714 struct nfs_local_fsync_ctx *ctx = kmalloc(sizeof(*ctx), flags); 715 716 if (ctx != NULL) { 717 ctx->localio = localio; 718 ctx->data = data; 719 INIT_WORK(&ctx->work, nfs_local_fsync_work); 720 ctx->done = NULL; 721 } 722 return ctx; 723 } 724 725 int nfs_local_commit(struct nfsd_file *localio, 726 struct nfs_commit_data *data, 727 const struct rpc_call_ops *call_ops, int how) 728 { 729 struct nfs_local_fsync_ctx *ctx; 730 731 ctx = nfs_local_fsync_ctx_alloc(data, localio, GFP_KERNEL); 732 if (!ctx) { 733 nfs_local_commit_done(data, -ENOMEM); 734 nfs_local_release_commit_data(localio, data, call_ops); 735 return -ENOMEM; 736 } 737 738 nfs_local_init_commit(data, call_ops); 739 740 if (how & FLUSH_SYNC) { 741 DECLARE_COMPLETION_ONSTACK(done); 742 ctx->done = &done; 743 queue_work(nfsiod_workqueue, &ctx->work); 744 wait_for_completion(&done); 745 } else 746 queue_work(nfsiod_workqueue, &ctx->work); 747 748 return 0; 749 } 750