xref: /linux/drivers/block/drbd/drbd_worker.c (revision 273b281fa22c293963ee3e6eec418f5dda2dbc83)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/version.h>
28 #include <linux/drbd.h>
29 #include <linux/sched.h>
30 #include <linux/smp_lock.h>
31 #include <linux/wait.h>
32 #include <linux/mm.h>
33 #include <linux/memcontrol.h>
34 #include <linux/mm_inline.h>
35 #include <linux/slab.h>
36 #include <linux/random.h>
38 #include <linux/string.h>
39 #include <linux/scatterlist.h>
40 
41 #include "drbd_int.h"
42 #include "drbd_req.h"
43 
44 #define SLEEP_TIME (HZ/10)
45 
46 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
47 
48 
49 
50 /* defined here:
51    drbd_md_io_complete
52    drbd_endio_write_sec
53    drbd_endio_read_sec
54    drbd_endio_pri
55 
56  * more endio handlers:
57    atodb_endio in drbd_actlog.c
58    drbd_bm_async_io_complete in drbd_bitmap.c
59 
60  * For all these callbacks, note the following:
61  * The callbacks will be called in irq context by the IDE drivers,
62  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
63  * Try to get the locking right :)
64  *
65  */
66 
67 
68 /* About the global_state_lock
69    Each state transition on a device holds a read lock. In case we have
70    to evaluate the sync-after dependencies, we grab a write lock, because
71    we need stable states on all devices for that.  */
72 rwlock_t global_state_lock;
73 
74 /* used for synchronous meta data and bitmap IO
75  * submitted by drbd_md_sync_page_io()
76  */
77 void drbd_md_io_complete(struct bio *bio, int error)
78 {
79 	struct drbd_md_io *md_io;
80 
81 	md_io = (struct drbd_md_io *)bio->bi_private;
82 	md_io->error = error;
83 
84 	complete(&md_io->event);
85 }
86 
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
91 {
92 	unsigned long flags = 0;
93 	struct drbd_epoch_entry *e = NULL;
94 	struct drbd_conf *mdev;
95 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
96 
97 	e = bio->bi_private;
98 	mdev = e->mdev;
99 
100 	if (error)
101 		dev_warn(DEV, "read: error=%d s=%llus\n", error,
102 				(unsigned long long)e->sector);
103 	if (!error && !uptodate) {
104 		dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
105 				(unsigned long long)e->sector);
106 		/* strange behavior of some lower level drivers...
107 		 * fail the request by clearing the uptodate flag,
108 		 * but do not return any error?! */
109 		error = -EIO;
110 	}
111 
112 	D_ASSERT(e->block_id != ID_VACANT);
113 
114 	spin_lock_irqsave(&mdev->req_lock, flags);
115 	mdev->read_cnt += e->size >> 9;
116 	list_del(&e->w.list);
117 	if (list_empty(&mdev->read_ee))
118 		wake_up(&mdev->ee_wait);
119 	spin_unlock_irqrestore(&mdev->req_lock, flags);
120 
121 	drbd_chk_io_error(mdev, error, FALSE);
122 	drbd_queue_work(&mdev->data.work, &e->w);
123 	put_ldev(mdev);
124 }
125 
126 /* writes on behalf of the partner, or resync writes,
127  * "submitted" by the receiver.
128  */
129 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
130 {
131 	unsigned long flags = 0;
132 	struct drbd_epoch_entry *e = NULL;
133 	struct drbd_conf *mdev;
134 	sector_t e_sector;
135 	int do_wake;
136 	int is_syncer_req;
137 	int do_al_complete_io;
138 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
139 	int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
140 
141 	e = bio->bi_private;
142 	mdev = e->mdev;
143 
144 	if (error)
145 		dev_warn(DEV, "write: error=%d s=%llus\n", error,
146 				(unsigned long long)e->sector);
147 	if (!error && !uptodate) {
148 		dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
149 				(unsigned long long)e->sector);
150 		/* strange behavior of some lower level drivers...
151 		 * fail the request by clearing the uptodate flag,
152 		 * but do not return any error?! */
153 		error = -EIO;
154 	}
155 
156 	/* error == -ENOTSUPP would be a better test,
157 	 * alas it is not reliable */
158 	if (error && is_barrier && e->flags & EE_IS_BARRIER) {
159 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
160 		spin_lock_irqsave(&mdev->req_lock, flags);
161 		list_del(&e->w.list);
162 		e->w.cb = w_e_reissue;
163 		/* put_ldev actually happens below, once we come here again. */
164 		__release(local);
165 		spin_unlock_irqrestore(&mdev->req_lock, flags);
166 		drbd_queue_work(&mdev->data.work, &e->w);
167 		return;
168 	}
169 
170 	D_ASSERT(e->block_id != ID_VACANT);
171 
172 	spin_lock_irqsave(&mdev->req_lock, flags);
173 	mdev->writ_cnt += e->size >> 9;
174 	is_syncer_req = is_syncer_block_id(e->block_id);
175 
176 	/* after we moved e to done_ee,
177 	 * we may no longer access it,
178 	 * it may be freed/reused already!
179 	 * (as soon as we release the req_lock) */
180 	e_sector = e->sector;
181 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
182 
183 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
184 	list_add_tail(&e->w.list, &mdev->done_ee);
185 
186 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
187 	 * neither did we wake possibly waiting conflicting requests.
188 	 * done from "drbd_process_done_ee" within the appropriate w.cb
189 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
190 
191 	do_wake = is_syncer_req
192 		? list_empty(&mdev->sync_ee)
193 		: list_empty(&mdev->active_ee);
194 
195 	if (error)
196 		__drbd_chk_io_error(mdev, FALSE);
197 	spin_unlock_irqrestore(&mdev->req_lock, flags);
198 
199 	if (is_syncer_req)
200 		drbd_rs_complete_io(mdev, e_sector);
201 
202 	if (do_wake)
203 		wake_up(&mdev->ee_wait);
204 
205 	if (do_al_complete_io)
206 		drbd_al_complete_io(mdev, e_sector);
207 
208 	wake_asender(mdev);
209 	put_ldev(mdev);
210 
211 }
212 
213 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
214  */
215 void drbd_endio_pri(struct bio *bio, int error)
216 {
217 	unsigned long flags;
218 	struct drbd_request *req = bio->bi_private;
219 	struct drbd_conf *mdev = req->mdev;
220 	struct bio_and_error m;
221 	enum drbd_req_event what;
222 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
223 
224 	if (error)
225 		dev_warn(DEV, "p %s: error=%d\n",
226 			 bio_data_dir(bio) == WRITE ? "write" : "read", error);
227 	if (!error && !uptodate) {
228 		dev_warn(DEV, "p %s: setting error to -EIO\n",
229 			 bio_data_dir(bio) == WRITE ? "write" : "read");
230 		/* strange behavior of some lower level drivers...
231 		 * fail the request by clearing the uptodate flag,
232 		 * but do not return any error?! */
233 		error = -EIO;
234 	}
235 
236 	/* to avoid recursion in __req_mod */
237 	if (unlikely(error)) {
238 		what = (bio_data_dir(bio) == WRITE)
239 			? write_completed_with_error
240 			: (bio_rw(bio) == READA)
241 			  ? read_ahead_completed_with_error
242 			  : read_completed_with_error;
243 	} else
244 		what = completed_ok;
245 
246 	bio_put(req->private_bio);
247 	req->private_bio = ERR_PTR(error);
248 
249 	spin_lock_irqsave(&mdev->req_lock, flags);
250 	__req_mod(req, what, &m);
251 	spin_unlock_irqrestore(&mdev->req_lock, flags);
252 
253 	if (m.bio)
254 		complete_master_bio(mdev, &m);
255 }
256 
257 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
258 {
259 	struct drbd_request *req = container_of(w, struct drbd_request, w);
260 
261 	/* NOTE: mdev->ldev can be NULL by the time we get here! */
262 	/* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
263 
264 	/* the only way this callback is scheduled is from _req_may_be_done,
265 	 * when it is done and had a local write error, see comments there */
266 	drbd_req_free(req);
267 
268 	return TRUE;
269 }
270 
271 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
272 {
273 	struct drbd_request *req = container_of(w, struct drbd_request, w);
274 
275 	/* We should not detach for read io-error,
276 	 * but try to WRITE the P_DATA_REPLY to the failed location,
277 	 * to give the disk the chance to relocate that block */
278 
279 	spin_lock_irq(&mdev->req_lock);
280 	if (cancel ||
281 	    mdev->state.conn < C_CONNECTED ||
282 	    mdev->state.pdsk <= D_INCONSISTENT) {
283 		_req_mod(req, send_canceled);
284 		spin_unlock_irq(&mdev->req_lock);
285 		dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
286 		return 1;
287 	}
288 	spin_unlock_irq(&mdev->req_lock);
289 
290 	return w_send_read_req(mdev, w, 0);
291 }
292 
293 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
294 {
295 	ERR_IF(cancel) return 1;
296 	dev_err(DEV, "resync inactive, but callback triggered??\n");
297 	return 1; /* Simply ignore this! */
298 }
299 
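/**
 * drbd_csum() - Calculate a message digest over all pages of a bio
 * @mdev:	DRBD device.
 * @tfm:	Crypto hash transformation to use.
 * @bio:	Bio whose payload gets hashed.
 * @digest:	Result buffer; must hold at least crypto_hash_digestsize(tfm) bytes.
 *
 * Used for checksum based resync (csums_tfm) and online verify (verify_tfm).
 */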
300 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
301 {
302 	struct hash_desc desc;
303 	struct scatterlist sg;
304 	struct bio_vec *bvec;
305 	int i;
306 
307 	desc.tfm = tfm;
308 	desc.flags = 0;
309 
310 	sg_init_table(&sg, 1);
311 	crypto_hash_init(&desc);
312 
313 	__bio_for_each_segment(bvec, bio, i, 0) {
314 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
315 		crypto_hash_update(&desc, &sg, sg.length);
316 	}
317 	crypto_hash_final(&desc, digest);
318 }
319 
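/**
 * w_e_send_csum() - Worker callback to send the checksum of a locally read block
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Hashes the data read by read_for_csum() and sends the digest to the peer
 * in a P_CSUM_RS_REQUEST packet.
 */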
320 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
321 {
322 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
323 	int digest_size;
324 	void *digest;
325 	int ok;
326 
327 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
328 
329 	if (unlikely(cancel)) {
330 		drbd_free_ee(mdev, e);
331 		return 1;
332 	}
333 
334 	if (likely(drbd_bio_uptodate(e->private_bio))) {
335 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
336 		digest = kmalloc(digest_size, GFP_NOIO);
337 		if (digest) {
338 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
339 
340 			inc_rs_pending(mdev);
341 			ok = drbd_send_drequest_csum(mdev,
342 						     e->sector,
343 						     e->size,
344 						     digest,
345 						     digest_size,
346 						     P_CSUM_RS_REQUEST);
347 			kfree(digest);
348 		} else {
349 			dev_err(DEV, "kmalloc() of digest failed.\n");
350 			ok = 0;
351 		}
352 	} else
353 		ok = 1;
354 
355 	drbd_free_ee(mdev, e);
356 
357 	if (unlikely(!ok))
358 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
359 	return ok;
360 }
361 
362 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
363 
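/* Submit a local read of @size bytes at @sector, so that its checksum can be
 * sent to the peer instead of the full block (checksum based resync).
 * Returns 0 if the local disk is gone, 2 if allocating the epoch entry
 * failed (retry later), and 1 if the read was submitted. */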
364 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
365 {
366 	struct drbd_epoch_entry *e;
367 
368 	if (!get_ldev(mdev))
369 		return 0;
370 
371 	/* GFP_TRY, because if there is no memory available right now, this may
372 	 * be rescheduled for later. It is "only" background resync, after all. */
373 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
374 	if (!e) {
375 		put_ldev(mdev);
376 		return 2;
377 	}
378 
379 	spin_lock_irq(&mdev->req_lock);
380 	list_add(&e->w.list, &mdev->read_ee);
381 	spin_unlock_irq(&mdev->req_lock);
382 
383 	e->private_bio->bi_end_io = drbd_endio_read_sec;
384 	e->private_bio->bi_rw = READ;
385 	e->w.cb = w_e_send_csum;
386 
387 	mdev->read_cnt += size >> 9;
388 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
389 
390 	return 1;
391 }
392 
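/* Resync timer callback, runs in timer (softirq) context: decide whether the
 * next chunk of work is resync or online verify and queue the corresponding
 * work item, unless STOP_SYNC_TIMER was set. */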
393 void resync_timer_fn(unsigned long data)
394 {
395 	unsigned long flags;
396 	struct drbd_conf *mdev = (struct drbd_conf *) data;
397 	int queue;
398 
399 	spin_lock_irqsave(&mdev->req_lock, flags);
400 
401 	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
402 		queue = 1;
403 		if (mdev->state.conn == C_VERIFY_S)
404 			mdev->resync_work.cb = w_make_ov_request;
405 		else
406 			mdev->resync_work.cb = w_make_resync_request;
407 	} else {
408 		queue = 0;
409 		mdev->resync_work.cb = w_resync_inactive;
410 	}
411 
412 	spin_unlock_irqrestore(&mdev->req_lock, flags);
413 
414 	/* harmless race: list_empty outside data.work.q_lock */
415 	if (list_empty(&mdev->resync_work.list) && queue)
416 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
417 }
418 
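/**
 * w_make_resync_request() - Worker callback to issue the next batch of resync requests
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Runs on the sync target. The number of requests per SLEEP_TIME interval is
 * derived from the configured sync rate, throttled by the peer's receive
 * buffer and our own send buffer fill level. Adjacent dirty bits are merged
 * into larger requests as far as the queue's max_segment_size allows.
 */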
419 int w_make_resync_request(struct drbd_conf *mdev,
420 		struct drbd_work *w, int cancel)
421 {
422 	unsigned long bit;
423 	sector_t sector;
424 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
425 	int max_segment_size = queue_max_segment_size(mdev->rq_queue);
426 	int number, i, size, pe, mx;
427 	int align, queued, sndbuf;
428 
429 	if (unlikely(cancel))
430 		return 1;
431 
432 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
433 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
434 		return 0;
435 	}
436 
437 	if (mdev->state.conn != C_SYNC_TARGET)
438 		dev_err(DEV, "%s in w_make_resync_request\n",
439 			drbd_conn_str(mdev->state.conn));
440 
441 	if (!get_ldev(mdev)) {
442 		/* Since we only need to access mdev->resync a
443 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
444 		   to continue resync with a broken disk makes no sense at
445 		   all */
446 		dev_err(DEV, "Disk broke down during resync!\n");
447 		mdev->resync_work.cb = w_resync_inactive;
448 		return 1;
449 	}
450 
451 	number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
452 	pe = atomic_read(&mdev->rs_pending_cnt);
453 
454 	mutex_lock(&mdev->data.mutex);
455 	if (mdev->data.socket)
456 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
457 	else
458 		mx = 1;
459 	mutex_unlock(&mdev->data.mutex);
460 
461 	/* For resync rates >160MB/sec, allow more pending RS requests */
462 	if (number > mx)
463 		mx = number;
464 
465 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
466 	if ((pe + number) > mx) {
467 		number = mx - pe;
468 	}
469 
470 	for (i = 0; i < number; i++) {
471 		/* Stop generating RS requests when half of the send buffer is filled */
472 		mutex_lock(&mdev->data.mutex);
473 		if (mdev->data.socket) {
474 			queued = mdev->data.socket->sk->sk_wmem_queued;
475 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
476 		} else {
477 			queued = 1;
478 			sndbuf = 0;
479 		}
480 		mutex_unlock(&mdev->data.mutex);
481 		if (queued > sndbuf / 2)
482 			goto requeue;
483 
484 next_sector:
485 		size = BM_BLOCK_SIZE;
486 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
487 
488 		if (bit == -1UL) {
489 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
490 			mdev->resync_work.cb = w_resync_inactive;
491 			put_ldev(mdev);
492 			return 1;
493 		}
494 
495 		sector = BM_BIT_TO_SECT(bit);
496 
497 		if (drbd_try_rs_begin_io(mdev, sector)) {
498 			mdev->bm_resync_fo = bit;
499 			goto requeue;
500 		}
501 		mdev->bm_resync_fo = bit + 1;
502 
503 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
504 			drbd_rs_complete_io(mdev, sector);
505 			goto next_sector;
506 		}
507 
508 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
509 		/* try to find some adjacent bits.
510 		 * we stop if we already have the maximum req size.
511 		 *
512 		 * Additionally always align bigger requests, in order to
513 		 * be prepared for all stripe sizes of software RAIDs.
514 		 *
515 		 * we _do_ care about the agreed-upon q->max_segment_size
516 		 * here, as splitting up the requests on the other side is more
517 		 * difficult.  the consequence is, that on lvm and md and other
518 		 * "indirect" devices, this is dead code, since
519 		 * q->max_segment_size will be PAGE_SIZE.
520 		 */
521 		align = 1;
522 		for (;;) {
523 			if (size + BM_BLOCK_SIZE > max_segment_size)
524 				break;
525 
526 			/* Always stay aligned */
527 			if (sector & ((1<<(align+3))-1))
528 				break;
529 
530 			/* do not cross extent boundaries */
531 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
532 				break;
533 			/* now, is it actually dirty, after all?
534 			 * caution, drbd_bm_test_bit is tri-state for some
535 			 * obscure reason; ( b == 0 ) would get the out-of-band
536 			 * only accidentally right because of the "oddly sized"
537 			 * adjustment below */
538 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
539 				break;
540 			bit++;
541 			size += BM_BLOCK_SIZE;
542 			if ((BM_BLOCK_SIZE << align) <= size)
543 				align++;
544 			i++;
545 		}
546 		/* if we merged some,
547 		 * reset the offset to start the next drbd_bm_find_next from */
548 		if (size > BM_BLOCK_SIZE)
549 			mdev->bm_resync_fo = bit + 1;
550 #endif
551 
552 		/* adjust very last sectors, in case we are oddly sized */
553 		if (sector + (size>>9) > capacity)
554 			size = (capacity-sector)<<9;
555 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
556 			switch (read_for_csum(mdev, sector, size)) {
557 			case 0: /* Disk failure*/
558 				put_ldev(mdev);
559 				return 0;
560 			case 2: /* Allocation failed */
561 				drbd_rs_complete_io(mdev, sector);
562 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
563 				goto requeue;
564 			/* case 1: everything ok */
565 			}
566 		} else {
567 			inc_rs_pending(mdev);
568 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
569 					       sector, size, ID_SYNCER)) {
570 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
571 				dec_rs_pending(mdev);
572 				put_ldev(mdev);
573 				return 0;
574 			}
575 		}
576 	}
577 
578 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
579 		/* last syncer _request_ was sent,
580 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
581 		 * next sync group will resume), as soon as we receive the last
582 		 * resync data block, and the last bit is cleared.
583 		 * until then resync "work" is "inactive" ...
584 		 */
585 		mdev->resync_work.cb = w_resync_inactive;
586 		put_ldev(mdev);
587 		return 1;
588 	}
589 
590  requeue:
591 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
592 	put_ldev(mdev);
593 	return 1;
594 }
595 
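/**
 * w_make_ov_request() - Worker callback to issue the next batch of online verify requests
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Runs on the verify source (C_VERIFY_S). Sends P_OV_REQUEST packets, walking
 * the device linearly from mdev->ov_position, paced by the resync timer just
 * like w_make_resync_request().
 */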
596 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
597 {
598 	int number, i, size;
599 	sector_t sector;
600 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
601 
602 	if (unlikely(cancel))
603 		return 1;
604 
605 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
606 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
607 		return 0;
608 	}
609 
610 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
611 	if (atomic_read(&mdev->rs_pending_cnt) > number)
612 		goto requeue;
613 
614 	number -= atomic_read(&mdev->rs_pending_cnt);
615 
616 	sector = mdev->ov_position;
617 	for (i = 0; i < number; i++) {
618 		if (sector >= capacity) {
619 			mdev->resync_work.cb = w_resync_inactive;
620 			return 1;
621 		}
622 
623 		size = BM_BLOCK_SIZE;
624 
625 		if (drbd_try_rs_begin_io(mdev, sector)) {
626 			mdev->ov_position = sector;
627 			goto requeue;
628 		}
629 
630 		if (sector + (size>>9) > capacity)
631 			size = (capacity-sector)<<9;
632 
633 		inc_rs_pending(mdev);
634 		if (!drbd_send_ov_request(mdev, sector, size)) {
635 			dec_rs_pending(mdev);
636 			return 0;
637 		}
638 		sector += BM_SECT_PER_BIT;
639 	}
640 	mdev->ov_position = sector;
641 
642  requeue:
643 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
644 	return 1;
645 }
646 
647 
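/* Worker callback queued when an online verify run ends: free the work
 * object, print the last recorded out-of-sync range and do the common
 * resync-finished bookkeeping. */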
648 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
649 {
650 	kfree(w);
651 	ov_oos_print(mdev);
652 	drbd_resync_finished(mdev);
653 
654 	return 1;
655 }
656 
657 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
658 {
659 	kfree(w);
660 
661 	drbd_resync_finished(mdev);
662 
663 	return 1;
664 }
665 
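/* Common cleanup when a resync or online verify run is over: report the
 * statistics, update disk/peer disk state and generation UUIDs, and possibly
 * fire a userspace helper ("out-of-sync" or "after-resync-target").  If the
 * resync LRU cannot be emptied yet, re-queue via w_resync_finished(). */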
666 int drbd_resync_finished(struct drbd_conf *mdev)
667 {
668 	unsigned long db, dt, dbdt;
669 	unsigned long n_oos;
670 	union drbd_state os, ns;
671 	struct drbd_work *w;
672 	char *khelper_cmd = NULL;
673 
674 	/* Remove all elements from the resync LRU. If future actions
675 	 * set bits in the (main) bitmap, the entries in the
676 	 * resync LRU would otherwise be wrong. */
677 	if (drbd_rs_del_all(mdev)) {
678 		/* In case this is not possible now, most probably because
679 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
680 		 * queue (or even the read operations for those packets
681 		 * are not finished by now), retry in 100ms. */
682 
683 		drbd_kick_lo(mdev);
684 		__set_current_state(TASK_INTERRUPTIBLE);
685 		schedule_timeout(HZ / 10);
686 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
687 		if (w) {
688 			w->cb = w_resync_finished;
689 			drbd_queue_work(&mdev->data.work, w);
690 			return 1;
691 		}
692 		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
693 	}
694 
695 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
696 	if (dt <= 0)
697 		dt = 1;
698 	db = mdev->rs_total;
699 	dbdt = Bit2KB(db/dt);
700 	mdev->rs_paused /= HZ;
701 
702 	if (!get_ldev(mdev))
703 		goto out;
704 
705 	spin_lock_irq(&mdev->req_lock);
706 	os = mdev->state;
707 
708 	/* This protects us against multiple calls (that can happen in the presence
709 	   of application IO), and against connectivity loss just before we arrive here. */
710 	if (os.conn <= C_CONNECTED)
711 		goto out_unlock;
712 
713 	ns = os;
714 	ns.conn = C_CONNECTED;
715 
716 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
717 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
718 	     "Online verify " : "Resync",
719 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
720 
721 	n_oos = drbd_bm_total_weight(mdev);
722 
723 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
724 		if (n_oos) {
725 			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
726 			      n_oos, Bit2KB(1));
727 			khelper_cmd = "out-of-sync";
728 		}
729 	} else {
730 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
731 
732 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
733 			khelper_cmd = "after-resync-target";
734 
735 		if (mdev->csums_tfm && mdev->rs_total) {
736 			const unsigned long s = mdev->rs_same_csum;
737 			const unsigned long t = mdev->rs_total;
738 			const int ratio =
739 				(t == 0)     ? 0 :
740 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
741 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
742 			     "transferred %luK total %luK\n",
743 			     ratio,
744 			     Bit2KB(mdev->rs_same_csum),
745 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
746 			     Bit2KB(mdev->rs_total));
747 		}
748 	}
749 
750 	if (mdev->rs_failed) {
751 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
752 
753 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
754 			ns.disk = D_INCONSISTENT;
755 			ns.pdsk = D_UP_TO_DATE;
756 		} else {
757 			ns.disk = D_UP_TO_DATE;
758 			ns.pdsk = D_INCONSISTENT;
759 		}
760 	} else {
761 		ns.disk = D_UP_TO_DATE;
762 		ns.pdsk = D_UP_TO_DATE;
763 
764 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
765 			if (mdev->p_uuid) {
766 				int i;
767 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
768 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
769 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
770 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
771 			} else {
772 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
773 			}
774 		}
775 
776 		drbd_uuid_set_bm(mdev, 0UL);
777 
778 		if (mdev->p_uuid) {
779 			/* Now the two UUID sets are equal, update what we
780 			 * know of the peer. */
781 			int i;
782 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
783 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
784 		}
785 	}
786 
787 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
788 out_unlock:
789 	spin_unlock_irq(&mdev->req_lock);
790 	put_ldev(mdev);
791 out:
792 	mdev->rs_total  = 0;
793 	mdev->rs_failed = 0;
794 	mdev->rs_paused = 0;
795 	mdev->ov_start_sector = 0;
796 
797 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
798 		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
799 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
800 	}
801 
802 	if (khelper_cmd)
803 		drbd_khelper(mdev, khelper_cmd);
804 
805 	return 1;
806 }
807 
808 /* helper */
809 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
810 {
811 	if (drbd_bio_has_active_page(e->private_bio)) {
812 		/* This might happen if sendpage() has not finished */
813 		spin_lock_irq(&mdev->req_lock);
814 		list_add_tail(&e->w.list, &mdev->net_ee);
815 		spin_unlock_irq(&mdev->req_lock);
816 	} else
817 		drbd_free_ee(mdev, e);
818 }
819 
820 /**
821  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
822  * @mdev:	DRBD device.
823  * @w:		work object.
824  * @cancel:	The connection will be closed anyway
825  */
826 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
827 {
828 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
829 	int ok;
830 
831 	if (unlikely(cancel)) {
832 		drbd_free_ee(mdev, e);
833 		dec_unacked(mdev);
834 		return 1;
835 	}
836 
837 	if (likely(drbd_bio_uptodate(e->private_bio))) {
838 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
839 	} else {
840 		if (__ratelimit(&drbd_ratelimit_state))
841 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
842 			    (unsigned long long)e->sector);
843 
844 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
845 	}
846 
847 	dec_unacked(mdev);
848 
849 	move_to_net_ee_or_free(mdev, e);
850 
851 	if (unlikely(!ok))
852 		dev_err(DEV, "drbd_send_block() failed\n");
853 	return ok;
854 }
855 
856 /**
857  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
858  * @mdev:	DRBD device.
859  * @w:		work object.
860  * @cancel:	The connection will be closed anyway
861  */
862 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
863 {
864 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
865 	int ok;
866 
867 	if (unlikely(cancel)) {
868 		drbd_free_ee(mdev, e);
869 		dec_unacked(mdev);
870 		return 1;
871 	}
872 
873 	if (get_ldev_if_state(mdev, D_FAILED)) {
874 		drbd_rs_complete_io(mdev, e->sector);
875 		put_ldev(mdev);
876 	}
877 
878 	if (likely(drbd_bio_uptodate(e->private_bio))) {
879 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
880 			inc_rs_pending(mdev);
881 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
882 		} else {
883 			if (__ratelimit(&drbd_ratelimit_state))
884 				dev_err(DEV, "Not sending RSDataReply, "
885 				    "partner DISKLESS!\n");
886 			ok = 1;
887 		}
888 	} else {
889 		if (__ratelimit(&drbd_ratelimit_state))
890 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
891 			    (unsigned long long)e->sector);
892 
893 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
894 
895 		/* update resync data with failure */
896 		drbd_rs_failed_io(mdev, e->sector, e->size);
897 	}
898 
899 	dec_unacked(mdev);
900 
901 	move_to_net_ee_or_free(mdev, e);
902 
903 	if (unlikely(!ok))
904 		dev_err(DEV, "drbd_send_block() failed\n");
905 	return ok;
906 }
907 
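/**
 * w_e_end_csum_rs_req() - Worker callback to answer a P_CSUM_RS_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Compares the locally computed digest with the one received from the peer.
 * On a match the block is acknowledged with P_RS_IS_IN_SYNC, otherwise the
 * full block is sent as a P_RS_DATA_REPLY.
 */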
908 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
909 {
910 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
911 	struct digest_info *di;
912 	int digest_size;
913 	void *digest = NULL;
914 	int ok, eq = 0;
915 
916 	if (unlikely(cancel)) {
917 		drbd_free_ee(mdev, e);
918 		dec_unacked(mdev);
919 		return 1;
920 	}
921 
922 	drbd_rs_complete_io(mdev, e->sector);
923 
924 	di = (struct digest_info *)(unsigned long)e->block_id;
925 
926 	if (likely(drbd_bio_uptodate(e->private_bio))) {
927 		/* quick hack to try to avoid a race against reconfiguration.
928 		 * a real fix would be much more involved,
929 		 * introducing more locking mechanisms */
930 		if (mdev->csums_tfm) {
931 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
932 			D_ASSERT(digest_size == di->digest_size);
933 			digest = kmalloc(digest_size, GFP_NOIO);
934 		}
935 		if (digest) {
936 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
937 			eq = !memcmp(digest, di->digest, digest_size);
938 			kfree(digest);
939 		}
940 
941 		if (eq) {
942 			drbd_set_in_sync(mdev, e->sector, e->size);
943 			mdev->rs_same_csum++;
944 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
945 		} else {
946 			inc_rs_pending(mdev);
947 			e->block_id = ID_SYNCER;
948 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
949 		}
950 	} else {
951 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
952 		if (__ratelimit(&drbd_ratelimit_state))
953 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
954 	}
955 
956 	dec_unacked(mdev);
957 
958 	kfree(di);
959 
960 	move_to_net_ee_or_free(mdev, e);
961 
962 	if (unlikely(!ok))
963 		dev_err(DEV, "drbd_send_block/ack() failed\n");
964 	return ok;
965 }
966 
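/**
 * w_e_end_ov_req() - Worker callback to answer a P_OV_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Hashes the block just read and returns the digest to the peer in a
 * P_OV_REPLY packet.
 */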
967 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
968 {
969 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
970 	int digest_size;
971 	void *digest;
972 	int ok = 1;
973 
974 	if (unlikely(cancel))
975 		goto out;
976 
977 	if (unlikely(!drbd_bio_uptodate(e->private_bio)))
978 		goto out;
979 
980 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
981 	/* FIXME if this allocation fails, online verify will not terminate! */
982 	digest = kmalloc(digest_size, GFP_NOIO);
983 	if (digest) {
984 		drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
985 		inc_rs_pending(mdev);
986 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
987 					     digest, digest_size, P_OV_REPLY);
988 		if (!ok)
989 			dec_rs_pending(mdev);
990 		kfree(digest);
991 	}
992 
993 out:
994 	drbd_free_ee(mdev, e);
995 
996 	dec_unacked(mdev);
997 
998 	return ok;
999 }
1000 
1001 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1002 {
1003 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1004 		mdev->ov_last_oos_size += size>>9;
1005 	} else {
1006 		mdev->ov_last_oos_start = sector;
1007 		mdev->ov_last_oos_size = size>>9;
1008 	}
1009 	drbd_set_out_of_sync(mdev, sector, size);
1010 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1011 }
1012 
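/**
 * w_e_end_ov_reply() - Worker callback to process a P_OV_REPLY
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Compares the digest received from the peer with the digest of the locally
 * read block, records mismatches via drbd_ov_oos_found(), and reports the
 * result with P_OV_RESULT.  Finishes the verify run once ov_left reaches zero.
 */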
1013 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1014 {
1015 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1016 	struct digest_info *di;
1017 	int digest_size;
1018 	void *digest;
1019 	int ok, eq = 0;
1020 
1021 	if (unlikely(cancel)) {
1022 		drbd_free_ee(mdev, e);
1023 		dec_unacked(mdev);
1024 		return 1;
1025 	}
1026 
1027 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1028 	 * the resync lru has been cleaned up already */
1029 	drbd_rs_complete_io(mdev, e->sector);
1030 
1031 	di = (struct digest_info *)(unsigned long)e->block_id;
1032 
1033 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1034 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1035 		digest = kmalloc(digest_size, GFP_NOIO);
1036 		if (digest) {
1037 			drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1038 
1039 			D_ASSERT(digest_size == di->digest_size);
1040 			eq = !memcmp(digest, di->digest, digest_size);
1041 			kfree(digest);
1042 		}
1043 	} else {
1044 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1045 		if (__ratelimit(&drbd_ratelimit_state))
1046 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1047 	}
1048 
1049 	dec_unacked(mdev);
1050 
1051 	kfree(di);
1052 
1053 	if (!eq)
1054 		drbd_ov_oos_found(mdev, e->sector, e->size);
1055 	else
1056 		ov_oos_print(mdev);
1057 
1058 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1059 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1060 
1061 	drbd_free_ee(mdev, e);
1062 
1063 	if (--mdev->ov_left == 0) {
1064 		ov_oos_print(mdev);
1065 		drbd_resync_finished(mdev);
1066 	}
1067 
1068 	return ok;
1069 }
1070 
1071 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1072 {
1073 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1074 	complete(&b->done);
1075 	return 1;
1076 }
1077 
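/**
 * w_send_barrier() - Worker callback to send a P_BARRIER packet closing the current epoch
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 */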
1078 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1079 {
1080 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1081 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1082 	int ok = 1;
1083 
1084 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1085 	 * just before it was reassigned and re-queued, so double check that.
1086 	 * actually, this race was harmless, since we only try to send the
1087 	 * barrier packet here, and otherwise do nothing with the object.
1088 	 * but compare with the head of w_clear_epoch */
1089 	spin_lock_irq(&mdev->req_lock);
1090 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1091 		cancel = 1;
1092 	spin_unlock_irq(&mdev->req_lock);
1093 	if (cancel)
1094 		return 1;
1095 
1096 	if (!drbd_get_data_sock(mdev))
1097 		return 0;
1098 	p->barrier = b->br_number;
1099 	/* inc_ap_pending was done where this was queued.
1100 	 * dec_ap_pending will be done in got_BarrierAck
1101 	 * or (on connection loss) in w_clear_epoch.  */
1102 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1103 				(struct p_header *)p, sizeof(*p), 0);
1104 	drbd_put_data_sock(mdev);
1105 
1106 	return ok;
1107 }
1108 
1109 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1110 {
1111 	if (cancel)
1112 		return 1;
1113 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1114 }
1115 
1116 /**
1117  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1118  * @mdev:	DRBD device.
1119  * @w:		work object.
1120  * @cancel:	The connection will be closed anyway
1121  */
1122 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1123 {
1124 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1125 	int ok;
1126 
1127 	if (unlikely(cancel)) {
1128 		req_mod(req, send_canceled);
1129 		return 1;
1130 	}
1131 
1132 	ok = drbd_send_dblock(mdev, req);
1133 	req_mod(req, ok ? handed_over_to_network : send_failed);
1134 
1135 	return ok;
1136 }
1137 
1138 /**
1139  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1140  * @mdev:	DRBD device.
1141  * @w:		work object.
1142  * @cancel:	The connection will be closed anyway
1143  */
1144 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1145 {
1146 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1147 	int ok;
1148 
1149 	if (unlikely(cancel)) {
1150 		req_mod(req, send_canceled);
1151 		return 1;
1152 	}
1153 
1154 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1155 				(unsigned long)req);
1156 
1157 	if (!ok) {
1158 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1159 		 * so this is probably redundant */
1160 		if (mdev->state.conn >= C_CONNECTED)
1161 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1162 	}
1163 	req_mod(req, ok ? handed_over_to_network : send_failed);
1164 
1165 	return ok;
1166 }
1167 
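/* Follow the sync-after dependency chain and return 1 if this device may
 * resync now, i.e. none of the devices it depends on is currently resyncing
 * or paused.  Called with global_state_lock held. */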
1168 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1169 {
1170 	struct drbd_conf *odev = mdev;
1171 
1172 	while (1) {
1173 		if (odev->sync_conf.after == -1)
1174 			return 1;
1175 		odev = minor_to_mdev(odev->sync_conf.after);
1176 		ERR_IF(!odev) return 1;
1177 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1178 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1179 		    odev->state.aftr_isp || odev->state.peer_isp ||
1180 		    odev->state.user_isp)
1181 			return 0;
1182 	}
1183 }
1184 
1185 /**
1186  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1187  * @mdev:	DRBD device.
1188  *
1189  * Called from process context only (admin command and after_state_ch).
1190  */
1191 static int _drbd_pause_after(struct drbd_conf *mdev)
1192 {
1193 	struct drbd_conf *odev;
1194 	int i, rv = 0;
1195 
1196 	for (i = 0; i < minor_count; i++) {
1197 		odev = minor_to_mdev(i);
1198 		if (!odev)
1199 			continue;
1200 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1201 			continue;
1202 		if (!_drbd_may_sync_now(odev))
1203 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1204 			       != SS_NOTHING_TO_DO);
1205 	}
1206 
1207 	return rv;
1208 }
1209 
1210 /**
1211  * _drbd_resume_next() - Resume resync on all devices that may resync now
1212  * @mdev:	DRBD device.
1213  *
1214  * Called from process context only (admin command and worker).
1215  */
1216 static int _drbd_resume_next(struct drbd_conf *mdev)
1217 {
1218 	struct drbd_conf *odev;
1219 	int i, rv = 0;
1220 
1221 	for (i = 0; i < minor_count; i++) {
1222 		odev = minor_to_mdev(i);
1223 		if (!odev)
1224 			continue;
1225 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1226 			continue;
1227 		if (odev->state.aftr_isp) {
1228 			if (_drbd_may_sync_now(odev))
1229 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1230 							CS_HARD, NULL)
1231 				       != SS_NOTHING_TO_DO) ;
1232 		}
1233 	}
1234 	return rv;
1235 }
1236 
1237 void resume_next_sg(struct drbd_conf *mdev)
1238 {
1239 	write_lock_irq(&global_state_lock);
1240 	_drbd_resume_next(mdev);
1241 	write_unlock_irq(&global_state_lock);
1242 }
1243 
1244 void suspend_other_sg(struct drbd_conf *mdev)
1245 {
1246 	write_lock_irq(&global_state_lock);
1247 	_drbd_pause_after(mdev);
1248 	write_unlock_irq(&global_state_lock);
1249 }
1250 
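/* Validate a new sync-after dependency: @o_minor must be -1 (no dependency)
 * or refer to an existing device, and following the chain from there must
 * not lead back to @mdev.  Returns NO_ERROR or an ERR_SYNC_AFTER* code. */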
1251 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1252 {
1253 	struct drbd_conf *odev;
1254 
1255 	if (o_minor == -1)
1256 		return NO_ERROR;
1257 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1258 		return ERR_SYNC_AFTER;
1259 
1260 	/* check for loops */
1261 	odev = minor_to_mdev(o_minor);
1262 	while (1) {
1263 		if (odev == mdev)
1264 			return ERR_SYNC_AFTER_CYCLE;
1265 
1266 		/* dependency chain ends here, no cycles. */
1267 		if (odev->sync_conf.after == -1)
1268 			return NO_ERROR;
1269 
1270 		/* follow the dependency chain */
1271 		odev = minor_to_mdev(odev->sync_conf.after);
1272 	}
1273 }
1274 
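/* Change the sync-after dependency of @mdev to minor @na and, if valid,
 * re-evaluate which resyncs have to be paused or may be resumed. */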
1275 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1276 {
1277 	int changes;
1278 	int retcode;
1279 
1280 	write_lock_irq(&global_state_lock);
1281 	retcode = sync_after_error(mdev, na);
1282 	if (retcode == NO_ERROR) {
1283 		mdev->sync_conf.after = na;
1284 		do {
1285 			changes  = _drbd_pause_after(mdev);
1286 			changes |= _drbd_resume_next(mdev);
1287 		} while (changes);
1288 	}
1289 	write_unlock_irq(&global_state_lock);
1290 	return retcode;
1291 }
1292 
1293 /**
1294  * drbd_start_resync() - Start the resync process
1295  * @mdev:	DRBD device.
1296  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1297  *
1298  * This function might bring you directly into one of the
1299  * C_PAUSED_SYNC_* states.
1300  */
1301 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1302 {
1303 	union drbd_state ns;
1304 	int r;
1305 
1306 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1307 		dev_err(DEV, "Resync already running!\n");
1308 		return;
1309 	}
1310 
1311 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1312 	drbd_rs_cancel_all(mdev);
1313 
1314 	if (side == C_SYNC_TARGET) {
1315 		/* Since application IO was locked out during C_WF_BITMAP_T and
1316 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1317 		   which makes our data inconsistent, let the handler below veto the resync. */
1318 		r = drbd_khelper(mdev, "before-resync-target");
1319 		r = (r >> 8) & 0xff;
1320 		if (r > 0) {
1321 			dev_info(DEV, "before-resync-target handler returned %d, "
1322 			     "dropping connection.\n", r);
1323 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1324 			return;
1325 		}
1326 	}
1327 
1328 	drbd_state_lock(mdev);
1329 
1330 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1331 		drbd_state_unlock(mdev);
1332 		return;
1333 	}
1334 
1335 	if (side == C_SYNC_TARGET) {
1336 		mdev->bm_resync_fo = 0;
1337 	} else /* side == C_SYNC_SOURCE */ {
1338 		u64 uuid;
1339 
1340 		get_random_bytes(&uuid, sizeof(u64));
1341 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1342 		drbd_send_sync_uuid(mdev, uuid);
1343 
1344 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1345 	}
1346 
1347 	write_lock_irq(&global_state_lock);
1348 	ns = mdev->state;
1349 
1350 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1351 
1352 	ns.conn = side;
1353 
1354 	if (side == C_SYNC_TARGET)
1355 		ns.disk = D_INCONSISTENT;
1356 	else /* side == C_SYNC_SOURCE */
1357 		ns.pdsk = D_INCONSISTENT;
1358 
1359 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1360 	ns = mdev->state;
1361 
1362 	if (ns.conn < C_CONNECTED)
1363 		r = SS_UNKNOWN_ERROR;
1364 
1365 	if (r == SS_SUCCESS) {
1366 		mdev->rs_total     =
1367 		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1368 		mdev->rs_failed    = 0;
1369 		mdev->rs_paused    = 0;
1370 		mdev->rs_start     =
1371 		mdev->rs_mark_time = jiffies;
1372 		mdev->rs_same_csum = 0;
1373 		_drbd_pause_after(mdev);
1374 	}
1375 	write_unlock_irq(&global_state_lock);
1376 	drbd_state_unlock(mdev);
1377 	put_ldev(mdev);
1378 
1379 	if (r == SS_SUCCESS) {
1380 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1381 		     drbd_conn_str(ns.conn),
1382 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1383 		     (unsigned long) mdev->rs_total);
1384 
1385 		if (mdev->rs_total == 0) {
1386 			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1387 			request_ping(mdev);
1388 			__set_current_state(TASK_INTERRUPTIBLE);
1389 			schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead of 10 */
1390 			drbd_resync_finished(mdev);
1391 			return;
1392 		}
1393 
1394 		/* ns.conn may already be != mdev->state.conn,
1395 		 * we may have been paused in between, or become paused until
1396 		 * the timer triggers.
1397 		 * No matter, that is handled in resync_timer_fn() */
1398 		if (ns.conn == C_SYNC_TARGET)
1399 			mod_timer(&mdev->resync_timer, jiffies);
1400 
1401 		drbd_md_sync(mdev);
1402 	}
1403 }
1404 
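/**
 * drbd_worker() - Main loop of the per-device worker thread
 * @thi:	DRBD thread object.
 *
 * Waits on the work queue semaphore, corks/uncorks the data socket around
 * idle periods, and runs the queued work callbacks.  If a callback fails
 * while connected, the connection is forced into C_NETWORK_FAILURE.  On
 * shutdown all remaining work items are cancelled and the receiver thread
 * is stopped before cleanup.
 */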
1405 int drbd_worker(struct drbd_thread *thi)
1406 {
1407 	struct drbd_conf *mdev = thi->mdev;
1408 	struct drbd_work *w = NULL;
1409 	LIST_HEAD(work_list);
1410 	int intr = 0, i;
1411 
1412 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1413 
1414 	while (get_t_state(thi) == Running) {
1415 		drbd_thread_current_set_cpu(mdev);
1416 
1417 		if (down_trylock(&mdev->data.work.s)) {
1418 			mutex_lock(&mdev->data.mutex);
1419 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1420 				drbd_tcp_uncork(mdev->data.socket);
1421 			mutex_unlock(&mdev->data.mutex);
1422 
1423 			intr = down_interruptible(&mdev->data.work.s);
1424 
1425 			mutex_lock(&mdev->data.mutex);
1426 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1427 				drbd_tcp_cork(mdev->data.socket);
1428 			mutex_unlock(&mdev->data.mutex);
1429 		}
1430 
1431 		if (intr) {
1432 			D_ASSERT(intr == -EINTR);
1433 			flush_signals(current);
1434 			ERR_IF (get_t_state(thi) == Running)
1435 				continue;
1436 			break;
1437 		}
1438 
1439 		if (get_t_state(thi) != Running)
1440 			break;
1441 		/* With this break, we have done a down() but not consumed
1442 		   the entry from the list. The cleanup code takes care of
1443 		   this...   */
1444 
1445 		w = NULL;
1446 		spin_lock_irq(&mdev->data.work.q_lock);
1447 		ERR_IF(list_empty(&mdev->data.work.q)) {
1448 			/* something terribly wrong in our logic.
1449 			 * we were able to down() the semaphore,
1450 			 * but the list is empty... doh.
1451 			 *
1452 			 * what is the best thing to do now?
1453 			 * try again from scratch, restarting the receiver,
1454 			 * asender, whatnot? could break things even more badly,
1455 			 * e.g. when we are primary, but have no good local data.
1456 			 *
1457 			 * I'll try to get away just starting over this loop.
1458 			 */
1459 			spin_unlock_irq(&mdev->data.work.q_lock);
1460 			continue;
1461 		}
1462 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1463 		list_del_init(&w->list);
1464 		spin_unlock_irq(&mdev->data.work.q_lock);
1465 
1466 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1467 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1468 			if (mdev->state.conn >= C_CONNECTED)
1469 				drbd_force_state(mdev,
1470 						NS(conn, C_NETWORK_FAILURE));
1471 		}
1472 	}
1473 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1474 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1475 
1476 	spin_lock_irq(&mdev->data.work.q_lock);
1477 	i = 0;
1478 	while (!list_empty(&mdev->data.work.q)) {
1479 		list_splice_init(&mdev->data.work.q, &work_list);
1480 		spin_unlock_irq(&mdev->data.work.q_lock);
1481 
1482 		while (!list_empty(&work_list)) {
1483 			w = list_entry(work_list.next, struct drbd_work, list);
1484 			list_del_init(&w->list);
1485 			w->cb(mdev, w, 1);
1486 			i++; /* dead debugging code */
1487 		}
1488 
1489 		spin_lock_irq(&mdev->data.work.q_lock);
1490 	}
1491 	sema_init(&mdev->data.work.s, 0);
1492 	/* DANGEROUS race: if someone did queue his work within the spinlock,
1493 	 * but up() ed outside the spinlock, we could get an up() on the
1494 	 * semaphore without corresponding list entry.
1495 	 * So don't do that.
1496 	 */
1497 	spin_unlock_irq(&mdev->data.work.q_lock);
1498 
1499 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1500 	/* _drbd_set_state only uses stop_nowait.
1501 	 * wait here for the Exiting receiver. */
1502 	drbd_thread_stop(&mdev->receiver);
1503 	drbd_mdev_cleanup(mdev);
1504 
1505 	dev_info(DEV, "worker terminated\n");
1506 
1507 	clear_bit(DEVICE_DYING, &mdev->flags);
1508 	clear_bit(CONFIG_PENDING, &mdev->flags);
1509 	wake_up(&mdev->state_wait);
1510 
1511 	return 0;
1512 }
1513