/*
 * kcopyd.c
 *
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 *
 * This file is released under the GPL.
 */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/device-mapper.h>

#include "dm.h"

/* Hard sector size used all over the kernel */
#define SECTOR_SIZE 512

/* Number of entries in the free list */
#define FREE_LIST_SIZE 32

/* Number of iobufs we have, therefore the number of I/Os we can be
   doing at once */
#define NUM_IOBUFS 16

/* Slab cache for work entries when the freelist runs out */
static kmem_cache_t *entry_cachep;

/* Structure of work to do in the list */
struct copy_work {
	unsigned long fromsec;
	unsigned long tosec;
	unsigned long nr_sectors;
	unsigned long done_sectors;
	kdev_t fromdev;
	kdev_t todev;
	int throttle;
	int priority;		/* 0=highest */
	void (*callback)(copy_cb_reason_t, void *, long);
	void *context;		/* Parameter for callback */
	int freelist;		/* Whether we came from the free list */
	struct iobuf_entry *iobuf;
	struct list_head list;
};

/* The free list of iobufs */
struct iobuf_entry {
	struct kiobuf *iobuf;
	struct copy_work *work;	/* Work entry we are doing */
	struct list_head list;
	copy_cb_reason_t complete_reason;
	long nr_sectors;
	int rw;
};

static LIST_HEAD(work_list);	 /* Work to do or multiple-read blocks in progress */
static LIST_HEAD(write_list);	 /* Writes to do */
static LIST_HEAD(free_list);	 /* Free work units */
static LIST_HEAD(iobuf_list);	 /* Free iobufs */
static LIST_HEAD(complete_list); /* Work entries completed, waiting notification */

static struct task_struct *copy_task = NULL;
static struct rw_semaphore work_list_lock;
static struct rw_semaphore free_list_lock;
static spinlock_t write_list_spinlock = SPIN_LOCK_UNLOCKED;
static spinlock_t complete_list_spinlock = SPIN_LOCK_UNLOCKED;
static DECLARE_MUTEX(start_lock);
static DECLARE_MUTEX(run_lock);
static DECLARE_WAIT_QUEUE_HEAD(start_waitq);
static DECLARE_WAIT_QUEUE_HEAD(work_waitq);
static DECLARE_WAIT_QUEUE_HEAD(freelist_waitq);
static int thread_exit = 0;
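/*
 * Overview of a copy's lifecycle (a summary of the code below): a
 * copy_work entry is queued on work_list in priority order; the kcopyd
 * thread pairs it with a free iobuf_entry and issues an asynchronous
 * READ of up to KIO_MAX_SECTORS.  On read completion end_kiobuf() moves
 * the pair to write_list, the thread issues the matching WRITE, and the
 * finished entry is parked on complete_list so that the final callback
 * runs in thread context.  Progress callbacks for multi-chunk copies
 * are made directly from end_kiobuf().
 */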
/* Find a free entry from the free-list or allocate a new one.
   This routine always returns a valid pointer even if it has to wait
   for it */
static struct copy_work *get_work_struct(void)
{
	struct copy_work *entry = NULL;

	while (!entry) {
		down_write(&free_list_lock);
		if (!list_empty(&free_list)) {
			entry = list_entry(free_list.next, struct copy_work, list);
			list_del(&entry->list);
		}
		up_write(&free_list_lock);

		/* Nothing on the free-list - try to allocate one without doing IO */
		if (!entry) {
			entry = kmem_cache_alloc(entry_cachep, GFP_NOIO);

			/* Make sure we know it didn't come from the free list */
			if (entry) {
				entry->freelist = 0;
			}
		}

		/* Failed...wait for IO to finish */
		if (!entry) {
			DECLARE_WAITQUEUE(wq, current);

			set_task_state(current, TASK_INTERRUPTIBLE);
			add_wait_queue(&freelist_waitq, &wq);

			if (list_empty(&free_list))
				schedule();

			set_task_state(current, TASK_RUNNING);
			remove_wait_queue(&freelist_waitq, &wq);
		}
	}

	return entry;
}

/* Add a new entry to the work list - in priority+FIFO order.
   The work_list_lock semaphore must be held */
static void add_to_work_list(struct copy_work *item)
{
	struct list_head *entry;

	list_for_each(entry, &work_list) {
		struct copy_work *cw;

		cw = list_entry(entry, struct copy_work, list);
		if (cw->priority > item->priority) {
			__list_add(&item->list, cw->list.prev, &cw->list);
			return;
		}
	}
	list_add_tail(&item->list, &work_list);
}

/* Called when the kio has finished - add the used bits back to their
   free lists and notify the user */
static void end_copy(struct iobuf_entry *ioe, copy_cb_reason_t reason)
{
	/* Tell the caller */
	if (ioe->work->callback)
		ioe->work->callback(reason, ioe->work->context,
				    ioe->work->done_sectors);

	down_write(&free_list_lock);
	if (ioe->work->freelist) {
		list_add(&ioe->work->list, &free_list);
	} else {
		kmem_cache_free(entry_cachep, ioe->work);
	}
	list_add(&ioe->list, &iobuf_list);
	up_write(&free_list_lock);

	wake_up_interruptible(&freelist_waitq);
}

/* A single BH has completed */
static void end_bh(struct buffer_head *bh, int uptodate)
{
	struct kiobuf *kiobuf = bh->b_private;

	mark_buffer_uptodate(bh, uptodate);
	unlock_buffer(bh);

	if ((!uptodate) && !kiobuf->errno)
		kiobuf->errno = -EIO;

	/* Have all of them done ? */
	if (atomic_dec_and_test(&kiobuf->io_count)) {
		if (kiobuf->end_io)
			kiobuf->end_io(kiobuf);
	}
}

/* The whole iobuf has finished */
static void end_kiobuf(struct kiobuf *iobuf)
{
	struct iobuf_entry *ioe;

	/* Now, where did we leave that pointer...ah yes... */
	ioe = (struct iobuf_entry *) iobuf->blocks[0];

	if (ioe->rw == READ) {
		if (iobuf->errno) {
			ioe->complete_reason = COPY_CB_FAILED_READ;
			spin_lock_irq(&complete_list_spinlock);
			list_add(&ioe->list, &complete_list);
			spin_unlock_irq(&complete_list_spinlock);
			wake_up_interruptible(&work_waitq);
		} else {
			/* Put it on the write list */
			spin_lock_irq(&write_list_spinlock);
			list_add(&ioe->work->list, &write_list);
			spin_unlock_irq(&write_list_spinlock);
			wake_up_interruptible(&work_waitq);
		}
	} else {
		/* WRITE */
		if (iobuf->errno) {
			ioe->complete_reason = COPY_CB_FAILED_WRITE;
			spin_lock_irq(&complete_list_spinlock);
			list_add(&ioe->list, &complete_list);
			spin_unlock_irq(&complete_list_spinlock);
			wake_up_interruptible(&work_waitq);
		} else {
			/* All went well */
			ioe->work->done_sectors += ioe->nr_sectors;

			/* If not finished yet then do a progress callback */
			if (ioe->work->done_sectors < ioe->work->nr_sectors) {
				if (ioe->work->callback)
					ioe->work->callback(COPY_CB_PROGRESS,
							    ioe->work->context,
							    ioe->work->done_sectors);

				/* Put it back in the queue */
				down_write(&work_list_lock);
				add_to_work_list(ioe->work);
				up_write(&work_list_lock);
				wake_up_interruptible(&work_waitq);
			} else {
				ioe->complete_reason = COPY_CB_COMPLETE;
				spin_lock_irq(&complete_list_spinlock);
				list_add(&ioe->list, &complete_list);
				spin_unlock_irq(&complete_list_spinlock);
				wake_up_interruptible(&work_waitq);
			}
		}
	}
}
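/*
 * Note on the pointer juggling below: the 2.4 kiobuf has no private-data
 * field, so brw_kiobuf_async() stashes its iobuf_entry pointer in
 * iobuf->blocks[0] (otherwise unused here, since we build and submit the
 * buffer_heads ourselves) and end_kiobuf() casts it back out.
 */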
/* Asynchronous simplified version of brw_kiovec */
static int brw_kiobuf_async(int rw, struct iobuf_entry *ioe,
			    unsigned long blocknr, kdev_t dev)
{
	int r, length, pi, bi = 0, offset, bsize;
	int nr_pages, nr_blocks;
	struct page *map;
	struct buffer_head *bh = 0;
	struct buffer_head **bhs = 0;

	length = ioe->iobuf->length;
	ioe->iobuf->errno = 0;
	bhs = ioe->iobuf->bh;
	bsize = get_hardsect_size(dev);
	nr_pages = length / PAGE_SIZE;
	nr_blocks = ioe->nr_sectors / (bsize / SECTOR_SIZE);

	/* Squirrel our pointer away somewhere secret */
	ioe->iobuf->blocks[0] = (long) ioe;
	ioe->iobuf->end_io = end_kiobuf;

	for (pi = 0; pi < nr_pages; pi++) {
		if (!(map = ioe->iobuf->maplist[pi])) {
			r = -EFAULT;
			goto bad;
		}

		offset = 0;
		while (offset < PAGE_SIZE) {
			bh = bhs[bi++];

			bh->b_dev = B_FREE;
			bh->b_size = bsize;
			set_bh_page(bh, map, offset);
			bh->b_this_page = bh;

			init_buffer(bh, end_bh, ioe->iobuf);
			bh->b_dev = dev;
			bh->b_blocknr = blocknr++;
			bh->b_private = ioe->iobuf;
			bh->b_state = ((1 << BH_Mapped) |
				       (1 << BH_Lock) |
				       (1 << BH_Req));
			set_bit(BH_Uptodate, &bh->b_state);
			if (rw == WRITE)
				clear_bit(BH_Dirty, &bh->b_state);

			offset += bsize;

			atomic_inc(&ioe->iobuf->io_count);
			submit_bh(rw, bh);

			if (atomic_read(&ioe->iobuf->io_count) >= nr_blocks)
				break;
		}
	}
	return 0;

 bad:
	ioe->iobuf->errno = r;
	return r;
}

/* Allocate pages for a kiobuf */
static int alloc_iobuf_pages(struct kiobuf *iobuf, int nr_sectors)
{
	int nr_pages, err, i;

	if (nr_sectors > KIO_MAX_SECTORS)
		return -1;

	nr_pages = nr_sectors / (PAGE_SIZE / SECTOR_SIZE);
	err = expand_kiobuf(iobuf, nr_pages);
	if (err)
		goto out;

	err = -ENOMEM;
	iobuf->locked = 1;
	iobuf->nr_pages = 0;
	for (i = 0; i < nr_pages; i++) {
		struct page *page;

		page = alloc_page(GFP_KERNEL);
		if (!page)
			goto out;

		iobuf->maplist[i] = page;
		LockPage(page);
		iobuf->nr_pages++;
	}
	iobuf->offset = 0;
	err = 0;

 out:
	return err;
}

/* Read/write chunk of data */
static int do_io(int rw, struct iobuf_entry *ioe, kdev_t dev,
		 unsigned long start, int nr_sectors)
{
	int sectors_per_block;
	int blocksize = get_hardsect_size(dev);

	sectors_per_block = blocksize / SECTOR_SIZE;
	start /= sectors_per_block;

	ioe->iobuf->length = nr_sectors << 9;
	ioe->rw = rw;
	ioe->nr_sectors = nr_sectors;

	return brw_kiobuf_async(rw, ioe, start, dev);
}
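/*
 * A worked example of the arithmetic in do_io(): with a 2048-byte hard
 * sector size, sectors_per_block is 2048 / 512 = 4, so a copy starting
 * at 512-byte sector 1024 is submitted as device block 256.
 *
 * The thread below loops over three phases: issue the WRITE half of any
 * copies whose READ has completed (write_list), start new READs in
 * priority order while free iobufs last, then, after sleeping if there
 * was nothing to do, run the callbacks queued on complete_list.
 */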
/* This is where all the real work happens */
static int copy_kthread(void *unused)
{
	daemonize();
	down(&run_lock);
	strcpy(current->comm, "kcopyd");
	copy_task = current;
	wake_up_interruptible(&start_waitq);

	do {
		DECLARE_WAITQUEUE(wq, current);
		struct task_struct *tsk = current;
		struct list_head *entry, *temp;

		/* First, check for outstanding writes to do */
		spin_lock_irq(&write_list_spinlock);
		list_for_each_safe(entry, temp, &write_list) {
			struct copy_work *work_item =
				list_entry(entry, struct copy_work, list);
			struct iobuf_entry *ioe = work_item->iobuf;

			list_del(&work_item->list);
			spin_unlock_irq(&write_list_spinlock);

			/* OK we read the data, now write it to the target device */
			if (do_io(WRITE, ioe, work_item->todev,
				  work_item->tosec + work_item->done_sectors,
				  ioe->nr_sectors) != 0) {
				DMERR("Write blocks to device %s failed",
				      kdevname(work_item->todev));
				end_copy(ioe, COPY_CB_FAILED_WRITE);
			}
			spin_lock_irq(&write_list_spinlock);
		}
		spin_unlock_irq(&write_list_spinlock);

		/* Now look for new work, remember the list is in priority order */
		down_write(&work_list_lock);
		while (!list_empty(&work_list) && !list_empty(&iobuf_list)) {
			struct copy_work *work_item =
				list_entry(work_list.next, struct copy_work, list);
			struct iobuf_entry *ioe =
				list_entry(iobuf_list.next, struct iobuf_entry, list);
			long nr_sectors =
				min((unsigned long) KIO_MAX_SECTORS,
				    work_item->nr_sectors - work_item->done_sectors);

			list_del(&work_item->list);
			list_del(&ioe->list);
			up_write(&work_list_lock);

			/* Associate the work entry and the iobuf with each other */
			ioe->work = work_item;
			work_item->iobuf = ioe;

			/* Read original blocks */
			if (do_io(READ, ioe, work_item->fromdev,
				  work_item->fromsec + work_item->done_sectors,
				  nr_sectors) != 0) {
				DMERR("Read blocks from device %s failed",
				      kdevname(work_item->fromdev));
				end_copy(ioe, COPY_CB_FAILED_READ);
			}

			/* Get the work lock again for the top of the while loop */
			down_write(&work_list_lock);
		}
		up_write(&work_list_lock);

		/* Wait for more work */
		set_task_state(tsk, TASK_INTERRUPTIBLE);
		add_wait_queue(&work_waitq, &wq);

		/* No work, or nothing to do it with */
		if ((list_empty(&work_list) || list_empty(&iobuf_list)) &&
		    list_empty(&complete_list) && list_empty(&write_list))
			schedule();

		set_task_state(tsk, TASK_RUNNING);
		remove_wait_queue(&work_waitq, &wq);

		/* Check for completed entries and do the callbacks */
		spin_lock_irq(&complete_list_spinlock);
		list_for_each_safe(entry, temp, &complete_list) {
			struct iobuf_entry *ioe =
				list_entry(entry, struct iobuf_entry, list);

			list_del(&ioe->list);
			spin_unlock_irq(&complete_list_spinlock);

			end_copy(ioe, ioe->complete_reason);

			spin_lock_irq(&complete_list_spinlock);
		}
		spin_unlock_irq(&complete_list_spinlock);

	} while (thread_exit == 0);

	up(&run_lock);
	DMINFO("kcopyd shutting down");

	return 0;
}

/* API entry point */
int dm_blockcopy(unsigned long fromsec, unsigned long tosec,
		 unsigned long nr_sectors, kdev_t fromdev, kdev_t todev,
		 int priority, int throttle,
		 void (*callback)(copy_cb_reason_t, void *, long),
		 void *context)
{
	struct copy_work *newwork;
	static pid_t thread_pid = 0;
	long from_blocksize = get_hardsect_size(fromdev);
	long to_blocksize = get_hardsect_size(todev);

	/* Make sure the start sectors are on physical block boundaries */
	if (fromsec % (from_blocksize / SECTOR_SIZE))
		return -EINVAL;
	if (tosec % (to_blocksize / SECTOR_SIZE))
		return -EINVAL;

	/* Start the thread if we don't have one already */
	down(&start_lock);
	if (copy_task == NULL) {
		thread_pid = kernel_thread(copy_kthread, NULL, 0);
		if (thread_pid > 0) {
			DECLARE_WAITQUEUE(wq, current);
			struct task_struct *tsk = current;

			DMINFO("Started kcopyd thread, %d buffers", NUM_IOBUFS);

			/* Wait for it to complete its startup initialisation */
			set_task_state(tsk, TASK_INTERRUPTIBLE);
			add_wait_queue(&start_waitq, &wq);

			if (!copy_task)
				schedule();

			set_task_state(tsk, TASK_RUNNING);
			remove_wait_queue(&start_waitq, &wq);
		} else {
			DMERR("Failed to start kcopyd thread");
			up(&start_lock);
			return -EAGAIN;
		}
	}
	up(&start_lock);

	/* This will wait until one is available */
	newwork = get_work_struct();

	newwork->fromsec = fromsec;
	newwork->tosec = tosec;
	newwork->fromdev = fromdev;
	newwork->todev = todev;
	newwork->nr_sectors = nr_sectors;
	newwork->done_sectors = 0;
	newwork->throttle = throttle;
	newwork->priority = priority;
	newwork->callback = callback;
	newwork->context = context;

	down_write(&work_list_lock);
	add_to_work_list(newwork);
	up_write(&work_list_lock);

	wake_up_interruptible(&work_waitq);

	return 0;
}
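/*
 * A minimal usage sketch (hypothetical caller, not part of this file;
 * waiting on a completion is just one plausible choice and assumes a
 * kernel with <linux/completion.h>).  Every reason other than
 * COPY_CB_PROGRESS is terminal, so the callback can complete on any of
 * them:
 *
 *	static void copy_done(copy_cb_reason_t reason, void *context,
 *			      long done_sectors)
 *	{
 *		if (reason != COPY_CB_PROGRESS)
 *			complete((struct completion *) context);
 *	}
 *
 *	struct completion done;
 *	init_completion(&done);
 *	r = dm_blockcopy(0, 0, 1024, fromdev, todev, 0, 0,
 *			 copy_done, &done);
 *	if (!r)
 *		wait_for_completion(&done);
 *
 * A real caller would also record the reason passed to the callback so
 * it can distinguish COPY_CB_COMPLETE from the two failure reasons.
 */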
/* Pre-allocate some structures for the free list */
static int allocate_free_list(void)
{
	int i;
	struct copy_work *newwork;

	for (i = 0; i < FREE_LIST_SIZE; i++) {
		newwork = kmalloc(sizeof(struct copy_work), GFP_KERNEL);
		if (!newwork)
			return i;
		newwork->freelist = 1;
		list_add(&newwork->list, &free_list);
	}
	return i;
}

static void free_iobufs(void)
{
	struct list_head *entry, *temp;

	list_for_each_safe(entry, temp, &iobuf_list) {
		struct iobuf_entry *ioe =
			list_entry(entry, struct iobuf_entry, list);

		unmap_kiobuf(ioe->iobuf);
		free_kiovec(1, &ioe->iobuf);
		list_del(&ioe->list);

		/* The entry itself was kmalloced in kcopyd_init() */
		kfree(ioe);
	}
}

int __init kcopyd_init(void)
{
	int i;

	init_rwsem(&work_list_lock);
	init_rwsem(&free_list_lock);
	init_MUTEX(&start_lock);
	init_MUTEX(&run_lock);

	for (i = 0; i < NUM_IOBUFS; i++) {
		struct iobuf_entry *entry =
			kmalloc(sizeof(struct iobuf_entry), GFP_KERNEL);

		if (entry == NULL) {
			DMERR("Unable to allocate memory for kiobuf");
			free_iobufs();
			return -1;
		}

		if (alloc_kiovec(1, &entry->iobuf)) {
			DMERR("Unable to allocate kiobuf for kcopyd");
			kfree(entry);
			free_iobufs();
			return -1;
		}

		if (alloc_iobuf_pages(entry->iobuf, KIO_MAX_SECTORS)) {
			DMERR("Unable to allocate pages for kcopyd");
			free_kiovec(1, &entry->iobuf);
			kfree(entry);
			free_iobufs();
			return -1;
		}

		list_add(&entry->list, &iobuf_list);
	}

	entry_cachep = kmem_cache_create("kcopyd",
					 sizeof(struct copy_work),
					 __alignof__(struct copy_work),
					 0, NULL, NULL);
	if (!entry_cachep) {
		free_iobufs();
		DMERR("Unable to allocate slab cache for kcopyd");
		return -1;
	}

	if (allocate_free_list() == 0) {
		free_iobufs();
		kmem_cache_destroy(entry_cachep);
		DMERR("Unable to allocate any work structures for the free list");
		return -1;
	}

	return 0;
}

void kcopyd_exit(void)
{
	struct list_head *entry, *temp;

	thread_exit = 1;
	wake_up_interruptible(&work_waitq);

	/* Wait for the thread to finish */
	down(&run_lock);
	up(&run_lock);

	/* Free the iobufs */
	free_iobufs();

	/* Free the free list */
	list_for_each_safe(entry, temp, &free_list) {
		struct copy_work *cw;

		cw = list_entry(entry, struct copy_work, list);
		list_del(&cw->list);
		kfree(cw);
	}

	if (entry_cachep)
		kmem_cache_destroy(entry_cachep);
}

EXPORT_SYMBOL(dm_blockcopy);

/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only. This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */