[BLOCK] aoe: update for combined io statistics
[linux-2.6] / drivers / md / kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  *
4  * This file is released under the GPL.
5  *
6  * Kcopyd provides a simple interface for copying an area of one
7  * block-device to one or more other block-devices, with an asynchronous
8  * completion notification.
9  */
10
11 #include <asm/atomic.h>
12
13 #include <linux/blkdev.h>
14 #include <linux/config.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24
25 #include "kcopyd.h"
26
27 static struct workqueue_struct *_kcopyd_wq;
28 static struct work_struct _kcopyd_work;
29
30 static inline void wake(void)
31 {
32         queue_work(_kcopyd_wq, &_kcopyd_work);
33 }
34
35 /*-----------------------------------------------------------------
36  * Each kcopyd client has its own little pool of preallocated
37  * pages for kcopyd io.
38  *---------------------------------------------------------------*/
39 struct kcopyd_client {
40         struct list_head list;
41
42         spinlock_t lock;
43         struct page_list *pages;
44         unsigned int nr_pages;
45         unsigned int nr_free_pages;
46 };
47
48 static struct page_list *alloc_pl(void)
49 {
50         struct page_list *pl;
51
52         pl = kmalloc(sizeof(*pl), GFP_KERNEL);
53         if (!pl)
54                 return NULL;
55
56         pl->page = alloc_page(GFP_KERNEL);
57         if (!pl->page) {
58                 kfree(pl);
59                 return NULL;
60         }
61
62         return pl;
63 }
64
65 static void free_pl(struct page_list *pl)
66 {
67         __free_page(pl->page);
68         kfree(pl);
69 }
70
71 static int kcopyd_get_pages(struct kcopyd_client *kc,
72                             unsigned int nr, struct page_list **pages)
73 {
74         struct page_list *pl;
75
76         spin_lock(&kc->lock);
77         if (kc->nr_free_pages < nr) {
78                 spin_unlock(&kc->lock);
79                 return -ENOMEM;
80         }
81
82         kc->nr_free_pages -= nr;
83         for (*pages = pl = kc->pages; --nr; pl = pl->next)
84                 ;
85
86         kc->pages = pl->next;
87         pl->next = NULL;
88
89         spin_unlock(&kc->lock);
90
91         return 0;
92 }
93
94 static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
95 {
96         struct page_list *cursor;
97
98         spin_lock(&kc->lock);
99         for (cursor = pl; cursor->next; cursor = cursor->next)
100                 kc->nr_free_pages++;
101
102         kc->nr_free_pages++;
103         cursor->next = kc->pages;
104         kc->pages = pl;
105         spin_unlock(&kc->lock);
106 }
107
108 /*
109  * These three functions resize the page pool.
110  */
111 static void drop_pages(struct page_list *pl)
112 {
113         struct page_list *next;
114
115         while (pl) {
116                 next = pl->next;
117                 free_pl(pl);
118                 pl = next;
119         }
120 }
121
122 static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
123 {
124         unsigned int i;
125         struct page_list *pl = NULL, *next;
126
127         for (i = 0; i < nr; i++) {
128                 next = alloc_pl();
129                 if (!next) {
130                         if (pl)
131                                 drop_pages(pl);
132                         return -ENOMEM;
133                 }
134                 next->next = pl;
135                 pl = next;
136         }
137
138         kcopyd_put_pages(kc, pl);
139         kc->nr_pages += nr;
140         return 0;
141 }
142
143 static void client_free_pages(struct kcopyd_client *kc)
144 {
145         BUG_ON(kc->nr_free_pages != kc->nr_pages);
146         drop_pages(kc->pages);
147         kc->pages = NULL;
148         kc->nr_free_pages = kc->nr_pages = 0;
149 }
150
151 /*-----------------------------------------------------------------
152  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
153  * for this reason we use a mempool to prevent the client from
154  * ever having to do io (which could cause a deadlock).
155  *---------------------------------------------------------------*/
156 struct kcopyd_job {
157         struct kcopyd_client *kc;
158         struct list_head list;
159         unsigned long flags;
160
161         /*
162          * Error state of the job.
163          */
164         int read_err;
165         unsigned int write_err;
166
167         /*
168          * Either READ or WRITE
169          */
170         int rw;
171         struct io_region source;
172
173         /*
174          * The destinations for the transfer.
175          */
176         unsigned int num_dests;
177         struct io_region dests[KCOPYD_MAX_REGIONS];
178
179         sector_t offset;
180         unsigned int nr_pages;
181         struct page_list *pages;
182
183         /*
184          * Set this to ensure you are notified when the job has
185          * completed.  'context' is for callback to use.
186          */
187         kcopyd_notify_fn fn;
188         void *context;
189
190         /*
191          * These fields are only used if the job has been split
192          * into more manageable parts.
193          */
194         struct semaphore lock;
195         atomic_t sub_jobs;
196         sector_t progress;
197 };
198
199 /* FIXME: this should scale with the number of pages */
200 #define MIN_JOBS 512
201
202 static kmem_cache_t *_job_cache;
203 static mempool_t *_job_pool;
204
205 /*
206  * We maintain three lists of jobs:
207  *
208  * i)   jobs waiting for pages
209  * ii)  jobs that have pages, and are waiting for the io to be issued.
210  * iii) jobs that have completed.
211  *
212  * All three of these are protected by job_lock.
213  */
214 static DEFINE_SPINLOCK(_job_lock);
215
216 static LIST_HEAD(_complete_jobs);
217 static LIST_HEAD(_io_jobs);
218 static LIST_HEAD(_pages_jobs);
219
220 static int jobs_init(void)
221 {
222         _job_cache = kmem_cache_create("kcopyd-jobs",
223                                        sizeof(struct kcopyd_job),
224                                        __alignof__(struct kcopyd_job),
225                                        0, NULL, NULL);
226         if (!_job_cache)
227                 return -ENOMEM;
228
229         _job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
230                                    mempool_free_slab, _job_cache);
231         if (!_job_pool) {
232                 kmem_cache_destroy(_job_cache);
233                 return -ENOMEM;
234         }
235
236         return 0;
237 }
238
239 static void jobs_exit(void)
240 {
241         BUG_ON(!list_empty(&_complete_jobs));
242         BUG_ON(!list_empty(&_io_jobs));
243         BUG_ON(!list_empty(&_pages_jobs));
244
245         mempool_destroy(_job_pool);
246         kmem_cache_destroy(_job_cache);
247         _job_pool = NULL;
248         _job_cache = NULL;
249 }
250
251 /*
252  * Functions to push and pop a job onto the head of a given job
253  * list.
254  */
255 static inline struct kcopyd_job *pop(struct list_head *jobs)
256 {
257         struct kcopyd_job *job = NULL;
258         unsigned long flags;
259
260         spin_lock_irqsave(&_job_lock, flags);
261
262         if (!list_empty(jobs)) {
263                 job = list_entry(jobs->next, struct kcopyd_job, list);
264                 list_del(&job->list);
265         }
266         spin_unlock_irqrestore(&_job_lock, flags);
267
268         return job;
269 }
270
271 static inline void push(struct list_head *jobs, struct kcopyd_job *job)
272 {
273         unsigned long flags;
274
275         spin_lock_irqsave(&_job_lock, flags);
276         list_add_tail(&job->list, jobs);
277         spin_unlock_irqrestore(&_job_lock, flags);
278 }
279
280 /*
281  * These three functions process 1 item from the corresponding
282  * job list.
283  *
284  * They return:
285  * < 0: error
286  *   0: success
287  * > 0: can't process yet.
288  */
289 static int run_complete_job(struct kcopyd_job *job)
290 {
291         void *context = job->context;
292         int read_err = job->read_err;
293         unsigned int write_err = job->write_err;
294         kcopyd_notify_fn fn = job->fn;
295
296         kcopyd_put_pages(job->kc, job->pages);
297         mempool_free(job, _job_pool);
298         fn(read_err, write_err, context);
299         return 0;
300 }
301
302 static void complete_io(unsigned long error, void *context)
303 {
304         struct kcopyd_job *job = (struct kcopyd_job *) context;
305
306         if (error) {
307                 if (job->rw == WRITE)
308                         job->write_err &= error;
309                 else
310                         job->read_err = 1;
311
312                 if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
313                         push(&_complete_jobs, job);
314                         wake();
315                         return;
316                 }
317         }
318
319         if (job->rw == WRITE)
320                 push(&_complete_jobs, job);
321
322         else {
323                 job->rw = WRITE;
324                 push(&_io_jobs, job);
325         }
326
327         wake();
328 }
329
330 /*
331  * Request io on as many buffer heads as we can currently get for
332  * a particular job.
333  */
334 static int run_io_job(struct kcopyd_job *job)
335 {
336         int r;
337
338         if (job->rw == READ)
339                 r = dm_io_async(1, &job->source, job->rw,
340                                 job->pages,
341                                 job->offset, complete_io, job);
342
343         else
344                 r = dm_io_async(job->num_dests, job->dests, job->rw,
345                                 job->pages,
346                                 job->offset, complete_io, job);
347
348         return r;
349 }
350
351 static int run_pages_job(struct kcopyd_job *job)
352 {
353         int r;
354
355         job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
356                                   PAGE_SIZE >> 9);
357         r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
358         if (!r) {
359                 /* this job is ready for io */
360                 push(&_io_jobs, job);
361                 return 0;
362         }
363
364         if (r == -ENOMEM)
365                 /* can't complete now */
366                 return 1;
367
368         return r;
369 }
370
371 /*
372  * Run through a list for as long as possible.  Returns the count
373  * of successful jobs.
374  */
375 static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
376 {
377         struct kcopyd_job *job;
378         int r, count = 0;
379
380         while ((job = pop(jobs))) {
381
382                 r = fn(job);
383
384                 if (r < 0) {
385                         /* error this rogue job */
386                         if (job->rw == WRITE)
387                                 job->write_err = (unsigned int) -1;
388                         else
389                                 job->read_err = 1;
390                         push(&_complete_jobs, job);
391                         break;
392                 }
393
394                 if (r > 0) {
395                         /*
396                          * We couldn't service this job ATM, so
397                          * push this job back onto the list.
398                          */
399                         push(jobs, job);
400                         break;
401                 }
402
403                 count++;
404         }
405
406         return count;
407 }
408
409 /*
410  * kcopyd does this every time it's woken up.
411  */
412 static void do_work(void *ignored)
413 {
414         /*
415          * The order that these are called is *very* important.
416          * complete jobs can free some pages for pages jobs.
417          * Pages jobs when successful will jump onto the io jobs
418          * list.  io jobs call wake when they complete and it all
419          * starts again.
420          */
421         process_jobs(&_complete_jobs, run_complete_job);
422         process_jobs(&_pages_jobs, run_pages_job);
423         process_jobs(&_io_jobs, run_io_job);
424 }
425
426 /*
427  * If we are copying a small region we just dispatch a single job
428  * to do the copy, otherwise the io has to be split up into many
429  * jobs.
430  */
431 static void dispatch_job(struct kcopyd_job *job)
432 {
433         push(&_pages_jobs, job);
434         wake();
435 }
436
437 #define SUB_JOB_SIZE 128
438 static void segment_complete(int read_err,
439                              unsigned int write_err, void *context)
440 {
441         /* FIXME: tidy this function */
442         sector_t progress = 0;
443         sector_t count = 0;
444         struct kcopyd_job *job = (struct kcopyd_job *) context;
445
446         down(&job->lock);
447
448         /* update the error */
449         if (read_err)
450                 job->read_err = 1;
451
452         if (write_err)
453                 job->write_err &= write_err;
454
455         /*
456          * Only dispatch more work if there hasn't been an error.
457          */
458         if ((!job->read_err && !job->write_err) ||
459             test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
460                 /* get the next chunk of work */
461                 progress = job->progress;
462                 count = job->source.count - progress;
463                 if (count) {
464                         if (count > SUB_JOB_SIZE)
465                                 count = SUB_JOB_SIZE;
466
467                         job->progress += count;
468                 }
469         }
470         up(&job->lock);
471
472         if (count) {
473                 int i;
474                 struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);
475
476                 *sub_job = *job;
477                 sub_job->source.sector += progress;
478                 sub_job->source.count = count;
479
480                 for (i = 0; i < job->num_dests; i++) {
481                         sub_job->dests[i].sector += progress;
482                         sub_job->dests[i].count = count;
483                 }
484
485                 sub_job->fn = segment_complete;
486                 sub_job->context = job;
487                 dispatch_job(sub_job);
488
489         } else if (atomic_dec_and_test(&job->sub_jobs)) {
490
491                 /*
492                  * To avoid a race we must keep the job around
493                  * until after the notify function has completed.
494                  * Otherwise the client may try and stop the job
495                  * after we've completed.
496                  */
497                 job->fn(read_err, write_err, job->context);
498                 mempool_free(job, _job_pool);
499         }
500 }
501
502 /*
503  * Create some little jobs that will do the move between
504  * them.
505  */
506 #define SPLIT_COUNT 8
507 static void split_job(struct kcopyd_job *job)
508 {
509         int i;
510
511         atomic_set(&job->sub_jobs, SPLIT_COUNT);
512         for (i = 0; i < SPLIT_COUNT; i++)
513                 segment_complete(0, 0u, job);
514 }
515
516 int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
517                 unsigned int num_dests, struct io_region *dests,
518                 unsigned int flags, kcopyd_notify_fn fn, void *context)
519 {
520         struct kcopyd_job *job;
521
522         /*
523          * Allocate a new job.
524          */
525         job = mempool_alloc(_job_pool, GFP_NOIO);
526
527         /*
528          * set up for the read.
529          */
530         job->kc = kc;
531         job->flags = flags;
532         job->read_err = 0;
533         job->write_err = 0;
534         job->rw = READ;
535
536         job->source = *from;
537
538         job->num_dests = num_dests;
539         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
540
541         job->offset = 0;
542         job->nr_pages = 0;
543         job->pages = NULL;
544
545         job->fn = fn;
546         job->context = context;
547
548         if (job->source.count < SUB_JOB_SIZE)
549                 dispatch_job(job);
550
551         else {
552                 init_MUTEX(&job->lock);
553                 job->progress = 0;
554                 split_job(job);
555         }
556
557         return 0;
558 }
559
560 /*
561  * Cancels a kcopyd job, eg. someone might be deactivating a
562  * mirror.
563  */
564 int kcopyd_cancel(struct kcopyd_job *job, int block)
565 {
566         /* FIXME: finish */
567         return -1;
568 }
569
570 /*-----------------------------------------------------------------
571  * Unit setup
572  *---------------------------------------------------------------*/
573 static DECLARE_MUTEX(_client_lock);
574 static LIST_HEAD(_clients);
575
576 static void client_add(struct kcopyd_client *kc)
577 {
578         down(&_client_lock);
579         list_add(&kc->list, &_clients);
580         up(&_client_lock);
581 }
582
583 static void client_del(struct kcopyd_client *kc)
584 {
585         down(&_client_lock);
586         list_del(&kc->list);
587         up(&_client_lock);
588 }
589
590 static DECLARE_MUTEX(kcopyd_init_lock);
591 static int kcopyd_clients = 0;
592
593 static int kcopyd_init(void)
594 {
595         int r;
596
597         down(&kcopyd_init_lock);
598
599         if (kcopyd_clients) {
600                 /* Already initialized. */
601                 kcopyd_clients++;
602                 up(&kcopyd_init_lock);
603                 return 0;
604         }
605
606         r = jobs_init();
607         if (r) {
608                 up(&kcopyd_init_lock);
609                 return r;
610         }
611
612         _kcopyd_wq = create_singlethread_workqueue("kcopyd");
613         if (!_kcopyd_wq) {
614                 jobs_exit();
615                 up(&kcopyd_init_lock);
616                 return -ENOMEM;
617         }
618
619         kcopyd_clients++;
620         INIT_WORK(&_kcopyd_work, do_work, NULL);
621         up(&kcopyd_init_lock);
622         return 0;
623 }
624
625 static void kcopyd_exit(void)
626 {
627         down(&kcopyd_init_lock);
628         kcopyd_clients--;
629         if (!kcopyd_clients) {
630                 jobs_exit();
631                 destroy_workqueue(_kcopyd_wq);
632                 _kcopyd_wq = NULL;
633         }
634         up(&kcopyd_init_lock);
635 }
636
637 int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
638 {
639         int r = 0;
640         struct kcopyd_client *kc;
641
642         r = kcopyd_init();
643         if (r)
644                 return r;
645
646         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
647         if (!kc) {
648                 kcopyd_exit();
649                 return -ENOMEM;
650         }
651
652         spin_lock_init(&kc->lock);
653         kc->pages = NULL;
654         kc->nr_pages = kc->nr_free_pages = 0;
655         r = client_alloc_pages(kc, nr_pages);
656         if (r) {
657                 kfree(kc);
658                 kcopyd_exit();
659                 return r;
660         }
661
662         r = dm_io_get(nr_pages);
663         if (r) {
664                 client_free_pages(kc);
665                 kfree(kc);
666                 kcopyd_exit();
667                 return r;
668         }
669
670         client_add(kc);
671         *result = kc;
672         return 0;
673 }
674
675 void kcopyd_client_destroy(struct kcopyd_client *kc)
676 {
677         dm_io_put(kc->nr_pages);
678         client_free_pages(kc);
679         client_del(kc);
680         kfree(kc);
681         kcopyd_exit();
682 }
683
684 EXPORT_SYMBOL(kcopyd_client_create);
685 EXPORT_SYMBOL(kcopyd_client_destroy);
686 EXPORT_SYMBOL(kcopyd_copy);
687 EXPORT_SYMBOL(kcopyd_cancel);