xref: /titanic_50/usr/src/lib/libnisdb/db.cc (revision 5c51f1241dbbdf2656d0e10011981411ed0c9673)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  *	db.cc
24  *
25  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #pragma ident	"%Z%%M%	%I%	%E% SMI"
30 
31 #include <stdio.h>
32 #include <string.h>
33 #ifdef TDRPC
34 #include <sysent.h>
35 #else
36 #include <unistd.h>
37 #endif
38 
39 #include "nisdb_mt.h"
40 #include "db_headers.h"
41 #include "db.h"
42 
43 extern db_result *empty_result(db_status);
44 extern int add_to_standby_list(db*);
45 extern int remove_from_standby_list(db*);
46 
/*
 * Bookkeeping stored behind a db_next_desc.
 * 'next_type' tells which kind of cursor 'next_value' holds.
 */

#define	LINEAR 1
#define	CHAINED 2

struct db_next_info {
	int next_type;		/* LINEAR or CHAINED */
	void *next_value;	/* LINEAR: an entryp stored as a pointer; */
				/* CHAINED: a db_next_index_desc* */
};
57 
58 
59 /* Constructor:  Create a database using the given name, 'dbname.'
60 	    The database is stored in a file named 'dbname'.
61 	    The log file is stored in a file named 'dbname'.log.
62 	    A temporary file 'dbname'.tmp is also used.   */
63 db::db(char* dbname)
64 {
65 	int len = strlen(dbname);
66 	dbfilename = new char[len+1];
67 	if (dbfilename == NULL)
68 		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
69 	logfilename = new char[len+5];
70 	if (logfilename == NULL) {
71 		delete dbfilename;
72 		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
73 	}
74 	tmpfilename = new char[len+5];
75 	if (tmpfilename == NULL) {
76 		delete dbfilename;
77 		delete logfilename;
78 		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
79 	}
80 	sprintf(dbfilename, "%s", dbname);
81 	sprintf(logfilename, "%s.log", dbname);
82 	sprintf(tmpfilename, "%s.tmp", dbname);
83 	logfile = NULL;
84 	logfile_opened = FALSE;
85 	changed = FALSE;
86 	INITRW(db);
87 	READLOCKOK(db);
88 
89 	internal_db.setDbPtr(this);
90 	(void) internal_db.configure(dbname);
91 }
92 
93 /* destructor:  note that associated files should be removed separated  */
94 db::~db()
95 {
96 	(void)acqexcl();
97 	internal_db.reset();  /* clear any associated data structures */
98 	delete dbfilename;
99 	delete logfilename;
100 	delete tmpfilename;
101 	close_log();
102 	delete logfile;
103 	(void)destroylock();
104 }
105 
106 
107 static void
108 assign_next_desc(db_next_desc* desc, entryp value)
109 {
110 	db_next_info * store = new db_next_info;
111 	if (store == NULL) {
112 		desc->db_next_desc_val =  NULL;
113 		desc->db_next_desc_len = 0;
114 		FATAL("db::assign_next_desc: cannot allocate space",
115 			DB_MEMORY_LIMIT);
116 	}
117 
118 	store->next_type = LINEAR;
119 	store->next_value = (void*)value;
120 	desc->db_next_desc_val =  (char*) store;
121 	desc->db_next_desc_len = sizeof (db_next_info);
122 }
123 
124 static void
125 assign_next_desc(db_next_desc* desc, db_next_index_desc * value)
126 {
127 	db_next_info * store = new db_next_info;
128 	if (store == NULL) {
129 		desc->db_next_desc_val =  NULL;
130 		desc->db_next_desc_len = 0;
131 		FATAL("db::assign_next_desc: cannot allocate space (2)",
132 			DB_MEMORY_LIMIT);
133 	}
134 	store->next_type = CHAINED;
135 	store->next_value = (void*)value;
136 	desc->db_next_desc_val =  (char*) store;
137 	desc->db_next_desc_len = sizeof (db_next_info);
138 }
139 
140 static entryp
141 extract_next_desc(db_next_desc* desc, int *next_type,
142 		db_next_index_desc** place2)
143 {
144 	entryp place;
145 
146 	if (desc == NULL || desc->db_next_desc_len != sizeof (db_next_info)) {
147 		*next_type = 0;
148 		return (0);
149 	}
150 	*next_type = ((db_next_info*) desc->db_next_desc_val)->next_type;
151 	switch (*next_type) {
152 	case LINEAR:
153 		place = (entryp)
154 			((db_next_info*) desc->db_next_desc_val)->next_value;
155 		return (place);
156 
157 	case CHAINED:
158 		*place2 = (db_next_index_desc*)
159 			((db_next_info*) desc->db_next_desc_val) ->next_value;
160 		return (0);
161 	default:
162 		*next_type = 0;   // invalid type
163 		return (0);
164 	}
165 }
166 
167 /* Execute the specified action using the rest of the arguments as input.
168 	    Return  a structure db_result containing the result. */
169 db_result *
170 db::exec_action(db_action action, db_query *query,
171 		entry_object *content, db_next_desc* previous)
172 {
173 	entryp where, prev;
174 	db_result *res = new db_result;
175 	long num_answers;
176 	entry_object_p * ans;
177 	entry_object * single;
178 	db_next_index_desc *index_desc;
179 	int next_type;
180 	db_next_index_desc *prev_desc;
181 
182 	if (res == NULL)
183 		FATAL3("db::exec_action: cannot allocate space for result",
184 			DB_MEMORY_LIMIT, NULL);
185 
186 	res->objects.objects_len = 0; /* default */
187 	res->objects.objects_val = NULL;  /* default */
188 
189 	switch (action) {
190 	case DB_LOOKUP:
191 		res->status = internal_db.lookup(query, &num_answers, &ans);
192 		res->objects.objects_len = (int) num_answers;
193 		res->objects.objects_val = ans;
194 		break;
195 
196 	case DB_ADD:
197 		res->status = internal_db.add(query, content);
198 		break;
199 
200 	case DB_REMOVE:
201 		res->status = internal_db.remove(query);
202 		break;
203 
204 	case DB_FIRST:
205 		if (query == NULL) {
206 			res->status = internal_db.first(&where, &single);
207 			if (res->status == DB_SUCCESS)
208 				assign_next_desc(&(res->nextinfo), where);
209 		}  else {
210 			res->status = internal_db.first(query,
211 							&index_desc,
212 							&single);
213 			if (res->status == DB_SUCCESS)
214 				assign_next_desc(&(res->nextinfo), index_desc);
215 		}
216 		if (res->status == DB_SUCCESS) {
217 			res->objects.objects_val = new entry_object_p;
218 			if (res->objects.objects_val == NULL) {
219 				res->objects.objects_len = 0;
220 				delete res;
221 				FATAL3(
222 		"db::exec_action: cannot allocate space for DB_FIRST result",
223 		DB_MEMORY_LIMIT, NULL);
224 			}
225 			res->objects.objects_len = 1;
226 			res->objects.objects_val[0] = single;
227 		}
228 		break;
229 
230 	case DB_NEXT:
231 		prev = extract_next_desc(previous, &next_type, &prev_desc);
232 		switch (next_type) {
233 		case LINEAR:
234 			if (prev != 0) {
235 				res->status = internal_db.next(prev, &where,
236 								&single);
237 				if (res->status == DB_SUCCESS)
238 					assign_next_desc(&(res->nextinfo),
239 								where);
240 			} else
241 					// invalid previous indicator
242 				res->status = DB_NOTFOUND;
243 			break;
244 		case CHAINED:
245 			if (prev_desc != NULL) {
246 				res->status = internal_db.next(prev_desc,
247 							&index_desc, &single);
248 				if (res->status == DB_SUCCESS)
249 					assign_next_desc(&(res->nextinfo),
250 								index_desc);
251 			} else
252 					// invalid previous indicator
253 				res->status = DB_NOTFOUND;
254 			break;
255 		default:
256 			WARNING("db::exec_action: invalid previous indicator");
257 			res->status = DB_BADQUERY;
258 		}
259 		if (previous && previous->db_next_desc_val) {
260 			delete previous->db_next_desc_val;
261 			previous->db_next_desc_len = 0;
262 			previous->db_next_desc_val = NULL;
263 		}
264 		if (res->status == DB_SUCCESS) {
265 			res->objects.objects_len = 1;
266 			res->objects.objects_val = new entry_object_p;
267 			if (res->objects.objects_val == NULL) {
268 				res->objects.objects_len = 0;
269 				delete res;
270 				FATAL3(
271 		    "db::exec_action: cannot allocate space for DB_NEXT result",
272 		    DB_MEMORY_LIMIT, NULL);
273 			}
274 			res->objects.objects_val[0] = single;
275 		}
276 		break;
277 
278 	case DB_RESET_NEXT:
279 		prev = extract_next_desc(previous, &next_type, &prev_desc);
280 		switch (next_type) {
281 		case LINEAR:
282 			res->status = DB_SUCCESS;
283 			if (previous->db_next_desc_val) {
284 	delete previous->db_next_desc_val;
285 	previous->db_next_desc_len = 0;
286 	previous->db_next_desc_val = NULL;
287 			}
288 			break;   // do nothing
289 		case CHAINED:
290 			res->status = internal_db.reset_next(prev_desc);
291 			if (previous->db_next_desc_val) {
292 	delete previous->db_next_desc_val;
293 	previous->db_next_desc_len = 0;
294 	previous->db_next_desc_val = NULL;
295 			}
296 			break;
297 		default:
298 			WARNING("db::exec_action: invalid previous indicator");
299 			res->status = DB_BADQUERY;
300 		}
301 		break;
302 
303 	case DB_ALL:
304 		res->status = internal_db.all(&num_answers, &ans);
305 		res->objects.objects_len = (int) num_answers;
306 		res->objects.objects_val = ans;
307 		break;
308 
309 	default:
310 		WARNING("unknown request");
311 		res->status = DB_BADQUERY;
312 		return (res);
313 	}
314 	return (res);
315 }
316 
317 /*
318  * Log the given action and execute it.
319  * The minor version of the database is updated after the action has
320  * been executed and the database is flagged as being changed.
321  * Return the structure db_result, or NULL if the logging failed or the
322  * action is unknown.
323 */
324 db_result *
325 db::log_action(db_action action, db_query *query, entry_object *content)
326 {
327 	vers *v = internal_db.get_version()->nextminor();
328 	db_result * res;
329 	db_log_entry le(action, v, query, content);
330 	bool_t copylog = FALSE;
331 
332 	WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::log_action");
333 	/*
334 	 * If this is a synchronous operation on the master we should
335 	 * not copy the log for each operation.  Doing so causes
336 	 * massive disk IO that hampers the performance of these operations.
337 	 * Where as on the replica these operations are not synchronous
338 	 * (batched) and don't affect the performance as much.
339 	 */
340 
341 	if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC))
342 		copylog = TRUE;
343 
344 	if (open_log(copylog) < 0)  {
345 		delete v;
346 		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
347 				"wu db::log_action DB_STORAGE_LIMIT");
348 		return (empty_result(DB_STORAGE_LIMIT));
349 	}
350 
351 	if (logfile->append(&le) < 0) {
352 		close_log();
353 		WARNING_M("db::log_action: could not add log entry: ");
354 		delete v;
355 		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
356 				"wu db::log_action DB_STORAGE_LIMIT");
357 		return (empty_result(DB_STORAGE_LIMIT));
358 	}
359 
360 	switch (action) {
361 	case DB_ADD_NOSYNC:
362 		action = DB_ADD;
363 		break;
364 	case DB_REMOVE_NOSYNC:
365 		action = DB_REMOVE;
366 		break;
367 	default:
368 		if (logfile->sync_log() < 0) {
369 			close_log();
370 			WARNING_M("db::log_action: could not add log entry: ");
371 			delete v;
372 			WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
373 					"wu db::log_action DB_STORAGE_LIMIT");
374 			return (empty_result(DB_STORAGE_LIMIT));
375 		}
376 		break;
377 	}
378 	res = exec_action(action, query, content, NULL);
379 	internal_db.change_version(v);
380 	delete v;
381 	changed = TRUE;
382 	WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::log_action");
383 
384 	return (res);
385 }
386 
387 /*
388  * Execute 'action' using the rest of the arguments as input.
389  * Return the result of the operation in a db_result structure;
390  * Return NULL if the request is unknown.
391  * If the action involves updates (ADD and REMOVE), it is logged first.
392  */
393 db_result *
394 db::execute(db_action action, db_query *query,
395 		entry_object *content, db_next_desc* previous)
396 {
397 	db_result	*res;
398 
399 	switch (action) {
400 	case DB_LOOKUP:
401 	case DB_FIRST:
402 	case DB_NEXT:
403 	case DB_ALL:
404 	case DB_RESET_NEXT:
405 		READLOCK(this, empty_result(DB_LOCK_ERROR), "r db::execute");
406 		res = exec_action(action, query, content, previous);
407 		READUNLOCK(this, empty_result(DB_LOCK_ERROR),
408 				"ru db::execute");
409 		return (res);
410 
411 	case DB_ADD_NOLOG:
412 		WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::execute");
413 		changed = TRUE;
414 		res = exec_action(DB_ADD, query, content, previous);
415 		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
416 				"wu db::execute");
417 		return (res);
418 
419 	case DB_ADD:
420 	case DB_REMOVE:
421 	case DB_ADD_NOSYNC:
422 	case DB_REMOVE_NOSYNC:
423 		/* log_action() will do the locking */
424 		return (log_action(action, query, content));
425 
426 	default:
427 		WARNING("db::execute: unknown request");
428 		return (empty_result(DB_INTERNAL_ERROR));
429 	}
430 }
431 
432 /* close existing logfile and delete its structure */
433 int
434 db::reset_log()
435 {
436 	WRITELOCK(this, -1, "w db::reset_log");
437 	/* try to close old log file */
438 	/* doesnot matter since we do synchronous writes only */
439 	if (logfile != NULL) {
440 	    if (logfile_opened == TRUE) {
441 		    logfile->sync_log();
442 		    if (logfile->close() < 0) {
443 			WARNING_M("db::reset_log: could not close log file: ");
444 		    }
445 		    remove_from_standby_list(this);
446 	    }
447 	    delete logfile;
448 	    logfile = NULL;
449 	}
450 	logfile_opened = FALSE;
451 	WRITEUNLOCK(this, -1, "wu db::reset_log");
452 	return (0);
453 }
454 
455 /* close existing logfile, but leave its structure if exists */
456 int
457 db::close_log(int bypass_standby)
458 {
459 	WRITELOCK(this, -1, "w db::close_log");
460 	if (logfile != NULL && logfile_opened == TRUE) {
461 		logfile->sync_log();
462 		logfile->close();
463 		if (!bypass_standby)
464 		    remove_from_standby_list(this);
465 	}
466 	logfile_opened = FALSE;
467 	WRITEUNLOCK(this, -1, "wu db::close_log");
468 	return (0);
469 }
470 
471 /* open logfile, creating its structure if it does not exist */
472 int
473 db::open_log(bool_t copylog)
474 {
475 	WRITELOCK(this, -1, "w db::open_log");
476 	if (logfile == NULL) {
477 		if ((logfile = new db_log(logfilename, PICKLE_APPEND))
478 		    == NULL)
479 			FATAL3("db::reset_log: cannot allocate space",
480 			    DB_MEMORY_LIMIT, -1);
481 	}
482 
483 	if (logfile_opened == TRUE) {
484 		WRITEUNLOCK(this, -1, "wu db::open_log");
485 		return (0);
486 	}
487 
488 	logfile->copylog = copylog;
489 
490 	if ((logfile->open()) == NULL){
491 		WARNING_M("db::open_log: could not open log file: ");
492 		delete logfile;
493 		logfile = NULL;
494 		WRITEUNLOCK(this, -1, "wu db::open_log");
495 		return (-1);
496 	}
497 	add_to_standby_list(this);
498 	logfile_opened = TRUE;
499 	WRITEUNLOCK(this, -1, "wu db::open_log");
500 	return (0);
501 }
502 
503 /*
504  * Execute log entry 'j' on the database identified by 'dbchar' if the
505  * version of j is later than that of the database.  If 'j' is executed,
506  * 'count' is incremented and the database's verison is updated to that of 'j'.
507  * Returns TRUE always for valid log entries; FALSE otherwise.
508  */
509 static bool_t
510 apply_log_entry(db_log_entry * j, char * dbchar, int *count)
511 {
512 	db_mindex * db = (db_mindex *) dbchar;
513 	bool_t status = TRUE;
514 
515 	WRITELOCK(db, FALSE, "db::apply_log_entry");
516 
517 	if (db->get_version()->earlier_than(j->get_version())) {
518 		++ *count;
519 #ifdef DEBUG
520 		j->print();
521 #endif /* DEBUG */
522 		switch (j->get_action()) {
523 		case DB_ADD:
524 		case DB_ADD_NOSYNC:
525 			db->add(j->get_query(), j->get_object());
526 			break;
527 
528 		case DB_REMOVE:
529 		case DB_REMOVE_NOSYNC:
530 			db->remove(j->get_query());
531 			break;
532 
533 		default:
534 			WARNING("db::apply_log_entry: unknown action_type");
535 			WRITEUNLOCK(db, FALSE, "db::apply_log_entry");
536 			return (FALSE);
537 		}
538 		db->change_version(j->get_version());
539 	}
540 
541 	WRITEUNLOCK(db, FALSE, "db::apply_log_entry");
542 
543 	return (TRUE);  /* always want to TRUE if action valid ? */
544 }
545 
546 /*
547  * Execute log entry 'j' on this db.  'j' is executed if its version is
548  * later than that of the database; if executed, the database's version
549  * will be changed to that of 'j', regardless of the status of the operation.
550  * Returns TRUE if 'j' was executed;   FALSE if it was not.
551  * Log entry is added to this database's log if log_entry is applied.
552  */
553 bool_t
554 db::execute_log_entry(db_log_entry *j)
555 {
556 	int count = 0;
557 	apply_log_entry (j, (char *) &internal_db, &count);
558 	bool_t copylog = FALSE;
559 	db_action action;
560 
561 	/*
562 	 * If this is a synchronous operation on the master we should
563 	 * not copy the log for each operation.  Doing so causes
564 	 * massive disk IO that hampers the performance of these operations.
565 	 * Where as on the replica these operations are not synchronous
566 	 * (batched) and don't affect the performance as much.
567 	 */
568 
569 	action = j->get_action();
570 	if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC))
571 		copylog = TRUE;
572 
573 	/*
574 	 * should really record the log entry first, but can''t do that without
575 	 * knowing whether the log entry is applicable.
576 	 */
577 	WRITELOCK(this, FALSE, "w db::execute_log_entry");
578 	if (count == 1) {
579 		if (open_log(copylog) < 0) {
580 			WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
581 			return (FALSE);
582 		}
583 
584 		if (logfile->append(j) < 0) {
585 			close_log();
586 			WARNING_M(
587 			"db::execute_log_entry: could not add log entry: ");
588 			WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
589 			return (FALSE);
590 		}
591 //	  close_log();  /* do this asynchronously */
592 	}
593 	WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
594 
595 	return (count == 1);
596 }
597 
598 /* Incorporate updates in log to database already loaded.
599 	    Does not affect "logfile" */
600 int
601 db::incorporate_log(char* filename)
602 {
603 	db_log f(filename, PICKLE_READ);
604 	int ret;
605 
606 	WRITELOCK(this, -1, "w db::incorporate_log");
607 	WRITELOCK2((&internal_db), -1, "w internal_db db::incorporate_log",
608 			this);
609 	internal_db.setNoWriteThrough();
610 	ret = f.execute_on_log(&(apply_log_entry), (char *) &internal_db);
611 	internal_db.clearNoWriteThrough();
612 	WRITEUNLOCK2(this, (&internal_db), ret, ret,
613 			"wu db::incorporate_log",
614 			"wu mindex db::incorporate_log");
615 	return (ret);
616 }
617 
618 /* Load database and incorporate any logged updates into the loaded copy.
619 	    Return TRUE if load succeeds; FALSE otherwise. */
620 bool_t
621 db::load()
622 {
623 	int count;
624 	int load_status;
625 
626 	WRITELOCK(this, FALSE, "w db::load");
627 	if (changed == TRUE)
628 		syslog(LOG_ERR,
629 	"WARNING: the current db '%s' has been changed but not checkpointed",
630 			dbfilename);
631 
632 	unlink(tmpfilename);  /* get rid of partial checkpoints */
633 
634 	if ((load_status = internal_db.load(dbfilename)) != 0) {
635 	    if (load_status < 0)
636 		    syslog(LOG_ERR, "Load of db '%s' failed", dbfilename);
637 	    /* otherwise, there was just nothing to load */
638 	    WRITEUNLOCK(this, FALSE, "wu db::load");
639 	    return (FALSE);
640 	}
641 
642 	changed = FALSE;
643 	reset_log();
644 	WRITELOCK2((&internal_db), FALSE, "w internal_db db::load", this);
645 	internal_db.setInitialLoad();
646 	if ((count = incorporate_log(logfilename)) < 0)
647 		syslog(LOG_ERR, "incorporation of db logfile '%s' load failed",
648 	    logfilename);
649 	changed = (count > 0);
650 	internal_db.clearInitialLoad();
651 	WRITEUNLOCK2(this, (&internal_db),
652 			(changed ? TRUE : FALSE), (changed ? TRUE : FALSE),
653 			"wu db::load", "wu internal_db db::load");
654 	return (TRUE);
655 }
656 
657 /*
658  * Initialize the database using table scheme 's'.
659  * Because the 'scheme' must be 'remembered' between restarts,
660  * after the initialization, the empty database is checkpointed to record
661  * the scheme. Returns TRUE if initialization succeeds; FALSE otherwise.
662  */
663 bool_t
664 db::init(db_scheme * s)
665 {
666 	bool_t	ret = FALSE;
667 
668 	WRITELOCK(this, FALSE, "w db::init");
669 	internal_db.init(s);
670 	if (internal_db.good()) {
671 		unlink(tmpfilename);	/* delete partial checkpoints */
672 		unlink(logfilename);	/* delete previous logfile */
673 		reset_log();
674 		changed = TRUE;		/* force dump to get scheme stored. */
675 		ret = checkpoint();
676 	}
677 	WRITEUNLOCK(this, FALSE, "wu db::init");
678 	return (ret);
679 }
680 
681 /*
682     Write out in-memory copy of database to file.
683 	    1.  Update major version.
684 	    2.  Dump contents to temporary file.
685 	    3.  Rename temporary file to real database file.
686 	    4.  Remove log file.
687     A checkpoint is done only if it has changed since the previous checkpoint.
688     Returns TRUE if checkpoint was successful; FALSE otherwise.
689 */
690 bool_t
691 db::checkpoint()
692 {
693 	WRITELOCK(this, FALSE, "w db::checkpoint");
694 	if (changed == FALSE) {
695 		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
696 		return (TRUE);
697 	}
698 
699 	vers *oldversion = new vers(internal_db.get_version()); /* copy */
700 	vers *nextversion = oldversion->nextmajor();	/* get next version */
701 	internal_db.change_version(nextversion);	/* change version */
702 
703 	if (internal_db.dump(tmpfilename) < 0) {  	/* dump to tempfile */
704 		WARNING_M("db::checkpoint: could not dump database: ");
705 		internal_db.change_version(oldversion);	/* rollback */
706 		delete nextversion;
707 		delete oldversion;
708 		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
709 		return (FALSE);
710 	}
711 	if (rename(tmpfilename, dbfilename) < 0){  	/* rename permanently */
712 		WARNING_M(
713 		    "db::checkpoint: could not rename temp file to db file: ");
714 		internal_db.change_version(oldversion);	/* rollback */
715 		delete nextversion;
716 		delete oldversion;
717 		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
718 		return (FALSE);
719 	}
720 	reset_log();		/* should check for what? */
721 	unlink(logfilename);	/* should do atomic rename and log delete */
722 	delete nextversion;
723 	delete oldversion;
724 	changed = FALSE;
725 	WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
726 	return (TRUE);
727 }
728 
729 
730 /* For generating log_list */
731 
732 struct traverse_info {
733 	vers *version;		// version to check for
734 	db_log_entry * head;	// head of list of log entries found
735 	db_log_entry * tail;	// tail of list of log entries found
736 };
737 
738 /*
739  * For the given entry determine, if it is later than the version supplied,
740  *	    1.  increment 'count'.
741  *	    2.  add the entry to the list of log entries found.
742  *
743  * Since traversal happens on an automatic (struct traverse_info) in
744  * db::get_log_entries_since(), no locking is necessary.
745  */
746 static bool_t entry_since(db_log_entry * j, char * tichar, int *count)
747 {
748 	traverse_info *ti = (traverse_info*) tichar;
749 
750 	if (ti->version->earlier_than(j->get_version())) {
751 		++ *count;
752 //    j->print();   // debug
753 		if (ti->head == NULL)
754 			ti->head = j;
755 		else {
756 			ti->tail->setnextptr(j); // make last entry point to j
757 		}
758 		ti->tail = j;			// make j new last entry
759 	}
760 
761 	return (TRUE);
762 }
763 
764 /* Return structure db_log_list containing entries that are later
765 	    than the version 'v' given.  */
766 db_log_list*
767 db::get_log_entries_since(vers * v)
768 {
769 	int count;
770 	struct traverse_info ti;
771 	db_log f(logfilename, PICKLE_READ);
772 
773 	ti.version = v;
774 	ti.head = ti.tail = NULL;
775 
776 	count = f.execute_on_log(&(entry_since), (char *) &ti, FALSE);
777 
778 	db_log_list * answer = new db_log_list;
779 
780 	if (answer == NULL)
781 		FATAL3("db::get_log_entries_since: cannot allocate space",
782 			DB_MEMORY_LIMIT, NULL);
783 
784 	answer->list.list_len = count;
785 
786 	if (count > 0) {
787 		db_log_entry_p *entries;
788 		db_log_entry_p currentry, nextentry;
789 		int i;
790 
791 		entries = answer->list.list_val = new db_log_entry_p[count];
792 		if (entries == NULL) {
793 			delete answer;
794 			FATAL3(
795 		"db::get_log_entries_since: cannot allocate space for entries",
796 		DB_MEMORY_LIMIT, NULL);
797 			}
798 		currentry = ti.head;
799 		for (i = 0, currentry = ti.head;
800 			i < count && currentry != NULL;
801 			i++) {
802 			entries[i] = currentry;
803 			nextentry = currentry->getnextptr();
804 			currentry->setnextptr(NULL);
805 			currentry = nextentry;
806 		}
807 	} else
808 		answer->list.list_val = NULL;
809 
810 	return (answer);
811 }
812 
813 /* Delete all files associated with database. */
814 int
815 db::remove_files()
816 {
817 	WRITELOCK(this, -1, "w db::remove_files");
818 	unlink(tmpfilename);  /* delete partial checkpoints */
819 	reset_log();
820 	unlink(logfilename);  /* delete logfile */
821 	unlink(dbfilename);   /* delete database file */
822 	WRITEUNLOCK(this, -1, "wu db::remove_files");
823 	return (0);
824 }
825 
826 db_status
827 db::sync_log() {
828 
829 	db_status	ret;
830 
831 	WRITELOCK(this, DB_LOCK_ERROR, "w db::sync_log");
832 	if (logfile == 0) {
833 		ret = DB_BADTABLE;
834 	} else {
835 		if (logfile_opened == FALSE || logfile->sync_log())
836 			ret = DB_SUCCESS;
837 		else
838 			ret = DB_SYNC_FAILED;
839 	}
840 	WRITEUNLOCK(this, DB_LOCK_ERROR, "wu db::sync_log");
841 	return (ret);
842 }
843 
844 /* Pass configuration information to the db_mindex */
845 bool_t
846 db::configure(char *objName) {
847 	return (internal_db.configure(objName));
848 }
849 
850 db_mindex *
851 db::mindex(void) {
852 	return (&internal_db);
853 }
854