xref: /linux/drivers/block/drbd/drbd_worker.c (revision f8324e20f8289dffc646d64366332e05eaacab25)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38 
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41 
42 #define SLEEP_TIME (HZ/10)
43 
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45 
46 
47 
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_sec
51    drbd_endio_pri
52 
53  * more endio handlers:
54    atodb_endio in drbd_actlog.c
55    drbd_bm_async_io_complete in drbd_bitmap.c
56 
57  * For all these callbacks, note the following:
58  * The callbacks will be called in irq context by the IDE drivers,
59  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60  * Try to get the locking right :)
61  *
62  */
63 
64 
65 /* About the global_state_lock
66    Each state transition on a device holds a read lock. In case we have
67    to evaluate the sync-after dependencies, we grab a write lock, because
68    we need stable states on all devices for that.  */
69 rwlock_t global_state_lock;
70 
71 /* used for synchronous meta data and bitmap IO
72  * submitted by drbd_md_sync_page_io()
73  */
74 void drbd_md_io_complete(struct bio *bio, int error)
75 {
76 	struct drbd_md_io *md_io;
77 
78 	md_io = (struct drbd_md_io *)bio->bi_private;
79 	md_io->error = error;
80 
81 	complete(&md_io->event);
82 }
83 
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
88 {
89 	unsigned long flags = 0;
90 	struct drbd_conf *mdev = e->mdev;
91 
92 	D_ASSERT(e->block_id != ID_VACANT);
93 
94 	spin_lock_irqsave(&mdev->req_lock, flags);
95 	mdev->read_cnt += e->size >> 9;
96 	list_del(&e->w.list);
97 	if (list_empty(&mdev->read_ee))
98 		wake_up(&mdev->ee_wait);
99 	if (test_bit(__EE_WAS_ERROR, &e->flags))
100 		__drbd_chk_io_error(mdev, FALSE);
101 	spin_unlock_irqrestore(&mdev->req_lock, flags);
102 
103 	drbd_queue_work(&mdev->data.work, &e->w);
104 	put_ldev(mdev);
105 }
106 
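/* A "failed barrier" is a barrier write that completed with an error and has
 * not been resubmitted yet: EE_IS_BARRIER and EE_WAS_ERROR are set, while
 * EE_RESUBMITTED is still clear. */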
107 static int is_failed_barrier(int ee_flags)
108 {
109 	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110 			== (EE_IS_BARRIER|EE_WAS_ERROR);
111 }
112 
113 /* writes on behalf of the partner, or resync writes,
114  * "submitted" by the receiver, final stage.  */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
116 {
117 	unsigned long flags = 0;
118 	struct drbd_conf *mdev = e->mdev;
119 	sector_t e_sector;
120 	int do_wake;
121 	int is_syncer_req;
122 	int do_al_complete_io;
123 
124 	/* if this is a failed barrier request, disable use of barriers,
125 	 * and schedule for resubmission */
126 	if (is_failed_barrier(e->flags)) {
127 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
128 		spin_lock_irqsave(&mdev->req_lock, flags);
129 		list_del(&e->w.list);
130 		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131 		e->w.cb = w_e_reissue;
132 		/* put_ldev actually happens below, once we come here again. */
133 		__release(local);
134 		spin_unlock_irqrestore(&mdev->req_lock, flags);
135 		drbd_queue_work(&mdev->data.work, &e->w);
136 		return;
137 	}
138 
139 	D_ASSERT(e->block_id != ID_VACANT);
140 
141 	/* after we moved e to done_ee,
142 	 * we may no longer access it,
143 	 * it may be freed/reused already!
144 	 * (as soon as we release the req_lock) */
145 	e_sector = e->sector;
146 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147 	is_syncer_req = is_syncer_block_id(e->block_id);
148 
149 	spin_lock_irqsave(&mdev->req_lock, flags);
150 	mdev->writ_cnt += e->size >> 9;
151 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
152 	list_add_tail(&e->w.list, &mdev->done_ee);
153 
154 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
155 	 * nor did we wake possibly waiting conflicting requests.
156 	 * That is done from "drbd_process_done_ee" within the appropriate w.cb
157 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
158 
159 	do_wake = is_syncer_req
160 		? list_empty(&mdev->sync_ee)
161 		: list_empty(&mdev->active_ee);
162 
163 	if (test_bit(__EE_WAS_ERROR, &e->flags))
164 		__drbd_chk_io_error(mdev, FALSE);
165 	spin_unlock_irqrestore(&mdev->req_lock, flags);
166 
167 	if (is_syncer_req)
168 		drbd_rs_complete_io(mdev, e_sector);
169 
170 	if (do_wake)
171 		wake_up(&mdev->ee_wait);
172 
173 	if (do_al_complete_io)
174 		drbd_al_complete_io(mdev, e_sector);
175 
176 	wake_asender(mdev);
177 	put_ldev(mdev);
178 }
179 
180 /* writes on behalf of the partner, or resync writes,
181  * "submitted" by the receiver.
182  */
183 void drbd_endio_sec(struct bio *bio, int error)
184 {
185 	struct drbd_epoch_entry *e = bio->bi_private;
186 	struct drbd_conf *mdev = e->mdev;
187 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
188 	int is_write = bio_data_dir(bio) == WRITE;
189 
190 	if (error)
191 		dev_warn(DEV, "%s: error=%d s=%llus\n",
192 				is_write ? "write" : "read", error,
193 				(unsigned long long)e->sector);
194 	if (!error && !uptodate) {
195 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196 				is_write ? "write" : "read",
197 				(unsigned long long)e->sector);
198 		/* strange behavior of some lower level drivers...
199 		 * fail the request by clearing the uptodate flag,
200 		 * but do not return any error?! */
201 		error = -EIO;
202 	}
203 
204 	if (error)
205 		set_bit(__EE_WAS_ERROR, &e->flags);
206 
207 	bio_put(bio); /* no need for the bio anymore */
208 	if (atomic_dec_and_test(&e->pending_bios)) {
209 		if (is_write)
210 			drbd_endio_write_sec_final(e);
211 		else
212 			drbd_endio_read_sec_final(e);
213 	}
214 }
215 
216 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217  */
218 void drbd_endio_pri(struct bio *bio, int error)
219 {
220 	unsigned long flags;
221 	struct drbd_request *req = bio->bi_private;
222 	struct drbd_conf *mdev = req->mdev;
223 	struct bio_and_error m;
224 	enum drbd_req_event what;
225 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
226 
227 	if (!error && !uptodate) {
228 		dev_warn(DEV, "p %s: setting error to -EIO\n",
229 			 bio_data_dir(bio) == WRITE ? "write" : "read");
230 		/* strange behavior of some lower level drivers...
231 		 * fail the request by clearing the uptodate flag,
232 		 * but do not return any error?! */
233 		error = -EIO;
234 	}
235 
236 	/* to avoid recursion in __req_mod */
237 	if (unlikely(error)) {
238 		what = (bio_data_dir(bio) == WRITE)
239 			? write_completed_with_error
240 			: (bio_rw(bio) == READ)
241 			  ? read_completed_with_error
242 			  : read_ahead_completed_with_error;
243 	} else
244 		what = completed_ok;
245 
246 	bio_put(req->private_bio);
247 	req->private_bio = ERR_PTR(error);
248 
249 	spin_lock_irqsave(&mdev->req_lock, flags);
250 	__req_mod(req, what, &m);
251 	spin_unlock_irqrestore(&mdev->req_lock, flags);
252 
253 	if (m.bio)
254 		complete_master_bio(mdev, &m);
255 }
256 
257 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
258 {
259 	struct drbd_request *req = container_of(w, struct drbd_request, w);
260 
261 	/* We should not detach for read io-error,
262 	 * but try to WRITE the P_DATA_REPLY to the failed location,
263 	 * to give the disk the chance to relocate that block */
264 
265 	spin_lock_irq(&mdev->req_lock);
266 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
267 		_req_mod(req, read_retry_remote_canceled);
268 		spin_unlock_irq(&mdev->req_lock);
269 		return 1;
270 	}
271 	spin_unlock_irq(&mdev->req_lock);
272 
273 	return w_send_read_req(mdev, w, 0);
274 }
275 
276 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
277 {
278 	ERR_IF(cancel) return 1;
279 	dev_err(DEV, "resync inactive, but callback triggered??\n");
280 	return 1; /* Simply ignore this! */
281 }
282 
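/* Compute a digest over the page chain of an epoch entry using the given
 * hash transform.  All pages but the last are fully used; the last page
 * contributes e->size modulo PAGE_SIZE bytes (a full page if that is 0). */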
283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
284 {
285 	struct hash_desc desc;
286 	struct scatterlist sg;
287 	struct page *page = e->pages;
288 	struct page *tmp;
289 	unsigned len;
290 
291 	desc.tfm = tfm;
292 	desc.flags = 0;
293 
294 	sg_init_table(&sg, 1);
295 	crypto_hash_init(&desc);
296 
297 	while ((tmp = page_chain_next(page))) {
298 		/* all but the last page will be fully used */
299 		sg_set_page(&sg, page, PAGE_SIZE, 0);
300 		crypto_hash_update(&desc, &sg, sg.length);
301 		page = tmp;
302 	}
303 	/* and now the last, possibly only partially used page */
304 	len = e->size & (PAGE_SIZE - 1);
305 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
306 	crypto_hash_update(&desc, &sg, sg.length);
307 	crypto_hash_final(&desc, digest);
308 }
309 
310 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
311 {
312 	struct hash_desc desc;
313 	struct scatterlist sg;
314 	struct bio_vec *bvec;
315 	int i;
316 
317 	desc.tfm = tfm;
318 	desc.flags = 0;
319 
320 	sg_init_table(&sg, 1);
321 	crypto_hash_init(&desc);
322 
323 	__bio_for_each_segment(bvec, bio, i, 0) {
324 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
325 		crypto_hash_update(&desc, &sg, sg.length);
326 	}
327 	crypto_hash_final(&desc, digest);
328 }
329 
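/* Worker callback, set up by read_for_csum(): hash the block that was just
 * read locally and ask the peer for a checksum based resync of that block
 * by sending a P_CSUM_RS_REQUEST. */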
330 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
331 {
332 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
333 	int digest_size;
334 	void *digest;
335 	int ok;
336 
337 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
338 
339 	if (unlikely(cancel)) {
340 		drbd_free_ee(mdev, e);
341 		return 1;
342 	}
343 
344 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
345 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
346 		digest = kmalloc(digest_size, GFP_NOIO);
347 		if (digest) {
348 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
349 
350 			inc_rs_pending(mdev);
351 			ok = drbd_send_drequest_csum(mdev,
352 						     e->sector,
353 						     e->size,
354 						     digest,
355 						     digest_size,
356 						     P_CSUM_RS_REQUEST);
357 			kfree(digest);
358 		} else {
359 			dev_err(DEV, "kmalloc() of digest failed.\n");
360 			ok = 0;
361 		}
362 	} else
363 		ok = 1;
364 
365 	drbd_free_ee(mdev, e);
366 
367 	if (unlikely(!ok))
368 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
369 	return ok;
370 }
371 
372 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
373 
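/* Submit a local read of (sector, size) for checksum based resync.
 * Returns 1 if the read was submitted (w_e_send_csum runs on completion),
 * 0 if we have no local disk, and 2 if allocation or submission failed
 * (the caller treats that as "retry later"). */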
374 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
375 {
376 	struct drbd_epoch_entry *e;
377 
378 	if (!get_ldev(mdev))
379 		return 0;
380 
381 	/* GFP_TRY, because if there is no memory available right now, this may
382 	 * be rescheduled for later. It is "only" background resync, after all. */
383 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
384 	if (!e)
385 		goto fail;
386 
387 	spin_lock_irq(&mdev->req_lock);
388 	list_add(&e->w.list, &mdev->read_ee);
389 	spin_unlock_irq(&mdev->req_lock);
390 
391 	e->w.cb = w_e_send_csum;
392 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
393 		return 1;
394 
395 	drbd_free_ee(mdev, e);
396 fail:
397 	put_ldev(mdev);
398 	return 2;
399 }
400 
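/* Resync timer: unless STOP_SYNC_TIMER is set, pick the work callback
 * matching the current connection state (online verify vs. resync) and
 * re-queue mdev->resync_work on the data work queue, unless it is
 * already queued. */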
401 void resync_timer_fn(unsigned long data)
402 {
403 	unsigned long flags;
404 	struct drbd_conf *mdev = (struct drbd_conf *) data;
405 	int queue;
406 
407 	spin_lock_irqsave(&mdev->req_lock, flags);
408 
409 	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
410 		queue = 1;
411 		if (mdev->state.conn == C_VERIFY_S)
412 			mdev->resync_work.cb = w_make_ov_request;
413 		else
414 			mdev->resync_work.cb = w_make_resync_request;
415 	} else {
416 		queue = 0;
417 		mdev->resync_work.cb = w_resync_inactive;
418 	}
419 
420 	spin_unlock_irqrestore(&mdev->req_lock, flags);
421 
422 	/* harmless race: list_empty outside data.work.q_lock */
423 	if (list_empty(&mdev->resync_work.list) && queue)
424 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
425 }
426 
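/* Derive the current resync rate from the measured data_delay: the full
 * sync_conf.rate while the delay is at or below the throttle threshold,
 * zero at or above the hold-off threshold, and a linear ramp down in
 * between.  Thresholds are configured in 0.1s units and converted to
 * milliseconds here. */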
427 static int calc_resync_rate(struct drbd_conf *mdev)
428 {
429 	int d = mdev->data_delay / 1000; /* us -> ms */
430 	int td = mdev->sync_conf.throttle_th * 100;  /* 0.1s -> ms */
431 	int hd = mdev->sync_conf.hold_off_th * 100;  /* 0.1s -> ms */
432 	int cr = mdev->sync_conf.rate;
433 
434 	return d <= td ? cr :
435 		d >= hd ? 0 :
436 		cr + (cr * (td - d) / (hd - td));
437 }
438 
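/* Worker callback that issues the next batch of resync requests: send up to
 * "number" requests per SLEEP_TIME tick, derived from the current sync rate,
 * either as checksum based requests (via read_for_csum) or as plain
 * P_RS_DATA_REQUESTs, merging adjacent dirty bits up to max_segment_size,
 * then rearm the resync timer. */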
439 int w_make_resync_request(struct drbd_conf *mdev,
440 		struct drbd_work *w, int cancel)
441 {
442 	unsigned long bit;
443 	sector_t sector;
444 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
445 	int max_segment_size;
446 	int number, i, size, pe, mx;
447 	int align, queued, sndbuf;
448 
449 	if (unlikely(cancel))
450 		return 1;
451 
452 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
453 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
454 		return 0;
455 	}
456 
457 	if (mdev->state.conn != C_SYNC_TARGET)
458 		dev_err(DEV, "%s in w_make_resync_request\n",
459 			drbd_conn_str(mdev->state.conn));
460 
461 	if (!get_ldev(mdev)) {
462 		/* Since we only need to access mdev->rsync, a
463 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
464 		   continuing the resync with a broken disk makes no sense
465 		   at all. */
466 		dev_err(DEV, "Disk broke down during resync!\n");
467 		mdev->resync_work.cb = w_resync_inactive;
468 		return 1;
469 	}
470 
471 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
472 	 * if it should be necessary */
473 	max_segment_size = mdev->agreed_pro_version < 94 ?
474 		queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
475 
476 	mdev->c_sync_rate = calc_resync_rate(mdev);
477 	number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
478 	pe = atomic_read(&mdev->rs_pending_cnt);
479 
480 	mutex_lock(&mdev->data.mutex);
481 	if (mdev->data.socket)
482 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
483 	else
484 		mx = 1;
485 	mutex_unlock(&mdev->data.mutex);
486 
487 	/* For resync rates >160MB/sec, allow more pending RS requests */
488 	if (number > mx)
489 		mx = number;
490 
491 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
492 	if ((pe + number) > mx) {
493 		number = mx - pe;
494 	}
495 
496 	for (i = 0; i < number; i++) {
497 		/* Stop generating RS requests when half of the send buffer is filled */
498 		mutex_lock(&mdev->data.mutex);
499 		if (mdev->data.socket) {
500 			queued = mdev->data.socket->sk->sk_wmem_queued;
501 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
502 		} else {
503 			queued = 1;
504 			sndbuf = 0;
505 		}
506 		mutex_unlock(&mdev->data.mutex);
507 		if (queued > sndbuf / 2)
508 			goto requeue;
509 
510 next_sector:
511 		size = BM_BLOCK_SIZE;
512 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
513 
514 		if (bit == -1UL) {
515 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
516 			mdev->resync_work.cb = w_resync_inactive;
517 			put_ldev(mdev);
518 			return 1;
519 		}
520 
521 		sector = BM_BIT_TO_SECT(bit);
522 
523 		if (drbd_try_rs_begin_io(mdev, sector)) {
524 			mdev->bm_resync_fo = bit;
525 			goto requeue;
526 		}
527 		mdev->bm_resync_fo = bit + 1;
528 
529 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
530 			drbd_rs_complete_io(mdev, sector);
531 			goto next_sector;
532 		}
533 
534 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
535 		/* try to find some adjacent bits.
536 		 * we stop if we already have the maximum req size.
537 		 *
538 		 * Additionally, always align bigger requests, in order to
539 		 * be prepared for all stripe sizes of software RAIDs.
540 		 */
541 		align = 1;
542 		for (;;) {
543 			if (size + BM_BLOCK_SIZE > max_segment_size)
544 				break;
545 
546 			/* Be always aligned */
547 			if (sector & ((1<<(align+3))-1))
548 				break;
549 
550 			/* do not cross extent boundaries */
551 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
552 				break;
553 			/* now, is it actually dirty, after all?
554 			 * caution, drbd_bm_test_bit is tri-state for some
555 			 * obscure reason; ( b == 0 ) would get the out-of-band
556 			 * only accidentally right because of the "oddly sized"
557 			 * adjustment below */
558 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
559 				break;
560 			bit++;
561 			size += BM_BLOCK_SIZE;
562 			if ((BM_BLOCK_SIZE << align) <= size)
563 				align++;
564 			i++;
565 		}
566 		/* if we merged some,
567 		 * reset the offset to start the next drbd_bm_find_next from */
568 		if (size > BM_BLOCK_SIZE)
569 			mdev->bm_resync_fo = bit + 1;
570 #endif
571 
572 		/* adjust very last sectors, in case we are oddly sized */
573 		if (sector + (size>>9) > capacity)
574 			size = (capacity-sector)<<9;
575 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
576 			switch (read_for_csum(mdev, sector, size)) {
577 			case 0: /* Disk failure*/
578 				put_ldev(mdev);
579 				return 0;
580 			case 2: /* Allocation failed */
581 				drbd_rs_complete_io(mdev, sector);
582 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
583 				goto requeue;
584 			/* case 1: everything ok */
585 			}
586 		} else {
587 			inc_rs_pending(mdev);
588 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
589 					       sector, size, ID_SYNCER)) {
590 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
591 				dec_rs_pending(mdev);
592 				put_ldev(mdev);
593 				return 0;
594 			}
595 		}
596 	}
597 
598 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
599 		/* last syncer _request_ was sent,
600 		 * but the P_RS_DATA_REPLY has not yet been received.  Sync will end
601 		 * (and the next sync group will resume) as soon as we receive the
602 		 * last resync data block and the last bit is cleared.
603 		 * Until then, resync "work" is "inactive" ...
604 		 */
605 		mdev->resync_work.cb = w_resync_inactive;
606 		put_ldev(mdev);
607 		return 1;
608 	}
609 
610  requeue:
611 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
612 	put_ldev(mdev);
613 	return 1;
614 }
615 
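/* Online-verify counterpart of w_make_resync_request(): send up to "number"
 * P_OV_REQUESTs starting at mdev->ov_position, then rearm the resync timer. */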
616 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
617 {
618 	int number, i, size;
619 	sector_t sector;
620 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
621 
622 	if (unlikely(cancel))
623 		return 1;
624 
625 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
626 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
627 		return 0;
628 	}
629 
630 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
631 	if (atomic_read(&mdev->rs_pending_cnt) > number)
632 		goto requeue;
633 
634 	number -= atomic_read(&mdev->rs_pending_cnt);
635 
636 	sector = mdev->ov_position;
637 	for (i = 0; i < number; i++) {
638 		if (sector >= capacity) {
639 			mdev->resync_work.cb = w_resync_inactive;
640 			return 1;
641 		}
642 
643 		size = BM_BLOCK_SIZE;
644 
645 		if (drbd_try_rs_begin_io(mdev, sector)) {
646 			mdev->ov_position = sector;
647 			goto requeue;
648 		}
649 
650 		if (sector + (size>>9) > capacity)
651 			size = (capacity-sector)<<9;
652 
653 		inc_rs_pending(mdev);
654 		if (!drbd_send_ov_request(mdev, sector, size)) {
655 			dec_rs_pending(mdev);
656 			return 0;
657 		}
658 		sector += BM_SECT_PER_BIT;
659 	}
660 	mdev->ov_position = sector;
661 
662  requeue:
663 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
664 	return 1;
665 }
666 
667 
668 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
669 {
670 	kfree(w);
671 	ov_oos_print(mdev);
672 	drbd_resync_finished(mdev);
673 
674 	return 1;
675 }
676 
677 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
678 {
679 	kfree(w);
680 
681 	drbd_resync_finished(mdev);
682 
683 	return 1;
684 }
685 
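/* Called when a resync or online verify run ends: print statistics, decide
 * the new disk/peer-disk states and UUIDs depending on how the run went,
 * transition back to C_CONNECTED, and possibly trigger the "out-of-sync" or
 * "after-resync-target" helpers. */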
686 int drbd_resync_finished(struct drbd_conf *mdev)
687 {
688 	unsigned long db, dt, dbdt;
689 	unsigned long n_oos;
690 	union drbd_state os, ns;
691 	struct drbd_work *w;
692 	char *khelper_cmd = NULL;
693 
694 	/* Remove all elements from the resync LRU. Since future actions
695 	 * might set bits in the (main) bitmap, then the entries in the
696 	 * resync LRU would be wrong. */
697 	if (drbd_rs_del_all(mdev)) {
698 		/* In case this is not possible now, most probably because
699 		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
700 		 * queue (or even the read operations for those packets
701 		 * is not finished by now).   Retry in 100ms. */
702 
703 		drbd_kick_lo(mdev);
704 		__set_current_state(TASK_INTERRUPTIBLE);
705 		schedule_timeout(HZ / 10);
706 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
707 		if (w) {
708 			w->cb = w_resync_finished;
709 			drbd_queue_work(&mdev->data.work, w);
710 			return 1;
711 		}
712 		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
713 	}
714 
715 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
716 	if (dt <= 0)
717 		dt = 1;
718 	db = mdev->rs_total;
719 	dbdt = Bit2KB(db/dt);
720 	mdev->rs_paused /= HZ;
721 
722 	if (!get_ldev(mdev))
723 		goto out;
724 
725 	spin_lock_irq(&mdev->req_lock);
726 	os = mdev->state;
727 
728 	/* This protects us against multiple calls (that can happen in the presence
729 	   of application IO), and against connectivity loss just before we arrive here. */
730 	if (os.conn <= C_CONNECTED)
731 		goto out_unlock;
732 
733 	ns = os;
734 	ns.conn = C_CONNECTED;
735 
736 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
737 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
738 	     "Online verify " : "Resync",
739 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
740 
741 	n_oos = drbd_bm_total_weight(mdev);
742 
743 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
744 		if (n_oos) {
745 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
746 			      n_oos, Bit2KB(1));
747 			khelper_cmd = "out-of-sync";
748 		}
749 	} else {
750 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
751 
752 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
753 			khelper_cmd = "after-resync-target";
754 
755 		if (mdev->csums_tfm && mdev->rs_total) {
756 			const unsigned long s = mdev->rs_same_csum;
757 			const unsigned long t = mdev->rs_total;
758 			const int ratio =
759 				(t == 0)     ? 0 :
760 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
761 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
762 			     "transferred %luK total %luK\n",
763 			     ratio,
764 			     Bit2KB(mdev->rs_same_csum),
765 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
766 			     Bit2KB(mdev->rs_total));
767 		}
768 	}
769 
770 	if (mdev->rs_failed) {
771 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
772 
773 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
774 			ns.disk = D_INCONSISTENT;
775 			ns.pdsk = D_UP_TO_DATE;
776 		} else {
777 			ns.disk = D_UP_TO_DATE;
778 			ns.pdsk = D_INCONSISTENT;
779 		}
780 	} else {
781 		ns.disk = D_UP_TO_DATE;
782 		ns.pdsk = D_UP_TO_DATE;
783 
784 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
785 			if (mdev->p_uuid) {
786 				int i;
787 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
788 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
789 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
790 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
791 			} else {
792 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
793 			}
794 		}
795 
796 		drbd_uuid_set_bm(mdev, 0UL);
797 
798 		if (mdev->p_uuid) {
799 			/* Now the two UUID sets are equal, update what we
800 			 * know of the peer. */
801 			int i;
802 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
803 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
804 		}
805 	}
806 
807 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
808 out_unlock:
809 	spin_unlock_irq(&mdev->req_lock);
810 	put_ldev(mdev);
811 out:
812 	mdev->rs_total  = 0;
813 	mdev->rs_failed = 0;
814 	mdev->rs_paused = 0;
815 	mdev->ov_start_sector = 0;
816 
817 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
818 		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
819 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
820 	}
821 
822 	if (khelper_cmd)
823 		drbd_khelper(mdev, khelper_cmd);
824 
825 	return 1;
826 }
827 
828 /* helper */
829 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
830 {
831 	if (drbd_ee_has_active_page(e)) {
832 		/* This might happen if sendpage() has not finished */
833 		spin_lock_irq(&mdev->req_lock);
834 		list_add_tail(&e->w.list, &mdev->net_ee);
835 		spin_unlock_irq(&mdev->req_lock);
836 	} else
837 		drbd_free_ee(mdev, e);
838 }
839 
840 /**
841  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
842  * @mdev:	DRBD device.
843  * @w:		work object.
844  * @cancel:	The connection will be closed anyway
845  */
846 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
847 {
848 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
849 	int ok;
850 
851 	if (unlikely(cancel)) {
852 		drbd_free_ee(mdev, e);
853 		dec_unacked(mdev);
854 		return 1;
855 	}
856 
857 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
858 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
859 	} else {
860 		if (__ratelimit(&drbd_ratelimit_state))
861 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
862 			    (unsigned long long)e->sector);
863 
864 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
865 	}
866 
867 	dec_unacked(mdev);
868 
869 	move_to_net_ee_or_free(mdev, e);
870 
871 	if (unlikely(!ok))
872 		dev_err(DEV, "drbd_send_block() failed\n");
873 	return ok;
874 }
875 
876 /**
877  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
878  * @mdev:	DRBD device.
879  * @w:		work object.
880  * @cancel:	The connection will be closed anyway
881  */
882 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
883 {
884 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
885 	int ok;
886 
887 	if (unlikely(cancel)) {
888 		drbd_free_ee(mdev, e);
889 		dec_unacked(mdev);
890 		return 1;
891 	}
892 
893 	if (get_ldev_if_state(mdev, D_FAILED)) {
894 		drbd_rs_complete_io(mdev, e->sector);
895 		put_ldev(mdev);
896 	}
897 
898 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
899 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
900 			inc_rs_pending(mdev);
901 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
902 		} else {
903 			if (__ratelimit(&drbd_ratelimit_state))
904 				dev_err(DEV, "Not sending RSDataReply, "
905 				    "partner DISKLESS!\n");
906 			ok = 1;
907 		}
908 	} else {
909 		if (__ratelimit(&drbd_ratelimit_state))
910 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
911 			    (unsigned long long)e->sector);
912 
913 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
914 
915 		/* update resync data with failure */
916 		drbd_rs_failed_io(mdev, e->sector, e->size);
917 	}
918 
919 	dec_unacked(mdev);
920 
921 	move_to_net_ee_or_free(mdev, e);
922 
923 	if (unlikely(!ok))
924 		dev_err(DEV, "drbd_send_block() failed\n");
925 	return ok;
926 }
927 
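/* Worker callback for P_CSUM_RS_REQUEST: compare the digest sent by the peer
 * with one computed over the local block.  If they match, answer with
 * P_RS_IS_IN_SYNC; otherwise send the full block as P_RS_DATA_REPLY. */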
928 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
929 {
930 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
931 	struct digest_info *di;
932 	int digest_size;
933 	void *digest = NULL;
934 	int ok, eq = 0;
935 
936 	if (unlikely(cancel)) {
937 		drbd_free_ee(mdev, e);
938 		dec_unacked(mdev);
939 		return 1;
940 	}
941 
942 	drbd_rs_complete_io(mdev, e->sector);
943 
944 	di = (struct digest_info *)(unsigned long)e->block_id;
945 
946 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
947 		/* quick hack to try to avoid a race against reconfiguration.
948 		 * a real fix would be much more involved,
949 		 * introducing more locking mechanisms */
950 		if (mdev->csums_tfm) {
951 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
952 			D_ASSERT(digest_size == di->digest_size);
953 			digest = kmalloc(digest_size, GFP_NOIO);
954 		}
955 		if (digest) {
956 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
957 			eq = !memcmp(digest, di->digest, digest_size);
958 			kfree(digest);
959 		}
960 
961 		if (eq) {
962 			drbd_set_in_sync(mdev, e->sector, e->size);
963 			/* rs_same_csums unit is BM_BLOCK_SIZE */
964 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
965 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
966 		} else {
967 			inc_rs_pending(mdev);
968 			e->block_id = ID_SYNCER;
969 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
970 		}
971 	} else {
972 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
973 		if (__ratelimit(&drbd_ratelimit_state))
974 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
975 	}
976 
977 	dec_unacked(mdev);
978 
979 	kfree(di);
980 
981 	move_to_net_ee_or_free(mdev, e);
982 
983 	if (unlikely(!ok))
984 		dev_err(DEV, "drbd_send_block/ack() failed\n");
985 	return ok;
986 }
987 
988 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
989 {
990 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
991 	int digest_size;
992 	void *digest;
993 	int ok = 1;
994 
995 	if (unlikely(cancel))
996 		goto out;
997 
998 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
999 		goto out;
1000 
1001 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1002 	/* FIXME if this allocation fails, online verify will not terminate! */
1003 	digest = kmalloc(digest_size, GFP_NOIO);
1004 	if (digest) {
1005 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1006 		inc_rs_pending(mdev);
1007 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1008 					     digest, digest_size, P_OV_REPLY);
1009 		if (!ok)
1010 			dec_rs_pending(mdev);
1011 		kfree(digest);
1012 	}
1013 
1014 out:
1015 	drbd_free_ee(mdev, e);
1016 
1017 	dec_unacked(mdev);
1018 
1019 	return ok;
1020 }
1021 
1022 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1023 {
1024 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1025 		mdev->ov_last_oos_size += size>>9;
1026 	} else {
1027 		mdev->ov_last_oos_start = sector;
1028 		mdev->ov_last_oos_size = size>>9;
1029 	}
1030 	drbd_set_out_of_sync(mdev, sector, size);
1031 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1032 }
1033 
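/* Worker callback for an online verify reply (P_OV_REPLY): compare the
 * peer's digest with a locally computed one, record any out-of-sync range,
 * report the result via P_OV_RESULT, and finish the verify run once
 * ov_left drops to zero. */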
1034 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1035 {
1036 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1037 	struct digest_info *di;
1038 	int digest_size;
1039 	void *digest;
1040 	int ok, eq = 0;
1041 
1042 	if (unlikely(cancel)) {
1043 		drbd_free_ee(mdev, e);
1044 		dec_unacked(mdev);
1045 		return 1;
1046 	}
1047 
1048 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1049 	 * the resync lru has been cleaned up already */
1050 	drbd_rs_complete_io(mdev, e->sector);
1051 
1052 	di = (struct digest_info *)(unsigned long)e->block_id;
1053 
1054 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1055 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1056 		digest = kmalloc(digest_size, GFP_NOIO);
1057 		if (digest) {
1058 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1059 
1060 			D_ASSERT(digest_size == di->digest_size);
1061 			eq = !memcmp(digest, di->digest, digest_size);
1062 			kfree(digest);
1063 		}
1064 	} else {
1065 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1066 		if (__ratelimit(&drbd_ratelimit_state))
1067 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1068 	}
1069 
1070 	dec_unacked(mdev);
1071 
1072 	kfree(di);
1073 
1074 	if (!eq)
1075 		drbd_ov_oos_found(mdev, e->sector, e->size);
1076 	else
1077 		ov_oos_print(mdev);
1078 
1079 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1080 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1081 
1082 	drbd_free_ee(mdev, e);
1083 
1084 	if (--mdev->ov_left == 0) {
1085 		ov_oos_print(mdev);
1086 		drbd_resync_finished(mdev);
1087 	}
1088 
1089 	return ok;
1090 }
1091 
1092 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1093 {
1094 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1095 	complete(&b->done);
1096 	return 1;
1097 }
1098 
1099 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1100 {
1101 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1102 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1103 	int ok = 1;
1104 
1105 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1106 	 * just before it was reassigned and re-queued, so double check that.
1107 	 * actually, this race was harmless, since we only try to send the
1108 	 * barrier packet here, and otherwise do nothing with the object.
1109 	 * but compare with the head of w_clear_epoch */
1110 	spin_lock_irq(&mdev->req_lock);
1111 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1112 		cancel = 1;
1113 	spin_unlock_irq(&mdev->req_lock);
1114 	if (cancel)
1115 		return 1;
1116 
1117 	if (!drbd_get_data_sock(mdev))
1118 		return 0;
1119 	p->barrier = b->br_number;
1120 	/* inc_ap_pending was done where this was queued.
1121 	 * dec_ap_pending will be done in got_BarrierAck
1122 	 * or (on connection loss) in w_clear_epoch.  */
1123 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1124 				(struct p_header *)p, sizeof(*p), 0);
1125 	drbd_put_data_sock(mdev);
1126 
1127 	return ok;
1128 }
1129 
1130 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1131 {
1132 	if (cancel)
1133 		return 1;
1134 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1135 }
1136 
1137 /**
1138  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1139  * @mdev:	DRBD device.
1140  * @w:		work object.
1141  * @cancel:	The connection will be closed anyway
1142  */
1143 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1144 {
1145 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1146 	int ok;
1147 
1148 	if (unlikely(cancel)) {
1149 		req_mod(req, send_canceled);
1150 		return 1;
1151 	}
1152 
1153 	ok = drbd_send_dblock(mdev, req);
1154 	req_mod(req, ok ? handed_over_to_network : send_failed);
1155 
1156 	return ok;
1157 }
1158 
1159 /**
1160  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1161  * @mdev:	DRBD device.
1162  * @w:		work object.
1163  * @cancel:	The connection will be closed anyway
1164  */
1165 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1166 {
1167 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1168 	int ok;
1169 
1170 	if (unlikely(cancel)) {
1171 		req_mod(req, send_canceled);
1172 		return 1;
1173 	}
1174 
1175 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1176 				(unsigned long)req);
1177 
1178 	if (!ok) {
1179 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1180 		 * so this is probably redundant */
1181 		if (mdev->state.conn >= C_CONNECTED)
1182 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1183 	}
1184 	req_mod(req, ok ? handed_over_to_network : send_failed);
1185 
1186 	return ok;
1187 }
1188 
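/* Follow the sync-after dependency chain of mdev: return 0 if any device
 * earlier in the chain is currently resyncing or has one of its
 * "inhibit sync" (isp) flags set, 1 otherwise. */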
1189 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1190 {
1191 	struct drbd_conf *odev = mdev;
1192 
1193 	while (1) {
1194 		if (odev->sync_conf.after == -1)
1195 			return 1;
1196 		odev = minor_to_mdev(odev->sync_conf.after);
1197 		ERR_IF(!odev) return 1;
1198 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1199 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1200 		    odev->state.aftr_isp || odev->state.peer_isp ||
1201 		    odev->state.user_isp)
1202 			return 0;
1203 	}
1204 }
1205 
1206 /**
1207  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1208  * @mdev:	DRBD device.
1209  *
1210  * Called from process context only (admin command and after_state_ch).
1211  */
1212 static int _drbd_pause_after(struct drbd_conf *mdev)
1213 {
1214 	struct drbd_conf *odev;
1215 	int i, rv = 0;
1216 
1217 	for (i = 0; i < minor_count; i++) {
1218 		odev = minor_to_mdev(i);
1219 		if (!odev)
1220 			continue;
1221 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1222 			continue;
1223 		if (!_drbd_may_sync_now(odev))
1224 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1225 			       != SS_NOTHING_TO_DO);
1226 	}
1227 
1228 	return rv;
1229 }
1230 
1231 /**
1232  * _drbd_resume_next() - Resume resync on all devices that may resync now
1233  * @mdev:	DRBD device.
1234  *
1235  * Called from process context only (admin command and worker).
1236  */
1237 static int _drbd_resume_next(struct drbd_conf *mdev)
1238 {
1239 	struct drbd_conf *odev;
1240 	int i, rv = 0;
1241 
1242 	for (i = 0; i < minor_count; i++) {
1243 		odev = minor_to_mdev(i);
1244 		if (!odev)
1245 			continue;
1246 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1247 			continue;
1248 		if (odev->state.aftr_isp) {
1249 			if (_drbd_may_sync_now(odev))
1250 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1251 							CS_HARD, NULL)
1252 				       != SS_NOTHING_TO_DO) ;
1253 		}
1254 	}
1255 	return rv;
1256 }
1257 
1258 void resume_next_sg(struct drbd_conf *mdev)
1259 {
1260 	write_lock_irq(&global_state_lock);
1261 	_drbd_resume_next(mdev);
1262 	write_unlock_irq(&global_state_lock);
1263 }
1264 
1265 void suspend_other_sg(struct drbd_conf *mdev)
1266 {
1267 	write_lock_irq(&global_state_lock);
1268 	_drbd_pause_after(mdev);
1269 	write_unlock_irq(&global_state_lock);
1270 }
1271 
1272 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1273 {
1274 	struct drbd_conf *odev;
1275 
1276 	if (o_minor == -1)
1277 		return NO_ERROR;
1278 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1279 		return ERR_SYNC_AFTER;
1280 
1281 	/* check for loops */
1282 	odev = minor_to_mdev(o_minor);
1283 	while (1) {
1284 		if (odev == mdev)
1285 			return ERR_SYNC_AFTER_CYCLE;
1286 
1287 		/* dependency chain ends here, no cycles. */
1288 		if (odev->sync_conf.after == -1)
1289 			return NO_ERROR;
1290 
1291 		/* follow the dependency chain */
1292 		odev = minor_to_mdev(odev->sync_conf.after);
1293 	}
1294 }
1295 
1296 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1297 {
1298 	int changes;
1299 	int retcode;
1300 
1301 	write_lock_irq(&global_state_lock);
1302 	retcode = sync_after_error(mdev, na);
1303 	if (retcode == NO_ERROR) {
1304 		mdev->sync_conf.after = na;
1305 		do {
1306 			changes  = _drbd_pause_after(mdev);
1307 			changes |= _drbd_resume_next(mdev);
1308 		} while (changes);
1309 	}
1310 	write_unlock_irq(&global_state_lock);
1311 	return retcode;
1312 }
1313 
1314 static void ping_peer(struct drbd_conf *mdev)
1315 {
1316 	clear_bit(GOT_PING_ACK, &mdev->flags);
1317 	request_ping(mdev);
1318 	wait_event(mdev->misc_wait,
1319 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1320 }
1321 
1322 /**
1323  * drbd_start_resync() - Start the resync process
1324  * @mdev:	DRBD device.
1325  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1326  *
1327  * This function might bring you directly into one of the
1328  * C_PAUSED_SYNC_* states.
1329  */
1330 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1331 {
1332 	union drbd_state ns;
1333 	int r;
1334 
1335 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1336 		dev_err(DEV, "Resync already running!\n");
1337 		return;
1338 	}
1339 
1340 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1341 	drbd_rs_cancel_all(mdev);
1342 
1343 	if (side == C_SYNC_TARGET) {
1344 		/* Since application IO was locked out during C_WF_BITMAP_T and
1345 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1346 		   which will make the data inconsistent, ask the before-resync-target handler. */
1347 		r = drbd_khelper(mdev, "before-resync-target");
1348 		r = (r >> 8) & 0xff;
1349 		if (r > 0) {
1350 			dev_info(DEV, "before-resync-target handler returned %d, "
1351 			     "dropping connection.\n", r);
1352 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1353 			return;
1354 		}
1355 	}
1356 
1357 	drbd_state_lock(mdev);
1358 
1359 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1360 		drbd_state_unlock(mdev);
1361 		return;
1362 	}
1363 
1364 	if (side == C_SYNC_TARGET) {
1365 		mdev->bm_resync_fo = 0;
1366 	} else /* side == C_SYNC_SOURCE */ {
1367 		u64 uuid;
1368 
1369 		get_random_bytes(&uuid, sizeof(u64));
1370 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1371 		drbd_send_sync_uuid(mdev, uuid);
1372 
1373 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1374 	}
1375 
1376 	write_lock_irq(&global_state_lock);
1377 	ns = mdev->state;
1378 
1379 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1380 
1381 	ns.conn = side;
1382 
1383 	if (side == C_SYNC_TARGET)
1384 		ns.disk = D_INCONSISTENT;
1385 	else /* side == C_SYNC_SOURCE */
1386 		ns.pdsk = D_INCONSISTENT;
1387 
1388 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1389 	ns = mdev->state;
1390 
1391 	if (ns.conn < C_CONNECTED)
1392 		r = SS_UNKNOWN_ERROR;
1393 
1394 	if (r == SS_SUCCESS) {
1395 		mdev->rs_total     =
1396 		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1397 		mdev->rs_failed    = 0;
1398 		mdev->rs_paused    = 0;
1399 		mdev->rs_start     =
1400 		mdev->rs_mark_time = jiffies;
1401 		mdev->rs_same_csum = 0;
1402 		_drbd_pause_after(mdev);
1403 	}
1404 	write_unlock_irq(&global_state_lock);
1405 	put_ldev(mdev);
1406 
1407 	if (r == SS_SUCCESS) {
1408 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1409 		     drbd_conn_str(ns.conn),
1410 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1411 		     (unsigned long) mdev->rs_total);
1412 
1413 		if (mdev->rs_total == 0) {
1414 			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1415 			ping_peer(mdev);
1416 			drbd_resync_finished(mdev);
1417 		}
1418 
1419 		/* ns.conn may already be != mdev->state.conn,
1420 		 * we may have been paused in between, or become paused until
1421 		 * the timer triggers.
1422 		 * No matter, that is handled in resync_timer_fn() */
1423 		if (ns.conn == C_SYNC_TARGET)
1424 			mod_timer(&mdev->resync_timer, jiffies);
1425 
1426 		drbd_md_sync(mdev);
1427 	}
1428 	drbd_state_unlock(mdev);
1429 }
1430 
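/* Main loop of the per-device worker thread: cork/uncork the data socket
 * around idle periods, dequeue and run work items (forcing the connection
 * into C_NETWORK_FAILURE if a callback fails), and on shutdown cancel all
 * remaining work, stop the receiver and clean up the device. */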
1431 int drbd_worker(struct drbd_thread *thi)
1432 {
1433 	struct drbd_conf *mdev = thi->mdev;
1434 	struct drbd_work *w = NULL;
1435 	LIST_HEAD(work_list);
1436 	int intr = 0, i;
1437 
1438 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1439 
1440 	while (get_t_state(thi) == Running) {
1441 		drbd_thread_current_set_cpu(mdev);
1442 
1443 		if (down_trylock(&mdev->data.work.s)) {
1444 			mutex_lock(&mdev->data.mutex);
1445 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1446 				drbd_tcp_uncork(mdev->data.socket);
1447 			mutex_unlock(&mdev->data.mutex);
1448 
1449 			intr = down_interruptible(&mdev->data.work.s);
1450 
1451 			mutex_lock(&mdev->data.mutex);
1452 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1453 				drbd_tcp_cork(mdev->data.socket);
1454 			mutex_unlock(&mdev->data.mutex);
1455 		}
1456 
1457 		if (intr) {
1458 			D_ASSERT(intr == -EINTR);
1459 			flush_signals(current);
1460 			ERR_IF (get_t_state(thi) == Running)
1461 				continue;
1462 			break;
1463 		}
1464 
1465 		if (get_t_state(thi) != Running)
1466 			break;
1467 		/* With this break, we have done a down() but not consumed
1468 		   the entry from the list. The cleanup code takes care of
1469 		   this...   */
1470 
1471 		w = NULL;
1472 		spin_lock_irq(&mdev->data.work.q_lock);
1473 		ERR_IF(list_empty(&mdev->data.work.q)) {
1474 			/* something terribly wrong in our logic.
1475 			 * we were able to down() the semaphore,
1476 			 * but the list is empty... doh.
1477 			 *
1478 			 * what is the best thing to do now?
1479 			 * try again from scratch, restarting the receiver,
1480 			 * asender, whatnot? That could break things even more,
1481 			 * e.g. when we are primary, but have no good local data.
1482 			 *
1483 			 * I'll try to get away with just starting over this loop.
1484 			 */
1485 			spin_unlock_irq(&mdev->data.work.q_lock);
1486 			continue;
1487 		}
1488 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1489 		list_del_init(&w->list);
1490 		spin_unlock_irq(&mdev->data.work.q_lock);
1491 
1492 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1493 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1494 			if (mdev->state.conn >= C_CONNECTED)
1495 				drbd_force_state(mdev,
1496 						NS(conn, C_NETWORK_FAILURE));
1497 		}
1498 	}
1499 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1500 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1501 
1502 	spin_lock_irq(&mdev->data.work.q_lock);
1503 	i = 0;
1504 	while (!list_empty(&mdev->data.work.q)) {
1505 		list_splice_init(&mdev->data.work.q, &work_list);
1506 		spin_unlock_irq(&mdev->data.work.q_lock);
1507 
1508 		while (!list_empty(&work_list)) {
1509 			w = list_entry(work_list.next, struct drbd_work, list);
1510 			list_del_init(&w->list);
1511 			w->cb(mdev, w, 1);
1512 			i++; /* dead debugging code */
1513 		}
1514 
1515 		spin_lock_irq(&mdev->data.work.q_lock);
1516 	}
1517 	sema_init(&mdev->data.work.s, 0);
1518 	/* DANGEROUS race: if someone queued work while holding the spinlock,
1519 	 * but called up() outside the spinlock, we could get an up() on the
1520 	 * semaphore without a corresponding list entry.
1521 	 * So don't do that.
1522 	 */
1523 	spin_unlock_irq(&mdev->data.work.q_lock);
1524 
1525 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1526 	/* _drbd_set_state only uses stop_nowait.
1527 	 * wait here for the Exiting receiver. */
1528 	drbd_thread_stop(&mdev->receiver);
1529 	drbd_mdev_cleanup(mdev);
1530 
1531 	dev_info(DEV, "worker terminated\n");
1532 
1533 	clear_bit(DEVICE_DYING, &mdev->flags);
1534 	clear_bit(CONFIG_PENDING, &mdev->flags);
1535 	wake_up(&mdev->state_wait);
1536 
1537 	return 0;
1538 }
1539