kcopyd --- diff/drivers/md/Makefile 2004-02-23 14:06:37.000000000 +0000 +++ source/drivers/md/Makefile 2004-02-23 14:06:44.000000000 +0000 @@ -3,7 +3,7 @@ # dm-mod-objs := dm.o dm-table.o dm-target.o dm-linear.o dm-stripe.o \ - dm-ioctl.o dm-io.o + dm-ioctl.o dm-io.o kcopyd.o dm-multipath-objs := dm-path-selector.o dm-mpath.o raid6-objs := raid6main.o raid6algos.o raid6recov.o raid6tables.o \ raid6int1.o raid6int2.o raid6int4.o \ --- diff/drivers/md/dm.c 2004-02-23 13:56:43.000000000 +0000 +++ source/drivers/md/dm.c 2004-02-23 14:06:44.000000000 +0000 @@ -150,6 +150,7 @@ static struct { xx(dm_target) xx(dm_linear) xx(dm_stripe) + xx(kcopyd) xx(dm_interface) #undef xx }; --- diff/drivers/md/dm.h 2004-02-23 13:56:43.000000000 +0000 +++ source/drivers/md/dm.h 2004-02-23 14:06:44.000000000 +0000 @@ -179,6 +179,9 @@ void dm_linear_exit(void); int dm_stripe_init(void); void dm_stripe_exit(void); +int kcopyd_init(void); +void kcopyd_exit(void); + void *dm_vcalloc(unsigned long nmemb, unsigned long elem_size); #endif --- diff/drivers/md/kcopyd.c 1970-01-01 01:00:00.000000000 +0100 +++ source/drivers/md/kcopyd.c 2004-02-23 14:06:44.000000000 +0000 @@ -0,0 +1,654 @@ +/* + * Copyright (C) 2002 Sistina Software (UK) Limited. + * + * This file is released under the GPL. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kcopyd.h" + +/* FIXME: this is only needed for the DMERR macros */ +#include "dm.h" + +static struct workqueue_struct *_kcopyd_wq; +static struct work_struct _kcopyd_work; + +static inline void wake(void) +{ + queue_work(_kcopyd_wq, &_kcopyd_work); +} + +/*----------------------------------------------------------------- + * Each kcopyd client has its own little pool of preallocated + * pages for kcopyd io. + *---------------------------------------------------------------*/ +struct kcopyd_client { + struct list_head list; + + spinlock_t lock; + struct list_head pages; + unsigned int nr_pages; + unsigned int nr_free_pages; +}; + +static inline void __push_page(struct kcopyd_client *kc, struct page *p) +{ + list_add(&p->list, &kc->pages); + kc->nr_free_pages++; +} + +static inline struct page *__pop_page(struct kcopyd_client *kc) +{ + struct page *p; + + p = list_entry(kc->pages.next, struct page, list); + list_del(&p->list); + kc->nr_free_pages--; + + return p; +} + +static int kcopyd_get_pages(struct kcopyd_client *kc, + unsigned int nr, struct list_head *pages) +{ + struct page *p; + INIT_LIST_HEAD(pages); + + spin_lock(&kc->lock); + if (kc->nr_free_pages < nr) { + spin_unlock(&kc->lock); + return -ENOMEM; + } + + while (nr--) { + p = __pop_page(kc); + list_add(&p->list, pages); + } + spin_unlock(&kc->lock); + + return 0; +} + +static void kcopyd_put_pages(struct kcopyd_client *kc, struct list_head *pages) +{ + struct page *page, *next; + + spin_lock(&kc->lock); + list_for_each_entry_safe (page, next, pages, list) + __push_page(kc, page); + spin_unlock(&kc->lock); +} + +/* + * These three functions resize the page pool. + */ +static void drop_pages(struct list_head *pages) +{ + struct page *page, *next; + + list_for_each_entry_safe (page, next, pages, list) { + ClearPageLocked(page); + __free_page(page); + } +} + +static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr) +{ + unsigned int i; + struct page *p; + LIST_HEAD(new); + + for (i = 0; i < nr; i++) { + p = alloc_page(GFP_KERNEL); + if (!p) { + drop_pages(&new); + return -ENOMEM; + } + + SetPageLocked(p); + list_add(&p->list, &new); + } + + kcopyd_put_pages(kc, &new); + kc->nr_pages += nr; + return 0; +} + +static void client_free_pages(struct kcopyd_client *kc) +{ + BUG_ON(kc->nr_free_pages != kc->nr_pages); + drop_pages(&kc->pages); + kc->nr_free_pages = kc->nr_pages = 0; +} + +/*----------------------------------------------------------------- + * kcopyd_jobs need to be allocated by the *clients* of kcopyd, + * for this reason we use a mempool to prevent the client from + * ever having to do io (which could cause a deadlock). + *---------------------------------------------------------------*/ +struct kcopyd_job { + struct kcopyd_client *kc; + struct list_head list; + unsigned long flags; + + /* + * Error state of the job. + */ + int read_err; + unsigned int write_err; + + /* + * Either READ or WRITE + */ + int rw; + struct io_region source; + + /* + * The destinations for the transfer. + */ + unsigned int num_dests; + struct io_region dests[KCOPYD_MAX_REGIONS]; + + sector_t offset; + unsigned int nr_pages; + struct list_head pages; + + /* + * Set this to ensure you are notified when the job has + * completed. 'context' is for callback to use. + */ + kcopyd_notify_fn fn; + void *context; + + /* + * These fields are only used if the job has been split + * into more manageable parts. + */ + struct semaphore lock; + atomic_t sub_jobs; + sector_t progress; +}; + +/* FIXME: this should scale with the number of pages */ +#define MIN_JOBS 512 + +static kmem_cache_t *_job_cache; +static mempool_t *_job_pool; + +/* + * We maintain three lists of jobs: + * + * i) jobs waiting for pages + * ii) jobs that have pages, and are waiting for the io to be issued. + * iii) jobs that have completed. + * + * All three of these are protected by job_lock. + */ +static spinlock_t _job_lock = SPIN_LOCK_UNLOCKED; + +static LIST_HEAD(_complete_jobs); +static LIST_HEAD(_io_jobs); +static LIST_HEAD(_pages_jobs); + +static int jobs_init(void) +{ + INIT_LIST_HEAD(&_complete_jobs); + INIT_LIST_HEAD(&_io_jobs); + INIT_LIST_HEAD(&_pages_jobs); + + _job_cache = kmem_cache_create("kcopyd-jobs", + sizeof(struct kcopyd_job), + __alignof__(struct kcopyd_job), + 0, NULL, NULL); + if (!_job_cache) + return -ENOMEM; + + _job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab, + mempool_free_slab, _job_cache); + if (!_job_pool) { + kmem_cache_destroy(_job_cache); + return -ENOMEM; + } + + return 0; +} + +static void jobs_exit(void) +{ + BUG_ON(!list_empty(&_complete_jobs)); + BUG_ON(!list_empty(&_io_jobs)); + BUG_ON(!list_empty(&_pages_jobs)); + + mempool_destroy(_job_pool); + kmem_cache_destroy(_job_cache); +} + +/* + * Functions to push and pop a job onto the head of a given job + * list. + */ +static inline struct kcopyd_job *pop(struct list_head *jobs) +{ + struct kcopyd_job *job = NULL; + unsigned long flags; + + spin_lock_irqsave(&_job_lock, flags); + + if (!list_empty(jobs)) { + job = list_entry(jobs->next, struct kcopyd_job, list); + list_del(&job->list); + } + spin_unlock_irqrestore(&_job_lock, flags); + + return job; +} + +static inline void push(struct list_head *jobs, struct kcopyd_job *job) +{ + unsigned long flags; + + spin_lock_irqsave(&_job_lock, flags); + list_add_tail(&job->list, jobs); + spin_unlock_irqrestore(&_job_lock, flags); +} + +/* + * These three functions process 1 item from the corresponding + * job list. + * + * They return: + * < 0: error + * 0: success + * > 0: can't process yet. + */ +static int run_complete_job(struct kcopyd_job *job) +{ + void *context = job->context; + int read_err = job->read_err; + unsigned int write_err = job->write_err; + kcopyd_notify_fn fn = job->fn; + + kcopyd_put_pages(job->kc, &job->pages); + mempool_free(job, _job_pool); + fn(read_err, write_err, context); + return 0; +} + +static void complete_io(unsigned long error, void *context) +{ + struct kcopyd_job *job = (struct kcopyd_job *) context; + + if (error) { + if (job->rw == WRITE) + job->write_err &= error; + else + job->read_err = 1; + + if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) { + push(&_complete_jobs, job); + wake(); + return; + } + } + + if (job->rw == WRITE) + push(&_complete_jobs, job); + + else { + job->rw = WRITE; + push(&_io_jobs, job); + } + + wake(); +} + +/* + * Request io on as many buffer heads as we can currently get for + * a particular job. + */ +static int run_io_job(struct kcopyd_job *job) +{ + int r; + + if (job->rw == READ) + r = dm_io_async(1, &job->source, job->rw, + list_entry(job->pages.next, struct page, list), + job->offset, complete_io, job); + + else + r = dm_io_async(job->num_dests, job->dests, job->rw, + list_entry(job->pages.next, struct page, list), + job->offset, complete_io, job); + + return r; +} + +static int run_pages_job(struct kcopyd_job *job) +{ + int r; + + job->nr_pages = dm_div_up(job->dests[0].count + job->offset, + PAGE_SIZE >> 9); + r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages); + if (!r) { + /* this job is ready for io */ + push(&_io_jobs, job); + return 0; + } + + if (r == -ENOMEM) + /* can't complete now */ + return 1; + + return r; +} + +/* + * Run through a list for as long as possible. Returns the count + * of successful jobs. + */ +static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *)) +{ + struct kcopyd_job *job; + int r, count = 0; + + while ((job = pop(jobs))) { + + r = fn(job); + + if (r < 0) { + /* error this rogue job */ + if (job->rw == WRITE) + job->write_err = (unsigned int) -1; + else + job->read_err = 1; + push(&_complete_jobs, job); + break; + } + + if (r > 0) { + /* + * We couldn't service this job ATM, so + * push this job back onto the list. + */ + push(jobs, job); + break; + } + + count++; + } + + return count; +} + +/* + * kcopyd does this every time it's woken up. + */ +static void do_work(void *ignored) +{ + /* + * The order that these are called is *very* important. + * complete jobs can free some pages for pages jobs. + * Pages jobs when successful will jump onto the io jobs + * list. io jobs call wake when they complete and it all + * starts again. + */ + process_jobs(&_complete_jobs, run_complete_job); + process_jobs(&_pages_jobs, run_pages_job); + process_jobs(&_io_jobs, run_io_job); + + blk_run_queues(); +} + +/* + * If we are copying a small region we just dispatch a single job + * to do the copy, otherwise the io has to be split up into many + * jobs. + */ +static void dispatch_job(struct kcopyd_job *job) +{ + push(&_pages_jobs, job); + wake(); +} + +#define SUB_JOB_SIZE 128 +static void segment_complete(int read_err, + unsigned int write_err, void *context) +{ + /* FIXME: tidy this function */ + sector_t progress = 0; + sector_t count = 0; + struct kcopyd_job *job = (struct kcopyd_job *) context; + + down(&job->lock); + + /* update the error */ + if (read_err) + job->read_err = 1; + + if (write_err) + job->write_err &= write_err; + + /* + * Only dispatch more work if there hasn't been an error. + */ + if ((!job->read_err && !job->write_err) || + test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) { + /* get the next chunk of work */ + progress = job->progress; + count = job->source.count - progress; + if (count) { + if (count > SUB_JOB_SIZE) + count = SUB_JOB_SIZE; + + job->progress += count; + } + } + up(&job->lock); + + if (count) { + int i; + struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO); + + memcpy(sub_job, job, sizeof(*job)); + sub_job->source.sector += progress; + sub_job->source.count = count; + + for (i = 0; i < job->num_dests; i++) { + sub_job->dests[i].sector += progress; + sub_job->dests[i].count = count; + } + + sub_job->fn = segment_complete; + sub_job->context = job; + dispatch_job(sub_job); + + } else if (atomic_dec_and_test(&job->sub_jobs)) { + + /* + * To avoid a race we must keep the job around + * until after the notify function has completed. + * Otherwise the client may try and stop the job + * after we've completed. + */ + job->fn(read_err, write_err, job->context); + mempool_free(job, _job_pool); + } +} + +/* + * Create some little jobs that will do the move between + * them. + */ +#define SPLIT_COUNT 8 +static void split_job(struct kcopyd_job *job) +{ + int i; + + atomic_set(&job->sub_jobs, SPLIT_COUNT); + for (i = 0; i < SPLIT_COUNT; i++) + segment_complete(0, 0u, job); +} + +int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from, + unsigned int num_dests, struct io_region *dests, + unsigned int flags, kcopyd_notify_fn fn, void *context) +{ + struct kcopyd_job *job; + + /* + * Allocate a new job. + */ + job = mempool_alloc(_job_pool, GFP_NOIO); + + /* + * set up for the read. + */ + job->kc = kc; + job->flags = flags; + job->read_err = 0; + job->write_err = 0; + job->rw = READ; + + memcpy(&job->source, from, sizeof(*from)); + + job->num_dests = num_dests; + memcpy(&job->dests, dests, sizeof(*dests) * num_dests); + + job->offset = 0; + job->nr_pages = 0; + INIT_LIST_HEAD(&job->pages); + + job->fn = fn; + job->context = context; + + if (job->source.count < SUB_JOB_SIZE) + dispatch_job(job); + + else { + init_MUTEX(&job->lock); + job->progress = 0; + split_job(job); + } + + return 0; +} + +/* + * Cancels a kcopyd job, eg. someone might be deactivating a + * mirror. + */ +int kcopyd_cancel(struct kcopyd_job *job, int block) +{ + /* FIXME: finish */ + return -1; +} + +/*----------------------------------------------------------------- + * Unit setup + *---------------------------------------------------------------*/ +static DECLARE_MUTEX(_client_lock); +static LIST_HEAD(_clients); + +static int client_add(struct kcopyd_client *kc) +{ + down(&_client_lock); + list_add(&kc->list, &_clients); + up(&_client_lock); + return 0; +} + +static void client_del(struct kcopyd_client *kc) +{ + down(&_client_lock); + list_del(&kc->list); + up(&_client_lock); +} + +int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result) +{ + int r = 0; + struct kcopyd_client *kc; + + kc = kmalloc(sizeof(*kc), GFP_KERNEL); + if (!kc) + return -ENOMEM; + + kc->lock = SPIN_LOCK_UNLOCKED; + INIT_LIST_HEAD(&kc->pages); + kc->nr_pages = kc->nr_free_pages = 0; + r = client_alloc_pages(kc, nr_pages); + if (r) { + kfree(kc); + return r; + } + + r = dm_io_get(nr_pages); + if (r) { + client_free_pages(kc); + kfree(kc); + return r; + } + + r = client_add(kc); + if (r) { + dm_io_put(nr_pages); + client_free_pages(kc); + kfree(kc); + return r; + } + + *result = kc; + return 0; +} + +void kcopyd_client_destroy(struct kcopyd_client *kc) +{ + dm_io_put(kc->nr_pages); + client_free_pages(kc); + client_del(kc); + kfree(kc); +} + + +int __init kcopyd_init(void) +{ + int r; + + r = jobs_init(); + if (r) + return r; + + _kcopyd_wq = create_workqueue("kcopyd"); + if (!_kcopyd_wq) { + jobs_exit(); + return -ENOMEM; + } + + INIT_WORK(&_kcopyd_work, do_work, NULL); + return 0; +} + +void kcopyd_exit(void) +{ + jobs_exit(); + destroy_workqueue(_kcopyd_wq); +} + +EXPORT_SYMBOL(kcopyd_client_create); +EXPORT_SYMBOL(kcopyd_client_destroy); +EXPORT_SYMBOL(kcopyd_copy); +EXPORT_SYMBOL(kcopyd_cancel); --- diff/drivers/md/kcopyd.h 1970-01-01 01:00:00.000000000 +0100 +++ source/drivers/md/kcopyd.h 2004-02-23 14:06:44.000000000 +0000 @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2001 Sistina Software + * + * This file is released under the GPL. + */ + +#ifndef DM_KCOPYD_H +#define DM_KCOPYD_H + +#include "dm-io.h" + +int kcopyd_init(void); +void kcopyd_exit(void); + +/* FIXME: make this configurable */ +#define KCOPYD_MAX_REGIONS 8 + +#define KCOPYD_IGNORE_ERROR 1 + +/* + * To use kcopyd you must first create a kcopyd client object. + */ +struct kcopyd_client; +int kcopyd_client_create(unsigned int num_pages, struct kcopyd_client **result); +void kcopyd_client_destroy(struct kcopyd_client *kc); + +/* + * Submit a copy job to kcopyd. This is built on top of the + * previous three fns. + * + * read_err is a boolean, + * write_err is a bitset, with 1 bit for each destination region + */ +typedef void (*kcopyd_notify_fn)(int read_err, + unsigned int write_err, void *context); + +int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from, + unsigned int num_dests, struct io_region *dests, + unsigned int flags, kcopyd_notify_fn fn, void *context); + +#endif