/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data. Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols. Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache. A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache. The client does not
 * correct unaligned requests from applications. All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files. Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001	Initial implementation for 2.4  --cel
 * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003	Port to 2.5 APIs  --cel
 * 31 Mar 2004	Handle direct I/O without VFS support  --cel
 * 15 Sep 2004	Parallel async reads  --cel
 * 04 May 2005	support O_DIRECT with aio  --cel
 *
 */

#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>

#include "internal.h"
#include "iostat.h"

#define NFSDBG_FACILITY		NFSDBG_VFS

static struct kmem_cache *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
	struct kref		kref;		/* release manager */

	/* I/O parameters */
	struct nfs_open_context	*ctx;		/* file open context info */
	struct kiocb *		iocb;		/* controlling i/o request */
	struct inode *		inode;		/* target file of i/o */

	/* completion state */
	atomic_t		io_count;	/* i/os we're waiting for */
	spinlock_t		lock;		/* protect completion state */
	ssize_t			count,		/* bytes actually processed */
				error;		/* any reported error */
	struct completion	completion;	/* wait for i/o completion */

	/* commit state */
	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
	struct nfs_write_data *	commit_data;	/* special write_data for commits */
	int			flags;
#define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
	struct nfs_writeverf	verf;		/* unstable write verifier */
};

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
static const struct rpc_call_ops nfs_write_direct_ops;

static inline void get_dreq(struct nfs_direct_req *dreq)
{
	atomic_inc(&dreq->io_count);
}

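/*
 * get_dreq() takes one outstanding-I/O reference for each RPC that is
 * dispatched (plus one held by the scheduling routine itself);
 * put_dreq() returns non-zero when the final reference is dropped,
 * which is the cue to complete the whole direct request.
 */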
static inline int put_dreq(struct nfs_direct_req *dreq)
{
	return atomic_dec_and_test(&dreq->io_count);
}

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O. However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
			iocb->ki_filp->f_path.dentry->d_name.name,
			(long long) pos, nr_segs);

	return -EINVAL;
}

static void nfs_direct_dirty_pages(struct page **pages, unsigned int pgbase, size_t count)
{
	unsigned int npages;
	unsigned int i;

	if (count == 0)
		return;
	pages += (pgbase >> PAGE_SHIFT);
	npages = (count + (pgbase & ~PAGE_MASK) + PAGE_SIZE - 1) >> PAGE_SHIFT;
	for (i = 0; i < npages; i++) {
		struct page *page = pages[i];
		if (!PageCompound(page))
			set_page_dirty(page);
	}
}

static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
{
	unsigned int i;
	for (i = 0; i < npages; i++)
		page_cache_release(pages[i]);
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
	struct nfs_direct_req *dreq;

	dreq = kmem_cache_alloc(nfs_direct_cachep, GFP_KERNEL);
	if (!dreq)
		return NULL;

	kref_init(&dreq->kref);
	kref_get(&dreq->kref);
	init_completion(&dreq->completion);
	INIT_LIST_HEAD(&dreq->rewrite_list);
	dreq->iocb = NULL;
	dreq->ctx = NULL;
	spin_lock_init(&dreq->lock);
	atomic_set(&dreq->io_count, 0);
	dreq->count = 0;
	dreq->error = 0;
	dreq->flags = 0;

	return dreq;
}

static void nfs_direct_req_free(struct kref *kref)
{
	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

	if (dreq->ctx != NULL)
		put_nfs_open_context(dreq->ctx);
	kmem_cache_free(nfs_direct_cachep, dreq);
}

static void nfs_direct_req_release(struct nfs_direct_req *dreq)
{
	kref_put(&dreq->kref, nfs_direct_req_free);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
	ssize_t result = -EIOCBQUEUED;

	/* Async requests don't wait here */
	if (dreq->iocb)
		goto out;

	result = wait_for_completion_killable(&dreq->completion);

	if (!result)
		result = dreq->error;
	if (!result)
		result = dreq->count;

out:
	return (ssize_t) result;
}

/*
 * Synchronous I/O uses a stack-allocated iocb. Thus we can't trust
 * the iocb is still valid here if this is a synchronous request.
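 * For an async request the iocb is still live, so completion is
 * reported to the AIO layer with aio_complete() before any waiter
 * in nfs_direct_wait() is woken.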
 */
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
	if (dreq->iocb) {
		long res = (long) dreq->error;
		if (!res)
			res = (long) dreq->count;
		aio_complete(dreq->iocb, res, 0);
	}
	complete_all(&dreq->completion);

	nfs_direct_req_release(dreq);
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete. This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 */
static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
	struct nfs_read_data *data = calldata;

	nfs_readpage_result(task, data);
}

static void nfs_direct_read_release(void *calldata)
{

	struct nfs_read_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);
	if (unlikely(status < 0)) {
		dreq->error = status;
		spin_unlock(&dreq->lock);
	} else {
		dreq->count += data->res.count;
		spin_unlock(&dreq->lock);
		nfs_direct_dirty_pages(data->pagevec,
				data->args.pgbase,
				data->res.count);
	}
	nfs_direct_release_pages(data->pagevec, data->npages);

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);
	nfs_readdata_free(data);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_read_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_read_result,
	.rpc_release = nfs_direct_read_release,
};

/*
 * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
 * operation. If nfs_readdata_alloc() or get_user_pages() fails,
 * bail and stop sending more reads. Read length accounting is
 * handled automatically by nfs_direct_read_result(). Otherwise, if
 * no requests have been sent, just return an error.
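 * A short get_user_pages() result shrinks the chunk to the pages that
 * were actually pinned, and each dispatched READ takes its own
 * reference on the dreq via get_dreq().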
 */
static ssize_t nfs_direct_read_schedule_segment(struct nfs_direct_req *dreq,
						const struct iovec *iov,
						loff_t pos)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	size_t rsize = NFS_SERVER(inode)->rsize;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_read_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_read_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(rsize,count);

		result = -ENOMEM;
		data = nfs_readdata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 1, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_readdata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_readdata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->res.fattr = &data->fattr;
		data->res.eof = 0;
		data->res.count = bytes;
		nfs_fattr_init(&data->fattr);
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		NFS_PROTO(inode)->read_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct read call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;
		/* FIXME: Remove this unnecessary math from final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
					      const struct iovec *iov,
					      unsigned long nr_segs,
					      loff_t pos)
{
	ssize_t result = -EINVAL;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_read_schedule_segment(dreq, vec, pos);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_complete(dreq);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_read(struct kiocb *iocb, const struct iovec *iov,
			       unsigned long nr_segs, loff_t pos)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_read_schedule_iovec(dreq, iov, nr_segs, pos);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
	while (!list_empty(&dreq->rewrite_list)) {
		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
		list_del(&data->pages);
		nfs_direct_release_pages(data->pagevec, data->npages);
		nfs_writedata_free(data);
	}
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
	struct inode *inode = dreq->inode;
	struct list_head *p;
	struct nfs_write_data *data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	dreq->count = 0;
	get_dreq(dreq);

	list_for_each(p, &dreq->rewrite_list) {
		data = list_entry(p, struct nfs_write_data, pages);

		get_dreq(dreq);

		/* Use stable writes */
		data->args.stable = NFS_FILE_SYNC;

		/*
		 * Reset data->res.
		 */
		nfs_fattr_init(&data->fattr);
		data->res.count = data->args.count;
		memset(&data->verf, 0, sizeof(data->verf));

		/*
		 * Reuse data->task; data->args should not have changed
		 * since the original request was sent.
		 */
		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		/*
		 * We're called via an RPC callback, so BKL is already held.
		 */
		task = rpc_run_task(&task_setup_data);
		if (!IS_ERR(task))
			rpc_put_task(task);

		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				data->args.count,
				(unsigned long long)data->args.offset);
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, inode);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	/* Call the NFS version-specific code */
	NFS_PROTO(data->inode)->commit_done(task, data);
}

static void nfs_direct_commit_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	if (status < 0) {
		dprintk("NFS: %5u commit failed with error %d.\n",
				data->task.tk_pid, status);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
	}

	dprintk("NFS: %5u commit returned %d\n", data->task.tk_pid, status);
	nfs_direct_write_complete(dreq, data->inode);
	nfs_commit_free(data);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_commit_result,
	.rpc_release = nfs_direct_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
	struct nfs_write_data *data = dreq->commit_data;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_argp = &data->args,
		.rpc_resp = &data->res,
		.rpc_cred = dreq->ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.task = &data->task,
		.rpc_client = NFS_CLIENT(dreq->inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_commit_direct_ops,
		.callback_data = data,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};

	data->inode = dreq->inode;
	data->cred = msg.rpc_cred;

	data->args.fh = NFS_FH(data->inode);
	data->args.offset = 0;
	data->args.count = 0;
	data->args.context = dreq->ctx;
	data->res.count = 0;
	data->res.fattr = &data->fattr;
	data->res.verf = &data->verf;
	nfs_fattr_init(&data->fattr);

	NFS_PROTO(data->inode)->commit_setup(data, &msg);

	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
	dreq->commit_data = NULL;

	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);

	task = rpc_run_task(&task_setup_data);
	if (!IS_ERR(task))
		rpc_put_task(task);
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	int flags = dreq->flags;

	dreq->flags = 0;
	switch (flags) {
		case NFS_ODIRECT_DO_COMMIT:
			nfs_direct_commit_schedule(dreq);
			break;
		case NFS_ODIRECT_RESCHED_WRITES:
			nfs_direct_write_reschedule(dreq);
			break;
		default:
			if (dreq->commit_data != NULL)
				nfs_commit_free(dreq->commit_data);
			nfs_direct_free_writedata(dreq);
			nfs_zap_mapping(inode, inode->i_mapping);
			nfs_direct_complete(dreq);
	}
}

static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = nfs_commitdata_alloc();
	if (dreq->commit_data != NULL)
		dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
	dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
	nfs_direct_free_writedata(dreq);
	nfs_zap_mapping(inode, inode->i_mapping);
	nfs_direct_complete(dreq);
}
#endif

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
	struct nfs_write_data *data = calldata;

	if (nfs_writeback_done(task, data) != 0)
		return;
}

/*
 * NB: Return the value of the first error return code. Subsequent
 * errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
	struct nfs_write_data *data = calldata;
	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
	int status = data->task.tk_status;

	spin_lock(&dreq->lock);

	if (unlikely(status < 0)) {
		/* An error has occurred, so we should not commit */
		dreq->flags = 0;
		dreq->error = status;
	}
	if (unlikely(dreq->error != 0))
		goto out_unlock;

	dreq->count += data->res.count;

	if (data->res.verf->committed != NFS_FILE_SYNC) {
		switch (dreq->flags) {
			case 0:
				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
				dreq->flags = NFS_ODIRECT_DO_COMMIT;
				break;
			case NFS_ODIRECT_DO_COMMIT:
				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
					dprintk("NFS: %5u write verify failed\n", data->task.tk_pid);
					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
				}
		}
	}
out_unlock:
	spin_unlock(&dreq->lock);

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
#if defined(CONFIG_NFS_V4_1)
	.rpc_call_prepare = nfs_write_prepare,
#endif /* CONFIG_NFS_V4_1 */
	.rpc_call_done = nfs_direct_write_result,
	.rpc_release = nfs_direct_write_release,
};

/*
 * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
 * operation. If nfs_writedata_alloc() or get_user_pages() fails,
 * bail and stop sending more writes. Write length accounting is
 * handled automatically by nfs_direct_write_result(). Otherwise, if
 * no requests have been sent, just return an error.
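 * The sync argument is passed through as args.stable; write data
 * structures are kept on dreq->rewrite_list so that unstable writes
 * can be resent as stable writes if commit verification fails.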
 */
static ssize_t nfs_direct_write_schedule_segment(struct nfs_direct_req *dreq,
						 const struct iovec *iov,
						 loff_t pos, int sync)
{
	struct nfs_open_context *ctx = dreq->ctx;
	struct inode *inode = ctx->path.dentry->d_inode;
	unsigned long user_addr = (unsigned long)iov->iov_base;
	size_t count = iov->iov_len;
	struct rpc_task *task;
	struct rpc_message msg = {
		.rpc_cred = ctx->cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = NFS_CLIENT(inode),
		.rpc_message = &msg,
		.callback_ops = &nfs_write_direct_ops,
		.workqueue = nfsiod_workqueue,
		.flags = RPC_TASK_ASYNC,
	};
	size_t wsize = NFS_SERVER(inode)->wsize;
	unsigned int pgbase;
	int result;
	ssize_t started = 0;

	do {
		struct nfs_write_data *data;
		size_t bytes;

		pgbase = user_addr & ~PAGE_MASK;
		bytes = min(wsize,count);

		result = -ENOMEM;
		data = nfs_writedata_alloc(nfs_page_array_len(pgbase, bytes));
		if (unlikely(!data))
			break;

		down_read(&current->mm->mmap_sem);
		result = get_user_pages(current, current->mm, user_addr,
					data->npages, 0, 0, data->pagevec, NULL);
		up_read(&current->mm->mmap_sem);
		if (result < 0) {
			nfs_writedata_free(data);
			break;
		}
		if ((unsigned)result < data->npages) {
			bytes = result * PAGE_SIZE;
			if (bytes <= pgbase) {
				nfs_direct_release_pages(data->pagevec, result);
				nfs_writedata_free(data);
				break;
			}
			bytes -= pgbase;
			data->npages = result;
		}

		get_dreq(dreq);

		list_move_tail(&data->pages, &dreq->rewrite_list);

		data->req = (struct nfs_page *) dreq;
		data->inode = inode;
		data->cred = msg.rpc_cred;
		data->args.fh = NFS_FH(inode);
		data->args.context = ctx;
		data->args.offset = pos;
		data->args.pgbase = pgbase;
		data->args.pages = data->pagevec;
		data->args.count = bytes;
		data->args.stable = sync;
		data->res.fattr = &data->fattr;
		data->res.count = bytes;
		data->res.verf = &data->verf;
		nfs_fattr_init(&data->fattr);

		task_setup_data.task = &data->task;
		task_setup_data.callback_data = data;
		msg.rpc_argp = &data->args;
		msg.rpc_resp = &data->res;
		NFS_PROTO(inode)->write_setup(data, &msg);

		task = rpc_run_task(&task_setup_data);
		if (IS_ERR(task))
			break;
		rpc_put_task(task);

		dprintk("NFS: %5u initiated direct write call "
			"(req %s/%Ld, %zu bytes @ offset %Lu)\n",
				data->task.tk_pid,
				inode->i_sb->s_id,
				(long long)NFS_FILEID(inode),
				bytes,
				(unsigned long long)data->args.offset);

		started += bytes;
		user_addr += bytes;
		pos += bytes;

		/* FIXME: Remove this useless math from the final patch */
		pgbase += bytes;
		pgbase &= ~PAGE_MASK;
		BUG_ON(pgbase != (user_addr & ~PAGE_MASK));

		count -= bytes;
	} while (count != 0);

	if (started)
		return started;
	return result < 0 ? (ssize_t) result : -EFAULT;
}

static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
					       const struct iovec *iov,
					       unsigned long nr_segs,
					       loff_t pos, int sync)
{
	ssize_t result = 0;
	size_t requested_bytes = 0;
	unsigned long seg;

	get_dreq(dreq);

	for (seg = 0; seg < nr_segs; seg++) {
		const struct iovec *vec = &iov[seg];
		result = nfs_direct_write_schedule_segment(dreq, vec,
							   pos, sync);
		if (result < 0)
			break;
		requested_bytes += result;
		if ((size_t)result < vec->iov_len)
			break;
		pos += vec->iov_len;
	}

	if (put_dreq(dreq))
		nfs_direct_write_complete(dreq, dreq->inode);

	if (requested_bytes != 0)
		return 0;

	if (result < 0)
		return result;
	return -EIO;
}

static ssize_t nfs_direct_write(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos,
				size_t count)
{
	ssize_t result = 0;
	struct inode *inode = iocb->ki_filp->f_mapping->host;
	struct nfs_direct_req *dreq;
	size_t wsize = NFS_SERVER(inode)->wsize;
	int sync = NFS_UNSTABLE;

	dreq = nfs_direct_req_alloc();
	if (!dreq)
		return -ENOMEM;
	nfs_alloc_commit_data(dreq);

	if (dreq->commit_data == NULL || count < wsize)
		sync = NFS_FILE_SYNC;

	dreq->inode = inode;
	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
	if (!is_sync_kiocb(iocb))
		dreq->iocb = iocb;

	result = nfs_direct_write_schedule_iovec(dreq, iov, nr_segs, pos, sync);
	if (!result)
		result = nfs_direct_wait(dreq);
	nfs_direct_req_release(dreq);

	return result;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers into which to read data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file. For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change. Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, const struct iovec *iov,
			     unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTREADBYTES, count);

	dfprintk(FILE, "NFS: direct read(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_read(iocb, iov, nr_segs, pos);
	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @iov: vector of user buffers from which to write data
 * @nr_segs: size of iov vector
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size. The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache. We let the server do generic write
 * parameter checking and report problems.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const struct iovec *iov,
			      unsigned long nr_segs, loff_t pos)
{
	ssize_t retval = -EINVAL;
	struct file *file = iocb->ki_filp;
	struct address_space *mapping = file->f_mapping;
	size_t count;

	count = iov_length(iov, nr_segs);
	nfs_add_stats(mapping->host, NFSIOS_DIRECTWRITTENBYTES, count);

	dfprintk(FILE, "NFS: direct write(%s/%s, %zd@%Ld)\n",
		file->f_path.dentry->d_parent->d_name.name,
		file->f_path.dentry->d_name.name,
		count, (long long) pos);

	retval = generic_write_checks(file, &pos, &count, 0);
	if (retval)
		goto out;

	retval = -EINVAL;
	if ((ssize_t) count < 0)
		goto out;
	retval = 0;
	if (!count)
		goto out;

	retval = nfs_sync_mapping(mapping);
	if (retval)
		goto out;

	retval = nfs_direct_write(iocb, iov, nr_segs, pos, count);

	if (retval > 0)
		iocb->ki_pos = pos + retval;

out:
	return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int __init nfs_init_directcache(void)
{
	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
						sizeof(struct nfs_direct_req),
						0, (SLAB_RECLAIM_ACCOUNT|
							SLAB_MEM_SPREAD),
						NULL);
	if (nfs_direct_cachep == NULL)
		return -ENOMEM;

	return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
	kmem_cache_destroy(nfs_direct_cachep);
}