2  * Copyright (C) 2002 Sistina Software (UK) Limited.
 
   3  * Copyright (C) 2006 Red Hat GmbH
 
   5  * This file is released under the GPL.
 
   7  * Kcopyd provides a simple interface for copying an area of one
 
   8  * block-device to one or more other block-devices, with an asynchronous
 
   9  * completion notification.
 
  12 #include <asm/types.h>
 
  13 #include <asm/atomic.h>
 
  15 #include <linux/blkdev.h>
 
  17 #include <linux/init.h>
 
  18 #include <linux/list.h>
 
  19 #include <linux/mempool.h>
 
  20 #include <linux/module.h>
 
  21 #include <linux/pagemap.h>
 
  22 #include <linux/slab.h>
 
  23 #include <linux/vmalloc.h>
 
  24 #include <linux/workqueue.h>
 
  25 #include <linux/mutex.h>
 
  29 static struct workqueue_struct *_kcopyd_wq;
 
  30 static struct work_struct _kcopyd_work;
 
  32 static inline void wake(void)
 
  34         queue_work(_kcopyd_wq, &_kcopyd_work);
 
  37 /*-----------------------------------------------------------------
 
  38  * Each kcopyd client has its own little pool of preallocated
 
  39  * pages for kcopyd io.
 
  40  *---------------------------------------------------------------*/
 
  41 struct kcopyd_client {
 
  42         struct list_head list;
 
  45         struct page_list *pages;
 
  46         unsigned int nr_pages;
 
  47         unsigned int nr_free_pages;
 
  49         struct dm_io_client *io_client;
 
  51         wait_queue_head_t destroyq;
 
  55 static struct page_list *alloc_pl(void)
 
  59         pl = kmalloc(sizeof(*pl), GFP_KERNEL);
 
  63         pl->page = alloc_page(GFP_KERNEL);
 
  72 static void free_pl(struct page_list *pl)
 
  74         __free_page(pl->page);
 
  78 static int kcopyd_get_pages(struct kcopyd_client *kc,
 
  79                             unsigned int nr, struct page_list **pages)
 
  84         if (kc->nr_free_pages < nr) {
 
  85                 spin_unlock(&kc->lock);
 
  89         kc->nr_free_pages -= nr;
 
  90         for (*pages = pl = kc->pages; --nr; pl = pl->next)
 
  96         spin_unlock(&kc->lock);
 
 101 static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
 
 103         struct page_list *cursor;
 
 105         spin_lock(&kc->lock);
 
 106         for (cursor = pl; cursor->next; cursor = cursor->next)
 
 110         cursor->next = kc->pages;
 
 112         spin_unlock(&kc->lock);
 
 116  * These three functions resize the page pool.
 
 118 static void drop_pages(struct page_list *pl)
 
 120         struct page_list *next;
 
 129 static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
 
 132         struct page_list *pl = NULL, *next;
 
 134         for (i = 0; i < nr; i++) {
 
 145         kcopyd_put_pages(kc, pl);
 
 150 static void client_free_pages(struct kcopyd_client *kc)
 
 152         BUG_ON(kc->nr_free_pages != kc->nr_pages);
 
 153         drop_pages(kc->pages);
 
 155         kc->nr_free_pages = kc->nr_pages = 0;
 
 158 /*-----------------------------------------------------------------
 
 159  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 
 160  * for this reason we use a mempool to prevent the client from
 
 161  * ever having to do io (which could cause a deadlock).
 
 162  *---------------------------------------------------------------*/
 
 164         struct kcopyd_client *kc;
 
 165         struct list_head list;
 
 169          * Error state of the job.
 
 172         unsigned int write_err;
 
 175          * Either READ or WRITE
 
 178         struct io_region source;
 
 181          * The destinations for the transfer.
 
 183         unsigned int num_dests;
 
 184         struct io_region dests[KCOPYD_MAX_REGIONS];
 
 187         unsigned int nr_pages;
 
 188         struct page_list *pages;
 
 191          * Set this to ensure you are notified when the job has
 
 192          * completed.  'context' is for callback to use.
 
 198          * These fields are only used if the job has been split
 
 199          * into more manageable parts.
 
 201         struct semaphore lock;
 
 206 /* FIXME: this should scale with the number of pages */
 
 209 static struct kmem_cache *_job_cache;
 
 210 static mempool_t *_job_pool;
 
 213  * We maintain three lists of jobs:
 
 215  * i)   jobs waiting for pages
 
 216  * ii)  jobs that have pages, and are waiting for the io to be issued.
 
 217  * iii) jobs that have completed.
 
 219  * All three of these are protected by job_lock.
 
 221 static DEFINE_SPINLOCK(_job_lock);
 
 223 static LIST_HEAD(_complete_jobs);
 
 224 static LIST_HEAD(_io_jobs);
 
 225 static LIST_HEAD(_pages_jobs);
 
 227 static int jobs_init(void)
 
 229         _job_cache = kmem_cache_create("kcopyd-jobs",
 
 230                                        sizeof(struct kcopyd_job),
 
 231                                        __alignof__(struct kcopyd_job),
 
 236         _job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
 
 238                 kmem_cache_destroy(_job_cache);
 
 245 static void jobs_exit(void)
 
 247         BUG_ON(!list_empty(&_complete_jobs));
 
 248         BUG_ON(!list_empty(&_io_jobs));
 
 249         BUG_ON(!list_empty(&_pages_jobs));
 
 251         mempool_destroy(_job_pool);
 
 252         kmem_cache_destroy(_job_cache);
 
 258  * Functions to push and pop a job onto the head of a given job
 
 261 static inline struct kcopyd_job *pop(struct list_head *jobs)
 
 263         struct kcopyd_job *job = NULL;
 
 266         spin_lock_irqsave(&_job_lock, flags);
 
 268         if (!list_empty(jobs)) {
 
 269                 job = list_entry(jobs->next, struct kcopyd_job, list);
 
 270                 list_del(&job->list);
 
 272         spin_unlock_irqrestore(&_job_lock, flags);
 
 277 static inline void push(struct list_head *jobs, struct kcopyd_job *job)
 
 281         spin_lock_irqsave(&_job_lock, flags);
 
 282         list_add_tail(&job->list, jobs);
 
 283         spin_unlock_irqrestore(&_job_lock, flags);
 
 287  * These three functions process 1 item from the corresponding
 
 293  * > 0: can't process yet.
 
 295 static int run_complete_job(struct kcopyd_job *job)
 
 297         void *context = job->context;
 
 298         int read_err = job->read_err;
 
 299         unsigned int write_err = job->write_err;
 
 300         kcopyd_notify_fn fn = job->fn;
 
 301         struct kcopyd_client *kc = job->kc;
 
 303         kcopyd_put_pages(kc, job->pages);
 
 304         mempool_free(job, _job_pool);
 
 305         fn(read_err, write_err, context);
 
 307         if (atomic_dec_and_test(&kc->nr_jobs))
 
 308                 wake_up(&kc->destroyq);
 
 313 static void complete_io(unsigned long error, void *context)
 
 315         struct kcopyd_job *job = (struct kcopyd_job *) context;
 
 318                 if (job->rw == WRITE)
 
 319                         job->write_err |= error;
 
 323                 if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
 
 324                         push(&_complete_jobs, job);
 
 330         if (job->rw == WRITE)
 
 331                 push(&_complete_jobs, job);
 
 335                 push(&_io_jobs, job);
 
 342  * Request io on as many buffer heads as we can currently get for
 
 345 static int run_io_job(struct kcopyd_job *job)
 
 348         struct dm_io_request io_req = {
 
 350                 .mem.type = DM_IO_PAGE_LIST,
 
 351                 .mem.ptr.pl = job->pages,
 
 352                 .mem.offset = job->offset,
 
 353                 .notify.fn = complete_io,
 
 354                 .notify.context = job,
 
 355                 .client = job->kc->io_client,
 
 359                 r = dm_io(&io_req, 1, &job->source, NULL);
 
 361                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
 
 366 static int run_pages_job(struct kcopyd_job *job)
 
 370         job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
 
 372         r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
 
 374                 /* this job is ready for io */
 
 375                 push(&_io_jobs, job);
 
 380                 /* can't complete now */
 
 387  * Run through a list for as long as possible.  Returns the count
 
 388  * of successful jobs.
 
 390 static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
 
 392         struct kcopyd_job *job;
 
 395         while ((job = pop(jobs))) {
 
 400                         /* error this rogue job */
 
 401                         if (job->rw == WRITE)
 
 402                                 job->write_err = (unsigned int) -1;
 
 405                         push(&_complete_jobs, job);
 
 411                          * We couldn't service this job ATM, so
 
 412                          * push this job back onto the list.
 
 425  * kcopyd does this every time it's woken up.
 
 427 static void do_work(struct work_struct *ignored)
 
 430          * The order that these are called is *very* important.
 
 431          * complete jobs can free some pages for pages jobs.
 
 432          * Pages jobs when successful will jump onto the io jobs
 
 433          * list.  io jobs call wake when they complete and it all
 
 436         process_jobs(&_complete_jobs, run_complete_job);
 
 437         process_jobs(&_pages_jobs, run_pages_job);
 
 438         process_jobs(&_io_jobs, run_io_job);
 
 442  * If we are copying a small region we just dispatch a single job
 
 443  * to do the copy, otherwise the io has to be split up into many
 
 446 static void dispatch_job(struct kcopyd_job *job)
 
 448         atomic_inc(&job->kc->nr_jobs);
 
 449         push(&_pages_jobs, job);
 
 453 #define SUB_JOB_SIZE 128
 
 454 static void segment_complete(int read_err,
 
 455                              unsigned int write_err, void *context)
 
 457         /* FIXME: tidy this function */
 
 458         sector_t progress = 0;
 
 460         struct kcopyd_job *job = (struct kcopyd_job *) context;
 
 464         /* update the error */
 
 469                 job->write_err |= write_err;
 
 472          * Only dispatch more work if there hasn't been an error.
 
 474         if ((!job->read_err && !job->write_err) ||
 
 475             test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
 
 476                 /* get the next chunk of work */
 
 477                 progress = job->progress;
 
 478                 count = job->source.count - progress;
 
 480                         if (count > SUB_JOB_SIZE)
 
 481                                 count = SUB_JOB_SIZE;
 
 483                         job->progress += count;
 
 490                 struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);
 
 493                 sub_job->source.sector += progress;
 
 494                 sub_job->source.count = count;
 
 496                 for (i = 0; i < job->num_dests; i++) {
 
 497                         sub_job->dests[i].sector += progress;
 
 498                         sub_job->dests[i].count = count;
 
 501                 sub_job->fn = segment_complete;
 
 502                 sub_job->context = job;
 
 503                 dispatch_job(sub_job);
 
 505         } else if (atomic_dec_and_test(&job->sub_jobs)) {
 
 508                  * To avoid a race we must keep the job around
 
 509                  * until after the notify function has completed.
 
 510                  * Otherwise the client may try and stop the job
 
 511                  * after we've completed.
 
 513                 job->fn(read_err, write_err, job->context);
 
 514                 mempool_free(job, _job_pool);
 
 519  * Create some little jobs that will do the move between
 
 522 #define SPLIT_COUNT 8
 
 523 static void split_job(struct kcopyd_job *job)
 
 527         atomic_set(&job->sub_jobs, SPLIT_COUNT);
 
 528         for (i = 0; i < SPLIT_COUNT; i++)
 
 529                 segment_complete(0, 0u, job);
 
 532 int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
 
 533                 unsigned int num_dests, struct io_region *dests,
 
 534                 unsigned int flags, kcopyd_notify_fn fn, void *context)
 
 536         struct kcopyd_job *job;
 
 539          * Allocate a new job.
 
 541         job = mempool_alloc(_job_pool, GFP_NOIO);
 
 544          * set up for the read.
 
 554         job->num_dests = num_dests;
 
 555         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
 
 562         job->context = context;
 
 564         if (job->source.count < SUB_JOB_SIZE)
 
 568                 init_MUTEX(&job->lock);
 
 577  * Cancels a kcopyd job, eg. someone might be deactivating a
 
 581 int kcopyd_cancel(struct kcopyd_job *job, int block)
 
 588 /*-----------------------------------------------------------------
 
 590  *---------------------------------------------------------------*/
 
 591 static DEFINE_MUTEX(_client_lock);
 
 592 static LIST_HEAD(_clients);
 
 594 static void client_add(struct kcopyd_client *kc)
 
 596         mutex_lock(&_client_lock);
 
 597         list_add(&kc->list, &_clients);
 
 598         mutex_unlock(&_client_lock);
 
 601 static void client_del(struct kcopyd_client *kc)
 
 603         mutex_lock(&_client_lock);
 
 605         mutex_unlock(&_client_lock);
 
 608 static DEFINE_MUTEX(kcopyd_init_lock);
 
 609 static int kcopyd_clients = 0;
 
 611 static int kcopyd_init(void)
 
 615         mutex_lock(&kcopyd_init_lock);
 
 617         if (kcopyd_clients) {
 
 618                 /* Already initialized. */
 
 620                 mutex_unlock(&kcopyd_init_lock);
 
 626                 mutex_unlock(&kcopyd_init_lock);
 
 630         _kcopyd_wq = create_singlethread_workqueue("kcopyd");
 
 633                 mutex_unlock(&kcopyd_init_lock);
 
 638         INIT_WORK(&_kcopyd_work, do_work);
 
 639         mutex_unlock(&kcopyd_init_lock);
 
 643 static void kcopyd_exit(void)
 
 645         mutex_lock(&kcopyd_init_lock);
 
 647         if (!kcopyd_clients) {
 
 649                 destroy_workqueue(_kcopyd_wq);
 
 652         mutex_unlock(&kcopyd_init_lock);
 
 655 int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
 
 658         struct kcopyd_client *kc;
 
 664         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
 
 670         spin_lock_init(&kc->lock);
 
 672         kc->nr_pages = kc->nr_free_pages = 0;
 
 673         r = client_alloc_pages(kc, nr_pages);
 
 680         kc->io_client = dm_io_client_create(nr_pages);
 
 681         if (IS_ERR(kc->io_client)) {
 
 682                 r = PTR_ERR(kc->io_client);
 
 683                 client_free_pages(kc);
 
 689         init_waitqueue_head(&kc->destroyq);
 
 690         atomic_set(&kc->nr_jobs, 0);
 
 697 void kcopyd_client_destroy(struct kcopyd_client *kc)
 
 699         /* Wait for completion of all jobs submitted by this client. */
 
 700         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
 
 702         dm_io_client_destroy(kc->io_client);
 
 703         client_free_pages(kc);
 
 709 EXPORT_SYMBOL(kcopyd_client_create);
 
 710 EXPORT_SYMBOL(kcopyd_client_destroy);
 
 711 EXPORT_SYMBOL(kcopyd_copy);