xref: /linux/drivers/block/drbd/drbd_worker.c (revision 606b2f490fb80e55d05cf0e6cec0b6c0ff0fc18f)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38 
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41 
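/* how often w_make_resync_request()/w_make_ov_request() re-arm the resync
 * timer below: every HZ/10 jiffies, i.e. roughly 100 ms per batch of requests */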
42 #define SLEEP_TIME (HZ/10)
43 
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45 
46 
47 
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_sec
51    drbd_endio_pri
52 
53  * more endio handlers:
54    atodb_endio in drbd_actlog.c
55    drbd_bm_async_io_complete in drbd_bitmap.c
56 
57  * For all these callbacks, note the following:
58  * The callbacks will be called in irq context by the IDE drivers,
59  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60  * Try to get the locking right :)
61  *
62  */
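/* Because they may run in hard-irq context, any lock these handlers share
 * with process context (e.g. mdev->req_lock) has to be taken with
 * spin_lock_irqsave()/spin_unlock_irqrestore(); spin_unlock_irq() would
 * re-enable interrupts unconditionally, which is wrong inside an interrupt
 * handler. */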
63 
64 
65 /* About the global_state_lock
66    Each state transition on a device holds a read lock. In case we have
67    to evaluate the sync-after dependencies, we grab a write lock, because
68    we need stable states on all devices for that.  */
69 rwlock_t global_state_lock;
70 
71 /* used for synchronous meta data and bitmap IO
72  * submitted by drbd_md_sync_page_io()
73  */
74 void drbd_md_io_complete(struct bio *bio, int error)
75 {
76 	struct drbd_md_io *md_io;
77 
78 	md_io = (struct drbd_md_io *)bio->bi_private;
79 	md_io->error = error;
80 
81 	complete(&md_io->event);
82 }
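
/* The submitting side (drbd_md_sync_page_io()) pairs with this handler
 * roughly as follows -- a sketch, not the literal code:
 *
 *	struct drbd_md_io md_io;
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 *	err = md_io.error;
 */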
83 
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
88 {
89 	unsigned long flags = 0;
90 	struct drbd_conf *mdev = e->mdev;
91 
92 	D_ASSERT(e->block_id != ID_VACANT);
93 
94 	spin_lock_irqsave(&mdev->req_lock, flags);
95 	mdev->read_cnt += e->size >> 9;
96 	list_del(&e->w.list);
97 	if (list_empty(&mdev->read_ee))
98 		wake_up(&mdev->ee_wait);
99 	if (test_bit(__EE_WAS_ERROR, &e->flags))
100 		__drbd_chk_io_error(mdev, FALSE);
101 	spin_unlock_irqrestore(&mdev->req_lock, flags);
102 
103 	drbd_queue_work(&mdev->data.work, &e->w);
104 	put_ldev(mdev);
105 }
106 
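/* a "failed barrier" request is one that was submitted with a barrier, came
 * back with an error, and has not been resubmitted yet: EE_IS_BARRIER and
 * EE_WAS_ERROR set, EE_RESUBMITTED not set -- hence the mask-and-compare. */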
107 static int is_failed_barrier(int ee_flags)
108 {
109 	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110 			== (EE_IS_BARRIER|EE_WAS_ERROR);
111 }
112 
113 /* writes on behalf of the partner, or resync writes,
114  * "submitted" by the receiver, final stage.  */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
116 {
117 	unsigned long flags = 0;
118 	struct drbd_conf *mdev = e->mdev;
119 	sector_t e_sector;
120 	int do_wake;
121 	int is_syncer_req;
122 	int do_al_complete_io;
123 
124 	/* if this is a failed barrier request, disable use of barriers,
125 	 * and schedule for resubmission */
126 	if (is_failed_barrier(e->flags)) {
127 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
128 		spin_lock_irqsave(&mdev->req_lock, flags);
129 		list_del(&e->w.list);
130 		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131 		e->w.cb = w_e_reissue;
132 		/* put_ldev actually happens below, once we come here again. */
133 		__release(local);
134 		spin_unlock_irqrestore(&mdev->req_lock, flags);
135 		drbd_queue_work(&mdev->data.work, &e->w);
136 		return;
137 	}
138 
139 	D_ASSERT(e->block_id != ID_VACANT);
140 
141 	/* after we moved e to done_ee,
142 	 * we may no longer access it,
143 	 * it may be freed/reused already!
144 	 * (as soon as we release the req_lock) */
145 	e_sector = e->sector;
146 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147 	is_syncer_req = is_syncer_block_id(e->block_id);
148 
149 	spin_lock_irqsave(&mdev->req_lock, flags);
150 	mdev->writ_cnt += e->size >> 9;
151 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
152 	list_add_tail(&e->w.list, &mdev->done_ee);
153 
154 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
155 	 * nor did we wake possibly waiting conflicting requests.
156 	 * That is done from "drbd_process_done_ee" within the appropriate w.cb
157 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
158 
159 	do_wake = is_syncer_req
160 		? list_empty(&mdev->sync_ee)
161 		: list_empty(&mdev->active_ee);
162 
163 	if (test_bit(__EE_WAS_ERROR, &e->flags))
164 		__drbd_chk_io_error(mdev, FALSE);
165 	spin_unlock_irqrestore(&mdev->req_lock, flags);
166 
167 	if (is_syncer_req)
168 		drbd_rs_complete_io(mdev, e_sector);
169 
170 	if (do_wake)
171 		wake_up(&mdev->ee_wait);
172 
173 	if (do_al_complete_io)
174 		drbd_al_complete_io(mdev, e_sector);
175 
176 	wake_asender(mdev);
177 	put_ldev(mdev);
178 }
179 
180 /* writes on behalf of the partner, or resync writes,
181  * "submitted" by the receiver.
182  */
183 void drbd_endio_sec(struct bio *bio, int error)
184 {
185 	struct drbd_epoch_entry *e = bio->bi_private;
186 	struct drbd_conf *mdev = e->mdev;
187 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
188 	int is_write = bio_data_dir(bio) == WRITE;
189 
190 	if (error)
191 		dev_warn(DEV, "%s: error=%d s=%llus\n",
192 				is_write ? "write" : "read", error,
193 				(unsigned long long)e->sector);
194 	if (!error && !uptodate) {
195 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196 				is_write ? "write" : "read",
197 				(unsigned long long)e->sector);
198 		/* strange behavior of some lower level drivers...
199 		 * fail the request by clearing the uptodate flag,
200 		 * but do not return any error?! */
201 		error = -EIO;
202 	}
203 
204 	if (error)
205 		set_bit(__EE_WAS_ERROR, &e->flags);
206 
207 	bio_put(bio); /* no need for the bio anymore */
208 	if (atomic_dec_and_test(&e->pending_bios)) {
209 		if (is_write)
210 			drbd_endio_write_sec_final(e);
211 		else
212 			drbd_endio_read_sec_final(e);
213 	}
214 }
215 
216 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217  */
218 void drbd_endio_pri(struct bio *bio, int error)
219 {
220 	unsigned long flags;
221 	struct drbd_request *req = bio->bi_private;
222 	struct drbd_conf *mdev = req->mdev;
223 	struct bio_and_error m;
224 	enum drbd_req_event what;
225 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
226 
227 	if (!error && !uptodate) {
228 		dev_warn(DEV, "p %s: setting error to -EIO\n",
229 			 bio_data_dir(bio) == WRITE ? "write" : "read");
230 		/* strange behavior of some lower level drivers...
231 		 * fail the request by clearing the uptodate flag,
232 		 * but do not return any error?! */
233 		error = -EIO;
234 	}
235 
236 	/* to avoid recursion in __req_mod */
237 	if (unlikely(error)) {
238 		what = (bio_data_dir(bio) == WRITE)
239 			? write_completed_with_error
240 			: (bio_rw(bio) == READ)
241 			  ? read_completed_with_error
242 			  : read_ahead_completed_with_error;
243 	} else
244 		what = completed_ok;
245 
246 	bio_put(req->private_bio);
247 	req->private_bio = ERR_PTR(error);
248 
249 	spin_lock_irqsave(&mdev->req_lock, flags);
250 	__req_mod(req, what, &m);
251 	spin_unlock_irqrestore(&mdev->req_lock, flags);
252 
253 	if (m.bio)
254 		complete_master_bio(mdev, &m);
255 }
256 
257 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
258 {
259 	struct drbd_request *req = container_of(w, struct drbd_request, w);
260 
261 	/* We should not detach for read io-error,
262 	 * but try to WRITE the P_DATA_REPLY to the failed location,
263 	 * to give the disk the chance to relocate that block */
264 
265 	spin_lock_irq(&mdev->req_lock);
266 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
267 		_req_mod(req, read_retry_remote_canceled);
268 		spin_unlock_irq(&mdev->req_lock);
269 		return 1;
270 	}
271 	spin_unlock_irq(&mdev->req_lock);
272 
273 	return w_send_read_req(mdev, w, 0);
274 }
275 
276 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
277 {
278 	ERR_IF(cancel) return 1;
279 	dev_err(DEV, "resync inactive, but callback triggered??\n");
280 	return 1; /* Simply ignore this! */
281 }
282 
283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
284 {
285 	struct hash_desc desc;
286 	struct scatterlist sg;
287 	struct page *page = e->pages;
288 	struct page *tmp;
289 	unsigned len;
290 
291 	desc.tfm = tfm;
292 	desc.flags = 0;
293 
294 	sg_init_table(&sg, 1);
295 	crypto_hash_init(&desc);
296 
297 	while ((tmp = page_chain_next(page))) {
298 		/* all but the last page will be fully used */
299 		sg_set_page(&sg, page, PAGE_SIZE, 0);
300 		crypto_hash_update(&desc, &sg, sg.length);
301 		page = tmp;
302 	}
303 	/* and now the last, possibly only partially used page */
304 	len = e->size & (PAGE_SIZE - 1);
305 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
306 	crypto_hash_update(&desc, &sg, sg.length);
307 	crypto_hash_final(&desc, digest);
308 }
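
/* Worked example for the tail handling above, assuming 4 KiB pages: for
 * e->size == 9216 (two full pages plus 1 KiB) the loop hashes the two full
 * pages and len = 9216 & 4095 = 1024 covers the last, partial page; when
 * e->size is an exact multiple of PAGE_SIZE, len is 0 and the "?:" falls
 * back to a full PAGE_SIZE for the final page. */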
309 
310 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
311 {
312 	struct hash_desc desc;
313 	struct scatterlist sg;
314 	struct bio_vec *bvec;
315 	int i;
316 
317 	desc.tfm = tfm;
318 	desc.flags = 0;
319 
320 	sg_init_table(&sg, 1);
321 	crypto_hash_init(&desc);
322 
323 	__bio_for_each_segment(bvec, bio, i, 0) {
324 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
325 		crypto_hash_update(&desc, &sg, sg.length);
326 	}
327 	crypto_hash_final(&desc, digest);
328 }
329 
330 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
331 {
332 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
333 	int digest_size;
334 	void *digest;
335 	int ok;
336 
337 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
338 
339 	if (unlikely(cancel)) {
340 		drbd_free_ee(mdev, e);
341 		return 1;
342 	}
343 
344 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
345 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
346 		digest = kmalloc(digest_size, GFP_NOIO);
347 		if (digest) {
348 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
349 
350 			inc_rs_pending(mdev);
351 			ok = drbd_send_drequest_csum(mdev,
352 						     e->sector,
353 						     e->size,
354 						     digest,
355 						     digest_size,
356 						     P_CSUM_RS_REQUEST);
357 			kfree(digest);
358 		} else {
359 			dev_err(DEV, "kmalloc() of digest failed.\n");
360 			ok = 0;
361 		}
362 	} else
363 		ok = 1;
364 
365 	drbd_free_ee(mdev, e);
366 
367 	if (unlikely(!ok))
368 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
369 	return ok;
370 }
371 
372 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
373 
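/* read_for_csum() return values (see the switch in w_make_resync_request()):
 *  0 -- could not get the local disk reference, treated as disk failure
 *  1 -- epoch entry allocated and read submitted; w_e_send_csum runs later
 *  2 -- allocation or submission failed; the caller re-tries this sector */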
374 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
375 {
376 	struct drbd_epoch_entry *e;
377 
378 	if (!get_ldev(mdev))
379 		return 0;
380 
381 	/* GFP_TRY, because if there is no memory available right now, this may
382 	 * be rescheduled for later. It is "only" background resync, after all. */
383 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
384 	if (!e)
385 		goto fail;
386 
387 	spin_lock_irq(&mdev->req_lock);
388 	list_add(&e->w.list, &mdev->read_ee);
389 	spin_unlock_irq(&mdev->req_lock);
390 
391 	e->w.cb = w_e_send_csum;
392 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
393 		return 1;
394 
395 	drbd_free_ee(mdev, e);
396 fail:
397 	put_ldev(mdev);
398 	return 2;
399 }
400 
401 void resync_timer_fn(unsigned long data)
402 {
403 	unsigned long flags;
404 	struct drbd_conf *mdev = (struct drbd_conf *) data;
405 	int queue;
406 
407 	spin_lock_irqsave(&mdev->req_lock, flags);
408 
409 	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
410 		queue = 1;
411 		if (mdev->state.conn == C_VERIFY_S)
412 			mdev->resync_work.cb = w_make_ov_request;
413 		else
414 			mdev->resync_work.cb = w_make_resync_request;
415 	} else {
416 		queue = 0;
417 		mdev->resync_work.cb = w_resync_inactive;
418 	}
419 
420 	spin_unlock_irqrestore(&mdev->req_lock, flags);
421 
422 	/* harmless race: list_empty outside data.work.q_lock */
423 	if (list_empty(&mdev->resync_work.list) && queue)
424 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
425 }
426 
427 int w_make_resync_request(struct drbd_conf *mdev,
428 		struct drbd_work *w, int cancel)
429 {
430 	unsigned long bit;
431 	sector_t sector;
432 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
433 	int max_segment_size;
434 	int number, i, size, pe, mx;
435 	int align, queued, sndbuf;
436 
437 	if (unlikely(cancel))
438 		return 1;
439 
440 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
441 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
442 		return 0;
443 	}
444 
445 	if (mdev->state.conn != C_SYNC_TARGET)
446 		dev_err(DEV, "%s in w_make_resync_request\n",
447 			drbd_conn_str(mdev->state.conn));
448 
449 	if (!get_ldev(mdev)) {
450 		/* Since we only need to access mdev->resync, a
451 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
452 		   continuing resync with a broken disk makes no sense at
453 		   all */
454 		dev_err(DEV, "Disk broke down during resync!\n");
455 		mdev->resync_work.cb = w_resync_inactive;
456 		return 1;
457 	}
458 
459 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
460 	 * if it should be necessary */
461 	max_segment_size = mdev->agreed_pro_version < 94 ?
462 		queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
463 
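	/* sync_conf.rate is in KiB/s and each request covers one BM_BLOCK_SIZE
	 * (4 KiB) block, so per SLEEP_TIME tick (~100 ms) we may issue
	 * rate / ((4096/1024) * 10) = rate/40 requests; e.g. a rate of
	 * 10000 KiB/s allows 250 requests of 4 KiB per tick. */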
464 	number = SLEEP_TIME * mdev->sync_conf.rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
465 	pe = atomic_read(&mdev->rs_pending_cnt);
466 
467 	mutex_lock(&mdev->data.mutex);
468 	if (mdev->data.socket)
469 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
470 	else
471 		mx = 1;
472 	mutex_unlock(&mdev->data.mutex);
473 
474 	/* For resync rates >160MB/sec, allow more pending RS requests */
475 	if (number > mx)
476 		mx = number;
477 
478 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
479 	if ((pe + number) > mx) {
480 		number = mx - pe;
481 	}
482 
483 	for (i = 0; i < number; i++) {
484 		/* Stop generating RS requests when half of the send buffer is filled */
485 		mutex_lock(&mdev->data.mutex);
486 		if (mdev->data.socket) {
487 			queued = mdev->data.socket->sk->sk_wmem_queued;
488 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
489 		} else {
490 			queued = 1;
491 			sndbuf = 0;
492 		}
493 		mutex_unlock(&mdev->data.mutex);
494 		if (queued > sndbuf / 2)
495 			goto requeue;
496 
497 next_sector:
498 		size = BM_BLOCK_SIZE;
499 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
500 
501 		if (bit == -1UL) {
502 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
503 			mdev->resync_work.cb = w_resync_inactive;
504 			put_ldev(mdev);
505 			return 1;
506 		}
507 
508 		sector = BM_BIT_TO_SECT(bit);
509 
510 		if (drbd_try_rs_begin_io(mdev, sector)) {
511 			mdev->bm_resync_fo = bit;
512 			goto requeue;
513 		}
514 		mdev->bm_resync_fo = bit + 1;
515 
516 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
517 			drbd_rs_complete_io(mdev, sector);
518 			goto next_sector;
519 		}
520 
521 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
522 		/* try to find some adjacent bits.
523 		 * we stop if we already have the maximum req size.
524 		 *
525 		 * Additionally always align bigger requests, in order to
526 		 * be prepared for all stripe sizes of software RAIDs.
527 		 */
528 		align = 1;
529 		for (;;) {
530 			if (size + BM_BLOCK_SIZE > max_segment_size)
531 				break;
532 
533 			/* always stay aligned: only grow while the start sector is a multiple of BM_BLOCK_SIZE << align (i.e. 1<<(align+3) sectors) */
534 			if (sector & ((1<<(align+3))-1))
535 				break;
536 
537 			/* do not cross extent boundaries */
538 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
539 				break;
540 			/* now, is it actually dirty, after all?
541 			 * caution, drbd_bm_test_bit is tri-state for some
542 			 * obscure reason; ( b == 0 ) would get the out-of-band
543 			 * only accidentally right because of the "oddly sized"
544 			 * adjustment below */
545 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
546 				break;
547 			bit++;
548 			size += BM_BLOCK_SIZE;
549 			if ((BM_BLOCK_SIZE << align) <= size)
550 				align++;
551 			i++;
552 		}
553 		/* if we merged some,
554 		 * reset the offset to start the next drbd_bm_find_next from */
555 		if (size > BM_BLOCK_SIZE)
556 			mdev->bm_resync_fo = bit + 1;
557 #endif
558 
559 		/* adjust very last sectors, in case we are oddly sized */
560 		if (sector + (size>>9) > capacity)
561 			size = (capacity-sector)<<9;
562 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
563 			switch (read_for_csum(mdev, sector, size)) {
564 			case 0: /* Disk failure*/
565 				put_ldev(mdev);
566 				return 0;
567 			case 2: /* Allocation failed */
568 				drbd_rs_complete_io(mdev, sector);
569 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
570 				goto requeue;
571 			/* case 1: everything ok */
572 			}
573 		} else {
574 			inc_rs_pending(mdev);
575 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
576 					       sector, size, ID_SYNCER)) {
577 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
578 				dec_rs_pending(mdev);
579 				put_ldev(mdev);
580 				return 0;
581 			}
582 		}
583 	}
584 
585 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
586 		/* last syncer _request_ was sent,
587 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
588 		 * next sync group will resume), as soon as we receive the last
589 		 * resync data block, and the last bit is cleared.
590 		 * until then resync "work" is "inactive" ...
591 		 */
592 		mdev->resync_work.cb = w_resync_inactive;
593 		put_ldev(mdev);
594 		return 1;
595 	}
596 
597  requeue:
598 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
599 	put_ldev(mdev);
600 	return 1;
601 }
602 
603 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
604 {
605 	int number, i, size;
606 	sector_t sector;
607 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
608 
609 	if (unlikely(cancel))
610 		return 1;
611 
612 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
613 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
614 		return 0;
615 	}
616 
617 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
618 	if (atomic_read(&mdev->rs_pending_cnt) > number)
619 		goto requeue;
620 
621 	number -= atomic_read(&mdev->rs_pending_cnt);
622 
623 	sector = mdev->ov_position;
624 	for (i = 0; i < number; i++) {
625 		if (sector >= capacity) {
626 			mdev->resync_work.cb = w_resync_inactive;
627 			return 1;
628 		}
629 
630 		size = BM_BLOCK_SIZE;
631 
632 		if (drbd_try_rs_begin_io(mdev, sector)) {
633 			mdev->ov_position = sector;
634 			goto requeue;
635 		}
636 
637 		if (sector + (size>>9) > capacity)
638 			size = (capacity-sector)<<9;
639 
640 		inc_rs_pending(mdev);
641 		if (!drbd_send_ov_request(mdev, sector, size)) {
642 			dec_rs_pending(mdev);
643 			return 0;
644 		}
645 		sector += BM_SECT_PER_BIT;
646 	}
647 	mdev->ov_position = sector;
648 
649  requeue:
650 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
651 	return 1;
652 }
653 
654 
655 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
656 {
657 	kfree(w);
658 	ov_oos_print(mdev);
659 	drbd_resync_finished(mdev);
660 
661 	return 1;
662 }
663 
664 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
665 {
666 	kfree(w);
667 
668 	drbd_resync_finished(mdev);
669 
670 	return 1;
671 }
672 
673 int drbd_resync_finished(struct drbd_conf *mdev)
674 {
675 	unsigned long db, dt, dbdt;
676 	unsigned long n_oos;
677 	union drbd_state os, ns;
678 	struct drbd_work *w;
679 	char *khelper_cmd = NULL;
680 
681 	/* Remove all elements from the resync LRU. Future actions might
682 	 * set bits in the (main) bitmap, and then the entries in the
683 	 * resync LRU would be wrong. */
684 	if (drbd_rs_del_all(mdev)) {
685 		/* In case this is not possible now, most probably because
686 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
687 		 * queue (or the read operations for those packets have not
688 		 * finished yet), retry in 100ms. */
689 
690 		drbd_kick_lo(mdev);
691 		__set_current_state(TASK_INTERRUPTIBLE);
692 		schedule_timeout(HZ / 10);
693 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
694 		if (w) {
695 			w->cb = w_resync_finished;
696 			drbd_queue_work(&mdev->data.work, w);
697 			return 1;
698 		}
699 		dev_err(DEV, "Warning: drbd_rs_del_all() failed and kmalloc(w) failed as well.\n");
700 	}
701 
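	/* statistics for the log line below: dt is the resync duration in
	 * seconds (paused time excluded), db the number of bitmap bits
	 * (4 KiB blocks) covered, and dbdt the resulting average rate in
	 * KiB/s (Bit2KB() converts bits to KiB). */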
702 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
703 	if (dt <= 0)
704 		dt = 1;
705 	db = mdev->rs_total;
706 	dbdt = Bit2KB(db/dt);
707 	mdev->rs_paused /= HZ;
708 
709 	if (!get_ldev(mdev))
710 		goto out;
711 
712 	spin_lock_irq(&mdev->req_lock);
713 	os = mdev->state;
714 
715 	/* This protects us against multiple calls (that can happen in the presence
716 	   of application IO), and against connectivity loss just before we arrive here. */
717 	if (os.conn <= C_CONNECTED)
718 		goto out_unlock;
719 
720 	ns = os;
721 	ns.conn = C_CONNECTED;
722 
723 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
724 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
725 	     "Online verify " : "Resync",
726 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
727 
728 	n_oos = drbd_bm_total_weight(mdev);
729 
730 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
731 		if (n_oos) {
732 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
733 			      n_oos, Bit2KB(1));
734 			khelper_cmd = "out-of-sync";
735 		}
736 	} else {
737 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
738 
739 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
740 			khelper_cmd = "after-resync-target";
741 
742 		if (mdev->csums_tfm && mdev->rs_total) {
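			/* the ratio below is s*100/t in both branches, i.e. the
			 * percentage of blocks whose checksums matched; the second
			 * form presumably just avoids overflowing s*100 for very
			 * large t */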
743 			const unsigned long s = mdev->rs_same_csum;
744 			const unsigned long t = mdev->rs_total;
745 			const int ratio =
746 				(t == 0)     ? 0 :
747 				(t < 100000) ? ((s*100)/t) : (s/(t/100));
748 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
749 			     "transferred %luK total %luK\n",
750 			     ratio,
751 			     Bit2KB(mdev->rs_same_csum),
752 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
753 			     Bit2KB(mdev->rs_total));
754 		}
755 	}
756 
757 	if (mdev->rs_failed) {
758 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
759 
760 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
761 			ns.disk = D_INCONSISTENT;
762 			ns.pdsk = D_UP_TO_DATE;
763 		} else {
764 			ns.disk = D_UP_TO_DATE;
765 			ns.pdsk = D_INCONSISTENT;
766 		}
767 	} else {
768 		ns.disk = D_UP_TO_DATE;
769 		ns.pdsk = D_UP_TO_DATE;
770 
771 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
772 			if (mdev->p_uuid) {
773 				int i;
774 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
775 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
776 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
777 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
778 			} else {
779 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
780 			}
781 		}
782 
783 		drbd_uuid_set_bm(mdev, 0UL);
784 
785 		if (mdev->p_uuid) {
786 			/* Now the two UUID sets are equal, update what we
787 			 * know of the peer. */
788 			int i;
789 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
790 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
791 		}
792 	}
793 
794 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
795 out_unlock:
796 	spin_unlock_irq(&mdev->req_lock);
797 	put_ldev(mdev);
798 out:
799 	mdev->rs_total  = 0;
800 	mdev->rs_failed = 0;
801 	mdev->rs_paused = 0;
802 	mdev->ov_start_sector = 0;
803 
804 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
805 		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
806 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
807 	}
808 
809 	if (khelper_cmd)
810 		drbd_khelper(mdev, khelper_cmd);
811 
812 	return 1;
813 }
814 
815 /* helper */
816 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
817 {
818 	if (drbd_ee_has_active_page(e)) {
819 		/* This might happen if sendpage() has not finished */
820 		spin_lock_irq(&mdev->req_lock);
821 		list_add_tail(&e->w.list, &mdev->net_ee);
822 		spin_unlock_irq(&mdev->req_lock);
823 	} else
824 		drbd_free_ee(mdev, e);
825 }
826 
827 /**
828  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
829  * @mdev:	DRBD device.
830  * @w:		work object.
831  * @cancel:	The connection will be closed anyway
832  */
833 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
834 {
835 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
836 	int ok;
837 
838 	if (unlikely(cancel)) {
839 		drbd_free_ee(mdev, e);
840 		dec_unacked(mdev);
841 		return 1;
842 	}
843 
844 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
845 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
846 	} else {
847 		if (__ratelimit(&drbd_ratelimit_state))
848 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
849 			    (unsigned long long)e->sector);
850 
851 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
852 	}
853 
854 	dec_unacked(mdev);
855 
856 	move_to_net_ee_or_free(mdev, e);
857 
858 	if (unlikely(!ok))
859 		dev_err(DEV, "drbd_send_block() failed\n");
860 	return ok;
861 }
862 
863 /**
864  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
865  * @mdev:	DRBD device.
866  * @w:		work object.
867  * @cancel:	The connection will be closed anyway
868  */
869 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
870 {
871 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
872 	int ok;
873 
874 	if (unlikely(cancel)) {
875 		drbd_free_ee(mdev, e);
876 		dec_unacked(mdev);
877 		return 1;
878 	}
879 
880 	if (get_ldev_if_state(mdev, D_FAILED)) {
881 		drbd_rs_complete_io(mdev, e->sector);
882 		put_ldev(mdev);
883 	}
884 
885 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
886 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
887 			inc_rs_pending(mdev);
888 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
889 		} else {
890 			if (__ratelimit(&drbd_ratelimit_state))
891 				dev_err(DEV, "Not sending RSDataReply, "
892 				    "partner DISKLESS!\n");
893 			ok = 1;
894 		}
895 	} else {
896 		if (__ratelimit(&drbd_ratelimit_state))
897 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
898 			    (unsigned long long)e->sector);
899 
900 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
901 
902 		/* update resync data with failure */
903 		drbd_rs_failed_io(mdev, e->sector, e->size);
904 	}
905 
906 	dec_unacked(mdev);
907 
908 	move_to_net_ee_or_free(mdev, e);
909 
910 	if (unlikely(!ok))
911 		dev_err(DEV, "drbd_send_block() failed\n");
912 	return ok;
913 }
914 
915 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
916 {
917 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
918 	struct digest_info *di;
919 	int digest_size;
920 	void *digest = NULL;
921 	int ok, eq = 0;
922 
923 	if (unlikely(cancel)) {
924 		drbd_free_ee(mdev, e);
925 		dec_unacked(mdev);
926 		return 1;
927 	}
928 
929 	drbd_rs_complete_io(mdev, e->sector);
930 
931 	di = (struct digest_info *)(unsigned long)e->block_id;
932 
933 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
934 		/* quick hack to try to avoid a race against reconfiguration.
935 		 * a real fix would be much more involved,
936 		 * introducing more locking mechanisms */
937 		if (mdev->csums_tfm) {
938 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
939 			D_ASSERT(digest_size == di->digest_size);
940 			digest = kmalloc(digest_size, GFP_NOIO);
941 		}
942 		if (digest) {
943 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
944 			eq = !memcmp(digest, di->digest, digest_size);
945 			kfree(digest);
946 		}
947 
948 		if (eq) {
949 			drbd_set_in_sync(mdev, e->sector, e->size);
950 			/* rs_same_csums unit is BM_BLOCK_SIZE */
951 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
952 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
953 		} else {
954 			inc_rs_pending(mdev);
955 			e->block_id = ID_SYNCER;
956 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
957 		}
958 	} else {
959 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
960 		if (__ratelimit(&drbd_ratelimit_state))
961 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
962 	}
963 
964 	dec_unacked(mdev);
965 
966 	kfree(di);
967 
968 	move_to_net_ee_or_free(mdev, e);
969 
970 	if (unlikely(!ok))
971 		dev_err(DEV, "drbd_send_block/ack() failed\n");
972 	return ok;
973 }
974 
975 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
976 {
977 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
978 	int digest_size;
979 	void *digest;
980 	int ok = 1;
981 
982 	if (unlikely(cancel))
983 		goto out;
984 
985 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
986 		goto out;
987 
988 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
989 	/* FIXME if this allocation fails, online verify will not terminate! */
990 	digest = kmalloc(digest_size, GFP_NOIO);
991 	if (digest) {
992 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
993 		inc_rs_pending(mdev);
994 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
995 					     digest, digest_size, P_OV_REPLY);
996 		if (!ok)
997 			dec_rs_pending(mdev);
998 		kfree(digest);
999 	}
1000 
1001 out:
1002 	drbd_free_ee(mdev, e);
1003 
1004 	dec_unacked(mdev);
1005 
1006 	return ok;
1007 }
1008 
1009 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1010 {
1011 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1012 		mdev->ov_last_oos_size += size>>9;
1013 	} else {
1014 		mdev->ov_last_oos_start = sector;
1015 		mdev->ov_last_oos_size = size>>9;
1016 	}
1017 	drbd_set_out_of_sync(mdev, sector, size);
1018 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1019 }
1020 
1021 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1022 {
1023 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1024 	struct digest_info *di;
1025 	int digest_size;
1026 	void *digest;
1027 	int ok, eq = 0;
1028 
1029 	if (unlikely(cancel)) {
1030 		drbd_free_ee(mdev, e);
1031 		dec_unacked(mdev);
1032 		return 1;
1033 	}
1034 
1035 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1036 	 * the resync lru has been cleaned up already */
1037 	drbd_rs_complete_io(mdev, e->sector);
1038 
1039 	di = (struct digest_info *)(unsigned long)e->block_id;
1040 
1041 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1042 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1043 		digest = kmalloc(digest_size, GFP_NOIO);
1044 		if (digest) {
1045 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1046 
1047 			D_ASSERT(digest_size == di->digest_size);
1048 			eq = !memcmp(digest, di->digest, digest_size);
1049 			kfree(digest);
1050 		}
1051 	} else {
1052 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1053 		if (__ratelimit(&drbd_ratelimit_state))
1054 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1055 	}
1056 
1057 	dec_unacked(mdev);
1058 
1059 	kfree(di);
1060 
1061 	if (!eq)
1062 		drbd_ov_oos_found(mdev, e->sector, e->size);
1063 	else
1064 		ov_oos_print(mdev);
1065 
1066 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1067 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1068 
1069 	drbd_free_ee(mdev, e);
1070 
1071 	if (--mdev->ov_left == 0) {
1072 		ov_oos_print(mdev);
1073 		drbd_resync_finished(mdev);
1074 	}
1075 
1076 	return ok;
1077 }
1078 
1079 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1080 {
1081 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1082 	complete(&b->done);
1083 	return 1;
1084 }
1085 
1086 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1087 {
1088 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1089 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1090 	int ok = 1;
1091 
1092 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1093 	 * just before it was reassigned and re-queued, so double check that.
1094 	 * actually, this race was harmless, since we only try to send the
1095 	 * barrier packet here, and otherwise do nothing with the object.
1096 	 * but compare with the head of w_clear_epoch */
1097 	spin_lock_irq(&mdev->req_lock);
1098 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1099 		cancel = 1;
1100 	spin_unlock_irq(&mdev->req_lock);
1101 	if (cancel)
1102 		return 1;
1103 
1104 	if (!drbd_get_data_sock(mdev))
1105 		return 0;
1106 	p->barrier = b->br_number;
1107 	/* inc_ap_pending was done where this was queued.
1108 	 * dec_ap_pending will be done in got_BarrierAck
1109 	 * or (on connection loss) in w_clear_epoch.  */
1110 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1111 				(struct p_header *)p, sizeof(*p), 0);
1112 	drbd_put_data_sock(mdev);
1113 
1114 	return ok;
1115 }
1116 
1117 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1118 {
1119 	if (cancel)
1120 		return 1;
1121 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1122 }
1123 
1124 /**
1125  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1126  * @mdev:	DRBD device.
1127  * @w:		work object.
1128  * @cancel:	The connection will be closed anyway
1129  */
1130 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1131 {
1132 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1133 	int ok;
1134 
1135 	if (unlikely(cancel)) {
1136 		req_mod(req, send_canceled);
1137 		return 1;
1138 	}
1139 
1140 	ok = drbd_send_dblock(mdev, req);
1141 	req_mod(req, ok ? handed_over_to_network : send_failed);
1142 
1143 	return ok;
1144 }
1145 
1146 /**
1147  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1148  * @mdev:	DRBD device.
1149  * @w:		work object.
1150  * @cancel:	The connection will be closed anyway
1151  */
1152 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1153 {
1154 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1155 	int ok;
1156 
1157 	if (unlikely(cancel)) {
1158 		req_mod(req, send_canceled);
1159 		return 1;
1160 	}
1161 
1162 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1163 				(unsigned long)req);
1164 
1165 	if (!ok) {
1166 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1167 		 * so this is probably redundant */
1168 		if (mdev->state.conn >= C_CONNECTED)
1169 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1170 	}
1171 	req_mod(req, ok ? handed_over_to_network : send_failed);
1172 
1173 	return ok;
1174 }
1175 
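/* Returns 1 if mdev may resync now, 0 if it has to wait: walk the sync-after
 * chain (sync_conf.after is the minor number we depend on) and report "wait"
 * as soon as any device up the chain is itself resyncing or has one of the
 * pause flags (aftr_isp/peer_isp/user_isp) set. */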
1176 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1177 {
1178 	struct drbd_conf *odev = mdev;
1179 
1180 	while (1) {
1181 		if (odev->sync_conf.after == -1)
1182 			return 1;
1183 		odev = minor_to_mdev(odev->sync_conf.after);
1184 		ERR_IF(!odev) return 1;
1185 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1186 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1187 		    odev->state.aftr_isp || odev->state.peer_isp ||
1188 		    odev->state.user_isp)
1189 			return 0;
1190 	}
1191 }
1192 
1193 /**
1194  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1195  * @mdev:	DRBD device.
1196  *
1197  * Called from process context only (admin command and after_state_ch).
1198  */
1199 static int _drbd_pause_after(struct drbd_conf *mdev)
1200 {
1201 	struct drbd_conf *odev;
1202 	int i, rv = 0;
1203 
1204 	for (i = 0; i < minor_count; i++) {
1205 		odev = minor_to_mdev(i);
1206 		if (!odev)
1207 			continue;
1208 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1209 			continue;
1210 		if (!_drbd_may_sync_now(odev))
1211 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1212 			       != SS_NOTHING_TO_DO);
1213 	}
1214 
1215 	return rv;
1216 }
1217 
1218 /**
1219  * _drbd_resume_next() - Resume resync on all devices that may resync now
1220  * @mdev:	DRBD device.
1221  *
1222  * Called from process context only (admin command and worker).
1223  */
1224 static int _drbd_resume_next(struct drbd_conf *mdev)
1225 {
1226 	struct drbd_conf *odev;
1227 	int i, rv = 0;
1228 
1229 	for (i = 0; i < minor_count; i++) {
1230 		odev = minor_to_mdev(i);
1231 		if (!odev)
1232 			continue;
1233 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1234 			continue;
1235 		if (odev->state.aftr_isp) {
1236 			if (_drbd_may_sync_now(odev))
1237 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1238 							CS_HARD, NULL)
1239 				       != SS_NOTHING_TO_DO) ;
1240 		}
1241 	}
1242 	return rv;
1243 }
1244 
1245 void resume_next_sg(struct drbd_conf *mdev)
1246 {
1247 	write_lock_irq(&global_state_lock);
1248 	_drbd_resume_next(mdev);
1249 	write_unlock_irq(&global_state_lock);
1250 }
1251 
1252 void suspend_other_sg(struct drbd_conf *mdev)
1253 {
1254 	write_lock_irq(&global_state_lock);
1255 	_drbd_pause_after(mdev);
1256 	write_unlock_irq(&global_state_lock);
1257 }
1258 
1259 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1260 {
1261 	struct drbd_conf *odev;
1262 
1263 	if (o_minor == -1)
1264 		return NO_ERROR;
1265 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1266 		return ERR_SYNC_AFTER;
1267 
1268 	/* check for loops */
1269 	odev = minor_to_mdev(o_minor);
1270 	while (1) {
1271 		if (odev == mdev)
1272 			return ERR_SYNC_AFTER_CYCLE;
1273 
1274 		/* dependency chain ends here, no cycles. */
1275 		if (odev->sync_conf.after == -1)
1276 			return NO_ERROR;
1277 
1278 		/* follow the dependency chain */
1279 		odev = minor_to_mdev(odev->sync_conf.after);
1280 	}
1281 }
1282 
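/* drbd_alter_sa() changes the sync-after dependency of mdev and then iterates
 * _drbd_pause_after()/_drbd_resume_next() until neither changes any state any
 * more (a fixpoint), since resuming one device may in turn require pausing
 * another one. */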
1283 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1284 {
1285 	int changes;
1286 	int retcode;
1287 
1288 	write_lock_irq(&global_state_lock);
1289 	retcode = sync_after_error(mdev, na);
1290 	if (retcode == NO_ERROR) {
1291 		mdev->sync_conf.after = na;
1292 		do {
1293 			changes  = _drbd_pause_after(mdev);
1294 			changes |= _drbd_resume_next(mdev);
1295 		} while (changes);
1296 	}
1297 	write_unlock_irq(&global_state_lock);
1298 	return retcode;
1299 }
1300 
1301 static void ping_peer(struct drbd_conf *mdev)
1302 {
1303 	clear_bit(GOT_PING_ACK, &mdev->flags);
1304 	request_ping(mdev);
1305 	wait_event(mdev->misc_wait,
1306 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1307 }
1308 
1309 /**
1310  * drbd_start_resync() - Start the resync process
1311  * @mdev:	DRBD device.
1312  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1313  *
1314  * This function might bring you directly into one of the
1315  * C_PAUSED_SYNC_* states.
1316  */
1317 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1318 {
1319 	union drbd_state ns;
1320 	int r;
1321 
1322 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1323 		dev_err(DEV, "Resync already running!\n");
1324 		return;
1325 	}
1326 
1327 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1328 	drbd_rs_cancel_all(mdev);
1329 
1330 	if (side == C_SYNC_TARGET) {
1331 		/* Since application IO was locked out during C_WF_BITMAP_T and
1332 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1333 		   we check whether we may make the data inconsistent. */
1334 		r = drbd_khelper(mdev, "before-resync-target");
1335 		r = (r >> 8) & 0xff;
1336 		if (r > 0) {
1337 			dev_info(DEV, "before-resync-target handler returned %d, "
1338 			     "dropping connection.\n", r);
1339 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1340 			return;
1341 		}
1342 	}
1343 
1344 	drbd_state_lock(mdev);
1345 
1346 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1347 		drbd_state_unlock(mdev);
1348 		return;
1349 	}
1350 
1351 	if (side == C_SYNC_TARGET) {
1352 		mdev->bm_resync_fo = 0;
1353 	} else /* side == C_SYNC_SOURCE */ {
1354 		u64 uuid;
1355 
1356 		get_random_bytes(&uuid, sizeof(u64));
1357 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1358 		drbd_send_sync_uuid(mdev, uuid);
1359 
1360 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1361 	}
1362 
1363 	write_lock_irq(&global_state_lock);
1364 	ns = mdev->state;
1365 
1366 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1367 
1368 	ns.conn = side;
1369 
1370 	if (side == C_SYNC_TARGET)
1371 		ns.disk = D_INCONSISTENT;
1372 	else /* side == C_SYNC_SOURCE */
1373 		ns.pdsk = D_INCONSISTENT;
1374 
1375 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1376 	ns = mdev->state;
1377 
1378 	if (ns.conn < C_CONNECTED)
1379 		r = SS_UNKNOWN_ERROR;
1380 
1381 	if (r == SS_SUCCESS) {
1382 		mdev->rs_total     =
1383 		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1384 		mdev->rs_failed    = 0;
1385 		mdev->rs_paused    = 0;
1386 		mdev->rs_start     =
1387 		mdev->rs_mark_time = jiffies;
1388 		mdev->rs_same_csum = 0;
1389 		_drbd_pause_after(mdev);
1390 	}
1391 	write_unlock_irq(&global_state_lock);
1392 	put_ldev(mdev);
1393 
1394 	if (r == SS_SUCCESS) {
1395 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1396 		     drbd_conn_str(ns.conn),
1397 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1398 		     (unsigned long) mdev->rs_total);
1399 
1400 		if (mdev->rs_total == 0) {
1401 			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1402 			ping_peer(mdev);
1403 			drbd_resync_finished(mdev);
1404 		}
1405 
1406 		/* ns.conn may already be != mdev->state.conn,
1407 		 * we may have been paused in between, or become paused until
1408 		 * the timer triggers.
1409 		 * No matter, that is handled in resync_timer_fn() */
1410 		if (ns.conn == C_SYNC_TARGET)
1411 			mod_timer(&mdev->resync_timer, jiffies);
1412 
1413 		drbd_md_sync(mdev);
1414 	}
1415 	drbd_state_unlock(mdev);
1416 }
1417 
1418 int drbd_worker(struct drbd_thread *thi)
1419 {
1420 	struct drbd_conf *mdev = thi->mdev;
1421 	struct drbd_work *w = NULL;
1422 	LIST_HEAD(work_list);
1423 	int intr = 0, i;
1424 
1425 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1426 
1427 	while (get_t_state(thi) == Running) {
1428 		drbd_thread_current_set_cpu(mdev);
1429 
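		/* no work queued right now: uncork the data socket so anything
		 * batched so far actually goes out, sleep until new work (or a
		 * signal) arrives, then cork again to keep batching packets. */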
1430 		if (down_trylock(&mdev->data.work.s)) {
1431 			mutex_lock(&mdev->data.mutex);
1432 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1433 				drbd_tcp_uncork(mdev->data.socket);
1434 			mutex_unlock(&mdev->data.mutex);
1435 
1436 			intr = down_interruptible(&mdev->data.work.s);
1437 
1438 			mutex_lock(&mdev->data.mutex);
1439 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1440 				drbd_tcp_cork(mdev->data.socket);
1441 			mutex_unlock(&mdev->data.mutex);
1442 		}
1443 
1444 		if (intr) {
1445 			D_ASSERT(intr == -EINTR);
1446 			flush_signals(current);
1447 			ERR_IF (get_t_state(thi) == Running)
1448 				continue;
1449 			break;
1450 		}
1451 
1452 		if (get_t_state(thi) != Running)
1453 			break;
1454 		/* With this break, we have done a down() but not consumed
1455 		   the entry from the list. The cleanup code takes care of
1456 		   this...   */
1457 
1458 		w = NULL;
1459 		spin_lock_irq(&mdev->data.work.q_lock);
1460 		ERR_IF(list_empty(&mdev->data.work.q)) {
1461 			/* something terribly wrong in our logic.
1462 			 * we were able to down() the semaphore,
1463 			 * but the list is empty... doh.
1464 			 *
1465 			 * what is the best thing to do now?
1466 			 * try again from scratch, restarting the receiver,
1467 			 * asender, whatnot? could break even more ugly,
1468 			 * e.g. when we are primary, but no good local data.
1469 			 *
1470 			 * I'll try to get away just starting over this loop.
1471 			 */
1472 			spin_unlock_irq(&mdev->data.work.q_lock);
1473 			continue;
1474 		}
1475 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1476 		list_del_init(&w->list);
1477 		spin_unlock_irq(&mdev->data.work.q_lock);
1478 
1479 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1480 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1481 			if (mdev->state.conn >= C_CONNECTED)
1482 				drbd_force_state(mdev,
1483 						NS(conn, C_NETWORK_FAILURE));
1484 		}
1485 	}
1486 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1487 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1488 
1489 	spin_lock_irq(&mdev->data.work.q_lock);
1490 	i = 0;
1491 	while (!list_empty(&mdev->data.work.q)) {
1492 		list_splice_init(&mdev->data.work.q, &work_list);
1493 		spin_unlock_irq(&mdev->data.work.q_lock);
1494 
1495 		while (!list_empty(&work_list)) {
1496 			w = list_entry(work_list.next, struct drbd_work, list);
1497 			list_del_init(&w->list);
1498 			w->cb(mdev, w, 1);
1499 			i++; /* dead debugging code */
1500 		}
1501 
1502 		spin_lock_irq(&mdev->data.work.q_lock);
1503 	}
1504 	sema_init(&mdev->data.work.s, 0);
1505 	/* DANGEROUS race: if someone queued their work while holding the spinlock,
1506 	 * but called up() outside of it, we could get an up() on the
1507 	 * semaphore without corresponding list entry.
1508 	 * So don't do that.
1509 	 */
1510 	spin_unlock_irq(&mdev->data.work.q_lock);
1511 
1512 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1513 	/* _drbd_set_state only uses stop_nowait.
1514 	 * wait here for the Exiting receiver. */
1515 	drbd_thread_stop(&mdev->receiver);
1516 	drbd_mdev_cleanup(mdev);
1517 
1518 	dev_info(DEV, "worker terminated\n");
1519 
1520 	clear_bit(DEVICE_DYING, &mdev->flags);
1521 	clear_bit(CONFIG_PENDING, &mdev->flags);
1522 	wake_up(&mdev->state_wait);
1523 
1524 	return 0;
1525 }
1526