xref: /linux/drivers/block/drbd/drbd_receiver.c (revision f2ee442115c9b6219083c019939a9cc0c9abb2f8)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48 
49 #include "drbd_vli.h"
50 
51 enum finish_epoch {
52 	FE_STILL_LIVE,
53 	FE_DESTROYED,
54 	FE_RECYCLED,
55 };
56 
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59 
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62 
63 
64 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
65 
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
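
/* For illustration, a chain of three pages linked this way looks like
 *
 *	*head --> [page A] --> [page B] --> [page C] --> 0
 *
 * where each link is page_private(page) interpreted as a struct page pointer,
 * and a 0 in page->private marks the end of the chain (see the
 * set_page_private(page, 0) in page_chain_del() below). */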
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77 	struct page *page;
78 	struct page *tmp;
79 
80 	BUG_ON(!n);
81 	BUG_ON(!head);
82 
83 	page = *head;
84 
85 	if (!page)
86 		return NULL;
87 
88 	while (page) {
89 		tmp = page_chain_next(page);
90 		if (--n == 0)
91 			break; /* found sufficient pages */
92 		if (tmp == NULL)
93 			/* insufficient pages, don't use any of them. */
94 			return NULL;
95 		page = tmp;
96 	}
97 
98 	/* add end of list marker for the returned list */
99 	set_page_private(page, 0);
100 	/* actual return value, and adjustment of head */
101 	page = *head;
102 	*head = tmp;
103 	return page;
104 }
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111 	struct page *tmp;
112 	int i = 1;
113 	while ((tmp = page_chain_next(page)))
114 		++i, page = tmp;
115 	if (len)
116 		*len = i;
117 	return page;
118 }
119 
120 static int page_chain_free(struct page *page)
121 {
122 	struct page *tmp;
123 	int i = 0;
124 	page_chain_for_each_safe(page, tmp) {
125 		put_page(page);
126 		++i;
127 	}
128 	return i;
129 }
130 
131 static void page_chain_add(struct page **head,
132 		struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135 	struct page *tmp;
136 	tmp = page_chain_tail(chain_first, NULL);
137 	BUG_ON(tmp != chain_last);
138 #endif
139 
140 	/* add chain to head */
141 	set_page_private(chain_last, (unsigned long)*head);
142 	*head = chain_first;
143 }
144 
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147 	struct page *page = NULL;
148 	struct page *tmp = NULL;
149 	int i = 0;
150 
151 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
152 	 * So what. It saves a spin_lock. */
153 	if (drbd_pp_vacant >= number) {
154 		spin_lock(&drbd_pp_lock);
155 		page = page_chain_del(&drbd_pp_pool, number);
156 		if (page)
157 			drbd_pp_vacant -= number;
158 		spin_unlock(&drbd_pp_lock);
159 		if (page)
160 			return page;
161 	}
162 
163 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 	 * which in turn might block on the other node at this very place.  */
166 	for (i = 0; i < number; i++) {
167 		tmp = alloc_page(GFP_TRY);
168 		if (!tmp)
169 			break;
170 		set_page_private(tmp, (unsigned long)page);
171 		page = tmp;
172 	}
173 
174 	if (i == number)
175 		return page;
176 
177 	/* Not enough pages immediately available this time.
178 	 * No need to jump around here, drbd_pp_alloc will retry this
179 	 * function "soon". */
180 	if (page) {
181 		tmp = page_chain_tail(page, NULL);
182 		spin_lock(&drbd_pp_lock);
183 		page_chain_add(&drbd_pp_pool, page, tmp);
184 		drbd_pp_vacant += i;
185 		spin_unlock(&drbd_pp_lock);
186 	}
187 	return NULL;
188 }
189 
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192 	struct drbd_epoch_entry *e;
193 	struct list_head *le, *tle;
194 
195 	/* The EEs are always appended to the end of the list. Since
196 	   they are sent in order over the wire, they have to finish
197 	   in order. As soon as we see the first unfinished one, we can
198 	   stop examining the list... */
199 
200 	list_for_each_safe(le, tle, &mdev->net_ee) {
201 		e = list_entry(le, struct drbd_epoch_entry, w.list);
202 		if (drbd_ee_has_active_page(e))
203 			break;
204 		list_move(le, to_be_freed);
205 	}
206 }
207 
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210 	LIST_HEAD(reclaimed);
211 	struct drbd_epoch_entry *e, *t;
212 
213 	spin_lock_irq(&mdev->req_lock);
214 	reclaim_net_ee(mdev, &reclaimed);
215 	spin_unlock_irq(&mdev->req_lock);
216 
217 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
218 		drbd_free_net_ee(mdev, e);
219 }
220 
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:	DRBD device.
224  * @number:	number of pages requested
225  * @retry:	whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate @number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235 	struct page *page = NULL;
236 	DEFINE_WAIT(wait);
237 
238 	/* Yes, we may run up to @number over max_buffers. If we
239 	 * follow it strictly, the admin will get it wrong anyways. */
240 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242 
243 	while (page == NULL) {
244 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245 
246 		drbd_kick_lo_and_reclaim_net(mdev);
247 
248 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250 			if (page)
251 				break;
252 		}
253 
254 		if (!retry)
255 			break;
256 
257 		if (signal_pending(current)) {
258 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259 			break;
260 		}
261 
262 		schedule();
263 	}
264 	finish_wait(&drbd_pp_wait, &wait);
265 
266 	if (page)
267 		atomic_add(number, &mdev->pp_in_use);
268 	return page;
269 }
270 
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * It is also used from inside another spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277 	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278 	int i;
279 
280 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281 		i = page_chain_free(page);
282 	else {
283 		struct page *tmp;
284 		tmp = page_chain_tail(page, &i);
285 		spin_lock(&drbd_pp_lock);
286 		page_chain_add(&drbd_pp_pool, page, tmp);
287 		drbd_pp_vacant += i;
288 		spin_unlock(&drbd_pp_lock);
289 	}
290 	i = atomic_sub_return(i, a);
291 	if (i < 0)
292 		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294 	wake_up(&drbd_pp_wait);
295 }
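
/* For illustration, the typical alloc/free pairing, as used by the epoch
 * entry code below (a sketch of the calling convention, not new code):
 *
 *	page = drbd_pp_alloc(mdev, nr_pages, retry);
 *	...
 *	drbd_pp_free(mdev, page, is_net);
 *
 * drbd_alloc_ee() passes (gfp_mask & __GFP_WAIT) as retry, and
 * drbd_free_some_ee() simply forwards its is_net argument. */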
296 
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300 
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
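
/* For illustration: _drbd_wait_ee_list_empty() is meant to be called as
 *
 *	spin_lock_irq(&mdev->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, head);
 *	spin_unlock_irq(&mdev->req_lock);
 *
 * exactly as drbd_wait_ee_list_empty() below does.  Several functions in the
 * second group take req_lock themselves (drbd_release_ee(),
 * drbd_process_done_ee(), drbd_wait_ee_list_empty()) or may sleep
 * (drbd_alloc_ee()), which is why they must not be called with req_lock
 * held. */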
311 
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313 				     u64 id,
314 				     sector_t sector,
315 				     unsigned int data_size,
316 				     gfp_t gfp_mask) __must_hold(local)
317 {
318 	struct drbd_epoch_entry *e;
319 	struct page *page;
320 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321 
322 	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323 		return NULL;
324 
325 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326 	if (!e) {
327 		if (!(gfp_mask & __GFP_NOWARN))
328 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329 		return NULL;
330 	}
331 
332 	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333 	if (!page)
334 		goto fail;
335 
336 	INIT_HLIST_NODE(&e->collision);
337 	e->epoch = NULL;
338 	e->mdev = mdev;
339 	e->pages = page;
340 	atomic_set(&e->pending_bios, 0);
341 	e->size = data_size;
342 	e->flags = 0;
343 	e->sector = sector;
344 	e->block_id = id;
345 
346 	return e;
347 
348  fail:
349 	mempool_free(e, drbd_ee_mempool);
350 	return NULL;
351 }
352 
353 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
354 {
355 	if (e->flags & EE_HAS_DIGEST)
356 		kfree(e->digest);
357 	drbd_pp_free(mdev, e->pages, is_net);
358 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
359 	D_ASSERT(hlist_unhashed(&e->collision));
360 	mempool_free(e, drbd_ee_mempool);
361 }
362 
363 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
364 {
365 	LIST_HEAD(work_list);
366 	struct drbd_epoch_entry *e, *t;
367 	int count = 0;
368 	int is_net = list == &mdev->net_ee;
369 
370 	spin_lock_irq(&mdev->req_lock);
371 	list_splice_init(list, &work_list);
372 	spin_unlock_irq(&mdev->req_lock);
373 
374 	list_for_each_entry_safe(e, t, &work_list, w.list) {
375 		drbd_free_some_ee(mdev, e, is_net);
376 		count++;
377 	}
378 	return count;
379 }
380 
381 
382 /*
383  * This function is called from _asender only_
384  * but see also comments in _req_mod(,barrier_acked)
385  * and receive_Barrier.
386  *
387  * Move entries from net_ee to done_ee, if ready.
388  * Grab done_ee, call all callbacks, free the entries.
389  * The callbacks typically send out ACKs.
390  */
391 static int drbd_process_done_ee(struct drbd_conf *mdev)
392 {
393 	LIST_HEAD(work_list);
394 	LIST_HEAD(reclaimed);
395 	struct drbd_epoch_entry *e, *t;
396 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
397 
398 	spin_lock_irq(&mdev->req_lock);
399 	reclaim_net_ee(mdev, &reclaimed);
400 	list_splice_init(&mdev->done_ee, &work_list);
401 	spin_unlock_irq(&mdev->req_lock);
402 
403 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
404 		drbd_free_net_ee(mdev, e);
405 
406 	/* possible callbacks here:
407 	 * e_end_block, e_end_resync_block, and e_send_discard_ack.
408 	 * all ignore the last argument.
409 	 */
410 	list_for_each_entry_safe(e, t, &work_list, w.list) {
411 		/* list_del not necessary, next/prev members not touched */
412 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
413 		drbd_free_ee(mdev, e);
414 	}
415 	wake_up(&mdev->ee_wait);
416 
417 	return ok;
418 }
419 
420 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
421 {
422 	DEFINE_WAIT(wait);
423 
424 	/* avoids spin_lock/unlock
425 	 * and calling prepare_to_wait in the fast path */
426 	while (!list_empty(head)) {
427 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
428 		spin_unlock_irq(&mdev->req_lock);
429 		io_schedule();
430 		finish_wait(&mdev->ee_wait, &wait);
431 		spin_lock_irq(&mdev->req_lock);
432 	}
433 }
434 
435 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437 	spin_lock_irq(&mdev->req_lock);
438 	_drbd_wait_ee_list_empty(mdev, head);
439 	spin_unlock_irq(&mdev->req_lock);
440 }
441 
442 /* see also kernel_accept, which is only present since 2.6.18.
443  * also, we want to log exactly which part of it failed */
444 static int drbd_accept(struct drbd_conf *mdev, const char **what,
445 		struct socket *sock, struct socket **newsock)
446 {
447 	struct sock *sk = sock->sk;
448 	int err = 0;
449 
450 	*what = "listen";
451 	err = sock->ops->listen(sock, 5);
452 	if (err < 0)
453 		goto out;
454 
455 	*what = "sock_create_lite";
456 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
457 			       newsock);
458 	if (err < 0)
459 		goto out;
460 
461 	*what = "accept";
462 	err = sock->ops->accept(sock, *newsock, 0);
463 	if (err < 0) {
464 		sock_release(*newsock);
465 		*newsock = NULL;
466 		goto out;
467 	}
468 	(*newsock)->ops  = sock->ops;
469 
470 out:
471 	return err;
472 }
473 
474 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
475 		    void *buf, size_t size, int flags)
476 {
477 	mm_segment_t oldfs;
478 	struct kvec iov = {
479 		.iov_base = buf,
480 		.iov_len = size,
481 	};
482 	struct msghdr msg = {
483 		.msg_iovlen = 1,
484 		.msg_iov = (struct iovec *)&iov,
485 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
486 	};
487 	int rv;
488 
489 	oldfs = get_fs();
490 	set_fs(KERNEL_DS);
491 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
492 	set_fs(oldfs);
493 
494 	return rv;
495 }
496 
497 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
498 {
499 	mm_segment_t oldfs;
500 	struct kvec iov = {
501 		.iov_base = buf,
502 		.iov_len = size,
503 	};
504 	struct msghdr msg = {
505 		.msg_iovlen = 1,
506 		.msg_iov = (struct iovec *)&iov,
507 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
508 	};
509 	int rv;
510 
511 	oldfs = get_fs();
512 	set_fs(KERNEL_DS);
513 
514 	for (;;) {
515 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
516 		if (rv == size)
517 			break;
518 
519 		/* Note:
520 		 * ECONNRESET	other side closed the connection
521 		 * ERESTARTSYS	(on  sock) we got a signal
522 		 */
523 
524 		if (rv < 0) {
525 			if (rv == -ECONNRESET)
526 				dev_info(DEV, "sock was reset by peer\n");
527 			else if (rv != -ERESTARTSYS)
528 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
529 			break;
530 		} else if (rv == 0) {
531 			dev_info(DEV, "sock was shut down by peer\n");
532 			break;
533 		} else	{
534 			/* signal came in, or peer/link went down,
535 			 * after we read a partial message
536 			 */
537 			/* D_ASSERT(signal_pending(current)); */
538 			break;
539 		}
540 	}
541 
542 	set_fs(oldfs);
543 
544 	if (rv != size)
545 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
546 
547 	return rv;
548 }
549 
550 /* quoting tcp(7):
551  *   On individual connections, the socket buffer size must be set prior to the
552  *   listen(2) or connect(2) calls in order to have it take effect.
553  * This is our wrapper to do so.
554  */
555 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
556 		unsigned int rcv)
557 {
558 	/* open coded SO_SNDBUF, SO_RCVBUF */
559 	if (snd) {
560 		sock->sk->sk_sndbuf = snd;
561 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
562 	}
563 	if (rcv) {
564 		sock->sk->sk_rcvbuf = rcv;
565 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
566 	}
567 }
568 
569 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
570 {
571 	const char *what;
572 	struct socket *sock;
573 	struct sockaddr_in6 src_in6;
574 	int err;
575 	int disconnect_on_error = 1;
576 
577 	if (!get_net_conf(mdev))
578 		return NULL;
579 
580 	what = "sock_create_kern";
581 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
582 		SOCK_STREAM, IPPROTO_TCP, &sock);
583 	if (err < 0) {
584 		sock = NULL;
585 		goto out;
586 	}
587 
588 	sock->sk->sk_rcvtimeo =
589 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
590 	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
591 			mdev->net_conf->rcvbuf_size);
592 
593 	/* explicitly bind to the configured IP as source IP
594 	 * for the outgoing connections.
595 	 * This is needed for multihomed hosts and to be
596 	 * able to use lo: interfaces for drbd.
597 	 * Make sure to use 0 as port number, so Linux selects
598 	 * a free one dynamically.
599 	 */
600 	memcpy(&src_in6, mdev->net_conf->my_addr,
601 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
602 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
603 		src_in6.sin6_port = 0;
604 	else
605 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
606 
607 	what = "bind before connect";
608 	err = sock->ops->bind(sock,
609 			      (struct sockaddr *) &src_in6,
610 			      mdev->net_conf->my_addr_len);
611 	if (err < 0)
612 		goto out;
613 
614 	/* connect may fail, peer not yet available.
615 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
616 	disconnect_on_error = 0;
617 	what = "connect";
618 	err = sock->ops->connect(sock,
619 				 (struct sockaddr *)mdev->net_conf->peer_addr,
620 				 mdev->net_conf->peer_addr_len, 0);
621 
622 out:
623 	if (err < 0) {
624 		if (sock) {
625 			sock_release(sock);
626 			sock = NULL;
627 		}
628 		switch (-err) {
629 			/* timeout, busy, signal pending */
630 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
631 		case EINTR: case ERESTARTSYS:
632 			/* peer not (yet) available, network problem */
633 		case ECONNREFUSED: case ENETUNREACH:
634 		case EHOSTDOWN:    case EHOSTUNREACH:
635 			disconnect_on_error = 0;
636 			break;
637 		default:
638 			dev_err(DEV, "%s failed, err = %d\n", what, err);
639 		}
640 		if (disconnect_on_error)
641 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
642 	}
643 	put_net_conf(mdev);
644 	return sock;
645 }
646 
647 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
648 {
649 	int timeo, err;
650 	struct socket *s_estab = NULL, *s_listen;
651 	const char *what;
652 
653 	if (!get_net_conf(mdev))
654 		return NULL;
655 
656 	what = "sock_create_kern";
657 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
658 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
659 	if (err) {
660 		s_listen = NULL;
661 		goto out;
662 	}
663 
664 	timeo = mdev->net_conf->try_connect_int * HZ;
665 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* +/- 14.3%, i.e. 28.5% total random jitter */
666 
667 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
668 	s_listen->sk->sk_rcvtimeo = timeo;
669 	s_listen->sk->sk_sndtimeo = timeo;
670 	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
671 			mdev->net_conf->rcvbuf_size);
672 
673 	what = "bind before listen";
674 	err = s_listen->ops->bind(s_listen,
675 			      (struct sockaddr *) mdev->net_conf->my_addr,
676 			      mdev->net_conf->my_addr_len);
677 	if (err < 0)
678 		goto out;
679 
680 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
681 
682 out:
683 	if (s_listen)
684 		sock_release(s_listen);
685 	if (err < 0) {
686 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
687 			dev_err(DEV, "%s failed, err = %d\n", what, err);
688 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
689 		}
690 	}
691 	put_net_conf(mdev);
692 
693 	return s_estab;
694 }
695 
696 static int drbd_send_fp(struct drbd_conf *mdev,
697 	struct socket *sock, enum drbd_packets cmd)
698 {
699 	struct p_header80 *h = &mdev->data.sbuf.header.h80;
700 
701 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
702 }
703 
704 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
705 {
706 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
707 	int rr;
708 
709 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
710 
711 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
712 		return be16_to_cpu(h->command);
713 
714 	return 0xffff;
715 }
716 
717 /**
718  * drbd_socket_okay() - Free the socket if its connection is not okay
719  * @mdev:	DRBD device.
720  * @sock:	pointer to the pointer to the socket.
721  */
722 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
723 {
724 	int rr;
725 	char tb[4];
726 
727 	if (!*sock)
728 		return false;
729 
730 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
731 
732 	if (rr > 0 || rr == -EAGAIN) {
733 		return true;
734 	} else {
735 		sock_release(*sock);
736 		*sock = NULL;
737 		return false;
738 	}
739 }
740 
741 /*
742  * return values:
743  *   1 yes, we have a valid connection
744  *   0 oops, did not work out, please try again
745  *  -1 peer talks different language,
746  *     no point in trying again, please go standalone.
747  *  -2 We do not have a network config...
748  */
749 static int drbd_connect(struct drbd_conf *mdev)
750 {
751 	struct socket *s, *sock, *msock;
752 	int try, h, ok;
753 
754 	D_ASSERT(!mdev->data.socket);
755 
756 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
757 		return -2;
758 
759 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
760 
761 	sock  = NULL;
762 	msock = NULL;
763 
764 	do {
765 		for (try = 0;;) {
766 			/* 3 tries, this should take less than a second! */
767 			s = drbd_try_connect(mdev);
768 			if (s || ++try >= 3)
769 				break;
770 			/* give the other side time to call bind() & listen() */
771 			schedule_timeout_interruptible(HZ / 10);
772 		}
773 
774 		if (s) {
775 			if (!sock) {
776 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
777 				sock = s;
778 				s = NULL;
779 			} else if (!msock) {
780 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
781 				msock = s;
782 				s = NULL;
783 			} else {
784 				dev_err(DEV, "Logic error in drbd_connect()\n");
785 				goto out_release_sockets;
786 			}
787 		}
788 
789 		if (sock && msock) {
790 			schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
791 			ok = drbd_socket_okay(mdev, &sock);
792 			ok = drbd_socket_okay(mdev, &msock) && ok;
793 			if (ok)
794 				break;
795 		}
796 
797 retry:
798 		s = drbd_wait_for_connect(mdev);
799 		if (s) {
800 			try = drbd_recv_fp(mdev, s);
801 			drbd_socket_okay(mdev, &sock);
802 			drbd_socket_okay(mdev, &msock);
803 			switch (try) {
804 			case P_HAND_SHAKE_S:
805 				if (sock) {
806 					dev_warn(DEV, "initial packet S crossed\n");
807 					sock_release(sock);
808 				}
809 				sock = s;
810 				break;
811 			case P_HAND_SHAKE_M:
812 				if (msock) {
813 					dev_warn(DEV, "initial packet M crossed\n");
814 					sock_release(msock);
815 				}
816 				msock = s;
817 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
818 				break;
819 			default:
820 				dev_warn(DEV, "Error receiving initial packet\n");
821 				sock_release(s);
822 				if (random32() & 1)
823 					goto retry;
824 			}
825 		}
826 
827 		if (mdev->state.conn <= C_DISCONNECTING)
828 			goto out_release_sockets;
829 		if (signal_pending(current)) {
830 			flush_signals(current);
831 			smp_rmb();
832 			if (get_t_state(&mdev->receiver) == Exiting)
833 				goto out_release_sockets;
834 		}
835 
836 		if (sock && msock) {
837 			ok = drbd_socket_okay(mdev, &sock);
838 			ok = drbd_socket_okay(mdev, &msock) && ok;
839 			if (ok)
840 				break;
841 		}
842 	} while (1);
843 
844 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
845 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
846 
847 	sock->sk->sk_allocation = GFP_NOIO;
848 	msock->sk->sk_allocation = GFP_NOIO;
849 
850 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
851 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
852 
853 	/* NOT YET ...
854 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856 	 * first set it to the P_HAND_SHAKE timeout,
857 	 * which we set to 4x the configured ping_timeout. */
858 	sock->sk->sk_sndtimeo =
859 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860 
861 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863 
864 	/* we don't want delays.
865 	 * we use TCP_CORK where appropriate, though */
866 	drbd_tcp_nodelay(sock);
867 	drbd_tcp_nodelay(msock);
868 
869 	mdev->data.socket = sock;
870 	mdev->meta.socket = msock;
871 	mdev->last_received = jiffies;
872 
873 	D_ASSERT(mdev->asender.task == NULL);
874 
875 	h = drbd_do_handshake(mdev);
876 	if (h <= 0)
877 		return h;
878 
879 	if (mdev->cram_hmac_tfm) {
880 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
881 		switch (drbd_do_auth(mdev)) {
882 		case -1:
883 			dev_err(DEV, "Authentication of peer failed\n");
884 			return -1;
885 		case 0:
886 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
887 			return 0;
888 		}
889 	}
890 
891 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
892 		return 0;
893 
894 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
895 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
896 
897 	atomic_set(&mdev->packet_seq, 0);
898 	mdev->peer_seq = 0;
899 
900 	drbd_thread_start(&mdev->asender);
901 
902 	if (drbd_send_protocol(mdev) == -1)
903 		return -1;
904 	drbd_send_sync_param(mdev, &mdev->sync_conf);
905 	drbd_send_sizes(mdev, 0, 0);
906 	drbd_send_uuids(mdev);
907 	drbd_send_state(mdev);
908 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
909 	clear_bit(RESIZE_PENDING, &mdev->flags);
910 	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
911 
912 	return 1;
913 
914 out_release_sockets:
915 	if (sock)
916 		sock_release(sock);
917 	if (msock)
918 		sock_release(msock);
919 	return -1;
920 }
921 
922 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
923 {
924 	union p_header *h = &mdev->data.rbuf.header;
925 	int r;
926 
927 	r = drbd_recv(mdev, h, sizeof(*h));
928 	if (unlikely(r != sizeof(*h))) {
929 		if (!signal_pending(current))
930 			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
931 		return false;
932 	}
933 
934 	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
935 		*cmd = be16_to_cpu(h->h80.command);
936 		*packet_size = be16_to_cpu(h->h80.length);
937 	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
938 		*cmd = be16_to_cpu(h->h95.command);
939 		*packet_size = be32_to_cpu(h->h95.length);
940 	} else {
941 		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
942 		    be32_to_cpu(h->h80.magic),
943 		    be16_to_cpu(h->h80.command),
944 		    be16_to_cpu(h->h80.length));
945 		return false;
946 	}
947 	mdev->last_received = jiffies;
948 
949 	return true;
950 }
951 
952 static void drbd_flush(struct drbd_conf *mdev)
953 {
954 	int rv;
955 
956 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
957 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
958 					NULL);
959 		if (rv) {
960 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
961 			/* would rather check on EOPNOTSUPP, but that is not reliable.
962 			 * don't try again for ANY return value != 0
963 			 * if (rv == -EOPNOTSUPP) */
964 			drbd_bump_write_ordering(mdev, WO_drain_io);
965 		}
966 		put_ldev(mdev);
967 	}
968 }
969 
970 /**
971  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
972  * @mdev:	DRBD device.
973  * @epoch:	Epoch object.
974  * @ev:		Epoch event.
975  */
976 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
977 					       struct drbd_epoch *epoch,
978 					       enum epoch_event ev)
979 {
980 	int epoch_size;
981 	struct drbd_epoch *next_epoch;
982 	enum finish_epoch rv = FE_STILL_LIVE;
983 
984 	spin_lock(&mdev->epoch_lock);
985 	do {
986 		next_epoch = NULL;
987 
988 		epoch_size = atomic_read(&epoch->epoch_size);
989 
990 		switch (ev & ~EV_CLEANUP) {
991 		case EV_PUT:
992 			atomic_dec(&epoch->active);
993 			break;
994 		case EV_GOT_BARRIER_NR:
995 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
996 			break;
997 		case EV_BECAME_LAST:
998 			/* nothing to do */
999 			break;
1000 		}
1001 
1002 		if (epoch_size != 0 &&
1003 		    atomic_read(&epoch->active) == 0 &&
1004 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1005 			if (!(ev & EV_CLEANUP)) {
1006 				spin_unlock(&mdev->epoch_lock);
1007 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1008 				spin_lock(&mdev->epoch_lock);
1009 			}
1010 			dec_unacked(mdev);
1011 
1012 			if (mdev->current_epoch != epoch) {
1013 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1014 				list_del(&epoch->list);
1015 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1016 				mdev->epochs--;
1017 				kfree(epoch);
1018 
1019 				if (rv == FE_STILL_LIVE)
1020 					rv = FE_DESTROYED;
1021 			} else {
1022 				epoch->flags = 0;
1023 				atomic_set(&epoch->epoch_size, 0);
1024 				/* atomic_set(&epoch->active, 0); is already zero */
1025 				if (rv == FE_STILL_LIVE)
1026 					rv = FE_RECYCLED;
1027 				wake_up(&mdev->ee_wait);
1028 			}
1029 		}
1030 
1031 		if (!next_epoch)
1032 			break;
1033 
1034 		epoch = next_epoch;
1035 	} while (1);
1036 
1037 	spin_unlock(&mdev->epoch_lock);
1038 
1039 	return rv;
1040 }
1041 
1042 /**
1043  * drbd_bump_write_ordering() - Fall back to another write ordering method
1044  * @mdev:	DRBD device.
1045  * @wo:		Write ordering method to try.
1046  */
1047 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1048 {
1049 	enum write_ordering_e pwo;
1050 	static char *write_ordering_str[] = {
1051 		[WO_none] = "none",
1052 		[WO_drain_io] = "drain",
1053 		[WO_bdev_flush] = "flush",
1054 	};
1055 
1056 	pwo = mdev->write_ordering;
1057 	wo = min(pwo, wo);
1058 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1059 		wo = WO_drain_io;
1060 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1061 		wo = WO_none;
1062 	mdev->write_ordering = wo;
1063 	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1064 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1065 }
1066 
1067 /**
1068  * drbd_submit_ee() - submit the epoch entry's pages in one or more bios
1069  * @mdev:	DRBD device.
1070  * @e:		epoch entry
1071  * @rw:		flag field, see bio->bi_rw
1072  *
1073  * May spread the pages to multiple bios,
1074  * depending on bio_add_page restrictions.
1075  *
1076  * Returns 0 if all bios have been submitted,
1077  * -ENOMEM if we could not allocate enough bios,
1078  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1079  *  single page to an empty bio (which should never happen and likely indicates
1080  *  that the lower level IO stack is in some way broken). This has been observed
1081  *  on certain Xen deployments.
1082  */
1083 /* TODO allocate from our own bio_set. */
1084 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1085 		const unsigned rw, const int fault_type)
1086 {
1087 	struct bio *bios = NULL;
1088 	struct bio *bio;
1089 	struct page *page = e->pages;
1090 	sector_t sector = e->sector;
1091 	unsigned ds = e->size;
1092 	unsigned n_bios = 0;
1093 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1094 	int err = -ENOMEM;
1095 
1096 	/* In most cases, we will only need one bio.  But in case the lower
1097 	 * level restrictions happen to be different at this offset on this
1098 	 * side than those of the sending peer, we may need to submit the
1099 	 * request in more than one bio. */
1100 next_bio:
1101 	bio = bio_alloc(GFP_NOIO, nr_pages);
1102 	if (!bio) {
1103 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1104 		goto fail;
1105 	}
1106 	/* > e->sector, unless this is the first bio */
1107 	bio->bi_sector = sector;
1108 	bio->bi_bdev = mdev->ldev->backing_bdev;
1109 	bio->bi_rw = rw;
1110 	bio->bi_private = e;
1111 	bio->bi_end_io = drbd_endio_sec;
1112 
1113 	bio->bi_next = bios;
1114 	bios = bio;
1115 	++n_bios;
1116 
1117 	page_chain_for_each(page) {
1118 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1119 		if (!bio_add_page(bio, page, len, 0)) {
1120 			/* A single page must always be possible!
1121 			 * But in case it fails anyways,
1122 			 * we deal with it, and complain (below). */
1123 			if (bio->bi_vcnt == 0) {
1124 				dev_err(DEV,
1125 					"bio_add_page failed for len=%u, "
1126 					"bi_vcnt=0 (bi_sector=%llu)\n",
1127 					len, (unsigned long long)bio->bi_sector);
1128 				err = -ENOSPC;
1129 				goto fail;
1130 			}
1131 			goto next_bio;
1132 		}
1133 		ds -= len;
1134 		sector += len >> 9;
1135 		--nr_pages;
1136 	}
1137 	D_ASSERT(page == NULL);
1138 	D_ASSERT(ds == 0);
1139 
1140 	atomic_set(&e->pending_bios, n_bios);
1141 	do {
1142 		bio = bios;
1143 		bios = bios->bi_next;
1144 		bio->bi_next = NULL;
1145 
1146 		drbd_generic_make_request(mdev, fault_type, bio);
1147 	} while (bios);
1148 	return 0;
1149 
1150 fail:
1151 	while (bios) {
1152 		bio = bios;
1153 		bios = bios->bi_next;
1154 		bio_put(bio);
1155 	}
1156 	return err;
1157 }
1158 
1159 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1160 {
1161 	int rv;
1162 	struct p_barrier *p = &mdev->data.rbuf.barrier;
1163 	struct drbd_epoch *epoch;
1164 
1165 	inc_unacked(mdev);
1166 
1167 	mdev->current_epoch->barrier_nr = p->barrier;
1168 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1169 
1170 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1171 	 * the activity log, which means it would not be resynced in case the
1172 	 * R_PRIMARY crashes now.
1173 	 * Therefore we must send the barrier_ack after the barrier request was
1174 	 * completed. */
1175 	switch (mdev->write_ordering) {
1176 	case WO_none:
1177 		if (rv == FE_RECYCLED)
1178 			return true;
1179 
1180 		/* receiver context, in the writeout path of the other node.
1181 		 * avoid potential distributed deadlock */
1182 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1183 		if (epoch)
1184 			break;
1185 		else
1186 			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1187 			/* Fall through */
1188 
1189 	case WO_bdev_flush:
1190 	case WO_drain_io:
1191 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1192 		drbd_flush(mdev);
1193 
1194 		if (atomic_read(&mdev->current_epoch->epoch_size)) {
1195 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1196 			if (epoch)
1197 				break;
1198 		}
1199 
1200 		epoch = mdev->current_epoch;
1201 		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1202 
1203 		D_ASSERT(atomic_read(&epoch->active) == 0);
1204 		D_ASSERT(epoch->flags == 0);
1205 
1206 		return true;
1207 	default:
1208 		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1209 		return false;
1210 	}
1211 
1212 	epoch->flags = 0;
1213 	atomic_set(&epoch->epoch_size, 0);
1214 	atomic_set(&epoch->active, 0);
1215 
1216 	spin_lock(&mdev->epoch_lock);
1217 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1218 		list_add(&epoch->list, &mdev->current_epoch->list);
1219 		mdev->current_epoch = epoch;
1220 		mdev->epochs++;
1221 	} else {
1222 		/* The current_epoch got recycled while we allocated this one... */
1223 		kfree(epoch);
1224 	}
1225 	spin_unlock(&mdev->epoch_lock);
1226 
1227 	return true;
1228 }
1229 
1230 /* used from receive_RSDataReply (recv_resync_read)
1231  * and from receive_Data */
1232 static struct drbd_epoch_entry *
1233 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1234 {
1235 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1236 	struct drbd_epoch_entry *e;
1237 	struct page *page;
1238 	int dgs, ds, rr;
1239 	void *dig_in = mdev->int_dig_in;
1240 	void *dig_vv = mdev->int_dig_vv;
1241 	unsigned long *data;
1242 
1243 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1244 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1245 
1246 	if (dgs) {
1247 		rr = drbd_recv(mdev, dig_in, dgs);
1248 		if (rr != dgs) {
1249 			if (!signal_pending(current))
1250 				dev_warn(DEV,
1251 					"short read receiving data digest: read %d expected %d\n",
1252 					rr, dgs);
1253 			return NULL;
1254 		}
1255 	}
1256 
1257 	data_size -= dgs;
1258 
1259 	ERR_IF(data_size == 0) return NULL;
1260 	ERR_IF(data_size &  0x1ff) return NULL;
1261 	ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1262 
1263 	/* even though we trust our peer,
1264 	 * we sometimes have to double check. */
1265 	if (sector + (data_size>>9) > capacity) {
1266 		dev_err(DEV, "request from peer beyond end of local disk: "
1267 			"capacity: %llus < sector: %llus + size: %u\n",
1268 			(unsigned long long)capacity,
1269 			(unsigned long long)sector, data_size);
1270 		return NULL;
1271 	}
1272 
1273 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1274 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1275 	 * which in turn might block on the other node at this very place.  */
1276 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1277 	if (!e)
1278 		return NULL;
1279 
1280 	ds = data_size;
1281 	page = e->pages;
1282 	page_chain_for_each(page) {
1283 		unsigned len = min_t(int, ds, PAGE_SIZE);
1284 		data = kmap(page);
1285 		rr = drbd_recv(mdev, data, len);
1286 		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1287 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1288 			data[0] = data[0] ^ (unsigned long)-1;
1289 		}
1290 		kunmap(page);
1291 		if (rr != len) {
1292 			drbd_free_ee(mdev, e);
1293 			if (!signal_pending(current))
1294 				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1295 				rr, len);
1296 			return NULL;
1297 		}
1298 		ds -= rr;
1299 	}
1300 
1301 	if (dgs) {
1302 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1303 		if (memcmp(dig_in, dig_vv, dgs)) {
1304 			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1305 				(unsigned long long)sector, data_size);
1306 			drbd_bcast_ee(mdev, "digest failed",
1307 					dgs, dig_in, dig_vv, e);
1308 			drbd_free_ee(mdev, e);
1309 			return NULL;
1310 		}
1311 	}
1312 	mdev->recv_cnt += data_size>>9;
1313 	return e;
1314 }
1315 
1316 /* drbd_drain_block() just takes a data block
1317  * out of the socket input buffer, and discards it.
1318  */
1319 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1320 {
1321 	struct page *page;
1322 	int rr, rv = 1;
1323 	void *data;
1324 
1325 	if (!data_size)
1326 		return true;
1327 
1328 	page = drbd_pp_alloc(mdev, 1, 1);
1329 
1330 	data = kmap(page);
1331 	while (data_size) {
1332 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1333 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1334 			rv = 0;
1335 			if (!signal_pending(current))
1336 				dev_warn(DEV,
1337 					"short read receiving data: read %d expected %d\n",
1338 					rr, min_t(int, data_size, PAGE_SIZE));
1339 			break;
1340 		}
1341 		data_size -= rr;
1342 	}
1343 	kunmap(page);
1344 	drbd_pp_free(mdev, page, 0);
1345 	return rv;
1346 }
1347 
1348 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1349 			   sector_t sector, int data_size)
1350 {
1351 	struct bio_vec *bvec;
1352 	struct bio *bio;
1353 	int dgs, rr, i, expect;
1354 	void *dig_in = mdev->int_dig_in;
1355 	void *dig_vv = mdev->int_dig_vv;
1356 
1357 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1358 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1359 
1360 	if (dgs) {
1361 		rr = drbd_recv(mdev, dig_in, dgs);
1362 		if (rr != dgs) {
1363 			if (!signal_pending(current))
1364 				dev_warn(DEV,
1365 					"short read receiving data reply digest: read %d expected %d\n",
1366 					rr, dgs);
1367 			return 0;
1368 		}
1369 	}
1370 
1371 	data_size -= dgs;
1372 
1373 	/* optimistically update recv_cnt.  if receiving fails below,
1374 	 * we disconnect anyways, and counters will be reset. */
1375 	mdev->recv_cnt += data_size>>9;
1376 
1377 	bio = req->master_bio;
1378 	D_ASSERT(sector == bio->bi_sector);
1379 
1380 	bio_for_each_segment(bvec, bio, i) {
1381 		expect = min_t(int, data_size, bvec->bv_len);
1382 		rr = drbd_recv(mdev,
1383 			     kmap(bvec->bv_page)+bvec->bv_offset,
1384 			     expect);
1385 		kunmap(bvec->bv_page);
1386 		if (rr != expect) {
1387 			if (!signal_pending(current))
1388 				dev_warn(DEV, "short read receiving data reply: "
1389 					"read %d expected %d\n",
1390 					rr, expect);
1391 			return 0;
1392 		}
1393 		data_size -= rr;
1394 	}
1395 
1396 	if (dgs) {
1397 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1398 		if (memcmp(dig_in, dig_vv, dgs)) {
1399 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1400 			return 0;
1401 		}
1402 	}
1403 
1404 	D_ASSERT(data_size == 0);
1405 	return 1;
1406 }
1407 
1408 /* e_end_resync_block() is called via
1409  * drbd_process_done_ee() by asender only */
1410 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1411 {
1412 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1413 	sector_t sector = e->sector;
1414 	int ok;
1415 
1416 	D_ASSERT(hlist_unhashed(&e->collision));
1417 
1418 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1419 		drbd_set_in_sync(mdev, sector, e->size);
1420 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1421 	} else {
1422 		/* Record failure to sync */
1423 		drbd_rs_failed_io(mdev, sector, e->size);
1424 
1425 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1426 	}
1427 	dec_unacked(mdev);
1428 
1429 	return ok;
1430 }
1431 
1432 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1433 {
1434 	struct drbd_epoch_entry *e;
1435 
1436 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1437 	if (!e)
1438 		goto fail;
1439 
1440 	dec_rs_pending(mdev);
1441 
1442 	inc_unacked(mdev);
1443 	/* corresponding dec_unacked() in e_end_resync_block()
1444 	 * respective _drbd_clear_done_ee */
1445 
1446 	e->w.cb = e_end_resync_block;
1447 
1448 	spin_lock_irq(&mdev->req_lock);
1449 	list_add(&e->w.list, &mdev->sync_ee);
1450 	spin_unlock_irq(&mdev->req_lock);
1451 
1452 	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1453 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1454 		return true;
1455 
1456 	/* don't care for the reason here */
1457 	dev_err(DEV, "submit failed, triggering re-connect\n");
1458 	spin_lock_irq(&mdev->req_lock);
1459 	list_del(&e->w.list);
1460 	spin_unlock_irq(&mdev->req_lock);
1461 
1462 	drbd_free_ee(mdev, e);
1463 fail:
1464 	put_ldev(mdev);
1465 	return false;
1466 }
1467 
1468 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1469 {
1470 	struct drbd_request *req;
1471 	sector_t sector;
1472 	int ok;
1473 	struct p_data *p = &mdev->data.rbuf.data;
1474 
1475 	sector = be64_to_cpu(p->sector);
1476 
1477 	spin_lock_irq(&mdev->req_lock);
1478 	req = _ar_id_to_req(mdev, p->block_id, sector);
1479 	spin_unlock_irq(&mdev->req_lock);
1480 	if (unlikely(!req)) {
1481 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1482 		return false;
1483 	}
1484 
1485 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1486 	 * special casing it there for the various failure cases.
1487 	 * still no race with drbd_fail_pending_reads */
1488 	ok = recv_dless_read(mdev, req, sector, data_size);
1489 
1490 	if (ok)
1491 		req_mod(req, data_received);
1492 	/* else: nothing. handled from drbd_disconnect...
1493 	 * I don't think we may complete this just yet
1494 	 * in case we are "on-disconnect: freeze" */
1495 
1496 	return ok;
1497 }
1498 
1499 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1500 {
1501 	sector_t sector;
1502 	int ok;
1503 	struct p_data *p = &mdev->data.rbuf.data;
1504 
1505 	sector = be64_to_cpu(p->sector);
1506 	D_ASSERT(p->block_id == ID_SYNCER);
1507 
1508 	if (get_ldev(mdev)) {
1509 		/* data is submitted to disk within recv_resync_read.
1510 		 * corresponding put_ldev done below on error,
1511 		 * or in drbd_endio_write_sec. */
1512 		ok = recv_resync_read(mdev, sector, data_size);
1513 	} else {
1514 		if (__ratelimit(&drbd_ratelimit_state))
1515 			dev_err(DEV, "Can not write resync data to local disk.\n");
1516 
1517 		ok = drbd_drain_block(mdev, data_size);
1518 
1519 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1520 	}
1521 
1522 	atomic_add(data_size >> 9, &mdev->rs_sect_in);
1523 
1524 	return ok;
1525 }
1526 
1527 /* e_end_block() is called via drbd_process_done_ee().
1528  * this means this function only runs in the asender thread
1529  */
1530 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1531 {
1532 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1533 	sector_t sector = e->sector;
1534 	int ok = 1, pcmd;
1535 
1536 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1537 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1538 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1539 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1540 				e->flags & EE_MAY_SET_IN_SYNC) ?
1541 				P_RS_WRITE_ACK : P_WRITE_ACK;
1542 			ok &= drbd_send_ack(mdev, pcmd, e);
1543 			if (pcmd == P_RS_WRITE_ACK)
1544 				drbd_set_in_sync(mdev, sector, e->size);
1545 		} else {
1546 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1547 			/* we expect it to be marked out of sync anyways...
1548 			 * maybe assert this?  */
1549 		}
1550 		dec_unacked(mdev);
1551 	}
1552 	/* we delete from the conflict detection hash _after_ we sent out the
1553 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1554 	if (mdev->net_conf->two_primaries) {
1555 		spin_lock_irq(&mdev->req_lock);
1556 		D_ASSERT(!hlist_unhashed(&e->collision));
1557 		hlist_del_init(&e->collision);
1558 		spin_unlock_irq(&mdev->req_lock);
1559 	} else {
1560 		D_ASSERT(hlist_unhashed(&e->collision));
1561 	}
1562 
1563 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1564 
1565 	return ok;
1566 }
1567 
1568 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1569 {
1570 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1571 	int ok = 1;
1572 
1573 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1574 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1575 
1576 	spin_lock_irq(&mdev->req_lock);
1577 	D_ASSERT(!hlist_unhashed(&e->collision));
1578 	hlist_del_init(&e->collision);
1579 	spin_unlock_irq(&mdev->req_lock);
1580 
1581 	dec_unacked(mdev);
1582 
1583 	return ok;
1584 }
1585 
1586 /* Called from receive_Data.
1587  * Synchronize packets on sock with packets on msock.
1588  *
1589  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1590  * packet traveling on msock, they are still processed in the order they have
1591  * been sent.
1592  *
1593  * Note: we don't care for Ack packets overtaking P_DATA packets.
1594  *
1595  * In case packet_seq is larger than mdev->peer_seq, there are
1596  * outstanding packets on the msock. We wait for them to arrive.
1597  * In case we are the logically next packet, we update mdev->peer_seq
1598  * ourselves. Correctly handles 32bit wrap around.
1599  *
1600  * Assume we have a 10 GBit connection; that is about 1<<30 bytes per second,
1601  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1602  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1603  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1604  *
1605  * returns 0 if we may process the packet,
1606  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1607 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1608 {
1609 	DEFINE_WAIT(wait);
1610 	unsigned int p_seq;
1611 	long timeout;
1612 	int ret = 0;
1613 	spin_lock(&mdev->peer_seq_lock);
1614 	for (;;) {
1615 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1616 		if (seq_le(packet_seq, mdev->peer_seq+1))
1617 			break;
1618 		if (signal_pending(current)) {
1619 			ret = -ERESTARTSYS;
1620 			break;
1621 		}
1622 		p_seq = mdev->peer_seq;
1623 		spin_unlock(&mdev->peer_seq_lock);
1624 		timeout = schedule_timeout(30*HZ);
1625 		spin_lock(&mdev->peer_seq_lock);
1626 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1627 			ret = -ETIMEDOUT;
1628 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1629 			break;
1630 		}
1631 	}
1632 	finish_wait(&mdev->seq_wait, &wait);
1633 	if (mdev->peer_seq+1 == packet_seq)
1634 		mdev->peer_seq++;
1635 	spin_unlock(&mdev->peer_seq_lock);
1636 	return ret;
1637 }
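
/* For illustration, a concrete walk-through of the scheme above: assume
 * mdev->peer_seq is 41 and a P_DATA packet with seq_num 43 arrives on sock.
 * seq_le(43, 41 + 1) does not hold, so the receiver sleeps here until the
 * packet with sequence number 42 has been processed on msock and peer_seq
 * has been advanced to 42.  Then seq_le(43, 43) holds, the loop is left,
 * and since peer_seq + 1 == 43 we bump peer_seq to 43 ourselves. */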
1638 
1639 /* see also bio_flags_to_wire()
1640  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1641  * flags and back. We may replicate to other kernel versions. */
1642 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1643 {
1644 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1645 		(dpf & DP_FUA ? REQ_FUA : 0) |
1646 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1647 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
1648 }
1649 
1650 /* mirrored write */
1651 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1652 {
1653 	sector_t sector;
1654 	struct drbd_epoch_entry *e;
1655 	struct p_data *p = &mdev->data.rbuf.data;
1656 	int rw = WRITE;
1657 	u32 dp_flags;
1658 
1659 	if (!get_ldev(mdev)) {
1660 		spin_lock(&mdev->peer_seq_lock);
1661 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1662 			mdev->peer_seq++;
1663 		spin_unlock(&mdev->peer_seq_lock);
1664 
1665 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1666 		atomic_inc(&mdev->current_epoch->epoch_size);
1667 		return drbd_drain_block(mdev, data_size);
1668 	}
1669 
1670 	/* get_ldev(mdev) successful.
1671 	 * Corresponding put_ldev done either below (on various errors),
1672 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1673 	 * the end of this function. */
1674 
1675 	sector = be64_to_cpu(p->sector);
1676 	e = read_in_block(mdev, p->block_id, sector, data_size);
1677 	if (!e) {
1678 		put_ldev(mdev);
1679 		return false;
1680 	}
1681 
1682 	e->w.cb = e_end_block;
1683 
1684 	dp_flags = be32_to_cpu(p->dp_flags);
1685 	rw |= wire_flags_to_bio(mdev, dp_flags);
1686 
1687 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1688 		e->flags |= EE_MAY_SET_IN_SYNC;
1689 
1690 	spin_lock(&mdev->epoch_lock);
1691 	e->epoch = mdev->current_epoch;
1692 	atomic_inc(&e->epoch->epoch_size);
1693 	atomic_inc(&e->epoch->active);
1694 	spin_unlock(&mdev->epoch_lock);
1695 
1696 	/* I'm the receiver, I do hold a net_cnt reference. */
1697 	if (!mdev->net_conf->two_primaries) {
1698 		spin_lock_irq(&mdev->req_lock);
1699 	} else {
1700 		/* don't get the req_lock yet,
1701 		 * we may sleep in drbd_wait_peer_seq */
1702 		const int size = e->size;
1703 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1704 		DEFINE_WAIT(wait);
1705 		struct drbd_request *i;
1706 		struct hlist_node *n;
1707 		struct hlist_head *slot;
1708 		int first;
1709 
1710 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1711 		BUG_ON(mdev->ee_hash == NULL);
1712 		BUG_ON(mdev->tl_hash == NULL);
1713 
1714 		/* conflict detection and handling:
1715 		 * 1. wait on the sequence number,
1716 		 *    in case this data packet overtook ACK packets.
1717 		 * 2. check our hash tables for conflicting requests.
1718 		 *    we only need to walk the tl_hash, since an ee can not
1719 		 *    have a conflict with an other ee: on the submitting
1720 		 *    node, the corresponding req had already been conflicting,
1721 		 *    and a conflicting req is never sent.
1722 		 *
1723 		 * Note: for two_primaries, we are protocol C,
1724 		 * so there cannot be any request that is DONE
1725 		 * but still on the transfer log.
1726 		 *
1727 		 * unconditionally add to the ee_hash.
1728 		 *
1729 		 * if no conflicting request is found:
1730 		 *    submit.
1731 		 *
1732 		 * if any conflicting request is found
1733 		 * that has not yet been acked,
1734 		 * AND I have the "discard concurrent writes" flag:
1735 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1736 		 *
1737 		 * if any conflicting request is found:
1738 		 *	 block the receiver, waiting on misc_wait
1739 		 *	 until no more conflicting requests are there,
1740 		 *	 or we get interrupted (disconnect).
1741 		 *
1742 		 *	 we do not just write after local io completion of those
1743 		 *	 requests, but only after req is done completely, i.e.
1744 		 *	 we wait for the P_DISCARD_ACK to arrive!
1745 		 *
1746 		 *	 then proceed normally, i.e. submit.
1747 		 */
1748 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1749 			goto out_interrupted;
1750 
1751 		spin_lock_irq(&mdev->req_lock);
1752 
1753 		hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1754 
1755 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1756 		slot = tl_hash_slot(mdev, sector);
1757 		first = 1;
1758 		for (;;) {
1759 			int have_unacked = 0;
1760 			int have_conflict = 0;
1761 			prepare_to_wait(&mdev->misc_wait, &wait,
1762 				TASK_INTERRUPTIBLE);
1763 			hlist_for_each_entry(i, n, slot, collision) {
1764 				if (OVERLAPS) {
1765 					/* only ALERT on first iteration,
1766 					 * we may be woken up early... */
1767 					if (first)
1768 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1769 						      "	new: %llus +%u; pending: %llus +%u\n",
1770 						      current->comm, current->pid,
1771 						      (unsigned long long)sector, size,
1772 						      (unsigned long long)i->sector, i->size);
1773 					if (i->rq_state & RQ_NET_PENDING)
1774 						++have_unacked;
1775 					++have_conflict;
1776 				}
1777 			}
1778 #undef OVERLAPS
1779 			if (!have_conflict)
1780 				break;
1781 
1782 			/* Discard Ack only for the _first_ iteration */
1783 			if (first && discard && have_unacked) {
1784 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1785 				     (unsigned long long)sector);
1786 				inc_unacked(mdev);
1787 				e->w.cb = e_send_discard_ack;
1788 				list_add_tail(&e->w.list, &mdev->done_ee);
1789 
1790 				spin_unlock_irq(&mdev->req_lock);
1791 
1792 				/* we could probably send that P_DISCARD_ACK ourselves,
1793 				 * but I don't like the receiver using the msock */
1794 
1795 				put_ldev(mdev);
1796 				wake_asender(mdev);
1797 				finish_wait(&mdev->misc_wait, &wait);
1798 				return true;
1799 			}
1800 
1801 			if (signal_pending(current)) {
1802 				hlist_del_init(&e->collision);
1803 
1804 				spin_unlock_irq(&mdev->req_lock);
1805 
1806 				finish_wait(&mdev->misc_wait, &wait);
1807 				goto out_interrupted;
1808 			}
1809 
1810 			spin_unlock_irq(&mdev->req_lock);
1811 			if (first) {
1812 				first = 0;
1813 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1814 				     "sec=%llus\n", (unsigned long long)sector);
1815 			} else if (discard) {
1816 				/* we had none on the first iteration.
1817 				 * there must be none now. */
1818 				D_ASSERT(have_unacked == 0);
1819 			}
1820 			schedule();
1821 			spin_lock_irq(&mdev->req_lock);
1822 		}
1823 		finish_wait(&mdev->misc_wait, &wait);
1824 	}
1825 
1826 	list_add(&e->w.list, &mdev->active_ee);
1827 	spin_unlock_irq(&mdev->req_lock);
1828 
1829 	switch (mdev->net_conf->wire_protocol) {
1830 	case DRBD_PROT_C:
1831 		inc_unacked(mdev);
1832 		/* corresponding dec_unacked() in e_end_block()
1833 		 * respective _drbd_clear_done_ee */
1834 		break;
1835 	case DRBD_PROT_B:
1836 		/* I really don't like it that the receiver thread
1837 		 * sends on the msock, but anyways */
1838 		drbd_send_ack(mdev, P_RECV_ACK, e);
1839 		break;
1840 	case DRBD_PROT_A:
1841 		/* nothing to do */
1842 		break;
1843 	}
1844 
1845 	if (mdev->state.pdsk < D_INCONSISTENT) {
1846 		/* In case we have the only disk of the cluster, mark the block out of sync and reference it in the activity log. */
1847 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1848 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1849 		e->flags &= ~EE_MAY_SET_IN_SYNC;
1850 		drbd_al_begin_io(mdev, e->sector);
1851 	}
1852 
1853 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1854 		return true;
1855 
1856 	/* don't care for the reason here */
1857 	dev_err(DEV, "submit failed, triggering re-connect\n");
1858 	spin_lock_irq(&mdev->req_lock);
1859 	list_del(&e->w.list);
1860 	hlist_del_init(&e->collision);
1861 	spin_unlock_irq(&mdev->req_lock);
1862 	if (e->flags & EE_CALL_AL_COMPLETE_IO)
1863 		drbd_al_complete_io(mdev, e->sector);
1864 
1865 out_interrupted:
1866 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1867 	put_ldev(mdev);
1868 	drbd_free_ee(mdev, e);
1869 	return false;
1870 }
1871 
1872 /* We may throttle resync, if the lower device seems to be busy,
1873  * and current sync rate is above c_min_rate.
1874  *
1875  * To decide whether or not the lower device is busy, we use a scheme similar
1876  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
1877  * activity (more than 64 sectors) that we cannot account for with our own
1878  * resync activity, the device obviously is "busy".
1879  *
1880  * The current sync rate used here is based on the two most recent step marks
1881  * only, giving a short-time average so we can react faster.
1882  */
1883 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1884 {
1885 	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1886 	unsigned long db, dt, dbdt;
1887 	struct lc_element *tmp;
1888 	int curr_events;
1889 	int throttle = 0;
1890 
1891 	/* feature disabled? */
1892 	if (mdev->sync_conf.c_min_rate == 0)
1893 		return 0;
1894 
1895 	spin_lock_irq(&mdev->al_lock);
1896 	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1897 	if (tmp) {
1898 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1899 		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1900 			spin_unlock_irq(&mdev->al_lock);
1901 			return 0;
1902 		}
1903 		/* Do not slow down if app IO is already waiting for this extent */
1904 	}
1905 	spin_unlock_irq(&mdev->al_lock);
1906 
1907 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1908 		      (int)part_stat_read(&disk->part0, sectors[1]) -
1909 			atomic_read(&mdev->rs_sect_ev);
1910 
1911 	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1912 		unsigned long rs_left;
1913 		int i;
1914 
1915 		mdev->rs_last_events = curr_events;
1916 
1917 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1918 		 * approx. */
1919 		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1920 
1921 		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1922 			rs_left = mdev->ov_left;
1923 		else
1924 			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1925 
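		/* dt: seconds since that sync mark; db: out-of-sync bits resolved since
		 * then; dbdt: the resulting short-time resync rate (via Bit2KB), which
		 * is compared against c_min_rate below. */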
1926 		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1927 		if (!dt)
1928 			dt++;
1929 		db = mdev->rs_mark_left[i] - rs_left;
1930 		dbdt = Bit2KB(db/dt);
1931 
1932 		if (dbdt > mdev->sync_conf.c_min_rate)
1933 			throttle = 1;
1934 	}
1935 	return throttle;
1936 }
1937 
1938 
1939 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1940 {
1941 	sector_t sector;
1942 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1943 	struct drbd_epoch_entry *e;
1944 	struct digest_info *di = NULL;
1945 	int size, verb;
1946 	unsigned int fault_type;
1947 	struct p_block_req *p =	&mdev->data.rbuf.block_req;
1948 
1949 	sector = be64_to_cpu(p->sector);
1950 	size   = be32_to_cpu(p->blksize);
1951 
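	/* Sanity check the request: it must be a positive multiple of 512 bytes,
	 * no larger than DRBD_MAX_BIO_SIZE, and must not reach beyond the end of
	 * the device. */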
1952 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1953 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1954 				(unsigned long long)sector, size);
1955 		return false;
1956 	}
1957 	if (sector + (size>>9) > capacity) {
1958 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1959 				(unsigned long long)sector, size);
1960 		return false;
1961 	}
1962 
1963 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1964 		verb = 1;
1965 		switch (cmd) {
1966 		case P_DATA_REQUEST:
1967 			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1968 			break;
1969 		case P_RS_DATA_REQUEST:
1970 		case P_CSUM_RS_REQUEST:
1971 		case P_OV_REQUEST:
1972 			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1973 			break;
1974 		case P_OV_REPLY:
1975 			verb = 0;
1976 			dec_rs_pending(mdev);
1977 			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1978 			break;
1979 		default:
1980 			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1981 				cmdname(cmd));
1982 		}
1983 		if (verb && __ratelimit(&drbd_ratelimit_state))
1984 			dev_err(DEV, "Can not satisfy peer's read request, "
1985 			    "no local data.\n");
1986 
1987 		/* drain possible payload */
1988 		return drbd_drain_block(mdev, digest_size);
1989 	}
1990 
1991 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1992 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1993 	 * which in turn might block on the other node at this very place.  */
1994 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1995 	if (!e) {
1996 		put_ldev(mdev);
1997 		return false;
1998 	}
1999 
2000 	switch (cmd) {
2001 	case P_DATA_REQUEST:
2002 		e->w.cb = w_e_end_data_req;
2003 		fault_type = DRBD_FAULT_DT_RD;
2004 		/* application IO, don't drbd_rs_begin_io */
2005 		goto submit;
2006 
2007 	case P_RS_DATA_REQUEST:
2008 		e->w.cb = w_e_end_rsdata_req;
2009 		fault_type = DRBD_FAULT_RS_RD;
2010 		/* used in the sector offset progress display */
2011 		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2012 		break;
2013 
2014 	case P_OV_REPLY:
2015 	case P_CSUM_RS_REQUEST:
2016 		fault_type = DRBD_FAULT_RS_RD;
2017 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2018 		if (!di)
2019 			goto out_free_e;
2020 
2021 		di->digest_size = digest_size;
2022 		di->digest = (((char *)di)+sizeof(struct digest_info));
2023 
2024 		e->digest = di;
2025 		e->flags |= EE_HAS_DIGEST;
2026 
2027 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2028 			goto out_free_e;
2029 
2030 		if (cmd == P_CSUM_RS_REQUEST) {
2031 			D_ASSERT(mdev->agreed_pro_version >= 89);
2032 			e->w.cb = w_e_end_csum_rs_req;
2033 			/* used in the sector offset progress display */
2034 			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2035 		} else if (cmd == P_OV_REPLY) {
2036 			/* track progress, we may need to throttle */
2037 			atomic_add(size >> 9, &mdev->rs_sect_in);
2038 			e->w.cb = w_e_end_ov_reply;
2039 			dec_rs_pending(mdev);
2040 			/* drbd_rs_begin_io done when we sent this request,
2041 			 * but accounting still needs to be done. */
2042 			goto submit_for_resync;
2043 		}
2044 		break;
2045 
2046 	case P_OV_REQUEST:
2047 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2048 		    mdev->agreed_pro_version >= 90) {
2049 			unsigned long now = jiffies;
2050 			int i;
2051 			mdev->ov_start_sector = sector;
2052 			mdev->ov_position = sector;
2053 			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2054 			mdev->rs_total = mdev->ov_left;
2055 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2056 				mdev->rs_mark_left[i] = mdev->ov_left;
2057 				mdev->rs_mark_time[i] = now;
2058 			}
2059 			dev_info(DEV, "Online Verify start sector: %llu\n",
2060 					(unsigned long long)sector);
2061 		}
2062 		e->w.cb = w_e_end_ov_req;
2063 		fault_type = DRBD_FAULT_RS_RD;
2064 		break;
2065 
2066 	default:
2067 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2068 		    cmdname(cmd));
2069 		fault_type = DRBD_FAULT_MAX;
2070 		goto out_free_e;
2071 	}
2072 
2073 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2074 	 * wrt the receiver, but it is not as straightforward as it may seem.
2075 	 * Various places in the resync start and stop logic assume resync
2076 	 * requests are processed in order, requeuing this on the worker thread
2077 	 * introduces a bunch of new code for synchronization between threads.
2078 	 *
2079 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2080 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2081 	 * for application writes for the same time.  For now, just throttle
2082 	 * here, where the rest of the code expects the receiver to sleep for
2083 	 * a while, anyway.
2084 	 */
2085 
2086 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2087 	 * this defers syncer requests for some time, before letting at least
2088 	 * one request through.  The resync controller on the receiving side
2089 	 * will adapt to the incoming rate accordingly.
2090 	 *
2091 	 * We cannot throttle here if remote is Primary/SyncTarget:
2092 	 * we would also throttle its application reads.
2093 	 * In that case, throttling is done on the SyncTarget only.
2094 	 */
2095 	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2096 		schedule_timeout_uninterruptible(HZ/10);
2097 	if (drbd_rs_begin_io(mdev, sector))
2098 		goto out_free_e;
2099 
2100 submit_for_resync:
2101 	atomic_add(size >> 9, &mdev->rs_sect_ev);
2102 
2103 submit:
2104 	inc_unacked(mdev);
2105 	spin_lock_irq(&mdev->req_lock);
2106 	list_add_tail(&e->w.list, &mdev->read_ee);
2107 	spin_unlock_irq(&mdev->req_lock);
2108 
2109 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2110 		return true;
2111 
2112 	/* don't care for the reason here */
2113 	dev_err(DEV, "submit failed, triggering re-connect\n");
2114 	spin_lock_irq(&mdev->req_lock);
2115 	list_del(&e->w.list);
2116 	spin_unlock_irq(&mdev->req_lock);
2117 	/* no drbd_rs_complete_io(), we are dropping the connection anyway */
2118 
2119 out_free_e:
2120 	put_ldev(mdev);
2121 	drbd_free_ee(mdev, e);
2122 	return false;
2123 }
2124 
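/* After-split-brain recovery strategy for the "no primaries" case (after-sb-0pri).
 * Return convention, shared with the 1p/2p variants below:
 *    1  discard the peer's modifications, i.e. the peer becomes sync target
 *   -1  discard our own modifications, i.e. we become sync target
 * -100  no automatic decision possible
 */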
2125 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2126 {
2127 	int self, peer, rv = -100;
2128 	unsigned long ch_self, ch_peer;
2129 
2130 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2131 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2132 
2133 	ch_peer = mdev->p_uuid[UI_SIZE];
2134 	ch_self = mdev->comm_bm_set;
2135 
2136 	switch (mdev->net_conf->after_sb_0p) {
2137 	case ASB_CONSENSUS:
2138 	case ASB_DISCARD_SECONDARY:
2139 	case ASB_CALL_HELPER:
2140 		dev_err(DEV, "Configuration error.\n");
2141 		break;
2142 	case ASB_DISCONNECT:
2143 		break;
2144 	case ASB_DISCARD_YOUNGER_PRI:
2145 		if (self == 0 && peer == 1) {
2146 			rv = -1;
2147 			break;
2148 		}
2149 		if (self == 1 && peer == 0) {
2150 			rv =  1;
2151 			break;
2152 		}
2153 		/* Else fall through to one of the other strategies... */
2154 	case ASB_DISCARD_OLDER_PRI:
2155 		if (self == 0 && peer == 1) {
2156 			rv = 1;
2157 			break;
2158 		}
2159 		if (self == 1 && peer == 0) {
2160 			rv = -1;
2161 			break;
2162 		}
2163 		/* Else fall through to one of the other strategies... */
2164 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2165 		     "Using discard-least-changes instead\n");
2166 	case ASB_DISCARD_ZERO_CHG:
2167 		if (ch_peer == 0 && ch_self == 0) {
2168 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2169 				? -1 : 1;
2170 			break;
2171 		} else {
2172 			if (ch_peer == 0) { rv =  1; break; }
2173 			if (ch_self == 0) { rv = -1; break; }
2174 		}
2175 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2176 			break;
2177 	case ASB_DISCARD_LEAST_CHG:
2178 		if	(ch_self < ch_peer)
2179 			rv = -1;
2180 		else if (ch_self > ch_peer)
2181 			rv =  1;
2182 		else /* ( ch_self == ch_peer ) */
2183 		     /* Well, then use something else. */
2184 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2185 				? -1 : 1;
2186 		break;
2187 	case ASB_DISCARD_LOCAL:
2188 		rv = -1;
2189 		break;
2190 	case ASB_DISCARD_REMOTE:
2191 		rv =  1;
2192 	}
2193 
2194 	return rv;
2195 }
2196 
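/* After-split-brain recovery strategy when exactly one node was primary (after-sb-1pri). */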
2197 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2198 {
2199 	int hg, rv = -100;
2200 
2201 	switch (mdev->net_conf->after_sb_1p) {
2202 	case ASB_DISCARD_YOUNGER_PRI:
2203 	case ASB_DISCARD_OLDER_PRI:
2204 	case ASB_DISCARD_LEAST_CHG:
2205 	case ASB_DISCARD_LOCAL:
2206 	case ASB_DISCARD_REMOTE:
2207 		dev_err(DEV, "Configuration error.\n");
2208 		break;
2209 	case ASB_DISCONNECT:
2210 		break;
2211 	case ASB_CONSENSUS:
2212 		hg = drbd_asb_recover_0p(mdev);
2213 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2214 			rv = hg;
2215 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2216 			rv = hg;
2217 		break;
2218 	case ASB_VIOLENTLY:
2219 		rv = drbd_asb_recover_0p(mdev);
2220 		break;
2221 	case ASB_DISCARD_SECONDARY:
2222 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2223 	case ASB_CALL_HELPER:
2224 		hg = drbd_asb_recover_0p(mdev);
2225 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2226 			enum drbd_state_rv rv2;
2227 
2228 			drbd_set_role(mdev, R_SECONDARY, 0);
2229 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2230 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2231 			  * we do not need to wait for the after state change work either. */
2232 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2233 			if (rv2 != SS_SUCCESS) {
2234 				drbd_khelper(mdev, "pri-lost-after-sb");
2235 			} else {
2236 				dev_warn(DEV, "Successfully gave up primary role.\n");
2237 				rv = hg;
2238 			}
2239 		} else
2240 			rv = hg;
2241 	}
2242 
2243 	return rv;
2244 }
2245 
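/* After-split-brain recovery strategy when both nodes were primary (after-sb-2pri). */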
2246 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2247 {
2248 	int hg, rv = -100;
2249 
2250 	switch (mdev->net_conf->after_sb_2p) {
2251 	case ASB_DISCARD_YOUNGER_PRI:
2252 	case ASB_DISCARD_OLDER_PRI:
2253 	case ASB_DISCARD_LEAST_CHG:
2254 	case ASB_DISCARD_LOCAL:
2255 	case ASB_DISCARD_REMOTE:
2256 	case ASB_CONSENSUS:
2257 	case ASB_DISCARD_SECONDARY:
2258 		dev_err(DEV, "Configuration error.\n");
2259 		break;
2260 	case ASB_VIOLENTLY:
2261 		rv = drbd_asb_recover_0p(mdev);
2262 		break;
2263 	case ASB_DISCONNECT:
2264 		break;
2265 	case ASB_CALL_HELPER:
2266 		hg = drbd_asb_recover_0p(mdev);
2267 		if (hg == -1) {
2268 			enum drbd_state_rv rv2;
2269 
2270 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2271 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2272 			  * we do not need to wait for the after state change work either. */
2273 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2274 			if (rv2 != SS_SUCCESS) {
2275 				drbd_khelper(mdev, "pri-lost-after-sb");
2276 			} else {
2277 				dev_warn(DEV, "Successfully gave up primary role.\n");
2278 				rv = hg;
2279 			}
2280 		} else
2281 			rv = hg;
2282 	}
2283 
2284 	return rv;
2285 }
2286 
2287 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2288 			   u64 bits, u64 flags)
2289 {
2290 	if (!uuid) {
2291 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2292 		return;
2293 	}
2294 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2295 	     text,
2296 	     (unsigned long long)uuid[UI_CURRENT],
2297 	     (unsigned long long)uuid[UI_BITMAP],
2298 	     (unsigned long long)uuid[UI_HISTORY_START],
2299 	     (unsigned long long)uuid[UI_HISTORY_END],
2300 	     (unsigned long long)bits,
2301 	     (unsigned long long)flags);
2302 }
2303 
2304 /*
2305   100	after split brain try auto recover
2306     2	C_SYNC_SOURCE set BitMap
2307     1	C_SYNC_SOURCE use BitMap
2308     0	no Sync
2309    -1	C_SYNC_TARGET use BitMap
2310    -2	C_SYNC_TARGET set BitMap
2311  -100	after split brain, disconnect
2312 -1000	unrelated data
2313 -1091   requires proto 91
2314 -1096   requires proto 96
2315  */
2316 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2317 {
2318 	u64 self, peer;
2319 	int i, j;
2320 
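	/* The lowest bit of each UUID is used as a flag and is masked off for all
	 * of the comparisons below. */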
2321 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2322 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2323 
2324 	*rule_nr = 10;
2325 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2326 		return 0;
2327 
2328 	*rule_nr = 20;
2329 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2330 	     peer != UUID_JUST_CREATED)
2331 		return -2;
2332 
2333 	*rule_nr = 30;
2334 	if (self != UUID_JUST_CREATED &&
2335 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2336 		return 2;
2337 
2338 	if (self == peer) {
2339 		int rct, dc; /* roles at crash time */
2340 
2341 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2342 
2343 			if (mdev->agreed_pro_version < 91)
2344 				return -1091;
2345 
2346 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2347 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2348 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2349 				drbd_uuid_set_bm(mdev, 0UL);
2350 
2351 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2352 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2353 				*rule_nr = 34;
2354 			} else {
2355 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2356 				*rule_nr = 36;
2357 			}
2358 
2359 			return 1;
2360 		}
2361 
2362 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2363 
2364 			if (mdev->agreed_pro_version < 91)
2365 				return -1091;
2366 
2367 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2368 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2369 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2370 
2371 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2372 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2373 				mdev->p_uuid[UI_BITMAP] = 0UL;
2374 
2375 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2376 				*rule_nr = 35;
2377 			} else {
2378 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2379 				*rule_nr = 37;
2380 			}
2381 
2382 			return -1;
2383 		}
2384 
2385 		/* Common power [off|failure] */
2386 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2387 			(mdev->p_uuid[UI_FLAGS] & 2);
2388 		/* lowest bit is set when we were primary,
2389 		 * next bit (weight 2) is set when peer was primary */
2390 		*rule_nr = 40;
2391 
2392 		switch (rct) {
2393 		case 0: /* !self_pri && !peer_pri */ return 0;
2394 		case 1: /*  self_pri && !peer_pri */ return 1;
2395 		case 2: /* !self_pri &&  peer_pri */ return -1;
2396 		case 3: /*  self_pri &&  peer_pri */
2397 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2398 			return dc ? -1 : 1;
2399 		}
2400 	}
2401 
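	/* The current UUIDs differ.  Compare current, bitmap and history UUIDs of
	 * both nodes to determine the sync direction and whether a bitmap based
	 * resync (+/-1) or a full sync (+/-2) is needed. */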
2402 	*rule_nr = 50;
2403 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2404 	if (self == peer)
2405 		return -1;
2406 
2407 	*rule_nr = 51;
2408 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2409 	if (self == peer) {
2410 		if (mdev->agreed_pro_version < 96 ?
2411 		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2412 		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2413 		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2414 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2415 			   the peer made to its UUIDs when it last started a resync as sync source. */
2416 
2417 			if (mdev->agreed_pro_version < 91)
2418 				return -1091;
2419 
2420 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2421 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2422 
2423 			dev_info(DEV, "Did not get the last syncUUID packet, corrected:\n");
2424 			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2425 
2426 			return -1;
2427 		}
2428 	}
2429 
2430 	*rule_nr = 60;
2431 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2432 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2433 		peer = mdev->p_uuid[i] & ~((u64)1);
2434 		if (self == peer)
2435 			return -2;
2436 	}
2437 
2438 	*rule_nr = 70;
2439 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2440 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2441 	if (self == peer)
2442 		return 1;
2443 
2444 	*rule_nr = 71;
2445 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2446 	if (self == peer) {
2447 		if (mdev->agreed_pro_version < 96 ?
2448 		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2449 		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2450 		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2451 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2452 			   we made to our UUIDs when we last started a resync as sync source. */
2453 
2454 			if (mdev->agreed_pro_version < 91)
2455 				return -1091;
2456 
2457 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2458 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2459 
2460 			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2461 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2462 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2463 
2464 			return 1;
2465 		}
2466 	}
2467 
2468 
2469 	*rule_nr = 80;
2470 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2471 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2472 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2473 		if (self == peer)
2474 			return 2;
2475 	}
2476 
2477 	*rule_nr = 90;
2478 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2479 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2480 	if (self == peer && self != ((u64)0))
2481 		return 100;
2482 
2483 	*rule_nr = 100;
2484 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2485 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2486 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2487 			peer = mdev->p_uuid[j] & ~((u64)1);
2488 			if (self == peer)
2489 				return -100;
2490 		}
2491 	}
2492 
2493 	return -1000;
2494 }
2495 
2496 /* drbd_sync_handshake() returns the new conn state on success, or
2497    C_MASK on failure.
2498  */
2499 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2500 					   enum drbd_disk_state peer_disk) __must_hold(local)
2501 {
2502 	int hg, rule_nr;
2503 	enum drbd_conns rv = C_MASK;
2504 	enum drbd_disk_state mydisk;
2505 
2506 	mydisk = mdev->state.disk;
2507 	if (mydisk == D_NEGOTIATING)
2508 		mydisk = mdev->new_state_tmp.disk;
2509 
2510 	dev_info(DEV, "drbd_sync_handshake:\n");
2511 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2512 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2513 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2514 
2515 	hg = drbd_uuid_compare(mdev, &rule_nr);
2516 
2517 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2518 
2519 	if (hg == -1000) {
2520 		dev_alert(DEV, "Unrelated data, aborting!\n");
2521 		return C_MASK;
2522 	}
2523 	if (hg < -1000) {
2524 		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2525 		return C_MASK;
2526 	}
2527 
2528 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2529 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2530 		int f = (hg == -100) || abs(hg) == 2;
2531 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2532 		if (f)
2533 			hg = hg*2;
2534 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2535 		     hg > 0 ? "source" : "target");
2536 	}
2537 
2538 	if (abs(hg) == 100)
2539 		drbd_khelper(mdev, "initial-split-brain");
2540 
2541 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2542 		int pcount = (mdev->state.role == R_PRIMARY)
2543 			   + (peer_role == R_PRIMARY);
2544 		int forced = (hg == -100);
2545 
2546 		switch (pcount) {
2547 		case 0:
2548 			hg = drbd_asb_recover_0p(mdev);
2549 			break;
2550 		case 1:
2551 			hg = drbd_asb_recover_1p(mdev);
2552 			break;
2553 		case 2:
2554 			hg = drbd_asb_recover_2p(mdev);
2555 			break;
2556 		}
2557 		if (abs(hg) < 100) {
2558 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2559 			     "automatically solved. Sync from %s node\n",
2560 			     pcount, (hg < 0) ? "peer" : "this");
2561 			if (forced) {
2562 				dev_warn(DEV, "Doing a full sync, since"
2563 				     " UUIDs were ambiguous.\n");
2564 				hg = hg*2;
2565 			}
2566 		}
2567 	}
2568 
2569 	if (hg == -100) {
2570 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2571 			hg = -1;
2572 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2573 			hg = 1;
2574 
2575 		if (abs(hg) < 100)
2576 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2577 			     "Sync from %s node\n",
2578 			     (hg < 0) ? "peer" : "this");
2579 	}
2580 
2581 	if (hg == -100) {
2582 		/* FIXME this log message is not correct if we end up here
2583 		 * after an attempted attach on a diskless node.
2584 		 * We just refuse to attach -- well, we drop the "connection"
2585 		 * to that disk, in a way... */
2586 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2587 		drbd_khelper(mdev, "split-brain");
2588 		return C_MASK;
2589 	}
2590 
2591 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2592 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2593 		return C_MASK;
2594 	}
2595 
2596 	if (hg < 0 && /* by intention we do not use mydisk here. */
2597 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2598 		switch (mdev->net_conf->rr_conflict) {
2599 		case ASB_CALL_HELPER:
2600 			drbd_khelper(mdev, "pri-lost");
2601 			/* fall through */
2602 		case ASB_DISCONNECT:
2603 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2604 			return C_MASK;
2605 		case ASB_VIOLENTLY:
2606 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2607 			     "assumption\n");
2608 		}
2609 	}
2610 
2611 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2612 		if (hg == 0)
2613 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2614 		else
2615 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2616 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2617 				 abs(hg) >= 2 ? "full" : "bit-map based");
2618 		return C_MASK;
2619 	}
2620 
2621 	if (abs(hg) >= 2) {
2622 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2623 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2624 					BM_LOCKED_SET_ALLOWED))
2625 			return C_MASK;
2626 	}
2627 
2628 	if (hg > 0) { /* become sync source. */
2629 		rv = C_WF_BITMAP_S;
2630 	} else if (hg < 0) { /* become sync target */
2631 		rv = C_WF_BITMAP_T;
2632 	} else {
2633 		rv = C_CONNECTED;
2634 		if (drbd_bm_total_weight(mdev)) {
2635 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2636 			     drbd_bm_total_weight(mdev));
2637 		}
2638 	}
2639 
2640 	return rv;
2641 }
2642 
2643 /* returns 1 if invalid */
2644 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2645 {
2646 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2647 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2648 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2649 		return 0;
2650 
2651 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2652 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2653 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2654 		return 1;
2655 
2656 	/* everything else is valid if they are equal on both sides. */
2657 	if (peer == self)
2658 		return 0;
2659 
2660 	/* everything else is invalid. */
2661 	return 1;
2662 }
2663 
2664 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2665 {
2666 	struct p_protocol *p = &mdev->data.rbuf.protocol;
2667 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2668 	int p_want_lose, p_two_primaries, cf;
2669 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2670 
2671 	p_proto		= be32_to_cpu(p->protocol);
2672 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2673 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2674 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2675 	p_two_primaries = be32_to_cpu(p->two_primaries);
2676 	cf		= be32_to_cpu(p->conn_flags);
2677 	p_want_lose = cf & CF_WANT_LOSE;
2678 
2679 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2680 
2681 	if (cf & CF_DRY_RUN)
2682 		set_bit(CONN_DRY_RUN, &mdev->flags);
2683 
2684 	if (p_proto != mdev->net_conf->wire_protocol) {
2685 		dev_err(DEV, "incompatible communication protocols\n");
2686 		goto disconnect;
2687 	}
2688 
2689 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2690 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2691 		goto disconnect;
2692 	}
2693 
2694 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2695 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2696 		goto disconnect;
2697 	}
2698 
2699 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2700 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2701 		goto disconnect;
2702 	}
2703 
2704 	if (p_want_lose && mdev->net_conf->want_lose) {
2705 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2706 		goto disconnect;
2707 	}
2708 
2709 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2710 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2711 		goto disconnect;
2712 	}
2713 
2714 	if (mdev->agreed_pro_version >= 87) {
2715 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2716 
2717 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2718 			return false;
2719 
2720 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2721 		if (strcmp(p_integrity_alg, my_alg)) {
2722 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2723 			goto disconnect;
2724 		}
2725 		dev_info(DEV, "data-integrity-alg: %s\n",
2726 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2727 	}
2728 
2729 	return true;
2730 
2731 disconnect:
2732 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2733 	return false;
2734 }
2735 
2736 /* helper function
2737  * input: alg name, feature name
2738  * return: NULL (alg name was "")
2739  *         ERR_PTR(error) if something goes wrong
2740  *         or the crypto hash ptr, if it worked out ok. */
2741 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2742 		const char *alg, const char *name)
2743 {
2744 	struct crypto_hash *tfm;
2745 
2746 	if (!alg[0])
2747 		return NULL;
2748 
2749 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2750 	if (IS_ERR(tfm)) {
2751 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2752 			alg, name, PTR_ERR(tfm));
2753 		return tfm;
2754 	}
2755 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2756 		crypto_free_hash(tfm);
2757 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2758 		return ERR_PTR(-EINVAL);
2759 	}
2760 	return tfm;
2761 }
2762 
2763 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2764 {
2765 	int ok = true;
2766 	struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2767 	unsigned int header_size, data_size, exp_max_sz;
2768 	struct crypto_hash *verify_tfm = NULL;
2769 	struct crypto_hash *csums_tfm = NULL;
2770 	const int apv = mdev->agreed_pro_version;
2771 	int *rs_plan_s = NULL;
2772 	int fifo_size = 0;
2773 
2774 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2775 		    : apv == 88 ? sizeof(struct p_rs_param)
2776 					+ SHARED_SECRET_MAX
2777 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
2778 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2779 
2780 	if (packet_size > exp_max_sz) {
2781 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2782 		    packet_size, exp_max_sz);
2783 		return false;
2784 	}
2785 
2786 	if (apv <= 88) {
2787 		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2788 		data_size   = packet_size  - header_size;
2789 	} else if (apv <= 94) {
2790 		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2791 		data_size   = packet_size  - header_size;
2792 		D_ASSERT(data_size == 0);
2793 	} else {
2794 		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2795 		data_size   = packet_size  - header_size;
2796 		D_ASSERT(data_size == 0);
2797 	}
2798 
2799 	/* initialize verify_alg and csums_alg */
2800 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2801 
2802 	if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2803 		return false;
2804 
2805 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2806 
2807 	if (apv >= 88) {
2808 		if (apv == 88) {
2809 			if (data_size > SHARED_SECRET_MAX) {
2810 				dev_err(DEV, "verify-alg too long, "
2811 				    "peer wants %u, accepting only %u bytes\n",
2812 						data_size, SHARED_SECRET_MAX);
2813 				return false;
2814 			}
2815 
2816 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2817 				return false;
2818 
2819 			/* we expect NUL terminated string */
2820 			/* but just in case someone tries to be evil */
2821 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2822 			p->verify_alg[data_size-1] = 0;
2823 
2824 		} else /* apv >= 89 */ {
2825 			/* we still expect NUL terminated strings */
2826 			/* but just in case someone tries to be evil */
2827 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2828 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2829 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2830 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2831 		}
2832 
2833 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2834 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2835 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2836 				    mdev->sync_conf.verify_alg, p->verify_alg);
2837 				goto disconnect;
2838 			}
2839 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2840 					p->verify_alg, "verify-alg");
2841 			if (IS_ERR(verify_tfm)) {
2842 				verify_tfm = NULL;
2843 				goto disconnect;
2844 			}
2845 		}
2846 
2847 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2848 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2849 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2850 				    mdev->sync_conf.csums_alg, p->csums_alg);
2851 				goto disconnect;
2852 			}
2853 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2854 					p->csums_alg, "csums-alg");
2855 			if (IS_ERR(csums_tfm)) {
2856 				csums_tfm = NULL;
2857 				goto disconnect;
2858 			}
2859 		}
2860 
2861 		if (apv > 94) {
2862 			mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2863 			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2864 			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2865 			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2866 			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2867 
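			/* Resize the resync planning FIFO so it covers the configured
			 * c_plan_ahead interval in SLEEP_TIME steps; the new buffer is
			 * swapped in under peer_seq_lock below. */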
2868 			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2869 			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2870 				rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2871 				if (!rs_plan_s) {
2872 					dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2873 					goto disconnect;
2874 				}
2875 			}
2876 		}
2877 
2878 		spin_lock(&mdev->peer_seq_lock);
2879 		/* lock against drbd_nl_syncer_conf() */
2880 		if (verify_tfm) {
2881 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2882 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2883 			crypto_free_hash(mdev->verify_tfm);
2884 			mdev->verify_tfm = verify_tfm;
2885 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2886 		}
2887 		if (csums_tfm) {
2888 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2889 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2890 			crypto_free_hash(mdev->csums_tfm);
2891 			mdev->csums_tfm = csums_tfm;
2892 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2893 		}
2894 		if (fifo_size != mdev->rs_plan_s.size) {
2895 			kfree(mdev->rs_plan_s.values);
2896 			mdev->rs_plan_s.values = rs_plan_s;
2897 			mdev->rs_plan_s.size   = fifo_size;
2898 			mdev->rs_planed = 0;
2899 		}
2900 		spin_unlock(&mdev->peer_seq_lock);
2901 	}
2902 
2903 	return ok;
2904 disconnect:
2905 	/* just for completeness: actually not needed,
2906 	 * as this is not reached if csums_tfm was ok. */
2907 	crypto_free_hash(csums_tfm);
2908 	/* but free the verify_tfm again, if csums_tfm did not work out */
2909 	crypto_free_hash(verify_tfm);
2910 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2911 	return false;
2912 }
2913 
2914 /* warn if the arguments differ by more than 12.5% */
2915 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2916 	const char *s, sector_t a, sector_t b)
2917 {
2918 	sector_t d;
2919 	if (a == 0 || b == 0)
2920 		return;
2921 	d = (a > b) ? (a - b) : (b - a);
2922 	if (d > (a>>3) || d > (b>>3))
2923 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2924 		     (unsigned long long)a, (unsigned long long)b);
2925 }
2926 
2927 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2928 {
2929 	struct p_sizes *p = &mdev->data.rbuf.sizes;
2930 	enum determine_dev_size dd = unchanged;
2931 	sector_t p_size, p_usize, my_usize;
2932 	int ldsc = 0; /* local disk size changed */
2933 	enum dds_flags ddsf;
2934 
2935 	p_size = be64_to_cpu(p->d_size);
2936 	p_usize = be64_to_cpu(p->u_size);
2937 
2938 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2939 		dev_err(DEV, "some backing storage is needed\n");
2940 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2941 		return false;
2942 	}
2943 
2944 	/* just store the peer's disk size for now.
2945 	 * we still need to figure out whether we accept that. */
2946 	mdev->p_size = p_size;
2947 
2948 	if (get_ldev(mdev)) {
2949 		warn_if_differ_considerably(mdev, "lower level device sizes",
2950 			   p_size, drbd_get_max_capacity(mdev->ldev));
2951 		warn_if_differ_considerably(mdev, "user requested size",
2952 					    p_usize, mdev->ldev->dc.disk_size);
2953 
2954 		/* if this is the first connect, or an otherwise expected
2955 		 * param exchange, choose the minimum */
2956 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2957 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2958 					     p_usize);
2959 
2960 		my_usize = mdev->ldev->dc.disk_size;
2961 
2962 		if (mdev->ldev->dc.disk_size != p_usize) {
2963 			mdev->ldev->dc.disk_size = p_usize;
2964 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2965 			     (unsigned long)mdev->ldev->dc.disk_size);
2966 		}
2967 
2968 		/* Never shrink a device with usable data during connect.
2969 		   But allow online shrinking if we are connected. */
2970 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2971 		   drbd_get_capacity(mdev->this_bdev) &&
2972 		   mdev->state.disk >= D_OUTDATED &&
2973 		   mdev->state.conn < C_CONNECTED) {
2974 			dev_err(DEV, "The peer's disk size is too small!\n");
2975 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2976 			mdev->ldev->dc.disk_size = my_usize;
2977 			put_ldev(mdev);
2978 			return false;
2979 		}
2980 		put_ldev(mdev);
2981 	}
2982 
2983 	ddsf = be16_to_cpu(p->dds_flags);
2984 	if (get_ldev(mdev)) {
2985 		dd = drbd_determine_dev_size(mdev, ddsf);
2986 		put_ldev(mdev);
2987 		if (dd == dev_size_error)
2988 			return false;
2989 		drbd_md_sync(mdev);
2990 	} else {
2991 		/* I am diskless, need to accept the peer's size. */
2992 		drbd_set_my_capacity(mdev, p_size);
2993 	}
2994 
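	/* Remember the peer's limit and re-derive the effective maximum bio size. */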
2995 	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
2996 	drbd_reconsider_max_bio_size(mdev);
2997 
2998 	if (get_ldev(mdev)) {
2999 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3000 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3001 			ldsc = 1;
3002 		}
3003 
3004 		put_ldev(mdev);
3005 	}
3006 
3007 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3008 		if (be64_to_cpu(p->c_size) !=
3009 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3010 			/* we have different sizes, probably peer
3011 			 * needs to know my new size... */
3012 			drbd_send_sizes(mdev, 0, ddsf);
3013 		}
3014 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3015 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3016 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3017 			    mdev->state.disk >= D_INCONSISTENT) {
3018 				if (ddsf & DDSF_NO_RESYNC)
3019 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3020 				else
3021 					resync_after_online_grow(mdev);
3022 			} else
3023 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3024 		}
3025 	}
3026 
3027 	return true;
3028 }
3029 
3030 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3031 {
3032 	struct p_uuids *p = &mdev->data.rbuf.uuids;
3033 	u64 *p_uuid;
3034 	int i, updated_uuids = 0;
3035 
3036 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3037 
3038 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3039 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3040 
3041 	kfree(mdev->p_uuid);
3042 	mdev->p_uuid = p_uuid;
3043 
3044 	if (mdev->state.conn < C_CONNECTED &&
3045 	    mdev->state.disk < D_INCONSISTENT &&
3046 	    mdev->state.role == R_PRIMARY &&
3047 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3048 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3049 		    (unsigned long long)mdev->ed_uuid);
3050 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3051 		return false;
3052 	}
3053 
3054 	if (get_ldev(mdev)) {
3055 		int skip_initial_sync =
3056 			mdev->state.conn == C_CONNECTED &&
3057 			mdev->agreed_pro_version >= 90 &&
3058 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3059 			(p_uuid[UI_FLAGS] & 8);
3060 		if (skip_initial_sync) {
3061 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3062 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3063 					"clear_n_write from receive_uuids",
3064 					BM_LOCKED_TEST_ALLOWED);
3065 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3066 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3067 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3068 					CS_VERBOSE, NULL);
3069 			drbd_md_sync(mdev);
3070 			updated_uuids = 1;
3071 		}
3072 		put_ldev(mdev);
3073 	} else if (mdev->state.disk < D_INCONSISTENT &&
3074 		   mdev->state.role == R_PRIMARY) {
3075 		/* I am a diskless primary, the peer just created a new current UUID
3076 		   for me. */
3077 		updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3078 	}
3079 
3080 	/* Before we test the disk state, we should wait until any possibly
3081 	   ongoing cluster wide state change has finished. That is important if
3082 	   we are primary and are detaching from our disk: we need to see the
3083 	   new disk state... */
3084 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3085 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3086 		updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3087 
3088 	if (updated_uuids)
3089 		drbd_print_uuids(mdev, "receiver updated UUIDs to");
3090 
3091 	return true;
3092 }
3093 
3094 /**
3095  * convert_state() - Converts the peer's view of the cluster state to our point of view
3096  * @ps:		The state as seen by the peer.
3097  */
3098 static union drbd_state convert_state(union drbd_state ps)
3099 {
3100 	union drbd_state ms;
3101 
3102 	static enum drbd_conns c_tab[] = {
3103 		[C_CONNECTED] = C_CONNECTED,
3104 
3105 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3106 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3107 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3108 		[C_VERIFY_S]       = C_VERIFY_T,
3109 		[C_MASK]   = C_MASK,
3110 	};
3111 
3112 	ms.i = ps.i;
3113 
3114 	ms.conn = c_tab[ps.conn];
3115 	ms.peer = ps.role;
3116 	ms.role = ps.peer;
3117 	ms.pdsk = ps.disk;
3118 	ms.disk = ps.pdsk;
3119 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3120 
3121 	return ms;
3122 }
3123 
3124 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3125 {
3126 	struct p_req_state *p = &mdev->data.rbuf.req_state;
3127 	union drbd_state mask, val;
3128 	enum drbd_state_rv rv;
3129 
3130 	mask.i = be32_to_cpu(p->mask);
3131 	val.i = be32_to_cpu(p->val);
3132 
3133 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3134 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3135 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3136 		return true;
3137 	}
3138 
3139 	mask = convert_state(mask);
3140 	val = convert_state(val);
3141 
3142 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3143 
3144 	drbd_send_sr_reply(mdev, rv);
3145 	drbd_md_sync(mdev);
3146 
3147 	return true;
3148 }
3149 
3150 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3151 {
3152 	struct p_state *p = &mdev->data.rbuf.state;
3153 	union drbd_state os, ns, peer_state;
3154 	enum drbd_disk_state real_peer_disk;
3155 	enum chg_state_flags cs_flags;
3156 	int rv;
3157 
3158 	peer_state.i = be32_to_cpu(p->state);
3159 
3160 	real_peer_disk = peer_state.disk;
3161 	if (peer_state.disk == D_NEGOTIATING) {
3162 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3163 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3164 	}
3165 
3166 	spin_lock_irq(&mdev->req_lock);
3167  retry:
3168 	os = ns = mdev->state;
3169 	spin_unlock_irq(&mdev->req_lock);
3170 
3171 	/* peer says his disk is uptodate, while we think it is inconsistent,
3172 	 * and this happens while we think we have a sync going on. */
3173 	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3174 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3175 		/* If we are (becoming) SyncSource, but peer is still in sync
3176 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3177 		 * will change to inconsistent once the peer reaches active
3178 		 * syncing states.
3179 		 * It may have changed syncer-paused flags, however, so we
3180 		 * cannot ignore this completely. */
3181 		if (peer_state.conn > C_CONNECTED &&
3182 		    peer_state.conn < C_SYNC_SOURCE)
3183 			real_peer_disk = D_INCONSISTENT;
3184 
3185 		/* if peer_state changes to connected at the same time,
3186 		 * it explicitly notifies us that it finished resync.
3187 		 * Maybe we should finish it up, too? */
3188 		else if (os.conn >= C_SYNC_SOURCE &&
3189 			 peer_state.conn == C_CONNECTED) {
3190 			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3191 				drbd_resync_finished(mdev);
3192 			return true;
3193 		}
3194 	}
3195 
3196 	/* peer says his disk is inconsistent, while we think it is uptodate,
3197 	 * and this happens while the peer still thinks we have a sync going on,
3198 	 * but we think we are already done with the sync.
3199 	 * We ignore this to avoid flapping pdsk.
3200 	 * This should not happen, if the peer is a recent version of drbd. */
3201 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3202 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3203 		real_peer_disk = D_UP_TO_DATE;
3204 
3205 	if (ns.conn == C_WF_REPORT_PARAMS)
3206 		ns.conn = C_CONNECTED;
3207 
3208 	if (peer_state.conn == C_AHEAD)
3209 		ns.conn = C_BEHIND;
3210 
3211 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3212 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3213 		int cr; /* consider resync */
3214 
3215 		/* if we established a new connection */
3216 		cr  = (os.conn < C_CONNECTED);
3217 		/* if we had an established connection
3218 		 * and one of the nodes newly attaches a disk */
3219 		cr |= (os.conn == C_CONNECTED &&
3220 		       (peer_state.disk == D_NEGOTIATING ||
3221 			os.disk == D_NEGOTIATING));
3222 		/* if we have both been inconsistent, and the peer has been
3223 		 * forced to be UpToDate with --overwrite-data */
3224 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3225 		/* if we had been plain connected, and the admin requested to
3226 		 * start a sync by "invalidate" or "invalidate-remote" */
3227 		cr |= (os.conn == C_CONNECTED &&
3228 				(peer_state.conn >= C_STARTING_SYNC_S &&
3229 				 peer_state.conn <= C_WF_BITMAP_T));
3230 
3231 		if (cr)
3232 			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3233 
3234 		put_ldev(mdev);
3235 		if (ns.conn == C_MASK) {
3236 			ns.conn = C_CONNECTED;
3237 			if (mdev->state.disk == D_NEGOTIATING) {
3238 				drbd_force_state(mdev, NS(disk, D_FAILED));
3239 			} else if (peer_state.disk == D_NEGOTIATING) {
3240 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3241 				peer_state.disk = D_DISKLESS;
3242 				real_peer_disk = D_DISKLESS;
3243 			} else {
3244 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3245 					return false;
3246 				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3247 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3248 				return false;
3249 			}
3250 		}
3251 	}
3252 
3253 	spin_lock_irq(&mdev->req_lock);
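	/* The state may have changed while req_lock was not held; if so, restart
	 * with a fresh snapshot. */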
3254 	if (mdev->state.i != os.i)
3255 		goto retry;
3256 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3257 	ns.peer = peer_state.role;
3258 	ns.pdsk = real_peer_disk;
3259 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3260 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3261 		ns.disk = mdev->new_state_tmp.disk;
3262 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3263 	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3264 	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
3265 		/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3266 		   for temporary network outages! */
3267 		spin_unlock_irq(&mdev->req_lock);
3268 		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3269 		tl_clear(mdev);
3270 		drbd_uuid_new_current(mdev);
3271 		clear_bit(NEW_CUR_UUID, &mdev->flags);
3272 		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3273 		return false;
3274 	}
3275 	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3276 	ns = mdev->state;
3277 	spin_unlock_irq(&mdev->req_lock);
3278 
3279 	if (rv < SS_SUCCESS) {
3280 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3281 		return false;
3282 	}
3283 
3284 	if (os.conn > C_WF_REPORT_PARAMS) {
3285 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3286 		    peer_state.disk != D_NEGOTIATING ) {
3287 			/* we want resync, peer has not yet decided to sync... */
3288 			/* Nowadays only used when forcing a node into primary role and
3289 			   setting its disk to UpToDate with that */
3290 			drbd_send_uuids(mdev);
3291 			drbd_send_state(mdev);
3292 		}
3293 	}
3294 
3295 	mdev->net_conf->want_lose = 0;
3296 
3297 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3298 
3299 	return true;
3300 }
3301 
3302 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3303 {
3304 	struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3305 
3306 	wait_event(mdev->misc_wait,
3307 		   mdev->state.conn == C_WF_SYNC_UUID ||
3308 		   mdev->state.conn == C_BEHIND ||
3309 		   mdev->state.conn < C_CONNECTED ||
3310 		   mdev->state.disk < D_NEGOTIATING);
3311 
3312 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3313 
3314 	/* Here the _drbd_uuid_ functions are right, current should
3315 	   _not_ be rotated into the history */
3316 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3317 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3318 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3319 
3320 		drbd_print_uuids(mdev, "updated sync uuid");
3321 		drbd_start_resync(mdev, C_SYNC_TARGET);
3322 
3323 		put_ldev(mdev);
3324 	} else
3325 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3326 
3327 	return true;
3328 }
3329 
3330 /**
3331  * receive_bitmap_plain
3332  *
3333  * Return 0 when done, 1 when another iteration is needed, and a negative error
3334  * code upon failure.
3335  */
3336 static int
3337 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3338 		     unsigned long *buffer, struct bm_xfer_ctx *c)
3339 {
3340 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3341 	unsigned want = num_words * sizeof(long);
3342 	int err;
3343 
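	/* The bitmap is transferred in chunks of at most BM_PACKET_WORDS longs;
	 * the payload size must match exactly what we expect at this offset. */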
3344 	if (want != data_size) {
3345 		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3346 		return -EIO;
3347 	}
3348 	if (want == 0)
3349 		return 0;
3350 	err = drbd_recv(mdev, buffer, want);
3351 	if (err != want) {
3352 		if (err >= 0)
3353 			err = -EIO;
3354 		return err;
3355 	}
3356 
3357 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3358 
3359 	c->word_offset += num_words;
3360 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3361 	if (c->bit_offset > c->bm_bits)
3362 		c->bit_offset = c->bm_bits;
3363 
3364 	return 1;
3365 }
3366 
3367 /**
3368  * recv_bm_rle_bits
3369  *
3370  * Return 0 when done, 1 when another iteration is needed, and a negative error
3371  * code upon failure.
3372  */
3373 static int
3374 recv_bm_rle_bits(struct drbd_conf *mdev,
3375 		struct p_compressed_bm *p,
3376 		struct bm_xfer_ctx *c)
3377 {
3378 	struct bitstream bs;
3379 	u64 look_ahead;
3380 	u64 rl;
3381 	u64 tmp;
3382 	unsigned long s = c->bit_offset;
3383 	unsigned long e;
3384 	int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3385 	int toggle = DCBP_get_start(p);
3386 	int have;
3387 	int bits;
3388 
3389 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3390 
3391 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3392 	if (bits < 0)
3393 		return -EIO;
3394 
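	/* Decode alternating runs of clear and set bits; each run length is VLI
	 * encoded, and `toggle' tracks whether the current run describes set bits
	 * (seeded from DCBP_get_start()). */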
3395 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3396 		bits = vli_decode_bits(&rl, look_ahead);
3397 		if (bits <= 0)
3398 			return -EIO;
3399 
3400 		if (toggle) {
3401 			e = s + rl -1;
3402 			if (e >= c->bm_bits) {
3403 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3404 				return -EIO;
3405 			}
3406 			_drbd_bm_set_bits(mdev, s, e);
3407 		}
3408 
3409 		if (have < bits) {
3410 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3411 				have, bits, look_ahead,
3412 				(unsigned int)(bs.cur.b - p->code),
3413 				(unsigned int)bs.buf_len);
3414 			return -EIO;
3415 		}
3416 		look_ahead >>= bits;
3417 		have -= bits;
3418 
3419 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3420 		if (bits < 0)
3421 			return -EIO;
3422 		look_ahead |= tmp << have;
3423 		have += bits;
3424 	}
3425 
3426 	c->bit_offset = s;
3427 	bm_xfer_ctx_bit_to_word_offset(c);
3428 
3429 	return (s != c->bm_bits);
3430 }
3431 
3432 /**
3433  * decode_bitmap_c
3434  *
3435  * Return 0 when done, 1 when another iteration is needed, and a negative error
3436  * code upon failure.
3437  */
3438 static int
3439 decode_bitmap_c(struct drbd_conf *mdev,
3440 		struct p_compressed_bm *p,
3441 		struct bm_xfer_ctx *c)
3442 {
3443 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3444 		return recv_bm_rle_bits(mdev, p, c);
3445 
3446 	/* other variants had been implemented for evaluation,
3447 	 * but have been dropped as this one turned out to be "best"
3448 	 * during all our tests. */
3449 
3450 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3451 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3452 	return -EIO;
3453 }
3454 
3455 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3456 		const char *direction, struct bm_xfer_ctx *c)
3457 {
3458 	/* what would it take to transfer it "plaintext" */
3459 	unsigned plain = sizeof(struct p_header80) *
3460 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3461 		+ c->bm_words * sizeof(long);
3462 	unsigned total = c->bytes[0] + c->bytes[1];
3463 	unsigned r;
3464 
3465 	/* total cannot be zero, but just in case: */
3466 	if (total == 0)
3467 		return;
3468 
3469 	/* don't report if not compressed */
3470 	if (total >= plain)
3471 		return;
3472 
3473 	/* total < plain. check for overflow, still */
3474 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3475 		                    : (1000 * total / plain);
3476 
3477 	if (r > 1000)
3478 		r = 1000;
3479 
3480 	r = 1000 - r;
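	/* r is now the per-mille saving.  For illustration (made-up numbers):
	 * total = 300 bytes vs. plain = 1000 bytes gives r = 1000 - 300 = 700,
	 * reported below as "compression: 70.0%". */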
3481 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3482 	     "total %u; compression: %u.%u%%\n",
3483 			direction,
3484 			c->bytes[1], c->packets[1],
3485 			c->bytes[0], c->packets[0],
3486 			total, r/10, r % 10);
3487 }
3488 
3489 /* Since we are processing the bitfield from lower addresses to higher,
3490    it does not matter whether we process it in 32 bit chunks or 64 bit
3491    chunks, as long as it is little endian. (Understand it as a byte stream,
3492    beginning with the lowest byte...) If we used big endian,
3493    we would need to process it from the highest address to the lowest
3494    in order to be agnostic to the 32 vs 64 bit issue.
3495 
3496    returns 0 on failure, 1 if we successfully received it. */
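/* For illustration: the byte sequence 0x01 0x00 0x00 0x00 0x00 0x00 0x00 0x80,
 * interpreted little endian, yields bit 0 and bit 63 set whether it is read as
 * one 64 bit word or as two consecutive 32 bit words - which is exactly why
 * the chunk size is irrelevant here. */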
3497 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3498 {
3499 	struct bm_xfer_ctx c;
3500 	void *buffer;
3501 	int err;
3502 	int ok = false;
3503 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
3504 
3505 	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3506 	/* you are supposed to send additional out-of-sync information
3507 	 * if you actually set bits during this phase */
3508 
3509 	/* maybe we should use some per thread scratch page,
3510 	 * and allocate that during initial device creation? */
3511 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3512 	if (!buffer) {
3513 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3514 		goto out;
3515 	}
3516 
3517 	c = (struct bm_xfer_ctx) {
3518 		.bm_bits = drbd_bm_bits(mdev),
3519 		.bm_words = drbd_bm_words(mdev),
3520 	};
3521 
3522 	for(;;) {
3523 		if (cmd == P_BITMAP) {
3524 			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3525 		} else if (cmd == P_COMPRESSED_BITMAP) {
3526 			/* MAYBE: sanity check that we speak proto >= 90,
3527 			 * and the feature is enabled! */
3528 			struct p_compressed_bm *p;
3529 
3530 			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3531 				dev_err(DEV, "ReportCBitmap packet too large\n");
3532 				goto out;
3533 			}
3534 			/* use the page buffer */
3535 			p = buffer;
3536 			memcpy(p, h, sizeof(*h));
3537 			if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3538 				goto out;
3539 			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3540 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3541 				goto out;
3542 			}
3543 			err = decode_bitmap_c(mdev, p, &c);
3544 		} else {
3545 			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", cmd);
3546 			goto out;
3547 		}
3548 
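		/* Index 1 tallies plain P_BITMAP packets, index 0 the compressed
		 * ones; INFO_bm_xfer_stats() prints them as "plain" and "RLE"
		 * respectively. */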
3549 		c.packets[cmd == P_BITMAP]++;
3550 		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3551 
3552 		if (err <= 0) {
3553 			if (err < 0)
3554 				goto out;
3555 			break;
3556 		}
3557 		if (!drbd_recv_header(mdev, &cmd, &data_size))
3558 			goto out;
3559 	}
3560 
3561 	INFO_bm_xfer_stats(mdev, "receive", &c);
3562 
3563 	if (mdev->state.conn == C_WF_BITMAP_T) {
3564 		enum drbd_state_rv rv;
3565 
3566 		ok = !drbd_send_bitmap(mdev);
3567 		if (!ok)
3568 			goto out;
3569 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3570 		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3571 		D_ASSERT(rv == SS_SUCCESS);
3572 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3573 		/* admin may have requested C_DISCONNECTING,
3574 		 * other threads may have noticed network errors */
3575 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3576 		    drbd_conn_str(mdev->state.conn));
3577 	}
3578 
3579 	ok = true;
3580  out:
3581 	drbd_bm_unlock(mdev);
3582 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3583 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3584 	free_page((unsigned long) buffer);
3585 	return ok;
3586 }
3587 
3588 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3589 {
3590 	/* TODO zero copy sink :) */
3591 	static char sink[128];
3592 	int size, want, r;
3593 
3594 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3595 		 cmd, data_size);
3596 
3597 	size = data_size;
3598 	while (size > 0) {
3599 		want = min_t(int, size, sizeof(sink));
3600 		r = drbd_recv(mdev, sink, want);
3601 		ERR_IF(r <= 0) break;
3602 		size -= r;
3603 	}
3604 	return size == 0;
3605 }
3606 
3607 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3608 {
3609 	/* Make sure we've acked all the TCP data associated
3610 	 * with the data requests being unplugged */
3611 	drbd_tcp_quickack(mdev->data.socket);
3612 
3613 	return true;
3614 }
3615 
3616 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3617 {
3618 	struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3619 
3620 	switch (mdev->state.conn) {
3621 	case C_WF_SYNC_UUID:
3622 	case C_WF_BITMAP_T:
3623 	case C_BEHIND:
3624 		break;
3625 	default:
3626 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3627 				drbd_conn_str(mdev->state.conn));
3628 	}
3629 
3630 	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3631 
3632 	return true;
3633 }
3634 
3635 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3636 
3637 struct data_cmd {
3638 	int expect_payload;
3639 	size_t pkt_size;
3640 	drbd_cmd_handler_f function;
3641 };
3642 
3643 static struct data_cmd drbd_cmd_handler[] = {
3644 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
3645 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
3646 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
3647 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
3648 	[P_BITMAP]	    = { 1, sizeof(struct p_header80), receive_bitmap },
3649 	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap },
3650 	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3651 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3652 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3653 	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header80), receive_SyncParam },
3654 	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3655 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3656 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
3657 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
3658 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
3659 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3660 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3661 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3662 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3663 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3664 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3665 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3666 	/* anything missing from this table is in
3667 	 * the asender_tbl, see get_asender_cmd */
3668 	[P_MAX_CMD]	    = { 0, 0, NULL },
3669 };
3670 
3671 /* All handler functions that expect a sub-header get that sub-header in
3672    mdev->data.rbuf.header.head.payload.
3673 
3674    Usually the callback can find the usual p_header in
3675    mdev->data.rbuf.header.head, but it may not rely on that, since there
3676    is also p_header95. */
3677 
3678 static void drbdd(struct drbd_conf *mdev)
3679 {
3680 	union p_header *header = &mdev->data.rbuf.header;
3681 	unsigned int packet_size;
3682 	enum drbd_packets cmd;
3683 	size_t shs; /* sub header size */
3684 	int rv;
3685 
3686 	while (get_t_state(&mdev->receiver) == Running) {
3687 		drbd_thread_current_set_cpu(mdev);
3688 		if (!drbd_recv_header(mdev, &cmd, &packet_size))
3689 			goto err_out;
3690 
3691 		if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3692 			dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3693 			goto err_out;
3694 		}
3695 
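		/* shs is the fixed sub-header size declared in the handler
		 * table, i.e. how much of this packet beyond the common header
		 * we read into the receive buffer below; anything past that
		 * (packet_size - shs) is payload left for the handler itself
		 * to receive. */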
3696 		shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3697 		if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3698 			dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3699 			goto err_out;
3700 		}
3701 
3702 		if (shs) {
3703 			rv = drbd_recv(mdev, &header->h80.payload, shs);
3704 			if (unlikely(rv != shs)) {
3705 				if (!signal_pending(current))
3706 					dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3707 				goto err_out;
3708 			}
3709 		}
3710 
3711 		rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3712 
3713 		if (unlikely(!rv)) {
3714 			dev_err(DEV, "error receiving %s, l: %d!\n",
3715 			    cmdname(cmd), packet_size);
3716 			goto err_out;
3717 		}
3718 	}
3719 
3720 	if (0) {
3721 	err_out:
3722 		drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3723 	}
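	/* The "if (0)" above is a common kernel idiom: the err_out label is
	 * reachable only via goto, so the error handling stays out of the
	 * normal fall-through path while still sharing the drbd_md_sync()
	 * below. */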
3724 	/* If we leave here, we probably want to update at least the
3725 	 * "Connected" indicator on stable storage. Do so explicitly here. */
3726 	drbd_md_sync(mdev);
3727 }
3728 
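/* Wait until everything queued on the data work queue up to this point has
 * been processed: we queue a barrier work item whose callback
 * (w_prev_work_done) is expected to complete barr.done, and block until that
 * completion fires.  For example, drbd_disconnect() below uses this to let
 * pending worker callbacks drain before tearing down connection state. */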
3729 void drbd_flush_workqueue(struct drbd_conf *mdev)
3730 {
3731 	struct drbd_wq_barrier barr;
3732 
3733 	barr.w.cb = w_prev_work_done;
3734 	init_completion(&barr.done);
3735 	drbd_queue_work(&mdev->data.work, &barr.w);
3736 	wait_for_completion(&barr.done);
3737 }
3738 
3739 void drbd_free_tl_hash(struct drbd_conf *mdev)
3740 {
3741 	struct hlist_head *h;
3742 
3743 	spin_lock_irq(&mdev->req_lock);
3744 
3745 	if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3746 		spin_unlock_irq(&mdev->req_lock);
3747 		return;
3748 	}
3749 	/* paranoia code */
3750 	for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3751 		if (h->first)
3752 			dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3753 				(int)(h - mdev->ee_hash), h->first);
3754 	kfree(mdev->ee_hash);
3755 	mdev->ee_hash = NULL;
3756 	mdev->ee_hash_s = 0;
3757 
3758 	/* paranoia code */
3759 	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3760 		if (h->first)
3761 			dev_err(DEV, "ASSERT FAILED tl_hash[%u].first == %p, expected NULL\n",
3762 				(int)(h - mdev->tl_hash), h->first);
3763 	kfree(mdev->tl_hash);
3764 	mdev->tl_hash = NULL;
3765 	mdev->tl_hash_s = 0;
3766 	spin_unlock_irq(&mdev->req_lock);
3767 }
3768 
3769 static void drbd_disconnect(struct drbd_conf *mdev)
3770 {
3771 	enum drbd_fencing_p fp;
3772 	union drbd_state os, ns;
3773 	int rv = SS_UNKNOWN_ERROR;
3774 	unsigned int i;
3775 
3776 	if (mdev->state.conn == C_STANDALONE)
3777 		return;
3778 
3779 	/* asender does not clean up anything. it must not interfere, either */
3780 	drbd_thread_stop(&mdev->asender);
3781 	drbd_free_sock(mdev);
3782 
3783 	/* wait for current activity to cease. */
3784 	spin_lock_irq(&mdev->req_lock);
3785 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3786 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3787 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3788 	spin_unlock_irq(&mdev->req_lock);
3789 
3790 	/* We do not have data structures that would allow us to
3791 	 * get the rs_pending_cnt down to 0 again.
3792 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3793 	 *    the pending RSDataRequest's we have sent.
3794 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3795 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3796 	 *  And no, it is not the sum of the reference counts in the
3797 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3798 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3799 	 *  on the fly. */
3800 	drbd_rs_cancel_all(mdev);
3801 	mdev->rs_total = 0;
3802 	mdev->rs_failed = 0;
3803 	atomic_set(&mdev->rs_pending_cnt, 0);
3804 	wake_up(&mdev->misc_wait);
3805 
3806 	del_timer(&mdev->request_timer);
3807 
3808 	/* make sure syncer is stopped and w_resume_next_sg queued */
3809 	del_timer_sync(&mdev->resync_timer);
3810 	resync_timer_fn((unsigned long)mdev);
3811 
3812 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3813 	 * w_make_resync_request etc. which may still be on the worker queue
3814 	 * to be "canceled" */
3815 	drbd_flush_workqueue(mdev);
3816 
3817 	/* This also does reclaim_net_ee().  If we do this too early, we might
3818 	 * miss some resync ee and pages. */
3819 	drbd_process_done_ee(mdev);
3820 
3821 	kfree(mdev->p_uuid);
3822 	mdev->p_uuid = NULL;
3823 
3824 	if (!is_susp(mdev->state))
3825 		tl_clear(mdev);
3826 
3827 	dev_info(DEV, "Connection closed\n");
3828 
3829 	drbd_md_sync(mdev);
3830 
3831 	fp = FP_DONT_CARE;
3832 	if (get_ldev(mdev)) {
3833 		fp = mdev->ldev->dc.fencing;
3834 		put_ldev(mdev);
3835 	}
3836 
3837 	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3838 		drbd_try_outdate_peer_async(mdev);
3839 
3840 	spin_lock_irq(&mdev->req_lock);
3841 	os = mdev->state;
3842 	if (os.conn >= C_UNCONNECTED) {
3843 		/* Do not restart in case we are C_DISCONNECTING */
3844 		ns = os;
3845 		ns.conn = C_UNCONNECTED;
3846 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3847 	}
3848 	spin_unlock_irq(&mdev->req_lock);
3849 
3850 	if (os.conn == C_DISCONNECTING) {
3851 		wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3852 
3853 		crypto_free_hash(mdev->cram_hmac_tfm);
3854 		mdev->cram_hmac_tfm = NULL;
3855 
3856 		kfree(mdev->net_conf);
3857 		mdev->net_conf = NULL;
3858 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3859 	}
3860 
3861 	/* serialize with bitmap writeout triggered by the state change,
3862 	 * if any. */
3863 	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3864 
3865 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3866 	 * want to use SO_LINGER, because apparently it can be deferred for
3867 	 * more than 20 seconds (longest time I checked).
3868 	 *
3869 	 * Actually we don't care for exactly when the network stack does its
3870 	 * put_page(), but release our reference on these pages right here.
3871 	 */
3872 	i = drbd_release_ee(mdev, &mdev->net_ee);
3873 	if (i)
3874 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3875 	i = atomic_read(&mdev->pp_in_use_by_net);
3876 	if (i)
3877 		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3878 	i = atomic_read(&mdev->pp_in_use);
3879 	if (i)
3880 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3881 
3882 	D_ASSERT(list_empty(&mdev->read_ee));
3883 	D_ASSERT(list_empty(&mdev->active_ee));
3884 	D_ASSERT(list_empty(&mdev->sync_ee));
3885 	D_ASSERT(list_empty(&mdev->done_ee));
3886 
3887 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3888 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3889 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3890 }
3891 
3892 /*
3893  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3894  * we can agree on is stored in agreed_pro_version.
3895  *
3896  * feature flags and the reserved array should be enough room for future
3897  * enhancements of the handshake protocol, and possible plugins...
3898  *
3899  * for now, they are expected to be zero, but ignored.
3900  */
3901 static int drbd_send_handshake(struct drbd_conf *mdev)
3902 {
3903 	/* ASSERT current == mdev->receiver ... */
3904 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3905 	int ok;
3906 
3907 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3908 		dev_err(DEV, "interrupted during initial handshake\n");
3909 		return 0; /* interrupted. not ok. */
3910 	}
3911 
3912 	if (mdev->data.socket == NULL) {
3913 		mutex_unlock(&mdev->data.mutex);
3914 		return 0;
3915 	}
3916 
3917 	memset(p, 0, sizeof(*p));
3918 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3919 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3920 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3921 			    (struct p_header80 *)p, sizeof(*p), 0);
3922 	mutex_unlock(&mdev->data.mutex);
3923 	return ok;
3924 }
3925 
3926 /*
3927  * return values:
3928  *   1 yes, we have a valid connection
3929  *   0 oops, did not work out, please try again
3930  *  -1 peer talks different language,
3931  *     no point in trying again, please go standalone.
3932  */
3933 static int drbd_do_handshake(struct drbd_conf *mdev)
3934 {
3935 	/* ASSERT current == mdev->receiver ... */
3936 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3937 	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3938 	unsigned int length;
3939 	enum drbd_packets cmd;
3940 	int rv;
3941 
3942 	rv = drbd_send_handshake(mdev);
3943 	if (!rv)
3944 		return 0;
3945 
3946 	rv = drbd_recv_header(mdev, &cmd, &length);
3947 	if (!rv)
3948 		return 0;
3949 
3950 	if (cmd != P_HAND_SHAKE) {
3951 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3952 		     cmdname(cmd), cmd);
3953 		return -1;
3954 	}
3955 
3956 	if (length != expect) {
3957 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3958 		     expect, length);
3959 		return -1;
3960 	}
3961 
3962 	rv = drbd_recv(mdev, &p->head.payload, expect);
3963 
3964 	if (rv != expect) {
3965 		if (!signal_pending(current))
3966 			dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
3967 		return 0;
3968 	}
3969 
3970 	p->protocol_min = be32_to_cpu(p->protocol_min);
3971 	p->protocol_max = be32_to_cpu(p->protocol_max);
3972 	if (p->protocol_max == 0)
3973 		p->protocol_max = p->protocol_min;
3974 
3975 	if (PRO_VERSION_MAX < p->protocol_min ||
3976 	    PRO_VERSION_MIN > p->protocol_max)
3977 		goto incompat;
3978 
3979 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
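	/* Worked example (hypothetical ranges): if we supported 86-96 and the
	 * peer advertised 86-95, the ranges overlap and we would agree on
	 * min(96, 95) = 95. */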
3980 
3981 	dev_info(DEV, "Handshake successful: "
3982 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3983 
3984 	return 1;
3985 
3986  incompat:
3987 	dev_err(DEV, "incompatible DRBD dialects: "
3988 	    "I support %d-%d, peer supports %d-%d\n",
3989 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3990 	    p->protocol_min, p->protocol_max);
3991 	return -1;
3992 }
3993 
3994 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3995 static int drbd_do_auth(struct drbd_conf *mdev)
3996 {
3997 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3998 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3999 	return -1;
4000 }
4001 #else
4002 #define CHALLENGE_LEN 64
4003 
4004 /* Return value:
4005 	1 - auth succeeded,
4006 	0 - failed, try again (network error),
4007 	-1 - auth failed, don't try again.
4008 */
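/* Rough shape of the CRAM-HMAC exchange as implemented below: both sides send
 * a random challenge (P_AUTH_CHALLENGE), HMAC the peer's challenge with the
 * shared secret and return the digest (P_AUTH_RESPONSE), then recompute the
 * expected HMAC over their own challenge and memcmp() it against what the
 * peer sent back. */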
4009 
4010 static int drbd_do_auth(struct drbd_conf *mdev)
4011 {
4012 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4013 	struct scatterlist sg;
4014 	char *response = NULL;
4015 	char *right_response = NULL;
4016 	char *peers_ch = NULL;
4017 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4018 	unsigned int resp_size;
4019 	struct hash_desc desc;
4020 	enum drbd_packets cmd;
4021 	unsigned int length;
4022 	int rv;
4023 
4024 	desc.tfm = mdev->cram_hmac_tfm;
4025 	desc.flags = 0;
4026 
4027 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4028 				(u8 *)mdev->net_conf->shared_secret, key_len);
4029 	if (rv) {
4030 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4031 		rv = -1;
4032 		goto fail;
4033 	}
4034 
4035 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4036 
4037 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4038 	if (!rv)
4039 		goto fail;
4040 
4041 	rv = drbd_recv_header(mdev, &cmd, &length);
4042 	if (!rv)
4043 		goto fail;
4044 
4045 	if (cmd != P_AUTH_CHALLENGE) {
4046 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4047 		    cmdname(cmd), cmd);
4048 		rv = 0;
4049 		goto fail;
4050 	}
4051 
4052 	if (length > CHALLENGE_LEN * 2) {
4053 		dev_err(DEV, "AuthChallenge payload too big.\n");
4054 		rv = -1;
4055 		goto fail;
4056 	}
4057 
4058 	peers_ch = kmalloc(length, GFP_NOIO);
4059 	if (peers_ch == NULL) {
4060 		dev_err(DEV, "kmalloc of peers_ch failed\n");
4061 		rv = -1;
4062 		goto fail;
4063 	}
4064 
4065 	rv = drbd_recv(mdev, peers_ch, length);
4066 
4067 	if (rv != length) {
4068 		if (!signal_pending(current))
4069 			dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4070 		rv = 0;
4071 		goto fail;
4072 	}
4073 
4074 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4075 	response = kmalloc(resp_size, GFP_NOIO);
4076 	if (response == NULL) {
4077 		dev_err(DEV, "kmalloc of response failed\n");
4078 		rv = -1;
4079 		goto fail;
4080 	}
4081 
4082 	sg_init_table(&sg, 1);
4083 	sg_set_buf(&sg, peers_ch, length);
4084 
4085 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4086 	if (rv) {
4087 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4088 		rv = -1;
4089 		goto fail;
4090 	}
4091 
4092 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4093 	if (!rv)
4094 		goto fail;
4095 
4096 	rv = drbd_recv_header(mdev, &cmd, &length);
4097 	if (!rv)
4098 		goto fail;
4099 
4100 	if (cmd != P_AUTH_RESPONSE) {
4101 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4102 			cmdname(cmd), cmd);
4103 		rv = 0;
4104 		goto fail;
4105 	}
4106 
4107 	if (length != resp_size) {
4108 		dev_err(DEV, "AuthResponse payload has wrong size\n");
4109 		rv = 0;
4110 		goto fail;
4111 	}
4112 
4113 	rv = drbd_recv(mdev, response, resp_size);
4114 
4115 	if (rv != resp_size) {
4116 		if (!signal_pending(current))
4117 			dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4118 		rv = 0;
4119 		goto fail;
4120 	}
4121 
4122 	right_response = kmalloc(resp_size, GFP_NOIO);
4123 	if (right_response == NULL) {
4124 		dev_err(DEV, "kmalloc of right_response failed\n");
4125 		rv = -1;
4126 		goto fail;
4127 	}
4128 
4129 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4130 
4131 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4132 	if (rv) {
4133 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4134 		rv = -1;
4135 		goto fail;
4136 	}
4137 
4138 	rv = !memcmp(response, right_response, resp_size);
4139 
4140 	if (rv)
4141 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4142 		     resp_size, mdev->net_conf->cram_hmac_alg);
4143 	else
4144 		rv = -1;
4145 
4146  fail:
4147 	kfree(peers_ch);
4148 	kfree(response);
4149 	kfree(right_response);
4150 
4151 	return rv;
4152 }
4153 #endif
4154 
4155 int drbdd_init(struct drbd_thread *thi)
4156 {
4157 	struct drbd_conf *mdev = thi->mdev;
4158 	unsigned int minor = mdev_to_minor(mdev);
4159 	int h;
4160 
4161 	sprintf(current->comm, "drbd%d_receiver", minor);
4162 
4163 	dev_info(DEV, "receiver (re)started\n");
4164 
4165 	do {
4166 		h = drbd_connect(mdev);
4167 		if (h == 0) {
4168 			drbd_disconnect(mdev);
4169 			schedule_timeout_interruptible(HZ);
4170 		}
4171 		if (h == -1) {
4172 			dev_warn(DEV, "Discarding network configuration.\n");
4173 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4174 		}
4175 	} while (h == 0);
4176 
4177 	if (h > 0) {
4178 		if (get_net_conf(mdev)) {
4179 			drbdd(mdev);
4180 			put_net_conf(mdev);
4181 		}
4182 	}
4183 
4184 	drbd_disconnect(mdev);
4185 
4186 	dev_info(DEV, "receiver terminated\n");
4187 	return 0;
4188 }
4189 
4190 /* ********* acknowledge sender ******** */
4191 
4192 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4193 {
4194 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4195 
4196 	int retcode = be32_to_cpu(p->retcode);
4197 
4198 	if (retcode >= SS_SUCCESS) {
4199 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4200 	} else {
4201 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4202 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4203 		    drbd_set_st_err_str(retcode), retcode);
4204 	}
4205 	wake_up(&mdev->state_wait);
4206 
4207 	return true;
4208 }
4209 
4210 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4211 {
4212 	return drbd_send_ping_ack(mdev);
4214 }
4215 
4216 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4217 {
4218 	/* restore idle timeout */
4219 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4220 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4221 		wake_up(&mdev->misc_wait);
4222 
4223 	return true;
4224 }
4225 
4226 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4227 {
4228 	struct p_block_ack *p = (struct p_block_ack *)h;
4229 	sector_t sector = be64_to_cpu(p->sector);
4230 	int blksize = be32_to_cpu(p->blksize);
4231 
4232 	D_ASSERT(mdev->agreed_pro_version >= 89);
4233 
4234 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4235 
4236 	if (get_ldev(mdev)) {
4237 		drbd_rs_complete_io(mdev, sector);
4238 		drbd_set_in_sync(mdev, sector, blksize);
4239 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4240 		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4241 		put_ldev(mdev);
4242 	}
4243 	dec_rs_pending(mdev);
4244 	atomic_add(blksize >> 9, &mdev->rs_sect_in);
4245 
4246 	return true;
4247 }
4248 
4249 /* when we receive the ACK for a write request,
4250  * verify that we actually know about it */
4251 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4252 	u64 id, sector_t sector)
4253 {
4254 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4255 	struct hlist_node *n;
4256 	struct drbd_request *req;
4257 
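	/* The block_id the peer echoes back is the kernel address of the
	 * original request, used as an opaque cookie; compare pointer values
	 * and double check the sector to catch stale or bogus ids. */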
4258 	hlist_for_each_entry(req, n, slot, collision) {
4259 		if ((unsigned long)req == (unsigned long)id) {
4260 			if (req->sector != sector) {
4261 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4262 				    "wrong sector (%llus versus %llus)\n", req,
4263 				    (unsigned long long)req->sector,
4264 				    (unsigned long long)sector);
4265 				break;
4266 			}
4267 			return req;
4268 		}
4269 	}
4270 	return NULL;
4271 }
4272 
4273 typedef struct drbd_request *(req_validator_fn)
4274 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4275 
4276 static int validate_req_change_req_state(struct drbd_conf *mdev,
4277 	u64 id, sector_t sector, req_validator_fn validator,
4278 	const char *func, enum drbd_req_event what)
4279 {
4280 	struct drbd_request *req;
4281 	struct bio_and_error m;
4282 
4283 	spin_lock_irq(&mdev->req_lock);
4284 	req = validator(mdev, id, sector);
4285 	if (unlikely(!req)) {
4286 		spin_unlock_irq(&mdev->req_lock);
4287 
4288 		dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4289 			(void *)(unsigned long)id, (unsigned long long)sector);
4290 		return false;
4291 	}
4292 	__req_mod(req, what, &m);
4293 	spin_unlock_irq(&mdev->req_lock);
4294 
4295 	if (m.bio)
4296 		complete_master_bio(mdev, &m);
4297 	return true;
4298 }
4299 
4300 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4301 {
4302 	struct p_block_ack *p = (struct p_block_ack *)h;
4303 	sector_t sector = be64_to_cpu(p->sector);
4304 	int blksize = be32_to_cpu(p->blksize);
4305 	enum drbd_req_event what;
4306 
4307 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4308 
4309 	if (is_syncer_block_id(p->block_id)) {
4310 		drbd_set_in_sync(mdev, sector, blksize);
4311 		dec_rs_pending(mdev);
4312 		return true;
4313 	}
4314 	switch (be16_to_cpu(h->command)) {
4315 	case P_RS_WRITE_ACK:
4316 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4317 		what = write_acked_by_peer_and_sis;
4318 		break;
4319 	case P_WRITE_ACK:
4320 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4321 		what = write_acked_by_peer;
4322 		break;
4323 	case P_RECV_ACK:
4324 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4325 		what = recv_acked_by_peer;
4326 		break;
4327 	case P_DISCARD_ACK:
4328 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4329 		what = conflict_discarded_by_peer;
4330 		break;
4331 	default:
4332 		D_ASSERT(0);
4333 		return false;
4334 	}
4335 
4336 	return validate_req_change_req_state(mdev, p->block_id, sector,
4337 		_ack_id_to_req, __func__ , what);
4338 }
4339 
4340 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4341 {
4342 	struct p_block_ack *p = (struct p_block_ack *)h;
4343 	sector_t sector = be64_to_cpu(p->sector);
4344 	int size = be32_to_cpu(p->blksize);
4345 	struct drbd_request *req;
4346 	struct bio_and_error m;
4347 
4348 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4349 
4350 	if (is_syncer_block_id(p->block_id)) {
4351 		dec_rs_pending(mdev);
4352 		drbd_rs_failed_io(mdev, sector, size);
4353 		return true;
4354 	}
4355 
4356 	spin_lock_irq(&mdev->req_lock);
4357 	req = _ack_id_to_req(mdev, p->block_id, sector);
4358 	if (!req) {
4359 		spin_unlock_irq(&mdev->req_lock);
4360 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4361 		    mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4362 			/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4363 			   The master bio might already be completed, therefore the
4364 			   request is no longer in the collision hash.
4365 			   => Do not try to validate block_id as request. */
4366 			/* In Protocol B we might already have got a P_RECV_ACK
4367 			   but then get a P_NEG_ACK after wards. */
4368 			drbd_set_out_of_sync(mdev, sector, size);
4369 			return true;
4370 		} else {
4371 			dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4372 				(void *)(unsigned long)p->block_id, (unsigned long long)sector);
4373 			return false;
4374 		}
4375 	}
4376 	__req_mod(req, neg_acked, &m);
4377 	spin_unlock_irq(&mdev->req_lock);
4378 
4379 	if (m.bio)
4380 		complete_master_bio(mdev, &m);
4381 	return true;
4382 }
4383 
4384 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4385 {
4386 	struct p_block_ack *p = (struct p_block_ack *)h;
4387 	sector_t sector = be64_to_cpu(p->sector);
4388 
4389 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4390 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4391 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4392 
4393 	return validate_req_change_req_state(mdev, p->block_id, sector,
4394 		_ar_id_to_req, __func__ , neg_acked);
4395 }
4396 
4397 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4398 {
4399 	sector_t sector;
4400 	int size;
4401 	struct p_block_ack *p = (struct p_block_ack *)h;
4402 
4403 	sector = be64_to_cpu(p->sector);
4404 	size = be32_to_cpu(p->blksize);
4405 
4406 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4407 
4408 	dec_rs_pending(mdev);
4409 
4410 	if (get_ldev_if_state(mdev, D_FAILED)) {
4411 		drbd_rs_complete_io(mdev, sector);
4412 		switch (be16_to_cpu(h->command)) {
4413 		case P_NEG_RS_DREPLY:
4414 			drbd_rs_failed_io(mdev, sector, size);
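			/* fall through to P_RS_CANCEL */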
4415 		case P_RS_CANCEL:
4416 			break;
4417 		default:
4418 			D_ASSERT(0);
4419 			put_ldev(mdev);
4420 			return false;
4421 		}
4422 		put_ldev(mdev);
4423 	}
4424 
4425 	return true;
4426 }
4427 
4428 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4429 {
4430 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4431 
4432 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4433 
4434 	if (mdev->state.conn == C_AHEAD &&
4435 	    atomic_read(&mdev->ap_in_flight) == 0 &&
4436 	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4437 		mdev->start_resync_timer.expires = jiffies + HZ;
4438 		add_timer(&mdev->start_resync_timer);
4439 	}
4440 
4441 	return true;
4442 }
4443 
4444 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4445 {
4446 	struct p_block_ack *p = (struct p_block_ack *)h;
4447 	struct drbd_work *w;
4448 	sector_t sector;
4449 	int size;
4450 
4451 	sector = be64_to_cpu(p->sector);
4452 	size = be32_to_cpu(p->blksize);
4453 
4454 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4455 
4456 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4457 		drbd_ov_oos_found(mdev, sector, size);
4458 	else
4459 		ov_oos_print(mdev);
4460 
4461 	if (!get_ldev(mdev))
4462 		return true;
4463 
4464 	drbd_rs_complete_io(mdev, sector);
4465 	dec_rs_pending(mdev);
4466 
4467 	--mdev->ov_left;
4468 
4469 	/* let's advance progress step marks only for every other megabyte */
4470 	if ((mdev->ov_left & 0x200) == 0x200)
4471 		drbd_advance_rs_marks(mdev, mdev->ov_left);
4472 
4473 	if (mdev->ov_left == 0) {
4474 		w = kmalloc(sizeof(*w), GFP_NOIO);
4475 		if (w) {
4476 			w->cb = w_ov_finished;
4477 			drbd_queue_work_front(&mdev->data.work, w);
4478 		} else {
4479 			dev_err(DEV, "kmalloc(w) failed.\n");
4480 			ov_oos_print(mdev);
4481 			drbd_resync_finished(mdev);
4482 		}
4483 	}
4484 	put_ldev(mdev);
4485 	return true;
4486 }
4487 
4488 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4489 {
4490 	return true;
4491 }
4492 
4493 struct asender_cmd {
4494 	size_t pkt_size;
4495 	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4496 };
4497 
4498 static struct asender_cmd *get_asender_cmd(int cmd)
4499 {
4500 	static struct asender_cmd asender_tbl[] = {
4501 		/* anything missing from this table is in
4502 		 * the drbd_cmd_handler (drbd_default_handler) table,
4503 		 * see the beginning of drbdd() */
4504 	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
4505 	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
4506 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4507 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4508 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4509 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4510 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4511 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4512 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4513 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4514 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4515 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4516 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4517 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4518 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4519 	[P_MAX_CMD]	    = { 0, NULL },
4520 	};
4521 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4522 		return NULL;
4523 	return &asender_tbl[cmd];
4524 }
4525 
4526 int drbd_asender(struct drbd_thread *thi)
4527 {
4528 	struct drbd_conf *mdev = thi->mdev;
4529 	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4530 	struct asender_cmd *cmd = NULL;
4531 
4532 	int rv, len;
4533 	void *buf    = h;
4534 	int received = 0;
4535 	int expect   = sizeof(struct p_header80);
4536 	int empty;
4537 	int ping_timeout_active = 0;
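	/* Receive state for the meta socket, handled in two phases below:
	 * first read a full p_header80 into h, then (once the command is
	 * known) grow "expect" to the command's packet size and keep
	 * receiving into the same buffer until "received" catches up. */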
4538 
4539 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4540 
4541 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4542 	current->rt_priority = 2;    /* more important than all other tasks */
4543 
4544 	while (get_t_state(thi) == Running) {
4545 		drbd_thread_current_set_cpu(mdev);
4546 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4547 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4548 			mdev->meta.socket->sk->sk_rcvtimeo =
4549 				mdev->net_conf->ping_timeo*HZ/10;
4550 			ping_timeout_active = 1;
4551 		}
4552 
4553 		/* conditionally cork;
4554 		 * it may hurt latency if we cork without much to send */
4555 		if (!mdev->net_conf->no_cork &&
4556 			3 < atomic_read(&mdev->unacked_cnt))
4557 			drbd_tcp_cork(mdev->meta.socket);
4558 		while (1) {
4559 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4560 			flush_signals(current);
4561 			if (!drbd_process_done_ee(mdev))
4562 				goto reconnect;
4563 			/* to avoid race with newly queued ACKs */
4564 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4565 			spin_lock_irq(&mdev->req_lock);
4566 			empty = list_empty(&mdev->done_ee);
4567 			spin_unlock_irq(&mdev->req_lock);
4568 			/* new ack may have been queued right here,
4569 			 * but then there is also a signal pending,
4570 			 * and we start over... */
4571 			if (empty)
4572 				break;
4573 		}
4574 		/* but unconditionally uncork unless disabled */
4575 		if (!mdev->net_conf->no_cork)
4576 			drbd_tcp_uncork(mdev->meta.socket);
4577 
4578 		/* short circuit, recv_msg would return EINTR anyways. */
4579 		if (signal_pending(current))
4580 			continue;
4581 
4582 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4583 				     buf, expect-received, 0);
4584 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4585 
4586 		flush_signals(current);
4587 
4588 		/* Note:
4589 		 * -EINTR	 (on meta) we got a signal
4590 		 * -EAGAIN	 (on meta) rcvtimeo expired
4591 		 * -ECONNRESET	 other side closed the connection
4592 		 * -ERESTARTSYS  (on data) we got a signal
4593 		 * rv <  0	 other than above: unexpected error!
4594 		 * rv == expected: full header or command
4595 		 * rv <  expected: "woken" by signal during receive
4596 		 * rv == 0	 : "connection shut down by peer"
4597 		 */
4598 		if (likely(rv > 0)) {
4599 			received += rv;
4600 			buf	 += rv;
4601 		} else if (rv == 0) {
4602 			dev_err(DEV, "meta connection shut down by peer.\n");
4603 			goto reconnect;
4604 		} else if (rv == -EAGAIN) {
4605 			/* If the data socket received something meanwhile,
4606 			 * that is good enough: peer is still alive. */
4607 			if (time_after(mdev->last_received,
4608 				jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4609 				continue;
4610 			if (ping_timeout_active) {
4611 				dev_err(DEV, "PingAck did not arrive in time.\n");
4612 				goto reconnect;
4613 			}
4614 			set_bit(SEND_PING, &mdev->flags);
4615 			continue;
4616 		} else if (rv == -EINTR) {
4617 			continue;
4618 		} else {
4619 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4620 			goto reconnect;
4621 		}
4622 
4623 		if (received == expect && cmd == NULL) {
4624 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4625 				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4626 				    be32_to_cpu(h->magic),
4627 				    be16_to_cpu(h->command),
4628 				    be16_to_cpu(h->length));
4629 				goto reconnect;
4630 			}
4631 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4632 			len = be16_to_cpu(h->length);
4633 			if (unlikely(cmd == NULL)) {
4634 				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4635 				    be32_to_cpu(h->magic),
4636 				    be16_to_cpu(h->command),
4637 				    be16_to_cpu(h->length));
4638 				goto disconnect;
4639 			}
4640 			expect = cmd->pkt_size;
4641 			ERR_IF(len != expect - sizeof(struct p_header80))
4642 				goto reconnect;
4643 		}
4644 		if (received == expect) {
4645 			mdev->last_received = jiffies;
4646 			D_ASSERT(cmd != NULL);
4647 			if (!cmd->process(mdev, h))
4648 				goto reconnect;
4649 
4650 			/* the idle_timeout (ping-int)
4651 			 * has been restored in got_PingAck() */
4652 			if (cmd == get_asender_cmd(P_PING_ACK))
4653 				ping_timeout_active = 0;
4654 
4655 			buf	 = h;
4656 			received = 0;
4657 			expect	 = sizeof(struct p_header80);
4658 			cmd	 = NULL;
4659 		}
4660 	}
4661 
4662 	if (0) {
4663 reconnect:
4664 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4665 		drbd_md_sync(mdev);
4666 	}
4667 	if (0) {
4668 disconnect:
4669 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4670 		drbd_md_sync(mdev);
4671 	}
4672 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4673 
4674 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4675 	dev_info(DEV, "asender terminated\n");
4676 
4677 	return 0;
4678 }
4679