1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3    drbd_receiver.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11  */
12 
13 
14 #include <linux/module.h>
15 
16 #include <linux/uaccess.h>
17 #include <net/sock.h>
18 
19 #include <linux/drbd.h>
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/in.h>
23 #include <linux/mm.h>
24 #include <linux/memcontrol.h>
25 #include <linux/mm_inline.h>
26 #include <linux/slab.h>
27 #include <uapi/linux/sched/types.h>
28 #include <linux/sched/signal.h>
29 #include <linux/pkt_sched.h>
30 #include <linux/unistd.h>
31 #include <linux/vmalloc.h>
32 #include <linux/random.h>
33 #include <linux/string.h>
34 #include <linux/scatterlist.h>
35 #include <linux/part_stat.h>
36 #include <linux/mempool.h>
37 #include "drbd_int.h"
38 #include "drbd_protocol.h"
39 #include "drbd_req.h"
40 #include "drbd_vli.h"
41 
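/* Feature flags this node supports; advertised to the peer during the
 * initial feature handshake (drbd_do_features()). */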
42 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
43 
44 struct packet_info {
45 	enum drbd_packet cmd;
46 	unsigned int size;
47 	unsigned int vnr;
48 	void *data;
49 };
50 
51 enum finish_epoch {
52 	FE_STILL_LIVE,
53 	FE_DESTROYED,
54 	FE_RECYCLED,
55 };
56 
57 static int drbd_do_features(struct drbd_connection *connection);
58 static int drbd_do_auth(struct drbd_connection *connection);
59 static int drbd_disconnected(struct drbd_peer_device *);
60 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
61 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
62 static int e_end_block(struct drbd_work *, int);
63 
64 
65 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
66 
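/* Allocate @number pages from drbd_buffer_page_pool and link them into a
 * single chain via page->private.  Returns the head of the chain, or NULL
 * after releasing any partially allocated chain. */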
67 static struct page *__drbd_alloc_pages(unsigned int number)
68 {
69 	struct page *page = NULL;
70 	struct page *tmp = NULL;
71 	unsigned int i = 0;
72 
73 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
74 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
75 	 * which in turn might block on the other node at this very place.  */
76 	for (i = 0; i < number; i++) {
77 		tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY);
78 		if (!tmp)
79 			goto fail;
80 		set_page_private(tmp, (unsigned long)page);
81 		page = tmp;
82 	}
83 	return page;
84 fail:
85 	page_chain_for_each_safe(page, tmp) {
86 		set_page_private(page, 0);
87 		mempool_free(page, &drbd_buffer_page_pool);
88 	}
89 	return NULL;
90 }
91 
92 /**
93  * drbd_alloc_pages() - Returns a chain of @number pages, or NULL on failure
94  * @peer_device:	DRBD device.
95  * @number:		number of pages requested
96  * @retry:		whether to retry, if not enough pages are available right now
97  *
98  * Tries to allocate @number pages from the DRBD buffer page mempool.  The
99  * allocation does not block waiting for memory, so it may fail and return
100  * NULL.
101  *
102  * If this allocation would exceed the max_buffers setting, we throttle
103  * allocation (schedule_timeout) to give the system some room to breathe.
104  *
105  * We do not use max-buffers as a hard limit, because it could lead to
106  * congestion and further to a distributed deadlock during online-verify or
107  * (checksum based) resync, if the max-buffers, socket buffer sizes and
108  * resync-rate settings are mis-configured.
109  *
110  * Returns a page chain linked via page->private.
111  */
112 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
113 			      bool retry)
114 {
115 	struct drbd_device *device = peer_device->device;
116 	struct page *page;
117 	struct net_conf *nc;
118 	unsigned int mxb;
119 
120 	rcu_read_lock();
121 	nc = rcu_dereference(peer_device->connection->net_conf);
122 	mxb = nc ? nc->max_buffers : 1000000;
123 	rcu_read_unlock();
124 
125 	if (atomic_read(&device->pp_in_use) >= mxb)
126 		schedule_timeout_interruptible(HZ / 10);
127 	page = __drbd_alloc_pages(number);
128 
129 	if (page)
130 		atomic_add(number, &device->pp_in_use);
131 	return page;
132 }
133 
134 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
135  * Is also used from inside another spin_lock_irq(&resource->req_lock);
136  * Either links the page chain back to the global pool,
137  * or returns all pages to the system. */
138 static void drbd_free_pages(struct drbd_device *device, struct page *page)
139 {
140 	struct page *tmp;
141 	int i = 0;
142 
143 	if (page == NULL)
144 		return;
145 
146 	page_chain_for_each_safe(page, tmp) {
147 		set_page_private(page, 0);
148 		if (page_count(page) == 1)
149 			mempool_free(page, &drbd_buffer_page_pool);
150 		else
151 			put_page(page);
152 		i++;
153 	}
154 	i = atomic_sub_return(i, &device->pp_in_use);
155 	if (i < 0)
156 		drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
157 }
158 
159 /*
160 You need to hold the req_lock:
161  _drbd_wait_ee_list_empty()
162 
163 You must not have the req_lock:
164  drbd_free_peer_req()
165  drbd_alloc_peer_req()
166  drbd_free_peer_reqs()
167  drbd_ee_fix_bhs()
168  drbd_finish_peer_reqs()
169  drbd_clear_done_ee()
170  drbd_wait_ee_list_empty()
171 */
172 
173 /* normal: payload_size == request size (bi_size)
174  * w_same: payload_size == logical_block_size
175  * trim: payload_size == 0 */
176 struct drbd_peer_request *
177 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
178 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
179 {
180 	struct drbd_device *device = peer_device->device;
181 	struct drbd_peer_request *peer_req;
182 	struct page *page = NULL;
183 	unsigned int nr_pages = PFN_UP(payload_size);
184 
185 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
186 		return NULL;
187 
188 	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
189 	if (!peer_req) {
190 		if (!(gfp_mask & __GFP_NOWARN))
191 			drbd_err(device, "%s: allocation failed\n", __func__);
192 		return NULL;
193 	}
194 
195 	if (nr_pages) {
196 		page = drbd_alloc_pages(peer_device, nr_pages,
197 					gfpflags_allow_blocking(gfp_mask));
198 		if (!page)
199 			goto fail;
200 		if (!mempool_is_saturated(&drbd_buffer_page_pool))
201 			peer_req->flags |= EE_RELEASE_TO_MEMPOOL;
202 	}
203 
204 	memset(peer_req, 0, sizeof(*peer_req));
205 	INIT_LIST_HEAD(&peer_req->w.list);
206 	drbd_clear_interval(&peer_req->i);
207 	peer_req->i.size = request_size;
208 	peer_req->i.sector = sector;
209 	peer_req->submit_jif = jiffies;
210 	peer_req->peer_device = peer_device;
211 	peer_req->pages = page;
212 	/*
213 	 * The block_id is opaque to the receiver.  It is not endianness
214 	 * converted, and sent back to the sender unchanged.
215 	 */
216 	peer_req->block_id = id;
217 
218 	return peer_req;
219 
220  fail:
221 	mempool_free(peer_req, &drbd_ee_mempool);
222 	return NULL;
223 }
224 
225 void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req)
226 {
227 	might_sleep();
228 	if (peer_req->flags & EE_HAS_DIGEST)
229 		kfree(peer_req->digest);
230 	drbd_free_pages(device, peer_req->pages);
231 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
232 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
233 	if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
234 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
235 		drbd_al_complete_io(device, &peer_req->i);
236 	}
237 	mempool_free(peer_req, &drbd_ee_mempool);
238 }
239 
240 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
241 {
242 	LIST_HEAD(work_list);
243 	struct drbd_peer_request *peer_req, *t;
244 	int count = 0;
245 
246 	spin_lock_irq(&device->resource->req_lock);
247 	list_splice_init(list, &work_list);
248 	spin_unlock_irq(&device->resource->req_lock);
249 
250 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
251 		drbd_free_peer_req(device, peer_req);
252 		count++;
253 	}
254 	return count;
255 }
256 
257 /*
258  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
259  */
260 static int drbd_finish_peer_reqs(struct drbd_device *device)
261 {
262 	LIST_HEAD(work_list);
263 	struct drbd_peer_request *peer_req, *t;
264 	int err = 0;
265 
266 	spin_lock_irq(&device->resource->req_lock);
267 	list_splice_init(&device->done_ee, &work_list);
268 	spin_unlock_irq(&device->resource->req_lock);
269 
270 	/* possible callbacks here:
271 	 * e_end_block, and e_end_resync_block, e_send_superseded.
272 	 * all ignore the last argument.
273 	 */
274 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
275 		int err2;
276 
277 		/* list_del not necessary, next/prev members not touched */
278 		err2 = peer_req->w.cb(&peer_req->w, !!err);
279 		if (!err)
280 			err = err2;
281 		drbd_free_peer_req(device, peer_req);
282 	}
283 	wake_up(&device->ee_wait);
284 
285 	return err;
286 }
287 
288 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
289 				     struct list_head *head)
290 {
291 	DEFINE_WAIT(wait);
292 
293 	/* avoids spin_lock/unlock
294 	 * and calling prepare_to_wait in the fast path */
295 	while (!list_empty(head)) {
296 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
297 		spin_unlock_irq(&device->resource->req_lock);
298 		io_schedule();
299 		finish_wait(&device->ee_wait, &wait);
300 		spin_lock_irq(&device->resource->req_lock);
301 	}
302 }
303 
304 static void drbd_wait_ee_list_empty(struct drbd_device *device,
305 				    struct list_head *head)
306 {
307 	spin_lock_irq(&device->resource->req_lock);
308 	_drbd_wait_ee_list_empty(device, head);
309 	spin_unlock_irq(&device->resource->req_lock);
310 }
311 
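/* Thin wrapper around sock_recvmsg().  Without explicit @flags it blocks
 * until the full @size has arrived (MSG_WAITALL | MSG_NOSIGNAL); returns
 * the number of bytes received or a negative errno. */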
312 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
313 {
314 	struct kvec iov = {
315 		.iov_base = buf,
316 		.iov_len = size,
317 	};
318 	struct msghdr msg = {
319 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
320 	};
321 	iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, size);
322 	return sock_recvmsg(sock, &msg, msg.msg_flags);
323 }
324 
325 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
326 {
327 	int rv;
328 
329 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
330 
331 	if (rv < 0) {
332 		if (rv == -ECONNRESET)
333 			drbd_info(connection, "sock was reset by peer\n");
334 		else if (rv != -ERESTARTSYS)
335 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
336 	} else if (rv == 0) {
337 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
338 			long t;
339 			rcu_read_lock();
340 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
341 			rcu_read_unlock();
342 
343 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
344 
345 			if (t)
346 				goto out;
347 		}
348 		drbd_info(connection, "sock was shut down by peer\n");
349 	}
350 
351 	if (rv != size)
352 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
353 
354 out:
355 	return rv;
356 }
357 
358 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
359 {
360 	int err;
361 
362 	err = drbd_recv(connection, buf, size);
363 	if (err != size) {
364 		if (err >= 0)
365 			err = -EIO;
366 	} else
367 		err = 0;
368 	return err;
369 }
370 
371 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
372 {
373 	int err;
374 
375 	err = drbd_recv_all(connection, buf, size);
376 	if (err && !signal_pending(current))
377 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
378 	return err;
379 }
380 
381 /* quoting tcp(7):
382  *   On individual connections, the socket buffer size must be set prior to the
383  *   listen(2) or connect(2) calls in order to have it take effect.
384  * This is our wrapper to do so.
385  */
386 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
387 		unsigned int rcv)
388 {
389 	/* open coded SO_SNDBUF, SO_RCVBUF */
390 	if (snd) {
391 		sock->sk->sk_sndbuf = snd;
392 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
393 	}
394 	if (rcv) {
395 		sock->sk->sk_rcvbuf = rcv;
396 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
397 	}
398 }
399 
400 static struct socket *drbd_try_connect(struct drbd_connection *connection)
401 {
402 	const char *what;
403 	struct socket *sock;
404 	struct sockaddr_in6 src_in6;
405 	struct sockaddr_in6 peer_in6;
406 	struct net_conf *nc;
407 	int err, peer_addr_len, my_addr_len;
408 	int sndbuf_size, rcvbuf_size, connect_int;
409 	int disconnect_on_error = 1;
410 
411 	rcu_read_lock();
412 	nc = rcu_dereference(connection->net_conf);
413 	if (!nc) {
414 		rcu_read_unlock();
415 		return NULL;
416 	}
417 	sndbuf_size = nc->sndbuf_size;
418 	rcvbuf_size = nc->rcvbuf_size;
419 	connect_int = nc->connect_int;
420 	rcu_read_unlock();
421 
422 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
423 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
424 
425 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
426 		src_in6.sin6_port = 0;
427 	else
428 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
429 
430 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
431 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
432 
433 	what = "sock_create_kern";
434 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
435 			       SOCK_STREAM, IPPROTO_TCP, &sock);
436 	if (err < 0) {
437 		sock = NULL;
438 		goto out;
439 	}
440 
441 	sock->sk->sk_rcvtimeo =
442 	sock->sk->sk_sndtimeo = connect_int * HZ;
443 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
444 
445 	/* Explicitly bind to the configured IP as source IP
446 	 * for the outgoing connections.
447 	 * This is needed for multihomed hosts and to be
448 	 * able to use lo: interfaces for drbd.
449 	 * Make sure to use 0 as port number, so Linux selects
450 	 * a free one dynamically.
451 	 */
452 	what = "bind before connect";
453 	err = sock->ops->bind(sock, (struct sockaddr_unsized *) &src_in6, my_addr_len);
454 	if (err < 0)
455 		goto out;
456 
457 	/* connect may fail, peer not yet available.
458 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
459 	disconnect_on_error = 0;
460 	what = "connect";
461 	err = sock->ops->connect(sock, (struct sockaddr_unsized *) &peer_in6, peer_addr_len, 0);
462 
463 out:
464 	if (err < 0) {
465 		if (sock) {
466 			sock_release(sock);
467 			sock = NULL;
468 		}
469 		switch (-err) {
470 			/* timeout, busy, signal pending */
471 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
472 		case EINTR: case ERESTARTSYS:
473 			/* peer not (yet) available, network problem */
474 		case ECONNREFUSED: case ENETUNREACH:
475 		case EHOSTDOWN:    case EHOSTUNREACH:
476 			disconnect_on_error = 0;
477 			break;
478 		default:
479 			drbd_err(connection, "%s failed, err = %d\n", what, err);
480 		}
481 		if (disconnect_on_error)
482 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
483 	}
484 
485 	return sock;
486 }
487 
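/* Per connection-attempt context for accepting incoming connections: the
 * listen socket's sk_state_change hook is replaced by
 * drbd_incoming_connection(), which completes @door_bell once an incoming
 * connection reaches TCP_ESTABLISHED (see drbd_wait_for_connect()). */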
488 struct accept_wait_data {
489 	struct drbd_connection *connection;
490 	struct socket *s_listen;
491 	struct completion door_bell;
492 	void (*original_sk_state_change)(struct sock *sk);
493 
494 };
495 
496 static void drbd_incoming_connection(struct sock *sk)
497 {
498 	struct accept_wait_data *ad = sk->sk_user_data;
499 	void (*state_change)(struct sock *sk);
500 
501 	state_change = ad->original_sk_state_change;
502 	if (sk->sk_state == TCP_ESTABLISHED)
503 		complete(&ad->door_bell);
504 	state_change(sk);
505 }
506 
507 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
508 {
509 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
510 	struct sockaddr_in6 my_addr;
511 	struct socket *s_listen;
512 	struct net_conf *nc;
513 	const char *what;
514 
515 	rcu_read_lock();
516 	nc = rcu_dereference(connection->net_conf);
517 	if (!nc) {
518 		rcu_read_unlock();
519 		return -EIO;
520 	}
521 	sndbuf_size = nc->sndbuf_size;
522 	rcvbuf_size = nc->rcvbuf_size;
523 	rcu_read_unlock();
524 
525 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
526 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
527 
528 	what = "sock_create_kern";
529 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
530 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
531 	if (err) {
532 		s_listen = NULL;
533 		goto out;
534 	}
535 
536 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
537 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
538 
539 	what = "bind before listen";
540 	err = s_listen->ops->bind(s_listen, (struct sockaddr_unsized *)&my_addr, my_addr_len);
541 	if (err < 0)
542 		goto out;
543 
544 	ad->s_listen = s_listen;
545 	write_lock_bh(&s_listen->sk->sk_callback_lock);
546 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
547 	s_listen->sk->sk_state_change = drbd_incoming_connection;
548 	s_listen->sk->sk_user_data = ad;
549 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
550 
551 	what = "listen";
552 	err = s_listen->ops->listen(s_listen, 5);
553 	if (err < 0)
554 		goto out;
555 
556 	return 0;
557 out:
558 	if (s_listen)
559 		sock_release(s_listen);
560 	if (err < 0) {
561 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
562 			drbd_err(connection, "%s failed, err = %d\n", what, err);
563 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
564 		}
565 	}
566 
567 	return -EIO;
568 }
569 
570 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
571 {
572 	write_lock_bh(&sk->sk_callback_lock);
573 	sk->sk_state_change = ad->original_sk_state_change;
574 	sk->sk_user_data = NULL;
575 	write_unlock_bh(&sk->sk_callback_lock);
576 }
577 
578 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
579 {
580 	int timeo, connect_int, err = 0;
581 	struct socket *s_estab = NULL;
582 	struct net_conf *nc;
583 
584 	rcu_read_lock();
585 	nc = rcu_dereference(connection->net_conf);
586 	if (!nc) {
587 		rcu_read_unlock();
588 		return NULL;
589 	}
590 	connect_int = nc->connect_int;
591 	rcu_read_unlock();
592 
593 	timeo = connect_int * HZ;
594 	/* 28.5% random jitter */
595 	timeo += get_random_u32_below(2) ? timeo / 7 : -timeo / 7;
596 
597 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
598 	if (err <= 0)
599 		return NULL;
600 
601 	err = kernel_accept(ad->s_listen, &s_estab, 0);
602 	if (err < 0) {
603 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
604 			drbd_err(connection, "accept failed, err = %d\n", err);
605 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
606 		}
607 	}
608 
609 	if (s_estab)
610 		unregister_state_change(s_estab->sk, ad);
611 
612 	return s_estab;
613 }
614 
615 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
616 
617 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
618 			     enum drbd_packet cmd)
619 {
620 	if (!conn_prepare_command(connection, sock))
621 		return -EIO;
622 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
623 }
624 
625 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
626 {
627 	unsigned int header_size = drbd_header_size(connection);
628 	struct packet_info pi;
629 	struct net_conf *nc;
630 	int err;
631 
632 	rcu_read_lock();
633 	nc = rcu_dereference(connection->net_conf);
634 	if (!nc) {
635 		rcu_read_unlock();
636 		return -EIO;
637 	}
638 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
639 	rcu_read_unlock();
640 
641 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
642 	if (err != header_size) {
643 		if (err >= 0)
644 			err = -EIO;
645 		return err;
646 	}
647 	err = decode_header(connection, connection->data.rbuf, &pi);
648 	if (err)
649 		return err;
650 	return pi.cmd;
651 }
652 
653 /**
654  * drbd_socket_okay() - Free the socket if its connection is not okay
655  * @sock:	pointer to the pointer to the socket.
656  */
657 static bool drbd_socket_okay(struct socket **sock)
658 {
659 	int rr;
660 	char tb[4];
661 
662 	if (!*sock)
663 		return false;
664 
665 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
666 
667 	if (rr > 0 || rr == -EAGAIN) {
668 		return true;
669 	} else {
670 		sock_release(*sock);
671 		*sock = NULL;
672 		return false;
673 	}
674 }
675 
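/* Both sockets must exist; give them a short grace period (sock_check_timeo,
 * falling back to ping_timeo) and then check that both are still usable
 * before we consider the connection established. */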
676 static bool connection_established(struct drbd_connection *connection,
677 				   struct socket **sock1,
678 				   struct socket **sock2)
679 {
680 	struct net_conf *nc;
681 	int timeout;
682 	bool ok;
683 
684 	if (!*sock1 || !*sock2)
685 		return false;
686 
687 	rcu_read_lock();
688 	nc = rcu_dereference(connection->net_conf);
689 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
690 	rcu_read_unlock();
691 	schedule_timeout_interruptible(timeout);
692 
693 	ok = drbd_socket_okay(sock1);
694 	ok = drbd_socket_okay(sock2) && ok;
695 
696 	return ok;
697 }
698 
699 /* Gets called if a connection is established, or if a new minor gets created
700    in a connection */
701 int drbd_connected(struct drbd_peer_device *peer_device)
702 {
703 	struct drbd_device *device = peer_device->device;
704 	int err;
705 
706 	atomic_set(&device->packet_seq, 0);
707 	device->peer_seq = 0;
708 
709 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
710 		&peer_device->connection->cstate_mutex :
711 		&device->own_state_mutex;
712 
713 	err = drbd_send_sync_param(peer_device);
714 	if (!err)
715 		err = drbd_send_sizes(peer_device, 0, 0);
716 	if (!err)
717 		err = drbd_send_uuids(peer_device);
718 	if (!err)
719 		err = drbd_send_current_state(peer_device);
720 	clear_bit(USE_DEGR_WFC_T, &device->flags);
721 	clear_bit(RESIZE_PENDING, &device->flags);
722 	atomic_set(&device->ap_in_flight, 0);
723 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
724 	return err;
725 }
726 
727 /*
728  * return values:
729  *   1 yes, we have a valid connection
730  *   0 oops, did not work out, please try again
731  *  -1 peer talks different language,
732  *     no point in trying again, please go standalone.
733  *  -2 We do not have a network config...
734  */
735 static int conn_connect(struct drbd_connection *connection)
736 {
737 	struct drbd_socket sock, msock;
738 	struct drbd_peer_device *peer_device;
739 	struct net_conf *nc;
740 	int vnr, timeout, h;
741 	bool discard_my_data, ok;
742 	enum drbd_state_rv rv;
743 	struct accept_wait_data ad = {
744 		.connection = connection,
745 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
746 	};
747 
748 	clear_bit(DISCONNECT_SENT, &connection->flags);
749 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
750 		return -2;
751 
752 	mutex_init(&sock.mutex);
753 	sock.sbuf = connection->data.sbuf;
754 	sock.rbuf = connection->data.rbuf;
755 	sock.socket = NULL;
756 	mutex_init(&msock.mutex);
757 	msock.sbuf = connection->meta.sbuf;
758 	msock.rbuf = connection->meta.rbuf;
759 	msock.socket = NULL;
760 
761 	/* Assume that the peer only understands protocol 80 until we know better.  */
762 	connection->agreed_pro_version = 80;
763 
764 	if (prepare_listen_socket(connection, &ad))
765 		return 0;
766 
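	/* Establish two TCP connections: one becomes the "data" socket (bulk
	 * replication traffic), the other the "meta" socket (acks and pings).
	 * Both sides connect out and accept incoming connections; the first
	 * packet (P_INITIAL_DATA/P_INITIAL_META) identifies the role of an
	 * accepted socket, and crossed attempts are resolved by dropping one
	 * socket and retrying after a random decision. */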
767 	do {
768 		struct socket *s;
769 
770 		s = drbd_try_connect(connection);
771 		if (s) {
772 			if (!sock.socket) {
773 				sock.socket = s;
774 				send_first_packet(connection, &sock, P_INITIAL_DATA);
775 			} else if (!msock.socket) {
776 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
777 				msock.socket = s;
778 				send_first_packet(connection, &msock, P_INITIAL_META);
779 			} else {
780 				drbd_err(connection, "Logic error in conn_connect()\n");
781 				goto out_release_sockets;
782 			}
783 		}
784 
785 		if (connection_established(connection, &sock.socket, &msock.socket))
786 			break;
787 
788 retry:
789 		s = drbd_wait_for_connect(connection, &ad);
790 		if (s) {
791 			int fp = receive_first_packet(connection, s);
792 			drbd_socket_okay(&sock.socket);
793 			drbd_socket_okay(&msock.socket);
794 			switch (fp) {
795 			case P_INITIAL_DATA:
796 				if (sock.socket) {
797 					drbd_warn(connection, "initial packet S crossed\n");
798 					sock_release(sock.socket);
799 					sock.socket = s;
800 					goto randomize;
801 				}
802 				sock.socket = s;
803 				break;
804 			case P_INITIAL_META:
805 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
806 				if (msock.socket) {
807 					drbd_warn(connection, "initial packet M crossed\n");
808 					sock_release(msock.socket);
809 					msock.socket = s;
810 					goto randomize;
811 				}
812 				msock.socket = s;
813 				break;
814 			default:
815 				drbd_warn(connection, "Error receiving initial packet\n");
816 				sock_release(s);
817 randomize:
818 				if (get_random_u32_below(2))
819 					goto retry;
820 			}
821 		}
822 
823 		if (connection->cstate <= C_DISCONNECTING)
824 			goto out_release_sockets;
825 		if (signal_pending(current)) {
826 			flush_signals(current);
827 			smp_rmb();
828 			if (get_t_state(&connection->receiver) == EXITING)
829 				goto out_release_sockets;
830 		}
831 
832 		ok = connection_established(connection, &sock.socket, &msock.socket);
833 	} while (!ok);
834 
835 	if (ad.s_listen)
836 		sock_release(ad.s_listen);
837 
838 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
839 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
840 
841 	sock.socket->sk->sk_allocation = GFP_NOIO;
842 	msock.socket->sk->sk_allocation = GFP_NOIO;
843 
844 	sock.socket->sk->sk_use_task_frag = false;
845 	msock.socket->sk->sk_use_task_frag = false;
846 
847 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
848 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
849 
850 	/* NOT YET ...
851 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
852 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
853 	 * first set it to the P_CONNECTION_FEATURES timeout,
854 	 * which we set to 4x the configured ping_timeout. */
855 	rcu_read_lock();
856 	nc = rcu_dereference(connection->net_conf);
857 
858 	sock.socket->sk->sk_sndtimeo =
859 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
860 
861 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
862 	timeout = nc->timeout * HZ / 10;
863 	discard_my_data = nc->discard_my_data;
864 	rcu_read_unlock();
865 
866 	msock.socket->sk->sk_sndtimeo = timeout;
867 
868 	/* we don't want delays.
869 	 * we use TCP_CORK where appropriate, though */
870 	tcp_sock_set_nodelay(sock.socket->sk);
871 	tcp_sock_set_nodelay(msock.socket->sk);
872 
873 	connection->data.socket = sock.socket;
874 	connection->meta.socket = msock.socket;
875 	connection->last_received = jiffies;
876 
877 	h = drbd_do_features(connection);
878 	if (h <= 0)
879 		return h;
880 
881 	if (connection->cram_hmac_tfm) {
882 		/* drbd_request_state(device, NS(conn, WFAuth)); */
883 		switch (drbd_do_auth(connection)) {
884 		case -1:
885 			drbd_err(connection, "Authentication of peer failed\n");
886 			return -1;
887 		case 0:
888 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
889 			return 0;
890 		}
891 	}
892 
893 	connection->data.socket->sk->sk_sndtimeo = timeout;
894 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
895 
896 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
897 		return -1;
898 
899 	/* Prevent a race between resync-handshake and
900 	 * being promoted to Primary.
901 	 *
902 	 * Grab and release the state mutex, so we know that any current
903 	 * drbd_set_role() is finished, and any incoming drbd_set_role
904 	 * will see the STATE_SENT flag, and wait for it to be cleared.
905 	 */
906 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
907 		mutex_lock(peer_device->device->state_mutex);
908 
909 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
910 	spin_lock_irq(&connection->resource->req_lock);
911 	set_bit(STATE_SENT, &connection->flags);
912 	spin_unlock_irq(&connection->resource->req_lock);
913 
914 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
915 		mutex_unlock(peer_device->device->state_mutex);
916 
917 	rcu_read_lock();
918 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
919 		struct drbd_device *device = peer_device->device;
920 		kref_get(&device->kref);
921 		rcu_read_unlock();
922 
923 		if (discard_my_data)
924 			set_bit(DISCARD_MY_DATA, &device->flags);
925 		else
926 			clear_bit(DISCARD_MY_DATA, &device->flags);
927 
928 		drbd_connected(peer_device);
929 		kref_put(&device->kref, drbd_destroy_device);
930 		rcu_read_lock();
931 	}
932 	rcu_read_unlock();
933 
934 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
935 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
936 		clear_bit(STATE_SENT, &connection->flags);
937 		return 0;
938 	}
939 
940 	drbd_thread_start(&connection->ack_receiver);
941 	/* opencoded create_singlethread_workqueue(),
942 	 * to be able to use format string arguments */
943 	connection->ack_sender =
944 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
945 	if (!connection->ack_sender) {
946 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
947 		return 0;
948 	}
949 
950 	mutex_lock(&connection->resource->conf_update);
951 	/* The discard_my_data flag is a single-shot modifier to the next
952 	 * connection attempt, the handshake of which is now well underway.
953 	 * No need for rcu style copying of the whole struct
954 	 * just to clear a single value. */
955 	connection->net_conf->discard_my_data = 0;
956 	mutex_unlock(&connection->resource->conf_update);
957 
958 	return h;
959 
960 out_release_sockets:
961 	if (ad.s_listen)
962 		sock_release(ad.s_listen);
963 	if (sock.socket)
964 		sock_release(sock.socket);
965 	if (msock.socket)
966 		sock_release(msock.socket);
967 	return -1;
968 }
969 
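/* Parse one packet header from @header into @pi.  The three on-the-wire
 * header formats (struct p_header80/95/100) are told apart by their magic
 * value; the expected size follows from the agreed protocol version. */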
970 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
971 {
972 	unsigned int header_size = drbd_header_size(connection);
973 
974 	if (header_size == sizeof(struct p_header100) &&
975 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
976 		struct p_header100 *h = header;
977 		if (h->pad != 0) {
978 			drbd_err(connection, "Header padding is not zero\n");
979 			return -EINVAL;
980 		}
981 		pi->vnr = be16_to_cpu(h->volume);
982 		pi->cmd = be16_to_cpu(h->command);
983 		pi->size = be32_to_cpu(h->length);
984 	} else if (header_size == sizeof(struct p_header95) &&
985 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
986 		struct p_header95 *h = header;
987 		pi->cmd = be16_to_cpu(h->command);
988 		pi->size = be32_to_cpu(h->length);
989 		pi->vnr = 0;
990 	} else if (header_size == sizeof(struct p_header80) &&
991 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
992 		struct p_header80 *h = header;
993 		pi->cmd = be16_to_cpu(h->command);
994 		pi->size = be16_to_cpu(h->length);
995 		pi->vnr = 0;
996 	} else {
997 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
998 			 be32_to_cpu(*(__be32 *)header),
999 			 connection->agreed_pro_version);
1000 		return -EINVAL;
1001 	}
1002 	pi->data = header + header_size;
1003 	return 0;
1004 }
1005 
1006 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1007 {
1008 	if (current->plug == &connection->receiver_plug) {
1009 		blk_finish_plug(&connection->receiver_plug);
1010 		blk_start_plug(&connection->receiver_plug);
1011 	} /* else: maybe just schedule() ?? */
1012 }
1013 
1014 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1015 {
1016 	void *buffer = connection->data.rbuf;
1017 	int err;
1018 
1019 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1020 	if (err)
1021 		return err;
1022 
1023 	err = decode_header(connection, buffer, pi);
1024 	connection->last_received = jiffies;
1025 
1026 	return err;
1027 }
1028 
1029 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1030 {
1031 	void *buffer = connection->data.rbuf;
1032 	unsigned int size = drbd_header_size(connection);
1033 	int err;
1034 
1035 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1036 	if (err != size) {
1037 		/* If we have nothing in the receive buffer now, to reduce
1038 		 * application latency, try to drain the backend queues as
1039 		 * quickly as possible, and let remote TCP know what we have
1040 		 * received so far. */
1041 		if (err == -EAGAIN) {
1042 			tcp_sock_set_quickack(connection->data.socket->sk, 2);
1043 			drbd_unplug_all_devices(connection);
1044 		}
1045 		if (err > 0) {
1046 			buffer += err;
1047 			size -= err;
1048 		}
1049 		err = drbd_recv_all_warn(connection, buffer, size);
1050 		if (err)
1051 			return err;
1052 	}
1053 
1054 	err = decode_header(connection, connection->data.rbuf, pi);
1055 	connection->last_received = jiffies;
1056 
1057 	return err;
1058 }
1059 /* This is blkdev_issue_flush, but asynchronous.
1060  * We want to submit to all component volumes in parallel,
1061  * then wait for all completions.
1062  */
1063 struct issue_flush_context {
1064 	atomic_t pending;
1065 	int error;
1066 	struct completion done;
1067 };
1068 struct one_flush_context {
1069 	struct drbd_device *device;
1070 	struct issue_flush_context *ctx;
1071 };
1072 
1073 static void one_flush_endio(struct bio *bio)
1074 {
1075 	struct one_flush_context *octx = bio->bi_private;
1076 	struct drbd_device *device = octx->device;
1077 	struct issue_flush_context *ctx = octx->ctx;
1078 
1079 	if (bio->bi_status) {
1080 		ctx->error = blk_status_to_errno(bio->bi_status);
1081 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1082 	}
1083 	kfree(octx);
1084 	bio_put(bio);
1085 
1086 	clear_bit(FLUSH_PENDING, &device->flags);
1087 	put_ldev(device);
1088 	kref_put(&device->kref, drbd_destroy_device);
1089 
1090 	if (atomic_dec_and_test(&ctx->pending))
1091 		complete(&ctx->done);
1092 }
1093 
1094 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1095 {
1096 	struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0,
1097 				    REQ_OP_WRITE | REQ_PREFLUSH, GFP_NOIO);
1098 	struct one_flush_context *octx = kmalloc_obj(*octx, GFP_NOIO);
1099 
1100 	if (!octx) {
1101 		drbd_warn(device, "Could not allocate an octx, CANNOT ISSUE FLUSH\n");
1102 		/* FIXME: what else can I do now?  disconnecting or detaching
1103 		 * really does not help to improve the state of the world, either.
1104 		 */
1105 		bio_put(bio);
1106 
1107 		ctx->error = -ENOMEM;
1108 		put_ldev(device);
1109 		kref_put(&device->kref, drbd_destroy_device);
1110 		return;
1111 	}
1112 
1113 	octx->device = device;
1114 	octx->ctx = ctx;
1115 	bio->bi_private = octx;
1116 	bio->bi_end_io = one_flush_endio;
1117 
1118 	device->flush_jif = jiffies;
1119 	set_bit(FLUSH_PENDING, &device->flags);
1120 	atomic_inc(&ctx->pending);
1121 	submit_bio(bio);
1122 }
1123 
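/* Submit an empty REQ_PREFLUSH bio to every attached backing device of this
 * connection in parallel, wait for all completions, and drop back to
 * WO_DRAIN_IO write ordering if any of the flushes failed. */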
1124 static void drbd_flush(struct drbd_connection *connection)
1125 {
1126 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1127 		struct drbd_peer_device *peer_device;
1128 		struct issue_flush_context ctx;
1129 		int vnr;
1130 
1131 		atomic_set(&ctx.pending, 1);
1132 		ctx.error = 0;
1133 		init_completion(&ctx.done);
1134 
1135 		rcu_read_lock();
1136 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1137 			struct drbd_device *device = peer_device->device;
1138 
1139 			if (!get_ldev(device))
1140 				continue;
1141 			kref_get(&device->kref);
1142 			rcu_read_unlock();
1143 
1144 			submit_one_flush(device, &ctx);
1145 
1146 			rcu_read_lock();
1147 		}
1148 		rcu_read_unlock();
1149 
1150 		/* Do we want to add a timeout,
1151 		 * if disk-timeout is set? */
1152 		if (!atomic_dec_and_test(&ctx.pending))
1153 			wait_for_completion(&ctx.done);
1154 
1155 		if (ctx.error) {
1156 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1157 			 * don't try again for ANY return value != 0
1158 			 * if (rv == -EOPNOTSUPP) */
1159 			/* Any error is already reported by bio_endio callback. */
1160 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1161 		}
1162 	}
1163 }
1164 
1165 /**
1166  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1167  * @connection:	DRBD connection.
1168  * @epoch:	Epoch object.
1169  * @ev:		Epoch event.
1170  */
1171 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1172 					       struct drbd_epoch *epoch,
1173 					       enum epoch_event ev)
1174 {
1175 	int epoch_size;
1176 	struct drbd_epoch *next_epoch;
1177 	enum finish_epoch rv = FE_STILL_LIVE;
1178 
1179 	spin_lock(&connection->epoch_lock);
1180 	do {
1181 		next_epoch = NULL;
1182 
1183 		epoch_size = atomic_read(&epoch->epoch_size);
1184 
1185 		switch (ev & ~EV_CLEANUP) {
1186 		case EV_PUT:
1187 			atomic_dec(&epoch->active);
1188 			break;
1189 		case EV_GOT_BARRIER_NR:
1190 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1191 			break;
1192 		case EV_BECAME_LAST:
1193 			/* nothing to do */
1194 			break;
1195 		}
1196 
1197 		if (epoch_size != 0 &&
1198 		    atomic_read(&epoch->active) == 0 &&
1199 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1200 			if (!(ev & EV_CLEANUP)) {
1201 				spin_unlock(&connection->epoch_lock);
1202 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1203 				spin_lock(&connection->epoch_lock);
1204 			}
1205 #if 0
1206 			/* FIXME: dec unacked on connection, once we have
1207 			 * something to count pending connection packets in. */
1208 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1209 				dec_unacked(epoch->connection);
1210 #endif
1211 
1212 			if (connection->current_epoch != epoch) {
1213 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1214 				list_del(&epoch->list);
1215 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1216 				connection->epochs--;
1217 				kfree(epoch);
1218 
1219 				if (rv == FE_STILL_LIVE)
1220 					rv = FE_DESTROYED;
1221 			} else {
1222 				epoch->flags = 0;
1223 				atomic_set(&epoch->epoch_size, 0);
1224 				/* atomic_set(&epoch->active, 0); is already zero */
1225 				if (rv == FE_STILL_LIVE)
1226 					rv = FE_RECYCLED;
1227 			}
1228 		}
1229 
1230 		if (!next_epoch)
1231 			break;
1232 
1233 		epoch = next_epoch;
1234 	} while (1);
1235 
1236 	spin_unlock(&connection->epoch_lock);
1237 
1238 	return rv;
1239 }
1240 
1241 static enum write_ordering_e
1242 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1243 {
1244 	struct disk_conf *dc;
1245 
1246 	dc = rcu_dereference(bdev->disk_conf);
1247 
1248 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1249 		wo = WO_DRAIN_IO;
1250 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1251 		wo = WO_NONE;
1252 
1253 	return wo;
1254 }
1255 
1256 /*
1257  * drbd_bump_write_ordering() - Fall back to another write ordering method
1258  * @wo:		Write ordering method to try.
1259  */
1260 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1261 			      enum write_ordering_e wo)
1262 {
1263 	struct drbd_device *device;
1264 	enum write_ordering_e pwo;
1265 	int vnr;
1266 	static char *write_ordering_str[] = {
1267 		[WO_NONE] = "none",
1268 		[WO_DRAIN_IO] = "drain",
1269 		[WO_BDEV_FLUSH] = "flush",
1270 	};
1271 
1272 	pwo = resource->write_ordering;
1273 	if (wo != WO_BDEV_FLUSH)
1274 		wo = min(pwo, wo);
1275 	rcu_read_lock();
1276 	idr_for_each_entry(&resource->devices, device, vnr) {
1277 		if (get_ldev(device)) {
1278 			wo = max_allowed_wo(device->ldev, wo);
1279 			if (device->ldev == bdev)
1280 				bdev = NULL;
1281 			put_ldev(device);
1282 		}
1283 	}
1284 
1285 	if (bdev)
1286 		wo = max_allowed_wo(bdev, wo);
1287 
1288 	rcu_read_unlock();
1289 
1290 	resource->write_ordering = wo;
1291 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1292 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1293 }
1294 
1295 /*
1296  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1297  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1298  * will directly go to fallback mode, submitting normal writes, and
1299  * never even try to UNMAP.
1300  *
1301  * And dm-thin does not do this (yet), mostly because in general it has
1302  * to assume that "skip_block_zeroing" is set.  See also:
1303  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1304  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1305  *
1306  * We *may* ignore the discard-zeroes-data setting, if so configured.
1307  *
1308  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1309  * may ignore partial unaligned discards.
1310  *
1311  * LVM/DM thin as of at least
1312  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1313  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1314  *   Driver version:  4.29.0
1315  * still behaves this way.
1316  *
1317  * For unaligned (wrt. alignment and granularity) or too small discards,
1318  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1319  * but discard all the aligned full chunks.
1320  *
1321  * At least for LVM/DM thin, with skip_block_zeroing=false,
1322  * the result is effectively "discard_zeroes_data=1".
1323  */
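/* Worked example (illustrative numbers): with a discard granularity of 8
 * sectors and alignment 0, a request for sectors 5..36 (start=5,
 * nr_sectors=32) zeroes out the unaligned head 5..7, discards the aligned
 * chunks 8..31, and zeroes out the remaining tail 32..36. */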
1324 /* flags: EE_TRIM|EE_ZEROOUT */
1325 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1326 {
1327 	struct block_device *bdev = device->ldev->backing_bdev;
1328 	sector_t tmp, nr;
1329 	unsigned int max_discard_sectors, granularity;
1330 	int alignment;
1331 	int err = 0;
1332 
1333 	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1334 		goto zero_out;
1335 
1336 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1337 	granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
1338 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1339 
1340 	max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22));
1341 	max_discard_sectors -= max_discard_sectors % granularity;
1342 	if (unlikely(!max_discard_sectors))
1343 		goto zero_out;
1344 
1345 	if (nr_sectors < granularity)
1346 		goto zero_out;
1347 
1348 	tmp = start;
1349 	if (sector_div(tmp, granularity) != alignment) {
1350 		if (nr_sectors < 2*granularity)
1351 			goto zero_out;
1352 		/* start + gran - (start + gran - align) % gran */
1353 		tmp = start + granularity - alignment;
1354 		tmp = start + granularity - sector_div(tmp, granularity);
1355 
1356 		nr = tmp - start;
1357 		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1358 		 * layers are below us, some may have smaller granularity */
1359 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1360 		nr_sectors -= nr;
1361 		start = tmp;
1362 	}
1363 	while (nr_sectors >= max_discard_sectors) {
1364 		err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
1365 					    GFP_NOIO);
1366 		nr_sectors -= max_discard_sectors;
1367 		start += max_discard_sectors;
1368 	}
1369 	if (nr_sectors) {
1370 		/* max_discard_sectors is unsigned int (and a multiple of
1371 		 * granularity, we made sure of that above already);
1372 		 * nr is < max_discard_sectors;
1373 		 * I don't need sector_div here, even though nr is sector_t */
1374 		nr = nr_sectors;
1375 		nr -= (unsigned int)nr % granularity;
1376 		if (nr) {
1377 			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
1378 			nr_sectors -= nr;
1379 			start += nr;
1380 		}
1381 	}
1382  zero_out:
1383 	if (nr_sectors) {
1384 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1385 				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1386 	}
1387 	return err != 0;
1388 }
1389 
1390 static bool can_do_reliable_discards(struct drbd_device *device)
1391 {
1392 	struct disk_conf *dc;
1393 	bool can_do;
1394 
1395 	if (!bdev_max_discard_sectors(device->ldev->backing_bdev))
1396 		return false;
1397 
1398 	rcu_read_lock();
1399 	dc = rcu_dereference(device->ldev->disk_conf);
1400 	can_do = dc->discard_zeroes_if_aligned;
1401 	rcu_read_unlock();
1402 	return can_do;
1403 }
1404 
1405 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1406 {
1407 	/* If the backend cannot discard, or does not guarantee
1408 	 * read-back zeroes in discarded ranges, we fall back to
1409 	 * zero-out.  Unless configuration specifically requested
1410 	 * otherwise. */
1411 	if (!can_do_reliable_discards(device))
1412 		peer_req->flags |= EE_ZEROOUT;
1413 
1414 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1415 	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1416 		peer_req->flags |= EE_WAS_ERROR;
1417 	drbd_endio_write_sec_final(peer_req);
1418 }
1419 
1420 static int peer_request_fault_type(struct drbd_peer_request *peer_req)
1421 {
1422 	if (peer_req_op(peer_req) == REQ_OP_READ) {
1423 		return peer_req->flags & EE_APPLICATION ?
1424 			DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD;
1425 	} else {
1426 		return peer_req->flags & EE_APPLICATION ?
1427 			DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR;
1428 	}
1429 }
1430 
1431 /**
1432  * drbd_submit_peer_request() - submit a peer request to the local backing device
1433  * @peer_req:	peer request
1434  *
1435  * May spread the pages to multiple bios,
1436  * depending on bio_add_page restrictions.
1437  *
1438  * Returns 0 if all bios have been submitted,
1439  * -ENOMEM if we could not allocate enough bios,
1440  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1441  *  single page to an empty bio (which should never happen and likely indicates
1442  *  that the lower level IO stack is in some way broken). This has been observed
1443  *  on certain Xen deployments.
1444  */
1445 /* TODO allocate from our own bio_set. */
1446 int drbd_submit_peer_request(struct drbd_peer_request *peer_req)
1447 {
1448 	struct drbd_device *device = peer_req->peer_device->device;
1449 	struct bio *bios = NULL;
1450 	struct bio *bio;
1451 	struct page *page = peer_req->pages;
1452 	sector_t sector = peer_req->i.sector;
1453 	unsigned int data_size = peer_req->i.size;
1454 	unsigned int n_bios = 0;
1455 	unsigned int nr_pages = PFN_UP(data_size);
1456 
1457 	/* TRIM/DISCARD: for now, always use the helper function
1458 	 * blkdev_issue_zeroout(..., discard=true).
1459 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1460 	 * Correctness first, performance later.  Next step is to code an
1461 	 * asynchronous variant of the same.
1462 	 */
1463 	if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) {
1464 		/* wait for all pending IO completions, before we start
1465 		 * zeroing things out. */
1466 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1467 		/* add it to the active list now,
1468 		 * so we can find it to present it in debugfs */
1469 		peer_req->submit_jif = jiffies;
1470 		peer_req->flags |= EE_SUBMITTED;
1471 
1472 		/* If this was a resync request from receive_rs_deallocated(),
1473 		 * it is already on the sync_ee list */
1474 		if (list_empty(&peer_req->w.list)) {
1475 			spin_lock_irq(&device->resource->req_lock);
1476 			list_add_tail(&peer_req->w.list, &device->active_ee);
1477 			spin_unlock_irq(&device->resource->req_lock);
1478 		}
1479 
1480 		drbd_issue_peer_discard_or_zero_out(device, peer_req);
1481 		return 0;
1482 	}
1483 
1484 	/* In most cases, we will only need one bio.  But in case the lower
1485 	 * level restrictions happen to be different at this offset on this
1486 	 * side than those of the sending peer, we may need to submit the
1487 	 * request in more than one bio.
1488 	 *
1489 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1490 	 * generated bio, but a bio allocated on behalf of the peer.
1491 	 */
1492 next_bio:
1493 	/* _DISCARD, _WRITE_ZEROES handled above.
1494 	 * REQ_OP_FLUSH (empty flush) not expected,
1495 	 * should have been mapped to a "drbd protocol barrier".
1496 	 * REQ_OP_SECURE_ERASE: I don't see how we could ever support that.
1497 	 */
1498 	if (!(peer_req_op(peer_req) == REQ_OP_WRITE ||
1499 				peer_req_op(peer_req) == REQ_OP_READ)) {
1500 		drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf);
1501 		return -EINVAL;
1502 	}
1503 
1504 	bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO);
1505 	/* > peer_req->i.sector, unless this is the first bio */
1506 	bio->bi_iter.bi_sector = sector;
1507 	bio->bi_private = peer_req;
1508 	bio->bi_end_io = drbd_peer_request_endio;
1509 
1510 	bio->bi_next = bios;
1511 	bios = bio;
1512 	++n_bios;
1513 
1514 	page_chain_for_each(page) {
1515 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1516 		if (!bio_add_page(bio, page, len, 0))
1517 			goto next_bio;
1518 		data_size -= len;
1519 		sector += len >> 9;
1520 		--nr_pages;
1521 	}
1522 	D_ASSERT(device, data_size == 0);
1523 	D_ASSERT(device, page == NULL);
1524 
1525 	atomic_set(&peer_req->pending_bios, n_bios);
1526 	/* for debugfs: update timestamp, mark as submitted */
1527 	peer_req->submit_jif = jiffies;
1528 	peer_req->flags |= EE_SUBMITTED;
1529 	do {
1530 		bio = bios;
1531 		bios = bios->bi_next;
1532 		bio->bi_next = NULL;
1533 
1534 		drbd_submit_bio_noacct(device, peer_request_fault_type(peer_req), bio);
1535 	} while (bios);
1536 	return 0;
1537 }
1538 
1539 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1540 					     struct drbd_peer_request *peer_req)
1541 {
1542 	struct drbd_interval *i = &peer_req->i;
1543 
1544 	drbd_remove_interval(&device->write_requests, i);
1545 	drbd_clear_interval(i);
1546 
1547 	/* Wake up any processes waiting for this peer request to complete.  */
1548 	if (i->waiting)
1549 		wake_up(&device->misc_wait);
1550 }
1551 
1552 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1553 {
1554 	struct drbd_peer_device *peer_device;
1555 	int vnr;
1556 
1557 	rcu_read_lock();
1558 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1559 		struct drbd_device *device = peer_device->device;
1560 
1561 		kref_get(&device->kref);
1562 		rcu_read_unlock();
1563 		drbd_wait_ee_list_empty(device, &device->active_ee);
1564 		kref_put(&device->kref, drbd_destroy_device);
1565 		rcu_read_lock();
1566 	}
1567 	rcu_read_unlock();
1568 }
1569 
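/* P_BARRIER: the peer closed its current write epoch.  Depending on the
 * configured write ordering we may have to wait for all active peer writes
 * and flush the backing devices before a new epoch can start; with WO_NONE
 * we only allocate the next epoch object. */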
1570 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1571 {
1572 	int rv;
1573 	struct p_barrier *p = pi->data;
1574 	struct drbd_epoch *epoch;
1575 
1576 	/* FIXME these are unacked on connection,
1577 	 * not a specific (peer)device.
1578 	 */
1579 	connection->current_epoch->barrier_nr = p->barrier;
1580 	connection->current_epoch->connection = connection;
1581 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1582 
1583 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1584 	 * the activity log, which means it would not be resynced in case the
1585 	 * R_PRIMARY crashes now.
1586 	 * Therefore we must send the barrier_ack after the barrier request was
1587 	 * completed. */
1588 	switch (connection->resource->write_ordering) {
1589 	case WO_NONE:
1590 		if (rv == FE_RECYCLED)
1591 			return 0;
1592 
1593 		/* receiver context, in the writeout path of the other node.
1594 		 * avoid potential distributed deadlock */
1595 		epoch = kmalloc_obj(struct drbd_epoch, GFP_NOIO);
1596 		if (epoch)
1597 			break;
1598 		else
1599 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1600 		fallthrough;
1601 
1602 	case WO_BDEV_FLUSH:
1603 	case WO_DRAIN_IO:
1604 		conn_wait_active_ee_empty(connection);
1605 		drbd_flush(connection);
1606 
1607 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1608 			epoch = kmalloc_obj(struct drbd_epoch, GFP_NOIO);
1609 			if (epoch)
1610 				break;
1611 		}
1612 
1613 		return 0;
1614 	default:
1615 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1616 			 connection->resource->write_ordering);
1617 		return -EIO;
1618 	}
1619 
1620 	epoch->flags = 0;
1621 	atomic_set(&epoch->epoch_size, 0);
1622 	atomic_set(&epoch->active, 0);
1623 
1624 	spin_lock(&connection->epoch_lock);
1625 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1626 		list_add(&epoch->list, &connection->current_epoch->list);
1627 		connection->current_epoch = epoch;
1628 		connection->epochs++;
1629 	} else {
1630 		/* The current_epoch got recycled while we allocated this one... */
1631 		kfree(epoch);
1632 	}
1633 	spin_unlock(&connection->epoch_lock);
1634 
1635 	return 0;
1636 }
1637 
1638 /* quick wrapper in case payload size != request_size (write same) */
1639 static void drbd_csum_ee_size(struct crypto_shash *h,
1640 			      struct drbd_peer_request *r, void *d,
1641 			      unsigned int payload_size)
1642 {
1643 	unsigned int tmp = r->i.size;
1644 	r->i.size = payload_size;
1645 	drbd_csum_ee(h, r, d);
1646 	r->i.size = tmp;
1647 }
1648 
1649 /* used from receive_RSDataReply (recv_resync_read)
1650  * and from receive_Data.
1651  * data_size: actual payload ("data in")
1652  * 	for normal writes that is bi_size.
1653  * 	for discards, that is zero.
1654  * 	for write same, it is logical_block_size.
1655  * both trim and write same have the bi_size ("data len to be affected")
1656  * as extra argument in the packet header.
1657  */
1658 static struct drbd_peer_request *
1659 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1660 	      struct packet_info *pi) __must_hold(local)
1661 {
1662 	struct drbd_device *device = peer_device->device;
1663 	const sector_t capacity = get_capacity(device->vdisk);
1664 	struct drbd_peer_request *peer_req;
1665 	struct page *page;
1666 	int digest_size, err;
1667 	unsigned int data_size = pi->size, ds;
1668 	void *dig_in = peer_device->connection->int_dig_in;
1669 	void *dig_vv = peer_device->connection->int_dig_vv;
1670 	unsigned long *data;
1671 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1672 	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1673 
1674 	digest_size = 0;
1675 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1676 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1677 		/*
1678 		 * FIXME: Receive the incoming digest into the receive buffer
1679 		 *	  here, together with its struct p_data?
1680 		 */
1681 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1682 		if (err)
1683 			return NULL;
1684 		data_size -= digest_size;
1685 	}
1686 
1687 	/* assume request_size == data_size, but special case trim. */
1688 	ds = data_size;
1689 	if (trim) {
1690 		if (!expect(peer_device, data_size == 0))
1691 			return NULL;
1692 		ds = be32_to_cpu(trim->size);
1693 	} else if (zeroes) {
1694 		if (!expect(peer_device, data_size == 0))
1695 			return NULL;
1696 		ds = be32_to_cpu(zeroes->size);
1697 	}
1698 
1699 	if (!expect(peer_device, IS_ALIGNED(ds, 512)))
1700 		return NULL;
1701 	if (trim || zeroes) {
1702 		if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1703 			return NULL;
1704 	} else if (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE))
1705 		return NULL;
1706 
1707 	/* even though we trust our peer,
1708 	 * we sometimes have to double check. */
1709 	if (sector + (ds>>9) > capacity) {
1710 		drbd_err(device, "request from peer beyond end of local disk: "
1711 			"capacity: %llus < sector: %llus + size: %u\n",
1712 			(unsigned long long)capacity,
1713 			(unsigned long long)sector, ds);
1714 		return NULL;
1715 	}
1716 
1717 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1718 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1719 	 * which in turn might block on the other node at this very place.  */
1720 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1721 	if (!peer_req)
1722 		return NULL;
1723 
1724 	peer_req->flags |= EE_WRITE;
1725 	if (trim) {
1726 		peer_req->flags |= EE_TRIM;
1727 		return peer_req;
1728 	}
1729 	if (zeroes) {
1730 		peer_req->flags |= EE_ZEROOUT;
1731 		return peer_req;
1732 	}
1733 
1734 	/* receive payload size bytes into page chain */
1735 	ds = data_size;
1736 	page = peer_req->pages;
1737 	page_chain_for_each(page) {
1738 		unsigned len = min_t(int, ds, PAGE_SIZE);
1739 		data = kmap_local_page(page);
1740 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1741 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1742 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1743 			data[0] = data[0] ^ (unsigned long)-1;
1744 		}
1745 		kunmap_local(data);
1746 		if (err) {
1747 			drbd_free_peer_req(device, peer_req);
1748 			return NULL;
1749 		}
1750 		ds -= len;
1751 	}
1752 
1753 	if (digest_size) {
1754 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1755 		if (memcmp(dig_in, dig_vv, digest_size)) {
1756 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1757 				(unsigned long long)sector, data_size);
1758 			drbd_free_peer_req(device, peer_req);
1759 			return NULL;
1760 		}
1761 	}
1762 	device->recv_cnt += data_size >> 9;
1763 	return peer_req;
1764 }
1765 
1766 /* drbd_drain_block() just takes a data block
1767  * out of the socket input buffer, and discards it.
1768  */
1769 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1770 {
1771 	struct page *page;
1772 	int err = 0;
1773 	void *data;
1774 
1775 	if (!data_size)
1776 		return 0;
1777 
1778 	page = drbd_alloc_pages(peer_device, 1, 1);
1779 
1780 	data = kmap_local_page(page);
1781 	while (data_size) {
1782 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1783 
1784 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1785 		if (err)
1786 			break;
1787 		data_size -= len;
1788 	}
1789 	kunmap_local(data);
1790 	drbd_free_pages(peer_device->device, page);
1791 	return err;
1792 }
1793 
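/* recv_dless_read(): "diskless read" -- receive the payload of a P_DATA_REPLY
 * directly into the bio of the original read request, and verify the optional
 * integrity digest. */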
1794 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1795 			   sector_t sector, int data_size)
1796 {
1797 	struct bio_vec bvec;
1798 	struct bvec_iter iter;
1799 	struct bio *bio;
1800 	int digest_size, err, expect;
1801 	void *dig_in = peer_device->connection->int_dig_in;
1802 	void *dig_vv = peer_device->connection->int_dig_vv;
1803 
1804 	digest_size = 0;
1805 	if (peer_device->connection->peer_integrity_tfm) {
1806 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1807 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1808 		if (err)
1809 			return err;
1810 		data_size -= digest_size;
1811 	}
1812 
1813 	/* optimistically update recv_cnt.  if receiving fails below,
1814 	 * we disconnect anyways, and counters will be reset. */
1815 	peer_device->device->recv_cnt += data_size>>9;
1816 
1817 	bio = req->master_bio;
1818 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1819 
1820 	bio_for_each_segment(bvec, bio, iter) {
1821 		void *mapped = bvec_kmap_local(&bvec);
1822 		expect = min_t(int, data_size, bvec.bv_len);
1823 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1824 		kunmap_local(mapped);
1825 		if (err)
1826 			return err;
1827 		data_size -= expect;
1828 	}
1829 
1830 	if (digest_size) {
1831 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1832 		if (memcmp(dig_in, dig_vv, digest_size)) {
1833 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1834 			return -EINVAL;
1835 		}
1836 	}
1837 
1838 	D_ASSERT(peer_device->device, data_size == 0);
1839 	return 0;
1840 }
1841 
1842 /*
1843  * e_end_resync_block() is called in ack_sender context via
1844  * drbd_finish_peer_reqs().
1845  */
1846 static int e_end_resync_block(struct drbd_work *w, int unused)
1847 {
1848 	struct drbd_peer_request *peer_req =
1849 		container_of(w, struct drbd_peer_request, w);
1850 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1851 	struct drbd_device *device = peer_device->device;
1852 	sector_t sector = peer_req->i.sector;
1853 	int err;
1854 
1855 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1856 
1857 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1858 		drbd_set_in_sync(peer_device, sector, peer_req->i.size);
1859 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1860 	} else {
1861 		/* Record failure to sync */
1862 		drbd_rs_failed_io(peer_device, sector, peer_req->i.size);
1863 
1864 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1865 	}
1866 	dec_unacked(device);
1867 
1868 	return err;
1869 }
1870 
1871 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1872 			    struct packet_info *pi) __releases(local)
1873 {
1874 	struct drbd_device *device = peer_device->device;
1875 	struct drbd_peer_request *peer_req;
1876 
1877 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1878 	if (!peer_req)
1879 		goto fail;
1880 
1881 	dec_rs_pending(peer_device);
1882 
1883 	inc_unacked(device);
1884 	/* corresponding dec_unacked() in e_end_resync_block()
1885 	 * respective _drbd_clear_done_ee */
1886 
1887 	peer_req->w.cb = e_end_resync_block;
1888 	peer_req->opf = REQ_OP_WRITE;
1889 	peer_req->submit_jif = jiffies;
1890 
1891 	spin_lock_irq(&device->resource->req_lock);
1892 	list_add_tail(&peer_req->w.list, &device->sync_ee);
1893 	spin_unlock_irq(&device->resource->req_lock);
1894 
1895 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1896 	if (drbd_submit_peer_request(peer_req) == 0)
1897 		return 0;
1898 
1899 	/* don't care for the reason here */
1900 	drbd_err(device, "submit failed, triggering re-connect\n");
1901 	spin_lock_irq(&device->resource->req_lock);
1902 	list_del(&peer_req->w.list);
1903 	spin_unlock_irq(&device->resource->req_lock);
1904 
1905 	drbd_free_peer_req(device, peer_req);
1906 fail:
1907 	put_ldev(device);
1908 	return -EIO;
1909 }
1910 
1911 static struct drbd_request *
1912 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1913 	     sector_t sector, bool missing_ok, const char *func)
1914 {
1915 	struct drbd_request *req;
1916 
1917 	/* Request object according to our peer */
1918 	req = (struct drbd_request *)(unsigned long)id;
1919 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1920 		return req;
1921 	if (!missing_ok) {
1922 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1923 			(unsigned long)id, (unsigned long long)sector);
1924 	}
1925 	return NULL;
1926 }
1927 
1928 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1929 {
1930 	struct drbd_peer_device *peer_device;
1931 	struct drbd_device *device;
1932 	struct drbd_request *req;
1933 	sector_t sector;
1934 	int err;
1935 	struct p_data *p = pi->data;
1936 
1937 	peer_device = conn_peer_device(connection, pi->vnr);
1938 	if (!peer_device)
1939 		return -EIO;
1940 	device = peer_device->device;
1941 
1942 	sector = be64_to_cpu(p->sector);
1943 
1944 	spin_lock_irq(&device->resource->req_lock);
1945 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1946 	spin_unlock_irq(&device->resource->req_lock);
1947 	if (unlikely(!req))
1948 		return -EIO;
1949 
1950 	err = recv_dless_read(peer_device, req, sector, pi->size);
1951 	if (!err)
1952 		req_mod(req, DATA_RECEIVED, peer_device);
1953 	/* else: nothing. handled from drbd_disconnect...
1954 	 * I don't think we may complete this just yet
1955 	 * in case we are "on-disconnect: freeze" */
1956 
1957 	return err;
1958 }
1959 
1960 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1961 {
1962 	struct drbd_peer_device *peer_device;
1963 	struct drbd_device *device;
1964 	sector_t sector;
1965 	int err;
1966 	struct p_data *p = pi->data;
1967 
1968 	peer_device = conn_peer_device(connection, pi->vnr);
1969 	if (!peer_device)
1970 		return -EIO;
1971 	device = peer_device->device;
1972 
1973 	sector = be64_to_cpu(p->sector);
1974 	D_ASSERT(device, p->block_id == ID_SYNCER);
1975 
1976 	if (get_ldev(device)) {
1977 		/* data is submitted to disk within recv_resync_read.
1978 		 * corresponding put_ldev done below on error,
1979 		 * or in drbd_peer_request_endio. */
1980 		err = recv_resync_read(peer_device, sector, pi);
1981 	} else {
1982 		if (drbd_ratelimit())
1983 			drbd_err(device, "Can not write resync data to local disk.\n");
1984 
1985 		err = drbd_drain_block(peer_device, pi->size);
1986 
1987 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1988 	}
1989 
1990 	atomic_add(pi->size >> 9, &device->rs_sect_in);
1991 
1992 	return err;
1993 }
1994 
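/* Requeue local writes that were postponed because they conflicted with a peer
 * request; called once that peer request has completed and left the interval
 * tree (see EE_RESTART_REQUESTS in e_end_block()). */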
1995 static void restart_conflicting_writes(struct drbd_device *device,
1996 				       sector_t sector, int size)
1997 {
1998 	struct drbd_interval *i;
1999 	struct drbd_request *req;
2000 
2001 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2002 		if (!i->local)
2003 			continue;
2004 		req = container_of(i, struct drbd_request, i);
2005 		if (req->rq_state & RQ_LOCAL_PENDING ||
2006 		    !(req->rq_state & RQ_POSTPONED))
2007 			continue;
2008 		/* as it is RQ_POSTPONED, this will cause it to
2009 		 * be queued on the retry workqueue. */
2010 		__req_mod(req, CONFLICT_RESOLVED, NULL, NULL);
2011 	}
2012 }
2013 
2014 /*
2015  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2016  */
2017 static int e_end_block(struct drbd_work *w, int cancel)
2018 {
2019 	struct drbd_peer_request *peer_req =
2020 		container_of(w, struct drbd_peer_request, w);
2021 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2022 	struct drbd_device *device = peer_device->device;
2023 	sector_t sector = peer_req->i.sector;
2024 	int err = 0, pcmd;
2025 
2026 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2027 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2028 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2029 				device->state.conn <= C_PAUSED_SYNC_T &&
2030 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2031 				P_RS_WRITE_ACK : P_WRITE_ACK;
2032 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2033 			if (pcmd == P_RS_WRITE_ACK)
2034 				drbd_set_in_sync(peer_device, sector, peer_req->i.size);
2035 		} else {
2036 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2037 			/* we expect it to be marked out of sync anyways...
2038 			 * maybe assert this?  */
2039 		}
2040 		dec_unacked(device);
2041 	}
2042 
2043 	/* we delete from the conflict detection hash _after_ we sent out the
2044 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2045 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2046 		spin_lock_irq(&device->resource->req_lock);
2047 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2048 		drbd_remove_epoch_entry_interval(device, peer_req);
2049 		if (peer_req->flags & EE_RESTART_REQUESTS)
2050 			restart_conflicting_writes(device, sector, peer_req->i.size);
2051 		spin_unlock_irq(&device->resource->req_lock);
2052 	} else
2053 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2054 
2055 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2056 
2057 	return err;
2058 }
2059 
2060 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2061 {
2062 	struct drbd_peer_request *peer_req =
2063 		container_of(w, struct drbd_peer_request, w);
2064 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2065 	int err;
2066 
2067 	err = drbd_send_ack(peer_device, ack, peer_req);
2068 	dec_unacked(peer_device->device);
2069 
2070 	return err;
2071 }
2072 
2073 static int e_send_superseded(struct drbd_work *w, int unused)
2074 {
2075 	return e_send_ack(w, P_SUPERSEDED);
2076 }
2077 
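/* Ask the peer to retry its conflicting write.  Peers speaking a protocol
 * older than 100 do not know P_RETRY_WRITE, so fall back to P_SUPERSEDED. */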
2078 static int e_send_retry_write(struct drbd_work *w, int unused)
2079 {
2080 	struct drbd_peer_request *peer_req =
2081 		container_of(w, struct drbd_peer_request, w);
2082 	struct drbd_connection *connection = peer_req->peer_device->connection;
2083 
2084 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2085 			     P_RETRY_WRITE : P_SUPERSEDED);
2086 }
2087 
2088 static bool seq_greater(u32 a, u32 b)
2089 {
2090 	/*
2091 	 * We assume 32-bit wrap-around here.
2092 	 * For 24-bit wrap-around, we would have to shift:
2093 	 *  a <<= 8; b <<= 8;
2094 	 */
2095 	return (s32)a - (s32)b > 0;
2096 }
2097 
2098 static u32 seq_max(u32 a, u32 b)
2099 {
2100 	return seq_greater(a, b) ? a : b;
2101 }
2102 
2103 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2104 {
2105 	struct drbd_device *device = peer_device->device;
2106 	unsigned int newest_peer_seq;
2107 
2108 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2109 		spin_lock(&device->peer_seq_lock);
2110 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2111 		device->peer_seq = newest_peer_seq;
2112 		spin_unlock(&device->peer_seq_lock);
2113 		/* wake up only if we actually changed device->peer_seq */
2114 		if (peer_seq == newest_peer_seq)
2115 			wake_up(&device->seq_wait);
2116 	}
2117 }
2118 
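/* Do the ranges starting at sector s1 (l1 bytes long) and sector s2 (l2 bytes
 * long) overlap?  s1 and s2 are sector numbers; l1 and l2 are byte counts. */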
2119 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2120 {
2121 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2122 }
2123 
2124 /* maybe change sync_ee into interval trees as well? */
2125 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2126 {
2127 	struct drbd_peer_request *rs_req;
2128 	bool rv = false;
2129 
2130 	spin_lock_irq(&device->resource->req_lock);
2131 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2132 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2133 			     rs_req->i.sector, rs_req->i.size)) {
2134 			rv = true;
2135 			break;
2136 		}
2137 	}
2138 	spin_unlock_irq(&device->resource->req_lock);
2139 
2140 	return rv;
2141 }
2142 
2143 /* Called from receive_Data.
2144  * Synchronize packets on sock with packets on msock.
2145  *
2146  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2147  * packet traveling on msock, they are still processed in the order they have
2148  * been sent.
2149  *
2150  * Note: we don't care for Ack packets overtaking P_DATA packets.
2151  *
2152  * In case packet_seq is larger than device->peer_seq number, there are
2153  * outstanding packets on the msock. We wait for them to arrive.
2154  * In case we are the logically next packet, we update device->peer_seq
2155  * ourselves. Correctly handles 32bit wrap around.
2156  *
2157  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
2158  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2159  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2160  * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap-around...
2161  *
2162  * returns 0 if we may process the packet,
2163  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2164 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2165 {
2166 	struct drbd_device *device = peer_device->device;
2167 	DEFINE_WAIT(wait);
2168 	long timeout;
2169 	int ret = 0, tp;
2170 
2171 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2172 		return 0;
2173 
2174 	spin_lock(&device->peer_seq_lock);
2175 	for (;;) {
2176 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2177 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2178 			break;
2179 		}
2180 
2181 		if (signal_pending(current)) {
2182 			ret = -ERESTARTSYS;
2183 			break;
2184 		}
2185 
2186 		rcu_read_lock();
2187 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2188 		rcu_read_unlock();
2189 
2190 		if (!tp)
2191 			break;
2192 
2193 		/* Only need to wait if two_primaries is enabled */
2194 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2195 		spin_unlock(&device->peer_seq_lock);
2196 		rcu_read_lock();
2197 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2198 		rcu_read_unlock();
2199 		timeout = schedule_timeout(timeout);
2200 		spin_lock(&device->peer_seq_lock);
2201 		if (!timeout) {
2202 			ret = -ETIMEDOUT;
2203 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2204 			break;
2205 		}
2206 	}
2207 	spin_unlock(&device->peer_seq_lock);
2208 	finish_wait(&device->seq_wait, &wait);
2209 	return ret;
2210 }
2211 
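/* Map peer data-packet flags to a request operation.  DP_ZEROES takes
 * precedence: a P_ZEROES packet may additionally carry DP_DISCARD as an
 * unmap hint, see receive_Data(). */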
2212 static enum req_op wire_flags_to_bio_op(u32 dpf)
2213 {
2214 	if (dpf & DP_ZEROES)
2215 		return REQ_OP_WRITE_ZEROES;
2216 	if (dpf & DP_DISCARD)
2217 		return REQ_OP_DISCARD;
2218 	return REQ_OP_WRITE;
2220 }
2221 
2222 /* see also bio_flags_to_wire() */
2223 static blk_opf_t wire_flags_to_bio(struct drbd_connection *connection, u32 dpf)
2224 {
2225 	return wire_flags_to_bio_op(dpf) |
2226 		(dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2227 		(dpf & DP_FUA ? REQ_FUA : 0) |
2228 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2229 }
2230 
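/* Fail all postponed local writes that overlap [sector, sector + size).
 * Called with req_lock held; the lock is dropped around complete_master_bio(),
 * which is why the overlap walk restarts from the top each time. */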
2231 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2232 				    unsigned int size)
2233 {
2234 	struct drbd_peer_device *peer_device = first_peer_device(device);
2235 	struct drbd_interval *i;
2236 
2237     repeat:
2238 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2239 		struct drbd_request *req;
2240 		struct bio_and_error m;
2241 
2242 		if (!i->local)
2243 			continue;
2244 		req = container_of(i, struct drbd_request, i);
2245 		if (!(req->rq_state & RQ_POSTPONED))
2246 			continue;
2247 		req->rq_state &= ~RQ_POSTPONED;
2248 		__req_mod(req, NEG_ACKED, peer_device, &m);
2249 		spin_unlock_irq(&device->resource->req_lock);
2250 		if (m.bio)
2251 			complete_master_bio(device, &m);
2252 		spin_lock_irq(&device->resource->req_lock);
2253 		goto repeat;
2254 	}
2255 }
2256 
2257 static int handle_write_conflicts(struct drbd_device *device,
2258 				  struct drbd_peer_request *peer_req)
2259 {
2260 	struct drbd_connection *connection = peer_req->peer_device->connection;
2261 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2262 	sector_t sector = peer_req->i.sector;
2263 	const unsigned int size = peer_req->i.size;
2264 	struct drbd_interval *i;
2265 	bool equal;
2266 	int err;
2267 
2268 	/*
2269 	 * Inserting the peer request into the write_requests tree will prevent
2270 	 * new conflicting local requests from being added.
2271 	 */
2272 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2273 
2274     repeat:
2275 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2276 		if (i == &peer_req->i)
2277 			continue;
2278 		if (i->completed)
2279 			continue;
2280 
2281 		if (!i->local) {
2282 			/*
2283 			 * Our peer has sent a conflicting remote request; this
2284 			 * should not happen in a two-node setup.  Wait for the
2285 			 * earlier peer request to complete.
2286 			 */
2287 			err = drbd_wait_misc(device, i);
2288 			if (err)
2289 				goto out;
2290 			goto repeat;
2291 		}
2292 
2293 		equal = i->sector == sector && i->size == size;
2294 		if (resolve_conflicts) {
2295 			/*
2296 			 * If the peer request is fully contained within the
2297 			 * overlapping request, it can be considered overwritten
2298 			 * and thus superseded; otherwise, it will be retried
2299 			 * once all overlapping requests have completed.
2300 			 */
2301 			bool superseded = i->sector <= sector && i->sector +
2302 				       (i->size >> 9) >= sector + (size >> 9);
2303 
2304 			if (!equal)
2305 				drbd_alert(device, "Concurrent writes detected: "
2306 					       "local=%llus +%u, remote=%llus +%u, "
2307 					       "assuming %s came first\n",
2308 					  (unsigned long long)i->sector, i->size,
2309 					  (unsigned long long)sector, size,
2310 					  superseded ? "local" : "remote");
2311 
2312 			peer_req->w.cb = superseded ? e_send_superseded :
2313 						   e_send_retry_write;
2314 			list_add_tail(&peer_req->w.list, &device->done_ee);
2315 			/* put is in drbd_send_acks_wf() */
2316 			kref_get(&device->kref);
2317 			if (!queue_work(connection->ack_sender,
2318 					&peer_req->peer_device->send_acks_work))
2319 				kref_put(&device->kref, drbd_destroy_device);
2320 
2321 			err = -ENOENT;
2322 			goto out;
2323 		} else {
2324 			struct drbd_request *req =
2325 				container_of(i, struct drbd_request, i);
2326 
2327 			if (!equal)
2328 				drbd_alert(device, "Concurrent writes detected: "
2329 					       "local=%llus +%u, remote=%llus +%u\n",
2330 					  (unsigned long long)i->sector, i->size,
2331 					  (unsigned long long)sector, size);
2332 
2333 			if (req->rq_state & RQ_LOCAL_PENDING ||
2334 			    !(req->rq_state & RQ_POSTPONED)) {
2335 				/*
2336 				 * Wait for the node with the discard flag to
2337 				 * decide if this request has been superseded
2338 				 * or needs to be retried.
2339 				 * Requests that have been superseded will
2340 				 * disappear from the write_requests tree.
2341 				 *
2342 				 * In addition, wait for the conflicting
2343 				 * request to finish locally before submitting
2344 				 * the conflicting peer request.
2345 				 */
2346 				err = drbd_wait_misc(device, &req->i);
2347 				if (err) {
2348 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2349 					fail_postponed_requests(device, sector, size);
2350 					goto out;
2351 				}
2352 				goto repeat;
2353 			}
2354 			/*
2355 			 * Remember to restart the conflicting requests after
2356 			 * the new peer request has completed.
2357 			 */
2358 			peer_req->flags |= EE_RESTART_REQUESTS;
2359 		}
2360 	}
2361 	err = 0;
2362 
2363     out:
2364 	if (err)
2365 		drbd_remove_epoch_entry_interval(device, peer_req);
2366 	return err;
2367 }
2368 
2369 /* mirrored write */
2370 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2371 {
2372 	struct drbd_peer_device *peer_device;
2373 	struct drbd_device *device;
2374 	struct net_conf *nc;
2375 	sector_t sector;
2376 	struct drbd_peer_request *peer_req;
2377 	struct p_data *p = pi->data;
2378 	u32 peer_seq = be32_to_cpu(p->seq_num);
2379 	u32 dp_flags;
2380 	int err, tp;
2381 
2382 	peer_device = conn_peer_device(connection, pi->vnr);
2383 	if (!peer_device)
2384 		return -EIO;
2385 	device = peer_device->device;
2386 
2387 	if (!get_ldev(device)) {
2388 		int err2;
2389 
2390 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2391 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2392 		atomic_inc(&connection->current_epoch->epoch_size);
2393 		err2 = drbd_drain_block(peer_device, pi->size);
2394 		if (!err)
2395 			err = err2;
2396 		return err;
2397 	}
2398 
2399 	/*
2400 	 * Corresponding put_ldev done either below (on various errors), or in
2401 	 * drbd_peer_request_endio, if we successfully submit the data at the
2402 	 * end of this function.
2403 	 */
2404 
2405 	sector = be64_to_cpu(p->sector);
2406 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2407 	if (!peer_req) {
2408 		put_ldev(device);
2409 		return -EIO;
2410 	}
2411 
2412 	peer_req->w.cb = e_end_block;
2413 	peer_req->submit_jif = jiffies;
2414 	peer_req->flags |= EE_APPLICATION;
2415 
2416 	dp_flags = be32_to_cpu(p->dp_flags);
2417 	peer_req->opf = wire_flags_to_bio(connection, dp_flags);
2418 	if (pi->cmd == P_TRIM) {
2419 		D_ASSERT(peer_device, peer_req->i.size > 0);
2420 		D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD);
2421 		D_ASSERT(peer_device, peer_req->pages == NULL);
2422 		/* need to play safe: an older DRBD sender
2423 		 * may mean zero-out while sending P_TRIM. */
2424 		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2425 			peer_req->flags |= EE_ZEROOUT;
2426 	} else if (pi->cmd == P_ZEROES) {
2427 		D_ASSERT(peer_device, peer_req->i.size > 0);
2428 		D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES);
2429 		D_ASSERT(peer_device, peer_req->pages == NULL);
2430 		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2431 		if (dp_flags & DP_DISCARD)
2432 			peer_req->flags |= EE_TRIM;
2433 	} else if (peer_req->pages == NULL) {
2434 		D_ASSERT(device, peer_req->i.size == 0);
2435 		D_ASSERT(device, dp_flags & DP_FLUSH);
2436 	}
2437 
2438 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2439 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2440 
2441 	spin_lock(&connection->epoch_lock);
2442 	peer_req->epoch = connection->current_epoch;
2443 	atomic_inc(&peer_req->epoch->epoch_size);
2444 	atomic_inc(&peer_req->epoch->active);
2445 	spin_unlock(&connection->epoch_lock);
2446 
2447 	rcu_read_lock();
2448 	nc = rcu_dereference(peer_device->connection->net_conf);
2449 	tp = nc->two_primaries;
2450 	if (peer_device->connection->agreed_pro_version < 100) {
2451 		switch (nc->wire_protocol) {
2452 		case DRBD_PROT_C:
2453 			dp_flags |= DP_SEND_WRITE_ACK;
2454 			break;
2455 		case DRBD_PROT_B:
2456 			dp_flags |= DP_SEND_RECEIVE_ACK;
2457 			break;
2458 		}
2459 	}
2460 	rcu_read_unlock();
2461 
2462 	if (dp_flags & DP_SEND_WRITE_ACK) {
2463 		peer_req->flags |= EE_SEND_WRITE_ACK;
2464 		inc_unacked(device);
2465 		/* corresponding dec_unacked() in e_end_block()
2466 		 * respective _drbd_clear_done_ee */
2467 	}
2468 
2469 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2470 		/* I really don't like it that the receiver thread
2471 		 * sends on the msock, but anyways */
2472 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2473 	}
2474 
2475 	if (tp) {
2476 		/* two primaries implies protocol C */
2477 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2478 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2479 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2480 		if (err)
2481 			goto out_interrupted;
2482 		spin_lock_irq(&device->resource->req_lock);
2483 		err = handle_write_conflicts(device, peer_req);
2484 		if (err) {
2485 			spin_unlock_irq(&device->resource->req_lock);
2486 			if (err == -ENOENT) {
2487 				put_ldev(device);
2488 				return 0;
2489 			}
2490 			goto out_interrupted;
2491 		}
2492 	} else {
2493 		update_peer_seq(peer_device, peer_seq);
2494 		spin_lock_irq(&device->resource->req_lock);
2495 	}
2496 	/* TRIM and ZEROES are processed synchronously:
2497 	 * drbd_submit_peer_request() waits for all pending requests,
2498 	 * that is, for active_ee to become empty;
2499 	 * better not add ourselves here. */
2500 	if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0)
2501 		list_add_tail(&peer_req->w.list, &device->active_ee);
2502 	spin_unlock_irq(&device->resource->req_lock);
2503 
2504 	if (device->state.conn == C_SYNC_TARGET)
2505 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2506 
2507 	if (device->state.pdsk < D_INCONSISTENT) {
2508 		/* In case we have the only disk of the cluster: mark the block out of sync for the peer, and cover the write by the activity log. */
2509 		drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
2510 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2511 		drbd_al_begin_io(device, &peer_req->i);
2512 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2513 	}
2514 
2515 	err = drbd_submit_peer_request(peer_req);
2516 	if (!err)
2517 		return 0;
2518 
2519 	/* don't care for the reason here */
2520 	drbd_err(device, "submit failed, triggering re-connect\n");
2521 	spin_lock_irq(&device->resource->req_lock);
2522 	list_del(&peer_req->w.list);
2523 	drbd_remove_epoch_entry_interval(device, peer_req);
2524 	spin_unlock_irq(&device->resource->req_lock);
2525 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2526 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2527 		drbd_al_complete_io(device, &peer_req->i);
2528 	}
2529 
2530 out_interrupted:
2531 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2532 	put_ldev(device);
2533 	drbd_free_peer_req(device, peer_req);
2534 	return err;
2535 }
2536 
2537 /* We may throttle resync, if the lower device seems to be busy,
2538  * and current sync rate is above c_min_rate.
2539  *
2540  * To decide whether or not the lower device is busy, we use a scheme similar
2541  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2542  * activity (more than 64 sectors) that we cannot account for with our own
2543  * resync activity, the device obviously is "busy".
2544  *
2545  * The current sync rate used here uses only the most recent two step marks,
2546  * to have a short time average so we can react faster.
2547  */
2548 bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector,
2549 		bool throttle_if_app_is_waiting)
2550 {
2551 	struct drbd_device *device = peer_device->device;
2552 	struct lc_element *tmp;
2553 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2554 
2555 	if (!throttle || throttle_if_app_is_waiting)
2556 		return throttle;
2557 
2558 	spin_lock_irq(&device->al_lock);
2559 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2560 	if (tmp) {
2561 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2562 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2563 			throttle = false;
2564 		/* Do not slow down if app IO is already waiting for this extent,
2565 		 * and our progress is necessary for application IO to complete. */
2566 	}
2567 	spin_unlock_irq(&device->al_lock);
2568 
2569 	return throttle;
2570 }
2571 
2572 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2573 {
2574 	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2575 	unsigned long db, dt, dbdt;
2576 	unsigned int c_min_rate;
2577 	int curr_events;
2578 
2579 	rcu_read_lock();
2580 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2581 	rcu_read_unlock();
2582 
2583 	/* feature disabled? */
2584 	if (c_min_rate == 0)
2585 		return false;
2586 
2587 	curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
2588 			atomic_read(&device->rs_sect_ev);
2589 
2590 	if (atomic_read(&device->ap_actlog_cnt)
2591 	    || curr_events - device->rs_last_events > 64) {
2592 		unsigned long rs_left;
2593 		int i;
2594 
2595 		device->rs_last_events = curr_events;
2596 
2597 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2598 		 * approx. */
2599 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2600 
2601 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2602 			rs_left = device->ov_left;
2603 		else
2604 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2605 
2606 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2607 		if (!dt)
2608 			dt++;
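		/* db is in bitmap bits (4 KiB each); Bit2KB() converts it to KiB,
		 * so dbdt is in KiB/s and directly comparable to c_min_rate. */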
2609 		db = device->rs_mark_left[i] - rs_left;
2610 		dbdt = Bit2KB(db/dt);
2611 
2612 		if (dbdt > c_min_rate)
2613 			return true;
2614 	}
2615 	return false;
2616 }
2617 
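/* Serve a read request from the peer: either an application read the peer
 * could not serve from its own disk, or one of the resync, checksum based
 * resync and online-verify requests. */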
2618 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2619 {
2620 	struct drbd_peer_device *peer_device;
2621 	struct drbd_device *device;
2622 	sector_t sector;
2623 	sector_t capacity;
2624 	struct drbd_peer_request *peer_req;
2625 	struct digest_info *di = NULL;
2626 	int size, verb;
2627 	struct p_block_req *p =	pi->data;
2628 
2629 	peer_device = conn_peer_device(connection, pi->vnr);
2630 	if (!peer_device)
2631 		return -EIO;
2632 	device = peer_device->device;
2633 	capacity = get_capacity(device->vdisk);
2634 
2635 	sector = be64_to_cpu(p->sector);
2636 	size   = be32_to_cpu(p->blksize);
2637 
2638 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2639 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2640 				(unsigned long long)sector, size);
2641 		return -EINVAL;
2642 	}
2643 	if (sector + (size>>9) > capacity) {
2644 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2645 				(unsigned long long)sector, size);
2646 		return -EINVAL;
2647 	}
2648 
2649 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2650 		verb = 1;
2651 		switch (pi->cmd) {
2652 		case P_DATA_REQUEST:
2653 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2654 			break;
2655 		case P_RS_THIN_REQ:
2656 		case P_RS_DATA_REQUEST:
2657 		case P_CSUM_RS_REQUEST:
2658 		case P_OV_REQUEST:
2659 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2660 			break;
2661 		case P_OV_REPLY:
2662 			verb = 0;
2663 			dec_rs_pending(peer_device);
2664 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2665 			break;
2666 		default:
2667 			BUG();
2668 		}
2669 		if (verb && drbd_ratelimit())
2670 			drbd_err(device, "Can not satisfy peer's read request, "
2671 			    "no local data.\n");
2672 
2673 		/* drain the payload, if any */
2674 		return drbd_drain_block(peer_device, pi->size);
2675 	}
2676 
2677 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2678 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2679 	 * which in turn might block on the other node at this very place.  */
2680 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2681 			size, GFP_NOIO);
2682 	if (!peer_req) {
2683 		put_ldev(device);
2684 		return -ENOMEM;
2685 	}
2686 	peer_req->opf = REQ_OP_READ;
2687 
2688 	switch (pi->cmd) {
2689 	case P_DATA_REQUEST:
2690 		peer_req->w.cb = w_e_end_data_req;
2691 		/* application IO, don't drbd_rs_begin_io */
2692 		peer_req->flags |= EE_APPLICATION;
2693 		goto submit;
2694 
2695 	case P_RS_THIN_REQ:
2696 		/* If at some point in the future we have a smart way to
2697 		   find out if this data block is completely deallocated,
2698 		   then we would do something smarter here than reading
2699 		   the block... */
2700 		peer_req->flags |= EE_RS_THIN_REQ;
2701 		fallthrough;
2702 	case P_RS_DATA_REQUEST:
2703 		peer_req->w.cb = w_e_end_rsdata_req;
2704 		/* used in the sector offset progress display */
2705 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2706 		break;
2707 
2708 	case P_OV_REPLY:
2709 	case P_CSUM_RS_REQUEST:
2710 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2711 		if (!di)
2712 			goto out_free_e;
2713 
2714 		di->digest_size = pi->size;
2715 		di->digest = (((char *)di)+sizeof(struct digest_info));
2716 
2717 		peer_req->digest = di;
2718 		peer_req->flags |= EE_HAS_DIGEST;
2719 
2720 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2721 			goto out_free_e;
2722 
2723 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2724 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2725 			peer_req->w.cb = w_e_end_csum_rs_req;
2726 			/* used in the sector offset progress display */
2727 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2728 			/* remember to report stats in drbd_resync_finished */
2729 			device->use_csums = true;
2730 		} else if (pi->cmd == P_OV_REPLY) {
2731 			/* track progress, we may need to throttle */
2732 			atomic_add(size >> 9, &device->rs_sect_in);
2733 			peer_req->w.cb = w_e_end_ov_reply;
2734 			dec_rs_pending(peer_device);
2735 			/* drbd_rs_begin_io done when we sent this request,
2736 			 * but accounting still needs to be done. */
2737 			goto submit_for_resync;
2738 		}
2739 		break;
2740 
2741 	case P_OV_REQUEST:
2742 		if (device->ov_start_sector == ~(sector_t)0 &&
2743 		    peer_device->connection->agreed_pro_version >= 90) {
2744 			unsigned long now = jiffies;
2745 			int i;
2746 			device->ov_start_sector = sector;
2747 			device->ov_position = sector;
2748 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2749 			device->rs_total = device->ov_left;
2750 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2751 				device->rs_mark_left[i] = device->ov_left;
2752 				device->rs_mark_time[i] = now;
2753 			}
2754 			drbd_info(device, "Online Verify start sector: %llu\n",
2755 					(unsigned long long)sector);
2756 		}
2757 		peer_req->w.cb = w_e_end_ov_req;
2758 		break;
2759 
2760 	default:
2761 		BUG();
2762 	}
2763 
2764 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2765 	 * wrt the receiver, but it is not as straightforward as it may seem.
2766 	 * Various places in the resync start and stop logic assume resync
2767 	 * requests are processed in order, requeuing this on the worker thread
2768 	 * introduces a bunch of new code for synchronization between threads.
2769 	 *
2770 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2771 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2772 	 * for application writes for the same time.  For now, just throttle
2773 	 * here, where the rest of the code expects the receiver to sleep for
2774 	 * a while, anyways.
2775 	 */
2776 
2777 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2778 	 * this defers syncer requests for some time, before letting at least
2779 	 * one request through.  The resync controller on the receiving side
2780 	 * will adapt to the incoming rate accordingly.
2781 	 *
2782 	 * We cannot throttle here if remote is Primary/SyncTarget:
2783 	 * we would also throttle its application reads.
2784 	 * In that case, throttling is done on the SyncTarget only.
2785 	 */
2786 
2787 	/* Even though this may be a resync request, we do add to "read_ee";
2788 	 * "sync_ee" is only used for resync WRITEs.
2789 	 * Add to list early, so debugfs can find this request
2790 	 * even if we have to sleep below. */
2791 	spin_lock_irq(&device->resource->req_lock);
2792 	list_add_tail(&peer_req->w.list, &device->read_ee);
2793 	spin_unlock_irq(&device->resource->req_lock);
2794 
2795 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2796 	if (device->state.peer != R_PRIMARY
2797 	&& drbd_rs_should_slow_down(peer_device, sector, false))
2798 		schedule_timeout_uninterruptible(HZ/10);
2799 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2800 	if (drbd_rs_begin_io(device, sector))
2801 		goto out_free_e;
2802 
2803 submit_for_resync:
2804 	atomic_add(size >> 9, &device->rs_sect_ev);
2805 
2806 submit:
2807 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2808 	inc_unacked(device);
2809 	if (drbd_submit_peer_request(peer_req) == 0)
2810 		return 0;
2811 
2812 	/* don't care for the reason here */
2813 	drbd_err(device, "submit failed, triggering re-connect\n");
2814 
2815 out_free_e:
2816 	spin_lock_irq(&device->resource->req_lock);
2817 	list_del(&peer_req->w.list);
2818 	spin_unlock_irq(&device->resource->req_lock);
2819 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2820 
2821 	put_ldev(device);
2822 	drbd_free_peer_req(device, peer_req);
2823 	return -EIO;
2824 }
2825 
2826 /*
2827  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2828  */
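/* Return value:  1 - discard the peer's data, we become sync source;
 *               -1 - discard our data, we become sync target;
 *             -100 - the split brain could not be auto-resolved. */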
2829 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2830 {
2831 	struct drbd_device *device = peer_device->device;
2832 	int self, peer, rv = -100;
2833 	unsigned long ch_self, ch_peer;
2834 	enum drbd_after_sb_p after_sb_0p;
2835 
2836 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2837 	peer = device->p_uuid[UI_BITMAP] & 1;
2838 
2839 	ch_peer = device->p_uuid[UI_SIZE];
2840 	ch_self = device->comm_bm_set;
2841 
2842 	rcu_read_lock();
2843 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2844 	rcu_read_unlock();
2845 	switch (after_sb_0p) {
2846 	case ASB_CONSENSUS:
2847 	case ASB_DISCARD_SECONDARY:
2848 	case ASB_CALL_HELPER:
2849 	case ASB_VIOLENTLY:
2850 		drbd_err(device, "Configuration error.\n");
2851 		break;
2852 	case ASB_DISCONNECT:
2853 		break;
2854 	case ASB_DISCARD_YOUNGER_PRI:
2855 		if (self == 0 && peer == 1) {
2856 			rv = -1;
2857 			break;
2858 		}
2859 		if (self == 1 && peer == 0) {
2860 			rv =  1;
2861 			break;
2862 		}
2863 		fallthrough;	/* to one of the other strategies */
2864 	case ASB_DISCARD_OLDER_PRI:
2865 		if (self == 0 && peer == 1) {
2866 			rv = 1;
2867 			break;
2868 		}
2869 		if (self == 1 && peer == 0) {
2870 			rv = -1;
2871 			break;
2872 		}
2873 		/* Else fall through to one of the other strategies... */
2874 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2875 		     "Using discard-least-changes instead\n");
2876 		fallthrough;
2877 	case ASB_DISCARD_ZERO_CHG:
2878 		if (ch_peer == 0 && ch_self == 0) {
2879 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2880 				? -1 : 1;
2881 			break;
2882 		} else {
2883 			if (ch_peer == 0) { rv =  1; break; }
2884 			if (ch_self == 0) { rv = -1; break; }
2885 		}
2886 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2887 			break;
2888 		fallthrough;
2889 	case ASB_DISCARD_LEAST_CHG:
2890 		if	(ch_self < ch_peer)
2891 			rv = -1;
2892 		else if (ch_self > ch_peer)
2893 			rv =  1;
2894 		else /* ( ch_self == ch_peer ) */
2895 		     /* Well, then use something else. */
2896 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2897 				? -1 : 1;
2898 		break;
2899 	case ASB_DISCARD_LOCAL:
2900 		rv = -1;
2901 		break;
2902 	case ASB_DISCARD_REMOTE:
2903 		rv =  1;
2904 	}
2905 
2906 	return rv;
2907 }
2908 
2909 /*
2910  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2911  */
2912 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2913 {
2914 	struct drbd_device *device = peer_device->device;
2915 	int hg, rv = -100;
2916 	enum drbd_after_sb_p after_sb_1p;
2917 
2918 	rcu_read_lock();
2919 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2920 	rcu_read_unlock();
2921 	switch (after_sb_1p) {
2922 	case ASB_DISCARD_YOUNGER_PRI:
2923 	case ASB_DISCARD_OLDER_PRI:
2924 	case ASB_DISCARD_LEAST_CHG:
2925 	case ASB_DISCARD_LOCAL:
2926 	case ASB_DISCARD_REMOTE:
2927 	case ASB_DISCARD_ZERO_CHG:
2928 		drbd_err(device, "Configuration error.\n");
2929 		break;
2930 	case ASB_DISCONNECT:
2931 		break;
2932 	case ASB_CONSENSUS:
2933 		hg = drbd_asb_recover_0p(peer_device);
2934 		if (hg == -1 && device->state.role == R_SECONDARY)
2935 			rv = hg;
2936 		if (hg == 1  && device->state.role == R_PRIMARY)
2937 			rv = hg;
2938 		break;
2939 	case ASB_VIOLENTLY:
2940 		rv = drbd_asb_recover_0p(peer_device);
2941 		break;
2942 	case ASB_DISCARD_SECONDARY:
2943 		return device->state.role == R_PRIMARY ? 1 : -1;
2944 	case ASB_CALL_HELPER:
2945 		hg = drbd_asb_recover_0p(peer_device);
2946 		if (hg == -1 && device->state.role == R_PRIMARY) {
2947 			enum drbd_state_rv rv2;
2948 
2949 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2950 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2951 			  * we do not need to wait for the after state change work either. */
2952 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2953 			if (rv2 != SS_SUCCESS) {
2954 				drbd_khelper(device, "pri-lost-after-sb");
2955 			} else {
2956 				drbd_warn(device, "Successfully gave up primary role.\n");
2957 				rv = hg;
2958 			}
2959 		} else
2960 			rv = hg;
2961 	}
2962 
2963 	return rv;
2964 }
2965 
2966 /*
2967  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2968  */
2969 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2970 {
2971 	struct drbd_device *device = peer_device->device;
2972 	int hg, rv = -100;
2973 	enum drbd_after_sb_p after_sb_2p;
2974 
2975 	rcu_read_lock();
2976 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2977 	rcu_read_unlock();
2978 	switch (after_sb_2p) {
2979 	case ASB_DISCARD_YOUNGER_PRI:
2980 	case ASB_DISCARD_OLDER_PRI:
2981 	case ASB_DISCARD_LEAST_CHG:
2982 	case ASB_DISCARD_LOCAL:
2983 	case ASB_DISCARD_REMOTE:
2984 	case ASB_CONSENSUS:
2985 	case ASB_DISCARD_SECONDARY:
2986 	case ASB_DISCARD_ZERO_CHG:
2987 		drbd_err(device, "Configuration error.\n");
2988 		break;
2989 	case ASB_VIOLENTLY:
2990 		rv = drbd_asb_recover_0p(peer_device);
2991 		break;
2992 	case ASB_DISCONNECT:
2993 		break;
2994 	case ASB_CALL_HELPER:
2995 		hg = drbd_asb_recover_0p(peer_device);
2996 		if (hg == -1) {
2997 			enum drbd_state_rv rv2;
2998 
2999 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3000 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3001 			  * we do not need to wait for the after state change work either. */
3002 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3003 			if (rv2 != SS_SUCCESS) {
3004 				drbd_khelper(device, "pri-lost-after-sb");
3005 			} else {
3006 				drbd_warn(device, "Successfully gave up primary role.\n");
3007 				rv = hg;
3008 			}
3009 		} else
3010 			rv = hg;
3011 	}
3012 
3013 	return rv;
3014 }
3015 
3016 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3017 			   u64 bits, u64 flags)
3018 {
3019 	if (!uuid) {
3020 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3021 		return;
3022 	}
3023 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3024 	     text,
3025 	     (unsigned long long)uuid[UI_CURRENT],
3026 	     (unsigned long long)uuid[UI_BITMAP],
3027 	     (unsigned long long)uuid[UI_HISTORY_START],
3028 	     (unsigned long long)uuid[UI_HISTORY_END],
3029 	     (unsigned long long)bits,
3030 	     (unsigned long long)flags);
3031 }
3032 
3033 /*
3034   100	after split brain try auto recover
3035     2	C_SYNC_SOURCE set BitMap
3036     1	C_SYNC_SOURCE use BitMap
3037     0	no Sync
3038    -1	C_SYNC_TARGET use BitMap
3039    -2	C_SYNC_TARGET set BitMap
3040  -100	after split brain, disconnect
3041 -1000	unrelated data
3042 -1091   requires proto 91
3043 -1096   requires proto 96
3044  */
3045 
3046 static int drbd_uuid_compare(struct drbd_peer_device *const peer_device,
3047 		enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3048 {
3049 	struct drbd_connection *const connection = peer_device->connection;
3050 	struct drbd_device *device = peer_device->device;
3051 	u64 self, peer;
3052 	int i, j;
3053 
3054 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3055 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3056 
3057 	*rule_nr = 10;
3058 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3059 		return 0;
3060 
3061 	*rule_nr = 20;
3062 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3063 	     peer != UUID_JUST_CREATED)
3064 		return -2;
3065 
3066 	*rule_nr = 30;
3067 	if (self != UUID_JUST_CREATED &&
3068 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3069 		return 2;
3070 
3071 	if (self == peer) {
3072 		int rct, dc; /* roles at crash time */
3073 
3074 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3075 
3076 			if (connection->agreed_pro_version < 91)
3077 				return -1091;
3078 
3079 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3080 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3081 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3082 				drbd_uuid_move_history(device);
3083 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3084 				device->ldev->md.uuid[UI_BITMAP] = 0;
3085 
3086 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3087 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3088 				*rule_nr = 34;
3089 			} else {
3090 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3091 				*rule_nr = 36;
3092 			}
3093 
3094 			return 1;
3095 		}
3096 
3097 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3098 
3099 			if (connection->agreed_pro_version < 91)
3100 				return -1091;
3101 
3102 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3103 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3104 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3105 
3106 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3107 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3108 				device->p_uuid[UI_BITMAP] = 0UL;
3109 
3110 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3111 				*rule_nr = 35;
3112 			} else {
3113 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3114 				*rule_nr = 37;
3115 			}
3116 
3117 			return -1;
3118 		}
3119 
3120 		/* Common power [off|failure] */
3121 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3122 			(device->p_uuid[UI_FLAGS] & 2);
3123 		/* lowest bit is set when we were primary,
3124 		 * next bit (weight 2) is set when peer was primary */
3125 		*rule_nr = 40;
3126 
3127 		/* Neither has the "crashed primary" flag set,
3128 		 * only a replication link hiccup. */
3129 		if (rct == 0)
3130 			return 0;
3131 
3132 		/* Current UUID equal and no bitmap uuid; does not necessarily
3133 		 * mean this was a "simultaneous hard crash", maybe IO was
3134 		 * frozen, so no UUID-bump happened.
3135 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3136 		 * for "new-enough" peer DRBD version. */
3137 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3138 			*rule_nr = 41;
3139 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3140 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3141 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3142 			}
3143 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3144 				/* At least one has the "crashed primary" bit set,
3145 				 * both are primary now, but neither has rotated its UUIDs?
3146 				 * "Can not happen." */
3147 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3148 				return -100;
3149 			}
3150 			if (device->state.role == R_PRIMARY)
3151 				return 1;
3152 			return -1;
3153 		}
3154 
3155 		/* Both are secondary.
3156 		 * Really looks like recovery from simultaneous hard crash.
3157 		 * Check which had been primary before, and arbitrate. */
3158 		switch (rct) {
3159 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3160 		case 1: /*  self_pri && !peer_pri */ return 1;
3161 		case 2: /* !self_pri &&  peer_pri */ return -1;
3162 		case 3: /*  self_pri &&  peer_pri */
3163 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3164 			return dc ? -1 : 1;
3165 		}
3166 	}
3167 
3168 	*rule_nr = 50;
3169 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3170 	if (self == peer)
3171 		return -1;
3172 
3173 	*rule_nr = 51;
3174 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3175 	if (self == peer) {
3176 		if (connection->agreed_pro_version < 96 ?
3177 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3178 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3179 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3180 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3181 			   the peer made to its UUIDs when it last started a resync as sync source. */
3182 
3183 			if (connection->agreed_pro_version < 91)
3184 				return -1091;
3185 
3186 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3187 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3188 
3189 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3190 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3191 
3192 			return -1;
3193 		}
3194 	}
3195 
3196 	*rule_nr = 60;
3197 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3198 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3199 		peer = device->p_uuid[i] & ~((u64)1);
3200 		if (self == peer)
3201 			return -2;
3202 	}
3203 
3204 	*rule_nr = 70;
3205 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3206 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3207 	if (self == peer)
3208 		return 1;
3209 
3210 	*rule_nr = 71;
3211 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3212 	if (self == peer) {
3213 		if (connection->agreed_pro_version < 96 ?
3214 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3215 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3216 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3217 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3218 			   we made to our UUIDs when we last started a resync as sync source. */
3219 
3220 			if (connection->agreed_pro_version < 91)
3221 				return -1091;
3222 
3223 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3224 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3225 
3226 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3227 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3228 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3229 
3230 			return 1;
3231 		}
3232 	}
3233 
3234 
3235 	*rule_nr = 80;
3236 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3237 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3238 		self = device->ldev->md.uuid[i] & ~((u64)1);
3239 		if (self == peer)
3240 			return 2;
3241 	}
3242 
3243 	*rule_nr = 90;
3244 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3245 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3246 	if (self == peer && self != ((u64)0))
3247 		return 100;
3248 
3249 	*rule_nr = 100;
3250 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3251 		self = device->ldev->md.uuid[i] & ~((u64)1);
3252 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3253 			peer = device->p_uuid[j] & ~((u64)1);
3254 			if (self == peer)
3255 				return -100;
3256 		}
3257 	}
3258 
3259 	return -1000;
3260 }
3261 
3262 /* drbd_sync_handshake() returns the new conn state on success, or
3263    C_MASK on failure.
3264  */
3265 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3266 					   enum drbd_role peer_role,
3267 					   enum drbd_disk_state peer_disk) __must_hold(local)
3268 {
3269 	struct drbd_device *device = peer_device->device;
3270 	enum drbd_conns rv = C_MASK;
3271 	enum drbd_disk_state mydisk;
3272 	struct net_conf *nc;
3273 	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3274 
3275 	mydisk = device->state.disk;
3276 	if (mydisk == D_NEGOTIATING)
3277 		mydisk = device->new_state_tmp.disk;
3278 
3279 	drbd_info(device, "drbd_sync_handshake:\n");
3280 
3281 	spin_lock_irq(&device->ldev->md.uuid_lock);
3282 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3283 	drbd_uuid_dump(device, "peer", device->p_uuid,
3284 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3285 
3286 	hg = drbd_uuid_compare(peer_device, peer_role, &rule_nr);
3287 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3288 
3289 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3290 
3291 	if (hg == -1000) {
3292 		drbd_alert(device, "Unrelated data, aborting!\n");
3293 		return C_MASK;
3294 	}
3295 	if (hg < -0x10000) {
3296 		int proto, fflags;
3297 		hg = -hg;
3298 		proto = hg & 0xff;
3299 		fflags = (hg >> 8) & 0xff;
3300 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3301 					proto, fflags);
3302 		return C_MASK;
3303 	}
3304 	if (hg < -1000) {
3305 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3306 		return C_MASK;
3307 	}
3308 
3309 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3310 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3311 		int f = (hg == -100) || abs(hg) == 2;
3312 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3313 		if (f)
3314 			hg = hg*2;
3315 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3316 		     hg > 0 ? "source" : "target");
3317 	}
3318 
3319 	if (abs(hg) == 100)
3320 		drbd_khelper(device, "initial-split-brain");
3321 
3322 	rcu_read_lock();
3323 	nc = rcu_dereference(peer_device->connection->net_conf);
3324 	always_asbp = nc->always_asbp;
3325 	rr_conflict = nc->rr_conflict;
3326 	tentative = nc->tentative;
3327 	rcu_read_unlock();
3328 
3329 	if (hg == 100 || (hg == -100 && always_asbp)) {
3330 		int pcount = (device->state.role == R_PRIMARY)
3331 			   + (peer_role == R_PRIMARY);
3332 		int forced = (hg == -100);
3333 
3334 		switch (pcount) {
3335 		case 0:
3336 			hg = drbd_asb_recover_0p(peer_device);
3337 			break;
3338 		case 1:
3339 			hg = drbd_asb_recover_1p(peer_device);
3340 			break;
3341 		case 2:
3342 			hg = drbd_asb_recover_2p(peer_device);
3343 			break;
3344 		}
3345 		if (abs(hg) < 100) {
3346 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3347 			     "automatically solved. Sync from %s node\n",
3348 			     pcount, (hg < 0) ? "peer" : "this");
3349 			if (forced) {
3350 				drbd_warn(device, "Doing a full sync, since"
3351 				     " UUIDs were ambiguous.\n");
3352 				hg = hg*2;
3353 			}
3354 		}
3355 	}
3356 
3357 	if (hg == -100) {
3358 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3359 			hg = -1;
3360 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3361 			hg = 1;
3362 
3363 		if (abs(hg) < 100)
3364 			drbd_warn(device, "Split-Brain detected, manually solved. "
3365 			     "Sync from %s node\n",
3366 			     (hg < 0) ? "peer" : "this");
3367 	}
3368 
3369 	if (hg == -100) {
3370 		/* FIXME this log message is not correct if we end up here
3371 		 * after an attempted attach on a diskless node.
3372 		 * We just refuse to attach -- well, we drop the "connection"
3373 		 * to that disk, in a way... */
3374 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3375 		drbd_khelper(device, "split-brain");
3376 		return C_MASK;
3377 	}
3378 
3379 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3380 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3381 		return C_MASK;
3382 	}
3383 
3384 	if (hg < 0 && /* by intention we do not use mydisk here. */
3385 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3386 		switch (rr_conflict) {
3387 		case ASB_CALL_HELPER:
3388 			drbd_khelper(device, "pri-lost");
3389 			fallthrough;
3390 		case ASB_DISCONNECT:
3391 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3392 			return C_MASK;
3393 		case ASB_VIOLENTLY:
3394 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3395 			     " assumption\n");
3396 		}
3397 	}
3398 
3399 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3400 		if (hg == 0)
3401 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3402 		else
3403 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3404 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3405 				 abs(hg) >= 2 ? "full" : "bit-map based");
3406 		return C_MASK;
3407 	}
3408 
3409 	if (abs(hg) >= 2) {
3410 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3411 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3412 					BM_LOCKED_SET_ALLOWED, NULL))
3413 			return C_MASK;
3414 	}
3415 
3416 	if (hg > 0) { /* become sync source. */
3417 		rv = C_WF_BITMAP_S;
3418 	} else if (hg < 0) { /* become sync target */
3419 		rv = C_WF_BITMAP_T;
3420 	} else {
3421 		rv = C_CONNECTED;
3422 		if (drbd_bm_total_weight(device)) {
3423 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3424 			     drbd_bm_total_weight(device));
3425 		}
3426 	}
3427 
3428 	return rv;
3429 }
3430 
3431 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3432 {
3433 	/* the peer's ASB_DISCARD_REMOTE is our ASB_DISCARD_LOCAL: that pairing is valid */
3434 	if (peer == ASB_DISCARD_REMOTE)
3435 		return ASB_DISCARD_LOCAL;
3436 
3437 	/* ... and vice versa; any other pairing of ASB_DISCARD_REMOTE/ASB_DISCARD_LOCAL is invalid */
3438 	if (peer == ASB_DISCARD_LOCAL)
3439 		return ASB_DISCARD_REMOTE;
3440 
3441 	/* everything else is valid if they are equal on both sides. */
3442 	return peer;
3443 }
3444 
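/*
 * receive_protocol() - check the peer's connection-level settings against ours.
 * Wire protocol, after-split-brain policies, allow-two-primaries,
 * discard-my-data and data-integrity-alg must be compatible, otherwise we
 * disconnect.  For P_PROTOCOL_UPDATE the compatibility checks are skipped and
 * the peer's values are adopted into a new net_conf published via RCU.
 */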
3445 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3446 {
3447 	struct p_protocol *p = pi->data;
3448 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3449 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3450 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3451 	char integrity_alg[SHARED_SECRET_MAX] = "";
3452 	struct crypto_shash *peer_integrity_tfm = NULL;
3453 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3454 
3455 	p_proto		= be32_to_cpu(p->protocol);
3456 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3457 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3458 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3459 	p_two_primaries = be32_to_cpu(p->two_primaries);
3460 	cf		= be32_to_cpu(p->conn_flags);
3461 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3462 
3463 	if (connection->agreed_pro_version >= 87) {
3464 		int err;
3465 
3466 		if (pi->size > sizeof(integrity_alg))
3467 			return -EIO;
3468 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3469 		if (err)
3470 			return err;
3471 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3472 	}
3473 
3474 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3475 		clear_bit(CONN_DRY_RUN, &connection->flags);
3476 
3477 		if (cf & CF_DRY_RUN)
3478 			set_bit(CONN_DRY_RUN, &connection->flags);
3479 
3480 		rcu_read_lock();
3481 		nc = rcu_dereference(connection->net_conf);
3482 
3483 		if (p_proto != nc->wire_protocol) {
3484 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3485 			goto disconnect_rcu_unlock;
3486 		}
3487 
3488 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3489 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3490 			goto disconnect_rcu_unlock;
3491 		}
3492 
3493 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3494 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3495 			goto disconnect_rcu_unlock;
3496 		}
3497 
3498 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3499 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3500 			goto disconnect_rcu_unlock;
3501 		}
3502 
3503 		if (p_discard_my_data && nc->discard_my_data) {
3504 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3505 			goto disconnect_rcu_unlock;
3506 		}
3507 
3508 		if (p_two_primaries != nc->two_primaries) {
3509 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3510 			goto disconnect_rcu_unlock;
3511 		}
3512 
3513 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3514 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3515 			goto disconnect_rcu_unlock;
3516 		}
3517 
3518 		rcu_read_unlock();
3519 	}
3520 
3521 	if (integrity_alg[0]) {
3522 		int hash_size;
3523 
3524 		/*
3525 		 * We can only change the peer data integrity algorithm
3526 		 * here.  Changing our own data integrity algorithm
3527 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3528 		 * the same time; otherwise, the peer has no way to
3529 		 * tell between which packets the algorithm should
3530 		 * change.
3531 		 */
3532 
3533 		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3534 		if (IS_ERR(peer_integrity_tfm)) {
3535 			peer_integrity_tfm = NULL;
3536 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3537 				 integrity_alg);
3538 			goto disconnect;
3539 		}
3540 
3541 		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3542 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3543 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3544 		if (!(int_dig_in && int_dig_vv)) {
3545 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3546 			goto disconnect;
3547 		}
3548 	}
3549 
3550 	new_net_conf = kmalloc_obj(struct net_conf, GFP_KERNEL);
3551 	if (!new_net_conf)
3552 		goto disconnect;
3553 
3554 	mutex_lock(&connection->data.mutex);
3555 	mutex_lock(&connection->resource->conf_update);
3556 	old_net_conf = connection->net_conf;
3557 	*new_net_conf = *old_net_conf;
3558 
3559 	new_net_conf->wire_protocol = p_proto;
3560 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3561 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3562 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3563 	new_net_conf->two_primaries = p_two_primaries;
3564 
3565 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3566 	mutex_unlock(&connection->resource->conf_update);
3567 	mutex_unlock(&connection->data.mutex);
3568 
3569 	crypto_free_shash(connection->peer_integrity_tfm);
3570 	kfree(connection->int_dig_in);
3571 	kfree(connection->int_dig_vv);
3572 	connection->peer_integrity_tfm = peer_integrity_tfm;
3573 	connection->int_dig_in = int_dig_in;
3574 	connection->int_dig_vv = int_dig_vv;
3575 
3576 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3577 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3578 			  integrity_alg[0] ? integrity_alg : "(none)");
3579 
3580 	kvfree_rcu_mightsleep(old_net_conf);
3581 	return 0;
3582 
3583 disconnect_rcu_unlock:
3584 	rcu_read_unlock();
3585 disconnect:
3586 	crypto_free_shash(peer_integrity_tfm);
3587 	kfree(int_dig_in);
3588 	kfree(int_dig_vv);
3589 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3590 	return -EIO;
3591 }
3592 
3593 /* helper function
3594  * input: alg name, feature name
3595  * return: NULL (alg name was "")
3596  *         ERR_PTR(error) if something goes wrong
3597  *         or the crypto hash ptr, if it worked out ok. */
3598 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3599 		const struct drbd_device *device,
3600 		const char *alg, const char *name)
3601 {
3602 	struct crypto_shash *tfm;
3603 
3604 	if (!alg[0])
3605 		return NULL;
3606 
3607 	tfm = crypto_alloc_shash(alg, 0, 0);
3608 	if (IS_ERR(tfm)) {
3609 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3610 			alg, name, PTR_ERR(tfm));
3611 		return tfm;
3612 	}
3613 	return tfm;
3614 }
3615 
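/* Drain and discard the remaining pi->size bytes of payload from the data socket. */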
3616 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3617 {
3618 	void *buffer = connection->data.rbuf;
3619 	int size = pi->size;
3620 
3621 	while (size) {
3622 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3623 		s = drbd_recv(connection, buffer, s);
3624 		if (s <= 0) {
3625 			if (s < 0)
3626 				return s;
3627 			break;
3628 		}
3629 		size -= s;
3630 	}
3631 	if (size)
3632 		return -EIO;
3633 	return 0;
3634 }
3635 
3636 /*
3637  * config_unknown_volume  -  device configuration command for unknown volume
3638  *
3639  * When a device is added to an existing connection, the node on which the
3640  * device is added first will send configuration commands to its peer but the
3641  * peer will not know about the device yet.  It will warn and ignore these
3642  * commands.  Once the device is added on the second node, the second node will
3643  * send the same device configuration commands, but in the other direction.
3644  *
3645  * (We can also end up here if drbd is misconfigured.)
3646  */
3647 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3648 {
3649 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3650 		  cmdname(pi->cmd), pi->vnr);
3651 	return ignore_remaining_packet(connection, pi);
3652 }
3653 
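/*
 * receive_SyncParam() - process P_SYNC_PARAM / P_SYNC_PARAM89.
 * The packet layout depends on the agreed protocol version (apv): resync rate
 * only (apv <= 87), plus verify-alg as trailing data (88), plus csums-alg in
 * the header (89..94), plus the dynamic resync controller settings (95+).
 * New crypto transforms, net_conf, disk_conf and fifo plan are allocated as
 * needed and published under conf_update / RCU.
 */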
3654 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3655 {
3656 	struct drbd_peer_device *peer_device;
3657 	struct drbd_device *device;
3658 	struct p_rs_param_95 *p;
3659 	unsigned int header_size, data_size, exp_max_sz;
3660 	struct crypto_shash *verify_tfm = NULL;
3661 	struct crypto_shash *csums_tfm = NULL;
3662 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3663 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3664 	const int apv = connection->agreed_pro_version;
3665 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3666 	unsigned int fifo_size = 0;
3667 	int err;
3668 
3669 	peer_device = conn_peer_device(connection, pi->vnr);
3670 	if (!peer_device)
3671 		return config_unknown_volume(connection, pi);
3672 	device = peer_device->device;
3673 
3674 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3675 		    : apv == 88 ? sizeof(struct p_rs_param)
3676 					+ SHARED_SECRET_MAX
3677 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3678 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3679 
3680 	if (pi->size > exp_max_sz) {
3681 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3682 		    pi->size, exp_max_sz);
3683 		return -EIO;
3684 	}
3685 
3686 	if (apv <= 88) {
3687 		header_size = sizeof(struct p_rs_param);
3688 		data_size = pi->size - header_size;
3689 	} else if (apv <= 94) {
3690 		header_size = sizeof(struct p_rs_param_89);
3691 		data_size = pi->size - header_size;
3692 		D_ASSERT(device, data_size == 0);
3693 	} else {
3694 		header_size = sizeof(struct p_rs_param_95);
3695 		data_size = pi->size - header_size;
3696 		D_ASSERT(device, data_size == 0);
3697 	}
3698 
3699 	/* initialize verify_alg and csums_alg */
3700 	p = pi->data;
3701 	BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
3702 	memset(&p->algs, 0, sizeof(p->algs));
3703 
3704 	err = drbd_recv_all(peer_device->connection, p, header_size);
3705 	if (err)
3706 		return err;
3707 
3708 	mutex_lock(&connection->resource->conf_update);
3709 	old_net_conf = peer_device->connection->net_conf;
3710 	if (get_ldev(device)) {
3711 		new_disk_conf = kzalloc_obj(struct disk_conf, GFP_KERNEL);
3712 		if (!new_disk_conf) {
3713 			put_ldev(device);
3714 			mutex_unlock(&connection->resource->conf_update);
3715 			drbd_err(device, "Allocation of new disk_conf failed\n");
3716 			return -ENOMEM;
3717 		}
3718 
3719 		old_disk_conf = device->ldev->disk_conf;
3720 		*new_disk_conf = *old_disk_conf;
3721 
3722 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3723 	}
3724 
3725 	if (apv >= 88) {
3726 		if (apv == 88) {
3727 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3728 				drbd_err(device, "verify-alg of wrong size, "
3729 					"peer wants %u, accepting only up to %u byte\n",
3730 					data_size, SHARED_SECRET_MAX);
3731 				goto reconnect;
3732 			}
3733 
3734 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3735 			if (err)
3736 				goto reconnect;
3737 			/* we expect NUL terminated string */
3738 			/* but just in case someone tries to be evil */
3739 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3740 			p->verify_alg[data_size-1] = 0;
3741 
3742 		} else /* apv >= 89 */ {
3743 			/* we still expect NUL terminated strings */
3744 			/* but just in case someone tries to be evil */
3745 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3746 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3747 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3748 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3749 		}
3750 
3751 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3752 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3753 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3754 				    old_net_conf->verify_alg, p->verify_alg);
3755 				goto disconnect;
3756 			}
3757 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3758 					p->verify_alg, "verify-alg");
3759 			if (IS_ERR(verify_tfm)) {
3760 				verify_tfm = NULL;
3761 				goto disconnect;
3762 			}
3763 		}
3764 
3765 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3766 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3767 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3768 				    old_net_conf->csums_alg, p->csums_alg);
3769 				goto disconnect;
3770 			}
3771 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3772 					p->csums_alg, "csums-alg");
3773 			if (IS_ERR(csums_tfm)) {
3774 				csums_tfm = NULL;
3775 				goto disconnect;
3776 			}
3777 		}
3778 
3779 		if (apv > 94 && new_disk_conf) {
3780 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3781 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3782 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3783 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3784 
3785 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3786 			if (fifo_size != device->rs_plan_s->size) {
3787 				new_plan = fifo_alloc(fifo_size);
3788 				if (!new_plan) {
3789 					drbd_err(device, "kmalloc of fifo_buffer failed\n");
3790 					put_ldev(device);
3791 					goto disconnect;
3792 				}
3793 			}
3794 		}
3795 
3796 		if (verify_tfm || csums_tfm) {
3797 			new_net_conf = kzalloc_obj(struct net_conf, GFP_KERNEL);
3798 			if (!new_net_conf)
3799 				goto disconnect;
3800 
3801 			*new_net_conf = *old_net_conf;
3802 
3803 			if (verify_tfm) {
3804 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3805 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3806 				crypto_free_shash(peer_device->connection->verify_tfm);
3807 				peer_device->connection->verify_tfm = verify_tfm;
3808 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3809 			}
3810 			if (csums_tfm) {
3811 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3812 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3813 				crypto_free_shash(peer_device->connection->csums_tfm);
3814 				peer_device->connection->csums_tfm = csums_tfm;
3815 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3816 			}
3817 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3818 		}
3819 	}
3820 
3821 	if (new_disk_conf) {
3822 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3823 		put_ldev(device);
3824 	}
3825 
3826 	if (new_plan) {
3827 		old_plan = device->rs_plan_s;
3828 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3829 	}
3830 
3831 	mutex_unlock(&connection->resource->conf_update);
3832 	synchronize_rcu();
3833 	if (new_net_conf)
3834 		kfree(old_net_conf);
3835 	kfree(old_disk_conf);
3836 	kfree(old_plan);
3837 
3838 	return 0;
3839 
3840 reconnect:
3841 	if (new_disk_conf) {
3842 		put_ldev(device);
3843 		kfree(new_disk_conf);
3844 	}
3845 	mutex_unlock(&connection->resource->conf_update);
3846 	return -EIO;
3847 
3848 disconnect:
3849 	kfree(new_plan);
3850 	if (new_disk_conf) {
3851 		put_ldev(device);
3852 		kfree(new_disk_conf);
3853 	}
3854 	mutex_unlock(&connection->resource->conf_update);
3855 	/* just for completeness: actually not needed,
3856 	 * as this is not reached if csums_tfm was ok. */
3857 	crypto_free_shash(csums_tfm);
3858 	/* but free the verify_tfm again, if csums_tfm did not work out */
3859 	crypto_free_shash(verify_tfm);
3860 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3861 	return -EIO;
3862 }
3863 
3864 /* warn if the arguments differ by more than 12.5% */
3865 static void warn_if_differ_considerably(struct drbd_device *device,
3866 	const char *s, sector_t a, sector_t b)
3867 {
3868 	sector_t d;
3869 	if (a == 0 || b == 0)
3870 		return;
3871 	d = (a > b) ? (a - b) : (b - a);
3872 	if (d > (a>>3) || d > (b>>3))
3873 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3874 		     (unsigned long long)a, (unsigned long long)b);
3875 }
3876 
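/*
 * receive_sizes() - process the peer's P_SIZES packet.
 * Remember the peer's backing and requested sizes, refuse to shrink a device
 * that still holds usable data during connect, re-evaluate queue limits and
 * the agreed device size, and trigger (or suppress) a resync after online
 * growing.
 */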
3877 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3878 {
3879 	struct drbd_peer_device *peer_device;
3880 	struct drbd_device *device;
3881 	struct p_sizes *p = pi->data;
3882 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3883 	enum determine_dev_size dd = DS_UNCHANGED;
3884 	sector_t p_size, p_usize, p_csize, my_usize;
3885 	sector_t new_size, cur_size;
3886 	int ldsc = 0; /* local disk size changed */
3887 	enum dds_flags ddsf;
3888 
3889 	peer_device = conn_peer_device(connection, pi->vnr);
3890 	if (!peer_device)
3891 		return config_unknown_volume(connection, pi);
3892 	device = peer_device->device;
3893 	cur_size = get_capacity(device->vdisk);
3894 
3895 	p_size = be64_to_cpu(p->d_size);
3896 	p_usize = be64_to_cpu(p->u_size);
3897 	p_csize = be64_to_cpu(p->c_size);
3898 
3899 	/* just store the peer's disk size for now.
3900 	 * we still need to figure out whether we accept that. */
3901 	device->p_size = p_size;
3902 
3903 	if (get_ldev(device)) {
3904 		rcu_read_lock();
3905 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3906 		rcu_read_unlock();
3907 
3908 		warn_if_differ_considerably(device, "lower level device sizes",
3909 			   p_size, drbd_get_max_capacity(device->ldev));
3910 		warn_if_differ_considerably(device, "user requested size",
3911 					    p_usize, my_usize);
3912 
3913 		/* if this is the first connect, or an otherwise expected
3914 		 * param exchange, choose the minimum */
3915 		if (device->state.conn == C_WF_REPORT_PARAMS)
3916 			p_usize = min_not_zero(my_usize, p_usize);
3917 
3918 		/* Never shrink a device with usable data during connect,
3919 		 * or "attach" on the peer.
3920 		 * But allow online shrinking if we are connected. */
3921 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3922 		if (new_size < cur_size &&
3923 		    device->state.disk >= D_OUTDATED &&
3924 		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
3925 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3926 					(unsigned long long)new_size, (unsigned long long)cur_size);
3927 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3928 			put_ldev(device);
3929 			return -EIO;
3930 		}
3931 
3932 		if (my_usize != p_usize) {
3933 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3934 
3935 			new_disk_conf = kzalloc_obj(struct disk_conf,
3936 						    GFP_KERNEL);
3937 			if (!new_disk_conf) {
3938 				put_ldev(device);
3939 				return -ENOMEM;
3940 			}
3941 
3942 			mutex_lock(&connection->resource->conf_update);
3943 			old_disk_conf = device->ldev->disk_conf;
3944 			*new_disk_conf = *old_disk_conf;
3945 			new_disk_conf->disk_size = p_usize;
3946 
3947 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3948 			mutex_unlock(&connection->resource->conf_update);
3949 			kvfree_rcu_mightsleep(old_disk_conf);
3950 
3951 			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
3952 				 (unsigned long)p_usize, (unsigned long)my_usize);
3953 		}
3954 
3955 		put_ldev(device);
3956 	}
3957 
3958 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3959 	/* Keep the call to drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
3960 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3961 	   drbd_reconsider_queue_parameters(), we can be sure that after
3962 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3963 
3964 	ddsf = be16_to_cpu(p->dds_flags);
3965 	if (get_ldev(device)) {
3966 		drbd_reconsider_queue_parameters(device, device->ldev, o);
3967 		dd = drbd_determine_dev_size(device, ddsf, NULL);
3968 		put_ldev(device);
3969 		if (dd == DS_ERROR)
3970 			return -EIO;
3971 		drbd_md_sync(device);
3972 	} else {
3973 		/*
3974 		 * I am diskless, need to accept the peer's *current* size.
3975 		 * I must NOT accept the peer's backing disk size,
3976 		 * it may have been larger than mine all along...
3977 		 *
3978 		 * At this point, the peer knows more about my disk, or at
3979 		 * least about what we last agreed upon, than myself.
3980 		 * So if his c_size is less than his d_size, the most likely
3981 		 * reason is that *my* d_size was smaller last time we checked.
3982 		 *
3983 		 * However, if he sends a zero current size,
3984 		 * take his (user-capped or) backing disk size anyway.
3985 		 *
3986 		 * Unless of course he does not have a disk himself.
3987 		 * In which case we ignore this completely.
3988 		 */
3989 		sector_t new_size = p_csize ?: p_usize ?: p_size;
3990 		drbd_reconsider_queue_parameters(device, NULL, o);
3991 		if (new_size == 0) {
3992 			/* Ignore, the peer reported no size at all. */
3993 		} else if (new_size == cur_size) {
3994 			/* nothing to do */
3995 		} else if (cur_size != 0 && p_size == 0) {
3996 			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
3997 					(unsigned long long)new_size, (unsigned long long)cur_size);
3998 		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
3999 			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4000 					(unsigned long long)new_size, (unsigned long long)cur_size);
4001 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4002 			return -EIO;
4003 		} else {
4004 			/* I believe the peer, if
4005 			 *  - I don't have a current size myself
4006 			 *  - we agree on the size anyways
4007 			 *  - I do have a current size, am Secondary,
4008 			 *    and he has the only disk
4009 			 *  - I do have a current size, am Primary,
4010 			 *    and he has the only disk,
4011 			 *    which is larger than my current size
4012 			 */
4013 			drbd_set_my_capacity(device, new_size);
4014 		}
4015 	}
4016 
4017 	if (get_ldev(device)) {
4018 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4019 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4020 			ldsc = 1;
4021 		}
4022 
4023 		put_ldev(device);
4024 	}
4025 
4026 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4027 		if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4028 		    ldsc) {
4029 			/* we have different sizes, probably peer
4030 			 * needs to know my new size... */
4031 			drbd_send_sizes(peer_device, 0, ddsf);
4032 		}
4033 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4034 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4035 			if (device->state.pdsk >= D_INCONSISTENT &&
4036 			    device->state.disk >= D_INCONSISTENT) {
4037 				if (ddsf & DDSF_NO_RESYNC)
4038 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4039 				else
4040 					resync_after_online_grow(device);
4041 			} else
4042 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4043 		}
4044 	}
4045 
4046 	return 0;
4047 }
4048 
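/*
 * receive_uuids() - store the peer's UUID set in device->p_uuid.
 * A primary without usable local data refuses to (re)connect to data with a
 * different current UUID.  If our device was just created and the peer's UUID
 * flags indicate that the initial sync may be skipped, clear the bitmap and
 * declare both disks UpToDate.
 */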
4049 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4050 {
4051 	struct drbd_peer_device *peer_device;
4052 	struct drbd_device *device;
4053 	struct p_uuids *p = pi->data;
4054 	u64 *p_uuid;
4055 	int i, updated_uuids = 0;
4056 
4057 	peer_device = conn_peer_device(connection, pi->vnr);
4058 	if (!peer_device)
4059 		return config_unknown_volume(connection, pi);
4060 	device = peer_device->device;
4061 
4062 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4063 	if (!p_uuid)
4064 		return false;
4065 
4066 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4067 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4068 
4069 	kfree(device->p_uuid);
4070 	device->p_uuid = p_uuid;
4071 
4072 	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4073 	    device->state.disk < D_INCONSISTENT &&
4074 	    device->state.role == R_PRIMARY &&
4075 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4076 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4077 		    (unsigned long long)device->ed_uuid);
4078 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4079 		return -EIO;
4080 	}
4081 
4082 	if (get_ldev(device)) {
4083 		int skip_initial_sync =
4084 			device->state.conn == C_CONNECTED &&
4085 			peer_device->connection->agreed_pro_version >= 90 &&
4086 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4087 			(p_uuid[UI_FLAGS] & 8);
4088 		if (skip_initial_sync) {
4089 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4090 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4091 					"clear_n_write from receive_uuids",
4092 					BM_LOCKED_TEST_ALLOWED, NULL);
4093 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4094 			_drbd_uuid_set(device, UI_BITMAP, 0);
4095 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4096 					CS_VERBOSE, NULL);
4097 			drbd_md_sync(device);
4098 			updated_uuids = 1;
4099 		}
4100 		put_ldev(device);
4101 	} else if (device->state.disk < D_INCONSISTENT &&
4102 		   device->state.role == R_PRIMARY) {
4103 		/* I am a diskless primary, the peer just created a new current UUID
4104 		   for me. */
4105 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4106 	}
4107 
4108 	/* Before we test for the disk state, we should wait until any ongoing
4109 	   cluster-wide state change has finished. That is important if
4110 	   we are primary and are detaching from our disk. We need to see the
4111 	   new disk state... */
4112 	mutex_lock(device->state_mutex);
4113 	mutex_unlock(device->state_mutex);
4114 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4115 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4116 
4117 	if (updated_uuids)
4118 		drbd_print_uuids(device, "receiver updated UUIDs to");
4119 
4120 	return 0;
4121 }
4122 
4123 /**
4124  * convert_state() - Converts the peer's view of the cluster state to our point of view
4125  * @ps:		The state as seen by the peer.
4126  */
4127 static union drbd_state convert_state(union drbd_state ps)
4128 {
4129 	union drbd_state ms;
4130 
4131 	static enum drbd_conns c_tab[] = {
4132 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4133 		[C_CONNECTED] = C_CONNECTED,
4134 
4135 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4136 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4137 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4138 		[C_VERIFY_S]       = C_VERIFY_T,
4139 		[C_MASK]   = C_MASK,
4140 	};
4141 
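	/* Mirror the peer's view: its role becomes our "peer", its disk our
	 * "pdsk" (and vice versa), and directional connection states are
	 * swapped via c_tab above. */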
4142 	ms.i = ps.i;
4143 
4144 	ms.conn = c_tab[ps.conn];
4145 	ms.peer = ps.role;
4146 	ms.role = ps.peer;
4147 	ms.pdsk = ps.disk;
4148 	ms.disk = ps.pdsk;
4149 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4150 
4151 	return ms;
4152 }
4153 
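/*
 * receive_req_state() - the peer requests a state change on this device.
 * Convert the request into our point of view, attempt the change, and report
 * the result back via drbd_send_sr_reply().  With RESOLVE_CONFLICTS set we
 * reply SS_CONCURRENT_ST_CHG while a local state change is already in progress.
 */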
4154 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4155 {
4156 	struct drbd_peer_device *peer_device;
4157 	struct drbd_device *device;
4158 	struct p_req_state *p = pi->data;
4159 	union drbd_state mask, val;
4160 	enum drbd_state_rv rv;
4161 
4162 	peer_device = conn_peer_device(connection, pi->vnr);
4163 	if (!peer_device)
4164 		return -EIO;
4165 	device = peer_device->device;
4166 
4167 	mask.i = be32_to_cpu(p->mask);
4168 	val.i = be32_to_cpu(p->val);
4169 
4170 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4171 	    mutex_is_locked(device->state_mutex)) {
4172 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4173 		return 0;
4174 	}
4175 
4176 	mask = convert_state(mask);
4177 	val = convert_state(val);
4178 
4179 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4180 	drbd_send_sr_reply(peer_device, rv);
4181 
4182 	drbd_md_sync(device);
4183 
4184 	return 0;
4185 }
4186 
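/* Like receive_req_state(), but for a connection-wide (resource level) state change. */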
4187 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4188 {
4189 	struct p_req_state *p = pi->data;
4190 	union drbd_state mask, val;
4191 	enum drbd_state_rv rv;
4192 
4193 	mask.i = be32_to_cpu(p->mask);
4194 	val.i = be32_to_cpu(p->val);
4195 
4196 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4197 	    mutex_is_locked(&connection->cstate_mutex)) {
4198 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4199 		return 0;
4200 	}
4201 
4202 	mask = convert_state(mask);
4203 	val = convert_state(val);
4204 
4205 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4206 	conn_send_sr_reply(connection, rv);
4207 
4208 	return 0;
4209 }
4210 
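/*
 * receive_state() - process the peer's P_STATE packet.
 * Reconcile the reported peer state with our own: detect "end of resync" and
 * "verify finished", decide via drbd_sync_handshake() whether a new connection
 * or a fresh attach requires a resync, and apply the resulting state change.
 * Irreconcilable states lead to disconnect.
 */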
4211 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4212 {
4213 	struct drbd_peer_device *peer_device;
4214 	struct drbd_device *device;
4215 	struct p_state *p = pi->data;
4216 	union drbd_state os, ns, peer_state;
4217 	enum drbd_disk_state real_peer_disk;
4218 	enum chg_state_flags cs_flags;
4219 	int rv;
4220 
4221 	peer_device = conn_peer_device(connection, pi->vnr);
4222 	if (!peer_device)
4223 		return config_unknown_volume(connection, pi);
4224 	device = peer_device->device;
4225 
4226 	peer_state.i = be32_to_cpu(p->state);
4227 
4228 	real_peer_disk = peer_state.disk;
4229 	if (peer_state.disk == D_NEGOTIATING) {
4230 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4231 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4232 	}
4233 
4234 	spin_lock_irq(&device->resource->req_lock);
4235  retry:
4236 	os = ns = drbd_read_state(device);
4237 	spin_unlock_irq(&device->resource->req_lock);
4238 
4239 	/* If some other part of the code (ack_receiver thread, timeout)
4240 	 * already decided to close the connection again,
4241 	 * we must not "re-establish" it here. */
4242 	if (os.conn <= C_TEAR_DOWN)
4243 		return -ECONNRESET;
4244 
4245 	/* If this is the "end of sync" confirmation, usually the peer disk
4246 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4247 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4248 	 * unpause-sync events has been "just right", the peer disk may
4249 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4250 	 */
4251 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4252 	    real_peer_disk == D_UP_TO_DATE &&
4253 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4254 		/* If we are (becoming) SyncSource, but peer is still in sync
4255 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4256 		 * will change to inconsistent once the peer reaches active
4257 		 * syncing states.
4258 		 * It may have changed syncer-paused flags, however, so we
4259 		 * cannot ignore this completely. */
4260 		if (peer_state.conn > C_CONNECTED &&
4261 		    peer_state.conn < C_SYNC_SOURCE)
4262 			real_peer_disk = D_INCONSISTENT;
4263 
4264 		/* if peer_state changes to connected at the same time,
4265 		 * it explicitly notifies us that it finished resync.
4266 		 * Maybe we should finish it up, too? */
4267 		else if (os.conn >= C_SYNC_SOURCE &&
4268 			 peer_state.conn == C_CONNECTED) {
4269 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4270 				drbd_resync_finished(peer_device);
4271 			return 0;
4272 		}
4273 	}
4274 
4275 	/* explicit verify finished notification, stop sector reached. */
4276 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4277 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4278 		ov_out_of_sync_print(peer_device);
4279 		drbd_resync_finished(peer_device);
4280 		return 0;
4281 	}
4282 
4283 	/* peer says his disk is inconsistent, while we think it is uptodate,
4284 	 * and this happens while the peer still thinks we have a sync going on,
4285 	 * but we think we are already done with the sync.
4286 	 * We ignore this to avoid flapping pdsk.
4287 	 * This should not happen, if the peer is a recent version of drbd. */
4288 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4289 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4290 		real_peer_disk = D_UP_TO_DATE;
4291 
4292 	if (ns.conn == C_WF_REPORT_PARAMS)
4293 		ns.conn = C_CONNECTED;
4294 
4295 	if (peer_state.conn == C_AHEAD)
4296 		ns.conn = C_BEHIND;
4297 
4298 	/* TODO:
4299 	 * if (primary and diskless and peer uuid != effective uuid)
4300 	 *     abort attach on peer;
4301 	 *
4302 	 * If this node does not have good data, was already connected, but
4303 	 * the peer did a late attach only now, trying to "negotiate" with me,
4304 	 * AND I am currently Primary, possibly frozen, with some specific
4305 	 * "effective" uuid, this should never be reached, really, because
4306 	 * we first send the uuids, then the current state.
4307 	 *
4308 	 * In this scenario, we already dropped the connection hard
4309 	 * when we received the unsuitable uuids (receive_uuids().
4310 	 * when we received the unsuitable uuids (see receive_uuids()).
4311 	 * Should we want to change this, that is: not drop the connection in
4312 	 * receive_uuids() already, then we would need to add a branch here
4313 	 * that aborts the attach of "unsuitable uuids" on the peer in case
4314 	 * this node is currently Diskless Primary.
4315 	 */
4316 
4317 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4318 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4319 		int cr; /* consider resync */
4320 
4321 		/* if we established a new connection */
4322 		cr  = (os.conn < C_CONNECTED);
4323 		/* if we had an established connection
4324 		 * and one of the nodes newly attaches a disk */
4325 		cr |= (os.conn == C_CONNECTED &&
4326 		       (peer_state.disk == D_NEGOTIATING ||
4327 			os.disk == D_NEGOTIATING));
4328 		/* if we have both been inconsistent, and the peer has been
4329 		 * forced to be UpToDate with --force */
4330 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4331 		/* if we had been plain connected, and the admin requested to
4332 		 * start a sync by "invalidate" or "invalidate-remote" */
4333 		cr |= (os.conn == C_CONNECTED &&
4334 				(peer_state.conn >= C_STARTING_SYNC_S &&
4335 				 peer_state.conn <= C_WF_BITMAP_T));
4336 
4337 		if (cr)
4338 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4339 
4340 		put_ldev(device);
4341 		if (ns.conn == C_MASK) {
4342 			ns.conn = C_CONNECTED;
4343 			if (device->state.disk == D_NEGOTIATING) {
4344 				drbd_force_state(device, NS(disk, D_FAILED));
4345 			} else if (peer_state.disk == D_NEGOTIATING) {
4346 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4347 				peer_state.disk = D_DISKLESS;
4348 				real_peer_disk = D_DISKLESS;
4349 			} else {
4350 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4351 					return -EIO;
4352 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4353 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4354 				return -EIO;
4355 			}
4356 		}
4357 	}
4358 
4359 	spin_lock_irq(&device->resource->req_lock);
4360 	if (os.i != drbd_read_state(device).i)
4361 		goto retry;
4362 	clear_bit(CONSIDER_RESYNC, &device->flags);
4363 	ns.peer = peer_state.role;
4364 	ns.pdsk = real_peer_disk;
4365 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4366 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4367 		ns.disk = device->new_state_tmp.disk;
4368 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4369 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4370 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4371 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4372 		   for temporal network outages! */
4373 		   for temporary network outages! */
4374 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4375 		tl_clear(peer_device->connection);
4376 		drbd_uuid_new_current(device);
4377 		clear_bit(NEW_CUR_UUID, &device->flags);
4378 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4379 		return -EIO;
4380 	}
4381 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4382 	ns = drbd_read_state(device);
4383 	spin_unlock_irq(&device->resource->req_lock);
4384 
4385 	if (rv < SS_SUCCESS) {
4386 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4387 		return -EIO;
4388 	}
4389 
4390 	if (os.conn > C_WF_REPORT_PARAMS) {
4391 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4392 		    peer_state.disk != D_NEGOTIATING ) {
4393 			/* we want resync, peer has not yet decided to sync... */
4394 			/* Nowadays only used when forcing a node into primary role and
4395 			   setting its disk to UpToDate with that */
4396 			drbd_send_uuids(peer_device);
4397 			drbd_send_current_state(peer_device);
4398 		}
4399 	}
4400 
4401 	clear_bit(DISCARD_MY_DATA, &device->flags);
4402 
4403 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4404 
4405 	return 0;
4406 }
4407 
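/* P_SYNC_UUID: the sync source announces the UUID to resync against.  Adopt it
 * as our current UUID (without rotating the old one into the history) and
 * start the resync as sync target. */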
4408 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4409 {
4410 	struct drbd_peer_device *peer_device;
4411 	struct drbd_device *device;
4412 	struct p_rs_uuid *p = pi->data;
4413 
4414 	peer_device = conn_peer_device(connection, pi->vnr);
4415 	if (!peer_device)
4416 		return -EIO;
4417 	device = peer_device->device;
4418 
4419 	wait_event(device->misc_wait,
4420 		   device->state.conn == C_WF_SYNC_UUID ||
4421 		   device->state.conn == C_BEHIND ||
4422 		   device->state.conn < C_CONNECTED ||
4423 		   device->state.disk < D_NEGOTIATING);
4424 
4425 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4426 
4427 	/* Here the _drbd_uuid_ functions are right, current should
4428 	   _not_ be rotated into the history */
4429 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4430 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4431 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4432 
4433 		drbd_print_uuids(device, "updated sync uuid");
4434 		drbd_start_resync(device, C_SYNC_TARGET);
4435 
4436 		put_ldev(device);
4437 	} else
4438 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4439 
4440 	return 0;
4441 }
4442 
4443 /*
4444  * receive_bitmap_plain
4445  *
4446  * Return 0 when done, 1 when another iteration is needed, and a negative error
4447  * code upon failure.
4448  */
4449 static int
4450 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4451 		     unsigned long *p, struct bm_xfer_ctx *c)
4452 {
4453 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4454 				 drbd_header_size(peer_device->connection);
4455 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4456 				       c->bm_words - c->word_offset);
4457 	unsigned int want = num_words * sizeof(*p);
4458 	int err;
4459 
4460 	if (want != size) {
4461 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4462 		return -EIO;
4463 	}
4464 	if (want == 0)
4465 		return 0;
4466 	err = drbd_recv_all(peer_device->connection, p, want);
4467 	if (err)
4468 		return err;
4469 
4470 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4471 
4472 	c->word_offset += num_words;
4473 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4474 	if (c->bit_offset > c->bm_bits)
4475 		c->bit_offset = c->bm_bits;
4476 
4477 	return 1;
4478 }
4479 
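/*
 * Layout of p_compressed_bm->encoding (cf. the accessors below):
 *   bits 0-3: bitmap encoding scheme (enum drbd_bitmap_code)
 *   bits 4-6: number of unused pad bits at the end of the bit stream
 *   bit  7:   whether the first run consists of set bits
 * so a sender would pack it as, e.g.:
 *   encoding = code | (pad_bits << 4) | (first_run_set ? 0x80 : 0)
 */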
4480 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4481 {
4482 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4483 }
4484 
4485 static int dcbp_get_start(struct p_compressed_bm *p)
4486 {
4487 	return (p->encoding & 0x80) != 0;
4488 }
4489 
4490 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4491 {
4492 	return (p->encoding >> 4) & 0x7;
4493 }
4494 
4495 /*
4496  * recv_bm_rle_bits
4497  *
4498  * Return 0 when done, 1 when another iteration is needed, and a negative error
4499  * code upon failure.
4500  */
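/* The payload is a stream of variable-length-integer coded run lengths; runs
 * alternate between clear and set bits, starting with the value indicated by
 * dcbp_get_start(), and runs of set bits are merged into the local bitmap. */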
4501 static int
4502 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4503 		struct p_compressed_bm *p,
4504 		 struct bm_xfer_ctx *c,
4505 		 unsigned int len)
4506 {
4507 	struct bitstream bs;
4508 	u64 look_ahead;
4509 	u64 rl;
4510 	u64 tmp;
4511 	unsigned long s = c->bit_offset;
4512 	unsigned long e;
4513 	int toggle = dcbp_get_start(p);
4514 	int have;
4515 	int bits;
4516 
4517 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4518 
4519 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4520 	if (bits < 0)
4521 		return -EIO;
4522 
4523 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4524 		bits = vli_decode_bits(&rl, look_ahead);
4525 		if (bits <= 0)
4526 			return -EIO;
4527 
4528 		if (toggle) {
4529 			e = s + rl -1;
4530 			if (e >= c->bm_bits) {
4531 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4532 				return -EIO;
4533 			}
4534 			_drbd_bm_set_bits(peer_device->device, s, e);
4535 		}
4536 
4537 		if (have < bits) {
4538 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4539 				have, bits, look_ahead,
4540 				(unsigned int)(bs.cur.b - p->code),
4541 				(unsigned int)bs.buf_len);
4542 			return -EIO;
4543 		}
4544 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4545 		if (likely(bits < 64))
4546 			look_ahead >>= bits;
4547 		else
4548 			look_ahead = 0;
4549 		have -= bits;
4550 
4551 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4552 		if (bits < 0)
4553 			return -EIO;
4554 		look_ahead |= tmp << have;
4555 		have += bits;
4556 	}
4557 
4558 	c->bit_offset = s;
4559 	bm_xfer_ctx_bit_to_word_offset(c);
4560 
4561 	return (s != c->bm_bits);
4562 }
4563 
4564 /*
4565  * decode_bitmap_c
4566  *
4567  * Return 0 when done, 1 when another iteration is needed, and a negative error
4568  * code upon failure.
4569  */
4570 static int
4571 decode_bitmap_c(struct drbd_peer_device *peer_device,
4572 		struct p_compressed_bm *p,
4573 		struct bm_xfer_ctx *c,
4574 		unsigned int len)
4575 {
4576 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4577 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4578 
4579 	/* other variants had been implemented for evaluation,
4580 	 * but have been dropped as this one turned out to be "best"
4581 	 * during all our tests. */
4582 
4583 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4584 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4585 	return -EIO;
4586 }
4587 
4588 void INFO_bm_xfer_stats(struct drbd_peer_device *peer_device,
4589 		const char *direction, struct bm_xfer_ctx *c)
4590 {
4591 	/* what would it take to transfer it "plaintext" */
4592 	unsigned int header_size = drbd_header_size(peer_device->connection);
4593 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4594 	unsigned int plain =
4595 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4596 		c->bm_words * sizeof(unsigned long);
4597 	unsigned int total = c->bytes[0] + c->bytes[1];
4598 	unsigned int r;
4599 
4600 	/* total cannot be zero, but just in case: */
4601 	if (total == 0)
4602 		return;
4603 
4604 	/* don't report if not compressed */
4605 	if (total >= plain)
4606 		return;
4607 
4608 	/* total < plain. check for overflow, still */
4609 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4610 		                    : (1000 * total / plain);
4611 
4612 	if (r > 1000)
4613 		r = 1000;
4614 
4615 	r = 1000 - r;
4616 	drbd_info(peer_device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4617 	     "total %u; compression: %u.%u%%\n",
4618 			direction,
4619 			c->bytes[1], c->packets[1],
4620 			c->bytes[0], c->packets[0],
4621 			total, r/10, r % 10);
4622 }
4623 
4624 /* Since we are processing the bitfield from lower addresses to higher,
4625    it does not matter whether we process it in 32 bit chunks or 64 bit
4626    chunks, as long as it is little endian. (Understand it as a byte stream,
4627    beginning with the lowest byte...) If we used big endian,
4628    we would need to process it from the highest address to the lowest,
4629    in order to be agnostic to the 32 vs 64 bit issue.
4630 
4631    Returns 0 on success, or a negative error code on failure. */
4632 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4633 {
4634 	struct drbd_peer_device *peer_device;
4635 	struct drbd_device *device;
4636 	struct bm_xfer_ctx c;
4637 	int err;
4638 
4639 	peer_device = conn_peer_device(connection, pi->vnr);
4640 	if (!peer_device)
4641 		return -EIO;
4642 	device = peer_device->device;
4643 
4644 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4645 	/* you are supposed to send additional out-of-sync information
4646 	 * if you actually set bits during this phase */
4647 
4648 	c = (struct bm_xfer_ctx) {
4649 		.bm_bits = drbd_bm_bits(device),
4650 		.bm_words = drbd_bm_words(device),
4651 	};
4652 
4653 	for(;;) {
4654 		if (pi->cmd == P_BITMAP)
4655 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4656 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4657 			/* MAYBE: sanity check that we speak proto >= 90,
4658 			 * and the feature is enabled! */
4659 			struct p_compressed_bm *p = pi->data;
4660 
4661 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4662 				drbd_err(device, "ReportCBitmap packet too large\n");
4663 				err = -EIO;
4664 				goto out;
4665 			}
4666 			if (pi->size <= sizeof(*p)) {
4667 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4668 				err = -EIO;
4669 				goto out;
4670 			}
4671 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4672 			if (err)
4673 				goto out;
4674 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4675 		} else {
4676 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4677 			err = -EIO;
4678 			goto out;
4679 		}
4680 
4681 		c.packets[pi->cmd == P_BITMAP]++;
4682 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4683 
4684 		if (err <= 0) {
4685 			if (err < 0)
4686 				goto out;
4687 			break;
4688 		}
4689 		err = drbd_recv_header(peer_device->connection, pi);
4690 		if (err)
4691 			goto out;
4692 	}
4693 
4694 	INFO_bm_xfer_stats(peer_device, "receive", &c);
4695 
4696 	if (device->state.conn == C_WF_BITMAP_T) {
4697 		enum drbd_state_rv rv;
4698 
4699 		err = drbd_send_bitmap(device, peer_device);
4700 		if (err)
4701 			goto out;
4702 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4703 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4704 		D_ASSERT(device, rv == SS_SUCCESS);
4705 	} else if (device->state.conn != C_WF_BITMAP_S) {
4706 		/* admin may have requested C_DISCONNECTING,
4707 		 * other threads may have noticed network errors */
4708 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4709 		    drbd_conn_str(device->state.conn));
4710 	}
4711 	err = 0;
4712 
4713  out:
4714 	drbd_bm_unlock(device);
4715 	if (!err && device->state.conn == C_WF_BITMAP_S)
4716 		drbd_start_resync(device, C_SYNC_SOURCE);
4717 	return err;
4718 }
4719 
4720 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4721 {
4722 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4723 		 pi->cmd, pi->size);
4724 
4725 	return ignore_remaining_packet(connection, pi);
4726 }
4727 
4728 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4729 {
4730 	/* Make sure we've acked all the TCP data associated
4731 	 * with the data requests being unplugged */
4732 	tcp_sock_set_quickack(connection->data.socket->sk, 2);
4733 	return 0;
4734 }
4735 
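/* P_OUT_OF_SYNC: the peer (typically in Ahead mode) tells us it skipped
 * replicating this range; mark it out of sync so a later resync picks it up. */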
4736 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4737 {
4738 	struct drbd_peer_device *peer_device;
4739 	struct drbd_device *device;
4740 	struct p_block_desc *p = pi->data;
4741 
4742 	peer_device = conn_peer_device(connection, pi->vnr);
4743 	if (!peer_device)
4744 		return -EIO;
4745 	device = peer_device->device;
4746 
4747 	switch (device->state.conn) {
4748 	case C_WF_SYNC_UUID:
4749 	case C_WF_BITMAP_T:
4750 	case C_BEHIND:
4751 		break;
4752 	default:
4753 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4754 				drbd_conn_str(device->state.conn));
4755 	}
4756 
4757 	drbd_set_out_of_sync(peer_device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4758 
4759 	return 0;
4760 }
4761 
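/*
 * receive_rs_deallocated() - P_RS_DEALLOCATED during resync: the sync source
 * found this range to be deallocated, so instead of transferring zeroes we
 * submit a local discard for it and acknowledge once that completes (or send
 * a negative ack if we have no local disk or the submission fails).
 */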
4762 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4763 {
4764 	struct drbd_peer_device *peer_device;
4765 	struct p_block_desc *p = pi->data;
4766 	struct drbd_device *device;
4767 	sector_t sector;
4768 	int size, err = 0;
4769 
4770 	peer_device = conn_peer_device(connection, pi->vnr);
4771 	if (!peer_device)
4772 		return -EIO;
4773 	device = peer_device->device;
4774 
4775 	sector = be64_to_cpu(p->sector);
4776 	size = be32_to_cpu(p->blksize);
4777 
4778 	dec_rs_pending(peer_device);
4779 
4780 	if (get_ldev(device)) {
4781 		struct drbd_peer_request *peer_req;
4782 
4783 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4784 					       size, 0, GFP_NOIO);
4785 		if (!peer_req) {
4786 			put_ldev(device);
4787 			return -ENOMEM;
4788 		}
4789 
4790 		peer_req->w.cb = e_end_resync_block;
4791 		peer_req->opf = REQ_OP_DISCARD;
4792 		peer_req->submit_jif = jiffies;
4793 		peer_req->flags |= EE_TRIM;
4794 
4795 		spin_lock_irq(&device->resource->req_lock);
4796 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4797 		spin_unlock_irq(&device->resource->req_lock);
4798 
4799 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
4800 		err = drbd_submit_peer_request(peer_req);
4801 
4802 		if (err) {
4803 			spin_lock_irq(&device->resource->req_lock);
4804 			list_del(&peer_req->w.list);
4805 			spin_unlock_irq(&device->resource->req_lock);
4806 
4807 			drbd_free_peer_req(device, peer_req);
4808 			put_ldev(device);
4809 			err = 0;
4810 			goto fail;
4811 		}
4812 
4813 		inc_unacked(device);
4814 
4815 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4816 		   as well as drbd_rs_complete_io() */
4817 	} else {
4818 	fail:
4819 		drbd_rs_complete_io(device, sector);
4820 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4821 	}
4822 
4823 	atomic_add(size >> 9, &device->rs_sect_in);
4824 
4825 	return err;
4826 }
4827 
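/* Dispatch table entry for the main receiver loop in drbdd():
 * @expect_payload: the packet may carry more data than the fixed sub-header
 * @pkt_size:       size of that fixed sub-header
 * @fn:             handler that consumes the rest of the packet
 */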
4828 struct data_cmd {
4829 	int expect_payload;
4830 	unsigned int pkt_size;
4831 	int (*fn)(struct drbd_connection *, struct packet_info *);
4832 };
4833 
4834 static struct data_cmd drbd_cmd_handler[] = {
4835 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4836 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4837 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4838 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4839 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4840 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4841 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4842 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4843 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4844 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4845 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4846 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4847 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4848 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4849 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4850 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4851 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4852 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4853 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4854 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4855 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4856 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4857 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4858 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4859 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4860 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4861 	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
4862 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4863 };
4864 
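/*
 * drbdd() - main loop of the receiver thread.
 * Read a packet header, validate its size against drbd_cmd_handler[], receive
 * the fixed sub-header, and dispatch to the handler.  Any failure drops the
 * connection into C_PROTOCOL_ERROR.
 */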
4865 static void drbdd(struct drbd_connection *connection)
4866 {
4867 	struct packet_info pi;
4868 	size_t shs; /* sub header size */
4869 	int err;
4870 
4871 	while (get_t_state(&connection->receiver) == RUNNING) {
4872 		struct data_cmd const *cmd;
4873 
4874 		drbd_thread_current_set_cpu(&connection->receiver);
4875 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4876 		if (drbd_recv_header_maybe_unplug(connection, &pi))
4877 			goto err_out;
4878 
4879 		cmd = &drbd_cmd_handler[pi.cmd];
4880 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4881 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4882 				 cmdname(pi.cmd), pi.cmd);
4883 			goto err_out;
4884 		}
4885 
4886 		shs = cmd->pkt_size;
4887 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4888 			shs += sizeof(struct o_qlim);
4889 		if (pi.size > shs && !cmd->expect_payload) {
4890 			drbd_err(connection, "No payload expected %s l:%d\n",
4891 				 cmdname(pi.cmd), pi.size);
4892 			goto err_out;
4893 		}
4894 		if (pi.size < shs) {
4895 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4896 				 cmdname(pi.cmd), (int)shs, pi.size);
4897 			goto err_out;
4898 		}
4899 
4900 		if (shs) {
4901 			update_receiver_timing_details(connection, drbd_recv_all_warn);
4902 			err = drbd_recv_all_warn(connection, pi.data, shs);
4903 			if (err)
4904 				goto err_out;
4905 			pi.size -= shs;
4906 		}
4907 
4908 		update_receiver_timing_details(connection, cmd->fn);
4909 		err = cmd->fn(connection, &pi);
4910 		if (err) {
4911 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4912 				 cmdname(pi.cmd), err, pi.size);
4913 			goto err_out;
4914 		}
4915 	}
4916 	return;
4917 
4918     err_out:
4919 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4920 }
4921 
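/*
 * Tear down an established connection: stop the ack receiver and ack sender,
 * close the sockets, run the per-volume cleanup in drbd_disconnected() and
 * move the connection state towards C_UNCONNECTED (or C_STANDALONE if we are
 * disconnecting on purpose).
 */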
4922 static void conn_disconnect(struct drbd_connection *connection)
4923 {
4924 	struct drbd_peer_device *peer_device;
4925 	enum drbd_conns oc;
4926 	int vnr;
4927 
4928 	if (connection->cstate == C_STANDALONE)
4929 		return;
4930 
4931 	/* We are about to start the cleanup after connection loss.
4932 	 * Make sure drbd_make_request knows about that.
4933 	 * Usually we should be in some network failure state already,
4934 	 * but just in case we are not, we fix it up here.
4935 	 */
4936 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4937 
4938 	/* ack_receiver does not clean up anything. it must not interfere, either */
4939 	drbd_thread_stop(&connection->ack_receiver);
4940 	if (connection->ack_sender) {
4941 		destroy_workqueue(connection->ack_sender);
4942 		connection->ack_sender = NULL;
4943 	}
4944 	drbd_free_sock(connection);
4945 
4946 	rcu_read_lock();
4947 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4948 		struct drbd_device *device = peer_device->device;
4949 		kref_get(&device->kref);
4950 		rcu_read_unlock();
4951 		drbd_disconnected(peer_device);
4952 		kref_put(&device->kref, drbd_destroy_device);
4953 		rcu_read_lock();
4954 	}
4955 	rcu_read_unlock();
4956 
4957 	if (!list_empty(&connection->current_epoch->list))
4958 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4959 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4960 	atomic_set(&connection->current_epoch->epoch_size, 0);
4961 	connection->send.seen_any_write_yet = false;
4962 
4963 	drbd_info(connection, "Connection closed\n");
4964 
4965 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4966 		conn_try_outdate_peer_async(connection);
4967 
4968 	spin_lock_irq(&connection->resource->req_lock);
4969 	oc = connection->cstate;
4970 	if (oc >= C_UNCONNECTED)
4971 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4972 
4973 	spin_unlock_irq(&connection->resource->req_lock);
4974 
4975 	if (oc == C_DISCONNECTING)
4976 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4977 }
4978 
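/*
 * Per-volume cleanup after connection loss: wait for in-flight peer requests
 * to drain, cancel pending resync requests, flush the sender work queue,
 * forget the peer's UUIDs and write out the bitmap.
 */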
4979 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4980 {
4981 	struct drbd_device *device = peer_device->device;
4982 	unsigned int i;
4983 
4984 	/* wait for current activity to cease. */
4985 	spin_lock_irq(&device->resource->req_lock);
4986 	_drbd_wait_ee_list_empty(device, &device->active_ee);
4987 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4988 	_drbd_wait_ee_list_empty(device, &device->read_ee);
4989 	spin_unlock_irq(&device->resource->req_lock);
4990 
4991 	/* We do not have data structures that would allow us to
4992 	 * get the rs_pending_cnt down to 0 again.
4993 	 *  * On C_SYNC_TARGET we do not have any data structures describing
4994 	 *    the pending RSDataRequest's we have sent.
4995 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4996 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4997 	 *  And no, it is not the sum of the reference counts in the
4998 	 *  resync_LRU. The resync_LRU tracks the whole operation including
4999 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5000 	 *  on the fly. */
5001 	drbd_rs_cancel_all(device);
5002 	device->rs_total = 0;
5003 	device->rs_failed = 0;
5004 	atomic_set(&device->rs_pending_cnt, 0);
5005 	wake_up(&device->misc_wait);
5006 
5007 	timer_delete_sync(&device->resync_timer);
5008 	resync_timer_fn(&device->resync_timer);
5009 
5010 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5011 	 * w_make_resync_request etc. which may still be on the worker queue
5012 	 * to be "canceled" */
5013 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5014 
5015 	drbd_finish_peer_reqs(device);
5016 
5017 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5018 	   might have queued work again. The one before drbd_finish_peer_reqs() is
5019 	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5020 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5021 
5022 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5023 	 * again via drbd_try_clear_on_disk_bm(). */
5024 	drbd_rs_cancel_all(device);
5025 
5026 	kfree(device->p_uuid);
5027 	device->p_uuid = NULL;
5028 
5029 	if (!drbd_suspended(device))
5030 		tl_clear(peer_device->connection);
5031 
5032 	drbd_md_sync(device);
5033 
5034 	if (get_ldev(device)) {
5035 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5036 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED, NULL);
5037 		put_ldev(device);
5038 	}
5039 
5040 	i = atomic_read(&device->pp_in_use_by_net);
5041 	if (i)
5042 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5043 	i = atomic_read(&device->pp_in_use);
5044 	if (i)
5045 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5046 
5047 	D_ASSERT(device, list_empty(&device->read_ee));
5048 	D_ASSERT(device, list_empty(&device->active_ee));
5049 	D_ASSERT(device, list_empty(&device->sync_ee));
5050 	D_ASSERT(device, list_empty(&device->done_ee));
5051 
5052 	return 0;
5053 }
5054 
5055 /*
5056  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5057  * we can agree on is stored in agreed_pro_version.
5058  *
5059  * The feature flags and the reserved array should leave enough room for future
5060  * enhancements of the handshake protocol and possible plugins.
5061  *
5062  * For now, they are expected to be zero, but are ignored.
5063  */
5064 static int drbd_send_features(struct drbd_connection *connection)
5065 {
5066 	struct drbd_socket *sock;
5067 	struct p_connection_features *p;
5068 
5069 	sock = &connection->data;
5070 	p = conn_prepare_command(connection, sock);
5071 	if (!p)
5072 		return -EIO;
5073 	memset(p, 0, sizeof(*p));
5074 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5075 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5076 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5077 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5078 }
5079 
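/*
 * Illustrative example (version numbers hypothetical): if we support
 * protocol versions 86..101 and the peer announces 90..96, the ranges
 * overlap and agreed_pro_version becomes min(101, 96) = 96.
 */
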
5080 /*
5081  * return values:
5082  *   1 yes, we have a valid connection
5083  *   0 oops, did not work out, please try again
5084  *  -1 peer talks different language,
5085  *     no point in trying again, please go standalone.
5086  */
5087 static int drbd_do_features(struct drbd_connection *connection)
5088 {
5089 	/* ASSERT current == connection->receiver ... */
5090 	struct p_connection_features *p;
5091 	const int expect = sizeof(struct p_connection_features);
5092 	struct packet_info pi;
5093 	int err;
5094 
5095 	err = drbd_send_features(connection);
5096 	if (err)
5097 		return 0;
5098 
5099 	err = drbd_recv_header(connection, &pi);
5100 	if (err)
5101 		return 0;
5102 
5103 	if (pi.cmd != P_CONNECTION_FEATURES) {
5104 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5105 			 cmdname(pi.cmd), pi.cmd);
5106 		return -1;
5107 	}
5108 
5109 	if (pi.size != expect) {
5110 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5111 		     expect, pi.size);
5112 		return -1;
5113 	}
5114 
5115 	p = pi.data;
5116 	err = drbd_recv_all_warn(connection, p, expect);
5117 	if (err)
5118 		return 0;
5119 
5120 	p->protocol_min = be32_to_cpu(p->protocol_min);
5121 	p->protocol_max = be32_to_cpu(p->protocol_max);
5122 	if (p->protocol_max == 0)
5123 		p->protocol_max = p->protocol_min;
5124 
5125 	if (PRO_VERSION_MAX < p->protocol_min ||
5126 	    PRO_VERSION_MIN > p->protocol_max)
5127 		goto incompat;
5128 
5129 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5130 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5131 
5132 	drbd_info(connection, "Handshake successful: "
5133 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5134 
5135 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5136 		  connection->agreed_features,
5137 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5138 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5139 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5140 		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5141 		  connection->agreed_features ? "" : " none");
5142 
5143 	return 1;
5144 
5145  incompat:
5146 	drbd_err(connection, "incompatible DRBD dialects: "
5147 	    "I support %d-%d, peer supports %d-%d\n",
5148 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5149 	    p->protocol_min, p->protocol_max);
5150 	return -1;
5151 }
5152 
5153 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5154 static int drbd_do_auth(struct drbd_connection *connection)
5155 {
5156 	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5157 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5158 	return -1;
5159 }
5160 #else
5161 #define CHALLENGE_LEN 64
5162 
5163 /* Return value:
5164 	1 - auth succeeded,
5165 	0 - failed, try again (network error),
5166 	-1 - auth failed, don't try again.
5167 */
5168 
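/*
 * Mutual challenge-response authentication, keyed with the shared secret:
 * send our random challenge, receive the peer's challenge, answer with
 * HMAC(secret, peer's challenge) and verify that the peer's response equals
 * HMAC(secret, our challenge).
 */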
5169 static int drbd_do_auth(struct drbd_connection *connection)
5170 {
5171 	struct drbd_socket *sock;
5172 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5173 	char *response = NULL;
5174 	char *right_response = NULL;
5175 	char *peers_ch = NULL;
5176 	unsigned int key_len;
5177 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5178 	unsigned int resp_size;
5179 	struct shash_desc *desc;
5180 	struct packet_info pi;
5181 	struct net_conf *nc;
5182 	int err, rv;
5183 
5184 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5185 
5186 	rcu_read_lock();
5187 	nc = rcu_dereference(connection->net_conf);
5188 	key_len = strlen(nc->shared_secret);
5189 	memcpy(secret, nc->shared_secret, key_len);
5190 	rcu_read_unlock();
5191 
5192 	desc = kmalloc(sizeof(struct shash_desc) +
5193 		       crypto_shash_descsize(connection->cram_hmac_tfm),
5194 		       GFP_KERNEL);
5195 	if (!desc) {
5196 		rv = -1;
5197 		goto fail;
5198 	}
5199 	desc->tfm = connection->cram_hmac_tfm;
5200 
5201 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5202 	if (rv) {
5203 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5204 		rv = -1;
5205 		goto fail;
5206 	}
5207 
5208 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5209 
5210 	sock = &connection->data;
5211 	if (!conn_prepare_command(connection, sock)) {
5212 		rv = 0;
5213 		goto fail;
5214 	}
5215 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5216 				my_challenge, CHALLENGE_LEN);
5217 	if (!rv)
5218 		goto fail;
5219 
5220 	err = drbd_recv_header(connection, &pi);
5221 	if (err) {
5222 		rv = 0;
5223 		goto fail;
5224 	}
5225 
5226 	if (pi.cmd != P_AUTH_CHALLENGE) {
5227 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5228 			 cmdname(pi.cmd), pi.cmd);
5229 		rv = -1;
5230 		goto fail;
5231 	}
5232 
5233 	if (pi.size > CHALLENGE_LEN * 2) {
5234 		drbd_err(connection, "AuthChallenge payload too big.\n");
5235 		rv = -1;
5236 		goto fail;
5237 	}
5238 
5239 	if (pi.size < CHALLENGE_LEN) {
5240 		drbd_err(connection, "AuthChallenge payload too small.\n");
5241 		rv = -1;
5242 		goto fail;
5243 	}
5244 
5245 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5246 	if (!peers_ch) {
5247 		rv = -1;
5248 		goto fail;
5249 	}
5250 
5251 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5252 	if (err) {
5253 		rv = 0;
5254 		goto fail;
5255 	}
5256 
5257 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5258 		drbd_err(connection, "Peer presented the same challenge!\n");
5259 		rv = -1;
5260 		goto fail;
5261 	}
5262 
5263 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5264 	response = kmalloc(resp_size, GFP_NOIO);
5265 	if (!response) {
5266 		rv = -1;
5267 		goto fail;
5268 	}
5269 
5270 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5271 	if (rv) {
5272 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5273 		rv = -1;
5274 		goto fail;
5275 	}
5276 
5277 	if (!conn_prepare_command(connection, sock)) {
5278 		rv = 0;
5279 		goto fail;
5280 	}
5281 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5282 				response, resp_size);
5283 	if (!rv)
5284 		goto fail;
5285 
5286 	err = drbd_recv_header(connection, &pi);
5287 	if (err) {
5288 		rv = 0;
5289 		goto fail;
5290 	}
5291 
5292 	if (pi.cmd != P_AUTH_RESPONSE) {
5293 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5294 			 cmdname(pi.cmd), pi.cmd);
5295 		rv = 0;
5296 		goto fail;
5297 	}
5298 
5299 	if (pi.size != resp_size) {
5300 		drbd_err(connection, "AuthResponse payload has unexpected size\n");
5301 		rv = 0;
5302 		goto fail;
5303 	}
5304 
5305 	err = drbd_recv_all_warn(connection, response, resp_size);
5306 	if (err) {
5307 		rv = 0;
5308 		goto fail;
5309 	}
5310 
5311 	right_response = kmalloc(resp_size, GFP_NOIO);
5312 	if (!right_response) {
5313 		rv = -1;
5314 		goto fail;
5315 	}
5316 
5317 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5318 				 right_response);
5319 	if (rv) {
5320 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5321 		rv = -1;
5322 		goto fail;
5323 	}
5324 
5325 	rv = !memcmp(response, right_response, resp_size);
5326 
5327 	if (rv)
5328 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5329 		     resp_size);
5330 	else
5331 		rv = -1;
5332 
5333  fail:
5334 	kfree(peers_ch);
5335 	kfree(response);
5336 	kfree(right_response);
5337 	if (desc) {
5338 		shash_desc_zero(desc);
5339 		kfree(desc);
5340 	}
5341 
5342 	return rv;
5343 }
5344 #endif
5345 
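/*
 * Receiver thread entry point: retry conn_connect() until a connection is
 * established or the network configuration is discarded, then run the main
 * receive loop in drbdd() and finally clean up with conn_disconnect().
 */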
5346 int drbd_receiver(struct drbd_thread *thi)
5347 {
5348 	struct drbd_connection *connection = thi->connection;
5349 	int h;
5350 
5351 	drbd_info(connection, "receiver (re)started\n");
5352 
5353 	do {
5354 		h = conn_connect(connection);
5355 		if (h == 0) {
5356 			conn_disconnect(connection);
5357 			schedule_timeout_interruptible(HZ);
5358 		}
5359 		if (h == -1) {
5360 			drbd_warn(connection, "Discarding network configuration.\n");
5361 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5362 		}
5363 	} while (h == 0);
5364 
5365 	if (h > 0) {
5366 		blk_start_plug(&connection->receiver_plug);
5367 		drbdd(connection);
5368 		blk_finish_plug(&connection->receiver_plug);
5369 	}
5370 
5371 	conn_disconnect(connection);
5372 
5373 	drbd_info(connection, "receiver terminated\n");
5374 	return 0;
5375 }
5376 
5377 /* ********* acknowledge sender ******** */
5378 
5379 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5380 {
5381 	struct p_req_state_reply *p = pi->data;
5382 	int retcode = be32_to_cpu(p->retcode);
5383 
5384 	if (retcode >= SS_SUCCESS) {
5385 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5386 	} else {
5387 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5388 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5389 			 drbd_set_st_err_str(retcode), retcode);
5390 	}
5391 	wake_up(&connection->ping_wait);
5392 
5393 	return 0;
5394 }
5395 
5396 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5397 {
5398 	struct drbd_peer_device *peer_device;
5399 	struct drbd_device *device;
5400 	struct p_req_state_reply *p = pi->data;
5401 	int retcode = be32_to_cpu(p->retcode);
5402 
5403 	peer_device = conn_peer_device(connection, pi->vnr);
5404 	if (!peer_device)
5405 		return -EIO;
5406 	device = peer_device->device;
5407 
5408 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5409 		D_ASSERT(device, connection->agreed_pro_version < 100);
5410 		return got_conn_RqSReply(connection, pi);
5411 	}
5412 
5413 	if (retcode >= SS_SUCCESS) {
5414 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5415 	} else {
5416 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5417 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5418 			drbd_set_st_err_str(retcode), retcode);
5419 	}
5420 	wake_up(&device->state_wait);
5421 
5422 	return 0;
5423 }
5424 
5425 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5426 {
5427 	return drbd_send_ping_ack(connection);
5429 }
5430 
5431 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5432 {
5433 	/* restore idle timeout */
5434 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5435 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5436 		wake_up(&connection->ping_wait);
5437 
5438 	return 0;
5439 }
5440 
5441 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5442 {
5443 	struct drbd_peer_device *peer_device;
5444 	struct drbd_device *device;
5445 	struct p_block_ack *p = pi->data;
5446 	sector_t sector = be64_to_cpu(p->sector);
5447 	int blksize = be32_to_cpu(p->blksize);
5448 
5449 	peer_device = conn_peer_device(connection, pi->vnr);
5450 	if (!peer_device)
5451 		return -EIO;
5452 	device = peer_device->device;
5453 
5454 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5455 
5456 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5457 
5458 	if (get_ldev(device)) {
5459 		drbd_rs_complete_io(device, sector);
5460 		drbd_set_in_sync(peer_device, sector, blksize);
5461 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5462 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5463 		put_ldev(device);
5464 	}
5465 	dec_rs_pending(peer_device);
5466 	atomic_add(blksize >> 9, &device->rs_sect_in);
5467 
5468 	return 0;
5469 }
5470 
5471 static int
5472 validate_req_change_req_state(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
5473 			      struct rb_root *root, const char *func,
5474 			      enum drbd_req_event what, bool missing_ok)
5475 {
5476 	struct drbd_device *device = peer_device->device;
5477 	struct drbd_request *req;
5478 	struct bio_and_error m;
5479 
5480 	spin_lock_irq(&device->resource->req_lock);
5481 	req = find_request(device, root, id, sector, missing_ok, func);
5482 	if (unlikely(!req)) {
5483 		spin_unlock_irq(&device->resource->req_lock);
5484 		return -EIO;
5485 	}
5486 	__req_mod(req, what, peer_device, &m);
5487 	spin_unlock_irq(&device->resource->req_lock);
5488 
5489 	if (m.bio)
5490 		complete_master_bio(device, &m);
5491 	return 0;
5492 }
5493 
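/*
 * Positive acks for writes: acks with block_id == ID_SYNCER belong to resync
 * requests and only mark the area in sync; everything else is mapped to the
 * matching request event and fed into the request state machine.
 */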
5494 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5495 {
5496 	struct drbd_peer_device *peer_device;
5497 	struct drbd_device *device;
5498 	struct p_block_ack *p = pi->data;
5499 	sector_t sector = be64_to_cpu(p->sector);
5500 	int blksize = be32_to_cpu(p->blksize);
5501 	enum drbd_req_event what;
5502 
5503 	peer_device = conn_peer_device(connection, pi->vnr);
5504 	if (!peer_device)
5505 		return -EIO;
5506 	device = peer_device->device;
5507 
5508 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5509 
5510 	if (p->block_id == ID_SYNCER) {
5511 		drbd_set_in_sync(peer_device, sector, blksize);
5512 		dec_rs_pending(peer_device);
5513 		return 0;
5514 	}
5515 	switch (pi->cmd) {
5516 	case P_RS_WRITE_ACK:
5517 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5518 		break;
5519 	case P_WRITE_ACK:
5520 		what = WRITE_ACKED_BY_PEER;
5521 		break;
5522 	case P_RECV_ACK:
5523 		what = RECV_ACKED_BY_PEER;
5524 		break;
5525 	case P_SUPERSEDED:
5526 		what = CONFLICT_RESOLVED;
5527 		break;
5528 	case P_RETRY_WRITE:
5529 		what = POSTPONE_WRITE;
5530 		break;
5531 	default:
5532 		BUG();
5533 	}
5534 
5535 	return validate_req_change_req_state(peer_device, p->block_id, sector,
5536 					     &device->write_requests, __func__,
5537 					     what, false);
5538 }
5539 
5540 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5541 {
5542 	struct drbd_peer_device *peer_device;
5543 	struct drbd_device *device;
5544 	struct p_block_ack *p = pi->data;
5545 	sector_t sector = be64_to_cpu(p->sector);
5546 	int size = be32_to_cpu(p->blksize);
5547 	int err;
5548 
5549 	peer_device = conn_peer_device(connection, pi->vnr);
5550 	if (!peer_device)
5551 		return -EIO;
5552 	device = peer_device->device;
5553 
5554 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5555 
5556 	if (p->block_id == ID_SYNCER) {
5557 		dec_rs_pending(peer_device);
5558 		drbd_rs_failed_io(peer_device, sector, size);
5559 		return 0;
5560 	}
5561 
5562 	err = validate_req_change_req_state(peer_device, p->block_id, sector,
5563 					    &device->write_requests, __func__,
5564 					    NEG_ACKED, true);
5565 	if (err) {
5566 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5567 		   The master bio might already be completed, therefore the
5568 		   request is no longer in the collision hash. */
5569 		/* In Protocol B we might already have got a P_RECV_ACK
5570 		   but then get a P_NEG_ACK afterwards. */
5571 		drbd_set_out_of_sync(peer_device, sector, size);
5572 	}
5573 	return 0;
5574 }
5575 
5576 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5577 {
5578 	struct drbd_peer_device *peer_device;
5579 	struct drbd_device *device;
5580 	struct p_block_ack *p = pi->data;
5581 	sector_t sector = be64_to_cpu(p->sector);
5582 
5583 	peer_device = conn_peer_device(connection, pi->vnr);
5584 	if (!peer_device)
5585 		return -EIO;
5586 	device = peer_device->device;
5587 
5588 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5589 
5590 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5591 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5592 
5593 	return validate_req_change_req_state(peer_device, p->block_id, sector,
5594 					     &device->read_requests, __func__,
5595 					     NEG_ACKED, false);
5596 }
5597 
5598 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5599 {
5600 	struct drbd_peer_device *peer_device;
5601 	struct drbd_device *device;
5602 	sector_t sector;
5603 	int size;
5604 	struct p_block_ack *p = pi->data;
5605 
5606 	peer_device = conn_peer_device(connection, pi->vnr);
5607 	if (!peer_device)
5608 		return -EIO;
5609 	device = peer_device->device;
5610 
5611 	sector = be64_to_cpu(p->sector);
5612 	size = be32_to_cpu(p->blksize);
5613 
5614 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5615 
5616 	dec_rs_pending(peer_device);
5617 
5618 	if (get_ldev_if_state(device, D_FAILED)) {
5619 		drbd_rs_complete_io(device, sector);
5620 		switch (pi->cmd) {
5621 		case P_NEG_RS_DREPLY:
5622 			drbd_rs_failed_io(peer_device, sector, size);
5623 			break;
5624 		case P_RS_CANCEL:
5625 			break;
5626 		default:
5627 			BUG();
5628 		}
5629 		put_ldev(device);
5630 	}
5631 
5632 	return 0;
5633 }
5634 
5635 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5636 {
5637 	struct p_barrier_ack *p = pi->data;
5638 	struct drbd_peer_device *peer_device;
5639 	int vnr;
5640 
5641 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5642 
5643 	rcu_read_lock();
5644 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5645 		struct drbd_device *device = peer_device->device;
5646 
5647 		if (device->state.conn == C_AHEAD &&
5648 		    atomic_read(&device->ap_in_flight) == 0 &&
5649 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5650 			device->start_resync_timer.expires = jiffies + HZ;
5651 			add_timer(&device->start_resync_timer);
5652 		}
5653 	}
5654 	rcu_read_unlock();
5655 
5656 	return 0;
5657 }
5658 
5659 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5660 {
5661 	struct drbd_peer_device *peer_device;
5662 	struct drbd_device *device;
5663 	struct p_block_ack *p = pi->data;
5664 	struct drbd_device_work *dw;
5665 	sector_t sector;
5666 	int size;
5667 
5668 	peer_device = conn_peer_device(connection, pi->vnr);
5669 	if (!peer_device)
5670 		return -EIO;
5671 	device = peer_device->device;
5672 
5673 	sector = be64_to_cpu(p->sector);
5674 	size = be32_to_cpu(p->blksize);
5675 
5676 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5677 
5678 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5679 		drbd_ov_out_of_sync_found(peer_device, sector, size);
5680 	else
5681 		ov_out_of_sync_print(peer_device);
5682 
5683 	if (!get_ldev(device))
5684 		return 0;
5685 
5686 	drbd_rs_complete_io(device, sector);
5687 	dec_rs_pending(peer_device);
5688 
5689 	--device->ov_left;
5690 
5691 	/* let's advance progress step marks only for every other megabyte */
5692 	if ((device->ov_left & 0x200) == 0x200)
5693 		drbd_advance_rs_marks(peer_device, device->ov_left);
5694 
5695 	if (device->ov_left == 0) {
5696 		dw = kmalloc_obj(*dw, GFP_NOIO);
5697 		if (dw) {
5698 			dw->w.cb = w_ov_finished;
5699 			dw->device = device;
5700 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5701 		} else {
5702 			drbd_err(device, "kmalloc(dw) failed.\n");
5703 			ov_out_of_sync_print(peer_device);
5704 			drbd_resync_finished(peer_device);
5705 		}
5706 	}
5707 	put_ldev(device);
5708 	return 0;
5709 }
5710 
5711 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5712 {
5713 	return 0;
5714 }
5715 
5716 struct meta_sock_cmd {
5717 	size_t pkt_size;
5718 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5719 };
5720 
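/*
 * ping_timeo is configured in tenths of a second while ping_int is in
 * seconds, hence the division by 10 in the ping-timeout case only.
 */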
5721 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5722 {
5723 	long t;
5724 	struct net_conf *nc;
5725 
5726 	rcu_read_lock();
5727 	nc = rcu_dereference(connection->net_conf);
5728 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5729 	rcu_read_unlock();
5730 
5731 	t *= HZ;
5732 	if (ping_timeout)
5733 		t /= 10;
5734 
5735 	connection->meta.socket->sk->sk_rcvtimeo = t;
5736 }
5737 
5738 static void set_ping_timeout(struct drbd_connection *connection)
5739 {
5740 	set_rcvtimeo(connection, 1);
5741 }
5742 
5743 static void set_idle_timeout(struct drbd_connection *connection)
5744 {
5745 	set_rcvtimeo(connection, 0);
5746 }
5747 
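/*
 * Dispatch table for packets arriving on the meta socket, indexed by packet
 * type; pkt_size is the expected payload size following the header.
 */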
5748 static struct meta_sock_cmd ack_receiver_tbl[] = {
5749 	[P_PING]	    = { 0, got_Ping },
5750 	[P_PING_ACK]	    = { 0, got_PingAck },
5751 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5752 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5753 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5754 	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
5755 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5756 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5757 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5758 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5759 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5760 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5761 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5762 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5763 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5764 	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5765 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5766 };
5767 
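/*
 * Ack receiver thread: runs at elevated (SCHED_FIFO) priority, sends
 * keepalive pings when requested, and receives and dispatches ack packets on
 * the meta socket.  A missing ping ack within the ping timeout triggers a
 * reconnect.
 */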
5768 int drbd_ack_receiver(struct drbd_thread *thi)
5769 {
5770 	struct drbd_connection *connection = thi->connection;
5771 	struct meta_sock_cmd *cmd = NULL;
5772 	struct packet_info pi;
5773 	unsigned long pre_recv_jif;
5774 	int rv;
5775 	void *buf    = connection->meta.rbuf;
5776 	int received = 0;
5777 	unsigned int header_size = drbd_header_size(connection);
5778 	int expect   = header_size;
5779 	bool ping_timeout_active = false;
5780 
5781 	sched_set_fifo_low(current);
5782 
5783 	while (get_t_state(thi) == RUNNING) {
5784 		drbd_thread_current_set_cpu(thi);
5785 
5786 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5787 			if (drbd_send_ping(connection)) {
5788 				drbd_err(connection, "drbd_send_ping has failed\n");
5789 				goto reconnect;
5790 			}
5791 			set_ping_timeout(connection);
5792 			ping_timeout_active = true;
5793 		}
5794 
5795 		pre_recv_jif = jiffies;
5796 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5797 
5798 		/* Note:
5799 		 * -EINTR	 (on meta) we got a signal
5800 		 * -EAGAIN	 (on meta) rcvtimeo expired
5801 		 * -ECONNRESET	 other side closed the connection
5802 		 * -ERESTARTSYS  (on data) we got a signal
5803 		 * rv <  0	 other than above: unexpected error!
5804 		 * rv == expected: full header or command
5805 		 * rv <  expected: "woken" by signal during receive
5806 		 * rv == 0	 : "connection shut down by peer"
5807 		 */
5808 		if (likely(rv > 0)) {
5809 			received += rv;
5810 			buf	 += rv;
5811 		} else if (rv == 0) {
5812 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5813 				long t;
5814 				rcu_read_lock();
5815 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5816 				rcu_read_unlock();
5817 
5818 				t = wait_event_timeout(connection->ping_wait,
5819 						       connection->cstate < C_WF_REPORT_PARAMS,
5820 						       t);
5821 				if (t)
5822 					break;
5823 			}
5824 			drbd_err(connection, "meta connection shut down by peer.\n");
5825 			goto reconnect;
5826 		} else if (rv == -EAGAIN) {
5827 			/* If the data socket received something meanwhile,
5828 			 * that is good enough: peer is still alive. */
5829 			if (time_after(connection->last_received, pre_recv_jif))
5830 				continue;
5831 			if (ping_timeout_active) {
5832 				drbd_err(connection, "PingAck did not arrive in time.\n");
5833 				goto reconnect;
5834 			}
5835 			set_bit(SEND_PING, &connection->flags);
5836 			continue;
5837 		} else if (rv == -EINTR) {
5838 			/* maybe drbd_thread_stop(): the while condition will notice.
5839 			 * maybe woken for send_ping: we'll send a ping above,
5840 			 * and change the rcvtimeo */
5841 			flush_signals(current);
5842 			continue;
5843 		} else {
5844 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5845 			goto reconnect;
5846 		}
5847 
5848 		if (received == expect && cmd == NULL) {
5849 			if (decode_header(connection, connection->meta.rbuf, &pi))
5850 				goto reconnect;
5851 			cmd = &ack_receiver_tbl[pi.cmd];
5852 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5853 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5854 					 cmdname(pi.cmd), pi.cmd);
5855 				goto disconnect;
5856 			}
5857 			expect = header_size + cmd->pkt_size;
5858 			if (pi.size != expect - header_size) {
5859 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5860 					pi.cmd, pi.size);
5861 				goto reconnect;
5862 			}
5863 		}
5864 		if (received == expect) {
5865 			int err;
5866 
5867 			err = cmd->fn(connection, &pi);
5868 			if (err) {
5869 				drbd_err(connection, "%ps failed\n", cmd->fn);
5870 				goto reconnect;
5871 			}
5872 
5873 			connection->last_received = jiffies;
5874 
5875 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5876 				set_idle_timeout(connection);
5877 				ping_timeout_active = false;
5878 			}
5879 
5880 			buf	 = connection->meta.rbuf;
5881 			received = 0;
5882 			expect	 = header_size;
5883 			cmd	 = NULL;
5884 		}
5885 	}
5886 
5887 	if (0) {
5888 reconnect:
5889 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5890 		conn_md_sync(connection);
5891 	}
5892 	if (0) {
5893 disconnect:
5894 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5895 	}
5896 
5897 	drbd_info(connection, "ack_receiver terminated\n");
5898 
5899 	return 0;
5900 }
5901 
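/*
 * Work item queued from write completion context: send the acks for all
 * completed peer requests of one device in a batch, wrapped in TCP_CORK if
 * configured.
 */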
5902 void drbd_send_acks_wf(struct work_struct *ws)
5903 {
5904 	struct drbd_peer_device *peer_device =
5905 		container_of(ws, struct drbd_peer_device, send_acks_work);
5906 	struct drbd_connection *connection = peer_device->connection;
5907 	struct drbd_device *device = peer_device->device;
5908 	struct net_conf *nc;
5909 	int tcp_cork, err;
5910 
5911 	rcu_read_lock();
5912 	nc = rcu_dereference(connection->net_conf);
5913 	tcp_cork = nc->tcp_cork;
5914 	rcu_read_unlock();
5915 
5916 	if (tcp_cork)
5917 		tcp_sock_set_cork(connection->meta.socket->sk, true);
5918 
5919 	err = drbd_finish_peer_reqs(device);
5920 	kref_put(&device->kref, drbd_destroy_device);
5921 	/* The matching kref_get() is in drbd_endio_write_sec_final(); it keeps the
5922 	   struct work_struct send_acks_work, embedded in the peer_device object, alive. */
5923 
5924 	if (err) {
5925 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5926 		return;
5927 	}
5928 
5929 	if (tcp_cork)
5930 		tcp_sock_set_cork(connection->meta.socket->sk, false);
5933 }
5934