/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}
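/*
 * Descriptive note on io_count: the scheduling paths take one
 * reference before issuing any RPCs and one more per RPC dispatched;
 * each RPC release callback (and the scheduler itself, once it has
 * finished issuing requests) drops a reference with put_dreq().  The
 * caller that sees put_dreq() return true performs final completion
 * of the direct request.
 */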
static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O. However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

/*
 * Synchronous I/O uses a stack-allocated iocb.  Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{
	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_release(calldata);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation.  If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads.  Read length accounting is
 * handled automatically by nfs_direct_read_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize, count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_release(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_release(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = get_nfs_open_context(ctx);
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_release(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commitdata_release(calldata);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = get_nfs_open_context(dreq->ctx);
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	if (nfs_writeback_done(task, data) != 0)
		return;
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation.  If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes.  Write length accounting is
 * handled automatically by nfs_direct_write_result().  Otherwise, if
 * no requests have been sent, just return an error.
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize, count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_release(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_release(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = get_nfs_open_context(ctx);
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count < wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
			     unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We also avoid an unnecessary invocation of generic_osync_inode(),
 * as it is fairly meaningless to sync the metadata of an NFS file.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
			      unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}