[PATCH] pipe: enable atomic copying of pipe data to/from user space
fs/splice.c
/*
 * "splice": joining two ropes together by interweaving their strands.
 *
 * This is the "extended pipe" functionality, where a pipe is used as
 * an arbitrary in-memory buffer. Think of a pipe as a small kernel
 * buffer that you can use to transfer data from one end to the other.
 *
 * The traditional unix read/write is extended with a "splice()" operation
 * that transfers data buffers to or from a pipe buffer.
 *
 * Named by Larry McVoy, original implementation from Linus, extended by
 * Jens to support splicing to files, network, direct splicing, etc and
 * fixing lots of bugs.
 *
 * Copyright (C) 2005-2006 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2005-2006 Linus Torvalds <torvalds@osdl.org>
 * Copyright (C) 2006 Ingo Molnar <mingo@elte.hu>
 *
 */
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/pagemap.h>
#include <linux/pipe_fs_i.h>
#include <linux/mm_inline.h>
#include <linux/swap.h>
#include <linux/writeback.h>
#include <linux/buffer_head.h>
#include <linux/module.h>
#include <linux/syscalls.h>
#include <linux/uio.h>

struct partial_page {
        unsigned int offset;
        unsigned int len;
};

/*
 * Passed to splice_to_pipe
 */
struct splice_pipe_desc {
        struct page **pages;            /* page map */
        struct partial_page *partial;   /* pages[] may not be contig */
        int nr_pages;                   /* number of pages in map */
        unsigned int flags;             /* splice flags */
        struct pipe_buf_operations *ops;/* ops associated with output pipe */
};

/*
 * Attempt to steal a page from a pipe buffer. This should perhaps go into
 * a vm helper function, it's already simplified quite a bit by the
 * addition of remove_mapping(). If success is returned, the caller may
 * attempt to reuse this page for another destination.
 */
static int page_cache_pipe_buf_steal(struct pipe_inode_info *info,
                                     struct pipe_buffer *buf)
{
        struct page *page = buf->page;
        struct address_space *mapping = page_mapping(page);

        lock_page(page);

        WARN_ON(!PageUptodate(page));

        /*
         * At least for ext2 with nobh option, we need to wait on writeback
         * completing on this page, since we'll remove it from the pagecache.
         * Otherwise truncate won't wait on the page, allowing the disk
         * blocks to be reused by someone else before we actually wrote our
         * data to them. FS corruption ensues.
         */
        wait_on_page_writeback(page);

        if (PagePrivate(page))
                try_to_release_page(page, mapping_gfp_mask(mapping));

        if (!remove_mapping(mapping, page)) {
                unlock_page(page);
                return 1;
        }

        buf->flags |= PIPE_BUF_FLAG_LRU;
        return 0;
}

static void page_cache_pipe_buf_release(struct pipe_inode_info *info,
                                        struct pipe_buffer *buf)
{
        page_cache_release(buf->page);
        buf->page = NULL;
        buf->flags &= ~PIPE_BUF_FLAG_LRU;
}

static int page_cache_pipe_buf_pin(struct pipe_inode_info *info,
                                   struct pipe_buffer *buf)
{
        struct page *page = buf->page;
        int err;

        if (!PageUptodate(page)) {
                lock_page(page);

                /*
                 * Page got truncated/unhashed. This will cause a 0-byte
                 * splice, if this is the first page.
                 */
                if (!page->mapping) {
                        err = -ENODATA;
                        goto error;
                }

                /*
                 * Uh oh, read-error from disk.
                 */
                if (!PageUptodate(page)) {
                        err = -EIO;
                        goto error;
                }

                /*
                 * Page is ok after all, we are done.
                 */
                unlock_page(page);
        }

        return 0;
error:
        unlock_page(page);
        return err;
}

static struct pipe_buf_operations page_cache_pipe_buf_ops = {
        .can_merge = 0,
        .map = generic_pipe_buf_map,
        .unmap = generic_pipe_buf_unmap,
        .pin = page_cache_pipe_buf_pin,
        .release = page_cache_pipe_buf_release,
        .steal = page_cache_pipe_buf_steal,
        .get = generic_pipe_buf_get,
};

static int user_page_pipe_buf_steal(struct pipe_inode_info *pipe,
                                    struct pipe_buffer *buf)
{
        return 1;
}

static struct pipe_buf_operations user_page_pipe_buf_ops = {
        .can_merge = 0,
        .map = generic_pipe_buf_map,
        .unmap = generic_pipe_buf_unmap,
        .pin = generic_pipe_buf_pin,
        .release = page_cache_pipe_buf_release,
        .steal = user_page_pipe_buf_steal,
        .get = generic_pipe_buf_get,
};

/*
 * Pipe output worker. This sets up our pipe format with the page cache
 * pipe buffer operations. Otherwise very similar to the regular pipe_writev().
 */
static ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
                              struct splice_pipe_desc *spd)
{
        int ret, do_wakeup, page_nr;

        ret = 0;
        do_wakeup = 0;
        page_nr = 0;

        if (pipe->inode)
                mutex_lock(&pipe->inode->i_mutex);

        for (;;) {
                if (!pipe->readers) {
                        send_sig(SIGPIPE, current, 0);
                        if (!ret)
                                ret = -EPIPE;
                        break;
                }

                if (pipe->nrbufs < PIPE_BUFFERS) {
                        int newbuf = (pipe->curbuf + pipe->nrbufs) & (PIPE_BUFFERS - 1);
                        struct pipe_buffer *buf = pipe->bufs + newbuf;

                        buf->page = spd->pages[page_nr];
                        buf->offset = spd->partial[page_nr].offset;
                        buf->len = spd->partial[page_nr].len;
                        buf->ops = spd->ops;
                        pipe->nrbufs++;
                        page_nr++;
                        ret += buf->len;

                        if (pipe->inode)
                                do_wakeup = 1;

                        if (!--spd->nr_pages)
                                break;
                        if (pipe->nrbufs < PIPE_BUFFERS)
                                continue;

                        break;
                }

                if (spd->flags & SPLICE_F_NONBLOCK) {
                        if (!ret)
                                ret = -EAGAIN;
                        break;
                }

                if (signal_pending(current)) {
                        if (!ret)
                                ret = -ERESTARTSYS;
                        break;
                }

                if (do_wakeup) {
                        smp_mb();
                        if (waitqueue_active(&pipe->wait))
                                wake_up_interruptible_sync(&pipe->wait);
                        kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
                        do_wakeup = 0;
                }

                pipe->waiting_writers++;
                pipe_wait(pipe);
                pipe->waiting_writers--;
        }

        if (pipe->inode)
                mutex_unlock(&pipe->inode->i_mutex);

        if (do_wakeup) {
                smp_mb();
                if (waitqueue_active(&pipe->wait))
                        wake_up_interruptible(&pipe->wait);
                kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
        }

        while (page_nr < spd->nr_pages)
                page_cache_release(spd->pages[page_nr++]);

        return ret;
}

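/*
 * Illustration (not from the original source): splice_to_pipe() above
 * treats pipe->bufs as a power-of-two ring. With PIPE_BUFFERS == 16,
 * the slot for the next insertion is found by masking, e.g.:
 *
 *      curbuf = 14, nrbufs = 3
 *      newbuf = (14 + 3) & (16 - 1) = 17 & 15 = 1
 *
 * so the ring wraps past slot 15 back to the low slots without a modulo
 * or a branch, which is why PIPE_BUFFERS must remain a power of two.
 */
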
static int
__generic_file_splice_read(struct file *in, loff_t *ppos,
                           struct pipe_inode_info *pipe, size_t len,
                           unsigned int flags)
{
        struct address_space *mapping = in->f_mapping;
        unsigned int loff, nr_pages;
        struct page *pages[PIPE_BUFFERS];
        struct partial_page partial[PIPE_BUFFERS];
        struct page *page;
        pgoff_t index, end_index;
        loff_t isize;
        size_t total_len;
        int error, page_nr;
        struct splice_pipe_desc spd = {
                .pages = pages,
                .partial = partial,
                .flags = flags,
                .ops = &page_cache_pipe_buf_ops,
        };

        index = *ppos >> PAGE_CACHE_SHIFT;
        loff = *ppos & ~PAGE_CACHE_MASK;
        nr_pages = (len + loff + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;

        if (nr_pages > PIPE_BUFFERS)
                nr_pages = PIPE_BUFFERS;

        /*
         * Initiate read-ahead on this page range. However, don't call into
         * read-ahead for a single page at a non-zero offset (we are likely
         * doing a small-chunk splice and the page is already there).
         */
        if (!loff || nr_pages > 1)
                page_cache_readahead(mapping, &in->f_ra, in, index, nr_pages);

        /*
         * Now fill in the holes:
         */
        error = 0;
        total_len = 0;

        /*
         * Lookup the (hopefully) full range of pages we need.
         */
        spd.nr_pages = find_get_pages_contig(mapping, index, nr_pages, pages);

        /*
         * If find_get_pages_contig() returned fewer pages than we needed,
         * allocate the rest.
         */
        index += spd.nr_pages;
        while (spd.nr_pages < nr_pages) {
                /*
                 * Page could be there, find_get_pages_contig() breaks on
                 * the first hole.
                 */
                page = find_get_page(mapping, index);
                if (!page) {
                        /*
                         * Make sure the read-ahead engine is notified
                         * about this failure.
                         */
                        handle_ra_miss(mapping, &in->f_ra, index);

                        /*
                         * page didn't exist, allocate one.
                         */
                        page = page_cache_alloc_cold(mapping);
                        if (!page)
                                break;

                        error = add_to_page_cache_lru(page, mapping, index,
                                              mapping_gfp_mask(mapping));
                        if (unlikely(error)) {
                                page_cache_release(page);
                                break;
                        }
                        /*
                         * add_to_page_cache() locks the page, unlock it
                         * to avoid convoluting the logic below even more.
                         */
                        unlock_page(page);
                }

                pages[spd.nr_pages++] = page;
                index++;
        }

        /*
         * Now loop over the map and see if we need to start IO on any
         * pages, fill in the partial map, etc.
         */
        index = *ppos >> PAGE_CACHE_SHIFT;
        nr_pages = spd.nr_pages;
        spd.nr_pages = 0;
        for (page_nr = 0; page_nr < nr_pages; page_nr++) {
                unsigned int this_len;

                if (!len)
                        break;

                /*
                 * this_len is the max we'll use from this page
                 */
                this_len = min_t(unsigned long, len, PAGE_CACHE_SIZE - loff);
                page = pages[page_nr];

                /*
                 * If the page isn't uptodate, we may need to start io on it
                 */
                if (!PageUptodate(page)) {
                        /*
                         * If in nonblock mode then don't block on waiting
                         * for an in-flight io page
                         */
                        if (flags & SPLICE_F_NONBLOCK)
                                break;

                        lock_page(page);

                        /*
                         * Page was truncated, stop here. If this isn't the
                         * first page, we'll just complete what we already
                         * added.
                         */
                        if (!page->mapping) {
                                unlock_page(page);
                                break;
                        }
                        /*
                         * Page was already under io and is now done, great.
                         */
                        if (PageUptodate(page)) {
                                unlock_page(page);
                                goto fill_it;
                        }

                        /*
                         * Need to read in the page.
                         */
                        error = mapping->a_ops->readpage(in, page);
                        if (unlikely(error)) {
                                /*
                                 * We really should re-lookup the page here,
                                 * but it complicates things a lot. Instead
                                 * lets just do what we already stored, and
                                 * we'll get it the next time we are called.
                                 */
                                if (error == AOP_TRUNCATED_PAGE)
                                        error = 0;

                                break;
                        }

                        /*
                         * i_size must be checked after ->readpage().
                         */
                        isize = i_size_read(mapping->host);
                        end_index = (isize - 1) >> PAGE_CACHE_SHIFT;
                        if (unlikely(!isize || index > end_index))
                                break;

                        /*
                         * if this is the last page, see if we need to shrink
                         * the length and stop
                         */
                        if (end_index == index) {
                                loff = PAGE_CACHE_SIZE - (isize & ~PAGE_CACHE_MASK);
                                if (total_len + loff > isize)
                                        break;
                                /*
                                 * force quit after adding this page
                                 */
                                len = this_len;
                                this_len = min(this_len, loff);
                                loff = 0;
                        }
                }
fill_it:
                partial[page_nr].offset = loff;
                partial[page_nr].len = this_len;
                len -= this_len;
                total_len += this_len;
                loff = 0;
                spd.nr_pages++;
                index++;
        }

        /*
         * Release any pages at the end, if we quit early. 'page_nr' is how
         * far we got, 'nr_pages' is how many pages are in the map.
         */
        while (page_nr < nr_pages)
                page_cache_release(pages[page_nr++]);

        if (spd.nr_pages)
                return splice_to_pipe(pipe, &spd);

        return error;
}

/**
 * generic_file_splice_read - splice data from file to a pipe
 * @in:         file to splice from
 * @ppos:       position in @in to read from
 * @pipe:       pipe to splice to
 * @len:        number of bytes to splice
 * @flags:      splice modifier flags
 *
 * Will read pages from given file and fill them into a pipe.
 */
ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
                                 struct pipe_inode_info *pipe, size_t len,
                                 unsigned int flags)
{
        ssize_t spliced;
        int ret;

        ret = 0;
        spliced = 0;

        while (len) {
                ret = __generic_file_splice_read(in, ppos, pipe, len, flags);

                if (ret < 0)
                        break;
                else if (!ret) {
                        if (spliced)
                                break;
                        if (flags & SPLICE_F_NONBLOCK) {
                                ret = -EAGAIN;
                                break;
                        }
                }

                *ppos += ret;
                len -= ret;
                spliced += ret;
        }

        if (spliced)
                return spliced;

        return ret;
}

EXPORT_SYMBOL(generic_file_splice_read);

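/*
 * Hookup sketch (illustrative, not part of this file): a filesystem
 * enables splice support by pointing its file_operations at the helpers
 * exported here. "examplefs" is a made-up name; the .splice_read and
 * .splice_write helpers are the real exports from this file
 * (generic_file_splice_write() appears further below).
 *
 *      static struct file_operations examplefs_file_ops = {
 *              .read           = generic_file_read,
 *              .write          = generic_file_write,
 *              .splice_read    = generic_file_splice_read,
 *              .splice_write   = generic_file_splice_write,
 *      };
 */
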
/*
 * Send 'sd->len' bytes from the pipe buffer to the socket 'sd->file' at
 * position 'sd->pos' using sendpage(). Return the number of bytes sent.
 */
static int pipe_to_sendpage(struct pipe_inode_info *info,
                            struct pipe_buffer *buf, struct splice_desc *sd)
{
        struct file *file = sd->file;
        loff_t pos = sd->pos;
        int ret, more;

        ret = buf->ops->pin(info, buf);
        if (!ret) {
                more = (sd->flags & SPLICE_F_MORE) || sd->len < sd->total_len;

                ret = file->f_op->sendpage(file, buf->page, buf->offset,
                                           sd->len, &pos, more);
        }

        return ret;
}

/*
 * This is a little more tricky than the file -> pipe splicing. There are
 * basically three cases:
 *
 *      - Destination page already exists in the address space and there
 *        are users of it. For that case we have no other option than
 *        copying the data. Tough luck.
 *      - Destination page already exists in the address space, but there
 *        are no users of it. Make sure it's uptodate, then drop it. Fall
 *        through to last case.
 *      - Destination page does not exist, we can add the pipe page to
 *        the page cache and avoid the copy.
 *
 * If asked to move pages to the output file (SPLICE_F_MOVE is set in
 * sd->flags), we attempt to migrate pages from the pipe to the output
 * file address space page cache. This is possible if no one else has
 * the pipe page referenced outside of the pipe and page cache. If
 * SPLICE_F_MOVE isn't set, or we cannot move the page, we simply create
 * a new page in the output file page cache and fill/dirty that.
 */
static int pipe_to_file(struct pipe_inode_info *info, struct pipe_buffer *buf,
                        struct splice_desc *sd)
{
        struct file *file = sd->file;
        struct address_space *mapping = file->f_mapping;
        gfp_t gfp_mask = mapping_gfp_mask(mapping);
        unsigned int offset, this_len;
        struct page *page;
        pgoff_t index;
        int ret;

        /*
         * make sure the data in this buffer is uptodate
         */
        ret = buf->ops->pin(info, buf);
        if (unlikely(ret))
                return ret;

        index = sd->pos >> PAGE_CACHE_SHIFT;
        offset = sd->pos & ~PAGE_CACHE_MASK;

        this_len = sd->len;
        if (this_len + offset > PAGE_CACHE_SIZE)
                this_len = PAGE_CACHE_SIZE - offset;

        /*
         * Reuse buf page, if SPLICE_F_MOVE is set and we are doing a full
         * page.
         */
        if ((sd->flags & SPLICE_F_MOVE) && this_len == PAGE_CACHE_SIZE) {
                /*
                 * If steal succeeds, buf->page is now pruned from the vm
                 * side (LRU and page cache) and we can reuse it. The page
                 * will also be locked on successful return.
                 */
                if (buf->ops->steal(info, buf))
                        goto find_page;

                page = buf->page;
                if (add_to_page_cache(page, mapping, index, gfp_mask)) {
                        unlock_page(page);
                        goto find_page;
                }

                page_cache_get(page);

                if (!(buf->flags & PIPE_BUF_FLAG_LRU))
                        lru_cache_add(page);
        } else {
find_page:
                page = find_lock_page(mapping, index);
                if (!page) {
                        ret = -ENOMEM;
                        page = page_cache_alloc_cold(mapping);
                        if (unlikely(!page))
                                goto out_nomem;

                        /*
                         * This will also lock the page
                         */
                        ret = add_to_page_cache_lru(page, mapping, index,
                                                    gfp_mask);
                        if (unlikely(ret))
                                goto out_release;
                }

                /*
                 * We get here with the page locked. If the page is also
                 * uptodate, we don't need to do more. If it isn't, we
                 * may need to bring it in if we are not going to overwrite
                 * the full page.
                 */
                if (!PageUptodate(page)) {
                        if (this_len < PAGE_CACHE_SIZE) {
                                ret = mapping->a_ops->readpage(file, page);
                                if (unlikely(ret))
                                        goto out_release;

                                lock_page(page);

                                if (!PageUptodate(page)) {
                                        /*
                                         * Page got invalidated, repeat.
                                         */
                                        if (!page->mapping) {
                                                unlock_page(page);
                                                page_cache_release(page);
                                                goto find_page;
                                        }
                                        ret = -EIO;
                                        goto out;
                                }
                        } else
                                SetPageUptodate(page);
                }
        }

        ret = mapping->a_ops->prepare_write(file, page, offset, offset+this_len);
        if (ret == AOP_TRUNCATED_PAGE) {
                page_cache_release(page);
                goto find_page;
        } else if (ret)
                goto out;

        if (buf->page != page) {
                /*
                 * Careful, ->map() uses KM_USER0!
                 */
                char *src = buf->ops->map(info, buf, 1);
                char *dst = kmap_atomic(page, KM_USER1);

                memcpy(dst + offset, src + buf->offset, this_len);
                flush_dcache_page(page);
                kunmap_atomic(dst, KM_USER1);
                buf->ops->unmap(info, buf, src);
        }

        ret = mapping->a_ops->commit_write(file, page, offset, offset+this_len);
        if (!ret) {
                /*
                 * Return the number of bytes written and mark page as
                 * accessed, we are now done!
                 */
                ret = this_len;
                mark_page_accessed(page);
                balance_dirty_pages_ratelimited(mapping);
        } else if (ret == AOP_TRUNCATED_PAGE) {
                page_cache_release(page);
                goto find_page;
        }
out:
        unlock_page(page);
out_release:
        /*
         * The add_to_page_cache_lru() and ->readpage() failure paths reach
         * us with the page already unlocked, so they must only drop the
         * reference here.
         */
        page_cache_release(page);
out_nomem:
        return ret;
}

/*
 * Pipe input worker. Most of this logic works like a regular pipe, the
 * key here is the 'actor' worker passed in that actually moves the data
 * to the wanted destination. See pipe_to_file/pipe_to_sendpage above.
 */
ssize_t splice_from_pipe(struct pipe_inode_info *pipe, struct file *out,
                         loff_t *ppos, size_t len, unsigned int flags,
                         splice_actor *actor)
{
        int ret, do_wakeup, err;
        struct splice_desc sd;

        ret = 0;
        do_wakeup = 0;

        sd.total_len = len;
        sd.flags = flags;
        sd.file = out;
        sd.pos = *ppos;

        if (pipe->inode)
                mutex_lock(&pipe->inode->i_mutex);

        for (;;) {
                if (pipe->nrbufs) {
                        struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
                        struct pipe_buf_operations *ops = buf->ops;

                        sd.len = buf->len;
                        if (sd.len > sd.total_len)
                                sd.len = sd.total_len;

                        err = actor(pipe, buf, &sd);
                        if (err <= 0) {
                                if (!ret && err != -ENODATA)
                                        ret = err;

                                break;
                        }

                        ret += err;
                        buf->offset += err;
                        buf->len -= err;

                        sd.len -= err;
                        sd.pos += err;
                        sd.total_len -= err;
                        if (sd.len)
                                continue;

                        if (!buf->len) {
                                buf->ops = NULL;
                                ops->release(pipe, buf);
                                pipe->curbuf = (pipe->curbuf + 1) & (PIPE_BUFFERS - 1);
                                pipe->nrbufs--;
                                if (pipe->inode)
                                        do_wakeup = 1;
                        }

                        if (!sd.total_len)
                                break;
                }

                if (pipe->nrbufs)
                        continue;
                if (!pipe->writers)
                        break;
                if (!pipe->waiting_writers) {
                        if (ret)
                                break;
                }

                if (flags & SPLICE_F_NONBLOCK) {
                        if (!ret)
                                ret = -EAGAIN;
                        break;
                }

                if (signal_pending(current)) {
                        if (!ret)
                                ret = -ERESTARTSYS;
                        break;
                }

                if (do_wakeup) {
                        smp_mb();
                        if (waitqueue_active(&pipe->wait))
                                wake_up_interruptible_sync(&pipe->wait);
                        kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
                        do_wakeup = 0;
                }

                pipe_wait(pipe);
        }

        if (pipe->inode)
                mutex_unlock(&pipe->inode->i_mutex);

        if (do_wakeup) {
                smp_mb();
                if (waitqueue_active(&pipe->wait))
                        wake_up_interruptible(&pipe->wait);
                kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
        }

        return ret;
}
/**
 * generic_file_splice_write - splice data from a pipe to a file
 * @pipe:       pipe info
 * @out:        file to write to
 * @ppos:       position in @out to write to
 * @len:        number of bytes to splice
 * @flags:      splice modifier flags
 *
 * Will either move or copy pages (determined by @flags options) from
 * the given pipe inode to the given file.
 *
 */
ssize_t
generic_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
                          loff_t *ppos, size_t len, unsigned int flags)
{
        struct address_space *mapping = out->f_mapping;
        ssize_t ret;

        ret = splice_from_pipe(pipe, out, ppos, len, flags, pipe_to_file);
        if (ret > 0) {
                struct inode *inode = mapping->host;

                *ppos += ret;

                /*
                 * If file or inode is SYNC and we actually wrote some data,
                 * sync it.
                 */
                if (unlikely((out->f_flags & O_SYNC) || IS_SYNC(inode))) {
                        int err;

                        mutex_lock(&inode->i_mutex);
                        err = generic_osync_inode(inode, mapping,
                                                  OSYNC_METADATA|OSYNC_DATA);
                        mutex_unlock(&inode->i_mutex);

                        if (err)
                                ret = err;
                }
        }

        return ret;
}

EXPORT_SYMBOL(generic_file_splice_write);

/**
 * generic_splice_sendpage - splice data from a pipe to a socket
 * @pipe:       pipe to splice from
 * @out:        socket to write to
 * @ppos:       position in @out
 * @len:        number of bytes to splice
 * @flags:      splice modifier flags
 *
 * Will send @len bytes from the pipe to a network socket. No data copying
 * is involved.
 *
 */
ssize_t generic_splice_sendpage(struct pipe_inode_info *pipe, struct file *out,
                                loff_t *ppos, size_t len, unsigned int flags)
{
        return splice_from_pipe(pipe, out, ppos, len, flags, pipe_to_sendpage);
}

EXPORT_SYMBOL(generic_splice_sendpage);

/*
 * Attempt to initiate a splice from pipe to file.
 */
static long do_splice_from(struct pipe_inode_info *pipe, struct file *out,
                           loff_t *ppos, size_t len, unsigned int flags)
{
        int ret;

        if (unlikely(!out->f_op || !out->f_op->splice_write))
                return -EINVAL;

        if (unlikely(!(out->f_mode & FMODE_WRITE)))
                return -EBADF;

        ret = rw_verify_area(WRITE, out, ppos, len);
        if (unlikely(ret < 0))
                return ret;

        return out->f_op->splice_write(pipe, out, ppos, len, flags);
}

/*
 * Attempt to initiate a splice from a file to a pipe.
 */
static long do_splice_to(struct file *in, loff_t *ppos,
                         struct pipe_inode_info *pipe, size_t len,
                         unsigned int flags)
{
        loff_t isize, left;
        int ret;

        if (unlikely(!in->f_op || !in->f_op->splice_read))
                return -EINVAL;

        if (unlikely(!(in->f_mode & FMODE_READ)))
                return -EBADF;

        ret = rw_verify_area(READ, in, ppos, len);
        if (unlikely(ret < 0))
                return ret;

        isize = i_size_read(in->f_mapping->host);
        if (unlikely(*ppos >= isize))
                return 0;

        left = isize - *ppos;
        if (unlikely(left < len))
                len = left;

        return in->f_op->splice_read(in, ppos, pipe, len, flags);
}

long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,
                      size_t len, unsigned int flags)
{
        struct pipe_inode_info *pipe;
        long ret, bytes;
        loff_t out_off;
        umode_t i_mode;
        int i;

        /*
         * We require the input to be a regular file, as we don't want to
         * randomly drop data for eg socket -> socket splicing. Use the
         * piped splicing for that!
         */
        i_mode = in->f_dentry->d_inode->i_mode;
        if (unlikely(!S_ISREG(i_mode) && !S_ISBLK(i_mode)))
                return -EINVAL;

        /*
         * Neither in nor out is a pipe, set up an internal pipe attached to
         * 'out' and transfer the wanted data from 'in' to 'out' through that.
         */
        pipe = current->splice_pipe;
        if (unlikely(!pipe)) {
                pipe = alloc_pipe_info(NULL);
                if (!pipe)
                        return -ENOMEM;

                /*
                 * We don't have an immediate reader, but we'll read the stuff
                 * out of the pipe right after the splice_to_pipe(). So set
                 * PIPE_READERS appropriately.
                 */
                pipe->readers = 1;

                current->splice_pipe = pipe;
        }

        /*
         * Do the splice.
         */
        ret = 0;
        bytes = 0;
        out_off = 0;

        while (len) {
                size_t read_len, max_read_len;

                /*
                 * Do at most PIPE_BUFFERS pages worth of transfer:
                 */
                max_read_len = min(len, (size_t)(PIPE_BUFFERS*PAGE_SIZE));

                ret = do_splice_to(in, ppos, pipe, max_read_len, flags);
                if (unlikely(ret < 0))
                        goto out_release;

                read_len = ret;

                /*
                 * NOTE: nonblocking mode only applies to the input. We
                 * must not do the output in nonblocking mode as then we
                 * could get stuck data in the internal pipe:
                 */
                ret = do_splice_from(pipe, out, &out_off, read_len,
                                     flags & ~SPLICE_F_NONBLOCK);
                if (unlikely(ret < 0))
                        goto out_release;

                bytes += ret;
                len -= ret;

                /*
                 * In nonblocking mode, if we got back a short read then
                 * that was due to either an IO error or due to the
                 * pagecache entry not being there. In the IO error case
                 * the _next_ splice attempt will produce a clean IO error
                 * return value (not a short read), so in both cases it's
                 * correct to break out of the loop here:
                 */
                if ((flags & SPLICE_F_NONBLOCK) && (read_len < max_read_len))
                        break;
        }

        pipe->nrbufs = pipe->curbuf = 0;

        return bytes;

out_release:
        /*
         * If we did an incomplete transfer we must release
         * the pipe buffers in question:
         */
        for (i = 0; i < PIPE_BUFFERS; i++) {
                struct pipe_buffer *buf = pipe->bufs + i;

                if (buf->ops) {
                        buf->ops->release(pipe, buf);
                        buf->ops = NULL;
                }
        }
        pipe->nrbufs = pipe->curbuf = 0;

        /*
         * If we transferred some data, return the number of bytes:
         */
        if (bytes > 0)
                return bytes;

        return ret;
}

EXPORT_SYMBOL(do_splice_direct);

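/*
 * Usage sketch (illustrative): a sendfile-style caller could push a
 * whole file through the task's private splice pipe like this. The
 * helper below is hypothetical; do_splice_direct() is the real export.
 *
 *      static long copy_file_sketch(struct file *in, struct file *out,
 *                                   size_t count)
 *      {
 *              loff_t pos = 0;
 *
 *              return do_splice_direct(in, &pos, out, count, 0);
 *      }
 */
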
/*
 * Determine where to splice to/from.
 */
static long do_splice(struct file *in, loff_t __user *off_in,
                      struct file *out, loff_t __user *off_out,
                      size_t len, unsigned int flags)
{
        struct pipe_inode_info *pipe;
        loff_t offset, *off;
        long ret;

        pipe = in->f_dentry->d_inode->i_pipe;
        if (pipe) {
                if (off_in)
                        return -ESPIPE;
                if (off_out) {
                        if (out->f_op->llseek == no_llseek)
                                return -EINVAL;
                        if (copy_from_user(&offset, off_out, sizeof(loff_t)))
                                return -EFAULT;
                        off = &offset;
                } else
                        off = &out->f_pos;

                ret = do_splice_from(pipe, out, off, len, flags);

                if (off_out && copy_to_user(off_out, off, sizeof(loff_t)))
                        ret = -EFAULT;

                return ret;
        }

        pipe = out->f_dentry->d_inode->i_pipe;
        if (pipe) {
                if (off_out)
                        return -ESPIPE;
                if (off_in) {
                        if (in->f_op->llseek == no_llseek)
                                return -EINVAL;
                        if (copy_from_user(&offset, off_in, sizeof(loff_t)))
                                return -EFAULT;
                        off = &offset;
                } else
                        off = &in->f_pos;

                ret = do_splice_to(in, off, pipe, len, flags);

                if (off_in && copy_to_user(off_in, off, sizeof(loff_t)))
                        ret = -EFAULT;

                return ret;
        }

        return -EINVAL;
}

/*
 * Map an iov into an array of pages and offset/length tuples. With the
 * partial_page structure, we can map several non-contiguous ranges into
 * our one pages[] map instead of splitting that operation into pieces.
 * Could easily be exported as a generic helper for other users, in which
 * case one would probably want to add a 'max_nr_pages' parameter as well.
 */
static int get_iovec_page_array(const struct iovec __user *iov,
                                unsigned int nr_vecs, struct page **pages,
                                struct partial_page *partial)
{
        int buffers = 0, error = 0;

        /*
         * It's ok to take the mmap_sem for reading, even
         * across a "get_user()".
         */
        down_read(&current->mm->mmap_sem);

        while (nr_vecs) {
                unsigned long off, npages;
                void __user *base;
                size_t len;
                int i;

                /*
                 * Get user address base and length for this iovec.
                 */
                error = get_user(base, &iov->iov_base);
                if (unlikely(error))
                        break;
                error = get_user(len, &iov->iov_len);
                if (unlikely(error))
                        break;

                /*
                 * Sanity check this iovec. 0 read succeeds.
                 */
                if (unlikely(!len))
                        break;
                error = -EFAULT;
                if (unlikely(!base))
                        break;

                /*
                 * Get this base offset and number of pages, then map
                 * in the user pages.
                 */
                off = (unsigned long) base & ~PAGE_MASK;
                npages = (off + len + PAGE_SIZE - 1) >> PAGE_SHIFT;
                if (npages > PIPE_BUFFERS - buffers)
                        npages = PIPE_BUFFERS - buffers;

                error = get_user_pages(current, current->mm,
                                       (unsigned long) base, npages, 0, 0,
                                       &pages[buffers], NULL);

                if (unlikely(error <= 0))
                        break;

                /*
                 * Fill this contiguous range into the partial page map.
                 */
                for (i = 0; i < error; i++) {
                        const int plen = min_t(size_t, len, PAGE_SIZE - off);

                        partial[buffers].offset = off;
                        partial[buffers].len = plen;

                        off = 0;
                        len -= plen;
                        buffers++;
                }

                /*
                 * We didn't complete this iov, stop here since it probably
                 * means we have to move some of this into a pipe to
                 * be able to continue.
                 */
                if (len)
                        break;

                /*
                 * Don't continue if we mapped fewer pages than we asked for,
                 * or if we mapped the max number of pages that we have
                 * room for.
                 */
                if (error < npages || buffers == PIPE_BUFFERS)
                        break;

                nr_vecs--;
                iov++;
        }

        up_read(&current->mm->mmap_sem);

        if (buffers)
                return buffers;

        return error;
}

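/*
 * Worked example (illustrative) for the loop above, with PAGE_SIZE of
 * 4096 and an iovec of base == 0x10000ff0, len == 8208:
 *
 *      off    = 0x10000ff0 & ~PAGE_MASK            = 4080
 *      npages = (4080 + 8208 + 4095) >> PAGE_SHIFT = 3
 *
 *      page 0: offset 4080, plen = min(8208, 4096 - 4080) =   16
 *      page 1: offset    0, plen = min(8192, 4096)        = 4096
 *      page 2: offset    0, plen = min(4096, 4096)        = 4096
 *
 * which accounts for all 8208 bytes, so len reaches zero and the loop
 * advances to the next iovec.
 */
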
/*
 * vmsplice splices a user address range into a pipe. It can be thought of
 * as splice-from-memory, where the regular splice is splice-from-file (or
 * to file). In both cases the output is a pipe, naturally.
 *
 * Note that vmsplice only supports splicing _from_ user memory to a pipe,
 * not the other way around. Splicing from user memory is a simple operation
 * that can be supported without any funky alignment restrictions or nasty
 * vm tricks. We simply map in the user pages and fill them into a pipe.
 * The reverse isn't quite as easy, though. There are two possible solutions
 * for that:
 *
 *      - memcpy() the data internally, at which point we might as well just
 *        do a regular read() on the buffer anyway.
 *      - Lots of nasty vm tricks, that are neither fast nor flexible (they
 *        impose restrictions on both ends of the pipe).
 *
 * Alas, it isn't here.
 *
 */
static long do_vmsplice(struct file *file, const struct iovec __user *iov,
                        unsigned long nr_segs, unsigned int flags)
{
        struct pipe_inode_info *pipe = file->f_dentry->d_inode->i_pipe;
        struct page *pages[PIPE_BUFFERS];
        struct partial_page partial[PIPE_BUFFERS];
        struct splice_pipe_desc spd = {
                .pages = pages,
                .partial = partial,
                .flags = flags,
                .ops = &user_page_pipe_buf_ops,
        };

        if (unlikely(!pipe))
                return -EBADF;
        if (unlikely(nr_segs > UIO_MAXIOV))
                return -EINVAL;
        else if (unlikely(!nr_segs))
                return 0;

        spd.nr_pages = get_iovec_page_array(iov, nr_segs, pages, partial);
        if (spd.nr_pages <= 0)
                return spd.nr_pages;

        return splice_to_pipe(pipe, &spd);
}

asmlinkage long sys_vmsplice(int fd, const struct iovec __user *iov,
                             unsigned long nr_segs, unsigned int flags)
{
        struct file *file;
        long error;
        int fput;

        error = -EBADF;
        file = fget_light(fd, &fput);
        if (file) {
                if (file->f_mode & FMODE_WRITE)
                        error = do_vmsplice(file, iov, nr_segs, flags);

                fput_light(file, fput);
        }

        return error;
}

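/*
 * Example (illustrative, not part of this file): handing a user buffer
 * to a pipe with vmsplice(2). It assumes a libc that wraps the syscall;
 * on systems of this vintage, syscall(__NR_vmsplice, ...) would be used
 * instead. Error handling is abbreviated.
 *
 *      #define _GNU_SOURCE
 *      #include <fcntl.h>
 *      #include <sys/uio.h>
 *      #include <unistd.h>
 *
 *      int main(void)
 *      {
 *              static char buf[65536];
 *              struct iovec iov = {
 *                      .iov_base = buf,
 *                      .iov_len  = sizeof(buf),
 *              };
 *              int pfd[2];
 *
 *              pipe(pfd);
 *              // map the user pages straight into the pipe; nothing is
 *              // copied until a reader consumes them
 *              vmsplice(pfd[1], &iov, 1, 0);
 *              return 0;
 *      }
 */
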
asmlinkage long sys_splice(int fd_in, loff_t __user *off_in,
                           int fd_out, loff_t __user *off_out,
                           size_t len, unsigned int flags)
{
        long error;
        struct file *in, *out;
        int fput_in, fput_out;

        if (unlikely(!len))
                return 0;

        error = -EBADF;
        in = fget_light(fd_in, &fput_in);
        if (in) {
                if (in->f_mode & FMODE_READ) {
                        out = fget_light(fd_out, &fput_out);
                        if (out) {
                                if (out->f_mode & FMODE_WRITE)
                                        error = do_splice(in, off_in,
                                                          out, off_out,
                                                          len, flags);
                                fput_light(out, fput_out);
                        }
                }

                fput_light(in, fput_in);
        }

        return error;
}

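/*
 * Example (illustrative, not part of this file): a minimal user-space
 * cat(1) built on splice(2), exercising the file -> pipe -> fd paths
 * implemented above. It assumes a libc splice() wrapper and abbreviates
 * error handling.
 *
 *      #define _GNU_SOURCE
 *      #include <fcntl.h>
 *      #include <unistd.h>
 *
 *      int main(int argc, char *argv[])
 *      {
 *              int fd = open(argv[1], O_RDONLY);
 *              int pfd[2];
 *              long n;
 *
 *              pipe(pfd);
 *              while ((n = splice(fd, NULL, pfd[1], NULL,
 *                                 65536, SPLICE_F_MOVE)) > 0)
 *                      splice(pfd[0], NULL, STDOUT_FILENO, NULL,
 *                             n, SPLICE_F_MOVE);
 *              return 0;
 *      }
 */
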
/*
 * Link contents of ipipe to opipe.
 */
static int link_pipe(struct pipe_inode_info *ipipe,
                     struct pipe_inode_info *opipe,
                     size_t len, unsigned int flags)
{
        struct pipe_buffer *ibuf, *obuf;
        int ret, do_wakeup, i, ipipe_first;

        ret = do_wakeup = ipipe_first = 0;

        /*
         * Potential ABBA deadlock, work around it by ordering lock
         * grabbing by inode address. Otherwise two different processes
         * could deadlock (one doing tee from A -> B, the other from B -> A).
         */
        if (ipipe->inode < opipe->inode) {
                ipipe_first = 1;
                mutex_lock(&ipipe->inode->i_mutex);
                mutex_lock(&opipe->inode->i_mutex);
        } else {
                mutex_lock(&opipe->inode->i_mutex);
                mutex_lock(&ipipe->inode->i_mutex);
        }

        for (i = 0;; i++) {
                if (!opipe->readers) {
                        send_sig(SIGPIPE, current, 0);
                        if (!ret)
                                ret = -EPIPE;
                        break;
                }
                if (ipipe->nrbufs - i) {
                        ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (PIPE_BUFFERS - 1));

                        /*
                         * If we have room, fill this buffer
                         */
                        if (opipe->nrbufs < PIPE_BUFFERS) {
                                int nbuf = (opipe->curbuf + opipe->nrbufs) & (PIPE_BUFFERS - 1);

                                /*
                                 * Get a reference to this pipe buffer,
                                 * so we can copy the contents over.
                                 */
                                ibuf->ops->get(ipipe, ibuf);

                                obuf = opipe->bufs + nbuf;
                                *obuf = *ibuf;

                                if (obuf->len > len)
                                        obuf->len = len;

                                opipe->nrbufs++;
                                do_wakeup = 1;
                                ret += obuf->len;
                                len -= obuf->len;

                                if (!len)
                                        break;
                                if (opipe->nrbufs < PIPE_BUFFERS)
                                        continue;
                        }

                        /*
                         * We have input available, but no output room.
                         * If we already copied data, return that. If we
                         * need to drop the opipe lock, it must be ordered
                         * last to avoid deadlocks.
                         */
                        if ((flags & SPLICE_F_NONBLOCK) || !ipipe_first) {
                                if (!ret)
                                        ret = -EAGAIN;
                                break;
                        }
                        if (signal_pending(current)) {
                                if (!ret)
                                        ret = -ERESTARTSYS;
                                break;
                        }
                        if (do_wakeup) {
                                smp_mb();
                                if (waitqueue_active(&opipe->wait))
                                        wake_up_interruptible(&opipe->wait);
                                kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
                                do_wakeup = 0;
                        }

                        opipe->waiting_writers++;
                        pipe_wait(opipe);
                        opipe->waiting_writers--;
                        continue;
                }

                /*
                 * No input buffers, do the usual checks for available
                 * writers and blocking and wait if necessary
                 */
                if (!ipipe->writers)
                        break;
                if (!ipipe->waiting_writers) {
                        if (ret)
                                break;
                }
                /*
                 * pipe_wait() drops the ipipe mutex. To avoid deadlocks
                 * with another process, we can only safely do that if
                 * the ipipe lock is ordered last.
                 */
                if ((flags & SPLICE_F_NONBLOCK) || ipipe_first) {
                        if (!ret)
                                ret = -EAGAIN;
                        break;
                }
                if (signal_pending(current)) {
                        if (!ret)
                                ret = -ERESTARTSYS;
                        break;
                }

                if (waitqueue_active(&ipipe->wait))
                        wake_up_interruptible_sync(&ipipe->wait);
                kill_fasync(&ipipe->fasync_writers, SIGIO, POLL_OUT);

                pipe_wait(ipipe);
        }

        mutex_unlock(&ipipe->inode->i_mutex);
        mutex_unlock(&opipe->inode->i_mutex);

        if (do_wakeup) {
                smp_mb();
                if (waitqueue_active(&opipe->wait))
                        wake_up_interruptible(&opipe->wait);
                kill_fasync(&opipe->fasync_readers, SIGIO, POLL_IN);
        }

        return ret;
}

/*
 * This is a tee(1) implementation that works on pipes. It doesn't copy
 * any data, it simply references the 'in' pages on the 'out' pipe.
 * The 'flags' used are the SPLICE_F_* variants, currently the only
 * applicable one is SPLICE_F_NONBLOCK.
 */
static long do_tee(struct file *in, struct file *out, size_t len,
                   unsigned int flags)
{
        struct pipe_inode_info *ipipe = in->f_dentry->d_inode->i_pipe;
        struct pipe_inode_info *opipe = out->f_dentry->d_inode->i_pipe;

        /*
         * Duplicate the contents of ipipe into opipe, without actually
         * consuming the input.
         */
        if (ipipe && opipe)
                return link_pipe(ipipe, opipe, len, flags);

        return -EINVAL;
}

asmlinkage long sys_tee(int fdin, int fdout, size_t len, unsigned int flags)
{
        struct file *in;
        int error, fput_in;

        if (unlikely(!len))
                return 0;

        error = -EBADF;
        in = fget_light(fdin, &fput_in);
        if (in) {
                if (in->f_mode & FMODE_READ) {
                        int fput_out;
                        struct file *out = fget_light(fdout, &fput_out);

                        if (out) {
                                if (out->f_mode & FMODE_WRITE)
                                        error = do_tee(in, out, len, flags);
                                fput_light(out, fput_out);
                        }
                }
                fput_light(in, fput_in);
        }

        return error;
}
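
/*
 * Example (illustrative, not part of this file): a tee(1)-style filter
 * that duplicates its stdin pipe to its stdout pipe via sys_tee() above,
 * then drains stdin into a file with splice(2). Both stdin and stdout
 * must be pipes. Assumes libc wrappers; error handling is abbreviated.
 *
 *      #define _GNU_SOURCE
 *      #include <fcntl.h>
 *      #include <limits.h>
 *      #include <unistd.h>
 *
 *      int main(int argc, char *argv[])
 *      {
 *              int fd = open(argv[1], O_WRONLY | O_CREAT | O_TRUNC, 0644);
 *              long n;
 *
 *              // duplicate the pipe contents from stdin to stdout ...
 *              while ((n = tee(STDIN_FILENO, STDOUT_FILENO, INT_MAX, 0)) > 0)
 *                      // ... then consume stdin into the output file
 *                      splice(STDIN_FILENO, NULL, fd, NULL, n, SPLICE_F_MOVE);
 *              return 0;
 *      }
 */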