xref: /linux/drivers/block/drbd/drbd_worker.c (revision 005438a8eef063495ac059d128eea71b58de50e5)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 
59 /* About the global_state_lock
60    Each state transition on an device holds a read lock. In case we have
61    to evaluate the resync after dependencies, we grab a write lock, because
62    we need stable states on all devices for that.  */
63 rwlock_t global_state_lock;
64 
65 /* used for synchronous meta data and bitmap IO
66  * submitted by drbd_md_sync_page_io()
67  */
68 void drbd_md_endio(struct bio *bio, int error)
69 {
70 	struct drbd_device *device;
71 
72 	device = bio->bi_private;
73 	device->md_io.error = error;
74 
75 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76 	 * to timeout on the lower level device, and eventually detach from it.
77 	 * If this io completion runs after that timeout expired, this
78 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
79 	 * During normal operation, this only puts that extra reference
80 	 * down to 1 again.
81 	 * Make sure we first drop the reference, and only then signal
82 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83 	 * next drbd_md_sync_page_io(), that we trigger the
84 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
85 	 */
86 	drbd_md_put_buffer(device);
87 	device->md_io.done = 1;
88 	wake_up(&device->misc_wait);
89 	bio_put(bio);
90 	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91 		put_ldev(device);
92 }
93 
94 /* reads on behalf of the partner,
95  * "submitted" by the receiver
96  */
97 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
98 {
99 	unsigned long flags = 0;
100 	struct drbd_peer_device *peer_device = peer_req->peer_device;
101 	struct drbd_device *device = peer_device->device;
102 
103 	spin_lock_irqsave(&device->resource->req_lock, flags);
104 	device->read_cnt += peer_req->i.size >> 9;
105 	list_del(&peer_req->w.list);
106 	if (list_empty(&device->read_ee))
107 		wake_up(&device->ee_wait);
108 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
110 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
111 
112 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113 	put_ldev(device);
114 }
115 
116 /* writes on behalf of the partner, or resync writes,
117  * "submitted" by the receiver, final stage.  */
118 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120 	unsigned long flags = 0;
121 	struct drbd_peer_device *peer_device = peer_req->peer_device;
122 	struct drbd_device *device = peer_device->device;
123 	struct drbd_interval i;
124 	int do_wake;
125 	u64 block_id;
126 	int do_al_complete_io;
127 
128 	/* after we moved peer_req to done_ee,
129 	 * we may no longer access it,
130 	 * it may be freed/reused already!
131 	 * (as soon as we release the req_lock) */
132 	i = peer_req->i;
133 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134 	block_id = peer_req->block_id;
135 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
136 
137 	spin_lock_irqsave(&device->resource->req_lock, flags);
138 	device->writ_cnt += peer_req->i.size >> 9;
139 	list_move_tail(&peer_req->w.list, &device->done_ee);
140 
141 	/*
142 	 * Do not remove from the write_requests tree here: we did not send the
143 	 * Ack yet and did not wake possibly waiting conflicting requests.
144 	 * Removed from the tree from "drbd_process_done_ee" within the
145 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
146 	 * _drbd_clear_done_ee.
147 	 */
148 
149 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
150 
151 	/* FIXME do we want to detach for failed REQ_DISCARD?
152 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
153 	if (peer_req->flags & EE_WAS_ERROR)
154 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
155 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
156 
157 	if (block_id == ID_SYNCER)
158 		drbd_rs_complete_io(device, i.sector);
159 
160 	if (do_wake)
161 		wake_up(&device->ee_wait);
162 
163 	if (do_al_complete_io)
164 		drbd_al_complete_io(device, &i);
165 
166 	wake_asender(peer_device->connection);
167 	put_ldev(device);
168 }
169 
170 /* writes on behalf of the partner, or resync writes,
171  * "submitted" by the receiver.
172  */
173 void drbd_peer_request_endio(struct bio *bio, int error)
174 {
175 	struct drbd_peer_request *peer_req = bio->bi_private;
176 	struct drbd_device *device = peer_req->peer_device->device;
177 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
178 	int is_write = bio_data_dir(bio) == WRITE;
179 	int is_discard = !!(bio->bi_rw & REQ_DISCARD);
180 
181 	if (error && __ratelimit(&drbd_ratelimit_state))
182 		drbd_warn(device, "%s: error=%d s=%llus\n",
183 				is_write ? (is_discard ? "discard" : "write")
184 					: "read", error,
185 				(unsigned long long)peer_req->i.sector);
186 	if (!error && !uptodate) {
187 		if (__ratelimit(&drbd_ratelimit_state))
188 			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
189 					is_write ? "write" : "read",
190 					(unsigned long long)peer_req->i.sector);
191 		/* strange behavior of some lower level drivers...
192 		 * fail the request by clearing the uptodate flag,
193 		 * but do not return any error?! */
194 		error = -EIO;
195 	}
196 
197 	if (error)
198 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
199 
200 	bio_put(bio); /* no need for the bio anymore */
201 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
202 		if (is_write)
203 			drbd_endio_write_sec_final(peer_req);
204 		else
205 			drbd_endio_read_sec_final(peer_req);
206 	}
207 }
208 
209 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
210  */
211 void drbd_request_endio(struct bio *bio, int error)
212 {
213 	unsigned long flags;
214 	struct drbd_request *req = bio->bi_private;
215 	struct drbd_device *device = req->device;
216 	struct bio_and_error m;
217 	enum drbd_req_event what;
218 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
219 
220 	if (!error && !uptodate) {
221 		drbd_warn(device, "p %s: setting error to -EIO\n",
222 			 bio_data_dir(bio) == WRITE ? "write" : "read");
223 		/* strange behavior of some lower level drivers...
224 		 * fail the request by clearing the uptodate flag,
225 		 * but do not return any error?! */
226 		error = -EIO;
227 	}
228 
229 
230 	/* If this request was aborted locally before,
231 	 * but now was completed "successfully",
232 	 * chances are that this caused arbitrary data corruption.
233 	 *
234 	 * "aborting" requests, or force-detaching the disk, is intended for
235 	 * completely blocked/hung local backing devices which do no longer
236 	 * complete requests at all, not even do error completions.  In this
237 	 * situation, usually a hard-reset and failover is the only way out.
238 	 *
239 	 * By "aborting", basically faking a local error-completion,
240 	 * we allow for a more graceful swichover by cleanly migrating services.
241 	 * Still the affected node has to be rebooted "soon".
242 	 *
243 	 * By completing these requests, we allow the upper layers to re-use
244 	 * the associated data pages.
245 	 *
246 	 * If later the local backing device "recovers", and now DMAs some data
247 	 * from disk into the original request pages, in the best case it will
248 	 * just put random data into unused pages; but typically it will corrupt
249 	 * meanwhile completely unrelated data, causing all sorts of damage.
250 	 *
251 	 * Which means delayed successful completion,
252 	 * especially for READ requests,
253 	 * is a reason to panic().
254 	 *
255 	 * We assume that a delayed *error* completion is OK,
256 	 * though we still will complain noisily about it.
257 	 */
258 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
259 		if (__ratelimit(&drbd_ratelimit_state))
260 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
261 
262 		if (!error)
263 			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
264 	}
265 
266 	/* to avoid recursion in __req_mod */
267 	if (unlikely(error)) {
268 		if (bio->bi_rw & REQ_DISCARD)
269 			what = (error == -EOPNOTSUPP)
270 				? DISCARD_COMPLETED_NOTSUPP
271 				: DISCARD_COMPLETED_WITH_ERROR;
272 		else
273 			what = (bio_data_dir(bio) == WRITE)
274 			? WRITE_COMPLETED_WITH_ERROR
275 			: (bio_rw(bio) == READ)
276 			  ? READ_COMPLETED_WITH_ERROR
277 			  : READ_AHEAD_COMPLETED_WITH_ERROR;
278 	} else
279 		what = COMPLETED_OK;
280 
281 	bio_put(req->private_bio);
282 	req->private_bio = ERR_PTR(error);
283 
284 	/* not req_mod(), we need irqsave here! */
285 	spin_lock_irqsave(&device->resource->req_lock, flags);
286 	__req_mod(req, what, &m);
287 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
288 	put_ldev(device);
289 
290 	if (m.bio)
291 		complete_master_bio(device, &m);
292 }
293 
294 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
295 {
296 	struct hash_desc desc;
297 	struct scatterlist sg;
298 	struct page *page = peer_req->pages;
299 	struct page *tmp;
300 	unsigned len;
301 
302 	desc.tfm = tfm;
303 	desc.flags = 0;
304 
305 	sg_init_table(&sg, 1);
306 	crypto_hash_init(&desc);
307 
308 	while ((tmp = page_chain_next(page))) {
309 		/* all but the last page will be fully used */
310 		sg_set_page(&sg, page, PAGE_SIZE, 0);
311 		crypto_hash_update(&desc, &sg, sg.length);
312 		page = tmp;
313 	}
314 	/* and now the last, possibly only partially used page */
315 	len = peer_req->i.size & (PAGE_SIZE - 1);
316 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
317 	crypto_hash_update(&desc, &sg, sg.length);
318 	crypto_hash_final(&desc, digest);
319 }
320 
321 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
322 {
323 	struct hash_desc desc;
324 	struct scatterlist sg;
325 	struct bio_vec bvec;
326 	struct bvec_iter iter;
327 
328 	desc.tfm = tfm;
329 	desc.flags = 0;
330 
331 	sg_init_table(&sg, 1);
332 	crypto_hash_init(&desc);
333 
334 	bio_for_each_segment(bvec, bio, iter) {
335 		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
336 		crypto_hash_update(&desc, &sg, sg.length);
337 	}
338 	crypto_hash_final(&desc, digest);
339 }
340 
341 /* MAYBE merge common code with w_e_end_ov_req */
342 static int w_e_send_csum(struct drbd_work *w, int cancel)
343 {
344 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
345 	struct drbd_peer_device *peer_device = peer_req->peer_device;
346 	struct drbd_device *device = peer_device->device;
347 	int digest_size;
348 	void *digest;
349 	int err = 0;
350 
351 	if (unlikely(cancel))
352 		goto out;
353 
354 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
355 		goto out;
356 
357 	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
358 	digest = kmalloc(digest_size, GFP_NOIO);
359 	if (digest) {
360 		sector_t sector = peer_req->i.sector;
361 		unsigned int size = peer_req->i.size;
362 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
363 		/* Free peer_req and pages before send.
364 		 * In case we block on congestion, we could otherwise run into
365 		 * some distributed deadlock, if the other side blocks on
366 		 * congestion as well, because our receiver blocks in
367 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
368 		drbd_free_peer_req(device, peer_req);
369 		peer_req = NULL;
370 		inc_rs_pending(device);
371 		err = drbd_send_drequest_csum(peer_device, sector, size,
372 					      digest, digest_size,
373 					      P_CSUM_RS_REQUEST);
374 		kfree(digest);
375 	} else {
376 		drbd_err(device, "kmalloc() of digest failed.\n");
377 		err = -ENOMEM;
378 	}
379 
380 out:
381 	if (peer_req)
382 		drbd_free_peer_req(device, peer_req);
383 
384 	if (unlikely(err))
385 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
386 	return err;
387 }
388 
389 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
390 
391 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
392 {
393 	struct drbd_device *device = peer_device->device;
394 	struct drbd_peer_request *peer_req;
395 
396 	if (!get_ldev(device))
397 		return -EIO;
398 
399 	/* GFP_TRY, because if there is no memory available right now, this may
400 	 * be rescheduled for later. It is "only" background resync, after all. */
401 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
402 				       size, true /* has real payload */, GFP_TRY);
403 	if (!peer_req)
404 		goto defer;
405 
406 	peer_req->w.cb = w_e_send_csum;
407 	spin_lock_irq(&device->resource->req_lock);
408 	list_add_tail(&peer_req->w.list, &device->read_ee);
409 	spin_unlock_irq(&device->resource->req_lock);
410 
411 	atomic_add(size >> 9, &device->rs_sect_ev);
412 	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
413 		return 0;
414 
415 	/* If it failed because of ENOMEM, retry should help.  If it failed
416 	 * because bio_add_page failed (probably broken lower level driver),
417 	 * retry may or may not help.
418 	 * If it does not, you may need to force disconnect. */
419 	spin_lock_irq(&device->resource->req_lock);
420 	list_del(&peer_req->w.list);
421 	spin_unlock_irq(&device->resource->req_lock);
422 
423 	drbd_free_peer_req(device, peer_req);
424 defer:
425 	put_ldev(device);
426 	return -EAGAIN;
427 }
428 
429 int w_resync_timer(struct drbd_work *w, int cancel)
430 {
431 	struct drbd_device *device =
432 		container_of(w, struct drbd_device, resync_work);
433 
434 	switch (device->state.conn) {
435 	case C_VERIFY_S:
436 		make_ov_request(device, cancel);
437 		break;
438 	case C_SYNC_TARGET:
439 		make_resync_request(device, cancel);
440 		break;
441 	}
442 
443 	return 0;
444 }
445 
446 void resync_timer_fn(unsigned long data)
447 {
448 	struct drbd_device *device = (struct drbd_device *) data;
449 
450 	drbd_queue_work_if_unqueued(
451 		&first_peer_device(device)->connection->sender_work,
452 		&device->resync_work);
453 }
454 
455 static void fifo_set(struct fifo_buffer *fb, int value)
456 {
457 	int i;
458 
459 	for (i = 0; i < fb->size; i++)
460 		fb->values[i] = value;
461 }
462 
463 static int fifo_push(struct fifo_buffer *fb, int value)
464 {
465 	int ov;
466 
467 	ov = fb->values[fb->head_index];
468 	fb->values[fb->head_index++] = value;
469 
470 	if (fb->head_index >= fb->size)
471 		fb->head_index = 0;
472 
473 	return ov;
474 }
475 
476 static void fifo_add_val(struct fifo_buffer *fb, int value)
477 {
478 	int i;
479 
480 	for (i = 0; i < fb->size; i++)
481 		fb->values[i] += value;
482 }
483 
484 struct fifo_buffer *fifo_alloc(int fifo_size)
485 {
486 	struct fifo_buffer *fb;
487 
488 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
489 	if (!fb)
490 		return NULL;
491 
492 	fb->head_index = 0;
493 	fb->size = fifo_size;
494 	fb->total = 0;
495 
496 	return fb;
497 }
498 
499 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
500 {
501 	struct disk_conf *dc;
502 	unsigned int want;     /* The number of sectors we want in-flight */
503 	int req_sect; /* Number of sectors to request in this turn */
504 	int correction; /* Number of sectors more we need in-flight */
505 	int cps; /* correction per invocation of drbd_rs_controller() */
506 	int steps; /* Number of time steps to plan ahead */
507 	int curr_corr;
508 	int max_sect;
509 	struct fifo_buffer *plan;
510 
511 	dc = rcu_dereference(device->ldev->disk_conf);
512 	plan = rcu_dereference(device->rs_plan_s);
513 
514 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
515 
516 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
517 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
518 	} else { /* normal path */
519 		want = dc->c_fill_target ? dc->c_fill_target :
520 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
521 	}
522 
523 	correction = want - device->rs_in_flight - plan->total;
524 
525 	/* Plan ahead */
526 	cps = correction / steps;
527 	fifo_add_val(plan, cps);
528 	plan->total += cps * steps;
529 
530 	/* What we do in this step */
531 	curr_corr = fifo_push(plan, 0);
532 	plan->total -= curr_corr;
533 
534 	req_sect = sect_in + curr_corr;
535 	if (req_sect < 0)
536 		req_sect = 0;
537 
538 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
539 	if (req_sect > max_sect)
540 		req_sect = max_sect;
541 
542 	/*
543 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
544 		 sect_in, device->rs_in_flight, want, correction,
545 		 steps, cps, device->rs_planed, curr_corr, req_sect);
546 	*/
547 
548 	return req_sect;
549 }
550 
551 static int drbd_rs_number_requests(struct drbd_device *device)
552 {
553 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
554 	int number, mxb;
555 
556 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
557 	device->rs_in_flight -= sect_in;
558 
559 	rcu_read_lock();
560 	mxb = drbd_get_max_buffers(device) / 2;
561 	if (rcu_dereference(device->rs_plan_s)->size) {
562 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
563 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
564 	} else {
565 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
566 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
567 	}
568 	rcu_read_unlock();
569 
570 	/* Don't have more than "max-buffers"/2 in-flight.
571 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
572 	 * potentially causing a distributed deadlock on congestion during
573 	 * online-verify or (checksum-based) resync, if max-buffers,
574 	 * socket buffer sizes and resync rate settings are mis-configured. */
575 
576 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
577 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
578 	 * "number of pages" (typically also 4k),
579 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
580 	if (mxb - device->rs_in_flight/8 < number)
581 		number = mxb - device->rs_in_flight/8;
582 
583 	return number;
584 }
585 
586 static int make_resync_request(struct drbd_device *const device, int cancel)
587 {
588 	struct drbd_peer_device *const peer_device = first_peer_device(device);
589 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
590 	unsigned long bit;
591 	sector_t sector;
592 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
593 	int max_bio_size;
594 	int number, rollback_i, size;
595 	int align, requeue = 0;
596 	int i = 0;
597 
598 	if (unlikely(cancel))
599 		return 0;
600 
601 	if (device->rs_total == 0) {
602 		/* empty resync? */
603 		drbd_resync_finished(device);
604 		return 0;
605 	}
606 
607 	if (!get_ldev(device)) {
608 		/* Since we only need to access device->rsync a
609 		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
610 		   to continue resync with a broken disk makes no sense at
611 		   all */
612 		drbd_err(device, "Disk broke down during resync!\n");
613 		return 0;
614 	}
615 
616 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
617 	number = drbd_rs_number_requests(device);
618 	if (number <= 0)
619 		goto requeue;
620 
621 	for (i = 0; i < number; i++) {
622 		/* Stop generating RS requests when half of the send buffer is filled,
623 		 * but notify TCP that we'd like to have more space. */
624 		mutex_lock(&connection->data.mutex);
625 		if (connection->data.socket) {
626 			struct sock *sk = connection->data.socket->sk;
627 			int queued = sk->sk_wmem_queued;
628 			int sndbuf = sk->sk_sndbuf;
629 			if (queued > sndbuf / 2) {
630 				requeue = 1;
631 				if (sk->sk_socket)
632 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
633 			}
634 		} else
635 			requeue = 1;
636 		mutex_unlock(&connection->data.mutex);
637 		if (requeue)
638 			goto requeue;
639 
640 next_sector:
641 		size = BM_BLOCK_SIZE;
642 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
643 
644 		if (bit == DRBD_END_OF_BITMAP) {
645 			device->bm_resync_fo = drbd_bm_bits(device);
646 			put_ldev(device);
647 			return 0;
648 		}
649 
650 		sector = BM_BIT_TO_SECT(bit);
651 
652 		if (drbd_try_rs_begin_io(device, sector)) {
653 			device->bm_resync_fo = bit;
654 			goto requeue;
655 		}
656 		device->bm_resync_fo = bit + 1;
657 
658 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
659 			drbd_rs_complete_io(device, sector);
660 			goto next_sector;
661 		}
662 
663 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
664 		/* try to find some adjacent bits.
665 		 * we stop if we have already the maximum req size.
666 		 *
667 		 * Additionally always align bigger requests, in order to
668 		 * be prepared for all stripe sizes of software RAIDs.
669 		 */
670 		align = 1;
671 		rollback_i = i;
672 		while (i < number) {
673 			if (size + BM_BLOCK_SIZE > max_bio_size)
674 				break;
675 
676 			/* Be always aligned */
677 			if (sector & ((1<<(align+3))-1))
678 				break;
679 
680 			/* do not cross extent boundaries */
681 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
682 				break;
683 			/* now, is it actually dirty, after all?
684 			 * caution, drbd_bm_test_bit is tri-state for some
685 			 * obscure reason; ( b == 0 ) would get the out-of-band
686 			 * only accidentally right because of the "oddly sized"
687 			 * adjustment below */
688 			if (drbd_bm_test_bit(device, bit+1) != 1)
689 				break;
690 			bit++;
691 			size += BM_BLOCK_SIZE;
692 			if ((BM_BLOCK_SIZE << align) <= size)
693 				align++;
694 			i++;
695 		}
696 		/* if we merged some,
697 		 * reset the offset to start the next drbd_bm_find_next from */
698 		if (size > BM_BLOCK_SIZE)
699 			device->bm_resync_fo = bit + 1;
700 #endif
701 
702 		/* adjust very last sectors, in case we are oddly sized */
703 		if (sector + (size>>9) > capacity)
704 			size = (capacity-sector)<<9;
705 
706 		if (device->use_csums) {
707 			switch (read_for_csum(peer_device, sector, size)) {
708 			case -EIO: /* Disk failure */
709 				put_ldev(device);
710 				return -EIO;
711 			case -EAGAIN: /* allocation failed, or ldev busy */
712 				drbd_rs_complete_io(device, sector);
713 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
714 				i = rollback_i;
715 				goto requeue;
716 			case 0:
717 				/* everything ok */
718 				break;
719 			default:
720 				BUG();
721 			}
722 		} else {
723 			int err;
724 
725 			inc_rs_pending(device);
726 			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
727 						 sector, size, ID_SYNCER);
728 			if (err) {
729 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
730 				dec_rs_pending(device);
731 				put_ldev(device);
732 				return err;
733 			}
734 		}
735 	}
736 
737 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
738 		/* last syncer _request_ was sent,
739 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
740 		 * next sync group will resume), as soon as we receive the last
741 		 * resync data block, and the last bit is cleared.
742 		 * until then resync "work" is "inactive" ...
743 		 */
744 		put_ldev(device);
745 		return 0;
746 	}
747 
748  requeue:
749 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
750 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
751 	put_ldev(device);
752 	return 0;
753 }
754 
755 static int make_ov_request(struct drbd_device *device, int cancel)
756 {
757 	int number, i, size;
758 	sector_t sector;
759 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
760 	bool stop_sector_reached = false;
761 
762 	if (unlikely(cancel))
763 		return 1;
764 
765 	number = drbd_rs_number_requests(device);
766 
767 	sector = device->ov_position;
768 	for (i = 0; i < number; i++) {
769 		if (sector >= capacity)
770 			return 1;
771 
772 		/* We check for "finished" only in the reply path:
773 		 * w_e_end_ov_reply().
774 		 * We need to send at least one request out. */
775 		stop_sector_reached = i > 0
776 			&& verify_can_do_stop_sector(device)
777 			&& sector >= device->ov_stop_sector;
778 		if (stop_sector_reached)
779 			break;
780 
781 		size = BM_BLOCK_SIZE;
782 
783 		if (drbd_try_rs_begin_io(device, sector)) {
784 			device->ov_position = sector;
785 			goto requeue;
786 		}
787 
788 		if (sector + (size>>9) > capacity)
789 			size = (capacity-sector)<<9;
790 
791 		inc_rs_pending(device);
792 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
793 			dec_rs_pending(device);
794 			return 0;
795 		}
796 		sector += BM_SECT_PER_BIT;
797 	}
798 	device->ov_position = sector;
799 
800  requeue:
801 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
802 	if (i == 0 || !stop_sector_reached)
803 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
804 	return 1;
805 }
806 
807 int w_ov_finished(struct drbd_work *w, int cancel)
808 {
809 	struct drbd_device_work *dw =
810 		container_of(w, struct drbd_device_work, w);
811 	struct drbd_device *device = dw->device;
812 	kfree(dw);
813 	ov_out_of_sync_print(device);
814 	drbd_resync_finished(device);
815 
816 	return 0;
817 }
818 
819 static int w_resync_finished(struct drbd_work *w, int cancel)
820 {
821 	struct drbd_device_work *dw =
822 		container_of(w, struct drbd_device_work, w);
823 	struct drbd_device *device = dw->device;
824 	kfree(dw);
825 
826 	drbd_resync_finished(device);
827 
828 	return 0;
829 }
830 
831 static void ping_peer(struct drbd_device *device)
832 {
833 	struct drbd_connection *connection = first_peer_device(device)->connection;
834 
835 	clear_bit(GOT_PING_ACK, &connection->flags);
836 	request_ping(connection);
837 	wait_event(connection->ping_wait,
838 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
839 }
840 
841 int drbd_resync_finished(struct drbd_device *device)
842 {
843 	unsigned long db, dt, dbdt;
844 	unsigned long n_oos;
845 	union drbd_state os, ns;
846 	struct drbd_device_work *dw;
847 	char *khelper_cmd = NULL;
848 	int verify_done = 0;
849 
850 	/* Remove all elements from the resync LRU. Since future actions
851 	 * might set bits in the (main) bitmap, then the entries in the
852 	 * resync LRU would be wrong. */
853 	if (drbd_rs_del_all(device)) {
854 		/* In case this is not possible now, most probably because
855 		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
856 		 * queue (or even the read operations for those packets
857 		 * is not finished by now).   Retry in 100ms. */
858 
859 		schedule_timeout_interruptible(HZ / 10);
860 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
861 		if (dw) {
862 			dw->w.cb = w_resync_finished;
863 			dw->device = device;
864 			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
865 					&dw->w);
866 			return 1;
867 		}
868 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
869 	}
870 
871 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
872 	if (dt <= 0)
873 		dt = 1;
874 
875 	db = device->rs_total;
876 	/* adjust for verify start and stop sectors, respective reached position */
877 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
878 		db -= device->ov_left;
879 
880 	dbdt = Bit2KB(db/dt);
881 	device->rs_paused /= HZ;
882 
883 	if (!get_ldev(device))
884 		goto out;
885 
886 	ping_peer(device);
887 
888 	spin_lock_irq(&device->resource->req_lock);
889 	os = drbd_read_state(device);
890 
891 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
892 
893 	/* This protects us against multiple calls (that can happen in the presence
894 	   of application IO), and against connectivity loss just before we arrive here. */
895 	if (os.conn <= C_CONNECTED)
896 		goto out_unlock;
897 
898 	ns = os;
899 	ns.conn = C_CONNECTED;
900 
901 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
902 	     verify_done ? "Online verify" : "Resync",
903 	     dt + device->rs_paused, device->rs_paused, dbdt);
904 
905 	n_oos = drbd_bm_total_weight(device);
906 
907 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
908 		if (n_oos) {
909 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
910 			      n_oos, Bit2KB(1));
911 			khelper_cmd = "out-of-sync";
912 		}
913 	} else {
914 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
915 
916 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
917 			khelper_cmd = "after-resync-target";
918 
919 		if (device->use_csums && device->rs_total) {
920 			const unsigned long s = device->rs_same_csum;
921 			const unsigned long t = device->rs_total;
922 			const int ratio =
923 				(t == 0)     ? 0 :
924 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
925 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
926 			     "transferred %luK total %luK\n",
927 			     ratio,
928 			     Bit2KB(device->rs_same_csum),
929 			     Bit2KB(device->rs_total - device->rs_same_csum),
930 			     Bit2KB(device->rs_total));
931 		}
932 	}
933 
934 	if (device->rs_failed) {
935 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
936 
937 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
938 			ns.disk = D_INCONSISTENT;
939 			ns.pdsk = D_UP_TO_DATE;
940 		} else {
941 			ns.disk = D_UP_TO_DATE;
942 			ns.pdsk = D_INCONSISTENT;
943 		}
944 	} else {
945 		ns.disk = D_UP_TO_DATE;
946 		ns.pdsk = D_UP_TO_DATE;
947 
948 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
949 			if (device->p_uuid) {
950 				int i;
951 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
952 					_drbd_uuid_set(device, i, device->p_uuid[i]);
953 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
954 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
955 			} else {
956 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
957 			}
958 		}
959 
960 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
961 			/* for verify runs, we don't update uuids here,
962 			 * so there would be nothing to report. */
963 			drbd_uuid_set_bm(device, 0UL);
964 			drbd_print_uuids(device, "updated UUIDs");
965 			if (device->p_uuid) {
966 				/* Now the two UUID sets are equal, update what we
967 				 * know of the peer. */
968 				int i;
969 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
970 					device->p_uuid[i] = device->ldev->md.uuid[i];
971 			}
972 		}
973 	}
974 
975 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
976 out_unlock:
977 	spin_unlock_irq(&device->resource->req_lock);
978 	put_ldev(device);
979 out:
980 	device->rs_total  = 0;
981 	device->rs_failed = 0;
982 	device->rs_paused = 0;
983 
984 	/* reset start sector, if we reached end of device */
985 	if (verify_done && device->ov_left == 0)
986 		device->ov_start_sector = 0;
987 
988 	drbd_md_sync(device);
989 
990 	if (khelper_cmd)
991 		drbd_khelper(device, khelper_cmd);
992 
993 	return 1;
994 }
995 
996 /* helper */
997 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
998 {
999 	if (drbd_peer_req_has_active_page(peer_req)) {
1000 		/* This might happen if sendpage() has not finished */
1001 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1002 		atomic_add(i, &device->pp_in_use_by_net);
1003 		atomic_sub(i, &device->pp_in_use);
1004 		spin_lock_irq(&device->resource->req_lock);
1005 		list_add_tail(&peer_req->w.list, &device->net_ee);
1006 		spin_unlock_irq(&device->resource->req_lock);
1007 		wake_up(&drbd_pp_wait);
1008 	} else
1009 		drbd_free_peer_req(device, peer_req);
1010 }
1011 
1012 /**
1013  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1014  * @device:	DRBD device.
1015  * @w:		work object.
1016  * @cancel:	The connection will be closed anyways
1017  */
1018 int w_e_end_data_req(struct drbd_work *w, int cancel)
1019 {
1020 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1021 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1022 	struct drbd_device *device = peer_device->device;
1023 	int err;
1024 
1025 	if (unlikely(cancel)) {
1026 		drbd_free_peer_req(device, peer_req);
1027 		dec_unacked(device);
1028 		return 0;
1029 	}
1030 
1031 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1032 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1033 	} else {
1034 		if (__ratelimit(&drbd_ratelimit_state))
1035 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1036 			    (unsigned long long)peer_req->i.sector);
1037 
1038 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1039 	}
1040 
1041 	dec_unacked(device);
1042 
1043 	move_to_net_ee_or_free(device, peer_req);
1044 
1045 	if (unlikely(err))
1046 		drbd_err(device, "drbd_send_block() failed\n");
1047 	return err;
1048 }
1049 
1050 /**
1051  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1052  * @w:		work object.
1053  * @cancel:	The connection will be closed anyways
1054  */
1055 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1056 {
1057 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1058 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1059 	struct drbd_device *device = peer_device->device;
1060 	int err;
1061 
1062 	if (unlikely(cancel)) {
1063 		drbd_free_peer_req(device, peer_req);
1064 		dec_unacked(device);
1065 		return 0;
1066 	}
1067 
1068 	if (get_ldev_if_state(device, D_FAILED)) {
1069 		drbd_rs_complete_io(device, peer_req->i.sector);
1070 		put_ldev(device);
1071 	}
1072 
1073 	if (device->state.conn == C_AHEAD) {
1074 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1075 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1076 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1077 			inc_rs_pending(device);
1078 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1079 		} else {
1080 			if (__ratelimit(&drbd_ratelimit_state))
1081 				drbd_err(device, "Not sending RSDataReply, "
1082 				    "partner DISKLESS!\n");
1083 			err = 0;
1084 		}
1085 	} else {
1086 		if (__ratelimit(&drbd_ratelimit_state))
1087 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1088 			    (unsigned long long)peer_req->i.sector);
1089 
1090 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1091 
1092 		/* update resync data with failure */
1093 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1094 	}
1095 
1096 	dec_unacked(device);
1097 
1098 	move_to_net_ee_or_free(device, peer_req);
1099 
1100 	if (unlikely(err))
1101 		drbd_err(device, "drbd_send_block() failed\n");
1102 	return err;
1103 }
1104 
1105 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1106 {
1107 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1108 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1109 	struct drbd_device *device = peer_device->device;
1110 	struct digest_info *di;
1111 	int digest_size;
1112 	void *digest = NULL;
1113 	int err, eq = 0;
1114 
1115 	if (unlikely(cancel)) {
1116 		drbd_free_peer_req(device, peer_req);
1117 		dec_unacked(device);
1118 		return 0;
1119 	}
1120 
1121 	if (get_ldev(device)) {
1122 		drbd_rs_complete_io(device, peer_req->i.sector);
1123 		put_ldev(device);
1124 	}
1125 
1126 	di = peer_req->digest;
1127 
1128 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1129 		/* quick hack to try to avoid a race against reconfiguration.
1130 		 * a real fix would be much more involved,
1131 		 * introducing more locking mechanisms */
1132 		if (peer_device->connection->csums_tfm) {
1133 			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1134 			D_ASSERT(device, digest_size == di->digest_size);
1135 			digest = kmalloc(digest_size, GFP_NOIO);
1136 		}
1137 		if (digest) {
1138 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1139 			eq = !memcmp(digest, di->digest, digest_size);
1140 			kfree(digest);
1141 		}
1142 
1143 		if (eq) {
1144 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1145 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1146 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1147 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1148 		} else {
1149 			inc_rs_pending(device);
1150 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1151 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1152 			kfree(di);
1153 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154 		}
1155 	} else {
1156 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1157 		if (__ratelimit(&drbd_ratelimit_state))
1158 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1159 	}
1160 
1161 	dec_unacked(device);
1162 	move_to_net_ee_or_free(device, peer_req);
1163 
1164 	if (unlikely(err))
1165 		drbd_err(device, "drbd_send_block/ack() failed\n");
1166 	return err;
1167 }
1168 
1169 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1170 {
1171 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1172 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1173 	struct drbd_device *device = peer_device->device;
1174 	sector_t sector = peer_req->i.sector;
1175 	unsigned int size = peer_req->i.size;
1176 	int digest_size;
1177 	void *digest;
1178 	int err = 0;
1179 
1180 	if (unlikely(cancel))
1181 		goto out;
1182 
1183 	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1184 	digest = kmalloc(digest_size, GFP_NOIO);
1185 	if (!digest) {
1186 		err = 1;	/* terminate the connection in case the allocation failed */
1187 		goto out;
1188 	}
1189 
1190 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1191 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1192 	else
1193 		memset(digest, 0, digest_size);
1194 
1195 	/* Free e and pages before send.
1196 	 * In case we block on congestion, we could otherwise run into
1197 	 * some distributed deadlock, if the other side blocks on
1198 	 * congestion as well, because our receiver blocks in
1199 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1200 	drbd_free_peer_req(device, peer_req);
1201 	peer_req = NULL;
1202 	inc_rs_pending(device);
1203 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1204 	if (err)
1205 		dec_rs_pending(device);
1206 	kfree(digest);
1207 
1208 out:
1209 	if (peer_req)
1210 		drbd_free_peer_req(device, peer_req);
1211 	dec_unacked(device);
1212 	return err;
1213 }
1214 
1215 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1216 {
1217 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1218 		device->ov_last_oos_size += size>>9;
1219 	} else {
1220 		device->ov_last_oos_start = sector;
1221 		device->ov_last_oos_size = size>>9;
1222 	}
1223 	drbd_set_out_of_sync(device, sector, size);
1224 }
1225 
1226 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1227 {
1228 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1229 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1230 	struct drbd_device *device = peer_device->device;
1231 	struct digest_info *di;
1232 	void *digest;
1233 	sector_t sector = peer_req->i.sector;
1234 	unsigned int size = peer_req->i.size;
1235 	int digest_size;
1236 	int err, eq = 0;
1237 	bool stop_sector_reached = false;
1238 
1239 	if (unlikely(cancel)) {
1240 		drbd_free_peer_req(device, peer_req);
1241 		dec_unacked(device);
1242 		return 0;
1243 	}
1244 
1245 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1246 	 * the resync lru has been cleaned up already */
1247 	if (get_ldev(device)) {
1248 		drbd_rs_complete_io(device, peer_req->i.sector);
1249 		put_ldev(device);
1250 	}
1251 
1252 	di = peer_req->digest;
1253 
1254 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1255 		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1256 		digest = kmalloc(digest_size, GFP_NOIO);
1257 		if (digest) {
1258 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1259 
1260 			D_ASSERT(device, digest_size == di->digest_size);
1261 			eq = !memcmp(digest, di->digest, digest_size);
1262 			kfree(digest);
1263 		}
1264 	}
1265 
1266 	/* Free peer_req and pages before send.
1267 	 * In case we block on congestion, we could otherwise run into
1268 	 * some distributed deadlock, if the other side blocks on
1269 	 * congestion as well, because our receiver blocks in
1270 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1271 	drbd_free_peer_req(device, peer_req);
1272 	if (!eq)
1273 		drbd_ov_out_of_sync_found(device, sector, size);
1274 	else
1275 		ov_out_of_sync_print(device);
1276 
1277 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1278 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1279 
1280 	dec_unacked(device);
1281 
1282 	--device->ov_left;
1283 
1284 	/* let's advance progress step marks only for every other megabyte */
1285 	if ((device->ov_left & 0x200) == 0x200)
1286 		drbd_advance_rs_marks(device, device->ov_left);
1287 
1288 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1289 		(sector + (size>>9)) >= device->ov_stop_sector;
1290 
1291 	if (device->ov_left == 0 || stop_sector_reached) {
1292 		ov_out_of_sync_print(device);
1293 		drbd_resync_finished(device);
1294 	}
1295 
1296 	return err;
1297 }
1298 
1299 /* FIXME
1300  * We need to track the number of pending barrier acks,
1301  * and to be able to wait for them.
1302  * See also comment in drbd_adm_attach before drbd_suspend_io.
1303  */
1304 static int drbd_send_barrier(struct drbd_connection *connection)
1305 {
1306 	struct p_barrier *p;
1307 	struct drbd_socket *sock;
1308 
1309 	sock = &connection->data;
1310 	p = conn_prepare_command(connection, sock);
1311 	if (!p)
1312 		return -EIO;
1313 	p->barrier = connection->send.current_epoch_nr;
1314 	p->pad = 0;
1315 	connection->send.current_epoch_writes = 0;
1316 
1317 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1318 }
1319 
1320 int w_send_write_hint(struct drbd_work *w, int cancel)
1321 {
1322 	struct drbd_device *device =
1323 		container_of(w, struct drbd_device, unplug_work);
1324 	struct drbd_socket *sock;
1325 
1326 	if (cancel)
1327 		return 0;
1328 	sock = &first_peer_device(device)->connection->data;
1329 	if (!drbd_prepare_command(first_peer_device(device), sock))
1330 		return -EIO;
1331 	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1332 }
1333 
1334 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1335 {
1336 	if (!connection->send.seen_any_write_yet) {
1337 		connection->send.seen_any_write_yet = true;
1338 		connection->send.current_epoch_nr = epoch;
1339 		connection->send.current_epoch_writes = 0;
1340 	}
1341 }
1342 
1343 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1344 {
1345 	/* re-init if first write on this connection */
1346 	if (!connection->send.seen_any_write_yet)
1347 		return;
1348 	if (connection->send.current_epoch_nr != epoch) {
1349 		if (connection->send.current_epoch_writes)
1350 			drbd_send_barrier(connection);
1351 		connection->send.current_epoch_nr = epoch;
1352 	}
1353 }
1354 
1355 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1356 {
1357 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1358 	struct drbd_device *device = req->device;
1359 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1360 	struct drbd_connection *const connection = peer_device->connection;
1361 	int err;
1362 
1363 	if (unlikely(cancel)) {
1364 		req_mod(req, SEND_CANCELED);
1365 		return 0;
1366 	}
1367 	req->pre_send_jif = jiffies;
1368 
1369 	/* this time, no connection->send.current_epoch_writes++;
1370 	 * If it was sent, it was the closing barrier for the last
1371 	 * replicated epoch, before we went into AHEAD mode.
1372 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1373 	maybe_send_barrier(connection, req->epoch);
1374 
1375 	err = drbd_send_out_of_sync(peer_device, req);
1376 	req_mod(req, OOS_HANDED_TO_NETWORK);
1377 
1378 	return err;
1379 }
1380 
1381 /**
1382  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1383  * @w:		work object.
1384  * @cancel:	The connection will be closed anyways
1385  */
1386 int w_send_dblock(struct drbd_work *w, int cancel)
1387 {
1388 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1389 	struct drbd_device *device = req->device;
1390 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1391 	struct drbd_connection *connection = peer_device->connection;
1392 	int err;
1393 
1394 	if (unlikely(cancel)) {
1395 		req_mod(req, SEND_CANCELED);
1396 		return 0;
1397 	}
1398 	req->pre_send_jif = jiffies;
1399 
1400 	re_init_if_first_write(connection, req->epoch);
1401 	maybe_send_barrier(connection, req->epoch);
1402 	connection->send.current_epoch_writes++;
1403 
1404 	err = drbd_send_dblock(peer_device, req);
1405 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1406 
1407 	return err;
1408 }
1409 
1410 /**
1411  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1412  * @w:		work object.
1413  * @cancel:	The connection will be closed anyways
1414  */
1415 int w_send_read_req(struct drbd_work *w, int cancel)
1416 {
1417 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1418 	struct drbd_device *device = req->device;
1419 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1420 	struct drbd_connection *connection = peer_device->connection;
1421 	int err;
1422 
1423 	if (unlikely(cancel)) {
1424 		req_mod(req, SEND_CANCELED);
1425 		return 0;
1426 	}
1427 	req->pre_send_jif = jiffies;
1428 
1429 	/* Even read requests may close a write epoch,
1430 	 * if there was any yet. */
1431 	maybe_send_barrier(connection, req->epoch);
1432 
1433 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1434 				 (unsigned long)req);
1435 
1436 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1437 
1438 	return err;
1439 }
1440 
1441 int w_restart_disk_io(struct drbd_work *w, int cancel)
1442 {
1443 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1444 	struct drbd_device *device = req->device;
1445 
1446 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1447 		drbd_al_begin_io(device, &req->i);
1448 
1449 	drbd_req_make_private_bio(req, req->master_bio);
1450 	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1451 	generic_make_request(req->private_bio);
1452 
1453 	return 0;
1454 }
1455 
1456 static int _drbd_may_sync_now(struct drbd_device *device)
1457 {
1458 	struct drbd_device *odev = device;
1459 	int resync_after;
1460 
1461 	while (1) {
1462 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1463 			return 1;
1464 		rcu_read_lock();
1465 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1466 		rcu_read_unlock();
1467 		if (resync_after == -1)
1468 			return 1;
1469 		odev = minor_to_device(resync_after);
1470 		if (!odev)
1471 			return 1;
1472 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1473 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1474 		    odev->state.aftr_isp || odev->state.peer_isp ||
1475 		    odev->state.user_isp)
1476 			return 0;
1477 	}
1478 }
1479 
1480 /**
1481  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1482  * @device:	DRBD device.
1483  *
1484  * Called from process context only (admin command and after_state_ch).
1485  */
1486 static int _drbd_pause_after(struct drbd_device *device)
1487 {
1488 	struct drbd_device *odev;
1489 	int i, rv = 0;
1490 
1491 	rcu_read_lock();
1492 	idr_for_each_entry(&drbd_devices, odev, i) {
1493 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1494 			continue;
1495 		if (!_drbd_may_sync_now(odev))
1496 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1497 			       != SS_NOTHING_TO_DO);
1498 	}
1499 	rcu_read_unlock();
1500 
1501 	return rv;
1502 }
1503 
1504 /**
1505  * _drbd_resume_next() - Resume resync on all devices that may resync now
1506  * @device:	DRBD device.
1507  *
1508  * Called from process context only (admin command and worker).
1509  */
1510 static int _drbd_resume_next(struct drbd_device *device)
1511 {
1512 	struct drbd_device *odev;
1513 	int i, rv = 0;
1514 
1515 	rcu_read_lock();
1516 	idr_for_each_entry(&drbd_devices, odev, i) {
1517 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1518 			continue;
1519 		if (odev->state.aftr_isp) {
1520 			if (_drbd_may_sync_now(odev))
1521 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1522 							CS_HARD, NULL)
1523 				       != SS_NOTHING_TO_DO) ;
1524 		}
1525 	}
1526 	rcu_read_unlock();
1527 	return rv;
1528 }
1529 
1530 void resume_next_sg(struct drbd_device *device)
1531 {
1532 	write_lock_irq(&global_state_lock);
1533 	_drbd_resume_next(device);
1534 	write_unlock_irq(&global_state_lock);
1535 }
1536 
1537 void suspend_other_sg(struct drbd_device *device)
1538 {
1539 	write_lock_irq(&global_state_lock);
1540 	_drbd_pause_after(device);
1541 	write_unlock_irq(&global_state_lock);
1542 }
1543 
1544 /* caller must hold global_state_lock */
1545 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1546 {
1547 	struct drbd_device *odev;
1548 	int resync_after;
1549 
1550 	if (o_minor == -1)
1551 		return NO_ERROR;
1552 	if (o_minor < -1 || o_minor > MINORMASK)
1553 		return ERR_RESYNC_AFTER;
1554 
1555 	/* check for loops */
1556 	odev = minor_to_device(o_minor);
1557 	while (1) {
1558 		if (odev == device)
1559 			return ERR_RESYNC_AFTER_CYCLE;
1560 
1561 		/* You are free to depend on diskless, non-existing,
1562 		 * or not yet/no longer existing minors.
1563 		 * We only reject dependency loops.
1564 		 * We cannot follow the dependency chain beyond a detached or
1565 		 * missing minor.
1566 		 */
1567 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1568 			return NO_ERROR;
1569 
1570 		rcu_read_lock();
1571 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1572 		rcu_read_unlock();
1573 		/* dependency chain ends here, no cycles. */
1574 		if (resync_after == -1)
1575 			return NO_ERROR;
1576 
1577 		/* follow the dependency chain */
1578 		odev = minor_to_device(resync_after);
1579 	}
1580 }
1581 
1582 /* caller must hold global_state_lock */
1583 void drbd_resync_after_changed(struct drbd_device *device)
1584 {
1585 	int changes;
1586 
1587 	do {
1588 		changes  = _drbd_pause_after(device);
1589 		changes |= _drbd_resume_next(device);
1590 	} while (changes);
1591 }
1592 
1593 void drbd_rs_controller_reset(struct drbd_device *device)
1594 {
1595 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1596 	struct fifo_buffer *plan;
1597 
1598 	atomic_set(&device->rs_sect_in, 0);
1599 	atomic_set(&device->rs_sect_ev, 0);
1600 	device->rs_in_flight = 0;
1601 	device->rs_last_events =
1602 		(int)part_stat_read(&disk->part0, sectors[0]) +
1603 		(int)part_stat_read(&disk->part0, sectors[1]);
1604 
1605 	/* Updating the RCU protected object in place is necessary since
1606 	   this function gets called from atomic context.
1607 	   It is valid since all other updates also lead to an completely
1608 	   empty fifo */
1609 	rcu_read_lock();
1610 	plan = rcu_dereference(device->rs_plan_s);
1611 	plan->total = 0;
1612 	fifo_set(plan, 0);
1613 	rcu_read_unlock();
1614 }
1615 
1616 void start_resync_timer_fn(unsigned long data)
1617 {
1618 	struct drbd_device *device = (struct drbd_device *) data;
1619 	drbd_device_post_work(device, RS_START);
1620 }
1621 
1622 static void do_start_resync(struct drbd_device *device)
1623 {
1624 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1625 		drbd_warn(device, "postponing start_resync ...\n");
1626 		device->start_resync_timer.expires = jiffies + HZ/10;
1627 		add_timer(&device->start_resync_timer);
1628 		return;
1629 	}
1630 
1631 	drbd_start_resync(device, C_SYNC_SOURCE);
1632 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1633 }
1634 
1635 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1636 {
1637 	bool csums_after_crash_only;
1638 	rcu_read_lock();
1639 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1640 	rcu_read_unlock();
1641 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1642 		connection->csums_tfm &&			/* configured? */
1643 		(csums_after_crash_only == 0			/* use for each resync? */
1644 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1645 }
1646 
1647 /**
1648  * drbd_start_resync() - Start the resync process
1649  * @device:	DRBD device.
1650  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1651  *
1652  * This function might bring you directly into one of the
1653  * C_PAUSED_SYNC_* states.
1654  */
1655 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1656 {
1657 	struct drbd_peer_device *peer_device = first_peer_device(device);
1658 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1659 	union drbd_state ns;
1660 	int r;
1661 
1662 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1663 		drbd_err(device, "Resync already running!\n");
1664 		return;
1665 	}
1666 
1667 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1668 		if (side == C_SYNC_TARGET) {
1669 			/* Since application IO was locked out during C_WF_BITMAP_T and
1670 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1671 			   we check that we might make the data inconsistent. */
1672 			r = drbd_khelper(device, "before-resync-target");
1673 			r = (r >> 8) & 0xff;
1674 			if (r > 0) {
1675 				drbd_info(device, "before-resync-target handler returned %d, "
1676 					 "dropping connection.\n", r);
1677 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1678 				return;
1679 			}
1680 		} else /* C_SYNC_SOURCE */ {
1681 			r = drbd_khelper(device, "before-resync-source");
1682 			r = (r >> 8) & 0xff;
1683 			if (r > 0) {
1684 				if (r == 3) {
1685 					drbd_info(device, "before-resync-source handler returned %d, "
1686 						 "ignoring. Old userland tools?", r);
1687 				} else {
1688 					drbd_info(device, "before-resync-source handler returned %d, "
1689 						 "dropping connection.\n", r);
1690 					conn_request_state(connection,
1691 							   NS(conn, C_DISCONNECTING), CS_HARD);
1692 					return;
1693 				}
1694 			}
1695 		}
1696 	}
1697 
1698 	if (current == connection->worker.task) {
1699 		/* The worker should not sleep waiting for state_mutex,
1700 		   that can take long */
1701 		if (!mutex_trylock(device->state_mutex)) {
1702 			set_bit(B_RS_H_DONE, &device->flags);
1703 			device->start_resync_timer.expires = jiffies + HZ/5;
1704 			add_timer(&device->start_resync_timer);
1705 			return;
1706 		}
1707 	} else {
1708 		mutex_lock(device->state_mutex);
1709 	}
1710 	clear_bit(B_RS_H_DONE, &device->flags);
1711 
1712 	/* req_lock: serialize with drbd_send_and_submit() and others
1713 	 * global_state_lock: for stable sync-after dependencies */
1714 	spin_lock_irq(&device->resource->req_lock);
1715 	write_lock(&global_state_lock);
1716 	/* Did some connection breakage or IO error race with us? */
1717 	if (device->state.conn < C_CONNECTED
1718 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1719 		write_unlock(&global_state_lock);
1720 		spin_unlock_irq(&device->resource->req_lock);
1721 		mutex_unlock(device->state_mutex);
1722 		return;
1723 	}
1724 
1725 	ns = drbd_read_state(device);
1726 
1727 	ns.aftr_isp = !_drbd_may_sync_now(device);
1728 
1729 	ns.conn = side;
1730 
1731 	if (side == C_SYNC_TARGET)
1732 		ns.disk = D_INCONSISTENT;
1733 	else /* side == C_SYNC_SOURCE */
1734 		ns.pdsk = D_INCONSISTENT;
1735 
1736 	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1737 	ns = drbd_read_state(device);
1738 
1739 	if (ns.conn < C_CONNECTED)
1740 		r = SS_UNKNOWN_ERROR;
1741 
1742 	if (r == SS_SUCCESS) {
1743 		unsigned long tw = drbd_bm_total_weight(device);
1744 		unsigned long now = jiffies;
1745 		int i;
1746 
1747 		device->rs_failed    = 0;
1748 		device->rs_paused    = 0;
1749 		device->rs_same_csum = 0;
1750 		device->rs_last_sect_ev = 0;
1751 		device->rs_total     = tw;
1752 		device->rs_start     = now;
1753 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1754 			device->rs_mark_left[i] = tw;
1755 			device->rs_mark_time[i] = now;
1756 		}
1757 		_drbd_pause_after(device);
1758 		/* Forget potentially stale cached per resync extent bit-counts.
1759 		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1760 		 * disabled, and know the disk state is ok. */
1761 		spin_lock(&device->al_lock);
1762 		lc_reset(device->resync);
1763 		device->resync_locked = 0;
1764 		device->resync_wenr = LC_FREE;
1765 		spin_unlock(&device->al_lock);
1766 	}
1767 	write_unlock(&global_state_lock);
1768 	spin_unlock_irq(&device->resource->req_lock);
1769 
1770 	if (r == SS_SUCCESS) {
1771 		wake_up(&device->al_wait); /* for lc_reset() above */
1772 		/* reset rs_last_bcast when a resync or verify is started,
1773 		 * to deal with potential jiffies wrap. */
1774 		device->rs_last_bcast = jiffies - HZ;
1775 
1776 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1777 		     drbd_conn_str(ns.conn),
1778 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1779 		     (unsigned long) device->rs_total);
1780 		if (side == C_SYNC_TARGET) {
1781 			device->bm_resync_fo = 0;
1782 			device->use_csums = use_checksum_based_resync(connection, device);
1783 		} else {
1784 			device->use_csums = 0;
1785 		}
1786 
1787 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1788 		 * with w_send_oos, or the sync target will get confused as to
1789 		 * how much bits to resync.  We cannot do that always, because for an
1790 		 * empty resync and protocol < 95, we need to do it here, as we call
1791 		 * drbd_resync_finished from here in that case.
1792 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1793 		 * and from after_state_ch otherwise. */
1794 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1795 			drbd_gen_and_send_sync_uuid(peer_device);
1796 
1797 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1798 			/* This still has a race (about when exactly the peers
1799 			 * detect connection loss) that can lead to a full sync
1800 			 * on next handshake. In 8.3.9 we fixed this with explicit
1801 			 * resync-finished notifications, but the fix
1802 			 * introduces a protocol change.  Sleeping for some
1803 			 * time longer than the ping interval + timeout on the
1804 			 * SyncSource, to give the SyncTarget the chance to
1805 			 * detect connection loss, then waiting for a ping
1806 			 * response (implicit in drbd_resync_finished) reduces
1807 			 * the race considerably, but does not solve it. */
1808 			if (side == C_SYNC_SOURCE) {
1809 				struct net_conf *nc;
1810 				int timeo;
1811 
1812 				rcu_read_lock();
1813 				nc = rcu_dereference(connection->net_conf);
1814 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1815 				rcu_read_unlock();
1816 				schedule_timeout_interruptible(timeo);
1817 			}
1818 			drbd_resync_finished(device);
1819 		}
1820 
1821 		drbd_rs_controller_reset(device);
1822 		/* ns.conn may already be != device->state.conn,
1823 		 * we may have been paused in between, or become paused until
1824 		 * the timer triggers.
1825 		 * No matter, that is handled in resync_timer_fn() */
1826 		if (ns.conn == C_SYNC_TARGET)
1827 			mod_timer(&device->resync_timer, jiffies);
1828 
1829 		drbd_md_sync(device);
1830 	}
1831 	put_ldev(device);
1832 	mutex_unlock(device->state_mutex);
1833 }
1834 
1835 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1836 {
1837 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1838 	device->rs_last_bcast = jiffies;
1839 
1840 	if (!get_ldev(device))
1841 		return;
1842 
1843 	drbd_bm_write_lazy(device, 0);
1844 	if (resync_done && is_sync_state(device->state.conn))
1845 		drbd_resync_finished(device);
1846 
1847 	drbd_bcast_event(device, &sib);
1848 	/* update timestamp, in case it took a while to write out stuff */
1849 	device->rs_last_bcast = jiffies;
1850 	put_ldev(device);
1851 }
1852 
1853 static void drbd_ldev_destroy(struct drbd_device *device)
1854 {
1855 	lc_destroy(device->resync);
1856 	device->resync = NULL;
1857 	lc_destroy(device->act_log);
1858 	device->act_log = NULL;
1859 
1860 	__acquire(local);
1861 	drbd_free_ldev(device->ldev);
1862 	device->ldev = NULL;
1863 	__release(local);
1864 
1865 	clear_bit(GOING_DISKLESS, &device->flags);
1866 	wake_up(&device->misc_wait);
1867 }
1868 
1869 static void go_diskless(struct drbd_device *device)
1870 {
1871 	D_ASSERT(device, device->state.disk == D_FAILED);
1872 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1873 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1874 	 * the protected members anymore, though, so once put_ldev reaches zero
1875 	 * again, it will be safe to free them. */
1876 
1877 	/* Try to write changed bitmap pages, read errors may have just
1878 	 * set some bits outside the area covered by the activity log.
1879 	 *
1880 	 * If we have an IO error during the bitmap writeout,
1881 	 * we will want a full sync next time, just in case.
1882 	 * (Do we want a specific meta data flag for this?)
1883 	 *
1884 	 * If that does not make it to stable storage either,
1885 	 * we cannot do anything about that anymore.
1886 	 *
1887 	 * We still need to check if both bitmap and ldev are present, we may
1888 	 * end up here after a failed attach, before ldev was even assigned.
1889 	 */
1890 	if (device->bitmap && device->ldev) {
1891 		/* An interrupted resync or similar is allowed to recounts bits
1892 		 * while we detach.
1893 		 * Any modifications would not be expected anymore, though.
1894 		 */
1895 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1896 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1897 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1898 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1899 				drbd_md_sync(device);
1900 			}
1901 		}
1902 	}
1903 
1904 	drbd_force_state(device, NS(disk, D_DISKLESS));
1905 }
1906 
1907 static int do_md_sync(struct drbd_device *device)
1908 {
1909 	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1910 	drbd_md_sync(device);
1911 	return 0;
1912 }
1913 
1914 /* only called from drbd_worker thread, no locking */
1915 void __update_timing_details(
1916 		struct drbd_thread_timing_details *tdp,
1917 		unsigned int *cb_nr,
1918 		void *cb,
1919 		const char *fn, const unsigned int line)
1920 {
1921 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1922 	struct drbd_thread_timing_details *td = tdp + i;
1923 
1924 	td->start_jif = jiffies;
1925 	td->cb_addr = cb;
1926 	td->caller_fn = fn;
1927 	td->line = line;
1928 	td->cb_nr = *cb_nr;
1929 
1930 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1931 	td = tdp + i;
1932 	memset(td, 0, sizeof(*td));
1933 
1934 	++(*cb_nr);
1935 }
1936 
1937 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1938 {
1939 	if (test_bit(MD_SYNC, &todo))
1940 		do_md_sync(device);
1941 	if (test_bit(RS_DONE, &todo) ||
1942 	    test_bit(RS_PROGRESS, &todo))
1943 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1944 	if (test_bit(GO_DISKLESS, &todo))
1945 		go_diskless(device);
1946 	if (test_bit(DESTROY_DISK, &todo))
1947 		drbd_ldev_destroy(device);
1948 	if (test_bit(RS_START, &todo))
1949 		do_start_resync(device);
1950 }
1951 
1952 #define DRBD_DEVICE_WORK_MASK	\
1953 	((1UL << GO_DISKLESS)	\
1954 	|(1UL << DESTROY_DISK)	\
1955 	|(1UL << MD_SYNC)	\
1956 	|(1UL << RS_START)	\
1957 	|(1UL << RS_PROGRESS)	\
1958 	|(1UL << RS_DONE)	\
1959 	)
1960 
1961 static unsigned long get_work_bits(unsigned long *flags)
1962 {
1963 	unsigned long old, new;
1964 	do {
1965 		old = *flags;
1966 		new = old & ~DRBD_DEVICE_WORK_MASK;
1967 	} while (cmpxchg(flags, old, new) != old);
1968 	return old & DRBD_DEVICE_WORK_MASK;
1969 }
1970 
1971 static void do_unqueued_work(struct drbd_connection *connection)
1972 {
1973 	struct drbd_peer_device *peer_device;
1974 	int vnr;
1975 
1976 	rcu_read_lock();
1977 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1978 		struct drbd_device *device = peer_device->device;
1979 		unsigned long todo = get_work_bits(&device->flags);
1980 		if (!todo)
1981 			continue;
1982 
1983 		kref_get(&device->kref);
1984 		rcu_read_unlock();
1985 		do_device_work(device, todo);
1986 		kref_put(&device->kref, drbd_destroy_device);
1987 		rcu_read_lock();
1988 	}
1989 	rcu_read_unlock();
1990 }
1991 
1992 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1993 {
1994 	spin_lock_irq(&queue->q_lock);
1995 	list_splice_tail_init(&queue->q, work_list);
1996 	spin_unlock_irq(&queue->q_lock);
1997 	return !list_empty(work_list);
1998 }
1999 
2000 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2001 {
2002 	DEFINE_WAIT(wait);
2003 	struct net_conf *nc;
2004 	int uncork, cork;
2005 
2006 	dequeue_work_batch(&connection->sender_work, work_list);
2007 	if (!list_empty(work_list))
2008 		return;
2009 
2010 	/* Still nothing to do?
2011 	 * Maybe we still need to close the current epoch,
2012 	 * even if no new requests are queued yet.
2013 	 *
2014 	 * Also, poke TCP, just in case.
2015 	 * Then wait for new work (or signal). */
2016 	rcu_read_lock();
2017 	nc = rcu_dereference(connection->net_conf);
2018 	uncork = nc ? nc->tcp_cork : 0;
2019 	rcu_read_unlock();
2020 	if (uncork) {
2021 		mutex_lock(&connection->data.mutex);
2022 		if (connection->data.socket)
2023 			drbd_tcp_uncork(connection->data.socket);
2024 		mutex_unlock(&connection->data.mutex);
2025 	}
2026 
2027 	for (;;) {
2028 		int send_barrier;
2029 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2030 		spin_lock_irq(&connection->resource->req_lock);
2031 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2032 		if (!list_empty(&connection->sender_work.q))
2033 			list_splice_tail_init(&connection->sender_work.q, work_list);
2034 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2035 		if (!list_empty(work_list) || signal_pending(current)) {
2036 			spin_unlock_irq(&connection->resource->req_lock);
2037 			break;
2038 		}
2039 
2040 		/* We found nothing new to do, no to-be-communicated request,
2041 		 * no other work item.  We may still need to close the last
2042 		 * epoch.  Next incoming request epoch will be connection ->
2043 		 * current transfer log epoch number.  If that is different
2044 		 * from the epoch of the last request we communicated, it is
2045 		 * safe to send the epoch separating barrier now.
2046 		 */
2047 		send_barrier =
2048 			atomic_read(&connection->current_tle_nr) !=
2049 			connection->send.current_epoch_nr;
2050 		spin_unlock_irq(&connection->resource->req_lock);
2051 
2052 		if (send_barrier)
2053 			maybe_send_barrier(connection,
2054 					connection->send.current_epoch_nr + 1);
2055 
2056 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2057 			break;
2058 
2059 		/* drbd_send() may have called flush_signals() */
2060 		if (get_t_state(&connection->worker) != RUNNING)
2061 			break;
2062 
2063 		schedule();
2064 		/* may be woken up for other things but new work, too,
2065 		 * e.g. if the current epoch got closed.
2066 		 * In which case we send the barrier above. */
2067 	}
2068 	finish_wait(&connection->sender_work.q_wait, &wait);
2069 
2070 	/* someone may have changed the config while we have been waiting above. */
2071 	rcu_read_lock();
2072 	nc = rcu_dereference(connection->net_conf);
2073 	cork = nc ? nc->tcp_cork : 0;
2074 	rcu_read_unlock();
2075 	mutex_lock(&connection->data.mutex);
2076 	if (connection->data.socket) {
2077 		if (cork)
2078 			drbd_tcp_cork(connection->data.socket);
2079 		else if (!uncork)
2080 			drbd_tcp_uncork(connection->data.socket);
2081 	}
2082 	mutex_unlock(&connection->data.mutex);
2083 }
2084 
2085 int drbd_worker(struct drbd_thread *thi)
2086 {
2087 	struct drbd_connection *connection = thi->connection;
2088 	struct drbd_work *w = NULL;
2089 	struct drbd_peer_device *peer_device;
2090 	LIST_HEAD(work_list);
2091 	int vnr;
2092 
2093 	while (get_t_state(thi) == RUNNING) {
2094 		drbd_thread_current_set_cpu(thi);
2095 
2096 		if (list_empty(&work_list)) {
2097 			update_worker_timing_details(connection, wait_for_work);
2098 			wait_for_work(connection, &work_list);
2099 		}
2100 
2101 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2102 			update_worker_timing_details(connection, do_unqueued_work);
2103 			do_unqueued_work(connection);
2104 		}
2105 
2106 		if (signal_pending(current)) {
2107 			flush_signals(current);
2108 			if (get_t_state(thi) == RUNNING) {
2109 				drbd_warn(connection, "Worker got an unexpected signal\n");
2110 				continue;
2111 			}
2112 			break;
2113 		}
2114 
2115 		if (get_t_state(thi) != RUNNING)
2116 			break;
2117 
2118 		if (!list_empty(&work_list)) {
2119 			w = list_first_entry(&work_list, struct drbd_work, list);
2120 			list_del_init(&w->list);
2121 			update_worker_timing_details(connection, w->cb);
2122 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2123 				continue;
2124 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2125 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2126 		}
2127 	}
2128 
2129 	do {
2130 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2131 			update_worker_timing_details(connection, do_unqueued_work);
2132 			do_unqueued_work(connection);
2133 		}
2134 		if (!list_empty(&work_list)) {
2135 			w = list_first_entry(&work_list, struct drbd_work, list);
2136 			list_del_init(&w->list);
2137 			update_worker_timing_details(connection, w->cb);
2138 			w->cb(w, 1);
2139 		} else
2140 			dequeue_work_batch(&connection->sender_work, &work_list);
2141 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2142 
2143 	rcu_read_lock();
2144 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2145 		struct drbd_device *device = peer_device->device;
2146 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2147 		kref_get(&device->kref);
2148 		rcu_read_unlock();
2149 		drbd_device_cleanup(device);
2150 		kref_put(&device->kref, drbd_destroy_device);
2151 		rcu_read_lock();
2152 	}
2153 	rcu_read_unlock();
2154 
2155 	return 0;
2156 }
2157