/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996, 1997, 1998
 *	Sleepycat Software.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *	This product includes software developed by the University of
 *	California, Berkeley and its contributors.
 * 4. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include "config.h"

#ifndef lint
static const char sccsid[] = "@(#)bt_split.c	10.33 (Sleepycat) 10/13/98";
#endif /* not lint */

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>

#include <errno.h>
#include <limits.h>
#include <string.h>
#endif

#include "db_int.h"
#include "db_page.h"
#include "btree.h"

static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
static int __bam_page __P((DBC *, EPG *, EPG *));
static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *));
static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
static int __bam_root __P((DBC *, EPG *));
static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));

/*
 * __bam_split --
 *	Split a page.
 *
 * PUBLIC: int __bam_split __P((DBC *, void *));
 */
int
__bam_split(dbc, arg)
	DBC *dbc;
	void *arg;
{
	BTREE *t;
	CURSOR *cp;
	DB *dbp;
	enum { UP, DOWN } dir;
	int exact, level, ret;

	dbp = dbc->dbp;
	cp = dbc->internal;

	/*
	 * The locking protocol we use to avoid deadlock is to acquire locks
	 * by walking down the tree, but we do it as lazily as possible,
	 * locking the root only as a last resort.  We expect all stack pages
	 * to have been discarded before we're called; we discard all
	 * short-term locks.
	 *
	 * When __bam_split is first called, we know that a leaf page was too
	 * full for an insert.  We don't know what leaf page it was, but we
	 * have the key/recno that caused the problem.  We call XX_search to
	 * reacquire the leaf page, but this time get both the leaf page and
	 * its parent, locked.  We then split the leaf page and see if the new
	 * internal key will fit into the parent page.  If it will, we're done.
	 *
	 * If it won't, we discard our current locks and repeat the process,
	 * only this time acquiring the parent page and its parent, locked.
	 * This process repeats until we succeed in the split, splitting the
	 * root page as the final resort.  The entire process then repeats,
	 * as necessary, until we split a leaf page.
	 *
	 * XXX
	 * A traditional method of speeding this up is to maintain a stack of
	 * the pages traversed in the original search.  You can detect if the
	 * stack is correct by storing the page's LSN when it was searched and
	 * comparing that LSN with the current one when it's locked during the
	 * split.  This would be an easy change for this code, but I have no
	 * numbers that indicate it's worthwhile.
	 */
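	/*
	 * In other words, the loop below moves the target one level UP after
	 * each failed attempt and one level back DOWN after each successful
	 * internal split, until the leaf split finally succeeds, e.g.:
	 *
	 *	leaf (fails) -> parent (fails) -> root (splits) ->
	 *	    parent (splits) -> leaf (splits, done)
	 */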
	t = dbp->internal;
	for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
		/*
		 * Acquire a page and its parent, locked.
		 */
		if ((ret = (dbp->type == DB_BTREE ?
		    __bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
		    __bam_rsearch(dbc,
		        (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
			return (ret);

		/*
		 * Split the page if it still needs it (it's possible another
		 * thread of control has already split the page).  If we are
		 * guaranteed that two items will fit on the page, the split
		 * is no longer necessary.
		 */
		if (t->bt_ovflsize * 2 <=
		    (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
			(void)__bam_stkrel(dbc, 1);
			return (0);
		}
		ret = cp->csp[0].page->pgno == PGNO_ROOT ?
		    __bam_root(dbc, &cp->csp[0]) :
		    __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
		BT_STK_CLR(cp);

		switch (ret) {
		case 0:
			/* Once we've split the leaf page, we're done. */
			if (level == LEAFLEVEL)
				return (0);

			/* Switch directions. */
			if (dir == UP)
				dir = DOWN;
			break;
		case DB_NEEDSPLIT:
			/*
			 * It's possible to fail to split repeatedly, as other
			 * threads may be modifying the tree, or the page usage
			 * is sufficiently bad that we don't get enough space
			 * the first time.
			 */
			if (dir == DOWN)
				dir = UP;
			break;
		default:
			return (ret);
		}
	}
	/* NOTREACHED */
}

/*
 * __bam_root --
 *	Split the root page of a btree.
 */
static int
__bam_root(dbc, cp)
	DBC *dbc;
	EPG *cp;
{
	DB *dbp;
	PAGE *lp, *rp;
	db_indx_t split;
	int ret;

	dbp = dbc->dbp;

	/* Yeah, right. */
	if (cp->page->level >= MAXBTREELEVEL) {
		ret = ENOSPC;
		goto err;
	}

	/* Create new left and right pages for the split. */
	lp = rp = NULL;
	if ((ret = __bam_new(dbc, TYPE(cp->page), &lp)) != 0 ||
	    (ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, lp->pgno,
	    PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
	    cp->page->level, TYPE(cp->page));
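	/*
	 * Only leaf pages are chained together through prev/next page
	 * pointers, which is why the sibling links above are set to
	 * PGNO_INVALID when the page being split is an internal page.
	 */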

	/* Split the page. */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbc)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
		    &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
		    PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn,
		    &__a)) != 0)
			goto err;
		LSN(lp) = LSN(rp) = LSN(cp->page);
	}

	/* Clean up the new root page. */
	if ((ret = (dbp->type == DB_RECNO ?
	    __ram_root(dbc, cp->page, lp, rp) :
	    __bam_broot(dbc, cp->page, lp, rp))) != 0)
		goto err;

	/* Adjust any cursors.  Do it last so we don't have to undo it. */
	__bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1);

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, cp->lock);
	(void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);

	return (0);

err:	if (lp != NULL)
		(void)__bam_free(dbc, lp);
	if (rp != NULL)
		(void)__bam_free(dbc, rp);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	(void)__BT_TLPUT(dbc, cp->lock);
	return (ret);
}

/*
 * __bam_page --
 *	Split the non-root page of a btree.
 */
static int
__bam_page(dbc, pp, cp)
	DBC *dbc;
	EPG *pp, *cp;
{
	DB *dbp;
	DB_LOCK tplock;
	PAGE *lp, *rp, *tp;
	db_indx_t split;
	int ret;

	dbp = dbc->dbp;
	lp = rp = tp = NULL;
	ret = -1;

	/* Create new right page for the split. */
	if ((ret = __bam_new(dbc, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->next_pgno,
	    cp->page->level, TYPE(cp->page));

	/* Create new left page for the split. */
	if ((ret = __os_malloc(dbp->pgsize, NULL, &lp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->prev_pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	ZERO_LSN(lp->lsn);

	/*
	 * Split right.
	 *
	 * Only the indices are sorted on the page, i.e., the key/data pairs
	 * aren't, so it's simpler to copy the data from the split page onto
	 * two new pages instead of copying half the data to the right page
	 * and compacting the left page in place.  Since the left page can't
	 * change, we swap the original and the allocated left page after the
	 * split.
	 */
	if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
		goto err;
	/*
	 * Fix up the previous pointer of any leaf page following the split
	 * page.
	 *
	 * !!!
	 * There are interesting deadlock situations here as we write-lock a
	 * page that's not in our direct ancestry.  Consider a cursor walking
	 * through the leaf pages, that has the previous page read-locked and
	 * is waiting on a lock for the page we just split.  It will deadlock
	 * here.  If this ever becomes a problem, we can fail the split;
	 * that's harmless, as the split will succeed after the cursor passes
	 * through the page we're splitting.
	 */
	if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) {
		if ((ret = __bam_lget(dbc,
		    0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0)
			goto err;
		if ((ret = memp_fget(dbp->mpf, &rp->next_pgno, 0, &tp)) != 0)
			goto err;
	}

	/* Insert the new pages into the parent page. */
	if ((ret = __bam_pinsert(dbc, pp, lp, rp)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbc)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		if (tp == NULL)
			ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbc->txn,
		    &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page),
		    &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp),
		    tp == NULL ? &__lsn : &LSN(tp), &__a)) != 0)
			goto err;

		LSN(lp) = LSN(rp) = LSN(cp->page);
		if (tp != NULL)
			LSN(tp) = LSN(cp->page);
	}

	/* Copy the allocated page into place. */
	memcpy(cp->page, lp, LOFFSET(lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
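	/*
	 * The two copies above move the page header plus the index array
	 * (everything below LOFFSET) and the item space (everything from
	 * HOFFSET to the end of the page), skipping the unused gap in the
	 * middle of the page.
	 */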
	__os_free(lp, dbp->pgsize);
	lp = NULL;

	/* Finish the next-page link. */
	if (tp != NULL)
		tp->prev_pgno = rp->pgno;

	/* Adjust any cursors.  Do so last so we don't have to undo it. */
	__bam_ca_split(dbp, cp->page->pgno, cp->page->pgno, rp->pgno, split, 0);

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbc, cp->lock);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY);
		(void)__BT_TLPUT(dbc, tplock);
	}
	return (0);

err:	if (lp != NULL)
		__os_free(lp, dbp->pgsize);
	if (rp != NULL)
		(void)__bam_free(dbc, rp);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, 0);
		if (ret == DB_NEEDSPLIT)
			(void)__BT_LPUT(dbc, tplock);
		else
			(void)__BT_TLPUT(dbc, tplock);
	}
	(void)memp_fput(dbp->mpf, pp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__BT_LPUT(dbc, pp->lock);
	else
		(void)__BT_TLPUT(dbc, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	if (ret == DB_NEEDSPLIT)
		(void)__BT_LPUT(dbc, cp->lock);
	else
		(void)__BT_TLPUT(dbc, cp->lock);
	return (ret);
}

/*
 * __bam_broot --
 *	Fix up the btree root page after it has been split.
 */
static int
__bam_broot(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk;
	DB *dbp;
	DBT hdr, data;
	int ret;

	dbp = dbc->dbp;

	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
	 */
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

	memset(&data, 0, sizeof(data));
	memset(&hdr, 0, sizeof(hdr));

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * level of the tree is never used, so it doesn't need to be filled in.
	 */
	memset(&bi, 0, sizeof(bi));
	bi.len = 0;
	B_TSET(bi.type, B_KEYDATA, 0);
	bi.pgno = lp->pgno;
	if (F_ISSET(dbp, DB_BT_RECNUM)) {
		bi.nrecs = __bam_total(lp);
		RE_NREC_SET(rootp, bi.nrecs);
	}
	hdr.data = &bi;
	hdr.size = SSZA(BINTERNAL, data);
	if ((ret =
	    __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
		return (ret);

	switch (TYPE(rp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(rp, 0);

		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rp->pgno;
		if (F_ISSET(dbp, DB_BT_RECNUM)) {
			bi.nrecs = __bam_total(rp);
			RE_NREC_ADJ(rootp, bi.nrecs);
		}
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, rootp, 1,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(rp, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk->data;
			data.size = child_bk->len;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbc, rootp, 1,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = __db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rp->pgno));
		}
		break;
	default:
		return (__db_pgfmt(dbp, rp->pgno));
	}
	return (0);
}

/*
 * __ram_root --
 *	Fix up the recno root page after it has been split.
 */
static int
__ram_root(dbc, rootp, lp, rp)
	DBC *dbc;
	PAGE *rootp, *lp, *rp;
{
	DB *dbp;
	DBT hdr;
	RINTERNAL ri;
	int ret;

	dbp = dbc->dbp;

	/* Initialize the page. */
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);

	/* Initialize the header. */
	memset(&hdr, 0, sizeof(hdr));
	hdr.data = &ri;
	hdr.size = RINTERNAL_SIZE;

	/* Insert the left and right keys, set the header information. */
	ri.pgno = lp->pgno;
	ri.nrecs = __bam_total(lp);
	if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_SET(rootp, ri.nrecs);
	ri.pgno = rp->pgno;
	ri.nrecs = __bam_total(rp);
	if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_ADJ(rootp, ri.nrecs);
	return (0);
}

/*
 * __bam_pinsert --
 *	Insert a new key into a parent page, completing the split.
 */
static int
__bam_pinsert(dbc, parent, lchild, rchild)
	DBC *dbc;
	EPG *parent;
	PAGE *lchild, *rchild;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BTREE *t;
	DB *dbp;
	DBT a, b, hdr, data;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	u_int32_t n, nbytes, nksize;
	int ret;

	dbp = dbc->dbp;
	t = dbp->internal;
	ppage = parent->page;

	/* If handling record numbers, count records split to the right page. */
	nrecs = dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM) ?
	    __bam_total(rchild) : 0;

	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because we split
	 * to the right.
	 *
	 * XXX
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
	off = parent->indx + O_INDX;

	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
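	/*
	 * For example, assuming a byte-wise prefix function: if the last key
	 * on the left page is "apple" and the first key on the right page is
	 * "orange", a single byte ("o") is enough to direct searches, so
	 * only that byte need be stored in the parent.
	 */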
	switch (TYPE(rchild)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(rchild, 0);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		memset(&bi, 0, sizeof(bi));
		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rchild->pgno;
		bi.nrecs = nrecs;
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		memset(&data, 0, sizeof(data));
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbc, ppage, off,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW)
			if ((ret = __db_ovref(dbc,
			    ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
				return (ret);
		break;
	case P_LBTREE:
		child_bk = GET_BKEYDATA(rchild, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			nbytes = BINTERNAL_PSIZE(child_bk->len);
			nksize = child_bk->len;
			if (t->bt_prefix == NULL)
				goto noprefix;
			if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
				goto noprefix;
			tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) - P_INDX);
			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
				goto noprefix;
			memset(&a, 0, sizeof(a));
			a.size = tmp_bk->len;
			a.data = tmp_bk->data;
			memset(&b, 0, sizeof(b));
			b.size = child_bk->len;
			b.data = child_bk->data;
			nksize = t->bt_prefix(&a, &b);
			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
				nbytes = n;
			else
noprefix:			nksize = child_bk->len;

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			memset(&bi, 0, sizeof(bi));
			bi.len = nksize;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk->data;
			data.size = nksize;
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			memset(&bi, 0, sizeof(bi));
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbc, ppage, off,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW)
				if ((ret = __db_ovref(dbc,
				    ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
					return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rchild->pgno));
		}
		break;
	case P_IRECNO:
	case P_LRECNO:
		nbytes = RINTERNAL_PSIZE;

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &ri;
		hdr.size = RINTERNAL_SIZE;
		ri.pgno = rchild->pgno;
		ri.nrecs = nrecs;
		if ((ret = __db_pitem(dbc,
		    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
			return (ret);
		break;
	default:
		return (__db_pgfmt(dbp, rchild->pgno));
	}

	/* Adjust the parent page's left page record count. */
	if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
		/* Log the change. */
		if (DB_LOGGING(dbc) &&
		    (ret = __bam_cadjust_log(dbp->dbenv->lg_info,
		    dbc->txn, &LSN(ppage), 0, dbp->log_fileid,
		    PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx,
		    -(int32_t)nrecs, (int32_t)0)) != 0)
			return (ret);

		/* Update the left page count. */
		if (dbp->type == DB_RECNO)
			GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
		else
			GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
	}

	return (0);
}

/*
 * __bam_psplit --
 *	Do the real work of splitting the page.
 */
static int
__bam_psplit(dbc, cp, lp, rp, splitret)
	DBC *dbc;
	EPG *cp;
	PAGE *lp, *rp;
	db_indx_t *splitret;
{
	DB *dbp;
	PAGE *pp;
	db_indx_t half, nbytes, off, splitp, top;
	int adjust, cnt, isbigkey, ret;

	dbp = dbc->dbp;
	pp = cp->page;
	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;

	/*
	 * If we're splitting the first (last) page on a level because we're
	 * inserting (appending) a key to it, it's likely that the data is
	 * sorted.  Moving a single item to the new page is less work and can
	 * push the fill factor higher than normal.  If we're wrong it's not
	 * a big deal, we'll just do the split the right way next time.
	 */
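	/*
	 * For example, an application appending keys in sorted order always
	 * splits the last page on the leaf level; moving just one item to
	 * the new page leaves the old page nearly full instead of half
	 * empty.
	 */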
	off = 0;
	if (NEXT_PGNO(pp) == PGNO_INVALID &&
	    ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
	    (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
		off = NUM_ENT(cp->page) - adjust;
	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
		off = adjust;

	if (off != 0)
		goto sort;

	/*
	 * Split the data to the left and right pages.  Try not to split on
	 * an overflow key.  (Overflow keys on internal pages will slow down
	 * searches.)  Refuse to split in the middle of a set of duplicates.
	 *
	 * First, find the optimum place to split.
	 *
	 * It's possible to try and split past the last record on the page if
	 * there's a very large record at the end of the page.  Make sure this
	 * doesn't happen by bounding the check at the next-to-last entry on
	 * the page.
	 *
	 * Note, we try and split half the data present on the page.  This is
	 * because another process may have already split the page and left
	 * it half empty.  We don't try and skip the split -- we don't know
	 * how much space we're going to need on the page, and we may need up
	 * to half the page for a big item, so there's no easy test to decide
	 * if we need to split or not.  Besides, if two threads are inserting
	 * data into the same place in the database, we're probably going to
	 * need more space soon anyway.
	 */
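	/*
	 * Concretely: on an 8KB page holding 6KB of items, half is 3KB, and
	 * the loop below accumulates per-entry sizes until it has counted
	 * roughly 3KB worth of items, stopping no later than the
	 * next-to-last entry.
	 */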
	top = NUM_ENT(pp) - adjust;
	half = (dbp->pgsize - HOFFSET(pp)) / 2;
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
			else
				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;

			++off;
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes += RINTERNAL_SIZE;
			break;
		case P_LRECNO:
			nbytes += BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
sort:	splitp = off;

	/*
	 * Splitp is either at or just past the optimum split point.  If
	 * it's a big key, try and find something close by that's not.
	 */
	if (TYPE(pp) == P_IBTREE)
		isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
	else if (TYPE(pp) == P_LBTREE)
		isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
	else
		isbigkey = 0;
	if (isbigkey)
		for (cnt = 1; cnt <= 3; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < (db_indx_t)NUM_ENT(pp) &&
			    ((TYPE(pp) == P_IBTREE &&
			    B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (TYPE(pp) == P_IBTREE ?
			    B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
				splitp = off;
				break;
			}
		}
	/*
	 * We can't split in the middle of a set of duplicates.  We know that
	 * no duplicate set can take up more than about 25% of the page,
	 * because that's the point where we push it off onto a duplicate
	 * page set.  So, this loop can't be unbounded.
	 */
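	/*
	 * On a P_LBTREE page, all entries in a duplicate set reference the
	 * same on-page key item, so equal inp[] offsets identify members of
	 * the same set -- the same trick __bam_copy relies on below.
	 */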
	if (F_ISSET(dbp, DB_AM_DUP) && TYPE(pp) == P_LBTREE &&
	    pp->inp[splitp] == pp->inp[splitp - adjust])
		for (cnt = 1;; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < NUM_ENT(pp) &&
			    pp->inp[splitp] != pp->inp[off]) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (pp->inp[splitp] != pp->inp[off]) {
				splitp = off + adjust;
				break;
			}
		}

	/* We're going to split at splitp. */
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);

	*splitret = splitp;
	return (0);
}

/*
 * __bam_copy --
 *	Copy a set of records from one page to another.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
	DB *dbp;
	PAGE *pp, *cp;
	u_int32_t nxt, stop;
{
	db_indx_t nbytes, off;
	/*
	 * Copy the records from the source page to the target page.  Nxt is
	 * the next entry to copy from the source page; off is the next index
	 * slot to fill on the target page.
	 */
	for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
			else
				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/*
			 * If we're on a key and it's a duplicate, just copy
			 * the offset.
			 */
			if (off != 0 && (nxt % P_INDX) == 0 &&
			    pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
				cp->inp[off] = cp->inp[off - P_INDX];
				continue;
			}
			/* FALLTHROUGH */
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
			else
				nbytes = BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes = RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
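		/*
		 * Items grow down from the end of the page: lower HOFFSET by
		 * the item size, then record the new offset in the target
		 * page's index array before copying the item itself.
		 */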
		cp->inp[off] = HOFFSET(cp) -= nbytes;
		memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
	}
	return (0);
}