/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

#define SLEEP_TIME (HZ/10)

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);



/* defined here:
   drbd_md_io_complete
   drbd_endio_sec
   drbd_endio_pri

 * more endio handlers:
   atodb_endio in drbd_actlog.c
   drbd_bm_async_io_complete in drbd_bitmap.c

 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}

static int is_failed_barrier(int ee_flags)
{
	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
			== (EE_IS_BARRIER|EE_WAS_ERROR);
}
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.
 */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	/* if this is a failed barrier request, disable use of barriers,
	 * and schedule for resubmission */
	if (is_failed_barrier(e->flags)) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		__release(local);
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);
		return;
	}

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}
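/* Note: drbd_endio_sec() below is the per-bio completion handler.  A single
 * epoch entry may have been split into several bios, so the *_sec_final()
 * handlers above run only once the last of them completes, i.e. when
 * atomic_dec_and_test(&e->pending_bios) fires. */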
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error)
		dev_warn(DEV, "%s: error=%d s=%llus\n",
			 is_write ? "write" : "read", error,
			 (unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
			 is_write ? "write" : "read",
			 (unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	spin_lock_irqsave(&mdev->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (m.bio)
		complete_master_bio(mdev, &m);
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}

int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
						     e->sector,
						     e->size,
						     digest,
						     digest_size,
						     P_CSUM_RS_REQUEST);
			kfree(digest);
		} else {
			dev_err(DEV, "kmalloc() of digest failed.\n");
			ok = 0;
		}
	} else
		ok = 1;

	drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}
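/* In short: w_e_send_csum() is the SyncTarget half of checksum based resync.
 * The block just read locally is hashed with csums_tfm and only the digest is
 * sent to the peer in a P_CSUM_RS_REQUEST; the SyncSource compares digests
 * (see w_e_end_csum_rs_req() below) and answers with either P_RS_IS_IN_SYNC
 * or a full P_RS_DATA_REPLY, so blocks that are already equal are not
 * retransmitted. */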
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return 0;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e)
		goto fail;

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	e->w.cb = w_e_send_csum;
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
		return 1;

	drbd_free_ee(mdev, e);
fail:
	put_ldev(mdev);
	return 2;
}

void resync_timer_fn(unsigned long data)
{
	unsigned long flags;
	struct drbd_conf *mdev = (struct drbd_conf *) data;
	int queue;

	spin_lock_irqsave(&mdev->req_lock, flags);

	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
		queue = 1;
		if (mdev->state.conn == C_VERIFY_S)
			mdev->resync_work.cb = w_make_ov_request;
		else
			mdev->resync_work.cb = w_make_resync_request;
	} else {
		queue = 0;
		mdev->resync_work.cb = w_resync_inactive;
	}

	spin_unlock_irqrestore(&mdev->req_lock, flags);

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}

static int calc_resync_rate(struct drbd_conf *mdev)
{
	int d = mdev->data_delay / 1000; /* us -> ms */
	int td = mdev->sync_conf.throttle_th * 100;  /* 0.1s -> ms */
	int hd = mdev->sync_conf.hold_off_th * 100;  /* 0.1s -> ms */
	int cr = mdev->sync_conf.rate;

	return d <= td ? cr :
		d >= hd ? 0 :
		cr + (cr * (td - d) / (hd - td));
}
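/* Illustrative example of the throttling above (numbers made up, not from the
 * source): with throttle_th = 1 (100 ms), hold_off_th = 5 (500 ms) and a
 * configured rate of 10000 KB/s, a measured data_delay of 300 ms yields
 * 10000 + 10000 * (100 - 300) / (500 - 100) = 5000 KB/s; the rate falls off
 * linearly between the two thresholds and reaches 0 at hold_off_th.
 * w_make_resync_request() below turns the resulting c_sync_rate into a
 * per-SLEEP_TIME request budget:
 *	number = SLEEP_TIME * c_sync_rate / ((BM_BLOCK_SIZE/1024) * HZ)
 * Assuming BM_BLOCK_SIZE is 4 KiB and SLEEP_TIME is HZ/10, 5000 KB/s works
 * out to 5000/40 = 125 requests of 4 KiB per 100 ms timer tick. */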
int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size;
	int number, i, size, pe, mx;
	int align, queued, sndbuf;

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
		return 0;
	}

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;
		return 1;
	}

	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
	 * if it should be necessary */
	max_segment_size = mdev->agreed_pro_version < 94 ?
		queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;

	mdev->c_sync_rate = calc_resync_rate(mdev);
	number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	pe = atomic_read(&mdev->rs_pending_cnt);

	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	else
		mx = 1;
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */
	if (number > mx)
		mx = number;

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	if ((pe + number) > mx) {
		number = mx - pe;
	}

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == -1UL) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_segment_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case 0: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case 2: /* Allocation failed */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				goto requeue;
			/* case 1: everything ok */
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;
		put_ldev(mdev);
		return 1;
	}

requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}
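/* Roughly: online verify below uses the same timer/budget scheme as
 * w_make_resync_request(), but instead of walking the dirty bitmap it walks
 * the device linearly from mdev->ov_position and sends verify requests via
 * drbd_send_ov_request(); mismatching blocks are only recorded
 * (drbd_ov_oos_found() marks them out of sync), not rewritten. */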
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
		return 0;
	}

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)
		goto requeue;

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}


int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}
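/* In short: if the resync LRU cannot be emptied yet because replies are still
 * in flight, drbd_resync_finished() does not block the worker; it allocates a
 * one-shot w_resync_finished work item and retries roughly 100 ms later. */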
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		drbd_kick_lo(mdev);
		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
	     "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		drbd_uuid_set_bm(mdev, 0UL);

		if (mdev->p_uuid) {
			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			int i;
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	mdev->ov_start_sector = 0;

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
	}

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
	} else
		drbd_free_ee(mdev, e);
}
/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}
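/* Roughly: on the SyncSource, w_e_end_csum_rs_req() recomputes the digest of
 * the locally read block and compares it with the digest received from the
 * SyncTarget.  Equal digests are acknowledged with P_RS_IS_IN_SYNC (and
 * accounted in rs_same_csum); unequal ones fall back to a full
 * P_RS_DATA_REPLY. */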
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER;
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);

	kfree(di);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);

	dec_unacked(mdev);

	return ok;
}

void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);

	kfree(di);

	if (!eq)
		drbd_ov_oos_found(mdev, e->sector, e->size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch.  */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}
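/* The helpers below implement the "resync after" dependency between minors
 * (sync_conf.after).  In short: a device may only resync while the device it
 * depends on is neither resyncing nor paused; _drbd_pause_after() and
 * _drbd_resume_next() walk all minors under the global_state_lock and toggle
 * aftr_isp accordingly, and sync_after_error() rejects configurations that
 * would form a dependency cycle. */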
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
		   we check that we might make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
				 "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	}

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		u64 uuid;

		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		mdev->rs_total     =
		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_start     =
		mdev->rs_mark_time = jiffies;
		mdev->rs_same_csum = 0;
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);
	put_ldev(mdev);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);

		if (mdev->rs_total == 0) {
			/* Peer still reachable? Beware of failing before-resync-target handlers! */
			ping_peer(mdev);
			drbd_resync_finished(mdev);
		}

		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	drbd_state_unlock(mdev);
}

int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
1522 */ 1523 spin_unlock_irq(&mdev->data.work.q_lock); 1524 1525 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE); 1526 /* _drbd_set_state only uses stop_nowait. 1527 * wait here for the Exiting receiver. */ 1528 drbd_thread_stop(&mdev->receiver); 1529 drbd_mdev_cleanup(mdev); 1530 1531 dev_info(DEV, "worker terminated\n"); 1532 1533 clear_bit(DEVICE_DYING, &mdev->flags); 1534 clear_bit(CONFIG_PENDING, &mdev->flags); 1535 wake_up(&mdev->state_wait); 1536 1537 return 0; 1538 } 1539