kcopyd: a kernel daemon for copying parts of a disk from one place to another. Used in the snapshot and mirror targets. --- diff/drivers/md/Makefile 2003-12-09 11:19:28.000000000 +0000 +++ source/drivers/md/Makefile 2003-12-09 11:15:19.000000000 +0000 @@ -4,12 +4,13 @@ O_TARGET := mddev.o -export-objs := md.o xor.o dm-table.o dm-target.o dm.o dm-daemon.o +export-objs := md.o xor.o dm-table.o dm-target.o dm.o dm-daemon.o \ + kcopyd.o dm-io.o list-multi := lvm-mod.o dm-mod.o dm-mirror-mod.o lvm-mod-objs := lvm.o lvm-snap.o lvm-fs.o dm-mod-objs := dm.o dm-table.o dm-target.o dm-linear.o dm-stripe.o \ - dm-ioctl.o dm-daemon.o + dm-ioctl.o dm-daemon.o kcopyd.o dm-io.o # Note: link order is important. All raid personalities # and xor.o must come before md.o, as they each initialise --- diff/drivers/md/dm.c 2003-12-09 11:19:29.000000000 +0000 +++ source/drivers/md/dm.c 2003-12-09 11:15:04.000000000 +0000 @@ -5,6 +5,7 @@ */ #include "dm.h" +#include "kcopyd.h" #include #include @@ -367,6 +368,7 @@ xx(dm_linear) xx(dm_stripe) xx(dm_interface) + xx(kcopyd) #undef xx }; --- diff/drivers/md/dm-io.c 1970-01-01 01:00:00.000000000 +0100 +++ source/drivers/md/dm-io.c 2003-12-09 11:13:16.000000000 +0000 @@ -0,0 +1,361 @@ +/* + * Copyright (C) 2003 Sistina Software + * + * This file is released under the GPL. + */ + +#include "dm-io.h" + +#include +#include +#include +#include +#include + +/* FIXME: can we shrink this ? */ +struct io_context { + int rw; + unsigned int error; + atomic_t count; + struct task_struct *sleeper; + io_notify_fn callback; + void *context; +}; + +/* + * We maintain a pool of buffer heads for dispatching the io. + */ +static unsigned int _num_bhs; +static mempool_t *_buffer_pool; + +/* + * io contexts are only dynamically allocated for asynchronous + * io. Since async io is likely to be the majority of io we'll + * have the same number of io contexts as buffer heads ! (FIXME: + * must reduce this). + */ +mempool_t *_io_pool; + +static void *alloc_bh(int gfp_mask, void *pool_data) +{ + struct buffer_head *bh; + + bh = kmem_cache_alloc(bh_cachep, gfp_mask); + if (bh) { + bh->b_reqnext = NULL; + init_waitqueue_head(&bh->b_wait); + INIT_LIST_HEAD(&bh->b_inode_buffers); + } + + return bh; +} + +static void *alloc_io(int gfp_mask, void *pool_data) +{ + return kmalloc(sizeof(struct io_context), gfp_mask); +} + +static void free_io(void *element, void *pool_data) +{ + kfree(element); +} + +static unsigned int pages_to_buffers(unsigned int pages) +{ + return 4 * pages; /* too many ? 
*/ +} + +static int resize_pool(unsigned int new_bhs) +{ + int r = 0; + + if (_buffer_pool) { + if (new_bhs == 0) { + /* free off the pools */ + mempool_destroy(_buffer_pool); + mempool_destroy(_io_pool); + _buffer_pool = _io_pool = NULL; + } else { + /* resize the pools */ + r = mempool_resize(_buffer_pool, new_bhs, GFP_KERNEL); + if (!r) + r = mempool_resize(_io_pool, + new_bhs, GFP_KERNEL); + } + } else { + /* create new pools */ + _buffer_pool = mempool_create(new_bhs, alloc_bh, + mempool_free_slab, bh_cachep); + if (!_buffer_pool) + r = -ENOMEM; + + _io_pool = mempool_create(new_bhs, alloc_io, free_io, NULL); + if (!_io_pool) { + mempool_destroy(_buffer_pool); + _buffer_pool = NULL; + r = -ENOMEM; + } + } + + if (!r) + _num_bhs = new_bhs; + + return r; +} + +int dm_io_get(unsigned int num_pages) +{ + return resize_pool(_num_bhs + pages_to_buffers(num_pages)); +} + +void dm_io_put(unsigned int num_pages) +{ + resize_pool(_num_bhs - pages_to_buffers(num_pages)); +} + +/*----------------------------------------------------------------- + * We need to keep track of which region a buffer is doing io + * for. In order to save a memory allocation we store this in an + * unused field of the buffer head, and provide these access + * functions. + * + * FIXME: add compile time check that an unsigned int can fit + * into a pointer. + * + *---------------------------------------------------------------*/ +static inline void bh_set_region(struct buffer_head *bh, unsigned int region) +{ + bh->b_journal_head = (void *) region; +} + +static inline int bh_get_region(struct buffer_head *bh) +{ + return (unsigned int) bh->b_journal_head; +} + +/*----------------------------------------------------------------- + * We need an io object to keep track of the number of bhs that + * have been dispatched for a particular io. + *---------------------------------------------------------------*/ +static void dec_count(struct io_context *io, unsigned int region, int error) +{ + if (error) + set_bit(region, &io->error); + + if (atomic_dec_and_test(&io->count)) { + if (io->sleeper) + wake_up_process(io->sleeper); + + else { + int r = io->error; + io_notify_fn fn = io->callback; + void *context = io->context; + + mempool_free(io, _io_pool); + fn(r, context); + } + } +} + +static void endio(struct buffer_head *bh, int uptodate) +{ + struct io_context *io = (struct io_context *) bh->b_private; + + if (!uptodate && io->rw != WRITE) { + /* + * We need to zero this region, otherwise people + * like kcopyd may write the arbitrary contents + * of the page. + */ + memset(bh->b_data, 0, bh->b_size); + } + + dec_count((struct io_context *) bh->b_private, + bh_get_region(bh), !uptodate); + mempool_free(bh, _buffer_pool); +} + +/* + * Primitives for alignment calculations. + */ +int fls(unsigned n) +{ + return generic_fls32(n); +} + +static inline int log2_floor(unsigned n) +{ + return ffs(n) - 1; +} + +static inline int log2_align(unsigned n) +{ + return fls(n) - 1; +} + +/* + * Returns the next block for io. 
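+ *
+ * Each buffer_head submitted covers the largest power-of-two run of
+ * blocks that still fits in the remainder of the page and of the
+ * region and that keeps the starting block naturally aligned (see
+ * log2_align() and log2_floor() above), so one page may be dispatched
+ * as several bhs of differing sizes.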
+ */ +static int do_page(kdev_t dev, sector_t *block, sector_t end_block, + unsigned int block_size, + struct page *p, unsigned int offset, + unsigned int region, struct io_context *io) +{ + struct buffer_head *bh; + sector_t b = *block; + sector_t blocks_per_page = PAGE_SIZE / block_size; + unsigned int this_size; /* holds the size of the current io */ + sector_t len; + + if (!blocks_per_page) { + DMERR("dm-io: PAGE_SIZE (%lu) < block_size (%u) unsupported", + PAGE_SIZE, block_size); + return 0; + } + + while ((offset < PAGE_SIZE) && (b != end_block)) { + bh = mempool_alloc(_buffer_pool, GFP_NOIO); + init_buffer(bh, endio, io); + bh_set_region(bh, region); + + /* + * Block size must be a power of 2 and aligned + * correctly. + */ + + len = min(end_block - b, blocks_per_page); + len = min(len, blocks_per_page - offset / block_size); + + if (!len) { + DMERR("dm-io: Invalid offset/block_size (%u/%u).", + offset, block_size); + return 0; + } + + this_size = 1 << log2_align(len); + if (b) + this_size = min(this_size, + (unsigned) 1 << log2_floor(b)); + + /* + * Add in the job offset. + */ + bh->b_blocknr = (b / this_size); + bh->b_size = block_size * this_size; + set_bh_page(bh, p, offset); + bh->b_this_page = bh; + + bh->b_dev = dev; + atomic_set(&bh->b_count, 1); + + bh->b_state = ((1 << BH_Uptodate) | (1 << BH_Mapped) | + (1 << BH_Lock)); + + if (io->rw == WRITE) + clear_bit(BH_Dirty, &bh->b_state); + + atomic_inc(&io->count); + submit_bh(io->rw, bh); + + b += this_size; + offset += block_size * this_size; + } + + *block = b; + return (b == end_block); +} + +static void do_region(unsigned int region, struct io_region *where, + struct page *page, unsigned int offset, + struct io_context *io) +{ + unsigned int block_size = get_hardsect_size(where->dev); + unsigned int sblock_size = block_size >> 9; + sector_t block = where->sector / sblock_size; + sector_t end_block = (where->sector + where->count) / sblock_size; + + while (1) { + if (do_page(where->dev, &block, end_block, block_size, + page, offset, region, io)) + break; + + offset = 0; /* only offset the first page */ + + page = list_entry(page->list.next, struct page, list); + } +} + +static void dispatch_io(unsigned int num_regions, struct io_region *where, + struct page *pages, unsigned int offset, + struct io_context *io) +{ + int i; + + for (i = 0; i < num_regions; i++) + if (where[i].count) + do_region(i, where + i, pages, offset, io); + + /* + * Drop the extra refence that we were holding to avoid + * the io being completed too early. + */ + dec_count(io, 0, 0); +} + +/* + * Synchronous io + */ +int dm_io_sync(unsigned int num_regions, struct io_region *where, + int rw, struct page *pages, unsigned int offset, + unsigned int *error_bits) +{ + struct io_context io; + + BUG_ON(num_regions > 1 && rw != WRITE); + + io.rw = rw; + io.error = 0; + atomic_set(&io.count, 1); /* see dispatch_io() */ + io.sleeper = current; + + dispatch_io(num_regions, where, pages, offset, &io); + run_task_queue(&tq_disk); + + while (1) { + set_current_state(TASK_UNINTERRUPTIBLE); + + if (!atomic_read(&io.count)) + break; + + schedule(); + } + set_current_state(TASK_RUNNING); + + *error_bits = io.error; + return io.error ? 
-EIO : 0; +} + +/* + * Asynchronous io + */ +int dm_io_async(unsigned int num_regions, struct io_region *where, int rw, + struct page *pages, unsigned int offset, + io_notify_fn fn, void *context) +{ + struct io_context *io = mempool_alloc(_io_pool, GFP_NOIO); + + io->rw = rw; + io->error = 0; + atomic_set(&io->count, 1); /* see dispatch_io() */ + io->sleeper = NULL; + io->callback = fn; + io->context = context; + + dispatch_io(num_regions, where, pages, offset, io); + return 0; +} + +EXPORT_SYMBOL(dm_io_get); +EXPORT_SYMBOL(dm_io_put); +EXPORT_SYMBOL(dm_io_sync); +EXPORT_SYMBOL(dm_io_async); --- diff/drivers/md/dm-io.h 1970-01-01 01:00:00.000000000 +0100 +++ source/drivers/md/dm-io.h 2003-12-09 11:13:16.000000000 +0000 @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2003 Sistina Software + * + * This file is released under the GPL. + */ + +#ifndef _DM_IO_H +#define _DM_IO_H + +#include "dm.h" + +#include + +/* Move these to bitops.h eventually */ +/* Improved generic_fls algorithm (in 2.4 there is no generic_fls so far) */ +/* (c) 2002, D.Phillips and Sistina Software */ +/* Licensed under Version 2 of the GPL */ + +static unsigned generic_fls8(unsigned n) +{ + return n & 0xf0 ? + n & 0xc0 ? (n >> 7) + 7 : (n >> 5) + 5: + n & 0x0c ? (n >> 3) + 3 : n - ((n + 1) >> 2); +} + +static inline unsigned generic_fls16(unsigned n) +{ + return n & 0xff00? generic_fls8(n >> 8) + 8 : generic_fls8(n); +} + +static inline unsigned generic_fls32(unsigned n) +{ + return n & 0xffff0000 ? generic_fls16(n >> 16) + 16 : generic_fls16(n); +} + +/* FIXME make this configurable */ +#define DM_MAX_IO_REGIONS 8 + +struct io_region { + kdev_t dev; + sector_t sector; + sector_t count; +}; + + +/* + * 'error' is a bitset, with each bit indicating whether an error + * occurred doing io to the corresponding region. + */ +typedef void (*io_notify_fn)(unsigned int error, void *context); + + +/* + * Before anyone uses the IO interface they should call + * dm_io_get(), specifying roughly how many pages they are + * expecting to perform io on concurrently. + * + * This function may block. + */ +int dm_io_get(unsigned int num_pages); +void dm_io_put(unsigned int num_pages); + + +/* + * Synchronous IO. + * + * Please ensure that the rw flag in the next two functions is + * either READ or WRITE, ie. we don't take READA. Any + * regions with a zero count field will be ignored. + */ +int dm_io_sync(unsigned int num_regions, struct io_region *where, int rw, + struct page *pages, unsigned int offset, + unsigned int *error_bits); + + +/* + * Aynchronous IO. + * + * The 'where' array may be safely allocated on the stack since + * the function takes a copy. + */ +int dm_io_async(unsigned int num_regions, struct io_region *where, int rw, + struct page *pages, unsigned int offset, + io_notify_fn fn, void *context); + +#endif --- diff/drivers/md/kcopyd.c 1970-01-01 01:00:00.000000000 +0100 +++ source/drivers/md/kcopyd.c 2003-12-09 11:13:16.000000000 +0000 @@ -0,0 +1,666 @@ +/* + * Copyright (C) 2002 Sistina Software (UK) Limited. + * + * This file is released under the GPL. 
+ */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kcopyd.h" +#include "dm-daemon.h" + +/* FIXME: this is only needed for the DMERR macros */ +#include "dm.h" + +static struct dm_daemon _kcopyd; + +#define SECTORS_PER_PAGE (PAGE_SIZE / SECTOR_SIZE) +#define SUB_JOB_SIZE 128 +#define PAGES_PER_SUB_JOB (SUB_JOB_SIZE / SECTORS_PER_PAGE) +#define SUB_JOB_COUNT 8 + +/*----------------------------------------------------------------- + * Each kcopyd client has its own little pool of preallocated + * pages for kcopyd io. + *---------------------------------------------------------------*/ +struct kcopyd_client { + struct list_head list; + + spinlock_t lock; + struct list_head pages; + unsigned int nr_pages; + unsigned int nr_free_pages; + unsigned int max_split; +}; + +static inline void __push_page(struct kcopyd_client *kc, struct page *p) +{ + list_add(&p->list, &kc->pages); + kc->nr_free_pages++; +} + +static inline struct page *__pop_page(struct kcopyd_client *kc) +{ + struct page *p; + + p = list_entry(kc->pages.next, struct page, list); + list_del(&p->list); + kc->nr_free_pages--; + + return p; +} + +static int kcopyd_get_pages(struct kcopyd_client *kc, + unsigned int nr, struct list_head *pages) +{ + struct page *p; + INIT_LIST_HEAD(pages); + + spin_lock(&kc->lock); + if (kc->nr_free_pages < nr) { + spin_unlock(&kc->lock); + return -ENOMEM; + } + + while (nr--) { + p = __pop_page(kc); + list_add(&p->list, pages); + } + spin_unlock(&kc->lock); + + return 0; +} + +static void kcopyd_put_pages(struct kcopyd_client *kc, struct list_head *pages) +{ + struct list_head *tmp, *tmp2; + + spin_lock(&kc->lock); + list_for_each_safe (tmp, tmp2, pages) + __push_page(kc, list_entry(tmp, struct page, list)); + spin_unlock(&kc->lock); +} + +/* + * These three functions resize the page pool. + */ +static void release_pages(struct list_head *pages) +{ + struct page *p; + struct list_head *tmp, *tmp2; + + list_for_each_safe (tmp, tmp2, pages) { + p = list_entry(tmp, struct page, list); + UnlockPage(p); + __free_page(p); + } +} + +static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr) +{ + unsigned int i; + struct page *p; + LIST_HEAD(new); + + for (i = 0; i < nr; i++) { + p = alloc_page(GFP_KERNEL); + if (!p) { + release_pages(&new); + return -ENOMEM; + } + + LockPage(p); + list_add(&p->list, &new); + } + + kcopyd_put_pages(kc, &new); + kc->nr_pages += nr; + kc->max_split = kc->nr_pages / PAGES_PER_SUB_JOB; + if (kc->max_split > SUB_JOB_COUNT) + kc->max_split = SUB_JOB_COUNT; + + return 0; +} + +static void client_free_pages(struct kcopyd_client *kc) +{ + BUG_ON(kc->nr_free_pages != kc->nr_pages); + release_pages(&kc->pages); + kc->nr_free_pages = kc->nr_pages = 0; +} + +/*----------------------------------------------------------------- + * kcopyd_jobs need to be allocated by the *clients* of kcopyd, + * for this reason we use a mempool to prevent the client from + * ever having to do io (which could cause a deadlock). + *---------------------------------------------------------------*/ +struct kcopyd_job { + struct kcopyd_client *kc; + struct list_head list; + unsigned int flags; + + /* + * Error state of the job. + */ + int read_err; + unsigned int write_err; + + /* + * Either READ or WRITE + */ + int rw; + struct io_region source; + + /* + * The destinations for the transfer. 
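+ * Every destination receives the same data, written from the pages
+ * that the source region was read into.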
+ */ + unsigned int num_dests; + struct io_region dests[KCOPYD_MAX_REGIONS]; + + sector_t offset; + unsigned int nr_pages; + struct list_head pages; + + /* + * Set this to ensure you are notified when the job has + * completed. 'context' is for callback to use. + */ + kcopyd_notify_fn fn; + void *context; + + /* + * These fields are only used if the job has been split + * into more manageable parts. + */ + struct semaphore lock; + atomic_t sub_jobs; + sector_t progress; +}; + +/* FIXME: this should scale with the number of pages */ +#define MIN_JOBS 512 + +static kmem_cache_t *_job_cache; +static mempool_t *_job_pool; + +/* + * We maintain three lists of jobs: + * + * i) jobs waiting for pages + * ii) jobs that have pages, and are waiting for the io to be issued. + * iii) jobs that have completed. + * + * All three of these are protected by job_lock. + */ +static spinlock_t _job_lock = SPIN_LOCK_UNLOCKED; + +static LIST_HEAD(_complete_jobs); +static LIST_HEAD(_io_jobs); +static LIST_HEAD(_pages_jobs); + +static int jobs_init(void) +{ + INIT_LIST_HEAD(&_complete_jobs); + INIT_LIST_HEAD(&_io_jobs); + INIT_LIST_HEAD(&_pages_jobs); + + _job_cache = kmem_cache_create("kcopyd-jobs", + sizeof(struct kcopyd_job), + __alignof__(struct kcopyd_job), + 0, NULL, NULL); + if (!_job_cache) + return -ENOMEM; + + _job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab, + mempool_free_slab, _job_cache); + if (!_job_pool) { + kmem_cache_destroy(_job_cache); + return -ENOMEM; + } + + return 0; +} + +static void jobs_exit(void) +{ + BUG_ON(!list_empty(&_complete_jobs)); + BUG_ON(!list_empty(&_io_jobs)); + BUG_ON(!list_empty(&_pages_jobs)); + + mempool_destroy(_job_pool); + kmem_cache_destroy(_job_cache); +} + +/* + * Functions to push and pop a job onto the head of a given job + * list. + */ +static inline struct kcopyd_job *pop(struct list_head *jobs) +{ + struct kcopyd_job *job = NULL; + unsigned long flags; + + spin_lock_irqsave(&_job_lock, flags); + + if (!list_empty(jobs)) { + job = list_entry(jobs->next, struct kcopyd_job, list); + list_del(&job->list); + } + spin_unlock_irqrestore(&_job_lock, flags); + + return job; +} + +static inline void push(struct list_head *jobs, struct kcopyd_job *job) +{ + unsigned long flags; + + spin_lock_irqsave(&_job_lock, flags); + list_add_tail(&job->list, jobs); + spin_unlock_irqrestore(&_job_lock, flags); +} + +/* + * These three functions process 1 item from the corresponding + * job list. + * + * They return: + * < 0: error + * 0: success + * > 0: can't process yet. + */ +static int run_complete_job(struct kcopyd_job *job) +{ + void *context = job->context; + int read_err = job->read_err; + unsigned int write_err = job->write_err; + kcopyd_notify_fn fn = job->fn; + + kcopyd_put_pages(job->kc, &job->pages); + mempool_free(job, _job_pool); + fn(read_err, write_err, context); + return 0; +} + +static void complete_io(unsigned int error, void *context) +{ + struct kcopyd_job *job = (struct kcopyd_job *) context; + + if (error) { + if (job->rw == WRITE) + job->write_err &= error; + else + job->read_err = 1; + + if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) { + push(&_complete_jobs, job); + dm_daemon_wake(&_kcopyd); + return; + } + } + + if (job->rw == WRITE) + push(&_complete_jobs, job); + + else { + job->rw = WRITE; + push(&_io_jobs, job); + } + + dm_daemon_wake(&_kcopyd); +} + +/* + * Request io on as many buffer heads as we can currently get for + * a particular job. 
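+ *
+ * A job is first issued as a single read of the source region; when
+ * that read completes, complete_io() re-queues it as a write to every
+ * destination.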
+ */ +static int run_io_job(struct kcopyd_job *job) +{ + int r; + + if (job->rw == READ) + r = dm_io_async(1, &job->source, job->rw, + list_entry(job->pages.next, struct page, list), + job->offset, complete_io, job); + + else + r = dm_io_async(job->num_dests, job->dests, job->rw, + list_entry(job->pages.next, struct page, list), + job->offset, complete_io, job); + + return r; +} + +static int run_pages_job(struct kcopyd_job *job) +{ + int r; + + job->nr_pages = dm_div_up(job->dests[0].count + job->offset, + SECTORS_PER_PAGE); + r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages); + if (!r) { + /* this job is ready for io */ + push(&_io_jobs, job); + return 0; + } + + if (r == -ENOMEM) + /* can't complete now */ + return 1; + + return r; +} + +/* + * Run through a list for as long as possible. Returns the count + * of successful jobs. + */ +static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) +{ + struct kcopyd_job *job; + int r, count = 0; + + while ((job = pop(jobs))) { + + r = fn(job); + + if (r < 0) { + /* error this rogue job */ + if (job->rw == WRITE) + job->write_err = (unsigned int) -1; + else + job->read_err = 1; + push(&_complete_jobs, job); + break; + } + + if (r > 0) { + /* + * We couldn't service this job ATM, so + * push this job back onto the list. + */ + push(jobs, job); + break; + } + + count++; + } + + return count; +} + +/* + * kcopyd does this every time it's woken up. + */ +static void do_work(void) +{ + /* + * The order that these are called is *very* important. + * complete jobs can free some pages for pages jobs. + * Pages jobs when successful will jump onto the io jobs + * list. io jobs call wake when they complete and it all + * starts again. + */ + process_jobs(&_complete_jobs, run_complete_job); + process_jobs(&_pages_jobs, run_pages_job); + process_jobs(&_io_jobs, run_io_job); + run_task_queue(&tq_disk); +} + +/* + * If we are copying a small region we just dispatch a single job + * to do the copy, otherwise the io has to be split up into many + * jobs. + */ +static void dispatch_job(struct kcopyd_job *job) +{ + push(&_pages_jobs, job); + dm_daemon_wake(&_kcopyd); +} + +static void segment_complete(int read_err, + unsigned int write_err, void *context) +{ + /* FIXME: tidy this function */ + sector_t progress = 0; + sector_t count = 0; + struct kcopyd_job *job = (struct kcopyd_job *) context; + + down(&job->lock); + + /* update the error */ + if (read_err) + job->read_err = 1; + + if (write_err) + job->write_err &= write_err; + + /* + * Only dispatch more work if there hasn't been an error. + */ + if ((!job->read_err && !job->write_err) || + test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) { + /* get the next chunk of work */ + progress = job->progress; + count = job->source.count - progress; + if (count) { + if (count > SUB_JOB_SIZE) + count = SUB_JOB_SIZE; + + job->progress += count; + } + } + up(&job->lock); + + if (count) { + int i; + struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO); + + memcpy(sub_job, job, sizeof(*job)); + sub_job->source.sector += progress; + sub_job->source.count = count; + + for (i = 0; i < job->num_dests; i++) { + sub_job->dests[i].sector += progress; + sub_job->dests[i].count = count; + } + + sub_job->fn = segment_complete; + sub_job->context = job; + dispatch_job(sub_job); + + } else if (atomic_dec_and_test(&job->sub_jobs)) { + + /* + * To avoid a race we must keep the job around + * until after the notify function has completed. 
+ * Otherwise the client may try and stop the job + * after we've completed. + */ + job->fn(read_err, write_err, job->context); + mempool_free(job, _job_pool); + } +} + +/* + * Create some little jobs that will do the move between + * them. + */ +static void split_job(struct kcopyd_job *job) +{ + int nr; + + nr = dm_div_up(job->source.count, SUB_JOB_SIZE); + if (nr > job->kc->max_split) + nr = job->kc->max_split; + + atomic_set(&job->sub_jobs, nr); + while (nr--) + segment_complete(0, 0u, job); +} + +int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from, + unsigned int num_dests, struct io_region *dests, + unsigned int flags, kcopyd_notify_fn fn, void *context) +{ + struct kcopyd_job *job; + + /* + * Allocate a new job. + */ + job = mempool_alloc(_job_pool, GFP_NOIO); + + /* + * set up for the read. + */ + job->kc = kc; + job->flags = flags; + job->read_err = 0; + job->write_err = 0; + job->rw = READ; + + memcpy(&job->source, from, sizeof(*from)); + + job->num_dests = num_dests; + memcpy(&job->dests, dests, sizeof(*dests) * num_dests); + + job->offset = 0; + job->nr_pages = 0; + INIT_LIST_HEAD(&job->pages); + + job->fn = fn; + job->context = context; + + if (job->source.count < SUB_JOB_SIZE) + dispatch_job(job); + + else { + init_MUTEX(&job->lock); + job->progress = 0; + split_job(job); + } + + return 0; +} + +/* + * Cancels a kcopyd job, eg. someone might be deactivating a + * mirror. + */ +int kcopyd_cancel(struct kcopyd_job *job, int block) +{ + /* FIXME: finish */ + return -1; +} + +/*----------------------------------------------------------------- + * Unit setup + *---------------------------------------------------------------*/ +static DECLARE_MUTEX(_client_lock); +static LIST_HEAD(_clients); + +static int client_add(struct kcopyd_client *kc) +{ + down(&_client_lock); + list_add(&kc->list, &_clients); + up(&_client_lock); + return 0; +} + +static void client_del(struct kcopyd_client *kc) +{ + down(&_client_lock); + list_del(&kc->list); + up(&_client_lock); +} + +int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result) +{ + int r = 0; + struct kcopyd_client *kc; + + if (nr_pages * SECTORS_PER_PAGE < SUB_JOB_SIZE) { + DMERR("kcopyd client requested %u pages: minimum is %lu", + nr_pages, SUB_JOB_SIZE / SECTORS_PER_PAGE); + return -ENOMEM; + } + + kc = kmalloc(sizeof(*kc), GFP_KERNEL); + if (!kc) + return -ENOMEM; + + kc->lock = SPIN_LOCK_UNLOCKED; + INIT_LIST_HEAD(&kc->pages); + kc->nr_pages = kc->nr_free_pages = 0; + r = client_alloc_pages(kc, nr_pages); + if (r) { + kfree(kc); + return r; + } + + r = dm_io_get(nr_pages); + if (r) { + client_free_pages(kc); + kfree(kc); + return r; + } + + r = client_add(kc); + if (r) { + dm_io_put(nr_pages); + client_free_pages(kc); + kfree(kc); + return r; + } + + *result = kc; + return 0; +} + +void kcopyd_client_destroy(struct kcopyd_client *kc) +{ + dm_io_put(kc->nr_pages); + client_free_pages(kc); + client_del(kc); + kfree(kc); +} + + +int __init kcopyd_init(void) +{ + int r; + + r = jobs_init(); + if (r) + return r; + + r = dm_daemon_start(&_kcopyd, "kcopyd", do_work); + if (r) + jobs_exit(); + + return r; +} + +void kcopyd_exit(void) +{ + jobs_exit(); + dm_daemon_stop(&_kcopyd); +} + +EXPORT_SYMBOL(kcopyd_client_create); +EXPORT_SYMBOL(kcopyd_client_destroy); +EXPORT_SYMBOL(kcopyd_copy); +EXPORT_SYMBOL(kcopyd_cancel); --- diff/drivers/md/kcopyd.h 1970-01-01 01:00:00.000000000 +0100 +++ source/drivers/md/kcopyd.h 2003-12-09 11:13:16.000000000 +0000 @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2001 Sistina Software + 
* + * This file is released under the GPL. + */ + +#ifndef DM_KCOPYD_H +#define DM_KCOPYD_H + +/* + * Needed for the definition of offset_t. + */ +#include +#include + +#include "dm-io.h" + +int kcopyd_init(void); +void kcopyd_exit(void); + +/* FIXME: make this configurable */ +#define KCOPYD_MAX_REGIONS 8 + +#define KCOPYD_IGNORE_ERROR 1 + +/* + * To use kcopyd you must first create a kcopyd client object. + */ +struct kcopyd_client; +int kcopyd_client_create(unsigned int num_pages, struct kcopyd_client **result); +void kcopyd_client_destroy(struct kcopyd_client *kc); + +/* + * Submit a copy job to kcopyd. This is built on top of the + * previous three fns. + * + * read_err is a boolean, + * write_err is a bitset, with 1 bit for each destination region + */ +typedef void (*kcopyd_notify_fn)(int read_err, + unsigned int write_err, void *context); + +int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from, + unsigned int num_dests, struct io_region *dests, + unsigned int flags, kcopyd_notify_fn fn, void *context); + +#endif
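
Example usage (illustrative sketch, not part of the patch): the fragment below shows how a caller such as a snapshot or mirror target might drive the interface declared in kcopyd.h. The copy_context/copy_done/copy_chunk names, the 32-page pool size and the single-destination setup are made-up placeholders; the types and functions used (struct io_region, kcopyd_notify_fn, kcopyd_client_create(), kcopyd_copy()) are exactly those declared above.

#include "kcopyd.h"

struct copy_context {
	struct kcopyd_client *kc;
	int read_err;			/* boolean */
	unsigned int write_err;		/* one bit per destination region */
};

/* Runs in the kcopyd daemon once the read and all the writes are done. */
static void copy_done(int read_err, unsigned int write_err, void *context)
{
	struct copy_context *cc = (struct copy_context *) context;

	cc->read_err = read_err;
	cc->write_err = write_err;
}

/*
 * Typically done once, when the client (eg. a mirror target) is created.
 * 32 pages is an arbitrary choice; kcopyd_client_create() rejects pools
 * smaller than one 128-sector sub-job.
 */
static int copy_client_init(struct copy_context *cc)
{
	return kcopyd_client_create(32, &cc->kc);
}

/* Kick off an asynchronous copy of 'count' sectors. */
static int copy_chunk(struct copy_context *cc, kdev_t from_dev, kdev_t to_dev,
		      sector_t sector, sector_t count)
{
	struct io_region from, to;

	from.dev = from_dev;
	from.sector = sector;
	from.count = count;

	to.dev = to_dev;
	to.sector = sector;
	to.count = count;

	/* One source, one destination, no flags. */
	return kcopyd_copy(cc->kc, &from, 1, &to, 0, copy_done, cc);
}

The notify function is called from the kcopyd daemon, so the caller must provide its own synchronisation with that context, and kcopyd_client_destroy() should only be called once all of the client's outstanding jobs have completed.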