/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */


#include <sys/types.h>
#include <sys/ksynch.h>
#include <sys/kmem.h>
#include <sys/errno.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/cred.h>
#include <sys/file.h>
#include <sys/ddi.h>
#include <sys/nsc_thread.h>
#include <sys/unistat/spcs_s.h>
#include <sys/unistat/spcs_errors.h>

#include <sys/unistat/spcs_s_k.h>
#ifdef DS_DDICT
#include "../contract.h"
#endif

#include <sys/nsctl/nsctl.h>

#include <sys/sdt.h>		/* dtrace is S10 or later */

#include "rdc.h"
#include "rdc_io.h"
#include "rdc_bitmap.h"

/*
 * Remote Dual Copy
 *
 * This file contains the nsctl io provider functionality for RDC.
 *
 * RDC is implemented as a simple filter module that pushes itself between
 * user (SIMCKD, STE, etc.) and SDBC.
 */
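
/*
 * Illustrative view of the layering described above (sketch only):
 *
 *	+---------------------------+
 *	| user (SIMCKD, STE, etc.)  |
 *	+---------------------------+
 *	| RDC filter io provider    |   <- this file
 *	+---------------------------+
 *	| SDBC / raw device         |
 *	+---------------------------+
 */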


static int _rdc_open_count;
int rdc_eio_nobmp = 0;

nsc_io_t *_rdc_io_hc;
static nsc_io_t *_rdc_io_hr;
static nsc_def_t _rdc_fd_def[], _rdc_io_def[], _rdc_ior_def[];

void _rdc_deinit_dev();
int rdc_diskq_enqueue(rdc_k_info_t *, rdc_aio_t *);
extern void rdc_unintercept_diskq(rdc_group_t *);
rdc_aio_t *rdc_aio_tbuf_get(void *, void *, int, int, int, int, int);

static nsc_buf_t *_rdc_alloc_handle(void (*)(), void (*)(),
    void (*)(), rdc_fd_t *);
static int _rdc_free_handle(rdc_buf_t *, rdc_fd_t *);

#ifdef DEBUG
int rdc_overlap_cnt;
int rdc_overlap_hnd_cnt;
#endif

static rdc_info_dev_t *rdc_devices;

extern int _rdc_rsrv_diskq(rdc_group_t *group);
extern void _rdc_rlse_diskq(rdc_group_t *group);

/*
 * _rdc_init_dev
 *	Initialise the io provider.
 */

int
_rdc_init_dev()
{
	_rdc_io_hc = nsc_register_io("rdc-high-cache",
	    NSC_RDCH_ID|NSC_REFCNT|NSC_FILTER, _rdc_io_def);
	if (_rdc_io_hc == NULL)
		cmn_err(CE_WARN, "!rdc: nsc_register_io (high, cache) failed.");

	_rdc_io_hr = nsc_register_io("rdc-high-raw",
	    NSC_RDCHR_ID|NSC_REFCNT|NSC_FILTER, _rdc_ior_def);
	if (_rdc_io_hr == NULL)
		cmn_err(CE_WARN, "!rdc: nsc_register_io (high, raw) failed.");

	if (!_rdc_io_hc || !_rdc_io_hr) {
		_rdc_deinit_dev();
		return (ENOMEM);
	}

	return (0);
}


/*
 * _rdc_deinit_dev
 *	De-initialise the io provider.
 *
 */

void
_rdc_deinit_dev()
{
	int rc;

	if (_rdc_io_hc) {
		if ((rc = nsc_unregister_io(_rdc_io_hc, 0)) != 0)
			cmn_err(CE_WARN,
			    "!rdc: nsc_unregister_io (high, cache) failed: %d",
			    rc);
	}

	if (_rdc_io_hr) {
		if ((rc = nsc_unregister_io(_rdc_io_hr, 0)) != 0)
			cmn_err(CE_WARN,
			    "!rdc: nsc_unregister_io (high, raw) failed: %d",
			    rc);
	}
}


/*
 * rdc_idev_open
 *	- Open the nsctl file descriptors for the data devices.
 *
 *	Must be called with rdc_conf_lock held.
 *	id_sets is protected by rdc_conf_lock.
 */
static rdc_info_dev_t *
rdc_idev_open(rdc_k_info_t *krdc, char *pathname, int *rc)
{
	rdc_info_dev_t *dp;

	ASSERT(MUTEX_HELD(&rdc_conf_lock));

	for (dp = rdc_devices; dp; dp = dp->id_next) {
		if (dp->id_cache_dev.bi_fd &&
		    strcmp(pathname, nsc_pathname(dp->id_cache_dev.bi_fd)) == 0)
			break;
	}

	if (!dp) {
		dp = kmem_zalloc(sizeof (*dp), KM_SLEEP);
		if (!dp)
			return (NULL);

		dp->id_cache_dev.bi_krdc = krdc;
		dp->id_cache_dev.bi_fd = nsc_open(pathname,
		    NSC_RDCHR_ID|NSC_RDWR|NSC_DEVICE,
		    _rdc_fd_def, (blind_t)&dp->id_cache_dev, rc);
		if (!dp->id_cache_dev.bi_fd) {
			kmem_free(dp, sizeof (*dp));
			return (NULL);
		}

		dp->id_raw_dev.bi_krdc = krdc;
		dp->id_raw_dev.bi_fd = nsc_open(pathname,
		    NSC_RDCHR_ID|NSC_RDWR|NSC_DEVICE,
		    _rdc_fd_def, (blind_t)&dp->id_raw_dev, rc);
		if (!dp->id_raw_dev.bi_fd) {
			(void) nsc_close(dp->id_cache_dev.bi_fd);
			kmem_free(dp, sizeof (*dp));
			return (NULL);
		}

		mutex_init(&dp->id_rlock, NULL, MUTEX_DRIVER, NULL);
		cv_init(&dp->id_rcv, NULL, CV_DRIVER, NULL);

		dp->id_next = rdc_devices;
		rdc_devices = dp;
	}

	dp->id_sets++;
	return (dp);
}
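
/*
 * Illustrative call sequence for rdc_idev_open() (sketch only; this
 * mirrors rdc_dev_open() below, which runs with rdc_conf_lock held):
 *
 *	krdc->devices = rdc_idev_open(krdc, pathname, &rc);
 *	if (!krdc->devices)
 *		return (-rc);
 *
 * A second set enabled on the same pathname finds the existing
 * rdc_info_dev_t on the rdc_devices chain and just bumps id_sets;
 * only the first open allocates the cache and raw nsc_fds.
 */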


/*
 * rdc_idev_close
 *	- Close the nsctl file descriptors for the data devices.
 *
 *	Must be called with rdc_conf_lock and dp->id_rlock held.
 *	Will release dp->id_rlock before returning.
 *
 *	id_sets is protected by rdc_conf_lock.
 */
static void
rdc_idev_close(rdc_k_info_t *krdc, rdc_info_dev_t *dp)
{
	rdc_info_dev_t **dpp;
#ifdef DEBUG
	int count = 0;
#endif

	ASSERT(MUTEX_HELD(&rdc_conf_lock));
	ASSERT(MUTEX_HELD(&dp->id_rlock));

	dp->id_sets--;
	if (dp->id_sets > 0) {
		mutex_exit(&dp->id_rlock);
		return;
	}

	/* external references must have gone */
	ASSERT((krdc->c_ref + krdc->r_ref + krdc->b_ref) == 0);

	/* unlink from chain */

	for (dpp = &rdc_devices; *dpp; dpp = &((*dpp)->id_next)) {
		if (*dpp == dp) {
			/* unlink */
			*dpp = dp->id_next;
			break;
		}
	}

	/*
	 * Wait for all reserves to go away - the rpc server is
	 * running asynchronously with this close, and so we
	 * have to wait for it to spot that the krdc is !IS_ENABLED()
	 * and throw away the nsc_buf_t's that it has allocated
	 * and release the device.
	 */

	while (IS_CRSRV(krdc) || IS_RRSRV(krdc)) {
#ifdef DEBUG
		if (!(++count % 16)) {
			cmn_err(CE_NOTE,
			    "!_rdc_idev_close(%s): waiting for nsc_release",
			    rdc_u_info[krdc->index].primary.file);
		}
		if (count > (16*20)) {
			/* waited for 20 seconds - too long - panic */
			cmn_err(CE_PANIC,
			    "!_rdc_idev_close(%s, %p): lost nsc_release",
			    rdc_u_info[krdc->index].primary.file, (void *)krdc);
		}
#endif
		mutex_exit(&dp->id_rlock);
		delay(HZ>>4);
		mutex_enter(&dp->id_rlock);
	}

	if (dp->id_cache_dev.bi_fd) {
		(void) nsc_close(dp->id_cache_dev.bi_fd);
		dp->id_cache_dev.bi_fd = NULL;
	}

	if (dp->id_raw_dev.bi_fd) {
		(void) nsc_close(dp->id_raw_dev.bi_fd);
		dp->id_raw_dev.bi_fd = NULL;
	}

	mutex_exit(&dp->id_rlock);
	mutex_destroy(&dp->id_rlock);
	cv_destroy(&dp->id_rcv);

	kmem_free(dp, sizeof (*dp));
}


/*
 * This function provokes an nsc_reserve() for the device which
 * if successful will populate krdc->maxfbas and urdc->volume_size
 * via the _rdc_attach_fd() callback.
 */
void
rdc_get_details(rdc_k_info_t *krdc)
{
	int rc;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	nsc_size_t vol_size, maxfbas;

	if (_rdc_rsrv_devs(krdc, RDC_RAW, RDC_INTERNAL) == 0) {
		/*
		 * if the vol is already reserved,
		 * volume_size won't be populated on enable because
		 * it is a *fake* reserve and does not make it to
		 * _rdc_attach_fd(). So do it here.
		 */
		rc = nsc_partsize(RDC_U_FD(krdc), &vol_size);
		if (rc != 0) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!rdc_get_details: partsize failed (%d)", rc);
#endif /* DEBUG */
			urdc->volume_size = vol_size = 0;
		}

		urdc->volume_size = vol_size;
		rc = nsc_maxfbas(RDC_U_FD(krdc), 0, &maxfbas);
		if (rc != 0) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!rdc_get_details: maxfbas failed (%d)", rc);
#endif /* DEBUG */
			maxfbas = 0;
		}
		krdc->maxfbas = min(RDC_MAX_MAXFBAS, maxfbas);

		_rdc_rlse_devs(krdc, RDC_RAW);
	}
}


/*
 * Should only be used by the config code.
 */

int
rdc_dev_open(rdc_set_t *rdc_set, int options)
{
	rdc_k_info_t *krdc;
	int index;
	int rc;
	char *pathname;

	ASSERT(MUTEX_HELD(&rdc_conf_lock));

	if (options & RDC_OPT_PRIMARY)
		pathname = rdc_set->primary.file;
	else
		pathname = rdc_set->secondary.file;

	for (index = 0; index < rdc_max_sets; index++) {
		krdc = &rdc_k_info[index];

		if (!IS_CONFIGURED(krdc))
			break;
	}

	if (index == rdc_max_sets) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_dev_open: out of cd\'s");
#endif
		index = -EINVAL;
		goto out;
	}

	if (krdc->devices && (krdc->c_fd || krdc->r_fd)) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_dev_open: %s already open", pathname);
#endif
		index = -EINVAL;
		goto out;
	}

	_rdc_open_count++;

	krdc->devices = rdc_idev_open(krdc, pathname, &rc);
	if (!krdc->devices) {
		index = -rc;
		goto open_fail;
	}

	/*
	 * Grab the device size and maxfbas now.
	 */

	rdc_get_details(krdc);

out:
	return (index);

open_fail:
	_rdc_open_count--;

	return (index);
}


void
rdc_dev_close(rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

	mutex_enter(&rdc_conf_lock);

	if (krdc->devices)
		mutex_enter(&krdc->devices->id_rlock);

#ifdef DEBUG
	if (!krdc->devices || !krdc->c_fd || !krdc->r_fd) {
		cmn_err(CE_WARN,
		    "!rdc_dev_close(%p): c_fd %p r_fd %p", (void *)krdc,
		    (void *) (krdc->devices ? krdc->c_fd : 0),
		    (void *) (krdc->devices ? krdc->r_fd : 0));
	}
#endif

	if (krdc->devices) {
		/* rdc_idev_close will release id_rlock */
		rdc_idev_close(krdc, krdc->devices);
		krdc->devices = NULL;
	}

	urdc->primary.file[0] = '\0';

	if (_rdc_open_count <= 0) {
		cmn_err(CE_WARN, "!rdc: _rdc_open_count corrupt: %d",
		    _rdc_open_count);
	}

	_rdc_open_count--;

	mutex_exit(&rdc_conf_lock);
}


/*
 * rdc_intercept
 *
 * Register for IO on this device with nsctl.
 *
 * For a 1-to-many primary we register for each krdc and let nsctl sort
 * out which it wants to be using. This means that we cannot tell which
 * krdc will receive the incoming io from nsctl, though we do know that
 * at any one time only one krdc will be 'attached' and so get io from
 * nsctl.
 *
 * So the krdc->many_next pointer is maintained as a circular list. The
 * result of these multiple nsc_register_paths is that we will see a
 * few more attach and detach io provider calls during enable/resume
 * and disable/suspend of the 1-to-many whilst nsctl settles down to
 * using a single krdc.
 *
 * The major advantage of this scheme is that nsctl sorts out all the
 * rdc_fd_t's so that they can only point to krdc's that are currently
 * active.
 */
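/*
 * Sketch of the many_next ring for a 1-to-many primary replicating to
 * three secondaries A, B and C (illustration only):
 *
 *	krdc A -> krdc B -> krdc C
 *	  ^                    |
 *	  +--------------------+
 *
 * nsctl attaches just one of A, B or C at a time for incoming io; the
 * do/while and for loops later in this file walk this ring to reach
 * the other enabled sets.
 */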
int
rdc_intercept(rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	char *pathname;
	char *bitmap;

	if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
		pathname = urdc->primary.file;
		bitmap = urdc->primary.bitmap;
	} else {
		pathname = urdc->secondary.file;
		bitmap = urdc->secondary.bitmap;
	}

	if (!krdc->b_tok)
		krdc->b_tok = nsc_register_path(bitmap, NSC_CACHE | NSC_DEVICE,
		    _rdc_io_hc);

	if (!krdc->c_tok)
		krdc->c_tok = nsc_register_path(pathname, NSC_CACHE,
		    _rdc_io_hc);

	if (!krdc->r_tok)
		krdc->r_tok = nsc_register_path(pathname, NSC_DEVICE,
		    _rdc_io_hr);

	if (!krdc->c_tok || !krdc->r_tok) {
		(void) rdc_unintercept(krdc);
		return (ENXIO);
	}

	return (0);
}


static void
wait_unregistering(rdc_k_info_t *krdc)
{
	while (krdc->group->unregistering > 0)
		(void) cv_wait_sig(&krdc->group->unregistercv, &rdc_conf_lock);
}

static void
set_unregistering(rdc_k_info_t *krdc)
{
	wait_unregistering(krdc);

	krdc->group->unregistering++;
}

static void
wakeup_unregistering(rdc_k_info_t *krdc)
{
	if (krdc->group->unregistering <= 0)
		return;

	krdc->group->unregistering--;
	cv_broadcast(&krdc->group->unregistercv);
}
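
/*
 * The 'unregistering' counter serialises concurrent rdc_unintercept()
 * calls within a group. Sketch of the protocol, as used below
 * (illustration only):
 *
 *	mutex_enter(&rdc_conf_lock);
 *	set_unregistering(krdc);		(waits, then increments)
 *	krdc->type_flag |= RDC_UNREGISTER;
 *	mutex_exit(&rdc_conf_lock);
 *	... unregister paths and drain outstanding io ...
 *	mutex_enter(&rdc_conf_lock);
 *	wakeup_unregistering(krdc);		(decrements and broadcasts)
 *	mutex_exit(&rdc_conf_lock);
 */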


/*
 * rdc_unintercept
 *
 * Unregister for IO on this device.
 *
 * See comments above rdc_intercept.
 */
int
rdc_unintercept(rdc_k_info_t *krdc)
{
	int err = 0;
	int rc;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

	mutex_enter(&rdc_conf_lock);
	set_unregistering(krdc);
	krdc->type_flag |= RDC_UNREGISTER;
	mutex_exit(&rdc_conf_lock);

	if (krdc->r_tok) {
		rc = nsc_unregister_path(krdc->r_tok, 0);
		if (rc) {
			cmn_err(CE_WARN, "!rdc: unregister rawfd %d", rc);
			err = rc;
		}
		krdc->r_tok = NULL;
	}

	if (krdc->c_tok) {
		rc = nsc_unregister_path(krdc->c_tok, 0);
		if (rc) {
			cmn_err(CE_WARN, "!rdc: unregister cachefd %d", rc);
			if (!err)
				err = rc;
		}
		krdc->c_tok = NULL;
	}

	if (krdc->b_tok) {
		rc = nsc_unregister_path(krdc->b_tok, 0);
		if (rc) {
			cmn_err(CE_WARN, "!rdc: unregister bitmap %d", rc);
			err = rc;
		}
		krdc->b_tok = NULL;
	}

	rdc_group_enter(krdc);

	/* Wait for all necessary _rdc_close() calls to complete */
	while ((krdc->c_ref + krdc->r_ref + krdc->b_ref) != 0) {
		krdc->closing++;
		cv_wait(&krdc->closingcv, &krdc->group->lock);
		krdc->closing--;
	}

	rdc_clr_flags(urdc, RDC_ENABLED);
	rdc_group_exit(krdc);


	/*
	 * Check there are no outstanding writes in progress.
	 * This can happen when a set is being disabled which
	 * is one of the 'one_to_many' chain, that did not
	 * intercept the original write call.
	 */

	for (;;) {
		rdc_group_enter(krdc);
		if (krdc->aux_state & RDC_AUXWRITE) {
			rdc_group_exit(krdc);
			/*
			 * This doesn't happen very often,
			 * just delay a bit and re-look.
			 */
			delay(50);
		} else {
			rdc_group_exit(krdc);
			break;
		}
	}

	mutex_enter(&rdc_conf_lock);
	krdc->type_flag &= ~RDC_UNREGISTER;
	wakeup_unregistering(krdc);
	mutex_exit(&rdc_conf_lock);

	return (err);
}

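/*
 * Reserve counter illustration (not driver code): each data device
 * carries two nsc_fds (cache and raw), and each _rdc_info_dev_t tracks:
 *
 *	bi_rsrv    - real nsc_reserve()s taken in this fd's own mode
 *	bi_orsrv   - real reserves satisfied here for the other mode
 *	bi_failed  - faked (failed) reserves in this fd's own mode
 *	bi_ofailed - faked reserves for the other mode
 *
 * For example, a raw reserve arriving while the cache fd is already
 * reserved just bumps id_cache_dev.bi_orsrv, and a reserve that fails
 * on a live primary is faked via bi_failed/bi_ofailed so that remote
 * io can continue; _rdc_rsrv_d() converts the faked counts into real
 * ones on the next successful nsc_reserve().
 */
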
/*
 * _rdc_rlse_d
 *	Internal version of _rdc_rlse_devs(), only concerned with the
 *	data device, not the bitmap.
 */

static void
_rdc_rlse_d(rdc_k_info_t *krdc, int devs)
{
	_rdc_info_dev_t *cip;
	_rdc_info_dev_t *rip;
	int raw = (devs & RDC_RAW);

	if (!krdc) {
		cmn_err(CE_WARN, "!rdc: _rdc_rlse_devs null krdc");
		return;
	}

	ASSERT((devs & (~RDC_BMP)) != 0);

	cip = &krdc->devices->id_cache_dev;
	rip = &krdc->devices->id_raw_dev;

	if (IS_RSRV(cip)) {
		/* decrement count */

		if (raw) {
			if (cip->bi_ofailed > 0) {
				cip->bi_ofailed--;
			} else if (cip->bi_orsrv > 0) {
				cip->bi_orsrv--;
			}
		} else {
			if (cip->bi_failed > 0) {
				cip->bi_failed--;
			} else if (cip->bi_rsrv > 0) {
				cip->bi_rsrv--;
			}
		}

		/*
		 * reset nsc_fd ownership back link; it is only set if
		 * we have really done an underlying reserve, not for
		 * failed (faked) reserves.
		 */

		if (cip->bi_rsrv > 0 || cip->bi_orsrv > 0) {
			nsc_set_owner(cip->bi_fd, krdc->iodev);
		} else {
			nsc_set_owner(cip->bi_fd, NULL);
		}

		/* release nsc_fd */

		if (!IS_RSRV(cip)) {
			nsc_release(cip->bi_fd);
		}
	} else if (IS_RSRV(rip)) {
		/* decrement count */

		if (raw) {
			if (rip->bi_failed > 0) {
				rip->bi_failed--;
			} else if (rip->bi_rsrv > 0) {
				rip->bi_rsrv--;
			}
		} else {
			if (rip->bi_ofailed > 0) {
				rip->bi_ofailed--;
			} else if (rip->bi_orsrv > 0) {
				rip->bi_orsrv--;
			}
		}

		/*
		 * reset nsc_fd ownership back link; it is only set if
		 * we have really done an underlying reserve, not for
		 * failed (faked) reserves.
		 */

		if (rip->bi_rsrv > 0 || rip->bi_orsrv > 0) {
			nsc_set_owner(rip->bi_fd, krdc->iodev);
		} else {
			nsc_set_owner(rip->bi_fd, NULL);
		}

		/* release nsc_fd and wake any waiters */

		if (!IS_RSRV(rip)) {
			rip->bi_flag = 0;
			nsc_release(rip->bi_fd);
			cv_broadcast(&krdc->devices->id_rcv);
		}
	} else {
		cmn_err(CE_WARN, "!rdc: _rdc_rlse_devs no reserve? krdc %p",
		    (void *) krdc);
	}
}

/*
 * _rdc_rlse_devs
 *	Release named underlying devices and take care of setting the
 *	back link on the nsc_fd to the correct parent iodev.
 *
 *	NOTE: the 'devs' argument must be the same as that passed to
 *	the preceding _rdc_rsrv_devs call.
 */

void
_rdc_rlse_devs(rdc_k_info_t *krdc, int devs)
{

	DTRACE_PROBE(_rdc_rlse_devs_start);
	mutex_enter(&krdc->devices->id_rlock);

	ASSERT(!(devs & RDC_CACHE));

	if ((devs & (~RDC_BMP)) != 0) {
		_rdc_rlse_d(krdc, devs);
	}

	if ((devs & RDC_BMP) != 0) {
		if (krdc->bmaprsrv > 0 && --krdc->bmaprsrv == 0) {
			nsc_release(krdc->bitmapfd);
		}
	}

	mutex_exit(&krdc->devices->id_rlock);

}

/*
 * _rdc_rsrv_d
 *	Reserve the flagged device, unless its companion is already
 *	reserved; in that case increase the reserve count on the
 *	companion. Take care of setting the nsc_fd ownership back
 *	link to the correct parent iodev pointer.
 */

static int
_rdc_rsrv_d(int raw, _rdc_info_dev_t *rid, _rdc_info_dev_t *cid, int flag,
    rdc_k_info_t *krdc)
{
	_rdc_info_dev_t *p = NULL;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	int other = 0;
	int rc;


#ifdef DEBUG
	if ((rid->bi_rsrv < 0) ||
	    (cid->bi_rsrv < 0) ||
	    (rid->bi_orsrv < 0) ||
	    (cid->bi_orsrv < 0) ||
	    (rid->bi_failed < 0) ||
	    (cid->bi_failed < 0) ||
	    (rid->bi_ofailed < 0) ||
	    (cid->bi_ofailed < 0)) {
		cmn_err(CE_WARN,
		    "!_rdc_rsrv_d: negative counts (rsrv %d %d orsrv %d %d)",
		    rid->bi_rsrv, cid->bi_rsrv,
		    rid->bi_orsrv, cid->bi_orsrv);
		cmn_err(CE_WARN,
		    "!_rdc_rsrv_d: negative counts (fail %d %d ofail %d %d)",
		    rid->bi_failed, cid->bi_failed,
		    rid->bi_ofailed, cid->bi_ofailed);
		cmn_err(CE_PANIC, "_rdc_rsrv_d: negative counts (krdc %p)",
		    (void *) krdc);
	}
#endif

	/*
	 * If the user wants to do a cache reserve and it's already
	 * raw reserved internally, we need to do a real nsc_reserve, so wait
	 * until the release has been done.
	 */
	if (IS_RSRV(rid) && (flag == RDC_EXTERNAL) &&
	    (raw == 0) && (rid->bi_flag != RDC_EXTERNAL)) {
		krdc->devices->id_release++;
		while (IS_RSRV(rid))
			cv_wait(&krdc->devices->id_rcv,
			    &krdc->devices->id_rlock);
		krdc->devices->id_release--;
	}

	/* select underlying device to use */

	if (IS_RSRV(rid)) {
		p = rid;
		if (!raw) {
			other = 1;
		}
	} else if (IS_RSRV(cid)) {
		p = cid;
		if (raw) {
			other = 1;
		}
	}

	/* just increment count and return if already reserved */

	if (p && !RFAILED(p)) {
		if (other) {
			p->bi_orsrv++;
		} else {
			p->bi_rsrv++;
		}

		/* set nsc_fd ownership back link */
		nsc_set_owner(p->bi_fd, krdc->iodev);
		return (0);
	}

	/* attempt reserve */

	if (!p) {
		p = raw ? rid : cid;
	}

	if (!p->bi_fd) {
		/* rpc server raced with rdc_dev_close() */
		return (EIO);
	}
	if ((rc = nsc_reserve(p->bi_fd, 0)) == 0) {
		/*
		 * convert failed counts into reserved counts, and add
		 * in this reserve.
		 */

		p->bi_orsrv = p->bi_ofailed;
		p->bi_rsrv = p->bi_failed;

		if (other) {
			p->bi_orsrv++;
		} else {
			p->bi_rsrv++;
		}

		p->bi_ofailed = 0;
		p->bi_failed = 0;

		/* set nsc_fd ownership back link */

		nsc_set_owner(p->bi_fd, krdc->iodev);
	} else if (rc != EINTR) {
		/*
		 * If this is the master, and the secondary is not
		 * failed, then just fake this external reserve so that
		 * we can do remote io to the secondary and continue to
		 * provide service to the client.
		 *
		 * Subsequent calls to _rdc_rsrv_d() will re-try the
		 * nsc_reserve() until it succeeds.
		 */

		if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
		    !(rdc_get_vflags(urdc) & RDC_LOGGING) &&
		    !((rdc_get_vflags(urdc) & RDC_SLAVE) &&
		    (rdc_get_vflags(urdc) & RDC_SYNCING))) {
			if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
				rdc_many_enter(krdc);
				/* Primary, so reverse sync needed */
				rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
				rdc_set_flags_log(urdc, RDC_VOL_FAILED,
				    "nsc_reserve failed");
				rdc_many_exit(krdc);
				rc = -1;
#ifdef DEBUG
				cmn_err(CE_NOTE, "!nsc_reserve failed "
				    "with rc == %d\n", rc);
#endif
			} else {
				rc = 0;
			}

			if (other) {
				p->bi_ofailed++;
			} else {
				p->bi_failed++;
			}

			if (krdc->maxfbas == 0) {
				/*
				 * fake a maxfbas value for remote i/o,
				 * this will get reset when the next
				 * successful reserve happens as part
				 * of the rdc_attach_fd() callback.
				 */
				krdc->maxfbas = 128;
			}
		}
	}

	if (rc == 0 && raw) {
		p->bi_flag = flag;
	}


	return (rc);
}

/*
 * _rdc_rsrv_devs
 *	Reserve named underlying devices.
 *
 */

int
_rdc_rsrv_devs(rdc_k_info_t *krdc, int devs, int flag)
{
	rdc_u_info_t *urdc;
	int write = 0;
	int rc = 0;
	int got = 0;

	if (!krdc) {
		return (EINVAL);
	}

	urdc = &rdc_u_info[krdc->index];

	ASSERT(!(devs & RDC_CACHE));

	mutex_enter(&krdc->devices->id_rlock);

	if ((devs & (~RDC_BMP)) != 0) {
		if ((rc = _rdc_rsrv_d((devs & RDC_CACHE) == 0,
		    &krdc->devices->id_raw_dev, &krdc->devices->id_cache_dev,
		    flag, krdc)) != 0) {
			if (rc == -1) {
				/*
				 * we need to call rdc_write_state()
				 * after we drop the mutex
				 */
				write = 1;
				rc = 0;
			} else {
				cmn_err(CE_WARN,
				    "!rdc: nsc_reserve(%s) failed %d\n",
				    nsc_pathname(krdc->c_fd), rc);
			}
		} else {
			got |= (devs & (~RDC_BMP));
		}
	}

	if (rc == 0 && (devs & RDC_BMP) != 0) {
		if (krdc->bitmapfd == NULL)
			rc = EIO;
		else if ((krdc->bmaprsrv == 0) &&
		    (rc = nsc_reserve(krdc->bitmapfd, 0)) != 0) {
			cmn_err(CE_WARN, "!rdc: nsc_reserve(%s) failed %d\n",
			    nsc_pathname(krdc->bitmapfd), rc);
		} else {
			krdc->bmaprsrv++;
			got |= RDC_BMP;
		}
		if (!RDC_SUCCESS(rc)) {
			/* Undo any previous reserve */
			if (got != 0)
				_rdc_rlse_d(krdc, got);
		}
	}

	mutex_exit(&krdc->devices->id_rlock);

	if (write) {
		rdc_write_state(urdc);
	}

	return (rc);
}


/*
 * Read from the remote end, ensuring that if this is a many group in
 * slave mode we only remote read from the secondary that holds the
 * valid data.
 */
int
_rdc_remote_read(rdc_k_info_t *krdc, nsc_buf_t *h, nsc_off_t pos,
    nsc_size_t len, int flag)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	rdc_k_info_t *this = krdc;	/* krdc that was requested */
	int rc;

	if (flag & NSC_RDAHEAD) {
		/*
		 * no point in doing readahead remotely,
		 * just say we did it ok - the client is about to
		 * throw this buffer away as soon as we return.
		 */
		return (NSC_DONE);
	}

	/*
	 * If this is a many group with a reverse sync in progress and
	 * this is not the slave krdc/urdc, then search for the slave
	 * so that we can do the remote io from the correct secondary.
	 */
	if ((rdc_get_mflags(urdc) & RDC_SLAVE) &&
	    !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
		rdc_many_enter(krdc);
		for (krdc = krdc->many_next; krdc != this;
		    krdc = krdc->many_next) {
			urdc = &rdc_u_info[krdc->index];
			if (!IS_ENABLED(urdc))
				continue;
			if (rdc_get_vflags(urdc) & RDC_SLAVE)
				break;
		}
		rdc_many_exit(krdc);

		this = krdc;
	}

read1:
	if (rdc_get_vflags(urdc) & RDC_LOGGING) {
		/* cannot do remote io without the remote node! */
		rc = ENETDOWN;
		goto read2;
	}


	/* wait for the remote end to have the latest data */

	if (IS_ASYNC(urdc)) {
		while (krdc->group->ra_queue.blocks != 0) {
			if (!krdc->group->rdc_writer)
				(void) rdc_writer(krdc->index);

			(void) rdc_drain_queue(krdc->index);
		}
	}

	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}

	rc = rdc_net_read(krdc->index, krdc->remote_index, h, pos, len);

	if (krdc->io_kstats) {
		mutex_enter(krdc->io_kstats->ks_lock);
		kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
		mutex_exit(krdc->io_kstats->ks_lock);
	}

	/* If read error keep trying every secondary until no more */
read2:
	if (!RDC_SUCCESS(rc) && IS_MANY(krdc) &&
	    !(rdc_get_mflags(urdc) & RDC_SLAVE)) {
		rdc_many_enter(krdc);
		for (krdc = krdc->many_next; krdc != this;
		    krdc = krdc->many_next) {
			urdc = &rdc_u_info[krdc->index];
			if (!IS_ENABLED(urdc))
				continue;
			rdc_many_exit(krdc);
			goto read1;
		}
		rdc_many_exit(krdc);
	}

	return (rc);
}
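
/*
 * Failure handling in _rdc_remote_read() above (sketch): a failed read
 * on one secondary of a many group falls through to the read2 loop,
 * which steps krdc around the many_next ring to the next enabled set
 * and retries from read1, until either a read succeeds or the walk
 * arrives back at the starting set.
 */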


/*
 * _rdc_alloc_buf
 *	Allocate a buffer of data
 *
 * Calling/Exit State:
 *	Returns NSC_DONE or NSC_HIT for success, NSC_PENDING for async
 *	I/O, > 0 is an error code.
 *
 * Description:
 */
int rdcbufs = 0;

static int
_rdc_alloc_buf(rdc_fd_t *rfd, nsc_off_t pos, nsc_size_t len, int flag,
    rdc_buf_t **ptr)
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	nsc_vec_t *vec = NULL;
	rdc_buf_t *h;
	size_t size;
	int ioflag;
	int rc = 0;

	if (RDC_IS_BMP(rfd) || RDC_IS_QUE(rfd))
		return (EIO);

	if (len == 0)
		return (EINVAL);

	if (flag & NSC_WRBUF) {

		if (!(rdc_get_vflags(urdc) & RDC_PRIMARY) &&
		    !(rdc_get_vflags(urdc) & RDC_LOGGING)) {
			/*
			 * Forbid writes to secondary unless logging.
			 */
			return (EIO);
		}
	}

	if (!(rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    (rdc_get_vflags(urdc) & RDC_SYNC_NEEDED)) {
		/*
		 * Forbid any io to secondary if it needs a sync.
		 */
		return (EIO);
	}

	if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    (rdc_get_vflags(urdc) & RDC_RSYNC_NEEDED) &&
	    !(rdc_get_vflags(urdc) & RDC_VOL_FAILED) &&
	    !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
		/*
		 * Forbid any io to primary if it needs a reverse sync
		 * and is not actively syncing.
		 */
		return (EIO);
	}

	/* Bounds checking */
	ASSERT(urdc->volume_size != 0);
	if (pos + len > urdc->volume_size) {
#ifdef DEBUG
		cmn_err(CE_NOTE,
		    "!rdc: Attempt to access beyond end of rdc volume");
#endif
		return (EIO);
	}

	h = *ptr;
	if (h == NULL) {
		/* should never happen (nsctl does this for us) */
#ifdef DEBUG
		cmn_err(CE_WARN, "!_rdc_alloc_buf entered without buffer!");
#endif
		h = (rdc_buf_t *)_rdc_alloc_handle(NULL, NULL, NULL, rfd);
		if (h == NULL)
			return (ENOMEM);

		h->rdc_bufh.sb_flag &= ~NSC_HALLOCATED;
		*ptr = h;
	}

	if (flag & NSC_NOBLOCK) {
		cmn_err(CE_WARN,
		    "!_rdc_alloc_buf: removing unsupported NSC_NOBLOCK flag");
		flag &= ~(NSC_NOBLOCK);
	}

	h->rdc_bufh.sb_error = 0;
	h->rdc_bufh.sb_flag |= flag;
	h->rdc_bufh.sb_pos = pos;
	h->rdc_bufh.sb_len = len;
	ioflag = flag;

	bzero(&h->rdc_sync, sizeof (h->rdc_sync));
	mutex_init(&h->rdc_sync.lock, NULL, MUTEX_DRIVER, NULL);
	cv_init(&h->rdc_sync.cv, NULL, CV_DRIVER, NULL);

	if (flag & NSC_WRBUF)
		_rdc_async_throttle(krdc, len);	/* throttle incoming io */

	/*
	 * Use remote io when:
	 * - local volume is failed
	 * - reserve status is failed
	 */
	if ((rdc_get_vflags(urdc) & RDC_VOL_FAILED) || IS_RFAILED(krdc)) {
		rc = EIO;
	} else {
		rc = nsc_alloc_buf(RDC_U_FD(krdc), pos, len,
		    ioflag, &h->rdc_bufp);
		if (!RDC_SUCCESS(rc)) {
			rdc_many_enter(krdc);
			if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
				/* Primary, so reverse sync needed */
				rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
			} else {
				/* Secondary, so forward sync needed */
				rdc_set_flags(urdc, RDC_SYNC_NEEDED);
			}
			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
			    "nsc_alloc_buf failed");
			rdc_many_exit(krdc);
			rdc_write_state(urdc);
		}
	}

	if (RDC_SUCCESS(rc)) {
		h->rdc_bufh.sb_vec = h->rdc_bufp->sb_vec;
		h->rdc_flags |= RDC_ALLOC;

		/*
		 * If in slave and reading data, remote read on top of
		 * the buffer to ensure that we have the latest data.
		 */
		if ((flag & NSC_READ) &&
		    (rdc_get_vflags(urdc) & RDC_PRIMARY) &&
		    (rdc_get_mflags(urdc) & RDC_SLAVE)) {
			rc = _rdc_remote_read(krdc, &h->rdc_bufh,
			    pos, len, flag);
			/*
			 * Set NSC_MIXED so that the
			 * cache will throw away this buffer when we free
			 * it since we have combined data from multiple
			 * sources into a single buffer.
			 */
			h->rdc_bufp->sb_flag |= NSC_MIXED;
		}
	}

	/*
	 * If nsc_alloc_buf above fails, or the local volume, bitmap or
	 * reserve is failed, then we fill the buf from the remote node.
	 */

	if ((!RDC_SUCCESS(rc)) && (rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    !(rdc_get_vflags(urdc) & RDC_LOGGING)) {
		if (flag & NSC_NODATA) {
			ASSERT(!(flag & NSC_READ));
			h->rdc_flags |= RDC_REMOTE_BUF;
			h->rdc_bufh.sb_vec = NULL;
		} else {
			size = sizeof (nsc_vec_t) * 2;
			h->rdc_vsize = size + FBA_SIZE(len);
			vec = kmem_zalloc(h->rdc_vsize, KM_SLEEP);

			if (!vec) {
				rc = ENOMEM;
				goto error;
			}
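
			/*
			 * Layout of the single allocation used for the
			 * remote io buffer (illustration only):
			 *
			 *  vec -> [ vec[0] | vec[1] | FBA_SIZE(len) data ]
			 *
			 * vec[0] points at the data area and vec[1] is
			 * the null terminator.
			 */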

			/* single flat buffer */

			vec[0].sv_addr = (uchar_t *)vec + size;
			vec[0].sv_len = FBA_SIZE(len);
			vec[0].sv_vme = 0;

			/* null terminator */

			vec[1].sv_addr = NULL;
			vec[1].sv_len = 0;
			vec[1].sv_vme = 0;

			h->rdc_bufh.sb_vec = vec;
			h->rdc_flags |= RDC_REMOTE_BUF;
			h->rdc_flags |= RDC_VEC_ALLOC;
		}

		if (flag & NSC_READ) {
			rc = _rdc_remote_read(krdc, &h->rdc_bufh,
			    pos, len, flag);
		} else {
			rc = NSC_DONE;
		}
	}
error:
	if (!RDC_SUCCESS(rc)) {
		h->rdc_bufh.sb_error = rc;
	}

	return (rc);
}


/*
 * _rdc_free_buf
 */

static int
_rdc_free_buf(rdc_buf_t *h)
{
	int rc = 0;

	if (h->rdc_flags & RDC_ALLOC) {
		if (h->rdc_bufp) {
			rc = nsc_free_buf(h->rdc_bufp);
		}
		h->rdc_flags &= ~(RDC_ALLOC);

		if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!_rdc_free_buf(%p): nsc_free_buf(%p) returned %d",
			    (void *) h, (void *) h->rdc_bufp, rc);
#endif
			return (rc);
		}
	}

	if (h->rdc_flags & (RDC_REMOTE_BUF|RDC_VEC_ALLOC)) {
		if (h->rdc_flags & RDC_VEC_ALLOC) {
			kmem_free(h->rdc_bufh.sb_vec, h->rdc_vsize);
		}
		h->rdc_flags &= ~(RDC_REMOTE_BUF|RDC_VEC_ALLOC);
	}

	if (h->rdc_anon) {
		/* anon buffers still pending */
		DTRACE_PROBE1(rdc_free_buf_err, aio_buf_t, h->rdc_anon);
	}

	if ((h->rdc_bufh.sb_flag & NSC_HALLOCATED) == 0) {
		rc = _rdc_free_handle(h, h->rdc_fd);
		if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
			cmn_err(CE_WARN,
			    "!_rdc_free_buf(%p): _rdc_free_handle returned %d",
			    (void *) h, rc);
#endif
			return (rc);
		}
	} else {
		h->rdc_bufh.sb_flag = NSC_HALLOCATED;
		h->rdc_bufh.sb_vec = NULL;
		h->rdc_bufh.sb_error = 0;
		h->rdc_bufh.sb_pos = 0;
		h->rdc_bufh.sb_len = 0;
		h->rdc_anon = NULL;
		h->rdc_vsize = 0;

		cv_destroy(&h->rdc_sync.cv);
		mutex_destroy(&h->rdc_sync.lock);

	}

	return (0);
}


/*
 * _rdc_open
 *	Open a device
 *
 * Calling/Exit State:
 *	Returns a token to identify the device.
 *
 * Description:
 *	Performs the housekeeping operations associated with an upper layer
 *	of the nsctl stack opening a device.
 */

/* ARGSUSED */

static int
_rdc_open(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
	rdc_k_info_t *krdc;
#ifdef DEBUG
	rdc_u_info_t *urdc;
#endif
	rdc_fd_t *rfd;
	int raw = ((flag & NSC_CACHE) == 0);
	int index;
	int bmp = 0;
	int queue = 0;

	rfd = kmem_zalloc(sizeof (*rfd), KM_SLEEP);
	if (!rfd)
		return (ENOMEM);

	/*
	 * Take config lock to prevent a race with the
	 * (de)configuration code.
	 */

	mutex_enter(&rdc_conf_lock);

	index = rdc_lookup_enabled(path, 0);
	if (index < 0) {
		index = rdc_lookup_bitmap(path);
		if (index >= 0)
			bmp = 1;
	}
	if (index < 0) {
		index = rdc_lookup_diskq(path);
		if (index >= 0)
			queue = 1;
	}
	if (index < 0) {
		/* not found in config */
		mutex_exit(&rdc_conf_lock);
		kmem_free(rfd, sizeof (*rfd));
		return (ENXIO);
	}
#ifdef DEBUG
	urdc = &rdc_u_info[index];
#endif
	krdc = &rdc_k_info[index];

	mutex_exit(&rdc_conf_lock);

	rdc_group_enter(krdc);

	ASSERT(IS_ENABLED(urdc));

	if (bmp) {
		krdc->b_ref++;
	} else if (raw) {
		krdc->r_ref++;
	} else if (!queue) {
		krdc->c_ref++;
	}

	rfd->rdc_info = krdc;
	if (bmp)
		rfd->rdc_type = RDC_BMP;
	else if (queue)
		rfd->rdc_type = RDC_QUE;
	else
		rfd->rdc_oflags = flag;

	rdc_group_exit(krdc);

	*cdp = (blind_t)rfd;

	return (0);
}

static int
_rdc_openc(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
	return (_rdc_open(path, NSC_CACHE|flag, cdp, iodev));
}

static int
_rdc_openr(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
	return (_rdc_open(path, NSC_DEVICE|flag, cdp, iodev));
}


/*
 * _rdc_close
 *	Close a device
 *
 * Calling/Exit State:
 *	Always succeeds - returns 0
 *
 * Description:
 *	Performs the housekeeping operations associated with an upper layer
 *	of the sd stack closing a shadowed device.
 */

static int
_rdc_close(rfd)
	rdc_fd_t *rfd;
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	int bmp = RDC_IS_BMP(rfd);
	int raw = RDC_IS_RAW(rfd);
	int queue = RDC_IS_QUE(rfd);

	/*
	 * we don't keep ref counts for the queue, so skip this stuff.
	 * we may not even have a valid krdc at this point
	 */
	if (queue)
		goto queue;
	rdc_group_enter(krdc);

	if (bmp) {
		krdc->b_ref--;
	} else if (raw && !queue) {
		krdc->r_ref--;
	} else if (!queue) {
		krdc->c_ref--;
	}

	if (krdc->closing) {
		cv_broadcast(&krdc->closingcv);
	}

	rdc_group_exit(krdc);
queue:
	kmem_free(rfd, sizeof (*rfd));
	return (0);
}

/*
 * _rdc_alloc_handle
 *	Allocate a handle
 *
 */

static nsc_buf_t *
_rdc_alloc_handle(void (*d_cb)(), void (*r_cb)(), void (*w_cb)(), rdc_fd_t *rfd)
{
	rdc_buf_t *h;

	h = kmem_zalloc(sizeof (*h), KM_SLEEP);
	if (!h)
		return (NULL);

	h->rdc_bufp = nsc_alloc_handle(RDC_FD(rfd), d_cb, r_cb, w_cb);
	if (!h->rdc_bufp) {
		if (!IS_RFAILED(rfd->rdc_info)) {
			/*
			 * This is a real failure from the io provider below.
			 */
			kmem_free(h, sizeof (*h));
			return (NULL);
		} else {
			/* EMPTY */
			/*
			 * This is just a failed primary device where
			 * we can do remote io to the secondary.
			 */
		}
	}

	h->rdc_bufh.sb_flag = NSC_HALLOCATED;
	h->rdc_fd = rfd;
	mutex_init(&h->aio_lock, NULL, MUTEX_DRIVER, NULL);

	return (&h->rdc_bufh);
}


/*
 * _rdc_free_handle
 *	Free a handle
 *
 */

/* ARGSUSED */
static int
_rdc_free_handle(rdc_buf_t *h, rdc_fd_t *rfd)
{
	int rc;

	mutex_destroy(&h->aio_lock);
	if (h->rdc_bufp) {
		rc = nsc_free_handle(h->rdc_bufp);
		if (!RDC_SUCCESS(rc))
			return (rc);
	}
	kmem_free(h, sizeof (rdc_buf_t));
	return (0);
}


/*
 * _rdc_attach
 *	Attach
 *
 * Calling/Exit State:
 *	Returns 0 for success, errno on failure.
 *
 * Description:
 */

static int
_rdc_attach(rdc_fd_t *rfd, nsc_iodev_t *iodev)
{
	rdc_k_info_t *krdc;
	int raw = RDC_IS_RAW(rfd);
	int rc;

	if ((RDC_IS_BMP(rfd)) || RDC_IS_QUE(rfd))
		return (EINVAL);

	krdc = rfd->rdc_info;
	if (krdc == NULL)
		return (EINVAL);

	mutex_enter(&krdc->devices->id_rlock);
	krdc->iodev = iodev;
	mutex_exit(&krdc->devices->id_rlock);

	rc = _rdc_rsrv_devs(krdc, (raw ? RDC_RAW : RDC_CACHE), RDC_EXTERNAL);
	return (rc);
}


/*
 * _rdc_detach
 *	Detach
 *
 * Calling/Exit State:
 *	Returns 0 for success, always succeeds
 *
 * Description:
 */

static int
_rdc_detach(rdc_fd_t *rfd, nsc_iodev_t *iodev)
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	int raw = RDC_IS_RAW(rfd);

	/*
	 * Flush the async queue if necessary.
	 */

	if (IS_ASYNC(&rdc_u_info[krdc->index]) && !RDC_IS_DISKQ(krdc->group)) {
		int tries = 1;

		while (krdc->group->ra_queue.blocks != 0 && tries--) {
			if (!krdc->group->rdc_writer)
				(void) rdc_writer(krdc->index);

			(void) rdc_drain_queue(krdc->index);
		}

		/* force discard of possibly blocked flusher threads */
		if (rdc_drain_queue(krdc->index) != 0) {
#ifdef DEBUG
			net_queue *qp = &krdc->group->ra_queue;
#endif
			do {
				mutex_enter(&krdc->group->ra_queue.net_qlock);
				krdc->group->asyncdis = 1;
				cv_broadcast(&krdc->group->asyncqcv);
				mutex_exit(&krdc->group->ra_queue.net_qlock);
				cmn_err(CE_WARN,
				    "!RDC: async I/O pending and not drained "
				    "for %s during detach",
				    rdc_u_info[krdc->index].primary.file);
#ifdef DEBUG
				cmn_err(CE_WARN,
				    "!nitems: %" NSC_SZFMT " nblocks: %"
				    NSC_SZFMT " head: 0x%p tail: 0x%p",
				    qp->nitems, qp->blocks,
				    (void *)qp->net_qhead,
				    (void *)qp->net_qtail);
#endif
			} while (krdc->group->rdc_thrnum > 0);
		}
	}

	mutex_enter(&krdc->devices->id_rlock);
	if (krdc->iodev != iodev)
		cmn_err(CE_WARN, "!_rdc_detach: iodev mismatch %p : %p",
		    (void *) krdc->iodev, (void *) iodev);

	krdc->iodev = NULL;
	mutex_exit(&krdc->devices->id_rlock);

	_rdc_rlse_devs(krdc, (raw ? RDC_RAW : RDC_CACHE));

	return (0);
}

/*
 * _rdc_get_pinned
 *
 * only affects local node.
 */

static int
_rdc_get_pinned(rdc_fd_t *rfd)
{
	return (nsc_get_pinned(RDC_FD(rfd)));
}

/*
 * _rdc_discard_pinned
 *
 * only affects local node.
 */

static int
_rdc_discard_pinned(rdc_fd_t *rfd, nsc_off_t pos, nsc_size_t len)
{
	return (nsc_discard_pinned(RDC_FD(rfd), pos, len));
}

/*
 * _rdc_partsize
 *
 * only affects the local node.
 */

static int
_rdc_partsize(rdc_fd_t *rfd, nsc_size_t *ptr)
{
	rdc_u_info_t *urdc;

	urdc = &rdc_u_info[rfd->rdc_info->index];
	/* Always return saved size */
	ASSERT(urdc->volume_size != 0);
	*ptr = urdc->volume_size;
	return (0);
}

/*
 * _rdc_maxfbas
 *
 * only affects local node
 */

/* ARGSUSED */
static int
_rdc_maxfbas(rdc_fd_t *rfd, int flag, nsc_size_t *ptr)
{
	rdc_k_info_t *krdc = rfd->rdc_info;
	int raw = RDC_IS_RAW(rfd);
	int rtype = raw ? RDC_RAW : RDC_CACHE;
	int rc = 0;

	if (krdc == NULL)
		return (EINVAL);
	if (flag == NSC_RDAHEAD || flag == NSC_CACHEBLK) {
		rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
		if (rc == 0) {
			rc = nsc_maxfbas(RDC_U_FD(krdc), flag, ptr);
			_rdc_rlse_devs(krdc, rtype);
		}
	} else {
		/* Always return saved size */
		ASSERT(krdc->maxfbas != 0);
		*ptr = krdc->maxfbas - 1;
	}

	return (rc);
}

/* ARGSUSED */
static int
_rdc_control(rdc_fd_t *rfd, int cmd, void *ptr, int len)
{
	return (nsc_control(RDC_FD(rfd), cmd, ptr, len));
}

/*
 * _rdc_attach_fd
 *
 * called by nsctl as part of nsc_reserve() processing when one of
 * SNDR's underlying file descriptors becomes available and metadata
 * should be re-acquired.
 */
static int
_rdc_attach_fd(blind_t arg)
{
	_rdc_info_dev_t *dip = (_rdc_info_dev_t *)arg;
	rdc_k_info_t *krdc;
	rdc_u_info_t *urdc;
	nsc_size_t maxfbas, partsize;
	int rc;

	krdc = dip->bi_krdc;
	urdc = &rdc_u_info[krdc->index];

	if ((rc = nsc_partsize(dip->bi_fd, &partsize)) != 0) {
		cmn_err(CE_WARN,
		    "!SNDR: cannot get volume size of %s, error %d",
		    nsc_pathname(dip->bi_fd), rc);
	} else if (urdc->volume_size == 0 && partsize > 0) {
		/* set volume size for the first time */
		urdc->volume_size = partsize;
	} else if (urdc->volume_size != partsize) {
		/*
		 * SNDR cannot yet cope with a volume being resized,
		 * so fail it.
		 */
		if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
			rdc_many_enter(krdc);
			if (rdc_get_vflags(urdc) & RDC_PRIMARY)
				rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
			else
				rdc_set_mflags(urdc, RDC_SYNC_NEEDED);
			rdc_set_flags_log(urdc, RDC_VOL_FAILED,
			    "volume resized");
			rdc_many_exit(krdc);
			rdc_write_state(urdc);
		}

		cmn_err(CE_WARN,
		    "!SNDR: %s changed size from %" NSC_SZFMT " to %" NSC_SZFMT,
		    nsc_pathname(dip->bi_fd), urdc->volume_size, partsize);
	}

	if ((rc = nsc_maxfbas(dip->bi_fd, 0, &maxfbas)) != 0) {
		cmn_err(CE_WARN,
		    "!SNDR: cannot get max transfer size for %s, error %d",
		    nsc_pathname(dip->bi_fd), rc);
	} else if (maxfbas > 0) {
		krdc->maxfbas = min(RDC_MAX_MAXFBAS, maxfbas);
	}

	return (0);
}


/*
 * _rdc_pinned
 *
 * only affects local node
 */

static void
_rdc_pinned(_rdc_info_dev_t *dip, nsc_off_t pos, nsc_size_t len)
{
	nsc_pinned_data(dip->bi_krdc->iodev, pos, len);
}


/*
 * _rdc_unpinned
 *
 * only affects local node.
 */

static void
_rdc_unpinned(_rdc_info_dev_t *dip, nsc_off_t pos, nsc_size_t len)
{
	nsc_unpinned_data(dip->bi_krdc->iodev, pos, len);
}


/*
 * _rdc_read
 *
 * read the specified data into the buffer - go remote if local down,
 * or the remote end has more recent data because a reverse sync is
 * in progress.
 */

static int
_rdc_read(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
{
	rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	int remote = (RDC_REMOTE(h) || (rdc_get_mflags(urdc) & RDC_SLAVE));
	int rc1, rc2;

	rc1 = rc2 = 0;

	if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
		cmn_err(CE_WARN,
		    "!_rdc_read: bounds check: io(handle) pos %" NSC_XSZFMT
		    "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
		    pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
		h->rdc_bufh.sb_error = EINVAL;
		return (h->rdc_bufh.sb_error);
	}

	if (flag & NSC_NOBLOCK) {
		cmn_err(CE_WARN,
		    "!_rdc_read: removing unsupported NSC_NOBLOCK flag");
		flag &= ~(NSC_NOBLOCK);
	}


	if (!remote) {
		rc1 = nsc_read(h->rdc_bufp, pos, len, flag);
	}

	if (remote || !RDC_SUCCESS(rc1)) {
		rc2 = _rdc_remote_read(krdc, &h->rdc_bufh, pos, len, flag);
	}

	if (remote && !RDC_SUCCESS(rc2))
		h->rdc_bufh.sb_error = rc2;
	else if (!RDC_SUCCESS(rc1) && !RDC_SUCCESS(rc2))
		h->rdc_bufh.sb_error = rc1;

	return (h->rdc_bufh.sb_error);
}


static int
_rdc_remote_write(rdc_k_info_t *krdc, rdc_buf_t *h, nsc_buf_t *nsc_h,
    nsc_off_t pos, nsc_size_t len, int flag, uint_t bitmask)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	int rc = 0;
	nsc_size_t plen, syncblockpos;
	aio_buf_t *anon = NULL;

	if (!(rdc_get_vflags(urdc) & RDC_PRIMARY))
		return (EINVAL);

	if ((rdc_get_vflags(urdc) & RDC_LOGGING) &&
	    (!IS_STATE(urdc, RDC_QUEUING))) {
		goto done;
	}

	/*
	 * this check for RDC_SYNCING may seem redundant, but there is a window
	 * in rdc_sync, where an async set has not yet been transformed into a
	 * sync set.
	 */
	if ((!IS_ASYNC(urdc) || IS_STATE(urdc, RDC_SYNCING)) ||
	    RDC_REMOTE(h) ||
	    krdc->group->synccount > 0 ||
	    (rdc_get_vflags(urdc) & RDC_SLAVE) ||
	    (rdc_get_vflags(urdc) & RDC_VOL_FAILED) ||
	    (rdc_get_vflags(urdc) & RDC_BMP_FAILED)) {

		/* sync mode, or remote io mode, or local device is dead */
		rc = rdc_net_write(krdc->index, krdc->remote_index,
		    nsc_h, pos, len, RDC_NOSEQ, RDC_NOQUE, NULL);

		if ((rc == 0) &&
		    !(rdc_get_vflags(urdc) & RDC_BMP_FAILED) &&
		    !(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
			if (IS_STATE(urdc, RDC_SYNCING) &&
			    !IS_STATE(urdc, RDC_FULL) ||
			    !IS_STATE(urdc, RDC_SLAVE)) {
				mutex_enter(&krdc->syncbitmutex);

				syncblockpos = LOG_TO_FBA_NUM(krdc->syncbitpos);

				DTRACE_PROBE4(rdc_remote_write,
				    nsc_off_t, krdc->syncbitpos,
				    nsc_off_t, syncblockpos,
				    nsc_off_t, pos,
				    nsc_size_t, len);

				/*
				 * If the current I/O's position plus length is
				 * greater than the sync block position, only
				 * clear those blocks up to the sync block
				 * position
				 */
				if (pos < syncblockpos) {
					if ((pos + len) > syncblockpos)
						plen = syncblockpos - pos;
					else
						plen = len;
					RDC_CLR_BITMAP(krdc, pos, plen, bitmask,
					    RDC_BIT_BUMP);
				}
				mutex_exit(&krdc->syncbitmutex);
			} else {
				RDC_CLR_BITMAP(krdc, pos, len, bitmask,
				    RDC_BIT_BUMP);
			}
		} else if (rc != 0) {
			rdc_group_enter(krdc);
			rdc_set_flags_log(urdc, RDC_LOGGING,
			    "net write failed");
			rdc_write_state(urdc);
			if (rdc_get_vflags(urdc) & RDC_SYNCING)
				krdc->disk_status = 1;
			rdc_group_exit(krdc);
		}
	} else if (!IS_STATE(urdc, RDC_SYNCING)) {
		DTRACE_PROBE1(async_enque_start, rdc_buf_t *, h);

		ASSERT(krdc->group->synccount == 0);
		/* async mode */
		if ((h == NULL) || ((h->rdc_flags & RDC_ASYNC_VEC) == 0)) {

			rc = _rdc_enqueue_write(krdc, pos, len, flag, NULL);

		} else {
			anon = rdc_aio_buf_get(h, krdc->index);
			if (anon == NULL) {
#ifdef DEBUG
				cmn_err(CE_WARN,
				    "!enqueue write failed for handle %p",
				    (void *) h);
#endif
				return (EINVAL);
			}
			rc = _rdc_enqueue_write(krdc, pos, len, flag,
			    anon->rdc_abufp);

			/*
			 * get rid of the aio_buf_t now, as this
			 * may not be the set that this rdc_buf
			 * was allocated on; we are done with it anyway,
			 * and the enqueuing code frees the nsc_abuf
			 */
			rdc_aio_buf_del(h, krdc);
		}

	} else {
		ASSERT(IS_STATE(urdc, RDC_SYNCING));
		ASSERT(0);
	}

done:
	if ((anon == NULL) && h && (h->rdc_flags & RDC_ASYNC_VEC)) {
		/*
		 * Toss the anonymous buffer if we have one allocated.
		 */
		anon = rdc_aio_buf_get(h, krdc->index);
		if (anon) {
			(void) nsc_free_buf(anon->rdc_abufp);
			rdc_aio_buf_del(h, krdc);
		}
	}

	return (rc);
}

/*
 * _rdc_multi_write
 *
 * Send to multihop remote. Obeys 1 to many if present and we are crazy
 * enough to support it.
 *
 */
int
_rdc_multi_write(nsc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag,
    rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	rdc_k_info_t *this = krdc;	/* krdc that was requested */
	int rc, retval;
	uint_t bitmask;

	retval = rc = 0;
	if (!RDC_HANDLE_LIMITS(h, pos, len)) {
		cmn_err(CE_WARN,
		    "!_rdc_multi_write: bounds check: io(handle) pos %"
		    NSC_XSZFMT "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%"
		    NSC_XSZFMT ")", pos, h->sb_pos, len, h->sb_len);
		return (EINVAL);
	}

	/* if this is a 1 to many, set all the bits for all the sets */
	do {
		if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
			(void) nsc_uncommit(h, pos, len, flag);
			/* set the error, but try other sets */
			retval = EIO;
		}
		if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
			rdc_many_enter(krdc);
			for (krdc = krdc->many_next; krdc != this;
			    krdc = krdc->many_next) {
				urdc = &rdc_u_info[krdc->index];
				if (!IS_ENABLED(urdc))
					continue;
				break;
			}
			rdc_many_exit(krdc);
		}
	} while (krdc != this);

	urdc = &rdc_u_info[krdc->index];

	if (flag & NSC_NOBLOCK) {
		cmn_err(CE_WARN,
		    "!_rdc_multi_write: removing unsupported NSC_NOBLOCK flag");
		flag &= ~(NSC_NOBLOCK);
	}

multiwrite1:
	if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
	    (!IS_STATE(urdc, RDC_LOGGING) ||
	    (IS_STATE(urdc, RDC_LOGGING) &&
	    IS_STATE(urdc, RDC_QUEUING)))) {
		rc = _rdc_remote_write(krdc, NULL, h, pos, len, flag, bitmask);
	}

	if (!RDC_SUCCESS(rc) && retval == 0) {
		retval = rc;
	}

multiwrite2:
	if (IS_MANY(krdc) && (rdc_get_vflags(urdc) & RDC_PRIMARY)) {
		rdc_many_enter(krdc);
		for (krdc = krdc->many_next; krdc != this;
		    krdc = krdc->many_next) {
			urdc = &rdc_u_info[krdc->index];
			if (!IS_ENABLED(urdc))
				continue;
			rc = 0;
			rdc_many_exit(krdc);

			goto multiwrite1;
		}
		rdc_many_exit(krdc);
	}

	return (retval);
}

void
_rdc_diskq_enqueue_thr(rdc_aio_t *p)
{
	rdc_thrsync_t *sync = (rdc_thrsync_t *)p->next;
	rdc_k_info_t *krdc = &rdc_k_info[p->index];
	int rc2;


	rc2 = rdc_diskq_enqueue(krdc, p);

	/*
	 * overload flag with error return if any
	 */
	if (!RDC_SUCCESS(rc2)) {
		p->flag = rc2;
	} else {
		p->flag = 0;
	}
	mutex_enter(&sync->lock);
	sync->complete++;
	cv_broadcast(&sync->cv);
	mutex_exit(&sync->lock);
}

/*
 * _rdc_sync_write_thr
 * synchronous write thread which writes to the network while the
 * local write is occurring
 */
void
_rdc_sync_write_thr(rdc_aio_t *p)
{
	rdc_thrsync_t *sync = (rdc_thrsync_t *)p->next;
	rdc_buf_t *h = (rdc_buf_t *)p->handle;
	rdc_k_info_t *krdc = &rdc_k_info[p->index];
#ifdef DEBUG
	rdc_u_info_t *urdc;
#endif
	int rc2;
	int bitmask;

	rdc_group_enter(krdc);
	krdc->aux_state |= RDC_AUXWRITE;
#ifdef DEBUG
	urdc = &rdc_u_info[krdc->index];
	if (!IS_ENABLED(urdc)) {
		cmn_err(CE_WARN, "!rdc_sync_write_thr: set not enabled %s:%s",
		    urdc->secondary.file,
		    urdc->secondary.bitmap);
	}
#endif
	rdc_group_exit(krdc);
	bitmask = p->iostatus;	/* overload */
	rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh, p->pos, p->len,
	    p->flag, bitmask);


	/*
	 * overload flag with error return if any
	 */
	if (!RDC_SUCCESS(rc2)) {
		p->flag = rc2;
	} else {
		p->flag = 0;
	}

	rdc_group_enter(krdc);
	krdc->aux_state &= ~RDC_AUXWRITE;
	rdc_group_exit(krdc);

	mutex_enter(&sync->lock);
	sync->complete++;
	cv_broadcast(&sync->cv);
	mutex_exit(&sync->lock);
}
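
/*
 * Both thread functions above hand their status back through the
 * rdc_thrsync_t passed in p->next: each increments sync->complete and
 * broadcasts sync->cv. The dispatching code counts the threads it
 * launches and then waits for sync->complete to catch up, roughly
 * (sketch only; the actual winddown logic lives in _rdc_write()):
 *
 *	mutex_enter(&sync->lock);
 *	while (sync->complete != nthr)
 *		cv_wait(&sync->cv, &sync->lock);
 *	mutex_exit(&sync->lock);
 */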

/*
 * _rdc_write
 *
 * Commit changes to the buffer locally and send remote.
 *
 * If this write occurs whilst the local primary volume is being synced,
 * then we write the remote end first to ensure that the new data
 * cannot be overwritten by a concurrent sync operation.
 */
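/*
 * Outline of the steps below (sketch):
 *
 *	1. redirect multi-hop secondary io to the primary krdc, and
 *	   reverse-sync io to the slave set of a many group;
 *	2. walk the many_next ring setting bitmap bits for every
 *	   enabled set;
 *	3. for each set, either enqueue the write (async/diskq modes,
 *	   possibly via _rdc_diskq_enqueue_thr) or send it to the
 *	   remote end (possibly via _rdc_sync_write_thr) as well as
 *	   writing the local volume.
 */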
2172
2173 static int
_rdc_write(rdc_buf_t * h,nsc_off_t pos,nsc_size_t len,int flag)2174 _rdc_write(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2175 {
2176 rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
2177 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2178 rdc_k_info_t *this;
2179 rdc_k_info_t *multi = NULL;
2180 int remote = RDC_REMOTE(h);
2181 int rc1, rc2;
2182 uint_t bitmask;
2183 int first;
2184 int rsync;
2185 int nthr;
2186 int winddown;
2187 int thrrc = 0;
2188 rdc_aio_t *bp[SNDR_MAXTHREADS];
2189 aio_buf_t *anon;
2190 nsthread_t *tp;
2191 rdc_thrsync_t *sync = &h->rdc_sync;
2192
2193 /* If this is the multi-hop secondary, move along to the primary */
2194 if (IS_MULTI(krdc) && !IS_PRIMARY(urdc)) {
2195 multi = krdc;
2196 krdc = krdc->multi_next;
2197 urdc = &rdc_u_info[krdc->index];
2198
2199 if (!IS_ENABLED(urdc)) {
2200 krdc = h->rdc_fd->rdc_info;
2201 urdc = &rdc_u_info[krdc->index];
2202 multi = NULL;
2203 }
2204 }
2205 this = krdc;
2206
2207 rsync = (IS_PRIMARY(urdc)) && (IS_SLAVE(urdc));
2208
2209 /*
2210 * If this is a many group with a reverse sync in progress and
2211 * this is not the slave krdc/urdc, then search for the slave
2212 * so that we can do the remote io to the correct secondary
2213 * before the local io.
2214 */
2215 if (rsync && !(IS_SLAVE(urdc))) {
2216 rdc_many_enter(krdc);
2217 for (krdc = krdc->many_next; krdc != this;
2218 krdc = krdc->many_next) {
2219 urdc = &rdc_u_info[krdc->index];
2220 if (!IS_ENABLED(urdc))
2221 continue;
2222 if (rdc_get_vflags(urdc) & RDC_SLAVE)
2223 break;
2224 }
2225 rdc_many_exit(krdc);
2226
2227 this = krdc;
2228 }
2229
2230 urdc = &rdc_u_info[krdc->index];
2231
2232 rc1 = rc2 = 0;
2233 first = 1;
2234 nthr = 0;
2235 if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2236 cmn_err(CE_WARN,
2237 "!_rdc_write: bounds check: io(handle) pos %" NSC_XSZFMT
2238 "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2239 pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2240 h->rdc_bufh.sb_error = EINVAL;
2241 return (h->rdc_bufh.sb_error);
2242 }
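	/*
	 * RDC_HANDLE_LIMITS() above is a containment check of the
	 * request against the underlying handle; in sketch form
	 * (assumed shape, not the macro's actual text):
	 *
	 *	sb_pos <= pos && (pos + len) <= (sb_pos + sb_len)
	 */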
2243
2244 DTRACE_PROBE(rdc_write_bitmap_start);
2245
2246 /* if this is a 1 to many, set all the bits for all the sets */
2247 do {
2248 if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
2249 if (rdc_eio_nobmp) {
2250 (void) nsc_uncommit
2251 (h->rdc_bufp, pos, len, flag);
2252 /* set the error, but try the other sets */
2253 h->rdc_bufh.sb_error = EIO;
2254 }
2255 }
2256
2257 if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
2258 rdc_many_enter(krdc);
2259 for (krdc = krdc->many_next; krdc != this;
2260 krdc = krdc->many_next) {
2261 urdc = &rdc_u_info[krdc->index];
2262 if (!IS_ENABLED(urdc))
2263 continue;
2264 break;
2265 }
2266 rdc_many_exit(krdc);
2267 }
2268
2269 } while (krdc != this);
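	/*
	 * The do/while above is the standard walk of a 1-to-many
	 * configuration: the sets form a ring linked through
	 * many_next, so the loop visits each enabled set in turn and
	 * terminates when the walk returns to `this'.  A reduced
	 * sketch with a hypothetical node type:
	 *
	 *	typedef struct set {
	 *		struct set *many_next;
	 *		int enabled;
	 *	} set_t;
	 *
	 *	void
	 *	for_each_set(set_t *this)
	 *	{
	 *		set_t *s = this;
	 *		do {
	 *			if (s->enabled)
	 *				;	// per-set work
	 *			s = s->many_next;
	 *		} while (s != this);
	 *	}
	 */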
2270
2271 urdc = &rdc_u_info[krdc->index];
2272
2273 DTRACE_PROBE(rdc_write_bitmap_end);
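	/*
	 * The probe pair brackets the bitmap update, so its latency
	 * can be measured from user land with the sdt provider, e.g.:
	 *
	 *	dtrace -n 'sdt:::rdc_write_bitmap_start
	 *	    { self->t = timestamp; }
	 *	    sdt:::rdc_write_bitmap_end /self->t/
	 *	    { @ = quantize(timestamp - self->t); self->t = 0; }'
	 */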
2274
2275 write1:
2276 /* just in case we switch mode during write */
2277 if (IS_ASYNC(urdc) && (!IS_STATE(urdc, RDC_SYNCING)) &&
2278 (!IS_STATE(urdc, RDC_LOGGING) ||
2279 IS_STATE(urdc, RDC_QUEUING))) {
2280 h->rdc_flags |= RDC_ASYNC_BUF;
2281 }
2282 if (BUF_IS_ASYNC(h)) {
2283 /*
2284 		 * We are in async mode
2285 */
2286 aio_buf_t *p;
2287 DTRACE_PROBE(rdc_write_async_start);
2288
2289 if ((krdc->type_flag & RDC_DISABLEPEND) ||
2290 ((IS_STATE(urdc, RDC_LOGGING) &&
2291 !IS_STATE(urdc, RDC_QUEUING)))) {
2292 goto localwrite;
2293 }
2294 if (IS_STATE(urdc, RDC_VOL_FAILED)) {
2295 /*
2296 			 * overload remote as we don't want to do local
2297 			 * IO later; forge ahead with async
2298 */
2299 remote++;
2300 }
2301 if ((IS_STATE(urdc, RDC_SYNCING)) ||
2302 (IS_STATE(urdc, RDC_LOGGING) &&
2303 !IS_STATE(urdc, RDC_QUEUING))) {
2304 goto localwrite;
2305 }
2306
2307 p = rdc_aio_buf_add(krdc->index, h);
2308 if (p == NULL) {
2309 #ifdef DEBUG
2310 cmn_err(CE_WARN,
2311 "!rdc_alloc_buf aio_buf allocation failed");
2312 #endif
2313 goto localwrite;
2314 }
2315
2316 mutex_enter(&h->aio_lock);
2317
2318 DTRACE_PROBE(rdc_write_async__allocabuf_start);
2319 rc1 = nsc_alloc_abuf(pos, len, 0, &p->rdc_abufp);
2320 DTRACE_PROBE(rdc_write_async__allocabuf_end);
2321 if (!RDC_SUCCESS(rc1)) {
2322 #ifdef DEBUG
2323 cmn_err(CE_WARN,
2324 "!rdc_alloc_buf NSC_ANON allocation failed rc %d",
2325 rc1);
2326 #endif
2327 mutex_exit(&h->aio_lock);
2328 goto localwrite;
2329 }
2330 h->rdc_flags |= RDC_ASYNC_VEC;
2331 mutex_exit(&h->aio_lock);
2332
2333 /*
2334 * Copy buffer into anonymous buffer
2335 */
2336
2337 DTRACE_PROBE(rdc_write_async_nsccopy_start);
2338 rc1 =
2339 nsc_copy(&h->rdc_bufh, p->rdc_abufp, pos, pos, len);
2340 DTRACE_PROBE(rdc_write_async_nsccopy_end);
2341 if (!RDC_SUCCESS(rc1)) {
2342 #ifdef DEBUG
2343 cmn_err(CE_WARN,
2344 "!_rdc_write: nsc_copy failed rc=%d state %x",
2345 rc1, rdc_get_vflags(urdc));
2346 #endif
2347 rc1 = nsc_free_buf(p->rdc_abufp);
2348 rdc_aio_buf_del(h, krdc);
2349 rdc_group_enter(krdc);
2350 rdc_group_log(krdc, RDC_FLUSH|RDC_OTHERREMOTE,
2351 "nsc_copy failure");
2352 rdc_group_exit(krdc);
2353 }
2354 DTRACE_PROBE(rdc_write_async_end);
2355
2356 /*
2357 		 * If using a diskq, launch a thread to queue it
2358 		 * and free the aio->h and aio.
2359 		 * If the thread fails, do it the old way (see localwrite).
2360 */
2361
2362 if (RDC_IS_DISKQ(krdc->group)) {
2363
2364 if (nthr >= SNDR_MAXTHREADS) {
2365 #ifdef DEBUG
2366 cmn_err(CE_NOTE, "!nthr overrun in _rdc_write");
2367 #endif
2368 thrrc = ENOEXEC;
2369 goto localwrite;
2370 }
2371
2372 anon = rdc_aio_buf_get(h, krdc->index);
2373 if (anon == NULL) {
2374 #ifdef DEBUG
2375 cmn_err(CE_WARN, "!rdc_aio_buf_get failed for "
2376 "%p", (void *)h);
2377 #endif
2378 thrrc = ENOEXEC;
2379 goto localwrite;
2380 }
2381
2382 /* get a populated rdc_aio_t */
2383 bp[nthr] =
2384 rdc_aio_tbuf_get(sync, anon->rdc_abufp, pos, len,
2385 flag, krdc->index, bitmask);
2386
2387 if (bp[nthr] == NULL) {
2388 #ifdef DEBUG
2389 cmn_err(CE_NOTE, "!_rdcwrite: "
2390 "kmem_alloc failed bp aio (1)");
2391 #endif
2392 thrrc = ENOEXEC;
2393 goto localwrite;
2394 }
2395 /* start the queue io */
2396 tp = nst_create(_rdc_ioset, _rdc_diskq_enqueue_thr,
2397 (void *)bp[nthr], NST_SLEEP);
2398
2399 if (tp == NULL) {
2400 #ifdef DEBUG
2401 cmn_err(CE_NOTE,
2402 "!_rdcwrite: nst_create failure");
2403 #endif
2404 thrrc = ENOEXEC;
2405 } else {
2406 mutex_enter(&(sync->lock));
2407 sync->threads++;
2408 mutex_exit(&(sync->lock));
2409 nthr++;
2411 			}
2412 /*
2413 			 * The handle that is to be enqueued is now in
2414 			 * the rdc_aio_t, and will be freed there.
2415 			 * Dump the aio_buf_t now; if this is 1 to many,
2416 			 * _rdc_free_buf() may not do it for us when this
2417 			 * is not the index that the rdc_buf_t
2418 			 * was allocated on.
2419 */
2420 rdc_aio_buf_del(h, krdc);
2421
2422 }
2423 } /* end of async */
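	/*
	 * The async hand-off above in sketch form: the data is copied
	 * into an anonymous buffer so the caller's handle can be
	 * released, then a thread queues that copy to the diskq.  Any
	 * failure along the way sets the thrrc == ENOEXEC sentinel and
	 * falls back to the synchronous path at localwrite:
	 * (hypothetical helpers, shown for shape only):
	 *
	 *	if (alloc_shadow(&shadow) != 0 ||
	 *	    copy(buf, shadow) != 0)
	 *		goto localwrite;	// nothing to unwind yet
	 *	if (spawn(enqueue_thr, shadow) == NULL) {
	 *		thrrc = ENOEXEC;	// no thread: do it inline
	 *		goto localwrite;
	 *	}
	 *	nthr++;		// collected at the end of _rdc_write
	 */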
2424
2425 /*
2426 	 * We try to overlap local and network IO for the sync case
2427 	 * (we already do it for async).
2428 	 * If one to many, we need to track the resulting nst_threads
2429 	 * so we don't trash the nsc_buf on a free.
2430 	 * Start the network IO first, then do the local IO (sync only).
2431 */
2432
2433 if (IS_PRIMARY(urdc) && !IS_STATE(urdc, RDC_LOGGING) &&
2434 !BUF_IS_ASYNC(h)) {
2435 /*
2436 * if forward syncing, we must do local IO first
2437 * then remote io. Don't spawn thread
2438 */
2439 if (!rsync && (IS_STATE(urdc, RDC_SYNCING))) {
2440 thrrc = ENOEXEC;
2441 goto localwrite;
2442 }
2443 if (IS_MULTI(krdc)) {
2444 rdc_k_info_t *ktmp;
2445 rdc_u_info_t *utmp;
2446
2447 ktmp = krdc->multi_next;
2448 utmp = &rdc_u_info[ktmp->index];
2449 if (IS_ENABLED(utmp))
2450 multi = ktmp;
2451 }
2452 if (nthr >= SNDR_MAXTHREADS) {
2453 #ifdef DEBUG
2454 cmn_err(CE_NOTE, "!nthr overrun in _rdc_write");
2455 #endif
2456 thrrc = ENOEXEC;
2457 goto localwrite;
2458 }
2459
2460 bp[nthr] = rdc_aio_tbuf_get(sync, h, pos, len,
2461 flag, krdc->index, bitmask);
2462
2463 if (bp[nthr] == NULL) {
2464 thrrc = ENOEXEC;
2465 goto localwrite;
2466 }
2467 tp = nst_create(_rdc_ioset, _rdc_sync_write_thr,
2468 (void *)bp[nthr], NST_SLEEP);
2469 if (tp == NULL) {
2470 #ifdef DEBUG
2471 cmn_err(CE_NOTE, "!_rdcwrite: nst_create failure");
2472 #endif
2473 thrrc = ENOEXEC;
2474 } else {
2475 mutex_enter(&(sync->lock));
2476 sync->threads++;
2477 mutex_exit(&(sync->lock));
2478 nthr++;
2479 }
2480 }
2481 localwrite:
2482 if (!remote && !rsync && first) {
2483 DTRACE_PROBE(rdc_write_nscwrite_start);
2484 rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2485 DTRACE_PROBE(rdc_write_nscwrite_end);
2486 if (!RDC_SUCCESS(rc1)) {
2487 rdc_many_enter(krdc);
2488 if (IS_PRIMARY(urdc))
2489 /* Primary, so reverse sync needed */
2490 rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
2491 else
2492 /* Secondary, so sync needed */
2493 rdc_set_flags(urdc, RDC_SYNC_NEEDED);
2494 rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2495 "local write failed");
2496 rdc_many_exit(krdc);
2497 rdc_write_state(urdc);
2498 }
2499 }
2500
2501 /*
2502 	 * This is where we either enqueue async IO for the flusher,
2503 	 * or do sync IO in the case of an error in thread creation,
2504 	 * or we are doing a forward sync.
2505 	 * NOTE: if we are async and using a diskq, we have
2506 	 * already enqueued this write.
2507 	 * _rdc_remote_write will end up enqueueing to memory or,
2508 	 * in the case of a thread creation error above (thrrc ==
2509 	 * ENOEXEC), retrying the diskq enqueue.
2510 */
2511 	if (IS_PRIMARY(urdc) && ((thrrc == ENOEXEC) ||
2512 	    (BUF_IS_ASYNC(h) && !RDC_IS_DISKQ(krdc->group)))) {
2513 thrrc = 0;
2514 if (IS_MULTI(krdc)) {
2515 rdc_k_info_t *ktmp;
2516 rdc_u_info_t *utmp;
2517
2518 ktmp = krdc->multi_next;
2519 utmp = &rdc_u_info[ktmp->index];
2520 if (IS_ENABLED(utmp))
2521 multi = ktmp;
2522 }
2523
2524 DTRACE_PROBE(rdc_write_remote_start);
2525
2526 rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh,
2527 pos, len, flag, bitmask);
2528
2529 		DTRACE_PROBE(rdc_write_remote_end);
2530 }
2531
2532 if (!RDC_SUCCESS(rc1)) {
2533 if ((IS_PRIMARY(urdc)) && !RDC_SUCCESS(rc2)) {
2534 h->rdc_bufh.sb_error = rc1;
2535 }
2536 } else if ((remote || rsync) && !RDC_SUCCESS(rc2)) {
2537 h->rdc_bufh.sb_error = rc2;
2538 }
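	/*
	 * Error precedence above: a local failure (rc1) is reported
	 * only when the primary also failed remotely (rc2), since the
	 * remote copy can still satisfy the data; a remote failure is
	 * reported directly when the io was remote or a reverse sync
	 * is in progress:
	 *
	 *	rc1 fail, rc2 fail (primary)		sb_error = rc1
	 *	rc1 fail, rc2 ok			no error reported
	 *	rc1 ok, rc2 fail, (remote || rsync)	sb_error = rc2
	 */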
2539 write2:
2540 /*
2541 * If one to many, jump back into the loop to continue IO
2542 */
2543 if (IS_MANY(krdc) && (IS_PRIMARY(urdc))) {
2544 rdc_many_enter(krdc);
2545 for (krdc = krdc->many_next; krdc != this;
2546 krdc = krdc->many_next) {
2547 urdc = &rdc_u_info[krdc->index];
2548 if (!IS_ENABLED(urdc))
2549 continue;
2550 rc2 = first = 0;
2551 h->rdc_flags &= ~RDC_ASYNC_BUF;
2552 rdc_many_exit(krdc);
2553 goto write1;
2554 }
2555 rdc_many_exit(krdc);
2556 }
2557 urdc = &rdc_u_info[krdc->index];
2558
2559 /*
2560 * collect all of our threads if any
2561 */
2562 if (nthr) {
2563
2564 mutex_enter(&(sync->lock));
2565 /* wait for the threads */
2566 while (sync->complete != sync->threads) {
2567 cv_wait(&(sync->cv), &(sync->lock));
2568 }
2569 mutex_exit(&(sync->lock));
2570
2571 /* collect status */
2572
2573 winddown = 0;
2574 while (winddown < nthr) {
2575 /*
2576 * Get any error return from thread
2577 */
2578 if ((remote || rsync) && bp[winddown]->flag) {
2579 h->rdc_bufh.sb_error = bp[winddown]->flag;
2580 }
2581 if (bp[winddown])
2582 kmem_free(bp[winddown], sizeof (rdc_aio_t));
2583 winddown++;
2584 }
2585 }
2586
2587 if (rsync && !(IS_STATE(urdc, RDC_VOL_FAILED))) {
2588 rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2589 if (!RDC_SUCCESS(rc1)) {
2590 /* rsync, so reverse sync needed already set */
2591 rdc_many_enter(krdc);
2592 rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2593 "rsync local write failed");
2594 rdc_many_exit(krdc);
2595 rdc_write_state(urdc);
2596
2597 /*
2598 * only report the error if a remote error
2599 * occurred as well.
2600 */
2601 if (h->rdc_bufh.sb_error)
2602 h->rdc_bufh.sb_error = rc1;
2603 }
2604 }
2605
2606 if (multi) {
2607 /* Multi-hop secondary, just set bits in the bitmap */
2608 (void) RDC_SET_BITMAP(multi, pos, len, &bitmask);
2609 }
2610
2611 return (h->rdc_bufh.sb_error);
2612 }
2613
2614
2615 static void
2616 _rdc_bzero(nsc_buf_t *h, nsc_off_t pos, nsc_size_t len)
2617 {
2618 nsc_vec_t *v;
2619 uchar_t *a;
2620 size_t sz;
2621 int l;
2622
2623 if (!RDC_HANDLE_LIMITS(h, pos, len)) {
2624 cmn_err(CE_WARN,
2625 "!_rdc_bzero: bounds check: io(handle) pos %" NSC_XSZFMT
2626 "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2627 pos, h->sb_pos, len, h->sb_len);
2628 return;
2629 }
2630
2631 if (!len)
2632 return;
2633
2634 /* find starting point */
2635
2636 v = h->sb_vec;
2637 pos -= h->sb_pos;
2638
2639 for (; pos >= FBA_NUM(v->sv_len); v++)
2640 pos -= FBA_NUM(v->sv_len);
2641
2642 a = v->sv_addr + FBA_SIZE(pos);
2643 l = v->sv_len - FBA_SIZE(pos);
2644
2645 /* zero */
2646
2647 len = FBA_SIZE(len); /* convert to bytes */
2648
2649 while (len) {
2650 if (!a) /* end of vec */
2651 break;
2652
2653 sz = (size_t)min((nsc_size_t)l, len);
2654
2655 bzero(a, sz);
2656
2657 len -= sz;
2658 l -= sz;
2659 a += sz;
2660
2661 if (!l) {
2662 v++;
2663 a = v->sv_addr;
2664 l = v->sv_len;
2665 }
2666 }
2667 }
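/*
 * _rdc_bzero() walks the handle's scatter/gather list (sb_vec), where
 * each nsc_vec_t supplies a kernel address (sv_addr) and byte length
 * (sv_len).  FBA_SIZE() and FBA_NUM() convert between FBAs and bytes;
 * assuming the usual 512-byte FBA definitions:
 *
 *	FBA_SIZE(n)	((n) << 9)	// FBAs  -> bytes
 *	FBA_NUM(n)	((n) >> 9)	// bytes -> FBAs
 *
 * the loop above first skips whole vector entries to find the FBA
 * offset, then zeros min(bytes left in vector, bytes left in request)
 * per entry until len is exhausted or the vector ends.
 */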
2668
2669
2670 /*
2671 * _rdc_zero
2672 *
2673 * Zero and commit the specified area of the buffer.
2674 *
2675 * If this write is whilst the local primary volume is being synced,
2676 * then we write the remote end first to ensure that the new data
2677 * cannot be overwritten by a concurrent sync operation.
2678 */
2679
2680 static int
2681 _rdc_zero(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2682 {
2683 rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
2684 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2685 rdc_k_info_t *this;
2686 rdc_k_info_t *multi = NULL;
2687 int remote = RDC_REMOTE(h);
2688 int rc1, rc2;
2689 uint_t bitmask;
2690 int first;
2691 int rsync;
2692
2693 /* If this is the multi-hop secondary, move along to the primary */
2694 if (IS_MULTI(krdc) && !(rdc_get_vflags(urdc) & RDC_PRIMARY)) {
2695 multi = krdc;
2696 krdc = krdc->multi_next;
2697 urdc = &rdc_u_info[krdc->index];
2698
2699 if (!IS_ENABLED(urdc)) {
2700 krdc = h->rdc_fd->rdc_info;
2701 urdc = &rdc_u_info[krdc->index];
2702 multi = NULL;
2703 }
2704 }
2705 this = krdc;
2706
2707 rsync = ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
2708 (rdc_get_mflags(urdc) & RDC_SLAVE));
2709
2710 /*
2711 * If this is a many group with a reverse sync in progress and
2712 * this is not the slave krdc/urdc, then search for the slave
2713 * so that we can do the remote io to the correct secondary
2714 * before the local io.
2715 */
2716 if (rsync && !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
2717 rdc_many_enter(krdc);
2718 for (krdc = krdc->many_next; krdc != this;
2719 krdc = krdc->many_next) {
2720 urdc = &rdc_u_info[krdc->index];
2721 if (!IS_ENABLED(urdc))
2722 continue;
2723 if (rdc_get_vflags(urdc) & RDC_SLAVE)
2724 break;
2725 }
2726 rdc_many_exit(krdc);
2727
2728 this = krdc;
2729 }
2730
2731 rc1 = rc2 = 0;
2732 first = 1;
2733
2734 if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2735 cmn_err(CE_WARN,
2736 "!_rdc_zero: bounds check: io(handle) pos %" NSC_XSZFMT
2737 "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2738 pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2739 h->rdc_bufh.sb_error = EINVAL;
2740 return (h->rdc_bufh.sb_error);
2741 }
2742
2743 zero1:
2744 if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
2745 (void) nsc_uncommit(h->rdc_bufp, pos, len, flag);
2746 h->rdc_bufh.sb_error = EIO;
2747 goto zero2;
2748 }
2749
2750 if (IS_ASYNC(urdc)) {
2751 /*
2752 		 * We are in async mode
2753 */
2754 aio_buf_t *p;
2755
2756 if ((krdc->type_flag & RDC_DISABLEPEND) ||
2757 (rdc_get_vflags(urdc) & RDC_LOGGING)) {
2758 mutex_exit(&krdc->group->ra_queue.net_qlock);
2759 goto localzero;
2760 }
2761
2762 if ((rdc_get_vflags(urdc) & RDC_VOL_FAILED) ||
2763 (rdc_get_vflags(urdc) & RDC_BMP_FAILED)) {
2764 mutex_exit(&krdc->group->ra_queue.net_qlock);
2765 goto zero2;
2766 }
2767 if (rdc_get_vflags(urdc) & RDC_LOGGING) {
2768 mutex_exit(&krdc->group->ra_queue.net_qlock);
2769 goto localzero;
2770 }
2771 p = rdc_aio_buf_add(krdc->index, h);
2772 if (p == NULL) {
2773 #ifdef DEBUG
2774 cmn_err(CE_WARN,
2775 "!rdc_alloc_buf aio_buf allocation failed");
2776 #endif
2777 goto localzero;
2778 }
2779 mutex_enter(&h->aio_lock);
2780 rc1 = nsc_alloc_abuf(pos, len, 0, &p->rdc_abufp);
2781 if (!RDC_SUCCESS(rc1)) {
2782 #ifdef DEBUG
2783 cmn_err(CE_WARN,
2784 "!rdc_alloc_buf NSC_ANON allocation failed rc %d",
2785 rc1);
2786 #endif
2787 mutex_exit(&h->aio_lock);
2788 goto localzero;
2789 }
2790 h->rdc_flags |= RDC_ASYNC_VEC;
2791 mutex_exit(&h->aio_lock);
2792
2793 /*
2794 * Copy buffer into anonymous buffer
2795 */
2796
2797 rc1 = nsc_zero(p->rdc_abufp, pos, len, flag);
2798 if (!RDC_SUCCESS(rc1)) {
2799 #ifdef DEBUG
2800 cmn_err(CE_WARN,
2801 "!_rdc_zero: nsc_zero failed rc=%d state %x",
2802 rc1, rdc_get_vflags(urdc));
2803 #endif
2804 rc1 = nsc_free_buf(p->rdc_abufp);
2805 rdc_aio_buf_del(h, krdc);
2806 rdc_group_enter(krdc);
2807 rdc_group_log(krdc, RDC_FLUSH | RDC_OTHERREMOTE,
2808 "nsc_zero failed");
2809 rdc_group_exit(krdc);
2810 }
2811 } /* end of async */
2812
2813 localzero:
2814
2815 if (flag & NSC_NOBLOCK) {
2816 cmn_err(CE_WARN,
2817 "!_rdc_zero: removing unsupported NSC_NOBLOCK flag");
2818 flag &= ~(NSC_NOBLOCK);
2819 }
2820
2821 if (!remote && !rsync && first) {
2822 rc1 = nsc_zero(h->rdc_bufp, pos, len, flag);
2823 if (!RDC_SUCCESS(rc1)) {
2824 ASSERT(rdc_get_vflags(urdc) & RDC_PRIMARY);
2825 rdc_many_enter(krdc);
2826 /* Primary, so reverse sync needed */
2827 rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
2828 rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2829 "nsc_zero failed");
2830 rdc_many_exit(krdc);
2831 rdc_write_state(urdc);
2832 }
2833 }
2834
2835 /*
2836 	 * send new data to remote end - nsc_zero has zeroed
2837 * the data in the buffer, or _rdc_bzero will be used below.
2838 */
2839
2840 if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
2841 if (first && (remote || rsync || !RDC_SUCCESS(rc1))) {
2842 /* bzero so that we can send new data to remote node */
2843 _rdc_bzero(&h->rdc_bufh, pos, len);
2844 }
2845
2846 if (IS_MULTI(krdc)) {
2847 rdc_k_info_t *ktmp;
2848 rdc_u_info_t *utmp;
2849
2850 ktmp = krdc->multi_next;
2851 utmp = &rdc_u_info[ktmp->index];
2852 if (IS_ENABLED(utmp))
2853 multi = ktmp;
2854 }
2855
2856 rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh,
2857 pos, len, flag, bitmask);
2858 }
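	/*
	 * Note the trick above: zeros are propagated as ordinary write
	 * data.  If the local nsc_zero() was skipped (remote io mode,
	 * reverse sync) or failed, _rdc_bzero() clears the handle's
	 * memory directly so that _rdc_remote_write() ships zeros to
	 * the secondary.
	 */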
2859
2860 if (!RDC_SUCCESS(rc1)) {
2861 if ((rdc_get_vflags(urdc) & RDC_PRIMARY) && !RDC_SUCCESS(rc2)) {
2862 h->rdc_bufh.sb_error = rc1;
2863 }
2864 } else if ((remote || rsync) && !RDC_SUCCESS(rc2)) {
2865 h->rdc_bufh.sb_error = rc2;
2866 }
2867
2868 zero2:
2869 	if (IS_MANY(krdc) && (rdc_get_vflags(urdc) & RDC_PRIMARY)) {
2870 rdc_many_enter(krdc);
2871 for (krdc = krdc->many_next; krdc != this;
2872 krdc = krdc->many_next) {
2873 urdc = &rdc_u_info[krdc->index];
2874 if (!IS_ENABLED(urdc))
2875 continue;
2876 rc2 = first = 0;
2877 rdc_many_exit(krdc);
2878 goto zero1;
2879 }
2880 rdc_many_exit(krdc);
2881 }
2882
2883 if (rsync && !(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
2884 rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2885 if (!RDC_SUCCESS(rc1)) {
2886 /* rsync, so reverse sync needed already set */
2887 rdc_many_enter(krdc);
2888 rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2889 "nsc_write failed");
2890 rdc_many_exit(krdc);
2891 rdc_write_state(urdc);
2892
2893 /*
2894 * only report the error if a remote error
2895 * occurred as well.
2896 */
2897 if (h->rdc_bufh.sb_error)
2898 h->rdc_bufh.sb_error = rc1;
2899 }
2900 }
2901
2902 if (multi) {
2903 /* Multi-hop secondary, just set bits in the bitmap */
2904 (void) RDC_SET_BITMAP(multi, pos, len, &bitmask);
2905 }
2906
2907 return (h->rdc_bufh.sb_error);
2908 }
2909
2910
2911 /*
2912 * _rdc_uncommit
2913 * - refresh specified data region in the buffer to prevent the cache
2914  *	serving the scribbled-on data back to another client.
2915 *
2916 * Only needs to happen on the local node. If in remote io mode, then
2917 * just return 0 - we do not cache the data on the local node and the
2918 * changed data will not have made it to the cache on the other node,
2919 * so it has no need to uncommit.
2920 */
2921
2922 static int
2923 _rdc_uncommit(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2924 {
2925 int remote = RDC_REMOTE(h);
2926 int rc = 0;
2927
2928 if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2929 cmn_err(CE_WARN,
2930 "!_rdc_uncommit: bounds check: io(handle) pos %" NSC_XSZFMT
2931 "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2932 pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2933 h->rdc_bufh.sb_error = EINVAL;
2934 return (h->rdc_bufh.sb_error);
2935 }
2936
2937 if (flag & NSC_NOBLOCK) {
2938 cmn_err(CE_WARN,
2939 "!_rdc_uncommit: removing unsupported NSC_NOBLOCK flag");
2940 flag &= ~(NSC_NOBLOCK);
2941 }
2942
2943 if (!remote) {
2944 rc = nsc_uncommit(h->rdc_bufp, pos, len, flag);
2945 }
2946
2947 if (!RDC_SUCCESS(rc))
2948 h->rdc_bufh.sb_error = rc;
2949
2950 return (rc);
2951 }
2952
2953
2954 /*
2955 * _rdc_trksize
2956 *
2957 * only needs to happen on local node.
2958 */
2959
2960 static int
2961 _rdc_trksize(rdc_fd_t *rfd, nsc_size_t trksize)
2962 {
2963 return (nsc_set_trksize(RDC_FD(rfd), trksize));
2964 }
2965
2966
2967 static nsc_def_t _rdc_fd_def[] = {
2968 "Attach", (uintptr_t)_rdc_attach_fd, 0,
2969 "Pinned", (uintptr_t)_rdc_pinned, 0,
2970 "Unpinned", (uintptr_t)_rdc_unpinned, 0,
2971 0, 0, 0
2972 };
2973
2974
2975 static nsc_def_t _rdc_io_def[] = {
2976 "Open", (uintptr_t)_rdc_openc, 0,
2977 "Close", (uintptr_t)_rdc_close, 0,
2978 "Attach", (uintptr_t)_rdc_attach, 0,
2979 "Detach", (uintptr_t)_rdc_detach, 0,
2980 "AllocHandle", (uintptr_t)_rdc_alloc_handle, 0,
2981 "FreeHandle", (uintptr_t)_rdc_free_handle, 0,
2982 "AllocBuf", (uintptr_t)_rdc_alloc_buf, 0,
2983 "FreeBuf", (uintptr_t)_rdc_free_buf, 0,
2984 "GetPinned", (uintptr_t)_rdc_get_pinned, 0,
2985 "Discard", (uintptr_t)_rdc_discard_pinned, 0,
2986 "PartSize", (uintptr_t)_rdc_partsize, 0,
2987 "MaxFbas", (uintptr_t)_rdc_maxfbas, 0,
2988 "Control", (uintptr_t)_rdc_control, 0,
2989 "Read", (uintptr_t)_rdc_read, 0,
2990 "Write", (uintptr_t)_rdc_write, 0,
2991 "Zero", (uintptr_t)_rdc_zero, 0,
2992 "Uncommit", (uintptr_t)_rdc_uncommit, 0,
2993 "TrackSize", (uintptr_t)_rdc_trksize, 0,
2994 "Provide", 0, 0,
2995 0, 0, 0
2996 };
2997
2998 static nsc_def_t _rdc_ior_def[] = {
2999 "Open", (uintptr_t)_rdc_openr, 0,
3000 "Close", (uintptr_t)_rdc_close, 0,
3001 "Attach", (uintptr_t)_rdc_attach, 0,
3002 "Detach", (uintptr_t)_rdc_detach, 0,
3003 "AllocHandle", (uintptr_t)_rdc_alloc_handle, 0,
3004 "FreeHandle", (uintptr_t)_rdc_free_handle, 0,
3005 "AllocBuf", (uintptr_t)_rdc_alloc_buf, 0,
3006 "FreeBuf", (uintptr_t)_rdc_free_buf, 0,
3007 "GetPinned", (uintptr_t)_rdc_get_pinned, 0,
3008 "Discard", (uintptr_t)_rdc_discard_pinned, 0,
3009 "PartSize", (uintptr_t)_rdc_partsize, 0,
3010 "MaxFbas", (uintptr_t)_rdc_maxfbas, 0,
3011 "Control", (uintptr_t)_rdc_control, 0,
3012 "Read", (uintptr_t)_rdc_read, 0,
3013 "Write", (uintptr_t)_rdc_write, 0,
3014 "Zero", (uintptr_t)_rdc_zero, 0,
3015 "Uncommit", (uintptr_t)_rdc_uncommit, 0,
3016 "TrackSize", (uintptr_t)_rdc_trksize, 0,
3017 "Provide", 0, 0,
3018 0, 0, 0
3019 };
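/*
 * Each nsc_def_t table above is a zero-terminated list of (name,
 * function, flag) triples that nsctl resolves by name at registration
 * time.  A sketch of such a lookup, with assumed field and helper
 * names:
 *
 *	static uintptr_t
 *	def_lookup(nsc_def_t *tbl, const char *name)
 *	{
 *		for (; tbl->name != NULL; tbl++)
 *			if (strcmp(tbl->name, name) == 0)
 *				return (tbl->func);
 *		return (0);
 *	}
 *
 * The two tables differ only in their Open entry (_rdc_openc vs
 * _rdc_openr); all other operations are shared between the cached and
 * raw io providers.
 */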
3020