xref: /linux/drivers/block/drbd/drbd_receiver.c (revision a234ca0faa65dcd5cc473915bd925130ebb7b74b)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/string.h>
46 #include <linux/scatterlist.h>
47 #include "drbd_int.h"
48 #include "drbd_req.h"
49 
50 #include "drbd_vli.h"
51 
52 struct flush_work {
53 	struct drbd_work w;
54 	struct drbd_epoch *epoch;
55 };
56 
57 enum finish_epoch {
58 	FE_STILL_LIVE,
59 	FE_DESTROYED,
60 	FE_RECYCLED,
61 };
62 
63 static int drbd_do_handshake(struct drbd_conf *mdev);
64 static int drbd_do_auth(struct drbd_conf *mdev);
65 
66 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
67 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
68 
69 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
70 {
71 	struct drbd_epoch *prev;
72 	spin_lock(&mdev->epoch_lock);
73 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
74 	if (prev == epoch || prev == mdev->current_epoch)
75 		prev = NULL;
76 	spin_unlock(&mdev->epoch_lock);
77 	return prev;
78 }
79 
80 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
81 
82 /*
83  * some helper functions to deal with single linked page lists,
84  * page->private being our "next" pointer.
85  */
86 
87 /* If at least n pages are linked at head, get n pages off.
88  * Otherwise, don't modify head, and return NULL.
89  * Locking is the responsibility of the caller.
90  */
91 static struct page *page_chain_del(struct page **head, int n)
92 {
93 	struct page *page;
94 	struct page *tmp;
95 
96 	BUG_ON(!n);
97 	BUG_ON(!head);
98 
99 	page = *head;
100 
101 	if (!page)
102 		return NULL;
103 
104 	while (page) {
105 		tmp = page_chain_next(page);
106 		if (--n == 0)
107 			break; /* found sufficient pages */
108 		if (tmp == NULL)
109 			/* insufficient pages, don't use any of them. */
110 			return NULL;
111 		page = tmp;
112 	}
113 
114 	/* add end of list marker for the returned list */
115 	set_page_private(page, 0);
116 	/* actual return value, and adjustment of head */
117 	page = *head;
118 	*head = tmp;
119 	return page;
120 }
121 
122 /* may be used outside of locks to find the tail of a (usually short)
123  * "private" page chain, before adding it back to a global chain head
124  * with page_chain_add() under a spinlock. */
125 static struct page *page_chain_tail(struct page *page, int *len)
126 {
127 	struct page *tmp;
128 	int i = 1;
129 	while ((tmp = page_chain_next(page)))
130 		++i, page = tmp;
131 	if (len)
132 		*len = i;
133 	return page;
134 }
135 
136 static int page_chain_free(struct page *page)
137 {
138 	struct page *tmp;
139 	int i = 0;
140 	page_chain_for_each_safe(page, tmp) {
141 		put_page(page);
142 		++i;
143 	}
144 	return i;
145 }
146 
147 static void page_chain_add(struct page **head,
148 		struct page *chain_first, struct page *chain_last)
149 {
150 #if 1
151 	struct page *tmp;
152 	tmp = page_chain_tail(chain_first, NULL);
153 	BUG_ON(tmp != chain_last);
154 #endif
155 
156 	/* add chain to head */
157 	set_page_private(chain_last, (unsigned long)*head);
158 	*head = chain_first;
159 }
160 
161 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
162 {
163 	struct page *page = NULL;
164 	struct page *tmp = NULL;
165 	int i = 0;
166 
167 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
168 	 * So what. It saves a spin_lock. */
169 	if (drbd_pp_vacant >= number) {
170 		spin_lock(&drbd_pp_lock);
171 		page = page_chain_del(&drbd_pp_pool, number);
172 		if (page)
173 			drbd_pp_vacant -= number;
174 		spin_unlock(&drbd_pp_lock);
175 		if (page)
176 			return page;
177 	}
178 
179 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
180 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
181 	 * which in turn might block on the other node at this very place.  */
182 	for (i = 0; i < number; i++) {
183 		tmp = alloc_page(GFP_TRY);
184 		if (!tmp)
185 			break;
186 		set_page_private(tmp, (unsigned long)page);
187 		page = tmp;
188 	}
189 
190 	if (i == number)
191 		return page;
192 
193 	/* Not enough pages immediately available this time.
194 	 * No need to jump around here, drbd_pp_alloc will retry this
195 	 * function "soon". */
196 	if (page) {
197 		tmp = page_chain_tail(page, NULL);
198 		spin_lock(&drbd_pp_lock);
199 		page_chain_add(&drbd_pp_pool, page, tmp);
200 		drbd_pp_vacant += i;
201 		spin_unlock(&drbd_pp_lock);
202 	}
203 	return NULL;
204 }
205 
206 /* kick lower level device, if we have more than (arbitrary number)
207  * reference counts on it, which typically are locally submitted io
208  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
209 static void maybe_kick_lo(struct drbd_conf *mdev)
210 {
211 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
212 		drbd_kick_lo(mdev);
213 }
214 
215 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
216 {
217 	struct drbd_epoch_entry *e;
218 	struct list_head *le, *tle;
219 
220 	/* The EEs are always appended to the end of the list. Since
221 	   they are sent in order over the wire, they have to finish
222 	   in order. As soon as we see the first not finished we can
223 	   stop to examine the list... */
224 
225 	list_for_each_safe(le, tle, &mdev->net_ee) {
226 		e = list_entry(le, struct drbd_epoch_entry, w.list);
227 		if (drbd_ee_has_active_page(e))
228 			break;
229 		list_move(le, to_be_freed);
230 	}
231 }
232 
233 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
234 {
235 	LIST_HEAD(reclaimed);
236 	struct drbd_epoch_entry *e, *t;
237 
238 	maybe_kick_lo(mdev);
239 	spin_lock_irq(&mdev->req_lock);
240 	reclaim_net_ee(mdev, &reclaimed);
241 	spin_unlock_irq(&mdev->req_lock);
242 
243 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
244 		drbd_free_ee(mdev, e);
245 }
246 
247 /**
248  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
249  * @mdev:	DRBD device.
250  * @number:	number of pages requested
251  * @retry:	whether to retry, if not enough pages are available right now
252  *
253  * Tries to allocate number pages, first from our own page pool, then from
254  * the kernel, unless this allocation would exceed the max_buffers setting.
255  * Possibly retry until DRBD frees sufficient pages somewhere else.
256  *
257  * Returns a page chain linked via page->private.
258  */
259 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
260 {
261 	struct page *page = NULL;
262 	DEFINE_WAIT(wait);
263 
264 	/* Yes, we may run up to @number over max_buffers. If we
265 	 * follow it strictly, the admin will get it wrong anyways. */
266 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
267 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
268 
269 	while (page == NULL) {
270 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
271 
272 		drbd_kick_lo_and_reclaim_net(mdev);
273 
274 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
275 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
276 			if (page)
277 				break;
278 		}
279 
280 		if (!retry)
281 			break;
282 
283 		if (signal_pending(current)) {
284 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
285 			break;
286 		}
287 
288 		schedule();
289 	}
290 	finish_wait(&drbd_pp_wait, &wait);
291 
292 	if (page)
293 		atomic_add(number, &mdev->pp_in_use);
294 	return page;
295 }
296 
297 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
298  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
302 {
303 	int i;
304 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
305 		i = page_chain_free(page);
306 	else {
307 		struct page *tmp;
308 		tmp = page_chain_tail(page, &i);
309 		spin_lock(&drbd_pp_lock);
310 		page_chain_add(&drbd_pp_pool, page, tmp);
311 		drbd_pp_vacant += i;
312 		spin_unlock(&drbd_pp_lock);
313 	}
314 	atomic_sub(i, &mdev->pp_in_use);
315 	i = atomic_read(&mdev->pp_in_use);
316 	if (i < 0)
317 		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
318 	wake_up(&drbd_pp_wait);
319 }
320 
321 /*
322 You need to hold the req_lock:
323  _drbd_wait_ee_list_empty()
324 
325 You must not have the req_lock:
326  drbd_free_ee()
327  drbd_alloc_ee()
328  drbd_init_ee()
329  drbd_release_ee()
330  drbd_ee_fix_bhs()
331  drbd_process_done_ee()
332  drbd_clear_done_ee()
333  drbd_wait_ee_list_empty()
334 */
335 
336 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
337 				     u64 id,
338 				     sector_t sector,
339 				     unsigned int data_size,
340 				     gfp_t gfp_mask) __must_hold(local)
341 {
342 	struct drbd_epoch_entry *e;
343 	struct page *page;
344 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
345 
346 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
347 		return NULL;
348 
349 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
350 	if (!e) {
351 		if (!(gfp_mask & __GFP_NOWARN))
352 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
353 		return NULL;
354 	}
355 
356 	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
357 	if (!page)
358 		goto fail;
359 
360 	INIT_HLIST_NODE(&e->colision);
361 	e->epoch = NULL;
362 	e->mdev = mdev;
363 	e->pages = page;
364 	atomic_set(&e->pending_bios, 0);
365 	e->size = data_size;
366 	e->flags = 0;
367 	e->sector = sector;
368 	e->sector = sector;
369 	e->block_id = id;
370 
371 	return e;
372 
373  fail:
374 	mempool_free(e, drbd_ee_mempool);
375 	return NULL;
376 }
377 
378 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
379 {
380 	drbd_pp_free(mdev, e->pages);
381 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
382 	D_ASSERT(hlist_unhashed(&e->colision));
383 	mempool_free(e, drbd_ee_mempool);
384 }
385 
386 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
387 {
388 	LIST_HEAD(work_list);
389 	struct drbd_epoch_entry *e, *t;
390 	int count = 0;
391 
392 	spin_lock_irq(&mdev->req_lock);
393 	list_splice_init(list, &work_list);
394 	spin_unlock_irq(&mdev->req_lock);
395 
396 	list_for_each_entry_safe(e, t, &work_list, w.list) {
397 		drbd_free_ee(mdev, e);
398 		count++;
399 	}
400 	return count;
401 }
402 
403 
404 /*
405  * This function is called from _asender only_
406  * but see also comments in _req_mod(,barrier_acked)
407  * and receive_Barrier.
408  *
409  * Move entries from net_ee to done_ee, if ready.
410  * Grab done_ee, call all callbacks, free the entries.
411  * The callbacks typically send out ACKs.
412  */
413 static int drbd_process_done_ee(struct drbd_conf *mdev)
414 {
415 	LIST_HEAD(work_list);
416 	LIST_HEAD(reclaimed);
417 	struct drbd_epoch_entry *e, *t;
418 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
419 
420 	spin_lock_irq(&mdev->req_lock);
421 	reclaim_net_ee(mdev, &reclaimed);
422 	list_splice_init(&mdev->done_ee, &work_list);
423 	spin_unlock_irq(&mdev->req_lock);
424 
425 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
426 		drbd_free_ee(mdev, e);
427 
428 	/* possible callbacks here:
429 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
430 	 * all ignore the last argument.
431 	 */
432 	list_for_each_entry_safe(e, t, &work_list, w.list) {
433 		/* list_del not necessary, next/prev members not touched */
434 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
435 		drbd_free_ee(mdev, e);
436 	}
437 	wake_up(&mdev->ee_wait);
438 
439 	return ok;
440 }
441 
442 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
443 {
444 	DEFINE_WAIT(wait);
445 
446 	/* avoids spin_lock/unlock
447 	 * and calling prepare_to_wait in the fast path */
448 	while (!list_empty(head)) {
449 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
450 		spin_unlock_irq(&mdev->req_lock);
451 		drbd_kick_lo(mdev);
452 		schedule();
453 		finish_wait(&mdev->ee_wait, &wait);
454 		spin_lock_irq(&mdev->req_lock);
455 	}
456 }
457 
458 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
459 {
460 	spin_lock_irq(&mdev->req_lock);
461 	_drbd_wait_ee_list_empty(mdev, head);
462 	spin_unlock_irq(&mdev->req_lock);
463 }
464 
465 /* see also kernel_accept; which is only present since 2.6.18.
466  * also we want to log which part of it failed, exactly */
467 static int drbd_accept(struct drbd_conf *mdev, const char **what,
468 		struct socket *sock, struct socket **newsock)
469 {
470 	struct sock *sk = sock->sk;
471 	int err = 0;
472 
473 	*what = "listen";
474 	err = sock->ops->listen(sock, 5);
475 	if (err < 0)
476 		goto out;
477 
478 	*what = "sock_create_lite";
479 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
480 			       newsock);
481 	if (err < 0)
482 		goto out;
483 
484 	*what = "accept";
485 	err = sock->ops->accept(sock, *newsock, 0);
486 	if (err < 0) {
487 		sock_release(*newsock);
488 		*newsock = NULL;
489 		goto out;
490 	}
491 	(*newsock)->ops  = sock->ops;
492 
493 out:
494 	return err;
495 }
496 
497 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
498 		    void *buf, size_t size, int flags)
499 {
500 	mm_segment_t oldfs;
501 	struct kvec iov = {
502 		.iov_base = buf,
503 		.iov_len = size,
504 	};
505 	struct msghdr msg = {
506 		.msg_iovlen = 1,
507 		.msg_iov = (struct iovec *)&iov,
508 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
509 	};
510 	int rv;
511 
512 	oldfs = get_fs();
513 	set_fs(KERNEL_DS);
514 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
515 	set_fs(oldfs);
516 
517 	return rv;
518 }
519 
520 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
521 {
522 	mm_segment_t oldfs;
523 	struct kvec iov = {
524 		.iov_base = buf,
525 		.iov_len = size,
526 	};
527 	struct msghdr msg = {
528 		.msg_iovlen = 1,
529 		.msg_iov = (struct iovec *)&iov,
530 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
531 	};
532 	int rv;
533 
534 	oldfs = get_fs();
535 	set_fs(KERNEL_DS);
536 
537 	for (;;) {
538 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
539 		if (rv == size)
540 			break;
541 
542 		/* Note:
543 		 * ECONNRESET	other side closed the connection
544 		 * ERESTARTSYS	(on  sock) we got a signal
545 		 */
546 
547 		if (rv < 0) {
548 			if (rv == -ECONNRESET)
549 				dev_info(DEV, "sock was reset by peer\n");
550 			else if (rv != -ERESTARTSYS)
551 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
552 			break;
553 		} else if (rv == 0) {
554 			dev_info(DEV, "sock was shut down by peer\n");
555 			break;
556 		} else	{
557 			/* signal came in, or peer/link went down,
558 			 * after we read a partial message
559 			 */
560 			/* D_ASSERT(signal_pending(current)); */
561 			break;
562 		}
563 	};
564 
565 	set_fs(oldfs);
566 
567 	if (rv != size)
568 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
569 
570 	return rv;
571 }
572 
573 /* quoting tcp(7):
574  *   On individual connections, the socket buffer size must be set prior to the
575  *   listen(2) or connect(2) calls in order to have it take effect.
576  * This is our wrapper to do so.
577  */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579 		unsigned int rcv)
580 {
581 	/* open coded SO_SNDBUF, SO_RCVBUF */
582 	if (snd) {
583 		sock->sk->sk_sndbuf = snd;
584 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585 	}
586 	if (rcv) {
587 		sock->sk->sk_rcvbuf = rcv;
588 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589 	}
590 }
591 
592 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
593 {
594 	const char *what;
595 	struct socket *sock;
596 	struct sockaddr_in6 src_in6;
597 	int err;
598 	int disconnect_on_error = 1;
599 
600 	if (!get_net_conf(mdev))
601 		return NULL;
602 
603 	what = "sock_create_kern";
604 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
605 		SOCK_STREAM, IPPROTO_TCP, &sock);
606 	if (err < 0) {
607 		sock = NULL;
608 		goto out;
609 	}
610 
611 	sock->sk->sk_rcvtimeo =
612 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
613 	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
614 			mdev->net_conf->rcvbuf_size);
615 
616        /* explicitly bind to the configured IP as source IP
617 	*  for the outgoing connections.
618 	*  This is needed for multihomed hosts and to be
619 	*  able to use lo: interfaces for drbd.
620 	* Make sure to use 0 as port number, so linux selects
621 	*  a free one dynamically.
622 	*/
623 	memcpy(&src_in6, mdev->net_conf->my_addr,
624 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
625 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
626 		src_in6.sin6_port = 0;
627 	else
628 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
629 
630 	what = "bind before connect";
631 	err = sock->ops->bind(sock,
632 			      (struct sockaddr *) &src_in6,
633 			      mdev->net_conf->my_addr_len);
634 	if (err < 0)
635 		goto out;
636 
637 	/* connect may fail, peer not yet available.
638 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
639 	disconnect_on_error = 0;
640 	what = "connect";
641 	err = sock->ops->connect(sock,
642 				 (struct sockaddr *)mdev->net_conf->peer_addr,
643 				 mdev->net_conf->peer_addr_len, 0);
644 
645 out:
646 	if (err < 0) {
647 		if (sock) {
648 			sock_release(sock);
649 			sock = NULL;
650 		}
651 		switch (-err) {
652 			/* timeout, busy, signal pending */
653 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
654 		case EINTR: case ERESTARTSYS:
655 			/* peer not (yet) available, network problem */
656 		case ECONNREFUSED: case ENETUNREACH:
657 		case EHOSTDOWN:    case EHOSTUNREACH:
658 			disconnect_on_error = 0;
659 			break;
660 		default:
661 			dev_err(DEV, "%s failed, err = %d\n", what, err);
662 		}
663 		if (disconnect_on_error)
664 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
665 	}
666 	put_net_conf(mdev);
667 	return sock;
668 }
669 
670 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
671 {
672 	int timeo, err;
673 	struct socket *s_estab = NULL, *s_listen;
674 	const char *what;
675 
676 	if (!get_net_conf(mdev))
677 		return NULL;
678 
679 	what = "sock_create_kern";
680 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
681 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
682 	if (err) {
683 		s_listen = NULL;
684 		goto out;
685 	}
686 
687 	timeo = mdev->net_conf->try_connect_int * HZ;
688 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
689 
690 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
691 	s_listen->sk->sk_rcvtimeo = timeo;
692 	s_listen->sk->sk_sndtimeo = timeo;
693 	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
694 			mdev->net_conf->rcvbuf_size);
695 
696 	what = "bind before listen";
697 	err = s_listen->ops->bind(s_listen,
698 			      (struct sockaddr *) mdev->net_conf->my_addr,
699 			      mdev->net_conf->my_addr_len);
700 	if (err < 0)
701 		goto out;
702 
703 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
704 
705 out:
706 	if (s_listen)
707 		sock_release(s_listen);
708 	if (err < 0) {
709 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
710 			dev_err(DEV, "%s failed, err = %d\n", what, err);
711 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
712 		}
713 	}
714 	put_net_conf(mdev);
715 
716 	return s_estab;
717 }
718 
719 static int drbd_send_fp(struct drbd_conf *mdev,
720 	struct socket *sock, enum drbd_packets cmd)
721 {
722 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
723 
724 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
725 }
726 
727 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
728 {
729 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
730 	int rr;
731 
732 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
733 
734 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
735 		return be16_to_cpu(h->command);
736 
737 	return 0xffff;
738 }
739 
740 /**
741  * drbd_socket_okay() - Free the socket if its connection is not okay
742  * @mdev:	DRBD device.
743  * @sock:	pointer to the pointer to the socket.
744  */
745 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
746 {
747 	int rr;
748 	char tb[4];
749 
750 	if (!*sock)
751 		return FALSE;
752 
753 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
754 
755 	if (rr > 0 || rr == -EAGAIN) {
756 		return TRUE;
757 	} else {
758 		sock_release(*sock);
759 		*sock = NULL;
760 		return FALSE;
761 	}
762 }
763 
764 /*
765  * return values:
766  *   1 yes, we have a valid connection
767  *   0 oops, did not work out, please try again
768  *  -1 peer talks different language,
769  *     no point in trying again, please go standalone.
770  *  -2 We do not have a network config...
771  */
772 static int drbd_connect(struct drbd_conf *mdev)
773 {
774 	struct socket *s, *sock, *msock;
775 	int try, h, ok;
776 
777 	D_ASSERT(!mdev->data.socket);
778 
779 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
780 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
781 
782 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
783 		return -2;
784 
785 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
786 
787 	sock  = NULL;
788 	msock = NULL;
789 
790 	do {
791 		for (try = 0;;) {
792 			/* 3 tries, this should take less than a second! */
793 			s = drbd_try_connect(mdev);
794 			if (s || ++try >= 3)
795 				break;
796 			/* give the other side time to call bind() & listen() */
797 			__set_current_state(TASK_INTERRUPTIBLE);
798 			schedule_timeout(HZ / 10);
799 		}
800 
801 		if (s) {
802 			if (!sock) {
803 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
804 				sock = s;
805 				s = NULL;
806 			} else if (!msock) {
807 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
808 				msock = s;
809 				s = NULL;
810 			} else {
811 				dev_err(DEV, "Logic error in drbd_connect()\n");
812 				goto out_release_sockets;
813 			}
814 		}
815 
816 		if (sock && msock) {
817 			__set_current_state(TASK_INTERRUPTIBLE);
818 			schedule_timeout(HZ / 10);
819 			ok = drbd_socket_okay(mdev, &sock);
820 			ok = drbd_socket_okay(mdev, &msock) && ok;
821 			if (ok)
822 				break;
823 		}
824 
825 retry:
826 		s = drbd_wait_for_connect(mdev);
827 		if (s) {
828 			try = drbd_recv_fp(mdev, s);
829 			drbd_socket_okay(mdev, &sock);
830 			drbd_socket_okay(mdev, &msock);
831 			switch (try) {
832 			case P_HAND_SHAKE_S:
833 				if (sock) {
834 					dev_warn(DEV, "initial packet S crossed\n");
835 					sock_release(sock);
836 				}
837 				sock = s;
838 				break;
839 			case P_HAND_SHAKE_M:
840 				if (msock) {
841 					dev_warn(DEV, "initial packet M crossed\n");
842 					sock_release(msock);
843 				}
844 				msock = s;
845 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
846 				break;
847 			default:
848 				dev_warn(DEV, "Error receiving initial packet\n");
849 				sock_release(s);
850 				if (random32() & 1)
851 					goto retry;
852 			}
853 		}
854 
855 		if (mdev->state.conn <= C_DISCONNECTING)
856 			goto out_release_sockets;
857 		if (signal_pending(current)) {
858 			flush_signals(current);
859 			smp_rmb();
860 			if (get_t_state(&mdev->receiver) == Exiting)
861 				goto out_release_sockets;
862 		}
863 
864 		if (sock && msock) {
865 			ok = drbd_socket_okay(mdev, &sock);
866 			ok = drbd_socket_okay(mdev, &msock) && ok;
867 			if (ok)
868 				break;
869 		}
870 	} while (1);
871 
872 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
873 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
874 
875 	sock->sk->sk_allocation = GFP_NOIO;
876 	msock->sk->sk_allocation = GFP_NOIO;
877 
878 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
879 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
880 
881 	/* NOT YET ...
882 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
883 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
884 	 * first set it to the P_HAND_SHAKE timeout,
885 	 * which we set to 4x the configured ping_timeout. */
886 	sock->sk->sk_sndtimeo =
887 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
888 
889 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
890 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
891 
892 	/* we don't want delays.
893 	 * we use TCP_CORK where apropriate, though */
894 	drbd_tcp_nodelay(sock);
895 	drbd_tcp_nodelay(msock);
896 
897 	mdev->data.socket = sock;
898 	mdev->meta.socket = msock;
899 	mdev->last_received = jiffies;
900 
901 	D_ASSERT(mdev->asender.task == NULL);
902 
903 	h = drbd_do_handshake(mdev);
904 	if (h <= 0)
905 		return h;
906 
907 	if (mdev->cram_hmac_tfm) {
908 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
909 		switch (drbd_do_auth(mdev)) {
910 		case -1:
911 			dev_err(DEV, "Authentication of peer failed\n");
912 			return -1;
913 		case 0:
914 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
915 			return 0;
916 		}
917 	}
918 
919 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
920 		return 0;
921 
922 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
923 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
924 
925 	atomic_set(&mdev->packet_seq, 0);
926 	mdev->peer_seq = 0;
927 
928 	drbd_thread_start(&mdev->asender);
929 
930 	if (!drbd_send_protocol(mdev))
931 		return -1;
932 	drbd_send_sync_param(mdev, &mdev->sync_conf);
933 	drbd_send_sizes(mdev, 0, 0);
934 	drbd_send_uuids(mdev);
935 	drbd_send_state(mdev);
936 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
937 	clear_bit(RESIZE_PENDING, &mdev->flags);
938 
939 	return 1;
940 
941 out_release_sockets:
942 	if (sock)
943 		sock_release(sock);
944 	if (msock)
945 		sock_release(msock);
946 	return -1;
947 }
948 
949 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
950 {
951 	int r;
952 
953 	r = drbd_recv(mdev, h, sizeof(*h));
954 
955 	if (unlikely(r != sizeof(*h))) {
956 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
957 		return FALSE;
958 	};
959 	h->command = be16_to_cpu(h->command);
960 	h->length  = be16_to_cpu(h->length);
961 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
962 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
963 		    (long)be32_to_cpu(h->magic),
964 		    h->command, h->length);
965 		return FALSE;
966 	}
967 	mdev->last_received = jiffies;
968 
969 	return TRUE;
970 }
971 
972 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
973 {
974 	int rv;
975 
976 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
977 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
978 					NULL, BLKDEV_IFL_WAIT);
979 		if (rv) {
980 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
981 			/* would rather check on EOPNOTSUPP, but that is not reliable.
982 			 * don't try again for ANY return value != 0
983 			 * if (rv == -EOPNOTSUPP) */
984 			drbd_bump_write_ordering(mdev, WO_drain_io);
985 		}
986 		put_ldev(mdev);
987 	}
988 
989 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
990 }
991 
992 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
993 {
994 	struct flush_work *fw = (struct flush_work *)w;
995 	struct drbd_epoch *epoch = fw->epoch;
996 
997 	kfree(w);
998 
999 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
1000 		drbd_flush_after_epoch(mdev, epoch);
1001 
1002 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
1003 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
1004 
1005 	return 1;
1006 }
1007 
1008 /**
1009  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1010  * @mdev:	DRBD device.
1011  * @epoch:	Epoch object.
1012  * @ev:		Epoch event.
1013  */
1014 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1015 					       struct drbd_epoch *epoch,
1016 					       enum epoch_event ev)
1017 {
1018 	int finish, epoch_size;
1019 	struct drbd_epoch *next_epoch;
1020 	int schedule_flush = 0;
1021 	enum finish_epoch rv = FE_STILL_LIVE;
1022 
1023 	spin_lock(&mdev->epoch_lock);
1024 	do {
1025 		next_epoch = NULL;
1026 		finish = 0;
1027 
1028 		epoch_size = atomic_read(&epoch->epoch_size);
1029 
1030 		switch (ev & ~EV_CLEANUP) {
1031 		case EV_PUT:
1032 			atomic_dec(&epoch->active);
1033 			break;
1034 		case EV_GOT_BARRIER_NR:
1035 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1036 
1037 			/* Special case: If we just switched from WO_bio_barrier to
1038 			   WO_bdev_flush we should not finish the current epoch */
1039 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1040 			    mdev->write_ordering != WO_bio_barrier &&
1041 			    epoch == mdev->current_epoch)
1042 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1043 			break;
1044 		case EV_BARRIER_DONE:
1045 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1046 			break;
1047 		case EV_BECAME_LAST:
1048 			/* nothing to do*/
1049 			break;
1050 		}
1051 
1052 		if (epoch_size != 0 &&
1053 		    atomic_read(&epoch->active) == 0 &&
1054 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1055 		    epoch->list.prev == &mdev->current_epoch->list &&
1056 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1057 			/* Nearly all conditions are met to finish that epoch... */
1058 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1059 			    mdev->write_ordering == WO_none ||
1060 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1061 			    ev & EV_CLEANUP) {
1062 				finish = 1;
1063 				set_bit(DE_IS_FINISHING, &epoch->flags);
1064 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1065 				 mdev->write_ordering == WO_bio_barrier) {
1066 				atomic_inc(&epoch->active);
1067 				schedule_flush = 1;
1068 			}
1069 		}
1070 		if (finish) {
1071 			if (!(ev & EV_CLEANUP)) {
1072 				spin_unlock(&mdev->epoch_lock);
1073 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1074 				spin_lock(&mdev->epoch_lock);
1075 			}
1076 			dec_unacked(mdev);
1077 
1078 			if (mdev->current_epoch != epoch) {
1079 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1080 				list_del(&epoch->list);
1081 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1082 				mdev->epochs--;
1083 				kfree(epoch);
1084 
1085 				if (rv == FE_STILL_LIVE)
1086 					rv = FE_DESTROYED;
1087 			} else {
1088 				epoch->flags = 0;
1089 				atomic_set(&epoch->epoch_size, 0);
1090 				/* atomic_set(&epoch->active, 0); is already zero */
1091 				if (rv == FE_STILL_LIVE)
1092 					rv = FE_RECYCLED;
1093 			}
1094 		}
1095 
1096 		if (!next_epoch)
1097 			break;
1098 
1099 		epoch = next_epoch;
1100 	} while (1);
1101 
1102 	spin_unlock(&mdev->epoch_lock);
1103 
1104 	if (schedule_flush) {
1105 		struct flush_work *fw;
1106 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1107 		if (fw) {
1108 			fw->w.cb = w_flush;
1109 			fw->epoch = epoch;
1110 			drbd_queue_work(&mdev->data.work, &fw->w);
1111 		} else {
1112 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1113 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1114 			/* That is not a recursion, only one level */
1115 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1116 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1117 		}
1118 	}
1119 
1120 	return rv;
1121 }
1122 
1123 /**
1124  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1125  * @mdev:	DRBD device.
1126  * @wo:		Write ordering method to try.
1127  */
1128 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1129 {
1130 	enum write_ordering_e pwo;
1131 	static char *write_ordering_str[] = {
1132 		[WO_none] = "none",
1133 		[WO_drain_io] = "drain",
1134 		[WO_bdev_flush] = "flush",
1135 		[WO_bio_barrier] = "barrier",
1136 	};
1137 
1138 	pwo = mdev->write_ordering;
1139 	wo = min(pwo, wo);
1140 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1141 		wo = WO_bdev_flush;
1142 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1143 		wo = WO_drain_io;
1144 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1145 		wo = WO_none;
1146 	mdev->write_ordering = wo;
1147 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1148 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1149 }
1150 
1151 /**
1152  * drbd_submit_ee()
1153  * @mdev:	DRBD device.
1154  * @e:		epoch entry
1155  * @rw:		flag field, see bio->bi_rw
1156  */
1157 /* TODO allocate from our own bio_set. */
1158 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1159 		const unsigned rw, const int fault_type)
1160 {
1161 	struct bio *bios = NULL;
1162 	struct bio *bio;
1163 	struct page *page = e->pages;
1164 	sector_t sector = e->sector;
1165 	unsigned ds = e->size;
1166 	unsigned n_bios = 0;
1167 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1168 
1169 	/* In most cases, we will only need one bio.  But in case the lower
1170 	 * level restrictions happen to be different at this offset on this
1171 	 * side than those of the sending peer, we may need to submit the
1172 	 * request in more than one bio. */
1173 next_bio:
1174 	bio = bio_alloc(GFP_NOIO, nr_pages);
1175 	if (!bio) {
1176 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1177 		goto fail;
1178 	}
1179 	/* > e->sector, unless this is the first bio */
1180 	bio->bi_sector = sector;
1181 	bio->bi_bdev = mdev->ldev->backing_bdev;
1182 	/* we special case some flags in the multi-bio case, see below
1183 	 * (REQ_UNPLUG, REQ_HARDBARRIER) */
1184 	bio->bi_rw = rw;
1185 	bio->bi_private = e;
1186 	bio->bi_end_io = drbd_endio_sec;
1187 
1188 	bio->bi_next = bios;
1189 	bios = bio;
1190 	++n_bios;
1191 
1192 	page_chain_for_each(page) {
1193 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1194 		if (!bio_add_page(bio, page, len, 0)) {
1195 			/* a single page must always be possible! */
1196 			BUG_ON(bio->bi_vcnt == 0);
1197 			goto next_bio;
1198 		}
1199 		ds -= len;
1200 		sector += len >> 9;
1201 		--nr_pages;
1202 	}
1203 	D_ASSERT(page == NULL);
1204 	D_ASSERT(ds == 0);
1205 
1206 	atomic_set(&e->pending_bios, n_bios);
1207 	do {
1208 		bio = bios;
1209 		bios = bios->bi_next;
1210 		bio->bi_next = NULL;
1211 
1212 		/* strip off REQ_UNPLUG unless it is the last bio */
1213 		if (bios)
1214 			bio->bi_rw &= ~REQ_UNPLUG;
1215 
1216 		drbd_generic_make_request(mdev, fault_type, bio);
1217 
1218 		/* strip off REQ_HARDBARRIER,
1219 		 * unless it is the first or last bio */
1220 		if (bios && bios->bi_next)
1221 			bios->bi_rw &= ~REQ_HARDBARRIER;
1222 	} while (bios);
1223 	maybe_kick_lo(mdev);
1224 	return 0;
1225 
1226 fail:
1227 	while (bios) {
1228 		bio = bios;
1229 		bios = bios->bi_next;
1230 		bio_put(bio);
1231 	}
1232 	return -ENOMEM;
1233 }
1234 
1235 /**
1236  * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
1237  * @mdev:	DRBD device.
1238  * @w:		work object.
1239  * @cancel:	The connection will be closed anyways (unused in this callback)
1240  */
1241 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1242 {
1243 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1244 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1245 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1246 	   so that we can finish that epoch in drbd_may_finish_epoch().
1247 	   That is necessary if we already have a long chain of Epochs, before
1248 	   we realize that REQ_HARDBARRIER is actually not supported */
1249 
1250 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1251 	   that will never trigger. If it is reported late, we will just
1252 	   print that warning and continue correctly for all future requests
1253 	   with WO_bdev_flush */
1254 	if (previous_epoch(mdev, e->epoch))
1255 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1256 
1257 	/* we still have a local reference,
1258 	 * get_ldev was done in receive_Data. */
1259 
1260 	e->w.cb = e_end_block;
1261 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1262 		/* drbd_submit_ee fails for one reason only:
1263 		 * if was not able to allocate sufficient bios.
1264 		 * requeue, try again later. */
1265 		e->w.cb = w_e_reissue;
1266 		drbd_queue_work(&mdev->data.work, &e->w);
1267 	}
1268 	return 1;
1269 }
1270 
1271 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1272 {
1273 	int rv, issue_flush;
1274 	struct p_barrier *p = (struct p_barrier *)h;
1275 	struct drbd_epoch *epoch;
1276 
1277 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1278 
1279 	rv = drbd_recv(mdev, h->payload, h->length);
1280 	ERR_IF(rv != h->length) return FALSE;
1281 
1282 	inc_unacked(mdev);
1283 
1284 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1285 		drbd_kick_lo(mdev);
1286 
1287 	mdev->current_epoch->barrier_nr = p->barrier;
1288 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1289 
1290 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1291 	 * the activity log, which means it would not be resynced in case the
1292 	 * R_PRIMARY crashes now.
1293 	 * Therefore we must send the barrier_ack after the barrier request was
1294 	 * completed. */
1295 	switch (mdev->write_ordering) {
1296 	case WO_bio_barrier:
1297 	case WO_none:
1298 		if (rv == FE_RECYCLED)
1299 			return TRUE;
1300 		break;
1301 
1302 	case WO_bdev_flush:
1303 	case WO_drain_io:
1304 		if (rv == FE_STILL_LIVE) {
1305 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1306 			drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1307 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1308 		}
1309 		if (rv == FE_RECYCLED)
1310 			return TRUE;
1311 
1312 		/* The asender will send all the ACKs and barrier ACKs out, since
1313 		   all EEs moved from the active_ee to the done_ee. We need to
1314 		   provide a new epoch object for the EEs that come in soon */
1315 		break;
1316 	}
1317 
1318 	/* receiver context, in the writeout path of the other node.
1319 	 * avoid potential distributed deadlock */
1320 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1321 	if (!epoch) {
1322 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1323 		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1324 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1325 		if (issue_flush) {
1326 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1327 			if (rv == FE_RECYCLED)
1328 				return TRUE;
1329 		}
1330 
1331 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1332 
1333 		return TRUE;
1334 	}
1335 
1336 	epoch->flags = 0;
1337 	atomic_set(&epoch->epoch_size, 0);
1338 	atomic_set(&epoch->active, 0);
1339 
1340 	spin_lock(&mdev->epoch_lock);
1341 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1342 		list_add(&epoch->list, &mdev->current_epoch->list);
1343 		mdev->current_epoch = epoch;
1344 		mdev->epochs++;
1345 	} else {
1346 		/* The current_epoch got recycled while we allocated this one... */
1347 		kfree(epoch);
1348 	}
1349 	spin_unlock(&mdev->epoch_lock);
1350 
1351 	return TRUE;
1352 }
1353 
1354 /* used from receive_RSDataReply (recv_resync_read)
1355  * and from receive_Data */
1356 static struct drbd_epoch_entry *
1357 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1358 {
1359 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1360 	struct drbd_epoch_entry *e;
1361 	struct page *page;
1362 	int dgs, ds, rr;
1363 	void *dig_in = mdev->int_dig_in;
1364 	void *dig_vv = mdev->int_dig_vv;
1365 	unsigned long *data;
1366 
1367 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1368 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1369 
1370 	if (dgs) {
1371 		rr = drbd_recv(mdev, dig_in, dgs);
1372 		if (rr != dgs) {
1373 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1374 			     rr, dgs);
1375 			return NULL;
1376 		}
1377 	}
1378 
1379 	data_size -= dgs;
1380 
1381 	ERR_IF(data_size &  0x1ff) return NULL;
1382 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1383 
1384 	/* even though we trust out peer,
1385 	 * we sometimes have to double check. */
1386 	if (sector + (data_size>>9) > capacity) {
1387 		dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1388 			(unsigned long long)capacity,
1389 			(unsigned long long)sector, data_size);
1390 		return NULL;
1391 	}
1392 
1393 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1394 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1395 	 * which in turn might block on the other node at this very place.  */
1396 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1397 	if (!e)
1398 		return NULL;
1399 
1400 	ds = data_size;
1401 	page = e->pages;
1402 	page_chain_for_each(page) {
1403 		unsigned len = min_t(int, ds, PAGE_SIZE);
1404 		data = kmap(page);
1405 		rr = drbd_recv(mdev, data, len);
1406 		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1407 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1408 			data[0] = data[0] ^ (unsigned long)-1;
1409 		}
1410 		kunmap(page);
1411 		if (rr != len) {
1412 			drbd_free_ee(mdev, e);
1413 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1414 			     rr, len);
1415 			return NULL;
1416 		}
1417 		ds -= rr;
1418 	}
1419 
1420 	if (dgs) {
1421 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1422 		if (memcmp(dig_in, dig_vv, dgs)) {
1423 			dev_err(DEV, "Digest integrity check FAILED.\n");
1424 			drbd_bcast_ee(mdev, "digest failed",
1425 					dgs, dig_in, dig_vv, e);
1426 			drbd_free_ee(mdev, e);
1427 			return NULL;
1428 		}
1429 	}
1430 	mdev->recv_cnt += data_size>>9;
1431 	return e;
1432 }
1433 
1434 /* drbd_drain_block() just takes a data block
1435  * out of the socket input buffer, and discards it.
1436  */
1437 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1438 {
1439 	struct page *page;
1440 	int rr, rv = 1;
1441 	void *data;
1442 
1443 	if (!data_size)
1444 		return TRUE;
1445 
1446 	page = drbd_pp_alloc(mdev, 1, 1);
1447 
1448 	data = kmap(page);
1449 	while (data_size) {
1450 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1451 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1452 			rv = 0;
1453 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1454 			     rr, min_t(int, data_size, PAGE_SIZE));
1455 			break;
1456 		}
1457 		data_size -= rr;
1458 	}
1459 	kunmap(page);
1460 	drbd_pp_free(mdev, page);
1461 	return rv;
1462 }
1463 
1464 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1465 			   sector_t sector, int data_size)
1466 {
1467 	struct bio_vec *bvec;
1468 	struct bio *bio;
1469 	int dgs, rr, i, expect;
1470 	void *dig_in = mdev->int_dig_in;
1471 	void *dig_vv = mdev->int_dig_vv;
1472 
1473 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1474 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1475 
1476 	if (dgs) {
1477 		rr = drbd_recv(mdev, dig_in, dgs);
1478 		if (rr != dgs) {
1479 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1480 			     rr, dgs);
1481 			return 0;
1482 		}
1483 	}
1484 
1485 	data_size -= dgs;
1486 
1487 	/* optimistically update recv_cnt.  if receiving fails below,
1488 	 * we disconnect anyways, and counters will be reset. */
1489 	mdev->recv_cnt += data_size>>9;
1490 
1491 	bio = req->master_bio;
1492 	D_ASSERT(sector == bio->bi_sector);
1493 
1494 	bio_for_each_segment(bvec, bio, i) {
1495 		expect = min_t(int, data_size, bvec->bv_len);
1496 		rr = drbd_recv(mdev,
1497 			     kmap(bvec->bv_page)+bvec->bv_offset,
1498 			     expect);
1499 		kunmap(bvec->bv_page);
1500 		if (rr != expect) {
1501 			dev_warn(DEV, "short read receiving data reply: "
1502 			     "read %d expected %d\n",
1503 			     rr, expect);
1504 			return 0;
1505 		}
1506 		data_size -= rr;
1507 	}
1508 
1509 	if (dgs) {
1510 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1511 		if (memcmp(dig_in, dig_vv, dgs)) {
1512 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1513 			return 0;
1514 		}
1515 	}
1516 
1517 	D_ASSERT(data_size == 0);
1518 	return 1;
1519 }
1520 
1521 /* e_end_resync_block() is called via
1522  * drbd_process_done_ee() by asender only */
1523 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1524 {
1525 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1526 	sector_t sector = e->sector;
1527 	int ok;
1528 
1529 	D_ASSERT(hlist_unhashed(&e->colision));
1530 
1531 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1532 		drbd_set_in_sync(mdev, sector, e->size);
1533 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1534 	} else {
1535 		/* Record failure to sync */
1536 		drbd_rs_failed_io(mdev, sector, e->size);
1537 
1538 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1539 	}
1540 	dec_unacked(mdev);
1541 
1542 	return ok;
1543 }
1544 
1545 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1546 {
1547 	struct drbd_epoch_entry *e;
1548 
1549 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1550 	if (!e)
1551 		goto fail;
1552 
1553 	dec_rs_pending(mdev);
1554 
1555 	inc_unacked(mdev);
1556 	/* corresponding dec_unacked() in e_end_resync_block()
1557 	 * respective _drbd_clear_done_ee */
1558 
1559 	e->w.cb = e_end_resync_block;
1560 
1561 	spin_lock_irq(&mdev->req_lock);
1562 	list_add(&e->w.list, &mdev->sync_ee);
1563 	spin_unlock_irq(&mdev->req_lock);
1564 
1565 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1566 		return TRUE;
1567 
1568 	drbd_free_ee(mdev, e);
1569 fail:
1570 	put_ldev(mdev);
1571 	return FALSE;
1572 }
1573 
1574 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1575 {
1576 	struct drbd_request *req;
1577 	sector_t sector;
1578 	unsigned int header_size, data_size;
1579 	int ok;
1580 	struct p_data *p = (struct p_data *)h;
1581 
1582 	header_size = sizeof(*p) - sizeof(*h);
1583 	data_size   = h->length  - header_size;
1584 
1585 	ERR_IF(data_size == 0) return FALSE;
1586 
1587 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1588 		return FALSE;
1589 
1590 	sector = be64_to_cpu(p->sector);
1591 
1592 	spin_lock_irq(&mdev->req_lock);
1593 	req = _ar_id_to_req(mdev, p->block_id, sector);
1594 	spin_unlock_irq(&mdev->req_lock);
1595 	if (unlikely(!req)) {
1596 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1597 		return FALSE;
1598 	}
1599 
1600 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1601 	 * special casing it there for the various failure cases.
1602 	 * still no race with drbd_fail_pending_reads */
1603 	ok = recv_dless_read(mdev, req, sector, data_size);
1604 
1605 	if (ok)
1606 		req_mod(req, data_received);
1607 	/* else: nothing. handled from drbd_disconnect...
1608 	 * I don't think we may complete this just yet
1609 	 * in case we are "on-disconnect: freeze" */
1610 
1611 	return ok;
1612 }
1613 
1614 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1615 {
1616 	sector_t sector;
1617 	unsigned int header_size, data_size;
1618 	int ok;
1619 	struct p_data *p = (struct p_data *)h;
1620 
1621 	header_size = sizeof(*p) - sizeof(*h);
1622 	data_size   = h->length  - header_size;
1623 
1624 	ERR_IF(data_size == 0) return FALSE;
1625 
1626 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1627 		return FALSE;
1628 
1629 	sector = be64_to_cpu(p->sector);
1630 	D_ASSERT(p->block_id == ID_SYNCER);
1631 
1632 	if (get_ldev(mdev)) {
1633 		/* data is submitted to disk within recv_resync_read.
1634 		 * corresponding put_ldev done below on error,
1635 		 * or in drbd_endio_write_sec. */
1636 		ok = recv_resync_read(mdev, sector, data_size);
1637 	} else {
1638 		if (__ratelimit(&drbd_ratelimit_state))
1639 			dev_err(DEV, "Can not write resync data to local disk.\n");
1640 
1641 		ok = drbd_drain_block(mdev, data_size);
1642 
1643 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1644 	}
1645 
1646 	return ok;
1647 }
1648 
1649 /* e_end_block() is called via drbd_process_done_ee().
1650  * this means this function only runs in the asender thread
1651  */
1652 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1653 {
1654 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1655 	sector_t sector = e->sector;
1656 	struct drbd_epoch *epoch;
1657 	int ok = 1, pcmd;
1658 
1659 	if (e->flags & EE_IS_BARRIER) {
1660 		epoch = previous_epoch(mdev, e->epoch);
1661 		if (epoch)
1662 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1663 	}
1664 
1665 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1666 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1667 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1668 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1669 				e->flags & EE_MAY_SET_IN_SYNC) ?
1670 				P_RS_WRITE_ACK : P_WRITE_ACK;
1671 			ok &= drbd_send_ack(mdev, pcmd, e);
1672 			if (pcmd == P_RS_WRITE_ACK)
1673 				drbd_set_in_sync(mdev, sector, e->size);
1674 		} else {
1675 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1676 			/* we expect it to be marked out of sync anyways...
1677 			 * maybe assert this?  */
1678 		}
1679 		dec_unacked(mdev);
1680 	}
1681 	/* we delete from the conflict detection hash _after_ we sent out the
1682 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1683 	if (mdev->net_conf->two_primaries) {
1684 		spin_lock_irq(&mdev->req_lock);
1685 		D_ASSERT(!hlist_unhashed(&e->colision));
1686 		hlist_del_init(&e->colision);
1687 		spin_unlock_irq(&mdev->req_lock);
1688 	} else {
1689 		D_ASSERT(hlist_unhashed(&e->colision));
1690 	}
1691 
1692 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1693 
1694 	return ok;
1695 }
1696 
1697 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1698 {
1699 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1700 	int ok = 1;
1701 
1702 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1703 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1704 
1705 	spin_lock_irq(&mdev->req_lock);
1706 	D_ASSERT(!hlist_unhashed(&e->colision));
1707 	hlist_del_init(&e->colision);
1708 	spin_unlock_irq(&mdev->req_lock);
1709 
1710 	dec_unacked(mdev);
1711 
1712 	return ok;
1713 }
1714 
1715 /* Called from receive_Data.
1716  * Synchronize packets on sock with packets on msock.
1717  *
1718  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1719  * packet traveling on msock, they are still processed in the order they have
1720  * been sent.
1721  *
1722  * Note: we don't care for Ack packets overtaking P_DATA packets.
1723  *
1724  * In case packet_seq is larger than mdev->peer_seq number, there are
1725  * outstanding packets on the msock. We wait for them to arrive.
1726  * In case we are the logically next packet, we update mdev->peer_seq
1727  * ourselves. Correctly handles 32bit wrap around.
1728  *
1729  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1730  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1731  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1732  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1733  *
1734  * returns 0 if we may process the packet,
1735  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1736 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1737 {
1738 	DEFINE_WAIT(wait);
1739 	unsigned int p_seq;
1740 	long timeout;
1741 	int ret = 0;
1742 	spin_lock(&mdev->peer_seq_lock);
1743 	for (;;) {
1744 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1745 		if (seq_le(packet_seq, mdev->peer_seq+1))
1746 			break;
1747 		if (signal_pending(current)) {
1748 			ret = -ERESTARTSYS;
1749 			break;
1750 		}
1751 		p_seq = mdev->peer_seq;
1752 		spin_unlock(&mdev->peer_seq_lock);
1753 		timeout = schedule_timeout(30*HZ);
1754 		spin_lock(&mdev->peer_seq_lock);
1755 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1756 			ret = -ETIMEDOUT;
1757 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1758 			break;
1759 		}
1760 	}
1761 	finish_wait(&mdev->seq_wait, &wait);
1762 	if (mdev->peer_seq+1 == packet_seq)
1763 		mdev->peer_seq++;
1764 	spin_unlock(&mdev->peer_seq_lock);
1765 	return ret;
1766 }
1767 
1768 /* mirrored write */
1769 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1770 {
1771 	sector_t sector;
1772 	struct drbd_epoch_entry *e;
1773 	struct p_data *p = (struct p_data *)h;
1774 	int header_size, data_size;
1775 	int rw = WRITE;
1776 	u32 dp_flags;
1777 
1778 	header_size = sizeof(*p) - sizeof(*h);
1779 	data_size   = h->length  - header_size;
1780 
1781 	ERR_IF(data_size == 0) return FALSE;
1782 
1783 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1784 		return FALSE;
1785 
1786 	if (!get_ldev(mdev)) {
1787 		if (__ratelimit(&drbd_ratelimit_state))
1788 			dev_err(DEV, "Can not write mirrored data block "
1789 			    "to local disk.\n");
1790 		spin_lock(&mdev->peer_seq_lock);
1791 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1792 			mdev->peer_seq++;
1793 		spin_unlock(&mdev->peer_seq_lock);
1794 
1795 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1796 		atomic_inc(&mdev->current_epoch->epoch_size);
1797 		return drbd_drain_block(mdev, data_size);
1798 	}
1799 
1800 	/* get_ldev(mdev) successful.
1801 	 * Corresponding put_ldev done either below (on various errors),
1802 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1803 	 * the end of this function. */
1804 
1805 	sector = be64_to_cpu(p->sector);
1806 	e = read_in_block(mdev, p->block_id, sector, data_size);
1807 	if (!e) {
1808 		put_ldev(mdev);
1809 		return FALSE;
1810 	}
1811 
1812 	e->w.cb = e_end_block;
1813 
1814 	spin_lock(&mdev->epoch_lock);
1815 	e->epoch = mdev->current_epoch;
1816 	atomic_inc(&e->epoch->epoch_size);
1817 	atomic_inc(&e->epoch->active);
1818 
1819 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1820 		struct drbd_epoch *epoch;
1821 		/* Issue a barrier if we start a new epoch, and the previous epoch
1822 		   was not a epoch containing a single request which already was
1823 		   a Barrier. */
1824 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1825 		if (epoch == e->epoch) {
1826 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1827 			rw |= REQ_HARDBARRIER;
1828 			e->flags |= EE_IS_BARRIER;
1829 		} else {
1830 			if (atomic_read(&epoch->epoch_size) > 1 ||
1831 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1832 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1833 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1834 				rw |= REQ_HARDBARRIER;
1835 				e->flags |= EE_IS_BARRIER;
1836 			}
1837 		}
1838 	}
1839 	spin_unlock(&mdev->epoch_lock);
1840 
1841 	dp_flags = be32_to_cpu(p->dp_flags);
1842 	if (dp_flags & DP_HARDBARRIER) {
1843 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1844 		/* rw |= REQ_HARDBARRIER; */
1845 	}
1846 	if (dp_flags & DP_RW_SYNC)
1847 		rw |= REQ_SYNC | REQ_UNPLUG;
1848 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1849 		e->flags |= EE_MAY_SET_IN_SYNC;
1850 
1851 	/* I'm the receiver, I do hold a net_cnt reference. */
1852 	if (!mdev->net_conf->two_primaries) {
1853 		spin_lock_irq(&mdev->req_lock);
1854 	} else {
1855 		/* don't get the req_lock yet,
1856 		 * we may sleep in drbd_wait_peer_seq */
1857 		const int size = e->size;
1858 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1859 		DEFINE_WAIT(wait);
1860 		struct drbd_request *i;
1861 		struct hlist_node *n;
1862 		struct hlist_head *slot;
1863 		int first;
1864 
1865 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1866 		BUG_ON(mdev->ee_hash == NULL);
1867 		BUG_ON(mdev->tl_hash == NULL);
1868 
1869 		/* conflict detection and handling:
1870 		 * 1. wait on the sequence number,
1871 		 *    in case this data packet overtook ACK packets.
1872 		 * 2. check our hash tables for conflicting requests.
1873 		 *    we only need to walk the tl_hash, since an ee can not
1874 		 *    have a conflict with an other ee: on the submitting
1875 		 *    node, the corresponding req had already been conflicting,
1876 		 *    and a conflicting req is never sent.
1877 		 *
1878 		 * Note: for two_primaries, we are protocol C,
1879 		 * so there cannot be any request that is DONE
1880 		 * but still on the transfer log.
1881 		 *
1882 		 * unconditionally add to the ee_hash.
1883 		 *
1884 		 * if no conflicting request is found:
1885 		 *    submit.
1886 		 *
1887 		 * if any conflicting request is found
1888 		 * that has not yet been acked,
1889 		 * AND I have the "discard concurrent writes" flag:
1890 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1891 		 *
1892 		 * if any conflicting request is found:
1893 		 *	 block the receiver, waiting on misc_wait
1894 		 *	 until no more conflicting requests are there,
1895 		 *	 or we get interrupted (disconnect).
1896 		 *
1897 		 *	 we do not just write after local io completion of those
1898 		 *	 requests, but only after req is done completely, i.e.
1899 		 *	 we wait for the P_DISCARD_ACK to arrive!
1900 		 *
1901 		 *	 then proceed normally, i.e. submit.
1902 		 */
1903 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1904 			goto out_interrupted;
1905 
1906 		spin_lock_irq(&mdev->req_lock);
1907 
1908 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1909 
1910 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1911 		slot = tl_hash_slot(mdev, sector);
1912 		first = 1;
1913 		for (;;) {
1914 			int have_unacked = 0;
1915 			int have_conflict = 0;
1916 			prepare_to_wait(&mdev->misc_wait, &wait,
1917 				TASK_INTERRUPTIBLE);
1918 			hlist_for_each_entry(i, n, slot, colision) {
1919 				if (OVERLAPS) {
1920 					/* only ALERT on first iteration,
1921 					 * we may be woken up early... */
1922 					if (first)
1923 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1924 						      "	new: %llus +%u; pending: %llus +%u\n",
1925 						      current->comm, current->pid,
1926 						      (unsigned long long)sector, size,
1927 						      (unsigned long long)i->sector, i->size);
1928 					if (i->rq_state & RQ_NET_PENDING)
1929 						++have_unacked;
1930 					++have_conflict;
1931 				}
1932 			}
1933 #undef OVERLAPS
1934 			if (!have_conflict)
1935 				break;
1936 
1937 			/* Discard Ack only for the _first_ iteration */
1938 			if (first && discard && have_unacked) {
1939 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1940 				     (unsigned long long)sector);
1941 				inc_unacked(mdev);
1942 				e->w.cb = e_send_discard_ack;
1943 				list_add_tail(&e->w.list, &mdev->done_ee);
1944 
1945 				spin_unlock_irq(&mdev->req_lock);
1946 
1947 				/* we could probably send that P_DISCARD_ACK ourselves,
1948 				 * but I don't like the receiver using the msock */
1949 
1950 				put_ldev(mdev);
1951 				wake_asender(mdev);
1952 				finish_wait(&mdev->misc_wait, &wait);
1953 				return TRUE;
1954 			}
1955 
1956 			if (signal_pending(current)) {
1957 				hlist_del_init(&e->colision);
1958 
1959 				spin_unlock_irq(&mdev->req_lock);
1960 
1961 				finish_wait(&mdev->misc_wait, &wait);
1962 				goto out_interrupted;
1963 			}
1964 
1965 			spin_unlock_irq(&mdev->req_lock);
1966 			if (first) {
1967 				first = 0;
1968 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1969 				     "sec=%llus\n", (unsigned long long)sector);
1970 			} else if (discard) {
1971 				/* we had none on the first iteration.
1972 				 * there must be none now. */
1973 				D_ASSERT(have_unacked == 0);
1974 			}
1975 			schedule();
1976 			spin_lock_irq(&mdev->req_lock);
1977 		}
1978 		finish_wait(&mdev->misc_wait, &wait);
1979 	}
1980 
1981 	list_add(&e->w.list, &mdev->active_ee);
1982 	spin_unlock_irq(&mdev->req_lock);
1983 
1984 	switch (mdev->net_conf->wire_protocol) {
1985 	case DRBD_PROT_C:
1986 		inc_unacked(mdev);
1987 		/* corresponding dec_unacked() in e_end_block()
1988 		 * respective _drbd_clear_done_ee */
1989 		break;
1990 	case DRBD_PROT_B:
1991 		/* I really don't like it that the receiver thread
1992 		 * sends on the msock, but anyways */
1993 		drbd_send_ack(mdev, P_RECV_ACK, e);
1994 		break;
1995 	case DRBD_PROT_A:
1996 		/* nothing to do */
1997 		break;
1998 	}
1999 
2000 	if (mdev->state.pdsk == D_DISKLESS) {
2001 		/* In case we have the only disk of the cluster, */
2002 		drbd_set_out_of_sync(mdev, e->sector, e->size);
2003 		e->flags |= EE_CALL_AL_COMPLETE_IO;
2004 		drbd_al_begin_io(mdev, e->sector);
2005 	}
2006 
2007 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2008 		return TRUE;
2009 
2010 out_interrupted:
2011 	/* yes, the epoch_size now is imbalanced.
2012 	 * but we drop the connection anyways, so we don't have a chance to
2013 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2014 	put_ldev(mdev);
2015 	drbd_free_ee(mdev, e);
2016 	return FALSE;
2017 }
2018 
2019 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2020 {
2021 	sector_t sector;
2022 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2023 	struct drbd_epoch_entry *e;
2024 	struct digest_info *di = NULL;
2025 	int size, digest_size;
2026 	unsigned int fault_type;
2027 	struct p_block_req *p =
2028 		(struct p_block_req *)h;
2029 	const int brps = sizeof(*p)-sizeof(*h);
2030 
2031 	if (drbd_recv(mdev, h->payload, brps) != brps)
2032 		return FALSE;
2033 
2034 	sector = be64_to_cpu(p->sector);
2035 	size   = be32_to_cpu(p->blksize);
2036 
2037 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2038 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2039 				(unsigned long long)sector, size);
2040 		return FALSE;
2041 	}
2042 	if (sector + (size>>9) > capacity) {
2043 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2044 				(unsigned long long)sector, size);
2045 		return FALSE;
2046 	}
2047 
2048 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2049 		if (__ratelimit(&drbd_ratelimit_state))
2050 			dev_err(DEV, "Can not satisfy peer's read request, "
2051 			    "no local data.\n");
2052 		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2053 				 P_NEG_RS_DREPLY , p);
2054 		return drbd_drain_block(mdev, h->length - brps);
2055 	}
2056 
2057 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2058 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2059 	 * which in turn might block on the other node at this very place.  */
2060 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2061 	if (!e) {
2062 		put_ldev(mdev);
2063 		return FALSE;
2064 	}
2065 
2066 	switch (h->command) {
2067 	case P_DATA_REQUEST:
2068 		e->w.cb = w_e_end_data_req;
2069 		fault_type = DRBD_FAULT_DT_RD;
2070 		break;
2071 	case P_RS_DATA_REQUEST:
2072 		e->w.cb = w_e_end_rsdata_req;
2073 		fault_type = DRBD_FAULT_RS_RD;
2074 		/* Eventually this should become asynchronously. Currently it
2075 		 * blocks the whole receiver just to delay the reading of a
2076 		 * resync data block.
2077 		 * the drbd_work_queue mechanism is made for this...
2078 		 */
2079 		if (!drbd_rs_begin_io(mdev, sector)) {
2080 			/* we have been interrupted,
2081 			 * probably connection lost! */
2082 			D_ASSERT(signal_pending(current));
2083 			goto out_free_e;
2084 		}
2085 		break;
2086 
2087 	case P_OV_REPLY:
2088 	case P_CSUM_RS_REQUEST:
2089 		fault_type = DRBD_FAULT_RS_RD;
2090 		digest_size = h->length - brps ;
2091 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2092 		if (!di)
2093 			goto out_free_e;
2094 
2095 		di->digest_size = digest_size;
2096 		di->digest = (((char *)di)+sizeof(struct digest_info));
2097 
2098 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2099 			goto out_free_e;
2100 
2101 		e->block_id = (u64)(unsigned long)di;
2102 		if (h->command == P_CSUM_RS_REQUEST) {
2103 			D_ASSERT(mdev->agreed_pro_version >= 89);
2104 			e->w.cb = w_e_end_csum_rs_req;
2105 		} else if (h->command == P_OV_REPLY) {
2106 			e->w.cb = w_e_end_ov_reply;
2107 			dec_rs_pending(mdev);
2108 			break;
2109 		}
2110 
2111 		if (!drbd_rs_begin_io(mdev, sector)) {
2112 			/* we have been interrupted, probably connection lost! */
2113 			D_ASSERT(signal_pending(current));
2114 			goto out_free_e;
2115 		}
2116 		break;
2117 
2118 	case P_OV_REQUEST:
2119 		if (mdev->state.conn >= C_CONNECTED &&
2120 		    mdev->state.conn != C_VERIFY_T)
2121 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2122 				drbd_conn_str(mdev->state.conn));
2123 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2124 		    mdev->agreed_pro_version >= 90) {
2125 			mdev->ov_start_sector = sector;
2126 			mdev->ov_position = sector;
2127 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2128 			dev_info(DEV, "Online Verify start sector: %llu\n",
2129 					(unsigned long long)sector);
2130 		}
2131 		e->w.cb = w_e_end_ov_req;
2132 		fault_type = DRBD_FAULT_RS_RD;
2133 		/* Eventually this should become asynchronous. Currently it
2134 		 * blocks the whole receiver just to delay the reading of a
2135 		 * resync data block.
2136 		 * the drbd_work_queue mechanism is made for this...
2137 		 */
2138 		if (!drbd_rs_begin_io(mdev, sector)) {
2139 			/* we have been interrupted,
2140 			 * probably connection lost! */
2141 			D_ASSERT(signal_pending(current));
2142 			goto out_free_e;
2143 		}
2144 		break;
2145 
2146 
2147 	default:
2148 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2149 		    cmdname(h->command));
2150 		fault_type = DRBD_FAULT_MAX;
2151 	}
2152 
2153 	spin_lock_irq(&mdev->req_lock);
2154 	list_add(&e->w.list, &mdev->read_ee);
2155 	spin_unlock_irq(&mdev->req_lock);
2156 
2157 	inc_unacked(mdev);
2158 
2159 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2160 		return TRUE;
2161 
2162 out_free_e:
2163 	kfree(di);
2164 	put_ldev(mdev);
2165 	drbd_free_ee(mdev, e);
2166 	return FALSE;
2167 }
2168 
2169 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2170 {
2171 	int self, peer, rv = -100;
2172 	unsigned long ch_self, ch_peer;
2173 
2174 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2175 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2176 
2177 	ch_peer = mdev->p_uuid[UI_SIZE];
2178 	ch_self = mdev->comm_bm_set;
2179 
2180 	switch (mdev->net_conf->after_sb_0p) {
2181 	case ASB_CONSENSUS:
2182 	case ASB_DISCARD_SECONDARY:
2183 	case ASB_CALL_HELPER:
2184 		dev_err(DEV, "Configuration error.\n");
2185 		break;
2186 	case ASB_DISCONNECT:
2187 		break;
2188 	case ASB_DISCARD_YOUNGER_PRI:
2189 		if (self == 0 && peer == 1) {
2190 			rv = -1;
2191 			break;
2192 		}
2193 		if (self == 1 && peer == 0) {
2194 			rv =  1;
2195 			break;
2196 		}
2197 		/* Else fall through to one of the other strategies... */
2198 	case ASB_DISCARD_OLDER_PRI:
2199 		if (self == 0 && peer == 1) {
2200 			rv = 1;
2201 			break;
2202 		}
2203 		if (self == 1 && peer == 0) {
2204 			rv = -1;
2205 			break;
2206 		}
2207 		/* Else fall through to one of the other strategies... */
2208 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2209 		     "Using discard-least-changes instead\n");
2210 	case ASB_DISCARD_ZERO_CHG:
2211 		if (ch_peer == 0 && ch_self == 0) {
2212 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2213 				? -1 : 1;
2214 			break;
2215 		} else {
2216 			if (ch_peer == 0) { rv =  1; break; }
2217 			if (ch_self == 0) { rv = -1; break; }
2218 		}
2219 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2220 			break;
2221 	case ASB_DISCARD_LEAST_CHG:
2222 		if	(ch_self < ch_peer)
2223 			rv = -1;
2224 		else if (ch_self > ch_peer)
2225 			rv =  1;
2226 		else /* ( ch_self == ch_peer ) */
2227 		     /* Well, then use something else. */
2228 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2229 				? -1 : 1;
2230 		break;
2231 	case ASB_DISCARD_LOCAL:
2232 		rv = -1;
2233 		break;
2234 	case ASB_DISCARD_REMOTE:
2235 		rv =  1;
2236 	}
2237 
2238 	return rv;
2239 }
2240 
2241 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2242 {
2243 	int self, peer, hg, rv = -100;
2244 
2245 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2246 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2247 
2248 	switch (mdev->net_conf->after_sb_1p) {
2249 	case ASB_DISCARD_YOUNGER_PRI:
2250 	case ASB_DISCARD_OLDER_PRI:
2251 	case ASB_DISCARD_LEAST_CHG:
2252 	case ASB_DISCARD_LOCAL:
2253 	case ASB_DISCARD_REMOTE:
2254 		dev_err(DEV, "Configuration error.\n");
2255 		break;
2256 	case ASB_DISCONNECT:
2257 		break;
2258 	case ASB_CONSENSUS:
2259 		hg = drbd_asb_recover_0p(mdev);
2260 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2261 			rv = hg;
2262 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2263 			rv = hg;
2264 		break;
2265 	case ASB_VIOLENTLY:
2266 		rv = drbd_asb_recover_0p(mdev);
2267 		break;
2268 	case ASB_DISCARD_SECONDARY:
2269 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2270 	case ASB_CALL_HELPER:
2271 		hg = drbd_asb_recover_0p(mdev);
2272 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2273 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2274 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2275 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2276 			  * we do not need to wait for the after state change work either. */
2277 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2278 			if (self != SS_SUCCESS) {
2279 				drbd_khelper(mdev, "pri-lost-after-sb");
2280 			} else {
2281 				dev_warn(DEV, "Successfully gave up primary role.\n");
2282 				rv = hg;
2283 			}
2284 		} else
2285 			rv = hg;
2286 	}
2287 
2288 	return rv;
2289 }
2290 
2291 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2292 {
2293 	int self, peer, hg, rv = -100;
2294 
2295 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2296 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2297 
2298 	switch (mdev->net_conf->after_sb_2p) {
2299 	case ASB_DISCARD_YOUNGER_PRI:
2300 	case ASB_DISCARD_OLDER_PRI:
2301 	case ASB_DISCARD_LEAST_CHG:
2302 	case ASB_DISCARD_LOCAL:
2303 	case ASB_DISCARD_REMOTE:
2304 	case ASB_CONSENSUS:
2305 	case ASB_DISCARD_SECONDARY:
2306 		dev_err(DEV, "Configuration error.\n");
2307 		break;
2308 	case ASB_VIOLENTLY:
2309 		rv = drbd_asb_recover_0p(mdev);
2310 		break;
2311 	case ASB_DISCONNECT:
2312 		break;
2313 	case ASB_CALL_HELPER:
2314 		hg = drbd_asb_recover_0p(mdev);
2315 		if (hg == -1) {
2316 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2318 			  * we do not need to wait for the after state change work either. */
2319 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320 			if (self != SS_SUCCESS) {
2321 				drbd_khelper(mdev, "pri-lost-after-sb");
2322 			} else {
2323 				dev_warn(DEV, "Successfully gave up primary role.\n");
2324 				rv = hg;
2325 			}
2326 		} else
2327 			rv = hg;
2328 	}
2329 
2330 	return rv;
2331 }
2332 
2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334 			   u64 bits, u64 flags)
2335 {
2336 	if (!uuid) {
2337 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338 		return;
2339 	}
2340 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341 	     text,
2342 	     (unsigned long long)uuid[UI_CURRENT],
2343 	     (unsigned long long)uuid[UI_BITMAP],
2344 	     (unsigned long long)uuid[UI_HISTORY_START],
2345 	     (unsigned long long)uuid[UI_HISTORY_END],
2346 	     (unsigned long long)bits,
2347 	     (unsigned long long)flags);
2348 }
2349 
2350 /*
2351   100	after split brain try auto recover
2352     2	C_SYNC_SOURCE set BitMap
2353     1	C_SYNC_SOURCE use BitMap
2354     0	no Sync
2355    -1	C_SYNC_TARGET use BitMap
2356    -2	C_SYNC_TARGET set BitMap
2357  -100	after split brain, disconnect
2358 -1000	unrelated data
2359  */
2360 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2361 {
2362 	u64 self, peer;
2363 	int i, j;
2364 
2365 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2366 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2367 
2368 	*rule_nr = 10;
2369 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2370 		return 0;
2371 
2372 	*rule_nr = 20;
2373 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2374 	     peer != UUID_JUST_CREATED)
2375 		return -2;
2376 
2377 	*rule_nr = 30;
2378 	if (self != UUID_JUST_CREATED &&
2379 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2380 		return 2;
2381 
2382 	if (self == peer) {
2383 		int rct, dc; /* roles at crash time */
2384 
2385 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2386 
2387 			if (mdev->agreed_pro_version < 91)
2388 				return -1001;
2389 
2390 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2391 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2392 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2393 				drbd_uuid_set_bm(mdev, 0UL);
2394 
2395 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2396 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2397 				*rule_nr = 34;
2398 			} else {
2399 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2400 				*rule_nr = 36;
2401 			}
2402 
2403 			return 1;
2404 		}
2405 
2406 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2407 
2408 			if (mdev->agreed_pro_version < 91)
2409 				return -1001;
2410 
2411 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2412 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2413 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2414 
2415 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2416 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2417 				mdev->p_uuid[UI_BITMAP] = 0UL;
2418 
2419 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2420 				*rule_nr = 35;
2421 			} else {
2422 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2423 				*rule_nr = 37;
2424 			}
2425 
2426 			return -1;
2427 		}
2428 
2429 		/* Common power [off|failure] */
2430 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2431 			(mdev->p_uuid[UI_FLAGS] & 2);
2432 		/* lowest bit is set when we were primary,
2433 		 * next bit (weight 2) is set when peer was primary */
2434 		*rule_nr = 40;
2435 
2436 		switch (rct) {
2437 		case 0: /* !self_pri && !peer_pri */ return 0;
2438 		case 1: /*  self_pri && !peer_pri */ return 1;
2439 		case 2: /* !self_pri &&  peer_pri */ return -1;
2440 		case 3: /*  self_pri &&  peer_pri */
2441 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2442 			return dc ? -1 : 1;
2443 		}
2444 	}
2445 
2446 	*rule_nr = 50;
2447 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2448 	if (self == peer)
2449 		return -1;
2450 
2451 	*rule_nr = 51;
2452 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2453 	if (self == peer) {
2454 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2455 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2456 		if (self == peer) {
2457 			/* The last P_SYNC_UUID did not get though. Undo the last start of
2458 			   resync as sync source modifications of the peer's UUIDs. */
2459 
2460 			if (mdev->agreed_pro_version < 91)
2461 				return -1001;
2462 
2463 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2464 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2465 			return -1;
2466 		}
2467 	}
2468 
2469 	*rule_nr = 60;
2470 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2471 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2472 		peer = mdev->p_uuid[i] & ~((u64)1);
2473 		if (self == peer)
2474 			return -2;
2475 	}
2476 
2477 	*rule_nr = 70;
2478 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2479 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2480 	if (self == peer)
2481 		return 1;
2482 
2483 	*rule_nr = 71;
2484 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2485 	if (self == peer) {
2486 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2487 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2488 		if (self == peer) {
2489 			/* The last P_SYNC_UUID did not get though. Undo the last start of
2490 			   resync as sync source modifications of our UUIDs. */
2491 
2492 			if (mdev->agreed_pro_version < 91)
2493 				return -1001;
2494 
2495 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2496 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2497 
2498 			dev_info(DEV, "Undid last start of resync:\n");
2499 
2500 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2501 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2502 
2503 			return 1;
2504 		}
2505 	}
2506 
2507 
2508 	*rule_nr = 80;
2509 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2510 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2511 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2512 		if (self == peer)
2513 			return 2;
2514 	}
2515 
2516 	*rule_nr = 90;
2517 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2518 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2519 	if (self == peer && self != ((u64)0))
2520 		return 100;
2521 
2522 	*rule_nr = 100;
2523 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2524 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2525 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2526 			peer = mdev->p_uuid[j] & ~((u64)1);
2527 			if (self == peer)
2528 				return -100;
2529 		}
2530 	}
2531 
2532 	return -1000;
2533 }
2534 
2535 /* drbd_sync_handshake() returns the new conn state on success, or
2536    CONN_MASK (-1) on failure.
2537  */
2538 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2539 					   enum drbd_disk_state peer_disk) __must_hold(local)
2540 {
2541 	int hg, rule_nr;
2542 	enum drbd_conns rv = C_MASK;
2543 	enum drbd_disk_state mydisk;
2544 
2545 	mydisk = mdev->state.disk;
2546 	if (mydisk == D_NEGOTIATING)
2547 		mydisk = mdev->new_state_tmp.disk;
2548 
2549 	dev_info(DEV, "drbd_sync_handshake:\n");
2550 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2551 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2552 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2553 
2554 	hg = drbd_uuid_compare(mdev, &rule_nr);
2555 
2556 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2557 
2558 	if (hg == -1000) {
2559 		dev_alert(DEV, "Unrelated data, aborting!\n");
2560 		return C_MASK;
2561 	}
2562 	if (hg == -1001) {
2563 		dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2564 		return C_MASK;
2565 	}
2566 
2567 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2568 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2569 		int f = (hg == -100) || abs(hg) == 2;
2570 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2571 		if (f)
2572 			hg = hg*2;
2573 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2574 		     hg > 0 ? "source" : "target");
2575 	}
2576 
2577 	if (abs(hg) == 100)
2578 		drbd_khelper(mdev, "initial-split-brain");
2579 
2580 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2581 		int pcount = (mdev->state.role == R_PRIMARY)
2582 			   + (peer_role == R_PRIMARY);
2583 		int forced = (hg == -100);
2584 
2585 		switch (pcount) {
2586 		case 0:
2587 			hg = drbd_asb_recover_0p(mdev);
2588 			break;
2589 		case 1:
2590 			hg = drbd_asb_recover_1p(mdev);
2591 			break;
2592 		case 2:
2593 			hg = drbd_asb_recover_2p(mdev);
2594 			break;
2595 		}
2596 		if (abs(hg) < 100) {
2597 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2598 			     "automatically solved. Sync from %s node\n",
2599 			     pcount, (hg < 0) ? "peer" : "this");
2600 			if (forced) {
2601 				dev_warn(DEV, "Doing a full sync, since"
2602 				     " UUIDs where ambiguous.\n");
2603 				hg = hg*2;
2604 			}
2605 		}
2606 	}
2607 
2608 	if (hg == -100) {
2609 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2610 			hg = -1;
2611 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2612 			hg = 1;
2613 
2614 		if (abs(hg) < 100)
2615 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2616 			     "Sync from %s node\n",
2617 			     (hg < 0) ? "peer" : "this");
2618 	}
2619 
2620 	if (hg == -100) {
2621 		/* FIXME this log message is not correct if we end up here
2622 		 * after an attempted attach on a diskless node.
2623 		 * We just refuse to attach -- well, we drop the "connection"
2624 		 * to that disk, in a way... */
2625 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2626 		drbd_khelper(mdev, "split-brain");
2627 		return C_MASK;
2628 	}
2629 
2630 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2631 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2632 		return C_MASK;
2633 	}
2634 
2635 	if (hg < 0 && /* by intention we do not use mydisk here. */
2636 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2637 		switch (mdev->net_conf->rr_conflict) {
2638 		case ASB_CALL_HELPER:
2639 			drbd_khelper(mdev, "pri-lost");
2640 			/* fall through */
2641 		case ASB_DISCONNECT:
2642 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2643 			return C_MASK;
2644 		case ASB_VIOLENTLY:
2645 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2646 			     "assumption\n");
2647 		}
2648 	}
2649 
2650 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2651 		if (hg == 0)
2652 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2653 		else
2654 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2655 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2656 				 abs(hg) >= 2 ? "full" : "bit-map based");
2657 		return C_MASK;
2658 	}
2659 
2660 	if (abs(hg) >= 2) {
2661 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2662 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2663 			return C_MASK;
2664 	}
2665 
2666 	if (hg > 0) { /* become sync source. */
2667 		rv = C_WF_BITMAP_S;
2668 	} else if (hg < 0) { /* become sync target */
2669 		rv = C_WF_BITMAP_T;
2670 	} else {
2671 		rv = C_CONNECTED;
2672 		if (drbd_bm_total_weight(mdev)) {
2673 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2674 			     drbd_bm_total_weight(mdev));
2675 		}
2676 	}
2677 
2678 	return rv;
2679 }
2680 
2681 /* returns 1 if invalid */
2682 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2683 {
2684 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2685 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2686 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2687 		return 0;
2688 
2689 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2690 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2691 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2692 		return 1;
2693 
2694 	/* everything else is valid if they are equal on both sides. */
2695 	if (peer == self)
2696 		return 0;
2697 
2698 	/* everything es is invalid. */
2699 	return 1;
2700 }
2701 
2702 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2703 {
2704 	struct p_protocol *p = (struct p_protocol *)h;
2705 	int header_size, data_size;
2706 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2707 	int p_want_lose, p_two_primaries, cf;
2708 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2709 
2710 	header_size = sizeof(*p) - sizeof(*h);
2711 	data_size   = h->length  - header_size;
2712 
2713 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2714 		return FALSE;
2715 
2716 	p_proto		= be32_to_cpu(p->protocol);
2717 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2718 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2719 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2720 	p_two_primaries = be32_to_cpu(p->two_primaries);
2721 	cf		= be32_to_cpu(p->conn_flags);
2722 	p_want_lose = cf & CF_WANT_LOSE;
2723 
2724 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2725 
2726 	if (cf & CF_DRY_RUN)
2727 		set_bit(CONN_DRY_RUN, &mdev->flags);
2728 
2729 	if (p_proto != mdev->net_conf->wire_protocol) {
2730 		dev_err(DEV, "incompatible communication protocols\n");
2731 		goto disconnect;
2732 	}
2733 
2734 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2735 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2736 		goto disconnect;
2737 	}
2738 
2739 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2740 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2741 		goto disconnect;
2742 	}
2743 
2744 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2745 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2746 		goto disconnect;
2747 	}
2748 
2749 	if (p_want_lose && mdev->net_conf->want_lose) {
2750 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2751 		goto disconnect;
2752 	}
2753 
2754 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2755 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2756 		goto disconnect;
2757 	}
2758 
2759 	if (mdev->agreed_pro_version >= 87) {
2760 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2761 
2762 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2763 			return FALSE;
2764 
2765 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2766 		if (strcmp(p_integrity_alg, my_alg)) {
2767 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2768 			goto disconnect;
2769 		}
2770 		dev_info(DEV, "data-integrity-alg: %s\n",
2771 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2772 	}
2773 
2774 	return TRUE;
2775 
2776 disconnect:
2777 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2778 	return FALSE;
2779 }
2780 
2781 /* helper function
2782  * input: alg name, feature name
2783  * return: NULL (alg name was "")
2784  *         ERR_PTR(error) if something goes wrong
2785  *         or the crypto hash ptr, if it worked out ok. */
2786 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2787 		const char *alg, const char *name)
2788 {
2789 	struct crypto_hash *tfm;
2790 
2791 	if (!alg[0])
2792 		return NULL;
2793 
2794 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2795 	if (IS_ERR(tfm)) {
2796 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2797 			alg, name, PTR_ERR(tfm));
2798 		return tfm;
2799 	}
2800 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2801 		crypto_free_hash(tfm);
2802 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2803 		return ERR_PTR(-EINVAL);
2804 	}
2805 	return tfm;
2806 }
2807 
2808 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2809 {
2810 	int ok = TRUE;
2811 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2812 	unsigned int header_size, data_size, exp_max_sz;
2813 	struct crypto_hash *verify_tfm = NULL;
2814 	struct crypto_hash *csums_tfm = NULL;
2815 	const int apv = mdev->agreed_pro_version;
2816 
2817 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2818 		    : apv == 88 ? sizeof(struct p_rs_param)
2819 					+ SHARED_SECRET_MAX
2820 		    : /* 89 */    sizeof(struct p_rs_param_89);
2821 
2822 	if (h->length > exp_max_sz) {
2823 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2824 		    h->length, exp_max_sz);
2825 		return FALSE;
2826 	}
2827 
2828 	if (apv <= 88) {
2829 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2830 		data_size   = h->length  - header_size;
2831 	} else /* apv >= 89 */ {
2832 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2833 		data_size   = h->length  - header_size;
2834 		D_ASSERT(data_size == 0);
2835 	}
2836 
2837 	/* initialize verify_alg and csums_alg */
2838 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2839 
2840 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2841 		return FALSE;
2842 
2843 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2844 
2845 	if (apv >= 88) {
2846 		if (apv == 88) {
2847 			if (data_size > SHARED_SECRET_MAX) {
2848 				dev_err(DEV, "verify-alg too long, "
2849 				    "peer wants %u, accepting only %u byte\n",
2850 						data_size, SHARED_SECRET_MAX);
2851 				return FALSE;
2852 			}
2853 
2854 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2855 				return FALSE;
2856 
2857 			/* we expect NUL terminated string */
2858 			/* but just in case someone tries to be evil */
2859 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2860 			p->verify_alg[data_size-1] = 0;
2861 
2862 		} else /* apv >= 89 */ {
2863 			/* we still expect NUL terminated strings */
2864 			/* but just in case someone tries to be evil */
2865 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2866 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2867 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2868 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2869 		}
2870 
2871 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2872 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2873 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2874 				    mdev->sync_conf.verify_alg, p->verify_alg);
2875 				goto disconnect;
2876 			}
2877 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2878 					p->verify_alg, "verify-alg");
2879 			if (IS_ERR(verify_tfm)) {
2880 				verify_tfm = NULL;
2881 				goto disconnect;
2882 			}
2883 		}
2884 
2885 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2886 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2887 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2888 				    mdev->sync_conf.csums_alg, p->csums_alg);
2889 				goto disconnect;
2890 			}
2891 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2892 					p->csums_alg, "csums-alg");
2893 			if (IS_ERR(csums_tfm)) {
2894 				csums_tfm = NULL;
2895 				goto disconnect;
2896 			}
2897 		}
2898 
2899 
2900 		spin_lock(&mdev->peer_seq_lock);
2901 		/* lock against drbd_nl_syncer_conf() */
2902 		if (verify_tfm) {
2903 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2904 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2905 			crypto_free_hash(mdev->verify_tfm);
2906 			mdev->verify_tfm = verify_tfm;
2907 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2908 		}
2909 		if (csums_tfm) {
2910 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2911 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2912 			crypto_free_hash(mdev->csums_tfm);
2913 			mdev->csums_tfm = csums_tfm;
2914 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2915 		}
2916 		spin_unlock(&mdev->peer_seq_lock);
2917 	}
2918 
2919 	return ok;
2920 disconnect:
2921 	/* just for completeness: actually not needed,
2922 	 * as this is not reached if csums_tfm was ok. */
2923 	crypto_free_hash(csums_tfm);
2924 	/* but free the verify_tfm again, if csums_tfm did not work out */
2925 	crypto_free_hash(verify_tfm);
2926 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2927 	return FALSE;
2928 }
2929 
2930 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2931 {
2932 	/* sorry, we currently have no working implementation
2933 	 * of distributed TCQ */
2934 }
2935 
2936 /* warn if the arguments differ by more than 12.5% */
2937 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2938 	const char *s, sector_t a, sector_t b)
2939 {
2940 	sector_t d;
2941 	if (a == 0 || b == 0)
2942 		return;
2943 	d = (a > b) ? (a - b) : (b - a);
2944 	if (d > (a>>3) || d > (b>>3))
2945 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2946 		     (unsigned long long)a, (unsigned long long)b);
2947 }
2948 
2949 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2950 {
2951 	struct p_sizes *p = (struct p_sizes *)h;
2952 	enum determine_dev_size dd = unchanged;
2953 	unsigned int max_seg_s;
2954 	sector_t p_size, p_usize, my_usize;
2955 	int ldsc = 0; /* local disk size changed */
2956 	enum dds_flags ddsf;
2957 
2958 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2959 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2960 		return FALSE;
2961 
2962 	p_size = be64_to_cpu(p->d_size);
2963 	p_usize = be64_to_cpu(p->u_size);
2964 
2965 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2966 		dev_err(DEV, "some backing storage is needed\n");
2967 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968 		return FALSE;
2969 	}
2970 
2971 	/* just store the peer's disk size for now.
2972 	 * we still need to figure out whether we accept that. */
2973 	mdev->p_size = p_size;
2974 
2975 #define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
2976 	if (get_ldev(mdev)) {
2977 		warn_if_differ_considerably(mdev, "lower level device sizes",
2978 			   p_size, drbd_get_max_capacity(mdev->ldev));
2979 		warn_if_differ_considerably(mdev, "user requested size",
2980 					    p_usize, mdev->ldev->dc.disk_size);
2981 
2982 		/* if this is the first connect, or an otherwise expected
2983 		 * param exchange, choose the minimum */
2984 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2985 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2986 					     p_usize);
2987 
2988 		my_usize = mdev->ldev->dc.disk_size;
2989 
2990 		if (mdev->ldev->dc.disk_size != p_usize) {
2991 			mdev->ldev->dc.disk_size = p_usize;
2992 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2993 			     (unsigned long)mdev->ldev->dc.disk_size);
2994 		}
2995 
2996 		/* Never shrink a device with usable data during connect.
2997 		   But allow online shrinking if we are connected. */
2998 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2999 		   drbd_get_capacity(mdev->this_bdev) &&
3000 		   mdev->state.disk >= D_OUTDATED &&
3001 		   mdev->state.conn < C_CONNECTED) {
3002 			dev_err(DEV, "The peer's disk size is too small!\n");
3003 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3004 			mdev->ldev->dc.disk_size = my_usize;
3005 			put_ldev(mdev);
3006 			return FALSE;
3007 		}
3008 		put_ldev(mdev);
3009 	}
3010 #undef min_not_zero
3011 
3012 	ddsf = be16_to_cpu(p->dds_flags);
3013 	if (get_ldev(mdev)) {
3014 		dd = drbd_determin_dev_size(mdev, ddsf);
3015 		put_ldev(mdev);
3016 		if (dd == dev_size_error)
3017 			return FALSE;
3018 		drbd_md_sync(mdev);
3019 	} else {
3020 		/* I am diskless, need to accept the peer's size. */
3021 		drbd_set_my_capacity(mdev, p_size);
3022 	}
3023 
3024 	if (get_ldev(mdev)) {
3025 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3026 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3027 			ldsc = 1;
3028 		}
3029 
3030 		if (mdev->agreed_pro_version < 94)
3031 			max_seg_s = be32_to_cpu(p->max_segment_size);
3032 		else /* drbd 8.3.8 onwards */
3033 			max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3034 
3035 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3036 			drbd_setup_queue_param(mdev, max_seg_s);
3037 
3038 		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3039 		put_ldev(mdev);
3040 	}
3041 
3042 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3043 		if (be64_to_cpu(p->c_size) !=
3044 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3045 			/* we have different sizes, probably peer
3046 			 * needs to know my new size... */
3047 			drbd_send_sizes(mdev, 0, ddsf);
3048 		}
3049 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3050 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3051 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3052 			    mdev->state.disk >= D_INCONSISTENT) {
3053 				if (ddsf & DDSF_NO_RESYNC)
3054 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3055 				else
3056 					resync_after_online_grow(mdev);
3057 			} else
3058 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3059 		}
3060 	}
3061 
3062 	return TRUE;
3063 }
3064 
3065 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3066 {
3067 	struct p_uuids *p = (struct p_uuids *)h;
3068 	u64 *p_uuid;
3069 	int i;
3070 
3071 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3072 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3073 		return FALSE;
3074 
3075 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3076 
3077 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3078 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3079 
3080 	kfree(mdev->p_uuid);
3081 	mdev->p_uuid = p_uuid;
3082 
3083 	if (mdev->state.conn < C_CONNECTED &&
3084 	    mdev->state.disk < D_INCONSISTENT &&
3085 	    mdev->state.role == R_PRIMARY &&
3086 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3087 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3088 		    (unsigned long long)mdev->ed_uuid);
3089 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3090 		return FALSE;
3091 	}
3092 
3093 	if (get_ldev(mdev)) {
3094 		int skip_initial_sync =
3095 			mdev->state.conn == C_CONNECTED &&
3096 			mdev->agreed_pro_version >= 90 &&
3097 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3098 			(p_uuid[UI_FLAGS] & 8);
3099 		if (skip_initial_sync) {
3100 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3101 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3102 					"clear_n_write from receive_uuids");
3103 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3104 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3105 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3106 					CS_VERBOSE, NULL);
3107 			drbd_md_sync(mdev);
3108 		}
3109 		put_ldev(mdev);
3110 	}
3111 
3112 	/* Before we test for the disk state, we should wait until an eventually
3113 	   ongoing cluster wide state change is finished. That is important if
3114 	   we are primary and are detaching from our disk. We need to see the
3115 	   new disk state... */
3116 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3117 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3118 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3119 
3120 	return TRUE;
3121 }
3122 
3123 /**
3124  * convert_state() - Converts the peer's view of the cluster state to our point of view
3125  * @ps:		The state as seen by the peer.
3126  */
3127 static union drbd_state convert_state(union drbd_state ps)
3128 {
3129 	union drbd_state ms;
3130 
3131 	static enum drbd_conns c_tab[] = {
3132 		[C_CONNECTED] = C_CONNECTED,
3133 
3134 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3135 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3136 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3137 		[C_VERIFY_S]       = C_VERIFY_T,
3138 		[C_MASK]   = C_MASK,
3139 	};
3140 
3141 	ms.i = ps.i;
3142 
3143 	ms.conn = c_tab[ps.conn];
3144 	ms.peer = ps.role;
3145 	ms.role = ps.peer;
3146 	ms.pdsk = ps.disk;
3147 	ms.disk = ps.pdsk;
3148 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3149 
3150 	return ms;
3151 }
3152 
3153 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3154 {
3155 	struct p_req_state *p = (struct p_req_state *)h;
3156 	union drbd_state mask, val;
3157 	int rv;
3158 
3159 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3160 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3161 		return FALSE;
3162 
3163 	mask.i = be32_to_cpu(p->mask);
3164 	val.i = be32_to_cpu(p->val);
3165 
3166 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3167 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3168 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3169 		return TRUE;
3170 	}
3171 
3172 	mask = convert_state(mask);
3173 	val = convert_state(val);
3174 
3175 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3176 
3177 	drbd_send_sr_reply(mdev, rv);
3178 	drbd_md_sync(mdev);
3179 
3180 	return TRUE;
3181 }
3182 
3183 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3184 {
3185 	struct p_state *p = (struct p_state *)h;
3186 	enum drbd_conns nconn, oconn;
3187 	union drbd_state ns, peer_state;
3188 	enum drbd_disk_state real_peer_disk;
3189 	int rv;
3190 
3191 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3192 		return FALSE;
3193 
3194 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3195 		return FALSE;
3196 
3197 	peer_state.i = be32_to_cpu(p->state);
3198 
3199 	real_peer_disk = peer_state.disk;
3200 	if (peer_state.disk == D_NEGOTIATING) {
3201 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3202 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3203 	}
3204 
3205 	spin_lock_irq(&mdev->req_lock);
3206  retry:
3207 	oconn = nconn = mdev->state.conn;
3208 	spin_unlock_irq(&mdev->req_lock);
3209 
3210 	if (nconn == C_WF_REPORT_PARAMS)
3211 		nconn = C_CONNECTED;
3212 
3213 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3214 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3215 		int cr; /* consider resync */
3216 
3217 		/* if we established a new connection */
3218 		cr  = (oconn < C_CONNECTED);
3219 		/* if we had an established connection
3220 		 * and one of the nodes newly attaches a disk */
3221 		cr |= (oconn == C_CONNECTED &&
3222 		       (peer_state.disk == D_NEGOTIATING ||
3223 			mdev->state.disk == D_NEGOTIATING));
3224 		/* if we have both been inconsistent, and the peer has been
3225 		 * forced to be UpToDate with --overwrite-data */
3226 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3227 		/* if we had been plain connected, and the admin requested to
3228 		 * start a sync by "invalidate" or "invalidate-remote" */
3229 		cr |= (oconn == C_CONNECTED &&
3230 				(peer_state.conn >= C_STARTING_SYNC_S &&
3231 				 peer_state.conn <= C_WF_BITMAP_T));
3232 
3233 		if (cr)
3234 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3235 
3236 		put_ldev(mdev);
3237 		if (nconn == C_MASK) {
3238 			nconn = C_CONNECTED;
3239 			if (mdev->state.disk == D_NEGOTIATING) {
3240 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3241 			} else if (peer_state.disk == D_NEGOTIATING) {
3242 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3243 				peer_state.disk = D_DISKLESS;
3244 				real_peer_disk = D_DISKLESS;
3245 			} else {
3246 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3247 					return FALSE;
3248 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3249 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3250 				return FALSE;
3251 			}
3252 		}
3253 	}
3254 
3255 	spin_lock_irq(&mdev->req_lock);
3256 	if (mdev->state.conn != oconn)
3257 		goto retry;
3258 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3259 	ns.i = mdev->state.i;
3260 	ns.conn = nconn;
3261 	ns.peer = peer_state.role;
3262 	ns.pdsk = real_peer_disk;
3263 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3264 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3265 		ns.disk = mdev->new_state_tmp.disk;
3266 
3267 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3268 	ns = mdev->state;
3269 	spin_unlock_irq(&mdev->req_lock);
3270 
3271 	if (rv < SS_SUCCESS) {
3272 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3273 		return FALSE;
3274 	}
3275 
3276 	if (oconn > C_WF_REPORT_PARAMS) {
3277 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3278 		    peer_state.disk != D_NEGOTIATING ) {
3279 			/* we want resync, peer has not yet decided to sync... */
3280 			/* Nowadays only used when forcing a node into primary role and
3281 			   setting its disk to UpToDate with that */
3282 			drbd_send_uuids(mdev);
3283 			drbd_send_state(mdev);
3284 		}
3285 	}
3286 
3287 	mdev->net_conf->want_lose = 0;
3288 
3289 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3290 
3291 	return TRUE;
3292 }
3293 
3294 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3295 {
3296 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3297 
3298 	wait_event(mdev->misc_wait,
3299 		   mdev->state.conn == C_WF_SYNC_UUID ||
3300 		   mdev->state.conn < C_CONNECTED ||
3301 		   mdev->state.disk < D_NEGOTIATING);
3302 
3303 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3304 
3305 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3306 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3307 		return FALSE;
3308 
3309 	/* Here the _drbd_uuid_ functions are right, current should
3310 	   _not_ be rotated into the history */
3311 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3312 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3313 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3314 
3315 		drbd_start_resync(mdev, C_SYNC_TARGET);
3316 
3317 		put_ldev(mdev);
3318 	} else
3319 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3320 
3321 	return TRUE;
3322 }
3323 
3324 enum receive_bitmap_ret { OK, DONE, FAILED };
3325 
3326 static enum receive_bitmap_ret
3327 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3328 	unsigned long *buffer, struct bm_xfer_ctx *c)
3329 {
3330 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3331 	unsigned want = num_words * sizeof(long);
3332 
3333 	if (want != h->length) {
3334 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3335 		return FAILED;
3336 	}
3337 	if (want == 0)
3338 		return DONE;
3339 	if (drbd_recv(mdev, buffer, want) != want)
3340 		return FAILED;
3341 
3342 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3343 
3344 	c->word_offset += num_words;
3345 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3346 	if (c->bit_offset > c->bm_bits)
3347 		c->bit_offset = c->bm_bits;
3348 
3349 	return OK;
3350 }
3351 
3352 static enum receive_bitmap_ret
3353 recv_bm_rle_bits(struct drbd_conf *mdev,
3354 		struct p_compressed_bm *p,
3355 		struct bm_xfer_ctx *c)
3356 {
3357 	struct bitstream bs;
3358 	u64 look_ahead;
3359 	u64 rl;
3360 	u64 tmp;
3361 	unsigned long s = c->bit_offset;
3362 	unsigned long e;
3363 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3364 	int toggle = DCBP_get_start(p);
3365 	int have;
3366 	int bits;
3367 
3368 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3369 
3370 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3371 	if (bits < 0)
3372 		return FAILED;
3373 
3374 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3375 		bits = vli_decode_bits(&rl, look_ahead);
3376 		if (bits <= 0)
3377 			return FAILED;
3378 
3379 		if (toggle) {
3380 			e = s + rl -1;
3381 			if (e >= c->bm_bits) {
3382 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3383 				return FAILED;
3384 			}
3385 			_drbd_bm_set_bits(mdev, s, e);
3386 		}
3387 
3388 		if (have < bits) {
3389 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3390 				have, bits, look_ahead,
3391 				(unsigned int)(bs.cur.b - p->code),
3392 				(unsigned int)bs.buf_len);
3393 			return FAILED;
3394 		}
3395 		look_ahead >>= bits;
3396 		have -= bits;
3397 
3398 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3399 		if (bits < 0)
3400 			return FAILED;
3401 		look_ahead |= tmp << have;
3402 		have += bits;
3403 	}
3404 
3405 	c->bit_offset = s;
3406 	bm_xfer_ctx_bit_to_word_offset(c);
3407 
3408 	return (s == c->bm_bits) ? DONE : OK;
3409 }
3410 
3411 static enum receive_bitmap_ret
3412 decode_bitmap_c(struct drbd_conf *mdev,
3413 		struct p_compressed_bm *p,
3414 		struct bm_xfer_ctx *c)
3415 {
3416 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3417 		return recv_bm_rle_bits(mdev, p, c);
3418 
3419 	/* other variants had been implemented for evaluation,
3420 	 * but have been dropped as this one turned out to be "best"
3421 	 * during all our tests. */
3422 
3423 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3424 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3425 	return FAILED;
3426 }
3427 
3428 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3429 		const char *direction, struct bm_xfer_ctx *c)
3430 {
3431 	/* what would it take to transfer it "plaintext" */
3432 	unsigned plain = sizeof(struct p_header) *
3433 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3434 		+ c->bm_words * sizeof(long);
3435 	unsigned total = c->bytes[0] + c->bytes[1];
3436 	unsigned r;
3437 
3438 	/* total can not be zero. but just in case: */
3439 	if (total == 0)
3440 		return;
3441 
3442 	/* don't report if not compressed */
3443 	if (total >= plain)
3444 		return;
3445 
3446 	/* total < plain. check for overflow, still */
3447 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3448 		                    : (1000 * total / plain);
3449 
3450 	if (r > 1000)
3451 		r = 1000;
3452 
3453 	r = 1000 - r;
3454 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3455 	     "total %u; compression: %u.%u%%\n",
3456 			direction,
3457 			c->bytes[1], c->packets[1],
3458 			c->bytes[0], c->packets[0],
3459 			total, r/10, r % 10);
3460 }
3461 
3462 /* Since we are processing the bitfield from lower addresses to higher,
3463    it does not matter if the process it in 32 bit chunks or 64 bit
3464    chunks as long as it is little endian. (Understand it as byte stream,
3465    beginning with the lowest byte...) If we would use big endian
3466    we would need to process it from the highest address to the lowest,
3467    in order to be agnostic to the 32 vs 64 bits issue.
3468 
3469    returns 0 on failure, 1 if we successfully received it. */
3470 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3471 {
3472 	struct bm_xfer_ctx c;
3473 	void *buffer;
3474 	enum receive_bitmap_ret ret;
3475 	int ok = FALSE;
3476 
3477 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3478 
3479 	drbd_bm_lock(mdev, "receive bitmap");
3480 
3481 	/* maybe we should use some per thread scratch page,
3482 	 * and allocate that during initial device creation? */
3483 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3484 	if (!buffer) {
3485 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3486 		goto out;
3487 	}
3488 
3489 	c = (struct bm_xfer_ctx) {
3490 		.bm_bits = drbd_bm_bits(mdev),
3491 		.bm_words = drbd_bm_words(mdev),
3492 	};
3493 
3494 	do {
3495 		if (h->command == P_BITMAP) {
3496 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3497 		} else if (h->command == P_COMPRESSED_BITMAP) {
3498 			/* MAYBE: sanity check that we speak proto >= 90,
3499 			 * and the feature is enabled! */
3500 			struct p_compressed_bm *p;
3501 
3502 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3503 				dev_err(DEV, "ReportCBitmap packet too large\n");
3504 				goto out;
3505 			}
3506 			/* use the page buff */
3507 			p = buffer;
3508 			memcpy(p, h, sizeof(*h));
3509 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3510 				goto out;
3511 			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3512 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3513 				return FAILED;
3514 			}
3515 			ret = decode_bitmap_c(mdev, p, &c);
3516 		} else {
3517 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3518 			goto out;
3519 		}
3520 
3521 		c.packets[h->command == P_BITMAP]++;
3522 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3523 
3524 		if (ret != OK)
3525 			break;
3526 
3527 		if (!drbd_recv_header(mdev, h))
3528 			goto out;
3529 	} while (ret == OK);
3530 	if (ret == FAILED)
3531 		goto out;
3532 
3533 	INFO_bm_xfer_stats(mdev, "receive", &c);
3534 
3535 	if (mdev->state.conn == C_WF_BITMAP_T) {
3536 		ok = !drbd_send_bitmap(mdev);
3537 		if (!ok)
3538 			goto out;
3539 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3540 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3541 		D_ASSERT(ok == SS_SUCCESS);
3542 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3543 		/* admin may have requested C_DISCONNECTING,
3544 		 * other threads may have noticed network errors */
3545 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3546 		    drbd_conn_str(mdev->state.conn));
3547 	}
3548 
3549 	ok = TRUE;
3550  out:
3551 	drbd_bm_unlock(mdev);
3552 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3553 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3554 	free_page((unsigned long) buffer);
3555 	return ok;
3556 }
3557 
3558 static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
3559 {
3560 	/* TODO zero copy sink :) */
3561 	static char sink[128];
3562 	int size, want, r;
3563 
3564 	if (!silent)
3565 		dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3566 		     h->command, h->length);
3567 
3568 	size = h->length;
3569 	while (size > 0) {
3570 		want = min_t(int, size, sizeof(sink));
3571 		r = drbd_recv(mdev, sink, want);
3572 		ERR_IF(r <= 0) break;
3573 		size -= r;
3574 	}
3575 	return size == 0;
3576 }
3577 
3578 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3579 {
3580 	return receive_skip_(mdev, h, 0);
3581 }
3582 
3583 static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h)
3584 {
3585 	return receive_skip_(mdev, h, 1);
3586 }
3587 
3588 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3589 {
3590 	if (mdev->state.disk >= D_INCONSISTENT)
3591 		drbd_kick_lo(mdev);
3592 
3593 	/* Make sure we've acked all the TCP data associated
3594 	 * with the data requests being unplugged */
3595 	drbd_tcp_quickack(mdev->data.socket);
3596 
3597 	return TRUE;
3598 }
3599 
3600 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3601 
3602 static drbd_cmd_handler_f drbd_default_handler[] = {
3603 	[P_DATA]	    = receive_Data,
3604 	[P_DATA_REPLY]	    = receive_DataReply,
3605 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3606 	[P_BARRIER]	    = receive_Barrier,
3607 	[P_BITMAP]	    = receive_bitmap,
3608 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3609 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3610 	[P_DATA_REQUEST]    = receive_DataRequest,
3611 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3612 	[P_SYNC_PARAM]	    = receive_SyncParam,
3613 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3614 	[P_PROTOCOL]        = receive_protocol,
3615 	[P_UUIDS]	    = receive_uuids,
3616 	[P_SIZES]	    = receive_sizes,
3617 	[P_STATE]	    = receive_state,
3618 	[P_STATE_CHG_REQ]   = receive_req_state,
3619 	[P_SYNC_UUID]       = receive_sync_uuid,
3620 	[P_OV_REQUEST]      = receive_DataRequest,
3621 	[P_OV_REPLY]        = receive_DataRequest,
3622 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3623 	[P_DELAY_PROBE]     = receive_skip_silent,
3624 	/* anything missing from this table is in
3625 	 * the asender_tbl, see get_asender_cmd */
3626 	[P_MAX_CMD]	    = NULL,
3627 };
3628 
3629 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3630 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3631 
3632 static void drbdd(struct drbd_conf *mdev)
3633 {
3634 	drbd_cmd_handler_f handler;
3635 	struct p_header *header = &mdev->data.rbuf.header;
3636 
3637 	while (get_t_state(&mdev->receiver) == Running) {
3638 		drbd_thread_current_set_cpu(mdev);
3639 		if (!drbd_recv_header(mdev, header)) {
3640 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3641 			break;
3642 		}
3643 
3644 		if (header->command < P_MAX_CMD)
3645 			handler = drbd_cmd_handler[header->command];
3646 		else if (P_MAY_IGNORE < header->command
3647 		     && header->command < P_MAX_OPT_CMD)
3648 			handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3649 		else if (header->command > P_MAX_OPT_CMD)
3650 			handler = receive_skip;
3651 		else
3652 			handler = NULL;
3653 
3654 		if (unlikely(!handler)) {
3655 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3656 			    header->command, header->length);
3657 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3658 			break;
3659 		}
3660 		if (unlikely(!handler(mdev, header))) {
3661 			dev_err(DEV, "error receiving %s, l: %d!\n",
3662 			    cmdname(header->command), header->length);
3663 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3664 			break;
3665 		}
3666 	}
3667 }
3668 
3669 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3670 {
3671 	struct hlist_head *slot;
3672 	struct hlist_node *pos;
3673 	struct hlist_node *tmp;
3674 	struct drbd_request *req;
3675 	int i;
3676 
3677 	/*
3678 	 * Application READ requests
3679 	 */
3680 	spin_lock_irq(&mdev->req_lock);
3681 	for (i = 0; i < APP_R_HSIZE; i++) {
3682 		slot = mdev->app_reads_hash+i;
3683 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3684 			/* it may (but should not any longer!)
3685 			 * be on the work queue; if that assert triggers,
3686 			 * we need to also grab the
3687 			 * spin_lock_irq(&mdev->data.work.q_lock);
3688 			 * and list_del_init here. */
3689 			D_ASSERT(list_empty(&req->w.list));
3690 			/* It would be nice to complete outside of spinlock.
3691 			 * But this is easier for now. */
3692 			_req_mod(req, connection_lost_while_pending);
3693 		}
3694 	}
3695 	for (i = 0; i < APP_R_HSIZE; i++)
3696 		if (!hlist_empty(mdev->app_reads_hash+i))
3697 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3698 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3699 
3700 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3701 	spin_unlock_irq(&mdev->req_lock);
3702 }
3703 
3704 void drbd_flush_workqueue(struct drbd_conf *mdev)
3705 {
3706 	struct drbd_wq_barrier barr;
3707 
3708 	barr.w.cb = w_prev_work_done;
3709 	init_completion(&barr.done);
3710 	drbd_queue_work(&mdev->data.work, &barr.w);
3711 	wait_for_completion(&barr.done);
3712 }
3713 
3714 static void drbd_disconnect(struct drbd_conf *mdev)
3715 {
3716 	enum drbd_fencing_p fp;
3717 	union drbd_state os, ns;
3718 	int rv = SS_UNKNOWN_ERROR;
3719 	unsigned int i;
3720 
3721 	if (mdev->state.conn == C_STANDALONE)
3722 		return;
3723 	if (mdev->state.conn >= C_WF_CONNECTION)
3724 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3725 				drbd_conn_str(mdev->state.conn));
3726 
3727 	/* asender does not clean up anything. it must not interfere, either */
3728 	drbd_thread_stop(&mdev->asender);
3729 	drbd_free_sock(mdev);
3730 
3731 	spin_lock_irq(&mdev->req_lock);
3732 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3733 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3734 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3735 	spin_unlock_irq(&mdev->req_lock);
3736 
3737 	/* We do not have data structures that would allow us to
3738 	 * get the rs_pending_cnt down to 0 again.
3739 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3740 	 *    the pending RSDataRequest's we have sent.
3741 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3742 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3743 	 *  And no, it is not the sum of the reference counts in the
3744 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3745 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3746 	 *  on the fly. */
3747 	drbd_rs_cancel_all(mdev);
3748 	mdev->rs_total = 0;
3749 	mdev->rs_failed = 0;
3750 	atomic_set(&mdev->rs_pending_cnt, 0);
3751 	wake_up(&mdev->misc_wait);
3752 
3753 	/* make sure syncer is stopped and w_resume_next_sg queued */
3754 	del_timer_sync(&mdev->resync_timer);
3755 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3756 	resync_timer_fn((unsigned long)mdev);
3757 
3758 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3759 	 * w_make_resync_request etc. which may still be on the worker queue
3760 	 * to be "canceled" */
3761 	drbd_flush_workqueue(mdev);
3762 
3763 	/* This also does reclaim_net_ee().  If we do this too early, we might
3764 	 * miss some resync ee and pages.*/
3765 	drbd_process_done_ee(mdev);
3766 
3767 	kfree(mdev->p_uuid);
3768 	mdev->p_uuid = NULL;
3769 
3770 	if (!mdev->state.susp)
3771 		tl_clear(mdev);
3772 
3773 	drbd_fail_pending_reads(mdev);
3774 
3775 	dev_info(DEV, "Connection closed\n");
3776 
3777 	drbd_md_sync(mdev);
3778 
3779 	fp = FP_DONT_CARE;
3780 	if (get_ldev(mdev)) {
3781 		fp = mdev->ldev->dc.fencing;
3782 		put_ldev(mdev);
3783 	}
3784 
3785 	if (mdev->state.role == R_PRIMARY) {
3786 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3787 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3788 			drbd_request_state(mdev, NS(pdsk, nps));
3789 		}
3790 	}
3791 
3792 	spin_lock_irq(&mdev->req_lock);
3793 	os = mdev->state;
3794 	if (os.conn >= C_UNCONNECTED) {
3795 		/* Do not restart in case we are C_DISCONNECTING */
3796 		ns = os;
3797 		ns.conn = C_UNCONNECTED;
3798 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3799 	}
3800 	spin_unlock_irq(&mdev->req_lock);
3801 
3802 	if (os.conn == C_DISCONNECTING) {
3803 		struct hlist_head *h;
3804 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3805 
3806 		/* we must not free the tl_hash
3807 		 * while application io is still on the fly */
3808 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3809 
3810 		spin_lock_irq(&mdev->req_lock);
3811 		/* paranoia code */
3812 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3813 			if (h->first)
3814 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3815 						(int)(h - mdev->ee_hash), h->first);
3816 		kfree(mdev->ee_hash);
3817 		mdev->ee_hash = NULL;
3818 		mdev->ee_hash_s = 0;
3819 
3820 		/* paranoia code */
3821 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3822 			if (h->first)
3823 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3824 						(int)(h - mdev->tl_hash), h->first);
3825 		kfree(mdev->tl_hash);
3826 		mdev->tl_hash = NULL;
3827 		mdev->tl_hash_s = 0;
3828 		spin_unlock_irq(&mdev->req_lock);
3829 
3830 		crypto_free_hash(mdev->cram_hmac_tfm);
3831 		mdev->cram_hmac_tfm = NULL;
3832 
3833 		kfree(mdev->net_conf);
3834 		mdev->net_conf = NULL;
3835 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3836 	}
3837 
3838 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3839 	 * want to use SO_LINGER, because apparently it can be deferred for
3840 	 * more than 20 seconds (longest time I checked).
3841 	 *
3842 	 * Actually we don't care for exactly when the network stack does its
3843 	 * put_page(), but release our reference on these pages right here.
3844 	 */
3845 	i = drbd_release_ee(mdev, &mdev->net_ee);
3846 	if (i)
3847 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3848 	i = atomic_read(&mdev->pp_in_use);
3849 	if (i)
3850 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3851 
3852 	D_ASSERT(list_empty(&mdev->read_ee));
3853 	D_ASSERT(list_empty(&mdev->active_ee));
3854 	D_ASSERT(list_empty(&mdev->sync_ee));
3855 	D_ASSERT(list_empty(&mdev->done_ee));
3856 
3857 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3858 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3859 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3860 }
3861 
3862 /*
3863  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3864  * we can agree on is stored in agreed_pro_version.
3865  *
3866  * feature flags and the reserved array should be enough room for future
3867  * enhancements of the handshake protocol, and possible plugins...
3868  *
3869  * for now, they are expected to be zero, but ignored.
3870  */
3871 static int drbd_send_handshake(struct drbd_conf *mdev)
3872 {
3873 	/* ASSERT current == mdev->receiver ... */
3874 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3875 	int ok;
3876 
3877 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3878 		dev_err(DEV, "interrupted during initial handshake\n");
3879 		return 0; /* interrupted. not ok. */
3880 	}
3881 
3882 	if (mdev->data.socket == NULL) {
3883 		mutex_unlock(&mdev->data.mutex);
3884 		return 0;
3885 	}
3886 
3887 	memset(p, 0, sizeof(*p));
3888 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3889 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3890 	ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3891 			     (struct p_header *)p, sizeof(*p), 0 );
3892 	mutex_unlock(&mdev->data.mutex);
3893 	return ok;
3894 }
3895 
3896 /*
3897  * return values:
3898  *   1 yes, we have a valid connection
3899  *   0 oops, did not work out, please try again
3900  *  -1 peer talks different language,
3901  *     no point in trying again, please go standalone.
3902  */
3903 static int drbd_do_handshake(struct drbd_conf *mdev)
3904 {
3905 	/* ASSERT current == mdev->receiver ... */
3906 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3907 	const int expect = sizeof(struct p_handshake)
3908 			  -sizeof(struct p_header);
3909 	int rv;
3910 
3911 	rv = drbd_send_handshake(mdev);
3912 	if (!rv)
3913 		return 0;
3914 
3915 	rv = drbd_recv_header(mdev, &p->head);
3916 	if (!rv)
3917 		return 0;
3918 
3919 	if (p->head.command != P_HAND_SHAKE) {
3920 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3921 		     cmdname(p->head.command), p->head.command);
3922 		return -1;
3923 	}
3924 
3925 	if (p->head.length != expect) {
3926 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3927 		     expect, p->head.length);
3928 		return -1;
3929 	}
3930 
3931 	rv = drbd_recv(mdev, &p->head.payload, expect);
3932 
3933 	if (rv != expect) {
3934 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3935 		return 0;
3936 	}
3937 
3938 	p->protocol_min = be32_to_cpu(p->protocol_min);
3939 	p->protocol_max = be32_to_cpu(p->protocol_max);
3940 	if (p->protocol_max == 0)
3941 		p->protocol_max = p->protocol_min;
3942 
3943 	if (PRO_VERSION_MAX < p->protocol_min ||
3944 	    PRO_VERSION_MIN > p->protocol_max)
3945 		goto incompat;
3946 
3947 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3948 
3949 	dev_info(DEV, "Handshake successful: "
3950 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3951 
3952 	return 1;
3953 
3954  incompat:
3955 	dev_err(DEV, "incompatible DRBD dialects: "
3956 	    "I support %d-%d, peer supports %d-%d\n",
3957 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3958 	    p->protocol_min, p->protocol_max);
3959 	return -1;
3960 }
3961 
3962 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3963 static int drbd_do_auth(struct drbd_conf *mdev)
3964 {
3965 	dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
3966 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3967 	return -1;
3968 }
3969 #else
3970 #define CHALLENGE_LEN 64
3971 
3972 /* Return value:
3973 	1 - auth succeeded,
3974 	0 - failed, try again (network error),
3975 	-1 - auth failed, don't try again.
3976 */
3977 
3978 static int drbd_do_auth(struct drbd_conf *mdev)
3979 {
3980 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3981 	struct scatterlist sg;
3982 	char *response = NULL;
3983 	char *right_response = NULL;
3984 	char *peers_ch = NULL;
3985 	struct p_header p;
3986 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3987 	unsigned int resp_size;
3988 	struct hash_desc desc;
3989 	int rv;
3990 
3991 	desc.tfm = mdev->cram_hmac_tfm;
3992 	desc.flags = 0;
3993 
3994 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3995 				(u8 *)mdev->net_conf->shared_secret, key_len);
3996 	if (rv) {
3997 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3998 		rv = -1;
3999 		goto fail;
4000 	}
4001 
4002 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4003 
4004 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4005 	if (!rv)
4006 		goto fail;
4007 
4008 	rv = drbd_recv_header(mdev, &p);
4009 	if (!rv)
4010 		goto fail;
4011 
4012 	if (p.command != P_AUTH_CHALLENGE) {
4013 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4014 		    cmdname(p.command), p.command);
4015 		rv = 0;
4016 		goto fail;
4017 	}
4018 
4019 	if (p.length > CHALLENGE_LEN*2) {
4020 		dev_err(DEV, "expected AuthChallenge payload too big.\n");
4021 		rv = -1;
4022 		goto fail;
4023 	}
4024 
4025 	peers_ch = kmalloc(p.length, GFP_NOIO);
4026 	if (peers_ch == NULL) {
4027 		dev_err(DEV, "kmalloc of peers_ch failed\n");
4028 		rv = -1;
4029 		goto fail;
4030 	}
4031 
4032 	rv = drbd_recv(mdev, peers_ch, p.length);
4033 
4034 	if (rv != p.length) {
4035 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4036 		rv = 0;
4037 		goto fail;
4038 	}
4039 
4040 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4041 	response = kmalloc(resp_size, GFP_NOIO);
4042 	if (response == NULL) {
4043 		dev_err(DEV, "kmalloc of response failed\n");
4044 		rv = -1;
4045 		goto fail;
4046 	}
4047 
4048 	sg_init_table(&sg, 1);
4049 	sg_set_buf(&sg, peers_ch, p.length);
4050 
4051 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4052 	if (rv) {
4053 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4054 		rv = -1;
4055 		goto fail;
4056 	}
4057 
4058 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4059 	if (!rv)
4060 		goto fail;
4061 
4062 	rv = drbd_recv_header(mdev, &p);
4063 	if (!rv)
4064 		goto fail;
4065 
4066 	if (p.command != P_AUTH_RESPONSE) {
4067 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4068 		    cmdname(p.command), p.command);
4069 		rv = 0;
4070 		goto fail;
4071 	}
4072 
4073 	if (p.length != resp_size) {
4074 		dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4075 		rv = 0;
4076 		goto fail;
4077 	}
4078 
4079 	rv = drbd_recv(mdev, response , resp_size);
4080 
4081 	if (rv != resp_size) {
4082 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4083 		rv = 0;
4084 		goto fail;
4085 	}
4086 
4087 	right_response = kmalloc(resp_size, GFP_NOIO);
4088 	if (right_response == NULL) {
4089 		dev_err(DEV, "kmalloc of right_response failed\n");
4090 		rv = -1;
4091 		goto fail;
4092 	}
4093 
4094 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4095 
4096 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4097 	if (rv) {
4098 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4099 		rv = -1;
4100 		goto fail;
4101 	}
4102 
4103 	rv = !memcmp(response, right_response, resp_size);
4104 
4105 	if (rv)
4106 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4107 		     resp_size, mdev->net_conf->cram_hmac_alg);
4108 	else
4109 		rv = -1;
4110 
4111  fail:
4112 	kfree(peers_ch);
4113 	kfree(response);
4114 	kfree(right_response);
4115 
4116 	return rv;
4117 }
4118 #endif
4119 
4120 int drbdd_init(struct drbd_thread *thi)
4121 {
4122 	struct drbd_conf *mdev = thi->mdev;
4123 	unsigned int minor = mdev_to_minor(mdev);
4124 	int h;
4125 
4126 	sprintf(current->comm, "drbd%d_receiver", minor);
4127 
4128 	dev_info(DEV, "receiver (re)started\n");
4129 
4130 	do {
4131 		h = drbd_connect(mdev);
4132 		if (h == 0) {
4133 			drbd_disconnect(mdev);
4134 			__set_current_state(TASK_INTERRUPTIBLE);
4135 			schedule_timeout(HZ);
4136 		}
4137 		if (h == -1) {
4138 			dev_warn(DEV, "Discarding network configuration.\n");
4139 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4140 		}
4141 	} while (h == 0);
4142 
4143 	if (h > 0) {
4144 		if (get_net_conf(mdev)) {
4145 			drbdd(mdev);
4146 			put_net_conf(mdev);
4147 		}
4148 	}
4149 
4150 	drbd_disconnect(mdev);
4151 
4152 	dev_info(DEV, "receiver terminated\n");
4153 	return 0;
4154 }
4155 
4156 /* ********* acknowledge sender ******** */
4157 
4158 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4159 {
4160 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4161 
4162 	int retcode = be32_to_cpu(p->retcode);
4163 
4164 	if (retcode >= SS_SUCCESS) {
4165 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4166 	} else {
4167 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4168 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4169 		    drbd_set_st_err_str(retcode), retcode);
4170 	}
4171 	wake_up(&mdev->state_wait);
4172 
4173 	return TRUE;
4174 }
4175 
4176 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4177 {
4178 	return drbd_send_ping_ack(mdev);
4179 
4180 }
4181 
4182 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4183 {
4184 	/* restore idle timeout */
4185 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4186 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4187 		wake_up(&mdev->misc_wait);
4188 
4189 	return TRUE;
4190 }
4191 
4192 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4193 {
4194 	struct p_block_ack *p = (struct p_block_ack *)h;
4195 	sector_t sector = be64_to_cpu(p->sector);
4196 	int blksize = be32_to_cpu(p->blksize);
4197 
4198 	D_ASSERT(mdev->agreed_pro_version >= 89);
4199 
4200 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4201 
4202 	drbd_rs_complete_io(mdev, sector);
4203 	drbd_set_in_sync(mdev, sector, blksize);
4204 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4205 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4206 	dec_rs_pending(mdev);
4207 
4208 	return TRUE;
4209 }
4210 
4211 /* when we receive the ACK for a write request,
4212  * verify that we actually know about it */
4213 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4214 	u64 id, sector_t sector)
4215 {
4216 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4217 	struct hlist_node *n;
4218 	struct drbd_request *req;
4219 
4220 	hlist_for_each_entry(req, n, slot, colision) {
4221 		if ((unsigned long)req == (unsigned long)id) {
4222 			if (req->sector != sector) {
4223 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4224 				    "wrong sector (%llus versus %llus)\n", req,
4225 				    (unsigned long long)req->sector,
4226 				    (unsigned long long)sector);
4227 				break;
4228 			}
4229 			return req;
4230 		}
4231 	}
4232 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4233 		(void *)(unsigned long)id, (unsigned long long)sector);
4234 	return NULL;
4235 }
4236 
4237 typedef struct drbd_request *(req_validator_fn)
4238 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4239 
4240 static int validate_req_change_req_state(struct drbd_conf *mdev,
4241 	u64 id, sector_t sector, req_validator_fn validator,
4242 	const char *func, enum drbd_req_event what)
4243 {
4244 	struct drbd_request *req;
4245 	struct bio_and_error m;
4246 
4247 	spin_lock_irq(&mdev->req_lock);
4248 	req = validator(mdev, id, sector);
4249 	if (unlikely(!req)) {
4250 		spin_unlock_irq(&mdev->req_lock);
4251 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4252 		return FALSE;
4253 	}
4254 	__req_mod(req, what, &m);
4255 	spin_unlock_irq(&mdev->req_lock);
4256 
4257 	if (m.bio)
4258 		complete_master_bio(mdev, &m);
4259 	return TRUE;
4260 }
4261 
4262 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4263 {
4264 	struct p_block_ack *p = (struct p_block_ack *)h;
4265 	sector_t sector = be64_to_cpu(p->sector);
4266 	int blksize = be32_to_cpu(p->blksize);
4267 	enum drbd_req_event what;
4268 
4269 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4270 
4271 	if (is_syncer_block_id(p->block_id)) {
4272 		drbd_set_in_sync(mdev, sector, blksize);
4273 		dec_rs_pending(mdev);
4274 		return TRUE;
4275 	}
4276 	switch (be16_to_cpu(h->command)) {
4277 	case P_RS_WRITE_ACK:
4278 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4279 		what = write_acked_by_peer_and_sis;
4280 		break;
4281 	case P_WRITE_ACK:
4282 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4283 		what = write_acked_by_peer;
4284 		break;
4285 	case P_RECV_ACK:
4286 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4287 		what = recv_acked_by_peer;
4288 		break;
4289 	case P_DISCARD_ACK:
4290 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4291 		what = conflict_discarded_by_peer;
4292 		break;
4293 	default:
4294 		D_ASSERT(0);
4295 		return FALSE;
4296 	}
4297 
4298 	return validate_req_change_req_state(mdev, p->block_id, sector,
4299 		_ack_id_to_req, __func__ , what);
4300 }
4301 
4302 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4303 {
4304 	struct p_block_ack *p = (struct p_block_ack *)h;
4305 	sector_t sector = be64_to_cpu(p->sector);
4306 
4307 	if (__ratelimit(&drbd_ratelimit_state))
4308 		dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4309 
4310 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4311 
4312 	if (is_syncer_block_id(p->block_id)) {
4313 		int size = be32_to_cpu(p->blksize);
4314 		dec_rs_pending(mdev);
4315 		drbd_rs_failed_io(mdev, sector, size);
4316 		return TRUE;
4317 	}
4318 	return validate_req_change_req_state(mdev, p->block_id, sector,
4319 		_ack_id_to_req, __func__ , neg_acked);
4320 }
4321 
4322 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4323 {
4324 	struct p_block_ack *p = (struct p_block_ack *)h;
4325 	sector_t sector = be64_to_cpu(p->sector);
4326 
4327 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4328 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4329 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4330 
4331 	return validate_req_change_req_state(mdev, p->block_id, sector,
4332 		_ar_id_to_req, __func__ , neg_acked);
4333 }
4334 
4335 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4336 {
4337 	sector_t sector;
4338 	int size;
4339 	struct p_block_ack *p = (struct p_block_ack *)h;
4340 
4341 	sector = be64_to_cpu(p->sector);
4342 	size = be32_to_cpu(p->blksize);
4343 
4344 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4345 
4346 	dec_rs_pending(mdev);
4347 
4348 	if (get_ldev_if_state(mdev, D_FAILED)) {
4349 		drbd_rs_complete_io(mdev, sector);
4350 		drbd_rs_failed_io(mdev, sector, size);
4351 		put_ldev(mdev);
4352 	}
4353 
4354 	return TRUE;
4355 }
4356 
4357 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4358 {
4359 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4360 
4361 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4362 
4363 	return TRUE;
4364 }
4365 
4366 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4367 {
4368 	struct p_block_ack *p = (struct p_block_ack *)h;
4369 	struct drbd_work *w;
4370 	sector_t sector;
4371 	int size;
4372 
4373 	sector = be64_to_cpu(p->sector);
4374 	size = be32_to_cpu(p->blksize);
4375 
4376 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377 
4378 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4379 		drbd_ov_oos_found(mdev, sector, size);
4380 	else
4381 		ov_oos_print(mdev);
4382 
4383 	drbd_rs_complete_io(mdev, sector);
4384 	dec_rs_pending(mdev);
4385 
4386 	if (--mdev->ov_left == 0) {
4387 		w = kmalloc(sizeof(*w), GFP_NOIO);
4388 		if (w) {
4389 			w->cb = w_ov_finished;
4390 			drbd_queue_work_front(&mdev->data.work, w);
4391 		} else {
4392 			dev_err(DEV, "kmalloc(w) failed.");
4393 			ov_oos_print(mdev);
4394 			drbd_resync_finished(mdev);
4395 		}
4396 	}
4397 	return TRUE;
4398 }
4399 
4400 static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h)
4401 {
4402 	/* IGNORE */
4403 	return TRUE;
4404 }
4405 
4406 struct asender_cmd {
4407 	size_t pkt_size;
4408 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4409 };
4410 
4411 static struct asender_cmd *get_asender_cmd(int cmd)
4412 {
4413 	static struct asender_cmd asender_tbl[] = {
4414 		/* anything missing from this table is in
4415 		 * the drbd_cmd_handler (drbd_default_handler) table,
4416 		 * see the beginning of drbdd() */
4417 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4418 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4419 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4420 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4421 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4422 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4423 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4424 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4425 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4426 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4427 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4428 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4429 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4430 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_something_to_ignore_m },
4431 	[P_MAX_CMD]	    = { 0, NULL },
4432 	};
4433 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4434 		return NULL;
4435 	return &asender_tbl[cmd];
4436 }
4437 
4438 int drbd_asender(struct drbd_thread *thi)
4439 {
4440 	struct drbd_conf *mdev = thi->mdev;
4441 	struct p_header *h = &mdev->meta.rbuf.header;
4442 	struct asender_cmd *cmd = NULL;
4443 
4444 	int rv, len;
4445 	void *buf    = h;
4446 	int received = 0;
4447 	int expect   = sizeof(struct p_header);
4448 	int empty;
4449 
4450 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4451 
4452 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4453 	current->rt_priority = 2;    /* more important than all other tasks */
4454 
4455 	while (get_t_state(thi) == Running) {
4456 		drbd_thread_current_set_cpu(mdev);
4457 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4458 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4459 			mdev->meta.socket->sk->sk_rcvtimeo =
4460 				mdev->net_conf->ping_timeo*HZ/10;
4461 		}
4462 
4463 		/* conditionally cork;
4464 		 * it may hurt latency if we cork without much to send */
4465 		if (!mdev->net_conf->no_cork &&
4466 			3 < atomic_read(&mdev->unacked_cnt))
4467 			drbd_tcp_cork(mdev->meta.socket);
4468 		while (1) {
4469 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4470 			flush_signals(current);
4471 			if (!drbd_process_done_ee(mdev)) {
4472 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4473 				goto reconnect;
4474 			}
4475 			/* to avoid race with newly queued ACKs */
4476 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4477 			spin_lock_irq(&mdev->req_lock);
4478 			empty = list_empty(&mdev->done_ee);
4479 			spin_unlock_irq(&mdev->req_lock);
4480 			/* new ack may have been queued right here,
4481 			 * but then there is also a signal pending,
4482 			 * and we start over... */
4483 			if (empty)
4484 				break;
4485 		}
4486 		/* but unconditionally uncork unless disabled */
4487 		if (!mdev->net_conf->no_cork)
4488 			drbd_tcp_uncork(mdev->meta.socket);
4489 
4490 		/* short circuit, recv_msg would return EINTR anyways. */
4491 		if (signal_pending(current))
4492 			continue;
4493 
4494 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4495 				     buf, expect-received, 0);
4496 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4497 
4498 		flush_signals(current);
4499 
4500 		/* Note:
4501 		 * -EINTR	 (on meta) we got a signal
4502 		 * -EAGAIN	 (on meta) rcvtimeo expired
4503 		 * -ECONNRESET	 other side closed the connection
4504 		 * -ERESTARTSYS  (on data) we got a signal
4505 		 * rv <  0	 other than above: unexpected error!
4506 		 * rv == expected: full header or command
4507 		 * rv <  expected: "woken" by signal during receive
4508 		 * rv == 0	 : "connection shut down by peer"
4509 		 */
4510 		if (likely(rv > 0)) {
4511 			received += rv;
4512 			buf	 += rv;
4513 		} else if (rv == 0) {
4514 			dev_err(DEV, "meta connection shut down by peer.\n");
4515 			goto reconnect;
4516 		} else if (rv == -EAGAIN) {
4517 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4518 			    mdev->net_conf->ping_timeo*HZ/10) {
4519 				dev_err(DEV, "PingAck did not arrive in time.\n");
4520 				goto reconnect;
4521 			}
4522 			set_bit(SEND_PING, &mdev->flags);
4523 			continue;
4524 		} else if (rv == -EINTR) {
4525 			continue;
4526 		} else {
4527 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4528 			goto reconnect;
4529 		}
4530 
4531 		if (received == expect && cmd == NULL) {
4532 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4533 				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4534 				    (long)be32_to_cpu(h->magic),
4535 				    h->command, h->length);
4536 				goto reconnect;
4537 			}
4538 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4539 			len = be16_to_cpu(h->length);
4540 			if (unlikely(cmd == NULL)) {
4541 				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4542 				    (long)be32_to_cpu(h->magic),
4543 				    h->command, h->length);
4544 				goto disconnect;
4545 			}
4546 			expect = cmd->pkt_size;
4547 			ERR_IF(len != expect-sizeof(struct p_header))
4548 				goto reconnect;
4549 		}
4550 		if (received == expect) {
4551 			D_ASSERT(cmd != NULL);
4552 			if (!cmd->process(mdev, h))
4553 				goto reconnect;
4554 
4555 			buf	 = h;
4556 			received = 0;
4557 			expect	 = sizeof(struct p_header);
4558 			cmd	 = NULL;
4559 		}
4560 	}
4561 
4562 	if (0) {
4563 reconnect:
4564 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4565 	}
4566 	if (0) {
4567 disconnect:
4568 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4569 	}
4570 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4571 
4572 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4573 	dev_info(DEV, "asender terminated\n");
4574 
4575 	return 0;
4576 }
4577