xref: /linux/drivers/block/drbd/drbd_worker.c (revision be54f8c558027a218423134dd9b8c7c46d92204a)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3    drbd_worker.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11 
12 */
13 
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26 
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30 
31 static int make_ov_request(struct drbd_peer_device *, int);
32 static int make_resync_request(struct drbd_peer_device *, int);
33 
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46 
47 /* used for synchronous meta data and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
50 void drbd_md_endio(struct bio *bio)
51 {
52 	struct drbd_device *device;
53 
54 	device = bio->bi_private;
55 	device->md_io.error = blk_status_to_errno(bio->bi_status);
56 
57 	/* special case: drbd_md_read() during drbd_adm_attach() */
58 	if (device->ldev)
59 		put_ldev(device);
60 	bio_put(bio);
61 
62 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63 	 * to timeout on the lower level device, and eventually detach from it.
64 	 * If this io completion runs after that timeout expired, this
65 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
66 	 * During normal operation, this only puts that extra reference
67 	 * down to 1 again.
68 	 * Make sure we first drop the reference, and only then signal
69 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70 	 * next drbd_md_sync_page_io(), that we trigger the
71 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72 	 */
73 	drbd_md_put_buffer(device);
74 	device->md_io.done = 1;
75 	wake_up(&device->misc_wait);
76 }
77 
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83 	unsigned long flags = 0;
84 	struct drbd_peer_device *peer_device = peer_req->peer_device;
85 	struct drbd_device *device = peer_device->device;
86 
87 	spin_lock_irqsave(&device->resource->req_lock, flags);
88 	device->read_cnt += peer_req->i.size >> 9;
89 	list_del(&peer_req->w.list);
90 	if (list_empty(&device->read_ee))
91 		wake_up(&device->ee_wait);
92 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
94 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
95 
96 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97 	put_ldev(device);
98 }
99 
100 /* writes on behalf of the partner, or resync writes,
101  * "submitted" by the receiver, final stage.  */
102 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103 {
104 	unsigned long flags = 0;
105 	struct drbd_peer_device *peer_device = peer_req->peer_device;
106 	struct drbd_device *device = peer_device->device;
107 	struct drbd_connection *connection = peer_device->connection;
108 	struct drbd_interval i;
109 	int do_wake;
110 	u64 block_id;
111 	int do_al_complete_io;
112 
113 	/* after we moved peer_req to done_ee,
114 	 * we may no longer access it,
115 	 * it may be freed/reused already!
116 	 * (as soon as we release the req_lock) */
117 	i = peer_req->i;
118 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119 	block_id = peer_req->block_id;
120 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121 
122 	if (peer_req->flags & EE_WAS_ERROR) {
123 		/* In protocol != C, we usually do not send write acks.
124 		 * In case of a write error, send the neg ack anyways. */
125 		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126 			inc_unacked(device);
127 		drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
128 	}
129 
130 	spin_lock_irqsave(&device->resource->req_lock, flags);
131 	device->writ_cnt += peer_req->i.size >> 9;
132 	list_move_tail(&peer_req->w.list, &device->done_ee);
133 
134 	/*
135 	 * Do not remove from the write_requests tree here: we did not send the
136 	 * Ack yet and did not wake possibly waiting conflicting requests.
137 	 * Removed from the tree from "drbd_process_done_ee" within the
138 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
139 	 * _drbd_clear_done_ee.
140 	 */
141 
142 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143 
144 	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
145 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146 	if (peer_req->flags & EE_WAS_ERROR)
147 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148 
149 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
150 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152 			kref_put(&device->kref, drbd_destroy_device);
153 	}
154 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
155 
156 	if (block_id == ID_SYNCER)
157 		drbd_rs_complete_io(device, i.sector);
158 
159 	if (do_wake)
160 		wake_up(&device->ee_wait);
161 
162 	if (do_al_complete_io)
163 		drbd_al_complete_io(device, &i);
164 
165 	put_ldev(device);
166 }
167 
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
171 void drbd_peer_request_endio(struct bio *bio)
172 {
173 	struct drbd_peer_request *peer_req = bio->bi_private;
174 	struct drbd_device *device = peer_req->peer_device->device;
175 	bool is_write = bio_data_dir(bio) == WRITE;
176 	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177 			  bio_op(bio) == REQ_OP_DISCARD;
178 
179 	if (bio->bi_status && drbd_ratelimit())
180 		drbd_warn(device, "%s: error=%d s=%llus\n",
181 				is_write ? (is_discard ? "discard" : "write")
182 					: "read", bio->bi_status,
183 				(unsigned long long)peer_req->i.sector);
184 
185 	if (bio->bi_status)
186 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
187 
188 	bio_put(bio); /* no need for the bio anymore */
189 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
190 		if (is_write)
191 			drbd_endio_write_sec_final(peer_req);
192 		else
193 			drbd_endio_read_sec_final(peer_req);
194 	}
195 }
196 
197 static void
198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201 		device->minor, device->resource->name, device->vnr);
202 }
203 
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208 	unsigned long flags;
209 	struct drbd_request *req = bio->bi_private;
210 	struct drbd_device *device = req->device;
211 	struct bio_and_error m;
212 	enum drbd_req_event what;
213 
214 	/* If this request was aborted locally before,
215 	 * but now was completed "successfully",
216 	 * chances are that this caused arbitrary data corruption.
217 	 *
218 	 * "aborting" requests, or force-detaching the disk, is intended for
219 	 * completely blocked/hung local backing devices which no longer
220 	 * complete requests at all, not even error completions.  In this
221 	 * situation, usually a hard-reset and failover is the only way out.
222 	 *
223 	 * By "aborting", basically faking a local error-completion,
224 	 * we allow for a more graceful switchover by cleanly migrating services.
225 	 * Still the affected node has to be rebooted "soon".
226 	 *
227 	 * By completing these requests, we allow the upper layers to re-use
228 	 * the associated data pages.
229 	 *
230 	 * If later the local backing device "recovers", and now DMAs some data
231 	 * from disk into the original request pages, in the best case it will
232 	 * just put random data into unused pages; but typically it will corrupt
233 	 * meanwhile completely unrelated data, causing all sorts of damage.
234 	 *
235 	 * Which means delayed successful completion,
236 	 * especially for READ requests,
237 	 * is a reason to panic().
238 	 *
239 	 * We assume that a delayed *error* completion is OK,
240 	 * though we still will complain noisily about it.
241 	 */
242 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243 		if (drbd_ratelimit())
244 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245 
246 		if (!bio->bi_status)
247 			drbd_panic_after_delayed_completion_of_aborted_request(device);
248 	}
249 
250 	/* to avoid recursion in __req_mod */
251 	if (unlikely(bio->bi_status)) {
252 		switch (bio_op(bio)) {
253 		case REQ_OP_WRITE_ZEROES:
254 		case REQ_OP_DISCARD:
255 			if (bio->bi_status == BLK_STS_NOTSUPP)
256 				what = DISCARD_COMPLETED_NOTSUPP;
257 			else
258 				what = DISCARD_COMPLETED_WITH_ERROR;
259 			break;
260 		case REQ_OP_READ:
261 			if (bio->bi_opf & REQ_RAHEAD)
262 				what = READ_AHEAD_COMPLETED_WITH_ERROR;
263 			else
264 				what = READ_COMPLETED_WITH_ERROR;
265 			break;
266 		default:
267 			what = WRITE_COMPLETED_WITH_ERROR;
268 			break;
269 		}
270 	} else {
271 		what = COMPLETED_OK;
272 	}
273 
274 	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275 	bio_put(bio);
276 
277 	/* not req_mod(), we need irqsave here! */
278 	spin_lock_irqsave(&device->resource->req_lock, flags);
279 	__req_mod(req, what, NULL, &m);
280 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
281 	put_ldev(device);
282 
283 	if (m.bio)
284 		complete_master_bio(device, &m);
285 }
286 
287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289 	SHASH_DESC_ON_STACK(desc, tfm);
290 	struct page *page = peer_req->pages;
291 	struct page *tmp;
292 	unsigned len;
293 	void *src;
294 
295 	desc->tfm = tfm;
296 
297 	crypto_shash_init(desc);
298 
299 	src = kmap_atomic(page);
300 	while ((tmp = page_chain_next(page))) {
301 		/* all but the last page will be fully used */
302 		crypto_shash_update(desc, src, PAGE_SIZE);
303 		kunmap_atomic(src);
304 		page = tmp;
305 		src = kmap_atomic(page);
306 	}
307 	/* and now the last, possibly only partially used page */
308 	len = peer_req->i.size & (PAGE_SIZE - 1);
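	/* len == 0 here means i.size is an exact multiple of PAGE_SIZE,
	 * i.e. the "last" page is in fact fully used; that is what the
	 * "len ?: PAGE_SIZE" below accounts for. */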
309 	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310 	kunmap_atomic(src);
311 
312 	crypto_shash_final(desc, digest);
313 	shash_desc_zero(desc);
314 }
315 
316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318 	SHASH_DESC_ON_STACK(desc, tfm);
319 	struct bio_vec bvec;
320 	struct bvec_iter iter;
321 
322 	desc->tfm = tfm;
323 
324 	crypto_shash_init(desc);
325 
326 	bio_for_each_segment(bvec, bio, iter) {
327 		u8 *src;
328 
329 		src = bvec_kmap_local(&bvec);
330 		crypto_shash_update(desc, src, bvec.bv_len);
331 		kunmap_local(src);
332 	}
333 	crypto_shash_final(desc, digest);
334 	shash_desc_zero(desc);
335 }
336 
337 /* MAYBE merge common code with w_e_end_ov_req */
338 static int w_e_send_csum(struct drbd_work *w, int cancel)
339 {
340 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
341 	struct drbd_peer_device *peer_device = peer_req->peer_device;
342 	struct drbd_device *device = peer_device->device;
343 	int digest_size;
344 	void *digest;
345 	int err = 0;
346 
347 	if (unlikely(cancel))
348 		goto out;
349 
350 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
351 		goto out;
352 
353 	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
354 	digest = kmalloc(digest_size, GFP_NOIO);
355 	if (digest) {
356 		sector_t sector = peer_req->i.sector;
357 		unsigned int size = peer_req->i.size;
358 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
359 		/* Free peer_req and pages before send.
360 		 * In case we block on congestion, we could otherwise run into
361 		 * some distributed deadlock, if the other side blocks on
362 		 * congestion as well, because our receiver blocks in
363 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
364 		drbd_free_peer_req(device, peer_req);
365 		peer_req = NULL;
366 		inc_rs_pending(peer_device);
367 		err = drbd_send_drequest_csum(peer_device, sector, size,
368 					      digest, digest_size,
369 					      P_CSUM_RS_REQUEST);
370 		kfree(digest);
371 	} else {
372 		drbd_err(device, "kmalloc() of digest failed.\n");
373 		err = -ENOMEM;
374 	}
375 
376 out:
377 	if (peer_req)
378 		drbd_free_peer_req(device, peer_req);
379 
380 	if (unlikely(err))
381 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
382 	return err;
383 }
384 
385 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
386 
387 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
388 {
389 	struct drbd_device *device = peer_device->device;
390 	struct drbd_peer_request *peer_req;
391 
392 	if (!get_ldev(device))
393 		return -EIO;
394 
395 	/* GFP_TRY, because if there is no memory available right now, this may
396 	 * be rescheduled for later. It is "only" background resync, after all. */
397 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398 				       size, size, GFP_TRY);
399 	if (!peer_req)
400 		goto defer;
401 
402 	peer_req->w.cb = w_e_send_csum;
403 	peer_req->opf = REQ_OP_READ;
404 	spin_lock_irq(&device->resource->req_lock);
405 	list_add_tail(&peer_req->w.list, &device->read_ee);
406 	spin_unlock_irq(&device->resource->req_lock);
407 
408 	atomic_add(size >> 9, &device->rs_sect_ev);
409 	if (drbd_submit_peer_request(peer_req) == 0)
410 		return 0;
411 
412 	/* If it failed because of ENOMEM, retry should help.  If it failed
413 	 * because bio_add_page failed (probably broken lower level driver),
414 	 * retry may or may not help.
415 	 * If it does not, you may need to force disconnect. */
416 	spin_lock_irq(&device->resource->req_lock);
417 	list_del(&peer_req->w.list);
418 	spin_unlock_irq(&device->resource->req_lock);
419 
420 	drbd_free_peer_req(device, peer_req);
421 defer:
422 	put_ldev(device);
423 	return -EAGAIN;
424 }
425 
426 int w_resync_timer(struct drbd_work *w, int cancel)
427 {
428 	struct drbd_device *device =
429 		container_of(w, struct drbd_device, resync_work);
430 
431 	switch (device->state.conn) {
432 	case C_VERIFY_S:
433 		make_ov_request(first_peer_device(device), cancel);
434 		break;
435 	case C_SYNC_TARGET:
436 		make_resync_request(first_peer_device(device), cancel);
437 		break;
438 	}
439 
440 	return 0;
441 }
442 
443 void resync_timer_fn(struct timer_list *t)
444 {
445 	struct drbd_device *device = timer_container_of(device, t,
446 							resync_timer);
447 
448 	drbd_queue_work_if_unqueued(
449 		&first_peer_device(device)->connection->sender_work,
450 		&device->resync_work);
451 }
452 
453 static void fifo_set(struct fifo_buffer *fb, int value)
454 {
455 	int i;
456 
457 	for (i = 0; i < fb->size; i++)
458 		fb->values[i] = value;
459 }
460 
461 static int fifo_push(struct fifo_buffer *fb, int value)
462 {
463 	int ov;
464 
465 	ov = fb->values[fb->head_index];
466 	fb->values[fb->head_index++] = value;
467 
468 	if (fb->head_index >= fb->size)
469 		fb->head_index = 0;
470 
471 	return ov;
472 }
473 
474 static void fifo_add_val(struct fifo_buffer *fb, int value)
475 {
476 	int i;
477 
478 	for (i = 0; i < fb->size; i++)
479 		fb->values[i] += value;
480 }
481 
482 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
483 {
484 	struct fifo_buffer *fb;
485 
486 	fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
487 	if (!fb)
488 		return NULL;
489 
490 	fb->head_index = 0;
491 	fb->size = fifo_size;
492 	fb->total = 0;
493 
494 	return fb;
495 }
496 
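/* Feedback controller for the dynamic resync rate.
 * Each slot of the rs_plan_s fifo stands for one future SLEEP_TIME tick:
 * fifo_add_val() spreads a planned correction evenly over all slots,
 * fifo_push() retires the share planned for the current tick.
 *
 * Rough worked example (illustrative numbers only; assumes SLEEP_TIME is
 * HZ/10, c_fill_target is 0 and c_delay_target is 10, i.e. aim for about
 * one second worth of resync data in flight):
 *   sect_in = 2048 sectors arrived during the last tick
 *   => want = 2048 * 10 * HZ / ((HZ/10) * 10) = 20480 sectors in flight.
 *   With rs_in_flight = 16384 and an empty plan, correction = 4096 sectors,
 *   spread over plan->size ticks; this tick's share is added to sect_in,
 *   and the result is clamped to the per-tick equivalent of c_max_rate.
 */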
497 static int drbd_rs_controller(struct drbd_peer_device *peer_device, unsigned int sect_in)
498 {
499 	struct drbd_device *device = peer_device->device;
500 	struct disk_conf *dc;
501 	unsigned int want;     /* The number of sectors we want in-flight */
502 	int req_sect; /* Number of sectors to request in this turn */
503 	int correction; /* Number of sectors more we need in-flight */
504 	int cps; /* correction per invocation of drbd_rs_controller() */
505 	int steps; /* Number of time steps to plan ahead */
506 	int curr_corr;
507 	int max_sect;
508 	struct fifo_buffer *plan;
509 
510 	dc = rcu_dereference(device->ldev->disk_conf);
511 	plan = rcu_dereference(device->rs_plan_s);
512 
513 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
514 
515 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
516 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
517 	} else { /* normal path */
518 		want = dc->c_fill_target ? dc->c_fill_target :
519 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
520 	}
521 
522 	correction = want - device->rs_in_flight - plan->total;
523 
524 	/* Plan ahead */
525 	cps = correction / steps;
526 	fifo_add_val(plan, cps);
527 	plan->total += cps * steps;
528 
529 	/* What we do in this step */
530 	curr_corr = fifo_push(plan, 0);
531 	plan->total -= curr_corr;
532 
533 	req_sect = sect_in + curr_corr;
534 	if (req_sect < 0)
535 		req_sect = 0;
536 
537 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
538 	if (req_sect > max_sect)
539 		req_sect = max_sect;
540 
541 	/*
542 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
543 		 sect_in, device->rs_in_flight, want, correction,
544 		 steps, cps, device->rs_planed, curr_corr, req_sect);
545 	*/
546 
547 	return req_sect;
548 }
549 
550 static int drbd_rs_number_requests(struct drbd_peer_device *peer_device)
551 {
552 	struct drbd_device *device = peer_device->device;
553 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
554 	int number, mxb;
555 
556 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
557 	device->rs_in_flight -= sect_in;
558 
559 	rcu_read_lock();
560 	mxb = drbd_get_max_buffers(device) / 2;
561 	if (rcu_dereference(device->rs_plan_s)->size) {
562 		number = drbd_rs_controller(peer_device, sect_in) >> (BM_BLOCK_SHIFT - 9);
563 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
564 	} else {
565 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
566 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
567 	}
568 	rcu_read_unlock();
569 
570 	/* Don't have more than "max-buffers"/2 in-flight.
571 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
572 	 * potentially causing a distributed deadlock on congestion during
573 	 * online-verify or (checksum-based) resync, if max-buffers,
574 	 * socket buffer sizes and resync rate settings are mis-configured. */
575 
576 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
577 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
578 	 * "number of pages" (typically also 4k),
579 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
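	/* Example with made-up numbers: max-buffers 1000 gives mxb = 500 pages;
	 * rs_in_flight = 2048 sectors = 256 4k blocks, so "number" is capped
	 * at 500 - 256 = 244 blocks for this turn. */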
580 	if (mxb - device->rs_in_flight/8 < number)
581 		number = mxb - device->rs_in_flight/8;
582 
583 	return number;
584 }
585 
586 static int make_resync_request(struct drbd_peer_device *const peer_device, int cancel)
587 {
588 	struct drbd_device *const device = peer_device->device;
589 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
590 	unsigned long bit;
591 	sector_t sector;
592 	const sector_t capacity = get_capacity(device->vdisk);
593 	int max_bio_size;
594 	int number, rollback_i, size;
595 	int align, requeue = 0;
596 	int i = 0;
597 	int discard_granularity = 0;
598 
599 	if (unlikely(cancel))
600 		return 0;
601 
602 	if (device->rs_total == 0) {
603 		/* empty resync? */
604 		drbd_resync_finished(peer_device);
605 		return 0;
606 	}
607 
608 	if (!get_ldev(device)) {
609 		/* Since we only need to access device->resync, a
610 		   get_ldev_if_state(device, D_FAILED) would be sufficient, but
611 		   continuing resync with a broken disk makes no sense at
612 		   all */
613 		drbd_err(device, "Disk broke down during resync!\n");
614 		return 0;
615 	}
616 
617 	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
618 		rcu_read_lock();
619 		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
620 		rcu_read_unlock();
621 	}
622 
623 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
624 	number = drbd_rs_number_requests(peer_device);
625 	if (number <= 0)
626 		goto requeue;
627 
628 	for (i = 0; i < number; i++) {
629 		/* Stop generating RS requests when half of the send buffer is filled,
630 		 * but notify TCP that we'd like to have more space. */
631 		mutex_lock(&connection->data.mutex);
632 		if (connection->data.socket) {
633 			struct sock *sk = connection->data.socket->sk;
634 			int queued = sk->sk_wmem_queued;
635 			int sndbuf = sk->sk_sndbuf;
636 			if (queued > sndbuf / 2) {
637 				requeue = 1;
638 				if (sk->sk_socket)
639 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
640 			}
641 		} else
642 			requeue = 1;
643 		mutex_unlock(&connection->data.mutex);
644 		if (requeue)
645 			goto requeue;
646 
647 next_sector:
648 		size = BM_BLOCK_SIZE;
649 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
650 
651 		if (bit == DRBD_END_OF_BITMAP) {
652 			device->bm_resync_fo = drbd_bm_bits(device);
653 			put_ldev(device);
654 			return 0;
655 		}
656 
657 		sector = BM_BIT_TO_SECT(bit);
658 
659 		if (drbd_try_rs_begin_io(peer_device, sector)) {
660 			device->bm_resync_fo = bit;
661 			goto requeue;
662 		}
663 		device->bm_resync_fo = bit + 1;
664 
665 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
666 			drbd_rs_complete_io(device, sector);
667 			goto next_sector;
668 		}
669 
670 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
671 		/* try to find some adjacent bits.
672 		 * we stop if we already have the maximum req size.
673 		 *
674 		 * Additionally always align bigger requests, in order to
675 		 * be prepared for all stripe sizes of software RAIDs.
676 		 */
677 		align = 1;
678 		rollback_i = i;
679 		while (i < number) {
680 			if (size + BM_BLOCK_SIZE > max_bio_size)
681 				break;
682 
683 			/* Be always aligned */
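			/* e.g. with align == 1 the start sector must be a
			 * multiple of 16 sectors (8 KiB); align is bumped
			 * below each time size reaches the next power of two,
			 * so merged requests stay aligned to (roughly) their
			 * own size. */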
684 			if (sector & ((1<<(align+3))-1))
685 				break;
686 
687 			if (discard_granularity && size == discard_granularity)
688 				break;
689 
690 			/* do not cross extent boundaries */
691 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
692 				break;
693 			/* now, is it actually dirty, after all?
694 			 * caution, drbd_bm_test_bit is tri-state for some
695 			 * obscure reason; ( b == 0 ) would get the out-of-band
696 			 * only accidentally right because of the "oddly sized"
697 			 * adjustment below */
698 			if (drbd_bm_test_bit(device, bit+1) != 1)
699 				break;
700 			bit++;
701 			size += BM_BLOCK_SIZE;
702 			if ((BM_BLOCK_SIZE << align) <= size)
703 				align++;
704 			i++;
705 		}
706 		/* if we merged some,
707 		 * reset the offset to start the next drbd_bm_find_next from */
708 		if (size > BM_BLOCK_SIZE)
709 			device->bm_resync_fo = bit + 1;
710 #endif
711 
712 		/* adjust very last sectors, in case we are oddly sized */
713 		if (sector + (size>>9) > capacity)
714 			size = (capacity-sector)<<9;
715 
716 		if (device->use_csums) {
717 			switch (read_for_csum(peer_device, sector, size)) {
718 			case -EIO: /* Disk failure */
719 				put_ldev(device);
720 				return -EIO;
721 			case -EAGAIN: /* allocation failed, or ldev busy */
722 				drbd_rs_complete_io(device, sector);
723 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
724 				i = rollback_i;
725 				goto requeue;
726 			case 0:
727 				/* everything ok */
728 				break;
729 			default:
730 				BUG();
731 			}
732 		} else {
733 			int err;
734 
735 			inc_rs_pending(peer_device);
736 			err = drbd_send_drequest(peer_device,
737 						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
738 						 sector, size, ID_SYNCER);
739 			if (err) {
740 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
741 				dec_rs_pending(peer_device);
742 				put_ldev(device);
743 				return err;
744 			}
745 		}
746 	}
747 
748 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
749 		/* last syncer _request_ was sent,
750 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
751 		 * next sync group will resume), as soon as we receive the last
752 		 * resync data block, and the last bit is cleared.
753 		 * until then resync "work" is "inactive" ...
754 		 */
755 		put_ldev(device);
756 		return 0;
757 	}
758 
759  requeue:
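	/* "i" counts the BM_BLOCK_SIZE (4 KiB) units requested above; the
	 * shift by (BM_BLOCK_SHIFT - 9) converts that count into 512-byte
	 * sectors for the rs_in_flight accounting. */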
760 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
761 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
762 	put_ldev(device);
763 	return 0;
764 }
765 
766 static int make_ov_request(struct drbd_peer_device *peer_device, int cancel)
767 {
768 	struct drbd_device *device = peer_device->device;
769 	int number, i, size;
770 	sector_t sector;
771 	const sector_t capacity = get_capacity(device->vdisk);
772 	bool stop_sector_reached = false;
773 
774 	if (unlikely(cancel))
775 		return 1;
776 
777 	number = drbd_rs_number_requests(peer_device);
778 
779 	sector = device->ov_position;
780 	for (i = 0; i < number; i++) {
781 		if (sector >= capacity)
782 			return 1;
783 
784 		/* We check for "finished" only in the reply path:
785 		 * w_e_end_ov_reply().
786 		 * We need to send at least one request out. */
787 		stop_sector_reached = i > 0
788 			&& verify_can_do_stop_sector(device)
789 			&& sector >= device->ov_stop_sector;
790 		if (stop_sector_reached)
791 			break;
792 
793 		size = BM_BLOCK_SIZE;
794 
795 		if (drbd_try_rs_begin_io(peer_device, sector)) {
796 			device->ov_position = sector;
797 			goto requeue;
798 		}
799 
800 		if (sector + (size>>9) > capacity)
801 			size = (capacity-sector)<<9;
802 
803 		inc_rs_pending(peer_device);
804 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
805 			dec_rs_pending(peer_device);
806 			return 0;
807 		}
808 		sector += BM_SECT_PER_BIT;
809 	}
810 	device->ov_position = sector;
811 
812  requeue:
813 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
814 	if (i == 0 || !stop_sector_reached)
815 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
816 	return 1;
817 }
818 
819 int w_ov_finished(struct drbd_work *w, int cancel)
820 {
821 	struct drbd_device_work *dw =
822 		container_of(w, struct drbd_device_work, w);
823 	struct drbd_device *device = dw->device;
824 	kfree(dw);
825 	ov_out_of_sync_print(first_peer_device(device));
826 	drbd_resync_finished(first_peer_device(device));
827 
828 	return 0;
829 }
830 
831 static int w_resync_finished(struct drbd_work *w, int cancel)
832 {
833 	struct drbd_device_work *dw =
834 		container_of(w, struct drbd_device_work, w);
835 	struct drbd_device *device = dw->device;
836 	kfree(dw);
837 
838 	drbd_resync_finished(first_peer_device(device));
839 
840 	return 0;
841 }
842 
843 static void ping_peer(struct drbd_device *device)
844 {
845 	struct drbd_connection *connection = first_peer_device(device)->connection;
846 
847 	clear_bit(GOT_PING_ACK, &connection->flags);
848 	request_ping(connection);
849 	wait_event(connection->ping_wait,
850 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
851 }
852 
853 int drbd_resync_finished(struct drbd_peer_device *peer_device)
854 {
855 	struct drbd_device *device = peer_device->device;
856 	struct drbd_connection *connection = peer_device->connection;
857 	unsigned long db, dt, dbdt;
858 	unsigned long n_oos;
859 	union drbd_state os, ns;
860 	struct drbd_device_work *dw;
861 	char *khelper_cmd = NULL;
862 	int verify_done = 0;
863 
864 	/* Remove all elements from the resync LRU. Since future actions
865 	 * might set bits in the (main) bitmap, then the entries in the
866 	 * resync LRU would be wrong. */
867 	if (drbd_rs_del_all(device)) {
868 		/* In case this is not possible now, most probably because
869 		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
870 		 * queue (or even the read operations for those packets
871 		 * are not finished by now). Retry in 100ms. */
872 
873 		schedule_timeout_interruptible(HZ / 10);
874 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
875 		if (dw) {
876 			dw->w.cb = w_resync_finished;
877 			dw->device = device;
878 			drbd_queue_work(&connection->sender_work, &dw->w);
879 			return 1;
880 		}
881 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
882 	}
883 
884 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
885 	if (dt <= 0)
886 		dt = 1;
887 
888 	db = device->rs_total;
889 	/* adjust for verify start and stop sectors, respectively the position reached */
890 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
891 		db -= device->ov_left;
892 
893 	dbdt = Bit2KB(db/dt);
894 	device->rs_paused /= HZ;
895 
896 	if (!get_ldev(device))
897 		goto out;
898 
899 	ping_peer(device);
900 
901 	spin_lock_irq(&device->resource->req_lock);
902 	os = drbd_read_state(device);
903 
904 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
905 
906 	/* This protects us against multiple calls (that can happen in the presence
907 	   of application IO), and against connectivity loss just before we arrive here. */
908 	if (os.conn <= C_CONNECTED)
909 		goto out_unlock;
910 
911 	ns = os;
912 	ns.conn = C_CONNECTED;
913 
914 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
915 	     verify_done ? "Online verify" : "Resync",
916 	     dt + device->rs_paused, device->rs_paused, dbdt);
917 
918 	n_oos = drbd_bm_total_weight(device);
919 
920 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
921 		if (n_oos) {
922 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
923 			      n_oos, Bit2KB(1));
924 			khelper_cmd = "out-of-sync";
925 		}
926 	} else {
927 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
928 
929 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
930 			khelper_cmd = "after-resync-target";
931 
932 		if (device->use_csums && device->rs_total) {
933 			const unsigned long s = device->rs_same_csum;
934 			const unsigned long t = device->rs_total;
935 			const int ratio =
936 				(t == 0)     ? 0 :
937 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
938 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
939 			     "transferred %luK total %luK\n",
940 			     ratio,
941 			     Bit2KB(device->rs_same_csum),
942 			     Bit2KB(device->rs_total - device->rs_same_csum),
943 			     Bit2KB(device->rs_total));
944 		}
945 	}
946 
947 	if (device->rs_failed) {
948 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
949 
950 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
951 			ns.disk = D_INCONSISTENT;
952 			ns.pdsk = D_UP_TO_DATE;
953 		} else {
954 			ns.disk = D_UP_TO_DATE;
955 			ns.pdsk = D_INCONSISTENT;
956 		}
957 	} else {
958 		ns.disk = D_UP_TO_DATE;
959 		ns.pdsk = D_UP_TO_DATE;
960 
961 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962 			if (device->p_uuid) {
963 				int i;
964 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
965 					_drbd_uuid_set(device, i, device->p_uuid[i]);
966 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
967 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
968 			} else {
969 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
970 			}
971 		}
972 
973 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
974 			/* for verify runs, we don't update uuids here,
975 			 * so there would be nothing to report. */
976 			drbd_uuid_set_bm(device, 0UL);
977 			drbd_print_uuids(device, "updated UUIDs");
978 			if (device->p_uuid) {
979 				/* Now the two UUID sets are equal, update what we
980 				 * know of the peer. */
981 				int i;
982 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
983 					device->p_uuid[i] = device->ldev->md.uuid[i];
984 			}
985 		}
986 	}
987 
988 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
989 out_unlock:
990 	spin_unlock_irq(&device->resource->req_lock);
991 
992 	/* If we have been sync source, and have an effective fencing-policy,
993 	 * once *all* volumes are back in sync, call "unfence". */
994 	if (os.conn == C_SYNC_SOURCE) {
995 		enum drbd_disk_state disk_state = D_MASK;
996 		enum drbd_disk_state pdsk_state = D_MASK;
997 		enum drbd_fencing_p fp = FP_DONT_CARE;
998 
999 		rcu_read_lock();
1000 		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001 		if (fp != FP_DONT_CARE) {
1002 			struct drbd_peer_device *peer_device;
1003 			int vnr;
1004 			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005 				struct drbd_device *device = peer_device->device;
1006 				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007 				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008 			}
1009 		}
1010 		rcu_read_unlock();
1011 		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012 			conn_khelper(connection, "unfence-peer");
1013 	}
1014 
1015 	put_ldev(device);
1016 out:
1017 	device->rs_total  = 0;
1018 	device->rs_failed = 0;
1019 	device->rs_paused = 0;
1020 
1021 	/* reset start sector, if we reached end of device */
1022 	if (verify_done && device->ov_left == 0)
1023 		device->ov_start_sector = 0;
1024 
1025 	drbd_md_sync(device);
1026 
1027 	if (khelper_cmd)
1028 		drbd_khelper(device, khelper_cmd);
1029 
1030 	return 1;
1031 }
1032 
1033 /* helper */
1034 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1035 {
1036 	if (drbd_peer_req_has_active_page(peer_req)) {
1037 		/* This might happen if sendpage() has not finished */
1038 		int i = PFN_UP(peer_req->i.size);
1039 		atomic_add(i, &device->pp_in_use_by_net);
1040 		atomic_sub(i, &device->pp_in_use);
1041 		spin_lock_irq(&device->resource->req_lock);
1042 		list_add_tail(&peer_req->w.list, &device->net_ee);
1043 		spin_unlock_irq(&device->resource->req_lock);
1044 		wake_up(&drbd_pp_wait);
1045 	} else
1046 		drbd_free_peer_req(device, peer_req);
1047 }
1048 
1049 /**
1050  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1051  * @w:		work object.
1052  * @cancel:	The connection will be closed anyways
1053  */
1054 int w_e_end_data_req(struct drbd_work *w, int cancel)
1055 {
1056 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1057 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1058 	struct drbd_device *device = peer_device->device;
1059 	int err;
1060 
1061 	if (unlikely(cancel)) {
1062 		drbd_free_peer_req(device, peer_req);
1063 		dec_unacked(device);
1064 		return 0;
1065 	}
1066 
1067 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1068 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1069 	} else {
1070 		if (drbd_ratelimit())
1071 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1072 			    (unsigned long long)peer_req->i.sector);
1073 
1074 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1075 	}
1076 
1077 	dec_unacked(device);
1078 
1079 	move_to_net_ee_or_free(device, peer_req);
1080 
1081 	if (unlikely(err))
1082 		drbd_err(device, "drbd_send_block() failed\n");
1083 	return err;
1084 }
1085 
1086 static bool all_zero(struct drbd_peer_request *peer_req)
1087 {
1088 	struct page *page = peer_req->pages;
1089 	unsigned int len = peer_req->i.size;
1090 
1091 	page_chain_for_each(page) {
1092 		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1093 		unsigned int i, words = l / sizeof(long);
1094 		unsigned long *d;
1095 
1096 		d = kmap_atomic(page);
1097 		for (i = 0; i < words; i++) {
1098 			if (d[i]) {
1099 				kunmap_atomic(d);
1100 				return false;
1101 			}
1102 		}
1103 		kunmap_atomic(d);
1104 		len -= l;
1105 	}
1106 
1107 	return true;
1108 }
1109 
1110 /**
1111  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1112  * @w:		work object.
1113  * @cancel:	The connection will be closed anyways
1114  */
1115 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1116 {
1117 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1118 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1119 	struct drbd_device *device = peer_device->device;
1120 	int err;
1121 
1122 	if (unlikely(cancel)) {
1123 		drbd_free_peer_req(device, peer_req);
1124 		dec_unacked(device);
1125 		return 0;
1126 	}
1127 
1128 	if (get_ldev_if_state(device, D_FAILED)) {
1129 		drbd_rs_complete_io(device, peer_req->i.sector);
1130 		put_ldev(device);
1131 	}
1132 
1133 	if (device->state.conn == C_AHEAD) {
1134 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1135 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1136 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1137 			inc_rs_pending(peer_device);
1138 			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1139 				err = drbd_send_rs_deallocated(peer_device, peer_req);
1140 			else
1141 				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1142 		} else {
1143 			if (drbd_ratelimit())
1144 				drbd_err(device, "Not sending RSDataReply, "
1145 				    "partner DISKLESS!\n");
1146 			err = 0;
1147 		}
1148 	} else {
1149 		if (drbd_ratelimit())
1150 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1151 			    (unsigned long long)peer_req->i.sector);
1152 
1153 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1154 
1155 		/* update resync data with failure */
1156 		drbd_rs_failed_io(peer_device, peer_req->i.sector, peer_req->i.size);
1157 	}
1158 
1159 	dec_unacked(device);
1160 
1161 	move_to_net_ee_or_free(device, peer_req);
1162 
1163 	if (unlikely(err))
1164 		drbd_err(device, "drbd_send_block() failed\n");
1165 	return err;
1166 }
1167 
1168 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1169 {
1170 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1171 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1172 	struct drbd_device *device = peer_device->device;
1173 	struct digest_info *di;
1174 	int digest_size;
1175 	void *digest = NULL;
1176 	int err, eq = 0;
1177 
1178 	if (unlikely(cancel)) {
1179 		drbd_free_peer_req(device, peer_req);
1180 		dec_unacked(device);
1181 		return 0;
1182 	}
1183 
1184 	if (get_ldev(device)) {
1185 		drbd_rs_complete_io(device, peer_req->i.sector);
1186 		put_ldev(device);
1187 	}
1188 
1189 	di = peer_req->digest;
1190 
1191 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1192 		/* quick hack to try to avoid a race against reconfiguration.
1193 		 * a real fix would be much more involved,
1194 		 * introducing more locking mechanisms */
1195 		if (peer_device->connection->csums_tfm) {
1196 			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1197 			D_ASSERT(device, digest_size == di->digest_size);
1198 			digest = kmalloc(digest_size, GFP_NOIO);
1199 		}
1200 		if (digest) {
1201 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1202 			eq = !memcmp(digest, di->digest, digest_size);
1203 			kfree(digest);
1204 		}
1205 
1206 		if (eq) {
1207 			drbd_set_in_sync(peer_device, peer_req->i.sector, peer_req->i.size);
1208 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1209 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1210 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1211 		} else {
1212 			inc_rs_pending(peer_device);
1213 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1214 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1215 			kfree(di);
1216 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1217 		}
1218 	} else {
1219 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1220 		if (drbd_ratelimit())
1221 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1222 	}
1223 
1224 	dec_unacked(device);
1225 	move_to_net_ee_or_free(device, peer_req);
1226 
1227 	if (unlikely(err))
1228 		drbd_err(device, "drbd_send_block/ack() failed\n");
1229 	return err;
1230 }
1231 
1232 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1233 {
1234 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1235 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1236 	struct drbd_device *device = peer_device->device;
1237 	sector_t sector = peer_req->i.sector;
1238 	unsigned int size = peer_req->i.size;
1239 	int digest_size;
1240 	void *digest;
1241 	int err = 0;
1242 
1243 	if (unlikely(cancel))
1244 		goto out;
1245 
1246 	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1247 	digest = kmalloc(digest_size, GFP_NOIO);
1248 	if (!digest) {
1249 		err = 1;	/* terminate the connection in case the allocation failed */
1250 		goto out;
1251 	}
1252 
1253 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1254 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1255 	else
1256 		memset(digest, 0, digest_size);
1257 
1258 	/* Free e and pages before send.
1259 	 * In case we block on congestion, we could otherwise run into
1260 	 * some distributed deadlock, if the other side blocks on
1261 	 * congestion as well, because our receiver blocks in
1262 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1263 	drbd_free_peer_req(device, peer_req);
1264 	peer_req = NULL;
1265 	inc_rs_pending(peer_device);
1266 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1267 	if (err)
1268 		dec_rs_pending(peer_device);
1269 	kfree(digest);
1270 
1271 out:
1272 	if (peer_req)
1273 		drbd_free_peer_req(device, peer_req);
1274 	dec_unacked(device);
1275 	return err;
1276 }
1277 
1278 void drbd_ov_out_of_sync_found(struct drbd_peer_device *peer_device, sector_t sector, int size)
1279 {
1280 	struct drbd_device *device = peer_device->device;
1281 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1282 		device->ov_last_oos_size += size>>9;
1283 	} else {
1284 		device->ov_last_oos_start = sector;
1285 		device->ov_last_oos_size = size>>9;
1286 	}
1287 	drbd_set_out_of_sync(peer_device, sector, size);
1288 }
1289 
1290 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1291 {
1292 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1293 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1294 	struct drbd_device *device = peer_device->device;
1295 	struct digest_info *di;
1296 	void *digest;
1297 	sector_t sector = peer_req->i.sector;
1298 	unsigned int size = peer_req->i.size;
1299 	int digest_size;
1300 	int err, eq = 0;
1301 	bool stop_sector_reached = false;
1302 
1303 	if (unlikely(cancel)) {
1304 		drbd_free_peer_req(device, peer_req);
1305 		dec_unacked(device);
1306 		return 0;
1307 	}
1308 
1309 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1310 	 * the resync lru has been cleaned up already */
1311 	if (get_ldev(device)) {
1312 		drbd_rs_complete_io(device, peer_req->i.sector);
1313 		put_ldev(device);
1314 	}
1315 
1316 	di = peer_req->digest;
1317 
1318 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1319 		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1320 		digest = kmalloc(digest_size, GFP_NOIO);
1321 		if (digest) {
1322 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1323 
1324 			D_ASSERT(device, digest_size == di->digest_size);
1325 			eq = !memcmp(digest, di->digest, digest_size);
1326 			kfree(digest);
1327 		}
1328 	}
1329 
1330 	/* Free peer_req and pages before send.
1331 	 * In case we block on congestion, we could otherwise run into
1332 	 * some distributed deadlock, if the other side blocks on
1333 	 * congestion as well, because our receiver blocks in
1334 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1335 	drbd_free_peer_req(device, peer_req);
1336 	if (!eq)
1337 		drbd_ov_out_of_sync_found(peer_device, sector, size);
1338 	else
1339 		ov_out_of_sync_print(peer_device);
1340 
1341 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1342 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1343 
1344 	dec_unacked(device);
1345 
1346 	--device->ov_left;
1347 
1348 	/* let's advance progress step marks only for every other megabyte */
1349 	if ((device->ov_left & 0x200) == 0x200)
1350 		drbd_advance_rs_marks(peer_device, device->ov_left);
1351 
1352 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1353 		(sector + (size>>9)) >= device->ov_stop_sector;
1354 
1355 	if (device->ov_left == 0 || stop_sector_reached) {
1356 		ov_out_of_sync_print(peer_device);
1357 		drbd_resync_finished(peer_device);
1358 	}
1359 
1360 	return err;
1361 }
1362 
1363 /* FIXME
1364  * We need to track the number of pending barrier acks,
1365  * and to be able to wait for them.
1366  * See also comment in drbd_adm_attach before drbd_suspend_io.
1367  */
1368 static int drbd_send_barrier(struct drbd_connection *connection)
1369 {
1370 	struct p_barrier *p;
1371 	struct drbd_socket *sock;
1372 
1373 	sock = &connection->data;
1374 	p = conn_prepare_command(connection, sock);
1375 	if (!p)
1376 		return -EIO;
1377 	p->barrier = connection->send.current_epoch_nr;
1378 	p->pad = 0;
1379 	connection->send.current_epoch_writes = 0;
1380 	connection->send.last_sent_barrier_jif = jiffies;
1381 
1382 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1383 }
1384 
1385 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1386 {
1387 	struct drbd_socket *sock = &pd->connection->data;
1388 	if (!drbd_prepare_command(pd, sock))
1389 		return -EIO;
1390 	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1391 }
1392 
1393 int w_send_write_hint(struct drbd_work *w, int cancel)
1394 {
1395 	struct drbd_device *device =
1396 		container_of(w, struct drbd_device, unplug_work);
1397 
1398 	if (cancel)
1399 		return 0;
1400 	return pd_send_unplug_remote(first_peer_device(device));
1401 }
1402 
1403 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1404 {
1405 	if (!connection->send.seen_any_write_yet) {
1406 		connection->send.seen_any_write_yet = true;
1407 		connection->send.current_epoch_nr = epoch;
1408 		connection->send.current_epoch_writes = 0;
1409 		connection->send.last_sent_barrier_jif = jiffies;
1410 	}
1411 }
1412 
1413 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1414 {
1415 	/* re-init if first write on this connection */
1416 	if (!connection->send.seen_any_write_yet)
1417 		return;
1418 	if (connection->send.current_epoch_nr != epoch) {
1419 		if (connection->send.current_epoch_writes)
1420 			drbd_send_barrier(connection);
1421 		connection->send.current_epoch_nr = epoch;
1422 	}
1423 }
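/* Sketch of the intended barrier behaviour: while requests of epoch 5 are
 * being sent and the first request of epoch 6 shows up, a P_BARRIER for
 * epoch 5 is sent first (but only if epoch 5 actually contained writes),
 * so the peer can treat epoch 5 as one ordered unit. */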
1424 
1425 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1426 {
1427 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1428 	struct drbd_device *device = req->device;
1429 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1430 	struct drbd_connection *const connection = peer_device->connection;
1431 	int err;
1432 
1433 	if (unlikely(cancel)) {
1434 		req_mod(req, SEND_CANCELED, peer_device);
1435 		return 0;
1436 	}
1437 	req->pre_send_jif = jiffies;
1438 
1439 	/* this time, no connection->send.current_epoch_writes++;
1440 	 * If it was sent, it was the closing barrier for the last
1441 	 * replicated epoch, before we went into AHEAD mode.
1442 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1443 	maybe_send_barrier(connection, req->epoch);
1444 
1445 	err = drbd_send_out_of_sync(peer_device, req);
1446 	req_mod(req, OOS_HANDED_TO_NETWORK, peer_device);
1447 
1448 	return err;
1449 }
1450 
1451 /**
1452  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1453  * @w:		work object.
1454  * @cancel:	The connection will be closed anyways
1455  */
1456 int w_send_dblock(struct drbd_work *w, int cancel)
1457 {
1458 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1459 	struct drbd_device *device = req->device;
1460 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1461 	struct drbd_connection *connection = peer_device->connection;
1462 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1463 	int err;
1464 
1465 	if (unlikely(cancel)) {
1466 		req_mod(req, SEND_CANCELED, peer_device);
1467 		return 0;
1468 	}
1469 	req->pre_send_jif = jiffies;
1470 
1471 	re_init_if_first_write(connection, req->epoch);
1472 	maybe_send_barrier(connection, req->epoch);
1473 	connection->send.current_epoch_writes++;
1474 
1475 	err = drbd_send_dblock(peer_device, req);
1476 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1477 
1478 	if (do_send_unplug && !err)
1479 		pd_send_unplug_remote(peer_device);
1480 
1481 	return err;
1482 }
1483 
1484 /**
1485  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1486  * @w:		work object.
1487  * @cancel:	The connection will be closed anyways
1488  */
1489 int w_send_read_req(struct drbd_work *w, int cancel)
1490 {
1491 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1492 	struct drbd_device *device = req->device;
1493 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1494 	struct drbd_connection *connection = peer_device->connection;
1495 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1496 	int err;
1497 
1498 	if (unlikely(cancel)) {
1499 		req_mod(req, SEND_CANCELED, peer_device);
1500 		return 0;
1501 	}
1502 	req->pre_send_jif = jiffies;
1503 
1504 	/* Even read requests may close a write epoch,
1505 	 * if there was any yet. */
1506 	maybe_send_barrier(connection, req->epoch);
1507 
1508 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1509 				 (unsigned long)req);
1510 
1511 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1512 
1513 	if (do_send_unplug && !err)
1514 		pd_send_unplug_remote(peer_device);
1515 
1516 	return err;
1517 }
1518 
1519 int w_restart_disk_io(struct drbd_work *w, int cancel)
1520 {
1521 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1522 	struct drbd_device *device = req->device;
1523 
1524 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1525 		drbd_al_begin_io(device, &req->i);
1526 
1527 	req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1528 					   req->master_bio, GFP_NOIO,
1529 					  &drbd_io_bio_set);
1530 	req->private_bio->bi_private = req;
1531 	req->private_bio->bi_end_io = drbd_request_endio;
1532 	submit_bio_noacct(req->private_bio);
1533 
1534 	return 0;
1535 }
1536 
1537 static int _drbd_may_sync_now(struct drbd_device *device)
1538 {
1539 	struct drbd_device *odev = device;
1540 	int resync_after;
1541 
1542 	while (1) {
1543 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1544 			return 1;
1545 		rcu_read_lock();
1546 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1547 		rcu_read_unlock();
1548 		if (resync_after == -1)
1549 			return 1;
1550 		odev = minor_to_device(resync_after);
1551 		if (!odev)
1552 			return 1;
1553 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1554 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1555 		    odev->state.aftr_isp || odev->state.peer_isp ||
1556 		    odev->state.user_isp)
1557 			return 0;
1558 	}
1559 }
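/* Illustration: with "resync-after" configured as minor 2 -> 1 -> 0, device 2
 * may resync only while neither 1 nor 0 is itself resyncing (SyncSource ..
 * PausedSyncT) or paused via one of the *_isp flags; a resync-after of -1,
 * a diskless disk, or a missing minor ends the dependency chain. */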
1560 
1561 /**
1562  * drbd_pause_after() - Pause resync on all devices that may not resync now
1563  * @device:	DRBD device.
1564  *
1565  * Called from process context only (admin command and after_state_ch).
1566  */
1567 static bool drbd_pause_after(struct drbd_device *device)
1568 {
1569 	bool changed = false;
1570 	struct drbd_device *odev;
1571 	int i;
1572 
1573 	rcu_read_lock();
1574 	idr_for_each_entry(&drbd_devices, odev, i) {
1575 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1576 			continue;
1577 		if (!_drbd_may_sync_now(odev) &&
1578 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1579 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1580 			changed = true;
1581 	}
1582 	rcu_read_unlock();
1583 
1584 	return changed;
1585 }
1586 
1587 /**
1588  * drbd_resume_next() - Resume resync on all devices that may resync now
1589  * @device:	DRBD device.
1590  *
1591  * Called from process context only (admin command and worker).
1592  */
1593 static bool drbd_resume_next(struct drbd_device *device)
1594 {
1595 	bool changed = false;
1596 	struct drbd_device *odev;
1597 	int i;
1598 
1599 	rcu_read_lock();
1600 	idr_for_each_entry(&drbd_devices, odev, i) {
1601 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1602 			continue;
1603 		if (odev->state.aftr_isp) {
1604 			if (_drbd_may_sync_now(odev) &&
1605 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1606 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1607 				changed = true;
1608 		}
1609 	}
1610 	rcu_read_unlock();
1611 	return changed;
1612 }
1613 
1614 void resume_next_sg(struct drbd_device *device)
1615 {
1616 	lock_all_resources();
1617 	drbd_resume_next(device);
1618 	unlock_all_resources();
1619 }
1620 
1621 void suspend_other_sg(struct drbd_device *device)
1622 {
1623 	lock_all_resources();
1624 	drbd_pause_after(device);
1625 	unlock_all_resources();
1626 }
1627 
1628 /* caller must lock_all_resources() */
1629 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1630 {
1631 	struct drbd_device *odev;
1632 	int resync_after;
1633 
1634 	if (o_minor == -1)
1635 		return NO_ERROR;
1636 	if (o_minor < -1 || o_minor > MINORMASK)
1637 		return ERR_RESYNC_AFTER;
1638 
1639 	/* check for loops */
1640 	odev = minor_to_device(o_minor);
1641 	while (1) {
1642 		if (odev == device)
1643 			return ERR_RESYNC_AFTER_CYCLE;
1644 
1645 		/* You are free to depend on diskless, non-existing,
1646 		 * or not yet/no longer existing minors.
1647 		 * We only reject dependency loops.
1648 		 * We cannot follow the dependency chain beyond a detached or
1649 		 * missing minor.
1650 		 */
1651 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1652 			return NO_ERROR;
1653 
1654 		rcu_read_lock();
1655 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1656 		rcu_read_unlock();
1657 		/* dependency chain ends here, no cycles. */
1658 		if (resync_after == -1)
1659 			return NO_ERROR;
1660 
1661 		/* follow the dependency chain */
1662 		odev = minor_to_device(resync_after);
1663 	}
1664 }
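
/*
 * A minimal userspace sketch of the dependency walk above, not DRBD code:
 * it models only the rule spelled out in the comment ("depend on anything,
 * reject only loops; a diskless or missing minor ends the chain").  All
 * toy_* names, the table size and the sample values are invented here.
 */
#include <stdbool.h>
#include <stdio.h>

#define TOY_MINORS 4

/* stand-in for disk_conf->resync_after, indexed by minor; -1 = no dependency */
static int toy_resync_after[TOY_MINORS] = { -1, 0, 1, -1 };
/* stand-in for "has a local disk (ldev) and is not D_DISKLESS" */
static bool toy_has_disk[TOY_MINORS]    = { true, true, true, false };

/* would "minor resyncs after 'after'" close a dependency loop? */
static bool toy_creates_cycle(int minor, int after)
{
	int odev = after;

	while (odev >= 0 && odev < TOY_MINORS) {
		if (odev == minor)
			return true;		/* loop back to ourselves */
		if (!toy_has_disk[odev])
			return false;		/* chain ends at a diskless minor */
		odev = toy_resync_after[odev];	/* follow the chain */
	}
	return false;				/* -1 or unknown minor ends it */
}

int main(void)
{
	/* 0 after 2 would close the loop 0 -> 2 -> 1 -> 0 */
	printf("0 after 2: %s\n", toy_creates_cycle(0, 2) ? "cycle" : "ok");
	/* 0 after 3 is fine: minor 3 is diskless, so the chain ends there */
	printf("0 after 3: %s\n", toy_creates_cycle(0, 3) ? "cycle" : "ok");
	return 0;
}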
1665 
1666 /* caller must lock_all_resources() */
1667 void drbd_resync_after_changed(struct drbd_device *device)
1668 {
1669 	int changed;
1670 
1671 	do {
1672 		changed  = drbd_pause_after(device);
1673 		changed |= drbd_resume_next(device);
1674 	} while (changed);
1675 }
1676 
1677 void drbd_rs_controller_reset(struct drbd_peer_device *peer_device)
1678 {
1679 	struct drbd_device *device = peer_device->device;
1680 	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1681 	struct fifo_buffer *plan;
1682 
1683 	atomic_set(&device->rs_sect_in, 0);
1684 	atomic_set(&device->rs_sect_ev, 0);
1685 	device->rs_in_flight = 0;
1686 	device->rs_last_events =
1687 		(int)part_stat_read_accum(disk->part0, sectors);
1688 
1689 	/* Updating the RCU-protected object in place is necessary since
1690 	   this function gets called from atomic context.
1691 	   It is valid since all other updates also lead to a completely
1692 	   empty fifo */
1693 	rcu_read_lock();
1694 	plan = rcu_dereference(device->rs_plan_s);
1695 	plan->total = 0;
1696 	fifo_set(plan, 0);
1697 	rcu_read_unlock();
1698 }
1699 
1700 void start_resync_timer_fn(struct timer_list *t)
1701 {
1702 	struct drbd_device *device = timer_container_of(device, t,
1703 							start_resync_timer);
1704 	drbd_device_post_work(device, RS_START);
1705 }
1706 
1707 static void do_start_resync(struct drbd_device *device)
1708 {
1709 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1710 		drbd_warn(device, "postponing start_resync ...\n");
1711 		device->start_resync_timer.expires = jiffies + HZ/10;
1712 		add_timer(&device->start_resync_timer);
1713 		return;
1714 	}
1715 
1716 	drbd_start_resync(device, C_SYNC_SOURCE);
1717 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1718 }
1719 
1720 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1721 {
1722 	bool csums_after_crash_only;
1723 	rcu_read_lock();
1724 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1725 	rcu_read_unlock();
1726 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1727 		connection->csums_tfm &&			/* configured? */
1728 		(csums_after_crash_only == false		/* use for each resync? */
1729 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1730 }
1731 
1732 /**
1733  * drbd_start_resync() - Start the resync process
1734  * @device:	DRBD device.
1735  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1736  *
1737  * This function might bring you directly into one of the
1738  * C_PAUSED_SYNC_* states.
1739  */
1740 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1741 {
1742 	struct drbd_peer_device *peer_device = first_peer_device(device);
1743 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1744 	union drbd_state ns;
1745 	int r;
1746 
1747 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1748 		drbd_err(device, "Resync already running!\n");
1749 		return;
1750 	}
1751 
1752 	if (!connection) {
1753 		drbd_err(device, "No connection to peer, aborting!\n");
1754 		return;
1755 	}
1756 
1757 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1758 		if (side == C_SYNC_TARGET) {
1759 			/* Since application IO was locked out during C_WF_BITMAP_T and
1760 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1761 			   we check whether we may make the data inconsistent. */
1762 			r = drbd_khelper(device, "before-resync-target");
1763 			r = (r >> 8) & 0xff;
1764 			if (r > 0) {
1765 				drbd_info(device, "before-resync-target handler returned %d, "
1766 					 "dropping connection.\n", r);
1767 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1768 				return;
1769 			}
1770 		} else /* C_SYNC_SOURCE */ {
1771 			r = drbd_khelper(device, "before-resync-source");
1772 			r = (r >> 8) & 0xff;
1773 			if (r > 0) {
1774 				if (r == 3) {
1775 					drbd_info(device, "before-resync-source handler returned %d, "
1776 						 "ignoring. Old userland tools?\n", r);
1777 				} else {
1778 					drbd_info(device, "before-resync-source handler returned %d, "
1779 						 "dropping connection.\n", r);
1780 					conn_request_state(connection,
1781 							   NS(conn, C_DISCONNECTING), CS_HARD);
1782 					return;
1783 				}
1784 			}
1785 		}
1786 	}
1787 
1788 	if (current == connection->worker.task) {
1789 		/* The worker should not sleep waiting for state_mutex,
1790 		   that could take a long time */
1791 		if (!mutex_trylock(device->state_mutex)) {
1792 			set_bit(B_RS_H_DONE, &device->flags);
1793 			device->start_resync_timer.expires = jiffies + HZ/5;
1794 			add_timer(&device->start_resync_timer);
1795 			return;
1796 		}
1797 	} else {
1798 		mutex_lock(device->state_mutex);
1799 	}
1800 
1801 	lock_all_resources();
1802 	clear_bit(B_RS_H_DONE, &device->flags);
1803 	/* Did some connection breakage or IO error race with us? */
1804 	if (device->state.conn < C_CONNECTED
1805 	    || !get_ldev_if_state(device, D_NEGOTIATING)) {
1806 		unlock_all_resources();
1807 		goto out;
1808 	}
1809 
1810 	ns = drbd_read_state(device);
1811 
1812 	ns.aftr_isp = !_drbd_may_sync_now(device);
1813 
1814 	ns.conn = side;
1815 
1816 	if (side == C_SYNC_TARGET)
1817 		ns.disk = D_INCONSISTENT;
1818 	else /* side == C_SYNC_SOURCE */
1819 		ns.pdsk = D_INCONSISTENT;
1820 
1821 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1822 	ns = drbd_read_state(device);
1823 
1824 	if (ns.conn < C_CONNECTED)
1825 		r = SS_UNKNOWN_ERROR;
1826 
1827 	if (r == SS_SUCCESS) {
1828 		unsigned long tw = drbd_bm_total_weight(device);
1829 		unsigned long now = jiffies;
1830 		int i;
1831 
1832 		device->rs_failed    = 0;
1833 		device->rs_paused    = 0;
1834 		device->rs_same_csum = 0;
1835 		device->rs_last_sect_ev = 0;
1836 		device->rs_total     = tw;
1837 		device->rs_start     = now;
1838 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1839 			device->rs_mark_left[i] = tw;
1840 			device->rs_mark_time[i] = now;
1841 		}
1842 		drbd_pause_after(device);
1843 		/* Forget potentially stale cached per-resync-extent bit counts.
1844 		 * Open-coded drbd_rs_cancel_all(device): we already have IRQs
1845 		 * disabled, and know the disk state is ok. */
1846 		spin_lock(&device->al_lock);
1847 		lc_reset(device->resync);
1848 		device->resync_locked = 0;
1849 		device->resync_wenr = LC_FREE;
1850 		spin_unlock(&device->al_lock);
1851 	}
1852 	unlock_all_resources();
1853 
1854 	if (r == SS_SUCCESS) {
1855 		wake_up(&device->al_wait); /* for lc_reset() above */
1856 		/* reset rs_last_bcast when a resync or verify is started,
1857 		 * to deal with potential jiffies wrap. */
1858 		device->rs_last_bcast = jiffies - HZ;
1859 
1860 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1861 		     drbd_conn_str(ns.conn),
1862 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1863 		     (unsigned long) device->rs_total);
1864 		if (side == C_SYNC_TARGET) {
1865 			device->bm_resync_fo = 0;
1866 			device->use_csums = use_checksum_based_resync(connection, device);
1867 		} else {
1868 			device->use_csums = false;
1869 		}
1870 
1871 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1872 		 * with w_send_oos, or the sync target will get confused as to
1873 		 * how many bits to resync.  We cannot always do that, because for an
1874 		 * empty resync and protocol < 95, we need to do it here, as we call
1875 		 * drbd_resync_finished from here in that case.
1876 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1877 		 * and from after_state_ch otherwise. */
1878 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1879 			drbd_gen_and_send_sync_uuid(peer_device);
1880 
1881 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1882 			/* This still has a race (about when exactly the peers
1883 			 * detect connection loss) that can lead to a full sync
1884 			 * on next handshake. In 8.3.9 we fixed this with explicit
1885 			 * resync-finished notifications, but the fix
1886 			 * introduces a protocol change.  Sleeping for some
1887 			 * time longer than the ping interval + timeout on the
1888 			 * SyncSource, to give the SyncTarget the chance to
1889 			 * detect connection loss, then waiting for a ping
1890 			 * response (implicit in drbd_resync_finished) reduces
1891 			 * the race considerably, but does not solve it. */
1892 			if (side == C_SYNC_SOURCE) {
1893 				struct net_conf *nc;
1894 				int timeo;
1895 
1896 				rcu_read_lock();
1897 				nc = rcu_dereference(connection->net_conf);
1898 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1899 				rcu_read_unlock();
1900 				schedule_timeout_interruptible(timeo);
1901 			}
1902 			drbd_resync_finished(peer_device);
1903 		}
1904 
1905 		drbd_rs_controller_reset(peer_device);
1906 		/* ns.conn may already be != device->state.conn,
1907 		 * we may have been paused in between, or become paused until
1908 		 * the timer triggers.
1909 		 * No matter, that is handled in resync_timer_fn() */
1910 		if (ns.conn == C_SYNC_TARGET)
1911 			mod_timer(&device->resync_timer, jiffies);
1912 
1913 		drbd_md_sync(device);
1914 	}
1915 	put_ldev(device);
1916 out:
1917 	mutex_unlock(device->state_mutex);
1918 }
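
/*
 * Userspace sketch of the trylock-and-retry idea used at the top of
 * drbd_start_resync(): the worker must not sleep on a possibly long-held
 * mutex, so it takes it opportunistically and, on contention, would re-arm
 * a timer and bail out (here the retry is only reported).  The toy_* names
 * are invented for this sketch; build with -pthread.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t toy_state_mutex = PTHREAD_MUTEX_INITIALIZER;

/* returns true if the work ran, false if the caller should retry later */
static bool toy_try_start_resync(void)
{
	if (pthread_mutex_trylock(&toy_state_mutex) != 0)
		return false;	/* contended: re-arm a timer instead of sleeping */

	/* ... the actual state transition work would happen here ... */
	pthread_mutex_unlock(&toy_state_mutex);
	return true;
}

int main(void)
{
	printf("uncontended: %s\n", toy_try_start_resync() ? "ran" : "retry later");

	pthread_mutex_lock(&toy_state_mutex);	/* simulate a long-held lock */
	printf("contended:   %s\n", toy_try_start_resync() ? "ran" : "retry later");
	pthread_mutex_unlock(&toy_state_mutex);
	return 0;
}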
1919 
1920 static void update_on_disk_bitmap(struct drbd_peer_device *peer_device, bool resync_done)
1921 {
1922 	struct drbd_device *device = peer_device->device;
1923 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1924 	device->rs_last_bcast = jiffies;
1925 
1926 	if (!get_ldev(device))
1927 		return;
1928 
1929 	drbd_bm_write_lazy(device, 0);
1930 	if (resync_done && is_sync_state(device->state.conn))
1931 		drbd_resync_finished(peer_device);
1932 
1933 	drbd_bcast_event(device, &sib);
1934 	/* update timestamp, in case it took a while to write out stuff */
1935 	device->rs_last_bcast = jiffies;
1936 	put_ldev(device);
1937 }
1938 
1939 static void drbd_ldev_destroy(struct drbd_device *device)
1940 {
1941 	lc_destroy(device->resync);
1942 	device->resync = NULL;
1943 	lc_destroy(device->act_log);
1944 	device->act_log = NULL;
1945 
1946 	__acquire(local);
1947 	drbd_backing_dev_free(device, device->ldev);
1948 	device->ldev = NULL;
1949 	__release(local);
1950 
1951 	clear_bit(GOING_DISKLESS, &device->flags);
1952 	wake_up(&device->misc_wait);
1953 }
1954 
1955 static void go_diskless(struct drbd_device *device)
1956 {
1957 	struct drbd_peer_device *peer_device = first_peer_device(device);
1958 	D_ASSERT(device, device->state.disk == D_FAILED);
1959 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1960 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1961 	 * the protected members anymore, though, so once put_ldev reaches zero
1962 	 * again, it will be safe to free them. */
1963 
1964 	/* Try to write changed bitmap pages, read errors may have just
1965 	 * set some bits outside the area covered by the activity log.
1966 	 *
1967 	 * If we have an IO error during the bitmap writeout,
1968 	 * we will want a full sync next time, just in case.
1969 	 * (Do we want a specific meta data flag for this?)
1970 	 *
1971 	 * If that does not make it to stable storage either,
1972 	 * we cannot do anything about that anymore.
1973 	 *
1974 	 * We still need to check if both bitmap and ldev are present, we may
1975 	 * end up here after a failed attach, before ldev was even assigned.
1976 	 */
1977 	if (device->bitmap && device->ldev) {
1978 		/* An interrupted resync or similar is allowed to recount bits
1979 		 * while we detach.
1980 		 * Any modifications would not be expected anymore, though.
1981 		 */
1982 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1983 					"detach", BM_LOCKED_TEST_ALLOWED, peer_device)) {
1984 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1985 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1986 				drbd_md_sync(device);
1987 			}
1988 		}
1989 	}
1990 
1991 	drbd_force_state(device, NS(disk, D_DISKLESS));
1992 }
1993 
1994 static int do_md_sync(struct drbd_device *device)
1995 {
1996 	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1997 	drbd_md_sync(device);
1998 	return 0;
1999 }
2000 
2001 /* only called from drbd_worker thread, no locking */
2002 void __update_timing_details(
2003 		struct drbd_thread_timing_details *tdp,
2004 		unsigned int *cb_nr,
2005 		void *cb,
2006 		const char *fn, const unsigned int line)
2007 {
2008 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2009 	struct drbd_thread_timing_details *td = tdp + i;
2010 
2011 	td->start_jif = jiffies;
2012 	td->cb_addr = cb;
2013 	td->caller_fn = fn;
2014 	td->line = line;
2015 	td->cb_nr = *cb_nr;
2016 
2017 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2018 	td = tdp + i;
2019 	memset(td, 0, sizeof(*td));
2020 
2021 	++(*cb_nr);
2022 }
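
/*
 * Userspace sketch of the timing-history ring kept by
 * __update_timing_details() above: every sample lands in slot (nr % HIST)
 * and the slot after it is zeroed, so whoever later dumps the ring can
 * tell the newest entries from a stale wrap-around.  The toy_* names and
 * the "nr == 0 means empty" convention are inventions of this sketch.
 */
#include <stdio.h>
#include <string.h>

#define TOY_HIST 4

struct toy_detail {
	unsigned int nr;	/* 0 marks an empty / end-of-history slot here */
	const char *what;
};

static struct toy_detail toy_ring[TOY_HIST];
static unsigned int toy_nr = 1;	/* start at 1 so 0 can mean "empty" */

static void toy_record(const char *what)
{
	unsigned int i = toy_nr % TOY_HIST;

	toy_ring[i].nr = toy_nr;
	toy_ring[i].what = what;
	/* zero the next slot: it marks where the recorded history ends */
	memset(&toy_ring[(i + 1) % TOY_HIST], 0, sizeof(toy_ring[0]));
	toy_nr++;
}

int main(void)
{
	static const char *const events[] =
		{ "wait", "send", "barrier", "bitmap", "sync" };
	unsigned int i;

	for (i = 0; i < 5; i++)
		toy_record(events[i]);

	for (i = 0; i < TOY_HIST; i++)
		printf("slot %u: nr=%u %s\n", i, toy_ring[i].nr,
		       toy_ring[i].nr ? toy_ring[i].what : "(end marker)");
	return 0;
}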
2023 
2024 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2025 {
2026 	if (test_bit(MD_SYNC, &todo))
2027 		do_md_sync(device);
2028 	if (test_bit(RS_DONE, &todo) ||
2029 	    test_bit(RS_PROGRESS, &todo))
2030 		update_on_disk_bitmap(first_peer_device(device), test_bit(RS_DONE, &todo));
2031 	if (test_bit(GO_DISKLESS, &todo))
2032 		go_diskless(device);
2033 	if (test_bit(DESTROY_DISK, &todo))
2034 		drbd_ldev_destroy(device);
2035 	if (test_bit(RS_START, &todo))
2036 		do_start_resync(device);
2037 }
2038 
2039 #define DRBD_DEVICE_WORK_MASK	\
2040 	((1UL << GO_DISKLESS)	\
2041 	|(1UL << DESTROY_DISK)	\
2042 	|(1UL << MD_SYNC)	\
2043 	|(1UL << RS_START)	\
2044 	|(1UL << RS_PROGRESS)	\
2045 	|(1UL << RS_DONE)	\
2046 	)
2047 
2048 static unsigned long get_work_bits(unsigned long *flags)
2049 {
2050 	unsigned long old, new;
2051 	do {
2052 		old = *flags;
2053 		new = old & ~DRBD_DEVICE_WORK_MASK;
2054 	} while (cmpxchg(flags, old, new) != old);
2055 	return old & DRBD_DEVICE_WORK_MASK;
2056 }
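
/*
 * C11 userspace sketch of the "grab and clear" in get_work_bits() above:
 * atomically strip the work bits out of the shared flags word and return
 * whichever of them were set, so each queued work bit is consumed exactly
 * once even while other bits are being set concurrently.  TOY_WORK_MASK
 * and the toy_* names are invented for this sketch.
 */
#include <stdatomic.h>
#include <stdio.h>

#define TOY_WORK_MASK 0x0fUL	/* pretend the low four bits are work bits */

static unsigned long toy_get_work_bits(_Atomic unsigned long *flags)
{
	unsigned long old = atomic_load(flags);
	unsigned long new;

	do {
		new = old & ~TOY_WORK_MASK;
		/* on failure, 'old' is refreshed with the current value */
	} while (!atomic_compare_exchange_weak(flags, &old, new));

	return old & TOY_WORK_MASK;
}

int main(void)
{
	_Atomic unsigned long flags = 0x35;	/* work bits 0x5, others 0x30 */
	unsigned long todo = toy_get_work_bits(&flags);

	printf("todo=%#lx remaining=%#lx\n", todo,
	       (unsigned long)atomic_load(&flags));
	return 0;
}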
2057 
2058 static void do_unqueued_work(struct drbd_connection *connection)
2059 {
2060 	struct drbd_peer_device *peer_device;
2061 	int vnr;
2062 
2063 	rcu_read_lock();
2064 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2065 		struct drbd_device *device = peer_device->device;
2066 		unsigned long todo = get_work_bits(&device->flags);
2067 		if (!todo)
2068 			continue;
2069 
2070 		kref_get(&device->kref);
2071 		rcu_read_unlock();
2072 		do_device_work(device, todo);
2073 		kref_put(&device->kref, drbd_destroy_device);
2074 		rcu_read_lock();
2075 	}
2076 	rcu_read_unlock();
2077 }
2078 
2079 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2080 {
2081 	spin_lock_irq(&queue->q_lock);
2082 	list_splice_tail_init(&queue->q, work_list);
2083 	spin_unlock_irq(&queue->q_lock);
2084 	return !list_empty(work_list);
2085 }
2086 
2087 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2088 {
2089 	DEFINE_WAIT(wait);
2090 	struct net_conf *nc;
2091 	int uncork, cork;
2092 
2093 	dequeue_work_batch(&connection->sender_work, work_list);
2094 	if (!list_empty(work_list))
2095 		return;
2096 
2097 	/* Still nothing to do?
2098 	 * Maybe we still need to close the current epoch,
2099 	 * even if no new requests are queued yet.
2100 	 *
2101 	 * Also, poke TCP, just in case.
2102 	 * Then wait for new work (or signal). */
2103 	rcu_read_lock();
2104 	nc = rcu_dereference(connection->net_conf);
2105 	uncork = nc ? nc->tcp_cork : 0;
2106 	rcu_read_unlock();
2107 	if (uncork) {
2108 		mutex_lock(&connection->data.mutex);
2109 		if (connection->data.socket)
2110 			tcp_sock_set_cork(connection->data.socket->sk, false);
2111 		mutex_unlock(&connection->data.mutex);
2112 	}
2113 
2114 	for (;;) {
2115 		int send_barrier;
2116 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2117 		spin_lock_irq(&connection->resource->req_lock);
2118 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2119 		if (!list_empty(&connection->sender_work.q))
2120 			list_splice_tail_init(&connection->sender_work.q, work_list);
2121 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2122 		if (!list_empty(work_list) || signal_pending(current)) {
2123 			spin_unlock_irq(&connection->resource->req_lock);
2124 			break;
2125 		}
2126 
2127 		/* We found nothing new to do, no to-be-communicated request,
2128 		 * no other work item.  We may still need to close the last
2129 		 * epoch.  Next incoming request epoch will be connection ->
2130 		 * current transfer log epoch number.  If that is different
2131 		 * from the epoch of the last request we communicated, it is
2132 		 * safe to send the epoch separating barrier now.
2133 		 */
2134 		send_barrier =
2135 			atomic_read(&connection->current_tle_nr) !=
2136 			connection->send.current_epoch_nr;
2137 		spin_unlock_irq(&connection->resource->req_lock);
2138 
2139 		if (send_barrier)
2140 			maybe_send_barrier(connection,
2141 					connection->send.current_epoch_nr + 1);
2142 
2143 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2144 			break;
2145 
2146 		/* drbd_send() may have called flush_signals() */
2147 		if (get_t_state(&connection->worker) != RUNNING)
2148 			break;
2149 
2150 		schedule();
2151 		/* may be woken up for things other than new work, too,
2152 		 * e.g. if the current epoch got closed.
2153 		 * In which case we send the barrier above. */
2154 	}
2155 	finish_wait(&connection->sender_work.q_wait, &wait);
2156 
2157 	/* someone may have changed the config while we have been waiting above. */
2158 	rcu_read_lock();
2159 	nc = rcu_dereference(connection->net_conf);
2160 	cork = nc ? nc->tcp_cork : 0;
2161 	rcu_read_unlock();
2162 	mutex_lock(&connection->data.mutex);
2163 	if (connection->data.socket) {
2164 		if (cork)
2165 			tcp_sock_set_cork(connection->data.socket->sk, true);
2166 		else if (!uncork)
2167 			tcp_sock_set_cork(connection->data.socket->sk, false);
2168 	}
2169 	mutex_unlock(&connection->data.mutex);
2170 }
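
/*
 * Userspace sketch of the cork/uncork dance in wait_for_work() above:
 * while TCP_CORK is set, small writes are batched into full segments, and
 * clearing it flushes whatever is still queued.  tcp_sock_set_cork() is
 * the in-kernel way to toggle the same behaviour; this sketch only shows
 * the option being set and cleared on an otherwise unused Linux socket,
 * and the toy_* names are invented here.
 */
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

static int toy_set_cork(int fd, int on)
{
	/* on = 1: hold back partial segments; on = 0: flush them */
	return setsockopt(fd, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
}

int main(void)
{
	int fd = socket(AF_INET, SOCK_STREAM, 0);

	if (fd < 0) {
		perror("socket");
		return 1;
	}
	/* a sender would cork, issue several small write()s, then uncork */
	printf("cork:   %d\n", toy_set_cork(fd, 1));
	printf("uncork: %d\n", toy_set_cork(fd, 0));
	close(fd);
	return 0;
}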
2171 
2172 int drbd_worker(struct drbd_thread *thi)
2173 {
2174 	struct drbd_connection *connection = thi->connection;
2175 	struct drbd_work *w = NULL;
2176 	struct drbd_peer_device *peer_device;
2177 	LIST_HEAD(work_list);
2178 	int vnr;
2179 
2180 	while (get_t_state(thi) == RUNNING) {
2181 		drbd_thread_current_set_cpu(thi);
2182 
2183 		if (list_empty(&work_list)) {
2184 			update_worker_timing_details(connection, wait_for_work);
2185 			wait_for_work(connection, &work_list);
2186 		}
2187 
2188 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2189 			update_worker_timing_details(connection, do_unqueued_work);
2190 			do_unqueued_work(connection);
2191 		}
2192 
2193 		if (signal_pending(current)) {
2194 			flush_signals(current);
2195 			if (get_t_state(thi) == RUNNING) {
2196 				drbd_warn(connection, "Worker got an unexpected signal\n");
2197 				continue;
2198 			}
2199 			break;
2200 		}
2201 
2202 		if (get_t_state(thi) != RUNNING)
2203 			break;
2204 
2205 		if (!list_empty(&work_list)) {
2206 			w = list_first_entry(&work_list, struct drbd_work, list);
2207 			list_del_init(&w->list);
2208 			update_worker_timing_details(connection, w->cb);
2209 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2210 				continue;
2211 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2212 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2213 		}
2214 	}
2215 
2216 	do {
2217 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2218 			update_worker_timing_details(connection, do_unqueued_work);
2219 			do_unqueued_work(connection);
2220 		}
2221 		if (!list_empty(&work_list)) {
2222 			w = list_first_entry(&work_list, struct drbd_work, list);
2223 			list_del_init(&w->list);
2224 			update_worker_timing_details(connection, w->cb);
2225 			w->cb(w, 1);
2226 		} else
2227 			dequeue_work_batch(&connection->sender_work, &work_list);
2228 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2229 
2230 	rcu_read_lock();
2231 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2232 		struct drbd_device *device = peer_device->device;
2233 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2234 		kref_get(&device->kref);
2235 		rcu_read_unlock();
2236 		drbd_device_cleanup(device);
2237 		kref_put(&device->kref, drbd_destroy_device);
2238 		rcu_read_lock();
2239 	}
2240 	rcu_read_unlock();
2241 
2242 	return 0;
2243 }
2244