NFS: O_DIRECT needs to use a completion
/*
 * linux/fs/nfs/direct.c
 *
 * Copyright (C) 2003 by Chuck Lever <cel@netapp.com>
 *
 * High-performance uncached I/O for the Linux NFS client
 *
 * There are important applications whose performance or correctness
 * depends on uncached access to file data.  Database clusters
 * (multiple copies of the same instance running on separate hosts)
 * implement their own cache coherency protocol that subsumes file
 * system cache protocols.  Applications that process datasets
 * considerably larger than the client's memory do not always benefit
 * from a local cache.  A streaming video server, for instance, has no
 * need to cache the contents of a file.
 *
 * When an application requests uncached I/O, all read and write requests
 * are made directly to the server; data stored or fetched via these
 * requests is not cached in the Linux page cache.  The client does not
 * correct unaligned requests from applications.  All requested bytes are
 * held on permanent storage before a direct write system call returns to
 * an application.
 *
 * Solaris implements an uncached I/O facility called directio() that
 * is used for backups and sequential I/O to very large files.  Solaris
 * also supports uncaching whole NFS partitions with "-o forcedirectio,"
 * an undocumented mount option.
 *
 * Designed by Jeff Kimmel, Chuck Lever, and Trond Myklebust, with
 * help from Andrew Morton.
 *
 * 18 Dec 2001  Initial implementation for 2.4  --cel
 * 08 Jul 2002  Version for 2.4.19, with bug fixes --trondmy
 * 08 Jun 2003  Port to 2.5 APIs  --cel
 * 31 Mar 2004  Handle direct I/O without VFS support  --cel
 * 15 Sep 2004  Parallel async reads  --cel
 * 04 May 2005  support O_DIRECT with aio  --cel
 *
 */

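/*
 * Illustration (not part of this file): the kind of application
 * request that reaches this code.  The path and sizes below are
 * invented for the example; the read() is ultimately served by
 * nfs_file_direct_read().
 *
 *	int fd = open("/mnt/nfs/data", O_RDONLY | O_DIRECT);
 *	void *buf;
 *	if (fd >= 0 && posix_memalign(&buf, 4096, 65536) == 0)
 *		read(fd, buf, 65536);
 */
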
#include <linux/config.h>
#include <linux/errno.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <linux/smp_lock.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/kref.h>

#include <linux/nfs_fs.h>
#include <linux/nfs_page.h>
#include <linux/sunrpc/clnt.h>

#include <asm/system.h>
#include <asm/uaccess.h>
#include <asm/atomic.h>

#include "iostat.h"

#define NFSDBG_FACILITY         NFSDBG_VFS

static kmem_cache_t *nfs_direct_cachep;

/*
 * This represents a set of asynchronous requests that we're waiting on
 */
struct nfs_direct_req {
        struct kref             kref;           /* release manager */

        /* I/O parameters */
        struct list_head        list,           /* nfs_read/write_data structs */
                                rewrite_list;   /* saved nfs_write_data structs */
        struct nfs_open_context *ctx;           /* file open context info */
        struct kiocb *          iocb;           /* controlling i/o request */
        struct inode *          inode;          /* target file of i/o */
        unsigned long           user_addr;      /* location of user's buffer */
        size_t                  user_count;     /* total bytes to move */
        loff_t                  pos;            /* starting offset in file */
        struct page **          pages;          /* pages in our buffer */
        unsigned int            npages;         /* count of pages */

        /* completion state */
        spinlock_t              lock;           /* protect completion state */
        int                     outstanding;    /* i/os we're waiting for */
        ssize_t                 count,          /* bytes actually processed */
                                error;          /* any reported error */
        struct completion       completion;     /* wait for i/o completion */

        /* commit state */
        struct nfs_write_data * commit_data;    /* special write_data for commits */
        int                     flags;
#define NFS_ODIRECT_DO_COMMIT           (1)     /* an unstable reply was received */
#define NFS_ODIRECT_RESCHED_WRITES      (2)     /* write verification failed */
        struct nfs_writeverf    verf;           /* unstable write verifier */
};
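/*
 * Lifetime note (summarizing the code below): nfs_direct_req_alloc()
 * starts the kref at one, and nfs_direct_read_alloc() and
 * nfs_direct_write_alloc() take a second reference before any I/O is
 * scheduled.  nfs_direct_wait() drops one reference and
 * nfs_direct_complete() drops the other, so the dreq survives until
 * both the waiter and the last RPC are finished with it.
 */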

static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, int sync);
static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode);

/**
 * nfs_direct_IO - NFS address space operation for direct I/O
 * @rw: direction (read or write)
 * @iocb: target I/O control block
 * @iov: array of vectors that define I/O buffer
 * @pos: offset in file to begin the operation
 * @nr_segs: size of iovec array
 *
 * The presence of this routine in the address space ops vector means
 * the NFS client supports direct I/O.  However, we shunt off direct
 * read and write requests before the VFS gets them, so this method
 * should never be called.
 */
ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov, loff_t pos, unsigned long nr_segs)
{
        struct dentry *dentry = iocb->ki_filp->f_dentry;

        dprintk("NFS: nfs_direct_IO (%s) off/no(%Ld/%lu) EINVAL\n",
                        dentry->d_name.name, (long long) pos, nr_segs);

        return -EINVAL;
}

static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
{
        int i;
        for (i = 0; i < npages; i++) {
                struct page *page = pages[i];
                if (do_dirty && !PageCompound(page))
                        set_page_dirty_lock(page);
                page_cache_release(page);
        }
        kfree(pages);
}

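/*
 * Worked example for the page arithmetic below (invented numbers,
 * 4KB pages): a buffer at user_addr 0x10100 with size 0x2000 gives
 *
 *	page_count = (0x130ff >> PAGE_SHIFT) - (0x10100 >> PAGE_SHIFT)
 *	           = 0x13 - 0x10 = 3 pages,
 *
 * since the 8KB buffer starts 0x100 bytes into its first page and so
 * spills onto a third page.
 */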
static inline int nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, struct page ***pages)
{
        int result = -ENOMEM;
        unsigned long page_count;
        size_t array_size;

        page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
        page_count -= user_addr >> PAGE_SHIFT;

        array_size = (page_count * sizeof(struct page *));
        *pages = kmalloc(array_size, GFP_KERNEL);
        if (*pages) {
                down_read(&current->mm->mmap_sem);
                result = get_user_pages(current, current->mm, user_addr,
                                        page_count, (rw == READ), 0,
                                        *pages, NULL);
                up_read(&current->mm->mmap_sem);
                if (result != page_count) {
                        /*
                         * If we got fewer pages than expected from
                         * get_user_pages(), the user buffer runs off the
                         * end of a mapping; return EFAULT.
                         */
                        if (result >= 0) {
                                nfs_free_user_pages(*pages, result, 0);
                                result = -EFAULT;
                        } else
                                kfree(*pages);
                        *pages = NULL;
                }
        }
        return result;
}

static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
{
        struct nfs_direct_req *dreq;

        dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL);
        if (!dreq)
                return NULL;

        kref_init(&dreq->kref);
        init_completion(&dreq->completion);
        INIT_LIST_HEAD(&dreq->list);
        INIT_LIST_HEAD(&dreq->rewrite_list);
        dreq->iocb = NULL;
        dreq->ctx = NULL;
        spin_lock_init(&dreq->lock);
        dreq->outstanding = 0;
        dreq->count = 0;
        dreq->error = 0;
        dreq->flags = 0;

        return dreq;
}

static void nfs_direct_req_release(struct kref *kref)
{
        struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);

        if (dreq->ctx != NULL)
                put_nfs_open_context(dreq->ctx);
        kmem_cache_free(nfs_direct_cachep, dreq);
}

/*
 * Collects and returns the final error value/byte-count.
 */
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
        ssize_t result = -EIOCBQUEUED;

        /* Async requests don't wait here */
        if (dreq->iocb)
                goto out;

        result = wait_for_completion_interruptible(&dreq->completion);

        if (!result)
                result = dreq->error;
        if (!result)
                result = dreq->count;

out:
        kref_put(&dreq->kref, nfs_direct_req_release);
        return (ssize_t) result;
}

/*
 * We must hold a reference to all the pages in this direct read request
 * until the RPCs complete.  This could be long *after* we are woken up in
 * nfs_direct_wait (for instance, if someone hits ^C on a slow server).
 *
 * In addition, synchronous I/O uses a stack-allocated iocb.  Thus we
 * can't trust the iocb is still valid here if this is a synchronous
 * request.  If the waiter is woken prematurely, the iocb is long gone.
 */
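/*
 * This is why the completion and all of the completion state live in
 * the refcounted nfs_direct_req rather than in the iocb: a late RPC
 * reply only ever touches the dreq, which cannot go away until the
 * final kref_put, no matter what has happened to the waiter or its
 * iocb in the meantime.
 */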
static void nfs_direct_complete(struct nfs_direct_req *dreq)
{
        nfs_free_user_pages(dreq->pages, dreq->npages, 1);

        if (dreq->iocb) {
                long res = (long) dreq->error;
                if (!res)
                        res = (long) dreq->count;
                aio_complete(dreq->iocb, res, 0);
        }
        complete_all(&dreq->completion);

        kref_put(&dreq->kref, nfs_direct_req_release);
}

/*
 * Note we also set the number of requests we have in the dreq when we are
 * done.  This prevents races with I/O completion so we will always wait
 * until all requests have been dispatched and completed.
 */
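/*
 * For example (invented numbers): a 100000-byte read against a server
 * with rsize = 32768 allocates four nfs_read_data structs here and
 * leaves dreq->outstanding at 4 before the first RPC is sent.
 */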
static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
{
        struct list_head *list;
        struct nfs_direct_req *dreq;
        unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

        dreq = nfs_direct_req_alloc();
        if (!dreq)
                return NULL;

        list = &dreq->list;
        for (;;) {
                struct nfs_read_data *data = nfs_readdata_alloc(rpages);

                if (unlikely(!data)) {
                        while (!list_empty(list)) {
                                data = list_entry(list->next,
                                                  struct nfs_read_data, pages);
                                list_del(&data->pages);
                                nfs_readdata_free(data);
                        }
                        kref_put(&dreq->kref, nfs_direct_req_release);
                        return NULL;
                }

                INIT_LIST_HEAD(&data->pages);
                list_add(&data->pages, list);

                data->req = (struct nfs_page *) dreq;
                dreq->outstanding++;
                if (nbytes <= rsize)
                        break;
                nbytes -= rsize;
        }
        kref_get(&dreq->kref);
        return dreq;
}

static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
{
        struct nfs_read_data *data = calldata;
        struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;

        if (nfs_readpage_result(task, data) != 0)
                return;

        spin_lock(&dreq->lock);

        if (likely(task->tk_status >= 0))
                dreq->count += data->res.count;
        else
                dreq->error = task->tk_status;

        if (--dreq->outstanding) {
                spin_unlock(&dreq->lock);
                return;
        }

        spin_unlock(&dreq->lock);
        nfs_direct_complete(dreq);
}

static const struct rpc_call_ops nfs_read_direct_ops = {
        .rpc_call_done = nfs_direct_read_result,
        .rpc_release = nfs_readdata_release,
};

/*
 * For each nfs_read_data struct that was allocated on the list, dispatch
 * an NFS READ operation
 */
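/*
 * Continuing the example from nfs_direct_read_alloc: the four structs
 * go out as READs of 32768, 32768, 32768, and 1696 bytes.  The
 * pgbase/curpage pair walks through dreq->pages as each chunk is
 * consumed, so a chunk that starts mid-page simply picks up where the
 * previous one left off.
 */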
static void nfs_direct_read_schedule(struct nfs_direct_req *dreq)
{
        struct nfs_open_context *ctx = dreq->ctx;
        struct inode *inode = ctx->dentry->d_inode;
        struct list_head *list = &dreq->list;
        struct page **pages = dreq->pages;
        size_t count = dreq->user_count;
        loff_t pos = dreq->pos;
        size_t rsize = NFS_SERVER(inode)->rsize;
        unsigned int curpage, pgbase;

        curpage = 0;
        pgbase = dreq->user_addr & ~PAGE_MASK;
        do {
                struct nfs_read_data *data;
                size_t bytes;

                bytes = rsize;
                if (count < rsize)
                        bytes = count;

                BUG_ON(list_empty(list));
                data = list_entry(list->next, struct nfs_read_data, pages);
                list_del_init(&data->pages);

                data->inode = inode;
                data->cred = ctx->cred;
                data->args.fh = NFS_FH(inode);
                data->args.context = ctx;
                data->args.offset = pos;
                data->args.pgbase = pgbase;
                data->args.pages = &pages[curpage];
                data->args.count = bytes;
                data->res.fattr = &data->fattr;
                data->res.eof = 0;
                data->res.count = bytes;

                rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
                                &nfs_read_direct_ops, data);
                NFS_PROTO(inode)->read_setup(data);

                data->task.tk_cookie = (unsigned long) inode;

                lock_kernel();
                rpc_execute(&data->task);
                unlock_kernel();

                dfprintk(VFS, "NFS: %5u initiated direct read call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
                                data->task.tk_pid,
                                inode->i_sb->s_id,
                                (long long)NFS_FILEID(inode),
                                bytes,
                                (unsigned long long)data->args.offset);

                pos += bytes;
                pgbase += bytes;
                curpage += pgbase >> PAGE_SHIFT;
                pgbase &= ~PAGE_MASK;

                count -= bytes;
        } while (count != 0);
        BUG_ON(!list_empty(list));
}

static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, unsigned int nr_pages)
{
        ssize_t result;
        sigset_t oldset;
        struct inode *inode = iocb->ki_filp->f_mapping->host;
        struct rpc_clnt *clnt = NFS_CLIENT(inode);
        struct nfs_direct_req *dreq;

        dreq = nfs_direct_read_alloc(count, NFS_SERVER(inode)->rsize);
        if (!dreq)
                return -ENOMEM;

        dreq->user_addr = user_addr;
        dreq->user_count = count;
        dreq->pos = pos;
        dreq->pages = pages;
        dreq->npages = nr_pages;
        dreq->inode = inode;
        dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
        if (!is_sync_kiocb(iocb))
                dreq->iocb = iocb;

        nfs_add_stats(inode, NFSIOS_DIRECTREADBYTES, count);
        rpc_clnt_sigmask(clnt, &oldset);
        nfs_direct_read_schedule(dreq);
        result = nfs_direct_wait(dreq);
        rpc_clnt_sigunmask(clnt, &oldset);

        return result;
}

static void nfs_direct_free_writedata(struct nfs_direct_req *dreq)
{
        list_splice_init(&dreq->rewrite_list, &dreq->list);
        while (!list_empty(&dreq->list)) {
                struct nfs_write_data *data = list_entry(dreq->list.next, struct nfs_write_data, pages);
                list_del(&data->pages);
                nfs_writedata_release(data);
        }
}

#if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
{
        struct list_head *pos;

        list_splice_init(&dreq->rewrite_list, &dreq->list);
        list_for_each(pos, &dreq->list)
                dreq->outstanding++;
        dreq->count = 0;

        nfs_direct_write_schedule(dreq, FLUSH_STABLE);
}

static void nfs_direct_commit_result(struct rpc_task *task, void *calldata)
{
        struct nfs_write_data *data = calldata;
        struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;

        /* Call the NFS version-specific code */
        if (NFS_PROTO(data->inode)->commit_done(task, data) != 0)
                return;
        if (unlikely(task->tk_status < 0)) {
                dreq->error = task->tk_status;
                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
        }
        if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
                dprintk("NFS: %5u commit verify failed\n", task->tk_pid);
                dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
        }

        dprintk("NFS: %5u commit returned %d\n", task->tk_pid, task->tk_status);
        nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_commit_direct_ops = {
        .rpc_call_done = nfs_direct_commit_result,
        .rpc_release = nfs_commit_release,
};

static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
{
        struct nfs_write_data *data = dreq->commit_data;
        struct rpc_task *task = &data->task;

        data->inode = dreq->inode;
        data->cred = dreq->ctx->cred;

        data->args.fh = NFS_FH(data->inode);
        data->args.offset = dreq->pos;
        data->args.count = dreq->user_count;
        data->res.count = 0;
        data->res.fattr = &data->fattr;
        data->res.verf = &data->verf;

        rpc_init_task(&data->task, NFS_CLIENT(dreq->inode), RPC_TASK_ASYNC,
                                &nfs_commit_direct_ops, data);
        NFS_PROTO(data->inode)->commit_setup(data, 0);

        data->task.tk_priority = RPC_PRIORITY_NORMAL;
        data->task.tk_cookie = (unsigned long)data->inode;
        /* Note: task.tk_ops->rpc_release will free dreq->commit_data */
        dreq->commit_data = NULL;

        dprintk("NFS: %5u initiated commit call\n", task->tk_pid);

        lock_kernel();
        rpc_execute(&data->task);
        unlock_kernel();
}

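/*
 * Dispatch on the state the preceding RPCs left behind: with no flag
 * set there is nothing left to flush, so the request completes;
 * NFS_ODIRECT_DO_COMMIT means at least one WRITE reply was unstable,
 * so a COMMIT is sent; NFS_ODIRECT_RESCHED_WRITES means a commit
 * failed or a verifier mismatch was detected, so every write is
 * resent, this time with FLUSH_STABLE.
 */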
static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
        int flags = dreq->flags;

        dreq->flags = 0;
        switch (flags) {
                case NFS_ODIRECT_DO_COMMIT:
                        nfs_direct_commit_schedule(dreq);
                        break;
                case NFS_ODIRECT_RESCHED_WRITES:
                        nfs_direct_write_reschedule(dreq);
                        break;
                default:
                        nfs_end_data_update(inode);
                        if (dreq->commit_data != NULL)
                                nfs_commit_free(dreq->commit_data);
                        nfs_direct_free_writedata(dreq);
                        nfs_direct_complete(dreq);
        }
}

static void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
        dreq->commit_data = nfs_commit_alloc(0);
        if (dreq->commit_data != NULL)
                dreq->commit_data->req = (struct nfs_page *) dreq;
}
#else
static inline void nfs_alloc_commit_data(struct nfs_direct_req *dreq)
{
        dreq->commit_data = NULL;
}

static void nfs_direct_write_complete(struct nfs_direct_req *dreq, struct inode *inode)
{
        nfs_end_data_update(inode);
        nfs_direct_free_writedata(dreq);
        nfs_direct_complete(dreq);
}
#endif

static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize)
{
        struct list_head *list;
        struct nfs_direct_req *dreq;
        unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

        dreq = nfs_direct_req_alloc();
        if (!dreq)
                return NULL;

        list = &dreq->list;
        for (;;) {
                struct nfs_write_data *data = nfs_writedata_alloc(wpages);

                if (unlikely(!data)) {
                        while (!list_empty(list)) {
                                data = list_entry(list->next,
                                                  struct nfs_write_data, pages);
                                list_del(&data->pages);
                                nfs_writedata_free(data);
                        }
                        kref_put(&dreq->kref, nfs_direct_req_release);
                        return NULL;
                }

                INIT_LIST_HEAD(&data->pages);
                list_add(&data->pages, list);

                data->req = (struct nfs_page *) dreq;
                dreq->outstanding++;
                if (nbytes <= wsize)
                        break;
                nbytes -= wsize;
        }

        nfs_alloc_commit_data(dreq);

        kref_get(&dreq->kref);
        return dreq;
}

static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
        struct nfs_write_data *data = calldata;
        struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
        int status = task->tk_status;

        if (nfs_writeback_done(task, data) != 0)
                return;

        spin_lock(&dreq->lock);

        if (likely(status >= 0))
                dreq->count += data->res.count;
        else
                dreq->error = task->tk_status;

        if (data->res.verf->committed != NFS_FILE_SYNC) {
                switch (dreq->flags) {
                        case 0:
                                memcpy(&dreq->verf, &data->verf, sizeof(dreq->verf));
                                dreq->flags = NFS_ODIRECT_DO_COMMIT;
                                break;
                        case NFS_ODIRECT_DO_COMMIT:
                                if (memcmp(&dreq->verf, &data->verf, sizeof(dreq->verf))) {
                                        dprintk("NFS: %5u write verify failed\n", task->tk_pid);
                                        dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
                                }
                }
        }
        /* In case we have to resend */
        data->args.stable = NFS_FILE_SYNC;

        spin_unlock(&dreq->lock);
}

/*
 * NB: Return the value of the first error return code.  Subsequent
 *     errors after the first one are ignored.
 */
static void nfs_direct_write_release(void *calldata)
{
        struct nfs_write_data *data = calldata;
        struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;

        spin_lock(&dreq->lock);
        if (--dreq->outstanding) {
                spin_unlock(&dreq->lock);
                return;
        }
        spin_unlock(&dreq->lock);

        nfs_direct_write_complete(dreq, data->inode);
}

static const struct rpc_call_ops nfs_write_direct_ops = {
        .rpc_call_done = nfs_direct_write_result,
        .rpc_release = nfs_direct_write_release,
};

/*
 * For each nfs_write_data struct that was allocated on the list, dispatch
 * an NFS WRITE operation
 */
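/*
 * This is the same chunking walk as nfs_direct_read_schedule, with one
 * difference: dispatched structs are moved to rewrite_list rather than
 * dropped, so nfs_direct_write_reschedule() can resend them if the
 * server's write verifier changes under us.
 */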
static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, int sync)
{
        struct nfs_open_context *ctx = dreq->ctx;
        struct inode *inode = ctx->dentry->d_inode;
        struct list_head *list = &dreq->list;
        struct page **pages = dreq->pages;
        size_t count = dreq->user_count;
        loff_t pos = dreq->pos;
        size_t wsize = NFS_SERVER(inode)->wsize;
        unsigned int curpage, pgbase;

        curpage = 0;
        pgbase = dreq->user_addr & ~PAGE_MASK;
        do {
                struct nfs_write_data *data;
                size_t bytes;

                bytes = wsize;
                if (count < wsize)
                        bytes = count;

                BUG_ON(list_empty(list));
                data = list_entry(list->next, struct nfs_write_data, pages);
                list_move_tail(&data->pages, &dreq->rewrite_list);

                data->inode = inode;
                data->cred = ctx->cred;
                data->args.fh = NFS_FH(inode);
                data->args.context = ctx;
                data->args.offset = pos;
                data->args.pgbase = pgbase;
                data->args.pages = &pages[curpage];
                data->args.count = bytes;
                data->res.fattr = &data->fattr;
                data->res.count = bytes;
                data->res.verf = &data->verf;

                rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
                                &nfs_write_direct_ops, data);
                NFS_PROTO(inode)->write_setup(data, sync);

                data->task.tk_priority = RPC_PRIORITY_NORMAL;
                data->task.tk_cookie = (unsigned long) inode;

                lock_kernel();
                rpc_execute(&data->task);
                unlock_kernel();

                dfprintk(VFS, "NFS: %5u initiated direct write call (req %s/%Ld, %zu bytes @ offset %Lu)\n",
                                data->task.tk_pid,
                                inode->i_sb->s_id,
                                (long long)NFS_FILEID(inode),
                                bytes,
                                (unsigned long long)data->args.offset);

                pos += bytes;
                pgbase += bytes;
                curpage += pgbase >> PAGE_SHIFT;
                pgbase &= ~PAGE_MASK;

                count -= bytes;
        } while (count != 0);
        BUG_ON(!list_empty(list));
}

static ssize_t nfs_direct_write(struct kiocb *iocb, unsigned long user_addr, size_t count, loff_t pos, struct page **pages, int nr_pages)
{
        ssize_t result;
        sigset_t oldset;
        struct inode *inode = iocb->ki_filp->f_mapping->host;
        struct rpc_clnt *clnt = NFS_CLIENT(inode);
        struct nfs_direct_req *dreq;
        size_t wsize = NFS_SERVER(inode)->wsize;
        int sync = 0;

        dreq = nfs_direct_write_alloc(count, wsize);
        if (!dreq)
                return -ENOMEM;
        if (dreq->commit_data == NULL || count < wsize)
                sync = FLUSH_STABLE;

        dreq->user_addr = user_addr;
        dreq->user_count = count;
        dreq->pos = pos;
        dreq->pages = pages;
        dreq->npages = nr_pages;
        dreq->inode = inode;
        dreq->ctx = get_nfs_open_context((struct nfs_open_context *)iocb->ki_filp->private_data);
        if (!is_sync_kiocb(iocb))
                dreq->iocb = iocb;

        nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, count);

        nfs_begin_data_update(inode);

        rpc_clnt_sigmask(clnt, &oldset);
        nfs_direct_write_schedule(dreq, sync);
        result = nfs_direct_wait(dreq);
        rpc_clnt_sigunmask(clnt, &oldset);

        return result;
}

/**
 * nfs_file_direct_read - file direct read operation for NFS files
 * @iocb: target I/O control block
 * @buf: user's buffer into which to read data
 * @count: number of bytes to read
 * @pos: byte offset in file where reading starts
 *
 * We use this function for direct reads instead of calling
 * generic_file_aio_read() in order to avoid gfar's check to see if
 * the request starts before the end of the file.  For that check
 * to work, we must generate a GETATTR before each direct read, and
 * even then there is a window between the GETATTR and the subsequent
 * READ where the file size could change.  Our preference is simply
 * to do all reads the application wants, and the server will take
 * care of managing the end of file boundary.
 *
 * This function also eliminates unnecessarily updating the file's
 * atime locally, as the NFS server sets the file's atime, and this
 * client must read the updated atime from the server back into its
 * cache.
 */
ssize_t nfs_file_direct_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t pos)
{
        ssize_t retval = -EINVAL;
        int page_count;
        struct page **pages;
        struct file *file = iocb->ki_filp;
        struct address_space *mapping = file->f_mapping;

        dprintk("nfs: direct read(%s/%s, %lu@%Ld)\n",
                file->f_dentry->d_parent->d_name.name,
                file->f_dentry->d_name.name,
                (unsigned long) count, (long long) pos);

        if ((ssize_t) count < 0)
                goto out;
        retval = -EFAULT;
        if (!access_ok(VERIFY_WRITE, buf, count))
                goto out;
        retval = 0;
        if (!count)
                goto out;

        retval = nfs_sync_mapping(mapping);
        if (retval)
                goto out;

        retval = nfs_get_user_pages(READ, (unsigned long) buf,
                                                count, &pages);
        if (retval < 0)
                goto out;
        page_count = retval;

        retval = nfs_direct_read(iocb, (unsigned long) buf, count, pos,
                                                pages, page_count);
        if (retval > 0)
                iocb->ki_pos = pos + retval;

out:
        return retval;
}

/**
 * nfs_file_direct_write - file direct write operation for NFS files
 * @iocb: target I/O control block
 * @buf: user's buffer from which to write data
 * @count: number of bytes to write
 * @pos: byte offset in file where writing starts
 *
 * We use this function for direct writes instead of calling
 * generic_file_aio_write() in order to avoid taking the inode
 * semaphore and updating the i_size.  The NFS server will set
 * the new i_size and this client must read the updated size
 * back into its cache.  We let the server do generic write
 * parameter checking and report problems.
 *
 * We also avoid an unnecessary invocation of generic_osync_inode(),
 * as it is fairly meaningless to sync the metadata of an NFS file.
 *
 * We eliminate local atime updates, see direct read above.
 *
 * We avoid unnecessary page cache invalidations for normal cached
 * readers of this file.
 *
 * Note that O_APPEND is not supported for NFS direct writes, as there
 * is no atomic O_APPEND write facility in the NFS protocol.
 */
ssize_t nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, loff_t pos)
{
        ssize_t retval;
        int page_count;
        struct page **pages;
        struct file *file = iocb->ki_filp;
        struct address_space *mapping = file->f_mapping;

        dfprintk(VFS, "nfs: direct write(%s/%s, %lu@%Ld)\n",
                file->f_dentry->d_parent->d_name.name,
                file->f_dentry->d_name.name,
                (unsigned long) count, (long long) pos);

        retval = generic_write_checks(file, &pos, &count, 0);
        if (retval)
                goto out;

        retval = -EINVAL;
        if ((ssize_t) count < 0)
                goto out;
        retval = 0;
        if (!count)
                goto out;

        retval = -EFAULT;
        if (!access_ok(VERIFY_READ, buf, count))
                goto out;

        retval = nfs_sync_mapping(mapping);
        if (retval)
                goto out;

        retval = nfs_get_user_pages(WRITE, (unsigned long) buf,
                                                count, &pages);
        if (retval < 0)
                goto out;
        page_count = retval;

        retval = nfs_direct_write(iocb, (unsigned long) buf, count,
                                        pos, pages, page_count);

        /*
         * XXX: nfs_end_data_update() already ensures this file's
         *      cached data is subsequently invalidated.  Do we really
         *      need to call invalidate_inode_pages2() again here?
         *
         *      For aio writes, this invalidation will almost certainly
         *      occur before the writes complete.  Kind of racy.
         */
        if (mapping->nrpages)
                invalidate_inode_pages2(mapping);

        if (retval > 0)
                iocb->ki_pos = pos + retval;

out:
        return retval;
}

/**
 * nfs_init_directcache - create a slab cache for nfs_direct_req structures
 *
 */
int nfs_init_directcache(void)
{
        nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
                                                sizeof(struct nfs_direct_req),
                                                0, SLAB_RECLAIM_ACCOUNT,
                                                NULL, NULL);
        if (nfs_direct_cachep == NULL)
                return -ENOMEM;

        return 0;
}

/**
 * nfs_destroy_directcache - destroy the slab cache for nfs_direct_req structures
 *
 */
void nfs_destroy_directcache(void)
{
        if (kmem_cache_destroy(nfs_direct_cachep))
                printk(KERN_INFO "nfs_direct_cache: not all structures were freed\n");
}