1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 1996, 1997, 1998 5 * Sleepycat Software. All rights reserved. 6 */ 7 8 #include "config.h" 9 10 #ifndef lint 11 static const char sccsid[] = "@(#)db_dup.c 10.35 (Sleepycat) 12/2/98"; 12 #endif /* not lint */ 13 14 #ifndef NO_SYSTEM_INCLUDES 15 #include <sys/types.h> 16 17 #include <errno.h> 18 #include <string.h> 19 #endif 20 21 #include "db_int.h" 22 #include "db_page.h" 23 #include "btree.h" 24 #include "db_am.h" 25 26 static int __db_addpage __P((DBC *, 27 PAGE **, db_indx_t *, int (*)(DBC *, u_int32_t, PAGE **))); 28 static int __db_dsplit __P((DBC *, 29 PAGE **, db_indx_t *, u_int32_t, int (*)(DBC *, u_int32_t, PAGE **))); 30 31 /* 32 * __db_dput -- 33 * Put a duplicate item onto a duplicate page at the given index. 34 * 35 * PUBLIC: int __db_dput __P((DBC *, DBT *, 36 * PUBLIC: PAGE **, db_indx_t *, int (*)(DBC *, u_int32_t, PAGE **))); 37 */ 38 int 39 __db_dput(dbc, dbt, pp, indxp, newfunc) 40 DBC *dbc; 41 DBT *dbt; 42 PAGE **pp; 43 db_indx_t *indxp; 44 int (*newfunc) __P((DBC *, u_int32_t, PAGE **)); 45 { 46 BOVERFLOW bo; 47 DBT *data_dbtp, hdr_dbt, *hdr_dbtp; 48 PAGE *pagep; 49 db_indx_t size, isize; 50 db_pgno_t pgno; 51 int ret; 52 53 /* 54 * We need some access method independent threshold for when we put 55 * a duplicate item onto an overflow page. 56 */ 57 if (dbt->size > 0.25 * dbc->dbp->pgsize) { 58 if ((ret = __db_poff(dbc, dbt, &pgno, newfunc)) != 0) 59 return (ret); 60 UMRW(bo.unused1); 61 B_TSET(bo.type, B_OVERFLOW, 0); 62 UMRW(bo.unused2); 63 bo.tlen = dbt->size; 64 bo.pgno = pgno; 65 hdr_dbt.data = &bo; 66 hdr_dbt.size = isize = BOVERFLOW_SIZE; 67 hdr_dbtp = &hdr_dbt; 68 size = BOVERFLOW_PSIZE; 69 data_dbtp = NULL; 70 } else { 71 size = BKEYDATA_PSIZE(dbt->size); 72 isize = BKEYDATA_SIZE(dbt->size); 73 hdr_dbtp = NULL; 74 data_dbtp = dbt; 75 } 76 77 pagep = *pp; 78 if (size > P_FREESPACE(pagep)) { 79 if (*indxp == NUM_ENT(*pp) && NEXT_PGNO(*pp) == PGNO_INVALID) 80 ret = __db_addpage(dbc, pp, indxp, newfunc); 81 else 82 ret = __db_dsplit(dbc, pp, indxp, isize, newfunc); 83 if (ret != 0) 84 /* 85 * XXX 86 * Pages not returned to free list. 87 */ 88 return (ret); 89 pagep = *pp; 90 } 91 92 /* 93 * Now, pagep references the page on which to insert and indx is the 94 * the location to insert. 95 */ 96 if ((ret = __db_pitem(dbc, 97 pagep, (u_int32_t)*indxp, isize, hdr_dbtp, data_dbtp)) != 0) 98 return (ret); 99 100 (void)memp_fset(dbc->dbp->mpf, pagep, DB_MPOOL_DIRTY); 101 return (0); 102 } 103 104 /* 105 * __db_drem -- 106 * Remove a duplicate at the given index on the given page. 107 * 108 * PUBLIC: int __db_drem __P((DBC *, 109 * PUBLIC: PAGE **, u_int32_t, int (*)(DBC *, PAGE *))); 110 */ 111 int 112 __db_drem(dbc, pp, indx, freefunc) 113 DBC *dbc; 114 PAGE **pp; 115 u_int32_t indx; 116 int (*freefunc) __P((DBC *, PAGE *)); 117 { 118 PAGE *pagep; 119 int ret; 120 121 pagep = *pp; 122 123 /* Check if we are freeing a big item. */ 124 if (B_TYPE(GET_BKEYDATA(pagep, indx)->type) == B_OVERFLOW) { 125 if ((ret = __db_doff(dbc, 126 GET_BOVERFLOW(pagep, indx)->pgno, freefunc)) != 0) 127 return (ret); 128 ret = __db_ditem(dbc, pagep, indx, BOVERFLOW_SIZE); 129 } else 130 ret = __db_ditem(dbc, pagep, indx, 131 BKEYDATA_SIZE(GET_BKEYDATA(pagep, indx)->len)); 132 if (ret != 0) 133 return (ret); 134 135 if (NUM_ENT(pagep) == 0) { 136 /* 137 * If the page is emptied, then the page is freed and the pp 138 * parameter is set to reference the next, locked page in the 139 * duplicate chain, if one exists. If there was no such page, 140 * then it is set to NULL. 141 * 142 * !!! 143 * __db_relink will set the dirty bit for us. 144 */ 145 if ((ret = __db_relink(dbc, DB_REM_PAGE, pagep, pp, 0)) != 0) 146 return (ret); 147 if ((ret = freefunc(dbc, pagep)) != 0) 148 return (ret); 149 } else 150 (void)memp_fset(dbc->dbp->mpf, pagep, DB_MPOOL_DIRTY); 151 152 return (0); 153 } 154 155 /* 156 * __db_dend -- 157 * Find the last page in a set of offpage duplicates. 158 * 159 * PUBLIC: int __db_dend __P((DBC *, db_pgno_t, PAGE **)); 160 */ 161 int 162 __db_dend(dbc, pgno, pp) 163 DBC *dbc; 164 db_pgno_t pgno; 165 PAGE **pp; 166 { 167 DB *dbp; 168 PAGE *h; 169 int ret; 170 171 dbp = dbc->dbp; 172 173 /* 174 * This implements DB_KEYLAST. The last page is returned in pp; pgno 175 * should be the page number of the first page of the duplicate chain. 176 * 177 * *pp may be non-NULL -- if given a valid page use it. 178 */ 179 if (*pp != NULL) 180 goto started; 181 for (;;) { 182 if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) { 183 (void)__db_pgerr(dbp, pgno); 184 return (ret); 185 } 186 started: h = *pp; 187 188 if ((pgno = NEXT_PGNO(h)) == PGNO_INVALID) 189 break; 190 191 if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) 192 return (ret); 193 } 194 return (0); 195 } 196 197 /* 198 * __db_dsplit -- 199 * Split a page of duplicates, calculating the split point based 200 * on an element of size "size" being added at "*indxp". 201 * On entry hp contains a pointer to the page-pointer of the original 202 * page. On exit, it returns a pointer to the page containing "*indxp" 203 * and "indxp" has been modified to reflect the index on the new page 204 * where the element should be added. The function returns with 205 * the page on which the insert should happen, not yet put. 206 */ 207 static int 208 __db_dsplit(dbc, hp, indxp, size, newfunc) 209 DBC *dbc; 210 PAGE **hp; 211 db_indx_t *indxp; 212 u_int32_t size; 213 int (*newfunc) __P((DBC *, u_int32_t, PAGE **)); 214 { 215 PAGE *h, *np, *tp; 216 BKEYDATA *bk; 217 DBT page_dbt; 218 DB *dbp; 219 size_t pgsize; 220 db_indx_t halfbytes, i, indx, lastsum, nindex, oindex, s, sum; 221 int did_indx, ret, t_ret; 222 223 h = *hp; 224 indx = *indxp; 225 ret = 0; 226 dbp = dbc->dbp; 227 pgsize = dbp->pgsize; 228 229 /* Create a temporary page to do compaction onto. */ 230 if ((ret = __os_malloc(pgsize, NULL, &tp)) != 0) 231 return (ret); 232 233 /* Create new page for the split. */ 234 if ((ret = newfunc(dbc, P_DUPLICATE, &np)) != 0) { 235 __os_free(tp, pgsize); 236 return (ret); 237 } 238 239 P_INIT(np, pgsize, PGNO(np), PGNO(h), NEXT_PGNO(h), 0, 240 P_DUPLICATE); 241 P_INIT(tp, pgsize, PGNO(h), PREV_PGNO(h), PGNO(np), 0, 242 P_DUPLICATE); 243 244 /* Figure out the split point */ 245 halfbytes = (pgsize - HOFFSET(h)) / 2; 246 did_indx = 0; 247 for (sum = 0, lastsum = 0, i = 0; i < NUM_ENT(h); i++) { 248 if (i == indx) { 249 sum += size; 250 did_indx = 1; 251 if (lastsum < halfbytes && sum >= halfbytes) { 252 /* We've crossed the halfway point. */ 253 if ((db_indx_t)(halfbytes - lastsum) < 254 (db_indx_t)(sum - halfbytes)) { 255 *hp = np; 256 *indxp = 0; 257 } else 258 *indxp = i; 259 break; 260 } 261 *indxp = i; 262 lastsum = sum; 263 } 264 if (B_TYPE(GET_BKEYDATA(h, i)->type) == B_KEYDATA) 265 sum += BKEYDATA_SIZE(GET_BKEYDATA(h, i)->len); 266 else 267 sum += BOVERFLOW_SIZE; 268 269 if (lastsum < halfbytes && sum >= halfbytes) { 270 /* We've crossed the halfway point. */ 271 if ((db_indx_t)(sum - halfbytes) < 272 (db_indx_t)(halfbytes - lastsum)) 273 i++; 274 break; 275 } 276 } 277 /* 278 * Check if we have set the return values of the index pointer and 279 * page pointer. 280 */ 281 if (!did_indx) { 282 *hp = np; 283 *indxp = indx - i; 284 } 285 286 if (DB_LOGGING(dbc)) { 287 page_dbt.size = dbp->pgsize; 288 page_dbt.data = h; 289 if ((ret = __db_split_log(dbp->dbenv->lg_info, 290 dbc->txn, &LSN(h), 0, DB_SPLITOLD, dbp->log_fileid, 291 PGNO(h), &page_dbt, &LSN(h))) != 0) { 292 __os_free(tp, pgsize); 293 return (ret); 294 } 295 LSN(tp) = LSN(h); 296 } 297 298 /* 299 * If it's a btree, adjust the cursors. 300 * 301 * i is the index of the first element to move onto the new page. 302 */ 303 if (dbp->type == DB_BTREE) 304 __bam_ca_split(dbp, PGNO(h), PGNO(h), PGNO(np), i, 0); 305 306 for (nindex = 0, oindex = i; oindex < NUM_ENT(h); oindex++) { 307 bk = GET_BKEYDATA(h, oindex); 308 if (B_TYPE(bk->type) == B_KEYDATA) 309 s = BKEYDATA_SIZE(bk->len); 310 else 311 s = BOVERFLOW_SIZE; 312 313 np->inp[nindex++] = HOFFSET(np) -= s; 314 memcpy((u_int8_t *)np + HOFFSET(np), bk, s); 315 NUM_ENT(np)++; 316 } 317 318 /* 319 * Now do data compaction by copying the remaining stuff onto the 320 * temporary page and then copying it back to the real page. 321 */ 322 for (nindex = 0, oindex = 0; oindex < i; oindex++) { 323 bk = GET_BKEYDATA(h, oindex); 324 if (B_TYPE(bk->type) == B_KEYDATA) 325 s = BKEYDATA_SIZE(bk->len); 326 else 327 s = BOVERFLOW_SIZE; 328 329 tp->inp[nindex++] = HOFFSET(tp) -= s; 330 memcpy((u_int8_t *)tp + HOFFSET(tp), bk, s); 331 NUM_ENT(tp)++; 332 } 333 334 /* 335 * This page (the temporary) should be only half full, so we do two 336 * memcpy's, one for the top of the page and one for the bottom of 337 * the page. This way we avoid copying the middle which should be 338 * about half a page. 339 */ 340 memcpy(h, tp, LOFFSET(tp)); 341 memcpy((u_int8_t *)h + HOFFSET(tp), 342 (u_int8_t *)tp + HOFFSET(tp), pgsize - HOFFSET(tp)); 343 __os_free(tp, pgsize); 344 345 if (DB_LOGGING(dbc)) { 346 /* 347 * XXX 348 * If either of these fails, are we leaving pages pinned? 349 * Yes, but it seems like this happens in error case. 350 */ 351 page_dbt.size = pgsize; 352 page_dbt.data = h; 353 if ((ret = __db_split_log(dbp->dbenv->lg_info, 354 dbc->txn, &LSN(h), 0, DB_SPLITNEW, dbp->log_fileid, 355 PGNO(h), &page_dbt, &LSN(h))) != 0) 356 return (ret); 357 358 page_dbt.size = pgsize; 359 page_dbt.data = np; 360 if ((ret = __db_split_log(dbp->dbenv->lg_info, 361 dbc->txn, &LSN(np), 0, DB_SPLITNEW, dbp->log_fileid, 362 PGNO(np), &page_dbt, &LSN(np))) != 0) 363 return (ret); 364 } 365 366 /* 367 * Finally, if there was a next page after the page being 368 * split, fix its prev pointer. 369 */ 370 if (np->next_pgno != PGNO_INVALID) 371 ret = __db_relink(dbc, DB_ADD_PAGE, np, NULL, 1); 372 373 /* 374 * Figure out if the location we're interested in is on the new 375 * page, and if so, reset the callers' pointer. Push the other 376 * page back to the store. 377 */ 378 if (*hp == h) 379 t_ret = memp_fput(dbp->mpf, np, DB_MPOOL_DIRTY); 380 else 381 t_ret = memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY); 382 383 return (ret != 0 ? ret : t_ret); 384 } 385 386 /* 387 * __db_ditem -- 388 * Remove an item from a page. 389 * 390 * PUBLIC: int __db_ditem __P((DBC *, PAGE *, u_int32_t, u_int32_t)); 391 */ 392 int 393 __db_ditem(dbc, pagep, indx, nbytes) 394 DBC *dbc; 395 PAGE *pagep; 396 u_int32_t indx, nbytes; 397 { 398 DB *dbp; 399 DBT ldbt; 400 db_indx_t cnt, offset; 401 int ret; 402 u_int8_t *from; 403 404 dbp = dbc->dbp; 405 if (DB_LOGGING(dbc)) { 406 ldbt.data = P_ENTRY(pagep, indx); 407 ldbt.size = nbytes; 408 if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbc->txn, 409 &LSN(pagep), 0, DB_REM_DUP, dbp->log_fileid, PGNO(pagep), 410 (u_int32_t)indx, nbytes, &ldbt, NULL, &LSN(pagep))) != 0) 411 return (ret); 412 } 413 414 /* 415 * If there's only a single item on the page, we don't have to 416 * work hard. 417 */ 418 if (NUM_ENT(pagep) == 1) { 419 NUM_ENT(pagep) = 0; 420 HOFFSET(pagep) = dbp->pgsize; 421 return (0); 422 } 423 424 /* 425 * Pack the remaining key/data items at the end of the page. Use 426 * memmove(3), the regions may overlap. 427 */ 428 from = (u_int8_t *)pagep + HOFFSET(pagep); 429 memmove(from + nbytes, from, pagep->inp[indx] - HOFFSET(pagep)); 430 HOFFSET(pagep) += nbytes; 431 432 /* Adjust the indices' offsets. */ 433 offset = pagep->inp[indx]; 434 for (cnt = 0; cnt < NUM_ENT(pagep); ++cnt) 435 if (pagep->inp[cnt] < offset) 436 pagep->inp[cnt] += nbytes; 437 438 /* Shift the indices down. */ 439 --NUM_ENT(pagep); 440 if (indx != NUM_ENT(pagep)) 441 memmove(&pagep->inp[indx], &pagep->inp[indx + 1], 442 sizeof(db_indx_t) * (NUM_ENT(pagep) - indx)); 443 444 /* If it's a btree, adjust the cursors. */ 445 if (dbp->type == DB_BTREE) 446 __bam_ca_di(dbp, PGNO(pagep), indx, -1); 447 448 return (0); 449 } 450 451 /* 452 * __db_pitem -- 453 * Put an item on a page. 454 * 455 * PUBLIC: int __db_pitem 456 * PUBLIC: __P((DBC *, PAGE *, u_int32_t, u_int32_t, DBT *, DBT *)); 457 */ 458 int 459 __db_pitem(dbc, pagep, indx, nbytes, hdr, data) 460 DBC *dbc; 461 PAGE *pagep; 462 u_int32_t indx; 463 u_int32_t nbytes; 464 DBT *hdr, *data; 465 { 466 DB *dbp; 467 BKEYDATA bk; 468 DBT thdr; 469 int ret; 470 u_int8_t *p; 471 472 /* 473 * Put a single item onto a page. The logic figuring out where to 474 * insert and whether it fits is handled in the caller. All we do 475 * here is manage the page shuffling. We cheat a little bit in that 476 * we don't want to copy the dbt on a normal put twice. If hdr is 477 * NULL, we create a BKEYDATA structure on the page, otherwise, just 478 * copy the caller's information onto the page. 479 * 480 * This routine is also used to put entries onto the page where the 481 * entry is pre-built, e.g., during recovery. In this case, the hdr 482 * will point to the entry, and the data argument will be NULL. 483 * 484 * !!! 485 * There's a tremendous potential for off-by-one errors here, since 486 * the passed in header sizes must be adjusted for the structure's 487 * placeholder for the trailing variable-length data field. 488 */ 489 dbp = dbc->dbp; 490 if (DB_LOGGING(dbc)) 491 if ((ret = __db_addrem_log(dbp->dbenv->lg_info, dbc->txn, 492 &LSN(pagep), 0, DB_ADD_DUP, dbp->log_fileid, PGNO(pagep), 493 (u_int32_t)indx, nbytes, hdr, data, &LSN(pagep))) != 0) 494 return (ret); 495 496 if (hdr == NULL) { 497 B_TSET(bk.type, B_KEYDATA, 0); 498 bk.len = data == NULL ? 0 : data->size; 499 500 thdr.data = &bk; 501 thdr.size = SSZA(BKEYDATA, data); 502 hdr = &thdr; 503 } 504 505 /* Adjust the index table, then put the item on the page. */ 506 if (indx != NUM_ENT(pagep)) 507 memmove(&pagep->inp[indx + 1], &pagep->inp[indx], 508 sizeof(db_indx_t) * (NUM_ENT(pagep) - indx)); 509 HOFFSET(pagep) -= nbytes; 510 pagep->inp[indx] = HOFFSET(pagep); 511 ++NUM_ENT(pagep); 512 513 p = P_ENTRY(pagep, indx); 514 memcpy(p, hdr->data, hdr->size); 515 if (data != NULL) 516 memcpy(p + hdr->size, data->data, data->size); 517 518 /* If it's a btree, adjust the cursors. */ 519 if (dbp->type == DB_BTREE) 520 __bam_ca_di(dbp, PGNO(pagep), indx, 1); 521 522 return (0); 523 } 524 525 /* 526 * __db_relink -- 527 * Relink around a deleted page. 528 * 529 * PUBLIC: int __db_relink __P((DBC *, u_int32_t, PAGE *, PAGE **, int)); 530 */ 531 int 532 __db_relink(dbc, add_rem, pagep, new_next, needlock) 533 DBC *dbc; 534 u_int32_t add_rem; 535 PAGE *pagep, **new_next; 536 int needlock; 537 { 538 DB *dbp; 539 PAGE *np, *pp; 540 DB_LOCK npl, ppl; 541 DB_LSN *nlsnp, *plsnp; 542 int ret; 543 544 ret = 0; 545 np = pp = NULL; 546 npl = ppl = LOCK_INVALID; 547 nlsnp = plsnp = NULL; 548 dbp = dbc->dbp; 549 550 /* 551 * Retrieve and lock the one/two pages. For a remove, we may need 552 * two pages (the before and after). For an add, we only need one 553 * because, the split took care of the prev. 554 */ 555 if (pagep->next_pgno != PGNO_INVALID) { 556 if (needlock && (ret = __bam_lget(dbc, 557 0, pagep->next_pgno, DB_LOCK_WRITE, &npl)) != 0) 558 goto err; 559 if ((ret = memp_fget(dbp->mpf, 560 &pagep->next_pgno, 0, &np)) != 0) { 561 (void)__db_pgerr(dbp, pagep->next_pgno); 562 goto err; 563 } 564 nlsnp = &np->lsn; 565 } 566 if (add_rem == DB_REM_PAGE && pagep->prev_pgno != PGNO_INVALID) { 567 if (needlock && (ret = __bam_lget(dbc, 568 0, pagep->prev_pgno, DB_LOCK_WRITE, &ppl)) != 0) 569 goto err; 570 if ((ret = memp_fget(dbp->mpf, 571 &pagep->prev_pgno, 0, &pp)) != 0) { 572 (void)__db_pgerr(dbp, pagep->next_pgno); 573 goto err; 574 } 575 plsnp = &pp->lsn; 576 } 577 578 /* Log the change. */ 579 if (DB_LOGGING(dbc)) { 580 if ((ret = __db_relink_log(dbp->dbenv->lg_info, dbc->txn, 581 &pagep->lsn, 0, add_rem, dbp->log_fileid, 582 pagep->pgno, &pagep->lsn, 583 pagep->prev_pgno, plsnp, pagep->next_pgno, nlsnp)) != 0) 584 goto err; 585 if (np != NULL) 586 np->lsn = pagep->lsn; 587 if (pp != NULL) 588 pp->lsn = pagep->lsn; 589 } 590 591 /* 592 * Modify and release the two pages. 593 * 594 * !!! 595 * The parameter new_next gets set to the page following the page we 596 * are removing. If there is no following page, then new_next gets 597 * set to NULL. 598 */ 599 if (np != NULL) { 600 if (add_rem == DB_ADD_PAGE) 601 np->prev_pgno = pagep->pgno; 602 else 603 np->prev_pgno = pagep->prev_pgno; 604 if (new_next == NULL) 605 ret = memp_fput(dbp->mpf, np, DB_MPOOL_DIRTY); 606 else { 607 *new_next = np; 608 ret = memp_fset(dbp->mpf, np, DB_MPOOL_DIRTY); 609 } 610 if (ret != 0) 611 goto err; 612 if (needlock) 613 (void)__bam_lput(dbc, npl); 614 } else if (new_next != NULL) 615 *new_next = NULL; 616 617 if (pp != NULL) { 618 pp->next_pgno = pagep->next_pgno; 619 if ((ret = memp_fput(dbp->mpf, pp, DB_MPOOL_DIRTY)) != 0) 620 goto err; 621 if (needlock) 622 (void)__bam_lput(dbc, ppl); 623 } 624 return (0); 625 626 err: if (np != NULL) 627 (void)memp_fput(dbp->mpf, np, 0); 628 if (needlock && npl != LOCK_INVALID) 629 (void)__bam_lput(dbc, npl); 630 if (pp != NULL) 631 (void)memp_fput(dbp->mpf, pp, 0); 632 if (needlock && ppl != LOCK_INVALID) 633 (void)__bam_lput(dbc, ppl); 634 return (ret); 635 } 636 637 /* 638 * __db_ddup -- 639 * Delete an offpage chain of duplicates. 640 * 641 * PUBLIC: int __db_ddup __P((DBC *, db_pgno_t, int (*)(DBC *, PAGE *))); 642 */ 643 int 644 __db_ddup(dbc, pgno, freefunc) 645 DBC *dbc; 646 db_pgno_t pgno; 647 int (*freefunc) __P((DBC *, PAGE *)); 648 { 649 DB *dbp; 650 PAGE *pagep; 651 DBT tmp_dbt; 652 int ret; 653 654 dbp = dbc->dbp; 655 do { 656 if ((ret = memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0) { 657 (void)__db_pgerr(dbp, pgno); 658 return (ret); 659 } 660 661 if (DB_LOGGING(dbc)) { 662 tmp_dbt.data = pagep; 663 tmp_dbt.size = dbp->pgsize; 664 if ((ret = __db_split_log(dbp->dbenv->lg_info, 665 dbc->txn, &LSN(pagep), 0, DB_SPLITOLD, 666 dbp->log_fileid, PGNO(pagep), &tmp_dbt, 667 &LSN(pagep))) != 0) 668 return (ret); 669 } 670 pgno = pagep->next_pgno; 671 if ((ret = freefunc(dbc, pagep)) != 0) 672 return (ret); 673 } while (pgno != PGNO_INVALID); 674 675 return (0); 676 } 677 678 /* 679 * __db_addpage -- 680 * Create a new page and link it onto the next_pgno field of the 681 * current page. 682 */ 683 static int 684 __db_addpage(dbc, hp, indxp, newfunc) 685 DBC *dbc; 686 PAGE **hp; 687 db_indx_t *indxp; 688 int (*newfunc) __P((DBC *, u_int32_t, PAGE **)); 689 { 690 DB *dbp; 691 PAGE *newpage; 692 int ret; 693 694 dbp = dbc->dbp; 695 if ((ret = newfunc(dbc, P_DUPLICATE, &newpage)) != 0) 696 return (ret); 697 698 if (DB_LOGGING(dbc)) { 699 if ((ret = __db_addpage_log(dbp->dbenv->lg_info, 700 dbc->txn, &LSN(*hp), 0, dbp->log_fileid, 701 PGNO(*hp), &LSN(*hp), PGNO(newpage), &LSN(newpage))) != 0) { 702 return (ret); 703 } 704 LSN(newpage) = LSN(*hp); 705 } 706 707 PREV_PGNO(newpage) = PGNO(*hp); 708 NEXT_PGNO(*hp) = PGNO(newpage); 709 710 if ((ret = memp_fput(dbp->mpf, *hp, DB_MPOOL_DIRTY)) != 0) 711 return (ret); 712 *hp = newpage; 713 *indxp = 0; 714 return (0); 715 } 716 717 /* 718 * __db_dsearch -- 719 * Search a set of duplicates for the proper position for a new duplicate. 720 * 721 * + pgno is the page number of the page on which to begin searching. 722 * Since we can continue duplicate searches, it might not be the first 723 * page. 724 * 725 * + If we are continuing a search, then *pp may be non-NULL in which 726 * case we do not have to retrieve the page. 727 * 728 * + If we are continuing a search, then *indxp contains the first 729 * on pgno of where we should begin the search. 730 * 731 * NOTE: if there is no comparison function, then continuing is 732 * meaningless, and *pp should always be NULL and *indxp will be 733 * ignored. 734 * 735 * 3 return values:: 736 * 737 * + pp is the returned page pointer of where this element should go. 738 * + indxp is the returned index on that page 739 * + cmpp is the returned final comparison result. 740 * 741 * PUBLIC: int __db_dsearch __P((DBC *, 742 * PUBLIC: int, DBT *, db_pgno_t, db_indx_t *, PAGE **, int *)); 743 */ 744 int 745 __db_dsearch(dbc, is_insert, dbt, pgno, indxp, pp, cmpp) 746 DBC *dbc; 747 int is_insert, *cmpp; 748 DBT *dbt; 749 db_pgno_t pgno; 750 db_indx_t *indxp; 751 PAGE **pp; 752 { 753 DB *dbp; 754 PAGE *h; 755 db_indx_t base, indx, lim, save_indx; 756 db_pgno_t save_pgno; 757 int ret; 758 759 dbp = dbc->dbp; 760 761 if (dbp->dup_compare == NULL) { 762 /* 763 * We may have been given a valid page, but we may not be 764 * able to use it. The problem is that the application is 765 * doing a join and we're trying to continue the search, 766 * but since the items aren't sorted, we can't. Discard 767 * the page if it's not the one we're going to start with 768 * anyway. 769 */ 770 if (*pp != NULL && (*pp)->pgno != pgno) { 771 if ((ret = memp_fput(dbp->mpf, *pp, 0)) != 0) 772 return (ret); 773 *pp = NULL; 774 } 775 776 /* 777 * If no duplicate function is specified, just go to the end 778 * of the duplicate set. 779 */ 780 if (is_insert) { 781 if ((ret = __db_dend(dbc, pgno, pp)) != 0) 782 return (ret); 783 *indxp = NUM_ENT(*pp); 784 return (0); 785 } 786 787 /* 788 * We are looking for a specific duplicate, so do a linear 789 * search. 790 */ 791 if (*pp != NULL) 792 goto nocmp_started; 793 for (;;) { 794 if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) 795 goto pg_err; 796 nocmp_started: h = *pp; 797 798 for (*indxp = 0; *indxp < NUM_ENT(h); ++*indxp) { 799 if ((*cmpp = __bam_cmp(dbp, 800 dbt, h, *indxp, __bam_defcmp)) != 0) 801 continue; 802 /* 803 * The duplicate may have already been deleted, 804 * if it's a btree page, in which case we skip 805 * it. 806 */ 807 if (dbp->type == DB_BTREE && 808 B_DISSET(GET_BKEYDATA(h, *indxp)->type)) 809 continue; 810 811 return (0); 812 } 813 814 if ((pgno = h->next_pgno) == PGNO_INVALID) 815 break; 816 817 if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) 818 return (ret); 819 } 820 *cmpp = 1; /* We didn't succeed... */ 821 return (0); 822 } 823 824 /* 825 * We have a comparison routine, i.e., the duplicates are sorted. 826 * Walk through the chain of duplicates, checking the last entry 827 * on each page to decide if it's the page we want to search. 828 * 829 * *pp may be non-NULL -- if we were given a valid page (e.g., are 830 * in mid-search), then use the provided page. 831 */ 832 if (*pp != NULL) 833 goto cmp_started; 834 for (;;) { 835 if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) 836 goto pg_err; 837 cmp_started: h = *pp; 838 839 if ((pgno = h->next_pgno) == PGNO_INVALID || __bam_cmp(dbp, 840 dbt, h, h->entries - 1, dbp->dup_compare) <= 0) 841 break; 842 /* 843 * Even when continuing a search, make sure we don't skip 844 * entries on a new page 845 */ 846 *indxp = 0; 847 848 if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) 849 return (ret); 850 } 851 852 /* Next, do a binary search on the page. */ 853 base = F_ISSET(dbc, DBC_CONTINUE) ? *indxp : 0; 854 for (lim = NUM_ENT(h) - base; lim != 0; lim >>= 1) { 855 indx = base + (lim >> 1); 856 if ((*cmpp = __bam_cmp(dbp, 857 dbt, h, indx, dbp->dup_compare)) == 0) { 858 *indxp = indx; 859 860 if (dbp->type != DB_BTREE || 861 !B_DISSET(GET_BKEYDATA(h, *indxp)->type)) 862 return (0); 863 goto check_delete; 864 } 865 if (*cmpp > 0) { 866 base = indx + 1; 867 lim--; 868 } 869 } 870 871 /* 872 * Base references the smallest index larger than the supplied DBT's 873 * data item, potentially both 0 and NUM_ENT. 874 */ 875 *indxp = base; 876 return (0); 877 878 check_delete: 879 /* 880 * The duplicate may have already been deleted, if it's a btree page, 881 * in which case we wander around, hoping to find an entry that hasn't 882 * been deleted. First, wander in a forwardly direction. 883 */ 884 save_pgno = (*pp)->pgno; 885 save_indx = *indxp; 886 for (++*indxp;;) { 887 for (; *indxp < NUM_ENT(h); ++*indxp) { 888 if ((*cmpp = __bam_cmp(dbp, 889 dbt, h, *indxp, dbp->dup_compare)) != 0) 890 goto check_delete_rev; 891 892 if (!B_DISSET(GET_BKEYDATA(h, *indxp)->type)) 893 return (0); 894 } 895 if ((pgno = h->next_pgno) == PGNO_INVALID) 896 break; 897 898 if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) 899 return (ret); 900 901 if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) 902 goto pg_err; 903 h = *pp; 904 905 *indxp = 0; 906 } 907 908 check_delete_rev: 909 /* Go back to where we started, and wander in a backwardly direction. */ 910 if (h->pgno != save_pgno) { 911 if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) 912 return (ret); 913 if ((ret = memp_fget(dbp->mpf, &save_pgno, 0, pp)) != 0) 914 goto pg_err; 915 h = *pp; 916 } 917 918 for (;;) { 919 while (*indxp > 0) { 920 --*indxp; 921 if ((*cmpp = __bam_cmp(dbp, 922 dbt, h, *indxp, dbp->dup_compare)) != 0) 923 goto check_delete_fail; 924 925 if (!B_DISSET(GET_BKEYDATA(h, *indxp)->type)) 926 return (0); 927 } 928 if ((pgno = h->prev_pgno) == PGNO_INVALID) 929 break; 930 931 if ((ret = memp_fput(dbp->mpf, h, 0)) != 0) 932 return (ret); 933 934 if ((ret = memp_fget(dbp->mpf, &pgno, 0, pp)) != 0) 935 goto pg_err; 936 h = *pp; 937 938 *indxp = NUM_ENT(h); 939 } 940 941 check_delete_fail: 942 *cmpp = 1; /* We didn't succeed... */ 943 return (0); 944 945 pg_err: __db_pgerr(dbp, pgno); 946 return (ret); 947 } 948