xref: /titanic_51/usr/src/uts/common/avs/ns/rdc/rdc_dev.c (revision 3270659f55e0928d6edec3d26217cc29398a8149)
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */


#include <sys/types.h>
#include <sys/ksynch.h>
#include <sys/kmem.h>
#include <sys/errno.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/cred.h>
#include <sys/file.h>
#include <sys/ddi.h>
#include <sys/nsc_thread.h>
#include <sys/unistat/spcs_s.h>
#include <sys/unistat/spcs_errors.h>

#include <sys/unistat/spcs_s_k.h>
#ifdef DS_DDICT
#include "../contract.h"
#endif

#include <sys/nsctl/nsctl.h>

#include <sys/sdt.h>		/* dtrace is S10 or later */

#include "rdc.h"
#include "rdc_io.h"
#include "rdc_bitmap.h"

/*
 * Remote Dual Copy
 *
 * This file contains the nsctl io provider functionality for RDC.
 *
 * RDC is implemented as a simple filter module that pushes itself between
 * user (SIMCKD, STE, etc.) and SDBC.
 */


static int _rdc_open_count;
int	rdc_eio_nobmp = 0;

nsc_io_t *_rdc_io_hc;
static nsc_io_t *_rdc_io_hr;
static nsc_def_t _rdc_fd_def[], _rdc_io_def[], _rdc_ior_def[];

void _rdc_deinit_dev();
int rdc_diskq_enqueue(rdc_k_info_t *, rdc_aio_t *);
extern void rdc_unintercept_diskq(rdc_group_t *);
rdc_aio_t *rdc_aio_tbuf_get(void *, void *, int, int, int, int, int);

static nsc_buf_t *_rdc_alloc_handle(void (*)(), void (*)(),
    void (*)(), rdc_fd_t *);
static int _rdc_free_handle(rdc_buf_t *, rdc_fd_t *);

#ifdef DEBUG
int	rdc_overlap_cnt;
int	rdc_overlap_hnd_cnt;
#endif

static rdc_info_dev_t *rdc_devices;

extern int _rdc_rsrv_diskq(rdc_group_t *group);
extern void _rdc_rlse_diskq(rdc_group_t *group);

/*
 * _rdc_init_dev
 *	Initialise the io provider.
 */

int
_rdc_init_dev()
{
	_rdc_io_hc = nsc_register_io("rdc-high-cache",
	    NSC_RDCH_ID|NSC_REFCNT|NSC_FILTER, _rdc_io_def);
	if (_rdc_io_hc == NULL)
		cmn_err(CE_WARN, "!rdc: nsc_register_io (high, cache) failed.");

	_rdc_io_hr = nsc_register_io("rdc-high-raw",
	    NSC_RDCHR_ID|NSC_REFCNT|NSC_FILTER, _rdc_ior_def);
	if (_rdc_io_hr == NULL)
		cmn_err(CE_WARN, "!rdc: nsc_register_io (high, raw) failed.");

	if (!_rdc_io_hc || !_rdc_io_hr) {
		_rdc_deinit_dev();
		return (ENOMEM);
	}

	return (0);
}


/*
 * _rdc_deinit_dev
 *	De-initialise the io provider.
 *
 */

void
_rdc_deinit_dev()
{
	int rc;

	if (_rdc_io_hc) {
		if ((rc = nsc_unregister_io(_rdc_io_hc, 0)) != 0)
			cmn_err(CE_WARN,
			    "!rdc: nsc_unregister_io (high, cache) failed: %d",
			    rc);
	}

	if (_rdc_io_hr) {
		if ((rc = nsc_unregister_io(_rdc_io_hr, 0)) != 0)
			cmn_err(CE_WARN,
			    "!rdc: nsc_unregister_io (high, raw) failed: %d",
			    rc);
	}
}
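
/*
 * Example (illustrative sketch only, not part of the original source):
 * the expected pairing of the two calls above from the module
 * load/unload path.  The _init/_fini names are the standard kernel
 * module entry points; the exact call sites in SNDR may differ.
 *
 *	int
 *	_init(void)
 *	{
 *		int rc;
 *
 *		if ((rc = _rdc_init_dev()) != 0)
 *			return (rc);	// neither io provider registered
 *		...
 *		return (0);
 *	}
 *
 *	int
 *	_fini(void)
 *	{
 *		...
 *		_rdc_deinit_dev();	// safe even after partial failure
 *		return (0);
 *	}
 */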


/*
 * rdc_idev_open
 * - Open the nsctl file descriptors for the data devices.
 *
 * Must be called with rdc_conf_lock held.
 * id_sets is protected by rdc_conf_lock.
 */
static rdc_info_dev_t *
rdc_idev_open(rdc_k_info_t *krdc, char *pathname, int *rc)
{
	rdc_info_dev_t *dp;

	ASSERT(MUTEX_HELD(&rdc_conf_lock));

	for (dp = rdc_devices; dp; dp = dp->id_next) {
		if (dp->id_cache_dev.bi_fd &&
		    strcmp(pathname, nsc_pathname(dp->id_cache_dev.bi_fd)) == 0)
			break;
	}

	if (!dp) {
		dp = kmem_zalloc(sizeof (*dp), KM_SLEEP);
		if (!dp)
			return (NULL);

		dp->id_cache_dev.bi_krdc = krdc;
		dp->id_cache_dev.bi_fd = nsc_open(pathname,
		    NSC_RDCHR_ID|NSC_RDWR|NSC_DEVICE,
		    _rdc_fd_def, (blind_t)&dp->id_cache_dev, rc);
		if (!dp->id_cache_dev.bi_fd) {
			kmem_free(dp, sizeof (*dp));
			return (NULL);
		}

		dp->id_raw_dev.bi_krdc = krdc;
		dp->id_raw_dev.bi_fd = nsc_open(pathname,
		    NSC_RDCHR_ID|NSC_RDWR|NSC_DEVICE,
		    _rdc_fd_def, (blind_t)&dp->id_raw_dev, rc);
		if (!dp->id_raw_dev.bi_fd) {
			(void) nsc_close(dp->id_cache_dev.bi_fd);
			kmem_free(dp, sizeof (*dp));
			return (NULL);
		}

		mutex_init(&dp->id_rlock, NULL, MUTEX_DRIVER, NULL);
		cv_init(&dp->id_rcv, NULL, CV_DRIVER, NULL);

		dp->id_next = rdc_devices;
		rdc_devices = dp;
	}

	dp->id_sets++;
	return (dp);
}


/*
 * rdc_idev_close
 * - Close the nsctl file descriptors for the data devices.
 *
 * Must be called with rdc_conf_lock and dp->id_rlock held.
 * Will release dp->id_rlock before returning.
 *
 * id_sets is protected by rdc_conf_lock.
 */
static void
rdc_idev_close(rdc_k_info_t *krdc, rdc_info_dev_t *dp)
{
	rdc_info_dev_t **dpp;
#ifdef DEBUG
	int count = 0;
#endif

	ASSERT(MUTEX_HELD(&rdc_conf_lock));
	ASSERT(MUTEX_HELD(&dp->id_rlock));

	dp->id_sets--;
	if (dp->id_sets > 0) {
		mutex_exit(&dp->id_rlock);
		return;
	}

	/* external references must have gone */
	ASSERT((krdc->c_ref + krdc->r_ref + krdc->b_ref) == 0);

	/* unlink from chain */

	for (dpp = &rdc_devices; *dpp; dpp = &((*dpp)->id_next)) {
		if (*dpp == dp) {
			/* unlink */
			*dpp = dp->id_next;
			break;
		}
	}

	/*
	 * Wait for all reserves to go away - the rpc server is
	 * running asynchronously with this close, and so we
	 * have to wait for it to spot that the krdc is !IS_ENABLED()
	 * and throw away the nsc_buf_t's that it has allocated
	 * and release the device.
	 */

	while (IS_CRSRV(krdc) || IS_RRSRV(krdc)) {
#ifdef DEBUG
		if (!(++count % 16)) {
			cmn_err(CE_NOTE,
			    "!_rdc_idev_close(%s): waiting for nsc_release",
			    rdc_u_info[krdc->index].primary.file);
		}
		if (count > (16*20)) {
			/* waited for 20 seconds - too long - panic */
			cmn_err(CE_PANIC,
			    "!_rdc_idev_close(%s, %p): lost nsc_release",
			    rdc_u_info[krdc->index].primary.file, (void *)krdc);
		}
#endif
		mutex_exit(&dp->id_rlock);
		delay(HZ>>4);
		mutex_enter(&dp->id_rlock);
	}

	if (dp->id_cache_dev.bi_fd) {
		(void) nsc_close(dp->id_cache_dev.bi_fd);
		dp->id_cache_dev.bi_fd = NULL;
	}

	if (dp->id_raw_dev.bi_fd) {
		(void) nsc_close(dp->id_raw_dev.bi_fd);
		dp->id_raw_dev.bi_fd = NULL;
	}

	mutex_exit(&dp->id_rlock);
	mutex_destroy(&dp->id_rlock);
	cv_destroy(&dp->id_rcv);

	kmem_free(dp, sizeof (*dp));
}


/*
 * This function provokes an nsc_reserve() for the device which
 * if successful will populate krdc->maxfbas and urdc->volume_size
 * via the _rdc_attach_fd() callback.
 */
void
rdc_get_details(rdc_k_info_t *krdc)
{
	int rc;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	nsc_size_t vol_size, maxfbas;

	if (_rdc_rsrv_devs(krdc, RDC_RAW, RDC_INTERNAL) == 0) {
		/*
		 * if the vol is already reserved,
		 * volume_size won't be populated on enable because
		 * it is a *fake* reserve and does not make it to
		 * _rdc_attach_fd(). So do it here.
		 */
		rc = nsc_partsize(RDC_U_FD(krdc), &vol_size);
		if (rc != 0) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!rdc_get_details: partsize failed (%d)", rc);
#endif /* DEBUG */
			urdc->volume_size = vol_size = 0;
		}

		urdc->volume_size = vol_size;
		rc = nsc_maxfbas(RDC_U_FD(krdc), 0, &maxfbas);
		if (rc != 0) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!rdc_get_details: maxfbas failed (%d)", rc);
#endif /* DEBUG */
			maxfbas = 0;
		}
		krdc->maxfbas = min(RDC_MAX_MAXFBAS, maxfbas);

		_rdc_rlse_devs(krdc, RDC_RAW);
	}
}


/*
 * Should only be used by the config code.
 */

int
rdc_dev_open(rdc_set_t *rdc_set, int options)
{
	rdc_k_info_t *krdc;
	int index;
	int rc;
	char *pathname;

	ASSERT(MUTEX_HELD(&rdc_conf_lock));

	if (options & RDC_OPT_PRIMARY)
		pathname = rdc_set->primary.file;
	else
		pathname = rdc_set->secondary.file;

	for (index = 0; index < rdc_max_sets; index++) {
		krdc = &rdc_k_info[index];

		if (!IS_CONFIGURED(krdc))
			break;
	}

	if (index == rdc_max_sets) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_dev_open: out of cd\'s");
#endif
		index = -EINVAL;
		goto out;
	}

	if (krdc->devices && (krdc->c_fd || krdc->r_fd)) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_dev_open: %s already open", pathname);
#endif
		index = -EINVAL;
		goto out;
	}

	_rdc_open_count++;

	krdc->devices = rdc_idev_open(krdc, pathname, &rc);
	if (!krdc->devices) {
		index = -rc;
		goto open_fail;
	}

	/*
	 * Grab the device size and maxfbas now.
	 */

	rdc_get_details(krdc);

out:
	return (index);

open_fail:
	_rdc_open_count--;

	return (index);
}
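
/*
 * Example (illustrative sketch only, not part of the original source):
 * how config code might pair rdc_dev_open() with rdc_dev_close().
 * Note that rdc_dev_open() expects rdc_conf_lock to be held, while
 * rdc_dev_close() takes it internally.
 *
 *	mutex_enter(&rdc_conf_lock);
 *	index = rdc_dev_open(rdc_set, options);
 *	mutex_exit(&rdc_conf_lock);
 *
 *	if (index < 0)
 *		return (-index);	// errno encoded as negative index
 *	...
 *	rdc_dev_close(&rdc_k_info[index]);
 */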


void
rdc_dev_close(rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

	mutex_enter(&rdc_conf_lock);

	if (krdc->devices)
		mutex_enter(&krdc->devices->id_rlock);

#ifdef DEBUG
	if (!krdc->devices || !krdc->c_fd || !krdc->r_fd) {
		cmn_err(CE_WARN,
		    "!rdc_dev_close(%p): c_fd %p r_fd %p", (void *)krdc,
		    (void *) (krdc->devices ? krdc->c_fd : 0),
		    (void *) (krdc->devices ? krdc->r_fd : 0));
	}
#endif

	if (krdc->devices) {
		/* rdc_idev_close will release id_rlock */
		rdc_idev_close(krdc, krdc->devices);
		krdc->devices = NULL;
	}

	urdc->primary.file[0] = '\0';

	if (_rdc_open_count <= 0) {
		cmn_err(CE_WARN, "!rdc: _rdc_open_count corrupt: %d",
		    _rdc_open_count);
	}

	_rdc_open_count--;

	mutex_exit(&rdc_conf_lock);
}


/*
 * rdc_intercept
 *
 * Register for IO on this device with nsctl.
 *
 * For a 1-to-many primary we register for each krdc and let nsctl sort
 * out which it wants to be using. This means that we cannot tell which
 * krdc will receive the incoming io from nsctl, though we do know that
 * at any one time only one krdc will be 'attached' and so get io from
 * nsctl.
 *
 * So the krdc->many_next pointer is maintained as a circular list. The
 * result of these multiple nsc_register_paths is that we will see a
 * few more attach and detach io provider calls during enable/resume
 * and disable/suspend of the 1-to-many whilst nsctl settles down to
 * using a single krdc.
 *
 * The major advantage of this scheme is that nsctl sorts out all the
 * rdc_fd_t's so that they can only point to krdc's that are currently
 * active.
 */
int
rdc_intercept(rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	char *pathname;
	char *bitmap;

	if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
		pathname = urdc->primary.file;
		bitmap = urdc->primary.bitmap;
	} else {
		pathname = urdc->secondary.file;
		bitmap = urdc->secondary.bitmap;
	}

	if (!krdc->b_tok)
		krdc->b_tok = nsc_register_path(bitmap, NSC_CACHE | NSC_DEVICE,
		    _rdc_io_hc);

	if (!krdc->c_tok)
		krdc->c_tok = nsc_register_path(pathname, NSC_CACHE,
		    _rdc_io_hc);

	if (!krdc->r_tok)
		krdc->r_tok = nsc_register_path(pathname, NSC_DEVICE,
		    _rdc_io_hr);

	if (!krdc->c_tok || !krdc->r_tok) {
		(void) rdc_unintercept(krdc);
		return (ENXIO);
	}

	return (0);
}
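
/*
 * Example (illustrative sketch only, not part of the original source):
 * the enable/resume path is expected to pair rdc_intercept() with
 * rdc_unintercept() on the disable/suspend path, once per krdc in a
 * 1-to-many chain.
 *
 *	if ((rc = rdc_intercept(krdc)) != 0) {
 *		// registration failed; this set cannot see any io
 *		return (rc);
 *	}
 *	...
 *	(void) rdc_unintercept(krdc);	// quiesces io and deregisters
 */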


static void
wait_unregistering(rdc_k_info_t *krdc)
{
	while (krdc->group->unregistering > 0)
		(void) cv_wait_sig(&krdc->group->unregistercv, &rdc_conf_lock);
}

static void
set_unregistering(rdc_k_info_t *krdc)
{
	wait_unregistering(krdc);

	krdc->group->unregistering++;
}

static void
wakeup_unregistering(rdc_k_info_t *krdc)
{
	if (krdc->group->unregistering <= 0)
		return;

	krdc->group->unregistering--;
	cv_broadcast(&krdc->group->unregistercv);
}


/*
 * rdc_unintercept
 *
 * Unregister for IO on this device.
 *
 * See comments above rdc_intercept.
 */
int
rdc_unintercept(rdc_k_info_t *krdc)
{
	int err = 0;
	int rc;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

	mutex_enter(&rdc_conf_lock);
	set_unregistering(krdc);
	krdc->type_flag |= RDC_UNREGISTER;
	mutex_exit(&rdc_conf_lock);

	if (krdc->r_tok) {
		rc = nsc_unregister_path(krdc->r_tok, 0);
		if (rc) {
			cmn_err(CE_WARN, "!rdc: unregister rawfd %d", rc);
			err = rc;
		}
		krdc->r_tok = NULL;
	}

	if (krdc->c_tok) {
		rc = nsc_unregister_path(krdc->c_tok, 0);
		if (rc) {
			cmn_err(CE_WARN, "!rdc: unregister cachefd %d", rc);
			if (!err)
				err = rc;
		}
		krdc->c_tok = NULL;
	}

	if (krdc->b_tok) {
		rc = nsc_unregister_path(krdc->b_tok, 0);
		if (rc) {
			cmn_err(CE_WARN, "!rdc: unregister bitmap %d", rc);
			err = rc;
		}
		krdc->b_tok = NULL;
	}

	rdc_group_enter(krdc);

	/* Wait for all necessary _rdc_close() calls to complete */
	while ((krdc->c_ref + krdc->r_ref + krdc->b_ref) != 0) {
		krdc->closing++;
		cv_wait(&krdc->closingcv, &krdc->group->lock);
		krdc->closing--;
	}

	rdc_clr_flags(urdc, RDC_ENABLED);
	rdc_group_exit(krdc);


	/*
	 * Check that there are no outstanding writes in progress.
	 * This can happen when a set that is part of a 'one_to_many'
	 * chain, and that did not intercept the original write call,
	 * is being disabled.
	 */

	for (;;) {
		rdc_group_enter(krdc);
		if (krdc->aux_state & RDC_AUXWRITE) {
			rdc_group_exit(krdc);
			/*
			 * This doesn't happen very often,
			 * just delay a bit and re-look.
			 */
			delay(50);
		} else {
			rdc_group_exit(krdc);
			break;
		}
	}

	mutex_enter(&rdc_conf_lock);
	krdc->type_flag &= ~RDC_UNREGISTER;
	wakeup_unregistering(krdc);
	mutex_exit(&rdc_conf_lock);

	return (err);
}


/*
 * _rdc_rlse_d
 *	Internal version of _rdc_rlse_devs(), only concerned with the
 *	data device, not the bitmap.
 */

static void
_rdc_rlse_d(rdc_k_info_t *krdc, int devs)
{
	_rdc_info_dev_t *cip;
	_rdc_info_dev_t *rip;
	int raw = (devs & RDC_RAW);

	if (!krdc) {
		cmn_err(CE_WARN, "!rdc: _rdc_rlse_devs null krdc");
		return;
	}

	ASSERT((devs & (~RDC_BMP)) != 0);

	cip = &krdc->devices->id_cache_dev;
	rip = &krdc->devices->id_raw_dev;

	if (IS_RSRV(cip)) {
		/* decrement count */

		if (raw) {
			if (cip->bi_ofailed > 0) {
				cip->bi_ofailed--;
			} else if (cip->bi_orsrv > 0) {
				cip->bi_orsrv--;
			}
		} else {
			if (cip->bi_failed > 0) {
				cip->bi_failed--;
			} else if (cip->bi_rsrv > 0) {
				cip->bi_rsrv--;
			}
		}

		/*
		 * reset nsc_fd ownership back link, it is only set if
		 * we have really done an underlying reserve, not for
		 * failed (faked) reserves.
		 */

		if (cip->bi_rsrv > 0 || cip->bi_orsrv > 0) {
			nsc_set_owner(cip->bi_fd, krdc->iodev);
		} else {
			nsc_set_owner(cip->bi_fd, NULL);
		}

		/* release nsc_fd */

		if (!IS_RSRV(cip)) {
			nsc_release(cip->bi_fd);
		}
	} else if (IS_RSRV(rip)) {
		/* decrement count */

		if (raw) {
			if (rip->bi_failed > 0) {
				rip->bi_failed--;
			} else if (rip->bi_rsrv > 0) {
				rip->bi_rsrv--;
			}
		} else {
			if (rip->bi_ofailed > 0) {
				rip->bi_ofailed--;
			} else if (rip->bi_orsrv > 0) {
				rip->bi_orsrv--;
			}
		}

		/*
		 * reset nsc_fd ownership back link, it is only set if
		 * we have really done an underlying reserve, not for
		 * failed (faked) reserves.
		 */

		if (rip->bi_rsrv > 0 || rip->bi_orsrv > 0) {
			nsc_set_owner(rip->bi_fd, krdc->iodev);
		} else {
			nsc_set_owner(rip->bi_fd, NULL);
		}

		/* release nsc_fd and any waiters */

		if (!IS_RSRV(rip)) {
			rip->bi_flag = 0;
			nsc_release(rip->bi_fd);
			cv_broadcast(&krdc->devices->id_rcv);
		}
	} else {
		cmn_err(CE_WARN, "!rdc: _rdc_rlse_devs no reserve? krdc %p",
		    (void *) krdc);
	}
}

/*
 * _rdc_rlse_devs
 *	Release named underlying devices and take care of setting the
 *	back link on the nsc_fd to the correct parent iodev.
 *
 *	NOTE: the 'devs' argument must be the same as that passed to
 *	the preceding _rdc_rsrv_devs call.
 */

void
_rdc_rlse_devs(rdc_k_info_t *krdc, int devs)
{

	DTRACE_PROBE(_rdc_rlse_devs_start);
	mutex_enter(&krdc->devices->id_rlock);

	ASSERT(!(devs & RDC_CACHE));

	if ((devs & (~RDC_BMP)) != 0) {
		_rdc_rlse_d(krdc, devs);
	}

	if ((devs & RDC_BMP) != 0) {
		if (krdc->bmaprsrv > 0 && --krdc->bmaprsrv == 0) {
			nsc_release(krdc->bitmapfd);
		}
	}

	mutex_exit(&krdc->devices->id_rlock);

}

/*
 * _rdc_rsrv_d
 *	Reserve the flagged device, unless its companion is already
 *	reserved, in which case increase the reserve count on the
 *	companion.  Take care of setting the nsc_fd ownership back
 *	link to the correct parent iodev pointer.
 */

static int
_rdc_rsrv_d(int raw, _rdc_info_dev_t *rid, _rdc_info_dev_t *cid, int flag,
    rdc_k_info_t *krdc)
{
	_rdc_info_dev_t *p = NULL;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	int other = 0;
	int rc;


#ifdef DEBUG
	if ((rid->bi_rsrv < 0) ||
	    (cid->bi_rsrv < 0) ||
	    (rid->bi_orsrv < 0) ||
	    (cid->bi_orsrv < 0) ||
	    (rid->bi_failed < 0) ||
	    (cid->bi_failed < 0) ||
	    (rid->bi_ofailed < 0) ||
	    (cid->bi_ofailed < 0)) {
		cmn_err(CE_WARN,
		    "!_rdc_rsrv_d: negative counts (rsrv %d %d orsrv %d %d)",
		    rid->bi_rsrv, cid->bi_rsrv,
		    rid->bi_orsrv, cid->bi_orsrv);
		cmn_err(CE_WARN,
		    "!_rdc_rsrv_d: negative counts (fail %d %d ofail %d %d)",
		    rid->bi_failed, cid->bi_failed,
		    rid->bi_ofailed, cid->bi_ofailed);
		cmn_err(CE_PANIC, "_rdc_rsrv_d: negative counts (krdc %p)",
		    (void *) krdc);
	}
#endif

	/*
	 * If user wants to do a cache reserve and it's already
	 * raw reserved internally, we need to do a real nsc_reserve, so wait
	 * until the release has been done.
	 */
	if (IS_RSRV(rid) && (flag == RDC_EXTERNAL) &&
	    (raw == 0) && (rid->bi_flag != RDC_EXTERNAL)) {
		krdc->devices->id_release++;
		while (IS_RSRV(rid))
			cv_wait(&krdc->devices->id_rcv,
			    &krdc->devices->id_rlock);
		krdc->devices->id_release--;
	}

	/* select underlying device to use */

	if (IS_RSRV(rid)) {
		p = rid;
		if (!raw) {
			other = 1;
		}
	} else if (IS_RSRV(cid)) {
		p = cid;
		if (raw) {
			other = 1;
		}
	}

	/* just increment count and return if already reserved */

	if (p && !RFAILED(p)) {
		if (other) {
			p->bi_orsrv++;
		} else {
			p->bi_rsrv++;
		}

		/* set nsc_fd ownership back link */
		nsc_set_owner(p->bi_fd, krdc->iodev);
		return (0);
	}

	/* attempt reserve */

	if (!p) {
		p = raw ? rid : cid;
	}

	if (!p->bi_fd) {
		/* rpc server raced with rdc_dev_close() */
		return (EIO);
	}
	if ((rc = nsc_reserve(p->bi_fd, 0)) == 0) {
		/*
		 * convert failed counts into reserved counts, and add
		 * in this reserve.
		 */

		p->bi_orsrv = p->bi_ofailed;
		p->bi_rsrv = p->bi_failed;

		if (other) {
			p->bi_orsrv++;
		} else {
			p->bi_rsrv++;
		}

		p->bi_ofailed = 0;
		p->bi_failed = 0;

		/* set nsc_fd ownership back link */

		nsc_set_owner(p->bi_fd, krdc->iodev);
	} else if (rc != EINTR) {
		/*
		 * If this is the master, and the secondary is not
		 * failed, then just fake this external reserve so that
		 * we can do remote io to the secondary and continue to
		 * provide service to the client.
		 *
		 * Subsequent calls to _rdc_rsrv_d() will re-try the
		 * nsc_reserve() until it succeeds.
		 */

		if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
		    !(rdc_get_vflags(urdc) & RDC_LOGGING) &&
		    !((rdc_get_vflags(urdc) & RDC_SLAVE) &&
		    (rdc_get_vflags(urdc) & RDC_SYNCING))) {
			if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
				rdc_many_enter(krdc);
				/* Primary, so reverse sync needed */
				rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
				rdc_set_flags_log(urdc, RDC_VOL_FAILED,
				    "nsc_reserve failed");
				rdc_many_exit(krdc);
				rc = -1;
#ifdef DEBUG
				cmn_err(CE_NOTE, "!nsc_reserve failed "
				    "with rc == %d\n", rc);
#endif
			} else {
				rc = 0;
			}

			if (other) {
				p->bi_ofailed++;
			} else {
				p->bi_failed++;
			}

			if (krdc->maxfbas == 0) {
				/*
				 * fake a maxfbas value for remote i/o,
				 * this will get reset when the next
				 * successful reserve happens as part
				 * of the rdc_attach_fd() callback.
				 */
				krdc->maxfbas = 128;
			}
		}
	}

	if (rc == 0 && raw) {
		p->bi_flag = flag;
	}


	return (rc);
}

/*
 * _rdc_rsrv_devs
 *	Reserve named underlying devices.
 *
 */

int
_rdc_rsrv_devs(rdc_k_info_t *krdc, int devs, int flag)
{
	rdc_u_info_t *urdc;
	int write = 0;
	int rc = 0;
	int got = 0;

	if (!krdc) {
		return (EINVAL);
	}

	urdc = &rdc_u_info[krdc->index];

	ASSERT(!(devs & RDC_CACHE));

	mutex_enter(&krdc->devices->id_rlock);

	if ((devs & (~RDC_BMP)) != 0) {
		if ((rc = _rdc_rsrv_d((devs & RDC_CACHE) == 0,
		    &krdc->devices->id_raw_dev, &krdc->devices->id_cache_dev,
		    flag, krdc)) != 0) {
			if (rc == -1) {
				/*
				 * we need to call rdc_write_state()
				 * after we drop the mutex
				 */
				write = 1;
				rc = 0;
			} else {
				cmn_err(CE_WARN,
				    "!rdc: nsc_reserve(%s) failed %d\n",
				    nsc_pathname(krdc->c_fd), rc);
			}
		} else {
			got |= (devs & (~RDC_BMP));
		}
	}

	if (rc == 0 && (devs & RDC_BMP) != 0) {
		if (krdc->bitmapfd == NULL)
			rc = EIO;
		else if ((krdc->bmaprsrv == 0) &&
		    (rc = nsc_reserve(krdc->bitmapfd, 0)) != 0) {
			cmn_err(CE_WARN, "!rdc: nsc_reserve(%s) failed %d\n",
			    nsc_pathname(krdc->bitmapfd), rc);
		} else {
			krdc->bmaprsrv++;
			got |= RDC_BMP;
		}
		if (!RDC_SUCCESS(rc)) {
			/* Undo any previous reserve */
			if (got != 0)
				_rdc_rlse_d(krdc, got);
		}
	}

	mutex_exit(&krdc->devices->id_rlock);

	if (write) {
		rdc_write_state(urdc);
	}

	return (rc);
}
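
/*
 * Example (illustrative sketch only, not part of the original source):
 * internal callers bracket io to the underlying devices with a
 * reserve/release pair, passing the same 'devs' mask to both calls
 * (see the NOTE above _rdc_rlse_devs).
 *
 *	if (_rdc_rsrv_devs(krdc, RDC_RAW, RDC_INTERNAL) == 0) {
 *		... do io via RDC_U_FD(krdc) ...
 *		_rdc_rlse_devs(krdc, RDC_RAW);
 *	}
 */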


/*
 * Read from the remote end, ensuring that if this is a many group in
 * slave mode that we only remote read from the secondary with the
 * valid data.
 */
int
_rdc_remote_read(rdc_k_info_t *krdc, nsc_buf_t *h, nsc_off_t pos,
    nsc_size_t len, int flag)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	rdc_k_info_t *this = krdc;	/* krdc that was requested */
	int rc;

	if (flag & NSC_RDAHEAD) {
		/*
		 * no point in doing readahead remotely,
		 * just say we did it ok - the client is about to
		 * throw this buffer away as soon as we return.
		 */
		return (NSC_DONE);
	}

	/*
	 * If this is a many group with a reverse sync in progress and
	 * this is not the slave krdc/urdc, then search for the slave
	 * so that we can do the remote io from the correct secondary.
	 */
	if ((rdc_get_mflags(urdc) & RDC_SLAVE) &&
	    !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
		rdc_many_enter(krdc);
		for (krdc = krdc->many_next; krdc != this;
		    krdc = krdc->many_next) {
			urdc = &rdc_u_info[krdc->index];
			if (!IS_ENABLED(urdc))
				continue;
			if (rdc_get_vflags(urdc) & RDC_SLAVE)
				break;
		}
		rdc_many_exit(krdc);

		this = krdc;
	}

read1:
	if (rdc_get_vflags(urdc) & RDC_LOGGING) {
		/* cannot do remote io without the remote node! */
		rc = ENETDOWN;
		goto read2;
	}


	/* wait for the remote end to have the latest data */

	if (IS_ASYNC(urdc)) {
		while (krdc->group->ra_queue.blocks != 0) {
			if (!krdc->group->rdc_writer)
				(void) rdc_writer(krdc->index);

			(void) rdc_drain_queue(krdc->index);
		}
	}

	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}

	rc = rdc_net_read(krdc->index, krdc->remote_index, h, pos, len);

	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}

	/* If read error keep trying every secondary until no more */
read2:
	if (!RDC_SUCCESS(rc) && IS_MANY(krdc) &&
	    !(rdc_get_mflags(urdc) & RDC_SLAVE)) {
		rdc_many_enter(krdc);
		for (krdc = krdc->many_next; krdc != this;
		    krdc = krdc->many_next) {
			urdc = &rdc_u_info[krdc->index];
			if (!IS_ENABLED(urdc))
				continue;
			rdc_many_exit(krdc);
			goto read1;
		}
		rdc_many_exit(krdc);
	}

	return (rc);
}


/*
 * _rdc_alloc_buf
 *	Allocate a buffer of data
 *
 * Calling/Exit State:
 *	Returns NSC_DONE or NSC_HIT for success, NSC_PENDING for async
 *	I/O, > 0 is an error code.
 *
 * Description:
 */
int rdcbufs = 0;

static int
_rdc_alloc_buf(rdc_fd_t *rfd, nsc_off_t pos, nsc_size_t len, int flag,
    rdc_buf_t **ptr)
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	nsc_vec_t *vec = NULL;
	rdc_buf_t *h;
	size_t size;
	int ioflag;
	int rc = 0;

	if (RDC_IS_BMP(rfd) || RDC_IS_QUE(rfd))
		return (EIO);

	if (len == 0)
		return (EINVAL);

	if (flag & NSC_WRBUF) {

		if (!(rdc_get_vflags(urdc) & RDC_PRIMARY) &&
		    !(rdc_get_vflags(urdc) & RDC_LOGGING)) {
			/*
			 * Forbid writes to secondary unless logging.
			 */
			return (EIO);
		}
	}

	if (!(rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    (rdc_get_vflags(urdc) & RDC_SYNC_NEEDED)) {
		/*
		 * Forbid any io to secondary if it needs a sync.
		 */
		return (EIO);
	}

	if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    (rdc_get_vflags(urdc) & RDC_RSYNC_NEEDED) &&
	    !(rdc_get_vflags(urdc) & RDC_VOL_FAILED) &&
	    !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
		/*
		 * Forbid any io to primary if it needs a reverse sync
		 * and is not actively syncing.
		 */
		return (EIO);
	}

	/* Bounds checking */
	ASSERT(urdc->volume_size != 0);
	if (pos + len > urdc->volume_size) {
#ifdef DEBUG
		cmn_err(CE_NOTE,
		    "!rdc: Attempt to access beyond end of rdc volume");
#endif
		return (EIO);
	}

	h = *ptr;
	if (h == NULL) {
		/* should never happen (nsctl does this for us) */
#ifdef DEBUG
		cmn_err(CE_WARN, "!_rdc_alloc_buf entered without buffer!");
#endif
		h = (rdc_buf_t *)_rdc_alloc_handle(NULL, NULL, NULL, rfd);
		if (h == NULL)
			return (ENOMEM);

		h->rdc_bufh.sb_flag &= ~NSC_HALLOCATED;
		*ptr = h;
	}

	if (flag & NSC_NOBLOCK) {
		cmn_err(CE_WARN,
		    "!_rdc_alloc_buf: removing unsupported NSC_NOBLOCK flag");
		flag &= ~(NSC_NOBLOCK);
	}

	h->rdc_bufh.sb_error = 0;
	h->rdc_bufh.sb_flag |= flag;
	h->rdc_bufh.sb_pos = pos;
	h->rdc_bufh.sb_len = len;
	ioflag = flag;

	bzero(&h->rdc_sync, sizeof (h->rdc_sync));
	mutex_init(&h->rdc_sync.lock, NULL, MUTEX_DRIVER, NULL);
	cv_init(&h->rdc_sync.cv, NULL, CV_DRIVER, NULL);

	if (flag & NSC_WRBUF)
		_rdc_async_throttle(krdc, len);	/* throttle incoming io */

	/*
	 * Use remote io when:
	 * - local volume is failed
	 * - reserve status is failed
	 */
	if ((rdc_get_vflags(urdc) & RDC_VOL_FAILED) || IS_RFAILED(krdc)) {
		rc = EIO;
	} else {
		rc = nsc_alloc_buf(RDC_U_FD(krdc), pos, len,
		    ioflag, &h->rdc_bufp);
		if (!RDC_SUCCESS(rc)) {
			rdc_many_enter(krdc);
			if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
				/* Primary, so reverse sync needed */
				rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
			} else {
				/* Secondary, so forward sync needed */
				rdc_set_flags(urdc, RDC_SYNC_NEEDED);
			}
			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
			    "nsc_alloc_buf failed");
			rdc_many_exit(krdc);
			rdc_write_state(urdc);
		}
	}

	if (RDC_SUCCESS(rc)) {
		h->rdc_bufh.sb_vec = h->rdc_bufp->sb_vec;
		h->rdc_flags |= RDC_ALLOC;

		/*
		 * If in slave and reading data, remote read on top of
		 * the buffer to ensure that we have the latest data.
		 */
		if ((flag & NSC_READ) &&
		    (rdc_get_vflags(urdc) & RDC_PRIMARY) &&
		    (rdc_get_mflags(urdc) & RDC_SLAVE)) {
			rc = _rdc_remote_read(krdc, &h->rdc_bufh,
			    pos, len, flag);
			/*
			 * Set NSC_MIXED so that the
			 * cache will throw away this buffer when we free
			 * it since we have combined data from multiple
			 * sources into a single buffer.
			 */
			h->rdc_bufp->sb_flag |= NSC_MIXED;
		}
	}
	/*
	 * If nsc_alloc_buf above failed, or the local volume, bitmap or
	 * reserve has failed, then we fill the buffer from the remote node.
	 */

	if ((!RDC_SUCCESS(rc)) && (rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    !(rdc_get_vflags(urdc) & RDC_LOGGING)) {
		if (flag & NSC_NODATA) {
			ASSERT(!(flag & NSC_READ));
			h->rdc_flags |= RDC_REMOTE_BUF;
			h->rdc_bufh.sb_vec = NULL;
		} else {
			size = sizeof (nsc_vec_t) * 2;
			h->rdc_vsize = size + FBA_SIZE(len);
			vec = kmem_zalloc(h->rdc_vsize, KM_SLEEP);

			if (!vec) {
				rc = ENOMEM;
				goto error;
			}

			/* single flat buffer */

			vec[0].sv_addr = (uchar_t *)vec + size;
			vec[0].sv_len  = FBA_SIZE(len);
			vec[0].sv_vme  = 0;

			/* null terminator */

			vec[1].sv_addr = NULL;
			vec[1].sv_len  = 0;
			vec[1].sv_vme  = 0;

			h->rdc_bufh.sb_vec = vec;
			h->rdc_flags |= RDC_REMOTE_BUF;
			h->rdc_flags |= RDC_VEC_ALLOC;
		}

		if (flag & NSC_READ) {
			rc = _rdc_remote_read(krdc, &h->rdc_bufh,
			    pos, len, flag);
		} else {
			rc = NSC_DONE;
		}
	}
error:
	if (!RDC_SUCCESS(rc)) {
		h->rdc_bufh.sb_error = rc;
	}

	return (rc);
}


/*
 * _rdc_free_buf
 */

static int
_rdc_free_buf(rdc_buf_t *h)
{
	int rc = 0;

	if (h->rdc_flags & RDC_ALLOC) {
		if (h->rdc_bufp) {
			rc = nsc_free_buf(h->rdc_bufp);
		}
		h->rdc_flags &= ~(RDC_ALLOC);

		if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!_rdc_free_buf(%p): nsc_free_buf(%p) returned %d",
			    (void *) h, (void *) h->rdc_bufp, rc);
#endif
			return (rc);
		}
	}

	if (h->rdc_flags & (RDC_REMOTE_BUF|RDC_VEC_ALLOC)) {
		if (h->rdc_flags & RDC_VEC_ALLOC) {
			kmem_free(h->rdc_bufh.sb_vec, h->rdc_vsize);
		}
		h->rdc_flags &= ~(RDC_REMOTE_BUF|RDC_VEC_ALLOC);
	}

	if (h->rdc_anon) {
		/* anon buffers still pending */
		DTRACE_PROBE1(rdc_free_buf_err, aio_buf_t, h->rdc_anon);
	}

	if ((h->rdc_bufh.sb_flag & NSC_HALLOCATED) == 0) {
		rc = _rdc_free_handle(h, h->rdc_fd);
		if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!_rdc_free_buf(%p): _rdc_free_handle returned %d",
			    (void *) h, rc);
#endif
			return (rc);
		}
	} else {
		h->rdc_bufh.sb_flag = NSC_HALLOCATED;
		h->rdc_bufh.sb_vec = NULL;
		h->rdc_bufh.sb_error = 0;
		h->rdc_bufh.sb_pos = 0;
		h->rdc_bufh.sb_len = 0;
		h->rdc_anon = NULL;
		h->rdc_vsize = 0;

		cv_destroy(&h->rdc_sync.cv);
		mutex_destroy(&h->rdc_sync.lock);

	}

	return (0);
}
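
/*
 * Example (illustrative sketch only, not part of the original source):
 * nsctl drives the alloc/io/free cycle through these entry points on
 * behalf of a client of the filter.  Roughly:
 *
 *	rdc_buf_t *h = NULL;
 *
 *	rc = _rdc_alloc_buf(rfd, pos, len, NSC_WRBUF, &h);
 *	if (RDC_SUCCESS(rc)) {
 *		... fill h->rdc_bufh.sb_vec ...
 *		rc = _rdc_write(h, pos, len, 0);
 *		(void) _rdc_free_buf(h);
 *	}
 */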


/*
 * _rdc_open
 *	Open a device
 *
 * Calling/Exit State:
 *	Returns a token to identify the device.
 *
 * Description:
 *	Performs the housekeeping operations associated with an upper layer
 *	of the nsctl stack opening a device.
 */

/* ARGSUSED */

static int
_rdc_open(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
	rdc_k_info_t *krdc;
#ifdef DEBUG
	rdc_u_info_t *urdc;
#endif
	rdc_fd_t *rfd;
	int raw = ((flag & NSC_CACHE) == 0);
	int index;
	int bmp = 0;
	int queue = 0;

	rfd = kmem_zalloc(sizeof (*rfd), KM_SLEEP);
	if (!rfd)
		return (ENOMEM);

	/*
	 * Take config lock to prevent a race with the
	 * (de)configuration code.
	 */

	mutex_enter(&rdc_conf_lock);

	index = rdc_lookup_enabled(path, 0);
	if (index < 0) {
		index = rdc_lookup_bitmap(path);
		if (index >= 0)
			bmp = 1;
	}
	if (index < 0) {
		index = rdc_lookup_diskq(path);
		if (index >= 0)
			queue = 1;
	}
	if (index < 0) {
		/* not found in config */
		mutex_exit(&rdc_conf_lock);
		kmem_free(rfd, sizeof (*rfd));
		return (ENXIO);
	}
#ifdef DEBUG
	urdc = &rdc_u_info[index];
#endif
	krdc = &rdc_k_info[index];

	mutex_exit(&rdc_conf_lock);

	rdc_group_enter(krdc);

	ASSERT(IS_ENABLED(urdc));

	if (bmp) {
		krdc->b_ref++;
	} else if (raw) {
		krdc->r_ref++;
	} else if (!queue) {
		krdc->c_ref++;
	}

	rfd->rdc_info = krdc;
	if (bmp)
		rfd->rdc_type = RDC_BMP;
	else if (queue)
		rfd->rdc_type = RDC_QUE;
	else
		rfd->rdc_oflags = flag;

	rdc_group_exit(krdc);

	*cdp = (blind_t)rfd;

	return (0);
}

static int
_rdc_openc(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
	return (_rdc_open(path, NSC_CACHE|flag, cdp, iodev));
}

static int
_rdc_openr(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
	return (_rdc_open(path, NSC_DEVICE|flag, cdp, iodev));
}


/*
 * _rdc_close
 *	Close a device
 *
 * Calling/Exit State:
 *	Always succeeds - returns 0
 *
 * Description:
 *	Performs the housekeeping operations associated with an upper layer
 *	of the sd stack closing a shadowed device.
 */

static int
_rdc_close(rdc_fd_t *rfd)
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	int bmp = RDC_IS_BMP(rfd);
	int raw = RDC_IS_RAW(rfd);
	int queue = RDC_IS_QUE(rfd);

	/*
	 * we don't keep ref counts for the queue, so skip this stuff.
	 * we may not even have a valid krdc at this point
	 */
	if (queue)
		goto queue;
	rdc_group_enter(krdc);

	if (bmp) {
		krdc->b_ref--;
	} else if (raw && !queue) {
		krdc->r_ref--;
	} else if (!queue) {
		krdc->c_ref--;
	}

	if (krdc->closing) {
		cv_broadcast(&krdc->closingcv);
	}

	rdc_group_exit(krdc);
queue:
	kmem_free(rfd, sizeof (*rfd));
	return (0);
}

/*
 * _rdc_alloc_handle
 *	Allocate a handle
 *
 */

static nsc_buf_t *
_rdc_alloc_handle(void (*d_cb)(), void (*r_cb)(), void (*w_cb)(), rdc_fd_t *rfd)
{
	rdc_buf_t *h;

	h = kmem_zalloc(sizeof (*h), KM_SLEEP);
	if (!h)
		return (NULL);

	h->rdc_bufp = nsc_alloc_handle(RDC_FD(rfd), d_cb, r_cb, w_cb);
	if (!h->rdc_bufp) {
		if (!IS_RFAILED(rfd->rdc_info)) {
			/*
			 * This is a real failure from the io provider below.
			 */
			kmem_free(h, sizeof (*h));
			return (NULL);
		} else {
			/* EMPTY */
			/*
			 * This is just a failed primary device where
			 * we can do remote io to the secondary.
			 */
		}
	}

	h->rdc_bufh.sb_flag = NSC_HALLOCATED;
	h->rdc_fd = rfd;
	mutex_init(&h->aio_lock, NULL, MUTEX_DRIVER, NULL);

	return (&h->rdc_bufh);
}


/*
 * _rdc_free_handle
 *	Free a handle
 *
 */

/* ARGSUSED */
static int
_rdc_free_handle(rdc_buf_t *h, rdc_fd_t *rfd)
{
	int rc;

	mutex_destroy(&h->aio_lock);
	if (h->rdc_bufp) {
		rc = nsc_free_handle(h->rdc_bufp);
		if (!RDC_SUCCESS(rc))
			return (rc);
	}
	kmem_free(h, sizeof (rdc_buf_t));
	return (0);
}


/*
 * _rdc_attach
 *	Attach
 *
 * Calling/Exit State:
 *	Returns 0 for success, errno on failure.
 *
 * Description:
 */

static int
_rdc_attach(rdc_fd_t *rfd, nsc_iodev_t *iodev)
{
	rdc_k_info_t *krdc;
	int raw = RDC_IS_RAW(rfd);
	int rc;

	if ((RDC_IS_BMP(rfd)) || RDC_IS_QUE(rfd))
		return (EINVAL);

	krdc = rfd->rdc_info;
	if (krdc == NULL)
		return (EINVAL);

	mutex_enter(&krdc->devices->id_rlock);
	krdc->iodev = iodev;
	mutex_exit(&krdc->devices->id_rlock);

	rc = _rdc_rsrv_devs(krdc, (raw ? RDC_RAW : RDC_CACHE), RDC_EXTERNAL);
	return (rc);
}


/*
 * _rdc_detach
 *	Detach
 *
 * Calling/Exit State:
 *	Returns 0 for success, always succeeds
 *
 * Description:
 */

static int
_rdc_detach(rdc_fd_t *rfd, nsc_iodev_t *iodev)
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	int raw = RDC_IS_RAW(rfd);

	/*
	 * Flush the async queue if necessary.
	 */

	if (IS_ASYNC(&rdc_u_info[krdc->index]) && !RDC_IS_DISKQ(krdc->group)) {
		int tries = 1;

		while (krdc->group->ra_queue.blocks != 0 && tries--) {
			if (!krdc->group->rdc_writer)
				(void) rdc_writer(krdc->index);

			(void) rdc_drain_queue(krdc->index);
		}

		/* force discard of possibly blocked flusher threads */
		if (rdc_drain_queue(krdc->index) != 0) {
#ifdef DEBUG
			net_queue *qp = &krdc->group->ra_queue;
#endif
			do {
				mutex_enter(&krdc->group->ra_queue.net_qlock);
				krdc->group->asyncdis = 1;
				cv_broadcast(&krdc->group->asyncqcv);
				mutex_exit(&krdc->group->ra_queue.net_qlock);
				cmn_err(CE_WARN,
				    "!RDC: async I/O pending and not drained "
				    "for %s during detach",
				    rdc_u_info[krdc->index].primary.file);
#ifdef DEBUG
				cmn_err(CE_WARN,
				    "!nitems: %" NSC_SZFMT " nblocks: %"
				    NSC_SZFMT " head: 0x%p tail: 0x%p",
				    qp->nitems, qp->blocks,
				    (void *)qp->net_qhead,
				    (void *)qp->net_qtail);
#endif
			} while (krdc->group->rdc_thrnum > 0);
		}
	}

	mutex_enter(&krdc->devices->id_rlock);
	if (krdc->iodev != iodev)
		cmn_err(CE_WARN, "!_rdc_detach: iodev mismatch %p : %p",
		    (void *) krdc->iodev, (void *) iodev);

	krdc->iodev = NULL;
	mutex_exit(&krdc->devices->id_rlock);

	_rdc_rlse_devs(krdc, (raw ? RDC_RAW : RDC_CACHE));

	return (0);
}

/*
 * _rdc_get_pinned
 *
 * only affects local node.
 */

static int
_rdc_get_pinned(rdc_fd_t *rfd)
{
	return (nsc_get_pinned(RDC_FD(rfd)));
}

/*
 * _rdc_discard_pinned
 *
 * only affects local node.
 */

static int
_rdc_discard_pinned(rdc_fd_t *rfd, nsc_off_t pos, nsc_size_t len)
{
	return (nsc_discard_pinned(RDC_FD(rfd), pos, len));
}

/*
 * _rdc_partsize
 *
 * only affects the local node.
 */

static int
_rdc_partsize(rdc_fd_t *rfd, nsc_size_t *ptr)
{
	rdc_u_info_t *urdc;

	urdc = &rdc_u_info[rfd->rdc_info->index];
	/* Always return saved size */
	ASSERT(urdc->volume_size != 0);
	*ptr = urdc->volume_size;
	return (0);
}

/*
 * _rdc_maxfbas
 *
 * only affects local node
 */

/* ARGSUSED */
static int
_rdc_maxfbas(rdc_fd_t *rfd, int flag, nsc_size_t *ptr)
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	int raw = RDC_IS_RAW(rfd);
	int rtype = raw ? RDC_RAW : RDC_CACHE;
	int rc = 0;

	if (krdc == NULL)
		return (EINVAL);
	if (flag == NSC_RDAHEAD || flag == NSC_CACHEBLK) {
		rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
		if (rc == 0) {
			rc = nsc_maxfbas(RDC_U_FD(krdc), flag, ptr);
			_rdc_rlse_devs(krdc, rtype);
		}
	} else {
		/* Always return saved size */
		ASSERT(krdc->maxfbas != 0);
		*ptr = krdc->maxfbas - 1;
	}

	return (rc);
}

/* ARGSUSED */
static int
_rdc_control(rdc_fd_t *rfd, int cmd, void *ptr, int len)
{
	return (nsc_control(RDC_FD(rfd),  cmd, ptr, len));
}

/*
 * _rdc_attach_fd
 *
 * called by nsctl as part of nsc_reserve() processing when one of
 * SNDR's underlying file descriptors becomes available and metadata
 * should be re-acquired.
 */
static int
_rdc_attach_fd(blind_t arg)
{
	_rdc_info_dev_t *dip = (_rdc_info_dev_t *)arg;
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	nsc_size_t maxfbas, partsize;
	int rc;

	krdc = dip->bi_krdc;
	urdc = &rdc_u_info[krdc->index];

	if ((rc = nsc_partsize(dip->bi_fd, &partsize)) != 0) {
		cmn_err(CE_WARN,
		    "!SNDR: cannot get volume size of %s, error %d",
		    nsc_pathname(dip->bi_fd), rc);
	} else if (urdc->volume_size == 0 && partsize > 0) {
		/* set volume size for the first time */
		urdc->volume_size = partsize;
	} else if (urdc->volume_size != partsize) {
		/*
		 * SNDR cannot yet cope with a volume being resized,
		 * so fail it.
		 */
		if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
			rdc_many_enter(krdc);
			if (rdc_get_vflags(urdc) & RDC_PRIMARY)
				rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
			else
				rdc_set_mflags(urdc, RDC_SYNC_NEEDED);
			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
			    "volume resized");
			rdc_many_exit(krdc);
			rdc_write_state(urdc);
		}

		cmn_err(CE_WARN,
		    "!SNDR: %s changed size from %" NSC_SZFMT " to %" NSC_SZFMT,
		    nsc_pathname(dip->bi_fd), urdc->volume_size, partsize);
	}

	if ((rc = nsc_maxfbas(dip->bi_fd, 0, &maxfbas)) != 0) {
		cmn_err(CE_WARN,
		    "!SNDR: cannot get max transfer size for %s, error %d",
		    nsc_pathname(dip->bi_fd), rc);
	} else if (maxfbas > 0) {
		krdc->maxfbas = min(RDC_MAX_MAXFBAS, maxfbas);
	}

	return (0);
}
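
/*
 * Example (illustrative sketch only, not part of the original source):
 * _rdc_attach_fd() is handed to nsctl through the _rdc_fd_def table
 * that rdc_idev_open() passes to nsc_open().  The table layout below
 * is an assumption for illustration; see the definition of
 * _rdc_fd_def[] elsewhere in this file for the authoritative version.
 *
 *	static nsc_def_t _rdc_fd_def[] = {
 *		{ "Attach",	(uintptr_t)_rdc_attach_fd,	0 },
 *		{ "Pinned",	(uintptr_t)_rdc_pinned,		0 },
 *		{ "Unpinned",	(uintptr_t)_rdc_unpinned,	0 },
 *		{ 0,		0,				0 }
 *	};
 */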


/*
 * _rdc_pinned
 *
 * only affects local node
 */

static void
_rdc_pinned(_rdc_info_dev_t *dip, nsc_off_t pos, nsc_size_t len)
{
	nsc_pinned_data(dip->bi_krdc->iodev, pos, len);
}


/*
 * _rdc_unpinned
 *
 * only affects local node.
 */

static void
_rdc_unpinned(_rdc_info_dev_t *dip, nsc_off_t pos, nsc_size_t len)
{
	nsc_unpinned_data(dip->bi_krdc->iodev, pos, len);
}


/*
 * _rdc_read
 *
 * read the specified data into the buffer - go remote if the local
 * volume is down, or if the remote end has more recent data because
 * a reverse sync is in progress.
 */

static int
_rdc_read(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
{
	rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	int remote = (RDC_REMOTE(h) || (rdc_get_mflags(urdc) & RDC_SLAVE));
	int rc1, rc2;

	rc1 = rc2 = 0;

	if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
		cmn_err(CE_WARN,
		    "!_rdc_read: bounds check: io(handle) pos %" NSC_XSZFMT
		    "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
		    pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
		h->rdc_bufh.sb_error = EINVAL;
		return (h->rdc_bufh.sb_error);
	}

	if (flag & NSC_NOBLOCK) {
		cmn_err(CE_WARN,
		    "!_rdc_read: removing unsupported NSC_NOBLOCK flag");
		flag &= ~(NSC_NOBLOCK);
	}


	if (!remote) {
		rc1 = nsc_read(h->rdc_bufp, pos, len, flag);
	}

	if (remote || !RDC_SUCCESS(rc1)) {
		rc2 = _rdc_remote_read(krdc, &h->rdc_bufh, pos, len, flag);
	}

	if (remote && !RDC_SUCCESS(rc2))
		h->rdc_bufh.sb_error = rc2;
	else if (!RDC_SUCCESS(rc1) && !RDC_SUCCESS(rc2))
		h->rdc_bufh.sb_error = rc1;

	return (h->rdc_bufh.sb_error);
}


static int
_rdc_remote_write(rdc_k_info_t *krdc, rdc_buf_t *h, nsc_buf_t *nsc_h,
    nsc_off_t pos, nsc_size_t len, int flag, uint_t bitmask)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	int rc = 0;
	nsc_size_t plen, syncblockpos;
	aio_buf_t *anon = NULL;

	if (!(rdc_get_vflags(urdc) & RDC_PRIMARY))
		return (EINVAL);

	if ((rdc_get_vflags(urdc) & RDC_LOGGING) &&
	    (!IS_STATE(urdc, RDC_QUEUING))) {
		goto done;
	}

	/*
	 * this check for RDC_SYNCING may seem redundant, but there is a window
	 * in rdc_sync, where an async set has not yet been transformed into a
	 * sync set.
	 */
	if ((!IS_ASYNC(urdc) || IS_STATE(urdc, RDC_SYNCING)) ||
	    RDC_REMOTE(h) ||
	    krdc->group->synccount > 0 ||
	    (rdc_get_vflags(urdc) & RDC_SLAVE) ||
	    (rdc_get_vflags(urdc) & RDC_VOL_FAILED) ||
	    (rdc_get_vflags(urdc) & RDC_BMP_FAILED)) {

		/* sync mode, or remote io mode, or local device is dead */
		rc = rdc_net_write(krdc->index, krdc->remote_index,
		    nsc_h, pos, len, RDC_NOSEQ, RDC_NOQUE, NULL);

		if ((rc == 0) &&
		    !(rdc_get_vflags(urdc) & RDC_BMP_FAILED) &&
		    !(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
			if (IS_STATE(urdc, RDC_SYNCING) &&
			    !IS_STATE(urdc, RDC_FULL) ||
			    !IS_STATE(urdc, RDC_SLAVE)) {
				mutex_enter(&krdc->syncbitmutex);

				syncblockpos = LOG_TO_FBA_NUM(krdc->syncbitpos);

				DTRACE_PROBE4(rdc_remote_write,
				    nsc_off_t, krdc->syncbitpos,
				    nsc_off_t, syncblockpos,
				    nsc_off_t, pos,
				    nsc_size_t, len);

				/*
				 * If the current I/O's position plus length
				 * is greater than the sync block position,
				 * only clear those blocks up to the sync
				 * block position.
				 */
				if (pos < syncblockpos) {
					if ((pos + len) > syncblockpos)
						plen = syncblockpos - pos;
					else
						plen = len;
					RDC_CLR_BITMAP(krdc, pos, plen, bitmask,
					    RDC_BIT_BUMP);
				}
				mutex_exit(&krdc->syncbitmutex);
			} else {
				RDC_CLR_BITMAP(krdc, pos, len, bitmask,
				    RDC_BIT_BUMP);
			}
		} else if (rc != 0) {
			rdc_group_enter(krdc);
			rdc_set_flags_log(urdc, RDC_LOGGING,
			    "net write failed");
			rdc_write_state(urdc);
			if (rdc_get_vflags(urdc) & RDC_SYNCING)
				krdc->disk_status = 1;
			rdc_group_exit(krdc);
		}
	} else if (!IS_STATE(urdc, RDC_SYNCING)) {
		DTRACE_PROBE1(async_enque_start, rdc_buf_t *, h);

		ASSERT(krdc->group->synccount == 0);
		/* async mode */
		if ((h == NULL) || ((h->rdc_flags & RDC_ASYNC_VEC) == 0)) {

			rc = _rdc_enqueue_write(krdc, pos, len, flag, NULL);

		} else {
			anon = rdc_aio_buf_get(h, krdc->index);
			if (anon == NULL) {
#ifdef DEBUG
				cmn_err(CE_WARN,
				    "!enqueue write failed for handle %p",
				    (void *) h);
#endif
				return (EINVAL);
			}
			rc = _rdc_enqueue_write(krdc, pos, len, flag,
			    anon->rdc_abufp);

			/*
			 * get rid of the aio_buf_t now, as this
			 * may not be the set that this rdc_buf
			 * was allocated on, we are done with it anyways
			 * enqueuing code frees the nsc_abuf
			 */
			rdc_aio_buf_del(h, krdc);
		}

	} else {
		ASSERT(IS_STATE(urdc, RDC_SYNCING));
		ASSERT(0);
	}

done:
	if ((anon == NULL) && h && (h->rdc_flags & RDC_ASYNC_VEC)) {
		/*
		 * Toss the anonymous buffer if we have one allocated.
		 */
		anon = rdc_aio_buf_get(h, krdc->index);
		if (anon) {
			(void) nsc_free_buf(anon->rdc_abufp);
			rdc_aio_buf_del(h, krdc);
		}
	}

	return (rc);
}

/*
 * _rdc_multi_write
 *
 * Send to multihop remote. Obeys 1 to many if present and we are crazy
 * enough to support it.
 *
 */
int
_rdc_multi_write(nsc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag,
    rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	rdc_k_info_t *this = krdc;	/* krdc that was requested */
	int rc, retval;
	uint_t bitmask;

	retval = rc = 0;
	if (!RDC_HANDLE_LIMITS(h, pos, len)) {
		cmn_err(CE_WARN,
		    "!_rdc_multi_write: bounds check: io(handle) pos %"
		    NSC_XSZFMT "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%"
		    NSC_XSZFMT ")", pos, h->sb_pos, len, h->sb_len);
		return (EINVAL);
	}

	/* if this is a 1 to many, set all the bits for all the sets */
	do {
		if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
			(void) nsc_uncommit(h, pos, len, flag);
			/* set the error, but try other sets */
			retval = EIO;
		}
		if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
			rdc_many_enter(krdc);
			for (krdc = krdc->many_next; krdc != this;
			    krdc = krdc->many_next) {
				urdc = &rdc_u_info[krdc->index];
				if (!IS_ENABLED(urdc))
					continue;
				break;
			}
			rdc_many_exit(krdc);
		}
	} while (krdc != this);

	urdc = &rdc_u_info[krdc->index];

	if (flag & NSC_NOBLOCK) {
		cmn_err(CE_WARN,
		    "!_rdc_multi_write: removing unsupported NSC_NOBLOCK flag");
		flag &= ~(NSC_NOBLOCK);
	}

multiwrite1:
	if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    (!IS_STATE(urdc, RDC_LOGGING) ||
	    (IS_STATE(urdc, RDC_LOGGING) &&
	    IS_STATE(urdc, RDC_QUEUING)))) {
		rc = _rdc_remote_write(krdc, NULL, h, pos, len, flag, bitmask);
	}

	if (!RDC_SUCCESS(rc) && retval == 0) {
		retval = rc;
	}

multiwrite2:
	if (IS_MANY(krdc) && (rdc_get_vflags(urdc) & RDC_PRIMARY)) {
		rdc_many_enter(krdc);
		for (krdc = krdc->many_next; krdc != this;
		    krdc = krdc->many_next) {
			urdc = &rdc_u_info[krdc->index];
			if (!IS_ENABLED(urdc))
				continue;
			rc = 0;
			rdc_many_exit(krdc);

			goto multiwrite1;
		}
		rdc_many_exit(krdc);
	}

	return (retval);
}

void
_rdc_diskq_enqueue_thr(rdc_aio_t *p)
{
	rdc_thrsync_t *sync = (rdc_thrsync_t *)p->next;
	rdc_k_info_t *krdc = &rdc_k_info[p->index];
	int rc2;


	rc2 = rdc_diskq_enqueue(krdc, p);

	/*
	 * overload flag with error return if any
	 */
	if (!RDC_SUCCESS(rc2)) {
		p->flag = rc2;
	} else {
		p->flag = 0;
	}
	mutex_enter(&sync->lock);
	sync->complete++;
	cv_broadcast(&sync->cv);
	mutex_exit(&sync->lock);
}

/*
 * _rdc_sync_write_thr
 * synchronous write thread which writes to the network while the
 * local write is occurring
 */
void
_rdc_sync_write_thr(rdc_aio_t *p)
{
	rdc_thrsync_t *sync = (rdc_thrsync_t *)p->next;
	rdc_buf_t *h = (rdc_buf_t *)p->handle;
	rdc_k_info_t *krdc = &rdc_k_info[p->index];
#ifdef	DEBUG
	rdc_u_info_t *urdc;
#endif
	int rc2;
	int bitmask;

	rdc_group_enter(krdc);
	krdc->aux_state |= RDC_AUXWRITE;
#ifdef	DEBUG
	urdc = &rdc_u_info[krdc->index];
	if (!IS_ENABLED(urdc)) {
		cmn_err(CE_WARN, "!rdc_sync_write_thr: set not enabled %s:%s",
		    urdc->secondary.file,
		    urdc->secondary.bitmap);
	}
#endif
	rdc_group_exit(krdc);
	bitmask = p->iostatus;	/* overload */
	rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh, p->pos, p->len,
	    p->flag, bitmask);


	/*
	 * overload flag with error return if any
	 */
	if (!RDC_SUCCESS(rc2)) {
		p->flag = rc2;
	} else {
		p->flag = 0;
	}

	rdc_group_enter(krdc);
	krdc->aux_state &= ~RDC_AUXWRITE;
	rdc_group_exit(krdc);

	mutex_enter(&sync->lock);
	sync->complete++;
	cv_broadcast(&sync->cv);
	mutex_exit(&sync->lock);
}
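
/*
 * Example (illustrative sketch only, not part of the original source):
 * both thread functions above signal completion through the shared
 * rdc_thrsync_t, so a dispatcher that fires 'nthr' of them waits with
 * the usual condvar pattern:
 *
 *	mutex_enter(&sync->lock);
 *	while (sync->complete != nthr)
 *		cv_wait(&sync->cv, &sync->lock);
 *	mutex_exit(&sync->lock);
 */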
2162 
2163 /*
2164  * _rdc_write
2165  *
2166  * Commit changes to the buffer locally and send remote.
2167  *
2168  * If this write is whilst the local primary volume is being synced,
2169  * then we write the remote end first to ensure that the new data
2170  * cannot be overwritten by a concurrent sync operation.
2171  */
2172 
2173 static int
2174 _rdc_write(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2175 {
2176 	rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
2177 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2178 	rdc_k_info_t *this;
2179 	rdc_k_info_t *multi = NULL;
2180 	int remote = RDC_REMOTE(h);
2181 	int rc1, rc2;
2182 	uint_t bitmask;
2183 	int first;
2184 	int rsync;
2185 	int nthr;
2186 	int winddown;
2187 	int thrrc = 0;
2188 	rdc_aio_t *bp[SNDR_MAXTHREADS];
2189 	aio_buf_t *anon;
2190 	nsthread_t  *tp;
2191 	rdc_thrsync_t *sync = &h->rdc_sync;
2192 
2193 	/* If this is the multi-hop secondary, move along to the primary */
2194 	if (IS_MULTI(krdc) && !IS_PRIMARY(urdc)) {
2195 		multi = krdc;
2196 		krdc = krdc->multi_next;
2197 		urdc = &rdc_u_info[krdc->index];
2198 
2199 		if (!IS_ENABLED(urdc)) {
2200 			krdc = h->rdc_fd->rdc_info;
2201 			urdc = &rdc_u_info[krdc->index];
2202 			multi = NULL;
2203 		}
2204 	}
2205 	this = krdc;
2206 
2207 	rsync = (IS_PRIMARY(urdc)) && (IS_SLAVE(urdc));
2208 
2209 	/*
2210 	 * If this is a many group with a reverse sync in progress and
2211 	 * this is not the slave krdc/urdc, then search for the slave
2212 	 * so that we can do the remote io to the correct secondary
2213 	 * before the local io.
2214 	 */
2215 	if (rsync && !(IS_SLAVE(urdc))) {
2216 		rdc_many_enter(krdc);
2217 		for (krdc = krdc->many_next; krdc != this;
2218 		    krdc = krdc->many_next) {
2219 			urdc = &rdc_u_info[krdc->index];
2220 			if (!IS_ENABLED(urdc))
2221 				continue;
2222 			if (rdc_get_vflags(urdc) & RDC_SLAVE)
2223 				break;
2224 		}
2225 		rdc_many_exit(krdc);
2226 
2227 		this = krdc;
2228 	}
2229 
2230 	urdc = &rdc_u_info[krdc->index];
2231 
2232 	rc1 = rc2 = 0;
2233 	first = 1;
2234 	nthr = 0;
2235 	if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2236 		cmn_err(CE_WARN,
2237 		    "!_rdc_write: bounds check: io(handle) pos %" NSC_XSZFMT
2238 		    "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2239 		    pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2240 		h->rdc_bufh.sb_error = EINVAL;
2241 		return (h->rdc_bufh.sb_error);
2242 	}
2243 
2244 	DTRACE_PROBE(rdc_write_bitmap_start);
2245 
2246 	/* if this is a 1 to many, set all the bits for all the sets */
2247 	do {
2248 		if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
2249 			if (rdc_eio_nobmp) {
2250 				(void) nsc_uncommit(h->rdc_bufp, pos,
2251 				    len, flag);
2252 				/* set the error, but try the other sets */
2253 				h->rdc_bufh.sb_error = EIO;
2254 			}
2255 		}
2256 
2257 		if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
2258 			rdc_many_enter(krdc);
2259 			for (krdc = krdc->many_next; krdc != this;
2260 			    krdc = krdc->many_next) {
2261 				urdc = &rdc_u_info[krdc->index];
2262 				if (!IS_ENABLED(urdc))
2263 					continue;
2264 				break;
2265 			}
2266 			rdc_many_exit(krdc);
2267 		}
2268 
2269 	} while (krdc != this);
2270 
2271 	urdc = &rdc_u_info[krdc->index];
2272 
2273 	DTRACE_PROBE(rdc_write_bitmap_end);
2274 
2275 write1:
2276 	/* just in case we switch mode during write */
2277 	if (IS_ASYNC(urdc) && (!IS_STATE(urdc, RDC_SYNCING)) &&
2278 	    (!IS_STATE(urdc, RDC_LOGGING) ||
2279 	    IS_STATE(urdc, RDC_QUEUING))) {
2280 		h->rdc_flags |= RDC_ASYNC_BUF;
2281 	}
2282 	if (BUF_IS_ASYNC(h)) {
2283 		/*
2284 		 * We are in async mode
2285 		 */
2286 		aio_buf_t *p;
2287 		DTRACE_PROBE(rdc_write_async_start);
2288 
2289 		if ((krdc->type_flag & RDC_DISABLEPEND) ||
2290 		    ((IS_STATE(urdc, RDC_LOGGING) &&
2291 		    !IS_STATE(urdc, RDC_QUEUING)))) {
2292 			goto localwrite;
2293 		}
2294 		if (IS_STATE(urdc, RDC_VOL_FAILED)) {
2295 			/*
2296 			 * overload remote, as we don't want to do local
2297 			 * IO later; forge ahead with async
2298 			 */
2299 			remote++;
2300 		}
2301 		if ((IS_STATE(urdc, RDC_SYNCING)) ||
2302 		    (IS_STATE(urdc, RDC_LOGGING) &&
2303 		    !IS_STATE(urdc, RDC_QUEUING))) {
2304 			goto localwrite;
2305 		}
2306 
2307 		p = rdc_aio_buf_add(krdc->index, h);
2308 		if (p == NULL) {
2309 #ifdef DEBUG
2310 			cmn_err(CE_WARN,
2311 			    "!_rdc_write: aio_buf allocation failed");
2312 #endif
2313 			goto localwrite;
2314 		}
2315 
2316 		mutex_enter(&h->aio_lock);
2317 
2318 		DTRACE_PROBE(rdc_write_async__allocabuf_start);
2319 		rc1 = nsc_alloc_abuf(pos, len, 0, &p->rdc_abufp);
2320 		DTRACE_PROBE(rdc_write_async__allocabuf_end);
2321 		if (!RDC_SUCCESS(rc1)) {
2322 #ifdef DEBUG
2323 			cmn_err(CE_WARN,
2324 			    "!_rdc_write: NSC_ANON allocation failed rc %d",
2325 			    rc1);
2326 #endif
2327 			mutex_exit(&h->aio_lock);
2328 			goto localwrite;
2329 		}
2330 		h->rdc_flags |= RDC_ASYNC_VEC;
2331 		mutex_exit(&h->aio_lock);
2332 
2333 		/*
2334 		 * Copy buffer into anonymous buffer
2335 		 */
2336 
2337 		DTRACE_PROBE(rdc_write_async_nsccopy_start);
2338 		rc1 =
2339 		    nsc_copy(&h->rdc_bufh, p->rdc_abufp, pos, pos, len);
2340 		DTRACE_PROBE(rdc_write_async_nsccopy_end);
2341 		if (!RDC_SUCCESS(rc1)) {
2342 #ifdef DEBUG
2343 			cmn_err(CE_WARN,
2344 			    "!_rdc_write: nsc_copy failed rc=%d state %x",
2345 			    rc1, rdc_get_vflags(urdc));
2346 #endif
2347 			rc1 = nsc_free_buf(p->rdc_abufp);
2348 			rdc_aio_buf_del(h, krdc);
2349 			rdc_group_enter(krdc);
2350 			rdc_group_log(krdc, RDC_FLUSH|RDC_OTHERREMOTE,
2351 			    "nsc_copy failure");
2352 			rdc_group_exit(krdc);
2353 		}
2354 		DTRACE_PROBE(rdc_write_async_end);
2355 
2356 		/*
2357 		 * using a diskq, launch a thread to queue it
2358 		 * and free the aio->h and aio
2359 		 * if the thread fails, do it the old way (see localwrite)
2360 		 */
2361 
2362 		if (RDC_IS_DISKQ(krdc->group)) {
2363 
2364 			if (nthr >= SNDR_MAXTHREADS) {
2365 #ifdef DEBUG
2366 				cmn_err(CE_NOTE, "!nthr overrun in _rdc_write");
2367 #endif
2368 				thrrc = ENOEXEC;
2369 				goto localwrite;
2370 			}
2371 
2372 			anon = rdc_aio_buf_get(h, krdc->index);
2373 			if (anon == NULL) {
2374 #ifdef DEBUG
2375 				cmn_err(CE_WARN, "!rdc_aio_buf_get failed for "
2376 				    "%p", (void *)h);
2377 #endif
2378 				thrrc = ENOEXEC;
2379 				goto localwrite;
2380 			}
2381 
2382 			/* get a populated rdc_aio_t */
2383 			bp[nthr] =
2384 			    rdc_aio_tbuf_get(sync, anon->rdc_abufp, pos, len,
2385 			    flag, krdc->index, bitmask);
2386 
2387 			if (bp[nthr] == NULL) {
2388 #ifdef DEBUG
2389 				cmn_err(CE_NOTE, "!_rdc_write: "
2390 				    "kmem_alloc failed bp aio (1)");
2391 #endif
2392 				thrrc = ENOEXEC;
2393 				goto localwrite;
2394 			}
2395 			/* start the queue io */
2396 			tp = nst_create(_rdc_ioset, _rdc_diskq_enqueue_thr,
2397 			    (void *)bp[nthr], NST_SLEEP);
2398 
2399 			if (tp == NULL) {
2400 #ifdef DEBUG
2401 				cmn_err(CE_NOTE,
2402 				    "!_rdc_write: nst_create failure");
2403 #endif
2404 				thrrc = ENOEXEC;
2405 			} else {
2406 				mutex_enter(&(sync->lock));
2407 				sync->threads++;
2408 				mutex_exit(&(sync->lock));
2409 				nthr++;
2410 
2411 			}
2412 			/*
2413 			 * the handle that is to be enqueued is now in
2414 			 * the rdc_aio_t, and will be freed there.
2415 			 * dump the aio_buf_t now; in the 1 to many case
2416 			 * _rdc_free_buf() may not do this for us if this
2417 			 * is not the index that the rdc_buf_t was
2418 			 * allocated on.
2419 			 */
2420 			rdc_aio_buf_del(h, krdc);
2421 
2422 		}
2423 	}	/* end of async */
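
/*
 * Editorial sketch: the nst_create() dispatch-with-fallback pattern
 * used in the async block above (and the sync block below).  Thread
 * creation failure is never fatal: the caller records
 * thrrc = ENOEXEC and falls through to the localwrite: label, where
 * the same IO is issued synchronously.  Hypothetical outline:
 */
#ifdef RDC_EXAMPLE	/* illustrative sketch only, never compiled */
static int
rdc_dispatch_or_fallback(rdc_thrsync_t *sync, rdc_aio_t *aio,
    void (*worker)())
{
	nsthread_t *tp;

	tp = nst_create(_rdc_ioset, worker, (void *)aio, NST_SLEEP);
	if (tp == NULL)
		return (ENOEXEC);	/* caller does the IO inline */

	mutex_enter(&sync->lock);
	sync->threads++;		/* rendezvous accounting */
	mutex_exit(&sync->lock);
	return (0);
}
#endif	/* RDC_EXAMPLE */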
2424 
2425 	/*
2426 	 * We try to overlap local and network IO for the sync case
2427 	 * (we already do it for async)
2428 	 * If one to many, we need to track the resulting nst_thread
2429 	 * so we don't trash the nsc_buf on a free
2430 	 * Start network IO first then do local (sync only)
2431 	 */
2432 
2433 	if (IS_PRIMARY(urdc) && !IS_STATE(urdc, RDC_LOGGING) &&
2434 	    !BUF_IS_ASYNC(h)) {
2435 		/*
2436 		 * if forward syncing, we must do local IO first
2437 		 * then remote io. Don't spawn thread
2438 		 */
2439 		if (!rsync && (IS_STATE(urdc, RDC_SYNCING))) {
2440 			thrrc = ENOEXEC;
2441 			goto localwrite;
2442 		}
2443 		if (IS_MULTI(krdc)) {
2444 			rdc_k_info_t *ktmp;
2445 			rdc_u_info_t *utmp;
2446 
2447 			ktmp = krdc->multi_next;
2448 			utmp = &rdc_u_info[ktmp->index];
2449 			if (IS_ENABLED(utmp))
2450 				multi = ktmp;
2451 		}
2452 		if (nthr >= SNDR_MAXTHREADS) {
2453 #ifdef DEBUG
2454 			cmn_err(CE_NOTE, "!nthr overrun in _rdc_write");
2455 #endif
2456 			thrrc = ENOEXEC;
2457 			goto localwrite;
2458 		}
2459 
2460 		bp[nthr] = rdc_aio_tbuf_get(sync, h, pos, len,
2461 		    flag, krdc->index, bitmask);
2462 
2463 		if (bp[nthr] == NULL) {
2464 			thrrc = ENOEXEC;
2465 			goto localwrite;
2466 		}
2467 		tp = nst_create(_rdc_ioset, _rdc_sync_write_thr,
2468 		    (void *)bp[nthr], NST_SLEEP);
2469 		if (tp == NULL) {
2470 #ifdef DEBUG
2471 			cmn_err(CE_NOTE, "!_rdc_write: nst_create failure");
2472 #endif
2473 			thrrc = ENOEXEC;
2474 		} else {
2475 			mutex_enter(&(sync->lock));
2476 			sync->threads++;
2477 			mutex_exit(&(sync->lock));
2478 			nthr++;
2479 		}
2480 	}
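
/*
 * Editorial sketch: the resulting IO ordering for sync-mode writes.
 * For a normal primary write the network IO is dispatched first and
 * overlapped with the local write; for a reverse sync (rsync) the
 * local write is held back until the network write has completed.
 * Hypothetical outline (rdc_thrsync_wait is the sketch above; in
 * this file the steps are inline in _rdc_write):
 */
#ifdef RDC_EXAMPLE	/* illustrative sketch only, never compiled */
static int
rdc_overlapped_write(rdc_buf_t *h, rdc_aio_t *aio, nsc_off_t pos,
    nsc_size_t len, int flag, int rsync)
{
	int rc = 0;

	/* 1. start the network write in an nst thread */
	(void) nst_create(_rdc_ioset, _rdc_sync_write_thr, (void *)aio,
	    NST_SLEEP);

	/* 2. overlap with the local write, unless reverse syncing */
	if (!rsync)
		rc = nsc_write(h->rdc_bufp, pos, len, flag);

	/* 3. rendezvous with the network write */
	rdc_thrsync_wait(&h->rdc_sync);

	/* 4. reverse sync: local write only after the remote data */
	if (rsync)
		rc = nsc_write(h->rdc_bufp, pos, len, flag);

	return (rc);
}
#endif	/* RDC_EXAMPLE */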
2481 localwrite:
2482 	if (!remote && !rsync && first) {
2483 		DTRACE_PROBE(rdc_write_nscwrite_start);
2484 		rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2485 		DTRACE_PROBE(rdc_write_nscwrite_end);
2486 		if (!RDC_SUCCESS(rc1)) {
2487 			rdc_many_enter(krdc);
2488 			if (IS_PRIMARY(urdc))
2489 				/* Primary, so reverse sync needed */
2490 				rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
2491 			else
2492 				/* Secondary, so sync needed */
2493 				rdc_set_flags(urdc, RDC_SYNC_NEEDED);
2494 			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2495 			    "local write failed");
2496 			rdc_many_exit(krdc);
2497 			rdc_write_state(urdc);
2498 		}
2499 	}
2500 
2501 	/*
2502 	 * This is where we either enqueue async IO for the flusher
2503 	 * or do sync IO in the case of an error in thread creation,
2504 	 * or when we are doing a forward sync.
2505 	 * NOTE: if we are async, and using a diskq, we have
2506 	 * already enqueued this write.
2507 	 * _rdc_remote_write will end up enqueueing to memory, or,
2508 	 * in the case of a thread creation error above (thrrc ==
2509 	 * ENOEXEC), retrying the diskq enqueue.
2510 	 */
2511 	if ((IS_PRIMARY(urdc) && (thrrc == ENOEXEC)) ||
2512 	    (BUF_IS_ASYNC(h) && !RDC_IS_DISKQ(krdc->group))) {
2513 		thrrc = 0;
2514 		if (IS_MULTI(krdc)) {
2515 			rdc_k_info_t *ktmp;
2516 			rdc_u_info_t *utmp;
2517 
2518 			ktmp = krdc->multi_next;
2519 			utmp = &rdc_u_info[ktmp->index];
2520 			if (IS_ENABLED(utmp))
2521 				multi = ktmp;
2522 		}
2523 
2524 		DTRACE_PROBE(rdc_write_remote_start);
2525 
2526 		rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh,
2527 		    pos, len, flag, bitmask);
2528 
2529 		DTRACE_PROBE(rdc_write_remote_end);
2530 	}
2531 
2532 	if (!RDC_SUCCESS(rc1)) {
2533 		if ((IS_PRIMARY(urdc)) && !RDC_SUCCESS(rc2)) {
2534 			h->rdc_bufh.sb_error = rc1;
2535 		}
2536 	} else if ((remote || rsync) && !RDC_SUCCESS(rc2)) {
2537 		h->rdc_bufh.sb_error = rc2;
2538 	}
2539 write2:
2540 	/*
2541 	 * If one to many, jump back into the loop to continue IO
2542 	 */
2543 	if (IS_MANY(krdc) && (IS_PRIMARY(urdc))) {
2544 		rdc_many_enter(krdc);
2545 		for (krdc = krdc->many_next; krdc != this;
2546 		    krdc = krdc->many_next) {
2547 			urdc = &rdc_u_info[krdc->index];
2548 			if (!IS_ENABLED(urdc))
2549 				continue;
2550 			rc2 = first = 0;
2551 			h->rdc_flags &= ~RDC_ASYNC_BUF;
2552 			rdc_many_exit(krdc);
2553 			goto write1;
2554 		}
2555 		rdc_many_exit(krdc);
2556 	}
2557 	urdc = &rdc_u_info[krdc->index];
2558 
2559 	/*
2560 	 * collect all of our threads if any
2561 	 */
2562 	if (nthr) {
2563 
2564 		mutex_enter(&(sync->lock));
2565 		/* wait for the threads */
2566 		while (sync->complete != sync->threads) {
2567 			cv_wait(&(sync->cv), &(sync->lock));
2568 		}
2569 		mutex_exit(&(sync->lock));
2570 
2571 		/* collect status */
2572 
2573 		winddown = 0;
2574 		while (winddown < nthr) {
2575 			/*
2576 			 * Get any error return from thread
2577 			 */
2578 			if ((remote || rsync) && bp[winddown]->flag) {
2579 				h->rdc_bufh.sb_error = bp[winddown]->flag;
2580 			}
2581 			if (bp[winddown])
2582 				kmem_free(bp[winddown], sizeof (rdc_aio_t));
2583 			winddown++;
2584 		}
2585 	}
2586 
2587 	if (rsync && !(IS_STATE(urdc, RDC_VOL_FAILED))) {
2588 		rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2589 		if (!RDC_SUCCESS(rc1)) {
2590 			/* rsync, so reverse sync needed already set */
2591 			rdc_many_enter(krdc);
2592 			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2593 			    "rsync local write failed");
2594 			rdc_many_exit(krdc);
2595 			rdc_write_state(urdc);
2596 
2597 			/*
2598 			 * only report the error if a remote error
2599 			 * occurred as well.
2600 			 */
2601 			if (h->rdc_bufh.sb_error)
2602 				h->rdc_bufh.sb_error = rc1;
2603 		}
2604 	}
2605 
2606 	if (multi) {
2607 		/* Multi-hop secondary, just set bits in the bitmap */
2608 		(void) RDC_SET_BITMAP(multi, pos, len, &bitmask);
2609 	}
2610 
2611 	return (h->rdc_bufh.sb_error);
2612 }
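
/*
 * Editorial sketch: the error-reporting precedence applied at the end
 * of _rdc_write() above (and _rdc_zero() below).  A local failure
 * (rc1) is reported only when this node is the primary and the remote
 * write (rc2) failed too; otherwise a remote failure is reported only
 * when the data had to travel remotely (remote or rsync).  Restated
 * as a hypothetical pure helper:
 */
#ifdef RDC_EXAMPLE	/* illustrative sketch only, never compiled */
static int
rdc_write_error(int primary, int remote_or_rsync, int rc1, int rc2)
{
	if (!RDC_SUCCESS(rc1)) {
		if (primary && !RDC_SUCCESS(rc2))
			return (rc1);	/* both sides failed */
	} else if (remote_or_rsync && !RDC_SUCCESS(rc2)) {
		return (rc2);		/* only the remote side failed */
	}
	return (0);
}
#endif	/* RDC_EXAMPLE */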
2613 
2614 
2615 static void
2616 _rdc_bzero(nsc_buf_t *h, nsc_off_t pos, nsc_size_t len)
2617 {
2618 	nsc_vec_t *v;
2619 	uchar_t *a;
2620 	size_t sz;
2621 	int l;
2622 
2623 	if (!RDC_HANDLE_LIMITS(h, pos, len)) {
2624 		cmn_err(CE_WARN,
2625 		    "!_rdc_bzero: bounds check: io(handle) pos %" NSC_XSZFMT
2626 		    "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2627 		    pos, h->sb_pos, len, h->sb_len);
2628 		return;
2629 	}
2630 
2631 	if (!len)
2632 		return;
2633 
2634 	/* find starting point */
2635 
2636 	v = h->sb_vec;
2637 	pos -= h->sb_pos;
2638 
2639 	for (; pos >= FBA_NUM(v->sv_len); v++)
2640 		pos -= FBA_NUM(v->sv_len);
2641 
2642 	a = v->sv_addr + FBA_SIZE(pos);
2643 	l = v->sv_len - FBA_SIZE(pos);
2644 
2645 	/* zero */
2646 
2647 	len = FBA_SIZE(len);	/* convert to bytes */
2648 
2649 	while (len) {
2650 		if (!a)		/* end of vec */
2651 			break;
2652 
2653 		sz = (size_t)min((nsc_size_t)l, len);
2654 
2655 		bzero(a, sz);
2656 
2657 		len -= sz;
2658 		l -= sz;
2659 		a += sz;
2660 
2661 		if (!l) {
2662 			v++;
2663 			a = v->sv_addr;
2664 			l = v->sv_len;
2665 		}
2666 	}
2667 }
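
/*
 * Editorial sketch: _rdc_bzero() above is the standard walk of an
 * nsc_buf_t scatter list.  Each nsc_vec_t carries a mapped address
 * (sv_addr) and a byte length (sv_len); handle positions and lengths
 * are in 512-byte FBAs, hence the FBA_SIZE()/FBA_NUM() conversions.
 * The same skeleton suits any byte-wise pass over a handle; for
 * example, a hypothetical checksum (not used by this module):
 */
#ifdef RDC_EXAMPLE	/* illustrative sketch only, never compiled */
static uint32_t
rdc_vec_sum(nsc_buf_t *h, nsc_off_t pos, nsc_size_t len)
{
	nsc_vec_t *v = h->sb_vec;
	uint32_t sum = 0;
	uchar_t *a;
	size_t sz;
	int l;

	pos -= h->sb_pos;
	for (; pos >= FBA_NUM(v->sv_len); v++)	/* find starting vec */
		pos -= FBA_NUM(v->sv_len);

	a = v->sv_addr + FBA_SIZE(pos);
	l = v->sv_len - FBA_SIZE(pos);

	for (len = FBA_SIZE(len); len && a; ) {	/* len now in bytes */
		size_t i;

		sz = (size_t)min((nsc_size_t)l, len);
		for (i = 0; i < sz; i++)
			sum += a[i];

		len -= sz;
		l -= sz;
		a += sz;

		if (!l) {		/* step to the next vector */
			v++;
			a = v->sv_addr;
			l = v->sv_len;
		}
	}
	return (sum);
}
#endif	/* RDC_EXAMPLE */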
2668 
2669 
2670 /*
2671  * _rdc_zero
2672  *
2673  * Zero and commit the specified area of the buffer.
2674  *
2675  * If this write is whilst the local primary volume is being synced,
2676  * then we write the remote end first to ensure that the new data
2677  * cannot be overwritten by a concurrent sync operation.
2678  */
2679 
2680 static int
2681 _rdc_zero(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2682 {
2683 	rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
2684 	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2685 	rdc_k_info_t *this;
2686 	rdc_k_info_t *multi = NULL;
2687 	int remote = RDC_REMOTE(h);
2688 	int rc1, rc2;
2689 	uint_t bitmask;
2690 	int first;
2691 	int rsync;
2692 
2693 	/* If this is the multi-hop secondary, move along to the primary */
2694 	if (IS_MULTI(krdc) && !(rdc_get_vflags(urdc) & RDC_PRIMARY)) {
2695 		multi = krdc;
2696 		krdc = krdc->multi_next;
2697 		urdc = &rdc_u_info[krdc->index];
2698 
2699 		if (!IS_ENABLED(urdc)) {
2700 			krdc = h->rdc_fd->rdc_info;
2701 			urdc = &rdc_u_info[krdc->index];
2702 			multi = NULL;
2703 		}
2704 	}
2705 	this = krdc;
2706 
2707 	rsync = ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
2708 	    (rdc_get_mflags(urdc) & RDC_SLAVE));
2709 
2710 	/*
2711 	 * If this is a many group with a reverse sync in progress and
2712 	 * this is not the slave krdc/urdc, then search for the slave
2713 	 * so that we can do the remote io to the correct secondary
2714 	 * before the local io.
2715 	 */
2716 	if (rsync && !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
2717 		rdc_many_enter(krdc);
2718 		for (krdc = krdc->many_next; krdc != this;
2719 		    krdc = krdc->many_next) {
2720 			urdc = &rdc_u_info[krdc->index];
2721 			if (!IS_ENABLED(urdc))
2722 				continue;
2723 			if (rdc_get_vflags(urdc) & RDC_SLAVE)
2724 				break;
2725 		}
2726 		rdc_many_exit(krdc);
2727 
2728 		this = krdc;
2729 	}
2730 
2731 	rc1 = rc2 = 0;
2732 	first = 1;
2733 
2734 	if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2735 		cmn_err(CE_WARN,
2736 		    "!_rdc_zero: bounds check: io(handle) pos %" NSC_XSZFMT
2737 		    "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2738 		    pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2739 		h->rdc_bufh.sb_error = EINVAL;
2740 		return (h->rdc_bufh.sb_error);
2741 	}
2742 
2743 zero1:
2744 	if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
2745 		(void) nsc_uncommit(h->rdc_bufp, pos, len, flag);
2746 		h->rdc_bufh.sb_error = EIO;
2747 		goto zero2;
2748 	}
2749 
2750 	if (IS_ASYNC(urdc)) {
2751 		/*
2752 		 * We are in async mode
2753 		 */
2754 		aio_buf_t *p;
2755 
2756 		if ((krdc->type_flag & RDC_DISABLEPEND) ||
2757 		    (rdc_get_vflags(urdc) & RDC_LOGGING)) {
2758 			mutex_exit(&krdc->group->ra_queue.net_qlock);
2759 			goto localzero;
2760 		}
2761 
2762 		if ((rdc_get_vflags(urdc) & RDC_VOL_FAILED) ||
2763 		    (rdc_get_vflags(urdc) & RDC_BMP_FAILED)) {
2764 			mutex_exit(&krdc->group->ra_queue.net_qlock);
2765 			goto zero2;
2766 		}
2767 		if (rdc_get_vflags(urdc) & RDC_LOGGING) {
2768 			mutex_exit(&krdc->group->ra_queue.net_qlock);
2769 			goto localzero;
2770 		}
2771 		p = rdc_aio_buf_add(krdc->index, h);
2772 		if (p == NULL) {
2773 #ifdef DEBUG
2774 			cmn_err(CE_WARN,
2775 			    "!_rdc_zero: aio_buf allocation failed");
2776 #endif
2777 			goto localzero;
2778 		}
2779 		mutex_enter(&h->aio_lock);
2780 		rc1 = nsc_alloc_abuf(pos, len, 0, &p->rdc_abufp);
2781 		if (!RDC_SUCCESS(rc1)) {
2782 #ifdef DEBUG
2783 			cmn_err(CE_WARN,
2784 			    "!_rdc_zero: NSC_ANON allocation failed rc %d",
2785 			    rc1);
2786 #endif
2787 			mutex_exit(&h->aio_lock);
2788 			goto localzero;
2789 		}
2790 		h->rdc_flags |= RDC_ASYNC_VEC;
2791 		mutex_exit(&h->aio_lock);
2792 
2793 		/*
2794 		 * Zero the anonymous buffer, ready to send to the remote node
2795 		 */
2796 
2797 		rc1 = nsc_zero(p->rdc_abufp, pos, len, flag);
2798 		if (!RDC_SUCCESS(rc1)) {
2799 #ifdef DEBUG
2800 			cmn_err(CE_WARN,
2801 			    "!_rdc_zero: nsc_zero failed rc=%d state %x",
2802 			    rc1, rdc_get_vflags(urdc));
2803 #endif
2804 			rc1 = nsc_free_buf(p->rdc_abufp);
2805 			rdc_aio_buf_del(h, krdc);
2806 			rdc_group_enter(krdc);
2807 			rdc_group_log(krdc, RDC_FLUSH | RDC_OTHERREMOTE,
2808 			    "nsc_zero failed");
2809 			rdc_group_exit(krdc);
2810 		}
2811 	}	/* end of async */
2812 
2813 localzero:
2814 
2815 	if (flag & NSC_NOBLOCK) {
2816 		cmn_err(CE_WARN,
2817 		    "!_rdc_zero: removing unsupported NSC_NOBLOCK flag");
2818 		flag &= ~(NSC_NOBLOCK);
2819 	}
2820 
2821 	if (!remote && !rsync && first) {
2822 		rc1 = nsc_zero(h->rdc_bufp, pos, len, flag);
2823 		if (!RDC_SUCCESS(rc1)) {
2824 			ASSERT(rdc_get_vflags(urdc) & RDC_PRIMARY);
2825 			rdc_many_enter(krdc);
2826 			/* Primary, so reverse sync needed */
2827 			rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
2828 			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2829 			    "nsc_zero failed");
2830 			rdc_many_exit(krdc);
2831 			rdc_write_state(urdc);
2832 		}
2833 	}
2834 
2835 	/*
2836 	 * send new data to remote end - nsc_zero has zero'd
2837 	 * the data in the buffer, or _rdc_bzero will be used below.
2838 	 */
2839 
2840 	if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
2841 		if (first && (remote || rsync || !RDC_SUCCESS(rc1))) {
2842 			/* bzero so that we can send new data to remote node */
2843 			_rdc_bzero(&h->rdc_bufh, pos, len);
2844 		}
2845 
2846 		if (IS_MULTI(krdc)) {
2847 			rdc_k_info_t *ktmp;
2848 			rdc_u_info_t *utmp;
2849 
2850 			ktmp = krdc->multi_next;
2851 			utmp = &rdc_u_info[ktmp->index];
2852 			if (IS_ENABLED(utmp))
2853 				multi = ktmp;
2854 		}
2855 
2856 		rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh,
2857 		    pos, len, flag, bitmask);
2858 	}
2859 
2860 	if (!RDC_SUCCESS(rc1)) {
2861 		if ((rdc_get_vflags(urdc) & RDC_PRIMARY) && !RDC_SUCCESS(rc2)) {
2862 			h->rdc_bufh.sb_error = rc1;
2863 		}
2864 	} else if ((remote || rsync) && !RDC_SUCCESS(rc2)) {
2865 		h->rdc_bufh.sb_error = rc2;
2866 	}
2867 
2868 zero2:
2869 	if (IS_MANY(krdc) && (rdc_get_vflags(urdc) & RDC_PRIMARY)) {
2870 		rdc_many_enter(krdc);
2871 		for (krdc = krdc->many_next; krdc != this;
2872 		    krdc = krdc->many_next) {
2873 			urdc = &rdc_u_info[krdc->index];
2874 			if (!IS_ENABLED(urdc))
2875 				continue;
2876 			rc2 = first = 0;
2877 			rdc_many_exit(krdc);
2878 			goto zero1;
2879 		}
2880 		rdc_many_exit(krdc);
2881 	}
2882 
2883 	if (rsync && !(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
2884 		rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2885 		if (!RDC_SUCCESS(rc1)) {
2886 			/* rsync, so reverse sync needed already set */
2887 			rdc_many_enter(krdc);
2888 			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2889 			    "nsc_write failed");
2890 			rdc_many_exit(krdc);
2891 			rdc_write_state(urdc);
2892 
2893 			/*
2894 			 * only report the error if a remote error
2895 			 * occurred as well.
2896 			 */
2897 			if (h->rdc_bufh.sb_error)
2898 				h->rdc_bufh.sb_error = rc1;
2899 		}
2900 	}
2901 
2902 	if (multi) {
2903 		/* Multi-hop secondary, just set bits in the bitmap */
2904 		(void) RDC_SET_BITMAP(multi, pos, len, &bitmask);
2905 	}
2906 
2907 	return (h->rdc_bufh.sb_error);
2908 }
2909 
2910 
2911 /*
2912  * _rdc_uncommit
2913  * - refresh specified data region in the buffer to prevent the cache
2914  *   serving the scribbled-on data back to another client.
2915  *
2916  * Only needs to happen on the local node.  If in remote io mode, then
2917  * just return 0 - we do not cache the data on the local node and the
2918  * changed data will not have made it to the cache on the other node,
2919  * so it has no need to uncommit.
2920  */
2921 
2922 static int
2923 _rdc_uncommit(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2924 {
2925 	int remote = RDC_REMOTE(h);
2926 	int rc = 0;
2927 
2928 	if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2929 		cmn_err(CE_WARN,
2930 		    "!_rdc_uncommit: bounds check: io(handle) pos %" NSC_XSZFMT
2931 		    "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2932 		    pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2933 		h->rdc_bufh.sb_error = EINVAL;
2934 		return (h->rdc_bufh.sb_error);
2935 	}
2936 
2937 	if (flag & NSC_NOBLOCK) {
2938 		cmn_err(CE_WARN,
2939 		    "!_rdc_uncommit: removing unsupported NSC_NOBLOCK flag");
2940 		flag &= ~(NSC_NOBLOCK);
2941 	}
2942 
2943 	if (!remote) {
2944 		rc = nsc_uncommit(h->rdc_bufp, pos, len, flag);
2945 	}
2946 
2947 	if (!RDC_SUCCESS(rc))
2948 		h->rdc_bufh.sb_error = rc;
2949 
2950 	return (rc);
2951 }
2952 
2953 
2954 /*
2955  * _rdc_trksize
2956  *
2957  * Only needs to happen on the local node.
2958  */
2959 
2960 static int
2961 _rdc_trksize(rdc_fd_t *rfd, nsc_size_t trksize)
2962 {
2963 	return (nsc_set_trksize(RDC_FD(rfd), trksize));
2964 }
2965 
2966 
2967 static nsc_def_t _rdc_fd_def[] = {
2968 	"Attach",	(uintptr_t)_rdc_attach_fd,	0,
2969 	"Pinned",	(uintptr_t)_rdc_pinned,		0,
2970 	"Unpinned",	(uintptr_t)_rdc_unpinned,	0,
2971 	0,		0,				0
2972 };
2973 
2974 
2975 static nsc_def_t _rdc_io_def[] = {
2976 	"Open",		(uintptr_t)_rdc_openc,		0,
2977 	"Close",	(uintptr_t)_rdc_close,		0,
2978 	"Attach",	(uintptr_t)_rdc_attach,		0,
2979 	"Detach",	(uintptr_t)_rdc_detach,		0,
2980 	"AllocHandle",	(uintptr_t)_rdc_alloc_handle,	0,
2981 	"FreeHandle",	(uintptr_t)_rdc_free_handle,	0,
2982 	"AllocBuf",	(uintptr_t)_rdc_alloc_buf,	0,
2983 	"FreeBuf",	(uintptr_t)_rdc_free_buf,	0,
2984 	"GetPinned",	(uintptr_t)_rdc_get_pinned,	0,
2985 	"Discard",	(uintptr_t)_rdc_discard_pinned,	0,
2986 	"PartSize",	(uintptr_t)_rdc_partsize,	0,
2987 	"MaxFbas",	(uintptr_t)_rdc_maxfbas,	0,
2988 	"Control",	(uintptr_t)_rdc_control,	0,
2989 	"Read",		(uintptr_t)_rdc_read,		0,
2990 	"Write",	(uintptr_t)_rdc_write,		0,
2991 	"Zero",		(uintptr_t)_rdc_zero,		0,
2992 	"Uncommit",	(uintptr_t)_rdc_uncommit,	0,
2993 	"TrackSize",	(uintptr_t)_rdc_trksize,	0,
2994 	"Provide",	0,				0,
2995 	0,		0,				0
2996 };
2997 
2998 static nsc_def_t _rdc_ior_def[] = {
2999 	"Open",		(uintptr_t)_rdc_openr,		0,
3000 	"Close",	(uintptr_t)_rdc_close,		0,
3001 	"Attach",	(uintptr_t)_rdc_attach,		0,
3002 	"Detach",	(uintptr_t)_rdc_detach,		0,
3003 	"AllocHandle",	(uintptr_t)_rdc_alloc_handle,	0,
3004 	"FreeHandle",	(uintptr_t)_rdc_free_handle,	0,
3005 	"AllocBuf",	(uintptr_t)_rdc_alloc_buf,	0,
3006 	"FreeBuf",	(uintptr_t)_rdc_free_buf,	0,
3007 	"GetPinned",	(uintptr_t)_rdc_get_pinned,	0,
3008 	"Discard",	(uintptr_t)_rdc_discard_pinned,	0,
3009 	"PartSize",	(uintptr_t)_rdc_partsize,	0,
3010 	"MaxFbas",	(uintptr_t)_rdc_maxfbas,	0,
3011 	"Control",	(uintptr_t)_rdc_control,	0,
3012 	"Read",		(uintptr_t)_rdc_read,		0,
3013 	"Write",	(uintptr_t)_rdc_write,		0,
3014 	"Zero",		(uintptr_t)_rdc_zero,		0,
3015 	"Uncommit",	(uintptr_t)_rdc_uncommit,	0,
3016 	"TrackSize",	(uintptr_t)_rdc_trksize,	0,
3017 	"Provide",	0,				0,
3018 	0,		0,				0
3019 };
3020
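
/*
 * Editorial sketch: the nsc_def_t tables above follow the nsctl
 * convention of (name, function, flag) entries terminated by an
 * all-zero row; nsc_register_io() binds each named slot of the io
 * provider to the given function.  A hypothetical lookup over such a
 * table (not an nsctl interface; member names assumed from
 * nsc_def_t's declaration in nsctl.h) might read:
 */
#ifdef RDC_EXAMPLE	/* illustrative sketch only, never compiled */
static uintptr_t
rdc_def_lookup(nsc_def_t *def, const char *name)
{
	for (; def->name != NULL; def++) {
		if (strcmp(def->name, name) == 0)
			return (def->func);
	}
	return (0);
}
#endif	/* RDC_EXAMPLE */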