xref: /illumos-gate/usr/src/cmd/svc/configd/file_object.c (revision 9164a50bf932130cbb5097a16f6986873ce0e6e5)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  * Copyright (c) 2016 by Delphix. All rights reserved.
25  */
26 
27 /*
28  * file_object.c - enter objects into and load them from the backend
29  *
30  * The primary entry points in this layer are object_create(),
31  * object_create_pg(), object_delete(), and object_fill_children().  They each
32  * take an rc_node_t and use the functions in the object_info_t info array for
33  * the node's type.
34  */
35 
36 #include <assert.h>
37 #include <pthread.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <strings.h>
42 
43 #include "configd.h"
44 #include "repcache_protocol.h"
45 
46 typedef struct child_info {
47 	rc_node_t	*ci_parent;
48 	backend_tx_t	*ci_tx;			/* only for properties */
49 	rc_node_lookup_t ci_base_nl;
50 } child_info_t;
51 
52 typedef struct delete_ent delete_ent_t;
53 typedef struct delete_stack delete_stack_t;
54 typedef struct delete_info delete_info_t;
55 
56 typedef int	delete_cb_func(delete_info_t *, const delete_ent_t *);
57 
58 struct delete_ent {
59 	delete_cb_func	*de_cb;		/* callback */
60 	uint32_t	de_backend;
61 	uint32_t	de_id;
62 	uint32_t	de_gen;		/* only for property groups */
63 };
64 
65 struct delete_stack {
66 	struct delete_stack *ds_next;
67 	uint32_t	ds_size;	/* number of elements */
68 	uint32_t	ds_cur;		/* current offset */
69 	delete_ent_t	ds_buf[1];	/* actually ds_size */
70 };
71 #define	DELETE_STACK_SIZE(x)	offsetof(delete_stack_t, ds_buf[(x)])
72 
73 struct delete_info {
74 	backend_tx_t	*di_tx;
75 	backend_tx_t	*di_np_tx;
76 	delete_stack_t	*di_stack;
77 	delete_stack_t	*di_free;
78 };
79 
80 typedef struct object_info {
81 	uint32_t	obj_type;
82 	enum id_space	obj_id_space;
83 
84 	int (*obj_fill_children)(rc_node_t *);
85 	int (*obj_setup_child_info)(rc_node_t *, uint32_t, child_info_t *);
86 	int (*obj_query_child)(backend_query_t *, rc_node_lookup_t *,
87 	    const char *);
88 	int (*obj_insert_child)(backend_tx_t *, rc_node_lookup_t *,
89 	    const char *);
90 	int (*obj_insert_pg_child)(backend_tx_t *, rc_node_lookup_t *,
91 	    const char *, const char *, uint32_t, uint32_t);
92 	int (*obj_delete_start)(rc_node_t *, delete_info_t *);
93 } object_info_t;
94 
95 static void
96 string_to_id(const char *str, uint32_t *output, const char *fieldname)
97 {
98 	if (uu_strtouint(str, output, sizeof (*output), 0, 0, 0) == -1)
99 		backend_panic("invalid integer \"%s\" in field \"%s\"",
100 		    str, fieldname);
101 }
102 
103 #define	NUM_NEEDED	50
104 
105 static int
106 delete_stack_push(delete_info_t *dip, uint32_t be, delete_cb_func *cb,
107     uint32_t id, uint32_t gen)
108 {
109 	delete_stack_t *cur = dip->di_stack;
110 	delete_ent_t *ent;
111 
112 	if (cur == NULL || cur->ds_cur == cur->ds_size) {
113 		delete_stack_t *new = dip->di_free;
114 		dip->di_free = NULL;
115 		if (new == NULL) {
116 			new = uu_zalloc(DELETE_STACK_SIZE(NUM_NEEDED));
117 			if (new == NULL)
118 				return (REP_PROTOCOL_FAIL_NO_RESOURCES);
119 			new->ds_size = NUM_NEEDED;
120 		}
121 		new->ds_cur = 0;
122 		new->ds_next = dip->di_stack;
123 		dip->di_stack = new;
124 		cur = new;
125 	}
126 	assert(cur->ds_cur < cur->ds_size);
127 	ent = &cur->ds_buf[cur->ds_cur++];
128 
129 	ent->de_backend = be;
130 	ent->de_cb = cb;
131 	ent->de_id = id;
132 	ent->de_gen = gen;
133 
134 	return (REP_PROTOCOL_SUCCESS);
135 }
136 
137 static int
138 delete_stack_pop(delete_info_t *dip, delete_ent_t *out)
139 {
140 	delete_stack_t *cur = dip->di_stack;
141 	delete_ent_t *ent;
142 
143 	if (cur == NULL)
144 		return (0);
145 	assert(cur->ds_cur > 0 && cur->ds_cur <= cur->ds_size);
146 	ent = &cur->ds_buf[--cur->ds_cur];
147 	if (cur->ds_cur == 0) {
148 		dip->di_stack = cur->ds_next;
149 		cur->ds_next = NULL;
150 
151 		if (dip->di_free != NULL)
152 			uu_free(dip->di_free);
153 		dip->di_free = cur;
154 	}
155 	if (ent == NULL)
156 		return (0);
157 
158 	*out = *ent;
159 	return (1);
160 }
161 
162 static void
163 delete_stack_cleanup(delete_info_t *dip)
164 {
165 	delete_stack_t *cur;
166 	while ((cur = dip->di_stack) != NULL) {
167 		dip->di_stack = cur->ds_next;
168 
169 		uu_free(cur);
170 	}
171 
172 	if ((cur = dip->di_free) != NULL) {
173 		assert(cur->ds_next == NULL);	/* should only be one */
174 		uu_free(cur);
175 		dip->di_free = NULL;
176 	}
177 }
178 
179 struct delete_cb_info {
180 	delete_info_t	*dci_dip;
181 	uint32_t	dci_be;
182 	delete_cb_func	*dci_cb;
183 	int		dci_result;
184 };
185 
186 /*ARGSUSED*/
187 static int
188 push_delete_callback(void *data, int columns, char **vals, char **names)
189 {
190 	struct delete_cb_info *info = data;
191 
192 	const char *id_str = *vals++;
193 	const char *gen_str = *vals++;
194 
195 	uint32_t id;
196 	uint32_t gen;
197 
198 	assert(columns == 2);
199 
200 	string_to_id(id_str, &id, "id");
201 	string_to_id(gen_str, &gen, "gen_id");
202 
203 	info->dci_result = delete_stack_push(info->dci_dip, info->dci_be,
204 	    info->dci_cb, id, gen);
205 
206 	if (info->dci_result != REP_PROTOCOL_SUCCESS)
207 		return (BACKEND_CALLBACK_ABORT);
208 	return (BACKEND_CALLBACK_CONTINUE);
209 }
210 
211 static int
212 value_delete(delete_info_t *dip, const delete_ent_t *ent)
213 {
214 	uint32_t be = ent->de_backend;
215 	int r;
216 
217 	backend_query_t *q;
218 
219 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
220 	    dip->di_np_tx;
221 
222 	q = backend_query_alloc();
223 
224 	backend_query_add(q,
225 	    "SELECT 1 FROM prop_lnk_tbl WHERE (lnk_val_id = %d); "
226 	    "DELETE FROM value_tbl WHERE (value_id = %d); ",
227 	    ent->de_id, ent->de_id);
228 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
229 	backend_query_free(q);
230 	if (r == REP_PROTOCOL_DONE)
231 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
232 	return (r);
233 }
234 
235 static int
236 pg_lnk_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
237 {
238 	struct delete_cb_info info;
239 	uint32_t be = ent->de_backend;
240 	int r;
241 
242 	backend_query_t *q;
243 
244 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
245 	    dip->di_np_tx;
246 
247 	/*
248 	 * For non-persistent backends, we could only have one parent, and
249 	 * it's already been deleted.
250 	 *
251 	 * For normal backends, we need to check to see if we're in
252 	 * a snapshot or are the active generation for the property
253 	 * group.  If we are, there's nothing to be done.
254 	 */
255 	if (be == BACKEND_TYPE_NORMAL) {
256 		q = backend_query_alloc();
257 		backend_query_add(q,
258 		    "SELECT 1 "
259 		    "FROM pg_tbl "
260 		    "WHERE (pg_id = %d AND pg_gen_id = %d); "
261 		    "SELECT 1 "
262 		    "FROM snaplevel_lnk_tbl "
263 		    "WHERE (snaplvl_pg_id = %d AND snaplvl_gen_id = %d);",
264 		    ent->de_id, ent->de_gen,
265 		    ent->de_id, ent->de_gen);
266 		r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
267 		backend_query_free(q);
268 
269 		if (r == REP_PROTOCOL_DONE)
270 			return (REP_PROTOCOL_SUCCESS);	/* still in use */
271 	}
272 
273 	info.dci_dip = dip;
274 	info.dci_be =  be;
275 	info.dci_cb = &value_delete;
276 	info.dci_result = REP_PROTOCOL_SUCCESS;
277 
278 	q = backend_query_alloc();
279 	backend_query_add(q,
280 	    "SELECT DISTINCT lnk_val_id, 0 FROM prop_lnk_tbl "
281 	    "WHERE "
282 	    "    (lnk_pg_id = %d AND lnk_gen_id = %d AND lnk_val_id NOTNULL); "
283 	    "DELETE FROM prop_lnk_tbl "
284 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
285 	    ent->de_id, ent->de_gen, ent->de_id, ent->de_gen);
286 
287 	r = backend_tx_run(tx, q, push_delete_callback, &info);
288 	backend_query_free(q);
289 
290 	if (r == REP_PROTOCOL_DONE) {
291 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
292 		return (info.dci_result);
293 	}
294 	return (r);
295 }
296 
297 static int
298 propertygrp_delete(delete_info_t *dip, const delete_ent_t *ent)
299 {
300 	uint32_t be = ent->de_backend;
301 	backend_query_t *q;
302 	uint32_t gen;
303 
304 	int r;
305 
306 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
307 	    dip->di_np_tx;
308 
309 	q = backend_query_alloc();
310 	backend_query_add(q,
311 	    "SELECT pg_gen_id FROM pg_tbl WHERE pg_id = %d; "
312 	    "DELETE FROM pg_tbl WHERE pg_id = %d",
313 	    ent->de_id, ent->de_id);
314 	r = backend_tx_run_single_int(tx, q, &gen);
315 	backend_query_free(q);
316 
317 	if (r != REP_PROTOCOL_SUCCESS)
318 		return (r);
319 
320 	return (delete_stack_push(dip, be, &pg_lnk_tbl_delete,
321 	    ent->de_id, gen));
322 }
323 
324 static int
325 snaplevel_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
326 {
327 	uint32_t be = ent->de_backend;
328 	backend_query_t *q;
329 	struct delete_cb_info info;
330 
331 	int r;
332 
333 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
334 	    dip->di_np_tx;
335 
336 	info.dci_dip = dip;
337 	info.dci_be = be;
338 	info.dci_cb = &pg_lnk_tbl_delete;
339 	info.dci_result = REP_PROTOCOL_SUCCESS;
340 
341 	q = backend_query_alloc();
342 	backend_query_add(q,
343 	    "SELECT snaplvl_pg_id, snaplvl_gen_id "
344 	    "    FROM snaplevel_lnk_tbl "
345 	    "    WHERE snaplvl_level_id = %d; "
346 	    "DELETE FROM snaplevel_lnk_tbl WHERE snaplvl_level_id = %d",
347 	    ent->de_id, ent->de_id);
348 	r = backend_tx_run(tx, q, push_delete_callback, &info);
349 	backend_query_free(q);
350 
351 	if (r == REP_PROTOCOL_DONE) {
352 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
353 		return (info.dci_result);
354 	}
355 	return (r);
356 }
357 
358 static int
359 snaplevel_tbl_delete(delete_info_t *dip, const delete_ent_t *ent)
360 {
361 	uint32_t be = ent->de_backend;
362 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
363 	    dip->di_np_tx;
364 
365 	struct delete_cb_info info;
366 	backend_query_t *q;
367 	int r;
368 
369 	assert(be == BACKEND_TYPE_NORMAL);
370 
371 	q = backend_query_alloc();
372 	backend_query_add(q,
373 	    "SELECT 1 FROM snapshot_lnk_tbl WHERE lnk_snap_id = %d",
374 	    ent->de_id);
375 	r = backend_tx_run(tx, q, backend_fail_if_seen, NULL);
376 	backend_query_free(q);
377 
378 	if (r == REP_PROTOCOL_DONE)
379 		return (REP_PROTOCOL_SUCCESS);		/* still in use */
380 
381 	info.dci_dip = dip;
382 	info.dci_be = be;
383 	info.dci_cb = &snaplevel_lnk_delete;
384 	info.dci_result = REP_PROTOCOL_SUCCESS;
385 
386 	q = backend_query_alloc();
387 	backend_query_add(q,
388 	    "SELECT snap_level_id, 0 FROM snaplevel_tbl WHERE snap_id = %d;"
389 	    "DELETE FROM snaplevel_tbl WHERE snap_id = %d",
390 	    ent->de_id, ent->de_id);
391 	r = backend_tx_run(tx, q, push_delete_callback, &info);
392 	backend_query_free(q);
393 
394 	if (r == REP_PROTOCOL_DONE) {
395 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
396 		return (info.dci_result);
397 	}
398 	return (r);
399 }
400 
401 static int
402 snapshot_lnk_delete(delete_info_t *dip, const delete_ent_t *ent)
403 {
404 	uint32_t be = ent->de_backend;
405 	backend_tx_t *tx = (be == BACKEND_TYPE_NORMAL)? dip->di_tx :
406 	    dip->di_np_tx;
407 
408 	backend_query_t *q;
409 	uint32_t snapid;
410 	int r;
411 
412 	assert(be == BACKEND_TYPE_NORMAL);
413 
414 	q = backend_query_alloc();
415 	backend_query_add(q,
416 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
417 	    "DELETE FROM snapshot_lnk_tbl WHERE lnk_id = %d",
418 	    ent->de_id, ent->de_id);
419 	r = backend_tx_run_single_int(tx, q, &snapid);
420 	backend_query_free(q);
421 
422 	if (r != REP_PROTOCOL_SUCCESS)
423 		return (r);
424 
425 	return (delete_stack_push(dip, be, &snaplevel_tbl_delete, snapid, 0));
426 }
427 
428 static int
429 pgparent_delete_add_pgs(delete_info_t *dip, uint32_t parent_id)
430 {
431 	struct delete_cb_info info;
432 	backend_query_t *q;
433 	int r;
434 
435 	info.dci_dip = dip;
436 	info.dci_be = BACKEND_TYPE_NORMAL;
437 	info.dci_cb = &propertygrp_delete;
438 	info.dci_result = REP_PROTOCOL_SUCCESS;
439 
440 	q = backend_query_alloc();
441 	backend_query_add(q,
442 	    "SELECT pg_id, 0 FROM pg_tbl WHERE pg_parent_id = %d",
443 	    parent_id);
444 
445 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
446 
447 	if (r == REP_PROTOCOL_DONE) {
448 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
449 		backend_query_free(q);
450 		return (info.dci_result);
451 	}
452 	if (r != REP_PROTOCOL_SUCCESS) {
453 		backend_query_free(q);
454 		return (r);
455 	}
456 
457 	if (dip->di_np_tx != NULL) {
458 		info.dci_be = BACKEND_TYPE_NONPERSIST;
459 
460 		r = backend_tx_run(dip->di_np_tx, q, push_delete_callback,
461 		    &info);
462 
463 		if (r == REP_PROTOCOL_DONE) {
464 			assert(info.dci_result != REP_PROTOCOL_SUCCESS);
465 			backend_query_free(q);
466 			return (info.dci_result);
467 		}
468 		if (r != REP_PROTOCOL_SUCCESS) {
469 			backend_query_free(q);
470 			return (r);
471 		}
472 	}
473 	backend_query_free(q);
474 	return (REP_PROTOCOL_SUCCESS);
475 }
476 
477 static int
478 service_delete(delete_info_t *dip, const delete_ent_t *ent)
479 {
480 	int r;
481 
482 	r = backend_tx_run_update_changed(dip->di_tx,
483 	    "DELETE FROM service_tbl WHERE svc_id = %d", ent->de_id);
484 	if (r != REP_PROTOCOL_SUCCESS)
485 		return (r);
486 
487 	return (pgparent_delete_add_pgs(dip, ent->de_id));
488 }
489 
490 static int
491 instance_delete(delete_info_t *dip, const delete_ent_t *ent)
492 {
493 	struct delete_cb_info info;
494 	int r;
495 	backend_query_t *q;
496 
497 	r = backend_tx_run_update_changed(dip->di_tx,
498 	    "DELETE FROM instance_tbl WHERE instance_id = %d", ent->de_id);
499 	if (r != REP_PROTOCOL_SUCCESS)
500 		return (r);
501 
502 	r = pgparent_delete_add_pgs(dip, ent->de_id);
503 	if (r != REP_PROTOCOL_SUCCESS)
504 		return (r);
505 
506 	info.dci_dip = dip;
507 	info.dci_be = BACKEND_TYPE_NORMAL;
508 	info.dci_cb = &snapshot_lnk_delete;
509 	info.dci_result = REP_PROTOCOL_SUCCESS;
510 
511 	q = backend_query_alloc();
512 	backend_query_add(q,
513 	    "SELECT lnk_id, 0 FROM snapshot_lnk_tbl WHERE lnk_inst_id = %d",
514 	    ent->de_id);
515 	r = backend_tx_run(dip->di_tx, q, push_delete_callback, &info);
516 	backend_query_free(q);
517 
518 	if (r == REP_PROTOCOL_DONE) {
519 		assert(info.dci_result != REP_PROTOCOL_SUCCESS);
520 		return (info.dci_result);
521 	}
522 	return (r);
523 }
524 
525 /*ARGSUSED*/
526 static int
527 fill_child_callback(void *data, int columns, char **vals, char **names)
528 {
529 	child_info_t *cp = data;
530 	rc_node_t *np;
531 	uint32_t main_id;
532 	const char *name;
533 	const char *cur;
534 	rc_node_lookup_t *lp = &cp->ci_base_nl;
535 
536 	assert(columns == 2);
537 
538 	name = *vals++;
539 	columns--;
540 
541 	cur = *vals++;
542 	columns--;
543 	string_to_id(cur, &main_id, "id");
544 
545 	lp->rl_main_id = main_id;
546 
547 	if ((np = rc_node_alloc()) == NULL)
548 		return (BACKEND_CALLBACK_ABORT);
549 
550 	np = rc_node_setup(np, lp, name, cp->ci_parent);
551 	rc_node_rele(np);
552 
553 	return (BACKEND_CALLBACK_CONTINUE);
554 }
555 
556 /*ARGSUSED*/
557 static int
558 fill_snapshot_callback(void *data, int columns, char **vals, char **names)
559 {
560 	child_info_t *cp = data;
561 	rc_node_t *np;
562 	uint32_t main_id;
563 	uint32_t snap_id;
564 	const char *name;
565 	const char *cur;
566 	const char *snap;
567 	rc_node_lookup_t *lp = &cp->ci_base_nl;
568 
569 	assert(columns == 3);
570 
571 	name = *vals++;
572 	columns--;
573 
574 	cur = *vals++;
575 	columns--;
576 	snap = *vals++;
577 	columns--;
578 
579 	string_to_id(cur, &main_id, "lnk_id");
580 	string_to_id(snap, &snap_id, "lnk_snap_id");
581 
582 	lp->rl_main_id = main_id;
583 
584 	if ((np = rc_node_alloc()) == NULL)
585 		return (BACKEND_CALLBACK_ABORT);
586 
587 	np = rc_node_setup_snapshot(np, lp, name, snap_id, cp->ci_parent);
588 	rc_node_rele(np);
589 
590 	return (BACKEND_CALLBACK_CONTINUE);
591 }
592 
593 /*ARGSUSED*/
594 static int
595 fill_pg_callback(void *data, int columns, char **vals, char **names)
596 {
597 	child_info_t *cip = data;
598 	const char *name;
599 	const char *type;
600 	const char *cur;
601 	uint32_t main_id;
602 	uint32_t flags;
603 	uint32_t gen_id;
604 
605 	rc_node_lookup_t *lp = &cip->ci_base_nl;
606 	rc_node_t *newnode, *pg;
607 
608 	assert(columns == 5);
609 
610 	name = *vals++;		/* pg_name */
611 	columns--;
612 
613 	cur = *vals++;		/* pg_id */
614 	columns--;
615 	string_to_id(cur, &main_id, "pg_id");
616 
617 	lp->rl_main_id = main_id;
618 
619 	cur = *vals++;		/* pg_gen_id */
620 	columns--;
621 	string_to_id(cur, &gen_id, "pg_gen_id");
622 
623 	type = *vals++;		/* pg_type */
624 	columns--;
625 
626 	cur = *vals++;		/* pg_flags */
627 	columns--;
628 	string_to_id(cur, &flags, "pg_flags");
629 
630 	if ((newnode = rc_node_alloc()) == NULL)
631 		return (BACKEND_CALLBACK_ABORT);
632 
633 	pg = rc_node_setup_pg(newnode, lp, name, type, flags, gen_id,
634 	    cip->ci_parent);
635 	if (pg == NULL) {
636 		rc_node_destroy(newnode);
637 		return (BACKEND_CALLBACK_ABORT);
638 	}
639 
640 	rc_node_rele(pg);
641 
642 	return (BACKEND_CALLBACK_CONTINUE);
643 }
644 
645 struct property_value_info {
646 	char		*pvi_base;
647 	size_t		pvi_pos;
648 	size_t		pvi_size;
649 	size_t		pvi_count;
650 };
651 
652 /*ARGSUSED*/
653 static int
654 property_value_size_cb(void *data, int columns, char **vals, char **names)
655 {
656 	struct property_value_info *info = data;
657 	assert(columns == 1);
658 
659 	info->pvi_size += strlen(vals[0]) + 1;		/* count the '\0' */
660 
661 	return (BACKEND_CALLBACK_CONTINUE);
662 }
663 
664 /*ARGSUSED*/
665 static int
666 property_value_cb(void *data, int columns, char **vals, char **names)
667 {
668 	struct property_value_info *info = data;
669 	size_t pos, left, len;
670 
671 	assert(columns == 1);
672 	pos = info->pvi_pos;
673 	left = info->pvi_size - pos;
674 
675 	pos = info->pvi_pos;
676 	left = info->pvi_size - pos;
677 
678 	if ((len = strlcpy(&info->pvi_base[pos], vals[0], left)) >= left) {
679 		/*
680 		 * since we preallocated, above, this shouldn't happen
681 		 */
682 		backend_panic("unexpected database change");
683 	}
684 
685 	len += 1;	/* count the '\0' */
686 
687 	info->pvi_pos += len;
688 	info->pvi_count++;
689 
690 	return (BACKEND_CALLBACK_CONTINUE);
691 }
692 
693 /*ARGSUSED*/
694 void
695 object_free_values(const char *vals, uint32_t type, size_t count, size_t size)
696 {
697 	if (vals != NULL)
698 		uu_free((void *)vals);
699 }
700 
701 /*ARGSUSED*/
702 static int
703 fill_property_callback(void *data, int columns, char **vals, char **names)
704 {
705 	child_info_t *cp = data;
706 	backend_tx_t *tx = cp->ci_tx;
707 	uint32_t main_id;
708 	const char *name;
709 	const char *cur;
710 	rep_protocol_value_type_t type;
711 	rc_node_lookup_t *lp = &cp->ci_base_nl;
712 	struct property_value_info info;
713 	int rc;
714 
715 	assert(columns == 4);
716 	assert(tx != NULL);
717 
718 	info.pvi_base = NULL;
719 	info.pvi_pos = 0;
720 	info.pvi_size = 0;
721 	info.pvi_count = 0;
722 
723 	name = *vals++;
724 
725 	cur = *vals++;
726 	string_to_id(cur, &main_id, "lnk_prop_id");
727 
728 	cur = *vals++;
729 	assert(('a' <= cur[0] && 'z' >= cur[0]) ||
730 	    ('A' <= cur[0] && 'Z' >= cur[0]) &&
731 	    (cur[1] == 0 || ('a' <= cur[1] && 'z' >= cur[1]) ||
732 	    ('A' <= cur[1] && 'Z' >= cur[1])));
733 	type = cur[0] | (cur[1] << 8);
734 
735 	lp->rl_main_id = main_id;
736 
737 	/*
738 	 * fill in the values, if any
739 	 */
740 	if ((cur = *vals++) != NULL) {
741 		rep_protocol_responseid_t r;
742 		backend_query_t *q = backend_query_alloc();
743 
744 		/*
745 		 * Ensure that select operation is reflective
746 		 * of repository schema.  If the repository has
747 		 * been upgraded,  make use of value ordering
748 		 * by retrieving values in order using the
749 		 * value_order column.  Otherwise, simply
750 		 * run the select with no order specified.
751 		 * The order-insensitive select is necessary
752 		 * as on first reboot post-upgrade,  the repository
753 		 * contents need to be read before the repository
754 		 * backend is writable (and upgrade is possible).
755 		 */
756 		if (backend_is_upgraded(tx)) {
757 			backend_query_add(q,
758 			    "SELECT value_value FROM value_tbl "
759 			    "WHERE (value_id = '%q') ORDER BY value_order",
760 			    cur);
761 		} else {
762 			backend_query_add(q,
763 			    "SELECT value_value FROM value_tbl "
764 			    "WHERE (value_id = '%q')",
765 			    cur);
766 		}
767 
768 		switch (r = backend_tx_run(tx, q, property_value_size_cb,
769 		    &info)) {
770 		case REP_PROTOCOL_SUCCESS:
771 			break;
772 
773 		case REP_PROTOCOL_FAIL_NO_RESOURCES:
774 			backend_query_free(q);
775 			return (BACKEND_CALLBACK_ABORT);
776 
777 		case REP_PROTOCOL_DONE:
778 		default:
779 			backend_panic("backend_tx_run() returned %d", r);
780 		}
781 		if (info.pvi_size > 0) {
782 			info.pvi_base = uu_zalloc(info.pvi_size);
783 			if (info.pvi_base == NULL) {
784 				backend_query_free(q);
785 				return (BACKEND_CALLBACK_ABORT);
786 			}
787 			switch (r = backend_tx_run(tx, q, property_value_cb,
788 			    &info)) {
789 			case REP_PROTOCOL_SUCCESS:
790 				break;
791 
792 			case REP_PROTOCOL_FAIL_NO_RESOURCES:
793 				uu_free(info.pvi_base);
794 				backend_query_free(q);
795 				return (BACKEND_CALLBACK_ABORT);
796 
797 			case REP_PROTOCOL_DONE:
798 			default:
799 				backend_panic("backend_tx_run() returned %d",
800 				    r);
801 			}
802 		}
803 		backend_query_free(q);
804 	}
805 
806 	rc = rc_node_create_property(cp->ci_parent, lp, name, type,
807 	    info.pvi_base, info.pvi_count, info.pvi_size);
808 	if (rc != REP_PROTOCOL_SUCCESS) {
809 		assert(rc == REP_PROTOCOL_FAIL_NO_RESOURCES);
810 		return (BACKEND_CALLBACK_ABORT);
811 	}
812 
813 	return (BACKEND_CALLBACK_CONTINUE);
814 }
815 
816 /*
817  * The *_setup_child_info() functions fill in a child_info_t structure with the
818  * information for the children of np with type type.
819  *
820  * They fail with
821  *   _TYPE_MISMATCH - object cannot have children of type type
822  */
823 
824 static int
825 scope_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
826 {
827 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
828 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
829 
830 	bzero(cip, sizeof (*cip));
831 	cip->ci_parent = np;
832 	cip->ci_base_nl.rl_type = type;
833 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
834 	return (REP_PROTOCOL_SUCCESS);
835 }
836 
837 static int
838 service_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
839 {
840 	switch (type) {
841 	case REP_PROTOCOL_ENTITY_INSTANCE:
842 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
843 		break;
844 	default:
845 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
846 	}
847 
848 	bzero(cip, sizeof (*cip));
849 	cip->ci_parent = np;
850 	cip->ci_base_nl.rl_type = type;
851 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
852 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_main_id;
853 
854 	return (REP_PROTOCOL_SUCCESS);
855 }
856 
857 static int
858 instance_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
859 {
860 	switch (type) {
861 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
862 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
863 		break;
864 	default:
865 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
866 	}
867 
868 	bzero(cip, sizeof (*cip));
869 	cip->ci_parent = np;
870 	cip->ci_base_nl.rl_type = type;
871 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
872 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
873 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_main_id;
874 
875 	return (REP_PROTOCOL_SUCCESS);
876 }
877 
878 static int
879 snaplevel_setup_child_info(rc_node_t *np, uint32_t type, child_info_t *cip)
880 {
881 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
882 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
883 
884 	bzero(cip, sizeof (*cip));
885 	cip->ci_parent = np;
886 	cip->ci_base_nl.rl_type = type;
887 	cip->ci_base_nl.rl_backend = np->rn_id.rl_backend;
888 	cip->ci_base_nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
889 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
890 	cip->ci_base_nl.rl_ids[ID_NAME] = np->rn_id.rl_ids[ID_NAME];
891 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = np->rn_id.rl_ids[ID_SNAPSHOT];
892 	cip->ci_base_nl.rl_ids[ID_LEVEL] = np->rn_id.rl_main_id;
893 
894 	return (REP_PROTOCOL_SUCCESS);
895 }
896 
897 static int
898 propertygrp_setup_child_info(rc_node_t *pg, uint32_t type, child_info_t *cip)
899 {
900 	if (type != REP_PROTOCOL_ENTITY_PROPERTY)
901 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
902 
903 	bzero(cip, sizeof (*cip));
904 	cip->ci_parent = pg;
905 	cip->ci_base_nl.rl_type = type;
906 	cip->ci_base_nl.rl_backend = pg->rn_id.rl_backend;
907 	cip->ci_base_nl.rl_ids[ID_SERVICE] = pg->rn_id.rl_ids[ID_SERVICE];
908 	cip->ci_base_nl.rl_ids[ID_INSTANCE] = pg->rn_id.rl_ids[ID_INSTANCE];
909 	cip->ci_base_nl.rl_ids[ID_PG] = pg->rn_id.rl_main_id;
910 	cip->ci_base_nl.rl_ids[ID_GEN] = pg->rn_gen_id;
911 	cip->ci_base_nl.rl_ids[ID_NAME] = pg->rn_id.rl_ids[ID_NAME];
912 	cip->ci_base_nl.rl_ids[ID_SNAPSHOT] = pg->rn_id.rl_ids[ID_SNAPSHOT];
913 	cip->ci_base_nl.rl_ids[ID_LEVEL] = pg->rn_id.rl_ids[ID_LEVEL];
914 
915 	return (REP_PROTOCOL_SUCCESS);
916 }
917 
918 /*
919  * The *_fill_children() functions populate the children of the given rc_node_t
920  * by querying the database and calling rc_node_setup_*() functions (usually
921  * via a fill_*_callback()).
922  *
923  * They fail with
924  *   _NO_RESOURCES
925  */
926 
927 /*
928  * Returns
929  *   _NO_RESOURCES
930  *   _SUCCESS
931  */
932 static int
933 scope_fill_children(rc_node_t *np)
934 {
935 	backend_query_t *q;
936 	child_info_t ci;
937 	int res;
938 
939 	(void) scope_setup_child_info(np, REP_PROTOCOL_ENTITY_SERVICE, &ci);
940 
941 	q = backend_query_alloc();
942 	backend_query_append(q, "SELECT svc_name, svc_id FROM service_tbl");
943 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
944 	backend_query_free(q);
945 
946 	if (res == REP_PROTOCOL_DONE)
947 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
948 	return (res);
949 }
950 
951 /*
952  * Returns
953  *   _NO_RESOURCES
954  *   _SUCCESS
955  */
956 static int
957 service_fill_children(rc_node_t *np)
958 {
959 	backend_query_t *q;
960 	child_info_t ci;
961 	int res;
962 
963 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
964 
965 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_INSTANCE, &ci);
966 
967 	q = backend_query_alloc();
968 	backend_query_add(q,
969 	    "SELECT instance_name, instance_id FROM instance_tbl"
970 	    "    WHERE (instance_svc = %d)",
971 	    np->rn_id.rl_main_id);
972 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_child_callback, &ci);
973 	backend_query_free(q);
974 
975 	if (res == REP_PROTOCOL_DONE)
976 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
977 	if (res != REP_PROTOCOL_SUCCESS)
978 		return (res);
979 
980 	(void) service_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
981 	    &ci);
982 
983 	q = backend_query_alloc();
984 	backend_query_add(q,
985 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
986 	    "    WHERE (pg_parent_id = %d)",
987 	    np->rn_id.rl_main_id);
988 
989 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
990 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
991 	if (res == REP_PROTOCOL_SUCCESS) {
992 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
993 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
994 		    fill_pg_callback, &ci);
995 		/* nonpersistant database may not exist */
996 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
997 			res = REP_PROTOCOL_SUCCESS;
998 	}
999 	if (res == REP_PROTOCOL_DONE)
1000 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1001 	backend_query_free(q);
1002 
1003 	return (res);
1004 }
1005 
1006 /*
1007  * Returns
1008  *   _NO_RESOURCES
1009  *   _SUCCESS
1010  */
1011 static int
1012 instance_fill_children(rc_node_t *np)
1013 {
1014 	backend_query_t *q;
1015 	child_info_t ci;
1016 	int res;
1017 
1018 	assert(np->rn_id.rl_backend == BACKEND_TYPE_NORMAL);
1019 
1020 	/* Get child property groups */
1021 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1022 	    &ci);
1023 
1024 	q = backend_query_alloc();
1025 	backend_query_add(q,
1026 	    "SELECT pg_name, pg_id, pg_gen_id, pg_type, pg_flags FROM pg_tbl"
1027 	    "    WHERE (pg_parent_id = %d)",
1028 	    np->rn_id.rl_main_id);
1029 	ci.ci_base_nl.rl_backend = BACKEND_TYPE_NORMAL;
1030 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1031 	if (res == REP_PROTOCOL_SUCCESS) {
1032 		ci.ci_base_nl.rl_backend = BACKEND_TYPE_NONPERSIST;
1033 		res = backend_run(BACKEND_TYPE_NONPERSIST, q,
1034 		    fill_pg_callback, &ci);
1035 		/* nonpersistant database may not exist */
1036 		if (res == REP_PROTOCOL_FAIL_BACKEND_ACCESS)
1037 			res = REP_PROTOCOL_SUCCESS;
1038 	}
1039 	if (res == REP_PROTOCOL_DONE)
1040 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1041 	backend_query_free(q);
1042 
1043 	if (res != REP_PROTOCOL_SUCCESS)
1044 		return (res);
1045 
1046 	/* Get child snapshots */
1047 	(void) instance_setup_child_info(np, REP_PROTOCOL_ENTITY_SNAPSHOT,
1048 	    &ci);
1049 
1050 	q = backend_query_alloc();
1051 	backend_query_add(q,
1052 	    "SELECT lnk_snap_name, lnk_id, lnk_snap_id FROM snapshot_lnk_tbl"
1053 	    "    WHERE (lnk_inst_id = %d)",
1054 	    np->rn_id.rl_main_id);
1055 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_callback, &ci);
1056 	if (res == REP_PROTOCOL_DONE)
1057 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1058 	backend_query_free(q);
1059 
1060 	return (res);
1061 }
1062 
1063 /*
1064  * Returns
1065  *   _NO_RESOURCES
1066  *   _SUCCESS
1067  */
1068 static int
1069 snapshot_fill_children(rc_node_t *np)
1070 {
1071 	rc_node_t *nnp;
1072 	rc_snapshot_t *sp, *oldsp;
1073 	rc_snaplevel_t *lvl;
1074 	rc_node_lookup_t nl;
1075 	int r;
1076 
1077 	/* Get the rc_snapshot_t (& its rc_snaplevel_t's). */
1078 	(void) pthread_mutex_lock(&np->rn_lock);
1079 	sp = np->rn_snapshot;
1080 	(void) pthread_mutex_unlock(&np->rn_lock);
1081 	if (sp == NULL) {
1082 		r = rc_snapshot_get(np->rn_snapshot_id, &sp);
1083 		if (r != REP_PROTOCOL_SUCCESS) {
1084 			assert(r == REP_PROTOCOL_FAIL_NO_RESOURCES);
1085 			return (r);
1086 		}
1087 		(void) pthread_mutex_lock(&np->rn_lock);
1088 		oldsp = np->rn_snapshot;
1089 		assert(oldsp == NULL || oldsp == sp);
1090 		np->rn_snapshot = sp;
1091 		(void) pthread_mutex_unlock(&np->rn_lock);
1092 		if (oldsp != NULL)
1093 			rc_snapshot_rele(oldsp);
1094 	}
1095 
1096 	bzero(&nl, sizeof (nl));
1097 	nl.rl_type = REP_PROTOCOL_ENTITY_SNAPLEVEL;
1098 	nl.rl_backend = np->rn_id.rl_backend;
1099 	nl.rl_ids[ID_SERVICE] = np->rn_id.rl_ids[ID_SERVICE];
1100 	nl.rl_ids[ID_INSTANCE] = np->rn_id.rl_ids[ID_INSTANCE];
1101 	nl.rl_ids[ID_NAME] = np->rn_id.rl_main_id;
1102 	nl.rl_ids[ID_SNAPSHOT] = np->rn_snapshot_id;
1103 
1104 	/* Create rc_node_t's for the snapshot's rc_snaplevel_t's. */
1105 	for (lvl = sp->rs_levels; lvl != NULL; lvl = lvl->rsl_next) {
1106 		nnp = rc_node_alloc();
1107 		assert(nnp != NULL);
1108 		nl.rl_main_id = lvl->rsl_level_id;
1109 		nnp = rc_node_setup_snaplevel(nnp, &nl, lvl, np);
1110 		rc_node_rele(nnp);
1111 	}
1112 
1113 	return (REP_PROTOCOL_SUCCESS);
1114 }
1115 
1116 /*
1117  * Returns
1118  *   _NO_RESOURCES
1119  *   _SUCCESS
1120  */
1121 static int
1122 snaplevel_fill_children(rc_node_t *np)
1123 {
1124 	rc_snaplevel_t *lvl = np->rn_snaplevel;
1125 	child_info_t ci;
1126 	int res;
1127 	backend_query_t *q;
1128 
1129 	(void) snaplevel_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTYGRP,
1130 	    &ci);
1131 
1132 	q = backend_query_alloc();
1133 	backend_query_add(q,
1134 	    "SELECT snaplvl_pg_name, snaplvl_pg_id, snaplvl_gen_id, "
1135 	    "    snaplvl_pg_type, snaplvl_pg_flags "
1136 	    "    FROM snaplevel_lnk_tbl "
1137 	    "    WHERE (snaplvl_level_id = %d)",
1138 	    lvl->rsl_level_id);
1139 	res = backend_run(BACKEND_TYPE_NORMAL, q, fill_pg_callback, &ci);
1140 	if (res == REP_PROTOCOL_DONE)
1141 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1142 	backend_query_free(q);
1143 
1144 	return (res);
1145 }
1146 
1147 /*
1148  * Returns
1149  *   _NO_RESOURCES
1150  *   _SUCCESS
1151  */
1152 static int
1153 propertygrp_fill_children(rc_node_t *np)
1154 {
1155 	backend_query_t *q;
1156 	child_info_t ci;
1157 	int res;
1158 	backend_tx_t *tx;
1159 
1160 	backend_type_t backend = np->rn_id.rl_backend;
1161 
1162 	(void) propertygrp_setup_child_info(np, REP_PROTOCOL_ENTITY_PROPERTY,
1163 	    &ci);
1164 
1165 	res = backend_tx_begin_ro(backend, &tx);
1166 	if (res != REP_PROTOCOL_SUCCESS) {
1167 		/*
1168 		 * If the backend didn't exist, we wouldn't have got this
1169 		 * property group.
1170 		 */
1171 		assert(res != REP_PROTOCOL_FAIL_BACKEND_ACCESS);
1172 		return (res);
1173 	}
1174 
1175 	ci.ci_tx = tx;
1176 
1177 	q = backend_query_alloc();
1178 	backend_query_add(q,
1179 	    "SELECT lnk_prop_name, lnk_prop_id, lnk_prop_type, lnk_val_id "
1180 	    "FROM prop_lnk_tbl "
1181 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
1182 	    np->rn_id.rl_main_id, np->rn_gen_id);
1183 	res = backend_tx_run(tx, q, fill_property_callback, &ci);
1184 	if (res == REP_PROTOCOL_DONE)
1185 		res = REP_PROTOCOL_FAIL_NO_RESOURCES;
1186 	backend_query_free(q);
1187 	backend_tx_end_ro(tx);
1188 
1189 	return (res);
1190 }
1191 
1192 /*
1193  * Fails with
1194  *   _TYPE_MISMATCH - lp is not for a service
1195  *   _INVALID_TYPE - lp has invalid type
1196  *   _BAD_REQUEST - name is invalid
1197  */
1198 static int
1199 scope_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1200 {
1201 	uint32_t type = lp->rl_type;
1202 	int rc;
1203 
1204 	if (type != REP_PROTOCOL_ENTITY_SERVICE)
1205 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1206 
1207 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1208 		return (rc);
1209 
1210 	backend_query_add(q,
1211 	    "SELECT svc_id FROM service_tbl "
1212 	    "WHERE svc_name = '%q'",
1213 	    name);
1214 
1215 	return (REP_PROTOCOL_SUCCESS);
1216 }
1217 
1218 /*
1219  * Fails with
1220  *   _NO_RESOURCES - out of memory
1221  */
1222 static int
1223 scope_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1224 {
1225 	return (backend_tx_run_update(tx,
1226 	    "INSERT INTO service_tbl (svc_id, svc_name) "
1227 	    "VALUES (%d, '%q')",
1228 	    lp->rl_main_id, name));
1229 }
1230 
1231 /*
1232  * Fails with
1233  *   _TYPE_MISMATCH - lp is not for an instance or property group
1234  *   _INVALID_TYPE - lp has invalid type
1235  *   _BAD_REQUEST - name is invalid
1236  */
1237 static int
1238 service_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1239 {
1240 	uint32_t type = lp->rl_type;
1241 	int rc;
1242 
1243 	if (type != REP_PROTOCOL_ENTITY_INSTANCE &&
1244 	    type != REP_PROTOCOL_ENTITY_PROPERTYGRP)
1245 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1246 
1247 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1248 		return (rc);
1249 
1250 	switch (type) {
1251 	case REP_PROTOCOL_ENTITY_INSTANCE:
1252 		backend_query_add(q,
1253 		    "SELECT instance_id FROM instance_tbl "
1254 		    "WHERE instance_name = '%q' AND instance_svc = %d",
1255 		    name, lp->rl_ids[ID_SERVICE]);
1256 		break;
1257 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1258 		backend_query_add(q,
1259 		    "SELECT pg_id FROM pg_tbl "
1260 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1261 		    name, lp->rl_ids[ID_SERVICE]);
1262 		break;
1263 	default:
1264 		assert(0);
1265 		abort();
1266 	}
1267 
1268 	return (REP_PROTOCOL_SUCCESS);
1269 }
1270 
1271 /*
1272  * Fails with
1273  *   _NO_RESOURCES - out of memory
1274  */
1275 static int
1276 service_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1277 {
1278 	return (backend_tx_run_update(tx,
1279 	    "INSERT INTO instance_tbl "
1280 	    "    (instance_id, instance_name, instance_svc) "
1281 	    "VALUES (%d, '%q', %d)",
1282 	    lp->rl_main_id, name, lp->rl_ids[ID_SERVICE]));
1283 }
1284 
1285 /*
1286  * Fails with
1287  *   _NO_RESOURCES - out of memory
1288  */
1289 static int
1290 instance_insert_child(backend_tx_t *tx, rc_node_lookup_t *lp, const char *name)
1291 {
1292 	return (backend_tx_run_update(tx,
1293 	    "INSERT INTO snapshot_lnk_tbl "
1294 	    "    (lnk_id, lnk_inst_id, lnk_snap_name, lnk_snap_id) "
1295 	    "VALUES (%d, %d, '%q', 0)",
1296 	    lp->rl_main_id, lp->rl_ids[ID_INSTANCE], name));
1297 }
1298 
1299 /*
1300  * Fails with
1301  *   _TYPE_MISMATCH - lp is not for a property group or snapshot
1302  *   _INVALID_TYPE - lp has invalid type
1303  *   _BAD_REQUEST - name is invalid
1304  */
1305 static int
1306 instance_query_child(backend_query_t *q, rc_node_lookup_t *lp, const char *name)
1307 {
1308 	uint32_t type = lp->rl_type;
1309 	int rc;
1310 
1311 	if (type != REP_PROTOCOL_ENTITY_PROPERTYGRP &&
1312 	    type != REP_PROTOCOL_ENTITY_SNAPSHOT)
1313 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
1314 
1315 	if ((rc = rc_check_type_name(type, name)) != REP_PROTOCOL_SUCCESS)
1316 		return (rc);
1317 
1318 	switch (type) {
1319 	case REP_PROTOCOL_ENTITY_PROPERTYGRP:
1320 		backend_query_add(q,
1321 		    "SELECT pg_id FROM pg_tbl "
1322 		    "    WHERE pg_name = '%q' AND pg_parent_id = %d",
1323 		    name, lp->rl_ids[ID_INSTANCE]);
1324 		break;
1325 	case REP_PROTOCOL_ENTITY_SNAPSHOT:
1326 		backend_query_add(q,
1327 		    "SELECT lnk_id FROM snapshot_lnk_tbl "
1328 		    "    WHERE lnk_snap_name = '%q' AND lnk_inst_id = %d",
1329 		    name, lp->rl_ids[ID_INSTANCE]);
1330 		break;
1331 	default:
1332 		assert(0);
1333 		abort();
1334 	}
1335 
1336 	return (REP_PROTOCOL_SUCCESS);
1337 }
1338 
1339 static int
1340 generic_insert_pg_child(backend_tx_t *tx, rc_node_lookup_t *lp,
1341     const char *name, const char *pgtype, uint32_t flags, uint32_t gen)
1342 {
1343 	int parent_id = (lp->rl_ids[ID_INSTANCE] != 0)?
1344 	    lp->rl_ids[ID_INSTANCE] : lp->rl_ids[ID_SERVICE];
1345 	return (backend_tx_run_update(tx,
1346 	    "INSERT INTO pg_tbl "
1347 	    "    (pg_id, pg_name, pg_parent_id, pg_type, pg_flags, pg_gen_id) "
1348 	    "VALUES (%d, '%q', %d, '%q', %d, %d)",
1349 	    lp->rl_main_id, name, parent_id, pgtype, flags, gen));
1350 }
1351 
1352 static int
1353 service_delete_start(rc_node_t *np, delete_info_t *dip)
1354 {
1355 	int r;
1356 	backend_query_t *q = backend_query_alloc();
1357 
1358 	/*
1359 	 * Check for child instances, and refuse to delete if they exist.
1360 	 */
1361 	backend_query_add(q,
1362 	    "SELECT 1 FROM instance_tbl WHERE instance_svc = %d",
1363 	    np->rn_id.rl_main_id);
1364 
1365 	r = backend_tx_run(dip->di_tx, q, backend_fail_if_seen, NULL);
1366 	backend_query_free(q);
1367 
1368 	if (r == REP_PROTOCOL_DONE)
1369 		return (REP_PROTOCOL_FAIL_EXISTS);	/* instances exist */
1370 
1371 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &service_delete,
1372 	    np->rn_id.rl_main_id, 0));
1373 }
1374 
1375 static int
1376 instance_delete_start(rc_node_t *np, delete_info_t *dip)
1377 {
1378 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL, &instance_delete,
1379 	    np->rn_id.rl_main_id, 0));
1380 }
1381 
1382 static int
1383 snapshot_delete_start(rc_node_t *np, delete_info_t *dip)
1384 {
1385 	return (delete_stack_push(dip, BACKEND_TYPE_NORMAL,
1386 	    &snapshot_lnk_delete, np->rn_id.rl_main_id, 0));
1387 }
1388 
1389 static int
1390 propertygrp_delete_start(rc_node_t *np, delete_info_t *dip)
1391 {
1392 	return (delete_stack_push(dip, np->rn_id.rl_backend,
1393 	    &propertygrp_delete, np->rn_id.rl_main_id, 0));
1394 }
1395 
1396 static object_info_t info[] = {
1397 	{REP_PROTOCOL_ENTITY_NONE},
1398 	{REP_PROTOCOL_ENTITY_SCOPE,
1399 		BACKEND_ID_INVALID,
1400 		scope_fill_children,
1401 		scope_setup_child_info,
1402 		scope_query_child,
1403 		scope_insert_child,
1404 		NULL,
1405 		NULL,
1406 	},
1407 	{REP_PROTOCOL_ENTITY_SERVICE,
1408 		BACKEND_ID_SERVICE_INSTANCE,
1409 		service_fill_children,
1410 		service_setup_child_info,
1411 		service_query_child,
1412 		service_insert_child,
1413 		generic_insert_pg_child,
1414 		service_delete_start,
1415 	},
1416 	{REP_PROTOCOL_ENTITY_INSTANCE,
1417 		BACKEND_ID_SERVICE_INSTANCE,
1418 		instance_fill_children,
1419 		instance_setup_child_info,
1420 		instance_query_child,
1421 		instance_insert_child,
1422 		generic_insert_pg_child,
1423 		instance_delete_start,
1424 	},
1425 	{REP_PROTOCOL_ENTITY_SNAPSHOT,
1426 		BACKEND_ID_SNAPNAME,
1427 		snapshot_fill_children,
1428 		NULL,
1429 		NULL,
1430 		NULL,
1431 		NULL,
1432 		snapshot_delete_start,
1433 	},
1434 	{REP_PROTOCOL_ENTITY_SNAPLEVEL,
1435 		BACKEND_ID_SNAPLEVEL,
1436 		snaplevel_fill_children,
1437 		snaplevel_setup_child_info,
1438 	},
1439 	{REP_PROTOCOL_ENTITY_PROPERTYGRP,
1440 		BACKEND_ID_PROPERTYGRP,
1441 		propertygrp_fill_children,
1442 		NULL,
1443 		NULL,
1444 		NULL,
1445 		NULL,
1446 		propertygrp_delete_start,
1447 	},
1448 	{REP_PROTOCOL_ENTITY_PROPERTY},
1449 	{-1UL}
1450 };
1451 #define	NUM_INFO (sizeof (info) / sizeof (*info))
1452 
1453 /*
1454  * object_fill_children() populates the child list of an rc_node_t by calling
1455  * the appropriate <type>_fill_children() which runs backend queries that
1456  * call an appropriate fill_*_callback() which takes a row of results,
1457  * decodes them, and calls an rc_node_setup*() function in rc_node.c to create
1458  * a child.
1459  *
1460  * Fails with
1461  *   _NO_RESOURCES
1462  */
1463 int
1464 object_fill_children(rc_node_t *pp)
1465 {
1466 	uint32_t type = pp->rn_id.rl_type;
1467 	assert(type > 0 && type < NUM_INFO);
1468 
1469 	return ((*info[type].obj_fill_children)(pp));
1470 }
1471 
1472 int
1473 object_delete(rc_node_t *pp)
1474 {
1475 	int rc;
1476 
1477 	delete_info_t dip;
1478 	delete_ent_t de;
1479 
1480 	uint32_t type = pp->rn_id.rl_type;
1481 	assert(type > 0 && type < NUM_INFO);
1482 
1483 	if (info[type].obj_delete_start == NULL)
1484 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1485 
1486 	(void) memset(&dip, '\0', sizeof (dip));
1487 	rc = backend_tx_begin(BACKEND_TYPE_NORMAL, &dip.di_tx);
1488 	if (rc != REP_PROTOCOL_SUCCESS)
1489 		return (rc);
1490 
1491 	rc = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &dip.di_np_tx);
1492 	if (rc == REP_PROTOCOL_FAIL_BACKEND_ACCESS ||
1493 	    rc == REP_PROTOCOL_FAIL_BACKEND_READONLY)
1494 		dip.di_np_tx = NULL;
1495 	else if (rc != REP_PROTOCOL_SUCCESS) {
1496 		backend_tx_rollback(dip.di_tx);
1497 		return (rc);
1498 	}
1499 
1500 	if ((rc = (*info[type].obj_delete_start)(pp, &dip)) !=
1501 	    REP_PROTOCOL_SUCCESS) {
1502 		goto fail;
1503 	}
1504 
1505 	while (delete_stack_pop(&dip, &de)) {
1506 		rc = (*de.de_cb)(&dip, &de);
1507 		if (rc != REP_PROTOCOL_SUCCESS)
1508 			goto fail;
1509 	}
1510 
1511 	rc = backend_tx_commit(dip.di_tx);
1512 	if (rc != REP_PROTOCOL_SUCCESS)
1513 		backend_tx_rollback(dip.di_np_tx);
1514 	else if (dip.di_np_tx)
1515 		(void) backend_tx_commit(dip.di_np_tx);
1516 
1517 	delete_stack_cleanup(&dip);
1518 
1519 	return (rc);
1520 
1521 fail:
1522 	backend_tx_rollback(dip.di_tx);
1523 	backend_tx_rollback(dip.di_np_tx);
1524 	delete_stack_cleanup(&dip);
1525 	return (rc);
1526 }
1527 
1528 int
1529 object_do_create(backend_tx_t *tx, child_info_t *cip, rc_node_t *pp,
1530     uint32_t type, const char *name, rc_node_t **cpp)
1531 {
1532 	uint32_t ptype = pp->rn_id.rl_type;
1533 
1534 	backend_query_t *q;
1535 	uint32_t id;
1536 	rc_node_t *np = NULL;
1537 	int rc;
1538 	object_info_t *ip;
1539 
1540 	rc_node_lookup_t *lp = &cip->ci_base_nl;
1541 
1542 	assert(ptype > 0 && ptype < NUM_INFO);
1543 
1544 	ip = &info[ptype];
1545 
1546 	if (type == REP_PROTOCOL_ENTITY_PROPERTYGRP)
1547 		return (REP_PROTOCOL_FAIL_NOT_APPLICABLE);
1548 
1549 	if (ip->obj_setup_child_info == NULL ||
1550 	    ip->obj_query_child == NULL ||
1551 	    ip->obj_insert_child == NULL)
1552 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1553 
1554 	if ((rc = (*ip->obj_setup_child_info)(pp, type, cip)) !=
1555 	    REP_PROTOCOL_SUCCESS)
1556 		return (rc);
1557 
1558 	q = backend_query_alloc();
1559 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1560 	    REP_PROTOCOL_SUCCESS) {
1561 		assert(rc == REP_PROTOCOL_FAIL_BAD_REQUEST);
1562 		backend_query_free(q);
1563 		return (rc);
1564 	}
1565 
1566 	rc = backend_tx_run_single_int(tx, q, &id);
1567 	backend_query_free(q);
1568 
1569 	if (rc == REP_PROTOCOL_SUCCESS)
1570 		return (REP_PROTOCOL_FAIL_EXISTS);
1571 	else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND)
1572 		return (rc);
1573 
1574 	if ((lp->rl_main_id = backend_new_id(tx,
1575 	    info[type].obj_id_space)) == 0) {
1576 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1577 	}
1578 
1579 	if ((np = rc_node_alloc()) == NULL)
1580 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
1581 
1582 	if ((rc = (*ip->obj_insert_child)(tx, lp, name)) !=
1583 	    REP_PROTOCOL_SUCCESS) {
1584 		rc_node_destroy(np);
1585 		return (rc);
1586 	}
1587 
1588 	*cpp = np;
1589 	return (REP_PROTOCOL_SUCCESS);
1590 }
1591 
1592 /*
1593  * Fails with
1594  *   _NOT_APPLICABLE - type is _PROPERTYGRP
1595  *   _BAD_REQUEST - cannot create children for this type of node
1596  *		    name is invalid
1597  *   _TYPE_MISMATCH - object cannot have children of type type
1598  *   _NO_RESOURCES - out of memory, or could not allocate new id
1599  *   _BACKEND_READONLY
1600  *   _BACKEND_ACCESS
1601  *   _EXISTS - child already exists
1602  */
1603 int
1604 object_create(rc_node_t *pp, uint32_t type, const char *name, rc_node_t **cpp)
1605 {
1606 	backend_tx_t *tx;
1607 	rc_node_t *np = NULL;
1608 	child_info_t ci;
1609 	int rc;
1610 
1611 	if ((rc = backend_tx_begin(pp->rn_id.rl_backend, &tx)) !=
1612 	    REP_PROTOCOL_SUCCESS) {
1613 		return (rc);
1614 	}
1615 
1616 	if ((rc = object_do_create(tx, &ci, pp, type, name, &np)) !=
1617 	    REP_PROTOCOL_SUCCESS) {
1618 		backend_tx_rollback(tx);
1619 		return (rc);
1620 	}
1621 
1622 	rc = backend_tx_commit(tx);
1623 	if (rc != REP_PROTOCOL_SUCCESS) {
1624 		rc_node_destroy(np);
1625 		return (rc);
1626 	}
1627 
1628 	*cpp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
1629 
1630 	return (REP_PROTOCOL_SUCCESS);
1631 }
1632 
1633 /*ARGSUSED*/
1634 int
1635 object_create_pg(rc_node_t *pp, uint32_t type, const char *name,
1636     const char *pgtype, uint32_t flags, rc_node_t **cpp)
1637 {
1638 	uint32_t ptype = pp->rn_id.rl_type;
1639 	backend_tx_t *tx_ro, *tx_wr;
1640 	backend_query_t *q;
1641 	uint32_t id;
1642 	uint32_t gen = 0;
1643 	rc_node_t *np = NULL;
1644 	int rc;
1645 	int rc_wr;
1646 	int rc_ro;
1647 	object_info_t *ip;
1648 
1649 	int nonpersist = (flags & SCF_PG_FLAG_NONPERSISTENT);
1650 
1651 	child_info_t ci;
1652 	rc_node_lookup_t *lp = &ci.ci_base_nl;
1653 
1654 	assert(ptype > 0 && ptype < NUM_INFO);
1655 
1656 	if (ptype != REP_PROTOCOL_ENTITY_SERVICE &&
1657 	    ptype != REP_PROTOCOL_ENTITY_INSTANCE)
1658 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
1659 
1660 	ip = &info[ptype];
1661 
1662 	assert(ip->obj_setup_child_info != NULL &&
1663 	    ip->obj_query_child != NULL &&
1664 	    ip->obj_insert_pg_child != NULL);
1665 
1666 	if ((rc = (*ip->obj_setup_child_info)(pp, type, &ci)) !=
1667 	    REP_PROTOCOL_SUCCESS)
1668 		return (rc);
1669 
1670 	q = backend_query_alloc();
1671 	if ((rc = (*ip->obj_query_child)(q, lp, name)) !=
1672 	    REP_PROTOCOL_SUCCESS) {
1673 		backend_query_free(q);
1674 		return (rc);
1675 	}
1676 
1677 	if (!nonpersist) {
1678 		lp->rl_backend = BACKEND_TYPE_NORMAL;
1679 		rc_wr = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx_wr);
1680 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NONPERSIST, &tx_ro);
1681 	} else {
1682 		lp->rl_backend = BACKEND_TYPE_NONPERSIST;
1683 		rc_ro = backend_tx_begin_ro(BACKEND_TYPE_NORMAL, &tx_ro);
1684 		rc_wr = backend_tx_begin(BACKEND_TYPE_NONPERSIST, &tx_wr);
1685 	}
1686 
1687 	if (rc_wr != REP_PROTOCOL_SUCCESS) {
1688 		rc = rc_wr;
1689 		goto fail;
1690 	}
1691 	if (rc_ro != REP_PROTOCOL_SUCCESS &&
1692 	    rc_ro != REP_PROTOCOL_FAIL_BACKEND_ACCESS) {
1693 		rc = rc_ro;
1694 		goto fail;
1695 	}
1696 
1697 	if (tx_ro != NULL) {
1698 		rc = backend_tx_run_single_int(tx_ro, q, &id);
1699 
1700 		if (rc == REP_PROTOCOL_SUCCESS) {
1701 			backend_query_free(q);
1702 			rc = REP_PROTOCOL_FAIL_EXISTS;
1703 			goto fail;
1704 		} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1705 			backend_query_free(q);
1706 			goto fail;
1707 		}
1708 	}
1709 
1710 	rc = backend_tx_run_single_int(tx_wr, q, &id);
1711 	backend_query_free(q);
1712 
1713 	if (rc == REP_PROTOCOL_SUCCESS) {
1714 		rc = REP_PROTOCOL_FAIL_EXISTS;
1715 		goto fail;
1716 	} else if (rc != REP_PROTOCOL_FAIL_NOT_FOUND) {
1717 		goto fail;
1718 	}
1719 
1720 	if (tx_ro != NULL)
1721 		backend_tx_end_ro(tx_ro);
1722 	tx_ro = NULL;
1723 
1724 	if ((lp->rl_main_id = backend_new_id(tx_wr,
1725 	    info[type].obj_id_space)) == 0) {
1726 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1727 		goto fail;
1728 	}
1729 
1730 	if ((np = rc_node_alloc()) == NULL) {
1731 		rc = REP_PROTOCOL_FAIL_NO_RESOURCES;
1732 		goto fail;
1733 	}
1734 
1735 	if ((rc = (*ip->obj_insert_pg_child)(tx_wr, lp, name, pgtype, flags,
1736 	    gen)) != REP_PROTOCOL_SUCCESS) {
1737 		rc_node_destroy(np);
1738 		goto fail;
1739 	}
1740 
1741 	rc = backend_tx_commit(tx_wr);
1742 	if (rc != REP_PROTOCOL_SUCCESS) {
1743 		rc_node_destroy(np);
1744 		return (rc);
1745 	}
1746 
1747 	*cpp = rc_node_setup_pg(np, lp, name, pgtype, flags, gen, ci.ci_parent);
1748 
1749 	return (REP_PROTOCOL_SUCCESS);
1750 
1751 fail:
1752 	if (tx_ro != NULL)
1753 		backend_tx_end_ro(tx_ro);
1754 	if (tx_wr != NULL)
1755 		backend_tx_rollback(tx_wr);
1756 	return (rc);
1757 }
1758 
1759 /*
1760  * Given a row of snaplevel number, snaplevel id, service id, service name,
1761  * instance id, & instance name, create a rc_snaplevel_t & prepend it onto the
1762  * rs_levels list of the rc_snapshot_t passed in as data.
1763  * Returns _CONTINUE on success or _ABORT if any allocations fail.
1764  */
1765 /*ARGSUSED*/
1766 static int
1767 fill_snapshot_cb(void *data, int columns, char **vals, char **names)
1768 {
1769 	rc_snapshot_t *sp = data;
1770 	rc_snaplevel_t *lvl;
1771 	char *num = vals[0];
1772 	char *id = vals[1];
1773 	char *service_id = vals[2];
1774 	char *service = vals[3];
1775 	char *instance_id = vals[4];
1776 	char *instance = vals[5];
1777 	assert(columns == 6);
1778 
1779 	lvl = uu_zalloc(sizeof (*lvl));
1780 	if (lvl == NULL)
1781 		return (BACKEND_CALLBACK_ABORT);
1782 	lvl->rsl_parent = sp;
1783 	lvl->rsl_next = sp->rs_levels;
1784 	sp->rs_levels = lvl;
1785 
1786 	string_to_id(num, &lvl->rsl_level_num, "snap_level_num");
1787 	string_to_id(id, &lvl->rsl_level_id, "snap_level_id");
1788 	string_to_id(service_id, &lvl->rsl_service_id, "snap_level_service_id");
1789 	if (instance_id != NULL)
1790 		string_to_id(instance_id, &lvl->rsl_instance_id,
1791 		    "snap_level_instance_id");
1792 
1793 	lvl->rsl_scope = (const char *)"localhost";
1794 	lvl->rsl_service = strdup(service);
1795 	if (lvl->rsl_service == NULL) {
1796 		uu_free(lvl);
1797 		return (BACKEND_CALLBACK_ABORT);
1798 	}
1799 	if (instance) {
1800 		assert(lvl->rsl_instance_id != 0);
1801 		lvl->rsl_instance = strdup(instance);
1802 		if (lvl->rsl_instance == NULL) {
1803 			free((void *)lvl->rsl_instance);
1804 			uu_free(lvl);
1805 			return (BACKEND_CALLBACK_ABORT);
1806 		}
1807 	} else {
1808 		assert(lvl->rsl_instance_id == 0);
1809 	}
1810 
1811 	return (BACKEND_CALLBACK_CONTINUE);
1812 }
1813 
1814 /*
1815  * Populate sp's rs_levels list from the snaplevel_tbl table.
1816  * Fails with
1817  *   _NO_RESOURCES
1818  */
1819 int
1820 object_fill_snapshot(rc_snapshot_t *sp)
1821 {
1822 	backend_query_t *q;
1823 	rc_snaplevel_t *sl;
1824 	int result;
1825 	int i;
1826 
1827 	q = backend_query_alloc();
1828 	backend_query_add(q,
1829 	    "SELECT snap_level_num, snap_level_id, "
1830 	    "    snap_level_service_id, snap_level_service, "
1831 	    "    snap_level_instance_id, snap_level_instance "
1832 	    "FROM snaplevel_tbl "
1833 	    "WHERE snap_id = %d "
1834 	    "ORDER BY snap_level_id DESC",
1835 	    sp->rs_snap_id);
1836 
1837 	result = backend_run(BACKEND_TYPE_NORMAL, q, fill_snapshot_cb, sp);
1838 	if (result == REP_PROTOCOL_DONE)
1839 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
1840 	backend_query_free(q);
1841 
1842 	if (result == REP_PROTOCOL_SUCCESS) {
1843 		i = 0;
1844 		for (sl = sp->rs_levels; sl != NULL; sl = sl->rsl_next) {
1845 			if (sl->rsl_level_num != ++i) {
1846 				backend_panic("snaplevels corrupt; expected "
1847 				    "level %d, got %d", i, sl->rsl_level_num);
1848 			}
1849 		}
1850 	}
1851 	return (result);
1852 }
1853 
1854 /*
1855  * This represents a property group in a snapshot.
1856  */
1857 typedef struct check_snapshot_elem {
1858 	uint32_t cse_parent;
1859 	uint32_t cse_pg_id;
1860 	uint32_t cse_pg_gen;
1861 	char	cse_seen;
1862 } check_snapshot_elem_t;
1863 
1864 #define	CSI_MAX_PARENTS		COMPOSITION_DEPTH
1865 typedef struct check_snapshot_info {
1866 	size_t			csi_count;
1867 	size_t			csi_array_size;
1868 	check_snapshot_elem_t	*csi_array;
1869 	size_t			csi_nparents;
1870 	uint32_t		csi_parent_ids[CSI_MAX_PARENTS];
1871 } check_snapshot_info_t;
1872 
1873 /*ARGSUSED*/
1874 static int
1875 check_snapshot_fill_cb(void *data, int columns, char **vals, char **names)
1876 {
1877 	check_snapshot_info_t *csip = data;
1878 	check_snapshot_elem_t *cur;
1879 	const char *parent;
1880 	const char *pg_id;
1881 	const char *pg_gen_id;
1882 
1883 	if (columns == 1) {
1884 		uint32_t *target;
1885 
1886 		if (csip->csi_nparents >= CSI_MAX_PARENTS)
1887 			backend_panic("snaplevel table has too many elements");
1888 
1889 		target = &csip->csi_parent_ids[csip->csi_nparents++];
1890 		string_to_id(vals[0], target, "snap_level_*_id");
1891 
1892 		return (BACKEND_CALLBACK_CONTINUE);
1893 	}
1894 
1895 	assert(columns == 3);
1896 
1897 	parent = vals[0];
1898 	pg_id = vals[1];
1899 	pg_gen_id = vals[2];
1900 
1901 	if (csip->csi_count == csip->csi_array_size) {
1902 		size_t newsz = (csip->csi_array_size > 0) ?
1903 		    csip->csi_array_size * 2 : 8;
1904 		check_snapshot_elem_t *new = uu_zalloc(newsz * sizeof (*new));
1905 
1906 		if (new == NULL)
1907 			return (BACKEND_CALLBACK_ABORT);
1908 
1909 		(void) memcpy(new, csip->csi_array,
1910 		    sizeof (*new) * csip->csi_array_size);
1911 		uu_free(csip->csi_array);
1912 		csip->csi_array = new;
1913 		csip->csi_array_size = newsz;
1914 	}
1915 
1916 	cur = &csip->csi_array[csip->csi_count++];
1917 
1918 	string_to_id(parent, &cur->cse_parent, "snap_level_*_id");
1919 	string_to_id(pg_id, &cur->cse_pg_id, "snaplvl_pg_id");
1920 	string_to_id(pg_gen_id, &cur->cse_pg_gen, "snaplvl_gen_id");
1921 	cur->cse_seen = 0;
1922 
1923 	return (BACKEND_CALLBACK_CONTINUE);
1924 }
1925 
1926 static int
1927 check_snapshot_elem_cmp(const void *lhs_arg, const void *rhs_arg)
1928 {
1929 	const check_snapshot_elem_t *lhs = lhs_arg;
1930 	const check_snapshot_elem_t *rhs = rhs_arg;
1931 
1932 	if (lhs->cse_parent < rhs->cse_parent)
1933 		return (-1);
1934 	if (lhs->cse_parent > rhs->cse_parent)
1935 		return (1);
1936 
1937 	if (lhs->cse_pg_id < rhs->cse_pg_id)
1938 		return (-1);
1939 	if (lhs->cse_pg_id > rhs->cse_pg_id)
1940 		return (1);
1941 
1942 	if (lhs->cse_pg_gen < rhs->cse_pg_gen)
1943 		return (-1);
1944 	if (lhs->cse_pg_gen > rhs->cse_pg_gen)
1945 		return (1);
1946 
1947 	return (0);
1948 }
1949 
1950 /*ARGSUSED*/
1951 static int
1952 check_snapshot_check_cb(void *data, int columns, char **vals, char **names)
1953 {
1954 	check_snapshot_info_t *csip = data;
1955 	check_snapshot_elem_t elem;
1956 	check_snapshot_elem_t *cur;
1957 
1958 	const char *parent = vals[0];
1959 	const char *pg_id = vals[1];
1960 	const char *pg_gen_id = vals[2];
1961 
1962 	assert(columns == 3);
1963 
1964 	string_to_id(parent, &elem.cse_parent, "snap_level_*_id");
1965 	string_to_id(pg_id, &elem.cse_pg_id, "snaplvl_pg_id");
1966 	string_to_id(pg_gen_id, &elem.cse_pg_gen, "snaplvl_gen_id");
1967 
1968 	if ((cur = bsearch(&elem, csip->csi_array, csip->csi_count,
1969 	    sizeof (*csip->csi_array), check_snapshot_elem_cmp)) == NULL)
1970 		return (BACKEND_CALLBACK_ABORT);
1971 
1972 	if (cur->cse_seen)
1973 		backend_panic("duplicate property group reported");
1974 	cur->cse_seen = 1;
1975 	return (BACKEND_CALLBACK_CONTINUE);
1976 }
1977 
1978 /*
1979  * Check that a snapshot matches up with the latest in the repository.
1980  * Returns:
1981  *	REP_PROTOCOL_SUCCESS		if it is up-to-date,
1982  *	REP_PROTOCOL_DONE		if it is out-of-date, or
1983  *	REP_PROTOCOL_FAIL_NO_RESOURCES	if we ran out of memory.
1984  */
1985 static int
1986 object_check_snapshot(uint32_t snap_id)
1987 {
1988 	check_snapshot_info_t csi;
1989 	backend_query_t *q;
1990 	int result;
1991 	size_t idx;
1992 
1993 	/* if the snapshot has never been taken, it must be out of date. */
1994 	if (snap_id == 0)
1995 		return (REP_PROTOCOL_DONE);
1996 
1997 	(void) memset(&csi, '\0', sizeof (csi));
1998 
1999 	q = backend_query_alloc();
2000 	backend_query_add(q,
2001 	    "SELECT\n"
2002 	    "    CASE snap_level_instance_id\n"
2003 	    "        WHEN 0 THEN snap_level_service_id\n"
2004 	    "        ELSE snap_level_instance_id\n"
2005 	    "    END\n"
2006 	    "FROM snaplevel_tbl\n"
2007 	    "WHERE snap_id = %d;\n"
2008 	    "\n"
2009 	    "SELECT\n"
2010 	    "    CASE snap_level_instance_id\n"
2011 	    "        WHEN 0 THEN snap_level_service_id\n"
2012 	    "        ELSE snap_level_instance_id\n"
2013 	    "    END,\n"
2014 	    "    snaplvl_pg_id,\n"
2015 	    "    snaplvl_gen_id\n"
2016 	    "FROM snaplevel_tbl, snaplevel_lnk_tbl\n"
2017 	    "WHERE\n"
2018 	    "    (snaplvl_level_id = snap_level_id AND\n"
2019 	    "    snap_id = %d);",
2020 	    snap_id, snap_id);
2021 
2022 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_fill_cb,
2023 	    &csi);
2024 	if (result == REP_PROTOCOL_DONE)
2025 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2026 	backend_query_free(q);
2027 
2028 	if (result != REP_PROTOCOL_SUCCESS)
2029 		goto fail;
2030 
2031 	if (csi.csi_count > 0) {
2032 		qsort(csi.csi_array, csi.csi_count, sizeof (*csi.csi_array),
2033 		    check_snapshot_elem_cmp);
2034 	}
2035 
2036 #if COMPOSITION_DEPTH == 2
2037 	if (csi.csi_nparents != COMPOSITION_DEPTH) {
2038 		result = REP_PROTOCOL_DONE;
2039 		goto fail;
2040 	}
2041 
2042 	q = backend_query_alloc();
2043 	backend_query_add(q,
2044 	    "SELECT "
2045 	    "    pg_parent_id, pg_id, pg_gen_id "
2046 	    "FROM "
2047 	    "    pg_tbl "
2048 	    "WHERE (pg_parent_id = %d OR pg_parent_id = %d)",
2049 	    csi.csi_parent_ids[0], csi.csi_parent_ids[1]);
2050 
2051 	result = backend_run(BACKEND_TYPE_NORMAL, q, check_snapshot_check_cb,
2052 	    &csi);
2053 #else
2054 #error This code must be updated
2055 #endif
2056 	/*
2057 	 * To succeed, the callback must not have aborted, and we must have
2058 	 * found all of the items.
2059 	 */
2060 	if (result == REP_PROTOCOL_SUCCESS) {
2061 		for (idx = 0; idx < csi.csi_count; idx++) {
2062 			if (csi.csi_array[idx].cse_seen == 0) {
2063 				result = REP_PROTOCOL_DONE;
2064 				goto fail;
2065 			}
2066 		}
2067 	}
2068 
2069 fail:
2070 	uu_free(csi.csi_array);
2071 	return (result);
2072 }
2073 
2074 /*ARGSUSED*/
2075 static int
2076 object_copy_string(void *data_arg, int columns, char **vals, char **names)
2077 {
2078 	char **data = data_arg;
2079 
2080 	assert(columns == 1);
2081 
2082 	if (*data != NULL)
2083 		free(*data);
2084 	*data = NULL;
2085 
2086 	if (vals[0] != NULL) {
2087 		if ((*data = strdup(vals[0])) == NULL)
2088 			return (BACKEND_CALLBACK_ABORT);
2089 	}
2090 
2091 	return (BACKEND_CALLBACK_CONTINUE);
2092 }
2093 
2094 struct snaplevel_add_info {
2095 	backend_query_t *sai_q;
2096 	uint32_t	sai_level_id;
2097 	int		sai_used;		/* sai_q has been used */
2098 };
2099 
2100 /*ARGSUSED*/
2101 static int
2102 object_snaplevel_process_pg(void *data_arg, int columns, char **vals,
2103     char **names)
2104 {
2105 	struct snaplevel_add_info *data = data_arg;
2106 
2107 	assert(columns == 5);
2108 
2109 	backend_query_add(data->sai_q,
2110 	    "INSERT INTO snaplevel_lnk_tbl "
2111 	    "    (snaplvl_level_id, snaplvl_pg_id, snaplvl_pg_name, "
2112 	    "    snaplvl_pg_type, snaplvl_pg_flags, snaplvl_gen_id)"
2113 	    "VALUES (%d, %s, '%q', '%q', %s, %s);",
2114 	    data->sai_level_id, vals[0], vals[1], vals[2], vals[3], vals[4]);
2115 
2116 	data->sai_used = 1;
2117 
2118 	return (BACKEND_CALLBACK_CONTINUE);
2119 }
2120 
2121 /*ARGSUSED*/
2122 static int
2123 object_snapshot_add_level(backend_tx_t *tx, uint32_t snap_id,
2124     uint32_t snap_level_num, uint32_t svc_id, const char *svc_name,
2125     uint32_t inst_id, const char *inst_name)
2126 {
2127 	struct snaplevel_add_info data;
2128 	backend_query_t *q;
2129 	int result;
2130 
2131 	assert((snap_level_num == 1 && inst_name != NULL) ||
2132 	    snap_level_num == 2 && inst_name == NULL);
2133 
2134 	data.sai_level_id = backend_new_id(tx, BACKEND_ID_SNAPLEVEL);
2135 	if (data.sai_level_id == 0) {
2136 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
2137 	}
2138 
2139 	result = backend_tx_run_update(tx,
2140 	    "INSERT INTO snaplevel_tbl "
2141 	    "    (snap_id, snap_level_num, snap_level_id, "
2142 	    "    snap_level_service_id, snap_level_service, "
2143 	    "    snap_level_instance_id, snap_level_instance) "
2144 	    "VALUES (%d, %d, %d, %d, %Q, %d, %Q);",
2145 	    snap_id, snap_level_num, data.sai_level_id, svc_id, svc_name,
2146 	    inst_id, inst_name);
2147 
2148 	q = backend_query_alloc();
2149 	backend_query_add(q,
2150 	    "SELECT pg_id, pg_name, pg_type, pg_flags, pg_gen_id FROM pg_tbl "
2151 	    "WHERE (pg_parent_id = %d);",
2152 	    (inst_name != NULL)? inst_id : svc_id);
2153 
2154 	data.sai_q = backend_query_alloc();
2155 	data.sai_used = 0;
2156 	result = backend_tx_run(tx, q, object_snaplevel_process_pg,
2157 	    &data);
2158 	backend_query_free(q);
2159 
2160 	if (result == REP_PROTOCOL_SUCCESS && data.sai_used != 0)
2161 		result = backend_tx_run(tx, data.sai_q, NULL, NULL);
2162 	backend_query_free(data.sai_q);
2163 
2164 	return (result);
2165 }
2166 
2167 /*
2168  * Fails with:
2169  *	_NO_RESOURCES - no new id or out of disk space
2170  *	_BACKEND_READONLY - persistent backend is read-only
2171  */
2172 static int
2173 object_snapshot_do_take(uint32_t instid, const char *inst_name,
2174     uint32_t svcid, const char *svc_name,
2175     backend_tx_t **tx_out, uint32_t *snapid_out)
2176 {
2177 	backend_tx_t *tx;
2178 	backend_query_t *q;
2179 	int result;
2180 
2181 	char *svc_name_alloc = NULL;
2182 	char *inst_name_alloc = NULL;
2183 	uint32_t snapid;
2184 
2185 	result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2186 	if (result != REP_PROTOCOL_SUCCESS)
2187 		return (result);
2188 
2189 	snapid = backend_new_id(tx, BACKEND_ID_SNAPSHOT);
2190 	if (snapid == 0) {
2191 		result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2192 		goto fail;
2193 	}
2194 
2195 	if (svc_name == NULL) {
2196 		q = backend_query_alloc();
2197 		backend_query_add(q,
2198 		    "SELECT svc_name FROM service_tbl "
2199 		    "WHERE (svc_id = %d)", svcid);
2200 		result = backend_tx_run(tx, q, object_copy_string,
2201 		    &svc_name_alloc);
2202 		backend_query_free(q);
2203 
2204 		svc_name = svc_name_alloc;
2205 
2206 		if (result == REP_PROTOCOL_DONE) {
2207 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2208 			goto fail;
2209 		}
2210 		if (result == REP_PROTOCOL_SUCCESS && svc_name == NULL)
2211 			backend_panic("unable to find name for svc id %d\n",
2212 			    svcid);
2213 
2214 		if (result != REP_PROTOCOL_SUCCESS)
2215 			goto fail;
2216 	}
2217 
2218 	if (inst_name == NULL) {
2219 		q = backend_query_alloc();
2220 		backend_query_add(q,
2221 		    "SELECT instance_name FROM instance_tbl "
2222 		    "WHERE (instance_id = %d)", instid);
2223 		result = backend_tx_run(tx, q, object_copy_string,
2224 		    &inst_name_alloc);
2225 		backend_query_free(q);
2226 
2227 		inst_name = inst_name_alloc;
2228 
2229 		if (result == REP_PROTOCOL_DONE) {
2230 			result = REP_PROTOCOL_FAIL_NO_RESOURCES;
2231 			goto fail;
2232 		}
2233 
2234 		if (result == REP_PROTOCOL_SUCCESS && inst_name == NULL)
2235 			backend_panic(
2236 			    "unable to find name for instance id %d\n", instid);
2237 
2238 		if (result != REP_PROTOCOL_SUCCESS)
2239 			goto fail;
2240 	}
2241 
2242 	result = object_snapshot_add_level(tx, snapid, 1,
2243 	    svcid, svc_name, instid, inst_name);
2244 
2245 	if (result != REP_PROTOCOL_SUCCESS)
2246 		goto fail;
2247 
2248 	result = object_snapshot_add_level(tx, snapid, 2,
2249 	    svcid, svc_name, 0, NULL);
2250 
2251 	if (result != REP_PROTOCOL_SUCCESS)
2252 		goto fail;
2253 
2254 	*snapid_out = snapid;
2255 	*tx_out = tx;
2256 
2257 	free(svc_name_alloc);
2258 	free(inst_name_alloc);
2259 
2260 	return (REP_PROTOCOL_SUCCESS);
2261 
2262 fail:
2263 	backend_tx_rollback(tx);
2264 	free(svc_name_alloc);
2265 	free(inst_name_alloc);
2266 	return (result);
2267 }
2268 
2269 /*
2270  * Fails with:
2271  *	_TYPE_MISMATCH - pp is not an instance
2272  *	_NO_RESOURCES - no new id or out of disk space
2273  *	_BACKEND_READONLY - persistent backend is read-only
2274  */
2275 int
2276 object_snapshot_take_new(rc_node_t *pp,
2277     const char *svc_name, const char *inst_name,
2278     const char *name, rc_node_t **outp)
2279 {
2280 	rc_node_lookup_t *insti = &pp->rn_id;
2281 
2282 	uint32_t instid = insti->rl_main_id;
2283 	uint32_t svcid = insti->rl_ids[ID_SERVICE];
2284 	uint32_t snapid = 0;
2285 	backend_tx_t *tx = NULL;
2286 	child_info_t ci;
2287 	rc_node_t *np;
2288 	int result;
2289 
2290 	if (insti->rl_type != REP_PROTOCOL_ENTITY_INSTANCE)
2291 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2292 
2293 	result = object_snapshot_do_take(instid, inst_name, svcid, svc_name,
2294 	    &tx, &snapid);
2295 	if (result != REP_PROTOCOL_SUCCESS)
2296 		return (result);
2297 
2298 	if ((result = object_do_create(tx, &ci, pp,
2299 	    REP_PROTOCOL_ENTITY_SNAPSHOT, name, &np)) != REP_PROTOCOL_SUCCESS) {
2300 		backend_tx_rollback(tx);
2301 		return (result);
2302 	}
2303 
2304 	/*
2305 	 * link the new object to the new snapshot.
2306 	 */
2307 	np->rn_snapshot_id = snapid;
2308 
2309 	result = backend_tx_run_update(tx,
2310 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2311 	    snapid, ci.ci_base_nl.rl_main_id);
2312 	if (result != REP_PROTOCOL_SUCCESS) {
2313 		backend_tx_rollback(tx);
2314 		rc_node_destroy(np);
2315 		return (result);
2316 	}
2317 	result = backend_tx_commit(tx);
2318 	if (result != REP_PROTOCOL_SUCCESS) {
2319 		rc_node_destroy(np);
2320 		return (result);
2321 	}
2322 
2323 	*outp = rc_node_setup(np, &ci.ci_base_nl, name, ci.ci_parent);
2324 	return (REP_PROTOCOL_SUCCESS);
2325 }
2326 
2327 /*
2328  * Fails with:
2329  *	_TYPE_MISMATCH - pp is not an instance
2330  *	_NO_RESOURCES - no new id or out of disk space
2331  *	_BACKEND_READONLY - persistent backend is read-only
2332  */
2333 int
2334 object_snapshot_attach(rc_node_lookup_t *snapi, uint32_t *snapid_ptr,
2335     int takesnap)
2336 {
2337 	uint32_t svcid = snapi->rl_ids[ID_SERVICE];
2338 	uint32_t instid = snapi->rl_ids[ID_INSTANCE];
2339 	uint32_t snapid = *snapid_ptr;
2340 	uint32_t oldsnapid = 0;
2341 	backend_tx_t *tx = NULL;
2342 	backend_query_t *q;
2343 	int result;
2344 
2345 	delete_info_t dip;
2346 	delete_ent_t de;
2347 
2348 	if (snapi->rl_type != REP_PROTOCOL_ENTITY_SNAPSHOT)
2349 		return (REP_PROTOCOL_FAIL_TYPE_MISMATCH);
2350 
2351 	if (takesnap) {
2352 		/* first, check that we're actually out of date */
2353 		if (object_check_snapshot(snapid) == REP_PROTOCOL_SUCCESS)
2354 			return (REP_PROTOCOL_SUCCESS);
2355 
2356 		result = object_snapshot_do_take(instid, NULL,
2357 		    svcid, NULL, &tx, &snapid);
2358 		if (result != REP_PROTOCOL_SUCCESS)
2359 			return (result);
2360 	} else {
2361 		result = backend_tx_begin(BACKEND_TYPE_NORMAL, &tx);
2362 		if (result != REP_PROTOCOL_SUCCESS)
2363 			return (result);
2364 	}
2365 
2366 	q = backend_query_alloc();
2367 	backend_query_add(q,
2368 	    "SELECT lnk_snap_id FROM snapshot_lnk_tbl WHERE lnk_id = %d; "
2369 	    "UPDATE snapshot_lnk_tbl SET lnk_snap_id = %d WHERE lnk_id = %d;",
2370 	    snapi->rl_main_id, snapid, snapi->rl_main_id);
2371 	result = backend_tx_run_single_int(tx, q, &oldsnapid);
2372 	backend_query_free(q);
2373 
2374 	if (result == REP_PROTOCOL_FAIL_NOT_FOUND) {
2375 		backend_tx_rollback(tx);
2376 		backend_panic("unable to find snapshot id %d",
2377 		    snapi->rl_main_id);
2378 	}
2379 	if (result != REP_PROTOCOL_SUCCESS)
2380 		goto fail;
2381 
2382 	/*
2383 	 * Now we use the delete stack to handle the possible unreferencing
2384 	 * of oldsnapid.
2385 	 */
2386 	(void) memset(&dip, 0, sizeof (dip));
2387 	dip.di_tx = tx;
2388 	dip.di_np_tx = NULL;	/* no need for non-persistant backend */
2389 
2390 	if ((result = delete_stack_push(&dip, BACKEND_TYPE_NORMAL,
2391 	    &snaplevel_tbl_delete, oldsnapid, 0)) != REP_PROTOCOL_SUCCESS)
2392 		goto fail;
2393 
2394 	while (delete_stack_pop(&dip, &de)) {
2395 		result = (*de.de_cb)(&dip, &de);
2396 		if (result != REP_PROTOCOL_SUCCESS)
2397 			goto fail;
2398 	}
2399 
2400 	result = backend_tx_commit(tx);
2401 	if (result != REP_PROTOCOL_SUCCESS)
2402 		goto fail;
2403 
2404 	delete_stack_cleanup(&dip);
2405 	*snapid_ptr = snapid;
2406 	return (REP_PROTOCOL_SUCCESS);
2407 
2408 fail:
2409 	backend_tx_rollback(tx);
2410 	delete_stack_cleanup(&dip);
2411 	return (result);
2412 }
2413