xref: /linux/drivers/block/drbd/drbd_worker.c (revision b3b77c8caef1750ebeea1054e39e358550ea9f55)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38 
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41 
42 #define SLEEP_TIME (HZ/10)
43 
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45 
46 
47 
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_sec
51    drbd_endio_pri
52 
53  * more endio handlers:
54    atodb_endio in drbd_actlog.c
55    drbd_bm_async_io_complete in drbd_bitmap.c
56 
57  * For all these callbacks, note the following:
58  * The callbacks will be called in irq context by the IDE drivers,
59  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60  * Try to get the locking right :)
61  *
62  */
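/* In particular, since they may run with hard interrupts already disabled,
 * the handlers below take req_lock with spin_lock_irqsave()/
 * spin_unlock_irqrestore() rather than spin_lock_irq(), and defer the
 * longer-running work to the worker and asender threads. */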
63 
64 
65 /* About the global_state_lock
66    Each state transition on a device holds a read lock. In case we have
67    to evaluate the sync-after dependencies, we grab a write lock, because
68    we need stable states on all devices for that.  */
69 rwlock_t global_state_lock;
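/* The write-lock side can be seen below in resume_next_sg(), suspend_other_sg(),
 * drbd_alter_sa() and drbd_start_resync(), which re-evaluate the sync-after
 * dependencies under write_lock_irq(&global_state_lock). */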
70 
71 /* used for synchronous meta data and bitmap IO
72  * submitted by drbd_md_sync_page_io()
73  */
74 void drbd_md_io_complete(struct bio *bio, int error)
75 {
76 	struct drbd_md_io *md_io;
77 
78 	md_io = (struct drbd_md_io *)bio->bi_private;
79 	md_io->error = error;
80 
81 	complete(&md_io->event);
82 }
83 
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
88 {
89 	unsigned long flags = 0;
90 	struct drbd_conf *mdev = e->mdev;
91 
92 	D_ASSERT(e->block_id != ID_VACANT);
93 
94 	spin_lock_irqsave(&mdev->req_lock, flags);
95 	mdev->read_cnt += e->size >> 9;
96 	list_del(&e->w.list);
97 	if (list_empty(&mdev->read_ee))
98 		wake_up(&mdev->ee_wait);
99 	if (test_bit(__EE_WAS_ERROR, &e->flags))
100 		__drbd_chk_io_error(mdev, FALSE);
101 	spin_unlock_irqrestore(&mdev->req_lock, flags);
102 
103 	drbd_queue_work(&mdev->data.work, &e->w);
104 	put_ldev(mdev);
105 }
106 
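/* True only for a barrier request that failed and has not been resubmitted
 * yet: EE_IS_BARRIER and EE_WAS_ERROR set, EE_RESUBMITTED still clear. */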
107 static int is_failed_barrier(int ee_flags)
108 {
109 	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110 			== (EE_IS_BARRIER|EE_WAS_ERROR);
111 }
112 
113 /* writes on behalf of the partner, or resync writes,
114  * "submitted" by the receiver, final stage.  */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
116 {
117 	unsigned long flags = 0;
118 	struct drbd_conf *mdev = e->mdev;
119 	sector_t e_sector;
120 	int do_wake;
121 	int is_syncer_req;
122 	int do_al_complete_io;
123 
124 	/* if this is a failed barrier request, disable use of barriers,
125 	 * and schedule for resubmission */
126 	if (is_failed_barrier(e->flags)) {
127 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
128 		spin_lock_irqsave(&mdev->req_lock, flags);
129 		list_del(&e->w.list);
130 		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131 		e->w.cb = w_e_reissue;
132 		/* put_ldev actually happens below, once we come here again. */
133 		__release(local);
134 		spin_unlock_irqrestore(&mdev->req_lock, flags);
135 		drbd_queue_work(&mdev->data.work, &e->w);
136 		return;
137 	}
138 
139 	D_ASSERT(e->block_id != ID_VACANT);
140 
141 	/* after we moved e to done_ee,
142 	 * we may no longer access it,
143 	 * it may be freed/reused already!
144 	 * (as soon as we release the req_lock) */
145 	e_sector = e->sector;
146 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147 	is_syncer_req = is_syncer_block_id(e->block_id);
148 
149 	spin_lock_irqsave(&mdev->req_lock, flags);
150 	mdev->writ_cnt += e->size >> 9;
151 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
152 	list_add_tail(&e->w.list, &mdev->done_ee);
153 
154 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
155 	 * neither did we wake possibly waiting conflicting requests.
156 	 * done from "drbd_process_done_ee" within the appropriate w.cb
157 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
158 
159 	do_wake = is_syncer_req
160 		? list_empty(&mdev->sync_ee)
161 		: list_empty(&mdev->active_ee);
162 
163 	if (test_bit(__EE_WAS_ERROR, &e->flags))
164 		__drbd_chk_io_error(mdev, FALSE);
165 	spin_unlock_irqrestore(&mdev->req_lock, flags);
166 
167 	if (is_syncer_req)
168 		drbd_rs_complete_io(mdev, e_sector);
169 
170 	if (do_wake)
171 		wake_up(&mdev->ee_wait);
172 
173 	if (do_al_complete_io)
174 		drbd_al_complete_io(mdev, e_sector);
175 
176 	wake_asender(mdev);
177 	put_ldev(mdev);
178 }
179 
180 /* writes on behalf of the partner, or resync writes,
181  * "submitted" by the receiver.
182  */
183 void drbd_endio_sec(struct bio *bio, int error)
184 {
185 	struct drbd_epoch_entry *e = bio->bi_private;
186 	struct drbd_conf *mdev = e->mdev;
187 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
188 	int is_write = bio_data_dir(bio) == WRITE;
189 
190 	if (error)
191 		dev_warn(DEV, "%s: error=%d s=%llus\n",
192 				is_write ? "write" : "read", error,
193 				(unsigned long long)e->sector);
194 	if (!error && !uptodate) {
195 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196 				is_write ? "write" : "read",
197 				(unsigned long long)e->sector);
198 		/* strange behavior of some lower level drivers...
199 		 * fail the request by clearing the uptodate flag,
200 		 * but do not return any error?! */
201 		error = -EIO;
202 	}
203 
204 	if (error)
205 		set_bit(__EE_WAS_ERROR, &e->flags);
206 
207 	bio_put(bio); /* no need for the bio anymore */
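	/* An epoch entry may have been split into several bios (multi-bio EEs,
	 * see w_make_resync_request()); only the completion of the last
	 * pending bio finalizes the epoch entry. */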
208 	if (atomic_dec_and_test(&e->pending_bios)) {
209 		if (is_write)
210 			drbd_endio_write_sec_final(e);
211 		else
212 			drbd_endio_read_sec_final(e);
213 	}
214 }
215 
216 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217  */
218 void drbd_endio_pri(struct bio *bio, int error)
219 {
220 	unsigned long flags;
221 	struct drbd_request *req = bio->bi_private;
222 	struct drbd_conf *mdev = req->mdev;
223 	struct bio_and_error m;
224 	enum drbd_req_event what;
225 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
226 
227 	if (error)
228 		dev_warn(DEV, "p %s: error=%d\n",
229 			 bio_data_dir(bio) == WRITE ? "write" : "read", error);
230 	if (!error && !uptodate) {
231 		dev_warn(DEV, "p %s: setting error to -EIO\n",
232 			 bio_data_dir(bio) == WRITE ? "write" : "read");
233 		/* strange behavior of some lower level drivers...
234 		 * fail the request by clearing the uptodate flag,
235 		 * but do not return any error?! */
236 		error = -EIO;
237 	}
238 
239 	/* to avoid recursion in __req_mod */
240 	if (unlikely(error)) {
241 		what = (bio_data_dir(bio) == WRITE)
242 			? write_completed_with_error
243 			: (bio_rw(bio) == READ)
244 			  ? read_completed_with_error
245 			  : read_ahead_completed_with_error;
246 	} else
247 		what = completed_ok;
248 
249 	bio_put(req->private_bio);
250 	req->private_bio = ERR_PTR(error);
251 
252 	spin_lock_irqsave(&mdev->req_lock, flags);
253 	__req_mod(req, what, &m);
254 	spin_unlock_irqrestore(&mdev->req_lock, flags);
255 
256 	if (m.bio)
257 		complete_master_bio(mdev, &m);
258 }
259 
260 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
261 {
262 	struct drbd_request *req = container_of(w, struct drbd_request, w);
263 
264 	/* NOTE: mdev->ldev can be NULL by the time we get here! */
265 	/* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
266 
267 	/* the only way this callback is scheduled is from _req_may_be_done,
268 	 * when it is done and had a local write error, see comments there */
269 	drbd_req_free(req);
270 
271 	return TRUE;
272 }
273 
274 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
275 {
276 	struct drbd_request *req = container_of(w, struct drbd_request, w);
277 
278 	/* We should not detach for read io-error,
279 	 * but try to WRITE the P_DATA_REPLY to the failed location,
280 	 * to give the disk the chance to relocate that block */
281 
282 	spin_lock_irq(&mdev->req_lock);
283 	if (cancel ||
284 	    mdev->state.conn < C_CONNECTED ||
285 	    mdev->state.pdsk <= D_INCONSISTENT) {
286 		_req_mod(req, send_canceled);
287 		spin_unlock_irq(&mdev->req_lock);
288 		dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
289 		return 1;
290 	}
291 	spin_unlock_irq(&mdev->req_lock);
292 
293 	return w_send_read_req(mdev, w, 0);
294 }
295 
296 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
297 {
298 	ERR_IF(cancel) return 1;
299 	dev_err(DEV, "resync inactive, but callback triggered??\n");
300 	return 1; /* Simply ignore this! */
301 }
302 
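/* Hash the page chain of an epoch entry: every page but the last is fully
 * used, the last one only up to whatever remains of e->size. */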
303 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
304 {
305 	struct hash_desc desc;
306 	struct scatterlist sg;
307 	struct page *page = e->pages;
308 	struct page *tmp;
309 	unsigned len;
310 
311 	desc.tfm = tfm;
312 	desc.flags = 0;
313 
314 	sg_init_table(&sg, 1);
315 	crypto_hash_init(&desc);
316 
317 	while ((tmp = page_chain_next(page))) {
318 		/* all but the last page will be fully used */
319 		sg_set_page(&sg, page, PAGE_SIZE, 0);
320 		crypto_hash_update(&desc, &sg, sg.length);
321 		page = tmp;
322 	}
323 	/* and now the last, possibly only partially used page */
324 	len = e->size & (PAGE_SIZE - 1);
325 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
326 	crypto_hash_update(&desc, &sg, sg.length);
327 	crypto_hash_final(&desc, digest);
328 }
329 
330 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
331 {
332 	struct hash_desc desc;
333 	struct scatterlist sg;
334 	struct bio_vec *bvec;
335 	int i;
336 
337 	desc.tfm = tfm;
338 	desc.flags = 0;
339 
340 	sg_init_table(&sg, 1);
341 	crypto_hash_init(&desc);
342 
343 	__bio_for_each_segment(bvec, bio, i, 0) {
344 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
345 		crypto_hash_update(&desc, &sg, sg.length);
346 	}
347 	crypto_hash_final(&desc, digest);
348 }
349 
350 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
351 {
352 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
353 	int digest_size;
354 	void *digest;
355 	int ok;
356 
357 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
358 
359 	if (unlikely(cancel)) {
360 		drbd_free_ee(mdev, e);
361 		return 1;
362 	}
363 
364 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
365 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
366 		digest = kmalloc(digest_size, GFP_NOIO);
367 		if (digest) {
368 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
369 
370 			inc_rs_pending(mdev);
371 			ok = drbd_send_drequest_csum(mdev,
372 						     e->sector,
373 						     e->size,
374 						     digest,
375 						     digest_size,
376 						     P_CSUM_RS_REQUEST);
377 			kfree(digest);
378 		} else {
379 			dev_err(DEV, "kmalloc() of digest failed.\n");
380 			ok = 0;
381 		}
382 	} else
383 		ok = 1;
384 
385 	drbd_free_ee(mdev, e);
386 
387 	if (unlikely(!ok))
388 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
389 	return ok;
390 }
391 
392 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
393 
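/* Returns 0 if no local disk is available, 1 if the checksum read was
 * submitted, and 2 if allocating or submitting the epoch entry failed
 * (the caller retries later in that case). */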
394 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
395 {
396 	struct drbd_epoch_entry *e;
397 
398 	if (!get_ldev(mdev))
399 		return 0;
400 
401 	/* GFP_TRY, because if there is no memory available right now, this may
402 	 * be rescheduled for later. It is "only" background resync, after all. */
403 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
404 	if (!e)
405 		goto fail;
406 
407 	spin_lock_irq(&mdev->req_lock);
408 	list_add(&e->w.list, &mdev->read_ee);
409 	spin_unlock_irq(&mdev->req_lock);
410 
411 	e->w.cb = w_e_send_csum;
412 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
413 		return 1;
414 
415 	drbd_free_ee(mdev, e);
416 fail:
417 	put_ldev(mdev);
418 	return 2;
419 }
420 
421 void resync_timer_fn(unsigned long data)
422 {
423 	unsigned long flags;
424 	struct drbd_conf *mdev = (struct drbd_conf *) data;
425 	int queue;
426 
427 	spin_lock_irqsave(&mdev->req_lock, flags);
428 
429 	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
430 		queue = 1;
431 		if (mdev->state.conn == C_VERIFY_S)
432 			mdev->resync_work.cb = w_make_ov_request;
433 		else
434 			mdev->resync_work.cb = w_make_resync_request;
435 	} else {
436 		queue = 0;
437 		mdev->resync_work.cb = w_resync_inactive;
438 	}
439 
440 	spin_unlock_irqrestore(&mdev->req_lock, flags);
441 
442 	/* harmless race: list_empty outside data.work.q_lock */
443 	if (list_empty(&mdev->resync_work.list) && queue)
444 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
445 }
446 
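/* Throttle the configured resync rate based on the measured data delay:
 * full sync_conf.rate while the delay is at or below throttle_th, zero at
 * or above hold_off_th, and a linear ramp in between.
 * E.g. (hypothetical values) rate = 10000 KB/s, throttle_th = 20 (2 s),
 * hold_off_th = 100 (10 s) and a measured delay of 6 s give
 * 10000 + 10000 * (2000 - 6000) / (10000 - 2000) = 5000 KB/s. */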
447 static int calc_resync_rate(struct drbd_conf *mdev)
448 {
449 	int d = mdev->data_delay / 1000; /* us -> ms */
450 	int td = mdev->sync_conf.throttle_th * 100;  /* 0.1s -> ms */
451 	int hd = mdev->sync_conf.hold_off_th * 100;  /* 0.1s -> ms */
452 	int cr = mdev->sync_conf.rate;
453 
454 	return d <= td ? cr :
455 		d >= hd ? 0 :
456 		cr + (cr * (td - d) / (hd - td));
457 }
458 
459 int w_make_resync_request(struct drbd_conf *mdev,
460 		struct drbd_work *w, int cancel)
461 {
462 	unsigned long bit;
463 	sector_t sector;
464 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
465 	int max_segment_size;
466 	int number, i, size, pe, mx;
467 	int align, queued, sndbuf;
468 
469 	if (unlikely(cancel))
470 		return 1;
471 
472 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
473 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
474 		return 0;
475 	}
476 
477 	if (mdev->state.conn != C_SYNC_TARGET)
478 		dev_err(DEV, "%s in w_make_resync_request\n",
479 			drbd_conn_str(mdev->state.conn));
480 
481 	if (!get_ldev(mdev)) {
482 		/* Since we only need to access mdev->rsync, a
483 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
484 		   continuing the resync with a broken disk makes no sense at
485 		   all. */
486 		dev_err(DEV, "Disk broke down during resync!\n");
487 		mdev->resync_work.cb = w_resync_inactive;
488 		return 1;
489 	}
490 
491 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
492 	 * if it should be necessary */
493 	max_segment_size = mdev->agreed_pro_version < 94 ?
494 		queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
495 
496 	mdev->c_sync_rate = calc_resync_rate(mdev);
497 	number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
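	/* c_sync_rate is in KB/s, BM_BLOCK_SIZE/1024 is the KB covered per
	 * bitmap bit and SLEEP_TIME/HZ is the request interval in seconds, so
	 * "number" is the count of resync requests to generate per interval.
	 * E.g. (hypothetical) 10000 KB/s with the 4 KB bitmap block size and
	 * SLEEP_TIME = HZ/10 gives 10000 / (4 * 10) = 250 requests. */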
498 	pe = atomic_read(&mdev->rs_pending_cnt);
499 
500 	mutex_lock(&mdev->data.mutex);
501 	if (mdev->data.socket)
502 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
503 	else
504 		mx = 1;
505 	mutex_unlock(&mdev->data.mutex);
506 
507 	/* For resync rates >160MB/sec, allow more pending RS requests */
508 	if (number > mx)
509 		mx = number;
510 
511 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
512 	if ((pe + number) > mx) {
513 		number = mx - pe;
514 	}
515 
516 	for (i = 0; i < number; i++) {
517 		/* Stop generating RS requests, when half of the send buffer is filled */
518 		mutex_lock(&mdev->data.mutex);
519 		if (mdev->data.socket) {
520 			queued = mdev->data.socket->sk->sk_wmem_queued;
521 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
522 		} else {
523 			queued = 1;
524 			sndbuf = 0;
525 		}
526 		mutex_unlock(&mdev->data.mutex);
527 		if (queued > sndbuf / 2)
528 			goto requeue;
529 
530 next_sector:
531 		size = BM_BLOCK_SIZE;
532 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
533 
534 		if (bit == -1UL) {
535 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
536 			mdev->resync_work.cb = w_resync_inactive;
537 			put_ldev(mdev);
538 			return 1;
539 		}
540 
541 		sector = BM_BIT_TO_SECT(bit);
542 
543 		if (drbd_try_rs_begin_io(mdev, sector)) {
544 			mdev->bm_resync_fo = bit;
545 			goto requeue;
546 		}
547 		mdev->bm_resync_fo = bit + 1;
548 
549 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
550 			drbd_rs_complete_io(mdev, sector);
551 			goto next_sector;
552 		}
553 
554 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
555 		/* try to find some adjacent bits.
556 		 * we stop if we already have the maximum req size.
557 		 *
558 		 * Additionally always align bigger requests, in order to
559 		 * be prepared for all stripe sizes of software RAIDs.
560 		 */
561 		align = 1;
562 		for (;;) {
563 			if (size + BM_BLOCK_SIZE > max_segment_size)
564 				break;
565 
566 			/* Be always aligned */
567 			if (sector & ((1<<(align+3))-1))
568 				break;
569 
570 			/* do not cross extent boundaries */
571 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
572 				break;
573 			/* now, is it actually dirty, after all?
574 			 * caution, drbd_bm_test_bit is tri-state for some
575 			 * obscure reason; ( b == 0 ) would get the out-of-band
576 			 * only accidentally right because of the "oddly sized"
577 			 * adjustment below */
578 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
579 				break;
580 			bit++;
581 			size += BM_BLOCK_SIZE;
582 			if ((BM_BLOCK_SIZE << align) <= size)
583 				align++;
584 			i++;
585 		}
586 		/* if we merged some,
587 		 * reset the offset to start the next drbd_bm_find_next from */
588 		if (size > BM_BLOCK_SIZE)
589 			mdev->bm_resync_fo = bit + 1;
590 #endif
591 
592 		/* adjust very last sectors, in case we are oddly sized */
593 		if (sector + (size>>9) > capacity)
594 			size = (capacity-sector)<<9;
595 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
596 			switch (read_for_csum(mdev, sector, size)) {
597 			case 0: /* Disk failure*/
598 				put_ldev(mdev);
599 				return 0;
600 			case 2: /* Allocation failed */
601 				drbd_rs_complete_io(mdev, sector);
602 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
603 				goto requeue;
604 			/* case 1: everything ok */
605 			}
606 		} else {
607 			inc_rs_pending(mdev);
608 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
609 					       sector, size, ID_SYNCER)) {
610 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
611 				dec_rs_pending(mdev);
612 				put_ldev(mdev);
613 				return 0;
614 			}
615 		}
616 	}
617 
618 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
619 		/* last syncer _request_ was sent,
620 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
621 		 * next sync group will resume), as soon as we receive the last
622 		 * resync data block, and the last bit is cleared.
623 		 * until then resync "work" is "inactive" ...
624 		 */
625 		mdev->resync_work.cb = w_resync_inactive;
626 		put_ldev(mdev);
627 		return 1;
628 	}
629 
630  requeue:
631 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
632 	put_ldev(mdev);
633 	return 1;
634 }
635 
636 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
637 {
638 	int number, i, size;
639 	sector_t sector;
640 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
641 
642 	if (unlikely(cancel))
643 		return 1;
644 
645 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
646 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
647 		return 0;
648 	}
649 
650 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
651 	if (atomic_read(&mdev->rs_pending_cnt) > number)
652 		goto requeue;
653 
654 	number -= atomic_read(&mdev->rs_pending_cnt);
655 
656 	sector = mdev->ov_position;
657 	for (i = 0; i < number; i++) {
658 		if (sector >= capacity) {
659 			mdev->resync_work.cb = w_resync_inactive;
660 			return 1;
661 		}
662 
663 		size = BM_BLOCK_SIZE;
664 
665 		if (drbd_try_rs_begin_io(mdev, sector)) {
666 			mdev->ov_position = sector;
667 			goto requeue;
668 		}
669 
670 		if (sector + (size>>9) > capacity)
671 			size = (capacity-sector)<<9;
672 
673 		inc_rs_pending(mdev);
674 		if (!drbd_send_ov_request(mdev, sector, size)) {
675 			dec_rs_pending(mdev);
676 			return 0;
677 		}
678 		sector += BM_SECT_PER_BIT;
679 	}
680 	mdev->ov_position = sector;
681 
682  requeue:
683 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
684 	return 1;
685 }
686 
687 
688 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
689 {
690 	kfree(w);
691 	ov_oos_print(mdev);
692 	drbd_resync_finished(mdev);
693 
694 	return 1;
695 }
696 
697 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
698 {
699 	kfree(w);
700 
701 	drbd_resync_finished(mdev);
702 
703 	return 1;
704 }
705 
706 int drbd_resync_finished(struct drbd_conf *mdev)
707 {
708 	unsigned long db, dt, dbdt;
709 	unsigned long n_oos;
710 	union drbd_state os, ns;
711 	struct drbd_work *w;
712 	char *khelper_cmd = NULL;
713 
714 	/* Remove all elements from the resync LRU. Since future actions
715 	 * might set bits in the (main) bitmap, the entries in the
716 	 * resync LRU would otherwise be wrong. */
717 	if (drbd_rs_del_all(mdev)) {
718 		/* In case this is not possible now, most probably because
719 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
720 		 * queue (or even the read operations for those packets
721 		 * are not finished yet).  Retry in 100ms. */
722 
723 		drbd_kick_lo(mdev);
724 		__set_current_state(TASK_INTERRUPTIBLE);
725 		schedule_timeout(HZ / 10);
726 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
727 		if (w) {
728 			w->cb = w_resync_finished;
729 			drbd_queue_work(&mdev->data.work, w);
730 			return 1;
731 		}
732 		dev_err(DEV, "Warning: drbd_rs_del_all() failed and kmalloc(w) failed.\n");
733 	}
734 
735 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
736 	if (dt <= 0)
737 		dt = 1;
738 	db = mdev->rs_total;
739 	dbdt = Bit2KB(db/dt);
740 	mdev->rs_paused /= HZ;
741 
742 	if (!get_ldev(mdev))
743 		goto out;
744 
745 	spin_lock_irq(&mdev->req_lock);
746 	os = mdev->state;
747 
748 	/* This protects us against multiple calls (that can happen in the presence
749 	   of application IO), and against connectivity loss just before we arrive here. */
750 	if (os.conn <= C_CONNECTED)
751 		goto out_unlock;
752 
753 	ns = os;
754 	ns.conn = C_CONNECTED;
755 
756 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
757 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
758 	     "Online verify " : "Resync",
759 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
760 
761 	n_oos = drbd_bm_total_weight(mdev);
762 
763 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
764 		if (n_oos) {
765 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
766 			      n_oos, Bit2KB(1));
767 			khelper_cmd = "out-of-sync";
768 		}
769 	} else {
770 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
771 
772 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
773 			khelper_cmd = "after-resync-target";
774 
775 		if (mdev->csums_tfm && mdev->rs_total) {
776 			const unsigned long s = mdev->rs_same_csum;
777 			const unsigned long t = mdev->rs_total;
778 			const int ratio =
779 				(t == 0)     ? 0 :
780 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
781 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
782 			     "transferred %luK total %luK\n",
783 			     ratio,
784 			     Bit2KB(mdev->rs_same_csum),
785 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
786 			     Bit2KB(mdev->rs_total));
787 		}
788 	}
789 
790 	if (mdev->rs_failed) {
791 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
792 
793 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
794 			ns.disk = D_INCONSISTENT;
795 			ns.pdsk = D_UP_TO_DATE;
796 		} else {
797 			ns.disk = D_UP_TO_DATE;
798 			ns.pdsk = D_INCONSISTENT;
799 		}
800 	} else {
801 		ns.disk = D_UP_TO_DATE;
802 		ns.pdsk = D_UP_TO_DATE;
803 
804 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
805 			if (mdev->p_uuid) {
806 				int i;
807 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
808 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
809 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
810 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
811 			} else {
812 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
813 			}
814 		}
815 
816 		drbd_uuid_set_bm(mdev, 0UL);
817 
818 		if (mdev->p_uuid) {
819 			/* Now the two UUID sets are equal, update what we
820 			 * know of the peer. */
821 			int i;
822 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
823 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
824 		}
825 	}
826 
827 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
828 out_unlock:
829 	spin_unlock_irq(&mdev->req_lock);
830 	put_ldev(mdev);
831 out:
832 	mdev->rs_total  = 0;
833 	mdev->rs_failed = 0;
834 	mdev->rs_paused = 0;
835 	mdev->ov_start_sector = 0;
836 
837 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
838 		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
839 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
840 	}
841 
842 	if (khelper_cmd)
843 		drbd_khelper(mdev, khelper_cmd);
844 
845 	return 1;
846 }
847 
848 /* helper */
849 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
850 {
851 	if (drbd_ee_has_active_page(e)) {
852 		/* This might happen if sendpage() has not finished */
853 		spin_lock_irq(&mdev->req_lock);
854 		list_add_tail(&e->w.list, &mdev->net_ee);
855 		spin_unlock_irq(&mdev->req_lock);
856 	} else
857 		drbd_free_ee(mdev, e);
858 }
859 
860 /**
861  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
862  * @mdev:	DRBD device.
863  * @w:		work object.
864  * @cancel:	The connection will be closed anyway
865  */
866 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
867 {
868 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
869 	int ok;
870 
871 	if (unlikely(cancel)) {
872 		drbd_free_ee(mdev, e);
873 		dec_unacked(mdev);
874 		return 1;
875 	}
876 
877 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
878 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
879 	} else {
880 		if (__ratelimit(&drbd_ratelimit_state))
881 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
882 			    (unsigned long long)e->sector);
883 
884 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
885 	}
886 
887 	dec_unacked(mdev);
888 
889 	move_to_net_ee_or_free(mdev, e);
890 
891 	if (unlikely(!ok))
892 		dev_err(DEV, "drbd_send_block() failed\n");
893 	return ok;
894 }
895 
896 /**
897  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
898  * @mdev:	DRBD device.
899  * @w:		work object.
900  * @cancel:	The connection will be closed anyway
901  */
902 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
903 {
904 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
905 	int ok;
906 
907 	if (unlikely(cancel)) {
908 		drbd_free_ee(mdev, e);
909 		dec_unacked(mdev);
910 		return 1;
911 	}
912 
913 	if (get_ldev_if_state(mdev, D_FAILED)) {
914 		drbd_rs_complete_io(mdev, e->sector);
915 		put_ldev(mdev);
916 	}
917 
918 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
919 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
920 			inc_rs_pending(mdev);
921 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
922 		} else {
923 			if (__ratelimit(&drbd_ratelimit_state))
924 				dev_err(DEV, "Not sending RSDataReply, "
925 				    "partner DISKLESS!\n");
926 			ok = 1;
927 		}
928 	} else {
929 		if (__ratelimit(&drbd_ratelimit_state))
930 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
931 			    (unsigned long long)e->sector);
932 
933 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
934 
935 		/* update resync data with failure */
936 		drbd_rs_failed_io(mdev, e->sector, e->size);
937 	}
938 
939 	dec_unacked(mdev);
940 
941 	move_to_net_ee_or_free(mdev, e);
942 
943 	if (unlikely(!ok))
944 		dev_err(DEV, "drbd_send_block() failed\n");
945 	return ok;
946 }
947 
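/* Checksum based resync: the peer sent a digest of its version of this block
 * (P_CSUM_RS_REQUEST).  If our local digest matches, answer with the small
 * P_RS_IS_IN_SYNC ack instead of transferring the block; otherwise send the
 * full P_RS_DATA_REPLY. */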
948 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
949 {
950 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
951 	struct digest_info *di;
952 	int digest_size;
953 	void *digest = NULL;
954 	int ok, eq = 0;
955 
956 	if (unlikely(cancel)) {
957 		drbd_free_ee(mdev, e);
958 		dec_unacked(mdev);
959 		return 1;
960 	}
961 
962 	drbd_rs_complete_io(mdev, e->sector);
963 
964 	di = (struct digest_info *)(unsigned long)e->block_id;
965 
966 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
967 		/* quick hack to try to avoid a race against reconfiguration.
968 		 * a real fix would be much more involved,
969 		 * introducing more locking mechanisms */
970 		if (mdev->csums_tfm) {
971 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
972 			D_ASSERT(digest_size == di->digest_size);
973 			digest = kmalloc(digest_size, GFP_NOIO);
974 		}
975 		if (digest) {
976 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
977 			eq = !memcmp(digest, di->digest, digest_size);
978 			kfree(digest);
979 		}
980 
981 		if (eq) {
982 			drbd_set_in_sync(mdev, e->sector, e->size);
983 			/* rs_same_csums unit is BM_BLOCK_SIZE */
984 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
985 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
986 		} else {
987 			inc_rs_pending(mdev);
988 			e->block_id = ID_SYNCER;
989 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
990 		}
991 	} else {
992 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
993 		if (__ratelimit(&drbd_ratelimit_state))
994 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
995 	}
996 
997 	dec_unacked(mdev);
998 
999 	kfree(di);
1000 
1001 	move_to_net_ee_or_free(mdev, e);
1002 
1003 	if (unlikely(!ok))
1004 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1005 	return ok;
1006 }
1007 
1008 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1009 {
1010 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1011 	int digest_size;
1012 	void *digest;
1013 	int ok = 1;
1014 
1015 	if (unlikely(cancel))
1016 		goto out;
1017 
1018 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1019 		goto out;
1020 
1021 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1022 	/* FIXME if this allocation fails, online verify will not terminate! */
1023 	digest = kmalloc(digest_size, GFP_NOIO);
1024 	if (digest) {
1025 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1026 		inc_rs_pending(mdev);
1027 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1028 					     digest, digest_size, P_OV_REPLY);
1029 		if (!ok)
1030 			dec_rs_pending(mdev);
1031 		kfree(digest);
1032 	}
1033 
1034 out:
1035 	drbd_free_ee(mdev, e);
1036 
1037 	dec_unacked(mdev);
1038 
1039 	return ok;
1040 }
1041 
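/* Record a block that online verify found out of sync.  Adjacent blocks are
 * accumulated in ov_last_oos_start/ov_last_oos_size so that the reporting in
 * ov_oos_print() deals with contiguous ranges. */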
1042 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1043 {
1044 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1045 		mdev->ov_last_oos_size += size>>9;
1046 	} else {
1047 		mdev->ov_last_oos_start = sector;
1048 		mdev->ov_last_oos_size = size>>9;
1049 	}
1050 	drbd_set_out_of_sync(mdev, sector, size);
1051 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1052 }
1053 
1054 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1055 {
1056 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1057 	struct digest_info *di;
1058 	int digest_size;
1059 	void *digest;
1060 	int ok, eq = 0;
1061 
1062 	if (unlikely(cancel)) {
1063 		drbd_free_ee(mdev, e);
1064 		dec_unacked(mdev);
1065 		return 1;
1066 	}
1067 
1068 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1069 	 * the resync lru has been cleaned up already */
1070 	drbd_rs_complete_io(mdev, e->sector);
1071 
1072 	di = (struct digest_info *)(unsigned long)e->block_id;
1073 
1074 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1075 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1076 		digest = kmalloc(digest_size, GFP_NOIO);
1077 		if (digest) {
1078 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1079 
1080 			D_ASSERT(digest_size == di->digest_size);
1081 			eq = !memcmp(digest, di->digest, digest_size);
1082 			kfree(digest);
1083 		}
1084 	} else {
1085 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1086 		if (__ratelimit(&drbd_ratelimit_state))
1087 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1088 	}
1089 
1090 	dec_unacked(mdev);
1091 
1092 	kfree(di);
1093 
1094 	if (!eq)
1095 		drbd_ov_oos_found(mdev, e->sector, e->size);
1096 	else
1097 		ov_oos_print(mdev);
1098 
1099 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1100 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1101 
1102 	drbd_free_ee(mdev, e);
1103 
1104 	if (--mdev->ov_left == 0) {
1105 		ov_oos_print(mdev);
1106 		drbd_resync_finished(mdev);
1107 	}
1108 
1109 	return ok;
1110 }
1111 
1112 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1113 {
1114 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1115 	complete(&b->done);
1116 	return 1;
1117 }
1118 
1119 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1120 {
1121 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1122 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1123 	int ok = 1;
1124 
1125 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1126 	 * just before it was reassigned and re-queued, so double check that.
1127 	 * actually, this race was harmless, since we only try to send the
1128 	 * barrier packet here, and otherwise do nothing with the object.
1129 	 * but compare with the head of w_clear_epoch */
1130 	spin_lock_irq(&mdev->req_lock);
1131 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1132 		cancel = 1;
1133 	spin_unlock_irq(&mdev->req_lock);
1134 	if (cancel)
1135 		return 1;
1136 
1137 	if (!drbd_get_data_sock(mdev))
1138 		return 0;
1139 	p->barrier = b->br_number;
1140 	/* inc_ap_pending was done where this was queued.
1141 	 * dec_ap_pending will be done in got_BarrierAck
1142 	 * or (on connection loss) in w_clear_epoch.  */
1143 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1144 				(struct p_header *)p, sizeof(*p), 0);
1145 	drbd_put_data_sock(mdev);
1146 
1147 	return ok;
1148 }
1149 
1150 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1151 {
1152 	if (cancel)
1153 		return 1;
1154 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1155 }
1156 
1157 /**
1158  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1159  * @mdev:	DRBD device.
1160  * @w:		work object.
1161  * @cancel:	The connection will be closed anyway
1162  */
1163 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1164 {
1165 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1166 	int ok;
1167 
1168 	if (unlikely(cancel)) {
1169 		req_mod(req, send_canceled);
1170 		return 1;
1171 	}
1172 
1173 	ok = drbd_send_dblock(mdev, req);
1174 	req_mod(req, ok ? handed_over_to_network : send_failed);
1175 
1176 	return ok;
1177 }
1178 
1179 /**
1180  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1181  * @mdev:	DRBD device.
1182  * @w:		work object.
1183  * @cancel:	The connection will be closed anyway
1184  */
1185 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1186 {
1187 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1188 	int ok;
1189 
1190 	if (unlikely(cancel)) {
1191 		req_mod(req, send_canceled);
1192 		return 1;
1193 	}
1194 
1195 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1196 				(unsigned long)req);
1197 
1198 	if (!ok) {
1199 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1200 		 * so this is probably redundant */
1201 		if (mdev->state.conn >= C_CONNECTED)
1202 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1203 	}
1204 	req_mod(req, ok ? handed_over_to_network : send_failed);
1205 
1206 	return ok;
1207 }
1208 
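/* Walk the sync-after dependency chain of @mdev.  Returns 0 if some device we
 * depend on is currently resyncing or has one of the sync-pause flags set,
 * 1 if this device may resync now.  Callers hold the global_state_lock. */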
1209 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1210 {
1211 	struct drbd_conf *odev = mdev;
1212 
1213 	while (1) {
1214 		if (odev->sync_conf.after == -1)
1215 			return 1;
1216 		odev = minor_to_mdev(odev->sync_conf.after);
1217 		ERR_IF(!odev) return 1;
1218 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1219 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1220 		    odev->state.aftr_isp || odev->state.peer_isp ||
1221 		    odev->state.user_isp)
1222 			return 0;
1223 	}
1224 }
1225 
1226 /**
1227  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1228  * @mdev:	DRBD device.
1229  *
1230  * Called from process context only (admin command and after_state_ch).
1231  */
1232 static int _drbd_pause_after(struct drbd_conf *mdev)
1233 {
1234 	struct drbd_conf *odev;
1235 	int i, rv = 0;
1236 
1237 	for (i = 0; i < minor_count; i++) {
1238 		odev = minor_to_mdev(i);
1239 		if (!odev)
1240 			continue;
1241 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1242 			continue;
1243 		if (!_drbd_may_sync_now(odev))
1244 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1245 			       != SS_NOTHING_TO_DO);
1246 	}
1247 
1248 	return rv;
1249 }
1250 
1251 /**
1252  * _drbd_resume_next() - Resume resync on all devices that may resync now
1253  * @mdev:	DRBD device.
1254  *
1255  * Called from process context only (admin command and worker).
1256  */
1257 static int _drbd_resume_next(struct drbd_conf *mdev)
1258 {
1259 	struct drbd_conf *odev;
1260 	int i, rv = 0;
1261 
1262 	for (i = 0; i < minor_count; i++) {
1263 		odev = minor_to_mdev(i);
1264 		if (!odev)
1265 			continue;
1266 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1267 			continue;
1268 		if (odev->state.aftr_isp) {
1269 			if (_drbd_may_sync_now(odev))
1270 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1271 							CS_HARD, NULL)
1272 				       != SS_NOTHING_TO_DO) ;
1273 		}
1274 	}
1275 	return rv;
1276 }
1277 
1278 void resume_next_sg(struct drbd_conf *mdev)
1279 {
1280 	write_lock_irq(&global_state_lock);
1281 	_drbd_resume_next(mdev);
1282 	write_unlock_irq(&global_state_lock);
1283 }
1284 
1285 void suspend_other_sg(struct drbd_conf *mdev)
1286 {
1287 	write_lock_irq(&global_state_lock);
1288 	_drbd_pause_after(mdev);
1289 	write_unlock_irq(&global_state_lock);
1290 }
1291 
1292 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1293 {
1294 	struct drbd_conf *odev;
1295 
1296 	if (o_minor == -1)
1297 		return NO_ERROR;
1298 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1299 		return ERR_SYNC_AFTER;
1300 
1301 	/* check for loops */
1302 	odev = minor_to_mdev(o_minor);
1303 	while (1) {
1304 		if (odev == mdev)
1305 			return ERR_SYNC_AFTER_CYCLE;
1306 
1307 		/* dependency chain ends here, no cycles. */
1308 		if (odev->sync_conf.after == -1)
1309 			return NO_ERROR;
1310 
1311 		/* follow the dependency chain */
1312 		odev = minor_to_mdev(odev->sync_conf.after);
1313 	}
1314 }
1315 
1316 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1317 {
1318 	int changes;
1319 	int retcode;
1320 
1321 	write_lock_irq(&global_state_lock);
1322 	retcode = sync_after_error(mdev, na);
1323 	if (retcode == NO_ERROR) {
1324 		mdev->sync_conf.after = na;
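		/* Loop until the pause/resume state settles: changing aftr_isp
		 * on one device can in turn change whether devices further
		 * down the sync-after chain may resync. */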
1325 		do {
1326 			changes  = _drbd_pause_after(mdev);
1327 			changes |= _drbd_resume_next(mdev);
1328 		} while (changes);
1329 	}
1330 	write_unlock_irq(&global_state_lock);
1331 	return retcode;
1332 }
1333 
1334 static void ping_peer(struct drbd_conf *mdev)
1335 {
1336 	clear_bit(GOT_PING_ACK, &mdev->flags);
1337 	request_ping(mdev);
1338 	wait_event(mdev->misc_wait,
1339 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1340 }
1341 
1342 /**
1343  * drbd_start_resync() - Start the resync process
1344  * @mdev:	DRBD device.
1345  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1346  *
1347  * This function might bring you directly into one of the
1348  * C_PAUSED_SYNC_* states.
1349  */
1350 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1351 {
1352 	union drbd_state ns;
1353 	int r;
1354 
1355 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1356 		dev_err(DEV, "Resync already running!\n");
1357 		return;
1358 	}
1359 
1360 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1361 	drbd_rs_cancel_all(mdev);
1362 
1363 	if (side == C_SYNC_TARGET) {
1364 		/* Since application IO was locked out during C_WF_BITMAP_T and
1365 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1366 		   let the before-resync-target handler veto making the data inconsistent. */
1367 		r = drbd_khelper(mdev, "before-resync-target");
1368 		r = (r >> 8) & 0xff;
1369 		if (r > 0) {
1370 			dev_info(DEV, "before-resync-target handler returned %d, "
1371 			     "dropping connection.\n", r);
1372 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1373 			return;
1374 		}
1375 	}
1376 
1377 	drbd_state_lock(mdev);
1378 
1379 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1380 		drbd_state_unlock(mdev);
1381 		return;
1382 	}
1383 
1384 	if (side == C_SYNC_TARGET) {
1385 		mdev->bm_resync_fo = 0;
1386 	} else /* side == C_SYNC_SOURCE */ {
1387 		u64 uuid;
1388 
1389 		get_random_bytes(&uuid, sizeof(u64));
1390 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1391 		drbd_send_sync_uuid(mdev, uuid);
1392 
1393 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1394 	}
1395 
1396 	write_lock_irq(&global_state_lock);
1397 	ns = mdev->state;
1398 
1399 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1400 
1401 	ns.conn = side;
1402 
1403 	if (side == C_SYNC_TARGET)
1404 		ns.disk = D_INCONSISTENT;
1405 	else /* side == C_SYNC_SOURCE */
1406 		ns.pdsk = D_INCONSISTENT;
1407 
1408 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1409 	ns = mdev->state;
1410 
1411 	if (ns.conn < C_CONNECTED)
1412 		r = SS_UNKNOWN_ERROR;
1413 
1414 	if (r == SS_SUCCESS) {
1415 		mdev->rs_total     =
1416 		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1417 		mdev->rs_failed    = 0;
1418 		mdev->rs_paused    = 0;
1419 		mdev->rs_start     =
1420 		mdev->rs_mark_time = jiffies;
1421 		mdev->rs_same_csum = 0;
1422 		_drbd_pause_after(mdev);
1423 	}
1424 	write_unlock_irq(&global_state_lock);
1425 	put_ldev(mdev);
1426 
1427 	if (r == SS_SUCCESS) {
1428 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1429 		     drbd_conn_str(ns.conn),
1430 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1431 		     (unsigned long) mdev->rs_total);
1432 
1433 		if (mdev->rs_total == 0) {
1434 			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1435 			ping_peer(mdev);
1436 			drbd_resync_finished(mdev);
1437 		}
1438 
1439 		/* ns.conn may already be != mdev->state.conn,
1440 		 * we may have been paused in between, or become paused until
1441 		 * the timer triggers.
1442 		 * No matter, that is handled in resync_timer_fn() */
1443 		if (ns.conn == C_SYNC_TARGET)
1444 			mod_timer(&mdev->resync_timer, jiffies);
1445 
1446 		drbd_md_sync(mdev);
1447 	}
1448 	drbd_state_unlock(mdev);
1449 }
1450 
1451 int drbd_worker(struct drbd_thread *thi)
1452 {
1453 	struct drbd_conf *mdev = thi->mdev;
1454 	struct drbd_work *w = NULL;
1455 	LIST_HEAD(work_list);
1456 	int intr = 0, i;
1457 
1458 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1459 
1460 	while (get_t_state(thi) == Running) {
1461 		drbd_thread_current_set_cpu(mdev);
1462 
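		/* Nothing queued right now (down_trylock() failed): uncork the
		 * data socket so pending packets actually go out, sleep until
		 * new work arrives, then cork again for the next batch. */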
1463 		if (down_trylock(&mdev->data.work.s)) {
1464 			mutex_lock(&mdev->data.mutex);
1465 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1466 				drbd_tcp_uncork(mdev->data.socket);
1467 			mutex_unlock(&mdev->data.mutex);
1468 
1469 			intr = down_interruptible(&mdev->data.work.s);
1470 
1471 			mutex_lock(&mdev->data.mutex);
1472 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1473 				drbd_tcp_cork(mdev->data.socket);
1474 			mutex_unlock(&mdev->data.mutex);
1475 		}
1476 
1477 		if (intr) {
1478 			D_ASSERT(intr == -EINTR);
1479 			flush_signals(current);
1480 			ERR_IF (get_t_state(thi) == Running)
1481 				continue;
1482 			break;
1483 		}
1484 
1485 		if (get_t_state(thi) != Running)
1486 			break;
1487 		/* With this break, we have done a down() but not consumed
1488 		   the entry from the list. The cleanup code takes care of
1489 		   this...   */
1490 
1491 		w = NULL;
1492 		spin_lock_irq(&mdev->data.work.q_lock);
1493 		ERR_IF(list_empty(&mdev->data.work.q)) {
1494 			/* something terribly wrong in our logic.
1495 			 * we were able to down() the semaphore,
1496 			 * but the list is empty... doh.
1497 			 *
1498 			 * what is the best thing to do now?
1499 			 * try again from scratch, restarting the receiver,
1500 			 * asender, whatnot? could break even more ugly,
1501 			 * e.g. when we are primary, but no good local data.
1502 			 *
1503 			 * I'll try to get away just starting over this loop.
1504 			 */
1505 			spin_unlock_irq(&mdev->data.work.q_lock);
1506 			continue;
1507 		}
1508 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1509 		list_del_init(&w->list);
1510 		spin_unlock_irq(&mdev->data.work.q_lock);
1511 
1512 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1513 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1514 			if (mdev->state.conn >= C_CONNECTED)
1515 				drbd_force_state(mdev,
1516 						NS(conn, C_NETWORK_FAILURE));
1517 		}
1518 	}
1519 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1520 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1521 
1522 	spin_lock_irq(&mdev->data.work.q_lock);
1523 	i = 0;
1524 	while (!list_empty(&mdev->data.work.q)) {
1525 		list_splice_init(&mdev->data.work.q, &work_list);
1526 		spin_unlock_irq(&mdev->data.work.q_lock);
1527 
1528 		while (!list_empty(&work_list)) {
1529 			w = list_entry(work_list.next, struct drbd_work, list);
1530 			list_del_init(&w->list);
1531 			w->cb(mdev, w, 1);
1532 			i++; /* dead debugging code */
1533 		}
1534 
1535 		spin_lock_irq(&mdev->data.work.q_lock);
1536 	}
1537 	sema_init(&mdev->data.work.s, 0);
1538 	/* DANGEROUS race: if someone did queue his work within the spinlock,
1539 	 * but up()ed outside the spinlock, we could get an up() on the
1540 	 * semaphore without a corresponding list entry.
1541 	 * So don't do that.
1542 	 */
1543 	spin_unlock_irq(&mdev->data.work.q_lock);
1544 
1545 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1546 	/* _drbd_set_state only uses stop_nowait.
1547 	 * wait here for the Exiting receiver. */
1548 	drbd_thread_stop(&mdev->receiver);
1549 	drbd_mdev_cleanup(mdev);
1550 
1551 	dev_info(DEV, "worker terminated\n");
1552 
1553 	clear_bit(DEVICE_DYING, &mdev->flags);
1554 	clear_bit(CONFIG_PENDING, &mdev->flags);
1555 	wake_up(&mdev->state_wait);
1556 
1557 	return 0;
1558 }
1559