/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996, 1997, 1998
 *	Sleepycat Software.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *	This product includes software developed by the University of
 *	California, Berkeley and its contributors.
 * 4. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include "config.h"

#ifndef lint
static const char sccsid[] = "@(#)bt_split.c	10.33 (Sleepycat) 10/13/98";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>

#include <errno.h>
#include <limits.h>
#include <string.h>
#endif

#include "db_int.h"
#include "db_page.h"
#include "btree.h"

static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
static int __bam_page __P((DBC *, EPG *, EPG *));
static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *));
static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
static int __bam_root __P((DBC *, EPG *));
static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));

/*
 * __bam_split --
 *	Split a page.
 *
 * PUBLIC: int __bam_split __P((DBC *, void *));
 */
int
__bam_split(dbc, arg)
	DBC *dbc;
	void *arg;
{
	BTREE *t;
	CURSOR *cp;
	DB *dbp;
	enum { UP, DOWN } dir;
	int exact, level, ret;

	dbp = dbc->dbp;
	cp = dbc->internal;
	/*
	 * The locking protocol we use to avoid deadlock is to acquire locks
	 * by walking down the tree, but we do it as lazily as possible,
	 * locking the root only as a last resort.  We expect all stack pages
	 * to have been discarded before we're called; we discard all
	 * short-term locks.
	 *
	 * When __bam_split is first called, we know that a leaf page was too
	 * full for an insert.  We don't know what leaf page it was, but we
	 * have the key/recno that caused the problem.  We call the search
	 * routine (__bam_search or __bam_rsearch) to reacquire the leaf page,
	 * but this time get both the leaf page and its parent, locked.  We
	 * then split the leaf page and see if the new internal key will fit
	 * into the parent page.  If it will, we're done.
	 *
	 * If it won't, we discard our current locks and repeat the process,
	 * only this time acquiring the parent page and its parent, locked.
	 * This process repeats until we succeed in the split, splitting the
	 * root page as the final resort.  The entire process then repeats,
	 * as necessary, until we split a leaf page.
	 *
	 * XXX
	 * A traditional method of speeding this up is to maintain a stack of
	 * the pages traversed in the original search.  You can detect if the
	 * stack is correct by storing the page's LSN when it was searched and
	 * comparing that LSN with the current one when it's locked during the
	 * split.  This would be an easy change for this code, but I have no
	 * numbers that indicate it's worthwhile.
	 */
	t = dbp->internal;
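	/*
	 * The level walks up the tree while splits fail for lack of parent
	 * space, and back down again once a split succeeds, until a leaf
	 * page finally splits.
	 */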
	for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
		/*
		 * Acquire a page and its parent, locked.
		 */
		if ((ret = (dbp->type == DB_BTREE ?
		    __bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
		    __bam_rsearch(dbc,
		    (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
			return (ret);

		/*
		 * Split the page if it still needs it (it's possible another
		 * thread of control has already split the page).  If we are
		 * guaranteed that two items will fit on the page, the split
		 * is no longer necessary.
		 */
		if (t->bt_ovflsize * 2 <=
		    (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
			__bam_stkrel(dbc, 1);
			return (0);
		}
		ret = cp->csp[0].page->pgno == PGNO_ROOT ?
		    __bam_root(dbc, &cp->csp[0]) :
		    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
		BT_STK_CLR(cp);

		switch (ret) {
		case 0:
			/* Once we've split the leaf page, we're done. */
			if (level == LEAFLEVEL)
				return (0);

			/* Switch directions. */
			if (dir == UP)
				dir = DOWN;
			break;
		case DB_NEEDSPLIT:
			/*
			 * It's possible to fail to split repeatedly, as other
			 * threads may be modifying the tree, or the page usage
			 * is sufficiently bad that we don't get enough space
			 * the first time.
			 */
			if (dir == DOWN)
				dir = UP;
			break;
		default:
			return (ret);
		}
	}
	/* NOTREACHED */
}

/*
 * __bam_root --
 *	Split the root page of a btree.
 */
static int
__bam_root(dbc, cp)
	DBC *dbc;
	EPG *cp;
{
	DB *dbp;
	PAGE *lp, *rp;
	db_indx_t split;
	int ret;

	dbp = dbc->dbp;

	/* Yeah, right. */
	if (cp->page->level >= MAXBTREELEVEL) {
		ret = ENOSPC;
		goto err;
	}

	/* Create new left and right pages for the split. */
	lp = rp = NULL;
	if ((ret = __bam_new(dbc, TYPE(cp->page), &lp)) != 0 ||
	    (ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
		goto err;
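	/*
	 * Only leaf pages are chained through prev/next pointers; internal
	 * pages get PGNO_INVALID sibling links below.
	 */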
	P_INIT(lp, dbp->pgsize, lp->pgno,
	    PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
	    cp->page->level, TYPE(cp->page));

	/* Split the page. */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbc)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
		    &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
		    PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn,
		    &__a)) != 0)
			goto err;
		LSN(lp) = LSN(rp) = LSN(cp->page);
	}

	/* Clean up the new root page. */
	if ((ret = (dbp->type == DB_RECNO ?
	    __ram_root(dbc, cp->page, lp, rp) :
	    __bam_broot(dbc, cp->page, lp, rp))) != 0)
		goto err;

	/* Adjust any cursors.  Do it last so we don't have to undo it. */
	__bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1);

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, cp->lock);
	(void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);

	return (0);

err:	if (lp != NULL)
		(void)__bam_free(dbc, lp);
	if (rp != NULL)
		(void)__bam_free(dbc, rp);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	(void)__BT_TLPUT(dbc, cp->lock);
	return (ret);
}

/*
 * __bam_page --
 *	Split the non-root page of a btree.
 */
static int
__bam_page(dbc, pp, cp)
	DBC *dbc;
	EPG *pp, *cp;
{
	DB *dbp;
	DB_LOCK tplock;
	PAGE *lp, *rp, *tp;
	db_indx_t split;
	int ret;

	dbp = dbc->dbp;
	lp = rp = tp = NULL;
	ret = -1;

	/* Create new right page for the split. */
	if ((ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->next_pgno,
	    cp->page->level, TYPE(cp->page));

	/* Create new left page for the split. */
	if ((ret = __os_malloc(dbp->pgsize, NULL, &lp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->prev_pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	ZERO_LSN(lp->lsn);

	/*
	 * Split right.
	 *
	 * Only the indices are sorted on the page, i.e., the key/data pairs
	 * aren't, so it's simpler to copy the data from the split page onto
	 * two new pages instead of copying half the data to the right page
	 * and compacting the left page in place.  Since the left page can't
	 * change, we swap the original and the allocated left page after the
	 * split.
	 */
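	/*
	 * For leaf pages the sibling chain changes as follows, where cp
	 * keeps its page number (so references to it from the parent and
	 * its left sibling remain valid) and rp is the newly allocated
	 * right sibling:
	 *
	 *	before:	... <-> cp <-> next <-> ...
	 *	after:	... <-> cp <-> rp <-> next <-> ...
	 */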
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/*
	 * Fix up the previous pointer of any leaf page following the split
	 * page.
	 *
	 * !!!
	 * There are interesting deadlock situations here as we write-lock a
	 * page that's not in our direct ancestry.  Consider a cursor walking
	 * through the leaf pages, that has the previous page read-locked and
	 * is waiting on a lock for the page we just split.  It will deadlock
	 * here.  If this is a problem, we can fail in the split; it's not a
	 * problem as the split will succeed after the cursor passes through
	 * the page we're splitting.
	 */
	if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) {
		if ((ret = __bam_lget(dbc,
		    0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0)
			goto err;
		if ((ret = memp_fget(dbp->mpf, &rp->next_pgno, 0, &tp)) != 0)
			goto err;
	}

	/* Insert the new pages into the parent page. */
	if ((ret = __bam_pinsert(dbc, pp, lp, rp)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbc)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		if (tp == NULL)
			ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
		    &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page),
		    &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp),
		    tp == NULL ? &__lsn : &LSN(tp), &__a)) != 0)
			goto err;

		LSN(lp) = LSN(rp) = LSN(cp->page);
		if (tp != NULL)
			LSN(tp) = LSN(cp->page);
	}

	/* Copy the allocated page into place. */
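	/*
	 * The header and index array grow down from the top of the page and
	 * the items grow up from the bottom, so copy the two live regions
	 * separately and skip the unused gap between them.
	 */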
	memcpy(cp->page, lp, LOFFSET(lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
	__os_free(lp, dbp->pgsize);
	lp = NULL;

	/* Finish the next-page link. */
	if (tp != NULL)
		tp->prev_pgno = rp->pgno;

	/* Adjust any cursors.  Do so last so we don't have to undo it. */
	__bam_ca_split(dbp, cp->page->pgno, cp->page->pgno, rp->pgno, split, 0);

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, cp->lock);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY);
		(void)__BT_TLPUT(dbc, tplock);
	}
	return (0);

err:	if (lp != NULL)
		__os_free(lp, dbp->pgsize);
	if (rp != NULL)
		(void)__bam_free(dbc, rp);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, 0);
		if (ret == DB_NEEDSPLIT)
			(void)__BT_LPUT(dbc, tplock);
		else
			(void)__BT_TLPUT(dbc, tplock);
	}
	(void)memp_fput(dbp->mpf, pp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__BT_LPUT(dbc, pp->lock);
	else
		(void)__BT_TLPUT(dbc, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__BT_LPUT(dbc, cp->lock);
	else
		(void)__BT_TLPUT(dbc, cp->lock);
	return (ret);
}

/*
 * __bam_broot --
 *	Fix up the btree root page after it has been split.
 */
static int
__bam_broot(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk;
	DB *dbp;
	DBT hdr, data;
	int ret;

	dbp = dbc->dbp;

	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
	 */
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

	memset(&data, 0, sizeof(data));
	memset(&hdr, 0, sizeof(hdr));

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * level of the tree is never used, so it doesn't need to be filled in.
	 */
	memset(&bi, 0, sizeof(bi));
	bi.len = 0;
	B_TSET(bi.type, B_KEYDATA, 0);
	bi.pgno = lp->pgno;
	if (F_ISSET(dbp, DB_BT_RECNUM)) {
		bi.nrecs = __bam_total(lp);
		RE_NREC_SET(rootp, bi.nrecs);
	}
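	/*
	 * SSZA(BINTERNAL, data) is the size of a BINTERNAL up to (but not
	 * including) its variable-length data field; any key bytes are
	 * passed separately in the data DBT.
	 */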
	hdr.data = &bi;
	hdr.size = SSZA(BINTERNAL, data);
	if ((ret =
	    __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
		return (ret);

	switch (TYPE(rp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(rp, 0);

		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rp->pgno;
		if (F_ISSET(dbp, DB_BT_RECNUM)) {
			bi.nrecs = __bam_total(rp);
			RE_NREC_ADJ(rootp, bi.nrecs);
		}
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, rootp, 1,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(rp, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk->data;
			data.size = child_bk->len;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = __db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rp->pgno));
		}
		break;
	default:
		return (__db_pgfmt(dbp, rp->pgno));
	}
	return (0);
}

/*
 * __ram_root --
 *	Fix up the recno root page after it has been split.
 */
static int
__ram_root(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	DB *dbp;
	DBT hdr;
	RINTERNAL ri;
	int ret;

	dbp = dbc->dbp;

	/* Initialize the page. */
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);

	/* Initialize the header. */
	memset(&hdr, 0, sizeof(hdr));
	hdr.data = &ri;
	hdr.size = RINTERNAL_SIZE;

	/* Insert the left and right keys, set the header information. */
	ri.pgno = lp->pgno;
	ri.nrecs = __bam_total(lp);
	if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_SET(rootp, ri.nrecs);
	ri.pgno = rp->pgno;
	ri.nrecs = __bam_total(rp);
	if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_ADJ(rootp, ri.nrecs);
	return (0);
}

/*
 * __bam_pinsert --
 *	Insert a new key into a parent page, completing the split.
 */
static int
__bam_pinsert(dbc, parent, lchild, rchild)
	DBC *dbc;
	EPG *parent;
	PAGE *lchild, *rchild;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BTREE *t;
	DB *dbp;
	DBT a, b, hdr, data;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	u_int32_t n, nbytes, nksize;
	int ret;

	dbp = dbc->dbp;
	t = dbp->internal;
	ppage = parent->page;

	/* If handling record numbers, count records split to the right page. */
	nrecs = dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM) ?
	    __bam_total(rchild) : 0;

	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because we split
	 * to the right.
	 *
	 * XXX
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
	off = parent->indx + O_INDX;

	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
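	/*
	 * For example, if the last key on the left child is "abcde" and the
	 * first key on the right child is "abcfg", bt_prefix can return 4:
	 * the four bytes "abcf" are enough to route searches between the
	 * two children, so only those bytes need be stored in the parent.
	 */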
	switch (TYPE(rchild)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(rchild, 0);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		memset(&bi, 0, sizeof(bi));
		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rchild->pgno;
		bi.nrecs = nrecs;
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		memset(&data, 0, sizeof(data));
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, ppage, off,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LBTREE:
		child_bk = GET_BKEYDATA(rchild, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			nbytes = BINTERNAL_PSIZE(child_bk->len);
			nksize = child_bk->len;
			if (t->bt_prefix == NULL)
				goto noprefix;
			if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
				goto noprefix;
			tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) - P_INDX);
			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
				goto noprefix;
			memset(&a, 0, sizeof(a));
			a.size = tmp_bk->len;
			a.data = tmp_bk->data;
			memset(&b, 0, sizeof(b));
			b.size = child_bk->len;
			b.data = child_bk->data;
			nksize = t->bt_prefix(&a, &b);
			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
				nbytes = n;
			else
noprefix:			nksize = child_bk->len;

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			memset(&bi, 0, sizeof(bi));
			bi.len = nksize;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk->data;
			data.size = nksize;
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			memset(&bi, 0, sizeof(bi));
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = __db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rchild->pgno));
		}
		break;
	case P_IRECNO:
	case P_LRECNO:
		nbytes = RINTERNAL_PSIZE;

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &ri;
		hdr.size = RINTERNAL_SIZE;
		ri.pgno = rchild->pgno;
		ri.nrecs = nrecs;
		if ((ret = __db_pitem(dbc,
		    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
			return (ret);
		break;
	default:
		return (__db_pgfmt(dbp, rchild->pgno));
	}

	/* Adjust the parent page's left page record count. */
	if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
		/* Log the change. */
		if (DB_LOGGING(dbc) &&
		    (ret = __bam_cadjust_log(dbp->dbenv->lg_info,
		    dbc->txn, &LSN(ppage), 0, dbp->log_fileid,
		    PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx,
		    -(int32_t)nrecs, (int32_t)0)) != 0)
			return (ret);

		/* Update the left page count. */
		if (dbp->type == DB_RECNO)
			GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
		else
			GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
	}

	return (0);
}

/*
 * __bam_psplit --
 *	Do the real work of splitting the page.
 */
static int
__bam_psplit(dbc, cp, lp, rp, splitret)
	DBC *dbc;
	EPG *cp;
	PAGE *lp, *rp;
	db_indx_t *splitret;
{
	DB *dbp;
	PAGE *pp;
	db_indx_t half, nbytes, off, splitp, top;
	int adjust, cnt, isbigkey, ret;

	dbp = dbc->dbp;
	pp = cp->page;
	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;

	/*
	 * If we're splitting the first (last) page on a level because we're
	 * inserting (appending) a key to it, it's likely that the data is
	 * sorted.  Moving a single item to the new page is less work and can
	 * push the fill factor higher than normal.  If we're wrong it's not
	 * a big deal, we'll just do the split the right way next time.
	 */
	off = 0;
	if (NEXT_PGNO(pp) == PGNO_INVALID &&
	    ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
	    (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
		off = NUM_ENT(cp->page) - adjust;
	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
		off = adjust;

	if (off != 0)
		goto sort;

	/*
	 * Split the data to the left and right pages.  Try not to split on
	 * an overflow key.  (Overflow keys on internal pages will slow down
	 * searches.)  Refuse to split in the middle of a set of duplicates.
	 *
	 * First, find the optimum place to split.
	 *
	 * It's possible to try and split past the last record on the page if
	 * there's a very large record at the end of the page.  Make sure this
	 * doesn't happen by bounding the check at the next-to-last entry on
	 * the page.
	 *
	 * Note, we try and split half the data present on the page.  This is
	 * because another process may have already split the page and left
	 * it half empty.  We don't try and skip the split -- we don't know
	 * how much space we're going to need on the page, and we may need up
	 * to half the page for a big item, so there's no easy test to decide
	 * if we need to split or not.  Besides, if two threads are inserting
	 * data into the same place in the database, we're probably going to
	 * need more space soon anyway.
	 */
	top = NUM_ENT(pp) - adjust;
	half = (dbp->pgsize - HOFFSET(pp)) / 2;
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
			else
				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;

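			/*
			 * Leaf items come in key/data pairs; step past the
			 * key so its data item is counted with it.
			 */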
			++off;
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes += RINTERNAL_SIZE;
			break;
		case P_LRECNO:
			nbytes += BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
sort:	splitp = off;

	/*
	 * Splitp is either at or just past the optimum split point.  If
	 * it's a big key, try and find something close by that's not.
	 */
	if (TYPE(pp) == P_IBTREE)
		isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
	else if (TYPE(pp) == P_LBTREE)
		isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
	else
		isbigkey = 0;
	if (isbigkey)
		for (cnt = 1; cnt <= 3; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < (db_indx_t)NUM_ENT(pp) &&
			    ((TYPE(pp) == P_IBTREE &&
			    B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA) ||
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (TYPE(pp) == P_IBTREE ?
			    B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
				splitp = off;
				break;
			}
		}

	/*
	 * We can't split in the middle of a set of duplicates.  We know that
	 * no duplicate set can take up more than about 25% of the page,
	 * because that's the point where we push it off onto a duplicate
	 * page set.  So, this loop can't be unbounded.
	 */
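	/*
	 * Duplicate keys on a leaf page share a single key item, so equal
	 * inp offsets identify entries in the same duplicate set.
	 */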
	if (F_ISSET(dbp, DB_AM_DUP) && TYPE(pp) == P_LBTREE &&
	    pp->inp[splitp] == pp->inp[splitp - adjust])
		for (cnt = 1;; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < NUM_ENT(pp) &&
			    pp->inp[splitp] != pp->inp[off]) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (pp->inp[splitp] != pp->inp[off]) {
				splitp = off + adjust;
				break;
			}
		}

	/* We're going to split at splitp. */
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);

	*splitret = splitp;
	return (0);
}

/*
 * __bam_copy --
 *	Copy a set of records from one page to another.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
	DB *dbp;
	PAGE *pp, *cp;
	u_int32_t nxt, stop;
{
	db_indx_t nbytes, off;

	/*
	 * Copy the rest of the data to the right page.  Nxt is the next
	 * offset placed on the target page.
	 */
	for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
			else
				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/*
			 * If we're on a key and it's a duplicate, just copy
			 * the offset.
			 */
			if (off != 0 && (nxt % P_INDX) == 0 &&
			    pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
				cp->inp[off] = cp->inp[off - P_INDX];
				continue;
			}
			/* FALLTHROUGH */
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
			else
				nbytes = BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes = RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
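		/*
		 * Carve nbytes from the end of the page's free space and
		 * record the new item's offset in the index array.
		 */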
		cp->inp[off] = HOFFSET(cp) -= nbytes;
		memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
	}
	return (0);
}
979