xref: /linux/drivers/block/drbd/drbd_receiver.c (revision 300a0cfe9f375b2843bcb331bcfa7503475ef5dd)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3    drbd_receiver.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11  */
12 
13 
14 #include <linux/module.h>
15 
16 #include <linux/uaccess.h>
17 #include <net/sock.h>
18 
19 #include <linux/drbd.h>
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/in.h>
23 #include <linux/mm.h>
24 #include <linux/memcontrol.h>
25 #include <linux/mm_inline.h>
26 #include <linux/slab.h>
27 #include <uapi/linux/sched/types.h>
28 #include <linux/sched/signal.h>
29 #include <linux/pkt_sched.h>
30 #include <linux/unistd.h>
31 #include <linux/vmalloc.h>
32 #include <linux/random.h>
33 #include <linux/string.h>
34 #include <linux/scatterlist.h>
35 #include <linux/part_stat.h>
36 #include <linux/mempool.h>
37 #include "drbd_int.h"
38 #include "drbd_protocol.h"
39 #include "drbd_req.h"
40 #include "drbd_vli.h"
41 
42 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES) /* OR of DRBD_FF_* feature flags; NOTE(review): presumably the set offered to the peer during feature negotiation -- confirm at the use site */
43 
44 struct packet_info {	/* decoded packet header, filled in by decode_header() */
45 	enum drbd_packet cmd;	/* command code taken from the header */
46 	unsigned int size;	/* length field taken from the header */
47 	unsigned int vnr;	/* volume number; 0 for header formats that carry no volume field */
48 	void *data;	/* points just past the header in the buffer that was decoded */
49 };
50 
51 enum finish_epoch {	/* return type of drbd_may_finish_epoch() */
52 	FE_STILL_LIVE,	/* NOTE(review): presumably "epoch still in use" -- confirm in drbd_may_finish_epoch() */
53 	FE_DESTROYED,	/* NOTE(review): presumably "epoch object was freed" -- confirm */
54 	FE_RECYCLED,	/* NOTE(review): presumably "epoch object was reset for reuse" -- confirm */
55 };
56 
57 static int drbd_do_features(struct drbd_connection *connection);
58 static int drbd_do_auth(struct drbd_connection *connection);
59 static int drbd_disconnected(struct drbd_peer_device *);
60 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
61 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
62 static int e_end_block(struct drbd_work *, int);
63 
64 
65 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN) /* allocation mask passed to mempool_alloc() in __drbd_alloc_pages() */
66 
67 static struct page *__drbd_alloc_pages(unsigned int number)
68 {
69 	struct page *page = NULL;
70 	struct page *tmp = NULL;
71 	unsigned int i = 0;
72 
73 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
74 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
75 	 * which in turn might block on the other node at this very place.  */
76 	for (i = 0; i < number; i++) {
77 		tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY);
78 		if (!tmp)
79 			goto fail;
80 		set_page_private(tmp, (unsigned long)page);
81 		page = tmp;
82 	}
83 	return page;
84 fail:
85 	page_chain_for_each_safe(page, tmp) {
86 		set_page_private(page, 0);
87 		mempool_free(page, &drbd_buffer_page_pool);
88 	}
89 	return NULL;
90 }
91 
92 /**
93  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
94  * @peer_device:	DRBD device.
95  * @number:		number of pages requested
96  * @retry:		whether to retry, if not enough pages are available right now
97  *
98  * Tries to allocate number pages, first from our own page pool, then from
99  * the kernel.
100  * Possibly retry until DRBD frees sufficient pages somewhere else.
101  *
102  * If this allocation would exceed the max_buffers setting, we throttle
103  * allocation (schedule_timeout) to give the system some room to breathe.
104  *
105  * We do not use max-buffers as hard limit, because it could lead to
106  * congestion and further to a distributed deadlock during online-verify or
107  * (checksum based) resync, if the max-buffers, socket buffer sizes and
108  * resync-rate settings are mis-configured.
109  *
110  * Returns a page chain linked via page->private.
111  */
112 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
113 			      bool retry)
114 {
115 	struct drbd_device *device = peer_device->device;
116 	struct page *page;
117 	struct net_conf *nc;
118 	unsigned int mxb;
119 
120 	rcu_read_lock();
121 	nc = rcu_dereference(peer_device->connection->net_conf);
122 	mxb = nc ? nc->max_buffers : 1000000;
123 	rcu_read_unlock();
124 
125 	if (atomic_read(&device->pp_in_use) >= mxb)
126 		schedule_timeout_interruptible(HZ / 10);
127 	page = __drbd_alloc_pages(number);
128 
129 	if (page)
130 		atomic_add(number, &device->pp_in_use);
131 	return page;
132 }
133 
134 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
135  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
136  * Either links the page chain back to the global pool,
137  * or returns all pages to the system. */
138 static void drbd_free_pages(struct drbd_device *device, struct page *page)
139 {
140 	struct page *tmp;
141 	int i = 0;
142 
143 	if (page == NULL)
144 		return;
145 
146 	page_chain_for_each_safe(page, tmp) {
147 		set_page_private(page, 0);
148 		if (page_count(page) == 1)
149 			mempool_free(page, &drbd_buffer_page_pool);
150 		else
151 			put_page(page);
152 		i++;
153 	}
154 	i = atomic_sub_return(i, &device->pp_in_use);
155 	if (i < 0)
156 		drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
157 }
158 
159 /*
160 You need to hold the req_lock:
161  _drbd_wait_ee_list_empty()
162 
163 You must not have the req_lock:
164  drbd_free_peer_req()
165  drbd_alloc_peer_req()
166  drbd_free_peer_reqs()
167  drbd_ee_fix_bhs()
168  drbd_finish_peer_reqs()
169  drbd_clear_done_ee()
170  drbd_wait_ee_list_empty()
171 */
172 
173 /* normal: payload_size == request size (bi_size)
174  * w_same: payload_size == logical_block_size
175  * trim: payload_size == 0 */
176 struct drbd_peer_request *
177 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
178 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
179 {
180 	struct drbd_device *device = peer_device->device;
181 	struct drbd_peer_request *peer_req;
182 	struct page *page = NULL;
183 	unsigned int nr_pages = PFN_UP(payload_size);
184 
185 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
186 		return NULL;
187 
188 	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
189 	if (!peer_req) {
190 		if (!(gfp_mask & __GFP_NOWARN))
191 			drbd_err(device, "%s: allocation failed\n", __func__);
192 		return NULL;
193 	}
194 
195 	if (nr_pages) {
196 		page = drbd_alloc_pages(peer_device, nr_pages,
197 					gfpflags_allow_blocking(gfp_mask));
198 		if (!page)
199 			goto fail;
200 		if (!mempool_is_saturated(&drbd_buffer_page_pool))
201 			peer_req->flags |= EE_RELEASE_TO_MEMPOOL;
202 	}
203 
204 	memset(peer_req, 0, sizeof(*peer_req));
205 	INIT_LIST_HEAD(&peer_req->w.list);
206 	drbd_clear_interval(&peer_req->i);
207 	peer_req->i.size = request_size;
208 	peer_req->i.sector = sector;
209 	peer_req->submit_jif = jiffies;
210 	peer_req->peer_device = peer_device;
211 	peer_req->pages = page;
212 	/*
213 	 * The block_id is opaque to the receiver.  It is not endianness
214 	 * converted, and sent back to the sender unchanged.
215 	 */
216 	peer_req->block_id = id;
217 
218 	return peer_req;
219 
220  fail:
221 	mempool_free(peer_req, &drbd_ee_mempool);
222 	return NULL;
223 }
224 
225 void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req)
226 {
227 	might_sleep();
228 	if (peer_req->flags & EE_HAS_DIGEST)
229 		kfree(peer_req->digest);
230 	drbd_free_pages(device, peer_req->pages);
231 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
232 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
233 	if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
234 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
235 		drbd_al_complete_io(device, &peer_req->i);
236 	}
237 	mempool_free(peer_req, &drbd_ee_mempool);
238 }
239 
240 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
241 {
242 	LIST_HEAD(work_list);
243 	struct drbd_peer_request *peer_req, *t;
244 	int count = 0;
245 
246 	spin_lock_irq(&device->resource->req_lock);
247 	list_splice_init(list, &work_list);
248 	spin_unlock_irq(&device->resource->req_lock);
249 
250 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
251 		drbd_free_peer_req(device, peer_req);
252 		count++;
253 	}
254 	return count;
255 }
256 
257 /*
258  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
259  */
260 static int drbd_finish_peer_reqs(struct drbd_device *device)
261 {
262 	LIST_HEAD(work_list);
263 	struct drbd_peer_request *peer_req, *t;
264 	int err = 0;
265 
266 	spin_lock_irq(&device->resource->req_lock);
267 	list_splice_init(&device->done_ee, &work_list);
268 	spin_unlock_irq(&device->resource->req_lock);
269 
270 	/* possible callbacks here:
271 	 * e_end_block, and e_end_resync_block, e_send_superseded.
272 	 * all ignore the last argument.
273 	 */
274 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
275 		int err2;
276 
277 		/* list_del not necessary, next/prev members not touched */
278 		err2 = peer_req->w.cb(&peer_req->w, !!err);
279 		if (!err)
280 			err = err2;
281 		drbd_free_peer_req(device, peer_req);
282 	}
283 	wake_up(&device->ee_wait);
284 
285 	return err;
286 }
287 
288 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
289 				     struct list_head *head)
290 {
291 	DEFINE_WAIT(wait);
292 
293 	/* avoids spin_lock/unlock
294 	 * and calling prepare_to_wait in the fast path */
295 	while (!list_empty(head)) {
296 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
297 		spin_unlock_irq(&device->resource->req_lock);
298 		io_schedule();
299 		finish_wait(&device->ee_wait, &wait);
300 		spin_lock_irq(&device->resource->req_lock);
301 	}
302 }
303 
304 static void drbd_wait_ee_list_empty(struct drbd_device *device,
305 				    struct list_head *head)
306 {
307 	spin_lock_irq(&device->resource->req_lock);
308 	_drbd_wait_ee_list_empty(device, head);
309 	spin_unlock_irq(&device->resource->req_lock);
310 }
311 
312 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
313 {
314 	struct kvec iov = {
315 		.iov_base = buf,
316 		.iov_len = size,
317 	};
318 	struct msghdr msg = {
319 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
320 	};
321 	iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, size);
322 	return sock_recvmsg(sock, &msg, msg.msg_flags);
323 }
324 
325 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
326 {
327 	int rv;
328 
329 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
330 
331 	if (rv < 0) {
332 		if (rv == -ECONNRESET)
333 			drbd_info(connection, "sock was reset by peer\n");
334 		else if (rv != -ERESTARTSYS)
335 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
336 	} else if (rv == 0) {
337 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
338 			long t;
339 			rcu_read_lock();
340 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
341 			rcu_read_unlock();
342 
343 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
344 
345 			if (t)
346 				goto out;
347 		}
348 		drbd_info(connection, "sock was shut down by peer\n");
349 	}
350 
351 	if (rv != size)
352 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
353 
354 out:
355 	return rv;
356 }
357 
358 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
359 {
360 	int err;
361 
362 	err = drbd_recv(connection, buf, size);
363 	if (err != size) {
364 		if (err >= 0)
365 			err = -EIO;
366 	} else
367 		err = 0;
368 	return err;
369 }
370 
371 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
372 {
373 	int err;
374 
375 	err = drbd_recv_all(connection, buf, size);
376 	if (err && !signal_pending(current))
377 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
378 	return err;
379 }
380 
381 /* quoting tcp(7):
382  *   On individual connections, the socket buffer size must be set prior to the
383  *   listen(2) or connect(2) calls in order to have it take effect.
384  * This is our wrapper to do so.
385  */
386 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
387 		unsigned int rcv)
388 {
389 	/* open coded SO_SNDBUF, SO_RCVBUF */
390 	if (snd) {
391 		sock->sk->sk_sndbuf = snd;
392 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
393 	}
394 	if (rcv) {
395 		sock->sk->sk_rcvbuf = rcv;
396 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
397 	}
398 }
399 
/*
 * Try to actively open a TCP connection to the peer.
 *
 * A socket is created, bound to the configured local address with port 0,
 * and connected to the configured peer address, using connect_int seconds
 * as send/receive timeout.
 *
 * Returns the connected socket, or NULL if there is no net_conf or any step
 * failed.  Failures before connect() that are not of the "timeout / busy /
 * signal / peer not available" kind are logged and force the connection to
 * C_DISCONNECTING; a failed connect() never does.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	struct sockaddr_in6 local_addr;
	struct sockaddr_in6 remote_addr;
	struct socket *sock;
	struct net_conf *nc;
	const char *step;
	int local_len, remote_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int err;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcvbuf_size = nc->rcvbuf_size;
	sndbuf_size = nc->sndbuf_size;
	rcu_read_unlock();

	local_len = min_t(int, connection->my_addr_len, sizeof(local_addr));
	memcpy(&local_addr, &connection->my_addr, local_len);

	/* source port 0: let the stack pick one */
	if (((struct sockaddr *)&connection->my_addr)->sa_family != AF_INET6)
		((struct sockaddr_in *)&local_addr)->sin_port = 0; /* AF_INET & AF_SCI */
	else
		local_addr.sin6_port = 0;

	remote_len = min_t(int, connection->peer_addr_len, sizeof(remote_addr));
	memcpy(&remote_addr, &connection->peer_addr, remote_len);

	step = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&local_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_sndtimeo = connect_int * HZ;
	sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 *  for the outgoing connections.
	 *  This is needed for multihomed hosts and to be
	 *  able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	step = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &local_addr, local_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	step = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &remote_addr, remote_len, 0);

out:
	if (err >= 0)
		return sock;

	if (sock) {
		sock_release(sock);
		sock = NULL;
	}

	switch (-err) {
		/* timeout, busy, signal pending */
	case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
	case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
	case ECONNREFUSED: case ENETUNREACH:
	case EHOSTDOWN:    case EHOSTUNREACH:
		disconnect_on_error = 0;
		break;
	default:
		drbd_err(connection, "%s failed, err = %d\n", step, err);
	}

	if (disconnect_on_error)
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);

	return NULL;
}
487 
488 struct accept_wait_data {	/* state shared between conn_connect() and the listen socket's state-change hook */
489 	struct drbd_connection *connection;	/* connection being set up; assigned in conn_connect() */
490 	struct socket *s_listen;	/* listening socket; assigned by prepare_listen_socket() */
491 	struct completion door_bell;	/* completed by drbd_incoming_connection() when a socket is TCP_ESTABLISHED */
492 	void (*original_sk_state_change)(struct sock *sk);	/* sk_state_change callback that was installed before DRBD's hook */
493 
494 };
495 
496 static void drbd_incoming_connection(struct sock *sk)
497 {
498 	struct accept_wait_data *ad = sk->sk_user_data;
499 	void (*state_change)(struct sock *sk);
500 
501 	state_change = ad->original_sk_state_change;
502 	if (sk->sk_state == TCP_ESTABLISHED)
503 		complete(&ad->door_bell);
504 	state_change(sk);
505 }
506 
507 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
508 {
509 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
510 	struct sockaddr_in6 my_addr;
511 	struct socket *s_listen;
512 	struct net_conf *nc;
513 	const char *what;
514 
515 	rcu_read_lock();
516 	nc = rcu_dereference(connection->net_conf);
517 	if (!nc) {
518 		rcu_read_unlock();
519 		return -EIO;
520 	}
521 	sndbuf_size = nc->sndbuf_size;
522 	rcvbuf_size = nc->rcvbuf_size;
523 	rcu_read_unlock();
524 
525 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
526 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
527 
528 	what = "sock_create_kern";
529 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
530 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
531 	if (err) {
532 		s_listen = NULL;
533 		goto out;
534 	}
535 
536 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
537 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
538 
539 	what = "bind before listen";
540 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
541 	if (err < 0)
542 		goto out;
543 
544 	ad->s_listen = s_listen;
545 	write_lock_bh(&s_listen->sk->sk_callback_lock);
546 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
547 	s_listen->sk->sk_state_change = drbd_incoming_connection;
548 	s_listen->sk->sk_user_data = ad;
549 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
550 
551 	what = "listen";
552 	err = s_listen->ops->listen(s_listen, 5);
553 	if (err < 0)
554 		goto out;
555 
556 	return 0;
557 out:
558 	if (s_listen)
559 		sock_release(s_listen);
560 	if (err < 0) {
561 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
562 			drbd_err(connection, "%s failed, err = %d\n", what, err);
563 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
564 		}
565 	}
566 
567 	return -EIO;
568 }
569 
570 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)	/* remove DRBD's state-change hook from @sk */
571 {
572 	write_lock_bh(&sk->sk_callback_lock);
573 	sk->sk_state_change = ad->original_sk_state_change;	/* put back the callback saved in @ad */
574 	sk->sk_user_data = NULL;	/* @ad is no longer reachable through this socket */
575 	write_unlock_bh(&sk->sk_callback_lock);
576 }
577 
578 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
579 {
580 	int timeo, connect_int, err = 0;
581 	struct socket *s_estab = NULL;
582 	struct net_conf *nc;
583 
584 	rcu_read_lock();
585 	nc = rcu_dereference(connection->net_conf);
586 	if (!nc) {
587 		rcu_read_unlock();
588 		return NULL;
589 	}
590 	connect_int = nc->connect_int;
591 	rcu_read_unlock();
592 
593 	timeo = connect_int * HZ;
594 	/* 28.5% random jitter */
595 	timeo += get_random_u32_below(2) ? timeo / 7 : -timeo / 7;
596 
597 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
598 	if (err <= 0)
599 		return NULL;
600 
601 	err = kernel_accept(ad->s_listen, &s_estab, 0);
602 	if (err < 0) {
603 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
604 			drbd_err(connection, "accept failed, err = %d\n", err);
605 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
606 		}
607 	}
608 
609 	if (s_estab)
610 		unregister_state_change(s_estab->sk, ad);
611 
612 	return s_estab;
613 }
614 
615 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
616 
617 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
618 			     enum drbd_packet cmd)
619 {
620 	if (!conn_prepare_command(connection, sock))
621 		return -EIO;
622 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
623 }
624 
625 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
626 {
627 	unsigned int header_size = drbd_header_size(connection);
628 	struct packet_info pi;
629 	struct net_conf *nc;
630 	int err;
631 
632 	rcu_read_lock();
633 	nc = rcu_dereference(connection->net_conf);
634 	if (!nc) {
635 		rcu_read_unlock();
636 		return -EIO;
637 	}
638 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
639 	rcu_read_unlock();
640 
641 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
642 	if (err != header_size) {
643 		if (err >= 0)
644 			err = -EIO;
645 		return err;
646 	}
647 	err = decode_header(connection, connection->data.rbuf, &pi);
648 	if (err)
649 		return err;
650 	return pi.cmd;
651 }
652 
653 /**
654  * drbd_socket_okay() - Free the socket if its connection is not okay
655  * @sock:	pointer to the pointer to the socket.
656  */
657 static bool drbd_socket_okay(struct socket **sock)
658 {
659 	int rr;
660 	char tb[4];
661 
662 	if (!*sock)
663 		return false;
664 
665 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
666 
667 	if (rr > 0 || rr == -EAGAIN) {
668 		return true;
669 	} else {
670 		sock_release(*sock);
671 		*sock = NULL;
672 		return false;
673 	}
674 }
675 
676 static bool connection_established(struct drbd_connection *connection,
677 				   struct socket **sock1,
678 				   struct socket **sock2)
679 {
680 	struct net_conf *nc;
681 	int timeout;
682 	bool ok;
683 
684 	if (!*sock1 || !*sock2)
685 		return false;
686 
687 	rcu_read_lock();
688 	nc = rcu_dereference(connection->net_conf);
689 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
690 	rcu_read_unlock();
691 	schedule_timeout_interruptible(timeout);
692 
693 	ok = drbd_socket_okay(sock1);
694 	ok = drbd_socket_okay(sock2) && ok;
695 
696 	return ok;
697 }
698 
699 /* Gets called if a connection is established, or if a new minor gets created
700    in a connection */
701 int drbd_connected(struct drbd_peer_device *peer_device)
702 {
703 	struct drbd_device *device = peer_device->device;
704 	int err;
705 
706 	atomic_set(&device->packet_seq, 0);
707 	device->peer_seq = 0;
708 
709 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
710 		&peer_device->connection->cstate_mutex :
711 		&device->own_state_mutex;
712 
713 	err = drbd_send_sync_param(peer_device);
714 	if (!err)
715 		err = drbd_send_sizes(peer_device, 0, 0);
716 	if (!err)
717 		err = drbd_send_uuids(peer_device);
718 	if (!err)
719 		err = drbd_send_current_state(peer_device);
720 	clear_bit(USE_DEGR_WFC_T, &device->flags);
721 	clear_bit(RESIZE_PENDING, &device->flags);
722 	atomic_set(&device->ap_in_flight, 0);
723 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
724 	return err;
725 }
726 
727 /*
728  * return values:
729  *   1 yes, we have a valid connection
730  *   0 oops, did not work out, please try again
731  *  -1 peer talks different language,
732  *     no point in trying again, please go standalone.
733  *  -2 We do not have a network config...
734  */
735 static int conn_connect(struct drbd_connection *connection)
736 {
737 	struct drbd_socket sock, msock;
738 	struct drbd_peer_device *peer_device;
739 	struct net_conf *nc;
740 	int vnr, timeout, h;
741 	bool discard_my_data, ok;
742 	enum drbd_state_rv rv;
743 	struct accept_wait_data ad = {
744 		.connection = connection,
745 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
746 	};
747 
748 	clear_bit(DISCONNECT_SENT, &connection->flags);
749 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
750 		return -2;
751 
752 	mutex_init(&sock.mutex);
753 	sock.sbuf = connection->data.sbuf;
754 	sock.rbuf = connection->data.rbuf;
755 	sock.socket = NULL;
756 	mutex_init(&msock.mutex);
757 	msock.sbuf = connection->meta.sbuf;
758 	msock.rbuf = connection->meta.rbuf;
759 	msock.socket = NULL;
760 
761 	/* Assume that the peer only understands protocol 80 until we know better.  */
762 	connection->agreed_pro_version = 80;
763 
764 	if (prepare_listen_socket(connection, &ad))
765 		return 0;
766 
767 	do {
768 		struct socket *s;
769 
770 		s = drbd_try_connect(connection);
771 		if (s) {
772 			if (!sock.socket) {
773 				sock.socket = s;
774 				send_first_packet(connection, &sock, P_INITIAL_DATA);
775 			} else if (!msock.socket) {
776 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
777 				msock.socket = s;
778 				send_first_packet(connection, &msock, P_INITIAL_META);
779 			} else {
780 				drbd_err(connection, "Logic error in conn_connect()\n");
781 				goto out_release_sockets;
782 			}
783 		}
784 
785 		if (connection_established(connection, &sock.socket, &msock.socket))
786 			break;
787 
788 retry:
789 		s = drbd_wait_for_connect(connection, &ad);
790 		if (s) {
791 			int fp = receive_first_packet(connection, s);
792 			drbd_socket_okay(&sock.socket);
793 			drbd_socket_okay(&msock.socket);
794 			switch (fp) {
795 			case P_INITIAL_DATA:
796 				if (sock.socket) {
797 					drbd_warn(connection, "initial packet S crossed\n");
798 					sock_release(sock.socket);
799 					sock.socket = s;
800 					goto randomize;
801 				}
802 				sock.socket = s;
803 				break;
804 			case P_INITIAL_META:
805 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
806 				if (msock.socket) {
807 					drbd_warn(connection, "initial packet M crossed\n");
808 					sock_release(msock.socket);
809 					msock.socket = s;
810 					goto randomize;
811 				}
812 				msock.socket = s;
813 				break;
814 			default:
815 				drbd_warn(connection, "Error receiving initial packet\n");
816 				sock_release(s);
817 randomize:
818 				if (get_random_u32_below(2))
819 					goto retry;
820 			}
821 		}
822 
823 		if (connection->cstate <= C_DISCONNECTING)
824 			goto out_release_sockets;
825 		if (signal_pending(current)) {
826 			flush_signals(current);
827 			smp_rmb();
828 			if (get_t_state(&connection->receiver) == EXITING)
829 				goto out_release_sockets;
830 		}
831 
832 		ok = connection_established(connection, &sock.socket, &msock.socket);
833 	} while (!ok);
834 
835 	if (ad.s_listen)
836 		sock_release(ad.s_listen);
837 
838 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
839 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
840 
841 	sock.socket->sk->sk_allocation = GFP_NOIO;
842 	msock.socket->sk->sk_allocation = GFP_NOIO;
843 
844 	sock.socket->sk->sk_use_task_frag = false;
845 	msock.socket->sk->sk_use_task_frag = false;
846 
847 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
848 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
849 
850 	/* NOT YET ...
851 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
852 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
853 	 * first set it to the P_CONNECTION_FEATURES timeout,
854 	 * which we set to 4x the configured ping_timeout. */
855 	rcu_read_lock();
856 	nc = rcu_dereference(connection->net_conf);
857 
858 	sock.socket->sk->sk_sndtimeo =
859 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
860 
861 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
862 	timeout = nc->timeout * HZ / 10;
863 	discard_my_data = nc->discard_my_data;
864 	rcu_read_unlock();
865 
866 	msock.socket->sk->sk_sndtimeo = timeout;
867 
868 	/* we don't want delays.
869 	 * we use TCP_CORK where appropriate, though */
870 	tcp_sock_set_nodelay(sock.socket->sk);
871 	tcp_sock_set_nodelay(msock.socket->sk);
872 
873 	connection->data.socket = sock.socket;
874 	connection->meta.socket = msock.socket;
875 	connection->last_received = jiffies;
876 
877 	h = drbd_do_features(connection);
878 	if (h <= 0)
879 		return h;
880 
881 	if (connection->cram_hmac_tfm) {
882 		/* drbd_request_state(device, NS(conn, WFAuth)); */
883 		switch (drbd_do_auth(connection)) {
884 		case -1:
885 			drbd_err(connection, "Authentication of peer failed\n");
886 			return -1;
887 		case 0:
888 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
889 			return 0;
890 		}
891 	}
892 
893 	connection->data.socket->sk->sk_sndtimeo = timeout;
894 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
895 
896 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
897 		return -1;
898 
899 	/* Prevent a race between resync-handshake and
900 	 * being promoted to Primary.
901 	 *
902 	 * Grab and release the state mutex, so we know that any current
903 	 * drbd_set_role() is finished, and any incoming drbd_set_role
904 	 * will see the STATE_SENT flag, and wait for it to be cleared.
905 	 */
906 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
907 		mutex_lock(peer_device->device->state_mutex);
908 
909 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
910 	spin_lock_irq(&connection->resource->req_lock);
911 	set_bit(STATE_SENT, &connection->flags);
912 	spin_unlock_irq(&connection->resource->req_lock);
913 
914 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
915 		mutex_unlock(peer_device->device->state_mutex);
916 
917 	rcu_read_lock();
918 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
919 		struct drbd_device *device = peer_device->device;
920 		kref_get(&device->kref);
921 		rcu_read_unlock();
922 
923 		if (discard_my_data)
924 			set_bit(DISCARD_MY_DATA, &device->flags);
925 		else
926 			clear_bit(DISCARD_MY_DATA, &device->flags);
927 
928 		drbd_connected(peer_device);
929 		kref_put(&device->kref, drbd_destroy_device);
930 		rcu_read_lock();
931 	}
932 	rcu_read_unlock();
933 
934 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
935 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
936 		clear_bit(STATE_SENT, &connection->flags);
937 		return 0;
938 	}
939 
940 	drbd_thread_start(&connection->ack_receiver);
941 	/* opencoded create_singlethread_workqueue(),
942 	 * to be able to use format string arguments */
943 	connection->ack_sender =
944 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
945 	if (!connection->ack_sender) {
946 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
947 		return 0;
948 	}
949 
950 	mutex_lock(&connection->resource->conf_update);
951 	/* The discard_my_data flag is a single-shot modifier to the next
952 	 * connection attempt, the handshake of which is now well underway.
953 	 * No need for rcu style copying of the whole struct
954 	 * just to clear a single value. */
955 	connection->net_conf->discard_my_data = 0;
956 	mutex_unlock(&connection->resource->conf_update);
957 
958 	return h;
959 
960 out_release_sockets:
961 	if (ad.s_listen)
962 		sock_release(ad.s_listen);
963 	if (sock.socket)
964 		sock_release(sock.socket);
965 	if (msock.socket)
966 		sock_release(msock.socket);
967 	return -1;
968 }
969 
970 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
971 {
972 	unsigned int header_size = drbd_header_size(connection);
973 
974 	if (header_size == sizeof(struct p_header100) &&
975 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
976 		struct p_header100 *h = header;
977 		if (h->pad != 0) {
978 			drbd_err(connection, "Header padding is not zero\n");
979 			return -EINVAL;
980 		}
981 		pi->vnr = be16_to_cpu(h->volume);
982 		pi->cmd = be16_to_cpu(h->command);
983 		pi->size = be32_to_cpu(h->length);
984 	} else if (header_size == sizeof(struct p_header95) &&
985 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
986 		struct p_header95 *h = header;
987 		pi->cmd = be16_to_cpu(h->command);
988 		pi->size = be32_to_cpu(h->length);
989 		pi->vnr = 0;
990 	} else if (header_size == sizeof(struct p_header80) &&
991 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
992 		struct p_header80 *h = header;
993 		pi->cmd = be16_to_cpu(h->command);
994 		pi->size = be16_to_cpu(h->length);
995 		pi->vnr = 0;
996 	} else {
997 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
998 			 be32_to_cpu(*(__be32 *)header),
999 			 connection->agreed_pro_version);
1000 		return -EINVAL;
1001 	}
1002 	pi->data = header + header_size;
1003 	return 0;
1004 }
1005 
1006 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1007 {
1008 	if (current->plug == &connection->receiver_plug) {
1009 		blk_finish_plug(&connection->receiver_plug);
1010 		blk_start_plug(&connection->receiver_plug);
1011 	} /* else: maybe just schedule() ?? */
1012 }
1013 
1014 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1015 {
1016 	void *buffer = connection->data.rbuf;
1017 	int err;
1018 
1019 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1020 	if (err)
1021 		return err;
1022 
1023 	err = decode_header(connection, buffer, pi);
1024 	connection->last_received = jiffies;
1025 
1026 	return err;
1027 }
1028 
1029 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1030 {
1031 	void *buffer = connection->data.rbuf;
1032 	unsigned int size = drbd_header_size(connection);
1033 	int err;
1034 
1035 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1036 	if (err != size) {
1037 		/* If we have nothing in the receive buffer now, to reduce
1038 		 * application latency, try to drain the backend queues as
1039 		 * quickly as possible, and let remote TCP know what we have
1040 		 * received so far. */
1041 		if (err == -EAGAIN) {
1042 			tcp_sock_set_quickack(connection->data.socket->sk, 2);
1043 			drbd_unplug_all_devices(connection);
1044 		}
1045 		if (err > 0) {
1046 			buffer += err;
1047 			size -= err;
1048 		}
1049 		err = drbd_recv_all_warn(connection, buffer, size);
1050 		if (err)
1051 			return err;
1052 	}
1053 
1054 	err = decode_header(connection, connection->data.rbuf, pi);
1055 	connection->last_received = jiffies;
1056 
1057 	return err;
1058 }
/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
/* State shared by all flush bios issued from one drbd_flush() call. */
struct issue_flush_context {
	/* set to 1 by drbd_flush(), incremented for every submitted flush bio,
	 * decremented by each completion and once by drbd_flush() itself */
	atomic_t pending;
	/* 0, or the errno of a failed flush / -ENOMEM if one could not be issued */
	int error;
	/* completed by the bio completion that drops pending to zero */
	struct completion done;
};
/* Per-bio private data (bio->bi_private) of a single flush bio. */
struct one_flush_context {
	struct drbd_device *device;	/* device whose backing device is flushed */
	struct issue_flush_context *ctx;	/* shared context of the drbd_flush() call */
};
1072 
1073 static void one_flush_endio(struct bio *bio)
1074 {
1075 	struct one_flush_context *octx = bio->bi_private;
1076 	struct drbd_device *device = octx->device;
1077 	struct issue_flush_context *ctx = octx->ctx;
1078 
1079 	if (bio->bi_status) {
1080 		ctx->error = blk_status_to_errno(bio->bi_status);
1081 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1082 	}
1083 	kfree(octx);
1084 	bio_put(bio);
1085 
1086 	clear_bit(FLUSH_PENDING, &device->flags);
1087 	put_ldev(device);
1088 	kref_put(&device->kref, drbd_destroy_device);
1089 
1090 	if (atomic_dec_and_test(&ctx->pending))
1091 		complete(&ctx->done);
1092 }
1093 
1094 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1095 {
1096 	struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0,
1097 				    REQ_OP_WRITE | REQ_PREFLUSH, GFP_NOIO);
1098 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1099 
1100 	if (!octx) {
1101 		drbd_warn(device, "Could not allocate a octx, CANNOT ISSUE FLUSH\n");
1102 		/* FIXME: what else can I do now?  disconnecting or detaching
1103 		 * really does not help to improve the state of the world, either.
1104 		 */
1105 		bio_put(bio);
1106 
1107 		ctx->error = -ENOMEM;
1108 		put_ldev(device);
1109 		kref_put(&device->kref, drbd_destroy_device);
1110 		return;
1111 	}
1112 
1113 	octx->device = device;
1114 	octx->ctx = ctx;
1115 	bio->bi_private = octx;
1116 	bio->bi_end_io = one_flush_endio;
1117 
1118 	device->flush_jif = jiffies;
1119 	set_bit(FLUSH_PENDING, &device->flags);
1120 	atomic_inc(&ctx->pending);
1121 	submit_bio(bio);
1122 }
1123 
1124 static void drbd_flush(struct drbd_connection *connection)
1125 {
1126 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1127 		struct drbd_peer_device *peer_device;
1128 		struct issue_flush_context ctx;
1129 		int vnr;
1130 
1131 		atomic_set(&ctx.pending, 1);
1132 		ctx.error = 0;
1133 		init_completion(&ctx.done);
1134 
1135 		rcu_read_lock();
1136 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1137 			struct drbd_device *device = peer_device->device;
1138 
1139 			if (!get_ldev(device))
1140 				continue;
1141 			kref_get(&device->kref);
1142 			rcu_read_unlock();
1143 
1144 			submit_one_flush(device, &ctx);
1145 
1146 			rcu_read_lock();
1147 		}
1148 		rcu_read_unlock();
1149 
1150 		/* Do we want to add a timeout,
1151 		 * if disk-timeout is set? */
1152 		if (!atomic_dec_and_test(&ctx.pending))
1153 			wait_for_completion(&ctx.done);
1154 
1155 		if (ctx.error) {
1156 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1157 			 * don't try again for ANY return value != 0
1158 			 * if (rv == -EOPNOTSUPP) */
1159 			/* Any error is already reported by bio_endio callback. */
1160 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1161 		}
1162 	}
1163 }
1164 
1165 /**
1166  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1167  * @connection:	DRBD connection.
1168  * @epoch:	Epoch object.
1169  * @ev:		Epoch event.
1170  */
1171 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1172 					       struct drbd_epoch *epoch,
1173 					       enum epoch_event ev)
1174 {
1175 	int epoch_size;
1176 	struct drbd_epoch *next_epoch;
1177 	enum finish_epoch rv = FE_STILL_LIVE;
1178 
1179 	spin_lock(&connection->epoch_lock);
1180 	do {
1181 		next_epoch = NULL;
1182 
1183 		epoch_size = atomic_read(&epoch->epoch_size);
1184 
1185 		switch (ev & ~EV_CLEANUP) {
1186 		case EV_PUT:
1187 			atomic_dec(&epoch->active);
1188 			break;
1189 		case EV_GOT_BARRIER_NR:
1190 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1191 			break;
1192 		case EV_BECAME_LAST:
1193 			/* nothing to do*/
1194 			break;
1195 		}
1196 
1197 		if (epoch_size != 0 &&
1198 		    atomic_read(&epoch->active) == 0 &&
1199 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1200 			if (!(ev & EV_CLEANUP)) {
1201 				spin_unlock(&connection->epoch_lock);
1202 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1203 				spin_lock(&connection->epoch_lock);
1204 			}
1205 #if 0
1206 			/* FIXME: dec unacked on connection, once we have
1207 			 * something to count pending connection packets in. */
1208 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1209 				dec_unacked(epoch->connection);
1210 #endif
1211 
1212 			if (connection->current_epoch != epoch) {
1213 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1214 				list_del(&epoch->list);
1215 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1216 				connection->epochs--;
1217 				kfree(epoch);
1218 
1219 				if (rv == FE_STILL_LIVE)
1220 					rv = FE_DESTROYED;
1221 			} else {
1222 				epoch->flags = 0;
1223 				atomic_set(&epoch->epoch_size, 0);
1224 				/* atomic_set(&epoch->active, 0); is already zero */
1225 				if (rv == FE_STILL_LIVE)
1226 					rv = FE_RECYCLED;
1227 			}
1228 		}
1229 
1230 		if (!next_epoch)
1231 			break;
1232 
1233 		epoch = next_epoch;
1234 	} while (1);
1235 
1236 	spin_unlock(&connection->epoch_lock);
1237 
1238 	return rv;
1239 }
1240 
1241 static enum write_ordering_e
1242 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1243 {
1244 	struct disk_conf *dc;
1245 
1246 	dc = rcu_dereference(bdev->disk_conf);
1247 
1248 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1249 		wo = WO_DRAIN_IO;
1250 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1251 		wo = WO_NONE;
1252 
1253 	return wo;
1254 }
1255 
1256 /*
1257  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1258  * @wo:		Write ordering method to try.
1259  */
1260 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1261 			      enum write_ordering_e wo)
1262 {
1263 	struct drbd_device *device;
1264 	enum write_ordering_e pwo;
1265 	int vnr;
1266 	static char *write_ordering_str[] = {
1267 		[WO_NONE] = "none",
1268 		[WO_DRAIN_IO] = "drain",
1269 		[WO_BDEV_FLUSH] = "flush",
1270 	};
1271 
1272 	pwo = resource->write_ordering;
1273 	if (wo != WO_BDEV_FLUSH)
1274 		wo = min(pwo, wo);
1275 	rcu_read_lock();
1276 	idr_for_each_entry(&resource->devices, device, vnr) {
1277 		if (get_ldev(device)) {
1278 			wo = max_allowed_wo(device->ldev, wo);
1279 			if (device->ldev == bdev)
1280 				bdev = NULL;
1281 			put_ldev(device);
1282 		}
1283 	}
1284 
1285 	if (bdev)
1286 		wo = max_allowed_wo(bdev, wo);
1287 
1288 	rcu_read_unlock();
1289 
1290 	resource->write_ordering = wo;
1291 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1292 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1293 }
1294 
1295 /*
1296  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1297  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1298  * will directly go to fallback mode, submitting normal writes, and
1299  * never even try to UNMAP.
1300  *
1301  * And dm-thin does not do this (yet), mostly because in general it has
1302  * to assume that "skip_block_zeroing" is set.  See also:
1303  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1304  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1305  *
1306  * We *may* ignore the discard-zeroes-data setting, if so configured.
1307  *
1308  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1309  * may ignore partial unaligned discards.
1310  *
1311  * LVM/DM thin as of at least
1312  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1313  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1314  *   Driver version:  4.29.0
1315  * still behaves this way.
1316  *
1317  * For unaligned (wrt. alignment and granularity) or too small discards,
1318  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1319  * but discard all the aligned full chunks.
1320  *
1321  * At least for LVM/DM thin, with skip_block_zeroing=false,
1322  * the result is effectively "discard_zeroes_data=1".
1323  */
1324 /* flags: EE_TRIM|EE_ZEROOUT */
1325 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1326 {
1327 	struct block_device *bdev = device->ldev->backing_bdev;
1328 	sector_t tmp, nr;
1329 	unsigned int max_discard_sectors, granularity;
1330 	int alignment;
1331 	int err = 0;
1332 
1333 	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1334 		goto zero_out;
1335 
1336 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1337 	granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
1338 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1339 
1340 	max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22));
1341 	max_discard_sectors -= max_discard_sectors % granularity;
1342 	if (unlikely(!max_discard_sectors))
1343 		goto zero_out;
1344 
1345 	if (nr_sectors < granularity)
1346 		goto zero_out;
1347 
1348 	tmp = start;
1349 	if (sector_div(tmp, granularity) != alignment) {
1350 		if (nr_sectors < 2*granularity)
1351 			goto zero_out;
1352 		/* start + gran - (start + gran - align) % gran */
1353 		tmp = start + granularity - alignment;
1354 		tmp = start + granularity - sector_div(tmp, granularity);
1355 
1356 		nr = tmp - start;
1357 		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1358 		 * layers are below us, some may have smaller granularity */
1359 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1360 		nr_sectors -= nr;
1361 		start = tmp;
1362 	}
1363 	while (nr_sectors >= max_discard_sectors) {
1364 		err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
1365 					    GFP_NOIO);
1366 		nr_sectors -= max_discard_sectors;
1367 		start += max_discard_sectors;
1368 	}
1369 	if (nr_sectors) {
1370 		/* max_discard_sectors is unsigned int (and a multiple of
1371 		 * granularity, we made sure of that above already);
1372 		 * nr is < max_discard_sectors;
1373 		 * I don't need sector_div here, even though nr is sector_t */
1374 		nr = nr_sectors;
1375 		nr -= (unsigned int)nr % granularity;
1376 		if (nr) {
1377 			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
1378 			nr_sectors -= nr;
1379 			start += nr;
1380 		}
1381 	}
1382  zero_out:
1383 	if (nr_sectors) {
1384 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1385 				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1386 	}
1387 	return err != 0;
1388 }
1389 
1390 static bool can_do_reliable_discards(struct drbd_device *device)
1391 {
1392 	struct disk_conf *dc;
1393 	bool can_do;
1394 
1395 	if (!bdev_max_discard_sectors(device->ldev->backing_bdev))
1396 		return false;
1397 
1398 	rcu_read_lock();
1399 	dc = rcu_dereference(device->ldev->disk_conf);
1400 	can_do = dc->discard_zeroes_if_aligned;
1401 	rcu_read_unlock();
1402 	return can_do;
1403 }
1404 
1405 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1406 {
1407 	/* If the backend cannot discard, or does not guarantee
1408 	 * read-back zeroes in discarded ranges, we fall back to
1409 	 * zero-out.  Unless configuration specifically requested
1410 	 * otherwise. */
1411 	if (!can_do_reliable_discards(device))
1412 		peer_req->flags |= EE_ZEROOUT;
1413 
1414 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1415 	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1416 		peer_req->flags |= EE_WAS_ERROR;
1417 	drbd_endio_write_sec_final(peer_req);
1418 }
1419 
1420 static int peer_request_fault_type(struct drbd_peer_request *peer_req)
1421 {
1422 	if (peer_req_op(peer_req) == REQ_OP_READ) {
1423 		return peer_req->flags & EE_APPLICATION ?
1424 			DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD;
1425 	} else {
1426 		return peer_req->flags & EE_APPLICATION ?
1427 			DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR;
1428 	}
1429 }
1430 
1431 /**
1432  * drbd_submit_peer_request()
1433  * @peer_req:	peer request
1434  *
1435  * May spread the pages to multiple bios,
1436  * depending on bio_add_page restrictions.
1437  *
1438  * Returns 0 if all bios have been submitted,
1439  * -ENOMEM if we could not allocate enough bios,
1440  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1441  *  single page to an empty bio (which should never happen and likely indicates
1442  *  that the lower level IO stack is in some way broken). This has been observed
1443  *  on certain Xen deployments.
1444  */
1445 /* TODO allocate from our own bio_set. */
1446 int drbd_submit_peer_request(struct drbd_peer_request *peer_req)
1447 {
1448 	struct drbd_device *device = peer_req->peer_device->device;
1449 	struct bio *bios = NULL;
1450 	struct bio *bio;
1451 	struct page *page = peer_req->pages;
1452 	sector_t sector = peer_req->i.sector;
1453 	unsigned int data_size = peer_req->i.size;
1454 	unsigned int n_bios = 0;
1455 	unsigned int nr_pages = PFN_UP(data_size);
1456 
1457 	/* TRIM/DISCARD: for now, always use the helper function
1458 	 * blkdev_issue_zeroout(..., discard=true).
1459 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1460 	 * Correctness first, performance later.  Next step is to code an
1461 	 * asynchronous variant of the same.
1462 	 */
1463 	if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) {
1464 		/* wait for all pending IO completions, before we start
1465 		 * zeroing things out. */
1466 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1467 		/* add it to the active list now,
1468 		 * so we can find it to present it in debugfs */
1469 		peer_req->submit_jif = jiffies;
1470 		peer_req->flags |= EE_SUBMITTED;
1471 
1472 		/* If this was a resync request from receive_rs_deallocated(),
1473 		 * it is already on the sync_ee list */
1474 		if (list_empty(&peer_req->w.list)) {
1475 			spin_lock_irq(&device->resource->req_lock);
1476 			list_add_tail(&peer_req->w.list, &device->active_ee);
1477 			spin_unlock_irq(&device->resource->req_lock);
1478 		}
1479 
1480 		drbd_issue_peer_discard_or_zero_out(device, peer_req);
1481 		return 0;
1482 	}
1483 
1484 	/* In most cases, we will only need one bio.  But in case the lower
1485 	 * level restrictions happen to be different at this offset on this
1486 	 * side than those of the sending peer, we may need to submit the
1487 	 * request in more than one bio.
1488 	 *
1489 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1490 	 * generated bio, but a bio allocated on behalf of the peer.
1491 	 */
1492 next_bio:
1493 	/* _DISCARD, _WRITE_ZEROES handled above.
1494 	 * REQ_OP_FLUSH (empty flush) not expected,
1495 	 * should have been mapped to a "drbd protocol barrier".
1496 	 * REQ_OP_SECURE_ERASE: I don't see how we could ever support that.
1497 	 */
1498 	if (!(peer_req_op(peer_req) == REQ_OP_WRITE ||
1499 				peer_req_op(peer_req) == REQ_OP_READ)) {
1500 		drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf);
1501 		return -EINVAL;
1502 	}
1503 
1504 	bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO);
1505 	/* > peer_req->i.sector, unless this is the first bio */
1506 	bio->bi_iter.bi_sector = sector;
1507 	bio->bi_private = peer_req;
1508 	bio->bi_end_io = drbd_peer_request_endio;
1509 
1510 	bio->bi_next = bios;
1511 	bios = bio;
1512 	++n_bios;
1513 
1514 	page_chain_for_each(page) {
1515 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1516 		if (!bio_add_page(bio, page, len, 0))
1517 			goto next_bio;
1518 		data_size -= len;
1519 		sector += len >> 9;
1520 		--nr_pages;
1521 	}
1522 	D_ASSERT(device, data_size == 0);
1523 	D_ASSERT(device, page == NULL);
1524 
1525 	atomic_set(&peer_req->pending_bios, n_bios);
1526 	/* for debugfs: update timestamp, mark as submitted */
1527 	peer_req->submit_jif = jiffies;
1528 	peer_req->flags |= EE_SUBMITTED;
1529 	do {
1530 		bio = bios;
1531 		bios = bios->bi_next;
1532 		bio->bi_next = NULL;
1533 
1534 		drbd_submit_bio_noacct(device, peer_request_fault_type(peer_req), bio);
1535 	} while (bios);
1536 	return 0;
1537 }
1538 
1539 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1540 					     struct drbd_peer_request *peer_req)
1541 {
1542 	struct drbd_interval *i = &peer_req->i;
1543 
1544 	drbd_remove_interval(&device->write_requests, i);
1545 	drbd_clear_interval(i);
1546 
1547 	/* Wake up any processes waiting for this peer request to complete.  */
1548 	if (i->waiting)
1549 		wake_up(&device->misc_wait);
1550 }
1551 
1552 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1553 {
1554 	struct drbd_peer_device *peer_device;
1555 	int vnr;
1556 
1557 	rcu_read_lock();
1558 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1559 		struct drbd_device *device = peer_device->device;
1560 
1561 		kref_get(&device->kref);
1562 		rcu_read_unlock();
1563 		drbd_wait_ee_list_empty(device, &device->active_ee);
1564 		kref_put(&device->kref, drbd_destroy_device);
1565 		rcu_read_lock();
1566 	}
1567 	rcu_read_unlock();
1568 }
1569 
1570 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1571 {
1572 	int rv;
1573 	struct p_barrier *p = pi->data;
1574 	struct drbd_epoch *epoch;
1575 
1576 	/* FIXME these are unacked on connection,
1577 	 * not a specific (peer)device.
1578 	 */
1579 	connection->current_epoch->barrier_nr = p->barrier;
1580 	connection->current_epoch->connection = connection;
1581 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1582 
1583 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1584 	 * the activity log, which means it would not be resynced in case the
1585 	 * R_PRIMARY crashes now.
1586 	 * Therefore we must send the barrier_ack after the barrier request was
1587 	 * completed. */
1588 	switch (connection->resource->write_ordering) {
1589 	case WO_NONE:
1590 		if (rv == FE_RECYCLED)
1591 			return 0;
1592 
1593 		/* receiver context, in the writeout path of the other node.
1594 		 * avoid potential distributed deadlock */
1595 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1596 		if (epoch)
1597 			break;
1598 		else
1599 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1600 		fallthrough;
1601 
1602 	case WO_BDEV_FLUSH:
1603 	case WO_DRAIN_IO:
1604 		conn_wait_active_ee_empty(connection);
1605 		drbd_flush(connection);
1606 
1607 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1608 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1609 			if (epoch)
1610 				break;
1611 		}
1612 
1613 		return 0;
1614 	default:
1615 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1616 			 connection->resource->write_ordering);
1617 		return -EIO;
1618 	}
1619 
1620 	epoch->flags = 0;
1621 	atomic_set(&epoch->epoch_size, 0);
1622 	atomic_set(&epoch->active, 0);
1623 
1624 	spin_lock(&connection->epoch_lock);
1625 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1626 		list_add(&epoch->list, &connection->current_epoch->list);
1627 		connection->current_epoch = epoch;
1628 		connection->epochs++;
1629 	} else {
1630 		/* The current_epoch got recycled while we allocated this one... */
1631 		kfree(epoch);
1632 	}
1633 	spin_unlock(&connection->epoch_lock);
1634 
1635 	return 0;
1636 }
1637 
1638 /* quick wrapper in case payload size != request_size (write same) */
1639 static void drbd_csum_ee_size(struct crypto_shash *h,
1640 			      struct drbd_peer_request *r, void *d,
1641 			      unsigned int payload_size)
1642 {
1643 	unsigned int tmp = r->i.size;
1644 	r->i.size = payload_size;
1645 	drbd_csum_ee(h, r, d);
1646 	r->i.size = tmp;
1647 }
1648 
1649 /* used from receive_RSDataReply (recv_resync_read)
1650  * and from receive_Data.
1651  * data_size: actual payload ("data in")
1652  * 	for normal writes that is bi_size.
1653  * 	for discards, that is zero.
1654  * 	for write same, it is logical_block_size.
1655  * both trim and write same have the bi_size ("data len to be affected")
1656  * as extra argument in the packet header.
1657  */
1658 static struct drbd_peer_request *
1659 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1660 	      struct packet_info *pi) __must_hold(local)
1661 {
1662 	struct drbd_device *device = peer_device->device;
1663 	const sector_t capacity = get_capacity(device->vdisk);
1664 	struct drbd_peer_request *peer_req;
1665 	struct page *page;
1666 	int digest_size, err;
1667 	unsigned int data_size = pi->size, ds;
1668 	void *dig_in = peer_device->connection->int_dig_in;
1669 	void *dig_vv = peer_device->connection->int_dig_vv;
1670 	unsigned long *data;
1671 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1672 	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1673 
1674 	digest_size = 0;
1675 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1676 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1677 		/*
1678 		 * FIXME: Receive the incoming digest into the receive buffer
1679 		 *	  here, together with its struct p_data?
1680 		 */
1681 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1682 		if (err)
1683 			return NULL;
1684 		data_size -= digest_size;
1685 	}
1686 
1687 	/* assume request_size == data_size, but special case trim. */
1688 	ds = data_size;
1689 	if (trim) {
1690 		if (!expect(peer_device, data_size == 0))
1691 			return NULL;
1692 		ds = be32_to_cpu(trim->size);
1693 	} else if (zeroes) {
1694 		if (!expect(peer_device, data_size == 0))
1695 			return NULL;
1696 		ds = be32_to_cpu(zeroes->size);
1697 	}
1698 
1699 	if (!expect(peer_device, IS_ALIGNED(ds, 512)))
1700 		return NULL;
1701 	if (trim || zeroes) {
1702 		if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1703 			return NULL;
1704 	} else if (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE))
1705 		return NULL;
1706 
1707 	/* even though we trust out peer,
1708 	 * we sometimes have to double check. */
1709 	if (sector + (ds>>9) > capacity) {
1710 		drbd_err(device, "request from peer beyond end of local disk: "
1711 			"capacity: %llus < sector: %llus + size: %u\n",
1712 			(unsigned long long)capacity,
1713 			(unsigned long long)sector, ds);
1714 		return NULL;
1715 	}
1716 
1717 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1718 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1719 	 * which in turn might block on the other node at this very place.  */
1720 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1721 	if (!peer_req)
1722 		return NULL;
1723 
1724 	peer_req->flags |= EE_WRITE;
1725 	if (trim) {
1726 		peer_req->flags |= EE_TRIM;
1727 		return peer_req;
1728 	}
1729 	if (zeroes) {
1730 		peer_req->flags |= EE_ZEROOUT;
1731 		return peer_req;
1732 	}
1733 
1734 	/* receive payload size bytes into page chain */
1735 	ds = data_size;
1736 	page = peer_req->pages;
1737 	page_chain_for_each(page) {
1738 		unsigned len = min_t(int, ds, PAGE_SIZE);
1739 		data = kmap(page);
1740 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1741 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1742 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1743 			data[0] = data[0] ^ (unsigned long)-1;
1744 		}
1745 		kunmap(page);
1746 		if (err) {
1747 			drbd_free_peer_req(device, peer_req);
1748 			return NULL;
1749 		}
1750 		ds -= len;
1751 	}
1752 
1753 	if (digest_size) {
1754 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1755 		if (memcmp(dig_in, dig_vv, digest_size)) {
1756 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1757 				(unsigned long long)sector, data_size);
1758 			drbd_free_peer_req(device, peer_req);
1759 			return NULL;
1760 		}
1761 	}
1762 	device->recv_cnt += data_size >> 9;
1763 	return peer_req;
1764 }
1765 
1766 /* drbd_drain_block() just takes a data block
1767  * out of the socket input buffer, and discards it.
1768  */
1769 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1770 {
1771 	struct page *page;
1772 	int err = 0;
1773 	void *data;
1774 
1775 	if (!data_size)
1776 		return 0;
1777 
1778 	page = drbd_alloc_pages(peer_device, 1, 1);
1779 
1780 	data = kmap(page);
1781 	while (data_size) {
1782 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1783 
1784 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1785 		if (err)
1786 			break;
1787 		data_size -= len;
1788 	}
1789 	kunmap(page);
1790 	drbd_free_pages(peer_device->device, page);
1791 	return err;
1792 }
1793 
1794 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1795 			   sector_t sector, int data_size)
1796 {
1797 	struct bio_vec bvec;
1798 	struct bvec_iter iter;
1799 	struct bio *bio;
1800 	int digest_size, err, expect;
1801 	void *dig_in = peer_device->connection->int_dig_in;
1802 	void *dig_vv = peer_device->connection->int_dig_vv;
1803 
1804 	digest_size = 0;
1805 	if (peer_device->connection->peer_integrity_tfm) {
1806 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1807 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1808 		if (err)
1809 			return err;
1810 		data_size -= digest_size;
1811 	}
1812 
1813 	/* optimistically update recv_cnt.  if receiving fails below,
1814 	 * we disconnect anyways, and counters will be reset. */
1815 	peer_device->device->recv_cnt += data_size>>9;
1816 
1817 	bio = req->master_bio;
1818 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1819 
1820 	bio_for_each_segment(bvec, bio, iter) {
1821 		void *mapped = bvec_kmap_local(&bvec);
1822 		expect = min_t(int, data_size, bvec.bv_len);
1823 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1824 		kunmap_local(mapped);
1825 		if (err)
1826 			return err;
1827 		data_size -= expect;
1828 	}
1829 
1830 	if (digest_size) {
1831 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1832 		if (memcmp(dig_in, dig_vv, digest_size)) {
1833 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1834 			return -EINVAL;
1835 		}
1836 	}
1837 
1838 	D_ASSERT(peer_device->device, data_size == 0);
1839 	return 0;
1840 }
1841 
1842 /*
1843  * e_end_resync_block() is called in ack_sender context via
1844  * drbd_finish_peer_reqs().
1845  */
1846 static int e_end_resync_block(struct drbd_work *w, int unused)
1847 {
1848 	struct drbd_peer_request *peer_req =
1849 		container_of(w, struct drbd_peer_request, w);
1850 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1851 	struct drbd_device *device = peer_device->device;
1852 	sector_t sector = peer_req->i.sector;
1853 	int err;
1854 
1855 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1856 
1857 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1858 		drbd_set_in_sync(peer_device, sector, peer_req->i.size);
1859 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1860 	} else {
1861 		/* Record failure to sync */
1862 		drbd_rs_failed_io(peer_device, sector, peer_req->i.size);
1863 
1864 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1865 	}
1866 	dec_unacked(device);
1867 
1868 	return err;
1869 }
1870 
1871 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1872 			    struct packet_info *pi) __releases(local)
1873 {
1874 	struct drbd_device *device = peer_device->device;
1875 	struct drbd_peer_request *peer_req;
1876 
1877 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1878 	if (!peer_req)
1879 		goto fail;
1880 
1881 	dec_rs_pending(peer_device);
1882 
1883 	inc_unacked(device);
1884 	/* corresponding dec_unacked() in e_end_resync_block()
1885 	 * respective _drbd_clear_done_ee */
1886 
1887 	peer_req->w.cb = e_end_resync_block;
1888 	peer_req->opf = REQ_OP_WRITE;
1889 	peer_req->submit_jif = jiffies;
1890 
1891 	spin_lock_irq(&device->resource->req_lock);
1892 	list_add_tail(&peer_req->w.list, &device->sync_ee);
1893 	spin_unlock_irq(&device->resource->req_lock);
1894 
1895 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1896 	if (drbd_submit_peer_request(peer_req) == 0)
1897 		return 0;
1898 
1899 	/* don't care for the reason here */
1900 	drbd_err(device, "submit failed, triggering re-connect\n");
1901 	spin_lock_irq(&device->resource->req_lock);
1902 	list_del(&peer_req->w.list);
1903 	spin_unlock_irq(&device->resource->req_lock);
1904 
1905 	drbd_free_peer_req(device, peer_req);
1906 fail:
1907 	put_ldev(device);
1908 	return -EIO;
1909 }
1910 
1911 static struct drbd_request *
1912 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1913 	     sector_t sector, bool missing_ok, const char *func)
1914 {
1915 	struct drbd_request *req;
1916 
1917 	/* Request object according to our peer */
1918 	req = (struct drbd_request *)(unsigned long)id;
1919 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1920 		return req;
1921 	if (!missing_ok) {
1922 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1923 			(unsigned long)id, (unsigned long long)sector);
1924 	}
1925 	return NULL;
1926 }
1927 
1928 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1929 {
1930 	struct drbd_peer_device *peer_device;
1931 	struct drbd_device *device;
1932 	struct drbd_request *req;
1933 	sector_t sector;
1934 	int err;
1935 	struct p_data *p = pi->data;
1936 
1937 	peer_device = conn_peer_device(connection, pi->vnr);
1938 	if (!peer_device)
1939 		return -EIO;
1940 	device = peer_device->device;
1941 
1942 	sector = be64_to_cpu(p->sector);
1943 
1944 	spin_lock_irq(&device->resource->req_lock);
1945 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1946 	spin_unlock_irq(&device->resource->req_lock);
1947 	if (unlikely(!req))
1948 		return -EIO;
1949 
1950 	err = recv_dless_read(peer_device, req, sector, pi->size);
1951 	if (!err)
1952 		req_mod(req, DATA_RECEIVED, peer_device);
1953 	/* else: nothing. handled from drbd_disconnect...
1954 	 * I don't think we may complete this just yet
1955 	 * in case we are "on-disconnect: freeze" */
1956 
1957 	return err;
1958 }
1959 
1960 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1961 {
1962 	struct drbd_peer_device *peer_device;
1963 	struct drbd_device *device;
1964 	sector_t sector;
1965 	int err;
1966 	struct p_data *p = pi->data;
1967 
1968 	peer_device = conn_peer_device(connection, pi->vnr);
1969 	if (!peer_device)
1970 		return -EIO;
1971 	device = peer_device->device;
1972 
1973 	sector = be64_to_cpu(p->sector);
1974 	D_ASSERT(device, p->block_id == ID_SYNCER);
1975 
1976 	if (get_ldev(device)) {
1977 		/* data is submitted to disk within recv_resync_read.
1978 		 * corresponding put_ldev done below on error,
1979 		 * or in drbd_peer_request_endio. */
1980 		err = recv_resync_read(peer_device, sector, pi);
1981 	} else {
1982 		if (drbd_ratelimit())
1983 			drbd_err(device, "Can not write resync data to local disk.\n");
1984 
1985 		err = drbd_drain_block(peer_device, pi->size);
1986 
1987 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1988 	}
1989 
1990 	atomic_add(pi->size >> 9, &device->rs_sect_in);
1991 
1992 	return err;
1993 }
1994 
1995 static void restart_conflicting_writes(struct drbd_device *device,
1996 				       sector_t sector, int size)
1997 {
1998 	struct drbd_interval *i;
1999 	struct drbd_request *req;
2000 
2001 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2002 		if (!i->local)
2003 			continue;
2004 		req = container_of(i, struct drbd_request, i);
2005 		if (req->rq_state & RQ_LOCAL_PENDING ||
2006 		    !(req->rq_state & RQ_POSTPONED))
2007 			continue;
2008 		/* as it is RQ_POSTPONED, this will cause it to
2009 		 * be queued on the retry workqueue. */
2010 		__req_mod(req, CONFLICT_RESOLVED, NULL, NULL);
2011 	}
2012 }
2013 
2014 /*
2015  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2016  */
2017 static int e_end_block(struct drbd_work *w, int cancel)
2018 {
2019 	struct drbd_peer_request *peer_req =
2020 		container_of(w, struct drbd_peer_request, w);
2021 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2022 	struct drbd_device *device = peer_device->device;
2023 	sector_t sector = peer_req->i.sector;
2024 	int err = 0, pcmd;
2025 
2026 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2027 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2028 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2029 				device->state.conn <= C_PAUSED_SYNC_T &&
2030 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2031 				P_RS_WRITE_ACK : P_WRITE_ACK;
2032 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2033 			if (pcmd == P_RS_WRITE_ACK)
2034 				drbd_set_in_sync(peer_device, sector, peer_req->i.size);
2035 		} else {
2036 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2037 			/* we expect it to be marked out of sync anyways...
2038 			 * maybe assert this?  */
2039 		}
2040 		dec_unacked(device);
2041 	}
2042 
2043 	/* we delete from the conflict detection hash _after_ we sent out the
2044 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2045 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2046 		spin_lock_irq(&device->resource->req_lock);
2047 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2048 		drbd_remove_epoch_entry_interval(device, peer_req);
2049 		if (peer_req->flags & EE_RESTART_REQUESTS)
2050 			restart_conflicting_writes(device, sector, peer_req->i.size);
2051 		spin_unlock_irq(&device->resource->req_lock);
2052 	} else
2053 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2054 
2055 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2056 
2057 	return err;
2058 }
2059 
2060 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2061 {
2062 	struct drbd_peer_request *peer_req =
2063 		container_of(w, struct drbd_peer_request, w);
2064 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2065 	int err;
2066 
2067 	err = drbd_send_ack(peer_device, ack, peer_req);
2068 	dec_unacked(peer_device->device);
2069 
2070 	return err;
2071 }
2072 
2073 static int e_send_superseded(struct drbd_work *w, int unused)
2074 {
2075 	return e_send_ack(w, P_SUPERSEDED);
2076 }
2077 
2078 static int e_send_retry_write(struct drbd_work *w, int unused)
2079 {
2080 	struct drbd_peer_request *peer_req =
2081 		container_of(w, struct drbd_peer_request, w);
2082 	struct drbd_connection *connection = peer_req->peer_device->connection;
2083 
2084 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2085 			     P_RETRY_WRITE : P_SUPERSEDED);
2086 }
2087 
/*
 * seq_greater() - is sequence number @a logically newer than @b?
 *
 * Serial number comparison with 32-bit wrap-around: @a counts as greater
 * when it is less than 2^31 steps ahead of @b.
 *
 * The difference is formed in unsigned arithmetic, where wrap-around is well
 * defined, and only then reinterpreted as signed.  Subtracting two values
 * already cast to s32 could overflow the signed type.
 *
 * For 24-bit wrap-around, we would have to shift:
 *  a <<= 8; b <<= 8;
 */
static bool seq_greater(u32 a, u32 b)
{
	return (s32)(a - b) > 0;
}
2097 
2098 static u32 seq_max(u32 a, u32 b)
2099 {
2100 	return seq_greater(a, b) ? a : b;
2101 }
2102 
2103 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2104 {
2105 	struct drbd_device *device = peer_device->device;
2106 	unsigned int newest_peer_seq;
2107 
2108 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2109 		spin_lock(&device->peer_seq_lock);
2110 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2111 		device->peer_seq = newest_peer_seq;
2112 		spin_unlock(&device->peer_seq_lock);
2113 		/* wake up only if we actually changed device->peer_seq */
2114 		if (peer_seq == newest_peer_seq)
2115 			wake_up(&device->seq_wait);
2116 	}
2117 }
2118 
/*
 * overlaps() - do two extents share at least one sector?
 *
 * @s1/@s2 are start sectors, @l1/@l2 are lengths in bytes (shifted down by 9
 * to get sectors).  Returns 1 if the extents intersect, 0 otherwise.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	const sector_t end1 = s1 + (l1>>9);
	const sector_t end2 = s2 + (l2>>9);

	/* they intersect iff each one starts before the other one ends */
	return end1 > s2 && s1 < end2;
}
2123 
2124 /* maybe change sync_ee into interval trees as well? */
2125 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2126 {
2127 	struct drbd_peer_request *rs_req;
2128 	bool rv = false;
2129 
2130 	spin_lock_irq(&device->resource->req_lock);
2131 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2132 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2133 			     rs_req->i.sector, rs_req->i.size)) {
2134 			rv = true;
2135 			break;
2136 		}
2137 	}
2138 	spin_unlock_irq(&device->resource->req_lock);
2139 
2140 	return rv;
2141 }
2142 
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than device->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update device->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal),
 * -ETIMEDOUT if the missing ack packets did not show up within
 * ping_timeo (configured in tenths of a second). */
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{
	struct drbd_device *device = peer_device->device;
	DEFINE_WAIT(wait);
	long timeout;
	int ret = 0, tp;

	/* sequence numbers only matter on the node that resolves conflicts */
	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
		return 0;

	spin_lock(&device->peer_seq_lock);
	for (;;) {
		/* peer_seq is at most one ahead of what we have seen: go ahead */
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
			break;
		}

		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}

		/* net_conf is RCU protected; re-read it on every iteration */
		rcu_read_lock();
		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
		rcu_read_unlock();

		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
		/* queue ourselves on seq_wait before dropping the lock,
		 * so the wake_up() in update_peer_seq() is not missed */
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
		rcu_read_lock();
		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
		rcu_read_unlock();
		timeout = schedule_timeout(timeout);
		spin_lock(&device->peer_seq_lock);
		/* schedule_timeout() returned 0: the full timeout elapsed */
		if (!timeout) {
			ret = -ETIMEDOUT;
			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
	return ret;
}
2211 
2212 static enum req_op wire_flags_to_bio_op(u32 dpf)
2213 {
2214 	if (dpf & DP_ZEROES)
2215 		return REQ_OP_WRITE_ZEROES;
2216 	if (dpf & DP_DISCARD)
2217 		return REQ_OP_DISCARD;
2218 	else
2219 		return REQ_OP_WRITE;
2220 }
2221 
2222 /* see also bio_flags_to_wire() */
2223 static blk_opf_t wire_flags_to_bio(struct drbd_connection *connection, u32 dpf)
2224 {
2225 	return wire_flags_to_bio_op(dpf) |
2226 		(dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2227 		(dpf & DP_FUA ? REQ_FUA : 0) |
2228 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2229 }
2230 
/*
 * fail_postponed_requests() - NEG_ACK all postponed local writes in a range
 *
 * For every local request in write_requests overlapping [@sector, +@size)
 * that is RQ_POSTPONED: clear that flag, apply NEG_ACKED, and complete the
 * master bio if __req_mod() handed one back.
 *
 * Expects resource->req_lock to be held on entry: the lock is dropped around
 * complete_master_bio() and re-taken afterwards, so it is held again on
 * return.  Because the lock was dropped, the overlap walk is restarted from
 * scratch after every request handled.
 */
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_peer_device *peer_device = first_peer_device(device);
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		/* skip intervals that do not belong to local requests */
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		/* clearing the flag also keeps the restarted walk from picking this request again */
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, peer_device, &m);
		spin_unlock_irq(&device->resource->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&device->resource->req_lock);
		goto repeat;
	}
}
2256 
/*
 * handle_write_conflicts() - resolve overlaps between a peer write and
 * requests already in the write_requests tree
 *
 * Inserts @peer_req's interval into device->write_requests and then checks
 * every overlapping, not yet completed interval:
 *  - another peer request: wait for it via drbd_wait_misc() and rescan;
 *  - a local request, and this node has RESOLVE_CONFLICTS set: queue
 *    @peer_req on done_ee with a "superseded" or "retry write" callback,
 *    schedule the ack sender, and return -ENOENT;
 *  - a local request, RESOLVE_CONFLICTS not set: wait for it unless it is
 *    already postponed and no longer locally pending; in that case only
 *    mark @peer_req with EE_RESTART_REQUESTS.
 *
 * The caller in this file (receive_Data) holds resource->req_lock around
 * this call.
 *
 * Returns 0 if @peer_req may be submitted, -ENOENT if it was answered without
 * being submitted, or the error from drbd_wait_misc().  On any non-zero
 * return the interval has been removed from the tree again.
 */
static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		/* our own, just inserted interval */
		if (i == &peer_req->i)
			continue;
		if (i->completed)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			goto repeat;
		}

		/* exactly the same extent: handled silently, without the alert below */
		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			/* put is in drbd_send_acks_wf() */
			kref_get(&device->kref);
			/* work was already queued: drop the reference we just took */
			if (!queue_work(connection->ack_sender,
					&peer_req->peer_device->send_acks_work))
				kref_put(&device->kref, drbd_destroy_device);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	/* on any failure (including -ENOENT) take our interval out of the tree again */
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}
2368 
/* mirrored write */
/*
 * receive_Data() - receive a write (or P_TRIM / P_ZEROES) from the peer and
 * submit it to the local disk
 *
 * Without a local disk reference the payload is drained and P_NEG_ACK is
 * sent.  Otherwise the payload is read into a peer request, the request is
 * attached to the connection's current epoch, acks are arranged according to
 * the DP_SEND_* flags (derived from the wire protocol when the agreed
 * protocol version is below 100), write conflicts are handled if
 * two_primaries is configured, and the request is submitted.
 *
 * Returns 0 on success, including the case where handle_write_conflicts()
 * answered the request itself (-ENOENT).  Returns a negative error otherwise.
 */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct net_conf *nc;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		int err2;

		/* no local disk: still keep sequence and epoch accounting going,
		 * NEG_ACK the write and throw the payload away */
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		/* report the first error encountered */
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_APPLICATION;

	dp_flags = be32_to_cpu(p->dp_flags);
	peer_req->opf = wire_flags_to_bio(connection, dp_flags);
	if (pi->cmd == P_TRIM) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD);
		D_ASSERT(peer_device, peer_req->pages == NULL);
		/* need to play safe: an older DRBD sender
		 * may mean zero-out while sending P_TRIM. */
		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
			peer_req->flags |= EE_ZEROOUT;
	} else if (pi->cmd == P_ZEROES) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES);
		D_ASSERT(peer_device, peer_req->pages == NULL);
		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
		if (dp_flags & DP_DISCARD)
			peer_req->flags |= EE_TRIM;
	} else if (peer_req->pages == NULL) {
		/* a payload-less request is expected to be an empty flush */
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/* attach the request to the current epoch */
	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	tp = nc->two_primaries;
	/* below protocol version 100 the ack flags are not taken from the
	 * packet but derived from the configured wire protocol */
	if (peer_device->connection->agreed_pro_version < 100) {
		switch (nc->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}
	rcu_read_unlock();

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
	}

	if (tp) {
		/* two primaries implies protocol C */
		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			/* -ENOENT: the request was queued on done_ee for a
			 * superseded/retry answer; not an error for us */
			if (err == -ENOENT) {
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* Requests flagged EE_TRIM or EE_ZEROOUT are processed synchronously:
	 * we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0)
		list_add_tail(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* as sync target, do not write while a resync write to an
	 * overlapping range is still pending on sync_ee */
	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
	}

	err = drbd_submit_peer_request(peer_req);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	/* undo the drbd_al_begin_io() from above */
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}

out_interrupted:
	/* common error exit: drop the epoch reference, the ldev reference
	 * and the peer request itself */
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}
2536 
2537 /* We may throttle resync, if the lower device seems to be busy,
2538  * and current sync rate is above c_min_rate.
2539  *
2540  * To decide whether or not the lower device is busy, we use a scheme similar
2541  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2542  * (more than 64 sectors) of activity we cannot account for with our own resync
2543  * activity, it obviously is "busy".
2544  *
2545  * The current sync rate used here uses only the most recent two step marks,
2546  * to have a short time average so we can react faster.
2547  */
2548 bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector,
2549 		bool throttle_if_app_is_waiting)
2550 {
2551 	struct drbd_device *device = peer_device->device;
2552 	struct lc_element *tmp;
2553 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2554 
2555 	if (!throttle || throttle_if_app_is_waiting)
2556 		return throttle;
2557 
2558 	spin_lock_irq(&device->al_lock);
2559 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2560 	if (tmp) {
2561 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2562 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2563 			throttle = false;
2564 		/* Do not slow down if app IO is already waiting for this extent,
2565 		 * and our progress is necessary for application IO to complete. */
2566 	}
2567 	spin_unlock_irq(&device->al_lock);
2568 
2569 	return throttle;
2570 }
2571 
2572 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2573 {
2574 	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2575 	unsigned long db, dt, dbdt;
2576 	unsigned int c_min_rate;
2577 	int curr_events;
2578 
2579 	rcu_read_lock();
2580 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2581 	rcu_read_unlock();
2582 
2583 	/* feature disabled? */
2584 	if (c_min_rate == 0)
2585 		return false;
2586 
2587 	curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
2588 			atomic_read(&device->rs_sect_ev);
2589 
2590 	if (atomic_read(&device->ap_actlog_cnt)
2591 	    || curr_events - device->rs_last_events > 64) {
2592 		unsigned long rs_left;
2593 		int i;
2594 
2595 		device->rs_last_events = curr_events;
2596 
2597 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2598 		 * approx. */
2599 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2600 
2601 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2602 			rs_left = device->ov_left;
2603 		else
2604 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2605 
2606 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2607 		if (!dt)
2608 			dt++;
2609 		db = device->rs_mark_left[i] - rs_left;
2610 		dbdt = Bit2KB(db/dt);
2611 
2612 		if (dbdt > c_min_rate)
2613 			return true;
2614 	}
2615 	return false;
2616 }
2617 
/*
 * receive_DataRequest() - serve a read-type request from the peer
 *
 * Handles P_DATA_REQUEST, P_RS_THIN_REQ, P_RS_DATA_REQUEST,
 * P_CSUM_RS_REQUEST, P_OV_REQUEST and P_OV_REPLY; any other command hits
 * BUG().  A peer request is allocated for the requested range, given the
 * completion callback matching the command, and submitted as a read.
 * For P_CSUM_RS_REQUEST and P_OV_REPLY the digest payload is received first.
 *
 * Returns 0 once the read was submitted, -EINVAL for an invalid size or a
 * range beyond the device capacity, -ENOMEM if the peer request cannot be
 * allocated, -EIO on later failures.  Without an up-to-date local disk a
 * negative reply is sent and the result of drbd_drain_block() is returned.
 */
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	struct p_block_req *p =	pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;
	capacity = get_capacity(device->vdisk);

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	/* sanity check what the peer asked for: positive, 512-byte aligned,
	 * not larger than DRBD_MAX_BIO_SIZE ... */
	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	/* ... and within the capacity of the device */
	if (sector + (size>>9) > capacity) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
		/* verb: whether to log the rate-limited error message below */
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
			break;
		case P_RS_THIN_REQ:
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(peer_device);
			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && drbd_ratelimit())
			drbd_err(device, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(peer_device, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
			size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(device);
		return -ENOMEM;
	}
	peer_req->opf = REQ_OP_READ;

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		/* application IO, don't drbd_rs_begin_io */
		peer_req->flags |= EE_APPLICATION;
		goto submit;

	case P_RS_THIN_REQ:
		/* If at some point in the future we have a smart way to
		   find out if this data block is completely deallocated,
		   then we would do something smarter here than reading
		   the block... */
		peer_req->flags |= EE_RS_THIN_REQ;
		fallthrough;
	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		/* used in the sector offset progress display */
		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		/* the packet payload is the digest; the digest buffer is
		 * placed directly behind struct digest_info */
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
			/* remember to report stats in drbd_resync_finished */
			device->use_csums = true;
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &device->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(peer_device);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		/* first verify request (start sector still unset):
		 * initialize the online verify progress tracking */
		if (device->ov_start_sector == ~(sector_t)0 &&
		    peer_device->connection->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			device->ov_start_sector = sector;
			device->ov_position = sector;
			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
			device->rs_total = device->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				device->rs_mark_left[i] = device->ov_left;
				device->rs_mark_time[i] = now;
			}
			drbd_info(device, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		break;

	default:
		BUG();
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */

	/* Even though this may be a resync request, we do add to "read_ee";
	 * "sync_ee" is only used for resync WRITEs.
	 * Add to list early, so debugfs can find this request
	 * even if we have to sleep below. */
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
	if (device->state.peer != R_PRIMARY
	&& drbd_rs_should_slow_down(peer_device, sector, false))
		schedule_timeout_uninterruptible(HZ/10);
	update_receiver_timing_details(connection, drbd_rs_begin_io);
	if (drbd_rs_begin_io(device, sector))
		goto out_free_e;

submit_for_resync:
	/* account the sectors as resync-caused disk activity */
	atomic_add(size >> 9, &device->rs_sect_ev);

submit:
	update_receiver_timing_details(connection, drbd_submit_peer_request);
	inc_unacked(device);
	if (drbd_submit_peer_request(peer_req) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");

out_free_e:
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return -EIO;
}
2825 
/*
 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 *
 * Applies the configured after_sb_0p policy.
 *
 * Returns -1 (the value ASB_DISCARD_LOCAL yields), 1 (the value
 * ASB_DISCARD_REMOTE yields), or -100 if the policy did not lead to a
 * decision.
 */
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;
	enum drbd_after_sb_p after_sb_0p;

	/* lowest bit of the bitmap UUID, ours and the peer's */
	self = device->ldev->md.uuid[UI_BITMAP] & 1;
	peer = device->p_uuid[UI_BITMAP] & 1;

	/* inputs of the "changes" comparison below, peer's and ours */
	ch_peer = device->p_uuid[UI_SIZE];
	ch_self = device->comm_bm_set;

	rcu_read_lock();
	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
	rcu_read_unlock();
	switch (after_sb_0p) {
	/* policies rejected here as a configuration error */
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
	case ASB_VIOLENTLY:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		fallthrough;	/* to one of the other strategies */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
		fallthrough;
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			/* both sides zero: the RESOLVE_CONFLICTS flag breaks the tie */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		/* only continue below if we got here by falling through */
		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		fallthrough;
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
2908 
2909 /*
2910  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2911  */
2912 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2913 {
2914 	struct drbd_device *device = peer_device->device;
2915 	int hg, rv = -100;
2916 	enum drbd_after_sb_p after_sb_1p;
2917 
2918 	rcu_read_lock();
2919 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2920 	rcu_read_unlock();
2921 	switch (after_sb_1p) {
2922 	case ASB_DISCARD_YOUNGER_PRI:
2923 	case ASB_DISCARD_OLDER_PRI:
2924 	case ASB_DISCARD_LEAST_CHG:
2925 	case ASB_DISCARD_LOCAL:
2926 	case ASB_DISCARD_REMOTE:
2927 	case ASB_DISCARD_ZERO_CHG:
2928 		drbd_err(device, "Configuration error.\n");
2929 		break;
2930 	case ASB_DISCONNECT:
2931 		break;
2932 	case ASB_CONSENSUS:
2933 		hg = drbd_asb_recover_0p(peer_device);
2934 		if (hg == -1 && device->state.role == R_SECONDARY)
2935 			rv = hg;
2936 		if (hg == 1  && device->state.role == R_PRIMARY)
2937 			rv = hg;
2938 		break;
2939 	case ASB_VIOLENTLY:
2940 		rv = drbd_asb_recover_0p(peer_device);
2941 		break;
2942 	case ASB_DISCARD_SECONDARY:
2943 		return device->state.role == R_PRIMARY ? 1 : -1;
2944 	case ASB_CALL_HELPER:
2945 		hg = drbd_asb_recover_0p(peer_device);
2946 		if (hg == -1 && device->state.role == R_PRIMARY) {
2947 			enum drbd_state_rv rv2;
2948 
2949 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2950 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2951 			  * we do not need to wait for the after state change work either. */
2952 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2953 			if (rv2 != SS_SUCCESS) {
2954 				drbd_khelper(device, "pri-lost-after-sb");
2955 			} else {
2956 				drbd_warn(device, "Successfully gave up primary role.\n");
2957 				rv = hg;
2958 			}
2959 		} else
2960 			rv = hg;
2961 	}
2962 
2963 	return rv;
2964 }
2965 
2966 /*
2967  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2968  */
2969 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2970 {
2971 	struct drbd_device *device = peer_device->device;
2972 	int hg, rv = -100;
2973 	enum drbd_after_sb_p after_sb_2p;
2974 
2975 	rcu_read_lock();
2976 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2977 	rcu_read_unlock();
2978 	switch (after_sb_2p) {
2979 	case ASB_DISCARD_YOUNGER_PRI:
2980 	case ASB_DISCARD_OLDER_PRI:
2981 	case ASB_DISCARD_LEAST_CHG:
2982 	case ASB_DISCARD_LOCAL:
2983 	case ASB_DISCARD_REMOTE:
2984 	case ASB_CONSENSUS:
2985 	case ASB_DISCARD_SECONDARY:
2986 	case ASB_DISCARD_ZERO_CHG:
2987 		drbd_err(device, "Configuration error.\n");
2988 		break;
2989 	case ASB_VIOLENTLY:
2990 		rv = drbd_asb_recover_0p(peer_device);
2991 		break;
2992 	case ASB_DISCONNECT:
2993 		break;
2994 	case ASB_CALL_HELPER:
2995 		hg = drbd_asb_recover_0p(peer_device);
2996 		if (hg == -1) {
2997 			enum drbd_state_rv rv2;
2998 
2999 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3000 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3001 			  * we do not need to wait for the after state change work either. */
3002 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3003 			if (rv2 != SS_SUCCESS) {
3004 				drbd_khelper(device, "pri-lost-after-sb");
3005 			} else {
3006 				drbd_warn(device, "Successfully gave up primary role.\n");
3007 				rv = hg;
3008 			}
3009 		} else
3010 			rv = hg;
3011 	}
3012 
3013 	return rv;
3014 }
3015 
3016 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3017 			   u64 bits, u64 flags)
3018 {
3019 	if (!uuid) {
3020 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3021 		return;
3022 	}
3023 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3024 	     text,
3025 	     (unsigned long long)uuid[UI_CURRENT],
3026 	     (unsigned long long)uuid[UI_BITMAP],
3027 	     (unsigned long long)uuid[UI_HISTORY_START],
3028 	     (unsigned long long)uuid[UI_HISTORY_END],
3029 	     (unsigned long long)bits,
3030 	     (unsigned long long)flags);
3031 }
3032 
/*
 * drbd_uuid_compare  -  compare our UUIDs with the peer's UUIDs
 *
 * All UUID comparisons below ignore the lowest bit (masked with ~1).
 * *rule_nr is set to the number of the rule that was evaluated last, i.e. the
 * rule that produced the return value.  Some rules correct
 * device->ldev->md.uuid[] or device->p_uuid[] in place before returning.
 * The caller visible in this file, drbd_sync_handshake(), invokes this with
 * md.uuid_lock held.
 *
 * Return values:
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96
 *
 * In addition, rule 41 may return
 * -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8)): a value below -0x10000
 * that carries a protocol version in bits 0..7 and feature flags in
 * bits 8..15 (decoded that way by drbd_sync_handshake()).
 */

static int drbd_uuid_compare(struct drbd_peer_device *const peer_device,
		enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
{
	struct drbd_connection *const connection = peer_device->connection;
	struct drbd_device *device = peer_device->device;
	u64 self, peer;
	int i, j;

	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	/* Rule 10: both current UUIDs are "just created": nothing to sync. */
	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	/* Rule 20: ours is just created (or zero), the peer's is not. */
	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	/* Rule 30: the mirror case of rule 20. */
	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	/* Current UUIDs are equal. */
	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* Only we have a bitmap UUID set (rules 34 and 36). */
		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			/* Our bitmap UUID and first history UUID match the
			 * peer's first two history UUIDs: rotate our UUIDs. */
			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* Only the peer has a bitmap UUID set (rules 35 and 37). */
		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			/* The peer's bitmap UUID and first history UUID match
			 * our first two history UUIDs: rotate our copy of the
			 * peer's UUIDs. */
			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		/* Neither has the "crashed primary" flag set,
		 * only a replication link hiccup. */
		if (rct == 0)
			return 0;

		/* Current UUID equal and no bitmap uuid; does not necessarily
		 * mean this was a "simultaneous hard crash", maybe IO was
		 * frozen, so no UUID-bump happened.
		 * This is a protocol change, overload DRBD_FF_WSAME as flag
		 * for "new-enough" peer DRBD version. */
		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
			*rule_nr = 41;
			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
				/* Encoded "required protocol version and
				 * feature flags" error, see the legend above. */
				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
			}
			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
				/* At least one has the "crashed primary" bit set,
				 * both are primary now, but neither has rotated its UUIDs?
				 * "Can not happen." */
				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
				return -100;
			}
			/* Exactly one side is primary now: it becomes sync source. */
			if (device->state.role == R_PRIMARY)
				return 1;
			return -1;
		}

		/* Both are secondary.
		 * Really looks like recovery from simultaneous hard crash.
		 * Check which had been primary before, and arbitrate. */
		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			/* Both were primary: RESOLVE_CONFLICTS is the tie-breaker. */
			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
			return dc ? -1 : 1;
		}
	}

	/* Rule 50: our current UUID equals the peer's bitmap UUID. */
	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	/* Rule 51: our current UUID equals the peer's first history UUID. */
	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	/* Rule 60: our current UUID appears in the peer's history. */
	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	/* Rule 70: our bitmap UUID equals the peer's current UUID. */
	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	/* Rule 71: our first history UUID equals the peer's current UUID
	 * (the mirror case of rule 51). */
	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}


	/* Rule 80: the peer's current UUID appears in our history. */
	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	/* Rule 90: both bitmap UUIDs are equal and non-zero. */
	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	/* Rule 100: any of our history UUIDs equals any of the peer's. */
	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	/* No UUID in common at all. */
	return -1000;
}
3261 
3262 /* drbd_sync_handshake() returns the new conn state on success, or
3263    CONN_MASK (-1) on failure.
3264  */
3265 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3266 					   enum drbd_role peer_role,
3267 					   enum drbd_disk_state peer_disk) __must_hold(local)
3268 {
3269 	struct drbd_device *device = peer_device->device;
3270 	enum drbd_conns rv = C_MASK;
3271 	enum drbd_disk_state mydisk;
3272 	struct net_conf *nc;
3273 	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3274 
3275 	mydisk = device->state.disk;
3276 	if (mydisk == D_NEGOTIATING)
3277 		mydisk = device->new_state_tmp.disk;
3278 
3279 	drbd_info(device, "drbd_sync_handshake:\n");
3280 
3281 	spin_lock_irq(&device->ldev->md.uuid_lock);
3282 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3283 	drbd_uuid_dump(device, "peer", device->p_uuid,
3284 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3285 
3286 	hg = drbd_uuid_compare(peer_device, peer_role, &rule_nr);
3287 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3288 
3289 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3290 
3291 	if (hg == -1000) {
3292 		drbd_alert(device, "Unrelated data, aborting!\n");
3293 		return C_MASK;
3294 	}
3295 	if (hg < -0x10000) {
3296 		int proto, fflags;
3297 		hg = -hg;
3298 		proto = hg & 0xff;
3299 		fflags = (hg >> 8) & 0xff;
3300 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3301 					proto, fflags);
3302 		return C_MASK;
3303 	}
3304 	if (hg < -1000) {
3305 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3306 		return C_MASK;
3307 	}
3308 
3309 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3310 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3311 		int f = (hg == -100) || abs(hg) == 2;
3312 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3313 		if (f)
3314 			hg = hg*2;
3315 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3316 		     hg > 0 ? "source" : "target");
3317 	}
3318 
3319 	if (abs(hg) == 100)
3320 		drbd_khelper(device, "initial-split-brain");
3321 
3322 	rcu_read_lock();
3323 	nc = rcu_dereference(peer_device->connection->net_conf);
3324 	always_asbp = nc->always_asbp;
3325 	rr_conflict = nc->rr_conflict;
3326 	tentative = nc->tentative;
3327 	rcu_read_unlock();
3328 
3329 	if (hg == 100 || (hg == -100 && always_asbp)) {
3330 		int pcount = (device->state.role == R_PRIMARY)
3331 			   + (peer_role == R_PRIMARY);
3332 		int forced = (hg == -100);
3333 
3334 		switch (pcount) {
3335 		case 0:
3336 			hg = drbd_asb_recover_0p(peer_device);
3337 			break;
3338 		case 1:
3339 			hg = drbd_asb_recover_1p(peer_device);
3340 			break;
3341 		case 2:
3342 			hg = drbd_asb_recover_2p(peer_device);
3343 			break;
3344 		}
3345 		if (abs(hg) < 100) {
3346 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3347 			     "automatically solved. Sync from %s node\n",
3348 			     pcount, (hg < 0) ? "peer" : "this");
3349 			if (forced) {
3350 				drbd_warn(device, "Doing a full sync, since"
3351 				     " UUIDs where ambiguous.\n");
3352 				hg = hg*2;
3353 			}
3354 		}
3355 	}
3356 
3357 	if (hg == -100) {
3358 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3359 			hg = -1;
3360 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3361 			hg = 1;
3362 
3363 		if (abs(hg) < 100)
3364 			drbd_warn(device, "Split-Brain detected, manually solved. "
3365 			     "Sync from %s node\n",
3366 			     (hg < 0) ? "peer" : "this");
3367 	}
3368 
3369 	if (hg == -100) {
3370 		/* FIXME this log message is not correct if we end up here
3371 		 * after an attempted attach on a diskless node.
3372 		 * We just refuse to attach -- well, we drop the "connection"
3373 		 * to that disk, in a way... */
3374 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3375 		drbd_khelper(device, "split-brain");
3376 		return C_MASK;
3377 	}
3378 
3379 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3380 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3381 		return C_MASK;
3382 	}
3383 
3384 	if (hg < 0 && /* by intention we do not use mydisk here. */
3385 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3386 		switch (rr_conflict) {
3387 		case ASB_CALL_HELPER:
3388 			drbd_khelper(device, "pri-lost");
3389 			fallthrough;
3390 		case ASB_DISCONNECT:
3391 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3392 			return C_MASK;
3393 		case ASB_VIOLENTLY:
3394 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3395 			     "assumption\n");
3396 		}
3397 	}
3398 
3399 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3400 		if (hg == 0)
3401 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3402 		else
3403 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3404 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3405 				 abs(hg) >= 2 ? "full" : "bit-map based");
3406 		return C_MASK;
3407 	}
3408 
3409 	if (abs(hg) >= 2) {
3410 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3411 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3412 					BM_LOCKED_SET_ALLOWED, NULL))
3413 			return C_MASK;
3414 	}
3415 
3416 	if (hg > 0) { /* become sync source. */
3417 		rv = C_WF_BITMAP_S;
3418 	} else if (hg < 0) { /* become sync target */
3419 		rv = C_WF_BITMAP_T;
3420 	} else {
3421 		rv = C_CONNECTED;
3422 		if (drbd_bm_total_weight(device)) {
3423 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3424 			     drbd_bm_total_weight(device));
3425 		}
3426 	}
3427 
3428 	return rv;
3429 }
3430 
3431 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3432 {
3433 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3434 	if (peer == ASB_DISCARD_REMOTE)
3435 		return ASB_DISCARD_LOCAL;
3436 
3437 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3438 	if (peer == ASB_DISCARD_LOCAL)
3439 		return ASB_DISCARD_REMOTE;
3440 
3441 	/* everything else is valid if they are equal on both sides. */
3442 	return peer;
3443 }
3444 
3445 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3446 {
3447 	struct p_protocol *p = pi->data;
3448 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3449 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3450 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3451 	char integrity_alg[SHARED_SECRET_MAX] = "";
3452 	struct crypto_shash *peer_integrity_tfm = NULL;
3453 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3454 
3455 	p_proto		= be32_to_cpu(p->protocol);
3456 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3457 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3458 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3459 	p_two_primaries = be32_to_cpu(p->two_primaries);
3460 	cf		= be32_to_cpu(p->conn_flags);
3461 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3462 
3463 	if (connection->agreed_pro_version >= 87) {
3464 		int err;
3465 
3466 		if (pi->size > sizeof(integrity_alg))
3467 			return -EIO;
3468 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3469 		if (err)
3470 			return err;
3471 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3472 	}
3473 
3474 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3475 		clear_bit(CONN_DRY_RUN, &connection->flags);
3476 
3477 		if (cf & CF_DRY_RUN)
3478 			set_bit(CONN_DRY_RUN, &connection->flags);
3479 
3480 		rcu_read_lock();
3481 		nc = rcu_dereference(connection->net_conf);
3482 
3483 		if (p_proto != nc->wire_protocol) {
3484 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3485 			goto disconnect_rcu_unlock;
3486 		}
3487 
3488 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3489 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3490 			goto disconnect_rcu_unlock;
3491 		}
3492 
3493 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3494 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3495 			goto disconnect_rcu_unlock;
3496 		}
3497 
3498 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3499 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3500 			goto disconnect_rcu_unlock;
3501 		}
3502 
3503 		if (p_discard_my_data && nc->discard_my_data) {
3504 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3505 			goto disconnect_rcu_unlock;
3506 		}
3507 
3508 		if (p_two_primaries != nc->two_primaries) {
3509 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3510 			goto disconnect_rcu_unlock;
3511 		}
3512 
3513 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3514 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3515 			goto disconnect_rcu_unlock;
3516 		}
3517 
3518 		rcu_read_unlock();
3519 	}
3520 
3521 	if (integrity_alg[0]) {
3522 		int hash_size;
3523 
3524 		/*
3525 		 * We can only change the peer data integrity algorithm
3526 		 * here.  Changing our own data integrity algorithm
3527 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3528 		 * the same time; otherwise, the peer has no way to
3529 		 * tell between which packets the algorithm should
3530 		 * change.
3531 		 */
3532 
3533 		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3534 		if (IS_ERR(peer_integrity_tfm)) {
3535 			peer_integrity_tfm = NULL;
3536 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3537 				 integrity_alg);
3538 			goto disconnect;
3539 		}
3540 
3541 		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3542 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3543 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3544 		if (!(int_dig_in && int_dig_vv)) {
3545 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3546 			goto disconnect;
3547 		}
3548 	}
3549 
3550 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3551 	if (!new_net_conf)
3552 		goto disconnect;
3553 
3554 	mutex_lock(&connection->data.mutex);
3555 	mutex_lock(&connection->resource->conf_update);
3556 	old_net_conf = connection->net_conf;
3557 	*new_net_conf = *old_net_conf;
3558 
3559 	new_net_conf->wire_protocol = p_proto;
3560 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3561 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3562 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3563 	new_net_conf->two_primaries = p_two_primaries;
3564 
3565 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3566 	mutex_unlock(&connection->resource->conf_update);
3567 	mutex_unlock(&connection->data.mutex);
3568 
3569 	crypto_free_shash(connection->peer_integrity_tfm);
3570 	kfree(connection->int_dig_in);
3571 	kfree(connection->int_dig_vv);
3572 	connection->peer_integrity_tfm = peer_integrity_tfm;
3573 	connection->int_dig_in = int_dig_in;
3574 	connection->int_dig_vv = int_dig_vv;
3575 
3576 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3577 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3578 			  integrity_alg[0] ? integrity_alg : "(none)");
3579 
3580 	kvfree_rcu_mightsleep(old_net_conf);
3581 	return 0;
3582 
3583 disconnect_rcu_unlock:
3584 	rcu_read_unlock();
3585 disconnect:
3586 	crypto_free_shash(peer_integrity_tfm);
3587 	kfree(int_dig_in);
3588 	kfree(int_dig_vv);
3589 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3590 	return -EIO;
3591 }
3592 
3593 /* helper function
3594  * input: alg name, feature name
3595  * return: NULL (alg name was "")
3596  *         ERR_PTR(error) if something goes wrong
3597  *         or the crypto hash ptr, if it worked out ok. */
3598 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3599 		const struct drbd_device *device,
3600 		const char *alg, const char *name)
3601 {
3602 	struct crypto_shash *tfm;
3603 
3604 	if (!alg[0])
3605 		return NULL;
3606 
3607 	tfm = crypto_alloc_shash(alg, 0, 0);
3608 	if (IS_ERR(tfm)) {
3609 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3610 			alg, name, PTR_ERR(tfm));
3611 		return tfm;
3612 	}
3613 	return tfm;
3614 }
3615 
3616 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3617 {
3618 	void *buffer = connection->data.rbuf;
3619 	int size = pi->size;
3620 
3621 	while (size) {
3622 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3623 		s = drbd_recv(connection, buffer, s);
3624 		if (s <= 0) {
3625 			if (s < 0)
3626 				return s;
3627 			break;
3628 		}
3629 		size -= s;
3630 	}
3631 	if (size)
3632 		return -EIO;
3633 	return 0;
3634 }
3635 
3636 /*
3637  * config_unknown_volume  -  device configuration command for unknown volume
3638  *
3639  * When a device is added to an existing connection, the node on which the
3640  * device is added first will send configuration commands to its peer but the
3641  * peer will not know about the device yet.  It will warn and ignore these
3642  * commands.  Once the device is added on the second node, the second node will
3643  * send the same device configuration commands, but in the other direction.
3644  *
3645  * (We can also end up here if drbd is misconfigured.)
3646  */
3647 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3648 {
3649 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3650 		  cmdname(pi->cmd), pi->vnr);
3651 	return ignore_remaining_packet(connection, pi);
3652 }
3653 
3654 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3655 {
3656 	struct drbd_peer_device *peer_device;
3657 	struct drbd_device *device;
3658 	struct p_rs_param_95 *p;
3659 	unsigned int header_size, data_size, exp_max_sz;
3660 	struct crypto_shash *verify_tfm = NULL;
3661 	struct crypto_shash *csums_tfm = NULL;
3662 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3663 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3664 	const int apv = connection->agreed_pro_version;
3665 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3666 	unsigned int fifo_size = 0;
3667 	int err;
3668 
3669 	peer_device = conn_peer_device(connection, pi->vnr);
3670 	if (!peer_device)
3671 		return config_unknown_volume(connection, pi);
3672 	device = peer_device->device;
3673 
3674 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3675 		    : apv == 88 ? sizeof(struct p_rs_param)
3676 					+ SHARED_SECRET_MAX
3677 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3678 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3679 
3680 	if (pi->size > exp_max_sz) {
3681 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3682 		    pi->size, exp_max_sz);
3683 		return -EIO;
3684 	}
3685 
3686 	if (apv <= 88) {
3687 		header_size = sizeof(struct p_rs_param);
3688 		data_size = pi->size - header_size;
3689 	} else if (apv <= 94) {
3690 		header_size = sizeof(struct p_rs_param_89);
3691 		data_size = pi->size - header_size;
3692 		D_ASSERT(device, data_size == 0);
3693 	} else {
3694 		header_size = sizeof(struct p_rs_param_95);
3695 		data_size = pi->size - header_size;
3696 		D_ASSERT(device, data_size == 0);
3697 	}
3698 
3699 	/* initialize verify_alg and csums_alg */
3700 	p = pi->data;
3701 	BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
3702 	memset(&p->algs, 0, sizeof(p->algs));
3703 
3704 	err = drbd_recv_all(peer_device->connection, p, header_size);
3705 	if (err)
3706 		return err;
3707 
3708 	mutex_lock(&connection->resource->conf_update);
3709 	old_net_conf = peer_device->connection->net_conf;
3710 	if (get_ldev(device)) {
3711 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3712 		if (!new_disk_conf) {
3713 			put_ldev(device);
3714 			mutex_unlock(&connection->resource->conf_update);
3715 			drbd_err(device, "Allocation of new disk_conf failed\n");
3716 			return -ENOMEM;
3717 		}
3718 
3719 		old_disk_conf = device->ldev->disk_conf;
3720 		*new_disk_conf = *old_disk_conf;
3721 
3722 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3723 	}
3724 
3725 	if (apv >= 88) {
3726 		if (apv == 88) {
3727 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3728 				drbd_err(device, "verify-alg of wrong size, "
3729 					"peer wants %u, accepting only up to %u byte\n",
3730 					data_size, SHARED_SECRET_MAX);
3731 				goto reconnect;
3732 			}
3733 
3734 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3735 			if (err)
3736 				goto reconnect;
3737 			/* we expect NUL terminated string */
3738 			/* but just in case someone tries to be evil */
3739 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3740 			p->verify_alg[data_size-1] = 0;
3741 
3742 		} else /* apv >= 89 */ {
3743 			/* we still expect NUL terminated strings */
3744 			/* but just in case someone tries to be evil */
3745 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3746 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3747 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3748 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3749 		}
3750 
3751 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3752 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3753 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3754 				    old_net_conf->verify_alg, p->verify_alg);
3755 				goto disconnect;
3756 			}
3757 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3758 					p->verify_alg, "verify-alg");
3759 			if (IS_ERR(verify_tfm)) {
3760 				verify_tfm = NULL;
3761 				goto disconnect;
3762 			}
3763 		}
3764 
3765 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3766 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3767 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3768 				    old_net_conf->csums_alg, p->csums_alg);
3769 				goto disconnect;
3770 			}
3771 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3772 					p->csums_alg, "csums-alg");
3773 			if (IS_ERR(csums_tfm)) {
3774 				csums_tfm = NULL;
3775 				goto disconnect;
3776 			}
3777 		}
3778 
3779 		if (apv > 94 && new_disk_conf) {
3780 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3781 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3782 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3783 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3784 
3785 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3786 			if (fifo_size != device->rs_plan_s->size) {
3787 				new_plan = fifo_alloc(fifo_size);
3788 				if (!new_plan) {
3789 					drbd_err(device, "kmalloc of fifo_buffer failed");
3790 					put_ldev(device);
3791 					goto disconnect;
3792 				}
3793 			}
3794 		}
3795 
3796 		if (verify_tfm || csums_tfm) {
3797 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3798 			if (!new_net_conf)
3799 				goto disconnect;
3800 
3801 			*new_net_conf = *old_net_conf;
3802 
3803 			if (verify_tfm) {
3804 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3805 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3806 				crypto_free_shash(peer_device->connection->verify_tfm);
3807 				peer_device->connection->verify_tfm = verify_tfm;
3808 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3809 			}
3810 			if (csums_tfm) {
3811 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3812 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3813 				crypto_free_shash(peer_device->connection->csums_tfm);
3814 				peer_device->connection->csums_tfm = csums_tfm;
3815 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3816 			}
3817 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3818 		}
3819 	}
3820 
3821 	if (new_disk_conf) {
3822 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3823 		put_ldev(device);
3824 	}
3825 
3826 	if (new_plan) {
3827 		old_plan = device->rs_plan_s;
3828 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3829 	}
3830 
3831 	mutex_unlock(&connection->resource->conf_update);
3832 	synchronize_rcu();
3833 	if (new_net_conf)
3834 		kfree(old_net_conf);
3835 	kfree(old_disk_conf);
3836 	kfree(old_plan);
3837 
3838 	return 0;
3839 
3840 reconnect:
3841 	if (new_disk_conf) {
3842 		put_ldev(device);
3843 		kfree(new_disk_conf);
3844 	}
3845 	mutex_unlock(&connection->resource->conf_update);
3846 	return -EIO;
3847 
3848 disconnect:
3849 	kfree(new_plan);
3850 	if (new_disk_conf) {
3851 		put_ldev(device);
3852 		kfree(new_disk_conf);
3853 	}
3854 	mutex_unlock(&connection->resource->conf_update);
3855 	/* just for completeness: actually not needed,
3856 	 * as this is not reached if csums_tfm was ok. */
3857 	crypto_free_shash(csums_tfm);
3858 	/* but free the verify_tfm again, if csums_tfm did not work out */
3859 	crypto_free_shash(verify_tfm);
3860 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3861 	return -EIO;
3862 }
3863 
3864 /* warn if the arguments differ by more than 12.5% */
3865 static void warn_if_differ_considerably(struct drbd_device *device,
3866 	const char *s, sector_t a, sector_t b)
3867 {
3868 	sector_t d;
3869 	if (a == 0 || b == 0)
3870 		return;
3871 	d = (a > b) ? (a - b) : (b - a);
3872 	if (d > (a>>3) || d > (b>>3))
3873 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3874 		     (unsigned long long)a, (unsigned long long)b);
3875 }
3876 
3877 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3878 {
3879 	struct drbd_peer_device *peer_device;
3880 	struct drbd_device *device;
3881 	struct p_sizes *p = pi->data;
3882 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3883 	enum determine_dev_size dd = DS_UNCHANGED;
3884 	sector_t p_size, p_usize, p_csize, my_usize;
3885 	sector_t new_size, cur_size;
3886 	int ldsc = 0; /* local disk size changed */
3887 	enum dds_flags ddsf;
3888 
3889 	peer_device = conn_peer_device(connection, pi->vnr);
3890 	if (!peer_device)
3891 		return config_unknown_volume(connection, pi);
3892 	device = peer_device->device;
3893 	cur_size = get_capacity(device->vdisk);
3894 
3895 	p_size = be64_to_cpu(p->d_size);
3896 	p_usize = be64_to_cpu(p->u_size);
3897 	p_csize = be64_to_cpu(p->c_size);
3898 
3899 	/* just store the peer's disk size for now.
3900 	 * we still need to figure out whether we accept that. */
3901 	device->p_size = p_size;
3902 
3903 	if (get_ldev(device)) {
3904 		rcu_read_lock();
3905 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3906 		rcu_read_unlock();
3907 
3908 		warn_if_differ_considerably(device, "lower level device sizes",
3909 			   p_size, drbd_get_max_capacity(device->ldev));
3910 		warn_if_differ_considerably(device, "user requested size",
3911 					    p_usize, my_usize);
3912 
3913 		/* if this is the first connect, or an otherwise expected
3914 		 * param exchange, choose the minimum */
3915 		if (device->state.conn == C_WF_REPORT_PARAMS)
3916 			p_usize = min_not_zero(my_usize, p_usize);
3917 
3918 		/* Never shrink a device with usable data during connect,
3919 		 * or "attach" on the peer.
3920 		 * But allow online shrinking if we are connected. */
3921 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3922 		if (new_size < cur_size &&
3923 		    device->state.disk >= D_OUTDATED &&
3924 		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
3925 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3926 					(unsigned long long)new_size, (unsigned long long)cur_size);
3927 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3928 			put_ldev(device);
3929 			return -EIO;
3930 		}
3931 
3932 		if (my_usize != p_usize) {
3933 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3934 
3935 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3936 			if (!new_disk_conf) {
3937 				put_ldev(device);
3938 				return -ENOMEM;
3939 			}
3940 
3941 			mutex_lock(&connection->resource->conf_update);
3942 			old_disk_conf = device->ldev->disk_conf;
3943 			*new_disk_conf = *old_disk_conf;
3944 			new_disk_conf->disk_size = p_usize;
3945 
3946 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3947 			mutex_unlock(&connection->resource->conf_update);
3948 			kvfree_rcu_mightsleep(old_disk_conf);
3949 
3950 			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
3951 				 (unsigned long)p_usize, (unsigned long)my_usize);
3952 		}
3953 
3954 		put_ldev(device);
3955 	}
3956 
3957 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3958 	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
3959 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3960 	   drbd_reconsider_queue_parameters(), we can be sure that after
3961 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3962 
3963 	ddsf = be16_to_cpu(p->dds_flags);
3964 	if (get_ldev(device)) {
3965 		drbd_reconsider_queue_parameters(device, device->ldev, o);
3966 		dd = drbd_determine_dev_size(device, ddsf, NULL);
3967 		put_ldev(device);
3968 		if (dd == DS_ERROR)
3969 			return -EIO;
3970 		drbd_md_sync(device);
3971 	} else {
3972 		/*
3973 		 * I am diskless, need to accept the peer's *current* size.
3974 		 * I must NOT accept the peers backing disk size,
3975 		 * it may have been larger than mine all along...
3976 		 *
3977 		 * At this point, the peer knows more about my disk, or at
3978 		 * least about what we last agreed upon, than myself.
3979 		 * So if his c_size is less than his d_size, the most likely
3980 		 * reason is that *my* d_size was smaller last time we checked.
3981 		 *
3982 		 * However, if he sends a zero current size,
3983 		 * take his (user-capped or) backing disk size anyways.
3984 		 *
3985 		 * Unless of course he does not have a disk himself.
3986 		 * In which case we ignore this completely.
3987 		 */
3988 		sector_t new_size = p_csize ?: p_usize ?: p_size;
3989 		drbd_reconsider_queue_parameters(device, NULL, o);
3990 		if (new_size == 0) {
3991 			/* Ignore, peer does not know nothing. */
3992 		} else if (new_size == cur_size) {
3993 			/* nothing to do */
3994 		} else if (cur_size != 0 && p_size == 0) {
3995 			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
3996 					(unsigned long long)new_size, (unsigned long long)cur_size);
3997 		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
3998 			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
3999 					(unsigned long long)new_size, (unsigned long long)cur_size);
4000 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4001 			return -EIO;
4002 		} else {
4003 			/* I believe the peer, if
4004 			 *  - I don't have a current size myself
4005 			 *  - we agree on the size anyways
4006 			 *  - I do have a current size, am Secondary,
4007 			 *    and he has the only disk
4008 			 *  - I do have a current size, am Primary,
4009 			 *    and he has the only disk,
4010 			 *    which is larger than my current size
4011 			 */
4012 			drbd_set_my_capacity(device, new_size);
4013 		}
4014 	}
4015 
4016 	if (get_ldev(device)) {
4017 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4018 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4019 			ldsc = 1;
4020 		}
4021 
4022 		put_ldev(device);
4023 	}
4024 
4025 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4026 		if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4027 		    ldsc) {
4028 			/* we have different sizes, probably peer
4029 			 * needs to know my new size... */
4030 			drbd_send_sizes(peer_device, 0, ddsf);
4031 		}
4032 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4033 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4034 			if (device->state.pdsk >= D_INCONSISTENT &&
4035 			    device->state.disk >= D_INCONSISTENT) {
4036 				if (ddsf & DDSF_NO_RESYNC)
4037 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4038 				else
4039 					resync_after_online_grow(device);
4040 			} else
4041 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4042 		}
4043 	}
4044 
4045 	return 0;
4046 }
4047 
4048 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4049 {
4050 	struct drbd_peer_device *peer_device;
4051 	struct drbd_device *device;
4052 	struct p_uuids *p = pi->data;
4053 	u64 *p_uuid;
4054 	int i, updated_uuids = 0;
4055 
4056 	peer_device = conn_peer_device(connection, pi->vnr);
4057 	if (!peer_device)
4058 		return config_unknown_volume(connection, pi);
4059 	device = peer_device->device;
4060 
4061 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4062 	if (!p_uuid)
4063 		return false;
4064 
4065 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4066 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4067 
4068 	kfree(device->p_uuid);
4069 	device->p_uuid = p_uuid;
4070 
4071 	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4072 	    device->state.disk < D_INCONSISTENT &&
4073 	    device->state.role == R_PRIMARY &&
4074 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4075 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4076 		    (unsigned long long)device->ed_uuid);
4077 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4078 		return -EIO;
4079 	}
4080 
4081 	if (get_ldev(device)) {
4082 		int skip_initial_sync =
4083 			device->state.conn == C_CONNECTED &&
4084 			peer_device->connection->agreed_pro_version >= 90 &&
4085 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4086 			(p_uuid[UI_FLAGS] & 8);
4087 		if (skip_initial_sync) {
4088 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4089 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4090 					"clear_n_write from receive_uuids",
4091 					BM_LOCKED_TEST_ALLOWED, NULL);
4092 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4093 			_drbd_uuid_set(device, UI_BITMAP, 0);
4094 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4095 					CS_VERBOSE, NULL);
4096 			drbd_md_sync(device);
4097 			updated_uuids = 1;
4098 		}
4099 		put_ldev(device);
4100 	} else if (device->state.disk < D_INCONSISTENT &&
4101 		   device->state.role == R_PRIMARY) {
4102 		/* I am a diskless primary, the peer just created a new current UUID
4103 		   for me. */
4104 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4105 	}
4106 
4107 	/* Before we test for the disk state, we should wait until an eventually
4108 	   ongoing cluster wide state change is finished. That is important if
4109 	   we are primary and are detaching from our disk. We need to see the
4110 	   new disk state... */
4111 	mutex_lock(device->state_mutex);
4112 	mutex_unlock(device->state_mutex);
4113 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4114 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4115 
4116 	if (updated_uuids)
4117 		drbd_print_uuids(device, "receiver updated UUIDs to");
4118 
4119 	return 0;
4120 }
4121 
4122 /**
4123  * convert_state() - Converts the peer's view of the cluster state to our point of view
4124  * @ps:		The state as seen by the peer.
4125  */
4126 static union drbd_state convert_state(union drbd_state ps)
4127 {
4128 	union drbd_state ms;
4129 
4130 	static enum drbd_conns c_tab[] = {
4131 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4132 		[C_CONNECTED] = C_CONNECTED,
4133 
4134 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4135 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4136 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4137 		[C_VERIFY_S]       = C_VERIFY_T,
4138 		[C_MASK]   = C_MASK,
4139 	};
4140 
4141 	ms.i = ps.i;
4142 
4143 	ms.conn = c_tab[ps.conn];
4144 	ms.peer = ps.role;
4145 	ms.role = ps.peer;
4146 	ms.pdsk = ps.disk;
4147 	ms.disk = ps.pdsk;
4148 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4149 
4150 	return ms;
4151 }
4152 
4153 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4154 {
4155 	struct drbd_peer_device *peer_device;
4156 	struct drbd_device *device;
4157 	struct p_req_state *p = pi->data;
4158 	union drbd_state mask, val;
4159 	enum drbd_state_rv rv;
4160 
4161 	peer_device = conn_peer_device(connection, pi->vnr);
4162 	if (!peer_device)
4163 		return -EIO;
4164 	device = peer_device->device;
4165 
4166 	mask.i = be32_to_cpu(p->mask);
4167 	val.i = be32_to_cpu(p->val);
4168 
4169 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4170 	    mutex_is_locked(device->state_mutex)) {
4171 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4172 		return 0;
4173 	}
4174 
4175 	mask = convert_state(mask);
4176 	val = convert_state(val);
4177 
4178 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4179 	drbd_send_sr_reply(peer_device, rv);
4180 
4181 	drbd_md_sync(device);
4182 
4183 	return 0;
4184 }
4185 
4186 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4187 {
4188 	struct p_req_state *p = pi->data;
4189 	union drbd_state mask, val;
4190 	enum drbd_state_rv rv;
4191 
4192 	mask.i = be32_to_cpu(p->mask);
4193 	val.i = be32_to_cpu(p->val);
4194 
4195 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4196 	    mutex_is_locked(&connection->cstate_mutex)) {
4197 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4198 		return 0;
4199 	}
4200 
4201 	mask = convert_state(mask);
4202 	val = convert_state(val);
4203 
4204 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4205 	conn_send_sr_reply(connection, rv);
4206 
4207 	return 0;
4208 }
4209 
4210 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4211 {
4212 	struct drbd_peer_device *peer_device;
4213 	struct drbd_device *device;
4214 	struct p_state *p = pi->data;
4215 	union drbd_state os, ns, peer_state;
4216 	enum drbd_disk_state real_peer_disk;
4217 	enum chg_state_flags cs_flags;
4218 	int rv;
4219 
4220 	peer_device = conn_peer_device(connection, pi->vnr);
4221 	if (!peer_device)
4222 		return config_unknown_volume(connection, pi);
4223 	device = peer_device->device;
4224 
4225 	peer_state.i = be32_to_cpu(p->state);
4226 
4227 	real_peer_disk = peer_state.disk;
4228 	if (peer_state.disk == D_NEGOTIATING) {
4229 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4230 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4231 	}
4232 
4233 	spin_lock_irq(&device->resource->req_lock);
4234  retry:
4235 	os = ns = drbd_read_state(device);
4236 	spin_unlock_irq(&device->resource->req_lock);
4237 
4238 	/* If some other part of the code (ack_receiver thread, timeout)
4239 	 * already decided to close the connection again,
4240 	 * we must not "re-establish" it here. */
4241 	if (os.conn <= C_TEAR_DOWN)
4242 		return -ECONNRESET;
4243 
4244 	/* If this is the "end of sync" confirmation, usually the peer disk
4245 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4246 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4247 	 * unpause-sync events has been "just right", the peer disk may
4248 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4249 	 */
4250 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4251 	    real_peer_disk == D_UP_TO_DATE &&
4252 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4253 		/* If we are (becoming) SyncSource, but peer is still in sync
4254 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4255 		 * will change to inconsistent once the peer reaches active
4256 		 * syncing states.
4257 		 * It may have changed syncer-paused flags, however, so we
4258 		 * cannot ignore this completely. */
4259 		if (peer_state.conn > C_CONNECTED &&
4260 		    peer_state.conn < C_SYNC_SOURCE)
4261 			real_peer_disk = D_INCONSISTENT;
4262 
4263 		/* if peer_state changes to connected at the same time,
4264 		 * it explicitly notifies us that it finished resync.
4265 		 * Maybe we should finish it up, too? */
4266 		else if (os.conn >= C_SYNC_SOURCE &&
4267 			 peer_state.conn == C_CONNECTED) {
4268 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4269 				drbd_resync_finished(peer_device);
4270 			return 0;
4271 		}
4272 	}
4273 
4274 	/* explicit verify finished notification, stop sector reached. */
4275 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4276 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4277 		ov_out_of_sync_print(peer_device);
4278 		drbd_resync_finished(peer_device);
4279 		return 0;
4280 	}
4281 
4282 	/* peer says his disk is inconsistent, while we think it is uptodate,
4283 	 * and this happens while the peer still thinks we have a sync going on,
4284 	 * but we think we are already done with the sync.
4285 	 * We ignore this to avoid flapping pdsk.
4286 	 * This should not happen, if the peer is a recent version of drbd. */
4287 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4288 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4289 		real_peer_disk = D_UP_TO_DATE;
4290 
4291 	if (ns.conn == C_WF_REPORT_PARAMS)
4292 		ns.conn = C_CONNECTED;
4293 
4294 	if (peer_state.conn == C_AHEAD)
4295 		ns.conn = C_BEHIND;
4296 
4297 	/* TODO:
4298 	 * if (primary and diskless and peer uuid != effective uuid)
4299 	 *     abort attach on peer;
4300 	 *
4301 	 * If this node does not have good data, was already connected, but
4302 	 * the peer did a late attach only now, trying to "negotiate" with me,
4303 	 * AND I am currently Primary, possibly frozen, with some specific
4304 	 * "effective" uuid, this should never be reached, really, because
4305 	 * we first send the uuids, then the current state.
4306 	 *
4307 	 * In this scenario, we already dropped the connection hard
4308 	 * when we received the unsuitable uuids (receive_uuids().
4309 	 *
4310 	 * Should we want to change this, that is: not drop the connection in
4311 	 * receive_uuids() already, then we would need to add a branch here
4312 	 * that aborts the attach of "unsuitable uuids" on the peer in case
4313 	 * this node is currently Diskless Primary.
4314 	 */
4315 
4316 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4317 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4318 		int cr; /* consider resync */
4319 
4320 		/* if we established a new connection */
4321 		cr  = (os.conn < C_CONNECTED);
4322 		/* if we had an established connection
4323 		 * and one of the nodes newly attaches a disk */
4324 		cr |= (os.conn == C_CONNECTED &&
4325 		       (peer_state.disk == D_NEGOTIATING ||
4326 			os.disk == D_NEGOTIATING));
4327 		/* if we have both been inconsistent, and the peer has been
4328 		 * forced to be UpToDate with --force */
4329 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4330 		/* if we had been plain connected, and the admin requested to
4331 		 * start a sync by "invalidate" or "invalidate-remote" */
4332 		cr |= (os.conn == C_CONNECTED &&
4333 				(peer_state.conn >= C_STARTING_SYNC_S &&
4334 				 peer_state.conn <= C_WF_BITMAP_T));
4335 
4336 		if (cr)
4337 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4338 
4339 		put_ldev(device);
4340 		if (ns.conn == C_MASK) {
4341 			ns.conn = C_CONNECTED;
4342 			if (device->state.disk == D_NEGOTIATING) {
4343 				drbd_force_state(device, NS(disk, D_FAILED));
4344 			} else if (peer_state.disk == D_NEGOTIATING) {
4345 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4346 				peer_state.disk = D_DISKLESS;
4347 				real_peer_disk = D_DISKLESS;
4348 			} else {
4349 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4350 					return -EIO;
4351 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4352 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4353 				return -EIO;
4354 			}
4355 		}
4356 	}
4357 
4358 	spin_lock_irq(&device->resource->req_lock);
4359 	if (os.i != drbd_read_state(device).i)
4360 		goto retry;
4361 	clear_bit(CONSIDER_RESYNC, &device->flags);
4362 	ns.peer = peer_state.role;
4363 	ns.pdsk = real_peer_disk;
4364 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4365 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4366 		ns.disk = device->new_state_tmp.disk;
4367 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4368 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4369 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4370 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4371 		   for temporal network outages! */
4372 		spin_unlock_irq(&device->resource->req_lock);
4373 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4374 		tl_clear(peer_device->connection);
4375 		drbd_uuid_new_current(device);
4376 		clear_bit(NEW_CUR_UUID, &device->flags);
4377 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4378 		return -EIO;
4379 	}
4380 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4381 	ns = drbd_read_state(device);
4382 	spin_unlock_irq(&device->resource->req_lock);
4383 
4384 	if (rv < SS_SUCCESS) {
4385 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4386 		return -EIO;
4387 	}
4388 
4389 	if (os.conn > C_WF_REPORT_PARAMS) {
4390 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4391 		    peer_state.disk != D_NEGOTIATING ) {
4392 			/* we want resync, peer has not yet decided to sync... */
4393 			/* Nowadays only used when forcing a node into primary role and
4394 			   setting its disk to UpToDate with that */
4395 			drbd_send_uuids(peer_device);
4396 			drbd_send_current_state(peer_device);
4397 		}
4398 	}
4399 
4400 	clear_bit(DISCARD_MY_DATA, &device->flags);
4401 
4402 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4403 
4404 	return 0;
4405 }
4406 
4407 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4408 {
4409 	struct drbd_peer_device *peer_device;
4410 	struct drbd_device *device;
4411 	struct p_rs_uuid *p = pi->data;
4412 
4413 	peer_device = conn_peer_device(connection, pi->vnr);
4414 	if (!peer_device)
4415 		return -EIO;
4416 	device = peer_device->device;
4417 
4418 	wait_event(device->misc_wait,
4419 		   device->state.conn == C_WF_SYNC_UUID ||
4420 		   device->state.conn == C_BEHIND ||
4421 		   device->state.conn < C_CONNECTED ||
4422 		   device->state.disk < D_NEGOTIATING);
4423 
4424 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4425 
4426 	/* Here the _drbd_uuid_ functions are right, current should
4427 	   _not_ be rotated into the history */
4428 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4429 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4430 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4431 
4432 		drbd_print_uuids(device, "updated sync uuid");
4433 		drbd_start_resync(device, C_SYNC_TARGET);
4434 
4435 		put_ldev(device);
4436 	} else
4437 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4438 
4439 	return 0;
4440 }
4441 
4442 /*
4443  * receive_bitmap_plain
4444  *
4445  * Return 0 when done, 1 when another iteration is needed, and a negative error
4446  * code upon failure.
4447  */
4448 static int
4449 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4450 		     unsigned long *p, struct bm_xfer_ctx *c)
4451 {
4452 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4453 				 drbd_header_size(peer_device->connection);
4454 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4455 				       c->bm_words - c->word_offset);
4456 	unsigned int want = num_words * sizeof(*p);
4457 	int err;
4458 
4459 	if (want != size) {
4460 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4461 		return -EIO;
4462 	}
4463 	if (want == 0)
4464 		return 0;
4465 	err = drbd_recv_all(peer_device->connection, p, want);
4466 	if (err)
4467 		return err;
4468 
4469 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4470 
4471 	c->word_offset += num_words;
4472 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4473 	if (c->bit_offset > c->bm_bits)
4474 		c->bit_offset = c->bm_bits;
4475 
4476 	return 1;
4477 }
4478 
4479 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4480 {
4481 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4482 }
4483 
4484 static int dcbp_get_start(struct p_compressed_bm *p)
4485 {
4486 	return (p->encoding & 0x80) != 0;
4487 }
4488 
4489 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4490 {
4491 	return (p->encoding >> 4) & 0x7;
4492 }
4493 
4494 /*
4495  * recv_bm_rle_bits
4496  *
4497  * Return 0 when done, 1 when another iteration is needed, and a negative error
4498  * code upon failure.
4499  */
4500 static int
4501 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4502 		struct p_compressed_bm *p,
4503 		 struct bm_xfer_ctx *c,
4504 		 unsigned int len)
4505 {
4506 	struct bitstream bs;
4507 	u64 look_ahead;
4508 	u64 rl;
4509 	u64 tmp;
4510 	unsigned long s = c->bit_offset;
4511 	unsigned long e;
4512 	int toggle = dcbp_get_start(p);
4513 	int have;
4514 	int bits;
4515 
4516 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4517 
4518 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4519 	if (bits < 0)
4520 		return -EIO;
4521 
4522 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4523 		bits = vli_decode_bits(&rl, look_ahead);
4524 		if (bits <= 0)
4525 			return -EIO;
4526 
4527 		if (toggle) {
4528 			e = s + rl -1;
4529 			if (e >= c->bm_bits) {
4530 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4531 				return -EIO;
4532 			}
4533 			_drbd_bm_set_bits(peer_device->device, s, e);
4534 		}
4535 
4536 		if (have < bits) {
4537 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4538 				have, bits, look_ahead,
4539 				(unsigned int)(bs.cur.b - p->code),
4540 				(unsigned int)bs.buf_len);
4541 			return -EIO;
4542 		}
4543 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4544 		if (likely(bits < 64))
4545 			look_ahead >>= bits;
4546 		else
4547 			look_ahead = 0;
4548 		have -= bits;
4549 
4550 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4551 		if (bits < 0)
4552 			return -EIO;
4553 		look_ahead |= tmp << have;
4554 		have += bits;
4555 	}
4556 
4557 	c->bit_offset = s;
4558 	bm_xfer_ctx_bit_to_word_offset(c);
4559 
4560 	return (s != c->bm_bits);
4561 }
4562 
4563 /*
4564  * decode_bitmap_c
4565  *
4566  * Return 0 when done, 1 when another iteration is needed, and a negative error
4567  * code upon failure.
4568  */
4569 static int
4570 decode_bitmap_c(struct drbd_peer_device *peer_device,
4571 		struct p_compressed_bm *p,
4572 		struct bm_xfer_ctx *c,
4573 		unsigned int len)
4574 {
4575 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4576 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4577 
4578 	/* other variants had been implemented for evaluation,
4579 	 * but have been dropped as this one turned out to be "best"
4580 	 * during all our tests. */
4581 
4582 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4583 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4584 	return -EIO;
4585 }
4586 
4587 void INFO_bm_xfer_stats(struct drbd_peer_device *peer_device,
4588 		const char *direction, struct bm_xfer_ctx *c)
4589 {
4590 	/* what would it take to transfer it "plaintext" */
4591 	unsigned int header_size = drbd_header_size(peer_device->connection);
4592 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4593 	unsigned int plain =
4594 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4595 		c->bm_words * sizeof(unsigned long);
4596 	unsigned int total = c->bytes[0] + c->bytes[1];
4597 	unsigned int r;
4598 
4599 	/* total can not be zero. but just in case: */
4600 	if (total == 0)
4601 		return;
4602 
4603 	/* don't report if not compressed */
4604 	if (total >= plain)
4605 		return;
4606 
4607 	/* total < plain. check for overflow, still */
4608 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4609 		                    : (1000 * total / plain);
4610 
4611 	if (r > 1000)
4612 		r = 1000;
4613 
4614 	r = 1000 - r;
4615 	drbd_info(peer_device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4616 	     "total %u; compression: %u.%u%%\n",
4617 			direction,
4618 			c->bytes[1], c->packets[1],
4619 			c->bytes[0], c->packets[0],
4620 			total, r/10, r % 10);
4621 }
4622 
4623 /* Since we are processing the bitfield from lower addresses to higher,
4624    it does not matter if the process it in 32 bit chunks or 64 bit
4625    chunks as long as it is little endian. (Understand it as byte stream,
4626    beginning with the lowest byte...) If we would use big endian
4627    we would need to process it from the highest address to the lowest,
4628    in order to be agnostic to the 32 vs 64 bits issue.
4629 
4630    returns 0 on failure, 1 if we successfully received it. */
4631 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4632 {
4633 	struct drbd_peer_device *peer_device;
4634 	struct drbd_device *device;
4635 	struct bm_xfer_ctx c;
4636 	int err;
4637 
4638 	peer_device = conn_peer_device(connection, pi->vnr);
4639 	if (!peer_device)
4640 		return -EIO;
4641 	device = peer_device->device;
4642 
4643 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4644 	/* you are supposed to send additional out-of-sync information
4645 	 * if you actually set bits during this phase */
4646 
4647 	c = (struct bm_xfer_ctx) {
4648 		.bm_bits = drbd_bm_bits(device),
4649 		.bm_words = drbd_bm_words(device),
4650 	};
4651 
4652 	for(;;) {
4653 		if (pi->cmd == P_BITMAP)
4654 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4655 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4656 			/* MAYBE: sanity check that we speak proto >= 90,
4657 			 * and the feature is enabled! */
4658 			struct p_compressed_bm *p = pi->data;
4659 
4660 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4661 				drbd_err(device, "ReportCBitmap packet too large\n");
4662 				err = -EIO;
4663 				goto out;
4664 			}
4665 			if (pi->size <= sizeof(*p)) {
4666 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4667 				err = -EIO;
4668 				goto out;
4669 			}
4670 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4671 			if (err)
4672 			       goto out;
4673 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4674 		} else {
4675 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4676 			err = -EIO;
4677 			goto out;
4678 		}
4679 
4680 		c.packets[pi->cmd == P_BITMAP]++;
4681 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4682 
4683 		if (err <= 0) {
4684 			if (err < 0)
4685 				goto out;
4686 			break;
4687 		}
4688 		err = drbd_recv_header(peer_device->connection, pi);
4689 		if (err)
4690 			goto out;
4691 	}
4692 
4693 	INFO_bm_xfer_stats(peer_device, "receive", &c);
4694 
4695 	if (device->state.conn == C_WF_BITMAP_T) {
4696 		enum drbd_state_rv rv;
4697 
4698 		err = drbd_send_bitmap(device, peer_device);
4699 		if (err)
4700 			goto out;
4701 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4702 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4703 		D_ASSERT(device, rv == SS_SUCCESS);
4704 	} else if (device->state.conn != C_WF_BITMAP_S) {
4705 		/* admin may have requested C_DISCONNECTING,
4706 		 * other threads may have noticed network errors */
4707 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4708 		    drbd_conn_str(device->state.conn));
4709 	}
4710 	err = 0;
4711 
4712  out:
4713 	drbd_bm_unlock(device);
4714 	if (!err && device->state.conn == C_WF_BITMAP_S)
4715 		drbd_start_resync(device, C_SYNC_SOURCE);
4716 	return err;
4717 }
4718 
4719 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4720 {
4721 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4722 		 pi->cmd, pi->size);
4723 
4724 	return ignore_remaining_packet(connection, pi);
4725 }
4726 
4727 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4728 {
4729 	/* Make sure we've acked all the TCP data associated
4730 	 * with the data requests being unplugged */
4731 	tcp_sock_set_quickack(connection->data.socket->sk, 2);
4732 	return 0;
4733 }
4734 
4735 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4736 {
4737 	struct drbd_peer_device *peer_device;
4738 	struct drbd_device *device;
4739 	struct p_block_desc *p = pi->data;
4740 
4741 	peer_device = conn_peer_device(connection, pi->vnr);
4742 	if (!peer_device)
4743 		return -EIO;
4744 	device = peer_device->device;
4745 
4746 	switch (device->state.conn) {
4747 	case C_WF_SYNC_UUID:
4748 	case C_WF_BITMAP_T:
4749 	case C_BEHIND:
4750 			break;
4751 	default:
4752 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4753 				drbd_conn_str(device->state.conn));
4754 	}
4755 
4756 	drbd_set_out_of_sync(peer_device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4757 
4758 	return 0;
4759 }
4760 
4761 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4762 {
4763 	struct drbd_peer_device *peer_device;
4764 	struct p_block_desc *p = pi->data;
4765 	struct drbd_device *device;
4766 	sector_t sector;
4767 	int size, err = 0;
4768 
4769 	peer_device = conn_peer_device(connection, pi->vnr);
4770 	if (!peer_device)
4771 		return -EIO;
4772 	device = peer_device->device;
4773 
4774 	sector = be64_to_cpu(p->sector);
4775 	size = be32_to_cpu(p->blksize);
4776 
4777 	dec_rs_pending(peer_device);
4778 
4779 	if (get_ldev(device)) {
4780 		struct drbd_peer_request *peer_req;
4781 
4782 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4783 					       size, 0, GFP_NOIO);
4784 		if (!peer_req) {
4785 			put_ldev(device);
4786 			return -ENOMEM;
4787 		}
4788 
4789 		peer_req->w.cb = e_end_resync_block;
4790 		peer_req->opf = REQ_OP_DISCARD;
4791 		peer_req->submit_jif = jiffies;
4792 		peer_req->flags |= EE_TRIM;
4793 
4794 		spin_lock_irq(&device->resource->req_lock);
4795 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4796 		spin_unlock_irq(&device->resource->req_lock);
4797 
4798 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
4799 		err = drbd_submit_peer_request(peer_req);
4800 
4801 		if (err) {
4802 			spin_lock_irq(&device->resource->req_lock);
4803 			list_del(&peer_req->w.list);
4804 			spin_unlock_irq(&device->resource->req_lock);
4805 
4806 			drbd_free_peer_req(device, peer_req);
4807 			put_ldev(device);
4808 			err = 0;
4809 			goto fail;
4810 		}
4811 
4812 		inc_unacked(device);
4813 
4814 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4815 		   as well as drbd_rs_complete_io() */
4816 	} else {
4817 	fail:
4818 		drbd_rs_complete_io(device, sector);
4819 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4820 	}
4821 
4822 	atomic_add(size >> 9, &device->rs_sect_in);
4823 
4824 	return err;
4825 }
4826 
4827 struct data_cmd {
4828 	int expect_payload;
4829 	unsigned int pkt_size;
4830 	int (*fn)(struct drbd_connection *, struct packet_info *);
4831 };
4832 
4833 static struct data_cmd drbd_cmd_handler[] = {
4834 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4835 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4836 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4837 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4838 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4839 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4840 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4841 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4842 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4843 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4844 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4845 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4846 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4847 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4848 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4849 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4850 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4851 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4852 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4853 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4854 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4855 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4856 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4857 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4858 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4859 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4860 	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
4861 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4862 };
4863 
4864 static void drbdd(struct drbd_connection *connection)
4865 {
4866 	struct packet_info pi;
4867 	size_t shs; /* sub header size */
4868 	int err;
4869 
4870 	while (get_t_state(&connection->receiver) == RUNNING) {
4871 		struct data_cmd const *cmd;
4872 
4873 		drbd_thread_current_set_cpu(&connection->receiver);
4874 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4875 		if (drbd_recv_header_maybe_unplug(connection, &pi))
4876 			goto err_out;
4877 
4878 		cmd = &drbd_cmd_handler[pi.cmd];
4879 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4880 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4881 				 cmdname(pi.cmd), pi.cmd);
4882 			goto err_out;
4883 		}
4884 
4885 		shs = cmd->pkt_size;
4886 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4887 			shs += sizeof(struct o_qlim);
4888 		if (pi.size > shs && !cmd->expect_payload) {
4889 			drbd_err(connection, "No payload expected %s l:%d\n",
4890 				 cmdname(pi.cmd), pi.size);
4891 			goto err_out;
4892 		}
4893 		if (pi.size < shs) {
4894 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4895 				 cmdname(pi.cmd), (int)shs, pi.size);
4896 			goto err_out;
4897 		}
4898 
4899 		if (shs) {
4900 			update_receiver_timing_details(connection, drbd_recv_all_warn);
4901 			err = drbd_recv_all_warn(connection, pi.data, shs);
4902 			if (err)
4903 				goto err_out;
4904 			pi.size -= shs;
4905 		}
4906 
4907 		update_receiver_timing_details(connection, cmd->fn);
4908 		err = cmd->fn(connection, &pi);
4909 		if (err) {
4910 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4911 				 cmdname(pi.cmd), err, pi.size);
4912 			goto err_out;
4913 		}
4914 	}
4915 	return;
4916 
4917     err_out:
4918 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4919 }
4920 
4921 static void conn_disconnect(struct drbd_connection *connection)
4922 {
4923 	struct drbd_peer_device *peer_device;
4924 	enum drbd_conns oc;
4925 	int vnr;
4926 
4927 	if (connection->cstate == C_STANDALONE)
4928 		return;
4929 
4930 	/* We are about to start the cleanup after connection loss.
4931 	 * Make sure drbd_make_request knows about that.
4932 	 * Usually we should be in some network failure state already,
4933 	 * but just in case we are not, we fix it up here.
4934 	 */
4935 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4936 
4937 	/* ack_receiver does not clean up anything. it must not interfere, either */
4938 	drbd_thread_stop(&connection->ack_receiver);
4939 	if (connection->ack_sender) {
4940 		destroy_workqueue(connection->ack_sender);
4941 		connection->ack_sender = NULL;
4942 	}
4943 	drbd_free_sock(connection);
4944 
4945 	rcu_read_lock();
4946 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4947 		struct drbd_device *device = peer_device->device;
4948 		kref_get(&device->kref);
4949 		rcu_read_unlock();
4950 		drbd_disconnected(peer_device);
4951 		kref_put(&device->kref, drbd_destroy_device);
4952 		rcu_read_lock();
4953 	}
4954 	rcu_read_unlock();
4955 
4956 	if (!list_empty(&connection->current_epoch->list))
4957 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4958 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4959 	atomic_set(&connection->current_epoch->epoch_size, 0);
4960 	connection->send.seen_any_write_yet = false;
4961 
4962 	drbd_info(connection, "Connection closed\n");
4963 
4964 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4965 		conn_try_outdate_peer_async(connection);
4966 
4967 	spin_lock_irq(&connection->resource->req_lock);
4968 	oc = connection->cstate;
4969 	if (oc >= C_UNCONNECTED)
4970 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4971 
4972 	spin_unlock_irq(&connection->resource->req_lock);
4973 
4974 	if (oc == C_DISCONNECTING)
4975 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4976 }
4977 
4978 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4979 {
4980 	struct drbd_device *device = peer_device->device;
4981 	unsigned int i;
4982 
4983 	/* wait for current activity to cease. */
4984 	spin_lock_irq(&device->resource->req_lock);
4985 	_drbd_wait_ee_list_empty(device, &device->active_ee);
4986 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4987 	_drbd_wait_ee_list_empty(device, &device->read_ee);
4988 	spin_unlock_irq(&device->resource->req_lock);
4989 
4990 	/* We do not have data structures that would allow us to
4991 	 * get the rs_pending_cnt down to 0 again.
4992 	 *  * On C_SYNC_TARGET we do not have any data structures describing
4993 	 *    the pending RSDataRequest's we have sent.
4994 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4995 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4996 	 *  And no, it is not the sum of the reference counts in the
4997 	 *  resync_LRU. The resync_LRU tracks the whole operation including
4998 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4999 	 *  on the fly. */
5000 	drbd_rs_cancel_all(device);
5001 	device->rs_total = 0;
5002 	device->rs_failed = 0;
5003 	atomic_set(&device->rs_pending_cnt, 0);
5004 	wake_up(&device->misc_wait);
5005 
5006 	timer_delete_sync(&device->resync_timer);
5007 	resync_timer_fn(&device->resync_timer);
5008 
5009 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5010 	 * w_make_resync_request etc. which may still be on the worker queue
5011 	 * to be "canceled" */
5012 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5013 
5014 	drbd_finish_peer_reqs(device);
5015 
5016 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5017 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
5018 	   necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5019 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5020 
5021 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5022 	 * again via drbd_try_clear_on_disk_bm(). */
5023 	drbd_rs_cancel_all(device);
5024 
5025 	kfree(device->p_uuid);
5026 	device->p_uuid = NULL;
5027 
5028 	if (!drbd_suspended(device))
5029 		tl_clear(peer_device->connection);
5030 
5031 	drbd_md_sync(device);
5032 
5033 	if (get_ldev(device)) {
5034 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5035 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED, NULL);
5036 		put_ldev(device);
5037 	}
5038 
5039 	i = atomic_read(&device->pp_in_use_by_net);
5040 	if (i)
5041 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5042 	i = atomic_read(&device->pp_in_use);
5043 	if (i)
5044 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5045 
5046 	D_ASSERT(device, list_empty(&device->read_ee));
5047 	D_ASSERT(device, list_empty(&device->active_ee));
5048 	D_ASSERT(device, list_empty(&device->sync_ee));
5049 	D_ASSERT(device, list_empty(&device->done_ee));
5050 
5051 	return 0;
5052 }
5053 
5054 /*
5055  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5056  * we can agree on is stored in agreed_pro_version.
5057  *
5058  * feature flags and the reserved array should be enough room for future
5059  * enhancements of the handshake protocol, and possible plugins...
5060  *
5061  * for now, they are expected to be zero, but ignored.
5062  */
5063 static int drbd_send_features(struct drbd_connection *connection)
5064 {
5065 	struct drbd_socket *sock;
5066 	struct p_connection_features *p;
5067 
5068 	sock = &connection->data;
5069 	p = conn_prepare_command(connection, sock);
5070 	if (!p)
5071 		return -EIO;
5072 	memset(p, 0, sizeof(*p));
5073 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5074 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5075 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5076 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5077 }
5078 
5079 /*
5080  * return values:
5081  *   1 yes, we have a valid connection
5082  *   0 oops, did not work out, please try again
5083  *  -1 peer talks different language,
5084  *     no point in trying again, please go standalone.
5085  */
5086 static int drbd_do_features(struct drbd_connection *connection)
5087 {
5088 	/* ASSERT current == connection->receiver ... */
5089 	struct p_connection_features *p;
5090 	const int expect = sizeof(struct p_connection_features);
5091 	struct packet_info pi;
5092 	int err;
5093 
5094 	err = drbd_send_features(connection);
5095 	if (err)
5096 		return 0;
5097 
5098 	err = drbd_recv_header(connection, &pi);
5099 	if (err)
5100 		return 0;
5101 
5102 	if (pi.cmd != P_CONNECTION_FEATURES) {
5103 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5104 			 cmdname(pi.cmd), pi.cmd);
5105 		return -1;
5106 	}
5107 
5108 	if (pi.size != expect) {
5109 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5110 		     expect, pi.size);
5111 		return -1;
5112 	}
5113 
5114 	p = pi.data;
5115 	err = drbd_recv_all_warn(connection, p, expect);
5116 	if (err)
5117 		return 0;
5118 
5119 	p->protocol_min = be32_to_cpu(p->protocol_min);
5120 	p->protocol_max = be32_to_cpu(p->protocol_max);
5121 	if (p->protocol_max == 0)
5122 		p->protocol_max = p->protocol_min;
5123 
5124 	if (PRO_VERSION_MAX < p->protocol_min ||
5125 	    PRO_VERSION_MIN > p->protocol_max)
5126 		goto incompat;
5127 
5128 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5129 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5130 
5131 	drbd_info(connection, "Handshake successful: "
5132 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5133 
5134 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5135 		  connection->agreed_features,
5136 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5137 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5138 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5139 		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5140 		  connection->agreed_features ? "" : " none");
5141 
5142 	return 1;
5143 
5144  incompat:
5145 	drbd_err(connection, "incompatible DRBD dialects: "
5146 	    "I support %d-%d, peer supports %d-%d\n",
5147 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5148 	    p->protocol_min, p->protocol_max);
5149 	return -1;
5150 }
5151 
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Without HMAC support in the kernel there is nothing to authenticate with:
 * log why, and return -1 ("auth failed, don't try again").
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/*
 * drbd_do_auth - mutual challenge-response authentication with the peer
 * @connection: connection to authenticate; its cram_hmac_tfm is keyed with
 *              the shared secret from the connection's net_conf
 *
 * Sequence on the data socket:
 *  1. send CHALLENGE_LEN random bytes as P_AUTH_CHALLENGE,
 *  2. receive the peer's P_AUTH_CHALLENGE (CHALLENGE_LEN up to
 *     2 * CHALLENGE_LEN bytes, and not identical to our own challenge),
 *  3. send the HMAC of the peer's challenge as P_AUTH_RESPONSE,
 *  4. receive the peer's P_AUTH_RESPONSE and compare it with the HMAC of
 *     our own challenge.
 *
 * Return value:
 *	1 - auth succeeded,
 *	0 - failed, try again (network error),
 *	-1 - auth failed, don't try again.
 */

static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	struct shash_desc *desc;
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	/* copy the secret out so it can be used outside the RCU read side section */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc = kmalloc(sizeof(struct shash_desc) +
		       crypto_shash_descsize(connection->cram_hmac_tfm),
		       GFP_KERNEL);
	if (!desc) {
		rv = -1;
		goto fail;
	}
	desc->tfm = connection->cram_hmac_tfm;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	/* conn_send_command() returns 0 on success; rv == 0 means "try again" */
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = -1;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	/* the memcmp() below reads CHALLENGE_LEN bytes of the peer's challenge */
	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (!peers_ch) {
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* refuse a peer that merely echoes our own challenge back at us */
	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (!response) {
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	/* the response buffer is reused for the peer's answer */
	err = drbd_recv_all_warn(connection, response , resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (!right_response) {
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	/* every path, success included, ends up here */
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	if (desc) {
		shash_desc_zero(desc);
		kfree(desc);
	}
	/* do not leave a copy of the shared secret behind on the stack */
	memzero_explicit(secret, sizeof(secret));

	return rv;
}
#endif
5344 
5345 int drbd_receiver(struct drbd_thread *thi)
5346 {
5347 	struct drbd_connection *connection = thi->connection;
5348 	int h;
5349 
5350 	drbd_info(connection, "receiver (re)started\n");
5351 
5352 	do {
5353 		h = conn_connect(connection);
5354 		if (h == 0) {
5355 			conn_disconnect(connection);
5356 			schedule_timeout_interruptible(HZ);
5357 		}
5358 		if (h == -1) {
5359 			drbd_warn(connection, "Discarding network configuration.\n");
5360 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5361 		}
5362 	} while (h == 0);
5363 
5364 	if (h > 0) {
5365 		blk_start_plug(&connection->receiver_plug);
5366 		drbdd(connection);
5367 		blk_finish_plug(&connection->receiver_plug);
5368 	}
5369 
5370 	conn_disconnect(connection);
5371 
5372 	drbd_info(connection, "receiver terminated\n");
5373 	return 0;
5374 }
5375 
5376 /* ********* acknowledge sender ******** */
5377 
5378 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5379 {
5380 	struct p_req_state_reply *p = pi->data;
5381 	int retcode = be32_to_cpu(p->retcode);
5382 
5383 	if (retcode >= SS_SUCCESS) {
5384 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5385 	} else {
5386 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5387 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5388 			 drbd_set_st_err_str(retcode), retcode);
5389 	}
5390 	wake_up(&connection->ping_wait);
5391 
5392 	return 0;
5393 }
5394 
5395 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5396 {
5397 	struct drbd_peer_device *peer_device;
5398 	struct drbd_device *device;
5399 	struct p_req_state_reply *p = pi->data;
5400 	int retcode = be32_to_cpu(p->retcode);
5401 
5402 	peer_device = conn_peer_device(connection, pi->vnr);
5403 	if (!peer_device)
5404 		return -EIO;
5405 	device = peer_device->device;
5406 
5407 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5408 		D_ASSERT(device, connection->agreed_pro_version < 100);
5409 		return got_conn_RqSReply(connection, pi);
5410 	}
5411 
5412 	if (retcode >= SS_SUCCESS) {
5413 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5414 	} else {
5415 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5416 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5417 			drbd_set_st_err_str(retcode), retcode);
5418 	}
5419 	wake_up(&device->state_wait);
5420 
5421 	return 0;
5422 }
5423 
/*
 * The peer pinged us: answer with a ping ack and report how sending it went.
 * The packet itself is not looked at.
 */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
5429 
5430 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5431 {
5432 	/* restore idle timeout */
5433 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5434 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5435 		wake_up(&connection->ping_wait);
5436 
5437 	return 0;
5438 }
5439 
5440 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5441 {
5442 	struct drbd_peer_device *peer_device;
5443 	struct drbd_device *device;
5444 	struct p_block_ack *p = pi->data;
5445 	sector_t sector = be64_to_cpu(p->sector);
5446 	int blksize = be32_to_cpu(p->blksize);
5447 
5448 	peer_device = conn_peer_device(connection, pi->vnr);
5449 	if (!peer_device)
5450 		return -EIO;
5451 	device = peer_device->device;
5452 
5453 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5454 
5455 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5456 
5457 	if (get_ldev(device)) {
5458 		drbd_rs_complete_io(device, sector);
5459 		drbd_set_in_sync(peer_device, sector, blksize);
5460 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5461 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5462 		put_ldev(device);
5463 	}
5464 	dec_rs_pending(peer_device);
5465 	atomic_add(blksize >> 9, &device->rs_sect_in);
5466 
5467 	return 0;
5468 }
5469 
5470 static int
5471 validate_req_change_req_state(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
5472 			      struct rb_root *root, const char *func,
5473 			      enum drbd_req_event what, bool missing_ok)
5474 {
5475 	struct drbd_device *device = peer_device->device;
5476 	struct drbd_request *req;
5477 	struct bio_and_error m;
5478 
5479 	spin_lock_irq(&device->resource->req_lock);
5480 	req = find_request(device, root, id, sector, missing_ok, func);
5481 	if (unlikely(!req)) {
5482 		spin_unlock_irq(&device->resource->req_lock);
5483 		return -EIO;
5484 	}
5485 	__req_mod(req, what, peer_device, &m);
5486 	spin_unlock_irq(&device->resource->req_lock);
5487 
5488 	if (m.bio)
5489 		complete_master_bio(device, &m);
5490 	return 0;
5491 }
5492 
5493 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5494 {
5495 	struct drbd_peer_device *peer_device;
5496 	struct drbd_device *device;
5497 	struct p_block_ack *p = pi->data;
5498 	sector_t sector = be64_to_cpu(p->sector);
5499 	int blksize = be32_to_cpu(p->blksize);
5500 	enum drbd_req_event what;
5501 
5502 	peer_device = conn_peer_device(connection, pi->vnr);
5503 	if (!peer_device)
5504 		return -EIO;
5505 	device = peer_device->device;
5506 
5507 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5508 
5509 	if (p->block_id == ID_SYNCER) {
5510 		drbd_set_in_sync(peer_device, sector, blksize);
5511 		dec_rs_pending(peer_device);
5512 		return 0;
5513 	}
5514 	switch (pi->cmd) {
5515 	case P_RS_WRITE_ACK:
5516 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5517 		break;
5518 	case P_WRITE_ACK:
5519 		what = WRITE_ACKED_BY_PEER;
5520 		break;
5521 	case P_RECV_ACK:
5522 		what = RECV_ACKED_BY_PEER;
5523 		break;
5524 	case P_SUPERSEDED:
5525 		what = CONFLICT_RESOLVED;
5526 		break;
5527 	case P_RETRY_WRITE:
5528 		what = POSTPONE_WRITE;
5529 		break;
5530 	default:
5531 		BUG();
5532 	}
5533 
5534 	return validate_req_change_req_state(peer_device, p->block_id, sector,
5535 					     &device->write_requests, __func__,
5536 					     what, false);
5537 }
5538 
5539 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5540 {
5541 	struct drbd_peer_device *peer_device;
5542 	struct drbd_device *device;
5543 	struct p_block_ack *p = pi->data;
5544 	sector_t sector = be64_to_cpu(p->sector);
5545 	int size = be32_to_cpu(p->blksize);
5546 	int err;
5547 
5548 	peer_device = conn_peer_device(connection, pi->vnr);
5549 	if (!peer_device)
5550 		return -EIO;
5551 	device = peer_device->device;
5552 
5553 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5554 
5555 	if (p->block_id == ID_SYNCER) {
5556 		dec_rs_pending(peer_device);
5557 		drbd_rs_failed_io(peer_device, sector, size);
5558 		return 0;
5559 	}
5560 
5561 	err = validate_req_change_req_state(peer_device, p->block_id, sector,
5562 					    &device->write_requests, __func__,
5563 					    NEG_ACKED, true);
5564 	if (err) {
5565 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5566 		   The master bio might already be completed, therefore the
5567 		   request is no longer in the collision hash. */
5568 		/* In Protocol B we might already have got a P_RECV_ACK
5569 		   but then get a P_NEG_ACK afterwards. */
5570 		drbd_set_out_of_sync(peer_device, sector, size);
5571 	}
5572 	return 0;
5573 }
5574 
5575 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5576 {
5577 	struct drbd_peer_device *peer_device;
5578 	struct drbd_device *device;
5579 	struct p_block_ack *p = pi->data;
5580 	sector_t sector = be64_to_cpu(p->sector);
5581 
5582 	peer_device = conn_peer_device(connection, pi->vnr);
5583 	if (!peer_device)
5584 		return -EIO;
5585 	device = peer_device->device;
5586 
5587 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5588 
5589 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5590 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5591 
5592 	return validate_req_change_req_state(peer_device, p->block_id, sector,
5593 					     &device->read_requests, __func__,
5594 					     NEG_ACKED, false);
5595 }
5596 
5597 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5598 {
5599 	struct drbd_peer_device *peer_device;
5600 	struct drbd_device *device;
5601 	sector_t sector;
5602 	int size;
5603 	struct p_block_ack *p = pi->data;
5604 
5605 	peer_device = conn_peer_device(connection, pi->vnr);
5606 	if (!peer_device)
5607 		return -EIO;
5608 	device = peer_device->device;
5609 
5610 	sector = be64_to_cpu(p->sector);
5611 	size = be32_to_cpu(p->blksize);
5612 
5613 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5614 
5615 	dec_rs_pending(peer_device);
5616 
5617 	if (get_ldev_if_state(device, D_FAILED)) {
5618 		drbd_rs_complete_io(device, sector);
5619 		switch (pi->cmd) {
5620 		case P_NEG_RS_DREPLY:
5621 			drbd_rs_failed_io(peer_device, sector, size);
5622 			break;
5623 		case P_RS_CANCEL:
5624 			break;
5625 		default:
5626 			BUG();
5627 		}
5628 		put_ldev(device);
5629 	}
5630 
5631 	return 0;
5632 }
5633 
5634 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5635 {
5636 	struct p_barrier_ack *p = pi->data;
5637 	struct drbd_peer_device *peer_device;
5638 	int vnr;
5639 
5640 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5641 
5642 	rcu_read_lock();
5643 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5644 		struct drbd_device *device = peer_device->device;
5645 
5646 		if (device->state.conn == C_AHEAD &&
5647 		    atomic_read(&device->ap_in_flight) == 0 &&
5648 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5649 			device->start_resync_timer.expires = jiffies + HZ;
5650 			add_timer(&device->start_resync_timer);
5651 		}
5652 	}
5653 	rcu_read_unlock();
5654 
5655 	return 0;
5656 }
5657 
5658 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5659 {
5660 	struct drbd_peer_device *peer_device;
5661 	struct drbd_device *device;
5662 	struct p_block_ack *p = pi->data;
5663 	struct drbd_device_work *dw;
5664 	sector_t sector;
5665 	int size;
5666 
5667 	peer_device = conn_peer_device(connection, pi->vnr);
5668 	if (!peer_device)
5669 		return -EIO;
5670 	device = peer_device->device;
5671 
5672 	sector = be64_to_cpu(p->sector);
5673 	size = be32_to_cpu(p->blksize);
5674 
5675 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5676 
5677 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5678 		drbd_ov_out_of_sync_found(peer_device, sector, size);
5679 	else
5680 		ov_out_of_sync_print(peer_device);
5681 
5682 	if (!get_ldev(device))
5683 		return 0;
5684 
5685 	drbd_rs_complete_io(device, sector);
5686 	dec_rs_pending(peer_device);
5687 
5688 	--device->ov_left;
5689 
5690 	/* let's advance progress step marks only for every other megabyte */
5691 	if ((device->ov_left & 0x200) == 0x200)
5692 		drbd_advance_rs_marks(peer_device, device->ov_left);
5693 
5694 	if (device->ov_left == 0) {
5695 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5696 		if (dw) {
5697 			dw->w.cb = w_ov_finished;
5698 			dw->device = device;
5699 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5700 		} else {
5701 			drbd_err(device, "kmalloc(dw) failed.");
5702 			ov_out_of_sync_print(peer_device);
5703 			drbd_resync_finished(peer_device);
5704 		}
5705 	}
5706 	put_ldev(device);
5707 	return 0;
5708 }
5709 
/*
 * got_skip() - accept a meta packet and ignore its contents.
 *
 * Neither argument is used; the handler unconditionally reports success.
 * ack_receiver_tbl[] installs it for P_DELAY_PROBE.
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Nothing to process. */
	return 0;
}
5714 
5715 struct meta_sock_cmd {
5716 	size_t pkt_size;
5717 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5718 };
5719 
5720 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5721 {
5722 	long t;
5723 	struct net_conf *nc;
5724 
5725 	rcu_read_lock();
5726 	nc = rcu_dereference(connection->net_conf);
5727 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5728 	rcu_read_unlock();
5729 
5730 	t *= HZ;
5731 	if (ping_timeout)
5732 		t /= 10;
5733 
5734 	connection->meta.socket->sk->sk_rcvtimeo = t;
5735 }
5736 
5737 static void set_ping_timeout(struct drbd_connection *connection)
5738 {
5739 	set_rcvtimeo(connection, 1);
5740 }
5741 
5742 static void set_idle_timeout(struct drbd_connection *connection)
5743 {
5744 	set_rcvtimeo(connection, 0);
5745 }
5746 
5747 static struct meta_sock_cmd ack_receiver_tbl[] = {
5748 	[P_PING]	    = { 0, got_Ping },
5749 	[P_PING_ACK]	    = { 0, got_PingAck },
5750 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5751 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5752 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5753 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5754 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5755 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5756 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5757 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5758 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5759 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5760 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5761 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5762 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5763 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5764 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5765 };
5766 
5767 int drbd_ack_receiver(struct drbd_thread *thi)
5768 {
5769 	struct drbd_connection *connection = thi->connection;
5770 	struct meta_sock_cmd *cmd = NULL;
5771 	struct packet_info pi;
5772 	unsigned long pre_recv_jif;
5773 	int rv;
5774 	void *buf    = connection->meta.rbuf;
5775 	int received = 0;
5776 	unsigned int header_size = drbd_header_size(connection);
5777 	int expect   = header_size;
5778 	bool ping_timeout_active = false;
5779 
5780 	sched_set_fifo_low(current);
5781 
5782 	while (get_t_state(thi) == RUNNING) {
5783 		drbd_thread_current_set_cpu(thi);
5784 
5785 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5786 			if (drbd_send_ping(connection)) {
5787 				drbd_err(connection, "drbd_send_ping has failed\n");
5788 				goto reconnect;
5789 			}
5790 			set_ping_timeout(connection);
5791 			ping_timeout_active = true;
5792 		}
5793 
5794 		pre_recv_jif = jiffies;
5795 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5796 
5797 		/* Note:
5798 		 * -EINTR	 (on meta) we got a signal
5799 		 * -EAGAIN	 (on meta) rcvtimeo expired
5800 		 * -ECONNRESET	 other side closed the connection
5801 		 * -ERESTARTSYS  (on data) we got a signal
5802 		 * rv <  0	 other than above: unexpected error!
5803 		 * rv == expected: full header or command
5804 		 * rv <  expected: "woken" by signal during receive
5805 		 * rv == 0	 : "connection shut down by peer"
5806 		 */
5807 		if (likely(rv > 0)) {
5808 			received += rv;
5809 			buf	 += rv;
5810 		} else if (rv == 0) {
5811 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5812 				long t;
5813 				rcu_read_lock();
5814 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5815 				rcu_read_unlock();
5816 
5817 				t = wait_event_timeout(connection->ping_wait,
5818 						       connection->cstate < C_WF_REPORT_PARAMS,
5819 						       t);
5820 				if (t)
5821 					break;
5822 			}
5823 			drbd_err(connection, "meta connection shut down by peer.\n");
5824 			goto reconnect;
5825 		} else if (rv == -EAGAIN) {
5826 			/* If the data socket received something meanwhile,
5827 			 * that is good enough: peer is still alive. */
5828 			if (time_after(connection->last_received, pre_recv_jif))
5829 				continue;
5830 			if (ping_timeout_active) {
5831 				drbd_err(connection, "PingAck did not arrive in time.\n");
5832 				goto reconnect;
5833 			}
5834 			set_bit(SEND_PING, &connection->flags);
5835 			continue;
5836 		} else if (rv == -EINTR) {
5837 			/* maybe drbd_thread_stop(): the while condition will notice.
5838 			 * maybe woken for send_ping: we'll send a ping above,
5839 			 * and change the rcvtimeo */
5840 			flush_signals(current);
5841 			continue;
5842 		} else {
5843 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5844 			goto reconnect;
5845 		}
5846 
5847 		if (received == expect && cmd == NULL) {
5848 			if (decode_header(connection, connection->meta.rbuf, &pi))
5849 				goto reconnect;
5850 			cmd = &ack_receiver_tbl[pi.cmd];
5851 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5852 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5853 					 cmdname(pi.cmd), pi.cmd);
5854 				goto disconnect;
5855 			}
5856 			expect = header_size + cmd->pkt_size;
5857 			if (pi.size != expect - header_size) {
5858 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5859 					pi.cmd, pi.size);
5860 				goto reconnect;
5861 			}
5862 		}
5863 		if (received == expect) {
5864 			bool err;
5865 
5866 			err = cmd->fn(connection, &pi);
5867 			if (err) {
5868 				drbd_err(connection, "%ps failed\n", cmd->fn);
5869 				goto reconnect;
5870 			}
5871 
5872 			connection->last_received = jiffies;
5873 
5874 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5875 				set_idle_timeout(connection);
5876 				ping_timeout_active = false;
5877 			}
5878 
5879 			buf	 = connection->meta.rbuf;
5880 			received = 0;
5881 			expect	 = header_size;
5882 			cmd	 = NULL;
5883 		}
5884 	}
5885 
5886 	if (0) {
5887 reconnect:
5888 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5889 		conn_md_sync(connection);
5890 	}
5891 	if (0) {
5892 disconnect:
5893 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5894 	}
5895 
5896 	drbd_info(connection, "ack_receiver terminated\n");
5897 
5898 	return 0;
5899 }
5900 
5901 void drbd_send_acks_wf(struct work_struct *ws)
5902 {
5903 	struct drbd_peer_device *peer_device =
5904 		container_of(ws, struct drbd_peer_device, send_acks_work);
5905 	struct drbd_connection *connection = peer_device->connection;
5906 	struct drbd_device *device = peer_device->device;
5907 	struct net_conf *nc;
5908 	int tcp_cork, err;
5909 
5910 	rcu_read_lock();
5911 	nc = rcu_dereference(connection->net_conf);
5912 	tcp_cork = nc->tcp_cork;
5913 	rcu_read_unlock();
5914 
5915 	if (tcp_cork)
5916 		tcp_sock_set_cork(connection->meta.socket->sk, true);
5917 
5918 	err = drbd_finish_peer_reqs(device);
5919 	kref_put(&device->kref, drbd_destroy_device);
5920 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5921 	   struct work_struct send_acks_work alive, which is in the peer_device object */
5922 
5923 	if (err) {
5924 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5925 		return;
5926 	}
5927 
5928 	if (tcp_cork)
5929 		tcp_sock_set_cork(connection->meta.socket->sk, false);
5930 
5931 	return;
5932 }
5933