1 /* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26 #include <linux/module.h> 27 #include <linux/drbd.h> 28 #include <linux/sched/signal.h> 29 #include <linux/wait.h> 30 #include <linux/mm.h> 31 #include <linux/memcontrol.h> 32 #include <linux/mm_inline.h> 33 #include <linux/slab.h> 34 #include <linux/random.h> 35 #include <linux/string.h> 36 #include <linux/scatterlist.h> 37 38 #include "drbd_int.h" 39 #include "drbd_protocol.h" 40 #include "drbd_req.h" 41 42 static int make_ov_request(struct drbd_device *, int); 43 static int make_resync_request(struct drbd_device *, int); 44 45 /* endio handlers: 46 * drbd_md_endio (defined here) 47 * drbd_request_endio (defined here) 48 * drbd_peer_request_endio (defined here) 49 * drbd_bm_endio (defined in drbd_bitmap.c) 50 * 51 * For all these callbacks, note the following: 52 * The callbacks will be called in irq context by the IDE drivers, 53 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 54 * Try to get the locking right :) 55 * 56 */ 57 58 /* used for synchronous meta data and bitmap IO 59 * submitted by drbd_md_sync_page_io() 60 */ 61 void drbd_md_endio(struct bio *bio) 62 { 63 struct drbd_device *device; 64 65 device = bio->bi_private; 66 device->md_io.error = blk_status_to_errno(bio->bi_status); 67 68 /* special case: drbd_md_read() during drbd_adm_attach() */ 69 if (device->ldev) 70 put_ldev(device); 71 bio_put(bio); 72 73 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able 74 * to timeout on the lower level device, and eventually detach from it. 75 * If this io completion runs after that timeout expired, this 76 * drbd_md_put_buffer() may allow us to finally try and re-attach. 77 * During normal operation, this only puts that extra reference 78 * down to 1 again. 79 * Make sure we first drop the reference, and only then signal 80 * completion, or we may (in drbd_al_read_log()) cycle so fast into the 81 * next drbd_md_sync_page_io(), that we trigger the 82 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. 83 */ 84 drbd_md_put_buffer(device); 85 device->md_io.done = 1; 86 wake_up(&device->misc_wait); 87 } 88 89 /* reads on behalf of the partner, 90 * "submitted" by the receiver 91 */ 92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 93 { 94 unsigned long flags = 0; 95 struct drbd_peer_device *peer_device = peer_req->peer_device; 96 struct drbd_device *device = peer_device->device; 97 98 spin_lock_irqsave(&device->resource->req_lock, flags); 99 device->read_cnt += peer_req->i.size >> 9; 100 list_del(&peer_req->w.list); 101 if (list_empty(&device->read_ee)) 102 wake_up(&device->ee_wait); 103 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 104 __drbd_chk_io_error(device, DRBD_READ_ERROR); 105 spin_unlock_irqrestore(&device->resource->req_lock, flags); 106 107 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w); 108 put_ldev(device); 109 } 110 111 /* writes on behalf of the partner, or resync writes, 112 * "submitted" by the receiver, final stage. */ 113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 114 { 115 unsigned long flags = 0; 116 struct drbd_peer_device *peer_device = peer_req->peer_device; 117 struct drbd_device *device = peer_device->device; 118 struct drbd_connection *connection = peer_device->connection; 119 struct drbd_interval i; 120 int do_wake; 121 u64 block_id; 122 int do_al_complete_io; 123 124 /* after we moved peer_req to done_ee, 125 * we may no longer access it, 126 * it may be freed/reused already! 127 * (as soon as we release the req_lock) */ 128 i = peer_req->i; 129 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 130 block_id = peer_req->block_id; 131 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 132 133 if (peer_req->flags & EE_WAS_ERROR) { 134 /* In protocol != C, we usually do not send write acks. 135 * In case of a write error, send the neg ack anyways. */ 136 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags)) 137 inc_unacked(device); 138 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); 139 } 140 141 spin_lock_irqsave(&device->resource->req_lock, flags); 142 device->writ_cnt += peer_req->i.size >> 9; 143 list_move_tail(&peer_req->w.list, &device->done_ee); 144 145 /* 146 * Do not remove from the write_requests tree here: we did not send the 147 * Ack yet and did not wake possibly waiting conflicting requests. 148 * Removed from the tree from "drbd_process_done_ee" within the 149 * appropriate dw.cb (e_end_block/e_end_resync_block) or from 150 * _drbd_clear_done_ee. 151 */ 152 153 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee); 154 155 /* FIXME do we want to detach for failed REQ_OP_DISCARD? 156 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */ 157 if (peer_req->flags & EE_WAS_ERROR) 158 __drbd_chk_io_error(device, DRBD_WRITE_ERROR); 159 160 if (connection->cstate >= C_WF_REPORT_PARAMS) { 161 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */ 162 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work)) 163 kref_put(&device->kref, drbd_destroy_device); 164 } 165 spin_unlock_irqrestore(&device->resource->req_lock, flags); 166 167 if (block_id == ID_SYNCER) 168 drbd_rs_complete_io(device, i.sector); 169 170 if (do_wake) 171 wake_up(&device->ee_wait); 172 173 if (do_al_complete_io) 174 drbd_al_complete_io(device, &i); 175 176 put_ldev(device); 177 } 178 179 /* writes on behalf of the partner, or resync writes, 180 * "submitted" by the receiver. 181 */ 182 void drbd_peer_request_endio(struct bio *bio) 183 { 184 struct drbd_peer_request *peer_req = bio->bi_private; 185 struct drbd_device *device = peer_req->peer_device->device; 186 bool is_write = bio_data_dir(bio) == WRITE; 187 bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES || 188 bio_op(bio) == REQ_OP_DISCARD; 189 190 if (bio->bi_status && __ratelimit(&drbd_ratelimit_state)) 191 drbd_warn(device, "%s: error=%d s=%llus\n", 192 is_write ? (is_discard ? "discard" : "write") 193 : "read", bio->bi_status, 194 (unsigned long long)peer_req->i.sector); 195 196 if (bio->bi_status) 197 set_bit(__EE_WAS_ERROR, &peer_req->flags); 198 199 bio_put(bio); /* no need for the bio anymore */ 200 if (atomic_dec_and_test(&peer_req->pending_bios)) { 201 if (is_write) 202 drbd_endio_write_sec_final(peer_req); 203 else 204 drbd_endio_read_sec_final(peer_req); 205 } 206 } 207 208 static void 209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device) 210 { 211 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n", 212 device->minor, device->resource->name, device->vnr); 213 } 214 215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request 216 */ 217 void drbd_request_endio(struct bio *bio) 218 { 219 unsigned long flags; 220 struct drbd_request *req = bio->bi_private; 221 struct drbd_device *device = req->device; 222 struct bio_and_error m; 223 enum drbd_req_event what; 224 225 /* If this request was aborted locally before, 226 * but now was completed "successfully", 227 * chances are that this caused arbitrary data corruption. 228 * 229 * "aborting" requests, or force-detaching the disk, is intended for 230 * completely blocked/hung local backing devices which do no longer 231 * complete requests at all, not even do error completions. In this 232 * situation, usually a hard-reset and failover is the only way out. 233 * 234 * By "aborting", basically faking a local error-completion, 235 * we allow for a more graceful swichover by cleanly migrating services. 236 * Still the affected node has to be rebooted "soon". 237 * 238 * By completing these requests, we allow the upper layers to re-use 239 * the associated data pages. 240 * 241 * If later the local backing device "recovers", and now DMAs some data 242 * from disk into the original request pages, in the best case it will 243 * just put random data into unused pages; but typically it will corrupt 244 * meanwhile completely unrelated data, causing all sorts of damage. 245 * 246 * Which means delayed successful completion, 247 * especially for READ requests, 248 * is a reason to panic(). 249 * 250 * We assume that a delayed *error* completion is OK, 251 * though we still will complain noisily about it. 252 */ 253 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) { 254 if (__ratelimit(&drbd_ratelimit_state)) 255 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n"); 256 257 if (!bio->bi_status) 258 drbd_panic_after_delayed_completion_of_aborted_request(device); 259 } 260 261 /* to avoid recursion in __req_mod */ 262 if (unlikely(bio->bi_status)) { 263 switch (bio_op(bio)) { 264 case REQ_OP_WRITE_ZEROES: 265 case REQ_OP_DISCARD: 266 if (bio->bi_status == BLK_STS_NOTSUPP) 267 what = DISCARD_COMPLETED_NOTSUPP; 268 else 269 what = DISCARD_COMPLETED_WITH_ERROR; 270 break; 271 case REQ_OP_READ: 272 if (bio->bi_opf & REQ_RAHEAD) 273 what = READ_AHEAD_COMPLETED_WITH_ERROR; 274 else 275 what = READ_COMPLETED_WITH_ERROR; 276 break; 277 default: 278 what = WRITE_COMPLETED_WITH_ERROR; 279 break; 280 } 281 } else { 282 what = COMPLETED_OK; 283 } 284 285 req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status)); 286 bio_put(bio); 287 288 /* not req_mod(), we need irqsave here! */ 289 spin_lock_irqsave(&device->resource->req_lock, flags); 290 __req_mod(req, what, &m); 291 spin_unlock_irqrestore(&device->resource->req_lock, flags); 292 put_ldev(device); 293 294 if (m.bio) 295 complete_master_bio(device, &m); 296 } 297 298 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest) 299 { 300 SHASH_DESC_ON_STACK(desc, tfm); 301 struct page *page = peer_req->pages; 302 struct page *tmp; 303 unsigned len; 304 void *src; 305 306 desc->tfm = tfm; 307 desc->flags = 0; 308 309 crypto_shash_init(desc); 310 311 src = kmap_atomic(page); 312 while ((tmp = page_chain_next(page))) { 313 /* all but the last page will be fully used */ 314 crypto_shash_update(desc, src, PAGE_SIZE); 315 kunmap_atomic(src); 316 page = tmp; 317 src = kmap_atomic(page); 318 } 319 /* and now the last, possibly only partially used page */ 320 len = peer_req->i.size & (PAGE_SIZE - 1); 321 crypto_shash_update(desc, src, len ?: PAGE_SIZE); 322 kunmap_atomic(src); 323 324 crypto_shash_final(desc, digest); 325 shash_desc_zero(desc); 326 } 327 328 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest) 329 { 330 SHASH_DESC_ON_STACK(desc, tfm); 331 struct bio_vec bvec; 332 struct bvec_iter iter; 333 334 desc->tfm = tfm; 335 desc->flags = 0; 336 337 crypto_shash_init(desc); 338 339 bio_for_each_segment(bvec, bio, iter) { 340 u8 *src; 341 342 src = kmap_atomic(bvec.bv_page); 343 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len); 344 kunmap_atomic(src); 345 346 /* REQ_OP_WRITE_SAME has only one segment, 347 * checksum the payload only once. */ 348 if (bio_op(bio) == REQ_OP_WRITE_SAME) 349 break; 350 } 351 crypto_shash_final(desc, digest); 352 shash_desc_zero(desc); 353 } 354 355 /* MAYBE merge common code with w_e_end_ov_req */ 356 static int w_e_send_csum(struct drbd_work *w, int cancel) 357 { 358 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 359 struct drbd_peer_device *peer_device = peer_req->peer_device; 360 struct drbd_device *device = peer_device->device; 361 int digest_size; 362 void *digest; 363 int err = 0; 364 365 if (unlikely(cancel)) 366 goto out; 367 368 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 369 goto out; 370 371 digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm); 372 digest = kmalloc(digest_size, GFP_NOIO); 373 if (digest) { 374 sector_t sector = peer_req->i.sector; 375 unsigned int size = peer_req->i.size; 376 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 377 /* Free peer_req and pages before send. 378 * In case we block on congestion, we could otherwise run into 379 * some distributed deadlock, if the other side blocks on 380 * congestion as well, because our receiver blocks in 381 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 382 drbd_free_peer_req(device, peer_req); 383 peer_req = NULL; 384 inc_rs_pending(device); 385 err = drbd_send_drequest_csum(peer_device, sector, size, 386 digest, digest_size, 387 P_CSUM_RS_REQUEST); 388 kfree(digest); 389 } else { 390 drbd_err(device, "kmalloc() of digest failed.\n"); 391 err = -ENOMEM; 392 } 393 394 out: 395 if (peer_req) 396 drbd_free_peer_req(device, peer_req); 397 398 if (unlikely(err)) 399 drbd_err(device, "drbd_send_drequest(..., csum) failed\n"); 400 return err; 401 } 402 403 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 404 405 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size) 406 { 407 struct drbd_device *device = peer_device->device; 408 struct drbd_peer_request *peer_req; 409 410 if (!get_ldev(device)) 411 return -EIO; 412 413 /* GFP_TRY, because if there is no memory available right now, this may 414 * be rescheduled for later. It is "only" background resync, after all. */ 415 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, 416 size, size, GFP_TRY); 417 if (!peer_req) 418 goto defer; 419 420 peer_req->w.cb = w_e_send_csum; 421 spin_lock_irq(&device->resource->req_lock); 422 list_add_tail(&peer_req->w.list, &device->read_ee); 423 spin_unlock_irq(&device->resource->req_lock); 424 425 atomic_add(size >> 9, &device->rs_sect_ev); 426 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0, 427 DRBD_FAULT_RS_RD) == 0) 428 return 0; 429 430 /* If it failed because of ENOMEM, retry should help. If it failed 431 * because bio_add_page failed (probably broken lower level driver), 432 * retry may or may not help. 433 * If it does not, you may need to force disconnect. */ 434 spin_lock_irq(&device->resource->req_lock); 435 list_del(&peer_req->w.list); 436 spin_unlock_irq(&device->resource->req_lock); 437 438 drbd_free_peer_req(device, peer_req); 439 defer: 440 put_ldev(device); 441 return -EAGAIN; 442 } 443 444 int w_resync_timer(struct drbd_work *w, int cancel) 445 { 446 struct drbd_device *device = 447 container_of(w, struct drbd_device, resync_work); 448 449 switch (device->state.conn) { 450 case C_VERIFY_S: 451 make_ov_request(device, cancel); 452 break; 453 case C_SYNC_TARGET: 454 make_resync_request(device, cancel); 455 break; 456 } 457 458 return 0; 459 } 460 461 void resync_timer_fn(struct timer_list *t) 462 { 463 struct drbd_device *device = from_timer(device, t, resync_timer); 464 465 drbd_queue_work_if_unqueued( 466 &first_peer_device(device)->connection->sender_work, 467 &device->resync_work); 468 } 469 470 static void fifo_set(struct fifo_buffer *fb, int value) 471 { 472 int i; 473 474 for (i = 0; i < fb->size; i++) 475 fb->values[i] = value; 476 } 477 478 static int fifo_push(struct fifo_buffer *fb, int value) 479 { 480 int ov; 481 482 ov = fb->values[fb->head_index]; 483 fb->values[fb->head_index++] = value; 484 485 if (fb->head_index >= fb->size) 486 fb->head_index = 0; 487 488 return ov; 489 } 490 491 static void fifo_add_val(struct fifo_buffer *fb, int value) 492 { 493 int i; 494 495 for (i = 0; i < fb->size; i++) 496 fb->values[i] += value; 497 } 498 499 struct fifo_buffer *fifo_alloc(int fifo_size) 500 { 501 struct fifo_buffer *fb; 502 503 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO); 504 if (!fb) 505 return NULL; 506 507 fb->head_index = 0; 508 fb->size = fifo_size; 509 fb->total = 0; 510 511 return fb; 512 } 513 514 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in) 515 { 516 struct disk_conf *dc; 517 unsigned int want; /* The number of sectors we want in-flight */ 518 int req_sect; /* Number of sectors to request in this turn */ 519 int correction; /* Number of sectors more we need in-flight */ 520 int cps; /* correction per invocation of drbd_rs_controller() */ 521 int steps; /* Number of time steps to plan ahead */ 522 int curr_corr; 523 int max_sect; 524 struct fifo_buffer *plan; 525 526 dc = rcu_dereference(device->ldev->disk_conf); 527 plan = rcu_dereference(device->rs_plan_s); 528 529 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 530 531 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */ 532 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps; 533 } else { /* normal path */ 534 want = dc->c_fill_target ? dc->c_fill_target : 535 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10); 536 } 537 538 correction = want - device->rs_in_flight - plan->total; 539 540 /* Plan ahead */ 541 cps = correction / steps; 542 fifo_add_val(plan, cps); 543 plan->total += cps * steps; 544 545 /* What we do in this step */ 546 curr_corr = fifo_push(plan, 0); 547 plan->total -= curr_corr; 548 549 req_sect = sect_in + curr_corr; 550 if (req_sect < 0) 551 req_sect = 0; 552 553 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ; 554 if (req_sect > max_sect) 555 req_sect = max_sect; 556 557 /* 558 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 559 sect_in, device->rs_in_flight, want, correction, 560 steps, cps, device->rs_planed, curr_corr, req_sect); 561 */ 562 563 return req_sect; 564 } 565 566 static int drbd_rs_number_requests(struct drbd_device *device) 567 { 568 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 569 int number, mxb; 570 571 sect_in = atomic_xchg(&device->rs_sect_in, 0); 572 device->rs_in_flight -= sect_in; 573 574 rcu_read_lock(); 575 mxb = drbd_get_max_buffers(device) / 2; 576 if (rcu_dereference(device->rs_plan_s)->size) { 577 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9); 578 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 579 } else { 580 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate; 581 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 582 } 583 rcu_read_unlock(); 584 585 /* Don't have more than "max-buffers"/2 in-flight. 586 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(), 587 * potentially causing a distributed deadlock on congestion during 588 * online-verify or (checksum-based) resync, if max-buffers, 589 * socket buffer sizes and resync rate settings are mis-configured. */ 590 591 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k), 592 * mxb (as used here, and in drbd_alloc_pages on the peer) is 593 * "number of pages" (typically also 4k), 594 * but "rs_in_flight" is in "sectors" (512 Byte). */ 595 if (mxb - device->rs_in_flight/8 < number) 596 number = mxb - device->rs_in_flight/8; 597 598 return number; 599 } 600 601 static int make_resync_request(struct drbd_device *const device, int cancel) 602 { 603 struct drbd_peer_device *const peer_device = first_peer_device(device); 604 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; 605 unsigned long bit; 606 sector_t sector; 607 const sector_t capacity = drbd_get_capacity(device->this_bdev); 608 int max_bio_size; 609 int number, rollback_i, size; 610 int align, requeue = 0; 611 int i = 0; 612 int discard_granularity = 0; 613 614 if (unlikely(cancel)) 615 return 0; 616 617 if (device->rs_total == 0) { 618 /* empty resync? */ 619 drbd_resync_finished(device); 620 return 0; 621 } 622 623 if (!get_ldev(device)) { 624 /* Since we only need to access device->rsync a 625 get_ldev_if_state(device,D_FAILED) would be sufficient, but 626 to continue resync with a broken disk makes no sense at 627 all */ 628 drbd_err(device, "Disk broke down during resync!\n"); 629 return 0; 630 } 631 632 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) { 633 rcu_read_lock(); 634 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity; 635 rcu_read_unlock(); 636 } 637 638 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9; 639 number = drbd_rs_number_requests(device); 640 if (number <= 0) 641 goto requeue; 642 643 for (i = 0; i < number; i++) { 644 /* Stop generating RS requests when half of the send buffer is filled, 645 * but notify TCP that we'd like to have more space. */ 646 mutex_lock(&connection->data.mutex); 647 if (connection->data.socket) { 648 struct sock *sk = connection->data.socket->sk; 649 int queued = sk->sk_wmem_queued; 650 int sndbuf = sk->sk_sndbuf; 651 if (queued > sndbuf / 2) { 652 requeue = 1; 653 if (sk->sk_socket) 654 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 655 } 656 } else 657 requeue = 1; 658 mutex_unlock(&connection->data.mutex); 659 if (requeue) 660 goto requeue; 661 662 next_sector: 663 size = BM_BLOCK_SIZE; 664 bit = drbd_bm_find_next(device, device->bm_resync_fo); 665 666 if (bit == DRBD_END_OF_BITMAP) { 667 device->bm_resync_fo = drbd_bm_bits(device); 668 put_ldev(device); 669 return 0; 670 } 671 672 sector = BM_BIT_TO_SECT(bit); 673 674 if (drbd_try_rs_begin_io(device, sector)) { 675 device->bm_resync_fo = bit; 676 goto requeue; 677 } 678 device->bm_resync_fo = bit + 1; 679 680 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) { 681 drbd_rs_complete_io(device, sector); 682 goto next_sector; 683 } 684 685 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE 686 /* try to find some adjacent bits. 687 * we stop if we have already the maximum req size. 688 * 689 * Additionally always align bigger requests, in order to 690 * be prepared for all stripe sizes of software RAIDs. 691 */ 692 align = 1; 693 rollback_i = i; 694 while (i < number) { 695 if (size + BM_BLOCK_SIZE > max_bio_size) 696 break; 697 698 /* Be always aligned */ 699 if (sector & ((1<<(align+3))-1)) 700 break; 701 702 if (discard_granularity && size == discard_granularity) 703 break; 704 705 /* do not cross extent boundaries */ 706 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 707 break; 708 /* now, is it actually dirty, after all? 709 * caution, drbd_bm_test_bit is tri-state for some 710 * obscure reason; ( b == 0 ) would get the out-of-band 711 * only accidentally right because of the "oddly sized" 712 * adjustment below */ 713 if (drbd_bm_test_bit(device, bit+1) != 1) 714 break; 715 bit++; 716 size += BM_BLOCK_SIZE; 717 if ((BM_BLOCK_SIZE << align) <= size) 718 align++; 719 i++; 720 } 721 /* if we merged some, 722 * reset the offset to start the next drbd_bm_find_next from */ 723 if (size > BM_BLOCK_SIZE) 724 device->bm_resync_fo = bit + 1; 725 #endif 726 727 /* adjust very last sectors, in case we are oddly sized */ 728 if (sector + (size>>9) > capacity) 729 size = (capacity-sector)<<9; 730 731 if (device->use_csums) { 732 switch (read_for_csum(peer_device, sector, size)) { 733 case -EIO: /* Disk failure */ 734 put_ldev(device); 735 return -EIO; 736 case -EAGAIN: /* allocation failed, or ldev busy */ 737 drbd_rs_complete_io(device, sector); 738 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 739 i = rollback_i; 740 goto requeue; 741 case 0: 742 /* everything ok */ 743 break; 744 default: 745 BUG(); 746 } 747 } else { 748 int err; 749 750 inc_rs_pending(device); 751 err = drbd_send_drequest(peer_device, 752 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST, 753 sector, size, ID_SYNCER); 754 if (err) { 755 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 756 dec_rs_pending(device); 757 put_ldev(device); 758 return err; 759 } 760 } 761 } 762 763 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 764 /* last syncer _request_ was sent, 765 * but the P_RS_DATA_REPLY not yet received. sync will end (and 766 * next sync group will resume), as soon as we receive the last 767 * resync data block, and the last bit is cleared. 768 * until then resync "work" is "inactive" ... 769 */ 770 put_ldev(device); 771 return 0; 772 } 773 774 requeue: 775 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 776 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 777 put_ldev(device); 778 return 0; 779 } 780 781 static int make_ov_request(struct drbd_device *device, int cancel) 782 { 783 int number, i, size; 784 sector_t sector; 785 const sector_t capacity = drbd_get_capacity(device->this_bdev); 786 bool stop_sector_reached = false; 787 788 if (unlikely(cancel)) 789 return 1; 790 791 number = drbd_rs_number_requests(device); 792 793 sector = device->ov_position; 794 for (i = 0; i < number; i++) { 795 if (sector >= capacity) 796 return 1; 797 798 /* We check for "finished" only in the reply path: 799 * w_e_end_ov_reply(). 800 * We need to send at least one request out. */ 801 stop_sector_reached = i > 0 802 && verify_can_do_stop_sector(device) 803 && sector >= device->ov_stop_sector; 804 if (stop_sector_reached) 805 break; 806 807 size = BM_BLOCK_SIZE; 808 809 if (drbd_try_rs_begin_io(device, sector)) { 810 device->ov_position = sector; 811 goto requeue; 812 } 813 814 if (sector + (size>>9) > capacity) 815 size = (capacity-sector)<<9; 816 817 inc_rs_pending(device); 818 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 819 dec_rs_pending(device); 820 return 0; 821 } 822 sector += BM_SECT_PER_BIT; 823 } 824 device->ov_position = sector; 825 826 requeue: 827 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 828 if (i == 0 || !stop_sector_reached) 829 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 830 return 1; 831 } 832 833 int w_ov_finished(struct drbd_work *w, int cancel) 834 { 835 struct drbd_device_work *dw = 836 container_of(w, struct drbd_device_work, w); 837 struct drbd_device *device = dw->device; 838 kfree(dw); 839 ov_out_of_sync_print(device); 840 drbd_resync_finished(device); 841 842 return 0; 843 } 844 845 static int w_resync_finished(struct drbd_work *w, int cancel) 846 { 847 struct drbd_device_work *dw = 848 container_of(w, struct drbd_device_work, w); 849 struct drbd_device *device = dw->device; 850 kfree(dw); 851 852 drbd_resync_finished(device); 853 854 return 0; 855 } 856 857 static void ping_peer(struct drbd_device *device) 858 { 859 struct drbd_connection *connection = first_peer_device(device)->connection; 860 861 clear_bit(GOT_PING_ACK, &connection->flags); 862 request_ping(connection); 863 wait_event(connection->ping_wait, 864 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 865 } 866 867 int drbd_resync_finished(struct drbd_device *device) 868 { 869 struct drbd_connection *connection = first_peer_device(device)->connection; 870 unsigned long db, dt, dbdt; 871 unsigned long n_oos; 872 union drbd_state os, ns; 873 struct drbd_device_work *dw; 874 char *khelper_cmd = NULL; 875 int verify_done = 0; 876 877 /* Remove all elements from the resync LRU. Since future actions 878 * might set bits in the (main) bitmap, then the entries in the 879 * resync LRU would be wrong. */ 880 if (drbd_rs_del_all(device)) { 881 /* In case this is not possible now, most probably because 882 * there are P_RS_DATA_REPLY Packets lingering on the worker's 883 * queue (or even the read operations for those packets 884 * is not finished by now). Retry in 100ms. */ 885 886 schedule_timeout_interruptible(HZ / 10); 887 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 888 if (dw) { 889 dw->w.cb = w_resync_finished; 890 dw->device = device; 891 drbd_queue_work(&connection->sender_work, &dw->w); 892 return 1; 893 } 894 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 895 } 896 897 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 898 if (dt <= 0) 899 dt = 1; 900 901 db = device->rs_total; 902 /* adjust for verify start and stop sectors, respective reached position */ 903 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 904 db -= device->ov_left; 905 906 dbdt = Bit2KB(db/dt); 907 device->rs_paused /= HZ; 908 909 if (!get_ldev(device)) 910 goto out; 911 912 ping_peer(device); 913 914 spin_lock_irq(&device->resource->req_lock); 915 os = drbd_read_state(device); 916 917 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 918 919 /* This protects us against multiple calls (that can happen in the presence 920 of application IO), and against connectivity loss just before we arrive here. */ 921 if (os.conn <= C_CONNECTED) 922 goto out_unlock; 923 924 ns = os; 925 ns.conn = C_CONNECTED; 926 927 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 928 verify_done ? "Online verify" : "Resync", 929 dt + device->rs_paused, device->rs_paused, dbdt); 930 931 n_oos = drbd_bm_total_weight(device); 932 933 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 934 if (n_oos) { 935 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 936 n_oos, Bit2KB(1)); 937 khelper_cmd = "out-of-sync"; 938 } 939 } else { 940 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 941 942 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 943 khelper_cmd = "after-resync-target"; 944 945 if (device->use_csums && device->rs_total) { 946 const unsigned long s = device->rs_same_csum; 947 const unsigned long t = device->rs_total; 948 const int ratio = 949 (t == 0) ? 0 : 950 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 951 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 952 "transferred %luK total %luK\n", 953 ratio, 954 Bit2KB(device->rs_same_csum), 955 Bit2KB(device->rs_total - device->rs_same_csum), 956 Bit2KB(device->rs_total)); 957 } 958 } 959 960 if (device->rs_failed) { 961 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 962 963 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 964 ns.disk = D_INCONSISTENT; 965 ns.pdsk = D_UP_TO_DATE; 966 } else { 967 ns.disk = D_UP_TO_DATE; 968 ns.pdsk = D_INCONSISTENT; 969 } 970 } else { 971 ns.disk = D_UP_TO_DATE; 972 ns.pdsk = D_UP_TO_DATE; 973 974 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 975 if (device->p_uuid) { 976 int i; 977 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 978 _drbd_uuid_set(device, i, device->p_uuid[i]); 979 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 980 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 981 } else { 982 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 983 } 984 } 985 986 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 987 /* for verify runs, we don't update uuids here, 988 * so there would be nothing to report. */ 989 drbd_uuid_set_bm(device, 0UL); 990 drbd_print_uuids(device, "updated UUIDs"); 991 if (device->p_uuid) { 992 /* Now the two UUID sets are equal, update what we 993 * know of the peer. */ 994 int i; 995 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 996 device->p_uuid[i] = device->ldev->md.uuid[i]; 997 } 998 } 999 } 1000 1001 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 1002 out_unlock: 1003 spin_unlock_irq(&device->resource->req_lock); 1004 1005 /* If we have been sync source, and have an effective fencing-policy, 1006 * once *all* volumes are back in sync, call "unfence". */ 1007 if (os.conn == C_SYNC_SOURCE) { 1008 enum drbd_disk_state disk_state = D_MASK; 1009 enum drbd_disk_state pdsk_state = D_MASK; 1010 enum drbd_fencing_p fp = FP_DONT_CARE; 1011 1012 rcu_read_lock(); 1013 fp = rcu_dereference(device->ldev->disk_conf)->fencing; 1014 if (fp != FP_DONT_CARE) { 1015 struct drbd_peer_device *peer_device; 1016 int vnr; 1017 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1018 struct drbd_device *device = peer_device->device; 1019 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk); 1020 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk); 1021 } 1022 } 1023 rcu_read_unlock(); 1024 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE) 1025 conn_khelper(connection, "unfence-peer"); 1026 } 1027 1028 put_ldev(device); 1029 out: 1030 device->rs_total = 0; 1031 device->rs_failed = 0; 1032 device->rs_paused = 0; 1033 1034 /* reset start sector, if we reached end of device */ 1035 if (verify_done && device->ov_left == 0) 1036 device->ov_start_sector = 0; 1037 1038 drbd_md_sync(device); 1039 1040 if (khelper_cmd) 1041 drbd_khelper(device, khelper_cmd); 1042 1043 return 1; 1044 } 1045 1046 /* helper */ 1047 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 1048 { 1049 if (drbd_peer_req_has_active_page(peer_req)) { 1050 /* This might happen if sendpage() has not finished */ 1051 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 1052 atomic_add(i, &device->pp_in_use_by_net); 1053 atomic_sub(i, &device->pp_in_use); 1054 spin_lock_irq(&device->resource->req_lock); 1055 list_add_tail(&peer_req->w.list, &device->net_ee); 1056 spin_unlock_irq(&device->resource->req_lock); 1057 wake_up(&drbd_pp_wait); 1058 } else 1059 drbd_free_peer_req(device, peer_req); 1060 } 1061 1062 /** 1063 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 1064 * @w: work object. 1065 * @cancel: The connection will be closed anyways 1066 */ 1067 int w_e_end_data_req(struct drbd_work *w, int cancel) 1068 { 1069 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1070 struct drbd_peer_device *peer_device = peer_req->peer_device; 1071 struct drbd_device *device = peer_device->device; 1072 int err; 1073 1074 if (unlikely(cancel)) { 1075 drbd_free_peer_req(device, peer_req); 1076 dec_unacked(device); 1077 return 0; 1078 } 1079 1080 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1081 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req); 1082 } else { 1083 if (__ratelimit(&drbd_ratelimit_state)) 1084 drbd_err(device, "Sending NegDReply. sector=%llus.\n", 1085 (unsigned long long)peer_req->i.sector); 1086 1087 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req); 1088 } 1089 1090 dec_unacked(device); 1091 1092 move_to_net_ee_or_free(device, peer_req); 1093 1094 if (unlikely(err)) 1095 drbd_err(device, "drbd_send_block() failed\n"); 1096 return err; 1097 } 1098 1099 static bool all_zero(struct drbd_peer_request *peer_req) 1100 { 1101 struct page *page = peer_req->pages; 1102 unsigned int len = peer_req->i.size; 1103 1104 page_chain_for_each(page) { 1105 unsigned int l = min_t(unsigned int, len, PAGE_SIZE); 1106 unsigned int i, words = l / sizeof(long); 1107 unsigned long *d; 1108 1109 d = kmap_atomic(page); 1110 for (i = 0; i < words; i++) { 1111 if (d[i]) { 1112 kunmap_atomic(d); 1113 return false; 1114 } 1115 } 1116 kunmap_atomic(d); 1117 len -= l; 1118 } 1119 1120 return true; 1121 } 1122 1123 /** 1124 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1125 * @w: work object. 1126 * @cancel: The connection will be closed anyways 1127 */ 1128 int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1129 { 1130 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1131 struct drbd_peer_device *peer_device = peer_req->peer_device; 1132 struct drbd_device *device = peer_device->device; 1133 int err; 1134 1135 if (unlikely(cancel)) { 1136 drbd_free_peer_req(device, peer_req); 1137 dec_unacked(device); 1138 return 0; 1139 } 1140 1141 if (get_ldev_if_state(device, D_FAILED)) { 1142 drbd_rs_complete_io(device, peer_req->i.sector); 1143 put_ldev(device); 1144 } 1145 1146 if (device->state.conn == C_AHEAD) { 1147 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req); 1148 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1149 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1150 inc_rs_pending(device); 1151 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req)) 1152 err = drbd_send_rs_deallocated(peer_device, peer_req); 1153 else 1154 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1155 } else { 1156 if (__ratelimit(&drbd_ratelimit_state)) 1157 drbd_err(device, "Not sending RSDataReply, " 1158 "partner DISKLESS!\n"); 1159 err = 0; 1160 } 1161 } else { 1162 if (__ratelimit(&drbd_ratelimit_state)) 1163 drbd_err(device, "Sending NegRSDReply. sector %llus.\n", 1164 (unsigned long long)peer_req->i.sector); 1165 1166 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1167 1168 /* update resync data with failure */ 1169 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1170 } 1171 1172 dec_unacked(device); 1173 1174 move_to_net_ee_or_free(device, peer_req); 1175 1176 if (unlikely(err)) 1177 drbd_err(device, "drbd_send_block() failed\n"); 1178 return err; 1179 } 1180 1181 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1182 { 1183 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1184 struct drbd_peer_device *peer_device = peer_req->peer_device; 1185 struct drbd_device *device = peer_device->device; 1186 struct digest_info *di; 1187 int digest_size; 1188 void *digest = NULL; 1189 int err, eq = 0; 1190 1191 if (unlikely(cancel)) { 1192 drbd_free_peer_req(device, peer_req); 1193 dec_unacked(device); 1194 return 0; 1195 } 1196 1197 if (get_ldev(device)) { 1198 drbd_rs_complete_io(device, peer_req->i.sector); 1199 put_ldev(device); 1200 } 1201 1202 di = peer_req->digest; 1203 1204 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1205 /* quick hack to try to avoid a race against reconfiguration. 1206 * a real fix would be much more involved, 1207 * introducing more locking mechanisms */ 1208 if (peer_device->connection->csums_tfm) { 1209 digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm); 1210 D_ASSERT(device, digest_size == di->digest_size); 1211 digest = kmalloc(digest_size, GFP_NOIO); 1212 } 1213 if (digest) { 1214 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 1215 eq = !memcmp(digest, di->digest, digest_size); 1216 kfree(digest); 1217 } 1218 1219 if (eq) { 1220 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1221 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1222 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1223 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req); 1224 } else { 1225 inc_rs_pending(device); 1226 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1227 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1228 kfree(di); 1229 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1230 } 1231 } else { 1232 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1233 if (__ratelimit(&drbd_ratelimit_state)) 1234 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n"); 1235 } 1236 1237 dec_unacked(device); 1238 move_to_net_ee_or_free(device, peer_req); 1239 1240 if (unlikely(err)) 1241 drbd_err(device, "drbd_send_block/ack() failed\n"); 1242 return err; 1243 } 1244 1245 int w_e_end_ov_req(struct drbd_work *w, int cancel) 1246 { 1247 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1248 struct drbd_peer_device *peer_device = peer_req->peer_device; 1249 struct drbd_device *device = peer_device->device; 1250 sector_t sector = peer_req->i.sector; 1251 unsigned int size = peer_req->i.size; 1252 int digest_size; 1253 void *digest; 1254 int err = 0; 1255 1256 if (unlikely(cancel)) 1257 goto out; 1258 1259 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm); 1260 digest = kmalloc(digest_size, GFP_NOIO); 1261 if (!digest) { 1262 err = 1; /* terminate the connection in case the allocation failed */ 1263 goto out; 1264 } 1265 1266 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1267 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1268 else 1269 memset(digest, 0, digest_size); 1270 1271 /* Free e and pages before send. 1272 * In case we block on congestion, we could otherwise run into 1273 * some distributed deadlock, if the other side blocks on 1274 * congestion as well, because our receiver blocks in 1275 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1276 drbd_free_peer_req(device, peer_req); 1277 peer_req = NULL; 1278 inc_rs_pending(device); 1279 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY); 1280 if (err) 1281 dec_rs_pending(device); 1282 kfree(digest); 1283 1284 out: 1285 if (peer_req) 1286 drbd_free_peer_req(device, peer_req); 1287 dec_unacked(device); 1288 return err; 1289 } 1290 1291 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1292 { 1293 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1294 device->ov_last_oos_size += size>>9; 1295 } else { 1296 device->ov_last_oos_start = sector; 1297 device->ov_last_oos_size = size>>9; 1298 } 1299 drbd_set_out_of_sync(device, sector, size); 1300 } 1301 1302 int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1303 { 1304 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1305 struct drbd_peer_device *peer_device = peer_req->peer_device; 1306 struct drbd_device *device = peer_device->device; 1307 struct digest_info *di; 1308 void *digest; 1309 sector_t sector = peer_req->i.sector; 1310 unsigned int size = peer_req->i.size; 1311 int digest_size; 1312 int err, eq = 0; 1313 bool stop_sector_reached = false; 1314 1315 if (unlikely(cancel)) { 1316 drbd_free_peer_req(device, peer_req); 1317 dec_unacked(device); 1318 return 0; 1319 } 1320 1321 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1322 * the resync lru has been cleaned up already */ 1323 if (get_ldev(device)) { 1324 drbd_rs_complete_io(device, peer_req->i.sector); 1325 put_ldev(device); 1326 } 1327 1328 di = peer_req->digest; 1329 1330 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1331 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm); 1332 digest = kmalloc(digest_size, GFP_NOIO); 1333 if (digest) { 1334 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1335 1336 D_ASSERT(device, digest_size == di->digest_size); 1337 eq = !memcmp(digest, di->digest, digest_size); 1338 kfree(digest); 1339 } 1340 } 1341 1342 /* Free peer_req and pages before send. 1343 * In case we block on congestion, we could otherwise run into 1344 * some distributed deadlock, if the other side blocks on 1345 * congestion as well, because our receiver blocks in 1346 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1347 drbd_free_peer_req(device, peer_req); 1348 if (!eq) 1349 drbd_ov_out_of_sync_found(device, sector, size); 1350 else 1351 ov_out_of_sync_print(device); 1352 1353 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, 1354 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1355 1356 dec_unacked(device); 1357 1358 --device->ov_left; 1359 1360 /* let's advance progress step marks only for every other megabyte */ 1361 if ((device->ov_left & 0x200) == 0x200) 1362 drbd_advance_rs_marks(device, device->ov_left); 1363 1364 stop_sector_reached = verify_can_do_stop_sector(device) && 1365 (sector + (size>>9)) >= device->ov_stop_sector; 1366 1367 if (device->ov_left == 0 || stop_sector_reached) { 1368 ov_out_of_sync_print(device); 1369 drbd_resync_finished(device); 1370 } 1371 1372 return err; 1373 } 1374 1375 /* FIXME 1376 * We need to track the number of pending barrier acks, 1377 * and to be able to wait for them. 1378 * See also comment in drbd_adm_attach before drbd_suspend_io. 1379 */ 1380 static int drbd_send_barrier(struct drbd_connection *connection) 1381 { 1382 struct p_barrier *p; 1383 struct drbd_socket *sock; 1384 1385 sock = &connection->data; 1386 p = conn_prepare_command(connection, sock); 1387 if (!p) 1388 return -EIO; 1389 p->barrier = connection->send.current_epoch_nr; 1390 p->pad = 0; 1391 connection->send.current_epoch_writes = 0; 1392 connection->send.last_sent_barrier_jif = jiffies; 1393 1394 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1395 } 1396 1397 static int pd_send_unplug_remote(struct drbd_peer_device *pd) 1398 { 1399 struct drbd_socket *sock = &pd->connection->data; 1400 if (!drbd_prepare_command(pd, sock)) 1401 return -EIO; 1402 return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1403 } 1404 1405 int w_send_write_hint(struct drbd_work *w, int cancel) 1406 { 1407 struct drbd_device *device = 1408 container_of(w, struct drbd_device, unplug_work); 1409 1410 if (cancel) 1411 return 0; 1412 return pd_send_unplug_remote(first_peer_device(device)); 1413 } 1414 1415 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1416 { 1417 if (!connection->send.seen_any_write_yet) { 1418 connection->send.seen_any_write_yet = true; 1419 connection->send.current_epoch_nr = epoch; 1420 connection->send.current_epoch_writes = 0; 1421 connection->send.last_sent_barrier_jif = jiffies; 1422 } 1423 } 1424 1425 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1426 { 1427 /* re-init if first write on this connection */ 1428 if (!connection->send.seen_any_write_yet) 1429 return; 1430 if (connection->send.current_epoch_nr != epoch) { 1431 if (connection->send.current_epoch_writes) 1432 drbd_send_barrier(connection); 1433 connection->send.current_epoch_nr = epoch; 1434 } 1435 } 1436 1437 int w_send_out_of_sync(struct drbd_work *w, int cancel) 1438 { 1439 struct drbd_request *req = container_of(w, struct drbd_request, w); 1440 struct drbd_device *device = req->device; 1441 struct drbd_peer_device *const peer_device = first_peer_device(device); 1442 struct drbd_connection *const connection = peer_device->connection; 1443 int err; 1444 1445 if (unlikely(cancel)) { 1446 req_mod(req, SEND_CANCELED); 1447 return 0; 1448 } 1449 req->pre_send_jif = jiffies; 1450 1451 /* this time, no connection->send.current_epoch_writes++; 1452 * If it was sent, it was the closing barrier for the last 1453 * replicated epoch, before we went into AHEAD mode. 1454 * No more barriers will be sent, until we leave AHEAD mode again. */ 1455 maybe_send_barrier(connection, req->epoch); 1456 1457 err = drbd_send_out_of_sync(peer_device, req); 1458 req_mod(req, OOS_HANDED_TO_NETWORK); 1459 1460 return err; 1461 } 1462 1463 /** 1464 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1465 * @w: work object. 1466 * @cancel: The connection will be closed anyways 1467 */ 1468 int w_send_dblock(struct drbd_work *w, int cancel) 1469 { 1470 struct drbd_request *req = container_of(w, struct drbd_request, w); 1471 struct drbd_device *device = req->device; 1472 struct drbd_peer_device *const peer_device = first_peer_device(device); 1473 struct drbd_connection *connection = peer_device->connection; 1474 bool do_send_unplug = req->rq_state & RQ_UNPLUG; 1475 int err; 1476 1477 if (unlikely(cancel)) { 1478 req_mod(req, SEND_CANCELED); 1479 return 0; 1480 } 1481 req->pre_send_jif = jiffies; 1482 1483 re_init_if_first_write(connection, req->epoch); 1484 maybe_send_barrier(connection, req->epoch); 1485 connection->send.current_epoch_writes++; 1486 1487 err = drbd_send_dblock(peer_device, req); 1488 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1489 1490 if (do_send_unplug && !err) 1491 pd_send_unplug_remote(peer_device); 1492 1493 return err; 1494 } 1495 1496 /** 1497 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1498 * @w: work object. 1499 * @cancel: The connection will be closed anyways 1500 */ 1501 int w_send_read_req(struct drbd_work *w, int cancel) 1502 { 1503 struct drbd_request *req = container_of(w, struct drbd_request, w); 1504 struct drbd_device *device = req->device; 1505 struct drbd_peer_device *const peer_device = first_peer_device(device); 1506 struct drbd_connection *connection = peer_device->connection; 1507 bool do_send_unplug = req->rq_state & RQ_UNPLUG; 1508 int err; 1509 1510 if (unlikely(cancel)) { 1511 req_mod(req, SEND_CANCELED); 1512 return 0; 1513 } 1514 req->pre_send_jif = jiffies; 1515 1516 /* Even read requests may close a write epoch, 1517 * if there was any yet. */ 1518 maybe_send_barrier(connection, req->epoch); 1519 1520 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, 1521 (unsigned long)req); 1522 1523 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1524 1525 if (do_send_unplug && !err) 1526 pd_send_unplug_remote(peer_device); 1527 1528 return err; 1529 } 1530 1531 int w_restart_disk_io(struct drbd_work *w, int cancel) 1532 { 1533 struct drbd_request *req = container_of(w, struct drbd_request, w); 1534 struct drbd_device *device = req->device; 1535 1536 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1537 drbd_al_begin_io(device, &req->i); 1538 1539 drbd_req_make_private_bio(req, req->master_bio); 1540 bio_set_dev(req->private_bio, device->ldev->backing_bdev); 1541 generic_make_request(req->private_bio); 1542 1543 return 0; 1544 } 1545 1546 static int _drbd_may_sync_now(struct drbd_device *device) 1547 { 1548 struct drbd_device *odev = device; 1549 int resync_after; 1550 1551 while (1) { 1552 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1553 return 1; 1554 rcu_read_lock(); 1555 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1556 rcu_read_unlock(); 1557 if (resync_after == -1) 1558 return 1; 1559 odev = minor_to_device(resync_after); 1560 if (!odev) 1561 return 1; 1562 if ((odev->state.conn >= C_SYNC_SOURCE && 1563 odev->state.conn <= C_PAUSED_SYNC_T) || 1564 odev->state.aftr_isp || odev->state.peer_isp || 1565 odev->state.user_isp) 1566 return 0; 1567 } 1568 } 1569 1570 /** 1571 * drbd_pause_after() - Pause resync on all devices that may not resync now 1572 * @device: DRBD device. 1573 * 1574 * Called from process context only (admin command and after_state_ch). 1575 */ 1576 static bool drbd_pause_after(struct drbd_device *device) 1577 { 1578 bool changed = false; 1579 struct drbd_device *odev; 1580 int i; 1581 1582 rcu_read_lock(); 1583 idr_for_each_entry(&drbd_devices, odev, i) { 1584 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1585 continue; 1586 if (!_drbd_may_sync_now(odev) && 1587 _drbd_set_state(_NS(odev, aftr_isp, 1), 1588 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1589 changed = true; 1590 } 1591 rcu_read_unlock(); 1592 1593 return changed; 1594 } 1595 1596 /** 1597 * drbd_resume_next() - Resume resync on all devices that may resync now 1598 * @device: DRBD device. 1599 * 1600 * Called from process context only (admin command and worker). 1601 */ 1602 static bool drbd_resume_next(struct drbd_device *device) 1603 { 1604 bool changed = false; 1605 struct drbd_device *odev; 1606 int i; 1607 1608 rcu_read_lock(); 1609 idr_for_each_entry(&drbd_devices, odev, i) { 1610 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1611 continue; 1612 if (odev->state.aftr_isp) { 1613 if (_drbd_may_sync_now(odev) && 1614 _drbd_set_state(_NS(odev, aftr_isp, 0), 1615 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1616 changed = true; 1617 } 1618 } 1619 rcu_read_unlock(); 1620 return changed; 1621 } 1622 1623 void resume_next_sg(struct drbd_device *device) 1624 { 1625 lock_all_resources(); 1626 drbd_resume_next(device); 1627 unlock_all_resources(); 1628 } 1629 1630 void suspend_other_sg(struct drbd_device *device) 1631 { 1632 lock_all_resources(); 1633 drbd_pause_after(device); 1634 unlock_all_resources(); 1635 } 1636 1637 /* caller must lock_all_resources() */ 1638 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1639 { 1640 struct drbd_device *odev; 1641 int resync_after; 1642 1643 if (o_minor == -1) 1644 return NO_ERROR; 1645 if (o_minor < -1 || o_minor > MINORMASK) 1646 return ERR_RESYNC_AFTER; 1647 1648 /* check for loops */ 1649 odev = minor_to_device(o_minor); 1650 while (1) { 1651 if (odev == device) 1652 return ERR_RESYNC_AFTER_CYCLE; 1653 1654 /* You are free to depend on diskless, non-existing, 1655 * or not yet/no longer existing minors. 1656 * We only reject dependency loops. 1657 * We cannot follow the dependency chain beyond a detached or 1658 * missing minor. 1659 */ 1660 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1661 return NO_ERROR; 1662 1663 rcu_read_lock(); 1664 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1665 rcu_read_unlock(); 1666 /* dependency chain ends here, no cycles. */ 1667 if (resync_after == -1) 1668 return NO_ERROR; 1669 1670 /* follow the dependency chain */ 1671 odev = minor_to_device(resync_after); 1672 } 1673 } 1674 1675 /* caller must lock_all_resources() */ 1676 void drbd_resync_after_changed(struct drbd_device *device) 1677 { 1678 int changed; 1679 1680 do { 1681 changed = drbd_pause_after(device); 1682 changed |= drbd_resume_next(device); 1683 } while (changed); 1684 } 1685 1686 void drbd_rs_controller_reset(struct drbd_device *device) 1687 { 1688 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 1689 struct fifo_buffer *plan; 1690 1691 atomic_set(&device->rs_sect_in, 0); 1692 atomic_set(&device->rs_sect_ev, 0); 1693 device->rs_in_flight = 0; 1694 device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors); 1695 1696 /* Updating the RCU protected object in place is necessary since 1697 this function gets called from atomic context. 1698 It is valid since all other updates also lead to an completely 1699 empty fifo */ 1700 rcu_read_lock(); 1701 plan = rcu_dereference(device->rs_plan_s); 1702 plan->total = 0; 1703 fifo_set(plan, 0); 1704 rcu_read_unlock(); 1705 } 1706 1707 void start_resync_timer_fn(struct timer_list *t) 1708 { 1709 struct drbd_device *device = from_timer(device, t, start_resync_timer); 1710 drbd_device_post_work(device, RS_START); 1711 } 1712 1713 static void do_start_resync(struct drbd_device *device) 1714 { 1715 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1716 drbd_warn(device, "postponing start_resync ...\n"); 1717 device->start_resync_timer.expires = jiffies + HZ/10; 1718 add_timer(&device->start_resync_timer); 1719 return; 1720 } 1721 1722 drbd_start_resync(device, C_SYNC_SOURCE); 1723 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1724 } 1725 1726 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device) 1727 { 1728 bool csums_after_crash_only; 1729 rcu_read_lock(); 1730 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only; 1731 rcu_read_unlock(); 1732 return connection->agreed_pro_version >= 89 && /* supported? */ 1733 connection->csums_tfm && /* configured? */ 1734 (csums_after_crash_only == false /* use for each resync? */ 1735 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */ 1736 } 1737 1738 /** 1739 * drbd_start_resync() - Start the resync process 1740 * @device: DRBD device. 1741 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1742 * 1743 * This function might bring you directly into one of the 1744 * C_PAUSED_SYNC_* states. 1745 */ 1746 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1747 { 1748 struct drbd_peer_device *peer_device = first_peer_device(device); 1749 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; 1750 union drbd_state ns; 1751 int r; 1752 1753 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1754 drbd_err(device, "Resync already running!\n"); 1755 return; 1756 } 1757 1758 if (!connection) { 1759 drbd_err(device, "No connection to peer, aborting!\n"); 1760 return; 1761 } 1762 1763 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1764 if (side == C_SYNC_TARGET) { 1765 /* Since application IO was locked out during C_WF_BITMAP_T and 1766 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1767 we check that we might make the data inconsistent. */ 1768 r = drbd_khelper(device, "before-resync-target"); 1769 r = (r >> 8) & 0xff; 1770 if (r > 0) { 1771 drbd_info(device, "before-resync-target handler returned %d, " 1772 "dropping connection.\n", r); 1773 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 1774 return; 1775 } 1776 } else /* C_SYNC_SOURCE */ { 1777 r = drbd_khelper(device, "before-resync-source"); 1778 r = (r >> 8) & 0xff; 1779 if (r > 0) { 1780 if (r == 3) { 1781 drbd_info(device, "before-resync-source handler returned %d, " 1782 "ignoring. Old userland tools?", r); 1783 } else { 1784 drbd_info(device, "before-resync-source handler returned %d, " 1785 "dropping connection.\n", r); 1786 conn_request_state(connection, 1787 NS(conn, C_DISCONNECTING), CS_HARD); 1788 return; 1789 } 1790 } 1791 } 1792 } 1793 1794 if (current == connection->worker.task) { 1795 /* The worker should not sleep waiting for state_mutex, 1796 that can take long */ 1797 if (!mutex_trylock(device->state_mutex)) { 1798 set_bit(B_RS_H_DONE, &device->flags); 1799 device->start_resync_timer.expires = jiffies + HZ/5; 1800 add_timer(&device->start_resync_timer); 1801 return; 1802 } 1803 } else { 1804 mutex_lock(device->state_mutex); 1805 } 1806 1807 lock_all_resources(); 1808 clear_bit(B_RS_H_DONE, &device->flags); 1809 /* Did some connection breakage or IO error race with us? */ 1810 if (device->state.conn < C_CONNECTED 1811 || !get_ldev_if_state(device, D_NEGOTIATING)) { 1812 unlock_all_resources(); 1813 goto out; 1814 } 1815 1816 ns = drbd_read_state(device); 1817 1818 ns.aftr_isp = !_drbd_may_sync_now(device); 1819 1820 ns.conn = side; 1821 1822 if (side == C_SYNC_TARGET) 1823 ns.disk = D_INCONSISTENT; 1824 else /* side == C_SYNC_SOURCE */ 1825 ns.pdsk = D_INCONSISTENT; 1826 1827 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL); 1828 ns = drbd_read_state(device); 1829 1830 if (ns.conn < C_CONNECTED) 1831 r = SS_UNKNOWN_ERROR; 1832 1833 if (r == SS_SUCCESS) { 1834 unsigned long tw = drbd_bm_total_weight(device); 1835 unsigned long now = jiffies; 1836 int i; 1837 1838 device->rs_failed = 0; 1839 device->rs_paused = 0; 1840 device->rs_same_csum = 0; 1841 device->rs_last_sect_ev = 0; 1842 device->rs_total = tw; 1843 device->rs_start = now; 1844 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1845 device->rs_mark_left[i] = tw; 1846 device->rs_mark_time[i] = now; 1847 } 1848 drbd_pause_after(device); 1849 /* Forget potentially stale cached per resync extent bit-counts. 1850 * Open coded drbd_rs_cancel_all(device), we already have IRQs 1851 * disabled, and know the disk state is ok. */ 1852 spin_lock(&device->al_lock); 1853 lc_reset(device->resync); 1854 device->resync_locked = 0; 1855 device->resync_wenr = LC_FREE; 1856 spin_unlock(&device->al_lock); 1857 } 1858 unlock_all_resources(); 1859 1860 if (r == SS_SUCCESS) { 1861 wake_up(&device->al_wait); /* for lc_reset() above */ 1862 /* reset rs_last_bcast when a resync or verify is started, 1863 * to deal with potential jiffies wrap. */ 1864 device->rs_last_bcast = jiffies - HZ; 1865 1866 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1867 drbd_conn_str(ns.conn), 1868 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10), 1869 (unsigned long) device->rs_total); 1870 if (side == C_SYNC_TARGET) { 1871 device->bm_resync_fo = 0; 1872 device->use_csums = use_checksum_based_resync(connection, device); 1873 } else { 1874 device->use_csums = false; 1875 } 1876 1877 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid 1878 * with w_send_oos, or the sync target will get confused as to 1879 * how much bits to resync. We cannot do that always, because for an 1880 * empty resync and protocol < 95, we need to do it here, as we call 1881 * drbd_resync_finished from here in that case. 1882 * We drbd_gen_and_send_sync_uuid here for protocol < 96, 1883 * and from after_state_ch otherwise. */ 1884 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96) 1885 drbd_gen_and_send_sync_uuid(peer_device); 1886 1887 if (connection->agreed_pro_version < 95 && device->rs_total == 0) { 1888 /* This still has a race (about when exactly the peers 1889 * detect connection loss) that can lead to a full sync 1890 * on next handshake. In 8.3.9 we fixed this with explicit 1891 * resync-finished notifications, but the fix 1892 * introduces a protocol change. Sleeping for some 1893 * time longer than the ping interval + timeout on the 1894 * SyncSource, to give the SyncTarget the chance to 1895 * detect connection loss, then waiting for a ping 1896 * response (implicit in drbd_resync_finished) reduces 1897 * the race considerably, but does not solve it. */ 1898 if (side == C_SYNC_SOURCE) { 1899 struct net_conf *nc; 1900 int timeo; 1901 1902 rcu_read_lock(); 1903 nc = rcu_dereference(connection->net_conf); 1904 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9; 1905 rcu_read_unlock(); 1906 schedule_timeout_interruptible(timeo); 1907 } 1908 drbd_resync_finished(device); 1909 } 1910 1911 drbd_rs_controller_reset(device); 1912 /* ns.conn may already be != device->state.conn, 1913 * we may have been paused in between, or become paused until 1914 * the timer triggers. 1915 * No matter, that is handled in resync_timer_fn() */ 1916 if (ns.conn == C_SYNC_TARGET) 1917 mod_timer(&device->resync_timer, jiffies); 1918 1919 drbd_md_sync(device); 1920 } 1921 put_ldev(device); 1922 out: 1923 mutex_unlock(device->state_mutex); 1924 } 1925 1926 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done) 1927 { 1928 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, }; 1929 device->rs_last_bcast = jiffies; 1930 1931 if (!get_ldev(device)) 1932 return; 1933 1934 drbd_bm_write_lazy(device, 0); 1935 if (resync_done && is_sync_state(device->state.conn)) 1936 drbd_resync_finished(device); 1937 1938 drbd_bcast_event(device, &sib); 1939 /* update timestamp, in case it took a while to write out stuff */ 1940 device->rs_last_bcast = jiffies; 1941 put_ldev(device); 1942 } 1943 1944 static void drbd_ldev_destroy(struct drbd_device *device) 1945 { 1946 lc_destroy(device->resync); 1947 device->resync = NULL; 1948 lc_destroy(device->act_log); 1949 device->act_log = NULL; 1950 1951 __acquire(local); 1952 drbd_backing_dev_free(device, device->ldev); 1953 device->ldev = NULL; 1954 __release(local); 1955 1956 clear_bit(GOING_DISKLESS, &device->flags); 1957 wake_up(&device->misc_wait); 1958 } 1959 1960 static void go_diskless(struct drbd_device *device) 1961 { 1962 D_ASSERT(device, device->state.disk == D_FAILED); 1963 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will 1964 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch 1965 * the protected members anymore, though, so once put_ldev reaches zero 1966 * again, it will be safe to free them. */ 1967 1968 /* Try to write changed bitmap pages, read errors may have just 1969 * set some bits outside the area covered by the activity log. 1970 * 1971 * If we have an IO error during the bitmap writeout, 1972 * we will want a full sync next time, just in case. 1973 * (Do we want a specific meta data flag for this?) 1974 * 1975 * If that does not make it to stable storage either, 1976 * we cannot do anything about that anymore. 1977 * 1978 * We still need to check if both bitmap and ldev are present, we may 1979 * end up here after a failed attach, before ldev was even assigned. 1980 */ 1981 if (device->bitmap && device->ldev) { 1982 /* An interrupted resync or similar is allowed to recounts bits 1983 * while we detach. 1984 * Any modifications would not be expected anymore, though. 1985 */ 1986 if (drbd_bitmap_io_from_worker(device, drbd_bm_write, 1987 "detach", BM_LOCKED_TEST_ALLOWED)) { 1988 if (test_bit(WAS_READ_ERROR, &device->flags)) { 1989 drbd_md_set_flag(device, MDF_FULL_SYNC); 1990 drbd_md_sync(device); 1991 } 1992 } 1993 } 1994 1995 drbd_force_state(device, NS(disk, D_DISKLESS)); 1996 } 1997 1998 static int do_md_sync(struct drbd_device *device) 1999 { 2000 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n"); 2001 drbd_md_sync(device); 2002 return 0; 2003 } 2004 2005 /* only called from drbd_worker thread, no locking */ 2006 void __update_timing_details( 2007 struct drbd_thread_timing_details *tdp, 2008 unsigned int *cb_nr, 2009 void *cb, 2010 const char *fn, const unsigned int line) 2011 { 2012 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST; 2013 struct drbd_thread_timing_details *td = tdp + i; 2014 2015 td->start_jif = jiffies; 2016 td->cb_addr = cb; 2017 td->caller_fn = fn; 2018 td->line = line; 2019 td->cb_nr = *cb_nr; 2020 2021 i = (i+1) % DRBD_THREAD_DETAILS_HIST; 2022 td = tdp + i; 2023 memset(td, 0, sizeof(*td)); 2024 2025 ++(*cb_nr); 2026 } 2027 2028 static void do_device_work(struct drbd_device *device, const unsigned long todo) 2029 { 2030 if (test_bit(MD_SYNC, &todo)) 2031 do_md_sync(device); 2032 if (test_bit(RS_DONE, &todo) || 2033 test_bit(RS_PROGRESS, &todo)) 2034 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo)); 2035 if (test_bit(GO_DISKLESS, &todo)) 2036 go_diskless(device); 2037 if (test_bit(DESTROY_DISK, &todo)) 2038 drbd_ldev_destroy(device); 2039 if (test_bit(RS_START, &todo)) 2040 do_start_resync(device); 2041 } 2042 2043 #define DRBD_DEVICE_WORK_MASK \ 2044 ((1UL << GO_DISKLESS) \ 2045 |(1UL << DESTROY_DISK) \ 2046 |(1UL << MD_SYNC) \ 2047 |(1UL << RS_START) \ 2048 |(1UL << RS_PROGRESS) \ 2049 |(1UL << RS_DONE) \ 2050 ) 2051 2052 static unsigned long get_work_bits(unsigned long *flags) 2053 { 2054 unsigned long old, new; 2055 do { 2056 old = *flags; 2057 new = old & ~DRBD_DEVICE_WORK_MASK; 2058 } while (cmpxchg(flags, old, new) != old); 2059 return old & DRBD_DEVICE_WORK_MASK; 2060 } 2061 2062 static void do_unqueued_work(struct drbd_connection *connection) 2063 { 2064 struct drbd_peer_device *peer_device; 2065 int vnr; 2066 2067 rcu_read_lock(); 2068 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2069 struct drbd_device *device = peer_device->device; 2070 unsigned long todo = get_work_bits(&device->flags); 2071 if (!todo) 2072 continue; 2073 2074 kref_get(&device->kref); 2075 rcu_read_unlock(); 2076 do_device_work(device, todo); 2077 kref_put(&device->kref, drbd_destroy_device); 2078 rcu_read_lock(); 2079 } 2080 rcu_read_unlock(); 2081 } 2082 2083 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) 2084 { 2085 spin_lock_irq(&queue->q_lock); 2086 list_splice_tail_init(&queue->q, work_list); 2087 spin_unlock_irq(&queue->q_lock); 2088 return !list_empty(work_list); 2089 } 2090 2091 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list) 2092 { 2093 DEFINE_WAIT(wait); 2094 struct net_conf *nc; 2095 int uncork, cork; 2096 2097 dequeue_work_batch(&connection->sender_work, work_list); 2098 if (!list_empty(work_list)) 2099 return; 2100 2101 /* Still nothing to do? 2102 * Maybe we still need to close the current epoch, 2103 * even if no new requests are queued yet. 2104 * 2105 * Also, poke TCP, just in case. 2106 * Then wait for new work (or signal). */ 2107 rcu_read_lock(); 2108 nc = rcu_dereference(connection->net_conf); 2109 uncork = nc ? nc->tcp_cork : 0; 2110 rcu_read_unlock(); 2111 if (uncork) { 2112 mutex_lock(&connection->data.mutex); 2113 if (connection->data.socket) 2114 drbd_tcp_uncork(connection->data.socket); 2115 mutex_unlock(&connection->data.mutex); 2116 } 2117 2118 for (;;) { 2119 int send_barrier; 2120 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE); 2121 spin_lock_irq(&connection->resource->req_lock); 2122 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2123 if (!list_empty(&connection->sender_work.q)) 2124 list_splice_tail_init(&connection->sender_work.q, work_list); 2125 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2126 if (!list_empty(work_list) || signal_pending(current)) { 2127 spin_unlock_irq(&connection->resource->req_lock); 2128 break; 2129 } 2130 2131 /* We found nothing new to do, no to-be-communicated request, 2132 * no other work item. We may still need to close the last 2133 * epoch. Next incoming request epoch will be connection -> 2134 * current transfer log epoch number. If that is different 2135 * from the epoch of the last request we communicated, it is 2136 * safe to send the epoch separating barrier now. 2137 */ 2138 send_barrier = 2139 atomic_read(&connection->current_tle_nr) != 2140 connection->send.current_epoch_nr; 2141 spin_unlock_irq(&connection->resource->req_lock); 2142 2143 if (send_barrier) 2144 maybe_send_barrier(connection, 2145 connection->send.current_epoch_nr + 1); 2146 2147 if (test_bit(DEVICE_WORK_PENDING, &connection->flags)) 2148 break; 2149 2150 /* drbd_send() may have called flush_signals() */ 2151 if (get_t_state(&connection->worker) != RUNNING) 2152 break; 2153 2154 schedule(); 2155 /* may be woken up for other things but new work, too, 2156 * e.g. if the current epoch got closed. 2157 * In which case we send the barrier above. */ 2158 } 2159 finish_wait(&connection->sender_work.q_wait, &wait); 2160 2161 /* someone may have changed the config while we have been waiting above. */ 2162 rcu_read_lock(); 2163 nc = rcu_dereference(connection->net_conf); 2164 cork = nc ? nc->tcp_cork : 0; 2165 rcu_read_unlock(); 2166 mutex_lock(&connection->data.mutex); 2167 if (connection->data.socket) { 2168 if (cork) 2169 drbd_tcp_cork(connection->data.socket); 2170 else if (!uncork) 2171 drbd_tcp_uncork(connection->data.socket); 2172 } 2173 mutex_unlock(&connection->data.mutex); 2174 } 2175 2176 int drbd_worker(struct drbd_thread *thi) 2177 { 2178 struct drbd_connection *connection = thi->connection; 2179 struct drbd_work *w = NULL; 2180 struct drbd_peer_device *peer_device; 2181 LIST_HEAD(work_list); 2182 int vnr; 2183 2184 while (get_t_state(thi) == RUNNING) { 2185 drbd_thread_current_set_cpu(thi); 2186 2187 if (list_empty(&work_list)) { 2188 update_worker_timing_details(connection, wait_for_work); 2189 wait_for_work(connection, &work_list); 2190 } 2191 2192 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2193 update_worker_timing_details(connection, do_unqueued_work); 2194 do_unqueued_work(connection); 2195 } 2196 2197 if (signal_pending(current)) { 2198 flush_signals(current); 2199 if (get_t_state(thi) == RUNNING) { 2200 drbd_warn(connection, "Worker got an unexpected signal\n"); 2201 continue; 2202 } 2203 break; 2204 } 2205 2206 if (get_t_state(thi) != RUNNING) 2207 break; 2208 2209 if (!list_empty(&work_list)) { 2210 w = list_first_entry(&work_list, struct drbd_work, list); 2211 list_del_init(&w->list); 2212 update_worker_timing_details(connection, w->cb); 2213 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0) 2214 continue; 2215 if (connection->cstate >= C_WF_REPORT_PARAMS) 2216 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 2217 } 2218 } 2219 2220 do { 2221 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2222 update_worker_timing_details(connection, do_unqueued_work); 2223 do_unqueued_work(connection); 2224 } 2225 if (!list_empty(&work_list)) { 2226 w = list_first_entry(&work_list, struct drbd_work, list); 2227 list_del_init(&w->list); 2228 update_worker_timing_details(connection, w->cb); 2229 w->cb(w, 1); 2230 } else 2231 dequeue_work_batch(&connection->sender_work, &work_list); 2232 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags)); 2233 2234 rcu_read_lock(); 2235 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2236 struct drbd_device *device = peer_device->device; 2237 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE); 2238 kref_get(&device->kref); 2239 rcu_read_unlock(); 2240 drbd_device_cleanup(device); 2241 kref_put(&device->kref, drbd_destroy_device); 2242 rcu_read_lock(); 2243 } 2244 rcu_read_unlock(); 2245 2246 return 0; 2247 } 2248