bt_split.c
Upload User: tsgydb
Upload Date: 2007-04-14
Package Size: 10674k
Code Size: 32k
Category:

MySQL

Development Platform:

Visual C++

  1. /*-
  2.  * See the file LICENSE for redistribution information.
  3.  *
  4.  * Copyright (c) 1996, 1997, 1998, 1999, 2000
  5.  * Sleepycat Software.  All rights reserved.
  6.  */
  7. /*
  8.  * Copyright (c) 1990, 1993, 1994, 1995, 1996
  9.  * Keith Bostic.  All rights reserved.
  10.  */
  11. /*
  12.  * Copyright (c) 1990, 1993, 1994, 1995
  13.  * The Regents of the University of California.  All rights reserved.
  14.  *
  15.  * Redistribution and use in source and binary forms, with or without
  16.  * modification, are permitted provided that the following conditions
  17.  * are met:
  18.  * 1. Redistributions of source code must retain the above copyright
  19.  *    notice, this list of conditions and the following disclaimer.
  20.  * 2. Redistributions in binary form must reproduce the above copyright
  21.  *    notice, this list of conditions and the following disclaimer in the
  22.  *    documentation and/or other materials provided with the distribution.
  23.  * 3. Neither the name of the University nor the names of its contributors
  24.  *    may be used to endorse or promote products derived from this software
  25.  *    without specific prior written permission.
  26.  *
  27.  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
  28.  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  29.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  30.  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
  31.  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  32.  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  33.  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  34.  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  35.  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  36.  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  37.  * SUCH DAMAGE.
  38.  */
  39. #include "db_config.h"
  40. #ifndef lint
  41. static const char revid[] = "$Id: bt_split.c,v 11.31 2000/12/22 19:08:27 bostic Exp $";
  42. #endif /* not lint */
  43. #ifndef NO_SYSTEM_INCLUDES
  44. #include <sys/types.h>
  45. #include <limits.h>
  46. #include <string.h>
  47. #endif
  48. #include "db_int.h"
  49. #include "db_page.h"
  50. #include "db_shash.h"
  51. #include "lock.h"
  52. #include "btree.h"
  53. static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
  54. static int __bam_page __P((DBC *, EPG *, EPG *));
  55. static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *, int));
  56. static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
  57. static int __bam_root __P((DBC *, EPG *));
  58. static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
  59. /*
  60.  * __bam_split --
  61.  * Split a page.
  62.  *
  63.  * PUBLIC: int __bam_split __P((DBC *, void *));
  64.  */
  65. int
  66. __bam_split(dbc, arg)
  67. DBC *dbc;
  68. void *arg;
  69. {
  70. BTREE *t;
  71. BTREE_CURSOR *cp;
  72. DB *dbp;
  73. enum { UP, DOWN } dir;
  74. db_pgno_t root_pgno;
  75. int exact, level, ret;
  76. dbp = dbc->dbp;
  77. cp = (BTREE_CURSOR *)dbc->internal;
  78. root_pgno = cp->root;
  79. /*
  80.  * The locking protocol we use to avoid deadlock to acquire locks by
  81.  * walking down the tree, but we do it as lazily as possible, locking
  82.  * the root only as a last resort.  We expect all stack pages to have
  83.  * been discarded before we're called; we discard all short-term locks.
  84.  *
  85.  * When __bam_split is first called, we know that a leaf page was too
  86.  * full for an insert.  We don't know what leaf page it was, but we
  87.  * have the key/recno that caused the problem.  We call XX_search to
  88.  * reacquire the leaf page, but this time get both the leaf page and
  89.  * its parent, locked.  We then split the leaf page and see if the new
  90.  * internal key will fit into the parent page.  If it will, we're done.
  91.  *
  92.  * If it won't, we discard our current locks and repeat the process,
  93.  * only this time acquiring the parent page and its parent, locked.
  94.  * This process repeats until we succeed in the split, splitting the
  95.  * root page as the final resort.  The entire process then repeats,
  96.  * as necessary, until we split a leaf page.
  97.  *
  98.  * XXX
  99.  * A traditional method of speeding this up is to maintain a stack of
  100.  * the pages traversed in the original search.  You can detect if the
  101.  * stack is correct by storing the page's LSN when it was searched and
  102.  * comparing that LSN with the current one when it's locked during the
  103.  * split.  This would be an easy change for this code, but I have no
  104.  * numbers that indicate it's worthwhile.
  105.  */
  106. t = dbp->bt_internal;
  107. for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
  108. /*
  109.  * Acquire a page and its parent, locked.
  110.  */
  111. if ((ret = (dbc->dbtype == DB_BTREE ?
  112.     __bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
  113.     __bam_rsearch(dbc,
  114. (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
  115. return (ret);
  116. /*
  117.  * Split the page if it still needs it (it's possible another
  118.  * thread of control has already split the page).  If we are
  119.  * guaranteed that two items will fit on the page, the split
  120.  * is no longer necessary.
  121.  */
  122. if (2 * B_MAXSIZEONPAGE(cp->ovflsize)
  123.     <= (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
  124. __bam_stkrel(dbc, STK_NOLOCK);
  125. return (0);
  126. }
  127. ret = cp->csp[0].page->pgno == root_pgno ?
  128.     __bam_root(dbc, &cp->csp[0]) :
  129.     __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
  130. BT_STK_CLR(cp);
  131. switch (ret) {
  132. case 0:
  133. /* Once we've split the leaf page, we're done. */
  134. if (level == LEAFLEVEL)
  135. return (0);
  136. /* Switch directions. */
  137. if (dir == UP)
  138. dir = DOWN;
  139. break;
  140. case DB_NEEDSPLIT:
  141. /*
  142.  * It's possible to fail to split repeatedly, as other
  143.  * threads may be modifying the tree, or the page usage
  144.  * is sufficiently bad that we don't get enough space
  145.  * the first time.
  146.  */
  147. if (dir == DOWN)
  148. dir = UP;
  149. break;
  150. default:
  151. return (ret);
  152. }
  153. }
  154. /* NOTREACHED */
  155. }
  156. /*
  157.  * __bam_root --
  158.  * Split the root page of a btree.
  159.  */
  160. static int
  161. __bam_root(dbc, cp)
  162. DBC *dbc;
  163. EPG *cp;
  164. {
  165. DB *dbp;
  166. DBT log_dbt;
  167. DB_LSN log_lsn;
  168. PAGE *lp, *rp;
  169. db_indx_t split;
  170. u_int32_t opflags;
  171. int ret;
  172. dbp = dbc->dbp;
  173. /* Yeah, right. */
  174. if (cp->page->level >= MAXBTREELEVEL) {
  175. __db_err(dbp->dbenv,
  176.     "Too many btree levels: %d", cp->page->level);
  177. ret = ENOSPC;
  178. goto err;
  179. }
  180. /* Create new left and right pages for the split. */
  181. lp = rp = NULL;
  182. if ((ret = __db_new(dbc, TYPE(cp->page), &lp)) != 0 ||
  183.     (ret = __db_new(dbc, TYPE(cp->page), &rp)) != 0)
  184. goto err;
  185. P_INIT(lp, dbp->pgsize, lp->pgno,
  186.     PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
  187.     cp->page->level, TYPE(cp->page));
  188. P_INIT(rp, dbp->pgsize, rp->pgno,
  189.     ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
  190.     cp->page->level, TYPE(cp->page));
  191. /* Split the page. */
  192. if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
  193. goto err;
  194. /* Log the change. */
  195. if (DB_LOGGING(dbc)) {
  196. memset(&log_dbt, 0, sizeof(log_dbt));
  197. log_dbt.data = cp->page;
  198. log_dbt.size = dbp->pgsize;
  199. ZERO_LSN(log_lsn);
  200. opflags = F_ISSET(
  201.     (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
  202. if ((ret = __bam_split_log(dbp->dbenv, dbc->txn,
  203.     &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
  204.     PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &log_lsn,
  205.     dbc->internal->root, &log_dbt, opflags)) != 0)
  206. goto err;
  207. LSN(lp) = LSN(cp->page);
  208. LSN(rp) = LSN(cp->page);
  209. }
  210. /* Clean up the new root page. */
  211. if ((ret = (dbc->dbtype == DB_RECNO ?
  212.     __ram_root(dbc, cp->page, lp, rp) :
  213.     __bam_broot(dbc, cp->page, lp, rp))) != 0)
  214. goto err;
  215. /* Adjust any cursors. */
  216. if ((ret = __bam_ca_split(dbc,
  217.     cp->page->pgno, lp->pgno, rp->pgno, split, 1)) != 0)
  218. goto err;
  219. /* Success -- write the real pages back to the store. */
  220. (void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
  221. (void)__TLPUT(dbc, cp->lock);
  222. (void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
  223. (void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
  224. return (0);
  225. err: if (lp != NULL)
  226. (void)__db_free(dbc, lp);
  227. if (rp != NULL)
  228. (void)__db_free(dbc, rp);
  229. (void)memp_fput(dbp->mpf, cp->page, 0);
  230. (void)__TLPUT(dbc, cp->lock);
  231. return (ret);
  232. }
  233. /*
  234.  * __bam_page --
  235.  * Split the non-root page of a btree.
  236.  */
  237. static int
  238. __bam_page(dbc, pp, cp)
  239. DBC *dbc;
  240. EPG *pp, *cp;
  241. {
  242. BTREE_CURSOR *bc;
  243. DBT log_dbt;
  244. DB_LSN log_lsn;
  245. DB *dbp;
  246. DB_LOCK tplock;
  247. DB_LSN save_lsn;
  248. PAGE *lp, *rp, *alloc_rp, *tp;
  249. db_indx_t split;
  250. u_int32_t opflags;
  251. int ret, t_ret;
  252. dbp = dbc->dbp;
  253. alloc_rp = lp = rp = tp = NULL;
  254. tplock.off = LOCK_INVALID;
  255. ret = -1;
  256. /*
  257.  * Create a new right page for the split, and fill in everything
  258.  * except its LSN and page number.
  259.  *
  260.  * We malloc space for both the left and right pages, so we don't get
  261.  * a new page from the underlying buffer pool until we know the split
  262.  * is going to succeed.  The reason is that we can't release locks
  263.  * acquired during the get-a-new-page process because metadata page
  264.  * locks can't be discarded on failure since we may have modified the
  265.  * free list.  So, if you assume that we're holding a write lock on the
  266.  * leaf page which ran out of space and started this split (e.g., we
  267.  * have already written records to the page, or we retrieved a record
  268.  * from it with the DB_RMW flag set), failing in a split with both a
  269.  * leaf page locked and the metadata page locked can potentially lock
  270.  * up the tree badly, because we've violated the rule of always locking
  271.  * down the tree, and never up.
  272.  */
  273. if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, NULL, &rp)) != 0)
  274. goto err;
  275. P_INIT(rp, dbp->pgsize, 0,
  276.     ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
  277.     ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
  278.     cp->page->level, TYPE(cp->page));
  279. /*
  280.  * Create new left page for the split, and fill in everything
  281.  * except its LSN and next-page page number.
  282.  */
  283. if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, NULL, &lp)) != 0)
  284. goto err;
  285. P_INIT(lp, dbp->pgsize, PGNO(cp->page),
  286.     ISINTERNAL(cp->page) ?  PGNO_INVALID : PREV_PGNO(cp->page),
  287.     ISINTERNAL(cp->page) ?  PGNO_INVALID : 0,
  288.     cp->page->level, TYPE(cp->page));
  289. /*
  290.  * Split right.
  291.  *
  292.  * Only the indices are sorted on the page, i.e., the key/data pairs
  293.  * aren't, so it's simpler to copy the data from the split page onto
  294.  * two new pages instead of copying half the data to a new right page
  295.  * and compacting the left page in place.  Since the left page can't
  296.  * change, we swap the original and the allocated left page after the
  297.  * split.
  298.  */
  299. if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
  300. goto err;
  301. /*
  302.  * Test to see if we are going to be able to insert the new pages into
  303.  * the parent page.  The interesting failure here is that the parent
  304.  * page can't hold the new keys, and has to be split in turn, in which
  305.  * case we want to release all the locks we can.
  306.  */
  307. if ((ret = __bam_pinsert(dbc, pp, lp, rp, 1)) != 0)
  308. goto err;
  309. /*
  310.  * Fix up the previous pointer of any leaf page following the split
  311.  * page.
  312.  *
  313.  * There's interesting deadlock situations here as we try to write-lock
  314.  * a page that's not in our direct ancestry.  Consider a cursor walking
  315.  * backward through the leaf pages, that has our following page locked,
  316.  * and is waiting on a lock for the page we're splitting.  In that case
  317.  * we're going to deadlock here .  It's probably OK, stepping backward
  318.  * through the tree isn't a common operation.
  319.  */
  320. if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID) {
  321. if ((ret = __db_lget(dbc,
  322.     0, NEXT_PGNO(cp->page), DB_LOCK_WRITE, 0, &tplock)) != 0)
  323. goto err;
  324. if ((ret =
  325.     memp_fget(dbp->mpf, &NEXT_PGNO(cp->page), 0, &tp)) != 0)
  326. goto err;
  327. }
  328. /*
  329.  * We've got everything locked down we need, and we know the split
  330.  * is going to succeed.  Go and get the additional page we'll need.
  331.  */
  332. if ((ret = __db_new(dbc, TYPE(cp->page), &alloc_rp)) != 0)
  333. goto err;
  334. /*
  335.  * Fix up the page numbers we didn't have before.  We have to do this
  336.  * before calling __bam_pinsert because it may copy a page number onto
  337.  * the parent page and it takes the page number from its page argument.
  338.  */
  339. PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);
  340. /* Actually update the parent page. */
  341. if ((ret = __bam_pinsert(dbc, pp, lp, rp, 0)) != 0)
  342. goto err;
  343. bc = (BTREE_CURSOR *)dbc->internal;
  344. /* Log the change. */
  345. if (DB_LOGGING(dbc)) {
  346. memset(&log_dbt, 0, sizeof(log_dbt));
  347. log_dbt.data = cp->page;
  348. log_dbt.size = dbp->pgsize;
  349. if (tp == NULL)
  350. ZERO_LSN(log_lsn);
  351. opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
  352. if ((ret = __bam_split_log(dbp->dbenv, dbc->txn,
  353.     &LSN(cp->page), 0, dbp->log_fileid, PGNO(cp->page),
  354.     &LSN(cp->page), PGNO(alloc_rp), &LSN(alloc_rp),
  355.     (u_int32_t)NUM_ENT(lp),
  356.     tp == NULL ? 0 : PGNO(tp),
  357.     tp == NULL ? &log_lsn : &LSN(tp),
  358.     bc->root, &log_dbt, opflags)) != 0)
  359. goto err;
  360. /* Update the LSNs for all involved pages. */
  361. LSN(alloc_rp) = LSN(cp->page);
  362. LSN(lp) = LSN(cp->page);
  363. LSN(rp) = LSN(cp->page);
  364. if (tp != NULL)
  365. LSN(tp) = LSN(cp->page);
  366. }
  367. /*
  368.  * Copy the left and right pages into place.  There are two paths
  369.  * through here.  Either we are logging and we set the LSNs in the
  370.  * logging path.  However, if we are not logging, then we do not
  371.  * have valid LSNs on lp or rp.  The correct LSNs to use are the
  372.  * ones on the page we got from __db_new or the one that was
  373.  * originally on cp->page.  In both cases, we save the LSN from the
  374.  * real database page (not a malloc'd one) and reapply it after we
  375.  * do the copy.
  376.  */
  377. save_lsn = alloc_rp->lsn;
  378. memcpy(alloc_rp, rp, LOFFSET(rp));
  379. memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
  380.     (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
  381. alloc_rp->lsn = save_lsn;
  382. save_lsn = cp->page->lsn;
  383. memcpy(cp->page, lp, LOFFSET(lp));
  384. memcpy((u_int8_t *)cp->page + HOFFSET(lp),
  385.     (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
  386. cp->page->lsn = save_lsn;
  387. /* Fix up the next-page link. */
  388. if (tp != NULL)
  389. PREV_PGNO(tp) = PGNO(rp);
  390. /* Adjust any cursors. */
  391. if ((ret = __bam_ca_split(dbc,
  392.     PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
  393. goto err;
  394. __os_free(lp, dbp->pgsize);
  395. __os_free(rp, dbp->pgsize);
  396. /*
  397.  * Success -- write the real pages back to the store.  As we never
  398.  * acquired any sort of lock on the new page, we release it before
  399.  * releasing locks on the pages that reference it.  We're finished
  400.  * modifying the page so it's not really necessary, but it's neater.
  401.  */
  402. if ((t_ret =
  403.     memp_fput(dbp->mpf, alloc_rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  404. ret = t_ret;
  405. if ((t_ret =
  406.     memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  407. ret = t_ret;
  408. (void)__TLPUT(dbc, pp->lock);
  409. if ((t_ret =
  410.     memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  411. ret = t_ret;
  412. (void)__TLPUT(dbc, cp->lock);
  413. if (tp != NULL) {
  414. if ((t_ret =
  415.     memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  416. ret = t_ret;
  417. (void)__TLPUT(dbc, tplock);
  418. }
  419. return (ret);
  420. err: if (lp != NULL)
  421. __os_free(lp, dbp->pgsize);
  422. if (rp != NULL)
  423. __os_free(rp, dbp->pgsize);
  424. if (alloc_rp != NULL)
  425. (void)__db_free(dbc, alloc_rp);
  426. if (tp != NULL)
  427. (void)memp_fput(dbp->mpf, tp, 0);
  428. if (tplock.off != LOCK_INVALID)
  429. /* We never updated the next page, we can release it. */
  430. (void)__LPUT(dbc, tplock);
  431. (void)memp_fput(dbp->mpf, pp->page, 0);
  432. if (ret == DB_NEEDSPLIT)
  433. (void)__LPUT(dbc, pp->lock);
  434. else
  435. (void)__TLPUT(dbc, pp->lock);
  436. (void)memp_fput(dbp->mpf, cp->page, 0);
  437. if (ret == DB_NEEDSPLIT)
  438. (void)__LPUT(dbc, cp->lock);
  439. else
  440. (void)__TLPUT(dbc, cp->lock);
  441. return (ret);
  442. }
  443. /*
  444.  * __bam_broot --
  445.  * Fix up the btree root page after it has been split.
  446.  */
  447. static int
  448. __bam_broot(dbc, rootp, lp, rp)
  449. DBC *dbc;
  450. PAGE *rootp, *lp, *rp;
  451. {
  452. BINTERNAL bi, *child_bi;
  453. BKEYDATA *child_bk;
  454. BTREE_CURSOR *cp;
  455. DB *dbp;
  456. DBT hdr, data;
  457. db_pgno_t root_pgno;
  458. int ret;
  459. dbp = dbc->dbp;
  460. cp = (BTREE_CURSOR *)dbc->internal;
  461. /*
  462.  * If the root page was a leaf page, change it into an internal page.
  463.  * We copy the key we split on (but not the key's data, in the case of
  464.  * a leaf page) to the new root page.
  465.  */
  466. root_pgno = cp->root;
  467. P_INIT(rootp, dbp->pgsize,
  468.     root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);
  469. memset(&data, 0, sizeof(data));
  470. memset(&hdr, 0, sizeof(hdr));
  471. /*
  472.  * The btree comparison code guarantees that the left-most key on any
  473.  * internal btree page is never used, so it doesn't need to be filled
  474.  * in.  Set the record count if necessary.
  475.  */
  476. memset(&bi, 0, sizeof(bi));
  477. bi.len = 0;
  478. B_TSET(bi.type, B_KEYDATA, 0);
  479. bi.pgno = lp->pgno;
  480. if (F_ISSET(cp, C_RECNUM)) {
  481. bi.nrecs = __bam_total(lp);
  482. RE_NREC_SET(rootp, bi.nrecs);
  483. }
  484. hdr.data = &bi;
  485. hdr.size = SSZA(BINTERNAL, data);
  486. if ((ret =
  487.     __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
  488. return (ret);
  489. switch (TYPE(rp)) {
  490. case P_IBTREE:
  491. /* Copy the first key of the child page onto the root page. */
  492. child_bi = GET_BINTERNAL(rp, 0);
  493. bi.len = child_bi->len;
  494. B_TSET(bi.type, child_bi->type, 0);
  495. bi.pgno = rp->pgno;
  496. if (F_ISSET(cp, C_RECNUM)) {
  497. bi.nrecs = __bam_total(rp);
  498. RE_NREC_ADJ(rootp, bi.nrecs);
  499. }
  500. hdr.data = &bi;
  501. hdr.size = SSZA(BINTERNAL, data);
  502. data.data = child_bi->data;
  503. data.size = child_bi->len;
  504. if ((ret = __db_pitem(dbc, rootp, 1,
  505.     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
  506. return (ret);
  507. /* Increment the overflow ref count. */
  508. if (B_TYPE(child_bi->type) == B_OVERFLOW)
  509. if ((ret = __db_ovref(dbc,
  510.     ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
  511. return (ret);
  512. break;
  513. case P_LDUP:
  514. case P_LBTREE:
  515. /* Copy the first key of the child page onto the root page. */
  516. child_bk = GET_BKEYDATA(rp, 0);
  517. switch (B_TYPE(child_bk->type)) {
  518. case B_KEYDATA:
  519. bi.len = child_bk->len;
  520. B_TSET(bi.type, child_bk->type, 0);
  521. bi.pgno = rp->pgno;
  522. if (F_ISSET(cp, C_RECNUM)) {
  523. bi.nrecs = __bam_total(rp);
  524. RE_NREC_ADJ(rootp, bi.nrecs);
  525. }
  526. hdr.data = &bi;
  527. hdr.size = SSZA(BINTERNAL, data);
  528. data.data = child_bk->data;
  529. data.size = child_bk->len;
  530. if ((ret = __db_pitem(dbc, rootp, 1,
  531.     BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
  532. return (ret);
  533. break;
  534. case B_DUPLICATE:
  535. case B_OVERFLOW:
  536. bi.len = BOVERFLOW_SIZE;
  537. B_TSET(bi.type, child_bk->type, 0);
  538. bi.pgno = rp->pgno;
  539. if (F_ISSET(cp, C_RECNUM)) {
  540. bi.nrecs = __bam_total(rp);
  541. RE_NREC_ADJ(rootp, bi.nrecs);
  542. }
  543. hdr.data = &bi;
  544. hdr.size = SSZA(BINTERNAL, data);
  545. data.data = child_bk;
  546. data.size = BOVERFLOW_SIZE;
  547. if ((ret = __db_pitem(dbc, rootp, 1,
  548.     BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
  549. return (ret);
  550. /* Increment the overflow ref count. */
  551. if (B_TYPE(child_bk->type) == B_OVERFLOW)
  552. if ((ret = __db_ovref(dbc,
  553.     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
  554. return (ret);
  555. break;
  556. default:
  557. return (__db_pgfmt(dbp, rp->pgno));
  558. }
  559. break;
  560. default:
  561. return (__db_pgfmt(dbp, rp->pgno));
  562. }
  563. return (0);
  564. }
  565. /*
  566.  * __ram_root --
  567.  * Fix up the recno root page after it has been split.
  568.  */
  569. static int
  570. __ram_root(dbc, rootp, lp, rp)
  571. DBC *dbc;
  572. PAGE *rootp, *lp, *rp;
  573. {
  574. DB *dbp;
  575. DBT hdr;
  576. RINTERNAL ri;
  577. db_pgno_t root_pgno;
  578. int ret;
  579. dbp = dbc->dbp;
  580. root_pgno = dbc->internal->root;
  581. /* Initialize the page. */
  582. P_INIT(rootp, dbp->pgsize,
  583.     root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
  584. /* Initialize the header. */
  585. memset(&hdr, 0, sizeof(hdr));
  586. hdr.data = &ri;
  587. hdr.size = RINTERNAL_SIZE;
  588. /* Insert the left and right keys, set the header information. */
  589. ri.pgno = lp->pgno;
  590. ri.nrecs = __bam_total(lp);
  591. if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
  592. return (ret);
  593. RE_NREC_SET(rootp, ri.nrecs);
  594. ri.pgno = rp->pgno;
  595. ri.nrecs = __bam_total(rp);
  596. if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
  597. return (ret);
  598. RE_NREC_ADJ(rootp, ri.nrecs);
  599. return (0);
  600. }
  601. /*
  602.  * __bam_pinsert --
  603.  * Insert a new key into a parent page, completing the split.
  604.  */
  605. static int
  606. __bam_pinsert(dbc, parent, lchild, rchild, space_check)
  607. DBC *dbc;
  608. EPG *parent;
  609. PAGE *lchild, *rchild;
  610. int space_check;
  611. {
  612. BINTERNAL bi, *child_bi;
  613. BKEYDATA *child_bk, *tmp_bk;
  614. BTREE *t;
  615. BTREE_CURSOR *cp;
  616. DB *dbp;
  617. DBT a, b, hdr, data;
  618. PAGE *ppage;
  619. RINTERNAL ri;
  620. db_indx_t off;
  621. db_recno_t nrecs;
  622. size_t (*func) __P((DB *, const DBT *, const DBT *));
  623. u_int32_t n, nbytes, nksize;
  624. int ret;
  625. dbp = dbc->dbp;
  626. cp = (BTREE_CURSOR *)dbc->internal;
  627. t = dbp->bt_internal;
  628. ppage = parent->page;
  629. /* If handling record numbers, count records split to the right page. */
  630. nrecs = F_ISSET(cp, C_RECNUM) && !space_check ? __bam_total(rchild) : 0;
  631. /*
  632.  * Now we insert the new page's first key into the parent page, which
  633.  * completes the split.  The parent points to a PAGE and a page index
  634.  * offset, where the new key goes ONE AFTER the index, because we split
  635.  * to the right.
  636.  *
  637.  * XXX
  638.  * Some btree algorithms replace the key for the old page as well as
  639.  * the new page.  We don't, as there's no reason to believe that the
  640.  * first key on the old page is any better than the key we have, and,
  641.  * in the case of a key being placed at index 0 causing the split, the
  642.  * key is unavailable.
  643.  */
  644. off = parent->indx + O_INDX;
  645. /*
  646.  * Calculate the space needed on the parent page.
  647.  *
  648.  * Prefix trees: space hack used when inserting into BINTERNAL pages.
  649.  * Retain only what's needed to distinguish between the new entry and
  650.  * the LAST entry on the page to its left.  If the keys compare equal,
  651.  * retain the entire key.  We ignore overflow keys, and the entire key
  652.  * must be retained for the next-to-leftmost key on the leftmost page
  653.  * of each level, or the search will fail.  Applicable ONLY to internal
  654.  * pages that have leaf pages as children.  Further reduction of the
  655.  * key between pairs of internal pages loses too much information.
  656.  */
  657. switch (TYPE(rchild)) {
  658. case P_IBTREE:
  659. child_bi = GET_BINTERNAL(rchild, 0);
  660. nbytes = BINTERNAL_PSIZE(child_bi->len);
  661. if (P_FREESPACE(ppage) < nbytes)
  662. return (DB_NEEDSPLIT);
  663. if (space_check)
  664. return (0);
  665. /* Add a new record for the right page. */
  666. memset(&bi, 0, sizeof(bi));
  667. bi.len = child_bi->len;
  668. B_TSET(bi.type, child_bi->type, 0);
  669. bi.pgno = rchild->pgno;
  670. bi.nrecs = nrecs;
  671. memset(&hdr, 0, sizeof(hdr));
  672. hdr.data = &bi;
  673. hdr.size = SSZA(BINTERNAL, data);
  674. memset(&data, 0, sizeof(data));
  675. data.data = child_bi->data;
  676. data.size = child_bi->len;
  677. if ((ret = __db_pitem(dbc, ppage, off,
  678.     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
  679. return (ret);
  680. /* Increment the overflow ref count. */
  681. if (B_TYPE(child_bi->type) == B_OVERFLOW)
  682. if ((ret = __db_ovref(dbc,
  683.     ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
  684. return (ret);
  685. break;
  686. case P_LDUP:
  687. case P_LBTREE:
  688. child_bk = GET_BKEYDATA(rchild, 0);
  689. switch (B_TYPE(child_bk->type)) {
  690. case B_KEYDATA:
  691. /*
  692.  * We set t->bt_prefix to NULL if we have a comparison
  693.  * callback but no prefix compression callback.  But,
  694.  * if we're splitting in an off-page duplicates tree,
  695.  * we still have to do some checking.  If using the
  696.  * default off-page duplicates comparison routine we
  697.  * can use the default prefix compression callback. If
  698.  * not using the default off-page duplicates comparison
  699.  * routine, we can't do any kind of prefix compression
  700.  * as there's no way for an application to specify a
  701.  * prefix compression callback that corresponds to its
  702.  * comparison callback.
  703.  */
  704. if (F_ISSET(dbc, DBC_OPD)) {
  705. if (dbp->dup_compare == __bam_defcmp)
  706. func = __bam_defpfx;
  707. else
  708. func = NULL;
  709. } else
  710. func = t->bt_prefix;
  711. nbytes = BINTERNAL_PSIZE(child_bk->len);
  712. nksize = child_bk->len;
  713. if (func == NULL)
  714. goto noprefix;
  715. if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
  716. goto noprefix;
  717. tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) -
  718.     (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
  719. if (B_TYPE(tmp_bk->type) != B_KEYDATA)
  720. goto noprefix;
  721. memset(&a, 0, sizeof(a));
  722. a.size = tmp_bk->len;
  723. a.data = tmp_bk->data;
  724. memset(&b, 0, sizeof(b));
  725. b.size = child_bk->len;
  726. b.data = child_bk->data;
  727. nksize = func(dbp, &a, &b);
  728. if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
  729. nbytes = n;
  730. else
  731. noprefix: nksize = child_bk->len;
  732. if (P_FREESPACE(ppage) < nbytes)
  733. return (DB_NEEDSPLIT);
  734. if (space_check)
  735. return (0);
  736. memset(&bi, 0, sizeof(bi));
  737. bi.len = nksize;
  738. B_TSET(bi.type, child_bk->type, 0);
  739. bi.pgno = rchild->pgno;
  740. bi.nrecs = nrecs;
  741. memset(&hdr, 0, sizeof(hdr));
  742. hdr.data = &bi;
  743. hdr.size = SSZA(BINTERNAL, data);
  744. memset(&data, 0, sizeof(data));
  745. data.data = child_bk->data;
  746. data.size = nksize;
  747. if ((ret = __db_pitem(dbc, ppage, off,
  748.     BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
  749. return (ret);
  750. break;
  751. case B_DUPLICATE:
  752. case B_OVERFLOW:
  753. nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
  754. if (P_FREESPACE(ppage) < nbytes)
  755. return (DB_NEEDSPLIT);
  756. if (space_check)
  757. return (0);
  758. memset(&bi, 0, sizeof(bi));
  759. bi.len = BOVERFLOW_SIZE;
  760. B_TSET(bi.type, child_bk->type, 0);
  761. bi.pgno = rchild->pgno;
  762. bi.nrecs = nrecs;
  763. memset(&hdr, 0, sizeof(hdr));
  764. hdr.data = &bi;
  765. hdr.size = SSZA(BINTERNAL, data);
  766. memset(&data, 0, sizeof(data));
  767. data.data = child_bk;
  768. data.size = BOVERFLOW_SIZE;
  769. if ((ret = __db_pitem(dbc, ppage, off,
  770.     BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
  771. return (ret);
  772. /* Increment the overflow ref count. */
  773. if (B_TYPE(child_bk->type) == B_OVERFLOW)
  774. if ((ret = __db_ovref(dbc,
  775.     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
  776. return (ret);
  777. break;
  778. default:
  779. return (__db_pgfmt(dbp, rchild->pgno));
  780. }
  781. break;
  782. case P_IRECNO:
  783. case P_LRECNO:
  784. nbytes = RINTERNAL_PSIZE;
  785. if (P_FREESPACE(ppage) < nbytes)
  786. return (DB_NEEDSPLIT);
  787. if (space_check)
  788. return (0);
  789. /* Add a new record for the right page. */
  790. memset(&hdr, 0, sizeof(hdr));
  791. hdr.data = &ri;
  792. hdr.size = RINTERNAL_SIZE;
  793. ri.pgno = rchild->pgno;
  794. ri.nrecs = nrecs;
  795. if ((ret = __db_pitem(dbc,
  796.     ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
  797. return (ret);
  798. break;
  799. default:
  800. return (__db_pgfmt(dbp, rchild->pgno));
  801. }
  802. /*
  803.  * If a Recno or Btree with record numbers AM page, or an off-page
  804.  * duplicates tree, adjust the parent page's left page record count.
  805.  */
  806. if (F_ISSET(cp, C_RECNUM)) {
  807. /* Log the change. */
  808. if (DB_LOGGING(dbc) &&
  809.     (ret = __bam_cadjust_log(dbp->dbenv, dbc->txn,
  810.     &LSN(ppage), 0, dbp->log_fileid, PGNO(ppage),
  811.     &LSN(ppage), parent->indx, -(int32_t)nrecs, 0)) != 0)
  812. return (ret);
  813. /* Update the left page count. */
  814. if (dbc->dbtype == DB_RECNO)
  815. GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
  816. else
  817. GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
  818. }
  819. return (0);
  820. }
  821. /*
  822.  * __bam_psplit --
  823.  * Do the real work of splitting the page.
  824.  */
  825. static int
  826. __bam_psplit(dbc, cp, lp, rp, splitret)
  827. DBC *dbc;
  828. EPG *cp;
  829. PAGE *lp, *rp;
  830. db_indx_t *splitret;
  831. {
  832. DB *dbp;
  833. PAGE *pp;
  834. db_indx_t half, nbytes, off, splitp, top;
  835. int adjust, cnt, iflag, isbigkey, ret;
  836. dbp = dbc->dbp;
  837. pp = cp->page;
  838. adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
  839. /*
  840.  * If we're splitting the first (last) page on a level because we're
  841.  * inserting (appending) a key to it, it's likely that the data is
  842.  * sorted.  Moving a single item to the new page is less work and can
  843.  * push the fill factor higher than normal.  If we're wrong it's not
  844.  * a big deal, we'll just do the split the right way next time.
  845.  */
  846. off = 0;
  847. if (NEXT_PGNO(pp) == PGNO_INVALID &&
  848.     ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
  849.     (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
  850. off = NUM_ENT(cp->page) - adjust;
  851. else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
  852. off = adjust;
  853. if (off != 0)
  854. goto sort;
  855. /*
  856.  * Split the data to the left and right pages.  Try not to split on
  857.  * an overflow key.  (Overflow keys on internal pages will slow down
  858.  * searches.)  Refuse to split in the middle of a set of duplicates.
  859.  *
  860.  * First, find the optimum place to split.
  861.  *
  862.  * It's possible to try and split past the last record on the page if
  863.  * there's a very large record at the end of the page.  Make sure this
  864.  * doesn't happen by bounding the check at the next-to-last entry on
  865.  * the page.
  866.  *
  867.  * Note, we try and split half the data present on the page.  This is
  868.  * because another process may have already split the page and left
  869.  * it half empty.  We don't try and skip the split -- we don't know
  870.  * how much space we're going to need on the page, and we may need up
  871.  * to half the page for a big item, so there's no easy test to decide
  872.  * if we need to split or not.  Besides, if two threads are inserting
  873.  * data into the same place in the database, we're probably going to
  874.  * need more space soon anyway.
  875.  */
  876. top = NUM_ENT(pp) - adjust;
  877. half = (dbp->pgsize - HOFFSET(pp)) / 2;
  878. for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
  879. switch (TYPE(pp)) {
  880. case P_IBTREE:
  881. if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
  882. nbytes +=
  883.    BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
  884. else
  885. nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
  886. break;
  887. case P_LBTREE:
  888. if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
  889. nbytes +=
  890.     BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
  891. else
  892. nbytes += BOVERFLOW_SIZE;
  893. ++off;
  894. /* FALLTHROUGH */
  895. case P_LDUP:
  896. case P_LRECNO:
  897. if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
  898. nbytes +=
  899.     BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
  900. else
  901. nbytes += BOVERFLOW_SIZE;
  902. break;
  903. case P_IRECNO:
  904. nbytes += RINTERNAL_SIZE;
  905. break;
  906. default:
  907. return (__db_pgfmt(dbp, pp->pgno));
  908. }
  909. sort: splitp = off;
  910. /*
  911.  * Splitp is either at or just past the optimum split point.  If the
  912.  * tree type is such that we're going to promote a key to an internal
  913.  * page, and our current choice is an overflow key, look for something
  914.  * close by that's smaller.
  915.  */
  916. switch (TYPE(pp)) {
  917. case P_IBTREE:
  918. iflag = 1;
  919. isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
  920. break;
  921. case P_LBTREE:
  922. case P_LDUP:
  923. iflag = 0;
  924. isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
  925. break;
  926. default:
  927. iflag = isbigkey = 0;
  928. }
  929. if (isbigkey)
  930. for (cnt = 1; cnt <= 3; ++cnt) {
  931. off = splitp + cnt * adjust;
  932. if (off < (db_indx_t)NUM_ENT(pp) &&
  933.     ((iflag &&
  934.     B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
  935.     B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
  936. splitp = off;
  937. break;
  938. }
  939. if (splitp <= (db_indx_t)(cnt * adjust))
  940. continue;
  941. off = splitp - cnt * adjust;
  942. if (iflag ?
  943.     B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
  944.     B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
  945. splitp = off;
  946. break;
  947. }
  948. }
  949. /*
  950.  * We can't split in the middle a set of duplicates.  We know that
  951.  * no duplicate set can take up more than about 25% of the page,
  952.  * because that's the point where we push it off onto a duplicate
  953.  * page set.  So, this loop can't be unbounded.
  954.  */
  955. if (TYPE(pp) == P_LBTREE &&
  956.     pp->inp[splitp] == pp->inp[splitp - adjust])
  957. for (cnt = 1;; ++cnt) {
  958. off = splitp + cnt * adjust;
  959. if (off < NUM_ENT(pp) &&
  960.     pp->inp[splitp] != pp->inp[off]) {
  961. splitp = off;
  962. break;
  963. }
  964. if (splitp <= (db_indx_t)(cnt * adjust))
  965. continue;
  966. off = splitp - cnt * adjust;
  967. if (pp->inp[splitp] != pp->inp[off]) {
  968. splitp = off + adjust;
  969. break;
  970. }
  971. }
  972. /* We're going to split at splitp. */
  973. if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
  974. return (ret);
  975. if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
  976. return (ret);
  977. *splitret = splitp;
  978. return (0);
  979. }
  980. /*
  981.  * __bam_copy --
  982.  * Copy a set of records from one page to another.
  983.  *
  984.  * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
  985.  */
  986. int
  987. __bam_copy(dbp, pp, cp, nxt, stop)
  988. DB *dbp;
  989. PAGE *pp, *cp;
  990. u_int32_t nxt, stop;
  991. {
  992. db_indx_t nbytes, off;
  993. /*
  994.  * Copy the rest of the data to the right page.  Nxt is the next
  995.  * offset placed on the target page.
  996.  */
  997. for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
  998. switch (TYPE(pp)) {
  999. case P_IBTREE:
  1000. if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
  1001. nbytes =
  1002.     BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
  1003. else
  1004. nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
  1005. break;
  1006. case P_LBTREE:
  1007. /*
  1008.  * If we're on a key and it's a duplicate, just copy
  1009.  * the offset.
  1010.  */
  1011. if (off != 0 && (nxt % P_INDX) == 0 &&
  1012.     pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
  1013. cp->inp[off] = cp->inp[off - P_INDX];
  1014. continue;
  1015. }
  1016. /* FALLTHROUGH */
  1017. case P_LDUP:
  1018. case P_LRECNO:
  1019. if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
  1020. nbytes =
  1021.     BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
  1022. else
  1023. nbytes = BOVERFLOW_SIZE;
  1024. break;
  1025. case P_IRECNO:
  1026. nbytes = RINTERNAL_SIZE;
  1027. break;
  1028. default:
  1029. return (__db_pgfmt(dbp, pp->pgno));
  1030. }
  1031. cp->inp[off] = HOFFSET(cp) -= nbytes;
  1032. memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
  1033. }
  1034. return (0);
  1035. }