1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3    drbd_worker.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11 
12 */
13 
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26 
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30 
/* Forward declarations: both request generators are defined further down,
 * but w_resync_timer() already calls them. */
static int make_ov_request(struct drbd_peer_device *peer_device, int cancel);
static int make_resync_request(struct drbd_peer_device *peer_device, int cancel);
33 
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46 
47 /* used for synchronous meta data and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
drbd_md_endio(struct bio * bio)50 void drbd_md_endio(struct bio *bio)
51 {
52 	struct drbd_device *device;
53 
54 	device = bio->bi_private;
55 	device->md_io.error = blk_status_to_errno(bio->bi_status);
56 
57 	/* special case: drbd_md_read() during drbd_adm_attach() */
58 	if (device->ldev)
59 		put_ldev(device);
60 	bio_put(bio);
61 
62 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63 	 * to timeout on the lower level device, and eventually detach from it.
64 	 * If this io completion runs after that timeout expired, this
65 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
66 	 * During normal operation, this only puts that extra reference
67 	 * down to 1 again.
68 	 * Make sure we first drop the reference, and only then signal
69 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70 	 * next drbd_md_sync_page_io(), that we trigger the
71 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72 	 */
73 	drbd_md_put_buffer(device);
74 	device->md_io.done = 1;
75 	wake_up(&device->misc_wait);
76 }
77 
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
drbd_endio_read_sec_final(struct drbd_peer_request * peer_req)81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83 	unsigned long flags = 0;
84 	struct drbd_peer_device *peer_device = peer_req->peer_device;
85 	struct drbd_device *device = peer_device->device;
86 
87 	spin_lock_irqsave(&device->resource->req_lock, flags);
88 	device->read_cnt += peer_req->i.size >> 9;
89 	list_del(&peer_req->w.list);
90 	if (list_empty(&device->read_ee))
91 		wake_up(&device->ee_wait);
92 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
94 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
95 
96 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97 	put_ldev(device);
98 }
99 
100 /* writes on behalf of the partner, or resync writes,
101  * "submitted" by the receiver, final stage.  */
drbd_endio_write_sec_final(struct drbd_peer_request * peer_req)102 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103 {
104 	unsigned long flags = 0;
105 	struct drbd_peer_device *peer_device = peer_req->peer_device;
106 	struct drbd_device *device = peer_device->device;
107 	struct drbd_connection *connection = peer_device->connection;
108 	struct drbd_interval i;
109 	int do_wake;
110 	u64 block_id;
111 	int do_al_complete_io;
112 
113 	/* after we moved peer_req to done_ee,
114 	 * we may no longer access it,
115 	 * it may be freed/reused already!
116 	 * (as soon as we release the req_lock) */
117 	i = peer_req->i;
118 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119 	block_id = peer_req->block_id;
120 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121 
122 	if (peer_req->flags & EE_WAS_ERROR) {
123 		/* In protocol != C, we usually do not send write acks.
124 		 * In case of a write error, send the neg ack anyways. */
125 		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126 			inc_unacked(device);
127 		drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
128 	}
129 
130 	spin_lock_irqsave(&device->resource->req_lock, flags);
131 	device->writ_cnt += peer_req->i.size >> 9;
132 	list_move_tail(&peer_req->w.list, &device->done_ee);
133 
134 	/*
135 	 * Do not remove from the write_requests tree here: we did not send the
136 	 * Ack yet and did not wake possibly waiting conflicting requests.
137 	 * Removed from the tree from "drbd_process_done_ee" within the
138 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
139 	 * _drbd_clear_done_ee.
140 	 */
141 
142 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143 
144 	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
145 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146 	if (peer_req->flags & EE_WAS_ERROR)
147 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148 
149 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
150 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152 			kref_put(&device->kref, drbd_destroy_device);
153 	}
154 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
155 
156 	if (block_id == ID_SYNCER)
157 		drbd_rs_complete_io(device, i.sector);
158 
159 	if (do_wake)
160 		wake_up(&device->ee_wait);
161 
162 	if (do_al_complete_io)
163 		drbd_al_complete_io(device, &i);
164 
165 	put_ldev(device);
166 }
167 
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
drbd_peer_request_endio(struct bio * bio)171 void drbd_peer_request_endio(struct bio *bio)
172 {
173 	struct drbd_peer_request *peer_req = bio->bi_private;
174 	struct drbd_device *device = peer_req->peer_device->device;
175 	bool is_write = bio_data_dir(bio) == WRITE;
176 	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177 			  bio_op(bio) == REQ_OP_DISCARD;
178 
179 	if (bio->bi_status && drbd_ratelimit())
180 		drbd_warn(device, "%s: error=%d s=%llus\n",
181 				is_write ? (is_discard ? "discard" : "write")
182 					: "read", bio->bi_status,
183 				(unsigned long long)peer_req->i.sector);
184 
185 	if (bio->bi_status)
186 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
187 
188 	bio_put(bio); /* no need for the bio anymore */
189 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
190 		if (is_write)
191 			drbd_endio_write_sec_final(peer_req);
192 		else
193 			drbd_endio_read_sec_final(peer_req);
194 	}
195 }
196 
197 static void
drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device * device)198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201 		device->minor, device->resource->name, device->vnr);
202 }
203 
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
drbd_request_endio(struct bio * bio)206 void drbd_request_endio(struct bio *bio)
207 {
208 	unsigned long flags;
209 	struct drbd_request *req = bio->bi_private;
210 	struct drbd_device *device = req->device;
211 	struct bio_and_error m;
212 	enum drbd_req_event what;
213 
214 	/* If this request was aborted locally before,
215 	 * but now was completed "successfully",
216 	 * chances are that this caused arbitrary data corruption.
217 	 *
218 	 * "aborting" requests, or force-detaching the disk, is intended for
219 	 * completely blocked/hung local backing devices which do no longer
220 	 * complete requests at all, not even do error completions.  In this
221 	 * situation, usually a hard-reset and failover is the only way out.
222 	 *
223 	 * By "aborting", basically faking a local error-completion,
224 	 * we allow for a more graceful swichover by cleanly migrating services.
225 	 * Still the affected node has to be rebooted "soon".
226 	 *
227 	 * By completing these requests, we allow the upper layers to re-use
228 	 * the associated data pages.
229 	 *
230 	 * If later the local backing device "recovers", and now DMAs some data
231 	 * from disk into the original request pages, in the best case it will
232 	 * just put random data into unused pages; but typically it will corrupt
233 	 * meanwhile completely unrelated data, causing all sorts of damage.
234 	 *
235 	 * Which means delayed successful completion,
236 	 * especially for READ requests,
237 	 * is a reason to panic().
238 	 *
239 	 * We assume that a delayed *error* completion is OK,
240 	 * though we still will complain noisily about it.
241 	 */
242 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243 		if (drbd_ratelimit())
244 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245 
246 		if (!bio->bi_status)
247 			drbd_panic_after_delayed_completion_of_aborted_request(device);
248 	}
249 
250 	/* to avoid recursion in __req_mod */
251 	if (unlikely(bio->bi_status)) {
252 		switch (bio_op(bio)) {
253 		case REQ_OP_WRITE_ZEROES:
254 		case REQ_OP_DISCARD:
255 			if (bio->bi_status == BLK_STS_NOTSUPP)
256 				what = DISCARD_COMPLETED_NOTSUPP;
257 			else
258 				what = DISCARD_COMPLETED_WITH_ERROR;
259 			break;
260 		case REQ_OP_READ:
261 			if (bio->bi_opf & REQ_RAHEAD)
262 				what = READ_AHEAD_COMPLETED_WITH_ERROR;
263 			else
264 				what = READ_COMPLETED_WITH_ERROR;
265 			break;
266 		default:
267 			what = WRITE_COMPLETED_WITH_ERROR;
268 			break;
269 		}
270 	} else {
271 		what = COMPLETED_OK;
272 	}
273 
274 	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275 	bio_put(bio);
276 
277 	/* not req_mod(), we need irqsave here! */
278 	spin_lock_irqsave(&device->resource->req_lock, flags);
279 	__req_mod(req, what, NULL, &m);
280 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
281 	put_ldev(device);
282 
283 	if (m.bio)
284 		complete_master_bio(device, &m);
285 }
286 
drbd_csum_ee(struct crypto_shash * tfm,struct drbd_peer_request * peer_req,void * digest)287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289 	SHASH_DESC_ON_STACK(desc, tfm);
290 	struct page *page = peer_req->pages;
291 	struct page *tmp;
292 	unsigned len;
293 	void *src;
294 
295 	desc->tfm = tfm;
296 
297 	crypto_shash_init(desc);
298 
299 	src = kmap_atomic(page);
300 	while ((tmp = page_chain_next(page))) {
301 		/* all but the last page will be fully used */
302 		crypto_shash_update(desc, src, PAGE_SIZE);
303 		kunmap_atomic(src);
304 		page = tmp;
305 		src = kmap_atomic(page);
306 	}
307 	/* and now the last, possibly only partially used page */
308 	len = peer_req->i.size & (PAGE_SIZE - 1);
309 	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310 	kunmap_atomic(src);
311 
312 	crypto_shash_final(desc, digest);
313 	shash_desc_zero(desc);
314 }
315 
drbd_csum_bio(struct crypto_shash * tfm,struct bio * bio,void * digest)316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318 	SHASH_DESC_ON_STACK(desc, tfm);
319 	struct bio_vec bvec;
320 	struct bvec_iter iter;
321 
322 	desc->tfm = tfm;
323 
324 	crypto_shash_init(desc);
325 
326 	bio_for_each_segment(bvec, bio, iter) {
327 		u8 *src;
328 
329 		src = bvec_kmap_local(&bvec);
330 		crypto_shash_update(desc, src, bvec.bv_len);
331 		kunmap_local(src);
332 	}
333 	crypto_shash_final(desc, digest);
334 	shash_desc_zero(desc);
335 }
336 
337 /* MAYBE merge common code with w_e_end_ov_req */
w_e_send_csum(struct drbd_work * w,int cancel)338 static int w_e_send_csum(struct drbd_work *w, int cancel)
339 {
340 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
341 	struct drbd_peer_device *peer_device = peer_req->peer_device;
342 	struct drbd_device *device = peer_device->device;
343 	int digest_size;
344 	void *digest;
345 	int err = 0;
346 
347 	if (unlikely(cancel))
348 		goto out;
349 
350 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
351 		goto out;
352 
353 	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
354 	digest = kmalloc(digest_size, GFP_NOIO);
355 	if (digest) {
356 		sector_t sector = peer_req->i.sector;
357 		unsigned int size = peer_req->i.size;
358 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
359 		/* Free peer_req and pages before send.
360 		 * In case we block on congestion, we could otherwise run into
361 		 * some distributed deadlock, if the other side blocks on
362 		 * congestion as well, because our receiver blocks in
363 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
364 		drbd_free_peer_req(device, peer_req);
365 		peer_req = NULL;
366 		inc_rs_pending(peer_device);
367 		err = drbd_send_drequest_csum(peer_device, sector, size,
368 					      digest, digest_size,
369 					      P_CSUM_RS_REQUEST);
370 		kfree(digest);
371 	} else {
372 		drbd_err(device, "kmalloc() of digest failed.\n");
373 		err = -ENOMEM;
374 	}
375 
376 out:
377 	if (peer_req)
378 		drbd_free_peer_req(device, peer_req);
379 
380 	if (unlikely(err))
381 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
382 	return err;
383 }
384 
385 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
386 
read_for_csum(struct drbd_peer_device * peer_device,sector_t sector,int size)387 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
388 {
389 	struct drbd_device *device = peer_device->device;
390 	struct drbd_peer_request *peer_req;
391 
392 	if (!get_ldev(device))
393 		return -EIO;
394 
395 	/* GFP_TRY, because if there is no memory available right now, this may
396 	 * be rescheduled for later. It is "only" background resync, after all. */
397 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398 				       size, size, GFP_TRY);
399 	if (!peer_req)
400 		goto defer;
401 
402 	peer_req->w.cb = w_e_send_csum;
403 	peer_req->opf = REQ_OP_READ;
404 	spin_lock_irq(&device->resource->req_lock);
405 	list_add_tail(&peer_req->w.list, &device->read_ee);
406 	spin_unlock_irq(&device->resource->req_lock);
407 
408 	atomic_add(size >> 9, &device->rs_sect_ev);
409 	if (drbd_submit_peer_request(peer_req) == 0)
410 		return 0;
411 
412 	/* If it failed because of ENOMEM, retry should help.  If it failed
413 	 * because bio_add_page failed (probably broken lower level driver),
414 	 * retry may or may not help.
415 	 * If it does not, you may need to force disconnect. */
416 	spin_lock_irq(&device->resource->req_lock);
417 	list_del(&peer_req->w.list);
418 	spin_unlock_irq(&device->resource->req_lock);
419 
420 	drbd_free_peer_req(device, peer_req);
421 defer:
422 	put_ldev(device);
423 	return -EAGAIN;
424 }
425 
w_resync_timer(struct drbd_work * w,int cancel)426 int w_resync_timer(struct drbd_work *w, int cancel)
427 {
428 	struct drbd_device *device =
429 		container_of(w, struct drbd_device, resync_work);
430 
431 	switch (device->state.conn) {
432 	case C_VERIFY_S:
433 		make_ov_request(first_peer_device(device), cancel);
434 		break;
435 	case C_SYNC_TARGET:
436 		make_resync_request(first_peer_device(device), cancel);
437 		break;
438 	}
439 
440 	return 0;
441 }
442 
resync_timer_fn(struct timer_list * t)443 void resync_timer_fn(struct timer_list *t)
444 {
445 	struct drbd_device *device = timer_container_of(device, t,
446 							resync_timer);
447 
448 	drbd_queue_work_if_unqueued(
449 		&first_peer_device(device)->connection->sender_work,
450 		&device->resync_work);
451 }
452 
fifo_set(struct fifo_buffer * fb,int value)453 static void fifo_set(struct fifo_buffer *fb, int value)
454 {
455 	int i;
456 
457 	for (i = 0; i < fb->size; i++)
458 		fb->values[i] = value;
459 }
460 
fifo_push(struct fifo_buffer * fb,int value)461 static int fifo_push(struct fifo_buffer *fb, int value)
462 {
463 	int ov;
464 
465 	ov = fb->values[fb->head_index];
466 	fb->values[fb->head_index++] = value;
467 
468 	if (fb->head_index >= fb->size)
469 		fb->head_index = 0;
470 
471 	return ov;
472 }
473 
fifo_add_val(struct fifo_buffer * fb,int value)474 static void fifo_add_val(struct fifo_buffer *fb, int value)
475 {
476 	int i;
477 
478 	for (i = 0; i < fb->size; i++)
479 		fb->values[i] += value;
480 }
481 
fifo_alloc(unsigned int fifo_size)482 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
483 {
484 	struct fifo_buffer *fb;
485 
486 	fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
487 	if (!fb)
488 		return NULL;
489 
490 	fb->head_index = 0;
491 	fb->size = fifo_size;
492 	fb->total = 0;
493 
494 	return fb;
495 }
496 
drbd_rs_controller(struct drbd_peer_device * peer_device,unsigned int sect_in)497 static int drbd_rs_controller(struct drbd_peer_device *peer_device, unsigned int sect_in)
498 {
499 	struct drbd_device *device = peer_device->device;
500 	struct disk_conf *dc;
501 	unsigned int want;     /* The number of sectors we want in-flight */
502 	int req_sect; /* Number of sectors to request in this turn */
503 	int correction; /* Number of sectors more we need in-flight */
504 	int cps; /* correction per invocation of drbd_rs_controller() */
505 	int steps; /* Number of time steps to plan ahead */
506 	int curr_corr;
507 	int max_sect;
508 	struct fifo_buffer *plan;
509 
510 	dc = rcu_dereference(device->ldev->disk_conf);
511 	plan = rcu_dereference(device->rs_plan_s);
512 
513 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
514 
515 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
516 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
517 	} else { /* normal path */
518 		want = dc->c_fill_target ? dc->c_fill_target :
519 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
520 	}
521 
522 	correction = want - device->rs_in_flight - plan->total;
523 
524 	/* Plan ahead */
525 	cps = correction / steps;
526 	fifo_add_val(plan, cps);
527 	plan->total += cps * steps;
528 
529 	/* What we do in this step */
530 	curr_corr = fifo_push(plan, 0);
531 	plan->total -= curr_corr;
532 
533 	req_sect = sect_in + curr_corr;
534 	if (req_sect < 0)
535 		req_sect = 0;
536 
537 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
538 	if (req_sect > max_sect)
539 		req_sect = max_sect;
540 
541 	/*
542 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
543 		 sect_in, device->rs_in_flight, want, correction,
544 		 steps, cps, device->rs_planed, curr_corr, req_sect);
545 	*/
546 
547 	return req_sect;
548 }
549 
drbd_rs_number_requests(struct drbd_peer_device * peer_device)550 static int drbd_rs_number_requests(struct drbd_peer_device *peer_device)
551 {
552 	struct drbd_device *device = peer_device->device;
553 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
554 	int number, mxb;
555 
556 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
557 	device->rs_in_flight -= sect_in;
558 
559 	rcu_read_lock();
560 	mxb = drbd_get_max_buffers(device) / 2;
561 	if (rcu_dereference(device->rs_plan_s)->size) {
562 		number = drbd_rs_controller(peer_device, sect_in) >> (BM_BLOCK_SHIFT - 9);
563 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
564 	} else {
565 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
566 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
567 	}
568 	rcu_read_unlock();
569 
570 	/* Don't have more than "max-buffers"/2 in-flight.
571 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
572 	 * potentially causing a distributed deadlock on congestion during
573 	 * online-verify or (checksum-based) resync, if max-buffers,
574 	 * socket buffer sizes and resync rate settings are mis-configured. */
575 
576 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
577 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
578 	 * "number of pages" (typically also 4k),
579 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
580 	if (mxb - device->rs_in_flight/8 < number)
581 		number = mxb - device->rs_in_flight/8;
582 
583 	return number;
584 }
585 
make_resync_request(struct drbd_peer_device * const peer_device,int cancel)586 static int make_resync_request(struct drbd_peer_device *const peer_device, int cancel)
587 {
588 	struct drbd_device *const device = peer_device->device;
589 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
590 	unsigned long bit;
591 	sector_t sector;
592 	const sector_t capacity = get_capacity(device->vdisk);
593 	int max_bio_size;
594 	int number, rollback_i, size;
595 	int align, requeue = 0;
596 	int i = 0;
597 	int discard_granularity = 0;
598 
599 	if (unlikely(cancel))
600 		return 0;
601 
602 	if (device->rs_total == 0) {
603 		/* empty resync? */
604 		drbd_resync_finished(peer_device);
605 		return 0;
606 	}
607 
608 	if (!get_ldev(device)) {
609 		/* Since we only need to access device->rsync a
610 		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
611 		   to continue resync with a broken disk makes no sense at
612 		   all */
613 		drbd_err(device, "Disk broke down during resync!\n");
614 		return 0;
615 	}
616 
617 	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
618 		rcu_read_lock();
619 		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
620 		rcu_read_unlock();
621 	}
622 
623 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
624 	number = drbd_rs_number_requests(peer_device);
625 	if (number <= 0)
626 		goto requeue;
627 
628 	for (i = 0; i < number; i++) {
629 		/* Stop generating RS requests when half of the send buffer is filled,
630 		 * but notify TCP that we'd like to have more space. */
631 		mutex_lock(&connection->data.mutex);
632 		if (connection->data.socket) {
633 			struct sock *sk = connection->data.socket->sk;
634 			int queued = sk->sk_wmem_queued;
635 			int sndbuf = sk->sk_sndbuf;
636 			if (queued > sndbuf / 2) {
637 				requeue = 1;
638 				if (sk->sk_socket)
639 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
640 			}
641 		} else
642 			requeue = 1;
643 		mutex_unlock(&connection->data.mutex);
644 		if (requeue)
645 			goto requeue;
646 
647 next_sector:
648 		size = BM_BLOCK_SIZE;
649 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
650 
651 		if (bit == DRBD_END_OF_BITMAP) {
652 			device->bm_resync_fo = drbd_bm_bits(device);
653 			put_ldev(device);
654 			return 0;
655 		}
656 
657 		sector = BM_BIT_TO_SECT(bit);
658 
659 		if (drbd_try_rs_begin_io(peer_device, sector)) {
660 			device->bm_resync_fo = bit;
661 			goto requeue;
662 		}
663 		device->bm_resync_fo = bit + 1;
664 
665 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
666 			drbd_rs_complete_io(device, sector);
667 			goto next_sector;
668 		}
669 
670 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
671 		/* try to find some adjacent bits.
672 		 * we stop if we have already the maximum req size.
673 		 *
674 		 * Additionally always align bigger requests, in order to
675 		 * be prepared for all stripe sizes of software RAIDs.
676 		 */
677 		align = 1;
678 		rollback_i = i;
679 		while (i < number) {
680 			if (size + BM_BLOCK_SIZE > max_bio_size)
681 				break;
682 
683 			/* Be always aligned */
684 			if (sector & ((1<<(align+3))-1))
685 				break;
686 
687 			if (discard_granularity && size == discard_granularity)
688 				break;
689 
690 			/* do not cross extent boundaries */
691 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
692 				break;
693 			/* now, is it actually dirty, after all?
694 			 * caution, drbd_bm_test_bit is tri-state for some
695 			 * obscure reason; ( b == 0 ) would get the out-of-band
696 			 * only accidentally right because of the "oddly sized"
697 			 * adjustment below */
698 			if (drbd_bm_test_bit(device, bit+1) != 1)
699 				break;
700 			bit++;
701 			size += BM_BLOCK_SIZE;
702 			if ((BM_BLOCK_SIZE << align) <= size)
703 				align++;
704 			i++;
705 		}
706 		/* if we merged some,
707 		 * reset the offset to start the next drbd_bm_find_next from */
708 		if (size > BM_BLOCK_SIZE)
709 			device->bm_resync_fo = bit + 1;
710 #endif
711 
712 		/* adjust very last sectors, in case we are oddly sized */
713 		if (sector + (size>>9) > capacity)
714 			size = (capacity-sector)<<9;
715 
716 		if (device->use_csums) {
717 			switch (read_for_csum(peer_device, sector, size)) {
718 			case -EIO: /* Disk failure */
719 				put_ldev(device);
720 				return -EIO;
721 			case -EAGAIN: /* allocation failed, or ldev busy */
722 				drbd_rs_complete_io(device, sector);
723 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
724 				i = rollback_i;
725 				goto requeue;
726 			case 0:
727 				/* everything ok */
728 				break;
729 			default:
730 				BUG();
731 			}
732 		} else {
733 			int err;
734 
735 			inc_rs_pending(peer_device);
736 			err = drbd_send_drequest(peer_device,
737 						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
738 						 sector, size, ID_SYNCER);
739 			if (err) {
740 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
741 				dec_rs_pending(peer_device);
742 				put_ldev(device);
743 				return err;
744 			}
745 		}
746 	}
747 
748 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
749 		/* last syncer _request_ was sent,
750 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
751 		 * next sync group will resume), as soon as we receive the last
752 		 * resync data block, and the last bit is cleared.
753 		 * until then resync "work" is "inactive" ...
754 		 */
755 		put_ldev(device);
756 		return 0;
757 	}
758 
759  requeue:
760 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
761 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
762 	put_ldev(device);
763 	return 0;
764 }
765 
make_ov_request(struct drbd_peer_device * peer_device,int cancel)766 static int make_ov_request(struct drbd_peer_device *peer_device, int cancel)
767 {
768 	struct drbd_device *device = peer_device->device;
769 	int number, i, size;
770 	sector_t sector;
771 	const sector_t capacity = get_capacity(device->vdisk);
772 	bool stop_sector_reached = false;
773 
774 	if (unlikely(cancel))
775 		return 1;
776 
777 	number = drbd_rs_number_requests(peer_device);
778 
779 	sector = device->ov_position;
780 	for (i = 0; i < number; i++) {
781 		if (sector >= capacity)
782 			return 1;
783 
784 		/* We check for "finished" only in the reply path:
785 		 * w_e_end_ov_reply().
786 		 * We need to send at least one request out. */
787 		stop_sector_reached = i > 0
788 			&& verify_can_do_stop_sector(device)
789 			&& sector >= device->ov_stop_sector;
790 		if (stop_sector_reached)
791 			break;
792 
793 		size = BM_BLOCK_SIZE;
794 
795 		if (drbd_try_rs_begin_io(peer_device, sector)) {
796 			device->ov_position = sector;
797 			goto requeue;
798 		}
799 
800 		if (sector + (size>>9) > capacity)
801 			size = (capacity-sector)<<9;
802 
803 		inc_rs_pending(peer_device);
804 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
805 			dec_rs_pending(peer_device);
806 			return 0;
807 		}
808 		sector += BM_SECT_PER_BIT;
809 	}
810 	device->ov_position = sector;
811 
812  requeue:
813 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
814 	if (i == 0 || !stop_sector_reached)
815 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
816 	return 1;
817 }
818 
w_ov_finished(struct drbd_work * w,int cancel)819 int w_ov_finished(struct drbd_work *w, int cancel)
820 {
821 	struct drbd_device_work *dw =
822 		container_of(w, struct drbd_device_work, w);
823 	struct drbd_device *device = dw->device;
824 	kfree(dw);
825 	ov_out_of_sync_print(first_peer_device(device));
826 	drbd_resync_finished(first_peer_device(device));
827 
828 	return 0;
829 }
830 
w_resync_finished(struct drbd_work * w,int cancel)831 static int w_resync_finished(struct drbd_work *w, int cancel)
832 {
833 	struct drbd_device_work *dw =
834 		container_of(w, struct drbd_device_work, w);
835 	struct drbd_device *device = dw->device;
836 	kfree(dw);
837 
838 	drbd_resync_finished(first_peer_device(device));
839 
840 	return 0;
841 }
842 
ping_peer(struct drbd_device * device)843 static void ping_peer(struct drbd_device *device)
844 {
845 	struct drbd_connection *connection = first_peer_device(device)->connection;
846 
847 	clear_bit(GOT_PING_ACK, &connection->flags);
848 	request_ping(connection);
849 	wait_event(connection->ping_wait,
850 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
851 }
852 
drbd_resync_finished(struct drbd_peer_device * peer_device)853 int drbd_resync_finished(struct drbd_peer_device *peer_device)
854 {
855 	struct drbd_device *device = peer_device->device;
856 	struct drbd_connection *connection = peer_device->connection;
857 	unsigned long db, dt, dbdt;
858 	unsigned long n_oos;
859 	union drbd_state os, ns;
860 	struct drbd_device_work *dw;
861 	char *khelper_cmd = NULL;
862 	int verify_done = 0;
863 
864 	/* Remove all elements from the resync LRU. Since future actions
865 	 * might set bits in the (main) bitmap, then the entries in the
866 	 * resync LRU would be wrong. */
867 	if (drbd_rs_del_all(device)) {
868 		/* In case this is not possible now, most probably because
869 		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
870 		 * queue (or even the read operations for those packets
871 		 * is not finished by now).   Retry in 100ms. */
872 
873 		schedule_timeout_interruptible(HZ / 10);
874 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
875 		if (dw) {
876 			dw->w.cb = w_resync_finished;
877 			dw->device = device;
878 			drbd_queue_work(&connection->sender_work, &dw->w);
879 			return 1;
880 		}
881 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
882 	}
883 
884 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
885 	if (dt <= 0)
886 		dt = 1;
887 
888 	db = device->rs_total;
889 	/* adjust for verify start and stop sectors, respective reached position */
890 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
891 		db -= device->ov_left;
892 
893 	dbdt = Bit2KB(db/dt);
894 	device->rs_paused /= HZ;
895 
896 	if (!get_ldev(device))
897 		goto out;
898 
899 	ping_peer(device);
900 
901 	spin_lock_irq(&device->resource->req_lock);
902 	os = drbd_read_state(device);
903 
904 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
905 
906 	/* This protects us against multiple calls (that can happen in the presence
907 	   of application IO), and against connectivity loss just before we arrive here. */
908 	if (os.conn <= C_CONNECTED)
909 		goto out_unlock;
910 
911 	ns = os;
912 	ns.conn = C_CONNECTED;
913 
914 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
915 	     verify_done ? "Online verify" : "Resync",
916 	     dt + device->rs_paused, device->rs_paused, dbdt);
917 
918 	n_oos = drbd_bm_total_weight(device);
919 
920 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
921 		if (n_oos) {
922 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
923 			      n_oos, Bit2KB(1));
924 			khelper_cmd = "out-of-sync";
925 		}
926 	} else {
927 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
928 
929 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
930 			khelper_cmd = "after-resync-target";
931 
932 		if (device->use_csums && device->rs_total) {
933 			const unsigned long s = device->rs_same_csum;
934 			const unsigned long t = device->rs_total;
935 			const int ratio =
936 				(t == 0)     ? 0 :
937 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
938 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
939 			     "transferred %luK total %luK\n",
940 			     ratio,
941 			     Bit2KB(device->rs_same_csum),
942 			     Bit2KB(device->rs_total - device->rs_same_csum),
943 			     Bit2KB(device->rs_total));
944 		}
945 	}
946 
947 	if (device->rs_failed) {
948 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
949 
950 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
951 			ns.disk = D_INCONSISTENT;
952 			ns.pdsk = D_UP_TO_DATE;
953 		} else {
954 			ns.disk = D_UP_TO_DATE;
955 			ns.pdsk = D_INCONSISTENT;
956 		}
957 	} else {
958 		ns.disk = D_UP_TO_DATE;
959 		ns.pdsk = D_UP_TO_DATE;
960 
961 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962 			if (device->p_uuid) {
963 				int i;
964 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
965 					_drbd_uuid_set(device, i, device->p_uuid[i]);
966 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
967 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
968 			} else {
969 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
970 			}
971 		}
972 
973 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
974 			/* for verify runs, we don't update uuids here,
975 			 * so there would be nothing to report. */
976 			drbd_uuid_set_bm(device, 0UL);
977 			drbd_print_uuids(device, "updated UUIDs");
978 			if (device->p_uuid) {
979 				/* Now the two UUID sets are equal, update what we
980 				 * know of the peer. */
981 				int i;
982 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
983 					device->p_uuid[i] = device->ldev->md.uuid[i];
984 			}
985 		}
986 	}
987 
988 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
989 out_unlock:
990 	spin_unlock_irq(&device->resource->req_lock);
991 
992 	/* If we have been sync source, and have an effective fencing-policy,
993 	 * once *all* volumes are back in sync, call "unfence". */
994 	if (os.conn == C_SYNC_SOURCE) {
995 		enum drbd_disk_state disk_state = D_MASK;
996 		enum drbd_disk_state pdsk_state = D_MASK;
997 		enum drbd_fencing_p fp = FP_DONT_CARE;
998 
999 		rcu_read_lock();
1000 		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001 		if (fp != FP_DONT_CARE) {
1002 			struct drbd_peer_device *peer_device;
1003 			int vnr;
1004 			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005 				struct drbd_device *device = peer_device->device;
1006 				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007 				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008 			}
1009 		}
1010 		rcu_read_unlock();
1011 		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012 			conn_khelper(connection, "unfence-peer");
1013 	}
1014 
1015 	put_ldev(device);
1016 out:
1017 	device->rs_total  = 0;
1018 	device->rs_failed = 0;
1019 	device->rs_paused = 0;
1020 
1021 	/* reset start sector, if we reached end of device */
1022 	if (verify_done && device->ov_left == 0)
1023 		device->ov_start_sector = 0;
1024 
1025 	drbd_md_sync(device);
1026 
1027 	if (khelper_cmd)
1028 		drbd_khelper(device, khelper_cmd);
1029 
1030 	return 1;
1031 }
1032 
1033 /**
1034  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1035  * @w:		work object.
1036  * @cancel:	The connection will be closed anyways
1037  */
w_e_end_data_req(struct drbd_work * w,int cancel)1038 int w_e_end_data_req(struct drbd_work *w, int cancel)
1039 {
1040 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1041 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1042 	struct drbd_device *device = peer_device->device;
1043 	int err;
1044 
1045 	if (unlikely(cancel)) {
1046 		err = 0;
1047 		goto out;
1048 	}
1049 
1050 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1051 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1052 	} else {
1053 		if (drbd_ratelimit())
1054 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1055 			    (unsigned long long)peer_req->i.sector);
1056 
1057 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1058 	}
1059 
1060 	if (unlikely(err))
1061 		drbd_err(device, "drbd_send_block() failed\n");
1062 out:
1063 	dec_unacked(device);
1064 	drbd_free_peer_req(device, peer_req);
1065 
1066 	return err;
1067 }
1068 
all_zero(struct drbd_peer_request * peer_req)1069 static bool all_zero(struct drbd_peer_request *peer_req)
1070 {
1071 	struct page *page = peer_req->pages;
1072 	unsigned int len = peer_req->i.size;
1073 
1074 	page_chain_for_each(page) {
1075 		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1076 		unsigned int i, words = l / sizeof(long);
1077 		unsigned long *d;
1078 
1079 		d = kmap_atomic(page);
1080 		for (i = 0; i < words; i++) {
1081 			if (d[i]) {
1082 				kunmap_atomic(d);
1083 				return false;
1084 			}
1085 		}
1086 		kunmap_atomic(d);
1087 		len -= l;
1088 	}
1089 
1090 	return true;
1091 }
1092 
1093 /**
1094  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1095  * @w:		work object.
1096  * @cancel:	The connection will be closed anyways
1097  */
w_e_end_rsdata_req(struct drbd_work * w,int cancel)1098 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1099 {
1100 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1101 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1102 	struct drbd_device *device = peer_device->device;
1103 	int err;
1104 
1105 	if (unlikely(cancel)) {
1106 		err = 0;
1107 		goto out;
1108 	}
1109 
1110 	if (get_ldev_if_state(device, D_FAILED)) {
1111 		drbd_rs_complete_io(device, peer_req->i.sector);
1112 		put_ldev(device);
1113 	}
1114 
1115 	if (device->state.conn == C_AHEAD) {
1116 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1117 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1118 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1119 			inc_rs_pending(peer_device);
1120 			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1121 				err = drbd_send_rs_deallocated(peer_device, peer_req);
1122 			else
1123 				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1124 		} else {
1125 			if (drbd_ratelimit())
1126 				drbd_err(device, "Not sending RSDataReply, "
1127 				    "partner DISKLESS!\n");
1128 			err = 0;
1129 		}
1130 	} else {
1131 		if (drbd_ratelimit())
1132 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1133 			    (unsigned long long)peer_req->i.sector);
1134 
1135 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1136 
1137 		/* update resync data with failure */
1138 		drbd_rs_failed_io(peer_device, peer_req->i.sector, peer_req->i.size);
1139 	}
1140 	if (unlikely(err))
1141 		drbd_err(device, "drbd_send_block() failed\n");
1142 out:
1143 	dec_unacked(device);
1144 	drbd_free_peer_req(device, peer_req);
1145 
1146 	return err;
1147 }
1148 
/**
 * w_e_end_csum_rs_req() - Worker callback that answers a checksum based resync request
 * @w:		work object embedded in the peer request.
 * @cancel:	non-zero when the connection is going away; nothing is sent then.
 *
 * Computes a checksum over the locally read block with the connection's
 * csums_tfm and compares it with the digest attached to the peer request.
 * The reply is one of:
 *  - P_RS_IS_IN_SYNC, when both digests are equal (the range is also marked
 *    in sync locally and counted in rs_same_csum);
 *  - P_RS_DATA_REPLY carrying the block, when they differ or when no local
 *    digest could be computed;
 *  - P_NEG_RS_DREPLY, when the peer request carries EE_WAS_ERROR.
 *
 * On every path the unacked count is dropped and the peer request is freed.
 *
 * Return: 0 when cancelled, otherwise the result of the send call.
 */
int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int err, eq = 0;

	if (unlikely(cancel)) {
		err = 0;
		goto out;
	}

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	/* remember the digest pointer now; it is invalidated further down
	 * when block_id gets assigned */
	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (peer_device->connection->csums_tfm) {
			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
			D_ASSERT(device, digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		/* digest is still NULL if there is no csums_tfm or the
		 * allocation failed; eq then stays 0 and the block is sent */
		if (digest) {
			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(peer_device, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(peer_device);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			/* free the digest through the saved pointer, since the
			 * peer request no longer refers to it */
			kfree(di);
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
		if (drbd_ratelimit())
			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
	}
	if (unlikely(err))
		drbd_err(device, "drbd_send_block/ack() failed\n");
out:
	dec_unacked(device);
	drbd_free_peer_req(device, peer_req);

	return err;
}
1211 
w_e_end_ov_req(struct drbd_work * w,int cancel)1212 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1213 {
1214 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1215 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1216 	struct drbd_device *device = peer_device->device;
1217 	sector_t sector = peer_req->i.sector;
1218 	unsigned int size = peer_req->i.size;
1219 	int digest_size;
1220 	void *digest;
1221 	int err = 0;
1222 
1223 	if (unlikely(cancel))
1224 		goto out;
1225 
1226 	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1227 	digest = kmalloc(digest_size, GFP_NOIO);
1228 	if (!digest) {
1229 		err = 1;	/* terminate the connection in case the allocation failed */
1230 		goto out;
1231 	}
1232 
1233 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1234 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1235 	else
1236 		memset(digest, 0, digest_size);
1237 
1238 	/* Free e and pages before send.
1239 	 * In case we block on congestion, we could otherwise run into
1240 	 * some distributed deadlock, if the other side blocks on
1241 	 * congestion as well, because our receiver blocks in
1242 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1243 	drbd_free_peer_req(device, peer_req);
1244 	peer_req = NULL;
1245 	inc_rs_pending(peer_device);
1246 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1247 	if (err)
1248 		dec_rs_pending(peer_device);
1249 	kfree(digest);
1250 
1251 out:
1252 	if (peer_req)
1253 		drbd_free_peer_req(device, peer_req);
1254 	dec_unacked(device);
1255 	return err;
1256 }
1257 
/**
 * drbd_ov_out_of_sync_found() - record an out-of-sync range found by online verify
 * @peer_device:	peer device the range belongs to.
 * @sector:		first sector of the range.
 * @size:		length of the range in bytes (converted with >> 9).
 *
 * Extends the remembered ov_last_oos_* range when the new range starts
 * exactly where the remembered one ends, otherwise starts a new remembered
 * range.  The range is then handed to drbd_set_out_of_sync().
 */
void drbd_ov_out_of_sync_found(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
	struct drbd_device *device = peer_device->device;
	const int nr_sectors = size >> 9;
	const bool adjacent =
		device->ov_last_oos_start + device->ov_last_oos_size == sector;

	if (!adjacent) {
		device->ov_last_oos_start = sector;
		device->ov_last_oos_size = nr_sectors;
	} else {
		device->ov_last_oos_size += nr_sectors;
	}

	drbd_set_out_of_sync(peer_device, sector, size);
}
1269 
/**
 * w_e_end_ov_reply() - Worker callback that evaluates an online verify reply
 * @w:		work object embedded in the peer request.
 * @cancel:	non-zero when the connection is going away; the request is
 *		only released then.
 *
 * Computes the verify_tfm checksum of the locally read block, compares it
 * with the digest attached to the peer request and reports the outcome to
 * the peer as P_OV_RESULT (ID_IN_SYNC or ID_OUT_OF_SYNC).  A block counts as
 * out of sync when the digests differ, when the peer request carries
 * EE_WAS_ERROR, or when the digest buffer could not be allocated.
 *
 * Also decrements ov_left and calls drbd_resync_finished() once ov_left
 * reaches zero or the stop sector has been reached.
 *
 * Return: 0 when cancelled, otherwise the result of sending P_OV_RESULT.
 */
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	void *digest;
	/* copied out now: peer_req is freed before these are last used */
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int err, eq = 0;
	bool stop_sector_reached = false;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	/* eq stays 0 (reported as out of sync) on a read error or when the
	 * allocation below fails */
	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);

			D_ASSERT(device, digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	if (!eq)
		drbd_ov_out_of_sync_found(peer_device, sector, size);
	else
		ov_out_of_sync_print(peer_device);

	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(device);

	/* account for this reply; the verify run ends once this hits zero
	 * (checked below) */
	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(peer_device, device->ov_left);

	stop_sector_reached = verify_can_do_stop_sector(device) &&
		(sector + (size>>9)) >= device->ov_stop_sector;

	if (device->ov_left == 0 || stop_sector_reached) {
		ov_out_of_sync_print(peer_device);
		drbd_resync_finished(peer_device);
	}

	return err;
}
1342 
1343 /* FIXME
1344  * We need to track the number of pending barrier acks,
1345  * and to be able to wait for them.
1346  * See also comment in drbd_adm_attach before drbd_suspend_io.
1347  */
drbd_send_barrier(struct drbd_connection * connection)1348 static int drbd_send_barrier(struct drbd_connection *connection)
1349 {
1350 	struct p_barrier *p;
1351 	struct drbd_socket *sock;
1352 
1353 	sock = &connection->data;
1354 	p = conn_prepare_command(connection, sock);
1355 	if (!p)
1356 		return -EIO;
1357 	p->barrier = connection->send.current_epoch_nr;
1358 	p->pad = 0;
1359 	connection->send.current_epoch_writes = 0;
1360 	connection->send.last_sent_barrier_jif = jiffies;
1361 
1362 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1363 }
1364 
pd_send_unplug_remote(struct drbd_peer_device * pd)1365 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1366 {
1367 	struct drbd_socket *sock = &pd->connection->data;
1368 	if (!drbd_prepare_command(pd, sock))
1369 		return -EIO;
1370 	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1371 }
1372 
w_send_write_hint(struct drbd_work * w,int cancel)1373 int w_send_write_hint(struct drbd_work *w, int cancel)
1374 {
1375 	struct drbd_device *device =
1376 		container_of(w, struct drbd_device, unplug_work);
1377 
1378 	if (cancel)
1379 		return 0;
1380 	return pd_send_unplug_remote(first_peer_device(device));
1381 }
1382 
re_init_if_first_write(struct drbd_connection * connection,unsigned int epoch)1383 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1384 {
1385 	if (!connection->send.seen_any_write_yet) {
1386 		connection->send.seen_any_write_yet = true;
1387 		connection->send.current_epoch_nr = epoch;
1388 		connection->send.current_epoch_writes = 0;
1389 		connection->send.last_sent_barrier_jif = jiffies;
1390 	}
1391 }
1392 
maybe_send_barrier(struct drbd_connection * connection,unsigned int epoch)1393 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1394 {
1395 	/* re-init if first write on this connection */
1396 	if (!connection->send.seen_any_write_yet)
1397 		return;
1398 	if (connection->send.current_epoch_nr != epoch) {
1399 		if (connection->send.current_epoch_writes)
1400 			drbd_send_barrier(connection);
1401 		connection->send.current_epoch_nr = epoch;
1402 	}
1403 }
1404 
w_send_out_of_sync(struct drbd_work * w,int cancel)1405 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1406 {
1407 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1408 	struct drbd_device *device = req->device;
1409 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1410 	struct drbd_connection *const connection = peer_device->connection;
1411 	int err;
1412 
1413 	if (unlikely(cancel)) {
1414 		req_mod(req, SEND_CANCELED, peer_device);
1415 		return 0;
1416 	}
1417 	req->pre_send_jif = jiffies;
1418 
1419 	/* this time, no connection->send.current_epoch_writes++;
1420 	 * If it was sent, it was the closing barrier for the last
1421 	 * replicated epoch, before we went into AHEAD mode.
1422 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1423 	maybe_send_barrier(connection, req->epoch);
1424 
1425 	err = drbd_send_out_of_sync(peer_device, req);
1426 	req_mod(req, OOS_HANDED_TO_NETWORK, peer_device);
1427 
1428 	return err;
1429 }
1430 
1431 /**
1432  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1433  * @w:		work object.
1434  * @cancel:	The connection will be closed anyways
1435  */
w_send_dblock(struct drbd_work * w,int cancel)1436 int w_send_dblock(struct drbd_work *w, int cancel)
1437 {
1438 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1439 	struct drbd_device *device = req->device;
1440 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1441 	struct drbd_connection *connection = peer_device->connection;
1442 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1443 	int err;
1444 
1445 	if (unlikely(cancel)) {
1446 		req_mod(req, SEND_CANCELED, peer_device);
1447 		return 0;
1448 	}
1449 	req->pre_send_jif = jiffies;
1450 
1451 	re_init_if_first_write(connection, req->epoch);
1452 	maybe_send_barrier(connection, req->epoch);
1453 	connection->send.current_epoch_writes++;
1454 
1455 	err = drbd_send_dblock(peer_device, req);
1456 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1457 
1458 	if (do_send_unplug && !err)
1459 		pd_send_unplug_remote(peer_device);
1460 
1461 	return err;
1462 }
1463 
1464 /**
1465  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1466  * @w:		work object.
1467  * @cancel:	The connection will be closed anyways
1468  */
w_send_read_req(struct drbd_work * w,int cancel)1469 int w_send_read_req(struct drbd_work *w, int cancel)
1470 {
1471 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1472 	struct drbd_device *device = req->device;
1473 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1474 	struct drbd_connection *connection = peer_device->connection;
1475 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1476 	int err;
1477 
1478 	if (unlikely(cancel)) {
1479 		req_mod(req, SEND_CANCELED, peer_device);
1480 		return 0;
1481 	}
1482 	req->pre_send_jif = jiffies;
1483 
1484 	/* Even read requests may close a write epoch,
1485 	 * if there was any yet. */
1486 	maybe_send_barrier(connection, req->epoch);
1487 
1488 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1489 				 (unsigned long)req);
1490 
1491 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1492 
1493 	if (do_send_unplug && !err)
1494 		pd_send_unplug_remote(peer_device);
1495 
1496 	return err;
1497 }
1498 
w_restart_disk_io(struct drbd_work * w,int cancel)1499 int w_restart_disk_io(struct drbd_work *w, int cancel)
1500 {
1501 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1502 	struct drbd_device *device = req->device;
1503 
1504 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1505 		drbd_al_begin_io(device, &req->i);
1506 
1507 	req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1508 					   req->master_bio, GFP_NOIO,
1509 					  &drbd_io_bio_set);
1510 	req->private_bio->bi_private = req;
1511 	req->private_bio->bi_end_io = drbd_request_endio;
1512 	submit_bio_noacct(req->private_bio);
1513 
1514 	return 0;
1515 }
1516 
_drbd_may_sync_now(struct drbd_device * device)1517 static int _drbd_may_sync_now(struct drbd_device *device)
1518 {
1519 	struct drbd_device *odev = device;
1520 	int resync_after;
1521 
1522 	while (1) {
1523 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1524 			return 1;
1525 		rcu_read_lock();
1526 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1527 		rcu_read_unlock();
1528 		if (resync_after == -1)
1529 			return 1;
1530 		odev = minor_to_device(resync_after);
1531 		if (!odev)
1532 			return 1;
1533 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1534 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1535 		    odev->state.aftr_isp || odev->state.peer_isp ||
1536 		    odev->state.user_isp)
1537 			return 0;
1538 	}
1539 }
1540 
1541 /**
1542  * drbd_pause_after() - Pause resync on all devices that may not resync now
1543  * @device:	DRBD device.
1544  *
1545  * Called from process context only (admin command and after_state_ch).
1546  */
drbd_pause_after(struct drbd_device * device)1547 static bool drbd_pause_after(struct drbd_device *device)
1548 {
1549 	bool changed = false;
1550 	struct drbd_device *odev;
1551 	int i;
1552 
1553 	rcu_read_lock();
1554 	idr_for_each_entry(&drbd_devices, odev, i) {
1555 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1556 			continue;
1557 		if (!_drbd_may_sync_now(odev) &&
1558 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1559 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1560 			changed = true;
1561 	}
1562 	rcu_read_unlock();
1563 
1564 	return changed;
1565 }
1566 
1567 /**
1568  * drbd_resume_next() - Resume resync on all devices that may resync now
1569  * @device:	DRBD device.
1570  *
1571  * Called from process context only (admin command and worker).
1572  */
drbd_resume_next(struct drbd_device * device)1573 static bool drbd_resume_next(struct drbd_device *device)
1574 {
1575 	bool changed = false;
1576 	struct drbd_device *odev;
1577 	int i;
1578 
1579 	rcu_read_lock();
1580 	idr_for_each_entry(&drbd_devices, odev, i) {
1581 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1582 			continue;
1583 		if (odev->state.aftr_isp) {
1584 			if (_drbd_may_sync_now(odev) &&
1585 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1586 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1587 				changed = true;
1588 		}
1589 	}
1590 	rcu_read_unlock();
1591 	return changed;
1592 }
1593 
/**
 * resume_next_sg() - run drbd_resume_next() with all resources locked
 * @device:	DRBD device, passed through to drbd_resume_next().
 *
 * Whether anything was actually resumed is not reported to the caller.
 */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_resume_next(device);
	unlock_all_resources();
}
1600 
/**
 * suspend_other_sg() - run drbd_pause_after() with all resources locked
 * @device:	DRBD device, passed through to drbd_pause_after().
 *
 * Whether anything was actually paused is not reported to the caller.
 */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_pause_after(device);
	unlock_all_resources();
}
1607 
1608 /* caller must lock_all_resources() */
drbd_resync_after_valid(struct drbd_device * device,int o_minor)1609 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1610 {
1611 	struct drbd_device *odev;
1612 	int resync_after;
1613 
1614 	if (o_minor == -1)
1615 		return NO_ERROR;
1616 	if (o_minor < -1 || o_minor > MINORMASK)
1617 		return ERR_RESYNC_AFTER;
1618 
1619 	/* check for loops */
1620 	odev = minor_to_device(o_minor);
1621 	while (1) {
1622 		if (odev == device)
1623 			return ERR_RESYNC_AFTER_CYCLE;
1624 
1625 		/* You are free to depend on diskless, non-existing,
1626 		 * or not yet/no longer existing minors.
1627 		 * We only reject dependency loops.
1628 		 * We cannot follow the dependency chain beyond a detached or
1629 		 * missing minor.
1630 		 */
1631 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1632 			return NO_ERROR;
1633 
1634 		rcu_read_lock();
1635 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1636 		rcu_read_unlock();
1637 		/* dependency chain ends here, no cycles. */
1638 		if (resync_after == -1)
1639 			return NO_ERROR;
1640 
1641 		/* follow the dependency chain */
1642 		odev = minor_to_device(resync_after);
1643 	}
1644 }
1645 
1646 /* caller must lock_all_resources() */
drbd_resync_after_changed(struct drbd_device * device)1647 void drbd_resync_after_changed(struct drbd_device *device)
1648 {
1649 	int changed;
1650 
1651 	do {
1652 		changed  = drbd_pause_after(device);
1653 		changed |= drbd_resume_next(device);
1654 	} while (changed);
1655 }
1656 
drbd_rs_controller_reset(struct drbd_peer_device * peer_device)1657 void drbd_rs_controller_reset(struct drbd_peer_device *peer_device)
1658 {
1659 	struct drbd_device *device = peer_device->device;
1660 	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1661 	struct fifo_buffer *plan;
1662 
1663 	atomic_set(&device->rs_sect_in, 0);
1664 	atomic_set(&device->rs_sect_ev, 0);
1665 	device->rs_in_flight = 0;
1666 	device->rs_last_events =
1667 		(int)part_stat_read_accum(disk->part0, sectors);
1668 
1669 	/* Updating the RCU protected object in place is necessary since
1670 	   this function gets called from atomic context.
1671 	   It is valid since all other updates also lead to an completely
1672 	   empty fifo */
1673 	rcu_read_lock();
1674 	plan = rcu_dereference(device->rs_plan_s);
1675 	plan->total = 0;
1676 	fifo_set(plan, 0);
1677 	rcu_read_unlock();
1678 }
1679 
start_resync_timer_fn(struct timer_list * t)1680 void start_resync_timer_fn(struct timer_list *t)
1681 {
1682 	struct drbd_device *device = timer_container_of(device, t,
1683 							start_resync_timer);
1684 	drbd_device_post_work(device, RS_START);
1685 }
1686 
do_start_resync(struct drbd_device * device)1687 static void do_start_resync(struct drbd_device *device)
1688 {
1689 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1690 		drbd_warn(device, "postponing start_resync ...\n");
1691 		device->start_resync_timer.expires = jiffies + HZ/10;
1692 		add_timer(&device->start_resync_timer);
1693 		return;
1694 	}
1695 
1696 	drbd_start_resync(device, C_SYNC_SOURCE);
1697 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1698 }
1699 
use_checksum_based_resync(struct drbd_connection * connection,struct drbd_device * device)1700 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1701 {
1702 	bool csums_after_crash_only;
1703 	rcu_read_lock();
1704 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1705 	rcu_read_unlock();
1706 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1707 		connection->csums_tfm &&			/* configured? */
1708 		(csums_after_crash_only == false		/* use for each resync? */
1709 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1710 }
1711 
1712 /**
1713  * drbd_start_resync() - Start the resync process
1714  * @device:	DRBD device.
1715  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1716  *
1717  * This function might bring you directly into one of the
1718  * C_PAUSED_SYNC_* states.
1719  */
drbd_start_resync(struct drbd_device * device,enum drbd_conns side)1720 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1721 {
1722 	struct drbd_peer_device *peer_device = first_peer_device(device);
1723 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1724 	union drbd_state ns;
1725 	int r;
1726 
1727 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1728 		drbd_err(device, "Resync already running!\n");
1729 		return;
1730 	}
1731 
1732 	if (!connection) {
1733 		drbd_err(device, "No connection to peer, aborting!\n");
1734 		return;
1735 	}
1736 
1737 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1738 		if (side == C_SYNC_TARGET) {
1739 			/* Since application IO was locked out during C_WF_BITMAP_T and
1740 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1741 			   we check that we might make the data inconsistent. */
1742 			r = drbd_khelper(device, "before-resync-target");
1743 			r = (r >> 8) & 0xff;
1744 			if (r > 0) {
1745 				drbd_info(device, "before-resync-target handler returned %d, "
1746 					 "dropping connection.\n", r);
1747 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1748 				return;
1749 			}
1750 		} else /* C_SYNC_SOURCE */ {
1751 			r = drbd_khelper(device, "before-resync-source");
1752 			r = (r >> 8) & 0xff;
1753 			if (r > 0) {
1754 				if (r == 3) {
1755 					drbd_info(device, "before-resync-source handler returned %d, "
1756 						 "ignoring. Old userland tools?", r);
1757 				} else {
1758 					drbd_info(device, "before-resync-source handler returned %d, "
1759 						 "dropping connection.\n", r);
1760 					conn_request_state(connection,
1761 							   NS(conn, C_DISCONNECTING), CS_HARD);
1762 					return;
1763 				}
1764 			}
1765 		}
1766 	}
1767 
1768 	if (current == connection->worker.task) {
1769 		/* The worker should not sleep waiting for state_mutex,
1770 		   that can take long */
1771 		if (!mutex_trylock(device->state_mutex)) {
1772 			set_bit(B_RS_H_DONE, &device->flags);
1773 			device->start_resync_timer.expires = jiffies + HZ/5;
1774 			add_timer(&device->start_resync_timer);
1775 			return;
1776 		}
1777 	} else {
1778 		mutex_lock(device->state_mutex);
1779 	}
1780 
1781 	lock_all_resources();
1782 	clear_bit(B_RS_H_DONE, &device->flags);
1783 	/* Did some connection breakage or IO error race with us? */
1784 	if (device->state.conn < C_CONNECTED
1785 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1786 		unlock_all_resources();
1787 		goto out;
1788 	}
1789 
1790 	ns = drbd_read_state(device);
1791 
1792 	ns.aftr_isp = !_drbd_may_sync_now(device);
1793 
1794 	ns.conn = side;
1795 
1796 	if (side == C_SYNC_TARGET)
1797 		ns.disk = D_INCONSISTENT;
1798 	else /* side == C_SYNC_SOURCE */
1799 		ns.pdsk = D_INCONSISTENT;
1800 
1801 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1802 	ns = drbd_read_state(device);
1803 
1804 	if (ns.conn < C_CONNECTED)
1805 		r = SS_UNKNOWN_ERROR;
1806 
1807 	if (r == SS_SUCCESS) {
1808 		unsigned long tw = drbd_bm_total_weight(device);
1809 		unsigned long now = jiffies;
1810 		int i;
1811 
1812 		device->rs_failed    = 0;
1813 		device->rs_paused    = 0;
1814 		device->rs_same_csum = 0;
1815 		device->rs_last_sect_ev = 0;
1816 		device->rs_total     = tw;
1817 		device->rs_start     = now;
1818 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1819 			device->rs_mark_left[i] = tw;
1820 			device->rs_mark_time[i] = now;
1821 		}
1822 		drbd_pause_after(device);
1823 		/* Forget potentially stale cached per resync extent bit-counts.
1824 		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1825 		 * disabled, and know the disk state is ok. */
1826 		spin_lock(&device->al_lock);
1827 		lc_reset(device->resync);
1828 		device->resync_locked = 0;
1829 		device->resync_wenr = LC_FREE;
1830 		spin_unlock(&device->al_lock);
1831 	}
1832 	unlock_all_resources();
1833 
1834 	if (r == SS_SUCCESS) {
1835 		wake_up(&device->al_wait); /* for lc_reset() above */
1836 		/* reset rs_last_bcast when a resync or verify is started,
1837 		 * to deal with potential jiffies wrap. */
1838 		device->rs_last_bcast = jiffies - HZ;
1839 
1840 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1841 		     drbd_conn_str(ns.conn),
1842 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1843 		     (unsigned long) device->rs_total);
1844 		if (side == C_SYNC_TARGET) {
1845 			device->bm_resync_fo = 0;
1846 			device->use_csums = use_checksum_based_resync(connection, device);
1847 		} else {
1848 			device->use_csums = false;
1849 		}
1850 
1851 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1852 		 * with w_send_oos, or the sync target will get confused as to
1853 		 * how much bits to resync.  We cannot do that always, because for an
1854 		 * empty resync and protocol < 95, we need to do it here, as we call
1855 		 * drbd_resync_finished from here in that case.
1856 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1857 		 * and from after_state_ch otherwise. */
1858 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1859 			drbd_gen_and_send_sync_uuid(peer_device);
1860 
1861 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1862 			/* This still has a race (about when exactly the peers
1863 			 * detect connection loss) that can lead to a full sync
1864 			 * on next handshake. In 8.3.9 we fixed this with explicit
1865 			 * resync-finished notifications, but the fix
1866 			 * introduces a protocol change.  Sleeping for some
1867 			 * time longer than the ping interval + timeout on the
1868 			 * SyncSource, to give the SyncTarget the chance to
1869 			 * detect connection loss, then waiting for a ping
1870 			 * response (implicit in drbd_resync_finished) reduces
1871 			 * the race considerably, but does not solve it. */
1872 			if (side == C_SYNC_SOURCE) {
1873 				struct net_conf *nc;
1874 				int timeo;
1875 
1876 				rcu_read_lock();
1877 				nc = rcu_dereference(connection->net_conf);
1878 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1879 				rcu_read_unlock();
1880 				schedule_timeout_interruptible(timeo);
1881 			}
1882 			drbd_resync_finished(peer_device);
1883 		}
1884 
1885 		drbd_rs_controller_reset(peer_device);
1886 		/* ns.conn may already be != device->state.conn,
1887 		 * we may have been paused in between, or become paused until
1888 		 * the timer triggers.
1889 		 * No matter, that is handled in resync_timer_fn() */
1890 		if (ns.conn == C_SYNC_TARGET)
1891 			mod_timer(&device->resync_timer, jiffies);
1892 
1893 		drbd_md_sync(device);
1894 	}
1895 	put_ldev(device);
1896 out:
1897 	mutex_unlock(device->state_mutex);
1898 }
1899 
update_on_disk_bitmap(struct drbd_peer_device * peer_device,bool resync_done)1900 static void update_on_disk_bitmap(struct drbd_peer_device *peer_device, bool resync_done)
1901 {
1902 	struct drbd_device *device = peer_device->device;
1903 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1904 	device->rs_last_bcast = jiffies;
1905 
1906 	if (!get_ldev(device))
1907 		return;
1908 
1909 	drbd_bm_write_lazy(device, 0);
1910 	if (resync_done && is_sync_state(device->state.conn))
1911 		drbd_resync_finished(peer_device);
1912 
1913 	drbd_bcast_event(device, &sib);
1914 	/* update timestamp, in case it took a while to write out stuff */
1915 	device->rs_last_bcast = jiffies;
1916 	put_ldev(device);
1917 }
1918 
/*
 * Handler for the DESTROY_DISK device work bit (dispatched from
 * do_device_work()).
 *
 * Destroys device->resync and device->act_log via lc_destroy(), frees the
 * backing device with drbd_backing_dev_free(), and sets all three pointers
 * to NULL.  Finally clears GOING_DISKLESS in device->flags and wakes up
 * anyone sleeping on device->misc_wait.
 */
static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	/* __acquire/__release: static-checker (sparse) context annotations
	 * around the access to device->ldev; no runtime locking happens here. */
	__acquire(local);
	drbd_backing_dev_free(device, device->ldev);
	device->ldev = NULL;
	__release(local);

	/* let waiters on misc_wait observe that GOING_DISKLESS is gone */
	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}
1934 
go_diskless(struct drbd_device * device)1935 static void go_diskless(struct drbd_device *device)
1936 {
1937 	struct drbd_peer_device *peer_device = first_peer_device(device);
1938 	D_ASSERT(device, device->state.disk == D_FAILED);
1939 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1940 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1941 	 * the protected members anymore, though, so once put_ldev reaches zero
1942 	 * again, it will be safe to free them. */
1943 
1944 	/* Try to write changed bitmap pages, read errors may have just
1945 	 * set some bits outside the area covered by the activity log.
1946 	 *
1947 	 * If we have an IO error during the bitmap writeout,
1948 	 * we will want a full sync next time, just in case.
1949 	 * (Do we want a specific meta data flag for this?)
1950 	 *
1951 	 * If that does not make it to stable storage either,
1952 	 * we cannot do anything about that anymore.
1953 	 *
1954 	 * We still need to check if both bitmap and ldev are present, we may
1955 	 * end up here after a failed attach, before ldev was even assigned.
1956 	 */
1957 	if (device->bitmap && device->ldev) {
1958 		/* An interrupted resync or similar is allowed to recounts bits
1959 		 * while we detach.
1960 		 * Any modifications would not be expected anymore, though.
1961 		 */
1962 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1963 					"detach", BM_LOCKED_TEST_ALLOWED, peer_device)) {
1964 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1965 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1966 				drbd_md_sync(device);
1967 			}
1968 		}
1969 	}
1970 
1971 	drbd_force_state(device, NS(disk, D_DISKLESS));
1972 }
1973 
/*
 * Handler for the MD_SYNC device work bit (dispatched from
 * do_device_work()): log a warning that the md_sync_timer expired and call
 * drbd_md_sync() from worker context.
 *
 * Always returns 0; the visible caller ignores the return value.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}
1980 
1981 /* only called from drbd_worker thread, no locking */
__update_timing_details(struct drbd_thread_timing_details * tdp,unsigned int * cb_nr,void * cb,const char * fn,const unsigned int line)1982 void __update_timing_details(
1983 		struct drbd_thread_timing_details *tdp,
1984 		unsigned int *cb_nr,
1985 		void *cb,
1986 		const char *fn, const unsigned int line)
1987 {
1988 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1989 	struct drbd_thread_timing_details *td = tdp + i;
1990 
1991 	td->start_jif = jiffies;
1992 	td->cb_addr = cb;
1993 	td->caller_fn = fn;
1994 	td->line = line;
1995 	td->cb_nr = *cb_nr;
1996 
1997 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1998 	td = tdp + i;
1999 	memset(td, 0, sizeof(*td));
2000 
2001 	++(*cb_nr);
2002 }
2003 
do_device_work(struct drbd_device * device,const unsigned long todo)2004 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2005 {
2006 	if (test_bit(MD_SYNC, &todo))
2007 		do_md_sync(device);
2008 	if (test_bit(RS_DONE, &todo) ||
2009 	    test_bit(RS_PROGRESS, &todo))
2010 		update_on_disk_bitmap(first_peer_device(device), test_bit(RS_DONE, &todo));
2011 	if (test_bit(GO_DISKLESS, &todo))
2012 		go_diskless(device);
2013 	if (test_bit(DESTROY_DISK, &todo))
2014 		drbd_ldev_destroy(device);
2015 	if (test_bit(RS_START, &todo))
2016 		do_start_resync(device);
2017 }
2018 
/*
 * Mask of the per-device work bits handled by the worker.
 * get_work_bits() fetches and clears exactly these bits from a flags word,
 * and do_device_work() dispatches on each of them.
 */
#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << MD_SYNC)	\
	|(1UL << RS_START)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << RS_DONE)	\
	)
2027 
get_work_bits(unsigned long * flags)2028 static unsigned long get_work_bits(unsigned long *flags)
2029 {
2030 	unsigned long old, new;
2031 	do {
2032 		old = *flags;
2033 		new = old & ~DRBD_DEVICE_WORK_MASK;
2034 	} while (cmpxchg(flags, old, new) != old);
2035 	return old & DRBD_DEVICE_WORK_MASK;
2036 }
2037 
do_unqueued_work(struct drbd_connection * connection)2038 static void do_unqueued_work(struct drbd_connection *connection)
2039 {
2040 	struct drbd_peer_device *peer_device;
2041 	int vnr;
2042 
2043 	rcu_read_lock();
2044 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2045 		struct drbd_device *device = peer_device->device;
2046 		unsigned long todo = get_work_bits(&device->flags);
2047 		if (!todo)
2048 			continue;
2049 
2050 		kref_get(&device->kref);
2051 		rcu_read_unlock();
2052 		do_device_work(device, todo);
2053 		kref_put(&device->kref, drbd_destroy_device);
2054 		rcu_read_lock();
2055 	}
2056 	rcu_read_unlock();
2057 }
2058 
dequeue_work_batch(struct drbd_work_queue * queue,struct list_head * work_list)2059 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2060 {
2061 	spin_lock_irq(&queue->q_lock);
2062 	list_splice_tail_init(&queue->q, work_list);
2063 	spin_unlock_irq(&queue->q_lock);
2064 	return !list_empty(work_list);
2065 }
2066 
/*
 * Wait until the worker has something to do.
 *
 * @connection: connection whose sender_work queue is waited on
 * @work_list:  caller-owned list; queued work items are spliced onto its tail
 *
 * Returns right away if dequeue_work_batch() already yields work.
 * Otherwise the data socket is uncorked (if tcp_cork is configured) and the
 * function sleeps interruptibly on sender_work.q_wait until one of:
 *   - work was queued (it is moved to @work_list),
 *   - a signal is pending for the current task,
 *   - DEVICE_WORK_PENDING is set in connection->flags,
 *   - the worker thread state is no longer RUNNING.
 * On each pass without work, maybe_send_barrier() is called if
 * current_tle_nr differs from send.current_epoch_nr.
 * Before returning, the cork setting of the data socket is re-applied from
 * the then-current net_conf.
 */
static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_batch(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	/* net_conf may be NULL; treat that as "corking not configured" */
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			tcp_sock_set_cork(connection->data.socket->sk, false);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		/* register on the wait queue before checking the conditions */
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  Next incoming request epoch will be connection ->
		 * current transfer log epoch number.  If that is different
		 * from the epoch of the last request we communicated, it is
		 * safe to send the epoch separating barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		/* flag-signalled device work is handled by the caller */
		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* may be woken up for other things but new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		/* cork again if configured now; otherwise uncork, unless that
		 * was already done before going to sleep */
		if (cork)
			tcp_sock_set_cork(connection->data.socket->sk, true);
		else if (!uncork)
			tcp_sock_set_cork(connection->data.socket->sk, false);
	}
	mutex_unlock(&connection->data.mutex);
}
2151 
/*
 * Main function of the worker thread of a connection.
 *
 * @thi: thread descriptor; thi->connection is the connection served
 *
 * Main loop, while the thread state is RUNNING:
 *   - wait for work if the private work_list is empty,
 *   - run flag-signalled device work if DEVICE_WORK_PENDING was set,
 *   - flush unexpected signals (warn and retry if still RUNNING),
 *   - run the callback of the first queued work item; its second argument
 *     is true while cstate < C_WF_REPORT_PARAMS (presumably a "cancel"
 *     flag -- confirm against the callbacks).  If a callback returns
 *     non-zero and cstate >= C_WF_REPORT_PARAMS, C_NETWORK_FAILURE is
 *     requested.
 *
 * After the main loop, all remaining device work and queued work items are
 * drained, with the callbacks' second argument set to 1.  Finally
 * drbd_device_cleanup() is called for the device of every peer device.
 *
 * Always returns 0.
 */
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		if (!list_empty(&work_list)) {
			/* process one item per loop iteration */
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			/* callback reported failure */
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	/* No longer RUNNING: drain whatever is left, calling the callbacks
	 * with their second argument set to 1. */
	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		} else
			dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		/* hold a device reference while the RCU read lock is dropped
		 * around the cleanup */
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}
2224