xref: /titanic_44/usr/src/cmd/svc/configd/file_object.c (revision ee5416c9d7e449233197d5d20bc6b81e4ff091b2)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #pragma ident	"%Z%%M%	%I%	%E% SMI"
28 
29 /*
30  * file_object.c - enter objects into and load them from the backend
31  *
32  * The primary entry points in this layer are object_create(),
33  * object_create_pg(), object_delete(), and object_fill_children().  They each
34  * take an rc_node_t and use the functions in the object_info_t info array for
35  * the node's type.
36  */
37 
38 #include <assert.h>
39 #include <pthread.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #include <strings.h>
44 
45 #include "configd.h"
46 #include "repcache_protocol.h"
47 
48 typedef struct child_info {
49 	rc_node_t	*ci_parent;
50 	backend_tx_t	*ci_tx;			/* only for properties */
51 	rc_node_lookup_t ci_base_nl;
52 } child_info_t;
53 
/* Forward declarations for the deletion work-stack types defined below. */
typedef struct delete_info delete_info_t;
typedef struct delete_stack delete_stack_t;
typedef struct delete_ent delete_ent_t;

/*
 * Signature of a deletion step.  Each stacked delete_ent_t carries one of
 * these in de_cb; the step receives the deletion state and its own entry.
 */
typedef int	delete_cb_func(delete_info_t *, const delete_ent_t *);
59 
60 struct delete_ent {
61 	delete_cb_func	*de_cb;		/* callback */
62 	uint32_t	de_backend;
63 	uint32_t	de_id;
64 	uint32_t	de_gen;		/* only for property groups */
65 };
66 
67 struct delete_stack {
68 	struct delete_stack *ds_next;
69 	uint32_t	ds_size;	/* number of elements */
70 	uint32_t	ds_cur;		/* current offset */
71 	delete_ent_t	ds_buf[1];	/* actually ds_size */
72 };
73 #define	DELETE_STACK_SIZE(x)	offsetof(delete_stack_t, ds_buf[(x)])
74 
75 struct delete_info {
76 	backend_tx_t	*di_tx;
77 	backend_tx_t	*di_np_tx;
78 	delete_stack_t	*di_stack;
79 	delete_stack_t	*di_free;
80 };
81 
82 typedef struct object_info {
83 	uint32_t	obj_type;
84 	enum id_space	obj_id_space;
85 
86 	int (*obj_fill_children)(rc_node_t *);
87 	int (*obj_setup_child_info)(rc_node_t *, uint32_t, child_info_t *);
88 	int (*obj_query_child)(backend_query_t *, rc_node_lookup_t *,
89 	    const char *);
90 	int (*obj_insert_child)(backend_tx_t *, rc_node_lookup_t *,
91 	    const char *);
92 	int (*obj_insert_pg_child)(backend_tx_t *, rc_node_lookup_t *,
93 	    const char *, const char *, uint32_t, uint32_t);
94 	int (*obj_delete_start)(rc_node_t *, delete_info_t *);
95 } object_info_t;
96 
/*
 * Convert the string str into the unsigned integer *output using
 * uu_strtouint().  A conversion failure is fatal: we call backend_panic(),
 * reporting both the offending string and the name of the field it came
 * from.
 */
static void
string_to_id(const char *str, uint32_t *output, const char *fieldname)
{
	int rc = uu_strtouint(str, output, sizeof (*output), 0, 0, 0);

	if (rc != -1)
		return;

	backend_panic("invalid integer \"%s\" in field \"%s\"",
	    str, fieldname);
}
104 
105 #define	NUM_NEEDED	50
106 
107 static int
108 delete_stack_push(delete_info_t *dip, uint32_t be, delete_cb_func *cb,
109     uint32_t id, uint32_t gen)
110 {
111 	delete_stack_t *cur = dip->di_stack;
112 	delete_ent_t *ent;
113 
114 	if (cur == NULL || cur->ds_cur == cur->ds_size) {
115 		delete_stack_t *new = dip->di_free;
116 		dip->di_free = NULL;
117 		if (new == NULL) {
118 			new = uu_zalloc(DELETE_STACK_SIZE(NUM_NEEDED));
119 			if (new == NULL)
120 				return (REP_PROTOCOL_FAIL_NO_RESOURCES);
121 			new->ds_size = NUM_NEEDED;
122 		}
123 		new->ds_cur = 0;
124 		new->ds_next = dip->di_stack;
125 		dip->di_stack = new;
126 		cur = new;
127 	}
128 	assert(cur->ds_cur < cur->ds_size);
129 	ent = &cur->ds_buf[cur->ds_cur++];
130 
131 	ent->de_backend = be;
132 	ent->de_cb = cb;
133 	ent->de_id = id;
134 	ent->de_gen = gen;
135 
136 	return (REP_PROTOCOL_SUCCESS);
137 }
138 
139 static int
140 delete_stack_pop(delete_info_t *dip, delete_ent_t *out)
141 {
142 	delete_stack_t *cur = dip->di_stack;
143 	delete_ent_t *ent;
144 
145 	if (cur == NULL)
146 		return (0);
147 	assert(cur->ds_cur > 0 && cur->ds_cur <= cur->ds_size);
148 	ent = &cur->ds_buf[--cur->ds_cur];
149 	if (cur->ds_cur == 0) {
150 		dip->di_stack = cur->ds_next;
151 		cur->ds_next = NULL;
152 
153 		if (dip->di_free != NULL)
154 			uu_free(dip->di_free);
155 		dip->di_free = cur;
156 	}
157 	if (ent == NULL)
158 		return (0);
159 
160 	*out = *ent;
161 	return (1);
162 }
163 
164 static void
165 delete_stack_cleanup(delete_info_t *dip)
166 {
167 	delete_stack_t *cur;
168 	while ((cur = dip->di_stack) != NULL) {
169 		dip->di_stack = cur->ds_next;
170 
171 		uu_free(cur);
172 	}
173 
174 	if ((cur = dip->di_free) != NULL) {
175 		assert(cur->ds_next == NULL);	/* should only be one */
176 		uu_free(cur);
177 		dip->di_free = NULL;
178 	}
179 }
180 
181 struct delete_cb_info {
182 	delete_info_t	*dci_dip;
183 	uint32_t	dci_be;
184 	delete_cb_func	*dci_cb;
185 	int		dci_result;
186 };
187 
188 /*ARGSUSED*/
189 static int
190 push_delete_callback(void *data, int columns, char **vals, char **names)
191 {
192 	struct delete_cb_info *info = data;
193 
194 	const char *id_str = *vals++;
195 	const char *gen_str = *vals++;
196 
197 	uint32_t id;
198 	uint32_t gen;
199 
200 	assert(columns == 2);
201 
202 	string_to_id(id_str, &id, "id");
203 	string_to_id(gen_str, &gen, "gen_id");
204 
205 	info->dci_result = delete_stack_push(info->dci_dip, info->dci_be,
206 	    info->dci_cb, id, gen);
207 
208 	if (info->dci_result != REP_PROTOCOL_SUCCESS)
209 		return (BACKEND_CALLBACK_ABORT);
210 	return (BACKEND_CALLBACK_CONTINUE);
211 }
212 
213 static int
214 value_delete(delete_info_t *dip, const delete_ent_t *ent)
215 {
216 	uint32_t be = ent->de_backend;
217 	int r;
218 
219 	backend_query_t *q;
220 
221 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
222 	    dip->di_np_tx;
223 
224 	q = backend_query_alloc();
225 
226 	backend_query_add(q,
227 	    "SELECT 1 FROM prop_lnk_tbl WHERE (lnk_val_id = %d); "
228 	    "DELETE FROM value_tbl WHERE (value_id = %d); ",
229 	    ent->de_id, ent->de_id);
230 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
231 	backend_query_free(q);
232 	if (r == REP_PROTOCOL_DONE)
233 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
234 	return (r);
235 }
236 
237 static int
238 pg_lnk_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
239 {
240 	struct delete_cb_info info;
241 	uint32_t be = ent->de_backend;
242 	int r;
243 
244 	backend_query_t *q;
245 
246 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
247 	    dip->di_np_tx;
248 
249 	/*
250 	 * For non-persistent backends, we could only have one parent, and
251 	 * he's already been deleted.
252 	 *
253 	 * For normal backends, we need to check to see if we're in
254 	 * a snapshot or are the active generation for the property
255 	 * group.  If we are, there's nothing to be done.
256 	 */
257 	if (be == BACKEND_TYPE_NORMAL) {
258 		q = backend_query_alloc();
259 		backend_query_add(q,
260 		    "SELECT 1 "
261 		    "FROM pg_tbl "
262 		    "WHERE (pg_id = %d AND pg_gen_id = %d); "
263 		    "SELECT 1 "
264 		    "FROM snaplevel_lnk_tbl "
265 		    "WHERE (snaplvl_pg_id = %d AND snaplvl_gen_id = %d);",
266 		    ent->de_id, ent->de_gen,
267 		    ent->de_id, ent->de_gen);
268 		r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
269 		backend_query_free(q);
270 
271 		if (r == REP_PROTOCOL_DONE)
272 			return (REP_PROTOCOL_SUCCESS);	/* still in use */
273 	}
274 
275 	info.dci_dip = dip;
276 	info.dci_be =  be;
277 	info.dci_cb = &value_delete;
278 	info.dci_result = REP_PROTOCOL_SUCCESS;
279 
280 	q = backend_query_alloc();
281 	backend_query_add(q,
282 	    "SELECT DISTINCT lnk_val_id, 0 FROM prop_lnk_tbl "
283 	    "WHERE "
284 	    "    (lnk_pg_id = %d AND lnk_gen_id = %d AND lnk_val_id NOTNULL); "
285 	    "DELETE FROM prop_lnk_tbl "
286 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
287 	    ent->de_id, ent->de_gen, ent->de_id, ent->de_gen);
288 
289 	r = backend_tx_run(tx, q, push_delete_callback, &info);
290 	backend_query_free(q);
291 
292 	if (r == REP_PROTOCOL_DONE) {
293 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
294 		return (info.dci_result);
295 	}
296 	return (r);
297 }
298 
299 static int
300 propertygrp_delete(delete_info_t *dip, const delete_ent_t *ent)
301 {
302 	uint32_t be = ent->de_backend;
303 	backend_query_t *q;
304 	uint32_t gen;
305 
306 	int r;
307 
308 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
309 	    dip->di_np_tx;
310 
311 	q = backend_query_alloc();
312 	backend_query_add(q,
313 	    "SELECT pg_gen_id FROM pg_tbl WHERE pg_id = %d; "
314 	    "DELETE FROM pg_tbl WHERE pg_id = %d",
315 	    ent->de_id, ent->de_id);
316 	r = backend_tx_run_single_int(tx, q, &gen);
317 	backend_query_free(q);
318 
319 	if (r != REP_PROTOCOL_SUCCESS)
320 		return (r);
321 
322 	return (delete_stack_push(dip, be, &pg_lnk_tbl_delete,
323 	    ent->de_id, gen));
324 }
325 
326 static int
327 snaplevel_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
328 {
329 	uint32_t be = ent->de_backend;
330 	backend_query_t *q;
331 	struct delete_cb_info info;
332 
333 	int r;
334 
335 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
336 	    dip->di_np_tx;
337 
338 	info.dci_dip = dip;
339 	info.dci_be = be;
340 	info.dci_cb = &pg_lnk_tbl_delete;
341 	info.dci_result = REP_PROTOCOL_SUCCESS;
342 
343 	q = backend_query_alloc();
344 	backend_query_add(q,
345 	    "SELECT snaplvl_pg_id, snaplvl_gen_id "
346 	    "    FROM snaplevel_lnk_tbl "
347 	    "    WHERE snaplvl_level_id = %d; "
348 	    "DELETE FROM snaplevel_lnk_tbl WHERE snaplvl_level_id = %d",
349 	    ent->de_id, ent->de_id);
350 	r = backend_tx_run(tx, q, push_delete_callback, &info);
351 	backend_query_free(q);
352 
353 	if (r == REP_PROTOCOL_DONE) {
354 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
355 		return (info.dci_result);
356 	}
357 	return (r);
358 }
359 
360 static int
361 snaplevel_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
362 {
363 	uint32_t be = ent->de_backend;
364 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
365 	    dip->di_np_tx;
366 
367 	struct delete_cb_info info;
368 	backend_query_t *q;
369 	int r;
370 
371 	assert(be == BACKEND_TYPE_NORMAL);
372 
373 	q = backend_query_alloc();
374 	backend_query_add(q,
375 	    "SELECT 1 FROM snapshot_lnk_tbl WHERE lnk_snap_id = %d",
376 	    ent->de_id);
377 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
378 	backend_query_free(q);
379 
380 	if (r == REP_PROTOCOL_DONE)
381 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
382 
383 	info.dci_dip = dip;
384 	info.dci_be = be;
385 	info.dci_cb = &snaplevel_lnk_delete;
386 	info.dci_result = REP_PROTOCOL_SUCCESS;
387 
388 	q = backend_query_alloc();
389 	backend_query_add(q,
390 	    "SELECT snap_level_id, 0 FROM snaplevel_tbl WHERE snap_id = %d;"
391 	    "DELETE FROM snaplevel_tbl WHERE snap_id = %d",
392 	    ent->de_id, ent->de_id);
393 	r = backend_tx_run(tx, q, push_delete_callback, &info);
394 	backend_query_free(q);
395 
396 	if (r == REP_PROTOCOL_DONE) {
397 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
398 		return (info.dci_result);
399 	}
400 	return (r);
401 }
402 
403 static int
404 snapshot_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
405 {
406 	uint32_t be = ent->de_backend;
407 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
408 	    dip->di_np_tx;
409 
410 	backend_query_t *q;
411 	uint32_t snapid;
412 	int r;
413 
414 	assert(be == BACKEND_TYPE_NORMAL);
415 
416 	q = backend_query_alloc();
417 	backend_query_add(q,
418 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
419 	    "DELETE FROM snapshot_lnk_tbl WHERE lnk_id = %d",
420 	    ent->de_id, ent->de_id);
421 	r = backend_tx_run_single_int(tx, q, &snapid);
422 	backend_query_free(q);
423 
424 	if (r != REP_PROTOCOL_SUCCESS)
425 		return (r);
426 
427 	return (delete_stack_push(dip, be, &snaplevel_tbl_delete, snapid, 0));
428 }
429 
430 static int
431 pgparent_delete_add_pgs(delete_info_t *dip, uint32_t parent_id)
432 {
433 	struct delete_cb_info info;
434 	backend_query_t *q;
435 	int r;
436 
437 	info.dci_dip = dip;
438 	info.dci_be = BACKEND_TYPE_NORMAL;
439 	info.dci_cb = &propertygrp_delete;
440 	info.dci_result = REP_PROTOCOL_SUCCESS;
441 
442 	q = backend_query_alloc();
443 	backend_query_add(q,
444 	    "SELECT pg_id, 0 FROM pg_tbl WHERE pg_parent_id = %d",
445 	    parent_id);
446 
447 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
448 
449 	if (r == REP_PROTOCOL_DONE) {
450 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
451 		backend_query_free(q);
452 		return (info.dci_result);
453 	}
454 	if (r != REP_PROTOCOL_SUCCESS) {
455 		backend_query_free(q);
456 		return (r);
457 	}
458 
459 	if (dip->di_np_tx != NULL) {
460 		info.dci_be = BACKEND_TYPE_NONPERSIST;
461 
462 		r = backend_tx_run(dip->di_np_tx, q, push_delete_callback,
463 		    &info);
464 
465 		if (r == REP_PROTOCOL_DONE) {
466 			assert(info.dci_result != REP_PROTOCOL_SUCCESS);
467 			backend_query_free(q);
468 			return (info.dci_result);
469 		}
470 		if (r != REP_PROTOCOL_SUCCESS) {
471 			backend_query_free(q);
472 			return (r);
473 		}
474 	}
475 	backend_query_free(q);
476 	return (REP_PROTOCOL_SUCCESS);
477 }
478 
479 static int
480 service_delete(delete_info_t *dip, const delete_ent_t *ent)
481 {
482 	int r;
483 
484 	r = backend_tx_run_update_changed(dip->di_tx,
485 	    "DELETE FROM service_tbl WHERE svc_id = %d", ent->de_id);
486 	if (r != REP_PROTOCOL_SUCCESS)
487 		return (r);
488 
489 	return (pgparent_delete_add_pgs(dip, ent->de_id));
490 }
491 
492 static int
493 instance_delete(delete_info_t *dip, const delete_ent_t *ent)
494 {
495 	struct delete_cb_info info;
496 	int r;
497 	backend_query_t *q;
498 
499 	r = backend_tx_run_update_changed(dip->di_tx,
500 	    "DELETE FROM instance_tbl WHERE instance_id = %d", ent->de_id);
501 	if (r != REP_PROTOCOL_SUCCESS)
502 		return (r);
503 
504 	r = pgparent_delete_add_pgs(dip, ent->de_id);
505 	if (r != REP_PROTOCOL_SUCCESS)
506 		return (r);
507 
508 	info.dci_dip = dip;
509 	info.dci_be = BACKEND_TYPE_NORMAL;
510 	info.dci_cb = &snapshot_lnk_delete;
511 	info.dci_result = REP_PROTOCOL_SUCCESS;
512 
513 	q = backend_query_alloc();
514 	backend_query_add(q,
515 	    "SELECT lnk_id, 0 FROM snapshot_lnk_tbl WHERE lnk_inst_id = %d",
516 	    ent->de_id);
517 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
518 	backend_query_free(q);
519 
520 	if (r == REP_PROTOCOL_DONE) {
521 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
522 		return (info.dci_result);
523 	}
524 	return (r);
525 }
526 
527 /*ARGSUSED*/
528 static int
529 fill_child_callback(void *data, int columns, char **vals, char **names)
530 {
531 	child_info_t *cp = data;
532 	rc_node_t *np;
533 	uint32_t main_id;
534 	const char *name;
535 	const char *cur;
536 	rc_node_lookup_t *lp = &cp->ci_base_nl;
537 
538 	assert(columns == 2);
539 
540 	name = *vals++;
541 	columns--;
542 
543 	cur = *vals++;
544 	columns--;
545 	string_to_id(cur, &main_id, "id");
546 
547 	lp->rl_main_id = main_id;
548 
549 	if ((np = rc_node_alloc()) == NULL)
550 		return (BACKEND_CALLBACK_ABORT);
551 
552 	np = rc_node_setup(np, lp, name, cp->ci_parent);
553 	rc_node_rele(np);
554 
555 	return (BACKEND_CALLBACK_CONTINUE);
556 }
557 
558 /*ARGSUSED*/
559 static int
560 fill_snapshot_callback(void *data, int columns, char **vals, char **names)
561 {
562 	child_info_t *cp = data;
563 	rc_node_t *np;
564 	uint32_t main_id;
565 	uint32_t snap_id;
566 	const char *name;
567 	const char *cur;
568 	const char *snap;
569 	rc_node_lookup_t *lp = &cp->ci_base_nl;
570 
571 	assert(columns == 3);
572 
573 	name = *vals++;
574 	columns--;
575 
576 	cur = *vals++;
577 	columns--;
578 	snap = *vals++;
579 	columns--;
580 
581 	string_to_id(cur, &main_id, "lnk_id");
582 	string_to_id(snap, &snap_id, "lnk_snap_id");
583 
584 	lp->rl_main_id = main_id;
585 
586 	if ((np = rc_node_alloc()) == NULL)
587 		return (BACKEND_CALLBACK_ABORT);
588 
589 	np = rc_node_setup_snapshot(np, lp, name, snap_id, cp->ci_parent);
590 	rc_node_rele(np);
591 
592 	return (BACKEND_CALLBACK_CONTINUE);
593 }
594 
595 /*ARGSUSED*/
596 static int
597 fill_pg_callback(void *data, int columns, char **vals, char **names)
598 {
599 	child_info_t *cip = data;
600 	const char *name;
601 	const char *type;
602 	const char *cur;
603 	uint32_t main_id;
604 	uint32_t flags;
605 	uint32_t gen_id;
606 
607 	rc_node_lookup_t *lp = &cip->ci_base_nl;
608 	rc_node_t *newnode, *pg;
609 
610 	assert(columns == 5);
611 
612 	name = *vals++;		/* pg_name */
613 	columns--;
614 
615 	cur = *vals++;		/* pg_id */
616 	columns--;
617 	string_to_id(cur, &main_id, "pg_id");
618 
619 	lp->rl_main_id = main_id;
620 
621 	cur = *vals++;		/* pg_gen_id */
622 	columns--;
623 	string_to_id(cur, &gen_id, "pg_gen_id");
624 
625 	type = *vals++;		/* pg_type */
626 	columns--;
627 
628 	cur = *vals++;		/* pg_flags */
629 	columns--;
630 	string_to_id(cur, &flags, "pg_flags");
631 
632 	if ((newnode = rc_node_alloc()) == NULL)
633 		return (BACKEND_CALLBACK_ABORT);
634 
635 	pg = rc_node_setup_pg(newnode, lp, name, type, flags, gen_id,
636 	    cip->ci_parent);
637 	if (pg == NULL) {
638 		rc_node_destroy(newnode);
639 		return (BACKEND_CALLBACK_ABORT);
640 	}
641 
642 	rc_node_rele(pg);
643 
644 	return (BACKEND_CALLBACK_CONTINUE);
645 }
646 
/*
 * Accumulator used while loading a property's values: the values are packed
 * back to back, each with its terminating '\0', into one buffer.
 */
struct property_value_info {
	char		*pvi_base;	/* start of the packed buffer */
	size_t		pvi_pos;	/* bytes of pvi_base used so far */
	size_t		pvi_size;	/* total bytes needed / allocated */
	size_t		pvi_count;	/* number of values copied */
};
653 
654 /*ARGSUSED*/
655 static int
656 property_value_size_cb(void *data, int columns, char **vals, char **names)
657 {
658 	struct property_value_info *info = data;
659 	assert(columns == 1);
660 
661 	info->pvi_size += strlen(vals[0]) + 1;		/* count the '\0' */
662 
663 	return (BACKEND_CALLBACK_CONTINUE);
664 }
665 
666 /*ARGSUSED*/
667 static int
668 property_value_cb(void *data, int columns, char **vals, char **names)
669 {
670 	struct property_value_info *info = data;
671 	size_t pos, left, len;
672 
673 	assert(columns == 1);
674 	pos = info->pvi_pos;
675 	left = info->pvi_size - pos;
676 
677 	pos = info->pvi_pos;
678 	left = info->pvi_size - pos;
679 
680 	if ((len = strlcpy(&info->pvi_base[pos], vals[0], left)) >= left) {
681 		/*
682 		 * since we preallocated, above, this shouldn't happen
683 		 */
684 		backend_panic("unexpected database change");
685 	}
686 
687 	len += 1;	/* count the '\0' */
688 
689 	info->pvi_pos += len;
690 	info->pvi_count++;
691 
692 	return (BACKEND_CALLBACK_CONTINUE);
693 }
694 
/*
 * Release a value buffer.  Only vals is used; a NULL vals is a no-op.
 */
/*ARGSUSED*/
void
object_free_values(const char *vals, uint32_t type, size_t count, size_t size)
{
	if (vals == NULL)
		return;

	uu_free((void *)vals);
}
702 
703 /*ARGSUSED*/
704 static int
705 fill_property_callback(void *data, int columns, char **vals, char **names)
706 {
707 	child_info_t *cp = data;
708 	backend_tx_t *tx = cp->ci_tx;
709 	uint32_t main_id;
710 	const char *name;
711 	const char *cur;
712 	rep_protocol_value_type_t type;
713 	rc_node_lookup_t *lp = &cp->ci_base_nl;
714 	struct property_value_info info;
715 	int rc;
716 
717 	assert(columns == 4);
718 	assert(tx != NULL);
719 
720 	info.pvi_base = NULL;
721 	info.pvi_pos = 0;
722 	info.pvi_size = 0;
723 	info.pvi_count = 0;
724 
725 	name = *vals++;
726 
727 	cur = *vals++;
728 	string_to_id(cur, &main_id, "lnk_prop_id");
729 
730 	cur = *vals++;
731 	assert(('a' <= cur[0] && 'z' >= cur[0]) ||
732 	    ('A' <= cur[0] && 'Z' >= cur[0]) &&
733 	    (cur[1] == 0 || ('a' <= cur[1] && 'z' >= cur[1]) ||
734 	    ('A' <= cur[1] && 'Z' >= cur[1])));
735 	type = cur[0] | (cur[1] << 8);
736 
737 	lp->rl_main_id = main_id;
738 
739 	/*
740 	 * fill in the values, if any
741 	 */
742 	if ((cur = *vals++) != NULL) {
743 		rep_protocol_responseid_t r;
744 		backend_query_t *q = backend_query_alloc();
745 
746 		backend_query_add(q,
747 		    "SELECT value_value FROM value_tbl "
748 		    "WHERE (value_id = '%q')", cur);
749 
750 		switch (r = backend_tx_run(tx, q, property_value_size_cb,
751 		    &info)) {
752 		case REP_PROTOCOL_SUCCESS:
753 			break;
754 
755 		case REP_PROTOCOL_FAIL_NO_RESOURCES:
756 			backend_query_free(q);
757 			return (BACKEND_CALLBACK_ABORT);
758 
759 		case REP_PROTOCOL_DONE:
760 		default:
761 			backend_panic("backend_tx_run() returned %d", r);
762 		}
763 		if (info.pvi_size > 0) {
764 			info.pvi_base = uu_zalloc(info.pvi_size);
765 			if (info.pvi_base == NULL) {
766 				backend_query_free(q);
767 				return (BACKEND_CALLBACK_ABORT);
768 			}
769 			switch (r = backend_tx_run(tx, q, property_value_cb,
770 			    &info)) {
771 			case REP_PROTOCOL_SUCCESS:
772 				break;
773 
774 			case REP_PROTOCOL_FAIL_NO_RESOURCES:
775 				uu_free(info.pvi_base);
776 				backend_query_free(q);
777 				return (BACKEND_CALLBACK_ABORT);
778 
779 			case REP_PROTOCOL_DONE:
780 			default:
781 				backend_panic("backend_tx_run() returned %d",
782 				    r);
783 			}
784 		}
785 		backend_query_free(q);
786 	}
787 
788 	rc = rc_node_create_property(cp->ci_parent, lp, name, type,
789 	    info.pvi_base, info.pvi_count, info.pvi_size);
790 	if (rc != REP_PROTOCOL_SUCCESS) {
791 		assert(rc == REP_PROTOCOL_FAIL_NO_RESOURCES);
792 		return (BACKEND_CALLBACK_ABORT);
793 	}
794 
795 	return (BACKEND_CALLBACK_CONTINUE);
796 }
797 
798 /*
799  * The *_setup_child_info() functions fill in a child_info_t structure with the
800  * information for the children of np with type type.
801  *
802  * They fail with
803  *   _TYPE_MISMATCH - object cannot have children of type type
804  */
805 
806 static int
807 scope_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
808 {
809 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
810 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
811 
812 	bzero(cip, sizeof (*cip));
813 	cip->ci_parent = np;
814 	cip->ci_base_nl.rl_type = type;
815 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
816 	return (REP_PROTOCOL_SUCCESS);
817 }
818 
819 static int
820 service_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
821 {
822 	switch (type) {
823 	case REP_PROTOCOL_ENTITY_INSTANCE:
824 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
825 		break;
826 	default:
827 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
828 	}
829 
830 	bzero(cip, sizeof (*cip));
831 	cip->ci_parent = np;
832 	cip->ci_base_nl.rl_type = type;
833 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
834 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_main_id;
835 
836 	return (REP_PROTOCOL_SUCCESS);
837 }
838 
839 static int
840 instance_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
841 {
842 	switch (type) {
843 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
844 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
845 		break;
846 	default:
847 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
848 	}
849 
850 	bzero(cip, sizeof (*cip));
851 	cip->ci_parent = np;
852 	cip->ci_base_nl.rl_type = type;
853 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
854 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
855 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_main_id;
856 
857 	return (REP_PROTOCOL_SUCCESS);
858 }
859 
860 static int
861 snaplevel_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
862 {
863 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
864 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
865 
866 	bzero(cip, sizeof (*cip));
867 	cip->ci_parent = np;
868 	cip->ci_base_nl.rl_type = type;
869 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
870 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
871 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
872 	cip->ci_base_nl.rl_ids[ID_NAME] = np->rn_id.rl_ids[ID_NAME];
873 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = np->rn_id.rl_ids[ID_SNAPSHOT];
874 	cip->ci_base_nl.rl_ids[ID_LEVEL] = np->rn_id.rl_main_id;
875 
876 	return (REP_PROTOCOL_SUCCESS);
877 }
878 
879 static int
880 propertygrp_setup_child_info(rc_node_t *pg, uint32_t type, child_info_t *cip)
881 {
882 	if (type != REP_PROTOCOL_ENTITY_PROPERTY)
883 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
884 
885 	bzero(cip, sizeof (*cip));
886 	cip->ci_parent = pg;
887 	cip->ci_base_nl.rl_type = type;
888 	cip->ci_base_nl.rl_backend = pg->rn_id.rl_backend;
889 	cip->ci_base_nl.rl_ids[ID_SERVICE] = pg->rn_id.rl_ids[ID_SERVICE];
890 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = pg->rn_id.rl_ids[ID_INSTANCE];
891 	cip->ci_base_nl.rl_ids[ID_PG] = pg->rn_id.rl_main_id;
892 	cip->ci_base_nl.rl_ids[ID_GEN] = pg->rn_gen_id;
893 	cip->ci_base_nl.rl_ids[ID_NAME] = pg->rn_id.rl_ids[ID_NAME];
894 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = pg->rn_id.rl_ids[ID_SNAPSHOT];
895 	cip->ci_base_nl.rl_ids[ID_LEVEL] = pg->rn_id.rl_ids[ID_LEVEL];
896 
897 	return (REP_PROTOCOL_SUCCESS);
898 }
899 
900 /*
901  * The *_fill_children() functions populate the children of the given rc_node_t
902  * by querying the database and calling rc_node_setup_*() functions (usually
903  * via a fill_*_callback()).
904  *
905  * They fail with
906  *   _NO_RESOURCES
907  */
908 
909 /*
910  * Returns
911  *   _NO_RESOURCES
912  *   _SUCCESS
913  */
914 static int
915 scope_fill_children(rc_node_t *np)
916 {
917 	backend_query_t *q;
918 	child_info_t ci;
919 	int res;
920 
921 	(void) scope_setup_child_info(np, REP_PROTOCOL_ENTITY_SERVICE, &ci);
922 
923 	q = backend_query_alloc();
924 	backend_query_append(q, "SELECT svc_name, svc_id FROM service_tbl");
925 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
926 	backend_query_free(q);
927 
928 	if (res == REP_PROTOCOL_DONE)
929 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
930 	return (res);
931 }
932 
933 /*
934  * Returns
935  *   _NO_RESOURCES
936  *   _SUCCESS
937  */
938 static int
939 service_fill_children(rc_node_t *np)
940 {
941 	backend_query_t *q;
942 	child_info_t ci;
943 	int res;
944 
945 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
946 
947 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_INSTANCE, &ci);
948 
949 	q = backend_query_alloc();
950 	backend_query_add(q,
951 	    "SELECT instance_name, instance_id FROM instance_tbl"
952 	    "    WHERE (instance_svc = %d)",
953 	    np->rn_id.rl_main_id);
954 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
955 	backend_query_free(q);
956 
957 	if (res == REP_PROTOCOL_DONE)
958 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
959 	if (res != REP_PROTOCOL_SUCCESS)
960 		return (res);
961 
962 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
963 	    &ci);
964 
965 	q = backend_query_alloc();
966 	backend_query_add(q,
967 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
968 	    "    WHERE (pg_parent_id = %d)",
969 	    np->rn_id.rl_main_id);
970 
971 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
972 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
973 	if (res == REP_PROTOCOL_SUCCESS) {
974 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
975 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
976 		    fill_pg_callback, &ci);
977 		/* nonpersistant database may not exist */
978 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
979 			res = REP_PROTOCOL_SUCCESS;
980 	}
981 	if (res == REP_PROTOCOL_DONE)
982 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
983 	backend_query_free(q);
984 
985 	return (res);
986 }
987 
988 /*
989  * Returns
990  *   _NO_RESOURCES
991  *   _SUCCESS
992  */
993 static int
994 instance_fill_children(rc_node_t *np)
995 {
996 	backend_query_t *q;
997 	child_info_t ci;
998 	int res;
999 
1000 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
1001 
1002 	/* Get child property groups */
1003 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1004 	    &ci);
1005 
1006 	q = backend_query_alloc();
1007 	backend_query_add(q,
1008 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
1009 	    "    WHERE (pg_parent_id = %d)",
1010 	    np->rn_id.rl_main_id);
1011 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
1012 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1013 	if (res == REP_PROTOCOL_SUCCESS) {
1014 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
1015 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
1016 		    fill_pg_callback, &ci);
1017 		/* nonpersistant database may not exist */
1018 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
1019 			res = REP_PROTOCOL_SUCCESS;
1020 	}
1021 	if (res == REP_PROTOCOL_DONE)
1022 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1023 	backend_query_free(q);
1024 
1025 	if (res != REP_PROTOCOL_SUCCESS)
1026 		return (res);
1027 
1028 	/* Get child snapshots */
1029 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_SNAPSHOT,
1030 	    &ci);
1031 
1032 	q = backend_query_alloc();
1033 	backend_query_add(q,
1034 	    "SELECT lnk_snap_name, lnk_id, lnk_snap_id FROM snapshot_lnk_tbl"
1035 	    "    WHERE (lnk_inst_id = %d)",
1036 	    np->rn_id.rl_main_id);
1037 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_callback, &ci);
1038 	if (res == REP_PROTOCOL_DONE)
1039 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1040 	backend_query_free(q);
1041 
1042 	return (res);
1043 }
1044 
1045 /*
1046  * Returns
1047  *   _NO_RESOURCES
1048  *   _SUCCESS
1049  */
1050 static int
1051 snapshot_fill_children(rc_node_t *np)
1052 {
1053 	rc_node_t *nnp;
1054 	rc_snapshot_t *sp, *oldsp;
1055 	rc_snaplevel_t *lvl;
1056 	rc_node_lookup_t nl;
1057 	int r;
1058 
1059 	/* Get the rc_snapshot_t (& its rc_snaplevel_t's). */
1060 	(void) pthread_mutex_lock(&np->rn_lock);
1061 	sp = np->rn_snapshot;
1062 	(void) pthread_mutex_unlock(&np->rn_lock);
1063 	if (sp == NULL) {
1064 		r = rc_snapshot_get(np->rn_snapshot_id, &sp);
1065 		if (r != REP_PROTOCOL_SUCCESS) {
1066 			assert(r == REP_PROTOCOL_FAIL_NO_RESOURCES);
1067 			return (r);
1068 		}
1069 		(void) pthread_mutex_lock(&np->rn_lock);
1070 		oldsp = np->rn_snapshot;
1071 		assert(oldsp == NULL || oldsp == sp);
1072 		np->rn_snapshot = sp;
1073 		(void) pthread_mutex_unlock(&np->rn_lock);
1074 		if (oldsp != NULL)
1075 			rc_snapshot_rele(oldsp);
1076 	}
1077 
1078 	bzero(&nl, sizeof (nl));
1079 	nl.rl_type = REP_PROTOCOL_ENTITY_SNAPLEVEL;
1080 	nl.rl_backend = np->rn_id.rl_backend;
1081 	nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
1082 	nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
1083 	nl.rl_ids[ID_NAME] = np->rn_id.rl_main_id;
1084 	nl.rl_ids[ID_SNAPSHOT] = np->rn_snapshot_id;
1085 
1086 	/* Create rc_node_t's for the snapshot's rc_snaplevel_t's. */
1087 	for (lvl = sp->rs_levels; lvl != NULL; lvl = lvl->rsl_next) {
1088 		nnp = rc_node_alloc();
1089 		assert(nnp != NULL);
1090 		nl.rl_main_id = lvl->rsl_level_id;
1091 		nnp = rc_node_setup_snaplevel(nnp, &nl, lvl, np);
1092 		rc_node_rele(nnp);
1093 	}
1094 
1095 	return (REP_PROTOCOL_SUCCESS);
1096 }
1097 
1098 /*
1099  * Returns
1100  *   _NO_RESOURCES
1101  *   _SUCCESS
1102  */
1103 static int
1104 snaplevel_fill_children(rc_node_t *np)
1105 {
1106 	rc_snaplevel_t *lvl = np->rn_snaplevel;
1107 	child_info_t ci;
1108 	int res;
1109 	backend_query_t *q;
1110 
1111 	(void) snaplevel_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1112 	    &ci);
1113 
1114 	q = backend_query_alloc();
1115 	backend_query_add(q,
1116 	    "SELECT snaplvl_pg_name, snaplvl_pg_id, snaplvl_gen_id, "
1117 	    "    snaplvl_pg_type, snaplvl_pg_flags "
1118 	    "    FROM snaplevel_lnk_tbl "
1119 	    "    WHERE (snaplvl_level_id = %d)",
1120 	    lvl->rsl_level_id);
1121 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1122 	if (res == REP_PROTOCOL_DONE)
1123 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1124 	backend_query_free(q);
1125 
1126 	return (res);
1127 }
1128 
1129 /*
1130  * Returns
1131  *   _NO_RESOURCES
1132  *   _SUCCESS
1133  */
1134 static int
1135 propertygrp_fill_children(rc_node_t *np)
1136 {
1137 	backend_query_t *q;
1138 	child_info_t ci;
1139 	int res;
1140 	backend_tx_t *tx;
1141 
1142 	backend_type_t backend = np->rn_id.rl_backend;
1143 
1144 	(void) propertygrp_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTY,
1145 	    &ci);
1146 
1147 	res = backend_tx_begin_ro(backend, &tx);
1148 	if (res != REP_PROTOCOL_SUCCESS) {
1149 		/*
1150 		 * If the backend didn't exist, we wouldn't have got this
1151 		 * property group.
1152 		 */
1153 		assert(res != REP_PROTOCOL_FAIL_BACKEND_ACCESS);
1154 		return (res);
1155 	}
1156 
1157 	ci.ci_tx = tx;
1158 
1159 	q = backend_query_alloc();
1160 	backend_query_add(q,
1161 	    "SELECT lnk_prop_name, lnk_prop_id, lnk_prop_type, lnk_val_id "
1162 	    "FROM prop_lnk_tbl "
1163 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
1164 	    np->rn_id.rl_main_id, np->rn_gen_id);
1165 	res = backend_tx_run(tx, q, fill_property_callback, &ci);
1166 	if (res == REP_PROTOCOL_DONE)
1167 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1168 	backend_query_free(q);
1169 	backend_tx_end_ro(tx);
1170 
1171 	return (res);
1172 }
1173 
1174 /*
1175  * Fails with
1176  *   _TYPE_MISMATCH - lp is not for a service
1177  *   _INVALID_TYPE - lp has invalid type
1178  *   _BAD_REQUEST - name is invalid
1179  */
1180 static int
1181 scope_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1182 {
1183 	uint32_t type = lp->rl_type;
1184 	int rc;
1185 
1186 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
1187 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1188 
1189 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1190 		return (rc);
1191 
1192 	backend_query_add(q,
1193 	    "SELECT svc_id FROM service_tbl "
1194 	    "WHERE svc_name = '%q'",
1195 	    name);
1196 
1197 	return (REP_PROTOCOL_SUCCESS);
1198 }
1199 
1200 /*
1201  * Fails with
1202  *   _NO_RESOURCES - out of memory
1203  */
1204 static int
1205 scope_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1206 {
1207 	return (backend_tx_run_update(tx,
1208 	    "INSERT INTO service_tbl (svc_id, svc_name) "
1209 	    "VALUES (%d, '%q')",
1210 	    lp->rl_main_id, name));
1211 }
1212 
1213 /*
1214  * Fails with
1215  *   _TYPE_MISMATCH - lp is not for an instance or property group
1216  *   _INVALID_TYPE - lp has invalid type
1217  *   _BAD_REQUEST - name is invalid
1218  */
1219 static int
1220 service_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1221 {
1222 	uint32_t type = lp->rl_type;
1223 	int rc;
1224 
1225 	if (type != REP_PROTOCOL_ENTITY_INSTANCE &&
1226 	    type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
1227 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1228 
1229 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1230 		return (rc);
1231 
1232 	switch (type) {
1233 	case REP_PROTOCOL_ENTITY_INSTANCE:
1234 		backend_query_add(q,
1235 		    "SELECT instance_id FROM instance_tbl "
1236 		    "WHERE instance_name = '%q' AND instance_svc = %d",
1237 		    name, lp->rl_ids[ID_SERVICE]);
1238 		break;
1239 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1240 		backend_query_add(q,
1241 		    "SELECT pg_id FROM pg_tbl "
1242 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1243 		    name, lp->rl_ids[ID_SERVICE]);
1244 		break;
1245 	default:
1246 		assert(0);
1247 		abort();
1248 	}
1249 
1250 	return (REP_PROTOCOL_SUCCESS);
1251 }
1252 
1253 /*
1254  * Fails with
1255  *   _NO_RESOURCES - out of memory
1256  */
1257 static int
1258 service_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1259 {
1260 	return (backend_tx_run_update(tx,
1261 	    "INSERT INTO instance_tbl "
1262 	    "    (instance_id, instance_name, instance_svc) "
1263 	    "VALUES (%d, '%q', %d)",
1264 	    lp->rl_main_id, name, lp->rl_ids[ID_SERVICE]));
1265 }
1266 
1267 /*
1268  * Fails with
1269  *   _NO_RESOURCES - out of memory
1270  */
1271 static int
1272 instance_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1273 {
1274 	return (backend_tx_run_update(tx,
1275 	    "INSERT INTO snapshot_lnk_tbl "
1276 	    "    (lnk_id, lnk_inst_id, lnk_snap_name, lnk_snap_id) "
1277 	    "VALUES (%d, %d, '%q', 0)",
1278 	    lp->rl_main_id, lp->rl_ids[ID_INSTANCE], name));
1279 }
1280 
1281 /*
1282  * Fails with
1283  *   _TYPE_MISMATCH - lp is not for a property group or snapshot
1284  *   _INVALID_TYPE - lp has invalid type
1285  *   _BAD_REQUEST - name is invalid
1286  */
1287 static int
1288 instance_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1289 {
1290 	uint32_t type = lp->rl_type;
1291 	int rc;
1292 
1293 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP &&
1294 	    type != REP_PROTOCOL_ENTITY_SNAPSHOT)
1295 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1296 
1297 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1298 		return (rc);
1299 
1300 	switch (type) {
1301 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1302 		backend_query_add(q,
1303 		    "SELECT pg_id FROM pg_tbl "
1304 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1305 		    name, lp->rl_ids[ID_INSTANCE]);
1306 		break;
1307 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
1308 		backend_query_add(q,
1309 		    "SELECT lnk_id FROM snapshot_lnk_tbl "
1310 		    "    WHERE lnk_snap_name = '%q' AND lnk_inst_id = %d",
1311 		    name, lp->rl_ids[ID_INSTANCE]);
1312 		break;
1313 	default:
1314 		assert(0);
1315 		abort();
1316 	}
1317 
1318 	return (REP_PROTOCOL_SUCCESS);
1319 }
1320 
1321 static int
1322 generic_insert_pg_child(backend_tx_t *tx, rc_node_lookup_t *lp,
1323     const char *name, const char *pgtype, uint32_t flags, uint32_t gen)
1324 {
1325 	int parent_id = (lp->rl_ids[ID_INSTANCE] != 0)?
1326 	    lp->rl_ids[ID_INSTANCE] : lp->rl_ids[ID_SERVICE];
1327 	return (backend_tx_run_update(tx,
1328 	    "INSERT INTO pg_tbl "
1329 	    "    (pg_id, pg_name, pg_parent_id, pg_type, pg_flags, pg_gen_id) "
1330 	    "VALUES (%d, '%q', %d, '%q', %d, %d)",
1331 	    lp->rl_main_id, name, parent_id, pgtype, flags, gen));
1332 }
1333 
1334 static int
1335 service_delete_start(rc_node_t *np, delete_info_t *dip)
1336 {
1337 	int r;
1338 	backend_query_t *q = backend_query_alloc();
1339 
1340 	/*
1341 	 * Check for child instances, and refuse to delete if they exist.
1342 	 */
1343 	backend_query_add(q,
1344 	    "SELECT 1 FROM instance_tbl WHERE instance_svc = %d",
1345 	    np->rn_id.rl_main_id);
1346 
1347 	r = backend_tx_run(dip->di_tx, q, backend_fail_if_seen, NULL);
1348 	backend_query_free(q);
1349 
1350 	if (r == REP_PROTOCOL_DONE)
1351 		return (REP_PROTOCOL_FAIL_EXISTS);	/* instances exist */
1352 
1353 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &service_delete,
1354 	    np->rn_id.rl_main_id, 0));
1355 }
1356 
1357 static int
1358 instance_delete_start(rc_node_t *np, delete_info_t *dip)
1359 {
1360 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &instance_delete,
1361 	    np->rn_id.rl_main_id, 0));
1362 }
1363 
1364 static int
1365 snapshot_delete_start(rc_node_t *np, delete_info_t *dip)
1366 {
1367 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL,
1368 	    &snapshot_lnk_delete, np->rn_id.rl_main_id, 0));
1369 }
1370 
1371 static int
1372 propertygrp_delete_start(rc_node_t *np, delete_info_t *dip)
1373 {
1374 	return (delete_stack_push(dip, np->rn_id.rl_backend,
1375 	    &propertygrp_delete, np->rn_id.rl_main_id, 0));
1376 }
1377 
/*
 * Per-entity-type operations table.  The entry for a node is looked up as
 * info[type] (see object_fill_children(), object_delete() and
 * object_do_create()), so the entries must stay in entity-type order.
 * NOTE(review): this relies on the REP_PROTOCOL_ENTITY_* values being the
 * consecutive integers starting at 0 -- confirm against repcache_protocol.h.
 *
 * After the entity type, each initializer lists positionally (presumably in
 * object_info_t field order -- confirm against configd.h): the id space used
 * for new objects of this type, then the fill_children, setup_child_info,
 * query_child, insert_child, insert_pg_child and delete_start callbacks.
 * Members omitted from the end of an initializer are zero/NULL; callers test
 * for NULL before using the optional callbacks.
 */
static object_info_t info[] = {
	{REP_PROTOCOL_ENTITY_NONE},
	{REP_PROTOCOL_ENTITY_SCOPE,
		BACKEND_ID_INVALID,
		scope_fill_children,
		scope_setup_child_info,
		scope_query_child,
		scope_insert_child,
		NULL,
		NULL,
	},
	{REP_PROTOCOL_ENTITY_SERVICE,
		BACKEND_ID_SERVICE_INSTANCE,
		service_fill_children,
		service_setup_child_info,
		service_query_child,
		service_insert_child,
		generic_insert_pg_child,
		service_delete_start,
	},
	{REP_PROTOCOL_ENTITY_INSTANCE,
		BACKEND_ID_SERVICE_INSTANCE,
		instance_fill_children,
		instance_setup_child_info,
		instance_query_child,
		instance_insert_child,
		generic_insert_pg_child,
		instance_delete_start,
	},
	{REP_PROTOCOL_ENTITY_SNAPSHOT,
		BACKEND_ID_SNAPNAME,
		snapshot_fill_children,
		NULL,
		NULL,
		NULL,
		NULL,
		snapshot_delete_start,
	},
	{REP_PROTOCOL_ENTITY_SNAPLEVEL,
		BACKEND_ID_SNAPLEVEL,
		snaplevel_fill_children,
		snaplevel_setup_child_info,
	},
	{REP_PROTOCOL_ENTITY_PROPERTYGRP,
		BACKEND_ID_PROPERTYGRP,
		propertygrp_fill_children,
		NULL,
		NULL,
		NULL,
		NULL,
		propertygrp_delete_start,
	},
	{REP_PROTOCOL_ENTITY_PROPERTY},
	/* Trailing marker entry; note that NUM_INFO counts it as well. */
	{-1UL}
};
/* Number of entries in info[], including the trailing marker entry. */
#define	NUM_INFO (sizeof (info) / sizeof (*info))
1434 
1435 /*
1436  * object_fill_children() populates the child list of an rc_node_t by calling
1437  * the appropriate <type>_fill_children() which runs backend queries that
1438  * call an appropriate fill_*_callback() which takes a row of results,
1439  * decodes them, and calls an rc_node_setup*() function in rc_node.c to create
1440  * a child.
1441  *
1442  * Fails with
1443  *   _NO_RESOURCES
1444  */
1445 int
1446 object_fill_children(rc_node_t *pp)
1447 {
1448 	uint32_t type = pp->rn_id.rl_type;
1449 	assert(type > 0 && type < NUM_INFO);
1450 
1451 	return ((*info[type].obj_fill_children)(pp));
1452 }
1453 
1454 int
1455 object_delete(rc_node_t *pp)
1456 {
1457 	int rc;
1458 
1459 	delete_info_t dip;
1460 	delete_ent_t de;
1461 
1462 	uint32_t type = pp->rn_id.rl_type;
1463 	assert(type > 0 && type < NUM_INFO);
1464 
1465 	if (info[type].obj_delete_start == NULL)
1466 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1467 
1468 	(void) memset(&dip, '\0', sizeof (dip));
1469 	rc = backend_tx_begin(BACKEND_TYPE_NORMAL, &dip.di_tx);
1470 	if (rc != REP_PROTOCOL_SUCCESS)
1471 		return (rc);
1472 
1473 	rc = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &dip.di_np_tx);
1474 	if (rc == REP_PROTOCOL_FAIL_BACKEND_ACCESS ||
1475 	    rc == REP_PROTOCOL_FAIL_BACKEND_READONLY)
1476 		dip.di_np_tx = NULL;
1477 	else if (rc != REP_PROTOCOL_SUCCESS) {
1478 		backend_tx_rollback(dip.di_tx);
1479 		return (rc);
1480 	}
1481 
1482 	if ((rc = (*info[type].obj_delete_start)(pp, &dip)) !=
1483 	    REP_PROTOCOL_SUCCESS) {
1484 		goto fail;
1485 	}
1486 
1487 	while (delete_stack_pop(&dip, &de)) {
1488 		rc = (*de.de_cb)(&dip, &de);
1489 		if (rc != REP_PROTOCOL_SUCCESS)
1490 			goto fail;
1491 	}
1492 
1493 	rc = backend_tx_commit(dip.di_tx);
1494 	if (rc != REP_PROTOCOL_SUCCESS)
1495 		backend_tx_rollback(dip.di_np_tx);
1496 	else if (dip.di_np_tx)
1497 		(void) backend_tx_commit(dip.di_np_tx);
1498 
1499 	delete_stack_cleanup(&dip);
1500 
1501 	return (rc);
1502 
1503 fail:
1504 	backend_tx_rollback(dip.di_tx);
1505 	backend_tx_rollback(dip.di_np_tx);
1506 	delete_stack_cleanup(&dip);
1507 	return (rc);
1508 }
1509 
1510 int
1511 object_do_create(backend_tx_t *tx, child_info_t *cip, rc_node_t *pp,
1512     uint32_t type, const char *name, rc_node_t **cpp)
1513 {
1514 	uint32_t ptype = pp->rn_id.rl_type;
1515 
1516 	backend_query_t *q;
1517 	uint32_t id;
1518 	rc_node_t *np = NULL;
1519 	int rc;
1520 	object_info_t *ip;
1521 
1522 	rc_node_lookup_t *lp = &cip->ci_base_nl;
1523 
1524 	assert(ptype > 0 && ptype < NUM_INFO);
1525 
1526 	ip = &info[ptype];
1527 
1528 	if (type == REP_PROTOCOL_ENTITY_PROPERTYGRP)
1529 		return (REP_PROTOCOL_FAIL_NOT_APPLICABLE);
1530 
1531 	if (ip->obj_setup_child_info == NULL ||
1532 	    ip->obj_query_child == NULL ||
1533 	    ip->obj_insert_child == NULL)
1534 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1535 
1536 	if ((rc = (*ip->obj_setup_child_info)(pp, type, cip)) !=
1537 	    REP_PROTOCOL_SUCCESS)
1538 		return (rc);
1539 
1540 	q = backend_query_alloc();
1541 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1542 	    REP_PROTOCOL_SUCCESS) {
1543 		assert(rc == REP_PROTOCOL_FAIL_BAD_REQUEST);
1544 		backend_query_free(q);
1545 		return (rc);
1546 	}
1547 
1548 	rc = backend_tx_run_single_int(tx, q, &id);
1549 	backend_query_free(q);
1550 
1551 	if (rc == REP_PROTOCOL_SUCCESS)
1552 		return (REP_PROTOCOL_FAIL_EXISTS);
1553 	else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND)
1554 		return (rc);
1555 
1556 	if ((lp->rl_main_id = backend_new_id(tx,
1557 	    info[type].obj_id_space)) == 0) {
1558 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1559 	}
1560 
1561 	if ((np = rc_node_alloc()) == NULL)
1562 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1563 
1564 	if ((rc = (*ip->obj_insert_child)(tx, lp, name)) !=
1565 	    REP_PROTOCOL_SUCCESS) {
1566 		rc_node_destroy(np);
1567 		return (rc);
1568 	}
1569 
1570 	*cpp = np;
1571 	return (REP_PROTOCOL_SUCCESS);
1572 }
1573 
1574 /*
1575  * Fails with
1576  *   _NOT_APPLICABLE - type is _PROPERTYGRP
1577  *   _BAD_REQUEST - cannot create children for this type of node
1578  *		    name is invalid
1579  *   _TYPE_MISMATCH - object cannot have children of type type
1580  *   _NO_RESOURCES - out of memory, or could not allocate new id
1581  *   _BACKEND_READONLY
1582  *   _BACKEND_ACCESS
1583  *   _EXISTS - child already exists
1584  */
1585 int
1586 object_create(rc_node_t *pp, uint32_t type, const char *name, rc_node_t **cpp)
1587 {
1588 	backend_tx_t *tx;
1589 	rc_node_t *np = NULL;
1590 	child_info_t ci;
1591 	int rc;
1592 
1593 	if ((rc = backend_tx_begin(pp->rn_id.rl_backend, &tx)) !=
1594 	    REP_PROTOCOL_SUCCESS) {
1595 		return (rc);
1596 	}
1597 
1598 	if ((rc = object_do_create(tx, &ci, pp, type, name, &np)) !=
1599 	    REP_PROTOCOL_SUCCESS) {
1600 		backend_tx_rollback(tx);
1601 		return (rc);
1602 	}
1603 
1604 	rc = backend_tx_commit(tx);
1605 	if (rc != REP_PROTOCOL_SUCCESS) {
1606 		rc_node_destroy(np);
1607 		return (rc);
1608 	}
1609 
1610 	*cpp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
1611 
1612 	return (REP_PROTOCOL_SUCCESS);
1613 }
1614 
1615 /*ARGSUSED*/
1616 int
1617 object_create_pg(rc_node_t *pp, uint32_t type, const char *name,
1618     const char *pgtype, uint32_t flags, rc_node_t **cpp)
1619 {
1620 	uint32_t ptype = pp->rn_id.rl_type;
1621 	backend_tx_t *tx_ro, *tx_wr;
1622 	backend_query_t *q;
1623 	uint32_t id;
1624 	uint32_t gen = 0;
1625 	rc_node_t *np = NULL;
1626 	int rc;
1627 	int rc_wr;
1628 	int rc_ro;
1629 	object_info_t *ip;
1630 
1631 	int nonpersist = (flags & SCF_PG_FLAG_NONPERSISTENT);
1632 
1633 	child_info_t ci;
1634 	rc_node_lookup_t *lp = &ci.ci_base_nl;
1635 
1636 	assert(ptype > 0 && ptype < NUM_INFO);
1637 
1638 	if (ptype != REP_PROTOCOL_ENTITY_SERVICE &&
1639 	    ptype != REP_PROTOCOL_ENTITY_INSTANCE)
1640 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1641 
1642 	ip = &info[ptype];
1643 
1644 	assert(ip->obj_setup_child_info != NULL &&
1645 	    ip->obj_query_child != NULL &&
1646 	    ip->obj_insert_pg_child != NULL);
1647 
1648 	if ((rc = (*ip->obj_setup_child_info)(pp, type, &ci)) !=
1649 	    REP_PROTOCOL_SUCCESS)
1650 		return (rc);
1651 
1652 	q = backend_query_alloc();
1653 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1654 	    REP_PROTOCOL_SUCCESS) {
1655 		backend_query_free(q);
1656 		return (rc);
1657 	}
1658 
1659 	if (!nonpersist) {
1660 		lp->rl_backend = BACKEND_TYPE_NORMAL;
1661 		rc_wr = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx_wr);
1662 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NONPERSIST, &tx_ro);
1663 	} else {
1664 		lp->rl_backend = BACKEND_TYPE_NONPERSIST;
1665 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NORMAL, &tx_ro);
1666 		rc_wr = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &tx_wr);
1667 	}
1668 
1669 	if (rc_wr != REP_PROTOCOL_SUCCESS) {
1670 		rc = rc_wr;
1671 		goto fail;
1672 	}
1673 	if (rc_ro != REP_PROTOCOL_SUCCESS &&
1674 	    rc_ro != REP_PROTOCOL_FAIL_BACKEND_ACCESS) {
1675 		rc = rc_ro;
1676 		goto fail;
1677 	}
1678 
1679 	if (tx_ro != NULL) {
1680 		rc = backend_tx_run_single_int(tx_ro, q, &id);
1681 
1682 		if (rc == REP_PROTOCOL_SUCCESS) {
1683 			backend_query_free(q);
1684 			rc = REP_PROTOCOL_FAIL_EXISTS;
1685 			goto fail;
1686 		} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1687 			backend_query_free(q);
1688 			goto fail;
1689 		}
1690 	}
1691 
1692 	rc = backend_tx_run_single_int(tx_wr, q, &id);
1693 	backend_query_free(q);
1694 
1695 	if (rc == REP_PROTOCOL_SUCCESS) {
1696 		rc = REP_PROTOCOL_FAIL_EXISTS;
1697 		goto fail;
1698 	} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1699 		goto fail;
1700 	}
1701 
1702 	if (tx_ro != NULL)
1703 		backend_tx_end_ro(tx_ro);
1704 	tx_ro = NULL;
1705 
1706 	if ((lp->rl_main_id = backend_new_id(tx_wr,
1707 	    info[type].obj_id_space)) == 0) {
1708 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1709 		goto fail;
1710 	}
1711 
1712 	if ((np = rc_node_alloc()) == NULL) {
1713 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1714 		goto fail;
1715 	}
1716 
1717 	if ((rc = (*ip->obj_insert_pg_child)(tx_wr, lp, name, pgtype, flags,
1718 	    gen)) != REP_PROTOCOL_SUCCESS) {
1719 		rc_node_destroy(np);
1720 		goto fail;
1721 	}
1722 
1723 	rc = backend_tx_commit(tx_wr);
1724 	if (rc != REP_PROTOCOL_SUCCESS) {
1725 		rc_node_destroy(np);
1726 		return (rc);
1727 	}
1728 
1729 	*cpp = rc_node_setup_pg(np, lp, name, pgtype, flags, gen, ci.ci_parent);
1730 
1731 	return (REP_PROTOCOL_SUCCESS);
1732 
1733 fail:
1734 	if (tx_ro != NULL)
1735 		backend_tx_end_ro(tx_ro);
1736 	if (tx_wr != NULL)
1737 		backend_tx_rollback(tx_wr);
1738 	return (rc);
1739 }
1740 
1741 /*
1742  * Given a row of snaplevel number, snaplevel id, service id, service name,
1743  * instance id, & instance name, create a rc_snaplevel_t & prepend it onto the
1744  * rs_levels list of the rc_snapshot_t passed in as data.
1745  * Returns _CONTINUE on success or _ABORT if any allocations fail.
1746  */
1747 /*ARGSUSED*/
1748 static int
1749 fill_snapshot_cb(void *data, int columns, char **vals, char **names)
1750 {
1751 	rc_snapshot_t *sp = data;
1752 	rc_snaplevel_t *lvl;
1753 	char *num = vals[0];
1754 	char *id = vals[1];
1755 	char *service_id = vals[2];
1756 	char *service = vals[3];
1757 	char *instance_id = vals[4];
1758 	char *instance = vals[5];
1759 	assert(columns == 6);
1760 
1761 	lvl = uu_zalloc(sizeof (*lvl));
1762 	if (lvl == NULL)
1763 		return (BACKEND_CALLBACK_ABORT);
1764 	lvl->rsl_parent = sp;
1765 	lvl->rsl_next = sp->rs_levels;
1766 	sp->rs_levels = lvl;
1767 
1768 	string_to_id(num, &lvl->rsl_level_num, "snap_level_num");
1769 	string_to_id(id, &lvl->rsl_level_id, "snap_level_id");
1770 	string_to_id(service_id, &lvl->rsl_service_id, "snap_level_service_id");
1771 	if (instance_id != NULL)
1772 		string_to_id(instance_id, &lvl->rsl_instance_id,
1773 		    "snap_level_instance_id");
1774 
1775 	lvl->rsl_scope = (const char *)"localhost";
1776 	lvl->rsl_service = strdup(service);
1777 	if (lvl->rsl_service == NULL) {
1778 		uu_free(lvl);
1779 		return (BACKEND_CALLBACK_ABORT);
1780 	}
1781 	if (instance) {
1782 		assert(lvl->rsl_instance_id != 0);
1783 		lvl->rsl_instance = strdup(instance);
1784 		if (lvl->rsl_instance == NULL) {
1785 			free((void *)lvl->rsl_instance);
1786 			uu_free(lvl);
1787 			return (BACKEND_CALLBACK_ABORT);
1788 		}
1789 	} else {
1790 		assert(lvl->rsl_instance_id == 0);
1791 	}
1792 
1793 	return (BACKEND_CALLBACK_CONTINUE);
1794 }
1795 
1796 /*
1797  * Populate sp's rs_levels list from the snaplevel_tbl table.
1798  * Fails with
1799  *   _NO_RESOURCES
1800  */
1801 int
1802 object_fill_snapshot(rc_snapshot_t *sp)
1803 {
1804 	backend_query_t *q;
1805 	rc_snaplevel_t *sl;
1806 	int result;
1807 	int i;
1808 
1809 	q = backend_query_alloc();
1810 	backend_query_add(q,
1811 	    "SELECT snap_level_num, snap_level_id, "
1812 	    "    snap_level_service_id, snap_level_service, "
1813 	    "    snap_level_instance_id, snap_level_instance "
1814 	    "FROM snaplevel_tbl "
1815 	    "WHERE snap_id = %d "
1816 	    "ORDER BY snap_level_id DESC",
1817 	    sp->rs_snap_id);
1818 
1819 	result = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_cb, sp);
1820 	if (result == REP_PROTOCOL_DONE)
1821 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
1822 	backend_query_free(q);
1823 
1824 	if (result == REP_PROTOCOL_SUCCESS) {
1825 		i = 0;
1826 		for (sl = sp->rs_levels; sl != NULL; sl = sl->rsl_next) {
1827 			if (sl->rsl_level_num != ++i) {
1828 				backend_panic("snaplevels corrupt; expected "
1829 				    "level %d, got %d", i, sl->rsl_level_num);
1830 			}
1831 		}
1832 	}
1833 	return (result);
1834 }
1835 
/*
 * This represents a property group in a snapshot.
 */
typedef struct check_snapshot_elem {
	uint32_t cse_parent;	/* id of the service or instance holding it */
	uint32_t cse_pg_id;	/* property group id */
	uint32_t cse_pg_gen;	/* property group generation id */
	char	cse_seen;	/* set by check_snapshot_check_cb() on a match */
} check_snapshot_elem_t;

/*
 * Working state for object_check_snapshot(): a growable array of the
 * property groups recorded in the snapshot, plus the parent ids of the
 * snapshot's levels.  Both are filled in by check_snapshot_fill_cb().
 */
#define	CSI_MAX_PARENTS		COMPOSITION_DEPTH
typedef struct check_snapshot_info {
	size_t			csi_count;	/* elements in use */
	size_t			csi_array_size;	/* elements allocated */
	check_snapshot_elem_t	*csi_array;	/* uu_zalloc()ed; may be NULL */
	size_t			csi_nparents;	/* csi_parent_ids[] in use */
	uint32_t		csi_parent_ids[CSI_MAX_PARENTS];
} check_snapshot_info_t;
1854 
1855 /*ARGSUSED*/
1856 static int
1857 check_snapshot_fill_cb(void *data, int columns, char **vals, char **names)
1858 {
1859 	check_snapshot_info_t *csip = data;
1860 	check_snapshot_elem_t *cur;
1861 	const char *parent;
1862 	const char *pg_id;
1863 	const char *pg_gen_id;
1864 
1865 	if (columns == 1) {
1866 		uint32_t *target;
1867 
1868 		if (csip->csi_nparents >= CSI_MAX_PARENTS)
1869 			backend_panic("snaplevel table has too many elements");
1870 
1871 		target = &csip->csi_parent_ids[csip->csi_nparents++];
1872 		string_to_id(vals[0], target, "snap_level_*_id");
1873 
1874 		return (BACKEND_CALLBACK_CONTINUE);
1875 	}
1876 
1877 	assert(columns == 3);
1878 
1879 	parent = vals[0];
1880 	pg_id = vals[1];
1881 	pg_gen_id = vals[2];
1882 
1883 	if (csip->csi_count == csip->csi_array_size) {
1884 		size_t newsz = (csip->csi_array_size > 0) ?
1885 		    csip->csi_array_size * 2 : 8;
1886 		check_snapshot_elem_t *new = uu_zalloc(newsz * sizeof (*new));
1887 
1888 		if (new == NULL)
1889 			return (BACKEND_CALLBACK_ABORT);
1890 
1891 		(void) memcpy(new, csip->csi_array,
1892 		    sizeof (*new) * csip->csi_array_size);
1893 		uu_free(csip->csi_array);
1894 		csip->csi_array = new;
1895 		csip->csi_array_size = newsz;
1896 	}
1897 
1898 	cur = &csip->csi_array[csip->csi_count++];
1899 
1900 	string_to_id(parent, &cur->cse_parent, "snap_level_*_id");
1901 	string_to_id(pg_id, &cur->cse_pg_id, "snaplvl_pg_id");
1902 	string_to_id(pg_gen_id, &cur->cse_pg_gen, "snaplvl_gen_id");
1903 	cur->cse_seen = 0;
1904 
1905 	return (BACKEND_CALLBACK_CONTINUE);
1906 }
1907 
1908 static int
1909 check_snapshot_elem_cmp(const void *lhs_arg, const void *rhs_arg)
1910 {
1911 	const check_snapshot_elem_t *lhs = lhs_arg;
1912 	const check_snapshot_elem_t *rhs = rhs_arg;
1913 
1914 	if (lhs->cse_parent < rhs->cse_parent)
1915 		return (-1);
1916 	if (lhs->cse_parent > rhs->cse_parent)
1917 		return (1);
1918 
1919 	if (lhs->cse_pg_id < rhs->cse_pg_id)
1920 		return (-1);
1921 	if (lhs->cse_pg_id > rhs->cse_pg_id)
1922 		return (1);
1923 
1924 	if (lhs->cse_pg_gen < rhs->cse_pg_gen)
1925 		return (-1);
1926 	if (lhs->cse_pg_gen > rhs->cse_pg_gen)
1927 		return (1);
1928 
1929 	return (0);
1930 }
1931 
1932 /*ARGSUSED*/
1933 static int
1934 check_snapshot_check_cb(void *data, int columns, char **vals, char **names)
1935 {
1936 	check_snapshot_info_t *csip = data;
1937 	check_snapshot_elem_t elem;
1938 	check_snapshot_elem_t *cur;
1939 
1940 	const char *parent = vals[0];
1941 	const char *pg_id = vals[1];
1942 	const char *pg_gen_id = vals[2];
1943 
1944 	assert(columns == 3);
1945 
1946 	string_to_id(parent, &elem.cse_parent, "snap_level_*_id");
1947 	string_to_id(pg_id, &elem.cse_pg_id, "snaplvl_pg_id");
1948 	string_to_id(pg_gen_id, &elem.cse_pg_gen, "snaplvl_gen_id");
1949 
1950 	if ((cur = bsearch(&elem, csip->csi_array, csip->csi_count,
1951 	    sizeof (*csip->csi_array), check_snapshot_elem_cmp)) == NULL)
1952 		return (BACKEND_CALLBACK_ABORT);
1953 
1954 	if (cur->cse_seen)
1955 		backend_panic("duplicate property group reported");
1956 	cur->cse_seen = 1;
1957 	return (BACKEND_CALLBACK_CONTINUE);
1958 }
1959 
1960 /*
1961  * Check that a snapshot matches up with the latest in the repository.
1962  * Returns:
1963  *	REP_PROTOCOL_SUCCESS		if it is up-to-date,
1964  *	REP_PROTOCOL_DONE		if it is out-of-date, or
1965  *	REP_PROTOCOL_FAIL_NO_RESOURCES	if we ran out of memory.
1966  */
1967 static int
1968 object_check_snapshot(uint32_t snap_id)
1969 {
1970 	check_snapshot_info_t csi;
1971 	backend_query_t *q;
1972 	int result;
1973 	size_t idx;
1974 
1975 	/* if the snapshot has never been taken, it must be out of date. */
1976 	if (snap_id == 0)
1977 		return (REP_PROTOCOL_DONE);
1978 
1979 	(void) memset(&csi, '\0', sizeof (csi));
1980 
1981 	q = backend_query_alloc();
1982 	backend_query_add(q,
1983 	    "SELECT\n"
1984 	    "    CASE snap_level_instance_id\n"
1985 	    "        WHEN 0 THEN snap_level_service_id\n"
1986 	    "        ELSE snap_level_instance_id\n"
1987 	    "    END\n"
1988 	    "FROM snaplevel_tbl\n"
1989 	    "WHERE snap_id = %d;\n"
1990 	    "\n"
1991 	    "SELECT\n"
1992 	    "    CASE snap_level_instance_id\n"
1993 	    "        WHEN 0 THEN snap_level_service_id\n"
1994 	    "        ELSE snap_level_instance_id\n"
1995 	    "    END,\n"
1996 	    "    snaplvl_pg_id,\n"
1997 	    "    snaplvl_gen_id\n"
1998 	    "FROM snaplevel_tbl, snaplevel_lnk_tbl\n"
1999 	    "WHERE\n"
2000 	    "    (snaplvl_level_id = snap_level_id AND\n"
2001 	    "    snap_id = %d);",
2002 	    snap_id, snap_id);
2003 
2004 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_fill_cb,
2005 	    &csi);
2006 	if (result == REP_PROTOCOL_DONE)
2007 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2008 	backend_query_free(q);
2009 
2010 	if (result != REP_PROTOCOL_SUCCESS)
2011 		goto fail;
2012 
2013 	if (csi.csi_count > 0) {
2014 		qsort(csi.csi_array, csi.csi_count, sizeof (*csi.csi_array),
2015 		    check_snapshot_elem_cmp);
2016 	}
2017 
2018 #if COMPOSITION_DEPTH == 2
2019 	if (csi.csi_nparents != COMPOSITION_DEPTH) {
2020 		result = REP_PROTOCOL_DONE;
2021 		goto fail;
2022 	}
2023 
2024 	q = backend_query_alloc();
2025 	backend_query_add(q,
2026 	    "SELECT "
2027 	    "    pg_parent_id, pg_id, pg_gen_id "
2028 	    "FROM "
2029 	    "    pg_tbl "
2030 	    "WHERE (pg_parent_id = %d OR pg_parent_id = %d)",
2031 	    csi.csi_parent_ids[0], csi.csi_parent_ids[1]);
2032 
2033 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_check_cb,
2034 	    &csi);
2035 #else
2036 #error This code must be updated
2037 #endif
2038 	/*
2039 	 * To succeed, the callback must not have aborted, and we must have
2040 	 * found all of the items.
2041 	 */
2042 	if (result == REP_PROTOCOL_SUCCESS) {
2043 		for (idx = 0; idx < csi.csi_count; idx++) {
2044 			if (csi.csi_array[idx].cse_seen == 0) {
2045 				result = REP_PROTOCOL_DONE;
2046 				goto fail;
2047 			}
2048 		}
2049 	}
2050 
2051 fail:
2052 	uu_free(csi.csi_array);
2053 	return (result);
2054 }
2055 
2056 /*ARGSUSED*/
2057 static int
2058 object_copy_string(void *data_arg, int columns, char **vals, char **names)
2059 {
2060 	char **data = data_arg;
2061 
2062 	assert(columns == 1);
2063 
2064 	if (*data != NULL)
2065 		free(*data);
2066 	*data = NULL;
2067 
2068 	if (vals[0] != NULL) {
2069 		if ((*data = strdup(vals[0])) == NULL)
2070 			return (BACKEND_CALLBACK_ABORT);
2071 	}
2072 
2073 	return (BACKEND_CALLBACK_CONTINUE);
2074 }
2075 
/*
 * State shared between object_snapshot_add_level() and its row callback
 * object_snaplevel_process_pg().
 */
struct snaplevel_add_info {
	backend_query_t *sai_q;			/* accumulated INSERTs */
	uint32_t	sai_level_id;		/* id of the new snaplevel */
	int		sai_used;		/* sai_q has been used */
};
2081 
2082 /*ARGSUSED*/
2083 static int
2084 object_snaplevel_process_pg(void *data_arg, int columns, char **vals,
2085     char **names)
2086 {
2087 	struct snaplevel_add_info *data = data_arg;
2088 
2089 	assert(columns == 5);
2090 
2091 	backend_query_add(data->sai_q,
2092 	    "INSERT INTO snaplevel_lnk_tbl "
2093 	    "    (snaplvl_level_id, snaplvl_pg_id, snaplvl_pg_name, "
2094 	    "    snaplvl_pg_type, snaplvl_pg_flags, snaplvl_gen_id)"
2095 	    "VALUES (%d, %s, '%q', '%q', %s, %s);",
2096 	    data->sai_level_id, vals[0], vals[1], vals[2], vals[3], vals[4]);
2097 
2098 	data->sai_used = 1;
2099 
2100 	return (BACKEND_CALLBACK_CONTINUE);
2101 }
2102 
2103 /*ARGSUSED*/
2104 static int
2105 object_snapshot_add_level(backend_tx_t *tx, uint32_t snap_id,
2106     uint32_t snap_level_num, uint32_t svc_id, const char *svc_name,
2107     uint32_t inst_id, const char *inst_name)
2108 {
2109 	struct snaplevel_add_info data;
2110 	backend_query_t *q;
2111 	int result;
2112 
2113 	assert((snap_level_num == 1 && inst_name != NULL) ||
2114 	    snap_level_num == 2 && inst_name == NULL);
2115 
2116 	data.sai_level_id = backend_new_id(tx, BACKEND_ID_SNAPLEVEL);
2117 	if (data.sai_level_id == 0) {
2118 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
2119 	}
2120 
2121 	result = backend_tx_run_update(tx,
2122 	    "INSERT INTO snaplevel_tbl "
2123 	    "    (snap_id, snap_level_num, snap_level_id, "
2124 	    "    snap_level_service_id, snap_level_service, "
2125 	    "    snap_level_instance_id, snap_level_instance) "
2126 	    "VALUES (%d, %d, %d, %d, %Q, %d, %Q);",
2127 	    snap_id, snap_level_num, data.sai_level_id, svc_id, svc_name,
2128 	    inst_id, inst_name);
2129 
2130 	q = backend_query_alloc();
2131 	backend_query_add(q,
2132 	    "SELECT pg_id, pg_name, pg_type, pg_flags, pg_gen_id FROM pg_tbl "
2133 	    "WHERE (pg_parent_id = %d);",
2134 	    (inst_name != NULL)? inst_id : svc_id);
2135 
2136 	data.sai_q = backend_query_alloc();
2137 	data.sai_used = 0;
2138 	result = backend_tx_run(tx, q, object_snaplevel_process_pg,
2139 	    &data);
2140 	backend_query_free(q);
2141 
2142 	if (result == REP_PROTOCOL_SUCCESS && data.sai_used != 0)
2143 		result = backend_tx_run(tx, data.sai_q, NULL, NULL);
2144 	backend_query_free(data.sai_q);
2145 
2146 	return (result);
2147 }
2148 
2149 /*
2150  * Fails with:
2151  *	_NO_RESOURCES - no new id or out of disk space
2152  *	_BACKEND_READONLY - persistent backend is read-only
2153  */
2154 static int
2155 object_snapshot_do_take(uint32_t instid, const char *inst_name,
2156     uint32_t svcid, const char *svc_name,
2157     backend_tx_t **tx_out, uint32_t *snapid_out)
2158 {
2159 	backend_tx_t *tx;
2160 	backend_query_t *q;
2161 	int result;
2162 
2163 	char *svc_name_alloc = NULL;
2164 	char *inst_name_alloc = NULL;
2165 	uint32_t snapid;
2166 
2167 	result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2168 	if (result != REP_PROTOCOL_SUCCESS)
2169 		return (result);
2170 
2171 	snapid = backend_new_id(tx, BACKEND_ID_SNAPSHOT);
2172 	if (snapid == 0) {
2173 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2174 		goto fail;
2175 	}
2176 
2177 	if (svc_name == NULL) {
2178 		q = backend_query_alloc();
2179 		backend_query_add(q,
2180 		    "SELECT svc_name FROM service_tbl "
2181 		    "WHERE (svc_id = %d)", svcid);
2182 		result = backend_tx_run(tx, q, object_copy_string,
2183 		    &svc_name_alloc);
2184 		backend_query_free(q);
2185 
2186 		svc_name = svc_name_alloc;
2187 
2188 		if (result == REP_PROTOCOL_DONE) {
2189 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2190 			goto fail;
2191 		}
2192 		if (result == REP_PROTOCOL_SUCCESS && svc_name == NULL)
2193 			backend_panic("unable to find name for svc id %d\n",
2194 			    svcid);
2195 
2196 		if (result != REP_PROTOCOL_SUCCESS)
2197 			goto fail;
2198 	}
2199 
2200 	if (inst_name == NULL) {
2201 		q = backend_query_alloc();
2202 		backend_query_add(q,
2203 		    "SELECT instance_name FROM instance_tbl "
2204 		    "WHERE (instance_id = %d)", instid);
2205 		result = backend_tx_run(tx, q, object_copy_string,
2206 		    &inst_name_alloc);
2207 		backend_query_free(q);
2208 
2209 		inst_name = inst_name_alloc;
2210 
2211 		if (result == REP_PROTOCOL_DONE) {
2212 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2213 			goto fail;
2214 		}
2215 
2216 		if (result == REP_PROTOCOL_SUCCESS && inst_name == NULL)
2217 			backend_panic(
2218 			    "unable to find name for instance id %d\n", instid);
2219 
2220 		if (result != REP_PROTOCOL_SUCCESS)
2221 			goto fail;
2222 	}
2223 
2224 	result = object_snapshot_add_level(tx, snapid, 1,
2225 	    svcid, svc_name, instid, inst_name);
2226 
2227 	if (result != REP_PROTOCOL_SUCCESS)
2228 		goto fail;
2229 
2230 	result = object_snapshot_add_level(tx, snapid, 2,
2231 	    svcid, svc_name, 0, NULL);
2232 
2233 	if (result != REP_PROTOCOL_SUCCESS)
2234 		goto fail;
2235 
2236 	*snapid_out = snapid;
2237 	*tx_out = tx;
2238 
2239 	free(svc_name_alloc);
2240 	free(inst_name_alloc);
2241 
2242 	return (REP_PROTOCOL_SUCCESS);
2243 
2244 fail:
2245 	backend_tx_rollback(tx);
2246 	free(svc_name_alloc);
2247 	free(inst_name_alloc);
2248 	return (result);
2249 }
2250 
2251 /*
2252  * Fails with:
2253  *	_TYPE_MISMATCH - pp is not an instance
2254  *	_NO_RESOURCES - no new id or out of disk space
2255  *	_BACKEND_READONLY - persistent backend is read-only
2256  */
2257 int
2258 object_snapshot_take_new(rc_node_t *pp,
2259     const char *svc_name, const char *inst_name,
2260     const char *name, rc_node_t **outp)
2261 {
2262 	rc_node_lookup_t *insti = &pp->rn_id;
2263 
2264 	uint32_t instid = insti->rl_main_id;
2265 	uint32_t svcid = insti->rl_ids[ID_SERVICE];
2266 	uint32_t snapid = 0;
2267 	backend_tx_t *tx = NULL;
2268 	child_info_t ci;
2269 	rc_node_t *np;
2270 	int result;
2271 
2272 	if (insti->rl_type != REP_PROTOCOL_ENTITY_INSTANCE)
2273 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2274 
2275 	result = object_snapshot_do_take(instid, inst_name, svcid, svc_name,
2276 	    &tx, &snapid);
2277 	if (result != REP_PROTOCOL_SUCCESS)
2278 		return (result);
2279 
2280 	if ((result = object_do_create(tx, &ci, pp,
2281 	    REP_PROTOCOL_ENTITY_SNAPSHOT, name, &np)) != REP_PROTOCOL_SUCCESS) {
2282 		backend_tx_rollback(tx);
2283 		return (result);
2284 	}
2285 
2286 	/*
2287 	 * link the new object to the new snapshot.
2288 	 */
2289 	np->rn_snapshot_id = snapid;
2290 
2291 	result = backend_tx_run_update(tx,
2292 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2293 	    snapid, ci.ci_base_nl.rl_main_id);
2294 	if (result != REP_PROTOCOL_SUCCESS) {
2295 		backend_tx_rollback(tx);
2296 		rc_node_destroy(np);
2297 		return (result);
2298 	}
2299 	result = backend_tx_commit(tx);
2300 	if (result != REP_PROTOCOL_SUCCESS) {
2301 		rc_node_destroy(np);
2302 		return (result);
2303 	}
2304 
2305 	*outp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
2306 	return (REP_PROTOCOL_SUCCESS);
2307 }
2308 
2309 /*
2310  * Fails with:
2311  *	_TYPE_MISMATCH - pp is not an instance
2312  *	_NO_RESOURCES - no new id or out of disk space
2313  *	_BACKEND_READONLY - persistent backend is read-only
2314  */
2315 int
2316 object_snapshot_attach(rc_node_lookup_t *snapi, uint32_t *snapid_ptr,
2317     int takesnap)
2318 {
2319 	uint32_t svcid = snapi->rl_ids[ID_SERVICE];
2320 	uint32_t instid = snapi->rl_ids[ID_INSTANCE];
2321 	uint32_t snapid = *snapid_ptr;
2322 	uint32_t oldsnapid = 0;
2323 	backend_tx_t *tx = NULL;
2324 	backend_query_t *q;
2325 	int result;
2326 
2327 	delete_info_t dip;
2328 	delete_ent_t de;
2329 
2330 	if (snapi->rl_type != REP_PROTOCOL_ENTITY_SNAPSHOT)
2331 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2332 
2333 	if (takesnap) {
2334 		/* first, check that we're actually out of date */
2335 		if (object_check_snapshot(snapid) == REP_PROTOCOL_SUCCESS)
2336 			return (REP_PROTOCOL_SUCCESS);
2337 
2338 		result = object_snapshot_do_take(instid, NULL,
2339 		    svcid, NULL, &tx, &snapid);
2340 		if (result != REP_PROTOCOL_SUCCESS)
2341 			return (result);
2342 	} else {
2343 		result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2344 		if (result != REP_PROTOCOL_SUCCESS)
2345 			return (result);
2346 	}
2347 
2348 	q = backend_query_alloc();
2349 	backend_query_add(q,
2350 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
2351 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2352 	    snapi->rl_main_id, snapid, snapi->rl_main_id);
2353 	result = backend_tx_run_single_int(tx, q, &oldsnapid);
2354 	backend_query_free(q);
2355 
2356 	if (result == REP_PROTOCOL_FAIL_NOT_FOUND) {
2357 		backend_tx_rollback(tx);
2358 		backend_panic("unable to find snapshot id %d",
2359 		    snapi->rl_main_id);
2360 	}
2361 	if (result != REP_PROTOCOL_SUCCESS)
2362 		goto fail;
2363 
2364 	/*
2365 	 * Now we use the delete stack to handle the possible unreferencing
2366 	 * of oldsnapid.
2367 	 */
2368 	(void) memset(&dip, 0, sizeof (dip));
2369 	dip.di_tx = tx;
2370 	dip.di_np_tx = NULL;	/* no need for non-persistant backend */
2371 
2372 	if ((result = delete_stack_push(&dip, BACKEND_TYPE_NORMAL,
2373 	    &snaplevel_tbl_delete, oldsnapid, 0)) != REP_PROTOCOL_SUCCESS)
2374 		goto fail;
2375 
2376 	while (delete_stack_pop(&dip, &de)) {
2377 		result = (*de.de_cb)(&dip, &de);
2378 		if (result != REP_PROTOCOL_SUCCESS)
2379 			goto fail;
2380 	}
2381 
2382 	result = backend_tx_commit(tx);
2383 	if (result != REP_PROTOCOL_SUCCESS)
2384 		goto fail;
2385 
2386 	delete_stack_cleanup(&dip);
2387 	*snapid_ptr = snapid;
2388 	return (REP_PROTOCOL_SUCCESS);
2389 
2390 fail:
2391 	backend_tx_rollback(tx);
2392 	delete_stack_cleanup(&dip);
2393 	return (result);
2394 }
2395