1 /* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26 #include <linux/module.h> 27 #include <linux/drbd.h> 28 #include <linux/sched/signal.h> 29 #include <linux/wait.h> 30 #include <linux/mm.h> 31 #include <linux/memcontrol.h> 32 #include <linux/mm_inline.h> 33 #include <linux/slab.h> 34 #include <linux/random.h> 35 #include <linux/string.h> 36 #include <linux/scatterlist.h> 37 38 #include "drbd_int.h" 39 #include "drbd_protocol.h" 40 #include "drbd_req.h" 41 42 static int make_ov_request(struct drbd_device *, int); 43 static int make_resync_request(struct drbd_device *, int); 44 45 /* endio handlers: 46 * drbd_md_endio (defined here) 47 * drbd_request_endio (defined here) 48 * drbd_peer_request_endio (defined here) 49 * drbd_bm_endio (defined in drbd_bitmap.c) 50 * 51 * For all these callbacks, note the following: 52 * The callbacks will be called in irq context by the IDE drivers, 53 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 54 * Try to get the locking right :) 55 * 56 */ 57 58 /* used for synchronous meta data and bitmap IO 59 * submitted by drbd_md_sync_page_io() 60 */ 61 void drbd_md_endio(struct bio *bio) 62 { 63 struct drbd_device *device; 64 65 device = bio->bi_private; 66 device->md_io.error = blk_status_to_errno(bio->bi_status); 67 68 /* special case: drbd_md_read() during drbd_adm_attach() */ 69 if (device->ldev) 70 put_ldev(device); 71 bio_put(bio); 72 73 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able 74 * to timeout on the lower level device, and eventually detach from it. 75 * If this io completion runs after that timeout expired, this 76 * drbd_md_put_buffer() may allow us to finally try and re-attach. 77 * During normal operation, this only puts that extra reference 78 * down to 1 again. 79 * Make sure we first drop the reference, and only then signal 80 * completion, or we may (in drbd_al_read_log()) cycle so fast into the 81 * next drbd_md_sync_page_io(), that we trigger the 82 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. 83 */ 84 drbd_md_put_buffer(device); 85 device->md_io.done = 1; 86 wake_up(&device->misc_wait); 87 } 88 89 /* reads on behalf of the partner, 90 * "submitted" by the receiver 91 */ 92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 93 { 94 unsigned long flags = 0; 95 struct drbd_peer_device *peer_device = peer_req->peer_device; 96 struct drbd_device *device = peer_device->device; 97 98 spin_lock_irqsave(&device->resource->req_lock, flags); 99 device->read_cnt += peer_req->i.size >> 9; 100 list_del(&peer_req->w.list); 101 if (list_empty(&device->read_ee)) 102 wake_up(&device->ee_wait); 103 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 104 __drbd_chk_io_error(device, DRBD_READ_ERROR); 105 spin_unlock_irqrestore(&device->resource->req_lock, flags); 106 107 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w); 108 put_ldev(device); 109 } 110 111 /* writes on behalf of the partner, or resync writes, 112 * "submitted" by the receiver, final stage. */ 113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 114 { 115 unsigned long flags = 0; 116 struct drbd_peer_device *peer_device = peer_req->peer_device; 117 struct drbd_device *device = peer_device->device; 118 struct drbd_connection *connection = peer_device->connection; 119 struct drbd_interval i; 120 int do_wake; 121 u64 block_id; 122 int do_al_complete_io; 123 124 /* after we moved peer_req to done_ee, 125 * we may no longer access it, 126 * it may be freed/reused already! 127 * (as soon as we release the req_lock) */ 128 i = peer_req->i; 129 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 130 block_id = peer_req->block_id; 131 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 132 133 if (peer_req->flags & EE_WAS_ERROR) { 134 /* In protocol != C, we usually do not send write acks. 135 * In case of a write error, send the neg ack anyways. */ 136 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags)) 137 inc_unacked(device); 138 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); 139 } 140 141 spin_lock_irqsave(&device->resource->req_lock, flags); 142 device->writ_cnt += peer_req->i.size >> 9; 143 list_move_tail(&peer_req->w.list, &device->done_ee); 144 145 /* 146 * Do not remove from the write_requests tree here: we did not send the 147 * Ack yet and did not wake possibly waiting conflicting requests. 148 * Removed from the tree from "drbd_process_done_ee" within the 149 * appropriate dw.cb (e_end_block/e_end_resync_block) or from 150 * _drbd_clear_done_ee. 151 */ 152 153 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee); 154 155 /* FIXME do we want to detach for failed REQ_DISCARD? 156 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */ 157 if (peer_req->flags & EE_WAS_ERROR) 158 __drbd_chk_io_error(device, DRBD_WRITE_ERROR); 159 160 if (connection->cstate >= C_WF_REPORT_PARAMS) { 161 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */ 162 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work)) 163 kref_put(&device->kref, drbd_destroy_device); 164 } 165 spin_unlock_irqrestore(&device->resource->req_lock, flags); 166 167 if (block_id == ID_SYNCER) 168 drbd_rs_complete_io(device, i.sector); 169 170 if (do_wake) 171 wake_up(&device->ee_wait); 172 173 if (do_al_complete_io) 174 drbd_al_complete_io(device, &i); 175 176 put_ldev(device); 177 } 178 179 /* writes on behalf of the partner, or resync writes, 180 * "submitted" by the receiver. 181 */ 182 void drbd_peer_request_endio(struct bio *bio) 183 { 184 struct drbd_peer_request *peer_req = bio->bi_private; 185 struct drbd_device *device = peer_req->peer_device->device; 186 bool is_write = bio_data_dir(bio) == WRITE; 187 bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES || 188 bio_op(bio) == REQ_OP_DISCARD; 189 190 if (bio->bi_status && __ratelimit(&drbd_ratelimit_state)) 191 drbd_warn(device, "%s: error=%d s=%llus\n", 192 is_write ? (is_discard ? "discard" : "write") 193 : "read", bio->bi_status, 194 (unsigned long long)peer_req->i.sector); 195 196 if (bio->bi_status) 197 set_bit(__EE_WAS_ERROR, &peer_req->flags); 198 199 bio_put(bio); /* no need for the bio anymore */ 200 if (atomic_dec_and_test(&peer_req->pending_bios)) { 201 if (is_write) 202 drbd_endio_write_sec_final(peer_req); 203 else 204 drbd_endio_read_sec_final(peer_req); 205 } 206 } 207 208 static void 209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device) 210 { 211 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n", 212 device->minor, device->resource->name, device->vnr); 213 } 214 215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request 216 */ 217 void drbd_request_endio(struct bio *bio) 218 { 219 unsigned long flags; 220 struct drbd_request *req = bio->bi_private; 221 struct drbd_device *device = req->device; 222 struct bio_and_error m; 223 enum drbd_req_event what; 224 225 /* If this request was aborted locally before, 226 * but now was completed "successfully", 227 * chances are that this caused arbitrary data corruption. 228 * 229 * "aborting" requests, or force-detaching the disk, is intended for 230 * completely blocked/hung local backing devices which do no longer 231 * complete requests at all, not even do error completions. In this 232 * situation, usually a hard-reset and failover is the only way out. 233 * 234 * By "aborting", basically faking a local error-completion, 235 * we allow for a more graceful swichover by cleanly migrating services. 236 * Still the affected node has to be rebooted "soon". 237 * 238 * By completing these requests, we allow the upper layers to re-use 239 * the associated data pages. 240 * 241 * If later the local backing device "recovers", and now DMAs some data 242 * from disk into the original request pages, in the best case it will 243 * just put random data into unused pages; but typically it will corrupt 244 * meanwhile completely unrelated data, causing all sorts of damage. 245 * 246 * Which means delayed successful completion, 247 * especially for READ requests, 248 * is a reason to panic(). 249 * 250 * We assume that a delayed *error* completion is OK, 251 * though we still will complain noisily about it. 252 */ 253 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) { 254 if (__ratelimit(&drbd_ratelimit_state)) 255 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n"); 256 257 if (!bio->bi_status) 258 drbd_panic_after_delayed_completion_of_aborted_request(device); 259 } 260 261 /* to avoid recursion in __req_mod */ 262 if (unlikely(bio->bi_status)) { 263 switch (bio_op(bio)) { 264 case REQ_OP_WRITE_ZEROES: 265 case REQ_OP_DISCARD: 266 if (bio->bi_status == BLK_STS_NOTSUPP) 267 what = DISCARD_COMPLETED_NOTSUPP; 268 else 269 what = DISCARD_COMPLETED_WITH_ERROR; 270 break; 271 case REQ_OP_READ: 272 if (bio->bi_opf & REQ_RAHEAD) 273 what = READ_AHEAD_COMPLETED_WITH_ERROR; 274 else 275 what = READ_COMPLETED_WITH_ERROR; 276 break; 277 default: 278 what = WRITE_COMPLETED_WITH_ERROR; 279 break; 280 } 281 } else { 282 what = COMPLETED_OK; 283 } 284 285 req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status)); 286 bio_put(bio); 287 288 /* not req_mod(), we need irqsave here! */ 289 spin_lock_irqsave(&device->resource->req_lock, flags); 290 __req_mod(req, what, &m); 291 spin_unlock_irqrestore(&device->resource->req_lock, flags); 292 put_ldev(device); 293 294 if (m.bio) 295 complete_master_bio(device, &m); 296 } 297 298 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest) 299 { 300 AHASH_REQUEST_ON_STACK(req, tfm); 301 struct scatterlist sg; 302 struct page *page = peer_req->pages; 303 struct page *tmp; 304 unsigned len; 305 306 ahash_request_set_tfm(req, tfm); 307 ahash_request_set_callback(req, 0, NULL, NULL); 308 309 sg_init_table(&sg, 1); 310 crypto_ahash_init(req); 311 312 while ((tmp = page_chain_next(page))) { 313 /* all but the last page will be fully used */ 314 sg_set_page(&sg, page, PAGE_SIZE, 0); 315 ahash_request_set_crypt(req, &sg, NULL, sg.length); 316 crypto_ahash_update(req); 317 page = tmp; 318 } 319 /* and now the last, possibly only partially used page */ 320 len = peer_req->i.size & (PAGE_SIZE - 1); 321 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 322 ahash_request_set_crypt(req, &sg, digest, sg.length); 323 crypto_ahash_finup(req); 324 ahash_request_zero(req); 325 } 326 327 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest) 328 { 329 AHASH_REQUEST_ON_STACK(req, tfm); 330 struct scatterlist sg; 331 struct bio_vec bvec; 332 struct bvec_iter iter; 333 334 ahash_request_set_tfm(req, tfm); 335 ahash_request_set_callback(req, 0, NULL, NULL); 336 337 sg_init_table(&sg, 1); 338 crypto_ahash_init(req); 339 340 bio_for_each_segment(bvec, bio, iter) { 341 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset); 342 ahash_request_set_crypt(req, &sg, NULL, sg.length); 343 crypto_ahash_update(req); 344 /* REQ_OP_WRITE_SAME has only one segment, 345 * checksum the payload only once. */ 346 if (bio_op(bio) == REQ_OP_WRITE_SAME) 347 break; 348 } 349 ahash_request_set_crypt(req, NULL, digest, 0); 350 crypto_ahash_final(req); 351 ahash_request_zero(req); 352 } 353 354 /* MAYBE merge common code with w_e_end_ov_req */ 355 static int w_e_send_csum(struct drbd_work *w, int cancel) 356 { 357 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 358 struct drbd_peer_device *peer_device = peer_req->peer_device; 359 struct drbd_device *device = peer_device->device; 360 int digest_size; 361 void *digest; 362 int err = 0; 363 364 if (unlikely(cancel)) 365 goto out; 366 367 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 368 goto out; 369 370 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm); 371 digest = kmalloc(digest_size, GFP_NOIO); 372 if (digest) { 373 sector_t sector = peer_req->i.sector; 374 unsigned int size = peer_req->i.size; 375 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 376 /* Free peer_req and pages before send. 377 * In case we block on congestion, we could otherwise run into 378 * some distributed deadlock, if the other side blocks on 379 * congestion as well, because our receiver blocks in 380 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 381 drbd_free_peer_req(device, peer_req); 382 peer_req = NULL; 383 inc_rs_pending(device); 384 err = drbd_send_drequest_csum(peer_device, sector, size, 385 digest, digest_size, 386 P_CSUM_RS_REQUEST); 387 kfree(digest); 388 } else { 389 drbd_err(device, "kmalloc() of digest failed.\n"); 390 err = -ENOMEM; 391 } 392 393 out: 394 if (peer_req) 395 drbd_free_peer_req(device, peer_req); 396 397 if (unlikely(err)) 398 drbd_err(device, "drbd_send_drequest(..., csum) failed\n"); 399 return err; 400 } 401 402 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 403 404 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size) 405 { 406 struct drbd_device *device = peer_device->device; 407 struct drbd_peer_request *peer_req; 408 409 if (!get_ldev(device)) 410 return -EIO; 411 412 /* GFP_TRY, because if there is no memory available right now, this may 413 * be rescheduled for later. It is "only" background resync, after all. */ 414 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, 415 size, size, GFP_TRY); 416 if (!peer_req) 417 goto defer; 418 419 peer_req->w.cb = w_e_send_csum; 420 spin_lock_irq(&device->resource->req_lock); 421 list_add_tail(&peer_req->w.list, &device->read_ee); 422 spin_unlock_irq(&device->resource->req_lock); 423 424 atomic_add(size >> 9, &device->rs_sect_ev); 425 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0, 426 DRBD_FAULT_RS_RD) == 0) 427 return 0; 428 429 /* If it failed because of ENOMEM, retry should help. If it failed 430 * because bio_add_page failed (probably broken lower level driver), 431 * retry may or may not help. 432 * If it does not, you may need to force disconnect. */ 433 spin_lock_irq(&device->resource->req_lock); 434 list_del(&peer_req->w.list); 435 spin_unlock_irq(&device->resource->req_lock); 436 437 drbd_free_peer_req(device, peer_req); 438 defer: 439 put_ldev(device); 440 return -EAGAIN; 441 } 442 443 int w_resync_timer(struct drbd_work *w, int cancel) 444 { 445 struct drbd_device *device = 446 container_of(w, struct drbd_device, resync_work); 447 448 switch (device->state.conn) { 449 case C_VERIFY_S: 450 make_ov_request(device, cancel); 451 break; 452 case C_SYNC_TARGET: 453 make_resync_request(device, cancel); 454 break; 455 } 456 457 return 0; 458 } 459 460 void resync_timer_fn(struct timer_list *t) 461 { 462 struct drbd_device *device = from_timer(device, t, resync_timer); 463 464 drbd_queue_work_if_unqueued( 465 &first_peer_device(device)->connection->sender_work, 466 &device->resync_work); 467 } 468 469 static void fifo_set(struct fifo_buffer *fb, int value) 470 { 471 int i; 472 473 for (i = 0; i < fb->size; i++) 474 fb->values[i] = value; 475 } 476 477 static int fifo_push(struct fifo_buffer *fb, int value) 478 { 479 int ov; 480 481 ov = fb->values[fb->head_index]; 482 fb->values[fb->head_index++] = value; 483 484 if (fb->head_index >= fb->size) 485 fb->head_index = 0; 486 487 return ov; 488 } 489 490 static void fifo_add_val(struct fifo_buffer *fb, int value) 491 { 492 int i; 493 494 for (i = 0; i < fb->size; i++) 495 fb->values[i] += value; 496 } 497 498 struct fifo_buffer *fifo_alloc(int fifo_size) 499 { 500 struct fifo_buffer *fb; 501 502 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO); 503 if (!fb) 504 return NULL; 505 506 fb->head_index = 0; 507 fb->size = fifo_size; 508 fb->total = 0; 509 510 return fb; 511 } 512 513 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in) 514 { 515 struct disk_conf *dc; 516 unsigned int want; /* The number of sectors we want in-flight */ 517 int req_sect; /* Number of sectors to request in this turn */ 518 int correction; /* Number of sectors more we need in-flight */ 519 int cps; /* correction per invocation of drbd_rs_controller() */ 520 int steps; /* Number of time steps to plan ahead */ 521 int curr_corr; 522 int max_sect; 523 struct fifo_buffer *plan; 524 525 dc = rcu_dereference(device->ldev->disk_conf); 526 plan = rcu_dereference(device->rs_plan_s); 527 528 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 529 530 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */ 531 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps; 532 } else { /* normal path */ 533 want = dc->c_fill_target ? dc->c_fill_target : 534 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10); 535 } 536 537 correction = want - device->rs_in_flight - plan->total; 538 539 /* Plan ahead */ 540 cps = correction / steps; 541 fifo_add_val(plan, cps); 542 plan->total += cps * steps; 543 544 /* What we do in this step */ 545 curr_corr = fifo_push(plan, 0); 546 plan->total -= curr_corr; 547 548 req_sect = sect_in + curr_corr; 549 if (req_sect < 0) 550 req_sect = 0; 551 552 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ; 553 if (req_sect > max_sect) 554 req_sect = max_sect; 555 556 /* 557 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 558 sect_in, device->rs_in_flight, want, correction, 559 steps, cps, device->rs_planed, curr_corr, req_sect); 560 */ 561 562 return req_sect; 563 } 564 565 static int drbd_rs_number_requests(struct drbd_device *device) 566 { 567 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 568 int number, mxb; 569 570 sect_in = atomic_xchg(&device->rs_sect_in, 0); 571 device->rs_in_flight -= sect_in; 572 573 rcu_read_lock(); 574 mxb = drbd_get_max_buffers(device) / 2; 575 if (rcu_dereference(device->rs_plan_s)->size) { 576 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9); 577 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 578 } else { 579 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate; 580 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 581 } 582 rcu_read_unlock(); 583 584 /* Don't have more than "max-buffers"/2 in-flight. 585 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(), 586 * potentially causing a distributed deadlock on congestion during 587 * online-verify or (checksum-based) resync, if max-buffers, 588 * socket buffer sizes and resync rate settings are mis-configured. */ 589 590 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k), 591 * mxb (as used here, and in drbd_alloc_pages on the peer) is 592 * "number of pages" (typically also 4k), 593 * but "rs_in_flight" is in "sectors" (512 Byte). */ 594 if (mxb - device->rs_in_flight/8 < number) 595 number = mxb - device->rs_in_flight/8; 596 597 return number; 598 } 599 600 static int make_resync_request(struct drbd_device *const device, int cancel) 601 { 602 struct drbd_peer_device *const peer_device = first_peer_device(device); 603 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; 604 unsigned long bit; 605 sector_t sector; 606 const sector_t capacity = drbd_get_capacity(device->this_bdev); 607 int max_bio_size; 608 int number, rollback_i, size; 609 int align, requeue = 0; 610 int i = 0; 611 int discard_granularity = 0; 612 613 if (unlikely(cancel)) 614 return 0; 615 616 if (device->rs_total == 0) { 617 /* empty resync? */ 618 drbd_resync_finished(device); 619 return 0; 620 } 621 622 if (!get_ldev(device)) { 623 /* Since we only need to access device->rsync a 624 get_ldev_if_state(device,D_FAILED) would be sufficient, but 625 to continue resync with a broken disk makes no sense at 626 all */ 627 drbd_err(device, "Disk broke down during resync!\n"); 628 return 0; 629 } 630 631 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) { 632 rcu_read_lock(); 633 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity; 634 rcu_read_unlock(); 635 } 636 637 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9; 638 number = drbd_rs_number_requests(device); 639 if (number <= 0) 640 goto requeue; 641 642 for (i = 0; i < number; i++) { 643 /* Stop generating RS requests when half of the send buffer is filled, 644 * but notify TCP that we'd like to have more space. */ 645 mutex_lock(&connection->data.mutex); 646 if (connection->data.socket) { 647 struct sock *sk = connection->data.socket->sk; 648 int queued = sk->sk_wmem_queued; 649 int sndbuf = sk->sk_sndbuf; 650 if (queued > sndbuf / 2) { 651 requeue = 1; 652 if (sk->sk_socket) 653 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 654 } 655 } else 656 requeue = 1; 657 mutex_unlock(&connection->data.mutex); 658 if (requeue) 659 goto requeue; 660 661 next_sector: 662 size = BM_BLOCK_SIZE; 663 bit = drbd_bm_find_next(device, device->bm_resync_fo); 664 665 if (bit == DRBD_END_OF_BITMAP) { 666 device->bm_resync_fo = drbd_bm_bits(device); 667 put_ldev(device); 668 return 0; 669 } 670 671 sector = BM_BIT_TO_SECT(bit); 672 673 if (drbd_try_rs_begin_io(device, sector)) { 674 device->bm_resync_fo = bit; 675 goto requeue; 676 } 677 device->bm_resync_fo = bit + 1; 678 679 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) { 680 drbd_rs_complete_io(device, sector); 681 goto next_sector; 682 } 683 684 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE 685 /* try to find some adjacent bits. 686 * we stop if we have already the maximum req size. 687 * 688 * Additionally always align bigger requests, in order to 689 * be prepared for all stripe sizes of software RAIDs. 690 */ 691 align = 1; 692 rollback_i = i; 693 while (i < number) { 694 if (size + BM_BLOCK_SIZE > max_bio_size) 695 break; 696 697 /* Be always aligned */ 698 if (sector & ((1<<(align+3))-1)) 699 break; 700 701 if (discard_granularity && size == discard_granularity) 702 break; 703 704 /* do not cross extent boundaries */ 705 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 706 break; 707 /* now, is it actually dirty, after all? 708 * caution, drbd_bm_test_bit is tri-state for some 709 * obscure reason; ( b == 0 ) would get the out-of-band 710 * only accidentally right because of the "oddly sized" 711 * adjustment below */ 712 if (drbd_bm_test_bit(device, bit+1) != 1) 713 break; 714 bit++; 715 size += BM_BLOCK_SIZE; 716 if ((BM_BLOCK_SIZE << align) <= size) 717 align++; 718 i++; 719 } 720 /* if we merged some, 721 * reset the offset to start the next drbd_bm_find_next from */ 722 if (size > BM_BLOCK_SIZE) 723 device->bm_resync_fo = bit + 1; 724 #endif 725 726 /* adjust very last sectors, in case we are oddly sized */ 727 if (sector + (size>>9) > capacity) 728 size = (capacity-sector)<<9; 729 730 if (device->use_csums) { 731 switch (read_for_csum(peer_device, sector, size)) { 732 case -EIO: /* Disk failure */ 733 put_ldev(device); 734 return -EIO; 735 case -EAGAIN: /* allocation failed, or ldev busy */ 736 drbd_rs_complete_io(device, sector); 737 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 738 i = rollback_i; 739 goto requeue; 740 case 0: 741 /* everything ok */ 742 break; 743 default: 744 BUG(); 745 } 746 } else { 747 int err; 748 749 inc_rs_pending(device); 750 err = drbd_send_drequest(peer_device, 751 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST, 752 sector, size, ID_SYNCER); 753 if (err) { 754 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 755 dec_rs_pending(device); 756 put_ldev(device); 757 return err; 758 } 759 } 760 } 761 762 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 763 /* last syncer _request_ was sent, 764 * but the P_RS_DATA_REPLY not yet received. sync will end (and 765 * next sync group will resume), as soon as we receive the last 766 * resync data block, and the last bit is cleared. 767 * until then resync "work" is "inactive" ... 768 */ 769 put_ldev(device); 770 return 0; 771 } 772 773 requeue: 774 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 775 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 776 put_ldev(device); 777 return 0; 778 } 779 780 static int make_ov_request(struct drbd_device *device, int cancel) 781 { 782 int number, i, size; 783 sector_t sector; 784 const sector_t capacity = drbd_get_capacity(device->this_bdev); 785 bool stop_sector_reached = false; 786 787 if (unlikely(cancel)) 788 return 1; 789 790 number = drbd_rs_number_requests(device); 791 792 sector = device->ov_position; 793 for (i = 0; i < number; i++) { 794 if (sector >= capacity) 795 return 1; 796 797 /* We check for "finished" only in the reply path: 798 * w_e_end_ov_reply(). 799 * We need to send at least one request out. */ 800 stop_sector_reached = i > 0 801 && verify_can_do_stop_sector(device) 802 && sector >= device->ov_stop_sector; 803 if (stop_sector_reached) 804 break; 805 806 size = BM_BLOCK_SIZE; 807 808 if (drbd_try_rs_begin_io(device, sector)) { 809 device->ov_position = sector; 810 goto requeue; 811 } 812 813 if (sector + (size>>9) > capacity) 814 size = (capacity-sector)<<9; 815 816 inc_rs_pending(device); 817 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 818 dec_rs_pending(device); 819 return 0; 820 } 821 sector += BM_SECT_PER_BIT; 822 } 823 device->ov_position = sector; 824 825 requeue: 826 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 827 if (i == 0 || !stop_sector_reached) 828 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 829 return 1; 830 } 831 832 int w_ov_finished(struct drbd_work *w, int cancel) 833 { 834 struct drbd_device_work *dw = 835 container_of(w, struct drbd_device_work, w); 836 struct drbd_device *device = dw->device; 837 kfree(dw); 838 ov_out_of_sync_print(device); 839 drbd_resync_finished(device); 840 841 return 0; 842 } 843 844 static int w_resync_finished(struct drbd_work *w, int cancel) 845 { 846 struct drbd_device_work *dw = 847 container_of(w, struct drbd_device_work, w); 848 struct drbd_device *device = dw->device; 849 kfree(dw); 850 851 drbd_resync_finished(device); 852 853 return 0; 854 } 855 856 static void ping_peer(struct drbd_device *device) 857 { 858 struct drbd_connection *connection = first_peer_device(device)->connection; 859 860 clear_bit(GOT_PING_ACK, &connection->flags); 861 request_ping(connection); 862 wait_event(connection->ping_wait, 863 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 864 } 865 866 int drbd_resync_finished(struct drbd_device *device) 867 { 868 struct drbd_connection *connection = first_peer_device(device)->connection; 869 unsigned long db, dt, dbdt; 870 unsigned long n_oos; 871 union drbd_state os, ns; 872 struct drbd_device_work *dw; 873 char *khelper_cmd = NULL; 874 int verify_done = 0; 875 876 /* Remove all elements from the resync LRU. Since future actions 877 * might set bits in the (main) bitmap, then the entries in the 878 * resync LRU would be wrong. */ 879 if (drbd_rs_del_all(device)) { 880 /* In case this is not possible now, most probably because 881 * there are P_RS_DATA_REPLY Packets lingering on the worker's 882 * queue (or even the read operations for those packets 883 * is not finished by now). Retry in 100ms. */ 884 885 schedule_timeout_interruptible(HZ / 10); 886 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 887 if (dw) { 888 dw->w.cb = w_resync_finished; 889 dw->device = device; 890 drbd_queue_work(&connection->sender_work, &dw->w); 891 return 1; 892 } 893 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 894 } 895 896 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 897 if (dt <= 0) 898 dt = 1; 899 900 db = device->rs_total; 901 /* adjust for verify start and stop sectors, respective reached position */ 902 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 903 db -= device->ov_left; 904 905 dbdt = Bit2KB(db/dt); 906 device->rs_paused /= HZ; 907 908 if (!get_ldev(device)) 909 goto out; 910 911 ping_peer(device); 912 913 spin_lock_irq(&device->resource->req_lock); 914 os = drbd_read_state(device); 915 916 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 917 918 /* This protects us against multiple calls (that can happen in the presence 919 of application IO), and against connectivity loss just before we arrive here. */ 920 if (os.conn <= C_CONNECTED) 921 goto out_unlock; 922 923 ns = os; 924 ns.conn = C_CONNECTED; 925 926 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 927 verify_done ? "Online verify" : "Resync", 928 dt + device->rs_paused, device->rs_paused, dbdt); 929 930 n_oos = drbd_bm_total_weight(device); 931 932 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 933 if (n_oos) { 934 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 935 n_oos, Bit2KB(1)); 936 khelper_cmd = "out-of-sync"; 937 } 938 } else { 939 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 940 941 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 942 khelper_cmd = "after-resync-target"; 943 944 if (device->use_csums && device->rs_total) { 945 const unsigned long s = device->rs_same_csum; 946 const unsigned long t = device->rs_total; 947 const int ratio = 948 (t == 0) ? 0 : 949 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 950 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 951 "transferred %luK total %luK\n", 952 ratio, 953 Bit2KB(device->rs_same_csum), 954 Bit2KB(device->rs_total - device->rs_same_csum), 955 Bit2KB(device->rs_total)); 956 } 957 } 958 959 if (device->rs_failed) { 960 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 961 962 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 963 ns.disk = D_INCONSISTENT; 964 ns.pdsk = D_UP_TO_DATE; 965 } else { 966 ns.disk = D_UP_TO_DATE; 967 ns.pdsk = D_INCONSISTENT; 968 } 969 } else { 970 ns.disk = D_UP_TO_DATE; 971 ns.pdsk = D_UP_TO_DATE; 972 973 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 974 if (device->p_uuid) { 975 int i; 976 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 977 _drbd_uuid_set(device, i, device->p_uuid[i]); 978 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 979 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 980 } else { 981 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 982 } 983 } 984 985 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 986 /* for verify runs, we don't update uuids here, 987 * so there would be nothing to report. */ 988 drbd_uuid_set_bm(device, 0UL); 989 drbd_print_uuids(device, "updated UUIDs"); 990 if (device->p_uuid) { 991 /* Now the two UUID sets are equal, update what we 992 * know of the peer. */ 993 int i; 994 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 995 device->p_uuid[i] = device->ldev->md.uuid[i]; 996 } 997 } 998 } 999 1000 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 1001 out_unlock: 1002 spin_unlock_irq(&device->resource->req_lock); 1003 1004 /* If we have been sync source, and have an effective fencing-policy, 1005 * once *all* volumes are back in sync, call "unfence". */ 1006 if (os.conn == C_SYNC_SOURCE) { 1007 enum drbd_disk_state disk_state = D_MASK; 1008 enum drbd_disk_state pdsk_state = D_MASK; 1009 enum drbd_fencing_p fp = FP_DONT_CARE; 1010 1011 rcu_read_lock(); 1012 fp = rcu_dereference(device->ldev->disk_conf)->fencing; 1013 if (fp != FP_DONT_CARE) { 1014 struct drbd_peer_device *peer_device; 1015 int vnr; 1016 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1017 struct drbd_device *device = peer_device->device; 1018 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk); 1019 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk); 1020 } 1021 } 1022 rcu_read_unlock(); 1023 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE) 1024 conn_khelper(connection, "unfence-peer"); 1025 } 1026 1027 put_ldev(device); 1028 out: 1029 device->rs_total = 0; 1030 device->rs_failed = 0; 1031 device->rs_paused = 0; 1032 1033 /* reset start sector, if we reached end of device */ 1034 if (verify_done && device->ov_left == 0) 1035 device->ov_start_sector = 0; 1036 1037 drbd_md_sync(device); 1038 1039 if (khelper_cmd) 1040 drbd_khelper(device, khelper_cmd); 1041 1042 return 1; 1043 } 1044 1045 /* helper */ 1046 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 1047 { 1048 if (drbd_peer_req_has_active_page(peer_req)) { 1049 /* This might happen if sendpage() has not finished */ 1050 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 1051 atomic_add(i, &device->pp_in_use_by_net); 1052 atomic_sub(i, &device->pp_in_use); 1053 spin_lock_irq(&device->resource->req_lock); 1054 list_add_tail(&peer_req->w.list, &device->net_ee); 1055 spin_unlock_irq(&device->resource->req_lock); 1056 wake_up(&drbd_pp_wait); 1057 } else 1058 drbd_free_peer_req(device, peer_req); 1059 } 1060 1061 /** 1062 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 1063 * @w: work object. 1064 * @cancel: The connection will be closed anyways 1065 */ 1066 int w_e_end_data_req(struct drbd_work *w, int cancel) 1067 { 1068 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1069 struct drbd_peer_device *peer_device = peer_req->peer_device; 1070 struct drbd_device *device = peer_device->device; 1071 int err; 1072 1073 if (unlikely(cancel)) { 1074 drbd_free_peer_req(device, peer_req); 1075 dec_unacked(device); 1076 return 0; 1077 } 1078 1079 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1080 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req); 1081 } else { 1082 if (__ratelimit(&drbd_ratelimit_state)) 1083 drbd_err(device, "Sending NegDReply. sector=%llus.\n", 1084 (unsigned long long)peer_req->i.sector); 1085 1086 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req); 1087 } 1088 1089 dec_unacked(device); 1090 1091 move_to_net_ee_or_free(device, peer_req); 1092 1093 if (unlikely(err)) 1094 drbd_err(device, "drbd_send_block() failed\n"); 1095 return err; 1096 } 1097 1098 static bool all_zero(struct drbd_peer_request *peer_req) 1099 { 1100 struct page *page = peer_req->pages; 1101 unsigned int len = peer_req->i.size; 1102 1103 page_chain_for_each(page) { 1104 unsigned int l = min_t(unsigned int, len, PAGE_SIZE); 1105 unsigned int i, words = l / sizeof(long); 1106 unsigned long *d; 1107 1108 d = kmap_atomic(page); 1109 for (i = 0; i < words; i++) { 1110 if (d[i]) { 1111 kunmap_atomic(d); 1112 return false; 1113 } 1114 } 1115 kunmap_atomic(d); 1116 len -= l; 1117 } 1118 1119 return true; 1120 } 1121 1122 /** 1123 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1124 * @w: work object. 1125 * @cancel: The connection will be closed anyways 1126 */ 1127 int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1128 { 1129 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1130 struct drbd_peer_device *peer_device = peer_req->peer_device; 1131 struct drbd_device *device = peer_device->device; 1132 int err; 1133 1134 if (unlikely(cancel)) { 1135 drbd_free_peer_req(device, peer_req); 1136 dec_unacked(device); 1137 return 0; 1138 } 1139 1140 if (get_ldev_if_state(device, D_FAILED)) { 1141 drbd_rs_complete_io(device, peer_req->i.sector); 1142 put_ldev(device); 1143 } 1144 1145 if (device->state.conn == C_AHEAD) { 1146 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req); 1147 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1148 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1149 inc_rs_pending(device); 1150 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req)) 1151 err = drbd_send_rs_deallocated(peer_device, peer_req); 1152 else 1153 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1154 } else { 1155 if (__ratelimit(&drbd_ratelimit_state)) 1156 drbd_err(device, "Not sending RSDataReply, " 1157 "partner DISKLESS!\n"); 1158 err = 0; 1159 } 1160 } else { 1161 if (__ratelimit(&drbd_ratelimit_state)) 1162 drbd_err(device, "Sending NegRSDReply. sector %llus.\n", 1163 (unsigned long long)peer_req->i.sector); 1164 1165 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1166 1167 /* update resync data with failure */ 1168 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1169 } 1170 1171 dec_unacked(device); 1172 1173 move_to_net_ee_or_free(device, peer_req); 1174 1175 if (unlikely(err)) 1176 drbd_err(device, "drbd_send_block() failed\n"); 1177 return err; 1178 } 1179 1180 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1181 { 1182 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1183 struct drbd_peer_device *peer_device = peer_req->peer_device; 1184 struct drbd_device *device = peer_device->device; 1185 struct digest_info *di; 1186 int digest_size; 1187 void *digest = NULL; 1188 int err, eq = 0; 1189 1190 if (unlikely(cancel)) { 1191 drbd_free_peer_req(device, peer_req); 1192 dec_unacked(device); 1193 return 0; 1194 } 1195 1196 if (get_ldev(device)) { 1197 drbd_rs_complete_io(device, peer_req->i.sector); 1198 put_ldev(device); 1199 } 1200 1201 di = peer_req->digest; 1202 1203 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1204 /* quick hack to try to avoid a race against reconfiguration. 1205 * a real fix would be much more involved, 1206 * introducing more locking mechanisms */ 1207 if (peer_device->connection->csums_tfm) { 1208 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm); 1209 D_ASSERT(device, digest_size == di->digest_size); 1210 digest = kmalloc(digest_size, GFP_NOIO); 1211 } 1212 if (digest) { 1213 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 1214 eq = !memcmp(digest, di->digest, digest_size); 1215 kfree(digest); 1216 } 1217 1218 if (eq) { 1219 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1220 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1221 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1222 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req); 1223 } else { 1224 inc_rs_pending(device); 1225 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1226 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1227 kfree(di); 1228 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1229 } 1230 } else { 1231 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1232 if (__ratelimit(&drbd_ratelimit_state)) 1233 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n"); 1234 } 1235 1236 dec_unacked(device); 1237 move_to_net_ee_or_free(device, peer_req); 1238 1239 if (unlikely(err)) 1240 drbd_err(device, "drbd_send_block/ack() failed\n"); 1241 return err; 1242 } 1243 1244 int w_e_end_ov_req(struct drbd_work *w, int cancel) 1245 { 1246 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1247 struct drbd_peer_device *peer_device = peer_req->peer_device; 1248 struct drbd_device *device = peer_device->device; 1249 sector_t sector = peer_req->i.sector; 1250 unsigned int size = peer_req->i.size; 1251 int digest_size; 1252 void *digest; 1253 int err = 0; 1254 1255 if (unlikely(cancel)) 1256 goto out; 1257 1258 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm); 1259 digest = kmalloc(digest_size, GFP_NOIO); 1260 if (!digest) { 1261 err = 1; /* terminate the connection in case the allocation failed */ 1262 goto out; 1263 } 1264 1265 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1266 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1267 else 1268 memset(digest, 0, digest_size); 1269 1270 /* Free e and pages before send. 1271 * In case we block on congestion, we could otherwise run into 1272 * some distributed deadlock, if the other side blocks on 1273 * congestion as well, because our receiver blocks in 1274 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1275 drbd_free_peer_req(device, peer_req); 1276 peer_req = NULL; 1277 inc_rs_pending(device); 1278 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY); 1279 if (err) 1280 dec_rs_pending(device); 1281 kfree(digest); 1282 1283 out: 1284 if (peer_req) 1285 drbd_free_peer_req(device, peer_req); 1286 dec_unacked(device); 1287 return err; 1288 } 1289 1290 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1291 { 1292 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1293 device->ov_last_oos_size += size>>9; 1294 } else { 1295 device->ov_last_oos_start = sector; 1296 device->ov_last_oos_size = size>>9; 1297 } 1298 drbd_set_out_of_sync(device, sector, size); 1299 } 1300 1301 int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1302 { 1303 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1304 struct drbd_peer_device *peer_device = peer_req->peer_device; 1305 struct drbd_device *device = peer_device->device; 1306 struct digest_info *di; 1307 void *digest; 1308 sector_t sector = peer_req->i.sector; 1309 unsigned int size = peer_req->i.size; 1310 int digest_size; 1311 int err, eq = 0; 1312 bool stop_sector_reached = false; 1313 1314 if (unlikely(cancel)) { 1315 drbd_free_peer_req(device, peer_req); 1316 dec_unacked(device); 1317 return 0; 1318 } 1319 1320 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1321 * the resync lru has been cleaned up already */ 1322 if (get_ldev(device)) { 1323 drbd_rs_complete_io(device, peer_req->i.sector); 1324 put_ldev(device); 1325 } 1326 1327 di = peer_req->digest; 1328 1329 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1330 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm); 1331 digest = kmalloc(digest_size, GFP_NOIO); 1332 if (digest) { 1333 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1334 1335 D_ASSERT(device, digest_size == di->digest_size); 1336 eq = !memcmp(digest, di->digest, digest_size); 1337 kfree(digest); 1338 } 1339 } 1340 1341 /* Free peer_req and pages before send. 1342 * In case we block on congestion, we could otherwise run into 1343 * some distributed deadlock, if the other side blocks on 1344 * congestion as well, because our receiver blocks in 1345 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1346 drbd_free_peer_req(device, peer_req); 1347 if (!eq) 1348 drbd_ov_out_of_sync_found(device, sector, size); 1349 else 1350 ov_out_of_sync_print(device); 1351 1352 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, 1353 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1354 1355 dec_unacked(device); 1356 1357 --device->ov_left; 1358 1359 /* let's advance progress step marks only for every other megabyte */ 1360 if ((device->ov_left & 0x200) == 0x200) 1361 drbd_advance_rs_marks(device, device->ov_left); 1362 1363 stop_sector_reached = verify_can_do_stop_sector(device) && 1364 (sector + (size>>9)) >= device->ov_stop_sector; 1365 1366 if (device->ov_left == 0 || stop_sector_reached) { 1367 ov_out_of_sync_print(device); 1368 drbd_resync_finished(device); 1369 } 1370 1371 return err; 1372 } 1373 1374 /* FIXME 1375 * We need to track the number of pending barrier acks, 1376 * and to be able to wait for them. 1377 * See also comment in drbd_adm_attach before drbd_suspend_io. 1378 */ 1379 static int drbd_send_barrier(struct drbd_connection *connection) 1380 { 1381 struct p_barrier *p; 1382 struct drbd_socket *sock; 1383 1384 sock = &connection->data; 1385 p = conn_prepare_command(connection, sock); 1386 if (!p) 1387 return -EIO; 1388 p->barrier = connection->send.current_epoch_nr; 1389 p->pad = 0; 1390 connection->send.current_epoch_writes = 0; 1391 connection->send.last_sent_barrier_jif = jiffies; 1392 1393 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1394 } 1395 1396 static int pd_send_unplug_remote(struct drbd_peer_device *pd) 1397 { 1398 struct drbd_socket *sock = &pd->connection->data; 1399 if (!drbd_prepare_command(pd, sock)) 1400 return -EIO; 1401 return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1402 } 1403 1404 int w_send_write_hint(struct drbd_work *w, int cancel) 1405 { 1406 struct drbd_device *device = 1407 container_of(w, struct drbd_device, unplug_work); 1408 1409 if (cancel) 1410 return 0; 1411 return pd_send_unplug_remote(first_peer_device(device)); 1412 } 1413 1414 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1415 { 1416 if (!connection->send.seen_any_write_yet) { 1417 connection->send.seen_any_write_yet = true; 1418 connection->send.current_epoch_nr = epoch; 1419 connection->send.current_epoch_writes = 0; 1420 connection->send.last_sent_barrier_jif = jiffies; 1421 } 1422 } 1423 1424 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1425 { 1426 /* re-init if first write on this connection */ 1427 if (!connection->send.seen_any_write_yet) 1428 return; 1429 if (connection->send.current_epoch_nr != epoch) { 1430 if (connection->send.current_epoch_writes) 1431 drbd_send_barrier(connection); 1432 connection->send.current_epoch_nr = epoch; 1433 } 1434 } 1435 1436 int w_send_out_of_sync(struct drbd_work *w, int cancel) 1437 { 1438 struct drbd_request *req = container_of(w, struct drbd_request, w); 1439 struct drbd_device *device = req->device; 1440 struct drbd_peer_device *const peer_device = first_peer_device(device); 1441 struct drbd_connection *const connection = peer_device->connection; 1442 int err; 1443 1444 if (unlikely(cancel)) { 1445 req_mod(req, SEND_CANCELED); 1446 return 0; 1447 } 1448 req->pre_send_jif = jiffies; 1449 1450 /* this time, no connection->send.current_epoch_writes++; 1451 * If it was sent, it was the closing barrier for the last 1452 * replicated epoch, before we went into AHEAD mode. 1453 * No more barriers will be sent, until we leave AHEAD mode again. */ 1454 maybe_send_barrier(connection, req->epoch); 1455 1456 err = drbd_send_out_of_sync(peer_device, req); 1457 req_mod(req, OOS_HANDED_TO_NETWORK); 1458 1459 return err; 1460 } 1461 1462 /** 1463 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1464 * @w: work object. 1465 * @cancel: The connection will be closed anyways 1466 */ 1467 int w_send_dblock(struct drbd_work *w, int cancel) 1468 { 1469 struct drbd_request *req = container_of(w, struct drbd_request, w); 1470 struct drbd_device *device = req->device; 1471 struct drbd_peer_device *const peer_device = first_peer_device(device); 1472 struct drbd_connection *connection = peer_device->connection; 1473 bool do_send_unplug = req->rq_state & RQ_UNPLUG; 1474 int err; 1475 1476 if (unlikely(cancel)) { 1477 req_mod(req, SEND_CANCELED); 1478 return 0; 1479 } 1480 req->pre_send_jif = jiffies; 1481 1482 re_init_if_first_write(connection, req->epoch); 1483 maybe_send_barrier(connection, req->epoch); 1484 connection->send.current_epoch_writes++; 1485 1486 err = drbd_send_dblock(peer_device, req); 1487 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1488 1489 if (do_send_unplug && !err) 1490 pd_send_unplug_remote(peer_device); 1491 1492 return err; 1493 } 1494 1495 /** 1496 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1497 * @w: work object. 1498 * @cancel: The connection will be closed anyways 1499 */ 1500 int w_send_read_req(struct drbd_work *w, int cancel) 1501 { 1502 struct drbd_request *req = container_of(w, struct drbd_request, w); 1503 struct drbd_device *device = req->device; 1504 struct drbd_peer_device *const peer_device = first_peer_device(device); 1505 struct drbd_connection *connection = peer_device->connection; 1506 bool do_send_unplug = req->rq_state & RQ_UNPLUG; 1507 int err; 1508 1509 if (unlikely(cancel)) { 1510 req_mod(req, SEND_CANCELED); 1511 return 0; 1512 } 1513 req->pre_send_jif = jiffies; 1514 1515 /* Even read requests may close a write epoch, 1516 * if there was any yet. */ 1517 maybe_send_barrier(connection, req->epoch); 1518 1519 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, 1520 (unsigned long)req); 1521 1522 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1523 1524 if (do_send_unplug && !err) 1525 pd_send_unplug_remote(peer_device); 1526 1527 return err; 1528 } 1529 1530 int w_restart_disk_io(struct drbd_work *w, int cancel) 1531 { 1532 struct drbd_request *req = container_of(w, struct drbd_request, w); 1533 struct drbd_device *device = req->device; 1534 1535 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1536 drbd_al_begin_io(device, &req->i); 1537 1538 drbd_req_make_private_bio(req, req->master_bio); 1539 bio_set_dev(req->private_bio, device->ldev->backing_bdev); 1540 generic_make_request(req->private_bio); 1541 1542 return 0; 1543 } 1544 1545 static int _drbd_may_sync_now(struct drbd_device *device) 1546 { 1547 struct drbd_device *odev = device; 1548 int resync_after; 1549 1550 while (1) { 1551 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1552 return 1; 1553 rcu_read_lock(); 1554 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1555 rcu_read_unlock(); 1556 if (resync_after == -1) 1557 return 1; 1558 odev = minor_to_device(resync_after); 1559 if (!odev) 1560 return 1; 1561 if ((odev->state.conn >= C_SYNC_SOURCE && 1562 odev->state.conn <= C_PAUSED_SYNC_T) || 1563 odev->state.aftr_isp || odev->state.peer_isp || 1564 odev->state.user_isp) 1565 return 0; 1566 } 1567 } 1568 1569 /** 1570 * drbd_pause_after() - Pause resync on all devices that may not resync now 1571 * @device: DRBD device. 1572 * 1573 * Called from process context only (admin command and after_state_ch). 1574 */ 1575 static bool drbd_pause_after(struct drbd_device *device) 1576 { 1577 bool changed = false; 1578 struct drbd_device *odev; 1579 int i; 1580 1581 rcu_read_lock(); 1582 idr_for_each_entry(&drbd_devices, odev, i) { 1583 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1584 continue; 1585 if (!_drbd_may_sync_now(odev) && 1586 _drbd_set_state(_NS(odev, aftr_isp, 1), 1587 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1588 changed = true; 1589 } 1590 rcu_read_unlock(); 1591 1592 return changed; 1593 } 1594 1595 /** 1596 * drbd_resume_next() - Resume resync on all devices that may resync now 1597 * @device: DRBD device. 1598 * 1599 * Called from process context only (admin command and worker). 1600 */ 1601 static bool drbd_resume_next(struct drbd_device *device) 1602 { 1603 bool changed = false; 1604 struct drbd_device *odev; 1605 int i; 1606 1607 rcu_read_lock(); 1608 idr_for_each_entry(&drbd_devices, odev, i) { 1609 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1610 continue; 1611 if (odev->state.aftr_isp) { 1612 if (_drbd_may_sync_now(odev) && 1613 _drbd_set_state(_NS(odev, aftr_isp, 0), 1614 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1615 changed = true; 1616 } 1617 } 1618 rcu_read_unlock(); 1619 return changed; 1620 } 1621 1622 void resume_next_sg(struct drbd_device *device) 1623 { 1624 lock_all_resources(); 1625 drbd_resume_next(device); 1626 unlock_all_resources(); 1627 } 1628 1629 void suspend_other_sg(struct drbd_device *device) 1630 { 1631 lock_all_resources(); 1632 drbd_pause_after(device); 1633 unlock_all_resources(); 1634 } 1635 1636 /* caller must lock_all_resources() */ 1637 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1638 { 1639 struct drbd_device *odev; 1640 int resync_after; 1641 1642 if (o_minor == -1) 1643 return NO_ERROR; 1644 if (o_minor < -1 || o_minor > MINORMASK) 1645 return ERR_RESYNC_AFTER; 1646 1647 /* check for loops */ 1648 odev = minor_to_device(o_minor); 1649 while (1) { 1650 if (odev == device) 1651 return ERR_RESYNC_AFTER_CYCLE; 1652 1653 /* You are free to depend on diskless, non-existing, 1654 * or not yet/no longer existing minors. 1655 * We only reject dependency loops. 1656 * We cannot follow the dependency chain beyond a detached or 1657 * missing minor. 1658 */ 1659 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1660 return NO_ERROR; 1661 1662 rcu_read_lock(); 1663 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1664 rcu_read_unlock(); 1665 /* dependency chain ends here, no cycles. */ 1666 if (resync_after == -1) 1667 return NO_ERROR; 1668 1669 /* follow the dependency chain */ 1670 odev = minor_to_device(resync_after); 1671 } 1672 } 1673 1674 /* caller must lock_all_resources() */ 1675 void drbd_resync_after_changed(struct drbd_device *device) 1676 { 1677 int changed; 1678 1679 do { 1680 changed = drbd_pause_after(device); 1681 changed |= drbd_resume_next(device); 1682 } while (changed); 1683 } 1684 1685 void drbd_rs_controller_reset(struct drbd_device *device) 1686 { 1687 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 1688 struct fifo_buffer *plan; 1689 1690 atomic_set(&device->rs_sect_in, 0); 1691 atomic_set(&device->rs_sect_ev, 0); 1692 device->rs_in_flight = 0; 1693 device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors); 1694 1695 /* Updating the RCU protected object in place is necessary since 1696 this function gets called from atomic context. 1697 It is valid since all other updates also lead to an completely 1698 empty fifo */ 1699 rcu_read_lock(); 1700 plan = rcu_dereference(device->rs_plan_s); 1701 plan->total = 0; 1702 fifo_set(plan, 0); 1703 rcu_read_unlock(); 1704 } 1705 1706 void start_resync_timer_fn(struct timer_list *t) 1707 { 1708 struct drbd_device *device = from_timer(device, t, start_resync_timer); 1709 drbd_device_post_work(device, RS_START); 1710 } 1711 1712 static void do_start_resync(struct drbd_device *device) 1713 { 1714 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1715 drbd_warn(device, "postponing start_resync ...\n"); 1716 device->start_resync_timer.expires = jiffies + HZ/10; 1717 add_timer(&device->start_resync_timer); 1718 return; 1719 } 1720 1721 drbd_start_resync(device, C_SYNC_SOURCE); 1722 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1723 } 1724 1725 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device) 1726 { 1727 bool csums_after_crash_only; 1728 rcu_read_lock(); 1729 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only; 1730 rcu_read_unlock(); 1731 return connection->agreed_pro_version >= 89 && /* supported? */ 1732 connection->csums_tfm && /* configured? */ 1733 (csums_after_crash_only == false /* use for each resync? */ 1734 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */ 1735 } 1736 1737 /** 1738 * drbd_start_resync() - Start the resync process 1739 * @device: DRBD device. 1740 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1741 * 1742 * This function might bring you directly into one of the 1743 * C_PAUSED_SYNC_* states. 1744 */ 1745 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1746 { 1747 struct drbd_peer_device *peer_device = first_peer_device(device); 1748 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; 1749 union drbd_state ns; 1750 int r; 1751 1752 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1753 drbd_err(device, "Resync already running!\n"); 1754 return; 1755 } 1756 1757 if (!connection) { 1758 drbd_err(device, "No connection to peer, aborting!\n"); 1759 return; 1760 } 1761 1762 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1763 if (side == C_SYNC_TARGET) { 1764 /* Since application IO was locked out during C_WF_BITMAP_T and 1765 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1766 we check that we might make the data inconsistent. */ 1767 r = drbd_khelper(device, "before-resync-target"); 1768 r = (r >> 8) & 0xff; 1769 if (r > 0) { 1770 drbd_info(device, "before-resync-target handler returned %d, " 1771 "dropping connection.\n", r); 1772 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 1773 return; 1774 } 1775 } else /* C_SYNC_SOURCE */ { 1776 r = drbd_khelper(device, "before-resync-source"); 1777 r = (r >> 8) & 0xff; 1778 if (r > 0) { 1779 if (r == 3) { 1780 drbd_info(device, "before-resync-source handler returned %d, " 1781 "ignoring. Old userland tools?", r); 1782 } else { 1783 drbd_info(device, "before-resync-source handler returned %d, " 1784 "dropping connection.\n", r); 1785 conn_request_state(connection, 1786 NS(conn, C_DISCONNECTING), CS_HARD); 1787 return; 1788 } 1789 } 1790 } 1791 } 1792 1793 if (current == connection->worker.task) { 1794 /* The worker should not sleep waiting for state_mutex, 1795 that can take long */ 1796 if (!mutex_trylock(device->state_mutex)) { 1797 set_bit(B_RS_H_DONE, &device->flags); 1798 device->start_resync_timer.expires = jiffies + HZ/5; 1799 add_timer(&device->start_resync_timer); 1800 return; 1801 } 1802 } else { 1803 mutex_lock(device->state_mutex); 1804 } 1805 1806 lock_all_resources(); 1807 clear_bit(B_RS_H_DONE, &device->flags); 1808 /* Did some connection breakage or IO error race with us? */ 1809 if (device->state.conn < C_CONNECTED 1810 || !get_ldev_if_state(device, D_NEGOTIATING)) { 1811 unlock_all_resources(); 1812 goto out; 1813 } 1814 1815 ns = drbd_read_state(device); 1816 1817 ns.aftr_isp = !_drbd_may_sync_now(device); 1818 1819 ns.conn = side; 1820 1821 if (side == C_SYNC_TARGET) 1822 ns.disk = D_INCONSISTENT; 1823 else /* side == C_SYNC_SOURCE */ 1824 ns.pdsk = D_INCONSISTENT; 1825 1826 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL); 1827 ns = drbd_read_state(device); 1828 1829 if (ns.conn < C_CONNECTED) 1830 r = SS_UNKNOWN_ERROR; 1831 1832 if (r == SS_SUCCESS) { 1833 unsigned long tw = drbd_bm_total_weight(device); 1834 unsigned long now = jiffies; 1835 int i; 1836 1837 device->rs_failed = 0; 1838 device->rs_paused = 0; 1839 device->rs_same_csum = 0; 1840 device->rs_last_sect_ev = 0; 1841 device->rs_total = tw; 1842 device->rs_start = now; 1843 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1844 device->rs_mark_left[i] = tw; 1845 device->rs_mark_time[i] = now; 1846 } 1847 drbd_pause_after(device); 1848 /* Forget potentially stale cached per resync extent bit-counts. 1849 * Open coded drbd_rs_cancel_all(device), we already have IRQs 1850 * disabled, and know the disk state is ok. */ 1851 spin_lock(&device->al_lock); 1852 lc_reset(device->resync); 1853 device->resync_locked = 0; 1854 device->resync_wenr = LC_FREE; 1855 spin_unlock(&device->al_lock); 1856 } 1857 unlock_all_resources(); 1858 1859 if (r == SS_SUCCESS) { 1860 wake_up(&device->al_wait); /* for lc_reset() above */ 1861 /* reset rs_last_bcast when a resync or verify is started, 1862 * to deal with potential jiffies wrap. */ 1863 device->rs_last_bcast = jiffies - HZ; 1864 1865 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1866 drbd_conn_str(ns.conn), 1867 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10), 1868 (unsigned long) device->rs_total); 1869 if (side == C_SYNC_TARGET) { 1870 device->bm_resync_fo = 0; 1871 device->use_csums = use_checksum_based_resync(connection, device); 1872 } else { 1873 device->use_csums = false; 1874 } 1875 1876 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid 1877 * with w_send_oos, or the sync target will get confused as to 1878 * how much bits to resync. We cannot do that always, because for an 1879 * empty resync and protocol < 95, we need to do it here, as we call 1880 * drbd_resync_finished from here in that case. 1881 * We drbd_gen_and_send_sync_uuid here for protocol < 96, 1882 * and from after_state_ch otherwise. */ 1883 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96) 1884 drbd_gen_and_send_sync_uuid(peer_device); 1885 1886 if (connection->agreed_pro_version < 95 && device->rs_total == 0) { 1887 /* This still has a race (about when exactly the peers 1888 * detect connection loss) that can lead to a full sync 1889 * on next handshake. In 8.3.9 we fixed this with explicit 1890 * resync-finished notifications, but the fix 1891 * introduces a protocol change. Sleeping for some 1892 * time longer than the ping interval + timeout on the 1893 * SyncSource, to give the SyncTarget the chance to 1894 * detect connection loss, then waiting for a ping 1895 * response (implicit in drbd_resync_finished) reduces 1896 * the race considerably, but does not solve it. */ 1897 if (side == C_SYNC_SOURCE) { 1898 struct net_conf *nc; 1899 int timeo; 1900 1901 rcu_read_lock(); 1902 nc = rcu_dereference(connection->net_conf); 1903 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9; 1904 rcu_read_unlock(); 1905 schedule_timeout_interruptible(timeo); 1906 } 1907 drbd_resync_finished(device); 1908 } 1909 1910 drbd_rs_controller_reset(device); 1911 /* ns.conn may already be != device->state.conn, 1912 * we may have been paused in between, or become paused until 1913 * the timer triggers. 1914 * No matter, that is handled in resync_timer_fn() */ 1915 if (ns.conn == C_SYNC_TARGET) 1916 mod_timer(&device->resync_timer, jiffies); 1917 1918 drbd_md_sync(device); 1919 } 1920 put_ldev(device); 1921 out: 1922 mutex_unlock(device->state_mutex); 1923 } 1924 1925 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done) 1926 { 1927 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, }; 1928 device->rs_last_bcast = jiffies; 1929 1930 if (!get_ldev(device)) 1931 return; 1932 1933 drbd_bm_write_lazy(device, 0); 1934 if (resync_done && is_sync_state(device->state.conn)) 1935 drbd_resync_finished(device); 1936 1937 drbd_bcast_event(device, &sib); 1938 /* update timestamp, in case it took a while to write out stuff */ 1939 device->rs_last_bcast = jiffies; 1940 put_ldev(device); 1941 } 1942 1943 static void drbd_ldev_destroy(struct drbd_device *device) 1944 { 1945 lc_destroy(device->resync); 1946 device->resync = NULL; 1947 lc_destroy(device->act_log); 1948 device->act_log = NULL; 1949 1950 __acquire(local); 1951 drbd_backing_dev_free(device, device->ldev); 1952 device->ldev = NULL; 1953 __release(local); 1954 1955 clear_bit(GOING_DISKLESS, &device->flags); 1956 wake_up(&device->misc_wait); 1957 } 1958 1959 static void go_diskless(struct drbd_device *device) 1960 { 1961 D_ASSERT(device, device->state.disk == D_FAILED); 1962 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will 1963 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch 1964 * the protected members anymore, though, so once put_ldev reaches zero 1965 * again, it will be safe to free them. */ 1966 1967 /* Try to write changed bitmap pages, read errors may have just 1968 * set some bits outside the area covered by the activity log. 1969 * 1970 * If we have an IO error during the bitmap writeout, 1971 * we will want a full sync next time, just in case. 1972 * (Do we want a specific meta data flag for this?) 1973 * 1974 * If that does not make it to stable storage either, 1975 * we cannot do anything about that anymore. 1976 * 1977 * We still need to check if both bitmap and ldev are present, we may 1978 * end up here after a failed attach, before ldev was even assigned. 1979 */ 1980 if (device->bitmap && device->ldev) { 1981 /* An interrupted resync or similar is allowed to recounts bits 1982 * while we detach. 1983 * Any modifications would not be expected anymore, though. 1984 */ 1985 if (drbd_bitmap_io_from_worker(device, drbd_bm_write, 1986 "detach", BM_LOCKED_TEST_ALLOWED)) { 1987 if (test_bit(WAS_READ_ERROR, &device->flags)) { 1988 drbd_md_set_flag(device, MDF_FULL_SYNC); 1989 drbd_md_sync(device); 1990 } 1991 } 1992 } 1993 1994 drbd_force_state(device, NS(disk, D_DISKLESS)); 1995 } 1996 1997 static int do_md_sync(struct drbd_device *device) 1998 { 1999 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n"); 2000 drbd_md_sync(device); 2001 return 0; 2002 } 2003 2004 /* only called from drbd_worker thread, no locking */ 2005 void __update_timing_details( 2006 struct drbd_thread_timing_details *tdp, 2007 unsigned int *cb_nr, 2008 void *cb, 2009 const char *fn, const unsigned int line) 2010 { 2011 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST; 2012 struct drbd_thread_timing_details *td = tdp + i; 2013 2014 td->start_jif = jiffies; 2015 td->cb_addr = cb; 2016 td->caller_fn = fn; 2017 td->line = line; 2018 td->cb_nr = *cb_nr; 2019 2020 i = (i+1) % DRBD_THREAD_DETAILS_HIST; 2021 td = tdp + i; 2022 memset(td, 0, sizeof(*td)); 2023 2024 ++(*cb_nr); 2025 } 2026 2027 static void do_device_work(struct drbd_device *device, const unsigned long todo) 2028 { 2029 if (test_bit(MD_SYNC, &todo)) 2030 do_md_sync(device); 2031 if (test_bit(RS_DONE, &todo) || 2032 test_bit(RS_PROGRESS, &todo)) 2033 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo)); 2034 if (test_bit(GO_DISKLESS, &todo)) 2035 go_diskless(device); 2036 if (test_bit(DESTROY_DISK, &todo)) 2037 drbd_ldev_destroy(device); 2038 if (test_bit(RS_START, &todo)) 2039 do_start_resync(device); 2040 } 2041 2042 #define DRBD_DEVICE_WORK_MASK \ 2043 ((1UL << GO_DISKLESS) \ 2044 |(1UL << DESTROY_DISK) \ 2045 |(1UL << MD_SYNC) \ 2046 |(1UL << RS_START) \ 2047 |(1UL << RS_PROGRESS) \ 2048 |(1UL << RS_DONE) \ 2049 ) 2050 2051 static unsigned long get_work_bits(unsigned long *flags) 2052 { 2053 unsigned long old, new; 2054 do { 2055 old = *flags; 2056 new = old & ~DRBD_DEVICE_WORK_MASK; 2057 } while (cmpxchg(flags, old, new) != old); 2058 return old & DRBD_DEVICE_WORK_MASK; 2059 } 2060 2061 static void do_unqueued_work(struct drbd_connection *connection) 2062 { 2063 struct drbd_peer_device *peer_device; 2064 int vnr; 2065 2066 rcu_read_lock(); 2067 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2068 struct drbd_device *device = peer_device->device; 2069 unsigned long todo = get_work_bits(&device->flags); 2070 if (!todo) 2071 continue; 2072 2073 kref_get(&device->kref); 2074 rcu_read_unlock(); 2075 do_device_work(device, todo); 2076 kref_put(&device->kref, drbd_destroy_device); 2077 rcu_read_lock(); 2078 } 2079 rcu_read_unlock(); 2080 } 2081 2082 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) 2083 { 2084 spin_lock_irq(&queue->q_lock); 2085 list_splice_tail_init(&queue->q, work_list); 2086 spin_unlock_irq(&queue->q_lock); 2087 return !list_empty(work_list); 2088 } 2089 2090 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list) 2091 { 2092 DEFINE_WAIT(wait); 2093 struct net_conf *nc; 2094 int uncork, cork; 2095 2096 dequeue_work_batch(&connection->sender_work, work_list); 2097 if (!list_empty(work_list)) 2098 return; 2099 2100 /* Still nothing to do? 2101 * Maybe we still need to close the current epoch, 2102 * even if no new requests are queued yet. 2103 * 2104 * Also, poke TCP, just in case. 2105 * Then wait for new work (or signal). */ 2106 rcu_read_lock(); 2107 nc = rcu_dereference(connection->net_conf); 2108 uncork = nc ? nc->tcp_cork : 0; 2109 rcu_read_unlock(); 2110 if (uncork) { 2111 mutex_lock(&connection->data.mutex); 2112 if (connection->data.socket) 2113 drbd_tcp_uncork(connection->data.socket); 2114 mutex_unlock(&connection->data.mutex); 2115 } 2116 2117 for (;;) { 2118 int send_barrier; 2119 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE); 2120 spin_lock_irq(&connection->resource->req_lock); 2121 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2122 if (!list_empty(&connection->sender_work.q)) 2123 list_splice_tail_init(&connection->sender_work.q, work_list); 2124 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2125 if (!list_empty(work_list) || signal_pending(current)) { 2126 spin_unlock_irq(&connection->resource->req_lock); 2127 break; 2128 } 2129 2130 /* We found nothing new to do, no to-be-communicated request, 2131 * no other work item. We may still need to close the last 2132 * epoch. Next incoming request epoch will be connection -> 2133 * current transfer log epoch number. If that is different 2134 * from the epoch of the last request we communicated, it is 2135 * safe to send the epoch separating barrier now. 2136 */ 2137 send_barrier = 2138 atomic_read(&connection->current_tle_nr) != 2139 connection->send.current_epoch_nr; 2140 spin_unlock_irq(&connection->resource->req_lock); 2141 2142 if (send_barrier) 2143 maybe_send_barrier(connection, 2144 connection->send.current_epoch_nr + 1); 2145 2146 if (test_bit(DEVICE_WORK_PENDING, &connection->flags)) 2147 break; 2148 2149 /* drbd_send() may have called flush_signals() */ 2150 if (get_t_state(&connection->worker) != RUNNING) 2151 break; 2152 2153 schedule(); 2154 /* may be woken up for other things but new work, too, 2155 * e.g. if the current epoch got closed. 2156 * In which case we send the barrier above. */ 2157 } 2158 finish_wait(&connection->sender_work.q_wait, &wait); 2159 2160 /* someone may have changed the config while we have been waiting above. */ 2161 rcu_read_lock(); 2162 nc = rcu_dereference(connection->net_conf); 2163 cork = nc ? nc->tcp_cork : 0; 2164 rcu_read_unlock(); 2165 mutex_lock(&connection->data.mutex); 2166 if (connection->data.socket) { 2167 if (cork) 2168 drbd_tcp_cork(connection->data.socket); 2169 else if (!uncork) 2170 drbd_tcp_uncork(connection->data.socket); 2171 } 2172 mutex_unlock(&connection->data.mutex); 2173 } 2174 2175 int drbd_worker(struct drbd_thread *thi) 2176 { 2177 struct drbd_connection *connection = thi->connection; 2178 struct drbd_work *w = NULL; 2179 struct drbd_peer_device *peer_device; 2180 LIST_HEAD(work_list); 2181 int vnr; 2182 2183 while (get_t_state(thi) == RUNNING) { 2184 drbd_thread_current_set_cpu(thi); 2185 2186 if (list_empty(&work_list)) { 2187 update_worker_timing_details(connection, wait_for_work); 2188 wait_for_work(connection, &work_list); 2189 } 2190 2191 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2192 update_worker_timing_details(connection, do_unqueued_work); 2193 do_unqueued_work(connection); 2194 } 2195 2196 if (signal_pending(current)) { 2197 flush_signals(current); 2198 if (get_t_state(thi) == RUNNING) { 2199 drbd_warn(connection, "Worker got an unexpected signal\n"); 2200 continue; 2201 } 2202 break; 2203 } 2204 2205 if (get_t_state(thi) != RUNNING) 2206 break; 2207 2208 if (!list_empty(&work_list)) { 2209 w = list_first_entry(&work_list, struct drbd_work, list); 2210 list_del_init(&w->list); 2211 update_worker_timing_details(connection, w->cb); 2212 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0) 2213 continue; 2214 if (connection->cstate >= C_WF_REPORT_PARAMS) 2215 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 2216 } 2217 } 2218 2219 do { 2220 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2221 update_worker_timing_details(connection, do_unqueued_work); 2222 do_unqueued_work(connection); 2223 } 2224 if (!list_empty(&work_list)) { 2225 w = list_first_entry(&work_list, struct drbd_work, list); 2226 list_del_init(&w->list); 2227 update_worker_timing_details(connection, w->cb); 2228 w->cb(w, 1); 2229 } else 2230 dequeue_work_batch(&connection->sender_work, &work_list); 2231 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags)); 2232 2233 rcu_read_lock(); 2234 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2235 struct drbd_device *device = peer_device->device; 2236 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE); 2237 kref_get(&device->kref); 2238 rcu_read_unlock(); 2239 drbd_device_cleanup(device); 2240 kref_put(&device->kref, drbd_destroy_device); 2241 rcu_read_lock(); 2242 } 2243 rcu_read_unlock(); 2244 2245 return 0; 2246 } 2247