1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  * Copyright (c) 2016 by Delphix. All rights reserved.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * file_object.c - enter objects into and load them from the backend
31  *
32  * The primary entry points in this layer are object_create(),
33  * object_create_pg(), object_delete(), and object_fill_children().  They each
34  * take an rc_node_t and use the functions in the object_info_t info array for
35  * the node's type.
36  */
37 
38 #include <assert.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #include <strings.h>
44 
45 #include "configd.h"
46 #include "repcache_protocol.h"
47 
48 typedef struct child_info {
49 	rc_node_t	*ci_parent;
50 	backend_tx_t	*ci_tx;			/* only for properties */
51 	rc_node_lookup_t ci_base_nl;
52 } child_info_t;
53 
/*
 * Forward declarations for the deferred-deletion machinery.  A delete_cb_func
 * is handed the overall deletion state and the entry it should process, and
 * returns a REP_PROTOCOL_* status.
 */
typedef struct delete_info delete_info_t;
typedef struct delete_stack delete_stack_t;
typedef struct delete_ent delete_ent_t;

typedef int	delete_cb_func(delete_info_t *, const delete_ent_t *);
59 
60 struct delete_ent {
61 	delete_cb_func	*de_cb;		/* callback */
62 	uint32_t	de_backend;
63 	uint32_t	de_id;
64 	uint32_t	de_gen;		/* only for property groups */
65 };
66 
67 struct delete_stack {
68 	struct delete_stack *ds_next;
69 	uint32_t	ds_size;	/* number of elements */
70 	uint32_t	ds_cur;		/* current offset */
71 	delete_ent_t	ds_buf[1];	/* actually ds_size */
72 };
73 #define	DELETE_STACK_SIZE(x)	offsetof(delete_stack_t, ds_buf[(x)])
74 
75 struct delete_info {
76 	backend_tx_t	*di_tx;
77 	backend_tx_t	*di_np_tx;
78 	delete_stack_t	*di_stack;
79 	delete_stack_t	*di_free;
80 };
81 
82 typedef struct object_info {
83 	uint32_t	obj_type;
84 	enum id_space	obj_id_space;
85 
86 	int (*obj_fill_children)(rc_node_t *);
87 	int (*obj_setup_child_info)(rc_node_t *, uint32_t, child_info_t *);
88 	int (*obj_query_child)(backend_query_t *, rc_node_lookup_t *,
89 	    const char *);
90 	int (*obj_insert_child)(backend_tx_t *, rc_node_lookup_t *,
91 	    const char *);
92 	int (*obj_insert_pg_child)(backend_tx_t *, rc_node_lookup_t *,
93 	    const char *, const char *, uint32_t, uint32_t);
94 	int (*obj_delete_start)(rc_node_t *, delete_info_t *);
95 } object_info_t;
96 
/*
 * Convert the string str to a uint32_t, storing it in *output.  If
 * uu_strtouint() rejects the string, the backend panics with a message
 * naming both the offending text and fieldname.
 */
static void
string_to_id(const char *str, uint32_t *output, const char *fieldname)
{
	int rc = uu_strtouint(str, output, sizeof (*output), 0, 0, 0);

	if (rc != -1)
		return;

	backend_panic("invalid integer \"%s\" in field \"%s\"",
	    str, fieldname);
}
104 
105 #define	NUM_NEEDED	50
106 
107 static int
delete_stack_push(delete_info_t * dip,uint32_t be,delete_cb_func * cb,uint32_t id,uint32_t gen)108 delete_stack_push(delete_info_t *dip, uint32_t be, delete_cb_func *cb,
109     uint32_t id, uint32_t gen)
110 {
111 	delete_stack_t *cur = dip->di_stack;
112 	delete_ent_t *ent;
113 
114 	if (cur == NULL || cur->ds_cur == cur->ds_size) {
115 		delete_stack_t *new = dip->di_free;
116 		dip->di_free = NULL;
117 		if (new == NULL) {
118 			new = uu_zalloc(DELETE_STACK_SIZE(NUM_NEEDED));
119 			if (new == NULL)
120 				return (REP_PROTOCOL_FAIL_NO_RESOURCES);
121 			new->ds_size = NUM_NEEDED;
122 		}
123 		new->ds_cur = 0;
124 		new->ds_next = dip->di_stack;
125 		dip->di_stack = new;
126 		cur = new;
127 	}
128 	assert(cur->ds_cur < cur->ds_size);
129 	ent = &cur->ds_buf[cur->ds_cur++];
130 
131 	ent->de_backend = be;
132 	ent->de_cb = cb;
133 	ent->de_id = id;
134 	ent->de_gen = gen;
135 
136 	return (REP_PROTOCOL_SUCCESS);
137 }
138 
139 static int
delete_stack_pop(delete_info_t * dip,delete_ent_t * out)140 delete_stack_pop(delete_info_t *dip, delete_ent_t *out)
141 {
142 	delete_stack_t *cur = dip->di_stack;
143 	delete_ent_t *ent;
144 
145 	if (cur == NULL)
146 		return (0);
147 	assert(cur->ds_cur > 0 && cur->ds_cur <= cur->ds_size);
148 	ent = &cur->ds_buf[--cur->ds_cur];
149 	if (cur->ds_cur == 0) {
150 		dip->di_stack = cur->ds_next;
151 		cur->ds_next = NULL;
152 
153 		if (dip->di_free != NULL)
154 			uu_free(dip->di_free);
155 		dip->di_free = cur;
156 	}
157 	if (ent == NULL)
158 		return (0);
159 
160 	*out = *ent;
161 	return (1);
162 }
163 
164 static void
delete_stack_cleanup(delete_info_t * dip)165 delete_stack_cleanup(delete_info_t *dip)
166 {
167 	delete_stack_t *cur;
168 	while ((cur = dip->di_stack) != NULL) {
169 		dip->di_stack = cur->ds_next;
170 
171 		uu_free(cur);
172 	}
173 
174 	if ((cur = dip->di_free) != NULL) {
175 		assert(cur->ds_next == NULL);	/* should only be one */
176 		uu_free(cur);
177 		dip->di_free = NULL;
178 	}
179 }
180 
181 struct delete_cb_info {
182 	delete_info_t	*dci_dip;
183 	uint32_t	dci_be;
184 	delete_cb_func	*dci_cb;
185 	int		dci_result;
186 };
187 
188 /*ARGSUSED*/
189 static int
push_delete_callback(void * data,int columns,char ** vals,char ** names)190 push_delete_callback(void *data, int columns, char **vals, char **names)
191 {
192 	struct delete_cb_info *info = data;
193 
194 	const char *id_str = *vals++;
195 	const char *gen_str = *vals++;
196 
197 	uint32_t id;
198 	uint32_t gen;
199 
200 	assert(columns == 2);
201 
202 	string_to_id(id_str, &id, "id");
203 	string_to_id(gen_str, &gen, "gen_id");
204 
205 	info->dci_result = delete_stack_push(info->dci_dip, info->dci_be,
206 	    info->dci_cb, id, gen);
207 
208 	if (info->dci_result != REP_PROTOCOL_SUCCESS)
209 		return (BACKEND_CALLBACK_ABORT);
210 	return (BACKEND_CALLBACK_CONTINUE);
211 }
212 
213 static int
value_delete(delete_info_t * dip,const delete_ent_t * ent)214 value_delete(delete_info_t *dip, const delete_ent_t *ent)
215 {
216 	uint32_t be = ent->de_backend;
217 	int r;
218 
219 	backend_query_t *q;
220 
221 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
222 	    dip->di_np_tx;
223 
224 	q = backend_query_alloc();
225 
226 	backend_query_add(q,
227 	    "SELECT 1 FROM prop_lnk_tbl WHERE (lnk_val_id = %d); "
228 	    "DELETE FROM value_tbl WHERE (value_id = %d); ",
229 	    ent->de_id, ent->de_id);
230 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
231 	backend_query_free(q);
232 	if (r == REP_PROTOCOL_DONE)
233 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
234 	return (r);
235 }
236 
237 static int
pg_lnk_tbl_delete(delete_info_t * dip,const delete_ent_t * ent)238 pg_lnk_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
239 {
240 	struct delete_cb_info info;
241 	uint32_t be = ent->de_backend;
242 	int r;
243 
244 	backend_query_t *q;
245 
246 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
247 	    dip->di_np_tx;
248 
249 	/*
250 	 * For non-persistent backends, we could only have one parent, and
251 	 * it's already been deleted.
252 	 *
253 	 * For normal backends, we need to check to see if we're in
254 	 * a snapshot or are the active generation for the property
255 	 * group.  If we are, there's nothing to be done.
256 	 */
257 	if (be == BACKEND_TYPE_NORMAL) {
258 		q = backend_query_alloc();
259 		backend_query_add(q,
260 		    "SELECT 1 "
261 		    "FROM pg_tbl "
262 		    "WHERE (pg_id = %d AND pg_gen_id = %d); "
263 		    "SELECT 1 "
264 		    "FROM snaplevel_lnk_tbl "
265 		    "WHERE (snaplvl_pg_id = %d AND snaplvl_gen_id = %d);",
266 		    ent->de_id, ent->de_gen,
267 		    ent->de_id, ent->de_gen);
268 		r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
269 		backend_query_free(q);
270 
271 		if (r == REP_PROTOCOL_DONE)
272 			return (REP_PROTOCOL_SUCCESS);	/* still in use */
273 	}
274 
275 	info.dci_dip = dip;
276 	info.dci_be =  be;
277 	info.dci_cb = &value_delete;
278 	info.dci_result = REP_PROTOCOL_SUCCESS;
279 
280 	q = backend_query_alloc();
281 	backend_query_add(q,
282 	    "SELECT DISTINCT lnk_val_id, 0 FROM prop_lnk_tbl "
283 	    "WHERE "
284 	    "    (lnk_pg_id = %d AND lnk_gen_id = %d AND lnk_val_id NOTNULL); "
285 	    "DELETE FROM prop_lnk_tbl "
286 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
287 	    ent->de_id, ent->de_gen, ent->de_id, ent->de_gen);
288 
289 	r = backend_tx_run(tx, q, push_delete_callback, &info);
290 	backend_query_free(q);
291 
292 	if (r == REP_PROTOCOL_DONE) {
293 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
294 		return (info.dci_result);
295 	}
296 	return (r);
297 }
298 
299 static int
propertygrp_delete(delete_info_t * dip,const delete_ent_t * ent)300 propertygrp_delete(delete_info_t *dip, const delete_ent_t *ent)
301 {
302 	uint32_t be = ent->de_backend;
303 	backend_query_t *q;
304 	uint32_t gen;
305 
306 	int r;
307 
308 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
309 	    dip->di_np_tx;
310 
311 	q = backend_query_alloc();
312 	backend_query_add(q,
313 	    "SELECT pg_gen_id FROM pg_tbl WHERE pg_id = %d; "
314 	    "DELETE FROM pg_tbl WHERE pg_id = %d",
315 	    ent->de_id, ent->de_id);
316 	r = backend_tx_run_single_int(tx, q, &gen);
317 	backend_query_free(q);
318 
319 	if (r != REP_PROTOCOL_SUCCESS)
320 		return (r);
321 
322 	return (delete_stack_push(dip, be, &pg_lnk_tbl_delete,
323 	    ent->de_id, gen));
324 }
325 
326 static int
snaplevel_lnk_delete(delete_info_t * dip,const delete_ent_t * ent)327 snaplevel_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
328 {
329 	uint32_t be = ent->de_backend;
330 	backend_query_t *q;
331 	struct delete_cb_info info;
332 
333 	int r;
334 
335 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
336 	    dip->di_np_tx;
337 
338 	info.dci_dip = dip;
339 	info.dci_be = be;
340 	info.dci_cb = &pg_lnk_tbl_delete;
341 	info.dci_result = REP_PROTOCOL_SUCCESS;
342 
343 	q = backend_query_alloc();
344 	backend_query_add(q,
345 	    "SELECT snaplvl_pg_id, snaplvl_gen_id "
346 	    "    FROM snaplevel_lnk_tbl "
347 	    "    WHERE snaplvl_level_id = %d; "
348 	    "DELETE FROM snaplevel_lnk_tbl WHERE snaplvl_level_id = %d",
349 	    ent->de_id, ent->de_id);
350 	r = backend_tx_run(tx, q, push_delete_callback, &info);
351 	backend_query_free(q);
352 
353 	if (r == REP_PROTOCOL_DONE) {
354 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
355 		return (info.dci_result);
356 	}
357 	return (r);
358 }
359 
360 static int
snaplevel_tbl_delete(delete_info_t * dip,const delete_ent_t * ent)361 snaplevel_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
362 {
363 	uint32_t be = ent->de_backend;
364 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
365 	    dip->di_np_tx;
366 
367 	struct delete_cb_info info;
368 	backend_query_t *q;
369 	int r;
370 
371 	assert(be == BACKEND_TYPE_NORMAL);
372 
373 	q = backend_query_alloc();
374 	backend_query_add(q,
375 	    "SELECT 1 FROM snapshot_lnk_tbl WHERE lnk_snap_id = %d",
376 	    ent->de_id);
377 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
378 	backend_query_free(q);
379 
380 	if (r == REP_PROTOCOL_DONE)
381 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
382 
383 	info.dci_dip = dip;
384 	info.dci_be = be;
385 	info.dci_cb = &snaplevel_lnk_delete;
386 	info.dci_result = REP_PROTOCOL_SUCCESS;
387 
388 	q = backend_query_alloc();
389 	backend_query_add(q,
390 	    "SELECT snap_level_id, 0 FROM snaplevel_tbl WHERE snap_id = %d;"
391 	    "DELETE FROM snaplevel_tbl WHERE snap_id = %d",
392 	    ent->de_id, ent->de_id);
393 	r = backend_tx_run(tx, q, push_delete_callback, &info);
394 	backend_query_free(q);
395 
396 	if (r == REP_PROTOCOL_DONE) {
397 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
398 		return (info.dci_result);
399 	}
400 	return (r);
401 }
402 
403 static int
snapshot_lnk_delete(delete_info_t * dip,const delete_ent_t * ent)404 snapshot_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
405 {
406 	uint32_t be = ent->de_backend;
407 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
408 	    dip->di_np_tx;
409 
410 	backend_query_t *q;
411 	uint32_t snapid;
412 	int r;
413 
414 	assert(be == BACKEND_TYPE_NORMAL);
415 
416 	q = backend_query_alloc();
417 	backend_query_add(q,
418 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
419 	    "DELETE FROM snapshot_lnk_tbl WHERE lnk_id = %d",
420 	    ent->de_id, ent->de_id);
421 	r = backend_tx_run_single_int(tx, q, &snapid);
422 	backend_query_free(q);
423 
424 	if (r != REP_PROTOCOL_SUCCESS)
425 		return (r);
426 
427 	return (delete_stack_push(dip, be, &snaplevel_tbl_delete, snapid, 0));
428 }
429 
430 static int
pgparent_delete_add_pgs(delete_info_t * dip,uint32_t parent_id)431 pgparent_delete_add_pgs(delete_info_t *dip, uint32_t parent_id)
432 {
433 	struct delete_cb_info info;
434 	backend_query_t *q;
435 	int r;
436 
437 	info.dci_dip = dip;
438 	info.dci_be = BACKEND_TYPE_NORMAL;
439 	info.dci_cb = &propertygrp_delete;
440 	info.dci_result = REP_PROTOCOL_SUCCESS;
441 
442 	q = backend_query_alloc();
443 	backend_query_add(q,
444 	    "SELECT pg_id, 0 FROM pg_tbl WHERE pg_parent_id = %d",
445 	    parent_id);
446 
447 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
448 
449 	if (r == REP_PROTOCOL_DONE) {
450 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
451 		backend_query_free(q);
452 		return (info.dci_result);
453 	}
454 	if (r != REP_PROTOCOL_SUCCESS) {
455 		backend_query_free(q);
456 		return (r);
457 	}
458 
459 	if (dip->di_np_tx != NULL) {
460 		info.dci_be = BACKEND_TYPE_NONPERSIST;
461 
462 		r = backend_tx_run(dip->di_np_tx, q, push_delete_callback,
463 		    &info);
464 
465 		if (r == REP_PROTOCOL_DONE) {
466 			assert(info.dci_result != REP_PROTOCOL_SUCCESS);
467 			backend_query_free(q);
468 			return (info.dci_result);
469 		}
470 		if (r != REP_PROTOCOL_SUCCESS) {
471 			backend_query_free(q);
472 			return (r);
473 		}
474 	}
475 	backend_query_free(q);
476 	return (REP_PROTOCOL_SUCCESS);
477 }
478 
479 static int
service_delete(delete_info_t * dip,const delete_ent_t * ent)480 service_delete(delete_info_t *dip, const delete_ent_t *ent)
481 {
482 	int r;
483 
484 	r = backend_tx_run_update_changed(dip->di_tx,
485 	    "DELETE FROM service_tbl WHERE svc_id = %d", ent->de_id);
486 	if (r != REP_PROTOCOL_SUCCESS)
487 		return (r);
488 
489 	return (pgparent_delete_add_pgs(dip, ent->de_id));
490 }
491 
492 static int
instance_delete(delete_info_t * dip,const delete_ent_t * ent)493 instance_delete(delete_info_t *dip, const delete_ent_t *ent)
494 {
495 	struct delete_cb_info info;
496 	int r;
497 	backend_query_t *q;
498 
499 	r = backend_tx_run_update_changed(dip->di_tx,
500 	    "DELETE FROM instance_tbl WHERE instance_id = %d", ent->de_id);
501 	if (r != REP_PROTOCOL_SUCCESS)
502 		return (r);
503 
504 	r = pgparent_delete_add_pgs(dip, ent->de_id);
505 	if (r != REP_PROTOCOL_SUCCESS)
506 		return (r);
507 
508 	info.dci_dip = dip;
509 	info.dci_be = BACKEND_TYPE_NORMAL;
510 	info.dci_cb = &snapshot_lnk_delete;
511 	info.dci_result = REP_PROTOCOL_SUCCESS;
512 
513 	q = backend_query_alloc();
514 	backend_query_add(q,
515 	    "SELECT lnk_id, 0 FROM snapshot_lnk_tbl WHERE lnk_inst_id = %d",
516 	    ent->de_id);
517 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
518 	backend_query_free(q);
519 
520 	if (r == REP_PROTOCOL_DONE) {
521 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
522 		return (info.dci_result);
523 	}
524 	return (r);
525 }
526 
527 /*ARGSUSED*/
528 static int
fill_child_callback(void * data,int columns,char ** vals,char ** names)529 fill_child_callback(void *data, int columns, char **vals, char **names)
530 {
531 	child_info_t *cp = data;
532 	rc_node_t *np;
533 	uint32_t main_id;
534 	const char *name;
535 	const char *cur;
536 	rc_node_lookup_t *lp = &cp->ci_base_nl;
537 
538 	assert(columns == 2);
539 
540 	name = *vals++;
541 	columns--;
542 
543 	cur = *vals++;
544 	columns--;
545 	string_to_id(cur, &main_id, "id");
546 
547 	lp->rl_main_id = main_id;
548 
549 	if ((np = rc_node_alloc()) == NULL)
550 		return (BACKEND_CALLBACK_ABORT);
551 
552 	np = rc_node_setup(np, lp, name, cp->ci_parent);
553 	rc_node_rele(np);
554 
555 	return (BACKEND_CALLBACK_CONTINUE);
556 }
557 
558 /*ARGSUSED*/
559 static int
fill_snapshot_callback(void * data,int columns,char ** vals,char ** names)560 fill_snapshot_callback(void *data, int columns, char **vals, char **names)
561 {
562 	child_info_t *cp = data;
563 	rc_node_t *np;
564 	uint32_t main_id;
565 	uint32_t snap_id;
566 	const char *name;
567 	const char *cur;
568 	const char *snap;
569 	rc_node_lookup_t *lp = &cp->ci_base_nl;
570 
571 	assert(columns == 3);
572 
573 	name = *vals++;
574 	columns--;
575 
576 	cur = *vals++;
577 	columns--;
578 	snap = *vals++;
579 	columns--;
580 
581 	string_to_id(cur, &main_id, "lnk_id");
582 	string_to_id(snap, &snap_id, "lnk_snap_id");
583 
584 	lp->rl_main_id = main_id;
585 
586 	if ((np = rc_node_alloc()) == NULL)
587 		return (BACKEND_CALLBACK_ABORT);
588 
589 	np = rc_node_setup_snapshot(np, lp, name, snap_id, cp->ci_parent);
590 	rc_node_rele(np);
591 
592 	return (BACKEND_CALLBACK_CONTINUE);
593 }
594 
595 /*ARGSUSED*/
596 static int
fill_pg_callback(void * data,int columns,char ** vals,char ** names)597 fill_pg_callback(void *data, int columns, char **vals, char **names)
598 {
599 	child_info_t *cip = data;
600 	const char *name;
601 	const char *type;
602 	const char *cur;
603 	uint32_t main_id;
604 	uint32_t flags;
605 	uint32_t gen_id;
606 
607 	rc_node_lookup_t *lp = &cip->ci_base_nl;
608 	rc_node_t *newnode, *pg;
609 
610 	assert(columns == 5);
611 
612 	name = *vals++;		/* pg_name */
613 	columns--;
614 
615 	cur = *vals++;		/* pg_id */
616 	columns--;
617 	string_to_id(cur, &main_id, "pg_id");
618 
619 	lp->rl_main_id = main_id;
620 
621 	cur = *vals++;		/* pg_gen_id */
622 	columns--;
623 	string_to_id(cur, &gen_id, "pg_gen_id");
624 
625 	type = *vals++;		/* pg_type */
626 	columns--;
627 
628 	cur = *vals++;		/* pg_flags */
629 	columns--;
630 	string_to_id(cur, &flags, "pg_flags");
631 
632 	if ((newnode = rc_node_alloc()) == NULL)
633 		return (BACKEND_CALLBACK_ABORT);
634 
635 	pg = rc_node_setup_pg(newnode, lp, name, type, flags, gen_id,
636 	    cip->ci_parent);
637 	if (pg == NULL) {
638 		rc_node_destroy(newnode);
639 		return (BACKEND_CALLBACK_ABORT);
640 	}
641 
642 	rc_node_rele(pg);
643 
644 	return (BACKEND_CALLBACK_CONTINUE);
645 }
646 
/*
 * Accumulator used while reading a property's values: the values are packed
 * back to back, each with its terminating '\0', into pvi_base.
 */
struct property_value_info {
	char		*pvi_base;	/* packed value buffer */
	size_t		pvi_pos;	/* bytes of pvi_base filled so far */
	size_t		pvi_size;	/* total bytes needed / allocated */
	size_t		pvi_count;	/* number of values copied */
};
653 
654 /*ARGSUSED*/
655 static int
property_value_size_cb(void * data,int columns,char ** vals,char ** names)656 property_value_size_cb(void *data, int columns, char **vals, char **names)
657 {
658 	struct property_value_info *info = data;
659 	assert(columns == 1);
660 
661 	info->pvi_size += strlen(vals[0]) + 1;		/* count the '\0' */
662 
663 	return (BACKEND_CALLBACK_CONTINUE);
664 }
665 
666 /*ARGSUSED*/
667 static int
property_value_cb(void * data,int columns,char ** vals,char ** names)668 property_value_cb(void *data, int columns, char **vals, char **names)
669 {
670 	struct property_value_info *info = data;
671 	size_t pos, left, len;
672 
673 	assert(columns == 1);
674 	pos = info->pvi_pos;
675 	left = info->pvi_size - pos;
676 
677 	pos = info->pvi_pos;
678 	left = info->pvi_size - pos;
679 
680 	if ((len = strlcpy(&info->pvi_base[pos], vals[0], left)) >= left) {
681 		/*
682 		 * since we preallocated, above, this shouldn't happen
683 		 */
684 		backend_panic("unexpected database change");
685 	}
686 
687 	len += 1;	/* count the '\0' */
688 
689 	info->pvi_pos += len;
690 	info->pvi_count++;
691 
692 	return (BACKEND_CALLBACK_CONTINUE);
693 }
694 
/*
 * Free a value buffer.  A NULL vals is ignored; type, count and size are
 * unused.
 */
/*ARGSUSED*/
void
object_free_values(const char *vals, uint32_t type, size_t count, size_t size)
{
	if (vals == NULL)
		return;

	uu_free((void *)vals);
}
702 
703 /*ARGSUSED*/
704 static int
fill_property_callback(void * data,int columns,char ** vals,char ** names)705 fill_property_callback(void *data, int columns, char **vals, char **names)
706 {
707 	child_info_t *cp = data;
708 	backend_tx_t *tx = cp->ci_tx;
709 	uint32_t main_id;
710 	const char *name;
711 	const char *cur;
712 	rep_protocol_value_type_t type;
713 	rc_node_lookup_t *lp = &cp->ci_base_nl;
714 	struct property_value_info info;
715 	int rc;
716 
717 	assert(columns == 4);
718 	assert(tx != NULL);
719 
720 	info.pvi_base = NULL;
721 	info.pvi_pos = 0;
722 	info.pvi_size = 0;
723 	info.pvi_count = 0;
724 
725 	name = *vals++;
726 
727 	cur = *vals++;
728 	string_to_id(cur, &main_id, "lnk_prop_id");
729 
730 	cur = *vals++;
731 	assert(('a' <= cur[0] && 'z' >= cur[0]) ||
732 	    ('A' <= cur[0] && 'Z' >= cur[0]) &&
733 	    (cur[1] == 0 || ('a' <= cur[1] && 'z' >= cur[1]) ||
734 	    ('A' <= cur[1] && 'Z' >= cur[1])));
735 	type = cur[0] | (cur[1] << 8);
736 
737 	lp->rl_main_id = main_id;
738 
739 	/*
740 	 * fill in the values, if any
741 	 */
742 	if ((cur = *vals++) != NULL) {
743 		rep_protocol_responseid_t r;
744 		backend_query_t *q = backend_query_alloc();
745 
746 		/*
747 		 * Ensure that select operation is reflective
748 		 * of repository schema.  If the repository has
749 		 * been upgraded,  make use of value ordering
750 		 * by retrieving values in order using the
751 		 * value_order column.  Otherwise, simply
752 		 * run the select with no order specified.
753 		 * The order-insensitive select is necessary
754 		 * as on first reboot post-upgrade,  the repository
755 		 * contents need to be read before the repository
756 		 * backend is writable (and upgrade is possible).
757 		 */
758 		if (backend_is_upgraded(tx)) {
759 			backend_query_add(q,
760 			    "SELECT value_value FROM value_tbl "
761 			    "WHERE (value_id = '%q') ORDER BY value_order",
762 			    cur);
763 		} else {
764 			backend_query_add(q,
765 			    "SELECT value_value FROM value_tbl "
766 			    "WHERE (value_id = '%q')",
767 			    cur);
768 		}
769 
770 		switch (r = backend_tx_run(tx, q, property_value_size_cb,
771 		    &info)) {
772 		case REP_PROTOCOL_SUCCESS:
773 			break;
774 
775 		case REP_PROTOCOL_FAIL_NO_RESOURCES:
776 			backend_query_free(q);
777 			return (BACKEND_CALLBACK_ABORT);
778 
779 		case REP_PROTOCOL_DONE:
780 		default:
781 			backend_panic("backend_tx_run() returned %d", r);
782 		}
783 		if (info.pvi_size > 0) {
784 			info.pvi_base = uu_zalloc(info.pvi_size);
785 			if (info.pvi_base == NULL) {
786 				backend_query_free(q);
787 				return (BACKEND_CALLBACK_ABORT);
788 			}
789 			switch (r = backend_tx_run(tx, q, property_value_cb,
790 			    &info)) {
791 			case REP_PROTOCOL_SUCCESS:
792 				break;
793 
794 			case REP_PROTOCOL_FAIL_NO_RESOURCES:
795 				uu_free(info.pvi_base);
796 				backend_query_free(q);
797 				return (BACKEND_CALLBACK_ABORT);
798 
799 			case REP_PROTOCOL_DONE:
800 			default:
801 				backend_panic("backend_tx_run() returned %d",
802 				    r);
803 			}
804 		}
805 		backend_query_free(q);
806 	}
807 
808 	rc = rc_node_create_property(cp->ci_parent, lp, name, type,
809 	    info.pvi_base, info.pvi_count, info.pvi_size);
810 	if (rc != REP_PROTOCOL_SUCCESS) {
811 		assert(rc == REP_PROTOCOL_FAIL_NO_RESOURCES);
812 		return (BACKEND_CALLBACK_ABORT);
813 	}
814 
815 	return (BACKEND_CALLBACK_CONTINUE);
816 }
817 
818 /*
819  * The *_setup_child_info() functions fill in a child_info_t structure with the
820  * information for the children of np with type type.
821  *
822  * They fail with
823  *   _TYPE_MISMATCH - object cannot have children of type type
824  */
825 
826 static int
scope_setup_child_info(rc_node_t * np,uint32_t type,child_info_t * cip)827 scope_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
828 {
829 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
830 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
831 
832 	bzero(cip, sizeof (*cip));
833 	cip->ci_parent = np;
834 	cip->ci_base_nl.rl_type = type;
835 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
836 	return (REP_PROTOCOL_SUCCESS);
837 }
838 
839 static int
service_setup_child_info(rc_node_t * np,uint32_t type,child_info_t * cip)840 service_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
841 {
842 	switch (type) {
843 	case REP_PROTOCOL_ENTITY_INSTANCE:
844 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
845 		break;
846 	default:
847 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
848 	}
849 
850 	bzero(cip, sizeof (*cip));
851 	cip->ci_parent = np;
852 	cip->ci_base_nl.rl_type = type;
853 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
854 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_main_id;
855 
856 	return (REP_PROTOCOL_SUCCESS);
857 }
858 
859 static int
instance_setup_child_info(rc_node_t * np,uint32_t type,child_info_t * cip)860 instance_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
861 {
862 	switch (type) {
863 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
864 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
865 		break;
866 	default:
867 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
868 	}
869 
870 	bzero(cip, sizeof (*cip));
871 	cip->ci_parent = np;
872 	cip->ci_base_nl.rl_type = type;
873 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
874 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
875 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_main_id;
876 
877 	return (REP_PROTOCOL_SUCCESS);
878 }
879 
880 static int
snaplevel_setup_child_info(rc_node_t * np,uint32_t type,child_info_t * cip)881 snaplevel_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
882 {
883 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
884 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
885 
886 	bzero(cip, sizeof (*cip));
887 	cip->ci_parent = np;
888 	cip->ci_base_nl.rl_type = type;
889 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
890 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
891 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
892 	cip->ci_base_nl.rl_ids[ID_NAME] = np->rn_id.rl_ids[ID_NAME];
893 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = np->rn_id.rl_ids[ID_SNAPSHOT];
894 	cip->ci_base_nl.rl_ids[ID_LEVEL] = np->rn_id.rl_main_id;
895 
896 	return (REP_PROTOCOL_SUCCESS);
897 }
898 
899 static int
propertygrp_setup_child_info(rc_node_t * pg,uint32_t type,child_info_t * cip)900 propertygrp_setup_child_info(rc_node_t *pg, uint32_t type, child_info_t *cip)
901 {
902 	if (type != REP_PROTOCOL_ENTITY_PROPERTY)
903 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
904 
905 	bzero(cip, sizeof (*cip));
906 	cip->ci_parent = pg;
907 	cip->ci_base_nl.rl_type = type;
908 	cip->ci_base_nl.rl_backend = pg->rn_id.rl_backend;
909 	cip->ci_base_nl.rl_ids[ID_SERVICE] = pg->rn_id.rl_ids[ID_SERVICE];
910 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = pg->rn_id.rl_ids[ID_INSTANCE];
911 	cip->ci_base_nl.rl_ids[ID_PG] = pg->rn_id.rl_main_id;
912 	cip->ci_base_nl.rl_ids[ID_GEN] = pg->rn_gen_id;
913 	cip->ci_base_nl.rl_ids[ID_NAME] = pg->rn_id.rl_ids[ID_NAME];
914 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = pg->rn_id.rl_ids[ID_SNAPSHOT];
915 	cip->ci_base_nl.rl_ids[ID_LEVEL] = pg->rn_id.rl_ids[ID_LEVEL];
916 
917 	return (REP_PROTOCOL_SUCCESS);
918 }
919 
920 /*
921  * The *_fill_children() functions populate the children of the given rc_node_t
922  * by querying the database and calling rc_node_setup_*() functions (usually
923  * via a fill_*_callback()).
924  *
925  * They fail with
926  *   _NO_RESOURCES
927  */
928 
929 /*
930  * Returns
931  *   _NO_RESOURCES
932  *   _SUCCESS
933  */
934 static int
scope_fill_children(rc_node_t * np)935 scope_fill_children(rc_node_t *np)
936 {
937 	backend_query_t *q;
938 	child_info_t ci;
939 	int res;
940 
941 	(void) scope_setup_child_info(np, REP_PROTOCOL_ENTITY_SERVICE, &ci);
942 
943 	q = backend_query_alloc();
944 	backend_query_append(q, "SELECT svc_name, svc_id FROM service_tbl");
945 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
946 	backend_query_free(q);
947 
948 	if (res == REP_PROTOCOL_DONE)
949 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
950 	return (res);
951 }
952 
953 /*
954  * Returns
955  *   _NO_RESOURCES
956  *   _SUCCESS
957  */
958 static int
service_fill_children(rc_node_t * np)959 service_fill_children(rc_node_t *np)
960 {
961 	backend_query_t *q;
962 	child_info_t ci;
963 	int res;
964 
965 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
966 
967 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_INSTANCE, &ci);
968 
969 	q = backend_query_alloc();
970 	backend_query_add(q,
971 	    "SELECT instance_name, instance_id FROM instance_tbl"
972 	    "    WHERE (instance_svc = %d)",
973 	    np->rn_id.rl_main_id);
974 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
975 	backend_query_free(q);
976 
977 	if (res == REP_PROTOCOL_DONE)
978 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
979 	if (res != REP_PROTOCOL_SUCCESS)
980 		return (res);
981 
982 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
983 	    &ci);
984 
985 	q = backend_query_alloc();
986 	backend_query_add(q,
987 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
988 	    "    WHERE (pg_parent_id = %d)",
989 	    np->rn_id.rl_main_id);
990 
991 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
992 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
993 	if (res == REP_PROTOCOL_SUCCESS) {
994 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
995 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
996 		    fill_pg_callback, &ci);
997 		/* nonpersistant database may not exist */
998 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
999 			res = REP_PROTOCOL_SUCCESS;
1000 	}
1001 	if (res == REP_PROTOCOL_DONE)
1002 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1003 	backend_query_free(q);
1004 
1005 	return (res);
1006 }
1007 
1008 /*
1009  * Returns
1010  *   _NO_RESOURCES
1011  *   _SUCCESS
1012  */
1013 static int
instance_fill_children(rc_node_t * np)1014 instance_fill_children(rc_node_t *np)
1015 {
1016 	backend_query_t *q;
1017 	child_info_t ci;
1018 	int res;
1019 
1020 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
1021 
1022 	/* Get child property groups */
1023 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1024 	    &ci);
1025 
1026 	q = backend_query_alloc();
1027 	backend_query_add(q,
1028 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
1029 	    "    WHERE (pg_parent_id = %d)",
1030 	    np->rn_id.rl_main_id);
1031 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
1032 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1033 	if (res == REP_PROTOCOL_SUCCESS) {
1034 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
1035 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
1036 		    fill_pg_callback, &ci);
1037 		/* nonpersistant database may not exist */
1038 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
1039 			res = REP_PROTOCOL_SUCCESS;
1040 	}
1041 	if (res == REP_PROTOCOL_DONE)
1042 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1043 	backend_query_free(q);
1044 
1045 	if (res != REP_PROTOCOL_SUCCESS)
1046 		return (res);
1047 
1048 	/* Get child snapshots */
1049 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_SNAPSHOT,
1050 	    &ci);
1051 
1052 	q = backend_query_alloc();
1053 	backend_query_add(q,
1054 	    "SELECT lnk_snap_name, lnk_id, lnk_snap_id FROM snapshot_lnk_tbl"
1055 	    "    WHERE (lnk_inst_id = %d)",
1056 	    np->rn_id.rl_main_id);
1057 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_callback, &ci);
1058 	if (res == REP_PROTOCOL_DONE)
1059 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1060 	backend_query_free(q);
1061 
1062 	return (res);
1063 }
1064 
1065 /*
1066  * Returns
1067  *   _NO_RESOURCES
1068  *   _SUCCESS
1069  */
1070 static int
snapshot_fill_children(rc_node_t * np)1071 snapshot_fill_children(rc_node_t *np)
1072 {
1073 	rc_node_t *nnp;
1074 	rc_snapshot_t *sp, *oldsp;
1075 	rc_snaplevel_t *lvl;
1076 	rc_node_lookup_t nl;
1077 	int r;
1078 
1079 	/* Get the rc_snapshot_t (& its rc_snaplevel_t's). */
1080 	(void) pthread_mutex_lock(&np->rn_lock);
1081 	sp = np->rn_snapshot;
1082 	(void) pthread_mutex_unlock(&np->rn_lock);
1083 	if (sp == NULL) {
1084 		r = rc_snapshot_get(np->rn_snapshot_id, &sp);
1085 		if (r != REP_PROTOCOL_SUCCESS) {
1086 			assert(r == REP_PROTOCOL_FAIL_NO_RESOURCES);
1087 			return (r);
1088 		}
1089 		(void) pthread_mutex_lock(&np->rn_lock);
1090 		oldsp = np->rn_snapshot;
1091 		assert(oldsp == NULL || oldsp == sp);
1092 		np->rn_snapshot = sp;
1093 		(void) pthread_mutex_unlock(&np->rn_lock);
1094 		if (oldsp != NULL)
1095 			rc_snapshot_rele(oldsp);
1096 	}
1097 
1098 	bzero(&nl, sizeof (nl));
1099 	nl.rl_type = REP_PROTOCOL_ENTITY_SNAPLEVEL;
1100 	nl.rl_backend = np->rn_id.rl_backend;
1101 	nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
1102 	nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
1103 	nl.rl_ids[ID_NAME] = np->rn_id.rl_main_id;
1104 	nl.rl_ids[ID_SNAPSHOT] = np->rn_snapshot_id;
1105 
1106 	/* Create rc_node_t's for the snapshot's rc_snaplevel_t's. */
1107 	for (lvl = sp->rs_levels; lvl != NULL; lvl = lvl->rsl_next) {
1108 		nnp = rc_node_alloc();
1109 		assert(nnp != NULL);
1110 		nl.rl_main_id = lvl->rsl_level_id;
1111 		nnp = rc_node_setup_snaplevel(nnp, &nl, lvl, np);
1112 		rc_node_rele(nnp);
1113 	}
1114 
1115 	return (REP_PROTOCOL_SUCCESS);
1116 }
1117 
1118 /*
1119  * Returns
1120  *   _NO_RESOURCES
1121  *   _SUCCESS
1122  */
1123 static int
snaplevel_fill_children(rc_node_t * np)1124 snaplevel_fill_children(rc_node_t *np)
1125 {
1126 	rc_snaplevel_t *lvl = np->rn_snaplevel;
1127 	child_info_t ci;
1128 	int res;
1129 	backend_query_t *q;
1130 
1131 	(void) snaplevel_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1132 	    &ci);
1133 
1134 	q = backend_query_alloc();
1135 	backend_query_add(q,
1136 	    "SELECT snaplvl_pg_name, snaplvl_pg_id, snaplvl_gen_id, "
1137 	    "    snaplvl_pg_type, snaplvl_pg_flags "
1138 	    "    FROM snaplevel_lnk_tbl "
1139 	    "    WHERE (snaplvl_level_id = %d)",
1140 	    lvl->rsl_level_id);
1141 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1142 	if (res == REP_PROTOCOL_DONE)
1143 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1144 	backend_query_free(q);
1145 
1146 	return (res);
1147 }
1148 
1149 /*
1150  * Returns
1151  *   _NO_RESOURCES
1152  *   _SUCCESS
1153  */
1154 static int
propertygrp_fill_children(rc_node_t * np)1155 propertygrp_fill_children(rc_node_t *np)
1156 {
1157 	backend_query_t *q;
1158 	child_info_t ci;
1159 	int res;
1160 	backend_tx_t *tx;
1161 
1162 	backend_type_t backend = np->rn_id.rl_backend;
1163 
1164 	(void) propertygrp_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTY,
1165 	    &ci);
1166 
1167 	res = backend_tx_begin_ro(backend, &tx);
1168 	if (res != REP_PROTOCOL_SUCCESS) {
1169 		/*
1170 		 * If the backend didn't exist, we wouldn't have got this
1171 		 * property group.
1172 		 */
1173 		assert(res != REP_PROTOCOL_FAIL_BACKEND_ACCESS);
1174 		return (res);
1175 	}
1176 
1177 	ci.ci_tx = tx;
1178 
1179 	q = backend_query_alloc();
1180 	backend_query_add(q,
1181 	    "SELECT lnk_prop_name, lnk_prop_id, lnk_prop_type, lnk_val_id "
1182 	    "FROM prop_lnk_tbl "
1183 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
1184 	    np->rn_id.rl_main_id, np->rn_gen_id);
1185 	res = backend_tx_run(tx, q, fill_property_callback, &ci);
1186 	if (res == REP_PROTOCOL_DONE)
1187 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1188 	backend_query_free(q);
1189 	backend_tx_end_ro(tx);
1190 
1191 	return (res);
1192 }
1193 
1194 /*
1195  * Fails with
1196  *   _TYPE_MISMATCH - lp is not for a service
1197  *   _INVALID_TYPE - lp has invalid type
1198  *   _BAD_REQUEST - name is invalid
1199  */
1200 static int
scope_query_child(backend_query_t * q,rc_node_lookup_t * lp,const char * name)1201 scope_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1202 {
1203 	uint32_t type = lp->rl_type;
1204 	int rc;
1205 
1206 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
1207 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1208 
1209 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1210 		return (rc);
1211 
1212 	backend_query_add(q,
1213 	    "SELECT svc_id FROM service_tbl "
1214 	    "WHERE svc_name = '%q'",
1215 	    name);
1216 
1217 	return (REP_PROTOCOL_SUCCESS);
1218 }
1219 
1220 /*
1221  * Fails with
1222  *   _NO_RESOURCES - out of memory
1223  */
1224 static int
scope_insert_child(backend_tx_t * tx,rc_node_lookup_t * lp,const char * name)1225 scope_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1226 {
1227 	return (backend_tx_run_update(tx,
1228 	    "INSERT INTO service_tbl (svc_id, svc_name) "
1229 	    "VALUES (%d, '%q')",
1230 	    lp->rl_main_id, name));
1231 }
1232 
1233 /*
1234  * Fails with
1235  *   _TYPE_MISMATCH - lp is not for an instance or property group
1236  *   _INVALID_TYPE - lp has invalid type
1237  *   _BAD_REQUEST - name is invalid
1238  */
1239 static int
service_query_child(backend_query_t * q,rc_node_lookup_t * lp,const char * name)1240 service_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1241 {
1242 	uint32_t type = lp->rl_type;
1243 	int rc;
1244 
1245 	if (type != REP_PROTOCOL_ENTITY_INSTANCE &&
1246 	    type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
1247 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1248 
1249 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1250 		return (rc);
1251 
1252 	switch (type) {
1253 	case REP_PROTOCOL_ENTITY_INSTANCE:
1254 		backend_query_add(q,
1255 		    "SELECT instance_id FROM instance_tbl "
1256 		    "WHERE instance_name = '%q' AND instance_svc = %d",
1257 		    name, lp->rl_ids[ID_SERVICE]);
1258 		break;
1259 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1260 		backend_query_add(q,
1261 		    "SELECT pg_id FROM pg_tbl "
1262 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1263 		    name, lp->rl_ids[ID_SERVICE]);
1264 		break;
1265 	default:
1266 		assert(0);
1267 		abort();
1268 	}
1269 
1270 	return (REP_PROTOCOL_SUCCESS);
1271 }
1272 
1273 /*
1274  * Fails with
1275  *   _NO_RESOURCES - out of memory
1276  */
1277 static int
service_insert_child(backend_tx_t * tx,rc_node_lookup_t * lp,const char * name)1278 service_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1279 {
1280 	return (backend_tx_run_update(tx,
1281 	    "INSERT INTO instance_tbl "
1282 	    "    (instance_id, instance_name, instance_svc) "
1283 	    "VALUES (%d, '%q', %d)",
1284 	    lp->rl_main_id, name, lp->rl_ids[ID_SERVICE]));
1285 }
1286 
1287 /*
1288  * Fails with
1289  *   _NO_RESOURCES - out of memory
1290  */
1291 static int
instance_insert_child(backend_tx_t * tx,rc_node_lookup_t * lp,const char * name)1292 instance_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1293 {
1294 	return (backend_tx_run_update(tx,
1295 	    "INSERT INTO snapshot_lnk_tbl "
1296 	    "    (lnk_id, lnk_inst_id, lnk_snap_name, lnk_snap_id) "
1297 	    "VALUES (%d, %d, '%q', 0)",
1298 	    lp->rl_main_id, lp->rl_ids[ID_INSTANCE], name));
1299 }
1300 
1301 /*
1302  * Fails with
1303  *   _TYPE_MISMATCH - lp is not for a property group or snapshot
1304  *   _INVALID_TYPE - lp has invalid type
1305  *   _BAD_REQUEST - name is invalid
1306  */
1307 static int
instance_query_child(backend_query_t * q,rc_node_lookup_t * lp,const char * name)1308 instance_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1309 {
1310 	uint32_t type = lp->rl_type;
1311 	int rc;
1312 
1313 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP &&
1314 	    type != REP_PROTOCOL_ENTITY_SNAPSHOT)
1315 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1316 
1317 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1318 		return (rc);
1319 
1320 	switch (type) {
1321 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1322 		backend_query_add(q,
1323 		    "SELECT pg_id FROM pg_tbl "
1324 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1325 		    name, lp->rl_ids[ID_INSTANCE]);
1326 		break;
1327 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
1328 		backend_query_add(q,
1329 		    "SELECT lnk_id FROM snapshot_lnk_tbl "
1330 		    "    WHERE lnk_snap_name = '%q' AND lnk_inst_id = %d",
1331 		    name, lp->rl_ids[ID_INSTANCE]);
1332 		break;
1333 	default:
1334 		assert(0);
1335 		abort();
1336 	}
1337 
1338 	return (REP_PROTOCOL_SUCCESS);
1339 }
1340 
1341 static int
generic_insert_pg_child(backend_tx_t * tx,rc_node_lookup_t * lp,const char * name,const char * pgtype,uint32_t flags,uint32_t gen)1342 generic_insert_pg_child(backend_tx_t *tx, rc_node_lookup_t *lp,
1343     const char *name, const char *pgtype, uint32_t flags, uint32_t gen)
1344 {
1345 	int parent_id = (lp->rl_ids[ID_INSTANCE] != 0)?
1346 	    lp->rl_ids[ID_INSTANCE] : lp->rl_ids[ID_SERVICE];
1347 	return (backend_tx_run_update(tx,
1348 	    "INSERT INTO pg_tbl "
1349 	    "    (pg_id, pg_name, pg_parent_id, pg_type, pg_flags, pg_gen_id) "
1350 	    "VALUES (%d, '%q', %d, '%q', %d, %d)",
1351 	    lp->rl_main_id, name, parent_id, pgtype, flags, gen));
1352 }
1353 
1354 static int
service_delete_start(rc_node_t * np,delete_info_t * dip)1355 service_delete_start(rc_node_t *np, delete_info_t *dip)
1356 {
1357 	int r;
1358 	backend_query_t *q = backend_query_alloc();
1359 
1360 	/*
1361 	 * Check for child instances, and refuse to delete if they exist.
1362 	 */
1363 	backend_query_add(q,
1364 	    "SELECT 1 FROM instance_tbl WHERE instance_svc = %d",
1365 	    np->rn_id.rl_main_id);
1366 
1367 	r = backend_tx_run(dip->di_tx, q, backend_fail_if_seen, NULL);
1368 	backend_query_free(q);
1369 
1370 	if (r == REP_PROTOCOL_DONE)
1371 		return (REP_PROTOCOL_FAIL_EXISTS);	/* instances exist */
1372 
1373 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &service_delete,
1374 	    np->rn_id.rl_main_id, 0));
1375 }
1376 
1377 static int
instance_delete_start(rc_node_t * np,delete_info_t * dip)1378 instance_delete_start(rc_node_t *np, delete_info_t *dip)
1379 {
1380 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &instance_delete,
1381 	    np->rn_id.rl_main_id, 0));
1382 }
1383 
1384 static int
snapshot_delete_start(rc_node_t * np,delete_info_t * dip)1385 snapshot_delete_start(rc_node_t *np, delete_info_t *dip)
1386 {
1387 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL,
1388 	    &snapshot_lnk_delete, np->rn_id.rl_main_id, 0));
1389 }
1390 
1391 static int
propertygrp_delete_start(rc_node_t * np,delete_info_t * dip)1392 propertygrp_delete_start(rc_node_t *np, delete_info_t *dip)
1393 {
1394 	return (delete_stack_push(dip, np->rn_id.rl_backend,
1395 	    &propertygrp_delete, np->rn_id.rl_main_id, 0));
1396 }
1397 
/*
 * Table of per-entity-type operations.  The functions in this file index it
 * directly with a node's rl_type (info[type]), so each entry is presumably
 * placed at the index equal to its REP_PROTOCOL_ENTITY_* value.
 *
 * After the entity type, each entry lists, in order: the id space used when
 * allocating ids (obj_id_space), then the fill_children, setup_child_info,
 * query_child, insert_child, insert_pg_child and delete_start functions.
 * This field order is inferred from the names of the initializers; confirm
 * against the object_info_t declaration.  Members that are omitted or NULL
 * mean the operation is not available for that type; callers test for NULL
 * before using the optional ones.
 */
static object_info_t info[] = {
	{REP_PROTOCOL_ENTITY_NONE},
	{REP_PROTOCOL_ENTITY_SCOPE,
		BACKEND_ID_INVALID,
		scope_fill_children,
		scope_setup_child_info,
		scope_query_child,
		scope_insert_child,
		NULL,
		NULL,
	},
	{REP_PROTOCOL_ENTITY_SERVICE,
		BACKEND_ID_SERVICE_INSTANCE,
		service_fill_children,
		service_setup_child_info,
		service_query_child,
		service_insert_child,
		generic_insert_pg_child,
		service_delete_start,
	},
	{REP_PROTOCOL_ENTITY_INSTANCE,
		BACKEND_ID_SERVICE_INSTANCE,
		instance_fill_children,
		instance_setup_child_info,
		instance_query_child,
		instance_insert_child,
		generic_insert_pg_child,
		instance_delete_start,
	},
	{REP_PROTOCOL_ENTITY_SNAPSHOT,
		BACKEND_ID_SNAPNAME,
		snapshot_fill_children,
		NULL,
		NULL,
		NULL,
		NULL,
		snapshot_delete_start,
	},
	{REP_PROTOCOL_ENTITY_SNAPLEVEL,
		BACKEND_ID_SNAPLEVEL,
		snaplevel_fill_children,
		snaplevel_setup_child_info,
	},
	{REP_PROTOCOL_ENTITY_PROPERTYGRP,
		BACKEND_ID_PROPERTYGRP,
		propertygrp_fill_children,
		NULL,
		NULL,
		NULL,
		NULL,
		propertygrp_delete_start,
	},
	{REP_PROTOCOL_ENTITY_PROPERTY},
	/* Terminating entry. */
	{-1UL}
};
/* Number of entries in info[], including the terminating entry. */
#define	NUM_INFO (sizeof (info) / sizeof (*info))
1454 
1455 /*
1456  * object_fill_children() populates the child list of an rc_node_t by calling
1457  * the appropriate <type>_fill_children() which runs backend queries that
1458  * call an appropriate fill_*_callback() which takes a row of results,
1459  * decodes them, and calls an rc_node_setup*() function in rc_node.c to create
1460  * a child.
1461  *
1462  * Fails with
1463  *   _NO_RESOURCES
1464  */
1465 int
object_fill_children(rc_node_t * pp)1466 object_fill_children(rc_node_t *pp)
1467 {
1468 	uint32_t type = pp->rn_id.rl_type;
1469 	assert(type > 0 && type < NUM_INFO);
1470 
1471 	return ((*info[type].obj_fill_children)(pp));
1472 }
1473 
1474 int
object_delete(rc_node_t * pp)1475 object_delete(rc_node_t *pp)
1476 {
1477 	int rc;
1478 
1479 	delete_info_t dip;
1480 	delete_ent_t de;
1481 
1482 	uint32_t type = pp->rn_id.rl_type;
1483 	assert(type > 0 && type < NUM_INFO);
1484 
1485 	if (info[type].obj_delete_start == NULL)
1486 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1487 
1488 	(void) memset(&dip, '\0', sizeof (dip));
1489 	rc = backend_tx_begin(BACKEND_TYPE_NORMAL, &dip.di_tx);
1490 	if (rc != REP_PROTOCOL_SUCCESS)
1491 		return (rc);
1492 
1493 	rc = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &dip.di_np_tx);
1494 	if (rc == REP_PROTOCOL_FAIL_BACKEND_ACCESS ||
1495 	    rc == REP_PROTOCOL_FAIL_BACKEND_READONLY)
1496 		dip.di_np_tx = NULL;
1497 	else if (rc != REP_PROTOCOL_SUCCESS) {
1498 		backend_tx_rollback(dip.di_tx);
1499 		return (rc);
1500 	}
1501 
1502 	if ((rc = (*info[type].obj_delete_start)(pp, &dip)) !=
1503 	    REP_PROTOCOL_SUCCESS) {
1504 		goto fail;
1505 	}
1506 
1507 	while (delete_stack_pop(&dip, &de)) {
1508 		rc = (*de.de_cb)(&dip, &de);
1509 		if (rc != REP_PROTOCOL_SUCCESS)
1510 			goto fail;
1511 	}
1512 
1513 	rc = backend_tx_commit(dip.di_tx);
1514 	if (rc != REP_PROTOCOL_SUCCESS)
1515 		backend_tx_rollback(dip.di_np_tx);
1516 	else if (dip.di_np_tx)
1517 		(void) backend_tx_commit(dip.di_np_tx);
1518 
1519 	delete_stack_cleanup(&dip);
1520 
1521 	return (rc);
1522 
1523 fail:
1524 	backend_tx_rollback(dip.di_tx);
1525 	backend_tx_rollback(dip.di_np_tx);
1526 	delete_stack_cleanup(&dip);
1527 	return (rc);
1528 }
1529 
1530 int
object_do_create(backend_tx_t * tx,child_info_t * cip,rc_node_t * pp,uint32_t type,const char * name,rc_node_t ** cpp)1531 object_do_create(backend_tx_t *tx, child_info_t *cip, rc_node_t *pp,
1532     uint32_t type, const char *name, rc_node_t **cpp)
1533 {
1534 	uint32_t ptype = pp->rn_id.rl_type;
1535 
1536 	backend_query_t *q;
1537 	uint32_t id;
1538 	rc_node_t *np = NULL;
1539 	int rc;
1540 	object_info_t *ip;
1541 
1542 	rc_node_lookup_t *lp = &cip->ci_base_nl;
1543 
1544 	assert(ptype > 0 && ptype < NUM_INFO);
1545 
1546 	ip = &info[ptype];
1547 
1548 	if (type == REP_PROTOCOL_ENTITY_PROPERTYGRP)
1549 		return (REP_PROTOCOL_FAIL_NOT_APPLICABLE);
1550 
1551 	if (ip->obj_setup_child_info == NULL ||
1552 	    ip->obj_query_child == NULL ||
1553 	    ip->obj_insert_child == NULL)
1554 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1555 
1556 	if ((rc = (*ip->obj_setup_child_info)(pp, type, cip)) !=
1557 	    REP_PROTOCOL_SUCCESS)
1558 		return (rc);
1559 
1560 	q = backend_query_alloc();
1561 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1562 	    REP_PROTOCOL_SUCCESS) {
1563 		assert(rc == REP_PROTOCOL_FAIL_BAD_REQUEST);
1564 		backend_query_free(q);
1565 		return (rc);
1566 	}
1567 
1568 	rc = backend_tx_run_single_int(tx, q, &id);
1569 	backend_query_free(q);
1570 
1571 	if (rc == REP_PROTOCOL_SUCCESS)
1572 		return (REP_PROTOCOL_FAIL_EXISTS);
1573 	else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND)
1574 		return (rc);
1575 
1576 	if ((lp->rl_main_id = backend_new_id(tx,
1577 	    info[type].obj_id_space)) == 0) {
1578 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1579 	}
1580 
1581 	if ((np = rc_node_alloc()) == NULL)
1582 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1583 
1584 	if ((rc = (*ip->obj_insert_child)(tx, lp, name)) !=
1585 	    REP_PROTOCOL_SUCCESS) {
1586 		rc_node_destroy(np);
1587 		return (rc);
1588 	}
1589 
1590 	*cpp = np;
1591 	return (REP_PROTOCOL_SUCCESS);
1592 }
1593 
1594 /*
1595  * Fails with
1596  *   _NOT_APPLICABLE - type is _PROPERTYGRP
1597  *   _BAD_REQUEST - cannot create children for this type of node
1598  *		    name is invalid
1599  *   _TYPE_MISMATCH - object cannot have children of type type
1600  *   _NO_RESOURCES - out of memory, or could not allocate new id
1601  *   _BACKEND_READONLY
1602  *   _BACKEND_ACCESS
1603  *   _EXISTS - child already exists
1604  */
1605 int
object_create(rc_node_t * pp,uint32_t type,const char * name,rc_node_t ** cpp)1606 object_create(rc_node_t *pp, uint32_t type, const char *name, rc_node_t **cpp)
1607 {
1608 	backend_tx_t *tx;
1609 	rc_node_t *np = NULL;
1610 	child_info_t ci;
1611 	int rc;
1612 
1613 	if ((rc = backend_tx_begin(pp->rn_id.rl_backend, &tx)) !=
1614 	    REP_PROTOCOL_SUCCESS) {
1615 		return (rc);
1616 	}
1617 
1618 	if ((rc = object_do_create(tx, &ci, pp, type, name, &np)) !=
1619 	    REP_PROTOCOL_SUCCESS) {
1620 		backend_tx_rollback(tx);
1621 		return (rc);
1622 	}
1623 
1624 	rc = backend_tx_commit(tx);
1625 	if (rc != REP_PROTOCOL_SUCCESS) {
1626 		rc_node_destroy(np);
1627 		return (rc);
1628 	}
1629 
1630 	*cpp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
1631 
1632 	return (REP_PROTOCOL_SUCCESS);
1633 }
1634 
1635 /*ARGSUSED*/
1636 int
object_create_pg(rc_node_t * pp,uint32_t type,const char * name,const char * pgtype,uint32_t flags,rc_node_t ** cpp)1637 object_create_pg(rc_node_t *pp, uint32_t type, const char *name,
1638     const char *pgtype, uint32_t flags, rc_node_t **cpp)
1639 {
1640 	uint32_t ptype = pp->rn_id.rl_type;
1641 	backend_tx_t *tx_ro, *tx_wr;
1642 	backend_query_t *q;
1643 	uint32_t id;
1644 	uint32_t gen = 0;
1645 	rc_node_t *np = NULL;
1646 	int rc;
1647 	int rc_wr;
1648 	int rc_ro;
1649 	object_info_t *ip;
1650 
1651 	int nonpersist = (flags & SCF_PG_FLAG_NONPERSISTENT);
1652 
1653 	child_info_t ci;
1654 	rc_node_lookup_t *lp = &ci.ci_base_nl;
1655 
1656 	assert(ptype > 0 && ptype < NUM_INFO);
1657 
1658 	if (ptype != REP_PROTOCOL_ENTITY_SERVICE &&
1659 	    ptype != REP_PROTOCOL_ENTITY_INSTANCE)
1660 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1661 
1662 	ip = &info[ptype];
1663 
1664 	assert(ip->obj_setup_child_info != NULL &&
1665 	    ip->obj_query_child != NULL &&
1666 	    ip->obj_insert_pg_child != NULL);
1667 
1668 	if ((rc = (*ip->obj_setup_child_info)(pp, type, &ci)) !=
1669 	    REP_PROTOCOL_SUCCESS)
1670 		return (rc);
1671 
1672 	q = backend_query_alloc();
1673 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1674 	    REP_PROTOCOL_SUCCESS) {
1675 		backend_query_free(q);
1676 		return (rc);
1677 	}
1678 
1679 	if (!nonpersist) {
1680 		lp->rl_backend = BACKEND_TYPE_NORMAL;
1681 		rc_wr = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx_wr);
1682 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NONPERSIST, &tx_ro);
1683 	} else {
1684 		lp->rl_backend = BACKEND_TYPE_NONPERSIST;
1685 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NORMAL, &tx_ro);
1686 		rc_wr = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &tx_wr);
1687 	}
1688 
1689 	if (rc_wr != REP_PROTOCOL_SUCCESS) {
1690 		rc = rc_wr;
1691 		goto fail;
1692 	}
1693 	if (rc_ro != REP_PROTOCOL_SUCCESS &&
1694 	    rc_ro != REP_PROTOCOL_FAIL_BACKEND_ACCESS) {
1695 		rc = rc_ro;
1696 		goto fail;
1697 	}
1698 
1699 	if (tx_ro != NULL) {
1700 		rc = backend_tx_run_single_int(tx_ro, q, &id);
1701 
1702 		if (rc == REP_PROTOCOL_SUCCESS) {
1703 			backend_query_free(q);
1704 			rc = REP_PROTOCOL_FAIL_EXISTS;
1705 			goto fail;
1706 		} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1707 			backend_query_free(q);
1708 			goto fail;
1709 		}
1710 	}
1711 
1712 	rc = backend_tx_run_single_int(tx_wr, q, &id);
1713 	backend_query_free(q);
1714 
1715 	if (rc == REP_PROTOCOL_SUCCESS) {
1716 		rc = REP_PROTOCOL_FAIL_EXISTS;
1717 		goto fail;
1718 	} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1719 		goto fail;
1720 	}
1721 
1722 	if (tx_ro != NULL)
1723 		backend_tx_end_ro(tx_ro);
1724 	tx_ro = NULL;
1725 
1726 	if ((lp->rl_main_id = backend_new_id(tx_wr,
1727 	    info[type].obj_id_space)) == 0) {
1728 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1729 		goto fail;
1730 	}
1731 
1732 	if ((np = rc_node_alloc()) == NULL) {
1733 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1734 		goto fail;
1735 	}
1736 
1737 	if ((rc = (*ip->obj_insert_pg_child)(tx_wr, lp, name, pgtype, flags,
1738 	    gen)) != REP_PROTOCOL_SUCCESS) {
1739 		rc_node_destroy(np);
1740 		goto fail;
1741 	}
1742 
1743 	rc = backend_tx_commit(tx_wr);
1744 	if (rc != REP_PROTOCOL_SUCCESS) {
1745 		rc_node_destroy(np);
1746 		return (rc);
1747 	}
1748 
1749 	*cpp = rc_node_setup_pg(np, lp, name, pgtype, flags, gen, ci.ci_parent);
1750 
1751 	return (REP_PROTOCOL_SUCCESS);
1752 
1753 fail:
1754 	if (tx_ro != NULL)
1755 		backend_tx_end_ro(tx_ro);
1756 	if (tx_wr != NULL)
1757 		backend_tx_rollback(tx_wr);
1758 	return (rc);
1759 }
1760 
1761 /*
1762  * Given a row of snaplevel number, snaplevel id, service id, service name,
1763  * instance id, & instance name, create a rc_snaplevel_t & prepend it onto the
1764  * rs_levels list of the rc_snapshot_t passed in as data.
1765  * Returns _CONTINUE on success or _ABORT if any allocations fail.
1766  */
1767 /*ARGSUSED*/
1768 static int
fill_snapshot_cb(void * data,int columns,char ** vals,char ** names)1769 fill_snapshot_cb(void *data, int columns, char **vals, char **names)
1770 {
1771 	rc_snapshot_t *sp = data;
1772 	rc_snaplevel_t *lvl;
1773 	char *num = vals[0];
1774 	char *id = vals[1];
1775 	char *service_id = vals[2];
1776 	char *service = vals[3];
1777 	char *instance_id = vals[4];
1778 	char *instance = vals[5];
1779 	assert(columns == 6);
1780 
1781 	lvl = uu_zalloc(sizeof (*lvl));
1782 	if (lvl == NULL)
1783 		return (BACKEND_CALLBACK_ABORT);
1784 	lvl->rsl_parent = sp;
1785 	lvl->rsl_next = sp->rs_levels;
1786 	sp->rs_levels = lvl;
1787 
1788 	string_to_id(num, &lvl->rsl_level_num, "snap_level_num");
1789 	string_to_id(id, &lvl->rsl_level_id, "snap_level_id");
1790 	string_to_id(service_id, &lvl->rsl_service_id, "snap_level_service_id");
1791 	if (instance_id != NULL)
1792 		string_to_id(instance_id, &lvl->rsl_instance_id,
1793 		    "snap_level_instance_id");
1794 
1795 	lvl->rsl_scope = (const char *)"localhost";
1796 	lvl->rsl_service = strdup(service);
1797 	if (lvl->rsl_service == NULL) {
1798 		uu_free(lvl);
1799 		return (BACKEND_CALLBACK_ABORT);
1800 	}
1801 	if (instance) {
1802 		assert(lvl->rsl_instance_id != 0);
1803 		lvl->rsl_instance = strdup(instance);
1804 		if (lvl->rsl_instance == NULL) {
1805 			free((void *)lvl->rsl_instance);
1806 			uu_free(lvl);
1807 			return (BACKEND_CALLBACK_ABORT);
1808 		}
1809 	} else {
1810 		assert(lvl->rsl_instance_id == 0);
1811 	}
1812 
1813 	return (BACKEND_CALLBACK_CONTINUE);
1814 }
1815 
1816 /*
1817  * Populate sp's rs_levels list from the snaplevel_tbl table.
1818  * Fails with
1819  *   _NO_RESOURCES
1820  */
1821 int
object_fill_snapshot(rc_snapshot_t * sp)1822 object_fill_snapshot(rc_snapshot_t *sp)
1823 {
1824 	backend_query_t *q;
1825 	rc_snaplevel_t *sl;
1826 	int result;
1827 	int i;
1828 
1829 	q = backend_query_alloc();
1830 	backend_query_add(q,
1831 	    "SELECT snap_level_num, snap_level_id, "
1832 	    "    snap_level_service_id, snap_level_service, "
1833 	    "    snap_level_instance_id, snap_level_instance "
1834 	    "FROM snaplevel_tbl "
1835 	    "WHERE snap_id = %d "
1836 	    "ORDER BY snap_level_id DESC",
1837 	    sp->rs_snap_id);
1838 
1839 	result = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_cb, sp);
1840 	if (result == REP_PROTOCOL_DONE)
1841 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
1842 	backend_query_free(q);
1843 
1844 	if (result == REP_PROTOCOL_SUCCESS) {
1845 		i = 0;
1846 		for (sl = sp->rs_levels; sl != NULL; sl = sl->rsl_next) {
1847 			if (sl->rsl_level_num != ++i) {
1848 				backend_panic("snaplevels corrupt; expected "
1849 				    "level %d, got %d", i, sl->rsl_level_num);
1850 			}
1851 		}
1852 	}
1853 	return (result);
1854 }
1855 
/*
 * This represents a property group in a snapshot.  Elements are ordered by
 * (cse_parent, cse_pg_id, cse_pg_gen); see check_snapshot_elem_cmp().
 */
typedef struct check_snapshot_elem {
	/* id of the snaplevel's instance, or of its service if that is 0 */
	uint32_t cse_parent;
	/* property group id (snaplvl_pg_id) */
	uint32_t cse_pg_id;
	/* property group generation (snaplvl_gen_id) */
	uint32_t cse_pg_gen;
	/* set to 1 once a matching pg_tbl row has been found */
	char	cse_seen;
} check_snapshot_elem_t;
1865 
/* Maximum number of parent ids collected for one snapshot. */
#define	CSI_MAX_PARENTS		COMPOSITION_DEPTH
/*
 * State shared by the callbacks used by object_check_snapshot().
 */
typedef struct check_snapshot_info {
	/* number of elements of csi_array in use */
	size_t			csi_count;
	/* number of elements allocated for csi_array */
	size_t			csi_array_size;
	/* the snapshot's property groups; grown on demand */
	check_snapshot_elem_t	*csi_array;
	/* number of entries of csi_parent_ids in use */
	size_t			csi_nparents;
	/* parent (instance or service) id of each snaplevel */
	uint32_t		csi_parent_ids[CSI_MAX_PARENTS];
} check_snapshot_info_t;
1874 
1875 /*ARGSUSED*/
1876 static int
check_snapshot_fill_cb(void * data,int columns,char ** vals,char ** names)1877 check_snapshot_fill_cb(void *data, int columns, char **vals, char **names)
1878 {
1879 	check_snapshot_info_t *csip = data;
1880 	check_snapshot_elem_t *cur;
1881 	const char *parent;
1882 	const char *pg_id;
1883 	const char *pg_gen_id;
1884 
1885 	if (columns == 1) {
1886 		uint32_t *target;
1887 
1888 		if (csip->csi_nparents >= CSI_MAX_PARENTS)
1889 			backend_panic("snaplevel table has too many elements");
1890 
1891 		target = &csip->csi_parent_ids[csip->csi_nparents++];
1892 		string_to_id(vals[0], target, "snap_level_*_id");
1893 
1894 		return (BACKEND_CALLBACK_CONTINUE);
1895 	}
1896 
1897 	assert(columns == 3);
1898 
1899 	parent = vals[0];
1900 	pg_id = vals[1];
1901 	pg_gen_id = vals[2];
1902 
1903 	if (csip->csi_count == csip->csi_array_size) {
1904 		size_t newsz = (csip->csi_array_size > 0) ?
1905 		    csip->csi_array_size * 2 : 8;
1906 		check_snapshot_elem_t *new = uu_zalloc(newsz * sizeof (*new));
1907 
1908 		if (new == NULL)
1909 			return (BACKEND_CALLBACK_ABORT);
1910 
1911 		(void) memcpy(new, csip->csi_array,
1912 		    sizeof (*new) * csip->csi_array_size);
1913 		uu_free(csip->csi_array);
1914 		csip->csi_array = new;
1915 		csip->csi_array_size = newsz;
1916 	}
1917 
1918 	cur = &csip->csi_array[csip->csi_count++];
1919 
1920 	string_to_id(parent, &cur->cse_parent, "snap_level_*_id");
1921 	string_to_id(pg_id, &cur->cse_pg_id, "snaplvl_pg_id");
1922 	string_to_id(pg_gen_id, &cur->cse_pg_gen, "snaplvl_gen_id");
1923 	cur->cse_seen = 0;
1924 
1925 	return (BACKEND_CALLBACK_CONTINUE);
1926 }
1927 
1928 static int
check_snapshot_elem_cmp(const void * lhs_arg,const void * rhs_arg)1929 check_snapshot_elem_cmp(const void *lhs_arg, const void *rhs_arg)
1930 {
1931 	const check_snapshot_elem_t *lhs = lhs_arg;
1932 	const check_snapshot_elem_t *rhs = rhs_arg;
1933 
1934 	if (lhs->cse_parent < rhs->cse_parent)
1935 		return (-1);
1936 	if (lhs->cse_parent > rhs->cse_parent)
1937 		return (1);
1938 
1939 	if (lhs->cse_pg_id < rhs->cse_pg_id)
1940 		return (-1);
1941 	if (lhs->cse_pg_id > rhs->cse_pg_id)
1942 		return (1);
1943 
1944 	if (lhs->cse_pg_gen < rhs->cse_pg_gen)
1945 		return (-1);
1946 	if (lhs->cse_pg_gen > rhs->cse_pg_gen)
1947 		return (1);
1948 
1949 	return (0);
1950 }
1951 
1952 /*ARGSUSED*/
1953 static int
check_snapshot_check_cb(void * data,int columns,char ** vals,char ** names)1954 check_snapshot_check_cb(void *data, int columns, char **vals, char **names)
1955 {
1956 	check_snapshot_info_t *csip = data;
1957 	check_snapshot_elem_t elem;
1958 	check_snapshot_elem_t *cur;
1959 
1960 	const char *parent = vals[0];
1961 	const char *pg_id = vals[1];
1962 	const char *pg_gen_id = vals[2];
1963 
1964 	assert(columns == 3);
1965 
1966 	string_to_id(parent, &elem.cse_parent, "snap_level_*_id");
1967 	string_to_id(pg_id, &elem.cse_pg_id, "snaplvl_pg_id");
1968 	string_to_id(pg_gen_id, &elem.cse_pg_gen, "snaplvl_gen_id");
1969 
1970 	if ((cur = bsearch(&elem, csip->csi_array, csip->csi_count,
1971 	    sizeof (*csip->csi_array), check_snapshot_elem_cmp)) == NULL)
1972 		return (BACKEND_CALLBACK_ABORT);
1973 
1974 	if (cur->cse_seen)
1975 		backend_panic("duplicate property group reported");
1976 	cur->cse_seen = 1;
1977 	return (BACKEND_CALLBACK_CONTINUE);
1978 }
1979 
1980 /*
1981  * Check that a snapshot matches up with the latest in the repository.
1982  * Returns:
1983  *	REP_PROTOCOL_SUCCESS		if it is up-to-date,
1984  *	REP_PROTOCOL_DONE		if it is out-of-date, or
1985  *	REP_PROTOCOL_FAIL_NO_RESOURCES	if we ran out of memory.
1986  */
1987 static int
object_check_snapshot(uint32_t snap_id)1988 object_check_snapshot(uint32_t snap_id)
1989 {
1990 	check_snapshot_info_t csi;
1991 	backend_query_t *q;
1992 	int result;
1993 	size_t idx;
1994 
1995 	/* if the snapshot has never been taken, it must be out of date. */
1996 	if (snap_id == 0)
1997 		return (REP_PROTOCOL_DONE);
1998 
1999 	(void) memset(&csi, '\0', sizeof (csi));
2000 
2001 	q = backend_query_alloc();
2002 	backend_query_add(q,
2003 	    "SELECT\n"
2004 	    "    CASE snap_level_instance_id\n"
2005 	    "        WHEN 0 THEN snap_level_service_id\n"
2006 	    "        ELSE snap_level_instance_id\n"
2007 	    "    END\n"
2008 	    "FROM snaplevel_tbl\n"
2009 	    "WHERE snap_id = %d;\n"
2010 	    "\n"
2011 	    "SELECT\n"
2012 	    "    CASE snap_level_instance_id\n"
2013 	    "        WHEN 0 THEN snap_level_service_id\n"
2014 	    "        ELSE snap_level_instance_id\n"
2015 	    "    END,\n"
2016 	    "    snaplvl_pg_id,\n"
2017 	    "    snaplvl_gen_id\n"
2018 	    "FROM snaplevel_tbl, snaplevel_lnk_tbl\n"
2019 	    "WHERE\n"
2020 	    "    (snaplvl_level_id = snap_level_id AND\n"
2021 	    "    snap_id = %d);",
2022 	    snap_id, snap_id);
2023 
2024 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_fill_cb,
2025 	    &csi);
2026 	if (result == REP_PROTOCOL_DONE)
2027 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2028 	backend_query_free(q);
2029 
2030 	if (result != REP_PROTOCOL_SUCCESS)
2031 		goto fail;
2032 
2033 	if (csi.csi_count > 0) {
2034 		qsort(csi.csi_array, csi.csi_count, sizeof (*csi.csi_array),
2035 		    check_snapshot_elem_cmp);
2036 	}
2037 
2038 #if COMPOSITION_DEPTH == 2
2039 	if (csi.csi_nparents != COMPOSITION_DEPTH) {
2040 		result = REP_PROTOCOL_DONE;
2041 		goto fail;
2042 	}
2043 
2044 	q = backend_query_alloc();
2045 	backend_query_add(q,
2046 	    "SELECT "
2047 	    "    pg_parent_id, pg_id, pg_gen_id "
2048 	    "FROM "
2049 	    "    pg_tbl "
2050 	    "WHERE (pg_parent_id = %d OR pg_parent_id = %d)",
2051 	    csi.csi_parent_ids[0], csi.csi_parent_ids[1]);
2052 
2053 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_check_cb,
2054 	    &csi);
2055 #else
2056 #error This code must be updated
2057 #endif
2058 	/*
2059 	 * To succeed, the callback must not have aborted, and we must have
2060 	 * found all of the items.
2061 	 */
2062 	if (result == REP_PROTOCOL_SUCCESS) {
2063 		for (idx = 0; idx < csi.csi_count; idx++) {
2064 			if (csi.csi_array[idx].cse_seen == 0) {
2065 				result = REP_PROTOCOL_DONE;
2066 				goto fail;
2067 			}
2068 		}
2069 	}
2070 
2071 fail:
2072 	uu_free(csi.csi_array);
2073 	return (result);
2074 }
2075 
2076 /*ARGSUSED*/
2077 static int
object_copy_string(void * data_arg,int columns,char ** vals,char ** names)2078 object_copy_string(void *data_arg, int columns, char **vals, char **names)
2079 {
2080 	char **data = data_arg;
2081 
2082 	assert(columns == 1);
2083 
2084 	if (*data != NULL)
2085 		free(*data);
2086 	*data = NULL;
2087 
2088 	if (vals[0] != NULL) {
2089 		if ((*data = strdup(vals[0])) == NULL)
2090 			return (BACKEND_CALLBACK_ABORT);
2091 	}
2092 
2093 	return (BACKEND_CALLBACK_CONTINUE);
2094 }
2095 
2096 struct snaplevel_add_info {
2097 	backend_query_t *sai_q;
2098 	uint32_t	sai_level_id;
2099 	int		sai_used;		/* sai_q has been used */
2100 };
2101 
2102 /*ARGSUSED*/
2103 static int
object_snaplevel_process_pg(void * data_arg,int columns,char ** vals,char ** names)2104 object_snaplevel_process_pg(void *data_arg, int columns, char **vals,
2105     char **names)
2106 {
2107 	struct snaplevel_add_info *data = data_arg;
2108 
2109 	assert(columns == 5);
2110 
2111 	backend_query_add(data->sai_q,
2112 	    "INSERT INTO snaplevel_lnk_tbl "
2113 	    "    (snaplvl_level_id, snaplvl_pg_id, snaplvl_pg_name, "
2114 	    "    snaplvl_pg_type, snaplvl_pg_flags, snaplvl_gen_id)"
2115 	    "VALUES (%d, %s, '%q', '%q', %s, %s);",
2116 	    data->sai_level_id, vals[0], vals[1], vals[2], vals[3], vals[4]);
2117 
2118 	data->sai_used = 1;
2119 
2120 	return (BACKEND_CALLBACK_CONTINUE);
2121 }
2122 
2123 /*ARGSUSED*/
2124 static int
object_snapshot_add_level(backend_tx_t * tx,uint32_t snap_id,uint32_t snap_level_num,uint32_t svc_id,const char * svc_name,uint32_t inst_id,const char * inst_name)2125 object_snapshot_add_level(backend_tx_t *tx, uint32_t snap_id,
2126     uint32_t snap_level_num, uint32_t svc_id, const char *svc_name,
2127     uint32_t inst_id, const char *inst_name)
2128 {
2129 	struct snaplevel_add_info data;
2130 	backend_query_t *q;
2131 	int result;
2132 
2133 	assert((snap_level_num == 1 && inst_name != NULL) ||
2134 	    snap_level_num == 2 && inst_name == NULL);
2135 
2136 	data.sai_level_id = backend_new_id(tx, BACKEND_ID_SNAPLEVEL);
2137 	if (data.sai_level_id == 0) {
2138 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
2139 	}
2140 
2141 	result = backend_tx_run_update(tx,
2142 	    "INSERT INTO snaplevel_tbl "
2143 	    "    (snap_id, snap_level_num, snap_level_id, "
2144 	    "    snap_level_service_id, snap_level_service, "
2145 	    "    snap_level_instance_id, snap_level_instance) "
2146 	    "VALUES (%d, %d, %d, %d, %Q, %d, %Q);",
2147 	    snap_id, snap_level_num, data.sai_level_id, svc_id, svc_name,
2148 	    inst_id, inst_name);
2149 
2150 	q = backend_query_alloc();
2151 	backend_query_add(q,
2152 	    "SELECT pg_id, pg_name, pg_type, pg_flags, pg_gen_id FROM pg_tbl "
2153 	    "WHERE (pg_parent_id = %d);",
2154 	    (inst_name != NULL)? inst_id : svc_id);
2155 
2156 	data.sai_q = backend_query_alloc();
2157 	data.sai_used = 0;
2158 	result = backend_tx_run(tx, q, object_snaplevel_process_pg,
2159 	    &data);
2160 	backend_query_free(q);
2161 
2162 	if (result == REP_PROTOCOL_SUCCESS && data.sai_used != 0)
2163 		result = backend_tx_run(tx, data.sai_q, NULL, NULL);
2164 	backend_query_free(data.sai_q);
2165 
2166 	return (result);
2167 }
2168 
2169 /*
2170  * Fails with:
2171  *	_NO_RESOURCES - no new id or out of disk space
2172  *	_BACKEND_READONLY - persistent backend is read-only
2173  */
2174 static int
object_snapshot_do_take(uint32_t instid,const char * inst_name,uint32_t svcid,const char * svc_name,backend_tx_t ** tx_out,uint32_t * snapid_out)2175 object_snapshot_do_take(uint32_t instid, const char *inst_name,
2176     uint32_t svcid, const char *svc_name,
2177     backend_tx_t **tx_out, uint32_t *snapid_out)
2178 {
2179 	backend_tx_t *tx;
2180 	backend_query_t *q;
2181 	int result;
2182 
2183 	char *svc_name_alloc = NULL;
2184 	char *inst_name_alloc = NULL;
2185 	uint32_t snapid;
2186 
2187 	result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2188 	if (result != REP_PROTOCOL_SUCCESS)
2189 		return (result);
2190 
2191 	snapid = backend_new_id(tx, BACKEND_ID_SNAPSHOT);
2192 	if (snapid == 0) {
2193 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2194 		goto fail;
2195 	}
2196 
2197 	if (svc_name == NULL) {
2198 		q = backend_query_alloc();
2199 		backend_query_add(q,
2200 		    "SELECT svc_name FROM service_tbl "
2201 		    "WHERE (svc_id = %d)", svcid);
2202 		result = backend_tx_run(tx, q, object_copy_string,
2203 		    &svc_name_alloc);
2204 		backend_query_free(q);
2205 
2206 		svc_name = svc_name_alloc;
2207 
2208 		if (result == REP_PROTOCOL_DONE) {
2209 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2210 			goto fail;
2211 		}
2212 		if (result == REP_PROTOCOL_SUCCESS && svc_name == NULL)
2213 			backend_panic("unable to find name for svc id %d\n",
2214 			    svcid);
2215 
2216 		if (result != REP_PROTOCOL_SUCCESS)
2217 			goto fail;
2218 	}
2219 
2220 	if (inst_name == NULL) {
2221 		q = backend_query_alloc();
2222 		backend_query_add(q,
2223 		    "SELECT instance_name FROM instance_tbl "
2224 		    "WHERE (instance_id = %d)", instid);
2225 		result = backend_tx_run(tx, q, object_copy_string,
2226 		    &inst_name_alloc);
2227 		backend_query_free(q);
2228 
2229 		inst_name = inst_name_alloc;
2230 
2231 		if (result == REP_PROTOCOL_DONE) {
2232 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2233 			goto fail;
2234 		}
2235 
2236 		if (result == REP_PROTOCOL_SUCCESS && inst_name == NULL)
2237 			backend_panic(
2238 			    "unable to find name for instance id %d\n", instid);
2239 
2240 		if (result != REP_PROTOCOL_SUCCESS)
2241 			goto fail;
2242 	}
2243 
2244 	result = object_snapshot_add_level(tx, snapid, 1,
2245 	    svcid, svc_name, instid, inst_name);
2246 
2247 	if (result != REP_PROTOCOL_SUCCESS)
2248 		goto fail;
2249 
2250 	result = object_snapshot_add_level(tx, snapid, 2,
2251 	    svcid, svc_name, 0, NULL);
2252 
2253 	if (result != REP_PROTOCOL_SUCCESS)
2254 		goto fail;
2255 
2256 	*snapid_out = snapid;
2257 	*tx_out = tx;
2258 
2259 	free(svc_name_alloc);
2260 	free(inst_name_alloc);
2261 
2262 	return (REP_PROTOCOL_SUCCESS);
2263 
2264 fail:
2265 	backend_tx_rollback(tx);
2266 	free(svc_name_alloc);
2267 	free(inst_name_alloc);
2268 	return (result);
2269 }
2270 
2271 /*
2272  * Fails with:
2273  *	_TYPE_MISMATCH - pp is not an instance
2274  *	_NO_RESOURCES - no new id or out of disk space
2275  *	_BACKEND_READONLY - persistent backend is read-only
2276  */
2277 int
object_snapshot_take_new(rc_node_t * pp,const char * svc_name,const char * inst_name,const char * name,rc_node_t ** outp)2278 object_snapshot_take_new(rc_node_t *pp,
2279     const char *svc_name, const char *inst_name,
2280     const char *name, rc_node_t **outp)
2281 {
2282 	rc_node_lookup_t *insti = &pp->rn_id;
2283 
2284 	uint32_t instid = insti->rl_main_id;
2285 	uint32_t svcid = insti->rl_ids[ID_SERVICE];
2286 	uint32_t snapid = 0;
2287 	backend_tx_t *tx = NULL;
2288 	child_info_t ci;
2289 	rc_node_t *np;
2290 	int result;
2291 
2292 	if (insti->rl_type != REP_PROTOCOL_ENTITY_INSTANCE)
2293 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2294 
2295 	result = object_snapshot_do_take(instid, inst_name, svcid, svc_name,
2296 	    &tx, &snapid);
2297 	if (result != REP_PROTOCOL_SUCCESS)
2298 		return (result);
2299 
2300 	if ((result = object_do_create(tx, &ci, pp,
2301 	    REP_PROTOCOL_ENTITY_SNAPSHOT, name, &np)) != REP_PROTOCOL_SUCCESS) {
2302 		backend_tx_rollback(tx);
2303 		return (result);
2304 	}
2305 
2306 	/*
2307 	 * link the new object to the new snapshot.
2308 	 */
2309 	np->rn_snapshot_id = snapid;
2310 
2311 	result = backend_tx_run_update(tx,
2312 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2313 	    snapid, ci.ci_base_nl.rl_main_id);
2314 	if (result != REP_PROTOCOL_SUCCESS) {
2315 		backend_tx_rollback(tx);
2316 		rc_node_destroy(np);
2317 		return (result);
2318 	}
2319 	result = backend_tx_commit(tx);
2320 	if (result != REP_PROTOCOL_SUCCESS) {
2321 		rc_node_destroy(np);
2322 		return (result);
2323 	}
2324 
2325 	*outp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
2326 	return (REP_PROTOCOL_SUCCESS);
2327 }
2328 
2329 /*
2330  * Fails with:
2331  *	_TYPE_MISMATCH - pp is not an instance
2332  *	_NO_RESOURCES - no new id or out of disk space
2333  *	_BACKEND_READONLY - persistent backend is read-only
2334  */
2335 int
object_snapshot_attach(rc_node_lookup_t * snapi,uint32_t * snapid_ptr,int takesnap)2336 object_snapshot_attach(rc_node_lookup_t *snapi, uint32_t *snapid_ptr,
2337     int takesnap)
2338 {
2339 	uint32_t svcid = snapi->rl_ids[ID_SERVICE];
2340 	uint32_t instid = snapi->rl_ids[ID_INSTANCE];
2341 	uint32_t snapid = *snapid_ptr;
2342 	uint32_t oldsnapid = 0;
2343 	backend_tx_t *tx = NULL;
2344 	backend_query_t *q;
2345 	int result;
2346 
2347 	delete_info_t dip;
2348 	delete_ent_t de;
2349 
2350 	if (snapi->rl_type != REP_PROTOCOL_ENTITY_SNAPSHOT)
2351 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2352 
2353 	if (takesnap) {
2354 		/* first, check that we're actually out of date */
2355 		if (object_check_snapshot(snapid) == REP_PROTOCOL_SUCCESS)
2356 			return (REP_PROTOCOL_SUCCESS);
2357 
2358 		result = object_snapshot_do_take(instid, NULL,
2359 		    svcid, NULL, &tx, &snapid);
2360 		if (result != REP_PROTOCOL_SUCCESS)
2361 			return (result);
2362 	} else {
2363 		result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2364 		if (result != REP_PROTOCOL_SUCCESS)
2365 			return (result);
2366 	}
2367 
2368 	q = backend_query_alloc();
2369 	backend_query_add(q,
2370 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
2371 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2372 	    snapi->rl_main_id, snapid, snapi->rl_main_id);
2373 	result = backend_tx_run_single_int(tx, q, &oldsnapid);
2374 	backend_query_free(q);
2375 
2376 	if (result == REP_PROTOCOL_FAIL_NOT_FOUND) {
2377 		backend_tx_rollback(tx);
2378 		backend_panic("unable to find snapshot id %d",
2379 		    snapi->rl_main_id);
2380 	}
2381 	if (result != REP_PROTOCOL_SUCCESS)
2382 		goto fail;
2383 
2384 	/*
2385 	 * Now we use the delete stack to handle the possible unreferencing
2386 	 * of oldsnapid.
2387 	 */
2388 	(void) memset(&dip, 0, sizeof (dip));
2389 	dip.di_tx = tx;
2390 	dip.di_np_tx = NULL;	/* no need for non-persistant backend */
2391 
2392 	if ((result = delete_stack_push(&dip, BACKEND_TYPE_NORMAL,
2393 	    &snaplevel_tbl_delete, oldsnapid, 0)) != REP_PROTOCOL_SUCCESS)
2394 		goto fail;
2395 
2396 	while (delete_stack_pop(&dip, &de)) {
2397 		result = (*de.de_cb)(&dip, &de);
2398 		if (result != REP_PROTOCOL_SUCCESS)
2399 			goto fail;
2400 	}
2401 
2402 	result = backend_tx_commit(tx);
2403 	if (result != REP_PROTOCOL_SUCCESS)
2404 		goto fail;
2405 
2406 	delete_stack_cleanup(&dip);
2407 	*snapid_ptr = snapid;
2408 	return (REP_PROTOCOL_SUCCESS);
2409 
2410 fail:
2411 	backend_tx_rollback(tx);
2412 	delete_stack_cleanup(&dip);
2413 	return (result);
2414 }
2415