xref: /illumos-gate/usr/src/cmd/svc/configd/object.c (revision 6bc074b1c1f7d3014541f4c3e3152dcf2b19eed6)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 /*
28  * This file only contains the transaction commit logic.
29  */
30 
31 #include <assert.h>
32 #include <alloca.h>
33 #include <errno.h>
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <strings.h>
37 #include <sys/sysmacros.h>
38 #include "configd.h"
39 
/*
 * All-ones uint32_t sentinels.  Neither is referenced by the code visible
 * in this file; presumably they mark "no object" / "no type" -- confirm
 * against other users before relying on that meaning.
 */
#define	INVALID_OBJ_ID	((uint32_t)-1)
#define	INVALID_TYPE	((uint32_t)-1)
42 
/*
 * Bookkeeping for a single command of a transaction commit request.  The
 * pointer members refer directly into the request buffer handed to
 * tx_commit_data_new(); nothing is copied.
 */
struct tx_cmd {
	/* the command as it appears in the request buffer */
	const struct rep_protocol_transaction_cmd *tx_cmd;
	/* NUL-terminated property name inside the command */
	const char	*tx_prop;
	/* first value: a uint32_t length followed by the padded string */
	uint32_t	*tx_values;
	/* number of values counted while parsing the command */
	uint32_t	tx_nvalues;
	/* lnk_val_id the property had in the old generation, or 0 */
	uint32_t	tx_orig_value_id;
	/* set once the property is seen in the old generation */
	char		tx_found;
	/* set once the command's work has been carried out */
	char		tx_processed;
	/* set when the command fails a consistency check */
	char		tx_bad;
};
53 
54 static int
55 tx_cmd_compare(const void *key, const void *elem_arg)
56 {
57 	const struct tx_cmd *elem = elem_arg;
58 
59 	return (strcmp((const char *)key, elem->tx_prop));
60 }
61 
62 struct tx_commit_data {
63 	uint32_t	txc_pg_id;
64 	uint32_t	txc_gen;
65 	uint32_t	txc_oldgen;
66 	short		txc_backend;
67 	backend_tx_t	*txc_tx;
68 	backend_query_t	*txc_inserts;
69 	size_t		txc_count;
70 	rep_protocol_responseid_t txc_result;
71 	struct tx_cmd	txc_cmds[1];		/* actually txc_count */
72 };
73 #define	TX_COMMIT_DATA_SIZE(count) \
74 	offsetof(struct tx_commit_data, txc_cmds[count])
75 
76 /*ARGSUSED*/
77 static int
78 tx_check_genid(void *data_arg, int columns, char **vals, char **names)
79 {
80 	tx_commit_data_t *data = data_arg;
81 	assert(columns == 1);
82 	if (atoi(vals[0]) != data->txc_oldgen)
83 		data->txc_result = REP_PROTOCOL_FAIL_NOT_LATEST;
84 	else
85 		data->txc_result = REP_PROTOCOL_SUCCESS;
86 	return (BACKEND_CALLBACK_CONTINUE);
87 }
88 
89 /*
90  * tx_process_property() is called once for each property in current
91  * property group generation.  Its purpose is threefold:
92  *
93  *	1. copy properties not mentioned in the transaction over unchanged.
94  *	2. mark DELETEd properties as seen (they will be left out of the new
95  *	   generation).
96  *	3. consistancy-check NEW, CLEAR, and REPLACE commands.
97  *
98  * Any consistancy problems set tx_bad, and seen properties are marked
99  * tx_found.  These is used later, in tx_process_cmds().
100  */
101 /*ARGSUSED*/
102 static int
103 tx_process_property(void *data_arg, int columns, char **vals, char **names)
104 {
105 	tx_commit_data_t *data = data_arg;
106 	struct tx_cmd *elem;
107 
108 	const char *prop_name = vals[0];
109 	const char *prop_type = vals[1];
110 	const char *lnk_val_id = vals[2];
111 
112 	char *endptr;
113 
114 	assert(columns == 3);
115 
116 	elem = bsearch(prop_name, data->txc_cmds, data->txc_count,
117 	    sizeof (*data->txc_cmds), tx_cmd_compare);
118 
119 	if (elem == NULL) {
120 		backend_query_add(data->txc_inserts,
121 		    "INSERT INTO prop_lnk_tbl"
122 		    "    (lnk_pg_id, lnk_gen_id, lnk_prop_name, lnk_prop_type,"
123 		    "    lnk_val_id) "
124 		    "VALUES ( %d, %d, '%q', '%q', %Q );",
125 		    data->txc_pg_id, data->txc_gen, prop_name, prop_type,
126 		    lnk_val_id);
127 	} else {
128 		assert(!elem->tx_found);
129 		elem->tx_found = 1;
130 
131 		if (lnk_val_id != NULL) {
132 			errno = 0;
133 			elem->tx_orig_value_id =
134 			    strtoul(lnk_val_id, &endptr, 10);
135 			if (elem->tx_orig_value_id == 0 || *endptr != 0 ||
136 			    errno != 0) {
137 				return (BACKEND_CALLBACK_ABORT);
138 			}
139 		} else {
140 			elem->tx_orig_value_id = 0;
141 		}
142 
143 		switch (elem->tx_cmd->rptc_action) {
144 		case REP_PROTOCOL_TX_ENTRY_NEW:
145 			elem->tx_bad = 1;
146 			data->txc_result = REP_PROTOCOL_FAIL_EXISTS;
147 			break;
148 		case REP_PROTOCOL_TX_ENTRY_CLEAR:
149 			if (REP_PROTOCOL_BASE_TYPE(elem->tx_cmd->rptc_type) !=
150 			    prop_type[0] &&
151 			    REP_PROTOCOL_SUBTYPE(elem->tx_cmd->rptc_type) !=
152 			    prop_type[1]) {
153 				elem->tx_bad = 1;
154 				data->txc_result =
155 				    REP_PROTOCOL_FAIL_TYPE_MISMATCH;
156 			}
157 			break;
158 		case REP_PROTOCOL_TX_ENTRY_REPLACE:
159 			break;
160 		case REP_PROTOCOL_TX_ENTRY_DELETE:
161 			elem->tx_processed = 1;
162 			break;
163 		default:
164 			assert(0);
165 			break;
166 		}
167 	}
168 	return (BACKEND_CALLBACK_CONTINUE);
169 }
170 
171 /*
172  * tx_process_cmds() finishes the job tx_process_property() started:
173  *
174  *	1. if tx_process_property() marked a command as bad, we skip it.
175  *	2. if a DELETE, REPLACE, or CLEAR operated on a non-existant property,
176  *	    we mark it as bad.
177  *	3. we complete the work of NEW, REPLACE, and CLEAR, by inserting the
178  *	    appropriate values into the database.
179  *	4. we delete all replaced data, if it is no longer referenced.
180  *
181  * Finally, we check all of the commands, and fail if anything was marked bad.
182  */
183 static int
184 tx_process_cmds(tx_commit_data_t *data)
185 {
186 	int idx;
187 	int r;
188 	int count = data->txc_count;
189 	struct tx_cmd *elem;
190 	uint32_t val_id = 0;
191 	uint8_t type[3];
192 
193 	backend_query_t *q;
194 	int do_delete;
195 
196 	/*
197 	 * For persistent pgs, we use backend_fail_if_seen to abort the
198 	 * deletion if there is a snapshot using our current state.
199 	 *
200 	 * All of the deletions in this function are safe, since
201 	 * rc_tx_commit() guarantees that all the data is in-cache.
202 	 */
203 	q = backend_query_alloc();
204 
205 	if (data->txc_backend != BACKEND_TYPE_NONPERSIST) {
206 		backend_query_add(q,
207 		    "SELECT 1 FROM snaplevel_lnk_tbl "
208 		    "    WHERE (snaplvl_pg_id = %d AND snaplvl_gen_id = %d); ",
209 		    data->txc_pg_id, data->txc_oldgen);
210 	}
211 	backend_query_add(q,
212 	    "DELETE FROM prop_lnk_tbl"
213 	    "    WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
214 	    data->txc_pg_id, data->txc_oldgen);
215 	r = backend_tx_run(data->txc_tx, q, backend_fail_if_seen, NULL);
216 	backend_query_free(q);
217 
218 	if (r == REP_PROTOCOL_SUCCESS)
219 		do_delete = 1;
220 	else if (r == REP_PROTOCOL_DONE)
221 		do_delete = 0;		/* old gen_id is in use */
222 	else
223 		return (r);
224 
225 	for (idx = 0; idx < count; idx++) {
226 		elem = &data->txc_cmds[idx];
227 
228 		if (elem->tx_bad)
229 			continue;
230 
231 		switch (elem->tx_cmd->rptc_action) {
232 		case REP_PROTOCOL_TX_ENTRY_DELETE:
233 		case REP_PROTOCOL_TX_ENTRY_REPLACE:
234 		case REP_PROTOCOL_TX_ENTRY_CLEAR:
235 			if (!elem->tx_found) {
236 				elem->tx_bad = 1;
237 				continue;
238 			}
239 			break;
240 		case REP_PROTOCOL_TX_ENTRY_NEW:
241 			break;
242 		default:
243 			assert(0);
244 			break;
245 		}
246 
247 		if (do_delete &&
248 		    elem->tx_cmd->rptc_action != REP_PROTOCOL_TX_ENTRY_NEW &&
249 		    elem->tx_orig_value_id != 0) {
250 			/*
251 			 * delete the old values, if they are not in use
252 			 */
253 			q = backend_query_alloc();
254 			backend_query_add(q,
255 			    "SELECT 1 FROM prop_lnk_tbl "
256 			    "    WHERE (lnk_val_id = %d); "
257 			    "DELETE FROM value_tbl"
258 			    "    WHERE (value_id = %d)",
259 			    elem->tx_orig_value_id, elem->tx_orig_value_id);
260 			r = backend_tx_run(data->txc_tx, q,
261 			    backend_fail_if_seen, NULL);
262 			backend_query_free(q);
263 			if (r != REP_PROTOCOL_SUCCESS && r != REP_PROTOCOL_DONE)
264 				return (r);
265 		}
266 
267 		if (elem->tx_cmd->rptc_action == REP_PROTOCOL_TX_ENTRY_DELETE)
268 			continue;		/* no further work to do */
269 
270 		type[0] = REP_PROTOCOL_BASE_TYPE(elem->tx_cmd->rptc_type);
271 		type[1] = REP_PROTOCOL_SUBTYPE(elem->tx_cmd->rptc_type);
272 		type[2] = 0;
273 
274 		if (elem->tx_nvalues == 0) {
275 			r = backend_tx_run_update(data->txc_tx,
276 			    "INSERT INTO prop_lnk_tbl"
277 			    "    (lnk_pg_id, lnk_gen_id, "
278 			    "    lnk_prop_name, lnk_prop_type, lnk_val_id) "
279 			    "VALUES ( %d, %d, '%q', '%q', NULL );",
280 			    data->txc_pg_id, data->txc_gen, elem->tx_prop,
281 			    type);
282 		} else {
283 			uint32_t *v, i = 0;
284 			const char *str;
285 
286 			val_id = backend_new_id(data->txc_tx, BACKEND_ID_VALUE);
287 			if (val_id == 0)
288 				return (REP_PROTOCOL_FAIL_NO_RESOURCES);
289 			r = backend_tx_run_update(data->txc_tx,
290 			    "INSERT INTO prop_lnk_tbl "
291 			    "    (lnk_pg_id, lnk_gen_id, "
292 			    "    lnk_prop_name, lnk_prop_type, lnk_val_id) "
293 			    "VALUES ( %d, %d, '%q', '%q', %d );",
294 			    data->txc_pg_id, data->txc_gen, elem->tx_prop,
295 			    type, val_id);
296 
297 			v = elem->tx_values;
298 
299 			for (i = 0; i < elem->tx_nvalues; i++) {
300 				str = (const char *)&v[1];
301 
302 				/*
303 				 * Update values in backend,  imposing
304 				 * ordering via the value_order column.
305 				 * This ordering is then used in subseqent
306 				 * value retrieval operations.  We can
307 				 * safely assume that the repository schema
308 				 * has been upgraded (and hence has the
309 				 * value_order column in value_tbl),  since
310 				 * it is upgraded as soon as the repository
311 				 * is writable.
312 				 */
313 				r = backend_tx_run_update(data->txc_tx,
314 				    "INSERT INTO value_tbl (value_id, "
315 				    "value_type, value_value, "
316 				    "value_order) VALUES (%d, '%c', "
317 				    "'%q', '%d');\n",
318 				    val_id, elem->tx_cmd->rptc_type,
319 				    str, i);
320 				if (r != REP_PROTOCOL_SUCCESS)
321 					break;
322 
323 				/*LINTED alignment*/
324 				v = (uint32_t *)((caddr_t)str + TX_SIZE(*v));
325 			}
326 		}
327 		if (r != REP_PROTOCOL_SUCCESS)
328 			return (REP_PROTOCOL_FAIL_UNKNOWN);
329 		elem->tx_processed = 1;
330 	}
331 
332 	for (idx = 0; idx < count; idx++) {
333 		elem = &data->txc_cmds[idx];
334 
335 		if (elem->tx_bad)
336 			return (REP_PROTOCOL_FAIL_BAD_TX);
337 	}
338 	return (REP_PROTOCOL_SUCCESS);
339 }
340 
341 static boolean_t
342 check_string(uintptr_t loc, uint32_t len, uint32_t sz)
343 {
344 	const char *ptr = (const char *)loc;
345 
346 	if (len == 0 || len > sz || ptr[len - 1] != 0 || strlen(ptr) != len - 1)
347 		return (0);
348 	return (1);
349 }
350 
351 static int
352 tx_check_and_setup(tx_commit_data_t *data, const void *cmds_arg,
353     uint32_t count)
354 {
355 	const struct rep_protocol_transaction_cmd *cmds;
356 	struct tx_cmd *cur;
357 	struct tx_cmd *prev = NULL;
358 
359 	uintptr_t loc;
360 	uint32_t sz, len;
361 	int idx;
362 
363 	loc = (uintptr_t)cmds_arg;
364 
365 	for (idx = 0; idx < count; idx++) {
366 		cur = &data->txc_cmds[idx];
367 
368 		cmds = (struct rep_protocol_transaction_cmd *)loc;
369 		cur->tx_cmd = cmds;
370 
371 		sz = cmds->rptc_size;
372 
373 		loc += REP_PROTOCOL_TRANSACTION_CMD_MIN_SIZE;
374 		sz -= REP_PROTOCOL_TRANSACTION_CMD_MIN_SIZE;
375 
376 		len = cmds->rptc_name_len;
377 		if (len <= 1 || !check_string(loc, len, sz)) {
378 			return (REP_PROTOCOL_FAIL_BAD_REQUEST);
379 		}
380 		cur->tx_prop = (const char *)loc;
381 
382 		len = TX_SIZE(len);
383 		loc += len;
384 		sz -= len;
385 
386 		cur->tx_nvalues = 0;
387 		cur->tx_values = (uint32_t *)loc;
388 
389 		while (sz > 0) {
390 			if (sz < sizeof (uint32_t))
391 				return (REP_PROTOCOL_FAIL_BAD_REQUEST);
392 
393 			cur->tx_nvalues++;
394 
395 			len = *(uint32_t *)loc;
396 			loc += sizeof (uint32_t);
397 			sz -= sizeof (uint32_t);
398 
399 			if (!check_string(loc, len, sz))
400 				return (REP_PROTOCOL_FAIL_BAD_REQUEST);
401 
402 			/*
403 			 * XXX here, we should be checking that the values
404 			 * match the purported type
405 			 */
406 
407 			len = TX_SIZE(len);
408 
409 			if (len > sz)
410 				return (REP_PROTOCOL_FAIL_BAD_REQUEST);
411 
412 			loc += len;
413 			sz -= len;
414 		}
415 
416 		if (prev != NULL && strcmp(prev->tx_prop, cur->tx_prop) >= 0)
417 			return (REP_PROTOCOL_FAIL_BAD_REQUEST);
418 
419 		prev = cur;
420 	}
421 	return (REP_PROTOCOL_SUCCESS);
422 }
423 
424 /*
425  * Free the memory associated with a tx_commit_data structure.
426  */
427 void
428 tx_commit_data_free(tx_commit_data_t *tx_data)
429 {
430 	uu_free(tx_data);
431 }
432 
433 /*
434  * Parse the data of a REP_PROTOCOL_PROPERTYGRP_TX_COMMIT message into a
435  * more useful form.  The data in the message will be represented by a
436  * tx_commit_data_t structure which is allocated by this function.  The
437  * address of the allocated structure is returned to *tx_data and must be
438  * freed by calling tx_commit_data_free().
439  *
440  * Parameters:
441  *	cmds_arg	Address of the commands in the
442  *			REP_PROTOCOL_PROPERTYGRP_TX_COMMIT message.
443  *
444  *	cmds_sz		Number of message bytes at cmds_arg.
445  *
446  *	tx_data		Points to the place to receive the address of the
447  *			allocated memory.
448  *
449  * Fails with
450  *	_BAD_REQUEST
451  *	_NO_RESOURCES
452  */
453 int
454 tx_commit_data_new(const void *cmds_arg, size_t cmds_sz,
455     tx_commit_data_t **tx_data)
456 {
457 	const struct rep_protocol_transaction_cmd *cmds;
458 	tx_commit_data_t *data;
459 	uintptr_t loc;
460 	uint32_t count;
461 	uint32_t sz;
462 	int ret;
463 
464 	/*
465 	 * First, verify that the reported sizes make sense, and count
466 	 * the number of commands.
467 	 */
468 	count = 0;
469 	loc = (uintptr_t)cmds_arg;
470 
471 	while (cmds_sz > 0) {
472 		cmds = (struct rep_protocol_transaction_cmd *)loc;
473 
474 		if (cmds_sz <= REP_PROTOCOL_TRANSACTION_CMD_MIN_SIZE)
475 			return (REP_PROTOCOL_FAIL_BAD_REQUEST);
476 
477 		sz = cmds->rptc_size;
478 		if (sz <= REP_PROTOCOL_TRANSACTION_CMD_MIN_SIZE)
479 			return (REP_PROTOCOL_FAIL_BAD_REQUEST);
480 
481 		sz = TX_SIZE(sz);
482 		if (sz > cmds_sz)
483 			return (REP_PROTOCOL_FAIL_BAD_REQUEST);
484 
485 		loc += sz;
486 		cmds_sz -= sz;
487 		count++;
488 	}
489 
490 	data = uu_zalloc(TX_COMMIT_DATA_SIZE(count));
491 	if (data == NULL)
492 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
493 
494 	/*
495 	 * verify that everything looks okay, and set up our command
496 	 * datastructures.
497 	 */
498 	data->txc_count = count;
499 	ret = tx_check_and_setup(data, cmds_arg, count);
500 	if (ret == REP_PROTOCOL_SUCCESS) {
501 		*tx_data = data;
502 	} else {
503 		*tx_data = NULL;
504 		uu_free(data);
505 	}
506 	return (ret);
507 }
508 
509 /*
510  * The following are a set of accessor functions to retrieve data from a
511  * tx_commit_data_t that has been allocated by tx_commit_data_new().
512  */
513 
514 /*
515  * Return the action of the transaction command whose command number is
516  * cmd_no.  The action is placed at *action.
517  *
518  * Returns:
519  *	_FAIL_BAD_REQUEST	cmd_no is out of range.
520  */
521 int
522 tx_cmd_action(tx_commit_data_t *tx_data, size_t cmd_no,
523     enum rep_protocol_transaction_action *action)
524 {
525 	struct tx_cmd *cur;
526 
527 	assert(cmd_no < tx_data->txc_count);
528 	if (cmd_no >= tx_data->txc_count)
529 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
530 
531 	cur = &tx_data->txc_cmds[cmd_no];
532 	*action = cur->tx_cmd->rptc_action;
533 	return (REP_PROTOCOL_SUCCESS);
534 }
535 
536 /*
537  * Return the number of transaction commands held in tx_data.
538  */
539 size_t
540 tx_cmd_count(tx_commit_data_t *tx_data)
541 {
542 	return (tx_data->txc_count);
543 }
544 
545 /*
546  * Return the number of property values that are associated with the
547  * transaction command whose number is cmd_no.  The number of values is
548  * returned to *nvalues.
549  *
550  * Returns:
551  *	_FAIL_BAD_REQUEST	cmd_no is out of range.
552  */
553 int
554 tx_cmd_nvalues(tx_commit_data_t *tx_data, size_t cmd_no, uint32_t *nvalues)
555 {
556 	struct tx_cmd *cur;
557 
558 	assert(cmd_no < tx_data->txc_count);
559 	if (cmd_no >= tx_data->txc_count)
560 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
561 
562 	cur = &tx_data->txc_cmds[cmd_no];
563 	*nvalues = cur->tx_nvalues;
564 	return (REP_PROTOCOL_SUCCESS);
565 }
566 
567 /*
568  * Return a pointer to the property name of the command whose number is
569  * cmd_no.  The property name pointer is returned to *pname.
570  *
571  * Returns:
572  *	_FAIL_BAD_REQUEST	cmd_no is out of range.
573  */
574 int
575 tx_cmd_prop(tx_commit_data_t *tx_data, size_t cmd_no, const char **pname)
576 {
577 	struct tx_cmd *cur;
578 
579 	assert(cmd_no < tx_data->txc_count);
580 	if (cmd_no >= tx_data->txc_count)
581 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
582 
583 	cur = &tx_data->txc_cmds[cmd_no];
584 	*pname = cur->tx_prop;
585 	return (REP_PROTOCOL_SUCCESS);
586 }
587 
588 /*
589  * Return the property type of the property whose command number is
590  * cmd_no.  The property type is returned to *ptype.
591  *
592  * Returns:
593  *	_FAIL_BAD_REQUEST	cmd_no is out of range.
594  */
595 int
596 tx_cmd_prop_type(tx_commit_data_t *tx_data, size_t cmd_no, uint32_t *ptype)
597 {
598 	struct tx_cmd *cur;
599 
600 	assert(cmd_no < tx_data->txc_count);
601 	if (cmd_no >= tx_data->txc_count)
602 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
603 
604 	cur = &tx_data->txc_cmds[cmd_no];
605 	*ptype = cur->tx_cmd->rptc_type;
606 	return (REP_PROTOCOL_SUCCESS);
607 }
608 
609 /*
610  * This function is used to retrieve a property value from the transaction
611  * data.  val_no specifies which value is to be retrieved from the
612  * transaction command whose number is cmd_no.  A pointer to the specified
613  * value is placed in *val.
614  *
615  * Returns:
616  *	_FAIL_BAD_REQUEST	cmd_no or val_no is out of range.
617  */
618 int
619 tx_cmd_value(tx_commit_data_t *tx_data, size_t cmd_no, uint32_t val_no,
620     const char **val)
621 {
622 	const char *bp;
623 	struct tx_cmd *cur;
624 	uint32_t i;
625 	uint32_t value_len;
626 
627 	assert(cmd_no < tx_data->txc_count);
628 	if (cmd_no >= tx_data->txc_count)
629 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
630 
631 	cur = &tx_data->txc_cmds[cmd_no];
632 	assert(val_no < cur->tx_nvalues);
633 	if (val_no >= cur->tx_nvalues)
634 		return (REP_PROTOCOL_FAIL_BAD_REQUEST);
635 
636 	/* Find the correct value */
637 	bp = (char *)cur->tx_values;
638 	for (i = 0; i < val_no; i++) {
639 		/* LINTED alignment */
640 		value_len = *(uint32_t *)bp;
641 		bp += sizeof (uint32_t) + TX_SIZE(value_len);
642 	}
643 
644 	/* Bypass the count & return pointer to value. */
645 	bp += sizeof (uint32_t);
646 	*val = bp;
647 	return (REP_PROTOCOL_SUCCESS);
648 }
649 
650 int
651 object_tx_commit(rc_node_lookup_t *lp, tx_commit_data_t *data, uint32_t *gen)
652 {
653 	uint32_t new_gen;
654 	int ret;
655 	rep_protocol_responseid_t r;
656 	backend_tx_t *tx;
657 	backend_query_t *q;
658 	int backend = lp->rl_backend;
659 
660 	ret = backend_tx_begin(backend, &tx);
661 	if (ret != REP_PROTOCOL_SUCCESS)
662 		return (ret);
663 
664 	/* Make sure the pg is up-to-date. */
665 	data->txc_oldgen = *gen;
666 	data->txc_backend = backend;
667 	data->txc_result = REP_PROTOCOL_FAIL_NOT_FOUND;
668 
669 	q = backend_query_alloc();
670 	backend_query_add(q, "SELECT pg_gen_id FROM pg_tbl WHERE (pg_id = %d);",
671 	    lp->rl_main_id);
672 	r = backend_tx_run(tx, q, tx_check_genid, data);
673 	backend_query_free(q);
674 
675 	if (r != REP_PROTOCOL_SUCCESS ||
676 	    (r = data->txc_result) != REP_PROTOCOL_SUCCESS) {
677 		backend_tx_rollback(tx);
678 		goto end;
679 	}
680 
681 	/* If the transaction is empty, cut out early. */
682 	if (data->txc_count == 0) {
683 		backend_tx_rollback(tx);
684 		r = REP_PROTOCOL_DONE;
685 		goto end;
686 	}
687 
688 	new_gen = backend_new_id(tx, BACKEND_ID_GENERATION);
689 	if (new_gen == 0) {
690 		backend_tx_rollback(tx);
691 		return (REP_PROTOCOL_FAIL_NO_RESOURCES);
692 	}
693 
694 	data->txc_pg_id = lp->rl_main_id;
695 	data->txc_gen = new_gen;
696 	data->txc_tx = tx;
697 
698 	r = backend_tx_run_update(tx,
699 	    "UPDATE pg_tbl SET pg_gen_id = %d "
700 	    "    WHERE (pg_id = %d AND pg_gen_id = %d);",
701 	    new_gen, lp->rl_main_id, *gen);
702 
703 	if (r != REP_PROTOCOL_SUCCESS) {
704 		backend_tx_rollback(tx);
705 		goto end;
706 	}
707 
708 	q = backend_query_alloc();
709 
710 	backend_query_add(q,
711 	    "SELECT lnk_prop_name, lnk_prop_type, lnk_val_id "
712 	    "FROM prop_lnk_tbl "
713 	    "WHERE (lnk_pg_id = %d AND lnk_gen_id = %d)",
714 	    lp->rl_main_id, *gen);
715 
716 	data->txc_inserts = backend_query_alloc();
717 	r = backend_tx_run(tx, q, tx_process_property, data);
718 	backend_query_free(q);
719 
720 	if (r == REP_PROTOCOL_DONE)
721 		r = REP_PROTOCOL_FAIL_UNKNOWN;		/* corruption */
722 
723 	if (r != REP_PROTOCOL_SUCCESS ||
724 	    (r = data->txc_result) != REP_PROTOCOL_SUCCESS) {
725 		backend_query_free(data->txc_inserts);
726 		backend_tx_rollback(tx);
727 		goto end;
728 	}
729 
730 	r = backend_tx_run(tx, data->txc_inserts, NULL, NULL);
731 	backend_query_free(data->txc_inserts);
732 
733 	if (r != REP_PROTOCOL_SUCCESS) {
734 		backend_tx_rollback(tx);
735 		goto end;
736 	}
737 
738 	r = tx_process_cmds(data);
739 	if (r != REP_PROTOCOL_SUCCESS) {
740 		backend_tx_rollback(tx);
741 		goto end;
742 	}
743 	r = backend_tx_commit(tx);
744 
745 	if (r == REP_PROTOCOL_SUCCESS)
746 		*gen = new_gen;
747 end:
748 	return (r);
749 }
750