xref: /linux/drivers/block/drbd/drbd_receiver.c (revision 0883c2c06fb5bcf5b9e008270827e63c09a88c1e)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50 
51 #define PRO_FEATURES (FF_TRIM)
52 
53 struct packet_info {
54 	enum drbd_packet cmd;
55 	unsigned int size;
56 	unsigned int vnr;
57 	void *data;
58 };
59 
60 enum finish_epoch {
61 	FE_STILL_LIVE,
62 	FE_DESTROYED,
63 	FE_RECYCLED,
64 };
65 
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72 
73 
74 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
75 
76 /*
77  * some helper functions to deal with single linked page lists,
78  * page->private being our "next" pointer.
79  */
80 
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 	struct page *page;
88 	struct page *tmp;
89 
90 	BUG_ON(!n);
91 	BUG_ON(!head);
92 
93 	page = *head;
94 
95 	if (!page)
96 		return NULL;
97 
98 	while (page) {
99 		tmp = page_chain_next(page);
100 		if (--n == 0)
101 			break; /* found sufficient pages */
102 		if (tmp == NULL)
103 			/* insufficient pages, don't use any of them. */
104 			return NULL;
105 		page = tmp;
106 	}
107 
108 	/* add end of list marker for the returned list */
109 	set_page_private(page, 0);
110 	/* actual return value, and adjustment of head */
111 	page = *head;
112 	*head = tmp;
113 	return page;
114 }
115 
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 	struct page *tmp;
122 	int i = 1;
123 	while ((tmp = page_chain_next(page)))
124 		++i, page = tmp;
125 	if (len)
126 		*len = i;
127 	return page;
128 }
129 
130 static int page_chain_free(struct page *page)
131 {
132 	struct page *tmp;
133 	int i = 0;
134 	page_chain_for_each_safe(page, tmp) {
135 		put_page(page);
136 		++i;
137 	}
138 	return i;
139 }
140 
141 static void page_chain_add(struct page **head,
142 		struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145 	struct page *tmp;
146 	tmp = page_chain_tail(chain_first, NULL);
147 	BUG_ON(tmp != chain_last);
148 #endif
149 
150 	/* add chain to head */
151 	set_page_private(chain_last, (unsigned long)*head);
152 	*head = chain_first;
153 }
154 
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 				       unsigned int number)
157 {
158 	struct page *page = NULL;
159 	struct page *tmp = NULL;
160 	unsigned int i = 0;
161 
162 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
163 	 * So what. It saves a spin_lock. */
164 	if (drbd_pp_vacant >= number) {
165 		spin_lock(&drbd_pp_lock);
166 		page = page_chain_del(&drbd_pp_pool, number);
167 		if (page)
168 			drbd_pp_vacant -= number;
169 		spin_unlock(&drbd_pp_lock);
170 		if (page)
171 			return page;
172 	}
173 
174 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 	 * which in turn might block on the other node at this very place.  */
177 	for (i = 0; i < number; i++) {
178 		tmp = alloc_page(GFP_TRY);
179 		if (!tmp)
180 			break;
181 		set_page_private(tmp, (unsigned long)page);
182 		page = tmp;
183 	}
184 
185 	if (i == number)
186 		return page;
187 
188 	/* Not enough pages immediately available this time.
189 	 * No need to jump around here, drbd_alloc_pages will retry this
190 	 * function "soon". */
191 	if (page) {
192 		tmp = page_chain_tail(page, NULL);
193 		spin_lock(&drbd_pp_lock);
194 		page_chain_add(&drbd_pp_pool, page, tmp);
195 		drbd_pp_vacant += i;
196 		spin_unlock(&drbd_pp_lock);
197 	}
198 	return NULL;
199 }
200 
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 					   struct list_head *to_be_freed)
203 {
204 	struct drbd_peer_request *peer_req, *tmp;
205 
206 	/* The EEs are always appended to the end of the list. Since
207 	   they are sent in order over the wire, they have to finish
208 	   in order. As soon as we see the first not finished we can
209 	   stop to examine the list... */
210 
211 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 		if (drbd_peer_req_has_active_page(peer_req))
213 			break;
214 		list_move(&peer_req->w.list, to_be_freed);
215 	}
216 }
217 
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219 {
220 	LIST_HEAD(reclaimed);
221 	struct drbd_peer_request *peer_req, *t;
222 
223 	spin_lock_irq(&device->resource->req_lock);
224 	reclaim_finished_net_peer_reqs(device, &reclaimed);
225 	spin_unlock_irq(&device->resource->req_lock);
226 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227 		drbd_free_net_peer_req(device, peer_req);
228 }
229 
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231 {
232 	struct drbd_peer_device *peer_device;
233 	int vnr;
234 
235 	rcu_read_lock();
236 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237 		struct drbd_device *device = peer_device->device;
238 		if (!atomic_read(&device->pp_in_use_by_net))
239 			continue;
240 
241 		kref_get(&device->kref);
242 		rcu_read_unlock();
243 		drbd_reclaim_net_peer_reqs(device);
244 		kref_put(&device->kref, drbd_destroy_device);
245 		rcu_read_lock();
246 	}
247 	rcu_read_unlock();
248 }
249 
250 /**
251  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252  * @device:	DRBD device.
253  * @number:	number of pages requested
254  * @retry:	whether to retry, if not enough pages are available right now
255  *
256  * Tries to allocate number pages, first from our own page pool, then from
257  * the kernel.
258  * Possibly retry until DRBD frees sufficient pages somewhere else.
259  *
260  * If this allocation would exceed the max_buffers setting, we throttle
261  * allocation (schedule_timeout) to give the system some room to breathe.
262  *
263  * We do not use max-buffers as hard limit, because it could lead to
264  * congestion and further to a distributed deadlock during online-verify or
265  * (checksum based) resync, if the max-buffers, socket buffer sizes and
266  * resync-rate settings are mis-configured.
267  *
268  * Returns a page chain linked via page->private.
269  */
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271 			      bool retry)
272 {
273 	struct drbd_device *device = peer_device->device;
274 	struct page *page = NULL;
275 	struct net_conf *nc;
276 	DEFINE_WAIT(wait);
277 	unsigned int mxb;
278 
279 	rcu_read_lock();
280 	nc = rcu_dereference(peer_device->connection->net_conf);
281 	mxb = nc ? nc->max_buffers : 1000000;
282 	rcu_read_unlock();
283 
284 	if (atomic_read(&device->pp_in_use) < mxb)
285 		page = __drbd_alloc_pages(device, number);
286 
287 	/* Try to keep the fast path fast, but occasionally we need
288 	 * to reclaim the pages we lended to the network stack. */
289 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290 		drbd_reclaim_net_peer_reqs(device);
291 
292 	while (page == NULL) {
293 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294 
295 		drbd_reclaim_net_peer_reqs(device);
296 
297 		if (atomic_read(&device->pp_in_use) < mxb) {
298 			page = __drbd_alloc_pages(device, number);
299 			if (page)
300 				break;
301 		}
302 
303 		if (!retry)
304 			break;
305 
306 		if (signal_pending(current)) {
307 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308 			break;
309 		}
310 
311 		if (schedule_timeout(HZ/10) == 0)
312 			mxb = UINT_MAX;
313 	}
314 	finish_wait(&drbd_pp_wait, &wait);
315 
316 	if (page)
317 		atomic_add(number, &device->pp_in_use);
318 	return page;
319 }
320 
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
323  * Either links the page chain back to the global pool,
324  * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326 {
327 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328 	int i;
329 
330 	if (page == NULL)
331 		return;
332 
333 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334 		i = page_chain_free(page);
335 	else {
336 		struct page *tmp;
337 		tmp = page_chain_tail(page, &i);
338 		spin_lock(&drbd_pp_lock);
339 		page_chain_add(&drbd_pp_pool, page, tmp);
340 		drbd_pp_vacant += i;
341 		spin_unlock(&drbd_pp_lock);
342 	}
343 	i = atomic_sub_return(i, a);
344 	if (i < 0)
345 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347 	wake_up(&drbd_pp_wait);
348 }
349 
350 /*
351 You need to hold the req_lock:
352  _drbd_wait_ee_list_empty()
353 
354 You must not have the req_lock:
355  drbd_free_peer_req()
356  drbd_alloc_peer_req()
357  drbd_free_peer_reqs()
358  drbd_ee_fix_bhs()
359  drbd_finish_peer_reqs()
360  drbd_clear_done_ee()
361  drbd_wait_ee_list_empty()
362 */
363 
364 struct drbd_peer_request *
365 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
366 		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
367 {
368 	struct drbd_device *device = peer_device->device;
369 	struct drbd_peer_request *peer_req;
370 	struct page *page = NULL;
371 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
372 
373 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
374 		return NULL;
375 
376 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
377 	if (!peer_req) {
378 		if (!(gfp_mask & __GFP_NOWARN))
379 			drbd_err(device, "%s: allocation failed\n", __func__);
380 		return NULL;
381 	}
382 
383 	if (has_payload && data_size) {
384 		page = drbd_alloc_pages(peer_device, nr_pages,
385 					gfpflags_allow_blocking(gfp_mask));
386 		if (!page)
387 			goto fail;
388 	}
389 
390 	memset(peer_req, 0, sizeof(*peer_req));
391 	INIT_LIST_HEAD(&peer_req->w.list);
392 	drbd_clear_interval(&peer_req->i);
393 	peer_req->i.size = data_size;
394 	peer_req->i.sector = sector;
395 	peer_req->submit_jif = jiffies;
396 	peer_req->peer_device = peer_device;
397 	peer_req->pages = page;
398 	/*
399 	 * The block_id is opaque to the receiver.  It is not endianness
400 	 * converted, and sent back to the sender unchanged.
401 	 */
402 	peer_req->block_id = id;
403 
404 	return peer_req;
405 
406  fail:
407 	mempool_free(peer_req, drbd_ee_mempool);
408 	return NULL;
409 }
410 
411 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
412 		       int is_net)
413 {
414 	might_sleep();
415 	if (peer_req->flags & EE_HAS_DIGEST)
416 		kfree(peer_req->digest);
417 	drbd_free_pages(device, peer_req->pages, is_net);
418 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
419 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
420 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
421 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
422 		drbd_al_complete_io(device, &peer_req->i);
423 	}
424 	mempool_free(peer_req, drbd_ee_mempool);
425 }
426 
427 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
428 {
429 	LIST_HEAD(work_list);
430 	struct drbd_peer_request *peer_req, *t;
431 	int count = 0;
432 	int is_net = list == &device->net_ee;
433 
434 	spin_lock_irq(&device->resource->req_lock);
435 	list_splice_init(list, &work_list);
436 	spin_unlock_irq(&device->resource->req_lock);
437 
438 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
439 		__drbd_free_peer_req(device, peer_req, is_net);
440 		count++;
441 	}
442 	return count;
443 }
444 
445 /*
446  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
447  */
448 static int drbd_finish_peer_reqs(struct drbd_device *device)
449 {
450 	LIST_HEAD(work_list);
451 	LIST_HEAD(reclaimed);
452 	struct drbd_peer_request *peer_req, *t;
453 	int err = 0;
454 
455 	spin_lock_irq(&device->resource->req_lock);
456 	reclaim_finished_net_peer_reqs(device, &reclaimed);
457 	list_splice_init(&device->done_ee, &work_list);
458 	spin_unlock_irq(&device->resource->req_lock);
459 
460 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
461 		drbd_free_net_peer_req(device, peer_req);
462 
463 	/* possible callbacks here:
464 	 * e_end_block, and e_end_resync_block, e_send_superseded.
465 	 * all ignore the last argument.
466 	 */
467 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
468 		int err2;
469 
470 		/* list_del not necessary, next/prev members not touched */
471 		err2 = peer_req->w.cb(&peer_req->w, !!err);
472 		if (!err)
473 			err = err2;
474 		drbd_free_peer_req(device, peer_req);
475 	}
476 	wake_up(&device->ee_wait);
477 
478 	return err;
479 }
480 
481 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
482 				     struct list_head *head)
483 {
484 	DEFINE_WAIT(wait);
485 
486 	/* avoids spin_lock/unlock
487 	 * and calling prepare_to_wait in the fast path */
488 	while (!list_empty(head)) {
489 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
490 		spin_unlock_irq(&device->resource->req_lock);
491 		io_schedule();
492 		finish_wait(&device->ee_wait, &wait);
493 		spin_lock_irq(&device->resource->req_lock);
494 	}
495 }
496 
497 static void drbd_wait_ee_list_empty(struct drbd_device *device,
498 				    struct list_head *head)
499 {
500 	spin_lock_irq(&device->resource->req_lock);
501 	_drbd_wait_ee_list_empty(device, head);
502 	spin_unlock_irq(&device->resource->req_lock);
503 }
504 
505 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
506 {
507 	struct kvec iov = {
508 		.iov_base = buf,
509 		.iov_len = size,
510 	};
511 	struct msghdr msg = {
512 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
513 	};
514 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
515 }
516 
517 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
518 {
519 	int rv;
520 
521 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
522 
523 	if (rv < 0) {
524 		if (rv == -ECONNRESET)
525 			drbd_info(connection, "sock was reset by peer\n");
526 		else if (rv != -ERESTARTSYS)
527 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
528 	} else if (rv == 0) {
529 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
530 			long t;
531 			rcu_read_lock();
532 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
533 			rcu_read_unlock();
534 
535 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
536 
537 			if (t)
538 				goto out;
539 		}
540 		drbd_info(connection, "sock was shut down by peer\n");
541 	}
542 
543 	if (rv != size)
544 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
545 
546 out:
547 	return rv;
548 }
549 
550 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
551 {
552 	int err;
553 
554 	err = drbd_recv(connection, buf, size);
555 	if (err != size) {
556 		if (err >= 0)
557 			err = -EIO;
558 	} else
559 		err = 0;
560 	return err;
561 }
562 
563 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
564 {
565 	int err;
566 
567 	err = drbd_recv_all(connection, buf, size);
568 	if (err && !signal_pending(current))
569 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
570 	return err;
571 }
572 
573 /* quoting tcp(7):
574  *   On individual connections, the socket buffer size must be set prior to the
575  *   listen(2) or connect(2) calls in order to have it take effect.
576  * This is our wrapper to do so.
577  */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579 		unsigned int rcv)
580 {
581 	/* open coded SO_SNDBUF, SO_RCVBUF */
582 	if (snd) {
583 		sock->sk->sk_sndbuf = snd;
584 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585 	}
586 	if (rcv) {
587 		sock->sk->sk_rcvbuf = rcv;
588 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589 	}
590 }
591 
592 static struct socket *drbd_try_connect(struct drbd_connection *connection)
593 {
594 	const char *what;
595 	struct socket *sock;
596 	struct sockaddr_in6 src_in6;
597 	struct sockaddr_in6 peer_in6;
598 	struct net_conf *nc;
599 	int err, peer_addr_len, my_addr_len;
600 	int sndbuf_size, rcvbuf_size, connect_int;
601 	int disconnect_on_error = 1;
602 
603 	rcu_read_lock();
604 	nc = rcu_dereference(connection->net_conf);
605 	if (!nc) {
606 		rcu_read_unlock();
607 		return NULL;
608 	}
609 	sndbuf_size = nc->sndbuf_size;
610 	rcvbuf_size = nc->rcvbuf_size;
611 	connect_int = nc->connect_int;
612 	rcu_read_unlock();
613 
614 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
615 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
616 
617 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
618 		src_in6.sin6_port = 0;
619 	else
620 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
621 
622 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
623 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
624 
625 	what = "sock_create_kern";
626 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
627 			       SOCK_STREAM, IPPROTO_TCP, &sock);
628 	if (err < 0) {
629 		sock = NULL;
630 		goto out;
631 	}
632 
633 	sock->sk->sk_rcvtimeo =
634 	sock->sk->sk_sndtimeo = connect_int * HZ;
635 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
636 
637        /* explicitly bind to the configured IP as source IP
638 	*  for the outgoing connections.
639 	*  This is needed for multihomed hosts and to be
640 	*  able to use lo: interfaces for drbd.
641 	* Make sure to use 0 as port number, so linux selects
642 	*  a free one dynamically.
643 	*/
644 	what = "bind before connect";
645 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
646 	if (err < 0)
647 		goto out;
648 
649 	/* connect may fail, peer not yet available.
650 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
651 	disconnect_on_error = 0;
652 	what = "connect";
653 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
654 
655 out:
656 	if (err < 0) {
657 		if (sock) {
658 			sock_release(sock);
659 			sock = NULL;
660 		}
661 		switch (-err) {
662 			/* timeout, busy, signal pending */
663 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
664 		case EINTR: case ERESTARTSYS:
665 			/* peer not (yet) available, network problem */
666 		case ECONNREFUSED: case ENETUNREACH:
667 		case EHOSTDOWN:    case EHOSTUNREACH:
668 			disconnect_on_error = 0;
669 			break;
670 		default:
671 			drbd_err(connection, "%s failed, err = %d\n", what, err);
672 		}
673 		if (disconnect_on_error)
674 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
675 	}
676 
677 	return sock;
678 }
679 
680 struct accept_wait_data {
681 	struct drbd_connection *connection;
682 	struct socket *s_listen;
683 	struct completion door_bell;
684 	void (*original_sk_state_change)(struct sock *sk);
685 
686 };
687 
688 static void drbd_incoming_connection(struct sock *sk)
689 {
690 	struct accept_wait_data *ad = sk->sk_user_data;
691 	void (*state_change)(struct sock *sk);
692 
693 	state_change = ad->original_sk_state_change;
694 	if (sk->sk_state == TCP_ESTABLISHED)
695 		complete(&ad->door_bell);
696 	state_change(sk);
697 }
698 
699 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
700 {
701 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
702 	struct sockaddr_in6 my_addr;
703 	struct socket *s_listen;
704 	struct net_conf *nc;
705 	const char *what;
706 
707 	rcu_read_lock();
708 	nc = rcu_dereference(connection->net_conf);
709 	if (!nc) {
710 		rcu_read_unlock();
711 		return -EIO;
712 	}
713 	sndbuf_size = nc->sndbuf_size;
714 	rcvbuf_size = nc->rcvbuf_size;
715 	rcu_read_unlock();
716 
717 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
718 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
719 
720 	what = "sock_create_kern";
721 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
722 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
723 	if (err) {
724 		s_listen = NULL;
725 		goto out;
726 	}
727 
728 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
729 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
730 
731 	what = "bind before listen";
732 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
733 	if (err < 0)
734 		goto out;
735 
736 	ad->s_listen = s_listen;
737 	write_lock_bh(&s_listen->sk->sk_callback_lock);
738 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
739 	s_listen->sk->sk_state_change = drbd_incoming_connection;
740 	s_listen->sk->sk_user_data = ad;
741 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
742 
743 	what = "listen";
744 	err = s_listen->ops->listen(s_listen, 5);
745 	if (err < 0)
746 		goto out;
747 
748 	return 0;
749 out:
750 	if (s_listen)
751 		sock_release(s_listen);
752 	if (err < 0) {
753 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754 			drbd_err(connection, "%s failed, err = %d\n", what, err);
755 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
756 		}
757 	}
758 
759 	return -EIO;
760 }
761 
762 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
763 {
764 	write_lock_bh(&sk->sk_callback_lock);
765 	sk->sk_state_change = ad->original_sk_state_change;
766 	sk->sk_user_data = NULL;
767 	write_unlock_bh(&sk->sk_callback_lock);
768 }
769 
770 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
771 {
772 	int timeo, connect_int, err = 0;
773 	struct socket *s_estab = NULL;
774 	struct net_conf *nc;
775 
776 	rcu_read_lock();
777 	nc = rcu_dereference(connection->net_conf);
778 	if (!nc) {
779 		rcu_read_unlock();
780 		return NULL;
781 	}
782 	connect_int = nc->connect_int;
783 	rcu_read_unlock();
784 
785 	timeo = connect_int * HZ;
786 	/* 28.5% random jitter */
787 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
788 
789 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
790 	if (err <= 0)
791 		return NULL;
792 
793 	err = kernel_accept(ad->s_listen, &s_estab, 0);
794 	if (err < 0) {
795 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
796 			drbd_err(connection, "accept failed, err = %d\n", err);
797 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
798 		}
799 	}
800 
801 	if (s_estab)
802 		unregister_state_change(s_estab->sk, ad);
803 
804 	return s_estab;
805 }
806 
807 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
808 
809 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
810 			     enum drbd_packet cmd)
811 {
812 	if (!conn_prepare_command(connection, sock))
813 		return -EIO;
814 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
815 }
816 
817 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
818 {
819 	unsigned int header_size = drbd_header_size(connection);
820 	struct packet_info pi;
821 	struct net_conf *nc;
822 	int err;
823 
824 	rcu_read_lock();
825 	nc = rcu_dereference(connection->net_conf);
826 	if (!nc) {
827 		rcu_read_unlock();
828 		return -EIO;
829 	}
830 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
831 	rcu_read_unlock();
832 
833 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
834 	if (err != header_size) {
835 		if (err >= 0)
836 			err = -EIO;
837 		return err;
838 	}
839 	err = decode_header(connection, connection->data.rbuf, &pi);
840 	if (err)
841 		return err;
842 	return pi.cmd;
843 }
844 
845 /**
846  * drbd_socket_okay() - Free the socket if its connection is not okay
847  * @sock:	pointer to the pointer to the socket.
848  */
849 static bool drbd_socket_okay(struct socket **sock)
850 {
851 	int rr;
852 	char tb[4];
853 
854 	if (!*sock)
855 		return false;
856 
857 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
858 
859 	if (rr > 0 || rr == -EAGAIN) {
860 		return true;
861 	} else {
862 		sock_release(*sock);
863 		*sock = NULL;
864 		return false;
865 	}
866 }
867 
868 static bool connection_established(struct drbd_connection *connection,
869 				   struct socket **sock1,
870 				   struct socket **sock2)
871 {
872 	struct net_conf *nc;
873 	int timeout;
874 	bool ok;
875 
876 	if (!*sock1 || !*sock2)
877 		return false;
878 
879 	rcu_read_lock();
880 	nc = rcu_dereference(connection->net_conf);
881 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
882 	rcu_read_unlock();
883 	schedule_timeout_interruptible(timeout);
884 
885 	ok = drbd_socket_okay(sock1);
886 	ok = drbd_socket_okay(sock2) && ok;
887 
888 	return ok;
889 }
890 
891 /* Gets called if a connection is established, or if a new minor gets created
892    in a connection */
893 int drbd_connected(struct drbd_peer_device *peer_device)
894 {
895 	struct drbd_device *device = peer_device->device;
896 	int err;
897 
898 	atomic_set(&device->packet_seq, 0);
899 	device->peer_seq = 0;
900 
901 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
902 		&peer_device->connection->cstate_mutex :
903 		&device->own_state_mutex;
904 
905 	err = drbd_send_sync_param(peer_device);
906 	if (!err)
907 		err = drbd_send_sizes(peer_device, 0, 0);
908 	if (!err)
909 		err = drbd_send_uuids(peer_device);
910 	if (!err)
911 		err = drbd_send_current_state(peer_device);
912 	clear_bit(USE_DEGR_WFC_T, &device->flags);
913 	clear_bit(RESIZE_PENDING, &device->flags);
914 	atomic_set(&device->ap_in_flight, 0);
915 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
916 	return err;
917 }
918 
919 /*
920  * return values:
921  *   1 yes, we have a valid connection
922  *   0 oops, did not work out, please try again
923  *  -1 peer talks different language,
924  *     no point in trying again, please go standalone.
925  *  -2 We do not have a network config...
926  */
927 static int conn_connect(struct drbd_connection *connection)
928 {
929 	struct drbd_socket sock, msock;
930 	struct drbd_peer_device *peer_device;
931 	struct net_conf *nc;
932 	int vnr, timeout, h;
933 	bool discard_my_data, ok;
934 	enum drbd_state_rv rv;
935 	struct accept_wait_data ad = {
936 		.connection = connection,
937 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
938 	};
939 
940 	clear_bit(DISCONNECT_SENT, &connection->flags);
941 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
942 		return -2;
943 
944 	mutex_init(&sock.mutex);
945 	sock.sbuf = connection->data.sbuf;
946 	sock.rbuf = connection->data.rbuf;
947 	sock.socket = NULL;
948 	mutex_init(&msock.mutex);
949 	msock.sbuf = connection->meta.sbuf;
950 	msock.rbuf = connection->meta.rbuf;
951 	msock.socket = NULL;
952 
953 	/* Assume that the peer only understands protocol 80 until we know better.  */
954 	connection->agreed_pro_version = 80;
955 
956 	if (prepare_listen_socket(connection, &ad))
957 		return 0;
958 
959 	do {
960 		struct socket *s;
961 
962 		s = drbd_try_connect(connection);
963 		if (s) {
964 			if (!sock.socket) {
965 				sock.socket = s;
966 				send_first_packet(connection, &sock, P_INITIAL_DATA);
967 			} else if (!msock.socket) {
968 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
969 				msock.socket = s;
970 				send_first_packet(connection, &msock, P_INITIAL_META);
971 			} else {
972 				drbd_err(connection, "Logic error in conn_connect()\n");
973 				goto out_release_sockets;
974 			}
975 		}
976 
977 		if (connection_established(connection, &sock.socket, &msock.socket))
978 			break;
979 
980 retry:
981 		s = drbd_wait_for_connect(connection, &ad);
982 		if (s) {
983 			int fp = receive_first_packet(connection, s);
984 			drbd_socket_okay(&sock.socket);
985 			drbd_socket_okay(&msock.socket);
986 			switch (fp) {
987 			case P_INITIAL_DATA:
988 				if (sock.socket) {
989 					drbd_warn(connection, "initial packet S crossed\n");
990 					sock_release(sock.socket);
991 					sock.socket = s;
992 					goto randomize;
993 				}
994 				sock.socket = s;
995 				break;
996 			case P_INITIAL_META:
997 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
998 				if (msock.socket) {
999 					drbd_warn(connection, "initial packet M crossed\n");
1000 					sock_release(msock.socket);
1001 					msock.socket = s;
1002 					goto randomize;
1003 				}
1004 				msock.socket = s;
1005 				break;
1006 			default:
1007 				drbd_warn(connection, "Error receiving initial packet\n");
1008 				sock_release(s);
1009 randomize:
1010 				if (prandom_u32() & 1)
1011 					goto retry;
1012 			}
1013 		}
1014 
1015 		if (connection->cstate <= C_DISCONNECTING)
1016 			goto out_release_sockets;
1017 		if (signal_pending(current)) {
1018 			flush_signals(current);
1019 			smp_rmb();
1020 			if (get_t_state(&connection->receiver) == EXITING)
1021 				goto out_release_sockets;
1022 		}
1023 
1024 		ok = connection_established(connection, &sock.socket, &msock.socket);
1025 	} while (!ok);
1026 
1027 	if (ad.s_listen)
1028 		sock_release(ad.s_listen);
1029 
1030 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1031 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1032 
1033 	sock.socket->sk->sk_allocation = GFP_NOIO;
1034 	msock.socket->sk->sk_allocation = GFP_NOIO;
1035 
1036 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1037 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1038 
1039 	/* NOT YET ...
1040 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1041 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1042 	 * first set it to the P_CONNECTION_FEATURES timeout,
1043 	 * which we set to 4x the configured ping_timeout. */
1044 	rcu_read_lock();
1045 	nc = rcu_dereference(connection->net_conf);
1046 
1047 	sock.socket->sk->sk_sndtimeo =
1048 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1049 
1050 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1051 	timeout = nc->timeout * HZ / 10;
1052 	discard_my_data = nc->discard_my_data;
1053 	rcu_read_unlock();
1054 
1055 	msock.socket->sk->sk_sndtimeo = timeout;
1056 
1057 	/* we don't want delays.
1058 	 * we use TCP_CORK where appropriate, though */
1059 	drbd_tcp_nodelay(sock.socket);
1060 	drbd_tcp_nodelay(msock.socket);
1061 
1062 	connection->data.socket = sock.socket;
1063 	connection->meta.socket = msock.socket;
1064 	connection->last_received = jiffies;
1065 
1066 	h = drbd_do_features(connection);
1067 	if (h <= 0)
1068 		return h;
1069 
1070 	if (connection->cram_hmac_tfm) {
1071 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1072 		switch (drbd_do_auth(connection)) {
1073 		case -1:
1074 			drbd_err(connection, "Authentication of peer failed\n");
1075 			return -1;
1076 		case 0:
1077 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1078 			return 0;
1079 		}
1080 	}
1081 
1082 	connection->data.socket->sk->sk_sndtimeo = timeout;
1083 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1084 
1085 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1086 		return -1;
1087 
1088 	/* Prevent a race between resync-handshake and
1089 	 * being promoted to Primary.
1090 	 *
1091 	 * Grab and release the state mutex, so we know that any current
1092 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1093 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1094 	 */
1095 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1096 		mutex_lock(peer_device->device->state_mutex);
1097 
1098 	set_bit(STATE_SENT, &connection->flags);
1099 
1100 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 		mutex_unlock(peer_device->device->state_mutex);
1102 
1103 	rcu_read_lock();
1104 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1105 		struct drbd_device *device = peer_device->device;
1106 		kref_get(&device->kref);
1107 		rcu_read_unlock();
1108 
1109 		if (discard_my_data)
1110 			set_bit(DISCARD_MY_DATA, &device->flags);
1111 		else
1112 			clear_bit(DISCARD_MY_DATA, &device->flags);
1113 
1114 		drbd_connected(peer_device);
1115 		kref_put(&device->kref, drbd_destroy_device);
1116 		rcu_read_lock();
1117 	}
1118 	rcu_read_unlock();
1119 
1120 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1121 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1122 		clear_bit(STATE_SENT, &connection->flags);
1123 		return 0;
1124 	}
1125 
1126 	drbd_thread_start(&connection->ack_receiver);
1127 	/* opencoded create_singlethread_workqueue(),
1128 	 * to be able to use format string arguments */
1129 	connection->ack_sender =
1130 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1131 	if (!connection->ack_sender) {
1132 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1133 		return 0;
1134 	}
1135 
1136 	mutex_lock(&connection->resource->conf_update);
1137 	/* The discard_my_data flag is a single-shot modifier to the next
1138 	 * connection attempt, the handshake of which is now well underway.
1139 	 * No need for rcu style copying of the whole struct
1140 	 * just to clear a single value. */
1141 	connection->net_conf->discard_my_data = 0;
1142 	mutex_unlock(&connection->resource->conf_update);
1143 
1144 	return h;
1145 
1146 out_release_sockets:
1147 	if (ad.s_listen)
1148 		sock_release(ad.s_listen);
1149 	if (sock.socket)
1150 		sock_release(sock.socket);
1151 	if (msock.socket)
1152 		sock_release(msock.socket);
1153 	return -1;
1154 }
1155 
1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1157 {
1158 	unsigned int header_size = drbd_header_size(connection);
1159 
1160 	if (header_size == sizeof(struct p_header100) &&
1161 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162 		struct p_header100 *h = header;
1163 		if (h->pad != 0) {
1164 			drbd_err(connection, "Header padding is not zero\n");
1165 			return -EINVAL;
1166 		}
1167 		pi->vnr = be16_to_cpu(h->volume);
1168 		pi->cmd = be16_to_cpu(h->command);
1169 		pi->size = be32_to_cpu(h->length);
1170 	} else if (header_size == sizeof(struct p_header95) &&
1171 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172 		struct p_header95 *h = header;
1173 		pi->cmd = be16_to_cpu(h->command);
1174 		pi->size = be32_to_cpu(h->length);
1175 		pi->vnr = 0;
1176 	} else if (header_size == sizeof(struct p_header80) &&
1177 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178 		struct p_header80 *h = header;
1179 		pi->cmd = be16_to_cpu(h->command);
1180 		pi->size = be16_to_cpu(h->length);
1181 		pi->vnr = 0;
1182 	} else {
1183 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184 			 be32_to_cpu(*(__be32 *)header),
1185 			 connection->agreed_pro_version);
1186 		return -EINVAL;
1187 	}
1188 	pi->data = header + header_size;
1189 	return 0;
1190 }
1191 
1192 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1193 {
1194 	void *buffer = connection->data.rbuf;
1195 	int err;
1196 
1197 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1198 	if (err)
1199 		return err;
1200 
1201 	err = decode_header(connection, buffer, pi);
1202 	connection->last_received = jiffies;
1203 
1204 	return err;
1205 }
1206 
1207 static void drbd_flush(struct drbd_connection *connection)
1208 {
1209 	int rv;
1210 	struct drbd_peer_device *peer_device;
1211 	int vnr;
1212 
1213 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1214 		rcu_read_lock();
1215 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1216 			struct drbd_device *device = peer_device->device;
1217 
1218 			if (!get_ldev(device))
1219 				continue;
1220 			kref_get(&device->kref);
1221 			rcu_read_unlock();
1222 
1223 			/* Right now, we have only this one synchronous code path
1224 			 * for flushes between request epochs.
1225 			 * We may want to make those asynchronous,
1226 			 * or at least parallelize the flushes to the volume devices.
1227 			 */
1228 			device->flush_jif = jiffies;
1229 			set_bit(FLUSH_PENDING, &device->flags);
1230 			rv = blkdev_issue_flush(device->ldev->backing_bdev,
1231 					GFP_NOIO, NULL);
1232 			clear_bit(FLUSH_PENDING, &device->flags);
1233 			if (rv) {
1234 				drbd_info(device, "local disk flush failed with status %d\n", rv);
1235 				/* would rather check on EOPNOTSUPP, but that is not reliable.
1236 				 * don't try again for ANY return value != 0
1237 				 * if (rv == -EOPNOTSUPP) */
1238 				drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1239 			}
1240 			put_ldev(device);
1241 			kref_put(&device->kref, drbd_destroy_device);
1242 
1243 			rcu_read_lock();
1244 			if (rv)
1245 				break;
1246 		}
1247 		rcu_read_unlock();
1248 	}
1249 }
1250 
1251 /**
1252  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1253  * @device:	DRBD device.
1254  * @epoch:	Epoch object.
1255  * @ev:		Epoch event.
1256  */
1257 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1258 					       struct drbd_epoch *epoch,
1259 					       enum epoch_event ev)
1260 {
1261 	int epoch_size;
1262 	struct drbd_epoch *next_epoch;
1263 	enum finish_epoch rv = FE_STILL_LIVE;
1264 
1265 	spin_lock(&connection->epoch_lock);
1266 	do {
1267 		next_epoch = NULL;
1268 
1269 		epoch_size = atomic_read(&epoch->epoch_size);
1270 
1271 		switch (ev & ~EV_CLEANUP) {
1272 		case EV_PUT:
1273 			atomic_dec(&epoch->active);
1274 			break;
1275 		case EV_GOT_BARRIER_NR:
1276 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1277 			break;
1278 		case EV_BECAME_LAST:
1279 			/* nothing to do*/
1280 			break;
1281 		}
1282 
1283 		if (epoch_size != 0 &&
1284 		    atomic_read(&epoch->active) == 0 &&
1285 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1286 			if (!(ev & EV_CLEANUP)) {
1287 				spin_unlock(&connection->epoch_lock);
1288 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1289 				spin_lock(&connection->epoch_lock);
1290 			}
1291 #if 0
1292 			/* FIXME: dec unacked on connection, once we have
1293 			 * something to count pending connection packets in. */
1294 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1295 				dec_unacked(epoch->connection);
1296 #endif
1297 
1298 			if (connection->current_epoch != epoch) {
1299 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1300 				list_del(&epoch->list);
1301 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1302 				connection->epochs--;
1303 				kfree(epoch);
1304 
1305 				if (rv == FE_STILL_LIVE)
1306 					rv = FE_DESTROYED;
1307 			} else {
1308 				epoch->flags = 0;
1309 				atomic_set(&epoch->epoch_size, 0);
1310 				/* atomic_set(&epoch->active, 0); is already zero */
1311 				if (rv == FE_STILL_LIVE)
1312 					rv = FE_RECYCLED;
1313 			}
1314 		}
1315 
1316 		if (!next_epoch)
1317 			break;
1318 
1319 		epoch = next_epoch;
1320 	} while (1);
1321 
1322 	spin_unlock(&connection->epoch_lock);
1323 
1324 	return rv;
1325 }
1326 
1327 static enum write_ordering_e
1328 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1329 {
1330 	struct disk_conf *dc;
1331 
1332 	dc = rcu_dereference(bdev->disk_conf);
1333 
1334 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1335 		wo = WO_DRAIN_IO;
1336 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1337 		wo = WO_NONE;
1338 
1339 	return wo;
1340 }
1341 
1342 /**
1343  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1344  * @connection:	DRBD connection.
1345  * @wo:		Write ordering method to try.
1346  */
1347 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1348 			      enum write_ordering_e wo)
1349 {
1350 	struct drbd_device *device;
1351 	enum write_ordering_e pwo;
1352 	int vnr;
1353 	static char *write_ordering_str[] = {
1354 		[WO_NONE] = "none",
1355 		[WO_DRAIN_IO] = "drain",
1356 		[WO_BDEV_FLUSH] = "flush",
1357 	};
1358 
1359 	pwo = resource->write_ordering;
1360 	if (wo != WO_BDEV_FLUSH)
1361 		wo = min(pwo, wo);
1362 	rcu_read_lock();
1363 	idr_for_each_entry(&resource->devices, device, vnr) {
1364 		if (get_ldev(device)) {
1365 			wo = max_allowed_wo(device->ldev, wo);
1366 			if (device->ldev == bdev)
1367 				bdev = NULL;
1368 			put_ldev(device);
1369 		}
1370 	}
1371 
1372 	if (bdev)
1373 		wo = max_allowed_wo(bdev, wo);
1374 
1375 	rcu_read_unlock();
1376 
1377 	resource->write_ordering = wo;
1378 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1379 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1380 }
1381 
1382 /**
1383  * drbd_submit_peer_request()
1384  * @device:	DRBD device.
1385  * @peer_req:	peer request
1386  * @rw:		flag field, see bio->bi_rw
1387  *
1388  * May spread the pages to multiple bios,
1389  * depending on bio_add_page restrictions.
1390  *
1391  * Returns 0 if all bios have been submitted,
1392  * -ENOMEM if we could not allocate enough bios,
1393  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1394  *  single page to an empty bio (which should never happen and likely indicates
1395  *  that the lower level IO stack is in some way broken). This has been observed
1396  *  on certain Xen deployments.
1397  */
1398 /* TODO allocate from our own bio_set. */
1399 int drbd_submit_peer_request(struct drbd_device *device,
1400 			     struct drbd_peer_request *peer_req,
1401 			     const unsigned rw, const int fault_type)
1402 {
1403 	struct bio *bios = NULL;
1404 	struct bio *bio;
1405 	struct page *page = peer_req->pages;
1406 	sector_t sector = peer_req->i.sector;
1407 	unsigned data_size = peer_req->i.size;
1408 	unsigned n_bios = 0;
1409 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1410 	int err = -ENOMEM;
1411 
1412 	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1413 		/* wait for all pending IO completions, before we start
1414 		 * zeroing things out. */
1415 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1416 		/* add it to the active list now,
1417 		 * so we can find it to present it in debugfs */
1418 		peer_req->submit_jif = jiffies;
1419 		peer_req->flags |= EE_SUBMITTED;
1420 		spin_lock_irq(&device->resource->req_lock);
1421 		list_add_tail(&peer_req->w.list, &device->active_ee);
1422 		spin_unlock_irq(&device->resource->req_lock);
1423 		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1424 			sector, data_size >> 9, GFP_NOIO, false))
1425 			peer_req->flags |= EE_WAS_ERROR;
1426 		drbd_endio_write_sec_final(peer_req);
1427 		return 0;
1428 	}
1429 
1430 	/* Discards don't have any payload.
1431 	 * But the scsi layer still expects a bio_vec it can use internally,
1432 	 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1433 	if (peer_req->flags & EE_IS_TRIM)
1434 		nr_pages = 1;
1435 
1436 	/* In most cases, we will only need one bio.  But in case the lower
1437 	 * level restrictions happen to be different at this offset on this
1438 	 * side than those of the sending peer, we may need to submit the
1439 	 * request in more than one bio.
1440 	 *
1441 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1442 	 * generated bio, but a bio allocated on behalf of the peer.
1443 	 */
1444 next_bio:
1445 	bio = bio_alloc(GFP_NOIO, nr_pages);
1446 	if (!bio) {
1447 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1448 		goto fail;
1449 	}
1450 	/* > peer_req->i.sector, unless this is the first bio */
1451 	bio->bi_iter.bi_sector = sector;
1452 	bio->bi_bdev = device->ldev->backing_bdev;
1453 	bio->bi_rw = rw;
1454 	bio->bi_private = peer_req;
1455 	bio->bi_end_io = drbd_peer_request_endio;
1456 
1457 	bio->bi_next = bios;
1458 	bios = bio;
1459 	++n_bios;
1460 
1461 	if (rw & REQ_DISCARD) {
1462 		bio->bi_iter.bi_size = data_size;
1463 		goto submit;
1464 	}
1465 
1466 	page_chain_for_each(page) {
1467 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1468 		if (!bio_add_page(bio, page, len, 0)) {
1469 			/* A single page must always be possible!
1470 			 * But in case it fails anyways,
1471 			 * we deal with it, and complain (below). */
1472 			if (bio->bi_vcnt == 0) {
1473 				drbd_err(device,
1474 					"bio_add_page failed for len=%u, "
1475 					"bi_vcnt=0 (bi_sector=%llu)\n",
1476 					len, (uint64_t)bio->bi_iter.bi_sector);
1477 				err = -ENOSPC;
1478 				goto fail;
1479 			}
1480 			goto next_bio;
1481 		}
1482 		data_size -= len;
1483 		sector += len >> 9;
1484 		--nr_pages;
1485 	}
1486 	D_ASSERT(device, data_size == 0);
1487 submit:
1488 	D_ASSERT(device, page == NULL);
1489 
1490 	atomic_set(&peer_req->pending_bios, n_bios);
1491 	/* for debugfs: update timestamp, mark as submitted */
1492 	peer_req->submit_jif = jiffies;
1493 	peer_req->flags |= EE_SUBMITTED;
1494 	do {
1495 		bio = bios;
1496 		bios = bios->bi_next;
1497 		bio->bi_next = NULL;
1498 
1499 		drbd_generic_make_request(device, fault_type, bio);
1500 	} while (bios);
1501 	return 0;
1502 
1503 fail:
1504 	while (bios) {
1505 		bio = bios;
1506 		bios = bios->bi_next;
1507 		bio_put(bio);
1508 	}
1509 	return err;
1510 }
1511 
1512 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1513 					     struct drbd_peer_request *peer_req)
1514 {
1515 	struct drbd_interval *i = &peer_req->i;
1516 
1517 	drbd_remove_interval(&device->write_requests, i);
1518 	drbd_clear_interval(i);
1519 
1520 	/* Wake up any processes waiting for this peer request to complete.  */
1521 	if (i->waiting)
1522 		wake_up(&device->misc_wait);
1523 }
1524 
1525 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1526 {
1527 	struct drbd_peer_device *peer_device;
1528 	int vnr;
1529 
1530 	rcu_read_lock();
1531 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1532 		struct drbd_device *device = peer_device->device;
1533 
1534 		kref_get(&device->kref);
1535 		rcu_read_unlock();
1536 		drbd_wait_ee_list_empty(device, &device->active_ee);
1537 		kref_put(&device->kref, drbd_destroy_device);
1538 		rcu_read_lock();
1539 	}
1540 	rcu_read_unlock();
1541 }
1542 
1543 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1544 {
1545 	int rv;
1546 	struct p_barrier *p = pi->data;
1547 	struct drbd_epoch *epoch;
1548 
1549 	/* FIXME these are unacked on connection,
1550 	 * not a specific (peer)device.
1551 	 */
1552 	connection->current_epoch->barrier_nr = p->barrier;
1553 	connection->current_epoch->connection = connection;
1554 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1555 
1556 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1557 	 * the activity log, which means it would not be resynced in case the
1558 	 * R_PRIMARY crashes now.
1559 	 * Therefore we must send the barrier_ack after the barrier request was
1560 	 * completed. */
1561 	switch (connection->resource->write_ordering) {
1562 	case WO_NONE:
1563 		if (rv == FE_RECYCLED)
1564 			return 0;
1565 
1566 		/* receiver context, in the writeout path of the other node.
1567 		 * avoid potential distributed deadlock */
1568 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1569 		if (epoch)
1570 			break;
1571 		else
1572 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1573 			/* Fall through */
1574 
1575 	case WO_BDEV_FLUSH:
1576 	case WO_DRAIN_IO:
1577 		conn_wait_active_ee_empty(connection);
1578 		drbd_flush(connection);
1579 
1580 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1581 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1582 			if (epoch)
1583 				break;
1584 		}
1585 
1586 		return 0;
1587 	default:
1588 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1589 			 connection->resource->write_ordering);
1590 		return -EIO;
1591 	}
1592 
1593 	epoch->flags = 0;
1594 	atomic_set(&epoch->epoch_size, 0);
1595 	atomic_set(&epoch->active, 0);
1596 
1597 	spin_lock(&connection->epoch_lock);
1598 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1599 		list_add(&epoch->list, &connection->current_epoch->list);
1600 		connection->current_epoch = epoch;
1601 		connection->epochs++;
1602 	} else {
1603 		/* The current_epoch got recycled while we allocated this one... */
1604 		kfree(epoch);
1605 	}
1606 	spin_unlock(&connection->epoch_lock);
1607 
1608 	return 0;
1609 }
1610 
1611 /* used from receive_RSDataReply (recv_resync_read)
1612  * and from receive_Data */
1613 static struct drbd_peer_request *
1614 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1615 	      struct packet_info *pi) __must_hold(local)
1616 {
1617 	struct drbd_device *device = peer_device->device;
1618 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1619 	struct drbd_peer_request *peer_req;
1620 	struct page *page;
1621 	int digest_size, err;
1622 	unsigned int data_size = pi->size, ds;
1623 	void *dig_in = peer_device->connection->int_dig_in;
1624 	void *dig_vv = peer_device->connection->int_dig_vv;
1625 	unsigned long *data;
1626 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1627 
1628 	digest_size = 0;
1629 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1630 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1631 		/*
1632 		 * FIXME: Receive the incoming digest into the receive buffer
1633 		 *	  here, together with its struct p_data?
1634 		 */
1635 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1636 		if (err)
1637 			return NULL;
1638 		data_size -= digest_size;
1639 	}
1640 
1641 	if (trim) {
1642 		D_ASSERT(peer_device, data_size == 0);
1643 		data_size = be32_to_cpu(trim->size);
1644 	}
1645 
1646 	if (!expect(IS_ALIGNED(data_size, 512)))
1647 		return NULL;
1648 	/* prepare for larger trim requests. */
1649 	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1650 		return NULL;
1651 
1652 	/* even though we trust out peer,
1653 	 * we sometimes have to double check. */
1654 	if (sector + (data_size>>9) > capacity) {
1655 		drbd_err(device, "request from peer beyond end of local disk: "
1656 			"capacity: %llus < sector: %llus + size: %u\n",
1657 			(unsigned long long)capacity,
1658 			(unsigned long long)sector, data_size);
1659 		return NULL;
1660 	}
1661 
1662 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1663 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1664 	 * which in turn might block on the other node at this very place.  */
1665 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1666 	if (!peer_req)
1667 		return NULL;
1668 
1669 	peer_req->flags |= EE_WRITE;
1670 	if (trim)
1671 		return peer_req;
1672 
1673 	ds = data_size;
1674 	page = peer_req->pages;
1675 	page_chain_for_each(page) {
1676 		unsigned len = min_t(int, ds, PAGE_SIZE);
1677 		data = kmap(page);
1678 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1679 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1680 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1681 			data[0] = data[0] ^ (unsigned long)-1;
1682 		}
1683 		kunmap(page);
1684 		if (err) {
1685 			drbd_free_peer_req(device, peer_req);
1686 			return NULL;
1687 		}
1688 		ds -= len;
1689 	}
1690 
1691 	if (digest_size) {
1692 		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1693 		if (memcmp(dig_in, dig_vv, digest_size)) {
1694 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1695 				(unsigned long long)sector, data_size);
1696 			drbd_free_peer_req(device, peer_req);
1697 			return NULL;
1698 		}
1699 	}
1700 	device->recv_cnt += data_size >> 9;
1701 	return peer_req;
1702 }
1703 
1704 /* drbd_drain_block() just takes a data block
1705  * out of the socket input buffer, and discards it.
1706  */
1707 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1708 {
1709 	struct page *page;
1710 	int err = 0;
1711 	void *data;
1712 
1713 	if (!data_size)
1714 		return 0;
1715 
1716 	page = drbd_alloc_pages(peer_device, 1, 1);
1717 
1718 	data = kmap(page);
1719 	while (data_size) {
1720 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1721 
1722 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1723 		if (err)
1724 			break;
1725 		data_size -= len;
1726 	}
1727 	kunmap(page);
1728 	drbd_free_pages(peer_device->device, page, 0);
1729 	return err;
1730 }
1731 
1732 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1733 			   sector_t sector, int data_size)
1734 {
1735 	struct bio_vec bvec;
1736 	struct bvec_iter iter;
1737 	struct bio *bio;
1738 	int digest_size, err, expect;
1739 	void *dig_in = peer_device->connection->int_dig_in;
1740 	void *dig_vv = peer_device->connection->int_dig_vv;
1741 
1742 	digest_size = 0;
1743 	if (peer_device->connection->peer_integrity_tfm) {
1744 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1745 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1746 		if (err)
1747 			return err;
1748 		data_size -= digest_size;
1749 	}
1750 
1751 	/* optimistically update recv_cnt.  if receiving fails below,
1752 	 * we disconnect anyways, and counters will be reset. */
1753 	peer_device->device->recv_cnt += data_size>>9;
1754 
1755 	bio = req->master_bio;
1756 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1757 
1758 	bio_for_each_segment(bvec, bio, iter) {
1759 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1760 		expect = min_t(int, data_size, bvec.bv_len);
1761 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1762 		kunmap(bvec.bv_page);
1763 		if (err)
1764 			return err;
1765 		data_size -= expect;
1766 	}
1767 
1768 	if (digest_size) {
1769 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1770 		if (memcmp(dig_in, dig_vv, digest_size)) {
1771 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1772 			return -EINVAL;
1773 		}
1774 	}
1775 
1776 	D_ASSERT(peer_device->device, data_size == 0);
1777 	return 0;
1778 }
1779 
1780 /*
1781  * e_end_resync_block() is called in ack_sender context via
1782  * drbd_finish_peer_reqs().
1783  */
1784 static int e_end_resync_block(struct drbd_work *w, int unused)
1785 {
1786 	struct drbd_peer_request *peer_req =
1787 		container_of(w, struct drbd_peer_request, w);
1788 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1789 	struct drbd_device *device = peer_device->device;
1790 	sector_t sector = peer_req->i.sector;
1791 	int err;
1792 
1793 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1794 
1795 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1796 		drbd_set_in_sync(device, sector, peer_req->i.size);
1797 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1798 	} else {
1799 		/* Record failure to sync */
1800 		drbd_rs_failed_io(device, sector, peer_req->i.size);
1801 
1802 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1803 	}
1804 	dec_unacked(device);
1805 
1806 	return err;
1807 }
1808 
1809 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1810 			    struct packet_info *pi) __releases(local)
1811 {
1812 	struct drbd_device *device = peer_device->device;
1813 	struct drbd_peer_request *peer_req;
1814 
1815 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1816 	if (!peer_req)
1817 		goto fail;
1818 
1819 	dec_rs_pending(device);
1820 
1821 	inc_unacked(device);
1822 	/* corresponding dec_unacked() in e_end_resync_block()
1823 	 * respective _drbd_clear_done_ee */
1824 
1825 	peer_req->w.cb = e_end_resync_block;
1826 	peer_req->submit_jif = jiffies;
1827 
1828 	spin_lock_irq(&device->resource->req_lock);
1829 	list_add_tail(&peer_req->w.list, &device->sync_ee);
1830 	spin_unlock_irq(&device->resource->req_lock);
1831 
1832 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1833 	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1834 		return 0;
1835 
1836 	/* don't care for the reason here */
1837 	drbd_err(device, "submit failed, triggering re-connect\n");
1838 	spin_lock_irq(&device->resource->req_lock);
1839 	list_del(&peer_req->w.list);
1840 	spin_unlock_irq(&device->resource->req_lock);
1841 
1842 	drbd_free_peer_req(device, peer_req);
1843 fail:
1844 	put_ldev(device);
1845 	return -EIO;
1846 }
1847 
1848 static struct drbd_request *
1849 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1850 	     sector_t sector, bool missing_ok, const char *func)
1851 {
1852 	struct drbd_request *req;
1853 
1854 	/* Request object according to our peer */
1855 	req = (struct drbd_request *)(unsigned long)id;
1856 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1857 		return req;
1858 	if (!missing_ok) {
1859 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1860 			(unsigned long)id, (unsigned long long)sector);
1861 	}
1862 	return NULL;
1863 }
1864 
1865 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1866 {
1867 	struct drbd_peer_device *peer_device;
1868 	struct drbd_device *device;
1869 	struct drbd_request *req;
1870 	sector_t sector;
1871 	int err;
1872 	struct p_data *p = pi->data;
1873 
1874 	peer_device = conn_peer_device(connection, pi->vnr);
1875 	if (!peer_device)
1876 		return -EIO;
1877 	device = peer_device->device;
1878 
1879 	sector = be64_to_cpu(p->sector);
1880 
1881 	spin_lock_irq(&device->resource->req_lock);
1882 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1883 	spin_unlock_irq(&device->resource->req_lock);
1884 	if (unlikely(!req))
1885 		return -EIO;
1886 
1887 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1888 	 * special casing it there for the various failure cases.
1889 	 * still no race with drbd_fail_pending_reads */
1890 	err = recv_dless_read(peer_device, req, sector, pi->size);
1891 	if (!err)
1892 		req_mod(req, DATA_RECEIVED);
1893 	/* else: nothing. handled from drbd_disconnect...
1894 	 * I don't think we may complete this just yet
1895 	 * in case we are "on-disconnect: freeze" */
1896 
1897 	return err;
1898 }
1899 
1900 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1901 {
1902 	struct drbd_peer_device *peer_device;
1903 	struct drbd_device *device;
1904 	sector_t sector;
1905 	int err;
1906 	struct p_data *p = pi->data;
1907 
1908 	peer_device = conn_peer_device(connection, pi->vnr);
1909 	if (!peer_device)
1910 		return -EIO;
1911 	device = peer_device->device;
1912 
1913 	sector = be64_to_cpu(p->sector);
1914 	D_ASSERT(device, p->block_id == ID_SYNCER);
1915 
1916 	if (get_ldev(device)) {
1917 		/* data is submitted to disk within recv_resync_read.
1918 		 * corresponding put_ldev done below on error,
1919 		 * or in drbd_peer_request_endio. */
1920 		err = recv_resync_read(peer_device, sector, pi);
1921 	} else {
1922 		if (__ratelimit(&drbd_ratelimit_state))
1923 			drbd_err(device, "Can not write resync data to local disk.\n");
1924 
1925 		err = drbd_drain_block(peer_device, pi->size);
1926 
1927 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1928 	}
1929 
1930 	atomic_add(pi->size >> 9, &device->rs_sect_in);
1931 
1932 	return err;
1933 }
1934 
1935 static void restart_conflicting_writes(struct drbd_device *device,
1936 				       sector_t sector, int size)
1937 {
1938 	struct drbd_interval *i;
1939 	struct drbd_request *req;
1940 
1941 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1942 		if (!i->local)
1943 			continue;
1944 		req = container_of(i, struct drbd_request, i);
1945 		if (req->rq_state & RQ_LOCAL_PENDING ||
1946 		    !(req->rq_state & RQ_POSTPONED))
1947 			continue;
1948 		/* as it is RQ_POSTPONED, this will cause it to
1949 		 * be queued on the retry workqueue. */
1950 		__req_mod(req, CONFLICT_RESOLVED, NULL);
1951 	}
1952 }
1953 
1954 /*
1955  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
1956  */
1957 static int e_end_block(struct drbd_work *w, int cancel)
1958 {
1959 	struct drbd_peer_request *peer_req =
1960 		container_of(w, struct drbd_peer_request, w);
1961 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1962 	struct drbd_device *device = peer_device->device;
1963 	sector_t sector = peer_req->i.sector;
1964 	int err = 0, pcmd;
1965 
1966 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1967 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1968 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1969 				device->state.conn <= C_PAUSED_SYNC_T &&
1970 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1971 				P_RS_WRITE_ACK : P_WRITE_ACK;
1972 			err = drbd_send_ack(peer_device, pcmd, peer_req);
1973 			if (pcmd == P_RS_WRITE_ACK)
1974 				drbd_set_in_sync(device, sector, peer_req->i.size);
1975 		} else {
1976 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1977 			/* we expect it to be marked out of sync anyways...
1978 			 * maybe assert this?  */
1979 		}
1980 		dec_unacked(device);
1981 	}
1982 
1983 	/* we delete from the conflict detection hash _after_ we sent out the
1984 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1985 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1986 		spin_lock_irq(&device->resource->req_lock);
1987 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1988 		drbd_remove_epoch_entry_interval(device, peer_req);
1989 		if (peer_req->flags & EE_RESTART_REQUESTS)
1990 			restart_conflicting_writes(device, sector, peer_req->i.size);
1991 		spin_unlock_irq(&device->resource->req_lock);
1992 	} else
1993 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1994 
1995 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1996 
1997 	return err;
1998 }
1999 
2000 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2001 {
2002 	struct drbd_peer_request *peer_req =
2003 		container_of(w, struct drbd_peer_request, w);
2004 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2005 	int err;
2006 
2007 	err = drbd_send_ack(peer_device, ack, peer_req);
2008 	dec_unacked(peer_device->device);
2009 
2010 	return err;
2011 }
2012 
2013 static int e_send_superseded(struct drbd_work *w, int unused)
2014 {
2015 	return e_send_ack(w, P_SUPERSEDED);
2016 }
2017 
2018 static int e_send_retry_write(struct drbd_work *w, int unused)
2019 {
2020 	struct drbd_peer_request *peer_req =
2021 		container_of(w, struct drbd_peer_request, w);
2022 	struct drbd_connection *connection = peer_req->peer_device->connection;
2023 
2024 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2025 			     P_RETRY_WRITE : P_SUPERSEDED);
2026 }
2027 
2028 static bool seq_greater(u32 a, u32 b)
2029 {
2030 	/*
2031 	 * We assume 32-bit wrap-around here.
2032 	 * For 24-bit wrap-around, we would have to shift:
2033 	 *  a <<= 8; b <<= 8;
2034 	 */
2035 	return (s32)a - (s32)b > 0;
2036 }
2037 
2038 static u32 seq_max(u32 a, u32 b)
2039 {
2040 	return seq_greater(a, b) ? a : b;
2041 }
2042 
2043 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2044 {
2045 	struct drbd_device *device = peer_device->device;
2046 	unsigned int newest_peer_seq;
2047 
2048 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2049 		spin_lock(&device->peer_seq_lock);
2050 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2051 		device->peer_seq = newest_peer_seq;
2052 		spin_unlock(&device->peer_seq_lock);
2053 		/* wake up only if we actually changed device->peer_seq */
2054 		if (peer_seq == newest_peer_seq)
2055 			wake_up(&device->seq_wait);
2056 	}
2057 }
2058 
2059 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2060 {
2061 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2062 }
2063 
2064 /* maybe change sync_ee into interval trees as well? */
2065 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2066 {
2067 	struct drbd_peer_request *rs_req;
2068 	bool rv = 0;
2069 
2070 	spin_lock_irq(&device->resource->req_lock);
2071 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2072 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2073 			     rs_req->i.sector, rs_req->i.size)) {
2074 			rv = 1;
2075 			break;
2076 		}
2077 	}
2078 	spin_unlock_irq(&device->resource->req_lock);
2079 
2080 	return rv;
2081 }
2082 
2083 /* Called from receive_Data.
2084  * Synchronize packets on sock with packets on msock.
2085  *
2086  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2087  * packet traveling on msock, they are still processed in the order they have
2088  * been sent.
2089  *
2090  * Note: we don't care for Ack packets overtaking P_DATA packets.
2091  *
2092  * In case packet_seq is larger than device->peer_seq number, there are
2093  * outstanding packets on the msock. We wait for them to arrive.
2094  * In case we are the logically next packet, we update device->peer_seq
2095  * ourselves. Correctly handles 32bit wrap around.
2096  *
2097  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2098  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2099  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2100  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2101  *
2102  * returns 0 if we may process the packet,
2103  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2104 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2105 {
2106 	struct drbd_device *device = peer_device->device;
2107 	DEFINE_WAIT(wait);
2108 	long timeout;
2109 	int ret = 0, tp;
2110 
2111 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2112 		return 0;
2113 
2114 	spin_lock(&device->peer_seq_lock);
2115 	for (;;) {
2116 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2117 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2118 			break;
2119 		}
2120 
2121 		if (signal_pending(current)) {
2122 			ret = -ERESTARTSYS;
2123 			break;
2124 		}
2125 
2126 		rcu_read_lock();
2127 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2128 		rcu_read_unlock();
2129 
2130 		if (!tp)
2131 			break;
2132 
2133 		/* Only need to wait if two_primaries is enabled */
2134 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2135 		spin_unlock(&device->peer_seq_lock);
2136 		rcu_read_lock();
2137 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2138 		rcu_read_unlock();
2139 		timeout = schedule_timeout(timeout);
2140 		spin_lock(&device->peer_seq_lock);
2141 		if (!timeout) {
2142 			ret = -ETIMEDOUT;
2143 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2144 			break;
2145 		}
2146 	}
2147 	spin_unlock(&device->peer_seq_lock);
2148 	finish_wait(&device->seq_wait, &wait);
2149 	return ret;
2150 }
2151 
2152 /* see also bio_flags_to_wire()
2153  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2154  * flags and back. We may replicate to other kernel versions. */
2155 static unsigned long wire_flags_to_bio(u32 dpf)
2156 {
2157 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2158 		(dpf & DP_FUA ? REQ_FUA : 0) |
2159 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2160 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2161 }
2162 
2163 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2164 				    unsigned int size)
2165 {
2166 	struct drbd_interval *i;
2167 
2168     repeat:
2169 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2170 		struct drbd_request *req;
2171 		struct bio_and_error m;
2172 
2173 		if (!i->local)
2174 			continue;
2175 		req = container_of(i, struct drbd_request, i);
2176 		if (!(req->rq_state & RQ_POSTPONED))
2177 			continue;
2178 		req->rq_state &= ~RQ_POSTPONED;
2179 		__req_mod(req, NEG_ACKED, &m);
2180 		spin_unlock_irq(&device->resource->req_lock);
2181 		if (m.bio)
2182 			complete_master_bio(device, &m);
2183 		spin_lock_irq(&device->resource->req_lock);
2184 		goto repeat;
2185 	}
2186 }
2187 
2188 static int handle_write_conflicts(struct drbd_device *device,
2189 				  struct drbd_peer_request *peer_req)
2190 {
2191 	struct drbd_connection *connection = peer_req->peer_device->connection;
2192 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2193 	sector_t sector = peer_req->i.sector;
2194 	const unsigned int size = peer_req->i.size;
2195 	struct drbd_interval *i;
2196 	bool equal;
2197 	int err;
2198 
2199 	/*
2200 	 * Inserting the peer request into the write_requests tree will prevent
2201 	 * new conflicting local requests from being added.
2202 	 */
2203 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2204 
2205     repeat:
2206 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2207 		if (i == &peer_req->i)
2208 			continue;
2209 		if (i->completed)
2210 			continue;
2211 
2212 		if (!i->local) {
2213 			/*
2214 			 * Our peer has sent a conflicting remote request; this
2215 			 * should not happen in a two-node setup.  Wait for the
2216 			 * earlier peer request to complete.
2217 			 */
2218 			err = drbd_wait_misc(device, i);
2219 			if (err)
2220 				goto out;
2221 			goto repeat;
2222 		}
2223 
2224 		equal = i->sector == sector && i->size == size;
2225 		if (resolve_conflicts) {
2226 			/*
2227 			 * If the peer request is fully contained within the
2228 			 * overlapping request, it can be considered overwritten
2229 			 * and thus superseded; otherwise, it will be retried
2230 			 * once all overlapping requests have completed.
2231 			 */
2232 			bool superseded = i->sector <= sector && i->sector +
2233 				       (i->size >> 9) >= sector + (size >> 9);
2234 
2235 			if (!equal)
2236 				drbd_alert(device, "Concurrent writes detected: "
2237 					       "local=%llus +%u, remote=%llus +%u, "
2238 					       "assuming %s came first\n",
2239 					  (unsigned long long)i->sector, i->size,
2240 					  (unsigned long long)sector, size,
2241 					  superseded ? "local" : "remote");
2242 
2243 			peer_req->w.cb = superseded ? e_send_superseded :
2244 						   e_send_retry_write;
2245 			list_add_tail(&peer_req->w.list, &device->done_ee);
2246 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2247 
2248 			err = -ENOENT;
2249 			goto out;
2250 		} else {
2251 			struct drbd_request *req =
2252 				container_of(i, struct drbd_request, i);
2253 
2254 			if (!equal)
2255 				drbd_alert(device, "Concurrent writes detected: "
2256 					       "local=%llus +%u, remote=%llus +%u\n",
2257 					  (unsigned long long)i->sector, i->size,
2258 					  (unsigned long long)sector, size);
2259 
2260 			if (req->rq_state & RQ_LOCAL_PENDING ||
2261 			    !(req->rq_state & RQ_POSTPONED)) {
2262 				/*
2263 				 * Wait for the node with the discard flag to
2264 				 * decide if this request has been superseded
2265 				 * or needs to be retried.
2266 				 * Requests that have been superseded will
2267 				 * disappear from the write_requests tree.
2268 				 *
2269 				 * In addition, wait for the conflicting
2270 				 * request to finish locally before submitting
2271 				 * the conflicting peer request.
2272 				 */
2273 				err = drbd_wait_misc(device, &req->i);
2274 				if (err) {
2275 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2276 					fail_postponed_requests(device, sector, size);
2277 					goto out;
2278 				}
2279 				goto repeat;
2280 			}
2281 			/*
2282 			 * Remember to restart the conflicting requests after
2283 			 * the new peer request has completed.
2284 			 */
2285 			peer_req->flags |= EE_RESTART_REQUESTS;
2286 		}
2287 	}
2288 	err = 0;
2289 
2290     out:
2291 	if (err)
2292 		drbd_remove_epoch_entry_interval(device, peer_req);
2293 	return err;
2294 }
2295 
2296 /* mirrored write */
2297 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2298 {
2299 	struct drbd_peer_device *peer_device;
2300 	struct drbd_device *device;
2301 	struct net_conf *nc;
2302 	sector_t sector;
2303 	struct drbd_peer_request *peer_req;
2304 	struct p_data *p = pi->data;
2305 	u32 peer_seq = be32_to_cpu(p->seq_num);
2306 	int rw = WRITE;
2307 	u32 dp_flags;
2308 	int err, tp;
2309 
2310 	peer_device = conn_peer_device(connection, pi->vnr);
2311 	if (!peer_device)
2312 		return -EIO;
2313 	device = peer_device->device;
2314 
2315 	if (!get_ldev(device)) {
2316 		int err2;
2317 
2318 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2319 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2320 		atomic_inc(&connection->current_epoch->epoch_size);
2321 		err2 = drbd_drain_block(peer_device, pi->size);
2322 		if (!err)
2323 			err = err2;
2324 		return err;
2325 	}
2326 
2327 	/*
2328 	 * Corresponding put_ldev done either below (on various errors), or in
2329 	 * drbd_peer_request_endio, if we successfully submit the data at the
2330 	 * end of this function.
2331 	 */
2332 
2333 	sector = be64_to_cpu(p->sector);
2334 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2335 	if (!peer_req) {
2336 		put_ldev(device);
2337 		return -EIO;
2338 	}
2339 
2340 	peer_req->w.cb = e_end_block;
2341 	peer_req->submit_jif = jiffies;
2342 	peer_req->flags |= EE_APPLICATION;
2343 
2344 	dp_flags = be32_to_cpu(p->dp_flags);
2345 	rw |= wire_flags_to_bio(dp_flags);
2346 	if (pi->cmd == P_TRIM) {
2347 		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2348 		peer_req->flags |= EE_IS_TRIM;
2349 		if (!blk_queue_discard(q))
2350 			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2351 		D_ASSERT(peer_device, peer_req->i.size > 0);
2352 		D_ASSERT(peer_device, rw & REQ_DISCARD);
2353 		D_ASSERT(peer_device, peer_req->pages == NULL);
2354 	} else if (peer_req->pages == NULL) {
2355 		D_ASSERT(device, peer_req->i.size == 0);
2356 		D_ASSERT(device, dp_flags & DP_FLUSH);
2357 	}
2358 
2359 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2360 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2361 
2362 	spin_lock(&connection->epoch_lock);
2363 	peer_req->epoch = connection->current_epoch;
2364 	atomic_inc(&peer_req->epoch->epoch_size);
2365 	atomic_inc(&peer_req->epoch->active);
2366 	spin_unlock(&connection->epoch_lock);
2367 
2368 	rcu_read_lock();
2369 	nc = rcu_dereference(peer_device->connection->net_conf);
2370 	tp = nc->two_primaries;
2371 	if (peer_device->connection->agreed_pro_version < 100) {
2372 		switch (nc->wire_protocol) {
2373 		case DRBD_PROT_C:
2374 			dp_flags |= DP_SEND_WRITE_ACK;
2375 			break;
2376 		case DRBD_PROT_B:
2377 			dp_flags |= DP_SEND_RECEIVE_ACK;
2378 			break;
2379 		}
2380 	}
2381 	rcu_read_unlock();
2382 
2383 	if (dp_flags & DP_SEND_WRITE_ACK) {
2384 		peer_req->flags |= EE_SEND_WRITE_ACK;
2385 		inc_unacked(device);
2386 		/* corresponding dec_unacked() in e_end_block()
2387 		 * respective _drbd_clear_done_ee */
2388 	}
2389 
2390 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2391 		/* I really don't like it that the receiver thread
2392 		 * sends on the msock, but anyways */
2393 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2394 	}
2395 
2396 	if (tp) {
2397 		/* two primaries implies protocol C */
2398 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2399 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2400 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2401 		if (err)
2402 			goto out_interrupted;
2403 		spin_lock_irq(&device->resource->req_lock);
2404 		err = handle_write_conflicts(device, peer_req);
2405 		if (err) {
2406 			spin_unlock_irq(&device->resource->req_lock);
2407 			if (err == -ENOENT) {
2408 				put_ldev(device);
2409 				return 0;
2410 			}
2411 			goto out_interrupted;
2412 		}
2413 	} else {
2414 		update_peer_seq(peer_device, peer_seq);
2415 		spin_lock_irq(&device->resource->req_lock);
2416 	}
2417 	/* if we use the zeroout fallback code, we process synchronously
2418 	 * and we wait for all pending requests, respectively wait for
2419 	 * active_ee to become empty in drbd_submit_peer_request();
2420 	 * better not add ourselves here. */
2421 	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2422 		list_add_tail(&peer_req->w.list, &device->active_ee);
2423 	spin_unlock_irq(&device->resource->req_lock);
2424 
2425 	if (device->state.conn == C_SYNC_TARGET)
2426 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2427 
2428 	if (device->state.pdsk < D_INCONSISTENT) {
2429 		/* In case we have the only disk of the cluster, */
2430 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2431 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2432 		drbd_al_begin_io(device, &peer_req->i);
2433 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2434 	}
2435 
2436 	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2437 	if (!err)
2438 		return 0;
2439 
2440 	/* don't care for the reason here */
2441 	drbd_err(device, "submit failed, triggering re-connect\n");
2442 	spin_lock_irq(&device->resource->req_lock);
2443 	list_del(&peer_req->w.list);
2444 	drbd_remove_epoch_entry_interval(device, peer_req);
2445 	spin_unlock_irq(&device->resource->req_lock);
2446 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2447 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2448 		drbd_al_complete_io(device, &peer_req->i);
2449 	}
2450 
2451 out_interrupted:
2452 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2453 	put_ldev(device);
2454 	drbd_free_peer_req(device, peer_req);
2455 	return err;
2456 }
2457 
2458 /* We may throttle resync, if the lower device seems to be busy,
2459  * and current sync rate is above c_min_rate.
2460  *
2461  * To decide whether or not the lower device is busy, we use a scheme similar
2462  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2463  * (more than 64 sectors) of activity we cannot account for with our own resync
2464  * activity, it obviously is "busy".
2465  *
2466  * The current sync rate used here uses only the most recent two step marks,
2467  * to have a short time average so we can react faster.
2468  */
2469 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2470 		bool throttle_if_app_is_waiting)
2471 {
2472 	struct lc_element *tmp;
2473 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2474 
2475 	if (!throttle || throttle_if_app_is_waiting)
2476 		return throttle;
2477 
2478 	spin_lock_irq(&device->al_lock);
2479 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2480 	if (tmp) {
2481 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2482 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2483 			throttle = false;
2484 		/* Do not slow down if app IO is already waiting for this extent,
2485 		 * and our progress is necessary for application IO to complete. */
2486 	}
2487 	spin_unlock_irq(&device->al_lock);
2488 
2489 	return throttle;
2490 }
2491 
2492 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2493 {
2494 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2495 	unsigned long db, dt, dbdt;
2496 	unsigned int c_min_rate;
2497 	int curr_events;
2498 
2499 	rcu_read_lock();
2500 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2501 	rcu_read_unlock();
2502 
2503 	/* feature disabled? */
2504 	if (c_min_rate == 0)
2505 		return false;
2506 
2507 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2508 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2509 			atomic_read(&device->rs_sect_ev);
2510 
2511 	if (atomic_read(&device->ap_actlog_cnt)
2512 	    || curr_events - device->rs_last_events > 64) {
2513 		unsigned long rs_left;
2514 		int i;
2515 
2516 		device->rs_last_events = curr_events;
2517 
2518 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2519 		 * approx. */
2520 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2521 
2522 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2523 			rs_left = device->ov_left;
2524 		else
2525 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2526 
2527 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2528 		if (!dt)
2529 			dt++;
2530 		db = device->rs_mark_left[i] - rs_left;
2531 		dbdt = Bit2KB(db/dt);
2532 
2533 		if (dbdt > c_min_rate)
2534 			return true;
2535 	}
2536 	return false;
2537 }
2538 
2539 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2540 {
2541 	struct drbd_peer_device *peer_device;
2542 	struct drbd_device *device;
2543 	sector_t sector;
2544 	sector_t capacity;
2545 	struct drbd_peer_request *peer_req;
2546 	struct digest_info *di = NULL;
2547 	int size, verb;
2548 	unsigned int fault_type;
2549 	struct p_block_req *p =	pi->data;
2550 
2551 	peer_device = conn_peer_device(connection, pi->vnr);
2552 	if (!peer_device)
2553 		return -EIO;
2554 	device = peer_device->device;
2555 	capacity = drbd_get_capacity(device->this_bdev);
2556 
2557 	sector = be64_to_cpu(p->sector);
2558 	size   = be32_to_cpu(p->blksize);
2559 
2560 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2561 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2562 				(unsigned long long)sector, size);
2563 		return -EINVAL;
2564 	}
2565 	if (sector + (size>>9) > capacity) {
2566 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2567 				(unsigned long long)sector, size);
2568 		return -EINVAL;
2569 	}
2570 
2571 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2572 		verb = 1;
2573 		switch (pi->cmd) {
2574 		case P_DATA_REQUEST:
2575 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2576 			break;
2577 		case P_RS_DATA_REQUEST:
2578 		case P_CSUM_RS_REQUEST:
2579 		case P_OV_REQUEST:
2580 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2581 			break;
2582 		case P_OV_REPLY:
2583 			verb = 0;
2584 			dec_rs_pending(device);
2585 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2586 			break;
2587 		default:
2588 			BUG();
2589 		}
2590 		if (verb && __ratelimit(&drbd_ratelimit_state))
2591 			drbd_err(device, "Can not satisfy peer's read request, "
2592 			    "no local data.\n");
2593 
2594 		/* drain possibly payload */
2595 		return drbd_drain_block(peer_device, pi->size);
2596 	}
2597 
2598 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2599 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2600 	 * which in turn might block on the other node at this very place.  */
2601 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2602 			true /* has real payload */, GFP_NOIO);
2603 	if (!peer_req) {
2604 		put_ldev(device);
2605 		return -ENOMEM;
2606 	}
2607 
2608 	switch (pi->cmd) {
2609 	case P_DATA_REQUEST:
2610 		peer_req->w.cb = w_e_end_data_req;
2611 		fault_type = DRBD_FAULT_DT_RD;
2612 		/* application IO, don't drbd_rs_begin_io */
2613 		peer_req->flags |= EE_APPLICATION;
2614 		goto submit;
2615 
2616 	case P_RS_DATA_REQUEST:
2617 		peer_req->w.cb = w_e_end_rsdata_req;
2618 		fault_type = DRBD_FAULT_RS_RD;
2619 		/* used in the sector offset progress display */
2620 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2621 		break;
2622 
2623 	case P_OV_REPLY:
2624 	case P_CSUM_RS_REQUEST:
2625 		fault_type = DRBD_FAULT_RS_RD;
2626 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2627 		if (!di)
2628 			goto out_free_e;
2629 
2630 		di->digest_size = pi->size;
2631 		di->digest = (((char *)di)+sizeof(struct digest_info));
2632 
2633 		peer_req->digest = di;
2634 		peer_req->flags |= EE_HAS_DIGEST;
2635 
2636 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2637 			goto out_free_e;
2638 
2639 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2640 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2641 			peer_req->w.cb = w_e_end_csum_rs_req;
2642 			/* used in the sector offset progress display */
2643 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2644 			/* remember to report stats in drbd_resync_finished */
2645 			device->use_csums = true;
2646 		} else if (pi->cmd == P_OV_REPLY) {
2647 			/* track progress, we may need to throttle */
2648 			atomic_add(size >> 9, &device->rs_sect_in);
2649 			peer_req->w.cb = w_e_end_ov_reply;
2650 			dec_rs_pending(device);
2651 			/* drbd_rs_begin_io done when we sent this request,
2652 			 * but accounting still needs to be done. */
2653 			goto submit_for_resync;
2654 		}
2655 		break;
2656 
2657 	case P_OV_REQUEST:
2658 		if (device->ov_start_sector == ~(sector_t)0 &&
2659 		    peer_device->connection->agreed_pro_version >= 90) {
2660 			unsigned long now = jiffies;
2661 			int i;
2662 			device->ov_start_sector = sector;
2663 			device->ov_position = sector;
2664 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2665 			device->rs_total = device->ov_left;
2666 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2667 				device->rs_mark_left[i] = device->ov_left;
2668 				device->rs_mark_time[i] = now;
2669 			}
2670 			drbd_info(device, "Online Verify start sector: %llu\n",
2671 					(unsigned long long)sector);
2672 		}
2673 		peer_req->w.cb = w_e_end_ov_req;
2674 		fault_type = DRBD_FAULT_RS_RD;
2675 		break;
2676 
2677 	default:
2678 		BUG();
2679 	}
2680 
2681 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2682 	 * wrt the receiver, but it is not as straightforward as it may seem.
2683 	 * Various places in the resync start and stop logic assume resync
2684 	 * requests are processed in order, requeuing this on the worker thread
2685 	 * introduces a bunch of new code for synchronization between threads.
2686 	 *
2687 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2688 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2689 	 * for application writes for the same time.  For now, just throttle
2690 	 * here, where the rest of the code expects the receiver to sleep for
2691 	 * a while, anyways.
2692 	 */
2693 
2694 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2695 	 * this defers syncer requests for some time, before letting at least
2696 	 * on request through.  The resync controller on the receiving side
2697 	 * will adapt to the incoming rate accordingly.
2698 	 *
2699 	 * We cannot throttle here if remote is Primary/SyncTarget:
2700 	 * we would also throttle its application reads.
2701 	 * In that case, throttling is done on the SyncTarget only.
2702 	 */
2703 
2704 	/* Even though this may be a resync request, we do add to "read_ee";
2705 	 * "sync_ee" is only used for resync WRITEs.
2706 	 * Add to list early, so debugfs can find this request
2707 	 * even if we have to sleep below. */
2708 	spin_lock_irq(&device->resource->req_lock);
2709 	list_add_tail(&peer_req->w.list, &device->read_ee);
2710 	spin_unlock_irq(&device->resource->req_lock);
2711 
2712 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2713 	if (device->state.peer != R_PRIMARY
2714 	&& drbd_rs_should_slow_down(device, sector, false))
2715 		schedule_timeout_uninterruptible(HZ/10);
2716 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2717 	if (drbd_rs_begin_io(device, sector))
2718 		goto out_free_e;
2719 
2720 submit_for_resync:
2721 	atomic_add(size >> 9, &device->rs_sect_ev);
2722 
2723 submit:
2724 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2725 	inc_unacked(device);
2726 	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2727 		return 0;
2728 
2729 	/* don't care for the reason here */
2730 	drbd_err(device, "submit failed, triggering re-connect\n");
2731 
2732 out_free_e:
2733 	spin_lock_irq(&device->resource->req_lock);
2734 	list_del(&peer_req->w.list);
2735 	spin_unlock_irq(&device->resource->req_lock);
2736 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2737 
2738 	put_ldev(device);
2739 	drbd_free_peer_req(device, peer_req);
2740 	return -EIO;
2741 }
2742 
2743 /**
2744  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2745  */
2746 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2747 {
2748 	struct drbd_device *device = peer_device->device;
2749 	int self, peer, rv = -100;
2750 	unsigned long ch_self, ch_peer;
2751 	enum drbd_after_sb_p after_sb_0p;
2752 
2753 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2754 	peer = device->p_uuid[UI_BITMAP] & 1;
2755 
2756 	ch_peer = device->p_uuid[UI_SIZE];
2757 	ch_self = device->comm_bm_set;
2758 
2759 	rcu_read_lock();
2760 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2761 	rcu_read_unlock();
2762 	switch (after_sb_0p) {
2763 	case ASB_CONSENSUS:
2764 	case ASB_DISCARD_SECONDARY:
2765 	case ASB_CALL_HELPER:
2766 	case ASB_VIOLENTLY:
2767 		drbd_err(device, "Configuration error.\n");
2768 		break;
2769 	case ASB_DISCONNECT:
2770 		break;
2771 	case ASB_DISCARD_YOUNGER_PRI:
2772 		if (self == 0 && peer == 1) {
2773 			rv = -1;
2774 			break;
2775 		}
2776 		if (self == 1 && peer == 0) {
2777 			rv =  1;
2778 			break;
2779 		}
2780 		/* Else fall through to one of the other strategies... */
2781 	case ASB_DISCARD_OLDER_PRI:
2782 		if (self == 0 && peer == 1) {
2783 			rv = 1;
2784 			break;
2785 		}
2786 		if (self == 1 && peer == 0) {
2787 			rv = -1;
2788 			break;
2789 		}
2790 		/* Else fall through to one of the other strategies... */
2791 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2792 		     "Using discard-least-changes instead\n");
2793 	case ASB_DISCARD_ZERO_CHG:
2794 		if (ch_peer == 0 && ch_self == 0) {
2795 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2796 				? -1 : 1;
2797 			break;
2798 		} else {
2799 			if (ch_peer == 0) { rv =  1; break; }
2800 			if (ch_self == 0) { rv = -1; break; }
2801 		}
2802 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2803 			break;
2804 	case ASB_DISCARD_LEAST_CHG:
2805 		if	(ch_self < ch_peer)
2806 			rv = -1;
2807 		else if (ch_self > ch_peer)
2808 			rv =  1;
2809 		else /* ( ch_self == ch_peer ) */
2810 		     /* Well, then use something else. */
2811 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2812 				? -1 : 1;
2813 		break;
2814 	case ASB_DISCARD_LOCAL:
2815 		rv = -1;
2816 		break;
2817 	case ASB_DISCARD_REMOTE:
2818 		rv =  1;
2819 	}
2820 
2821 	return rv;
2822 }
2823 
2824 /**
2825  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2826  */
2827 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2828 {
2829 	struct drbd_device *device = peer_device->device;
2830 	int hg, rv = -100;
2831 	enum drbd_after_sb_p after_sb_1p;
2832 
2833 	rcu_read_lock();
2834 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2835 	rcu_read_unlock();
2836 	switch (after_sb_1p) {
2837 	case ASB_DISCARD_YOUNGER_PRI:
2838 	case ASB_DISCARD_OLDER_PRI:
2839 	case ASB_DISCARD_LEAST_CHG:
2840 	case ASB_DISCARD_LOCAL:
2841 	case ASB_DISCARD_REMOTE:
2842 	case ASB_DISCARD_ZERO_CHG:
2843 		drbd_err(device, "Configuration error.\n");
2844 		break;
2845 	case ASB_DISCONNECT:
2846 		break;
2847 	case ASB_CONSENSUS:
2848 		hg = drbd_asb_recover_0p(peer_device);
2849 		if (hg == -1 && device->state.role == R_SECONDARY)
2850 			rv = hg;
2851 		if (hg == 1  && device->state.role == R_PRIMARY)
2852 			rv = hg;
2853 		break;
2854 	case ASB_VIOLENTLY:
2855 		rv = drbd_asb_recover_0p(peer_device);
2856 		break;
2857 	case ASB_DISCARD_SECONDARY:
2858 		return device->state.role == R_PRIMARY ? 1 : -1;
2859 	case ASB_CALL_HELPER:
2860 		hg = drbd_asb_recover_0p(peer_device);
2861 		if (hg == -1 && device->state.role == R_PRIMARY) {
2862 			enum drbd_state_rv rv2;
2863 
2864 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2865 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2866 			  * we do not need to wait for the after state change work either. */
2867 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2868 			if (rv2 != SS_SUCCESS) {
2869 				drbd_khelper(device, "pri-lost-after-sb");
2870 			} else {
2871 				drbd_warn(device, "Successfully gave up primary role.\n");
2872 				rv = hg;
2873 			}
2874 		} else
2875 			rv = hg;
2876 	}
2877 
2878 	return rv;
2879 }
2880 
2881 /**
2882  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2883  */
2884 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2885 {
2886 	struct drbd_device *device = peer_device->device;
2887 	int hg, rv = -100;
2888 	enum drbd_after_sb_p after_sb_2p;
2889 
2890 	rcu_read_lock();
2891 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2892 	rcu_read_unlock();
2893 	switch (after_sb_2p) {
2894 	case ASB_DISCARD_YOUNGER_PRI:
2895 	case ASB_DISCARD_OLDER_PRI:
2896 	case ASB_DISCARD_LEAST_CHG:
2897 	case ASB_DISCARD_LOCAL:
2898 	case ASB_DISCARD_REMOTE:
2899 	case ASB_CONSENSUS:
2900 	case ASB_DISCARD_SECONDARY:
2901 	case ASB_DISCARD_ZERO_CHG:
2902 		drbd_err(device, "Configuration error.\n");
2903 		break;
2904 	case ASB_VIOLENTLY:
2905 		rv = drbd_asb_recover_0p(peer_device);
2906 		break;
2907 	case ASB_DISCONNECT:
2908 		break;
2909 	case ASB_CALL_HELPER:
2910 		hg = drbd_asb_recover_0p(peer_device);
2911 		if (hg == -1) {
2912 			enum drbd_state_rv rv2;
2913 
2914 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2915 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2916 			  * we do not need to wait for the after state change work either. */
2917 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2918 			if (rv2 != SS_SUCCESS) {
2919 				drbd_khelper(device, "pri-lost-after-sb");
2920 			} else {
2921 				drbd_warn(device, "Successfully gave up primary role.\n");
2922 				rv = hg;
2923 			}
2924 		} else
2925 			rv = hg;
2926 	}
2927 
2928 	return rv;
2929 }
2930 
2931 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2932 			   u64 bits, u64 flags)
2933 {
2934 	if (!uuid) {
2935 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2936 		return;
2937 	}
2938 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2939 	     text,
2940 	     (unsigned long long)uuid[UI_CURRENT],
2941 	     (unsigned long long)uuid[UI_BITMAP],
2942 	     (unsigned long long)uuid[UI_HISTORY_START],
2943 	     (unsigned long long)uuid[UI_HISTORY_END],
2944 	     (unsigned long long)bits,
2945 	     (unsigned long long)flags);
2946 }
2947 
2948 /*
2949   100	after split brain try auto recover
2950     2	C_SYNC_SOURCE set BitMap
2951     1	C_SYNC_SOURCE use BitMap
2952     0	no Sync
2953    -1	C_SYNC_TARGET use BitMap
2954    -2	C_SYNC_TARGET set BitMap
2955  -100	after split brain, disconnect
2956 -1000	unrelated data
2957 -1091   requires proto 91
2958 -1096   requires proto 96
2959  */
2960 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2961 {
2962 	struct drbd_peer_device *const peer_device = first_peer_device(device);
2963 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2964 	u64 self, peer;
2965 	int i, j;
2966 
2967 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2968 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2969 
2970 	*rule_nr = 10;
2971 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2972 		return 0;
2973 
2974 	*rule_nr = 20;
2975 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2976 	     peer != UUID_JUST_CREATED)
2977 		return -2;
2978 
2979 	*rule_nr = 30;
2980 	if (self != UUID_JUST_CREATED &&
2981 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2982 		return 2;
2983 
2984 	if (self == peer) {
2985 		int rct, dc; /* roles at crash time */
2986 
2987 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2988 
2989 			if (connection->agreed_pro_version < 91)
2990 				return -1091;
2991 
2992 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2993 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2994 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2995 				drbd_uuid_move_history(device);
2996 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2997 				device->ldev->md.uuid[UI_BITMAP] = 0;
2998 
2999 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3000 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3001 				*rule_nr = 34;
3002 			} else {
3003 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3004 				*rule_nr = 36;
3005 			}
3006 
3007 			return 1;
3008 		}
3009 
3010 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3011 
3012 			if (connection->agreed_pro_version < 91)
3013 				return -1091;
3014 
3015 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3016 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3017 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3018 
3019 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3020 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3021 				device->p_uuid[UI_BITMAP] = 0UL;
3022 
3023 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3024 				*rule_nr = 35;
3025 			} else {
3026 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3027 				*rule_nr = 37;
3028 			}
3029 
3030 			return -1;
3031 		}
3032 
3033 		/* Common power [off|failure] */
3034 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3035 			(device->p_uuid[UI_FLAGS] & 2);
3036 		/* lowest bit is set when we were primary,
3037 		 * next bit (weight 2) is set when peer was primary */
3038 		*rule_nr = 40;
3039 
3040 		switch (rct) {
3041 		case 0: /* !self_pri && !peer_pri */ return 0;
3042 		case 1: /*  self_pri && !peer_pri */ return 1;
3043 		case 2: /* !self_pri &&  peer_pri */ return -1;
3044 		case 3: /*  self_pri &&  peer_pri */
3045 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3046 			return dc ? -1 : 1;
3047 		}
3048 	}
3049 
3050 	*rule_nr = 50;
3051 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3052 	if (self == peer)
3053 		return -1;
3054 
3055 	*rule_nr = 51;
3056 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3057 	if (self == peer) {
3058 		if (connection->agreed_pro_version < 96 ?
3059 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3060 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3061 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3062 			/* The last P_SYNC_UUID did not get though. Undo the last start of
3063 			   resync as sync source modifications of the peer's UUIDs. */
3064 
3065 			if (connection->agreed_pro_version < 91)
3066 				return -1091;
3067 
3068 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3069 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3070 
3071 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3072 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3073 
3074 			return -1;
3075 		}
3076 	}
3077 
3078 	*rule_nr = 60;
3079 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3080 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3081 		peer = device->p_uuid[i] & ~((u64)1);
3082 		if (self == peer)
3083 			return -2;
3084 	}
3085 
3086 	*rule_nr = 70;
3087 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3088 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3089 	if (self == peer)
3090 		return 1;
3091 
3092 	*rule_nr = 71;
3093 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3094 	if (self == peer) {
3095 		if (connection->agreed_pro_version < 96 ?
3096 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3097 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3098 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3099 			/* The last P_SYNC_UUID did not get though. Undo the last start of
3100 			   resync as sync source modifications of our UUIDs. */
3101 
3102 			if (connection->agreed_pro_version < 91)
3103 				return -1091;
3104 
3105 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3106 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3107 
3108 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3109 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3110 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3111 
3112 			return 1;
3113 		}
3114 	}
3115 
3116 
3117 	*rule_nr = 80;
3118 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3119 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3120 		self = device->ldev->md.uuid[i] & ~((u64)1);
3121 		if (self == peer)
3122 			return 2;
3123 	}
3124 
3125 	*rule_nr = 90;
3126 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3127 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3128 	if (self == peer && self != ((u64)0))
3129 		return 100;
3130 
3131 	*rule_nr = 100;
3132 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3133 		self = device->ldev->md.uuid[i] & ~((u64)1);
3134 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3135 			peer = device->p_uuid[j] & ~((u64)1);
3136 			if (self == peer)
3137 				return -100;
3138 		}
3139 	}
3140 
3141 	return -1000;
3142 }
3143 
3144 /* drbd_sync_handshake() returns the new conn state on success, or
3145    CONN_MASK (-1) on failure.
3146  */
3147 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3148 					   enum drbd_role peer_role,
3149 					   enum drbd_disk_state peer_disk) __must_hold(local)
3150 {
3151 	struct drbd_device *device = peer_device->device;
3152 	enum drbd_conns rv = C_MASK;
3153 	enum drbd_disk_state mydisk;
3154 	struct net_conf *nc;
3155 	int hg, rule_nr, rr_conflict, tentative;
3156 
3157 	mydisk = device->state.disk;
3158 	if (mydisk == D_NEGOTIATING)
3159 		mydisk = device->new_state_tmp.disk;
3160 
3161 	drbd_info(device, "drbd_sync_handshake:\n");
3162 
3163 	spin_lock_irq(&device->ldev->md.uuid_lock);
3164 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3165 	drbd_uuid_dump(device, "peer", device->p_uuid,
3166 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3167 
3168 	hg = drbd_uuid_compare(device, &rule_nr);
3169 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3170 
3171 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3172 
3173 	if (hg == -1000) {
3174 		drbd_alert(device, "Unrelated data, aborting!\n");
3175 		return C_MASK;
3176 	}
3177 	if (hg < -1000) {
3178 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3179 		return C_MASK;
3180 	}
3181 
3182 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3183 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3184 		int f = (hg == -100) || abs(hg) == 2;
3185 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3186 		if (f)
3187 			hg = hg*2;
3188 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3189 		     hg > 0 ? "source" : "target");
3190 	}
3191 
3192 	if (abs(hg) == 100)
3193 		drbd_khelper(device, "initial-split-brain");
3194 
3195 	rcu_read_lock();
3196 	nc = rcu_dereference(peer_device->connection->net_conf);
3197 
3198 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3199 		int pcount = (device->state.role == R_PRIMARY)
3200 			   + (peer_role == R_PRIMARY);
3201 		int forced = (hg == -100);
3202 
3203 		switch (pcount) {
3204 		case 0:
3205 			hg = drbd_asb_recover_0p(peer_device);
3206 			break;
3207 		case 1:
3208 			hg = drbd_asb_recover_1p(peer_device);
3209 			break;
3210 		case 2:
3211 			hg = drbd_asb_recover_2p(peer_device);
3212 			break;
3213 		}
3214 		if (abs(hg) < 100) {
3215 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3216 			     "automatically solved. Sync from %s node\n",
3217 			     pcount, (hg < 0) ? "peer" : "this");
3218 			if (forced) {
3219 				drbd_warn(device, "Doing a full sync, since"
3220 				     " UUIDs where ambiguous.\n");
3221 				hg = hg*2;
3222 			}
3223 		}
3224 	}
3225 
3226 	if (hg == -100) {
3227 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3228 			hg = -1;
3229 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3230 			hg = 1;
3231 
3232 		if (abs(hg) < 100)
3233 			drbd_warn(device, "Split-Brain detected, manually solved. "
3234 			     "Sync from %s node\n",
3235 			     (hg < 0) ? "peer" : "this");
3236 	}
3237 	rr_conflict = nc->rr_conflict;
3238 	tentative = nc->tentative;
3239 	rcu_read_unlock();
3240 
3241 	if (hg == -100) {
3242 		/* FIXME this log message is not correct if we end up here
3243 		 * after an attempted attach on a diskless node.
3244 		 * We just refuse to attach -- well, we drop the "connection"
3245 		 * to that disk, in a way... */
3246 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3247 		drbd_khelper(device, "split-brain");
3248 		return C_MASK;
3249 	}
3250 
3251 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3252 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3253 		return C_MASK;
3254 	}
3255 
3256 	if (hg < 0 && /* by intention we do not use mydisk here. */
3257 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3258 		switch (rr_conflict) {
3259 		case ASB_CALL_HELPER:
3260 			drbd_khelper(device, "pri-lost");
3261 			/* fall through */
3262 		case ASB_DISCONNECT:
3263 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3264 			return C_MASK;
3265 		case ASB_VIOLENTLY:
3266 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3267 			     "assumption\n");
3268 		}
3269 	}
3270 
3271 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3272 		if (hg == 0)
3273 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3274 		else
3275 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3276 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3277 				 abs(hg) >= 2 ? "full" : "bit-map based");
3278 		return C_MASK;
3279 	}
3280 
3281 	if (abs(hg) >= 2) {
3282 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3283 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3284 					BM_LOCKED_SET_ALLOWED))
3285 			return C_MASK;
3286 	}
3287 
3288 	if (hg > 0) { /* become sync source. */
3289 		rv = C_WF_BITMAP_S;
3290 	} else if (hg < 0) { /* become sync target */
3291 		rv = C_WF_BITMAP_T;
3292 	} else {
3293 		rv = C_CONNECTED;
3294 		if (drbd_bm_total_weight(device)) {
3295 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3296 			     drbd_bm_total_weight(device));
3297 		}
3298 	}
3299 
3300 	return rv;
3301 }
3302 
3303 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3304 {
3305 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3306 	if (peer == ASB_DISCARD_REMOTE)
3307 		return ASB_DISCARD_LOCAL;
3308 
3309 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3310 	if (peer == ASB_DISCARD_LOCAL)
3311 		return ASB_DISCARD_REMOTE;
3312 
3313 	/* everything else is valid if they are equal on both sides. */
3314 	return peer;
3315 }
3316 
3317 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3318 {
3319 	struct p_protocol *p = pi->data;
3320 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3321 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3322 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3323 	char integrity_alg[SHARED_SECRET_MAX] = "";
3324 	struct crypto_ahash *peer_integrity_tfm = NULL;
3325 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3326 
3327 	p_proto		= be32_to_cpu(p->protocol);
3328 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3329 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3330 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3331 	p_two_primaries = be32_to_cpu(p->two_primaries);
3332 	cf		= be32_to_cpu(p->conn_flags);
3333 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3334 
3335 	if (connection->agreed_pro_version >= 87) {
3336 		int err;
3337 
3338 		if (pi->size > sizeof(integrity_alg))
3339 			return -EIO;
3340 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3341 		if (err)
3342 			return err;
3343 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3344 	}
3345 
3346 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3347 		clear_bit(CONN_DRY_RUN, &connection->flags);
3348 
3349 		if (cf & CF_DRY_RUN)
3350 			set_bit(CONN_DRY_RUN, &connection->flags);
3351 
3352 		rcu_read_lock();
3353 		nc = rcu_dereference(connection->net_conf);
3354 
3355 		if (p_proto != nc->wire_protocol) {
3356 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3357 			goto disconnect_rcu_unlock;
3358 		}
3359 
3360 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3361 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3362 			goto disconnect_rcu_unlock;
3363 		}
3364 
3365 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3366 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3367 			goto disconnect_rcu_unlock;
3368 		}
3369 
3370 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3371 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3372 			goto disconnect_rcu_unlock;
3373 		}
3374 
3375 		if (p_discard_my_data && nc->discard_my_data) {
3376 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3377 			goto disconnect_rcu_unlock;
3378 		}
3379 
3380 		if (p_two_primaries != nc->two_primaries) {
3381 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3382 			goto disconnect_rcu_unlock;
3383 		}
3384 
3385 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3386 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3387 			goto disconnect_rcu_unlock;
3388 		}
3389 
3390 		rcu_read_unlock();
3391 	}
3392 
3393 	if (integrity_alg[0]) {
3394 		int hash_size;
3395 
3396 		/*
3397 		 * We can only change the peer data integrity algorithm
3398 		 * here.  Changing our own data integrity algorithm
3399 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3400 		 * the same time; otherwise, the peer has no way to
3401 		 * tell between which packets the algorithm should
3402 		 * change.
3403 		 */
3404 
3405 		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3406 		if (!peer_integrity_tfm) {
3407 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3408 				 integrity_alg);
3409 			goto disconnect;
3410 		}
3411 
3412 		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3413 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3414 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3415 		if (!(int_dig_in && int_dig_vv)) {
3416 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3417 			goto disconnect;
3418 		}
3419 	}
3420 
3421 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3422 	if (!new_net_conf) {
3423 		drbd_err(connection, "Allocation of new net_conf failed\n");
3424 		goto disconnect;
3425 	}
3426 
3427 	mutex_lock(&connection->data.mutex);
3428 	mutex_lock(&connection->resource->conf_update);
3429 	old_net_conf = connection->net_conf;
3430 	*new_net_conf = *old_net_conf;
3431 
3432 	new_net_conf->wire_protocol = p_proto;
3433 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3434 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3435 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3436 	new_net_conf->two_primaries = p_two_primaries;
3437 
3438 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3439 	mutex_unlock(&connection->resource->conf_update);
3440 	mutex_unlock(&connection->data.mutex);
3441 
3442 	crypto_free_ahash(connection->peer_integrity_tfm);
3443 	kfree(connection->int_dig_in);
3444 	kfree(connection->int_dig_vv);
3445 	connection->peer_integrity_tfm = peer_integrity_tfm;
3446 	connection->int_dig_in = int_dig_in;
3447 	connection->int_dig_vv = int_dig_vv;
3448 
3449 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3450 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3451 			  integrity_alg[0] ? integrity_alg : "(none)");
3452 
3453 	synchronize_rcu();
3454 	kfree(old_net_conf);
3455 	return 0;
3456 
3457 disconnect_rcu_unlock:
3458 	rcu_read_unlock();
3459 disconnect:
3460 	crypto_free_ahash(peer_integrity_tfm);
3461 	kfree(int_dig_in);
3462 	kfree(int_dig_vv);
3463 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3464 	return -EIO;
3465 }
3466 
3467 /* helper function
3468  * input: alg name, feature name
3469  * return: NULL (alg name was "")
3470  *         ERR_PTR(error) if something goes wrong
3471  *         or the crypto hash ptr, if it worked out ok. */
3472 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3473 		const char *alg, const char *name)
3474 {
3475 	struct crypto_ahash *tfm;
3476 
3477 	if (!alg[0])
3478 		return NULL;
3479 
3480 	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3481 	if (IS_ERR(tfm)) {
3482 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3483 			alg, name, PTR_ERR(tfm));
3484 		return tfm;
3485 	}
3486 	return tfm;
3487 }
3488 
3489 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3490 {
3491 	void *buffer = connection->data.rbuf;
3492 	int size = pi->size;
3493 
3494 	while (size) {
3495 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3496 		s = drbd_recv(connection, buffer, s);
3497 		if (s <= 0) {
3498 			if (s < 0)
3499 				return s;
3500 			break;
3501 		}
3502 		size -= s;
3503 	}
3504 	if (size)
3505 		return -EIO;
3506 	return 0;
3507 }
3508 
3509 /*
3510  * config_unknown_volume  -  device configuration command for unknown volume
3511  *
3512  * When a device is added to an existing connection, the node on which the
3513  * device is added first will send configuration commands to its peer but the
3514  * peer will not know about the device yet.  It will warn and ignore these
3515  * commands.  Once the device is added on the second node, the second node will
3516  * send the same device configuration commands, but in the other direction.
3517  *
3518  * (We can also end up here if drbd is misconfigured.)
3519  */
3520 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3521 {
3522 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3523 		  cmdname(pi->cmd), pi->vnr);
3524 	return ignore_remaining_packet(connection, pi);
3525 }
3526 
3527 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3528 {
3529 	struct drbd_peer_device *peer_device;
3530 	struct drbd_device *device;
3531 	struct p_rs_param_95 *p;
3532 	unsigned int header_size, data_size, exp_max_sz;
3533 	struct crypto_ahash *verify_tfm = NULL;
3534 	struct crypto_ahash *csums_tfm = NULL;
3535 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3536 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3537 	const int apv = connection->agreed_pro_version;
3538 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3539 	int fifo_size = 0;
3540 	int err;
3541 
3542 	peer_device = conn_peer_device(connection, pi->vnr);
3543 	if (!peer_device)
3544 		return config_unknown_volume(connection, pi);
3545 	device = peer_device->device;
3546 
3547 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3548 		    : apv == 88 ? sizeof(struct p_rs_param)
3549 					+ SHARED_SECRET_MAX
3550 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3551 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3552 
3553 	if (pi->size > exp_max_sz) {
3554 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3555 		    pi->size, exp_max_sz);
3556 		return -EIO;
3557 	}
3558 
3559 	if (apv <= 88) {
3560 		header_size = sizeof(struct p_rs_param);
3561 		data_size = pi->size - header_size;
3562 	} else if (apv <= 94) {
3563 		header_size = sizeof(struct p_rs_param_89);
3564 		data_size = pi->size - header_size;
3565 		D_ASSERT(device, data_size == 0);
3566 	} else {
3567 		header_size = sizeof(struct p_rs_param_95);
3568 		data_size = pi->size - header_size;
3569 		D_ASSERT(device, data_size == 0);
3570 	}
3571 
3572 	/* initialize verify_alg and csums_alg */
3573 	p = pi->data;
3574 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3575 
3576 	err = drbd_recv_all(peer_device->connection, p, header_size);
3577 	if (err)
3578 		return err;
3579 
3580 	mutex_lock(&connection->resource->conf_update);
3581 	old_net_conf = peer_device->connection->net_conf;
3582 	if (get_ldev(device)) {
3583 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3584 		if (!new_disk_conf) {
3585 			put_ldev(device);
3586 			mutex_unlock(&connection->resource->conf_update);
3587 			drbd_err(device, "Allocation of new disk_conf failed\n");
3588 			return -ENOMEM;
3589 		}
3590 
3591 		old_disk_conf = device->ldev->disk_conf;
3592 		*new_disk_conf = *old_disk_conf;
3593 
3594 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3595 	}
3596 
3597 	if (apv >= 88) {
3598 		if (apv == 88) {
3599 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3600 				drbd_err(device, "verify-alg of wrong size, "
3601 					"peer wants %u, accepting only up to %u byte\n",
3602 					data_size, SHARED_SECRET_MAX);
3603 				err = -EIO;
3604 				goto reconnect;
3605 			}
3606 
3607 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3608 			if (err)
3609 				goto reconnect;
3610 			/* we expect NUL terminated string */
3611 			/* but just in case someone tries to be evil */
3612 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3613 			p->verify_alg[data_size-1] = 0;
3614 
3615 		} else /* apv >= 89 */ {
3616 			/* we still expect NUL terminated strings */
3617 			/* but just in case someone tries to be evil */
3618 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3619 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3620 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3621 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3622 		}
3623 
3624 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3625 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3626 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3627 				    old_net_conf->verify_alg, p->verify_alg);
3628 				goto disconnect;
3629 			}
3630 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3631 					p->verify_alg, "verify-alg");
3632 			if (IS_ERR(verify_tfm)) {
3633 				verify_tfm = NULL;
3634 				goto disconnect;
3635 			}
3636 		}
3637 
3638 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3639 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3640 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3641 				    old_net_conf->csums_alg, p->csums_alg);
3642 				goto disconnect;
3643 			}
3644 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3645 					p->csums_alg, "csums-alg");
3646 			if (IS_ERR(csums_tfm)) {
3647 				csums_tfm = NULL;
3648 				goto disconnect;
3649 			}
3650 		}
3651 
3652 		if (apv > 94 && new_disk_conf) {
3653 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3654 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3655 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3656 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3657 
3658 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3659 			if (fifo_size != device->rs_plan_s->size) {
3660 				new_plan = fifo_alloc(fifo_size);
3661 				if (!new_plan) {
3662 					drbd_err(device, "kmalloc of fifo_buffer failed");
3663 					put_ldev(device);
3664 					goto disconnect;
3665 				}
3666 			}
3667 		}
3668 
3669 		if (verify_tfm || csums_tfm) {
3670 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3671 			if (!new_net_conf) {
3672 				drbd_err(device, "Allocation of new net_conf failed\n");
3673 				goto disconnect;
3674 			}
3675 
3676 			*new_net_conf = *old_net_conf;
3677 
3678 			if (verify_tfm) {
3679 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3680 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3681 				crypto_free_ahash(peer_device->connection->verify_tfm);
3682 				peer_device->connection->verify_tfm = verify_tfm;
3683 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3684 			}
3685 			if (csums_tfm) {
3686 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3687 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3688 				crypto_free_ahash(peer_device->connection->csums_tfm);
3689 				peer_device->connection->csums_tfm = csums_tfm;
3690 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3691 			}
3692 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3693 		}
3694 	}
3695 
3696 	if (new_disk_conf) {
3697 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698 		put_ldev(device);
3699 	}
3700 
3701 	if (new_plan) {
3702 		old_plan = device->rs_plan_s;
3703 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3704 	}
3705 
3706 	mutex_unlock(&connection->resource->conf_update);
3707 	synchronize_rcu();
3708 	if (new_net_conf)
3709 		kfree(old_net_conf);
3710 	kfree(old_disk_conf);
3711 	kfree(old_plan);
3712 
3713 	return 0;
3714 
3715 reconnect:
3716 	if (new_disk_conf) {
3717 		put_ldev(device);
3718 		kfree(new_disk_conf);
3719 	}
3720 	mutex_unlock(&connection->resource->conf_update);
3721 	return -EIO;
3722 
3723 disconnect:
3724 	kfree(new_plan);
3725 	if (new_disk_conf) {
3726 		put_ldev(device);
3727 		kfree(new_disk_conf);
3728 	}
3729 	mutex_unlock(&connection->resource->conf_update);
3730 	/* just for completeness: actually not needed,
3731 	 * as this is not reached if csums_tfm was ok. */
3732 	crypto_free_ahash(csums_tfm);
3733 	/* but free the verify_tfm again, if csums_tfm did not work out */
3734 	crypto_free_ahash(verify_tfm);
3735 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3736 	return -EIO;
3737 }
3738 
3739 /* warn if the arguments differ by more than 12.5% */
3740 static void warn_if_differ_considerably(struct drbd_device *device,
3741 	const char *s, sector_t a, sector_t b)
3742 {
3743 	sector_t d;
3744 	if (a == 0 || b == 0)
3745 		return;
3746 	d = (a > b) ? (a - b) : (b - a);
3747 	if (d > (a>>3) || d > (b>>3))
3748 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3749 		     (unsigned long long)a, (unsigned long long)b);
3750 }
3751 
3752 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3753 {
3754 	struct drbd_peer_device *peer_device;
3755 	struct drbd_device *device;
3756 	struct p_sizes *p = pi->data;
3757 	enum determine_dev_size dd = DS_UNCHANGED;
3758 	sector_t p_size, p_usize, p_csize, my_usize;
3759 	int ldsc = 0; /* local disk size changed */
3760 	enum dds_flags ddsf;
3761 
3762 	peer_device = conn_peer_device(connection, pi->vnr);
3763 	if (!peer_device)
3764 		return config_unknown_volume(connection, pi);
3765 	device = peer_device->device;
3766 
3767 	p_size = be64_to_cpu(p->d_size);
3768 	p_usize = be64_to_cpu(p->u_size);
3769 	p_csize = be64_to_cpu(p->c_size);
3770 
3771 	/* just store the peer's disk size for now.
3772 	 * we still need to figure out whether we accept that. */
3773 	device->p_size = p_size;
3774 
3775 	if (get_ldev(device)) {
3776 		rcu_read_lock();
3777 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3778 		rcu_read_unlock();
3779 
3780 		warn_if_differ_considerably(device, "lower level device sizes",
3781 			   p_size, drbd_get_max_capacity(device->ldev));
3782 		warn_if_differ_considerably(device, "user requested size",
3783 					    p_usize, my_usize);
3784 
3785 		/* if this is the first connect, or an otherwise expected
3786 		 * param exchange, choose the minimum */
3787 		if (device->state.conn == C_WF_REPORT_PARAMS)
3788 			p_usize = min_not_zero(my_usize, p_usize);
3789 
3790 		/* Never shrink a device with usable data during connect.
3791 		   But allow online shrinking if we are connected. */
3792 		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3793 		    drbd_get_capacity(device->this_bdev) &&
3794 		    device->state.disk >= D_OUTDATED &&
3795 		    device->state.conn < C_CONNECTED) {
3796 			drbd_err(device, "The peer's disk size is too small!\n");
3797 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3798 			put_ldev(device);
3799 			return -EIO;
3800 		}
3801 
3802 		if (my_usize != p_usize) {
3803 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3804 
3805 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3806 			if (!new_disk_conf) {
3807 				drbd_err(device, "Allocation of new disk_conf failed\n");
3808 				put_ldev(device);
3809 				return -ENOMEM;
3810 			}
3811 
3812 			mutex_lock(&connection->resource->conf_update);
3813 			old_disk_conf = device->ldev->disk_conf;
3814 			*new_disk_conf = *old_disk_conf;
3815 			new_disk_conf->disk_size = p_usize;
3816 
3817 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3818 			mutex_unlock(&connection->resource->conf_update);
3819 			synchronize_rcu();
3820 			kfree(old_disk_conf);
3821 
3822 			drbd_info(device, "Peer sets u_size to %lu sectors\n",
3823 				 (unsigned long)my_usize);
3824 		}
3825 
3826 		put_ldev(device);
3827 	}
3828 
3829 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3830 	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3831 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3832 	   drbd_reconsider_max_bio_size(), we can be sure that after
3833 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3834 
3835 	ddsf = be16_to_cpu(p->dds_flags);
3836 	if (get_ldev(device)) {
3837 		drbd_reconsider_max_bio_size(device, device->ldev);
3838 		dd = drbd_determine_dev_size(device, ddsf, NULL);
3839 		put_ldev(device);
3840 		if (dd == DS_ERROR)
3841 			return -EIO;
3842 		drbd_md_sync(device);
3843 	} else {
3844 		/*
3845 		 * I am diskless, need to accept the peer's *current* size.
3846 		 * I must NOT accept the peers backing disk size,
3847 		 * it may have been larger than mine all along...
3848 		 *
3849 		 * At this point, the peer knows more about my disk, or at
3850 		 * least about what we last agreed upon, than myself.
3851 		 * So if his c_size is less than his d_size, the most likely
3852 		 * reason is that *my* d_size was smaller last time we checked.
3853 		 *
3854 		 * However, if he sends a zero current size,
3855 		 * take his (user-capped or) backing disk size anyways.
3856 		 */
3857 		drbd_reconsider_max_bio_size(device, NULL);
3858 		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3859 	}
3860 
3861 	if (get_ldev(device)) {
3862 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3863 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3864 			ldsc = 1;
3865 		}
3866 
3867 		put_ldev(device);
3868 	}
3869 
3870 	if (device->state.conn > C_WF_REPORT_PARAMS) {
3871 		if (be64_to_cpu(p->c_size) !=
3872 		    drbd_get_capacity(device->this_bdev) || ldsc) {
3873 			/* we have different sizes, probably peer
3874 			 * needs to know my new size... */
3875 			drbd_send_sizes(peer_device, 0, ddsf);
3876 		}
3877 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3878 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3879 			if (device->state.pdsk >= D_INCONSISTENT &&
3880 			    device->state.disk >= D_INCONSISTENT) {
3881 				if (ddsf & DDSF_NO_RESYNC)
3882 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3883 				else
3884 					resync_after_online_grow(device);
3885 			} else
3886 				set_bit(RESYNC_AFTER_NEG, &device->flags);
3887 		}
3888 	}
3889 
3890 	return 0;
3891 }
3892 
3893 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3894 {
3895 	struct drbd_peer_device *peer_device;
3896 	struct drbd_device *device;
3897 	struct p_uuids *p = pi->data;
3898 	u64 *p_uuid;
3899 	int i, updated_uuids = 0;
3900 
3901 	peer_device = conn_peer_device(connection, pi->vnr);
3902 	if (!peer_device)
3903 		return config_unknown_volume(connection, pi);
3904 	device = peer_device->device;
3905 
3906 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3907 	if (!p_uuid) {
3908 		drbd_err(device, "kmalloc of p_uuid failed\n");
3909 		return false;
3910 	}
3911 
3912 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3913 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3914 
3915 	kfree(device->p_uuid);
3916 	device->p_uuid = p_uuid;
3917 
3918 	if (device->state.conn < C_CONNECTED &&
3919 	    device->state.disk < D_INCONSISTENT &&
3920 	    device->state.role == R_PRIMARY &&
3921 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3922 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3923 		    (unsigned long long)device->ed_uuid);
3924 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3925 		return -EIO;
3926 	}
3927 
3928 	if (get_ldev(device)) {
3929 		int skip_initial_sync =
3930 			device->state.conn == C_CONNECTED &&
3931 			peer_device->connection->agreed_pro_version >= 90 &&
3932 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3933 			(p_uuid[UI_FLAGS] & 8);
3934 		if (skip_initial_sync) {
3935 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3936 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3937 					"clear_n_write from receive_uuids",
3938 					BM_LOCKED_TEST_ALLOWED);
3939 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3940 			_drbd_uuid_set(device, UI_BITMAP, 0);
3941 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3942 					CS_VERBOSE, NULL);
3943 			drbd_md_sync(device);
3944 			updated_uuids = 1;
3945 		}
3946 		put_ldev(device);
3947 	} else if (device->state.disk < D_INCONSISTENT &&
3948 		   device->state.role == R_PRIMARY) {
3949 		/* I am a diskless primary, the peer just created a new current UUID
3950 		   for me. */
3951 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3952 	}
3953 
3954 	/* Before we test for the disk state, we should wait until an eventually
3955 	   ongoing cluster wide state change is finished. That is important if
3956 	   we are primary and are detaching from our disk. We need to see the
3957 	   new disk state... */
3958 	mutex_lock(device->state_mutex);
3959 	mutex_unlock(device->state_mutex);
3960 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3961 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3962 
3963 	if (updated_uuids)
3964 		drbd_print_uuids(device, "receiver updated UUIDs to");
3965 
3966 	return 0;
3967 }
3968 
3969 /**
3970  * convert_state() - Converts the peer's view of the cluster state to our point of view
3971  * @ps:		The state as seen by the peer.
3972  */
3973 static union drbd_state convert_state(union drbd_state ps)
3974 {
3975 	union drbd_state ms;
3976 
3977 	static enum drbd_conns c_tab[] = {
3978 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3979 		[C_CONNECTED] = C_CONNECTED,
3980 
3981 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3982 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3983 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3984 		[C_VERIFY_S]       = C_VERIFY_T,
3985 		[C_MASK]   = C_MASK,
3986 	};
3987 
3988 	ms.i = ps.i;
3989 
3990 	ms.conn = c_tab[ps.conn];
3991 	ms.peer = ps.role;
3992 	ms.role = ps.peer;
3993 	ms.pdsk = ps.disk;
3994 	ms.disk = ps.pdsk;
3995 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3996 
3997 	return ms;
3998 }
3999 
4000 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4001 {
4002 	struct drbd_peer_device *peer_device;
4003 	struct drbd_device *device;
4004 	struct p_req_state *p = pi->data;
4005 	union drbd_state mask, val;
4006 	enum drbd_state_rv rv;
4007 
4008 	peer_device = conn_peer_device(connection, pi->vnr);
4009 	if (!peer_device)
4010 		return -EIO;
4011 	device = peer_device->device;
4012 
4013 	mask.i = be32_to_cpu(p->mask);
4014 	val.i = be32_to_cpu(p->val);
4015 
4016 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4017 	    mutex_is_locked(device->state_mutex)) {
4018 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4019 		return 0;
4020 	}
4021 
4022 	mask = convert_state(mask);
4023 	val = convert_state(val);
4024 
4025 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4026 	drbd_send_sr_reply(peer_device, rv);
4027 
4028 	drbd_md_sync(device);
4029 
4030 	return 0;
4031 }
4032 
4033 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4034 {
4035 	struct p_req_state *p = pi->data;
4036 	union drbd_state mask, val;
4037 	enum drbd_state_rv rv;
4038 
4039 	mask.i = be32_to_cpu(p->mask);
4040 	val.i = be32_to_cpu(p->val);
4041 
4042 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4043 	    mutex_is_locked(&connection->cstate_mutex)) {
4044 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4045 		return 0;
4046 	}
4047 
4048 	mask = convert_state(mask);
4049 	val = convert_state(val);
4050 
4051 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4052 	conn_send_sr_reply(connection, rv);
4053 
4054 	return 0;
4055 }
4056 
4057 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4058 {
4059 	struct drbd_peer_device *peer_device;
4060 	struct drbd_device *device;
4061 	struct p_state *p = pi->data;
4062 	union drbd_state os, ns, peer_state;
4063 	enum drbd_disk_state real_peer_disk;
4064 	enum chg_state_flags cs_flags;
4065 	int rv;
4066 
4067 	peer_device = conn_peer_device(connection, pi->vnr);
4068 	if (!peer_device)
4069 		return config_unknown_volume(connection, pi);
4070 	device = peer_device->device;
4071 
4072 	peer_state.i = be32_to_cpu(p->state);
4073 
4074 	real_peer_disk = peer_state.disk;
4075 	if (peer_state.disk == D_NEGOTIATING) {
4076 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4077 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4078 	}
4079 
4080 	spin_lock_irq(&device->resource->req_lock);
4081  retry:
4082 	os = ns = drbd_read_state(device);
4083 	spin_unlock_irq(&device->resource->req_lock);
4084 
4085 	/* If some other part of the code (ack_receiver thread, timeout)
4086 	 * already decided to close the connection again,
4087 	 * we must not "re-establish" it here. */
4088 	if (os.conn <= C_TEAR_DOWN)
4089 		return -ECONNRESET;
4090 
4091 	/* If this is the "end of sync" confirmation, usually the peer disk
4092 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4093 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4094 	 * unpause-sync events has been "just right", the peer disk may
4095 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4096 	 */
4097 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4098 	    real_peer_disk == D_UP_TO_DATE &&
4099 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4100 		/* If we are (becoming) SyncSource, but peer is still in sync
4101 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4102 		 * will change to inconsistent once the peer reaches active
4103 		 * syncing states.
4104 		 * It may have changed syncer-paused flags, however, so we
4105 		 * cannot ignore this completely. */
4106 		if (peer_state.conn > C_CONNECTED &&
4107 		    peer_state.conn < C_SYNC_SOURCE)
4108 			real_peer_disk = D_INCONSISTENT;
4109 
4110 		/* if peer_state changes to connected at the same time,
4111 		 * it explicitly notifies us that it finished resync.
4112 		 * Maybe we should finish it up, too? */
4113 		else if (os.conn >= C_SYNC_SOURCE &&
4114 			 peer_state.conn == C_CONNECTED) {
4115 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4116 				drbd_resync_finished(device);
4117 			return 0;
4118 		}
4119 	}
4120 
4121 	/* explicit verify finished notification, stop sector reached. */
4122 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4123 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4124 		ov_out_of_sync_print(device);
4125 		drbd_resync_finished(device);
4126 		return 0;
4127 	}
4128 
4129 	/* peer says his disk is inconsistent, while we think it is uptodate,
4130 	 * and this happens while the peer still thinks we have a sync going on,
4131 	 * but we think we are already done with the sync.
4132 	 * We ignore this to avoid flapping pdsk.
4133 	 * This should not happen, if the peer is a recent version of drbd. */
4134 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4135 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4136 		real_peer_disk = D_UP_TO_DATE;
4137 
4138 	if (ns.conn == C_WF_REPORT_PARAMS)
4139 		ns.conn = C_CONNECTED;
4140 
4141 	if (peer_state.conn == C_AHEAD)
4142 		ns.conn = C_BEHIND;
4143 
4144 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4145 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4146 		int cr; /* consider resync */
4147 
4148 		/* if we established a new connection */
4149 		cr  = (os.conn < C_CONNECTED);
4150 		/* if we had an established connection
4151 		 * and one of the nodes newly attaches a disk */
4152 		cr |= (os.conn == C_CONNECTED &&
4153 		       (peer_state.disk == D_NEGOTIATING ||
4154 			os.disk == D_NEGOTIATING));
4155 		/* if we have both been inconsistent, and the peer has been
4156 		 * forced to be UpToDate with --overwrite-data */
4157 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4158 		/* if we had been plain connected, and the admin requested to
4159 		 * start a sync by "invalidate" or "invalidate-remote" */
4160 		cr |= (os.conn == C_CONNECTED &&
4161 				(peer_state.conn >= C_STARTING_SYNC_S &&
4162 				 peer_state.conn <= C_WF_BITMAP_T));
4163 
4164 		if (cr)
4165 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4166 
4167 		put_ldev(device);
4168 		if (ns.conn == C_MASK) {
4169 			ns.conn = C_CONNECTED;
4170 			if (device->state.disk == D_NEGOTIATING) {
4171 				drbd_force_state(device, NS(disk, D_FAILED));
4172 			} else if (peer_state.disk == D_NEGOTIATING) {
4173 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4174 				peer_state.disk = D_DISKLESS;
4175 				real_peer_disk = D_DISKLESS;
4176 			} else {
4177 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4178 					return -EIO;
4179 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4180 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4181 				return -EIO;
4182 			}
4183 		}
4184 	}
4185 
4186 	spin_lock_irq(&device->resource->req_lock);
4187 	if (os.i != drbd_read_state(device).i)
4188 		goto retry;
4189 	clear_bit(CONSIDER_RESYNC, &device->flags);
4190 	ns.peer = peer_state.role;
4191 	ns.pdsk = real_peer_disk;
4192 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4193 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4194 		ns.disk = device->new_state_tmp.disk;
4195 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4196 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4197 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4198 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4199 		   for temporal network outages! */
4200 		spin_unlock_irq(&device->resource->req_lock);
4201 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4202 		tl_clear(peer_device->connection);
4203 		drbd_uuid_new_current(device);
4204 		clear_bit(NEW_CUR_UUID, &device->flags);
4205 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4206 		return -EIO;
4207 	}
4208 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4209 	ns = drbd_read_state(device);
4210 	spin_unlock_irq(&device->resource->req_lock);
4211 
4212 	if (rv < SS_SUCCESS) {
4213 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4214 		return -EIO;
4215 	}
4216 
4217 	if (os.conn > C_WF_REPORT_PARAMS) {
4218 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4219 		    peer_state.disk != D_NEGOTIATING ) {
4220 			/* we want resync, peer has not yet decided to sync... */
4221 			/* Nowadays only used when forcing a node into primary role and
4222 			   setting its disk to UpToDate with that */
4223 			drbd_send_uuids(peer_device);
4224 			drbd_send_current_state(peer_device);
4225 		}
4226 	}
4227 
4228 	clear_bit(DISCARD_MY_DATA, &device->flags);
4229 
4230 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4231 
4232 	return 0;
4233 }
4234 
4235 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4236 {
4237 	struct drbd_peer_device *peer_device;
4238 	struct drbd_device *device;
4239 	struct p_rs_uuid *p = pi->data;
4240 
4241 	peer_device = conn_peer_device(connection, pi->vnr);
4242 	if (!peer_device)
4243 		return -EIO;
4244 	device = peer_device->device;
4245 
4246 	wait_event(device->misc_wait,
4247 		   device->state.conn == C_WF_SYNC_UUID ||
4248 		   device->state.conn == C_BEHIND ||
4249 		   device->state.conn < C_CONNECTED ||
4250 		   device->state.disk < D_NEGOTIATING);
4251 
4252 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4253 
4254 	/* Here the _drbd_uuid_ functions are right, current should
4255 	   _not_ be rotated into the history */
4256 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4257 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4258 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4259 
4260 		drbd_print_uuids(device, "updated sync uuid");
4261 		drbd_start_resync(device, C_SYNC_TARGET);
4262 
4263 		put_ldev(device);
4264 	} else
4265 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4266 
4267 	return 0;
4268 }
4269 
4270 /**
4271  * receive_bitmap_plain
4272  *
4273  * Return 0 when done, 1 when another iteration is needed, and a negative error
4274  * code upon failure.
4275  */
4276 static int
4277 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4278 		     unsigned long *p, struct bm_xfer_ctx *c)
4279 {
4280 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4281 				 drbd_header_size(peer_device->connection);
4282 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4283 				       c->bm_words - c->word_offset);
4284 	unsigned int want = num_words * sizeof(*p);
4285 	int err;
4286 
4287 	if (want != size) {
4288 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4289 		return -EIO;
4290 	}
4291 	if (want == 0)
4292 		return 0;
4293 	err = drbd_recv_all(peer_device->connection, p, want);
4294 	if (err)
4295 		return err;
4296 
4297 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4298 
4299 	c->word_offset += num_words;
4300 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4301 	if (c->bit_offset > c->bm_bits)
4302 		c->bit_offset = c->bm_bits;
4303 
4304 	return 1;
4305 }
4306 
4307 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4308 {
4309 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4310 }
4311 
4312 static int dcbp_get_start(struct p_compressed_bm *p)
4313 {
4314 	return (p->encoding & 0x80) != 0;
4315 }
4316 
4317 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4318 {
4319 	return (p->encoding >> 4) & 0x7;
4320 }
4321 
4322 /**
4323  * recv_bm_rle_bits
4324  *
4325  * Return 0 when done, 1 when another iteration is needed, and a negative error
4326  * code upon failure.
4327  */
4328 static int
4329 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4330 		struct p_compressed_bm *p,
4331 		 struct bm_xfer_ctx *c,
4332 		 unsigned int len)
4333 {
4334 	struct bitstream bs;
4335 	u64 look_ahead;
4336 	u64 rl;
4337 	u64 tmp;
4338 	unsigned long s = c->bit_offset;
4339 	unsigned long e;
4340 	int toggle = dcbp_get_start(p);
4341 	int have;
4342 	int bits;
4343 
4344 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4345 
4346 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4347 	if (bits < 0)
4348 		return -EIO;
4349 
4350 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4351 		bits = vli_decode_bits(&rl, look_ahead);
4352 		if (bits <= 0)
4353 			return -EIO;
4354 
4355 		if (toggle) {
4356 			e = s + rl -1;
4357 			if (e >= c->bm_bits) {
4358 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4359 				return -EIO;
4360 			}
4361 			_drbd_bm_set_bits(peer_device->device, s, e);
4362 		}
4363 
4364 		if (have < bits) {
4365 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4366 				have, bits, look_ahead,
4367 				(unsigned int)(bs.cur.b - p->code),
4368 				(unsigned int)bs.buf_len);
4369 			return -EIO;
4370 		}
4371 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4372 		if (likely(bits < 64))
4373 			look_ahead >>= bits;
4374 		else
4375 			look_ahead = 0;
4376 		have -= bits;
4377 
4378 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4379 		if (bits < 0)
4380 			return -EIO;
4381 		look_ahead |= tmp << have;
4382 		have += bits;
4383 	}
4384 
4385 	c->bit_offset = s;
4386 	bm_xfer_ctx_bit_to_word_offset(c);
4387 
4388 	return (s != c->bm_bits);
4389 }
4390 
4391 /**
4392  * decode_bitmap_c
4393  *
4394  * Return 0 when done, 1 when another iteration is needed, and a negative error
4395  * code upon failure.
4396  */
4397 static int
4398 decode_bitmap_c(struct drbd_peer_device *peer_device,
4399 		struct p_compressed_bm *p,
4400 		struct bm_xfer_ctx *c,
4401 		unsigned int len)
4402 {
4403 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4404 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4405 
4406 	/* other variants had been implemented for evaluation,
4407 	 * but have been dropped as this one turned out to be "best"
4408 	 * during all our tests. */
4409 
4410 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4411 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4412 	return -EIO;
4413 }
4414 
4415 void INFO_bm_xfer_stats(struct drbd_device *device,
4416 		const char *direction, struct bm_xfer_ctx *c)
4417 {
4418 	/* what would it take to transfer it "plaintext" */
4419 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4420 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4421 	unsigned int plain =
4422 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4423 		c->bm_words * sizeof(unsigned long);
4424 	unsigned int total = c->bytes[0] + c->bytes[1];
4425 	unsigned int r;
4426 
4427 	/* total can not be zero. but just in case: */
4428 	if (total == 0)
4429 		return;
4430 
4431 	/* don't report if not compressed */
4432 	if (total >= plain)
4433 		return;
4434 
4435 	/* total < plain. check for overflow, still */
4436 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4437 		                    : (1000 * total / plain);
4438 
4439 	if (r > 1000)
4440 		r = 1000;
4441 
4442 	r = 1000 - r;
4443 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4444 	     "total %u; compression: %u.%u%%\n",
4445 			direction,
4446 			c->bytes[1], c->packets[1],
4447 			c->bytes[0], c->packets[0],
4448 			total, r/10, r % 10);
4449 }
4450 
4451 /* Since we are processing the bitfield from lower addresses to higher,
4452    it does not matter if the process it in 32 bit chunks or 64 bit
4453    chunks as long as it is little endian. (Understand it as byte stream,
4454    beginning with the lowest byte...) If we would use big endian
4455    we would need to process it from the highest address to the lowest,
4456    in order to be agnostic to the 32 vs 64 bits issue.
4457 
4458    returns 0 on failure, 1 if we successfully received it. */
4459 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4460 {
4461 	struct drbd_peer_device *peer_device;
4462 	struct drbd_device *device;
4463 	struct bm_xfer_ctx c;
4464 	int err;
4465 
4466 	peer_device = conn_peer_device(connection, pi->vnr);
4467 	if (!peer_device)
4468 		return -EIO;
4469 	device = peer_device->device;
4470 
4471 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4472 	/* you are supposed to send additional out-of-sync information
4473 	 * if you actually set bits during this phase */
4474 
4475 	c = (struct bm_xfer_ctx) {
4476 		.bm_bits = drbd_bm_bits(device),
4477 		.bm_words = drbd_bm_words(device),
4478 	};
4479 
4480 	for(;;) {
4481 		if (pi->cmd == P_BITMAP)
4482 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4483 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4484 			/* MAYBE: sanity check that we speak proto >= 90,
4485 			 * and the feature is enabled! */
4486 			struct p_compressed_bm *p = pi->data;
4487 
4488 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4489 				drbd_err(device, "ReportCBitmap packet too large\n");
4490 				err = -EIO;
4491 				goto out;
4492 			}
4493 			if (pi->size <= sizeof(*p)) {
4494 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4495 				err = -EIO;
4496 				goto out;
4497 			}
4498 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4499 			if (err)
4500 			       goto out;
4501 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4502 		} else {
4503 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4504 			err = -EIO;
4505 			goto out;
4506 		}
4507 
4508 		c.packets[pi->cmd == P_BITMAP]++;
4509 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4510 
4511 		if (err <= 0) {
4512 			if (err < 0)
4513 				goto out;
4514 			break;
4515 		}
4516 		err = drbd_recv_header(peer_device->connection, pi);
4517 		if (err)
4518 			goto out;
4519 	}
4520 
4521 	INFO_bm_xfer_stats(device, "receive", &c);
4522 
4523 	if (device->state.conn == C_WF_BITMAP_T) {
4524 		enum drbd_state_rv rv;
4525 
4526 		err = drbd_send_bitmap(device);
4527 		if (err)
4528 			goto out;
4529 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4530 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4531 		D_ASSERT(device, rv == SS_SUCCESS);
4532 	} else if (device->state.conn != C_WF_BITMAP_S) {
4533 		/* admin may have requested C_DISCONNECTING,
4534 		 * other threads may have noticed network errors */
4535 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4536 		    drbd_conn_str(device->state.conn));
4537 	}
4538 	err = 0;
4539 
4540  out:
4541 	drbd_bm_unlock(device);
4542 	if (!err && device->state.conn == C_WF_BITMAP_S)
4543 		drbd_start_resync(device, C_SYNC_SOURCE);
4544 	return err;
4545 }
4546 
4547 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4548 {
4549 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4550 		 pi->cmd, pi->size);
4551 
4552 	return ignore_remaining_packet(connection, pi);
4553 }
4554 
4555 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4556 {
4557 	/* Make sure we've acked all the TCP data associated
4558 	 * with the data requests being unplugged */
4559 	drbd_tcp_quickack(connection->data.socket);
4560 
4561 	return 0;
4562 }
4563 
4564 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4565 {
4566 	struct drbd_peer_device *peer_device;
4567 	struct drbd_device *device;
4568 	struct p_block_desc *p = pi->data;
4569 
4570 	peer_device = conn_peer_device(connection, pi->vnr);
4571 	if (!peer_device)
4572 		return -EIO;
4573 	device = peer_device->device;
4574 
4575 	switch (device->state.conn) {
4576 	case C_WF_SYNC_UUID:
4577 	case C_WF_BITMAP_T:
4578 	case C_BEHIND:
4579 			break;
4580 	default:
4581 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4582 				drbd_conn_str(device->state.conn));
4583 	}
4584 
4585 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4586 
4587 	return 0;
4588 }
4589 
4590 struct data_cmd {
4591 	int expect_payload;
4592 	size_t pkt_size;
4593 	int (*fn)(struct drbd_connection *, struct packet_info *);
4594 };
4595 
4596 static struct data_cmd drbd_cmd_handler[] = {
4597 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4598 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4599 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4600 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4601 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4602 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4603 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4604 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4605 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4606 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4607 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4608 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4609 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4610 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4611 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4612 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4613 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4614 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4615 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4616 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4617 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4618 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4619 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4620 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4621 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4622 };
4623 
4624 static void drbdd(struct drbd_connection *connection)
4625 {
4626 	struct packet_info pi;
4627 	size_t shs; /* sub header size */
4628 	int err;
4629 
4630 	while (get_t_state(&connection->receiver) == RUNNING) {
4631 		struct data_cmd *cmd;
4632 
4633 		drbd_thread_current_set_cpu(&connection->receiver);
4634 		update_receiver_timing_details(connection, drbd_recv_header);
4635 		if (drbd_recv_header(connection, &pi))
4636 			goto err_out;
4637 
4638 		cmd = &drbd_cmd_handler[pi.cmd];
4639 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4640 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4641 				 cmdname(pi.cmd), pi.cmd);
4642 			goto err_out;
4643 		}
4644 
4645 		shs = cmd->pkt_size;
4646 		if (pi.size > shs && !cmd->expect_payload) {
4647 			drbd_err(connection, "No payload expected %s l:%d\n",
4648 				 cmdname(pi.cmd), pi.size);
4649 			goto err_out;
4650 		}
4651 
4652 		if (shs) {
4653 			update_receiver_timing_details(connection, drbd_recv_all_warn);
4654 			err = drbd_recv_all_warn(connection, pi.data, shs);
4655 			if (err)
4656 				goto err_out;
4657 			pi.size -= shs;
4658 		}
4659 
4660 		update_receiver_timing_details(connection, cmd->fn);
4661 		err = cmd->fn(connection, &pi);
4662 		if (err) {
4663 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4664 				 cmdname(pi.cmd), err, pi.size);
4665 			goto err_out;
4666 		}
4667 	}
4668 	return;
4669 
4670     err_out:
4671 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4672 }
4673 
4674 static void conn_disconnect(struct drbd_connection *connection)
4675 {
4676 	struct drbd_peer_device *peer_device;
4677 	enum drbd_conns oc;
4678 	int vnr;
4679 
4680 	if (connection->cstate == C_STANDALONE)
4681 		return;
4682 
4683 	/* We are about to start the cleanup after connection loss.
4684 	 * Make sure drbd_make_request knows about that.
4685 	 * Usually we should be in some network failure state already,
4686 	 * but just in case we are not, we fix it up here.
4687 	 */
4688 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4689 
4690 	/* ack_receiver does not clean up anything. it must not interfere, either */
4691 	drbd_thread_stop(&connection->ack_receiver);
4692 	if (connection->ack_sender) {
4693 		destroy_workqueue(connection->ack_sender);
4694 		connection->ack_sender = NULL;
4695 	}
4696 	drbd_free_sock(connection);
4697 
4698 	rcu_read_lock();
4699 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4700 		struct drbd_device *device = peer_device->device;
4701 		kref_get(&device->kref);
4702 		rcu_read_unlock();
4703 		drbd_disconnected(peer_device);
4704 		kref_put(&device->kref, drbd_destroy_device);
4705 		rcu_read_lock();
4706 	}
4707 	rcu_read_unlock();
4708 
4709 	if (!list_empty(&connection->current_epoch->list))
4710 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4711 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4712 	atomic_set(&connection->current_epoch->epoch_size, 0);
4713 	connection->send.seen_any_write_yet = false;
4714 
4715 	drbd_info(connection, "Connection closed\n");
4716 
4717 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4718 		conn_try_outdate_peer_async(connection);
4719 
4720 	spin_lock_irq(&connection->resource->req_lock);
4721 	oc = connection->cstate;
4722 	if (oc >= C_UNCONNECTED)
4723 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4724 
4725 	spin_unlock_irq(&connection->resource->req_lock);
4726 
4727 	if (oc == C_DISCONNECTING)
4728 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4729 }
4730 
4731 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4732 {
4733 	struct drbd_device *device = peer_device->device;
4734 	unsigned int i;
4735 
4736 	/* wait for current activity to cease. */
4737 	spin_lock_irq(&device->resource->req_lock);
4738 	_drbd_wait_ee_list_empty(device, &device->active_ee);
4739 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4740 	_drbd_wait_ee_list_empty(device, &device->read_ee);
4741 	spin_unlock_irq(&device->resource->req_lock);
4742 
4743 	/* We do not have data structures that would allow us to
4744 	 * get the rs_pending_cnt down to 0 again.
4745 	 *  * On C_SYNC_TARGET we do not have any data structures describing
4746 	 *    the pending RSDataRequest's we have sent.
4747 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4748 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4749 	 *  And no, it is not the sum of the reference counts in the
4750 	 *  resync_LRU. The resync_LRU tracks the whole operation including
4751 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4752 	 *  on the fly. */
4753 	drbd_rs_cancel_all(device);
4754 	device->rs_total = 0;
4755 	device->rs_failed = 0;
4756 	atomic_set(&device->rs_pending_cnt, 0);
4757 	wake_up(&device->misc_wait);
4758 
4759 	del_timer_sync(&device->resync_timer);
4760 	resync_timer_fn((unsigned long)device);
4761 
4762 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4763 	 * w_make_resync_request etc. which may still be on the worker queue
4764 	 * to be "canceled" */
4765 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4766 
4767 	drbd_finish_peer_reqs(device);
4768 
4769 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4770 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
4771 	   necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4772 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4773 
4774 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
4775 	 * again via drbd_try_clear_on_disk_bm(). */
4776 	drbd_rs_cancel_all(device);
4777 
4778 	kfree(device->p_uuid);
4779 	device->p_uuid = NULL;
4780 
4781 	if (!drbd_suspended(device))
4782 		tl_clear(peer_device->connection);
4783 
4784 	drbd_md_sync(device);
4785 
4786 	/* serialize with bitmap writeout triggered by the state change,
4787 	 * if any. */
4788 	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4789 
4790 	/* tcp_close and release of sendpage pages can be deferred.  I don't
4791 	 * want to use SO_LINGER, because apparently it can be deferred for
4792 	 * more than 20 seconds (longest time I checked).
4793 	 *
4794 	 * Actually we don't care for exactly when the network stack does its
4795 	 * put_page(), but release our reference on these pages right here.
4796 	 */
4797 	i = drbd_free_peer_reqs(device, &device->net_ee);
4798 	if (i)
4799 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4800 	i = atomic_read(&device->pp_in_use_by_net);
4801 	if (i)
4802 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4803 	i = atomic_read(&device->pp_in_use);
4804 	if (i)
4805 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4806 
4807 	D_ASSERT(device, list_empty(&device->read_ee));
4808 	D_ASSERT(device, list_empty(&device->active_ee));
4809 	D_ASSERT(device, list_empty(&device->sync_ee));
4810 	D_ASSERT(device, list_empty(&device->done_ee));
4811 
4812 	return 0;
4813 }
4814 
4815 /*
4816  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4817  * we can agree on is stored in agreed_pro_version.
4818  *
4819  * feature flags and the reserved array should be enough room for future
4820  * enhancements of the handshake protocol, and possible plugins...
4821  *
4822  * for now, they are expected to be zero, but ignored.
4823  */
4824 static int drbd_send_features(struct drbd_connection *connection)
4825 {
4826 	struct drbd_socket *sock;
4827 	struct p_connection_features *p;
4828 
4829 	sock = &connection->data;
4830 	p = conn_prepare_command(connection, sock);
4831 	if (!p)
4832 		return -EIO;
4833 	memset(p, 0, sizeof(*p));
4834 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4835 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4836 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
4837 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4838 }
4839 
4840 /*
4841  * return values:
4842  *   1 yes, we have a valid connection
4843  *   0 oops, did not work out, please try again
4844  *  -1 peer talks different language,
4845  *     no point in trying again, please go standalone.
4846  */
4847 static int drbd_do_features(struct drbd_connection *connection)
4848 {
4849 	/* ASSERT current == connection->receiver ... */
4850 	struct p_connection_features *p;
4851 	const int expect = sizeof(struct p_connection_features);
4852 	struct packet_info pi;
4853 	int err;
4854 
4855 	err = drbd_send_features(connection);
4856 	if (err)
4857 		return 0;
4858 
4859 	err = drbd_recv_header(connection, &pi);
4860 	if (err)
4861 		return 0;
4862 
4863 	if (pi.cmd != P_CONNECTION_FEATURES) {
4864 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4865 			 cmdname(pi.cmd), pi.cmd);
4866 		return -1;
4867 	}
4868 
4869 	if (pi.size != expect) {
4870 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4871 		     expect, pi.size);
4872 		return -1;
4873 	}
4874 
4875 	p = pi.data;
4876 	err = drbd_recv_all_warn(connection, p, expect);
4877 	if (err)
4878 		return 0;
4879 
4880 	p->protocol_min = be32_to_cpu(p->protocol_min);
4881 	p->protocol_max = be32_to_cpu(p->protocol_max);
4882 	if (p->protocol_max == 0)
4883 		p->protocol_max = p->protocol_min;
4884 
4885 	if (PRO_VERSION_MAX < p->protocol_min ||
4886 	    PRO_VERSION_MIN > p->protocol_max)
4887 		goto incompat;
4888 
4889 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4890 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4891 
4892 	drbd_info(connection, "Handshake successful: "
4893 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
4894 
4895 	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4896 		  connection->agreed_features & FF_TRIM ? " " : " not ");
4897 
4898 	return 1;
4899 
4900  incompat:
4901 	drbd_err(connection, "incompatible DRBD dialects: "
4902 	    "I support %d-%d, peer supports %d-%d\n",
4903 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4904 	    p->protocol_min, p->protocol_max);
4905 	return -1;
4906 }
4907 
4908 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4909 static int drbd_do_auth(struct drbd_connection *connection)
4910 {
4911 	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4912 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4913 	return -1;
4914 }
4915 #else
4916 #define CHALLENGE_LEN 64
4917 
4918 /* Return value:
4919 	1 - auth succeeded,
4920 	0 - failed, try again (network error),
4921 	-1 - auth failed, don't try again.
4922 */
4923 
4924 static int drbd_do_auth(struct drbd_connection *connection)
4925 {
4926 	struct drbd_socket *sock;
4927 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4928 	char *response = NULL;
4929 	char *right_response = NULL;
4930 	char *peers_ch = NULL;
4931 	unsigned int key_len;
4932 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
4933 	unsigned int resp_size;
4934 	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
4935 	struct packet_info pi;
4936 	struct net_conf *nc;
4937 	int err, rv;
4938 
4939 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4940 
4941 	rcu_read_lock();
4942 	nc = rcu_dereference(connection->net_conf);
4943 	key_len = strlen(nc->shared_secret);
4944 	memcpy(secret, nc->shared_secret, key_len);
4945 	rcu_read_unlock();
4946 
4947 	desc->tfm = connection->cram_hmac_tfm;
4948 	desc->flags = 0;
4949 
4950 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4951 	if (rv) {
4952 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
4953 		rv = -1;
4954 		goto fail;
4955 	}
4956 
4957 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4958 
4959 	sock = &connection->data;
4960 	if (!conn_prepare_command(connection, sock)) {
4961 		rv = 0;
4962 		goto fail;
4963 	}
4964 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4965 				my_challenge, CHALLENGE_LEN);
4966 	if (!rv)
4967 		goto fail;
4968 
4969 	err = drbd_recv_header(connection, &pi);
4970 	if (err) {
4971 		rv = 0;
4972 		goto fail;
4973 	}
4974 
4975 	if (pi.cmd != P_AUTH_CHALLENGE) {
4976 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4977 			 cmdname(pi.cmd), pi.cmd);
4978 		rv = 0;
4979 		goto fail;
4980 	}
4981 
4982 	if (pi.size > CHALLENGE_LEN * 2) {
4983 		drbd_err(connection, "expected AuthChallenge payload too big.\n");
4984 		rv = -1;
4985 		goto fail;
4986 	}
4987 
4988 	if (pi.size < CHALLENGE_LEN) {
4989 		drbd_err(connection, "AuthChallenge payload too small.\n");
4990 		rv = -1;
4991 		goto fail;
4992 	}
4993 
4994 	peers_ch = kmalloc(pi.size, GFP_NOIO);
4995 	if (peers_ch == NULL) {
4996 		drbd_err(connection, "kmalloc of peers_ch failed\n");
4997 		rv = -1;
4998 		goto fail;
4999 	}
5000 
5001 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5002 	if (err) {
5003 		rv = 0;
5004 		goto fail;
5005 	}
5006 
5007 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5008 		drbd_err(connection, "Peer presented the same challenge!\n");
5009 		rv = -1;
5010 		goto fail;
5011 	}
5012 
5013 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5014 	response = kmalloc(resp_size, GFP_NOIO);
5015 	if (response == NULL) {
5016 		drbd_err(connection, "kmalloc of response failed\n");
5017 		rv = -1;
5018 		goto fail;
5019 	}
5020 
5021 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5022 	if (rv) {
5023 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5024 		rv = -1;
5025 		goto fail;
5026 	}
5027 
5028 	if (!conn_prepare_command(connection, sock)) {
5029 		rv = 0;
5030 		goto fail;
5031 	}
5032 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5033 				response, resp_size);
5034 	if (!rv)
5035 		goto fail;
5036 
5037 	err = drbd_recv_header(connection, &pi);
5038 	if (err) {
5039 		rv = 0;
5040 		goto fail;
5041 	}
5042 
5043 	if (pi.cmd != P_AUTH_RESPONSE) {
5044 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5045 			 cmdname(pi.cmd), pi.cmd);
5046 		rv = 0;
5047 		goto fail;
5048 	}
5049 
5050 	if (pi.size != resp_size) {
5051 		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5052 		rv = 0;
5053 		goto fail;
5054 	}
5055 
5056 	err = drbd_recv_all_warn(connection, response , resp_size);
5057 	if (err) {
5058 		rv = 0;
5059 		goto fail;
5060 	}
5061 
5062 	right_response = kmalloc(resp_size, GFP_NOIO);
5063 	if (right_response == NULL) {
5064 		drbd_err(connection, "kmalloc of right_response failed\n");
5065 		rv = -1;
5066 		goto fail;
5067 	}
5068 
5069 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5070 				 right_response);
5071 	if (rv) {
5072 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5073 		rv = -1;
5074 		goto fail;
5075 	}
5076 
5077 	rv = !memcmp(response, right_response, resp_size);
5078 
5079 	if (rv)
5080 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5081 		     resp_size);
5082 	else
5083 		rv = -1;
5084 
5085  fail:
5086 	kfree(peers_ch);
5087 	kfree(response);
5088 	kfree(right_response);
5089 	shash_desc_zero(desc);
5090 
5091 	return rv;
5092 }
5093 #endif
5094 
5095 int drbd_receiver(struct drbd_thread *thi)
5096 {
5097 	struct drbd_connection *connection = thi->connection;
5098 	int h;
5099 
5100 	drbd_info(connection, "receiver (re)started\n");
5101 
5102 	do {
5103 		h = conn_connect(connection);
5104 		if (h == 0) {
5105 			conn_disconnect(connection);
5106 			schedule_timeout_interruptible(HZ);
5107 		}
5108 		if (h == -1) {
5109 			drbd_warn(connection, "Discarding network configuration.\n");
5110 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5111 		}
5112 	} while (h == 0);
5113 
5114 	if (h > 0)
5115 		drbdd(connection);
5116 
5117 	conn_disconnect(connection);
5118 
5119 	drbd_info(connection, "receiver terminated\n");
5120 	return 0;
5121 }
5122 
5123 /* ********* acknowledge sender ******** */
5124 
5125 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5126 {
5127 	struct p_req_state_reply *p = pi->data;
5128 	int retcode = be32_to_cpu(p->retcode);
5129 
5130 	if (retcode >= SS_SUCCESS) {
5131 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5132 	} else {
5133 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5134 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5135 			 drbd_set_st_err_str(retcode), retcode);
5136 	}
5137 	wake_up(&connection->ping_wait);
5138 
5139 	return 0;
5140 }
5141 
5142 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5143 {
5144 	struct drbd_peer_device *peer_device;
5145 	struct drbd_device *device;
5146 	struct p_req_state_reply *p = pi->data;
5147 	int retcode = be32_to_cpu(p->retcode);
5148 
5149 	peer_device = conn_peer_device(connection, pi->vnr);
5150 	if (!peer_device)
5151 		return -EIO;
5152 	device = peer_device->device;
5153 
5154 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5155 		D_ASSERT(device, connection->agreed_pro_version < 100);
5156 		return got_conn_RqSReply(connection, pi);
5157 	}
5158 
5159 	if (retcode >= SS_SUCCESS) {
5160 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5161 	} else {
5162 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5163 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5164 			drbd_set_st_err_str(retcode), retcode);
5165 	}
5166 	wake_up(&device->state_wait);
5167 
5168 	return 0;
5169 }
5170 
5171 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5172 {
5173 	return drbd_send_ping_ack(connection);
5174 
5175 }
5176 
5177 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5178 {
5179 	/* restore idle timeout */
5180 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5181 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5182 		wake_up(&connection->ping_wait);
5183 
5184 	return 0;
5185 }
5186 
5187 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5188 {
5189 	struct drbd_peer_device *peer_device;
5190 	struct drbd_device *device;
5191 	struct p_block_ack *p = pi->data;
5192 	sector_t sector = be64_to_cpu(p->sector);
5193 	int blksize = be32_to_cpu(p->blksize);
5194 
5195 	peer_device = conn_peer_device(connection, pi->vnr);
5196 	if (!peer_device)
5197 		return -EIO;
5198 	device = peer_device->device;
5199 
5200 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5201 
5202 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5203 
5204 	if (get_ldev(device)) {
5205 		drbd_rs_complete_io(device, sector);
5206 		drbd_set_in_sync(device, sector, blksize);
5207 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5208 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5209 		put_ldev(device);
5210 	}
5211 	dec_rs_pending(device);
5212 	atomic_add(blksize >> 9, &device->rs_sect_in);
5213 
5214 	return 0;
5215 }
5216 
5217 static int
5218 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5219 			      struct rb_root *root, const char *func,
5220 			      enum drbd_req_event what, bool missing_ok)
5221 {
5222 	struct drbd_request *req;
5223 	struct bio_and_error m;
5224 
5225 	spin_lock_irq(&device->resource->req_lock);
5226 	req = find_request(device, root, id, sector, missing_ok, func);
5227 	if (unlikely(!req)) {
5228 		spin_unlock_irq(&device->resource->req_lock);
5229 		return -EIO;
5230 	}
5231 	__req_mod(req, what, &m);
5232 	spin_unlock_irq(&device->resource->req_lock);
5233 
5234 	if (m.bio)
5235 		complete_master_bio(device, &m);
5236 	return 0;
5237 }
5238 
5239 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5240 {
5241 	struct drbd_peer_device *peer_device;
5242 	struct drbd_device *device;
5243 	struct p_block_ack *p = pi->data;
5244 	sector_t sector = be64_to_cpu(p->sector);
5245 	int blksize = be32_to_cpu(p->blksize);
5246 	enum drbd_req_event what;
5247 
5248 	peer_device = conn_peer_device(connection, pi->vnr);
5249 	if (!peer_device)
5250 		return -EIO;
5251 	device = peer_device->device;
5252 
5253 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5254 
5255 	if (p->block_id == ID_SYNCER) {
5256 		drbd_set_in_sync(device, sector, blksize);
5257 		dec_rs_pending(device);
5258 		return 0;
5259 	}
5260 	switch (pi->cmd) {
5261 	case P_RS_WRITE_ACK:
5262 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5263 		break;
5264 	case P_WRITE_ACK:
5265 		what = WRITE_ACKED_BY_PEER;
5266 		break;
5267 	case P_RECV_ACK:
5268 		what = RECV_ACKED_BY_PEER;
5269 		break;
5270 	case P_SUPERSEDED:
5271 		what = CONFLICT_RESOLVED;
5272 		break;
5273 	case P_RETRY_WRITE:
5274 		what = POSTPONE_WRITE;
5275 		break;
5276 	default:
5277 		BUG();
5278 	}
5279 
5280 	return validate_req_change_req_state(device, p->block_id, sector,
5281 					     &device->write_requests, __func__,
5282 					     what, false);
5283 }
5284 
5285 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5286 {
5287 	struct drbd_peer_device *peer_device;
5288 	struct drbd_device *device;
5289 	struct p_block_ack *p = pi->data;
5290 	sector_t sector = be64_to_cpu(p->sector);
5291 	int size = be32_to_cpu(p->blksize);
5292 	int err;
5293 
5294 	peer_device = conn_peer_device(connection, pi->vnr);
5295 	if (!peer_device)
5296 		return -EIO;
5297 	device = peer_device->device;
5298 
5299 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5300 
5301 	if (p->block_id == ID_SYNCER) {
5302 		dec_rs_pending(device);
5303 		drbd_rs_failed_io(device, sector, size);
5304 		return 0;
5305 	}
5306 
5307 	err = validate_req_change_req_state(device, p->block_id, sector,
5308 					    &device->write_requests, __func__,
5309 					    NEG_ACKED, true);
5310 	if (err) {
5311 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5312 		   The master bio might already be completed, therefore the
5313 		   request is no longer in the collision hash. */
5314 		/* In Protocol B we might already have got a P_RECV_ACK
5315 		   but then get a P_NEG_ACK afterwards. */
5316 		drbd_set_out_of_sync(device, sector, size);
5317 	}
5318 	return 0;
5319 }
5320 
5321 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5322 {
5323 	struct drbd_peer_device *peer_device;
5324 	struct drbd_device *device;
5325 	struct p_block_ack *p = pi->data;
5326 	sector_t sector = be64_to_cpu(p->sector);
5327 
5328 	peer_device = conn_peer_device(connection, pi->vnr);
5329 	if (!peer_device)
5330 		return -EIO;
5331 	device = peer_device->device;
5332 
5333 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5334 
5335 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5336 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5337 
5338 	return validate_req_change_req_state(device, p->block_id, sector,
5339 					     &device->read_requests, __func__,
5340 					     NEG_ACKED, false);
5341 }
5342 
5343 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5344 {
5345 	struct drbd_peer_device *peer_device;
5346 	struct drbd_device *device;
5347 	sector_t sector;
5348 	int size;
5349 	struct p_block_ack *p = pi->data;
5350 
5351 	peer_device = conn_peer_device(connection, pi->vnr);
5352 	if (!peer_device)
5353 		return -EIO;
5354 	device = peer_device->device;
5355 
5356 	sector = be64_to_cpu(p->sector);
5357 	size = be32_to_cpu(p->blksize);
5358 
5359 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5360 
5361 	dec_rs_pending(device);
5362 
5363 	if (get_ldev_if_state(device, D_FAILED)) {
5364 		drbd_rs_complete_io(device, sector);
5365 		switch (pi->cmd) {
5366 		case P_NEG_RS_DREPLY:
5367 			drbd_rs_failed_io(device, sector, size);
5368 		case P_RS_CANCEL:
5369 			break;
5370 		default:
5371 			BUG();
5372 		}
5373 		put_ldev(device);
5374 	}
5375 
5376 	return 0;
5377 }
5378 
5379 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5380 {
5381 	struct p_barrier_ack *p = pi->data;
5382 	struct drbd_peer_device *peer_device;
5383 	int vnr;
5384 
5385 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5386 
5387 	rcu_read_lock();
5388 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5389 		struct drbd_device *device = peer_device->device;
5390 
5391 		if (device->state.conn == C_AHEAD &&
5392 		    atomic_read(&device->ap_in_flight) == 0 &&
5393 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5394 			device->start_resync_timer.expires = jiffies + HZ;
5395 			add_timer(&device->start_resync_timer);
5396 		}
5397 	}
5398 	rcu_read_unlock();
5399 
5400 	return 0;
5401 }
5402 
5403 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5404 {
5405 	struct drbd_peer_device *peer_device;
5406 	struct drbd_device *device;
5407 	struct p_block_ack *p = pi->data;
5408 	struct drbd_device_work *dw;
5409 	sector_t sector;
5410 	int size;
5411 
5412 	peer_device = conn_peer_device(connection, pi->vnr);
5413 	if (!peer_device)
5414 		return -EIO;
5415 	device = peer_device->device;
5416 
5417 	sector = be64_to_cpu(p->sector);
5418 	size = be32_to_cpu(p->blksize);
5419 
5420 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5421 
5422 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5423 		drbd_ov_out_of_sync_found(device, sector, size);
5424 	else
5425 		ov_out_of_sync_print(device);
5426 
5427 	if (!get_ldev(device))
5428 		return 0;
5429 
5430 	drbd_rs_complete_io(device, sector);
5431 	dec_rs_pending(device);
5432 
5433 	--device->ov_left;
5434 
5435 	/* let's advance progress step marks only for every other megabyte */
5436 	if ((device->ov_left & 0x200) == 0x200)
5437 		drbd_advance_rs_marks(device, device->ov_left);
5438 
5439 	if (device->ov_left == 0) {
5440 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5441 		if (dw) {
5442 			dw->w.cb = w_ov_finished;
5443 			dw->device = device;
5444 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5445 		} else {
5446 			drbd_err(device, "kmalloc(dw) failed.");
5447 			ov_out_of_sync_print(device);
5448 			drbd_resync_finished(device);
5449 		}
5450 	}
5451 	put_ldev(device);
5452 	return 0;
5453 }
5454 
5455 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5456 {
5457 	return 0;
5458 }
5459 
5460 struct meta_sock_cmd {
5461 	size_t pkt_size;
5462 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5463 };
5464 
5465 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5466 {
5467 	long t;
5468 	struct net_conf *nc;
5469 
5470 	rcu_read_lock();
5471 	nc = rcu_dereference(connection->net_conf);
5472 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5473 	rcu_read_unlock();
5474 
5475 	t *= HZ;
5476 	if (ping_timeout)
5477 		t /= 10;
5478 
5479 	connection->meta.socket->sk->sk_rcvtimeo = t;
5480 }
5481 
5482 static void set_ping_timeout(struct drbd_connection *connection)
5483 {
5484 	set_rcvtimeo(connection, 1);
5485 }
5486 
5487 static void set_idle_timeout(struct drbd_connection *connection)
5488 {
5489 	set_rcvtimeo(connection, 0);
5490 }
5491 
5492 static struct meta_sock_cmd ack_receiver_tbl[] = {
5493 	[P_PING]	    = { 0, got_Ping },
5494 	[P_PING_ACK]	    = { 0, got_PingAck },
5495 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5496 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5497 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5498 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5499 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5500 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5501 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5502 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5503 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5504 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5505 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5506 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5507 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5508 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5509 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5510 };
5511 
5512 int drbd_ack_receiver(struct drbd_thread *thi)
5513 {
5514 	struct drbd_connection *connection = thi->connection;
5515 	struct meta_sock_cmd *cmd = NULL;
5516 	struct packet_info pi;
5517 	unsigned long pre_recv_jif;
5518 	int rv;
5519 	void *buf    = connection->meta.rbuf;
5520 	int received = 0;
5521 	unsigned int header_size = drbd_header_size(connection);
5522 	int expect   = header_size;
5523 	bool ping_timeout_active = false;
5524 	struct sched_param param = { .sched_priority = 2 };
5525 
5526 	rv = sched_setscheduler(current, SCHED_RR, &param);
5527 	if (rv < 0)
5528 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5529 
5530 	while (get_t_state(thi) == RUNNING) {
5531 		drbd_thread_current_set_cpu(thi);
5532 
5533 		conn_reclaim_net_peer_reqs(connection);
5534 
5535 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5536 			if (drbd_send_ping(connection)) {
5537 				drbd_err(connection, "drbd_send_ping has failed\n");
5538 				goto reconnect;
5539 			}
5540 			set_ping_timeout(connection);
5541 			ping_timeout_active = true;
5542 		}
5543 
5544 		pre_recv_jif = jiffies;
5545 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5546 
5547 		/* Note:
5548 		 * -EINTR	 (on meta) we got a signal
5549 		 * -EAGAIN	 (on meta) rcvtimeo expired
5550 		 * -ECONNRESET	 other side closed the connection
5551 		 * -ERESTARTSYS  (on data) we got a signal
5552 		 * rv <  0	 other than above: unexpected error!
5553 		 * rv == expected: full header or command
5554 		 * rv <  expected: "woken" by signal during receive
5555 		 * rv == 0	 : "connection shut down by peer"
5556 		 */
5557 		if (likely(rv > 0)) {
5558 			received += rv;
5559 			buf	 += rv;
5560 		} else if (rv == 0) {
5561 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5562 				long t;
5563 				rcu_read_lock();
5564 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5565 				rcu_read_unlock();
5566 
5567 				t = wait_event_timeout(connection->ping_wait,
5568 						       connection->cstate < C_WF_REPORT_PARAMS,
5569 						       t);
5570 				if (t)
5571 					break;
5572 			}
5573 			drbd_err(connection, "meta connection shut down by peer.\n");
5574 			goto reconnect;
5575 		} else if (rv == -EAGAIN) {
5576 			/* If the data socket received something meanwhile,
5577 			 * that is good enough: peer is still alive. */
5578 			if (time_after(connection->last_received, pre_recv_jif))
5579 				continue;
5580 			if (ping_timeout_active) {
5581 				drbd_err(connection, "PingAck did not arrive in time.\n");
5582 				goto reconnect;
5583 			}
5584 			set_bit(SEND_PING, &connection->flags);
5585 			continue;
5586 		} else if (rv == -EINTR) {
5587 			/* maybe drbd_thread_stop(): the while condition will notice.
5588 			 * maybe woken for send_ping: we'll send a ping above,
5589 			 * and change the rcvtimeo */
5590 			flush_signals(current);
5591 			continue;
5592 		} else {
5593 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5594 			goto reconnect;
5595 		}
5596 
5597 		if (received == expect && cmd == NULL) {
5598 			if (decode_header(connection, connection->meta.rbuf, &pi))
5599 				goto reconnect;
5600 			cmd = &ack_receiver_tbl[pi.cmd];
5601 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5602 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5603 					 cmdname(pi.cmd), pi.cmd);
5604 				goto disconnect;
5605 			}
5606 			expect = header_size + cmd->pkt_size;
5607 			if (pi.size != expect - header_size) {
5608 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5609 					pi.cmd, pi.size);
5610 				goto reconnect;
5611 			}
5612 		}
5613 		if (received == expect) {
5614 			bool err;
5615 
5616 			err = cmd->fn(connection, &pi);
5617 			if (err) {
5618 				drbd_err(connection, "%pf failed\n", cmd->fn);
5619 				goto reconnect;
5620 			}
5621 
5622 			connection->last_received = jiffies;
5623 
5624 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5625 				set_idle_timeout(connection);
5626 				ping_timeout_active = false;
5627 			}
5628 
5629 			buf	 = connection->meta.rbuf;
5630 			received = 0;
5631 			expect	 = header_size;
5632 			cmd	 = NULL;
5633 		}
5634 	}
5635 
5636 	if (0) {
5637 reconnect:
5638 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5639 		conn_md_sync(connection);
5640 	}
5641 	if (0) {
5642 disconnect:
5643 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5644 	}
5645 
5646 	drbd_info(connection, "ack_receiver terminated\n");
5647 
5648 	return 0;
5649 }
5650 
5651 void drbd_send_acks_wf(struct work_struct *ws)
5652 {
5653 	struct drbd_peer_device *peer_device =
5654 		container_of(ws, struct drbd_peer_device, send_acks_work);
5655 	struct drbd_connection *connection = peer_device->connection;
5656 	struct drbd_device *device = peer_device->device;
5657 	struct net_conf *nc;
5658 	int tcp_cork, err;
5659 
5660 	rcu_read_lock();
5661 	nc = rcu_dereference(connection->net_conf);
5662 	tcp_cork = nc->tcp_cork;
5663 	rcu_read_unlock();
5664 
5665 	if (tcp_cork)
5666 		drbd_tcp_cork(connection->meta.socket);
5667 
5668 	err = drbd_finish_peer_reqs(device);
5669 	kref_put(&device->kref, drbd_destroy_device);
5670 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5671 	   struct work_struct send_acks_work alive, which is in the peer_device object */
5672 
5673 	if (err) {
5674 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5675 		return;
5676 	}
5677 
5678 	if (tcp_cork)
5679 		drbd_tcp_uncork(connection->meta.socket);
5680 
5681 	return;
5682 }
5683