1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 1997, 1998
5  *	Sleepycat Software.  All rights reserved.
6  */
7 /*
8  * Copyright (c) 1990, 1993, 1994, 1995, 1996
9  *	Keith Bostic.  All rights reserved.
10  */
11 /*
12  * Copyright (c) 1990, 1993, 1994, 1995
13  *	The Regents of the University of California.  All rights reserved.
14  *
15  * This code is derived from software contributed to Berkeley by
16  * Mike Olson.
17  *
18  * Redistribution and use in source and binary forms, with or without
19  * modification, are permitted provided that the following conditions
20  * are met:
21  * 1. Redistributions of source code must retain the above copyright
22  *    notice, this list of conditions and the following disclaimer.
23  * 2. Redistributions in binary form must reproduce the above copyright
24  *    notice, this list of conditions and the following disclaimer in the
25  *    documentation and/or other materials provided with the distribution.
26  * 3. All advertising materials mentioning features or use of this software
27  *    must display the following acknowledgement:
28  *	This product includes software developed by the University of
29  *	California, Berkeley and its contributors.
30  * 4. Neither the name of the University nor the names of its contributors
31  *    may be used to endorse or promote products derived from this software
32  *    without specific prior written permission.
33  *
34  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
35  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44  * SUCH DAMAGE.
45  */
46 
47 #include "config.h"
48 
49 #ifndef lint
50 static const char sccsid[] = "@(#)db_overflow.c	10.21 (Sleepycat) 9/27/98";
51 #endif /* not lint */
52 
53 #ifndef NO_SYSTEM_INCLUDES
54 #include <sys/types.h>
55 
56 #include <errno.h>
57 #include <string.h>
58 #endif
59 
60 #include "db_int.h"
61 #include "db_page.h"
62 #include "db_am.h"
63 #include "common_ext.h"
64 
65 /*
66  * Big key/data code.
67  *
68  * Big key and data entries are stored on linked lists of pages.  The initial
69  * reference is a structure with the total length of the item and the page
70  * number where it begins.  Each entry in the linked list contains a pointer
71  * to the next page of data, and so on.
72  */
73 
74 /*
75  * __db_goff --
76  *	Get an offpage item.
77  *
78  * PUBLIC: int __db_goff __P((DB *, DBT *,
79  * PUBLIC:     u_int32_t, db_pgno_t, void **, u_int32_t *));
80  */
81 int
__db_goff(dbp,dbt,tlen,pgno,bpp,bpsz)82 __db_goff(dbp, dbt, tlen, pgno, bpp, bpsz)
83 	DB *dbp;
84 	DBT *dbt;
85 	u_int32_t tlen;
86 	db_pgno_t pgno;
87 	void **bpp;
88 	u_int32_t *bpsz;
89 {
90 	PAGE *h;
91 	db_indx_t bytes;
92 	u_int32_t curoff, needed, start;
93 	u_int8_t *p, *src;
94 	int ret;
95 
96 	/*
97 	 * Check if the buffer is big enough; if it is not and we are
98 	 * allowed to malloc space, then we'll malloc it.  If we are
99 	 * not (DB_DBT_USERMEM), then we'll set the dbt and return
100 	 * appropriately.
101 	 */
102 	if (F_ISSET(dbt, DB_DBT_PARTIAL)) {
103 		start = dbt->doff;
104 		needed = dbt->dlen;
105 	} else {
106 		start = 0;
107 		needed = tlen;
108 	}
109 
110 	/* Allocate any necessary memory. */
111 	if (F_ISSET(dbt, DB_DBT_USERMEM)) {
112 		if (needed > dbt->ulen) {
113 			dbt->size = needed;
114 			return (ENOMEM);
115 		}
116 	} else if (F_ISSET(dbt, DB_DBT_MALLOC)) {
117 		if ((ret =
118 		    __os_malloc(needed, dbp->db_malloc, &dbt->data)) != 0)
119 			return (ret);
120 	} else if (*bpsz == 0 || *bpsz < needed) {
121 		if ((ret = __os_realloc(bpp, needed)) != 0)
122 			return (ret);
123 		*bpsz = needed;
124 		dbt->data = *bpp;
125 	} else
126 		dbt->data = *bpp;
127 
128 	/*
129 	 * Step through the linked list of pages, copying the data on each
130 	 * one into the buffer.  Never copy more than the total data length.
131 	 */
132 	dbt->size = needed;
133 	for (curoff = 0, p = dbt->data; pgno != P_INVALID && needed > 0;) {
134 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) {
135 			(void)__db_pgerr(dbp, pgno);
136 			return (ret);
137 		}
138 		/* Check if we need any bytes from this page. */
139 		if (curoff + OV_LEN(h) >= start) {
140 			src = (u_int8_t *)h + P_OVERHEAD;
141 			bytes = OV_LEN(h);
142 			if (start > curoff) {
143 				src += start - curoff;
144 				bytes -= start - curoff;
145 			}
146 			if (bytes > needed)
147 				bytes = needed;
148 			memcpy(p, src, bytes);
149 			p += bytes;
150 			needed -= bytes;
151 		}
152 		curoff += OV_LEN(h);
153 		pgno = h->next_pgno;
154 		memp_fput(dbp->mpf, h, 0);
155 	}
156 	return (0);
157 }
158 
159 /*
160  * __db_poff --
161  *	Put an offpage item.
162  *
163  * PUBLIC: int __db_poff __P((DBC *, const DBT *, db_pgno_t *,
164  * PUBLIC:     int (*)(DBC *, u_int32_t, PAGE **)));
165  */
166 int
__db_poff(dbc,dbt,pgnop,newfunc)167 __db_poff(dbc, dbt, pgnop, newfunc)
168 	DBC *dbc;
169 	const DBT *dbt;
170 	db_pgno_t *pgnop;
171 	int (*newfunc) __P((DBC *, u_int32_t, PAGE **));
172 {
173 	DB *dbp;
174 	PAGE *pagep, *lastp;
175 	DB_LSN new_lsn, null_lsn;
176 	DBT tmp_dbt;
177 	db_indx_t pagespace;
178 	u_int32_t sz;
179 	u_int8_t *p;
180 	int ret;
181 
182 	/*
183 	 * Allocate pages and copy the key/data item into them.  Calculate the
184 	 * number of bytes we get for pages we fill completely with a single
185 	 * item.
186 	 */
187 	dbp = dbc->dbp;
188 	pagespace = P_MAXSPACE(dbp->pgsize);
189 
190 	lastp = NULL;
191 	for (p = dbt->data,
192 	    sz = dbt->size; sz > 0; p += pagespace, sz -= pagespace) {
193 		/*
194 		 * Reduce pagespace so we terminate the loop correctly and
195 		 * don't copy too much data.
196 		 */
197 		if (sz < pagespace)
198 			pagespace = sz;
199 
200 		/*
201 		 * Allocate and initialize a new page and copy all or part of
202 		 * the item onto the page.  If sz is less than pagespace, we
203 		 * have a partial record.
204 		 */
205 		if ((ret = newfunc(dbc, P_OVERFLOW, &pagep)) != 0)
206 			return (ret);
207 		if (DB_LOGGING(dbc)) {
208 			tmp_dbt.data = p;
209 			tmp_dbt.size = pagespace;
210 			ZERO_LSN(null_lsn);
211 			if ((ret = __db_big_log(dbp->dbenv->lg_info, dbc->txn,
212 			    &new_lsn, 0, DB_ADD_BIG, dbp->log_fileid,
213 			    PGNO(pagep), lastp ? PGNO(lastp) : PGNO_INVALID,
214 			    PGNO_INVALID, &tmp_dbt, &LSN(pagep),
215 			    lastp == NULL ? &null_lsn : &LSN(lastp),
216 			    &null_lsn)) != 0)
217 				return (ret);
218 
219 			/* Move lsn onto page. */
220 			if (lastp)
221 				LSN(lastp) = new_lsn;
222 			LSN(pagep) = new_lsn;
223 		}
224 
225 		P_INIT(pagep, dbp->pgsize,
226 		    PGNO(pagep), PGNO_INVALID, PGNO_INVALID, 0, P_OVERFLOW);
227 		OV_LEN(pagep) = pagespace;
228 		OV_REF(pagep) = 1;
229 		memcpy((u_int8_t *)pagep + P_OVERHEAD, p, pagespace);
230 
231 		/*
232 		 * If this is the first entry, update the user's info.
233 		 * Otherwise, update the entry on the last page filled
234 		 * in and release that page.
235 		 */
236 		if (lastp == NULL)
237 			*pgnop = PGNO(pagep);
238 		else {
239 			lastp->next_pgno = PGNO(pagep);
240 			pagep->prev_pgno = PGNO(lastp);
241 			(void)memp_fput(dbp->mpf, lastp, DB_MPOOL_DIRTY);
242 		}
243 		lastp = pagep;
244 	}
245 	(void)memp_fput(dbp->mpf, lastp, DB_MPOOL_DIRTY);
246 	return (0);
247 }
248 
249 /*
250  * __db_ovref --
251  *	Increment/decrement the reference count on an overflow page.
252  *
253  * PUBLIC: int __db_ovref __P((DBC *, db_pgno_t, int32_t));
254  */
255 int
__db_ovref(dbc,pgno,adjust)256 __db_ovref(dbc, pgno, adjust)
257 	DBC *dbc;
258 	db_pgno_t pgno;
259 	int32_t adjust;
260 {
261 	DB *dbp;
262 	PAGE *h;
263 	int ret;
264 
265 	dbp = dbc->dbp;
266 	if ((ret = memp_fget(dbp->mpf, &pgno, 0, &h)) != 0) {
267 		(void)__db_pgerr(dbp, pgno);
268 		return (ret);
269 	}
270 
271 	if (DB_LOGGING(dbc))
272 		if ((ret = __db_ovref_log(dbp->dbenv->lg_info, dbc->txn,
273 		    &LSN(h), 0, dbp->log_fileid, h->pgno, adjust,
274 		    &LSN(h))) != 0)
275 			return (ret);
276 	OV_REF(h) += adjust;
277 
278 	(void)memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY);
279 	return (0);
280 }
281 
282 /*
283  * __db_doff --
284  *	Delete an offpage chain of overflow pages.
285  *
286  * PUBLIC: int __db_doff __P((DBC *, db_pgno_t, int (*)(DBC *, PAGE *)));
287  */
288 int
__db_doff(dbc,pgno,freefunc)289 __db_doff(dbc, pgno, freefunc)
290 	DBC *dbc;
291 	db_pgno_t pgno;
292 	int (*freefunc) __P((DBC *, PAGE *));
293 {
294 	DB *dbp;
295 	PAGE *pagep;
296 	DB_LSN null_lsn;
297 	DBT tmp_dbt;
298 	int ret;
299 
300 	dbp = dbc->dbp;
301 	do {
302 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0) {
303 			(void)__db_pgerr(dbp, pgno);
304 			return (ret);
305 		}
306 
307 		/*
308 		 * If it's an overflow page and it's referenced by more than
309 		 * one key/data item, decrement the reference count and return.
310 		 */
311 		if (TYPE(pagep) == P_OVERFLOW && OV_REF(pagep) > 1) {
312 			(void)memp_fput(dbp->mpf, pagep, 0);
313 			return (__db_ovref(dbc, pgno, -1));
314 		}
315 
316 		if (DB_LOGGING(dbc)) {
317 			tmp_dbt.data = (u_int8_t *)pagep + P_OVERHEAD;
318 			tmp_dbt.size = OV_LEN(pagep);
319 			ZERO_LSN(null_lsn);
320 			if ((ret = __db_big_log(dbp->dbenv->lg_info, dbc->txn,
321 			    &LSN(pagep), 0, DB_REM_BIG, dbp->log_fileid,
322 			    PGNO(pagep), PREV_PGNO(pagep), NEXT_PGNO(pagep),
323 			    &tmp_dbt, &LSN(pagep), &null_lsn, &null_lsn)) != 0)
324 				return (ret);
325 		}
326 		pgno = pagep->next_pgno;
327 		if ((ret = freefunc(dbc, pagep)) != 0)
328 			return (ret);
329 	} while (pgno != PGNO_INVALID);
330 
331 	return (0);
332 }
333 
334 /*
335  * __db_moff --
336  *	Match on overflow pages.
337  *
338  * Given a starting page number and a key, return <0, 0, >0 to indicate if the
339  * key on the page is less than, equal to or greater than the key specified.
340  * We optimize this by doing chunk at a time comparison unless the user has
341  * specified a comparison function.  In this case, we need to materialize
342  * the entire object and call their comparison routine.
343  *
344  * PUBLIC: int __db_moff __P((DB *, const DBT *, db_pgno_t, u_int32_t,
345  * PUBLIC:     int (*)(const DBT *, const DBT *), int *));
346  */
347 int
__db_moff(dbp,dbt,pgno,tlen,cmpfunc,cmpp)348 __db_moff(dbp, dbt, pgno, tlen, cmpfunc, cmpp)
349 	DB *dbp;
350 	const DBT *dbt;
351 	db_pgno_t pgno;
352 	u_int32_t tlen;
353 	int (*cmpfunc) __P((const DBT *, const DBT *)), *cmpp;
354 {
355 	PAGE *pagep;
356 	DBT local_dbt;
357 	void *buf;
358 	u_int32_t bufsize, cmp_bytes, key_left;
359 	u_int8_t *p1, *p2;
360 	int ret;
361 
362 	/*
363 	 * If there is a user-specified comparison function, build a
364 	 * contiguous copy of the key, and call it.
365 	 */
366 	if (cmpfunc != NULL) {
367 		memset(&local_dbt, 0, sizeof(local_dbt));
368 		buf = NULL;
369 		bufsize = 0;
370 
371 		if ((ret = __db_goff(dbp,
372 		    &local_dbt, tlen, pgno, &buf, &bufsize)) != 0)
373 			return (ret);
374 		*cmpp = cmpfunc(&local_dbt, dbt);
375 		__os_free(buf, bufsize);
376 		return (0);
377 	}
378 
379 	/* While there are both keys to compare. */
380 	for (*cmpp = 0, p1 = dbt->data,
381 	    key_left = dbt->size; key_left > 0 && pgno != PGNO_INVALID;) {
382 		if ((ret = memp_fget(dbp->mpf, &pgno, 0, &pagep)) != 0)
383 			return (ret);
384 
385 		cmp_bytes = OV_LEN(pagep) < key_left ? OV_LEN(pagep) : key_left;
386 		key_left -= cmp_bytes;
387 		for (p2 =
388 		    (u_int8_t *)pagep + P_OVERHEAD; cmp_bytes-- > 0; ++p1, ++p2)
389 			if (*p1 != *p2) {
390 				*cmpp = (long)*p1 - (long)*p2;
391 				break;
392 			}
393 		pgno = NEXT_PGNO(pagep);
394 		if ((ret = memp_fput(dbp->mpf, pagep, 0)) != 0)
395 			return (ret);
396 		if (*cmpp != 0)
397 			return (0);
398 	}
399 	if (key_left > 0)		/* DBT is longer than page key. */
400 		*cmpp = -1;
401 	else if (pgno != PGNO_INVALID)	/* DBT is shorter than page key. */
402 		*cmpp = 1;
403 	else
404 		*cmpp = 0;
405 
406 	return (0);
407 }
408