xref: /linux/drivers/block/drbd/drbd_receiver.c (revision ca55b2fef3a9373fcfc30f82fd26bc7fccbda732)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50 
51 #define PRO_FEATURES (FF_TRIM)
52 
/*
 * Decoded packet header.  Filled in by decode_header(); in this part of the
 * file only @cmd is consumed (receive_first_packet() returns it).
 * NOTE(review): size/vnr/data presumably are payload length, volume number
 * and a pointer to the payload -- confirm against decode_header().
 */
53 struct packet_info {
54 	enum drbd_packet cmd;
55 	unsigned int size;
56 	unsigned int vnr;
57 	void *data;
58 };
59 
/*
 * Return type of drbd_may_finish_epoch() (declared below).
 * NOTE(review): the names suggest "epoch still in use", "epoch freed" and
 * "epoch object reused" -- confirm against drbd_may_finish_epoch().
 */
60 enum finish_epoch {
61 	FE_STILL_LIVE,
62 	FE_DESTROYED,
63 	FE_RECYCLED,
64 };
65 
/* Forward declarations of file-local (static) functions used before their definition. */
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72 
73 
/*
 * Allocation flags passed to alloc_page() in __drbd_alloc_pages(): highmem
 * pages are acceptable and allocation failures must not be logged.
 */
74 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
75 
76 /*
77  * some helper functions to deal with single linked page lists,
78  * page->private being our "next" pointer.
79  */
80 
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 	struct page *page;
88 	struct page *tmp;
89 
90 	BUG_ON(!n);
91 	BUG_ON(!head);
92 
93 	page = *head;
94 
95 	if (!page)
96 		return NULL;
97 
98 	while (page) {
99 		tmp = page_chain_next(page);
100 		if (--n == 0)
101 			break; /* found sufficient pages */
102 		if (tmp == NULL)
103 			/* insufficient pages, don't use any of them. */
104 			return NULL;
105 		page = tmp;
106 	}
107 
108 	/* add end of list marker for the returned list */
109 	set_page_private(page, 0);
110 	/* actual return value, and adjustment of head */
111 	page = *head;
112 	*head = tmp;
113 	return page;
114 }
115 
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 	struct page *tmp;
122 	int i = 1;
123 	while ((tmp = page_chain_next(page)))
124 		++i, page = tmp;
125 	if (len)
126 		*len = i;
127 	return page;
128 }
129 
130 static int page_chain_free(struct page *page)
131 {
132 	struct page *tmp;
133 	int i = 0;
134 	page_chain_for_each_safe(page, tmp) {
135 		put_page(page);
136 		++i;
137 	}
138 	return i;
139 }
140 
141 static void page_chain_add(struct page **head,
142 		struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145 	struct page *tmp;
146 	tmp = page_chain_tail(chain_first, NULL);
147 	BUG_ON(tmp != chain_last);
148 #endif
149 
150 	/* add chain to head */
151 	set_page_private(chain_last, (unsigned long)*head);
152 	*head = chain_first;
153 }
154 
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 				       unsigned int number)
157 {
158 	struct page *page = NULL;
159 	struct page *tmp = NULL;
160 	unsigned int i = 0;
161 
162 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
163 	 * So what. It saves a spin_lock. */
164 	if (drbd_pp_vacant >= number) {
165 		spin_lock(&drbd_pp_lock);
166 		page = page_chain_del(&drbd_pp_pool, number);
167 		if (page)
168 			drbd_pp_vacant -= number;
169 		spin_unlock(&drbd_pp_lock);
170 		if (page)
171 			return page;
172 	}
173 
174 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 	 * which in turn might block on the other node at this very place.  */
177 	for (i = 0; i < number; i++) {
178 		tmp = alloc_page(GFP_TRY);
179 		if (!tmp)
180 			break;
181 		set_page_private(tmp, (unsigned long)page);
182 		page = tmp;
183 	}
184 
185 	if (i == number)
186 		return page;
187 
188 	/* Not enough pages immediately available this time.
189 	 * No need to jump around here, drbd_alloc_pages will retry this
190 	 * function "soon". */
191 	if (page) {
192 		tmp = page_chain_tail(page, NULL);
193 		spin_lock(&drbd_pp_lock);
194 		page_chain_add(&drbd_pp_pool, page, tmp);
195 		drbd_pp_vacant += i;
196 		spin_unlock(&drbd_pp_lock);
197 	}
198 	return NULL;
199 }
200 
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 					   struct list_head *to_be_freed)
203 {
204 	struct drbd_peer_request *peer_req, *tmp;
205 
206 	/* The EEs are always appended to the end of the list. Since
207 	   they are sent in order over the wire, they have to finish
208 	   in order. As soon as we see the first not finished we can
209 	   stop to examine the list... */
210 
211 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 		if (drbd_peer_req_has_active_page(peer_req))
213 			break;
214 		list_move(&peer_req->w.list, to_be_freed);
215 	}
216 }
217 
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219 {
220 	LIST_HEAD(reclaimed);
221 	struct drbd_peer_request *peer_req, *t;
222 
223 	spin_lock_irq(&device->resource->req_lock);
224 	reclaim_finished_net_peer_reqs(device, &reclaimed);
225 	spin_unlock_irq(&device->resource->req_lock);
226 
227 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 		drbd_free_net_peer_req(device, peer_req);
229 }
230 
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @device:	DRBD device.
234  * @number:	number of pages requested
235  * @retry:	whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * If this allocation would exceed the max_buffers setting, we throttle
242  * allocation (schedule_timeout) to give the system some room to breathe.
243  *
244  * We do not use max-buffers as hard limit, because it could lead to
245  * congestion and further to a distributed deadlock during online-verify or
246  * (checksum based) resync, if the max-buffers, socket buffer sizes and
247  * resync-rate settings are mis-configured.
248  *
249  * Returns a page chain linked via page->private.
250  */
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252 			      bool retry)
253 {
254 	struct drbd_device *device = peer_device->device;
255 	struct page *page = NULL;
256 	struct net_conf *nc;
257 	DEFINE_WAIT(wait);
258 	unsigned int mxb;
259 
260 	rcu_read_lock();
261 	nc = rcu_dereference(peer_device->connection->net_conf);
262 	mxb = nc ? nc->max_buffers : 1000000;
263 	rcu_read_unlock();
264 
265 	if (atomic_read(&device->pp_in_use) < mxb)
266 		page = __drbd_alloc_pages(device, number);
267 
268 	while (page == NULL) {
269 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270 
271 		drbd_kick_lo_and_reclaim_net(device);
272 
273 		if (atomic_read(&device->pp_in_use) < mxb) {
274 			page = __drbd_alloc_pages(device, number);
275 			if (page)
276 				break;
277 		}
278 
279 		if (!retry)
280 			break;
281 
282 		if (signal_pending(current)) {
283 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284 			break;
285 		}
286 
287 		if (schedule_timeout(HZ/10) == 0)
288 			mxb = UINT_MAX;
289 	}
290 	finish_wait(&drbd_pp_wait, &wait);
291 
292 	if (page)
293 		atomic_add(number, &device->pp_in_use);
294 	return page;
295 }
296 
297 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302 {
303 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304 	int i;
305 
306 	if (page == NULL)
307 		return;
308 
309 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310 		i = page_chain_free(page);
311 	else {
312 		struct page *tmp;
313 		tmp = page_chain_tail(page, &i);
314 		spin_lock(&drbd_pp_lock);
315 		page_chain_add(&drbd_pp_pool, page, tmp);
316 		drbd_pp_vacant += i;
317 		spin_unlock(&drbd_pp_lock);
318 	}
319 	i = atomic_sub_return(i, a);
320 	if (i < 0)
321 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323 	wake_up(&drbd_pp_wait);
324 }
325 
326 /*
327 You need to hold the req_lock:
328  _drbd_wait_ee_list_empty()
329 
330 You must not have the req_lock:
331  drbd_free_peer_req()
332  drbd_alloc_peer_req()
333  drbd_free_peer_reqs()
334  drbd_ee_fix_bhs()
335  drbd_finish_peer_reqs()
336  drbd_clear_done_ee()
337  drbd_wait_ee_list_empty()
338 */
339 
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342 		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343 {
344 	struct drbd_device *device = peer_device->device;
345 	struct drbd_peer_request *peer_req;
346 	struct page *page = NULL;
347 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348 
349 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350 		return NULL;
351 
352 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353 	if (!peer_req) {
354 		if (!(gfp_mask & __GFP_NOWARN))
355 			drbd_err(device, "%s: allocation failed\n", __func__);
356 		return NULL;
357 	}
358 
359 	if (has_payload && data_size) {
360 		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
361 		if (!page)
362 			goto fail;
363 	}
364 
365 	memset(peer_req, 0, sizeof(*peer_req));
366 	INIT_LIST_HEAD(&peer_req->w.list);
367 	drbd_clear_interval(&peer_req->i);
368 	peer_req->i.size = data_size;
369 	peer_req->i.sector = sector;
370 	peer_req->submit_jif = jiffies;
371 	peer_req->peer_device = peer_device;
372 	peer_req->pages = page;
373 	/*
374 	 * The block_id is opaque to the receiver.  It is not endianness
375 	 * converted, and sent back to the sender unchanged.
376 	 */
377 	peer_req->block_id = id;
378 
379 	return peer_req;
380 
381  fail:
382 	mempool_free(peer_req, drbd_ee_mempool);
383 	return NULL;
384 }
385 
386 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
387 		       int is_net)
388 {
389 	might_sleep();
390 	if (peer_req->flags & EE_HAS_DIGEST)
391 		kfree(peer_req->digest);
392 	drbd_free_pages(device, peer_req->pages, is_net);
393 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
394 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
395 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
396 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
397 		drbd_al_complete_io(device, &peer_req->i);
398 	}
399 	mempool_free(peer_req, drbd_ee_mempool);
400 }
401 
402 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
403 {
404 	LIST_HEAD(work_list);
405 	struct drbd_peer_request *peer_req, *t;
406 	int count = 0;
407 	int is_net = list == &device->net_ee;
408 
409 	spin_lock_irq(&device->resource->req_lock);
410 	list_splice_init(list, &work_list);
411 	spin_unlock_irq(&device->resource->req_lock);
412 
413 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
414 		__drbd_free_peer_req(device, peer_req, is_net);
415 		count++;
416 	}
417 	return count;
418 }
419 
420 /*
421  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
422  */
423 static int drbd_finish_peer_reqs(struct drbd_device *device)
424 {
425 	LIST_HEAD(work_list);
426 	LIST_HEAD(reclaimed);
427 	struct drbd_peer_request *peer_req, *t;
428 	int err = 0;
429 
430 	spin_lock_irq(&device->resource->req_lock);
431 	reclaim_finished_net_peer_reqs(device, &reclaimed);
432 	list_splice_init(&device->done_ee, &work_list);
433 	spin_unlock_irq(&device->resource->req_lock);
434 
435 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
436 		drbd_free_net_peer_req(device, peer_req);
437 
438 	/* possible callbacks here:
439 	 * e_end_block, and e_end_resync_block, e_send_superseded.
440 	 * all ignore the last argument.
441 	 */
442 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
443 		int err2;
444 
445 		/* list_del not necessary, next/prev members not touched */
446 		err2 = peer_req->w.cb(&peer_req->w, !!err);
447 		if (!err)
448 			err = err2;
449 		drbd_free_peer_req(device, peer_req);
450 	}
451 	wake_up(&device->ee_wait);
452 
453 	return err;
454 }
455 
456 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
457 				     struct list_head *head)
458 {
459 	DEFINE_WAIT(wait);
460 
461 	/* avoids spin_lock/unlock
462 	 * and calling prepare_to_wait in the fast path */
463 	while (!list_empty(head)) {
464 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
465 		spin_unlock_irq(&device->resource->req_lock);
466 		io_schedule();
467 		finish_wait(&device->ee_wait, &wait);
468 		spin_lock_irq(&device->resource->req_lock);
469 	}
470 }
471 
472 static void drbd_wait_ee_list_empty(struct drbd_device *device,
473 				    struct list_head *head)
474 {
475 	spin_lock_irq(&device->resource->req_lock);
476 	_drbd_wait_ee_list_empty(device, head);
477 	spin_unlock_irq(&device->resource->req_lock);
478 }
479 
480 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
481 {
482 	struct kvec iov = {
483 		.iov_base = buf,
484 		.iov_len = size,
485 	};
486 	struct msghdr msg = {
487 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
488 	};
489 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
490 }
491 
492 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
493 {
494 	int rv;
495 
496 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
497 
498 	if (rv < 0) {
499 		if (rv == -ECONNRESET)
500 			drbd_info(connection, "sock was reset by peer\n");
501 		else if (rv != -ERESTARTSYS)
502 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
503 	} else if (rv == 0) {
504 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
505 			long t;
506 			rcu_read_lock();
507 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
508 			rcu_read_unlock();
509 
510 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
511 
512 			if (t)
513 				goto out;
514 		}
515 		drbd_info(connection, "sock was shut down by peer\n");
516 	}
517 
518 	if (rv != size)
519 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
520 
521 out:
522 	return rv;
523 }
524 
525 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
526 {
527 	int err;
528 
529 	err = drbd_recv(connection, buf, size);
530 	if (err != size) {
531 		if (err >= 0)
532 			err = -EIO;
533 	} else
534 		err = 0;
535 	return err;
536 }
537 
538 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
539 {
540 	int err;
541 
542 	err = drbd_recv_all(connection, buf, size);
543 	if (err && !signal_pending(current))
544 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
545 	return err;
546 }
547 
548 /* quoting tcp(7):
549  *   On individual connections, the socket buffer size must be set prior to the
550  *   listen(2) or connect(2) calls in order to have it take effect.
551  * This is our wrapper to do so.
552  */
553 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
554 		unsigned int rcv)
555 {
556 	/* open coded SO_SNDBUF, SO_RCVBUF */
557 	if (snd) {
558 		sock->sk->sk_sndbuf = snd;
559 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
560 	}
561 	if (rcv) {
562 		sock->sk->sk_rcvbuf = rcv;
563 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
564 	}
565 }
566 
/*
 * drbd_try_connect() - make one outgoing TCP connection attempt to the peer
 *
 * Creates a socket, binds it to the configured local address (port 0) and
 * connects to the configured peer address, using connect_int seconds as
 * send/receive timeout.
 *
 * Returns the connected socket, or NULL if there is no net_conf or the
 * attempt failed.  "Expected" failures (timeout, signal, peer not
 * reachable, or any failure of the connect itself) leave the connection
 * state alone; an unexpected failure while creating or binding the socket
 * is logged and requests C_DISCONNECTING.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	/* snapshot the settings we need, so we can drop the RCU read lock */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	/* let the kernel pick a free source port */
	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* clamp against the buffer we actually copy into */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 *  for the outgoing connections.
	 *  This is needed for multihomed hosts and to be
	 *  able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
654 
/*
 * State shared between conn_connect(), the listen socket helpers and the
 * drbd_incoming_connection() socket callback (reached via sk_user_data).
 */
655 struct accept_wait_data {
656 	struct drbd_connection *connection;
657 	struct socket *s_listen;	/* set by prepare_listen_socket() */
658 	struct completion door_bell;	/* completed when a socket reaches TCP_ESTABLISHED */
659 	void (*original_sk_state_change)(struct sock *sk);	/* callback we replaced, still chained to */
660 
661 };
662 
663 static void drbd_incoming_connection(struct sock *sk)
664 {
665 	struct accept_wait_data *ad = sk->sk_user_data;
666 	void (*state_change)(struct sock *sk);
667 
668 	state_change = ad->original_sk_state_change;
669 	if (sk->sk_state == TCP_ESTABLISHED)
670 		complete(&ad->door_bell);
671 	state_change(sk);
672 }
673 
674 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
675 {
676 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
677 	struct sockaddr_in6 my_addr;
678 	struct socket *s_listen;
679 	struct net_conf *nc;
680 	const char *what;
681 
682 	rcu_read_lock();
683 	nc = rcu_dereference(connection->net_conf);
684 	if (!nc) {
685 		rcu_read_unlock();
686 		return -EIO;
687 	}
688 	sndbuf_size = nc->sndbuf_size;
689 	rcvbuf_size = nc->rcvbuf_size;
690 	rcu_read_unlock();
691 
692 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
693 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
694 
695 	what = "sock_create_kern";
696 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
697 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
698 	if (err) {
699 		s_listen = NULL;
700 		goto out;
701 	}
702 
703 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
704 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
705 
706 	what = "bind before listen";
707 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
708 	if (err < 0)
709 		goto out;
710 
711 	ad->s_listen = s_listen;
712 	write_lock_bh(&s_listen->sk->sk_callback_lock);
713 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
714 	s_listen->sk->sk_state_change = drbd_incoming_connection;
715 	s_listen->sk->sk_user_data = ad;
716 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
717 
718 	what = "listen";
719 	err = s_listen->ops->listen(s_listen, 5);
720 	if (err < 0)
721 		goto out;
722 
723 	return 0;
724 out:
725 	if (s_listen)
726 		sock_release(s_listen);
727 	if (err < 0) {
728 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
729 			drbd_err(connection, "%s failed, err = %d\n", what, err);
730 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
731 		}
732 	}
733 
734 	return -EIO;
735 }
736 
/*
 * unregister_state_change() - undo the callback redirection on @sk
 *
 * Restores the sk_state_change callback saved in @ad and clears
 * sk_user_data, both under sk_callback_lock.
 */
737 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
738 {
739 	write_lock_bh(&sk->sk_callback_lock);
	/* restore the callback first, only then drop the pointer it relies on */
740 	sk->sk_state_change = ad->original_sk_state_change;
741 	sk->sk_user_data = NULL;
742 	write_unlock_bh(&sk->sk_callback_lock);
743 }
744 
745 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
746 {
747 	int timeo, connect_int, err = 0;
748 	struct socket *s_estab = NULL;
749 	struct net_conf *nc;
750 
751 	rcu_read_lock();
752 	nc = rcu_dereference(connection->net_conf);
753 	if (!nc) {
754 		rcu_read_unlock();
755 		return NULL;
756 	}
757 	connect_int = nc->connect_int;
758 	rcu_read_unlock();
759 
760 	timeo = connect_int * HZ;
761 	/* 28.5% random jitter */
762 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
763 
764 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
765 	if (err <= 0)
766 		return NULL;
767 
768 	err = kernel_accept(ad->s_listen, &s_estab, 0);
769 	if (err < 0) {
770 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
771 			drbd_err(connection, "accept failed, err = %d\n", err);
772 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
773 		}
774 	}
775 
776 	if (s_estab)
777 		unregister_state_change(s_estab->sk, ad);
778 
779 	return s_estab;
780 }
781 
/* Forward declaration; used by receive_first_packet() below. */
782 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
783 
784 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
785 			     enum drbd_packet cmd)
786 {
787 	if (!conn_prepare_command(connection, sock))
788 		return -EIO;
789 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
790 }
791 
792 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
793 {
794 	unsigned int header_size = drbd_header_size(connection);
795 	struct packet_info pi;
796 	struct net_conf *nc;
797 	int err;
798 
799 	rcu_read_lock();
800 	nc = rcu_dereference(connection->net_conf);
801 	if (!nc) {
802 		rcu_read_unlock();
803 		return -EIO;
804 	}
805 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
806 	rcu_read_unlock();
807 
808 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
809 	if (err != header_size) {
810 		if (err >= 0)
811 			err = -EIO;
812 		return err;
813 	}
814 	err = decode_header(connection, connection->data.rbuf, &pi);
815 	if (err)
816 		return err;
817 	return pi.cmd;
818 }
819 
820 /**
821  * drbd_socket_okay() - Free the socket if its connection is not okay
822  * @sock:	pointer to the pointer to the socket.
823  */
824 static bool drbd_socket_okay(struct socket **sock)
825 {
826 	int rr;
827 	char tb[4];
828 
829 	if (!*sock)
830 		return false;
831 
832 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
833 
834 	if (rr > 0 || rr == -EAGAIN) {
835 		return true;
836 	} else {
837 		sock_release(*sock);
838 		*sock = NULL;
839 		return false;
840 	}
841 }
842 
843 static bool connection_established(struct drbd_connection *connection,
844 				   struct socket **sock1,
845 				   struct socket **sock2)
846 {
847 	struct net_conf *nc;
848 	int timeout;
849 	bool ok;
850 
851 	if (!*sock1 || !*sock2)
852 		return false;
853 
854 	rcu_read_lock();
855 	nc = rcu_dereference(connection->net_conf);
856 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
857 	rcu_read_unlock();
858 	schedule_timeout_interruptible(timeout);
859 
860 	ok = drbd_socket_okay(sock1);
861 	ok = drbd_socket_okay(sock2) && ok;
862 
863 	return ok;
864 }
865 
866 /* Gets called if a connection is established, or if a new minor gets created
867    in a connection */
868 int drbd_connected(struct drbd_peer_device *peer_device)
869 {
870 	struct drbd_device *device = peer_device->device;
871 	int err;
872 
873 	atomic_set(&device->packet_seq, 0);
874 	device->peer_seq = 0;
875 
876 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
877 		&peer_device->connection->cstate_mutex :
878 		&device->own_state_mutex;
879 
880 	err = drbd_send_sync_param(peer_device);
881 	if (!err)
882 		err = drbd_send_sizes(peer_device, 0, 0);
883 	if (!err)
884 		err = drbd_send_uuids(peer_device);
885 	if (!err)
886 		err = drbd_send_current_state(peer_device);
887 	clear_bit(USE_DEGR_WFC_T, &device->flags);
888 	clear_bit(RESIZE_PENDING, &device->flags);
889 	atomic_set(&device->ap_in_flight, 0);
890 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
891 	return err;
892 }
893 
894 /*
895  * return values:
896  *   1 yes, we have a valid connection
897  *   0 oops, did not work out, please try again
898  *  -1 peer talks different language,
899  *     no point in trying again, please go standalone.
900  *  -2 We do not have a network config...
901  */
902 static int conn_connect(struct drbd_connection *connection)
903 {
904 	struct drbd_socket sock, msock;
905 	struct drbd_peer_device *peer_device;
906 	struct net_conf *nc;
907 	int vnr, timeout, h;
908 	bool discard_my_data, ok;
909 	enum drbd_state_rv rv;
910 	struct accept_wait_data ad = {
911 		.connection = connection,
912 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
913 	};
914 
915 	clear_bit(DISCONNECT_SENT, &connection->flags);
916 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
917 		return -2;
918 
919 	mutex_init(&sock.mutex);
920 	sock.sbuf = connection->data.sbuf;
921 	sock.rbuf = connection->data.rbuf;
922 	sock.socket = NULL;
923 	mutex_init(&msock.mutex);
924 	msock.sbuf = connection->meta.sbuf;
925 	msock.rbuf = connection->meta.rbuf;
926 	msock.socket = NULL;
927 
928 	/* Assume that the peer only understands protocol 80 until we know better.  */
929 	connection->agreed_pro_version = 80;
930 
931 	if (prepare_listen_socket(connection, &ad))
932 		return 0;
933 
934 	do {
935 		struct socket *s;
936 
937 		s = drbd_try_connect(connection);
938 		if (s) {
939 			if (!sock.socket) {
940 				sock.socket = s;
941 				send_first_packet(connection, &sock, P_INITIAL_DATA);
942 			} else if (!msock.socket) {
943 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
944 				msock.socket = s;
945 				send_first_packet(connection, &msock, P_INITIAL_META);
946 			} else {
947 				drbd_err(connection, "Logic error in conn_connect()\n");
948 				goto out_release_sockets;
949 			}
950 		}
951 
952 		if (connection_established(connection, &sock.socket, &msock.socket))
953 			break;
954 
955 retry:
956 		s = drbd_wait_for_connect(connection, &ad);
957 		if (s) {
958 			int fp = receive_first_packet(connection, s);
959 			drbd_socket_okay(&sock.socket);
960 			drbd_socket_okay(&msock.socket);
961 			switch (fp) {
962 			case P_INITIAL_DATA:
963 				if (sock.socket) {
964 					drbd_warn(connection, "initial packet S crossed\n");
965 					sock_release(sock.socket);
966 					sock.socket = s;
967 					goto randomize;
968 				}
969 				sock.socket = s;
970 				break;
971 			case P_INITIAL_META:
972 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
973 				if (msock.socket) {
974 					drbd_warn(connection, "initial packet M crossed\n");
975 					sock_release(msock.socket);
976 					msock.socket = s;
977 					goto randomize;
978 				}
979 				msock.socket = s;
980 				break;
981 			default:
982 				drbd_warn(connection, "Error receiving initial packet\n");
983 				sock_release(s);
984 randomize:
985 				if (prandom_u32() & 1)
986 					goto retry;
987 			}
988 		}
989 
990 		if (connection->cstate <= C_DISCONNECTING)
991 			goto out_release_sockets;
992 		if (signal_pending(current)) {
993 			flush_signals(current);
994 			smp_rmb();
995 			if (get_t_state(&connection->receiver) == EXITING)
996 				goto out_release_sockets;
997 		}
998 
999 		ok = connection_established(connection, &sock.socket, &msock.socket);
1000 	} while (!ok);
1001 
1002 	if (ad.s_listen)
1003 		sock_release(ad.s_listen);
1004 
1005 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1006 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1007 
1008 	sock.socket->sk->sk_allocation = GFP_NOIO;
1009 	msock.socket->sk->sk_allocation = GFP_NOIO;
1010 
1011 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1012 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1013 
1014 	/* NOT YET ...
1015 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1016 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1017 	 * first set it to the P_CONNECTION_FEATURES timeout,
1018 	 * which we set to 4x the configured ping_timeout. */
1019 	rcu_read_lock();
1020 	nc = rcu_dereference(connection->net_conf);
1021 
1022 	sock.socket->sk->sk_sndtimeo =
1023 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1024 
1025 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1026 	timeout = nc->timeout * HZ / 10;
1027 	discard_my_data = nc->discard_my_data;
1028 	rcu_read_unlock();
1029 
1030 	msock.socket->sk->sk_sndtimeo = timeout;
1031 
1032 	/* we don't want delays.
1033 	 * we use TCP_CORK where appropriate, though */
1034 	drbd_tcp_nodelay(sock.socket);
1035 	drbd_tcp_nodelay(msock.socket);
1036 
1037 	connection->data.socket = sock.socket;
1038 	connection->meta.socket = msock.socket;
1039 	connection->last_received = jiffies;
1040 
1041 	h = drbd_do_features(connection);
1042 	if (h <= 0)
1043 		return h;
1044 
1045 	if (connection->cram_hmac_tfm) {
1046 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1047 		switch (drbd_do_auth(connection)) {
1048 		case -1:
1049 			drbd_err(connection, "Authentication of peer failed\n");
1050 			return -1;
1051 		case 0:
1052 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1053 			return 0;
1054 		}
1055 	}
1056 
1057 	connection->data.socket->sk->sk_sndtimeo = timeout;
1058 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1059 
1060 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1061 		return -1;
1062 
1063 	/* Prevent a race between resync-handshake and
1064 	 * being promoted to Primary.
1065 	 *
1066 	 * Grab and release the state mutex, so we know that any current
1067 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1068 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1069 	 */
1070 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1071 		mutex_lock(peer_device->device->state_mutex);
1072 
1073 	set_bit(STATE_SENT, &connection->flags);
1074 
1075 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1076 		mutex_unlock(peer_device->device->state_mutex);
1077 
1078 	rcu_read_lock();
1079 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1080 		struct drbd_device *device = peer_device->device;
1081 		kref_get(&device->kref);
1082 		rcu_read_unlock();
1083 
1084 		if (discard_my_data)
1085 			set_bit(DISCARD_MY_DATA, &device->flags);
1086 		else
1087 			clear_bit(DISCARD_MY_DATA, &device->flags);
1088 
1089 		drbd_connected(peer_device);
1090 		kref_put(&device->kref, drbd_destroy_device);
1091 		rcu_read_lock();
1092 	}
1093 	rcu_read_unlock();
1094 
1095 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1096 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1097 		clear_bit(STATE_SENT, &connection->flags);
1098 		return 0;
1099 	}
1100 
1101 	drbd_thread_start(&connection->asender);
1102 
1103 	mutex_lock(&connection->resource->conf_update);
1104 	/* The discard_my_data flag is a single-shot modifier to the next
1105 	 * connection attempt, the handshake of which is now well underway.
1106 	 * No need for rcu style copying of the whole struct
1107 	 * just to clear a single value. */
1108 	connection->net_conf->discard_my_data = 0;
1109 	mutex_unlock(&connection->resource->conf_update);
1110 
1111 	return h;
1112 
1113 out_release_sockets:
1114 	if (ad.s_listen)
1115 		sock_release(ad.s_listen);
1116 	if (sock.socket)
1117 		sock_release(sock.socket);
1118 	if (msock.socket)
1119 		sock_release(msock.socket);
1120 	return -1;
1121 }
1122 
1123 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1124 {
1125 	unsigned int header_size = drbd_header_size(connection);
1126 
1127 	if (header_size == sizeof(struct p_header100) &&
1128 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1129 		struct p_header100 *h = header;
1130 		if (h->pad != 0) {
1131 			drbd_err(connection, "Header padding is not zero\n");
1132 			return -EINVAL;
1133 		}
1134 		pi->vnr = be16_to_cpu(h->volume);
1135 		pi->cmd = be16_to_cpu(h->command);
1136 		pi->size = be32_to_cpu(h->length);
1137 	} else if (header_size == sizeof(struct p_header95) &&
1138 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1139 		struct p_header95 *h = header;
1140 		pi->cmd = be16_to_cpu(h->command);
1141 		pi->size = be32_to_cpu(h->length);
1142 		pi->vnr = 0;
1143 	} else if (header_size == sizeof(struct p_header80) &&
1144 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1145 		struct p_header80 *h = header;
1146 		pi->cmd = be16_to_cpu(h->command);
1147 		pi->size = be16_to_cpu(h->length);
1148 		pi->vnr = 0;
1149 	} else {
1150 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1151 			 be32_to_cpu(*(__be32 *)header),
1152 			 connection->agreed_pro_version);
1153 		return -EINVAL;
1154 	}
1155 	pi->data = header + header_size;
1156 	return 0;
1157 }
1158 
1159 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1160 {
1161 	void *buffer = connection->data.rbuf;
1162 	int err;
1163 
1164 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1165 	if (err)
1166 		return err;
1167 
1168 	err = decode_header(connection, buffer, pi);
1169 	connection->last_received = jiffies;
1170 
1171 	return err;
1172 }
1173 
1174 static void drbd_flush(struct drbd_connection *connection)
1175 {
1176 	int rv;
1177 	struct drbd_peer_device *peer_device;
1178 	int vnr;
1179 
1180 	if (connection->resource->write_ordering >= WO_bdev_flush) {
1181 		rcu_read_lock();
1182 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1183 			struct drbd_device *device = peer_device->device;
1184 
1185 			if (!get_ldev(device))
1186 				continue;
1187 			kref_get(&device->kref);
1188 			rcu_read_unlock();
1189 
1190 			/* Right now, we have only this one synchronous code path
1191 			 * for flushes between request epochs.
1192 			 * We may want to make those asynchronous,
1193 			 * or at least parallelize the flushes to the volume devices.
1194 			 */
1195 			device->flush_jif = jiffies;
1196 			set_bit(FLUSH_PENDING, &device->flags);
1197 			rv = blkdev_issue_flush(device->ldev->backing_bdev,
1198 					GFP_NOIO, NULL);
1199 			clear_bit(FLUSH_PENDING, &device->flags);
1200 			if (rv) {
1201 				drbd_info(device, "local disk flush failed with status %d\n", rv);
1202 				/* would rather check on EOPNOTSUPP, but that is not reliable.
1203 				 * don't try again for ANY return value != 0
1204 				 * if (rv == -EOPNOTSUPP) */
1205 				drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
1206 			}
1207 			put_ldev(device);
1208 			kref_put(&device->kref, drbd_destroy_device);
1209 
1210 			rcu_read_lock();
1211 			if (rv)
1212 				break;
1213 		}
1214 		rcu_read_unlock();
1215 	}
1216 }
1217 
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
 * @epoch:	Epoch object.
 * @ev:		Epoch event, optionally or-ed with EV_CLEANUP.
 *
 * An epoch is finished once it has a non-zero size, no active requests, and
 * either its barrier number is known or EV_CLEANUP is set.  Unless in cleanup
 * mode, a barrier ack is sent for it (with epoch_lock dropped meanwhile).
 * A finished epoch that is not the current one is freed, and the next epoch
 * in the list is then evaluated with EV_BECAME_LAST; the current epoch is
 * reset for reuse instead.
 *
 * Returns FE_STILL_LIVE if nothing was finished, otherwise FE_DESTROYED or
 * FE_RECYCLED according to what happened to the first epoch that finished.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		/* Apply the event itself; EV_CLEANUP is only a modifier. */
		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				/* Sending is done without holding the epoch_lock. */
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				/* Not the current epoch: unlink and free it, then
				 * continue with its successor in the list. */
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				/* The current epoch is reset in place for reuse. */
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}
1293 
1294 static enum write_ordering_e
1295 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1296 {
1297 	struct disk_conf *dc;
1298 
1299 	dc = rcu_dereference(bdev->disk_conf);
1300 
1301 	if (wo == WO_bdev_flush && !dc->disk_flushes)
1302 		wo = WO_drain_io;
1303 	if (wo == WO_drain_io && !dc->disk_drain)
1304 		wo = WO_none;
1305 
1306 	return wo;
1307 }
1308 
1309 /**
1310  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1311  * @connection:	DRBD connection.
1312  * @wo:		Write ordering method to try.
1313  */
1314 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1315 			      enum write_ordering_e wo)
1316 {
1317 	struct drbd_device *device;
1318 	enum write_ordering_e pwo;
1319 	int vnr;
1320 	static char *write_ordering_str[] = {
1321 		[WO_none] = "none",
1322 		[WO_drain_io] = "drain",
1323 		[WO_bdev_flush] = "flush",
1324 	};
1325 
1326 	pwo = resource->write_ordering;
1327 	if (wo != WO_bdev_flush)
1328 		wo = min(pwo, wo);
1329 	rcu_read_lock();
1330 	idr_for_each_entry(&resource->devices, device, vnr) {
1331 		if (get_ldev(device)) {
1332 			wo = max_allowed_wo(device->ldev, wo);
1333 			if (device->ldev == bdev)
1334 				bdev = NULL;
1335 			put_ldev(device);
1336 		}
1337 	}
1338 
1339 	if (bdev)
1340 		wo = max_allowed_wo(bdev, wo);
1341 
1342 	rcu_read_unlock();
1343 
1344 	resource->write_ordering = wo;
1345 	if (pwo != resource->write_ordering || wo == WO_bdev_flush)
1346 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1347 }
1348 
/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 * @fault_type:	passed on to drbd_generic_make_request() for each bio
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * A peer request flagged EE_IS_TRIM_USE_ZEROOUT is not submitted as bios at
 * all: it is zeroed out synchronously via blkdev_issue_zeroout() and
 * completed right here.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned data_size = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
		/* wait for all pending IO completions, before we start
		 * zeroing things out. */
		conn_wait_active_ee_empty(first_peer_device(device)->connection);
		/* add it to the active list now,
		 * so we can find it to present it in debugfs */
		peer_req->submit_jif = jiffies;
		peer_req->flags |= EE_SUBMITTED;
		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->active_ee);
		spin_unlock_irq(&device->resource->req_lock);
		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
			sector, data_size >> 9, GFP_NOIO, false))
			peer_req->flags |= EE_WAS_ERROR;
		drbd_endio_write_sec_final(peer_req);
		return 0;
	}

	/* Discards don't have any payload.
	 * But the scsi layer still expects a bio_vec it can use internally,
	 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
	if (peer_req->flags & EE_IS_TRIM)
		nr_pages = 1;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	/* Chain the new bio in front of the ones allocated so far. */
	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	if (rw & REQ_DISCARD) {
		/* No pages are added; only the size is recorded. */
		bio->bi_iter.bi_size = data_size;
		goto submit;
	}

	/* Continues at the current page when re-entered via next_bio. */
	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				drbd_err(device,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (uint64_t)bio->bi_iter.bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			/* This bio is full: start another one for the rest. */
			goto next_bio;
		}
		data_size -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(device, data_size == 0);
submit:
	D_ASSERT(device, page == NULL);

	/* Set the completion count before the first bio is submitted. */
	atomic_set(&peer_req->pending_bios, n_bios);
	/* for debugfs: update timestamp, mark as submitted */
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_SUBMITTED;
	do {
		/* Detach each bio from the private chain before handing it on. */
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	/* Nothing was submitted yet: drop every bio allocated so far. */
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
1478 
1479 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1480 					     struct drbd_peer_request *peer_req)
1481 {
1482 	struct drbd_interval *i = &peer_req->i;
1483 
1484 	drbd_remove_interval(&device->write_requests, i);
1485 	drbd_clear_interval(i);
1486 
1487 	/* Wake up any processes waiting for this peer request to complete.  */
1488 	if (i->waiting)
1489 		wake_up(&device->misc_wait);
1490 }
1491 
1492 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1493 {
1494 	struct drbd_peer_device *peer_device;
1495 	int vnr;
1496 
1497 	rcu_read_lock();
1498 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1499 		struct drbd_device *device = peer_device->device;
1500 
1501 		kref_get(&device->kref);
1502 		rcu_read_unlock();
1503 		drbd_wait_ee_list_empty(device, &device->active_ee);
1504 		kref_put(&device->kref, drbd_destroy_device);
1505 		rcu_read_lock();
1506 	}
1507 	rcu_read_unlock();
1508 }
1509 
1510 static struct drbd_peer_device *
1511 conn_peer_device(struct drbd_connection *connection, int volume_number)
1512 {
1513 	return idr_find(&connection->peer_devices, volume_number);
1514 }
1515 
/*
 * receive_Barrier() - handle a barrier packet from the peer.
 *
 * Records the barrier number on the current epoch and lets
 * drbd_may_finish_epoch() process EV_GOT_BARRIER_NR.  Depending on the
 * resource's write ordering method, it then either just tries to allocate a
 * new epoch (WO_none), or first waits for all active peer requests and
 * flushes (WO_bdev_flush, WO_drain_io; also used by WO_none if the allocation
 * failed).  A freshly allocated epoch becomes the current epoch only if the
 * current one still has a non-zero size.
 *
 * Returns 0, or -EIO if the write ordering method has an unexpected value.
 */
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->resource->write_ordering) {
	case WO_none:
		/* The current epoch was already reset for reuse. */
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		/* A new epoch is only needed if the current one is in use. */
		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
			 connection->resource->write_ordering);
		return -EIO;
	}

	/* Only reached with a successfully allocated epoch. */
	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}
1583 
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data
 *
 * Allocates a peer request for the block described by @pi at @sector and,
 * unless the packet is a P_TRIM, receives its payload into the request's
 * page chain.  If a peer integrity tfm is configured (and this is not a
 * trim), the digest preceding the payload is received and verified.
 *
 * Returns the peer request (flagged EE_WRITE), or NULL if receiving failed,
 * the size is not 512-byte aligned or (for non-trim) exceeds
 * DRBD_MAX_BIO_SIZE, the request reaches beyond the device capacity, the
 * allocation failed, or the digest does not match. */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int digest_size, err;
	unsigned int data_size = pi->size, ds;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;

	digest_size = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return NULL;
		/* The digest is counted in pi->size; the rest is payload. */
		data_size -= digest_size;
	}

	if (trim) {
		/* A trim carries no payload; its size comes from the packet. */
		D_ASSERT(peer_device, data_size == 0);
		data_size = be32_to_cpu(trim->size);
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	/* prepare for larger trim requests. */
	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
	if (!peer_req)
		return NULL;

	peer_req->flags |= EE_WRITE;
	if (trim)
		return peer_req;

	/* Receive the payload page by page into the request's page chain. */
	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (digest_size) {
		/* Compare the received digest with one computed locally. */
		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size >> 9;
	return peer_req;
}
1676 
1677 /* drbd_drain_block() just takes a data block
1678  * out of the socket input buffer, and discards it.
1679  */
1680 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1681 {
1682 	struct page *page;
1683 	int err = 0;
1684 	void *data;
1685 
1686 	if (!data_size)
1687 		return 0;
1688 
1689 	page = drbd_alloc_pages(peer_device, 1, 1);
1690 
1691 	data = kmap(page);
1692 	while (data_size) {
1693 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1694 
1695 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1696 		if (err)
1697 			break;
1698 		data_size -= len;
1699 	}
1700 	kunmap(page);
1701 	drbd_free_pages(peer_device->device, page, 0);
1702 	return err;
1703 }
1704 
1705 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1706 			   sector_t sector, int data_size)
1707 {
1708 	struct bio_vec bvec;
1709 	struct bvec_iter iter;
1710 	struct bio *bio;
1711 	int digest_size, err, expect;
1712 	void *dig_in = peer_device->connection->int_dig_in;
1713 	void *dig_vv = peer_device->connection->int_dig_vv;
1714 
1715 	digest_size = 0;
1716 	if (peer_device->connection->peer_integrity_tfm) {
1717 		digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1718 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1719 		if (err)
1720 			return err;
1721 		data_size -= digest_size;
1722 	}
1723 
1724 	/* optimistically update recv_cnt.  if receiving fails below,
1725 	 * we disconnect anyways, and counters will be reset. */
1726 	peer_device->device->recv_cnt += data_size>>9;
1727 
1728 	bio = req->master_bio;
1729 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1730 
1731 	bio_for_each_segment(bvec, bio, iter) {
1732 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1733 		expect = min_t(int, data_size, bvec.bv_len);
1734 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1735 		kunmap(bvec.bv_page);
1736 		if (err)
1737 			return err;
1738 		data_size -= expect;
1739 	}
1740 
1741 	if (digest_size) {
1742 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1743 		if (memcmp(dig_in, dig_vv, digest_size)) {
1744 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1745 			return -EINVAL;
1746 		}
1747 	}
1748 
1749 	D_ASSERT(peer_device->device, data_size == 0);
1750 	return 0;
1751 }
1752 
1753 /*
1754  * e_end_resync_block() is called in asender context via
1755  * drbd_finish_peer_reqs().
1756  */
1757 static int e_end_resync_block(struct drbd_work *w, int unused)
1758 {
1759 	struct drbd_peer_request *peer_req =
1760 		container_of(w, struct drbd_peer_request, w);
1761 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1762 	struct drbd_device *device = peer_device->device;
1763 	sector_t sector = peer_req->i.sector;
1764 	int err;
1765 
1766 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1767 
1768 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1769 		drbd_set_in_sync(device, sector, peer_req->i.size);
1770 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1771 	} else {
1772 		/* Record failure to sync */
1773 		drbd_rs_failed_io(device, sector, peer_req->i.size);
1774 
1775 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1776 	}
1777 	dec_unacked(device);
1778 
1779 	return err;
1780 }
1781 
1782 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1783 			    struct packet_info *pi) __releases(local)
1784 {
1785 	struct drbd_device *device = peer_device->device;
1786 	struct drbd_peer_request *peer_req;
1787 
1788 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1789 	if (!peer_req)
1790 		goto fail;
1791 
1792 	dec_rs_pending(device);
1793 
1794 	inc_unacked(device);
1795 	/* corresponding dec_unacked() in e_end_resync_block()
1796 	 * respective _drbd_clear_done_ee */
1797 
1798 	peer_req->w.cb = e_end_resync_block;
1799 	peer_req->submit_jif = jiffies;
1800 
1801 	spin_lock_irq(&device->resource->req_lock);
1802 	list_add_tail(&peer_req->w.list, &device->sync_ee);
1803 	spin_unlock_irq(&device->resource->req_lock);
1804 
1805 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1806 	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1807 		return 0;
1808 
1809 	/* don't care for the reason here */
1810 	drbd_err(device, "submit failed, triggering re-connect\n");
1811 	spin_lock_irq(&device->resource->req_lock);
1812 	list_del(&peer_req->w.list);
1813 	spin_unlock_irq(&device->resource->req_lock);
1814 
1815 	drbd_free_peer_req(device, peer_req);
1816 fail:
1817 	put_ldev(device);
1818 	return -EIO;
1819 }
1820 
1821 static struct drbd_request *
1822 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1823 	     sector_t sector, bool missing_ok, const char *func)
1824 {
1825 	struct drbd_request *req;
1826 
1827 	/* Request object according to our peer */
1828 	req = (struct drbd_request *)(unsigned long)id;
1829 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1830 		return req;
1831 	if (!missing_ok) {
1832 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1833 			(unsigned long)id, (unsigned long long)sector);
1834 	}
1835 	return NULL;
1836 }
1837 
1838 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1839 {
1840 	struct drbd_peer_device *peer_device;
1841 	struct drbd_device *device;
1842 	struct drbd_request *req;
1843 	sector_t sector;
1844 	int err;
1845 	struct p_data *p = pi->data;
1846 
1847 	peer_device = conn_peer_device(connection, pi->vnr);
1848 	if (!peer_device)
1849 		return -EIO;
1850 	device = peer_device->device;
1851 
1852 	sector = be64_to_cpu(p->sector);
1853 
1854 	spin_lock_irq(&device->resource->req_lock);
1855 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1856 	spin_unlock_irq(&device->resource->req_lock);
1857 	if (unlikely(!req))
1858 		return -EIO;
1859 
1860 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1861 	 * special casing it there for the various failure cases.
1862 	 * still no race with drbd_fail_pending_reads */
1863 	err = recv_dless_read(peer_device, req, sector, pi->size);
1864 	if (!err)
1865 		req_mod(req, DATA_RECEIVED);
1866 	/* else: nothing. handled from drbd_disconnect...
1867 	 * I don't think we may complete this just yet
1868 	 * in case we are "on-disconnect: freeze" */
1869 
1870 	return err;
1871 }
1872 
1873 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1874 {
1875 	struct drbd_peer_device *peer_device;
1876 	struct drbd_device *device;
1877 	sector_t sector;
1878 	int err;
1879 	struct p_data *p = pi->data;
1880 
1881 	peer_device = conn_peer_device(connection, pi->vnr);
1882 	if (!peer_device)
1883 		return -EIO;
1884 	device = peer_device->device;
1885 
1886 	sector = be64_to_cpu(p->sector);
1887 	D_ASSERT(device, p->block_id == ID_SYNCER);
1888 
1889 	if (get_ldev(device)) {
1890 		/* data is submitted to disk within recv_resync_read.
1891 		 * corresponding put_ldev done below on error,
1892 		 * or in drbd_peer_request_endio. */
1893 		err = recv_resync_read(peer_device, sector, pi);
1894 	} else {
1895 		if (__ratelimit(&drbd_ratelimit_state))
1896 			drbd_err(device, "Can not write resync data to local disk.\n");
1897 
1898 		err = drbd_drain_block(peer_device, pi->size);
1899 
1900 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1901 	}
1902 
1903 	atomic_add(pi->size >> 9, &device->rs_sect_in);
1904 
1905 	return err;
1906 }
1907 
1908 static void restart_conflicting_writes(struct drbd_device *device,
1909 				       sector_t sector, int size)
1910 {
1911 	struct drbd_interval *i;
1912 	struct drbd_request *req;
1913 
1914 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1915 		if (!i->local)
1916 			continue;
1917 		req = container_of(i, struct drbd_request, i);
1918 		if (req->rq_state & RQ_LOCAL_PENDING ||
1919 		    !(req->rq_state & RQ_POSTPONED))
1920 			continue;
1921 		/* as it is RQ_POSTPONED, this will cause it to
1922 		 * be queued on the retry workqueue. */
1923 		__req_mod(req, CONFLICT_RESOLVED, NULL);
1924 	}
1925 }
1926 
1927 /*
1928  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1929  */
1930 static int e_end_block(struct drbd_work *w, int cancel)
1931 {
1932 	struct drbd_peer_request *peer_req =
1933 		container_of(w, struct drbd_peer_request, w);
1934 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1935 	struct drbd_device *device = peer_device->device;
1936 	sector_t sector = peer_req->i.sector;
1937 	int err = 0, pcmd;
1938 
1939 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1940 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1941 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1942 				device->state.conn <= C_PAUSED_SYNC_T &&
1943 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1944 				P_RS_WRITE_ACK : P_WRITE_ACK;
1945 			err = drbd_send_ack(peer_device, pcmd, peer_req);
1946 			if (pcmd == P_RS_WRITE_ACK)
1947 				drbd_set_in_sync(device, sector, peer_req->i.size);
1948 		} else {
1949 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1950 			/* we expect it to be marked out of sync anyways...
1951 			 * maybe assert this?  */
1952 		}
1953 		dec_unacked(device);
1954 	}
1955 
1956 	/* we delete from the conflict detection hash _after_ we sent out the
1957 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1958 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1959 		spin_lock_irq(&device->resource->req_lock);
1960 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1961 		drbd_remove_epoch_entry_interval(device, peer_req);
1962 		if (peer_req->flags & EE_RESTART_REQUESTS)
1963 			restart_conflicting_writes(device, sector, peer_req->i.size);
1964 		spin_unlock_irq(&device->resource->req_lock);
1965 	} else
1966 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1967 
1968 	drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1969 
1970 	return err;
1971 }
1972 
1973 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1974 {
1975 	struct drbd_peer_request *peer_req =
1976 		container_of(w, struct drbd_peer_request, w);
1977 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1978 	int err;
1979 
1980 	err = drbd_send_ack(peer_device, ack, peer_req);
1981 	dec_unacked(peer_device->device);
1982 
1983 	return err;
1984 }
1985 
1986 static int e_send_superseded(struct drbd_work *w, int unused)
1987 {
1988 	return e_send_ack(w, P_SUPERSEDED);
1989 }
1990 
1991 static int e_send_retry_write(struct drbd_work *w, int unused)
1992 {
1993 	struct drbd_peer_request *peer_req =
1994 		container_of(w, struct drbd_peer_request, w);
1995 	struct drbd_connection *connection = peer_req->peer_device->connection;
1996 
1997 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1998 			     P_RETRY_WRITE : P_SUPERSEDED);
1999 }
2000 
2001 static bool seq_greater(u32 a, u32 b)
2002 {
2003 	/*
2004 	 * We assume 32-bit wrap-around here.
2005 	 * For 24-bit wrap-around, we would have to shift:
2006 	 *  a <<= 8; b <<= 8;
2007 	 */
2008 	return (s32)a - (s32)b > 0;
2009 }
2010 
2011 static u32 seq_max(u32 a, u32 b)
2012 {
2013 	return seq_greater(a, b) ? a : b;
2014 }
2015 
2016 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2017 {
2018 	struct drbd_device *device = peer_device->device;
2019 	unsigned int newest_peer_seq;
2020 
2021 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2022 		spin_lock(&device->peer_seq_lock);
2023 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2024 		device->peer_seq = newest_peer_seq;
2025 		spin_unlock(&device->peer_seq_lock);
2026 		/* wake up only if we actually changed device->peer_seq */
2027 		if (peer_seq == newest_peer_seq)
2028 			wake_up(&device->seq_wait);
2029 	}
2030 }
2031 
2032 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2033 {
2034 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2035 }
2036 
2037 /* maybe change sync_ee into interval trees as well? */
2038 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2039 {
2040 	struct drbd_peer_request *rs_req;
2041 	bool rv = 0;
2042 
2043 	spin_lock_irq(&device->resource->req_lock);
2044 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2045 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2046 			     rs_req->i.sector, rs_req->i.size)) {
2047 			rv = 1;
2048 			break;
2049 		}
2050 	}
2051 	spin_unlock_irq(&device->resource->req_lock);
2052 
2053 	return rv;
2054 }
2055 
2056 /* Called from receive_Data.
2057  * Synchronize packets on sock with packets on msock.
2058  *
2059  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2060  * packet traveling on msock, they are still processed in the order they have
2061  * been sent.
2062  *
2063  * Note: we don't care for Ack packets overtaking P_DATA packets.
2064  *
2065  * In case packet_seq is larger than device->peer_seq number, there are
2066  * outstanding packets on the msock. We wait for them to arrive.
2067  * In case we are the logically next packet, we update device->peer_seq
2068  * ourselves. Correctly handles 32bit wrap around.
2069  *
2070  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2071  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2072  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2073  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2074  *
2075  * returns 0 if we may process the packet,
2076  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2077 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2078 {
2079 	struct drbd_device *device = peer_device->device;
2080 	DEFINE_WAIT(wait);
2081 	long timeout;
2082 	int ret = 0, tp;
2083 
2084 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2085 		return 0;
2086 
2087 	spin_lock(&device->peer_seq_lock);
2088 	for (;;) {
2089 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2090 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2091 			break;
2092 		}
2093 
2094 		if (signal_pending(current)) {
2095 			ret = -ERESTARTSYS;
2096 			break;
2097 		}
2098 
2099 		rcu_read_lock();
2100 		tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2101 		rcu_read_unlock();
2102 
2103 		if (!tp)
2104 			break;
2105 
2106 		/* Only need to wait if two_primaries is enabled */
2107 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2108 		spin_unlock(&device->peer_seq_lock);
2109 		rcu_read_lock();
2110 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2111 		rcu_read_unlock();
2112 		timeout = schedule_timeout(timeout);
2113 		spin_lock(&device->peer_seq_lock);
2114 		if (!timeout) {
2115 			ret = -ETIMEDOUT;
2116 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2117 			break;
2118 		}
2119 	}
2120 	spin_unlock(&device->peer_seq_lock);
2121 	finish_wait(&device->seq_wait, &wait);
2122 	return ret;
2123 }
2124 
2125 /* see also bio_flags_to_wire()
2126  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2127  * flags and back. We may replicate to other kernel versions. */
2128 static unsigned long wire_flags_to_bio(u32 dpf)
2129 {
2130 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2131 		(dpf & DP_FUA ? REQ_FUA : 0) |
2132 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2133 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2134 }
2135 
/*
 * Negatively acknowledge postponed local write requests overlapping the
 * range given by @sector and @size (both are handed to
 * drbd_for_each_overlap() unchanged).
 *
 * Every local request in device->write_requests that overlaps the range and
 * has RQ_POSTPONED set gets that flag cleared and is passed to
 * __req_mod(req, NEG_ACKED, &m).  If that produces a master bio, it is
 * completed while the req_lock is dropped.
 *
 * Must be entered with device->resource->req_lock held via spin_lock_irq():
 * the lock is released and re-acquired around complete_master_bio(), and is
 * held again on return.
 */
2136 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2137 				    unsigned int size)
2138 {
2139 	struct drbd_interval *i;
2140 
2141     repeat:
2142 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2143 		struct drbd_request *req;
2144 		struct bio_and_error m;
2145 
		/* only intervals flagged as local are embedded in a drbd_request */
2146 		if (!i->local)
2147 			continue;
2148 		req = container_of(i, struct drbd_request, i);
2149 		if (!(req->rq_state & RQ_POSTPONED))
2150 			continue;
2151 		req->rq_state &= ~RQ_POSTPONED;
2152 		__req_mod(req, NEG_ACKED, &m);
2153 		spin_unlock_irq(&device->resource->req_lock);
2154 		if (m.bio)
2155 			complete_master_bio(device, &m);
2156 		spin_lock_irq(&device->resource->req_lock);
		/* the lock was dropped; presumably the tree may have changed
		 * meanwhile, so start the scan over */
2157 		goto repeat;
2158 	}
2159 }
2160 
/*
 * handle_write_conflicts() - resolve overlaps between a peer write and
 * requests already in device->write_requests
 * @device: device the peer request is for
 * @peer_req: the newly received peer write
 *
 * Called from receive_Data() with device->resource->req_lock held.
 * The peer request's interval is inserted into device->write_requests first;
 * then all overlapping, not yet completed intervals are examined:
 *
 *  - overlapping peer (non-local) request: wait for it with drbd_wait_misc()
 *    and rescan;
 *  - overlapping local request, RESOLVE_CONFLICTS set on the connection:
 *    queue @peer_req on device->done_ee with e_send_superseded or
 *    e_send_retry_write as callback, wake the asender, and return -ENOENT;
 *  - overlapping local request, RESOLVE_CONFLICTS not set: wait for the
 *    local request if it is RQ_LOCAL_PENDING or not RQ_POSTPONED, then
 *    rescan; otherwise mark @peer_req with EE_RESTART_REQUESTS.
 *
 * Returns 0 if the peer request may be submitted, -ENOENT if it was queued
 * for a superseded/retry answer, or the error from drbd_wait_misc().
 * On any non-zero return the interval has been removed from the tree again.
 */
2161 static int handle_write_conflicts(struct drbd_device *device,
2162 				  struct drbd_peer_request *peer_req)
2163 {
2164 	struct drbd_connection *connection = peer_req->peer_device->connection;
2165 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2166 	sector_t sector = peer_req->i.sector;
2167 	const unsigned int size = peer_req->i.size;
2168 	struct drbd_interval *i;
2169 	bool equal;
2170 	int err;
2171 
2172 	/*
2173 	 * Inserting the peer request into the write_requests tree will prevent
2174 	 * new conflicting local requests from being added.
2175 	 */
2176 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2177 
2178     repeat:
2179 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		/* the scan also finds the interval we just inserted */
2180 		if (i == &peer_req->i)
2181 			continue;
2182 		if (i->completed)
2183 			continue;
2184 
2185 		if (!i->local) {
2186 			/*
2187 			 * Our peer has sent a conflicting remote request; this
2188 			 * should not happen in a two-node setup.  Wait for the
2189 			 * earlier peer request to complete.
2190 			 */
2191 			err = drbd_wait_misc(device, i);
2192 			if (err)
2193 				goto out;
2194 			goto repeat;
2195 		}
2196 
		/* exactly the same range: no "Concurrent writes" alert below */
2197 		equal = i->sector == sector && i->size == size;
2198 		if (resolve_conflicts) {
2199 			/*
2200 			 * If the peer request is fully contained within the
2201 			 * overlapping request, it can be considered overwritten
2202 			 * and thus superseded; otherwise, it will be retried
2203 			 * once all overlapping requests have completed.
2204 			 */
2205 			bool superseded = i->sector <= sector && i->sector +
2206 				       (i->size >> 9) >= sector + (size >> 9);
2207 
2208 			if (!equal)
2209 				drbd_alert(device, "Concurrent writes detected: "
2210 					       "local=%llus +%u, remote=%llus +%u, "
2211 					       "assuming %s came first\n",
2212 					  (unsigned long long)i->sector, i->size,
2213 					  (unsigned long long)sector, size,
2214 					  superseded ? "local" : "remote");
2215 
2216 			peer_req->w.cb = superseded ? e_send_superseded :
2217 						   e_send_retry_write;
2218 			list_add_tail(&peer_req->w.list, &device->done_ee);
2219 			wake_asender(connection);
2220 
			/* -ENOENT: the request is answered, not submitted */
2221 			err = -ENOENT;
2222 			goto out;
2223 		} else {
2224 			struct drbd_request *req =
2225 				container_of(i, struct drbd_request, i);
2226 
2227 			if (!equal)
2228 				drbd_alert(device, "Concurrent writes detected: "
2229 					       "local=%llus +%u, remote=%llus +%u\n",
2230 					  (unsigned long long)i->sector, i->size,
2231 					  (unsigned long long)sector, size);
2232 
2233 			if (req->rq_state & RQ_LOCAL_PENDING ||
2234 			    !(req->rq_state & RQ_POSTPONED)) {
2235 				/*
2236 				 * Wait for the node with the discard flag to
2237 				 * decide if this request has been superseded
2238 				 * or needs to be retried.
2239 				 * Requests that have been superseded will
2240 				 * disappear from the write_requests tree.
2241 				 *
2242 				 * In addition, wait for the conflicting
2243 				 * request to finish locally before submitting
2244 				 * the conflicting peer request.
2245 				 */
2246 				err = drbd_wait_misc(device, &req->i);
2247 				if (err) {
					/* waiting failed: request C_TIMEOUT and
					 * fail the postponed local requests */
2248 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2249 					fail_postponed_requests(device, sector, size);
2250 					goto out;
2251 				}
2252 				goto repeat;
2253 			}
2254 			/*
2255 			 * Remember to restart the conflicting requests after
2256 			 * the new peer request has completed.
2257 			 */
2258 			peer_req->flags |= EE_RESTART_REQUESTS;
2259 		}
2260 	}
2261 	err = 0;
2262 
2263     out:
	/* on failure, undo the insertion done at the top */
2264 	if (err)
2265 		drbd_remove_epoch_entry_interval(device, peer_req);
2266 	return err;
2267 }
2268 
2269 /* mirrored write */
2270 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2271 {
2272 	struct drbd_peer_device *peer_device;
2273 	struct drbd_device *device;
2274 	struct net_conf *nc;
2275 	sector_t sector;
2276 	struct drbd_peer_request *peer_req;
2277 	struct p_data *p = pi->data;
2278 	u32 peer_seq = be32_to_cpu(p->seq_num);
2279 	int rw = WRITE;
2280 	u32 dp_flags;
2281 	int err, tp;
2282 
2283 	peer_device = conn_peer_device(connection, pi->vnr);
2284 	if (!peer_device)
2285 		return -EIO;
2286 	device = peer_device->device;
2287 
2288 	if (!get_ldev(device)) {
2289 		int err2;
2290 
2291 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2292 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2293 		atomic_inc(&connection->current_epoch->epoch_size);
2294 		err2 = drbd_drain_block(peer_device, pi->size);
2295 		if (!err)
2296 			err = err2;
2297 		return err;
2298 	}
2299 
2300 	/*
2301 	 * Corresponding put_ldev done either below (on various errors), or in
2302 	 * drbd_peer_request_endio, if we successfully submit the data at the
2303 	 * end of this function.
2304 	 */
2305 
2306 	sector = be64_to_cpu(p->sector);
2307 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2308 	if (!peer_req) {
2309 		put_ldev(device);
2310 		return -EIO;
2311 	}
2312 
2313 	peer_req->w.cb = e_end_block;
2314 	peer_req->submit_jif = jiffies;
2315 	peer_req->flags |= EE_APPLICATION;
2316 
2317 	dp_flags = be32_to_cpu(p->dp_flags);
2318 	rw |= wire_flags_to_bio(dp_flags);
2319 	if (pi->cmd == P_TRIM) {
2320 		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2321 		peer_req->flags |= EE_IS_TRIM;
2322 		if (!blk_queue_discard(q))
2323 			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2324 		D_ASSERT(peer_device, peer_req->i.size > 0);
2325 		D_ASSERT(peer_device, rw & REQ_DISCARD);
2326 		D_ASSERT(peer_device, peer_req->pages == NULL);
2327 	} else if (peer_req->pages == NULL) {
2328 		D_ASSERT(device, peer_req->i.size == 0);
2329 		D_ASSERT(device, dp_flags & DP_FLUSH);
2330 	}
2331 
2332 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2333 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2334 
2335 	spin_lock(&connection->epoch_lock);
2336 	peer_req->epoch = connection->current_epoch;
2337 	atomic_inc(&peer_req->epoch->epoch_size);
2338 	atomic_inc(&peer_req->epoch->active);
2339 	spin_unlock(&connection->epoch_lock);
2340 
2341 	rcu_read_lock();
2342 	nc = rcu_dereference(peer_device->connection->net_conf);
2343 	tp = nc->two_primaries;
2344 	if (peer_device->connection->agreed_pro_version < 100) {
2345 		switch (nc->wire_protocol) {
2346 		case DRBD_PROT_C:
2347 			dp_flags |= DP_SEND_WRITE_ACK;
2348 			break;
2349 		case DRBD_PROT_B:
2350 			dp_flags |= DP_SEND_RECEIVE_ACK;
2351 			break;
2352 		}
2353 	}
2354 	rcu_read_unlock();
2355 
2356 	if (dp_flags & DP_SEND_WRITE_ACK) {
2357 		peer_req->flags |= EE_SEND_WRITE_ACK;
2358 		inc_unacked(device);
2359 		/* corresponding dec_unacked() in e_end_block()
2360 		 * respective _drbd_clear_done_ee */
2361 	}
2362 
2363 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2364 		/* I really don't like it that the receiver thread
2365 		 * sends on the msock, but anyways */
2366 		drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2367 	}
2368 
2369 	if (tp) {
2370 		/* two primaries implies protocol C */
2371 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2372 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2373 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2374 		if (err)
2375 			goto out_interrupted;
2376 		spin_lock_irq(&device->resource->req_lock);
2377 		err = handle_write_conflicts(device, peer_req);
2378 		if (err) {
2379 			spin_unlock_irq(&device->resource->req_lock);
2380 			if (err == -ENOENT) {
2381 				put_ldev(device);
2382 				return 0;
2383 			}
2384 			goto out_interrupted;
2385 		}
2386 	} else {
2387 		update_peer_seq(peer_device, peer_seq);
2388 		spin_lock_irq(&device->resource->req_lock);
2389 	}
2390 	/* if we use the zeroout fallback code, we process synchronously
2391 	 * and we wait for all pending requests, respectively wait for
2392 	 * active_ee to become empty in drbd_submit_peer_request();
2393 	 * better not add ourselves here. */
2394 	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2395 		list_add_tail(&peer_req->w.list, &device->active_ee);
2396 	spin_unlock_irq(&device->resource->req_lock);
2397 
2398 	if (device->state.conn == C_SYNC_TARGET)
2399 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2400 
2401 	if (device->state.pdsk < D_INCONSISTENT) {
2402 		/* In case we have the only disk of the cluster, */
2403 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2404 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2405 		drbd_al_begin_io(device, &peer_req->i);
2406 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2407 	}
2408 
2409 	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2410 	if (!err)
2411 		return 0;
2412 
2413 	/* don't care for the reason here */
2414 	drbd_err(device, "submit failed, triggering re-connect\n");
2415 	spin_lock_irq(&device->resource->req_lock);
2416 	list_del(&peer_req->w.list);
2417 	drbd_remove_epoch_entry_interval(device, peer_req);
2418 	spin_unlock_irq(&device->resource->req_lock);
2419 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2420 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2421 		drbd_al_complete_io(device, &peer_req->i);
2422 	}
2423 
2424 out_interrupted:
2425 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2426 	put_ldev(device);
2427 	drbd_free_peer_req(device, peer_req);
2428 	return err;
2429 }
2430 
2431 /* We may throttle resync, if the lower device seems to be busy,
2432  * and current sync rate is above c_min_rate.
2433  *
2434  * To decide whether or not the lower device is busy, we use a scheme similar
2435  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2436  * (more than 64 sectors) of activity we cannot account for with our own resync
2437  * activity, it obviously is "busy".
2438  *
2439  * The current sync rate used here uses only the most recent two step marks,
2440  * to have a short time average so we can react faster.
2441  */
2442 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2443 		bool throttle_if_app_is_waiting)
2444 {
2445 	struct lc_element *tmp;
2446 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2447 
2448 	if (!throttle || throttle_if_app_is_waiting)
2449 		return throttle;
2450 
2451 	spin_lock_irq(&device->al_lock);
2452 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2453 	if (tmp) {
2454 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2455 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2456 			throttle = false;
2457 		/* Do not slow down if app IO is already waiting for this extent,
2458 		 * and our progress is necessary for application IO to complete. */
2459 	}
2460 	spin_unlock_irq(&device->al_lock);
2461 
2462 	return throttle;
2463 }
2464 
2465 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2466 {
2467 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2468 	unsigned long db, dt, dbdt;
2469 	unsigned int c_min_rate;
2470 	int curr_events;
2471 
2472 	rcu_read_lock();
2473 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2474 	rcu_read_unlock();
2475 
2476 	/* feature disabled? */
2477 	if (c_min_rate == 0)
2478 		return false;
2479 
2480 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2481 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2482 			atomic_read(&device->rs_sect_ev);
2483 
2484 	if (atomic_read(&device->ap_actlog_cnt)
2485 	    || curr_events - device->rs_last_events > 64) {
2486 		unsigned long rs_left;
2487 		int i;
2488 
2489 		device->rs_last_events = curr_events;
2490 
2491 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2492 		 * approx. */
2493 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2494 
2495 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2496 			rs_left = device->ov_left;
2497 		else
2498 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2499 
2500 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2501 		if (!dt)
2502 			dt++;
2503 		db = device->rs_mark_left[i] - rs_left;
2504 		dbdt = Bit2KB(db/dt);
2505 
2506 		if (dbdt > c_min_rate)
2507 			return true;
2508 	}
2509 	return false;
2510 }
2511 
/*
 * receive_DataRequest() - handle a read-type request from the peer
 * @connection: connection the packet arrived on
 * @pi: packet info; pi->data points to a struct p_block_req
 *
 * Dispatches on pi->cmd: P_DATA_REQUEST, P_RS_DATA_REQUEST,
 * P_CSUM_RS_REQUEST, P_OV_REQUEST and P_OV_REPLY; anything else hits BUG().
 *
 * The requested size and range are validated first.  If
 * get_ldev_if_state(device, D_UP_TO_DATE) fails, a negative reply is sent
 * (for P_OV_REPLY: a P_OV_RESULT with ID_IN_SYNC) and any payload is drained.
 * Otherwise a peer request is allocated, its completion callback is chosen
 * per command, and it is submitted as a READ.
 *
 * Returns 0 if the read was submitted; -EIO if the peer device is unknown or
 * anything fails after the allocation; -EINVAL for an invalid size or
 * range; -ENOMEM if the peer request cannot be allocated; or the result of
 * drbd_drain_block() on the no-disk path.
 */
2512 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2513 {
2514 	struct drbd_peer_device *peer_device;
2515 	struct drbd_device *device;
2516 	sector_t sector;
2517 	sector_t capacity;
2518 	struct drbd_peer_request *peer_req;
2519 	struct digest_info *di = NULL;
2520 	int size, verb;
2521 	unsigned int fault_type;
2522 	struct p_block_req *p =	pi->data;
2523 
2524 	peer_device = conn_peer_device(connection, pi->vnr);
2525 	if (!peer_device)
2526 		return -EIO;
2527 	device = peer_device->device;
2528 	capacity = drbd_get_capacity(device->this_bdev);
2529 
2530 	sector = be64_to_cpu(p->sector);
2531 	size   = be32_to_cpu(p->blksize);
2532 
	/* size must be a positive multiple of 512, at most DRBD_MAX_BIO_SIZE */
2533 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2534 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2535 				(unsigned long long)sector, size);
2536 		return -EINVAL;
2537 	}
	/* the request must not reach beyond the end of the device */
2538 	if (sector + (size>>9) > capacity) {
2539 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2540 				(unsigned long long)sector, size);
2541 		return -EINVAL;
2542 	}
2543 
2544 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
		/* verb: whether to log the (rate limited) error message below */
2545 		verb = 1;
2546 		switch (pi->cmd) {
2547 		case P_DATA_REQUEST:
2548 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2549 			break;
2550 		case P_RS_DATA_REQUEST:
2551 		case P_CSUM_RS_REQUEST:
2552 		case P_OV_REQUEST:
2553 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2554 			break;
2555 		case P_OV_REPLY:
2556 			verb = 0;
2557 			dec_rs_pending(device);
2558 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2559 			break;
2560 		default:
2561 			BUG();
2562 		}
2563 		if (verb && __ratelimit(&drbd_ratelimit_state))
2564 			drbd_err(device, "Can not satisfy peer's read request, "
2565 			    "no local data.\n");
2566 
2567 		/* drain possibly payload */
2568 		return drbd_drain_block(peer_device, pi->size);
2569 	}
2570 
2571 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2572 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2573 	 * which in turn might block on the other node at this very place.  */
2574 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2575 			true /* has real payload */, GFP_NOIO);
2576 	if (!peer_req) {
2577 		put_ldev(device);
2578 		return -ENOMEM;
2579 	}
2580 
2581 	switch (pi->cmd) {
2582 	case P_DATA_REQUEST:
2583 		peer_req->w.cb = w_e_end_data_req;
2584 		fault_type = DRBD_FAULT_DT_RD;
2585 		/* application IO, don't drbd_rs_begin_io */
2586 		peer_req->flags |= EE_APPLICATION;
		/* note: this also skips the list_add_tail() to read_ee below */
2587 		goto submit;
2588 
2589 	case P_RS_DATA_REQUEST:
2590 		peer_req->w.cb = w_e_end_rsdata_req;
2591 		fault_type = DRBD_FAULT_RS_RD;
2592 		/* used in the sector offset progress display */
2593 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2594 		break;
2595 
2596 	case P_OV_REPLY:
2597 	case P_CSUM_RS_REQUEST:
2598 		fault_type = DRBD_FAULT_RS_RD;
		/* the packet payload (pi->size bytes) is the digest; it is
		 * stored directly behind the digest_info header */
2599 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2600 		if (!di)
2601 			goto out_free_e;
2602 
2603 		di->digest_size = pi->size;
2604 		di->digest = (((char *)di)+sizeof(struct digest_info));
2605 
2606 		peer_req->digest = di;
2607 		peer_req->flags |= EE_HAS_DIGEST;
2608 
2609 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2610 			goto out_free_e;
2611 
2612 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2613 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2614 			peer_req->w.cb = w_e_end_csum_rs_req;
2615 			/* used in the sector offset progress display */
2616 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2617 			/* remember to report stats in drbd_resync_finished */
2618 			device->use_csums = true;
2619 		} else if (pi->cmd == P_OV_REPLY) {
2620 			/* track progress, we may need to throttle */
2621 			atomic_add(size >> 9, &device->rs_sect_in);
2622 			peer_req->w.cb = w_e_end_ov_reply;
2623 			dec_rs_pending(device);
2624 			/* drbd_rs_begin_io done when we sent this request,
2625 			 * but accounting still needs to be done. */
2626 			goto submit_for_resync;
2627 		}
2628 		break;
2629 
2630 	case P_OV_REQUEST:
		/* first verify request of a run (for protocol >= 90):
		 * initialize the online verify progress bookkeeping */
2631 		if (device->ov_start_sector == ~(sector_t)0 &&
2632 		    peer_device->connection->agreed_pro_version >= 90) {
2633 			unsigned long now = jiffies;
2634 			int i;
2635 			device->ov_start_sector = sector;
2636 			device->ov_position = sector;
2637 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2638 			device->rs_total = device->ov_left;
2639 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2640 				device->rs_mark_left[i] = device->ov_left;
2641 				device->rs_mark_time[i] = now;
2642 			}
2643 			drbd_info(device, "Online Verify start sector: %llu\n",
2644 					(unsigned long long)sector);
2645 		}
2646 		peer_req->w.cb = w_e_end_ov_req;
2647 		fault_type = DRBD_FAULT_RS_RD;
2648 		break;
2649 
2650 	default:
2651 		BUG();
2652 	}
2653 
2654 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2655 	 * wrt the receiver, but it is not as straightforward as it may seem.
2656 	 * Various places in the resync start and stop logic assume resync
2657 	 * requests are processed in order, requeuing this on the worker thread
2658 	 * introduces a bunch of new code for synchronization between threads.
2659 	 *
2660 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2661 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2662 	 * for application writes for the same time.  For now, just throttle
2663 	 * here, where the rest of the code expects the receiver to sleep for
2664 	 * a while, anyways.
2665 	 */
2666 
2667 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2668 	 * this defers syncer requests for some time, before letting at least
2669 	 * one request through.  The resync controller on the receiving side
2670 	 * will adapt to the incoming rate accordingly.
2671 	 *
2672 	 * We cannot throttle here if remote is Primary/SyncTarget:
2673 	 * we would also throttle its application reads.
2674 	 * In that case, throttling is done on the SyncTarget only.
2675 	 */
2676 
2677 	/* Even though this may be a resync request, we do add to "read_ee";
2678 	 * "sync_ee" is only used for resync WRITEs.
2679 	 * Add to list early, so debugfs can find this request
2680 	 * even if we have to sleep below. */
2681 	spin_lock_irq(&device->resource->req_lock);
2682 	list_add_tail(&peer_req->w.list, &device->read_ee);
2683 	spin_unlock_irq(&device->resource->req_lock);
2684 
2685 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2686 	if (device->state.peer != R_PRIMARY
2687 	&& drbd_rs_should_slow_down(device, sector, false))
2688 		schedule_timeout_uninterruptible(HZ/10);
2689 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2690 	if (drbd_rs_begin_io(device, sector))
2691 		goto out_free_e;
2692 
2693 submit_for_resync:
2694 	atomic_add(size >> 9, &device->rs_sect_ev);
2695 
2696 submit:
2697 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2698 	inc_unacked(device);
2699 	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2700 		return 0;
2701 
2702 	/* don't care for the reason here */
2703 	drbd_err(device, "submit failed, triggering re-connect\n");
2704 
2705 out_free_e:
	/* NOTE(review): some paths get here without peer_req ever having been
	 * added to read_ee (digest kmalloc/receive failure, or the submit
	 * labels reached via goto); list_del() then relies on w.list having
	 * been initialized by drbd_alloc_peer_req() -- confirm. */
2706 	spin_lock_irq(&device->resource->req_lock);
2707 	list_del(&peer_req->w.list);
2708 	spin_unlock_irq(&device->resource->req_lock);
2709 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2710 
2711 	put_ldev(device);
2712 	drbd_free_peer_req(device, peer_req);
2713 	return -EIO;
2714 }
2715 
2716 /**
2717  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 * @peer_device: peer device whose connection's after_sb_0p policy is applied
 *
 * Returns -1 or 1 if the configured policy reached a decision, and -100 if
 * it did not (ASB_DISCONNECT, a policy that is not valid here, or
 * ASB_DISCARD_ZERO_CHG when both sides have changes).
 * NOTE(review): by the result table above drbd_uuid_compare(), 1 presumably
 * means "become sync source", -1 "become sync target", and -100
 * "disconnect" -- confirm against the caller.
 *
 * Inputs used for the decision: the lowest bit of the local and the peer's
 * UI_BITMAP uuid (self/peer), device->comm_bm_set (ch_self) and the peer's
 * p_uuid[UI_SIZE] (ch_peer), and the connection's RESOLVE_CONFLICTS flag as
 * tie breaker.
2718  */
2719 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2720 {
2721 	struct drbd_device *device = peer_device->device;
2722 	int self, peer, rv = -100;
2723 	unsigned long ch_self, ch_peer;
2724 	enum drbd_after_sb_p after_sb_0p;
2725 
2726 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2727 	peer = device->p_uuid[UI_BITMAP] & 1;
2728 
2729 	ch_peer = device->p_uuid[UI_SIZE];
2730 	ch_self = device->comm_bm_set;
2731 
2732 	rcu_read_lock();
2733 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2734 	rcu_read_unlock();
2735 	switch (after_sb_0p) {
	/* these policies are rejected here as a configuration error */
2736 	case ASB_CONSENSUS:
2737 	case ASB_DISCARD_SECONDARY:
2738 	case ASB_CALL_HELPER:
2739 	case ASB_VIOLENTLY:
2740 		drbd_err(device, "Configuration error.\n");
2741 		break;
2742 	case ASB_DISCONNECT:
2743 		break;
2744 	case ASB_DISCARD_YOUNGER_PRI:
2745 		if (self == 0 && peer == 1) {
2746 			rv = -1;
2747 			break;
2748 		}
2749 		if (self == 1 && peer == 0) {
2750 			rv =  1;
2751 			break;
2752 		}
2753 		/* Else fall through to one of the other strategies... */
		/* (self == peer here, so the next case cannot decide either) */
2754 	case ASB_DISCARD_OLDER_PRI:
2755 		if (self == 0 && peer == 1) {
2756 			rv = 1;
2757 			break;
2758 		}
2759 		if (self == 1 && peer == 0) {
2760 			rv = -1;
2761 			break;
2762 		}
2763 		/* Else fall through to one of the other strategies... */
2764 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2765 		     "Using discard-least-changes instead\n");
		/* fall through */
2766 	case ASB_DISCARD_ZERO_CHG:
2767 		if (ch_peer == 0 && ch_self == 0) {
			/* no changes on either side: the RESOLVE_CONFLICTS
			 * flag breaks the tie */
2768 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2769 				? -1 : 1;
2770 			break;
2771 		} else {
2772 			if (ch_peer == 0) { rv =  1; break; }
2773 			if (ch_self == 0) { rv = -1; break; }
2774 		}
		/* both sides have changes: undecided if zero-changes was the
		 * configured policy, otherwise continue with least-changes */
2775 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2776 			break;
		/* fall through */
2777 	case ASB_DISCARD_LEAST_CHG:
2778 		if	(ch_self < ch_peer)
2779 			rv = -1;
2780 		else if (ch_self > ch_peer)
2781 			rv =  1;
2782 		else /* ( ch_self == ch_peer ) */
2783 		     /* Well, then use something else. */
2784 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2785 				? -1 : 1;
2786 		break;
2787 	case ASB_DISCARD_LOCAL:
2788 		rv = -1;
2789 		break;
2790 	case ASB_DISCARD_REMOTE:
2791 		rv =  1;
2792 	}
2793 
2794 	return rv;
2795 }
2796 
2797 /**
2798  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2799  */
2800 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2801 {
2802 	struct drbd_device *device = peer_device->device;
2803 	int hg, rv = -100;
2804 	enum drbd_after_sb_p after_sb_1p;
2805 
2806 	rcu_read_lock();
2807 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2808 	rcu_read_unlock();
2809 	switch (after_sb_1p) {
2810 	case ASB_DISCARD_YOUNGER_PRI:
2811 	case ASB_DISCARD_OLDER_PRI:
2812 	case ASB_DISCARD_LEAST_CHG:
2813 	case ASB_DISCARD_LOCAL:
2814 	case ASB_DISCARD_REMOTE:
2815 	case ASB_DISCARD_ZERO_CHG:
2816 		drbd_err(device, "Configuration error.\n");
2817 		break;
2818 	case ASB_DISCONNECT:
2819 		break;
2820 	case ASB_CONSENSUS:
2821 		hg = drbd_asb_recover_0p(peer_device);
2822 		if (hg == -1 && device->state.role == R_SECONDARY)
2823 			rv = hg;
2824 		if (hg == 1  && device->state.role == R_PRIMARY)
2825 			rv = hg;
2826 		break;
2827 	case ASB_VIOLENTLY:
2828 		rv = drbd_asb_recover_0p(peer_device);
2829 		break;
2830 	case ASB_DISCARD_SECONDARY:
2831 		return device->state.role == R_PRIMARY ? 1 : -1;
2832 	case ASB_CALL_HELPER:
2833 		hg = drbd_asb_recover_0p(peer_device);
2834 		if (hg == -1 && device->state.role == R_PRIMARY) {
2835 			enum drbd_state_rv rv2;
2836 
2837 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2838 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2839 			  * we do not need to wait for the after state change work either. */
2840 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2841 			if (rv2 != SS_SUCCESS) {
2842 				drbd_khelper(device, "pri-lost-after-sb");
2843 			} else {
2844 				drbd_warn(device, "Successfully gave up primary role.\n");
2845 				rv = hg;
2846 			}
2847 		} else
2848 			rv = hg;
2849 	}
2850 
2851 	return rv;
2852 }
2853 
2854 /**
2855  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2856  */
2857 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2858 {
2859 	struct drbd_device *device = peer_device->device;
2860 	int hg, rv = -100;
2861 	enum drbd_after_sb_p after_sb_2p;
2862 
2863 	rcu_read_lock();
2864 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2865 	rcu_read_unlock();
2866 	switch (after_sb_2p) {
2867 	case ASB_DISCARD_YOUNGER_PRI:
2868 	case ASB_DISCARD_OLDER_PRI:
2869 	case ASB_DISCARD_LEAST_CHG:
2870 	case ASB_DISCARD_LOCAL:
2871 	case ASB_DISCARD_REMOTE:
2872 	case ASB_CONSENSUS:
2873 	case ASB_DISCARD_SECONDARY:
2874 	case ASB_DISCARD_ZERO_CHG:
2875 		drbd_err(device, "Configuration error.\n");
2876 		break;
2877 	case ASB_VIOLENTLY:
2878 		rv = drbd_asb_recover_0p(peer_device);
2879 		break;
2880 	case ASB_DISCONNECT:
2881 		break;
2882 	case ASB_CALL_HELPER:
2883 		hg = drbd_asb_recover_0p(peer_device);
2884 		if (hg == -1) {
2885 			enum drbd_state_rv rv2;
2886 
2887 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2888 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2889 			  * we do not need to wait for the after state change work either. */
2890 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2891 			if (rv2 != SS_SUCCESS) {
2892 				drbd_khelper(device, "pri-lost-after-sb");
2893 			} else {
2894 				drbd_warn(device, "Successfully gave up primary role.\n");
2895 				rv = hg;
2896 			}
2897 		} else
2898 			rv = hg;
2899 	}
2900 
2901 	return rv;
2902 }
2903 
2904 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2905 			   u64 bits, u64 flags)
2906 {
2907 	if (!uuid) {
2908 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2909 		return;
2910 	}
2911 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2912 	     text,
2913 	     (unsigned long long)uuid[UI_CURRENT],
2914 	     (unsigned long long)uuid[UI_BITMAP],
2915 	     (unsigned long long)uuid[UI_HISTORY_START],
2916 	     (unsigned long long)uuid[UI_HISTORY_END],
2917 	     (unsigned long long)bits,
2918 	     (unsigned long long)flags);
2919 }
2920 
/*
 * drbd_uuid_compare() - compare our UUID set with the peer's
 *
 * Compares device->ldev->md.uuid[] (ours) against device->p_uuid[] (peer's)
 * by walking a fixed cascade of rules.  The number of the rule that was being
 * evaluated when the function returned is stored in *rule_nr.  The lowest bit
 * of every UUID is masked off before comparison.
 *
 * Some rules repair a UUID set in place (ours or the cached copy of the
 * peer's) before returning.
 *
 * Return values:
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96 (listed here, but no path in this function returns it)
 */
static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	/* NOTE(review): connection is NULL when there is no peer device, yet it is
	 * dereferenced unconditionally further down -- presumably callers
	 * guarantee a peer device exists; confirm. */
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	u64 self, peer;
	int i, j;

	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	/* Rule 10: both sides are freshly created, nothing to sync. */
	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	/* Rule 20: we are freshly created (or zero), the peer is not:
	 * become sync target and set the whole bitmap. */
	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	/* Rule 30: the mirror image of rule 20: become sync source and set
	 * the whole bitmap. */
	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	/* Current UUIDs are equal. */
	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* Rules 34/36: we still have a bitmap UUID, the peer has none. */
		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				/* Repair our own set: rotate the history, move the
				 * bitmap UUID into it and clear the bitmap UUID. */
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* Rules 35/37: the mirror image: the peer still has a bitmap
		 * UUID, we have none. */
		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				/* Repair our cached copy of the peer's set. */
				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		/* rct is in 0..3, so every case below returns. */
		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			/* Both were primary: the RESOLVE_CONFLICTS flag breaks the tie. */
			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
			return dc ? -1 : 1;
		}
	}

	/* Rule 50: our current UUID equals the peer's bitmap UUID. */
	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	/* Rule 51: our current UUID equals the peer's first history UUID. */
	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	/* Rule 60: our current UUID appears somewhere in the peer's history. */
	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	/* Rule 70: our bitmap UUID equals the peer's current UUID. */
	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	/* Rule 71: our first history UUID equals the peer's current UUID
	 * (mirror image of rule 51). */
	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}


	/* Rule 80: the peer's current UUID appears somewhere in our history. */
	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	/* Rule 90: both bitmap UUIDs are equal and non-zero: split brain,
	 * automatic recovery may be attempted. */
	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	/* Rule 100: any of our history UUIDs matches any of the peer's
	 * history UUIDs: split brain, disconnect. */
	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	/* Nothing matched at all. */
	return -1000;
}
3116 
3117 /* drbd_sync_handshake() returns the new conn state on success, or
3118    CONN_MASK (-1) on failure.
3119  */
3120 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3121 					   enum drbd_role peer_role,
3122 					   enum drbd_disk_state peer_disk) __must_hold(local)
3123 {
3124 	struct drbd_device *device = peer_device->device;
3125 	enum drbd_conns rv = C_MASK;
3126 	enum drbd_disk_state mydisk;
3127 	struct net_conf *nc;
3128 	int hg, rule_nr, rr_conflict, tentative;
3129 
3130 	mydisk = device->state.disk;
3131 	if (mydisk == D_NEGOTIATING)
3132 		mydisk = device->new_state_tmp.disk;
3133 
3134 	drbd_info(device, "drbd_sync_handshake:\n");
3135 
3136 	spin_lock_irq(&device->ldev->md.uuid_lock);
3137 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3138 	drbd_uuid_dump(device, "peer", device->p_uuid,
3139 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3140 
3141 	hg = drbd_uuid_compare(device, &rule_nr);
3142 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3143 
3144 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3145 
3146 	if (hg == -1000) {
3147 		drbd_alert(device, "Unrelated data, aborting!\n");
3148 		return C_MASK;
3149 	}
3150 	if (hg < -1000) {
3151 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3152 		return C_MASK;
3153 	}
3154 
3155 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3156 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3157 		int f = (hg == -100) || abs(hg) == 2;
3158 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3159 		if (f)
3160 			hg = hg*2;
3161 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3162 		     hg > 0 ? "source" : "target");
3163 	}
3164 
3165 	if (abs(hg) == 100)
3166 		drbd_khelper(device, "initial-split-brain");
3167 
3168 	rcu_read_lock();
3169 	nc = rcu_dereference(peer_device->connection->net_conf);
3170 
3171 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3172 		int pcount = (device->state.role == R_PRIMARY)
3173 			   + (peer_role == R_PRIMARY);
3174 		int forced = (hg == -100);
3175 
3176 		switch (pcount) {
3177 		case 0:
3178 			hg = drbd_asb_recover_0p(peer_device);
3179 			break;
3180 		case 1:
3181 			hg = drbd_asb_recover_1p(peer_device);
3182 			break;
3183 		case 2:
3184 			hg = drbd_asb_recover_2p(peer_device);
3185 			break;
3186 		}
3187 		if (abs(hg) < 100) {
3188 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3189 			     "automatically solved. Sync from %s node\n",
3190 			     pcount, (hg < 0) ? "peer" : "this");
3191 			if (forced) {
3192 				drbd_warn(device, "Doing a full sync, since"
3193 				     " UUIDs where ambiguous.\n");
3194 				hg = hg*2;
3195 			}
3196 		}
3197 	}
3198 
3199 	if (hg == -100) {
3200 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3201 			hg = -1;
3202 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3203 			hg = 1;
3204 
3205 		if (abs(hg) < 100)
3206 			drbd_warn(device, "Split-Brain detected, manually solved. "
3207 			     "Sync from %s node\n",
3208 			     (hg < 0) ? "peer" : "this");
3209 	}
3210 	rr_conflict = nc->rr_conflict;
3211 	tentative = nc->tentative;
3212 	rcu_read_unlock();
3213 
3214 	if (hg == -100) {
3215 		/* FIXME this log message is not correct if we end up here
3216 		 * after an attempted attach on a diskless node.
3217 		 * We just refuse to attach -- well, we drop the "connection"
3218 		 * to that disk, in a way... */
3219 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3220 		drbd_khelper(device, "split-brain");
3221 		return C_MASK;
3222 	}
3223 
3224 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3225 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3226 		return C_MASK;
3227 	}
3228 
3229 	if (hg < 0 && /* by intention we do not use mydisk here. */
3230 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3231 		switch (rr_conflict) {
3232 		case ASB_CALL_HELPER:
3233 			drbd_khelper(device, "pri-lost");
3234 			/* fall through */
3235 		case ASB_DISCONNECT:
3236 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3237 			return C_MASK;
3238 		case ASB_VIOLENTLY:
3239 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3240 			     "assumption\n");
3241 		}
3242 	}
3243 
3244 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3245 		if (hg == 0)
3246 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3247 		else
3248 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3249 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3250 				 abs(hg) >= 2 ? "full" : "bit-map based");
3251 		return C_MASK;
3252 	}
3253 
3254 	if (abs(hg) >= 2) {
3255 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3256 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3257 					BM_LOCKED_SET_ALLOWED))
3258 			return C_MASK;
3259 	}
3260 
3261 	if (hg > 0) { /* become sync source. */
3262 		rv = C_WF_BITMAP_S;
3263 	} else if (hg < 0) { /* become sync target */
3264 		rv = C_WF_BITMAP_T;
3265 	} else {
3266 		rv = C_CONNECTED;
3267 		if (drbd_bm_total_weight(device)) {
3268 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3269 			     drbd_bm_total_weight(device));
3270 		}
3271 	}
3272 
3273 	return rv;
3274 }
3275 
3276 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3277 {
3278 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3279 	if (peer == ASB_DISCARD_REMOTE)
3280 		return ASB_DISCARD_LOCAL;
3281 
3282 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3283 	if (peer == ASB_DISCARD_LOCAL)
3284 		return ASB_DISCARD_REMOTE;
3285 
3286 	/* everything else is valid if they are equal on both sides. */
3287 	return peer;
3288 }
3289 
3290 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3291 {
3292 	struct p_protocol *p = pi->data;
3293 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3294 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3295 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3296 	char integrity_alg[SHARED_SECRET_MAX] = "";
3297 	struct crypto_hash *peer_integrity_tfm = NULL;
3298 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3299 
3300 	p_proto		= be32_to_cpu(p->protocol);
3301 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3302 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3303 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3304 	p_two_primaries = be32_to_cpu(p->two_primaries);
3305 	cf		= be32_to_cpu(p->conn_flags);
3306 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3307 
3308 	if (connection->agreed_pro_version >= 87) {
3309 		int err;
3310 
3311 		if (pi->size > sizeof(integrity_alg))
3312 			return -EIO;
3313 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3314 		if (err)
3315 			return err;
3316 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3317 	}
3318 
3319 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3320 		clear_bit(CONN_DRY_RUN, &connection->flags);
3321 
3322 		if (cf & CF_DRY_RUN)
3323 			set_bit(CONN_DRY_RUN, &connection->flags);
3324 
3325 		rcu_read_lock();
3326 		nc = rcu_dereference(connection->net_conf);
3327 
3328 		if (p_proto != nc->wire_protocol) {
3329 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3330 			goto disconnect_rcu_unlock;
3331 		}
3332 
3333 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3334 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3335 			goto disconnect_rcu_unlock;
3336 		}
3337 
3338 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3339 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3340 			goto disconnect_rcu_unlock;
3341 		}
3342 
3343 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3344 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3345 			goto disconnect_rcu_unlock;
3346 		}
3347 
3348 		if (p_discard_my_data && nc->discard_my_data) {
3349 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3350 			goto disconnect_rcu_unlock;
3351 		}
3352 
3353 		if (p_two_primaries != nc->two_primaries) {
3354 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3355 			goto disconnect_rcu_unlock;
3356 		}
3357 
3358 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3359 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3360 			goto disconnect_rcu_unlock;
3361 		}
3362 
3363 		rcu_read_unlock();
3364 	}
3365 
3366 	if (integrity_alg[0]) {
3367 		int hash_size;
3368 
3369 		/*
3370 		 * We can only change the peer data integrity algorithm
3371 		 * here.  Changing our own data integrity algorithm
3372 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3373 		 * the same time; otherwise, the peer has no way to
3374 		 * tell between which packets the algorithm should
3375 		 * change.
3376 		 */
3377 
3378 		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3379 		if (!peer_integrity_tfm) {
3380 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3381 				 integrity_alg);
3382 			goto disconnect;
3383 		}
3384 
3385 		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3386 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3387 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3388 		if (!(int_dig_in && int_dig_vv)) {
3389 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3390 			goto disconnect;
3391 		}
3392 	}
3393 
3394 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3395 	if (!new_net_conf) {
3396 		drbd_err(connection, "Allocation of new net_conf failed\n");
3397 		goto disconnect;
3398 	}
3399 
3400 	mutex_lock(&connection->data.mutex);
3401 	mutex_lock(&connection->resource->conf_update);
3402 	old_net_conf = connection->net_conf;
3403 	*new_net_conf = *old_net_conf;
3404 
3405 	new_net_conf->wire_protocol = p_proto;
3406 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3407 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3408 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3409 	new_net_conf->two_primaries = p_two_primaries;
3410 
3411 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3412 	mutex_unlock(&connection->resource->conf_update);
3413 	mutex_unlock(&connection->data.mutex);
3414 
3415 	crypto_free_hash(connection->peer_integrity_tfm);
3416 	kfree(connection->int_dig_in);
3417 	kfree(connection->int_dig_vv);
3418 	connection->peer_integrity_tfm = peer_integrity_tfm;
3419 	connection->int_dig_in = int_dig_in;
3420 	connection->int_dig_vv = int_dig_vv;
3421 
3422 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3423 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3424 			  integrity_alg[0] ? integrity_alg : "(none)");
3425 
3426 	synchronize_rcu();
3427 	kfree(old_net_conf);
3428 	return 0;
3429 
3430 disconnect_rcu_unlock:
3431 	rcu_read_unlock();
3432 disconnect:
3433 	crypto_free_hash(peer_integrity_tfm);
3434 	kfree(int_dig_in);
3435 	kfree(int_dig_vv);
3436 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3437 	return -EIO;
3438 }
3439 
3440 /* helper function
3441  * input: alg name, feature name
3442  * return: NULL (alg name was "")
3443  *         ERR_PTR(error) if something goes wrong
3444  *         or the crypto hash ptr, if it worked out ok. */
3445 static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3446 		const char *alg, const char *name)
3447 {
3448 	struct crypto_hash *tfm;
3449 
3450 	if (!alg[0])
3451 		return NULL;
3452 
3453 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3454 	if (IS_ERR(tfm)) {
3455 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3456 			alg, name, PTR_ERR(tfm));
3457 		return tfm;
3458 	}
3459 	return tfm;
3460 }
3461 
3462 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3463 {
3464 	void *buffer = connection->data.rbuf;
3465 	int size = pi->size;
3466 
3467 	while (size) {
3468 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3469 		s = drbd_recv(connection, buffer, s);
3470 		if (s <= 0) {
3471 			if (s < 0)
3472 				return s;
3473 			break;
3474 		}
3475 		size -= s;
3476 	}
3477 	if (size)
3478 		return -EIO;
3479 	return 0;
3480 }
3481 
3482 /*
3483  * config_unknown_volume  -  device configuration command for unknown volume
3484  *
3485  * When a device is added to an existing connection, the node on which the
3486  * device is added first will send configuration commands to its peer but the
3487  * peer will not know about the device yet.  It will warn and ignore these
3488  * commands.  Once the device is added on the second node, the second node will
3489  * send the same device configuration commands, but in the other direction.
3490  *
3491  * (We can also end up here if drbd is misconfigured.)
3492  */
3493 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3494 {
3495 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3496 		  cmdname(pi->cmd), pi->vnr);
3497 	return ignore_remaining_packet(connection, pi);
3498 }
3499 
3500 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3501 {
3502 	struct drbd_peer_device *peer_device;
3503 	struct drbd_device *device;
3504 	struct p_rs_param_95 *p;
3505 	unsigned int header_size, data_size, exp_max_sz;
3506 	struct crypto_hash *verify_tfm = NULL;
3507 	struct crypto_hash *csums_tfm = NULL;
3508 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3509 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3510 	const int apv = connection->agreed_pro_version;
3511 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3512 	int fifo_size = 0;
3513 	int err;
3514 
3515 	peer_device = conn_peer_device(connection, pi->vnr);
3516 	if (!peer_device)
3517 		return config_unknown_volume(connection, pi);
3518 	device = peer_device->device;
3519 
3520 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3521 		    : apv == 88 ? sizeof(struct p_rs_param)
3522 					+ SHARED_SECRET_MAX
3523 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3524 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3525 
3526 	if (pi->size > exp_max_sz) {
3527 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3528 		    pi->size, exp_max_sz);
3529 		return -EIO;
3530 	}
3531 
3532 	if (apv <= 88) {
3533 		header_size = sizeof(struct p_rs_param);
3534 		data_size = pi->size - header_size;
3535 	} else if (apv <= 94) {
3536 		header_size = sizeof(struct p_rs_param_89);
3537 		data_size = pi->size - header_size;
3538 		D_ASSERT(device, data_size == 0);
3539 	} else {
3540 		header_size = sizeof(struct p_rs_param_95);
3541 		data_size = pi->size - header_size;
3542 		D_ASSERT(device, data_size == 0);
3543 	}
3544 
3545 	/* initialize verify_alg and csums_alg */
3546 	p = pi->data;
3547 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3548 
3549 	err = drbd_recv_all(peer_device->connection, p, header_size);
3550 	if (err)
3551 		return err;
3552 
3553 	mutex_lock(&connection->resource->conf_update);
3554 	old_net_conf = peer_device->connection->net_conf;
3555 	if (get_ldev(device)) {
3556 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3557 		if (!new_disk_conf) {
3558 			put_ldev(device);
3559 			mutex_unlock(&connection->resource->conf_update);
3560 			drbd_err(device, "Allocation of new disk_conf failed\n");
3561 			return -ENOMEM;
3562 		}
3563 
3564 		old_disk_conf = device->ldev->disk_conf;
3565 		*new_disk_conf = *old_disk_conf;
3566 
3567 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3568 	}
3569 
3570 	if (apv >= 88) {
3571 		if (apv == 88) {
3572 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3573 				drbd_err(device, "verify-alg of wrong size, "
3574 					"peer wants %u, accepting only up to %u byte\n",
3575 					data_size, SHARED_SECRET_MAX);
3576 				err = -EIO;
3577 				goto reconnect;
3578 			}
3579 
3580 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3581 			if (err)
3582 				goto reconnect;
3583 			/* we expect NUL terminated string */
3584 			/* but just in case someone tries to be evil */
3585 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3586 			p->verify_alg[data_size-1] = 0;
3587 
3588 		} else /* apv >= 89 */ {
3589 			/* we still expect NUL terminated strings */
3590 			/* but just in case someone tries to be evil */
3591 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3592 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3593 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3594 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3595 		}
3596 
3597 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3598 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3599 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3600 				    old_net_conf->verify_alg, p->verify_alg);
3601 				goto disconnect;
3602 			}
3603 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3604 					p->verify_alg, "verify-alg");
3605 			if (IS_ERR(verify_tfm)) {
3606 				verify_tfm = NULL;
3607 				goto disconnect;
3608 			}
3609 		}
3610 
3611 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3612 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3613 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3614 				    old_net_conf->csums_alg, p->csums_alg);
3615 				goto disconnect;
3616 			}
3617 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3618 					p->csums_alg, "csums-alg");
3619 			if (IS_ERR(csums_tfm)) {
3620 				csums_tfm = NULL;
3621 				goto disconnect;
3622 			}
3623 		}
3624 
3625 		if (apv > 94 && new_disk_conf) {
3626 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3627 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3628 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3629 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3630 
3631 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3632 			if (fifo_size != device->rs_plan_s->size) {
3633 				new_plan = fifo_alloc(fifo_size);
3634 				if (!new_plan) {
3635 					drbd_err(device, "kmalloc of fifo_buffer failed");
3636 					put_ldev(device);
3637 					goto disconnect;
3638 				}
3639 			}
3640 		}
3641 
3642 		if (verify_tfm || csums_tfm) {
3643 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3644 			if (!new_net_conf) {
3645 				drbd_err(device, "Allocation of new net_conf failed\n");
3646 				goto disconnect;
3647 			}
3648 
3649 			*new_net_conf = *old_net_conf;
3650 
3651 			if (verify_tfm) {
3652 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3653 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3654 				crypto_free_hash(peer_device->connection->verify_tfm);
3655 				peer_device->connection->verify_tfm = verify_tfm;
3656 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3657 			}
3658 			if (csums_tfm) {
3659 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3660 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3661 				crypto_free_hash(peer_device->connection->csums_tfm);
3662 				peer_device->connection->csums_tfm = csums_tfm;
3663 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3664 			}
3665 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3666 		}
3667 	}
3668 
3669 	if (new_disk_conf) {
3670 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3671 		put_ldev(device);
3672 	}
3673 
3674 	if (new_plan) {
3675 		old_plan = device->rs_plan_s;
3676 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3677 	}
3678 
3679 	mutex_unlock(&connection->resource->conf_update);
3680 	synchronize_rcu();
3681 	if (new_net_conf)
3682 		kfree(old_net_conf);
3683 	kfree(old_disk_conf);
3684 	kfree(old_plan);
3685 
3686 	return 0;
3687 
3688 reconnect:
3689 	if (new_disk_conf) {
3690 		put_ldev(device);
3691 		kfree(new_disk_conf);
3692 	}
3693 	mutex_unlock(&connection->resource->conf_update);
3694 	return -EIO;
3695 
3696 disconnect:
3697 	kfree(new_plan);
3698 	if (new_disk_conf) {
3699 		put_ldev(device);
3700 		kfree(new_disk_conf);
3701 	}
3702 	mutex_unlock(&connection->resource->conf_update);
3703 	/* just for completeness: actually not needed,
3704 	 * as this is not reached if csums_tfm was ok. */
3705 	crypto_free_hash(csums_tfm);
3706 	/* but free the verify_tfm again, if csums_tfm did not work out */
3707 	crypto_free_hash(verify_tfm);
3708 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3709 	return -EIO;
3710 }
3711 
3712 /* warn if the arguments differ by more than 12.5% */
3713 static void warn_if_differ_considerably(struct drbd_device *device,
3714 	const char *s, sector_t a, sector_t b)
3715 {
3716 	sector_t d;
3717 	if (a == 0 || b == 0)
3718 		return;
3719 	d = (a > b) ? (a - b) : (b - a);
3720 	if (d > (a>>3) || d > (b>>3))
3721 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3722 		     (unsigned long long)a, (unsigned long long)b);
3723 }
3724 
3725 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3726 {
3727 	struct drbd_peer_device *peer_device;
3728 	struct drbd_device *device;
3729 	struct p_sizes *p = pi->data;
3730 	enum determine_dev_size dd = DS_UNCHANGED;
3731 	sector_t p_size, p_usize, p_csize, my_usize;
3732 	int ldsc = 0; /* local disk size changed */
3733 	enum dds_flags ddsf;
3734 
3735 	peer_device = conn_peer_device(connection, pi->vnr);
3736 	if (!peer_device)
3737 		return config_unknown_volume(connection, pi);
3738 	device = peer_device->device;
3739 
3740 	p_size = be64_to_cpu(p->d_size);
3741 	p_usize = be64_to_cpu(p->u_size);
3742 	p_csize = be64_to_cpu(p->c_size);
3743 
3744 	/* just store the peer's disk size for now.
3745 	 * we still need to figure out whether we accept that. */
3746 	device->p_size = p_size;
3747 
3748 	if (get_ldev(device)) {
3749 		rcu_read_lock();
3750 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3751 		rcu_read_unlock();
3752 
3753 		warn_if_differ_considerably(device, "lower level device sizes",
3754 			   p_size, drbd_get_max_capacity(device->ldev));
3755 		warn_if_differ_considerably(device, "user requested size",
3756 					    p_usize, my_usize);
3757 
3758 		/* if this is the first connect, or an otherwise expected
3759 		 * param exchange, choose the minimum */
3760 		if (device->state.conn == C_WF_REPORT_PARAMS)
3761 			p_usize = min_not_zero(my_usize, p_usize);
3762 
3763 		/* Never shrink a device with usable data during connect.
3764 		   But allow online shrinking if we are connected. */
3765 		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3766 		    drbd_get_capacity(device->this_bdev) &&
3767 		    device->state.disk >= D_OUTDATED &&
3768 		    device->state.conn < C_CONNECTED) {
3769 			drbd_err(device, "The peer's disk size is too small!\n");
3770 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3771 			put_ldev(device);
3772 			return -EIO;
3773 		}
3774 
3775 		if (my_usize != p_usize) {
3776 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3777 
3778 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3779 			if (!new_disk_conf) {
3780 				drbd_err(device, "Allocation of new disk_conf failed\n");
3781 				put_ldev(device);
3782 				return -ENOMEM;
3783 			}
3784 
3785 			mutex_lock(&connection->resource->conf_update);
3786 			old_disk_conf = device->ldev->disk_conf;
3787 			*new_disk_conf = *old_disk_conf;
3788 			new_disk_conf->disk_size = p_usize;
3789 
3790 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3791 			mutex_unlock(&connection->resource->conf_update);
3792 			synchronize_rcu();
3793 			kfree(old_disk_conf);
3794 
3795 			drbd_info(device, "Peer sets u_size to %lu sectors\n",
3796 				 (unsigned long)my_usize);
3797 		}
3798 
3799 		put_ldev(device);
3800 	}
3801 
3802 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3803 	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3804 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3805 	   drbd_reconsider_max_bio_size(), we can be sure that after
3806 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3807 
3808 	ddsf = be16_to_cpu(p->dds_flags);
3809 	if (get_ldev(device)) {
3810 		drbd_reconsider_max_bio_size(device, device->ldev);
3811 		dd = drbd_determine_dev_size(device, ddsf, NULL);
3812 		put_ldev(device);
3813 		if (dd == DS_ERROR)
3814 			return -EIO;
3815 		drbd_md_sync(device);
3816 	} else {
3817 		/*
3818 		 * I am diskless, need to accept the peer's *current* size.
3819 		 * I must NOT accept the peers backing disk size,
3820 		 * it may have been larger than mine all along...
3821 		 *
3822 		 * At this point, the peer knows more about my disk, or at
3823 		 * least about what we last agreed upon, than myself.
3824 		 * So if his c_size is less than his d_size, the most likely
3825 		 * reason is that *my* d_size was smaller last time we checked.
3826 		 *
3827 		 * However, if he sends a zero current size,
3828 		 * take his (user-capped or) backing disk size anyways.
3829 		 */
3830 		drbd_reconsider_max_bio_size(device, NULL);
3831 		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3832 	}
3833 
3834 	if (get_ldev(device)) {
3835 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3836 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3837 			ldsc = 1;
3838 		}
3839 
3840 		put_ldev(device);
3841 	}
3842 
3843 	if (device->state.conn > C_WF_REPORT_PARAMS) {
3844 		if (be64_to_cpu(p->c_size) !=
3845 		    drbd_get_capacity(device->this_bdev) || ldsc) {
3846 			/* we have different sizes, probably peer
3847 			 * needs to know my new size... */
3848 			drbd_send_sizes(peer_device, 0, ddsf);
3849 		}
3850 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3851 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3852 			if (device->state.pdsk >= D_INCONSISTENT &&
3853 			    device->state.disk >= D_INCONSISTENT) {
3854 				if (ddsf & DDSF_NO_RESYNC)
3855 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3856 				else
3857 					resync_after_online_grow(device);
3858 			} else
3859 				set_bit(RESYNC_AFTER_NEG, &device->flags);
3860 		}
3861 	}
3862 
3863 	return 0;
3864 }
3865 
3866 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3867 {
3868 	struct drbd_peer_device *peer_device;
3869 	struct drbd_device *device;
3870 	struct p_uuids *p = pi->data;
3871 	u64 *p_uuid;
3872 	int i, updated_uuids = 0;
3873 
3874 	peer_device = conn_peer_device(connection, pi->vnr);
3875 	if (!peer_device)
3876 		return config_unknown_volume(connection, pi);
3877 	device = peer_device->device;
3878 
3879 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3880 	if (!p_uuid) {
3881 		drbd_err(device, "kmalloc of p_uuid failed\n");
3882 		return false;
3883 	}
3884 
3885 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3886 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3887 
3888 	kfree(device->p_uuid);
3889 	device->p_uuid = p_uuid;
3890 
3891 	if (device->state.conn < C_CONNECTED &&
3892 	    device->state.disk < D_INCONSISTENT &&
3893 	    device->state.role == R_PRIMARY &&
3894 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3895 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3896 		    (unsigned long long)device->ed_uuid);
3897 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3898 		return -EIO;
3899 	}
3900 
3901 	if (get_ldev(device)) {
3902 		int skip_initial_sync =
3903 			device->state.conn == C_CONNECTED &&
3904 			peer_device->connection->agreed_pro_version >= 90 &&
3905 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3906 			(p_uuid[UI_FLAGS] & 8);
3907 		if (skip_initial_sync) {
3908 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3909 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3910 					"clear_n_write from receive_uuids",
3911 					BM_LOCKED_TEST_ALLOWED);
3912 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3913 			_drbd_uuid_set(device, UI_BITMAP, 0);
3914 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3915 					CS_VERBOSE, NULL);
3916 			drbd_md_sync(device);
3917 			updated_uuids = 1;
3918 		}
3919 		put_ldev(device);
3920 	} else if (device->state.disk < D_INCONSISTENT &&
3921 		   device->state.role == R_PRIMARY) {
3922 		/* I am a diskless primary, the peer just created a new current UUID
3923 		   for me. */
3924 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3925 	}
3926 
3927 	/* Before we test for the disk state, we should wait until an eventually
3928 	   ongoing cluster wide state change is finished. That is important if
3929 	   we are primary and are detaching from our disk. We need to see the
3930 	   new disk state... */
3931 	mutex_lock(device->state_mutex);
3932 	mutex_unlock(device->state_mutex);
3933 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3934 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3935 
3936 	if (updated_uuids)
3937 		drbd_print_uuids(device, "receiver updated UUIDs to");
3938 
3939 	return 0;
3940 }
3941 
3942 /**
3943  * convert_state() - Converts the peer's view of the cluster state to our point of view
3944  * @ps:		The state as seen by the peer.
3945  */
3946 static union drbd_state convert_state(union drbd_state ps)
3947 {
3948 	union drbd_state ms;
3949 
3950 	static enum drbd_conns c_tab[] = {
3951 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3952 		[C_CONNECTED] = C_CONNECTED,
3953 
3954 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3955 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3956 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3957 		[C_VERIFY_S]       = C_VERIFY_T,
3958 		[C_MASK]   = C_MASK,
3959 	};
3960 
3961 	ms.i = ps.i;
3962 
3963 	ms.conn = c_tab[ps.conn];
3964 	ms.peer = ps.role;
3965 	ms.role = ps.peer;
3966 	ms.pdsk = ps.disk;
3967 	ms.disk = ps.pdsk;
3968 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3969 
3970 	return ms;
3971 }
3972 
3973 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3974 {
3975 	struct drbd_peer_device *peer_device;
3976 	struct drbd_device *device;
3977 	struct p_req_state *p = pi->data;
3978 	union drbd_state mask, val;
3979 	enum drbd_state_rv rv;
3980 
3981 	peer_device = conn_peer_device(connection, pi->vnr);
3982 	if (!peer_device)
3983 		return -EIO;
3984 	device = peer_device->device;
3985 
3986 	mask.i = be32_to_cpu(p->mask);
3987 	val.i = be32_to_cpu(p->val);
3988 
3989 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3990 	    mutex_is_locked(device->state_mutex)) {
3991 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3992 		return 0;
3993 	}
3994 
3995 	mask = convert_state(mask);
3996 	val = convert_state(val);
3997 
3998 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3999 	drbd_send_sr_reply(peer_device, rv);
4000 
4001 	drbd_md_sync(device);
4002 
4003 	return 0;
4004 }
4005 
4006 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4007 {
4008 	struct p_req_state *p = pi->data;
4009 	union drbd_state mask, val;
4010 	enum drbd_state_rv rv;
4011 
4012 	mask.i = be32_to_cpu(p->mask);
4013 	val.i = be32_to_cpu(p->val);
4014 
4015 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4016 	    mutex_is_locked(&connection->cstate_mutex)) {
4017 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4018 		return 0;
4019 	}
4020 
4021 	mask = convert_state(mask);
4022 	val = convert_state(val);
4023 
4024 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4025 	conn_send_sr_reply(connection, rv);
4026 
4027 	return 0;
4028 }
4029 
4030 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4031 {
4032 	struct drbd_peer_device *peer_device;
4033 	struct drbd_device *device;
4034 	struct p_state *p = pi->data;
4035 	union drbd_state os, ns, peer_state;
4036 	enum drbd_disk_state real_peer_disk;
4037 	enum chg_state_flags cs_flags;
4038 	int rv;
4039 
4040 	peer_device = conn_peer_device(connection, pi->vnr);
4041 	if (!peer_device)
4042 		return config_unknown_volume(connection, pi);
4043 	device = peer_device->device;
4044 
4045 	peer_state.i = be32_to_cpu(p->state);
4046 
4047 	real_peer_disk = peer_state.disk;
4048 	if (peer_state.disk == D_NEGOTIATING) {
4049 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4050 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4051 	}
4052 
4053 	spin_lock_irq(&device->resource->req_lock);
4054  retry:
4055 	os = ns = drbd_read_state(device);
4056 	spin_unlock_irq(&device->resource->req_lock);
4057 
4058 	/* If some other part of the code (asender thread, timeout)
4059 	 * already decided to close the connection again,
4060 	 * we must not "re-establish" it here. */
4061 	if (os.conn <= C_TEAR_DOWN)
4062 		return -ECONNRESET;
4063 
4064 	/* If this is the "end of sync" confirmation, usually the peer disk
4065 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4066 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4067 	 * unpause-sync events has been "just right", the peer disk may
4068 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4069 	 */
4070 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4071 	    real_peer_disk == D_UP_TO_DATE &&
4072 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4073 		/* If we are (becoming) SyncSource, but peer is still in sync
4074 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4075 		 * will change to inconsistent once the peer reaches active
4076 		 * syncing states.
4077 		 * It may have changed syncer-paused flags, however, so we
4078 		 * cannot ignore this completely. */
4079 		if (peer_state.conn > C_CONNECTED &&
4080 		    peer_state.conn < C_SYNC_SOURCE)
4081 			real_peer_disk = D_INCONSISTENT;
4082 
4083 		/* if peer_state changes to connected at the same time,
4084 		 * it explicitly notifies us that it finished resync.
4085 		 * Maybe we should finish it up, too? */
4086 		else if (os.conn >= C_SYNC_SOURCE &&
4087 			 peer_state.conn == C_CONNECTED) {
4088 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4089 				drbd_resync_finished(device);
4090 			return 0;
4091 		}
4092 	}
4093 
4094 	/* explicit verify finished notification, stop sector reached. */
4095 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4096 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4097 		ov_out_of_sync_print(device);
4098 		drbd_resync_finished(device);
4099 		return 0;
4100 	}
4101 
4102 	/* peer says his disk is inconsistent, while we think it is uptodate,
4103 	 * and this happens while the peer still thinks we have a sync going on,
4104 	 * but we think we are already done with the sync.
4105 	 * We ignore this to avoid flapping pdsk.
4106 	 * This should not happen, if the peer is a recent version of drbd. */
4107 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4108 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4109 		real_peer_disk = D_UP_TO_DATE;
4110 
4111 	if (ns.conn == C_WF_REPORT_PARAMS)
4112 		ns.conn = C_CONNECTED;
4113 
4114 	if (peer_state.conn == C_AHEAD)
4115 		ns.conn = C_BEHIND;
4116 
4117 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4118 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4119 		int cr; /* consider resync */
4120 
4121 		/* if we established a new connection */
4122 		cr  = (os.conn < C_CONNECTED);
4123 		/* if we had an established connection
4124 		 * and one of the nodes newly attaches a disk */
4125 		cr |= (os.conn == C_CONNECTED &&
4126 		       (peer_state.disk == D_NEGOTIATING ||
4127 			os.disk == D_NEGOTIATING));
4128 		/* if we have both been inconsistent, and the peer has been
4129 		 * forced to be UpToDate with --overwrite-data */
4130 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4131 		/* if we had been plain connected, and the admin requested to
4132 		 * start a sync by "invalidate" or "invalidate-remote" */
4133 		cr |= (os.conn == C_CONNECTED &&
4134 				(peer_state.conn >= C_STARTING_SYNC_S &&
4135 				 peer_state.conn <= C_WF_BITMAP_T));
4136 
4137 		if (cr)
4138 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4139 
4140 		put_ldev(device);
4141 		if (ns.conn == C_MASK) {
4142 			ns.conn = C_CONNECTED;
4143 			if (device->state.disk == D_NEGOTIATING) {
4144 				drbd_force_state(device, NS(disk, D_FAILED));
4145 			} else if (peer_state.disk == D_NEGOTIATING) {
4146 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4147 				peer_state.disk = D_DISKLESS;
4148 				real_peer_disk = D_DISKLESS;
4149 			} else {
4150 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4151 					return -EIO;
4152 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4153 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4154 				return -EIO;
4155 			}
4156 		}
4157 	}
4158 
4159 	spin_lock_irq(&device->resource->req_lock);
4160 	if (os.i != drbd_read_state(device).i)
4161 		goto retry;
4162 	clear_bit(CONSIDER_RESYNC, &device->flags);
4163 	ns.peer = peer_state.role;
4164 	ns.pdsk = real_peer_disk;
4165 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4166 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4167 		ns.disk = device->new_state_tmp.disk;
4168 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4169 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4170 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4171 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4172 		   for temporal network outages! */
4173 		spin_unlock_irq(&device->resource->req_lock);
4174 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4175 		tl_clear(peer_device->connection);
4176 		drbd_uuid_new_current(device);
4177 		clear_bit(NEW_CUR_UUID, &device->flags);
4178 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4179 		return -EIO;
4180 	}
4181 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4182 	ns = drbd_read_state(device);
4183 	spin_unlock_irq(&device->resource->req_lock);
4184 
4185 	if (rv < SS_SUCCESS) {
4186 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4187 		return -EIO;
4188 	}
4189 
4190 	if (os.conn > C_WF_REPORT_PARAMS) {
4191 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4192 		    peer_state.disk != D_NEGOTIATING ) {
4193 			/* we want resync, peer has not yet decided to sync... */
4194 			/* Nowadays only used when forcing a node into primary role and
4195 			   setting its disk to UpToDate with that */
4196 			drbd_send_uuids(peer_device);
4197 			drbd_send_current_state(peer_device);
4198 		}
4199 	}
4200 
4201 	clear_bit(DISCARD_MY_DATA, &device->flags);
4202 
4203 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4204 
4205 	return 0;
4206 }
4207 
4208 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4209 {
4210 	struct drbd_peer_device *peer_device;
4211 	struct drbd_device *device;
4212 	struct p_rs_uuid *p = pi->data;
4213 
4214 	peer_device = conn_peer_device(connection, pi->vnr);
4215 	if (!peer_device)
4216 		return -EIO;
4217 	device = peer_device->device;
4218 
4219 	wait_event(device->misc_wait,
4220 		   device->state.conn == C_WF_SYNC_UUID ||
4221 		   device->state.conn == C_BEHIND ||
4222 		   device->state.conn < C_CONNECTED ||
4223 		   device->state.disk < D_NEGOTIATING);
4224 
4225 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4226 
4227 	/* Here the _drbd_uuid_ functions are right, current should
4228 	   _not_ be rotated into the history */
4229 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4230 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4231 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4232 
4233 		drbd_print_uuids(device, "updated sync uuid");
4234 		drbd_start_resync(device, C_SYNC_TARGET);
4235 
4236 		put_ldev(device);
4237 	} else
4238 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4239 
4240 	return 0;
4241 }
4242 
4243 /**
4244  * receive_bitmap_plain
4245  *
4246  * Return 0 when done, 1 when another iteration is needed, and a negative error
4247  * code upon failure.
4248  */
4249 static int
4250 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4251 		     unsigned long *p, struct bm_xfer_ctx *c)
4252 {
4253 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4254 				 drbd_header_size(peer_device->connection);
4255 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4256 				       c->bm_words - c->word_offset);
4257 	unsigned int want = num_words * sizeof(*p);
4258 	int err;
4259 
4260 	if (want != size) {
4261 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4262 		return -EIO;
4263 	}
4264 	if (want == 0)
4265 		return 0;
4266 	err = drbd_recv_all(peer_device->connection, p, want);
4267 	if (err)
4268 		return err;
4269 
4270 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4271 
4272 	c->word_offset += num_words;
4273 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4274 	if (c->bit_offset > c->bm_bits)
4275 		c->bit_offset = c->bm_bits;
4276 
4277 	return 1;
4278 }
4279 
4280 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4281 {
4282 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4283 }
4284 
4285 static int dcbp_get_start(struct p_compressed_bm *p)
4286 {
4287 	return (p->encoding & 0x80) != 0;
4288 }
4289 
4290 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4291 {
4292 	return (p->encoding >> 4) & 0x7;
4293 }
4294 
4295 /**
4296  * recv_bm_rle_bits
4297  *
4298  * Return 0 when done, 1 when another iteration is needed, and a negative error
4299  * code upon failure.
4300  */
4301 static int
4302 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4303 		struct p_compressed_bm *p,
4304 		 struct bm_xfer_ctx *c,
4305 		 unsigned int len)
4306 {
4307 	struct bitstream bs;
4308 	u64 look_ahead;
4309 	u64 rl;
4310 	u64 tmp;
4311 	unsigned long s = c->bit_offset;
4312 	unsigned long e;
4313 	int toggle = dcbp_get_start(p);
4314 	int have;
4315 	int bits;
4316 
4317 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4318 
4319 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4320 	if (bits < 0)
4321 		return -EIO;
4322 
4323 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4324 		bits = vli_decode_bits(&rl, look_ahead);
4325 		if (bits <= 0)
4326 			return -EIO;
4327 
4328 		if (toggle) {
4329 			e = s + rl -1;
4330 			if (e >= c->bm_bits) {
4331 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4332 				return -EIO;
4333 			}
4334 			_drbd_bm_set_bits(peer_device->device, s, e);
4335 		}
4336 
4337 		if (have < bits) {
4338 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4339 				have, bits, look_ahead,
4340 				(unsigned int)(bs.cur.b - p->code),
4341 				(unsigned int)bs.buf_len);
4342 			return -EIO;
4343 		}
4344 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4345 		if (likely(bits < 64))
4346 			look_ahead >>= bits;
4347 		else
4348 			look_ahead = 0;
4349 		have -= bits;
4350 
4351 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4352 		if (bits < 0)
4353 			return -EIO;
4354 		look_ahead |= tmp << have;
4355 		have += bits;
4356 	}
4357 
4358 	c->bit_offset = s;
4359 	bm_xfer_ctx_bit_to_word_offset(c);
4360 
4361 	return (s != c->bm_bits);
4362 }
4363 
4364 /**
4365  * decode_bitmap_c
4366  *
4367  * Return 0 when done, 1 when another iteration is needed, and a negative error
4368  * code upon failure.
4369  */
4370 static int
4371 decode_bitmap_c(struct drbd_peer_device *peer_device,
4372 		struct p_compressed_bm *p,
4373 		struct bm_xfer_ctx *c,
4374 		unsigned int len)
4375 {
4376 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4377 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4378 
4379 	/* other variants had been implemented for evaluation,
4380 	 * but have been dropped as this one turned out to be "best"
4381 	 * during all our tests. */
4382 
4383 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4384 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4385 	return -EIO;
4386 }
4387 
4388 void INFO_bm_xfer_stats(struct drbd_device *device,
4389 		const char *direction, struct bm_xfer_ctx *c)
4390 {
4391 	/* what would it take to transfer it "plaintext" */
4392 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4393 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4394 	unsigned int plain =
4395 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4396 		c->bm_words * sizeof(unsigned long);
4397 	unsigned int total = c->bytes[0] + c->bytes[1];
4398 	unsigned int r;
4399 
4400 	/* total can not be zero. but just in case: */
4401 	if (total == 0)
4402 		return;
4403 
4404 	/* don't report if not compressed */
4405 	if (total >= plain)
4406 		return;
4407 
4408 	/* total < plain. check for overflow, still */
4409 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4410 		                    : (1000 * total / plain);
4411 
4412 	if (r > 1000)
4413 		r = 1000;
4414 
4415 	r = 1000 - r;
4416 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4417 	     "total %u; compression: %u.%u%%\n",
4418 			direction,
4419 			c->bytes[1], c->packets[1],
4420 			c->bytes[0], c->packets[0],
4421 			total, r/10, r % 10);
4422 }
4423 
4424 /* Since we are processing the bitfield from lower addresses to higher,
4425    it does not matter if the process it in 32 bit chunks or 64 bit
4426    chunks as long as it is little endian. (Understand it as byte stream,
4427    beginning with the lowest byte...) If we would use big endian
4428    we would need to process it from the highest address to the lowest,
4429    in order to be agnostic to the 32 vs 64 bits issue.
4430 
4431    returns 0 on failure, 1 if we successfully received it. */
4432 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4433 {
4434 	struct drbd_peer_device *peer_device;
4435 	struct drbd_device *device;
4436 	struct bm_xfer_ctx c;
4437 	int err;
4438 
4439 	peer_device = conn_peer_device(connection, pi->vnr);
4440 	if (!peer_device)
4441 		return -EIO;
4442 	device = peer_device->device;
4443 
4444 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4445 	/* you are supposed to send additional out-of-sync information
4446 	 * if you actually set bits during this phase */
4447 
4448 	c = (struct bm_xfer_ctx) {
4449 		.bm_bits = drbd_bm_bits(device),
4450 		.bm_words = drbd_bm_words(device),
4451 	};
4452 
4453 	for(;;) {
4454 		if (pi->cmd == P_BITMAP)
4455 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4456 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4457 			/* MAYBE: sanity check that we speak proto >= 90,
4458 			 * and the feature is enabled! */
4459 			struct p_compressed_bm *p = pi->data;
4460 
4461 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4462 				drbd_err(device, "ReportCBitmap packet too large\n");
4463 				err = -EIO;
4464 				goto out;
4465 			}
4466 			if (pi->size <= sizeof(*p)) {
4467 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4468 				err = -EIO;
4469 				goto out;
4470 			}
4471 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4472 			if (err)
4473 			       goto out;
4474 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4475 		} else {
4476 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4477 			err = -EIO;
4478 			goto out;
4479 		}
4480 
4481 		c.packets[pi->cmd == P_BITMAP]++;
4482 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4483 
4484 		if (err <= 0) {
4485 			if (err < 0)
4486 				goto out;
4487 			break;
4488 		}
4489 		err = drbd_recv_header(peer_device->connection, pi);
4490 		if (err)
4491 			goto out;
4492 	}
4493 
4494 	INFO_bm_xfer_stats(device, "receive", &c);
4495 
4496 	if (device->state.conn == C_WF_BITMAP_T) {
4497 		enum drbd_state_rv rv;
4498 
4499 		err = drbd_send_bitmap(device);
4500 		if (err)
4501 			goto out;
4502 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4503 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4504 		D_ASSERT(device, rv == SS_SUCCESS);
4505 	} else if (device->state.conn != C_WF_BITMAP_S) {
4506 		/* admin may have requested C_DISCONNECTING,
4507 		 * other threads may have noticed network errors */
4508 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4509 		    drbd_conn_str(device->state.conn));
4510 	}
4511 	err = 0;
4512 
4513  out:
4514 	drbd_bm_unlock(device);
4515 	if (!err && device->state.conn == C_WF_BITMAP_S)
4516 		drbd_start_resync(device, C_SYNC_SOURCE);
4517 	return err;
4518 }
4519 
4520 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4521 {
4522 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4523 		 pi->cmd, pi->size);
4524 
4525 	return ignore_remaining_packet(connection, pi);
4526 }
4527 
4528 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4529 {
4530 	/* Make sure we've acked all the TCP data associated
4531 	 * with the data requests being unplugged */
4532 	drbd_tcp_quickack(connection->data.socket);
4533 
4534 	return 0;
4535 }
4536 
4537 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4538 {
4539 	struct drbd_peer_device *peer_device;
4540 	struct drbd_device *device;
4541 	struct p_block_desc *p = pi->data;
4542 
4543 	peer_device = conn_peer_device(connection, pi->vnr);
4544 	if (!peer_device)
4545 		return -EIO;
4546 	device = peer_device->device;
4547 
4548 	switch (device->state.conn) {
4549 	case C_WF_SYNC_UUID:
4550 	case C_WF_BITMAP_T:
4551 	case C_BEHIND:
4552 			break;
4553 	default:
4554 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4555 				drbd_conn_str(device->state.conn));
4556 	}
4557 
4558 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4559 
4560 	return 0;
4561 }
4562 
4563 struct data_cmd {
4564 	int expect_payload;
4565 	size_t pkt_size;
4566 	int (*fn)(struct drbd_connection *, struct packet_info *);
4567 };
4568 
4569 static struct data_cmd drbd_cmd_handler[] = {
4570 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4571 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4572 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4573 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4574 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4575 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4576 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4577 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4578 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4579 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4580 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4581 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4582 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4583 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4584 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4585 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4586 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4587 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4588 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4589 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4590 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4591 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4592 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4593 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4594 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4595 };
4596 
4597 static void drbdd(struct drbd_connection *connection)
4598 {
4599 	struct packet_info pi;
4600 	size_t shs; /* sub header size */
4601 	int err;
4602 
4603 	while (get_t_state(&connection->receiver) == RUNNING) {
4604 		struct data_cmd *cmd;
4605 
4606 		drbd_thread_current_set_cpu(&connection->receiver);
4607 		update_receiver_timing_details(connection, drbd_recv_header);
4608 		if (drbd_recv_header(connection, &pi))
4609 			goto err_out;
4610 
4611 		cmd = &drbd_cmd_handler[pi.cmd];
4612 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4613 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4614 				 cmdname(pi.cmd), pi.cmd);
4615 			goto err_out;
4616 		}
4617 
4618 		shs = cmd->pkt_size;
4619 		if (pi.size > shs && !cmd->expect_payload) {
4620 			drbd_err(connection, "No payload expected %s l:%d\n",
4621 				 cmdname(pi.cmd), pi.size);
4622 			goto err_out;
4623 		}
4624 
4625 		if (shs) {
4626 			update_receiver_timing_details(connection, drbd_recv_all_warn);
4627 			err = drbd_recv_all_warn(connection, pi.data, shs);
4628 			if (err)
4629 				goto err_out;
4630 			pi.size -= shs;
4631 		}
4632 
4633 		update_receiver_timing_details(connection, cmd->fn);
4634 		err = cmd->fn(connection, &pi);
4635 		if (err) {
4636 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4637 				 cmdname(pi.cmd), err, pi.size);
4638 			goto err_out;
4639 		}
4640 	}
4641 	return;
4642 
4643     err_out:
4644 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4645 }
4646 
4647 static void conn_disconnect(struct drbd_connection *connection)
4648 {
4649 	struct drbd_peer_device *peer_device;
4650 	enum drbd_conns oc;
4651 	int vnr;
4652 
4653 	if (connection->cstate == C_STANDALONE)
4654 		return;
4655 
4656 	/* We are about to start the cleanup after connection loss.
4657 	 * Make sure drbd_make_request knows about that.
4658 	 * Usually we should be in some network failure state already,
4659 	 * but just in case we are not, we fix it up here.
4660 	 */
4661 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4662 
4663 	/* asender does not clean up anything. it must not interfere, either */
4664 	drbd_thread_stop(&connection->asender);
4665 	drbd_free_sock(connection);
4666 
4667 	rcu_read_lock();
4668 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4669 		struct drbd_device *device = peer_device->device;
4670 		kref_get(&device->kref);
4671 		rcu_read_unlock();
4672 		drbd_disconnected(peer_device);
4673 		kref_put(&device->kref, drbd_destroy_device);
4674 		rcu_read_lock();
4675 	}
4676 	rcu_read_unlock();
4677 
4678 	if (!list_empty(&connection->current_epoch->list))
4679 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4680 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4681 	atomic_set(&connection->current_epoch->epoch_size, 0);
4682 	connection->send.seen_any_write_yet = false;
4683 
4684 	drbd_info(connection, "Connection closed\n");
4685 
4686 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4687 		conn_try_outdate_peer_async(connection);
4688 
4689 	spin_lock_irq(&connection->resource->req_lock);
4690 	oc = connection->cstate;
4691 	if (oc >= C_UNCONNECTED)
4692 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4693 
4694 	spin_unlock_irq(&connection->resource->req_lock);
4695 
4696 	if (oc == C_DISCONNECTING)
4697 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4698 }
4699 
4700 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4701 {
4702 	struct drbd_device *device = peer_device->device;
4703 	unsigned int i;
4704 
4705 	/* wait for current activity to cease. */
4706 	spin_lock_irq(&device->resource->req_lock);
4707 	_drbd_wait_ee_list_empty(device, &device->active_ee);
4708 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4709 	_drbd_wait_ee_list_empty(device, &device->read_ee);
4710 	spin_unlock_irq(&device->resource->req_lock);
4711 
4712 	/* We do not have data structures that would allow us to
4713 	 * get the rs_pending_cnt down to 0 again.
4714 	 *  * On C_SYNC_TARGET we do not have any data structures describing
4715 	 *    the pending RSDataRequest's we have sent.
4716 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4717 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4718 	 *  And no, it is not the sum of the reference counts in the
4719 	 *  resync_LRU. The resync_LRU tracks the whole operation including
4720 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4721 	 *  on the fly. */
4722 	drbd_rs_cancel_all(device);
4723 	device->rs_total = 0;
4724 	device->rs_failed = 0;
4725 	atomic_set(&device->rs_pending_cnt, 0);
4726 	wake_up(&device->misc_wait);
4727 
4728 	del_timer_sync(&device->resync_timer);
4729 	resync_timer_fn((unsigned long)device);
4730 
4731 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4732 	 * w_make_resync_request etc. which may still be on the worker queue
4733 	 * to be "canceled" */
4734 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4735 
4736 	drbd_finish_peer_reqs(device);
4737 
4738 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4739 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
4740 	   necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4741 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4742 
4743 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
4744 	 * again via drbd_try_clear_on_disk_bm(). */
4745 	drbd_rs_cancel_all(device);
4746 
4747 	kfree(device->p_uuid);
4748 	device->p_uuid = NULL;
4749 
4750 	if (!drbd_suspended(device))
4751 		tl_clear(peer_device->connection);
4752 
4753 	drbd_md_sync(device);
4754 
4755 	/* serialize with bitmap writeout triggered by the state change,
4756 	 * if any. */
4757 	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4758 
4759 	/* tcp_close and release of sendpage pages can be deferred.  I don't
4760 	 * want to use SO_LINGER, because apparently it can be deferred for
4761 	 * more than 20 seconds (longest time I checked).
4762 	 *
4763 	 * Actually we don't care for exactly when the network stack does its
4764 	 * put_page(), but release our reference on these pages right here.
4765 	 */
4766 	i = drbd_free_peer_reqs(device, &device->net_ee);
4767 	if (i)
4768 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4769 	i = atomic_read(&device->pp_in_use_by_net);
4770 	if (i)
4771 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4772 	i = atomic_read(&device->pp_in_use);
4773 	if (i)
4774 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4775 
4776 	D_ASSERT(device, list_empty(&device->read_ee));
4777 	D_ASSERT(device, list_empty(&device->active_ee));
4778 	D_ASSERT(device, list_empty(&device->sync_ee));
4779 	D_ASSERT(device, list_empty(&device->done_ee));
4780 
4781 	return 0;
4782 }
4783 
4784 /*
4785  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4786  * we can agree on is stored in agreed_pro_version.
4787  *
4788  * feature flags and the reserved array should be enough room for future
4789  * enhancements of the handshake protocol, and possible plugins...
4790  *
4791  * for now, they are expected to be zero, but ignored.
4792  */
4793 static int drbd_send_features(struct drbd_connection *connection)
4794 {
4795 	struct drbd_socket *sock;
4796 	struct p_connection_features *p;
4797 
4798 	sock = &connection->data;
4799 	p = conn_prepare_command(connection, sock);
4800 	if (!p)
4801 		return -EIO;
4802 	memset(p, 0, sizeof(*p));
4803 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4804 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4805 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
4806 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4807 }
4808 
4809 /*
4810  * return values:
4811  *   1 yes, we have a valid connection
4812  *   0 oops, did not work out, please try again
4813  *  -1 peer talks different language,
4814  *     no point in trying again, please go standalone.
4815  */
4816 static int drbd_do_features(struct drbd_connection *connection)
4817 {
4818 	/* ASSERT current == connection->receiver ... */
4819 	struct p_connection_features *p;
4820 	const int expect = sizeof(struct p_connection_features);
4821 	struct packet_info pi;
4822 	int err;
4823 
4824 	err = drbd_send_features(connection);
4825 	if (err)
4826 		return 0;
4827 
4828 	err = drbd_recv_header(connection, &pi);
4829 	if (err)
4830 		return 0;
4831 
4832 	if (pi.cmd != P_CONNECTION_FEATURES) {
4833 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4834 			 cmdname(pi.cmd), pi.cmd);
4835 		return -1;
4836 	}
4837 
4838 	if (pi.size != expect) {
4839 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4840 		     expect, pi.size);
4841 		return -1;
4842 	}
4843 
4844 	p = pi.data;
4845 	err = drbd_recv_all_warn(connection, p, expect);
4846 	if (err)
4847 		return 0;
4848 
4849 	p->protocol_min = be32_to_cpu(p->protocol_min);
4850 	p->protocol_max = be32_to_cpu(p->protocol_max);
4851 	if (p->protocol_max == 0)
4852 		p->protocol_max = p->protocol_min;
4853 
4854 	if (PRO_VERSION_MAX < p->protocol_min ||
4855 	    PRO_VERSION_MIN > p->protocol_max)
4856 		goto incompat;
4857 
4858 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4859 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4860 
4861 	drbd_info(connection, "Handshake successful: "
4862 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
4863 
4864 	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4865 		  connection->agreed_features & FF_TRIM ? " " : " not ");
4866 
4867 	return 1;
4868 
4869  incompat:
4870 	drbd_err(connection, "incompatible DRBD dialects: "
4871 	    "I support %d-%d, peer supports %d-%d\n",
4872 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4873 	    p->protocol_min, p->protocol_max);
4874 	return -1;
4875 }
4876 
4877 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant of drbd_do_auth() for kernels without CONFIG_CRYPTO_HMAC:
 * authentication cannot be done, so log why and report -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

	return -1;
}
4884 #else
4885 #define CHALLENGE_LEN 64
4886 
4887 /* Return value:
4888 	1 - auth succeeded,
4889 	0 - failed, try again (network error),
4890 	-1 - auth failed, don't try again.
4891 */
4892 
4893 static int drbd_do_auth(struct drbd_connection *connection)
4894 {
4895 	struct drbd_socket *sock;
4896 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4897 	struct scatterlist sg;
4898 	char *response = NULL;
4899 	char *right_response = NULL;
4900 	char *peers_ch = NULL;
4901 	unsigned int key_len;
4902 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
4903 	unsigned int resp_size;
4904 	struct hash_desc desc;
4905 	struct packet_info pi;
4906 	struct net_conf *nc;
4907 	int err, rv;
4908 
4909 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4910 
4911 	rcu_read_lock();
4912 	nc = rcu_dereference(connection->net_conf);
4913 	key_len = strlen(nc->shared_secret);
4914 	memcpy(secret, nc->shared_secret, key_len);
4915 	rcu_read_unlock();
4916 
4917 	desc.tfm = connection->cram_hmac_tfm;
4918 	desc.flags = 0;
4919 
4920 	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4921 	if (rv) {
4922 		drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4923 		rv = -1;
4924 		goto fail;
4925 	}
4926 
4927 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4928 
4929 	sock = &connection->data;
4930 	if (!conn_prepare_command(connection, sock)) {
4931 		rv = 0;
4932 		goto fail;
4933 	}
4934 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4935 				my_challenge, CHALLENGE_LEN);
4936 	if (!rv)
4937 		goto fail;
4938 
4939 	err = drbd_recv_header(connection, &pi);
4940 	if (err) {
4941 		rv = 0;
4942 		goto fail;
4943 	}
4944 
4945 	if (pi.cmd != P_AUTH_CHALLENGE) {
4946 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4947 			 cmdname(pi.cmd), pi.cmd);
4948 		rv = 0;
4949 		goto fail;
4950 	}
4951 
4952 	if (pi.size > CHALLENGE_LEN * 2) {
4953 		drbd_err(connection, "expected AuthChallenge payload too big.\n");
4954 		rv = -1;
4955 		goto fail;
4956 	}
4957 
4958 	if (pi.size < CHALLENGE_LEN) {
4959 		drbd_err(connection, "AuthChallenge payload too small.\n");
4960 		rv = -1;
4961 		goto fail;
4962 	}
4963 
4964 	peers_ch = kmalloc(pi.size, GFP_NOIO);
4965 	if (peers_ch == NULL) {
4966 		drbd_err(connection, "kmalloc of peers_ch failed\n");
4967 		rv = -1;
4968 		goto fail;
4969 	}
4970 
4971 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4972 	if (err) {
4973 		rv = 0;
4974 		goto fail;
4975 	}
4976 
4977 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4978 		drbd_err(connection, "Peer presented the same challenge!\n");
4979 		rv = -1;
4980 		goto fail;
4981 	}
4982 
4983 	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4984 	response = kmalloc(resp_size, GFP_NOIO);
4985 	if (response == NULL) {
4986 		drbd_err(connection, "kmalloc of response failed\n");
4987 		rv = -1;
4988 		goto fail;
4989 	}
4990 
4991 	sg_init_table(&sg, 1);
4992 	sg_set_buf(&sg, peers_ch, pi.size);
4993 
4994 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4995 	if (rv) {
4996 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4997 		rv = -1;
4998 		goto fail;
4999 	}
5000 
5001 	if (!conn_prepare_command(connection, sock)) {
5002 		rv = 0;
5003 		goto fail;
5004 	}
5005 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5006 				response, resp_size);
5007 	if (!rv)
5008 		goto fail;
5009 
5010 	err = drbd_recv_header(connection, &pi);
5011 	if (err) {
5012 		rv = 0;
5013 		goto fail;
5014 	}
5015 
5016 	if (pi.cmd != P_AUTH_RESPONSE) {
5017 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5018 			 cmdname(pi.cmd), pi.cmd);
5019 		rv = 0;
5020 		goto fail;
5021 	}
5022 
5023 	if (pi.size != resp_size) {
5024 		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5025 		rv = 0;
5026 		goto fail;
5027 	}
5028 
5029 	err = drbd_recv_all_warn(connection, response , resp_size);
5030 	if (err) {
5031 		rv = 0;
5032 		goto fail;
5033 	}
5034 
5035 	right_response = kmalloc(resp_size, GFP_NOIO);
5036 	if (right_response == NULL) {
5037 		drbd_err(connection, "kmalloc of right_response failed\n");
5038 		rv = -1;
5039 		goto fail;
5040 	}
5041 
5042 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5043 
5044 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5045 	if (rv) {
5046 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5047 		rv = -1;
5048 		goto fail;
5049 	}
5050 
5051 	rv = !memcmp(response, right_response, resp_size);
5052 
5053 	if (rv)
5054 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5055 		     resp_size);
5056 	else
5057 		rv = -1;
5058 
5059  fail:
5060 	kfree(peers_ch);
5061 	kfree(response);
5062 	kfree(right_response);
5063 
5064 	return rv;
5065 }
5066 #endif
5067 
5068 int drbd_receiver(struct drbd_thread *thi)
5069 {
5070 	struct drbd_connection *connection = thi->connection;
5071 	int h;
5072 
5073 	drbd_info(connection, "receiver (re)started\n");
5074 
5075 	do {
5076 		h = conn_connect(connection);
5077 		if (h == 0) {
5078 			conn_disconnect(connection);
5079 			schedule_timeout_interruptible(HZ);
5080 		}
5081 		if (h == -1) {
5082 			drbd_warn(connection, "Discarding network configuration.\n");
5083 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5084 		}
5085 	} while (h == 0);
5086 
5087 	if (h > 0)
5088 		drbdd(connection);
5089 
5090 	conn_disconnect(connection);
5091 
5092 	drbd_info(connection, "receiver terminated\n");
5093 	return 0;
5094 }
5095 
5096 /* ********* acknowledge sender ******** */
5097 
5098 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5099 {
5100 	struct p_req_state_reply *p = pi->data;
5101 	int retcode = be32_to_cpu(p->retcode);
5102 
5103 	if (retcode >= SS_SUCCESS) {
5104 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5105 	} else {
5106 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5107 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5108 			 drbd_set_st_err_str(retcode), retcode);
5109 	}
5110 	wake_up(&connection->ping_wait);
5111 
5112 	return 0;
5113 }
5114 
5115 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5116 {
5117 	struct drbd_peer_device *peer_device;
5118 	struct drbd_device *device;
5119 	struct p_req_state_reply *p = pi->data;
5120 	int retcode = be32_to_cpu(p->retcode);
5121 
5122 	peer_device = conn_peer_device(connection, pi->vnr);
5123 	if (!peer_device)
5124 		return -EIO;
5125 	device = peer_device->device;
5126 
5127 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5128 		D_ASSERT(device, connection->agreed_pro_version < 100);
5129 		return got_conn_RqSReply(connection, pi);
5130 	}
5131 
5132 	if (retcode >= SS_SUCCESS) {
5133 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5134 	} else {
5135 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5136 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5137 			drbd_set_st_err_str(retcode), retcode);
5138 	}
5139 	wake_up(&device->state_wait);
5140 
5141 	return 0;
5142 }
5143 
/* P_PING handler: answer with a ping ack and pass on the send result. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int rc = drbd_send_ping_ack(connection);

	return rc;
}
5149 
5150 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5151 {
5152 	/* restore idle timeout */
5153 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5154 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5155 		wake_up(&connection->ping_wait);
5156 
5157 	return 0;
5158 }
5159 
5160 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5161 {
5162 	struct drbd_peer_device *peer_device;
5163 	struct drbd_device *device;
5164 	struct p_block_ack *p = pi->data;
5165 	sector_t sector = be64_to_cpu(p->sector);
5166 	int blksize = be32_to_cpu(p->blksize);
5167 
5168 	peer_device = conn_peer_device(connection, pi->vnr);
5169 	if (!peer_device)
5170 		return -EIO;
5171 	device = peer_device->device;
5172 
5173 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5174 
5175 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5176 
5177 	if (get_ldev(device)) {
5178 		drbd_rs_complete_io(device, sector);
5179 		drbd_set_in_sync(device, sector, blksize);
5180 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5181 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5182 		put_ldev(device);
5183 	}
5184 	dec_rs_pending(device);
5185 	atomic_add(blksize >> 9, &device->rs_sect_in);
5186 
5187 	return 0;
5188 }
5189 
5190 static int
5191 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5192 			      struct rb_root *root, const char *func,
5193 			      enum drbd_req_event what, bool missing_ok)
5194 {
5195 	struct drbd_request *req;
5196 	struct bio_and_error m;
5197 
5198 	spin_lock_irq(&device->resource->req_lock);
5199 	req = find_request(device, root, id, sector, missing_ok, func);
5200 	if (unlikely(!req)) {
5201 		spin_unlock_irq(&device->resource->req_lock);
5202 		return -EIO;
5203 	}
5204 	__req_mod(req, what, &m);
5205 	spin_unlock_irq(&device->resource->req_lock);
5206 
5207 	if (m.bio)
5208 		complete_master_bio(device, &m);
5209 	return 0;
5210 }
5211 
5212 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5213 {
5214 	struct drbd_peer_device *peer_device;
5215 	struct drbd_device *device;
5216 	struct p_block_ack *p = pi->data;
5217 	sector_t sector = be64_to_cpu(p->sector);
5218 	int blksize = be32_to_cpu(p->blksize);
5219 	enum drbd_req_event what;
5220 
5221 	peer_device = conn_peer_device(connection, pi->vnr);
5222 	if (!peer_device)
5223 		return -EIO;
5224 	device = peer_device->device;
5225 
5226 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5227 
5228 	if (p->block_id == ID_SYNCER) {
5229 		drbd_set_in_sync(device, sector, blksize);
5230 		dec_rs_pending(device);
5231 		return 0;
5232 	}
5233 	switch (pi->cmd) {
5234 	case P_RS_WRITE_ACK:
5235 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5236 		break;
5237 	case P_WRITE_ACK:
5238 		what = WRITE_ACKED_BY_PEER;
5239 		break;
5240 	case P_RECV_ACK:
5241 		what = RECV_ACKED_BY_PEER;
5242 		break;
5243 	case P_SUPERSEDED:
5244 		what = CONFLICT_RESOLVED;
5245 		break;
5246 	case P_RETRY_WRITE:
5247 		what = POSTPONE_WRITE;
5248 		break;
5249 	default:
5250 		BUG();
5251 	}
5252 
5253 	return validate_req_change_req_state(device, p->block_id, sector,
5254 					     &device->write_requests, __func__,
5255 					     what, false);
5256 }
5257 
5258 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5259 {
5260 	struct drbd_peer_device *peer_device;
5261 	struct drbd_device *device;
5262 	struct p_block_ack *p = pi->data;
5263 	sector_t sector = be64_to_cpu(p->sector);
5264 	int size = be32_to_cpu(p->blksize);
5265 	int err;
5266 
5267 	peer_device = conn_peer_device(connection, pi->vnr);
5268 	if (!peer_device)
5269 		return -EIO;
5270 	device = peer_device->device;
5271 
5272 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5273 
5274 	if (p->block_id == ID_SYNCER) {
5275 		dec_rs_pending(device);
5276 		drbd_rs_failed_io(device, sector, size);
5277 		return 0;
5278 	}
5279 
5280 	err = validate_req_change_req_state(device, p->block_id, sector,
5281 					    &device->write_requests, __func__,
5282 					    NEG_ACKED, true);
5283 	if (err) {
5284 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5285 		   The master bio might already be completed, therefore the
5286 		   request is no longer in the collision hash. */
5287 		/* In Protocol B we might already have got a P_RECV_ACK
5288 		   but then get a P_NEG_ACK afterwards. */
5289 		drbd_set_out_of_sync(device, sector, size);
5290 	}
5291 	return 0;
5292 }
5293 
5294 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5295 {
5296 	struct drbd_peer_device *peer_device;
5297 	struct drbd_device *device;
5298 	struct p_block_ack *p = pi->data;
5299 	sector_t sector = be64_to_cpu(p->sector);
5300 
5301 	peer_device = conn_peer_device(connection, pi->vnr);
5302 	if (!peer_device)
5303 		return -EIO;
5304 	device = peer_device->device;
5305 
5306 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5307 
5308 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5309 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5310 
5311 	return validate_req_change_req_state(device, p->block_id, sector,
5312 					     &device->read_requests, __func__,
5313 					     NEG_ACKED, false);
5314 }
5315 
5316 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5317 {
5318 	struct drbd_peer_device *peer_device;
5319 	struct drbd_device *device;
5320 	sector_t sector;
5321 	int size;
5322 	struct p_block_ack *p = pi->data;
5323 
5324 	peer_device = conn_peer_device(connection, pi->vnr);
5325 	if (!peer_device)
5326 		return -EIO;
5327 	device = peer_device->device;
5328 
5329 	sector = be64_to_cpu(p->sector);
5330 	size = be32_to_cpu(p->blksize);
5331 
5332 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5333 
5334 	dec_rs_pending(device);
5335 
5336 	if (get_ldev_if_state(device, D_FAILED)) {
5337 		drbd_rs_complete_io(device, sector);
5338 		switch (pi->cmd) {
5339 		case P_NEG_RS_DREPLY:
5340 			drbd_rs_failed_io(device, sector, size);
5341 		case P_RS_CANCEL:
5342 			break;
5343 		default:
5344 			BUG();
5345 		}
5346 		put_ldev(device);
5347 	}
5348 
5349 	return 0;
5350 }
5351 
5352 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5353 {
5354 	struct p_barrier_ack *p = pi->data;
5355 	struct drbd_peer_device *peer_device;
5356 	int vnr;
5357 
5358 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5359 
5360 	rcu_read_lock();
5361 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5362 		struct drbd_device *device = peer_device->device;
5363 
5364 		if (device->state.conn == C_AHEAD &&
5365 		    atomic_read(&device->ap_in_flight) == 0 &&
5366 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5367 			device->start_resync_timer.expires = jiffies + HZ;
5368 			add_timer(&device->start_resync_timer);
5369 		}
5370 	}
5371 	rcu_read_unlock();
5372 
5373 	return 0;
5374 }
5375 
5376 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5377 {
5378 	struct drbd_peer_device *peer_device;
5379 	struct drbd_device *device;
5380 	struct p_block_ack *p = pi->data;
5381 	struct drbd_device_work *dw;
5382 	sector_t sector;
5383 	int size;
5384 
5385 	peer_device = conn_peer_device(connection, pi->vnr);
5386 	if (!peer_device)
5387 		return -EIO;
5388 	device = peer_device->device;
5389 
5390 	sector = be64_to_cpu(p->sector);
5391 	size = be32_to_cpu(p->blksize);
5392 
5393 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5394 
5395 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5396 		drbd_ov_out_of_sync_found(device, sector, size);
5397 	else
5398 		ov_out_of_sync_print(device);
5399 
5400 	if (!get_ldev(device))
5401 		return 0;
5402 
5403 	drbd_rs_complete_io(device, sector);
5404 	dec_rs_pending(device);
5405 
5406 	--device->ov_left;
5407 
5408 	/* let's advance progress step marks only for every other megabyte */
5409 	if ((device->ov_left & 0x200) == 0x200)
5410 		drbd_advance_rs_marks(device, device->ov_left);
5411 
5412 	if (device->ov_left == 0) {
5413 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5414 		if (dw) {
5415 			dw->w.cb = w_ov_finished;
5416 			dw->device = device;
5417 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5418 		} else {
5419 			drbd_err(device, "kmalloc(dw) failed.");
5420 			ov_out_of_sync_print(device);
5421 			drbd_resync_finished(device);
5422 		}
5423 	}
5424 	put_ldev(device);
5425 	return 0;
5426 }
5427 
/* Handler for meta packets that are received and then ignored. */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	/* nothing to do; 0 tells the asender that all is well */
	return 0;
}
5432 
5433 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5434 {
5435 	struct drbd_peer_device *peer_device;
5436 	int vnr, not_empty = 0;
5437 
5438 	do {
5439 		clear_bit(SIGNAL_ASENDER, &connection->flags);
5440 		flush_signals(current);
5441 
5442 		rcu_read_lock();
5443 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5444 			struct drbd_device *device = peer_device->device;
5445 			kref_get(&device->kref);
5446 			rcu_read_unlock();
5447 			if (drbd_finish_peer_reqs(device)) {
5448 				kref_put(&device->kref, drbd_destroy_device);
5449 				return 1;
5450 			}
5451 			kref_put(&device->kref, drbd_destroy_device);
5452 			rcu_read_lock();
5453 		}
5454 		set_bit(SIGNAL_ASENDER, &connection->flags);
5455 
5456 		spin_lock_irq(&connection->resource->req_lock);
5457 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5458 			struct drbd_device *device = peer_device->device;
5459 			not_empty = !list_empty(&device->done_ee);
5460 			if (not_empty)
5461 				break;
5462 		}
5463 		spin_unlock_irq(&connection->resource->req_lock);
5464 		rcu_read_unlock();
5465 	} while (not_empty);
5466 
5467 	return 0;
5468 }
5469 
5470 struct asender_cmd {
5471 	size_t pkt_size;
5472 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5473 };
5474 
5475 static struct asender_cmd asender_tbl[] = {
5476 	[P_PING]	    = { 0, got_Ping },
5477 	[P_PING_ACK]	    = { 0, got_PingAck },
5478 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5479 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5480 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5481 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5482 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5483 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5484 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5485 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5486 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5487 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5488 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5489 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5490 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5491 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5492 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5493 };
5494 
5495 int drbd_asender(struct drbd_thread *thi)
5496 {
5497 	struct drbd_connection *connection = thi->connection;
5498 	struct asender_cmd *cmd = NULL;
5499 	struct packet_info pi;
5500 	int rv;
5501 	void *buf    = connection->meta.rbuf;
5502 	int received = 0;
5503 	unsigned int header_size = drbd_header_size(connection);
5504 	int expect   = header_size;
5505 	bool ping_timeout_active = false;
5506 	struct net_conf *nc;
5507 	int ping_timeo, tcp_cork, ping_int;
5508 	struct sched_param param = { .sched_priority = 2 };
5509 
5510 	rv = sched_setscheduler(current, SCHED_RR, &param);
5511 	if (rv < 0)
5512 		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5513 
5514 	while (get_t_state(thi) == RUNNING) {
5515 		drbd_thread_current_set_cpu(thi);
5516 
5517 		rcu_read_lock();
5518 		nc = rcu_dereference(connection->net_conf);
5519 		ping_timeo = nc->ping_timeo;
5520 		tcp_cork = nc->tcp_cork;
5521 		ping_int = nc->ping_int;
5522 		rcu_read_unlock();
5523 
5524 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5525 			if (drbd_send_ping(connection)) {
5526 				drbd_err(connection, "drbd_send_ping has failed\n");
5527 				goto reconnect;
5528 			}
5529 			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5530 			ping_timeout_active = true;
5531 		}
5532 
5533 		/* TODO: conditionally cork; it may hurt latency if we cork without
5534 		   much to send */
5535 		if (tcp_cork)
5536 			drbd_tcp_cork(connection->meta.socket);
5537 		if (connection_finish_peer_reqs(connection)) {
5538 			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5539 			goto reconnect;
5540 		}
5541 		/* but unconditionally uncork unless disabled */
5542 		if (tcp_cork)
5543 			drbd_tcp_uncork(connection->meta.socket);
5544 
5545 		/* short circuit, recv_msg would return EINTR anyways. */
5546 		if (signal_pending(current))
5547 			continue;
5548 
5549 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5550 		clear_bit(SIGNAL_ASENDER, &connection->flags);
5551 
5552 		flush_signals(current);
5553 
5554 		/* Note:
5555 		 * -EINTR	 (on meta) we got a signal
5556 		 * -EAGAIN	 (on meta) rcvtimeo expired
5557 		 * -ECONNRESET	 other side closed the connection
5558 		 * -ERESTARTSYS  (on data) we got a signal
5559 		 * rv <  0	 other than above: unexpected error!
5560 		 * rv == expected: full header or command
5561 		 * rv <  expected: "woken" by signal during receive
5562 		 * rv == 0	 : "connection shut down by peer"
5563 		 */
5564 received_more:
5565 		if (likely(rv > 0)) {
5566 			received += rv;
5567 			buf	 += rv;
5568 		} else if (rv == 0) {
5569 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5570 				long t;
5571 				rcu_read_lock();
5572 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5573 				rcu_read_unlock();
5574 
5575 				t = wait_event_timeout(connection->ping_wait,
5576 						       connection->cstate < C_WF_REPORT_PARAMS,
5577 						       t);
5578 				if (t)
5579 					break;
5580 			}
5581 			drbd_err(connection, "meta connection shut down by peer.\n");
5582 			goto reconnect;
5583 		} else if (rv == -EAGAIN) {
5584 			/* If the data socket received something meanwhile,
5585 			 * that is good enough: peer is still alive. */
5586 			if (time_after(connection->last_received,
5587 				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5588 				continue;
5589 			if (ping_timeout_active) {
5590 				drbd_err(connection, "PingAck did not arrive in time.\n");
5591 				goto reconnect;
5592 			}
5593 			set_bit(SEND_PING, &connection->flags);
5594 			continue;
5595 		} else if (rv == -EINTR) {
5596 			continue;
5597 		} else {
5598 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5599 			goto reconnect;
5600 		}
5601 
5602 		if (received == expect && cmd == NULL) {
5603 			if (decode_header(connection, connection->meta.rbuf, &pi))
5604 				goto reconnect;
5605 			cmd = &asender_tbl[pi.cmd];
5606 			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5607 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5608 					 cmdname(pi.cmd), pi.cmd);
5609 				goto disconnect;
5610 			}
5611 			expect = header_size + cmd->pkt_size;
5612 			if (pi.size != expect - header_size) {
5613 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5614 					pi.cmd, pi.size);
5615 				goto reconnect;
5616 			}
5617 		}
5618 		if (received == expect) {
5619 			bool err;
5620 
5621 			err = cmd->fn(connection, &pi);
5622 			if (err) {
5623 				drbd_err(connection, "%pf failed\n", cmd->fn);
5624 				goto reconnect;
5625 			}
5626 
5627 			connection->last_received = jiffies;
5628 
5629 			if (cmd == &asender_tbl[P_PING_ACK]) {
5630 				/* restore idle timeout */
5631 				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5632 				ping_timeout_active = false;
5633 			}
5634 
5635 			buf	 = connection->meta.rbuf;
5636 			received = 0;
5637 			expect	 = header_size;
5638 			cmd	 = NULL;
5639 		}
5640 		if (test_bit(SEND_PING, &connection->flags))
5641 			continue;
5642 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5643 		if (rv > 0)
5644 			goto received_more;
5645 	}
5646 
5647 	if (0) {
5648 reconnect:
5649 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5650 		conn_md_sync(connection);
5651 	}
5652 	if (0) {
5653 disconnect:
5654 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5655 	}
5656 	clear_bit(SIGNAL_ASENDER, &connection->flags);
5657 
5658 	drbd_info(connection, "asender terminated\n");
5659 
5660 	return 0;
5661 }
5662