1 /* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26 #include <linux/module.h> 27 #include <linux/drbd.h> 28 #include <linux/sched/signal.h> 29 #include <linux/wait.h> 30 #include <linux/mm.h> 31 #include <linux/memcontrol.h> 32 #include <linux/mm_inline.h> 33 #include <linux/slab.h> 34 #include <linux/random.h> 35 #include <linux/string.h> 36 #include <linux/scatterlist.h> 37 38 #include "drbd_int.h" 39 #include "drbd_protocol.h" 40 #include "drbd_req.h" 41 42 static int make_ov_request(struct drbd_device *, int); 43 static int make_resync_request(struct drbd_device *, int); 44 45 /* endio handlers: 46 * drbd_md_endio (defined here) 47 * drbd_request_endio (defined here) 48 * drbd_peer_request_endio (defined here) 49 * drbd_bm_endio (defined in drbd_bitmap.c) 50 * 51 * For all these callbacks, note the following: 52 * The callbacks will be called in irq context by the IDE drivers, 53 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 54 * Try to get the locking right :) 55 * 56 */ 57 58 /* used for synchronous meta data and bitmap IO 59 * submitted by drbd_md_sync_page_io() 60 */ 61 void drbd_md_endio(struct bio *bio) 62 { 63 struct drbd_device *device; 64 65 device = bio->bi_private; 66 device->md_io.error = bio->bi_error; 67 68 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able 69 * to timeout on the lower level device, and eventually detach from it. 70 * If this io completion runs after that timeout expired, this 71 * drbd_md_put_buffer() may allow us to finally try and re-attach. 72 * During normal operation, this only puts that extra reference 73 * down to 1 again. 74 * Make sure we first drop the reference, and only then signal 75 * completion, or we may (in drbd_al_read_log()) cycle so fast into the 76 * next drbd_md_sync_page_io(), that we trigger the 77 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. 78 */ 79 drbd_md_put_buffer(device); 80 device->md_io.done = 1; 81 wake_up(&device->misc_wait); 82 bio_put(bio); 83 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */ 84 put_ldev(device); 85 } 86 87 /* reads on behalf of the partner, 88 * "submitted" by the receiver 89 */ 90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 91 { 92 unsigned long flags = 0; 93 struct drbd_peer_device *peer_device = peer_req->peer_device; 94 struct drbd_device *device = peer_device->device; 95 96 spin_lock_irqsave(&device->resource->req_lock, flags); 97 device->read_cnt += peer_req->i.size >> 9; 98 list_del(&peer_req->w.list); 99 if (list_empty(&device->read_ee)) 100 wake_up(&device->ee_wait); 101 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 102 __drbd_chk_io_error(device, DRBD_READ_ERROR); 103 spin_unlock_irqrestore(&device->resource->req_lock, flags); 104 105 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w); 106 put_ldev(device); 107 } 108 109 /* writes on behalf of the partner, or resync writes, 110 * "submitted" by the receiver, final stage. */ 111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 112 { 113 unsigned long flags = 0; 114 struct drbd_peer_device *peer_device = peer_req->peer_device; 115 struct drbd_device *device = peer_device->device; 116 struct drbd_connection *connection = peer_device->connection; 117 struct drbd_interval i; 118 int do_wake; 119 u64 block_id; 120 int do_al_complete_io; 121 122 /* after we moved peer_req to done_ee, 123 * we may no longer access it, 124 * it may be freed/reused already! 125 * (as soon as we release the req_lock) */ 126 i = peer_req->i; 127 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 128 block_id = peer_req->block_id; 129 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 130 131 spin_lock_irqsave(&device->resource->req_lock, flags); 132 device->writ_cnt += peer_req->i.size >> 9; 133 list_move_tail(&peer_req->w.list, &device->done_ee); 134 135 /* 136 * Do not remove from the write_requests tree here: we did not send the 137 * Ack yet and did not wake possibly waiting conflicting requests. 138 * Removed from the tree from "drbd_process_done_ee" within the 139 * appropriate dw.cb (e_end_block/e_end_resync_block) or from 140 * _drbd_clear_done_ee. 141 */ 142 143 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee); 144 145 /* FIXME do we want to detach for failed REQ_DISCARD? 146 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */ 147 if (peer_req->flags & EE_WAS_ERROR) 148 __drbd_chk_io_error(device, DRBD_WRITE_ERROR); 149 150 if (connection->cstate >= C_WF_REPORT_PARAMS) { 151 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */ 152 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work)) 153 kref_put(&device->kref, drbd_destroy_device); 154 } 155 spin_unlock_irqrestore(&device->resource->req_lock, flags); 156 157 if (block_id == ID_SYNCER) 158 drbd_rs_complete_io(device, i.sector); 159 160 if (do_wake) 161 wake_up(&device->ee_wait); 162 163 if (do_al_complete_io) 164 drbd_al_complete_io(device, &i); 165 166 put_ldev(device); 167 } 168 169 /* writes on behalf of the partner, or resync writes, 170 * "submitted" by the receiver. 171 */ 172 void drbd_peer_request_endio(struct bio *bio) 173 { 174 struct drbd_peer_request *peer_req = bio->bi_private; 175 struct drbd_device *device = peer_req->peer_device->device; 176 bool is_write = bio_data_dir(bio) == WRITE; 177 bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES || 178 bio_op(bio) == REQ_OP_DISCARD; 179 180 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state)) 181 drbd_warn(device, "%s: error=%d s=%llus\n", 182 is_write ? (is_discard ? "discard" : "write") 183 : "read", bio->bi_error, 184 (unsigned long long)peer_req->i.sector); 185 186 if (bio->bi_error) 187 set_bit(__EE_WAS_ERROR, &peer_req->flags); 188 189 bio_put(bio); /* no need for the bio anymore */ 190 if (atomic_dec_and_test(&peer_req->pending_bios)) { 191 if (is_write) 192 drbd_endio_write_sec_final(peer_req); 193 else 194 drbd_endio_read_sec_final(peer_req); 195 } 196 } 197 198 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device) 199 { 200 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n", 201 device->minor, device->resource->name, device->vnr); 202 } 203 204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request 205 */ 206 void drbd_request_endio(struct bio *bio) 207 { 208 unsigned long flags; 209 struct drbd_request *req = bio->bi_private; 210 struct drbd_device *device = req->device; 211 struct bio_and_error m; 212 enum drbd_req_event what; 213 214 /* If this request was aborted locally before, 215 * but now was completed "successfully", 216 * chances are that this caused arbitrary data corruption. 217 * 218 * "aborting" requests, or force-detaching the disk, is intended for 219 * completely blocked/hung local backing devices which do no longer 220 * complete requests at all, not even do error completions. In this 221 * situation, usually a hard-reset and failover is the only way out. 222 * 223 * By "aborting", basically faking a local error-completion, 224 * we allow for a more graceful swichover by cleanly migrating services. 225 * Still the affected node has to be rebooted "soon". 226 * 227 * By completing these requests, we allow the upper layers to re-use 228 * the associated data pages. 229 * 230 * If later the local backing device "recovers", and now DMAs some data 231 * from disk into the original request pages, in the best case it will 232 * just put random data into unused pages; but typically it will corrupt 233 * meanwhile completely unrelated data, causing all sorts of damage. 234 * 235 * Which means delayed successful completion, 236 * especially for READ requests, 237 * is a reason to panic(). 238 * 239 * We assume that a delayed *error* completion is OK, 240 * though we still will complain noisily about it. 241 */ 242 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) { 243 if (__ratelimit(&drbd_ratelimit_state)) 244 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n"); 245 246 if (!bio->bi_error) 247 drbd_panic_after_delayed_completion_of_aborted_request(device); 248 } 249 250 /* to avoid recursion in __req_mod */ 251 if (unlikely(bio->bi_error)) { 252 switch (bio_op(bio)) { 253 case REQ_OP_WRITE_ZEROES: 254 case REQ_OP_DISCARD: 255 if (bio->bi_error == -EOPNOTSUPP) 256 what = DISCARD_COMPLETED_NOTSUPP; 257 else 258 what = DISCARD_COMPLETED_WITH_ERROR; 259 break; 260 case REQ_OP_READ: 261 if (bio->bi_opf & REQ_RAHEAD) 262 what = READ_AHEAD_COMPLETED_WITH_ERROR; 263 else 264 what = READ_COMPLETED_WITH_ERROR; 265 break; 266 default: 267 what = WRITE_COMPLETED_WITH_ERROR; 268 break; 269 } 270 } else { 271 what = COMPLETED_OK; 272 } 273 274 bio_put(req->private_bio); 275 req->private_bio = ERR_PTR(bio->bi_error); 276 277 /* not req_mod(), we need irqsave here! */ 278 spin_lock_irqsave(&device->resource->req_lock, flags); 279 __req_mod(req, what, &m); 280 spin_unlock_irqrestore(&device->resource->req_lock, flags); 281 put_ldev(device); 282 283 if (m.bio) 284 complete_master_bio(device, &m); 285 } 286 287 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest) 288 { 289 AHASH_REQUEST_ON_STACK(req, tfm); 290 struct scatterlist sg; 291 struct page *page = peer_req->pages; 292 struct page *tmp; 293 unsigned len; 294 295 ahash_request_set_tfm(req, tfm); 296 ahash_request_set_callback(req, 0, NULL, NULL); 297 298 sg_init_table(&sg, 1); 299 crypto_ahash_init(req); 300 301 while ((tmp = page_chain_next(page))) { 302 /* all but the last page will be fully used */ 303 sg_set_page(&sg, page, PAGE_SIZE, 0); 304 ahash_request_set_crypt(req, &sg, NULL, sg.length); 305 crypto_ahash_update(req); 306 page = tmp; 307 } 308 /* and now the last, possibly only partially used page */ 309 len = peer_req->i.size & (PAGE_SIZE - 1); 310 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 311 ahash_request_set_crypt(req, &sg, digest, sg.length); 312 crypto_ahash_finup(req); 313 ahash_request_zero(req); 314 } 315 316 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest) 317 { 318 AHASH_REQUEST_ON_STACK(req, tfm); 319 struct scatterlist sg; 320 struct bio_vec bvec; 321 struct bvec_iter iter; 322 323 ahash_request_set_tfm(req, tfm); 324 ahash_request_set_callback(req, 0, NULL, NULL); 325 326 sg_init_table(&sg, 1); 327 crypto_ahash_init(req); 328 329 bio_for_each_segment(bvec, bio, iter) { 330 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset); 331 ahash_request_set_crypt(req, &sg, NULL, sg.length); 332 crypto_ahash_update(req); 333 /* REQ_OP_WRITE_SAME has only one segment, 334 * checksum the payload only once. */ 335 if (bio_op(bio) == REQ_OP_WRITE_SAME) 336 break; 337 } 338 ahash_request_set_crypt(req, NULL, digest, 0); 339 crypto_ahash_final(req); 340 ahash_request_zero(req); 341 } 342 343 /* MAYBE merge common code with w_e_end_ov_req */ 344 static int w_e_send_csum(struct drbd_work *w, int cancel) 345 { 346 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 347 struct drbd_peer_device *peer_device = peer_req->peer_device; 348 struct drbd_device *device = peer_device->device; 349 int digest_size; 350 void *digest; 351 int err = 0; 352 353 if (unlikely(cancel)) 354 goto out; 355 356 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 357 goto out; 358 359 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm); 360 digest = kmalloc(digest_size, GFP_NOIO); 361 if (digest) { 362 sector_t sector = peer_req->i.sector; 363 unsigned int size = peer_req->i.size; 364 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 365 /* Free peer_req and pages before send. 366 * In case we block on congestion, we could otherwise run into 367 * some distributed deadlock, if the other side blocks on 368 * congestion as well, because our receiver blocks in 369 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 370 drbd_free_peer_req(device, peer_req); 371 peer_req = NULL; 372 inc_rs_pending(device); 373 err = drbd_send_drequest_csum(peer_device, sector, size, 374 digest, digest_size, 375 P_CSUM_RS_REQUEST); 376 kfree(digest); 377 } else { 378 drbd_err(device, "kmalloc() of digest failed.\n"); 379 err = -ENOMEM; 380 } 381 382 out: 383 if (peer_req) 384 drbd_free_peer_req(device, peer_req); 385 386 if (unlikely(err)) 387 drbd_err(device, "drbd_send_drequest(..., csum) failed\n"); 388 return err; 389 } 390 391 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 392 393 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size) 394 { 395 struct drbd_device *device = peer_device->device; 396 struct drbd_peer_request *peer_req; 397 398 if (!get_ldev(device)) 399 return -EIO; 400 401 /* GFP_TRY, because if there is no memory available right now, this may 402 * be rescheduled for later. It is "only" background resync, after all. */ 403 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, 404 size, size, GFP_TRY); 405 if (!peer_req) 406 goto defer; 407 408 peer_req->w.cb = w_e_send_csum; 409 spin_lock_irq(&device->resource->req_lock); 410 list_add_tail(&peer_req->w.list, &device->read_ee); 411 spin_unlock_irq(&device->resource->req_lock); 412 413 atomic_add(size >> 9, &device->rs_sect_ev); 414 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0, 415 DRBD_FAULT_RS_RD) == 0) 416 return 0; 417 418 /* If it failed because of ENOMEM, retry should help. If it failed 419 * because bio_add_page failed (probably broken lower level driver), 420 * retry may or may not help. 421 * If it does not, you may need to force disconnect. */ 422 spin_lock_irq(&device->resource->req_lock); 423 list_del(&peer_req->w.list); 424 spin_unlock_irq(&device->resource->req_lock); 425 426 drbd_free_peer_req(device, peer_req); 427 defer: 428 put_ldev(device); 429 return -EAGAIN; 430 } 431 432 int w_resync_timer(struct drbd_work *w, int cancel) 433 { 434 struct drbd_device *device = 435 container_of(w, struct drbd_device, resync_work); 436 437 switch (device->state.conn) { 438 case C_VERIFY_S: 439 make_ov_request(device, cancel); 440 break; 441 case C_SYNC_TARGET: 442 make_resync_request(device, cancel); 443 break; 444 } 445 446 return 0; 447 } 448 449 void resync_timer_fn(unsigned long data) 450 { 451 struct drbd_device *device = (struct drbd_device *) data; 452 453 drbd_queue_work_if_unqueued( 454 &first_peer_device(device)->connection->sender_work, 455 &device->resync_work); 456 } 457 458 static void fifo_set(struct fifo_buffer *fb, int value) 459 { 460 int i; 461 462 for (i = 0; i < fb->size; i++) 463 fb->values[i] = value; 464 } 465 466 static int fifo_push(struct fifo_buffer *fb, int value) 467 { 468 int ov; 469 470 ov = fb->values[fb->head_index]; 471 fb->values[fb->head_index++] = value; 472 473 if (fb->head_index >= fb->size) 474 fb->head_index = 0; 475 476 return ov; 477 } 478 479 static void fifo_add_val(struct fifo_buffer *fb, int value) 480 { 481 int i; 482 483 for (i = 0; i < fb->size; i++) 484 fb->values[i] += value; 485 } 486 487 struct fifo_buffer *fifo_alloc(int fifo_size) 488 { 489 struct fifo_buffer *fb; 490 491 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO); 492 if (!fb) 493 return NULL; 494 495 fb->head_index = 0; 496 fb->size = fifo_size; 497 fb->total = 0; 498 499 return fb; 500 } 501 502 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in) 503 { 504 struct disk_conf *dc; 505 unsigned int want; /* The number of sectors we want in-flight */ 506 int req_sect; /* Number of sectors to request in this turn */ 507 int correction; /* Number of sectors more we need in-flight */ 508 int cps; /* correction per invocation of drbd_rs_controller() */ 509 int steps; /* Number of time steps to plan ahead */ 510 int curr_corr; 511 int max_sect; 512 struct fifo_buffer *plan; 513 514 dc = rcu_dereference(device->ldev->disk_conf); 515 plan = rcu_dereference(device->rs_plan_s); 516 517 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 518 519 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */ 520 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps; 521 } else { /* normal path */ 522 want = dc->c_fill_target ? dc->c_fill_target : 523 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10); 524 } 525 526 correction = want - device->rs_in_flight - plan->total; 527 528 /* Plan ahead */ 529 cps = correction / steps; 530 fifo_add_val(plan, cps); 531 plan->total += cps * steps; 532 533 /* What we do in this step */ 534 curr_corr = fifo_push(plan, 0); 535 plan->total -= curr_corr; 536 537 req_sect = sect_in + curr_corr; 538 if (req_sect < 0) 539 req_sect = 0; 540 541 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ; 542 if (req_sect > max_sect) 543 req_sect = max_sect; 544 545 /* 546 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 547 sect_in, device->rs_in_flight, want, correction, 548 steps, cps, device->rs_planed, curr_corr, req_sect); 549 */ 550 551 return req_sect; 552 } 553 554 static int drbd_rs_number_requests(struct drbd_device *device) 555 { 556 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 557 int number, mxb; 558 559 sect_in = atomic_xchg(&device->rs_sect_in, 0); 560 device->rs_in_flight -= sect_in; 561 562 rcu_read_lock(); 563 mxb = drbd_get_max_buffers(device) / 2; 564 if (rcu_dereference(device->rs_plan_s)->size) { 565 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9); 566 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 567 } else { 568 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate; 569 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 570 } 571 rcu_read_unlock(); 572 573 /* Don't have more than "max-buffers"/2 in-flight. 574 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(), 575 * potentially causing a distributed deadlock on congestion during 576 * online-verify or (checksum-based) resync, if max-buffers, 577 * socket buffer sizes and resync rate settings are mis-configured. */ 578 579 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k), 580 * mxb (as used here, and in drbd_alloc_pages on the peer) is 581 * "number of pages" (typically also 4k), 582 * but "rs_in_flight" is in "sectors" (512 Byte). */ 583 if (mxb - device->rs_in_flight/8 < number) 584 number = mxb - device->rs_in_flight/8; 585 586 return number; 587 } 588 589 static int make_resync_request(struct drbd_device *const device, int cancel) 590 { 591 struct drbd_peer_device *const peer_device = first_peer_device(device); 592 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; 593 unsigned long bit; 594 sector_t sector; 595 const sector_t capacity = drbd_get_capacity(device->this_bdev); 596 int max_bio_size; 597 int number, rollback_i, size; 598 int align, requeue = 0; 599 int i = 0; 600 int discard_granularity = 0; 601 602 if (unlikely(cancel)) 603 return 0; 604 605 if (device->rs_total == 0) { 606 /* empty resync? */ 607 drbd_resync_finished(device); 608 return 0; 609 } 610 611 if (!get_ldev(device)) { 612 /* Since we only need to access device->rsync a 613 get_ldev_if_state(device,D_FAILED) would be sufficient, but 614 to continue resync with a broken disk makes no sense at 615 all */ 616 drbd_err(device, "Disk broke down during resync!\n"); 617 return 0; 618 } 619 620 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) { 621 rcu_read_lock(); 622 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity; 623 rcu_read_unlock(); 624 } 625 626 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9; 627 number = drbd_rs_number_requests(device); 628 if (number <= 0) 629 goto requeue; 630 631 for (i = 0; i < number; i++) { 632 /* Stop generating RS requests when half of the send buffer is filled, 633 * but notify TCP that we'd like to have more space. */ 634 mutex_lock(&connection->data.mutex); 635 if (connection->data.socket) { 636 struct sock *sk = connection->data.socket->sk; 637 int queued = sk->sk_wmem_queued; 638 int sndbuf = sk->sk_sndbuf; 639 if (queued > sndbuf / 2) { 640 requeue = 1; 641 if (sk->sk_socket) 642 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 643 } 644 } else 645 requeue = 1; 646 mutex_unlock(&connection->data.mutex); 647 if (requeue) 648 goto requeue; 649 650 next_sector: 651 size = BM_BLOCK_SIZE; 652 bit = drbd_bm_find_next(device, device->bm_resync_fo); 653 654 if (bit == DRBD_END_OF_BITMAP) { 655 device->bm_resync_fo = drbd_bm_bits(device); 656 put_ldev(device); 657 return 0; 658 } 659 660 sector = BM_BIT_TO_SECT(bit); 661 662 if (drbd_try_rs_begin_io(device, sector)) { 663 device->bm_resync_fo = bit; 664 goto requeue; 665 } 666 device->bm_resync_fo = bit + 1; 667 668 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) { 669 drbd_rs_complete_io(device, sector); 670 goto next_sector; 671 } 672 673 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE 674 /* try to find some adjacent bits. 675 * we stop if we have already the maximum req size. 676 * 677 * Additionally always align bigger requests, in order to 678 * be prepared for all stripe sizes of software RAIDs. 679 */ 680 align = 1; 681 rollback_i = i; 682 while (i < number) { 683 if (size + BM_BLOCK_SIZE > max_bio_size) 684 break; 685 686 /* Be always aligned */ 687 if (sector & ((1<<(align+3))-1)) 688 break; 689 690 if (discard_granularity && size == discard_granularity) 691 break; 692 693 /* do not cross extent boundaries */ 694 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 695 break; 696 /* now, is it actually dirty, after all? 697 * caution, drbd_bm_test_bit is tri-state for some 698 * obscure reason; ( b == 0 ) would get the out-of-band 699 * only accidentally right because of the "oddly sized" 700 * adjustment below */ 701 if (drbd_bm_test_bit(device, bit+1) != 1) 702 break; 703 bit++; 704 size += BM_BLOCK_SIZE; 705 if ((BM_BLOCK_SIZE << align) <= size) 706 align++; 707 i++; 708 } 709 /* if we merged some, 710 * reset the offset to start the next drbd_bm_find_next from */ 711 if (size > BM_BLOCK_SIZE) 712 device->bm_resync_fo = bit + 1; 713 #endif 714 715 /* adjust very last sectors, in case we are oddly sized */ 716 if (sector + (size>>9) > capacity) 717 size = (capacity-sector)<<9; 718 719 if (device->use_csums) { 720 switch (read_for_csum(peer_device, sector, size)) { 721 case -EIO: /* Disk failure */ 722 put_ldev(device); 723 return -EIO; 724 case -EAGAIN: /* allocation failed, or ldev busy */ 725 drbd_rs_complete_io(device, sector); 726 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 727 i = rollback_i; 728 goto requeue; 729 case 0: 730 /* everything ok */ 731 break; 732 default: 733 BUG(); 734 } 735 } else { 736 int err; 737 738 inc_rs_pending(device); 739 err = drbd_send_drequest(peer_device, 740 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST, 741 sector, size, ID_SYNCER); 742 if (err) { 743 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 744 dec_rs_pending(device); 745 put_ldev(device); 746 return err; 747 } 748 } 749 } 750 751 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 752 /* last syncer _request_ was sent, 753 * but the P_RS_DATA_REPLY not yet received. sync will end (and 754 * next sync group will resume), as soon as we receive the last 755 * resync data block, and the last bit is cleared. 756 * until then resync "work" is "inactive" ... 757 */ 758 put_ldev(device); 759 return 0; 760 } 761 762 requeue: 763 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 764 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 765 put_ldev(device); 766 return 0; 767 } 768 769 static int make_ov_request(struct drbd_device *device, int cancel) 770 { 771 int number, i, size; 772 sector_t sector; 773 const sector_t capacity = drbd_get_capacity(device->this_bdev); 774 bool stop_sector_reached = false; 775 776 if (unlikely(cancel)) 777 return 1; 778 779 number = drbd_rs_number_requests(device); 780 781 sector = device->ov_position; 782 for (i = 0; i < number; i++) { 783 if (sector >= capacity) 784 return 1; 785 786 /* We check for "finished" only in the reply path: 787 * w_e_end_ov_reply(). 788 * We need to send at least one request out. */ 789 stop_sector_reached = i > 0 790 && verify_can_do_stop_sector(device) 791 && sector >= device->ov_stop_sector; 792 if (stop_sector_reached) 793 break; 794 795 size = BM_BLOCK_SIZE; 796 797 if (drbd_try_rs_begin_io(device, sector)) { 798 device->ov_position = sector; 799 goto requeue; 800 } 801 802 if (sector + (size>>9) > capacity) 803 size = (capacity-sector)<<9; 804 805 inc_rs_pending(device); 806 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 807 dec_rs_pending(device); 808 return 0; 809 } 810 sector += BM_SECT_PER_BIT; 811 } 812 device->ov_position = sector; 813 814 requeue: 815 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 816 if (i == 0 || !stop_sector_reached) 817 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 818 return 1; 819 } 820 821 int w_ov_finished(struct drbd_work *w, int cancel) 822 { 823 struct drbd_device_work *dw = 824 container_of(w, struct drbd_device_work, w); 825 struct drbd_device *device = dw->device; 826 kfree(dw); 827 ov_out_of_sync_print(device); 828 drbd_resync_finished(device); 829 830 return 0; 831 } 832 833 static int w_resync_finished(struct drbd_work *w, int cancel) 834 { 835 struct drbd_device_work *dw = 836 container_of(w, struct drbd_device_work, w); 837 struct drbd_device *device = dw->device; 838 kfree(dw); 839 840 drbd_resync_finished(device); 841 842 return 0; 843 } 844 845 static void ping_peer(struct drbd_device *device) 846 { 847 struct drbd_connection *connection = first_peer_device(device)->connection; 848 849 clear_bit(GOT_PING_ACK, &connection->flags); 850 request_ping(connection); 851 wait_event(connection->ping_wait, 852 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 853 } 854 855 int drbd_resync_finished(struct drbd_device *device) 856 { 857 struct drbd_connection *connection = first_peer_device(device)->connection; 858 unsigned long db, dt, dbdt; 859 unsigned long n_oos; 860 union drbd_state os, ns; 861 struct drbd_device_work *dw; 862 char *khelper_cmd = NULL; 863 int verify_done = 0; 864 865 /* Remove all elements from the resync LRU. Since future actions 866 * might set bits in the (main) bitmap, then the entries in the 867 * resync LRU would be wrong. */ 868 if (drbd_rs_del_all(device)) { 869 /* In case this is not possible now, most probably because 870 * there are P_RS_DATA_REPLY Packets lingering on the worker's 871 * queue (or even the read operations for those packets 872 * is not finished by now). Retry in 100ms. */ 873 874 schedule_timeout_interruptible(HZ / 10); 875 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 876 if (dw) { 877 dw->w.cb = w_resync_finished; 878 dw->device = device; 879 drbd_queue_work(&connection->sender_work, &dw->w); 880 return 1; 881 } 882 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 883 } 884 885 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 886 if (dt <= 0) 887 dt = 1; 888 889 db = device->rs_total; 890 /* adjust for verify start and stop sectors, respective reached position */ 891 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 892 db -= device->ov_left; 893 894 dbdt = Bit2KB(db/dt); 895 device->rs_paused /= HZ; 896 897 if (!get_ldev(device)) 898 goto out; 899 900 ping_peer(device); 901 902 spin_lock_irq(&device->resource->req_lock); 903 os = drbd_read_state(device); 904 905 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 906 907 /* This protects us against multiple calls (that can happen in the presence 908 of application IO), and against connectivity loss just before we arrive here. */ 909 if (os.conn <= C_CONNECTED) 910 goto out_unlock; 911 912 ns = os; 913 ns.conn = C_CONNECTED; 914 915 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 916 verify_done ? "Online verify" : "Resync", 917 dt + device->rs_paused, device->rs_paused, dbdt); 918 919 n_oos = drbd_bm_total_weight(device); 920 921 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 922 if (n_oos) { 923 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 924 n_oos, Bit2KB(1)); 925 khelper_cmd = "out-of-sync"; 926 } 927 } else { 928 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 929 930 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 931 khelper_cmd = "after-resync-target"; 932 933 if (device->use_csums && device->rs_total) { 934 const unsigned long s = device->rs_same_csum; 935 const unsigned long t = device->rs_total; 936 const int ratio = 937 (t == 0) ? 0 : 938 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 939 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 940 "transferred %luK total %luK\n", 941 ratio, 942 Bit2KB(device->rs_same_csum), 943 Bit2KB(device->rs_total - device->rs_same_csum), 944 Bit2KB(device->rs_total)); 945 } 946 } 947 948 if (device->rs_failed) { 949 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 950 951 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 952 ns.disk = D_INCONSISTENT; 953 ns.pdsk = D_UP_TO_DATE; 954 } else { 955 ns.disk = D_UP_TO_DATE; 956 ns.pdsk = D_INCONSISTENT; 957 } 958 } else { 959 ns.disk = D_UP_TO_DATE; 960 ns.pdsk = D_UP_TO_DATE; 961 962 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 963 if (device->p_uuid) { 964 int i; 965 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 966 _drbd_uuid_set(device, i, device->p_uuid[i]); 967 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 968 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 969 } else { 970 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 971 } 972 } 973 974 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 975 /* for verify runs, we don't update uuids here, 976 * so there would be nothing to report. */ 977 drbd_uuid_set_bm(device, 0UL); 978 drbd_print_uuids(device, "updated UUIDs"); 979 if (device->p_uuid) { 980 /* Now the two UUID sets are equal, update what we 981 * know of the peer. */ 982 int i; 983 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 984 device->p_uuid[i] = device->ldev->md.uuid[i]; 985 } 986 } 987 } 988 989 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 990 out_unlock: 991 spin_unlock_irq(&device->resource->req_lock); 992 993 /* If we have been sync source, and have an effective fencing-policy, 994 * once *all* volumes are back in sync, call "unfence". */ 995 if (os.conn == C_SYNC_SOURCE) { 996 enum drbd_disk_state disk_state = D_MASK; 997 enum drbd_disk_state pdsk_state = D_MASK; 998 enum drbd_fencing_p fp = FP_DONT_CARE; 999 1000 rcu_read_lock(); 1001 fp = rcu_dereference(device->ldev->disk_conf)->fencing; 1002 if (fp != FP_DONT_CARE) { 1003 struct drbd_peer_device *peer_device; 1004 int vnr; 1005 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1006 struct drbd_device *device = peer_device->device; 1007 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk); 1008 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk); 1009 } 1010 } 1011 rcu_read_unlock(); 1012 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE) 1013 conn_khelper(connection, "unfence-peer"); 1014 } 1015 1016 put_ldev(device); 1017 out: 1018 device->rs_total = 0; 1019 device->rs_failed = 0; 1020 device->rs_paused = 0; 1021 1022 /* reset start sector, if we reached end of device */ 1023 if (verify_done && device->ov_left == 0) 1024 device->ov_start_sector = 0; 1025 1026 drbd_md_sync(device); 1027 1028 if (khelper_cmd) 1029 drbd_khelper(device, khelper_cmd); 1030 1031 return 1; 1032 } 1033 1034 /* helper */ 1035 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 1036 { 1037 if (drbd_peer_req_has_active_page(peer_req)) { 1038 /* This might happen if sendpage() has not finished */ 1039 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 1040 atomic_add(i, &device->pp_in_use_by_net); 1041 atomic_sub(i, &device->pp_in_use); 1042 spin_lock_irq(&device->resource->req_lock); 1043 list_add_tail(&peer_req->w.list, &device->net_ee); 1044 spin_unlock_irq(&device->resource->req_lock); 1045 wake_up(&drbd_pp_wait); 1046 } else 1047 drbd_free_peer_req(device, peer_req); 1048 } 1049 1050 /** 1051 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 1052 * @w: work object. 1053 * @cancel: The connection will be closed anyways 1054 */ 1055 int w_e_end_data_req(struct drbd_work *w, int cancel) 1056 { 1057 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1058 struct drbd_peer_device *peer_device = peer_req->peer_device; 1059 struct drbd_device *device = peer_device->device; 1060 int err; 1061 1062 if (unlikely(cancel)) { 1063 drbd_free_peer_req(device, peer_req); 1064 dec_unacked(device); 1065 return 0; 1066 } 1067 1068 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1069 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req); 1070 } else { 1071 if (__ratelimit(&drbd_ratelimit_state)) 1072 drbd_err(device, "Sending NegDReply. sector=%llus.\n", 1073 (unsigned long long)peer_req->i.sector); 1074 1075 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req); 1076 } 1077 1078 dec_unacked(device); 1079 1080 move_to_net_ee_or_free(device, peer_req); 1081 1082 if (unlikely(err)) 1083 drbd_err(device, "drbd_send_block() failed\n"); 1084 return err; 1085 } 1086 1087 static bool all_zero(struct drbd_peer_request *peer_req) 1088 { 1089 struct page *page = peer_req->pages; 1090 unsigned int len = peer_req->i.size; 1091 1092 page_chain_for_each(page) { 1093 unsigned int l = min_t(unsigned int, len, PAGE_SIZE); 1094 unsigned int i, words = l / sizeof(long); 1095 unsigned long *d; 1096 1097 d = kmap_atomic(page); 1098 for (i = 0; i < words; i++) { 1099 if (d[i]) { 1100 kunmap_atomic(d); 1101 return false; 1102 } 1103 } 1104 kunmap_atomic(d); 1105 len -= l; 1106 } 1107 1108 return true; 1109 } 1110 1111 /** 1112 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1113 * @w: work object. 1114 * @cancel: The connection will be closed anyways 1115 */ 1116 int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1117 { 1118 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1119 struct drbd_peer_device *peer_device = peer_req->peer_device; 1120 struct drbd_device *device = peer_device->device; 1121 int err; 1122 1123 if (unlikely(cancel)) { 1124 drbd_free_peer_req(device, peer_req); 1125 dec_unacked(device); 1126 return 0; 1127 } 1128 1129 if (get_ldev_if_state(device, D_FAILED)) { 1130 drbd_rs_complete_io(device, peer_req->i.sector); 1131 put_ldev(device); 1132 } 1133 1134 if (device->state.conn == C_AHEAD) { 1135 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req); 1136 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1137 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1138 inc_rs_pending(device); 1139 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req)) 1140 err = drbd_send_rs_deallocated(peer_device, peer_req); 1141 else 1142 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1143 } else { 1144 if (__ratelimit(&drbd_ratelimit_state)) 1145 drbd_err(device, "Not sending RSDataReply, " 1146 "partner DISKLESS!\n"); 1147 err = 0; 1148 } 1149 } else { 1150 if (__ratelimit(&drbd_ratelimit_state)) 1151 drbd_err(device, "Sending NegRSDReply. sector %llus.\n", 1152 (unsigned long long)peer_req->i.sector); 1153 1154 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1155 1156 /* update resync data with failure */ 1157 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1158 } 1159 1160 dec_unacked(device); 1161 1162 move_to_net_ee_or_free(device, peer_req); 1163 1164 if (unlikely(err)) 1165 drbd_err(device, "drbd_send_block() failed\n"); 1166 return err; 1167 } 1168 1169 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1170 { 1171 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1172 struct drbd_peer_device *peer_device = peer_req->peer_device; 1173 struct drbd_device *device = peer_device->device; 1174 struct digest_info *di; 1175 int digest_size; 1176 void *digest = NULL; 1177 int err, eq = 0; 1178 1179 if (unlikely(cancel)) { 1180 drbd_free_peer_req(device, peer_req); 1181 dec_unacked(device); 1182 return 0; 1183 } 1184 1185 if (get_ldev(device)) { 1186 drbd_rs_complete_io(device, peer_req->i.sector); 1187 put_ldev(device); 1188 } 1189 1190 di = peer_req->digest; 1191 1192 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1193 /* quick hack to try to avoid a race against reconfiguration. 1194 * a real fix would be much more involved, 1195 * introducing more locking mechanisms */ 1196 if (peer_device->connection->csums_tfm) { 1197 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm); 1198 D_ASSERT(device, digest_size == di->digest_size); 1199 digest = kmalloc(digest_size, GFP_NOIO); 1200 } 1201 if (digest) { 1202 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 1203 eq = !memcmp(digest, di->digest, digest_size); 1204 kfree(digest); 1205 } 1206 1207 if (eq) { 1208 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1209 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1210 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1211 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req); 1212 } else { 1213 inc_rs_pending(device); 1214 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1215 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1216 kfree(di); 1217 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1218 } 1219 } else { 1220 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1221 if (__ratelimit(&drbd_ratelimit_state)) 1222 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n"); 1223 } 1224 1225 dec_unacked(device); 1226 move_to_net_ee_or_free(device, peer_req); 1227 1228 if (unlikely(err)) 1229 drbd_err(device, "drbd_send_block/ack() failed\n"); 1230 return err; 1231 } 1232 1233 int w_e_end_ov_req(struct drbd_work *w, int cancel) 1234 { 1235 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1236 struct drbd_peer_device *peer_device = peer_req->peer_device; 1237 struct drbd_device *device = peer_device->device; 1238 sector_t sector = peer_req->i.sector; 1239 unsigned int size = peer_req->i.size; 1240 int digest_size; 1241 void *digest; 1242 int err = 0; 1243 1244 if (unlikely(cancel)) 1245 goto out; 1246 1247 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm); 1248 digest = kmalloc(digest_size, GFP_NOIO); 1249 if (!digest) { 1250 err = 1; /* terminate the connection in case the allocation failed */ 1251 goto out; 1252 } 1253 1254 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1255 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1256 else 1257 memset(digest, 0, digest_size); 1258 1259 /* Free e and pages before send. 1260 * In case we block on congestion, we could otherwise run into 1261 * some distributed deadlock, if the other side blocks on 1262 * congestion as well, because our receiver blocks in 1263 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1264 drbd_free_peer_req(device, peer_req); 1265 peer_req = NULL; 1266 inc_rs_pending(device); 1267 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY); 1268 if (err) 1269 dec_rs_pending(device); 1270 kfree(digest); 1271 1272 out: 1273 if (peer_req) 1274 drbd_free_peer_req(device, peer_req); 1275 dec_unacked(device); 1276 return err; 1277 } 1278 1279 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1280 { 1281 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1282 device->ov_last_oos_size += size>>9; 1283 } else { 1284 device->ov_last_oos_start = sector; 1285 device->ov_last_oos_size = size>>9; 1286 } 1287 drbd_set_out_of_sync(device, sector, size); 1288 } 1289 1290 int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1291 { 1292 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1293 struct drbd_peer_device *peer_device = peer_req->peer_device; 1294 struct drbd_device *device = peer_device->device; 1295 struct digest_info *di; 1296 void *digest; 1297 sector_t sector = peer_req->i.sector; 1298 unsigned int size = peer_req->i.size; 1299 int digest_size; 1300 int err, eq = 0; 1301 bool stop_sector_reached = false; 1302 1303 if (unlikely(cancel)) { 1304 drbd_free_peer_req(device, peer_req); 1305 dec_unacked(device); 1306 return 0; 1307 } 1308 1309 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1310 * the resync lru has been cleaned up already */ 1311 if (get_ldev(device)) { 1312 drbd_rs_complete_io(device, peer_req->i.sector); 1313 put_ldev(device); 1314 } 1315 1316 di = peer_req->digest; 1317 1318 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1319 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm); 1320 digest = kmalloc(digest_size, GFP_NOIO); 1321 if (digest) { 1322 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1323 1324 D_ASSERT(device, digest_size == di->digest_size); 1325 eq = !memcmp(digest, di->digest, digest_size); 1326 kfree(digest); 1327 } 1328 } 1329 1330 /* Free peer_req and pages before send. 1331 * In case we block on congestion, we could otherwise run into 1332 * some distributed deadlock, if the other side blocks on 1333 * congestion as well, because our receiver blocks in 1334 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1335 drbd_free_peer_req(device, peer_req); 1336 if (!eq) 1337 drbd_ov_out_of_sync_found(device, sector, size); 1338 else 1339 ov_out_of_sync_print(device); 1340 1341 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, 1342 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1343 1344 dec_unacked(device); 1345 1346 --device->ov_left; 1347 1348 /* let's advance progress step marks only for every other megabyte */ 1349 if ((device->ov_left & 0x200) == 0x200) 1350 drbd_advance_rs_marks(device, device->ov_left); 1351 1352 stop_sector_reached = verify_can_do_stop_sector(device) && 1353 (sector + (size>>9)) >= device->ov_stop_sector; 1354 1355 if (device->ov_left == 0 || stop_sector_reached) { 1356 ov_out_of_sync_print(device); 1357 drbd_resync_finished(device); 1358 } 1359 1360 return err; 1361 } 1362 1363 /* FIXME 1364 * We need to track the number of pending barrier acks, 1365 * and to be able to wait for them. 1366 * See also comment in drbd_adm_attach before drbd_suspend_io. 1367 */ 1368 static int drbd_send_barrier(struct drbd_connection *connection) 1369 { 1370 struct p_barrier *p; 1371 struct drbd_socket *sock; 1372 1373 sock = &connection->data; 1374 p = conn_prepare_command(connection, sock); 1375 if (!p) 1376 return -EIO; 1377 p->barrier = connection->send.current_epoch_nr; 1378 p->pad = 0; 1379 connection->send.current_epoch_writes = 0; 1380 connection->send.last_sent_barrier_jif = jiffies; 1381 1382 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1383 } 1384 1385 int w_send_write_hint(struct drbd_work *w, int cancel) 1386 { 1387 struct drbd_device *device = 1388 container_of(w, struct drbd_device, unplug_work); 1389 struct drbd_socket *sock; 1390 1391 if (cancel) 1392 return 0; 1393 sock = &first_peer_device(device)->connection->data; 1394 if (!drbd_prepare_command(first_peer_device(device), sock)) 1395 return -EIO; 1396 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1397 } 1398 1399 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1400 { 1401 if (!connection->send.seen_any_write_yet) { 1402 connection->send.seen_any_write_yet = true; 1403 connection->send.current_epoch_nr = epoch; 1404 connection->send.current_epoch_writes = 0; 1405 connection->send.last_sent_barrier_jif = jiffies; 1406 } 1407 } 1408 1409 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1410 { 1411 /* re-init if first write on this connection */ 1412 if (!connection->send.seen_any_write_yet) 1413 return; 1414 if (connection->send.current_epoch_nr != epoch) { 1415 if (connection->send.current_epoch_writes) 1416 drbd_send_barrier(connection); 1417 connection->send.current_epoch_nr = epoch; 1418 } 1419 } 1420 1421 int w_send_out_of_sync(struct drbd_work *w, int cancel) 1422 { 1423 struct drbd_request *req = container_of(w, struct drbd_request, w); 1424 struct drbd_device *device = req->device; 1425 struct drbd_peer_device *const peer_device = first_peer_device(device); 1426 struct drbd_connection *const connection = peer_device->connection; 1427 int err; 1428 1429 if (unlikely(cancel)) { 1430 req_mod(req, SEND_CANCELED); 1431 return 0; 1432 } 1433 req->pre_send_jif = jiffies; 1434 1435 /* this time, no connection->send.current_epoch_writes++; 1436 * If it was sent, it was the closing barrier for the last 1437 * replicated epoch, before we went into AHEAD mode. 1438 * No more barriers will be sent, until we leave AHEAD mode again. */ 1439 maybe_send_barrier(connection, req->epoch); 1440 1441 err = drbd_send_out_of_sync(peer_device, req); 1442 req_mod(req, OOS_HANDED_TO_NETWORK); 1443 1444 return err; 1445 } 1446 1447 /** 1448 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1449 * @w: work object. 1450 * @cancel: The connection will be closed anyways 1451 */ 1452 int w_send_dblock(struct drbd_work *w, int cancel) 1453 { 1454 struct drbd_request *req = container_of(w, struct drbd_request, w); 1455 struct drbd_device *device = req->device; 1456 struct drbd_peer_device *const peer_device = first_peer_device(device); 1457 struct drbd_connection *connection = peer_device->connection; 1458 int err; 1459 1460 if (unlikely(cancel)) { 1461 req_mod(req, SEND_CANCELED); 1462 return 0; 1463 } 1464 req->pre_send_jif = jiffies; 1465 1466 re_init_if_first_write(connection, req->epoch); 1467 maybe_send_barrier(connection, req->epoch); 1468 connection->send.current_epoch_writes++; 1469 1470 err = drbd_send_dblock(peer_device, req); 1471 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1472 1473 return err; 1474 } 1475 1476 /** 1477 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1478 * @w: work object. 1479 * @cancel: The connection will be closed anyways 1480 */ 1481 int w_send_read_req(struct drbd_work *w, int cancel) 1482 { 1483 struct drbd_request *req = container_of(w, struct drbd_request, w); 1484 struct drbd_device *device = req->device; 1485 struct drbd_peer_device *const peer_device = first_peer_device(device); 1486 struct drbd_connection *connection = peer_device->connection; 1487 int err; 1488 1489 if (unlikely(cancel)) { 1490 req_mod(req, SEND_CANCELED); 1491 return 0; 1492 } 1493 req->pre_send_jif = jiffies; 1494 1495 /* Even read requests may close a write epoch, 1496 * if there was any yet. */ 1497 maybe_send_barrier(connection, req->epoch); 1498 1499 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, 1500 (unsigned long)req); 1501 1502 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1503 1504 return err; 1505 } 1506 1507 int w_restart_disk_io(struct drbd_work *w, int cancel) 1508 { 1509 struct drbd_request *req = container_of(w, struct drbd_request, w); 1510 struct drbd_device *device = req->device; 1511 1512 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1513 drbd_al_begin_io(device, &req->i); 1514 1515 drbd_req_make_private_bio(req, req->master_bio); 1516 req->private_bio->bi_bdev = device->ldev->backing_bdev; 1517 generic_make_request(req->private_bio); 1518 1519 return 0; 1520 } 1521 1522 static int _drbd_may_sync_now(struct drbd_device *device) 1523 { 1524 struct drbd_device *odev = device; 1525 int resync_after; 1526 1527 while (1) { 1528 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1529 return 1; 1530 rcu_read_lock(); 1531 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1532 rcu_read_unlock(); 1533 if (resync_after == -1) 1534 return 1; 1535 odev = minor_to_device(resync_after); 1536 if (!odev) 1537 return 1; 1538 if ((odev->state.conn >= C_SYNC_SOURCE && 1539 odev->state.conn <= C_PAUSED_SYNC_T) || 1540 odev->state.aftr_isp || odev->state.peer_isp || 1541 odev->state.user_isp) 1542 return 0; 1543 } 1544 } 1545 1546 /** 1547 * drbd_pause_after() - Pause resync on all devices that may not resync now 1548 * @device: DRBD device. 1549 * 1550 * Called from process context only (admin command and after_state_ch). 1551 */ 1552 static bool drbd_pause_after(struct drbd_device *device) 1553 { 1554 bool changed = false; 1555 struct drbd_device *odev; 1556 int i; 1557 1558 rcu_read_lock(); 1559 idr_for_each_entry(&drbd_devices, odev, i) { 1560 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1561 continue; 1562 if (!_drbd_may_sync_now(odev) && 1563 _drbd_set_state(_NS(odev, aftr_isp, 1), 1564 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1565 changed = true; 1566 } 1567 rcu_read_unlock(); 1568 1569 return changed; 1570 } 1571 1572 /** 1573 * drbd_resume_next() - Resume resync on all devices that may resync now 1574 * @device: DRBD device. 1575 * 1576 * Called from process context only (admin command and worker). 1577 */ 1578 static bool drbd_resume_next(struct drbd_device *device) 1579 { 1580 bool changed = false; 1581 struct drbd_device *odev; 1582 int i; 1583 1584 rcu_read_lock(); 1585 idr_for_each_entry(&drbd_devices, odev, i) { 1586 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1587 continue; 1588 if (odev->state.aftr_isp) { 1589 if (_drbd_may_sync_now(odev) && 1590 _drbd_set_state(_NS(odev, aftr_isp, 0), 1591 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1592 changed = true; 1593 } 1594 } 1595 rcu_read_unlock(); 1596 return changed; 1597 } 1598 1599 void resume_next_sg(struct drbd_device *device) 1600 { 1601 lock_all_resources(); 1602 drbd_resume_next(device); 1603 unlock_all_resources(); 1604 } 1605 1606 void suspend_other_sg(struct drbd_device *device) 1607 { 1608 lock_all_resources(); 1609 drbd_pause_after(device); 1610 unlock_all_resources(); 1611 } 1612 1613 /* caller must lock_all_resources() */ 1614 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1615 { 1616 struct drbd_device *odev; 1617 int resync_after; 1618 1619 if (o_minor == -1) 1620 return NO_ERROR; 1621 if (o_minor < -1 || o_minor > MINORMASK) 1622 return ERR_RESYNC_AFTER; 1623 1624 /* check for loops */ 1625 odev = minor_to_device(o_minor); 1626 while (1) { 1627 if (odev == device) 1628 return ERR_RESYNC_AFTER_CYCLE; 1629 1630 /* You are free to depend on diskless, non-existing, 1631 * or not yet/no longer existing minors. 1632 * We only reject dependency loops. 1633 * We cannot follow the dependency chain beyond a detached or 1634 * missing minor. 1635 */ 1636 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1637 return NO_ERROR; 1638 1639 rcu_read_lock(); 1640 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1641 rcu_read_unlock(); 1642 /* dependency chain ends here, no cycles. */ 1643 if (resync_after == -1) 1644 return NO_ERROR; 1645 1646 /* follow the dependency chain */ 1647 odev = minor_to_device(resync_after); 1648 } 1649 } 1650 1651 /* caller must lock_all_resources() */ 1652 void drbd_resync_after_changed(struct drbd_device *device) 1653 { 1654 int changed; 1655 1656 do { 1657 changed = drbd_pause_after(device); 1658 changed |= drbd_resume_next(device); 1659 } while (changed); 1660 } 1661 1662 void drbd_rs_controller_reset(struct drbd_device *device) 1663 { 1664 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 1665 struct fifo_buffer *plan; 1666 1667 atomic_set(&device->rs_sect_in, 0); 1668 atomic_set(&device->rs_sect_ev, 0); 1669 device->rs_in_flight = 0; 1670 device->rs_last_events = 1671 (int)part_stat_read(&disk->part0, sectors[0]) + 1672 (int)part_stat_read(&disk->part0, sectors[1]); 1673 1674 /* Updating the RCU protected object in place is necessary since 1675 this function gets called from atomic context. 1676 It is valid since all other updates also lead to an completely 1677 empty fifo */ 1678 rcu_read_lock(); 1679 plan = rcu_dereference(device->rs_plan_s); 1680 plan->total = 0; 1681 fifo_set(plan, 0); 1682 rcu_read_unlock(); 1683 } 1684 1685 void start_resync_timer_fn(unsigned long data) 1686 { 1687 struct drbd_device *device = (struct drbd_device *) data; 1688 drbd_device_post_work(device, RS_START); 1689 } 1690 1691 static void do_start_resync(struct drbd_device *device) 1692 { 1693 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1694 drbd_warn(device, "postponing start_resync ...\n"); 1695 device->start_resync_timer.expires = jiffies + HZ/10; 1696 add_timer(&device->start_resync_timer); 1697 return; 1698 } 1699 1700 drbd_start_resync(device, C_SYNC_SOURCE); 1701 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1702 } 1703 1704 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device) 1705 { 1706 bool csums_after_crash_only; 1707 rcu_read_lock(); 1708 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only; 1709 rcu_read_unlock(); 1710 return connection->agreed_pro_version >= 89 && /* supported? */ 1711 connection->csums_tfm && /* configured? */ 1712 (csums_after_crash_only == false /* use for each resync? */ 1713 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */ 1714 } 1715 1716 /** 1717 * drbd_start_resync() - Start the resync process 1718 * @device: DRBD device. 1719 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1720 * 1721 * This function might bring you directly into one of the 1722 * C_PAUSED_SYNC_* states. 1723 */ 1724 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1725 { 1726 struct drbd_peer_device *peer_device = first_peer_device(device); 1727 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; 1728 union drbd_state ns; 1729 int r; 1730 1731 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1732 drbd_err(device, "Resync already running!\n"); 1733 return; 1734 } 1735 1736 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1737 if (side == C_SYNC_TARGET) { 1738 /* Since application IO was locked out during C_WF_BITMAP_T and 1739 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1740 we check that we might make the data inconsistent. */ 1741 r = drbd_khelper(device, "before-resync-target"); 1742 r = (r >> 8) & 0xff; 1743 if (r > 0) { 1744 drbd_info(device, "before-resync-target handler returned %d, " 1745 "dropping connection.\n", r); 1746 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 1747 return; 1748 } 1749 } else /* C_SYNC_SOURCE */ { 1750 r = drbd_khelper(device, "before-resync-source"); 1751 r = (r >> 8) & 0xff; 1752 if (r > 0) { 1753 if (r == 3) { 1754 drbd_info(device, "before-resync-source handler returned %d, " 1755 "ignoring. Old userland tools?", r); 1756 } else { 1757 drbd_info(device, "before-resync-source handler returned %d, " 1758 "dropping connection.\n", r); 1759 conn_request_state(connection, 1760 NS(conn, C_DISCONNECTING), CS_HARD); 1761 return; 1762 } 1763 } 1764 } 1765 } 1766 1767 if (current == connection->worker.task) { 1768 /* The worker should not sleep waiting for state_mutex, 1769 that can take long */ 1770 if (!mutex_trylock(device->state_mutex)) { 1771 set_bit(B_RS_H_DONE, &device->flags); 1772 device->start_resync_timer.expires = jiffies + HZ/5; 1773 add_timer(&device->start_resync_timer); 1774 return; 1775 } 1776 } else { 1777 mutex_lock(device->state_mutex); 1778 } 1779 1780 lock_all_resources(); 1781 clear_bit(B_RS_H_DONE, &device->flags); 1782 /* Did some connection breakage or IO error race with us? */ 1783 if (device->state.conn < C_CONNECTED 1784 || !get_ldev_if_state(device, D_NEGOTIATING)) { 1785 unlock_all_resources(); 1786 goto out; 1787 } 1788 1789 ns = drbd_read_state(device); 1790 1791 ns.aftr_isp = !_drbd_may_sync_now(device); 1792 1793 ns.conn = side; 1794 1795 if (side == C_SYNC_TARGET) 1796 ns.disk = D_INCONSISTENT; 1797 else /* side == C_SYNC_SOURCE */ 1798 ns.pdsk = D_INCONSISTENT; 1799 1800 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL); 1801 ns = drbd_read_state(device); 1802 1803 if (ns.conn < C_CONNECTED) 1804 r = SS_UNKNOWN_ERROR; 1805 1806 if (r == SS_SUCCESS) { 1807 unsigned long tw = drbd_bm_total_weight(device); 1808 unsigned long now = jiffies; 1809 int i; 1810 1811 device->rs_failed = 0; 1812 device->rs_paused = 0; 1813 device->rs_same_csum = 0; 1814 device->rs_last_sect_ev = 0; 1815 device->rs_total = tw; 1816 device->rs_start = now; 1817 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1818 device->rs_mark_left[i] = tw; 1819 device->rs_mark_time[i] = now; 1820 } 1821 drbd_pause_after(device); 1822 /* Forget potentially stale cached per resync extent bit-counts. 1823 * Open coded drbd_rs_cancel_all(device), we already have IRQs 1824 * disabled, and know the disk state is ok. */ 1825 spin_lock(&device->al_lock); 1826 lc_reset(device->resync); 1827 device->resync_locked = 0; 1828 device->resync_wenr = LC_FREE; 1829 spin_unlock(&device->al_lock); 1830 } 1831 unlock_all_resources(); 1832 1833 if (r == SS_SUCCESS) { 1834 wake_up(&device->al_wait); /* for lc_reset() above */ 1835 /* reset rs_last_bcast when a resync or verify is started, 1836 * to deal with potential jiffies wrap. */ 1837 device->rs_last_bcast = jiffies - HZ; 1838 1839 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1840 drbd_conn_str(ns.conn), 1841 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10), 1842 (unsigned long) device->rs_total); 1843 if (side == C_SYNC_TARGET) { 1844 device->bm_resync_fo = 0; 1845 device->use_csums = use_checksum_based_resync(connection, device); 1846 } else { 1847 device->use_csums = false; 1848 } 1849 1850 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid 1851 * with w_send_oos, or the sync target will get confused as to 1852 * how much bits to resync. We cannot do that always, because for an 1853 * empty resync and protocol < 95, we need to do it here, as we call 1854 * drbd_resync_finished from here in that case. 1855 * We drbd_gen_and_send_sync_uuid here for protocol < 96, 1856 * and from after_state_ch otherwise. */ 1857 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96) 1858 drbd_gen_and_send_sync_uuid(peer_device); 1859 1860 if (connection->agreed_pro_version < 95 && device->rs_total == 0) { 1861 /* This still has a race (about when exactly the peers 1862 * detect connection loss) that can lead to a full sync 1863 * on next handshake. In 8.3.9 we fixed this with explicit 1864 * resync-finished notifications, but the fix 1865 * introduces a protocol change. Sleeping for some 1866 * time longer than the ping interval + timeout on the 1867 * SyncSource, to give the SyncTarget the chance to 1868 * detect connection loss, then waiting for a ping 1869 * response (implicit in drbd_resync_finished) reduces 1870 * the race considerably, but does not solve it. */ 1871 if (side == C_SYNC_SOURCE) { 1872 struct net_conf *nc; 1873 int timeo; 1874 1875 rcu_read_lock(); 1876 nc = rcu_dereference(connection->net_conf); 1877 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9; 1878 rcu_read_unlock(); 1879 schedule_timeout_interruptible(timeo); 1880 } 1881 drbd_resync_finished(device); 1882 } 1883 1884 drbd_rs_controller_reset(device); 1885 /* ns.conn may already be != device->state.conn, 1886 * we may have been paused in between, or become paused until 1887 * the timer triggers. 1888 * No matter, that is handled in resync_timer_fn() */ 1889 if (ns.conn == C_SYNC_TARGET) 1890 mod_timer(&device->resync_timer, jiffies); 1891 1892 drbd_md_sync(device); 1893 } 1894 put_ldev(device); 1895 out: 1896 mutex_unlock(device->state_mutex); 1897 } 1898 1899 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done) 1900 { 1901 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, }; 1902 device->rs_last_bcast = jiffies; 1903 1904 if (!get_ldev(device)) 1905 return; 1906 1907 drbd_bm_write_lazy(device, 0); 1908 if (resync_done && is_sync_state(device->state.conn)) 1909 drbd_resync_finished(device); 1910 1911 drbd_bcast_event(device, &sib); 1912 /* update timestamp, in case it took a while to write out stuff */ 1913 device->rs_last_bcast = jiffies; 1914 put_ldev(device); 1915 } 1916 1917 static void drbd_ldev_destroy(struct drbd_device *device) 1918 { 1919 lc_destroy(device->resync); 1920 device->resync = NULL; 1921 lc_destroy(device->act_log); 1922 device->act_log = NULL; 1923 1924 __acquire(local); 1925 drbd_backing_dev_free(device, device->ldev); 1926 device->ldev = NULL; 1927 __release(local); 1928 1929 clear_bit(GOING_DISKLESS, &device->flags); 1930 wake_up(&device->misc_wait); 1931 } 1932 1933 static void go_diskless(struct drbd_device *device) 1934 { 1935 D_ASSERT(device, device->state.disk == D_FAILED); 1936 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will 1937 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch 1938 * the protected members anymore, though, so once put_ldev reaches zero 1939 * again, it will be safe to free them. */ 1940 1941 /* Try to write changed bitmap pages, read errors may have just 1942 * set some bits outside the area covered by the activity log. 1943 * 1944 * If we have an IO error during the bitmap writeout, 1945 * we will want a full sync next time, just in case. 1946 * (Do we want a specific meta data flag for this?) 1947 * 1948 * If that does not make it to stable storage either, 1949 * we cannot do anything about that anymore. 1950 * 1951 * We still need to check if both bitmap and ldev are present, we may 1952 * end up here after a failed attach, before ldev was even assigned. 1953 */ 1954 if (device->bitmap && device->ldev) { 1955 /* An interrupted resync or similar is allowed to recounts bits 1956 * while we detach. 1957 * Any modifications would not be expected anymore, though. 1958 */ 1959 if (drbd_bitmap_io_from_worker(device, drbd_bm_write, 1960 "detach", BM_LOCKED_TEST_ALLOWED)) { 1961 if (test_bit(WAS_READ_ERROR, &device->flags)) { 1962 drbd_md_set_flag(device, MDF_FULL_SYNC); 1963 drbd_md_sync(device); 1964 } 1965 } 1966 } 1967 1968 drbd_force_state(device, NS(disk, D_DISKLESS)); 1969 } 1970 1971 static int do_md_sync(struct drbd_device *device) 1972 { 1973 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n"); 1974 drbd_md_sync(device); 1975 return 0; 1976 } 1977 1978 /* only called from drbd_worker thread, no locking */ 1979 void __update_timing_details( 1980 struct drbd_thread_timing_details *tdp, 1981 unsigned int *cb_nr, 1982 void *cb, 1983 const char *fn, const unsigned int line) 1984 { 1985 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST; 1986 struct drbd_thread_timing_details *td = tdp + i; 1987 1988 td->start_jif = jiffies; 1989 td->cb_addr = cb; 1990 td->caller_fn = fn; 1991 td->line = line; 1992 td->cb_nr = *cb_nr; 1993 1994 i = (i+1) % DRBD_THREAD_DETAILS_HIST; 1995 td = tdp + i; 1996 memset(td, 0, sizeof(*td)); 1997 1998 ++(*cb_nr); 1999 } 2000 2001 static void do_device_work(struct drbd_device *device, const unsigned long todo) 2002 { 2003 if (test_bit(MD_SYNC, &todo)) 2004 do_md_sync(device); 2005 if (test_bit(RS_DONE, &todo) || 2006 test_bit(RS_PROGRESS, &todo)) 2007 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo)); 2008 if (test_bit(GO_DISKLESS, &todo)) 2009 go_diskless(device); 2010 if (test_bit(DESTROY_DISK, &todo)) 2011 drbd_ldev_destroy(device); 2012 if (test_bit(RS_START, &todo)) 2013 do_start_resync(device); 2014 } 2015 2016 #define DRBD_DEVICE_WORK_MASK \ 2017 ((1UL << GO_DISKLESS) \ 2018 |(1UL << DESTROY_DISK) \ 2019 |(1UL << MD_SYNC) \ 2020 |(1UL << RS_START) \ 2021 |(1UL << RS_PROGRESS) \ 2022 |(1UL << RS_DONE) \ 2023 ) 2024 2025 static unsigned long get_work_bits(unsigned long *flags) 2026 { 2027 unsigned long old, new; 2028 do { 2029 old = *flags; 2030 new = old & ~DRBD_DEVICE_WORK_MASK; 2031 } while (cmpxchg(flags, old, new) != old); 2032 return old & DRBD_DEVICE_WORK_MASK; 2033 } 2034 2035 static void do_unqueued_work(struct drbd_connection *connection) 2036 { 2037 struct drbd_peer_device *peer_device; 2038 int vnr; 2039 2040 rcu_read_lock(); 2041 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2042 struct drbd_device *device = peer_device->device; 2043 unsigned long todo = get_work_bits(&device->flags); 2044 if (!todo) 2045 continue; 2046 2047 kref_get(&device->kref); 2048 rcu_read_unlock(); 2049 do_device_work(device, todo); 2050 kref_put(&device->kref, drbd_destroy_device); 2051 rcu_read_lock(); 2052 } 2053 rcu_read_unlock(); 2054 } 2055 2056 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) 2057 { 2058 spin_lock_irq(&queue->q_lock); 2059 list_splice_tail_init(&queue->q, work_list); 2060 spin_unlock_irq(&queue->q_lock); 2061 return !list_empty(work_list); 2062 } 2063 2064 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list) 2065 { 2066 DEFINE_WAIT(wait); 2067 struct net_conf *nc; 2068 int uncork, cork; 2069 2070 dequeue_work_batch(&connection->sender_work, work_list); 2071 if (!list_empty(work_list)) 2072 return; 2073 2074 /* Still nothing to do? 2075 * Maybe we still need to close the current epoch, 2076 * even if no new requests are queued yet. 2077 * 2078 * Also, poke TCP, just in case. 2079 * Then wait for new work (or signal). */ 2080 rcu_read_lock(); 2081 nc = rcu_dereference(connection->net_conf); 2082 uncork = nc ? nc->tcp_cork : 0; 2083 rcu_read_unlock(); 2084 if (uncork) { 2085 mutex_lock(&connection->data.mutex); 2086 if (connection->data.socket) 2087 drbd_tcp_uncork(connection->data.socket); 2088 mutex_unlock(&connection->data.mutex); 2089 } 2090 2091 for (;;) { 2092 int send_barrier; 2093 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE); 2094 spin_lock_irq(&connection->resource->req_lock); 2095 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2096 if (!list_empty(&connection->sender_work.q)) 2097 list_splice_tail_init(&connection->sender_work.q, work_list); 2098 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2099 if (!list_empty(work_list) || signal_pending(current)) { 2100 spin_unlock_irq(&connection->resource->req_lock); 2101 break; 2102 } 2103 2104 /* We found nothing new to do, no to-be-communicated request, 2105 * no other work item. We may still need to close the last 2106 * epoch. Next incoming request epoch will be connection -> 2107 * current transfer log epoch number. If that is different 2108 * from the epoch of the last request we communicated, it is 2109 * safe to send the epoch separating barrier now. 2110 */ 2111 send_barrier = 2112 atomic_read(&connection->current_tle_nr) != 2113 connection->send.current_epoch_nr; 2114 spin_unlock_irq(&connection->resource->req_lock); 2115 2116 if (send_barrier) 2117 maybe_send_barrier(connection, 2118 connection->send.current_epoch_nr + 1); 2119 2120 if (test_bit(DEVICE_WORK_PENDING, &connection->flags)) 2121 break; 2122 2123 /* drbd_send() may have called flush_signals() */ 2124 if (get_t_state(&connection->worker) != RUNNING) 2125 break; 2126 2127 schedule(); 2128 /* may be woken up for other things but new work, too, 2129 * e.g. if the current epoch got closed. 2130 * In which case we send the barrier above. */ 2131 } 2132 finish_wait(&connection->sender_work.q_wait, &wait); 2133 2134 /* someone may have changed the config while we have been waiting above. */ 2135 rcu_read_lock(); 2136 nc = rcu_dereference(connection->net_conf); 2137 cork = nc ? nc->tcp_cork : 0; 2138 rcu_read_unlock(); 2139 mutex_lock(&connection->data.mutex); 2140 if (connection->data.socket) { 2141 if (cork) 2142 drbd_tcp_cork(connection->data.socket); 2143 else if (!uncork) 2144 drbd_tcp_uncork(connection->data.socket); 2145 } 2146 mutex_unlock(&connection->data.mutex); 2147 } 2148 2149 int drbd_worker(struct drbd_thread *thi) 2150 { 2151 struct drbd_connection *connection = thi->connection; 2152 struct drbd_work *w = NULL; 2153 struct drbd_peer_device *peer_device; 2154 LIST_HEAD(work_list); 2155 int vnr; 2156 2157 while (get_t_state(thi) == RUNNING) { 2158 drbd_thread_current_set_cpu(thi); 2159 2160 if (list_empty(&work_list)) { 2161 update_worker_timing_details(connection, wait_for_work); 2162 wait_for_work(connection, &work_list); 2163 } 2164 2165 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2166 update_worker_timing_details(connection, do_unqueued_work); 2167 do_unqueued_work(connection); 2168 } 2169 2170 if (signal_pending(current)) { 2171 flush_signals(current); 2172 if (get_t_state(thi) == RUNNING) { 2173 drbd_warn(connection, "Worker got an unexpected signal\n"); 2174 continue; 2175 } 2176 break; 2177 } 2178 2179 if (get_t_state(thi) != RUNNING) 2180 break; 2181 2182 if (!list_empty(&work_list)) { 2183 w = list_first_entry(&work_list, struct drbd_work, list); 2184 list_del_init(&w->list); 2185 update_worker_timing_details(connection, w->cb); 2186 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0) 2187 continue; 2188 if (connection->cstate >= C_WF_REPORT_PARAMS) 2189 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 2190 } 2191 } 2192 2193 do { 2194 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2195 update_worker_timing_details(connection, do_unqueued_work); 2196 do_unqueued_work(connection); 2197 } 2198 if (!list_empty(&work_list)) { 2199 w = list_first_entry(&work_list, struct drbd_work, list); 2200 list_del_init(&w->list); 2201 update_worker_timing_details(connection, w->cb); 2202 w->cb(w, 1); 2203 } else 2204 dequeue_work_batch(&connection->sender_work, &work_list); 2205 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags)); 2206 2207 rcu_read_lock(); 2208 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2209 struct drbd_device *device = peer_device->device; 2210 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE); 2211 kref_get(&device->kref); 2212 rcu_read_unlock(); 2213 drbd_device_cleanup(device); 2214 kref_put(&device->kref, drbd_destroy_device); 2215 rcu_read_lock(); 2216 } 2217 rcu_read_unlock(); 2218 2219 return 0; 2220 } 2221