xref: /titanic_41/usr/src/cmd/sendmail/db/btree/bt_split.c (revision 581cede61ac9c14d8d4ea452562a567189eead78)
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 1997, 1998
5  *	Sleepycat Software.  All rights reserved.
6  */
7 /*
8  * Copyright (c) 1990, 1993, 1994, 1995, 1996
9  *	Keith Bostic.  All rights reserved.
10  */
11 /*
12  * Copyright (c) 1990, 1993, 1994, 1995
13  *	The Regents of the University of California.  All rights reserved.
14  *
15  * Redistribution and use in source and binary forms, with or without
16  * modification, are permitted provided that the following conditions
17  * are met:
18  * 1. Redistributions of source code must retain the above copyright
19  *    notice, this list of conditions and the following disclaimer.
20  * 2. Redistributions in binary form must reproduce the above copyright
21  *    notice, this list of conditions and the following disclaimer in the
22  *    documentation and/or other materials provided with the distribution.
23  * 3. All advertising materials mentioning features or use of this software
24  *    must display the following acknowledgement:
25  *	This product includes software developed by the University of
26  *	California, Berkeley and its contributors.
27  * 4. Neither the name of the University nor the names of its contributors
28  *    may be used to endorse or promote products derived from this software
29  *    without specific prior written permission.
30  *
31  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
32  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
33  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
34  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
35  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
36  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
37  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
38  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
39  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
40  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
41  * SUCH DAMAGE.
42  */
43 
44 #include "config.h"
45 
46 #ifndef lint
47 static const char sccsid[] = "@(#)bt_split.c	10.33 (Sleepycat) 10/13/98";
48 #endif /* not lint */
49 
50 #ifndef NO_SYSTEM_INCLUDES
51 #include <sys/types.h>
52 
53 #include <errno.h>
54 #include <limits.h>
55 #include <string.h>
56 #endif
57 
58 #include "db_int.h"
59 #include "db_page.h"
60 #include "btree.h"
61 
/* Forward declarations for the file-local helpers defined below. */
static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
static int __bam_page __P((DBC *, EPG *, EPG *));
static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *));
static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
static int __bam_root __P((DBC *, EPG *));
static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
68 
69 /*
70  * __bam_split --
71  *	Split a page.
72  *
73  * PUBLIC: int __bam_split __P((DBC *, void *));
74  */
75 int
76 __bam_split(dbc, arg)
77 	DBC *dbc;
78 	void *arg;
79 {
80 	BTREE *t;
81 	CURSOR *cp;
82 	DB *dbp;
83 	enum { UP, DOWN } dir;
84 	int exact, level, ret;
85 
86 	dbp = dbc->dbp;
87 	cp = dbc->internal;
88 
89 	/*
90 	 * The locking protocol we use to avoid deadlock to acquire locks by
91 	 * walking down the tree, but we do it as lazily as possible, locking
92 	 * the root only as a last resort.  We expect all stack pages to have
93 	 * been discarded before we're called; we discard all short-term locks.
94 	 *
95 	 * When __bam_split is first called, we know that a leaf page was too
96 	 * full for an insert.  We don't know what leaf page it was, but we
97 	 * have the key/recno that caused the problem.  We call XX_search to
98 	 * reacquire the leaf page, but this time get both the leaf page and
99 	 * its parent, locked.  We then split the leaf page and see if the new
100 	 * internal key will fit into the parent page.  If it will, we're done.
101 	 *
102 	 * If it won't, we discard our current locks and repeat the process,
103 	 * only this time acquiring the parent page and its parent, locked.
104 	 * This process repeats until we succeed in the split, splitting the
105 	 * root page as the final resort.  The entire process then repeats,
106 	 * as necessary, until we split a leaf page.
107 	 *
108 	 * XXX
109 	 * A traditional method of speeding this up is to maintain a stack of
110 	 * the pages traversed in the original search.  You can detect if the
111 	 * stack is correct by storing the page's LSN when it was searched and
112 	 * comparing that LSN with the current one when it's locked during the
113 	 * split.  This would be an easy change for this code, but I have no
114 	 * numbers that indicate it's worthwhile.
115 	 */
116 	t = dbp->internal;
117 	for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
118 		/*
119 		 * Acquire a page and its parent, locked.
120 		 */
121 		if ((ret = (dbp->type == DB_BTREE ?
122 		    __bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
123 		    __bam_rsearch(dbc,
124 		        (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
125 			return (ret);
126 
127 		/*
128 		 * Split the page if it still needs it (it's possible another
129 		 * thread of control has already split the page).  If we are
130 		 * guaranteed that two items will fit on the page, the split
131 		 * is no longer necessary.
132 		 */
133 		if (t->bt_ovflsize * 2 <=
134 		    (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
135 			__bam_stkrel(dbc, 1);
136 			return (0);
137 		}
138 		ret = cp->csp[0].page->pgno == PGNO_ROOT ?
139 		    __bam_root(dbc, &cp->csp[0]) :
140 		    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
141 		BT_STK_CLR(cp);
142 
143 		switch (ret) {
144 		case 0:
145 			/* Once we've split the leaf page, we're done. */
146 			if (level == LEAFLEVEL)
147 				return (0);
148 
149 			/* Switch directions. */
150 			if (dir == UP)
151 				dir = DOWN;
152 			break;
153 		case DB_NEEDSPLIT:
154 			/*
155 			 * It's possible to fail to split repeatedly, as other
156 			 * threads may be modifying the tree, or the page usage
157 			 * is sufficiently bad that we don't get enough space
158 			 * the first time.
159 			 */
160 			if (dir == DOWN)
161 				dir = UP;
162 			break;
163 		default:
164 			return (ret);
165 		}
166 	}
167 	/* NOTREACHED */
168 }
169 
170 /*
171  * __bam_root --
172  *	Split the root page of a btree.
173  */
174 static int
175 __bam_root(dbc, cp)
176 	DBC *dbc;
177 	EPG *cp;
178 {
179 	DB *dbp;
180 	PAGE *lp, *rp;
181 	db_indx_t split;
182 	int ret;
183 
184 	dbp = dbc->dbp;
185 
186 	/* Yeah, right. */
187 	if (cp->page->level >= MAXBTREELEVEL) {
188 		ret = ENOSPC;
189 		goto err;
190 	}
191 
192 	/* Create new left and right pages for the split. */
193 	lp = rp = NULL;
194 	if ((ret = __bam_new(dbc, TYPE(cp->page), &lp)) != 0 ||
195 	    (ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
196 		goto err;
197 	P_INIT(lp, dbp->pgsize, lp->pgno,
198 	    PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
199 	    cp->page->level, TYPE(cp->page));
200 	P_INIT(rp, dbp->pgsize, rp->pgno,
201 	    ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
202 	    cp->page->level, TYPE(cp->page));
203 
204 	/* Split the page. */
205 	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
206 		goto err;
207 
208 	/* Log the change. */
209 	if (DB_LOGGING(dbc)) {
210 		DBT __a;
211 		DB_LSN __lsn;
212 		memset(&__a, 0, sizeof(__a));
213 		__a.data = cp->page;
214 		__a.size = dbp->pgsize;
215 		ZERO_LSN(__lsn);
216 		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
217 		    &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
218 		    PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn,
219 		    &__a)) != 0)
220 			goto err;
221 		LSN(lp) = LSN(rp) = LSN(cp->page);
222 	}
223 
224 	/* Clean up the new root page. */
225 	if ((ret = (dbp->type == DB_RECNO ?
226 	    __ram_root(dbc, cp->page, lp, rp) :
227 	    __bam_broot(dbc, cp->page, lp, rp))) != 0)
228 		goto err;
229 
230 	/* Adjust any cursors.  Do it last so we don't have to undo it. */
231 	__bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1);
232 
233 	/* Success -- write the real pages back to the store. */
234 	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
235 	(void)__BT_TLPUT(dbc, cp->lock);
236 	(void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
237 	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
238 
239 	return (0);
240 
241 err:	if (lp != NULL)
242 		(void)__bam_free(dbc, lp);
243 	if (rp != NULL)
244 		(void)__bam_free(dbc, rp);
245 	(void)memp_fput(dbp->mpf, cp->page, 0);
246 	(void)__BT_TLPUT(dbc, cp->lock);
247 	return (ret);
248 }
249 
250 /*
251  * __bam_page --
252  *	Split the non-root page of a btree.
253  */
254 static int
255 __bam_page(dbc, pp, cp)
256 	DBC *dbc;
257 	EPG *pp, *cp;
258 {
259 	DB *dbp;
260 	DB_LOCK tplock;
261 	PAGE *lp, *rp, *tp;
262 	db_indx_t split;
263 	int ret;
264 
265 	dbp = dbc->dbp;
266 	lp = rp = tp = NULL;
267 	ret = -1;
268 
269 	/* Create new right page for the split. */
270 	if ((ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
271 		goto err;
272 	P_INIT(rp, dbp->pgsize, rp->pgno,
273 	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno,
274 	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->next_pgno,
275 	    cp->page->level, TYPE(cp->page));
276 
277 	/* Create new left page for the split. */
278 	if ((ret = __os_malloc(dbp->pgsize, NULL, &lp)) != 0)
279 		goto err;
280 	P_INIT(lp, dbp->pgsize, cp->page->pgno,
281 	    ISINTERNAL(cp->page) ?  PGNO_INVALID : cp->page->prev_pgno,
282 	    ISINTERNAL(cp->page) ?  PGNO_INVALID : rp->pgno,
283 	    cp->page->level, TYPE(cp->page));
284 	ZERO_LSN(lp->lsn);
285 
286 	/*
287 	 * Split right.
288 	 *
289 	 * Only the indices are sorted on the page, i.e., the key/data pairs
290 	 * aren't, so it's simpler to copy the data from the split page onto
291 	 * two new pages instead of copying half the data to the right page
292 	 * and compacting the left page in place.  Since the left page can't
293 	 * change, we swap the original and the allocated left page after the
294 	 * split.
295 	 */
296 	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
297 		goto err;
298 
299 	/*
300 	 * Fix up the previous pointer of any leaf page following the split
301 	 * page.
302 	 *
303 	 * !!!
304 	 * There are interesting deadlock situations here as we write-lock a
305 	 * page that's not in our direct ancestry.  Consider a cursor walking
306 	 * through the leaf pages, that has the previous page read-locked and
307 	 * is waiting on a lock for the page we just split.  It will deadlock
308 	 * here.  If this is a problem, we can fail in the split; it's not a
309 	 * problem as the split will succeed after the cursor passes through
310 	 * the page we're splitting.
311 	 */
312 	if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) {
313 		if ((ret = __bam_lget(dbc,
314 		    0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0)
315 			goto err;
316 		if ((ret = memp_fget(dbp->mpf, &rp->next_pgno, 0, &tp)) != 0)
317 			goto err;
318 	}
319 
320 	/* Insert the new pages into the parent page. */
321 	if ((ret = __bam_pinsert(dbc, pp, lp, rp)) != 0)
322 		goto err;
323 
324 	/* Log the change. */
325 	if (DB_LOGGING(dbc)) {
326 		DBT __a;
327 		DB_LSN __lsn;
328 		memset(&__a, 0, sizeof(__a));
329 		__a.data = cp->page;
330 		__a.size = dbp->pgsize;
331 		if (tp == NULL)
332 			ZERO_LSN(__lsn);
333 		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
334 		    &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page),
335 		    &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp),
336 		    tp == NULL ? 0 : PGNO(tp),
337 		    tp == NULL ? &__lsn : &LSN(tp), &__a)) != 0)
338 			goto err;
339 
340 		LSN(lp) = LSN(rp) = LSN(cp->page);
341 		if (tp != NULL)
342 			LSN(tp) = LSN(cp->page);
343 	}
344 
345 	/* Copy the allocated page into place. */
346 	memcpy(cp->page, lp, LOFFSET(lp));
347 	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
348 	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
349 	__os_free(lp, dbp->pgsize);
350 	lp = NULL;
351 
352 	/* Finish the next-page link. */
353 	if (tp != NULL)
354 		tp->prev_pgno = rp->pgno;
355 
356 	/* Adjust any cursors.  Do so last so we don't have to undo it. */
357 	__bam_ca_split(dbp, cp->page->pgno, cp->page->pgno, rp->pgno, split, 0);
358 
359 	/* Success -- write the real pages back to the store. */
360 	(void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY);
361 	(void)__BT_TLPUT(dbc, pp->lock);
362 	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
363 	(void)__BT_TLPUT(dbc, cp->lock);
364 	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
365 	if (tp != NULL) {
366 		(void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY);
367 		(void)__BT_TLPUT(dbc, tplock);
368 	}
369 	return (0);
370 
371 err:	if (lp != NULL)
372 		__os_free(lp, dbp->pgsize);
373 	if (rp != NULL)
374 		(void)__bam_free(dbc, rp);
375 	if (tp != NULL) {
376 		(void)memp_fput(dbp->mpf, tp, 0);
377 		if (ret == DB_NEEDSPLIT)
378 			(void)__BT_LPUT(dbc, tplock);
379 		else
380 			(void)__BT_TLPUT(dbc, tplock);
381 	}
382 	(void)memp_fput(dbp->mpf, pp->page, 0);
383 	if (ret == DB_NEEDSPLIT)
384 		(void)__BT_LPUT(dbc, pp->lock);
385 	else
386 		(void)__BT_TLPUT(dbc, pp->lock);
387 	(void)memp_fput(dbp->mpf, cp->page, 0);
388 	if (ret == DB_NEEDSPLIT)
389 		(void)__BT_LPUT(dbc, cp->lock);
390 	else
391 		(void)__BT_TLPUT(dbc, cp->lock);
392 	return (ret);
393 }
394 
395 /*
396  * __bam_broot --
397  *	Fix up the btree root page after it has been split.
398  */
399 static int
400 __bam_broot(dbc, rootp, lp, rp)
401 	DBC *dbc;
402 	PAGE *rootp, *lp, *rp;
403 {
404 	BINTERNAL bi, *child_bi;
405 	BKEYDATA *child_bk;
406 	DB *dbp;
407 	DBT hdr, data;
408 	int ret;
409 
410 	dbp = dbc->dbp;
411 
412 	/*
413 	 * If the root page was a leaf page, change it into an internal page.
414 	 * We copy the key we split on (but not the key's data, in the case of
415 	 * a leaf page) to the new root page.
416 	 */
417 	P_INIT(rootp, dbp->pgsize,
418 	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);
419 
420 	memset(&data, 0, sizeof(data));
421 	memset(&hdr, 0, sizeof(hdr));
422 
423 	/*
424 	 * The btree comparison code guarantees that the left-most key on any
425 	 * level of the tree is never used, so it doesn't need to be filled in.
426 	 */
427 	memset(&bi, 0, sizeof(bi));
428 	bi.len = 0;
429 	B_TSET(bi.type, B_KEYDATA, 0);
430 	bi.pgno = lp->pgno;
431 	if (F_ISSET(dbp, DB_BT_RECNUM)) {
432 		bi.nrecs = __bam_total(lp);
433 		RE_NREC_SET(rootp, bi.nrecs);
434 	}
435 	hdr.data = &bi;
436 	hdr.size = SSZA(BINTERNAL, data);
437 	if ((ret =
438 	    __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
439 		return (ret);
440 
441 	switch (TYPE(rp)) {
442 	case P_IBTREE:
443 		/* Copy the first key of the child page onto the root page. */
444 		child_bi = GET_BINTERNAL(rp, 0);
445 
446 		bi.len = child_bi->len;
447 		B_TSET(bi.type, child_bi->type, 0);
448 		bi.pgno = rp->pgno;
449 		if (F_ISSET(dbp, DB_BT_RECNUM)) {
450 			bi.nrecs = __bam_total(rp);
451 			RE_NREC_ADJ(rootp, bi.nrecs);
452 		}
453 		hdr.data = &bi;
454 		hdr.size = SSZA(BINTERNAL, data);
455 		data.data = child_bi->data;
456 		data.size = child_bi->len;
457 		if ((ret = __db_pitem(dbc, rootp, 1,
458 		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
459 			return (ret);
460 
461 		/* Increment the overflow ref count. */
462 		if (B_TYPE(child_bi->type) == B_OVERFLOW)
463 			if ((ret = __db_ovref(dbc,
464 			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
465 				return (ret);
466 		break;
467 	case P_LBTREE:
468 		/* Copy the first key of the child page onto the root page. */
469 		child_bk = GET_BKEYDATA(rp, 0);
470 		switch (B_TYPE(child_bk->type)) {
471 		case B_KEYDATA:
472 			bi.len = child_bk->len;
473 			B_TSET(bi.type, child_bk->type, 0);
474 			bi.pgno = rp->pgno;
475 			if (F_ISSET(dbp, DB_BT_RECNUM)) {
476 				bi.nrecs = __bam_total(rp);
477 				RE_NREC_ADJ(rootp, bi.nrecs);
478 			}
479 			hdr.data = &bi;
480 			hdr.size = SSZA(BINTERNAL, data);
481 			data.data = child_bk->data;
482 			data.size = child_bk->len;
483 			if ((ret = __db_pitem(dbc, rootp, 1,
484 			    BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
485 				return (ret);
486 			break;
487 		case B_DUPLICATE:
488 		case B_OVERFLOW:
489 			bi.len = BOVERFLOW_SIZE;
490 			B_TSET(bi.type, child_bk->type, 0);
491 			bi.pgno = rp->pgno;
492 			if (F_ISSET(dbp, DB_BT_RECNUM)) {
493 				bi.nrecs = __bam_total(rp);
494 				RE_NREC_ADJ(rootp, bi.nrecs);
495 			}
496 			hdr.data = &bi;
497 			hdr.size = SSZA(BINTERNAL, data);
498 			data.data = child_bk;
499 			data.size = BOVERFLOW_SIZE;
500 			if ((ret = __db_pitem(dbc, rootp, 1,
501 			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
502 				return (ret);
503 
504 			/* Increment the overflow ref count. */
505 			if (B_TYPE(child_bk->type) == B_OVERFLOW)
506 				if ((ret = __db_ovref(dbc,
507 				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
508 					return (ret);
509 			break;
510 		default:
511 			return (__db_pgfmt(dbp, rp->pgno));
512 		}
513 		break;
514 	default:
515 		return (__db_pgfmt(dbp, rp->pgno));
516 	}
517 	return (0);
518 }
519 
520 /*
521  * __ram_root --
522  *	Fix up the recno root page after it has been split.
523  */
524 static int
525 __ram_root(dbc, rootp, lp, rp)
526 	DBC *dbc;
527 	PAGE *rootp, *lp, *rp;
528 {
529 	DB *dbp;
530 	DBT hdr;
531 	RINTERNAL ri;
532 	int ret;
533 
534 	dbp = dbc->dbp;
535 
536 	/* Initialize the page. */
537 	P_INIT(rootp, dbp->pgsize,
538 	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
539 
540 	/* Initialize the header. */
541 	memset(&hdr, 0, sizeof(hdr));
542 	hdr.data = &ri;
543 	hdr.size = RINTERNAL_SIZE;
544 
545 	/* Insert the left and right keys, set the header information. */
546 	ri.pgno = lp->pgno;
547 	ri.nrecs = __bam_total(lp);
548 	if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
549 		return (ret);
550 	RE_NREC_SET(rootp, ri.nrecs);
551 	ri.pgno = rp->pgno;
552 	ri.nrecs = __bam_total(rp);
553 	if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
554 		return (ret);
555 	RE_NREC_ADJ(rootp, ri.nrecs);
556 	return (0);
557 }
558 
559 /*
560  * __bam_pinsert --
561  *	Insert a new key into a parent page, completing the split.
562  */
563 static int
564 __bam_pinsert(dbc, parent, lchild, rchild)
565 	DBC *dbc;
566 	EPG *parent;
567 	PAGE *lchild, *rchild;
568 {
569 	BINTERNAL bi, *child_bi;
570 	BKEYDATA *child_bk, *tmp_bk;
571 	BTREE *t;
572 	DB *dbp;
573 	DBT a, b, hdr, data;
574 	PAGE *ppage;
575 	RINTERNAL ri;
576 	db_indx_t off;
577 	db_recno_t nrecs;
578 	u_int32_t n, nbytes, nksize;
579 	int ret;
580 
581 	dbp = dbc->dbp;
582 	t = dbp->internal;
583 	ppage = parent->page;
584 
585 	/* If handling record numbers, count records split to the right page. */
586 	nrecs = dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM) ?
587 	    __bam_total(rchild) : 0;
588 
589 	/*
590 	 * Now we insert the new page's first key into the parent page, which
591 	 * completes the split.  The parent points to a PAGE and a page index
592 	 * offset, where the new key goes ONE AFTER the index, because we split
593 	 * to the right.
594 	 *
595 	 * XXX
596 	 * Some btree algorithms replace the key for the old page as well as
597 	 * the new page.  We don't, as there's no reason to believe that the
598 	 * first key on the old page is any better than the key we have, and,
599 	 * in the case of a key being placed at index 0 causing the split, the
600 	 * key is unavailable.
601 	 */
602 	off = parent->indx + O_INDX;
603 
604 	/*
605 	 * Calculate the space needed on the parent page.
606 	 *
607 	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
608 	 * Retain only what's needed to distinguish between the new entry and
609 	 * the LAST entry on the page to its left.  If the keys compare equal,
610 	 * retain the entire key.  We ignore overflow keys, and the entire key
611 	 * must be retained for the next-to-leftmost key on the leftmost page
612 	 * of each level, or the search will fail.  Applicable ONLY to internal
613 	 * pages that have leaf pages as children.  Further reduction of the
614 	 * key between pairs of internal pages loses too much information.
615 	 */
616 	switch (TYPE(rchild)) {
617 	case P_IBTREE:
618 		child_bi = GET_BINTERNAL(rchild, 0);
619 		nbytes = BINTERNAL_PSIZE(child_bi->len);
620 
621 		if (P_FREESPACE(ppage) < nbytes)
622 			return (DB_NEEDSPLIT);
623 
624 		/* Add a new record for the right page. */
625 		memset(&bi, 0, sizeof(bi));
626 		bi.len = child_bi->len;
627 		B_TSET(bi.type, child_bi->type, 0);
628 		bi.pgno = rchild->pgno;
629 		bi.nrecs = nrecs;
630 		memset(&hdr, 0, sizeof(hdr));
631 		hdr.data = &bi;
632 		hdr.size = SSZA(BINTERNAL, data);
633 		memset(&data, 0, sizeof(data));
634 		data.data = child_bi->data;
635 		data.size = child_bi->len;
636 		if ((ret = __db_pitem(dbc, ppage, off,
637 		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
638 			return (ret);
639 
640 		/* Increment the overflow ref count. */
641 		if (B_TYPE(child_bi->type) == B_OVERFLOW)
642 			if ((ret = __db_ovref(dbc,
643 			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
644 				return (ret);
645 		break;
646 	case P_LBTREE:
647 		child_bk = GET_BKEYDATA(rchild, 0);
648 		switch (B_TYPE(child_bk->type)) {
649 		case B_KEYDATA:
650 			nbytes = BINTERNAL_PSIZE(child_bk->len);
651 			nksize = child_bk->len;
652 			if (t->bt_prefix == NULL)
653 				goto noprefix;
654 			if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
655 				goto noprefix;
656 			tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) - P_INDX);
657 			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
658 				goto noprefix;
659 			memset(&a, 0, sizeof(a));
660 			a.size = tmp_bk->len;
661 			a.data = tmp_bk->data;
662 			memset(&b, 0, sizeof(b));
663 			b.size = child_bk->len;
664 			b.data = child_bk->data;
665 			nksize = t->bt_prefix(&a, &b);
666 			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
667 				nbytes = n;
668 			else
669 noprefix:			nksize = child_bk->len;
670 
671 			if (P_FREESPACE(ppage) < nbytes)
672 				return (DB_NEEDSPLIT);
673 
674 			memset(&bi, 0, sizeof(bi));
675 			bi.len = nksize;
676 			B_TSET(bi.type, child_bk->type, 0);
677 			bi.pgno = rchild->pgno;
678 			bi.nrecs = nrecs;
679 			memset(&hdr, 0, sizeof(hdr));
680 			hdr.data = &bi;
681 			hdr.size = SSZA(BINTERNAL, data);
682 			memset(&data, 0, sizeof(data));
683 			data.data = child_bk->data;
684 			data.size = nksize;
685 			if ((ret = __db_pitem(dbc, ppage, off,
686 			    BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
687 				return (ret);
688 			break;
689 		case B_DUPLICATE:
690 		case B_OVERFLOW:
691 			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
692 
693 			if (P_FREESPACE(ppage) < nbytes)
694 				return (DB_NEEDSPLIT);
695 
696 			memset(&bi, 0, sizeof(bi));
697 			bi.len = BOVERFLOW_SIZE;
698 			B_TSET(bi.type, child_bk->type, 0);
699 			bi.pgno = rchild->pgno;
700 			bi.nrecs = nrecs;
701 			memset(&hdr, 0, sizeof(hdr));
702 			hdr.data = &bi;
703 			hdr.size = SSZA(BINTERNAL, data);
704 			memset(&data, 0, sizeof(data));
705 			data.data = child_bk;
706 			data.size = BOVERFLOW_SIZE;
707 			if ((ret = __db_pitem(dbc, ppage, off,
708 			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
709 				return (ret);
710 
711 			/* Increment the overflow ref count. */
712 			if (B_TYPE(child_bk->type) == B_OVERFLOW)
713 				if ((ret = __db_ovref(dbc,
714 				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
715 					return (ret);
716 			break;
717 		default:
718 			return (__db_pgfmt(dbp, rchild->pgno));
719 		}
720 		break;
721 	case P_IRECNO:
722 	case P_LRECNO:
723 		nbytes = RINTERNAL_PSIZE;
724 
725 		if (P_FREESPACE(ppage) < nbytes)
726 			return (DB_NEEDSPLIT);
727 
728 		/* Add a new record for the right page. */
729 		memset(&hdr, 0, sizeof(hdr));
730 		hdr.data = &ri;
731 		hdr.size = RINTERNAL_SIZE;
732 		ri.pgno = rchild->pgno;
733 		ri.nrecs = nrecs;
734 		if ((ret = __db_pitem(dbc,
735 		    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
736 			return (ret);
737 		break;
738 	default:
739 		return (__db_pgfmt(dbp, rchild->pgno));
740 	}
741 
742 	/* Adjust the parent page's left page record count. */
743 	if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
744 		/* Log the change. */
745 		if (DB_LOGGING(dbc) &&
746 		    (ret = __bam_cadjust_log(dbp->dbenv->lg_info,
747 		    dbc->txn, &LSN(ppage), 0, dbp->log_fileid,
748 		    PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx,
749 		    -(int32_t)nrecs, (int32_t)0)) != 0)
750 			return (ret);
751 
752 		/* Update the left page count. */
753 		if (dbp->type == DB_RECNO)
754 			GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
755 		else
756 			GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
757 	}
758 
759 	return (0);
760 }
761 
762 /*
763  * __bam_psplit --
764  *	Do the real work of splitting the page.
765  */
766 static int
767 __bam_psplit(dbc, cp, lp, rp, splitret)
768 	DBC *dbc;
769 	EPG *cp;
770 	PAGE *lp, *rp;
771 	db_indx_t *splitret;
772 {
773 	DB *dbp;
774 	PAGE *pp;
775 	db_indx_t half, nbytes, off, splitp, top;
776 	int adjust, cnt, isbigkey, ret;
777 
778 	dbp = dbc->dbp;
779 	pp = cp->page;
780 	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
781 
782 	/*
783 	 * If we're splitting the first (last) page on a level because we're
784 	 * inserting (appending) a key to it, it's likely that the data is
785 	 * sorted.  Moving a single item to the new page is less work and can
786 	 * push the fill factor higher than normal.  If we're wrong it's not
787 	 * a big deal, we'll just do the split the right way next time.
788 	 */
789 	off = 0;
790 	if (NEXT_PGNO(pp) == PGNO_INVALID &&
791 	    ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
792 	    (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
793 		off = NUM_ENT(cp->page) - adjust;
794 	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
795 		off = adjust;
796 
797 	if (off != 0)
798 		goto sort;
799 
800 	/*
801 	 * Split the data to the left and right pages.  Try not to split on
802 	 * an overflow key.  (Overflow keys on internal pages will slow down
803 	 * searches.)  Refuse to split in the middle of a set of duplicates.
804 	 *
805 	 * First, find the optimum place to split.
806 	 *
807 	 * It's possible to try and split past the last record on the page if
808 	 * there's a very large record at the end of the page.  Make sure this
809 	 * doesn't happen by bounding the check at the next-to-last entry on
810 	 * the page.
811 	 *
812 	 * Note, we try and split half the data present on the page.  This is
813 	 * because another process may have already split the page and left
814 	 * it half empty.  We don't try and skip the split -- we don't know
815 	 * how much space we're going to need on the page, and we may need up
816 	 * to half the page for a big item, so there's no easy test to decide
817 	 * if we need to split or not.  Besides, if two threads are inserting
818 	 * data into the same place in the database, we're probably going to
819 	 * need more space soon anyway.
820 	 */
821 	top = NUM_ENT(pp) - adjust;
822 	half = (dbp->pgsize - HOFFSET(pp)) / 2;
823 	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
824 		switch (TYPE(pp)) {
825 		case P_IBTREE:
826 			if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
827 				nbytes +=
828 				   BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
829 			else
830 				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
831 			break;
832 		case P_LBTREE:
833 			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
834 				nbytes +=
835 				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
836 			else
837 				nbytes += BOVERFLOW_SIZE;
838 
839 			++off;
840 			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
841 				nbytes +=
842 				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
843 			else
844 				nbytes += BOVERFLOW_SIZE;
845 			break;
846 		case P_IRECNO:
847 			nbytes += RINTERNAL_SIZE;
848 			break;
849 		case P_LRECNO:
850 			nbytes += BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
851 			break;
852 		default:
853 			return (__db_pgfmt(dbp, pp->pgno));
854 		}
855 sort:	splitp = off;
856 
857 	/*
858 	 * Splitp is either at or just past the optimum split point.  If
859 	 * it's a big key, try and find something close by that's not.
860 	 */
861 	if (TYPE(pp) == P_IBTREE)
862 		isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
863 	else if (TYPE(pp) == P_LBTREE)
864 		isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
865 	else
866 		isbigkey = 0;
867 	if (isbigkey)
868 		for (cnt = 1; cnt <= 3; ++cnt) {
869 			off = splitp + cnt * adjust;
870 			if (off < (db_indx_t)NUM_ENT(pp) &&
871 			    ((TYPE(pp) == P_IBTREE &&
872 			    B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
873 			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
874 				splitp = off;
875 				break;
876 			}
877 			if (splitp <= (db_indx_t)(cnt * adjust))
878 				continue;
879 			off = splitp - cnt * adjust;
880 			if (TYPE(pp) == P_IBTREE ?
881 			    B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
882 			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
883 				splitp = off;
884 				break;
885 			}
886 		}
887 
888 	/*
889 	 * We can't split in the middle a set of duplicates.  We know that
890 	 * no duplicate set can take up more than about 25% of the page,
891 	 * because that's the point where we push it off onto a duplicate
892 	 * page set.  So, this loop can't be unbounded.
893 	 */
894 	if (F_ISSET(dbp, DB_AM_DUP) && TYPE(pp) == P_LBTREE &&
895 	    pp->inp[splitp] == pp->inp[splitp - adjust])
896 		for (cnt = 1;; ++cnt) {
897 			off = splitp + cnt * adjust;
898 			if (off < NUM_ENT(pp) &&
899 			    pp->inp[splitp] != pp->inp[off]) {
900 				splitp = off;
901 				break;
902 			}
903 			if (splitp <= (db_indx_t)(cnt * adjust))
904 				continue;
905 			off = splitp - cnt * adjust;
906 			if (pp->inp[splitp] != pp->inp[off]) {
907 				splitp = off + adjust;
908 				break;
909 			}
910 		}
911 
912 
913 	/* We're going to split at splitp. */
914 	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
915 		return (ret);
916 	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
917 		return (ret);
918 
919 	*splitret = splitp;
920 	return (0);
921 }
922 
923 /*
924  * __bam_copy --
925  *	Copy a set of records from one page to another.
926  *
927  * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
928  */
929 int
930 __bam_copy(dbp, pp, cp, nxt, stop)
931 	DB *dbp;
932 	PAGE *pp, *cp;
933 	u_int32_t nxt, stop;
934 {
935 	db_indx_t nbytes, off;
936 
937 	/*
938 	 * Copy the rest of the data to the right page.  Nxt is the next
939 	 * offset placed on the target page.
940 	 */
941 	for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
942 		switch (TYPE(pp)) {
943 		case P_IBTREE:
944 			if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
945 				nbytes =
946 				    BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
947 			else
948 				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
949 			break;
950 		case P_LBTREE:
951 			/*
952 			 * If we're on a key and it's a duplicate, just copy
953 			 * the offset.
954 			 */
955 			if (off != 0 && (nxt % P_INDX) == 0 &&
956 			    pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
957 				cp->inp[off] = cp->inp[off - P_INDX];
958 				continue;
959 			}
960 			/* FALLTHROUGH */
961 		case P_LRECNO:
962 			if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
963 				nbytes =
964 				    BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
965 			else
966 				nbytes = BOVERFLOW_SIZE;
967 			break;
968 		case P_IRECNO:
969 			nbytes = RINTERNAL_SIZE;
970 			break;
971 		default:
972 			return (__db_pgfmt(dbp, pp->pgno));
973 		}
974 		cp->inp[off] = HOFFSET(cp) -= nbytes;
975 		memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
976 	}
977 	return (0);
978 }
979