xref: /linux/drivers/block/drbd/drbd_receiver.c (revision 3f07c0144132e4f59d88055ac8ff3e691a5fa2b8)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/string.h>
46 #include <linux/scatterlist.h>
47 #include "drbd_int.h"
48 #include "drbd_protocol.h"
49 #include "drbd_req.h"
50 #include "drbd_vli.h"
51 
52 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
53 
/* Decoded DRBD packet header, filled in by decode_header(). */
struct packet_info {
	enum drbd_packet cmd;	/* packet type */
	unsigned int size;	/* size taken from the header; presumably the payload length -- confirm in decode_header() */
	unsigned int vnr;	/* volume number, used as index into connection->peer_devices */
	void *data;		/* points into the receive buffer */
};
60 
/* Result of drbd_may_finish_epoch(): what happened to the epoch object. */
enum finish_epoch {
	FE_STILL_LIVE,	/* epoch not finished yet */
	FE_DESTROYED,	/* epoch object was freed */
	FE_RECYCLED,	/* epoch object was reused */
};
66 
67 static int drbd_do_features(struct drbd_connection *connection);
68 static int drbd_do_auth(struct drbd_connection *connection);
69 static int drbd_disconnected(struct drbd_peer_device *);
70 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
71 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
72 static int e_end_block(struct drbd_work *, int);
73 
74 
75 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
76 
77 /*
78  * some helper functions to deal with single linked page lists,
79  * page->private being our "next" pointer.
80  */
81 
82 /* If at least n pages are linked at head, get n pages off.
83  * Otherwise, don't modify head, and return NULL.
84  * Locking is the responsibility of the caller.
85  */
86 static struct page *page_chain_del(struct page **head, int n)
87 {
88 	struct page *page;
89 	struct page *tmp;
90 
91 	BUG_ON(!n);
92 	BUG_ON(!head);
93 
94 	page = *head;
95 
96 	if (!page)
97 		return NULL;
98 
99 	while (page) {
100 		tmp = page_chain_next(page);
101 		if (--n == 0)
102 			break; /* found sufficient pages */
103 		if (tmp == NULL)
104 			/* insufficient pages, don't use any of them. */
105 			return NULL;
106 		page = tmp;
107 	}
108 
109 	/* add end of list marker for the returned list */
110 	set_page_private(page, 0);
111 	/* actual return value, and adjustment of head */
112 	page = *head;
113 	*head = tmp;
114 	return page;
115 }
116 
117 /* may be used outside of locks to find the tail of a (usually short)
118  * "private" page chain, before adding it back to a global chain head
119  * with page_chain_add() under a spinlock. */
/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock.
 * If @len is non-NULL, the chain length is stored there as well. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *next;
	int count = 1;

	for (next = page_chain_next(page); next; next = page_chain_next(next)) {
		page = next;
		count++;
	}
	if (len)
		*len = count;
	return page;
}
130 
131 static int page_chain_free(struct page *page)
132 {
133 	struct page *tmp;
134 	int i = 0;
135 	page_chain_for_each_safe(page, tmp) {
136 		put_page(page);
137 		++i;
138 	}
139 	return i;
140 }
141 
/* Prepend the chain [chain_first .. chain_last] to *head.
 * The caller is responsible for locking; chain_last must really be the
 * tail of the chain starting at chain_first. */
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	/* paranoia: verify that chain_last really is the tail */
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
155 
/* Non-blocking attempt to get a chain of exactly @number pages:
 * first try to take them from the global drbd_pp_pool, then fall back to
 * allocating fresh pages one by one with GFP_TRY.
 * Returns the chain, or NULL if @number pages are not available right now
 * (any partially allocated pages are put back into the global pool). */
static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		/* link the freshly allocated page in front of the chain */
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
201 
/* Move all leading, already finished peer requests from device->net_ee to
 * @to_be_freed.  Must be called with the req_lock held (callers take
 * device->resource->req_lock). */
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}
218 
/* Collect finished net_ee peer requests under the req_lock, then free them
 * outside the lock (drbd_free_net_peer_req may sleep, see
 * __drbd_free_peer_req's might_sleep()). */
static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);
	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}
230 
/* Reclaim finished net peer requests on every device of @connection that
 * still has pages lent to the network stack (pp_in_use_by_net != 0). */
static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		if (!atomic_read(&device->pp_in_use_by_net))
			continue;

		/* pin the device so we can drop the RCU read lock while
		 * doing the (possibly sleeping) reclaim, then reacquire it
		 * to continue the idr walk */
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_reclaim_net_peer_reqs(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}
250 
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device; the allocation is accounted on its device
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lended to the network stack. */
	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
		drbd_reclaim_net_peer_reqs(device);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_reclaim_net_peer_reqs(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		/* if a full HZ/10 elapsed without progress, stop honoring
		 * max-buffers (see the comment above about not using it as
		 * a hard limit) */
		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}
321 
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system.
 * @is_net selects which in-use counter (pp_in_use_by_net vs pp_in_use)
 * the pages are subtracted from. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	/* if the pool already holds plenty, give the pages back to the
	 * system; otherwise keep them in the pool for reuse */
	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
350 
351 /*
352 You need to hold the req_lock:
353  _drbd_wait_ee_list_empty()
354 
355 You must not have the req_lock:
356  drbd_free_peer_req()
357  drbd_alloc_peer_req()
358  drbd_free_peer_reqs()
359  drbd_ee_fix_bhs()
360  drbd_finish_peer_reqs()
361  drbd_clear_done_ee()
362  drbd_wait_ee_list_empty()
363 */
364 
/* Allocate a peer request (from the drbd_ee_mempool) plus @payload_size
 * worth of pages.
 *
 * normal: payload_size == request size (bi_size)
 * w_same: payload_size == logical_block_size
 * trim: payload_size == 0
 *
 * Returns NULL on allocation failure or injected fault (DRBD_FAULT_AL_EE).
 * The returned request is zero-initialized apart from the fields set below. */
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	/* round the payload up to whole pages */
	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	/* pages in the mempool are not highmem, so mask that flag out */
	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (nr_pages) {
		/* only allow drbd_alloc_pages to retry if the caller's
		 * gfp_mask permits blocking */
		page = drbd_alloc_pages(peer_device, nr_pages,
					gfpflags_allow_blocking(gfp_mask));
		if (!page)
			goto fail;
	}

	memset(peer_req, 0, sizeof(*peer_req));
	INIT_LIST_HEAD(&peer_req->w.list);
	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = request_size;
	peer_req->i.sector = sector;
	peer_req->submit_jif = jiffies;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
414 
/* Free a peer request: its digest (if any), its page chain (accounted as
 * net pages when @is_net), and the request itself back to the mempool.
 * May sleep; must not be called with the req_lock held. */
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
		       int is_net)
{
	might_sleep();
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	/* EE_CALL_AL_COMPLETE_IO should have been handled and cleared before
	 * we get here; if not, expect() complains and we complete the
	 * activity-log I/O now so it is not leaked. */
	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}
	mempool_free(peer_req, drbd_ee_mempool);
}
430 
/* Detach all entries from @list under the req_lock, then free them outside
 * the lock.  When @list is device->net_ee, the pages are accounted against
 * pp_in_use_by_net.  Returns the number of freed requests. */
int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}
448 
/*
 * Run the completion callbacks of all requests on done_ee and free them,
 * also reclaiming any finished net_ee requests on the way.
 * Returns 0, or the first non-zero callback result.
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	/* detach both lists under the lock, process them outside of it */
	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}
484 
/* Wait (uninterruptibly) until @head is empty.
 * Must be called with the req_lock held; the lock is dropped while
 * sleeping and reacquired before each re-check and before returning. */
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}
500 
/* Locked wrapper around _drbd_wait_ee_list_empty(); must be called
 * without the req_lock held. */
static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}
508 
/* Receive up to @size bytes from @sock into @buf.
 * If @flags is 0, a blocking MSG_WAITALL | MSG_NOSIGNAL receive is done.
 * Returns the kernel_recvmsg() result: bytes received, 0 on orderly
 * shutdown, or a negative error code. */
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}
520 
/* Receive @size bytes on the data socket, logging errors and driving the
 * connection state to C_BROKEN_PIPE on short reads -- unless we sent a
 * disconnect ourselves and the state machine leaves C_WF_REPORT_PARAMS
 * within ping_timeo.  Returns the drbd_recv_short() result. */
static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			/* we initiated the disconnect: give the state
			 * machine a chance to leave C_WF_REPORT_PARAMS
			 * before declaring the pipe broken */
			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	/* any short read (including errors) breaks the connection */
	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}
553 
554 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
555 {
556 	int err;
557 
558 	err = drbd_recv(connection, buf, size);
559 	if (err != size) {
560 		if (err >= 0)
561 			err = -EIO;
562 	} else
563 		err = 0;
564 	return err;
565 }
566 
567 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
568 {
569 	int err;
570 
571 	err = drbd_recv_all(connection, buf, size);
572 	if (err && !signal_pending(current))
573 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
574 	return err;
575 }
576 
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 * A value of 0 for @snd or @rcv leaves the respective buffer size alone.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
		unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		/* lock the value against auto-tuning */
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
595 
/* Actively connect to the peer: create a TCP socket, bind it to the
 * configured local address (port 0), and connect to the peer address.
 * Returns the connected socket, or NULL on failure.  "Expected" failures
 * (timeout, peer not reachable yet, signals) do not change the connection
 * state; unexpected errors drive it to C_DISCONNECTING. */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	/* snapshot the relevant net_conf values under RCU */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	/* clear the source port so the kernel picks a free one */
	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

       /* explicitly bind to the configured IP as source IP
	*  for the outgoing connections.
	*  This is needed for multihomed hosts and to be
	*  able to use lo: interfaces for drbd.
	* Make sure to use 0 as port number, so linux selects
	*  a free one dynamically.
	*/
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
683 
/* Context shared between prepare_listen_socket() and
 * drbd_wait_for_connect(): the listen socket plus a completion that
 * drbd_incoming_connection() signals when a connection is established. */
struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;	/* the listening socket */
	struct completion door_bell;	/* completed on incoming connection */
	/* saved sk_state_change callback, restored by unregister_state_change() */
	void (*original_sk_state_change)(struct sock *sk);

};
691 
/* sk_state_change callback installed on the listen socket: ring the
 * door_bell completion once a connection is established, then chain to
 * the original callback. */
static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}
702 
/* Create and bind the listen socket for incoming peer connections, hook up
 * drbd_incoming_connection() as its state-change callback, and start
 * listening.  On success, stores the socket in ad->s_listen and returns 0.
 * Returns -EIO on failure; unexpected errors (other than EAGAIN/EINTR/
 * ERESTARTSYS) also drive the connection to C_DISCONNECTING. */
static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	/* install our state-change callback under the callback lock,
	 * remembering the original so it can be restored later */
	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}
765 
/* Restore the original sk_state_change callback saved by
 * prepare_listen_socket() and clear sk_user_data. */
static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}
773 
/* Wait (up to connect_int, with random jitter) for an incoming connection
 * on the prepared listen socket and accept it.
 * Returns the accepted socket, or NULL on timeout/signal/accept failure. */
static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	/* drbd_incoming_connection() completes the door_bell when a
	 * connection is established */
	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}
810 
811 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
812 
813 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
814 			     enum drbd_packet cmd)
815 {
816 	if (!conn_prepare_command(connection, sock))
817 		return -EIO;
818 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
819 }
820 
/* Read and decode the initial packet header from a freshly accepted socket.
 * Returns the packet command (>= 0), or a negative error code on a short
 * read or decode failure. */
static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	struct net_conf *nc;
	int err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	/* bound the read by 4x the ping timeout (ping_timeo is in
	 * deciseconds) */
	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
	rcu_read_unlock();

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}
848 
849 /**
850  * drbd_socket_okay() - Free the socket if its connection is not okay
851  * @sock:	pointer to the pointer to the socket.
852  */
853 static bool drbd_socket_okay(struct socket **sock)
854 {
855 	int rr;
856 	char tb[4];
857 
858 	if (!*sock)
859 		return false;
860 
861 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
862 
863 	if (rr > 0 || rr == -EAGAIN) {
864 		return true;
865 	} else {
866 		sock_release(*sock);
867 		*sock = NULL;
868 		return false;
869 	}
870 }
871 
/* Check whether both sockets of a connection pair are usable.
 * Sleeps sock_check_timeo (or ping_timeo) deciseconds first, then probes
 * both sockets with drbd_socket_okay() -- which releases a dead socket.
 * Returns true only if both sockets are okay. */
static bool connection_established(struct drbd_connection *connection,
				   struct socket **sock1,
				   struct socket **sock2)
{
	struct net_conf *nc;
	int timeout;
	bool ok;

	if (!*sock1 || !*sock2)
		return false;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
	rcu_read_unlock();
	schedule_timeout_interruptible(timeout);

	/* probe both sockets unconditionally; && ok last so the second
	 * check is not short-circuited away */
	ok = drbd_socket_okay(sock1);
	ok = drbd_socket_okay(sock2) && ok;

	return ok;
}
894 
/* Gets called if a connection is established, or if a new minor gets created
   in a connection.
   Resets per-peer sequence state, picks the state mutex depending on the
   agreed protocol version, and sends the initial handshake packets
   (sync param, sizes, uuids, current state).
   Returns 0 on success, or the first error from those sends. */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	/* peers older than protocol 100 serialize state changes on the
	 * connection-wide mutex instead of the per-device one */
	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
922 
923 /*
924  * return values:
925  *   1 yes, we have a valid connection
926  *   0 oops, did not work out, please try again
927  *  -1 peer talks different language,
928  *     no point in trying again, please go standalone.
929  *  -2 We do not have a network config...
930  */
931 static int conn_connect(struct drbd_connection *connection)
932 {
933 	struct drbd_socket sock, msock;
934 	struct drbd_peer_device *peer_device;
935 	struct net_conf *nc;
936 	int vnr, timeout, h;
937 	bool discard_my_data, ok;
938 	enum drbd_state_rv rv;
939 	struct accept_wait_data ad = {
940 		.connection = connection,
941 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
942 	};
943 
944 	clear_bit(DISCONNECT_SENT, &connection->flags);
945 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
946 		return -2;
947 
948 	mutex_init(&sock.mutex);
949 	sock.sbuf = connection->data.sbuf;
950 	sock.rbuf = connection->data.rbuf;
951 	sock.socket = NULL;
952 	mutex_init(&msock.mutex);
953 	msock.sbuf = connection->meta.sbuf;
954 	msock.rbuf = connection->meta.rbuf;
955 	msock.socket = NULL;
956 
957 	/* Assume that the peer only understands protocol 80 until we know better.  */
958 	connection->agreed_pro_version = 80;
959 
960 	if (prepare_listen_socket(connection, &ad))
961 		return 0;
962 
963 	do {
964 		struct socket *s;
965 
966 		s = drbd_try_connect(connection);
967 		if (s) {
968 			if (!sock.socket) {
969 				sock.socket = s;
970 				send_first_packet(connection, &sock, P_INITIAL_DATA);
971 			} else if (!msock.socket) {
972 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
973 				msock.socket = s;
974 				send_first_packet(connection, &msock, P_INITIAL_META);
975 			} else {
976 				drbd_err(connection, "Logic error in conn_connect()\n");
977 				goto out_release_sockets;
978 			}
979 		}
980 
981 		if (connection_established(connection, &sock.socket, &msock.socket))
982 			break;
983 
984 retry:
985 		s = drbd_wait_for_connect(connection, &ad);
986 		if (s) {
987 			int fp = receive_first_packet(connection, s);
988 			drbd_socket_okay(&sock.socket);
989 			drbd_socket_okay(&msock.socket);
990 			switch (fp) {
991 			case P_INITIAL_DATA:
992 				if (sock.socket) {
993 					drbd_warn(connection, "initial packet S crossed\n");
994 					sock_release(sock.socket);
995 					sock.socket = s;
996 					goto randomize;
997 				}
998 				sock.socket = s;
999 				break;
1000 			case P_INITIAL_META:
1001 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
1002 				if (msock.socket) {
1003 					drbd_warn(connection, "initial packet M crossed\n");
1004 					sock_release(msock.socket);
1005 					msock.socket = s;
1006 					goto randomize;
1007 				}
1008 				msock.socket = s;
1009 				break;
1010 			default:
1011 				drbd_warn(connection, "Error receiving initial packet\n");
1012 				sock_release(s);
1013 randomize:
1014 				if (prandom_u32() & 1)
1015 					goto retry;
1016 			}
1017 		}
1018 
1019 		if (connection->cstate <= C_DISCONNECTING)
1020 			goto out_release_sockets;
1021 		if (signal_pending(current)) {
1022 			flush_signals(current);
1023 			smp_rmb();
1024 			if (get_t_state(&connection->receiver) == EXITING)
1025 				goto out_release_sockets;
1026 		}
1027 
1028 		ok = connection_established(connection, &sock.socket, &msock.socket);
1029 	} while (!ok);
1030 
1031 	if (ad.s_listen)
1032 		sock_release(ad.s_listen);
1033 
1034 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1035 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036 
1037 	sock.socket->sk->sk_allocation = GFP_NOIO;
1038 	msock.socket->sk->sk_allocation = GFP_NOIO;
1039 
1040 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1041 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1042 
1043 	/* NOT YET ...
1044 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1045 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1046 	 * first set it to the P_CONNECTION_FEATURES timeout,
1047 	 * which we set to 4x the configured ping_timeout. */
1048 	rcu_read_lock();
1049 	nc = rcu_dereference(connection->net_conf);
1050 
1051 	sock.socket->sk->sk_sndtimeo =
1052 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1053 
1054 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1055 	timeout = nc->timeout * HZ / 10;
1056 	discard_my_data = nc->discard_my_data;
1057 	rcu_read_unlock();
1058 
1059 	msock.socket->sk->sk_sndtimeo = timeout;
1060 
1061 	/* we don't want delays.
1062 	 * we use TCP_CORK where appropriate, though */
1063 	drbd_tcp_nodelay(sock.socket);
1064 	drbd_tcp_nodelay(msock.socket);
1065 
1066 	connection->data.socket = sock.socket;
1067 	connection->meta.socket = msock.socket;
1068 	connection->last_received = jiffies;
1069 
1070 	h = drbd_do_features(connection);
1071 	if (h <= 0)
1072 		return h;
1073 
1074 	if (connection->cram_hmac_tfm) {
1075 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1076 		switch (drbd_do_auth(connection)) {
1077 		case -1:
1078 			drbd_err(connection, "Authentication of peer failed\n");
1079 			return -1;
1080 		case 0:
1081 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1082 			return 0;
1083 		}
1084 	}
1085 
1086 	connection->data.socket->sk->sk_sndtimeo = timeout;
1087 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1088 
1089 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1090 		return -1;
1091 
1092 	/* Prevent a race between resync-handshake and
1093 	 * being promoted to Primary.
1094 	 *
1095 	 * Grab and release the state mutex, so we know that any current
1096 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1097 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1098 	 */
1099 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1100 		mutex_lock(peer_device->device->state_mutex);
1101 
1102 	set_bit(STATE_SENT, &connection->flags);
1103 
1104 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1105 		mutex_unlock(peer_device->device->state_mutex);
1106 
1107 	rcu_read_lock();
1108 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1109 		struct drbd_device *device = peer_device->device;
1110 		kref_get(&device->kref);
1111 		rcu_read_unlock();
1112 
1113 		if (discard_my_data)
1114 			set_bit(DISCARD_MY_DATA, &device->flags);
1115 		else
1116 			clear_bit(DISCARD_MY_DATA, &device->flags);
1117 
1118 		drbd_connected(peer_device);
1119 		kref_put(&device->kref, drbd_destroy_device);
1120 		rcu_read_lock();
1121 	}
1122 	rcu_read_unlock();
1123 
1124 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1125 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1126 		clear_bit(STATE_SENT, &connection->flags);
1127 		return 0;
1128 	}
1129 
1130 	drbd_thread_start(&connection->ack_receiver);
1131 	/* opencoded create_singlethread_workqueue(),
1132 	 * to be able to use format string arguments */
1133 	connection->ack_sender =
1134 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1135 	if (!connection->ack_sender) {
1136 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1137 		return 0;
1138 	}
1139 
1140 	mutex_lock(&connection->resource->conf_update);
1141 	/* The discard_my_data flag is a single-shot modifier to the next
1142 	 * connection attempt, the handshake of which is now well underway.
1143 	 * No need for rcu style copying of the whole struct
1144 	 * just to clear a single value. */
1145 	connection->net_conf->discard_my_data = 0;
1146 	mutex_unlock(&connection->resource->conf_update);
1147 
1148 	return h;
1149 
1150 out_release_sockets:
1151 	if (ad.s_listen)
1152 		sock_release(ad.s_listen);
1153 	if (sock.socket)
1154 		sock_release(sock.socket);
1155 	if (msock.socket)
1156 		sock_release(msock.socket);
1157 	return -1;
1158 }
1159 
1160 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1161 {
1162 	unsigned int header_size = drbd_header_size(connection);
1163 
1164 	if (header_size == sizeof(struct p_header100) &&
1165 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1166 		struct p_header100 *h = header;
1167 		if (h->pad != 0) {
1168 			drbd_err(connection, "Header padding is not zero\n");
1169 			return -EINVAL;
1170 		}
1171 		pi->vnr = be16_to_cpu(h->volume);
1172 		pi->cmd = be16_to_cpu(h->command);
1173 		pi->size = be32_to_cpu(h->length);
1174 	} else if (header_size == sizeof(struct p_header95) &&
1175 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1176 		struct p_header95 *h = header;
1177 		pi->cmd = be16_to_cpu(h->command);
1178 		pi->size = be32_to_cpu(h->length);
1179 		pi->vnr = 0;
1180 	} else if (header_size == sizeof(struct p_header80) &&
1181 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1182 		struct p_header80 *h = header;
1183 		pi->cmd = be16_to_cpu(h->command);
1184 		pi->size = be16_to_cpu(h->length);
1185 		pi->vnr = 0;
1186 	} else {
1187 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1188 			 be32_to_cpu(*(__be32 *)header),
1189 			 connection->agreed_pro_version);
1190 		return -EINVAL;
1191 	}
1192 	pi->data = header + header_size;
1193 	return 0;
1194 }
1195 
1196 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1197 {
1198 	void *buffer = connection->data.rbuf;
1199 	int err;
1200 
1201 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1202 	if (err)
1203 		return err;
1204 
1205 	err = decode_header(connection, buffer, pi);
1206 	connection->last_received = jiffies;
1207 
1208 	return err;
1209 }
1210 
1211 /* This is blkdev_issue_flush, but asynchronous.
1212  * We want to submit to all component volumes in parallel,
1213  * then wait for all completions.
1214  */
struct issue_flush_context {
	atomic_t pending;	/* flushes still in flight; drbd_flush() holds a +1 bias while submitting */
	int error;		/* last error reported by any completed flush bio, 0 if none */
	struct completion done;	/* completed when 'pending' drops to zero */
};
struct one_flush_context {
	struct drbd_device *device;	/* device this single flush bio belongs to */
	struct issue_flush_context *ctx;	/* shared per-connection flush bookkeeping */
};
1224 
1225 void one_flush_endio(struct bio *bio)
1226 {
1227 	struct one_flush_context *octx = bio->bi_private;
1228 	struct drbd_device *device = octx->device;
1229 	struct issue_flush_context *ctx = octx->ctx;
1230 
1231 	if (bio->bi_error) {
1232 		ctx->error = bio->bi_error;
1233 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1234 	}
1235 	kfree(octx);
1236 	bio_put(bio);
1237 
1238 	clear_bit(FLUSH_PENDING, &device->flags);
1239 	put_ldev(device);
1240 	kref_put(&device->kref, drbd_destroy_device);
1241 
1242 	if (atomic_dec_and_test(&ctx->pending))
1243 		complete(&ctx->done);
1244 }
1245 
1246 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1247 {
1248 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1249 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1250 	if (!bio || !octx) {
1251 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1252 		/* FIXME: what else can I do now?  disconnecting or detaching
1253 		 * really does not help to improve the state of the world, either.
1254 		 */
1255 		kfree(octx);
1256 		if (bio)
1257 			bio_put(bio);
1258 
1259 		ctx->error = -ENOMEM;
1260 		put_ldev(device);
1261 		kref_put(&device->kref, drbd_destroy_device);
1262 		return;
1263 	}
1264 
1265 	octx->device = device;
1266 	octx->ctx = ctx;
1267 	bio->bi_bdev = device->ldev->backing_bdev;
1268 	bio->bi_private = octx;
1269 	bio->bi_end_io = one_flush_endio;
1270 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1271 
1272 	device->flush_jif = jiffies;
1273 	set_bit(FLUSH_PENDING, &device->flags);
1274 	atomic_inc(&ctx->pending);
1275 	submit_bio(bio);
1276 }
1277 
/* Flush the backing devices of all volumes of this connection in parallel,
 * then wait for all completions.  On any error, fall back to WO_DRAIN_IO. */
static void drbd_flush(struct drbd_connection *connection)
{
	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
		struct drbd_peer_device *peer_device;
		struct issue_flush_context ctx;
		int vnr;

		/* Start with a bias of one pending completion, so the
		 * 'done' completion cannot fire before all flushes have
		 * been submitted. */
		atomic_set(&ctx.pending, 1);
		ctx.error = 0;
		init_completion(&ctx.done);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			/* submit_one_flush() may sleep (GFP_NOIO allocations),
			 * so drop the RCU read lock around the call; the kref
			 * keeps the device alive meanwhile.  The ldev and kref
			 * references are consumed by submit_one_flush(). */
			rcu_read_unlock();

			submit_one_flush(device, &ctx);

			rcu_read_lock();
		}
		rcu_read_unlock();

		/* Do we want to add a timeout,
		 * if disk-timeout is set? */
		/* Drop the submission bias; only wait if flushes are
		 * actually still in flight. */
		if (!atomic_dec_and_test(&ctx.pending))
			wait_for_completion(&ctx.done);

		if (ctx.error) {
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			/* Any error is already reported by bio_endio callback. */
			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
		}
	}
}
1318 
1319 /**
1320  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1321  * @device:	DRBD device.
1322  * @epoch:	Epoch object.
1323  * @ev:		Epoch event.
1324  */
1325 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1326 					       struct drbd_epoch *epoch,
1327 					       enum epoch_event ev)
1328 {
1329 	int epoch_size;
1330 	struct drbd_epoch *next_epoch;
1331 	enum finish_epoch rv = FE_STILL_LIVE;
1332 
1333 	spin_lock(&connection->epoch_lock);
1334 	do {
1335 		next_epoch = NULL;
1336 
1337 		epoch_size = atomic_read(&epoch->epoch_size);
1338 
1339 		switch (ev & ~EV_CLEANUP) {
1340 		case EV_PUT:
1341 			atomic_dec(&epoch->active);
1342 			break;
1343 		case EV_GOT_BARRIER_NR:
1344 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1345 			break;
1346 		case EV_BECAME_LAST:
1347 			/* nothing to do*/
1348 			break;
1349 		}
1350 
1351 		if (epoch_size != 0 &&
1352 		    atomic_read(&epoch->active) == 0 &&
1353 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1354 			if (!(ev & EV_CLEANUP)) {
1355 				spin_unlock(&connection->epoch_lock);
1356 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1357 				spin_lock(&connection->epoch_lock);
1358 			}
1359 #if 0
1360 			/* FIXME: dec unacked on connection, once we have
1361 			 * something to count pending connection packets in. */
1362 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1363 				dec_unacked(epoch->connection);
1364 #endif
1365 
1366 			if (connection->current_epoch != epoch) {
1367 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1368 				list_del(&epoch->list);
1369 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1370 				connection->epochs--;
1371 				kfree(epoch);
1372 
1373 				if (rv == FE_STILL_LIVE)
1374 					rv = FE_DESTROYED;
1375 			} else {
1376 				epoch->flags = 0;
1377 				atomic_set(&epoch->epoch_size, 0);
1378 				/* atomic_set(&epoch->active, 0); is already zero */
1379 				if (rv == FE_STILL_LIVE)
1380 					rv = FE_RECYCLED;
1381 			}
1382 		}
1383 
1384 		if (!next_epoch)
1385 			break;
1386 
1387 		epoch = next_epoch;
1388 	} while (1);
1389 
1390 	spin_unlock(&connection->epoch_lock);
1391 
1392 	return rv;
1393 }
1394 
1395 static enum write_ordering_e
1396 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1397 {
1398 	struct disk_conf *dc;
1399 
1400 	dc = rcu_dereference(bdev->disk_conf);
1401 
1402 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1403 		wo = WO_DRAIN_IO;
1404 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1405 		wo = WO_NONE;
1406 
1407 	return wo;
1408 }
1409 
1410 /**
1411  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1412  * @connection:	DRBD connection.
1413  * @wo:		Write ordering method to try.
1414  */
1415 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1416 			      enum write_ordering_e wo)
1417 {
1418 	struct drbd_device *device;
1419 	enum write_ordering_e pwo;
1420 	int vnr;
1421 	static char *write_ordering_str[] = {
1422 		[WO_NONE] = "none",
1423 		[WO_DRAIN_IO] = "drain",
1424 		[WO_BDEV_FLUSH] = "flush",
1425 	};
1426 
1427 	pwo = resource->write_ordering;
1428 	if (wo != WO_BDEV_FLUSH)
1429 		wo = min(pwo, wo);
1430 	rcu_read_lock();
1431 	idr_for_each_entry(&resource->devices, device, vnr) {
1432 		if (get_ldev(device)) {
1433 			wo = max_allowed_wo(device->ldev, wo);
1434 			if (device->ldev == bdev)
1435 				bdev = NULL;
1436 			put_ldev(device);
1437 		}
1438 	}
1439 
1440 	if (bdev)
1441 		wo = max_allowed_wo(bdev, wo);
1442 
1443 	rcu_read_unlock();
1444 
1445 	resource->write_ordering = wo;
1446 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1447 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1448 }
1449 
1450 /*
1451  * We *may* ignore the discard-zeroes-data setting, if so configured.
1452  *
1453  * Assumption is that it "discard_zeroes_data=0" is only because the backend
1454  * may ignore partial unaligned discards.
1455  *
1456  * LVM/DM thin as of at least
1457  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1458  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1459  *   Driver version:  4.29.0
1460  * still behaves this way.
1461  *
1462  * For unaligned (wrt. alignment and granularity) or too small discards,
1463  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1464  * but discard all the aligned full chunks.
1465  *
1466  * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1467  */
/* Discard (if requested and possible) or zero out nr_sectors starting at
 * 'start' on the backing device.  Unaligned head/tail chunks are zeroed,
 * aligned full chunks are discarded; see the big comment above.
 * Returns nonzero if any of the issued requests failed. */
int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
{
	struct block_device *bdev = device->ldev->backing_bdev;
	struct request_queue *q = bdev_get_queue(bdev);
	sector_t tmp, nr;
	unsigned int max_discard_sectors, granularity;
	int alignment;
	int err = 0;

	if (!discard)
		goto zero_out;

	/* Zero-sector (unknown) and one-sector granularities are the same.  */
	granularity = max(q->limits.discard_granularity >> 9, 1U);
	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;

	/* Cap individual discard requests at 1<<22 sectors (2 GiB), rounded
	 * down to a multiple of the granularity. */
	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
	max_discard_sectors -= max_discard_sectors % granularity;
	if (unlikely(!max_discard_sectors))
		goto zero_out;

	if (nr_sectors < granularity)
		goto zero_out;

	tmp = start;
	/* If 'start' is misaligned, zero out the leading partial chunk up to
	 * the next aligned boundary, then continue discarding from there. */
	if (sector_div(tmp, granularity) != alignment) {
		if (nr_sectors < 2*granularity)
			goto zero_out;
		/* start + gran - (start + gran - align) % gran */
		tmp = start + granularity - alignment;
		tmp = start + granularity - sector_div(tmp, granularity);

		nr = tmp - start;
		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
		nr_sectors -= nr;
		start = tmp;
	}
	/* Discard all remaining full, aligned chunks in bounded batches. */
	while (nr_sectors >= granularity) {
		nr = min_t(sector_t, nr_sectors, max_discard_sectors);
		err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
		nr_sectors -= nr;
		start += nr;
	}
 zero_out:
	/* Zero out whatever is left: the trailing partial chunk, or the
	 * whole range if discard was not requested or not possible. */
	if (nr_sectors) {
		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
	}
	return err != 0;
}
1517 
1518 static bool can_do_reliable_discards(struct drbd_device *device)
1519 {
1520 	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1521 	struct disk_conf *dc;
1522 	bool can_do;
1523 
1524 	if (!blk_queue_discard(q))
1525 		return false;
1526 
1527 	if (q->limits.discard_zeroes_data)
1528 		return true;
1529 
1530 	rcu_read_lock();
1531 	dc = rcu_dereference(device->ldev->disk_conf);
1532 	can_do = dc->discard_zeroes_if_aligned;
1533 	rcu_read_unlock();
1534 	return can_do;
1535 }
1536 
1537 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1538 {
1539 	/* If the backend cannot discard, or does not guarantee
1540 	 * read-back zeroes in discarded ranges, we fall back to
1541 	 * zero-out.  Unless configuration specifically requested
1542 	 * otherwise. */
1543 	if (!can_do_reliable_discards(device))
1544 		peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1545 
1546 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1547 	    peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1548 		peer_req->flags |= EE_WAS_ERROR;
1549 	drbd_endio_write_sec_final(peer_req);
1550 }
1551 
1552 static void drbd_issue_peer_wsame(struct drbd_device *device,
1553 				  struct drbd_peer_request *peer_req)
1554 {
1555 	struct block_device *bdev = device->ldev->backing_bdev;
1556 	sector_t s = peer_req->i.sector;
1557 	sector_t nr = peer_req->i.size >> 9;
1558 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1559 		peer_req->flags |= EE_WAS_ERROR;
1560 	drbd_endio_write_sec_final(peer_req);
1561 }
1562 
1563 
1564 /**
1565  * drbd_submit_peer_request()
1566  * @device:	DRBD device.
1567  * @peer_req:	peer request
1568  * @rw:		flag field, see bio->bi_opf
1569  *
1570  * May spread the pages to multiple bios,
1571  * depending on bio_add_page restrictions.
1572  *
1573  * Returns 0 if all bios have been submitted,
1574  * -ENOMEM if we could not allocate enough bios,
1575  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1576  *  single page to an empty bio (which should never happen and likely indicates
1577  *  that the lower level IO stack is in some way broken). This has been observed
1578  *  on certain Xen deployments.
1579  */
1580 /* TODO allocate from our own bio_set. */
1581 int drbd_submit_peer_request(struct drbd_device *device,
1582 			     struct drbd_peer_request *peer_req,
1583 			     const unsigned op, const unsigned op_flags,
1584 			     const int fault_type)
1585 {
1586 	struct bio *bios = NULL;
1587 	struct bio *bio;
1588 	struct page *page = peer_req->pages;
1589 	sector_t sector = peer_req->i.sector;
1590 	unsigned data_size = peer_req->i.size;
1591 	unsigned n_bios = 0;
1592 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1593 	int err = -ENOMEM;
1594 
1595 	/* TRIM/DISCARD: for now, always use the helper function
1596 	 * blkdev_issue_zeroout(..., discard=true).
1597 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1598 	 * Correctness first, performance later.  Next step is to code an
1599 	 * asynchronous variant of the same.
1600 	 */
1601 	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1602 		/* wait for all pending IO completions, before we start
1603 		 * zeroing things out. */
1604 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1605 		/* add it to the active list now,
1606 		 * so we can find it to present it in debugfs */
1607 		peer_req->submit_jif = jiffies;
1608 		peer_req->flags |= EE_SUBMITTED;
1609 
1610 		/* If this was a resync request from receive_rs_deallocated(),
1611 		 * it is already on the sync_ee list */
1612 		if (list_empty(&peer_req->w.list)) {
1613 			spin_lock_irq(&device->resource->req_lock);
1614 			list_add_tail(&peer_req->w.list, &device->active_ee);
1615 			spin_unlock_irq(&device->resource->req_lock);
1616 		}
1617 
1618 		if (peer_req->flags & EE_IS_TRIM)
1619 			drbd_issue_peer_discard(device, peer_req);
1620 		else /* EE_WRITE_SAME */
1621 			drbd_issue_peer_wsame(device, peer_req);
1622 		return 0;
1623 	}
1624 
1625 	/* In most cases, we will only need one bio.  But in case the lower
1626 	 * level restrictions happen to be different at this offset on this
1627 	 * side than those of the sending peer, we may need to submit the
1628 	 * request in more than one bio.
1629 	 *
1630 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1631 	 * generated bio, but a bio allocated on behalf of the peer.
1632 	 */
1633 next_bio:
1634 	bio = bio_alloc(GFP_NOIO, nr_pages);
1635 	if (!bio) {
1636 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1637 		goto fail;
1638 	}
1639 	/* > peer_req->i.sector, unless this is the first bio */
1640 	bio->bi_iter.bi_sector = sector;
1641 	bio->bi_bdev = device->ldev->backing_bdev;
1642 	bio_set_op_attrs(bio, op, op_flags);
1643 	bio->bi_private = peer_req;
1644 	bio->bi_end_io = drbd_peer_request_endio;
1645 
1646 	bio->bi_next = bios;
1647 	bios = bio;
1648 	++n_bios;
1649 
1650 	page_chain_for_each(page) {
1651 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1652 		if (!bio_add_page(bio, page, len, 0))
1653 			goto next_bio;
1654 		data_size -= len;
1655 		sector += len >> 9;
1656 		--nr_pages;
1657 	}
1658 	D_ASSERT(device, data_size == 0);
1659 	D_ASSERT(device, page == NULL);
1660 
1661 	atomic_set(&peer_req->pending_bios, n_bios);
1662 	/* for debugfs: update timestamp, mark as submitted */
1663 	peer_req->submit_jif = jiffies;
1664 	peer_req->flags |= EE_SUBMITTED;
1665 	do {
1666 		bio = bios;
1667 		bios = bios->bi_next;
1668 		bio->bi_next = NULL;
1669 
1670 		drbd_generic_make_request(device, fault_type, bio);
1671 	} while (bios);
1672 	return 0;
1673 
1674 fail:
1675 	while (bios) {
1676 		bio = bios;
1677 		bios = bios->bi_next;
1678 		bio_put(bio);
1679 	}
1680 	return err;
1681 }
1682 
1683 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1684 					     struct drbd_peer_request *peer_req)
1685 {
1686 	struct drbd_interval *i = &peer_req->i;
1687 
1688 	drbd_remove_interval(&device->write_requests, i);
1689 	drbd_clear_interval(i);
1690 
1691 	/* Wake up any processes waiting for this peer request to complete.  */
1692 	if (i->waiting)
1693 		wake_up(&device->misc_wait);
1694 }
1695 
/* Wait until the active_ee list of every volume of this connection is
 * empty, i.e. all pending peer write requests have completed. */
static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		/* drbd_wait_ee_list_empty() sleeps; drop the RCU read lock
		 * around it and rely on the kref to keep the device alive. */
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}
1713 
/* Handle an incoming P_BARRIER: close the current write epoch and, when
 * necessary, make sure its writes reached stable storage before the
 * barrier ack goes out.  A fresh epoch is installed for subsequent writes. */
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->resource->write_ordering) {
	case WO_NONE:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_BDEV_FLUSH:
	case WO_DRAIN_IO:
		/* Drain all pending writes and flush the backing devices, so
		 * the barrier ack is only sent once everything is on stable
		 * storage. */
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		/* Current epoch is empty (or the allocation failed):
		 * keep using it for the next writes. */
		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
			 connection->resource->write_ordering);
		return -EIO;
	}

	/* Install the freshly allocated epoch as the new current one,
	 * unless the old one got recycled (emptied) in the meantime. */
	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}
1781 
1782 /* quick wrapper in case payload size != request_size (write same) */
1783 static void drbd_csum_ee_size(struct crypto_ahash *h,
1784 			      struct drbd_peer_request *r, void *d,
1785 			      unsigned int payload_size)
1786 {
1787 	unsigned int tmp = r->i.size;
1788 	r->i.size = payload_size;
1789 	drbd_csum_ee(h, r, d);
1790 	r->i.size = tmp;
1791 }
1792 
1793 /* used from receive_RSDataReply (recv_resync_read)
1794  * and from receive_Data.
1795  * data_size: actual payload ("data in")
1796  * 	for normal writes that is bi_size.
1797  * 	for discards, that is zero.
1798  * 	for write same, it is logical_block_size.
1799  * both trim and write same have the bi_size ("data len to be affected")
1800  * as extra argument in the packet header.
1801  */
/* Receive one data packet's payload and wrap it into a peer request.
 * Validates sizes, alignment and the optional integrity digest.
 * Returns the peer request, or NULL on any error. */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int digest_size, err;
	unsigned int data_size = pi->size, ds;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;

	/* With data-integrity checking enabled, a digest precedes the
	 * payload; receive it and subtract it from the payload size. */
	digest_size = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return NULL;
		data_size -= digest_size;
	}

	/* assume request_size == data_size, but special case trim and wsame. */
	ds = data_size;
	if (trim) {
		/* A trim carries no payload; the affected size is in the
		 * packet itself. */
		if (!expect(data_size == 0))
			return NULL;
		ds = be32_to_cpu(trim->size);
	} else if (wsame) {
		/* A write-same payload is exactly one logical block, to be
		 * replicated over the affected 'ds' bytes. */
		if (data_size != queue_logical_block_size(device->rq_queue)) {
			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
				data_size, queue_logical_block_size(device->rq_queue));
			return NULL;
		}
		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
			return NULL;
		}
		ds = be32_to_cpu(wsame->size);
	}

	/* Sanity-check the affected size against the protocol limits. */
	if (!expect(IS_ALIGNED(ds, 512)))
		return NULL;
	if (trim || wsame) {
		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
			return NULL;
	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust out peer,
	 * we sometimes have to double check. */
	if (sector + (ds>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, ds);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	peer_req->flags |= EE_WRITE;
	if (trim) {
		/* Nothing more to receive for a trim. */
		peer_req->flags |= EE_IS_TRIM;
		return peer_req;
	}
	if (wsame)
		peer_req->flags |= EE_WRITE_SAME;

	/* receive payload size bytes into page chain */
	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (digest_size) {
		/* Verify the received payload against the peer's digest. */
		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size >> 9;
	return peer_req;
}
1915 
1916 /* drbd_drain_block() just takes a data block
1917  * out of the socket input buffer, and discards it.
1918  */
1919 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1920 {
1921 	struct page *page;
1922 	int err = 0;
1923 	void *data;
1924 
1925 	if (!data_size)
1926 		return 0;
1927 
1928 	page = drbd_alloc_pages(peer_device, 1, 1);
1929 
1930 	data = kmap(page);
1931 	while (data_size) {
1932 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1933 
1934 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1935 		if (err)
1936 			break;
1937 		data_size -= len;
1938 	}
1939 	kunmap(page);
1940 	drbd_free_pages(peer_device->device, page, 0);
1941 	return err;
1942 }
1943 
/* Receive the payload of a data reply directly into the pages of the
 * original request's master bio (disk-less read path), verifying the
 * optional integrity digest.  Returns 0 on success, negative on error. */
static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec bvec;
	struct bvec_iter iter;
	struct bio *bio;
	int digest_size, err, expect;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;

	/* With data-integrity checking, a digest precedes the payload. */
	digest_size = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return err;
		data_size -= digest_size;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	peer_device->device->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);

	/* Fill the master bio's segments straight from the socket. */
	bio_for_each_segment(bvec, bio, iter) {
		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
		expect = min_t(int, data_size, bvec.bv_len);
		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
		kunmap(bvec.bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (digest_size) {
		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(peer_device->device, data_size == 0);
	return 0;
}
1991 
1992 /*
1993  * e_end_resync_block() is called in ack_sender context via
1994  * drbd_finish_peer_reqs().
1995  */
1996 static int e_end_resync_block(struct drbd_work *w, int unused)
1997 {
1998 	struct drbd_peer_request *peer_req =
1999 		container_of(w, struct drbd_peer_request, w);
2000 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2001 	struct drbd_device *device = peer_device->device;
2002 	sector_t sector = peer_req->i.sector;
2003 	int err;
2004 
2005 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2006 
2007 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2008 		drbd_set_in_sync(device, sector, peer_req->i.size);
2009 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2010 	} else {
2011 		/* Record failure to sync */
2012 		drbd_rs_failed_io(device, sector, peer_req->i.size);
2013 
2014 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2015 	}
2016 	dec_unacked(device);
2017 
2018 	return err;
2019 }
2020 
/* Read a resync data block off the socket and submit it as a write to
 * the local disk.  The caller must hold an ldev reference; it is
 * released here on the failure path, otherwise in
 * drbd_peer_request_endio after the write completes.
 */
static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
			    struct packet_info *pi) __releases(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
	if (!peer_req)
		goto fail;

	dec_rs_pending(device);

	inc_unacked(device);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;
	peer_req->submit_jif = jiffies;

	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->sync_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* account the incoming sectors as resync disk activity */
	atomic_add(pi->size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
				     DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
fail:
	put_ldev(device);
	return -EIO;
}
2060 
2061 static struct drbd_request *
2062 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2063 	     sector_t sector, bool missing_ok, const char *func)
2064 {
2065 	struct drbd_request *req;
2066 
2067 	/* Request object according to our peer */
2068 	req = (struct drbd_request *)(unsigned long)id;
2069 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2070 		return req;
2071 	if (!missing_ok) {
2072 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2073 			(unsigned long)id, (unsigned long long)sector);
2074 	}
2075 	return NULL;
2076 }
2077 
2078 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2079 {
2080 	struct drbd_peer_device *peer_device;
2081 	struct drbd_device *device;
2082 	struct drbd_request *req;
2083 	sector_t sector;
2084 	int err;
2085 	struct p_data *p = pi->data;
2086 
2087 	peer_device = conn_peer_device(connection, pi->vnr);
2088 	if (!peer_device)
2089 		return -EIO;
2090 	device = peer_device->device;
2091 
2092 	sector = be64_to_cpu(p->sector);
2093 
2094 	spin_lock_irq(&device->resource->req_lock);
2095 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2096 	spin_unlock_irq(&device->resource->req_lock);
2097 	if (unlikely(!req))
2098 		return -EIO;
2099 
2100 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2101 	 * special casing it there for the various failure cases.
2102 	 * still no race with drbd_fail_pending_reads */
2103 	err = recv_dless_read(peer_device, req, sector, pi->size);
2104 	if (!err)
2105 		req_mod(req, DATA_RECEIVED);
2106 	/* else: nothing. handled from drbd_disconnect...
2107 	 * I don't think we may complete this just yet
2108 	 * in case we are "on-disconnect: freeze" */
2109 
2110 	return err;
2111 }
2112 
/* Handle a P_RS_DATA_REPLY packet: resync data we requested earlier.
 * With a usable local disk, write it out via recv_resync_read();
 * otherwise drain the payload and send a negative ack. */
static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(device, p->block_id == ID_SYNCER);

	if (get_ldev(device)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(peer_device, sector, pi);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not write resync data to local disk.\n");

		/* no disk: still consume the payload to stay in sync
		 * with the receive stream */
		err = drbd_drain_block(peer_device, pi->size);

		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
	}

	/* account received sectors for the resync rate controller */
	atomic_add(pi->size >> 9, &device->rs_sect_in);

	return err;
}
2147 
2148 static void restart_conflicting_writes(struct drbd_device *device,
2149 				       sector_t sector, int size)
2150 {
2151 	struct drbd_interval *i;
2152 	struct drbd_request *req;
2153 
2154 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2155 		if (!i->local)
2156 			continue;
2157 		req = container_of(i, struct drbd_request, i);
2158 		if (req->rq_state & RQ_LOCAL_PENDING ||
2159 		    !(req->rq_state & RQ_POSTPONED))
2160 			continue;
2161 		/* as it is RQ_POSTPONED, this will cause it to
2162 		 * be queued on the retry workqueue. */
2163 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2164 	}
2165 }
2166 
2167 /*
2168  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2169  */
2170 static int e_end_block(struct drbd_work *w, int cancel)
2171 {
2172 	struct drbd_peer_request *peer_req =
2173 		container_of(w, struct drbd_peer_request, w);
2174 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2175 	struct drbd_device *device = peer_device->device;
2176 	sector_t sector = peer_req->i.sector;
2177 	int err = 0, pcmd;
2178 
2179 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2180 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2181 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2182 				device->state.conn <= C_PAUSED_SYNC_T &&
2183 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2184 				P_RS_WRITE_ACK : P_WRITE_ACK;
2185 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2186 			if (pcmd == P_RS_WRITE_ACK)
2187 				drbd_set_in_sync(device, sector, peer_req->i.size);
2188 		} else {
2189 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2190 			/* we expect it to be marked out of sync anyways...
2191 			 * maybe assert this?  */
2192 		}
2193 		dec_unacked(device);
2194 	}
2195 
2196 	/* we delete from the conflict detection hash _after_ we sent out the
2197 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2198 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2199 		spin_lock_irq(&device->resource->req_lock);
2200 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2201 		drbd_remove_epoch_entry_interval(device, peer_req);
2202 		if (peer_req->flags & EE_RESTART_REQUESTS)
2203 			restart_conflicting_writes(device, sector, peer_req->i.size);
2204 		spin_unlock_irq(&device->resource->req_lock);
2205 	} else
2206 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2207 
2208 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2209 
2210 	return err;
2211 }
2212 
2213 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2214 {
2215 	struct drbd_peer_request *peer_req =
2216 		container_of(w, struct drbd_peer_request, w);
2217 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2218 	int err;
2219 
2220 	err = drbd_send_ack(peer_device, ack, peer_req);
2221 	dec_unacked(peer_device->device);
2222 
2223 	return err;
2224 }
2225 
/* Answer a conflicting peer write with P_SUPERSEDED: the peer's write
 * was fully covered by a local write and is considered overwritten. */
static int e_send_superseded(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_SUPERSEDED);
}
2230 
2231 static int e_send_retry_write(struct drbd_work *w, int unused)
2232 {
2233 	struct drbd_peer_request *peer_req =
2234 		container_of(w, struct drbd_peer_request, w);
2235 	struct drbd_connection *connection = peer_req->peer_device->connection;
2236 
2237 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2238 			     P_RETRY_WRITE : P_SUPERSEDED);
2239 }
2240 
2241 static bool seq_greater(u32 a, u32 b)
2242 {
2243 	/*
2244 	 * We assume 32-bit wrap-around here.
2245 	 * For 24-bit wrap-around, we would have to shift:
2246 	 *  a <<= 8; b <<= 8;
2247 	 */
2248 	return (s32)a - (s32)b > 0;
2249 }
2250 
2251 static u32 seq_max(u32 a, u32 b)
2252 {
2253 	return seq_greater(a, b) ? a : b;
2254 }
2255 
/* Record a newly seen peer sequence number.  Only relevant when this
 * node resolves write conflicts (RESOLVE_CONFLICTS set); waiters in
 * wait_for_and_update_peer_seq() are woken only when the sequence
 * actually advanced. */
static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
{
	struct drbd_device *device = peer_device->device;
	unsigned int newest_peer_seq;

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
		spin_lock(&device->peer_seq_lock);
		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
		device->peer_seq = newest_peer_seq;
		spin_unlock(&device->peer_seq_lock);
		/* wake up only if we actually changed device->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&device->seq_wait);
	}
}
2271 
2272 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2273 {
2274 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2275 }
2276 
2277 /* maybe change sync_ee into interval trees as well? */
2278 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2279 {
2280 	struct drbd_peer_request *rs_req;
2281 	bool rv = false;
2282 
2283 	spin_lock_irq(&device->resource->req_lock);
2284 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2285 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2286 			     rs_req->i.sector, rs_req->i.size)) {
2287 			rv = true;
2288 			break;
2289 		}
2290 	}
2291 	spin_unlock_irq(&device->resource->req_lock);
2292 
2293 	return rv;
2294 }
2295 
2296 /* Called from receive_Data.
2297  * Synchronize packets on sock with packets on msock.
2298  *
2299  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2300  * packet traveling on msock, they are still processed in the order they have
2301  * been sent.
2302  *
2303  * Note: we don't care for Ack packets overtaking P_DATA packets.
2304  *
2305  * In case packet_seq is larger than device->peer_seq number, there are
2306  * outstanding packets on the msock. We wait for them to arrive.
2307  * In case we are the logically next packet, we update device->peer_seq
2308  * ourselves. Correctly handles 32bit wrap around.
2309  *
2310  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2311  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2312  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2313  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2314  *
2315  * returns 0 if we may process the packet,
2316  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{
	struct drbd_device *device = peer_device->device;
	DEFINE_WAIT(wait);
	long timeout;
	int ret = 0, tp;

	/* only the conflict-resolving node needs strict ordering */
	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
		return 0;

	spin_lock(&device->peer_seq_lock);
	for (;;) {
		/* we are the logically next packet (or older): record it
		 * and proceed */
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
			break;
		}

		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}

		rcu_read_lock();
		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
		rcu_read_unlock();

		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
		rcu_read_lock();
		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
		rcu_read_unlock();
		timeout = schedule_timeout(timeout);
		spin_lock(&device->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
	return ret;
}
2364 
2365 /* see also bio_flags_to_wire()
2366  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2367  * flags and back. We may replicate to other kernel versions. */
2368 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2369 {
2370 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2371 		(dpf & DP_FUA ? REQ_FUA : 0) |
2372 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2373 }
2374 
2375 static unsigned long wire_flags_to_bio_op(u32 dpf)
2376 {
2377 	if (dpf & DP_DISCARD)
2378 		return REQ_OP_DISCARD;
2379 	else
2380 		return REQ_OP_WRITE;
2381 }
2382 
/* Fail all postponed local requests overlapping [sector, sector + size).
 * Must be called with req_lock held; the lock is dropped and re-acquired
 * around complete_master_bio(), which is why the overlap scan is
 * restarted from the beginning after each completion. */
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&device->resource->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&device->resource->req_lock);
		/* the tree may have changed while the lock was dropped */
		goto repeat;
	}
}
2407 
/* Detect and resolve conflicts between an incoming peer write and
 * overlapping requests in the write_requests interval tree.
 *
 * Returns 0 if the peer request may be submitted, -ENOENT if the
 * conflict was resolved by answering the peer (P_SUPERSEDED /
 * P_RETRY_WRITE) instead, or the error from drbd_wait_misc().
 *
 * Called with req_lock held.  NOTE(review): drbd_wait_misc()
 * presumably drops and re-acquires req_lock while sleeping — confirm
 * against its definition.
 */
static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;
		if (i->completed)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			/* answer the peer asynchronously from the ack sender */
			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}
2515 
/* mirrored write
 *
 * Receive a P_DATA (or P_TRIM) packet: a write the peer replicates to
 * us.  Reads the payload into a peer request, does epoch accounting,
 * sends the acks required by the negotiated protocol (A/B/C), resolves
 * write conflicts in dual-primary setups, and finally submits the write
 * to the local disk.  Returns 0 on success; a nonzero return triggers a
 * disconnect.
 */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct net_conf *nc;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int op, op_flags;
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		int err2;

		/* no local disk: still honor sequence ordering, answer with
		 * a negative ack, and drain the payload from the socket */
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_APPLICATION;

	dp_flags = be32_to_cpu(p->dp_flags);
	op = wire_flags_to_bio_op(dp_flags);
	op_flags = wire_flags_to_bio_flags(dp_flags);
	if (pi->cmd == P_TRIM) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
		D_ASSERT(peer_device, peer_req->pages == NULL);
	} else if (peer_req->pages == NULL) {
		/* a zero-sized write: only valid as a stand-alone flush */
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/* attach the request to the current write epoch */
	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	tp = nc->two_primaries;
	if (peer_device->connection->agreed_pro_version < 100) {
		/* pre-100 peers do not encode the ack mode in dp_flags;
		 * derive it from the configured wire protocol */
		switch (nc->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}
	rcu_read_unlock();

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
	}

	if (tp) {
		/* two primaries implies protocol C */
		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			if (err == -ENOENT) {
				/* conflict was resolved by answering the
				 * peer; nothing more to do here */
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* TRIM and WRITE_SAME are processed synchronously,
	 * we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
		list_add_tail(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
	}

	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
				       DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}

out_interrupted:
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}
2675 
2676 /* We may throttle resync, if the lower device seems to be busy,
2677  * and current sync rate is above c_min_rate.
2678  *
2679  * To decide whether or not the lower device is busy, we use a scheme similar
2680  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2681  * (more than 64 sectors) of activity we cannot account for with our own resync
2682  * activity, it obviously is "busy".
2683  *
2684  * The current sync rate used here uses only the most recent two step marks,
2685  * to have a short time average so we can react faster.
2686  */
2687 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2688 		bool throttle_if_app_is_waiting)
2689 {
2690 	struct lc_element *tmp;
2691 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2692 
2693 	if (!throttle || throttle_if_app_is_waiting)
2694 		return throttle;
2695 
2696 	spin_lock_irq(&device->al_lock);
2697 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2698 	if (tmp) {
2699 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2700 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2701 			throttle = false;
2702 		/* Do not slow down if app IO is already waiting for this extent,
2703 		 * and our progress is necessary for application IO to complete. */
2704 	}
2705 	spin_unlock_irq(&device->al_lock);
2706 
2707 	return throttle;
2708 }
2709 
/* Return true if the short-term resync rate (averaged over the most
 * recent two sync marks) exceeds the configured c_min_rate AND there is
 * backing-device activity we cannot attribute to resync itself (or
 * application IO is waiting on the activity log).  A c_min_rate of 0
 * disables the feature. */
bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
{
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	unsigned int c_min_rate;
	int curr_events;

	rcu_read_lock();
	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
	rcu_read_unlock();

	/* feature disabled? */
	if (c_min_rate == 0)
		return false;

	/* total sector activity on the backing device, minus what we
	 * generated ourselves for resync */
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&device->rs_sect_ev);

	if (atomic_read(&device->ap_actlog_cnt)
	    || curr_events - device->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		device->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
			rs_left = device->ov_left;
		else
			rs_left = drbd_bm_total_weight(device) - device->rs_failed;

		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = device->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > c_min_rate)
			return true;
	}
	return false;
}
2756 
2757 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2758 {
2759 	struct drbd_peer_device *peer_device;
2760 	struct drbd_device *device;
2761 	sector_t sector;
2762 	sector_t capacity;
2763 	struct drbd_peer_request *peer_req;
2764 	struct digest_info *di = NULL;
2765 	int size, verb;
2766 	unsigned int fault_type;
2767 	struct p_block_req *p =	pi->data;
2768 
2769 	peer_device = conn_peer_device(connection, pi->vnr);
2770 	if (!peer_device)
2771 		return -EIO;
2772 	device = peer_device->device;
2773 	capacity = drbd_get_capacity(device->this_bdev);
2774 
2775 	sector = be64_to_cpu(p->sector);
2776 	size   = be32_to_cpu(p->blksize);
2777 
2778 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2779 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2780 				(unsigned long long)sector, size);
2781 		return -EINVAL;
2782 	}
2783 	if (sector + (size>>9) > capacity) {
2784 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2785 				(unsigned long long)sector, size);
2786 		return -EINVAL;
2787 	}
2788 
2789 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2790 		verb = 1;
2791 		switch (pi->cmd) {
2792 		case P_DATA_REQUEST:
2793 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2794 			break;
2795 		case P_RS_THIN_REQ:
2796 		case P_RS_DATA_REQUEST:
2797 		case P_CSUM_RS_REQUEST:
2798 		case P_OV_REQUEST:
2799 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2800 			break;
2801 		case P_OV_REPLY:
2802 			verb = 0;
2803 			dec_rs_pending(device);
2804 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2805 			break;
2806 		default:
2807 			BUG();
2808 		}
2809 		if (verb && __ratelimit(&drbd_ratelimit_state))
2810 			drbd_err(device, "Can not satisfy peer's read request, "
2811 			    "no local data.\n");
2812 
2813 		/* drain possibly payload */
2814 		return drbd_drain_block(peer_device, pi->size);
2815 	}
2816 
2817 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2818 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2819 	 * which in turn might block on the other node at this very place.  */
2820 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2821 			size, GFP_NOIO);
2822 	if (!peer_req) {
2823 		put_ldev(device);
2824 		return -ENOMEM;
2825 	}
2826 
2827 	switch (pi->cmd) {
2828 	case P_DATA_REQUEST:
2829 		peer_req->w.cb = w_e_end_data_req;
2830 		fault_type = DRBD_FAULT_DT_RD;
2831 		/* application IO, don't drbd_rs_begin_io */
2832 		peer_req->flags |= EE_APPLICATION;
2833 		goto submit;
2834 
2835 	case P_RS_THIN_REQ:
2836 		/* If at some point in the future we have a smart way to
2837 		   find out if this data block is completely deallocated,
2838 		   then we would do something smarter here than reading
2839 		   the block... */
2840 		peer_req->flags |= EE_RS_THIN_REQ;
2841 	case P_RS_DATA_REQUEST:
2842 		peer_req->w.cb = w_e_end_rsdata_req;
2843 		fault_type = DRBD_FAULT_RS_RD;
2844 		/* used in the sector offset progress display */
2845 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2846 		break;
2847 
2848 	case P_OV_REPLY:
2849 	case P_CSUM_RS_REQUEST:
2850 		fault_type = DRBD_FAULT_RS_RD;
2851 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2852 		if (!di)
2853 			goto out_free_e;
2854 
2855 		di->digest_size = pi->size;
2856 		di->digest = (((char *)di)+sizeof(struct digest_info));
2857 
2858 		peer_req->digest = di;
2859 		peer_req->flags |= EE_HAS_DIGEST;
2860 
2861 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2862 			goto out_free_e;
2863 
2864 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2865 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2866 			peer_req->w.cb = w_e_end_csum_rs_req;
2867 			/* used in the sector offset progress display */
2868 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2869 			/* remember to report stats in drbd_resync_finished */
2870 			device->use_csums = true;
2871 		} else if (pi->cmd == P_OV_REPLY) {
2872 			/* track progress, we may need to throttle */
2873 			atomic_add(size >> 9, &device->rs_sect_in);
2874 			peer_req->w.cb = w_e_end_ov_reply;
2875 			dec_rs_pending(device);
2876 			/* drbd_rs_begin_io done when we sent this request,
2877 			 * but accounting still needs to be done. */
2878 			goto submit_for_resync;
2879 		}
2880 		break;
2881 
2882 	case P_OV_REQUEST:
2883 		if (device->ov_start_sector == ~(sector_t)0 &&
2884 		    peer_device->connection->agreed_pro_version >= 90) {
2885 			unsigned long now = jiffies;
2886 			int i;
2887 			device->ov_start_sector = sector;
2888 			device->ov_position = sector;
2889 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2890 			device->rs_total = device->ov_left;
2891 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2892 				device->rs_mark_left[i] = device->ov_left;
2893 				device->rs_mark_time[i] = now;
2894 			}
2895 			drbd_info(device, "Online Verify start sector: %llu\n",
2896 					(unsigned long long)sector);
2897 		}
2898 		peer_req->w.cb = w_e_end_ov_req;
2899 		fault_type = DRBD_FAULT_RS_RD;
2900 		break;
2901 
2902 	default:
2903 		BUG();
2904 	}
2905 
2906 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2907 	 * wrt the receiver, but it is not as straightforward as it may seem.
2908 	 * Various places in the resync start and stop logic assume resync
2909 	 * requests are processed in order, requeuing this on the worker thread
2910 	 * introduces a bunch of new code for synchronization between threads.
2911 	 *
2912 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2913 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2914 	 * for application writes for the same time.  For now, just throttle
2915 	 * here, where the rest of the code expects the receiver to sleep for
2916 	 * a while, anyways.
2917 	 */
2918 
2919 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2920 	 * this defers syncer requests for some time, before letting at least
2921 	 * on request through.  The resync controller on the receiving side
2922 	 * will adapt to the incoming rate accordingly.
2923 	 *
2924 	 * We cannot throttle here if remote is Primary/SyncTarget:
2925 	 * we would also throttle its application reads.
2926 	 * In that case, throttling is done on the SyncTarget only.
2927 	 */
2928 
2929 	/* Even though this may be a resync request, we do add to "read_ee";
2930 	 * "sync_ee" is only used for resync WRITEs.
2931 	 * Add to list early, so debugfs can find this request
2932 	 * even if we have to sleep below. */
2933 	spin_lock_irq(&device->resource->req_lock);
2934 	list_add_tail(&peer_req->w.list, &device->read_ee);
2935 	spin_unlock_irq(&device->resource->req_lock);
2936 
2937 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2938 	if (device->state.peer != R_PRIMARY
2939 	&& drbd_rs_should_slow_down(device, sector, false))
2940 		schedule_timeout_uninterruptible(HZ/10);
2941 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2942 	if (drbd_rs_begin_io(device, sector))
2943 		goto out_free_e;
2944 
2945 submit_for_resync:
2946 	atomic_add(size >> 9, &device->rs_sect_ev);
2947 
2948 submit:
2949 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2950 	inc_unacked(device);
2951 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2952 				     fault_type) == 0)
2953 		return 0;
2954 
2955 	/* don't care for the reason here */
2956 	drbd_err(device, "submit failed, triggering re-connect\n");
2957 
2958 out_free_e:
2959 	spin_lock_irq(&device->resource->req_lock);
2960 	list_del(&peer_req->w.list);
2961 	spin_unlock_irq(&device->resource->req_lock);
2962 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2963 
2964 	put_ldev(device);
2965 	drbd_free_peer_req(device, peer_req);
2966 	return -EIO;
2967 }
2968 
2969 /**
2970  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2971  */
2972 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2973 {
2974 	struct drbd_device *device = peer_device->device;
2975 	int self, peer, rv = -100;
2976 	unsigned long ch_self, ch_peer;
2977 	enum drbd_after_sb_p after_sb_0p;
2978 
2979 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2980 	peer = device->p_uuid[UI_BITMAP] & 1;
2981 
2982 	ch_peer = device->p_uuid[UI_SIZE];
2983 	ch_self = device->comm_bm_set;
2984 
2985 	rcu_read_lock();
2986 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2987 	rcu_read_unlock();
2988 	switch (after_sb_0p) {
2989 	case ASB_CONSENSUS:
2990 	case ASB_DISCARD_SECONDARY:
2991 	case ASB_CALL_HELPER:
2992 	case ASB_VIOLENTLY:
2993 		drbd_err(device, "Configuration error.\n");
2994 		break;
2995 	case ASB_DISCONNECT:
2996 		break;
2997 	case ASB_DISCARD_YOUNGER_PRI:
2998 		if (self == 0 && peer == 1) {
2999 			rv = -1;
3000 			break;
3001 		}
3002 		if (self == 1 && peer == 0) {
3003 			rv =  1;
3004 			break;
3005 		}
3006 		/* Else fall through to one of the other strategies... */
3007 	case ASB_DISCARD_OLDER_PRI:
3008 		if (self == 0 && peer == 1) {
3009 			rv = 1;
3010 			break;
3011 		}
3012 		if (self == 1 && peer == 0) {
3013 			rv = -1;
3014 			break;
3015 		}
3016 		/* Else fall through to one of the other strategies... */
3017 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3018 		     "Using discard-least-changes instead\n");
3019 	case ASB_DISCARD_ZERO_CHG:
3020 		if (ch_peer == 0 && ch_self == 0) {
3021 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3022 				? -1 : 1;
3023 			break;
3024 		} else {
3025 			if (ch_peer == 0) { rv =  1; break; }
3026 			if (ch_self == 0) { rv = -1; break; }
3027 		}
3028 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3029 			break;
3030 	case ASB_DISCARD_LEAST_CHG:
3031 		if	(ch_self < ch_peer)
3032 			rv = -1;
3033 		else if (ch_self > ch_peer)
3034 			rv =  1;
3035 		else /* ( ch_self == ch_peer ) */
3036 		     /* Well, then use something else. */
3037 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3038 				? -1 : 1;
3039 		break;
3040 	case ASB_DISCARD_LOCAL:
3041 		rv = -1;
3042 		break;
3043 	case ASB_DISCARD_REMOTE:
3044 		rv =  1;
3045 	}
3046 
3047 	return rv;
3048 }
3049 
3050 /**
3051  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3052  */
3053 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3054 {
3055 	struct drbd_device *device = peer_device->device;
3056 	int hg, rv = -100;
3057 	enum drbd_after_sb_p after_sb_1p;
3058 
3059 	rcu_read_lock();
3060 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3061 	rcu_read_unlock();
3062 	switch (after_sb_1p) {
3063 	case ASB_DISCARD_YOUNGER_PRI:
3064 	case ASB_DISCARD_OLDER_PRI:
3065 	case ASB_DISCARD_LEAST_CHG:
3066 	case ASB_DISCARD_LOCAL:
3067 	case ASB_DISCARD_REMOTE:
3068 	case ASB_DISCARD_ZERO_CHG:
3069 		drbd_err(device, "Configuration error.\n");
3070 		break;
3071 	case ASB_DISCONNECT:
3072 		break;
3073 	case ASB_CONSENSUS:
3074 		hg = drbd_asb_recover_0p(peer_device);
3075 		if (hg == -1 && device->state.role == R_SECONDARY)
3076 			rv = hg;
3077 		if (hg == 1  && device->state.role == R_PRIMARY)
3078 			rv = hg;
3079 		break;
3080 	case ASB_VIOLENTLY:
3081 		rv = drbd_asb_recover_0p(peer_device);
3082 		break;
3083 	case ASB_DISCARD_SECONDARY:
3084 		return device->state.role == R_PRIMARY ? 1 : -1;
3085 	case ASB_CALL_HELPER:
3086 		hg = drbd_asb_recover_0p(peer_device);
3087 		if (hg == -1 && device->state.role == R_PRIMARY) {
3088 			enum drbd_state_rv rv2;
3089 
3090 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3091 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3092 			  * we do not need to wait for the after state change work either. */
3093 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3094 			if (rv2 != SS_SUCCESS) {
3095 				drbd_khelper(device, "pri-lost-after-sb");
3096 			} else {
3097 				drbd_warn(device, "Successfully gave up primary role.\n");
3098 				rv = hg;
3099 			}
3100 		} else
3101 			rv = hg;
3102 	}
3103 
3104 	return rv;
3105 }
3106 
3107 /**
3108  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3109  */
3110 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3111 {
3112 	struct drbd_device *device = peer_device->device;
3113 	int hg, rv = -100;
3114 	enum drbd_after_sb_p after_sb_2p;
3115 
3116 	rcu_read_lock();
3117 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3118 	rcu_read_unlock();
3119 	switch (after_sb_2p) {
3120 	case ASB_DISCARD_YOUNGER_PRI:
3121 	case ASB_DISCARD_OLDER_PRI:
3122 	case ASB_DISCARD_LEAST_CHG:
3123 	case ASB_DISCARD_LOCAL:
3124 	case ASB_DISCARD_REMOTE:
3125 	case ASB_CONSENSUS:
3126 	case ASB_DISCARD_SECONDARY:
3127 	case ASB_DISCARD_ZERO_CHG:
3128 		drbd_err(device, "Configuration error.\n");
3129 		break;
3130 	case ASB_VIOLENTLY:
3131 		rv = drbd_asb_recover_0p(peer_device);
3132 		break;
3133 	case ASB_DISCONNECT:
3134 		break;
3135 	case ASB_CALL_HELPER:
3136 		hg = drbd_asb_recover_0p(peer_device);
3137 		if (hg == -1) {
3138 			enum drbd_state_rv rv2;
3139 
3140 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3141 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3142 			  * we do not need to wait for the after state change work either. */
3143 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3144 			if (rv2 != SS_SUCCESS) {
3145 				drbd_khelper(device, "pri-lost-after-sb");
3146 			} else {
3147 				drbd_warn(device, "Successfully gave up primary role.\n");
3148 				rv = hg;
3149 			}
3150 		} else
3151 			rv = hg;
3152 	}
3153 
3154 	return rv;
3155 }
3156 
3157 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3158 			   u64 bits, u64 flags)
3159 {
3160 	if (!uuid) {
3161 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3162 		return;
3163 	}
3164 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3165 	     text,
3166 	     (unsigned long long)uuid[UI_CURRENT],
3167 	     (unsigned long long)uuid[UI_BITMAP],
3168 	     (unsigned long long)uuid[UI_HISTORY_START],
3169 	     (unsigned long long)uuid[UI_HISTORY_END],
3170 	     (unsigned long long)bits,
3171 	     (unsigned long long)flags);
3172 }
3173 
3174 /*
3175   100	after split brain try auto recover
3176     2	C_SYNC_SOURCE set BitMap
3177     1	C_SYNC_SOURCE use BitMap
3178     0	no Sync
3179    -1	C_SYNC_TARGET use BitMap
3180    -2	C_SYNC_TARGET set BitMap
3181  -100	after split brain, disconnect
3182 -1000	unrelated data
3183 -1091   requires proto 91
3184 -1096   requires proto 96
3185  */
3186 
/* Compare our UUID set against the peer's (device->p_uuid) and decide the
 * sync direction.  Return values follow the legend in the comment block
 * above; *rule_nr records which rule made the decision, for logging.
 * Bit 0 of each UUID carries a flag (see drbd_asb_recover_0p()), so it is
 * masked out for all identity comparisons below. */
static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	u64 self, peer;
	int i, j;

	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	/* rule 10: both sides freshly created, no data anywhere -> no sync */
	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	/* rule 20: only we are fresh -> full sync target (-2) */
	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	/* rule 30: only the peer is fresh -> full sync source (2) */
	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* Equal current UUIDs, but only we still carry a bitmap UUID:
		 * we were sync source and missed the "resync finished" event. */
		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* Mirror case: only the peer still carries a bitmap UUID;
		 * it was sync source and missed the finish event. */
		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		/* Neither has the "crashed primary" flag set,
		 * only a replication link hickup. */
		if (rct == 0)
			return 0;

		/* Current UUID equal and no bitmap uuid; does not necessarily
		 * mean this was a "simultaneous hard crash", maybe IO was
		 * frozen, so no UUID-bump happened.
		 * This is a protocol change, overload DRBD_FF_WSAME as flag
		 * for "new-enough" peer DRBD version. */
		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
			*rule_nr = 41;
			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
				/* encode "need proto PRO_VERSION_MAX + feature WSAME"
				 * for drbd_sync_handshake()'s hg < -0x10000 decoder */
				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
			}
			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
				/* At least one has the "crashed primary" bit set,
				 * both are primary now, but neither has rotated its UUIDs?
				 * "Can not happen." */
				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
				return -100;
			}
			/* the current primary wins */
			if (device->state.role == R_PRIMARY)
				return 1;
			return -1;
		}

		/* Both are secondary.
		 * Really looks like recovery from simultaneous hard crash.
		 * Check which had been primary before, and arbitrate. */
		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
			return dc ? -1 : 1;
		}
	}

	/* rule 50: peer's bitmap UUID matches our current UUID:
	 * peer has newer data -> sync target using the bitmap (-1) */
	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	/* rule 51: our current UUID matches the peer's first history UUID */
	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	/* rule 60: our current UUID only in the peer's history
	 * -> we are way behind, full sync target (-2) */
	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	/* rule 70: our bitmap UUID matches the peer's current UUID:
	 * we have newer data -> sync source using the bitmap (1) */
	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	/* rule 71: mirror of rule 51, applied to our own UUIDs */
	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}


	/* rule 80: peer's current UUID only in our history
	 * -> peer is way behind, full sync source (2) */
	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	/* rule 90: identical non-zero bitmap UUIDs
	 * -> split brain, try auto recovery (100) */
	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	/* rule 100: common ancestor only in the history sets
	 * -> split brain, disconnect (-100) */
	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	/* no relation between the two UUID sets at all */
	return -1000;
}
3401 
/* drbd_sync_handshake() returns the new conn state on success, or
   C_MASK (-1) on failure.
 */
static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
					   enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;
	struct net_conf *nc;
	/* hg: "handshake grade", the drbd_uuid_compare() verdict (see the
	 * legend above that function); positive means we are sync source. */
	int hg, rule_nr, rr_conflict, tentative;

	mydisk = device->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = device->new_state_tmp.disk;

	drbd_info(device, "drbd_sync_handshake:\n");

	/* uuid_lock also protects the peer's UUIDs, which
	 * drbd_uuid_compare() may modify to correct missed events */
	spin_lock_irq(&device->ldev->md.uuid_lock);
	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
	drbd_uuid_dump(device, "peer", device->p_uuid,
		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
	spin_unlock_irq(&device->ldev->md.uuid_lock);

	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		drbd_alert(device, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	/* hg < -0x10000 encodes a required protocol version and feature
	 * flags: -(0x10000 | proto | (fflags << 8)), see rule 41 */
	if (hg < -0x10000) {
		int proto, fflags;
		hg = -hg;
		proto = hg & 0xff;
		fflags = (hg >> 8) & 0xff;
		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
					proto, fflags);
		return C_MASK;
	}
	/* -1091 / -1096 style codes: peer protocol too old */
	if (hg < -1000) {
		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	/* Exactly one side inconsistent: disk states override the UUID
	 * verdict; the consistent side becomes source.  Keep "full sync"
	 * (|hg| == 2) when the UUIDs demanded one or gave no decision. */
	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
		drbd_info(device, "Becoming sync %s due to disk states.\n",
		     hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(device, "initial-split-brain");

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);

	/* Split brain: try the automatic recovery policies, chosen by how
	 * many primaries are involved (hg == 100 means auto-recover was
	 * requested by rule 90; -100 only if always_asbp is configured). */
	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
		int pcount = (device->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(peer_device);
			break;
		case 1:
			hg = drbd_asb_recover_1p(peer_device);
			break;
		case 2:
			hg = drbd_asb_recover_2p(peer_device);
			break;
		}
		if (abs(hg) < 100) {
			drbd_warn(device, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				drbd_warn(device, "Doing a full sync, since"
				     " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	/* Still unresolved: honor a manual discard-my-data setting, but
	 * only if exactly one side has it (p_uuid[UI_FLAGS] bit 0 carries
	 * the peer's DISCARD_MY_DATA flag). */
	if (hg == -100) {
		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			drbd_warn(device, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}
	rr_conflict = nc->rr_conflict;
	tentative = nc->tentative;
	rcu_read_unlock();

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(device, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	/* We would lose data as a primary: apply the rr-conflict policy. */
	if (hg < 0 && /* by intention we do not use mydisk here. */
	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
		switch (rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(device, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
			     "assumption\n");
		}
	}

	/* dry-run: report what would happen, then bail out */
	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
		if (hg == 0)
			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	/* |hg| == 2 means full sync: mark the whole bitmap out-of-sync */
	if (abs(hg) >= 2) {
		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(device)) {
			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
			     drbd_bm_total_weight(device));
		}
	}

	return rv;
}
3569 
3570 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3571 {
3572 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3573 	if (peer == ASB_DISCARD_REMOTE)
3574 		return ASB_DISCARD_LOCAL;
3575 
3576 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3577 	if (peer == ASB_DISCARD_LOCAL)
3578 		return ASB_DISCARD_REMOTE;
3579 
3580 	/* everything else is valid if they are equal on both sides. */
3581 	return peer;
3582 }
3583 
/* Handle P_PROTOCOL / P_PROTOCOL_UPDATE: validate that the peer's wire
 * protocol settings match ours (initial handshake only), then install the
 * peer's data-integrity algorithm and an updated net_conf.
 * Returns 0 on success, negative on error (connection will be dropped). */
static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_protocol *p = pi->data;
	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_proto, p_discard_my_data, p_two_primaries, cf;
	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
	char integrity_alg[SHARED_SECRET_MAX] = "";
	struct crypto_ahash *peer_integrity_tfm = NULL;
	void *int_dig_in = NULL, *int_dig_vv = NULL;

	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf		= be32_to_cpu(p->conn_flags);
	p_discard_my_data = cf & CF_DISCARD_MY_DATA;

	/* since protocol 87 the integrity algorithm name follows as payload */
	if (connection->agreed_pro_version >= 87) {
		int err;

		if (pi->size > sizeof(integrity_alg))
			return -EIO;
		err = drbd_recv_all(connection, integrity_alg, pi->size);
		if (err)
			return err;
		/* ensure NUL termination regardless of what the peer sent */
		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
	}

	/* Initial P_PROTOCOL (not a later update): cross-check every
	 * setting against our own configuration; any mismatch disconnects. */
	if (pi->cmd != P_PROTOCOL_UPDATE) {
		clear_bit(CONN_DRY_RUN, &connection->flags);

		if (cf & CF_DRY_RUN)
			set_bit(CONN_DRY_RUN, &connection->flags);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);

		if (p_proto != nc->wire_protocol) {
			drbd_err(connection, "incompatible %s settings\n", "protocol");
			goto disconnect_rcu_unlock;
		}

		/* the peer reports its policies from its own point of view;
		 * convert_after_sb() mirrors local/remote for comparison */
		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
			goto disconnect_rcu_unlock;
		}

		/* discard-my-data on both sides would discard everything */
		if (p_discard_my_data && nc->discard_my_data) {
			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
			goto disconnect_rcu_unlock;
		}

		if (p_two_primaries != nc->two_primaries) {
			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
			goto disconnect_rcu_unlock;
		}

		if (strcmp(integrity_alg, nc->integrity_alg)) {
			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
			goto disconnect_rcu_unlock;
		}

		rcu_read_unlock();
	}

	if (integrity_alg[0]) {
		int hash_size;

		/*
		 * We can only change the peer data integrity algorithm
		 * here.  Changing our own data integrity algorithm
		 * requires that we send a P_PROTOCOL_UPDATE packet at
		 * the same time; otherwise, the peer has no way to
		 * tell between which packets the algorithm should
		 * change.
		 */

		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
		if (IS_ERR(peer_integrity_tfm)) {
			/* NULL so the error path's crypto_free_ahash() is a no-op */
			peer_integrity_tfm = NULL;
			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
				 integrity_alg);
			goto disconnect;
		}

		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
		if (!(int_dig_in && int_dig_vv)) {
			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
			goto disconnect;
		}
	}

	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
	if (!new_net_conf) {
		drbd_err(connection, "Allocation of new net_conf failed\n");
		goto disconnect;
	}

	/* Publish the new net_conf via RCU, under both the data mutex
	 * (serializes with senders) and conf_update. */
	mutex_lock(&connection->data.mutex);
	mutex_lock(&connection->resource->conf_update);
	old_net_conf = connection->net_conf;
	*new_net_conf = *old_net_conf;

	new_net_conf->wire_protocol = p_proto;
	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
	new_net_conf->two_primaries = p_two_primaries;

	rcu_assign_pointer(connection->net_conf, new_net_conf);
	mutex_unlock(&connection->resource->conf_update);
	mutex_unlock(&connection->data.mutex);

	/* swap in the new integrity tfm and digest buffers, freeing the old */
	crypto_free_ahash(connection->peer_integrity_tfm);
	kfree(connection->int_dig_in);
	kfree(connection->int_dig_vv);
	connection->peer_integrity_tfm = peer_integrity_tfm;
	connection->int_dig_in = int_dig_in;
	connection->int_dig_vv = int_dig_vv;

	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
		drbd_info(connection, "peer data-integrity-alg: %s\n",
			  integrity_alg[0] ? integrity_alg : "(none)");

	/* wait for all RCU readers of old_net_conf before freeing it */
	synchronize_rcu();
	kfree(old_net_conf);
	return 0;

disconnect_rcu_unlock:
	rcu_read_unlock();
disconnect:
	crypto_free_ahash(peer_integrity_tfm);
	kfree(int_dig_in);
	kfree(int_dig_vv);
	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
3734 
3735 /* helper function
3736  * input: alg name, feature name
3737  * return: NULL (alg name was "")
3738  *         ERR_PTR(error) if something goes wrong
3739  *         or the crypto hash ptr, if it worked out ok. */
3740 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3741 		const char *alg, const char *name)
3742 {
3743 	struct crypto_ahash *tfm;
3744 
3745 	if (!alg[0])
3746 		return NULL;
3747 
3748 	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3749 	if (IS_ERR(tfm)) {
3750 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3751 			alg, name, PTR_ERR(tfm));
3752 		return tfm;
3753 	}
3754 	return tfm;
3755 }
3756 
3757 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3758 {
3759 	void *buffer = connection->data.rbuf;
3760 	int size = pi->size;
3761 
3762 	while (size) {
3763 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3764 		s = drbd_recv(connection, buffer, s);
3765 		if (s <= 0) {
3766 			if (s < 0)
3767 				return s;
3768 			break;
3769 		}
3770 		size -= s;
3771 	}
3772 	if (size)
3773 		return -EIO;
3774 	return 0;
3775 }
3776 
3777 /*
3778  * config_unknown_volume  -  device configuration command for unknown volume
3779  *
3780  * When a device is added to an existing connection, the node on which the
3781  * device is added first will send configuration commands to its peer but the
3782  * peer will not know about the device yet.  It will warn and ignore these
3783  * commands.  Once the device is added on the second node, the second node will
3784  * send the same device configuration commands, but in the other direction.
3785  *
3786  * (We can also end up here if drbd is misconfigured.)
3787  */
3788 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3789 {
3790 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3791 		  cmdname(pi->cmd), pi->vnr);
3792 	return ignore_remaining_packet(connection, pi);
3793 }
3794 
3795 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3796 {
3797 	struct drbd_peer_device *peer_device;
3798 	struct drbd_device *device;
3799 	struct p_rs_param_95 *p;
3800 	unsigned int header_size, data_size, exp_max_sz;
3801 	struct crypto_ahash *verify_tfm = NULL;
3802 	struct crypto_ahash *csums_tfm = NULL;
3803 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3804 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3805 	const int apv = connection->agreed_pro_version;
3806 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3807 	int fifo_size = 0;
3808 	int err;
3809 
3810 	peer_device = conn_peer_device(connection, pi->vnr);
3811 	if (!peer_device)
3812 		return config_unknown_volume(connection, pi);
3813 	device = peer_device->device;
3814 
3815 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3816 		    : apv == 88 ? sizeof(struct p_rs_param)
3817 					+ SHARED_SECRET_MAX
3818 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3819 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3820 
3821 	if (pi->size > exp_max_sz) {
3822 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3823 		    pi->size, exp_max_sz);
3824 		return -EIO;
3825 	}
3826 
3827 	if (apv <= 88) {
3828 		header_size = sizeof(struct p_rs_param);
3829 		data_size = pi->size - header_size;
3830 	} else if (apv <= 94) {
3831 		header_size = sizeof(struct p_rs_param_89);
3832 		data_size = pi->size - header_size;
3833 		D_ASSERT(device, data_size == 0);
3834 	} else {
3835 		header_size = sizeof(struct p_rs_param_95);
3836 		data_size = pi->size - header_size;
3837 		D_ASSERT(device, data_size == 0);
3838 	}
3839 
3840 	/* initialize verify_alg and csums_alg */
3841 	p = pi->data;
3842 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3843 
3844 	err = drbd_recv_all(peer_device->connection, p, header_size);
3845 	if (err)
3846 		return err;
3847 
3848 	mutex_lock(&connection->resource->conf_update);
3849 	old_net_conf = peer_device->connection->net_conf;
3850 	if (get_ldev(device)) {
3851 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3852 		if (!new_disk_conf) {
3853 			put_ldev(device);
3854 			mutex_unlock(&connection->resource->conf_update);
3855 			drbd_err(device, "Allocation of new disk_conf failed\n");
3856 			return -ENOMEM;
3857 		}
3858 
3859 		old_disk_conf = device->ldev->disk_conf;
3860 		*new_disk_conf = *old_disk_conf;
3861 
3862 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3863 	}
3864 
3865 	if (apv >= 88) {
3866 		if (apv == 88) {
3867 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3868 				drbd_err(device, "verify-alg of wrong size, "
3869 					"peer wants %u, accepting only up to %u byte\n",
3870 					data_size, SHARED_SECRET_MAX);
3871 				err = -EIO;
3872 				goto reconnect;
3873 			}
3874 
3875 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3876 			if (err)
3877 				goto reconnect;
3878 			/* we expect NUL terminated string */
3879 			/* but just in case someone tries to be evil */
3880 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3881 			p->verify_alg[data_size-1] = 0;
3882 
3883 		} else /* apv >= 89 */ {
3884 			/* we still expect NUL terminated strings */
3885 			/* but just in case someone tries to be evil */
3886 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3887 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3888 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3889 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3890 		}
3891 
3892 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3893 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3894 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3895 				    old_net_conf->verify_alg, p->verify_alg);
3896 				goto disconnect;
3897 			}
3898 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3899 					p->verify_alg, "verify-alg");
3900 			if (IS_ERR(verify_tfm)) {
3901 				verify_tfm = NULL;
3902 				goto disconnect;
3903 			}
3904 		}
3905 
3906 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3907 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3908 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3909 				    old_net_conf->csums_alg, p->csums_alg);
3910 				goto disconnect;
3911 			}
3912 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3913 					p->csums_alg, "csums-alg");
3914 			if (IS_ERR(csums_tfm)) {
3915 				csums_tfm = NULL;
3916 				goto disconnect;
3917 			}
3918 		}
3919 
3920 		if (apv > 94 && new_disk_conf) {
3921 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3922 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3923 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3924 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3925 
3926 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3927 			if (fifo_size != device->rs_plan_s->size) {
3928 				new_plan = fifo_alloc(fifo_size);
3929 				if (!new_plan) {
3930 					drbd_err(device, "kmalloc of fifo_buffer failed");
3931 					put_ldev(device);
3932 					goto disconnect;
3933 				}
3934 			}
3935 		}
3936 
3937 		if (verify_tfm || csums_tfm) {
3938 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3939 			if (!new_net_conf) {
3940 				drbd_err(device, "Allocation of new net_conf failed\n");
3941 				goto disconnect;
3942 			}
3943 
3944 			*new_net_conf = *old_net_conf;
3945 
3946 			if (verify_tfm) {
3947 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3948 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3949 				crypto_free_ahash(peer_device->connection->verify_tfm);
3950 				peer_device->connection->verify_tfm = verify_tfm;
3951 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3952 			}
3953 			if (csums_tfm) {
3954 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3955 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3956 				crypto_free_ahash(peer_device->connection->csums_tfm);
3957 				peer_device->connection->csums_tfm = csums_tfm;
3958 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3959 			}
3960 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3961 		}
3962 	}
3963 
3964 	if (new_disk_conf) {
3965 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3966 		put_ldev(device);
3967 	}
3968 
3969 	if (new_plan) {
3970 		old_plan = device->rs_plan_s;
3971 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3972 	}
3973 
3974 	mutex_unlock(&connection->resource->conf_update);
3975 	synchronize_rcu();
3976 	if (new_net_conf)
3977 		kfree(old_net_conf);
3978 	kfree(old_disk_conf);
3979 	kfree(old_plan);
3980 
3981 	return 0;
3982 
3983 reconnect:
3984 	if (new_disk_conf) {
3985 		put_ldev(device);
3986 		kfree(new_disk_conf);
3987 	}
3988 	mutex_unlock(&connection->resource->conf_update);
3989 	return -EIO;
3990 
3991 disconnect:
3992 	kfree(new_plan);
3993 	if (new_disk_conf) {
3994 		put_ldev(device);
3995 		kfree(new_disk_conf);
3996 	}
3997 	mutex_unlock(&connection->resource->conf_update);
3998 	/* just for completeness: actually not needed,
3999 	 * as this is not reached if csums_tfm was ok. */
4000 	crypto_free_ahash(csums_tfm);
4001 	/* but free the verify_tfm again, if csums_tfm did not work out */
4002 	crypto_free_ahash(verify_tfm);
4003 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4004 	return -EIO;
4005 }
4006 
4007 /* warn if the arguments differ by more than 12.5% */
4008 static void warn_if_differ_considerably(struct drbd_device *device,
4009 	const char *s, sector_t a, sector_t b)
4010 {
4011 	sector_t d;
4012 	if (a == 0 || b == 0)
4013 		return;
4014 	d = (a > b) ? (a - b) : (b - a);
4015 	if (d > (a>>3) || d > (b>>3))
4016 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4017 		     (unsigned long long)a, (unsigned long long)b);
4018 }
4019 
/*
 * receive_sizes() - handle a P_SIZES packet from the peer
 *
 * Records the peer's backing-disk, user-requested and current sizes,
 * possibly adopts a new user size, re-evaluates our own device size and
 * queue limits, and notifies the peer if our size changed.
 * Returns 0 on success or a negative error code.
 */
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_sizes *p = pi->data;
	/* write-same queue limits only exist if the feature was negotiated */
	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
	enum determine_dev_size dd = DS_UNCHANGED;
	sector_t p_size, p_usize, p_csize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	/* peer's backing disk size, user-configured size, current size */
	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);
	p_csize = be64_to_cpu(p->c_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	device->p_size = p_size;

	if (get_ldev(device)) {
		sector_t new_size, cur_size;
		rcu_read_lock();
		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
		rcu_read_unlock();

		warn_if_differ_considerably(device, "lower level device sizes",
			   p_size, drbd_get_max_capacity(device->ldev));
		warn_if_differ_considerably(device, "user requested size",
					    p_usize, my_usize);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (device->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero(my_usize, p_usize);

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
		cur_size = drbd_get_capacity(device->this_bdev);
		if (new_size < cur_size &&
		    device->state.disk >= D_OUTDATED &&
		    device->state.conn < C_CONNECTED) {
			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
					(unsigned long long)new_size, (unsigned long long)cur_size);
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
			put_ldev(device);
			return -EIO;
		}

		if (my_usize != p_usize) {
			/* adopt the peer's u_size: publish a new disk_conf via RCU */
			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;

			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
			if (!new_disk_conf) {
				drbd_err(device, "Allocation of new disk_conf failed\n");
				put_ldev(device);
				return -ENOMEM;
			}

			mutex_lock(&connection->resource->conf_update);
			old_disk_conf = device->ldev->disk_conf;
			*new_disk_conf = *old_disk_conf;
			new_disk_conf->disk_size = p_usize;

			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
			mutex_unlock(&connection->resource->conf_update);
			synchronize_rcu();
			kfree(old_disk_conf);

			/* NOTE(review): this logs my_usize (the previous local value)
			 * although disk_size was just set to p_usize — verify whether
			 * the new value was meant to be printed here. */
			drbd_info(device, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)my_usize);
		}

		put_ldev(device);
	}

	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
	   drbd_reconsider_queue_parameters(), we can be sure that after
	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(device)) {
		drbd_reconsider_queue_parameters(device, device->ldev, o);
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		put_ldev(device);
		if (dd == DS_ERROR)
			return -EIO;
		drbd_md_sync(device);
	} else {
		/*
		 * I am diskless, need to accept the peer's *current* size.
		 * I must NOT accept the peers backing disk size,
		 * it may have been larger than mine all along...
		 *
		 * At this point, the peer knows more about my disk, or at
		 * least about what we last agreed upon, than myself.
		 * So if his c_size is less than his d_size, the most likely
		 * reason is that *my* d_size was smaller last time we checked.
		 *
		 * However, if he sends a zero current size,
		 * take his (user-capped or) backing disk size anyways.
		 */
		drbd_reconsider_queue_parameters(device, NULL, o);
		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
	}

	if (get_ldev(device)) {
		/* remember the backing device's size to detect offline resizes */
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(device);
	}

	if (device->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(device->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(peer_device, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
			/* device grew online: resync the new area unless the
			 * admin explicitly suppressed it */
			if (device->state.pdsk >= D_INCONSISTENT &&
			    device->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(device);
			} else
				set_bit(RESYNC_AFTER_NEG, &device->flags);
		}
	}

	return 0;
}
4164 
4165 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4166 {
4167 	struct drbd_peer_device *peer_device;
4168 	struct drbd_device *device;
4169 	struct p_uuids *p = pi->data;
4170 	u64 *p_uuid;
4171 	int i, updated_uuids = 0;
4172 
4173 	peer_device = conn_peer_device(connection, pi->vnr);
4174 	if (!peer_device)
4175 		return config_unknown_volume(connection, pi);
4176 	device = peer_device->device;
4177 
4178 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4179 	if (!p_uuid) {
4180 		drbd_err(device, "kmalloc of p_uuid failed\n");
4181 		return false;
4182 	}
4183 
4184 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4185 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4186 
4187 	kfree(device->p_uuid);
4188 	device->p_uuid = p_uuid;
4189 
4190 	if (device->state.conn < C_CONNECTED &&
4191 	    device->state.disk < D_INCONSISTENT &&
4192 	    device->state.role == R_PRIMARY &&
4193 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4194 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4195 		    (unsigned long long)device->ed_uuid);
4196 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4197 		return -EIO;
4198 	}
4199 
4200 	if (get_ldev(device)) {
4201 		int skip_initial_sync =
4202 			device->state.conn == C_CONNECTED &&
4203 			peer_device->connection->agreed_pro_version >= 90 &&
4204 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4205 			(p_uuid[UI_FLAGS] & 8);
4206 		if (skip_initial_sync) {
4207 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4208 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4209 					"clear_n_write from receive_uuids",
4210 					BM_LOCKED_TEST_ALLOWED);
4211 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4212 			_drbd_uuid_set(device, UI_BITMAP, 0);
4213 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4214 					CS_VERBOSE, NULL);
4215 			drbd_md_sync(device);
4216 			updated_uuids = 1;
4217 		}
4218 		put_ldev(device);
4219 	} else if (device->state.disk < D_INCONSISTENT &&
4220 		   device->state.role == R_PRIMARY) {
4221 		/* I am a diskless primary, the peer just created a new current UUID
4222 		   for me. */
4223 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4224 	}
4225 
4226 	/* Before we test for the disk state, we should wait until an eventually
4227 	   ongoing cluster wide state change is finished. That is important if
4228 	   we are primary and are detaching from our disk. We need to see the
4229 	   new disk state... */
4230 	mutex_lock(device->state_mutex);
4231 	mutex_unlock(device->state_mutex);
4232 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4233 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4234 
4235 	if (updated_uuids)
4236 		drbd_print_uuids(device, "receiver updated UUIDs to");
4237 
4238 	return 0;
4239 }
4240 
4241 /**
4242  * convert_state() - Converts the peer's view of the cluster state to our point of view
4243  * @ps:		The state as seen by the peer.
4244  */
4245 static union drbd_state convert_state(union drbd_state ps)
4246 {
4247 	union drbd_state ms;
4248 
4249 	static enum drbd_conns c_tab[] = {
4250 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4251 		[C_CONNECTED] = C_CONNECTED,
4252 
4253 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4254 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4255 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4256 		[C_VERIFY_S]       = C_VERIFY_T,
4257 		[C_MASK]   = C_MASK,
4258 	};
4259 
4260 	ms.i = ps.i;
4261 
4262 	ms.conn = c_tab[ps.conn];
4263 	ms.peer = ps.role;
4264 	ms.role = ps.peer;
4265 	ms.pdsk = ps.disk;
4266 	ms.disk = ps.pdsk;
4267 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4268 
4269 	return ms;
4270 }
4271 
4272 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4273 {
4274 	struct drbd_peer_device *peer_device;
4275 	struct drbd_device *device;
4276 	struct p_req_state *p = pi->data;
4277 	union drbd_state mask, val;
4278 	enum drbd_state_rv rv;
4279 
4280 	peer_device = conn_peer_device(connection, pi->vnr);
4281 	if (!peer_device)
4282 		return -EIO;
4283 	device = peer_device->device;
4284 
4285 	mask.i = be32_to_cpu(p->mask);
4286 	val.i = be32_to_cpu(p->val);
4287 
4288 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4289 	    mutex_is_locked(device->state_mutex)) {
4290 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4291 		return 0;
4292 	}
4293 
4294 	mask = convert_state(mask);
4295 	val = convert_state(val);
4296 
4297 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4298 	drbd_send_sr_reply(peer_device, rv);
4299 
4300 	drbd_md_sync(device);
4301 
4302 	return 0;
4303 }
4304 
4305 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4306 {
4307 	struct p_req_state *p = pi->data;
4308 	union drbd_state mask, val;
4309 	enum drbd_state_rv rv;
4310 
4311 	mask.i = be32_to_cpu(p->mask);
4312 	val.i = be32_to_cpu(p->val);
4313 
4314 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4315 	    mutex_is_locked(&connection->cstate_mutex)) {
4316 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4317 		return 0;
4318 	}
4319 
4320 	mask = convert_state(mask);
4321 	val = convert_state(val);
4322 
4323 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4324 	conn_send_sr_reply(connection, rv);
4325 
4326 	return 0;
4327 }
4328 
/*
 * receive_state() - handle a P_STATE packet from the peer
 *
 * Merges the peer's reported state into our own under req_lock, possibly
 * running the resync handshake, and replies/disconnects as appropriate.
 * Uses an optimistic read-retry loop: the state is sampled, processed
 * without the lock, and re-checked before being committed.
 * Returns 0 on success or a negative error code.
 */
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_state *p = pi->data;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		/* peer is still attaching; derive its effective disk state
		 * from the consistency flag in its UUID set */
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&device->resource->req_lock);
 retry:
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (ack_receiver thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
		return -ECONNRESET;

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
			return 0;
		}
	}

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
		return 0;
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
				(peer_state.conn >= C_STARTING_SYNC_S &&
				 peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

		put_ldev(device);
		/* C_MASK from the handshake means "could not agree" */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
					return -EIO;
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&device->resource->req_lock);
	/* someone changed the state while we processed it: start over */
	if (os.i != drbd_read_state(device).i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
		}
	}

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

	return 0;
}
4506 
4507 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4508 {
4509 	struct drbd_peer_device *peer_device;
4510 	struct drbd_device *device;
4511 	struct p_rs_uuid *p = pi->data;
4512 
4513 	peer_device = conn_peer_device(connection, pi->vnr);
4514 	if (!peer_device)
4515 		return -EIO;
4516 	device = peer_device->device;
4517 
4518 	wait_event(device->misc_wait,
4519 		   device->state.conn == C_WF_SYNC_UUID ||
4520 		   device->state.conn == C_BEHIND ||
4521 		   device->state.conn < C_CONNECTED ||
4522 		   device->state.disk < D_NEGOTIATING);
4523 
4524 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4525 
4526 	/* Here the _drbd_uuid_ functions are right, current should
4527 	   _not_ be rotated into the history */
4528 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4529 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4530 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4531 
4532 		drbd_print_uuids(device, "updated sync uuid");
4533 		drbd_start_resync(device, C_SYNC_TARGET);
4534 
4535 		put_ldev(device);
4536 	} else
4537 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4538 
4539 	return 0;
4540 }
4541 
4542 /**
4543  * receive_bitmap_plain
4544  *
4545  * Return 0 when done, 1 when another iteration is needed, and a negative error
4546  * code upon failure.
4547  */
4548 static int
4549 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4550 		     unsigned long *p, struct bm_xfer_ctx *c)
4551 {
4552 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4553 				 drbd_header_size(peer_device->connection);
4554 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4555 				       c->bm_words - c->word_offset);
4556 	unsigned int want = num_words * sizeof(*p);
4557 	int err;
4558 
4559 	if (want != size) {
4560 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4561 		return -EIO;
4562 	}
4563 	if (want == 0)
4564 		return 0;
4565 	err = drbd_recv_all(peer_device->connection, p, want);
4566 	if (err)
4567 		return err;
4568 
4569 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4570 
4571 	c->word_offset += num_words;
4572 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4573 	if (c->bit_offset > c->bm_bits)
4574 		c->bit_offset = c->bm_bits;
4575 
4576 	return 1;
4577 }
4578 
4579 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4580 {
4581 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4582 }
4583 
4584 static int dcbp_get_start(struct p_compressed_bm *p)
4585 {
4586 	return (p->encoding & 0x80) != 0;
4587 }
4588 
4589 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4590 {
4591 	return (p->encoding >> 4) & 0x7;
4592 }
4593 
4594 /**
4595  * recv_bm_rle_bits
4596  *
4597  * Return 0 when done, 1 when another iteration is needed, and a negative error
4598  * code upon failure.
4599  */
4600 static int
4601 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4602 		struct p_compressed_bm *p,
4603 		 struct bm_xfer_ctx *c,
4604 		 unsigned int len)
4605 {
4606 	struct bitstream bs;
4607 	u64 look_ahead;
4608 	u64 rl;
4609 	u64 tmp;
4610 	unsigned long s = c->bit_offset;
4611 	unsigned long e;
4612 	int toggle = dcbp_get_start(p);
4613 	int have;
4614 	int bits;
4615 
4616 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4617 
4618 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4619 	if (bits < 0)
4620 		return -EIO;
4621 
4622 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4623 		bits = vli_decode_bits(&rl, look_ahead);
4624 		if (bits <= 0)
4625 			return -EIO;
4626 
4627 		if (toggle) {
4628 			e = s + rl -1;
4629 			if (e >= c->bm_bits) {
4630 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4631 				return -EIO;
4632 			}
4633 			_drbd_bm_set_bits(peer_device->device, s, e);
4634 		}
4635 
4636 		if (have < bits) {
4637 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4638 				have, bits, look_ahead,
4639 				(unsigned int)(bs.cur.b - p->code),
4640 				(unsigned int)bs.buf_len);
4641 			return -EIO;
4642 		}
4643 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4644 		if (likely(bits < 64))
4645 			look_ahead >>= bits;
4646 		else
4647 			look_ahead = 0;
4648 		have -= bits;
4649 
4650 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4651 		if (bits < 0)
4652 			return -EIO;
4653 		look_ahead |= tmp << have;
4654 		have += bits;
4655 	}
4656 
4657 	c->bit_offset = s;
4658 	bm_xfer_ctx_bit_to_word_offset(c);
4659 
4660 	return (s != c->bm_bits);
4661 }
4662 
4663 /**
4664  * decode_bitmap_c
4665  *
4666  * Return 0 when done, 1 when another iteration is needed, and a negative error
4667  * code upon failure.
4668  */
4669 static int
4670 decode_bitmap_c(struct drbd_peer_device *peer_device,
4671 		struct p_compressed_bm *p,
4672 		struct bm_xfer_ctx *c,
4673 		unsigned int len)
4674 {
4675 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4676 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4677 
4678 	/* other variants had been implemented for evaluation,
4679 	 * but have been dropped as this one turned out to be "best"
4680 	 * during all our tests. */
4681 
4682 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4683 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4684 	return -EIO;
4685 }
4686 
4687 void INFO_bm_xfer_stats(struct drbd_device *device,
4688 		const char *direction, struct bm_xfer_ctx *c)
4689 {
4690 	/* what would it take to transfer it "plaintext" */
4691 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4692 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4693 	unsigned int plain =
4694 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4695 		c->bm_words * sizeof(unsigned long);
4696 	unsigned int total = c->bytes[0] + c->bytes[1];
4697 	unsigned int r;
4698 
4699 	/* total can not be zero. but just in case: */
4700 	if (total == 0)
4701 		return;
4702 
4703 	/* don't report if not compressed */
4704 	if (total >= plain)
4705 		return;
4706 
4707 	/* total < plain. check for overflow, still */
4708 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4709 		                    : (1000 * total / plain);
4710 
4711 	if (r > 1000)
4712 		r = 1000;
4713 
4714 	r = 1000 - r;
4715 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4716 	     "total %u; compression: %u.%u%%\n",
4717 			direction,
4718 			c->bytes[1], c->packets[1],
4719 			c->bytes[0], c->packets[0],
4720 			total, r/10, r % 10);
4721 }
4722 
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if the process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   returns 0 on failure, 1 if we successfully received it. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	/* A bitmap transfer spans several packets; keep receiving until a
	 * decoder signals completion (0) or failure (< 0). */
	for(;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p = pi->data;

			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
				drbd_err(device, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			if (pi->size <= sizeof(*p)) {
				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
				err = -EIO;
				goto out;
			}
			err = drbd_recv_all(peer_device->connection, p, pi->size);
			if (err)
			       goto out;
			err = decode_bitmap_c(peer_device, p, &c, pi->size);
		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
			err = -EIO;
			goto out;
		}

		/* stats: index 0 counts compressed, index 1 plain packets */
		c.packets[pi->cmd == P_BITMAP]++;
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;	/* err == 0: transfer complete */
		}
		/* more to come: read the next packet's header */
		err = drbd_recv_header(peer_device->connection, pi);
		if (err)
			goto out;
	}

	INFO_bm_xfer_stats(device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		/* as sync target, reply with our own bitmap */
		err = drbd_send_bitmap(device);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(device, rv == SS_SUCCESS);
	} else if (device->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(device->state.conn));
	}
	err = 0;

 out:
	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
	return err;
}
4818 
4819 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4820 {
4821 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4822 		 pi->cmd, pi->size);
4823 
4824 	return ignore_remaining_packet(connection, pi);
4825 }
4826 
static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(connection->data.socket);

	/* P_UNPLUG_REMOTE carries no payload; nothing else to do. */
	return 0;
}
4835 
4836 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4837 {
4838 	struct drbd_peer_device *peer_device;
4839 	struct drbd_device *device;
4840 	struct p_block_desc *p = pi->data;
4841 
4842 	peer_device = conn_peer_device(connection, pi->vnr);
4843 	if (!peer_device)
4844 		return -EIO;
4845 	device = peer_device->device;
4846 
4847 	switch (device->state.conn) {
4848 	case C_WF_SYNC_UUID:
4849 	case C_WF_BITMAP_T:
4850 	case C_BEHIND:
4851 			break;
4852 	default:
4853 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4854 				drbd_conn_str(device->state.conn));
4855 	}
4856 
4857 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4858 
4859 	return 0;
4860 }
4861 
/* P_RS_DEALLOCATED: during resync the peer told us a range is deallocated
 * (thin provisioning); discard the same range locally instead of receiving
 * and writing zeroes. */
static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct p_block_desc *p = pi->data;
	struct drbd_device *device;
	sector_t sector;
	int size, err = 0;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	dec_rs_pending(device);

	if (get_ldev(device)) {
		struct drbd_peer_request *peer_req;
		const int op = REQ_OP_DISCARD;

		/* page-less peer request: the discard carries no data */
		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
					       size, 0, GFP_NOIO);
		if (!peer_req) {
			put_ldev(device);
			return -ENOMEM;
		}

		peer_req->w.cb = e_end_resync_block;
		peer_req->submit_jif = jiffies;
		peer_req->flags |= EE_IS_TRIM;

		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->sync_ee);
		spin_unlock_irq(&device->resource->req_lock);

		/* NOTE(review): pi->size is the remaining payload size after the
		 * caller stripped the sub-header, not the block size; presumably
		 * 'size' was meant here, as used for rs_sect_in below -- verify. */
		atomic_add(pi->size >> 9, &device->rs_sect_ev);
		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);

		if (err) {
			spin_lock_irq(&device->resource->req_lock);
			list_del(&peer_req->w.list);
			spin_unlock_irq(&device->resource->req_lock);

			drbd_free_peer_req(device, peer_req);
			put_ldev(device);
			err = 0;
			/* submit failed: fall back to nack'ing the block below */
			goto fail;
		}

		inc_unacked(device);

		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
		   as well as drbd_rs_complete_io() */
	} else {
	fail:
		/* no local disk (or submit failed): tell the peer we could not
		 * apply the deallocation */
		drbd_rs_complete_io(device, sector);
		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
	}

	atomic_add(size >> 9, &device->rs_sect_in);

	return err;
}
4927 
/* One entry of the data-socket dispatch table, indexed by packet type. */
struct data_cmd {
	int expect_payload;	/* non-zero: payload beyond pkt_size is legal */
	unsigned int pkt_size;	/* fixed sub-header size read before calling fn */
	int (*fn)(struct drbd_connection *, struct packet_info *);
};
4933 
/* Dispatch table for packets arriving on the data socket; see drbdd().
 * Entries with pkt_size 0 read their (variable sized) header themselves. */
static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
};
4964 
/* Main receive loop of the receiver thread: read a packet header, validate
 * it against the dispatch table, read the fixed sub-header, then hand the
 * packet to its handler.  Any failure degrades the connection to
 * C_PROTOCOL_ERROR. */
static void drbdd(struct drbd_connection *connection)
{
	struct packet_info pi;
	size_t shs; /* sub header size */
	int err;

	while (get_t_state(&connection->receiver) == RUNNING) {
		struct data_cmd const *cmd;

		drbd_thread_current_set_cpu(&connection->receiver);
		update_receiver_timing_details(connection, drbd_recv_header);
		if (drbd_recv_header(connection, &pi))
			goto err_out;

		/* NOTE(review): the table slot address is taken before the bounds
		 * check; it is only dereferenced afterwards, but &arr[i] with i out
		 * of range is technically undefined behavior -- consider reordering. */
		cmd = &drbd_cmd_handler[pi.cmd];
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
				 cmdname(pi.cmd), pi.cmd);
			goto err_out;
		}

		shs = cmd->pkt_size;
		/* a WRITE_SAME capable peer appends an o_qlim block to P_SIZES */
		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
			shs += sizeof(struct o_qlim);
		if (pi.size > shs && !cmd->expect_payload) {
			drbd_err(connection, "No payload expected %s l:%d\n",
				 cmdname(pi.cmd), pi.size);
			goto err_out;
		}
		if (pi.size < shs) {
			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
				 cmdname(pi.cmd), (int)shs, pi.size);
			goto err_out;
		}

		if (shs) {
			update_receiver_timing_details(connection, drbd_recv_all_warn);
			err = drbd_recv_all_warn(connection, pi.data, shs);
			if (err)
				goto err_out;
			/* handlers see only the size of the remaining payload */
			pi.size -= shs;
		}

		update_receiver_timing_details(connection, cmd->fn);
		err = cmd->fn(connection, &pi);
		if (err) {
			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
				 cmdname(pi.cmd), err, pi.size);
			goto err_out;
		}
	}
	return;

    err_out:
	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}
5021 
/* Tear down the network side of a connection: stop the ack receiver/sender,
 * close the sockets, run per-device cleanup, and move the connection state
 * towards C_UNCONNECTED (or C_STANDALONE if disconnect was requested). */
static void conn_disconnect(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	enum drbd_conns oc;
	int vnr;

	if (connection->cstate == C_STANDALONE)
		return;

	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	 */
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

	/* ack_receiver does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&connection->ack_receiver);
	if (connection->ack_sender) {
		destroy_workqueue(connection->ack_sender);
		connection->ack_sender = NULL;
	}
	drbd_free_sock(connection);

	/* Per-device cleanup.  drbd_disconnected() may sleep, so drop the RCU
	 * read lock around it; the kref keeps the device alive meanwhile. */
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_disconnected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	if (!list_empty(&connection->current_epoch->list))
		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;

	drbd_info(connection, "Connection closed\n");

	/* possibly fence the peer if we are primary and its disk state is unknown */
	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);

	spin_lock_irq(&connection->resource->req_lock);
	oc = connection->cstate;
	if (oc >= C_UNCONNECTED)
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&connection->resource->req_lock);

	if (oc == C_DISCONNECTING)
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
}
5078 
/* Per-device part of connection teardown: wait for in-flight peer requests,
 * cancel resync bookkeeping, flush the sender work queue, and write out the
 * bitmap.  Called from conn_disconnect() with a device kref held. */
static int drbd_disconnected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, &device->active_ee);
	_drbd_wait_ee_list_empty(device, &device->sync_ee);
	_drbd_wait_ee_list_empty(device, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	/* make sure the timer is not pending, then run its callback once
	 * synchronously (pre-timer_setup style cast of the data argument) */
	del_timer_sync(&device->resync_timer);
	resync_timer_fn((unsigned long)device);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	/* forget everything we learned about the peer's UUIDs */
	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	if (get_ldev(device)) {
		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
		put_ldev(device);
	}

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_free_peer_reqs(device, &device->net_ee);
	if (i)
		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));

	return 0;
}
5164 
5165 /*
5166  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5167  * we can agree on is stored in agreed_pro_version.
5168  *
5169  * feature flags and the reserved array should be enough room for future
5170  * enhancements of the handshake protocol, and possible plugins...
5171  *
5172  * for now, they are expected to be zero, but ignored.
5173  */
5174 static int drbd_send_features(struct drbd_connection *connection)
5175 {
5176 	struct drbd_socket *sock;
5177 	struct p_connection_features *p;
5178 
5179 	sock = &connection->data;
5180 	p = conn_prepare_command(connection, sock);
5181 	if (!p)
5182 		return -EIO;
5183 	memset(p, 0, sizeof(*p));
5184 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5185 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5186 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5187 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5188 }
5189 
5190 /*
5191  * return values:
5192  *   1 yes, we have a valid connection
5193  *   0 oops, did not work out, please try again
5194  *  -1 peer talks different language,
5195  *     no point in trying again, please go standalone.
5196  */
5197 static int drbd_do_features(struct drbd_connection *connection)
5198 {
5199 	/* ASSERT current == connection->receiver ... */
5200 	struct p_connection_features *p;
5201 	const int expect = sizeof(struct p_connection_features);
5202 	struct packet_info pi;
5203 	int err;
5204 
5205 	err = drbd_send_features(connection);
5206 	if (err)
5207 		return 0;
5208 
5209 	err = drbd_recv_header(connection, &pi);
5210 	if (err)
5211 		return 0;
5212 
5213 	if (pi.cmd != P_CONNECTION_FEATURES) {
5214 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5215 			 cmdname(pi.cmd), pi.cmd);
5216 		return -1;
5217 	}
5218 
5219 	if (pi.size != expect) {
5220 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5221 		     expect, pi.size);
5222 		return -1;
5223 	}
5224 
5225 	p = pi.data;
5226 	err = drbd_recv_all_warn(connection, p, expect);
5227 	if (err)
5228 		return 0;
5229 
5230 	p->protocol_min = be32_to_cpu(p->protocol_min);
5231 	p->protocol_max = be32_to_cpu(p->protocol_max);
5232 	if (p->protocol_max == 0)
5233 		p->protocol_max = p->protocol_min;
5234 
5235 	if (PRO_VERSION_MAX < p->protocol_min ||
5236 	    PRO_VERSION_MIN > p->protocol_max)
5237 		goto incompat;
5238 
5239 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5240 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5241 
5242 	drbd_info(connection, "Handshake successful: "
5243 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5244 
5245 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5246 		  connection->agreed_features,
5247 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5248 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5249 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5250 		  connection->agreed_features ? "" : " none");
5251 
5252 	return 1;
5253 
5254  incompat:
5255 	drbd_err(connection, "incompatible DRBD dialects: "
5256 	    "I support %d-%d, peer supports %d-%d\n",
5257 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5258 	    p->protocol_min, p->protocol_max);
5259 	return -1;
5260 }
5261 
5262 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/* Fallback when CONFIG_CRYPTO_HMAC is unavailable: we cannot compute the
 * challenge response, so refuse the handshake permanently (-1: don't retry).
 * Fixes the grammar of the user-facing message ("was build" -> "was built"). */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
5269 #else
5270 #define CHALLENGE_LEN 64
5271 
5272 /* Return value:
5273 	1 - auth succeeded,
5274 	0 - failed, try again (network error),
5275 	-1 - auth failed, don't try again.
5276 */
5277 
/* CRAM-HMAC mutual authentication: both sides send a random challenge and
 * must return the HMAC of the peer's challenge keyed with the shared secret.
 * See the "Return value" comment above for the 1/0/-1 result convention. */
static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	/* copy the shared secret out under RCU; net_conf may change */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;
	desc->flags = 0;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	/* send our challenge */
	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	/* expect the peer's challenge in return */
	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* a reflected challenge would let an attacker replay our own response */
	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	/* compute and send our response: HMAC(secret, peers_ch) */
	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	/* receive the peer's response to our challenge */
	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, response , resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* recompute what the peer should have sent: HMAC(secret, my_challenge) */
	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	/* NOTE(review): memcmp is not constant-time; crypto_memneq would avoid
	 * a (remote, likely impractical) timing side channel -- consider. */
	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);

	return rv;
}
5447 #endif
5448 
5449 int drbd_receiver(struct drbd_thread *thi)
5450 {
5451 	struct drbd_connection *connection = thi->connection;
5452 	int h;
5453 
5454 	drbd_info(connection, "receiver (re)started\n");
5455 
5456 	do {
5457 		h = conn_connect(connection);
5458 		if (h == 0) {
5459 			conn_disconnect(connection);
5460 			schedule_timeout_interruptible(HZ);
5461 		}
5462 		if (h == -1) {
5463 			drbd_warn(connection, "Discarding network configuration.\n");
5464 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5465 		}
5466 	} while (h == 0);
5467 
5468 	if (h > 0)
5469 		drbdd(connection);
5470 
5471 	conn_disconnect(connection);
5472 
5473 	drbd_info(connection, "receiver terminated\n");
5474 	return 0;
5475 }
5476 
5477 /* ********* acknowledge sender ******** */
5478 
5479 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5480 {
5481 	struct p_req_state_reply *p = pi->data;
5482 	int retcode = be32_to_cpu(p->retcode);
5483 
5484 	if (retcode >= SS_SUCCESS) {
5485 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5486 	} else {
5487 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5488 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5489 			 drbd_set_st_err_str(retcode), retcode);
5490 	}
5491 	wake_up(&connection->ping_wait);
5492 
5493 	return 0;
5494 }
5495 
5496 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5497 {
5498 	struct drbd_peer_device *peer_device;
5499 	struct drbd_device *device;
5500 	struct p_req_state_reply *p = pi->data;
5501 	int retcode = be32_to_cpu(p->retcode);
5502 
5503 	peer_device = conn_peer_device(connection, pi->vnr);
5504 	if (!peer_device)
5505 		return -EIO;
5506 	device = peer_device->device;
5507 
5508 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5509 		D_ASSERT(device, connection->agreed_pro_version < 100);
5510 		return got_conn_RqSReply(connection, pi);
5511 	}
5512 
5513 	if (retcode >= SS_SUCCESS) {
5514 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5515 	} else {
5516 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5517 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5518 			drbd_set_st_err_str(retcode), retcode);
5519 	}
5520 	wake_up(&device->state_wait);
5521 
5522 	return 0;
5523 }
5524 
/* A ping from the peer simply gets acknowledged. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}
5530 
5531 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5532 {
5533 	/* restore idle timeout */
5534 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5535 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5536 		wake_up(&connection->ping_wait);
5537 
5538 	return 0;
5539 }
5540 
/* P_RS_IS_IN_SYNC: during checksum based resync the peer found a block
 * already matches; mark it in sync locally and account it. */
static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	/* checksum based resync requires protocol 89 or newer */
	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(device);
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}
5570 
/*
 * Look up the request identified by @id/@sector in @root and feed the event
 * @what into its state machine, all under the resource request lock.  If the
 * state transition completed the master bio, complete it after dropping the
 * lock.
 *
 * Returns 0 on success, -EIO if no matching request was found.
 */
static int
validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&device->resource->req_lock);
		return -EIO;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&device->resource->req_lock);

	/* complete_master_bio() may sleep; must happen outside the lock */
	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}
5592 
5593 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5594 {
5595 	struct drbd_peer_device *peer_device;
5596 	struct drbd_device *device;
5597 	struct p_block_ack *p = pi->data;
5598 	sector_t sector = be64_to_cpu(p->sector);
5599 	int blksize = be32_to_cpu(p->blksize);
5600 	enum drbd_req_event what;
5601 
5602 	peer_device = conn_peer_device(connection, pi->vnr);
5603 	if (!peer_device)
5604 		return -EIO;
5605 	device = peer_device->device;
5606 
5607 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5608 
5609 	if (p->block_id == ID_SYNCER) {
5610 		drbd_set_in_sync(device, sector, blksize);
5611 		dec_rs_pending(device);
5612 		return 0;
5613 	}
5614 	switch (pi->cmd) {
5615 	case P_RS_WRITE_ACK:
5616 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5617 		break;
5618 	case P_WRITE_ACK:
5619 		what = WRITE_ACKED_BY_PEER;
5620 		break;
5621 	case P_RECV_ACK:
5622 		what = RECV_ACKED_BY_PEER;
5623 		break;
5624 	case P_SUPERSEDED:
5625 		what = CONFLICT_RESOLVED;
5626 		break;
5627 	case P_RETRY_WRITE:
5628 		what = POSTPONE_WRITE;
5629 		break;
5630 	default:
5631 		BUG();
5632 	}
5633 
5634 	return validate_req_change_req_state(device, p->block_id, sector,
5635 					     &device->write_requests, __func__,
5636 					     what, false);
5637 }
5638 
5639 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5640 {
5641 	struct drbd_peer_device *peer_device;
5642 	struct drbd_device *device;
5643 	struct p_block_ack *p = pi->data;
5644 	sector_t sector = be64_to_cpu(p->sector);
5645 	int size = be32_to_cpu(p->blksize);
5646 	int err;
5647 
5648 	peer_device = conn_peer_device(connection, pi->vnr);
5649 	if (!peer_device)
5650 		return -EIO;
5651 	device = peer_device->device;
5652 
5653 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5654 
5655 	if (p->block_id == ID_SYNCER) {
5656 		dec_rs_pending(device);
5657 		drbd_rs_failed_io(device, sector, size);
5658 		return 0;
5659 	}
5660 
5661 	err = validate_req_change_req_state(device, p->block_id, sector,
5662 					    &device->write_requests, __func__,
5663 					    NEG_ACKED, true);
5664 	if (err) {
5665 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5666 		   The master bio might already be completed, therefore the
5667 		   request is no longer in the collision hash. */
5668 		/* In Protocol B we might already have got a P_RECV_ACK
5669 		   but then get a P_NEG_ACK afterwards. */
5670 		drbd_set_out_of_sync(device, sector, size);
5671 	}
5672 	return 0;
5673 }
5674 
5675 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5676 {
5677 	struct drbd_peer_device *peer_device;
5678 	struct drbd_device *device;
5679 	struct p_block_ack *p = pi->data;
5680 	sector_t sector = be64_to_cpu(p->sector);
5681 
5682 	peer_device = conn_peer_device(connection, pi->vnr);
5683 	if (!peer_device)
5684 		return -EIO;
5685 	device = peer_device->device;
5686 
5687 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5688 
5689 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5690 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5691 
5692 	return validate_req_change_req_state(device, p->block_id, sector,
5693 					     &device->read_requests, __func__,
5694 					     NEG_ACKED, false);
5695 }
5696 
static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Handles both P_NEG_RS_DREPLY (a resync read request failed on the
	 * peer) and P_RS_CANCEL (see ack_receiver_tbl). */
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	dec_rs_pending(device);

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}
5732 
static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	/* The peer acknowledged a whole barrier; release the corresponding
	 * requests from the transfer log. */
	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		/* A device in Ahead mode with no application I/O in flight
		 * may now schedule its transition back towards SyncSource;
		 * test_and_set_bit() ensures the timer is armed only once. */
		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}
5756 
static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Result of one online-verify request from the peer.  block_id tells
	 * whether the block was found out of sync; when the last outstanding
	 * request is answered (ov_left reaches 0), finish the verify run. */
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	struct drbd_device_work *dw;
	sector_t sector;
	int size;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		/* presumably ends a run of out-of-sync blocks — prints the
		 * accumulated range; TODO confirm against
		 * ov_out_of_sync_print() */
		ov_out_of_sync_print(device);

	if (!get_ldev(device))
		return 0;

	drbd_rs_complete_io(device, sector);
	dec_rs_pending(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	if (device->ov_left == 0) {
		/* queue the finish work on the sender; if allocation fails,
		 * finish synchronously right here as a fallback */
		dw = kmalloc(sizeof(*dw), GFP_NOIO);
		if (dw) {
			dw->w.cb = w_ov_finished;
			dw->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
		} else {
			drbd_err(device, "kmalloc(dw) failed.");
			ov_out_of_sync_print(device);
			drbd_resync_finished(device);
		}
	}
	put_ldev(device);
	return 0;
}
5808 
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Deliberately ignore the packet (used for P_DELAY_PROBE, see
	 * ack_receiver_tbl); its payload was already received. */
	return 0;
}
5813 
/* Dispatch table entry for packets arriving on the meta data socket:
 * expected payload size (excluding the header) and the handler to call. */
struct meta_sock_cmd {
	size_t pkt_size;	/* payload bytes expected after the header */
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};
5818 
5819 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5820 {
5821 	long t;
5822 	struct net_conf *nc;
5823 
5824 	rcu_read_lock();
5825 	nc = rcu_dereference(connection->net_conf);
5826 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5827 	rcu_read_unlock();
5828 
5829 	t *= HZ;
5830 	if (ping_timeout)
5831 		t /= 10;
5832 
5833 	connection->meta.socket->sk->sk_rcvtimeo = t;
5834 }
5835 
/* While waiting for a P_PING_ACK, use the (shorter) ping timeout. */
static void set_ping_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 1);
}
5840 
/* Back to the regular idle receive timeout (ping interval). */
static void set_idle_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 0);
}
5845 
5846 static struct meta_sock_cmd ack_receiver_tbl[] = {
5847 	[P_PING]	    = { 0, got_Ping },
5848 	[P_PING_ACK]	    = { 0, got_PingAck },
5849 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5850 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5851 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5852 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5853 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5854 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5855 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5856 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5857 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5858 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5859 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5860 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5861 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5862 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5863 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5864 };
5865 
5866 int drbd_ack_receiver(struct drbd_thread *thi)
5867 {
5868 	struct drbd_connection *connection = thi->connection;
5869 	struct meta_sock_cmd *cmd = NULL;
5870 	struct packet_info pi;
5871 	unsigned long pre_recv_jif;
5872 	int rv;
5873 	void *buf    = connection->meta.rbuf;
5874 	int received = 0;
5875 	unsigned int header_size = drbd_header_size(connection);
5876 	int expect   = header_size;
5877 	bool ping_timeout_active = false;
5878 	struct sched_param param = { .sched_priority = 2 };
5879 
5880 	rv = sched_setscheduler(current, SCHED_RR, &param);
5881 	if (rv < 0)
5882 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5883 
5884 	while (get_t_state(thi) == RUNNING) {
5885 		drbd_thread_current_set_cpu(thi);
5886 
5887 		conn_reclaim_net_peer_reqs(connection);
5888 
5889 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5890 			if (drbd_send_ping(connection)) {
5891 				drbd_err(connection, "drbd_send_ping has failed\n");
5892 				goto reconnect;
5893 			}
5894 			set_ping_timeout(connection);
5895 			ping_timeout_active = true;
5896 		}
5897 
5898 		pre_recv_jif = jiffies;
5899 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5900 
5901 		/* Note:
5902 		 * -EINTR	 (on meta) we got a signal
5903 		 * -EAGAIN	 (on meta) rcvtimeo expired
5904 		 * -ECONNRESET	 other side closed the connection
5905 		 * -ERESTARTSYS  (on data) we got a signal
5906 		 * rv <  0	 other than above: unexpected error!
5907 		 * rv == expected: full header or command
5908 		 * rv <  expected: "woken" by signal during receive
5909 		 * rv == 0	 : "connection shut down by peer"
5910 		 */
5911 		if (likely(rv > 0)) {
5912 			received += rv;
5913 			buf	 += rv;
5914 		} else if (rv == 0) {
5915 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5916 				long t;
5917 				rcu_read_lock();
5918 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5919 				rcu_read_unlock();
5920 
5921 				t = wait_event_timeout(connection->ping_wait,
5922 						       connection->cstate < C_WF_REPORT_PARAMS,
5923 						       t);
5924 				if (t)
5925 					break;
5926 			}
5927 			drbd_err(connection, "meta connection shut down by peer.\n");
5928 			goto reconnect;
5929 		} else if (rv == -EAGAIN) {
5930 			/* If the data socket received something meanwhile,
5931 			 * that is good enough: peer is still alive. */
5932 			if (time_after(connection->last_received, pre_recv_jif))
5933 				continue;
5934 			if (ping_timeout_active) {
5935 				drbd_err(connection, "PingAck did not arrive in time.\n");
5936 				goto reconnect;
5937 			}
5938 			set_bit(SEND_PING, &connection->flags);
5939 			continue;
5940 		} else if (rv == -EINTR) {
5941 			/* maybe drbd_thread_stop(): the while condition will notice.
5942 			 * maybe woken for send_ping: we'll send a ping above,
5943 			 * and change the rcvtimeo */
5944 			flush_signals(current);
5945 			continue;
5946 		} else {
5947 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5948 			goto reconnect;
5949 		}
5950 
5951 		if (received == expect && cmd == NULL) {
5952 			if (decode_header(connection, connection->meta.rbuf, &pi))
5953 				goto reconnect;
5954 			cmd = &ack_receiver_tbl[pi.cmd];
5955 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5956 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5957 					 cmdname(pi.cmd), pi.cmd);
5958 				goto disconnect;
5959 			}
5960 			expect = header_size + cmd->pkt_size;
5961 			if (pi.size != expect - header_size) {
5962 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5963 					pi.cmd, pi.size);
5964 				goto reconnect;
5965 			}
5966 		}
5967 		if (received == expect) {
5968 			bool err;
5969 
5970 			err = cmd->fn(connection, &pi);
5971 			if (err) {
5972 				drbd_err(connection, "%pf failed\n", cmd->fn);
5973 				goto reconnect;
5974 			}
5975 
5976 			connection->last_received = jiffies;
5977 
5978 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5979 				set_idle_timeout(connection);
5980 				ping_timeout_active = false;
5981 			}
5982 
5983 			buf	 = connection->meta.rbuf;
5984 			received = 0;
5985 			expect	 = header_size;
5986 			cmd	 = NULL;
5987 		}
5988 	}
5989 
5990 	if (0) {
5991 reconnect:
5992 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5993 		conn_md_sync(connection);
5994 	}
5995 	if (0) {
5996 disconnect:
5997 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5998 	}
5999 
6000 	drbd_info(connection, "ack_receiver terminated\n");
6001 
6002 	return 0;
6003 }
6004 
6005 void drbd_send_acks_wf(struct work_struct *ws)
6006 {
6007 	struct drbd_peer_device *peer_device =
6008 		container_of(ws, struct drbd_peer_device, send_acks_work);
6009 	struct drbd_connection *connection = peer_device->connection;
6010 	struct drbd_device *device = peer_device->device;
6011 	struct net_conf *nc;
6012 	int tcp_cork, err;
6013 
6014 	rcu_read_lock();
6015 	nc = rcu_dereference(connection->net_conf);
6016 	tcp_cork = nc->tcp_cork;
6017 	rcu_read_unlock();
6018 
6019 	if (tcp_cork)
6020 		drbd_tcp_cork(connection->meta.socket);
6021 
6022 	err = drbd_finish_peer_reqs(device);
6023 	kref_put(&device->kref, drbd_destroy_device);
6024 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6025 	   struct work_struct send_acks_work alive, which is in the peer_device object */
6026 
6027 	if (err) {
6028 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6029 		return;
6030 	}
6031 
6032 	if (tcp_cork)
6033 		drbd_tcp_uncork(connection->meta.socket);
6034 
6035 	return;
6036 }
6037