xref: /titanic_41/usr/src/cmd/svc/configd/file_object.c (revision cc1a9a89a73172cc2db053635fab3b1b91691657)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 #pragma ident	"%Z%%M%	%I%	%E% SMI"
27 
28 /*
29  * file_object.c - enter objects into and load them from the backend
30  *
31  * The primary entry points in this layer are object_create(),
32  * object_create_pg(), object_delete(), and object_fill_children().  They each
33  * take an rc_node_t and use the functions in the object_info_t info array for
34  * the node's type.
35  */
36 
37 #include <assert.h>
38 #include <pthread.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42 #include <strings.h>
43 
44 #include "configd.h"
45 #include "repcache_protocol.h"
46 
47 typedef struct child_info {
48 	rc_node_t	*ci_parent;
49 	backend_tx_t	*ci_tx;			/* only for properties */
50 	rc_node_lookup_t ci_base_nl;
51 } child_info_t;
52 
53 typedef struct delete_ent delete_ent_t;
54 typedef struct delete_stack delete_stack_t;
55 typedef struct delete_info delete_info_t;
56 
57 typedef int	delete_cb_func(delete_info_t *, const delete_ent_t *);
58 
59 struct delete_ent {
60 	delete_cb_func	*de_cb;		/* callback */
61 	uint32_t	de_backend;
62 	uint32_t	de_id;
63 	uint32_t	de_gen;		/* only for property groups */
64 };
65 
66 struct delete_stack {
67 	struct delete_stack *ds_next;
68 	uint32_t	ds_size;	/* number of elements */
69 	uint32_t	ds_cur;		/* current offset */
70 	delete_ent_t	ds_buf[1];	/* actually ds_size */
71 };
72 #define	DELETE_STACK_SIZE(x)	offsetof(delete_stack_t, ds_buf[(x)])
73 
74 struct delete_info {
75 	backend_tx_t	*di_tx;
76 	backend_tx_t	*di_np_tx;
77 	delete_stack_t	*di_stack;
78 	delete_stack_t	*di_free;
79 };
80 
81 typedef struct object_info {
82 	uint32_t	obj_type;
83 	enum id_space	obj_id_space;
84 
85 	int (*obj_fill_children)(rc_node_t *);
86 	int (*obj_setup_child_info)(rc_node_t *, uint32_t, child_info_t *);
87 	int (*obj_query_child)(backend_query_t *, rc_node_lookup_t *,
88 	    const char *);
89 	int (*obj_insert_child)(backend_tx_t *, rc_node_lookup_t *,
90 	    const char *);
91 	int (*obj_insert_pg_child)(backend_tx_t *, rc_node_lookup_t *,
92 	    const char *, const char *, uint32_t, uint32_t);
93 	int (*obj_delete_start)(rc_node_t *, delete_info_t *);
94 } object_info_t;
95 
96 static void
97 string_to_id(const char *str, uint32_t *output, const char *fieldname)
98 {
99 	if (uu_strtouint(str, output, sizeof (*output), 0, 0, 0) == -1)
100 		backend_panic("invalid integer \"%s\" in field \"%s\"",
101 		    str, fieldname);
102 }
103 
104 #define	NUM_NEEDED	50
105 
106 static int
107 delete_stack_push(delete_info_t *dip, uint32_t be, delete_cb_func *cb,
108     uint32_t id, uint32_t gen)
109 {
110 	delete_stack_t *cur = dip->di_stack;
111 	delete_ent_t *ent;
112 
113 	if (cur == NULL || cur->ds_cur == cur->ds_size) {
114 		delete_stack_t *new = dip->di_free;
115 		dip->di_free = NULL;
116 		if (new == NULL) {
117 			new = uu_zalloc(DELETE_STACK_SIZE(NUM_NEEDED));
118 			if (new == NULL)
119 				return (REP_PROTOCOL_FAIL_NO_RESOURCES);
120 			new->ds_size = NUM_NEEDED;
121 		}
122 		new->ds_cur = 0;
123 		new->ds_next = dip->di_stack;
124 		dip->di_stack = new;
125 		cur = new;
126 	}
127 	assert(cur->ds_cur < cur->ds_size);
128 	ent = &cur->ds_buf[cur->ds_cur++];
129 
130 	ent->de_backend = be;
131 	ent->de_cb = cb;
132 	ent->de_id = id;
133 	ent->de_gen = gen;
134 
135 	return (REP_PROTOCOL_SUCCESS);
136 }
137 
138 static int
139 delete_stack_pop(delete_info_t *dip, delete_ent_t *out)
140 {
141 	delete_stack_t *cur = dip->di_stack;
142 	delete_ent_t *ent;
143 
144 	if (cur == NULL)
145 		return (0);
146 	assert(cur->ds_cur > 0 && cur->ds_cur <= cur->ds_size);
147 	ent = &cur->ds_buf[--cur->ds_cur];
148 	if (cur->ds_cur == 0) {
149 		dip->di_stack = cur->ds_next;
150 		cur->ds_next = NULL;
151 
152 		if (dip->di_free != NULL)
153 			uu_free(dip->di_free);
154 		dip->di_free = cur;
155 	}
156 	if (ent == NULL)
157 		return (0);
158 
159 	*out = *ent;
160 	return (1);
161 }
162 
163 static void
164 delete_stack_cleanup(delete_info_t *dip)
165 {
166 	delete_stack_t *cur;
167 	while ((cur = dip->di_stack) != NULL) {
168 		dip->di_stack = cur->ds_next;
169 
170 		uu_free(cur);
171 	}
172 
173 	if ((cur = dip->di_free) != NULL) {
174 		assert(cur->ds_next == NULL);	/* should only be one */
175 		uu_free(cur);
176 		dip->di_free = NULL;
177 	}
178 }
179 
180 struct delete_cb_info {
181 	delete_info_t	*dci_dip;
182 	uint32_t	dci_be;
183 	delete_cb_func	*dci_cb;
184 	int		dci_result;
185 };
186 
187 /*ARGSUSED*/
188 static int
189 push_delete_callback(void *data, int columns, char **vals, char **names)
190 {
191 	struct delete_cb_info *info = data;
192 
193 	const char *id_str = *vals++;
194 	const char *gen_str = *vals++;
195 
196 	uint32_t id;
197 	uint32_t gen;
198 
199 	assert(columns == 2);
200 
201 	string_to_id(id_str, &id, "id");
202 	string_to_id(gen_str, &gen, "gen_id");
203 
204 	info->dci_result = delete_stack_push(info->dci_dip, info->dci_be,
205 	    info->dci_cb, id, gen);
206 
207 	if (info->dci_result != REP_PROTOCOL_SUCCESS)
208 		return (BACKEND_CALLBACK_ABORT);
209 	return (BACKEND_CALLBACK_CONTINUE);
210 }
211 
212 static int
213 value_delete(delete_info_t *dip, const delete_ent_t *ent)
214 {
215 	uint32_t be = ent->de_backend;
216 	int r;
217 
218 	backend_query_t *q;
219 
220 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
221 	    dip->di_np_tx;
222 
223 	q = backend_query_alloc();
224 
225 	backend_query_add(q,
226 	    "SELECT 1 FROM prop_lnk_tbl WHERE (lnk_val_id = %d); "
227 	    "DELETE FROM value_tbl WHERE (value_id = %d); ",
228 	    ent->de_id, ent->de_id);
229 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
230 	backend_query_free(q);
231 	if (r == REP_PROTOCOL_DONE)
232 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
233 	return (r);
234 }
235 
236 static int
237 pg_lnk_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
238 {
239 	struct delete_cb_info info;
240 	uint32_t be = ent->de_backend;
241 	int r;
242 
243 	backend_query_t *q;
244 
245 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
246 	    dip->di_np_tx;
247 
248 	/*
249 	 * For non-persistent backends, we could only have one parent, and
250 	 * he's already been deleted.
251 	 *
252 	 * For normal backends, we need to check to see if we're in
253 	 * a snapshot or are the active generation for the property
254 	 * group.  If we are, there's nothing to be done.
255 	 */
256 	if (be == BACKEND_TYPE_NORMAL) {
257 		q = backend_query_alloc();
258 		backend_query_add(q,
259 		    "SELECT 1 "
260 		    "FROM pg_tbl "
261 		    "WHERE (pg_id = %d AND pg_gen_id = %d); "
262 		    "SELECT 1 "
263 		    "FROM snaplevel_lnk_tbl "
264 		    "WHERE (snaplvl_pg_id = %d AND snaplvl_gen_id = %d);",
265 		    ent->de_id, ent->de_gen,
266 		    ent->de_id, ent->de_gen);
267 		r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
268 		backend_query_free(q);
269 
270 		if (r == REP_PROTOCOL_DONE)
271 			return (REP_PROTOCOL_SUCCESS);	/* still in use */
272 	}
273 
274 	info.dci_dip = dip;
275 	info.dci_be =  be;
276 	info.dci_cb = &value_delete;
277 	info.dci_result = REP_PROTOCOL_SUCCESS;
278 
279 	q = backend_query_alloc();
280 	backend_query_add(q,
281 	    "SELECT DISTINCT lnk_val_id, 0 FROM prop_lnk_tbl "
282 	    "WHERE "
283 	    "    (lnk_pg_id = %d AND lnk_gen_id = %d AND lnk_val_id NOTNULL); "
284 	    "DELETE FROM prop_lnk_tbl "
285 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
286 	    ent->de_id, ent->de_gen, ent->de_id, ent->de_gen);
287 
288 	r = backend_tx_run(tx, q, push_delete_callback, &info);
289 	backend_query_free(q);
290 
291 	if (r == REP_PROTOCOL_DONE) {
292 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
293 		return (info.dci_result);
294 	}
295 	return (r);
296 }
297 
298 static int
299 propertygrp_delete(delete_info_t *dip, const delete_ent_t *ent)
300 {
301 	uint32_t be = ent->de_backend;
302 	backend_query_t *q;
303 	uint32_t gen;
304 
305 	int r;
306 
307 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
308 	    dip->di_np_tx;
309 
310 	q = backend_query_alloc();
311 	backend_query_add(q,
312 	    "SELECT pg_gen_id FROM pg_tbl WHERE pg_id = %d; "
313 	    "DELETE FROM pg_tbl WHERE pg_id = %d",
314 	    ent->de_id, ent->de_id);
315 	r = backend_tx_run_single_int(tx, q, &gen);
316 	backend_query_free(q);
317 
318 	if (r != REP_PROTOCOL_SUCCESS)
319 		return (r);
320 
321 	return (delete_stack_push(dip, be, &pg_lnk_tbl_delete,
322 	    ent->de_id, gen));
323 }
324 
325 static int
326 snaplevel_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
327 {
328 	uint32_t be = ent->de_backend;
329 	backend_query_t *q;
330 	struct delete_cb_info info;
331 
332 	int r;
333 
334 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
335 	    dip->di_np_tx;
336 
337 	info.dci_dip = dip;
338 	info.dci_be = be;
339 	info.dci_cb = &pg_lnk_tbl_delete;
340 	info.dci_result = REP_PROTOCOL_SUCCESS;
341 
342 	q = backend_query_alloc();
343 	backend_query_add(q,
344 	    "SELECT snaplvl_pg_id, snaplvl_gen_id "
345 	    "    FROM snaplevel_lnk_tbl "
346 	    "    WHERE snaplvl_level_id = %d; "
347 	    "DELETE FROM snaplevel_lnk_tbl WHERE snaplvl_level_id = %d",
348 	    ent->de_id, ent->de_id);
349 	r = backend_tx_run(tx, q, push_delete_callback, &info);
350 	backend_query_free(q);
351 
352 	if (r == REP_PROTOCOL_DONE) {
353 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
354 		return (info.dci_result);
355 	}
356 	return (r);
357 }
358 
359 static int
360 snaplevel_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
361 {
362 	uint32_t be = ent->de_backend;
363 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
364 	    dip->di_np_tx;
365 
366 	struct delete_cb_info info;
367 	backend_query_t *q;
368 	int r;
369 
370 	assert(be == BACKEND_TYPE_NORMAL);
371 
372 	q = backend_query_alloc();
373 	backend_query_add(q,
374 	    "SELECT 1 FROM snapshot_lnk_tbl WHERE lnk_snap_id = %d",
375 	    ent->de_id);
376 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
377 	backend_query_free(q);
378 
379 	if (r == REP_PROTOCOL_DONE)
380 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
381 
382 	info.dci_dip = dip;
383 	info.dci_be = be;
384 	info.dci_cb = &snaplevel_lnk_delete;
385 	info.dci_result = REP_PROTOCOL_SUCCESS;
386 
387 	q = backend_query_alloc();
388 	backend_query_add(q,
389 	    "SELECT snap_level_id, 0 FROM snaplevel_tbl WHERE snap_id = %d;"
390 	    "DELETE FROM snaplevel_tbl WHERE snap_id = %d",
391 	    ent->de_id, ent->de_id);
392 	r = backend_tx_run(tx, q, push_delete_callback, &info);
393 	backend_query_free(q);
394 
395 	if (r == REP_PROTOCOL_DONE) {
396 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
397 		return (info.dci_result);
398 	}
399 	return (r);
400 }
401 
402 static int
403 snapshot_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
404 {
405 	uint32_t be = ent->de_backend;
406 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
407 	    dip->di_np_tx;
408 
409 	backend_query_t *q;
410 	uint32_t snapid;
411 	int r;
412 
413 	assert(be == BACKEND_TYPE_NORMAL);
414 
415 	q = backend_query_alloc();
416 	backend_query_add(q,
417 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
418 	    "DELETE FROM snapshot_lnk_tbl WHERE lnk_id = %d",
419 	    ent->de_id, ent->de_id);
420 	r = backend_tx_run_single_int(tx, q, &snapid);
421 	backend_query_free(q);
422 
423 	if (r != REP_PROTOCOL_SUCCESS)
424 		return (r);
425 
426 	return (delete_stack_push(dip, be, &snaplevel_tbl_delete, snapid, 0));
427 }
428 
429 static int
430 pgparent_delete_add_pgs(delete_info_t *dip, uint32_t parent_id)
431 {
432 	struct delete_cb_info info;
433 	backend_query_t *q;
434 	int r;
435 
436 	info.dci_dip = dip;
437 	info.dci_be = BACKEND_TYPE_NORMAL;
438 	info.dci_cb = &propertygrp_delete;
439 	info.dci_result = REP_PROTOCOL_SUCCESS;
440 
441 	q = backend_query_alloc();
442 	backend_query_add(q,
443 	    "SELECT pg_id, 0 FROM pg_tbl WHERE pg_parent_id = %d",
444 	    parent_id);
445 
446 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
447 
448 	if (r == REP_PROTOCOL_DONE) {
449 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
450 		backend_query_free(q);
451 		return (info.dci_result);
452 	}
453 	if (r != REP_PROTOCOL_SUCCESS) {
454 		backend_query_free(q);
455 		return (r);
456 	}
457 
458 	if (dip->di_np_tx != NULL) {
459 		info.dci_be = BACKEND_TYPE_NONPERSIST;
460 
461 		r = backend_tx_run(dip->di_np_tx, q, push_delete_callback,
462 		    &info);
463 
464 		if (r == REP_PROTOCOL_DONE) {
465 			assert(info.dci_result != REP_PROTOCOL_SUCCESS);
466 			backend_query_free(q);
467 			return (info.dci_result);
468 		}
469 		if (r != REP_PROTOCOL_SUCCESS) {
470 			backend_query_free(q);
471 			return (r);
472 		}
473 	}
474 	backend_query_free(q);
475 	return (REP_PROTOCOL_SUCCESS);
476 }
477 
478 static int
479 service_delete(delete_info_t *dip, const delete_ent_t *ent)
480 {
481 	int r;
482 
483 	r = backend_tx_run_update_changed(dip->di_tx,
484 	    "DELETE FROM service_tbl WHERE svc_id = %d", ent->de_id);
485 	if (r != REP_PROTOCOL_SUCCESS)
486 		return (r);
487 
488 	return (pgparent_delete_add_pgs(dip, ent->de_id));
489 }
490 
491 static int
492 instance_delete(delete_info_t *dip, const delete_ent_t *ent)
493 {
494 	struct delete_cb_info info;
495 	int r;
496 	backend_query_t *q;
497 
498 	r = backend_tx_run_update_changed(dip->di_tx,
499 	    "DELETE FROM instance_tbl WHERE instance_id = %d", ent->de_id);
500 	if (r != REP_PROTOCOL_SUCCESS)
501 		return (r);
502 
503 	r = pgparent_delete_add_pgs(dip, ent->de_id);
504 	if (r != REP_PROTOCOL_SUCCESS)
505 		return (r);
506 
507 	info.dci_dip = dip;
508 	info.dci_be = BACKEND_TYPE_NORMAL;
509 	info.dci_cb = &snapshot_lnk_delete;
510 	info.dci_result = REP_PROTOCOL_SUCCESS;
511 
512 	q = backend_query_alloc();
513 	backend_query_add(q,
514 	    "SELECT lnk_id, 0 FROM snapshot_lnk_tbl WHERE lnk_inst_id = %d",
515 	    ent->de_id);
516 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
517 	backend_query_free(q);
518 
519 	if (r == REP_PROTOCOL_DONE) {
520 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
521 		return (info.dci_result);
522 	}
523 	return (r);
524 }
525 
526 /*ARGSUSED*/
527 static int
528 fill_child_callback(void *data, int columns, char **vals, char **names)
529 {
530 	child_info_t *cp = data;
531 	rc_node_t *np;
532 	uint32_t main_id;
533 	const char *name;
534 	const char *cur;
535 	rc_node_lookup_t *lp = &cp->ci_base_nl;
536 
537 	assert(columns == 2);
538 
539 	name = *vals++;
540 	columns--;
541 
542 	cur = *vals++;
543 	columns--;
544 	string_to_id(cur, &main_id, "id");
545 
546 	lp->rl_main_id = main_id;
547 
548 	if ((np = rc_node_alloc()) == NULL)
549 		return (BACKEND_CALLBACK_ABORT);
550 
551 	np = rc_node_setup(np, lp, name, cp->ci_parent);
552 	rc_node_rele(np);
553 
554 	return (BACKEND_CALLBACK_CONTINUE);
555 }
556 
557 /*ARGSUSED*/
558 static int
559 fill_snapshot_callback(void *data, int columns, char **vals, char **names)
560 {
561 	child_info_t *cp = data;
562 	rc_node_t *np;
563 	uint32_t main_id;
564 	uint32_t snap_id;
565 	const char *name;
566 	const char *cur;
567 	const char *snap;
568 	rc_node_lookup_t *lp = &cp->ci_base_nl;
569 
570 	assert(columns == 3);
571 
572 	name = *vals++;
573 	columns--;
574 
575 	cur = *vals++;
576 	columns--;
577 	snap = *vals++;
578 	columns--;
579 
580 	string_to_id(cur, &main_id, "lnk_id");
581 	string_to_id(snap, &snap_id, "lnk_snap_id");
582 
583 	lp->rl_main_id = main_id;
584 
585 	if ((np = rc_node_alloc()) == NULL)
586 		return (BACKEND_CALLBACK_ABORT);
587 
588 	np = rc_node_setup_snapshot(np, lp, name, snap_id, cp->ci_parent);
589 	rc_node_rele(np);
590 
591 	return (BACKEND_CALLBACK_CONTINUE);
592 }
593 
594 /*ARGSUSED*/
595 static int
596 fill_pg_callback(void *data, int columns, char **vals, char **names)
597 {
598 	child_info_t *cip = data;
599 	const char *name;
600 	const char *type;
601 	const char *cur;
602 	uint32_t main_id;
603 	uint32_t flags;
604 	uint32_t gen_id;
605 
606 	rc_node_lookup_t *lp = &cip->ci_base_nl;
607 	rc_node_t *newnode, *pg;
608 
609 	assert(columns == 5);
610 
611 	name = *vals++;		/* pg_name */
612 	columns--;
613 
614 	cur = *vals++;		/* pg_id */
615 	columns--;
616 	string_to_id(cur, &main_id, "pg_id");
617 
618 	lp->rl_main_id = main_id;
619 
620 	cur = *vals++;		/* pg_gen_id */
621 	columns--;
622 	string_to_id(cur, &gen_id, "pg_gen_id");
623 
624 	type = *vals++;		/* pg_type */
625 	columns--;
626 
627 	cur = *vals++;		/* pg_flags */
628 	columns--;
629 	string_to_id(cur, &flags, "pg_flags");
630 
631 	if ((newnode = rc_node_alloc()) == NULL)
632 		return (BACKEND_CALLBACK_ABORT);
633 
634 	pg = rc_node_setup_pg(newnode, lp, name, type, flags, gen_id,
635 	    cip->ci_parent);
636 	if (pg == NULL) {
637 		rc_node_destroy(newnode);
638 		return (BACKEND_CALLBACK_ABORT);
639 	}
640 
641 	rc_node_rele(pg);
642 
643 	return (BACKEND_CALLBACK_CONTINUE);
644 }
645 
646 struct property_value_info {
647 	char		*pvi_base;
648 	size_t		pvi_pos;
649 	size_t		pvi_size;
650 	size_t		pvi_count;
651 };
652 
653 /*ARGSUSED*/
654 static int
655 property_value_size_cb(void *data, int columns, char **vals, char **names)
656 {
657 	struct property_value_info *info = data;
658 	assert(columns == 1);
659 
660 	info->pvi_size += strlen(vals[0]) + 1;		/* count the '\0' */
661 
662 	return (BACKEND_CALLBACK_CONTINUE);
663 }
664 
665 /*ARGSUSED*/
666 static int
667 property_value_cb(void *data, int columns, char **vals, char **names)
668 {
669 	struct property_value_info *info = data;
670 	size_t pos, left, len;
671 
672 	assert(columns == 1);
673 	pos = info->pvi_pos;
674 	left = info->pvi_size - pos;
675 
676 	pos = info->pvi_pos;
677 	left = info->pvi_size - pos;
678 
679 	if ((len = strlcpy(&info->pvi_base[pos], vals[0], left)) >= left) {
680 		/*
681 		 * since we preallocated, above, this shouldn't happen
682 		 */
683 		backend_panic("unexpected database change");
684 	}
685 
686 	len += 1;	/* count the '\0' */
687 
688 	info->pvi_pos += len;
689 	info->pvi_count++;
690 
691 	return (BACKEND_CALLBACK_CONTINUE);
692 }
693 
694 /*ARGSUSED*/
695 void
696 object_free_values(const char *vals, uint32_t type, size_t count, size_t size)
697 {
698 	if (vals != NULL)
699 		uu_free((void *)vals);
700 }
701 
702 /*ARGSUSED*/
703 static int
704 fill_property_callback(void *data, int columns, char **vals, char **names)
705 {
706 	child_info_t *cp = data;
707 	backend_tx_t *tx = cp->ci_tx;
708 	uint32_t main_id;
709 	const char *name;
710 	const char *cur;
711 	rep_protocol_value_type_t type;
712 	rc_node_lookup_t *lp = &cp->ci_base_nl;
713 	struct property_value_info info;
714 	int rc;
715 
716 	assert(columns == 4);
717 	assert(tx != NULL);
718 
719 	info.pvi_base = NULL;
720 	info.pvi_pos = 0;
721 	info.pvi_size = 0;
722 	info.pvi_count = 0;
723 
724 	name = *vals++;
725 
726 	cur = *vals++;
727 	string_to_id(cur, &main_id, "lnk_prop_id");
728 
729 	cur = *vals++;
730 	assert(('a' <= cur[0] && 'z' >= cur[0]) ||
731 	    ('A' <= cur[0] && 'Z' >= cur[0]) &&
732 	    (cur[1] == 0 || ('a' <= cur[1] && 'z' >= cur[1]) ||
733 	    ('A' <= cur[1] && 'Z' >= cur[1])));
734 	type = cur[0] | (cur[1] << 8);
735 
736 	lp->rl_main_id = main_id;
737 
738 	/*
739 	 * fill in the values, if any
740 	 */
741 	if ((cur = *vals++) != NULL) {
742 		rep_protocol_responseid_t r;
743 		backend_query_t *q = backend_query_alloc();
744 
745 		/*
746 		 * Ensure that select operation is reflective
747 		 * of repository schema.  If the repository has
748 		 * been upgraded,  make use of value ordering
749 		 * by retrieving values in order using the
750 		 * value_order column.  Otherwise, simply
751 		 * run the select with no order specified.
752 		 * The order-insensitive select is necessary
753 		 * as on first reboot post-upgrade,  the repository
754 		 * contents need to be read before the repository
755 		 * backend is writable (and upgrade is possible).
756 		 */
757 		if (backend_is_upgraded(tx)) {
758 			backend_query_add(q,
759 			    "SELECT value_value FROM value_tbl "
760 			    "WHERE (value_id = '%q') ORDER BY value_order",
761 			    cur);
762 		} else {
763 			backend_query_add(q,
764 			    "SELECT value_value FROM value_tbl "
765 			    "WHERE (value_id = '%q')",
766 			    cur);
767 		}
768 
769 		switch (r = backend_tx_run(tx, q, property_value_size_cb,
770 		    &info)) {
771 		case REP_PROTOCOL_SUCCESS:
772 			break;
773 
774 		case REP_PROTOCOL_FAIL_NO_RESOURCES:
775 			backend_query_free(q);
776 			return (BACKEND_CALLBACK_ABORT);
777 
778 		case REP_PROTOCOL_DONE:
779 		default:
780 			backend_panic("backend_tx_run() returned %d", r);
781 		}
782 		if (info.pvi_size > 0) {
783 			info.pvi_base = uu_zalloc(info.pvi_size);
784 			if (info.pvi_base == NULL) {
785 				backend_query_free(q);
786 				return (BACKEND_CALLBACK_ABORT);
787 			}
788 			switch (r = backend_tx_run(tx, q, property_value_cb,
789 			    &info)) {
790 			case REP_PROTOCOL_SUCCESS:
791 				break;
792 
793 			case REP_PROTOCOL_FAIL_NO_RESOURCES:
794 				uu_free(info.pvi_base);
795 				backend_query_free(q);
796 				return (BACKEND_CALLBACK_ABORT);
797 
798 			case REP_PROTOCOL_DONE:
799 			default:
800 				backend_panic("backend_tx_run() returned %d",
801 				    r);
802 			}
803 		}
804 		backend_query_free(q);
805 	}
806 
807 	rc = rc_node_create_property(cp->ci_parent, lp, name, type,
808 	    info.pvi_base, info.pvi_count, info.pvi_size);
809 	if (rc != REP_PROTOCOL_SUCCESS) {
810 		assert(rc == REP_PROTOCOL_FAIL_NO_RESOURCES);
811 		return (BACKEND_CALLBACK_ABORT);
812 	}
813 
814 	return (BACKEND_CALLBACK_CONTINUE);
815 }
816 
817 /*
818  * The *_setup_child_info() functions fill in a child_info_t structure with the
819  * information for the children of np with type type.
820  *
821  * They fail with
822  *   _TYPE_MISMATCH - object cannot have children of type type
823  */
824 
825 static int
826 scope_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
827 {
828 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
829 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
830 
831 	bzero(cip, sizeof (*cip));
832 	cip->ci_parent = np;
833 	cip->ci_base_nl.rl_type = type;
834 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
835 	return (REP_PROTOCOL_SUCCESS);
836 }
837 
838 static int
839 service_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
840 {
841 	switch (type) {
842 	case REP_PROTOCOL_ENTITY_INSTANCE:
843 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
844 		break;
845 	default:
846 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
847 	}
848 
849 	bzero(cip, sizeof (*cip));
850 	cip->ci_parent = np;
851 	cip->ci_base_nl.rl_type = type;
852 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
853 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_main_id;
854 
855 	return (REP_PROTOCOL_SUCCESS);
856 }
857 
858 static int
859 instance_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
860 {
861 	switch (type) {
862 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
863 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
864 		break;
865 	default:
866 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
867 	}
868 
869 	bzero(cip, sizeof (*cip));
870 	cip->ci_parent = np;
871 	cip->ci_base_nl.rl_type = type;
872 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
873 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
874 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_main_id;
875 
876 	return (REP_PROTOCOL_SUCCESS);
877 }
878 
879 static int
880 snaplevel_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
881 {
882 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
883 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
884 
885 	bzero(cip, sizeof (*cip));
886 	cip->ci_parent = np;
887 	cip->ci_base_nl.rl_type = type;
888 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
889 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
890 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
891 	cip->ci_base_nl.rl_ids[ID_NAME] = np->rn_id.rl_ids[ID_NAME];
892 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = np->rn_id.rl_ids[ID_SNAPSHOT];
893 	cip->ci_base_nl.rl_ids[ID_LEVEL] = np->rn_id.rl_main_id;
894 
895 	return (REP_PROTOCOL_SUCCESS);
896 }
897 
898 static int
899 propertygrp_setup_child_info(rc_node_t *pg, uint32_t type, child_info_t *cip)
900 {
901 	if (type != REP_PROTOCOL_ENTITY_PROPERTY)
902 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
903 
904 	bzero(cip, sizeof (*cip));
905 	cip->ci_parent = pg;
906 	cip->ci_base_nl.rl_type = type;
907 	cip->ci_base_nl.rl_backend = pg->rn_id.rl_backend;
908 	cip->ci_base_nl.rl_ids[ID_SERVICE] = pg->rn_id.rl_ids[ID_SERVICE];
909 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = pg->rn_id.rl_ids[ID_INSTANCE];
910 	cip->ci_base_nl.rl_ids[ID_PG] = pg->rn_id.rl_main_id;
911 	cip->ci_base_nl.rl_ids[ID_GEN] = pg->rn_gen_id;
912 	cip->ci_base_nl.rl_ids[ID_NAME] = pg->rn_id.rl_ids[ID_NAME];
913 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = pg->rn_id.rl_ids[ID_SNAPSHOT];
914 	cip->ci_base_nl.rl_ids[ID_LEVEL] = pg->rn_id.rl_ids[ID_LEVEL];
915 
916 	return (REP_PROTOCOL_SUCCESS);
917 }
918 
919 /*
920  * The *_fill_children() functions populate the children of the given rc_node_t
921  * by querying the database and calling rc_node_setup_*() functions (usually
922  * via a fill_*_callback()).
923  *
924  * They fail with
925  *   _NO_RESOURCES
926  */
927 
928 /*
929  * Returns
930  *   _NO_RESOURCES
931  *   _SUCCESS
932  */
933 static int
934 scope_fill_children(rc_node_t *np)
935 {
936 	backend_query_t *q;
937 	child_info_t ci;
938 	int res;
939 
940 	(void) scope_setup_child_info(np, REP_PROTOCOL_ENTITY_SERVICE, &ci);
941 
942 	q = backend_query_alloc();
943 	backend_query_append(q, "SELECT svc_name, svc_id FROM service_tbl");
944 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
945 	backend_query_free(q);
946 
947 	if (res == REP_PROTOCOL_DONE)
948 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
949 	return (res);
950 }
951 
952 /*
953  * Returns
954  *   _NO_RESOURCES
955  *   _SUCCESS
956  */
957 static int
958 service_fill_children(rc_node_t *np)
959 {
960 	backend_query_t *q;
961 	child_info_t ci;
962 	int res;
963 
964 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
965 
966 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_INSTANCE, &ci);
967 
968 	q = backend_query_alloc();
969 	backend_query_add(q,
970 	    "SELECT instance_name, instance_id FROM instance_tbl"
971 	    "    WHERE (instance_svc = %d)",
972 	    np->rn_id.rl_main_id);
973 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
974 	backend_query_free(q);
975 
976 	if (res == REP_PROTOCOL_DONE)
977 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
978 	if (res != REP_PROTOCOL_SUCCESS)
979 		return (res);
980 
981 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
982 	    &ci);
983 
984 	q = backend_query_alloc();
985 	backend_query_add(q,
986 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
987 	    "    WHERE (pg_parent_id = %d)",
988 	    np->rn_id.rl_main_id);
989 
990 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
991 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
992 	if (res == REP_PROTOCOL_SUCCESS) {
993 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
994 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
995 		    fill_pg_callback, &ci);
996 		/* nonpersistant database may not exist */
997 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
998 			res = REP_PROTOCOL_SUCCESS;
999 	}
1000 	if (res == REP_PROTOCOL_DONE)
1001 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1002 	backend_query_free(q);
1003 
1004 	return (res);
1005 }
1006 
1007 /*
1008  * Returns
1009  *   _NO_RESOURCES
1010  *   _SUCCESS
1011  */
1012 static int
1013 instance_fill_children(rc_node_t *np)
1014 {
1015 	backend_query_t *q;
1016 	child_info_t ci;
1017 	int res;
1018 
1019 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
1020 
1021 	/* Get child property groups */
1022 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1023 	    &ci);
1024 
1025 	q = backend_query_alloc();
1026 	backend_query_add(q,
1027 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
1028 	    "    WHERE (pg_parent_id = %d)",
1029 	    np->rn_id.rl_main_id);
1030 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
1031 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1032 	if (res == REP_PROTOCOL_SUCCESS) {
1033 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
1034 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
1035 		    fill_pg_callback, &ci);
1036 		/* nonpersistant database may not exist */
1037 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
1038 			res = REP_PROTOCOL_SUCCESS;
1039 	}
1040 	if (res == REP_PROTOCOL_DONE)
1041 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1042 	backend_query_free(q);
1043 
1044 	if (res != REP_PROTOCOL_SUCCESS)
1045 		return (res);
1046 
1047 	/* Get child snapshots */
1048 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_SNAPSHOT,
1049 	    &ci);
1050 
1051 	q = backend_query_alloc();
1052 	backend_query_add(q,
1053 	    "SELECT lnk_snap_name, lnk_id, lnk_snap_id FROM snapshot_lnk_tbl"
1054 	    "    WHERE (lnk_inst_id = %d)",
1055 	    np->rn_id.rl_main_id);
1056 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_callback, &ci);
1057 	if (res == REP_PROTOCOL_DONE)
1058 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1059 	backend_query_free(q);
1060 
1061 	return (res);
1062 }
1063 
1064 /*
1065  * Returns
1066  *   _NO_RESOURCES
1067  *   _SUCCESS
1068  */
1069 static int
1070 snapshot_fill_children(rc_node_t *np)
1071 {
1072 	rc_node_t *nnp;
1073 	rc_snapshot_t *sp, *oldsp;
1074 	rc_snaplevel_t *lvl;
1075 	rc_node_lookup_t nl;
1076 	int r;
1077 
1078 	/* Get the rc_snapshot_t (& its rc_snaplevel_t's). */
1079 	(void) pthread_mutex_lock(&np->rn_lock);
1080 	sp = np->rn_snapshot;
1081 	(void) pthread_mutex_unlock(&np->rn_lock);
1082 	if (sp == NULL) {
1083 		r = rc_snapshot_get(np->rn_snapshot_id, &sp);
1084 		if (r != REP_PROTOCOL_SUCCESS) {
1085 			assert(r == REP_PROTOCOL_FAIL_NO_RESOURCES);
1086 			return (r);
1087 		}
1088 		(void) pthread_mutex_lock(&np->rn_lock);
1089 		oldsp = np->rn_snapshot;
1090 		assert(oldsp == NULL || oldsp == sp);
1091 		np->rn_snapshot = sp;
1092 		(void) pthread_mutex_unlock(&np->rn_lock);
1093 		if (oldsp != NULL)
1094 			rc_snapshot_rele(oldsp);
1095 	}
1096 
1097 	bzero(&nl, sizeof (nl));
1098 	nl.rl_type = REP_PROTOCOL_ENTITY_SNAPLEVEL;
1099 	nl.rl_backend = np->rn_id.rl_backend;
1100 	nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
1101 	nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
1102 	nl.rl_ids[ID_NAME] = np->rn_id.rl_main_id;
1103 	nl.rl_ids[ID_SNAPSHOT] = np->rn_snapshot_id;
1104 
1105 	/* Create rc_node_t's for the snapshot's rc_snaplevel_t's. */
1106 	for (lvl = sp->rs_levels; lvl != NULL; lvl = lvl->rsl_next) {
1107 		nnp = rc_node_alloc();
1108 		assert(nnp != NULL);
1109 		nl.rl_main_id = lvl->rsl_level_id;
1110 		nnp = rc_node_setup_snaplevel(nnp, &nl, lvl, np);
1111 		rc_node_rele(nnp);
1112 	}
1113 
1114 	return (REP_PROTOCOL_SUCCESS);
1115 }
1116 
1117 /*
1118  * Returns
1119  *   _NO_RESOURCES
1120  *   _SUCCESS
1121  */
1122 static int
1123 snaplevel_fill_children(rc_node_t *np)
1124 {
1125 	rc_snaplevel_t *lvl = np->rn_snaplevel;
1126 	child_info_t ci;
1127 	int res;
1128 	backend_query_t *q;
1129 
1130 	(void) snaplevel_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1131 	    &ci);
1132 
1133 	q = backend_query_alloc();
1134 	backend_query_add(q,
1135 	    "SELECT snaplvl_pg_name, snaplvl_pg_id, snaplvl_gen_id, "
1136 	    "    snaplvl_pg_type, snaplvl_pg_flags "
1137 	    "    FROM snaplevel_lnk_tbl "
1138 	    "    WHERE (snaplvl_level_id = %d)",
1139 	    lvl->rsl_level_id);
1140 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1141 	if (res == REP_PROTOCOL_DONE)
1142 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1143 	backend_query_free(q);
1144 
1145 	return (res);
1146 }
1147 
1148 /*
1149  * Returns
1150  *   _NO_RESOURCES
1151  *   _SUCCESS
1152  */
1153 static int
1154 propertygrp_fill_children(rc_node_t *np)
1155 {
1156 	backend_query_t *q;
1157 	child_info_t ci;
1158 	int res;
1159 	backend_tx_t *tx;
1160 
1161 	backend_type_t backend = np->rn_id.rl_backend;
1162 
1163 	(void) propertygrp_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTY,
1164 	    &ci);
1165 
1166 	res = backend_tx_begin_ro(backend, &tx);
1167 	if (res != REP_PROTOCOL_SUCCESS) {
1168 		/*
1169 		 * If the backend didn't exist, we wouldn't have got this
1170 		 * property group.
1171 		 */
1172 		assert(res != REP_PROTOCOL_FAIL_BACKEND_ACCESS);
1173 		return (res);
1174 	}
1175 
1176 	ci.ci_tx = tx;
1177 
1178 	q = backend_query_alloc();
1179 	backend_query_add(q,
1180 	    "SELECT lnk_prop_name, lnk_prop_id, lnk_prop_type, lnk_val_id "
1181 	    "FROM prop_lnk_tbl "
1182 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
1183 	    np->rn_id.rl_main_id, np->rn_gen_id);
1184 	res = backend_tx_run(tx, q, fill_property_callback, &ci);
1185 	if (res == REP_PROTOCOL_DONE)
1186 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1187 	backend_query_free(q);
1188 	backend_tx_end_ro(tx);
1189 
1190 	return (res);
1191 }
1192 
1193 /*
1194  * Fails with
1195  *   _TYPE_MISMATCH - lp is not for a service
1196  *   _INVALID_TYPE - lp has invalid type
1197  *   _BAD_REQUEST - name is invalid
1198  */
1199 static int
1200 scope_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1201 {
1202 	uint32_t type = lp->rl_type;
1203 	int rc;
1204 
1205 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
1206 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1207 
1208 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1209 		return (rc);
1210 
1211 	backend_query_add(q,
1212 	    "SELECT svc_id FROM service_tbl "
1213 	    "WHERE svc_name = '%q'",
1214 	    name);
1215 
1216 	return (REP_PROTOCOL_SUCCESS);
1217 }
1218 
1219 /*
1220  * Fails with
1221  *   _NO_RESOURCES - out of memory
1222  */
1223 static int
1224 scope_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1225 {
1226 	return (backend_tx_run_update(tx,
1227 	    "INSERT INTO service_tbl (svc_id, svc_name) "
1228 	    "VALUES (%d, '%q')",
1229 	    lp->rl_main_id, name));
1230 }
1231 
1232 /*
1233  * Fails with
1234  *   _TYPE_MISMATCH - lp is not for an instance or property group
1235  *   _INVALID_TYPE - lp has invalid type
1236  *   _BAD_REQUEST - name is invalid
1237  */
1238 static int
1239 service_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1240 {
1241 	uint32_t type = lp->rl_type;
1242 	int rc;
1243 
1244 	if (type != REP_PROTOCOL_ENTITY_INSTANCE &&
1245 	    type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
1246 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1247 
1248 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1249 		return (rc);
1250 
1251 	switch (type) {
1252 	case REP_PROTOCOL_ENTITY_INSTANCE:
1253 		backend_query_add(q,
1254 		    "SELECT instance_id FROM instance_tbl "
1255 		    "WHERE instance_name = '%q' AND instance_svc = %d",
1256 		    name, lp->rl_ids[ID_SERVICE]);
1257 		break;
1258 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1259 		backend_query_add(q,
1260 		    "SELECT pg_id FROM pg_tbl "
1261 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1262 		    name, lp->rl_ids[ID_SERVICE]);
1263 		break;
1264 	default:
1265 		assert(0);
1266 		abort();
1267 	}
1268 
1269 	return (REP_PROTOCOL_SUCCESS);
1270 }
1271 
1272 /*
1273  * Fails with
1274  *   _NO_RESOURCES - out of memory
1275  */
1276 static int
1277 service_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1278 {
1279 	return (backend_tx_run_update(tx,
1280 	    "INSERT INTO instance_tbl "
1281 	    "    (instance_id, instance_name, instance_svc) "
1282 	    "VALUES (%d, '%q', %d)",
1283 	    lp->rl_main_id, name, lp->rl_ids[ID_SERVICE]));
1284 }
1285 
1286 /*
1287  * Fails with
1288  *   _NO_RESOURCES - out of memory
1289  */
1290 static int
1291 instance_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1292 {
1293 	return (backend_tx_run_update(tx,
1294 	    "INSERT INTO snapshot_lnk_tbl "
1295 	    "    (lnk_id, lnk_inst_id, lnk_snap_name, lnk_snap_id) "
1296 	    "VALUES (%d, %d, '%q', 0)",
1297 	    lp->rl_main_id, lp->rl_ids[ID_INSTANCE], name));
1298 }
1299 
1300 /*
1301  * Fails with
1302  *   _TYPE_MISMATCH - lp is not for a property group or snapshot
1303  *   _INVALID_TYPE - lp has invalid type
1304  *   _BAD_REQUEST - name is invalid
1305  */
1306 static int
1307 instance_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1308 {
1309 	uint32_t type = lp->rl_type;
1310 	int rc;
1311 
1312 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP &&
1313 	    type != REP_PROTOCOL_ENTITY_SNAPSHOT)
1314 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1315 
1316 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1317 		return (rc);
1318 
1319 	switch (type) {
1320 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1321 		backend_query_add(q,
1322 		    "SELECT pg_id FROM pg_tbl "
1323 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1324 		    name, lp->rl_ids[ID_INSTANCE]);
1325 		break;
1326 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
1327 		backend_query_add(q,
1328 		    "SELECT lnk_id FROM snapshot_lnk_tbl "
1329 		    "    WHERE lnk_snap_name = '%q' AND lnk_inst_id = %d",
1330 		    name, lp->rl_ids[ID_INSTANCE]);
1331 		break;
1332 	default:
1333 		assert(0);
1334 		abort();
1335 	}
1336 
1337 	return (REP_PROTOCOL_SUCCESS);
1338 }
1339 
1340 static int
1341 generic_insert_pg_child(backend_tx_t *tx, rc_node_lookup_t *lp,
1342     const char *name, const char *pgtype, uint32_t flags, uint32_t gen)
1343 {
1344 	int parent_id = (lp->rl_ids[ID_INSTANCE] != 0)?
1345 	    lp->rl_ids[ID_INSTANCE] : lp->rl_ids[ID_SERVICE];
1346 	return (backend_tx_run_update(tx,
1347 	    "INSERT INTO pg_tbl "
1348 	    "    (pg_id, pg_name, pg_parent_id, pg_type, pg_flags, pg_gen_id) "
1349 	    "VALUES (%d, '%q', %d, '%q', %d, %d)",
1350 	    lp->rl_main_id, name, parent_id, pgtype, flags, gen));
1351 }
1352 
1353 static int
1354 service_delete_start(rc_node_t *np, delete_info_t *dip)
1355 {
1356 	int r;
1357 	backend_query_t *q = backend_query_alloc();
1358 
1359 	/*
1360 	 * Check for child instances, and refuse to delete if they exist.
1361 	 */
1362 	backend_query_add(q,
1363 	    "SELECT 1 FROM instance_tbl WHERE instance_svc = %d",
1364 	    np->rn_id.rl_main_id);
1365 
1366 	r = backend_tx_run(dip->di_tx, q, backend_fail_if_seen, NULL);
1367 	backend_query_free(q);
1368 
1369 	if (r == REP_PROTOCOL_DONE)
1370 		return (REP_PROTOCOL_FAIL_EXISTS);	/* instances exist */
1371 
1372 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &service_delete,
1373 	    np->rn_id.rl_main_id, 0));
1374 }
1375 
1376 static int
1377 instance_delete_start(rc_node_t *np, delete_info_t *dip)
1378 {
1379 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &instance_delete,
1380 	    np->rn_id.rl_main_id, 0));
1381 }
1382 
1383 static int
1384 snapshot_delete_start(rc_node_t *np, delete_info_t *dip)
1385 {
1386 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL,
1387 	    &snapshot_lnk_delete, np->rn_id.rl_main_id, 0));
1388 }
1389 
1390 static int
1391 propertygrp_delete_start(rc_node_t *np, delete_info_t *dip)
1392 {
1393 	return (delete_stack_push(dip, np->rn_id.rl_backend,
1394 	    &propertygrp_delete, np->rn_id.rl_main_id, 0));
1395 }
1396 
1397 static object_info_t info[] = {
1398 	{REP_PROTOCOL_ENTITY_NONE},
1399 	{REP_PROTOCOL_ENTITY_SCOPE,
1400 		BACKEND_ID_INVALID,
1401 		scope_fill_children,
1402 		scope_setup_child_info,
1403 		scope_query_child,
1404 		scope_insert_child,
1405 		NULL,
1406 		NULL,
1407 	},
1408 	{REP_PROTOCOL_ENTITY_SERVICE,
1409 		BACKEND_ID_SERVICE_INSTANCE,
1410 		service_fill_children,
1411 		service_setup_child_info,
1412 		service_query_child,
1413 		service_insert_child,
1414 		generic_insert_pg_child,
1415 		service_delete_start,
1416 	},
1417 	{REP_PROTOCOL_ENTITY_INSTANCE,
1418 		BACKEND_ID_SERVICE_INSTANCE,
1419 		instance_fill_children,
1420 		instance_setup_child_info,
1421 		instance_query_child,
1422 		instance_insert_child,
1423 		generic_insert_pg_child,
1424 		instance_delete_start,
1425 	},
1426 	{REP_PROTOCOL_ENTITY_SNAPSHOT,
1427 		BACKEND_ID_SNAPNAME,
1428 		snapshot_fill_children,
1429 		NULL,
1430 		NULL,
1431 		NULL,
1432 		NULL,
1433 		snapshot_delete_start,
1434 	},
1435 	{REP_PROTOCOL_ENTITY_SNAPLEVEL,
1436 		BACKEND_ID_SNAPLEVEL,
1437 		snaplevel_fill_children,
1438 		snaplevel_setup_child_info,
1439 	},
1440 	{REP_PROTOCOL_ENTITY_PROPERTYGRP,
1441 		BACKEND_ID_PROPERTYGRP,
1442 		propertygrp_fill_children,
1443 		NULL,
1444 		NULL,
1445 		NULL,
1446 		NULL,
1447 		propertygrp_delete_start,
1448 	},
1449 	{REP_PROTOCOL_ENTITY_PROPERTY},
1450 	{-1UL}
1451 };
1452 #define	NUM_INFO (sizeof (info) / sizeof (*info))
1453 
1454 /*
1455  * object_fill_children() populates the child list of an rc_node_t by calling
1456  * the appropriate <type>_fill_children() which runs backend queries that
1457  * call an appropriate fill_*_callback() which takes a row of results,
1458  * decodes them, and calls an rc_node_setup*() function in rc_node.c to create
1459  * a child.
1460  *
1461  * Fails with
1462  *   _NO_RESOURCES
1463  */
1464 int
1465 object_fill_children(rc_node_t *pp)
1466 {
1467 	uint32_t type = pp->rn_id.rl_type;
1468 	assert(type > 0 && type < NUM_INFO);
1469 
1470 	return ((*info[type].obj_fill_children)(pp));
1471 }
1472 
1473 int
1474 object_delete(rc_node_t *pp)
1475 {
1476 	int rc;
1477 
1478 	delete_info_t dip;
1479 	delete_ent_t de;
1480 
1481 	uint32_t type = pp->rn_id.rl_type;
1482 	assert(type > 0 && type < NUM_INFO);
1483 
1484 	if (info[type].obj_delete_start == NULL)
1485 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1486 
1487 	(void) memset(&dip, '\0', sizeof (dip));
1488 	rc = backend_tx_begin(BACKEND_TYPE_NORMAL, &dip.di_tx);
1489 	if (rc != REP_PROTOCOL_SUCCESS)
1490 		return (rc);
1491 
1492 	rc = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &dip.di_np_tx);
1493 	if (rc == REP_PROTOCOL_FAIL_BACKEND_ACCESS ||
1494 	    rc == REP_PROTOCOL_FAIL_BACKEND_READONLY)
1495 		dip.di_np_tx = NULL;
1496 	else if (rc != REP_PROTOCOL_SUCCESS) {
1497 		backend_tx_rollback(dip.di_tx);
1498 		return (rc);
1499 	}
1500 
1501 	if ((rc = (*info[type].obj_delete_start)(pp, &dip)) !=
1502 	    REP_PROTOCOL_SUCCESS) {
1503 		goto fail;
1504 	}
1505 
1506 	while (delete_stack_pop(&dip, &de)) {
1507 		rc = (*de.de_cb)(&dip, &de);
1508 		if (rc != REP_PROTOCOL_SUCCESS)
1509 			goto fail;
1510 	}
1511 
1512 	rc = backend_tx_commit(dip.di_tx);
1513 	if (rc != REP_PROTOCOL_SUCCESS)
1514 		backend_tx_rollback(dip.di_np_tx);
1515 	else if (dip.di_np_tx)
1516 		(void) backend_tx_commit(dip.di_np_tx);
1517 
1518 	delete_stack_cleanup(&dip);
1519 
1520 	return (rc);
1521 
1522 fail:
1523 	backend_tx_rollback(dip.di_tx);
1524 	backend_tx_rollback(dip.di_np_tx);
1525 	delete_stack_cleanup(&dip);
1526 	return (rc);
1527 }
1528 
1529 int
1530 object_do_create(backend_tx_t *tx, child_info_t *cip, rc_node_t *pp,
1531     uint32_t type, const char *name, rc_node_t **cpp)
1532 {
1533 	uint32_t ptype = pp->rn_id.rl_type;
1534 
1535 	backend_query_t *q;
1536 	uint32_t id;
1537 	rc_node_t *np = NULL;
1538 	int rc;
1539 	object_info_t *ip;
1540 
1541 	rc_node_lookup_t *lp = &cip->ci_base_nl;
1542 
1543 	assert(ptype > 0 && ptype < NUM_INFO);
1544 
1545 	ip = &info[ptype];
1546 
1547 	if (type == REP_PROTOCOL_ENTITY_PROPERTYGRP)
1548 		return (REP_PROTOCOL_FAIL_NOT_APPLICABLE);
1549 
1550 	if (ip->obj_setup_child_info == NULL ||
1551 	    ip->obj_query_child == NULL ||
1552 	    ip->obj_insert_child == NULL)
1553 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1554 
1555 	if ((rc = (*ip->obj_setup_child_info)(pp, type, cip)) !=
1556 	    REP_PROTOCOL_SUCCESS)
1557 		return (rc);
1558 
1559 	q = backend_query_alloc();
1560 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1561 	    REP_PROTOCOL_SUCCESS) {
1562 		assert(rc == REP_PROTOCOL_FAIL_BAD_REQUEST);
1563 		backend_query_free(q);
1564 		return (rc);
1565 	}
1566 
1567 	rc = backend_tx_run_single_int(tx, q, &id);
1568 	backend_query_free(q);
1569 
1570 	if (rc == REP_PROTOCOL_SUCCESS)
1571 		return (REP_PROTOCOL_FAIL_EXISTS);
1572 	else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND)
1573 		return (rc);
1574 
1575 	if ((lp->rl_main_id = backend_new_id(tx,
1576 	    info[type].obj_id_space)) == 0) {
1577 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1578 	}
1579 
1580 	if ((np = rc_node_alloc()) == NULL)
1581 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1582 
1583 	if ((rc = (*ip->obj_insert_child)(tx, lp, name)) !=
1584 	    REP_PROTOCOL_SUCCESS) {
1585 		rc_node_destroy(np);
1586 		return (rc);
1587 	}
1588 
1589 	*cpp = np;
1590 	return (REP_PROTOCOL_SUCCESS);
1591 }
1592 
1593 /*
1594  * Fails with
1595  *   _NOT_APPLICABLE - type is _PROPERTYGRP
1596  *   _BAD_REQUEST - cannot create children for this type of node
1597  *		    name is invalid
1598  *   _TYPE_MISMATCH - object cannot have children of type type
1599  *   _NO_RESOURCES - out of memory, or could not allocate new id
1600  *   _BACKEND_READONLY
1601  *   _BACKEND_ACCESS
1602  *   _EXISTS - child already exists
1603  */
1604 int
1605 object_create(rc_node_t *pp, uint32_t type, const char *name, rc_node_t **cpp)
1606 {
1607 	backend_tx_t *tx;
1608 	rc_node_t *np = NULL;
1609 	child_info_t ci;
1610 	int rc;
1611 
1612 	if ((rc = backend_tx_begin(pp->rn_id.rl_backend, &tx)) !=
1613 	    REP_PROTOCOL_SUCCESS) {
1614 		return (rc);
1615 	}
1616 
1617 	if ((rc = object_do_create(tx, &ci, pp, type, name, &np)) !=
1618 	    REP_PROTOCOL_SUCCESS) {
1619 		backend_tx_rollback(tx);
1620 		return (rc);
1621 	}
1622 
1623 	rc = backend_tx_commit(tx);
1624 	if (rc != REP_PROTOCOL_SUCCESS) {
1625 		rc_node_destroy(np);
1626 		return (rc);
1627 	}
1628 
1629 	*cpp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
1630 
1631 	return (REP_PROTOCOL_SUCCESS);
1632 }
1633 
1634 /*ARGSUSED*/
1635 int
1636 object_create_pg(rc_node_t *pp, uint32_t type, const char *name,
1637     const char *pgtype, uint32_t flags, rc_node_t **cpp)
1638 {
1639 	uint32_t ptype = pp->rn_id.rl_type;
1640 	backend_tx_t *tx_ro, *tx_wr;
1641 	backend_query_t *q;
1642 	uint32_t id;
1643 	uint32_t gen = 0;
1644 	rc_node_t *np = NULL;
1645 	int rc;
1646 	int rc_wr;
1647 	int rc_ro;
1648 	object_info_t *ip;
1649 
1650 	int nonpersist = (flags & SCF_PG_FLAG_NONPERSISTENT);
1651 
1652 	child_info_t ci;
1653 	rc_node_lookup_t *lp = &ci.ci_base_nl;
1654 
1655 	assert(ptype > 0 && ptype < NUM_INFO);
1656 
1657 	if (ptype != REP_PROTOCOL_ENTITY_SERVICE &&
1658 	    ptype != REP_PROTOCOL_ENTITY_INSTANCE)
1659 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1660 
1661 	ip = &info[ptype];
1662 
1663 	assert(ip->obj_setup_child_info != NULL &&
1664 	    ip->obj_query_child != NULL &&
1665 	    ip->obj_insert_pg_child != NULL);
1666 
1667 	if ((rc = (*ip->obj_setup_child_info)(pp, type, &ci)) !=
1668 	    REP_PROTOCOL_SUCCESS)
1669 		return (rc);
1670 
1671 	q = backend_query_alloc();
1672 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1673 	    REP_PROTOCOL_SUCCESS) {
1674 		backend_query_free(q);
1675 		return (rc);
1676 	}
1677 
1678 	if (!nonpersist) {
1679 		lp->rl_backend = BACKEND_TYPE_NORMAL;
1680 		rc_wr = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx_wr);
1681 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NONPERSIST, &tx_ro);
1682 	} else {
1683 		lp->rl_backend = BACKEND_TYPE_NONPERSIST;
1684 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NORMAL, &tx_ro);
1685 		rc_wr = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &tx_wr);
1686 	}
1687 
1688 	if (rc_wr != REP_PROTOCOL_SUCCESS) {
1689 		rc = rc_wr;
1690 		goto fail;
1691 	}
1692 	if (rc_ro != REP_PROTOCOL_SUCCESS &&
1693 	    rc_ro != REP_PROTOCOL_FAIL_BACKEND_ACCESS) {
1694 		rc = rc_ro;
1695 		goto fail;
1696 	}
1697 
1698 	if (tx_ro != NULL) {
1699 		rc = backend_tx_run_single_int(tx_ro, q, &id);
1700 
1701 		if (rc == REP_PROTOCOL_SUCCESS) {
1702 			backend_query_free(q);
1703 			rc = REP_PROTOCOL_FAIL_EXISTS;
1704 			goto fail;
1705 		} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1706 			backend_query_free(q);
1707 			goto fail;
1708 		}
1709 	}
1710 
1711 	rc = backend_tx_run_single_int(tx_wr, q, &id);
1712 	backend_query_free(q);
1713 
1714 	if (rc == REP_PROTOCOL_SUCCESS) {
1715 		rc = REP_PROTOCOL_FAIL_EXISTS;
1716 		goto fail;
1717 	} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1718 		goto fail;
1719 	}
1720 
1721 	if (tx_ro != NULL)
1722 		backend_tx_end_ro(tx_ro);
1723 	tx_ro = NULL;
1724 
1725 	if ((lp->rl_main_id = backend_new_id(tx_wr,
1726 	    info[type].obj_id_space)) == 0) {
1727 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1728 		goto fail;
1729 	}
1730 
1731 	if ((np = rc_node_alloc()) == NULL) {
1732 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1733 		goto fail;
1734 	}
1735 
1736 	if ((rc = (*ip->obj_insert_pg_child)(tx_wr, lp, name, pgtype, flags,
1737 	    gen)) != REP_PROTOCOL_SUCCESS) {
1738 		rc_node_destroy(np);
1739 		goto fail;
1740 	}
1741 
1742 	rc = backend_tx_commit(tx_wr);
1743 	if (rc != REP_PROTOCOL_SUCCESS) {
1744 		rc_node_destroy(np);
1745 		return (rc);
1746 	}
1747 
1748 	*cpp = rc_node_setup_pg(np, lp, name, pgtype, flags, gen, ci.ci_parent);
1749 
1750 	return (REP_PROTOCOL_SUCCESS);
1751 
1752 fail:
1753 	if (tx_ro != NULL)
1754 		backend_tx_end_ro(tx_ro);
1755 	if (tx_wr != NULL)
1756 		backend_tx_rollback(tx_wr);
1757 	return (rc);
1758 }
1759 
1760 /*
1761  * Given a row of snaplevel number, snaplevel id, service id, service name,
1762  * instance id, & instance name, create a rc_snaplevel_t & prepend it onto the
1763  * rs_levels list of the rc_snapshot_t passed in as data.
1764  * Returns _CONTINUE on success or _ABORT if any allocations fail.
1765  */
1766 /*ARGSUSED*/
1767 static int
1768 fill_snapshot_cb(void *data, int columns, char **vals, char **names)
1769 {
1770 	rc_snapshot_t *sp = data;
1771 	rc_snaplevel_t *lvl;
1772 	char *num = vals[0];
1773 	char *id = vals[1];
1774 	char *service_id = vals[2];
1775 	char *service = vals[3];
1776 	char *instance_id = vals[4];
1777 	char *instance = vals[5];
1778 	assert(columns == 6);
1779 
1780 	lvl = uu_zalloc(sizeof (*lvl));
1781 	if (lvl == NULL)
1782 		return (BACKEND_CALLBACK_ABORT);
1783 	lvl->rsl_parent = sp;
1784 	lvl->rsl_next = sp->rs_levels;
1785 	sp->rs_levels = lvl;
1786 
1787 	string_to_id(num, &lvl->rsl_level_num, "snap_level_num");
1788 	string_to_id(id, &lvl->rsl_level_id, "snap_level_id");
1789 	string_to_id(service_id, &lvl->rsl_service_id, "snap_level_service_id");
1790 	if (instance_id != NULL)
1791 		string_to_id(instance_id, &lvl->rsl_instance_id,
1792 		    "snap_level_instance_id");
1793 
1794 	lvl->rsl_scope = (const char *)"localhost";
1795 	lvl->rsl_service = strdup(service);
1796 	if (lvl->rsl_service == NULL) {
1797 		uu_free(lvl);
1798 		return (BACKEND_CALLBACK_ABORT);
1799 	}
1800 	if (instance) {
1801 		assert(lvl->rsl_instance_id != 0);
1802 		lvl->rsl_instance = strdup(instance);
1803 		if (lvl->rsl_instance == NULL) {
1804 			free((void *)lvl->rsl_instance);
1805 			uu_free(lvl);
1806 			return (BACKEND_CALLBACK_ABORT);
1807 		}
1808 	} else {
1809 		assert(lvl->rsl_instance_id == 0);
1810 	}
1811 
1812 	return (BACKEND_CALLBACK_CONTINUE);
1813 }
1814 
1815 /*
1816  * Populate sp's rs_levels list from the snaplevel_tbl table.
1817  * Fails with
1818  *   _NO_RESOURCES
1819  */
1820 int
1821 object_fill_snapshot(rc_snapshot_t *sp)
1822 {
1823 	backend_query_t *q;
1824 	rc_snaplevel_t *sl;
1825 	int result;
1826 	int i;
1827 
1828 	q = backend_query_alloc();
1829 	backend_query_add(q,
1830 	    "SELECT snap_level_num, snap_level_id, "
1831 	    "    snap_level_service_id, snap_level_service, "
1832 	    "    snap_level_instance_id, snap_level_instance "
1833 	    "FROM snaplevel_tbl "
1834 	    "WHERE snap_id = %d "
1835 	    "ORDER BY snap_level_id DESC",
1836 	    sp->rs_snap_id);
1837 
1838 	result = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_cb, sp);
1839 	if (result == REP_PROTOCOL_DONE)
1840 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
1841 	backend_query_free(q);
1842 
1843 	if (result == REP_PROTOCOL_SUCCESS) {
1844 		i = 0;
1845 		for (sl = sp->rs_levels; sl != NULL; sl = sl->rsl_next) {
1846 			if (sl->rsl_level_num != ++i) {
1847 				backend_panic("snaplevels corrupt; expected "
1848 				    "level %d, got %d", i, sl->rsl_level_num);
1849 			}
1850 		}
1851 	}
1852 	return (result);
1853 }
1854 
1855 /*
1856  * This represents a property group in a snapshot.
1857  */
1858 typedef struct check_snapshot_elem {
1859 	uint32_t cse_parent;
1860 	uint32_t cse_pg_id;
1861 	uint32_t cse_pg_gen;
1862 	char	cse_seen;
1863 } check_snapshot_elem_t;
1864 
1865 #define	CSI_MAX_PARENTS		COMPOSITION_DEPTH
1866 typedef struct check_snapshot_info {
1867 	size_t			csi_count;
1868 	size_t			csi_array_size;
1869 	check_snapshot_elem_t	*csi_array;
1870 	size_t			csi_nparents;
1871 	uint32_t		csi_parent_ids[CSI_MAX_PARENTS];
1872 } check_snapshot_info_t;
1873 
1874 /*ARGSUSED*/
1875 static int
1876 check_snapshot_fill_cb(void *data, int columns, char **vals, char **names)
1877 {
1878 	check_snapshot_info_t *csip = data;
1879 	check_snapshot_elem_t *cur;
1880 	const char *parent;
1881 	const char *pg_id;
1882 	const char *pg_gen_id;
1883 
1884 	if (columns == 1) {
1885 		uint32_t *target;
1886 
1887 		if (csip->csi_nparents >= CSI_MAX_PARENTS)
1888 			backend_panic("snaplevel table has too many elements");
1889 
1890 		target = &csip->csi_parent_ids[csip->csi_nparents++];
1891 		string_to_id(vals[0], target, "snap_level_*_id");
1892 
1893 		return (BACKEND_CALLBACK_CONTINUE);
1894 	}
1895 
1896 	assert(columns == 3);
1897 
1898 	parent = vals[0];
1899 	pg_id = vals[1];
1900 	pg_gen_id = vals[2];
1901 
1902 	if (csip->csi_count == csip->csi_array_size) {
1903 		size_t newsz = (csip->csi_array_size > 0) ?
1904 		    csip->csi_array_size * 2 : 8;
1905 		check_snapshot_elem_t *new = uu_zalloc(newsz * sizeof (*new));
1906 
1907 		if (new == NULL)
1908 			return (BACKEND_CALLBACK_ABORT);
1909 
1910 		(void) memcpy(new, csip->csi_array,
1911 		    sizeof (*new) * csip->csi_array_size);
1912 		uu_free(csip->csi_array);
1913 		csip->csi_array = new;
1914 		csip->csi_array_size = newsz;
1915 	}
1916 
1917 	cur = &csip->csi_array[csip->csi_count++];
1918 
1919 	string_to_id(parent, &cur->cse_parent, "snap_level_*_id");
1920 	string_to_id(pg_id, &cur->cse_pg_id, "snaplvl_pg_id");
1921 	string_to_id(pg_gen_id, &cur->cse_pg_gen, "snaplvl_gen_id");
1922 	cur->cse_seen = 0;
1923 
1924 	return (BACKEND_CALLBACK_CONTINUE);
1925 }
1926 
1927 static int
1928 check_snapshot_elem_cmp(const void *lhs_arg, const void *rhs_arg)
1929 {
1930 	const check_snapshot_elem_t *lhs = lhs_arg;
1931 	const check_snapshot_elem_t *rhs = rhs_arg;
1932 
1933 	if (lhs->cse_parent < rhs->cse_parent)
1934 		return (-1);
1935 	if (lhs->cse_parent > rhs->cse_parent)
1936 		return (1);
1937 
1938 	if (lhs->cse_pg_id < rhs->cse_pg_id)
1939 		return (-1);
1940 	if (lhs->cse_pg_id > rhs->cse_pg_id)
1941 		return (1);
1942 
1943 	if (lhs->cse_pg_gen < rhs->cse_pg_gen)
1944 		return (-1);
1945 	if (lhs->cse_pg_gen > rhs->cse_pg_gen)
1946 		return (1);
1947 
1948 	return (0);
1949 }
1950 
1951 /*ARGSUSED*/
1952 static int
1953 check_snapshot_check_cb(void *data, int columns, char **vals, char **names)
1954 {
1955 	check_snapshot_info_t *csip = data;
1956 	check_snapshot_elem_t elem;
1957 	check_snapshot_elem_t *cur;
1958 
1959 	const char *parent = vals[0];
1960 	const char *pg_id = vals[1];
1961 	const char *pg_gen_id = vals[2];
1962 
1963 	assert(columns == 3);
1964 
1965 	string_to_id(parent, &elem.cse_parent, "snap_level_*_id");
1966 	string_to_id(pg_id, &elem.cse_pg_id, "snaplvl_pg_id");
1967 	string_to_id(pg_gen_id, &elem.cse_pg_gen, "snaplvl_gen_id");
1968 
1969 	if ((cur = bsearch(&elem, csip->csi_array, csip->csi_count,
1970 	    sizeof (*csip->csi_array), check_snapshot_elem_cmp)) == NULL)
1971 		return (BACKEND_CALLBACK_ABORT);
1972 
1973 	if (cur->cse_seen)
1974 		backend_panic("duplicate property group reported");
1975 	cur->cse_seen = 1;
1976 	return (BACKEND_CALLBACK_CONTINUE);
1977 }
1978 
1979 /*
1980  * Check that a snapshot matches up with the latest in the repository.
1981  * Returns:
1982  *	REP_PROTOCOL_SUCCESS		if it is up-to-date,
1983  *	REP_PROTOCOL_DONE		if it is out-of-date, or
1984  *	REP_PROTOCOL_FAIL_NO_RESOURCES	if we ran out of memory.
1985  */
1986 static int
1987 object_check_snapshot(uint32_t snap_id)
1988 {
1989 	check_snapshot_info_t csi;
1990 	backend_query_t *q;
1991 	int result;
1992 	size_t idx;
1993 
1994 	/* if the snapshot has never been taken, it must be out of date. */
1995 	if (snap_id == 0)
1996 		return (REP_PROTOCOL_DONE);
1997 
1998 	(void) memset(&csi, '\0', sizeof (csi));
1999 
2000 	q = backend_query_alloc();
2001 	backend_query_add(q,
2002 	    "SELECT\n"
2003 	    "    CASE snap_level_instance_id\n"
2004 	    "        WHEN 0 THEN snap_level_service_id\n"
2005 	    "        ELSE snap_level_instance_id\n"
2006 	    "    END\n"
2007 	    "FROM snaplevel_tbl\n"
2008 	    "WHERE snap_id = %d;\n"
2009 	    "\n"
2010 	    "SELECT\n"
2011 	    "    CASE snap_level_instance_id\n"
2012 	    "        WHEN 0 THEN snap_level_service_id\n"
2013 	    "        ELSE snap_level_instance_id\n"
2014 	    "    END,\n"
2015 	    "    snaplvl_pg_id,\n"
2016 	    "    snaplvl_gen_id\n"
2017 	    "FROM snaplevel_tbl, snaplevel_lnk_tbl\n"
2018 	    "WHERE\n"
2019 	    "    (snaplvl_level_id = snap_level_id AND\n"
2020 	    "    snap_id = %d);",
2021 	    snap_id, snap_id);
2022 
2023 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_fill_cb,
2024 	    &csi);
2025 	if (result == REP_PROTOCOL_DONE)
2026 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2027 	backend_query_free(q);
2028 
2029 	if (result != REP_PROTOCOL_SUCCESS)
2030 		goto fail;
2031 
2032 	if (csi.csi_count > 0) {
2033 		qsort(csi.csi_array, csi.csi_count, sizeof (*csi.csi_array),
2034 		    check_snapshot_elem_cmp);
2035 	}
2036 
2037 #if COMPOSITION_DEPTH == 2
2038 	if (csi.csi_nparents != COMPOSITION_DEPTH) {
2039 		result = REP_PROTOCOL_DONE;
2040 		goto fail;
2041 	}
2042 
2043 	q = backend_query_alloc();
2044 	backend_query_add(q,
2045 	    "SELECT "
2046 	    "    pg_parent_id, pg_id, pg_gen_id "
2047 	    "FROM "
2048 	    "    pg_tbl "
2049 	    "WHERE (pg_parent_id = %d OR pg_parent_id = %d)",
2050 	    csi.csi_parent_ids[0], csi.csi_parent_ids[1]);
2051 
2052 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_check_cb,
2053 	    &csi);
2054 #else
2055 #error This code must be updated
2056 #endif
2057 	/*
2058 	 * To succeed, the callback must not have aborted, and we must have
2059 	 * found all of the items.
2060 	 */
2061 	if (result == REP_PROTOCOL_SUCCESS) {
2062 		for (idx = 0; idx < csi.csi_count; idx++) {
2063 			if (csi.csi_array[idx].cse_seen == 0) {
2064 				result = REP_PROTOCOL_DONE;
2065 				goto fail;
2066 			}
2067 		}
2068 	}
2069 
2070 fail:
2071 	uu_free(csi.csi_array);
2072 	return (result);
2073 }
2074 
2075 /*ARGSUSED*/
2076 static int
2077 object_copy_string(void *data_arg, int columns, char **vals, char **names)
2078 {
2079 	char **data = data_arg;
2080 
2081 	assert(columns == 1);
2082 
2083 	if (*data != NULL)
2084 		free(*data);
2085 	*data = NULL;
2086 
2087 	if (vals[0] != NULL) {
2088 		if ((*data = strdup(vals[0])) == NULL)
2089 			return (BACKEND_CALLBACK_ABORT);
2090 	}
2091 
2092 	return (BACKEND_CALLBACK_CONTINUE);
2093 }
2094 
2095 struct snaplevel_add_info {
2096 	backend_query_t *sai_q;
2097 	uint32_t	sai_level_id;
2098 	int		sai_used;		/* sai_q has been used */
2099 };
2100 
2101 /*ARGSUSED*/
2102 static int
2103 object_snaplevel_process_pg(void *data_arg, int columns, char **vals,
2104     char **names)
2105 {
2106 	struct snaplevel_add_info *data = data_arg;
2107 
2108 	assert(columns == 5);
2109 
2110 	backend_query_add(data->sai_q,
2111 	    "INSERT INTO snaplevel_lnk_tbl "
2112 	    "    (snaplvl_level_id, snaplvl_pg_id, snaplvl_pg_name, "
2113 	    "    snaplvl_pg_type, snaplvl_pg_flags, snaplvl_gen_id)"
2114 	    "VALUES (%d, %s, '%q', '%q', %s, %s);",
2115 	    data->sai_level_id, vals[0], vals[1], vals[2], vals[3], vals[4]);
2116 
2117 	data->sai_used = 1;
2118 
2119 	return (BACKEND_CALLBACK_CONTINUE);
2120 }
2121 
2122 /*ARGSUSED*/
2123 static int
2124 object_snapshot_add_level(backend_tx_t *tx, uint32_t snap_id,
2125     uint32_t snap_level_num, uint32_t svc_id, const char *svc_name,
2126     uint32_t inst_id, const char *inst_name)
2127 {
2128 	struct snaplevel_add_info data;
2129 	backend_query_t *q;
2130 	int result;
2131 
2132 	assert((snap_level_num == 1 && inst_name != NULL) ||
2133 	    snap_level_num == 2 && inst_name == NULL);
2134 
2135 	data.sai_level_id = backend_new_id(tx, BACKEND_ID_SNAPLEVEL);
2136 	if (data.sai_level_id == 0) {
2137 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
2138 	}
2139 
2140 	result = backend_tx_run_update(tx,
2141 	    "INSERT INTO snaplevel_tbl "
2142 	    "    (snap_id, snap_level_num, snap_level_id, "
2143 	    "    snap_level_service_id, snap_level_service, "
2144 	    "    snap_level_instance_id, snap_level_instance) "
2145 	    "VALUES (%d, %d, %d, %d, %Q, %d, %Q);",
2146 	    snap_id, snap_level_num, data.sai_level_id, svc_id, svc_name,
2147 	    inst_id, inst_name);
2148 
2149 	q = backend_query_alloc();
2150 	backend_query_add(q,
2151 	    "SELECT pg_id, pg_name, pg_type, pg_flags, pg_gen_id FROM pg_tbl "
2152 	    "WHERE (pg_parent_id = %d);",
2153 	    (inst_name != NULL)? inst_id : svc_id);
2154 
2155 	data.sai_q = backend_query_alloc();
2156 	data.sai_used = 0;
2157 	result = backend_tx_run(tx, q, object_snaplevel_process_pg,
2158 	    &data);
2159 	backend_query_free(q);
2160 
2161 	if (result == REP_PROTOCOL_SUCCESS && data.sai_used != 0)
2162 		result = backend_tx_run(tx, data.sai_q, NULL, NULL);
2163 	backend_query_free(data.sai_q);
2164 
2165 	return (result);
2166 }
2167 
2168 /*
2169  * Fails with:
2170  *	_NO_RESOURCES - no new id or out of disk space
2171  *	_BACKEND_READONLY - persistent backend is read-only
2172  */
2173 static int
2174 object_snapshot_do_take(uint32_t instid, const char *inst_name,
2175     uint32_t svcid, const char *svc_name,
2176     backend_tx_t **tx_out, uint32_t *snapid_out)
2177 {
2178 	backend_tx_t *tx;
2179 	backend_query_t *q;
2180 	int result;
2181 
2182 	char *svc_name_alloc = NULL;
2183 	char *inst_name_alloc = NULL;
2184 	uint32_t snapid;
2185 
2186 	result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2187 	if (result != REP_PROTOCOL_SUCCESS)
2188 		return (result);
2189 
2190 	snapid = backend_new_id(tx, BACKEND_ID_SNAPSHOT);
2191 	if (snapid == 0) {
2192 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2193 		goto fail;
2194 	}
2195 
2196 	if (svc_name == NULL) {
2197 		q = backend_query_alloc();
2198 		backend_query_add(q,
2199 		    "SELECT svc_name FROM service_tbl "
2200 		    "WHERE (svc_id = %d)", svcid);
2201 		result = backend_tx_run(tx, q, object_copy_string,
2202 		    &svc_name_alloc);
2203 		backend_query_free(q);
2204 
2205 		svc_name = svc_name_alloc;
2206 
2207 		if (result == REP_PROTOCOL_DONE) {
2208 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2209 			goto fail;
2210 		}
2211 		if (result == REP_PROTOCOL_SUCCESS && svc_name == NULL)
2212 			backend_panic("unable to find name for svc id %d\n",
2213 			    svcid);
2214 
2215 		if (result != REP_PROTOCOL_SUCCESS)
2216 			goto fail;
2217 	}
2218 
2219 	if (inst_name == NULL) {
2220 		q = backend_query_alloc();
2221 		backend_query_add(q,
2222 		    "SELECT instance_name FROM instance_tbl "
2223 		    "WHERE (instance_id = %d)", instid);
2224 		result = backend_tx_run(tx, q, object_copy_string,
2225 		    &inst_name_alloc);
2226 		backend_query_free(q);
2227 
2228 		inst_name = inst_name_alloc;
2229 
2230 		if (result == REP_PROTOCOL_DONE) {
2231 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2232 			goto fail;
2233 		}
2234 
2235 		if (result == REP_PROTOCOL_SUCCESS && inst_name == NULL)
2236 			backend_panic(
2237 			    "unable to find name for instance id %d\n", instid);
2238 
2239 		if (result != REP_PROTOCOL_SUCCESS)
2240 			goto fail;
2241 	}
2242 
2243 	result = object_snapshot_add_level(tx, snapid, 1,
2244 	    svcid, svc_name, instid, inst_name);
2245 
2246 	if (result != REP_PROTOCOL_SUCCESS)
2247 		goto fail;
2248 
2249 	result = object_snapshot_add_level(tx, snapid, 2,
2250 	    svcid, svc_name, 0, NULL);
2251 
2252 	if (result != REP_PROTOCOL_SUCCESS)
2253 		goto fail;
2254 
2255 	*snapid_out = snapid;
2256 	*tx_out = tx;
2257 
2258 	free(svc_name_alloc);
2259 	free(inst_name_alloc);
2260 
2261 	return (REP_PROTOCOL_SUCCESS);
2262 
2263 fail:
2264 	backend_tx_rollback(tx);
2265 	free(svc_name_alloc);
2266 	free(inst_name_alloc);
2267 	return (result);
2268 }
2269 
2270 /*
2271  * Fails with:
2272  *	_TYPE_MISMATCH - pp is not an instance
2273  *	_NO_RESOURCES - no new id or out of disk space
2274  *	_BACKEND_READONLY - persistent backend is read-only
2275  */
2276 int
2277 object_snapshot_take_new(rc_node_t *pp,
2278     const char *svc_name, const char *inst_name,
2279     const char *name, rc_node_t **outp)
2280 {
2281 	rc_node_lookup_t *insti = &pp->rn_id;
2282 
2283 	uint32_t instid = insti->rl_main_id;
2284 	uint32_t svcid = insti->rl_ids[ID_SERVICE];
2285 	uint32_t snapid = 0;
2286 	backend_tx_t *tx = NULL;
2287 	child_info_t ci;
2288 	rc_node_t *np;
2289 	int result;
2290 
2291 	if (insti->rl_type != REP_PROTOCOL_ENTITY_INSTANCE)
2292 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2293 
2294 	result = object_snapshot_do_take(instid, inst_name, svcid, svc_name,
2295 	    &tx, &snapid);
2296 	if (result != REP_PROTOCOL_SUCCESS)
2297 		return (result);
2298 
2299 	if ((result = object_do_create(tx, &ci, pp,
2300 	    REP_PROTOCOL_ENTITY_SNAPSHOT, name, &np)) != REP_PROTOCOL_SUCCESS) {
2301 		backend_tx_rollback(tx);
2302 		return (result);
2303 	}
2304 
2305 	/*
2306 	 * link the new object to the new snapshot.
2307 	 */
2308 	np->rn_snapshot_id = snapid;
2309 
2310 	result = backend_tx_run_update(tx,
2311 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2312 	    snapid, ci.ci_base_nl.rl_main_id);
2313 	if (result != REP_PROTOCOL_SUCCESS) {
2314 		backend_tx_rollback(tx);
2315 		rc_node_destroy(np);
2316 		return (result);
2317 	}
2318 	result = backend_tx_commit(tx);
2319 	if (result != REP_PROTOCOL_SUCCESS) {
2320 		rc_node_destroy(np);
2321 		return (result);
2322 	}
2323 
2324 	*outp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
2325 	return (REP_PROTOCOL_SUCCESS);
2326 }
2327 
2328 /*
2329  * Fails with:
2330  *	_TYPE_MISMATCH - pp is not an instance
2331  *	_NO_RESOURCES - no new id or out of disk space
2332  *	_BACKEND_READONLY - persistent backend is read-only
2333  */
2334 int
2335 object_snapshot_attach(rc_node_lookup_t *snapi, uint32_t *snapid_ptr,
2336     int takesnap)
2337 {
2338 	uint32_t svcid = snapi->rl_ids[ID_SERVICE];
2339 	uint32_t instid = snapi->rl_ids[ID_INSTANCE];
2340 	uint32_t snapid = *snapid_ptr;
2341 	uint32_t oldsnapid = 0;
2342 	backend_tx_t *tx = NULL;
2343 	backend_query_t *q;
2344 	int result;
2345 
2346 	delete_info_t dip;
2347 	delete_ent_t de;
2348 
2349 	if (snapi->rl_type != REP_PROTOCOL_ENTITY_SNAPSHOT)
2350 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2351 
2352 	if (takesnap) {
2353 		/* first, check that we're actually out of date */
2354 		if (object_check_snapshot(snapid) == REP_PROTOCOL_SUCCESS)
2355 			return (REP_PROTOCOL_SUCCESS);
2356 
2357 		result = object_snapshot_do_take(instid, NULL,
2358 		    svcid, NULL, &tx, &snapid);
2359 		if (result != REP_PROTOCOL_SUCCESS)
2360 			return (result);
2361 	} else {
2362 		result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2363 		if (result != REP_PROTOCOL_SUCCESS)
2364 			return (result);
2365 	}
2366 
2367 	q = backend_query_alloc();
2368 	backend_query_add(q,
2369 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
2370 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2371 	    snapi->rl_main_id, snapid, snapi->rl_main_id);
2372 	result = backend_tx_run_single_int(tx, q, &oldsnapid);
2373 	backend_query_free(q);
2374 
2375 	if (result == REP_PROTOCOL_FAIL_NOT_FOUND) {
2376 		backend_tx_rollback(tx);
2377 		backend_panic("unable to find snapshot id %d",
2378 		    snapi->rl_main_id);
2379 	}
2380 	if (result != REP_PROTOCOL_SUCCESS)
2381 		goto fail;
2382 
2383 	/*
2384 	 * Now we use the delete stack to handle the possible unreferencing
2385 	 * of oldsnapid.
2386 	 */
2387 	(void) memset(&dip, 0, sizeof (dip));
2388 	dip.di_tx = tx;
2389 	dip.di_np_tx = NULL;	/* no need for non-persistant backend */
2390 
2391 	if ((result = delete_stack_push(&dip, BACKEND_TYPE_NORMAL,
2392 	    &snaplevel_tbl_delete, oldsnapid, 0)) != REP_PROTOCOL_SUCCESS)
2393 		goto fail;
2394 
2395 	while (delete_stack_pop(&dip, &de)) {
2396 		result = (*de.de_cb)(&dip, &de);
2397 		if (result != REP_PROTOCOL_SUCCESS)
2398 			goto fail;
2399 	}
2400 
2401 	result = backend_tx_commit(tx);
2402 	if (result != REP_PROTOCOL_SUCCESS)
2403 		goto fail;
2404 
2405 	delete_stack_cleanup(&dip);
2406 	*snapid_ptr = snapid;
2407 	return (REP_PROTOCOL_SUCCESS);
2408 
2409 fail:
2410 	backend_tx_rollback(tx);
2411 	delete_stack_cleanup(&dip);
2412 	return (result);
2413 }
2414