2  * Copyright (C) 2002 Sistina Software (UK) Limited.
 
   4  * This file is released under the GPL.
 
   6  * Kcopyd provides a simple interface for copying an area of one
 
   7  * block-device to one or more other block-devices, with an asynchronous
 
   8  * completion notification.
 
  11 #include <asm/atomic.h>
 
  13 #include <linux/blkdev.h>
 
  14 #include <linux/config.h>
 
  16 #include <linux/init.h>
 
  17 #include <linux/list.h>
 
  18 #include <linux/mempool.h>
 
  19 #include <linux/module.h>
 
  20 #include <linux/pagemap.h>
 
  21 #include <linux/slab.h>
 
  22 #include <linux/vmalloc.h>
 
  23 #include <linux/workqueue.h>
 
  27 static struct workqueue_struct *_kcopyd_wq;
 
  28 static struct work_struct _kcopyd_work;
 
  30 static inline void wake(void)
 
  32         queue_work(_kcopyd_wq, &_kcopyd_work);
 
  35 /*-----------------------------------------------------------------
 
  36  * Each kcopyd client has its own little pool of preallocated
 
  37  * pages for kcopyd io.
 
  38  *---------------------------------------------------------------*/
 
  39 struct kcopyd_client {
 
  40         struct list_head list;
 
  43         struct page_list *pages;
 
  44         unsigned int nr_pages;
 
  45         unsigned int nr_free_pages;
 
  48 static struct page_list *alloc_pl(void)
 
  52         pl = kmalloc(sizeof(*pl), GFP_KERNEL);
 
  56         pl->page = alloc_page(GFP_KERNEL);
 
  65 static void free_pl(struct page_list *pl)
 
  67         __free_page(pl->page);
 
  71 static int kcopyd_get_pages(struct kcopyd_client *kc,
 
  72                             unsigned int nr, struct page_list **pages)
 
  77         if (kc->nr_free_pages < nr) {
 
  78                 spin_unlock(&kc->lock);
 
  82         kc->nr_free_pages -= nr;
 
  83         for (*pages = pl = kc->pages; --nr; pl = pl->next)
 
  89         spin_unlock(&kc->lock);
 
  94 static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
 
  96         struct page_list *cursor;
 
  99         for (cursor = pl; cursor->next; cursor = cursor->next)
 
 103         cursor->next = kc->pages;
 
 105         spin_unlock(&kc->lock);
 
 109  * These three functions resize the page pool.
 
 111 static void drop_pages(struct page_list *pl)
 
 113         struct page_list *next;
 
 122 static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
 
 125         struct page_list *pl = NULL, *next;
 
 127         for (i = 0; i < nr; i++) {
 
 138         kcopyd_put_pages(kc, pl);
 
 143 static void client_free_pages(struct kcopyd_client *kc)
 
 145         BUG_ON(kc->nr_free_pages != kc->nr_pages);
 
 146         drop_pages(kc->pages);
 
 148         kc->nr_free_pages = kc->nr_pages = 0;
 
 151 /*-----------------------------------------------------------------
 
 152  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 
 153  * for this reason we use a mempool to prevent the client from
 
 154  * ever having to do io (which could cause a deadlock).
 
 155  *---------------------------------------------------------------*/
 
 157         struct kcopyd_client *kc;
 
 158         struct list_head list;
 
 162          * Error state of the job.
 
 165         unsigned int write_err;
 
 168          * Either READ or WRITE
 
 171         struct io_region source;
 
 174          * The destinations for the transfer.
 
 176         unsigned int num_dests;
 
 177         struct io_region dests[KCOPYD_MAX_REGIONS];
 
 180         unsigned int nr_pages;
 
 181         struct page_list *pages;
 
 184          * Set this to ensure you are notified when the job has
 
 185          * completed.  'context' is for callback to use.
 
 191          * These fields are only used if the job has been split
 
 192          * into more manageable parts.
 
 194         struct semaphore lock;
 
 199 /* FIXME: this should scale with the number of pages */
 
 202 static kmem_cache_t *_job_cache;
 
 203 static mempool_t *_job_pool;
 
 206  * We maintain three lists of jobs:
 
 208  * i)   jobs waiting for pages
 
 209  * ii)  jobs that have pages, and are waiting for the io to be issued.
 
 210  * iii) jobs that have completed.
 
 212  * All three of these are protected by job_lock.
 
 214 static DEFINE_SPINLOCK(_job_lock);
 
 216 static LIST_HEAD(_complete_jobs);
 
 217 static LIST_HEAD(_io_jobs);
 
 218 static LIST_HEAD(_pages_jobs);
 
 220 static int jobs_init(void)
 
 222         _job_cache = kmem_cache_create("kcopyd-jobs",
 
 223                                        sizeof(struct kcopyd_job),
 
 224                                        __alignof__(struct kcopyd_job),
 
 229         _job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
 
 230                                    mempool_free_slab, _job_cache);
 
 232                 kmem_cache_destroy(_job_cache);
 
 239 static void jobs_exit(void)
 
 241         BUG_ON(!list_empty(&_complete_jobs));
 
 242         BUG_ON(!list_empty(&_io_jobs));
 
 243         BUG_ON(!list_empty(&_pages_jobs));
 
 245         mempool_destroy(_job_pool);
 
 246         kmem_cache_destroy(_job_cache);
 
 252  * Functions to push and pop a job onto the head of a given job
 
 255 static inline struct kcopyd_job *pop(struct list_head *jobs)
 
 257         struct kcopyd_job *job = NULL;
 
 260         spin_lock_irqsave(&_job_lock, flags);
 
 262         if (!list_empty(jobs)) {
 
 263                 job = list_entry(jobs->next, struct kcopyd_job, list);
 
 264                 list_del(&job->list);
 
 266         spin_unlock_irqrestore(&_job_lock, flags);
 
 271 static inline void push(struct list_head *jobs, struct kcopyd_job *job)
 
 275         spin_lock_irqsave(&_job_lock, flags);
 
 276         list_add_tail(&job->list, jobs);
 
 277         spin_unlock_irqrestore(&_job_lock, flags);
 
 281  * These three functions process 1 item from the corresponding
 
 287  * > 0: can't process yet.
 
 289 static int run_complete_job(struct kcopyd_job *job)
 
 291         void *context = job->context;
 
 292         int read_err = job->read_err;
 
 293         unsigned int write_err = job->write_err;
 
 294         kcopyd_notify_fn fn = job->fn;
 
 296         kcopyd_put_pages(job->kc, job->pages);
 
 297         mempool_free(job, _job_pool);
 
 298         fn(read_err, write_err, context);
 
 302 static void complete_io(unsigned long error, void *context)
 
 304         struct kcopyd_job *job = (struct kcopyd_job *) context;
 
 307                 if (job->rw == WRITE)
 
 308                         job->write_err &= error;
 
 312                 if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
 
 313                         push(&_complete_jobs, job);
 
 319         if (job->rw == WRITE)
 
 320                 push(&_complete_jobs, job);
 
 324                 push(&_io_jobs, job);
 
 331  * Request io on as many buffer heads as we can currently get for
 
 334 static int run_io_job(struct kcopyd_job *job)
 
 339                 r = dm_io_async(1, &job->source, job->rw,
 
 341                                 job->offset, complete_io, job);
 
 344                 r = dm_io_async(job->num_dests, job->dests, job->rw,
 
 346                                 job->offset, complete_io, job);
 
 351 static int run_pages_job(struct kcopyd_job *job)
 
 355         job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
 
 357         r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
 
 359                 /* this job is ready for io */
 
 360                 push(&_io_jobs, job);
 
 365                 /* can't complete now */
 
 372  * Run through a list for as long as possible.  Returns the count
 
 373  * of successful jobs.
 
 375 static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
 
 377         struct kcopyd_job *job;
 
 380         while ((job = pop(jobs))) {
 
 385                         /* error this rogue job */
 
 386                         if (job->rw == WRITE)
 
 387                                 job->write_err = (unsigned int) -1;
 
 390                         push(&_complete_jobs, job);
 
 396                          * We couldn't service this job ATM, so
 
 397                          * push this job back onto the list.
 
 410  * kcopyd does this every time it's woken up.
 
 412 static void do_work(void *ignored)
 
 415          * The order that these are called is *very* important.
 
 416          * complete jobs can free some pages for pages jobs.
 
 417          * Pages jobs when successful will jump onto the io jobs
 
 418          * list.  io jobs call wake when they complete and it all
 
 421         process_jobs(&_complete_jobs, run_complete_job);
 
 422         process_jobs(&_pages_jobs, run_pages_job);
 
 423         process_jobs(&_io_jobs, run_io_job);
 
 427  * If we are copying a small region we just dispatch a single job
 
 428  * to do the copy, otherwise the io has to be split up into many
 
 431 static void dispatch_job(struct kcopyd_job *job)
 
 433         push(&_pages_jobs, job);
 
 437 #define SUB_JOB_SIZE 128
 
 438 static void segment_complete(int read_err,
 
 439                              unsigned int write_err, void *context)
 
 441         /* FIXME: tidy this function */
 
 442         sector_t progress = 0;
 
 444         struct kcopyd_job *job = (struct kcopyd_job *) context;
 
 448         /* update the error */
 
 453                 job->write_err &= write_err;
 
 456          * Only dispatch more work if there hasn't been an error.
 
 458         if ((!job->read_err && !job->write_err) ||
 
 459             test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
 
 460                 /* get the next chunk of work */
 
 461                 progress = job->progress;
 
 462                 count = job->source.count - progress;
 
 464                         if (count > SUB_JOB_SIZE)
 
 465                                 count = SUB_JOB_SIZE;
 
 467                         job->progress += count;
 
 474                 struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);
 
 477                 sub_job->source.sector += progress;
 
 478                 sub_job->source.count = count;
 
 480                 for (i = 0; i < job->num_dests; i++) {
 
 481                         sub_job->dests[i].sector += progress;
 
 482                         sub_job->dests[i].count = count;
 
 485                 sub_job->fn = segment_complete;
 
 486                 sub_job->context = job;
 
 487                 dispatch_job(sub_job);
 
 489         } else if (atomic_dec_and_test(&job->sub_jobs)) {
 
 492                  * To avoid a race we must keep the job around
 
 493                  * until after the notify function has completed.
 
 494                  * Otherwise the client may try and stop the job
 
 495                  * after we've completed.
 
 497                 job->fn(read_err, write_err, job->context);
 
 498                 mempool_free(job, _job_pool);
 
 503  * Create some little jobs that will do the move between
 
 506 #define SPLIT_COUNT 8
 
 507 static void split_job(struct kcopyd_job *job)
 
 511         atomic_set(&job->sub_jobs, SPLIT_COUNT);
 
 512         for (i = 0; i < SPLIT_COUNT; i++)
 
 513                 segment_complete(0, 0u, job);
 
 516 int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
 
 517                 unsigned int num_dests, struct io_region *dests,
 
 518                 unsigned int flags, kcopyd_notify_fn fn, void *context)
 
 520         struct kcopyd_job *job;
 
 523          * Allocate a new job.
 
 525         job = mempool_alloc(_job_pool, GFP_NOIO);
 
 528          * set up for the read.
 
 538         job->num_dests = num_dests;
 
 539         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
 
 546         job->context = context;
 
 548         if (job->source.count < SUB_JOB_SIZE)
 
 552                 init_MUTEX(&job->lock);
 
 561  * Cancels a kcopyd job, eg. someone might be deactivating a
 
 565 int kcopyd_cancel(struct kcopyd_job *job, int block)
 
 572 /*-----------------------------------------------------------------
 
 574  *---------------------------------------------------------------*/
 
 575 static DECLARE_MUTEX(_client_lock);
 
 576 static LIST_HEAD(_clients);
 
 578 static void client_add(struct kcopyd_client *kc)
 
 581         list_add(&kc->list, &_clients);
 
 585 static void client_del(struct kcopyd_client *kc)
 
 592 static DECLARE_MUTEX(kcopyd_init_lock);
 
 593 static int kcopyd_clients = 0;
 
 595 static int kcopyd_init(void)
 
 599         down(&kcopyd_init_lock);
 
 601         if (kcopyd_clients) {
 
 602                 /* Already initialized. */
 
 604                 up(&kcopyd_init_lock);
 
 610                 up(&kcopyd_init_lock);
 
 614         _kcopyd_wq = create_singlethread_workqueue("kcopyd");
 
 617                 up(&kcopyd_init_lock);
 
 622         INIT_WORK(&_kcopyd_work, do_work, NULL);
 
 623         up(&kcopyd_init_lock);
 
 627 static void kcopyd_exit(void)
 
 629         down(&kcopyd_init_lock);
 
 631         if (!kcopyd_clients) {
 
 633                 destroy_workqueue(_kcopyd_wq);
 
 636         up(&kcopyd_init_lock);
 
 639 int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
 
 642         struct kcopyd_client *kc;
 
 648         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
 
 654         spin_lock_init(&kc->lock);
 
 656         kc->nr_pages = kc->nr_free_pages = 0;
 
 657         r = client_alloc_pages(kc, nr_pages);
 
 664         r = dm_io_get(nr_pages);
 
 666                 client_free_pages(kc);
 
 677 void kcopyd_client_destroy(struct kcopyd_client *kc)
 
 679         dm_io_put(kc->nr_pages);
 
 680         client_free_pages(kc);
 
 686 EXPORT_SYMBOL(kcopyd_client_create);
 
 687 EXPORT_SYMBOL(kcopyd_client_destroy);
 
 688 EXPORT_SYMBOL(kcopyd_copy);