xref: /titanic_41/usr/src/cmd/sendmail/db/db/db_dup.c (revision 159d09a20817016f09b3ea28d1bdada4a336bb91)
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 1997, 1998
5  *	Sleepycat Software.  All rights reserved.
6  */
7 
8 #include "config.h"
9 
10 #ifndef lint
11 static const char sccsid[] = "@(#)db_dup.c	10.35 (Sleepycat) 12/2/98";
12 #endif /* not lint */
13 
14 #ifndef NO_SYSTEM_INCLUDES
15 #include <sys/types.h>
16 
17 #include <errno.h>
18 #include <string.h>
19 #endif
20 
21 #include "db_int.h"
22 #include "db_page.h"
23 #include "btree.h"
24 #include "db_am.h"
25 
26 static int __db_addpage __P((DBC *,
27     PAGE **, db_indx_t *, int (*)(DBC *, u_int32_t, PAGE **)));
28 static int __db_dsplit __P((DBC *,
29     PAGE **, db_indx_t *, u_int32_t, int (*)(DBC *, u_int32_t, PAGE **)));
30 
31 /*
32  * __db_dput --
33  *	Put a duplicate item onto a duplicate page at the given index.
34  *
35  * PUBLIC: int __db_dput __P((DBC *, DBT *,
36  * PUBLIC:    PAGE **, db_indx_t *, int (*)(DBC *, u_int32_t, PAGE **)));
37  */
38 int
39 __db_dput(dbc, dbt, pp, indxp, newfunc)
40 	DBC *dbc;
41 	DBT *dbt;
42 	PAGE **pp;
43 	db_indx_t *indxp;
44 	int (*newfunc) __P((DBC *, u_int32_t, PAGE **));
45 {
46 	BOVERFLOW bo;
47 	DBT *data_dbtp, hdr_dbt, *hdr_dbtp;
48 	PAGE *pagep;
49 	db_indx_t size, isize;
50 	db_pgno_t pgno;
51 	int ret;
52 
53 	/*
54 	 * We need some access method independent threshold for when we put
55 	 * a duplicate item onto an overflow page.
56 	 */
57 	if (dbt->size > 0.25 * dbc->dbp->pgsize) {
58 		if ((ret = __db_poff(dbc, dbt, &pgno, newfunc)) != 0)
59 			return (ret);
60 		UMRW(bo.unused1);
61 		B_TSET(bo.type, B_OVERFLOW, 0);
62 		UMRW(bo.unused2);
63 		bo.tlen = dbt->size;
64 		bo.pgno = pgno;
65 		hdr_dbt.data = &bo;
66 		hdr_dbt.size = isize = BOVERFLOW_SIZE;
67 		hdr_dbtp = &hdr_dbt;
68 		size = BOVERFLOW_PSIZE;
69 		data_dbtp = NULL;
70 	} else {
71 		size = BKEYDATA_PSIZE(dbt->size);
72 		isize = BKEYDATA_SIZE(dbt->size);
73 		hdr_dbtp = NULL;
74 		data_dbtp = dbt;
75 	}
76 
77 	pagep = *pp;
78 	if (size > P_FREESPACE(pagep)) {
79 		if (*indxp == NUM_ENT(*pp) && NEXT_PGNO(*pp) == PGNO_INVALID)
80 			ret = __db_addpage(dbc, pp, indxp, newfunc);
81 		else
82 			ret = __db_dsplit(dbc, pp, indxp, isize, newfunc);
83 		if (ret != 0)
84 			/*
85 			 * XXX
86 			 * Pages not returned to free list.
87 			 */
88 			return (ret);
89 		pagep = *pp;
90 	}
91 
92 	/*
93 	 * Now, pagep references the page on which to insert and indx is the
94 	 * the location to insert.
95 	 */
96 	if ((ret = __db_pitem(dbc,
97 	    pagep, (u_int32_t)*indxp, isize, hdr_dbtp, data_dbtp)) != 0)
98 		return (ret);
99 
100 	(void)memp_fset(dbc->dbp->mpf, pagep, DB_MPOOL_DIRTY);
101 	return (0);
102 }
103 
104 /*
105  * __db_drem --
106  *	Remove a duplicate at the given index on the given page.
107  *
108  * PUBLIC: int __db_drem __P((DBC *,
109  * PUBLIC:    PAGE **, u_int32_t, int (*)(DBC *, PAGE *)));
110  */
111 int
112 __db_drem(dbc, pp, indx, freefunc)
113 	DBC *dbc;
114 	PAGE **pp;
115 	u_int32_t indx;
116 	int (*freefunc) __P((DBC *, PAGE *));
117 {
118 	PAGE *pagep;
119 	int ret;
120 
121 	pagep = *pp;
122 
123 	/* Check if we are freeing a big item. */
124 	if (B_TYPE(GET_BKEYDATA(pagep, indx)->type) == B_OVERFLOW) {
125 		if ((ret = __db_doff(dbc,
126 		    GET_BOVERFLOW(pagep, indx)->pgno, freefunc)) != 0)
127 			return (ret);
128 		ret = __db_ditem(dbc, pagep, indx, BOVERFLOW_SIZE);
129 	} else
130 		ret = __db_ditem(dbc, pagep, indx,
131 		    BKEYDATA_SIZE(GET_BKEYDATA(pagep, indx)->len));
132 	if (ret != 0)
133 		return (ret);
134 
135 	if (NUM_ENT(pagep) == 0) {
136 		/*
137 		 * If the page is emptied, then the page is freed and the pp
138 		 * parameter is set to reference the next, locked page in the
139 		 * duplicate chain, if one exists.  If there was no such page,
140 		 * then it is set to NULL.
141 		 *
142 		 * !!!
143 		 * __db_relink will set the dirty bit for us.
144 		 */
145 		if ((ret = __db_relink(dbc, DB_REM_PAGE, pagep, pp, 0)) != 0)
146 			return (ret);
147 		if ((ret = freefunc(dbc, pagep)) != 0)
148 			return (ret);
149 	} else
150 		(void)memp_fset(dbc->dbp->mpf, pagep, DB_MPOOL_DIRTY);
151 
152 	return (0);
153 }
154 
155 /*
156  * __db_dend --
157  *	Find the last page in a set of offpage duplicates.
158  *
159  * PUBLIC: int __db_dend __P((DBC *, db_pgno_t, PAGE **));
160  */
161 int
162 __db_dend(dbc, pgno, pp)
163 	DBC *dbc;
164 	db_pgno_t pgno;
165 	PAGE **pp;
166 {
167 	DB *dbp;
168 	PAGE *h;
169 	int ret;
170 
171 	dbp = dbc->dbp;
172 
173 	/*
174 	 * This implements DB_KEYLAST.  The last page is returned in pp; pgno
175 	 * should be the page number of the first page of the duplicate chain.
176 	 *
177 	 * *pp may be non-NULL -- if given a valid page use it.
178 	 */
179 	if (*pp != NULL)
180 		goto started;
181 	for (;;) {
182 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) {
183 			(void)__db_pgerr(dbp, pgno);
184 			return (ret);
185 		}
186 started:	h = *pp;
187 
188 		if ((pgno = NEXT_PGNO(h)) == PGNO_INVALID)
189 			break;
190 
191 		if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
192 			return (ret);
193 	}
194 	return (0);
195 }
196 
197 /*
198  * __db_dsplit --
199  *	Split a page of duplicates, calculating the split point based
200  *	on an element of size "size" being added at "*indxp".
201  *	On entry hp contains a pointer to the page-pointer of the original
202  *	page.  On exit, it returns a pointer to the page containing "*indxp"
203  *	and "indxp" has been modified to reflect the index on the new page
204  *	where the element should be added.  The function returns with
205  *	the page on which the insert should happen, not yet put.
206  */
207 static int
208 __db_dsplit(dbc, hp, indxp, size, newfunc)
209 	DBC *dbc;
210 	PAGE **hp;
211 	db_indx_t *indxp;
212 	u_int32_t size;
213 	int (*newfunc) __P((DBC *, u_int32_t, PAGE **));
214 {
215 	PAGE *h, *np, *tp;
216 	BKEYDATA *bk;
217 	DBT page_dbt;
218 	DB *dbp;
219 	size_t pgsize;
220 	db_indx_t halfbytes, i, indx, lastsum, nindex, oindex, s, sum;
221 	int did_indx, ret, t_ret;
222 
223 	h = *hp;
224 	indx = *indxp;
225 	ret = 0;
226 	dbp = dbc->dbp;
227 	pgsize = dbp->pgsize;
228 
229 	/* Create a temporary page to do compaction onto. */
230 	if ((ret = __os_malloc(pgsize, NULL, &tp)) != 0)
231 		return (ret);
232 
233 	/* Create new page for the split. */
234 	if ((ret = newfunc(dbc, P_DUPLICATE, &np)) != 0) {
235 		__os_free(tp, pgsize);
236 		return (ret);
237 	}
238 
239 	P_INIT(np, pgsize, PGNO(np), PGNO(h), NEXT_PGNO(h), 0,
240 	    P_DUPLICATE);
241 	P_INIT(tp, pgsize, PGNO(h), PREV_PGNO(h), PGNO(np), 0,
242 	    P_DUPLICATE);
243 
244 	/* Figure out the split point */
245 	halfbytes = (pgsize - HOFFSET(h)) / 2;
246 	did_indx = 0;
247 	for (sum = 0, lastsum = 0, i = 0; i < NUM_ENT(h); i++) {
248 		if (i == indx) {
249 			sum += size;
250 			did_indx = 1;
251 			if (lastsum < halfbytes && sum >= halfbytes) {
252 				/* We've crossed the halfway point. */
253 				if ((db_indx_t)(halfbytes - lastsum) <
254 				    (db_indx_t)(sum - halfbytes)) {
255 					*hp = np;
256 					*indxp = 0;
257 				} else
258 					*indxp = i;
259 				break;
260 			}
261 			*indxp = i;
262 			lastsum = sum;
263 		}
264 		if (B_TYPE(GET_BKEYDATA(h, i)->type) == B_KEYDATA)
265 			sum += BKEYDATA_SIZE(GET_BKEYDATA(h, i)->len);
266 		else
267 			sum += BOVERFLOW_SIZE;
268 
269 		if (lastsum < halfbytes && sum >= halfbytes) {
270 			/* We've crossed the halfway point. */
271 			if ((db_indx_t)(sum - halfbytes) <
272 			    (db_indx_t)(halfbytes - lastsum))
273 				i++;
274 			break;
275 		}
276 	}
277 	/*
278 	 * Check if we have set the return values of the index pointer and
279 	 * page pointer.
280 	 */
281 	if (!did_indx) {
282 		*hp = np;
283 		*indxp = indx - i;
284 	}
285 
286 	if (DB_LOGGING(dbc)) {
287 		page_dbt.size = dbp->pgsize;
288 		page_dbt.data = h;
289 		if ((ret = __db_split_log(dbp->dbenv->lg_info,
290 		    dbc->txn, &LSN(h), 0, DB_SPLITOLD, dbp->log_fileid,
291 		    PGNO(h), &page_dbt, &LSN(h))) != 0) {
292 			__os_free(tp, pgsize);
293 			return (ret);
294 		}
295 		LSN(tp) = LSN(h);
296 	}
297 
298 	/*
299 	 * If it's a btree, adjust the cursors.
300 	 *
301 	 * i is the index of the first element to move onto the new page.
302 	 */
303 	if (dbp->type == DB_BTREE)
304 		__bam_ca_split(dbp, PGNO(h), PGNO(h), PGNO(np), i, 0);
305 
306 	for (nindex = 0, oindex = i; oindex < NUM_ENT(h); oindex++) {
307 		bk = GET_BKEYDATA(h, oindex);
308 		if (B_TYPE(bk->type) == B_KEYDATA)
309 			s = BKEYDATA_SIZE(bk->len);
310 		else
311 			s = BOVERFLOW_SIZE;
312 
313 		np->inp[nindex++] = HOFFSET(np) -= s;
314 		memcpy((u_int8_t *)np + HOFFSET(np), bk, s);
315 		NUM_ENT(np)++;
316 	}
317 
318 	/*
319 	 * Now do data compaction by copying the remaining stuff onto the
320 	 * temporary page and then copying it back to the real page.
321 	 */
322 	for (nindex = 0, oindex = 0; oindex < i; oindex++) {
323 		bk = GET_BKEYDATA(h, oindex);
324 		if (B_TYPE(bk->type) == B_KEYDATA)
325 			s = BKEYDATA_SIZE(bk->len);
326 		else
327 			s = BOVERFLOW_SIZE;
328 
329 		tp->inp[nindex++] = HOFFSET(tp) -= s;
330 		memcpy((u_int8_t *)tp + HOFFSET(tp), bk, s);
331 		NUM_ENT(tp)++;
332 	}
333 
334 	/*
335 	 * This page (the temporary) should be only half full, so we do two
336 	 * memcpy's, one for the top of the page and one for the bottom of
337 	 * the page.  This way we avoid copying the middle which should be
338 	 * about half a page.
339 	 */
340 	memcpy(h, tp, LOFFSET(tp));
341 	memcpy((u_int8_t *)h + HOFFSET(tp),
342 	    (u_int8_t *)tp + HOFFSET(tp), pgsize - HOFFSET(tp));
343 	__os_free(tp, pgsize);
344 
345 	if (DB_LOGGING(dbc)) {
346 		/*
347 		 * XXX
348 		 * If either of these fails, are we leaving pages pinned?
349 		 * Yes, but it seems like this happens in error case.
350 		 */
351 		page_dbt.size = pgsize;
352 		page_dbt.data = h;
353 		if ((ret = __db_split_log(dbp->dbenv->lg_info,
354 		    dbc->txn, &LSN(h), 0, DB_SPLITNEW, dbp->log_fileid,
355 		    PGNO(h), &page_dbt, &LSN(h))) != 0)
356 			return (ret);
357 
358 		page_dbt.size = pgsize;
359 		page_dbt.data = np;
360 		if ((ret = __db_split_log(dbp->dbenv->lg_info,
361 		    dbc->txn, &LSN(np), 0, DB_SPLITNEW, dbp->log_fileid,
362 		    PGNO(np),  &page_dbt, &LSN(np))) != 0)
363 			return (ret);
364 	}
365 
366 	/*
367 	 * Finally, if there was a next page after the page being
368 	 * split, fix its prev pointer.
369 	 */
370 	if (np->next_pgno != PGNO_INVALID)
371 	    ret = __db_relink(dbc, DB_ADD_PAGE, np, NULL, 1);
372 
373 	/*
374 	 * Figure out if the location we're interested in is on the new
375 	 * page, and if so, reset the callers' pointer.  Push the other
376 	 * page back to the store.
377 	 */
378 	if (*hp == h)
379 		t_ret = memp_fput(dbp->mpf, np, DB_MPOOL_DIRTY);
380 	else
381 		t_ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY);
382 
383 	return (ret != 0 ? ret : t_ret);
384 }
385 
386 /*
387  * __db_ditem --
388  *	Remove an item from a page.
389  *
390  * PUBLIC:  int __db_ditem __P((DBC *, PAGE *, u_int32_t, u_int32_t));
391  */
392 int
393 __db_ditem(dbc, pagep, indx, nbytes)
394 	DBC *dbc;
395 	PAGE *pagep;
396 	u_int32_t indx, nbytes;
397 {
398 	DB *dbp;
399 	DBT ldbt;
400 	db_indx_t cnt, offset;
401 	int ret;
402 	u_int8_t *from;
403 
404 	dbp = dbc->dbp;
405 	if (DB_LOGGING(dbc)) {
406 		ldbt.data = P_ENTRY(pagep, indx);
407 		ldbt.size = nbytes;
408 		if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbc->txn,
409 		    &LSN(pagep), 0, DB_REM_DUP, dbp->log_fileid, PGNO(pagep),
410 		    (u_int32_t)indx, nbytes, &ldbt, NULL, &LSN(pagep))) != 0)
411 			return (ret);
412 	}
413 
414 	/*
415 	 * If there's only a single item on the page, we don't have to
416 	 * work hard.
417 	 */
418 	if (NUM_ENT(pagep) == 1) {
419 		NUM_ENT(pagep) = 0;
420 		HOFFSET(pagep) = dbp->pgsize;
421 		return (0);
422 	}
423 
424 	/*
425 	 * Pack the remaining key/data items at the end of the page.  Use
426 	 * memmove(3), the regions may overlap.
427 	 */
428 	from = (u_int8_t *)pagep + HOFFSET(pagep);
429 	memmove(from + nbytes, from, pagep->inp[indx] - HOFFSET(pagep));
430 	HOFFSET(pagep) += nbytes;
431 
432 	/* Adjust the indices' offsets. */
433 	offset = pagep->inp[indx];
434 	for (cnt = 0; cnt < NUM_ENT(pagep); ++cnt)
435 		if (pagep->inp[cnt] < offset)
436 			pagep->inp[cnt] += nbytes;
437 
438 	/* Shift the indices down. */
439 	--NUM_ENT(pagep);
440 	if (indx != NUM_ENT(pagep))
441 		memmove(&pagep->inp[indx], &pagep->inp[indx + 1],
442 		    sizeof(db_indx_t) * (NUM_ENT(pagep) - indx));
443 
444 	/* If it's a btree, adjust the cursors. */
445 	if (dbp->type == DB_BTREE)
446 		__bam_ca_di(dbp, PGNO(pagep), indx, -1);
447 
448 	return (0);
449 }
450 
451 /*
452  * __db_pitem --
453  *	Put an item on a page.
454  *
455  * PUBLIC: int __db_pitem
456  * PUBLIC:     __P((DBC *, PAGE *, u_int32_t, u_int32_t, DBT *, DBT *));
457  */
458 int
459 __db_pitem(dbc, pagep, indx, nbytes, hdr, data)
460 	DBC *dbc;
461 	PAGE *pagep;
462 	u_int32_t indx;
463 	u_int32_t nbytes;
464 	DBT *hdr, *data;
465 {
466 	DB *dbp;
467 	BKEYDATA bk;
468 	DBT thdr;
469 	int ret;
470 	u_int8_t *p;
471 
472 	/*
473 	 * Put a single item onto a page.  The logic figuring out where to
474 	 * insert and whether it fits is handled in the caller.  All we do
475 	 * here is manage the page shuffling.  We cheat a little bit in that
476 	 * we don't want to copy the dbt on a normal put twice.  If hdr is
477 	 * NULL, we create a BKEYDATA structure on the page, otherwise, just
478 	 * copy the caller's information onto the page.
479 	 *
480 	 * This routine is also used to put entries onto the page where the
481 	 * entry is pre-built, e.g., during recovery.  In this case, the hdr
482 	 * will point to the entry, and the data argument will be NULL.
483 	 *
484 	 * !!!
485 	 * There's a tremendous potential for off-by-one errors here, since
486 	 * the passed in header sizes must be adjusted for the structure's
487 	 * placeholder for the trailing variable-length data field.
488 	 */
489 	dbp = dbc->dbp;
490 	if (DB_LOGGING(dbc))
491 		if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbc->txn,
492 		    &LSN(pagep), 0, DB_ADD_DUP, dbp->log_fileid, PGNO(pagep),
493 		    (u_int32_t)indx, nbytes, hdr, data, &LSN(pagep))) != 0)
494 			return (ret);
495 
496 	if (hdr == NULL) {
497 		B_TSET(bk.type, B_KEYDATA, 0);
498 		bk.len = data == NULL ? 0 : data->size;
499 
500 		thdr.data = &bk;
501 		thdr.size = SSZA(BKEYDATA, data);
502 		hdr = &thdr;
503 	}
504 
505 	/* Adjust the index table, then put the item on the page. */
506 	if (indx != NUM_ENT(pagep))
507 		memmove(&pagep->inp[indx + 1], &pagep->inp[indx],
508 		    sizeof(db_indx_t) * (NUM_ENT(pagep) - indx));
509 	HOFFSET(pagep) -= nbytes;
510 	pagep->inp[indx] = HOFFSET(pagep);
511 	++NUM_ENT(pagep);
512 
513 	p = P_ENTRY(pagep, indx);
514 	memcpy(p, hdr->data, hdr->size);
515 	if (data != NULL)
516 		memcpy(p + hdr->size, data->data, data->size);
517 
518 	/* If it's a btree, adjust the cursors. */
519 	if (dbp->type == DB_BTREE)
520 		__bam_ca_di(dbp, PGNO(pagep), indx, 1);
521 
522 	return (0);
523 }
524 
525 /*
526  * __db_relink --
527  *	Relink around a deleted page.
528  *
529  * PUBLIC: int __db_relink __P((DBC *, u_int32_t, PAGE *, PAGE **, int));
530  */
531 int
532 __db_relink(dbc, add_rem, pagep, new_next, needlock)
533 	DBC *dbc;
534 	u_int32_t add_rem;
535 	PAGE *pagep, **new_next;
536 	int needlock;
537 {
538 	DB *dbp;
539 	PAGE *np, *pp;
540 	DB_LOCK npl, ppl;
541 	DB_LSN *nlsnp, *plsnp;
542 	int ret;
543 
544 	ret = 0;
545 	np = pp = NULL;
546 	npl = ppl = LOCK_INVALID;
547 	nlsnp = plsnp = NULL;
548 	dbp = dbc->dbp;
549 
550 	/*
551 	 * Retrieve and lock the one/two pages.  For a remove, we may need
552 	 * two pages (the before and after).  For an add, we only need one
553 	 * because, the split took care of the prev.
554 	 */
555 	if (pagep->next_pgno != PGNO_INVALID) {
556 		if (needlock && (ret = __bam_lget(dbc,
557 		    0, pagep->next_pgno, DB_LOCK_WRITE, &npl)) != 0)
558 			goto err;
559 		if ((ret = memp_fget(dbp->mpf,
560 		    &pagep->next_pgno, 0, &np)) != 0) {
561 			(void)__db_pgerr(dbp, pagep->next_pgno);
562 			goto err;
563 		}
564 		nlsnp = &np->lsn;
565 	}
566 	if (add_rem == DB_REM_PAGE && pagep->prev_pgno != PGNO_INVALID) {
567 		if (needlock && (ret = __bam_lget(dbc,
568 		    0, pagep->prev_pgno, DB_LOCK_WRITE, &ppl)) != 0)
569 			goto err;
570 		if ((ret = memp_fget(dbp->mpf,
571 		    &pagep->prev_pgno, 0, &pp)) != 0) {
572 			(void)__db_pgerr(dbp, pagep->next_pgno);
573 			goto err;
574 		}
575 		plsnp = &pp->lsn;
576 	}
577 
578 	/* Log the change. */
579 	if (DB_LOGGING(dbc)) {
580 		if ((ret = __db_relink_log(dbp->dbenv->lg_info, dbc->txn,
581 		    &pagep->lsn, 0, add_rem, dbp->log_fileid,
582 		    pagep->pgno, &pagep->lsn,
583 		    pagep->prev_pgno, plsnp, pagep->next_pgno, nlsnp)) != 0)
584 			goto err;
585 		if (np != NULL)
586 			np->lsn = pagep->lsn;
587 		if (pp != NULL)
588 			pp->lsn = pagep->lsn;
589 	}
590 
591 	/*
592 	 * Modify and release the two pages.
593 	 *
594 	 * !!!
595 	 * The parameter new_next gets set to the page following the page we
596 	 * are removing.  If there is no following page, then new_next gets
597 	 * set to NULL.
598 	 */
599 	if (np != NULL) {
600 		if (add_rem == DB_ADD_PAGE)
601 			np->prev_pgno = pagep->pgno;
602 		else
603 			np->prev_pgno = pagep->prev_pgno;
604 		if (new_next == NULL)
605 			ret = memp_fput(dbp->mpf, np, DB_MPOOL_DIRTY);
606 		else {
607 			*new_next = np;
608 			ret = memp_fset(dbp->mpf, np, DB_MPOOL_DIRTY);
609 		}
610 		if (ret != 0)
611 			goto err;
612 		if (needlock)
613 			(void)__bam_lput(dbc, npl);
614 	} else if (new_next != NULL)
615 		*new_next = NULL;
616 
617 	if (pp != NULL) {
618 		pp->next_pgno = pagep->next_pgno;
619 		if ((ret = memp_fput(dbp->mpf, pp, DB_MPOOL_DIRTY)) != 0)
620 			goto err;
621 		if (needlock)
622 			(void)__bam_lput(dbc, ppl);
623 	}
624 	return (0);
625 
626 err:	if (np != NULL)
627 		(void)memp_fput(dbp->mpf, np, 0);
628 	if (needlock && npl != LOCK_INVALID)
629 		(void)__bam_lput(dbc, npl);
630 	if (pp != NULL)
631 		(void)memp_fput(dbp->mpf, pp, 0);
632 	if (needlock && ppl != LOCK_INVALID)
633 		(void)__bam_lput(dbc, ppl);
634 	return (ret);
635 }
636 
637 /*
638  * __db_ddup --
639  *	Delete an offpage chain of duplicates.
640  *
641  * PUBLIC: int __db_ddup __P((DBC *, db_pgno_t, int (*)(DBC *, PAGE *)));
642  */
643 int
644 __db_ddup(dbc, pgno, freefunc)
645 	DBC *dbc;
646 	db_pgno_t pgno;
647 	int (*freefunc) __P((DBC *, PAGE *));
648 {
649 	DB *dbp;
650 	PAGE *pagep;
651 	DBT tmp_dbt;
652 	int ret;
653 
654 	dbp = dbc->dbp;
655 	do {
656 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0) {
657 			(void)__db_pgerr(dbp, pgno);
658 			return (ret);
659 		}
660 
661 		if (DB_LOGGING(dbc)) {
662 			tmp_dbt.data = pagep;
663 			tmp_dbt.size = dbp->pgsize;
664 			if ((ret = __db_split_log(dbp->dbenv->lg_info,
665 			    dbc->txn, &LSN(pagep), 0, DB_SPLITOLD,
666 			    dbp->log_fileid, PGNO(pagep), &tmp_dbt,
667 			    &LSN(pagep))) != 0)
668 				return (ret);
669 		}
670 		pgno = pagep->next_pgno;
671 		if ((ret = freefunc(dbc, pagep)) != 0)
672 			return (ret);
673 	} while (pgno != PGNO_INVALID);
674 
675 	return (0);
676 }
677 
678 /*
679  * __db_addpage --
680  *	Create a new page and link it onto the next_pgno field of the
681  *	current page.
682  */
683 static int
684 __db_addpage(dbc, hp, indxp, newfunc)
685 	DBC *dbc;
686 	PAGE **hp;
687 	db_indx_t *indxp;
688 	int (*newfunc) __P((DBC *, u_int32_t, PAGE **));
689 {
690 	DB *dbp;
691 	PAGE *newpage;
692 	int ret;
693 
694 	dbp = dbc->dbp;
695 	if ((ret = newfunc(dbc, P_DUPLICATE, &newpage)) != 0)
696 		return (ret);
697 
698 	if (DB_LOGGING(dbc)) {
699 		if ((ret = __db_addpage_log(dbp->dbenv->lg_info,
700 		    dbc->txn, &LSN(*hp), 0, dbp->log_fileid,
701 		    PGNO(*hp), &LSN(*hp), PGNO(newpage), &LSN(newpage))) != 0) {
702 			return (ret);
703 		}
704 		LSN(newpage) = LSN(*hp);
705 	}
706 
707 	PREV_PGNO(newpage) = PGNO(*hp);
708 	NEXT_PGNO(*hp) = PGNO(newpage);
709 
710 	if ((ret = memp_fput(dbp->mpf, *hp, DB_MPOOL_DIRTY)) != 0)
711 		return (ret);
712 	*hp = newpage;
713 	*indxp = 0;
714 	return (0);
715 }
716 
717 /*
718  * __db_dsearch --
719  *	Search a set of duplicates for the proper position for a new duplicate.
720  *
721  *	+ pgno is the page number of the page on which to begin searching.
722  * 	  Since we can continue duplicate searches, it might not be the first
723  * 	  page.
724  *
725  * 	+ If we are continuing a search, then *pp may be non-NULL in which
726  * 	  case we do not have to retrieve the page.
727  *
728  *	+ If we are continuing a search, then *indxp contains the first
729  * 	  on pgno of where we should begin the search.
730  *
731  * 	NOTE: if there is no comparison function, then continuing is
732  * 	meaningless, and *pp should always be NULL and *indxp will be
733  *	ignored.
734  *
735  *	3 return values::
736  *
737  *	+ pp is the returned page pointer of where this element should go.
738  *	+ indxp is the returned index on that page
739  *	+ cmpp is the returned final comparison result.
740  *
741  * PUBLIC: int __db_dsearch __P((DBC *,
742  * PUBLIC:     int, DBT *, db_pgno_t, db_indx_t *, PAGE **, int *));
743  */
744 int
745 __db_dsearch(dbc, is_insert, dbt, pgno, indxp, pp, cmpp)
746 	DBC *dbc;
747 	int is_insert, *cmpp;
748 	DBT *dbt;
749 	db_pgno_t pgno;
750 	db_indx_t *indxp;
751 	PAGE **pp;
752 {
753 	DB *dbp;
754 	PAGE *h;
755 	db_indx_t base, indx, lim, save_indx;
756 	db_pgno_t save_pgno;
757 	int ret;
758 
759 	dbp = dbc->dbp;
760 
761 	if (dbp->dup_compare == NULL) {
762 		/*
763 		 * We may have been given a valid page, but we may not be
764 		 * able to use it.  The problem is that the application is
765 		 * doing a join and we're trying to continue the search,
766 		 * but since the items aren't sorted, we can't.  Discard
767 		 * the page if it's not the one we're going to start with
768 		 * anyway.
769 		 */
770 		if (*pp != NULL && (*pp)->pgno != pgno) {
771 			if ((ret = memp_fput(dbp->mpf, *pp, 0)) != 0)
772 				return (ret);
773 			*pp = NULL;
774 		}
775 
776 		/*
777 		 * If no duplicate function is specified, just go to the end
778 		 * of the duplicate set.
779 		 */
780 		if (is_insert) {
781 			if ((ret = __db_dend(dbc, pgno, pp)) != 0)
782 				return (ret);
783 			*indxp = NUM_ENT(*pp);
784 			return (0);
785 		}
786 
787 		/*
788 		 * We are looking for a specific duplicate, so do a linear
789 		 * search.
790 		 */
791 		if (*pp != NULL)
792 			goto nocmp_started;
793 		for (;;) {
794 			if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0)
795 				goto pg_err;
796 nocmp_started:		h = *pp;
797 
798 			for (*indxp = 0; *indxp < NUM_ENT(h); ++*indxp) {
799 				if ((*cmpp = __bam_cmp(dbp,
800 				    dbt, h, *indxp, __bam_defcmp)) != 0)
801 					continue;
802 				/*
803 				 * The duplicate may have already been deleted,
804 				 * if it's a btree page, in which case we skip
805 				 * it.
806 				 */
807 				if (dbp->type == DB_BTREE &&
808 				    B_DISSET(GET_BKEYDATA(h, *indxp)->type))
809 					continue;
810 
811 				return (0);
812 			}
813 
814 			if ((pgno = h->next_pgno) == PGNO_INVALID)
815 				break;
816 
817 			if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
818 				return (ret);
819 		}
820 		*cmpp = 1;			/* We didn't succeed... */
821 		return (0);
822 	}
823 
824 	/*
825 	 * We have a comparison routine, i.e., the duplicates are sorted.
826 	 * Walk through the chain of duplicates, checking the last entry
827 	 * on each page to decide if it's the page we want to search.
828 	 *
829 	 * *pp may be non-NULL -- if we were given a valid page (e.g., are
830 	 * in mid-search), then use the provided page.
831 	 */
832 	if (*pp != NULL)
833 		goto cmp_started;
834 	for (;;) {
835 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0)
836 			goto pg_err;
837 cmp_started:	h = *pp;
838 
839 		if ((pgno = h->next_pgno) == PGNO_INVALID || __bam_cmp(dbp,
840 		    dbt, h, h->entries - 1, dbp->dup_compare) <= 0)
841 			break;
842 		/*
843 		 * Even when continuing a search, make sure we don't skip
844 		 * entries on a new page
845 		 */
846 		*indxp = 0;
847 
848 		if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
849 			return (ret);
850 	}
851 
852 	/* Next, do a binary search on the page. */
853 	base = F_ISSET(dbc, DBC_CONTINUE) ? *indxp : 0;
854 	for (lim = NUM_ENT(h) - base; lim != 0; lim >>= 1) {
855 		indx = base + (lim >> 1);
856 		if ((*cmpp = __bam_cmp(dbp,
857 		    dbt, h, indx, dbp->dup_compare)) == 0) {
858 			*indxp = indx;
859 
860 			if (dbp->type != DB_BTREE ||
861 			    !B_DISSET(GET_BKEYDATA(h, *indxp)->type))
862 				return (0);
863 			goto check_delete;
864 		}
865 		if (*cmpp > 0) {
866 			base = indx + 1;
867 			lim--;
868 		}
869 	}
870 
871 	/*
872 	 * Base references the smallest index larger than the supplied DBT's
873 	 * data item, potentially both 0 and NUM_ENT.
874 	 */
875 	*indxp = base;
876 	return (0);
877 
878 check_delete:
879 	/*
880 	 * The duplicate may have already been deleted, if it's a btree page,
881 	 * in which case we wander around, hoping to find an entry that hasn't
882 	 * been deleted.  First, wander in a forwardly direction.
883 	 */
884 	save_pgno = (*pp)->pgno;
885 	save_indx = *indxp;
886 	for (++*indxp;;) {
887 		for (; *indxp < NUM_ENT(h); ++*indxp) {
888 			if ((*cmpp = __bam_cmp(dbp,
889 			    dbt, h, *indxp, dbp->dup_compare)) != 0)
890 				goto check_delete_rev;
891 
892 			if (!B_DISSET(GET_BKEYDATA(h, *indxp)->type))
893 				return (0);
894 		}
895 		if ((pgno = h->next_pgno) == PGNO_INVALID)
896 			break;
897 
898 		if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
899 			return (ret);
900 
901 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0)
902 			goto pg_err;
903 		h = *pp;
904 
905 		*indxp = 0;
906 	}
907 
908 check_delete_rev:
909 	/* Go back to where we started, and wander in a backwardly direction. */
910 	if (h->pgno != save_pgno) {
911 		if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
912 			return (ret);
913 		if ((ret = memp_fget(dbp->mpf, &save_pgno, 0, pp)) != 0)
914 			goto pg_err;
915 		h = *pp;
916 	}
917 
918 	for (;;) {
919 		while (*indxp > 0) {
920 			--*indxp;
921 			if ((*cmpp = __bam_cmp(dbp,
922 			    dbt, h, *indxp, dbp->dup_compare)) != 0)
923 				goto check_delete_fail;
924 
925 			if (!B_DISSET(GET_BKEYDATA(h, *indxp)->type))
926 				return (0);
927 		}
928 		if ((pgno = h->prev_pgno) == PGNO_INVALID)
929 			break;
930 
931 		if ((ret = memp_fput(dbp->mpf, h, 0)) != 0)
932 			return (ret);
933 
934 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0)
935 			goto pg_err;
936 		h = *pp;
937 
938 		*indxp = NUM_ENT(h);
939 	}
940 
941 check_delete_fail:
942 	*cmpp = 1;			/* We didn't succeed... */
943 	return (0);
944 
945 pg_err:	__db_pgerr(dbp, pgno);
946 	return (ret);
947 }
948