xref: /illumos-gate/usr/src/lib/libnisdb/db.cc (revision e8921a52c53ee69f7b65f054d9b2e886139daa59)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  *	db.cc
24  *
25  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
26  * Use is subject to license terms.
27  */
28 
29 #include <stdio.h>
30 #include <string.h>
31 #ifdef TDRPC
32 #include <sysent.h>
33 #else
34 #include <unistd.h>
35 #endif
36 
37 #include "nisdb_mt.h"
38 #include "db_headers.h"
39 #include "db.h"
40 
41 extern db_result *empty_result(db_status);
42 extern int add_to_standby_list(db*);
43 extern int remove_from_standby_list(db*);
44 
/*
 * Payload stored in a db_next_desc.  'next_type' says how 'next_value'
 * must be interpreted.
 */

#define	LINEAR 1	/* next_value holds an entryp */
#define	CHAINED 2	/* next_value holds a db_next_index_desc* */

struct db_next_info {
	int next_type;		/* LINEAR or CHAINED */
	void* next_value;	/* see LINEAR / CHAINED above */
};
55 
56 
57 /* Constructor:  Create a database using the given name, 'dbname.'
58 	    The database is stored in a file named 'dbname'.
59 	    The log file is stored in a file named 'dbname'.log.
60 	    A temporary file 'dbname'.tmp is also used.   */
61 db::db(char* dbname)
62 {
63 	int len = strlen(dbname);
64 	dbfilename = new char[len+1];
65 	if (dbfilename == NULL)
66 		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
67 	logfilename = new char[len+5];
68 	if (logfilename == NULL) {
69 		delete dbfilename;
70 		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
71 	}
72 	tmpfilename = new char[len+5];
73 	if (tmpfilename == NULL) {
74 		delete dbfilename;
75 		delete logfilename;
76 		FATAL("db::db: cannot allocate space", DB_MEMORY_LIMIT);
77 	}
78 	sprintf(dbfilename, "%s", dbname);
79 	sprintf(logfilename, "%s.log", dbname);
80 	sprintf(tmpfilename, "%s.tmp", dbname);
81 	logfile = NULL;
82 	logfile_opened = FALSE;
83 	changed = FALSE;
84 	INITRW(db);
85 	READLOCKOK(db);
86 
87 	internal_db.setDbPtr(this);
88 	(void) internal_db.configure(dbname);
89 }
90 
91 /* destructor:  note that associated files should be removed separated  */
92 db::~db()
93 {
94 	(void)acqexcl();
95 	internal_db.reset();  /* clear any associated data structures */
96 	delete dbfilename;
97 	delete logfilename;
98 	delete tmpfilename;
99 	close_log();
100 	delete logfile;
101 	(void)destroylock();
102 }
103 
104 
105 static void
106 assign_next_desc(db_next_desc* desc, entryp value)
107 {
108 	db_next_info * store = new db_next_info;
109 	if (store == NULL) {
110 		desc->db_next_desc_val =  NULL;
111 		desc->db_next_desc_len = 0;
112 		FATAL("db::assign_next_desc: cannot allocate space",
113 			DB_MEMORY_LIMIT);
114 	}
115 
116 	store->next_type = LINEAR;
117 	store->next_value = (void*)value;
118 	desc->db_next_desc_val =  (char*) store;
119 	desc->db_next_desc_len = sizeof (db_next_info);
120 }
121 
122 static void
123 assign_next_desc(db_next_desc* desc, db_next_index_desc * value)
124 {
125 	db_next_info * store = new db_next_info;
126 	if (store == NULL) {
127 		desc->db_next_desc_val =  NULL;
128 		desc->db_next_desc_len = 0;
129 		FATAL("db::assign_next_desc: cannot allocate space (2)",
130 			DB_MEMORY_LIMIT);
131 	}
132 	store->next_type = CHAINED;
133 	store->next_value = (void*)value;
134 	desc->db_next_desc_val =  (char*) store;
135 	desc->db_next_desc_len = sizeof (db_next_info);
136 }
137 
138 static entryp
139 extract_next_desc(db_next_desc* desc, int *next_type,
140 		db_next_index_desc** place2)
141 {
142 	entryp place;
143 
144 	if (desc == NULL || desc->db_next_desc_len != sizeof (db_next_info)) {
145 		*next_type = 0;
146 		return (0);
147 	}
148 	*next_type = ((db_next_info*) desc->db_next_desc_val)->next_type;
149 	switch (*next_type) {
150 	case LINEAR:
151 		place = (entryp)
152 			((db_next_info*) desc->db_next_desc_val)->next_value;
153 		return (place);
154 
155 	case CHAINED:
156 		*place2 = (db_next_index_desc*)
157 			((db_next_info*) desc->db_next_desc_val) ->next_value;
158 		return (0);
159 	default:
160 		*next_type = 0;   // invalid type
161 		return (0);
162 	}
163 }
164 
165 /* Execute the specified action using the rest of the arguments as input.
166 	    Return  a structure db_result containing the result. */
167 db_result *
168 db::exec_action(db_action action, db_query *query,
169 		entry_object *content, db_next_desc* previous)
170 {
171 	entryp where, prev;
172 	db_result *res = new db_result;
173 	long num_answers;
174 	entry_object_p * ans;
175 	entry_object * single;
176 	db_next_index_desc *index_desc;
177 	int next_type;
178 	db_next_index_desc *prev_desc;
179 
180 	if (res == NULL)
181 		FATAL3("db::exec_action: cannot allocate space for result",
182 			DB_MEMORY_LIMIT, NULL);
183 
184 	res->objects.objects_len = 0; /* default */
185 	res->objects.objects_val = NULL;  /* default */
186 
187 	switch (action) {
188 	case DB_LOOKUP:
189 		res->status = internal_db.lookup(query, &num_answers, &ans);
190 		res->objects.objects_len = (int) num_answers;
191 		res->objects.objects_val = ans;
192 		break;
193 
194 	case DB_ADD:
195 		res->status = internal_db.add(query, content);
196 		break;
197 
198 	case DB_REMOVE:
199 		res->status = internal_db.remove(query);
200 		break;
201 
202 	case DB_FIRST:
203 		if (query == NULL) {
204 			res->status = internal_db.first(&where, &single);
205 			if (res->status == DB_SUCCESS)
206 				assign_next_desc(&(res->nextinfo), where);
207 		}  else {
208 			res->status = internal_db.first(query,
209 							&index_desc,
210 							&single);
211 			if (res->status == DB_SUCCESS)
212 				assign_next_desc(&(res->nextinfo), index_desc);
213 		}
214 		if (res->status == DB_SUCCESS) {
215 			res->objects.objects_val = new entry_object_p;
216 			if (res->objects.objects_val == NULL) {
217 				res->objects.objects_len = 0;
218 				delete res;
219 				FATAL3(
220 		"db::exec_action: cannot allocate space for DB_FIRST result",
221 		DB_MEMORY_LIMIT, NULL);
222 			}
223 			res->objects.objects_len = 1;
224 			res->objects.objects_val[0] = single;
225 		}
226 		break;
227 
228 	case DB_NEXT:
229 		prev = extract_next_desc(previous, &next_type, &prev_desc);
230 		switch (next_type) {
231 		case LINEAR:
232 			if (prev != 0) {
233 				res->status = internal_db.next(prev, &where,
234 								&single);
235 				if (res->status == DB_SUCCESS)
236 					assign_next_desc(&(res->nextinfo),
237 								where);
238 			} else
239 					// invalid previous indicator
240 				res->status = DB_NOTFOUND;
241 			break;
242 		case CHAINED:
243 			if (prev_desc != NULL) {
244 				res->status = internal_db.next(prev_desc,
245 							&index_desc, &single);
246 				if (res->status == DB_SUCCESS)
247 					assign_next_desc(&(res->nextinfo),
248 								index_desc);
249 			} else
250 					// invalid previous indicator
251 				res->status = DB_NOTFOUND;
252 			break;
253 		default:
254 			WARNING("db::exec_action: invalid previous indicator");
255 			res->status = DB_BADQUERY;
256 		}
257 		if (previous && previous->db_next_desc_val) {
258 			delete previous->db_next_desc_val;
259 			previous->db_next_desc_len = 0;
260 			previous->db_next_desc_val = NULL;
261 		}
262 		if (res->status == DB_SUCCESS) {
263 			res->objects.objects_len = 1;
264 			res->objects.objects_val = new entry_object_p;
265 			if (res->objects.objects_val == NULL) {
266 				res->objects.objects_len = 0;
267 				delete res;
268 				FATAL3(
269 		    "db::exec_action: cannot allocate space for DB_NEXT result",
270 		    DB_MEMORY_LIMIT, NULL);
271 			}
272 			res->objects.objects_val[0] = single;
273 		}
274 		break;
275 
276 	case DB_RESET_NEXT:
277 		prev = extract_next_desc(previous, &next_type, &prev_desc);
278 		switch (next_type) {
279 		case LINEAR:
280 			res->status = DB_SUCCESS;
281 			if (previous->db_next_desc_val) {
282 	delete previous->db_next_desc_val;
283 	previous->db_next_desc_len = 0;
284 	previous->db_next_desc_val = NULL;
285 			}
286 			break;   // do nothing
287 		case CHAINED:
288 			res->status = internal_db.reset_next(prev_desc);
289 			if (previous->db_next_desc_val) {
290 	delete previous->db_next_desc_val;
291 	previous->db_next_desc_len = 0;
292 	previous->db_next_desc_val = NULL;
293 			}
294 			break;
295 		default:
296 			WARNING("db::exec_action: invalid previous indicator");
297 			res->status = DB_BADQUERY;
298 		}
299 		break;
300 
301 	case DB_ALL:
302 		res->status = internal_db.all(&num_answers, &ans);
303 		res->objects.objects_len = (int) num_answers;
304 		res->objects.objects_val = ans;
305 		break;
306 
307 	default:
308 		WARNING("unknown request");
309 		res->status = DB_BADQUERY;
310 		return (res);
311 	}
312 	return (res);
313 }
314 
315 /*
316  * Log the given action and execute it.
317  * The minor version of the database is updated after the action has
318  * been executed and the database is flagged as being changed.
319  * Return the structure db_result, or NULL if the logging failed or the
320  * action is unknown.
321 */
322 db_result *
323 db::log_action(db_action action, db_query *query, entry_object *content)
324 {
325 	vers *v = internal_db.get_version()->nextminor();
326 	db_result * res;
327 	db_log_entry le(action, v, query, content);
328 	bool_t copylog = FALSE;
329 
330 	WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::log_action");
331 	/*
332 	 * If this is a synchronous operation on the master we should
333 	 * not copy the log for each operation.  Doing so causes
334 	 * massive disk IO that hampers the performance of these operations.
335 	 * Where as on the replica these operations are not synchronous
336 	 * (batched) and don't affect the performance as much.
337 	 */
338 
339 	if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC))
340 		copylog = TRUE;
341 
342 	if (open_log(copylog) < 0)  {
343 		delete v;
344 		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
345 				"wu db::log_action DB_STORAGE_LIMIT");
346 		return (empty_result(DB_STORAGE_LIMIT));
347 	}
348 
349 	if (logfile->append(&le) < 0) {
350 		close_log();
351 		WARNING_M("db::log_action: could not add log entry: ");
352 		delete v;
353 		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
354 				"wu db::log_action DB_STORAGE_LIMIT");
355 		return (empty_result(DB_STORAGE_LIMIT));
356 	}
357 
358 	switch (action) {
359 	case DB_ADD_NOSYNC:
360 		action = DB_ADD;
361 		break;
362 	case DB_REMOVE_NOSYNC:
363 		action = DB_REMOVE;
364 		break;
365 	default:
366 		if (logfile->sync_log() < 0) {
367 			close_log();
368 			WARNING_M("db::log_action: could not add log entry: ");
369 			delete v;
370 			WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
371 					"wu db::log_action DB_STORAGE_LIMIT");
372 			return (empty_result(DB_STORAGE_LIMIT));
373 		}
374 		break;
375 	}
376 	res = exec_action(action, query, content, NULL);
377 	internal_db.change_version(v);
378 	delete v;
379 	changed = TRUE;
380 	WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR), "wu db::log_action");
381 
382 	return (res);
383 }
384 
385 /*
386  * Execute 'action' using the rest of the arguments as input.
387  * Return the result of the operation in a db_result structure;
388  * Return NULL if the request is unknown.
389  * If the action involves updates (ADD and REMOVE), it is logged first.
390  */
391 db_result *
392 db::execute(db_action action, db_query *query,
393 		entry_object *content, db_next_desc* previous)
394 {
395 	db_result	*res;
396 
397 	switch (action) {
398 	case DB_LOOKUP:
399 	case DB_FIRST:
400 	case DB_NEXT:
401 	case DB_ALL:
402 	case DB_RESET_NEXT:
403 		READLOCK(this, empty_result(DB_LOCK_ERROR), "r db::execute");
404 		res = exec_action(action, query, content, previous);
405 		READUNLOCK(this, empty_result(DB_LOCK_ERROR),
406 				"ru db::execute");
407 		return (res);
408 
409 	case DB_ADD_NOLOG:
410 		WRITELOCK(this, empty_result(DB_LOCK_ERROR), "w db::execute");
411 		changed = TRUE;
412 		res = exec_action(DB_ADD, query, content, previous);
413 		WRITEUNLOCK(this, empty_result(DB_LOCK_ERROR),
414 				"wu db::execute");
415 		return (res);
416 
417 	case DB_ADD:
418 	case DB_REMOVE:
419 	case DB_ADD_NOSYNC:
420 	case DB_REMOVE_NOSYNC:
421 		/* log_action() will do the locking */
422 		return (log_action(action, query, content));
423 
424 	default:
425 		WARNING("db::execute: unknown request");
426 		return (empty_result(DB_INTERNAL_ERROR));
427 	}
428 }
429 
430 /* close existing logfile and delete its structure */
431 int
432 db::reset_log()
433 {
434 	WRITELOCK(this, -1, "w db::reset_log");
435 	/* try to close old log file */
436 	/* doesnot matter since we do synchronous writes only */
437 	if (logfile != NULL) {
438 	    if (logfile_opened == TRUE) {
439 		    logfile->sync_log();
440 		    if (logfile->close() < 0) {
441 			WARNING_M("db::reset_log: could not close log file: ");
442 		    }
443 		    remove_from_standby_list(this);
444 	    }
445 	    delete logfile;
446 	    logfile = NULL;
447 	}
448 	logfile_opened = FALSE;
449 	WRITEUNLOCK(this, -1, "wu db::reset_log");
450 	return (0);
451 }
452 
453 /* close existing logfile, but leave its structure if exists */
454 int
455 db::close_log(int bypass_standby)
456 {
457 	WRITELOCK(this, -1, "w db::close_log");
458 	if (logfile != NULL && logfile_opened == TRUE) {
459 		logfile->sync_log();
460 		logfile->close();
461 		if (!bypass_standby)
462 		    remove_from_standby_list(this);
463 	}
464 	logfile_opened = FALSE;
465 	WRITEUNLOCK(this, -1, "wu db::close_log");
466 	return (0);
467 }
468 
469 /* open logfile, creating its structure if it does not exist */
470 int
471 db::open_log(bool_t copylog)
472 {
473 	WRITELOCK(this, -1, "w db::open_log");
474 	if (logfile == NULL) {
475 		if ((logfile = new db_log(logfilename, PICKLE_APPEND))
476 		    == NULL)
477 			FATAL3("db::reset_log: cannot allocate space",
478 			    DB_MEMORY_LIMIT, -1);
479 	}
480 
481 	if (logfile_opened == TRUE) {
482 		WRITEUNLOCK(this, -1, "wu db::open_log");
483 		return (0);
484 	}
485 
486 	logfile->copylog = copylog;
487 
488 	if ((logfile->open()) == FALSE){
489 		WARNING_M("db::open_log: could not open log file: ");
490 		delete logfile;
491 		logfile = NULL;
492 		WRITEUNLOCK(this, -1, "wu db::open_log");
493 		return (-1);
494 	}
495 	add_to_standby_list(this);
496 	logfile_opened = TRUE;
497 	WRITEUNLOCK(this, -1, "wu db::open_log");
498 	return (0);
499 }
500 
501 /*
502  * Execute log entry 'j' on the database identified by 'dbchar' if the
503  * version of j is later than that of the database.  If 'j' is executed,
504  * 'count' is incremented and the database's verison is updated to that of 'j'.
505  * Returns TRUE always for valid log entries; FALSE otherwise.
506  */
507 static bool_t
508 apply_log_entry(db_log_entry * j, char * dbchar, int *count)
509 {
510 	db_mindex * db = (db_mindex *) dbchar;
511 	bool_t status = TRUE;
512 
513 	WRITELOCK(db, FALSE, "db::apply_log_entry");
514 
515 	if (db->get_version()->earlier_than(j->get_version())) {
516 		++ *count;
517 #ifdef DEBUG
518 		j->print();
519 #endif /* DEBUG */
520 		switch (j->get_action()) {
521 		case DB_ADD:
522 		case DB_ADD_NOSYNC:
523 			db->add(j->get_query(), j->get_object());
524 			break;
525 
526 		case DB_REMOVE:
527 		case DB_REMOVE_NOSYNC:
528 			db->remove(j->get_query());
529 			break;
530 
531 		default:
532 			WARNING("db::apply_log_entry: unknown action_type");
533 			WRITEUNLOCK(db, FALSE, "db::apply_log_entry");
534 			return (FALSE);
535 		}
536 		db->change_version(j->get_version());
537 	}
538 
539 	WRITEUNLOCK(db, FALSE, "db::apply_log_entry");
540 
541 	return (TRUE);  /* always want to TRUE if action valid ? */
542 }
543 
544 /*
545  * Execute log entry 'j' on this db.  'j' is executed if its version is
546  * later than that of the database; if executed, the database's version
547  * will be changed to that of 'j', regardless of the status of the operation.
548  * Returns TRUE if 'j' was executed;   FALSE if it was not.
549  * Log entry is added to this database's log if log_entry is applied.
550  */
551 bool_t
552 db::execute_log_entry(db_log_entry *j)
553 {
554 	int count = 0;
555 	apply_log_entry (j, (char *) &internal_db, &count);
556 	bool_t copylog = FALSE;
557 	db_action action;
558 
559 	/*
560 	 * If this is a synchronous operation on the master we should
561 	 * not copy the log for each operation.  Doing so causes
562 	 * massive disk IO that hampers the performance of these operations.
563 	 * Where as on the replica these operations are not synchronous
564 	 * (batched) and don't affect the performance as much.
565 	 */
566 
567 	action = j->get_action();
568 	if ((action == DB_ADD_NOSYNC) || (action == DB_REMOVE_NOSYNC))
569 		copylog = TRUE;
570 
571 	/*
572 	 * should really record the log entry first, but can''t do that without
573 	 * knowing whether the log entry is applicable.
574 	 */
575 	WRITELOCK(this, FALSE, "w db::execute_log_entry");
576 	if (count == 1) {
577 		if (open_log(copylog) < 0) {
578 			WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
579 			return (FALSE);
580 		}
581 
582 		if (logfile->append(j) < 0) {
583 			close_log();
584 			WARNING_M(
585 			"db::execute_log_entry: could not add log entry: ");
586 			WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
587 			return (FALSE);
588 		}
589 //	  close_log();  /* do this asynchronously */
590 	}
591 	WRITEUNLOCK(this, FALSE, "wu db::execute_log_entry");
592 
593 	return (count == 1);
594 }
595 
596 /* Incorporate updates in log to database already loaded.
597 	    Does not affect "logfile" */
598 int
599 db::incorporate_log(char* filename)
600 {
601 	db_log f(filename, PICKLE_READ);
602 	int ret;
603 
604 	WRITELOCK(this, -1, "w db::incorporate_log");
605 	WRITELOCK2((&internal_db), -1, "w internal_db db::incorporate_log",
606 			this);
607 	internal_db.setNoWriteThrough();
608 	ret = f.execute_on_log(&(apply_log_entry), (char *) &internal_db);
609 	internal_db.clearNoWriteThrough();
610 	WRITEUNLOCK2(this, (&internal_db), ret, ret,
611 			"wu db::incorporate_log",
612 			"wu mindex db::incorporate_log");
613 	return (ret);
614 }
615 
616 /* Load database and incorporate any logged updates into the loaded copy.
617 	    Return TRUE if load succeeds; FALSE otherwise. */
618 bool_t
619 db::load()
620 {
621 	int count;
622 	int load_status;
623 
624 	WRITELOCK(this, FALSE, "w db::load");
625 	if (changed == TRUE)
626 		syslog(LOG_ERR,
627 	"WARNING: the current db '%s' has been changed but not checkpointed",
628 			dbfilename);
629 
630 	unlink(tmpfilename);  /* get rid of partial checkpoints */
631 
632 	if ((load_status = internal_db.load(dbfilename)) != 0) {
633 	    if (load_status < 0)
634 		    syslog(LOG_ERR, "Load of db '%s' failed", dbfilename);
635 	    /* otherwise, there was just nothing to load */
636 	    WRITEUNLOCK(this, FALSE, "wu db::load");
637 	    return (FALSE);
638 	}
639 
640 	changed = FALSE;
641 	reset_log();
642 	WRITELOCK2((&internal_db), FALSE, "w internal_db db::load", this);
643 	internal_db.setInitialLoad();
644 	if ((count = incorporate_log(logfilename)) < 0)
645 		syslog(LOG_ERR, "incorporation of db logfile '%s' load failed",
646 	    logfilename);
647 	changed = (count > 0);
648 	internal_db.clearInitialLoad();
649 	WRITEUNLOCK2(this, (&internal_db),
650 			(changed ? TRUE : FALSE), (changed ? TRUE : FALSE),
651 			"wu db::load", "wu internal_db db::load");
652 	return (TRUE);
653 }
654 
655 /*
656  * Initialize the database using table scheme 's'.
657  * Because the 'scheme' must be 'remembered' between restarts,
658  * after the initialization, the empty database is checkpointed to record
659  * the scheme. Returns TRUE if initialization succeeds; FALSE otherwise.
660  */
661 bool_t
662 db::init(db_scheme * s)
663 {
664 	bool_t	ret = FALSE;
665 
666 	WRITELOCK(this, FALSE, "w db::init");
667 	internal_db.init(s);
668 	if (internal_db.good()) {
669 		unlink(tmpfilename);	/* delete partial checkpoints */
670 		unlink(logfilename);	/* delete previous logfile */
671 		reset_log();
672 		changed = TRUE;		/* force dump to get scheme stored. */
673 		ret = checkpoint();
674 	}
675 	WRITEUNLOCK(this, FALSE, "wu db::init");
676 	return (ret);
677 }
678 
679 /*
680     Write out in-memory copy of database to file.
681 	    1.  Update major version.
682 	    2.  Dump contents to temporary file.
683 	    3.  Rename temporary file to real database file.
684 	    4.  Remove log file.
685     A checkpoint is done only if it has changed since the previous checkpoint.
686     Returns TRUE if checkpoint was successful; FALSE otherwise.
687 */
688 bool_t
689 db::checkpoint()
690 {
691 	WRITELOCK(this, FALSE, "w db::checkpoint");
692 	if (changed == FALSE) {
693 		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
694 		return (TRUE);
695 	}
696 
697 	vers *oldversion = new vers(internal_db.get_version()); /* copy */
698 	vers *nextversion = oldversion->nextmajor();	/* get next version */
699 	internal_db.change_version(nextversion);	/* change version */
700 
701 	if (internal_db.dump(tmpfilename) < 0) {  	/* dump to tempfile */
702 		WARNING_M("db::checkpoint: could not dump database: ");
703 		internal_db.change_version(oldversion);	/* rollback */
704 		delete nextversion;
705 		delete oldversion;
706 		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
707 		return (FALSE);
708 	}
709 	if (rename(tmpfilename, dbfilename) < 0){  	/* rename permanently */
710 		WARNING_M(
711 		    "db::checkpoint: could not rename temp file to db file: ");
712 		internal_db.change_version(oldversion);	/* rollback */
713 		delete nextversion;
714 		delete oldversion;
715 		WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
716 		return (FALSE);
717 	}
718 	reset_log();		/* should check for what? */
719 	unlink(logfilename);	/* should do atomic rename and log delete */
720 	delete nextversion;
721 	delete oldversion;
722 	changed = FALSE;
723 	WRITEUNLOCK(this, FALSE, "wu db::checkpoint");
724 	return (TRUE);
725 }
726 
727 
728 /* For generating log_list */
729 
730 struct traverse_info {
731 	vers *version;		// version to check for
732 	db_log_entry * head;	// head of list of log entries found
733 	db_log_entry * tail;	// tail of list of log entries found
734 };
735 
736 /*
737  * For the given entry determine, if it is later than the version supplied,
738  *	    1.  increment 'count'.
739  *	    2.  add the entry to the list of log entries found.
740  *
741  * Since traversal happens on an automatic (struct traverse_info) in
742  * db::get_log_entries_since(), no locking is necessary.
743  */
744 static bool_t entry_since(db_log_entry * j, char * tichar, int *count)
745 {
746 	traverse_info *ti = (traverse_info*) tichar;
747 
748 	if (ti->version->earlier_than(j->get_version())) {
749 		++ *count;
750 //    j->print();   // debug
751 		if (ti->head == NULL)
752 			ti->head = j;
753 		else {
754 			ti->tail->setnextptr(j); // make last entry point to j
755 		}
756 		ti->tail = j;			// make j new last entry
757 	}
758 
759 	return (TRUE);
760 }
761 
762 /* Return structure db_log_list containing entries that are later
763 	    than the version 'v' given.  */
764 db_log_list*
765 db::get_log_entries_since(vers * v)
766 {
767 	int count;
768 	struct traverse_info ti;
769 	db_log f(logfilename, PICKLE_READ);
770 
771 	ti.version = v;
772 	ti.head = ti.tail = NULL;
773 
774 	count = f.execute_on_log(&(entry_since), (char *) &ti, FALSE);
775 
776 	db_log_list * answer = new db_log_list;
777 
778 	if (answer == NULL)
779 		FATAL3("db::get_log_entries_since: cannot allocate space",
780 			DB_MEMORY_LIMIT, NULL);
781 
782 	answer->list.list_len = count;
783 
784 	if (count > 0) {
785 		db_log_entry_p *entries;
786 		db_log_entry_p currentry, nextentry;
787 		int i;
788 
789 		entries = answer->list.list_val = new db_log_entry_p[count];
790 		if (entries == NULL) {
791 			delete answer;
792 			FATAL3(
793 		"db::get_log_entries_since: cannot allocate space for entries",
794 		DB_MEMORY_LIMIT, NULL);
795 			}
796 		currentry = ti.head;
797 		for (i = 0, currentry = ti.head;
798 			i < count && currentry != NULL;
799 			i++) {
800 			entries[i] = currentry;
801 			nextentry = currentry->getnextptr();
802 			currentry->setnextptr(NULL);
803 			currentry = nextentry;
804 		}
805 	} else
806 		answer->list.list_val = NULL;
807 
808 	return (answer);
809 }
810 
811 /* Delete all files associated with database. */
812 int
813 db::remove_files()
814 {
815 	WRITELOCK(this, -1, "w db::remove_files");
816 	unlink(tmpfilename);  /* delete partial checkpoints */
817 	reset_log();
818 	unlink(logfilename);  /* delete logfile */
819 	unlink(dbfilename);   /* delete database file */
820 	WRITEUNLOCK(this, -1, "wu db::remove_files");
821 	return (0);
822 }
823 
824 db_status
825 db::sync_log() {
826 
827 	db_status	ret;
828 
829 	WRITELOCK(this, DB_LOCK_ERROR, "w db::sync_log");
830 	if (logfile == 0) {
831 		ret = DB_BADTABLE;
832 	} else {
833 		if (logfile_opened == FALSE || logfile->sync_log())
834 			ret = DB_SUCCESS;
835 		else
836 			ret = DB_SYNC_FAILED;
837 	}
838 	WRITEUNLOCK(this, DB_LOCK_ERROR, "wu db::sync_log");
839 	return (ret);
840 }
841 
842 /* Pass configuration information to the db_mindex */
843 bool_t
844 db::configure(char *objName) {
845 	return (internal_db.configure(objName));
846 }
847 
848 db_mindex *
849 db::mindex(void) {
850 	return (&internal_db);
851 }
852