1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #include <sys/types.h> 27 #include <sys/ksynch.h> 28 #include <sys/cmn_err.h> 29 #include <sys/kmem.h> 30 #include <sys/stat.h> 31 #include <sys/errno.h> 32 33 #include "../solaris/nsc_thread.h" 34 #ifdef DS_DDICT 35 #include "../contract.h" 36 #endif 37 #include <sys/nsctl/nsctl.h> 38 39 #include <sys/kmem.h> 40 #include <sys/ddi.h> 41 42 #include <sys/sdt.h> /* dtrace is S10 or later */ 43 44 #include "rdc_io.h" 45 #include "rdc_bitmap.h" 46 #include "rdc_diskq.h" 47 #include "rdc_clnt.h" 48 49 #include <sys/unistat/spcs_s.h> 50 #include <sys/unistat/spcs_s_k.h> 51 #include <sys/unistat/spcs_errors.h> 52 53 extern nsc_io_t *_rdc_io_hc; 54 55 int rdc_diskq_coalesce = 0; 56 57 int 58 _rdc_rsrv_diskq(rdc_group_t *group) 59 { 60 int rc = 0; 61 62 mutex_enter(&group->diskqmutex); 63 if (group->diskqfd == NULL) { 64 mutex_exit(&group->diskqmutex); 65 return (EIO); 66 } else if ((group->diskqrsrv == 0) && 67 (rc = nsc_reserve(group->diskqfd, 0)) != 0) { 68 cmn_err(CE_WARN, 69 "!rdc: nsc_reserve(%s) failed %d\n", 70 nsc_pathname(group->diskqfd), rc); 71 } else { 72 group->diskqrsrv++; 73 } 74 75 mutex_exit(&group->diskqmutex); 76 return (rc); 77 } 78 79 void 80 _rdc_rlse_diskq(rdc_group_t *group) 81 { 82 mutex_enter(&group->diskqmutex); 83 if (group->diskqrsrv > 0 && --group->diskqrsrv == 0) { 84 nsc_release(group->diskqfd); 85 } 86 mutex_exit(&group->diskqmutex); 87 } 88 89 void 90 rdc_wait_qbusy(disk_queue *q) 91 { 92 ASSERT(MUTEX_HELD(QLOCK(q))); 93 while (q->busycnt > 0) 94 cv_wait(&q->busycv, QLOCK(q)); 95 } 96 97 void 98 rdc_set_qbusy(disk_queue *q) 99 { 100 ASSERT(MUTEX_HELD(QLOCK(q))); 101 q->busycnt++; 102 } 103 104 void 105 rdc_clr_qbusy(disk_queue *q) 106 { 107 ASSERT(MUTEX_HELD(QLOCK(q))); 108 q->busycnt--; 109 if (q->busycnt == 0) 110 cv_broadcast(&q->busycv); 111 } 112 113 int 114 rdc_lookup_diskq(char *pathname) 115 { 116 rdc_u_info_t *urdc; 117 #ifdef DEBUG 118 rdc_k_info_t *krdc; 119 #endif 120 int index; 121 122 for (index = 0; index < rdc_max_sets; index++) { 123 urdc = &rdc_u_info[index]; 124 #ifdef DEBUG 125 krdc = &rdc_k_info[index]; 126 #endif 127 ASSERT(krdc->index == index); 128 ASSERT(urdc->index == index); 129 if (!IS_ENABLED(urdc)) 130 continue; 131 132 if (strncmp(pathname, urdc->disk_queue, 133 NSC_MAXPATH) == 0) 134 return (index); 135 } 136 137 return (-1); 138 } 139 140 void 141 rdc_unintercept_diskq(rdc_group_t *grp) 142 { 143 if (!RDC_IS_DISKQ(grp)) 144 return; 145 if (grp->q_tok) 146 (void) nsc_unregister_path(grp->q_tok, 0); 147 grp->q_tok = NULL; 148 } 149 150 void 151 rdc_close_diskq(rdc_group_t *grp) 152 { 153 154 if (grp == NULL) { 155 #ifdef DEBUG 156 cmn_err(CE_WARN, "!rdc_close_diskq: NULL group!"); 157 #endif 158 return; 159 } 160 161 if (grp->diskqfd) { 162 if (nsc_close(grp->diskqfd) != 0) { 163 #ifdef DEBUG 164 cmn_err(CE_WARN, "!nsc_close on diskq failed"); 165 #else 166 ; 167 /*EMPTY*/ 168 #endif 169 } 170 grp->diskqfd = 0; 171 grp->diskqrsrv = 0; 172 } 173 bzero(&grp->diskq.disk_hdr, sizeof (diskq_header)); 174 } 175 176 /* 177 * nsc_open the diskq and attach 178 * the nsc_fd to krdc->diskqfd 179 */ 180 int 181 rdc_open_diskq(rdc_k_info_t *krdc) 182 { 183 rdc_u_info_t *urdc; 184 rdc_group_t *grp; 185 int sts; 186 nsc_size_t size; 187 char *diskqname; 188 int mutexheld = 0; 189 190 grp = krdc->group; 191 urdc = &rdc_u_info[krdc->index]; 192 193 mutex_enter(&grp->diskqmutex); 194 mutexheld++; 195 if (&urdc->disk_queue[0] == '\0') { 196 goto fail; 197 } 198 199 diskqname = &urdc->disk_queue[0]; 200 201 if (grp->diskqfd == NULL) { 202 grp->diskqfd = nsc_open(diskqname, 203 NSC_RDCHR_ID|NSC_DEVICE|NSC_WRITE, 0, 0, 0); 204 if (grp->diskqfd == NULL) { 205 cmn_err(CE_WARN, "!rdc_open_diskq: Unable to open %s", 206 diskqname); 207 goto fail; 208 } 209 } 210 if (!grp->q_tok) 211 grp->q_tok = nsc_register_path(urdc->disk_queue, 212 NSC_DEVICE | NSC_CACHE, _rdc_io_hc); 213 214 grp->diskqrsrv = 0; /* init reserve count */ 215 216 mutex_exit(&grp->diskqmutex); 217 mutexheld--; 218 /* just test a reserve release */ 219 sts = _rdc_rsrv_diskq(grp); 220 if (!RDC_SUCCESS(sts)) { 221 cmn_err(CE_WARN, "!rdc_open_diskq: Reserve failed for %s", 222 diskqname); 223 goto fail; 224 } 225 sts = nsc_partsize(grp->diskqfd, &size); 226 _rdc_rlse_diskq(grp); 227 228 if ((sts == 0) && (size < 1)) { 229 rdc_unintercept_diskq(grp); 230 rdc_close_diskq(grp); 231 goto fail; 232 } 233 234 return (0); 235 236 fail: 237 bzero(&urdc->disk_queue, NSC_MAXPATH); 238 if (mutexheld) 239 mutex_exit(&grp->diskqmutex); 240 return (-1); 241 242 } 243 244 /* 245 * rdc_count_vecs 246 * simply vec++'s until sb_addr is null 247 * returns number of vectors encountered 248 */ 249 int 250 rdc_count_vecs(nsc_vec_t *vec) 251 { 252 nsc_vec_t *vecp; 253 int i = 0; 254 vecp = vec; 255 while (vecp->sv_addr) { 256 vecp++; 257 i++; 258 } 259 return (i+1); 260 } 261 /* 262 * rdc_setid2idx 263 * given setid, return index 264 */ 265 int 266 rdc_setid2idx(int setid) { 267 268 int index = 0; 269 270 for (index = 0; index < rdc_max_sets; index++) { 271 if (rdc_u_info[index].setid == setid) 272 break; 273 } 274 if (index >= rdc_max_sets) 275 index = -1; 276 return (index); 277 } 278 279 /* 280 * rdc_idx2setid 281 * given an index, return its setid 282 */ 283 int 284 rdc_idx2setid(int index) 285 { 286 return (rdc_u_info[index].setid); 287 } 288 289 /* 290 * rdc_fill_ioheader 291 * fill in all the stuff you want to save on disk 292 * at the beginnig of each queued write 293 */ 294 void 295 rdc_fill_ioheader(rdc_aio_t *aio, io_hdr *hd, int qpos) 296 { 297 ASSERT(MUTEX_HELD(&rdc_k_info[aio->index].group->diskq.disk_qlock)); 298 299 hd->dat.magic = RDC_IOHDR_MAGIC; 300 hd->dat.type = RDC_QUEUEIO; 301 hd->dat.pos = aio->pos; 302 hd->dat.hpos = aio->pos; 303 hd->dat.qpos = qpos; 304 hd->dat.len = aio->len; 305 hd->dat.flag = aio->flag; 306 hd->dat.iostatus = aio->iostatus; 307 hd->dat.setid = rdc_idx2setid(aio->index); 308 hd->dat.time = nsc_time(); 309 if (!aio->handle) 310 hd->dat.flag |= RDC_NULL_BUF; /* no real data to queue */ 311 } 312 313 /* 314 * rdc_dump_iohdrs 315 * give back the iohdr list 316 * and clear out q->lastio 317 */ 318 void 319 rdc_dump_iohdrs(disk_queue *q) 320 { 321 io_hdr *p, *r; 322 323 ASSERT(MUTEX_HELD(QLOCK(q))); 324 325 p = q->iohdrs; 326 while (p) { 327 r = p->dat.next; 328 kmem_free(p, sizeof (*p)); 329 q->hdrcnt--; 330 p = r; 331 } 332 q->iohdrs = q->hdr_last = NULL; 333 q->hdrcnt = 0; 334 if (q->lastio->handle) 335 (void) nsc_free_buf(q->lastio->handle); 336 bzero(&(*q->lastio), sizeof (*q->lastio)); 337 } 338 339 /* 340 * rdc_fail_diskq 341 * set flags, throw away q info 342 * clean up what you can 343 * wait for flusher threads to stop (taking into account this may be one) 344 * takes group_lock, so conf, many, and bitmap may not be held 345 */ 346 void 347 rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag) 348 { 349 rdc_k_info_t *p; 350 rdc_u_info_t *q = &rdc_u_info[krdc->index]; 351 rdc_group_t *group = krdc->group; 352 disk_queue *dq = &krdc->group->diskq; 353 354 if (IS_STATE(q, RDC_DISKQ_FAILED)) 355 return; 356 357 if (!(flag & RDC_NOFAIL)) 358 cmn_err(CE_WARN, "!disk queue %s failure", q->disk_queue); 359 360 if (flag & RDC_DOLOG) { 361 rdc_group_enter(krdc); 362 rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE, 363 "disk queue failed"); 364 rdc_group_exit(krdc); 365 } 366 mutex_enter(QHEADLOCK(dq)); 367 mutex_enter(QLOCK(dq)); 368 /* 369 * quick stop of the flushers 370 * other cleanup is done on the un-failing of the diskq 371 */ 372 SET_QHEAD(dq, RDC_DISKQ_DATA_OFF); 373 SET_QTAIL(dq, RDC_DISKQ_DATA_OFF); 374 SET_QNXTIO(dq, RDC_DISKQ_DATA_OFF); 375 SET_LASTQTAIL(dq, 0); 376 377 rdc_dump_iohdrs(dq); 378 379 mutex_exit(QLOCK(dq)); 380 mutex_exit(QHEADLOCK(dq)); 381 382 bzero(krdc->bitmap_ref, krdc->bitmap_size * BITS_IN_BYTE * 383 BMAP_REF_PREF_SIZE); 384 385 if (flag & RDC_DOLOG) /* otherwise, we already have the conf lock */ 386 rdc_group_enter(krdc); 387 388 else if (!(flag & RDC_GROUP_LOCKED)) 389 ASSERT(MUTEX_HELD(&rdc_conf_lock)); 390 391 if (!(flag & RDC_NOFAIL)) { 392 rdc_set_flags(q, RDC_DISKQ_FAILED); 393 } 394 rdc_clr_flags(q, RDC_QUEUING); 395 396 for (p = krdc->group_next; p != krdc; p = p->group_next) { 397 q = &rdc_u_info[p->index]; 398 if (!IS_ENABLED(q)) 399 continue; 400 if (!(flag & RDC_NOFAIL)) { 401 rdc_set_flags(q, RDC_DISKQ_FAILED); 402 } 403 rdc_clr_flags(q, RDC_QUEUING); 404 bzero(p->bitmap_ref, p->bitmap_size * BITS_IN_BYTE * 405 BMAP_REF_PREF_SIZE); 406 /* RDC_QUEUING is cleared in group_log() */ 407 } 408 409 if (flag & RDC_DOLOG) 410 rdc_group_exit(krdc); 411 412 /* can't wait for myself to go away, I'm a flusher */ 413 if (wait & RDC_WAIT) 414 while (group->rdc_thrnum) 415 delay(2); 416 417 } 418 419 /* 420 * rdc_stamp_diskq 421 * write out diskq header info 422 * must have disk_qlock held 423 * if rsrvd flag is 0, the nsc_reserve is done 424 */ 425 int 426 rdc_stamp_diskq(rdc_k_info_t *krdc, int rsrvd, int failflags) 427 { 428 nsc_vec_t vec[2]; 429 nsc_buf_t *head = NULL; 430 rdc_group_t *grp; 431 rdc_u_info_t *urdc; 432 disk_queue *q; 433 int rc, flags; 434 435 grp = krdc->group; 436 q = &krdc->group->diskq; 437 438 ASSERT(MUTEX_HELD(&q->disk_qlock)); 439 440 urdc = &rdc_u_info[krdc->index]; 441 442 if (!rsrvd && _rdc_rsrv_diskq(grp)) { 443 cmn_err(CE_WARN, "!rdc_stamp_diskq: %s reserve failed", 444 urdc->disk_queue); 445 mutex_exit(QLOCK(q)); 446 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags); 447 mutex_enter(QLOCK(q)); 448 return (-1); 449 } 450 flags = NSC_WRITE | NSC_NOCACHE | NSC_NODATA; 451 rc = nsc_alloc_buf(grp->diskqfd, 0, 1, flags, &head); 452 453 if (!RDC_SUCCESS(rc)) { 454 cmn_err(CE_WARN, "!Alloc buf failed for disk queue %s", 455 &urdc->disk_queue[0]); 456 mutex_exit(QLOCK(q)); 457 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags); 458 mutex_enter(QLOCK(q)); 459 return (-1); 460 } 461 vec[0].sv_len = FBA_SIZE(1); 462 vec[0].sv_addr = (uchar_t *)&q->disk_hdr; 463 vec[1].sv_len = 0; 464 vec[1].sv_addr = NULL; 465 466 head->sb_vec = &vec[0]; 467 468 #ifdef DEBUG_DISKQ 469 cmn_err(CE_NOTE, "!rdc_stamp_diskq: hdr: %p magic: %x state: " 470 "%x head: %d tail: %d size: %d nitems: %d blocks: %d", 471 q, QMAGIC(q), QSTATE(q), QHEAD(q), 472 QTAIL(q), QSIZE(q), QNITEMS(q), QBLOCKS(q)); 473 #endif 474 475 rc = nsc_write(head, 0, 1, 0); 476 477 if (!RDC_SUCCESS(rc)) { 478 if (!rsrvd) 479 _rdc_rlse_diskq(grp); 480 cmn_err(CE_CONT, "!disk queue %s failed rc %d", 481 &urdc->disk_queue[0], rc); 482 mutex_exit(QLOCK(q)); 483 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags); 484 mutex_enter(QLOCK(q)); 485 return (-1); 486 } 487 488 (void) nsc_free_buf(head); 489 if (!rsrvd) 490 _rdc_rlse_diskq(grp); 491 492 return (0); 493 } 494 495 /* 496 * rdc_init_diskq_header 497 * load initial values into the header 498 */ 499 void 500 rdc_init_diskq_header(rdc_group_t *grp, dqheader *header) 501 { 502 int rc; 503 int type = 0; 504 disk_queue *q = &grp->diskq; 505 506 ASSERT(MUTEX_HELD(QLOCK(q))); 507 508 /* save q type if this is a failure */ 509 if (QSTATE(q) & RDC_QNOBLOCK) 510 type = RDC_QNOBLOCK; 511 bzero(header, sizeof (*header)); 512 header->h.magic = RDC_DISKQ_MAGIC; 513 header->h.vers = RDC_DISKQ_VERS; 514 header->h.state |= (RDC_SHUTDOWN_BAD|type); /* SHUTDOWN_OK on suspend */ 515 header->h.head_offset = RDC_DISKQ_DATA_OFF; 516 header->h.tail_offset = RDC_DISKQ_DATA_OFF; 517 header->h.nitems = 0; 518 header->h.blocks = 0; 519 header->h.qwrap = 0; 520 SET_QNXTIO(q, QHEAD(q)); 521 SET_QCOALBOUNDS(q, RDC_DISKQ_DATA_OFF); 522 523 /* do this last, as this might be a failure. get the kernel state ok */ 524 rc = _rdc_rsrv_diskq(grp); 525 if (!RDC_SUCCESS(rc)) { 526 cmn_err(CE_WARN, "!init_diskq_hdr: Reserve failed for queue"); 527 return; 528 } 529 (void) nsc_partsize(grp->diskqfd, &header->h.disk_size); 530 _rdc_rlse_diskq(grp); 531 532 } 533 534 /* 535 * rdc_unfail_diskq 536 * the diskq failed for some reason, lets try and re-start it 537 * the old stuff has already been thrown away 538 * should just be called from rdc_sync 539 */ 540 void 541 rdc_unfail_diskq(rdc_k_info_t *krdc) 542 { 543 rdc_k_info_t *p; 544 rdc_u_info_t *q = &rdc_u_info[krdc->index]; 545 rdc_group_t *group = krdc->group; 546 disk_queue *dq = &group->diskq; 547 548 rdc_group_enter(krdc); 549 rdc_clr_flags(q, RDC_ASYNC); 550 /* someone else won the race... */ 551 if (!IS_STATE(q, RDC_DISKQ_FAILED)) { 552 rdc_group_exit(krdc); 553 return; 554 } 555 rdc_clr_flags(q, RDC_DISKQ_FAILED); 556 for (p = krdc->group_next; p != krdc; p = p->group_next) { 557 q = &rdc_u_info[p->index]; 558 if (!IS_ENABLED(q)) 559 continue; 560 rdc_clr_flags(q, RDC_DISKQ_FAILED); 561 rdc_clr_flags(q, RDC_ASYNC); 562 if (IS_STATE(q, RDC_QUEUING)) 563 rdc_clr_flags(q, RDC_QUEUING); 564 } 565 rdc_group_exit(krdc); 566 567 mutex_enter(QLOCK(dq)); 568 569 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 570 /* real i/o to the queue */ 571 /* clear RDC_AUXSYNCIP because we cannot halt a sync that's not here */ 572 krdc->aux_state &= ~RDC_AUXSYNCIP; 573 if (rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED | RDC_DOLOG) < 0) { 574 mutex_exit(QLOCK(dq)); 575 goto fail; 576 } 577 578 SET_QNXTIO(dq, QHEAD(dq)); 579 SET_QHDRCNT(dq, 0); 580 SET_QSTATE(dq, RDC_SHUTDOWN_BAD); /* only suspend can write good */ 581 dq->iohdrs = NULL; 582 dq->hdr_last = NULL; 583 584 /* should be none, but.. */ 585 rdc_dump_iohdrs(dq); 586 587 mutex_exit(QLOCK(dq)); 588 589 590 fail: 591 krdc->aux_state |= RDC_AUXSYNCIP; 592 return; 593 594 } 595 596 int 597 rdc_read_diskq_header(rdc_k_info_t *krdc) 598 { 599 int rc; 600 diskq_header *header; 601 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 602 603 if (krdc->group->diskqfd == NULL) { 604 char buf[NSC_MAXPATH]; 605 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf, 606 &urdc->secondary.intf[0]); 607 cmn_err(CE_WARN, "!Disk Queue Header read failed for %s", 608 &urdc->group_name[0] == '\0' ? buf: 609 &urdc->group_name[0]); 610 return (-1); 611 } 612 613 header = &krdc->group->diskq.disk_hdr.h; 614 if (_rdc_rsrv_diskq(krdc->group)) { 615 return (-1); 616 } 617 618 rc = rdc_ns_io(krdc->group->diskqfd, NSC_RDBUF, 0, 619 (uchar_t *)header, sizeof (diskq_header)); 620 621 _rdc_rlse_diskq(krdc->group); 622 623 if (!RDC_SUCCESS(rc)) { 624 char buf[NSC_MAXPATH]; 625 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf, 626 &urdc->secondary.file[0]); 627 cmn_err(CE_WARN, "!Disk Queue Header read failed(%d) for %s", 628 rc, &urdc->group_name[0] == '\0' ? buf : 629 &urdc->group_name[0]); 630 return (-1); 631 } 632 return (0); 633 } 634 635 /* 636 * rdc_stop_diskq_flusher 637 */ 638 void 639 rdc_stop_diskq_flusher(rdc_k_info_t *krdc) 640 { 641 disk_queue q, *qp; 642 rdc_group_t *group; 643 #ifdef DEBUG 644 cmn_err(CE_NOTE, "!stopping flusher threads"); 645 #endif 646 group = krdc->group; 647 qp = &krdc->group->diskq; 648 649 /* save the queue info */ 650 q = *qp; 651 652 /* lie a little */ 653 SET_QTAIL(qp, RDC_DISKQ_DATA_OFF); 654 SET_QHEAD(qp, RDC_DISKQ_DATA_OFF); 655 SET_QSTATE(qp, RDC_QDISABLEPEND); 656 SET_QSTATE(qp, RDC_STOPPINGFLUSH); 657 658 /* drop locks to allow flushers to die */ 659 mutex_exit(QLOCK(qp)); 660 mutex_exit(QHEADLOCK(qp)); 661 rdc_group_exit(krdc); 662 663 while (group->rdc_thrnum) 664 delay(2); 665 666 rdc_group_enter(krdc); 667 mutex_enter(QHEADLOCK(qp)); 668 mutex_enter(QLOCK(qp)); 669 670 CLR_QSTATE(qp, RDC_STOPPINGFLUSH); 671 *qp = q; 672 } 673 674 /* 675 * rdc_enable_diskq 676 * open the diskq 677 * and stamp the header onto it. 678 */ 679 int 680 rdc_enable_diskq(rdc_k_info_t *krdc) 681 { 682 rdc_group_t *group; 683 disk_queue *q; 684 685 group = krdc->group; 686 q = &group->diskq; 687 688 if (rdc_open_diskq(krdc) < 0) 689 goto fail; 690 691 mutex_enter(QLOCK(q)); 692 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 693 694 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) { 695 mutex_exit(QLOCK(q)); 696 goto fail; 697 } 698 699 SET_QNXTIO(q, QHEAD(q)); 700 701 mutex_exit(QLOCK(q)); 702 return (0); 703 704 fail: 705 mutex_enter(&group->diskqmutex); 706 rdc_close_diskq(group); 707 mutex_exit(&group->diskqmutex); 708 709 /* caller has to fail diskq after dropping conf & many locks */ 710 return (RDC_EQNOADD); 711 } 712 713 /* 714 * rdc_resume_diskq 715 * open the diskq and read the header 716 */ 717 int 718 rdc_resume_diskq(rdc_k_info_t *krdc) 719 { 720 rdc_u_info_t *urdc; 721 rdc_group_t *group; 722 disk_queue *q; 723 int rc = 0; 724 725 urdc = &rdc_u_info[krdc->index]; 726 group = krdc->group; 727 q = &group->diskq; 728 729 if (rdc_open_diskq(krdc) < 0) { 730 rc = RDC_EQNOADD; 731 goto fail; 732 } 733 734 mutex_enter(QLOCK(q)); 735 736 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 737 738 if (rdc_read_diskq_header(krdc) < 0) { 739 SET_QSTATE(q, RDC_QBADRESUME); 740 rc = RDC_EQNOADD; 741 } 742 743 /* check diskq magic number */ 744 if (QMAGIC(q) != RDC_DISKQ_MAGIC) { 745 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 746 " incorrect magic number in header", urdc->disk_queue); 747 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 748 SET_QSTATE(q, RDC_QBADRESUME); 749 rc = RDC_EQNOADD; 750 } else switch (QVERS(q)) { 751 diskq_header1 h1; /* version 1 header */ 752 diskq_header *hc; /* current header */ 753 754 #ifdef NSC_MULTI_TERABYTE 755 case RDC_DISKQ_VER_ORIG: 756 /* version 1 diskq header, upgrade to 64bit version */ 757 h1 = *(diskq_header1 *)(&group->diskq.disk_hdr.h); 758 hc = &group->diskq.disk_hdr.h; 759 760 cmn_err(CE_WARN, "!SNDR: old version header for diskq %s," 761 " upgrading to current version", urdc->disk_queue); 762 hc->vers = RDC_DISKQ_VERS; 763 hc->state = h1.state; 764 hc->head_offset = h1.head_offset; 765 hc->tail_offset = h1.tail_offset; 766 hc->disk_size = h1.disk_size; 767 hc->nitems = h1.nitems; 768 hc->blocks = h1.blocks; 769 hc->qwrap = h1.qwrap; 770 hc->auxqwrap = h1.auxqwrap; 771 hc->seq_last = h1.seq_last; 772 hc->ack_last = h1.ack_last; 773 774 if (hc->nitems > 0) { 775 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 776 " old version Q contains data", urdc->disk_queue); 777 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 778 SET_QSTATE(q, RDC_QBADRESUME); 779 rc = RDC_EQNOADD; 780 } 781 break; 782 #else 783 case RDC_DISKQ_VER_64BIT: 784 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 785 " diskq header newer than current version", 786 urdc->disk_queue); 787 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 788 SET_QSTATE(q, RDC_QBADRESUME); 789 rc = RDC_EQNOADD; 790 break; 791 #endif 792 case RDC_DISKQ_VERS: 793 /* okay, current version diskq */ 794 break; 795 default: 796 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 797 " unknown diskq header version", urdc->disk_queue); 798 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 799 SET_QSTATE(q, RDC_QBADRESUME); 800 rc = RDC_EQNOADD; 801 break; 802 } 803 if (IS_QSTATE(q, RDC_SHUTDOWN_BAD)) { 804 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 805 " unsafe shutdown", urdc->disk_queue); 806 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 807 SET_QSTATE(q, RDC_QBADRESUME); 808 rc = RDC_EQNOADD; 809 } 810 811 CLR_QSTATE(q, RDC_SHUTDOWN_OK); 812 SET_QSTATE(q, RDC_SHUTDOWN_BAD); 813 814 /* bad, until proven not bad */ 815 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) { 816 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_NOLOG); 817 rc = RDC_EQNOADD; 818 } 819 820 SET_QNXTIO(q, QHEAD(q)); 821 group->diskq.nitems_hwm = QNITEMS(q); 822 group->diskq.blocks_hwm = QBLOCKS(q); 823 824 mutex_exit(QLOCK(q)); 825 826 #ifdef DEBUG 827 cmn_err(CE_NOTE, "!rdc_resume_diskq: resuming diskq %s \n", 828 urdc->disk_queue); 829 cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q)); 830 #endif 831 if (rc == 0) 832 return (0); 833 834 fail: 835 836 /* caller has to set the diskq failed after dropping it's locks */ 837 return (rc); 838 839 } 840 841 int 842 rdc_suspend_diskq(rdc_k_info_t *krdc) 843 { 844 int rc; 845 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 846 disk_queue *q; 847 848 q = &krdc->group->diskq; 849 850 /* grab both diskq locks as we are going to kill the flusher */ 851 mutex_enter(QHEADLOCK(q)); 852 mutex_enter(QLOCK(q)); 853 854 if ((krdc->group->rdc_thrnum) && (!IS_QSTATE(q, RDC_STOPPINGFLUSH))) { 855 SET_QSTATE(q, RDC_STOPPINGFLUSH); 856 rdc_stop_diskq_flusher(krdc); 857 CLR_QSTATE(q, RDC_STOPPINGFLUSH); 858 } 859 860 krdc->group->diskq.disk_hdr.h.state &= ~RDC_SHUTDOWN_BAD; 861 krdc->group->diskq.disk_hdr.h.state |= RDC_SHUTDOWN_OK; 862 krdc->group->diskq.disk_hdr.h.state &= ~RDC_QBADRESUME; 863 864 /* let's make sure that the flusher has stopped.. */ 865 if (krdc->group->rdc_thrnum) { 866 mutex_exit(QLOCK(q)); 867 mutex_exit(QHEADLOCK(q)); 868 rdc_group_exit(krdc); 869 870 while (krdc->group->rdc_thrnum) 871 delay(5); 872 873 rdc_group_enter(krdc); 874 mutex_enter(QLOCK(q)); 875 mutex_enter(QHEADLOCK(q)); 876 } 877 /* write refcount to the bitmap */ 878 if ((rc = rdc_write_refcount(krdc)) < 0) { 879 rdc_group_exit(krdc); 880 goto fail; 881 } 882 883 if (!QEMPTY(q)) { 884 rdc_set_flags(urdc, RDC_QUEUING); 885 } else { 886 rdc_clr_flags(urdc, RDC_QUEUING); 887 } 888 889 /* fill in diskq header info */ 890 krdc->group->diskq.disk_hdr.h.state &= ~RDC_QDISABLEPEND; 891 892 #ifdef DEBUG 893 cmn_err(CE_NOTE, "!suspending disk queue\n" QDISPLAY(q)); 894 #endif 895 896 /* to avoid a possible deadlock, release in order, and reacquire */ 897 mutex_exit(QLOCK(q)); 898 mutex_exit(QHEADLOCK(q)); 899 900 if (krdc->group->count > 1) { 901 rdc_group_exit(krdc); 902 goto fail; /* just stamp on the last suspend */ 903 } 904 rdc_group_exit(krdc); /* in case this stamp fails */ 905 mutex_enter(QLOCK(q)); 906 907 rc = rdc_stamp_diskq(krdc, 0, RDC_NOLOG); 908 909 mutex_exit(QLOCK(q)); 910 911 fail: 912 rdc_group_enter(krdc); 913 914 /* diskq already failed if stamp failed */ 915 916 return (rc); 917 } 918 919 /* 920 * copy orig aio to copy, including the nsc_buf_t 921 */ 922 int 923 rdc_dup_aio(rdc_aio_t *orig, rdc_aio_t *copy) 924 { 925 int rc; 926 bcopy(orig, copy, sizeof (*orig)); 927 copy->handle = NULL; 928 929 if (orig->handle == NULL) /* no buf to alloc/copy */ 930 return (0); 931 932 rc = nsc_alloc_abuf(orig->pos, orig->len, 0, ©->handle); 933 if (!RDC_SUCCESS(rc)) { 934 #ifdef DEBUG 935 cmn_err(CE_WARN, "!rdc_dup_aio: alloc_buf failed (%d)", rc); 936 #endif 937 return (rc); 938 } 939 rc = nsc_copy(orig->handle, copy->handle, orig->pos, 940 orig->pos, orig->len); 941 if (!RDC_SUCCESS(rc)) { 942 (void) nsc_free_buf(copy->handle); 943 #ifdef DEBUG 944 cmn_err(CE_WARN, "!rdc_dup_aio: copy buf failed (%d)", rc); 945 #endif 946 return (rc); 947 } 948 return (0); 949 } 950 951 /* 952 * rdc_qfill_shldwakeup() 953 * 0 if the memory queue has filled, and the low water 954 * mark has not been reached. 0 if diskq is empty. 955 * 1 if less than low water mark 956 * net_queue mutex is already held 957 */ 958 int 959 rdc_qfill_shldwakeup(rdc_k_info_t *krdc) 960 { 961 rdc_group_t *group = krdc->group; 962 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 963 net_queue *nq = &group->ra_queue; 964 disk_queue *dq = &group->diskq; 965 966 ASSERT(MUTEX_HELD(&nq->net_qlock)); 967 968 if (!RDC_IS_DISKQ(krdc->group)) 969 return (0); 970 971 if (nq->qfill_sleeping != RDC_QFILL_ASLEEP) 972 return (0); 973 974 if (nq->qfflags & RDC_QFILLSTOP) 975 return (1); 976 977 if (nq->qfflags & RDC_QFILLSLEEP) 978 return (0); 979 980 if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) 981 return (0); 982 983 mutex_enter(QLOCK(dq)); 984 if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) { 985 mutex_exit(QLOCK(dq)); 986 return (0); 987 } 988 mutex_exit(QLOCK(dq)); 989 990 if (nq->qfill_sleeping == RDC_QFILL_ASLEEP) { 991 if (nq->hwmhit) { 992 if (nq->blocks <= RDC_LOW_QBLOCKS) { 993 nq->hwmhit = 0; 994 } else { 995 return (0); 996 } 997 } 998 #ifdef DEBUG_DISKQ_NOISY 999 cmn_err(CE_NOTE, "!Waking up diskq->memq flusher, flags 0x%x" 1000 " idx: %d", rdc_get_vflags(urdc), urdc->index); 1001 #endif 1002 return (1); 1003 } 1004 return (0); 1005 1006 } 1007 1008 /* 1009 * rdc_diskq_enqueue 1010 * enqueue one i/o to the diskq 1011 * after appending some metadata to the front 1012 */ 1013 int 1014 rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio) 1015 { 1016 nsc_vec_t *vec = NULL; 1017 nsc_buf_t *bp = NULL; 1018 nsc_buf_t *qbuf = NULL; 1019 io_hdr *iohdr = NULL; 1020 disk_queue *q; 1021 rdc_group_t *group; 1022 int numvecs; 1023 int i, j, rc = 0; 1024 int retries = 0; 1025 rdc_u_info_t *urdc; 1026 nsc_size_t iofbas; /* len of io + io header len */ 1027 int qtail; 1028 int delay_time = 2; 1029 int print_msg = 1; 1030 1031 #ifdef DEBUG_WRITER_UBERNOISE 1032 int qhead; 1033 #endif 1034 urdc = &rdc_u_info[krdc->index]; 1035 group = krdc->group; 1036 q = &group->diskq; 1037 1038 mutex_enter(QLOCK(q)); 1039 1040 /* 1041 * there is a thread that is blocking because the queue is full, 1042 * don't try to set up this write until all is clear 1043 * check before and after for logging or failed queue just 1044 * in case a thread was in flight while the queue was full, 1045 * and in the proccess of failing 1046 */ 1047 while (IS_QSTATE(q, RDC_QFULL)) { 1048 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 1049 (IS_STATE(urdc, RDC_LOGGING) && 1050 !IS_STATE(urdc, RDC_QUEUING))) { 1051 mutex_exit(QLOCK(q)); 1052 if (aio->handle) 1053 (void) nsc_free_buf(aio->handle); 1054 return (-1); 1055 } 1056 cv_wait(&q->qfullcv, QLOCK(q)); 1057 1058 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 1059 (IS_STATE(urdc, RDC_LOGGING) && 1060 !IS_STATE(urdc, RDC_QUEUING))) { 1061 mutex_exit(QLOCK(q)); 1062 if (aio->handle) 1063 (void) nsc_free_buf(aio->handle); 1064 return (-1); 1065 } 1066 1067 } 1068 1069 SET_QSTATE(q, QTAILBUSY); 1070 1071 if (aio->handle == NULL) { 1072 /* we're only going to write the header to the queue */ 1073 numvecs = 2; /* kmem_alloc io header + null terminate */ 1074 iofbas = FBA_LEN(sizeof (io_hdr)); 1075 1076 } else { 1077 /* find out how many vecs */ 1078 numvecs = rdc_count_vecs(aio->handle->sb_vec) + 1; 1079 iofbas = aio->len + FBA_LEN(sizeof (io_hdr)); 1080 } 1081 1082 /* 1083 * this, in conjunction with QTAILBUSY, will prevent 1084 * premature dequeuing 1085 */ 1086 1087 SET_LASTQTAIL(q, QTAIL(q)); 1088 1089 iohdr = (io_hdr *) kmem_zalloc(sizeof (io_hdr), KM_NOSLEEP); 1090 vec = (nsc_vec_t *) kmem_zalloc(sizeof (nsc_vec_t) * numvecs, 1091 KM_NOSLEEP); 1092 1093 if (!vec || !iohdr) { 1094 if (!vec) { 1095 cmn_err(CE_WARN, "!vec kmem alloc failed"); 1096 } else { 1097 cmn_err(CE_WARN, "!iohdr kmem alloc failed"); 1098 } 1099 if (vec) 1100 kmem_free(vec, sizeof (*vec)); 1101 if (iohdr) 1102 kmem_free(iohdr, sizeof (*iohdr)); 1103 CLR_QSTATE(q, QTAILBUSY); 1104 SET_LASTQTAIL(q, 0); 1105 mutex_exit(QLOCK(q)); 1106 if (aio->handle) 1107 (void) nsc_free_buf(aio->handle); 1108 return (ENOMEM); 1109 } 1110 1111 vec[numvecs - 1].sv_len = 0; 1112 vec[numvecs - 1].sv_addr = 0; 1113 1114 /* now add the write itself */ 1115 bp = aio->handle; 1116 1117 for (i = 1, j = 0; bp && bp->sb_vec[j].sv_addr && 1118 i < numvecs; i++, j++) { 1119 vec[i].sv_len = bp->sb_vec[j].sv_len; 1120 vec[i].sv_addr = bp->sb_vec[j].sv_addr; 1121 } 1122 1123 retry: 1124 1125 /* check for queue wrap, then check for overflow */ 1126 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 1127 (IS_STATE(urdc, RDC_LOGGING) && !IS_STATE(urdc, RDC_QUEUING))) { 1128 kmem_free(iohdr, sizeof (*iohdr)); 1129 kmem_free(vec, sizeof (*vec) * numvecs); 1130 CLR_QSTATE(q, QTAILBUSY); 1131 SET_LASTQTAIL(q, 0); 1132 if (IS_QSTATE(q, RDC_QFULL)) { /* wakeup blocked threads */ 1133 CLR_QSTATE(q, RDC_QFULL); 1134 cv_broadcast(&q->qfullcv); 1135 } 1136 mutex_exit(QLOCK(q)); 1137 if (aio->handle) 1138 (void) nsc_free_buf(aio->handle); 1139 1140 return (-1); 1141 } 1142 1143 if (QTAILSHLDWRAP(q, iofbas)) { 1144 /* 1145 * just go back to the beginning of the disk 1146 * it's not worth the trouble breaking up the write 1147 */ 1148 #ifdef DEBUG_DISKQWRAP 1149 cmn_err(CE_NOTE, "!wrapping Q tail: " QDISPLAY(q)); 1150 #endif 1151 /*LINTED*/ 1152 WRAPQTAIL(q); 1153 } 1154 1155 /* 1156 * prepend the write's metadata 1157 */ 1158 rdc_fill_ioheader(aio, iohdr, QTAIL(q)); 1159 1160 vec[0].sv_len = FBA_SIZE(1); 1161 vec[0].sv_addr = (uchar_t *)iohdr; 1162 1163 /* check for tail < head */ 1164 1165 if (!(FITSONQ(q, iofbas))) { 1166 /* 1167 * don't allow any more writes to start 1168 */ 1169 SET_QSTATE(q, RDC_QFULL); 1170 mutex_exit(QLOCK(q)); 1171 1172 if ((!group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING)) 1173 (void) rdc_writer(krdc->index); 1174 1175 delay(delay_time); 1176 q->throttle_delay += delay_time; 1177 retries++; 1178 delay_time *= 2; /* fairly aggressive */ 1179 if ((retries >= 8) || (delay_time >= 256)) { 1180 delay_time = 2; 1181 if (print_msg) { 1182 cmn_err(CE_WARN, "!enqueue: disk queue %s full", 1183 &urdc->disk_queue[0]); 1184 print_msg = 0; 1185 #ifdef DEBUG 1186 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q)); 1187 #else 1188 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q)); 1189 #endif 1190 } 1191 /* 1192 * if this is a no-block queue, or this is a blocking 1193 * queue that is not flushing. reset and log 1194 */ 1195 if ((QSTATE(q) & RDC_QNOBLOCK) || 1196 (IS_STATE(urdc, RDC_QUEUING))) { 1197 1198 if (IS_STATE(urdc, RDC_QUEUING)) { 1199 cmn_err(CE_WARN, "!SNDR: disk queue %s full and not flushing. " 1200 "giving up", &urdc->disk_queue[0]); 1201 cmn_err(CE_WARN, "!SNDR: %s:%s entering logging mode", 1202 urdc->secondary.intf, urdc->secondary.file); 1203 } 1204 1205 rdc_fail_diskq(krdc, RDC_WAIT, 1206 RDC_DOLOG | RDC_NOFAIL); 1207 kmem_free(iohdr, sizeof (*iohdr)); 1208 kmem_free(vec, sizeof (*vec) * numvecs); 1209 mutex_enter(QLOCK(q)); 1210 CLR_QSTATE(q, QTAILBUSY | RDC_QFULL); 1211 cv_broadcast(&q->qfullcv); 1212 mutex_exit(QLOCK(q)); 1213 SET_LASTQTAIL(q, 0); 1214 if (aio->handle) 1215 (void) nsc_free_buf(aio->handle); 1216 return (ENOMEM); 1217 } 1218 } 1219 1220 mutex_enter(QLOCK(q)); 1221 goto retry; 1222 1223 } 1224 1225 qtail = QTAIL(q); 1226 #ifdef DEBUG_WRITER_UBERNOISE 1227 qhead = QHEAD(q); 1228 #endif 1229 1230 /* update tail pointer, nitems on queue and blocks on queue */ 1231 INC_QTAIL(q, iofbas); /* increment tail over i/o size + ioheader size */ 1232 INC_QNITEMS(q, 1); 1233 /* increment counter for i/o blocks only */ 1234 INC_QBLOCKS(q, (iofbas - FBA_LEN(sizeof (io_hdr)))); 1235 1236 if (QNITEMS(q) > q->nitems_hwm) 1237 q->nitems_hwm = QNITEMS(q); 1238 if (QBLOCKS(q) > q->blocks_hwm) 1239 q->blocks_hwm = QBLOCKS(q); 1240 1241 if (IS_QSTATE(q, RDC_QFULL)) { 1242 CLR_QSTATE(q, RDC_QFULL); 1243 cv_broadcast(&q->qfullcv); 1244 } 1245 1246 mutex_exit(QLOCK(q)); 1247 1248 /* 1249 * if (krdc->io_kstats) { 1250 * mutex_enter(krdc->io_kstats->ks_lock); 1251 * kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats)); 1252 * mutex_exit(krdc->io_kstats->ks_lock); 1253 * } 1254 */ 1255 1256 DTRACE_PROBE(rdc_diskq_rsrv); 1257 1258 if (_rdc_rsrv_diskq(group)) { 1259 cmn_err(CE_WARN, "!rdc_enqueue: %s reserve failed", 1260 &urdc->disk_queue[0]); 1261 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG); 1262 kmem_free(iohdr, sizeof (*iohdr)); 1263 kmem_free(vec, sizeof (*vec) * numvecs); 1264 mutex_enter(QLOCK(q)); 1265 CLR_QSTATE(q, QTAILBUSY); 1266 SET_LASTQTAIL(q, 0); 1267 mutex_exit(QLOCK(q)); 1268 if (aio->handle) 1269 (void) nsc_free_buf(aio->handle); 1270 return (-1); 1271 } 1272 1273 /* XXX for now do this, but later pre-alloc handle in enable/resume */ 1274 1275 DTRACE_PROBE(rdc_diskq_alloc_start); 1276 rc = nsc_alloc_buf(group->diskqfd, qtail, iofbas, 1277 NSC_NOCACHE | NSC_WRITE | NSC_NODATA, &qbuf); 1278 1279 DTRACE_PROBE(rdc_diskq_alloc_end); 1280 1281 if (!RDC_SUCCESS(rc)) { 1282 cmn_err(CE_WARN, "!disk queue %s alloc failed(%d) %" NSC_SZFMT, 1283 &urdc->disk_queue[0], rc, iofbas); 1284 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG); 1285 rc = ENOMEM; 1286 goto fail; 1287 } 1288 /* move vec and write to queue */ 1289 qbuf->sb_vec = &vec[0]; 1290 1291 #ifdef DEBUG_WRITER_UBERNOISE 1292 1293 cmn_err(CE_NOTE, "!about to write to queue, qbuf: %p, qhead: %d, " 1294 "qtail: %d, len: %d contents: %c%c%c%c%c", 1295 (void *) qbuf, qhead, qtail, iofbas, 1296 qbuf->sb_vec[1].sv_addr[0], 1297 qbuf->sb_vec[1].sv_addr[1], 1298 qbuf->sb_vec[1].sv_addr[2], 1299 qbuf->sb_vec[1].sv_addr[3], 1300 qbuf->sb_vec[1].sv_addr[4]); 1301 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q)); 1302 1303 #endif 1304 1305 DTRACE_PROBE2(rdc_diskq_nswrite_start, int, qtail, nsc_size_t, iofbas); 1306 rc = nsc_write(qbuf, qtail, iofbas, 0); 1307 DTRACE_PROBE2(rdc_diskq_nswrite_end, int, qtail, nsc_size_t, iofbas); 1308 1309 if (!RDC_SUCCESS(rc)) { 1310 cmn_err(CE_WARN, "!disk queue %s write failed %d", 1311 &urdc->disk_queue[0], rc); 1312 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG); 1313 goto fail; 1314 1315 } 1316 1317 mutex_enter(QLOCK(q)); 1318 1319 SET_LASTQTAIL(q, 0); 1320 CLR_QSTATE(q, QTAILBUSY); 1321 1322 mutex_exit(QLOCK(q)); 1323 1324 fail: 1325 1326 /* 1327 * return what should be returned 1328 * the aio is returned in _rdc_write after status is gathered. 1329 */ 1330 1331 if (qbuf) 1332 qbuf->sb_vec = 0; 1333 (void) nsc_free_buf(qbuf); 1334 1335 if (aio->handle) 1336 (void) nsc_free_buf(aio->handle); 1337 1338 _rdc_rlse_diskq(group); 1339 DTRACE_PROBE(rdc_diskq_rlse); 1340 1341 /* free the iohdr and the vecs */ 1342 1343 if (iohdr) 1344 kmem_free(iohdr, sizeof (*iohdr)); 1345 if (vec) 1346 kmem_free(vec, sizeof (*vec) * numvecs); 1347 1348 /* if no flusher running, start one */ 1349 if ((!krdc->group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING)) 1350 (void) rdc_writer(krdc->index); 1351 1352 return (rc); 1353 } 1354 1355 /* 1356 * place this on the pending list of io_hdr's out for flushing 1357 */ 1358 void 1359 rdc_add_iohdr(io_hdr *header, rdc_group_t *group) 1360 { 1361 disk_queue *q = NULL; 1362 #ifdef DEBUG 1363 io_hdr *p; 1364 #endif 1365 1366 q = &group->diskq; 1367 1368 /* paranoia */ 1369 header->dat.next = NULL; 1370 1371 mutex_enter(QLOCK(q)); 1372 #ifdef DEBUG /* AAAH! double flush!? */ 1373 p = q->iohdrs; 1374 while (p) { 1375 if (p->dat.qpos == header->dat.qpos) { 1376 cmn_err(CE_WARN, "!ADDING DUPLICATE HEADER %" NSC_SZFMT, 1377 p->dat.qpos); 1378 kmem_free(header, sizeof (*header)); 1379 mutex_exit(QLOCK(q)); 1380 return; 1381 } 1382 p = p->dat.next; 1383 } 1384 #endif 1385 if (q->iohdrs == NULL) { 1386 q->iohdrs = q->hdr_last = header; 1387 q->hdrcnt = 1; 1388 mutex_exit(QLOCK(q)); 1389 return; 1390 } 1391 1392 q->hdr_last->dat.next = header; 1393 q->hdr_last = header; 1394 q->hdrcnt++; 1395 mutex_exit(QLOCK(q)); 1396 return; 1397 1398 } 1399 1400 /* 1401 * mark an io header as flushed. If it is the qhead, 1402 * then update the qpointers 1403 * free the io_hdrs 1404 * called after the bitmap is cleared by flusher 1405 */ 1406 void 1407 rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_size_t qpos) 1408 { 1409 rdc_group_t *group = krdc->group; 1410 disk_queue *q = NULL; 1411 io_hdr *hp = NULL; 1412 io_hdr *p = NULL; 1413 int found = 0; 1414 int cnt = 0; 1415 1416 #ifndef NSC_MULTI_TERABYTE 1417 ASSERT(qpos >= 0); /* assertion to validate change for 64bit */ 1418 if (qpos < 0) /* not a diskq offset */ 1419 return; 1420 #endif 1421 1422 q = &group->diskq; 1423 mutex_enter(QLOCK(q)); 1424 1425 hp = p = q->iohdrs; 1426 1427 /* find outstanding io_hdr */ 1428 while (hp) { 1429 if (hp->dat.qpos == qpos) { 1430 found++; 1431 break; 1432 } 1433 cnt++; 1434 p = hp; 1435 hp = hp->dat.next; 1436 } 1437 1438 if (!found) { 1439 if (RDC_BETWEEN(QHEAD(q), QNXTIO(q), qpos)) { 1440 #ifdef DEBUG 1441 cmn_err(CE_WARN, "!iohdr already cleared? " 1442 "qpos %" NSC_SZFMT " cnt %d ", qpos, cnt); 1443 cmn_err(CE_WARN, "!Qinfo: " QDISPLAY(q)); 1444 #endif 1445 mutex_exit(QLOCK(q)); 1446 return; 1447 } 1448 mutex_exit(QLOCK(q)); 1449 return; 1450 } 1451 1452 /* mark it as flushed */ 1453 hp->dat.iostatus = RDC_IOHDR_DONE; 1454 1455 /* 1456 * if it is the head pointer, travel the list updating the queue 1457 * pointers until the next unflushed is reached, freeing on the way. 1458 */ 1459 while (hp && (hp->dat.qpos == QHEAD(q)) && 1460 (hp->dat.iostatus == RDC_IOHDR_DONE)) { 1461 #ifdef DEBUG_FLUSHER_UBERNOISE 1462 cmn_err(CE_NOTE, "!clr_iohdr info: magic %x type %d pos %d" 1463 " qpos %d hpos %d len %d flag 0x%x iostatus %x setid %d", 1464 hp->dat.magic, hp->dat.type, hp->dat.pos, hp->dat.qpos, 1465 hp->dat.hpos, hp->dat.len, hp->dat.flag, 1466 hp->dat.iostatus, hp->dat.setid); 1467 #endif 1468 if (hp->dat.flag & RDC_NULL_BUF) { 1469 INC_QHEAD(q, FBA_LEN(sizeof (io_hdr))); 1470 } else { 1471 INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)) + hp->dat.len); 1472 DEC_QBLOCKS(q, hp->dat.len); 1473 } 1474 1475 DEC_QNITEMS(q, 1); 1476 1477 if (QHEADSHLDWRAP(q)) { /* simple enough */ 1478 #ifdef DEBUG_DISKQWRAP 1479 cmn_err(CE_NOTE, "!wrapping Q head: " QDISPLAY(q)); 1480 #endif 1481 /*LINTED*/ 1482 WRAPQHEAD(q); 1483 } 1484 1485 /* get rid of the iohdr */ 1486 if (hp == q->iohdrs) { 1487 q->iohdrs = hp->dat.next; 1488 kmem_free(hp, sizeof (*hp)); 1489 hp = q->iohdrs; 1490 } else { 1491 if (hp == q->hdr_last) 1492 q->hdr_last = p; 1493 p->dat.next = hp->dat.next; 1494 kmem_free(hp, sizeof (*hp)); 1495 hp = p->dat.next; 1496 } 1497 q->hdrcnt--; 1498 } 1499 1500 if (QEMPTY(q) && !IS_QSTATE(q, RDC_QFULL) && 1501 !(IS_QSTATE(q, RDC_QDISABLEPEND))) { 1502 #ifdef DEBUG_FLUSHER_UBERNOISE 1503 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 1504 cmn_err(CE_NOTE, "!clr_iohdr: diskq %s empty, " 1505 "resetting defaults", urdc->disk_queue); 1506 #endif 1507 1508 rdc_init_diskq_header(group, &q->disk_hdr); 1509 SET_QNXTIO(q, QHEAD(q)); 1510 } 1511 1512 /* wakeup any blocked enqueue threads */ 1513 cv_broadcast(&q->qfullcv); 1514 mutex_exit(QLOCK(q)); 1515 } 1516 1517 /* 1518 * put in whatever useful checks we can on the io header 1519 */ 1520 int 1521 rdc_iohdr_ok(io_hdr *hdr) 1522 { 1523 if (hdr->dat.magic != RDC_IOHDR_MAGIC) 1524 goto bad; 1525 return (1); 1526 bad: 1527 1528 #ifdef DEBUG 1529 cmn_err(CE_WARN, "!Bad io header magic %x type %d pos %" NSC_SZFMT 1530 " hpos %" NSC_SZFMT " qpos %" NSC_SZFMT " len %" NSC_SZFMT 1531 " flag %d iostatus %d setid %d", hdr->dat.magic, 1532 hdr->dat.type, hdr->dat.pos, hdr->dat.hpos, hdr->dat.qpos, 1533 hdr->dat.len, hdr->dat.flag, hdr->dat.iostatus, hdr->dat.setid); 1534 #else 1535 cmn_err(CE_WARN, "!Bad io header retrieved"); 1536 #endif 1537 return (0); 1538 } 1539 1540 /* 1541 * rdc_netqueue_insert() 1542 * add an item to a netqueue. No locks necessary as it should only 1543 * be used in a single threaded manor. If that changes, then 1544 * a lock or assertion should be done here 1545 */ 1546 void 1547 rdc_netqueue_insert(rdc_aio_t *aio, net_queue *q) 1548 { 1549 rdc_k_info_t *krdc = &rdc_k_info[aio->index]; 1550 1551 /* paranoid check for bit set */ 1552 RDC_CHECK_BIT(krdc, aio->pos, aio->len); 1553 1554 if (q->net_qhead == NULL) { 1555 q->net_qhead = q->net_qtail = aio; 1556 1557 } else { 1558 q->net_qtail->next = aio; 1559 q->net_qtail = aio; 1560 } 1561 q->blocks += aio->len; 1562 q->nitems++; 1563 1564 if (q->nitems > q->nitems_hwm) { 1565 q->nitems_hwm = q->nitems; 1566 } 1567 if (q->blocks > q->blocks_hwm) { 1568 q->nitems_hwm = q->blocks; 1569 } 1570 } 1571 1572 /* 1573 * rdc_fill_aio(aio, hdr) 1574 * take the pertinent info from an io_hdr and stick it in 1575 * an aio, including seq number, abuf. 1576 */ 1577 void 1578 rdc_fill_aio(rdc_group_t *grp, rdc_aio_t *aio, io_hdr *hdr, nsc_buf_t *abuf) 1579 { 1580 if (hdr->dat.flag & RDC_NULL_BUF) { 1581 aio->handle = NULL; 1582 } else { 1583 aio->handle = abuf; 1584 } 1585 aio->qhandle = abuf; 1586 aio->pos = hdr->dat.pos; 1587 aio->qpos = hdr->dat.qpos; 1588 aio->len = hdr->dat.len; 1589 aio->flag = hdr->dat.flag; 1590 if ((aio->index = rdc_setid2idx(hdr->dat.setid)) < 0) 1591 return; 1592 mutex_enter(&grp->diskq.disk_qlock); 1593 if (grp->ra_queue.qfflags & RDC_QFILLSLEEP) { 1594 mutex_exit(&grp->diskq.disk_qlock); 1595 aio->seq = RDC_NOSEQ; 1596 return; 1597 } 1598 if (abuf && aio->qhandle) { 1599 abuf->sb_user++; 1600 } 1601 aio->seq = grp->seq++; 1602 if (grp->seq < aio->seq) 1603 grp->seq = RDC_NEWSEQ + 1; 1604 mutex_exit(&grp->diskq.disk_qlock); 1605 hdr->dat.iostatus = aio->seq; 1606 1607 } 1608 1609 #ifdef DEBUG 1610 int maxaios_perbuf = 0; 1611 int midaios_perbuf = 0; 1612 int aveaios_perbuf = 0; 1613 int totaios_perbuf = 0; 1614 int buf2qcalls = 0; 1615 1616 void 1617 calc_perbuf(int items) 1618 { 1619 if (totaios_perbuf < 0) { 1620 maxaios_perbuf = 0; 1621 midaios_perbuf = 0; 1622 aveaios_perbuf = 0; 1623 totaios_perbuf = 0; 1624 buf2qcalls = 0; 1625 } 1626 1627 if (items > maxaios_perbuf) 1628 maxaios_perbuf = items; 1629 midaios_perbuf = maxaios_perbuf / 2; 1630 totaios_perbuf += items; 1631 aveaios_perbuf = totaios_perbuf / buf2qcalls; 1632 } 1633 #endif 1634 1635 /* 1636 * rdc_discard_tmpq() 1637 * free up the passed temporary queue 1638 * NOTE: no cv's or mutexes have been initialized 1639 */ 1640 void 1641 rdc_discard_tmpq(net_queue *q) 1642 { 1643 rdc_aio_t *aio; 1644 1645 if (q == NULL) 1646 return; 1647 1648 while (q->net_qhead) { 1649 aio = q->net_qhead; 1650 q->net_qhead = q->net_qhead->next; 1651 if (aio->qhandle) { 1652 aio->qhandle->sb_user--; 1653 if (aio->qhandle->sb_user == 0) { 1654 rdc_fixlen(aio); 1655 (void) nsc_free_buf(aio->qhandle); 1656 } 1657 } 1658 kmem_free(aio, sizeof (*aio)); 1659 q->nitems--; 1660 } 1661 kmem_free(q, sizeof (*q)); 1662 1663 } 1664 1665 /* 1666 * rdc_diskq_buf2queue() 1667 * take a chunk of the diskq, parse it and assemble 1668 * a chain of rdc_aio_t's. 1669 * updates QNXTIO() 1670 */ 1671 net_queue * 1672 rdc_diskq_buf2queue(rdc_group_t *grp, nsc_buf_t **abuf, int index) 1673 { 1674 rdc_aio_t *aio = NULL; 1675 nsc_vec_t *vecp = NULL; 1676 uchar_t *vaddr = NULL; 1677 uchar_t *ioaddr = NULL; 1678 net_queue *netq = NULL; 1679 io_hdr *hdr = NULL; 1680 nsc_buf_t *buf = *abuf; 1681 rdc_u_info_t *urdc = &rdc_u_info[index]; 1682 rdc_k_info_t *krdc = &rdc_k_info[index]; 1683 disk_queue *dq = &grp->diskq; 1684 net_queue *nq = &grp->ra_queue; 1685 int nullbuf = 0; 1686 nsc_off_t endobuf; 1687 nsc_off_t bufoff; 1688 int vlen; 1689 nsc_off_t fpos; 1690 long bufcnt = 0; 1691 int nullblocks = 0; 1692 int fail = 1; 1693 1694 if (buf == NULL) 1695 return (NULL); 1696 1697 netq = kmem_zalloc(sizeof (*netq), KM_NOSLEEP); 1698 if (netq == NULL) { 1699 cmn_err(CE_WARN, "!SNDR: unable to allocate net queue"); 1700 return (NULL); 1701 } 1702 1703 vecp = buf->sb_vec; 1704 vlen = vecp->sv_len; 1705 vaddr = vecp->sv_addr; 1706 bufoff = buf->sb_pos; 1707 endobuf = bufoff + buf->sb_len; 1708 1709 #ifdef DEBUG_FLUSHER_UBERNOISE 1710 cmn_err(CE_WARN, "!BUFFOFFENTER %d", bufoff); 1711 #endif 1712 /* CONSTCOND */ 1713 while (1) { 1714 if (IS_STATE(urdc, RDC_LOGGING) || 1715 (nq->qfflags & RDC_QFILLSLEEP)) { 1716 fail = 0; 1717 goto fail; 1718 } 1719 #ifdef DEBUG_FLUSHER_UBERNOISE 1720 cmn_err(CE_WARN, "!BUFFOFF_0 %d", bufoff); 1721 #endif 1722 1723 if ((vaddr == NULL) || (vlen == 0)) 1724 break; 1725 1726 if (vlen <= 0) { 1727 vecp++; 1728 vaddr = vecp->sv_addr; 1729 vlen = vecp->sv_len; 1730 if (vaddr == NULL) 1731 break; 1732 } 1733 1734 /* get the iohdr information */ 1735 1736 hdr = kmem_zalloc(sizeof (*hdr), KM_NOSLEEP); 1737 if (hdr == NULL) { 1738 cmn_err(CE_WARN, 1739 "!SNDR: unable to alocate net queue header"); 1740 goto fail; 1741 } 1742 1743 ioaddr = (uchar_t *)hdr; 1744 1745 bcopy(vaddr, ioaddr, sizeof (*hdr)); 1746 1747 if (!rdc_iohdr_ok(hdr)) { 1748 cmn_err(CE_WARN, 1749 "!unable to retrieve i/o data from queue %s " 1750 "at offset %" NSC_SZFMT " bp: %" NSC_SZFMT " bl: %" 1751 NSC_SZFMT, urdc->disk_queue, 1752 bufoff, buf->sb_pos, buf->sb_len); 1753 #ifdef DEBUG_DISKQ 1754 cmn_err(CE_WARN, "!FAILING QUEUE state: %x", 1755 rdc_get_vflags(urdc)); 1756 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(dq)); 1757 cmn_err(CE_WARN, "!VADDR %p, IOADDR %p", vaddr, ioaddr); 1758 cmn_err(CE_WARN, "!BUF %p", buf); 1759 #endif 1760 cmn_err(CE_WARN, "!qinfo: " QDISPLAYND(dq)); 1761 1762 goto fail; 1763 } 1764 1765 nullbuf = hdr->dat.flag & RDC_NULL_BUF; 1766 1767 bufoff += FBA_NUM(sizeof (*hdr)); 1768 1769 /* out of buffer, set nxtio to re read this last hdr */ 1770 if (!nullbuf && ((bufoff + hdr->dat.len) > endobuf)) { 1771 break; 1772 } 1773 1774 bufcnt += FBA_NUM(sizeof (*hdr)); 1775 1776 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP); 1777 if (aio == NULL) { 1778 bufcnt -= FBA_NUM(sizeof (*hdr)); 1779 cmn_err(CE_WARN, "!SNDR: net queue aio alloc failed"); 1780 goto fail; 1781 } 1782 1783 if (!nullbuf) { 1784 /* move to next iohdr in big buf */ 1785 bufoff += hdr->dat.len; 1786 bufcnt += hdr->dat.len; 1787 } 1788 1789 rdc_fill_aio(grp, aio, hdr, buf); 1790 1791 if (aio->index < 0) { 1792 cmn_err(CE_WARN, "!Set id %d not found or no longer " 1793 "enabled, failing disk queue", hdr->dat.setid); 1794 kmem_free(aio, sizeof (*aio)); 1795 goto fail; 1796 } 1797 if (aio->seq == RDC_NOSEQ) { 1798 kmem_free(aio, sizeof (*aio)); 1799 fail = 0; 1800 goto fail; 1801 } 1802 if (aio->handle == NULL) 1803 nullblocks += aio->len; 1804 1805 rdc_add_iohdr(hdr, grp); 1806 hdr = NULL; /* don't accidentally free on break or fail */ 1807 rdc_netqueue_insert(aio, netq); 1808 1809 /* no more buffer, skip the below logic */ 1810 if ((bufoff + FBA_NUM(sizeof (*hdr))) >= endobuf) { 1811 break; 1812 } 1813 1814 fpos = bufoff - buf->sb_pos; 1815 vecp = buf->sb_vec; 1816 for (; fpos >= FBA_NUM(vecp->sv_len); vecp++) 1817 fpos -= FBA_NUM(vecp->sv_len); 1818 vlen = vecp->sv_len - FBA_SIZE(fpos); 1819 vaddr = vecp->sv_addr + FBA_SIZE(fpos); 1820 /* abuf = NULL; */ 1821 1822 } 1823 1824 /* free extraneous header */ 1825 if (hdr) { 1826 kmem_free(hdr, sizeof (*hdr)); 1827 hdr = NULL; 1828 } 1829 1830 /* 1831 * probably won't happen, but if we didn't goto fail, but 1832 * we don't contain anything meaningful.. return NULL 1833 * and let the flusher or the sleep/wakeup routines 1834 * decide 1835 */ 1836 if (netq && netq->nitems == 0) { 1837 kmem_free(netq, sizeof (*netq)); 1838 return (NULL); 1839 } 1840 1841 #ifdef DEBUG 1842 buf2qcalls++; 1843 calc_perbuf(netq->nitems); 1844 #endif 1845 if (IS_STATE(urdc, RDC_LOGGING) || 1846 nq->qfflags & RDC_QFILLSLEEP) { 1847 fail = 0; 1848 goto fail; 1849 } 1850 1851 mutex_enter(QLOCK(dq)); 1852 INC_QNXTIO(dq, bufcnt); 1853 mutex_exit(QLOCK(dq)); 1854 1855 netq->net_qtail->orig_len = nullblocks; /* overload */ 1856 1857 return (netq); 1858 1859 fail: 1860 1861 if (hdr) { 1862 kmem_free(hdr, sizeof (*hdr)); 1863 } 1864 1865 if (netq) { 1866 if (netq->nitems > 0) { 1867 /* the never can happen case ... */ 1868 if ((netq->nitems == 1) && 1869 (netq->net_qhead->handle == NULL)) 1870 (void) nsc_free_buf(buf); 1871 *abuf = NULL; 1872 1873 } 1874 rdc_discard_tmpq(netq); 1875 } 1876 1877 mutex_enter(QLOCK(dq)); 1878 rdc_dump_iohdrs(dq); 1879 mutex_exit(QLOCK(dq)); 1880 1881 if (fail) { /* real failure, not just state change */ 1882 #ifdef DEBUG 1883 cmn_err(CE_WARN, "!rdc_diskq_buf2queue: failing disk queue %s", 1884 urdc->disk_queue); 1885 #endif 1886 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG); 1887 } 1888 1889 return (NULL); 1890 1891 } 1892 1893 /* 1894 * rdc_diskq_unqueue 1895 * remove one chunk from the diskq belonging to 1896 * rdc_k_info[index] 1897 * updates the head and tail pointers in the disk header 1898 * but does not write. The header should be written on ack 1899 * flusher should free whatever.. 1900 */ 1901 rdc_aio_t * 1902 rdc_diskq_unqueue(int index) 1903 { 1904 int rc, rc1, rc2; 1905 nsc_off_t qhead; 1906 int nullhandle = 0; 1907 io_hdr *iohdr; 1908 rdc_aio_t *aio = NULL; 1909 nsc_buf_t *buf = NULL; 1910 nsc_buf_t *abuf = NULL; 1911 rdc_group_t *group = NULL; 1912 disk_queue *q = NULL; 1913 rdc_k_info_t *krdc = &rdc_k_info[index]; 1914 rdc_u_info_t *urdc = &rdc_u_info[index]; 1915 1916 group = krdc->group; 1917 q = &group->diskq; 1918 1919 if (group->diskqfd == NULL) /* we've been disabled */ 1920 return (NULL); 1921 1922 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP); 1923 if (!aio) { 1924 return (NULL); 1925 } 1926 1927 iohdr = kmem_zalloc(sizeof (*iohdr), KM_NOSLEEP); 1928 if (!iohdr) { 1929 kmem_free(aio, sizeof (*aio)); 1930 return (NULL); 1931 } 1932 1933 mutex_enter(QLOCK(q)); 1934 rdc_set_qbusy(q); /* make sure no one disables the queue */ 1935 mutex_exit(QLOCK(q)); 1936 1937 DTRACE_PROBE(rdc_diskq_unq_rsrv); 1938 1939 if (_rdc_rsrv_diskq(group)) { 1940 cmn_err(CE_WARN, "!rdc_unqueue: %s reserve failed", 1941 urdc->disk_queue); 1942 goto fail; 1943 } 1944 1945 mutex_enter(QHEADLOCK(q)); 1946 mutex_enter(QLOCK(q)); 1947 1948 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || IS_STATE(urdc, RDC_LOGGING)) { 1949 rdc_clr_qbusy(q); 1950 mutex_exit(QLOCK(q)); 1951 mutex_exit(QHEADLOCK(q)); 1952 kmem_free(aio, sizeof (*aio)); 1953 kmem_free(iohdr, sizeof (*iohdr)); 1954 return (NULL); 1955 } 1956 1957 if (QNXTIOSHLDWRAP(q)) { 1958 #ifdef DEBUG_DISKQWRAP 1959 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(q)); 1960 #endif 1961 /*LINTED*/ 1962 WRAPQNXTIO(q); 1963 } 1964 1965 /* read the metainfo at q->nxt_io first */ 1966 if (QNXTIO(q) == QTAIL(q)) { /* empty */ 1967 1968 _rdc_rlse_diskq(group); 1969 if (q->lastio->handle) 1970 (void) nsc_free_buf(q->lastio->handle); 1971 bzero(&(*q->lastio), sizeof (*q->lastio)); 1972 1973 mutex_exit(QHEADLOCK(q)); 1974 rdc_clr_qbusy(q); 1975 mutex_exit(QLOCK(q)); 1976 kmem_free(aio, sizeof (*aio)); 1977 kmem_free(iohdr, sizeof (*iohdr)); 1978 return (NULL); 1979 } 1980 1981 qhead = QNXTIO(q); 1982 1983 /* 1984 * have to drop the lock here, sigh. Cannot block incoming io 1985 * we have to wait until after this read to find out how 1986 * much to increment QNXTIO. Might as well grab the seq then too 1987 */ 1988 1989 while ((qhead == LASTQTAIL(q)) && (IS_QSTATE(q, QTAILBUSY))) { 1990 mutex_exit(QLOCK(q)); 1991 #ifdef DEBUG_DISKQ 1992 cmn_err(CE_NOTE, "!Qtail busy delay lastqtail: %d", qhead); 1993 #endif 1994 delay(5); 1995 mutex_enter(QLOCK(q)); 1996 } 1997 mutex_exit(QLOCK(q)); 1998 1999 DTRACE_PROBE(rdc_diskq_iohdr_read_start); 2000 2001 rc = rdc_ns_io(group->diskqfd, NSC_READ, qhead, 2002 (uchar_t *)iohdr, FBA_SIZE(1)); 2003 2004 DTRACE_PROBE(rdc_diskq_iohdr_read_end); 2005 2006 if (!RDC_SUCCESS(rc) || !rdc_iohdr_ok(iohdr)) { 2007 cmn_err(CE_WARN, "!unable to retrieve i/o data from queue %s" 2008 " at offset %" NSC_SZFMT " rc %d", urdc->disk_queue, 2009 qhead, rc); 2010 #ifdef DEBUG_DISKQ 2011 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q)); 2012 #endif 2013 mutex_exit(QHEADLOCK(q)); 2014 goto fail; 2015 } 2016 2017 /* XXX process buffer here, creating rdc_aio_t's */ 2018 2019 mutex_enter(QLOCK(q)); 2020 /* update the next pointer */ 2021 if (iohdr->dat.flag == RDC_NULL_BUF) { 2022 INC_QNXTIO(q, FBA_LEN(sizeof (io_hdr))); 2023 nullhandle = 1; 2024 } else { 2025 INC_QNXTIO(q, (FBA_LEN(sizeof (io_hdr)) + iohdr->dat.len)); 2026 } 2027 2028 aio->seq = group->seq++; 2029 if (group->seq < aio->seq) 2030 group->seq = RDC_NEWSEQ + 1; 2031 2032 mutex_exit(QLOCK(q)); 2033 mutex_exit(QHEADLOCK(q)); 2034 2035 #ifdef DEBUG_FLUSHER_UBERNOISE 2036 p = &iohdr->dat; 2037 cmn_err(CE_NOTE, "!unqueued iohdr from %d pos: %d len: %d flag: %d " 2038 "iostatus: %d setid: %d time: %d", qhead, p->pos, p->len, 2039 p->flag, p->iostatus, p->setid, p->time); 2040 #endif 2041 2042 if (nullhandle) /* nothing to get from queue */ 2043 goto nullbuf; 2044 2045 /* now that we know how much to get (iohdr.dat.len), get it */ 2046 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_start); 2047 2048 rc = nsc_alloc_buf(group->diskqfd, qhead + 1, iohdr->dat.len, 2049 NSC_NOCACHE | NSC_READ, &buf); 2050 2051 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_end); 2052 2053 /* and get somewhere to keep it for a bit */ 2054 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_start); 2055 2056 rc1 = nsc_alloc_abuf(qhead + 1, iohdr->dat.len, 0, &abuf); 2057 2058 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_end); 2059 2060 if (!RDC_SUCCESS(rc) || !RDC_SUCCESS(rc1)) { /* uh-oh */ 2061 cmn_err(CE_WARN, "!disk queue %s read failure", 2062 urdc->disk_queue); 2063 goto fail; 2064 } 2065 2066 /* move it on over... */ 2067 rc2 = nsc_copy(buf, abuf, qhead + 1, qhead + 1, iohdr->dat.len); 2068 2069 if (!RDC_SUCCESS(rc2)) { 2070 #ifdef DEBUG 2071 cmn_err(CE_WARN, "!nsc_copy failed for diskq unqueue"); 2072 #endif 2073 goto fail; 2074 } 2075 2076 /* let go of the real buf, we've got the abuf */ 2077 (void) nsc_free_buf(buf); 2078 buf = NULL; 2079 2080 aio->handle = abuf; 2081 /* Hack in the original sb_pos */ 2082 aio->handle->sb_pos = iohdr->dat.hpos; 2083 2084 /* skip the RDC_HANDLE_LIMITS check */ 2085 abuf->sb_user |= RDC_DISKQUE; 2086 2087 nullbuf: 2088 if (nullhandle) { 2089 aio->handle = NULL; 2090 } 2091 2092 /* set up the rest of the aio values, seq set above ... */ 2093 aio->pos = iohdr->dat.pos; 2094 aio->qpos = iohdr->dat.qpos; 2095 aio->len = iohdr->dat.len; 2096 aio->flag = iohdr->dat.flag; 2097 aio->index = rdc_setid2idx(iohdr->dat.setid); 2098 if (aio->index < 0) { /* uh-oh */ 2099 #ifdef DEBUG 2100 cmn_err(CE_WARN, "!rdc_diskq_unqueue: index < 0"); 2101 #endif 2102 goto fail; 2103 } 2104 2105 2106 #ifdef DEBUG_FLUSHER_UBERNOISE_STAMP 2107 h = &q->disk_hdr.h; 2108 cmn_err(CE_NOTE, "!stamping diskq header:\n" 2109 "magic: %x\nstate: %d\nhead_offset: %d\n" 2110 "tail_offset: %d\ndisk_size: %d\nnitems: %d\nblocks: %d\n", 2111 h->magic, h->state, h->head_offset, h->tail_offset, 2112 h->disk_size, h->nitems, h->blocks); 2113 #endif 2114 2115 _rdc_rlse_diskq(group); 2116 2117 mutex_enter(QLOCK(q)); 2118 rdc_clr_qbusy(q); 2119 mutex_exit(QLOCK(q)); 2120 2121 DTRACE_PROBE(rdc_diskq_unq_rlse); 2122 2123 iohdr->dat.iostatus = aio->seq; 2124 rdc_add_iohdr(iohdr, group); 2125 2126 #ifdef DEBUG_FLUSHER_UBERNOISE 2127 if (!nullhandle) { 2128 cmn_err(CE_NOTE, "!UNQUEUING, %p" 2129 " contents: %c%c%c%c%c pos: %d len: %d", 2130 (void *)aio->handle, 2131 aio->handle->sb_vec[0].sv_addr[0], 2132 aio->handle->sb_vec[0].sv_addr[1], 2133 aio->handle->sb_vec[0].sv_addr[2], 2134 aio->handle->sb_vec[0].sv_addr[3], 2135 aio->handle->sb_vec[0].sv_addr[4], 2136 aio->handle->sb_pos, aio->handle->sb_len); 2137 } else { 2138 cmn_err(CE_NOTE, "!UNQUEUING, NULL " QDISPLAY(q)); 2139 } 2140 cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q)); 2141 #endif 2142 2143 return (aio); 2144 2145 fail: 2146 if (aio) 2147 kmem_free(aio, sizeof (*aio)); 2148 if (iohdr) 2149 kmem_free(iohdr, sizeof (*iohdr)); 2150 if (buf) 2151 (void) nsc_free_buf(buf); 2152 if (abuf) 2153 (void) nsc_free_buf(abuf); 2154 2155 _rdc_rlse_diskq(group); 2156 #ifdef DEBUG 2157 cmn_err(CE_WARN, "!diskq_unqueue: failing diskq"); 2158 #endif 2159 mutex_enter(QLOCK(q)); 2160 rdc_clr_qbusy(q); 2161 mutex_exit(QLOCK(q)); 2162 2163 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG); 2164 2165 return (NULL); 2166 } 2167 2168 int 2169 rdc_diskq_inuse(rdc_set_t *set, char *diskq) 2170 { 2171 rdc_u_info_t *urdc; 2172 char *group; 2173 int index; 2174 2175 group = set->group_name; 2176 2177 ASSERT(MUTEX_HELD(&rdc_conf_lock)); 2178 2179 if ((rdc_lookup_bitmap(diskq) >= 0) || 2180 (rdc_lookup_configured(diskq) >= 0)) { 2181 return (1); 2182 } 2183 for (index = 0; index < rdc_max_sets; index++) { 2184 urdc = &rdc_u_info[index]; 2185 2186 if (!IS_ENABLED(urdc)) 2187 continue; 2188 2189 /* same diskq different group */ 2190 if ((strcmp(urdc->disk_queue, diskq) == 0) && 2191 (urdc->group_name[0] == '\0' || 2192 strcmp(urdc->group_name, group))) { 2193 return (1); 2194 } 2195 } 2196 /* last, but not least, lets see if someone is getting really funky */ 2197 if ((strcmp(set->disk_queue, set->primary.file) == 0) || 2198 (strcmp(set->disk_queue, set->primary.bitmap) == 0)) { 2199 return (1); 2200 } 2201 2202 return (0); 2203 2204 } 2205 2206 #ifdef DEBUG 2207 int maxlen = 0; 2208 int avelen = 0; 2209 int totalen = 0; 2210 int lencalls = 0; 2211 2212 void 2213 update_lenstats(int len) 2214 { 2215 if (lencalls == 0) { 2216 lencalls = 1; 2217 avelen = 0; 2218 maxlen = 0; 2219 totalen = 0; 2220 } 2221 2222 if (len > maxlen) 2223 maxlen = len; 2224 totalen += len; 2225 avelen = totalen / lencalls; 2226 } 2227 #endif 2228 2229 /* 2230 * rdc_calc_len() 2231 * returns the size of the diskq that can be read for dequeuing 2232 * always <= RDC_MAX_DISKQREAD 2233 */ 2234 int 2235 rdc_calc_len(rdc_k_info_t *krdc, disk_queue *dq) 2236 { 2237 nsc_size_t len = 0; 2238 2239 ASSERT(MUTEX_HELD(QLOCK(dq))); 2240 2241 /* ---H-----N-----T--- */ 2242 if (QNXTIO(dq) < QTAIL(dq)) { 2243 2244 len = min(RDC_MAX_DISKQREAD, QTAIL(dq) - QNXTIO(dq)); 2245 2246 /* ---T-----H-----N--- */ 2247 } else if (QNXTIO(dq) > QTAIL(dq)) { 2248 if (QWRAP(dq)) { 2249 len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq)); 2250 } else { /* should never happen */ 2251 len = min(RDC_MAX_DISKQREAD, QSIZE(dq) - QNXTIO(dq)); 2252 } 2253 } else if (QNXTIO(dq) == QTAIL(dq)) { 2254 if (QWRAP(dq) && !IS_QSTATE(dq, QNXTIOWRAPD)) 2255 len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq)); 2256 } 2257 2258 len = min(len, krdc->maxfbas); 2259 2260 #ifdef DEBUG 2261 lencalls++; 2262 update_lenstats(len); 2263 #endif 2264 2265 return ((int)len); 2266 } 2267 2268 /* 2269 * lie a little if we can, so we don't get tied up in 2270 * _nsc_wait_dbuf() on the next read. sb_len MUST be 2271 * restored before nsc_free_buf() however, or we will 2272 * be looking at memory leak city.. 2273 * so update the entire queue with the info as well 2274 * and the one that ends up freeing it, can fix the len 2275 * IMPORTANT: This assumes that we are not cached, in 2276 * 3.2 caching was turned off for data volumes, if that 2277 * changes, then this must too 2278 */ 2279 void 2280 rdc_trim_buf(nsc_buf_t *buf, net_queue *q) 2281 { 2282 rdc_aio_t *p; 2283 int len; 2284 2285 if (buf == NULL || q == NULL) 2286 return; 2287 2288 if (q && (buf->sb_len > 2289 (q->blocks + q->nitems - q->net_qtail->orig_len))) { 2290 len = buf->sb_len; 2291 buf->sb_len = (q->blocks + q->nitems - q->net_qtail->orig_len); 2292 } 2293 2294 p = q->net_qhead; 2295 do { 2296 p->orig_len = len; 2297 p = p->next; 2298 2299 } while (p); 2300 2301 } 2302 2303 /* 2304 * rdc_read_diskq_buf() 2305 * read a large as possible chunk of the diskq into a nsc_buf_t 2306 * and convert it to a net_queue of rdc_aio_t's to be appended 2307 * to the group's netqueue 2308 */ 2309 net_queue * 2310 rdc_read_diskq_buf(int index) 2311 { 2312 nsc_buf_t *buf = NULL; 2313 net_queue *tmpnq = NULL; 2314 disk_queue *dq = NULL; 2315 rdc_k_info_t *krdc = &rdc_k_info[index]; 2316 rdc_u_info_t *urdc = &rdc_u_info[index]; 2317 rdc_group_t *group = krdc->group; 2318 net_queue *nq = &group->ra_queue; 2319 int len = 0; 2320 int rc; 2321 int fail = 0; 2322 int offset = 0; 2323 2324 if (group == NULL || group->diskqfd == NULL) { 2325 DTRACE_PROBE(rdc_read_diskq_buf_bail1); 2326 return (NULL); 2327 } 2328 2329 dq = &group->diskq; 2330 2331 mutex_enter(QLOCK(dq)); 2332 rdc_set_qbusy(dq); /* prevent disables on the queue */ 2333 mutex_exit(QLOCK(dq)); 2334 2335 if (_rdc_rsrv_diskq(group)) { 2336 cmn_err(CE_WARN, "!rdc_readdiskqbuf: %s reserve failed", 2337 urdc->disk_queue); 2338 mutex_enter(QLOCK(dq)); 2339 rdc_clr_qbusy(dq); /* prevent disables on the queue */ 2340 mutex_exit(QLOCK(dq)); 2341 return (NULL); 2342 } 2343 2344 mutex_enter(QHEADLOCK(dq)); 2345 mutex_enter(QLOCK(dq)); 2346 2347 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 2348 IS_STATE(urdc, RDC_LOGGING) || 2349 (nq->qfflags & RDC_QFILLSLEEP)) { 2350 mutex_exit(QLOCK(dq)); 2351 mutex_exit(QHEADLOCK(dq)); 2352 DTRACE_PROBE(rdc_read_diskq_buf_bail2); 2353 goto done; 2354 } 2355 2356 /* 2357 * real corner case here, we need to let the flusher wrap first. 2358 * we've gotten too far ahead, so just delay and try again 2359 */ 2360 if (IS_QSTATE(dq, QNXTIOWRAPD) && AUXQWRAP(dq)) { 2361 mutex_exit(QLOCK(dq)); 2362 mutex_exit(QHEADLOCK(dq)); 2363 goto done; 2364 } 2365 2366 if (QNXTIOSHLDWRAP(dq)) { 2367 #ifdef DEBUG_DISKQWRAP 2368 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(dq)); 2369 #endif 2370 /*LINTED*/ 2371 WRAPQNXTIO(dq); 2372 } 2373 2374 /* read the metainfo at q->nxt_io first */ 2375 if (!QNITEMS(dq)) { /* empty */ 2376 2377 if (dq->lastio->handle) 2378 (void) nsc_free_buf(dq->lastio->handle); 2379 bzero(&(*dq->lastio), sizeof (*dq->lastio)); 2380 mutex_exit(QLOCK(dq)); 2381 mutex_exit(QHEADLOCK(dq)); 2382 DTRACE_PROBE(rdc_read_diskq_buf_bail3); 2383 goto done; 2384 } 2385 2386 2387 len = rdc_calc_len(krdc, dq); 2388 2389 if ((len <= 0) || (IS_STATE(urdc, RDC_LOGGING)) || 2390 (IS_STATE(urdc, RDC_DISKQ_FAILED)) || 2391 (nq->qfflags & RDC_QFILLSLEEP)) { 2392 mutex_exit(QLOCK(dq)); 2393 mutex_exit(QHEADLOCK(dq)); 2394 /* 2395 * a write could be trying to get on the queue, or if 2396 * the queue is really really small, a complete image 2397 * of it could be on the net queue waiting for flush. 2398 * the latter being a fairly stupid scenario and a gross 2399 * misconfiguration.. but what the heck, why make the thread 2400 * thrash around.. just pause a little here. 2401 */ 2402 if (len <= 0) 2403 delay(50); 2404 2405 DTRACE_PROBE3(rdc_read_diskq_buf_bail4, int, len, 2406 int, rdc_get_vflags(urdc), int, nq->qfflags); 2407 2408 goto done; 2409 } 2410 2411 DTRACE_PROBE2(rdc_calc_len, int, len, int, (int)QNXTIO(dq)); 2412 2413 #ifdef DEBUG_FLUSHER_UBERNOISE 2414 cmn_err(CE_WARN, "!CALC_LEN(%d) h:%d n%d t%d, w%d", 2415 len, QHEAD(dq), QNXTIO(dq), QTAIL(dq), QWRAP(dq)); 2416 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(dq)); 2417 #endif 2418 SET_QCOALBOUNDS(dq, QNXTIO(dq) + len); 2419 2420 while ((LASTQTAIL(dq) > 0) && !QWRAP(dq) && 2421 ((QNXTIO(dq) + len) >= LASTQTAIL(dq)) && 2422 (IS_QSTATE(dq, QTAILBUSY))) { 2423 mutex_exit(QLOCK(dq)); 2424 2425 #ifdef DEBUG_FLUSHER_UBERNOISE 2426 cmn_err(CE_NOTE, "!Qtail busy delay nxtio %d len %d " 2427 "lastqtail: %d", QNXTIO(dq), len, LASTQTAIL(dq)); 2428 #endif 2429 delay(20); 2430 mutex_enter(QLOCK(dq)); 2431 } 2432 2433 offset = QNXTIO(dq); 2434 2435 /* 2436 * one last check to see if we have gone logging, or should. 2437 * we may have released the mutex above, so check again 2438 */ 2439 if ((IS_STATE(urdc, RDC_LOGGING)) || 2440 (IS_STATE(urdc, RDC_DISKQ_FAILED)) || 2441 (nq->qfflags & RDC_QFILLSLEEP)) { 2442 mutex_exit(QLOCK(dq)); 2443 mutex_exit(QHEADLOCK(dq)); 2444 goto done; 2445 } 2446 2447 mutex_exit(QLOCK(dq)); 2448 mutex_exit(QHEADLOCK(dq)); 2449 2450 DTRACE_PROBE2(rdc_buf2q_preread, int, offset, int, len); 2451 2452 rc = nsc_alloc_buf(group->diskqfd, offset, len, 2453 NSC_NOCACHE | NSC_READ, &buf); 2454 2455 if (!RDC_SUCCESS(rc)) { 2456 cmn_err(CE_WARN, "!disk queue %s read failure pos %" NSC_SZFMT 2457 " len %d", urdc->disk_queue, QNXTIO(dq), len); 2458 fail++; 2459 buf = NULL; 2460 DTRACE_PROBE(rdc_read_diskq_buf_bail5); 2461 goto done; 2462 } 2463 2464 DTRACE_PROBE2(rdc_buf2q_postread, int, offset, nsc_size_t, buf->sb_len); 2465 2466 /* 2467 * convert buf to a net_queue. buf2queue will 2468 * update the QNXTIO pointer for us, based on 2469 * the last readable queue item 2470 */ 2471 tmpnq = rdc_diskq_buf2queue(group, &buf, index); 2472 2473 #ifdef DEBUG_FLUSHER_UBERNOISE 2474 cmn_err(CE_NOTE, "!QBUF p: %d l: %d p+l: %d users: %d qblocks: %d ", 2475 "qitems: %d WASTED: %d", buf->sb_pos, buf->sb_len, 2476 buf->sb_pos+buf->sb_len, buf->sb_user, tmpnq?tmpnq->blocks:-1, 2477 tmpnq?tmpnq->nitems:-1, 2478 tmpnq?((buf->sb_len-tmpnq->nitems) - tmpnq->blocks):-1); 2479 #endif 2480 2481 DTRACE_PROBE3(rdc_buf2que_returned, net_queue *, tmpnq?tmpnq:0, 2482 uint64_t, tmpnq?tmpnq->nitems:0, 2483 uint_t, tmpnq?tmpnq->net_qhead->seq:0); 2484 done: 2485 2486 /* we don't need to retain the buf */ 2487 if (tmpnq == NULL) 2488 if (buf) { 2489 (void) nsc_free_buf(buf); 2490 buf = NULL; 2491 } 2492 2493 rdc_trim_buf(buf, tmpnq); 2494 2495 mutex_enter(QLOCK(dq)); 2496 rdc_clr_qbusy(dq); 2497 mutex_exit(QLOCK(dq)); 2498 2499 _rdc_rlse_diskq(group); 2500 2501 if (fail) { 2502 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG); 2503 tmpnq = NULL; 2504 } 2505 2506 return (tmpnq); 2507 } 2508 2509 /* 2510 * rdc_dequeue() 2511 * removes the head of the memory queue 2512 */ 2513 rdc_aio_t * 2514 rdc_dequeue(rdc_k_info_t *krdc, int *rc) 2515 { 2516 net_queue *q = &krdc->group->ra_queue; 2517 disk_queue *dq = &krdc->group->diskq; 2518 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 2519 rdc_aio_t *aio; 2520 2521 *rc = 0; 2522 2523 if (q == NULL) 2524 return (NULL); 2525 2526 mutex_enter(&q->net_qlock); 2527 2528 aio = q->net_qhead; 2529 2530 if (aio == NULL) { 2531 #ifdef DEBUG 2532 if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) { 2533 cmn_err(CE_PANIC, 2534 "rdc_dequeue(1): q %p, q blocks %" NSC_SZFMT 2535 " , nitems %" NSC_SZFMT ", qhead %p qtail %p", 2536 (void *) q, q->blocks, q->nitems, 2537 (void *) aio, (void *) q->net_qtail); 2538 } 2539 #endif 2540 2541 mutex_exit(&q->net_qlock); 2542 2543 if ((!IS_STATE(urdc, RDC_LOGGING)) && 2544 (!(q->qfflags & RDC_QFILLSLEEP)) && 2545 (!IS_STATE(urdc, RDC_SYNCING)) && (QNITEMS(dq) > 0)) { 2546 *rc = EAGAIN; 2547 } 2548 2549 goto done; 2550 } 2551 2552 /* aio remove from q */ 2553 2554 q->net_qhead = aio->next; 2555 aio->next = NULL; 2556 2557 if (q->net_qtail == aio) 2558 q->net_qtail = q->net_qhead; 2559 2560 q->blocks -= aio->len; 2561 q->nitems--; 2562 2563 #ifdef DEBUG 2564 if (q->net_qhead == NULL) { 2565 if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) { 2566 cmn_err(CE_PANIC, "rdc_dequeue(2): q %p, q blocks %" 2567 NSC_SZFMT " nitems %" NSC_SZFMT 2568 " , qhead %p qtail %p", 2569 (void *) q, q->blocks, q->nitems, 2570 (void *) q->net_qhead, (void *) q->net_qtail); 2571 } 2572 } 2573 #endif 2574 mutex_exit(&q->net_qlock); 2575 done: 2576 2577 mutex_enter(&q->net_qlock); 2578 2579 if (rdc_qfill_shldwakeup(krdc)) 2580 cv_broadcast(&q->qfcv); 2581 2582 /* 2583 * clear EAGAIN if 2584 * logging or q filler thread is sleeping or stopping altogether 2585 * or if q filler thread is dead already 2586 * or if syncing, this will return a null aio, with no error code set 2587 * telling the flusher to die 2588 */ 2589 if (*rc == EAGAIN) { 2590 if (IS_STATE(urdc, RDC_LOGGING) || 2591 (q->qfflags & (RDC_QFILLSLEEP | RDC_QFILLSTOP)) || 2592 (IS_QSTATE(dq, (RDC_QDISABLEPEND | RDC_STOPPINGFLUSH))) || 2593 (q->qfill_sleeping == RDC_QFILL_DEAD) || 2594 (IS_STATE(urdc, RDC_SYNCING))) 2595 *rc = 0; 2596 } 2597 2598 mutex_exit(&q->net_qlock); 2599 2600 return (aio); 2601 2602 } 2603 2604 /* 2605 * rdc_qfill_shldsleep() 2606 * returns 1 if the qfilling code should cv_wait() 0 if not. 2607 * reasons for going into cv_wait(); 2608 * there is nothing in the diskq to flush to mem. 2609 * the memory queue has gotten too big and needs more flushing attn. 2610 */ 2611 int 2612 rdc_qfill_shldsleep(rdc_k_info_t *krdc) 2613 { 2614 net_queue *nq = &krdc->group->ra_queue; 2615 disk_queue *dq = &krdc->group->diskq; 2616 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 2617 2618 ASSERT(MUTEX_HELD(&nq->net_qlock)); 2619 2620 if (!RDC_IS_DISKQ(krdc->group)) 2621 return (1); 2622 2623 if (nq->qfflags & RDC_QFILLSLEEP) { 2624 #ifdef DEBUG_DISKQ_NOISY 2625 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QFILLSLEEP idx: %d", 2626 krdc->index); 2627 #endif 2628 return (1); 2629 } 2630 2631 if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) { 2632 #ifdef DEBUG_DISKQ_NOISY 2633 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: Sync|Log (0x%x)" 2634 " idx: %d", rdc_get_vflags(urdc), urdc->index); 2635 #endif 2636 return (1); 2637 } 2638 2639 mutex_enter(QLOCK(dq)); 2640 if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) { 2641 #ifdef DEBUG_DISKQ_NOISY 2642 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QEMPTY"); 2643 #endif 2644 mutex_exit(QLOCK(dq)); 2645 return (1); 2646 } 2647 mutex_exit(QLOCK(dq)); 2648 2649 if (nq->blocks >= RDC_MAX_QBLOCKS) { 2650 nq->hwmhit = 1; 2651 /* stuck flushers ? */ 2652 #ifdef DEBUG_DISKQ_NOISY 2653 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: memq full:" 2654 " seq: %d seqack %d", krdc->group->seq, 2655 krdc->group->seqack); 2656 #endif 2657 return (1); 2658 } 2659 2660 return (0); 2661 } 2662 2663 /* 2664 * rdc_join_netqueues(a, b) 2665 * appends queue b to queue a updating all the queue info 2666 * as it is assumed queue a is the important one, 2667 * it's mutex must be held. no one can add to queue b 2668 */ 2669 void 2670 rdc_join_netqueues(net_queue *q, net_queue *tmpq) 2671 { 2672 ASSERT(MUTEX_HELD(&q->net_qlock)); 2673 2674 if (q->net_qhead == NULL) { /* empty */ 2675 #ifdef DEBUG 2676 if (q->blocks != 0 || q->nitems != 0) { 2677 cmn_err(CE_PANIC, "rdc filler: q %p, qhead 0, " 2678 " q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT, 2679 (void *) q, q->blocks, q->nitems); 2680 } 2681 #endif 2682 q->net_qhead = tmpq->net_qhead; 2683 q->net_qtail = tmpq->net_qtail; 2684 q->nitems = tmpq->nitems; 2685 q->blocks = tmpq->blocks; 2686 } else { 2687 q->net_qtail->next = tmpq->net_qhead; 2688 q->net_qtail = tmpq->net_qtail; 2689 q->nitems += tmpq->nitems; 2690 q->blocks += tmpq->blocks; 2691 } 2692 2693 if (q->nitems > q->nitems_hwm) { 2694 q->nitems_hwm = q->nitems; 2695 } 2696 2697 if (q->blocks > q->blocks_hwm) { 2698 q->blocks_hwm = q->blocks; 2699 } 2700 } 2701 2702 /* 2703 * rdc_qfiller_thr() single thread that moves 2704 * data from the diskq to a memory queue for 2705 * the flusher to pick up. 2706 */ 2707 void 2708 rdc_qfiller_thr(rdc_k_info_t *krdc) 2709 { 2710 rdc_group_t *grp = krdc->group; 2711 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 2712 net_queue *q = &grp->ra_queue; 2713 net_queue *tmpq = NULL; 2714 int index = krdc->index; 2715 2716 q->qfill_sleeping = RDC_QFILL_AWAKE; 2717 while (!(q->qfflags & RDC_QFILLSTOP)) { 2718 if (!RDC_IS_DISKQ(grp) || 2719 IS_STATE(urdc, RDC_LOGGING) || 2720 IS_STATE(urdc, RDC_DISKQ_FAILED) || 2721 (q->qfflags & RDC_QFILLSLEEP)) { 2722 goto nulltmpq; 2723 } 2724 2725 DTRACE_PROBE(qfiller_top); 2726 tmpq = rdc_read_diskq_buf(index); 2727 2728 if (tmpq == NULL) 2729 goto nulltmpq; 2730 2731 if ((q->qfflags & RDC_QFILLSLEEP) || 2732 IS_STATE(urdc, RDC_LOGGING)) { 2733 rdc_discard_tmpq(tmpq); 2734 goto nulltmpq; 2735 } 2736 2737 mutex_enter(&q->net_qlock); 2738 2739 /* race with log, redundant yet paranoid */ 2740 if ((q->qfflags & RDC_QFILLSLEEP) || 2741 IS_STATE(urdc, RDC_LOGGING)) { 2742 rdc_discard_tmpq(tmpq); 2743 mutex_exit(&q->net_qlock); 2744 goto nulltmpq; 2745 } 2746 2747 2748 rdc_join_netqueues(q, tmpq); 2749 kmem_free(tmpq, sizeof (*tmpq)); 2750 tmpq = NULL; 2751 2752 mutex_exit(&q->net_qlock); 2753 nulltmpq: 2754 /* 2755 * sleep for a while if we can. 2756 * the enqueuing or flushing code will 2757 * wake us if if necessary. 2758 */ 2759 mutex_enter(&q->net_qlock); 2760 while (rdc_qfill_shldsleep(krdc)) { 2761 q->qfill_sleeping = RDC_QFILL_ASLEEP; 2762 DTRACE_PROBE(qfiller_sleep); 2763 cv_wait(&q->qfcv, &q->net_qlock); 2764 DTRACE_PROBE(qfiller_wakeup); 2765 q->qfill_sleeping = RDC_QFILL_AWAKE; 2766 if (q->qfflags & RDC_QFILLSTOP) { 2767 #ifdef DEBUG_DISKQ 2768 cmn_err(CE_NOTE, 2769 "!rdc_qfiller_thr: recieved kill signal"); 2770 #endif 2771 mutex_exit(&q->net_qlock); 2772 goto done; 2773 } 2774 } 2775 mutex_exit(&q->net_qlock); 2776 2777 DTRACE_PROBE(qfiller_bottom); 2778 } 2779 done: 2780 DTRACE_PROBE(qfiller_done); 2781 q->qfill_sleeping = RDC_QFILL_DEAD; /* the big sleep */ 2782 2783 #ifdef DEBUG 2784 cmn_err(CE_NOTE, "!rdc_qfiller_thr stopping"); 2785 #endif 2786 q->qfflags &= ~RDC_QFILLSTOP; 2787 2788 } 2789 2790 int 2791 _rdc_add_diskq(int index, char *diskq) 2792 { 2793 rdc_k_info_t *krdc, *kp; 2794 rdc_u_info_t *urdc, *up; 2795 rdc_group_t *group; 2796 int rc; 2797 2798 krdc = &rdc_k_info[index]; 2799 urdc = &rdc_u_info[index]; 2800 group = krdc->group; 2801 2802 if (!diskq || urdc->disk_queue[0]) { /* how'd that happen? */ 2803 #ifdef DEBUG 2804 cmn_err(CE_WARN, "!NULL diskq in _rdc_add_diskq"); 2805 #endif 2806 rc = -1; 2807 goto fail; 2808 } 2809 2810 /* if the enable fails, this is bzero'ed */ 2811 (void) strncpy(urdc->disk_queue, diskq, NSC_MAXPATH); 2812 group->flags &= ~RDC_MEMQUE; 2813 group->flags |= RDC_DISKQUE; 2814 2815 #ifdef DEBUG 2816 cmn_err(CE_NOTE, "!adding diskq to group %s", urdc->group_name); 2817 #endif 2818 mutex_enter(&rdc_conf_lock); 2819 rc = rdc_enable_diskq(krdc); 2820 mutex_exit(&rdc_conf_lock); 2821 2822 if (rc == RDC_EQNOADD) { 2823 goto fail; 2824 } 2825 2826 RDC_ZERO_BITREF(krdc); 2827 for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) { 2828 up = &rdc_u_info[kp->index]; 2829 (void) strncpy(up->disk_queue, diskq, NSC_MAXPATH); 2830 /* size lives in the diskq structure, already set by enable */ 2831 RDC_ZERO_BITREF(kp); 2832 } 2833 2834 fail: 2835 return (rc); 2836 2837 } 2838 2839 /* 2840 * add a diskq to an existing set/group 2841 */ 2842 int 2843 rdc_add_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus) 2844 { 2845 char *diskq; 2846 int rc; 2847 int index; 2848 rdc_k_info_t *krdc, *this; 2849 rdc_u_info_t *urdc; 2850 rdc_group_t *group; 2851 nsc_size_t vol_size = 0; 2852 nsc_size_t req_size = 0; 2853 2854 mutex_enter(&rdc_conf_lock); 2855 index = rdc_lookup_byname(uparms->rdc_set); 2856 mutex_exit(&rdc_conf_lock); 2857 if (index < 0) { 2858 spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file, 2859 uparms->rdc_set->secondary.file); 2860 rc = RDC_EALREADY; 2861 goto failed; 2862 } 2863 urdc = &rdc_u_info[index]; 2864 krdc = &rdc_k_info[index]; 2865 this = &rdc_k_info[index]; 2866 group = krdc->group; 2867 diskq = uparms->rdc_set->disk_queue; 2868 2869 if (!IS_ASYNC(urdc)) { 2870 spcs_s_add(kstatus, RDC_EQWRONGMODE, urdc->primary.intf, 2871 urdc->primary.file, urdc->secondary.intf, 2872 urdc->secondary.file); 2873 rc = RDC_EQNOQUEUE; 2874 goto failed; 2875 } 2876 2877 do { 2878 if (!IS_STATE(urdc, RDC_LOGGING)) { 2879 spcs_s_add(kstatus, RDC_EQNOTLOGGING, 2880 uparms->rdc_set->disk_queue); 2881 rc = RDC_EQNOTLOGGING; 2882 goto failed; 2883 } 2884 /* make sure that we have enough bitmap vol */ 2885 req_size = RDC_BITMAP_FBA + FBA_LEN(krdc->bitmap_size); 2886 req_size += FBA_LEN(krdc->bitmap_size * BITS_IN_BYTE); 2887 2888 rc = _rdc_rsrv_devs(krdc, RDC_BMP, RDC_INTERNAL); 2889 2890 if (!RDC_SUCCESS(rc)) { 2891 cmn_err(CE_WARN, 2892 "!rdc_open_diskq: Bitmap reserve failed"); 2893 spcs_s_add(kstatus, RDC_EBITMAP, 2894 urdc->primary.bitmap); 2895 rc = RDC_EBITMAP; 2896 goto failed; 2897 } 2898 2899 (void) nsc_partsize(krdc->bitmapfd, &vol_size); 2900 2901 _rdc_rlse_devs(krdc, RDC_BMP); 2902 2903 if (vol_size < req_size) { 2904 spcs_s_add(kstatus, RDC_EBITMAP2SMALL, 2905 urdc->primary.bitmap); 2906 rc = RDC_EBITMAP2SMALL; 2907 goto failed; 2908 } 2909 2910 krdc = krdc->group_next; 2911 urdc = &rdc_u_info[krdc->index]; 2912 2913 } while (krdc != this); 2914 2915 if (urdc->disk_queue[0] != '\0') { 2916 spcs_s_add(kstatus, RDC_EQALREADY, urdc->primary.intf, 2917 urdc->primary.file, urdc->secondary.intf, 2918 urdc->secondary.file); 2919 rc = RDC_EQALREADY; 2920 goto failed; 2921 } 2922 2923 if (uparms->options & RDC_OPT_SECONDARY) { /* how'd we get here? */ 2924 spcs_s_add(kstatus, RDC_EQWRONGMODE); 2925 rc = RDC_EQWRONGMODE; 2926 goto failed; 2927 } 2928 2929 mutex_enter(&rdc_conf_lock); 2930 if (rdc_diskq_inuse(uparms->rdc_set, uparms->rdc_set->disk_queue)) { 2931 spcs_s_add(kstatus, RDC_EDISKQINUSE, 2932 uparms->rdc_set->disk_queue); 2933 rc = RDC_EDISKQINUSE; 2934 mutex_exit(&rdc_conf_lock); 2935 goto failed; 2936 } 2937 mutex_exit(&rdc_conf_lock); 2938 2939 rdc_group_enter(krdc); 2940 rc = _rdc_add_diskq(urdc->index, diskq); 2941 if (rc < 0 || rc == RDC_EQNOADD) { 2942 group->flags &= ~RDC_DISKQUE; 2943 group->flags |= RDC_MEMQUE; 2944 spcs_s_add(kstatus, RDC_EQNOADD, uparms->rdc_set->disk_queue); 2945 rc = RDC_EQNOADD; 2946 } 2947 rdc_group_exit(krdc); 2948 failed: 2949 return (rc); 2950 } 2951 2952 int 2953 _rdc_init_diskq(rdc_k_info_t *krdc) 2954 { 2955 rdc_group_t *group = krdc->group; 2956 disk_queue *q = &group->diskq; 2957 2958 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 2959 SET_QNXTIO(q, QHEAD(q)); 2960 2961 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) 2962 goto fail; 2963 2964 return (0); 2965 fail: 2966 return (-1); 2967 } 2968 2969 /* 2970 * inititalize the disk queue. This is a destructive 2971 * operation that will not check for emptiness of the queue. 2972 */ 2973 int 2974 rdc_init_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus) 2975 { 2976 int rc = 0; 2977 int index; 2978 rdc_k_info_t *krdc, *kp; 2979 rdc_u_info_t *urdc, *up; 2980 rdc_set_t *uset; 2981 rdc_group_t *group; 2982 disk_queue *qp; 2983 2984 uset = uparms->rdc_set; 2985 2986 mutex_enter(&rdc_conf_lock); 2987 index = rdc_lookup_byname(uset); 2988 mutex_exit(&rdc_conf_lock); 2989 if (index < 0) { 2990 spcs_s_add(kstatus, RDC_EALREADY, uset->primary.file, 2991 uset->secondary.file); 2992 rc = RDC_EALREADY; 2993 goto fail; 2994 } 2995 2996 krdc = &rdc_k_info[index]; 2997 urdc = &rdc_u_info[index]; 2998 group = krdc->group; 2999 qp = &group->diskq; 3000 3001 if (!IS_STATE(urdc, RDC_SYNCING) && !IS_STATE(urdc, RDC_LOGGING)) { 3002 spcs_s_add(kstatus, RDC_EQUEISREP, urdc->disk_queue); 3003 rc = RDC_EQUEISREP; 3004 goto fail; 3005 } 3006 3007 /* 3008 * a couple of big "ifs" here. in the first implementation 3009 * neither of these will be possible. This will come into 3010 * play when we persist the queue across reboots 3011 */ 3012 if (!(uparms->options & RDC_OPT_FORCE_QINIT)) { 3013 if (!QEMPTY(qp)) { 3014 if (group->rdc_writer) { 3015 spcs_s_add(kstatus, RDC_EQFLUSHING, 3016 urdc->disk_queue); 3017 rc = RDC_EQFLUSHING; 3018 } else { 3019 spcs_s_add(kstatus, RDC_EQNOTEMPTY, 3020 urdc->disk_queue); 3021 rc = RDC_EQNOTEMPTY; 3022 } 3023 goto fail; 3024 } 3025 } 3026 3027 mutex_enter(QLOCK(qp)); 3028 if (_rdc_init_diskq(krdc) < 0) { 3029 mutex_exit(QLOCK(qp)); 3030 goto fail; 3031 } 3032 rdc_dump_iohdrs(qp); 3033 3034 rdc_group_enter(krdc); 3035 3036 rdc_clr_flags(urdc, RDC_QUEUING); 3037 for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) { 3038 up = &rdc_u_info[kp->index]; 3039 rdc_clr_flags(up, RDC_QUEUING); 3040 } 3041 rdc_group_exit(krdc); 3042 3043 mutex_exit(QLOCK(qp)); 3044 3045 return (0); 3046 fail: 3047 /* generic queue failure */ 3048 if (!rc) { 3049 spcs_s_add(kstatus, RDC_EQINITFAIL, urdc->disk_queue); 3050 rc = RDC_EQINITFAIL; 3051 } 3052 3053 return (rc); 3054 } 3055 3056 int 3057 _rdc_kill_diskq(rdc_u_info_t *urdc) 3058 { 3059 rdc_k_info_t *krdc = &rdc_k_info[urdc->index]; 3060 rdc_group_t *group = krdc->group; 3061 disk_queue *q = &group->diskq; 3062 rdc_u_info_t *up; 3063 rdc_k_info_t *p; 3064 3065 group->flags |= RDC_DISKQ_KILL; 3066 #ifdef DEBUG 3067 cmn_err(CE_NOTE, "!disabling disk queue %s", urdc->disk_queue); 3068 #endif 3069 3070 mutex_enter(QLOCK(q)); 3071 rdc_init_diskq_header(group, &q->disk_hdr); 3072 rdc_dump_iohdrs(q); 3073 3074 /* 3075 * nsc_close the queue and zero out the queue name 3076 */ 3077 rdc_wait_qbusy(q); 3078 rdc_close_diskq(group); 3079 mutex_exit(QLOCK(q)); 3080 SET_QSIZE(q, 0); 3081 rdc_clr_flags(urdc, RDC_DISKQ_FAILED); 3082 bzero(urdc->disk_queue, NSC_MAXPATH); 3083 for (p = krdc->group_next; p != krdc; p = p->group_next) { 3084 up = &rdc_u_info[p->index]; 3085 rdc_clr_flags(up, RDC_DISKQ_FAILED); 3086 bzero(up->disk_queue, NSC_MAXPATH); 3087 } 3088 3089 #ifdef DEBUG 3090 cmn_err(CE_NOTE, "!_rdc_kill_diskq: enabling memory queue"); 3091 #endif 3092 group->flags &= ~(RDC_DISKQUE|RDC_DISKQ_KILL); 3093 group->flags |= RDC_MEMQUE; 3094 return (0); 3095 } 3096 3097 /* 3098 * remove this diskq regardless of whether it is draining or not 3099 * stops the flusher by invalidating the qdata (ie, instant empty) 3100 * remove the disk qeueue from the group, leaving the group with a memory 3101 * queue. 3102 */ 3103 int 3104 rdc_kill_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus) 3105 { 3106 int rc; 3107 int index; 3108 rdc_u_info_t *urdc; 3109 rdc_k_info_t *krdc; 3110 rdc_set_t *rdc_set = uparms->rdc_set; 3111 3112 mutex_enter(&rdc_conf_lock); 3113 index = rdc_lookup_byname(uparms->rdc_set); 3114 mutex_exit(&rdc_conf_lock); 3115 3116 if (index < 0) { 3117 spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file, 3118 rdc_set->secondary.file); 3119 rc = RDC_EALREADY; 3120 goto failed; 3121 } 3122 3123 urdc = &rdc_u_info[index]; 3124 krdc = &rdc_k_info[index]; 3125 3126 if (!RDC_IS_DISKQ(krdc->group)) { 3127 spcs_s_add(kstatus, RDC_EQNOQUEUE, rdc_set->primary.intf, 3128 rdc_set->primary.file, rdc_set->secondary.intf, 3129 rdc_set->secondary.file); 3130 rc = RDC_EQNOQUEUE; 3131 goto failed; 3132 } 3133 3134 /* 3135 * if (!IS_STATE(urdc, RDC_LOGGING)) { 3136 * spcs_s_add(kstatus, RDC_EQNOTLOGGING, 3137 * uparms->rdc_set->disk_queue); 3138 * rc = RDC_EQNOTLOGGING; 3139 * goto failed; 3140 * } 3141 */ 3142 rdc_unintercept_diskq(krdc->group); /* stop protecting queue */ 3143 rdc_group_enter(krdc); /* to prevent further flushing */ 3144 rc = _rdc_kill_diskq(urdc); 3145 rdc_group_exit(krdc); 3146 3147 failed: 3148 return (rc); 3149 } 3150 3151 /* 3152 * remove a diskq from a group. 3153 * removal of a diskq from a set, or rather 3154 * a set from a queue, is done by reconfigging out 3155 * of the group. This removes the diskq from a whole 3156 * group and replaces it with a memory based queue 3157 */ 3158 #define NUM_RETRIES 15 /* Number of retries to wait if no progress */ 3159 int 3160 rdc_rem_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus) 3161 { 3162 int index; 3163 rdc_u_info_t *urdc; 3164 rdc_k_info_t *krdc; 3165 rdc_k_info_t *this; 3166 volatile rdc_group_t *group; 3167 volatile disk_queue *diskq; 3168 int threads, counter; 3169 long blocks; 3170 3171 mutex_enter(&rdc_conf_lock); 3172 index = rdc_lookup_byname(uparms->rdc_set); 3173 mutex_exit(&rdc_conf_lock); 3174 if (index < 0) { 3175 spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file, 3176 uparms->rdc_set->secondary.file); 3177 return (RDC_EALREADY); 3178 } 3179 3180 urdc = &rdc_u_info[index]; 3181 this = &rdc_k_info[index]; 3182 krdc = &rdc_k_info[index]; 3183 3184 do { 3185 if (!IS_STATE(urdc, RDC_LOGGING)) { 3186 spcs_s_add(kstatus, RDC_EQNOTLOGGING, 3187 urdc->disk_queue); 3188 return (RDC_EQNOTLOGGING); 3189 } 3190 krdc = krdc->group_next; 3191 urdc = &rdc_u_info[krdc->index]; 3192 3193 } while (krdc != this); 3194 3195 /* 3196 * If there is no group or diskq configured, we can leave now 3197 */ 3198 if (!(group = krdc->group) || !(diskq = &group->diskq)) 3199 return (0); 3200 3201 3202 /* 3203 * Wait if not QEMPTY or threads still active 3204 */ 3205 counter = 0; 3206 while (!QEMPTY(diskq) || group->rdc_thrnum) { 3207 3208 /* 3209 * Capture counters to determine if progress is being made 3210 */ 3211 blocks = QBLOCKS(diskq); 3212 threads = group->rdc_thrnum; 3213 3214 /* 3215 * Wait 3216 */ 3217 delay(HZ); 3218 3219 /* 3220 * Has the group or disk queue gone away while delayed? 3221 */ 3222 if (!(group = krdc->group) || !(diskq = &group->diskq)) 3223 return (0); 3224 3225 /* 3226 * Are we still seeing progress? 3227 */ 3228 if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) { 3229 /* 3230 * No progress see, decrement retry counter 3231 */ 3232 if (counter++ > NUM_RETRIES) { 3233 /* 3234 * No progress seen, increment retry counter 3235 */ 3236 int rc = group->rdc_thrnum ? 3237 RDC_EQFLUSHING : RDC_EQNOTEMPTY; 3238 spcs_s_add(kstatus, rc, urdc->disk_queue); 3239 return (rc); 3240 } 3241 } else { 3242 /* 3243 * Reset counter, as we've made progress 3244 */ 3245 counter = 0; 3246 } 3247 } 3248 3249 return (0); 3250 } 3251