xref: /linux/drivers/block/drbd/drbd_worker.c (revision ec63e2a4897075e427c121d863bd89c44578094f)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63 	struct drbd_device *device;
64 
65 	device = bio->bi_private;
66 	device->md_io.error = blk_status_to_errno(bio->bi_status);
67 
68 	/* special case: drbd_md_read() during drbd_adm_attach() */
69 	if (device->ldev)
70 		put_ldev(device);
71 	bio_put(bio);
72 
73 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74 	 * to timeout on the lower level device, and eventually detach from it.
75 	 * If this io completion runs after that timeout expired, this
76 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
77 	 * During normal operation, this only puts that extra reference
78 	 * down to 1 again.
79 	 * Make sure we first drop the reference, and only then signal
80 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81 	 * next drbd_md_sync_page_io(), that we trigger the
82 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
83 	 */
84 	drbd_md_put_buffer(device);
85 	device->md_io.done = 1;
86 	wake_up(&device->misc_wait);
87 }
88 
89 /* reads on behalf of the partner,
90  * "submitted" by the receiver
91  */
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
93 {
94 	unsigned long flags = 0;
95 	struct drbd_peer_device *peer_device = peer_req->peer_device;
96 	struct drbd_device *device = peer_device->device;
97 
98 	spin_lock_irqsave(&device->resource->req_lock, flags);
99 	device->read_cnt += peer_req->i.size >> 9;
100 	list_del(&peer_req->w.list);
101 	if (list_empty(&device->read_ee))
102 		wake_up(&device->ee_wait);
103 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
105 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
106 
107 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108 	put_ldev(device);
109 }
110 
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
114 {
115 	unsigned long flags = 0;
116 	struct drbd_peer_device *peer_device = peer_req->peer_device;
117 	struct drbd_device *device = peer_device->device;
118 	struct drbd_connection *connection = peer_device->connection;
119 	struct drbd_interval i;
120 	int do_wake;
121 	u64 block_id;
122 	int do_al_complete_io;
123 
124 	/* after we moved peer_req to done_ee,
125 	 * we may no longer access it,
126 	 * it may be freed/reused already!
127 	 * (as soon as we release the req_lock) */
128 	i = peer_req->i;
129 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
130 	block_id = peer_req->block_id;
131 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
132 
133 	if (peer_req->flags & EE_WAS_ERROR) {
134 		/* In protocol != C, we usually do not send write acks.
135 		 * In case of a write error, send the neg ack anyways. */
136 		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
137 			inc_unacked(device);
138 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
139 	}
140 
141 	spin_lock_irqsave(&device->resource->req_lock, flags);
142 	device->writ_cnt += peer_req->i.size >> 9;
143 	list_move_tail(&peer_req->w.list, &device->done_ee);
144 
145 	/*
146 	 * Do not remove from the write_requests tree here: we did not send the
147 	 * Ack yet and did not wake possibly waiting conflicting requests.
148 	 * Removed from the tree from "drbd_process_done_ee" within the
149 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
150 	 * _drbd_clear_done_ee.
151 	 */
152 
153 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
154 
155 	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
156 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
157 	if (peer_req->flags & EE_WAS_ERROR)
158 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
159 
160 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
161 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
162 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
163 			kref_put(&device->kref, drbd_destroy_device);
164 	}
165 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
166 
167 	if (block_id == ID_SYNCER)
168 		drbd_rs_complete_io(device, i.sector);
169 
170 	if (do_wake)
171 		wake_up(&device->ee_wait);
172 
173 	if (do_al_complete_io)
174 		drbd_al_complete_io(device, &i);
175 
176 	put_ldev(device);
177 }
178 
179 /* writes on behalf of the partner, or resync writes,
180  * "submitted" by the receiver.
181  */
182 void drbd_peer_request_endio(struct bio *bio)
183 {
184 	struct drbd_peer_request *peer_req = bio->bi_private;
185 	struct drbd_device *device = peer_req->peer_device->device;
186 	bool is_write = bio_data_dir(bio) == WRITE;
187 	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188 			  bio_op(bio) == REQ_OP_DISCARD;
189 
190 	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191 		drbd_warn(device, "%s: error=%d s=%llus\n",
192 				is_write ? (is_discard ? "discard" : "write")
193 					: "read", bio->bi_status,
194 				(unsigned long long)peer_req->i.sector);
195 
196 	if (bio->bi_status)
197 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
198 
199 	bio_put(bio); /* no need for the bio anymore */
200 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
201 		if (is_write)
202 			drbd_endio_write_sec_final(peer_req);
203 		else
204 			drbd_endio_read_sec_final(peer_req);
205 	}
206 }
207 
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
210 {
211 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212 		device->minor, device->resource->name, device->vnr);
213 }
214 
215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
216  */
217 void drbd_request_endio(struct bio *bio)
218 {
219 	unsigned long flags;
220 	struct drbd_request *req = bio->bi_private;
221 	struct drbd_device *device = req->device;
222 	struct bio_and_error m;
223 	enum drbd_req_event what;
224 
225 	/* If this request was aborted locally before,
226 	 * but now was completed "successfully",
227 	 * chances are that this caused arbitrary data corruption.
228 	 *
229 	 * "aborting" requests, or force-detaching the disk, is intended for
230 	 * completely blocked/hung local backing devices which do no longer
231 	 * complete requests at all, not even do error completions.  In this
232 	 * situation, usually a hard-reset and failover is the only way out.
233 	 *
234 	 * By "aborting", basically faking a local error-completion,
235 	 * we allow for a more graceful swichover by cleanly migrating services.
236 	 * Still the affected node has to be rebooted "soon".
237 	 *
238 	 * By completing these requests, we allow the upper layers to re-use
239 	 * the associated data pages.
240 	 *
241 	 * If later the local backing device "recovers", and now DMAs some data
242 	 * from disk into the original request pages, in the best case it will
243 	 * just put random data into unused pages; but typically it will corrupt
244 	 * meanwhile completely unrelated data, causing all sorts of damage.
245 	 *
246 	 * Which means delayed successful completion,
247 	 * especially for READ requests,
248 	 * is a reason to panic().
249 	 *
250 	 * We assume that a delayed *error* completion is OK,
251 	 * though we still will complain noisily about it.
252 	 */
253 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254 		if (__ratelimit(&drbd_ratelimit_state))
255 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
256 
257 		if (!bio->bi_status)
258 			drbd_panic_after_delayed_completion_of_aborted_request(device);
259 	}
260 
261 	/* to avoid recursion in __req_mod */
262 	if (unlikely(bio->bi_status)) {
263 		switch (bio_op(bio)) {
264 		case REQ_OP_WRITE_ZEROES:
265 		case REQ_OP_DISCARD:
266 			if (bio->bi_status == BLK_STS_NOTSUPP)
267 				what = DISCARD_COMPLETED_NOTSUPP;
268 			else
269 				what = DISCARD_COMPLETED_WITH_ERROR;
270 			break;
271 		case REQ_OP_READ:
272 			if (bio->bi_opf & REQ_RAHEAD)
273 				what = READ_AHEAD_COMPLETED_WITH_ERROR;
274 			else
275 				what = READ_COMPLETED_WITH_ERROR;
276 			break;
277 		default:
278 			what = WRITE_COMPLETED_WITH_ERROR;
279 			break;
280 		}
281 	} else {
282 		what = COMPLETED_OK;
283 	}
284 
285 	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
286 	bio_put(bio);
287 
288 	/* not req_mod(), we need irqsave here! */
289 	spin_lock_irqsave(&device->resource->req_lock, flags);
290 	__req_mod(req, what, &m);
291 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
292 	put_ldev(device);
293 
294 	if (m.bio)
295 		complete_master_bio(device, &m);
296 }
297 
298 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
299 {
300 	SHASH_DESC_ON_STACK(desc, tfm);
301 	struct page *page = peer_req->pages;
302 	struct page *tmp;
303 	unsigned len;
304 	void *src;
305 
306 	desc->tfm = tfm;
307 	desc->flags = 0;
308 
309 	crypto_shash_init(desc);
310 
311 	src = kmap_atomic(page);
312 	while ((tmp = page_chain_next(page))) {
313 		/* all but the last page will be fully used */
314 		crypto_shash_update(desc, src, PAGE_SIZE);
315 		kunmap_atomic(src);
316 		page = tmp;
317 		src = kmap_atomic(page);
318 	}
319 	/* and now the last, possibly only partially used page */
320 	len = peer_req->i.size & (PAGE_SIZE - 1);
321 	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
322 	kunmap_atomic(src);
323 
324 	crypto_shash_final(desc, digest);
325 	shash_desc_zero(desc);
326 }
327 
328 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
329 {
330 	SHASH_DESC_ON_STACK(desc, tfm);
331 	struct bio_vec bvec;
332 	struct bvec_iter iter;
333 
334 	desc->tfm = tfm;
335 	desc->flags = 0;
336 
337 	crypto_shash_init(desc);
338 
339 	bio_for_each_segment(bvec, bio, iter) {
340 		u8 *src;
341 
342 		src = kmap_atomic(bvec.bv_page);
343 		crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
344 		kunmap_atomic(src);
345 
346 		/* REQ_OP_WRITE_SAME has only one segment,
347 		 * checksum the payload only once. */
348 		if (bio_op(bio) == REQ_OP_WRITE_SAME)
349 			break;
350 	}
351 	crypto_shash_final(desc, digest);
352 	shash_desc_zero(desc);
353 }
354 
355 /* MAYBE merge common code with w_e_end_ov_req */
356 static int w_e_send_csum(struct drbd_work *w, int cancel)
357 {
358 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
359 	struct drbd_peer_device *peer_device = peer_req->peer_device;
360 	struct drbd_device *device = peer_device->device;
361 	int digest_size;
362 	void *digest;
363 	int err = 0;
364 
365 	if (unlikely(cancel))
366 		goto out;
367 
368 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
369 		goto out;
370 
371 	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
372 	digest = kmalloc(digest_size, GFP_NOIO);
373 	if (digest) {
374 		sector_t sector = peer_req->i.sector;
375 		unsigned int size = peer_req->i.size;
376 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
377 		/* Free peer_req and pages before send.
378 		 * In case we block on congestion, we could otherwise run into
379 		 * some distributed deadlock, if the other side blocks on
380 		 * congestion as well, because our receiver blocks in
381 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
382 		drbd_free_peer_req(device, peer_req);
383 		peer_req = NULL;
384 		inc_rs_pending(device);
385 		err = drbd_send_drequest_csum(peer_device, sector, size,
386 					      digest, digest_size,
387 					      P_CSUM_RS_REQUEST);
388 		kfree(digest);
389 	} else {
390 		drbd_err(device, "kmalloc() of digest failed.\n");
391 		err = -ENOMEM;
392 	}
393 
394 out:
395 	if (peer_req)
396 		drbd_free_peer_req(device, peer_req);
397 
398 	if (unlikely(err))
399 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
400 	return err;
401 }
402 
403 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
404 
405 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
406 {
407 	struct drbd_device *device = peer_device->device;
408 	struct drbd_peer_request *peer_req;
409 
410 	if (!get_ldev(device))
411 		return -EIO;
412 
413 	/* GFP_TRY, because if there is no memory available right now, this may
414 	 * be rescheduled for later. It is "only" background resync, after all. */
415 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
416 				       size, size, GFP_TRY);
417 	if (!peer_req)
418 		goto defer;
419 
420 	peer_req->w.cb = w_e_send_csum;
421 	spin_lock_irq(&device->resource->req_lock);
422 	list_add_tail(&peer_req->w.list, &device->read_ee);
423 	spin_unlock_irq(&device->resource->req_lock);
424 
425 	atomic_add(size >> 9, &device->rs_sect_ev);
426 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
427 				     DRBD_FAULT_RS_RD) == 0)
428 		return 0;
429 
430 	/* If it failed because of ENOMEM, retry should help.  If it failed
431 	 * because bio_add_page failed (probably broken lower level driver),
432 	 * retry may or may not help.
433 	 * If it does not, you may need to force disconnect. */
434 	spin_lock_irq(&device->resource->req_lock);
435 	list_del(&peer_req->w.list);
436 	spin_unlock_irq(&device->resource->req_lock);
437 
438 	drbd_free_peer_req(device, peer_req);
439 defer:
440 	put_ldev(device);
441 	return -EAGAIN;
442 }
443 
444 int w_resync_timer(struct drbd_work *w, int cancel)
445 {
446 	struct drbd_device *device =
447 		container_of(w, struct drbd_device, resync_work);
448 
449 	switch (device->state.conn) {
450 	case C_VERIFY_S:
451 		make_ov_request(device, cancel);
452 		break;
453 	case C_SYNC_TARGET:
454 		make_resync_request(device, cancel);
455 		break;
456 	}
457 
458 	return 0;
459 }
460 
461 void resync_timer_fn(struct timer_list *t)
462 {
463 	struct drbd_device *device = from_timer(device, t, resync_timer);
464 
465 	drbd_queue_work_if_unqueued(
466 		&first_peer_device(device)->connection->sender_work,
467 		&device->resync_work);
468 }
469 
470 static void fifo_set(struct fifo_buffer *fb, int value)
471 {
472 	int i;
473 
474 	for (i = 0; i < fb->size; i++)
475 		fb->values[i] = value;
476 }
477 
478 static int fifo_push(struct fifo_buffer *fb, int value)
479 {
480 	int ov;
481 
482 	ov = fb->values[fb->head_index];
483 	fb->values[fb->head_index++] = value;
484 
485 	if (fb->head_index >= fb->size)
486 		fb->head_index = 0;
487 
488 	return ov;
489 }
490 
491 static void fifo_add_val(struct fifo_buffer *fb, int value)
492 {
493 	int i;
494 
495 	for (i = 0; i < fb->size; i++)
496 		fb->values[i] += value;
497 }
498 
499 struct fifo_buffer *fifo_alloc(int fifo_size)
500 {
501 	struct fifo_buffer *fb;
502 
503 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
504 	if (!fb)
505 		return NULL;
506 
507 	fb->head_index = 0;
508 	fb->size = fifo_size;
509 	fb->total = 0;
510 
511 	return fb;
512 }
513 
514 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
515 {
516 	struct disk_conf *dc;
517 	unsigned int want;     /* The number of sectors we want in-flight */
518 	int req_sect; /* Number of sectors to request in this turn */
519 	int correction; /* Number of sectors more we need in-flight */
520 	int cps; /* correction per invocation of drbd_rs_controller() */
521 	int steps; /* Number of time steps to plan ahead */
522 	int curr_corr;
523 	int max_sect;
524 	struct fifo_buffer *plan;
525 
526 	dc = rcu_dereference(device->ldev->disk_conf);
527 	plan = rcu_dereference(device->rs_plan_s);
528 
529 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
530 
531 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
532 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
533 	} else { /* normal path */
534 		want = dc->c_fill_target ? dc->c_fill_target :
535 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
536 	}
537 
538 	correction = want - device->rs_in_flight - plan->total;
539 
540 	/* Plan ahead */
541 	cps = correction / steps;
542 	fifo_add_val(plan, cps);
543 	plan->total += cps * steps;
544 
545 	/* What we do in this step */
546 	curr_corr = fifo_push(plan, 0);
547 	plan->total -= curr_corr;
548 
549 	req_sect = sect_in + curr_corr;
550 	if (req_sect < 0)
551 		req_sect = 0;
552 
553 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
554 	if (req_sect > max_sect)
555 		req_sect = max_sect;
556 
557 	/*
558 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
559 		 sect_in, device->rs_in_flight, want, correction,
560 		 steps, cps, device->rs_planed, curr_corr, req_sect);
561 	*/
562 
563 	return req_sect;
564 }
565 
566 static int drbd_rs_number_requests(struct drbd_device *device)
567 {
568 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
569 	int number, mxb;
570 
571 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
572 	device->rs_in_flight -= sect_in;
573 
574 	rcu_read_lock();
575 	mxb = drbd_get_max_buffers(device) / 2;
576 	if (rcu_dereference(device->rs_plan_s)->size) {
577 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
578 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
579 	} else {
580 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
581 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
582 	}
583 	rcu_read_unlock();
584 
585 	/* Don't have more than "max-buffers"/2 in-flight.
586 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
587 	 * potentially causing a distributed deadlock on congestion during
588 	 * online-verify or (checksum-based) resync, if max-buffers,
589 	 * socket buffer sizes and resync rate settings are mis-configured. */
590 
591 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
592 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
593 	 * "number of pages" (typically also 4k),
594 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
595 	if (mxb - device->rs_in_flight/8 < number)
596 		number = mxb - device->rs_in_flight/8;
597 
598 	return number;
599 }
600 
601 static int make_resync_request(struct drbd_device *const device, int cancel)
602 {
603 	struct drbd_peer_device *const peer_device = first_peer_device(device);
604 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
605 	unsigned long bit;
606 	sector_t sector;
607 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
608 	int max_bio_size;
609 	int number, rollback_i, size;
610 	int align, requeue = 0;
611 	int i = 0;
612 	int discard_granularity = 0;
613 
614 	if (unlikely(cancel))
615 		return 0;
616 
617 	if (device->rs_total == 0) {
618 		/* empty resync? */
619 		drbd_resync_finished(device);
620 		return 0;
621 	}
622 
623 	if (!get_ldev(device)) {
624 		/* Since we only need to access device->rsync a
625 		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
626 		   to continue resync with a broken disk makes no sense at
627 		   all */
628 		drbd_err(device, "Disk broke down during resync!\n");
629 		return 0;
630 	}
631 
632 	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
633 		rcu_read_lock();
634 		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
635 		rcu_read_unlock();
636 	}
637 
638 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
639 	number = drbd_rs_number_requests(device);
640 	if (number <= 0)
641 		goto requeue;
642 
643 	for (i = 0; i < number; i++) {
644 		/* Stop generating RS requests when half of the send buffer is filled,
645 		 * but notify TCP that we'd like to have more space. */
646 		mutex_lock(&connection->data.mutex);
647 		if (connection->data.socket) {
648 			struct sock *sk = connection->data.socket->sk;
649 			int queued = sk->sk_wmem_queued;
650 			int sndbuf = sk->sk_sndbuf;
651 			if (queued > sndbuf / 2) {
652 				requeue = 1;
653 				if (sk->sk_socket)
654 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
655 			}
656 		} else
657 			requeue = 1;
658 		mutex_unlock(&connection->data.mutex);
659 		if (requeue)
660 			goto requeue;
661 
662 next_sector:
663 		size = BM_BLOCK_SIZE;
664 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
665 
666 		if (bit == DRBD_END_OF_BITMAP) {
667 			device->bm_resync_fo = drbd_bm_bits(device);
668 			put_ldev(device);
669 			return 0;
670 		}
671 
672 		sector = BM_BIT_TO_SECT(bit);
673 
674 		if (drbd_try_rs_begin_io(device, sector)) {
675 			device->bm_resync_fo = bit;
676 			goto requeue;
677 		}
678 		device->bm_resync_fo = bit + 1;
679 
680 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
681 			drbd_rs_complete_io(device, sector);
682 			goto next_sector;
683 		}
684 
685 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
686 		/* try to find some adjacent bits.
687 		 * we stop if we have already the maximum req size.
688 		 *
689 		 * Additionally always align bigger requests, in order to
690 		 * be prepared for all stripe sizes of software RAIDs.
691 		 */
692 		align = 1;
693 		rollback_i = i;
694 		while (i < number) {
695 			if (size + BM_BLOCK_SIZE > max_bio_size)
696 				break;
697 
698 			/* Be always aligned */
699 			if (sector & ((1<<(align+3))-1))
700 				break;
701 
702 			if (discard_granularity && size == discard_granularity)
703 				break;
704 
705 			/* do not cross extent boundaries */
706 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
707 				break;
708 			/* now, is it actually dirty, after all?
709 			 * caution, drbd_bm_test_bit is tri-state for some
710 			 * obscure reason; ( b == 0 ) would get the out-of-band
711 			 * only accidentally right because of the "oddly sized"
712 			 * adjustment below */
713 			if (drbd_bm_test_bit(device, bit+1) != 1)
714 				break;
715 			bit++;
716 			size += BM_BLOCK_SIZE;
717 			if ((BM_BLOCK_SIZE << align) <= size)
718 				align++;
719 			i++;
720 		}
721 		/* if we merged some,
722 		 * reset the offset to start the next drbd_bm_find_next from */
723 		if (size > BM_BLOCK_SIZE)
724 			device->bm_resync_fo = bit + 1;
725 #endif
726 
727 		/* adjust very last sectors, in case we are oddly sized */
728 		if (sector + (size>>9) > capacity)
729 			size = (capacity-sector)<<9;
730 
731 		if (device->use_csums) {
732 			switch (read_for_csum(peer_device, sector, size)) {
733 			case -EIO: /* Disk failure */
734 				put_ldev(device);
735 				return -EIO;
736 			case -EAGAIN: /* allocation failed, or ldev busy */
737 				drbd_rs_complete_io(device, sector);
738 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
739 				i = rollback_i;
740 				goto requeue;
741 			case 0:
742 				/* everything ok */
743 				break;
744 			default:
745 				BUG();
746 			}
747 		} else {
748 			int err;
749 
750 			inc_rs_pending(device);
751 			err = drbd_send_drequest(peer_device,
752 						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
753 						 sector, size, ID_SYNCER);
754 			if (err) {
755 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
756 				dec_rs_pending(device);
757 				put_ldev(device);
758 				return err;
759 			}
760 		}
761 	}
762 
763 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
764 		/* last syncer _request_ was sent,
765 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
766 		 * next sync group will resume), as soon as we receive the last
767 		 * resync data block, and the last bit is cleared.
768 		 * until then resync "work" is "inactive" ...
769 		 */
770 		put_ldev(device);
771 		return 0;
772 	}
773 
774  requeue:
775 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
776 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
777 	put_ldev(device);
778 	return 0;
779 }
780 
781 static int make_ov_request(struct drbd_device *device, int cancel)
782 {
783 	int number, i, size;
784 	sector_t sector;
785 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
786 	bool stop_sector_reached = false;
787 
788 	if (unlikely(cancel))
789 		return 1;
790 
791 	number = drbd_rs_number_requests(device);
792 
793 	sector = device->ov_position;
794 	for (i = 0; i < number; i++) {
795 		if (sector >= capacity)
796 			return 1;
797 
798 		/* We check for "finished" only in the reply path:
799 		 * w_e_end_ov_reply().
800 		 * We need to send at least one request out. */
801 		stop_sector_reached = i > 0
802 			&& verify_can_do_stop_sector(device)
803 			&& sector >= device->ov_stop_sector;
804 		if (stop_sector_reached)
805 			break;
806 
807 		size = BM_BLOCK_SIZE;
808 
809 		if (drbd_try_rs_begin_io(device, sector)) {
810 			device->ov_position = sector;
811 			goto requeue;
812 		}
813 
814 		if (sector + (size>>9) > capacity)
815 			size = (capacity-sector)<<9;
816 
817 		inc_rs_pending(device);
818 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
819 			dec_rs_pending(device);
820 			return 0;
821 		}
822 		sector += BM_SECT_PER_BIT;
823 	}
824 	device->ov_position = sector;
825 
826  requeue:
827 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
828 	if (i == 0 || !stop_sector_reached)
829 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
830 	return 1;
831 }
832 
833 int w_ov_finished(struct drbd_work *w, int cancel)
834 {
835 	struct drbd_device_work *dw =
836 		container_of(w, struct drbd_device_work, w);
837 	struct drbd_device *device = dw->device;
838 	kfree(dw);
839 	ov_out_of_sync_print(device);
840 	drbd_resync_finished(device);
841 
842 	return 0;
843 }
844 
845 static int w_resync_finished(struct drbd_work *w, int cancel)
846 {
847 	struct drbd_device_work *dw =
848 		container_of(w, struct drbd_device_work, w);
849 	struct drbd_device *device = dw->device;
850 	kfree(dw);
851 
852 	drbd_resync_finished(device);
853 
854 	return 0;
855 }
856 
857 static void ping_peer(struct drbd_device *device)
858 {
859 	struct drbd_connection *connection = first_peer_device(device)->connection;
860 
861 	clear_bit(GOT_PING_ACK, &connection->flags);
862 	request_ping(connection);
863 	wait_event(connection->ping_wait,
864 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
865 }
866 
867 int drbd_resync_finished(struct drbd_device *device)
868 {
869 	struct drbd_connection *connection = first_peer_device(device)->connection;
870 	unsigned long db, dt, dbdt;
871 	unsigned long n_oos;
872 	union drbd_state os, ns;
873 	struct drbd_device_work *dw;
874 	char *khelper_cmd = NULL;
875 	int verify_done = 0;
876 
877 	/* Remove all elements from the resync LRU. Since future actions
878 	 * might set bits in the (main) bitmap, then the entries in the
879 	 * resync LRU would be wrong. */
880 	if (drbd_rs_del_all(device)) {
881 		/* In case this is not possible now, most probably because
882 		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
883 		 * queue (or even the read operations for those packets
884 		 * is not finished by now).   Retry in 100ms. */
885 
886 		schedule_timeout_interruptible(HZ / 10);
887 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
888 		if (dw) {
889 			dw->w.cb = w_resync_finished;
890 			dw->device = device;
891 			drbd_queue_work(&connection->sender_work, &dw->w);
892 			return 1;
893 		}
894 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
895 	}
896 
897 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
898 	if (dt <= 0)
899 		dt = 1;
900 
901 	db = device->rs_total;
902 	/* adjust for verify start and stop sectors, respective reached position */
903 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
904 		db -= device->ov_left;
905 
906 	dbdt = Bit2KB(db/dt);
907 	device->rs_paused /= HZ;
908 
909 	if (!get_ldev(device))
910 		goto out;
911 
912 	ping_peer(device);
913 
914 	spin_lock_irq(&device->resource->req_lock);
915 	os = drbd_read_state(device);
916 
917 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
918 
919 	/* This protects us against multiple calls (that can happen in the presence
920 	   of application IO), and against connectivity loss just before we arrive here. */
921 	if (os.conn <= C_CONNECTED)
922 		goto out_unlock;
923 
924 	ns = os;
925 	ns.conn = C_CONNECTED;
926 
927 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
928 	     verify_done ? "Online verify" : "Resync",
929 	     dt + device->rs_paused, device->rs_paused, dbdt);
930 
931 	n_oos = drbd_bm_total_weight(device);
932 
933 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
934 		if (n_oos) {
935 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
936 			      n_oos, Bit2KB(1));
937 			khelper_cmd = "out-of-sync";
938 		}
939 	} else {
940 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
941 
942 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
943 			khelper_cmd = "after-resync-target";
944 
945 		if (device->use_csums && device->rs_total) {
946 			const unsigned long s = device->rs_same_csum;
947 			const unsigned long t = device->rs_total;
948 			const int ratio =
949 				(t == 0)     ? 0 :
950 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
951 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
952 			     "transferred %luK total %luK\n",
953 			     ratio,
954 			     Bit2KB(device->rs_same_csum),
955 			     Bit2KB(device->rs_total - device->rs_same_csum),
956 			     Bit2KB(device->rs_total));
957 		}
958 	}
959 
960 	if (device->rs_failed) {
961 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
962 
963 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
964 			ns.disk = D_INCONSISTENT;
965 			ns.pdsk = D_UP_TO_DATE;
966 		} else {
967 			ns.disk = D_UP_TO_DATE;
968 			ns.pdsk = D_INCONSISTENT;
969 		}
970 	} else {
971 		ns.disk = D_UP_TO_DATE;
972 		ns.pdsk = D_UP_TO_DATE;
973 
974 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
975 			if (device->p_uuid) {
976 				int i;
977 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
978 					_drbd_uuid_set(device, i, device->p_uuid[i]);
979 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
980 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
981 			} else {
982 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
983 			}
984 		}
985 
986 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
987 			/* for verify runs, we don't update uuids here,
988 			 * so there would be nothing to report. */
989 			drbd_uuid_set_bm(device, 0UL);
990 			drbd_print_uuids(device, "updated UUIDs");
991 			if (device->p_uuid) {
992 				/* Now the two UUID sets are equal, update what we
993 				 * know of the peer. */
994 				int i;
995 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
996 					device->p_uuid[i] = device->ldev->md.uuid[i];
997 			}
998 		}
999 	}
1000 
1001 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
1002 out_unlock:
1003 	spin_unlock_irq(&device->resource->req_lock);
1004 
1005 	/* If we have been sync source, and have an effective fencing-policy,
1006 	 * once *all* volumes are back in sync, call "unfence". */
1007 	if (os.conn == C_SYNC_SOURCE) {
1008 		enum drbd_disk_state disk_state = D_MASK;
1009 		enum drbd_disk_state pdsk_state = D_MASK;
1010 		enum drbd_fencing_p fp = FP_DONT_CARE;
1011 
1012 		rcu_read_lock();
1013 		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1014 		if (fp != FP_DONT_CARE) {
1015 			struct drbd_peer_device *peer_device;
1016 			int vnr;
1017 			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1018 				struct drbd_device *device = peer_device->device;
1019 				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1020 				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1021 			}
1022 		}
1023 		rcu_read_unlock();
1024 		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1025 			conn_khelper(connection, "unfence-peer");
1026 	}
1027 
1028 	put_ldev(device);
1029 out:
1030 	device->rs_total  = 0;
1031 	device->rs_failed = 0;
1032 	device->rs_paused = 0;
1033 
1034 	/* reset start sector, if we reached end of device */
1035 	if (verify_done && device->ov_left == 0)
1036 		device->ov_start_sector = 0;
1037 
1038 	drbd_md_sync(device);
1039 
1040 	if (khelper_cmd)
1041 		drbd_khelper(device, khelper_cmd);
1042 
1043 	return 1;
1044 }
1045 
1046 /* helper */
1047 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1048 {
1049 	if (drbd_peer_req_has_active_page(peer_req)) {
1050 		/* This might happen if sendpage() has not finished */
1051 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1052 		atomic_add(i, &device->pp_in_use_by_net);
1053 		atomic_sub(i, &device->pp_in_use);
1054 		spin_lock_irq(&device->resource->req_lock);
1055 		list_add_tail(&peer_req->w.list, &device->net_ee);
1056 		spin_unlock_irq(&device->resource->req_lock);
1057 		wake_up(&drbd_pp_wait);
1058 	} else
1059 		drbd_free_peer_req(device, peer_req);
1060 }
1061 
1062 /**
1063  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1064  * @w:		work object.
1065  * @cancel:	The connection will be closed anyways
1066  */
1067 int w_e_end_data_req(struct drbd_work *w, int cancel)
1068 {
1069 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1070 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1071 	struct drbd_device *device = peer_device->device;
1072 	int err;
1073 
1074 	if (unlikely(cancel)) {
1075 		drbd_free_peer_req(device, peer_req);
1076 		dec_unacked(device);
1077 		return 0;
1078 	}
1079 
1080 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1081 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1082 	} else {
1083 		if (__ratelimit(&drbd_ratelimit_state))
1084 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1085 			    (unsigned long long)peer_req->i.sector);
1086 
1087 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1088 	}
1089 
1090 	dec_unacked(device);
1091 
1092 	move_to_net_ee_or_free(device, peer_req);
1093 
1094 	if (unlikely(err))
1095 		drbd_err(device, "drbd_send_block() failed\n");
1096 	return err;
1097 }
1098 
1099 static bool all_zero(struct drbd_peer_request *peer_req)
1100 {
1101 	struct page *page = peer_req->pages;
1102 	unsigned int len = peer_req->i.size;
1103 
1104 	page_chain_for_each(page) {
1105 		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1106 		unsigned int i, words = l / sizeof(long);
1107 		unsigned long *d;
1108 
1109 		d = kmap_atomic(page);
1110 		for (i = 0; i < words; i++) {
1111 			if (d[i]) {
1112 				kunmap_atomic(d);
1113 				return false;
1114 			}
1115 		}
1116 		kunmap_atomic(d);
1117 		len -= l;
1118 	}
1119 
1120 	return true;
1121 }
1122 
1123 /**
1124  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1125  * @w:		work object.
1126  * @cancel:	The connection will be closed anyways
1127  */
1128 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1129 {
1130 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1131 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1132 	struct drbd_device *device = peer_device->device;
1133 	int err;
1134 
1135 	if (unlikely(cancel)) {
1136 		drbd_free_peer_req(device, peer_req);
1137 		dec_unacked(device);
1138 		return 0;
1139 	}
1140 
1141 	if (get_ldev_if_state(device, D_FAILED)) {
1142 		drbd_rs_complete_io(device, peer_req->i.sector);
1143 		put_ldev(device);
1144 	}
1145 
1146 	if (device->state.conn == C_AHEAD) {
1147 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1148 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1149 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1150 			inc_rs_pending(device);
1151 			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1152 				err = drbd_send_rs_deallocated(peer_device, peer_req);
1153 			else
1154 				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1155 		} else {
1156 			if (__ratelimit(&drbd_ratelimit_state))
1157 				drbd_err(device, "Not sending RSDataReply, "
1158 				    "partner DISKLESS!\n");
1159 			err = 0;
1160 		}
1161 	} else {
1162 		if (__ratelimit(&drbd_ratelimit_state))
1163 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1164 			    (unsigned long long)peer_req->i.sector);
1165 
1166 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1167 
1168 		/* update resync data with failure */
1169 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1170 	}
1171 
1172 	dec_unacked(device);
1173 
1174 	move_to_net_ee_or_free(device, peer_req);
1175 
1176 	if (unlikely(err))
1177 		drbd_err(device, "drbd_send_block() failed\n");
1178 	return err;
1179 }
1180 
1181 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1182 {
1183 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1184 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1185 	struct drbd_device *device = peer_device->device;
1186 	struct digest_info *di;
1187 	int digest_size;
1188 	void *digest = NULL;
1189 	int err, eq = 0;
1190 
1191 	if (unlikely(cancel)) {
1192 		drbd_free_peer_req(device, peer_req);
1193 		dec_unacked(device);
1194 		return 0;
1195 	}
1196 
1197 	if (get_ldev(device)) {
1198 		drbd_rs_complete_io(device, peer_req->i.sector);
1199 		put_ldev(device);
1200 	}
1201 
1202 	di = peer_req->digest;
1203 
1204 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1205 		/* quick hack to try to avoid a race against reconfiguration.
1206 		 * a real fix would be much more involved,
1207 		 * introducing more locking mechanisms */
1208 		if (peer_device->connection->csums_tfm) {
1209 			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1210 			D_ASSERT(device, digest_size == di->digest_size);
1211 			digest = kmalloc(digest_size, GFP_NOIO);
1212 		}
1213 		if (digest) {
1214 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1215 			eq = !memcmp(digest, di->digest, digest_size);
1216 			kfree(digest);
1217 		}
1218 
1219 		if (eq) {
1220 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1221 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1222 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1223 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1224 		} else {
1225 			inc_rs_pending(device);
1226 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1227 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1228 			kfree(di);
1229 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1230 		}
1231 	} else {
1232 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1233 		if (__ratelimit(&drbd_ratelimit_state))
1234 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1235 	}
1236 
1237 	dec_unacked(device);
1238 	move_to_net_ee_or_free(device, peer_req);
1239 
1240 	if (unlikely(err))
1241 		drbd_err(device, "drbd_send_block/ack() failed\n");
1242 	return err;
1243 }
1244 
1245 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1246 {
1247 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1248 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1249 	struct drbd_device *device = peer_device->device;
1250 	sector_t sector = peer_req->i.sector;
1251 	unsigned int size = peer_req->i.size;
1252 	int digest_size;
1253 	void *digest;
1254 	int err = 0;
1255 
1256 	if (unlikely(cancel))
1257 		goto out;
1258 
1259 	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1260 	digest = kmalloc(digest_size, GFP_NOIO);
1261 	if (!digest) {
1262 		err = 1;	/* terminate the connection in case the allocation failed */
1263 		goto out;
1264 	}
1265 
1266 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1267 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1268 	else
1269 		memset(digest, 0, digest_size);
1270 
1271 	/* Free e and pages before send.
1272 	 * In case we block on congestion, we could otherwise run into
1273 	 * some distributed deadlock, if the other side blocks on
1274 	 * congestion as well, because our receiver blocks in
1275 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1276 	drbd_free_peer_req(device, peer_req);
1277 	peer_req = NULL;
1278 	inc_rs_pending(device);
1279 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1280 	if (err)
1281 		dec_rs_pending(device);
1282 	kfree(digest);
1283 
1284 out:
1285 	if (peer_req)
1286 		drbd_free_peer_req(device, peer_req);
1287 	dec_unacked(device);
1288 	return err;
1289 }
1290 
1291 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1292 {
1293 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1294 		device->ov_last_oos_size += size>>9;
1295 	} else {
1296 		device->ov_last_oos_start = sector;
1297 		device->ov_last_oos_size = size>>9;
1298 	}
1299 	drbd_set_out_of_sync(device, sector, size);
1300 }
1301 
1302 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1303 {
1304 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1305 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1306 	struct drbd_device *device = peer_device->device;
1307 	struct digest_info *di;
1308 	void *digest;
1309 	sector_t sector = peer_req->i.sector;
1310 	unsigned int size = peer_req->i.size;
1311 	int digest_size;
1312 	int err, eq = 0;
1313 	bool stop_sector_reached = false;
1314 
1315 	if (unlikely(cancel)) {
1316 		drbd_free_peer_req(device, peer_req);
1317 		dec_unacked(device);
1318 		return 0;
1319 	}
1320 
1321 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1322 	 * the resync lru has been cleaned up already */
1323 	if (get_ldev(device)) {
1324 		drbd_rs_complete_io(device, peer_req->i.sector);
1325 		put_ldev(device);
1326 	}
1327 
1328 	di = peer_req->digest;
1329 
1330 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1331 		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1332 		digest = kmalloc(digest_size, GFP_NOIO);
1333 		if (digest) {
1334 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1335 
1336 			D_ASSERT(device, digest_size == di->digest_size);
1337 			eq = !memcmp(digest, di->digest, digest_size);
1338 			kfree(digest);
1339 		}
1340 	}
1341 
1342 	/* Free peer_req and pages before send.
1343 	 * In case we block on congestion, we could otherwise run into
1344 	 * some distributed deadlock, if the other side blocks on
1345 	 * congestion as well, because our receiver blocks in
1346 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1347 	drbd_free_peer_req(device, peer_req);
1348 	if (!eq)
1349 		drbd_ov_out_of_sync_found(device, sector, size);
1350 	else
1351 		ov_out_of_sync_print(device);
1352 
1353 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1354 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1355 
1356 	dec_unacked(device);
1357 
1358 	--device->ov_left;
1359 
1360 	/* let's advance progress step marks only for every other megabyte */
1361 	if ((device->ov_left & 0x200) == 0x200)
1362 		drbd_advance_rs_marks(device, device->ov_left);
1363 
1364 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1365 		(sector + (size>>9)) >= device->ov_stop_sector;
1366 
1367 	if (device->ov_left == 0 || stop_sector_reached) {
1368 		ov_out_of_sync_print(device);
1369 		drbd_resync_finished(device);
1370 	}
1371 
1372 	return err;
1373 }
1374 
1375 /* FIXME
1376  * We need to track the number of pending barrier acks,
1377  * and to be able to wait for them.
1378  * See also comment in drbd_adm_attach before drbd_suspend_io.
1379  */
1380 static int drbd_send_barrier(struct drbd_connection *connection)
1381 {
1382 	struct p_barrier *p;
1383 	struct drbd_socket *sock;
1384 
1385 	sock = &connection->data;
1386 	p = conn_prepare_command(connection, sock);
1387 	if (!p)
1388 		return -EIO;
1389 	p->barrier = connection->send.current_epoch_nr;
1390 	p->pad = 0;
1391 	connection->send.current_epoch_writes = 0;
1392 	connection->send.last_sent_barrier_jif = jiffies;
1393 
1394 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1395 }
1396 
1397 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1398 {
1399 	struct drbd_socket *sock = &pd->connection->data;
1400 	if (!drbd_prepare_command(pd, sock))
1401 		return -EIO;
1402 	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1403 }
1404 
1405 int w_send_write_hint(struct drbd_work *w, int cancel)
1406 {
1407 	struct drbd_device *device =
1408 		container_of(w, struct drbd_device, unplug_work);
1409 
1410 	if (cancel)
1411 		return 0;
1412 	return pd_send_unplug_remote(first_peer_device(device));
1413 }
1414 
1415 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1416 {
1417 	if (!connection->send.seen_any_write_yet) {
1418 		connection->send.seen_any_write_yet = true;
1419 		connection->send.current_epoch_nr = epoch;
1420 		connection->send.current_epoch_writes = 0;
1421 		connection->send.last_sent_barrier_jif = jiffies;
1422 	}
1423 }
1424 
1425 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1426 {
1427 	/* re-init if first write on this connection */
1428 	if (!connection->send.seen_any_write_yet)
1429 		return;
1430 	if (connection->send.current_epoch_nr != epoch) {
1431 		if (connection->send.current_epoch_writes)
1432 			drbd_send_barrier(connection);
1433 		connection->send.current_epoch_nr = epoch;
1434 	}
1435 }
1436 
1437 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1438 {
1439 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1440 	struct drbd_device *device = req->device;
1441 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1442 	struct drbd_connection *const connection = peer_device->connection;
1443 	int err;
1444 
1445 	if (unlikely(cancel)) {
1446 		req_mod(req, SEND_CANCELED);
1447 		return 0;
1448 	}
1449 	req->pre_send_jif = jiffies;
1450 
1451 	/* this time, no connection->send.current_epoch_writes++;
1452 	 * If it was sent, it was the closing barrier for the last
1453 	 * replicated epoch, before we went into AHEAD mode.
1454 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1455 	maybe_send_barrier(connection, req->epoch);
1456 
1457 	err = drbd_send_out_of_sync(peer_device, req);
1458 	req_mod(req, OOS_HANDED_TO_NETWORK);
1459 
1460 	return err;
1461 }
1462 
1463 /**
1464  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1465  * @w:		work object.
1466  * @cancel:	The connection will be closed anyways
1467  */
1468 int w_send_dblock(struct drbd_work *w, int cancel)
1469 {
1470 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1471 	struct drbd_device *device = req->device;
1472 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1473 	struct drbd_connection *connection = peer_device->connection;
1474 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1475 	int err;
1476 
1477 	if (unlikely(cancel)) {
1478 		req_mod(req, SEND_CANCELED);
1479 		return 0;
1480 	}
1481 	req->pre_send_jif = jiffies;
1482 
1483 	re_init_if_first_write(connection, req->epoch);
1484 	maybe_send_barrier(connection, req->epoch);
1485 	connection->send.current_epoch_writes++;
1486 
1487 	err = drbd_send_dblock(peer_device, req);
1488 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1489 
1490 	if (do_send_unplug && !err)
1491 		pd_send_unplug_remote(peer_device);
1492 
1493 	return err;
1494 }
1495 
1496 /**
1497  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1498  * @w:		work object.
1499  * @cancel:	The connection will be closed anyways
1500  */
1501 int w_send_read_req(struct drbd_work *w, int cancel)
1502 {
1503 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1504 	struct drbd_device *device = req->device;
1505 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1506 	struct drbd_connection *connection = peer_device->connection;
1507 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1508 	int err;
1509 
1510 	if (unlikely(cancel)) {
1511 		req_mod(req, SEND_CANCELED);
1512 		return 0;
1513 	}
1514 	req->pre_send_jif = jiffies;
1515 
1516 	/* Even read requests may close a write epoch,
1517 	 * if there was any yet. */
1518 	maybe_send_barrier(connection, req->epoch);
1519 
1520 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1521 				 (unsigned long)req);
1522 
1523 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1524 
1525 	if (do_send_unplug && !err)
1526 		pd_send_unplug_remote(peer_device);
1527 
1528 	return err;
1529 }
1530 
1531 int w_restart_disk_io(struct drbd_work *w, int cancel)
1532 {
1533 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1534 	struct drbd_device *device = req->device;
1535 
1536 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1537 		drbd_al_begin_io(device, &req->i);
1538 
1539 	drbd_req_make_private_bio(req, req->master_bio);
1540 	bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1541 	generic_make_request(req->private_bio);
1542 
1543 	return 0;
1544 }
1545 
1546 static int _drbd_may_sync_now(struct drbd_device *device)
1547 {
1548 	struct drbd_device *odev = device;
1549 	int resync_after;
1550 
1551 	while (1) {
1552 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1553 			return 1;
1554 		rcu_read_lock();
1555 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1556 		rcu_read_unlock();
1557 		if (resync_after == -1)
1558 			return 1;
1559 		odev = minor_to_device(resync_after);
1560 		if (!odev)
1561 			return 1;
1562 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1563 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1564 		    odev->state.aftr_isp || odev->state.peer_isp ||
1565 		    odev->state.user_isp)
1566 			return 0;
1567 	}
1568 }
1569 
1570 /**
1571  * drbd_pause_after() - Pause resync on all devices that may not resync now
1572  * @device:	DRBD device.
1573  *
1574  * Called from process context only (admin command and after_state_ch).
1575  */
1576 static bool drbd_pause_after(struct drbd_device *device)
1577 {
1578 	bool changed = false;
1579 	struct drbd_device *odev;
1580 	int i;
1581 
1582 	rcu_read_lock();
1583 	idr_for_each_entry(&drbd_devices, odev, i) {
1584 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1585 			continue;
1586 		if (!_drbd_may_sync_now(odev) &&
1587 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1588 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1589 			changed = true;
1590 	}
1591 	rcu_read_unlock();
1592 
1593 	return changed;
1594 }
1595 
1596 /**
1597  * drbd_resume_next() - Resume resync on all devices that may resync now
1598  * @device:	DRBD device.
1599  *
1600  * Called from process context only (admin command and worker).
1601  */
1602 static bool drbd_resume_next(struct drbd_device *device)
1603 {
1604 	bool changed = false;
1605 	struct drbd_device *odev;
1606 	int i;
1607 
1608 	rcu_read_lock();
1609 	idr_for_each_entry(&drbd_devices, odev, i) {
1610 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1611 			continue;
1612 		if (odev->state.aftr_isp) {
1613 			if (_drbd_may_sync_now(odev) &&
1614 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1615 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1616 				changed = true;
1617 		}
1618 	}
1619 	rcu_read_unlock();
1620 	return changed;
1621 }
1622 
1623 void resume_next_sg(struct drbd_device *device)
1624 {
1625 	lock_all_resources();
1626 	drbd_resume_next(device);
1627 	unlock_all_resources();
1628 }
1629 
1630 void suspend_other_sg(struct drbd_device *device)
1631 {
1632 	lock_all_resources();
1633 	drbd_pause_after(device);
1634 	unlock_all_resources();
1635 }
1636 
1637 /* caller must lock_all_resources() */
1638 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1639 {
1640 	struct drbd_device *odev;
1641 	int resync_after;
1642 
1643 	if (o_minor == -1)
1644 		return NO_ERROR;
1645 	if (o_minor < -1 || o_minor > MINORMASK)
1646 		return ERR_RESYNC_AFTER;
1647 
1648 	/* check for loops */
1649 	odev = minor_to_device(o_minor);
1650 	while (1) {
1651 		if (odev == device)
1652 			return ERR_RESYNC_AFTER_CYCLE;
1653 
1654 		/* You are free to depend on diskless, non-existing,
1655 		 * or not yet/no longer existing minors.
1656 		 * We only reject dependency loops.
1657 		 * We cannot follow the dependency chain beyond a detached or
1658 		 * missing minor.
1659 		 */
1660 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1661 			return NO_ERROR;
1662 
1663 		rcu_read_lock();
1664 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1665 		rcu_read_unlock();
1666 		/* dependency chain ends here, no cycles. */
1667 		if (resync_after == -1)
1668 			return NO_ERROR;
1669 
1670 		/* follow the dependency chain */
1671 		odev = minor_to_device(resync_after);
1672 	}
1673 }
1674 
1675 /* caller must lock_all_resources() */
1676 void drbd_resync_after_changed(struct drbd_device *device)
1677 {
1678 	int changed;
1679 
1680 	do {
1681 		changed  = drbd_pause_after(device);
1682 		changed |= drbd_resume_next(device);
1683 	} while (changed);
1684 }
1685 
1686 void drbd_rs_controller_reset(struct drbd_device *device)
1687 {
1688 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1689 	struct fifo_buffer *plan;
1690 
1691 	atomic_set(&device->rs_sect_in, 0);
1692 	atomic_set(&device->rs_sect_ev, 0);
1693 	device->rs_in_flight = 0;
1694 	device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1695 
1696 	/* Updating the RCU protected object in place is necessary since
1697 	   this function gets called from atomic context.
1698 	   It is valid since all other updates also lead to an completely
1699 	   empty fifo */
1700 	rcu_read_lock();
1701 	plan = rcu_dereference(device->rs_plan_s);
1702 	plan->total = 0;
1703 	fifo_set(plan, 0);
1704 	rcu_read_unlock();
1705 }
1706 
1707 void start_resync_timer_fn(struct timer_list *t)
1708 {
1709 	struct drbd_device *device = from_timer(device, t, start_resync_timer);
1710 	drbd_device_post_work(device, RS_START);
1711 }
1712 
1713 static void do_start_resync(struct drbd_device *device)
1714 {
1715 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1716 		drbd_warn(device, "postponing start_resync ...\n");
1717 		device->start_resync_timer.expires = jiffies + HZ/10;
1718 		add_timer(&device->start_resync_timer);
1719 		return;
1720 	}
1721 
1722 	drbd_start_resync(device, C_SYNC_SOURCE);
1723 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1724 }
1725 
1726 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1727 {
1728 	bool csums_after_crash_only;
1729 	rcu_read_lock();
1730 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1731 	rcu_read_unlock();
1732 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1733 		connection->csums_tfm &&			/* configured? */
1734 		(csums_after_crash_only == false		/* use for each resync? */
1735 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1736 }
1737 
1738 /**
1739  * drbd_start_resync() - Start the resync process
1740  * @device:	DRBD device.
1741  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1742  *
1743  * This function might bring you directly into one of the
1744  * C_PAUSED_SYNC_* states.
1745  */
1746 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1747 {
1748 	struct drbd_peer_device *peer_device = first_peer_device(device);
1749 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1750 	union drbd_state ns;
1751 	int r;
1752 
1753 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1754 		drbd_err(device, "Resync already running!\n");
1755 		return;
1756 	}
1757 
1758 	if (!connection) {
1759 		drbd_err(device, "No connection to peer, aborting!\n");
1760 		return;
1761 	}
1762 
1763 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1764 		if (side == C_SYNC_TARGET) {
1765 			/* Since application IO was locked out during C_WF_BITMAP_T and
1766 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1767 			   we check that we might make the data inconsistent. */
1768 			r = drbd_khelper(device, "before-resync-target");
1769 			r = (r >> 8) & 0xff;
1770 			if (r > 0) {
1771 				drbd_info(device, "before-resync-target handler returned %d, "
1772 					 "dropping connection.\n", r);
1773 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1774 				return;
1775 			}
1776 		} else /* C_SYNC_SOURCE */ {
1777 			r = drbd_khelper(device, "before-resync-source");
1778 			r = (r >> 8) & 0xff;
1779 			if (r > 0) {
1780 				if (r == 3) {
1781 					drbd_info(device, "before-resync-source handler returned %d, "
1782 						 "ignoring. Old userland tools?", r);
1783 				} else {
1784 					drbd_info(device, "before-resync-source handler returned %d, "
1785 						 "dropping connection.\n", r);
1786 					conn_request_state(connection,
1787 							   NS(conn, C_DISCONNECTING), CS_HARD);
1788 					return;
1789 				}
1790 			}
1791 		}
1792 	}
1793 
1794 	if (current == connection->worker.task) {
1795 		/* The worker should not sleep waiting for state_mutex,
1796 		   that can take long */
1797 		if (!mutex_trylock(device->state_mutex)) {
1798 			set_bit(B_RS_H_DONE, &device->flags);
1799 			device->start_resync_timer.expires = jiffies + HZ/5;
1800 			add_timer(&device->start_resync_timer);
1801 			return;
1802 		}
1803 	} else {
1804 		mutex_lock(device->state_mutex);
1805 	}
1806 
1807 	lock_all_resources();
1808 	clear_bit(B_RS_H_DONE, &device->flags);
1809 	/* Did some connection breakage or IO error race with us? */
1810 	if (device->state.conn < C_CONNECTED
1811 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1812 		unlock_all_resources();
1813 		goto out;
1814 	}
1815 
1816 	ns = drbd_read_state(device);
1817 
1818 	ns.aftr_isp = !_drbd_may_sync_now(device);
1819 
1820 	ns.conn = side;
1821 
1822 	if (side == C_SYNC_TARGET)
1823 		ns.disk = D_INCONSISTENT;
1824 	else /* side == C_SYNC_SOURCE */
1825 		ns.pdsk = D_INCONSISTENT;
1826 
1827 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1828 	ns = drbd_read_state(device);
1829 
1830 	if (ns.conn < C_CONNECTED)
1831 		r = SS_UNKNOWN_ERROR;
1832 
1833 	if (r == SS_SUCCESS) {
1834 		unsigned long tw = drbd_bm_total_weight(device);
1835 		unsigned long now = jiffies;
1836 		int i;
1837 
1838 		device->rs_failed    = 0;
1839 		device->rs_paused    = 0;
1840 		device->rs_same_csum = 0;
1841 		device->rs_last_sect_ev = 0;
1842 		device->rs_total     = tw;
1843 		device->rs_start     = now;
1844 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1845 			device->rs_mark_left[i] = tw;
1846 			device->rs_mark_time[i] = now;
1847 		}
1848 		drbd_pause_after(device);
1849 		/* Forget potentially stale cached per resync extent bit-counts.
1850 		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1851 		 * disabled, and know the disk state is ok. */
1852 		spin_lock(&device->al_lock);
1853 		lc_reset(device->resync);
1854 		device->resync_locked = 0;
1855 		device->resync_wenr = LC_FREE;
1856 		spin_unlock(&device->al_lock);
1857 	}
1858 	unlock_all_resources();
1859 
1860 	if (r == SS_SUCCESS) {
1861 		wake_up(&device->al_wait); /* for lc_reset() above */
1862 		/* reset rs_last_bcast when a resync or verify is started,
1863 		 * to deal with potential jiffies wrap. */
1864 		device->rs_last_bcast = jiffies - HZ;
1865 
1866 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1867 		     drbd_conn_str(ns.conn),
1868 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1869 		     (unsigned long) device->rs_total);
1870 		if (side == C_SYNC_TARGET) {
1871 			device->bm_resync_fo = 0;
1872 			device->use_csums = use_checksum_based_resync(connection, device);
1873 		} else {
1874 			device->use_csums = false;
1875 		}
1876 
1877 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1878 		 * with w_send_oos, or the sync target will get confused as to
1879 		 * how much bits to resync.  We cannot do that always, because for an
1880 		 * empty resync and protocol < 95, we need to do it here, as we call
1881 		 * drbd_resync_finished from here in that case.
1882 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1883 		 * and from after_state_ch otherwise. */
1884 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1885 			drbd_gen_and_send_sync_uuid(peer_device);
1886 
1887 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1888 			/* This still has a race (about when exactly the peers
1889 			 * detect connection loss) that can lead to a full sync
1890 			 * on next handshake. In 8.3.9 we fixed this with explicit
1891 			 * resync-finished notifications, but the fix
1892 			 * introduces a protocol change.  Sleeping for some
1893 			 * time longer than the ping interval + timeout on the
1894 			 * SyncSource, to give the SyncTarget the chance to
1895 			 * detect connection loss, then waiting for a ping
1896 			 * response (implicit in drbd_resync_finished) reduces
1897 			 * the race considerably, but does not solve it. */
1898 			if (side == C_SYNC_SOURCE) {
1899 				struct net_conf *nc;
1900 				int timeo;
1901 
1902 				rcu_read_lock();
1903 				nc = rcu_dereference(connection->net_conf);
1904 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1905 				rcu_read_unlock();
1906 				schedule_timeout_interruptible(timeo);
1907 			}
1908 			drbd_resync_finished(device);
1909 		}
1910 
1911 		drbd_rs_controller_reset(device);
1912 		/* ns.conn may already be != device->state.conn,
1913 		 * we may have been paused in between, or become paused until
1914 		 * the timer triggers.
1915 		 * No matter, that is handled in resync_timer_fn() */
1916 		if (ns.conn == C_SYNC_TARGET)
1917 			mod_timer(&device->resync_timer, jiffies);
1918 
1919 		drbd_md_sync(device);
1920 	}
1921 	put_ldev(device);
1922 out:
1923 	mutex_unlock(device->state_mutex);
1924 }
1925 
1926 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1927 {
1928 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1929 	device->rs_last_bcast = jiffies;
1930 
1931 	if (!get_ldev(device))
1932 		return;
1933 
1934 	drbd_bm_write_lazy(device, 0);
1935 	if (resync_done && is_sync_state(device->state.conn))
1936 		drbd_resync_finished(device);
1937 
1938 	drbd_bcast_event(device, &sib);
1939 	/* update timestamp, in case it took a while to write out stuff */
1940 	device->rs_last_bcast = jiffies;
1941 	put_ldev(device);
1942 }
1943 
1944 static void drbd_ldev_destroy(struct drbd_device *device)
1945 {
1946 	lc_destroy(device->resync);
1947 	device->resync = NULL;
1948 	lc_destroy(device->act_log);
1949 	device->act_log = NULL;
1950 
1951 	__acquire(local);
1952 	drbd_backing_dev_free(device, device->ldev);
1953 	device->ldev = NULL;
1954 	__release(local);
1955 
1956 	clear_bit(GOING_DISKLESS, &device->flags);
1957 	wake_up(&device->misc_wait);
1958 }
1959 
1960 static void go_diskless(struct drbd_device *device)
1961 {
1962 	D_ASSERT(device, device->state.disk == D_FAILED);
1963 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1964 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1965 	 * the protected members anymore, though, so once put_ldev reaches zero
1966 	 * again, it will be safe to free them. */
1967 
1968 	/* Try to write changed bitmap pages, read errors may have just
1969 	 * set some bits outside the area covered by the activity log.
1970 	 *
1971 	 * If we have an IO error during the bitmap writeout,
1972 	 * we will want a full sync next time, just in case.
1973 	 * (Do we want a specific meta data flag for this?)
1974 	 *
1975 	 * If that does not make it to stable storage either,
1976 	 * we cannot do anything about that anymore.
1977 	 *
1978 	 * We still need to check if both bitmap and ldev are present, we may
1979 	 * end up here after a failed attach, before ldev was even assigned.
1980 	 */
1981 	if (device->bitmap && device->ldev) {
1982 		/* An interrupted resync or similar is allowed to recounts bits
1983 		 * while we detach.
1984 		 * Any modifications would not be expected anymore, though.
1985 		 */
1986 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1987 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1988 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1989 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1990 				drbd_md_sync(device);
1991 			}
1992 		}
1993 	}
1994 
1995 	drbd_force_state(device, NS(disk, D_DISKLESS));
1996 }
1997 
1998 static int do_md_sync(struct drbd_device *device)
1999 {
2000 	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
2001 	drbd_md_sync(device);
2002 	return 0;
2003 }
2004 
2005 /* only called from drbd_worker thread, no locking */
2006 void __update_timing_details(
2007 		struct drbd_thread_timing_details *tdp,
2008 		unsigned int *cb_nr,
2009 		void *cb,
2010 		const char *fn, const unsigned int line)
2011 {
2012 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2013 	struct drbd_thread_timing_details *td = tdp + i;
2014 
2015 	td->start_jif = jiffies;
2016 	td->cb_addr = cb;
2017 	td->caller_fn = fn;
2018 	td->line = line;
2019 	td->cb_nr = *cb_nr;
2020 
2021 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2022 	td = tdp + i;
2023 	memset(td, 0, sizeof(*td));
2024 
2025 	++(*cb_nr);
2026 }
2027 
2028 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2029 {
2030 	if (test_bit(MD_SYNC, &todo))
2031 		do_md_sync(device);
2032 	if (test_bit(RS_DONE, &todo) ||
2033 	    test_bit(RS_PROGRESS, &todo))
2034 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2035 	if (test_bit(GO_DISKLESS, &todo))
2036 		go_diskless(device);
2037 	if (test_bit(DESTROY_DISK, &todo))
2038 		drbd_ldev_destroy(device);
2039 	if (test_bit(RS_START, &todo))
2040 		do_start_resync(device);
2041 }
2042 
2043 #define DRBD_DEVICE_WORK_MASK	\
2044 	((1UL << GO_DISKLESS)	\
2045 	|(1UL << DESTROY_DISK)	\
2046 	|(1UL << MD_SYNC)	\
2047 	|(1UL << RS_START)	\
2048 	|(1UL << RS_PROGRESS)	\
2049 	|(1UL << RS_DONE)	\
2050 	)
2051 
2052 static unsigned long get_work_bits(unsigned long *flags)
2053 {
2054 	unsigned long old, new;
2055 	do {
2056 		old = *flags;
2057 		new = old & ~DRBD_DEVICE_WORK_MASK;
2058 	} while (cmpxchg(flags, old, new) != old);
2059 	return old & DRBD_DEVICE_WORK_MASK;
2060 }
2061 
2062 static void do_unqueued_work(struct drbd_connection *connection)
2063 {
2064 	struct drbd_peer_device *peer_device;
2065 	int vnr;
2066 
2067 	rcu_read_lock();
2068 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2069 		struct drbd_device *device = peer_device->device;
2070 		unsigned long todo = get_work_bits(&device->flags);
2071 		if (!todo)
2072 			continue;
2073 
2074 		kref_get(&device->kref);
2075 		rcu_read_unlock();
2076 		do_device_work(device, todo);
2077 		kref_put(&device->kref, drbd_destroy_device);
2078 		rcu_read_lock();
2079 	}
2080 	rcu_read_unlock();
2081 }
2082 
2083 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2084 {
2085 	spin_lock_irq(&queue->q_lock);
2086 	list_splice_tail_init(&queue->q, work_list);
2087 	spin_unlock_irq(&queue->q_lock);
2088 	return !list_empty(work_list);
2089 }
2090 
2091 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2092 {
2093 	DEFINE_WAIT(wait);
2094 	struct net_conf *nc;
2095 	int uncork, cork;
2096 
2097 	dequeue_work_batch(&connection->sender_work, work_list);
2098 	if (!list_empty(work_list))
2099 		return;
2100 
2101 	/* Still nothing to do?
2102 	 * Maybe we still need to close the current epoch,
2103 	 * even if no new requests are queued yet.
2104 	 *
2105 	 * Also, poke TCP, just in case.
2106 	 * Then wait for new work (or signal). */
2107 	rcu_read_lock();
2108 	nc = rcu_dereference(connection->net_conf);
2109 	uncork = nc ? nc->tcp_cork : 0;
2110 	rcu_read_unlock();
2111 	if (uncork) {
2112 		mutex_lock(&connection->data.mutex);
2113 		if (connection->data.socket)
2114 			drbd_tcp_uncork(connection->data.socket);
2115 		mutex_unlock(&connection->data.mutex);
2116 	}
2117 
2118 	for (;;) {
2119 		int send_barrier;
2120 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2121 		spin_lock_irq(&connection->resource->req_lock);
2122 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2123 		if (!list_empty(&connection->sender_work.q))
2124 			list_splice_tail_init(&connection->sender_work.q, work_list);
2125 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2126 		if (!list_empty(work_list) || signal_pending(current)) {
2127 			spin_unlock_irq(&connection->resource->req_lock);
2128 			break;
2129 		}
2130 
2131 		/* We found nothing new to do, no to-be-communicated request,
2132 		 * no other work item.  We may still need to close the last
2133 		 * epoch.  Next incoming request epoch will be connection ->
2134 		 * current transfer log epoch number.  If that is different
2135 		 * from the epoch of the last request we communicated, it is
2136 		 * safe to send the epoch separating barrier now.
2137 		 */
2138 		send_barrier =
2139 			atomic_read(&connection->current_tle_nr) !=
2140 			connection->send.current_epoch_nr;
2141 		spin_unlock_irq(&connection->resource->req_lock);
2142 
2143 		if (send_barrier)
2144 			maybe_send_barrier(connection,
2145 					connection->send.current_epoch_nr + 1);
2146 
2147 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2148 			break;
2149 
2150 		/* drbd_send() may have called flush_signals() */
2151 		if (get_t_state(&connection->worker) != RUNNING)
2152 			break;
2153 
2154 		schedule();
2155 		/* may be woken up for other things but new work, too,
2156 		 * e.g. if the current epoch got closed.
2157 		 * In which case we send the barrier above. */
2158 	}
2159 	finish_wait(&connection->sender_work.q_wait, &wait);
2160 
2161 	/* someone may have changed the config while we have been waiting above. */
2162 	rcu_read_lock();
2163 	nc = rcu_dereference(connection->net_conf);
2164 	cork = nc ? nc->tcp_cork : 0;
2165 	rcu_read_unlock();
2166 	mutex_lock(&connection->data.mutex);
2167 	if (connection->data.socket) {
2168 		if (cork)
2169 			drbd_tcp_cork(connection->data.socket);
2170 		else if (!uncork)
2171 			drbd_tcp_uncork(connection->data.socket);
2172 	}
2173 	mutex_unlock(&connection->data.mutex);
2174 }
2175 
2176 int drbd_worker(struct drbd_thread *thi)
2177 {
2178 	struct drbd_connection *connection = thi->connection;
2179 	struct drbd_work *w = NULL;
2180 	struct drbd_peer_device *peer_device;
2181 	LIST_HEAD(work_list);
2182 	int vnr;
2183 
2184 	while (get_t_state(thi) == RUNNING) {
2185 		drbd_thread_current_set_cpu(thi);
2186 
2187 		if (list_empty(&work_list)) {
2188 			update_worker_timing_details(connection, wait_for_work);
2189 			wait_for_work(connection, &work_list);
2190 		}
2191 
2192 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2193 			update_worker_timing_details(connection, do_unqueued_work);
2194 			do_unqueued_work(connection);
2195 		}
2196 
2197 		if (signal_pending(current)) {
2198 			flush_signals(current);
2199 			if (get_t_state(thi) == RUNNING) {
2200 				drbd_warn(connection, "Worker got an unexpected signal\n");
2201 				continue;
2202 			}
2203 			break;
2204 		}
2205 
2206 		if (get_t_state(thi) != RUNNING)
2207 			break;
2208 
2209 		if (!list_empty(&work_list)) {
2210 			w = list_first_entry(&work_list, struct drbd_work, list);
2211 			list_del_init(&w->list);
2212 			update_worker_timing_details(connection, w->cb);
2213 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2214 				continue;
2215 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2216 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2217 		}
2218 	}
2219 
2220 	do {
2221 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2222 			update_worker_timing_details(connection, do_unqueued_work);
2223 			do_unqueued_work(connection);
2224 		}
2225 		if (!list_empty(&work_list)) {
2226 			w = list_first_entry(&work_list, struct drbd_work, list);
2227 			list_del_init(&w->list);
2228 			update_worker_timing_details(connection, w->cb);
2229 			w->cb(w, 1);
2230 		} else
2231 			dequeue_work_batch(&connection->sender_work, &work_list);
2232 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2233 
2234 	rcu_read_lock();
2235 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2236 		struct drbd_device *device = peer_device->device;
2237 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2238 		kref_get(&device->kref);
2239 		rcu_read_unlock();
2240 		drbd_device_cleanup(device);
2241 		kref_put(&device->kref, drbd_destroy_device);
2242 		rcu_read_lock();
2243 	}
2244 	rcu_read_unlock();
2245 
2246 	return 0;
2247 }
2248