1 /* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26 #include <linux/module.h> 27 #include <linux/drbd.h> 28 #include <linux/sched.h> 29 #include <linux/smp_lock.h> 30 #include <linux/wait.h> 31 #include <linux/mm.h> 32 #include <linux/memcontrol.h> 33 #include <linux/mm_inline.h> 34 #include <linux/slab.h> 35 #include <linux/random.h> 36 #include <linux/string.h> 37 #include <linux/scatterlist.h> 38 39 #include "drbd_int.h" 40 #include "drbd_req.h" 41 42 #define SLEEP_TIME (HZ/10) 43 44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel); 45 46 47 48 /* defined here: 49 drbd_md_io_complete 50 drbd_endio_sec 51 drbd_endio_pri 52 53 * more endio handlers: 54 atodb_endio in drbd_actlog.c 55 drbd_bm_async_io_complete in drbd_bitmap.c 56 57 * For all these callbacks, note the following: 58 * The callbacks will be called in irq context by the IDE drivers, 59 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 60 * Try to get the locking right :) 61 * 62 */ 63 64 65 /* About the global_state_lock 66 Each state transition on an device holds a read lock. In case we have 67 to evaluate the sync after dependencies, we grab a write lock, because 68 we need stable states on all devices for that. */ 69 rwlock_t global_state_lock; 70 71 /* used for synchronous meta data and bitmap IO 72 * submitted by drbd_md_sync_page_io() 73 */ 74 void drbd_md_io_complete(struct bio *bio, int error) 75 { 76 struct drbd_md_io *md_io; 77 78 md_io = (struct drbd_md_io *)bio->bi_private; 79 md_io->error = error; 80 81 complete(&md_io->event); 82 } 83 84 /* reads on behalf of the partner, 85 * "submitted" by the receiver 86 */ 87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local) 88 { 89 unsigned long flags = 0; 90 struct drbd_conf *mdev = e->mdev; 91 92 D_ASSERT(e->block_id != ID_VACANT); 93 94 spin_lock_irqsave(&mdev->req_lock, flags); 95 mdev->read_cnt += e->size >> 9; 96 list_del(&e->w.list); 97 if (list_empty(&mdev->read_ee)) 98 wake_up(&mdev->ee_wait); 99 if (test_bit(__EE_WAS_ERROR, &e->flags)) 100 __drbd_chk_io_error(mdev, FALSE); 101 spin_unlock_irqrestore(&mdev->req_lock, flags); 102 103 drbd_queue_work(&mdev->data.work, &e->w); 104 put_ldev(mdev); 105 } 106 107 static int is_failed_barrier(int ee_flags) 108 { 109 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED)) 110 == (EE_IS_BARRIER|EE_WAS_ERROR); 111 } 112 113 /* writes on behalf of the partner, or resync writes, 114 * "submitted" by the receiver, final stage. */ 115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local) 116 { 117 unsigned long flags = 0; 118 struct drbd_conf *mdev = e->mdev; 119 sector_t e_sector; 120 int do_wake; 121 int is_syncer_req; 122 int do_al_complete_io; 123 124 /* if this is a failed barrier request, disable use of barriers, 125 * and schedule for resubmission */ 126 if (is_failed_barrier(e->flags)) { 127 drbd_bump_write_ordering(mdev, WO_bdev_flush); 128 spin_lock_irqsave(&mdev->req_lock, flags); 129 list_del(&e->w.list); 130 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED; 131 e->w.cb = w_e_reissue; 132 /* put_ldev actually happens below, once we come here again. */ 133 __release(local); 134 spin_unlock_irqrestore(&mdev->req_lock, flags); 135 drbd_queue_work(&mdev->data.work, &e->w); 136 return; 137 } 138 139 D_ASSERT(e->block_id != ID_VACANT); 140 141 /* after we moved e to done_ee, 142 * we may no longer access it, 143 * it may be freed/reused already! 144 * (as soon as we release the req_lock) */ 145 e_sector = e->sector; 146 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO; 147 is_syncer_req = is_syncer_block_id(e->block_id); 148 149 spin_lock_irqsave(&mdev->req_lock, flags); 150 mdev->writ_cnt += e->size >> 9; 151 list_del(&e->w.list); /* has been on active_ee or sync_ee */ 152 list_add_tail(&e->w.list, &mdev->done_ee); 153 154 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet, 155 * neither did we wake possibly waiting conflicting requests. 156 * done from "drbd_process_done_ee" within the appropriate w.cb 157 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */ 158 159 do_wake = is_syncer_req 160 ? list_empty(&mdev->sync_ee) 161 : list_empty(&mdev->active_ee); 162 163 if (test_bit(__EE_WAS_ERROR, &e->flags)) 164 __drbd_chk_io_error(mdev, FALSE); 165 spin_unlock_irqrestore(&mdev->req_lock, flags); 166 167 if (is_syncer_req) 168 drbd_rs_complete_io(mdev, e_sector); 169 170 if (do_wake) 171 wake_up(&mdev->ee_wait); 172 173 if (do_al_complete_io) 174 drbd_al_complete_io(mdev, e_sector); 175 176 wake_asender(mdev); 177 put_ldev(mdev); 178 } 179 180 /* writes on behalf of the partner, or resync writes, 181 * "submitted" by the receiver. 182 */ 183 void drbd_endio_sec(struct bio *bio, int error) 184 { 185 struct drbd_epoch_entry *e = bio->bi_private; 186 struct drbd_conf *mdev = e->mdev; 187 int uptodate = bio_flagged(bio, BIO_UPTODATE); 188 int is_write = bio_data_dir(bio) == WRITE; 189 190 if (error) 191 dev_warn(DEV, "%s: error=%d s=%llus\n", 192 is_write ? "write" : "read", error, 193 (unsigned long long)e->sector); 194 if (!error && !uptodate) { 195 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n", 196 is_write ? "write" : "read", 197 (unsigned long long)e->sector); 198 /* strange behavior of some lower level drivers... 199 * fail the request by clearing the uptodate flag, 200 * but do not return any error?! */ 201 error = -EIO; 202 } 203 204 if (error) 205 set_bit(__EE_WAS_ERROR, &e->flags); 206 207 bio_put(bio); /* no need for the bio anymore */ 208 if (atomic_dec_and_test(&e->pending_bios)) { 209 if (is_write) 210 drbd_endio_write_sec_final(e); 211 else 212 drbd_endio_read_sec_final(e); 213 } 214 } 215 216 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request 217 */ 218 void drbd_endio_pri(struct bio *bio, int error) 219 { 220 unsigned long flags; 221 struct drbd_request *req = bio->bi_private; 222 struct drbd_conf *mdev = req->mdev; 223 struct bio_and_error m; 224 enum drbd_req_event what; 225 int uptodate = bio_flagged(bio, BIO_UPTODATE); 226 227 if (!error && !uptodate) { 228 dev_warn(DEV, "p %s: setting error to -EIO\n", 229 bio_data_dir(bio) == WRITE ? "write" : "read"); 230 /* strange behavior of some lower level drivers... 231 * fail the request by clearing the uptodate flag, 232 * but do not return any error?! */ 233 error = -EIO; 234 } 235 236 /* to avoid recursion in __req_mod */ 237 if (unlikely(error)) { 238 what = (bio_data_dir(bio) == WRITE) 239 ? write_completed_with_error 240 : (bio_rw(bio) == READ) 241 ? read_completed_with_error 242 : read_ahead_completed_with_error; 243 } else 244 what = completed_ok; 245 246 bio_put(req->private_bio); 247 req->private_bio = ERR_PTR(error); 248 249 spin_lock_irqsave(&mdev->req_lock, flags); 250 __req_mod(req, what, &m); 251 spin_unlock_irqrestore(&mdev->req_lock, flags); 252 253 if (m.bio) 254 complete_master_bio(mdev, &m); 255 } 256 257 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 258 { 259 struct drbd_request *req = container_of(w, struct drbd_request, w); 260 261 /* We should not detach for read io-error, 262 * but try to WRITE the P_DATA_REPLY to the failed location, 263 * to give the disk the chance to relocate that block */ 264 265 spin_lock_irq(&mdev->req_lock); 266 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) { 267 _req_mod(req, read_retry_remote_canceled); 268 spin_unlock_irq(&mdev->req_lock); 269 return 1; 270 } 271 spin_unlock_irq(&mdev->req_lock); 272 273 return w_send_read_req(mdev, w, 0); 274 } 275 276 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 277 { 278 ERR_IF(cancel) return 1; 279 dev_err(DEV, "resync inactive, but callback triggered??\n"); 280 return 1; /* Simply ignore this! */ 281 } 282 283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest) 284 { 285 struct hash_desc desc; 286 struct scatterlist sg; 287 struct page *page = e->pages; 288 struct page *tmp; 289 unsigned len; 290 291 desc.tfm = tfm; 292 desc.flags = 0; 293 294 sg_init_table(&sg, 1); 295 crypto_hash_init(&desc); 296 297 while ((tmp = page_chain_next(page))) { 298 /* all but the last page will be fully used */ 299 sg_set_page(&sg, page, PAGE_SIZE, 0); 300 crypto_hash_update(&desc, &sg, sg.length); 301 page = tmp; 302 } 303 /* and now the last, possibly only partially used page */ 304 len = e->size & (PAGE_SIZE - 1); 305 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 306 crypto_hash_update(&desc, &sg, sg.length); 307 crypto_hash_final(&desc, digest); 308 } 309 310 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) 311 { 312 struct hash_desc desc; 313 struct scatterlist sg; 314 struct bio_vec *bvec; 315 int i; 316 317 desc.tfm = tfm; 318 desc.flags = 0; 319 320 sg_init_table(&sg, 1); 321 crypto_hash_init(&desc); 322 323 __bio_for_each_segment(bvec, bio, i, 0) { 324 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset); 325 crypto_hash_update(&desc, &sg, sg.length); 326 } 327 crypto_hash_final(&desc, digest); 328 } 329 330 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 331 { 332 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 333 int digest_size; 334 void *digest; 335 int ok; 336 337 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef); 338 339 if (unlikely(cancel)) { 340 drbd_free_ee(mdev, e); 341 return 1; 342 } 343 344 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 345 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 346 digest = kmalloc(digest_size, GFP_NOIO); 347 if (digest) { 348 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); 349 350 inc_rs_pending(mdev); 351 ok = drbd_send_drequest_csum(mdev, 352 e->sector, 353 e->size, 354 digest, 355 digest_size, 356 P_CSUM_RS_REQUEST); 357 kfree(digest); 358 } else { 359 dev_err(DEV, "kmalloc() of digest failed.\n"); 360 ok = 0; 361 } 362 } else 363 ok = 1; 364 365 drbd_free_ee(mdev, e); 366 367 if (unlikely(!ok)) 368 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n"); 369 return ok; 370 } 371 372 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 373 374 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size) 375 { 376 struct drbd_epoch_entry *e; 377 378 if (!get_ldev(mdev)) 379 return 0; 380 381 /* GFP_TRY, because if there is no memory available right now, this may 382 * be rescheduled for later. It is "only" background resync, after all. */ 383 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); 384 if (!e) 385 goto fail; 386 387 spin_lock_irq(&mdev->req_lock); 388 list_add(&e->w.list, &mdev->read_ee); 389 spin_unlock_irq(&mdev->req_lock); 390 391 e->w.cb = w_e_send_csum; 392 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0) 393 return 1; 394 395 drbd_free_ee(mdev, e); 396 fail: 397 put_ldev(mdev); 398 return 2; 399 } 400 401 void resync_timer_fn(unsigned long data) 402 { 403 unsigned long flags; 404 struct drbd_conf *mdev = (struct drbd_conf *) data; 405 int queue; 406 407 spin_lock_irqsave(&mdev->req_lock, flags); 408 409 if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) { 410 queue = 1; 411 if (mdev->state.conn == C_VERIFY_S) 412 mdev->resync_work.cb = w_make_ov_request; 413 else 414 mdev->resync_work.cb = w_make_resync_request; 415 } else { 416 queue = 0; 417 mdev->resync_work.cb = w_resync_inactive; 418 } 419 420 spin_unlock_irqrestore(&mdev->req_lock, flags); 421 422 /* harmless race: list_empty outside data.work.q_lock */ 423 if (list_empty(&mdev->resync_work.list) && queue) 424 drbd_queue_work(&mdev->data.work, &mdev->resync_work); 425 } 426 427 int w_make_resync_request(struct drbd_conf *mdev, 428 struct drbd_work *w, int cancel) 429 { 430 unsigned long bit; 431 sector_t sector; 432 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 433 int max_segment_size; 434 int number, i, size, pe, mx; 435 int align, queued, sndbuf; 436 437 if (unlikely(cancel)) 438 return 1; 439 440 if (unlikely(mdev->state.conn < C_CONNECTED)) { 441 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected"); 442 return 0; 443 } 444 445 if (mdev->state.conn != C_SYNC_TARGET) 446 dev_err(DEV, "%s in w_make_resync_request\n", 447 drbd_conn_str(mdev->state.conn)); 448 449 if (!get_ldev(mdev)) { 450 /* Since we only need to access mdev->rsync a 451 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but 452 to continue resync with a broken disk makes no sense at 453 all */ 454 dev_err(DEV, "Disk broke down during resync!\n"); 455 mdev->resync_work.cb = w_resync_inactive; 456 return 1; 457 } 458 459 /* starting with drbd 8.3.8, we can handle multi-bio EEs, 460 * if it should be necessary */ 461 max_segment_size = mdev->agreed_pro_version < 94 ? 462 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE; 463 464 number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE / 1024) * HZ); 465 pe = atomic_read(&mdev->rs_pending_cnt); 466 467 mutex_lock(&mdev->data.mutex); 468 if (mdev->data.socket) 469 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req); 470 else 471 mx = 1; 472 mutex_unlock(&mdev->data.mutex); 473 474 /* For resync rates >160MB/sec, allow more pending RS requests */ 475 if (number > mx) 476 mx = number; 477 478 /* Limit the number of pending RS requests to no more than the peer's receive buffer */ 479 if ((pe + number) > mx) { 480 number = mx - pe; 481 } 482 483 for (i = 0; i < number; i++) { 484 /* Stop generating RS requests, when half of the send buffer is filled */ 485 mutex_lock(&mdev->data.mutex); 486 if (mdev->data.socket) { 487 queued = mdev->data.socket->sk->sk_wmem_queued; 488 sndbuf = mdev->data.socket->sk->sk_sndbuf; 489 } else { 490 queued = 1; 491 sndbuf = 0; 492 } 493 mutex_unlock(&mdev->data.mutex); 494 if (queued > sndbuf / 2) 495 goto requeue; 496 497 next_sector: 498 size = BM_BLOCK_SIZE; 499 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo); 500 501 if (bit == -1UL) { 502 mdev->bm_resync_fo = drbd_bm_bits(mdev); 503 mdev->resync_work.cb = w_resync_inactive; 504 put_ldev(mdev); 505 return 1; 506 } 507 508 sector = BM_BIT_TO_SECT(bit); 509 510 if (drbd_try_rs_begin_io(mdev, sector)) { 511 mdev->bm_resync_fo = bit; 512 goto requeue; 513 } 514 mdev->bm_resync_fo = bit + 1; 515 516 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) { 517 drbd_rs_complete_io(mdev, sector); 518 goto next_sector; 519 } 520 521 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE 522 /* try to find some adjacent bits. 523 * we stop if we have already the maximum req size. 524 * 525 * Additionally always align bigger requests, in order to 526 * be prepared for all stripe sizes of software RAIDs. 527 */ 528 align = 1; 529 for (;;) { 530 if (size + BM_BLOCK_SIZE > max_segment_size) 531 break; 532 533 /* Be always aligned */ 534 if (sector & ((1<<(align+3))-1)) 535 break; 536 537 /* do not cross extent boundaries */ 538 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 539 break; 540 /* now, is it actually dirty, after all? 541 * caution, drbd_bm_test_bit is tri-state for some 542 * obscure reason; ( b == 0 ) would get the out-of-band 543 * only accidentally right because of the "oddly sized" 544 * adjustment below */ 545 if (drbd_bm_test_bit(mdev, bit+1) != 1) 546 break; 547 bit++; 548 size += BM_BLOCK_SIZE; 549 if ((BM_BLOCK_SIZE << align) <= size) 550 align++; 551 i++; 552 } 553 /* if we merged some, 554 * reset the offset to start the next drbd_bm_find_next from */ 555 if (size > BM_BLOCK_SIZE) 556 mdev->bm_resync_fo = bit + 1; 557 #endif 558 559 /* adjust very last sectors, in case we are oddly sized */ 560 if (sector + (size>>9) > capacity) 561 size = (capacity-sector)<<9; 562 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) { 563 switch (read_for_csum(mdev, sector, size)) { 564 case 0: /* Disk failure*/ 565 put_ldev(mdev); 566 return 0; 567 case 2: /* Allocation failed */ 568 drbd_rs_complete_io(mdev, sector); 569 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector); 570 goto requeue; 571 /* case 1: everything ok */ 572 } 573 } else { 574 inc_rs_pending(mdev); 575 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST, 576 sector, size, ID_SYNCER)) { 577 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n"); 578 dec_rs_pending(mdev); 579 put_ldev(mdev); 580 return 0; 581 } 582 } 583 } 584 585 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) { 586 /* last syncer _request_ was sent, 587 * but the P_RS_DATA_REPLY not yet received. sync will end (and 588 * next sync group will resume), as soon as we receive the last 589 * resync data block, and the last bit is cleared. 590 * until then resync "work" is "inactive" ... 591 */ 592 mdev->resync_work.cb = w_resync_inactive; 593 put_ldev(mdev); 594 return 1; 595 } 596 597 requeue: 598 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 599 put_ldev(mdev); 600 return 1; 601 } 602 603 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 604 { 605 int number, i, size; 606 sector_t sector; 607 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 608 609 if (unlikely(cancel)) 610 return 1; 611 612 if (unlikely(mdev->state.conn < C_CONNECTED)) { 613 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected"); 614 return 0; 615 } 616 617 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ); 618 if (atomic_read(&mdev->rs_pending_cnt) > number) 619 goto requeue; 620 621 number -= atomic_read(&mdev->rs_pending_cnt); 622 623 sector = mdev->ov_position; 624 for (i = 0; i < number; i++) { 625 if (sector >= capacity) { 626 mdev->resync_work.cb = w_resync_inactive; 627 return 1; 628 } 629 630 size = BM_BLOCK_SIZE; 631 632 if (drbd_try_rs_begin_io(mdev, sector)) { 633 mdev->ov_position = sector; 634 goto requeue; 635 } 636 637 if (sector + (size>>9) > capacity) 638 size = (capacity-sector)<<9; 639 640 inc_rs_pending(mdev); 641 if (!drbd_send_ov_request(mdev, sector, size)) { 642 dec_rs_pending(mdev); 643 return 0; 644 } 645 sector += BM_SECT_PER_BIT; 646 } 647 mdev->ov_position = sector; 648 649 requeue: 650 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 651 return 1; 652 } 653 654 655 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 656 { 657 kfree(w); 658 ov_oos_print(mdev); 659 drbd_resync_finished(mdev); 660 661 return 1; 662 } 663 664 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 665 { 666 kfree(w); 667 668 drbd_resync_finished(mdev); 669 670 return 1; 671 } 672 673 int drbd_resync_finished(struct drbd_conf *mdev) 674 { 675 unsigned long db, dt, dbdt; 676 unsigned long n_oos; 677 union drbd_state os, ns; 678 struct drbd_work *w; 679 char *khelper_cmd = NULL; 680 681 /* Remove all elements from the resync LRU. Since future actions 682 * might set bits in the (main) bitmap, then the entries in the 683 * resync LRU would be wrong. */ 684 if (drbd_rs_del_all(mdev)) { 685 /* In case this is not possible now, most probably because 686 * there are P_RS_DATA_REPLY Packets lingering on the worker's 687 * queue (or even the read operations for those packets 688 * is not finished by now). Retry in 100ms. */ 689 690 drbd_kick_lo(mdev); 691 __set_current_state(TASK_INTERRUPTIBLE); 692 schedule_timeout(HZ / 10); 693 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC); 694 if (w) { 695 w->cb = w_resync_finished; 696 drbd_queue_work(&mdev->data.work, w); 697 return 1; 698 } 699 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n"); 700 } 701 702 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ; 703 if (dt <= 0) 704 dt = 1; 705 db = mdev->rs_total; 706 dbdt = Bit2KB(db/dt); 707 mdev->rs_paused /= HZ; 708 709 if (!get_ldev(mdev)) 710 goto out; 711 712 spin_lock_irq(&mdev->req_lock); 713 os = mdev->state; 714 715 /* This protects us against multiple calls (that can happen in the presence 716 of application IO), and against connectivity loss just before we arrive here. */ 717 if (os.conn <= C_CONNECTED) 718 goto out_unlock; 719 720 ns = os; 721 ns.conn = C_CONNECTED; 722 723 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 724 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ? 725 "Online verify " : "Resync", 726 dt + mdev->rs_paused, mdev->rs_paused, dbdt); 727 728 n_oos = drbd_bm_total_weight(mdev); 729 730 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 731 if (n_oos) { 732 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n", 733 n_oos, Bit2KB(1)); 734 khelper_cmd = "out-of-sync"; 735 } 736 } else { 737 D_ASSERT((n_oos - mdev->rs_failed) == 0); 738 739 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 740 khelper_cmd = "after-resync-target"; 741 742 if (mdev->csums_tfm && mdev->rs_total) { 743 const unsigned long s = mdev->rs_same_csum; 744 const unsigned long t = mdev->rs_total; 745 const int ratio = 746 (t == 0) ? 0 : 747 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 748 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; " 749 "transferred %luK total %luK\n", 750 ratio, 751 Bit2KB(mdev->rs_same_csum), 752 Bit2KB(mdev->rs_total - mdev->rs_same_csum), 753 Bit2KB(mdev->rs_total)); 754 } 755 } 756 757 if (mdev->rs_failed) { 758 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed); 759 760 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 761 ns.disk = D_INCONSISTENT; 762 ns.pdsk = D_UP_TO_DATE; 763 } else { 764 ns.disk = D_UP_TO_DATE; 765 ns.pdsk = D_INCONSISTENT; 766 } 767 } else { 768 ns.disk = D_UP_TO_DATE; 769 ns.pdsk = D_UP_TO_DATE; 770 771 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 772 if (mdev->p_uuid) { 773 int i; 774 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 775 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]); 776 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]); 777 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]); 778 } else { 779 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n"); 780 } 781 } 782 783 drbd_uuid_set_bm(mdev, 0UL); 784 785 if (mdev->p_uuid) { 786 /* Now the two UUID sets are equal, update what we 787 * know of the peer. */ 788 int i; 789 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 790 mdev->p_uuid[i] = mdev->ldev->md.uuid[i]; 791 } 792 } 793 794 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 795 out_unlock: 796 spin_unlock_irq(&mdev->req_lock); 797 put_ldev(mdev); 798 out: 799 mdev->rs_total = 0; 800 mdev->rs_failed = 0; 801 mdev->rs_paused = 0; 802 mdev->ov_start_sector = 0; 803 804 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) { 805 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n"); 806 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished"); 807 } 808 809 if (khelper_cmd) 810 drbd_khelper(mdev, khelper_cmd); 811 812 return 1; 813 } 814 815 /* helper */ 816 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 817 { 818 if (drbd_ee_has_active_page(e)) { 819 /* This might happen if sendpage() has not finished */ 820 spin_lock_irq(&mdev->req_lock); 821 list_add_tail(&e->w.list, &mdev->net_ee); 822 spin_unlock_irq(&mdev->req_lock); 823 } else 824 drbd_free_ee(mdev, e); 825 } 826 827 /** 828 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 829 * @mdev: DRBD device. 830 * @w: work object. 831 * @cancel: The connection will be closed anyways 832 */ 833 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 834 { 835 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 836 int ok; 837 838 if (unlikely(cancel)) { 839 drbd_free_ee(mdev, e); 840 dec_unacked(mdev); 841 return 1; 842 } 843 844 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 845 ok = drbd_send_block(mdev, P_DATA_REPLY, e); 846 } else { 847 if (__ratelimit(&drbd_ratelimit_state)) 848 dev_err(DEV, "Sending NegDReply. sector=%llus.\n", 849 (unsigned long long)e->sector); 850 851 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e); 852 } 853 854 dec_unacked(mdev); 855 856 move_to_net_ee_or_free(mdev, e); 857 858 if (unlikely(!ok)) 859 dev_err(DEV, "drbd_send_block() failed\n"); 860 return ok; 861 } 862 863 /** 864 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS 865 * @mdev: DRBD device. 866 * @w: work object. 867 * @cancel: The connection will be closed anyways 868 */ 869 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 870 { 871 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 872 int ok; 873 874 if (unlikely(cancel)) { 875 drbd_free_ee(mdev, e); 876 dec_unacked(mdev); 877 return 1; 878 } 879 880 if (get_ldev_if_state(mdev, D_FAILED)) { 881 drbd_rs_complete_io(mdev, e->sector); 882 put_ldev(mdev); 883 } 884 885 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 886 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { 887 inc_rs_pending(mdev); 888 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 889 } else { 890 if (__ratelimit(&drbd_ratelimit_state)) 891 dev_err(DEV, "Not sending RSDataReply, " 892 "partner DISKLESS!\n"); 893 ok = 1; 894 } 895 } else { 896 if (__ratelimit(&drbd_ratelimit_state)) 897 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n", 898 (unsigned long long)e->sector); 899 900 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 901 902 /* update resync data with failure */ 903 drbd_rs_failed_io(mdev, e->sector, e->size); 904 } 905 906 dec_unacked(mdev); 907 908 move_to_net_ee_or_free(mdev, e); 909 910 if (unlikely(!ok)) 911 dev_err(DEV, "drbd_send_block() failed\n"); 912 return ok; 913 } 914 915 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 916 { 917 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 918 struct digest_info *di; 919 int digest_size; 920 void *digest = NULL; 921 int ok, eq = 0; 922 923 if (unlikely(cancel)) { 924 drbd_free_ee(mdev, e); 925 dec_unacked(mdev); 926 return 1; 927 } 928 929 drbd_rs_complete_io(mdev, e->sector); 930 931 di = (struct digest_info *)(unsigned long)e->block_id; 932 933 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 934 /* quick hack to try to avoid a race against reconfiguration. 935 * a real fix would be much more involved, 936 * introducing more locking mechanisms */ 937 if (mdev->csums_tfm) { 938 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 939 D_ASSERT(digest_size == di->digest_size); 940 digest = kmalloc(digest_size, GFP_NOIO); 941 } 942 if (digest) { 943 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); 944 eq = !memcmp(digest, di->digest, digest_size); 945 kfree(digest); 946 } 947 948 if (eq) { 949 drbd_set_in_sync(mdev, e->sector, e->size); 950 /* rs_same_csums unit is BM_BLOCK_SIZE */ 951 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT; 952 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e); 953 } else { 954 inc_rs_pending(mdev); 955 e->block_id = ID_SYNCER; 956 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 957 } 958 } else { 959 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 960 if (__ratelimit(&drbd_ratelimit_state)) 961 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n"); 962 } 963 964 dec_unacked(mdev); 965 966 kfree(di); 967 968 move_to_net_ee_or_free(mdev, e); 969 970 if (unlikely(!ok)) 971 dev_err(DEV, "drbd_send_block/ack() failed\n"); 972 return ok; 973 } 974 975 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 976 { 977 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 978 int digest_size; 979 void *digest; 980 int ok = 1; 981 982 if (unlikely(cancel)) 983 goto out; 984 985 if (unlikely((e->flags & EE_WAS_ERROR) != 0)) 986 goto out; 987 988 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 989 /* FIXME if this allocation fails, online verify will not terminate! */ 990 digest = kmalloc(digest_size, GFP_NOIO); 991 if (digest) { 992 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); 993 inc_rs_pending(mdev); 994 ok = drbd_send_drequest_csum(mdev, e->sector, e->size, 995 digest, digest_size, P_OV_REPLY); 996 if (!ok) 997 dec_rs_pending(mdev); 998 kfree(digest); 999 } 1000 1001 out: 1002 drbd_free_ee(mdev, e); 1003 1004 dec_unacked(mdev); 1005 1006 return ok; 1007 } 1008 1009 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size) 1010 { 1011 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) { 1012 mdev->ov_last_oos_size += size>>9; 1013 } else { 1014 mdev->ov_last_oos_start = sector; 1015 mdev->ov_last_oos_size = size>>9; 1016 } 1017 drbd_set_out_of_sync(mdev, sector, size); 1018 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags); 1019 } 1020 1021 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1022 { 1023 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1024 struct digest_info *di; 1025 int digest_size; 1026 void *digest; 1027 int ok, eq = 0; 1028 1029 if (unlikely(cancel)) { 1030 drbd_free_ee(mdev, e); 1031 dec_unacked(mdev); 1032 return 1; 1033 } 1034 1035 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1036 * the resync lru has been cleaned up already */ 1037 drbd_rs_complete_io(mdev, e->sector); 1038 1039 di = (struct digest_info *)(unsigned long)e->block_id; 1040 1041 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 1042 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1043 digest = kmalloc(digest_size, GFP_NOIO); 1044 if (digest) { 1045 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); 1046 1047 D_ASSERT(digest_size == di->digest_size); 1048 eq = !memcmp(digest, di->digest, digest_size); 1049 kfree(digest); 1050 } 1051 } else { 1052 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1053 if (__ratelimit(&drbd_ratelimit_state)) 1054 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n"); 1055 } 1056 1057 dec_unacked(mdev); 1058 1059 kfree(di); 1060 1061 if (!eq) 1062 drbd_ov_oos_found(mdev, e->sector, e->size); 1063 else 1064 ov_oos_print(mdev); 1065 1066 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size, 1067 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1068 1069 drbd_free_ee(mdev, e); 1070 1071 if (--mdev->ov_left == 0) { 1072 ov_oos_print(mdev); 1073 drbd_resync_finished(mdev); 1074 } 1075 1076 return ok; 1077 } 1078 1079 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1080 { 1081 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w); 1082 complete(&b->done); 1083 return 1; 1084 } 1085 1086 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1087 { 1088 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w); 1089 struct p_barrier *p = &mdev->data.sbuf.barrier; 1090 int ok = 1; 1091 1092 /* really avoid racing with tl_clear. w.cb may have been referenced 1093 * just before it was reassigned and re-queued, so double check that. 1094 * actually, this race was harmless, since we only try to send the 1095 * barrier packet here, and otherwise do nothing with the object. 1096 * but compare with the head of w_clear_epoch */ 1097 spin_lock_irq(&mdev->req_lock); 1098 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED) 1099 cancel = 1; 1100 spin_unlock_irq(&mdev->req_lock); 1101 if (cancel) 1102 return 1; 1103 1104 if (!drbd_get_data_sock(mdev)) 1105 return 0; 1106 p->barrier = b->br_number; 1107 /* inc_ap_pending was done where this was queued. 1108 * dec_ap_pending will be done in got_BarrierAck 1109 * or (on connection loss) in w_clear_epoch. */ 1110 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER, 1111 (struct p_header *)p, sizeof(*p), 0); 1112 drbd_put_data_sock(mdev); 1113 1114 return ok; 1115 } 1116 1117 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1118 { 1119 if (cancel) 1120 return 1; 1121 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE); 1122 } 1123 1124 /** 1125 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1126 * @mdev: DRBD device. 1127 * @w: work object. 1128 * @cancel: The connection will be closed anyways 1129 */ 1130 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1131 { 1132 struct drbd_request *req = container_of(w, struct drbd_request, w); 1133 int ok; 1134 1135 if (unlikely(cancel)) { 1136 req_mod(req, send_canceled); 1137 return 1; 1138 } 1139 1140 ok = drbd_send_dblock(mdev, req); 1141 req_mod(req, ok ? handed_over_to_network : send_failed); 1142 1143 return ok; 1144 } 1145 1146 /** 1147 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1148 * @mdev: DRBD device. 1149 * @w: work object. 1150 * @cancel: The connection will be closed anyways 1151 */ 1152 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1153 { 1154 struct drbd_request *req = container_of(w, struct drbd_request, w); 1155 int ok; 1156 1157 if (unlikely(cancel)) { 1158 req_mod(req, send_canceled); 1159 return 1; 1160 } 1161 1162 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size, 1163 (unsigned long)req); 1164 1165 if (!ok) { 1166 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send(); 1167 * so this is probably redundant */ 1168 if (mdev->state.conn >= C_CONNECTED) 1169 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE)); 1170 } 1171 req_mod(req, ok ? handed_over_to_network : send_failed); 1172 1173 return ok; 1174 } 1175 1176 static int _drbd_may_sync_now(struct drbd_conf *mdev) 1177 { 1178 struct drbd_conf *odev = mdev; 1179 1180 while (1) { 1181 if (odev->sync_conf.after == -1) 1182 return 1; 1183 odev = minor_to_mdev(odev->sync_conf.after); 1184 ERR_IF(!odev) return 1; 1185 if ((odev->state.conn >= C_SYNC_SOURCE && 1186 odev->state.conn <= C_PAUSED_SYNC_T) || 1187 odev->state.aftr_isp || odev->state.peer_isp || 1188 odev->state.user_isp) 1189 return 0; 1190 } 1191 } 1192 1193 /** 1194 * _drbd_pause_after() - Pause resync on all devices that may not resync now 1195 * @mdev: DRBD device. 1196 * 1197 * Called from process context only (admin command and after_state_ch). 1198 */ 1199 static int _drbd_pause_after(struct drbd_conf *mdev) 1200 { 1201 struct drbd_conf *odev; 1202 int i, rv = 0; 1203 1204 for (i = 0; i < minor_count; i++) { 1205 odev = minor_to_mdev(i); 1206 if (!odev) 1207 continue; 1208 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1209 continue; 1210 if (!_drbd_may_sync_now(odev)) 1211 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL) 1212 != SS_NOTHING_TO_DO); 1213 } 1214 1215 return rv; 1216 } 1217 1218 /** 1219 * _drbd_resume_next() - Resume resync on all devices that may resync now 1220 * @mdev: DRBD device. 1221 * 1222 * Called from process context only (admin command and worker). 1223 */ 1224 static int _drbd_resume_next(struct drbd_conf *mdev) 1225 { 1226 struct drbd_conf *odev; 1227 int i, rv = 0; 1228 1229 for (i = 0; i < minor_count; i++) { 1230 odev = minor_to_mdev(i); 1231 if (!odev) 1232 continue; 1233 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1234 continue; 1235 if (odev->state.aftr_isp) { 1236 if (_drbd_may_sync_now(odev)) 1237 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0), 1238 CS_HARD, NULL) 1239 != SS_NOTHING_TO_DO) ; 1240 } 1241 } 1242 return rv; 1243 } 1244 1245 void resume_next_sg(struct drbd_conf *mdev) 1246 { 1247 write_lock_irq(&global_state_lock); 1248 _drbd_resume_next(mdev); 1249 write_unlock_irq(&global_state_lock); 1250 } 1251 1252 void suspend_other_sg(struct drbd_conf *mdev) 1253 { 1254 write_lock_irq(&global_state_lock); 1255 _drbd_pause_after(mdev); 1256 write_unlock_irq(&global_state_lock); 1257 } 1258 1259 static int sync_after_error(struct drbd_conf *mdev, int o_minor) 1260 { 1261 struct drbd_conf *odev; 1262 1263 if (o_minor == -1) 1264 return NO_ERROR; 1265 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL) 1266 return ERR_SYNC_AFTER; 1267 1268 /* check for loops */ 1269 odev = minor_to_mdev(o_minor); 1270 while (1) { 1271 if (odev == mdev) 1272 return ERR_SYNC_AFTER_CYCLE; 1273 1274 /* dependency chain ends here, no cycles. */ 1275 if (odev->sync_conf.after == -1) 1276 return NO_ERROR; 1277 1278 /* follow the dependency chain */ 1279 odev = minor_to_mdev(odev->sync_conf.after); 1280 } 1281 } 1282 1283 int drbd_alter_sa(struct drbd_conf *mdev, int na) 1284 { 1285 int changes; 1286 int retcode; 1287 1288 write_lock_irq(&global_state_lock); 1289 retcode = sync_after_error(mdev, na); 1290 if (retcode == NO_ERROR) { 1291 mdev->sync_conf.after = na; 1292 do { 1293 changes = _drbd_pause_after(mdev); 1294 changes |= _drbd_resume_next(mdev); 1295 } while (changes); 1296 } 1297 write_unlock_irq(&global_state_lock); 1298 return retcode; 1299 } 1300 1301 static void ping_peer(struct drbd_conf *mdev) 1302 { 1303 clear_bit(GOT_PING_ACK, &mdev->flags); 1304 request_ping(mdev); 1305 wait_event(mdev->misc_wait, 1306 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED); 1307 } 1308 1309 /** 1310 * drbd_start_resync() - Start the resync process 1311 * @mdev: DRBD device. 1312 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1313 * 1314 * This function might bring you directly into one of the 1315 * C_PAUSED_SYNC_* states. 1316 */ 1317 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side) 1318 { 1319 union drbd_state ns; 1320 int r; 1321 1322 if (mdev->state.conn >= C_SYNC_SOURCE) { 1323 dev_err(DEV, "Resync already running!\n"); 1324 return; 1325 } 1326 1327 /* In case a previous resync run was aborted by an IO error/detach on the peer. */ 1328 drbd_rs_cancel_all(mdev); 1329 1330 if (side == C_SYNC_TARGET) { 1331 /* Since application IO was locked out during C_WF_BITMAP_T and 1332 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1333 we check that we might make the data inconsistent. */ 1334 r = drbd_khelper(mdev, "before-resync-target"); 1335 r = (r >> 8) & 0xff; 1336 if (r > 0) { 1337 dev_info(DEV, "before-resync-target handler returned %d, " 1338 "dropping connection.\n", r); 1339 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 1340 return; 1341 } 1342 } 1343 1344 drbd_state_lock(mdev); 1345 1346 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) { 1347 drbd_state_unlock(mdev); 1348 return; 1349 } 1350 1351 if (side == C_SYNC_TARGET) { 1352 mdev->bm_resync_fo = 0; 1353 } else /* side == C_SYNC_SOURCE */ { 1354 u64 uuid; 1355 1356 get_random_bytes(&uuid, sizeof(u64)); 1357 drbd_uuid_set(mdev, UI_BITMAP, uuid); 1358 drbd_send_sync_uuid(mdev, uuid); 1359 1360 D_ASSERT(mdev->state.disk == D_UP_TO_DATE); 1361 } 1362 1363 write_lock_irq(&global_state_lock); 1364 ns = mdev->state; 1365 1366 ns.aftr_isp = !_drbd_may_sync_now(mdev); 1367 1368 ns.conn = side; 1369 1370 if (side == C_SYNC_TARGET) 1371 ns.disk = D_INCONSISTENT; 1372 else /* side == C_SYNC_SOURCE */ 1373 ns.pdsk = D_INCONSISTENT; 1374 1375 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 1376 ns = mdev->state; 1377 1378 if (ns.conn < C_CONNECTED) 1379 r = SS_UNKNOWN_ERROR; 1380 1381 if (r == SS_SUCCESS) { 1382 mdev->rs_total = 1383 mdev->rs_mark_left = drbd_bm_total_weight(mdev); 1384 mdev->rs_failed = 0; 1385 mdev->rs_paused = 0; 1386 mdev->rs_start = 1387 mdev->rs_mark_time = jiffies; 1388 mdev->rs_same_csum = 0; 1389 _drbd_pause_after(mdev); 1390 } 1391 write_unlock_irq(&global_state_lock); 1392 put_ldev(mdev); 1393 1394 if (r == SS_SUCCESS) { 1395 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1396 drbd_conn_str(ns.conn), 1397 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10), 1398 (unsigned long) mdev->rs_total); 1399 1400 if (mdev->rs_total == 0) { 1401 /* Peer still reachable? Beware of failing before-resync-target handlers! */ 1402 ping_peer(mdev); 1403 drbd_resync_finished(mdev); 1404 } 1405 1406 /* ns.conn may already be != mdev->state.conn, 1407 * we may have been paused in between, or become paused until 1408 * the timer triggers. 1409 * No matter, that is handled in resync_timer_fn() */ 1410 if (ns.conn == C_SYNC_TARGET) 1411 mod_timer(&mdev->resync_timer, jiffies); 1412 1413 drbd_md_sync(mdev); 1414 } 1415 drbd_state_unlock(mdev); 1416 } 1417 1418 int drbd_worker(struct drbd_thread *thi) 1419 { 1420 struct drbd_conf *mdev = thi->mdev; 1421 struct drbd_work *w = NULL; 1422 LIST_HEAD(work_list); 1423 int intr = 0, i; 1424 1425 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev)); 1426 1427 while (get_t_state(thi) == Running) { 1428 drbd_thread_current_set_cpu(mdev); 1429 1430 if (down_trylock(&mdev->data.work.s)) { 1431 mutex_lock(&mdev->data.mutex); 1432 if (mdev->data.socket && !mdev->net_conf->no_cork) 1433 drbd_tcp_uncork(mdev->data.socket); 1434 mutex_unlock(&mdev->data.mutex); 1435 1436 intr = down_interruptible(&mdev->data.work.s); 1437 1438 mutex_lock(&mdev->data.mutex); 1439 if (mdev->data.socket && !mdev->net_conf->no_cork) 1440 drbd_tcp_cork(mdev->data.socket); 1441 mutex_unlock(&mdev->data.mutex); 1442 } 1443 1444 if (intr) { 1445 D_ASSERT(intr == -EINTR); 1446 flush_signals(current); 1447 ERR_IF (get_t_state(thi) == Running) 1448 continue; 1449 break; 1450 } 1451 1452 if (get_t_state(thi) != Running) 1453 break; 1454 /* With this break, we have done a down() but not consumed 1455 the entry from the list. The cleanup code takes care of 1456 this... */ 1457 1458 w = NULL; 1459 spin_lock_irq(&mdev->data.work.q_lock); 1460 ERR_IF(list_empty(&mdev->data.work.q)) { 1461 /* something terribly wrong in our logic. 1462 * we were able to down() the semaphore, 1463 * but the list is empty... doh. 1464 * 1465 * what is the best thing to do now? 1466 * try again from scratch, restarting the receiver, 1467 * asender, whatnot? could break even more ugly, 1468 * e.g. when we are primary, but no good local data. 1469 * 1470 * I'll try to get away just starting over this loop. 1471 */ 1472 spin_unlock_irq(&mdev->data.work.q_lock); 1473 continue; 1474 } 1475 w = list_entry(mdev->data.work.q.next, struct drbd_work, list); 1476 list_del_init(&w->list); 1477 spin_unlock_irq(&mdev->data.work.q_lock); 1478 1479 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) { 1480 /* dev_warn(DEV, "worker: a callback failed! \n"); */ 1481 if (mdev->state.conn >= C_CONNECTED) 1482 drbd_force_state(mdev, 1483 NS(conn, C_NETWORK_FAILURE)); 1484 } 1485 } 1486 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags)); 1487 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags)); 1488 1489 spin_lock_irq(&mdev->data.work.q_lock); 1490 i = 0; 1491 while (!list_empty(&mdev->data.work.q)) { 1492 list_splice_init(&mdev->data.work.q, &work_list); 1493 spin_unlock_irq(&mdev->data.work.q_lock); 1494 1495 while (!list_empty(&work_list)) { 1496 w = list_entry(work_list.next, struct drbd_work, list); 1497 list_del_init(&w->list); 1498 w->cb(mdev, w, 1); 1499 i++; /* dead debugging code */ 1500 } 1501 1502 spin_lock_irq(&mdev->data.work.q_lock); 1503 } 1504 sema_init(&mdev->data.work.s, 0); 1505 /* DANGEROUS race: if someone did queue his work within the spinlock, 1506 * but up() ed outside the spinlock, we could get an up() on the 1507 * semaphore without corresponding list entry. 1508 * So don't do that. 1509 */ 1510 spin_unlock_irq(&mdev->data.work.q_lock); 1511 1512 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE); 1513 /* _drbd_set_state only uses stop_nowait. 1514 * wait here for the Exiting receiver. */ 1515 drbd_thread_stop(&mdev->receiver); 1516 drbd_mdev_cleanup(mdev); 1517 1518 dev_info(DEV, "worker terminated\n"); 1519 1520 clear_bit(DEVICE_DYING, &mdev->flags); 1521 clear_bit(CONFIG_PENDING, &mdev->flags); 1522 wake_up(&mdev->state_wait); 1523 1524 return 0; 1525 } 1526