/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996, 1997, 1998
 *	Sleepycat Software.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *	This product includes software developed by the University of
 *	California, Berkeley and its contributors.
 * 4. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include "config.h"

#ifndef lint
static const char sccsid[] = "@(#)bt_split.c	10.33 (Sleepycat) 10/13/98";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>

#include <errno.h>
#include <limits.h>
#include <string.h>
#endif

#include "db_int.h"
#include "db_page.h"
#include "btree.h"

static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
static int __bam_page __P((DBC *, EPG *, EPG *));
static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *));
static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
static int __bam_root __P((DBC *, EPG *));
static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));

/*
 * __bam_split --
 *	Split a page.
 *
 * PUBLIC: int __bam_split __P((DBC *, void *));
 */
int
__bam_split(dbc, arg)
	DBC *dbc;
	void *arg;
{
	BTREE *t;
	CURSOR *cp;
	DB *dbp;
	enum { UP, DOWN } dir;
	int exact, level, ret;

	dbp = dbc->dbp;
	cp = dbc->internal;

	/*
	 * The locking protocol we use to avoid deadlock is to acquire locks
	 * by walking down the tree, but we do it as lazily as possible,
	 * locking the root only as a last resort.  We expect all stack pages
	 * to have been discarded before we're called; we discard all
	 * short-term locks.
	 *
	 * When __bam_split is first called, we know that a leaf page was too
	 * full for an insert.  We don't know what leaf page it was, but we
	 * have the key/recno that caused the problem.  We call XX_search to
	 * reacquire the leaf page, but this time get both the leaf page and
	 * its parent, locked.  We then split the leaf page and see if the new
	 * internal key will fit into the parent page.  If it will, we're done.
	 *
	 * If it won't, we discard our current locks and repeat the process,
	 * only this time acquiring the parent page and its parent, locked.
	 * This process repeats until we succeed in the split, splitting the
	 * root page as the final resort.  The entire process then repeats,
	 * as necessary, until we split a leaf page.
	 *
	 * XXX
	 * A traditional method of speeding this up is to maintain a stack of
	 * the pages traversed in the original search.  You can detect if the
	 * stack is correct by storing the page's LSN when it was searched and
	 * comparing that LSN with the current one when it's locked during the
	 * split.  This would be an easy change for this code, but I have no
	 * numbers that indicate it's worthwhile.
	 */
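	/*
	 * The loop below expresses this as a walk on "level": we start at
	 * LEAFLEVEL moving UP; each successful split either finishes the
	 * job (at the leaf) or switches the direction to DOWN, and each
	 * DB_NEEDSPLIT failure switches it back to UP to retry one level
	 * closer to the root.
	 */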
	t = dbp->internal;
	for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
		/*
		 * Acquire a page and its parent, locked.
		 */
		if ((ret = (dbp->type == DB_BTREE ?
		    __bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
		    __bam_rsearch(dbc,
		    (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
			return (ret);

		/*
		 * Split the page if it still needs it (it's possible another
		 * thread of control has already split the page).  If we are
		 * guaranteed that two items will fit on the page, the split
		 * is no longer necessary.
		 */
		if (t->bt_ovflsize * 2 <=
		    (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
			__bam_stkrel(dbc, 1);
			return (0);
		}
		ret = cp->csp[0].page->pgno == PGNO_ROOT ?
		    __bam_root(dbc, &cp->csp[0]) :
		    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
		BT_STK_CLR(cp);

		switch (ret) {
		case 0:
			/* Once we've split the leaf page, we're done. */
			if (level == LEAFLEVEL)
				return (0);

			/* Switch directions. */
			if (dir == UP)
				dir = DOWN;
			break;
		case DB_NEEDSPLIT:
			/*
			 * It's possible to fail to split repeatedly, as other
			 * threads may be modifying the tree, or the page usage
			 * is sufficiently bad that we don't get enough space
			 * the first time.
			 */
			if (dir == DOWN)
				dir = UP;
			break;
		default:
			return (ret);
		}
	}
	/* NOTREACHED */
}

/*
 * __bam_root --
 *	Split the root page of a btree.
 */
static int
__bam_root(dbc, cp)
	DBC *dbc;
	EPG *cp;
{
	DB *dbp;
	PAGE *lp, *rp;
	db_indx_t split;
	int ret;

	dbp = dbc->dbp;

	/* Yeah, right. */
	if (cp->page->level >= MAXBTREELEVEL) {
		ret = ENOSPC;
		goto err;
	}

	/* Create new left and right pages for the split. */
	lp = rp = NULL;
	if ((ret = __bam_new(dbc, TYPE(cp->page), &lp)) != 0 ||
	    (ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, lp->pgno,
	    PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
	    cp->page->level, TYPE(cp->page));
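	/*
	 * Note the sibling links set up above: only leaf pages are chained
	 * together through their prev/next fields, so if the page being
	 * split is an internal page both links are left PGNO_INVALID.
	 */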
	/* Split the page. */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbc)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
		    &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
		    PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn,
		    &__a)) != 0)
			goto err;
		LSN(lp) = LSN(rp) = LSN(cp->page);
	}

	/* Clean up the new root page. */
	if ((ret = (dbp->type == DB_RECNO ?
	    __ram_root(dbc, cp->page, lp, rp) :
	    __bam_broot(dbc, cp->page, lp, rp))) != 0)
		goto err;

	/* Adjust any cursors.  Do it last so we don't have to undo it. */
	__bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1);

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, cp->lock);
	(void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);

	return (0);

err:	if (lp != NULL)
		(void)__bam_free(dbc, lp);
	if (rp != NULL)
		(void)__bam_free(dbc, rp);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	(void)__BT_TLPUT(dbc, cp->lock);
	return (ret);
}

/*
 * __bam_page --
 *	Split the non-root page of a btree.
 */
static int
__bam_page(dbc, pp, cp)
	DBC *dbc;
	EPG *pp, *cp;
{
	DB *dbp;
	DB_LOCK tplock;
	PAGE *lp, *rp, *tp;
	db_indx_t split;
	int ret;

	dbp = dbc->dbp;
	lp = rp = tp = NULL;
	ret = -1;

	/* Create new right page for the split. */
	if ((ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->next_pgno,
	    cp->page->level, TYPE(cp->page));

	/* Create new left page for the split. */
	if ((ret = __os_malloc(dbp->pgsize, NULL, &lp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->prev_pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	ZERO_LSN(lp->lsn);

	/*
	 * Split right.
	 *
	 * Only the indices are sorted on the page, i.e., the key/data pairs
	 * aren't, so it's simpler to copy the data from the split page onto
	 * two new pages instead of copying half the data to the right page
	 * and compacting the left page in place.  Since the left page can't
	 * change, we swap the original and the allocated left page after the
	 * split.
	 */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/*
	 * Fix up the previous pointer of any leaf page following the split
	 * page.
	 *
	 * !!!
	 * There are interesting deadlock situations here as we write-lock a
	 * page that's not in our direct ancestry.  Consider a cursor walking
	 * through the leaf pages, that has the previous page read-locked and
	 * is waiting on a lock for the page we just split.  It will deadlock
	 * here.  If this is a problem, we can fail in the split; it's not a
	 * problem as the split will succeed after the cursor passes through
	 * the page we're splitting.
	 */
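	/*
	 * As written, we block on the write lock below; failing the split
	 * instead would presumably mean requesting the lock no-wait and
	 * returning DB_NEEDSPLIT if it couldn't be granted immediately.
	 */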
	if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) {
		if ((ret = __bam_lget(dbc,
		    0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0)
			goto err;
		if ((ret = memp_fget(dbp->mpf, &rp->next_pgno, 0, &tp)) != 0)
			goto err;
	}

	/* Insert the new pages into the parent page. */
	if ((ret = __bam_pinsert(dbc, pp, lp, rp)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbc)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		if (tp == NULL)
			ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
		    &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page),
		    &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp),
		    tp == NULL ? &__lsn : &LSN(tp), &__a)) != 0)
			goto err;

		LSN(lp) = LSN(rp) = LSN(cp->page);
		if (tp != NULL)
			LSN(tp) = LSN(cp->page);
	}

	/* Copy the allocated page into place. */
	memcpy(cp->page, lp, LOFFSET(lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
	__os_free(lp, dbp->pgsize);
	lp = NULL;

	/* Finish the next-page link. */
	if (tp != NULL)
		tp->prev_pgno = rp->pgno;

	/* Adjust any cursors.  Do so last so we don't have to undo it. */
	__bam_ca_split(dbp, cp->page->pgno, cp->page->pgno, rp->pgno, split, 0);

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, cp->lock);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY);
		(void)__BT_TLPUT(dbc, tplock);
	}
	return (0);

err:	if (lp != NULL)
		__os_free(lp, dbp->pgsize);
	if (rp != NULL)
		(void)__bam_free(dbc, rp);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, 0);
		if (ret == DB_NEEDSPLIT)
			(void)__BT_LPUT(dbc, tplock);
		else
			(void)__BT_TLPUT(dbc, tplock);
	}
	(void)memp_fput(dbp->mpf, pp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__BT_LPUT(dbc, pp->lock);
	else
		(void)__BT_TLPUT(dbc, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__BT_LPUT(dbc, cp->lock);
	else
		(void)__BT_TLPUT(dbc, cp->lock);
	return (ret);
}

/*
 * __bam_broot --
 *	Fix up the btree root page after it has been split.
 */
static int
__bam_broot(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk;
	DB *dbp;
	DBT hdr, data;
	int ret;

	dbp = dbc->dbp;

	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
	 */
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

	memset(&data, 0, sizeof(data));
	memset(&hdr, 0, sizeof(hdr));

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * level of the tree is never used, so it doesn't need to be filled in.
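	 * That's why the left child's entry below gets a zero-length key:
	 * BINTERNAL_SIZE(0) is all the space it needs.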
	 */
	memset(&bi, 0, sizeof(bi));
	bi.len = 0;
	B_TSET(bi.type, B_KEYDATA, 0);
	bi.pgno = lp->pgno;
	if (F_ISSET(dbp, DB_BT_RECNUM)) {
		bi.nrecs = __bam_total(lp);
		RE_NREC_SET(rootp, bi.nrecs);
	}
	hdr.data = &bi;
	hdr.size = SSZA(BINTERNAL, data);
	if ((ret =
	    __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
		return (ret);

	switch (TYPE(rp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(rp, 0);

		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rp->pgno;
		if (F_ISSET(dbp, DB_BT_RECNUM)) {
			bi.nrecs = __bam_total(rp);
			RE_NREC_ADJ(rootp, bi.nrecs);
		}
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, rootp, 1,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(rp, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk->data;
			data.size = child_bk->len;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = __db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rp->pgno));
		}
		break;
	default:
		return (__db_pgfmt(dbp, rp->pgno));
	}
	return (0);
}

/*
 * __ram_root --
 *	Fix up the recno root page after it has been split.
 */
static int
__ram_root(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	DB *dbp;
	DBT hdr;
	RINTERNAL ri;
	int ret;

	dbp = dbc->dbp;

	/* Initialize the page. */
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);

	/* Initialize the header. */
	memset(&hdr, 0, sizeof(hdr));
	hdr.data = &ri;
	hdr.size = RINTERNAL_SIZE;
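	/*
	 * RINTERNAL entries are fixed size, just a child page number and a
	 * record count, so the same header DBT is reused for both inserts
	 * below; only ri's fields change between them.
	 */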
	/* Insert the left and right keys, set the header information. */
	ri.pgno = lp->pgno;
	ri.nrecs = __bam_total(lp);
	if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_SET(rootp, ri.nrecs);
	ri.pgno = rp->pgno;
	ri.nrecs = __bam_total(rp);
	if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_ADJ(rootp, ri.nrecs);
	return (0);
}

/*
 * __bam_pinsert --
 *	Insert a new key into a parent page, completing the split.
 */
static int
__bam_pinsert(dbc, parent, lchild, rchild)
	DBC *dbc;
	EPG *parent;
	PAGE *lchild, *rchild;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BTREE *t;
	DB *dbp;
	DBT a, b, hdr, data;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	u_int32_t n, nbytes, nksize;
	int ret;

	dbp = dbc->dbp;
	t = dbp->internal;
	ppage = parent->page;

	/* If handling record numbers, count records split to the right page. */
	nrecs = dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM) ?
	    __bam_total(rchild) : 0;

	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because we split
	 * to the right.
	 *
	 * XXX
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
	off = parent->indx + O_INDX;

	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
	switch (TYPE(rchild)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(rchild, 0);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		memset(&bi, 0, sizeof(bi));
		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rchild->pgno;
		bi.nrecs = nrecs;
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		memset(&data, 0, sizeof(data));
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, ppage, off,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);
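		/*
		 * After the copy, the new parent entry and the child's own
		 * first entry both point at the same overflow pages, so the
		 * chain needs the extra reference taken below.
		 */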
		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LBTREE:
		child_bk = GET_BKEYDATA(rchild, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			nbytes = BINTERNAL_PSIZE(child_bk->len);
			nksize = child_bk->len;
			if (t->bt_prefix == NULL)
				goto noprefix;
			if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
				goto noprefix;
			tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) - P_INDX);
			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
				goto noprefix;
			memset(&a, 0, sizeof(a));
			a.size = tmp_bk->len;
			a.data = tmp_bk->data;
			memset(&b, 0, sizeof(b));
			b.size = child_bk->len;
			b.data = child_bk->data;
			nksize = t->bt_prefix(&a, &b);
			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
				nbytes = n;
			else
noprefix:			nksize = child_bk->len;

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			memset(&bi, 0, sizeof(bi));
			bi.len = nksize;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk->data;
			data.size = nksize;
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			memset(&bi, 0, sizeof(bi));
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = __db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rchild->pgno));
		}
		break;
	case P_IRECNO:
	case P_LRECNO:
		nbytes = RINTERNAL_PSIZE;

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &ri;
		hdr.size = RINTERNAL_SIZE;
		ri.pgno = rchild->pgno;
		ri.nrecs = nrecs;
		if ((ret = __db_pitem(dbc,
		    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
			return (ret);
		break;
	default:
		return (__db_pgfmt(dbp, rchild->pgno));
	}
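	/*
	 * The parent's existing entry for the left page still counts the
	 * records that just moved to the right page, so the right page's
	 * total has to come off of the left page's count.
	 */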
	/* Adjust the parent page's left page record count. */
	if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
		/* Log the change. */
		if (DB_LOGGING(dbc) &&
		    (ret = __bam_cadjust_log(dbp->dbenv->lg_info,
		    dbc->txn, &LSN(ppage), 0, dbp->log_fileid,
		    PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx,
		    -(int32_t)nrecs, (int32_t)0)) != 0)
			return (ret);

		/* Update the left page count. */
		if (dbp->type == DB_RECNO)
			GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
		else
			GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
	}

	return (0);
}

/*
 * __bam_psplit --
 *	Do the real work of splitting the page.
 */
static int
__bam_psplit(dbc, cp, lp, rp, splitret)
	DBC *dbc;
	EPG *cp;
	PAGE *lp, *rp;
	db_indx_t *splitret;
{
	DB *dbp;
	PAGE *pp;
	db_indx_t half, nbytes, off, splitp, top;
	int adjust, cnt, isbigkey, ret;

	dbp = dbc->dbp;
	pp = cp->page;
	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;

	/*
	 * If we're splitting the first (last) page on a level because we're
	 * inserting (appending) a key to it, it's likely that the data is
	 * sorted.  Moving a single item to the new page is less work and can
	 * push the fill factor higher than normal.  If we're wrong it's not
	 * a big deal, we'll just do the split the right way next time.
	 */
	off = 0;
	if (NEXT_PGNO(pp) == PGNO_INVALID &&
	    ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
	    (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
		off = NUM_ENT(cp->page) - adjust;
	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
		off = adjust;

	if (off != 0)
		goto sort;

	/*
	 * Split the data to the left and right pages.  Try not to split on
	 * an overflow key.  (Overflow keys on internal pages will slow down
	 * searches.)  Refuse to split in the middle of a set of duplicates.
	 *
	 * First, find the optimum place to split.
	 *
	 * It's possible to try and split past the last record on the page if
	 * there's a very large record at the end of the page.  Make sure this
	 * doesn't happen by bounding the check at the next-to-last entry on
	 * the page.
	 *
	 * Note, we try and split half the data present on the page.  This is
	 * because another process may have already split the page and left
	 * it half empty.  We don't try and skip the split -- we don't know
	 * how much space we're going to need on the page, and we may need up
	 * to half the page for a big item, so there's no easy test to decide
	 * if we need to split or not.  Besides, if two threads are inserting
	 * data into the same place in the database, we're probably going to
	 * need more space soon anyway.
	 */
	top = NUM_ENT(pp) - adjust;
	half = (dbp->pgsize - HOFFSET(pp)) / 2;
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
			else
				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;

			++off;
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes += RINTERNAL_SIZE;
			break;
		case P_LRECNO:
			nbytes += BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
sort:	splitp = off;

	/*
	 * Splitp is either at or just past the optimum split point.  If
	 * it's a big key, try and find something close by that's not.
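	 * The search below tries up to three entries on either side of
	 * the optimum point, alternating right and left, and settles for
	 * the first one that isn't.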
	 */
	if (TYPE(pp) == P_IBTREE)
		isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
	else if (TYPE(pp) == P_LBTREE)
		isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
	else
		isbigkey = 0;
	if (isbigkey)
		for (cnt = 1; cnt <= 3; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < (db_indx_t)NUM_ENT(pp) &&
			    ((TYPE(pp) == P_IBTREE &&
			    B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (TYPE(pp) == P_IBTREE ?
			    B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
				splitp = off;
				break;
			}
		}

	/*
	 * We can't split in the middle of a set of duplicates.  We know that
	 * no duplicate set can take up more than about 25% of the page,
	 * because that's the point where we push it off onto a duplicate
	 * page set.  So, this loop can't be unbounded.
	 */
	if (F_ISSET(dbp, DB_AM_DUP) && TYPE(pp) == P_LBTREE &&
	    pp->inp[splitp] == pp->inp[splitp - adjust])
		for (cnt = 1;; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < NUM_ENT(pp) &&
			    pp->inp[splitp] != pp->inp[off]) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (pp->inp[splitp] != pp->inp[off]) {
				splitp = off + adjust;
				break;
			}
		}

	/* We're going to split at splitp. */
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);

	*splitret = splitp;
	return (0);
}

/*
 * __bam_copy --
 *	Copy a set of records from one page to another.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
	DB *dbp;
	PAGE *pp, *cp;
	u_int32_t nxt, stop;
{
	db_indx_t nbytes, off;

	/*
	 * Copy the rest of the data to the right page.  Nxt is the next
	 * offset placed on the target page.
	 */
	for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
			else
				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/*
			 * If we're on a key and it's a duplicate, just copy
			 * the offset.
			 */
			if (off != 0 && (nxt % P_INDX) == 0 &&
			    pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
				cp->inp[off] = cp->inp[off - P_INDX];
				continue;
			}
			/* FALLTHROUGH */
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
			else
				nbytes = BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes = RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
		cp->inp[off] = HOFFSET(cp) -= nbytes;
		memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
	}
	return (0);
}