/*
 * kcopyd.c
 *
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 *
 * This file is released under the GPL.
 */

#include <linux/config.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/device-mapper.h>

#include "dm.h"


/* Hard sector size used all over the kernel */
#define SECTOR_SIZE 512

/* Number of entries in the free list */
#define FREE_LIST_SIZE 32

/* Number of iobufs we have, and therefore the number of I/Os
   that can be in flight at once */
#define NUM_IOBUFS 16

/* Slab cache for work entries when the freelist runs out */
static kmem_cache_t *entry_cachep;

/* A unit of copy work queued on the work list */
struct copy_work
{
	unsigned long fromsec;       /* Start sector on the source device */
	unsigned long tosec;         /* Start sector on the destination device */
	unsigned long nr_sectors;    /* Total number of sectors to copy */
	unsigned long done_sectors;  /* Number of sectors copied so far */
	kdev_t fromdev;              /* Source device */
	kdev_t todev;                /* Destination device */
	int    throttle;
	int    priority;      /* 0=highest */
	void   (*callback)(copy_cb_reason_t, void *, long);
	void   *context;      /* Parameter for callback */
	int    freelist;      /* Whether we came from the free list */
	struct iobuf_entry *iobuf;   /* iobuf currently doing our I/O */
	struct list_head list;
};

/* An entry on the free list of iobufs */
struct iobuf_entry
{
	struct kiobuf    *iobuf;
	struct copy_work *work;     /* Work entry we are doing */
	struct list_head list;
	copy_cb_reason_t complete_reason; /* Why this I/O finished (for the complete list) */
	long     nr_sectors;        /* Number of sectors in this chunk of I/O */
	int      rw;                /* READ or WRITE */
};

static LIST_HEAD(work_list);     /* Copy requests waiting for (more) I/O to be issued */
static LIST_HEAD(write_list);    /* Chunks that have been read and are waiting to be written */
static LIST_HEAD(free_list);     /* Free work units */
static LIST_HEAD(iobuf_list);    /* Free iobufs */
static LIST_HEAD(complete_list); /* Completed iobuf entries awaiting their callback */
static struct task_struct *copy_task = NULL;
static struct rw_semaphore work_list_lock;
static struct rw_semaphore free_list_lock;
static spinlock_t write_list_spinlock = SPIN_LOCK_UNLOCKED;
static spinlock_t complete_list_spinlock = SPIN_LOCK_UNLOCKED;
static DECLARE_MUTEX(start_lock);
static DECLARE_MUTEX(run_lock);
static DECLARE_WAIT_QUEUE_HEAD(start_waitq);
static DECLARE_WAIT_QUEUE_HEAD(work_waitq);
static DECLARE_WAIT_QUEUE_HEAD(freelist_waitq);
static int thread_exit = 0;

/* Find a free entry on the free list or allocate a new one.
   This routine always returns a valid pointer, even if it has to wait
   for one to become available */
static struct copy_work *get_work_struct(void)
{
	struct copy_work *entry = NULL;

	while (!entry) {
		down_write(&free_list_lock);
		if (!list_empty(&free_list)) {
			entry = list_entry(free_list.next, struct copy_work, list);
			list_del(&entry->list);
		}
		up_write(&free_list_lock);

		/* Nothing on the free-list - try to allocate one without doing IO */
		if (!entry) {
			entry = kmem_cache_alloc(entry_cachep, GFP_NOIO);

			/* Make sure we know it didn't come from the free list */
			if (entry) {
				entry->freelist = 0;
			}
		}

		/* Failed...wait for IO to finish */
		if (!entry) {
			DECLARE_WAITQUEUE(wq, current);

			set_task_state(current, TASK_INTERRUPTIBLE);
			add_wait_queue(&freelist_waitq, &wq);

			if (list_empty(&free_list))
				schedule();

			set_task_state(current, TASK_RUNNING);
			remove_wait_queue(&freelist_waitq, &wq);
		}
	}

	return entry;
}

/* Add a new entry to the work list in priority+FIFO order (see the
   example below the function).  The caller must hold work_list_lock
   for writing */
static void add_to_work_list(struct copy_work *item)
{
	struct list_head *entry;

	list_for_each(entry, &work_list) {
		struct copy_work *cw;

		cw = list_entry(entry, struct copy_work, list);
		if (cw->priority > item->priority) {
			__list_add(&item->list, cw->list.prev, &cw->list);
			return;
		}
	}
	list_add_tail(&item->list, &work_list);
}
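
/* For example, queueing work items with priorities 2, 0, 1, 0 (in that
   order) leaves the list ordered 0, 0, 1, 2, with the two priority-0
   items kept in the order they were submitted. */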

/* Called when a copy has completed or failed - return the work entry
   and the iobuf to their free lists and notify the caller */
static void end_copy(struct iobuf_entry *ioe, copy_cb_reason_t reason)
{
	/* Tell the caller */
	if (ioe->work->callback)
		ioe->work->callback(reason, ioe->work->context, ioe->work->done_sectors);

	down_write(&free_list_lock);
	if (ioe->work->freelist) {
		list_add(&ioe->work->list, &free_list);
	}
	else {
		kmem_cache_free(entry_cachep, ioe->work);
	}
	list_add(&ioe->list, &iobuf_list);
	up_write(&free_list_lock);
	wake_up_interruptible(&freelist_waitq);
}

/* A single BH has completed */
static void end_bh(struct buffer_head *bh, int uptodate)
{
	struct kiobuf *kiobuf = bh->b_private;

	mark_buffer_uptodate(bh, uptodate);
	unlock_buffer(bh);

	if ((!uptodate) && !kiobuf->errno)
		kiobuf->errno = -EIO;

	/* Have all of them completed? */
	if (atomic_dec_and_test(&kiobuf->io_count)) {
		if (kiobuf->end_io)
			kiobuf->end_io(kiobuf);
	}
}

/* The whole iobuf has finished */
static void end_kiobuf(struct kiobuf *iobuf)
{
	struct iobuf_entry *ioe;

	/* Recover the iobuf_entry pointer that brw_kiobuf_async() stashed in blocks[0] */
	ioe = (struct iobuf_entry *)iobuf->blocks[0];

	if (ioe->rw == READ) {
		if (iobuf->errno) {
			ioe->complete_reason = COPY_CB_FAILED_READ;
			spin_lock_irq(&complete_list_spinlock);
			list_add(&ioe->list, &complete_list);
			spin_unlock_irq(&complete_list_spinlock);
			wake_up_interruptible(&work_waitq);
		}
		else {
			/* Put it on the write list */
			spin_lock_irq(&write_list_spinlock);
			list_add(&ioe->work->list, &write_list);
			spin_unlock_irq(&write_list_spinlock);
			wake_up_interruptible(&work_waitq);
		}
	}
	else {
		/* WRITE */
		if (iobuf->errno) {
			ioe->complete_reason = COPY_CB_FAILED_WRITE;
			spin_lock_irq(&complete_list_spinlock);
			list_add(&ioe->list, &complete_list);
			spin_unlock_irq(&complete_list_spinlock);
			wake_up_interruptible(&work_waitq);
		}
		else {
			/* All went well */
			ioe->work->done_sectors += ioe->nr_sectors;

			/* If not finished yet then do a progress callback */
			if (ioe->work->done_sectors < ioe->work->nr_sectors) {
				if (ioe->work->callback)
					ioe->work->callback(COPY_CB_PROGRESS, ioe->work->context, ioe->work->done_sectors);

				/* Put it back in the queue */
				down_write(&work_list_lock);
				add_to_work_list(ioe->work);
				up_write(&work_list_lock);
				wake_up_interruptible(&work_waitq);
			}
			else {
				ioe->complete_reason = COPY_CB_COMPLETE;
				spin_lock_irq(&complete_list_spinlock);
				list_add(&ioe->list, &complete_list);
				spin_unlock_irq(&complete_list_spinlock);
				wake_up_interruptible(&work_waitq);
			}
		}
	}
}

/* Asynchronous simplified version of brw_kiovec */
static int brw_kiobuf_async(int rw, struct iobuf_entry *ioe, unsigned long blocknr, kdev_t dev)
{
	int r, length, pi, bi = 0, offset, bsize;
	int nr_pages, nr_blocks;
	struct page *map;
	struct buffer_head *bh = NULL;
	struct buffer_head **bhs = NULL;

	length = ioe->iobuf->length;
	ioe->iobuf->errno = 0;
	bhs = ioe->iobuf->bh;
	bsize = get_hardsect_size(dev);
	nr_pages = length / PAGE_SIZE;
	nr_blocks = ioe->nr_sectors / (bsize/SECTOR_SIZE);

	/* Stash our iobuf_entry pointer in the otherwise unused blocks[] array
	   so that end_kiobuf() can recover it */
	ioe->iobuf->blocks[0] = (long)ioe;
	ioe->iobuf->end_io = end_kiobuf;

	for (pi = 0; pi < nr_pages; pi++) {
		if (!(map = ioe->iobuf->maplist[pi])) {
			r = -EFAULT;
			goto bad;
		}
		offset = 0;

		while (offset < PAGE_SIZE) {
			bh = bhs[bi++];

			bh->b_dev = B_FREE;
			bh->b_size = bsize;
			set_bh_page(bh, map, offset);
			bh->b_this_page = bh;

			init_buffer(bh, end_bh, ioe->iobuf);
			bh->b_dev = dev;
			bh->b_blocknr = blocknr++;
			bh->b_private = ioe->iobuf;
			bh->b_state = ((1 << BH_Mapped) |
				       (1 << BH_Lock)   |
				       (1 << BH_Req));

			set_bit(BH_Uptodate, &bh->b_state);
			if (rw == WRITE)
				clear_bit(BH_Dirty, &bh->b_state);

			offset += bsize;
			atomic_inc(&ioe->iobuf->io_count);
			submit_bh(rw, bh);

			if (atomic_read(&ioe->iobuf->io_count) >= nr_blocks)
				break;
		}
	}
	return 0;

 bad:
	ioe->iobuf->errno = r;
	return r;
}

/* Allocate pages for a kiobuf */
static int alloc_iobuf_pages(struct kiobuf *iobuf, int nr_sectors)
{
	int nr_pages, err, i;

	if (nr_sectors > KIO_MAX_SECTORS)
		return -1;

	nr_pages = nr_sectors / (PAGE_SIZE/SECTOR_SIZE);
	err = expand_kiobuf(iobuf, nr_pages);
	if (err) goto out;

	err = -ENOMEM;
	iobuf->locked = 1;
	iobuf->nr_pages = 0;
	for (i = 0; i < nr_pages; i++) {
		struct page * page;

		page = alloc_page(GFP_KERNEL);
		if (!page) goto out;

		iobuf->maplist[i] = page;
		LockPage(page);
		iobuf->nr_pages++;
	}
	iobuf->offset = 0;

	err = 0;

out:
	return err;
}

/* Read/write chunk of data */
static int do_io(int rw, struct iobuf_entry *ioe, kdev_t dev, unsigned long start, int nr_sectors)
{
	int sectors_per_block;
	int blocksize = get_hardsect_size(dev);

	sectors_per_block = blocksize / SECTOR_SIZE;

	start /= sectors_per_block;

	ioe->iobuf->length = nr_sectors << 9;
	ioe->rw = rw;
	ioe->nr_sectors = nr_sectors;

	return brw_kiobuf_async(rw, ioe, start, dev);
}
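
/* Worked example of the arithmetic above (illustrative only, assuming a
   device with a 4096-byte hard sector size): sectors_per_block is
   4096 / 512 = 8, so a request starting at 512-byte sector 2048 is issued
   as block 2048 / 8 = 256, and "nr_sectors << 9" is simply
   nr_sectors * SECTOR_SIZE bytes of iobuf length. */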

/* This is where all the real work happens */
static int copy_kthread(void *unused)
{
	daemonize();
	down(&run_lock);

	strcpy(current->comm, "kcopyd");
	copy_task = current;
	wake_up_interruptible(&start_waitq);

	do {
		DECLARE_WAITQUEUE(wq, current);
		struct task_struct *tsk = current;
		struct list_head   *entry, *temp;

		/* First, check for outstanding writes to do */
		spin_lock_irq(&write_list_spinlock);
		list_for_each_safe(entry, temp, &write_list) {
			struct copy_work *work_item = list_entry(entry, struct copy_work, list);
			struct iobuf_entry *ioe = work_item->iobuf;

			list_del(&work_item->list);
			spin_unlock_irq(&write_list_spinlock);

			/* OK we read the data, now write it to the target device */
			if (do_io(WRITE, ioe, work_item->todev,
				  work_item->tosec + work_item->done_sectors,
				  ioe->nr_sectors) != 0) {
				DMERR("Write blocks to device %s failed", kdevname(work_item->todev));

				end_copy(ioe, COPY_CB_FAILED_WRITE);
			}
			spin_lock_irq(&write_list_spinlock);
		}
		spin_unlock_irq(&write_list_spinlock);

		/* Now look for new work, remember the list is in priority order */
		down_write(&work_list_lock);
		while (!list_empty(&work_list) && !list_empty(&iobuf_list)) {

			struct copy_work *work_item = list_entry(work_list.next, struct copy_work, list);
			struct iobuf_entry *ioe = list_entry(iobuf_list.next, struct iobuf_entry, list);
			long nr_sectors = min((unsigned long)KIO_MAX_SECTORS,
					      work_item->nr_sectors - work_item->done_sectors);

			list_del(&work_item->list);
			list_del(&ioe->list);
			up_write(&work_list_lock);

			/* Cross-link the work item and the iobuf entry so each can find the other */
			ioe->work = work_item;
			work_item->iobuf = ioe;

			/* Read original blocks */
			if (do_io(READ, ioe, work_item->fromdev, work_item->fromsec + work_item->done_sectors,
				  nr_sectors) != 0) {
				DMERR("Read blocks from device %s failed", kdevname(work_item->fromdev));

				end_copy(ioe, COPY_CB_FAILED_READ);
			}

			/* Get the work lock again for the top of the while loop */
			down_write(&work_list_lock);
		}

		up_write(&work_list_lock);

		/* Wait for more work */
		set_task_state(tsk, TASK_INTERRUPTIBLE);
		add_wait_queue(&work_waitq, &wq);

		/* No work, or nothing to do it with */
		if ( (list_empty(&work_list) || list_empty(&iobuf_list)) &&
		     list_empty(&complete_list) &&
		     list_empty(&write_list))
			schedule();

		set_task_state(tsk, TASK_RUNNING);
		remove_wait_queue(&work_waitq, &wq);

		/* Check for completed entries and do the callbacks */
		spin_lock_irq(&complete_list_spinlock);
		list_for_each_safe(entry, temp, &complete_list) {
			struct iobuf_entry *ioe = list_entry(entry, struct iobuf_entry, list);
			list_del(&ioe->list);
			spin_unlock_irq(&complete_list_spinlock);

			end_copy(ioe, ioe->complete_reason);

			spin_lock_irq(&complete_list_spinlock);
		}
		spin_unlock_irq(&complete_list_spinlock);

	} while (thread_exit == 0);

	up(&run_lock);
	DMINFO("kcopyd shutting down");
	return 0;
}

/*
 * API entry point.
 *
 * Asynchronously copy 'nr_sectors' 512-byte sectors, starting at sector
 * 'fromsec' of 'fromdev', to sector 'tosec' of 'todev'.  Both start
 * sectors must be aligned to their device's hard sector size.  'priority'
 * orders the work list (0 = highest); 'throttle' is stored but not acted
 * on in this version.  'callback' is invoked with a reason code, the
 * caller's 'context' and the number of sectors copied so far; progress
 * callbacks may be delivered before the final completion or failure.
 * The kcopyd thread is started on first use.  Returns 0 on success,
 * -EINVAL if a start sector is misaligned, or -EAGAIN if the thread
 * could not be started.
 */
int dm_blockcopy(unsigned long fromsec, unsigned long tosec, unsigned long nr_sectors,
		 kdev_t fromdev, kdev_t todev,
		 int priority, int throttle, void (*callback)(copy_cb_reason_t, void *, long), void *context)
{
	struct copy_work *newwork;
	static pid_t thread_pid = 0;
	long from_blocksize = get_hardsect_size(fromdev);
	long to_blocksize = get_hardsect_size(todev);

	/* Make sure the start sectors are on physical block boundaries */
	if (fromsec % (from_blocksize/SECTOR_SIZE))
		return -EINVAL;
	if (tosec % (to_blocksize/SECTOR_SIZE))
		return -EINVAL;

	/* Start the thread if we don't have one already */
	down(&start_lock);
	if (copy_task == NULL) {
		thread_pid = kernel_thread(copy_kthread, NULL, 0);
		if (thread_pid > 0) {

			DECLARE_WAITQUEUE(wq, current);
			struct task_struct *tsk = current;

			DMINFO("Started kcopyd thread, %d buffers", NUM_IOBUFS);

			/* Wait for it to complete its startup initialisation */
			set_task_state(tsk, TASK_INTERRUPTIBLE);
			add_wait_queue(&start_waitq, &wq);

			if (!copy_task)
				schedule();

			set_task_state(tsk, TASK_RUNNING);
			remove_wait_queue(&start_waitq, &wq);
		}
		else {
			DMERR("Failed to start kcopyd thread");
			up(&start_lock);
			return -EAGAIN;
		}
	}
	up(&start_lock);

	/* This will wait until one is available */
	newwork = get_work_struct();

	newwork->fromsec      = fromsec;
	newwork->tosec        = tosec;
	newwork->fromdev      = fromdev;
	newwork->todev        = todev;
	newwork->nr_sectors   = nr_sectors;
	newwork->done_sectors = 0;
	newwork->throttle     = throttle;
	newwork->priority     = priority;
	newwork->callback     = callback;
	newwork->context      = context;

	down_write(&work_list_lock);
	add_to_work_list(newwork);
	up_write(&work_list_lock);

	wake_up_interruptible(&work_waitq);
	return 0;
}
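
/*
 * Usage sketch (not compiled): a minimal, hypothetical caller of
 * dm_blockcopy().  Only the dm_blockcopy() signature and the
 * copy_cb_reason_t values come from this file; the completion-based
 * context, the device variables and the sector counts are assumptions
 * made purely for illustration, and the caller would also need
 * <linux/completion.h>.  Note that the callback may run from interrupt
 * context, so it must not sleep.
 */
#if 0
struct example_copy_ctx {
	struct completion done;		/* hypothetical: the caller sleeps on this */
	int error;
};

static void example_copy_callback(copy_cb_reason_t reason, void *context, long sectors)
{
	struct example_copy_ctx *ctx = context;

	switch (reason) {
	case COPY_CB_PROGRESS:
		/* 'sectors' is the number of sectors copied so far */
		break;
	case COPY_CB_COMPLETE:
		ctx->error = 0;
		complete(&ctx->done);
		break;
	case COPY_CB_FAILED_READ:
	case COPY_CB_FAILED_WRITE:
		ctx->error = -EIO;
		complete(&ctx->done);
		break;
	default:
		break;
	}
}

static int example_copy(kdev_t fromdev, kdev_t todev)
{
	struct example_copy_ctx ctx;
	int r;

	init_completion(&ctx.done);

	/* Copy 1024 sectors (512KB) from the start of 'fromdev' to the
	   start of 'todev' at the highest priority, with no throttling. */
	r = dm_blockcopy(0, 0, 1024, fromdev, todev, 0, 0,
			 example_copy_callback, &ctx);
	if (r)
		return r;

	wait_for_completion(&ctx.done);
	return ctx.error;
}
#endif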


/* Pre-allocate some structures for the free list */
static int allocate_free_list(void)
{
	int i;
	struct copy_work *newwork;

	for (i=0; i<FREE_LIST_SIZE; i++) {
		newwork = kmalloc(sizeof(struct copy_work), GFP_KERNEL);
		if (!newwork)
			return i;
		newwork->freelist = 1;
		list_add(&newwork->list, &free_list);
	}
	return i;
}

static void free_iobufs(void)
{
	struct list_head *entry, *temp;

	list_for_each_safe(entry, temp, &iobuf_list) {
		struct iobuf_entry *ioe = list_entry(entry, struct iobuf_entry, list);
		unmap_kiobuf(ioe->iobuf);
		free_kiovec(1, &ioe->iobuf);

		list_del(&ioe->list);
	}
}

int __init kcopyd_init(void)
{
	int i;

	init_rwsem(&work_list_lock);
	init_rwsem(&free_list_lock);
	init_MUTEX(&start_lock);
	init_MUTEX(&run_lock);

	for (i=0; i< NUM_IOBUFS; i++) {
		struct iobuf_entry *entry = kmalloc(sizeof(struct iobuf_entry), GFP_KERNEL);
		if (entry == NULL) {
			DMERR("Unable to allocate memory for kiobuf");
			free_iobufs();
			return -1;
		}
		if (alloc_kiovec(1, &entry->iobuf)) {
			DMERR("Unable to allocate kiobuf for kcopyd");
			kfree(entry);
			free_iobufs();
			return -1;
		}

		if (alloc_iobuf_pages(entry->iobuf, KIO_MAX_SECTORS)) {
			DMERR("Unable to allocate pages for kcopyd");
			free_kiovec(1, &entry->iobuf);
			kfree(entry);
			free_iobufs();
			return -1;
		}
		list_add(&entry->list, &iobuf_list);
	}

	entry_cachep = kmem_cache_create("kcopyd",
					 sizeof(struct copy_work),
					 __alignof__(struct copy_work),
					 0, NULL, NULL);
	if (!entry_cachep) {
		free_iobufs();
		DMERR("Unable to allocate slab cache for kcopyd");
		return -1;
	}

	if (allocate_free_list() == 0) {
		free_iobufs();
		kmem_cache_destroy(entry_cachep);
		DMERR("Unable to allocate any work structures for the free list");
		return -1;
	}

	return 0;
}

void kcopyd_exit(void)
{
	struct list_head *entry, *temp;

	thread_exit = 1;
	wake_up_interruptible(&work_waitq);

	/* Wait for the thread to finish */
	down(&run_lock);
	up(&run_lock);

	/* Free the iobufs */
	free_iobufs();

	/* Free the free list */
	list_for_each_safe(entry, temp, &free_list) {
		struct copy_work *cw;
		cw = list_entry(entry, struct copy_work, list);
		list_del(&cw->list);
		kfree(cw);
	}

	if (entry_cachep)
		kmem_cache_destroy(entry_cachep);
}

EXPORT_SYMBOL(dm_blockcopy);

/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only.  This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */