xref: /linux/drivers/block/drbd/drbd_worker.c (revision 470ac62dfa5732c149adce2cbce84ac678de701f)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3    drbd_worker.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11 
12 */
13 
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26 
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30 
/* Forward declarations: both request generators are defined further down
 * in this file and are dispatched from w_resync_timer(). */
static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);
33 
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46 
47 /* used for synchronous meta data and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
50 void drbd_md_endio(struct bio *bio)
51 {
52 	struct drbd_device *device;
53 
54 	device = bio->bi_private;
55 	device->md_io.error = blk_status_to_errno(bio->bi_status);
56 
57 	/* special case: drbd_md_read() during drbd_adm_attach() */
58 	if (device->ldev)
59 		put_ldev(device);
60 	bio_put(bio);
61 
62 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63 	 * to timeout on the lower level device, and eventually detach from it.
64 	 * If this io completion runs after that timeout expired, this
65 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
66 	 * During normal operation, this only puts that extra reference
67 	 * down to 1 again.
68 	 * Make sure we first drop the reference, and only then signal
69 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70 	 * next drbd_md_sync_page_io(), that we trigger the
71 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72 	 */
73 	drbd_md_put_buffer(device);
74 	device->md_io.done = 1;
75 	wake_up(&device->misc_wait);
76 }
77 
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83 	unsigned long flags = 0;
84 	struct drbd_peer_device *peer_device = peer_req->peer_device;
85 	struct drbd_device *device = peer_device->device;
86 
87 	spin_lock_irqsave(&device->resource->req_lock, flags);
88 	device->read_cnt += peer_req->i.size >> 9;
89 	list_del(&peer_req->w.list);
90 	if (list_empty(&device->read_ee))
91 		wake_up(&device->ee_wait);
92 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
94 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
95 
96 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97 	put_ldev(device);
98 }
99 
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.
 *
 * Called from drbd_peer_request_endio() once the last pending bio of a
 * write peer request has completed.  Moves the request to device->done_ee,
 * schedules the ack sender work if the connection state allows it, and
 * drops the local disk reference via put_ldev().
 */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_interval i;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	/* Hence: take private copies of everything needed after the unlock. */
	i = peer_req->i;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;
	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

	if (peer_req->flags & EE_WAS_ERROR) {
		/* In protocol != C, we usually do not send write acks.
		 * In case of a write error, send the neg ack anyways. */
		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
			inc_unacked(device);
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
	}

	spin_lock_irqsave(&device->resource->req_lock, flags);
	/* i.size is in bytes, writ_cnt counts 512 byte sectors */
	device->writ_cnt += peer_req->i.size >> 9;
	list_move_tail(&peer_req->w.list, &device->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	/* wake waiters only if the list this request came from is now empty:
	 * sync_ee for resync writes (ID_SYNCER), active_ee otherwise */
	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);

	if (connection->cstate >= C_WF_REPORT_PARAMS) {
		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
		/* queue_work() returned false: the work was already queued,
		 * so give back the reference we just took */
		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
			kref_put(&device->kref, drbd_destroy_device);
	}
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	/* From here on only the local copies (i, block_id, do_*) are used. */
	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(device, i.sector);

	if (do_wake)
		wake_up(&device->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(device, &i);

	put_ldev(device);
}
167 
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
171 void drbd_peer_request_endio(struct bio *bio)
172 {
173 	struct drbd_peer_request *peer_req = bio->bi_private;
174 	struct drbd_device *device = peer_req->peer_device->device;
175 	bool is_write = bio_data_dir(bio) == WRITE;
176 	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177 			  bio_op(bio) == REQ_OP_DISCARD;
178 
179 	if (bio->bi_status && drbd_ratelimit())
180 		drbd_warn(device, "%s: error=%d s=%llus\n",
181 				is_write ? (is_discard ? "discard" : "write")
182 					: "read", bio->bi_status,
183 				(unsigned long long)peer_req->i.sector);
184 
185 	if (bio->bi_status)
186 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
187 
188 	bio_put(bio); /* no need for the bio anymore */
189 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
190 		if (is_write)
191 			drbd_endio_write_sec_final(peer_req);
192 		else
193 			drbd_endio_read_sec_final(peer_req);
194 	}
195 }
196 
197 static void
198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201 		device->minor, device->resource->name, device->vnr);
202 }
203 
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208 	unsigned long flags;
209 	struct drbd_request *req = bio->bi_private;
210 	struct drbd_device *device = req->device;
211 	struct bio_and_error m;
212 	enum drbd_req_event what;
213 
214 	/* If this request was aborted locally before,
215 	 * but now was completed "successfully",
216 	 * chances are that this caused arbitrary data corruption.
217 	 *
218 	 * "aborting" requests, or force-detaching the disk, is intended for
219 	 * completely blocked/hung local backing devices which do no longer
220 	 * complete requests at all, not even do error completions.  In this
221 	 * situation, usually a hard-reset and failover is the only way out.
222 	 *
223 	 * By "aborting", basically faking a local error-completion,
224 	 * we allow for a more graceful swichover by cleanly migrating services.
225 	 * Still the affected node has to be rebooted "soon".
226 	 *
227 	 * By completing these requests, we allow the upper layers to re-use
228 	 * the associated data pages.
229 	 *
230 	 * If later the local backing device "recovers", and now DMAs some data
231 	 * from disk into the original request pages, in the best case it will
232 	 * just put random data into unused pages; but typically it will corrupt
233 	 * meanwhile completely unrelated data, causing all sorts of damage.
234 	 *
235 	 * Which means delayed successful completion,
236 	 * especially for READ requests,
237 	 * is a reason to panic().
238 	 *
239 	 * We assume that a delayed *error* completion is OK,
240 	 * though we still will complain noisily about it.
241 	 */
242 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243 		if (drbd_ratelimit())
244 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245 
246 		if (!bio->bi_status)
247 			drbd_panic_after_delayed_completion_of_aborted_request(device);
248 	}
249 
250 	/* to avoid recursion in __req_mod */
251 	if (unlikely(bio->bi_status)) {
252 		switch (bio_op(bio)) {
253 		case REQ_OP_WRITE_ZEROES:
254 		case REQ_OP_DISCARD:
255 			if (bio->bi_status == BLK_STS_NOTSUPP)
256 				what = DISCARD_COMPLETED_NOTSUPP;
257 			else
258 				what = DISCARD_COMPLETED_WITH_ERROR;
259 			break;
260 		case REQ_OP_READ:
261 			if (bio->bi_opf & REQ_RAHEAD)
262 				what = READ_AHEAD_COMPLETED_WITH_ERROR;
263 			else
264 				what = READ_COMPLETED_WITH_ERROR;
265 			break;
266 		default:
267 			what = WRITE_COMPLETED_WITH_ERROR;
268 			break;
269 		}
270 	} else {
271 		what = COMPLETED_OK;
272 	}
273 
274 	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275 	bio_put(bio);
276 
277 	/* not req_mod(), we need irqsave here! */
278 	spin_lock_irqsave(&device->resource->req_lock, flags);
279 	__req_mod(req, what, &m);
280 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
281 	put_ldev(device);
282 
283 	if (m.bio)
284 		complete_master_bio(device, &m);
285 }
286 
287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289 	SHASH_DESC_ON_STACK(desc, tfm);
290 	struct page *page = peer_req->pages;
291 	struct page *tmp;
292 	unsigned len;
293 	void *src;
294 
295 	desc->tfm = tfm;
296 
297 	crypto_shash_init(desc);
298 
299 	src = kmap_atomic(page);
300 	while ((tmp = page_chain_next(page))) {
301 		/* all but the last page will be fully used */
302 		crypto_shash_update(desc, src, PAGE_SIZE);
303 		kunmap_atomic(src);
304 		page = tmp;
305 		src = kmap_atomic(page);
306 	}
307 	/* and now the last, possibly only partially used page */
308 	len = peer_req->i.size & (PAGE_SIZE - 1);
309 	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310 	kunmap_atomic(src);
311 
312 	crypto_shash_final(desc, digest);
313 	shash_desc_zero(desc);
314 }
315 
316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318 	SHASH_DESC_ON_STACK(desc, tfm);
319 	struct bio_vec bvec;
320 	struct bvec_iter iter;
321 
322 	desc->tfm = tfm;
323 
324 	crypto_shash_init(desc);
325 
326 	bio_for_each_segment(bvec, bio, iter) {
327 		u8 *src;
328 
329 		src = bvec_kmap_local(&bvec);
330 		crypto_shash_update(desc, src, bvec.bv_len);
331 		kunmap_local(src);
332 	}
333 	crypto_shash_final(desc, digest);
334 	shash_desc_zero(desc);
335 }
336 
337 /* MAYBE merge common code with w_e_end_ov_req */
338 static int w_e_send_csum(struct drbd_work *w, int cancel)
339 {
340 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
341 	struct drbd_peer_device *peer_device = peer_req->peer_device;
342 	struct drbd_device *device = peer_device->device;
343 	int digest_size;
344 	void *digest;
345 	int err = 0;
346 
347 	if (unlikely(cancel))
348 		goto out;
349 
350 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
351 		goto out;
352 
353 	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
354 	digest = kmalloc(digest_size, GFP_NOIO);
355 	if (digest) {
356 		sector_t sector = peer_req->i.sector;
357 		unsigned int size = peer_req->i.size;
358 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
359 		/* Free peer_req and pages before send.
360 		 * In case we block on congestion, we could otherwise run into
361 		 * some distributed deadlock, if the other side blocks on
362 		 * congestion as well, because our receiver blocks in
363 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
364 		drbd_free_peer_req(device, peer_req);
365 		peer_req = NULL;
366 		inc_rs_pending(device);
367 		err = drbd_send_drequest_csum(peer_device, sector, size,
368 					      digest, digest_size,
369 					      P_CSUM_RS_REQUEST);
370 		kfree(digest);
371 	} else {
372 		drbd_err(device, "kmalloc() of digest failed.\n");
373 		err = -ENOMEM;
374 	}
375 
376 out:
377 	if (peer_req)
378 		drbd_free_peer_req(device, peer_req);
379 
380 	if (unlikely(err))
381 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
382 	return err;
383 }
384 
385 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
386 
387 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
388 {
389 	struct drbd_device *device = peer_device->device;
390 	struct drbd_peer_request *peer_req;
391 
392 	if (!get_ldev(device))
393 		return -EIO;
394 
395 	/* GFP_TRY, because if there is no memory available right now, this may
396 	 * be rescheduled for later. It is "only" background resync, after all. */
397 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398 				       size, size, GFP_TRY);
399 	if (!peer_req)
400 		goto defer;
401 
402 	peer_req->w.cb = w_e_send_csum;
403 	peer_req->opf = REQ_OP_READ;
404 	spin_lock_irq(&device->resource->req_lock);
405 	list_add_tail(&peer_req->w.list, &device->read_ee);
406 	spin_unlock_irq(&device->resource->req_lock);
407 
408 	atomic_add(size >> 9, &device->rs_sect_ev);
409 	if (drbd_submit_peer_request(peer_req) == 0)
410 		return 0;
411 
412 	/* If it failed because of ENOMEM, retry should help.  If it failed
413 	 * because bio_add_page failed (probably broken lower level driver),
414 	 * retry may or may not help.
415 	 * If it does not, you may need to force disconnect. */
416 	spin_lock_irq(&device->resource->req_lock);
417 	list_del(&peer_req->w.list);
418 	spin_unlock_irq(&device->resource->req_lock);
419 
420 	drbd_free_peer_req(device, peer_req);
421 defer:
422 	put_ldev(device);
423 	return -EAGAIN;
424 }
425 
426 int w_resync_timer(struct drbd_work *w, int cancel)
427 {
428 	struct drbd_device *device =
429 		container_of(w, struct drbd_device, resync_work);
430 
431 	switch (device->state.conn) {
432 	case C_VERIFY_S:
433 		make_ov_request(device, cancel);
434 		break;
435 	case C_SYNC_TARGET:
436 		make_resync_request(device, cancel);
437 		break;
438 	}
439 
440 	return 0;
441 }
442 
443 void resync_timer_fn(struct timer_list *t)
444 {
445 	struct drbd_device *device = from_timer(device, t, resync_timer);
446 
447 	drbd_queue_work_if_unqueued(
448 		&first_peer_device(device)->connection->sender_work,
449 		&device->resync_work);
450 }
451 
452 static void fifo_set(struct fifo_buffer *fb, int value)
453 {
454 	int i;
455 
456 	for (i = 0; i < fb->size; i++)
457 		fb->values[i] = value;
458 }
459 
460 static int fifo_push(struct fifo_buffer *fb, int value)
461 {
462 	int ov;
463 
464 	ov = fb->values[fb->head_index];
465 	fb->values[fb->head_index++] = value;
466 
467 	if (fb->head_index >= fb->size)
468 		fb->head_index = 0;
469 
470 	return ov;
471 }
472 
473 static void fifo_add_val(struct fifo_buffer *fb, int value)
474 {
475 	int i;
476 
477 	for (i = 0; i < fb->size; i++)
478 		fb->values[i] += value;
479 }
480 
481 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
482 {
483 	struct fifo_buffer *fb;
484 
485 	fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
486 	if (!fb)
487 		return NULL;
488 
489 	fb->head_index = 0;
490 	fb->size = fifo_size;
491 	fb->total = 0;
492 
493 	return fb;
494 }
495 
496 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
497 {
498 	struct disk_conf *dc;
499 	unsigned int want;     /* The number of sectors we want in-flight */
500 	int req_sect; /* Number of sectors to request in this turn */
501 	int correction; /* Number of sectors more we need in-flight */
502 	int cps; /* correction per invocation of drbd_rs_controller() */
503 	int steps; /* Number of time steps to plan ahead */
504 	int curr_corr;
505 	int max_sect;
506 	struct fifo_buffer *plan;
507 
508 	dc = rcu_dereference(device->ldev->disk_conf);
509 	plan = rcu_dereference(device->rs_plan_s);
510 
511 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
512 
513 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
514 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
515 	} else { /* normal path */
516 		want = dc->c_fill_target ? dc->c_fill_target :
517 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
518 	}
519 
520 	correction = want - device->rs_in_flight - plan->total;
521 
522 	/* Plan ahead */
523 	cps = correction / steps;
524 	fifo_add_val(plan, cps);
525 	plan->total += cps * steps;
526 
527 	/* What we do in this step */
528 	curr_corr = fifo_push(plan, 0);
529 	plan->total -= curr_corr;
530 
531 	req_sect = sect_in + curr_corr;
532 	if (req_sect < 0)
533 		req_sect = 0;
534 
535 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
536 	if (req_sect > max_sect)
537 		req_sect = max_sect;
538 
539 	/*
540 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
541 		 sect_in, device->rs_in_flight, want, correction,
542 		 steps, cps, device->rs_planed, curr_corr, req_sect);
543 	*/
544 
545 	return req_sect;
546 }
547 
548 static int drbd_rs_number_requests(struct drbd_device *device)
549 {
550 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
551 	int number, mxb;
552 
553 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
554 	device->rs_in_flight -= sect_in;
555 
556 	rcu_read_lock();
557 	mxb = drbd_get_max_buffers(device) / 2;
558 	if (rcu_dereference(device->rs_plan_s)->size) {
559 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
560 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
561 	} else {
562 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
563 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
564 	}
565 	rcu_read_unlock();
566 
567 	/* Don't have more than "max-buffers"/2 in-flight.
568 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
569 	 * potentially causing a distributed deadlock on congestion during
570 	 * online-verify or (checksum-based) resync, if max-buffers,
571 	 * socket buffer sizes and resync rate settings are mis-configured. */
572 
573 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
574 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
575 	 * "number of pages" (typically also 4k),
576 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
577 	if (mxb - device->rs_in_flight/8 < number)
578 		number = mxb - device->rs_in_flight/8;
579 
580 	return number;
581 }
582 
/* Generate the next batch of resync requests; dispatched from
 * w_resync_timer() while the connection state is C_SYNC_TARGET.
 *
 * Walks the bitmap starting at device->bm_resync_fo and, for up to
 * drbd_rs_number_requests() bitmap blocks, either submits a local read for
 * checksum based resync (read_for_csum()) or sends a resync request to the
 * peer (drbd_send_drequest()).  Unless the end of the bitmap was reached or
 * an error is returned, the resync timer is re-armed to fire SLEEP_TIME
 * jiffies from now.
 *
 * Returns 0, except: -EIO if read_for_csum() reports a disk failure, or the
 * error returned by drbd_send_drequest().
 */
static int make_resync_request(struct drbd_device *const device, int cancel)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	/* NOTE(review): connection is dereferenced further down without a NULL
	 * check, although this initializer allows it to be NULL. */
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = get_capacity(device->vdisk);
	int max_bio_size;
	int number, rollback_i, size;
	int align, requeue = 0;
	int i = 0;
	int discard_granularity = 0;

	if (unlikely(cancel))
		return 0;

	if (device->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(device);
		return 0;
	}

	if (!get_ldev(device)) {
		/* Since we only need to access device->rsync a
		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		drbd_err(device, "Disk broke down during resync!\n");
		return 0;
	}

	/* discard_granularity stays 0 unless the peer agreed to thin resync */
	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
		rcu_read_lock();
		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
		rcu_read_unlock();
	}

	/* queue limit is in sectors; << 9 converts it to bytes */
	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
	number = drbd_rs_number_requests(device);
	if (number <= 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled,
		 * but notify TCP that we'd like to have more space. */
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket) {
			struct sock *sk = connection->data.socket->sk;
			int queued = sk->sk_wmem_queued;
			int sndbuf = sk->sk_sndbuf;
			if (queued > sndbuf / 2) {
				requeue = 1;
				if (sk->sk_socket)
					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			}
		} else
			requeue = 1;
		mutex_unlock(&connection->data.mutex);
		if (requeue)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(device, device->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			/* nothing left to request; the timer is not re-armed here */
			device->bm_resync_fo = drbd_bm_bits(device);
			put_ldev(device);
			return 0;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(device, sector)) {
			/* could not start on this sector now: retry the same bit
			 * on the next timer run */
			device->bm_resync_fo = bit;
			goto requeue;
		}
		device->bm_resync_fo = bit + 1;

		/* re-check the bit now that drbd_try_rs_begin_io() succeeded;
		 * if it is no longer set, skip this block */
		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
			drbd_rs_complete_io(device, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		/* remember i, so the -EAGAIN path below can undo the merging */
		rollback_i = i;
		while (i < number) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			if (discard_granularity && size == discard_granularity)
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(device, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			device->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		if (device->use_csums) {
			switch (read_for_csum(peer_device, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(device);
				return -EIO;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(device, sector);
				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
				/* NOTE(review): rollback_i is only assigned inside
				 * the #if block above. */
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			int err;

			inc_rs_pending(device);
			/* a request of exactly discard_granularity bytes is sent
			 * as P_RS_THIN_REQ, everything else as P_RS_DATA_REQUEST */
			err = drbd_send_drequest(peer_device,
						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
						 sector, size, ID_SYNCER);
			if (err) {
				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(device);
				put_ldev(device);
				return err;
			}
		}
	}

	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(device);
		return 0;
	}

 requeue:
	/* account the i bitmap blocks handled in this turn, converted to
	 * sectors, and run again after SLEEP_TIME */
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(device);
	return 0;
}
762 
763 static int make_ov_request(struct drbd_device *device, int cancel)
764 {
765 	int number, i, size;
766 	sector_t sector;
767 	const sector_t capacity = get_capacity(device->vdisk);
768 	bool stop_sector_reached = false;
769 
770 	if (unlikely(cancel))
771 		return 1;
772 
773 	number = drbd_rs_number_requests(device);
774 
775 	sector = device->ov_position;
776 	for (i = 0; i < number; i++) {
777 		if (sector >= capacity)
778 			return 1;
779 
780 		/* We check for "finished" only in the reply path:
781 		 * w_e_end_ov_reply().
782 		 * We need to send at least one request out. */
783 		stop_sector_reached = i > 0
784 			&& verify_can_do_stop_sector(device)
785 			&& sector >= device->ov_stop_sector;
786 		if (stop_sector_reached)
787 			break;
788 
789 		size = BM_BLOCK_SIZE;
790 
791 		if (drbd_try_rs_begin_io(device, sector)) {
792 			device->ov_position = sector;
793 			goto requeue;
794 		}
795 
796 		if (sector + (size>>9) > capacity)
797 			size = (capacity-sector)<<9;
798 
799 		inc_rs_pending(device);
800 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
801 			dec_rs_pending(device);
802 			return 0;
803 		}
804 		sector += BM_SECT_PER_BIT;
805 	}
806 	device->ov_position = sector;
807 
808  requeue:
809 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
810 	if (i == 0 || !stop_sector_reached)
811 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
812 	return 1;
813 }
814 
815 int w_ov_finished(struct drbd_work *w, int cancel)
816 {
817 	struct drbd_device_work *dw =
818 		container_of(w, struct drbd_device_work, w);
819 	struct drbd_device *device = dw->device;
820 	kfree(dw);
821 	ov_out_of_sync_print(device);
822 	drbd_resync_finished(device);
823 
824 	return 0;
825 }
826 
827 static int w_resync_finished(struct drbd_work *w, int cancel)
828 {
829 	struct drbd_device_work *dw =
830 		container_of(w, struct drbd_device_work, w);
831 	struct drbd_device *device = dw->device;
832 	kfree(dw);
833 
834 	drbd_resync_finished(device);
835 
836 	return 0;
837 }
838 
839 static void ping_peer(struct drbd_device *device)
840 {
841 	struct drbd_connection *connection = first_peer_device(device)->connection;
842 
843 	clear_bit(GOT_PING_ACK, &connection->flags);
844 	request_ping(connection);
845 	wait_event(connection->ping_wait,
846 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
847 }
848 
/* Finish a resync or online verify run: log statistics, update disk states
 * and UUIDs, change the connection state to C_CONNECTED, and invoke user
 * space helpers where applicable.
 *
 * If the resync LRU cannot be emptied yet (drbd_rs_del_all() fails), a
 * w_resync_finished work item is queued to retry later, and the function
 * returns right away.
 *
 * Always returns 1.
 */
int drbd_resync_finished(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_device_work *dw;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(device)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now).   Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
		if (dw) {
			dw->w.cb = w_resync_finished;
			dw->device = device;
			drbd_queue_work(&connection->sender_work, &dw->w);
			return 1;
		}
		/* no memory for the retry work item: carry on regardless */
		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
	}

	/* elapsed time in seconds, excluding paused time */
	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
	/* dt is unsigned, so this is effectively "dt == 0";
	 * it keeps the division below from dividing by zero */
	if (dt <= 0)
		dt = 1;

	db = device->rs_total;
	/* adjust for verify start and stop sectors, respective reached position */
	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
		db -= device->ov_left;

	dbdt = Bit2KB(db/dt);
	/* from here on rs_paused is in seconds, as printed below */
	device->rs_paused /= HZ;

	if (!get_ldev(device))
		goto out;

	ping_peer(device);

	spin_lock_irq(&device->resource->req_lock);
	os = drbd_read_state(device);

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify" : "Resync",
	     dt + device->rs_paused, device->rs_paused, dbdt);

	/* number of bits still set in the bitmap */
	n_oos = drbd_bm_total_weight(device);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		/* after a resync, only failed blocks may remain set */
		D_ASSERT(device, (n_oos - device->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (device->use_csums && device->rs_total) {
			const unsigned long s = device->rs_same_csum;
			const unsigned long t = device->rs_total;
			/* percentage of blocks with equal checksums; two forms
			 * of the same ratio, chosen by the magnitude of t */
			const int ratio =
				(t == 0)     ? 0 :
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(device->rs_same_csum),
			     Bit2KB(device->rs_total - device->rs_same_csum),
			     Bit2KB(device->rs_total));
		}
	}

	if (device->rs_failed) {
		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);

		/* with failed blocks, the sync target side stays inconsistent */
		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (device->p_uuid) {
				/* as sync target: take over the peer's UUIDs */
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(device, i, device->p_uuid[i]);
				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
			} else {
				drbd_err(device, "device->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(device, 0UL);
			drbd_print_uuids(device, "updated UUIDs");
			if (device->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					device->p_uuid[i] = device->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&device->resource->req_lock);

	/* If we have been sync source, and have an effective fencing-policy,
	 * once *all* volumes are back in sync, call "unfence". */
	if (os.conn == C_SYNC_SOURCE) {
		/* D_MASK start values: the helper is only called if the loop
		 * below ran and lowered both minima to D_UP_TO_DATE */
		enum drbd_disk_state disk_state = D_MASK;
		enum drbd_disk_state pdsk_state = D_MASK;
		enum drbd_fencing_p fp = FP_DONT_CARE;

		rcu_read_lock();
		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
		if (fp != FP_DONT_CARE) {
			struct drbd_peer_device *peer_device;
			int vnr;
			/* minimum disk and peer disk state over all volumes
			 * of this connection */
			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
				struct drbd_device *device = peer_device->device;
				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
			}
		}
		rcu_read_unlock();
		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
			conn_khelper(connection, "unfence-peer");
	}

	put_ldev(device);
out:
	/* reset the per-run statistics */
	device->rs_total  = 0;
	device->rs_failed = 0;
	device->rs_paused = 0;

	/* reset start sector, if we reached end of device */
	if (verify_done && device->ov_left == 0)
		device->ov_start_sector = 0;

	drbd_md_sync(device);

	/* the helper is invoked only after req_lock has been released */
	if (khelper_cmd)
		drbd_khelper(device, khelper_cmd);

	return 1;
}
1027 
1028 /* helper */
1029 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1030 {
1031 	if (drbd_peer_req_has_active_page(peer_req)) {
1032 		/* This might happen if sendpage() has not finished */
1033 		int i = PFN_UP(peer_req->i.size);
1034 		atomic_add(i, &device->pp_in_use_by_net);
1035 		atomic_sub(i, &device->pp_in_use);
1036 		spin_lock_irq(&device->resource->req_lock);
1037 		list_add_tail(&peer_req->w.list, &device->net_ee);
1038 		spin_unlock_irq(&device->resource->req_lock);
1039 		wake_up(&drbd_pp_wait);
1040 	} else
1041 		drbd_free_peer_req(device, peer_req);
1042 }
1043 
1044 /**
1045  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1046  * @w:		work object.
1047  * @cancel:	The connection will be closed anyways
1048  */
1049 int w_e_end_data_req(struct drbd_work *w, int cancel)
1050 {
1051 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1052 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1053 	struct drbd_device *device = peer_device->device;
1054 	int err;
1055 
1056 	if (unlikely(cancel)) {
1057 		drbd_free_peer_req(device, peer_req);
1058 		dec_unacked(device);
1059 		return 0;
1060 	}
1061 
1062 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1063 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1064 	} else {
1065 		if (drbd_ratelimit())
1066 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1067 			    (unsigned long long)peer_req->i.sector);
1068 
1069 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1070 	}
1071 
1072 	dec_unacked(device);
1073 
1074 	move_to_net_ee_or_free(device, peer_req);
1075 
1076 	if (unlikely(err))
1077 		drbd_err(device, "drbd_send_block() failed\n");
1078 	return err;
1079 }
1080 
1081 static bool all_zero(struct drbd_peer_request *peer_req)
1082 {
1083 	struct page *page = peer_req->pages;
1084 	unsigned int len = peer_req->i.size;
1085 
1086 	page_chain_for_each(page) {
1087 		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1088 		unsigned int i, words = l / sizeof(long);
1089 		unsigned long *d;
1090 
1091 		d = kmap_atomic(page);
1092 		for (i = 0; i < words; i++) {
1093 			if (d[i]) {
1094 				kunmap_atomic(d);
1095 				return false;
1096 			}
1097 		}
1098 		kunmap_atomic(d);
1099 		len -= l;
1100 	}
1101 
1102 	return true;
1103 }
1104 
1105 /**
1106  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1107  * @w:		work object.
1108  * @cancel:	The connection will be closed anyways
1109  */
1110 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1111 {
1112 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1113 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1114 	struct drbd_device *device = peer_device->device;
1115 	int err;
1116 
1117 	if (unlikely(cancel)) {
1118 		drbd_free_peer_req(device, peer_req);
1119 		dec_unacked(device);
1120 		return 0;
1121 	}
1122 
1123 	if (get_ldev_if_state(device, D_FAILED)) {
1124 		drbd_rs_complete_io(device, peer_req->i.sector);
1125 		put_ldev(device);
1126 	}
1127 
1128 	if (device->state.conn == C_AHEAD) {
1129 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1130 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1131 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1132 			inc_rs_pending(device);
1133 			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1134 				err = drbd_send_rs_deallocated(peer_device, peer_req);
1135 			else
1136 				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1137 		} else {
1138 			if (drbd_ratelimit())
1139 				drbd_err(device, "Not sending RSDataReply, "
1140 				    "partner DISKLESS!\n");
1141 			err = 0;
1142 		}
1143 	} else {
1144 		if (drbd_ratelimit())
1145 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1146 			    (unsigned long long)peer_req->i.sector);
1147 
1148 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1149 
1150 		/* update resync data with failure */
1151 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1152 	}
1153 
1154 	dec_unacked(device);
1155 
1156 	move_to_net_ee_or_free(device, peer_req);
1157 
1158 	if (unlikely(err))
1159 		drbd_err(device, "drbd_send_block() failed\n");
1160 	return err;
1161 }
1162 
1163 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1164 {
1165 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1166 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1167 	struct drbd_device *device = peer_device->device;
1168 	struct digest_info *di;
1169 	int digest_size;
1170 	void *digest = NULL;
1171 	int err, eq = 0;
1172 
1173 	if (unlikely(cancel)) {
1174 		drbd_free_peer_req(device, peer_req);
1175 		dec_unacked(device);
1176 		return 0;
1177 	}
1178 
1179 	if (get_ldev(device)) {
1180 		drbd_rs_complete_io(device, peer_req->i.sector);
1181 		put_ldev(device);
1182 	}
1183 
1184 	di = peer_req->digest;
1185 
1186 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1187 		/* quick hack to try to avoid a race against reconfiguration.
1188 		 * a real fix would be much more involved,
1189 		 * introducing more locking mechanisms */
1190 		if (peer_device->connection->csums_tfm) {
1191 			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1192 			D_ASSERT(device, digest_size == di->digest_size);
1193 			digest = kmalloc(digest_size, GFP_NOIO);
1194 		}
1195 		if (digest) {
1196 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1197 			eq = !memcmp(digest, di->digest, digest_size);
1198 			kfree(digest);
1199 		}
1200 
1201 		if (eq) {
1202 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1203 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1204 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1205 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1206 		} else {
1207 			inc_rs_pending(device);
1208 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1209 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1210 			kfree(di);
1211 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1212 		}
1213 	} else {
1214 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1215 		if (drbd_ratelimit())
1216 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1217 	}
1218 
1219 	dec_unacked(device);
1220 	move_to_net_ee_or_free(device, peer_req);
1221 
1222 	if (unlikely(err))
1223 		drbd_err(device, "drbd_send_block/ack() failed\n");
1224 	return err;
1225 }
1226 
1227 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1228 {
1229 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1230 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1231 	struct drbd_device *device = peer_device->device;
1232 	sector_t sector = peer_req->i.sector;
1233 	unsigned int size = peer_req->i.size;
1234 	int digest_size;
1235 	void *digest;
1236 	int err = 0;
1237 
1238 	if (unlikely(cancel))
1239 		goto out;
1240 
1241 	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1242 	digest = kmalloc(digest_size, GFP_NOIO);
1243 	if (!digest) {
1244 		err = 1;	/* terminate the connection in case the allocation failed */
1245 		goto out;
1246 	}
1247 
1248 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1249 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1250 	else
1251 		memset(digest, 0, digest_size);
1252 
1253 	/* Free e and pages before send.
1254 	 * In case we block on congestion, we could otherwise run into
1255 	 * some distributed deadlock, if the other side blocks on
1256 	 * congestion as well, because our receiver blocks in
1257 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1258 	drbd_free_peer_req(device, peer_req);
1259 	peer_req = NULL;
1260 	inc_rs_pending(device);
1261 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1262 	if (err)
1263 		dec_rs_pending(device);
1264 	kfree(digest);
1265 
1266 out:
1267 	if (peer_req)
1268 		drbd_free_peer_req(device, peer_req);
1269 	dec_unacked(device);
1270 	return err;
1271 }
1272 
1273 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1274 {
1275 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1276 		device->ov_last_oos_size += size>>9;
1277 	} else {
1278 		device->ov_last_oos_start = sector;
1279 		device->ov_last_oos_size = size>>9;
1280 	}
1281 	drbd_set_out_of_sync(device, sector, size);
1282 }
1283 
1284 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1285 {
1286 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1287 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1288 	struct drbd_device *device = peer_device->device;
1289 	struct digest_info *di;
1290 	void *digest;
1291 	sector_t sector = peer_req->i.sector;
1292 	unsigned int size = peer_req->i.size;
1293 	int digest_size;
1294 	int err, eq = 0;
1295 	bool stop_sector_reached = false;
1296 
1297 	if (unlikely(cancel)) {
1298 		drbd_free_peer_req(device, peer_req);
1299 		dec_unacked(device);
1300 		return 0;
1301 	}
1302 
1303 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1304 	 * the resync lru has been cleaned up already */
1305 	if (get_ldev(device)) {
1306 		drbd_rs_complete_io(device, peer_req->i.sector);
1307 		put_ldev(device);
1308 	}
1309 
1310 	di = peer_req->digest;
1311 
1312 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1313 		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1314 		digest = kmalloc(digest_size, GFP_NOIO);
1315 		if (digest) {
1316 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1317 
1318 			D_ASSERT(device, digest_size == di->digest_size);
1319 			eq = !memcmp(digest, di->digest, digest_size);
1320 			kfree(digest);
1321 		}
1322 	}
1323 
1324 	/* Free peer_req and pages before send.
1325 	 * In case we block on congestion, we could otherwise run into
1326 	 * some distributed deadlock, if the other side blocks on
1327 	 * congestion as well, because our receiver blocks in
1328 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1329 	drbd_free_peer_req(device, peer_req);
1330 	if (!eq)
1331 		drbd_ov_out_of_sync_found(device, sector, size);
1332 	else
1333 		ov_out_of_sync_print(device);
1334 
1335 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1336 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1337 
1338 	dec_unacked(device);
1339 
1340 	--device->ov_left;
1341 
1342 	/* let's advance progress step marks only for every other megabyte */
1343 	if ((device->ov_left & 0x200) == 0x200)
1344 		drbd_advance_rs_marks(device, device->ov_left);
1345 
1346 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1347 		(sector + (size>>9)) >= device->ov_stop_sector;
1348 
1349 	if (device->ov_left == 0 || stop_sector_reached) {
1350 		ov_out_of_sync_print(device);
1351 		drbd_resync_finished(device);
1352 	}
1353 
1354 	return err;
1355 }
1356 
1357 /* FIXME
1358  * We need to track the number of pending barrier acks,
1359  * and to be able to wait for them.
1360  * See also comment in drbd_adm_attach before drbd_suspend_io.
1361  */
1362 static int drbd_send_barrier(struct drbd_connection *connection)
1363 {
1364 	struct p_barrier *p;
1365 	struct drbd_socket *sock;
1366 
1367 	sock = &connection->data;
1368 	p = conn_prepare_command(connection, sock);
1369 	if (!p)
1370 		return -EIO;
1371 	p->barrier = connection->send.current_epoch_nr;
1372 	p->pad = 0;
1373 	connection->send.current_epoch_writes = 0;
1374 	connection->send.last_sent_barrier_jif = jiffies;
1375 
1376 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1377 }
1378 
1379 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1380 {
1381 	struct drbd_socket *sock = &pd->connection->data;
1382 	if (!drbd_prepare_command(pd, sock))
1383 		return -EIO;
1384 	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1385 }
1386 
1387 int w_send_write_hint(struct drbd_work *w, int cancel)
1388 {
1389 	struct drbd_device *device =
1390 		container_of(w, struct drbd_device, unplug_work);
1391 
1392 	if (cancel)
1393 		return 0;
1394 	return pd_send_unplug_remote(first_peer_device(device));
1395 }
1396 
1397 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1398 {
1399 	if (!connection->send.seen_any_write_yet) {
1400 		connection->send.seen_any_write_yet = true;
1401 		connection->send.current_epoch_nr = epoch;
1402 		connection->send.current_epoch_writes = 0;
1403 		connection->send.last_sent_barrier_jif = jiffies;
1404 	}
1405 }
1406 
1407 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1408 {
1409 	/* re-init if first write on this connection */
1410 	if (!connection->send.seen_any_write_yet)
1411 		return;
1412 	if (connection->send.current_epoch_nr != epoch) {
1413 		if (connection->send.current_epoch_writes)
1414 			drbd_send_barrier(connection);
1415 		connection->send.current_epoch_nr = epoch;
1416 	}
1417 }
1418 
1419 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1420 {
1421 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1422 	struct drbd_device *device = req->device;
1423 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1424 	struct drbd_connection *const connection = peer_device->connection;
1425 	int err;
1426 
1427 	if (unlikely(cancel)) {
1428 		req_mod(req, SEND_CANCELED);
1429 		return 0;
1430 	}
1431 	req->pre_send_jif = jiffies;
1432 
1433 	/* this time, no connection->send.current_epoch_writes++;
1434 	 * If it was sent, it was the closing barrier for the last
1435 	 * replicated epoch, before we went into AHEAD mode.
1436 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1437 	maybe_send_barrier(connection, req->epoch);
1438 
1439 	err = drbd_send_out_of_sync(peer_device, req);
1440 	req_mod(req, OOS_HANDED_TO_NETWORK);
1441 
1442 	return err;
1443 }
1444 
1445 /**
1446  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1447  * @w:		work object.
1448  * @cancel:	The connection will be closed anyways
1449  */
1450 int w_send_dblock(struct drbd_work *w, int cancel)
1451 {
1452 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1453 	struct drbd_device *device = req->device;
1454 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1455 	struct drbd_connection *connection = peer_device->connection;
1456 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1457 	int err;
1458 
1459 	if (unlikely(cancel)) {
1460 		req_mod(req, SEND_CANCELED);
1461 		return 0;
1462 	}
1463 	req->pre_send_jif = jiffies;
1464 
1465 	re_init_if_first_write(connection, req->epoch);
1466 	maybe_send_barrier(connection, req->epoch);
1467 	connection->send.current_epoch_writes++;
1468 
1469 	err = drbd_send_dblock(peer_device, req);
1470 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1471 
1472 	if (do_send_unplug && !err)
1473 		pd_send_unplug_remote(peer_device);
1474 
1475 	return err;
1476 }
1477 
1478 /**
1479  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1480  * @w:		work object.
1481  * @cancel:	The connection will be closed anyways
1482  */
1483 int w_send_read_req(struct drbd_work *w, int cancel)
1484 {
1485 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1486 	struct drbd_device *device = req->device;
1487 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1488 	struct drbd_connection *connection = peer_device->connection;
1489 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1490 	int err;
1491 
1492 	if (unlikely(cancel)) {
1493 		req_mod(req, SEND_CANCELED);
1494 		return 0;
1495 	}
1496 	req->pre_send_jif = jiffies;
1497 
1498 	/* Even read requests may close a write epoch,
1499 	 * if there was any yet. */
1500 	maybe_send_barrier(connection, req->epoch);
1501 
1502 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1503 				 (unsigned long)req);
1504 
1505 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1506 
1507 	if (do_send_unplug && !err)
1508 		pd_send_unplug_remote(peer_device);
1509 
1510 	return err;
1511 }
1512 
1513 int w_restart_disk_io(struct drbd_work *w, int cancel)
1514 {
1515 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1516 	struct drbd_device *device = req->device;
1517 
1518 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1519 		drbd_al_begin_io(device, &req->i);
1520 
1521 	req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1522 					   req->master_bio, GFP_NOIO,
1523 					  &drbd_io_bio_set);
1524 	req->private_bio->bi_private = req;
1525 	req->private_bio->bi_end_io = drbd_request_endio;
1526 	submit_bio_noacct(req->private_bio);
1527 
1528 	return 0;
1529 }
1530 
1531 static int _drbd_may_sync_now(struct drbd_device *device)
1532 {
1533 	struct drbd_device *odev = device;
1534 	int resync_after;
1535 
1536 	while (1) {
1537 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1538 			return 1;
1539 		rcu_read_lock();
1540 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1541 		rcu_read_unlock();
1542 		if (resync_after == -1)
1543 			return 1;
1544 		odev = minor_to_device(resync_after);
1545 		if (!odev)
1546 			return 1;
1547 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1548 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1549 		    odev->state.aftr_isp || odev->state.peer_isp ||
1550 		    odev->state.user_isp)
1551 			return 0;
1552 	}
1553 }
1554 
1555 /**
1556  * drbd_pause_after() - Pause resync on all devices that may not resync now
1557  * @device:	DRBD device.
1558  *
1559  * Called from process context only (admin command and after_state_ch).
1560  */
1561 static bool drbd_pause_after(struct drbd_device *device)
1562 {
1563 	bool changed = false;
1564 	struct drbd_device *odev;
1565 	int i;
1566 
1567 	rcu_read_lock();
1568 	idr_for_each_entry(&drbd_devices, odev, i) {
1569 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1570 			continue;
1571 		if (!_drbd_may_sync_now(odev) &&
1572 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1573 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1574 			changed = true;
1575 	}
1576 	rcu_read_unlock();
1577 
1578 	return changed;
1579 }
1580 
1581 /**
1582  * drbd_resume_next() - Resume resync on all devices that may resync now
1583  * @device:	DRBD device.
1584  *
1585  * Called from process context only (admin command and worker).
1586  */
1587 static bool drbd_resume_next(struct drbd_device *device)
1588 {
1589 	bool changed = false;
1590 	struct drbd_device *odev;
1591 	int i;
1592 
1593 	rcu_read_lock();
1594 	idr_for_each_entry(&drbd_devices, odev, i) {
1595 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1596 			continue;
1597 		if (odev->state.aftr_isp) {
1598 			if (_drbd_may_sync_now(odev) &&
1599 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1600 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1601 				changed = true;
1602 		}
1603 	}
1604 	rcu_read_unlock();
1605 	return changed;
1606 }
1607 
/*
 * Run drbd_resume_next() for @device with all resources locked.
 */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();

	drbd_resume_next(device);

	unlock_all_resources();
}
1614 
/*
 * Run drbd_pause_after() for @device with all resources locked.
 */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();

	drbd_pause_after(device);

	unlock_all_resources();
}
1621 
1622 /* caller must lock_all_resources() */
1623 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1624 {
1625 	struct drbd_device *odev;
1626 	int resync_after;
1627 
1628 	if (o_minor == -1)
1629 		return NO_ERROR;
1630 	if (o_minor < -1 || o_minor > MINORMASK)
1631 		return ERR_RESYNC_AFTER;
1632 
1633 	/* check for loops */
1634 	odev = minor_to_device(o_minor);
1635 	while (1) {
1636 		if (odev == device)
1637 			return ERR_RESYNC_AFTER_CYCLE;
1638 
1639 		/* You are free to depend on diskless, non-existing,
1640 		 * or not yet/no longer existing minors.
1641 		 * We only reject dependency loops.
1642 		 * We cannot follow the dependency chain beyond a detached or
1643 		 * missing minor.
1644 		 */
1645 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1646 			return NO_ERROR;
1647 
1648 		rcu_read_lock();
1649 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1650 		rcu_read_unlock();
1651 		/* dependency chain ends here, no cycles. */
1652 		if (resync_after == -1)
1653 			return NO_ERROR;
1654 
1655 		/* follow the dependency chain */
1656 		odev = minor_to_device(resync_after);
1657 	}
1658 }
1659 
1660 /* caller must lock_all_resources() */
1661 void drbd_resync_after_changed(struct drbd_device *device)
1662 {
1663 	int changed;
1664 
1665 	do {
1666 		changed  = drbd_pause_after(device);
1667 		changed |= drbd_resume_next(device);
1668 	} while (changed);
1669 }
1670 
1671 void drbd_rs_controller_reset(struct drbd_device *device)
1672 {
1673 	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1674 	struct fifo_buffer *plan;
1675 
1676 	atomic_set(&device->rs_sect_in, 0);
1677 	atomic_set(&device->rs_sect_ev, 0);
1678 	device->rs_in_flight = 0;
1679 	device->rs_last_events =
1680 		(int)part_stat_read_accum(disk->part0, sectors);
1681 
1682 	/* Updating the RCU protected object in place is necessary since
1683 	   this function gets called from atomic context.
1684 	   It is valid since all other updates also lead to an completely
1685 	   empty fifo */
1686 	rcu_read_lock();
1687 	plan = rcu_dereference(device->rs_plan_s);
1688 	plan->total = 0;
1689 	fifo_set(plan, 0);
1690 	rcu_read_unlock();
1691 }
1692 
1693 void start_resync_timer_fn(struct timer_list *t)
1694 {
1695 	struct drbd_device *device = from_timer(device, t, start_resync_timer);
1696 	drbd_device_post_work(device, RS_START);
1697 }
1698 
1699 static void do_start_resync(struct drbd_device *device)
1700 {
1701 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1702 		drbd_warn(device, "postponing start_resync ...\n");
1703 		device->start_resync_timer.expires = jiffies + HZ/10;
1704 		add_timer(&device->start_resync_timer);
1705 		return;
1706 	}
1707 
1708 	drbd_start_resync(device, C_SYNC_SOURCE);
1709 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1710 }
1711 
1712 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1713 {
1714 	bool csums_after_crash_only;
1715 	rcu_read_lock();
1716 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1717 	rcu_read_unlock();
1718 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1719 		connection->csums_tfm &&			/* configured? */
1720 		(csums_after_crash_only == false		/* use for each resync? */
1721 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1722 }
1723 
1724 /**
1725  * drbd_start_resync() - Start the resync process
1726  * @device:	DRBD device.
1727  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1728  *
1729  * This function might bring you directly into one of the
1730  * C_PAUSED_SYNC_* states.
1731  */
1732 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1733 {
1734 	struct drbd_peer_device *peer_device = first_peer_device(device);
1735 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1736 	union drbd_state ns;
1737 	int r;
1738 
1739 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1740 		drbd_err(device, "Resync already running!\n");
1741 		return;
1742 	}
1743 
1744 	if (!connection) {
1745 		drbd_err(device, "No connection to peer, aborting!\n");
1746 		return;
1747 	}
1748 
1749 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1750 		if (side == C_SYNC_TARGET) {
1751 			/* Since application IO was locked out during C_WF_BITMAP_T and
1752 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1753 			   we check that we might make the data inconsistent. */
1754 			r = drbd_khelper(device, "before-resync-target");
1755 			r = (r >> 8) & 0xff;
1756 			if (r > 0) {
1757 				drbd_info(device, "before-resync-target handler returned %d, "
1758 					 "dropping connection.\n", r);
1759 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1760 				return;
1761 			}
1762 		} else /* C_SYNC_SOURCE */ {
1763 			r = drbd_khelper(device, "before-resync-source");
1764 			r = (r >> 8) & 0xff;
1765 			if (r > 0) {
1766 				if (r == 3) {
1767 					drbd_info(device, "before-resync-source handler returned %d, "
1768 						 "ignoring. Old userland tools?", r);
1769 				} else {
1770 					drbd_info(device, "before-resync-source handler returned %d, "
1771 						 "dropping connection.\n", r);
1772 					conn_request_state(connection,
1773 							   NS(conn, C_DISCONNECTING), CS_HARD);
1774 					return;
1775 				}
1776 			}
1777 		}
1778 	}
1779 
1780 	if (current == connection->worker.task) {
1781 		/* The worker should not sleep waiting for state_mutex,
1782 		   that can take long */
1783 		if (!mutex_trylock(device->state_mutex)) {
1784 			set_bit(B_RS_H_DONE, &device->flags);
1785 			device->start_resync_timer.expires = jiffies + HZ/5;
1786 			add_timer(&device->start_resync_timer);
1787 			return;
1788 		}
1789 	} else {
1790 		mutex_lock(device->state_mutex);
1791 	}
1792 
1793 	lock_all_resources();
1794 	clear_bit(B_RS_H_DONE, &device->flags);
1795 	/* Did some connection breakage or IO error race with us? */
1796 	if (device->state.conn < C_CONNECTED
1797 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1798 		unlock_all_resources();
1799 		goto out;
1800 	}
1801 
1802 	ns = drbd_read_state(device);
1803 
1804 	ns.aftr_isp = !_drbd_may_sync_now(device);
1805 
1806 	ns.conn = side;
1807 
1808 	if (side == C_SYNC_TARGET)
1809 		ns.disk = D_INCONSISTENT;
1810 	else /* side == C_SYNC_SOURCE */
1811 		ns.pdsk = D_INCONSISTENT;
1812 
1813 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1814 	ns = drbd_read_state(device);
1815 
1816 	if (ns.conn < C_CONNECTED)
1817 		r = SS_UNKNOWN_ERROR;
1818 
1819 	if (r == SS_SUCCESS) {
1820 		unsigned long tw = drbd_bm_total_weight(device);
1821 		unsigned long now = jiffies;
1822 		int i;
1823 
1824 		device->rs_failed    = 0;
1825 		device->rs_paused    = 0;
1826 		device->rs_same_csum = 0;
1827 		device->rs_last_sect_ev = 0;
1828 		device->rs_total     = tw;
1829 		device->rs_start     = now;
1830 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1831 			device->rs_mark_left[i] = tw;
1832 			device->rs_mark_time[i] = now;
1833 		}
1834 		drbd_pause_after(device);
1835 		/* Forget potentially stale cached per resync extent bit-counts.
1836 		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1837 		 * disabled, and know the disk state is ok. */
1838 		spin_lock(&device->al_lock);
1839 		lc_reset(device->resync);
1840 		device->resync_locked = 0;
1841 		device->resync_wenr = LC_FREE;
1842 		spin_unlock(&device->al_lock);
1843 	}
1844 	unlock_all_resources();
1845 
1846 	if (r == SS_SUCCESS) {
1847 		wake_up(&device->al_wait); /* for lc_reset() above */
1848 		/* reset rs_last_bcast when a resync or verify is started,
1849 		 * to deal with potential jiffies wrap. */
1850 		device->rs_last_bcast = jiffies - HZ;
1851 
1852 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1853 		     drbd_conn_str(ns.conn),
1854 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1855 		     (unsigned long) device->rs_total);
1856 		if (side == C_SYNC_TARGET) {
1857 			device->bm_resync_fo = 0;
1858 			device->use_csums = use_checksum_based_resync(connection, device);
1859 		} else {
1860 			device->use_csums = false;
1861 		}
1862 
1863 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1864 		 * with w_send_oos, or the sync target will get confused as to
1865 		 * how much bits to resync.  We cannot do that always, because for an
1866 		 * empty resync and protocol < 95, we need to do it here, as we call
1867 		 * drbd_resync_finished from here in that case.
1868 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1869 		 * and from after_state_ch otherwise. */
1870 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1871 			drbd_gen_and_send_sync_uuid(peer_device);
1872 
1873 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1874 			/* This still has a race (about when exactly the peers
1875 			 * detect connection loss) that can lead to a full sync
1876 			 * on next handshake. In 8.3.9 we fixed this with explicit
1877 			 * resync-finished notifications, but the fix
1878 			 * introduces a protocol change.  Sleeping for some
1879 			 * time longer than the ping interval + timeout on the
1880 			 * SyncSource, to give the SyncTarget the chance to
1881 			 * detect connection loss, then waiting for a ping
1882 			 * response (implicit in drbd_resync_finished) reduces
1883 			 * the race considerably, but does not solve it. */
1884 			if (side == C_SYNC_SOURCE) {
1885 				struct net_conf *nc;
1886 				int timeo;
1887 
1888 				rcu_read_lock();
1889 				nc = rcu_dereference(connection->net_conf);
1890 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1891 				rcu_read_unlock();
1892 				schedule_timeout_interruptible(timeo);
1893 			}
1894 			drbd_resync_finished(device);
1895 		}
1896 
1897 		drbd_rs_controller_reset(device);
1898 		/* ns.conn may already be != device->state.conn,
1899 		 * we may have been paused in between, or become paused until
1900 		 * the timer triggers.
1901 		 * No matter, that is handled in resync_timer_fn() */
1902 		if (ns.conn == C_SYNC_TARGET)
1903 			mod_timer(&device->resync_timer, jiffies);
1904 
1905 		drbd_md_sync(device);
1906 	}
1907 	put_ldev(device);
1908 out:
1909 	mutex_unlock(device->state_mutex);
1910 }
1911 
1912 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1913 {
1914 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1915 	device->rs_last_bcast = jiffies;
1916 
1917 	if (!get_ldev(device))
1918 		return;
1919 
1920 	drbd_bm_write_lazy(device, 0);
1921 	if (resync_done && is_sync_state(device->state.conn))
1922 		drbd_resync_finished(device);
1923 
1924 	drbd_bcast_event(device, &sib);
1925 	/* update timestamp, in case it took a while to write out stuff */
1926 	device->rs_last_bcast = jiffies;
1927 	put_ldev(device);
1928 }
1929 
/*
 * Worker-side handler for the DESTROY_DISK work bit (see do_device_work()).
 *
 * Destroys device->resync and device->act_log via lc_destroy(), frees the
 * backing device, resets all three pointers to NULL, then clears
 * GOING_DISKLESS and wakes up waiters on device->misc_wait.
 */
static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	/* NOTE(review): __acquire()/__release() look like static-checker
	 * annotations for the "local" context - confirm they emit no code. */
	__acquire(local);
	drbd_backing_dev_free(device, device->ldev);
	device->ldev = NULL;
	__release(local);

	/* the flag is cleared before the wake-up, so woken waiters that
	 * re-check GOING_DISKLESS see it cleared */
	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}
1945 
1946 static void go_diskless(struct drbd_device *device)
1947 {
1948 	D_ASSERT(device, device->state.disk == D_FAILED);
1949 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1950 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1951 	 * the protected members anymore, though, so once put_ldev reaches zero
1952 	 * again, it will be safe to free them. */
1953 
1954 	/* Try to write changed bitmap pages, read errors may have just
1955 	 * set some bits outside the area covered by the activity log.
1956 	 *
1957 	 * If we have an IO error during the bitmap writeout,
1958 	 * we will want a full sync next time, just in case.
1959 	 * (Do we want a specific meta data flag for this?)
1960 	 *
1961 	 * If that does not make it to stable storage either,
1962 	 * we cannot do anything about that anymore.
1963 	 *
1964 	 * We still need to check if both bitmap and ldev are present, we may
1965 	 * end up here after a failed attach, before ldev was even assigned.
1966 	 */
1967 	if (device->bitmap && device->ldev) {
1968 		/* An interrupted resync or similar is allowed to recounts bits
1969 		 * while we detach.
1970 		 * Any modifications would not be expected anymore, though.
1971 		 */
1972 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1973 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1974 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1975 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1976 				drbd_md_sync(device);
1977 			}
1978 		}
1979 	}
1980 
1981 	drbd_force_state(device, NS(disk, D_DISKLESS));
1982 }
1983 
/*
 * Worker-side handler for the MD_SYNC work bit (see do_device_work()):
 * logs a warning that the md_sync_timer expired and calls drbd_md_sync().
 *
 * Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}
1990 
1991 /* only called from drbd_worker thread, no locking */
1992 void __update_timing_details(
1993 		struct drbd_thread_timing_details *tdp,
1994 		unsigned int *cb_nr,
1995 		void *cb,
1996 		const char *fn, const unsigned int line)
1997 {
1998 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1999 	struct drbd_thread_timing_details *td = tdp + i;
2000 
2001 	td->start_jif = jiffies;
2002 	td->cb_addr = cb;
2003 	td->caller_fn = fn;
2004 	td->line = line;
2005 	td->cb_nr = *cb_nr;
2006 
2007 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2008 	td = tdp + i;
2009 	memset(td, 0, sizeof(*td));
2010 
2011 	++(*cb_nr);
2012 }
2013 
2014 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2015 {
2016 	if (test_bit(MD_SYNC, &todo))
2017 		do_md_sync(device);
2018 	if (test_bit(RS_DONE, &todo) ||
2019 	    test_bit(RS_PROGRESS, &todo))
2020 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2021 	if (test_bit(GO_DISKLESS, &todo))
2022 		go_diskless(device);
2023 	if (test_bit(DESTROY_DISK, &todo))
2024 		drbd_ldev_destroy(device);
2025 	if (test_bit(RS_START, &todo))
2026 		do_start_resync(device);
2027 }
2028 
/* Every device flag bit that get_work_bits() claims for the worker and
 * that do_device_work() knows how to handle. */
#define DRBD_DEVICE_WORK_MASK		\
	((1UL << MD_SYNC) |		\
	 (1UL << RS_PROGRESS) |		\
	 (1UL << RS_DONE) |		\
	 (1UL << GO_DISKLESS) |		\
	 (1UL << DESTROY_DISK) |	\
	 (1UL << RS_START))
2037 
2038 static unsigned long get_work_bits(unsigned long *flags)
2039 {
2040 	unsigned long old, new;
2041 	do {
2042 		old = *flags;
2043 		new = old & ~DRBD_DEVICE_WORK_MASK;
2044 	} while (cmpxchg(flags, old, new) != old);
2045 	return old & DRBD_DEVICE_WORK_MASK;
2046 }
2047 
2048 static void do_unqueued_work(struct drbd_connection *connection)
2049 {
2050 	struct drbd_peer_device *peer_device;
2051 	int vnr;
2052 
2053 	rcu_read_lock();
2054 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2055 		struct drbd_device *device = peer_device->device;
2056 		unsigned long todo = get_work_bits(&device->flags);
2057 		if (!todo)
2058 			continue;
2059 
2060 		kref_get(&device->kref);
2061 		rcu_read_unlock();
2062 		do_device_work(device, todo);
2063 		kref_put(&device->kref, drbd_destroy_device);
2064 		rcu_read_lock();
2065 	}
2066 	rcu_read_unlock();
2067 }
2068 
2069 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2070 {
2071 	spin_lock_irq(&queue->q_lock);
2072 	list_splice_tail_init(&queue->q, work_list);
2073 	spin_unlock_irq(&queue->q_lock);
2074 	return !list_empty(work_list);
2075 }
2076 
/*
 * wait_for_work() - fetch queued sender work, sleeping until there is some
 * @connection: connection whose sender_work queue is drained
 * @work_list:  list the queued work items get spliced onto
 *
 * Returns right away if dequeue_work_batch() left something on @work_list.
 * Otherwise the data socket is uncorked (if tcp_cork is configured) and the
 * caller sleeps in TASK_INTERRUPTIBLE on sender_work.q_wait until one of:
 *  - work got queued on sender_work.q,
 *  - a signal is pending,
 *  - DEVICE_WORK_PENDING is set in connection->flags,
 *  - the worker thread state is no longer RUNNING.
 * While idle, maybe_send_barrier() is called whenever current_tle_nr
 * differs from send.current_epoch_nr.
 * Before returning, the cork setting of the data socket is re-applied from
 * the current net_conf.
 */
static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	/* fast path: something is queued already */
	dequeue_work_batch(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	/* net_conf may be NULL; treat that as "corking not configured" */
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			tcp_sock_set_cork(connection->data.socket->sk, false);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		/* register on the wait queue before checking the conditions
		 * below, so a wake-up between check and schedule() is not lost */
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  Next incoming request epoch will be connection ->
		 * current transfer log epoch number.  If that is different
		 * from the epoch of the last request we communicated, it is
		 * safe to send the epoch separating barrier now.
		 */
		/* sampled while req_lock is still held */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		/* flag-triggered device work is handled by the caller */
		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* may be woken up for other things but new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		/* cork again if configured now; otherwise uncork, but only
		 * if that was not already done before going to sleep */
		if (cork)
			tcp_sock_set_cork(connection->data.socket->sk, true);
		else if (!uncork)
			tcp_sock_set_cork(connection->data.socket->sk, false);
	}
	mutex_unlock(&connection->data.mutex);
}
2161 
/*
 * drbd_worker() - main function of the worker thread of a connection
 * @thi: thread descriptor; thi->connection is the connection being served
 *
 * While the thread state is RUNNING, each iteration
 *  - waits for work if the private work list is empty,
 *  - runs flag-triggered device work if DEVICE_WORK_PENDING was set,
 *  - flushes unexpected signals, and
 *  - runs the callback of the first item on the private work list.
 * A callback returning non-zero while cstate >= C_WF_REPORT_PARAMS makes
 * the connection request C_NETWORK_FAILURE.
 *
 * After the main loop ends, remaining device work and queued work items are
 * drained (callbacks get 1 as second argument), and drbd_device_cleanup()
 * is called for every peer device of the connection.
 *
 * Always returns 0.
 */
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			/* a signal while still RUNNING is unexpected: warn and
			 * carry on; otherwise leave the main loop */
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			/* second argument is non-zero while cstate is below
			 * C_WF_REPORT_PARAMS - presumably a "cancel" flag;
			 * TODO confirm against the drbd_work callback contract */
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	/* Shutdown: keep going until neither device work nor queued work
	 * is left.  Callbacks are now always invoked with 1. */
	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		} else
			dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	/* Final per-device cleanup.  Each device is pinned with a kref so
	 * that drbd_device_cleanup() can run outside the RCU read side. */
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}
2234