xref: /linux/drivers/block/drbd/drbd_receiver.c (revision b34bce45530ca897aea35915e0e42eb3c8047b52)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_req.h"
50 
51 #include "drbd_vli.h"
52 
53 struct flush_work {
54 	struct drbd_work w;
55 	struct drbd_epoch *epoch;
56 };
57 
58 enum finish_epoch {
59 	FE_STILL_LIVE,
60 	FE_DESTROYED,
61 	FE_RECYCLED,
62 };
63 
64 static int drbd_do_handshake(struct drbd_conf *mdev);
65 static int drbd_do_auth(struct drbd_conf *mdev);
66 
67 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69 
70 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71 {
72 	struct drbd_epoch *prev;
73 	spin_lock(&mdev->epoch_lock);
74 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 	if (prev == epoch || prev == mdev->current_epoch)
76 		prev = NULL;
77 	spin_unlock(&mdev->epoch_lock);
78 	return prev;
79 }
80 
81 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
82 
83 /*
84  * some helper functions to deal with singly linked page lists,
85  * page->private being our "next" pointer.
86  */
87 
88 /* If at least n pages are linked at head, get n pages off.
89  * Otherwise, don't modify head, and return NULL.
90  * Locking is the responsibility of the caller.
91  */
92 static struct page *page_chain_del(struct page **head, int n)
93 {
94 	struct page *page;
95 	struct page *tmp;
96 
97 	BUG_ON(!n);
98 	BUG_ON(!head);
99 
100 	page = *head;
101 
102 	if (!page)
103 		return NULL;
104 
105 	while (page) {
106 		tmp = page_chain_next(page);
107 		if (--n == 0)
108 			break; /* found sufficient pages */
109 		if (tmp == NULL)
110 			/* insufficient pages, don't use any of them. */
111 			return NULL;
112 		page = tmp;
113 	}
114 
115 	/* add end of list marker for the returned list */
116 	set_page_private(page, 0);
117 	/* actual return value, and adjustment of head */
118 	page = *head;
119 	*head = tmp;
120 	return page;
121 }
122 
123 /* may be used outside of locks to find the tail of a (usually short)
124  * "private" page chain, before adding it back to a global chain head
125  * with page_chain_add() under a spinlock. */
126 static struct page *page_chain_tail(struct page *page, int *len)
127 {
128 	struct page *tmp;
129 	int i = 1;
130 	while ((tmp = page_chain_next(page)))
131 		++i, page = tmp;
132 	if (len)
133 		*len = i;
134 	return page;
135 }
136 
137 static int page_chain_free(struct page *page)
138 {
139 	struct page *tmp;
140 	int i = 0;
141 	page_chain_for_each_safe(page, tmp) {
142 		put_page(page);
143 		++i;
144 	}
145 	return i;
146 }
147 
148 static void page_chain_add(struct page **head,
149 		struct page *chain_first, struct page *chain_last)
150 {
151 #if 1
152 	struct page *tmp;
153 	tmp = page_chain_tail(chain_first, NULL);
154 	BUG_ON(tmp != chain_last);
155 #endif
156 
157 	/* add chain to head */
158 	set_page_private(chain_last, (unsigned long)*head);
159 	*head = chain_first;
160 }
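
/* Illustrative sketch (not compiled, hence the #if 0): how the helpers above
 * represent a chain purely via page->private.  page_chain_next(),
 * page_chain_for_each() and page_chain_for_each_safe() are the drbd_int.h
 * helpers already used in this file; everything else is plain mm API. */
#if 0
static void page_chain_example(void)
{
	struct page *head = NULL, *page, *tmp;
	int i;

	/* push three pages onto a private chain, newest first,
	 * just like drbd_pp_first_pages_or_try_alloc() below */
	for (i = 0; i < 3; i++) {
		page = alloc_page(GFP_TRY);
		if (!page)
			break;
		set_page_private(page, (unsigned long)head);
		head = page;
	}

	/* walk it: page->private is the "next" pointer, 0 terminates */
	page = head;
	page_chain_for_each(page) {
		/* ... use the page ... */
	}

	/* tear it down; the _safe variant tolerates put_page() while walking */
	page = head;
	page_chain_for_each_safe(page, tmp)
		put_page(page);
}
#endif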
161 
162 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
163 {
164 	struct page *page = NULL;
165 	struct page *tmp = NULL;
166 	int i = 0;
167 
168 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
169 	 * So what. It saves a spin_lock. */
170 	if (drbd_pp_vacant >= number) {
171 		spin_lock(&drbd_pp_lock);
172 		page = page_chain_del(&drbd_pp_pool, number);
173 		if (page)
174 			drbd_pp_vacant -= number;
175 		spin_unlock(&drbd_pp_lock);
176 		if (page)
177 			return page;
178 	}
179 
180 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
181 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
182 	 * which in turn might block on the other node at this very place.  */
183 	for (i = 0; i < number; i++) {
184 		tmp = alloc_page(GFP_TRY);
185 		if (!tmp)
186 			break;
187 		set_page_private(tmp, (unsigned long)page);
188 		page = tmp;
189 	}
190 
191 	if (i == number)
192 		return page;
193 
194 	/* Not enough pages immediately available this time.
195 	 * No need to jump around here, drbd_pp_alloc will retry this
196 	 * function "soon". */
197 	if (page) {
198 		tmp = page_chain_tail(page, NULL);
199 		spin_lock(&drbd_pp_lock);
200 		page_chain_add(&drbd_pp_pool, page, tmp);
201 		drbd_pp_vacant += i;
202 		spin_unlock(&drbd_pp_lock);
203 	}
204 	return NULL;
205 }
206 
207 /* kick the lower level device, if we have more than (an arbitrary number of)
208  * references on it, which typically are locally submitted io
209  * requests.  Don't use unacked_cnt, so we speed up proto A and B, too. */
210 static void maybe_kick_lo(struct drbd_conf *mdev)
211 {
212 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
213 		drbd_kick_lo(mdev);
214 }
215 
216 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
217 {
218 	struct drbd_epoch_entry *e;
219 	struct list_head *le, *tle;
220 
221 	/* The EEs are always appended to the end of the list. Since
222 	   they are sent in order over the wire, they have to finish
223 	   in order. As soon as we see the first unfinished one, we can
224 	   stop examining the list... */
225 
226 	list_for_each_safe(le, tle, &mdev->net_ee) {
227 		e = list_entry(le, struct drbd_epoch_entry, w.list);
228 		if (drbd_ee_has_active_page(e))
229 			break;
230 		list_move(le, to_be_freed);
231 	}
232 }
233 
234 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
235 {
236 	LIST_HEAD(reclaimed);
237 	struct drbd_epoch_entry *e, *t;
238 
239 	maybe_kick_lo(mdev);
240 	spin_lock_irq(&mdev->req_lock);
241 	reclaim_net_ee(mdev, &reclaimed);
242 	spin_unlock_irq(&mdev->req_lock);
243 
244 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
245 		drbd_free_ee(mdev, e);
246 }
247 
248 /**
249  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
250  * @mdev:	DRBD device.
251  * @number:	number of pages requested
252  * @retry:	whether to retry, if not enough pages are available right now
253  *
254  * Tries to allocate number pages, first from our own page pool, then from
255  * the kernel, unless this allocation would exceed the max_buffers setting.
256  * Possibly retry until DRBD frees sufficient pages somewhere else.
257  *
258  * Returns a page chain linked via page->private.
259  */
260 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
261 {
262 	struct page *page = NULL;
263 	DEFINE_WAIT(wait);
264 
265 	/* Yes, we may run up to @number over max_buffers. If we
266 	 * follow it strictly, the admin will get it wrong anyways. */
267 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
268 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
269 
270 	while (page == NULL) {
271 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
272 
273 		drbd_kick_lo_and_reclaim_net(mdev);
274 
275 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
276 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
277 			if (page)
278 				break;
279 		}
280 
281 		if (!retry)
282 			break;
283 
284 		if (signal_pending(current)) {
285 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
286 			break;
287 		}
288 
289 		schedule();
290 	}
291 	finish_wait(&drbd_pp_wait, &wait);
292 
293 	if (page)
294 		atomic_add(number, &mdev->pp_in_use);
295 	return page;
296 }
297 
298 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
299  * It is also used from inside another spin_lock_irq(&mdev->req_lock) section.
300  * Either links the page chain back to the global pool,
301  * or returns all pages to the system. */
302 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
303 {
304 	int i;
305 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
306 		i = page_chain_free(page);
307 	else {
308 		struct page *tmp;
309 		tmp = page_chain_tail(page, &i);
310 		spin_lock(&drbd_pp_lock);
311 		page_chain_add(&drbd_pp_pool, page, tmp);
312 		drbd_pp_vacant += i;
313 		spin_unlock(&drbd_pp_lock);
314 	}
315 	atomic_sub(i, &mdev->pp_in_use);
316 	i = atomic_read(&mdev->pp_in_use);
317 	if (i < 0)
318 		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
319 	wake_up(&drbd_pp_wait);
320 }
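
/* Illustrative sketch (not compiled): the usual discipline around the pool
 * functions above -- allocate a whole chain for one request, touch each page,
 * then hand the chain back in a single call.  This mirrors what
 * drbd_alloc_ee()/drbd_free_ee() below do for real epoch entries. */
#if 0
static int pp_pool_example(struct drbd_conf *mdev, unsigned int data_size)
{
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
	struct page *chain, *page;

	/* may block until enough pages are freed elsewhere (retry == true) */
	chain = drbd_pp_alloc(mdev, nr_pages, true);
	if (!chain)
		return -ENOMEM;

	page = chain;
	page_chain_for_each(page) {
		void *data = kmap(page);
		memset(data, 0, PAGE_SIZE);	/* placeholder for real payload */
		kunmap(page);
	}

	/* links the chain back into drbd_pp_pool (or frees it)
	 * and wakes up anyone sleeping in drbd_pp_alloc() */
	drbd_pp_free(mdev, chain);
	return 0;
}
#endif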
321 
322 /*
323 You need to hold the req_lock:
324  _drbd_wait_ee_list_empty()
325 
326 You must not have the req_lock:
327  drbd_free_ee()
328  drbd_alloc_ee()
329  drbd_init_ee()
330  drbd_release_ee()
331  drbd_ee_fix_bhs()
332  drbd_process_done_ee()
333  drbd_clear_done_ee()
334  drbd_wait_ee_list_empty()
335 */
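
/* Illustrative sketch (not compiled) of the locking rule above for the two
 * wait variants (both defined further down in this file): the plain function
 * takes the req_lock itself, while the underscore variant expects the caller
 * to hold it already and drops/re-takes it internally while sleeping. */
#if 0
static void ee_wait_locking_example(struct drbd_conf *mdev)
{
	/* correct: no req_lock held, the function locks internally */
	drbd_wait_ee_list_empty(mdev, &mdev->active_ee);

	/* correct: req_lock held around the underscore variant */
	spin_lock_irq(&mdev->req_lock);
	_drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
	spin_unlock_irq(&mdev->req_lock);
}
#endif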
336 
337 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
338 				     u64 id,
339 				     sector_t sector,
340 				     unsigned int data_size,
341 				     gfp_t gfp_mask) __must_hold(local)
342 {
343 	struct drbd_epoch_entry *e;
344 	struct page *page;
345 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
346 
347 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
348 		return NULL;
349 
350 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
351 	if (!e) {
352 		if (!(gfp_mask & __GFP_NOWARN))
353 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
354 		return NULL;
355 	}
356 
357 	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
358 	if (!page)
359 		goto fail;
360 
361 	INIT_HLIST_NODE(&e->colision);
362 	e->epoch = NULL;
363 	e->mdev = mdev;
364 	e->pages = page;
365 	atomic_set(&e->pending_bios, 0);
366 	e->size = data_size;
367 	e->flags = 0;
368 	e->sector = sector;
370 	e->block_id = id;
371 
372 	return e;
373 
374  fail:
375 	mempool_free(e, drbd_ee_mempool);
376 	return NULL;
377 }
378 
379 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
380 {
381 	drbd_pp_free(mdev, e->pages);
382 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
383 	D_ASSERT(hlist_unhashed(&e->colision));
384 	mempool_free(e, drbd_ee_mempool);
385 }
386 
387 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
388 {
389 	LIST_HEAD(work_list);
390 	struct drbd_epoch_entry *e, *t;
391 	int count = 0;
392 
393 	spin_lock_irq(&mdev->req_lock);
394 	list_splice_init(list, &work_list);
395 	spin_unlock_irq(&mdev->req_lock);
396 
397 	list_for_each_entry_safe(e, t, &work_list, w.list) {
398 		drbd_free_ee(mdev, e);
399 		count++;
400 	}
401 	return count;
402 }
403 
404 
405 /*
406  * This function is called from _asender only_
407  * but see also comments in _req_mod(,barrier_acked)
408  * and receive_Barrier.
409  *
410  * Move entries from net_ee to done_ee, if ready.
411  * Grab done_ee, call all callbacks, free the entries.
412  * The callbacks typically send out ACKs.
413  */
414 static int drbd_process_done_ee(struct drbd_conf *mdev)
415 {
416 	LIST_HEAD(work_list);
417 	LIST_HEAD(reclaimed);
418 	struct drbd_epoch_entry *e, *t;
419 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
420 
421 	spin_lock_irq(&mdev->req_lock);
422 	reclaim_net_ee(mdev, &reclaimed);
423 	list_splice_init(&mdev->done_ee, &work_list);
424 	spin_unlock_irq(&mdev->req_lock);
425 
426 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
427 		drbd_free_ee(mdev, e);
428 
429 	/* possible callbacks here:
430 	 * e_end_block, e_end_resync_block, and e_send_discard_ack.
431 	 * all ignore the last argument.
432 	 */
433 	list_for_each_entry_safe(e, t, &work_list, w.list) {
434 		/* list_del not necessary, next/prev members not touched */
435 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
436 		drbd_free_ee(mdev, e);
437 	}
438 	wake_up(&mdev->ee_wait);
439 
440 	return ok;
441 }
442 
443 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444 {
445 	DEFINE_WAIT(wait);
446 
447 	/* avoids spin_lock/unlock
448 	 * and calling prepare_to_wait in the fast path */
449 	while (!list_empty(head)) {
450 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
451 		spin_unlock_irq(&mdev->req_lock);
452 		drbd_kick_lo(mdev);
453 		schedule();
454 		finish_wait(&mdev->ee_wait, &wait);
455 		spin_lock_irq(&mdev->req_lock);
456 	}
457 }
458 
459 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
460 {
461 	spin_lock_irq(&mdev->req_lock);
462 	_drbd_wait_ee_list_empty(mdev, head);
463 	spin_unlock_irq(&mdev->req_lock);
464 }
465 
466 /* see also kernel_accept(), which is only present since 2.6.18.
467  * We also want to log exactly which part of it failed. */
468 static int drbd_accept(struct drbd_conf *mdev, const char **what,
469 		struct socket *sock, struct socket **newsock)
470 {
471 	struct sock *sk = sock->sk;
472 	int err = 0;
473 
474 	*what = "listen";
475 	err = sock->ops->listen(sock, 5);
476 	if (err < 0)
477 		goto out;
478 
479 	*what = "sock_create_lite";
480 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
481 			       newsock);
482 	if (err < 0)
483 		goto out;
484 
485 	*what = "accept";
486 	err = sock->ops->accept(sock, *newsock, 0);
487 	if (err < 0) {
488 		sock_release(*newsock);
489 		*newsock = NULL;
490 		goto out;
491 	}
492 	(*newsock)->ops  = sock->ops;
493 
494 out:
495 	return err;
496 }
497 
498 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
499 		    void *buf, size_t size, int flags)
500 {
501 	mm_segment_t oldfs;
502 	struct kvec iov = {
503 		.iov_base = buf,
504 		.iov_len = size,
505 	};
506 	struct msghdr msg = {
507 		.msg_iovlen = 1,
508 		.msg_iov = (struct iovec *)&iov,
509 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
510 	};
511 	int rv;
512 
513 	oldfs = get_fs();
514 	set_fs(KERNEL_DS);
515 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
516 	set_fs(oldfs);
517 
518 	return rv;
519 }
520 
521 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
522 {
523 	mm_segment_t oldfs;
524 	struct kvec iov = {
525 		.iov_base = buf,
526 		.iov_len = size,
527 	};
528 	struct msghdr msg = {
529 		.msg_iovlen = 1,
530 		.msg_iov = (struct iovec *)&iov,
531 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
532 	};
533 	int rv;
534 
535 	oldfs = get_fs();
536 	set_fs(KERNEL_DS);
537 
538 	for (;;) {
539 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
540 		if (rv == size)
541 			break;
542 
543 		/* Note:
544 		 * ECONNRESET	other side closed the connection
545 		 * ERESTARTSYS	(on  sock) we got a signal
546 		 */
547 
548 		if (rv < 0) {
549 			if (rv == -ECONNRESET)
550 				dev_info(DEV, "sock was reset by peer\n");
551 			else if (rv != -ERESTARTSYS)
552 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
553 			break;
554 		} else if (rv == 0) {
555 			dev_info(DEV, "sock was shut down by peer\n");
556 			break;
557 		} else	{
558 			/* signal came in, or peer/link went down,
559 			 * after we read a partial message
560 			 */
561 			/* D_ASSERT(signal_pending(current)); */
562 			break;
563 		}
564 	}
565 
566 	set_fs(oldfs);
567 
568 	if (rv != size)
569 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
570 
571 	return rv;
572 }
573 
574 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
575 {
576 	const char *what;
577 	struct socket *sock;
578 	struct sockaddr_in6 src_in6;
579 	int err;
580 	int disconnect_on_error = 1;
581 
582 	if (!get_net_conf(mdev))
583 		return NULL;
584 
585 	what = "sock_create_kern";
586 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
587 		SOCK_STREAM, IPPROTO_TCP, &sock);
588 	if (err < 0) {
589 		sock = NULL;
590 		goto out;
591 	}
592 
593 	sock->sk->sk_rcvtimeo =
594 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
595 
596        /* explicitly bind to the configured IP as source IP
597 	*  for the outgoing connections.
598 	*  This is needed for multihomed hosts and to be
599 	*  able to use lo: interfaces for drbd.
600 	* Make sure to use 0 as port number, so linux selects
601 	*  a free one dynamically.
602 	*/
603 	memcpy(&src_in6, mdev->net_conf->my_addr,
604 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
605 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
606 		src_in6.sin6_port = 0;
607 	else
608 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
609 
610 	what = "bind before connect";
611 	err = sock->ops->bind(sock,
612 			      (struct sockaddr *) &src_in6,
613 			      mdev->net_conf->my_addr_len);
614 	if (err < 0)
615 		goto out;
616 
617 	/* connect may fail, peer not yet available.
618 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
619 	disconnect_on_error = 0;
620 	what = "connect";
621 	err = sock->ops->connect(sock,
622 				 (struct sockaddr *)mdev->net_conf->peer_addr,
623 				 mdev->net_conf->peer_addr_len, 0);
624 
625 out:
626 	if (err < 0) {
627 		if (sock) {
628 			sock_release(sock);
629 			sock = NULL;
630 		}
631 		switch (-err) {
632 			/* timeout, busy, signal pending */
633 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
634 		case EINTR: case ERESTARTSYS:
635 			/* peer not (yet) available, network problem */
636 		case ECONNREFUSED: case ENETUNREACH:
637 		case EHOSTDOWN:    case EHOSTUNREACH:
638 			disconnect_on_error = 0;
639 			break;
640 		default:
641 			dev_err(DEV, "%s failed, err = %d\n", what, err);
642 		}
643 		if (disconnect_on_error)
644 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
645 	}
646 	put_net_conf(mdev);
647 	return sock;
648 }
649 
650 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
651 {
652 	int timeo, err;
653 	struct socket *s_estab = NULL, *s_listen;
654 	const char *what;
655 
656 	if (!get_net_conf(mdev))
657 		return NULL;
658 
659 	what = "sock_create_kern";
660 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
661 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
662 	if (err) {
663 		s_listen = NULL;
664 		goto out;
665 	}
666 
667 	timeo = mdev->net_conf->try_connect_int * HZ;
668 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
669 
670 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
671 	s_listen->sk->sk_rcvtimeo = timeo;
672 	s_listen->sk->sk_sndtimeo = timeo;
673 
674 	what = "bind before listen";
675 	err = s_listen->ops->bind(s_listen,
676 			      (struct sockaddr *) mdev->net_conf->my_addr,
677 			      mdev->net_conf->my_addr_len);
678 	if (err < 0)
679 		goto out;
680 
681 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
682 
683 out:
684 	if (s_listen)
685 		sock_release(s_listen);
686 	if (err < 0) {
687 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
688 			dev_err(DEV, "%s failed, err = %d\n", what, err);
689 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
690 		}
691 	}
692 	put_net_conf(mdev);
693 
694 	return s_estab;
695 }
696 
697 static int drbd_send_fp(struct drbd_conf *mdev,
698 	struct socket *sock, enum drbd_packets cmd)
699 {
700 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
701 
702 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
703 }
704 
705 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
706 {
707 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
708 	int rr;
709 
710 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
711 
712 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
713 		return be16_to_cpu(h->command);
714 
715 	return 0xffff;
716 }
717 
718 /**
719  * drbd_socket_okay() - Free the socket if its connection is not okay
720  * @mdev:	DRBD device.
721  * @sock:	pointer to the pointer to the socket.
722  */
723 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
724 {
725 	int rr;
726 	char tb[4];
727 
728 	if (!*sock)
729 		return FALSE;
730 
731 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
732 
733 	if (rr > 0 || rr == -EAGAIN) {
734 		return TRUE;
735 	} else {
736 		sock_release(*sock);
737 		*sock = NULL;
738 		return FALSE;
739 	}
740 }
741 
742 /*
743  * return values:
744  *   1 yes, we have a valid connection
745  *   0 oops, did not work out, please try again
746  *  -1 peer talks different language,
747  *     no point in trying again, please go standalone.
748  *  -2 We do not have a network config...
749  */
750 static int drbd_connect(struct drbd_conf *mdev)
751 {
752 	struct socket *s, *sock, *msock;
753 	int try, h, ok;
754 
755 	D_ASSERT(!mdev->data.socket);
756 
757 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
758 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
759 
760 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
761 		return -2;
762 
763 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
764 
765 	sock  = NULL;
766 	msock = NULL;
767 
768 	do {
769 		for (try = 0;;) {
770 			/* 3 tries, this should take less than a second! */
771 			s = drbd_try_connect(mdev);
772 			if (s || ++try >= 3)
773 				break;
774 			/* give the other side time to call bind() & listen() */
775 			__set_current_state(TASK_INTERRUPTIBLE);
776 			schedule_timeout(HZ / 10);
777 		}
778 
779 		if (s) {
780 			if (!sock) {
781 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
782 				sock = s;
783 				s = NULL;
784 			} else if (!msock) {
785 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
786 				msock = s;
787 				s = NULL;
788 			} else {
789 				dev_err(DEV, "Logic error in drbd_connect()\n");
790 				goto out_release_sockets;
791 			}
792 		}
793 
794 		if (sock && msock) {
795 			__set_current_state(TASK_INTERRUPTIBLE);
796 			schedule_timeout(HZ / 10);
797 			ok = drbd_socket_okay(mdev, &sock);
798 			ok = drbd_socket_okay(mdev, &msock) && ok;
799 			if (ok)
800 				break;
801 		}
802 
803 retry:
804 		s = drbd_wait_for_connect(mdev);
805 		if (s) {
806 			try = drbd_recv_fp(mdev, s);
807 			drbd_socket_okay(mdev, &sock);
808 			drbd_socket_okay(mdev, &msock);
809 			switch (try) {
810 			case P_HAND_SHAKE_S:
811 				if (sock) {
812 					dev_warn(DEV, "initial packet S crossed\n");
813 					sock_release(sock);
814 				}
815 				sock = s;
816 				break;
817 			case P_HAND_SHAKE_M:
818 				if (msock) {
819 					dev_warn(DEV, "initial packet M crossed\n");
820 					sock_release(msock);
821 				}
822 				msock = s;
823 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
824 				break;
825 			default:
826 				dev_warn(DEV, "Error receiving initial packet\n");
827 				sock_release(s);
828 				if (random32() & 1)
829 					goto retry;
830 			}
831 		}
832 
833 		if (mdev->state.conn <= C_DISCONNECTING)
834 			goto out_release_sockets;
835 		if (signal_pending(current)) {
836 			flush_signals(current);
837 			smp_rmb();
838 			if (get_t_state(&mdev->receiver) == Exiting)
839 				goto out_release_sockets;
840 		}
841 
842 		if (sock && msock) {
843 			ok = drbd_socket_okay(mdev, &sock);
844 			ok = drbd_socket_okay(mdev, &msock) && ok;
845 			if (ok)
846 				break;
847 		}
848 	} while (1);
849 
850 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
851 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
852 
853 	sock->sk->sk_allocation = GFP_NOIO;
854 	msock->sk->sk_allocation = GFP_NOIO;
855 
856 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
857 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
858 
859 	if (mdev->net_conf->sndbuf_size) {
860 		sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
861 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
862 	}
863 
864 	if (mdev->net_conf->rcvbuf_size) {
865 		sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
866 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
867 	}
868 
869 	/* NOT YET ...
870 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
871 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
872 	 * first set it to the P_HAND_SHAKE timeout,
873 	 * which we set to 4x the configured ping_timeout. */
874 	sock->sk->sk_sndtimeo =
875 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
876 
877 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
878 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
879 
880 	/* we don't want delays.
881 	 * we use TCP_CORK where appropriate, though */
882 	drbd_tcp_nodelay(sock);
883 	drbd_tcp_nodelay(msock);
884 
885 	mdev->data.socket = sock;
886 	mdev->meta.socket = msock;
887 	mdev->last_received = jiffies;
888 
889 	D_ASSERT(mdev->asender.task == NULL);
890 
891 	h = drbd_do_handshake(mdev);
892 	if (h <= 0)
893 		return h;
894 
895 	if (mdev->cram_hmac_tfm) {
896 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
897 		switch (drbd_do_auth(mdev)) {
898 		case -1:
899 			dev_err(DEV, "Authentication of peer failed\n");
900 			return -1;
901 		case 0:
902 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
903 			return 0;
904 		}
905 	}
906 
907 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
908 		return 0;
909 
910 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
911 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
912 
913 	atomic_set(&mdev->packet_seq, 0);
914 	mdev->peer_seq = 0;
915 
916 	drbd_thread_start(&mdev->asender);
917 
918 	if (!drbd_send_protocol(mdev))
919 		return -1;
920 	drbd_send_sync_param(mdev, &mdev->sync_conf);
921 	drbd_send_sizes(mdev, 0, 0);
922 	drbd_send_uuids(mdev);
923 	drbd_send_state(mdev);
924 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
925 	clear_bit(RESIZE_PENDING, &mdev->flags);
926 
927 	return 1;
928 
929 out_release_sockets:
930 	if (sock)
931 		sock_release(sock);
932 	if (msock)
933 		sock_release(msock);
934 	return -1;
935 }
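
/* Illustrative sketch (not compiled): how a caller is expected to act on the
 * return values documented above drbd_connect().  The driver's receiver
 * thread does this for real; this is only a minimal restatement of the
 * contract: 0 means "try again", negative means "give up". */
#if 0
static void connect_retry_example(struct drbd_conf *mdev)
{
	int h;

	do {
		h = drbd_connect(mdev);
		if (h == 0)	/* handshake crossed / not ready, retry */
			schedule_timeout_interruptible(HZ);
	} while (h == 0);

	if (h < 0) {
		/* -1: peer talks a different language, -2: no net config;
		 * in both cases go StandAlone instead of retrying */
		drbd_force_state(mdev, NS(conn, C_STANDALONE));
		return;
	}

	/* h == 1: connection established, start processing packets */
}
#endif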
936 
937 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
938 {
939 	int r;
940 
941 	r = drbd_recv(mdev, h, sizeof(*h));
942 
943 	if (unlikely(r != sizeof(*h))) {
944 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
945 		return FALSE;
946 	}
947 	h->command = be16_to_cpu(h->command);
948 	h->length  = be16_to_cpu(h->length);
949 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
950 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
951 		    (long)be32_to_cpu(h->magic),
952 		    h->command, h->length);
953 		return FALSE;
954 	}
955 	mdev->last_received = jiffies;
956 
957 	return TRUE;
958 }
959 
960 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
961 {
962 	int rv;
963 
964 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
965 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
966 					NULL, BLKDEV_IFL_WAIT);
967 		if (rv) {
968 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
969 			/* would rather check on EOPNOTSUPP, but that is not reliable.
970 			 * don't try again for ANY return value != 0
971 			 * if (rv == -EOPNOTSUPP) */
972 			drbd_bump_write_ordering(mdev, WO_drain_io);
973 		}
974 		put_ldev(mdev);
975 	}
976 
977 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
978 }
979 
980 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
981 {
982 	struct flush_work *fw = (struct flush_work *)w;
983 	struct drbd_epoch *epoch = fw->epoch;
984 
985 	kfree(w);
986 
987 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
988 		drbd_flush_after_epoch(mdev, epoch);
989 
990 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
991 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
992 
993 	return 1;
994 }
995 
996 /**
997  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
998  * @mdev:	DRBD device.
999  * @epoch:	Epoch object.
1000  * @ev:		Epoch event.
1001  */
1002 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1003 					       struct drbd_epoch *epoch,
1004 					       enum epoch_event ev)
1005 {
1006 	int finish, epoch_size;
1007 	struct drbd_epoch *next_epoch;
1008 	int schedule_flush = 0;
1009 	enum finish_epoch rv = FE_STILL_LIVE;
1010 
1011 	spin_lock(&mdev->epoch_lock);
1012 	do {
1013 		next_epoch = NULL;
1014 		finish = 0;
1015 
1016 		epoch_size = atomic_read(&epoch->epoch_size);
1017 
1018 		switch (ev & ~EV_CLEANUP) {
1019 		case EV_PUT:
1020 			atomic_dec(&epoch->active);
1021 			break;
1022 		case EV_GOT_BARRIER_NR:
1023 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1024 
1025 			/* Special case: If we just switched from WO_bio_barrier to
1026 			   WO_bdev_flush we should not finish the current epoch */
1027 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1028 			    mdev->write_ordering != WO_bio_barrier &&
1029 			    epoch == mdev->current_epoch)
1030 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1031 			break;
1032 		case EV_BARRIER_DONE:
1033 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1034 			break;
1035 		case EV_BECAME_LAST:
1036 			/* nothing to do*/
1037 			break;
1038 		}
1039 
1040 		if (epoch_size != 0 &&
1041 		    atomic_read(&epoch->active) == 0 &&
1042 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1043 		    epoch->list.prev == &mdev->current_epoch->list &&
1044 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1045 			/* Nearly all conditions are met to finish that epoch... */
1046 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1047 			    mdev->write_ordering == WO_none ||
1048 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1049 			    ev & EV_CLEANUP) {
1050 				finish = 1;
1051 				set_bit(DE_IS_FINISHING, &epoch->flags);
1052 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1053 				 mdev->write_ordering == WO_bio_barrier) {
1054 				atomic_inc(&epoch->active);
1055 				schedule_flush = 1;
1056 			}
1057 		}
1058 		if (finish) {
1059 			if (!(ev & EV_CLEANUP)) {
1060 				spin_unlock(&mdev->epoch_lock);
1061 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1062 				spin_lock(&mdev->epoch_lock);
1063 			}
1064 			dec_unacked(mdev);
1065 
1066 			if (mdev->current_epoch != epoch) {
1067 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1068 				list_del(&epoch->list);
1069 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1070 				mdev->epochs--;
1071 				kfree(epoch);
1072 
1073 				if (rv == FE_STILL_LIVE)
1074 					rv = FE_DESTROYED;
1075 			} else {
1076 				epoch->flags = 0;
1077 				atomic_set(&epoch->epoch_size, 0);
1078 				/* atomic_set(&epoch->active, 0); is already zero */
1079 				if (rv == FE_STILL_LIVE)
1080 					rv = FE_RECYCLED;
1081 			}
1082 		}
1083 
1084 		if (!next_epoch)
1085 			break;
1086 
1087 		epoch = next_epoch;
1088 	} while (1);
1089 
1090 	spin_unlock(&mdev->epoch_lock);
1091 
1092 	if (schedule_flush) {
1093 		struct flush_work *fw;
1094 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1095 		if (fw) {
1096 			fw->w.cb = w_flush;
1097 			fw->epoch = epoch;
1098 			drbd_queue_work(&mdev->data.work, &fw->w);
1099 		} else {
1100 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1101 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1102 			/* This is not recursion; it only goes one level deep */
1103 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1104 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1105 		}
1106 	}
1107 
1108 	return rv;
1109 }
1110 
1111 /**
1112  * drbd_bump_write_ordering() - Fall back to another write ordering method
1113  * @mdev:	DRBD device.
1114  * @wo:		Write ordering method to try.
1115  */
1116 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1117 {
1118 	enum write_ordering_e pwo;
1119 	static char *write_ordering_str[] = {
1120 		[WO_none] = "none",
1121 		[WO_drain_io] = "drain",
1122 		[WO_bdev_flush] = "flush",
1123 		[WO_bio_barrier] = "barrier",
1124 	};
1125 
1126 	pwo = mdev->write_ordering;
1127 	wo = min(pwo, wo);
1128 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1129 		wo = WO_bdev_flush;
1130 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1131 		wo = WO_drain_io;
1132 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1133 		wo = WO_none;
1134 	mdev->write_ordering = wo;
1135 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1136 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1137 }
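
/* Illustrative sketch (not compiled): "bump" only ever degrades.  Because of
 * the min(pwo, wo) above, asking for a stronger method than the current one
 * is a no-op, while asking for a weaker one (as drbd_flush_after_epoch() does
 * after a failed flush) sticks.  This assumes the enum orders the methods
 * from weakest (WO_none) to strongest (WO_bio_barrier), as the string table
 * above suggests. */
#if 0
static void write_ordering_example(struct drbd_conf *mdev)
{
	if (!get_ldev(mdev))	/* the function looks at mdev->ldev->dc */
		return;

	/* suppose the current method is WO_drain_io:
	 * requesting the stronger WO_bdev_flush changes nothing ... */
	drbd_bump_write_ordering(mdev, WO_bdev_flush);

	/* ... while requesting the weaker WO_none takes effect */
	drbd_bump_write_ordering(mdev, WO_none);

	put_ldev(mdev);
}
#endif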
1138 
1139 /**
1140  * drbd_submit_ee() - Submit the pages of an epoch entry as one or more bios
1141  * @mdev:	DRBD device.
1142  * @e:		epoch entry
1143  * @rw:		flag field, see bio->bi_rw
1144  */
1145 /* TODO allocate from our own bio_set. */
1146 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1147 		const unsigned rw, const int fault_type)
1148 {
1149 	struct bio *bios = NULL;
1150 	struct bio *bio;
1151 	struct page *page = e->pages;
1152 	sector_t sector = e->sector;
1153 	unsigned ds = e->size;
1154 	unsigned n_bios = 0;
1155 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1156 
1157 	if (atomic_read(&mdev->new_c_uuid)) {
1158 		if (atomic_add_unless(&mdev->new_c_uuid, -1, 1)) {
1159 			drbd_uuid_new_current(mdev);
1160 			drbd_md_sync(mdev);
1161 
1162 			atomic_dec(&mdev->new_c_uuid);
1163 			wake_up(&mdev->misc_wait);
1164 		}
1165 		wait_event(mdev->misc_wait, !atomic_read(&mdev->new_c_uuid));
1166 	}
1167 
1168 	/* In most cases, we will only need one bio.  But in case the lower
1169 	 * level restrictions happen to be different at this offset on this
1170 	 * side than those of the sending peer, we may need to submit the
1171 	 * request in more than one bio. */
1172 next_bio:
1173 	bio = bio_alloc(GFP_NOIO, nr_pages);
1174 	if (!bio) {
1175 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1176 		goto fail;
1177 	}
1178 	/* > e->sector, unless this is the first bio */
1179 	bio->bi_sector = sector;
1180 	bio->bi_bdev = mdev->ldev->backing_bdev;
1181 	/* we special case some flags in the multi-bio case, see below
1182 	 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1183 	bio->bi_rw = rw;
1184 	bio->bi_private = e;
1185 	bio->bi_end_io = drbd_endio_sec;
1186 
1187 	bio->bi_next = bios;
1188 	bios = bio;
1189 	++n_bios;
1190 
1191 	page_chain_for_each(page) {
1192 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1193 		if (!bio_add_page(bio, page, len, 0)) {
1194 			/* a single page must always be possible! */
1195 			BUG_ON(bio->bi_vcnt == 0);
1196 			goto next_bio;
1197 		}
1198 		ds -= len;
1199 		sector += len >> 9;
1200 		--nr_pages;
1201 	}
1202 	D_ASSERT(page == NULL);
1203 	D_ASSERT(ds == 0);
1204 
1205 	atomic_set(&e->pending_bios, n_bios);
1206 	do {
1207 		bio = bios;
1208 		bios = bios->bi_next;
1209 		bio->bi_next = NULL;
1210 
1211 		/* strip off BIO_RW_UNPLUG unless it is the last bio */
1212 		if (bios)
1213 			bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1214 
1215 		drbd_generic_make_request(mdev, fault_type, bio);
1216 
1217 		/* strip off BIO_RW_BARRIER,
1218 		 * unless it is the first or last bio */
1219 		if (bios && bios->bi_next)
1220 			bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1221 	} while (bios);
1222 	maybe_kick_lo(mdev);
1223 	return 0;
1224 
1225 fail:
1226 	while (bios) {
1227 		bio = bios;
1228 		bios = bios->bi_next;
1229 		bio_put(bio);
1230 	}
1231 	return -ENOMEM;
1232 }
1233 
1234 /**
1235  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1236  * @mdev:	DRBD device.
1237  * @w:		work object.
1238  * @cancel:	The connection will be closed anyways (unused in this callback)
1239  */
1240 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1241 {
1242 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1243 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1244 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1245 	   so that we can finish that epoch in drbd_may_finish_epoch().
1246 	   That is necessary if we already have a long chain of Epochs, before
1247 	   we realize that BIO_RW_BARRIER is actually not supported */
1248 
1249 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1250 	   that will never trigger. If it is reported late, we will just
1251 	   print that warning and continue correctly for all future requests
1252 	   with WO_bdev_flush */
1253 	if (previous_epoch(mdev, e->epoch))
1254 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1255 
1256 	/* we still have a local reference,
1257 	 * get_ldev was done in receive_Data. */
1258 
1259 	e->w.cb = e_end_block;
1260 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1261 		/* drbd_submit_ee fails for one reason only:
1262 		 * if it was not able to allocate sufficient bios.
1263 		 * requeue, try again later. */
1264 		e->w.cb = w_e_reissue;
1265 		drbd_queue_work(&mdev->data.work, &e->w);
1266 	}
1267 	return 1;
1268 }
1269 
1270 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1271 {
1272 	int rv, issue_flush;
1273 	struct p_barrier *p = (struct p_barrier *)h;
1274 	struct drbd_epoch *epoch;
1275 
1276 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1277 
1278 	rv = drbd_recv(mdev, h->payload, h->length);
1279 	ERR_IF(rv != h->length) return FALSE;
1280 
1281 	inc_unacked(mdev);
1282 
1283 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1284 		drbd_kick_lo(mdev);
1285 
1286 	mdev->current_epoch->barrier_nr = p->barrier;
1287 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1288 
1289 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1290 	 * the activity log, which means it would not be resynced in case the
1291 	 * R_PRIMARY crashes now.
1292 	 * Therefore we must send the barrier_ack after the barrier request was
1293 	 * completed. */
1294 	switch (mdev->write_ordering) {
1295 	case WO_bio_barrier:
1296 	case WO_none:
1297 		if (rv == FE_RECYCLED)
1298 			return TRUE;
1299 		break;
1300 
1301 	case WO_bdev_flush:
1302 	case WO_drain_io:
1303 		if (rv == FE_STILL_LIVE) {
1304 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1305 			drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1306 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1307 		}
1308 		if (rv == FE_RECYCLED)
1309 			return TRUE;
1310 
1311 		/* The asender will send all the ACKs and barrier ACKs out, since
1312 		   all EEs moved from the active_ee to the done_ee. We need to
1313 		   provide a new epoch object for the EEs that come in soon */
1314 		break;
1315 	}
1316 
1317 	/* receiver context, in the writeout path of the other node.
1318 	 * avoid potential distributed deadlock */
1319 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1320 	if (!epoch) {
1321 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1322 		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1323 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1324 		if (issue_flush) {
1325 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1326 			if (rv == FE_RECYCLED)
1327 				return TRUE;
1328 		}
1329 
1330 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1331 
1332 		return TRUE;
1333 	}
1334 
1335 	epoch->flags = 0;
1336 	atomic_set(&epoch->epoch_size, 0);
1337 	atomic_set(&epoch->active, 0);
1338 
1339 	spin_lock(&mdev->epoch_lock);
1340 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1341 		list_add(&epoch->list, &mdev->current_epoch->list);
1342 		mdev->current_epoch = epoch;
1343 		mdev->epochs++;
1344 	} else {
1345 		/* The current_epoch got recycled while we allocated this one... */
1346 		kfree(epoch);
1347 	}
1348 	spin_unlock(&mdev->epoch_lock);
1349 
1350 	return TRUE;
1351 }
1352 
1353 /* used from receive_RSDataReply (recv_resync_read)
1354  * and from receive_Data */
1355 static struct drbd_epoch_entry *
1356 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1357 {
1358 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1359 	struct drbd_epoch_entry *e;
1360 	struct page *page;
1361 	int dgs, ds, rr;
1362 	void *dig_in = mdev->int_dig_in;
1363 	void *dig_vv = mdev->int_dig_vv;
1364 	unsigned long *data;
1365 
1366 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1367 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1368 
1369 	if (dgs) {
1370 		rr = drbd_recv(mdev, dig_in, dgs);
1371 		if (rr != dgs) {
1372 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1373 			     rr, dgs);
1374 			return NULL;
1375 		}
1376 	}
1377 
1378 	data_size -= dgs;
1379 
1380 	ERR_IF(data_size &  0x1ff) return NULL;
1381 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1382 
1383 	/* even though we trust our peer,
1384 	 * we sometimes have to double check. */
1385 	if (sector + (data_size>>9) > capacity) {
1386 		dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1387 			(unsigned long long)capacity,
1388 			(unsigned long long)sector, data_size);
1389 		return NULL;
1390 	}
1391 
1392 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1393 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1394 	 * which in turn might block on the other node at this very place.  */
1395 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1396 	if (!e)
1397 		return NULL;
1398 
1399 	ds = data_size;
1400 	page = e->pages;
1401 	page_chain_for_each(page) {
1402 		unsigned len = min_t(int, ds, PAGE_SIZE);
1403 		data = kmap(page);
1404 		rr = drbd_recv(mdev, data, len);
1405 		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1406 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1407 			data[0] = data[0] ^ (unsigned long)-1;
1408 		}
1409 		kunmap(page);
1410 		if (rr != len) {
1411 			drbd_free_ee(mdev, e);
1412 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1413 			     rr, len);
1414 			return NULL;
1415 		}
1416 		ds -= rr;
1417 	}
1418 
1419 	if (dgs) {
1420 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1421 		if (memcmp(dig_in, dig_vv, dgs)) {
1422 			dev_err(DEV, "Digest integrity check FAILED.\n");
1423 			drbd_bcast_ee(mdev, "digest failed",
1424 					dgs, dig_in, dig_vv, e);
1425 			drbd_free_ee(mdev, e);
1426 			return NULL;
1427 		}
1428 	}
1429 	mdev->recv_cnt += data_size>>9;
1430 	return e;
1431 }
1432 
1433 /* drbd_drain_block() just takes a data block
1434  * out of the socket input buffer, and discards it.
1435  */
1436 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1437 {
1438 	struct page *page;
1439 	int rr, rv = 1;
1440 	void *data;
1441 
1442 	if (!data_size)
1443 		return TRUE;
1444 
1445 	page = drbd_pp_alloc(mdev, 1, 1);
1446 
1447 	data = kmap(page);
1448 	while (data_size) {
1449 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1450 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1451 			rv = 0;
1452 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1453 			     rr, min_t(int, data_size, PAGE_SIZE));
1454 			break;
1455 		}
1456 		data_size -= rr;
1457 	}
1458 	kunmap(page);
1459 	drbd_pp_free(mdev, page);
1460 	return rv;
1461 }
1462 
1463 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1464 			   sector_t sector, int data_size)
1465 {
1466 	struct bio_vec *bvec;
1467 	struct bio *bio;
1468 	int dgs, rr, i, expect;
1469 	void *dig_in = mdev->int_dig_in;
1470 	void *dig_vv = mdev->int_dig_vv;
1471 
1472 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1473 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1474 
1475 	if (dgs) {
1476 		rr = drbd_recv(mdev, dig_in, dgs);
1477 		if (rr != dgs) {
1478 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1479 			     rr, dgs);
1480 			return 0;
1481 		}
1482 	}
1483 
1484 	data_size -= dgs;
1485 
1486 	/* optimistically update recv_cnt.  if receiving fails below,
1487 	 * we disconnect anyways, and counters will be reset. */
1488 	mdev->recv_cnt += data_size>>9;
1489 
1490 	bio = req->master_bio;
1491 	D_ASSERT(sector == bio->bi_sector);
1492 
1493 	bio_for_each_segment(bvec, bio, i) {
1494 		expect = min_t(int, data_size, bvec->bv_len);
1495 		rr = drbd_recv(mdev,
1496 			     kmap(bvec->bv_page)+bvec->bv_offset,
1497 			     expect);
1498 		kunmap(bvec->bv_page);
1499 		if (rr != expect) {
1500 			dev_warn(DEV, "short read receiving data reply: "
1501 			     "read %d expected %d\n",
1502 			     rr, expect);
1503 			return 0;
1504 		}
1505 		data_size -= rr;
1506 	}
1507 
1508 	if (dgs) {
1509 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1510 		if (memcmp(dig_in, dig_vv, dgs)) {
1511 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1512 			return 0;
1513 		}
1514 	}
1515 
1516 	D_ASSERT(data_size == 0);
1517 	return 1;
1518 }
1519 
1520 /* e_end_resync_block() is called via
1521  * drbd_process_done_ee() by asender only */
1522 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1523 {
1524 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1525 	sector_t sector = e->sector;
1526 	int ok;
1527 
1528 	D_ASSERT(hlist_unhashed(&e->colision));
1529 
1530 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1531 		drbd_set_in_sync(mdev, sector, e->size);
1532 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1533 	} else {
1534 		/* Record failure to sync */
1535 		drbd_rs_failed_io(mdev, sector, e->size);
1536 
1537 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1538 	}
1539 	dec_unacked(mdev);
1540 
1541 	return ok;
1542 }
1543 
1544 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1545 {
1546 	struct drbd_epoch_entry *e;
1547 
1548 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1549 	if (!e)
1550 		goto fail;
1551 
1552 	dec_rs_pending(mdev);
1553 
1554 	inc_unacked(mdev);
1555 	/* corresponding dec_unacked() in e_end_resync_block()
1556 	 * respective _drbd_clear_done_ee */
1557 
1558 	e->w.cb = e_end_resync_block;
1559 
1560 	spin_lock_irq(&mdev->req_lock);
1561 	list_add(&e->w.list, &mdev->sync_ee);
1562 	spin_unlock_irq(&mdev->req_lock);
1563 
1564 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1565 		return TRUE;
1566 
1567 	drbd_free_ee(mdev, e);
1568 fail:
1569 	put_ldev(mdev);
1570 	return FALSE;
1571 }
1572 
1573 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1574 {
1575 	struct drbd_request *req;
1576 	sector_t sector;
1577 	unsigned int header_size, data_size;
1578 	int ok;
1579 	struct p_data *p = (struct p_data *)h;
1580 
1581 	header_size = sizeof(*p) - sizeof(*h);
1582 	data_size   = h->length  - header_size;
1583 
1584 	ERR_IF(data_size == 0) return FALSE;
1585 
1586 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1587 		return FALSE;
1588 
1589 	sector = be64_to_cpu(p->sector);
1590 
1591 	spin_lock_irq(&mdev->req_lock);
1592 	req = _ar_id_to_req(mdev, p->block_id, sector);
1593 	spin_unlock_irq(&mdev->req_lock);
1594 	if (unlikely(!req)) {
1595 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1596 		return FALSE;
1597 	}
1598 
1599 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1600 	 * special casing it there for the various failure cases.
1601 	 * still no race with drbd_fail_pending_reads */
1602 	ok = recv_dless_read(mdev, req, sector, data_size);
1603 
1604 	if (ok)
1605 		req_mod(req, data_received);
1606 	/* else: nothing. handled from drbd_disconnect...
1607 	 * I don't think we may complete this just yet
1608 	 * in case we are "on-disconnect: freeze" */
1609 
1610 	return ok;
1611 }
1612 
1613 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1614 {
1615 	sector_t sector;
1616 	unsigned int header_size, data_size;
1617 	int ok;
1618 	struct p_data *p = (struct p_data *)h;
1619 
1620 	header_size = sizeof(*p) - sizeof(*h);
1621 	data_size   = h->length  - header_size;
1622 
1623 	ERR_IF(data_size == 0) return FALSE;
1624 
1625 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1626 		return FALSE;
1627 
1628 	sector = be64_to_cpu(p->sector);
1629 	D_ASSERT(p->block_id == ID_SYNCER);
1630 
1631 	if (get_ldev(mdev)) {
1632 		/* data is submitted to disk within recv_resync_read.
1633 		 * corresponding put_ldev done below on error,
1634 		 * or in drbd_endio_write_sec. */
1635 		ok = recv_resync_read(mdev, sector, data_size);
1636 	} else {
1637 		if (__ratelimit(&drbd_ratelimit_state))
1638 			dev_err(DEV, "Can not write resync data to local disk.\n");
1639 
1640 		ok = drbd_drain_block(mdev, data_size);
1641 
1642 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1643 	}
1644 
1645 	return ok;
1646 }
1647 
1648 /* e_end_block() is called via drbd_process_done_ee().
1649  * this means this function only runs in the asender thread
1650  */
1651 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1652 {
1653 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1654 	sector_t sector = e->sector;
1655 	struct drbd_epoch *epoch;
1656 	int ok = 1, pcmd;
1657 
1658 	if (e->flags & EE_IS_BARRIER) {
1659 		epoch = previous_epoch(mdev, e->epoch);
1660 		if (epoch)
1661 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1662 	}
1663 
1664 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1665 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1666 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1667 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1668 				e->flags & EE_MAY_SET_IN_SYNC) ?
1669 				P_RS_WRITE_ACK : P_WRITE_ACK;
1670 			ok &= drbd_send_ack(mdev, pcmd, e);
1671 			if (pcmd == P_RS_WRITE_ACK)
1672 				drbd_set_in_sync(mdev, sector, e->size);
1673 		} else {
1674 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1675 			/* we expect it to be marked out of sync anyways...
1676 			 * maybe assert this?  */
1677 		}
1678 		dec_unacked(mdev);
1679 	}
1680 	/* we delete from the conflict detection hash _after_ we sent out the
1681 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1682 	if (mdev->net_conf->two_primaries) {
1683 		spin_lock_irq(&mdev->req_lock);
1684 		D_ASSERT(!hlist_unhashed(&e->colision));
1685 		hlist_del_init(&e->colision);
1686 		spin_unlock_irq(&mdev->req_lock);
1687 	} else {
1688 		D_ASSERT(hlist_unhashed(&e->colision));
1689 	}
1690 
1691 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1692 
1693 	return ok;
1694 }
1695 
1696 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1697 {
1698 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1699 	int ok = 1;
1700 
1701 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1702 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1703 
1704 	spin_lock_irq(&mdev->req_lock);
1705 	D_ASSERT(!hlist_unhashed(&e->colision));
1706 	hlist_del_init(&e->colision);
1707 	spin_unlock_irq(&mdev->req_lock);
1708 
1709 	dec_unacked(mdev);
1710 
1711 	return ok;
1712 }
1713 
1714 /* Called from receive_Data.
1715  * Synchronize packets on sock with packets on msock.
1716  *
1717  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1718  * packet traveling on msock, they are still processed in the order they have
1719  * been sent.
1720  *
1721  * Note: we don't care for Ack packets overtaking P_DATA packets.
1722  *
1723  * In case packet_seq is larger than mdev->peer_seq number, there are
1724  * outstanding packets on the msock. We wait for them to arrive.
1725  * In case we are the logically next packet, we update mdev->peer_seq
1726  * ourselves. Correctly handles 32bit wrap around.
1727  *
1728  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1729  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1730  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1731  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1732  *
1733  * returns 0 if we may process the packet,
1734  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1735 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1736 {
1737 	DEFINE_WAIT(wait);
1738 	unsigned int p_seq;
1739 	long timeout;
1740 	int ret = 0;
1741 	spin_lock(&mdev->peer_seq_lock);
1742 	for (;;) {
1743 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1744 		if (seq_le(packet_seq, mdev->peer_seq+1))
1745 			break;
1746 		if (signal_pending(current)) {
1747 			ret = -ERESTARTSYS;
1748 			break;
1749 		}
1750 		p_seq = mdev->peer_seq;
1751 		spin_unlock(&mdev->peer_seq_lock);
1752 		timeout = schedule_timeout(30*HZ);
1753 		spin_lock(&mdev->peer_seq_lock);
1754 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1755 			ret = -ETIMEDOUT;
1756 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1757 			break;
1758 		}
1759 	}
1760 	finish_wait(&mdev->seq_wait, &wait);
1761 	if (mdev->peer_seq+1 == packet_seq)
1762 		mdev->peer_seq++;
1763 	spin_unlock(&mdev->peer_seq_lock);
1764 	return ret;
1765 }
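
/* Illustrative sketch (not compiled): the wrap-safe comparison the loop above
 * relies on.  seq_le() itself is defined in drbd_int.h; a typical
 * implementation of such a comparison uses the signed difference, which is
 * what makes the 32bit wrap-around discussed above harmless. */
#if 0
static inline int example_seq_le(u32 a, u32 b)
{
	/* "a <= b" modulo 2^32, valid while the two values
	 * are less than 2^31 apart */
	return (s32)(a - b) <= 0;
}

/* e.g. example_seq_le(0xfffffffe, 0x00000001) is true:
 * 1 comes "after" 0xfffffffe once the counter has wrapped. */
#endif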
1766 
1767 /* mirrored write */
1768 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1769 {
1770 	sector_t sector;
1771 	struct drbd_epoch_entry *e;
1772 	struct p_data *p = (struct p_data *)h;
1773 	int header_size, data_size;
1774 	int rw = WRITE;
1775 	u32 dp_flags;
1776 
1777 	header_size = sizeof(*p) - sizeof(*h);
1778 	data_size   = h->length  - header_size;
1779 
1780 	ERR_IF(data_size == 0) return FALSE;
1781 
1782 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1783 		return FALSE;
1784 
1785 	if (!get_ldev(mdev)) {
1786 		if (__ratelimit(&drbd_ratelimit_state))
1787 			dev_err(DEV, "Can not write mirrored data block "
1788 			    "to local disk.\n");
1789 		spin_lock(&mdev->peer_seq_lock);
1790 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1791 			mdev->peer_seq++;
1792 		spin_unlock(&mdev->peer_seq_lock);
1793 
1794 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1795 		atomic_inc(&mdev->current_epoch->epoch_size);
1796 		return drbd_drain_block(mdev, data_size);
1797 	}
1798 
1799 	/* get_ldev(mdev) successful.
1800 	 * Corresponding put_ldev done either below (on various errors),
1801 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1802 	 * the end of this function. */
1803 
1804 	sector = be64_to_cpu(p->sector);
1805 	e = read_in_block(mdev, p->block_id, sector, data_size);
1806 	if (!e) {
1807 		put_ldev(mdev);
1808 		return FALSE;
1809 	}
1810 
1811 	e->w.cb = e_end_block;
1812 
1813 	spin_lock(&mdev->epoch_lock);
1814 	e->epoch = mdev->current_epoch;
1815 	atomic_inc(&e->epoch->epoch_size);
1816 	atomic_inc(&e->epoch->active);
1817 
1818 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1819 		struct drbd_epoch *epoch;
1820 		/* Issue a barrier if we start a new epoch, and the previous epoch
1821 		   was not an epoch containing a single request which already was
1822 		   a Barrier. */
1823 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1824 		if (epoch == e->epoch) {
1825 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1826 			rw |= (1<<BIO_RW_BARRIER);
1827 			e->flags |= EE_IS_BARRIER;
1828 		} else {
1829 			if (atomic_read(&epoch->epoch_size) > 1 ||
1830 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1831 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1832 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1833 				rw |= (1<<BIO_RW_BARRIER);
1834 				e->flags |= EE_IS_BARRIER;
1835 			}
1836 		}
1837 	}
1838 	spin_unlock(&mdev->epoch_lock);
1839 
1840 	dp_flags = be32_to_cpu(p->dp_flags);
1841 	if (dp_flags & DP_HARDBARRIER) {
1842 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1843 		/* rw |= (1<<BIO_RW_BARRIER); */
1844 	}
1845 	if (dp_flags & DP_RW_SYNC)
1846 		rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1847 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1848 		e->flags |= EE_MAY_SET_IN_SYNC;
1849 
1850 	/* I'm the receiver, I do hold a net_cnt reference. */
1851 	if (!mdev->net_conf->two_primaries) {
1852 		spin_lock_irq(&mdev->req_lock);
1853 	} else {
1854 		/* don't get the req_lock yet,
1855 		 * we may sleep in drbd_wait_peer_seq */
1856 		const int size = e->size;
1857 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1858 		DEFINE_WAIT(wait);
1859 		struct drbd_request *i;
1860 		struct hlist_node *n;
1861 		struct hlist_head *slot;
1862 		int first;
1863 
1864 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1865 		BUG_ON(mdev->ee_hash == NULL);
1866 		BUG_ON(mdev->tl_hash == NULL);
1867 
1868 		/* conflict detection and handling:
1869 		 * 1. wait on the sequence number,
1870 		 *    in case this data packet overtook ACK packets.
1871 		 * 2. check our hash tables for conflicting requests.
1872 		 *    we only need to walk the tl_hash, since an ee cannot
1873 		 *    have a conflict with another ee: on the submitting
1874 		 *    node, the corresponding req had already been conflicting,
1875 		 *    and a conflicting req is never sent.
1876 		 *
1877 		 * Note: for two_primaries, we are protocol C,
1878 		 * so there cannot be any request that is DONE
1879 		 * but still on the transfer log.
1880 		 *
1881 		 * unconditionally add to the ee_hash.
1882 		 *
1883 		 * if no conflicting request is found:
1884 		 *    submit.
1885 		 *
1886 		 * if any conflicting request is found
1887 		 * that has not yet been acked,
1888 		 * AND I have the "discard concurrent writes" flag:
1889 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1890 		 *
1891 		 * if any conflicting request is found:
1892 		 *	 block the receiver, waiting on misc_wait
1893 		 *	 until no more conflicting requests are there,
1894 		 *	 or we get interrupted (disconnect).
1895 		 *
1896 		 *	 we do not just write after local io completion of those
1897 		 *	 requests, but only after req is done completely, i.e.
1898 		 *	 we wait for the P_DISCARD_ACK to arrive!
1899 		 *
1900 		 *	 then proceed normally, i.e. submit.
1901 		 */
1902 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1903 			goto out_interrupted;
1904 
1905 		spin_lock_irq(&mdev->req_lock);
1906 
1907 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1908 
1909 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1910 		slot = tl_hash_slot(mdev, sector);
1911 		first = 1;
1912 		for (;;) {
1913 			int have_unacked = 0;
1914 			int have_conflict = 0;
1915 			prepare_to_wait(&mdev->misc_wait, &wait,
1916 				TASK_INTERRUPTIBLE);
1917 			hlist_for_each_entry(i, n, slot, colision) {
1918 				if (OVERLAPS) {
1919 					/* only ALERT on first iteration,
1920 					 * we may be woken up early... */
1921 					if (first)
1922 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1923 						      "	new: %llus +%u; pending: %llus +%u\n",
1924 						      current->comm, current->pid,
1925 						      (unsigned long long)sector, size,
1926 						      (unsigned long long)i->sector, i->size);
1927 					if (i->rq_state & RQ_NET_PENDING)
1928 						++have_unacked;
1929 					++have_conflict;
1930 				}
1931 			}
1932 #undef OVERLAPS
1933 			if (!have_conflict)
1934 				break;
1935 
1936 			/* Discard Ack only for the _first_ iteration */
1937 			if (first && discard && have_unacked) {
1938 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1939 				     (unsigned long long)sector);
1940 				inc_unacked(mdev);
1941 				e->w.cb = e_send_discard_ack;
1942 				list_add_tail(&e->w.list, &mdev->done_ee);
1943 
1944 				spin_unlock_irq(&mdev->req_lock);
1945 
1946 				/* we could probably send that P_DISCARD_ACK ourselves,
1947 				 * but I don't like the receiver using the msock */
1948 
1949 				put_ldev(mdev);
1950 				wake_asender(mdev);
1951 				finish_wait(&mdev->misc_wait, &wait);
1952 				return TRUE;
1953 			}
1954 
1955 			if (signal_pending(current)) {
1956 				hlist_del_init(&e->colision);
1957 
1958 				spin_unlock_irq(&mdev->req_lock);
1959 
1960 				finish_wait(&mdev->misc_wait, &wait);
1961 				goto out_interrupted;
1962 			}
1963 
1964 			spin_unlock_irq(&mdev->req_lock);
1965 			if (first) {
1966 				first = 0;
1967 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1968 				     "sec=%llus\n", (unsigned long long)sector);
1969 			} else if (discard) {
1970 				/* we had none on the first iteration.
1971 				 * there must be none now. */
1972 				D_ASSERT(have_unacked == 0);
1973 			}
1974 			schedule();
1975 			spin_lock_irq(&mdev->req_lock);
1976 		}
1977 		finish_wait(&mdev->misc_wait, &wait);
1978 	}
1979 
1980 	list_add(&e->w.list, &mdev->active_ee);
1981 	spin_unlock_irq(&mdev->req_lock);
1982 
1983 	switch (mdev->net_conf->wire_protocol) {
1984 	case DRBD_PROT_C:
1985 		inc_unacked(mdev);
1986 		/* corresponding dec_unacked() in e_end_block()
1987 		 * respective _drbd_clear_done_ee */
1988 		break;
1989 	case DRBD_PROT_B:
1990 		/* I really don't like it that the receiver thread
1991 		 * sends on the msock, but anyway */
1992 		drbd_send_ack(mdev, P_RECV_ACK, e);
1993 		break;
1994 	case DRBD_PROT_A:
1995 		/* nothing to do */
1996 		break;
1997 	}
1998 
1999 	if (mdev->state.pdsk == D_DISKLESS) {
2000 		/* In case we have the only disk of the cluster, */
2001 		drbd_set_out_of_sync(mdev, e->sector, e->size);
2002 		e->flags |= EE_CALL_AL_COMPLETE_IO;
2003 		drbd_al_begin_io(mdev, e->sector);
2004 	}
2005 
2006 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2007 		return TRUE;
2008 
2009 out_interrupted:
2010 	/* yes, the epoch_size now is imbalanced.
2011 	 * but we drop the connection anyway, so we don't have a chance to
2012 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2013 	put_ldev(mdev);
2014 	drbd_free_ee(mdev, e);
2015 	return FALSE;
2016 }
2017 
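/* Serve a read request from the peer: P_DATA_REQUEST for application
 * reads, P_RS_DATA_REQUEST and P_CSUM_RS_REQUEST for resync, and
 * P_OV_REQUEST/P_OV_REPLY for online verify.  The actual reply is sent
 * later by the worker callback, once the local read has completed. */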
2018 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2019 {
2020 	sector_t sector;
2021 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2022 	struct drbd_epoch_entry *e;
2023 	struct digest_info *di = NULL;
2024 	int size, digest_size;
2025 	unsigned int fault_type;
2026 	struct p_block_req *p =
2027 		(struct p_block_req *)h;
2028 	const int brps = sizeof(*p)-sizeof(*h);
2029 
2030 	if (drbd_recv(mdev, h->payload, brps) != brps)
2031 		return FALSE;
2032 
2033 	sector = be64_to_cpu(p->sector);
2034 	size   = be32_to_cpu(p->blksize);
2035 
2036 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2037 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2038 				(unsigned long long)sector, size);
2039 		return FALSE;
2040 	}
2041 	if (sector + (size>>9) > capacity) {
2042 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2043 				(unsigned long long)sector, size);
2044 		return FALSE;
2045 	}
2046 
2047 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2048 		if (__ratelimit(&drbd_ratelimit_state))
2049 			dev_err(DEV, "Can not satisfy peer's read request, "
2050 			    "no local data.\n");
2051 		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2052 				 P_NEG_RS_DREPLY, p);
2053 		return drbd_drain_block(mdev, h->length - brps);
2054 	}
2055 
2056 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2057 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2058 	 * which in turn might block on the other node at this very place.  */
2059 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2060 	if (!e) {
2061 		put_ldev(mdev);
2062 		return FALSE;
2063 	}
2064 
2065 	switch (h->command) {
2066 	case P_DATA_REQUEST:
2067 		e->w.cb = w_e_end_data_req;
2068 		fault_type = DRBD_FAULT_DT_RD;
2069 		break;
2070 	case P_RS_DATA_REQUEST:
2071 		e->w.cb = w_e_end_rsdata_req;
2072 		fault_type = DRBD_FAULT_RS_RD;
2073 		/* Eventually this should become asynchronous. Currently it
2074 		 * blocks the whole receiver just to delay the reading of a
2075 		 * resync data block.
2076 		 * the drbd_work_queue mechanism is made for this...
2077 		 */
2078 		if (!drbd_rs_begin_io(mdev, sector)) {
2079 			/* we have been interrupted,
2080 			 * probably connection lost! */
2081 			D_ASSERT(signal_pending(current));
2082 			goto out_free_e;
2083 		}
2084 		break;
2085 
2086 	case P_OV_REPLY:
2087 	case P_CSUM_RS_REQUEST:
2088 		fault_type = DRBD_FAULT_RS_RD;
2089 		digest_size = h->length - brps;
2090 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2091 		if (!di)
2092 			goto out_free_e;
2093 
2094 		di->digest_size = digest_size;
2095 		di->digest = (((char *)di)+sizeof(struct digest_info));
2096 
2097 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2098 			goto out_free_e;
2099 
2100 		e->block_id = (u64)(unsigned long)di;
2101 		if (h->command == P_CSUM_RS_REQUEST) {
2102 			D_ASSERT(mdev->agreed_pro_version >= 89);
2103 			e->w.cb = w_e_end_csum_rs_req;
2104 		} else if (h->command == P_OV_REPLY) {
2105 			e->w.cb = w_e_end_ov_reply;
2106 			dec_rs_pending(mdev);
2107 			break;
2108 		}
2109 
2110 		if (!drbd_rs_begin_io(mdev, sector)) {
2111 			/* we have been interrupted, probably connection lost! */
2112 			D_ASSERT(signal_pending(current));
2113 			goto out_free_e;
2114 		}
2115 		break;
2116 
2117 	case P_OV_REQUEST:
2118 		if (mdev->state.conn >= C_CONNECTED &&
2119 		    mdev->state.conn != C_VERIFY_T)
2120 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2121 				drbd_conn_str(mdev->state.conn));
2122 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2123 		    mdev->agreed_pro_version >= 90) {
2124 			mdev->ov_start_sector = sector;
2125 			mdev->ov_position = sector;
2126 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2127 			dev_info(DEV, "Online Verify start sector: %llu\n",
2128 					(unsigned long long)sector);
2129 		}
2130 		e->w.cb = w_e_end_ov_req;
2131 		fault_type = DRBD_FAULT_RS_RD;
2132 		/* Eventually this should become asynchronous. Currently it
2133 		 * blocks the whole receiver just to delay the reading of a
2134 		 * resync data block.
2135 		 * the drbd_work_queue mechanism is made for this...
2136 		 */
2137 		if (!drbd_rs_begin_io(mdev, sector)) {
2138 			/* we have been interrupted,
2139 			 * probably connection lost! */
2140 			D_ASSERT(signal_pending(current));
2141 			goto out_free_e;
2142 		}
2143 		break;
2144 
2145 
2146 	default:
2147 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2148 		    cmdname(h->command));
2149 		fault_type = DRBD_FAULT_MAX;
2150 	}
2151 
2152 	spin_lock_irq(&mdev->req_lock);
2153 	list_add(&e->w.list, &mdev->read_ee);
2154 	spin_unlock_irq(&mdev->req_lock);
2155 
2156 	inc_unacked(mdev);
2157 
2158 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2159 		return TRUE;
2160 
2161 out_free_e:
2162 	kfree(di);
2163 	put_ldev(mdev);
2164 	drbd_free_ee(mdev, e);
2165 	return FALSE;
2166 }
2167 
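/* After-split-brain auto-recovery policy for the "zero primaries" case.
 * Return convention (shared with the 1p/2p variants below):
 *   -1   discard the local data, i.e. become sync target,
 *    1   discard the peer's data, i.e. become sync source,
 * -100   no automatic decision possible. */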
2168 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2169 {
2170 	int self, peer, rv = -100;
2171 	unsigned long ch_self, ch_peer;
2172 
2173 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2174 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2175 
2176 	ch_peer = mdev->p_uuid[UI_SIZE];
2177 	ch_self = mdev->comm_bm_set;
2178 
2179 	switch (mdev->net_conf->after_sb_0p) {
2180 	case ASB_CONSENSUS:
2181 	case ASB_DISCARD_SECONDARY:
2182 	case ASB_CALL_HELPER:
2183 		dev_err(DEV, "Configuration error.\n");
2184 		break;
2185 	case ASB_DISCONNECT:
2186 		break;
2187 	case ASB_DISCARD_YOUNGER_PRI:
2188 		if (self == 0 && peer == 1) {
2189 			rv = -1;
2190 			break;
2191 		}
2192 		if (self == 1 && peer == 0) {
2193 			rv =  1;
2194 			break;
2195 		}
2196 		/* Else fall through to one of the other strategies... */
2197 	case ASB_DISCARD_OLDER_PRI:
2198 		if (self == 0 && peer == 1) {
2199 			rv = 1;
2200 			break;
2201 		}
2202 		if (self == 1 && peer == 0) {
2203 			rv = -1;
2204 			break;
2205 		}
2206 		/* Else fall through to one of the other strategies... */
2207 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2208 		     "Using discard-least-changes instead\n");
2209 	case ASB_DISCARD_ZERO_CHG:
2210 		if (ch_peer == 0 && ch_self == 0) {
2211 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2212 				? -1 : 1;
2213 			break;
2214 		} else {
2215 			if (ch_peer == 0) { rv =  1; break; }
2216 			if (ch_self == 0) { rv = -1; break; }
2217 		}
2218 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2219 			break;
2220 	case ASB_DISCARD_LEAST_CHG:
2221 		if	(ch_self < ch_peer)
2222 			rv = -1;
2223 		else if (ch_self > ch_peer)
2224 			rv =  1;
2225 		else /* ( ch_self == ch_peer ) */
2226 		     /* Well, then use something else. */
2227 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2228 				? -1 : 1;
2229 		break;
2230 	case ASB_DISCARD_LOCAL:
2231 		rv = -1;
2232 		break;
2233 	case ASB_DISCARD_REMOTE:
2234 		rv =  1;
2235 	}
2236 
2237 	return rv;
2238 }
2239 
2240 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2241 {
2242 	int self, peer, hg, rv = -100;
2243 
2244 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2245 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2246 
2247 	switch (mdev->net_conf->after_sb_1p) {
2248 	case ASB_DISCARD_YOUNGER_PRI:
2249 	case ASB_DISCARD_OLDER_PRI:
2250 	case ASB_DISCARD_LEAST_CHG:
2251 	case ASB_DISCARD_LOCAL:
2252 	case ASB_DISCARD_REMOTE:
2253 		dev_err(DEV, "Configuration error.\n");
2254 		break;
2255 	case ASB_DISCONNECT:
2256 		break;
2257 	case ASB_CONSENSUS:
2258 		hg = drbd_asb_recover_0p(mdev);
2259 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2260 			rv = hg;
2261 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2262 			rv = hg;
2263 		break;
2264 	case ASB_VIOLENTLY:
2265 		rv = drbd_asb_recover_0p(mdev);
2266 		break;
2267 	case ASB_DISCARD_SECONDARY:
2268 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2269 	case ASB_CALL_HELPER:
2270 		hg = drbd_asb_recover_0p(mdev);
2271 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2272 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2273 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2274 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2275 			  * we do not need to wait for the after state change work either. */
2276 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2277 			if (self != SS_SUCCESS) {
2278 				drbd_khelper(mdev, "pri-lost-after-sb");
2279 			} else {
2280 				dev_warn(DEV, "Successfully gave up primary role.\n");
2281 				rv = hg;
2282 			}
2283 		} else
2284 			rv = hg;
2285 	}
2286 
2287 	return rv;
2288 }
2289 
2290 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2291 {
2292 	int self, peer, hg, rv = -100;
2293 
2294 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2295 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2296 
2297 	switch (mdev->net_conf->after_sb_2p) {
2298 	case ASB_DISCARD_YOUNGER_PRI:
2299 	case ASB_DISCARD_OLDER_PRI:
2300 	case ASB_DISCARD_LEAST_CHG:
2301 	case ASB_DISCARD_LOCAL:
2302 	case ASB_DISCARD_REMOTE:
2303 	case ASB_CONSENSUS:
2304 	case ASB_DISCARD_SECONDARY:
2305 		dev_err(DEV, "Configuration error.\n");
2306 		break;
2307 	case ASB_VIOLENTLY:
2308 		rv = drbd_asb_recover_0p(mdev);
2309 		break;
2310 	case ASB_DISCONNECT:
2311 		break;
2312 	case ASB_CALL_HELPER:
2313 		hg = drbd_asb_recover_0p(mdev);
2314 		if (hg == -1) {
2315 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2316 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2317 			  * we do not need to wait for the after state change work either. */
2318 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2319 			if (self != SS_SUCCESS) {
2320 				drbd_khelper(mdev, "pri-lost-after-sb");
2321 			} else {
2322 				dev_warn(DEV, "Successfully gave up primary role.\n");
2323 				rv = hg;
2324 			}
2325 		} else
2326 			rv = hg;
2327 	}
2328 
2329 	return rv;
2330 }
2331 
2332 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2333 			   u64 bits, u64 flags)
2334 {
2335 	if (!uuid) {
2336 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2337 		return;
2338 	}
2339 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2340 	     text,
2341 	     (unsigned long long)uuid[UI_CURRENT],
2342 	     (unsigned long long)uuid[UI_BITMAP],
2343 	     (unsigned long long)uuid[UI_HISTORY_START],
2344 	     (unsigned long long)uuid[UI_HISTORY_END],
2345 	     (unsigned long long)bits,
2346 	     (unsigned long long)flags);
2347 }
2348 
2349 /*
2350   100	after split brain try auto recover
2351     2	C_SYNC_SOURCE set BitMap
2352     1	C_SYNC_SOURCE use BitMap
2353     0	no Sync
2354    -1	C_SYNC_TARGET use BitMap
2355    -2	C_SYNC_TARGET set BitMap
2356  -100	after split brain, disconnect
2357 -1000	unrelated data
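-1001	correction needs a peer with protocol version 91 or newer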
2358  */
2359 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2360 {
2361 	u64 self, peer;
2362 	int i, j;
2363 
2364 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2365 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2366 
2367 	*rule_nr = 10;
2368 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2369 		return 0;
2370 
2371 	*rule_nr = 20;
2372 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2373 	     peer != UUID_JUST_CREATED)
2374 		return -2;
2375 
2376 	*rule_nr = 30;
2377 	if (self != UUID_JUST_CREATED &&
2378 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2379 		return 2;
2380 
2381 	if (self == peer) {
2382 		int rct, dc; /* roles at crash time */
2383 
2384 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2385 
2386 			if (mdev->agreed_pro_version < 91)
2387 				return -1001;
2388 
2389 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2390 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2391 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2392 				drbd_uuid_set_bm(mdev, 0UL);
2393 
2394 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2395 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2396 				*rule_nr = 34;
2397 			} else {
2398 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2399 				*rule_nr = 36;
2400 			}
2401 
2402 			return 1;
2403 		}
2404 
2405 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2406 
2407 			if (mdev->agreed_pro_version < 91)
2408 				return -1001;
2409 
2410 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2411 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2412 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2413 
2414 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2415 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2416 				mdev->p_uuid[UI_BITMAP] = 0UL;
2417 
2418 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2419 				*rule_nr = 35;
2420 			} else {
2421 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2422 				*rule_nr = 37;
2423 			}
2424 
2425 			return -1;
2426 		}
2427 
2428 		/* Common power [off|failure] */
2429 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2430 			(mdev->p_uuid[UI_FLAGS] & 2);
2431 		/* lowest bit is set when we were primary,
2432 		 * next bit (weight 2) is set when peer was primary */
2433 		*rule_nr = 40;
2434 
2435 		switch (rct) {
2436 		case 0: /* !self_pri && !peer_pri */ return 0;
2437 		case 1: /*  self_pri && !peer_pri */ return 1;
2438 		case 2: /* !self_pri &&  peer_pri */ return -1;
2439 		case 3: /*  self_pri &&  peer_pri */
2440 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2441 			return dc ? -1 : 1;
2442 		}
2443 	}
2444 
2445 	*rule_nr = 50;
2446 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2447 	if (self == peer)
2448 		return -1;
2449 
2450 	*rule_nr = 51;
2451 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2452 	if (self == peer) {
2453 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2454 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2455 		if (self == peer) {
2456 			/* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2457 			   modifications from its last start of resync as sync source. */
2458 
2459 			if (mdev->agreed_pro_version < 91)
2460 				return -1001;
2461 
2462 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2463 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2464 			return -1;
2465 		}
2466 	}
2467 
2468 	*rule_nr = 60;
2469 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2470 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2471 		peer = mdev->p_uuid[i] & ~((u64)1);
2472 		if (self == peer)
2473 			return -2;
2474 	}
2475 
2476 	*rule_nr = 70;
2477 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2478 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2479 	if (self == peer)
2480 		return 1;
2481 
2482 	*rule_nr = 71;
2483 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2484 	if (self == peer) {
2485 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2486 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2487 		if (self == peer) {
2488 			/* The last P_SYNC_UUID did not get through. Undo our own UUID
2489 			   modifications from our last start of resync as sync source. */
2490 
2491 			if (mdev->agreed_pro_version < 91)
2492 				return -1001;
2493 
2494 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2495 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2496 
2497 			dev_info(DEV, "Undid last start of resync:\n");
2498 
2499 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2500 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2501 
2502 			return 1;
2503 		}
2504 	}
2505 
2506 
2507 	*rule_nr = 80;
2508 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2509 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2510 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2511 		if (self == peer)
2512 			return 2;
2513 	}
2514 
2515 	*rule_nr = 90;
2516 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2517 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2518 	if (self == peer && self != ((u64)0))
2519 		return 100;
2520 
2521 	*rule_nr = 100;
2522 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2523 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2524 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2525 			peer = mdev->p_uuid[j] & ~((u64)1);
2526 			if (self == peer)
2527 				return -100;
2528 		}
2529 	}
2530 
2531 	return -1000;
2532 }
2533 
2534 /* drbd_sync_handshake() returns the new conn state on success, or
2535    CONN_MASK (-1) on failure.
2536  */
2537 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2538 					   enum drbd_disk_state peer_disk) __must_hold(local)
2539 {
2540 	int hg, rule_nr;
2541 	enum drbd_conns rv = C_MASK;
2542 	enum drbd_disk_state mydisk;
2543 
2544 	mydisk = mdev->state.disk;
2545 	if (mydisk == D_NEGOTIATING)
2546 		mydisk = mdev->new_state_tmp.disk;
2547 
2548 	dev_info(DEV, "drbd_sync_handshake:\n");
2549 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2550 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2551 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2552 
2553 	hg = drbd_uuid_compare(mdev, &rule_nr);
2554 
2555 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2556 
2557 	if (hg == -1000) {
2558 		dev_alert(DEV, "Unrelated data, aborting!\n");
2559 		return C_MASK;
2560 	}
2561 	if (hg == -1001) {
2562 		dev_alert(DEV, "To resolve this, both sides have to support at least protocol 91\n");
2563 		return C_MASK;
2564 	}
2565 
2566 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2567 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2568 		int f = (hg == -100) || abs(hg) == 2;
2569 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2570 		if (f)
2571 			hg = hg*2;
2572 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2573 		     hg > 0 ? "source" : "target");
2574 	}
2575 
2576 	if (abs(hg) == 100)
2577 		drbd_khelper(mdev, "initial-split-brain");
2578 
2579 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2580 		int pcount = (mdev->state.role == R_PRIMARY)
2581 			   + (peer_role == R_PRIMARY);
2582 		int forced = (hg == -100);
2583 
2584 		switch (pcount) {
2585 		case 0:
2586 			hg = drbd_asb_recover_0p(mdev);
2587 			break;
2588 		case 1:
2589 			hg = drbd_asb_recover_1p(mdev);
2590 			break;
2591 		case 2:
2592 			hg = drbd_asb_recover_2p(mdev);
2593 			break;
2594 		}
2595 		if (abs(hg) < 100) {
2596 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2597 			     "automatically solved. Sync from %s node\n",
2598 			     pcount, (hg < 0) ? "peer" : "this");
2599 			if (forced) {
2600 				dev_warn(DEV, "Doing a full sync, since"
2601 				     " UUIDs were ambiguous.\n");
2602 				hg = hg*2;
2603 			}
2604 		}
2605 	}
2606 
2607 	if (hg == -100) {
2608 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2609 			hg = -1;
2610 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2611 			hg = 1;
2612 
2613 		if (abs(hg) < 100)
2614 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2615 			     "Sync from %s node\n",
2616 			     (hg < 0) ? "peer" : "this");
2617 	}
2618 
2619 	if (hg == -100) {
2620 		/* FIXME this log message is not correct if we end up here
2621 		 * after an attempted attach on a diskless node.
2622 		 * We just refuse to attach -- well, we drop the "connection"
2623 		 * to that disk, in a way... */
2624 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2625 		drbd_khelper(mdev, "split-brain");
2626 		return C_MASK;
2627 	}
2628 
2629 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2630 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2631 		return C_MASK;
2632 	}
2633 
2634 	if (hg < 0 && /* by intention we do not use mydisk here. */
2635 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2636 		switch (mdev->net_conf->rr_conflict) {
2637 		case ASB_CALL_HELPER:
2638 			drbd_khelper(mdev, "pri-lost");
2639 			/* fall through */
2640 		case ASB_DISCONNECT:
2641 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2642 			return C_MASK;
2643 		case ASB_VIOLENTLY:
2644 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2645 			     " assumption\n");
2646 		}
2647 	}
2648 
2649 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2650 		if (hg == 0)
2651 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2652 		else
2653 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2654 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2655 				 abs(hg) >= 2 ? "full" : "bit-map based");
2656 		return C_MASK;
2657 	}
2658 
2659 	if (abs(hg) >= 2) {
2660 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2661 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2662 			return C_MASK;
2663 	}
2664 
2665 	if (hg > 0) { /* become sync source. */
2666 		rv = C_WF_BITMAP_S;
2667 	} else if (hg < 0) { /* become sync target */
2668 		rv = C_WF_BITMAP_T;
2669 	} else {
2670 		rv = C_CONNECTED;
2671 		if (drbd_bm_total_weight(mdev)) {
2672 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2673 			     drbd_bm_total_weight(mdev));
2674 		}
2675 	}
2676 
2677 	return rv;
2678 }
2679 
2680 /* returns 1 if invalid */
2681 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2682 {
2683 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2684 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2685 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2686 		return 0;
2687 
2688 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2689 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2690 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2691 		return 1;
2692 
2693 	/* everything else is valid if they are equal on both sides. */
2694 	if (peer == self)
2695 		return 0;
2696 
2697 	/* everything else is invalid. */
2698 	return 1;
2699 }
2700 
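/* Check that the peer's connection configuration (wire protocol,
 * after-split-brain policies, two-primaries flag, want-lose flag and
 * data-integrity-alg) is compatible with ours; disconnect otherwise. */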
2701 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2702 {
2703 	struct p_protocol *p = (struct p_protocol *)h;
2704 	int header_size, data_size;
2705 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2706 	int p_want_lose, p_two_primaries, cf;
2707 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2708 
2709 	header_size = sizeof(*p) - sizeof(*h);
2710 	data_size   = h->length  - header_size;
2711 
2712 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2713 		return FALSE;
2714 
2715 	p_proto		= be32_to_cpu(p->protocol);
2716 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2717 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2718 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2719 	p_two_primaries = be32_to_cpu(p->two_primaries);
2720 	cf		= be32_to_cpu(p->conn_flags);
2721 	p_want_lose = cf & CF_WANT_LOSE;
2722 
2723 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2724 
2725 	if (cf & CF_DRY_RUN)
2726 		set_bit(CONN_DRY_RUN, &mdev->flags);
2727 
2728 	if (p_proto != mdev->net_conf->wire_protocol) {
2729 		dev_err(DEV, "incompatible communication protocols\n");
2730 		goto disconnect;
2731 	}
2732 
2733 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2734 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2735 		goto disconnect;
2736 	}
2737 
2738 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2739 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2740 		goto disconnect;
2741 	}
2742 
2743 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2744 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2745 		goto disconnect;
2746 	}
2747 
2748 	if (p_want_lose && mdev->net_conf->want_lose) {
2749 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2750 		goto disconnect;
2751 	}
2752 
2753 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2754 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2755 		goto disconnect;
2756 	}
2757 
2758 	if (mdev->agreed_pro_version >= 87) {
2759 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2760 
2761 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2762 			return FALSE;
2763 
2764 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2765 		if (strcmp(p_integrity_alg, my_alg)) {
2766 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2767 			goto disconnect;
2768 		}
2769 		dev_info(DEV, "data-integrity-alg: %s\n",
2770 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2771 	}
2772 
2773 	return TRUE;
2774 
2775 disconnect:
2776 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2777 	return FALSE;
2778 }
2779 
2780 /* helper function
2781  * input: alg name, feature name
2782  * return: NULL (alg name was "")
2783  *         ERR_PTR(error) if something goes wrong
2784  *         or the crypto hash ptr, if it worked out ok. */
2785 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2786 		const char *alg, const char *name)
2787 {
2788 	struct crypto_hash *tfm;
2789 
2790 	if (!alg[0])
2791 		return NULL;
2792 
2793 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2794 	if (IS_ERR(tfm)) {
2795 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2796 			alg, name, PTR_ERR(tfm));
2797 		return tfm;
2798 	}
2799 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2800 		crypto_free_hash(tfm);
2801 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2802 		return ERR_PTR(-EINVAL);
2803 	}
2804 	return tfm;
2805 }
2806 
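/* Receive the peer's syncer parameters: the resync rate and, for newer
 * peers, the verify-alg (apv >= 88) and csums-alg (apv >= 89) names, for
 * which the matching hash transforms are allocated here. */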
2807 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2808 {
2809 	int ok = TRUE;
2810 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2811 	unsigned int header_size, data_size, exp_max_sz;
2812 	struct crypto_hash *verify_tfm = NULL;
2813 	struct crypto_hash *csums_tfm = NULL;
2814 	const int apv = mdev->agreed_pro_version;
2815 
2816 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2817 		    : apv == 88 ? sizeof(struct p_rs_param)
2818 					+ SHARED_SECRET_MAX
2819 		    : /* 89 */    sizeof(struct p_rs_param_89);
2820 
2821 	if (h->length > exp_max_sz) {
2822 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2823 		    h->length, exp_max_sz);
2824 		return FALSE;
2825 	}
2826 
2827 	if (apv <= 88) {
2828 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2829 		data_size   = h->length  - header_size;
2830 	} else /* apv >= 89 */ {
2831 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2832 		data_size   = h->length  - header_size;
2833 		D_ASSERT(data_size == 0);
2834 	}
2835 
2836 	/* initialize verify_alg and csums_alg */
2837 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2838 
2839 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2840 		return FALSE;
2841 
2842 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2843 
2844 	if (apv >= 88) {
2845 		if (apv == 88) {
2846 			if (data_size > SHARED_SECRET_MAX) {
2847 				dev_err(DEV, "verify-alg too long, "
2848 				    "peer wants %u, accepting only %u byte\n",
2849 						data_size, SHARED_SECRET_MAX);
2850 				return FALSE;
2851 			}
2852 
2853 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2854 				return FALSE;
2855 
2856 			/* we expect NUL terminated string */
2857 			/* but just in case someone tries to be evil */
2858 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2859 			p->verify_alg[data_size-1] = 0;
2860 
2861 		} else /* apv >= 89 */ {
2862 			/* we still expect NUL terminated strings */
2863 			/* but just in case someone tries to be evil */
2864 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2865 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2866 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2867 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2868 		}
2869 
2870 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2871 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2872 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2873 				    mdev->sync_conf.verify_alg, p->verify_alg);
2874 				goto disconnect;
2875 			}
2876 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2877 					p->verify_alg, "verify-alg");
2878 			if (IS_ERR(verify_tfm)) {
2879 				verify_tfm = NULL;
2880 				goto disconnect;
2881 			}
2882 		}
2883 
2884 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2885 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2886 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2887 				    mdev->sync_conf.csums_alg, p->csums_alg);
2888 				goto disconnect;
2889 			}
2890 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2891 					p->csums_alg, "csums-alg");
2892 			if (IS_ERR(csums_tfm)) {
2893 				csums_tfm = NULL;
2894 				goto disconnect;
2895 			}
2896 		}
2897 
2898 
2899 		spin_lock(&mdev->peer_seq_lock);
2900 		/* lock against drbd_nl_syncer_conf() */
2901 		if (verify_tfm) {
2902 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2903 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2904 			crypto_free_hash(mdev->verify_tfm);
2905 			mdev->verify_tfm = verify_tfm;
2906 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2907 		}
2908 		if (csums_tfm) {
2909 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2910 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2911 			crypto_free_hash(mdev->csums_tfm);
2912 			mdev->csums_tfm = csums_tfm;
2913 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2914 		}
2915 		spin_unlock(&mdev->peer_seq_lock);
2916 	}
2917 
2918 	return ok;
2919 disconnect:
2920 	/* just for completeness: actually not needed,
2921 	 * as this is not reached if csums_tfm was ok. */
2922 	crypto_free_hash(csums_tfm);
2923 	/* but free the verify_tfm again, if csums_tfm did not work out */
2924 	crypto_free_hash(verify_tfm);
2925 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2926 	return FALSE;
2927 }
2928 
2929 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2930 {
2931 	/* sorry, we currently have no working implementation
2932 	 * of distributed TCQ */
2933 }
2934 
2935 /* warn if the arguments differ by more than 12.5% */
2936 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2937 	const char *s, sector_t a, sector_t b)
2938 {
2939 	sector_t d;
2940 	if (a == 0 || b == 0)
2941 		return;
2942 	d = (a > b) ? (a - b) : (b - a);
2943 	if (d > (a>>3) || d > (b>>3))
2944 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2945 		     (unsigned long long)a, (unsigned long long)b);
2946 }
2947 
2948 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2949 {
2950 	struct p_sizes *p = (struct p_sizes *)h;
2951 	enum determine_dev_size dd = unchanged;
2952 	unsigned int max_seg_s;
2953 	sector_t p_size, p_usize, my_usize;
2954 	int ldsc = 0; /* local disk size changed */
2955 	enum dds_flags ddsf;
2956 
2957 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2958 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2959 		return FALSE;
2960 
2961 	p_size = be64_to_cpu(p->d_size);
2962 	p_usize = be64_to_cpu(p->u_size);
2963 
2964 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2965 		dev_err(DEV, "some backing storage is needed\n");
2966 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2967 		return FALSE;
2968 	}
2969 
2970 	/* just store the peer's disk size for now.
2971 	 * we still need to figure out whether we accept that. */
2972 	mdev->p_size = p_size;
2973 
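/* A disk_size of 0 means "no explicit size requested": prefer the other
 * value then, otherwise take the minimum of the two. */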
2974 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
2975 	if (get_ldev(mdev)) {
2976 		warn_if_differ_considerably(mdev, "lower level device sizes",
2977 			   p_size, drbd_get_max_capacity(mdev->ldev));
2978 		warn_if_differ_considerably(mdev, "user requested size",
2979 					    p_usize, mdev->ldev->dc.disk_size);
2980 
2981 		/* if this is the first connect, or an otherwise expected
2982 		 * param exchange, choose the minimum */
2983 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2984 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2985 					     p_usize);
2986 
2987 		my_usize = mdev->ldev->dc.disk_size;
2988 
2989 		if (mdev->ldev->dc.disk_size != p_usize) {
2990 			mdev->ldev->dc.disk_size = p_usize;
2991 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2992 			     (unsigned long)mdev->ldev->dc.disk_size);
2993 		}
2994 
2995 		/* Never shrink a device with usable data during connect.
2996 		   But allow online shrinking if we are connected. */
2997 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2998 		   drbd_get_capacity(mdev->this_bdev) &&
2999 		   mdev->state.disk >= D_OUTDATED &&
3000 		   mdev->state.conn < C_CONNECTED) {
3001 			dev_err(DEV, "The peer's disk size is too small!\n");
3002 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3003 			mdev->ldev->dc.disk_size = my_usize;
3004 			put_ldev(mdev);
3005 			return FALSE;
3006 		}
3007 		put_ldev(mdev);
3008 	}
3009 #undef min_not_zero
3010 
3011 	ddsf = be16_to_cpu(p->dds_flags);
3012 	if (get_ldev(mdev)) {
3013 		dd = drbd_determin_dev_size(mdev, ddsf);
3014 		put_ldev(mdev);
3015 		if (dd == dev_size_error)
3016 			return FALSE;
3017 		drbd_md_sync(mdev);
3018 	} else {
3019 		/* I am diskless, need to accept the peer's size. */
3020 		drbd_set_my_capacity(mdev, p_size);
3021 	}
3022 
3023 	if (get_ldev(mdev)) {
3024 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3025 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3026 			ldsc = 1;
3027 		}
3028 
3029 		if (mdev->agreed_pro_version < 94)
3030 			max_seg_s = be32_to_cpu(p->max_segment_size);
3031 		else /* drbd 8.3.8 onwards */
3032 			max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3033 
3034 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3035 			drbd_setup_queue_param(mdev, max_seg_s);
3036 
3037 		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3038 		put_ldev(mdev);
3039 	}
3040 
3041 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3042 		if (be64_to_cpu(p->c_size) !=
3043 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3044 			/* we have different sizes, probably peer
3045 			 * needs to know my new size... */
3046 			drbd_send_sizes(mdev, 0, ddsf);
3047 		}
3048 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3049 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3050 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3051 			    mdev->state.disk >= D_INCONSISTENT) {
3052 				if (ddsf & DDSF_NO_RESYNC)
3053 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3054 				else
3055 					resync_after_online_grow(mdev);
3056 			} else
3057 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3058 		}
3059 	}
3060 
3061 	return TRUE;
3062 }
3063 
3064 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3065 {
3066 	struct p_uuids *p = (struct p_uuids *)h;
3067 	u64 *p_uuid;
3068 	int i;
3069 
3070 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3071 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3072 		return FALSE;
3073 
3074 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) /* allocation failed: treat like any other receive error */
		return FALSE;
3075 
3076 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3077 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3078 
3079 	kfree(mdev->p_uuid);
3080 	mdev->p_uuid = p_uuid;
3081 
3082 	if (mdev->state.conn < C_CONNECTED &&
3083 	    mdev->state.disk < D_INCONSISTENT &&
3084 	    mdev->state.role == R_PRIMARY &&
3085 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3086 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3087 		    (unsigned long long)mdev->ed_uuid);
3088 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3089 		return FALSE;
3090 	}
3091 
3092 	if (get_ldev(mdev)) {
3093 		int skip_initial_sync =
3094 			mdev->state.conn == C_CONNECTED &&
3095 			mdev->agreed_pro_version >= 90 &&
3096 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3097 			(p_uuid[UI_FLAGS] & 8);
3098 		if (skip_initial_sync) {
3099 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3100 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3101 					"clear_n_write from receive_uuids");
3102 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3103 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3104 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3105 					CS_VERBOSE, NULL);
3106 			drbd_md_sync(mdev);
3107 		}
3108 		put_ldev(mdev);
3109 	}
3110 
3111 	/* Before we test for the disk state, we should wait until a possibly
3112 	   ongoing cluster-wide state change has finished. That is important if
3113 	   we are primary and are detaching from our disk. We need to see the
3114 	   new disk state... */
3115 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3116 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3117 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3118 
3119 	return TRUE;
3120 }
3121 
3122 /**
3123  * convert_state() - Converts the peer's view of the cluster state to our point of view
3124  * @ps:		The state as seen by the peer.
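 *
 * Roles, disk states and asymmetric connection states are mirrored:
 * the peer's "role" becomes our "peer", its "pdsk" becomes our "disk",
 * and e.g. C_STARTING_SYNC_S is mapped to C_STARTING_SYNC_T via c_tab.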
3125  */
3126 static union drbd_state convert_state(union drbd_state ps)
3127 {
3128 	union drbd_state ms;
3129 
3130 	static enum drbd_conns c_tab[] = {
3131 		[C_CONNECTED] = C_CONNECTED,
3132 
3133 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3134 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3135 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3136 		[C_VERIFY_S]       = C_VERIFY_T,
3137 		[C_MASK]   = C_MASK,
3138 	};
3139 
3140 	ms.i = ps.i;
3141 
3142 	ms.conn = c_tab[ps.conn];
3143 	ms.peer = ps.role;
3144 	ms.role = ps.peer;
3145 	ms.pdsk = ps.disk;
3146 	ms.disk = ps.pdsk;
3147 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3148 
3149 	return ms;
3150 }
3151 
3152 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3153 {
3154 	struct p_req_state *p = (struct p_req_state *)h;
3155 	union drbd_state mask, val;
3156 	int rv;
3157 
3158 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3159 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3160 		return FALSE;
3161 
3162 	mask.i = be32_to_cpu(p->mask);
3163 	val.i = be32_to_cpu(p->val);
3164 
3165 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3166 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3167 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3168 		return TRUE;
3169 	}
3170 
3171 	mask = convert_state(mask);
3172 	val = convert_state(val);
3173 
3174 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3175 
3176 	drbd_send_sr_reply(mdev, rv);
3177 	drbd_md_sync(mdev);
3178 
3179 	return TRUE;
3180 }
3181 
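/* Process the peer's current state: convert it to our point of view,
 * run drbd_sync_handshake() if a resync decision is needed, and apply
 * the combined state; irreconcilable states drop the connection. */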
3182 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3183 {
3184 	struct p_state *p = (struct p_state *)h;
3185 	enum drbd_conns nconn, oconn;
3186 	union drbd_state ns, peer_state;
3187 	enum drbd_disk_state real_peer_disk;
3188 	int rv;
3189 
3190 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3191 		return FALSE;
3192 
3193 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3194 		return FALSE;
3195 
3196 	peer_state.i = be32_to_cpu(p->state);
3197 
3198 	real_peer_disk = peer_state.disk;
3199 	if (peer_state.disk == D_NEGOTIATING) {
3200 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3201 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3202 	}
3203 
3204 	spin_lock_irq(&mdev->req_lock);
3205  retry:
3206 	oconn = nconn = mdev->state.conn;
3207 	spin_unlock_irq(&mdev->req_lock);
3208 
3209 	if (nconn == C_WF_REPORT_PARAMS)
3210 		nconn = C_CONNECTED;
3211 
3212 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3213 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3214 		int cr; /* consider resync */
3215 
3216 		/* if we established a new connection */
3217 		cr  = (oconn < C_CONNECTED);
3218 		/* if we had an established connection
3219 		 * and one of the nodes newly attaches a disk */
3220 		cr |= (oconn == C_CONNECTED &&
3221 		       (peer_state.disk == D_NEGOTIATING ||
3222 			mdev->state.disk == D_NEGOTIATING));
3223 		/* if we have both been inconsistent, and the peer has been
3224 		 * forced to be UpToDate with --overwrite-data */
3225 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3226 		/* if we had been plain connected, and the admin requested to
3227 		 * start a sync by "invalidate" or "invalidate-remote" */
3228 		cr |= (oconn == C_CONNECTED &&
3229 				(peer_state.conn >= C_STARTING_SYNC_S &&
3230 				 peer_state.conn <= C_WF_BITMAP_T));
3231 
3232 		if (cr)
3233 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3234 
3235 		put_ldev(mdev);
3236 		if (nconn == C_MASK) {
3237 			nconn = C_CONNECTED;
3238 			if (mdev->state.disk == D_NEGOTIATING) {
3239 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3240 			} else if (peer_state.disk == D_NEGOTIATING) {
3241 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3242 				peer_state.disk = D_DISKLESS;
3243 				real_peer_disk = D_DISKLESS;
3244 			} else {
3245 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3246 					return FALSE;
3247 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3248 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3249 				return FALSE;
3250 			}
3251 		}
3252 	}
3253 
3254 	spin_lock_irq(&mdev->req_lock);
3255 	if (mdev->state.conn != oconn)
3256 		goto retry;
3257 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3258 	ns.i = mdev->state.i;
3259 	ns.conn = nconn;
3260 	ns.peer = peer_state.role;
3261 	ns.pdsk = real_peer_disk;
3262 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3263 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3264 		ns.disk = mdev->new_state_tmp.disk;
3265 
3266 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3267 	ns = mdev->state;
3268 	spin_unlock_irq(&mdev->req_lock);
3269 
3270 	if (rv < SS_SUCCESS) {
3271 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3272 		return FALSE;
3273 	}
3274 
3275 	if (oconn > C_WF_REPORT_PARAMS) {
3276 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3277 		    peer_state.disk != D_NEGOTIATING ) {
3278 			/* we want resync, peer has not yet decided to sync... */
3279 			/* Nowadays only used when forcing a node into primary role and
3280 			   setting its disk to UpToDate with that */
3281 			drbd_send_uuids(mdev);
3282 			drbd_send_state(mdev);
3283 		}
3284 	}
3285 
3286 	mdev->net_conf->want_lose = 0;
3287 
3288 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3289 
3290 	return TRUE;
3291 }
3292 
3293 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3294 {
3295 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3296 
3297 	wait_event(mdev->misc_wait,
3298 		   mdev->state.conn == C_WF_SYNC_UUID ||
3299 		   mdev->state.conn < C_CONNECTED ||
3300 		   mdev->state.disk < D_NEGOTIATING);
3301 
3302 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3303 
3304 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3305 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3306 		return FALSE;
3307 
3308 	/* Here the _drbd_uuid_ functions are right, current should
3309 	   _not_ be rotated into the history */
3310 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3311 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3312 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3313 
3314 		drbd_start_resync(mdev, C_SYNC_TARGET);
3315 
3316 		put_ldev(mdev);
3317 	} else
3318 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3319 
3320 	return TRUE;
3321 }
3322 
3323 enum receive_bitmap_ret { OK, DONE, FAILED };
3324 
3325 static enum receive_bitmap_ret
3326 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3327 	unsigned long *buffer, struct bm_xfer_ctx *c)
3328 {
3329 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3330 	unsigned want = num_words * sizeof(long);
3331 
3332 	if (want != h->length) {
3333 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3334 		return FAILED;
3335 	}
3336 	if (want == 0)
3337 		return DONE;
3338 	if (drbd_recv(mdev, buffer, want) != want)
3339 		return FAILED;
3340 
3341 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3342 
3343 	c->word_offset += num_words;
3344 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3345 	if (c->bit_offset > c->bm_bits)
3346 		c->bit_offset = c->bm_bits;
3347 
3348 	return OK;
3349 }
3350 
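/* Decode one VLI run-length-encoded bitmap chunk: the payload is a bit
 * stream of run lengths for alternating runs of clear and set bits;
 * DCBP_get_start() tells whether the first run is a run of set bits.
 * Only the runs of set bits are merged into the local bitmap. */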
3351 static enum receive_bitmap_ret
3352 recv_bm_rle_bits(struct drbd_conf *mdev,
3353 		struct p_compressed_bm *p,
3354 		struct bm_xfer_ctx *c)
3355 {
3356 	struct bitstream bs;
3357 	u64 look_ahead;
3358 	u64 rl;
3359 	u64 tmp;
3360 	unsigned long s = c->bit_offset;
3361 	unsigned long e;
3362 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3363 	int toggle = DCBP_get_start(p);
3364 	int have;
3365 	int bits;
3366 
3367 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3368 
3369 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3370 	if (bits < 0)
3371 		return FAILED;
3372 
3373 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3374 		bits = vli_decode_bits(&rl, look_ahead);
3375 		if (bits <= 0)
3376 			return FAILED;
3377 
3378 		if (toggle) {
3379 			e = s + rl -1;
3380 			if (e >= c->bm_bits) {
3381 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3382 				return FAILED;
3383 			}
3384 			_drbd_bm_set_bits(mdev, s, e);
3385 		}
3386 
3387 		if (have < bits) {
3388 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3389 				have, bits, look_ahead,
3390 				(unsigned int)(bs.cur.b - p->code),
3391 				(unsigned int)bs.buf_len);
3392 			return FAILED;
3393 		}
3394 		look_ahead >>= bits;
3395 		have -= bits;
3396 
3397 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3398 		if (bits < 0)
3399 			return FAILED;
3400 		look_ahead |= tmp << have;
3401 		have += bits;
3402 	}
3403 
3404 	c->bit_offset = s;
3405 	bm_xfer_ctx_bit_to_word_offset(c);
3406 
3407 	return (s == c->bm_bits) ? DONE : OK;
3408 }
3409 
3410 static enum receive_bitmap_ret
3411 decode_bitmap_c(struct drbd_conf *mdev,
3412 		struct p_compressed_bm *p,
3413 		struct bm_xfer_ctx *c)
3414 {
3415 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3416 		return recv_bm_rle_bits(mdev, p, c);
3417 
3418 	/* other variants had been implemented for evaluation,
3419 	 * but have been dropped as this one turned out to be "best"
3420 	 * during all our tests. */
3421 
3422 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3423 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3424 	return FAILED;
3425 }
3426 
3427 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3428 		const char *direction, struct bm_xfer_ctx *c)
3429 {
3430 	/* what would it take to transfer it "plaintext" */
3431 	unsigned plain = sizeof(struct p_header) *
3432 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3433 		+ c->bm_words * sizeof(long);
3434 	unsigned total = c->bytes[0] + c->bytes[1];
3435 	unsigned r;
3436 
3437 	/* total can not be zero. but just in case: */
3438 	/* total cannot be zero. but just in case: */
3439 		return;
3440 
3441 	/* don't report if not compressed */
3442 	if (total >= plain)
3443 		return;
3444 
3445 	/* total < plain. check for overflow, still */
3446 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3447 		                    : (1000 * total / plain);
3448 
3449 	if (r > 1000)
3450 		r = 1000;
3451 
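	/* r is the per-mille of the plain-text size actually transferred;
	 * after the inversion below it is the saving, printed with one
	 * decimal place, e.g. total = 200, plain = 1000 -> "compression: 80.0%". */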
3452 	r = 1000 - r;
3453 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3454 	     "total %u; compression: %u.%u%%\n",
3455 			direction,
3456 			c->bytes[1], c->packets[1],
3457 			c->bytes[0], c->packets[0],
3458 			total, r/10, r % 10);
3459 }
3460 
3461 /* Since we are processing the bitfield from lower addresses to higher,
3462    it does not matter whether we process it in 32 bit chunks or 64 bit
3463    chunks, as long as it is little endian. (Understand it as a byte stream,
3464    beginning with the lowest byte...) If we used big endian,
3465    we would need to process it from the highest address to the lowest,
3466    in order to be agnostic to the 32 vs 64 bits issue.
3467 
3468    returns 0 on failure, 1 if we successfully received it. */
3469 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3470 {
3471 	struct bm_xfer_ctx c;
3472 	void *buffer;
3473 	enum receive_bitmap_ret ret;
3474 	int ok = FALSE;
3475 
3476 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3477 
3478 	drbd_bm_lock(mdev, "receive bitmap");
3479 
3480 	/* maybe we should use some per thread scratch page,
3481 	 * and allocate that during initial device creation? */
3482 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3483 	if (!buffer) {
3484 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3485 		goto out;
3486 	}
3487 
3488 	c = (struct bm_xfer_ctx) {
3489 		.bm_bits = drbd_bm_bits(mdev),
3490 		.bm_words = drbd_bm_words(mdev),
3491 	};
3492 
3493 	do {
3494 		if (h->command == P_BITMAP) {
3495 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3496 		} else if (h->command == P_COMPRESSED_BITMAP) {
3497 			/* MAYBE: sanity check that we speak proto >= 90,
3498 			 * and the feature is enabled! */
3499 			struct p_compressed_bm *p;
3500 
3501 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3502 				dev_err(DEV, "ReportCBitmap packet too large\n");
3503 				goto out;
3504 			}
3505 			/* use the page buff */
3506 			p = buffer;
3507 			memcpy(p, h, sizeof(*h));
3508 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3509 				goto out;
3510 			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3511 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3512 				goto out; /* not "return": we still hold the bitmap lock and the page buffer */
3513 			}
3514 			ret = decode_bitmap_c(mdev, p, &c);
3515 		} else {
3516 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3517 			goto out;
3518 		}
3519 
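		/* index 1 counts plain P_BITMAP packets, index 0 the
		 * compressed ones; INFO_bm_xfer_stats() prints them as
		 * "plain" and "RLE" respectively. */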
3520 		c.packets[h->command == P_BITMAP]++;
3521 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3522 
3523 		if (ret != OK)
3524 			break;
3525 
3526 		if (!drbd_recv_header(mdev, h))
3527 			goto out;
3528 	} while (ret == OK);
3529 	if (ret == FAILED)
3530 		goto out;
3531 
3532 	INFO_bm_xfer_stats(mdev, "receive", &c);
3533 
3534 	if (mdev->state.conn == C_WF_BITMAP_T) {
3535 		ok = !drbd_send_bitmap(mdev);
3536 		if (!ok)
3537 			goto out;
3538 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3539 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3540 		D_ASSERT(ok == SS_SUCCESS);
3541 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3542 		/* admin may have requested C_DISCONNECTING,
3543 		 * other threads may have noticed network errors */
3544 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3545 		    drbd_conn_str(mdev->state.conn));
3546 	}
3547 
3548 	ok = TRUE;
3549  out:
3550 	drbd_bm_unlock(mdev);
3551 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3552 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3553 	free_page((unsigned long) buffer);
3554 	return ok;
3555 }
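/*
 * Illustrative sketch only, not part of the driver: it goes with the
 * endianness note above receive_bitmap().  On a little-endian host the
 * word size used to walk the byte stream does not change which bit ends
 * up at which index, so 32 bit and 64 bit receivers agree.  The helper
 * names below are made up.
 */
#if 0
static int sketch_test_bit_le32(const unsigned char *buf, unsigned int i)
{
	u32 w;

	memcpy(&w, buf + (i / 32) * 4, sizeof(w));	/* LE host assumed */
	return (w >> (i % 32)) & 1;
}

static int sketch_test_bit_le64(const unsigned char *buf, unsigned int i)
{
	u64 w;

	memcpy(&w, buf + (i / 64) * 8, sizeof(w));	/* LE host assumed */
	return (w >> (i % 64)) & 1;
}
/* For any buf and i both helpers return the same bit on little endian;
 * with big-endian words the 32 bit and 64 bit walks would disagree. */
#endif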
3556 
3557 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3558 {
3559 	/* TODO zero copy sink :) */
3560 	static char sink[128];
3561 	int size, want, r;
3562 
3563 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3564 	     h->command, h->length);
3565 
3566 	size = h->length;
3567 	while (size > 0) {
3568 		want = min_t(int, size, sizeof(sink));
3569 		r = drbd_recv(mdev, sink, want);
3570 		ERR_IF(r <= 0) break;
3571 		size -= r;
3572 	}
3573 	return size == 0;
3574 }
3575 
3576 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3577 {
3578 	if (mdev->state.disk >= D_INCONSISTENT)
3579 		drbd_kick_lo(mdev);
3580 
3581 	/* Make sure we've acked all the TCP data associated
3582 	 * with the data requests being unplugged */
3583 	drbd_tcp_quickack(mdev->data.socket);
3584 
3585 	return TRUE;
3586 }
3587 
3588 static void timeval_sub_us(struct timeval* tv, unsigned int us)
3589 {
3590 	tv->tv_sec -= us / 1000000;
3591 	us = us % 1000000;
3592 	if (tv->tv_usec < us) { /* borrow from tv_sec so tv_usec stays in range */
3593 		tv->tv_usec += 1000000;
3594 		tv->tv_sec--;
3595 	}
3596 	tv->tv_usec -= us;
3597 }
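/*
 * Example (made-up numbers): for tv = { .tv_sec = 10, .tv_usec = 200000 }
 * and us = 1500000, the whole second is taken off first (tv_sec = 9,
 * us = 500000); then, because 200000 < 500000, we borrow:
 * tv ends up as { .tv_sec = 8, .tv_usec = 700000 }, i.e. 10.2s - 1.5s.
 */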
3598 
3599 static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3600 {
3601 	struct delay_probe *dp;
3602 	struct list_head *le;
3603 	struct timeval now;
3604 	int seq_num;
3605 	int offset;
3606 	int data_delay;
3607 
3608 	seq_num = be32_to_cpu(p->seq_num);
3609 	offset  = be32_to_cpu(p->offset);
3610 
3611 	spin_lock(&mdev->peer_seq_lock);
3612 	if (!list_empty(&mdev->delay_probes)) {
3613 		if (from == USE_DATA_SOCKET)
3614 			le = mdev->delay_probes.next;
3615 		else
3616 			le = mdev->delay_probes.prev;
3617 
3618 		dp = list_entry(le, struct delay_probe, list);
3619 
3620 		if (dp->seq_num == seq_num) {
3621 			list_del(le);
3622 			spin_unlock(&mdev->peer_seq_lock);
3623 			do_gettimeofday(&now);
3624 			timeval_sub_us(&now, offset);
3625 			data_delay =
3626 				now.tv_usec - dp->time.tv_usec +
3627 				(now.tv_sec - dp->time.tv_sec) * 1000000;
3628 
3629 			if (data_delay > 0)
3630 				mdev->data_delay = data_delay;
3631 
3632 			kfree(dp);
3633 			return;
3634 		}
3635 
3636 		if (dp->seq_num > seq_num) {
3637 			spin_unlock(&mdev->peer_seq_lock);
3638 			dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3639 			return; /* Do not allocate a struct delay_probe... */
3640 		}
3641 	}
3642 	spin_unlock(&mdev->peer_seq_lock);
3643 
3644 	dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3645 	if (!dp) {
3646 		dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3647 		return;
3648 	}
3649 
3650 	dp->seq_num = seq_num;
3651 	do_gettimeofday(&dp->time);
3652 	timeval_sub_us(&dp->time, offset);
3653 
3654 	spin_lock(&mdev->peer_seq_lock);
3655 	if (from == USE_DATA_SOCKET)
3656 		list_add(&dp->list, &mdev->delay_probes);
3657 	else
3658 		list_add_tail(&dp->list, &mdev->delay_probes);
3659 	spin_unlock(&mdev->peer_seq_lock);
3660 }
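/*
 * Pairing sketch for the above: the peer sends a probe with the same
 * sequence number once on the data socket and once on the meta socket
 * (the offset field lets us correct for the gap between the two sends).
 * The first copy to arrive is stored with its offset-corrected arrival
 * time; when the matching copy arrives, data_delay is the difference of
 * the two corrected arrival times, e.g. stored at 5.000200 s and
 * matched at 5.003700 s gives data_delay = 3500 us.
 */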
3661 
3662 static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3663 {
3664 	struct p_delay_probe *p = (struct p_delay_probe *)h;
3665 
3666 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3667 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3668 		return FALSE;
3669 
3670 	got_delay_probe(mdev, USE_DATA_SOCKET, p);
3671 	return TRUE;
3672 }
3673 
3674 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3675 
3676 static drbd_cmd_handler_f drbd_default_handler[] = {
3677 	[P_DATA]	    = receive_Data,
3678 	[P_DATA_REPLY]	    = receive_DataReply,
3679 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3680 	[P_BARRIER]	    = receive_Barrier,
3681 	[P_BITMAP]	    = receive_bitmap,
3682 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3683 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3684 	[P_DATA_REQUEST]    = receive_DataRequest,
3685 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3686 	[P_SYNC_PARAM]	    = receive_SyncParam,
3687 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3688 	[P_PROTOCOL]        = receive_protocol,
3689 	[P_UUIDS]	    = receive_uuids,
3690 	[P_SIZES]	    = receive_sizes,
3691 	[P_STATE]	    = receive_state,
3692 	[P_STATE_CHG_REQ]   = receive_req_state,
3693 	[P_SYNC_UUID]       = receive_sync_uuid,
3694 	[P_OV_REQUEST]      = receive_DataRequest,
3695 	[P_OV_REPLY]        = receive_DataRequest,
3696 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3697 	[P_DELAY_PROBE]     = receive_delay_probe,
3698 	/* anything missing from this table is in
3699 	 * the asender_tbl, see get_asender_cmd */
3700 	[P_MAX_CMD]	    = NULL,
3701 };
3702 
3703 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3704 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3705 
3706 static void drbdd(struct drbd_conf *mdev)
3707 {
3708 	drbd_cmd_handler_f handler;
3709 	struct p_header *header = &mdev->data.rbuf.header;
3710 
3711 	while (get_t_state(&mdev->receiver) == Running) {
3712 		drbd_thread_current_set_cpu(mdev);
3713 		if (!drbd_recv_header(mdev, header)) {
3714 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3715 			break;
3716 		}
3717 
3718 		if (header->command < P_MAX_CMD)
3719 			handler = drbd_cmd_handler[header->command];
3720 		else if (P_MAY_IGNORE < header->command
3721 		     && header->command < P_MAX_OPT_CMD)
3722 			handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3723 		else if (header->command > P_MAX_OPT_CMD)
3724 			handler = receive_skip;
3725 		else
3726 			handler = NULL;
3727 
3728 		if (unlikely(!handler)) {
3729 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3730 			    header->command, header->length);
3731 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3732 			break;
3733 		}
3734 		if (unlikely(!handler(mdev, header))) {
3735 			dev_err(DEV, "error receiving %s, l: %d!\n",
3736 			    cmdname(header->command), header->length);
3737 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3738 			break;
3739 		}
3740 	}
3741 }
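/*
 * Command number ranges as dispatched above:
 *   command < P_MAX_CMD                     -> drbd_cmd_handler[]
 *   P_MAY_IGNORE < command < P_MAX_OPT_CMD  -> drbd_opt_cmd_handler[]
 *   command > P_MAX_OPT_CMD                 -> receive_skip()
 * anything left without a handler is treated as a protocol error.
 */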
3742 
3743 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3744 {
3745 	struct hlist_head *slot;
3746 	struct hlist_node *pos;
3747 	struct hlist_node *tmp;
3748 	struct drbd_request *req;
3749 	int i;
3750 
3751 	/*
3752 	 * Application READ requests
3753 	 */
3754 	spin_lock_irq(&mdev->req_lock);
3755 	for (i = 0; i < APP_R_HSIZE; i++) {
3756 		slot = mdev->app_reads_hash+i;
3757 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3758 			/* it may (but should not any longer!)
3759 			 * be on the work queue; if that assert triggers,
3760 			 * we need to also grab the
3761 			 * spin_lock_irq(&mdev->data.work.q_lock);
3762 			 * and list_del_init here. */
3763 			D_ASSERT(list_empty(&req->w.list));
3764 			/* It would be nice to complete outside of spinlock.
3765 			 * But this is easier for now. */
3766 			_req_mod(req, connection_lost_while_pending);
3767 		}
3768 	}
3769 	for (i = 0; i < APP_R_HSIZE; i++)
3770 		if (!hlist_empty(mdev->app_reads_hash+i))
3771 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3772 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3773 
3774 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3775 	spin_unlock_irq(&mdev->req_lock);
3776 }
3777 
3778 void drbd_flush_workqueue(struct drbd_conf *mdev)
3779 {
3780 	struct drbd_wq_barrier barr;
3781 
3782 	barr.w.cb = w_prev_work_done;
3783 	init_completion(&barr.done);
3784 	drbd_queue_work(&mdev->data.work, &barr.w);
3785 	wait_for_completion(&barr.done);
3786 }
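/*
 * drbd_flush_workqueue() is the classic "barrier work item" pattern:
 * queue a work item whose callback only completes a completion, then
 * wait for it.  On an ordered, single-consumer queue such as DRBD's
 * worker, everything queued before the barrier has run by the time the
 * wait returns.  For illustration only, the same idea with a generic
 * kernel workqueue (names made up, not used by DRBD):
 */
#if 0
struct flush_barrier {
	struct work_struct w;
	struct completion done;
};

static void flush_barrier_fn(struct work_struct *w)
{
	struct flush_barrier *b = container_of(w, struct flush_barrier, w);

	complete(&b->done);
}

static void sketch_flush(struct workqueue_struct *wq)
{
	struct flush_barrier b;

	INIT_WORK(&b.w, flush_barrier_fn);
	init_completion(&b.done);
	queue_work(wq, &b.w);
	wait_for_completion(&b.done);	/* earlier items have run */
}
#endif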
3787 
3788 static void drbd_disconnect(struct drbd_conf *mdev)
3789 {
3790 	enum drbd_fencing_p fp;
3791 	union drbd_state os, ns;
3792 	int rv = SS_UNKNOWN_ERROR;
3793 	unsigned int i;
3794 
3795 	if (mdev->state.conn == C_STANDALONE)
3796 		return;
3797 	if (mdev->state.conn >= C_WF_CONNECTION)
3798 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3799 				drbd_conn_str(mdev->state.conn));
3800 
3801 	/* asender does not clean up anything. it must not interfere, either */
3802 	drbd_thread_stop(&mdev->asender);
3803 	drbd_free_sock(mdev);
3804 
3805 	spin_lock_irq(&mdev->req_lock);
3806 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3807 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3808 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3809 	spin_unlock_irq(&mdev->req_lock);
3810 
3811 	/* We do not have data structures that would allow us to
3812 	 * get the rs_pending_cnt down to 0 again.
3813 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3814 	 *    the pending RSDataRequest's we have sent.
3815 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3816 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3817 	 *  And no, it is not the sum of the reference counts in the
3818 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3819 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3820 	 *  on the fly. */
3821 	drbd_rs_cancel_all(mdev);
3822 	mdev->rs_total = 0;
3823 	mdev->rs_failed = 0;
3824 	atomic_set(&mdev->rs_pending_cnt, 0);
3825 	wake_up(&mdev->misc_wait);
3826 
3827 	/* make sure syncer is stopped and w_resume_next_sg queued */
3828 	del_timer_sync(&mdev->resync_timer);
3829 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3830 	resync_timer_fn((unsigned long)mdev);
3831 
3832 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3833 	 * w_make_resync_request etc. which may still be on the worker queue
3834 	 * to be "canceled" */
3835 	drbd_flush_workqueue(mdev);
3836 
3837 	/* This also does reclaim_net_ee().  If we do this too early, we might
3838 	 * miss some resync ee and pages.*/
3839 	drbd_process_done_ee(mdev);
3840 
3841 	kfree(mdev->p_uuid);
3842 	mdev->p_uuid = NULL;
3843 
3844 	if (!mdev->state.susp)
3845 		tl_clear(mdev);
3846 
3847 	drbd_fail_pending_reads(mdev);
3848 
3849 	dev_info(DEV, "Connection closed\n");
3850 
3851 	drbd_md_sync(mdev);
3852 
3853 	fp = FP_DONT_CARE;
3854 	if (get_ldev(mdev)) {
3855 		fp = mdev->ldev->dc.fencing;
3856 		put_ldev(mdev);
3857 	}
3858 
3859 	if (mdev->state.role == R_PRIMARY) {
3860 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3861 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3862 			drbd_request_state(mdev, NS(pdsk, nps));
3863 		}
3864 	}
3865 
3866 	spin_lock_irq(&mdev->req_lock);
3867 	os = mdev->state;
3868 	if (os.conn >= C_UNCONNECTED) {
3869 		/* Do not restart in case we are C_DISCONNECTING */
3870 		ns = os;
3871 		ns.conn = C_UNCONNECTED;
3872 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3873 	}
3874 	spin_unlock_irq(&mdev->req_lock);
3875 
3876 	if (os.conn == C_DISCONNECTING) {
3877 		struct hlist_head *h;
3878 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3879 
3880 		/* we must not free the tl_hash
3881 		 * while application io is still on the fly */
3882 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3883 
3884 		spin_lock_irq(&mdev->req_lock);
3885 		/* paranoia code */
3886 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3887 			if (h->first)
3888 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3889 						(int)(h - mdev->ee_hash), h->first);
3890 		kfree(mdev->ee_hash);
3891 		mdev->ee_hash = NULL;
3892 		mdev->ee_hash_s = 0;
3893 
3894 		/* paranoia code */
3895 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3896 			if (h->first)
3897 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3898 						(int)(h - mdev->tl_hash), h->first);
3899 		kfree(mdev->tl_hash);
3900 		mdev->tl_hash = NULL;
3901 		mdev->tl_hash_s = 0;
3902 		spin_unlock_irq(&mdev->req_lock);
3903 
3904 		crypto_free_hash(mdev->cram_hmac_tfm);
3905 		mdev->cram_hmac_tfm = NULL;
3906 
3907 		kfree(mdev->net_conf);
3908 		mdev->net_conf = NULL;
3909 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3910 	}
3911 
3912 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3913 	 * want to use SO_LINGER, because apparently it can be deferred for
3914 	 * more than 20 seconds (longest time I checked).
3915 	 *
3916 	 * Actually we don't care for exactly when the network stack does its
3917 	 * put_page(), but release our reference on these pages right here.
3918 	 */
3919 	i = drbd_release_ee(mdev, &mdev->net_ee);
3920 	if (i)
3921 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3922 	i = atomic_read(&mdev->pp_in_use);
3923 	if (i)
3924 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3925 
3926 	D_ASSERT(list_empty(&mdev->read_ee));
3927 	D_ASSERT(list_empty(&mdev->active_ee));
3928 	D_ASSERT(list_empty(&mdev->sync_ee));
3929 	D_ASSERT(list_empty(&mdev->done_ee));
3930 
3931 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3932 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3933 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3934 }
3935 
3936 /*
3937  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3938  * we can agree on is stored in agreed_pro_version.
3939  *
3940  * feature flags and the reserved array should be enough room for future
3941  * enhancements of the handshake protocol, and possible plugins...
3942  *
3943  * for now, they are expected to be zero, but ignored.
3944  */
3945 static int drbd_send_handshake(struct drbd_conf *mdev)
3946 {
3947 	/* ASSERT current == mdev->receiver ... */
3948 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3949 	int ok;
3950 
3951 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3952 		dev_err(DEV, "interrupted during initial handshake\n");
3953 		return 0; /* interrupted. not ok. */
3954 	}
3955 
3956 	if (mdev->data.socket == NULL) {
3957 		mutex_unlock(&mdev->data.mutex);
3958 		return 0;
3959 	}
3960 
3961 	memset(p, 0, sizeof(*p));
3962 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3963 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3964 	ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3965 			     (struct p_header *)p, sizeof(*p), 0 );
3966 	mutex_unlock(&mdev->data.mutex);
3967 	return ok;
3968 }
3969 
3970 /*
3971  * return values:
3972  *   1 yes, we have a valid connection
3973  *   0 oops, did not work out, please try again
3974  *  -1 peer talks different language,
3975  *     no point in trying again, please go standalone.
3976  */
3977 static int drbd_do_handshake(struct drbd_conf *mdev)
3978 {
3979 	/* ASSERT current == mdev->receiver ... */
3980 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3981 	const int expect = sizeof(struct p_handshake)
3982 			  -sizeof(struct p_header);
3983 	int rv;
3984 
3985 	rv = drbd_send_handshake(mdev);
3986 	if (!rv)
3987 		return 0;
3988 
3989 	rv = drbd_recv_header(mdev, &p->head);
3990 	if (!rv)
3991 		return 0;
3992 
3993 	if (p->head.command != P_HAND_SHAKE) {
3994 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3995 		     cmdname(p->head.command), p->head.command);
3996 		return -1;
3997 	}
3998 
3999 	if (p->head.length != expect) {
4000 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4001 		     expect, p->head.length);
4002 		return -1;
4003 	}
4004 
4005 	rv = drbd_recv(mdev, &p->head.payload, expect);
4006 
4007 	if (rv != expect) {
4008 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
4009 		return 0;
4010 	}
4011 
4012 	p->protocol_min = be32_to_cpu(p->protocol_min);
4013 	p->protocol_max = be32_to_cpu(p->protocol_max);
4014 	if (p->protocol_max == 0)
4015 		p->protocol_max = p->protocol_min;
4016 
4017 	if (PRO_VERSION_MAX < p->protocol_min ||
4018 	    PRO_VERSION_MIN > p->protocol_max)
4019 		goto incompat;
4020 
4021 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4022 
4023 	dev_info(DEV, "Handshake successful: "
4024 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4025 
4026 	return 1;
4027 
4028  incompat:
4029 	dev_err(DEV, "incompatible DRBD dialects: "
4030 	    "I support %d-%d, peer supports %d-%d\n",
4031 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4032 	    p->protocol_min, p->protocol_max);
4033 	return -1;
4034 }
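/*
 * Worked example of the negotiation above (version numbers made up):
 * if we support 86..91 (PRO_VERSION_MIN..PRO_VERSION_MAX) and the peer
 * reports min 88, max 94, the ranges overlap and we agree on
 * min(91, 94) = 91.  Had the peer reported 92..94 instead,
 * "PRO_VERSION_MAX < p->protocol_min" would send us to "incompat".
 */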
4035 
4036 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4037 static int drbd_do_auth(struct drbd_conf *mdev)
4038 {
4039 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4040 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4041 	return -1;
4042 }
4043 #else
4044 #define CHALLENGE_LEN 64
4045 
4046 /* Return value:
4047 	1 - auth succeeded,
4048 	0 - failed, try again (network error),
4049 	-1 - auth failed, don't try again.
4050 */
4051 
4052 static int drbd_do_auth(struct drbd_conf *mdev)
4053 {
4054 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4055 	struct scatterlist sg;
4056 	char *response = NULL;
4057 	char *right_response = NULL;
4058 	char *peers_ch = NULL;
4059 	struct p_header p;
4060 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4061 	unsigned int resp_size;
4062 	struct hash_desc desc;
4063 	int rv;
4064 
4065 	desc.tfm = mdev->cram_hmac_tfm;
4066 	desc.flags = 0;
4067 
4068 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4069 				(u8 *)mdev->net_conf->shared_secret, key_len);
4070 	if (rv) {
4071 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4072 		rv = -1;
4073 		goto fail;
4074 	}
4075 
4076 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4077 
4078 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4079 	if (!rv)
4080 		goto fail;
4081 
4082 	rv = drbd_recv_header(mdev, &p);
4083 	if (!rv)
4084 		goto fail;
4085 
4086 	if (p.command != P_AUTH_CHALLENGE) {
4087 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4088 		    cmdname(p.command), p.command);
4089 		rv = 0;
4090 		goto fail;
4091 	}
4092 
4093 	if (p.length > CHALLENGE_LEN*2) {
4094 		dev_err(DEV, "AuthChallenge payload too big.\n");
4095 		rv = -1;
4096 		goto fail;
4097 	}
4098 
4099 	peers_ch = kmalloc(p.length, GFP_NOIO);
4100 	if (peers_ch == NULL) {
4101 		dev_err(DEV, "kmalloc of peers_ch failed\n");
4102 		rv = -1;
4103 		goto fail;
4104 	}
4105 
4106 	rv = drbd_recv(mdev, peers_ch, p.length);
4107 
4108 	if (rv != p.length) {
4109 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4110 		rv = 0;
4111 		goto fail;
4112 	}
4113 
4114 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4115 	response = kmalloc(resp_size, GFP_NOIO);
4116 	if (response == NULL) {
4117 		dev_err(DEV, "kmalloc of response failed\n");
4118 		rv = -1;
4119 		goto fail;
4120 	}
4121 
4122 	sg_init_table(&sg, 1);
4123 	sg_set_buf(&sg, peers_ch, p.length);
4124 
4125 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4126 	if (rv) {
4127 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4128 		rv = -1;
4129 		goto fail;
4130 	}
4131 
4132 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4133 	if (!rv)
4134 		goto fail;
4135 
4136 	rv = drbd_recv_header(mdev, &p);
4137 	if (!rv)
4138 		goto fail;
4139 
4140 	if (p.command != P_AUTH_RESPONSE) {
4141 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4142 		    cmdname(p.command), p.command);
4143 		rv = 0;
4144 		goto fail;
4145 	}
4146 
4147 	if (p.length != resp_size) {
4148 		dev_err(DEV, "AuthResponse payload has wrong size\n");
4149 		rv = 0;
4150 		goto fail;
4151 	}
4152 
4153 	rv = drbd_recv(mdev, response, resp_size);
4154 
4155 	if (rv != resp_size) {
4156 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4157 		rv = 0;
4158 		goto fail;
4159 	}
4160 
4161 	right_response = kmalloc(resp_size, GFP_NOIO);
4162 	if (right_response == NULL) {
4163 		dev_err(DEV, "kmalloc of right_response failed\n");
4164 		rv = -1;
4165 		goto fail;
4166 	}
4167 
4168 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4169 
4170 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4171 	if (rv) {
4172 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4173 		rv = -1;
4174 		goto fail;
4175 	}
4176 
4177 	rv = !memcmp(response, right_response, resp_size);
4178 
4179 	if (rv)
4180 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4181 		     resp_size, mdev->net_conf->cram_hmac_alg);
4182 	else
4183 		rv = -1;
4184 
4185  fail:
4186 	kfree(peers_ch);
4187 	kfree(response);
4188 	kfree(right_response);
4189 
4190 	return rv;
4191 }
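/*
 * Message flow of the exchange above, as a sketch (both peers run
 * drbd_do_auth() after the handshake):
 *   A -> B: P_AUTH_CHALLENGE, random challenge C_A
 *   B -> A: P_AUTH_CHALLENGE, random challenge C_B
 *   A -> B: P_AUTH_RESPONSE,  HMAC(shared_secret, C_B)
 *   B -> A: P_AUTH_RESPONSE,  HMAC(shared_secret, C_A)
 * Each side then recomputes the HMAC over its own challenge and
 * compares it with the peer's response (the memcmp() above).
 */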
4192 #endif
4193 
4194 int drbdd_init(struct drbd_thread *thi)
4195 {
4196 	struct drbd_conf *mdev = thi->mdev;
4197 	unsigned int minor = mdev_to_minor(mdev);
4198 	int h;
4199 
4200 	sprintf(current->comm, "drbd%d_receiver", minor);
4201 
4202 	dev_info(DEV, "receiver (re)started\n");
4203 
4204 	do {
4205 		h = drbd_connect(mdev);
4206 		if (h == 0) {
4207 			drbd_disconnect(mdev);
4208 			__set_current_state(TASK_INTERRUPTIBLE);
4209 			schedule_timeout(HZ);
4210 		}
4211 		if (h == -1) {
4212 			dev_warn(DEV, "Discarding network configuration.\n");
4213 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4214 		}
4215 	} while (h == 0);
4216 
4217 	if (h > 0) {
4218 		if (get_net_conf(mdev)) {
4219 			drbdd(mdev);
4220 			put_net_conf(mdev);
4221 		}
4222 	}
4223 
4224 	drbd_disconnect(mdev);
4225 
4226 	dev_info(DEV, "receiver terminated\n");
4227 	return 0;
4228 }
4229 
4230 /* ********* acknowledge sender ******** */
4231 
4232 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4233 {
4234 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4235 
4236 	int retcode = be32_to_cpu(p->retcode);
4237 
4238 	if (retcode >= SS_SUCCESS) {
4239 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4240 	} else {
4241 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4242 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4243 		    drbd_set_st_err_str(retcode), retcode);
4244 	}
4245 	wake_up(&mdev->state_wait);
4246 
4247 	return TRUE;
4248 }
4249 
4250 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4251 {
4252 	return drbd_send_ping_ack(mdev);
4253 
4254 }
4255 
4256 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4257 {
4258 	/* restore idle timeout */
4259 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4260 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4261 		wake_up(&mdev->misc_wait);
4262 
4263 	return TRUE;
4264 }
4265 
4266 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4267 {
4268 	struct p_block_ack *p = (struct p_block_ack *)h;
4269 	sector_t sector = be64_to_cpu(p->sector);
4270 	int blksize = be32_to_cpu(p->blksize);
4271 
4272 	D_ASSERT(mdev->agreed_pro_version >= 89);
4273 
4274 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4275 
4276 	drbd_rs_complete_io(mdev, sector);
4277 	drbd_set_in_sync(mdev, sector, blksize);
4278 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4279 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4280 	dec_rs_pending(mdev);
4281 
4282 	return TRUE;
4283 }
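/*
 * Example for the accounting above, assuming the usual 4 KiB bitmap
 * granularity (BM_BLOCK_SHIFT == 12): a 32 KiB block whose checksums
 * matched adds 32768 >> 12 = 8 to rs_same_csum.
 */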
4284 
4285 /* when we receive the ACK for a write request,
4286  * verify that we actually know about it */
4287 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4288 	u64 id, sector_t sector)
4289 {
4290 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4291 	struct hlist_node *n;
4292 	struct drbd_request *req;
4293 
4294 	hlist_for_each_entry(req, n, slot, colision) {
4295 		if ((unsigned long)req == (unsigned long)id) {
4296 			if (req->sector != sector) {
4297 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4298 				    "wrong sector (%llus versus %llus)\n", req,
4299 				    (unsigned long long)req->sector,
4300 				    (unsigned long long)sector);
4301 				break;
4302 			}
4303 			return req;
4304 		}
4305 	}
4306 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4307 		(void *)(unsigned long)id, (unsigned long long)sector);
4308 	return NULL;
4309 }
4310 
4311 typedef struct drbd_request *(req_validator_fn)
4312 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4313 
4314 static int validate_req_change_req_state(struct drbd_conf *mdev,
4315 	u64 id, sector_t sector, req_validator_fn validator,
4316 	const char *func, enum drbd_req_event what)
4317 {
4318 	struct drbd_request *req;
4319 	struct bio_and_error m;
4320 
4321 	spin_lock_irq(&mdev->req_lock);
4322 	req = validator(mdev, id, sector);
4323 	if (unlikely(!req)) {
4324 		spin_unlock_irq(&mdev->req_lock);
4325 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4326 		return FALSE;
4327 	}
4328 	__req_mod(req, what, &m);
4329 	spin_unlock_irq(&mdev->req_lock);
4330 
4331 	if (m.bio)
4332 		complete_master_bio(mdev, &m);
4333 	return TRUE;
4334 }
4335 
4336 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4337 {
4338 	struct p_block_ack *p = (struct p_block_ack *)h;
4339 	sector_t sector = be64_to_cpu(p->sector);
4340 	int blksize = be32_to_cpu(p->blksize);
4341 	enum drbd_req_event what;
4342 
4343 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4344 
4345 	if (is_syncer_block_id(p->block_id)) {
4346 		drbd_set_in_sync(mdev, sector, blksize);
4347 		dec_rs_pending(mdev);
4348 		return TRUE;
4349 	}
4350 	switch (be16_to_cpu(h->command)) {
4351 	case P_RS_WRITE_ACK:
4352 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4353 		what = write_acked_by_peer_and_sis;
4354 		break;
4355 	case P_WRITE_ACK:
4356 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4357 		what = write_acked_by_peer;
4358 		break;
4359 	case P_RECV_ACK:
4360 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4361 		what = recv_acked_by_peer;
4362 		break;
4363 	case P_DISCARD_ACK:
4364 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4365 		what = conflict_discarded_by_peer;
4366 		break;
4367 	default:
4368 		D_ASSERT(0);
4369 		return FALSE;
4370 	}
4371 
4372 	return validate_req_change_req_state(mdev, p->block_id, sector,
4373 		_ack_id_to_req, __func__ , what);
4374 }
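/*
 * Quick reference for the acks dispatched above, matching the protocol
 * asserts: P_RECV_ACK is the protocol B ack (data received),
 * P_WRITE_ACK and P_RS_WRITE_ACK are protocol C acks (data written;
 * the RS variant is for resync writes and also marks the block in
 * sync), and P_DISCARD_ACK reports a conflicting concurrent write that
 * the peer discarded.
 */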
4375 
4376 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4377 {
4378 	struct p_block_ack *p = (struct p_block_ack *)h;
4379 	sector_t sector = be64_to_cpu(p->sector);
4380 
4381 	if (__ratelimit(&drbd_ratelimit_state))
4382 		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4383 
4384 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4385 
4386 	if (is_syncer_block_id(p->block_id)) {
4387 		int size = be32_to_cpu(p->blksize);
4388 		dec_rs_pending(mdev);
4389 		drbd_rs_failed_io(mdev, sector, size);
4390 		return TRUE;
4391 	}
4392 	return validate_req_change_req_state(mdev, p->block_id, sector,
4393 		_ack_id_to_req, __func__ , neg_acked);
4394 }
4395 
4396 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4397 {
4398 	struct p_block_ack *p = (struct p_block_ack *)h;
4399 	sector_t sector = be64_to_cpu(p->sector);
4400 
4401 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4402 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4403 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4404 
4405 	return validate_req_change_req_state(mdev, p->block_id, sector,
4406 		_ar_id_to_req, __func__ , neg_acked);
4407 }
4408 
4409 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4410 {
4411 	sector_t sector;
4412 	int size;
4413 	struct p_block_ack *p = (struct p_block_ack *)h;
4414 
4415 	sector = be64_to_cpu(p->sector);
4416 	size = be32_to_cpu(p->blksize);
4417 
4418 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4419 
4420 	dec_rs_pending(mdev);
4421 
4422 	if (get_ldev_if_state(mdev, D_FAILED)) {
4423 		drbd_rs_complete_io(mdev, sector);
4424 		drbd_rs_failed_io(mdev, sector, size);
4425 		put_ldev(mdev);
4426 	}
4427 
4428 	return TRUE;
4429 }
4430 
4431 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4432 {
4433 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4434 
4435 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4436 
4437 	return TRUE;
4438 }
4439 
4440 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4441 {
4442 	struct p_block_ack *p = (struct p_block_ack *)h;
4443 	struct drbd_work *w;
4444 	sector_t sector;
4445 	int size;
4446 
4447 	sector = be64_to_cpu(p->sector);
4448 	size = be32_to_cpu(p->blksize);
4449 
4450 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4451 
4452 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4453 		drbd_ov_oos_found(mdev, sector, size);
4454 	else
4455 		ov_oos_print(mdev);
4456 
4457 	drbd_rs_complete_io(mdev, sector);
4458 	dec_rs_pending(mdev);
4459 
4460 	if (--mdev->ov_left == 0) {
4461 		w = kmalloc(sizeof(*w), GFP_NOIO);
4462 		if (w) {
4463 			w->cb = w_ov_finished;
4464 			drbd_queue_work_front(&mdev->data.work, w);
4465 		} else {
4466 			dev_err(DEV, "kmalloc(w) failed.\n");
4467 			ov_oos_print(mdev);
4468 			drbd_resync_finished(mdev);
4469 		}
4470 	}
4471 	return TRUE;
4472 }
4473 
4474 static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4475 {
4476 	struct p_delay_probe *p = (struct p_delay_probe *)h;
4477 
4478 	got_delay_probe(mdev, USE_META_SOCKET, p);
4479 	return TRUE;
4480 }
4481 
4482 struct asender_cmd {
4483 	size_t pkt_size;
4484 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4485 };
4486 
4487 static struct asender_cmd *get_asender_cmd(int cmd)
4488 {
4489 	static struct asender_cmd asender_tbl[] = {
4490 		/* anything missing from this table is in
4491 		 * the drbd_cmd_handler (drbd_default_handler) table,
4492 		 * see the beginning of drbdd() */
4493 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4494 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4495 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4496 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4497 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4498 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4499 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4500 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4501 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4502 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4503 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4504 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4505 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4506 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_delay_probe_m },
4507 	[P_MAX_CMD]	    = { 0, NULL },
4508 	};
4509 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4510 		return NULL;
4511 	return &asender_tbl[cmd];
4512 }
4513 
4514 int drbd_asender(struct drbd_thread *thi)
4515 {
4516 	struct drbd_conf *mdev = thi->mdev;
4517 	struct p_header *h = &mdev->meta.rbuf.header;
4518 	struct asender_cmd *cmd = NULL;
4519 
4520 	int rv, len;
4521 	void *buf    = h;
4522 	int received = 0;
4523 	int expect   = sizeof(struct p_header);
4524 	int empty;
4525 
4526 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4527 
4528 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4529 	current->rt_priority = 2;    /* more important than all other tasks */
4530 
4531 	while (get_t_state(thi) == Running) {
4532 		drbd_thread_current_set_cpu(mdev);
4533 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4534 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4535 			mdev->meta.socket->sk->sk_rcvtimeo =
4536 				mdev->net_conf->ping_timeo*HZ/10;
4537 		}
4538 
4539 		/* conditionally cork;
4540 		 * it may hurt latency if we cork without much to send */
4541 		if (!mdev->net_conf->no_cork &&
4542 			3 < atomic_read(&mdev->unacked_cnt))
4543 			drbd_tcp_cork(mdev->meta.socket);
4544 		while (1) {
4545 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4546 			flush_signals(current);
4547 			if (!drbd_process_done_ee(mdev)) {
4548 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4549 				goto reconnect;
4550 			}
4551 			/* to avoid race with newly queued ACKs */
4552 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4553 			spin_lock_irq(&mdev->req_lock);
4554 			empty = list_empty(&mdev->done_ee);
4555 			spin_unlock_irq(&mdev->req_lock);
4556 			/* new ack may have been queued right here,
4557 			 * but then there is also a signal pending,
4558 			 * and we start over... */
4559 			if (empty)
4560 				break;
4561 		}
4562 		/* but unconditionally uncork unless disabled */
4563 		if (!mdev->net_conf->no_cork)
4564 			drbd_tcp_uncork(mdev->meta.socket);
4565 
4566 		/* short circuit, recv_msg would return EINTR anyways. */
4567 		if (signal_pending(current))
4568 			continue;
4569 
4570 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4571 				     buf, expect-received, 0);
4572 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4573 
4574 		flush_signals(current);
4575 
4576 		/* Note:
4577 		 * -EINTR	 (on meta) we got a signal
4578 		 * -EAGAIN	 (on meta) rcvtimeo expired
4579 		 * -ECONNRESET	 other side closed the connection
4580 		 * -ERESTARTSYS  (on data) we got a signal
4581 		 * rv <  0	 other than above: unexpected error!
4582 		 * rv == expected: full header or command
4583 		 * rv <  expected: "woken" by signal during receive
4584 		 * rv == 0	 : "connection shut down by peer"
4585 		 */
4586 		if (likely(rv > 0)) {
4587 			received += rv;
4588 			buf	 += rv;
4589 		} else if (rv == 0) {
4590 			dev_err(DEV, "meta connection shut down by peer.\n");
4591 			goto reconnect;
4592 		} else if (rv == -EAGAIN) {
4593 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4594 			    mdev->net_conf->ping_timeo*HZ/10) {
4595 				dev_err(DEV, "PingAck did not arrive in time.\n");
4596 				goto reconnect;
4597 			}
4598 			set_bit(SEND_PING, &mdev->flags);
4599 			continue;
4600 		} else if (rv == -EINTR) {
4601 			continue;
4602 		} else {
4603 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4604 			goto reconnect;
4605 		}
4606 
4607 		if (received == expect && cmd == NULL) {
4608 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4609 				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4610 				    (long)be32_to_cpu(h->magic),
4611 				    h->command, h->length);
4612 				goto reconnect;
4613 			}
4614 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4615 			len = be16_to_cpu(h->length);
4616 			if (unlikely(cmd == NULL)) {
4617 				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4618 				    (long)be32_to_cpu(h->magic),
4619 				    h->command, h->length);
4620 				goto disconnect;
4621 			}
4622 			expect = cmd->pkt_size;
4623 			ERR_IF(len != expect-sizeof(struct p_header))
4624 				goto reconnect;
4625 		}
4626 		if (received == expect) {
4627 			D_ASSERT(cmd != NULL);
4628 			if (!cmd->process(mdev, h))
4629 				goto reconnect;
4630 
4631 			buf	 = h;
4632 			received = 0;
4633 			expect	 = sizeof(struct p_header);
4634 			cmd	 = NULL;
4635 		}
4636 	}
4637 
4638 	if (0) {
4639 reconnect:
4640 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4641 	}
4642 	if (0) {
4643 disconnect:
4644 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4645 	}
4646 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4647 
4648 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4649 	dev_info(DEV, "asender terminated\n");
4650 
4651 	return 0;
4652 }
4653