OpenSolaris_b135/lib/libnisdb/db.cc

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 *	db.cc
 *
 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <stdio.h>
#include <string.h>
#ifdef TDRPC
#include <sysent.h>
#else
#include <unistd.h>
#endif

#include "nisdb_mt.h"
#include "db_headers.h"
#include "db.h"

extern db_result *empty_result(db_status);
extern int add_to_standby_list(db*);
extern int remove_from_standby_list(db*);

/* for db_next_desc */

#define	LINEAR 1
#define	CHAINED 2

struct db_next_info {
	int next_type;		/* linear or chained */
	void* next_value;	/* linear: entryp; */
				/* chained: db_next_index_desc* */
};


/* Constructor:  Create a database using the given name, 'dbname.'
	    The database is stored in a file named 'dbname'.
	    The log file is stored in a file named 'dbname'.log.
	    A temporary file 'dbname'.tmp is also used.   */
db::db(char* dbname)
{
	int len = strlen(dbname);
	dbfilename = new char[len+1];
	if (dbfilename == NULL)
		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
	logfilename = new char[len+5];
	if (logfilename == NULL) {
		delete dbfilename;
		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
	}
	tmpfilename = new char[len+5];
	if (tmpfilename == NULL) {
		delete dbfilename;
		delete logfilename;
		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
	}
	sprintf(dbfilename, "%s", dbname);
	sprintf(logfilename, "%s.log", dbname);
	sprintf(tmpfilename, "%s.tmp", dbname);
	logfile = NULL;
	logfile_opened = FALSE;
	changed = FALSE;
	INITRW(db);
	READLOCKOK(db);

	internal_db.setDbPtr(this);
	(void) internal_db.configure(dbname);
}

/* destructor:  note that associated files should be removed separated  */
db::~db()
{
	(void)acqexcl();
	internal_db.reset();  /* clear any associated data structures */
	delete dbfilename;
	delete logfilename;
	delete tmpfilename;
	close_log();
	delete logfile;
	(void)destroylock();
}


static void
assign_next_desc(db_next_desc* desc, entryp value)
{
	db_next_info * store = new db_next_info;
	if (store == NULL) {
		desc->db_next_desc_val =  NULL;
		desc->db_next_desc_len = 0;
		FATAL("db::assign_next_desc: cannot allocate space",
			DB_MEMORY_LIMIT);
	}

	store->next_type = LINEAR;
	store->next_value = (void*)value;
	desc->db_next_desc_val =  (char*) store;
	desc->db_next_desc_len = sizeof (db_next_info);
}

static void
assign_next_desc(db_next_desc* desc, db_next_index_desc * value)
{
	db_next_info * store = new db_next_info;
	if (store == NULL) {
		desc->db_next_desc_val =  NULL;
		desc->db_next_desc_len = 0;
		FATAL("db::assign_next_desc: cannot allocate space (2)",
			DB_MEMORY_LIMIT);
	}
	store->next_type = CHAINED;
	store->next_value = (void*)value;
	desc->db_next_desc_val =  (char*) store;
	desc->db_next_desc_len = sizeof (db_next_info);
}

static entryp
extract_next_desc(db_next_desc* desc, int *next_type,
		db_next_index_desc** place2)
{
	entryp place;

	if (desc == NULL || desc->db_next_desc_len != sizeof (db_next_info)) {
		*next_type = 0;
		return (0);
	}
	*next_type = ((db_next_info*) desc->db_next_desc_val)->next_type;
	switch (*next_type) {
	case LINEAR:
		place = (entryp)
			((db_next_info*) desc->db_next_desc_val)->next_value;
		return (place);

	case CHAINED:
		*place2 = (db_next_index_desc*)
			((db_next_info*) desc->db_next_desc_val) ->next_value;
		return (0);
	default:
		*next_type = 0;   // invalid type
		return (0);
	}
}

/* Execute the specified action using the rest of the arguments as input.
	    Return  a structure db_result containing the result. */
db_result *
db::exec_action(db_action action, db_query *query,
		entry_object *content, db_next_desc* previous)
{
	entryp where, prev;
	db_result *res = new db_result;
	long num_answers;
	entry_object_p * ans;
	entry_object * single;
	db_next_index_desc *index_desc;
	int next_type;
	db_next_index_desc *prev_desc;

	if (res == NULL)
		FATAL3("db::exec_action: cannot allocate space for result",
			DB_MEMORY_LIMIT, NULL);

	res->objects.objects_len = 0; /* default */
	res->objects.objects_val = NULL;  /* default */

	switch (action) {
	case DB_LOOKUP:
		res->status = internal_db.lookup(query, &num_answers, &ans);
		res->objects.objects_len = (int) num_answers;
		res->objects.objects_val = ans;
		break;

	case DB_ADD:
		res->status = internal_db.add(query, content);
		break;

	case DB_REMOVE:
		res->status = internal_db.remove(query);
		break;

	case DB_FIRST:
		if (query == NULL) {
			res->status = internal_db.first(&where, &single);
			if (res->status == DB_SUCCESS)
				assign_next_desc(&(res->nextinfo), where);
		}  else {
			res->status = internal_db.first(query,
							&index_desc,
							&single);
			if (res->status == DB_SUCCESS)
				assign_next_desc(&(res->nextinfo), index_desc);
		}
		if (res->status == DB_SUCCESS) {
			res->objects.objects_val = new entry_object_p;
			if (res->objects.objects_val == NULL) {
				res->objects.objects_len = 0;
				delete res;
				FATAL3(
		"db::exec_action: cannot allocate space for DB_FIRST result",
		DB_MEMORY_LIMIT, NULL);
			}
			res->objects.objects_len = 1;
			res->objects.objects_val[0] = single;
		}
		break;

	case DB_NEXT:
		prev = extract_next_desc(previous, &next_type, &prev_desc);
		switch (next_type) {
		case LINEAR:
			if (prev != 0) {
				res->status = internal_db.next(prev, &where,
								&single);
				if (res->status == DB_SUCCESS)
					assign_next_desc(&(res->nextinfo),
								where);
			} else
					// invalid previous indicator
				res->status = DB_NOTFOUND;
			break;
		case CHAINED:
			if (prev_desc != NULL) {
				res->status = internal_db.next(prev_desc,
							&index_desc, &single);
				if (res->status == DB_SUCCESS)
					assign_next_desc(&(res->nextinfo),
								index_desc);
			} else
					// invalid previous indicator
				res->status = DB_NOTFOUND;
			break;
		default:
			WARNING("db::exec_action: invalid previous indicator");
			res->status = DB_BADQUERY;
		}
		if (previous && previous->db_next_desc_val) {
			delete previous->db_next_desc_val;
			previous->db_next_desc_len = 0;
			previous->db_next_desc_val = NULL;
		}
		if (res->status == DB_SUCCESS) {
			res->objects.objects_len = 1;
			res->objects.objects_val = new entry_object_p;
			if (res->objects.objects_val == NULL) {
				res->objects.objects_len = 0;
				delete res;
				FATAL3(
		    "db::exec_action: cannot allocate space for DB_NEXT result",
		    DB_MEMORY_LIMIT, NULL);
			}
			res->objects.objects_val[0] = single;
		}
		break;

	case DB_RESET_NEXT:
		prev = extract_next_desc(previous, &next_type, &prev_desc);
		switch (next_type) {
		case LINEAR:
			res->status = DB_SUCCESS;
			if (previous->db_next_desc_val) {
	delete previous->db_next_desc_val;
	previous->db_next_desc_len = 0;
	previous->db_next_desc_val = NULL;
			}
			break;   // do nothing
		case CHAINED:
			res->status = internal_db.reset_next(prev_desc);
			if (previous->db_next_desc_val) {
	delete previous->db_next_desc_val;
	previous->db_next_desc_len = 0;
	previous->db_next_desc_val = NULL;
			}
			break;
		default:
			WARNING("db::exec_action: invalid previous indicator");
			res->status = DB_BADQUERY;
		}
		break;

	case DB_ALL:
		res->status = internal_db.all(&num_answers, &ans);
		res->objects.objects_len = (int) num_answers;
		res->objects.objects_val = ans;
		break;

	default:
		WARNING("unknown request");
		res->status = DB_BADQUERY;
		return (res);
	}
	return (res);
}

/*
 * Log the given action and execute it.
 * The minor version of the database is updated after the action has
 * been executed and the database is flagged as being changed.
 * Return the structure db_result, or NULL if the logging failed or the
 * action is unknown.
*/
db_result *
db::log_action(db_action action, db_query *query, entry_object *content)
{
	vers *v = internal_db.get_version()->nextminor();
	db_result * res;
	db_log_entry le(action, v, query, content);
	bool_t copylog = FALSE;

	WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::log_action");
	/*
	 * If this is a synchronous operation on the master we should
	 * not copy the log for each operation.  Doing so causes
	 * massive disk IO that hampers the performance of these operations.
	 * Where as on the replica these operations are not synchronous
	 * (batched) and don't affect the performance as much.
	 */

	if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC))
		copylog = TRUE;

	if (open_log(copylog) < 0)  {
		delete v;
		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
				"wu db::log_action DB_STORAGE_LIMIT");
		return (empty_result(DB_STORAGE_LIMIT));
	}

	if (logfile->append(&le) < 0) {
		close_log();
		WARNING_M("db::log_action: could not add log entry: ");
		delete v;
		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
				"wu db::log_action DB_STORAGE_LIMIT");
		return (empty_result(DB_STORAGE_LIMIT));
	}

	switch (action) {
	case DB_ADD_NOSYNC:
		action = DB_ADD;
		break;
	case DB_REMOVE_NOSYNC:
		action = DB_REMOVE;
		break;
	default:
		if (logfile->sync_log() < 0) {
			close_log();
			WARNING_M("db::log_action: could not add log entry: ");
			delete v;
			WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
					"wu db::log_action DB_STORAGE_LIMIT");
			return (empty_result(DB_STORAGE_LIMIT));
		}
		break;
	}
	res = exec_action(action, query, content, NULL);
	internal_db.change_version(v);
	delete v;
	changed = TRUE;
	WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::log_action");

	return (res);
}

/*
 * Execute 'action' using the rest of the arguments as input.
 * Return the result of the operation in a db_result structure;
 * Return NULL if the request is unknown.
 * If the action involves updates (ADD and REMOVE), it is logged first.
 */
db_result *
db::execute(db_action action, db_query *query,
		entry_object *content, db_next_desc* previous)
{
	db_result	*res;

	switch (action) {
	case DB_LOOKUP:
	case DB_FIRST:
	case DB_NEXT:
	case DB_ALL:
	case DB_RESET_NEXT:
		READLOCK(this, empty_result(DB_LOCK_ERROR), "r db::execute");
		res = exec_action(action, query, content, previous);
		READUNLOCK(this, empty_result(DB_LOCK_ERROR),
				"ru db::execute");
		return (res);

	case DB_ADD_NOLOG:
		WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::execute");
		changed = TRUE;
		res = exec_action(DB_ADD, query, content, previous);
		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
				"wu db::execute");
		return (res);

	case DB_ADD:
	case DB_REMOVE:
	case DB_ADD_NOSYNC:
	case DB_REMOVE_NOSYNC:
		/* log_action() will do the locking */
		return (log_action(action, query, content));

	default:
		WARNING("db::execute: unknown request");
		return (empty_result(DB_INTERNAL_ERROR));
	}
}

/* close existing logfile and delete its structure */
int
db::reset_log()
{
	WRITELOCK(this, -1, "w db::reset_log");
	/* try to close old log file */
	/* doesnot matter since we do synchronous writes only */
	if (logfile != NULL) {
	    if (logfile_opened == TRUE) {
		    logfile->sync_log();
		    if (logfile->close() < 0) {
			WARNING_M("db::reset_log: could not close log file: ");
		    }
		    remove_from_standby_list(this);
	    }
	    delete logfile;
	    logfile = NULL;
	}
	logfile_opened = FALSE;
	WRITEUNLOCK(this, -1, "wu db::reset_log");
	return (0);
}

/* close existing logfile, but leave its structure if exists */
int
db::close_log(int bypass_standby)
{
	WRITELOCK(this, -1, "w db::close_log");
	if (logfile != NULL && logfile_opened == TRUE) {
		logfile->sync_log();
		logfile->close();
		if (!bypass_standby)
		    remove_from_standby_list(this);
	}
	logfile_opened = FALSE;
	WRITEUNLOCK(this, -1, "wu db::close_log");
	return (0);
}

/* open logfile, creating its structure if it does not exist */
int
db::open_log(bool_t copylog)
{
	WRITELOCK(this, -1, "w db::open_log");
	if (logfile == NULL) {
		if ((logfile = new db_log(logfilename, PICKLE_APPEND))
		    == NULL)
			FATAL3("db::reset_log: cannot allocate space",
			    DB_MEMORY_LIMIT, -1);
	}

	if (logfile_opened == TRUE) {
		WRITEUNLOCK(this, -1, "wu db::open_log");
		return (0);
	}

	logfile->copylog = copylog;

	if ((logfile->open()) == NULL){
		WARNING_M("db::open_log: could not open log file: ");
		delete logfile;
		logfile = NULL;
		WRITEUNLOCK(this, -1, "wu db::open_log");
		return (-1);
	}
	add_to_standby_list(this);
	logfile_opened = TRUE;
	WRITEUNLOCK(this, -1, "wu db::open_log");
	return (0);
}

/*
 * Execute log entry 'j' on the database identified by 'dbchar' if the
 * version of j is later than that of the database.  If 'j' is executed,
 * 'count' is incremented and the database's verison is updated to that of 'j'.
 * Returns TRUE always for valid log entries; FALSE otherwise.
 */
static bool_t
apply_log_entry(db_log_entry * j, char * dbchar, int *count)
{
	db_mindex * db = (db_mindex *) dbchar;
	bool_t status = TRUE;

	WRITELOCK(db, FALSE, "db::apply_log_entry");

	if (db->get_version()->earlier_than(j->get_version())) {
		++ *count;
#ifdef DEBUG
		j->print();
#endif /* DEBUG */
		switch (j->get_action()) {
		case DB_ADD:
		case DB_ADD_NOSYNC:
			db->add(j->get_query(), j->get_object());
			break;

		case DB_REMOVE:
		case DB_REMOVE_NOSYNC:
			db->remove(j->get_query());
			break;

		default:
			WARNING("db::apply_log_entry: unknown action_type");
			WRITEUNLOCK(db, FALSE, "db::apply_log_entry");
			return (FALSE);
		}
		db->change_version(j->get_version());
	}

	WRITEUNLOCK(db, FALSE, "db::apply_log_entry");

	return (TRUE);  /* always want to TRUE if action valid ? */
}

/*
 * Execute log entry 'j' on this db.  'j' is executed if its version is
 * later than that of the database; if executed, the database's version
 * will be changed to that of 'j', regardless of the status of the operation.
 * Returns TRUE if 'j' was executed;   FALSE if it was not.
 * Log entry is added to this database's log if log_entry is applied.
 */
bool_t
db::execute_log_entry(db_log_entry *j)
{
	int count = 0;
	apply_log_entry (j, (char *) &internal_db, &count);
	bool_t copylog = FALSE;
	db_action action;

	/*
	 * If this is a synchronous operation on the master we should
	 * not copy the log for each operation.  Doing so causes
	 * massive disk IO that hampers the performance of these operations.
	 * Where as on the replica these operations are not synchronous
	 * (batched) and don't affect the performance as much.
	 */

	action = j->get_action();
	if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC))
		copylog = TRUE;

	/*
	 * should really record the log entry first, but can''t do that without
	 * knowing whether the log entry is applicable.
	 */
	WRITELOCK(this, FALSE, "w db::execute_log_entry");
	if (count == 1) {
		if (open_log(copylog) < 0) {
			WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
			return (FALSE);
		}

		if (logfile->append(j) < 0) {
			close_log();
			WARNING_M(
			"db::execute_log_entry: could not add log entry: ");
			WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
			return (FALSE);
		}
//	  close_log();  /* do this asynchronously */
	}
	WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");

	return (count == 1);
}

/* Incorporate updates in log to database already loaded.
	    Does not affect "logfile" */
int
db::incorporate_log(char* filename)
{
	db_log f(filename, PICKLE_READ);
	int ret;

	WRITELOCK(this, -1, "w db::incorporate_log");
	WRITELOCK2((&internal_db), -1, "w internal_db db::incorporate_log",
			this);
	internal_db.setNoWriteThrough();
	ret = f.execute_on_log(&(apply_log_entry), (char *) &internal_db);
	internal_db.clearNoWriteThrough();
	WRITEUNLOCK2(this, (&internal_db), ret, ret,
			"wu db::incorporate_log",
			"wu mindex db::incorporate_log");
	return (ret);
}

/* Load database and incorporate any logged updates into the loaded copy.
	    Return TRUE if load succeeds; FALSE otherwise. */
bool_t
db::load()
{
	int count;
	int load_status;

	WRITELOCK(this, FALSE, "w db::load");
	if (changed == TRUE)
		syslog(LOG_ERR,
	"WARNING: the current db '%s' has been changed but not checkpointed",
			dbfilename);

	unlink(tmpfilename);  /* get rid of partial checkpoints */

	if ((load_status = internal_db.load(dbfilename)) != 0) {
	    if (load_status < 0)
		    syslog(LOG_ERR, "Load of db '%s' failed", dbfilename);
	    /* otherwise, there was just nothing to load */
	    WRITEUNLOCK(this, FALSE, "wu db::load");
	    return (FALSE);
	}

	changed = FALSE;
	reset_log();
	WRITELOCK2((&internal_db), FALSE, "w internal_db db::load", this);
	internal_db.setInitialLoad();
	if ((count = incorporate_log(logfilename)) < 0)
		syslog(LOG_ERR, "incorporation of db logfile '%s' load failed",
	    logfilename);
	changed = (count > 0);
	internal_db.clearInitialLoad();
	WRITEUNLOCK2(this, (&internal_db),
			(changed ? TRUE : FALSE), (changed ? TRUE : FALSE),
			"wu db::load", "wu internal_db db::load");
	return (TRUE);
}

/*
 * Initialize the database using table scheme 's'.
 * Because the 'scheme' must be 'remembered' between restarts,
 * after the initialization, the empty database is checkpointed to record
 * the scheme. Returns TRUE if initialization succeeds; FALSE otherwise.
 */
bool_t
db::init(db_scheme * s)
{
	bool_t	ret = FALSE;

	WRITELOCK(this, FALSE, "w db::init");
	internal_db.init(s);
	if (internal_db.good()) {
		unlink(tmpfilename);	/* delete partial checkpoints */
		unlink(logfilename);	/* delete previous logfile */
		reset_log();
		changed = TRUE;		/* force dump to get scheme stored. */
		ret = checkpoint();
	}
	WRITEUNLOCK(this, FALSE, "wu db::init");
	return (ret);
}

/*
    Write out in-memory copy of database to file.
	    1.  Update major version.
	    2.  Dump contents to temporary file.
	    3.  Rename temporary file to real database file.
	    4.  Remove log file.
    A checkpoint is done only if it has changed since the previous checkpoint.
    Returns TRUE if checkpoint was successful; FALSE otherwise.
*/
bool_t
db::checkpoint()
{
	WRITELOCK(this, FALSE, "w db::checkpoint");
	if (changed == FALSE) {
		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
		return (TRUE);
	}

	vers *oldversion = new vers(internal_db.get_version()); /* copy */
	vers *nextversion = oldversion->nextmajor();	/* get next version */
	internal_db.change_version(nextversion);	/* change version */

	if (internal_db.dump(tmpfilename) < 0) {  	/* dump to tempfile */
		WARNING_M("db::checkpoint: could not dump database: ");
		internal_db.change_version(oldversion);	/* rollback */
		delete nextversion;
		delete oldversion;
		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
		return (FALSE);
	}
	if (rename(tmpfilename, dbfilename) < 0){  	/* rename permanently */
		WARNING_M(
		    "db::checkpoint: could not rename temp file to db file: ");
		internal_db.change_version(oldversion);	/* rollback */
		delete nextversion;
		delete oldversion;
		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
		return (FALSE);
	}
	reset_log();		/* should check for what? */
	unlink(logfilename);	/* should do atomic rename and log delete */
	delete nextversion;
	delete oldversion;
	changed = FALSE;
	WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
	return (TRUE);
}


/* For generating log_list */

struct traverse_info {
	vers *version;		// version to check for
	db_log_entry * head;	// head of list of log entries found
	db_log_entry * tail;	// tail of list of log entries found
};

/*
 * For the given entry determine, if it is later than the version supplied,
 *	    1.  increment 'count'.
 *	    2.  add the entry to the list of log entries found.
 *
 * Since traversal happens on an automatic (struct traverse_info) in
 * db::get_log_entries_since(), no locking is necessary.
 */
static bool_t entry_since(db_log_entry * j, char * tichar, int *count)
{
	traverse_info *ti = (traverse_info*) tichar;

	if (ti->version->earlier_than(j->get_version())) {
		++ *count;
//    j->print();   // debug
		if (ti->head == NULL)
			ti->head = j;
		else {
			ti->tail->setnextptr(j); // make last entry point to j
		}
		ti->tail = j;			// make j new last entry
	}

	return (TRUE);
}

/* Return structure db_log_list containing entries that are later
	    than the version 'v' given.  */
db_log_list*
db::get_log_entries_since(vers * v)
{
	int count;
	struct traverse_info ti;
	db_log f(logfilename, PICKLE_READ);

	ti.version = v;
	ti.head = ti.tail = NULL;

	count = f.execute_on_log(&(entry_since), (char *) &ti, FALSE);

	db_log_list * answer = new db_log_list;

	if (answer == NULL)
		FATAL3("db::get_log_entries_since: cannot allocate space",
			DB_MEMORY_LIMIT, NULL);

	answer->list.list_len = count;

	if (count > 0) {
		db_log_entry_p *entries;
		db_log_entry_p currentry, nextentry;
		int i;

		entries = answer->list.list_val = new db_log_entry_p[count];
		if (entries == NULL) {
			delete answer;
			FATAL3(
		"db::get_log_entries_since: cannot allocate space for entries",
		DB_MEMORY_LIMIT, NULL);
			}
		currentry = ti.head;
		for (i = 0, currentry = ti.head;
			i < count && currentry != NULL;
			i++) {
			entries[i] = currentry;
			nextentry = currentry->getnextptr();
			currentry->setnextptr(NULL);
			currentry = nextentry;
		}
	} else
		answer->list.list_val = NULL;

	return (answer);
}

/* Delete all files associated with database. */
int
db::remove_files()
{
	WRITELOCK(this, -1, "w db::remove_files");
	unlink(tmpfilename);  /* delete partial checkpoints */
	reset_log();
	unlink(logfilename);  /* delete logfile */
	unlink(dbfilename);   /* delete database file */
	WRITEUNLOCK(this, -1, "wu db::remove_files");
	return (0);
}

db_status
db::sync_log() {

	db_status	ret;

	WRITELOCK(this, DB_LOCK_ERROR, "w db::sync_log");
	if (logfile == 0) {
		ret = DB_BADTABLE;
	} else {
		if (logfile_opened == FALSE || logfile->sync_log())
			ret = DB_SUCCESS;
		else
			ret = DB_SYNC_FAILED;
	}
	WRITEUNLOCK(this, DB_LOCK_ERROR, "wu db::sync_log");
	return (ret);
}

/* Pass configuration information to the db_mindex */
bool_t
db::configure(char *objName) {
	return (internal_db.configure(objName));
}

db_mindex *
db::mindex(void) {
	return (&internal_db);
}