1 /*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 1996, 1997, 1998
5 * Sleepycat Software. All rights reserved.
6 */
7 /*
8 * Copyright (c) 1990, 1993, 1994, 1995, 1996
9 * Keith Bostic. All rights reserved.
10 */
11 /*
12 * Copyright (c) 1990, 1993, 1994, 1995
13 * The Regents of the University of California. All rights reserved.
14 *
15 * This code is derived from software contributed to Berkeley by
16 * Mike Olson.
17 *
18 * Redistribution and use in source and binary forms, with or without
19 * modification, are permitted provided that the following conditions
20 * are met:
21 * 1. Redistributions of source code must retain the above copyright
22 * notice, this list of conditions and the following disclaimer.
23 * 2. Redistributions in binary form must reproduce the above copyright
24 * notice, this list of conditions and the following disclaimer in the
25 * documentation and/or other materials provided with the distribution.
26 * 3. All advertising materials mentioning features or use of this software
27 * must display the following acknowledgement:
28 * This product includes software developed by the University of
29 * California, Berkeley and its contributors.
30 * 4. Neither the name of the University nor the names of its contributors
31 * may be used to endorse or promote products derived from this software
32 * without specific prior written permission.
33 *
34 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
35 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44 * SUCH DAMAGE.
45 */
46
47 #include "config.h"
48
49 #ifndef lint
50 static const char sccsid[] = "@(#)bt_delete.c 10.43 (Sleepycat) 12/7/98";
51 #endif /* not lint */
52
53 #ifndef NO_SYSTEM_INCLUDES
54 #include <sys/types.h>
55
56 #include <string.h>
57 #endif
58
59 #include "db_int.h"
60 #include "db_page.h"
61 #include "btree.h"
62
63 /*
64 * __bam_delete --
65 * Delete the items referenced by a key.
66 *
67 * PUBLIC: int __bam_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
68 */
69 int
__bam_delete(dbp,txn,key,flags)70 __bam_delete(dbp, txn, key, flags)
71 DB *dbp;
72 DB_TXN *txn;
73 DBT *key;
74 u_int32_t flags;
75 {
76 DBC *dbc;
77 DBT data;
78 u_int32_t f_init, f_next;
79 int ret, t_ret;
80
81 DB_PANIC_CHECK(dbp);
82
83 /* Check for invalid flags. */
84 if ((ret =
85 __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
86 return (ret);
87
88 /* Allocate a cursor. */
89 if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
90 return (ret);
91
92 DEBUG_LWRITE(dbc, txn, "bam_delete", key, NULL, flags);
93
94 /*
95 * Walk a cursor through the key/data pairs, deleting as we go. Set
96 * the DB_DBT_USERMEM flag, as this might be a threaded application
97 * and the flags checking will catch us. We don't actually want the
98 * keys or data, so request a partial of length 0.
99 */
100 memset(&data, 0, sizeof(data));
101 F_SET(&data, DB_DBT_USERMEM | DB_DBT_PARTIAL);
102
103 /* If locking, set read-modify-write flag. */
104 f_init = DB_SET;
105 f_next = DB_NEXT_DUP;
106 if (dbp->dbenv != NULL && dbp->dbenv->lk_info != NULL) {
107 f_init |= DB_RMW;
108 f_next |= DB_RMW;
109 }
110
111 /* Walk through the set of key/data pairs, deleting as we go. */
112 if ((ret = dbc->c_get(dbc, key, &data, f_init)) != 0)
113 goto err;
114 for (;;) {
115 if ((ret = dbc->c_del(dbc, 0)) != 0)
116 goto err;
117 if ((ret = dbc->c_get(dbc, key, &data, f_next)) != 0) {
118 if (ret == DB_NOTFOUND) {
119 ret = 0;
120 break;
121 }
122 goto err;
123 }
124 }
125
126 err: /* Discard the cursor. */
127 if ((t_ret = dbc->c_close(dbc)) != 0 &&
128 (ret == 0 || ret == DB_NOTFOUND))
129 ret = t_ret;
130
131 return (ret);
132 }
133
134 /*
135 * __bam_ditem --
136 * Delete one or more entries from a page.
137 *
138 * PUBLIC: int __bam_ditem __P((DBC *, PAGE *, u_int32_t));
139 */
140 int
__bam_ditem(dbc,h,indx)141 __bam_ditem(dbc, h, indx)
142 DBC *dbc;
143 PAGE *h;
144 u_int32_t indx;
145 {
146 BINTERNAL *bi;
147 BKEYDATA *bk;
148 BOVERFLOW *bo;
149 DB *dbp;
150 u_int32_t nbytes;
151 int ret;
152
153 dbp = dbc->dbp;
154
155 switch (TYPE(h)) {
156 case P_IBTREE:
157 bi = GET_BINTERNAL(h, indx);
158 switch (B_TYPE(bi->type)) {
159 case B_DUPLICATE:
160 case B_OVERFLOW:
161 nbytes = BINTERNAL_SIZE(bi->len);
162 bo = (BOVERFLOW *)bi->data;
163 goto offpage;
164 case B_KEYDATA:
165 nbytes = BINTERNAL_SIZE(bi->len);
166 break;
167 default:
168 return (__db_pgfmt(dbp, h->pgno));
169 }
170 break;
171 case P_IRECNO:
172 nbytes = RINTERNAL_SIZE;
173 break;
174 case P_LBTREE:
175 /*
176 * If it's a duplicate key, discard the index and don't touch
177 * the actual page item.
178 *
179 * XXX
180 * This works because no data item can have an index matching
181 * any other index so even if the data item is in a key "slot",
182 * it won't match any other index.
183 */
184 if ((indx % 2) == 0) {
185 /*
186 * Check for a duplicate after us on the page. NOTE:
187 * we have to delete the key item before deleting the
188 * data item, otherwise the "indx + P_INDX" calculation
189 * won't work!
190 */
191 if (indx + P_INDX < (u_int32_t)NUM_ENT(h) &&
192 h->inp[indx] == h->inp[indx + P_INDX])
193 return (__bam_adjindx(dbc,
194 h, indx, indx + O_INDX, 0));
195 /*
196 * Check for a duplicate before us on the page. It
197 * doesn't matter if we delete the key item before or
198 * after the data item for the purposes of this one.
199 */
200 if (indx > 0 && h->inp[indx] == h->inp[indx - P_INDX])
201 return (__bam_adjindx(dbc,
202 h, indx, indx - P_INDX, 0));
203 }
204 /* FALLTHROUGH */
205 case P_LRECNO:
206 bk = GET_BKEYDATA(h, indx);
207 switch (B_TYPE(bk->type)) {
208 case B_DUPLICATE:
209 case B_OVERFLOW:
210 nbytes = BOVERFLOW_SIZE;
211 bo = GET_BOVERFLOW(h, indx);
212
213 offpage: /* Delete duplicate/offpage chains. */
214 if (B_TYPE(bo->type) == B_DUPLICATE) {
215 if ((ret =
216 __db_ddup(dbc, bo->pgno, __bam_free)) != 0)
217 return (ret);
218 } else
219 if ((ret =
220 __db_doff(dbc, bo->pgno, __bam_free)) != 0)
221 return (ret);
222 break;
223 case B_KEYDATA:
224 nbytes = BKEYDATA_SIZE(bk->len);
225 break;
226 default:
227 return (__db_pgfmt(dbp, h->pgno));
228 }
229 break;
230 default:
231 return (__db_pgfmt(dbp, h->pgno));
232 }
233
234 /* Delete the item. */
235 if ((ret = __db_ditem(dbc, h, indx, nbytes)) != 0)
236 return (ret);
237
238 /* Mark the page dirty. */
239 return (memp_fset(dbp->mpf, h, DB_MPOOL_DIRTY));
240 }
241
242 /*
243 * __bam_adjindx --
244 * Adjust an index on the page.
245 *
246 * PUBLIC: int __bam_adjindx __P((DBC *, PAGE *, u_int32_t, u_int32_t, int));
247 */
248 int
__bam_adjindx(dbc,h,indx,indx_copy,is_insert)249 __bam_adjindx(dbc, h, indx, indx_copy, is_insert)
250 DBC *dbc;
251 PAGE *h;
252 u_int32_t indx, indx_copy;
253 int is_insert;
254 {
255 DB *dbp;
256 db_indx_t copy;
257 int ret;
258
259 dbp = dbc->dbp;
260
261 /* Log the change. */
262 if (DB_LOGGING(dbc) &&
263 (ret = __bam_adj_log(dbp->dbenv->lg_info, dbc->txn, &LSN(h),
264 0, dbp->log_fileid, PGNO(h), &LSN(h), indx, indx_copy,
265 (u_int32_t)is_insert)) != 0)
266 return (ret);
267
268 if (is_insert) {
269 copy = h->inp[indx_copy];
270 if (indx != NUM_ENT(h))
271 memmove(&h->inp[indx + O_INDX], &h->inp[indx],
272 sizeof(db_indx_t) * (NUM_ENT(h) - indx));
273 h->inp[indx] = copy;
274 ++NUM_ENT(h);
275 } else {
276 --NUM_ENT(h);
277 if (indx != NUM_ENT(h))
278 memmove(&h->inp[indx], &h->inp[indx + O_INDX],
279 sizeof(db_indx_t) * (NUM_ENT(h) - indx));
280 }
281
282 /* Mark the page dirty. */
283 ret = memp_fset(dbp->mpf, h, DB_MPOOL_DIRTY);
284
285 /* Adjust the cursors. */
286 __bam_ca_di(dbp, h->pgno, indx, is_insert ? 1 : -1);
287 return (0);
288 }
289
290 /*
291 * __bam_dpage --
292 * Delete a page from the tree.
293 *
294 * PUBLIC: int __bam_dpage __P((DBC *, const DBT *));
295 */
296 int
__bam_dpage(dbc,key)297 __bam_dpage(dbc, key)
298 DBC *dbc;
299 const DBT *key;
300 {
301 CURSOR *cp;
302 DB *dbp;
303 DB_LOCK lock;
304 PAGE *h;
305 db_pgno_t pgno;
306 int level; /* !!!: has to hold number of tree levels. */
307 int exact, ret;
308
309 dbp = dbc->dbp;
310 cp = dbc->internal;
311 ret = 0;
312
313 /*
314 * The locking protocol is that we acquire locks by walking down the
315 * tree, to avoid the obvious deadlocks.
316 *
317 * Call __bam_search to reacquire the empty leaf page, but this time
318 * get both the leaf page and it's parent, locked. Walk back up the
319 * tree, until we have the top pair of pages that we want to delete.
320 * Once we have the top page that we want to delete locked, lock the
321 * underlying pages and check to make sure they're still empty. If
322 * they are, delete them.
323 */
324 for (level = LEAFLEVEL;; ++level) {
325 /* Acquire a page and its parent, locked. */
326 if ((ret =
327 __bam_search(dbc, key, S_WRPAIR, level, NULL, &exact)) != 0)
328 return (ret);
329
330 /*
331 * If we reach the root or the page isn't going to be empty
332 * when we delete one record, quit.
333 */
334 h = cp->csp[-1].page;
335 if (h->pgno == PGNO_ROOT || NUM_ENT(h) != 1)
336 break;
337
338 /* Release the two locked pages. */
339 (void)memp_fput(dbp->mpf, cp->csp[-1].page, 0);
340 (void)__BT_TLPUT(dbc, cp->csp[-1].lock);
341 (void)memp_fput(dbp->mpf, cp->csp[0].page, 0);
342 (void)__BT_TLPUT(dbc, cp->csp[0].lock);
343 }
344
345 /*
346 * Leave the stack pointer one after the last entry, we may be about
347 * to push more items on the stack.
348 */
349 ++cp->csp;
350
351 /*
352 * cp->csp[-2].page is the top page, which we're not going to delete,
353 * and cp->csp[-1].page is the first page we are going to delete.
354 *
355 * Walk down the chain, acquiring the rest of the pages until we've
356 * retrieved the leaf page. If we find any pages that aren't going
357 * to be emptied by the delete, someone else added something while we
358 * were walking the tree, and we discontinue the delete.
359 */
360 for (h = cp->csp[-1].page;;) {
361 if (ISLEAF(h)) {
362 if (NUM_ENT(h) != 0)
363 goto release;
364 break;
365 } else
366 if (NUM_ENT(h) != 1)
367 goto release;
368
369 /*
370 * Get the next page, write lock it and push it onto the stack.
371 * We know it's index 0, because it can only have one element.
372 */
373 pgno = TYPE(h) == P_IBTREE ?
374 GET_BINTERNAL(h, 0)->pgno : GET_RINTERNAL(h, 0)->pgno;
375
376 if ((ret = __bam_lget(dbc, 0, pgno, DB_LOCK_WRITE, &lock)) != 0)
377 goto release;
378 if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0)
379 goto release;
380 BT_STK_PUSH(cp, h, 0, lock, ret);
381 }
382
383 /* Adjust back to reference the last page on the stack. */
384 BT_STK_POP(cp);
385
386 /* Delete the pages. */
387 return (__bam_dpages(dbc));
388
389 release:
390 /* Adjust back to reference the last page on the stack. */
391 BT_STK_POP(cp);
392
393 /* Discard any locked pages and return. */
394 __bam_stkrel(dbc, 0);
395
396 return (ret);
397 }
398
399 /*
400 * __bam_dpages --
401 * Delete a set of locked pages.
402 *
403 * PUBLIC: int __bam_dpages __P((DBC *));
404 */
405 int
__bam_dpages(dbc)406 __bam_dpages(dbc)
407 DBC *dbc;
408 {
409 CURSOR *cp;
410 DB *dbp;
411 DBT a, b;
412 DB_LOCK c_lock, p_lock;
413 EPG *epg;
414 PAGE *child, *parent;
415 db_indx_t nitems;
416 db_pgno_t pgno;
417 db_recno_t rcnt;
418 int done, ret;
419
420 dbp = dbc->dbp;
421 cp = dbc->internal;
422 epg = cp->sp;
423
424 /*
425 * !!!
426 * There is an interesting deadlock situation here. We have to relink
427 * the leaf page chain around the leaf page being deleted. Consider
428 * a cursor walking through the leaf pages, that has the previous page
429 * read-locked and is waiting on a lock for the page we're deleting.
430 * It will deadlock here. This is a problem, because if our process is
431 * selected to resolve the deadlock, we'll leave an empty leaf page
432 * that we can never again access by walking down the tree. So, before
433 * we unlink the subtree, we relink the leaf page chain.
434 */
435 if ((ret = __db_relink(dbc, DB_REM_PAGE, cp->csp->page, NULL, 1)) != 0)
436 goto release;
437
438 /*
439 * We have the entire stack of deletable pages locked.
440 *
441 * Delete the highest page in the tree's reference to the underlying
442 * stack of pages. Then, release that page, letting the rest of the
443 * tree get back to business.
444 */
445 if ((ret = __bam_ditem(dbc, epg->page, epg->indx)) != 0) {
446 release: (void)__bam_stkrel(dbc, 0);
447 return (ret);
448 }
449
450 pgno = epg->page->pgno;
451 nitems = NUM_ENT(epg->page);
452
453 (void)memp_fput(dbp->mpf, epg->page, 0);
454 (void)__BT_TLPUT(dbc, epg->lock);
455
456 /*
457 * Free the rest of the stack of pages.
458 *
459 * !!!
460 * Don't bother checking for errors. We've unlinked the subtree from
461 * the tree, and there's no possibility of recovery outside of doing
462 * TXN rollback.
463 */
464 while (++epg <= cp->csp) {
465 /*
466 * Delete page entries so they will be restored as part of
467 * recovery.
468 */
469 if (NUM_ENT(epg->page) != 0)
470 (void)__bam_ditem(dbc, epg->page, epg->indx);
471
472 (void)__bam_free(dbc, epg->page);
473 (void)__BT_TLPUT(dbc, epg->lock);
474 }
475 BT_STK_CLR(cp);
476
477 /*
478 * Try and collapse the tree a level -- this is only applicable
479 * if we've deleted the next-to-last element from the root page.
480 *
481 * There are two cases when collapsing a tree.
482 *
483 * If we've just deleted the last item from the root page, there is no
484 * further work to be done. The code above has emptied the root page
485 * and freed all pages below it.
486 */
487 if (pgno != PGNO_ROOT || nitems != 1)
488 return (0);
489
490 /*
491 * If we just deleted the next-to-last item from the root page, the
492 * tree can collapse one or more levels. While there remains only a
493 * single item on the root page, write lock the last page referenced
494 * by the root page and copy it over the root page. If we can't get a
495 * write lock, that's okay, the tree just stays deeper than we'd like.
496 */
497 for (done = 0; !done;) {
498 /* Initialize. */
499 parent = child = NULL;
500 p_lock = c_lock = LOCK_INVALID;
501
502 /* Lock the root. */
503 pgno = PGNO_ROOT;
504 if ((ret =
505 __bam_lget(dbc, 0, pgno, DB_LOCK_WRITE, &p_lock)) != 0)
506 goto stop;
507 if ((ret = memp_fget(dbp->mpf, &pgno, 0, &parent)) != 0)
508 goto stop;
509
510 if (NUM_ENT(parent) != 1 ||
511 (TYPE(parent) != P_IBTREE && TYPE(parent) != P_IRECNO))
512 goto stop;
513
514 pgno = TYPE(parent) == P_IBTREE ?
515 GET_BINTERNAL(parent, 0)->pgno :
516 GET_RINTERNAL(parent, 0)->pgno;
517
518 /* Lock the child page. */
519 if ((ret =
520 __bam_lget(dbc, 0, pgno, DB_LOCK_WRITE, &c_lock)) != 0)
521 goto stop;
522 if ((ret = memp_fget(dbp->mpf, &pgno, 0, &child)) != 0)
523 goto stop;
524
525 /* Log the change. */
526 if (DB_LOGGING(dbc)) {
527 memset(&a, 0, sizeof(a));
528 a.data = child;
529 a.size = dbp->pgsize;
530 memset(&b, 0, sizeof(b));
531 b.data = P_ENTRY(parent, 0);
532 b.size = BINTERNAL_SIZE(((BINTERNAL *)b.data)->len);
533 __bam_rsplit_log(dbp->dbenv->lg_info, dbc->txn,
534 &child->lsn, 0, dbp->log_fileid, child->pgno, &a,
535 RE_NREC(parent), &b, &parent->lsn);
536 }
537
538 /*
539 * Make the switch.
540 *
541 * One fixup -- if the tree has record numbers and we're not
542 * converting to a leaf page, we have to preserve the total
543 * record count. Note that we are about to overwrite everything
544 * on the parent, including its LSN. This is actually OK,
545 * because the above log message, which describes this update,
546 * stores its LSN on the child page. When the child is copied
547 * to the parent, the correct LSN is going to copied into
548 * place in the parent.
549 */
550 COMPQUIET(rcnt, 0);
551 if (TYPE(child) == P_IRECNO ||
552 (TYPE(child) == P_IBTREE && F_ISSET(dbp, DB_BT_RECNUM)))
553 rcnt = RE_NREC(parent);
554 memcpy(parent, child, dbp->pgsize);
555 parent->pgno = PGNO_ROOT;
556 if (TYPE(child) == P_IRECNO ||
557 (TYPE(child) == P_IBTREE && F_ISSET(dbp, DB_BT_RECNUM)))
558 RE_NREC_SET(parent, rcnt);
559
560 /* Mark the pages dirty. */
561 memp_fset(dbp->mpf, parent, DB_MPOOL_DIRTY);
562 memp_fset(dbp->mpf, child, DB_MPOOL_DIRTY);
563
564 /* Adjust the cursors. */
565 __bam_ca_rsplit(dbp, child->pgno, PGNO_ROOT);
566
567 /*
568 * Free the page copied onto the root page and discard its
569 * lock. (The call to __bam_free() discards our reference
570 * to the page.)
571 */
572 (void)__bam_free(dbc, child);
573 child = NULL;
574
575 if (0) {
576 stop: done = 1;
577 }
578 if (p_lock != LOCK_INVALID)
579 (void)__BT_TLPUT(dbc, p_lock);
580 if (parent != NULL)
581 memp_fput(dbp->mpf, parent, 0);
582 if (c_lock != LOCK_INVALID)
583 (void)__BT_TLPUT(dbc, c_lock);
584 if (child != NULL)
585 memp_fput(dbp->mpf, child, 0);
586 }
587
588 return (0);
589 }
590