xref: /linux/fs/nfs/direct.c (revision 14b42963f64b98ab61fa9723c03d71aa5ef4f862)
1 /*
2  * linux/fs/nfs/direct.c
3  *
4  * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
5  *
6  * High-performance uncached I/O for the Linux NFS client
7  *
8  * There are important applications whose performance or correctness
9  * depends on uncached access to file data.  Database clusters
10  * (multiple copies of the same instance running on separate hosts)
11  * implement their own cache coherency protocol that subsumes file
12  * system cache protocols.  Applications that process datasets
13  * considerably larger than the client's memory do not always benefit
14  * from a local cache.  A streaming video server, for instance, has no
15  * need to cache the contents of a file.
16  *
17  * When an application requests uncached I/O, all read and write requests
18  * are made directly to the server; data stored or fetched via these
19  * requests is not cached in the Linux page cache.  The client does not
20  * correct unaligned requests from applications.  All requested bytes are
21  * held on permanent storage before a direct write system call returns to
22  * an application.
23  *
24  * Solaris implements an uncached I/O facility called directio() that
25  * is used for backups and sequential I/O to very large files.  Solaris
26  * also supports uncaching whole NFS partitions with "-o forcedirectio,"
27  * an undocumented mount option.
28  *
29  * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
30  * help from Andrew Morton.
31  *
32  * 18 Dec 2001	Initial implementation for 2.4  --cel
33  * 08 Jul 2002	Version for 2.4.19, with bug fixes --trondmy
34  * 08 Jun 2003	Port to 2.5 APIs  --cel
35  * 31 Mar 2004	Handle direct I/O without VFS support  --cel
36  * 15 Sep 2004	Parallel async reads  --cel
37  * 04 May 2005	support O_DIRECT with aio  --cel
38  *
39  */
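/*
 * For illustration only (not part of the original file): an application
 * typically reaches this code by opening a file on an NFS mount with
 * O_DIRECT and issuing ordinary read/write or aio calls.  The userspace
 * sketch below is a minimal example; the path and sizes are hypothetical
 * and error handling is omitted.
 *
 *	int fd = open("/mnt/nfs/data.db", O_RDWR | O_DIRECT);
 *	void *buf;
 *	posix_memalign(&buf, 4096, 65536);      -- page-aligned buffer
 *	ssize_t n = pread(fd, buf, 65536, 0);   -- bypasses the page cache
 */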
40 
41 #include <linux/errno.h>
42 #include <linux/sched.h>
43 #include <linux/kernel.h>
44 #include <linux/smp_lock.h>
45 #include <linux/file.h>
46 #include <linux/pagemap.h>
47 #include <linux/kref.h>
48 
49 #include <linux/nfs_fs.h>
50 #include <linux/nfs_page.h>
51 #include <linux/sunrpc/clnt.h>
52 
53 #include <asm/system.h>
54 #include <asm/uaccess.h>
55 #include <asm/atomic.h>
56 
57 #include "iostat.h"
58 
59 #define NFSDBG_FACILITY		NFSDBG_VFS
60 
61 static kmem_cache_t *nfs_direct_cachep;
62 
63 /*
64  * This represents a set of asynchronous requests that we're waiting on
65  */
66 struct nfs_direct_req {
67 	struct kref		kref;		/* release manager */
68 
69 	/* I/O parameters */
70 	struct nfs_open_context	*ctx;		/* file open context info */
71 	struct kiocb *		iocb;		/* controlling i/o request */
72 	struct inode *		inode;		/* target file of i/o */
73 
74 	/* completion state */
75 	atomic_t		io_count;	/* i/os we're waiting for */
76 	spinlock_t		lock;		/* protect completion state */
77 	ssize_t			count,		/* bytes actually processed */
78 				error;		/* any reported error */
79 	struct completion	completion;	/* wait for i/o completion */
80 
81 	/* commit state */
82 	struct list_head	rewrite_list;	/* saved nfs_write_data structs */
83 	struct nfs_write_data *	commit_data;	/* special write_data for commits */
84 	int			flags;
85 #define NFS_ODIRECT_DO_COMMIT		(1)	/* an unstable reply was received */
86 #define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
87 	struct nfs_writeverf	verf;		/* unstable write verifier */
88 };
89 
90 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);
91 static const struct rpc_call_ops nfs_write_direct_ops;
92 
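/*
 * dreq->io_count tracks the number of outstanding pieces of work on this
 * request: the scheduling routine takes one reference before its dispatch
 * loop, and each in-flight RPC takes another.  put_dreq() returns true
 * only when the last reference drops, at which point the completion path
 * (nfs_direct_complete or nfs_direct_write_complete) may run.  This count
 * is independent of the kref, which controls freeing the structure itself.
 */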
93 static inline void get_dreq(struct nfs_direct_req *dreq)
94 {
95 	atomic_inc(&dreq->io_count);
96 }
97 
98 static inline int put_dreq(struct nfs_direct_req *dreq)
99 {
100 	return atomic_dec_and_test(&dreq->io_count);
101 }
102 
103 /*
104  * "size" is never larger than rsize or wsize.
105  */
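/*
 * Example: for a user_addr that begins 0x234 bytes into a page and a size
 * of 16384 with 4096-byte pages, (user_addr + size + 4095) >> 12 minus
 * (user_addr >> 12) gives 5 pages, even though the transfer is only four
 * pages of data, because the buffer straddles a page boundary at each end.
 */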
106 static inline int nfs_direct_count_pages(unsigned long user_addr, size_t size)
107 {
108 	int page_count;
109 
110 	page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
111 	page_count -= user_addr >> PAGE_SHIFT;
112 	BUG_ON(page_count < 0);
113 
114 	return page_count;
115 }
116 
117 static inline unsigned int nfs_max_pages(unsigned int size)
118 {
119 	return (size + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
120 }
121 
122 /**
123  * nfs_direct_IO - NFS address space operation for direct I/O
124  * @rw: direction (read or write)
125  * @iocb: target I/O control block
126  * @iov: array of vectors that define I/O buffer
127  * @pos: offset in file to begin the operation
128  * @nr_segs: size of iovec array
129  *
130  * The presence of this routine in the address space ops vector means
131  * the NFS client supports direct I/O.  However, we shunt off direct
132  * read and write requests before the VFS gets them, so this method
133  * should never be called.
134  */
135 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
136 {
137 	dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
138 			iocb->ki_filp->f_dentry->d_name.name,
139 			(long long) pos, nr_segs);
140 
141 	return -EINVAL;
142 }
143 
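/*
 * After a direct read the user's pages hold fresh data, so they are marked
 * dirty before being released.  Compound pages (such as hugetlb mappings)
 * are skipped; only ordinary pages are dirtied here.
 */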
144 static void nfs_direct_dirty_pages(struct page **pages, int npages)
145 {
146 	int i;
147 	for (i = 0; i < npages; i++) {
148 		struct page *page = pages[i];
149 		if (!PageCompound(page))
150 			set_page_dirty_lock(page);
151 	}
152 }
153 
154 static void nfs_direct_release_pages(struct page **pages, int npages)
155 {
156 	int i;
157 	for (i = 0; i < npages; i++)
158 		page_cache_release(pages[i]);
159 }
160 
161 static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
162 {
163 	struct nfs_direct_req *dreq;
164 
165 	dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL);
166 	if (!dreq)
167 		return NULL;
168 
169 	kref_init(&dreq->kref);
170 	kref_get(&dreq->kref);
171 	init_completion(&dreq->completion);
172 	INIT_LIST_HEAD(&dreq->rewrite_list);
173 	dreq->iocb = NULL;
174 	dreq->ctx = NULL;
175 	spin_lock_init(&dreq->lock);
176 	atomic_set(&dreq->io_count, 0);
177 	dreq->count = 0;
178 	dreq->error = 0;
179 	dreq->flags = 0;
180 
181 	return dreq;
182 }
183 
184 static void nfs_direct_req_release(struct kref *kref)
185 {
186 	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
187 
188 	if (dreq->ctx != NULL)
189 		put_nfs_open_context(dreq->ctx);
190 	kmem_cache_free(nfs_direct_cachep, dreq);
191 }
192 
193 /*
194  * Collects and returns the final error value/byte-count.
195  */
196 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
197 {
198 	ssize_t result = -EIOCBQUEUED;
199 
200 	/* Async requests don't wait here */
201 	if (dreq->iocb)
202 		goto out;
203 
204 	result = wait_for_completion_interruptible(&dreq->completion);
205 
206 	if (!result)
207 		result = dreq->error;
208 	if (!result)
209 		result = dreq->count;
210 
211 out:
212 	kref_put(&dreq->kref, nfs_direct_req_release);
213 	return (ssize_t) result;
214 }
215 
216 /*
217  * Synchronous I/O uses a stack-allocated iocb, so we can't trust that
218  * the iocb is still valid here if this is a synchronous request.
219  */
220 static void nfs_direct_complete(struct nfs_direct_req *dreq)
221 {
222 	if (dreq->iocb) {
223 		long res = (long) dreq->error;
224 		if (!res)
225 			res = (long) dreq->count;
226 		aio_complete(dreq->iocb, res, 0);
227 	}
228 	complete_all(&dreq->completion);
229 
230 	kref_put(&dreq->kref, nfs_direct_req_release);
231 }
232 
233 /*
234  * We must hold a reference to all the pages in this direct read request
235  * until the RPCs complete.  This could be long *after* we are woken up in
236  * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
237  */
238 static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
239 {
240 	struct nfs_read_data *data = calldata;
241 	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
242 
243 	if (nfs_readpage_result(task, data) != 0)
244 		return;
245 
246 	nfs_direct_dirty_pages(data->pagevec, data->npages);
247 	nfs_direct_release_pages(data->pagevec, data->npages);
248 
249 	spin_lock(&dreq->lock);
250 
251 	if (likely(task->tk_status >= 0))
252 		dreq->count += data->res.count;
253 	else
254 		dreq->error = task->tk_status;
255 
256 	spin_unlock(&dreq->lock);
257 
258 	if (put_dreq(dreq))
259 		nfs_direct_complete(dreq);
260 }
261 
262 static const struct rpc_call_ops nfs_read_direct_ops = {
263 	.rpc_call_done = nfs_direct_read_result,
264 	.rpc_release = nfs_readdata_release,
265 };
266 
267 /*
268  * For each rsize'd chunk of the user's buffer, dispatch an NFS READ
269  * operation.  If nfs_readdata_alloc() or get_user_pages() fails partway
270  * through, stop sending further reads; the bytes already dispatched are
271  * accounted for by nfs_direct_read_result().  If no requests were sent
272  * at all, return an error instead.
273  */
274 static ssize_t nfs_direct_read_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos)
275 {
276 	struct nfs_open_context *ctx = dreq->ctx;
277 	struct inode *inode = ctx->dentry->d_inode;
278 	size_t rsize = NFS_SERVER(inode)->rsize;
279 	unsigned int rpages = nfs_max_pages(rsize);
280 	unsigned int pgbase;
281 	int result;
282 	ssize_t started = 0;
283 
284 	get_dreq(dreq);
285 
286 	pgbase = user_addr & ~PAGE_MASK;
287 	do {
288 		struct nfs_read_data *data;
289 		size_t bytes;
290 
291 		result = -ENOMEM;
292 		data = nfs_readdata_alloc(rpages);
293 		if (unlikely(!data))
294 			break;
295 
296 		bytes = rsize;
297 		if (count < rsize)
298 			bytes = count;
299 
300 		data->npages = nfs_direct_count_pages(user_addr, bytes);
301 		down_read(&current->mm->mmap_sem);
302 		result = get_user_pages(current, current->mm, user_addr,
303 					data->npages, 1, 0, data->pagevec, NULL);
304 		up_read(&current->mm->mmap_sem);
305 		if (unlikely(result < data->npages)) {
306 			if (result > 0)
307 				nfs_direct_release_pages(data->pagevec, result);
308 			nfs_readdata_release(data);
309 			break;
310 		}
311 
312 		get_dreq(dreq);
313 
314 		data->req = (struct nfs_page *) dreq;
315 		data->inode = inode;
316 		data->cred = ctx->cred;
317 		data->args.fh = NFS_FH(inode);
318 		data->args.context = ctx;
319 		data->args.offset = pos;
320 		data->args.pgbase = pgbase;
321 		data->args.pages = data->pagevec;
322 		data->args.count = bytes;
323 		data->res.fattr = &data->fattr;
324 		data->res.eof = 0;
325 		data->res.count = bytes;
326 
327 		rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
328 				&nfs_read_direct_ops, data);
329 		NFS_PROTO(inode)->read_setup(data);
330 
331 		data->task.tk_cookie = (unsigned long) inode;
332 
333 		lock_kernel();
334 		rpc_execute(&data->task);
335 		unlock_kernel();
336 
337 		dfprintk(VFS, "NFS: %5u initiated direct read call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
338 				data->task.tk_pid,
339 				inode->i_sb->s_id,
340 				(long long)NFS_FILEID(inode),
341 				bytes,
342 				(unsigned long long)data->args.offset);
343 
344 		started += bytes;
345 		user_addr += bytes;
346 		pos += bytes;
347 		pgbase += bytes;
348 		pgbase &= ~PAGE_MASK;
349 
350 		count -= bytes;
351 	} while (count != 0);
352 
353 	if (put_dreq(dreq))
354 		nfs_direct_complete(dreq);
355 
356 	if (started)
357 		return 0;
358 	return result < 0 ? (ssize_t) result : -EFAULT;
359 }
360 
361 static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
362 {
363 	ssize_t result = 0;
364 	sigset_t oldset;
365 	struct inode *inode = iocb->ki_filp->f_mapping->host;
366 	struct rpc_clnt *clnt = NFS_CLIENT(inode);
367 	struct nfs_direct_req *dreq;
368 
369 	dreq = nfs_direct_req_alloc();
370 	if (!dreq)
371 		return -ENOMEM;
372 
373 	dreq->inode = inode;
374 	dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
375 	if (!is_sync_kiocb(iocb))
376 		dreq->iocb = iocb;
377 
378 	nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
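	/*
	 * Block signals that should not interrupt the RPCs (per the RPC
	 * client's settings, e.g. the "intr" mount option) while the reads
	 * are dispatched and waited on; the old mask is restored afterwards.
	 */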
379 	rpc_clnt_sigmask(clnt, &oldset);
380 	result = nfs_direct_read_schedule(dreq, user_addr, count, pos);
381 	if (!result)
382 		result = nfs_direct_wait(dreq);
383 	rpc_clnt_sigunmask(clnt, &oldset);
384 
385 	return result;
386 }
387 
388 static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
389 {
390 	while (!list_empty(&dreq->rewrite_list)) {
391 		struct nfs_write_data *data = list_entry(dreq->rewrite_list.next, struct nfs_write_data, pages);
392 		list_del(&data->pages);
393 		nfs_direct_release_pages(data->pagevec, data->npages);
394 		nfs_writedata_release(data);
395 	}
396 }
397 
398 #if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
399 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
400 {
401 	struct inode *inode = dreq->inode;
402 	struct list_head *p;
403 	struct nfs_write_data *data;
404 
405 	dreq->count = 0;
406 	get_dreq(dreq);
407 
408 	list_for_each(p, &dreq->rewrite_list) {
409 		data = list_entry(p, struct nfs_write_data, pages);
410 
411 		get_dreq(dreq);
412 
413 		/*
414 		 * Reset data->res.
415 		 */
416 		nfs_fattr_init(&data->fattr);
417 		data->res.count = data->args.count;
418 		memset(&data->verf, 0, sizeof(data->verf));
419 
420 		/*
421 		 * Reuse data->task; data->args should not have changed
422 		 * since the original request was sent.
423 		 */
424 		rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
425 				&nfs_write_direct_ops, data);
426 		NFS_PROTO(inode)->write_setup(data, FLUSH_STABLE);
427 
428 		data->task.tk_priority = RPC_PRIORITY_NORMAL;
429 		data->task.tk_cookie = (unsigned long) inode;
430 
431 		/*
432 		 * We're called via an RPC callback, so BKL is already held.
433 		 */
434 		rpc_execute(&data->task);
435 
436 		dprintk("NFS: %5u rescheduled direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
437 				data->task.tk_pid,
438 				inode->i_sb->s_id,
439 				(long long)NFS_FILEID(inode),
440 				data->args.count,
441 				(unsigned long long)data->args.offset);
442 	}
443 
444 	if (put_dreq(dreq))
445 		nfs_direct_write_complete(dreq, inode);
446 }
447 
448 static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
449 {
450 	struct nfs_write_data *data = calldata;
451 	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
452 
453 	/* Call the NFS version-specific code */
454 	if (NFS_PROTO(data->inode)->commit_done(task, data) != 0)
455 		return;
456 	if (unlikely(task->tk_status < 0)) {
457 		dreq->error = task->tk_status;
458 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
459 	}
460 	if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
461 		dprintk("NFS: %5u commit verify failed\n", task->tk_pid);
462 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
463 	}
464 
465 	dprintk("NFS: %5u commit returned %d\n", task->tk_pid, task->tk_status);
466 	nfs_direct_write_complete(dreq, data->inode);
467 }
468 
469 static const struct rpc_call_ops nfs_commit_direct_ops = {
470 	.rpc_call_done = nfs_direct_commit_result,
471 	.rpc_release = nfs_commit_release,
472 };
473 
474 static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
475 {
476 	struct nfs_write_data *data = dreq->commit_data;
477 
478 	data->inode = dreq->inode;
479 	data->cred = dreq->ctx->cred;
480 
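	/*
	 * An offset of zero and a count of zero ask the server to commit all
	 * of the file's uncommitted data (per NFSv3 COMMIT semantics), which
	 * covers every unstable WRITE issued by this request.
	 */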
481 	data->args.fh = NFS_FH(data->inode);
482 	data->args.offset = 0;
483 	data->args.count = 0;
484 	data->res.count = 0;
485 	data->res.fattr = &data->fattr;
486 	data->res.verf = &data->verf;
487 
488 	rpc_init_task(&data->task, NFS_CLIENT(dreq->inode), RPC_TASK_ASYNC,
489 				&nfs_commit_direct_ops, data);
490 	NFS_PROTO(data->inode)->commit_setup(data, 0);
491 
492 	data->task.tk_priority = RPC_PRIORITY_NORMAL;
493 	data->task.tk_cookie = (unsigned long)data->inode;
494 	/* Note: task.tk_ops->rpc_release will free dreq->commit_data */
495 	dreq->commit_data = NULL;
496 
497 	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);
498 
499 	lock_kernel();
500 	rpc_execute(&data->task);
501 	unlock_kernel();
502 }
503 
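/*
 * Completion of a direct write is a small state machine: if any reply was
 * unstable, send a COMMIT (NFS_ODIRECT_DO_COMMIT); if a verifier mismatch
 * or commit failure was seen, resend the writes as FLUSH_STABLE
 * (NFS_ODIRECT_RESCHED_WRITES); otherwise the data is on stable storage
 * and the request can complete.
 */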
504 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
505 {
506 	int flags = dreq->flags;
507 
508 	dreq->flags = 0;
509 	switch (flags) {
510 		case NFS_ODIRECT_DO_COMMIT:
511 			nfs_direct_commit_schedule(dreq);
512 			break;
513 		case NFS_ODIRECT_RESCHED_WRITES:
514 			nfs_direct_write_reschedule(dreq);
515 			break;
516 		default:
517 			nfs_end_data_update(inode);
518 			if (dreq->commit_data != NULL)
519 				nfs_commit_free(dreq->commit_data);
520 			nfs_direct_free_writedata(dreq);
521 			nfs_direct_complete(dreq);
522 	}
523 }
524 
525 static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
526 {
527 	dreq->commit_data = nfs_commit_alloc(0);
528 	if (dreq->commit_data != NULL)
529 		dreq->commit_data->req = (struct nfs_page *) dreq;
530 }
531 #else
532 static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
533 {
534 	dreq->commit_data = NULL;
535 }
536 
537 static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
538 {
539 	nfs_end_data_update(inode);
540 	nfs_direct_free_writedata(dreq);
541 	nfs_direct_complete(dreq);
542 }
543 #endif
544 
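/*
 * An UNSTABLE write reply carries a write verifier.  The verifier from the
 * first unstable reply is saved in dreq->verf; if a later reply carries a
 * different verifier, the server may have rebooted and lost the data, so
 * the writes are flagged to be resent as stable writes.
 */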
545 static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
546 {
547 	struct nfs_write_data *data = calldata;
548 	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
549 	int status = task->tk_status;
550 
551 	if (nfs_writeback_done(task, data) != 0)
552 		return;
553 
554 	spin_lock(&dreq->lock);
555 
556 	if (likely(status >= 0))
557 		dreq->count += data->res.count;
558 	else
559 		dreq->error = task->tk_status;
560 
561 	if (data->res.verf->committed != NFS_FILE_SYNC) {
562 		switch (dreq->flags) {
563 			case 0:
564 				memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
565 				dreq->flags = NFS_ODIRECT_DO_COMMIT;
566 				break;
567 			case NFS_ODIRECT_DO_COMMIT:
568 				if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
569 					dprintk("NFS: %5u write verify failed\n", task->tk_pid);
570 					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
571 				}
572 		}
573 	}
574 
575 	spin_unlock(&dreq->lock);
576 }
577 
578 /*
579  * NB: Return the value of the first error return code.  Subsequent
580  *     errors after the first one are ignored.
581  */
582 static void nfs_direct_write_release(void *calldata)
583 {
584 	struct nfs_write_data *data = calldata;
585 	struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
586 
587 	if (put_dreq(dreq))
588 		nfs_direct_write_complete(dreq, data->inode);
589 }
590 
591 static const struct rpc_call_ops nfs_write_direct_ops = {
592 	.rpc_call_done = nfs_direct_write_result,
593 	.rpc_release = nfs_direct_write_release,
594 };
595 
596 /*
597  * For each wsize'd chunk of the user's buffer, dispatch an NFS WRITE
598  * operation.  If nfs_writedata_alloc() or get_user_pages() fails partway
599  * through, stop sending further writes; the bytes already dispatched are
600  * accounted for by nfs_direct_write_result().  If no requests were sent
601  * at all, return an error instead.
602  */
603 static ssize_t nfs_direct_write_schedule(struct nfs_direct_req *dreq, unsigned long user_addr, size_t count, loff_t pos, int sync)
604 {
605 	struct nfs_open_context *ctx = dreq->ctx;
606 	struct inode *inode = ctx->dentry->d_inode;
607 	size_t wsize = NFS_SERVER(inode)->wsize;
608 	unsigned int wpages = nfs_max_pages(wsize);
609 	unsigned int pgbase;
610 	int result;
611 	ssize_t started = 0;
612 
613 	get_dreq(dreq);
614 
615 	pgbase = user_addr & ~PAGE_MASK;
616 	do {
617 		struct nfs_write_data *data;
618 		size_t bytes;
619 
620 		result = -ENOMEM;
621 		data = nfs_writedata_alloc(wpages);
622 		if (unlikely(!data))
623 			break;
624 
625 		bytes = wsize;
626 		if (count < wsize)
627 			bytes = count;
628 
629 		data->npages = nfs_direct_count_pages(user_addr, bytes);
630 		down_read(&current->mm->mmap_sem);
631 		result = get_user_pages(current, current->mm, user_addr,
632 					data->npages, 0, 0, data->pagevec, NULL);
633 		up_read(&current->mm->mmap_sem);
634 		if (unlikely(result < data->npages)) {
635 			if (result > 0)
636 				nfs_direct_release_pages(data->pagevec, result);
637 			nfs_writedata_release(data);
638 			break;
639 		}
640 
641 		get_dreq(dreq);
642 
643 		list_move_tail(&data->pages, &dreq->rewrite_list);
644 
645 		data->req = (struct nfs_page *) dreq;
646 		data->inode = inode;
647 		data->cred = ctx->cred;
648 		data->args.fh = NFS_FH(inode);
649 		data->args.context = ctx;
650 		data->args.offset = pos;
651 		data->args.pgbase = pgbase;
652 		data->args.pages = data->pagevec;
653 		data->args.count = bytes;
654 		data->res.fattr = &data->fattr;
655 		data->res.count = bytes;
656 		data->res.verf = &data->verf;
657 
658 		rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
659 				&nfs_write_direct_ops, data);
660 		NFS_PROTO(inode)->write_setup(data, sync);
661 
662 		data->task.tk_priority = RPC_PRIORITY_NORMAL;
663 		data->task.tk_cookie = (unsigned long) inode;
664 
665 		lock_kernel();
666 		rpc_execute(&data->task);
667 		unlock_kernel();
668 
669 		dfprintk(VFS, "NFS: %5u initiated direct write call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
670 				data->task.tk_pid,
671 				inode->i_sb->s_id,
672 				(long long)NFS_FILEID(inode),
673 				bytes,
674 				(unsigned long long)data->args.offset);
675 
676 		started += bytes;
677 		user_addr += bytes;
678 		pos += bytes;
679 		pgbase += bytes;
680 		pgbase &= ~PAGE_MASK;
681 
682 		count -= bytes;
683 	} while (count != 0);
684 
685 	if (put_dreq(dreq))
686 		nfs_direct_write_complete(dreq, inode);
687 
688 	if (started)
689 		return 0;
690 	return result < 0 ? (ssize_t) result : -EFAULT;
691 }
692 
693 static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos)
694 {
695 	ssize_t result = 0;
696 	sigset_t oldset;
697 	struct inode *inode = iocb->ki_filp->f_mapping->host;
698 	struct rpc_clnt *clnt = NFS_CLIENT(inode);
699 	struct nfs_direct_req *dreq;
700 	size_t wsize = NFS_SERVER(inode)->wsize;
701 	int sync = 0;
702 
703 	dreq = nfs_direct_req_alloc();
704 	if (!dreq)
705 		return -ENOMEM;
706 	nfs_alloc_commit_data(dreq);
707 
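	/*
	 * If the write fits in a single wsize'd RPC, or no commit_data could
	 * be allocated, send the data as a stable write and skip the separate
	 * COMMIT step.
	 */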
708 	if (dreq->commit_data == NULL || count < wsize)
709 		sync = FLUSH_STABLE;
710 
711 	dreq->inode = inode;
712 	dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
713 	if (!is_sync_kiocb(iocb))
714 		dreq->iocb = iocb;
715 
716 	nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, count);
717 
718 	nfs_begin_data_update(inode);
719 
720 	rpc_clnt_sigmask(clnt, &oldset);
721 	result = nfs_direct_write_schedule(dreq, user_addr, count, pos, sync);
722 	if (!result)
723 		result = nfs_direct_wait(dreq);
724 	rpc_clnt_sigunmask(clnt, &oldset);
725 
726 	return result;
727 }
728 
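/*
 * When a file on an NFS mount is opened with O_DIRECT, the ordinary read
 * path in fs/nfs/file.c hands the request to nfs_file_direct_read()
 * instead of going through the page cache.
 */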
729 /**
730  * nfs_file_direct_read - file direct read operation for NFS files
731  * @iocb: target I/O control block
732  * @buf: user's buffer into which to read data
733  * @count: number of bytes to read
734  * @pos: byte offset in file where reading starts
735  *
736  * We use this function for direct reads instead of calling
737  * generic_file_aio_read() in order to avoid gfar's check to see if
738  * the request starts before the end of the file.  For that check
739  * to work, we must generate a GETATTR before each direct read, and
740  * even then there is a window between the GETATTR and the subsequent
741  * READ where the file size could change.  Our preference is simply
742  * to do all reads the application wants, and the server will take
743  * care of managing the end of file boundary.
744  *
745  * This function also eliminates unnecessarily updating the file's
746  * atime locally, as the NFS server sets the file's atime, and this
747  * client must read the updated atime from the server back into its
748  * cache.
749  */
750 ssize_t nfs_file_direct_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos)
751 {
752 	ssize_t retval = -EINVAL;
753 	struct file *file = iocb->ki_filp;
754 	struct address_space *mapping = file->f_mapping;
755 
756 	dprintk("nfs: direct read(%s/%s, %lu@%Ld)\n",
757 		file->f_dentry->d_parent->d_name.name,
758 		file->f_dentry->d_name.name,
759 		(unsigned long) count, (long long) pos);
760 
761 	if ((ssize_t) count < 0)
762 		goto out;
763 	retval = -EFAULT;
764 	if (!access_ok(VERIFY_WRITE, buf, count))
765 		goto out;
766 	retval = 0;
767 	if (!count)
768 		goto out;
769 
770 	retval = nfs_sync_mapping(mapping);
771 	if (retval)
772 		goto out;
773 
774 	retval = nfs_direct_read(iocb, (unsigned long) buf, count, pos);
775 	if (retval > 0)
776 		iocb->ki_pos = pos + retval;
777 
778 out:
779 	return retval;
780 }
781 
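/*
 * As with direct reads, the ordinary write path in fs/nfs/file.c routes
 * O_DIRECT writes to nfs_file_direct_write().
 */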
782 /**
783  * nfs_file_direct_write - file direct write operation for NFS files
784  * @iocb: target I/O control block
785  * @buf: user's buffer from which to write data
786  * @count: number of bytes to write
787  * @pos: byte offset in file where writing starts
788  *
789  * We use this function for direct writes instead of calling
790  * generic_file_aio_write() in order to avoid taking the inode
791  * semaphore and updating the i_size.  The NFS server will set
792  * the new i_size and this client must read the updated size
793  * back into its cache.  We let the server do generic write
794  * parameter checking and report problems.
795  *
796  * We also avoid an unnecessary invocation of generic_osync_inode(),
797  * as it is fairly meaningless to sync the metadata of an NFS file.
798  *
799  * We eliminate local atime updates, see direct read above.
800  *
801  * We avoid unnecessary page cache invalidations for normal cached
802  * readers of this file.
803  *
804  * Note that O_APPEND is not supported for NFS direct writes, as there
805  * is no atomic O_APPEND write facility in the NFS protocol.
806  */
807 ssize_t nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos)
808 {
809 	ssize_t retval;
810 	struct file *file = iocb->ki_filp;
811 	struct address_space *mapping = file->f_mapping;
812 
813 	dfprintk(VFS, "nfs: direct write(%s/%s, %lu@%Ld)\n",
814 		file->f_dentry->d_parent->d_name.name,
815 		file->f_dentry->d_name.name,
816 		(unsigned long) count, (long long) pos);
817 
818 	retval = generic_write_checks(file, &pos, &count, 0);
819 	if (retval)
820 		goto out;
821 
822 	retval = -EINVAL;
823 	if ((ssize_t) count < 0)
824 		goto out;
825 	retval = 0;
826 	if (!count)
827 		goto out;
828 
829 	retval = -EFAULT;
830 	if (!access_ok(VERIFY_READ, buf, count))
831 		goto out;
832 
833 	retval = nfs_sync_mapping(mapping);
834 	if (retval)
835 		goto out;
836 
837 	retval = nfs_direct_write(iocb, (unsigned long) buf, count, pos);
838 
839 	/*
840 	 * XXX: nfs_end_data_update() already ensures this file's
841 	 *      cached data is subsequently invalidated.  Do we really
842 	 *      need to call invalidate_inode_pages2() again here?
843 	 *
844 	 *      For aio writes, this invalidation will almost certainly
845 	 *      occur before the writes complete.  Kind of racy.
846 	 */
847 	if (mapping->nrpages)
848 		invalidate_inode_pages2(mapping);
849 
850 	if (retval > 0)
851 		iocb->ki_pos = pos + retval;
852 
853 out:
854 	return retval;
855 }
856 
857 /**
858  * nfs_init_directcache - create a slab cache for nfs_direct_req structures
859  *
860  */
861 int __init nfs_init_directcache(void)
862 {
863 	nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
864 						sizeof(struct nfs_direct_req),
865 						0, (SLAB_RECLAIM_ACCOUNT|
866 							SLAB_MEM_SPREAD),
867 						NULL, NULL);
868 	if (nfs_direct_cachep == NULL)
869 		return -ENOMEM;
870 
871 	return 0;
872 }
873 
874 /**
875  * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
876  *
877  */
878 void nfs_destroy_directcache(void)
879 {
880 	if (kmem_cache_destroy(nfs_direct_cachep))
881 		printk(KERN_INFO "nfs_direct_cache: not all structures were freed\n");
882 }
883