xref: /linux/drivers/block/drbd/drbd_worker.c (revision 12871a0bd67dd4db4418e1daafcd46e9d329ef10)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43 				 struct drbd_work *w, int cancel);
44 
45 
46 
47 /* endio handlers:
48  *   drbd_md_io_complete (defined here)
49  *   drbd_endio_pri (defined here)
50  *   drbd_endio_sec (defined here)
51  *   bm_async_io_complete (defined in drbd_bitmap.c)
52  *
53  * For all these callbacks, note the following:
54  * The callbacks will be called in irq context by the IDE drivers,
55  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56  * Try to get the locking right :)
57  *
58  */
59 
60 
61 /* About the global_state_lock
62    Each state transition on a device holds a read lock. In case we have
63    to evaluate the sync after dependencies, we grab a write lock, because
64    we need stable states on all devices for that.  */
65 rwlock_t global_state_lock;
66 
67 /* used for synchronous meta data and bitmap IO
68  * submitted by drbd_md_sync_page_io()
69  */
70 void drbd_md_io_complete(struct bio *bio, int error)
71 {
72 	struct drbd_md_io *md_io;
73 
74 	md_io = (struct drbd_md_io *)bio->bi_private;
75 	md_io->error = error;
76 
77 	complete(&md_io->event);
78 }
79 
80 /* reads on behalf of the partner,
81  * "submitted" by the receiver
82  */
83 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84 {
85 	unsigned long flags = 0;
86 	struct drbd_conf *mdev = e->mdev;
87 
88 	D_ASSERT(e->block_id != ID_VACANT);
89 
90 	spin_lock_irqsave(&mdev->req_lock, flags);
91 	mdev->read_cnt += e->size >> 9;
92 	list_del(&e->w.list);
93 	if (list_empty(&mdev->read_ee))
94 		wake_up(&mdev->ee_wait);
95 	if (test_bit(__EE_WAS_ERROR, &e->flags))
96 		__drbd_chk_io_error(mdev, false);
97 	spin_unlock_irqrestore(&mdev->req_lock, flags);
98 
99 	drbd_queue_work(&mdev->data.work, &e->w);
100 	put_ldev(mdev);
101 }
102 
103 /* writes on behalf of the partner, or resync writes,
104  * "submitted" by the receiver, final stage.  */
105 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
106 {
107 	unsigned long flags = 0;
108 	struct drbd_conf *mdev = e->mdev;
109 	sector_t e_sector;
110 	int do_wake;
111 	int is_syncer_req;
112 	int do_al_complete_io;
113 
114 	D_ASSERT(e->block_id != ID_VACANT);
115 
116 	/* after we moved e to done_ee,
117 	 * we may no longer access it,
118 	 * it may be freed/reused already!
119 	 * (as soon as we release the req_lock) */
120 	e_sector = e->sector;
121 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
122 	is_syncer_req = is_syncer_block_id(e->block_id);
123 
124 	spin_lock_irqsave(&mdev->req_lock, flags);
125 	mdev->writ_cnt += e->size >> 9;
126 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
127 	list_add_tail(&e->w.list, &mdev->done_ee);
128 
129 	/* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
130 	 * neither did we wake possibly waiting conflicting requests.
131 	 * done from "drbd_process_done_ee" within the appropriate w.cb
132 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
133 
134 	do_wake = is_syncer_req
135 		? list_empty(&mdev->sync_ee)
136 		: list_empty(&mdev->active_ee);
137 
138 	if (test_bit(__EE_WAS_ERROR, &e->flags))
139 		__drbd_chk_io_error(mdev, false);
140 	spin_unlock_irqrestore(&mdev->req_lock, flags);
141 
142 	if (is_syncer_req)
143 		drbd_rs_complete_io(mdev, e_sector);
144 
145 	if (do_wake)
146 		wake_up(&mdev->ee_wait);
147 
148 	if (do_al_complete_io)
149 		drbd_al_complete_io(mdev, e_sector);
150 
151 	wake_asender(mdev);
152 	put_ldev(mdev);
153 }
154 
155 /* writes on behalf of the partner, or resync writes,
156  * "submitted" by the receiver.
157  */
158 void drbd_endio_sec(struct bio *bio, int error)
159 {
160 	struct drbd_epoch_entry *e = bio->bi_private;
161 	struct drbd_conf *mdev = e->mdev;
162 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
163 	int is_write = bio_data_dir(bio) == WRITE;
164 
165 	if (error && __ratelimit(&drbd_ratelimit_state))
166 		dev_warn(DEV, "%s: error=%d s=%llus\n",
167 				is_write ? "write" : "read", error,
168 				(unsigned long long)e->sector);
169 	if (!error && !uptodate) {
170 		if (__ratelimit(&drbd_ratelimit_state))
171 			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172 					is_write ? "write" : "read",
173 					(unsigned long long)e->sector);
174 		/* strange behavior of some lower level drivers...
175 		 * fail the request by clearing the uptodate flag,
176 		 * but do not return any error?! */
177 		error = -EIO;
178 	}
179 
180 	if (error)
181 		set_bit(__EE_WAS_ERROR, &e->flags);
182 
183 	bio_put(bio); /* no need for the bio anymore */
184 	if (atomic_dec_and_test(&e->pending_bios)) {
185 		if (is_write)
186 			drbd_endio_write_sec_final(e);
187 		else
188 			drbd_endio_read_sec_final(e);
189 	}
190 }
191 
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196 	unsigned long flags;
197 	struct drbd_request *req = bio->bi_private;
198 	struct drbd_conf *mdev = req->mdev;
199 	struct bio_and_error m;
200 	enum drbd_req_event what;
201 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
202 
203 	if (!error && !uptodate) {
204 		dev_warn(DEV, "p %s: setting error to -EIO\n",
205 			 bio_data_dir(bio) == WRITE ? "write" : "read");
206 		/* strange behavior of some lower level drivers...
207 		 * fail the request by clearing the uptodate flag,
208 		 * but do not return any error?! */
209 		error = -EIO;
210 	}
211 
212 	/* to avoid recursion in __req_mod */
213 	if (unlikely(error)) {
214 		what = (bio_data_dir(bio) == WRITE)
215 			? write_completed_with_error
216 			: (bio_rw(bio) == READ)
217 			  ? read_completed_with_error
218 			  : read_ahead_completed_with_error;
219 	} else
220 		what = completed_ok;
221 
222 	bio_put(req->private_bio);
223 	req->private_bio = ERR_PTR(error);
224 
225 	/* not req_mod(), we need irqsave here! */
226 	spin_lock_irqsave(&mdev->req_lock, flags);
227 	__req_mod(req, what, &m);
228 	spin_unlock_irqrestore(&mdev->req_lock, flags);
229 
230 	if (m.bio)
231 		complete_master_bio(mdev, &m);
232 }
233 
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236 	struct drbd_request *req = container_of(w, struct drbd_request, w);
237 
238 	/* We should not detach for read io-error,
239 	 * but try to WRITE the P_DATA_REPLY to the failed location,
240 	 * to give the disk the chance to relocate that block */
241 
242 	spin_lock_irq(&mdev->req_lock);
243 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244 		_req_mod(req, read_retry_remote_canceled);
245 		spin_unlock_irq(&mdev->req_lock);
246 		return 1;
247 	}
248 	spin_unlock_irq(&mdev->req_lock);
249 
250 	return w_send_read_req(mdev, w, 0);
251 }
252 
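/* Hash all pages of an epoch entry with the given transform (csums-alg or
 * verify-alg), writing the result into @digest.  All but the last page are
 * fully used; the last page may be only partially covered, depending on
 * e->size. */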
253 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
254 {
255 	struct hash_desc desc;
256 	struct scatterlist sg;
257 	struct page *page = e->pages;
258 	struct page *tmp;
259 	unsigned len;
260 
261 	desc.tfm = tfm;
262 	desc.flags = 0;
263 
264 	sg_init_table(&sg, 1);
265 	crypto_hash_init(&desc);
266 
267 	while ((tmp = page_chain_next(page))) {
268 		/* all but the last page will be fully used */
269 		sg_set_page(&sg, page, PAGE_SIZE, 0);
270 		crypto_hash_update(&desc, &sg, sg.length);
271 		page = tmp;
272 	}
273 	/* and now the last, possibly only partially used page */
274 	len = e->size & (PAGE_SIZE - 1);
275 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
276 	crypto_hash_update(&desc, &sg, sg.length);
277 	crypto_hash_final(&desc, digest);
278 }
279 
280 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
281 {
282 	struct hash_desc desc;
283 	struct scatterlist sg;
284 	struct bio_vec *bvec;
285 	int i;
286 
287 	desc.tfm = tfm;
288 	desc.flags = 0;
289 
290 	sg_init_table(&sg, 1);
291 	crypto_hash_init(&desc);
292 
293 	__bio_for_each_segment(bvec, bio, i, 0) {
294 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
295 		crypto_hash_update(&desc, &sg, sg.length);
296 	}
297 	crypto_hash_final(&desc, digest);
298 }
299 
300 /* TODO merge common code with w_e_end_ov_req */
301 int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
302 {
303 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
304 	int digest_size;
305 	void *digest;
306 	int ok = 1;
307 
308 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
309 
310 	if (unlikely(cancel))
311 		goto out;
312 
313 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
314 		goto out;
315 
316 	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
317 	digest = kmalloc(digest_size, GFP_NOIO);
318 	if (digest) {
319 		sector_t sector = e->sector;
320 		unsigned int size = e->size;
321 		drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
322 		/* Free e and pages before send.
323 		 * In case we block on congestion, we could otherwise run into
324 		 * some distributed deadlock, if the other side blocks on
325 		 * congestion as well, because our receiver blocks in
326 		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
327 		drbd_free_ee(mdev, e);
328 		e = NULL;
329 		inc_rs_pending(mdev);
330 		ok = drbd_send_drequest_csum(mdev, sector, size,
331 					     digest, digest_size,
332 					     P_CSUM_RS_REQUEST);
333 		kfree(digest);
334 	} else {
335 		dev_err(DEV, "kmalloc() of digest failed.\n");
336 		ok = 0;
337 	}
338 
339 out:
340 	if (e)
341 		drbd_free_ee(mdev, e);
342 
343 	if (unlikely(!ok))
344 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
345 	return ok;
346 }
347 
348 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
349 
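/* Submit a local read of @size bytes at @sector, so that w_e_send_csum can
 * later hash the data and send the checksum to the peer (checksum based
 * resync).  Returns 0 on success, -EIO if we have no usable local disk, and
 * -EAGAIN if this should be retried later (throttled, allocation failure,
 * or failed submit). */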
350 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
351 {
352 	struct drbd_epoch_entry *e;
353 
354 	if (!get_ldev(mdev))
355 		return -EIO;
356 
357 	if (drbd_rs_should_slow_down(mdev, sector))
358 		goto defer;
359 
360 	/* GFP_TRY, because if there is no memory available right now, this may
361 	 * be rescheduled for later. It is "only" background resync, after all. */
362 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
363 	if (!e)
364 		goto defer;
365 
366 	e->w.cb = w_e_send_csum;
367 	spin_lock_irq(&mdev->req_lock);
368 	list_add(&e->w.list, &mdev->read_ee);
369 	spin_unlock_irq(&mdev->req_lock);
370 
371 	atomic_add(size >> 9, &mdev->rs_sect_ev);
372 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
373 		return 0;
374 
375 	/* If it failed because of ENOMEM, retry should help.  If it failed
376 	 * because bio_add_page failed (probably broken lower level driver),
377 	 * retry may or may not help.
378 	 * If it does not, you may need to force disconnect. */
379 	spin_lock_irq(&mdev->req_lock);
380 	list_del(&e->w.list);
381 	spin_unlock_irq(&mdev->req_lock);
382 
383 	drbd_free_ee(mdev, e);
384 defer:
385 	put_ldev(mdev);
386 	return -EAGAIN;
387 }
388 
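/* Work callback queued by resync_timer_fn(): depending on the connection
 * state, generate either online verify requests or resync requests for the
 * next SLEEP_TIME interval. */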
389 int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
390 {
391 	switch (mdev->state.conn) {
392 	case C_VERIFY_S:
393 		w_make_ov_request(mdev, w, cancel);
394 		break;
395 	case C_SYNC_TARGET:
396 		w_make_resync_request(mdev, w, cancel);
397 		break;
398 	}
399 
400 	return 1;
401 }
402 
403 void resync_timer_fn(unsigned long data)
404 {
405 	struct drbd_conf *mdev = (struct drbd_conf *) data;
406 
407 	if (list_empty(&mdev->resync_work.list))
408 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
409 }
410 
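/* The fifo_* helpers below operate on mdev->rs_plan_s, the ring buffer the
 * resync rate controller uses to plan corrections several SLEEP_TIME steps
 * into the future.  Callers serialize access via mdev->peer_seq_lock. */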
411 static void fifo_set(struct fifo_buffer *fb, int value)
412 {
413 	int i;
414 
415 	for (i = 0; i < fb->size; i++)
416 		fb->values[i] = value;
417 }
418 
419 static int fifo_push(struct fifo_buffer *fb, int value)
420 {
421 	int ov;
422 
423 	ov = fb->values[fb->head_index];
424 	fb->values[fb->head_index++] = value;
425 
426 	if (fb->head_index >= fb->size)
427 		fb->head_index = 0;
428 
429 	return ov;
430 }
431 
432 static void fifo_add_val(struct fifo_buffer *fb, int value)
433 {
434 	int i;
435 
436 	for (i = 0; i < fb->size; i++)
437 		fb->values[i] += value;
438 }
439 
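/* Feedback controller for the dynamic resync rate.  Based on the number of
 * sectors that came back since the last invocation, decide how many sectors
 * to request during the next SLEEP_TIME interval: try to keep c_fill_target
 * sectors in flight (in the "proxy"), or, if c_fill_target is 0, to reach
 * the configured c_delay_target round trip.  The correction is spread over
 * the planning fifo so the rate adapts gradually instead of oscillating.
 *
 * Illustrative numbers only: with 2000 sectors in flight, a want of 3000
 * and an empty plan, the correction is 1000 sectors, distributed evenly
 * over all rs_plan_s entries; only the entry popped this turn is added to
 * the request size, which is finally clamped via c_max_rate. */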
440 static int drbd_rs_controller(struct drbd_conf *mdev)
441 {
442 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
443 	unsigned int want;     /* The number of sectors we want in the proxy */
444 	int req_sect; /* Number of sectors to request in this turn */
445 	int correction; /* Number of sectors more we need in the proxy*/
446 	int cps; /* correction per invocation of drbd_rs_controller() */
447 	int steps; /* Number of time steps to plan ahead */
448 	int curr_corr;
449 	int max_sect;
450 
451 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
452 	mdev->rs_in_flight -= sect_in;
453 
454 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
455 
456 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
457 
458 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
459 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
460 	} else { /* normal path */
461 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
462 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
463 	}
464 
465 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
466 
467 	/* Plan ahead */
468 	cps = correction / steps;
469 	fifo_add_val(&mdev->rs_plan_s, cps);
470 	mdev->rs_planed += cps * steps;
471 
472 	/* What we do in this step */
473 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
474 	spin_unlock(&mdev->peer_seq_lock);
475 	mdev->rs_planed -= curr_corr;
476 
477 	req_sect = sect_in + curr_corr;
478 	if (req_sect < 0)
479 		req_sect = 0;
480 
481 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
482 	if (req_sect > max_sect)
483 		req_sect = max_sect;
484 
485 	/*
486 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
487 		 sect_in, mdev->rs_in_flight, want, correction,
488 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
489 	*/
490 
491 	return req_sect;
492 }
493 
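/* Translate the controller output (or the static sync rate, if no plan is
 * configured) into the number of BM_BLOCK_SIZE sized requests to issue
 * during the next SLEEP_TIME interval, and remember the effective rate in
 * mdev->c_sync_rate. */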
494 static int drbd_rs_number_requests(struct drbd_conf *mdev)
495 {
496 	int number;
497 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
498 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
499 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
500 	} else {
501 		mdev->c_sync_rate = mdev->sync_conf.rate;
502 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
503 	}
504 
505 	/* ignore the amount of pending requests, the resync controller should
506 	 * throttle down to incoming reply rate soon enough anyways. */
507 	return number;
508 }
509 
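/* Work callback of the resync target: walk the out-of-sync bits of the
 * bitmap starting at bm_resync_fo and turn them into requests to the peer,
 * either P_RS_DATA_REQUEST or, with a csums-alg configured, local reads
 * followed by P_CSUM_RS_REQUEST.  The number of requests per invocation is
 * paced by drbd_rs_number_requests(); adjacent dirty bits are merged into
 * larger, aligned requests up to max_bio_size. */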
510 static int w_make_resync_request(struct drbd_conf *mdev,
511 				 struct drbd_work *w, int cancel)
512 {
513 	unsigned long bit;
514 	sector_t sector;
515 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
516 	int max_bio_size;
517 	int number, rollback_i, size;
518 	int align, queued, sndbuf;
519 	int i = 0;
520 
521 	if (unlikely(cancel))
522 		return 1;
523 
524 	if (mdev->rs_total == 0) {
525 		/* empty resync? */
526 		drbd_resync_finished(mdev);
527 		return 1;
528 	}
529 
530 	if (!get_ldev(mdev)) {
531 		/* Since we only need to access mdev->resync here, a
532 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
533 		   continuing the resync with a broken disk makes no sense at
534 		   all */
535 		dev_err(DEV, "Disk broke down during resync!\n");
536 		return 1;
537 	}
538 
539 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
540 	 * if it should be necessary */
541 	max_bio_size =
542 		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
543 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
544 
545 	number = drbd_rs_number_requests(mdev);
546 	if (number == 0)
547 		goto requeue;
548 
549 	for (i = 0; i < number; i++) {
550 		/* Stop generating RS requests when half of the send buffer is filled */
551 		mutex_lock(&mdev->data.mutex);
552 		if (mdev->data.socket) {
553 			queued = mdev->data.socket->sk->sk_wmem_queued;
554 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
555 		} else {
556 			queued = 1;
557 			sndbuf = 0;
558 		}
559 		mutex_unlock(&mdev->data.mutex);
560 		if (queued > sndbuf / 2)
561 			goto requeue;
562 
563 next_sector:
564 		size = BM_BLOCK_SIZE;
565 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
566 
567 		if (bit == DRBD_END_OF_BITMAP) {
568 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
569 			put_ldev(mdev);
570 			return 1;
571 		}
572 
573 		sector = BM_BIT_TO_SECT(bit);
574 
575 		if (drbd_rs_should_slow_down(mdev, sector) ||
576 		    drbd_try_rs_begin_io(mdev, sector)) {
577 			mdev->bm_resync_fo = bit;
578 			goto requeue;
579 		}
580 		mdev->bm_resync_fo = bit + 1;
581 
582 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
583 			drbd_rs_complete_io(mdev, sector);
584 			goto next_sector;
585 		}
586 
587 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
588 		/* try to find some adjacent bits.
589 		 * we stop if we already have the maximum req size.
590 		 *
591 		 * Additionally always align bigger requests, in order to
592 		 * be prepared for all stripe sizes of software RAIDs.
593 		 */
594 		align = 1;
595 		rollback_i = i;
596 		for (;;) {
597 			if (size + BM_BLOCK_SIZE > max_bio_size)
598 				break;
599 
600 			/* Always be aligned */
601 			if (sector & ((1<<(align+3))-1))
602 				break;
603 
604 			/* do not cross extent boundaries */
605 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
606 				break;
607 			/* now, is it actually dirty, after all?
608 			 * caution, drbd_bm_test_bit is tri-state for some
609 			 * obscure reason; ( b == 0 ) would get the out-of-band
610 			 * only accidentally right because of the "oddly sized"
611 			 * adjustment below */
612 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
613 				break;
614 			bit++;
615 			size += BM_BLOCK_SIZE;
616 			if ((BM_BLOCK_SIZE << align) <= size)
617 				align++;
618 			i++;
619 		}
620 		/* if we merged some,
621 		 * reset the offset to start the next drbd_bm_find_next from */
622 		if (size > BM_BLOCK_SIZE)
623 			mdev->bm_resync_fo = bit + 1;
624 #endif
625 
626 		/* adjust very last sectors, in case we are oddly sized */
627 		if (sector + (size>>9) > capacity)
628 			size = (capacity-sector)<<9;
629 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
630 			switch (read_for_csum(mdev, sector, size)) {
631 			case -EIO: /* Disk failure */
632 				put_ldev(mdev);
633 				return 0;
634 			case -EAGAIN: /* allocation failed, or ldev busy */
635 				drbd_rs_complete_io(mdev, sector);
636 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
637 				i = rollback_i;
638 				goto requeue;
639 			case 0:
640 				/* everything ok */
641 				break;
642 			default:
643 				BUG();
644 			}
645 		} else {
646 			inc_rs_pending(mdev);
647 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
648 					       sector, size, ID_SYNCER)) {
649 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
650 				dec_rs_pending(mdev);
651 				put_ldev(mdev);
652 				return 0;
653 			}
654 		}
655 	}
656 
657 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
658 		/* last syncer _request_ was sent,
659 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
660 		 * next sync group will resume), as soon as we receive the last
661 		 * resync data block, and the last bit is cleared.
662 		 * until then resync "work" is "inactive" ...
663 		 */
664 		put_ldev(mdev);
665 		return 1;
666 	}
667 
668  requeue:
669 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
670 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
671 	put_ldev(mdev);
672 	return 1;
673 }
674 
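/* Work callback of the verify source: generate P_OV_REQUEST packets for the
 * next chunk of sectors, starting at ov_position, paced by the same rate
 * controller as the resync requests. */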
675 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
676 {
677 	int number, i, size;
678 	sector_t sector;
679 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
680 
681 	if (unlikely(cancel))
682 		return 1;
683 
684 	number = drbd_rs_number_requests(mdev);
685 
686 	sector = mdev->ov_position;
687 	for (i = 0; i < number; i++) {
688 		if (sector >= capacity) {
689 			return 1;
690 		}
691 
692 		size = BM_BLOCK_SIZE;
693 
694 		if (drbd_rs_should_slow_down(mdev, sector) ||
695 		    drbd_try_rs_begin_io(mdev, sector)) {
696 			mdev->ov_position = sector;
697 			goto requeue;
698 		}
699 
700 		if (sector + (size>>9) > capacity)
701 			size = (capacity-sector)<<9;
702 
703 		inc_rs_pending(mdev);
704 		if (!drbd_send_ov_request(mdev, sector, size)) {
705 			dec_rs_pending(mdev);
706 			return 0;
707 		}
708 		sector += BM_SECT_PER_BIT;
709 	}
710 	mdev->ov_position = sector;
711 
712  requeue:
713 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
714 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
715 	return 1;
716 }
717 
718 
719 void start_resync_timer_fn(unsigned long data)
720 {
721 	struct drbd_conf *mdev = (struct drbd_conf *) data;
722 
723 	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
724 }
725 
726 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
727 {
728 	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
729 		dev_warn(DEV, "w_start_resync later...\n");
730 		mdev->start_resync_timer.expires = jiffies + HZ/10;
731 		add_timer(&mdev->start_resync_timer);
732 		return 1;
733 	}
734 
735 	drbd_start_resync(mdev, C_SYNC_SOURCE);
736 	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
737 	return 1;
738 }
739 
740 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
741 {
742 	kfree(w);
743 	ov_oos_print(mdev);
744 	drbd_resync_finished(mdev);
745 
746 	return 1;
747 }
748 
749 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
750 {
751 	kfree(w);
752 
753 	drbd_resync_finished(mdev);
754 
755 	return 1;
756 }
757 
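/* Send a ping to the peer and wait until either the ping ack arrives or the
 * connection is lost.  Called from drbd_resync_finished() before the final
 * state transition is evaluated. */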
758 static void ping_peer(struct drbd_conf *mdev)
759 {
760 	clear_bit(GOT_PING_ACK, &mdev->flags);
761 	request_ping(mdev);
762 	wait_event(mdev->misc_wait,
763 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
764 }
765 
766 int drbd_resync_finished(struct drbd_conf *mdev)
767 {
768 	unsigned long db, dt, dbdt;
769 	unsigned long n_oos;
770 	union drbd_state os, ns;
771 	struct drbd_work *w;
772 	char *khelper_cmd = NULL;
773 	int verify_done = 0;
774 
775 	/* Remove all elements from the resync LRU. Since future actions
776 	 * might set bits in the (main) bitmap, then the entries in the
777 	 * resync LRU would be wrong. */
778 	if (drbd_rs_del_all(mdev)) {
779 		/* In case this is not possible now, most probably because
780 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
781 		 * queue (or even the read operations for those packets
782 		 * have not finished by now).  Retry in 100ms. */
783 
784 		schedule_timeout_interruptible(HZ / 10);
785 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
786 		if (w) {
787 			w->cb = w_resync_finished;
788 			drbd_queue_work(&mdev->data.work, w);
789 			return 1;
790 		}
791 		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
792 	}
793 
794 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
795 	if (dt <= 0)
796 		dt = 1;
797 	db = mdev->rs_total;
798 	dbdt = Bit2KB(db/dt);
799 	mdev->rs_paused /= HZ;
800 
801 	if (!get_ldev(mdev))
802 		goto out;
803 
804 	ping_peer(mdev);
805 
806 	spin_lock_irq(&mdev->req_lock);
807 	os = mdev->state;
808 
809 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
810 
811 	/* This protects us against multiple calls (that can happen in the presence
812 	   of application IO), and against connectivity loss just before we arrive here. */
813 	if (os.conn <= C_CONNECTED)
814 		goto out_unlock;
815 
816 	ns = os;
817 	ns.conn = C_CONNECTED;
818 
819 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
820 	     verify_done ? "Online verify " : "Resync",
821 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
822 
823 	n_oos = drbd_bm_total_weight(mdev);
824 
825 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
826 		if (n_oos) {
827 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
828 			      n_oos, Bit2KB(1));
829 			khelper_cmd = "out-of-sync";
830 		}
831 	} else {
832 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
833 
834 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
835 			khelper_cmd = "after-resync-target";
836 
837 		if (mdev->csums_tfm && mdev->rs_total) {
838 			const unsigned long s = mdev->rs_same_csum;
839 			const unsigned long t = mdev->rs_total;
840 			const int ratio =
841 				(t == 0)     ? 0 :
842 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
843 			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
844 			     "transferred %luK total %luK\n",
845 			     ratio,
846 			     Bit2KB(mdev->rs_same_csum),
847 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
848 			     Bit2KB(mdev->rs_total));
849 		}
850 	}
851 
852 	if (mdev->rs_failed) {
853 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
854 
855 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
856 			ns.disk = D_INCONSISTENT;
857 			ns.pdsk = D_UP_TO_DATE;
858 		} else {
859 			ns.disk = D_UP_TO_DATE;
860 			ns.pdsk = D_INCONSISTENT;
861 		}
862 	} else {
863 		ns.disk = D_UP_TO_DATE;
864 		ns.pdsk = D_UP_TO_DATE;
865 
866 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
867 			if (mdev->p_uuid) {
868 				int i;
869 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
870 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
871 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
872 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
873 			} else {
874 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
875 			}
876 		}
877 
878 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
879 			/* for verify runs, we don't update uuids here,
880 			 * so there would be nothing to report. */
881 			drbd_uuid_set_bm(mdev, 0UL);
882 			drbd_print_uuids(mdev, "updated UUIDs");
883 			if (mdev->p_uuid) {
884 				/* Now the two UUID sets are equal, update what we
885 				 * know of the peer. */
886 				int i;
887 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
888 					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
889 			}
890 		}
891 	}
892 
893 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
894 out_unlock:
895 	spin_unlock_irq(&mdev->req_lock);
896 	put_ldev(mdev);
897 out:
898 	mdev->rs_total  = 0;
899 	mdev->rs_failed = 0;
900 	mdev->rs_paused = 0;
901 	if (verify_done)
902 		mdev->ov_start_sector = 0;
903 
904 	drbd_md_sync(mdev);
905 
906 	if (khelper_cmd)
907 		drbd_khelper(mdev, khelper_cmd);
908 
909 	return 1;
910 }
911 
912 /* helper */
913 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
914 {
915 	if (drbd_ee_has_active_page(e)) {
916 		/* This might happen if sendpage() has not finished */
917 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
918 		atomic_add(i, &mdev->pp_in_use_by_net);
919 		atomic_sub(i, &mdev->pp_in_use);
920 		spin_lock_irq(&mdev->req_lock);
921 		list_add_tail(&e->w.list, &mdev->net_ee);
922 		spin_unlock_irq(&mdev->req_lock);
923 		wake_up(&drbd_pp_wait);
924 	} else
925 		drbd_free_ee(mdev, e);
926 }
927 
928 /**
929  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
930  * @mdev:	DRBD device.
931  * @w:		work object.
932  * @cancel:	The connection will be closed anyway
933  */
934 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
935 {
936 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
937 	int ok;
938 
939 	if (unlikely(cancel)) {
940 		drbd_free_ee(mdev, e);
941 		dec_unacked(mdev);
942 		return 1;
943 	}
944 
945 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
946 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
947 	} else {
948 		if (__ratelimit(&drbd_ratelimit_state))
949 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
950 			    (unsigned long long)e->sector);
951 
952 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
953 	}
954 
955 	dec_unacked(mdev);
956 
957 	move_to_net_ee_or_free(mdev, e);
958 
959 	if (unlikely(!ok))
960 		dev_err(DEV, "drbd_send_block() failed\n");
961 	return ok;
962 }
963 
964 /**
965  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
966  * @mdev:	DRBD device.
967  * @w:		work object.
968  * @cancel:	The connection will be closed anyway
969  */
970 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
971 {
972 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
973 	int ok;
974 
975 	if (unlikely(cancel)) {
976 		drbd_free_ee(mdev, e);
977 		dec_unacked(mdev);
978 		return 1;
979 	}
980 
981 	if (get_ldev_if_state(mdev, D_FAILED)) {
982 		drbd_rs_complete_io(mdev, e->sector);
983 		put_ldev(mdev);
984 	}
985 
986 	if (mdev->state.conn == C_AHEAD) {
987 		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
988 	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
989 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
990 			inc_rs_pending(mdev);
991 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
992 		} else {
993 			if (__ratelimit(&drbd_ratelimit_state))
994 				dev_err(DEV, "Not sending RSDataReply, "
995 				    "partner DISKLESS!\n");
996 			ok = 1;
997 		}
998 	} else {
999 		if (__ratelimit(&drbd_ratelimit_state))
1000 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1001 			    (unsigned long long)e->sector);
1002 
1003 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1004 
1005 		/* update resync data with failure */
1006 		drbd_rs_failed_io(mdev, e->sector, e->size);
1007 	}
1008 
1009 	dec_unacked(mdev);
1010 
1011 	move_to_net_ee_or_free(mdev, e);
1012 
1013 	if (unlikely(!ok))
1014 		dev_err(DEV, "drbd_send_block() failed\n");
1015 	return ok;
1016 }
1017 
1018 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1019 {
1020 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1021 	struct digest_info *di;
1022 	int digest_size;
1023 	void *digest = NULL;
1024 	int ok, eq = 0;
1025 
1026 	if (unlikely(cancel)) {
1027 		drbd_free_ee(mdev, e);
1028 		dec_unacked(mdev);
1029 		return 1;
1030 	}
1031 
1032 	if (get_ldev(mdev)) {
1033 		drbd_rs_complete_io(mdev, e->sector);
1034 		put_ldev(mdev);
1035 	}
1036 
1037 	di = e->digest;
1038 
1039 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1040 		/* quick hack to try to avoid a race against reconfiguration.
1041 		 * a real fix would be much more involved,
1042 		 * introducing more locking mechanisms */
1043 		if (mdev->csums_tfm) {
1044 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1045 			D_ASSERT(digest_size == di->digest_size);
1046 			digest = kmalloc(digest_size, GFP_NOIO);
1047 		}
1048 		if (digest) {
1049 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1050 			eq = !memcmp(digest, di->digest, digest_size);
1051 			kfree(digest);
1052 		}
1053 
1054 		if (eq) {
1055 			drbd_set_in_sync(mdev, e->sector, e->size);
1056 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1057 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1058 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1059 		} else {
1060 			inc_rs_pending(mdev);
1061 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1062 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1063 			kfree(di);
1064 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1065 		}
1066 	} else {
1067 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1068 		if (__ratelimit(&drbd_ratelimit_state))
1069 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1070 	}
1071 
1072 	dec_unacked(mdev);
1073 	move_to_net_ee_or_free(mdev, e);
1074 
1075 	if (unlikely(!ok))
1076 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1077 	return ok;
1078 }
1079 
1080 /* TODO merge common code with w_e_send_csum */
1081 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1082 {
1083 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1084 	sector_t sector = e->sector;
1085 	unsigned int size = e->size;
1086 	int digest_size;
1087 	void *digest;
1088 	int ok = 1;
1089 
1090 	if (unlikely(cancel))
1091 		goto out;
1092 
1093 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1094 	digest = kmalloc(digest_size, GFP_NOIO);
1095 	if (!digest) {
1096 		ok = 0;	/* terminate the connection in case the allocation failed */
1097 		goto out;
1098 	}
1099 
1100 	if (likely(!(e->flags & EE_WAS_ERROR)))
1101 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1102 	else
1103 		memset(digest, 0, digest_size);
1104 
1105 	/* Free e and pages before send.
1106 	 * In case we block on congestion, we could otherwise run into
1107 	 * some distributed deadlock, if the other side blocks on
1108 	 * congestion as well, because our receiver blocks in
1109 	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1110 	drbd_free_ee(mdev, e);
1111 	e = NULL;
1112 	inc_rs_pending(mdev);
1113 	ok = drbd_send_drequest_csum(mdev, sector, size,
1114 				     digest, digest_size,
1115 				     P_OV_REPLY);
1116 	if (!ok)
1117 		dec_rs_pending(mdev);
1118 	kfree(digest);
1119 
1120 out:
1121 	if (e)
1122 		drbd_free_ee(mdev, e);
1123 	dec_unacked(mdev);
1124 	return ok;
1125 }
1126 
1127 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1128 {
1129 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1130 		mdev->ov_last_oos_size += size>>9;
1131 	} else {
1132 		mdev->ov_last_oos_start = sector;
1133 		mdev->ov_last_oos_size = size>>9;
1134 	}
1135 	drbd_set_out_of_sync(mdev, sector, size);
1136 }
1137 
1138 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1139 {
1140 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1141 	struct digest_info *di;
1142 	void *digest;
1143 	sector_t sector = e->sector;
1144 	unsigned int size = e->size;
1145 	int digest_size;
1146 	int ok, eq = 0;
1147 
1148 	if (unlikely(cancel)) {
1149 		drbd_free_ee(mdev, e);
1150 		dec_unacked(mdev);
1151 		return 1;
1152 	}
1153 
1154 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1155 	 * the resync lru has been cleaned up already */
1156 	if (get_ldev(mdev)) {
1157 		drbd_rs_complete_io(mdev, e->sector);
1158 		put_ldev(mdev);
1159 	}
1160 
1161 	di = e->digest;
1162 
1163 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1164 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1165 		digest = kmalloc(digest_size, GFP_NOIO);
1166 		if (digest) {
1167 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1168 
1169 			D_ASSERT(digest_size == di->digest_size);
1170 			eq = !memcmp(digest, di->digest, digest_size);
1171 			kfree(digest);
1172 		}
1173 	}
1174 
1175 	/* Free e and pages before send.
1176 	 * In case we block on congestion, we could otherwise run into
1177 	 * some distributed deadlock, if the other side blocks on
1178 	 * congestion as well, because our receiver blocks in
1179 	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1180 	drbd_free_ee(mdev, e);
1181 	if (!eq)
1182 		drbd_ov_oos_found(mdev, sector, size);
1183 	else
1184 		ov_oos_print(mdev);
1185 
1186 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1187 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1188 
1189 	dec_unacked(mdev);
1190 
1191 	--mdev->ov_left;
1192 
1193 	/* let's advance progress step marks only for every other megabyte */
1194 	if ((mdev->ov_left & 0x200) == 0x200)
1195 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1196 
1197 	if (mdev->ov_left == 0) {
1198 		ov_oos_print(mdev);
1199 		drbd_resync_finished(mdev);
1200 	}
1201 
1202 	return ok;
1203 }
1204 
1205 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1206 {
1207 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1208 	complete(&b->done);
1209 	return 1;
1210 }
1211 
1212 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1213 {
1214 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1215 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1216 	int ok = 1;
1217 
1218 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1219 	 * just before it was reassigned and re-queued, so double check that.
1220 	 * actually, this race was harmless, since we only try to send the
1221 	 * barrier packet here, and otherwise do nothing with the object.
1222 	 * but compare with the head of w_clear_epoch */
1223 	spin_lock_irq(&mdev->req_lock);
1224 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1225 		cancel = 1;
1226 	spin_unlock_irq(&mdev->req_lock);
1227 	if (cancel)
1228 		return 1;
1229 
1230 	if (!drbd_get_data_sock(mdev))
1231 		return 0;
1232 	p->barrier = b->br_number;
1233 	/* inc_ap_pending was done where this was queued.
1234 	 * dec_ap_pending will be done in got_BarrierAck
1235 	 * or (on connection loss) in w_clear_epoch.  */
1236 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1237 				(struct p_header80 *)p, sizeof(*p), 0);
1238 	drbd_put_data_sock(mdev);
1239 
1240 	return ok;
1241 }
1242 
1243 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1244 {
1245 	if (cancel)
1246 		return 1;
1247 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1248 }
1249 
1250 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1251 {
1252 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1253 	int ok;
1254 
1255 	if (unlikely(cancel)) {
1256 		req_mod(req, send_canceled);
1257 		return 1;
1258 	}
1259 
1260 	ok = drbd_send_oos(mdev, req);
1261 	req_mod(req, oos_handed_to_network);
1262 
1263 	return ok;
1264 }
1265 
1266 /**
1267  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1268  * @mdev:	DRBD device.
1269  * @w:		work object.
1270  * @cancel:	The connection will be closed anyway
1271  */
1272 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1273 {
1274 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1275 	int ok;
1276 
1277 	if (unlikely(cancel)) {
1278 		req_mod(req, send_canceled);
1279 		return 1;
1280 	}
1281 
1282 	ok = drbd_send_dblock(mdev, req);
1283 	req_mod(req, ok ? handed_over_to_network : send_failed);
1284 
1285 	return ok;
1286 }
1287 
1288 /**
1289  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1290  * @mdev:	DRBD device.
1291  * @w:		work object.
1292  * @cancel:	The connection will be closed anyway
1293  */
1294 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1295 {
1296 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1297 	int ok;
1298 
1299 	if (unlikely(cancel)) {
1300 		req_mod(req, send_canceled);
1301 		return 1;
1302 	}
1303 
1304 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1305 				(unsigned long)req);
1306 
1307 	if (!ok) {
1308 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1309 		 * so this is probably redundant */
1310 		if (mdev->state.conn >= C_CONNECTED)
1311 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1312 	}
1313 	req_mod(req, ok ? handed_over_to_network : send_failed);
1314 
1315 	return ok;
1316 }
1317 
1318 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1319 {
1320 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1321 
1322 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1323 		drbd_al_begin_io(mdev, req->sector);
1324 	/* Calling drbd_al_begin_io() out of the worker might deadlock
1325 	   theoretically. Practically it cannot deadlock, since this is
1326 	   only used when unfreezing IOs. All the extents of the requests
1327 	   that made it into the TL are already active */
1328 
1329 	drbd_req_make_private_bio(req, req->master_bio);
1330 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1331 	generic_make_request(req->private_bio);
1332 
1333 	return 1;
1334 }
1335 
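/* Follow the sync-after dependency chain of @mdev and return 1 if this
 * device may resync now, 0 if a device it depends on is itself resyncing or
 * has one of its "sync is suspended" flags (aftr_isp/peer_isp/user_isp)
 * set.  All callers hold the global_state_lock, so the states seen here are
 * stable. */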
1336 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1337 {
1338 	struct drbd_conf *odev = mdev;
1339 
1340 	while (1) {
1341 		if (odev->sync_conf.after == -1)
1342 			return 1;
1343 		odev = minor_to_mdev(odev->sync_conf.after);
1344 		ERR_IF(!odev) return 1;
1345 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1346 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1347 		    odev->state.aftr_isp || odev->state.peer_isp ||
1348 		    odev->state.user_isp)
1349 			return 0;
1350 	}
1351 }
1352 
1353 /**
1354  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1355  * @mdev:	DRBD device.
1356  *
1357  * Called from process context only (admin command and after_state_ch).
1358  */
1359 static int _drbd_pause_after(struct drbd_conf *mdev)
1360 {
1361 	struct drbd_conf *odev;
1362 	int i, rv = 0;
1363 
1364 	for (i = 0; i < minor_count; i++) {
1365 		odev = minor_to_mdev(i);
1366 		if (!odev)
1367 			continue;
1368 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1369 			continue;
1370 		if (!_drbd_may_sync_now(odev))
1371 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1372 			       != SS_NOTHING_TO_DO);
1373 	}
1374 
1375 	return rv;
1376 }
1377 
1378 /**
1379  * _drbd_resume_next() - Resume resync on all devices that may resync now
1380  * @mdev:	DRBD device.
1381  *
1382  * Called from process context only (admin command and worker).
1383  */
1384 static int _drbd_resume_next(struct drbd_conf *mdev)
1385 {
1386 	struct drbd_conf *odev;
1387 	int i, rv = 0;
1388 
1389 	for (i = 0; i < minor_count; i++) {
1390 		odev = minor_to_mdev(i);
1391 		if (!odev)
1392 			continue;
1393 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1394 			continue;
1395 		if (odev->state.aftr_isp) {
1396 			if (_drbd_may_sync_now(odev))
1397 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1398 							CS_HARD, NULL)
1399 				       != SS_NOTHING_TO_DO) ;
1400 		}
1401 	}
1402 	return rv;
1403 }
1404 
1405 void resume_next_sg(struct drbd_conf *mdev)
1406 {
1407 	write_lock_irq(&global_state_lock);
1408 	_drbd_resume_next(mdev);
1409 	write_unlock_irq(&global_state_lock);
1410 }
1411 
1412 void suspend_other_sg(struct drbd_conf *mdev)
1413 {
1414 	write_lock_irq(&global_state_lock);
1415 	_drbd_pause_after(mdev);
1416 	write_unlock_irq(&global_state_lock);
1417 }
1418 
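/* Check whether setting the sync-after dependency of @mdev to minor
 * @o_minor is valid: the minor must exist (or be -1 for "no dependency"),
 * and it must not introduce a cycle in the dependency chain.  Returns
 * NO_ERROR, ERR_SYNC_AFTER or ERR_SYNC_AFTER_CYCLE. */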
1419 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1420 {
1421 	struct drbd_conf *odev;
1422 
1423 	if (o_minor == -1)
1424 		return NO_ERROR;
1425 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1426 		return ERR_SYNC_AFTER;
1427 
1428 	/* check for loops */
1429 	odev = minor_to_mdev(o_minor);
1430 	while (1) {
1431 		if (odev == mdev)
1432 			return ERR_SYNC_AFTER_CYCLE;
1433 
1434 		/* dependency chain ends here, no cycles. */
1435 		if (odev->sync_conf.after == -1)
1436 			return NO_ERROR;
1437 
1438 		/* follow the dependency chain */
1439 		odev = minor_to_mdev(odev->sync_conf.after);
1440 	}
1441 }
1442 
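/* Change the sync-after dependency of @mdev to minor @na.  If the new value
 * is valid, pause and resume resyncs on all devices until the dependency
 * graph has settled.  Returns the result of sync_after_error(). */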
1443 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1444 {
1445 	int changes;
1446 	int retcode;
1447 
1448 	write_lock_irq(&global_state_lock);
1449 	retcode = sync_after_error(mdev, na);
1450 	if (retcode == NO_ERROR) {
1451 		mdev->sync_conf.after = na;
1452 		do {
1453 			changes  = _drbd_pause_after(mdev);
1454 			changes |= _drbd_resume_next(mdev);
1455 		} while (changes);
1456 	}
1457 	write_unlock_irq(&global_state_lock);
1458 	return retcode;
1459 }
1460 
1461 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1462 {
1463 	atomic_set(&mdev->rs_sect_in, 0);
1464 	atomic_set(&mdev->rs_sect_ev, 0);
1465 	mdev->rs_in_flight = 0;
1466 	mdev->rs_planed = 0;
1467 	spin_lock(&mdev->peer_seq_lock);
1468 	fifo_set(&mdev->rs_plan_s, 0);
1469 	spin_unlock(&mdev->peer_seq_lock);
1470 }
1471 
1472 /**
1473  * drbd_start_resync() - Start the resync process
1474  * @mdev:	DRBD device.
1475  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1476  *
1477  * This function might bring you directly into one of the
1478  * C_PAUSED_SYNC_* states.
1479  */
1480 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1481 {
1482 	union drbd_state ns;
1483 	int r;
1484 
1485 	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1486 		dev_err(DEV, "Resync already running!\n");
1487 		return;
1488 	}
1489 
1490 	if (mdev->state.conn < C_AHEAD) {
1491 		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1492 		drbd_rs_cancel_all(mdev);
1493 		/* This should be done when we abort the resync. We definitely do not
1494 		   want to have this for connections going back and forth between
1495 		   Ahead/Behind and SyncSource/SyncTarget */
1496 	}
1497 
1498 	if (side == C_SYNC_TARGET) {
1499 		/* Since application IO was locked out during C_WF_BITMAP_T and
1500 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1501 		   we check whether we are allowed to make the data inconsistent. */
1502 		r = drbd_khelper(mdev, "before-resync-target");
1503 		r = (r >> 8) & 0xff;
1504 		if (r > 0) {
1505 			dev_info(DEV, "before-resync-target handler returned %d, "
1506 			     "dropping connection.\n", r);
1507 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1508 			return;
1509 		}
1510 	} else /* C_SYNC_SOURCE */ {
1511 		r = drbd_khelper(mdev, "before-resync-source");
1512 		r = (r >> 8) & 0xff;
1513 		if (r > 0) {
1514 			if (r == 3) {
1515 				dev_info(DEV, "before-resync-source handler returned %d, "
1516 					 "ignoring. Old userland tools?\n", r);
1517 			} else {
1518 				dev_info(DEV, "before-resync-source handler returned %d, "
1519 					 "dropping connection.\n", r);
1520 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1521 				return;
1522 			}
1523 		}
1524 	}
1525 
1526 	drbd_state_lock(mdev);
1527 
1528 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1529 		drbd_state_unlock(mdev);
1530 		return;
1531 	}
1532 
1533 	write_lock_irq(&global_state_lock);
1534 	ns = mdev->state;
1535 
1536 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1537 
1538 	ns.conn = side;
1539 
1540 	if (side == C_SYNC_TARGET)
1541 		ns.disk = D_INCONSISTENT;
1542 	else /* side == C_SYNC_SOURCE */
1543 		ns.pdsk = D_INCONSISTENT;
1544 
1545 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1546 	ns = mdev->state;
1547 
1548 	if (ns.conn < C_CONNECTED)
1549 		r = SS_UNKNOWN_ERROR;
1550 
1551 	if (r == SS_SUCCESS) {
1552 		unsigned long tw = drbd_bm_total_weight(mdev);
1553 		unsigned long now = jiffies;
1554 		int i;
1555 
1556 		mdev->rs_failed    = 0;
1557 		mdev->rs_paused    = 0;
1558 		mdev->rs_same_csum = 0;
1559 		mdev->rs_last_events = 0;
1560 		mdev->rs_last_sect_ev = 0;
1561 		mdev->rs_total     = tw;
1562 		mdev->rs_start     = now;
1563 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1564 			mdev->rs_mark_left[i] = tw;
1565 			mdev->rs_mark_time[i] = now;
1566 		}
1567 		_drbd_pause_after(mdev);
1568 	}
1569 	write_unlock_irq(&global_state_lock);
1570 
1571 	if (r == SS_SUCCESS) {
1572 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1573 		     drbd_conn_str(ns.conn),
1574 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1575 		     (unsigned long) mdev->rs_total);
1576 		if (side == C_SYNC_TARGET)
1577 			mdev->bm_resync_fo = 0;
1578 
1579 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1580 		 * with w_send_oos, or the sync target will get confused as to
1581 		 * how many bits to resync.  We cannot always do that, because for an
1582 		 * empty resync and protocol < 95, we need to do it here, as we call
1583 		 * drbd_resync_finished from here in that case.
1584 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1585 		 * and from after_state_ch otherwise. */
1586 		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1587 			drbd_gen_and_send_sync_uuid(mdev);
1588 
1589 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1590 			/* This still has a race (about when exactly the peers
1591 			 * detect connection loss) that can lead to a full sync
1592 			 * on next handshake. In 8.3.9 we fixed this with explicit
1593 			 * resync-finished notifications, but the fix
1594 			 * introduces a protocol change.  Sleeping for some
1595 			 * time longer than the ping interval + timeout on the
1596 			 * SyncSource, to give the SyncTarget the chance to
1597 			 * detect connection loss, then waiting for a ping
1598 			 * response (implicit in drbd_resync_finished) reduces
1599 			 * the race considerably, but does not solve it. */
1600 			if (side == C_SYNC_SOURCE)
1601 				schedule_timeout_interruptible(
1602 					mdev->net_conf->ping_int * HZ +
1603 					mdev->net_conf->ping_timeo*HZ/9);
1604 			drbd_resync_finished(mdev);
1605 		}
1606 
1607 		drbd_rs_controller_reset(mdev);
1608 		/* ns.conn may already be != mdev->state.conn,
1609 		 * we may have been paused in between, or become paused until
1610 		 * the timer triggers.
1611 		 * No matter, that is handled in resync_timer_fn() */
1612 		if (ns.conn == C_SYNC_TARGET)
1613 			mod_timer(&mdev->resync_timer, jiffies);
1614 
1615 		drbd_md_sync(mdev);
1616 	}
1617 	put_ldev(mdev);
1618 	drbd_state_unlock(mdev);
1619 }
1620 
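/* The per-device worker thread.  It dequeues work items from
 * mdev->data.work and runs their callbacks; before blocking on an empty
 * queue it uncorks the data socket so pending packets get sent, and corks
 * it again once there is work to do.  On shutdown it cancels all remaining
 * work (cb(mdev, w, 1)), waits for the receiver to exit and cleans up the
 * device state. */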
1621 int drbd_worker(struct drbd_thread *thi)
1622 {
1623 	struct drbd_conf *mdev = thi->mdev;
1624 	struct drbd_work *w = NULL;
1625 	LIST_HEAD(work_list);
1626 	int intr = 0, i;
1627 
1628 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1629 
1630 	while (get_t_state(thi) == Running) {
1631 		drbd_thread_current_set_cpu(mdev);
1632 
1633 		if (down_trylock(&mdev->data.work.s)) {
1634 			mutex_lock(&mdev->data.mutex);
1635 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1636 				drbd_tcp_uncork(mdev->data.socket);
1637 			mutex_unlock(&mdev->data.mutex);
1638 
1639 			intr = down_interruptible(&mdev->data.work.s);
1640 
1641 			mutex_lock(&mdev->data.mutex);
1642 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1643 				drbd_tcp_cork(mdev->data.socket);
1644 			mutex_unlock(&mdev->data.mutex);
1645 		}
1646 
1647 		if (intr) {
1648 			D_ASSERT(intr == -EINTR);
1649 			flush_signals(current);
1650 			ERR_IF (get_t_state(thi) == Running)
1651 				continue;
1652 			break;
1653 		}
1654 
1655 		if (get_t_state(thi) != Running)
1656 			break;
1657 		/* With this break, we have done a down() but not consumed
1658 		   the entry from the list. The cleanup code takes care of
1659 		   this...   */
1660 
1661 		w = NULL;
1662 		spin_lock_irq(&mdev->data.work.q_lock);
1663 		ERR_IF(list_empty(&mdev->data.work.q)) {
1664 			/* something terribly wrong in our logic.
1665 			 * we were able to down() the semaphore,
1666 			 * but the list is empty... doh.
1667 			 *
1668 			 * what is the best thing to do now?
1669 			 * try again from scratch, restarting the receiver,
1670 			 * asender, whatnot? could break even more ugly,
1671 			 * e.g. when we are primary, but no good local data.
1672 			 *
1673 			 * I'll try to get away just starting over this loop.
1674 			 */
1675 			spin_unlock_irq(&mdev->data.work.q_lock);
1676 			continue;
1677 		}
1678 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1679 		list_del_init(&w->list);
1680 		spin_unlock_irq(&mdev->data.work.q_lock);
1681 
1682 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1683 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1684 			if (mdev->state.conn >= C_CONNECTED)
1685 				drbd_force_state(mdev,
1686 						NS(conn, C_NETWORK_FAILURE));
1687 		}
1688 	}
1689 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1690 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1691 
1692 	spin_lock_irq(&mdev->data.work.q_lock);
1693 	i = 0;
1694 	while (!list_empty(&mdev->data.work.q)) {
1695 		list_splice_init(&mdev->data.work.q, &work_list);
1696 		spin_unlock_irq(&mdev->data.work.q_lock);
1697 
1698 		while (!list_empty(&work_list)) {
1699 			w = list_entry(work_list.next, struct drbd_work, list);
1700 			list_del_init(&w->list);
1701 			w->cb(mdev, w, 1);
1702 			i++; /* dead debugging code */
1703 		}
1704 
1705 		spin_lock_irq(&mdev->data.work.q_lock);
1706 	}
1707 	sema_init(&mdev->data.work.s, 0);
1708 	/* DANGEROUS race: if someone queued work while holding the spinlock,
1709 	 * but called up() outside the spinlock, we could get an up() on the
1710 	 * semaphore without corresponding list entry.
1711 	 * So don't do that.
1712 	 */
1713 	spin_unlock_irq(&mdev->data.work.q_lock);
1714 
1715 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1716 	/* _drbd_set_state only uses stop_nowait.
1717 	 * wait here for the Exiting receiver. */
1718 	drbd_thread_stop(&mdev->receiver);
1719 	drbd_mdev_cleanup(mdev);
1720 
1721 	dev_info(DEV, "worker terminated\n");
1722 
1723 	clear_bit(DEVICE_DYING, &mdev->flags);
1724 	clear_bit(CONFIG_PENDING, &mdev->flags);
1725 	wake_up(&mdev->state_wait);
1726 
1727 	return 0;
1728 }
1729