/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

#include <sys/types.h>
#include <sys/ksynch.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/stat.h>
#include <sys/errno.h>

#include "../solaris/nsc_thread.h"
#ifdef DS_DDICT
#include "../contract.h"
#endif
#include <sys/nsctl/nsctl.h>

#include <sys/kmem.h>
#include <sys/ddi.h>

#include <sys/sdt.h>		/* dtrace is S10 or later */

#include "rdc_io.h"
#include "rdc_bitmap.h"
#include "rdc_diskq.h"
#include "rdc_clnt.h"

#include <sys/unistat/spcs_s.h>
#include <sys/unistat/spcs_s_k.h>
#include <sys/unistat/spcs_errors.h>

extern nsc_io_t *_rdc_io_hc;

int rdc_diskq_coalesce = 0;

int
_rdc_rsrv_diskq(rdc_group_t *group)
{
	int rc = 0;

	mutex_enter(&group->diskqmutex);
	if (group->diskqfd == NULL) {
		mutex_exit(&group->diskqmutex);
		return (EIO);
	} else if ((group->diskqrsrv == 0) &&
	    (rc = nsc_reserve(group->diskqfd, 0)) != 0) {
		cmn_err(CE_WARN,
		    "!rdc: nsc_reserve(%s) failed %d\n",
		    nsc_pathname(group->diskqfd), rc);
	} else {
		group->diskqrsrv++;
	}

	mutex_exit(&group->diskqmutex);
	return (rc);
}

void
_rdc_rlse_diskq(rdc_group_t *group)
{
	mutex_enter(&group->diskqmutex);
	if (group->diskqrsrv > 0 && --group->diskqrsrv == 0) {
		nsc_release(group->diskqfd);
	}
	mutex_exit(&group->diskqmutex);
}
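
/*
 * Illustrative sketch (not part of the driver): diskq I/O is always
 * bracketed by _rdc_rsrv_diskq()/_rdc_rlse_diskq() so the underlying
 * nsc_fd is reserved only while at least one caller needs it; the
 * reserve count is protected by diskqmutex.  A hypothetical caller:
 *
 *	if (_rdc_rsrv_diskq(group) == 0) {
 *		(void) nsc_partsize(group->diskqfd, &size);
 *		_rdc_rlse_diskq(group);
 *	}
 */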

void
rdc_wait_qbusy(disk_queue *q)
{
	ASSERT(MUTEX_HELD(QLOCK(q)));
	while (q->busycnt > 0)
		cv_wait(&q->busycv, QLOCK(q));
}

void
rdc_set_qbusy(disk_queue *q)
{
	ASSERT(MUTEX_HELD(QLOCK(q)));
	q->busycnt++;
}

void
rdc_clr_qbusy(disk_queue *q)
{
	ASSERT(MUTEX_HELD(QLOCK(q)));
	q->busycnt--;
	if (q->busycnt == 0)
		cv_broadcast(&q->busycv);
}
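
/*
 * Sketch of the busy-count protocol above (illustrative only): an
 * in-flight user brackets its unlocked work with set/clr, while a
 * thread that must quiesce the queue waits for the count to drain:
 *
 *	mutex_enter(QLOCK(q));
 *	rdc_set_qbusy(q);		-- mark this thread in-flight
 *	mutex_exit(QLOCK(q));
 *	... do queue i/o without QLOCK held ...
 *	mutex_enter(QLOCK(q));
 *	rdc_clr_qbusy(q);		-- last one out wakes rdc_wait_qbusy()
 *	mutex_exit(QLOCK(q));
 */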

int
rdc_lookup_diskq(char *pathname)
{
	rdc_u_info_t *urdc;
#ifdef DEBUG
	rdc_k_info_t *krdc;
#endif
	int index;

	for (index = 0; index < rdc_max_sets; index++) {
		urdc = &rdc_u_info[index];
#ifdef DEBUG
		krdc = &rdc_k_info[index];
#endif
		ASSERT(krdc->index == index);
		ASSERT(urdc->index == index);
		if (!IS_ENABLED(urdc))
			continue;

		if (strncmp(pathname, urdc->disk_queue,
		    NSC_MAXPATH) == 0)
			return (index);
	}

	return (-1);
}

void
rdc_unintercept_diskq(rdc_group_t *grp)
{
	if (!RDC_IS_DISKQ(grp))
		return;
	if (grp->q_tok)
		(void) nsc_unregister_path(grp->q_tok, 0);
	grp->q_tok = NULL;
}

void
rdc_close_diskq(rdc_group_t *grp)
{

	if (grp == NULL) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_close_diskq: NULL group!");
#endif
		return;
	}

	if (grp->diskqfd) {
		if (nsc_close(grp->diskqfd) != 0) {
#ifdef DEBUG
			cmn_err(CE_WARN, "!nsc_close on diskq failed");
#else
			;
			/*EMPTY*/
#endif
		}
		grp->diskqfd = 0;
		grp->diskqrsrv = 0;
	}
	bzero(&grp->diskq.disk_hdr, sizeof (diskq_header));
}

/*
 * nsc_open the diskq and attach
 * the nsc_fd to krdc->diskqfd
 */
int
rdc_open_diskq(rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc;
	rdc_group_t *grp;
	int sts;
	nsc_size_t size;
	char *diskqname;
	int mutexheld = 0;

	grp = krdc->group;
	urdc = &rdc_u_info[krdc->index];

	mutex_enter(&grp->diskqmutex);
	mutexheld++;
	if (urdc->disk_queue[0] == '\0') {
		goto fail;
	}

	diskqname = &urdc->disk_queue[0];

	if (grp->diskqfd == NULL) {
		grp->diskqfd = nsc_open(diskqname,
		    NSC_RDCHR_ID|NSC_DEVICE|NSC_WRITE, 0, 0, 0);
		if (grp->diskqfd == NULL) {
			cmn_err(CE_WARN, "!rdc_open_diskq: Unable to open %s",
			    diskqname);
			goto fail;
		}
	}
	if (!grp->q_tok)
		grp->q_tok = nsc_register_path(urdc->disk_queue,
		    NSC_DEVICE | NSC_CACHE, _rdc_io_hc);

	grp->diskqrsrv = 0;	/* init reserve count */

	mutex_exit(&grp->diskqmutex);
	mutexheld--;
	/* just test a reserve release */
	sts = _rdc_rsrv_diskq(grp);
	if (!RDC_SUCCESS(sts)) {
		cmn_err(CE_WARN, "!rdc_open_diskq: Reserve failed for %s",
		    diskqname);
		goto fail;
	}
	sts = nsc_partsize(grp->diskqfd, &size);
	_rdc_rlse_diskq(grp);

	if ((sts == 0) && (size < 1)) {
		rdc_unintercept_diskq(grp);
		rdc_close_diskq(grp);
		goto fail;
	}

	return (0);

fail:
	bzero(&urdc->disk_queue, NSC_MAXPATH);
	if (mutexheld)
		mutex_exit(&grp->diskqmutex);
	return (-1);

}

/*
 * rdc_count_vecs
 * simply vec++'s until sv_addr is null
 * returns the number of vectors encountered, plus one
 * for the null terminator slot
 */
int
rdc_count_vecs(nsc_vec_t *vec)
{
	nsc_vec_t *vecp;
	int i = 0;

	vecp = vec;
	while (vecp->sv_addr) {
		vecp++;
		i++;
	}
	return (i + 1);
}

/*
 * rdc_setid2idx
 * given setid, return index
 */
int
rdc_setid2idx(int setid)
{
	int index = 0;

	for (index = 0; index < rdc_max_sets; index++) {
		if (rdc_u_info[index].setid == setid)
			break;
	}
	if (index >= rdc_max_sets)
		index = -1;
	return (index);
}

/*
 * rdc_idx2setid
 * given an index, return its setid
 */
int
rdc_idx2setid(int index)
{
	return (rdc_u_info[index].setid);
}
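
/*
 * Mapping note (illustrative): on-disk io_hdrs record the set by setid
 * rather than by index (see rdc_fill_ioheader()), and the flusher maps
 * it back with rdc_setid2idx(), failing the queue if the set is gone.
 * The round trip is expected to hold:
 *
 *	int idx = rdc_setid2idx(setid);
 *	ASSERT(idx < 0 || rdc_idx2setid(idx) == setid);
 */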

/*
 * rdc_fill_ioheader
 * fill in all the stuff you want to save on disk
 * at the beginning of each queued write
 */
void
rdc_fill_ioheader(rdc_aio_t *aio, io_hdr *hd, int qpos)
{
	ASSERT(MUTEX_HELD(&rdc_k_info[aio->index].group->diskq.disk_qlock));

	hd->dat.magic = RDC_IOHDR_MAGIC;
	hd->dat.type = RDC_QUEUEIO;
	hd->dat.pos = aio->pos;
	hd->dat.hpos = aio->pos;
	hd->dat.qpos = qpos;
	hd->dat.len = aio->len;
	hd->dat.flag = aio->flag;
	hd->dat.iostatus = aio->iostatus;
	hd->dat.setid = rdc_idx2setid(aio->index);
	hd->dat.time = nsc_time();
	if (!aio->handle)
		hd->dat.flag |= RDC_NULL_BUF;	/* no real data to queue */
}
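
/*
 * On-disk record layout implied by rdc_fill_ioheader() and
 * rdc_diskq_enqueue() (for reference; not authoritative):
 *
 *	+------------------+---------------------------------------+
 *	| io_hdr, 1 FBA    | data payload, aio->len FBAs           |
 *	| (vec[0] below)   | (absent when RDC_NULL_BUF is set)     |
 *	+------------------+---------------------------------------+
 *
 * dat.qpos records where the record sits in the queue; dat.pos/hpos
 * record where the data belongs on the replicated volume.
 */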

/*
 * rdc_dump_iohdrs
 * give back the iohdr list
 * and clear out q->lastio
 */
void
rdc_dump_iohdrs(disk_queue *q)
{
	io_hdr *p, *r;

	ASSERT(MUTEX_HELD(QLOCK(q)));

	p = q->iohdrs;
	while (p) {
		r = p->dat.next;
		kmem_free(p, sizeof (*p));
		q->hdrcnt--;
		p = r;
	}
	q->iohdrs = q->hdr_last = NULL;
	q->hdrcnt = 0;
	if (q->lastio->handle)
		(void) nsc_free_buf(q->lastio->handle);
	bzero(&(*q->lastio), sizeof (*q->lastio));
}

/*
 * rdc_fail_diskq
 * set flags, throw away q info
 * clean up what you can
 * wait for flusher threads to stop (taking into account this may be one)
 * takes group_lock, so conf, many, and bitmap may not be held
 */
void
rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag)
{
	rdc_k_info_t *p;
	rdc_u_info_t *q = &rdc_u_info[krdc->index];
	rdc_group_t *group = krdc->group;
	disk_queue *dq = &krdc->group->diskq;

	if (IS_STATE(q, RDC_DISKQ_FAILED))
		return;

	if (!(flag & RDC_NOFAIL))
		cmn_err(CE_WARN, "!disk queue %s failure", q->disk_queue);

	if (flag & RDC_DOLOG) {
		rdc_group_enter(krdc);
		rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
		    "disk queue failed");
		rdc_group_exit(krdc);
	}
	mutex_enter(QHEADLOCK(dq));
	mutex_enter(QLOCK(dq));
	/*
	 * quick stop of the flushers
	 * other cleanup is done on the un-failing of the diskq
	 */
	SET_QHEAD(dq, RDC_DISKQ_DATA_OFF);
	SET_QTAIL(dq, RDC_DISKQ_DATA_OFF);
	SET_QNXTIO(dq, RDC_DISKQ_DATA_OFF);
	SET_LASTQTAIL(dq, 0);

	rdc_dump_iohdrs(dq);

	mutex_exit(QLOCK(dq));
	mutex_exit(QHEADLOCK(dq));

	bzero(krdc->bitmap_ref, krdc->bitmap_size * BITS_IN_BYTE *
	    BMAP_REF_PREF_SIZE);

	if (flag & RDC_DOLOG)	/* otherwise, we already have the conf lock */
		rdc_group_enter(krdc);

	else if (!(flag & RDC_GROUP_LOCKED))
		ASSERT(MUTEX_HELD(&rdc_conf_lock));

	if (!(flag & RDC_NOFAIL)) {
		rdc_set_flags(q, RDC_DISKQ_FAILED);
	}
	rdc_clr_flags(q, RDC_QUEUING);

	for (p = krdc->group_next; p != krdc; p = p->group_next) {
		q = &rdc_u_info[p->index];
		if (!IS_ENABLED(q))
			continue;
		if (!(flag & RDC_NOFAIL)) {
			rdc_set_flags(q, RDC_DISKQ_FAILED);
		}
		rdc_clr_flags(q, RDC_QUEUING);
		bzero(p->bitmap_ref, p->bitmap_size * BITS_IN_BYTE *
		    BMAP_REF_PREF_SIZE);
		/* RDC_QUEUING is cleared in group_log() */
	}

	if (flag & RDC_DOLOG)
		rdc_group_exit(krdc);

	/* can't wait for myself to go away, I'm a flusher */
	if (wait & RDC_WAIT)
		while (group->rdc_thrnum)
			delay(2);

}

/*
 * rdc_stamp_diskq
 * write out diskq header info
 * must have disk_qlock held
 * if rsrvd flag is 0, the nsc_reserve is done
 */
int
rdc_stamp_diskq(rdc_k_info_t *krdc, int rsrvd, int failflags)
{
	nsc_vec_t vec[2];
	nsc_buf_t *head = NULL;
	rdc_group_t *grp;
	rdc_u_info_t *urdc;
	disk_queue *q;
	int rc, flags;

	grp = krdc->group;
	q = &krdc->group->diskq;

	ASSERT(MUTEX_HELD(&q->disk_qlock));

	urdc = &rdc_u_info[krdc->index];

	if (!rsrvd && _rdc_rsrv_diskq(grp)) {
		cmn_err(CE_WARN, "!rdc_stamp_diskq: %s reserve failed",
		    urdc->disk_queue);
		mutex_exit(QLOCK(q));
		rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
		mutex_enter(QLOCK(q));
		return (-1);
	}
	flags = NSC_WRITE | NSC_NOCACHE | NSC_NODATA;
	rc = nsc_alloc_buf(grp->diskqfd, 0, 1, flags, &head);

	if (!RDC_SUCCESS(rc)) {
		cmn_err(CE_WARN, "!Alloc buf failed for disk queue %s",
		    &urdc->disk_queue[0]);
		mutex_exit(QLOCK(q));
		rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
		mutex_enter(QLOCK(q));
		return (-1);
	}
	vec[0].sv_len = FBA_SIZE(1);
	vec[0].sv_addr = (uchar_t *)&q->disk_hdr;
	vec[1].sv_len = 0;
	vec[1].sv_addr = NULL;

	head->sb_vec = &vec[0];

#ifdef DEBUG_DISKQ
	cmn_err(CE_NOTE, "!rdc_stamp_diskq: hdr: %p magic: %x state: "
	    "%x head: %d tail: %d size: %d nitems: %d blocks: %d",
	    q, QMAGIC(q), QSTATE(q), QHEAD(q),
	    QTAIL(q), QSIZE(q), QNITEMS(q), QBLOCKS(q));
#endif

	rc = nsc_write(head, 0, 1, 0);

	if (!RDC_SUCCESS(rc)) {
		if (!rsrvd)
			_rdc_rlse_diskq(grp);
		cmn_err(CE_CONT, "!disk queue %s failed rc %d",
		    &urdc->disk_queue[0], rc);
		mutex_exit(QLOCK(q));
		rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
		mutex_enter(QLOCK(q));
		return (-1);
	}

	(void) nsc_free_buf(head);
	if (!rsrvd)
		_rdc_rlse_diskq(grp);

	return (0);
}

/*
 * rdc_init_diskq_header
 * load initial values into the header
 */
void
rdc_init_diskq_header(rdc_group_t *grp, dqheader *header)
{
	int rc;
	int type = 0;
	disk_queue *q = &grp->diskq;

	ASSERT(MUTEX_HELD(QLOCK(q)));

	/* save q type if this is a failure */
	if (QSTATE(q) & RDC_QNOBLOCK)
		type = RDC_QNOBLOCK;
	bzero(header, sizeof (*header));
	header->h.magic = RDC_DISKQ_MAGIC;
	header->h.vers = RDC_DISKQ_VERS;
	header->h.state |= (RDC_SHUTDOWN_BAD|type);	/* SHUTDOWN_OK on suspend */
	header->h.head_offset = RDC_DISKQ_DATA_OFF;
	header->h.tail_offset = RDC_DISKQ_DATA_OFF;
	header->h.nitems = 0;
	header->h.blocks = 0;
	header->h.qwrap = 0;
	SET_QNXTIO(q, QHEAD(q));
	SET_QCOALBOUNDS(q, RDC_DISKQ_DATA_OFF);

	/* do this last, as this might be a failure. get the kernel state ok */
	rc = _rdc_rsrv_diskq(grp);
	if (!RDC_SUCCESS(rc)) {
		cmn_err(CE_WARN, "!init_diskq_hdr: Reserve failed for queue");
		return;
	}
	(void) nsc_partsize(grp->diskqfd, &header->h.disk_size);
	_rdc_rlse_diskq(grp);

}

/*
 * rdc_unfail_diskq
 * the diskq failed for some reason, let's try and re-start it
 * the old stuff has already been thrown away
 * should just be called from rdc_sync
 */
void
rdc_unfail_diskq(rdc_k_info_t *krdc)
{
	rdc_k_info_t *p;
	rdc_u_info_t *q = &rdc_u_info[krdc->index];
	rdc_group_t *group = krdc->group;
	disk_queue *dq = &group->diskq;

	rdc_group_enter(krdc);
	rdc_clr_flags(q, RDC_ASYNC);
	/* someone else won the race... */
	if (!IS_STATE(q, RDC_DISKQ_FAILED)) {
		rdc_group_exit(krdc);
		return;
	}
	rdc_clr_flags(q, RDC_DISKQ_FAILED);
	for (p = krdc->group_next; p != krdc; p = p->group_next) {
		q = &rdc_u_info[p->index];
		if (!IS_ENABLED(q))
			continue;
		rdc_clr_flags(q, RDC_DISKQ_FAILED);
		rdc_clr_flags(q, RDC_ASYNC);
		if (IS_STATE(q, RDC_QUEUING))
			rdc_clr_flags(q, RDC_QUEUING);
	}
	rdc_group_exit(krdc);

	mutex_enter(QLOCK(dq));

	rdc_init_diskq_header(group, &group->diskq.disk_hdr);
	/* real i/o to the queue */
	/* clear RDC_AUXSYNCIP because we cannot halt a sync that's not here */
	krdc->aux_state &= ~RDC_AUXSYNCIP;
	if (rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED | RDC_DOLOG) < 0) {
		mutex_exit(QLOCK(dq));
		goto fail;
	}

	SET_QNXTIO(dq, QHEAD(dq));
	SET_QHDRCNT(dq, 0);
	SET_QSTATE(dq, RDC_SHUTDOWN_BAD);	/* only suspend can write good */
	dq->iohdrs = NULL;
	dq->hdr_last = NULL;

	/* should be none, but.. */
	rdc_dump_iohdrs(dq);

	mutex_exit(QLOCK(dq));

fail:
	krdc->aux_state |= RDC_AUXSYNCIP;
}

int
rdc_read_diskq_header(rdc_k_info_t *krdc)
{
	int rc;
	diskq_header *header;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

	if (krdc->group->diskqfd == NULL) {
		char buf[NSC_MAXPATH];
		(void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
		    &urdc->secondary.file[0]);
		cmn_err(CE_WARN, "!Disk Queue Header read failed for %s",
		    urdc->group_name[0] == '\0' ? buf :
		    &urdc->group_name[0]);
		return (-1);
	}

	header = &krdc->group->diskq.disk_hdr.h;
	if (_rdc_rsrv_diskq(krdc->group)) {
		return (-1);
	}

	rc = rdc_ns_io(krdc->group->diskqfd, NSC_RDBUF, 0,
	    (uchar_t *)header, sizeof (diskq_header));

	_rdc_rlse_diskq(krdc->group);

	if (!RDC_SUCCESS(rc)) {
		char buf[NSC_MAXPATH];
		(void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
		    &urdc->secondary.file[0]);
		cmn_err(CE_WARN, "!Disk Queue Header read failed(%d) for %s",
		    rc, urdc->group_name[0] == '\0' ? buf :
		    &urdc->group_name[0]);
		return (-1);
	}
	return (0);
}

/*
 * rdc_stop_diskq_flusher
 */
void
rdc_stop_diskq_flusher(rdc_k_info_t *krdc)
{
	disk_queue q, *qp;
	rdc_group_t *group;
#ifdef DEBUG
	cmn_err(CE_NOTE, "!stopping flusher threads");
#endif
	group = krdc->group;
	qp = &krdc->group->diskq;

	/* save the queue info */
	q = *qp;

	/* lie a little */
	SET_QTAIL(qp, RDC_DISKQ_DATA_OFF);
	SET_QHEAD(qp, RDC_DISKQ_DATA_OFF);
	SET_QSTATE(qp, RDC_QDISABLEPEND);
	SET_QSTATE(qp, RDC_STOPPINGFLUSH);

	/* drop locks to allow flushers to die */
	mutex_exit(QLOCK(qp));
	mutex_exit(QHEADLOCK(qp));
	rdc_group_exit(krdc);

	while (group->rdc_thrnum)
		delay(2);

	rdc_group_enter(krdc);
	mutex_enter(QHEADLOCK(qp));
	mutex_enter(QLOCK(qp));

	CLR_QSTATE(qp, RDC_STOPPINGFLUSH);
	*qp = q;
}

/*
 * rdc_enable_diskq
 * open the diskq
 * and stamp the header onto it.
 */
int
rdc_enable_diskq(rdc_k_info_t *krdc)
{
	rdc_group_t *group;
	disk_queue *q;

	group = krdc->group;
	q = &group->diskq;

	if (rdc_open_diskq(krdc) < 0)
		goto fail;

	mutex_enter(QLOCK(q));
	rdc_init_diskq_header(group, &group->diskq.disk_hdr);

	if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
		mutex_exit(QLOCK(q));
		goto fail;
	}

	SET_QNXTIO(q, QHEAD(q));

	mutex_exit(QLOCK(q));
	return (0);

fail:
	mutex_enter(&group->diskqmutex);
	rdc_close_diskq(group);
	mutex_exit(&group->diskqmutex);

	/* caller has to fail diskq after dropping conf & many locks */
	return (RDC_EQNOADD);
}
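
/*
 * Hypothetical call site (illustrative): the enable path is expected
 * to call rdc_enable_diskq() once the group is built, and to fail the
 * diskq itself on RDC_EQNOADD, since this routine deliberately returns
 * before the conf and many locks can be dropped:
 *
 *	if (rdc_enable_diskq(krdc) == RDC_EQNOADD) {
 *		... drop conf & many locks, then rdc_fail_diskq() ...
 *	}
 */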

/*
 * rdc_resume_diskq
 * open the diskq and read the header
 */
int
rdc_resume_diskq(rdc_k_info_t *krdc)
{
	rdc_u_info_t *urdc;
	rdc_group_t *group;
	disk_queue *q;
	int rc = 0;

	urdc = &rdc_u_info[krdc->index];
	group = krdc->group;
	q = &group->diskq;

	if (rdc_open_diskq(krdc) < 0) {
		rc = RDC_EQNOADD;
		goto fail;
	}

	mutex_enter(QLOCK(q));

	rdc_init_diskq_header(group, &group->diskq.disk_hdr);

	if (rdc_read_diskq_header(krdc) < 0) {
		SET_QSTATE(q, RDC_QBADRESUME);
		rc = RDC_EQNOADD;
	}

	/* check diskq magic number */
	if (QMAGIC(q) != RDC_DISKQ_MAGIC) {
		cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
		    " incorrect magic number in header", urdc->disk_queue);
		rdc_init_diskq_header(group, &group->diskq.disk_hdr);
		SET_QSTATE(q, RDC_QBADRESUME);
		rc = RDC_EQNOADD;
	} else switch (QVERS(q)) {
		diskq_header1 h1;	/* version 1 header */
		diskq_header *hc;	/* current header */

#ifdef NSC_MULTI_TERABYTE
	case RDC_DISKQ_VER_ORIG:
		/* version 1 diskq header, upgrade to 64bit version */
		h1 = *(diskq_header1 *)(&group->diskq.disk_hdr.h);
		hc = &group->diskq.disk_hdr.h;

		cmn_err(CE_WARN, "!SNDR: old version header for diskq %s,"
		    " upgrading to current version", urdc->disk_queue);
		hc->vers = RDC_DISKQ_VERS;
		hc->state = h1.state;
		hc->head_offset = h1.head_offset;
		hc->tail_offset = h1.tail_offset;
		hc->disk_size = h1.disk_size;
		hc->nitems = h1.nitems;
		hc->blocks = h1.blocks;
		hc->qwrap = h1.qwrap;
		hc->auxqwrap = h1.auxqwrap;
		hc->seq_last = h1.seq_last;
		hc->ack_last = h1.ack_last;

		if (hc->nitems > 0) {
			cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
			    " old version Q contains data", urdc->disk_queue);
			rdc_init_diskq_header(group, &group->diskq.disk_hdr);
			SET_QSTATE(q, RDC_QBADRESUME);
			rc = RDC_EQNOADD;
		}
		break;
#else
	case RDC_DISKQ_VER_64BIT:
		cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
		    " diskq header newer than current version",
		    urdc->disk_queue);
		rdc_init_diskq_header(group, &group->diskq.disk_hdr);
		SET_QSTATE(q, RDC_QBADRESUME);
		rc = RDC_EQNOADD;
		break;
#endif
	case RDC_DISKQ_VERS:
		/* okay, current version diskq */
		break;
	default:
		cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
		    " unknown diskq header version", urdc->disk_queue);
		rdc_init_diskq_header(group, &group->diskq.disk_hdr);
		SET_QSTATE(q, RDC_QBADRESUME);
		rc = RDC_EQNOADD;
		break;
	}
	if (IS_QSTATE(q, RDC_SHUTDOWN_BAD)) {
		cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
		    " unsafe shutdown", urdc->disk_queue);
		rdc_init_diskq_header(group, &group->diskq.disk_hdr);
		SET_QSTATE(q, RDC_QBADRESUME);
		rc = RDC_EQNOADD;
	}

	CLR_QSTATE(q, RDC_SHUTDOWN_OK);
	SET_QSTATE(q, RDC_SHUTDOWN_BAD);

	/* bad, until proven not bad */
	if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
		rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_NOLOG);
		rc = RDC_EQNOADD;
	}

	SET_QNXTIO(q, QHEAD(q));
	group->diskq.nitems_hwm = QNITEMS(q);
	group->diskq.blocks_hwm = QBLOCKS(q);

	mutex_exit(QLOCK(q));

#ifdef DEBUG
	cmn_err(CE_NOTE, "!rdc_resume_diskq: resuming diskq %s\n",
	    urdc->disk_queue);
	cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
#endif
	if (rc == 0)
		return (0);

fail:

	/* caller has to set the diskq failed after dropping its locks */
	return (rc);

}

int
rdc_suspend_diskq(rdc_k_info_t *krdc)
{
	int rc;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	disk_queue *q;

	q = &krdc->group->diskq;

	/* grab both diskq locks as we are going to kill the flusher */
	mutex_enter(QHEADLOCK(q));
	mutex_enter(QLOCK(q));

	if ((krdc->group->rdc_thrnum) && (!IS_QSTATE(q, RDC_STOPPINGFLUSH))) {
		SET_QSTATE(q, RDC_STOPPINGFLUSH);
		rdc_stop_diskq_flusher(krdc);
		CLR_QSTATE(q, RDC_STOPPINGFLUSH);
	}

	krdc->group->diskq.disk_hdr.h.state &= ~RDC_SHUTDOWN_BAD;
	krdc->group->diskq.disk_hdr.h.state |= RDC_SHUTDOWN_OK;
	krdc->group->diskq.disk_hdr.h.state &= ~RDC_QBADRESUME;

	/* let's make sure that the flusher has stopped.. */
	if (krdc->group->rdc_thrnum) {
		mutex_exit(QLOCK(q));
		mutex_exit(QHEADLOCK(q));
		rdc_group_exit(krdc);

		while (krdc->group->rdc_thrnum)
			delay(5);

		rdc_group_enter(krdc);
		mutex_enter(QLOCK(q));
		mutex_enter(QHEADLOCK(q));
	}
	/* write refcount to the bitmap */
	if ((rc = rdc_write_refcount(krdc)) < 0) {
		rdc_group_exit(krdc);
		goto fail;
	}

	if (!QEMPTY(q)) {
		rdc_set_flags(urdc, RDC_QUEUING);
	} else {
		rdc_clr_flags(urdc, RDC_QUEUING);
	}

	/* fill in diskq header info */
	krdc->group->diskq.disk_hdr.h.state &= ~RDC_QDISABLEPEND;

#ifdef DEBUG
	cmn_err(CE_NOTE, "!suspending disk queue\n" QDISPLAY(q));
#endif

	/* to avoid a possible deadlock, release in order, and reacquire */
	mutex_exit(QLOCK(q));
	mutex_exit(QHEADLOCK(q));

	if (krdc->group->count > 1) {
		rdc_group_exit(krdc);
		goto fail;	/* just stamp on the last suspend */
	}
	rdc_group_exit(krdc);	/* in case this stamp fails */
	mutex_enter(QLOCK(q));

	rc = rdc_stamp_diskq(krdc, 0, RDC_NOLOG);

	mutex_exit(QLOCK(q));

fail:
	rdc_group_enter(krdc);

	/* diskq already failed if stamp failed */

	return (rc);
}

/*
 * copy orig aio to copy, including the nsc_buf_t
 */
int
rdc_dup_aio(rdc_aio_t *orig, rdc_aio_t *copy)
{
	int rc;

	bcopy(orig, copy, sizeof (*orig));
	copy->handle = NULL;

	if (orig->handle == NULL)	/* no buf to alloc/copy */
		return (0);

	rc = nsc_alloc_abuf(orig->pos, orig->len, 0, &copy->handle);
	if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_dup_aio: alloc_buf failed (%d)", rc);
#endif
		return (rc);
	}
	rc = nsc_copy(orig->handle, copy->handle, orig->pos,
	    orig->pos, orig->len);
	if (!RDC_SUCCESS(rc)) {
		(void) nsc_free_buf(copy->handle);
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_dup_aio: copy buf failed (%d)", rc);
#endif
		return (rc);
	}
	return (0);
}
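
/*
 * Minimal usage sketch (illustrative): rdc_dup_aio() deep-copies the
 * data into a freshly allocated anonymous buffer, so the copy remains
 * valid after the original aio's handle is freed:
 *
 *	rdc_aio_t copy;
 *	if (rdc_dup_aio(orig, &copy) == 0) {
 *		... use copy, then (void) nsc_free_buf(copy.handle) ...
 *	}
 */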

/*
 * rdc_qfill_shldwakeup()
 * returns 1 if the diskq->memq fill thread should be woken: there is
 * data to move and, if the high water mark was hit, the memory queue
 * has drained back below RDC_LOW_QBLOCKS.
 * returns 0 if the memory queue is still too full, the diskq is
 * empty, or the set is logging/syncing.
 * net_queue mutex is already held
 */
int
rdc_qfill_shldwakeup(rdc_k_info_t *krdc)
{
	rdc_group_t *group = krdc->group;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	net_queue *nq = &group->ra_queue;
	disk_queue *dq = &group->diskq;

	ASSERT(MUTEX_HELD(&nq->net_qlock));

	if (!RDC_IS_DISKQ(krdc->group))
		return (0);

	if (nq->qfill_sleeping != RDC_QFILL_ASLEEP)
		return (0);

	if (nq->qfflags & RDC_QFILLSTOP)
		return (1);

	if (nq->qfflags & RDC_QFILLSLEEP)
		return (0);

	if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING))
		return (0);

	mutex_enter(QLOCK(dq));
	if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
		mutex_exit(QLOCK(dq));
		return (0);
	}
	mutex_exit(QLOCK(dq));

	if (nq->qfill_sleeping == RDC_QFILL_ASLEEP) {
		if (nq->hwmhit) {
			if (nq->blocks <= RDC_LOW_QBLOCKS) {
				nq->hwmhit = 0;
			} else {
				return (0);
			}
		}
#ifdef DEBUG_DISKQ_NOISY
		cmn_err(CE_NOTE, "!Waking up diskq->memq flusher, flags 0x%x"
		    " idx: %d", rdc_get_vflags(urdc), urdc->index);
#endif
		return (1);
	}
	return (0);

}

/*
 * rdc_diskq_enqueue
 * enqueue one i/o to the diskq
 * after prepending some metadata to the front
 */
int
rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
{
	nsc_vec_t *vec = NULL;
	nsc_buf_t *bp = NULL;
	nsc_buf_t *qbuf = NULL;
	io_hdr *iohdr = NULL;
	disk_queue *q;
	rdc_group_t *group;
	int numvecs;
	int i, j, rc = 0;
	int retries = 0;
	rdc_u_info_t *urdc;
	nsc_size_t iofbas;	/* len of io + io header len */
	int qtail;
	int delay_time = 2;
	int print_msg = 1;

#ifdef DEBUG_WRITER_UBERNOISE
	int qhead;
#endif
	urdc = &rdc_u_info[krdc->index];
	group = krdc->group;
	q = &group->diskq;

	mutex_enter(QLOCK(q));

	/*
	 * there is a thread that is blocking because the queue is full,
	 * don't try to set up this write until all is clear
	 * check before and after for logging or failed queue just
	 * in case a thread was in flight while the queue was full,
	 * and in the process of failing
	 */
	while (IS_QSTATE(q, RDC_QFULL)) {
		if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
		    (IS_STATE(urdc, RDC_LOGGING) &&
		    !IS_STATE(urdc, RDC_QUEUING))) {
			mutex_exit(QLOCK(q));
			if (aio->handle)
				(void) nsc_free_buf(aio->handle);
			return (-1);
		}
		cv_wait(&q->qfullcv, QLOCK(q));

		if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
		    (IS_STATE(urdc, RDC_LOGGING) &&
		    !IS_STATE(urdc, RDC_QUEUING))) {
			mutex_exit(QLOCK(q));
			if (aio->handle)
				(void) nsc_free_buf(aio->handle);
			return (-1);
		}

	}

	SET_QSTATE(q, QTAILBUSY);

	if (aio->handle == NULL) {
		/* we're only going to write the header to the queue */
		numvecs = 2;	/* kmem_alloc io header + null terminate */
		iofbas = FBA_LEN(sizeof (io_hdr));

	} else {
		/* find out how many vecs */
		numvecs = rdc_count_vecs(aio->handle->sb_vec) + 1;
		iofbas = aio->len + FBA_LEN(sizeof (io_hdr));
	}

	/*
	 * this, in conjunction with QTAILBUSY, will prevent
	 * premature dequeuing
	 */

	SET_LASTQTAIL(q, QTAIL(q));

	iohdr = (io_hdr *)kmem_zalloc(sizeof (io_hdr), KM_NOSLEEP);
	vec = (nsc_vec_t *)kmem_zalloc(sizeof (nsc_vec_t) * numvecs,
	    KM_NOSLEEP);

	if (!vec || !iohdr) {
		if (!vec) {
			cmn_err(CE_WARN, "!vec kmem alloc failed");
		} else {
			cmn_err(CE_WARN, "!iohdr kmem alloc failed");
		}
		if (vec)
			kmem_free(vec, sizeof (*vec) * numvecs);
		if (iohdr)
			kmem_free(iohdr, sizeof (*iohdr));
		CLR_QSTATE(q, QTAILBUSY);
		SET_LASTQTAIL(q, 0);
		mutex_exit(QLOCK(q));
		if (aio->handle)
			(void) nsc_free_buf(aio->handle);
		return (ENOMEM);
	}

	vec[numvecs - 1].sv_len = 0;
	vec[numvecs - 1].sv_addr = 0;

	/* now add the write itself */
	bp = aio->handle;

	for (i = 1, j = 0; bp && bp->sb_vec[j].sv_addr &&
	    i < numvecs; i++, j++) {
		vec[i].sv_len = bp->sb_vec[j].sv_len;
		vec[i].sv_addr = bp->sb_vec[j].sv_addr;
	}

retry:

	/* check for queue wrap, then check for overflow */
	if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
	    (IS_STATE(urdc, RDC_LOGGING) && !IS_STATE(urdc, RDC_QUEUING))) {
		kmem_free(iohdr, sizeof (*iohdr));
		kmem_free(vec, sizeof (*vec) * numvecs);
		CLR_QSTATE(q, QTAILBUSY);
		SET_LASTQTAIL(q, 0);
		if (IS_QSTATE(q, RDC_QFULL)) {	/* wakeup blocked threads */
			CLR_QSTATE(q, RDC_QFULL);
			cv_broadcast(&q->qfullcv);
		}
		mutex_exit(QLOCK(q));
		if (aio->handle)
			(void) nsc_free_buf(aio->handle);

		return (-1);
	}

	if (QTAILSHLDWRAP(q, iofbas)) {
		/*
		 * just go back to the beginning of the disk
		 * it's not worth the trouble breaking up the write
		 */
#ifdef DEBUG_DISKQWRAP
		cmn_err(CE_NOTE, "!wrapping Q tail: " QDISPLAY(q));
#endif
		/*LINTED*/
		WRAPQTAIL(q);
	}

	/*
	 * prepend the write's metadata
	 */
	rdc_fill_ioheader(aio, iohdr, QTAIL(q));

	vec[0].sv_len = FBA_SIZE(1);
	vec[0].sv_addr = (uchar_t *)iohdr;

	/* check for tail < head */

	if (!(FITSONQ(q, iofbas))) {
		/*
		 * don't allow any more writes to start
		 */
		SET_QSTATE(q, RDC_QFULL);
		mutex_exit(QLOCK(q));

		if ((!group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
			(void) rdc_writer(krdc->index);

		delay(delay_time);
		q->throttle_delay += delay_time;
		retries++;
		delay_time *= 2;	/* fairly aggressive */
		if ((retries >= 8) || (delay_time >= 256)) {
			delay_time = 2;
			if (print_msg) {
				cmn_err(CE_WARN, "!enqueue: disk queue %s full",
				    &urdc->disk_queue[0]);
				print_msg = 0;
#ifdef DEBUG
				cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
#else
				cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));
#endif
			}
			/*
			 * if this is a no-block queue, or this is a blocking
			 * queue that is not flushing, reset and log
			 */
			if ((QSTATE(q) & RDC_QNOBLOCK) ||
			    (IS_STATE(urdc, RDC_QUEUING))) {

				if (IS_STATE(urdc, RDC_QUEUING)) {
					cmn_err(CE_WARN, "!SNDR: disk queue %s"
					    " full and not flushing. giving up",
					    &urdc->disk_queue[0]);
					cmn_err(CE_WARN, "!SNDR: %s:%s entering"
					    " logging mode",
					    urdc->secondary.intf,
					    urdc->secondary.file);
				}

				rdc_fail_diskq(krdc, RDC_WAIT,
				    RDC_DOLOG | RDC_NOFAIL);
				kmem_free(iohdr, sizeof (*iohdr));
				kmem_free(vec, sizeof (*vec) * numvecs);
				mutex_enter(QLOCK(q));
				CLR_QSTATE(q, QTAILBUSY | RDC_QFULL);
				cv_broadcast(&q->qfullcv);
				mutex_exit(QLOCK(q));
				SET_LASTQTAIL(q, 0);
				if (aio->handle)
					(void) nsc_free_buf(aio->handle);
				return (ENOMEM);
			}
		}

		mutex_enter(QLOCK(q));
		goto retry;

	}

	qtail = QTAIL(q);
#ifdef DEBUG_WRITER_UBERNOISE
	qhead = QHEAD(q);
#endif

	/* update tail pointer, nitems on queue and blocks on queue */
	INC_QTAIL(q, iofbas);	/* increment tail over i/o size + ioheader size */
	INC_QNITEMS(q, 1);
	/* increment counter for i/o blocks only */
	INC_QBLOCKS(q, (iofbas - FBA_LEN(sizeof (io_hdr))));

	if (QNITEMS(q) > q->nitems_hwm)
		q->nitems_hwm = QNITEMS(q);
	if (QBLOCKS(q) > q->blocks_hwm)
		q->blocks_hwm = QBLOCKS(q);

	if (IS_QSTATE(q, RDC_QFULL)) {
		CLR_QSTATE(q, RDC_QFULL);
		cv_broadcast(&q->qfullcv);
	}

	mutex_exit(QLOCK(q));

	/*
	 * if (krdc->io_kstats) {
	 *	mutex_enter(krdc->io_kstats->ks_lock);
	 *	kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
	 *	mutex_exit(krdc->io_kstats->ks_lock);
	 * }
	 */

	DTRACE_PROBE(rdc_diskq_rsrv);

	if (_rdc_rsrv_diskq(group)) {
		cmn_err(CE_WARN, "!rdc_enqueue: %s reserve failed",
		    &urdc->disk_queue[0]);
		rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
		kmem_free(iohdr, sizeof (*iohdr));
		kmem_free(vec, sizeof (*vec) * numvecs);
		mutex_enter(QLOCK(q));
		CLR_QSTATE(q, QTAILBUSY);
		SET_LASTQTAIL(q, 0);
		mutex_exit(QLOCK(q));
		if (aio->handle)
			(void) nsc_free_buf(aio->handle);
		return (-1);
	}

	/* XXX for now do this, but later pre-alloc handle in enable/resume */

	DTRACE_PROBE(rdc_diskq_alloc_start);
	rc = nsc_alloc_buf(group->diskqfd, qtail, iofbas,
	    NSC_NOCACHE | NSC_WRITE | NSC_NODATA, &qbuf);

	DTRACE_PROBE(rdc_diskq_alloc_end);

	if (!RDC_SUCCESS(rc)) {
		cmn_err(CE_WARN, "!disk queue %s alloc failed(%d) %" NSC_SZFMT,
		    &urdc->disk_queue[0], rc, iofbas);
		rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
		rc = ENOMEM;
		goto fail;
	}
	/* move vec and write to queue */
	qbuf->sb_vec = &vec[0];

#ifdef DEBUG_WRITER_UBERNOISE

	cmn_err(CE_NOTE, "!about to write to queue, qbuf: %p, qhead: %d, "
	    "qtail: %d, len: %d contents: %c%c%c%c%c",
	    (void *) qbuf, qhead, qtail, iofbas,
	    qbuf->sb_vec[1].sv_addr[0],
	    qbuf->sb_vec[1].sv_addr[1],
	    qbuf->sb_vec[1].sv_addr[2],
	    qbuf->sb_vec[1].sv_addr[3],
	    qbuf->sb_vec[1].sv_addr[4]);
	cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));

#endif

	DTRACE_PROBE2(rdc_diskq_nswrite_start, int, qtail, nsc_size_t, iofbas);
	rc = nsc_write(qbuf, qtail, iofbas, 0);
	DTRACE_PROBE2(rdc_diskq_nswrite_end, int, qtail, nsc_size_t, iofbas);

	if (!RDC_SUCCESS(rc)) {
		cmn_err(CE_WARN, "!disk queue %s write failed %d",
		    &urdc->disk_queue[0], rc);
		rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
		goto fail;

	}

	mutex_enter(QLOCK(q));

	SET_LASTQTAIL(q, 0);
	CLR_QSTATE(q, QTAILBUSY);

	mutex_exit(QLOCK(q));

fail:

	/*
	 * return what should be returned
	 * the aio is returned in _rdc_write after status is gathered.
	 */

	if (qbuf)
		qbuf->sb_vec = 0;
	(void) nsc_free_buf(qbuf);

	if (aio->handle)
		(void) nsc_free_buf(aio->handle);

	_rdc_rlse_diskq(group);
	DTRACE_PROBE(rdc_diskq_rlse);

	/* free the iohdr and the vecs */

	if (iohdr)
		kmem_free(iohdr, sizeof (*iohdr));
	if (vec)
		kmem_free(vec, sizeof (*vec) * numvecs);

	/* if no flusher running, start one */
	if ((!krdc->group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
		(void) rdc_writer(krdc->index);

	return (rc);
}
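
/*
 * Back-off arithmetic used in rdc_diskq_enqueue() (for reference):
 * delay_time starts at 2 ticks and doubles per retry, so a blocked
 * writer sleeps 2, 4, ..., 128 ticks (254 ticks over seven retries)
 * before the "disk queue full" warning fires and, for no-block or
 * non-flushing queues, the queue is failed into logging mode.
 */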

/*
 * place this on the pending list of io_hdr's out for flushing
 */
void
rdc_add_iohdr(io_hdr *header, rdc_group_t *group)
{
	disk_queue *q = NULL;
#ifdef DEBUG
	io_hdr *p;
#endif

	q = &group->diskq;

	/* paranoia */
	header->dat.next = NULL;

	mutex_enter(QLOCK(q));
#ifdef DEBUG	/* AAAH! double flush!? */
	p = q->iohdrs;
	while (p) {
		if (p->dat.qpos == header->dat.qpos) {
			cmn_err(CE_WARN, "!ADDING DUPLICATE HEADER %" NSC_SZFMT,
			    p->dat.qpos);
			kmem_free(header, sizeof (*header));
			mutex_exit(QLOCK(q));
			return;
		}
		p = p->dat.next;
	}
#endif
	if (q->iohdrs == NULL) {
		q->iohdrs = q->hdr_last = header;
		q->hdrcnt = 1;
		mutex_exit(QLOCK(q));
		return;
	}

	q->hdr_last->dat.next = header;
	q->hdr_last = header;
	q->hdrcnt++;
	mutex_exit(QLOCK(q));
}

/*
 * mark an io header as flushed. If it is the qhead,
 * then update the qpointers
 * free the io_hdrs
 * called after the bitmap is cleared by flusher
 */
void
rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_size_t qpos)
{
	rdc_group_t *group = krdc->group;
	disk_queue *q = NULL;
	io_hdr *hp = NULL;
	io_hdr *p = NULL;
	int found = 0;
	int cnt = 0;

#ifndef NSC_MULTI_TERABYTE
	ASSERT(qpos >= 0);	/* assertion to validate change for 64bit */
	if (qpos < 0)	/* not a diskq offset */
		return;
#endif

	q = &group->diskq;
	mutex_enter(QLOCK(q));

	hp = p = q->iohdrs;

	/* find outstanding io_hdr */
	while (hp) {
		if (hp->dat.qpos == qpos) {
			found++;
			break;
		}
		cnt++;
		p = hp;
		hp = hp->dat.next;
	}

	if (!found) {
		if (RDC_BETWEEN(QHEAD(q), QNXTIO(q), qpos)) {
#ifdef DEBUG
			cmn_err(CE_WARN, "!iohdr already cleared? "
			    "qpos %" NSC_SZFMT " cnt %d ", qpos, cnt);
			cmn_err(CE_WARN, "!Qinfo: " QDISPLAY(q));
#endif
			mutex_exit(QLOCK(q));
			return;
		}
		mutex_exit(QLOCK(q));
		return;
	}

	/* mark it as flushed */
	hp->dat.iostatus = RDC_IOHDR_DONE;

	/*
	 * if it is the head pointer, travel the list updating the queue
	 * pointers until the next unflushed is reached, freeing on the way.
	 */
	while (hp && (hp->dat.qpos == QHEAD(q)) &&
	    (hp->dat.iostatus == RDC_IOHDR_DONE)) {
#ifdef DEBUG_FLUSHER_UBERNOISE
		cmn_err(CE_NOTE, "!clr_iohdr info: magic %x type %d pos %d"
		    " qpos %d hpos %d len %d flag 0x%x iostatus %x setid %d",
		    hp->dat.magic, hp->dat.type, hp->dat.pos, hp->dat.qpos,
		    hp->dat.hpos, hp->dat.len, hp->dat.flag,
		    hp->dat.iostatus, hp->dat.setid);
#endif
		if (hp->dat.flag & RDC_NULL_BUF) {
			INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)));
		} else {
			INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)) + hp->dat.len);
			DEC_QBLOCKS(q, hp->dat.len);
		}

		DEC_QNITEMS(q, 1);

		if (QHEADSHLDWRAP(q)) {	/* simple enough */
#ifdef DEBUG_DISKQWRAP
			cmn_err(CE_NOTE, "!wrapping Q head: " QDISPLAY(q));
#endif
			/*LINTED*/
			WRAPQHEAD(q);
		}

		/* get rid of the iohdr */
		if (hp == q->iohdrs) {
			q->iohdrs = hp->dat.next;
			kmem_free(hp, sizeof (*hp));
			hp = q->iohdrs;
		} else {
			if (hp == q->hdr_last)
				q->hdr_last = p;
			p->dat.next = hp->dat.next;
			kmem_free(hp, sizeof (*hp));
			hp = p->dat.next;
		}
		q->hdrcnt--;
	}

	if (QEMPTY(q) && !IS_QSTATE(q, RDC_QFULL) &&
	    !(IS_QSTATE(q, RDC_QDISABLEPEND))) {
#ifdef DEBUG_FLUSHER_UBERNOISE
		rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
		cmn_err(CE_NOTE, "!clr_iohdr: diskq %s empty, "
		    "resetting defaults", urdc->disk_queue);
#endif

		rdc_init_diskq_header(group, &q->disk_hdr);
		SET_QNXTIO(q, QHEAD(q));
	}

	/* wakeup any blocked enqueue threads */
	cv_broadcast(&q->qfullcv);
	mutex_exit(QLOCK(q));
}

/*
 * put in whatever useful checks we can on the io header
 */
int
rdc_iohdr_ok(io_hdr *hdr)
{
	if (hdr->dat.magic != RDC_IOHDR_MAGIC)
		goto bad;
	return (1);
bad:

#ifdef DEBUG
	cmn_err(CE_WARN, "!Bad io header magic %x type %d pos %" NSC_SZFMT
	    " hpos %" NSC_SZFMT " qpos %" NSC_SZFMT " len %" NSC_SZFMT
	    " flag %d iostatus %d setid %d", hdr->dat.magic,
	    hdr->dat.type, hdr->dat.pos, hdr->dat.hpos, hdr->dat.qpos,
	    hdr->dat.len, hdr->dat.flag, hdr->dat.iostatus, hdr->dat.setid);
#else
	cmn_err(CE_WARN, "!Bad io header retrieved");
#endif
	return (0);
}

/*
 * rdc_netqueue_insert()
 * add an item to a netqueue. No locks necessary as it should only
 * be used in a single-threaded manner. If that changes, then
 * a lock or assertion should be done here
 */
void
rdc_netqueue_insert(rdc_aio_t *aio, net_queue *q)
{
	rdc_k_info_t *krdc = &rdc_k_info[aio->index];

	/* paranoid check for bit set */
	RDC_CHECK_BIT(krdc, aio->pos, aio->len);

	if (q->net_qhead == NULL) {
		q->net_qhead = q->net_qtail = aio;

	} else {
		q->net_qtail->next = aio;
		q->net_qtail = aio;
	}
	q->blocks += aio->len;
	q->nitems++;

	if (q->nitems > q->nitems_hwm) {
		q->nitems_hwm = q->nitems;
	}
	if (q->blocks > q->blocks_hwm) {
		q->blocks_hwm = q->blocks;
	}
}

/*
 * rdc_fill_aio(aio, hdr)
 * take the pertinent info from an io_hdr and stick it in
 * an aio, including seq number, abuf.
 */
void
rdc_fill_aio(rdc_group_t *grp, rdc_aio_t *aio, io_hdr *hdr, nsc_buf_t *abuf)
{
	if (hdr->dat.flag & RDC_NULL_BUF) {
		aio->handle = NULL;
	} else {
		aio->handle = abuf;
	}
	aio->qhandle = abuf;
	aio->pos = hdr->dat.pos;
	aio->qpos = hdr->dat.qpos;
	aio->len = hdr->dat.len;
	aio->flag = hdr->dat.flag;
	if ((aio->index = rdc_setid2idx(hdr->dat.setid)) < 0)
		return;
	mutex_enter(&grp->diskq.disk_qlock);
	if (grp->ra_queue.qfflags & RDC_QFILLSLEEP) {
		mutex_exit(&grp->diskq.disk_qlock);
		aio->seq = RDC_NOSEQ;
		return;
	}
	if (abuf && aio->qhandle) {
		abuf->sb_user++;
	}
	aio->seq = grp->seq++;
	if (grp->seq < aio->seq)
		grp->seq = RDC_NEWSEQ + 1;
	mutex_exit(&grp->diskq.disk_qlock);
	hdr->dat.iostatus = aio->seq;
}
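
/*
 * Sequence-number note (illustrative): aio->seq is drawn from grp->seq
 * under disk_qlock, and on counter wrap (grp->seq < aio->seq) the
 * counter restarts at RDC_NEWSEQ + 1, so RDC_NEWSEQ itself is never
 * handed out twice; rdc_diskq_unqueue() applies the same rule.
 */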

#ifdef DEBUG
int maxaios_perbuf = 0;
int midaios_perbuf = 0;
int aveaios_perbuf = 0;
int totaios_perbuf = 0;
int buf2qcalls = 0;

void
calc_perbuf(int items)
{
	if (totaios_perbuf < 0) {
		maxaios_perbuf = 0;
		midaios_perbuf = 0;
		aveaios_perbuf = 0;
		totaios_perbuf = 0;
		buf2qcalls = 0;
	}

	if (items > maxaios_perbuf)
		maxaios_perbuf = items;
	midaios_perbuf = maxaios_perbuf / 2;
	totaios_perbuf += items;
	if (buf2qcalls)		/* guard the divide after an overflow reset */
		aveaios_perbuf = totaios_perbuf / buf2qcalls;
}
#endif

/*
 * rdc_discard_tmpq()
 * free up the passed temporary queue
 * NOTE: no cv's or mutexes have been initialized
 */
void
rdc_discard_tmpq(net_queue *q)
{
	rdc_aio_t *aio;

	if (q == NULL)
		return;

	while (q->net_qhead) {
		aio = q->net_qhead;
		q->net_qhead = q->net_qhead->next;
		if (aio->qhandle) {
			aio->qhandle->sb_user--;
			if (aio->qhandle->sb_user == 0) {
				rdc_fixlen(aio);
				(void) nsc_free_buf(aio->qhandle);
			}
		}
		kmem_free(aio, sizeof (*aio));
		q->nitems--;
	}
	kmem_free(q, sizeof (*q));
}
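
/*
 * Reference-count note (illustrative): rdc_fill_aio() bumps
 * abuf->sb_user once for every aio carved out of a single diskq read,
 * so the shared buffer is only freed above when the last aio that
 * references it is discarded.
 */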
1664
1665 /*
1666 * rdc_diskq_buf2queue()
1667 * take a chunk of the diskq, parse it and assemble
1668 * a chain of rdc_aio_t's.
1669 * updates QNXTIO()
1670 */
1671 net_queue *
rdc_diskq_buf2queue(rdc_group_t * grp,nsc_buf_t ** abuf,int index)1672 rdc_diskq_buf2queue(rdc_group_t *grp, nsc_buf_t **abuf, int index)
1673 {
1674 rdc_aio_t *aio = NULL;
1675 nsc_vec_t *vecp = NULL;
1676 uchar_t *vaddr = NULL;
1677 uchar_t *ioaddr = NULL;
1678 net_queue *netq = NULL;
1679 io_hdr *hdr = NULL;
1680 nsc_buf_t *buf = *abuf;
1681 rdc_u_info_t *urdc = &rdc_u_info[index];
1682 rdc_k_info_t *krdc = &rdc_k_info[index];
1683 disk_queue *dq = &grp->diskq;
1684 net_queue *nq = &grp->ra_queue;
1685 int nullbuf = 0;
1686 nsc_off_t endobuf;
1687 nsc_off_t bufoff;
1688 int vlen;
1689 nsc_off_t fpos;
1690 long bufcnt = 0;
1691 int nullblocks = 0;
1692 int fail = 1;
1693
1694 if (buf == NULL)
1695 return (NULL);
1696
1697 netq = kmem_zalloc(sizeof (*netq), KM_NOSLEEP);
1698 if (netq == NULL) {
1699 cmn_err(CE_WARN, "!SNDR: unable to allocate net queue");
1700 return (NULL);
1701 }
1702
1703 vecp = buf->sb_vec;
1704 vlen = vecp->sv_len;
1705 vaddr = vecp->sv_addr;
1706 bufoff = buf->sb_pos;
1707 endobuf = bufoff + buf->sb_len;
1708
1709 #ifdef DEBUG_FLUSHER_UBERNOISE
1710 cmn_err(CE_WARN, "!BUFFOFFENTER %d", bufoff);
1711 #endif
1712 /* CONSTCOND */
1713 while (1) {
1714 if (IS_STATE(urdc, RDC_LOGGING) ||
1715 (nq->qfflags & RDC_QFILLSLEEP)) {
1716 fail = 0;
1717 goto fail;
1718 }
1719 #ifdef DEBUG_FLUSHER_UBERNOISE
1720 cmn_err(CE_WARN, "!BUFFOFF_0 %d", bufoff);
1721 #endif
1722
1723 if ((vaddr == NULL) || (vlen == 0))
1724 break;
1725
1726 if (vlen <= 0) {
1727 vecp++;
1728 vaddr = vecp->sv_addr;
1729 vlen = vecp->sv_len;
1730 if (vaddr == NULL)
1731 break;
1732 }
1733
1734 /* get the iohdr information */
1735
1736 hdr = kmem_zalloc(sizeof (*hdr), KM_NOSLEEP);
1737 if (hdr == NULL) {
1738 cmn_err(CE_WARN,
1739 "!SNDR: unable to alocate net queue header");
1740 goto fail;
1741 }
1742
1743 ioaddr = (uchar_t *)hdr;
1744
1745 bcopy(vaddr, ioaddr, sizeof (*hdr));
1746
1747 if (!rdc_iohdr_ok(hdr)) {
1748 cmn_err(CE_WARN,
1749 "!unable to retrieve i/o data from queue %s "
1750 "at offset %" NSC_SZFMT " bp: %" NSC_SZFMT " bl: %"
1751 NSC_SZFMT, urdc->disk_queue,
1752 bufoff, buf->sb_pos, buf->sb_len);
1753 #ifdef DEBUG_DISKQ
1754 cmn_err(CE_WARN, "!FAILING QUEUE state: %x",
1755 rdc_get_vflags(urdc));
1756 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(dq));
1757 cmn_err(CE_WARN, "!VADDR %p, IOADDR %p", vaddr, ioaddr);
1758 cmn_err(CE_WARN, "!BUF %p", buf);
1759 #endif
1760 cmn_err(CE_WARN, "!qinfo: " QDISPLAYND(dq));
1761
1762 goto fail;
1763 }
1764
1765 nullbuf = hdr->dat.flag & RDC_NULL_BUF;
1766
1767 bufoff += FBA_NUM(sizeof (*hdr));
1768
1769 /* out of buffer, set nxtio to re read this last hdr */
1770 if (!nullbuf && ((bufoff + hdr->dat.len) > endobuf)) {
1771 break;
1772 }
1773
1774 bufcnt += FBA_NUM(sizeof (*hdr));
1775
1776 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1777 if (aio == NULL) {
1778 bufcnt -= FBA_NUM(sizeof (*hdr));
1779 cmn_err(CE_WARN, "!SNDR: net queue aio alloc failed");
1780 goto fail;
1781 }
1782
1783 if (!nullbuf) {
1784 /* move to next iohdr in big buf */
1785 bufoff += hdr->dat.len;
1786 bufcnt += hdr->dat.len;
1787 }
1788
1789 rdc_fill_aio(grp, aio, hdr, buf);
1790
1791 if (aio->index < 0) {
1792 cmn_err(CE_WARN, "!Set id %d not found or no longer "
1793 "enabled, failing disk queue", hdr->dat.setid);
1794 kmem_free(aio, sizeof (*aio));
1795 goto fail;
1796 }
1797 if (aio->seq == RDC_NOSEQ) {
1798 kmem_free(aio, sizeof (*aio));
1799 fail = 0;
1800 goto fail;
1801 }
1802 if (aio->handle == NULL)
1803 nullblocks += aio->len;
1804
1805 rdc_add_iohdr(hdr, grp);
1806 hdr = NULL; /* don't accidentally free on break or fail */
1807 rdc_netqueue_insert(aio, netq);
1808
1809 /* no more buffer, skip the below logic */
1810 if ((bufoff + FBA_NUM(sizeof (*hdr))) >= endobuf) {
1811 break;
1812 }
1813
1814 fpos = bufoff - buf->sb_pos;
1815 vecp = buf->sb_vec;
1816 for (; fpos >= FBA_NUM(vecp->sv_len); vecp++)
1817 fpos -= FBA_NUM(vecp->sv_len);
1818 vlen = vecp->sv_len - FBA_SIZE(fpos);
1819 vaddr = vecp->sv_addr + FBA_SIZE(fpos);
1820 /* abuf = NULL; */
1821
1822 }
1823
1824 /* free extraneous header */
1825 if (hdr) {
1826 kmem_free(hdr, sizeof (*hdr));
1827 hdr = NULL;
1828 }
1829
1830 /*
1831 * probably won't happen, but if we didn't goto fail, but
1832 * we don't contain anything meaningful.. return NULL
1833 * and let the flusher or the sleep/wakeup routines
1834 * decide
1835 */
1836 if (netq && netq->nitems == 0) {
1837 kmem_free(netq, sizeof (*netq));
1838 return (NULL);
1839 }
1840
1841 #ifdef DEBUG
1842 buf2qcalls++;
1843 calc_perbuf(netq->nitems);
1844 #endif
1845 if (IS_STATE(urdc, RDC_LOGGING) ||
1846 nq->qfflags & RDC_QFILLSLEEP) {
1847 fail = 0;
1848 goto fail;
1849 }
1850
1851 mutex_enter(QLOCK(dq));
1852 INC_QNXTIO(dq, bufcnt);
1853 mutex_exit(QLOCK(dq));
1854
1855 netq->net_qtail->orig_len = nullblocks; /* overload */
1856
1857 return (netq);
1858
1859 fail:
1860
1861 if (hdr) {
1862 kmem_free(hdr, sizeof (*hdr));
1863 }
1864
1865 if (netq) {
1866 if (netq->nitems > 0) {
1867 /* the never can happen case ... */
1868 if ((netq->nitems == 1) &&
1869 (netq->net_qhead->handle == NULL))
1870 (void) nsc_free_buf(buf);
1871 *abuf = NULL;
1872
1873 }
1874 rdc_discard_tmpq(netq);
1875 }
1876
1877 mutex_enter(QLOCK(dq));
1878 rdc_dump_iohdrs(dq);
1879 mutex_exit(QLOCK(dq));
1880
1881 if (fail) { /* real failure, not just state change */
1882 #ifdef DEBUG
1883 cmn_err(CE_WARN, "!rdc_diskq_buf2queue: failing disk queue %s",
1884 urdc->disk_queue);
1885 #endif
1886 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
1887 }
1888
1889 return (NULL);
1890
1891 }
1892
1893 /*
1894 * rdc_diskq_unqueue
1895 * remove one chunk from the diskq belonging to
1896 * rdc_k_info[index]
1897 * updates the head and tail pointers in the disk header
1898 * but does not write. The header should be written on ack
1899 * flusher should free whatever..
1900 */
1901 rdc_aio_t *
rdc_diskq_unqueue(int index)1902 rdc_diskq_unqueue(int index)
1903 {
1904 int rc, rc1, rc2;
1905 nsc_off_t qhead;
1906 int nullhandle = 0;
1907 io_hdr *iohdr;
1908 rdc_aio_t *aio = NULL;
1909 nsc_buf_t *buf = NULL;
1910 nsc_buf_t *abuf = NULL;
1911 rdc_group_t *group = NULL;
1912 disk_queue *q = NULL;
1913 rdc_k_info_t *krdc = &rdc_k_info[index];
1914 rdc_u_info_t *urdc = &rdc_u_info[index];
1915
1916 group = krdc->group;
1917 q = &group->diskq;
1918
1919 if (group->diskqfd == NULL) /* we've been disabled */
1920 return (NULL);
1921
1922 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1923 if (!aio) {
1924 return (NULL);
1925 }
1926
1927 iohdr = kmem_zalloc(sizeof (*iohdr), KM_NOSLEEP);
1928 if (!iohdr) {
1929 kmem_free(aio, sizeof (*aio));
1930 return (NULL);
1931 }
1932
1933 mutex_enter(QLOCK(q));
1934 rdc_set_qbusy(q); /* make sure no one disables the queue */
1935 mutex_exit(QLOCK(q));
1936
1937 DTRACE_PROBE(rdc_diskq_unq_rsrv);
1938
1939 if (_rdc_rsrv_diskq(group)) {
1940 cmn_err(CE_WARN, "!rdc_unqueue: %s reserve failed",
1941 urdc->disk_queue);
1942 goto fail;
1943 }
1944
1945 mutex_enter(QHEADLOCK(q));
1946 mutex_enter(QLOCK(q));
1947
1948 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || IS_STATE(urdc, RDC_LOGGING)) {
1949 rdc_clr_qbusy(q);
1950 mutex_exit(QLOCK(q));
1951 mutex_exit(QHEADLOCK(q));
1952 kmem_free(aio, sizeof (*aio));
1953 kmem_free(iohdr, sizeof (*iohdr));
1954 return (NULL);
1955 }
1956
1957 if (QNXTIOSHLDWRAP(q)) {
1958 #ifdef DEBUG_DISKQWRAP
1959 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(q));
1960 #endif
1961 /*LINTED*/
1962 WRAPQNXTIO(q);
1963 }
1964
1965 /* read the metainfo at q->nxt_io first */
1966 if (QNXTIO(q) == QTAIL(q)) { /* empty */
1967
1968 _rdc_rlse_diskq(group);
1969 if (q->lastio->handle)
1970 (void) nsc_free_buf(q->lastio->handle);
1971 bzero(&(*q->lastio), sizeof (*q->lastio));
1972
1973 mutex_exit(QHEADLOCK(q));
1974 rdc_clr_qbusy(q);
1975 mutex_exit(QLOCK(q));
1976 kmem_free(aio, sizeof (*aio));
1977 kmem_free(iohdr, sizeof (*iohdr));
1978 return (NULL);
1979 }
1980
1981 qhead = QNXTIO(q);
1982
1983 /*
1984 * have to drop the lock here, sigh. Cannot block incoming io
1985 * we have to wait until after this read to find out how
1986 * much to increment QNXTIO. Might as well grab the seq then too
1987 */
1988
1989 while ((qhead == LASTQTAIL(q)) && (IS_QSTATE(q, QTAILBUSY))) {
1990 mutex_exit(QLOCK(q));
1991 #ifdef DEBUG_DISKQ
1992 cmn_err(CE_NOTE, "!Qtail busy delay lastqtail: %d", qhead);
1993 #endif
1994 delay(5);
1995 mutex_enter(QLOCK(q));
1996 }
1997 mutex_exit(QLOCK(q));
1998
1999 DTRACE_PROBE(rdc_diskq_iohdr_read_start);
2000
2001 rc = rdc_ns_io(group->diskqfd, NSC_READ, qhead,
2002 (uchar_t *)iohdr, FBA_SIZE(1));
2003
2004 DTRACE_PROBE(rdc_diskq_iohdr_read_end);
2005
2006 if (!RDC_SUCCESS(rc) || !rdc_iohdr_ok(iohdr)) {
2007 cmn_err(CE_WARN, "!unable to retrieve i/o data from queue %s"
2008 " at offset %" NSC_SZFMT " rc %d", urdc->disk_queue,
2009 qhead, rc);
2010 #ifdef DEBUG_DISKQ
2011 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
2012 #endif
2013 mutex_exit(QHEADLOCK(q));
2014 goto fail;
2015 }
2016
2017 /* XXX process buffer here, creating rdc_aio_t's */
2018
2019 mutex_enter(QLOCK(q));
2020 /* update the next pointer */
2021 if (iohdr->dat.flag == RDC_NULL_BUF) {
2022 INC_QNXTIO(q, FBA_LEN(sizeof (io_hdr)));
2023 nullhandle = 1;
2024 } else {
2025 INC_QNXTIO(q, (FBA_LEN(sizeof (io_hdr)) + iohdr->dat.len));
2026 }
2027
2028 aio->seq = group->seq++;
2029 if (group->seq < aio->seq)
2030 group->seq = RDC_NEWSEQ + 1;
2031
2032 mutex_exit(QLOCK(q));
2033 mutex_exit(QHEADLOCK(q));
2034
2035 #ifdef DEBUG_FLUSHER_UBERNOISE
2036 p = &iohdr->dat;
2037 cmn_err(CE_NOTE, "!unqueued iohdr from %d pos: %d len: %d flag: %d "
2038 "iostatus: %d setid: %d time: %d", qhead, p->pos, p->len,
2039 p->flag, p->iostatus, p->setid, p->time);
2040 #endif
2041
2042 if (nullhandle) /* nothing to get from queue */
2043 goto nullbuf;
2044
2045 /* now that we know how much to get (iohdr.dat.len), get it */
2046 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_start);
2047
2048 rc = nsc_alloc_buf(group->diskqfd, qhead + 1, iohdr->dat.len,
2049 NSC_NOCACHE | NSC_READ, &buf);
2050
2051 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_end);
2052
2053 /* and get somewhere to keep it for a bit */
2054 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_start);
2055
2056 rc1 = nsc_alloc_abuf(qhead + 1, iohdr->dat.len, 0, &abuf);
2057
2058 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_end);
2059
2060 if (!RDC_SUCCESS(rc) || !RDC_SUCCESS(rc1)) { /* uh-oh */
2061 cmn_err(CE_WARN, "!disk queue %s read failure",
2062 urdc->disk_queue);
2063 goto fail;
2064 }
2065
2066 /* move it on over... */
2067 rc2 = nsc_copy(buf, abuf, qhead + 1, qhead + 1, iohdr->dat.len);
2068
2069 if (!RDC_SUCCESS(rc2)) {
2070 #ifdef DEBUG
2071 cmn_err(CE_WARN, "!nsc_copy failed for diskq unqueue");
2072 #endif
2073 goto fail;
2074 }
2075
2076 /* let go of the real buf, we've got the abuf */
2077 (void) nsc_free_buf(buf);
2078 buf = NULL;
2079
2080 aio->handle = abuf;
2081 /* Hack in the original sb_pos */
2082 aio->handle->sb_pos = iohdr->dat.hpos;
2083
2084 /* skip the RDC_HANDLE_LIMITS check */
2085 abuf->sb_user |= RDC_DISKQUE;
2086
2087 nullbuf:
2088 if (nullhandle) {
2089 aio->handle = NULL;
2090 }
2091
2092 /* set up the rest of the aio values, seq set above ... */
2093 aio->pos = iohdr->dat.pos;
2094 aio->qpos = iohdr->dat.qpos;
2095 aio->len = iohdr->dat.len;
2096 aio->flag = iohdr->dat.flag;
2097 aio->index = rdc_setid2idx(iohdr->dat.setid);
2098 if (aio->index < 0) { /* uh-oh */
2099 #ifdef DEBUG
2100 cmn_err(CE_WARN, "!rdc_diskq_unqueue: index < 0");
2101 #endif
2102 goto fail;
2103 }
2104
2105
2106 #ifdef DEBUG_FLUSHER_UBERNOISE_STAMP
2107 h = &q->disk_hdr.h;
2108 cmn_err(CE_NOTE, "!stamping diskq header:\n"
2109 "magic: %x\nstate: %d\nhead_offset: %d\n"
2110 "tail_offset: %d\ndisk_size: %d\nnitems: %d\nblocks: %d\n",
2111 h->magic, h->state, h->head_offset, h->tail_offset,
2112 h->disk_size, h->nitems, h->blocks);
2113 #endif
2114
2115 _rdc_rlse_diskq(group);
2116
2117 mutex_enter(QLOCK(q));
2118 rdc_clr_qbusy(q);
2119 mutex_exit(QLOCK(q));
2120
2121 DTRACE_PROBE(rdc_diskq_unq_rlse);
2122
2123 iohdr->dat.iostatus = aio->seq;
2124 rdc_add_iohdr(iohdr, group);
2125
2126 #ifdef DEBUG_FLUSHER_UBERNOISE
2127 if (!nullhandle) {
2128 cmn_err(CE_NOTE, "!UNQUEUING, %p"
2129 " contents: %c%c%c%c%c pos: %d len: %d",
2130 (void *)aio->handle,
2131 aio->handle->sb_vec[0].sv_addr[0],
2132 aio->handle->sb_vec[0].sv_addr[1],
2133 aio->handle->sb_vec[0].sv_addr[2],
2134 aio->handle->sb_vec[0].sv_addr[3],
2135 aio->handle->sb_vec[0].sv_addr[4],
2136 aio->handle->sb_pos, aio->handle->sb_len);
2137 } else {
2138 cmn_err(CE_NOTE, "!UNQUEUING, NULL " QDISPLAY(q));
2139 }
2140 cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
2141 #endif
2142
2143 return (aio);
2144
2145 fail:
2146 if (aio)
2147 kmem_free(aio, sizeof (*aio));
2148 if (iohdr)
2149 kmem_free(iohdr, sizeof (*iohdr));
2150 if (buf)
2151 (void) nsc_free_buf(buf);
2152 if (abuf)
2153 (void) nsc_free_buf(abuf);
2154
2155 _rdc_rlse_diskq(group);
2156 #ifdef DEBUG
2157 cmn_err(CE_WARN, "!diskq_unqueue: failing diskq");
2158 #endif
2159 mutex_enter(QLOCK(q));
2160 rdc_clr_qbusy(q);
2161 mutex_exit(QLOCK(q));
2162
2163 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
2164
2165 return (NULL);
2166 }
2167
int
rdc_diskq_inuse(rdc_set_t *set, char *diskq)
{
	rdc_u_info_t *urdc;
	char *group;
	int index;

	group = set->group_name;

	ASSERT(MUTEX_HELD(&rdc_conf_lock));

	if ((rdc_lookup_bitmap(diskq) >= 0) ||
	    (rdc_lookup_configured(diskq) >= 0)) {
		return (1);
	}
	for (index = 0; index < rdc_max_sets; index++) {
		urdc = &rdc_u_info[index];

		if (!IS_ENABLED(urdc))
			continue;

		/* same diskq different group */
		if ((strcmp(urdc->disk_queue, diskq) == 0) &&
		    (urdc->group_name[0] == '\0' ||
		    strcmp(urdc->group_name, group))) {
			return (1);
		}
	}
	/* last, but not least, let's see if someone is getting really funky */
	if ((strcmp(set->disk_queue, set->primary.file) == 0) ||
	    (strcmp(set->disk_queue, set->primary.bitmap) == 0)) {
		return (1);
	}

	return (0);
}

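/*
 * Illustrative only: a minimal sketch of how a caller might use
 * rdc_diskq_inuse() before wiring up a new disk queue.  The error
 * handling is invented for the example; the locking requirement
 * (rdc_conf_lock held) is taken from the ASSERT above.
 */
#if 0
	mutex_enter(&rdc_conf_lock);
	if (rdc_diskq_inuse(set, set->disk_queue)) {
		mutex_exit(&rdc_conf_lock);
		return (RDC_EDISKQINUSE);	/* queue already spoken for */
	}
	mutex_exit(&rdc_conf_lock);
#endif
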
#ifdef DEBUG
int maxlen = 0;
int avelen = 0;
int totalen = 0;
int lencalls = 0;

void
update_lenstats(int len)
{
	if (lencalls == 0) {
		lencalls = 1;
		avelen = 0;
		maxlen = 0;
		totalen = 0;
	}

	if (len > maxlen)
		maxlen = len;
	totalen += len;
	avelen = totalen / lencalls;
}
#endif

/*
 * rdc_calc_len()
 * returns the size of the diskq that can be read for dequeuing
 * always <= RDC_MAX_DISKQREAD
 */
int
rdc_calc_len(rdc_k_info_t *krdc, disk_queue *dq)
{
	nsc_size_t len = 0;

	ASSERT(MUTEX_HELD(QLOCK(dq)));

	/* ---H-----N-----T--- */
	if (QNXTIO(dq) < QTAIL(dq)) {

		len = min(RDC_MAX_DISKQREAD, QTAIL(dq) - QNXTIO(dq));

	/* ---T-----H-----N--- */
	} else if (QNXTIO(dq) > QTAIL(dq)) {
		if (QWRAP(dq)) {
			len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
		} else { /* should never happen */
			len = min(RDC_MAX_DISKQREAD, QSIZE(dq) - QNXTIO(dq));
		}
	} else if (QNXTIO(dq) == QTAIL(dq)) {
		if (QWRAP(dq) && !IS_QSTATE(dq, QNXTIOWRAPD))
			len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
	}

	len = min(len, krdc->maxfbas);

#ifdef DEBUG
	lencalls++;
	update_lenstats(len);
#endif

	return ((int)len);
}
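
/*
 * A hedged, user-space sketch of the length calculation above, with the
 * queue reduced to plain offsets.  The names (nxtio, tail, wrap, qsize,
 * maxread, MIN) are invented for illustration and do not exist in this
 * driver; the cases mirror the H/N/T diagrams in the comments above.
 */
#if 0
#define	MIN(a, b)	((a) < (b) ? (a) : (b))

static int
calc_len_sketch(int nxtio, int tail, int wrap, int qsize, int maxread)
{
	int len = 0;

	if (nxtio < tail)		/* ---H-----N-----T--- */
		len = MIN(maxread, tail - nxtio);
	else if (nxtio > tail)		/* ---T-----H-----N--- */
		len = MIN(maxread, (wrap ? wrap : qsize) - nxtio);
	else if (wrap)			/* N == T, reader not yet wrapped */
		len = MIN(maxread, wrap - nxtio);

	return (len);
}
#endif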

/*
 * lie a little about sb_len if we can, so we don't get tied up in
 * _nsc_wait_dbuf() on the next read.  sb_len MUST be restored before
 * nsc_free_buf() however, or we will be looking at memory leak city,
 * so record the original length in every queue entry; whichever one
 * ends up freeing the buffer can then fix the len.
 * IMPORTANT: this assumes that we are not cached.  In 3.2 caching was
 * turned off for data volumes; if that changes, then this must too.
 */
void
rdc_trim_buf(nsc_buf_t *buf, net_queue *q)
{
	rdc_aio_t *p;
	int len;

	if (buf == NULL || q == NULL)
		return;

	/*
	 * remember the untrimmed length first, so orig_len is never
	 * left uninitialized when no trim is needed
	 */
	len = buf->sb_len;
	if (buf->sb_len >
	    (q->blocks + q->nitems - q->net_qtail->orig_len)) {
		buf->sb_len = (q->blocks + q->nitems - q->net_qtail->orig_len);
	}

	p = q->net_qhead;
	do {
		p->orig_len = len;
		p = p->next;
	} while (p);
}
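
/*
 * A minimal sketch (not driver code) of the restore-before-free contract
 * described above: whoever ends up freeing the nsc_buf_t puts the saved
 * length back first.  "aio" stands for whichever rdc_aio_t owns the
 * handle at that point; error handling is omitted.
 */
#if 0
	if (aio->handle) {
		aio->handle->sb_len = aio->orig_len;	/* undo the lie */
		(void) nsc_free_buf(aio->handle);
		aio->handle = NULL;
	}
#endif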

/*
 * rdc_read_diskq_buf()
 * reads as large a chunk of the diskq as possible into an nsc_buf_t
 * and converts it to a net_queue of rdc_aio_t's to be appended
 * to the group's netqueue
 */
net_queue *
rdc_read_diskq_buf(int index)
{
	nsc_buf_t *buf = NULL;
	net_queue *tmpnq = NULL;
	disk_queue *dq = NULL;
	rdc_k_info_t *krdc = &rdc_k_info[index];
	rdc_u_info_t *urdc = &rdc_u_info[index];
	rdc_group_t *group = krdc->group;
	net_queue *nq = &group->ra_queue;
	int len = 0;
	int rc;
	int fail = 0;
	int offset = 0;

	if (group == NULL || group->diskqfd == NULL) {
		DTRACE_PROBE(rdc_read_diskq_buf_bail1);
		return (NULL);
	}

	dq = &group->diskq;

	mutex_enter(QLOCK(dq));
	rdc_set_qbusy(dq); /* prevent disables on the queue */
	mutex_exit(QLOCK(dq));

	if (_rdc_rsrv_diskq(group)) {
		cmn_err(CE_WARN, "!rdc_readdiskqbuf: %s reserve failed",
		    urdc->disk_queue);
		mutex_enter(QLOCK(dq));
		rdc_clr_qbusy(dq); /* allow disables on the queue again */
		mutex_exit(QLOCK(dq));
		return (NULL);
	}

	mutex_enter(QHEADLOCK(dq));
	mutex_enter(QLOCK(dq));

	if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
	    IS_STATE(urdc, RDC_LOGGING) ||
	    (nq->qfflags & RDC_QFILLSLEEP)) {
		mutex_exit(QLOCK(dq));
		mutex_exit(QHEADLOCK(dq));
		DTRACE_PROBE(rdc_read_diskq_buf_bail2);
		goto done;
	}

	/*
	 * real corner case here, we need to let the flusher wrap first.
	 * we've gotten too far ahead, so just delay and try again
	 */
	if (IS_QSTATE(dq, QNXTIOWRAPD) && AUXQWRAP(dq)) {
		mutex_exit(QLOCK(dq));
		mutex_exit(QHEADLOCK(dq));
		goto done;
	}

	if (QNXTIOSHLDWRAP(dq)) {
#ifdef DEBUG_DISKQWRAP
		cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(dq));
#endif
		/*LINTED*/
		WRAPQNXTIO(dq);
	}

	/* read the metainfo at q->nxt_io first */
	if (!QNITEMS(dq)) { /* empty */

		if (dq->lastio->handle)
			(void) nsc_free_buf(dq->lastio->handle);
		bzero(&(*dq->lastio), sizeof (*dq->lastio));
		mutex_exit(QLOCK(dq));
		mutex_exit(QHEADLOCK(dq));
		DTRACE_PROBE(rdc_read_diskq_buf_bail3);
		goto done;
	}

	len = rdc_calc_len(krdc, dq);

	if ((len <= 0) || (IS_STATE(urdc, RDC_LOGGING)) ||
	    (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
	    (nq->qfflags & RDC_QFILLSLEEP)) {
		mutex_exit(QLOCK(dq));
		mutex_exit(QHEADLOCK(dq));
		/*
		 * a write could be trying to get on the queue, or if
		 * the queue is really really small, a complete image
		 * of it could be on the net queue waiting for flush.
		 * the latter being a fairly stupid scenario and a gross
		 * misconfiguration.. but what the heck, why make the thread
		 * thrash around.. just pause a little here.
		 */
		if (len <= 0)
			delay(50);

		DTRACE_PROBE3(rdc_read_diskq_buf_bail4, int, len,
		    int, rdc_get_vflags(urdc), int, nq->qfflags);

		goto done;
	}

	DTRACE_PROBE2(rdc_calc_len, int, len, int, (int)QNXTIO(dq));

#ifdef DEBUG_FLUSHER_UBERNOISE
	cmn_err(CE_WARN, "!CALC_LEN(%d) h:%d n%d t%d, w%d",
	    len, QHEAD(dq), QNXTIO(dq), QTAIL(dq), QWRAP(dq));
	cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(dq));
#endif
	SET_QCOALBOUNDS(dq, QNXTIO(dq) + len);

	while ((LASTQTAIL(dq) > 0) && !QWRAP(dq) &&
	    ((QNXTIO(dq) + len) >= LASTQTAIL(dq)) &&
	    (IS_QSTATE(dq, QTAILBUSY))) {
		mutex_exit(QLOCK(dq));

#ifdef DEBUG_FLUSHER_UBERNOISE
		cmn_err(CE_NOTE, "!Qtail busy delay nxtio %d len %d "
		    "lastqtail: %d", QNXTIO(dq), len, LASTQTAIL(dq));
#endif
		delay(20);
		mutex_enter(QLOCK(dq));
	}

	offset = QNXTIO(dq);

	/*
	 * one last check to see if we have gone logging, or should.
	 * we may have released the mutex above, so check again
	 */
	if ((IS_STATE(urdc, RDC_LOGGING)) ||
	    (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
	    (nq->qfflags & RDC_QFILLSLEEP)) {
		mutex_exit(QLOCK(dq));
		mutex_exit(QHEADLOCK(dq));
		goto done;
	}

	mutex_exit(QLOCK(dq));
	mutex_exit(QHEADLOCK(dq));

	DTRACE_PROBE2(rdc_buf2q_preread, int, offset, int, len);

	rc = nsc_alloc_buf(group->diskqfd, offset, len,
	    NSC_NOCACHE | NSC_READ, &buf);

	if (!RDC_SUCCESS(rc)) {
		cmn_err(CE_WARN, "!disk queue %s read failure pos %" NSC_SZFMT
		    " len %d", urdc->disk_queue, QNXTIO(dq), len);
		fail++;
		buf = NULL;
		DTRACE_PROBE(rdc_read_diskq_buf_bail5);
		goto done;
	}

	DTRACE_PROBE2(rdc_buf2q_postread, int, offset, nsc_size_t, buf->sb_len);

	/*
	 * convert buf to a net_queue. buf2queue will
	 * update the QNXTIO pointer for us, based on
	 * the last readable queue item
	 */
	tmpnq = rdc_diskq_buf2queue(group, &buf, index);

#ifdef DEBUG_FLUSHER_UBERNOISE
	cmn_err(CE_NOTE, "!QBUF p: %d l: %d p+l: %d users: %d qblocks: %d "
	    "qitems: %d WASTED: %d", buf->sb_pos, buf->sb_len,
	    buf->sb_pos+buf->sb_len, buf->sb_user, tmpnq?tmpnq->blocks:-1,
	    tmpnq?tmpnq->nitems:-1,
	    tmpnq?((buf->sb_len-tmpnq->nitems) - tmpnq->blocks):-1);
#endif

	DTRACE_PROBE3(rdc_buf2que_returned, net_queue *, tmpnq?tmpnq:0,
	    uint64_t, tmpnq?tmpnq->nitems:0,
	    uint_t, tmpnq?tmpnq->net_qhead->seq:0);
done:

	/* we don't need to retain the buf */
	if ((tmpnq == NULL) && buf) {
		(void) nsc_free_buf(buf);
		buf = NULL;
	}

	rdc_trim_buf(buf, tmpnq);

	mutex_enter(QLOCK(dq));
	rdc_clr_qbusy(dq);
	mutex_exit(QLOCK(dq));

	_rdc_rlse_diskq(group);

	if (fail) {
		rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
		tmpnq = NULL;
	}

	return (tmpnq);
}
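
/*
 * Editorial note on rdc_read_diskq_buf(): QHEADLOCK(dq) is always taken
 * before QLOCK(dq), and both are dropped before the nsc_alloc_buf() read,
 * so the disk I/O itself is never performed under either mutex.  What
 * keeps the queue from being disabled while the read is in flight is the
 * qbusy count set at the top of the function, not a lock.
 */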

/*
 * rdc_dequeue()
 * removes the head of the memory queue
 */
rdc_aio_t *
rdc_dequeue(rdc_k_info_t *krdc, int *rc)
{
	net_queue *q = &krdc->group->ra_queue;
	disk_queue *dq = &krdc->group->diskq;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	rdc_aio_t *aio;

	*rc = 0;

	if (q == NULL)
		return (NULL);

	mutex_enter(&q->net_qlock);

	aio = q->net_qhead;

	if (aio == NULL) {
#ifdef DEBUG
		if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
			cmn_err(CE_PANIC,
			    "rdc_dequeue(1): q %p, q blocks %" NSC_SZFMT
			    " , nitems %" NSC_SZFMT ", qhead %p qtail %p",
			    (void *) q, q->blocks, q->nitems,
			    (void *) aio, (void *) q->net_qtail);
		}
#endif

		mutex_exit(&q->net_qlock);

		if ((!IS_STATE(urdc, RDC_LOGGING)) &&
		    (!(q->qfflags & RDC_QFILLSLEEP)) &&
		    (!IS_STATE(urdc, RDC_SYNCING)) && (QNITEMS(dq) > 0)) {
			*rc = EAGAIN;
		}

		goto done;
	}

	/* aio remove from q */

	q->net_qhead = aio->next;
	aio->next = NULL;

	if (q->net_qtail == aio)
		q->net_qtail = q->net_qhead;

	q->blocks -= aio->len;
	q->nitems--;

#ifdef DEBUG
	if (q->net_qhead == NULL) {
		if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
			cmn_err(CE_PANIC, "rdc_dequeue(2): q %p, q blocks %"
			    NSC_SZFMT " nitems %" NSC_SZFMT
			    " , qhead %p qtail %p",
			    (void *) q, q->blocks, q->nitems,
			    (void *) q->net_qhead, (void *) q->net_qtail);
		}
	}
#endif
	mutex_exit(&q->net_qlock);
done:

	mutex_enter(&q->net_qlock);

	if (rdc_qfill_shldwakeup(krdc))
		cv_broadcast(&q->qfcv);

	/*
	 * clear EAGAIN if:
	 * logging, or the q filler thread is sleeping or stopping
	 * altogether, or the q filler thread is already dead, or we
	 * are syncing.  In those cases a NULL aio is returned with no
	 * error code set, telling the flusher to die.
	 */
	if (*rc == EAGAIN) {
		if (IS_STATE(urdc, RDC_LOGGING) ||
		    (q->qfflags & (RDC_QFILLSLEEP | RDC_QFILLSTOP)) ||
		    (IS_QSTATE(dq, (RDC_QDISABLEPEND | RDC_STOPPINGFLUSH))) ||
		    (q->qfill_sleeping == RDC_QFILL_DEAD) ||
		    (IS_STATE(urdc, RDC_SYNCING)))
			*rc = 0;
	}

	mutex_exit(&q->net_qlock);

	return (aio);
}

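/*
 * Illustrative only: how a flusher might consume rdc_dequeue().  A NULL
 * return with *rc == EAGAIN means the memory queue is momentarily empty
 * while the diskq still holds items, so the caller should retry; NULL
 * with *rc == 0 tells the flusher to stand down.  The loop shape and the
 * delay value are invented for this sketch.
 */
#if 0
	for (;;) {
		int rc;
		rdc_aio_t *aio = rdc_dequeue(krdc, &rc);

		if (aio == NULL) {
			if (rc == EAGAIN) {
				delay(2);	/* diskq not drained yet */
				continue;
			}
			break;			/* nothing left to flush */
		}
		/* ... ship aio to the secondary ... */
	}
#endif
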
/*
 * rdc_qfill_shldsleep()
 * returns 1 if the qfilling code should cv_wait(), 0 if not.
 * reasons for going into cv_wait():
 * there is nothing in the diskq to flush to mem.
 * the memory queue has gotten too big and needs more flushing attn.
 */
int
rdc_qfill_shldsleep(rdc_k_info_t *krdc)
{
	net_queue *nq = &krdc->group->ra_queue;
	disk_queue *dq = &krdc->group->diskq;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

	ASSERT(MUTEX_HELD(&nq->net_qlock));

	if (!RDC_IS_DISKQ(krdc->group))
		return (1);

	if (nq->qfflags & RDC_QFILLSLEEP) {
#ifdef DEBUG_DISKQ_NOISY
		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: "
		    "QFILLSLEEP idx: %d", krdc->index);
#endif
		return (1);
	}

	if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
#ifdef DEBUG_DISKQ_NOISY
		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: Sync|Log "
		    "(0x%x) idx: %d", rdc_get_vflags(urdc), urdc->index);
#endif
		return (1);
	}

	mutex_enter(QLOCK(dq));
	if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
#ifdef DEBUG_DISKQ_NOISY
		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QEMPTY");
#endif
		mutex_exit(QLOCK(dq));
		return (1);
	}
	mutex_exit(QLOCK(dq));

	if (nq->blocks >= RDC_MAX_QBLOCKS) {
		nq->hwmhit = 1;
		/* stuck flushers ? */
#ifdef DEBUG_DISKQ_NOISY
		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: memq full:"
		    " seq: %d seqack %d", krdc->group->seq,
		    krdc->group->seqack);
#endif
		return (1);
	}

	return (0);
}

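/*
 * The other half of the handshake above: the enqueue and flush paths are
 * expected to wake the filler once there is work (or room) again, much
 * as rdc_dequeue() does through rdc_qfill_shldwakeup().  A hedged sketch
 * of that wakeup:
 */
#if 0
	mutex_enter(&q->net_qlock);
	if (q->qfill_sleeping == RDC_QFILL_ASLEEP)
		cv_broadcast(&q->qfcv);
	mutex_exit(&q->net_qlock);
#endif
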
/*
 * rdc_join_netqueues(q, tmpq)
 * appends queue tmpq to queue q, updating all the queue info.
 * as queue q is the important one, its mutex must be held.
 * no one can be adding to queue tmpq.
 */
void
rdc_join_netqueues(net_queue *q, net_queue *tmpq)
{
	ASSERT(MUTEX_HELD(&q->net_qlock));

	if (q->net_qhead == NULL) { /* empty */
#ifdef DEBUG
		if (q->blocks != 0 || q->nitems != 0) {
			cmn_err(CE_PANIC, "rdc filler: q %p, qhead 0, "
			    " q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT,
			    (void *) q, q->blocks, q->nitems);
		}
#endif
		q->net_qhead = tmpq->net_qhead;
		q->net_qtail = tmpq->net_qtail;
		q->nitems = tmpq->nitems;
		q->blocks = tmpq->blocks;
	} else {
		q->net_qtail->next = tmpq->net_qhead;
		q->net_qtail = tmpq->net_qtail;
		q->nitems += tmpq->nitems;
		q->blocks += tmpq->blocks;
	}

	if (q->nitems > q->nitems_hwm) {
		q->nitems_hwm = q->nitems;
	}

	if (q->blocks > q->blocks_hwm) {
		q->blocks_hwm = q->blocks;
	}
}

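/*
 * Worked example for the splice above: if q holds 2 items / 16 blocks
 * and tmpq holds 3 items / 48 blocks, the join leaves q with 5 items /
 * 64 blocks and q->net_qtail pointing at tmpq's old tail.  tmpq itself
 * is untouched; the caller is expected to discard it, as
 * rdc_qfiller_thr() does below.
 */
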
/*
 * rdc_qfiller_thr() single thread that moves
 * data from the diskq to a memory queue for
 * the flusher to pick up.
 */
void
rdc_qfiller_thr(rdc_k_info_t *krdc)
{
	rdc_group_t *grp = krdc->group;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	net_queue *q = &grp->ra_queue;
	net_queue *tmpq = NULL;
	int index = krdc->index;

	q->qfill_sleeping = RDC_QFILL_AWAKE;
	while (!(q->qfflags & RDC_QFILLSTOP)) {
		if (!RDC_IS_DISKQ(grp) ||
		    IS_STATE(urdc, RDC_LOGGING) ||
		    IS_STATE(urdc, RDC_DISKQ_FAILED) ||
		    (q->qfflags & RDC_QFILLSLEEP)) {
			goto nulltmpq;
		}

		DTRACE_PROBE(qfiller_top);
		tmpq = rdc_read_diskq_buf(index);

		if (tmpq == NULL)
			goto nulltmpq;

		if ((q->qfflags & RDC_QFILLSLEEP) ||
		    IS_STATE(urdc, RDC_LOGGING)) {
			rdc_discard_tmpq(tmpq);
			goto nulltmpq;
		}

		mutex_enter(&q->net_qlock);

		/* race with log, redundant yet paranoid */
		if ((q->qfflags & RDC_QFILLSLEEP) ||
		    IS_STATE(urdc, RDC_LOGGING)) {
			rdc_discard_tmpq(tmpq);
			mutex_exit(&q->net_qlock);
			goto nulltmpq;
		}

		rdc_join_netqueues(q, tmpq);
		kmem_free(tmpq, sizeof (*tmpq));
		tmpq = NULL;

		mutex_exit(&q->net_qlock);
nulltmpq:
		/*
		 * sleep for a while if we can.
		 * the enqueuing or flushing code will
		 * wake us if necessary.
		 */
		mutex_enter(&q->net_qlock);
		while (rdc_qfill_shldsleep(krdc)) {
			q->qfill_sleeping = RDC_QFILL_ASLEEP;
			DTRACE_PROBE(qfiller_sleep);
			cv_wait(&q->qfcv, &q->net_qlock);
			DTRACE_PROBE(qfiller_wakeup);
			q->qfill_sleeping = RDC_QFILL_AWAKE;
			if (q->qfflags & RDC_QFILLSTOP) {
#ifdef DEBUG_DISKQ
				cmn_err(CE_NOTE,
2769 "!rdc_qfiller_thr: recieved kill signal");
#endif
				mutex_exit(&q->net_qlock);
				goto done;
			}
		}
		mutex_exit(&q->net_qlock);

		DTRACE_PROBE(qfiller_bottom);
	}
done:
	DTRACE_PROBE(qfiller_done);
	q->qfill_sleeping = RDC_QFILL_DEAD; /* the big sleep */

#ifdef DEBUG
	cmn_err(CE_NOTE, "!rdc_qfiller_thr stopping");
#endif
	q->qfflags &= ~RDC_QFILLSTOP;
}

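/*
 * A hedged sketch of the shutdown handshake implied above: whoever wants
 * the filler gone sets RDC_QFILLSTOP, wakes the thread, and can then
 * poll for the flag to be cleared on the way out.  The polling loop is
 * illustrative, not lifted from this driver.
 */
#if 0
	mutex_enter(&q->net_qlock);
	q->qfflags |= RDC_QFILLSTOP;
	cv_broadcast(&q->qfcv);
	mutex_exit(&q->net_qlock);

	while (q->qfflags & RDC_QFILLSTOP)
		delay(2);	/* rdc_qfiller_thr() clears the flag on exit */
#endif
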
int
_rdc_add_diskq(int index, char *diskq)
{
	rdc_k_info_t *krdc, *kp;
	rdc_u_info_t *urdc, *up;
	rdc_group_t *group;
	int rc;

	krdc = &rdc_k_info[index];
	urdc = &rdc_u_info[index];
	group = krdc->group;

	if (!diskq || urdc->disk_queue[0]) { /* how'd that happen? */
#ifdef DEBUG
		cmn_err(CE_WARN,
		    "!_rdc_add_diskq: NULL diskq or diskq already set");
#endif
		rc = -1;
		goto fail;
	}

	/* if the enable fails, this is bzero'ed */
	(void) strncpy(urdc->disk_queue, diskq, NSC_MAXPATH);
	group->flags &= ~RDC_MEMQUE;
	group->flags |= RDC_DISKQUE;

#ifdef DEBUG
	cmn_err(CE_NOTE, "!adding diskq to group %s", urdc->group_name);
#endif
	mutex_enter(&rdc_conf_lock);
	rc = rdc_enable_diskq(krdc);
	mutex_exit(&rdc_conf_lock);

	if (rc == RDC_EQNOADD) {
		goto fail;
	}

	RDC_ZERO_BITREF(krdc);
	for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
		up = &rdc_u_info[kp->index];
		(void) strncpy(up->disk_queue, diskq, NSC_MAXPATH);
		/* size lives in the diskq structure, already set by enable */
		RDC_ZERO_BITREF(kp);
	}

fail:
	return (rc);
}

/*
 * add a diskq to an existing set/group
 */
int
rdc_add_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	char *diskq;
	int rc;
	int index;
	rdc_k_info_t *krdc, *this;
	rdc_u_info_t *urdc;
	rdc_group_t *group;
	nsc_size_t vol_size = 0;
	nsc_size_t req_size = 0;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uparms->rdc_set);
	mutex_exit(&rdc_conf_lock);
	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
		    uparms->rdc_set->secondary.file);
		rc = RDC_EALREADY;
		goto failed;
	}
	urdc = &rdc_u_info[index];
	krdc = &rdc_k_info[index];
	this = &rdc_k_info[index];
	group = krdc->group;
	diskq = uparms->rdc_set->disk_queue;

	if (!IS_ASYNC(urdc)) {
		spcs_s_add(kstatus, RDC_EQWRONGMODE, urdc->primary.intf,
		    urdc->primary.file, urdc->secondary.intf,
		    urdc->secondary.file);
		rc = RDC_EQWRONGMODE;	/* match the status just queued */
		goto failed;
	}

	do {
		if (!IS_STATE(urdc, RDC_LOGGING)) {
			spcs_s_add(kstatus, RDC_EQNOTLOGGING,
			    uparms->rdc_set->disk_queue);
			rc = RDC_EQNOTLOGGING;
			goto failed;
		}
		/* make sure that we have enough bitmap vol */
		req_size = RDC_BITMAP_FBA + FBA_LEN(krdc->bitmap_size);
		req_size += FBA_LEN(krdc->bitmap_size * BITS_IN_BYTE);
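		/*
		 * An editorial note on the arithmetic above: RDC_BITMAP_FBA
		 * is the on-volume offset of the bitmap, FBA_LEN(bitmap_size)
		 * covers the bitmap itself, and the BITS_IN_BYTE term adds a
		 * byte per bitmap bit, which appears to be for the per-bit
		 * reference counts that diskq sets keep (see RDC_ZERO_BITREF
		 * in _rdc_add_diskq() above).
		 */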

		rc = _rdc_rsrv_devs(krdc, RDC_BMP, RDC_INTERNAL);

		if (!RDC_SUCCESS(rc)) {
			cmn_err(CE_WARN,
			    "!rdc_add_diskq: bitmap reserve failed");
			spcs_s_add(kstatus, RDC_EBITMAP,
			    urdc->primary.bitmap);
			rc = RDC_EBITMAP;
			goto failed;
		}

		(void) nsc_partsize(krdc->bitmapfd, &vol_size);

		_rdc_rlse_devs(krdc, RDC_BMP);

		if (vol_size < req_size) {
			spcs_s_add(kstatus, RDC_EBITMAP2SMALL,
			    urdc->primary.bitmap);
			rc = RDC_EBITMAP2SMALL;
			goto failed;
		}

		krdc = krdc->group_next;
		urdc = &rdc_u_info[krdc->index];

	} while (krdc != this);

	if (urdc->disk_queue[0] != '\0') {
		spcs_s_add(kstatus, RDC_EQALREADY, urdc->primary.intf,
		    urdc->primary.file, urdc->secondary.intf,
		    urdc->secondary.file);
		rc = RDC_EQALREADY;
		goto failed;
	}

	if (uparms->options & RDC_OPT_SECONDARY) { /* how'd we get here? */
		spcs_s_add(kstatus, RDC_EQWRONGMODE);
		rc = RDC_EQWRONGMODE;
		goto failed;
	}

	mutex_enter(&rdc_conf_lock);
	if (rdc_diskq_inuse(uparms->rdc_set, uparms->rdc_set->disk_queue)) {
		spcs_s_add(kstatus, RDC_EDISKQINUSE,
		    uparms->rdc_set->disk_queue);
		rc = RDC_EDISKQINUSE;
		mutex_exit(&rdc_conf_lock);
		goto failed;
	}
	mutex_exit(&rdc_conf_lock);

	rdc_group_enter(krdc);
	rc = _rdc_add_diskq(urdc->index, diskq);
	if (rc < 0 || rc == RDC_EQNOADD) {
		group->flags &= ~RDC_DISKQUE;
		group->flags |= RDC_MEMQUE;
		spcs_s_add(kstatus, RDC_EQNOADD, uparms->rdc_set->disk_queue);
		rc = RDC_EQNOADD;
	}
	rdc_group_exit(krdc);
failed:
	return (rc);
}

int
_rdc_init_diskq(rdc_k_info_t *krdc)
{
	rdc_group_t *group = krdc->group;
	disk_queue *q = &group->diskq;

	rdc_init_diskq_header(group, &group->diskq.disk_hdr);
	SET_QNXTIO(q, QHEAD(q));

	if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0)
		goto fail;

	return (0);
fail:
	return (-1);
}

/*
 * initialize the disk queue. This is a destructive
 * operation that will not check for emptiness of the queue.
 */
int
rdc_init_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	int rc = 0;
	int index;
	rdc_k_info_t *krdc, *kp;
	rdc_u_info_t *urdc, *up;
	rdc_set_t *uset;
	rdc_group_t *group;
	disk_queue *qp;

	uset = uparms->rdc_set;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uset);
	mutex_exit(&rdc_conf_lock);
	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, uset->primary.file,
		    uset->secondary.file);
		rc = RDC_EALREADY;
		goto fail;
	}

	krdc = &rdc_k_info[index];
	urdc = &rdc_u_info[index];
	group = krdc->group;
	qp = &group->diskq;

	if (!IS_STATE(urdc, RDC_SYNCING) && !IS_STATE(urdc, RDC_LOGGING)) {
		spcs_s_add(kstatus, RDC_EQUEISREP, urdc->disk_queue);
		rc = RDC_EQUEISREP;
		goto fail;
	}

	/*
	 * a couple of big "ifs" here. in the first implementation
	 * neither of these will be possible. This will come into
	 * play when we persist the queue across reboots
	 */
	if (!(uparms->options & RDC_OPT_FORCE_QINIT)) {
		if (!QEMPTY(qp)) {
			if (group->rdc_writer) {
				spcs_s_add(kstatus, RDC_EQFLUSHING,
				    urdc->disk_queue);
				rc = RDC_EQFLUSHING;
			} else {
				spcs_s_add(kstatus, RDC_EQNOTEMPTY,
				    urdc->disk_queue);
				rc = RDC_EQNOTEMPTY;
			}
			goto fail;
		}
	}

	mutex_enter(QLOCK(qp));
	if (_rdc_init_diskq(krdc) < 0) {
		mutex_exit(QLOCK(qp));
		goto fail;
	}
	rdc_dump_iohdrs(qp);

	rdc_group_enter(krdc);

	rdc_clr_flags(urdc, RDC_QUEUING);
	for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
		up = &rdc_u_info[kp->index];
		rdc_clr_flags(up, RDC_QUEUING);
	}
	rdc_group_exit(krdc);

	mutex_exit(QLOCK(qp));

	return (0);
fail:
	/* generic queue failure */
	if (!rc) {
		spcs_s_add(kstatus, RDC_EQINITFAIL, urdc->disk_queue);
		rc = RDC_EQINITFAIL;
	}

	return (rc);
}

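/*
 * Hedged usage note: a caller that really does want to clobber a
 * non-empty queue is expected to pass RDC_OPT_FORCE_QINIT, which skips
 * the QEMPTY()/flusher checks above.  Sketch (uparms setup omitted):
 */
#if 0
	uparms->options |= RDC_OPT_FORCE_QINIT;
	rc = rdc_init_diskq(uparms, kstatus);
#endif
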
int
_rdc_kill_diskq(rdc_u_info_t *urdc)
{
	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
	rdc_group_t *group = krdc->group;
	disk_queue *q = &group->diskq;
	rdc_u_info_t *up;
	rdc_k_info_t *p;

	group->flags |= RDC_DISKQ_KILL;
#ifdef DEBUG
	cmn_err(CE_NOTE, "!disabling disk queue %s", urdc->disk_queue);
#endif

	mutex_enter(QLOCK(q));
	rdc_init_diskq_header(group, &q->disk_hdr);
	rdc_dump_iohdrs(q);

	/*
	 * nsc_close the queue and zero out the queue name
	 */
	rdc_wait_qbusy(q);
	rdc_close_diskq(group);
	mutex_exit(QLOCK(q));
	SET_QSIZE(q, 0);
	rdc_clr_flags(urdc, RDC_DISKQ_FAILED);
	bzero(urdc->disk_queue, NSC_MAXPATH);
	for (p = krdc->group_next; p != krdc; p = p->group_next) {
		up = &rdc_u_info[p->index];
		rdc_clr_flags(up, RDC_DISKQ_FAILED);
		bzero(up->disk_queue, NSC_MAXPATH);
	}

#ifdef DEBUG
	cmn_err(CE_NOTE, "!_rdc_kill_diskq: enabling memory queue");
#endif
	group->flags &= ~(RDC_DISKQUE|RDC_DISKQ_KILL);
	group->flags |= RDC_MEMQUE;
	return (0);
}

/*
 * remove this diskq regardless of whether it is draining or not.
 * stops the flusher by invalidating the qdata (i.e., instant empty).
 * removes the disk queue from the group, leaving the group with a
 * memory queue.
 */
int
rdc_kill_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	int rc;
	int index;
	rdc_u_info_t *urdc;
	rdc_k_info_t *krdc;
	rdc_set_t *rdc_set = uparms->rdc_set;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uparms->rdc_set);
	mutex_exit(&rdc_conf_lock);

	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
		    rdc_set->secondary.file);
		rc = RDC_EALREADY;
		goto failed;
	}

	urdc = &rdc_u_info[index];
	krdc = &rdc_k_info[index];

	if (!RDC_IS_DISKQ(krdc->group)) {
		spcs_s_add(kstatus, RDC_EQNOQUEUE, rdc_set->primary.intf,
		    rdc_set->primary.file, rdc_set->secondary.intf,
		    rdc_set->secondary.file);
		rc = RDC_EQNOQUEUE;
		goto failed;
	}

	/*
	 * if (!IS_STATE(urdc, RDC_LOGGING)) {
	 *	spcs_s_add(kstatus, RDC_EQNOTLOGGING,
	 *	    uparms->rdc_set->disk_queue);
	 *	rc = RDC_EQNOTLOGGING;
	 *	goto failed;
	 * }
	 */
	rdc_unintercept_diskq(krdc->group); /* stop protecting queue */
	rdc_group_enter(krdc); /* to prevent further flushing */
	rc = _rdc_kill_diskq(urdc);
	rdc_group_exit(krdc);

failed:
	return (rc);
}

/*
 * remove a diskq from a group.
 * removal of a diskq from a set, or rather
 * a set from a queue, is done by reconfigging out
 * of the group. This removes the diskq from a whole
 * group and replaces it with a memory based queue
 */
#define	NUM_RETRIES	15	/* Number of retries to wait if no progress */
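
/*
 * With delay(HZ) pausing about one second per pass, NUM_RETRIES gives
 * the drain loop below roughly 15 seconds without visible progress
 * before it gives up with RDC_EQFLUSHING or RDC_EQNOTEMPTY.
 */
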
int
rdc_rem_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	int index;
	rdc_u_info_t *urdc;
	rdc_k_info_t *krdc;
	rdc_k_info_t *this;
	volatile rdc_group_t *group;
	volatile disk_queue *diskq;
	int threads, counter;
	long blocks;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uparms->rdc_set);
	mutex_exit(&rdc_conf_lock);
	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
		    uparms->rdc_set->secondary.file);
		return (RDC_EALREADY);
	}

	urdc = &rdc_u_info[index];
	this = &rdc_k_info[index];
	krdc = &rdc_k_info[index];

	do {
		if (!IS_STATE(urdc, RDC_LOGGING)) {
			spcs_s_add(kstatus, RDC_EQNOTLOGGING,
			    urdc->disk_queue);
			return (RDC_EQNOTLOGGING);
		}
		krdc = krdc->group_next;
		urdc = &rdc_u_info[krdc->index];

	} while (krdc != this);

	/*
	 * If there is no group or diskq configured, we can leave now
	 */
	if (!(group = krdc->group) || !(diskq = &group->diskq))
		return (0);

	/*
	 * Wait if not QEMPTY or threads still active
	 */
	counter = 0;
	while (!QEMPTY(diskq) || group->rdc_thrnum) {

		/*
		 * Capture counters to determine if progress is being made
		 */
		blocks = QBLOCKS(diskq);
		threads = group->rdc_thrnum;

		/*
		 * Wait
		 */
		delay(HZ);

		/*
		 * Has the group or disk queue gone away while delayed?
		 */
		if (!(group = krdc->group) || !(diskq = &group->diskq))
			return (0);

		/*
		 * Are we still seeing progress?
		 */
		if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
			/*
			 * No progress seen, increment the retry counter
			 */
			if (counter++ > NUM_RETRIES) {
				/*
				 * Out of retries, give up
				 */
				int rc = group->rdc_thrnum ?
				    RDC_EQFLUSHING : RDC_EQNOTEMPTY;
				spcs_s_add(kstatus, rc, urdc->disk_queue);
				return (rc);
			}
		} else {
			/*
			 * Reset counter, as we've made progress
			 */
			counter = 0;
		}
	}

	return (0);
}