xref: /linux/drivers/block/drbd/drbd_worker.c (revision c4ee0af3fa0dc65f690fc908f02b8355f9576ea0)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42 
43 
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56 
57 
58 /* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
60    to evaluate the resync after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63 
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69 	struct drbd_md_io *md_io;
70 	struct drbd_conf *mdev;
71 
72 	md_io = (struct drbd_md_io *)bio->bi_private;
73 	mdev = container_of(md_io, struct drbd_conf, md_io);
74 
75 	md_io->error = error;
76 
77 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
78 	 * to timeout on the lower level device, and eventually detach from it.
79 	 * If this io completion runs after that timeout expired, this
80 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
81 	 * During normal operation, this only puts that extra reference
82 	 * down to 1 again.
83 	 * Make sure we first drop the reference, and only then signal
84 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
85 	 * next drbd_md_sync_page_io(), that we trigger the
86 	 * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
87 	 */
88 	drbd_md_put_buffer(mdev);
89 	md_io->done = 1;
90 	wake_up(&mdev->misc_wait);
91 	bio_put(bio);
92 	if (mdev->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
93 		put_ldev(mdev);
94 }
95 
96 /* reads on behalf of the partner,
97  * "submitted" by the receiver
98  */
99 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
100 {
101 	unsigned long flags = 0;
102 	struct drbd_conf *mdev = peer_req->w.mdev;
103 
104 	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
105 	mdev->read_cnt += peer_req->i.size >> 9;
106 	list_del(&peer_req->w.list);
107 	if (list_empty(&mdev->read_ee))
108 		wake_up(&mdev->ee_wait);
109 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
110 		__drbd_chk_io_error(mdev, DRBD_READ_ERROR);
111 	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
112 
113 	drbd_queue_work(&mdev->tconn->sender_work, &peer_req->w);
114 	put_ldev(mdev);
115 }
116 
117 /* writes on behalf of the partner, or resync writes,
118  * "submitted" by the receiver, final stage.  */
119 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
120 {
121 	unsigned long flags = 0;
122 	struct drbd_conf *mdev = peer_req->w.mdev;
123 	struct drbd_interval i;
124 	int do_wake;
125 	u64 block_id;
126 	int do_al_complete_io;
127 
128 	/* after we moved peer_req to done_ee,
129 	 * we may no longer access it,
130 	 * it may be freed/reused already!
131 	 * (as soon as we release the req_lock) */
132 	i = peer_req->i;
133 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134 	block_id = peer_req->block_id;
135 
136 	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
137 	mdev->writ_cnt += peer_req->i.size >> 9;
138 	list_move_tail(&peer_req->w.list, &mdev->done_ee);
139 
140 	/*
141 	 * Do not remove from the write_requests tree here: we did not send the
142 	 * Ack yet and did not wake possibly waiting conflicting requests.
143 	 * Removed from the tree from "drbd_process_done_ee" within the
144 	 * appropriate w.cb (e_end_block/e_end_resync_block) or from
145 	 * _drbd_clear_done_ee.
146 	 */
147 
148 	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
149 
150 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
151 		__drbd_chk_io_error(mdev, DRBD_WRITE_ERROR);
152 	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
153 
154 	if (block_id == ID_SYNCER)
155 		drbd_rs_complete_io(mdev, i.sector);
156 
157 	if (do_wake)
158 		wake_up(&mdev->ee_wait);
159 
160 	if (do_al_complete_io)
161 		drbd_al_complete_io(mdev, &i);
162 
163 	wake_asender(mdev->tconn);
164 	put_ldev(mdev);
165 }
166 
167 /* writes on behalf of the partner, or resync writes,
168  * "submitted" by the receiver.
169  */
170 void drbd_peer_request_endio(struct bio *bio, int error)
171 {
172 	struct drbd_peer_request *peer_req = bio->bi_private;
173 	struct drbd_conf *mdev = peer_req->w.mdev;
174 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
175 	int is_write = bio_data_dir(bio) == WRITE;
176 
177 	if (error && __ratelimit(&drbd_ratelimit_state))
178 		dev_warn(DEV, "%s: error=%d s=%llus\n",
179 				is_write ? "write" : "read", error,
180 				(unsigned long long)peer_req->i.sector);
181 	if (!error && !uptodate) {
182 		if (__ratelimit(&drbd_ratelimit_state))
183 			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
184 					is_write ? "write" : "read",
185 					(unsigned long long)peer_req->i.sector);
186 		/* strange behavior of some lower level drivers...
187 		 * fail the request by clearing the uptodate flag,
188 		 * but do not return any error?! */
189 		error = -EIO;
190 	}
191 
192 	if (error)
193 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
194 
195 	bio_put(bio); /* no need for the bio anymore */
196 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
197 		if (is_write)
198 			drbd_endio_write_sec_final(peer_req);
199 		else
200 			drbd_endio_read_sec_final(peer_req);
201 	}
202 }
203 
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 *
 * bio completion handler for the private bio of a struct drbd_request.
 * Translates the completion status into a drbd_req_event, releases the
 * private bio, feeds the event to __req_mod() under the req_lock, and
 * completes the master bio if __req_mod() handed one back in @m.
 */
void drbd_request_endio(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->w.mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}


	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which no longer
	 * complete requests at all, not even with error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
	 */
	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_emerg(DEV, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

		if (!error)
			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
	}

	/* to avoid recursion in __req_mod */
	/* Pick the event: on error distinguish write, read and read-ahead;
	 * any error-free completion is COMPLETED_OK. */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? WRITE_COMPLETED_WITH_ERROR
			: (bio_rw(bio) == READ)
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	/* Drop the private bio and leave the completion status encoded as an
	 * ERR_PTR() in its place. */
	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
	put_ldev(mdev);

	/* m is not initialized here; this relies on __req_mod() setting m.bio
	 * -- NOTE(review): confirm against __req_mod(). */
	if (m.bio)
		complete_master_bio(mdev, &m);
}
283 
284 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
285 		  struct drbd_peer_request *peer_req, void *digest)
286 {
287 	struct hash_desc desc;
288 	struct scatterlist sg;
289 	struct page *page = peer_req->pages;
290 	struct page *tmp;
291 	unsigned len;
292 
293 	desc.tfm = tfm;
294 	desc.flags = 0;
295 
296 	sg_init_table(&sg, 1);
297 	crypto_hash_init(&desc);
298 
299 	while ((tmp = page_chain_next(page))) {
300 		/* all but the last page will be fully used */
301 		sg_set_page(&sg, page, PAGE_SIZE, 0);
302 		crypto_hash_update(&desc, &sg, sg.length);
303 		page = tmp;
304 	}
305 	/* and now the last, possibly only partially used page */
306 	len = peer_req->i.size & (PAGE_SIZE - 1);
307 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
308 	crypto_hash_update(&desc, &sg, sg.length);
309 	crypto_hash_final(&desc, digest);
310 }
311 
312 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
313 {
314 	struct hash_desc desc;
315 	struct scatterlist sg;
316 	struct bio_vec *bvec;
317 	int i;
318 
319 	desc.tfm = tfm;
320 	desc.flags = 0;
321 
322 	sg_init_table(&sg, 1);
323 	crypto_hash_init(&desc);
324 
325 	bio_for_each_segment(bvec, bio, i) {
326 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
327 		crypto_hash_update(&desc, &sg, sg.length);
328 	}
329 	crypto_hash_final(&desc, digest);
330 }
331 
332 /* MAYBE merge common code with w_e_end_ov_req */
333 static int w_e_send_csum(struct drbd_work *w, int cancel)
334 {
335 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
336 	struct drbd_conf *mdev = w->mdev;
337 	int digest_size;
338 	void *digest;
339 	int err = 0;
340 
341 	if (unlikely(cancel))
342 		goto out;
343 
344 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
345 		goto out;
346 
347 	digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
348 	digest = kmalloc(digest_size, GFP_NOIO);
349 	if (digest) {
350 		sector_t sector = peer_req->i.sector;
351 		unsigned int size = peer_req->i.size;
352 		drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
353 		/* Free peer_req and pages before send.
354 		 * In case we block on congestion, we could otherwise run into
355 		 * some distributed deadlock, if the other side blocks on
356 		 * congestion as well, because our receiver blocks in
357 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
358 		drbd_free_peer_req(mdev, peer_req);
359 		peer_req = NULL;
360 		inc_rs_pending(mdev);
361 		err = drbd_send_drequest_csum(mdev, sector, size,
362 					      digest, digest_size,
363 					      P_CSUM_RS_REQUEST);
364 		kfree(digest);
365 	} else {
366 		dev_err(DEV, "kmalloc() of digest failed.\n");
367 		err = -ENOMEM;
368 	}
369 
370 out:
371 	if (peer_req)
372 		drbd_free_peer_req(mdev, peer_req);
373 
374 	if (unlikely(err))
375 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
376 	return err;
377 }
378 
379 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
380 
381 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
382 {
383 	struct drbd_peer_request *peer_req;
384 
385 	if (!get_ldev(mdev))
386 		return -EIO;
387 
388 	if (drbd_rs_should_slow_down(mdev, sector))
389 		goto defer;
390 
391 	/* GFP_TRY, because if there is no memory available right now, this may
392 	 * be rescheduled for later. It is "only" background resync, after all. */
393 	peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
394 				       size, GFP_TRY);
395 	if (!peer_req)
396 		goto defer;
397 
398 	peer_req->w.cb = w_e_send_csum;
399 	spin_lock_irq(&mdev->tconn->req_lock);
400 	list_add(&peer_req->w.list, &mdev->read_ee);
401 	spin_unlock_irq(&mdev->tconn->req_lock);
402 
403 	atomic_add(size >> 9, &mdev->rs_sect_ev);
404 	if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
405 		return 0;
406 
407 	/* If it failed because of ENOMEM, retry should help.  If it failed
408 	 * because bio_add_page failed (probably broken lower level driver),
409 	 * retry may or may not help.
410 	 * If it does not, you may need to force disconnect. */
411 	spin_lock_irq(&mdev->tconn->req_lock);
412 	list_del(&peer_req->w.list);
413 	spin_unlock_irq(&mdev->tconn->req_lock);
414 
415 	drbd_free_peer_req(mdev, peer_req);
416 defer:
417 	put_ldev(mdev);
418 	return -EAGAIN;
419 }
420 
421 int w_resync_timer(struct drbd_work *w, int cancel)
422 {
423 	struct drbd_conf *mdev = w->mdev;
424 	switch (mdev->state.conn) {
425 	case C_VERIFY_S:
426 		w_make_ov_request(w, cancel);
427 		break;
428 	case C_SYNC_TARGET:
429 		w_make_resync_request(w, cancel);
430 		break;
431 	}
432 
433 	return 0;
434 }
435 
436 void resync_timer_fn(unsigned long data)
437 {
438 	struct drbd_conf *mdev = (struct drbd_conf *) data;
439 
440 	if (list_empty(&mdev->resync_work.list))
441 		drbd_queue_work(&mdev->tconn->sender_work, &mdev->resync_work);
442 }
443 
444 static void fifo_set(struct fifo_buffer *fb, int value)
445 {
446 	int i;
447 
448 	for (i = 0; i < fb->size; i++)
449 		fb->values[i] = value;
450 }
451 
452 static int fifo_push(struct fifo_buffer *fb, int value)
453 {
454 	int ov;
455 
456 	ov = fb->values[fb->head_index];
457 	fb->values[fb->head_index++] = value;
458 
459 	if (fb->head_index >= fb->size)
460 		fb->head_index = 0;
461 
462 	return ov;
463 }
464 
465 static void fifo_add_val(struct fifo_buffer *fb, int value)
466 {
467 	int i;
468 
469 	for (i = 0; i < fb->size; i++)
470 		fb->values[i] += value;
471 }
472 
473 struct fifo_buffer *fifo_alloc(int fifo_size)
474 {
475 	struct fifo_buffer *fb;
476 
477 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
478 	if (!fb)
479 		return NULL;
480 
481 	fb->head_index = 0;
482 	fb->size = fifo_size;
483 	fb->total = 0;
484 
485 	return fb;
486 }
487 
/*
 * Dynamic resync speed controller: compute how many sectors to request in
 * this turn, based on what came in since the last turn (rs_sect_in), what
 * is still in flight (rs_in_flight), and the corrections already planned
 * in the rs_plan_s fifo.
 *
 * Uses rcu_dereference() on disk_conf and rs_plan_s; the visible caller,
 * drbd_rs_number_requests(), holds rcu_read_lock() around this call and
 * only calls in when plan->size is non-zero (it is used as a divisor).
 *
 * Returns the number of sectors to request, clamped to
 * [0, (c_max_rate * 2 * SLEEP_TIME) / HZ].
 */
static int drbd_rs_controller(struct drbd_conf *mdev)
{
	struct disk_conf *dc;
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy*/
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr; /* correction that was planned for this very step */
	int max_sect; /* upper bound for req_sect, derived from c_max_rate */
	struct fifo_buffer *plan;

	/* fetch and reset the counter in one atomic step */
	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	dc = rcu_dereference(mdev->ldev->disk_conf);
	plan = rcu_dereference(mdev->rs_plan_s);

	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	/* in_flight was just reduced by sect_in, so this sum is the
	 * in-flight amount from before the subtraction above */
	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		/* a configured c_fill_target takes precedence over the
		 * value derived from c_delay_target */
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - plan->total;

	/* Plan ahead */
	/* spread the correction evenly over all planned steps */
	cps = correction / steps;
	fifo_add_val(plan, cps);
	plan->total += cps * steps;

	/* What we do in this step */
	/* take the head slot's correction out of the plan, leaving 0 there */
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}
543 
544 static int drbd_rs_number_requests(struct drbd_conf *mdev)
545 {
546 	int number;
547 
548 	rcu_read_lock();
549 	if (rcu_dereference(mdev->rs_plan_s)->size) {
550 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
551 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
552 	} else {
553 		mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
554 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
555 	}
556 	rcu_read_unlock();
557 
558 	/* ignore the amount of pending requests, the resync controller should
559 	 * throttle down to incoming reply rate soon enough anyways. */
560 	return number;
561 }
562 
563 int w_make_resync_request(struct drbd_work *w, int cancel)
564 {
565 	struct drbd_conf *mdev = w->mdev;
566 	unsigned long bit;
567 	sector_t sector;
568 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
569 	int max_bio_size;
570 	int number, rollback_i, size;
571 	int align, queued, sndbuf;
572 	int i = 0;
573 
574 	if (unlikely(cancel))
575 		return 0;
576 
577 	if (mdev->rs_total == 0) {
578 		/* empty resync? */
579 		drbd_resync_finished(mdev);
580 		return 0;
581 	}
582 
583 	if (!get_ldev(mdev)) {
584 		/* Since we only need to access mdev->rsync a
585 		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
586 		   to continue resync with a broken disk makes no sense at
587 		   all */
588 		dev_err(DEV, "Disk broke down during resync!\n");
589 		return 0;
590 	}
591 
592 	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
593 	number = drbd_rs_number_requests(mdev);
594 	if (number == 0)
595 		goto requeue;
596 
597 	for (i = 0; i < number; i++) {
598 		/* Stop generating RS requests, when half of the send buffer is filled */
599 		mutex_lock(&mdev->tconn->data.mutex);
600 		if (mdev->tconn->data.socket) {
601 			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
602 			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
603 		} else {
604 			queued = 1;
605 			sndbuf = 0;
606 		}
607 		mutex_unlock(&mdev->tconn->data.mutex);
608 		if (queued > sndbuf / 2)
609 			goto requeue;
610 
611 next_sector:
612 		size = BM_BLOCK_SIZE;
613 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
614 
615 		if (bit == DRBD_END_OF_BITMAP) {
616 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
617 			put_ldev(mdev);
618 			return 0;
619 		}
620 
621 		sector = BM_BIT_TO_SECT(bit);
622 
623 		if (drbd_rs_should_slow_down(mdev, sector) ||
624 		    drbd_try_rs_begin_io(mdev, sector)) {
625 			mdev->bm_resync_fo = bit;
626 			goto requeue;
627 		}
628 		mdev->bm_resync_fo = bit + 1;
629 
630 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
631 			drbd_rs_complete_io(mdev, sector);
632 			goto next_sector;
633 		}
634 
635 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
636 		/* try to find some adjacent bits.
637 		 * we stop if we have already the maximum req size.
638 		 *
639 		 * Additionally always align bigger requests, in order to
640 		 * be prepared for all stripe sizes of software RAIDs.
641 		 */
642 		align = 1;
643 		rollback_i = i;
644 		for (;;) {
645 			if (size + BM_BLOCK_SIZE > max_bio_size)
646 				break;
647 
648 			/* Be always aligned */
649 			if (sector & ((1<<(align+3))-1))
650 				break;
651 
652 			/* do not cross extent boundaries */
653 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
654 				break;
655 			/* now, is it actually dirty, after all?
656 			 * caution, drbd_bm_test_bit is tri-state for some
657 			 * obscure reason; ( b == 0 ) would get the out-of-band
658 			 * only accidentally right because of the "oddly sized"
659 			 * adjustment below */
660 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
661 				break;
662 			bit++;
663 			size += BM_BLOCK_SIZE;
664 			if ((BM_BLOCK_SIZE << align) <= size)
665 				align++;
666 			i++;
667 		}
668 		/* if we merged some,
669 		 * reset the offset to start the next drbd_bm_find_next from */
670 		if (size > BM_BLOCK_SIZE)
671 			mdev->bm_resync_fo = bit + 1;
672 #endif
673 
674 		/* adjust very last sectors, in case we are oddly sized */
675 		if (sector + (size>>9) > capacity)
676 			size = (capacity-sector)<<9;
677 		if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
678 			switch (read_for_csum(mdev, sector, size)) {
679 			case -EIO: /* Disk failure */
680 				put_ldev(mdev);
681 				return -EIO;
682 			case -EAGAIN: /* allocation failed, or ldev busy */
683 				drbd_rs_complete_io(mdev, sector);
684 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
685 				i = rollback_i;
686 				goto requeue;
687 			case 0:
688 				/* everything ok */
689 				break;
690 			default:
691 				BUG();
692 			}
693 		} else {
694 			int err;
695 
696 			inc_rs_pending(mdev);
697 			err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
698 						 sector, size, ID_SYNCER);
699 			if (err) {
700 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
701 				dec_rs_pending(mdev);
702 				put_ldev(mdev);
703 				return err;
704 			}
705 		}
706 	}
707 
708 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
709 		/* last syncer _request_ was sent,
710 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
711 		 * next sync group will resume), as soon as we receive the last
712 		 * resync data block, and the last bit is cleared.
713 		 * until then resync "work" is "inactive" ...
714 		 */
715 		put_ldev(mdev);
716 		return 0;
717 	}
718 
719  requeue:
720 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
721 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
722 	put_ldev(mdev);
723 	return 0;
724 }
725 
726 static int w_make_ov_request(struct drbd_work *w, int cancel)
727 {
728 	struct drbd_conf *mdev = w->mdev;
729 	int number, i, size;
730 	sector_t sector;
731 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
732 	bool stop_sector_reached = false;
733 
734 	if (unlikely(cancel))
735 		return 1;
736 
737 	number = drbd_rs_number_requests(mdev);
738 
739 	sector = mdev->ov_position;
740 	for (i = 0; i < number; i++) {
741 		if (sector >= capacity)
742 			return 1;
743 
744 		/* We check for "finished" only in the reply path:
745 		 * w_e_end_ov_reply().
746 		 * We need to send at least one request out. */
747 		stop_sector_reached = i > 0
748 			&& verify_can_do_stop_sector(mdev)
749 			&& sector >= mdev->ov_stop_sector;
750 		if (stop_sector_reached)
751 			break;
752 
753 		size = BM_BLOCK_SIZE;
754 
755 		if (drbd_rs_should_slow_down(mdev, sector) ||
756 		    drbd_try_rs_begin_io(mdev, sector)) {
757 			mdev->ov_position = sector;
758 			goto requeue;
759 		}
760 
761 		if (sector + (size>>9) > capacity)
762 			size = (capacity-sector)<<9;
763 
764 		inc_rs_pending(mdev);
765 		if (drbd_send_ov_request(mdev, sector, size)) {
766 			dec_rs_pending(mdev);
767 			return 0;
768 		}
769 		sector += BM_SECT_PER_BIT;
770 	}
771 	mdev->ov_position = sector;
772 
773  requeue:
774 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
775 	if (i == 0 || !stop_sector_reached)
776 		mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
777 	return 1;
778 }
779 
780 int w_ov_finished(struct drbd_work *w, int cancel)
781 {
782 	struct drbd_conf *mdev = w->mdev;
783 	kfree(w);
784 	ov_out_of_sync_print(mdev);
785 	drbd_resync_finished(mdev);
786 
787 	return 0;
788 }
789 
790 static int w_resync_finished(struct drbd_work *w, int cancel)
791 {
792 	struct drbd_conf *mdev = w->mdev;
793 	kfree(w);
794 
795 	drbd_resync_finished(mdev);
796 
797 	return 0;
798 }
799 
800 static void ping_peer(struct drbd_conf *mdev)
801 {
802 	struct drbd_tconn *tconn = mdev->tconn;
803 
804 	clear_bit(GOT_PING_ACK, &tconn->flags);
805 	request_ping(tconn);
806 	wait_event(tconn->ping_wait,
807 		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
808 }
809 
/*
 * Finish a resync or online verify run: log statistics, compute the new
 * disk/peer-disk states, update UUIDs after a resync without failed
 * blocks, move the connection state to C_CONNECTED, reset the resync
 * counters, sync meta data and, if applicable, invoke a user space
 * helper.
 *
 * If the resync LRU cannot be emptied yet, sleeps for HZ / 10 and queues
 * a w_resync_finished work item to retry later.
 *
 * Returns 1 on every visible path.
 */
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now).   Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			/* w is freed by w_resync_finished() */
			w->cb = w_resync_finished;
			w->mdev = mdev;
			drbd_queue_work(&mdev->tconn->sender_work, w);
			return 1;
		}
		/* could not queue a retry: fall through and finish now */
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	/* elapsed seconds, not counting paused time; at least 1 so the
	 * division below is safe.  dt is unsigned, so "<= 0" means "== 0". */
	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;

	db = mdev->rs_total;
	/* adjust for verify start and stop sectors, respective reached position */
	if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
		db -= mdev->ov_left;

	dbdt = Bit2KB(db/dt);
	/* from here on rs_paused is in seconds (used for the log line below) */
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	ping_peer(mdev);

	spin_lock_irq(&mdev->tconn->req_lock);
	os = drbd_read_state(mdev);

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify" : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		/* after a resync, whatever is still set in the bitmap
		 * is expected to be accounted for as failed */
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->tconn->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			/* percentage of s in t; for large t divide first,
			 * rather than multiplying s by 100 */
			const int ratio =
				(t == 0)     ? 0 :
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		/* with failed blocks, the sync target side stays inconsistent */
		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				/* as sync target, take over the peer's UUIDs;
				 * our previous current UUID goes into the
				 * bitmap slot first */
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(mdev, 0UL);
			drbd_print_uuids(mdev, "updated UUIDs");
			if (mdev->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->tconn->req_lock);
	put_ldev(mdev);
out:
	/* reset the resync counters for the next run */
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;

	/* reset start sector, if we reached end of device */
	if (verify_done && mdev->ov_left == 0)
		mdev->ov_start_sector = 0;

	drbd_md_sync(mdev);

	/* call the helper only now, after the req_lock was released */
	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}
963 
964 /* helper */
965 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
966 {
967 	if (drbd_peer_req_has_active_page(peer_req)) {
968 		/* This might happen if sendpage() has not finished */
969 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
970 		atomic_add(i, &mdev->pp_in_use_by_net);
971 		atomic_sub(i, &mdev->pp_in_use);
972 		spin_lock_irq(&mdev->tconn->req_lock);
973 		list_add_tail(&peer_req->w.list, &mdev->net_ee);
974 		spin_unlock_irq(&mdev->tconn->req_lock);
975 		wake_up(&drbd_pp_wait);
976 	} else
977 		drbd_free_peer_req(mdev, peer_req);
978 }
979 
980 /**
981  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
982  * @mdev:	DRBD device.
983  * @w:		work object.
984  * @cancel:	The connection will be closed anyways
985  */
986 int w_e_end_data_req(struct drbd_work *w, int cancel)
987 {
988 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
989 	struct drbd_conf *mdev = w->mdev;
990 	int err;
991 
992 	if (unlikely(cancel)) {
993 		drbd_free_peer_req(mdev, peer_req);
994 		dec_unacked(mdev);
995 		return 0;
996 	}
997 
998 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
999 		err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
1000 	} else {
1001 		if (__ratelimit(&drbd_ratelimit_state))
1002 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
1003 			    (unsigned long long)peer_req->i.sector);
1004 
1005 		err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
1006 	}
1007 
1008 	dec_unacked(mdev);
1009 
1010 	move_to_net_ee_or_free(mdev, peer_req);
1011 
1012 	if (unlikely(err))
1013 		dev_err(DEV, "drbd_send_block() failed\n");
1014 	return err;
1015 }
1016 
1017 /**
1018  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1019  * @mdev:	DRBD device.
1020  * @w:		work object.
1021  * @cancel:	The connection will be closed anyways
1022  */
1023 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1024 {
1025 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1026 	struct drbd_conf *mdev = w->mdev;
1027 	int err;
1028 
1029 	if (unlikely(cancel)) {
1030 		drbd_free_peer_req(mdev, peer_req);
1031 		dec_unacked(mdev);
1032 		return 0;
1033 	}
1034 
1035 	if (get_ldev_if_state(mdev, D_FAILED)) {
1036 		drbd_rs_complete_io(mdev, peer_req->i.sector);
1037 		put_ldev(mdev);
1038 	}
1039 
1040 	if (mdev->state.conn == C_AHEAD) {
1041 		err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
1042 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1043 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1044 			inc_rs_pending(mdev);
1045 			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1046 		} else {
1047 			if (__ratelimit(&drbd_ratelimit_state))
1048 				dev_err(DEV, "Not sending RSDataReply, "
1049 				    "partner DISKLESS!\n");
1050 			err = 0;
1051 		}
1052 	} else {
1053 		if (__ratelimit(&drbd_ratelimit_state))
1054 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1055 			    (unsigned long long)peer_req->i.sector);
1056 
1057 		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1058 
1059 		/* update resync data with failure */
1060 		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1061 	}
1062 
1063 	dec_unacked(mdev);
1064 
1065 	move_to_net_ee_or_free(mdev, peer_req);
1066 
1067 	if (unlikely(err))
1068 		dev_err(DEV, "drbd_send_block() failed\n");
1069 	return err;
1070 }
1071 
1072 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1073 {
1074 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1075 	struct drbd_conf *mdev = w->mdev;
1076 	struct digest_info *di;
1077 	int digest_size;
1078 	void *digest = NULL;
1079 	int err, eq = 0;
1080 
1081 	if (unlikely(cancel)) {
1082 		drbd_free_peer_req(mdev, peer_req);
1083 		dec_unacked(mdev);
1084 		return 0;
1085 	}
1086 
1087 	if (get_ldev(mdev)) {
1088 		drbd_rs_complete_io(mdev, peer_req->i.sector);
1089 		put_ldev(mdev);
1090 	}
1091 
1092 	di = peer_req->digest;
1093 
1094 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1095 		/* quick hack to try to avoid a race against reconfiguration.
1096 		 * a real fix would be much more involved,
1097 		 * introducing more locking mechanisms */
1098 		if (mdev->tconn->csums_tfm) {
1099 			digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1100 			D_ASSERT(digest_size == di->digest_size);
1101 			digest = kmalloc(digest_size, GFP_NOIO);
1102 		}
1103 		if (digest) {
1104 			drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1105 			eq = !memcmp(digest, di->digest, digest_size);
1106 			kfree(digest);
1107 		}
1108 
1109 		if (eq) {
1110 			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1111 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1112 			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1113 			err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1114 		} else {
1115 			inc_rs_pending(mdev);
1116 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1117 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1118 			kfree(di);
1119 			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1120 		}
1121 	} else {
1122 		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1123 		if (__ratelimit(&drbd_ratelimit_state))
1124 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1125 	}
1126 
1127 	dec_unacked(mdev);
1128 	move_to_net_ee_or_free(mdev, peer_req);
1129 
1130 	if (unlikely(err))
1131 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1132 	return err;
1133 }
1134 
1135 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1136 {
1137 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1138 	struct drbd_conf *mdev = w->mdev;
1139 	sector_t sector = peer_req->i.sector;
1140 	unsigned int size = peer_req->i.size;
1141 	int digest_size;
1142 	void *digest;
1143 	int err = 0;
1144 
1145 	if (unlikely(cancel))
1146 		goto out;
1147 
1148 	digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1149 	digest = kmalloc(digest_size, GFP_NOIO);
1150 	if (!digest) {
1151 		err = 1;	/* terminate the connection in case the allocation failed */
1152 		goto out;
1153 	}
1154 
1155 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1156 		drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1157 	else
1158 		memset(digest, 0, digest_size);
1159 
1160 	/* Free e and pages before send.
1161 	 * In case we block on congestion, we could otherwise run into
1162 	 * some distributed deadlock, if the other side blocks on
1163 	 * congestion as well, because our receiver blocks in
1164 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1165 	drbd_free_peer_req(mdev, peer_req);
1166 	peer_req = NULL;
1167 	inc_rs_pending(mdev);
1168 	err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1169 	if (err)
1170 		dec_rs_pending(mdev);
1171 	kfree(digest);
1172 
1173 out:
1174 	if (peer_req)
1175 		drbd_free_peer_req(mdev, peer_req);
1176 	dec_unacked(mdev);
1177 	return err;
1178 }
1179 
1180 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1181 {
1182 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1183 		mdev->ov_last_oos_size += size>>9;
1184 	} else {
1185 		mdev->ov_last_oos_start = sector;
1186 		mdev->ov_last_oos_size = size>>9;
1187 	}
1188 	drbd_set_out_of_sync(mdev, sector, size);
1189 }
1190 
1191 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1192 {
1193 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1194 	struct drbd_conf *mdev = w->mdev;
1195 	struct digest_info *di;
1196 	void *digest;
1197 	sector_t sector = peer_req->i.sector;
1198 	unsigned int size = peer_req->i.size;
1199 	int digest_size;
1200 	int err, eq = 0;
1201 	bool stop_sector_reached = false;
1202 
1203 	if (unlikely(cancel)) {
1204 		drbd_free_peer_req(mdev, peer_req);
1205 		dec_unacked(mdev);
1206 		return 0;
1207 	}
1208 
1209 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1210 	 * the resync lru has been cleaned up already */
1211 	if (get_ldev(mdev)) {
1212 		drbd_rs_complete_io(mdev, peer_req->i.sector);
1213 		put_ldev(mdev);
1214 	}
1215 
1216 	di = peer_req->digest;
1217 
1218 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1219 		digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1220 		digest = kmalloc(digest_size, GFP_NOIO);
1221 		if (digest) {
1222 			drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1223 
1224 			D_ASSERT(digest_size == di->digest_size);
1225 			eq = !memcmp(digest, di->digest, digest_size);
1226 			kfree(digest);
1227 		}
1228 	}
1229 
1230 	/* Free peer_req and pages before send.
1231 	 * In case we block on congestion, we could otherwise run into
1232 	 * some distributed deadlock, if the other side blocks on
1233 	 * congestion as well, because our receiver blocks in
1234 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1235 	drbd_free_peer_req(mdev, peer_req);
1236 	if (!eq)
1237 		drbd_ov_out_of_sync_found(mdev, sector, size);
1238 	else
1239 		ov_out_of_sync_print(mdev);
1240 
1241 	err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1242 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1243 
1244 	dec_unacked(mdev);
1245 
1246 	--mdev->ov_left;
1247 
1248 	/* let's advance progress step marks only for every other megabyte */
1249 	if ((mdev->ov_left & 0x200) == 0x200)
1250 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1251 
1252 	stop_sector_reached = verify_can_do_stop_sector(mdev) &&
1253 		(sector + (size>>9)) >= mdev->ov_stop_sector;
1254 
1255 	if (mdev->ov_left == 0 || stop_sector_reached) {
1256 		ov_out_of_sync_print(mdev);
1257 		drbd_resync_finished(mdev);
1258 	}
1259 
1260 	return err;
1261 }
1262 
1263 int w_prev_work_done(struct drbd_work *w, int cancel)
1264 {
1265 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1266 
1267 	complete(&b->done);
1268 	return 0;
1269 }
1270 
1271 /* FIXME
1272  * We need to track the number of pending barrier acks,
1273  * and to be able to wait for them.
1274  * See also comment in drbd_adm_attach before drbd_suspend_io.
1275  */
1276 int drbd_send_barrier(struct drbd_tconn *tconn)
1277 {
1278 	struct p_barrier *p;
1279 	struct drbd_socket *sock;
1280 
1281 	sock = &tconn->data;
1282 	p = conn_prepare_command(tconn, sock);
1283 	if (!p)
1284 		return -EIO;
1285 	p->barrier = tconn->send.current_epoch_nr;
1286 	p->pad = 0;
1287 	tconn->send.current_epoch_writes = 0;
1288 
1289 	return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
1290 }
1291 
1292 int w_send_write_hint(struct drbd_work *w, int cancel)
1293 {
1294 	struct drbd_conf *mdev = w->mdev;
1295 	struct drbd_socket *sock;
1296 
1297 	if (cancel)
1298 		return 0;
1299 	sock = &mdev->tconn->data;
1300 	if (!drbd_prepare_command(mdev, sock))
1301 		return -EIO;
1302 	return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1303 }
1304 
1305 static void re_init_if_first_write(struct drbd_tconn *tconn, unsigned int epoch)
1306 {
1307 	if (!tconn->send.seen_any_write_yet) {
1308 		tconn->send.seen_any_write_yet = true;
1309 		tconn->send.current_epoch_nr = epoch;
1310 		tconn->send.current_epoch_writes = 0;
1311 	}
1312 }
1313 
1314 static void maybe_send_barrier(struct drbd_tconn *tconn, unsigned int epoch)
1315 {
1316 	/* re-init if first write on this connection */
1317 	if (!tconn->send.seen_any_write_yet)
1318 		return;
1319 	if (tconn->send.current_epoch_nr != epoch) {
1320 		if (tconn->send.current_epoch_writes)
1321 			drbd_send_barrier(tconn);
1322 		tconn->send.current_epoch_nr = epoch;
1323 	}
1324 }
1325 
1326 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1327 {
1328 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1329 	struct drbd_conf *mdev = w->mdev;
1330 	struct drbd_tconn *tconn = mdev->tconn;
1331 	int err;
1332 
1333 	if (unlikely(cancel)) {
1334 		req_mod(req, SEND_CANCELED);
1335 		return 0;
1336 	}
1337 
1338 	/* this time, no tconn->send.current_epoch_writes++;
1339 	 * If it was sent, it was the closing barrier for the last
1340 	 * replicated epoch, before we went into AHEAD mode.
1341 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1342 	maybe_send_barrier(tconn, req->epoch);
1343 
1344 	err = drbd_send_out_of_sync(mdev, req);
1345 	req_mod(req, OOS_HANDED_TO_NETWORK);
1346 
1347 	return err;
1348 }
1349 
1350 /**
1351  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1352  * @mdev:	DRBD device.
1353  * @w:		work object.
1354  * @cancel:	The connection will be closed anyways
1355  */
1356 int w_send_dblock(struct drbd_work *w, int cancel)
1357 {
1358 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1359 	struct drbd_conf *mdev = w->mdev;
1360 	struct drbd_tconn *tconn = mdev->tconn;
1361 	int err;
1362 
1363 	if (unlikely(cancel)) {
1364 		req_mod(req, SEND_CANCELED);
1365 		return 0;
1366 	}
1367 
1368 	re_init_if_first_write(tconn, req->epoch);
1369 	maybe_send_barrier(tconn, req->epoch);
1370 	tconn->send.current_epoch_writes++;
1371 
1372 	err = drbd_send_dblock(mdev, req);
1373 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1374 
1375 	return err;
1376 }
1377 
1378 /**
1379  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1380  * @mdev:	DRBD device.
1381  * @w:		work object.
1382  * @cancel:	The connection will be closed anyways
1383  */
1384 int w_send_read_req(struct drbd_work *w, int cancel)
1385 {
1386 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1387 	struct drbd_conf *mdev = w->mdev;
1388 	struct drbd_tconn *tconn = mdev->tconn;
1389 	int err;
1390 
1391 	if (unlikely(cancel)) {
1392 		req_mod(req, SEND_CANCELED);
1393 		return 0;
1394 	}
1395 
1396 	/* Even read requests may close a write epoch,
1397 	 * if there was any yet. */
1398 	maybe_send_barrier(tconn, req->epoch);
1399 
1400 	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1401 				 (unsigned long)req);
1402 
1403 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1404 
1405 	return err;
1406 }
1407 
1408 int w_restart_disk_io(struct drbd_work *w, int cancel)
1409 {
1410 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1411 	struct drbd_conf *mdev = w->mdev;
1412 
1413 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1414 		drbd_al_begin_io(mdev, &req->i, false);
1415 
1416 	drbd_req_make_private_bio(req, req->master_bio);
1417 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1418 	generic_make_request(req->private_bio);
1419 
1420 	return 0;
1421 }
1422 
1423 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1424 {
1425 	struct drbd_conf *odev = mdev;
1426 	int resync_after;
1427 
1428 	while (1) {
1429 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1430 			return 1;
1431 		rcu_read_lock();
1432 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1433 		rcu_read_unlock();
1434 		if (resync_after == -1)
1435 			return 1;
1436 		odev = minor_to_mdev(resync_after);
1437 		if (!odev)
1438 			return 1;
1439 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1440 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1441 		    odev->state.aftr_isp || odev->state.peer_isp ||
1442 		    odev->state.user_isp)
1443 			return 0;
1444 	}
1445 }
1446 
1447 /**
1448  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1449  * @mdev:	DRBD device.
1450  *
1451  * Called from process context only (admin command and after_state_ch).
1452  */
1453 static int _drbd_pause_after(struct drbd_conf *mdev)
1454 {
1455 	struct drbd_conf *odev;
1456 	int i, rv = 0;
1457 
1458 	rcu_read_lock();
1459 	idr_for_each_entry(&minors, odev, i) {
1460 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1461 			continue;
1462 		if (!_drbd_may_sync_now(odev))
1463 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1464 			       != SS_NOTHING_TO_DO);
1465 	}
1466 	rcu_read_unlock();
1467 
1468 	return rv;
1469 }
1470 
1471 /**
1472  * _drbd_resume_next() - Resume resync on all devices that may resync now
1473  * @mdev:	DRBD device.
1474  *
1475  * Called from process context only (admin command and worker).
1476  */
1477 static int _drbd_resume_next(struct drbd_conf *mdev)
1478 {
1479 	struct drbd_conf *odev;
1480 	int i, rv = 0;
1481 
1482 	rcu_read_lock();
1483 	idr_for_each_entry(&minors, odev, i) {
1484 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1485 			continue;
1486 		if (odev->state.aftr_isp) {
1487 			if (_drbd_may_sync_now(odev))
1488 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1489 							CS_HARD, NULL)
1490 				       != SS_NOTHING_TO_DO) ;
1491 		}
1492 	}
1493 	rcu_read_unlock();
1494 	return rv;
1495 }
1496 
1497 void resume_next_sg(struct drbd_conf *mdev)
1498 {
1499 	write_lock_irq(&global_state_lock);
1500 	_drbd_resume_next(mdev);
1501 	write_unlock_irq(&global_state_lock);
1502 }
1503 
1504 void suspend_other_sg(struct drbd_conf *mdev)
1505 {
1506 	write_lock_irq(&global_state_lock);
1507 	_drbd_pause_after(mdev);
1508 	write_unlock_irq(&global_state_lock);
1509 }
1510 
1511 /* caller must hold global_state_lock */
1512 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1513 {
1514 	struct drbd_conf *odev;
1515 	int resync_after;
1516 
1517 	if (o_minor == -1)
1518 		return NO_ERROR;
1519 	if (o_minor < -1 || o_minor > MINORMASK)
1520 		return ERR_RESYNC_AFTER;
1521 
1522 	/* check for loops */
1523 	odev = minor_to_mdev(o_minor);
1524 	while (1) {
1525 		if (odev == mdev)
1526 			return ERR_RESYNC_AFTER_CYCLE;
1527 
1528 		/* You are free to depend on diskless, non-existing,
1529 		 * or not yet/no longer existing minors.
1530 		 * We only reject dependency loops.
1531 		 * We cannot follow the dependency chain beyond a detached or
1532 		 * missing minor.
1533 		 */
1534 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1535 			return NO_ERROR;
1536 
1537 		rcu_read_lock();
1538 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1539 		rcu_read_unlock();
1540 		/* dependency chain ends here, no cycles. */
1541 		if (resync_after == -1)
1542 			return NO_ERROR;
1543 
1544 		/* follow the dependency chain */
1545 		odev = minor_to_mdev(resync_after);
1546 	}
1547 }
1548 
/*
 * Re-evaluate which devices have to pause and which may resume their resync,
 * repeating both passes until neither of them changes anything anymore.
 *
 * caller must hold global_state_lock
 */
void drbd_resync_after_changed(struct drbd_conf *mdev)
{
	for (;;) {
		int paused = _drbd_pause_after(mdev);
		int resumed = _drbd_resume_next(mdev);

		if (!(paused | resumed))
			break;
	}
}
1559 
1560 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1561 {
1562 	struct fifo_buffer *plan;
1563 
1564 	atomic_set(&mdev->rs_sect_in, 0);
1565 	atomic_set(&mdev->rs_sect_ev, 0);
1566 	mdev->rs_in_flight = 0;
1567 
1568 	/* Updating the RCU protected object in place is necessary since
1569 	   this function gets called from atomic context.
1570 	   It is valid since all other updates also lead to an completely
1571 	   empty fifo */
1572 	rcu_read_lock();
1573 	plan = rcu_dereference(mdev->rs_plan_s);
1574 	plan->total = 0;
1575 	fifo_set(plan, 0);
1576 	rcu_read_unlock();
1577 }
1578 
1579 void start_resync_timer_fn(unsigned long data)
1580 {
1581 	struct drbd_conf *mdev = (struct drbd_conf *) data;
1582 
1583 	drbd_queue_work(&mdev->tconn->sender_work, &mdev->start_resync_work);
1584 }
1585 
1586 int w_start_resync(struct drbd_work *w, int cancel)
1587 {
1588 	struct drbd_conf *mdev = w->mdev;
1589 
1590 	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1591 		dev_warn(DEV, "w_start_resync later...\n");
1592 		mdev->start_resync_timer.expires = jiffies + HZ/10;
1593 		add_timer(&mdev->start_resync_timer);
1594 		return 0;
1595 	}
1596 
1597 	drbd_start_resync(mdev, C_SYNC_SOURCE);
1598 	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
1599 	return 0;
1600 }
1601 
1602 /**
1603  * drbd_start_resync() - Start the resync process
1604  * @mdev:	DRBD device.
1605  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1606  *
1607  * This function might bring you directly into one of the
1608  * C_PAUSED_SYNC_* states.
1609  */
1610 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1611 {
1612 	union drbd_state ns;
1613 	int r;
1614 
1615 	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1616 		dev_err(DEV, "Resync already running!\n");
1617 		return;
1618 	}
1619 
1620 	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1621 		if (side == C_SYNC_TARGET) {
1622 			/* Since application IO was locked out during C_WF_BITMAP_T and
1623 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1624 			   we check that we might make the data inconsistent. */
1625 			r = drbd_khelper(mdev, "before-resync-target");
1626 			r = (r >> 8) & 0xff;
1627 			if (r > 0) {
1628 				dev_info(DEV, "before-resync-target handler returned %d, "
1629 					 "dropping connection.\n", r);
1630 				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1631 				return;
1632 			}
1633 		} else /* C_SYNC_SOURCE */ {
1634 			r = drbd_khelper(mdev, "before-resync-source");
1635 			r = (r >> 8) & 0xff;
1636 			if (r > 0) {
1637 				if (r == 3) {
1638 					dev_info(DEV, "before-resync-source handler returned %d, "
1639 						 "ignoring. Old userland tools?", r);
1640 				} else {
1641 					dev_info(DEV, "before-resync-source handler returned %d, "
1642 						 "dropping connection.\n", r);
1643 					conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1644 					return;
1645 				}
1646 			}
1647 		}
1648 	}
1649 
1650 	if (current == mdev->tconn->worker.task) {
1651 		/* The worker should not sleep waiting for state_mutex,
1652 		   that can take long */
1653 		if (!mutex_trylock(mdev->state_mutex)) {
1654 			set_bit(B_RS_H_DONE, &mdev->flags);
1655 			mdev->start_resync_timer.expires = jiffies + HZ/5;
1656 			add_timer(&mdev->start_resync_timer);
1657 			return;
1658 		}
1659 	} else {
1660 		mutex_lock(mdev->state_mutex);
1661 	}
1662 	clear_bit(B_RS_H_DONE, &mdev->flags);
1663 
1664 	write_lock_irq(&global_state_lock);
1665 	/* Did some connection breakage or IO error race with us? */
1666 	if (mdev->state.conn < C_CONNECTED
1667 	|| !get_ldev_if_state(mdev, D_NEGOTIATING)) {
1668 		write_unlock_irq(&global_state_lock);
1669 		mutex_unlock(mdev->state_mutex);
1670 		return;
1671 	}
1672 
1673 	ns = drbd_read_state(mdev);
1674 
1675 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1676 
1677 	ns.conn = side;
1678 
1679 	if (side == C_SYNC_TARGET)
1680 		ns.disk = D_INCONSISTENT;
1681 	else /* side == C_SYNC_SOURCE */
1682 		ns.pdsk = D_INCONSISTENT;
1683 
1684 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1685 	ns = drbd_read_state(mdev);
1686 
1687 	if (ns.conn < C_CONNECTED)
1688 		r = SS_UNKNOWN_ERROR;
1689 
1690 	if (r == SS_SUCCESS) {
1691 		unsigned long tw = drbd_bm_total_weight(mdev);
1692 		unsigned long now = jiffies;
1693 		int i;
1694 
1695 		mdev->rs_failed    = 0;
1696 		mdev->rs_paused    = 0;
1697 		mdev->rs_same_csum = 0;
1698 		mdev->rs_last_events = 0;
1699 		mdev->rs_last_sect_ev = 0;
1700 		mdev->rs_total     = tw;
1701 		mdev->rs_start     = now;
1702 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1703 			mdev->rs_mark_left[i] = tw;
1704 			mdev->rs_mark_time[i] = now;
1705 		}
1706 		_drbd_pause_after(mdev);
1707 	}
1708 	write_unlock_irq(&global_state_lock);
1709 
1710 	if (r == SS_SUCCESS) {
1711 		/* reset rs_last_bcast when a resync or verify is started,
1712 		 * to deal with potential jiffies wrap. */
1713 		mdev->rs_last_bcast = jiffies - HZ;
1714 
1715 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1716 		     drbd_conn_str(ns.conn),
1717 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1718 		     (unsigned long) mdev->rs_total);
1719 		if (side == C_SYNC_TARGET)
1720 			mdev->bm_resync_fo = 0;
1721 
1722 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1723 		 * with w_send_oos, or the sync target will get confused as to
1724 		 * how much bits to resync.  We cannot do that always, because for an
1725 		 * empty resync and protocol < 95, we need to do it here, as we call
1726 		 * drbd_resync_finished from here in that case.
1727 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1728 		 * and from after_state_ch otherwise. */
1729 		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1730 			drbd_gen_and_send_sync_uuid(mdev);
1731 
1732 		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1733 			/* This still has a race (about when exactly the peers
1734 			 * detect connection loss) that can lead to a full sync
1735 			 * on next handshake. In 8.3.9 we fixed this with explicit
1736 			 * resync-finished notifications, but the fix
1737 			 * introduces a protocol change.  Sleeping for some
1738 			 * time longer than the ping interval + timeout on the
1739 			 * SyncSource, to give the SyncTarget the chance to
1740 			 * detect connection loss, then waiting for a ping
1741 			 * response (implicit in drbd_resync_finished) reduces
1742 			 * the race considerably, but does not solve it. */
1743 			if (side == C_SYNC_SOURCE) {
1744 				struct net_conf *nc;
1745 				int timeo;
1746 
1747 				rcu_read_lock();
1748 				nc = rcu_dereference(mdev->tconn->net_conf);
1749 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1750 				rcu_read_unlock();
1751 				schedule_timeout_interruptible(timeo);
1752 			}
1753 			drbd_resync_finished(mdev);
1754 		}
1755 
1756 		drbd_rs_controller_reset(mdev);
1757 		/* ns.conn may already be != mdev->state.conn,
1758 		 * we may have been paused in between, or become paused until
1759 		 * the timer triggers.
1760 		 * No matter, that is handled in resync_timer_fn() */
1761 		if (ns.conn == C_SYNC_TARGET)
1762 			mod_timer(&mdev->resync_timer, jiffies);
1763 
1764 		drbd_md_sync(mdev);
1765 	}
1766 	put_ldev(mdev);
1767 	mutex_unlock(mdev->state_mutex);
1768 }
1769 
1770 /* If the resource already closed the current epoch, but we did not
1771  * (because we have not yet seen new requests), we should send the
1772  * corresponding barrier now.  Must be checked within the same spinlock
1773  * that is used to check for new requests. */
1774 bool need_to_send_barrier(struct drbd_tconn *connection)
1775 {
1776 	if (!connection->send.seen_any_write_yet)
1777 		return false;
1778 
1779 	/* Skip barriers that do not contain any writes.
1780 	 * This may happen during AHEAD mode. */
1781 	if (!connection->send.current_epoch_writes)
1782 		return false;
1783 
1784 	/* ->req_lock is held when requests are queued on
1785 	 * connection->sender_work, and put into ->transfer_log.
1786 	 * It is also held when ->current_tle_nr is increased.
1787 	 * So either there are already new requests queued,
1788 	 * and corresponding barriers will be send there.
1789 	 * Or nothing new is queued yet, so the difference will be 1.
1790 	 */
1791 	if (atomic_read(&connection->current_tle_nr) !=
1792 	    connection->send.current_epoch_nr + 1)
1793 		return false;
1794 
1795 	return true;
1796 }
1797 
1798 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1799 {
1800 	spin_lock_irq(&queue->q_lock);
1801 	list_splice_init(&queue->q, work_list);
1802 	spin_unlock_irq(&queue->q_lock);
1803 	return !list_empty(work_list);
1804 }
1805 
1806 bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1807 {
1808 	spin_lock_irq(&queue->q_lock);
1809 	if (!list_empty(&queue->q))
1810 		list_move(queue->q.next, work_list);
1811 	spin_unlock_irq(&queue->q_lock);
1812 	return !list_empty(work_list);
1813 }
1814 
1815 void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
1816 {
1817 	DEFINE_WAIT(wait);
1818 	struct net_conf *nc;
1819 	int uncork, cork;
1820 
1821 	dequeue_work_item(&connection->sender_work, work_list);
1822 	if (!list_empty(work_list))
1823 		return;
1824 
1825 	/* Still nothing to do?
1826 	 * Maybe we still need to close the current epoch,
1827 	 * even if no new requests are queued yet.
1828 	 *
1829 	 * Also, poke TCP, just in case.
1830 	 * Then wait for new work (or signal). */
1831 	rcu_read_lock();
1832 	nc = rcu_dereference(connection->net_conf);
1833 	uncork = nc ? nc->tcp_cork : 0;
1834 	rcu_read_unlock();
1835 	if (uncork) {
1836 		mutex_lock(&connection->data.mutex);
1837 		if (connection->data.socket)
1838 			drbd_tcp_uncork(connection->data.socket);
1839 		mutex_unlock(&connection->data.mutex);
1840 	}
1841 
1842 	for (;;) {
1843 		int send_barrier;
1844 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1845 		spin_lock_irq(&connection->req_lock);
1846 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1847 		/* dequeue single item only,
1848 		 * we still use drbd_queue_work_front() in some places */
1849 		if (!list_empty(&connection->sender_work.q))
1850 			list_move(connection->sender_work.q.next, work_list);
1851 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1852 		if (!list_empty(work_list) || signal_pending(current)) {
1853 			spin_unlock_irq(&connection->req_lock);
1854 			break;
1855 		}
1856 		send_barrier = need_to_send_barrier(connection);
1857 		spin_unlock_irq(&connection->req_lock);
1858 		if (send_barrier) {
1859 			drbd_send_barrier(connection);
1860 			connection->send.current_epoch_nr++;
1861 		}
1862 		schedule();
1863 		/* may be woken up for other things but new work, too,
1864 		 * e.g. if the current epoch got closed.
1865 		 * In which case we send the barrier above. */
1866 	}
1867 	finish_wait(&connection->sender_work.q_wait, &wait);
1868 
1869 	/* someone may have changed the config while we have been waiting above. */
1870 	rcu_read_lock();
1871 	nc = rcu_dereference(connection->net_conf);
1872 	cork = nc ? nc->tcp_cork : 0;
1873 	rcu_read_unlock();
1874 	mutex_lock(&connection->data.mutex);
1875 	if (connection->data.socket) {
1876 		if (cork)
1877 			drbd_tcp_cork(connection->data.socket);
1878 		else if (!uncork)
1879 			drbd_tcp_uncork(connection->data.socket);
1880 	}
1881 	mutex_unlock(&connection->data.mutex);
1882 }
1883 
1884 int drbd_worker(struct drbd_thread *thi)
1885 {
1886 	struct drbd_tconn *tconn = thi->tconn;
1887 	struct drbd_work *w = NULL;
1888 	struct drbd_conf *mdev;
1889 	LIST_HEAD(work_list);
1890 	int vnr;
1891 
1892 	while (get_t_state(thi) == RUNNING) {
1893 		drbd_thread_current_set_cpu(thi);
1894 
1895 		/* as long as we use drbd_queue_work_front(),
1896 		 * we may only dequeue single work items here, not batches. */
1897 		if (list_empty(&work_list))
1898 			wait_for_work(tconn, &work_list);
1899 
1900 		if (signal_pending(current)) {
1901 			flush_signals(current);
1902 			if (get_t_state(thi) == RUNNING) {
1903 				conn_warn(tconn, "Worker got an unexpected signal\n");
1904 				continue;
1905 			}
1906 			break;
1907 		}
1908 
1909 		if (get_t_state(thi) != RUNNING)
1910 			break;
1911 
1912 		while (!list_empty(&work_list)) {
1913 			w = list_first_entry(&work_list, struct drbd_work, list);
1914 			list_del_init(&w->list);
1915 			if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
1916 				continue;
1917 			if (tconn->cstate >= C_WF_REPORT_PARAMS)
1918 				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1919 		}
1920 	}
1921 
1922 	do {
1923 		while (!list_empty(&work_list)) {
1924 			w = list_first_entry(&work_list, struct drbd_work, list);
1925 			list_del_init(&w->list);
1926 			w->cb(w, 1);
1927 		}
1928 		dequeue_work_batch(&tconn->sender_work, &work_list);
1929 	} while (!list_empty(&work_list));
1930 
1931 	rcu_read_lock();
1932 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1933 		D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1934 		kref_get(&mdev->kref);
1935 		rcu_read_unlock();
1936 		drbd_mdev_cleanup(mdev);
1937 		kref_put(&mdev->kref, &drbd_minor_destroy);
1938 		rcu_read_lock();
1939 	}
1940 	rcu_read_unlock();
1941 
1942 	return 0;
1943 }
1944