1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 drbd_receiver.c 4 5 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 6 7 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 8 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 9 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 10 11 */ 12 13 14 #include <linux/module.h> 15 16 #include <linux/uaccess.h> 17 #include <net/sock.h> 18 19 #include <linux/drbd.h> 20 #include <linux/fs.h> 21 #include <linux/file.h> 22 #include <linux/in.h> 23 #include <linux/mm.h> 24 #include <linux/memcontrol.h> 25 #include <linux/mm_inline.h> 26 #include <linux/slab.h> 27 #include <uapi/linux/sched/types.h> 28 #include <linux/sched/signal.h> 29 #include <linux/pkt_sched.h> 30 #include <linux/unistd.h> 31 #include <linux/vmalloc.h> 32 #include <linux/random.h> 33 #include <linux/string.h> 34 #include <linux/scatterlist.h> 35 #include <linux/part_stat.h> 36 #include <linux/mempool.h> 37 #include "drbd_int.h" 38 #include "drbd_protocol.h" 39 #include "drbd_req.h" 40 #include "drbd_vli.h" 41 42 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES) 43 44 struct packet_info { 45 enum drbd_packet cmd; 46 unsigned int size; 47 unsigned int vnr; 48 void *data; 49 }; 50 51 enum finish_epoch { 52 FE_STILL_LIVE, 53 FE_DESTROYED, 54 FE_RECYCLED, 55 }; 56 57 static int drbd_do_features(struct drbd_connection *connection); 58 static int drbd_do_auth(struct drbd_connection *connection); 59 static int drbd_disconnected(struct drbd_peer_device *); 60 static void conn_wait_active_ee_empty(struct drbd_connection *connection); 61 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event); 62 static int e_end_block(struct drbd_work *, int); 63 64 65 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 66 67 static struct page *__drbd_alloc_pages(unsigned int number) 68 { 69 struct page *page = NULL; 70 struct page *tmp = NULL; 71 unsigned int i = 0; 72 73 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD 74 * "criss-cross" setup, that might cause write-out on some other DRBD, 75 * which in turn might block on the other node at this very place. */ 76 for (i = 0; i < number; i++) { 77 tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY); 78 if (!tmp) 79 goto fail; 80 set_page_private(tmp, (unsigned long)page); 81 page = tmp; 82 } 83 return page; 84 fail: 85 page_chain_for_each_safe(page, tmp) { 86 set_page_private(page, 0); 87 mempool_free(page, &drbd_buffer_page_pool); 88 } 89 return NULL; 90 } 91 92 /** 93 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled) 94 * @peer_device: DRBD device. 95 * @number: number of pages requested 96 * @retry: whether to retry, if not enough pages are available right now 97 * 98 * Tries to allocate number pages, first from our own page pool, then from 99 * the kernel. 100 * Possibly retry until DRBD frees sufficient pages somewhere else. 101 * 102 * If this allocation would exceed the max_buffers setting, we throttle 103 * allocation (schedule_timeout) to give the system some room to breathe. 104 * 105 * We do not use max-buffers as hard limit, because it could lead to 106 * congestion and further to a distributed deadlock during online-verify or 107 * (checksum based) resync, if the max-buffers, socket buffer sizes and 108 * resync-rate settings are mis-configured. 109 * 110 * Returns a page chain linked via page->private. 111 */ 112 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number, 113 bool retry) 114 { 115 struct drbd_device *device = peer_device->device; 116 struct page *page; 117 struct net_conf *nc; 118 unsigned int mxb; 119 120 rcu_read_lock(); 121 nc = rcu_dereference(peer_device->connection->net_conf); 122 mxb = nc ? nc->max_buffers : 1000000; 123 rcu_read_unlock(); 124 125 if (atomic_read(&device->pp_in_use) >= mxb) 126 schedule_timeout_interruptible(HZ / 10); 127 page = __drbd_alloc_pages(number); 128 129 if (page) 130 atomic_add(number, &device->pp_in_use); 131 return page; 132 } 133 134 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages. 135 * Is also used from inside an other spin_lock_irq(&resource->req_lock); 136 * Either links the page chain back to the global pool, 137 * or returns all pages to the system. */ 138 static void drbd_free_pages(struct drbd_device *device, struct page *page) 139 { 140 struct page *tmp; 141 int i = 0; 142 143 if (page == NULL) 144 return; 145 146 page_chain_for_each_safe(page, tmp) { 147 set_page_private(page, 0); 148 if (page_count(page) == 1) 149 mempool_free(page, &drbd_buffer_page_pool); 150 else 151 put_page(page); 152 i++; 153 } 154 i = atomic_sub_return(i, &device->pp_in_use); 155 if (i < 0) 156 drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); 157 } 158 159 /* 160 You need to hold the req_lock: 161 _drbd_wait_ee_list_empty() 162 163 You must not have the req_lock: 164 drbd_free_peer_req() 165 drbd_alloc_peer_req() 166 drbd_free_peer_reqs() 167 drbd_ee_fix_bhs() 168 drbd_finish_peer_reqs() 169 drbd_clear_done_ee() 170 drbd_wait_ee_list_empty() 171 */ 172 173 /* normal: payload_size == request size (bi_size) 174 * w_same: payload_size == logical_block_size 175 * trim: payload_size == 0 */ 176 struct drbd_peer_request * 177 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 178 unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local) 179 { 180 struct drbd_device *device = peer_device->device; 181 struct drbd_peer_request *peer_req; 182 struct page *page = NULL; 183 unsigned int nr_pages = PFN_UP(payload_size); 184 185 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE)) 186 return NULL; 187 188 peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM); 189 if (!peer_req) { 190 if (!(gfp_mask & __GFP_NOWARN)) 191 drbd_err(device, "%s: allocation failed\n", __func__); 192 return NULL; 193 } 194 195 if (nr_pages) { 196 page = drbd_alloc_pages(peer_device, nr_pages, 197 gfpflags_allow_blocking(gfp_mask)); 198 if (!page) 199 goto fail; 200 if (!mempool_is_saturated(&drbd_buffer_page_pool)) 201 peer_req->flags |= EE_RELEASE_TO_MEMPOOL; 202 } 203 204 memset(peer_req, 0, sizeof(*peer_req)); 205 INIT_LIST_HEAD(&peer_req->w.list); 206 drbd_clear_interval(&peer_req->i); 207 peer_req->i.size = request_size; 208 peer_req->i.sector = sector; 209 peer_req->submit_jif = jiffies; 210 peer_req->peer_device = peer_device; 211 peer_req->pages = page; 212 /* 213 * The block_id is opaque to the receiver. It is not endianness 214 * converted, and sent back to the sender unchanged. 215 */ 216 peer_req->block_id = id; 217 218 return peer_req; 219 220 fail: 221 mempool_free(peer_req, &drbd_ee_mempool); 222 return NULL; 223 } 224 225 void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req) 226 { 227 might_sleep(); 228 if (peer_req->flags & EE_HAS_DIGEST) 229 kfree(peer_req->digest); 230 drbd_free_pages(device, peer_req->pages); 231 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0); 232 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 233 if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) { 234 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 235 drbd_al_complete_io(device, &peer_req->i); 236 } 237 mempool_free(peer_req, &drbd_ee_mempool); 238 } 239 240 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list) 241 { 242 LIST_HEAD(work_list); 243 struct drbd_peer_request *peer_req, *t; 244 int count = 0; 245 246 spin_lock_irq(&device->resource->req_lock); 247 list_splice_init(list, &work_list); 248 spin_unlock_irq(&device->resource->req_lock); 249 250 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 251 drbd_free_peer_req(device, peer_req); 252 count++; 253 } 254 return count; 255 } 256 257 /* 258 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier. 259 */ 260 static int drbd_finish_peer_reqs(struct drbd_device *device) 261 { 262 LIST_HEAD(work_list); 263 struct drbd_peer_request *peer_req, *t; 264 int err = 0; 265 266 spin_lock_irq(&device->resource->req_lock); 267 list_splice_init(&device->done_ee, &work_list); 268 spin_unlock_irq(&device->resource->req_lock); 269 270 /* possible callbacks here: 271 * e_end_block, and e_end_resync_block, e_send_superseded. 272 * all ignore the last argument. 273 */ 274 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 275 int err2; 276 277 /* list_del not necessary, next/prev members not touched */ 278 err2 = peer_req->w.cb(&peer_req->w, !!err); 279 if (!err) 280 err = err2; 281 drbd_free_peer_req(device, peer_req); 282 } 283 wake_up(&device->ee_wait); 284 285 return err; 286 } 287 288 static void _drbd_wait_ee_list_empty(struct drbd_device *device, 289 struct list_head *head) 290 { 291 DEFINE_WAIT(wait); 292 293 /* avoids spin_lock/unlock 294 * and calling prepare_to_wait in the fast path */ 295 while (!list_empty(head)) { 296 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE); 297 spin_unlock_irq(&device->resource->req_lock); 298 io_schedule(); 299 finish_wait(&device->ee_wait, &wait); 300 spin_lock_irq(&device->resource->req_lock); 301 } 302 } 303 304 static void drbd_wait_ee_list_empty(struct drbd_device *device, 305 struct list_head *head) 306 { 307 spin_lock_irq(&device->resource->req_lock); 308 _drbd_wait_ee_list_empty(device, head); 309 spin_unlock_irq(&device->resource->req_lock); 310 } 311 312 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags) 313 { 314 struct kvec iov = { 315 .iov_base = buf, 316 .iov_len = size, 317 }; 318 struct msghdr msg = { 319 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL) 320 }; 321 iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, size); 322 return sock_recvmsg(sock, &msg, msg.msg_flags); 323 } 324 325 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size) 326 { 327 int rv; 328 329 rv = drbd_recv_short(connection->data.socket, buf, size, 0); 330 331 if (rv < 0) { 332 if (rv == -ECONNRESET) 333 drbd_info(connection, "sock was reset by peer\n"); 334 else if (rv != -ERESTARTSYS) 335 drbd_err(connection, "sock_recvmsg returned %d\n", rv); 336 } else if (rv == 0) { 337 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 338 long t; 339 rcu_read_lock(); 340 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 341 rcu_read_unlock(); 342 343 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t); 344 345 if (t) 346 goto out; 347 } 348 drbd_info(connection, "sock was shut down by peer\n"); 349 } 350 351 if (rv != size) 352 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD); 353 354 out: 355 return rv; 356 } 357 358 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size) 359 { 360 int err; 361 362 err = drbd_recv(connection, buf, size); 363 if (err != size) { 364 if (err >= 0) 365 err = -EIO; 366 } else 367 err = 0; 368 return err; 369 } 370 371 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size) 372 { 373 int err; 374 375 err = drbd_recv_all(connection, buf, size); 376 if (err && !signal_pending(current)) 377 drbd_warn(connection, "short read (expected size %d)\n", (int)size); 378 return err; 379 } 380 381 /* quoting tcp(7): 382 * On individual connections, the socket buffer size must be set prior to the 383 * listen(2) or connect(2) calls in order to have it take effect. 384 * This is our wrapper to do so. 385 */ 386 static void drbd_setbufsize(struct socket *sock, unsigned int snd, 387 unsigned int rcv) 388 { 389 /* open coded SO_SNDBUF, SO_RCVBUF */ 390 if (snd) { 391 sock->sk->sk_sndbuf = snd; 392 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 393 } 394 if (rcv) { 395 sock->sk->sk_rcvbuf = rcv; 396 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 397 } 398 } 399 400 static struct socket *drbd_try_connect(struct drbd_connection *connection) 401 { 402 const char *what; 403 struct socket *sock; 404 struct sockaddr_in6 src_in6; 405 struct sockaddr_in6 peer_in6; 406 struct net_conf *nc; 407 int err, peer_addr_len, my_addr_len; 408 int sndbuf_size, rcvbuf_size, connect_int; 409 int disconnect_on_error = 1; 410 411 rcu_read_lock(); 412 nc = rcu_dereference(connection->net_conf); 413 if (!nc) { 414 rcu_read_unlock(); 415 return NULL; 416 } 417 sndbuf_size = nc->sndbuf_size; 418 rcvbuf_size = nc->rcvbuf_size; 419 connect_int = nc->connect_int; 420 rcu_read_unlock(); 421 422 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6)); 423 memcpy(&src_in6, &connection->my_addr, my_addr_len); 424 425 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6) 426 src_in6.sin6_port = 0; 427 else 428 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */ 429 430 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6)); 431 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len); 432 433 what = "sock_create_kern"; 434 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family, 435 SOCK_STREAM, IPPROTO_TCP, &sock); 436 if (err < 0) { 437 sock = NULL; 438 goto out; 439 } 440 441 sock->sk->sk_rcvtimeo = 442 sock->sk->sk_sndtimeo = connect_int * HZ; 443 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size); 444 445 /* explicitly bind to the configured IP as source IP 446 * for the outgoing connections. 447 * This is needed for multihomed hosts and to be 448 * able to use lo: interfaces for drbd. 449 * Make sure to use 0 as port number, so linux selects 450 * a free one dynamically. 451 */ 452 what = "bind before connect"; 453 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len); 454 if (err < 0) 455 goto out; 456 457 /* connect may fail, peer not yet available. 458 * stay C_WF_CONNECTION, don't go Disconnecting! */ 459 disconnect_on_error = 0; 460 what = "connect"; 461 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0); 462 463 out: 464 if (err < 0) { 465 if (sock) { 466 sock_release(sock); 467 sock = NULL; 468 } 469 switch (-err) { 470 /* timeout, busy, signal pending */ 471 case ETIMEDOUT: case EAGAIN: case EINPROGRESS: 472 case EINTR: case ERESTARTSYS: 473 /* peer not (yet) available, network problem */ 474 case ECONNREFUSED: case ENETUNREACH: 475 case EHOSTDOWN: case EHOSTUNREACH: 476 disconnect_on_error = 0; 477 break; 478 default: 479 drbd_err(connection, "%s failed, err = %d\n", what, err); 480 } 481 if (disconnect_on_error) 482 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 483 } 484 485 return sock; 486 } 487 488 struct accept_wait_data { 489 struct drbd_connection *connection; 490 struct socket *s_listen; 491 struct completion door_bell; 492 void (*original_sk_state_change)(struct sock *sk); 493 494 }; 495 496 static void drbd_incoming_connection(struct sock *sk) 497 { 498 struct accept_wait_data *ad = sk->sk_user_data; 499 void (*state_change)(struct sock *sk); 500 501 state_change = ad->original_sk_state_change; 502 if (sk->sk_state == TCP_ESTABLISHED) 503 complete(&ad->door_bell); 504 state_change(sk); 505 } 506 507 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad) 508 { 509 int err, sndbuf_size, rcvbuf_size, my_addr_len; 510 struct sockaddr_in6 my_addr; 511 struct socket *s_listen; 512 struct net_conf *nc; 513 const char *what; 514 515 rcu_read_lock(); 516 nc = rcu_dereference(connection->net_conf); 517 if (!nc) { 518 rcu_read_unlock(); 519 return -EIO; 520 } 521 sndbuf_size = nc->sndbuf_size; 522 rcvbuf_size = nc->rcvbuf_size; 523 rcu_read_unlock(); 524 525 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6)); 526 memcpy(&my_addr, &connection->my_addr, my_addr_len); 527 528 what = "sock_create_kern"; 529 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family, 530 SOCK_STREAM, IPPROTO_TCP, &s_listen); 531 if (err) { 532 s_listen = NULL; 533 goto out; 534 } 535 536 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 537 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size); 538 539 what = "bind before listen"; 540 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len); 541 if (err < 0) 542 goto out; 543 544 ad->s_listen = s_listen; 545 write_lock_bh(&s_listen->sk->sk_callback_lock); 546 ad->original_sk_state_change = s_listen->sk->sk_state_change; 547 s_listen->sk->sk_state_change = drbd_incoming_connection; 548 s_listen->sk->sk_user_data = ad; 549 write_unlock_bh(&s_listen->sk->sk_callback_lock); 550 551 what = "listen"; 552 err = s_listen->ops->listen(s_listen, 5); 553 if (err < 0) 554 goto out; 555 556 return 0; 557 out: 558 if (s_listen) 559 sock_release(s_listen); 560 if (err < 0) { 561 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 562 drbd_err(connection, "%s failed, err = %d\n", what, err); 563 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 564 } 565 } 566 567 return -EIO; 568 } 569 570 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad) 571 { 572 write_lock_bh(&sk->sk_callback_lock); 573 sk->sk_state_change = ad->original_sk_state_change; 574 sk->sk_user_data = NULL; 575 write_unlock_bh(&sk->sk_callback_lock); 576 } 577 578 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad) 579 { 580 int timeo, connect_int, err = 0; 581 struct socket *s_estab = NULL; 582 struct net_conf *nc; 583 584 rcu_read_lock(); 585 nc = rcu_dereference(connection->net_conf); 586 if (!nc) { 587 rcu_read_unlock(); 588 return NULL; 589 } 590 connect_int = nc->connect_int; 591 rcu_read_unlock(); 592 593 timeo = connect_int * HZ; 594 /* 28.5% random jitter */ 595 timeo += get_random_u32_below(2) ? timeo / 7 : -timeo / 7; 596 597 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo); 598 if (err <= 0) 599 return NULL; 600 601 err = kernel_accept(ad->s_listen, &s_estab, 0); 602 if (err < 0) { 603 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 604 drbd_err(connection, "accept failed, err = %d\n", err); 605 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 606 } 607 } 608 609 if (s_estab) 610 unregister_state_change(s_estab->sk, ad); 611 612 return s_estab; 613 } 614 615 static int decode_header(struct drbd_connection *, void *, struct packet_info *); 616 617 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock, 618 enum drbd_packet cmd) 619 { 620 if (!conn_prepare_command(connection, sock)) 621 return -EIO; 622 return conn_send_command(connection, sock, cmd, 0, NULL, 0); 623 } 624 625 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock) 626 { 627 unsigned int header_size = drbd_header_size(connection); 628 struct packet_info pi; 629 struct net_conf *nc; 630 int err; 631 632 rcu_read_lock(); 633 nc = rcu_dereference(connection->net_conf); 634 if (!nc) { 635 rcu_read_unlock(); 636 return -EIO; 637 } 638 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10; 639 rcu_read_unlock(); 640 641 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0); 642 if (err != header_size) { 643 if (err >= 0) 644 err = -EIO; 645 return err; 646 } 647 err = decode_header(connection, connection->data.rbuf, &pi); 648 if (err) 649 return err; 650 return pi.cmd; 651 } 652 653 /** 654 * drbd_socket_okay() - Free the socket if its connection is not okay 655 * @sock: pointer to the pointer to the socket. 656 */ 657 static bool drbd_socket_okay(struct socket **sock) 658 { 659 int rr; 660 char tb[4]; 661 662 if (!*sock) 663 return false; 664 665 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK); 666 667 if (rr > 0 || rr == -EAGAIN) { 668 return true; 669 } else { 670 sock_release(*sock); 671 *sock = NULL; 672 return false; 673 } 674 } 675 676 static bool connection_established(struct drbd_connection *connection, 677 struct socket **sock1, 678 struct socket **sock2) 679 { 680 struct net_conf *nc; 681 int timeout; 682 bool ok; 683 684 if (!*sock1 || !*sock2) 685 return false; 686 687 rcu_read_lock(); 688 nc = rcu_dereference(connection->net_conf); 689 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10; 690 rcu_read_unlock(); 691 schedule_timeout_interruptible(timeout); 692 693 ok = drbd_socket_okay(sock1); 694 ok = drbd_socket_okay(sock2) && ok; 695 696 return ok; 697 } 698 699 /* Gets called if a connection is established, or if a new minor gets created 700 in a connection */ 701 int drbd_connected(struct drbd_peer_device *peer_device) 702 { 703 struct drbd_device *device = peer_device->device; 704 int err; 705 706 atomic_set(&device->packet_seq, 0); 707 device->peer_seq = 0; 708 709 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ? 710 &peer_device->connection->cstate_mutex : 711 &device->own_state_mutex; 712 713 err = drbd_send_sync_param(peer_device); 714 if (!err) 715 err = drbd_send_sizes(peer_device, 0, 0); 716 if (!err) 717 err = drbd_send_uuids(peer_device); 718 if (!err) 719 err = drbd_send_current_state(peer_device); 720 clear_bit(USE_DEGR_WFC_T, &device->flags); 721 clear_bit(RESIZE_PENDING, &device->flags); 722 atomic_set(&device->ap_in_flight, 0); 723 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */ 724 return err; 725 } 726 727 /* 728 * return values: 729 * 1 yes, we have a valid connection 730 * 0 oops, did not work out, please try again 731 * -1 peer talks different language, 732 * no point in trying again, please go standalone. 733 * -2 We do not have a network config... 734 */ 735 static int conn_connect(struct drbd_connection *connection) 736 { 737 struct drbd_socket sock, msock; 738 struct drbd_peer_device *peer_device; 739 struct net_conf *nc; 740 int vnr, timeout, h; 741 bool discard_my_data, ok; 742 enum drbd_state_rv rv; 743 struct accept_wait_data ad = { 744 .connection = connection, 745 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell), 746 }; 747 748 clear_bit(DISCONNECT_SENT, &connection->flags); 749 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS) 750 return -2; 751 752 mutex_init(&sock.mutex); 753 sock.sbuf = connection->data.sbuf; 754 sock.rbuf = connection->data.rbuf; 755 sock.socket = NULL; 756 mutex_init(&msock.mutex); 757 msock.sbuf = connection->meta.sbuf; 758 msock.rbuf = connection->meta.rbuf; 759 msock.socket = NULL; 760 761 /* Assume that the peer only understands protocol 80 until we know better. */ 762 connection->agreed_pro_version = 80; 763 764 if (prepare_listen_socket(connection, &ad)) 765 return 0; 766 767 do { 768 struct socket *s; 769 770 s = drbd_try_connect(connection); 771 if (s) { 772 if (!sock.socket) { 773 sock.socket = s; 774 send_first_packet(connection, &sock, P_INITIAL_DATA); 775 } else if (!msock.socket) { 776 clear_bit(RESOLVE_CONFLICTS, &connection->flags); 777 msock.socket = s; 778 send_first_packet(connection, &msock, P_INITIAL_META); 779 } else { 780 drbd_err(connection, "Logic error in conn_connect()\n"); 781 goto out_release_sockets; 782 } 783 } 784 785 if (connection_established(connection, &sock.socket, &msock.socket)) 786 break; 787 788 retry: 789 s = drbd_wait_for_connect(connection, &ad); 790 if (s) { 791 int fp = receive_first_packet(connection, s); 792 drbd_socket_okay(&sock.socket); 793 drbd_socket_okay(&msock.socket); 794 switch (fp) { 795 case P_INITIAL_DATA: 796 if (sock.socket) { 797 drbd_warn(connection, "initial packet S crossed\n"); 798 sock_release(sock.socket); 799 sock.socket = s; 800 goto randomize; 801 } 802 sock.socket = s; 803 break; 804 case P_INITIAL_META: 805 set_bit(RESOLVE_CONFLICTS, &connection->flags); 806 if (msock.socket) { 807 drbd_warn(connection, "initial packet M crossed\n"); 808 sock_release(msock.socket); 809 msock.socket = s; 810 goto randomize; 811 } 812 msock.socket = s; 813 break; 814 default: 815 drbd_warn(connection, "Error receiving initial packet\n"); 816 sock_release(s); 817 randomize: 818 if (get_random_u32_below(2)) 819 goto retry; 820 } 821 } 822 823 if (connection->cstate <= C_DISCONNECTING) 824 goto out_release_sockets; 825 if (signal_pending(current)) { 826 flush_signals(current); 827 smp_rmb(); 828 if (get_t_state(&connection->receiver) == EXITING) 829 goto out_release_sockets; 830 } 831 832 ok = connection_established(connection, &sock.socket, &msock.socket); 833 } while (!ok); 834 835 if (ad.s_listen) 836 sock_release(ad.s_listen); 837 838 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 839 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 840 841 sock.socket->sk->sk_allocation = GFP_NOIO; 842 msock.socket->sk->sk_allocation = GFP_NOIO; 843 844 sock.socket->sk->sk_use_task_frag = false; 845 msock.socket->sk->sk_use_task_frag = false; 846 847 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK; 848 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE; 849 850 /* NOT YET ... 851 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10; 852 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 853 * first set it to the P_CONNECTION_FEATURES timeout, 854 * which we set to 4x the configured ping_timeout. */ 855 rcu_read_lock(); 856 nc = rcu_dereference(connection->net_conf); 857 858 sock.socket->sk->sk_sndtimeo = 859 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10; 860 861 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ; 862 timeout = nc->timeout * HZ / 10; 863 discard_my_data = nc->discard_my_data; 864 rcu_read_unlock(); 865 866 msock.socket->sk->sk_sndtimeo = timeout; 867 868 /* we don't want delays. 869 * we use TCP_CORK where appropriate, though */ 870 tcp_sock_set_nodelay(sock.socket->sk); 871 tcp_sock_set_nodelay(msock.socket->sk); 872 873 connection->data.socket = sock.socket; 874 connection->meta.socket = msock.socket; 875 connection->last_received = jiffies; 876 877 h = drbd_do_features(connection); 878 if (h <= 0) 879 return h; 880 881 if (connection->cram_hmac_tfm) { 882 /* drbd_request_state(device, NS(conn, WFAuth)); */ 883 switch (drbd_do_auth(connection)) { 884 case -1: 885 drbd_err(connection, "Authentication of peer failed\n"); 886 return -1; 887 case 0: 888 drbd_err(connection, "Authentication of peer failed, trying again.\n"); 889 return 0; 890 } 891 } 892 893 connection->data.socket->sk->sk_sndtimeo = timeout; 894 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 895 896 if (drbd_send_protocol(connection) == -EOPNOTSUPP) 897 return -1; 898 899 /* Prevent a race between resync-handshake and 900 * being promoted to Primary. 901 * 902 * Grab and release the state mutex, so we know that any current 903 * drbd_set_role() is finished, and any incoming drbd_set_role 904 * will see the STATE_SENT flag, and wait for it to be cleared. 905 */ 906 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) 907 mutex_lock(peer_device->device->state_mutex); 908 909 /* avoid a race with conn_request_state( C_DISCONNECTING ) */ 910 spin_lock_irq(&connection->resource->req_lock); 911 set_bit(STATE_SENT, &connection->flags); 912 spin_unlock_irq(&connection->resource->req_lock); 913 914 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) 915 mutex_unlock(peer_device->device->state_mutex); 916 917 rcu_read_lock(); 918 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 919 struct drbd_device *device = peer_device->device; 920 kref_get(&device->kref); 921 rcu_read_unlock(); 922 923 if (discard_my_data) 924 set_bit(DISCARD_MY_DATA, &device->flags); 925 else 926 clear_bit(DISCARD_MY_DATA, &device->flags); 927 928 drbd_connected(peer_device); 929 kref_put(&device->kref, drbd_destroy_device); 930 rcu_read_lock(); 931 } 932 rcu_read_unlock(); 933 934 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE); 935 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) { 936 clear_bit(STATE_SENT, &connection->flags); 937 return 0; 938 } 939 940 drbd_thread_start(&connection->ack_receiver); 941 /* opencoded create_singlethread_workqueue(), 942 * to be able to use format string arguments */ 943 connection->ack_sender = 944 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name); 945 if (!connection->ack_sender) { 946 drbd_err(connection, "Failed to create workqueue ack_sender\n"); 947 return 0; 948 } 949 950 mutex_lock(&connection->resource->conf_update); 951 /* The discard_my_data flag is a single-shot modifier to the next 952 * connection attempt, the handshake of which is now well underway. 953 * No need for rcu style copying of the whole struct 954 * just to clear a single value. */ 955 connection->net_conf->discard_my_data = 0; 956 mutex_unlock(&connection->resource->conf_update); 957 958 return h; 959 960 out_release_sockets: 961 if (ad.s_listen) 962 sock_release(ad.s_listen); 963 if (sock.socket) 964 sock_release(sock.socket); 965 if (msock.socket) 966 sock_release(msock.socket); 967 return -1; 968 } 969 970 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi) 971 { 972 unsigned int header_size = drbd_header_size(connection); 973 974 if (header_size == sizeof(struct p_header100) && 975 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) { 976 struct p_header100 *h = header; 977 if (h->pad != 0) { 978 drbd_err(connection, "Header padding is not zero\n"); 979 return -EINVAL; 980 } 981 pi->vnr = be16_to_cpu(h->volume); 982 pi->cmd = be16_to_cpu(h->command); 983 pi->size = be32_to_cpu(h->length); 984 } else if (header_size == sizeof(struct p_header95) && 985 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) { 986 struct p_header95 *h = header; 987 pi->cmd = be16_to_cpu(h->command); 988 pi->size = be32_to_cpu(h->length); 989 pi->vnr = 0; 990 } else if (header_size == sizeof(struct p_header80) && 991 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) { 992 struct p_header80 *h = header; 993 pi->cmd = be16_to_cpu(h->command); 994 pi->size = be16_to_cpu(h->length); 995 pi->vnr = 0; 996 } else { 997 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n", 998 be32_to_cpu(*(__be32 *)header), 999 connection->agreed_pro_version); 1000 return -EINVAL; 1001 } 1002 pi->data = header + header_size; 1003 return 0; 1004 } 1005 1006 static void drbd_unplug_all_devices(struct drbd_connection *connection) 1007 { 1008 if (current->plug == &connection->receiver_plug) { 1009 blk_finish_plug(&connection->receiver_plug); 1010 blk_start_plug(&connection->receiver_plug); 1011 } /* else: maybe just schedule() ?? */ 1012 } 1013 1014 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi) 1015 { 1016 void *buffer = connection->data.rbuf; 1017 int err; 1018 1019 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection)); 1020 if (err) 1021 return err; 1022 1023 err = decode_header(connection, buffer, pi); 1024 connection->last_received = jiffies; 1025 1026 return err; 1027 } 1028 1029 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi) 1030 { 1031 void *buffer = connection->data.rbuf; 1032 unsigned int size = drbd_header_size(connection); 1033 int err; 1034 1035 err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT); 1036 if (err != size) { 1037 /* If we have nothing in the receive buffer now, to reduce 1038 * application latency, try to drain the backend queues as 1039 * quickly as possible, and let remote TCP know what we have 1040 * received so far. */ 1041 if (err == -EAGAIN) { 1042 tcp_sock_set_quickack(connection->data.socket->sk, 2); 1043 drbd_unplug_all_devices(connection); 1044 } 1045 if (err > 0) { 1046 buffer += err; 1047 size -= err; 1048 } 1049 err = drbd_recv_all_warn(connection, buffer, size); 1050 if (err) 1051 return err; 1052 } 1053 1054 err = decode_header(connection, connection->data.rbuf, pi); 1055 connection->last_received = jiffies; 1056 1057 return err; 1058 } 1059 /* This is blkdev_issue_flush, but asynchronous. 1060 * We want to submit to all component volumes in parallel, 1061 * then wait for all completions. 1062 */ 1063 struct issue_flush_context { 1064 atomic_t pending; 1065 int error; 1066 struct completion done; 1067 }; 1068 struct one_flush_context { 1069 struct drbd_device *device; 1070 struct issue_flush_context *ctx; 1071 }; 1072 1073 static void one_flush_endio(struct bio *bio) 1074 { 1075 struct one_flush_context *octx = bio->bi_private; 1076 struct drbd_device *device = octx->device; 1077 struct issue_flush_context *ctx = octx->ctx; 1078 1079 if (bio->bi_status) { 1080 ctx->error = blk_status_to_errno(bio->bi_status); 1081 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status); 1082 } 1083 kfree(octx); 1084 bio_put(bio); 1085 1086 clear_bit(FLUSH_PENDING, &device->flags); 1087 put_ldev(device); 1088 kref_put(&device->kref, drbd_destroy_device); 1089 1090 if (atomic_dec_and_test(&ctx->pending)) 1091 complete(&ctx->done); 1092 } 1093 1094 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx) 1095 { 1096 struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0, 1097 REQ_OP_WRITE | REQ_PREFLUSH, GFP_NOIO); 1098 struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO); 1099 1100 if (!octx) { 1101 drbd_warn(device, "Could not allocate a octx, CANNOT ISSUE FLUSH\n"); 1102 /* FIXME: what else can I do now? disconnecting or detaching 1103 * really does not help to improve the state of the world, either. 1104 */ 1105 bio_put(bio); 1106 1107 ctx->error = -ENOMEM; 1108 put_ldev(device); 1109 kref_put(&device->kref, drbd_destroy_device); 1110 return; 1111 } 1112 1113 octx->device = device; 1114 octx->ctx = ctx; 1115 bio->bi_private = octx; 1116 bio->bi_end_io = one_flush_endio; 1117 1118 device->flush_jif = jiffies; 1119 set_bit(FLUSH_PENDING, &device->flags); 1120 atomic_inc(&ctx->pending); 1121 submit_bio(bio); 1122 } 1123 1124 static void drbd_flush(struct drbd_connection *connection) 1125 { 1126 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) { 1127 struct drbd_peer_device *peer_device; 1128 struct issue_flush_context ctx; 1129 int vnr; 1130 1131 atomic_set(&ctx.pending, 1); 1132 ctx.error = 0; 1133 init_completion(&ctx.done); 1134 1135 rcu_read_lock(); 1136 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1137 struct drbd_device *device = peer_device->device; 1138 1139 if (!get_ldev(device)) 1140 continue; 1141 kref_get(&device->kref); 1142 rcu_read_unlock(); 1143 1144 submit_one_flush(device, &ctx); 1145 1146 rcu_read_lock(); 1147 } 1148 rcu_read_unlock(); 1149 1150 /* Do we want to add a timeout, 1151 * if disk-timeout is set? */ 1152 if (!atomic_dec_and_test(&ctx.pending)) 1153 wait_for_completion(&ctx.done); 1154 1155 if (ctx.error) { 1156 /* would rather check on EOPNOTSUPP, but that is not reliable. 1157 * don't try again for ANY return value != 0 1158 * if (rv == -EOPNOTSUPP) */ 1159 /* Any error is already reported by bio_endio callback. */ 1160 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO); 1161 } 1162 } 1163 } 1164 1165 /** 1166 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it. 1167 * @connection: DRBD connection. 1168 * @epoch: Epoch object. 1169 * @ev: Epoch event. 1170 */ 1171 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection, 1172 struct drbd_epoch *epoch, 1173 enum epoch_event ev) 1174 { 1175 int epoch_size; 1176 struct drbd_epoch *next_epoch; 1177 enum finish_epoch rv = FE_STILL_LIVE; 1178 1179 spin_lock(&connection->epoch_lock); 1180 do { 1181 next_epoch = NULL; 1182 1183 epoch_size = atomic_read(&epoch->epoch_size); 1184 1185 switch (ev & ~EV_CLEANUP) { 1186 case EV_PUT: 1187 atomic_dec(&epoch->active); 1188 break; 1189 case EV_GOT_BARRIER_NR: 1190 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags); 1191 break; 1192 case EV_BECAME_LAST: 1193 /* nothing to do*/ 1194 break; 1195 } 1196 1197 if (epoch_size != 0 && 1198 atomic_read(&epoch->active) == 0 && 1199 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) { 1200 if (!(ev & EV_CLEANUP)) { 1201 spin_unlock(&connection->epoch_lock); 1202 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size); 1203 spin_lock(&connection->epoch_lock); 1204 } 1205 #if 0 1206 /* FIXME: dec unacked on connection, once we have 1207 * something to count pending connection packets in. */ 1208 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) 1209 dec_unacked(epoch->connection); 1210 #endif 1211 1212 if (connection->current_epoch != epoch) { 1213 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list); 1214 list_del(&epoch->list); 1215 ev = EV_BECAME_LAST | (ev & EV_CLEANUP); 1216 connection->epochs--; 1217 kfree(epoch); 1218 1219 if (rv == FE_STILL_LIVE) 1220 rv = FE_DESTROYED; 1221 } else { 1222 epoch->flags = 0; 1223 atomic_set(&epoch->epoch_size, 0); 1224 /* atomic_set(&epoch->active, 0); is already zero */ 1225 if (rv == FE_STILL_LIVE) 1226 rv = FE_RECYCLED; 1227 } 1228 } 1229 1230 if (!next_epoch) 1231 break; 1232 1233 epoch = next_epoch; 1234 } while (1); 1235 1236 spin_unlock(&connection->epoch_lock); 1237 1238 return rv; 1239 } 1240 1241 static enum write_ordering_e 1242 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo) 1243 { 1244 struct disk_conf *dc; 1245 1246 dc = rcu_dereference(bdev->disk_conf); 1247 1248 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes) 1249 wo = WO_DRAIN_IO; 1250 if (wo == WO_DRAIN_IO && !dc->disk_drain) 1251 wo = WO_NONE; 1252 1253 return wo; 1254 } 1255 1256 /* 1257 * drbd_bump_write_ordering() - Fall back to an other write ordering method 1258 * @wo: Write ordering method to try. 1259 */ 1260 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev, 1261 enum write_ordering_e wo) 1262 { 1263 struct drbd_device *device; 1264 enum write_ordering_e pwo; 1265 int vnr; 1266 static char *write_ordering_str[] = { 1267 [WO_NONE] = "none", 1268 [WO_DRAIN_IO] = "drain", 1269 [WO_BDEV_FLUSH] = "flush", 1270 }; 1271 1272 pwo = resource->write_ordering; 1273 if (wo != WO_BDEV_FLUSH) 1274 wo = min(pwo, wo); 1275 rcu_read_lock(); 1276 idr_for_each_entry(&resource->devices, device, vnr) { 1277 if (get_ldev(device)) { 1278 wo = max_allowed_wo(device->ldev, wo); 1279 if (device->ldev == bdev) 1280 bdev = NULL; 1281 put_ldev(device); 1282 } 1283 } 1284 1285 if (bdev) 1286 wo = max_allowed_wo(bdev, wo); 1287 1288 rcu_read_unlock(); 1289 1290 resource->write_ordering = wo; 1291 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH) 1292 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]); 1293 } 1294 1295 /* 1296 * Mapping "discard" to ZEROOUT with UNMAP does not work for us: 1297 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it 1298 * will directly go to fallback mode, submitting normal writes, and 1299 * never even try to UNMAP. 1300 * 1301 * And dm-thin does not do this (yet), mostly because in general it has 1302 * to assume that "skip_block_zeroing" is set. See also: 1303 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html 1304 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html 1305 * 1306 * We *may* ignore the discard-zeroes-data setting, if so configured. 1307 * 1308 * Assumption is that this "discard_zeroes_data=0" is only because the backend 1309 * may ignore partial unaligned discards. 1310 * 1311 * LVM/DM thin as of at least 1312 * LVM version: 2.02.115(2)-RHEL7 (2015-01-28) 1313 * Library version: 1.02.93-RHEL7 (2015-01-28) 1314 * Driver version: 4.29.0 1315 * still behaves this way. 1316 * 1317 * For unaligned (wrt. alignment and granularity) or too small discards, 1318 * we zero-out the initial (and/or) trailing unaligned partial chunks, 1319 * but discard all the aligned full chunks. 1320 * 1321 * At least for LVM/DM thin, with skip_block_zeroing=false, 1322 * the result is effectively "discard_zeroes_data=1". 1323 */ 1324 /* flags: EE_TRIM|EE_ZEROOUT */ 1325 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags) 1326 { 1327 struct block_device *bdev = device->ldev->backing_bdev; 1328 sector_t tmp, nr; 1329 unsigned int max_discard_sectors, granularity; 1330 int alignment; 1331 int err = 0; 1332 1333 if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM)) 1334 goto zero_out; 1335 1336 /* Zero-sector (unknown) and one-sector granularities are the same. */ 1337 granularity = max(bdev_discard_granularity(bdev) >> 9, 1U); 1338 alignment = (bdev_discard_alignment(bdev) >> 9) % granularity; 1339 1340 max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22)); 1341 max_discard_sectors -= max_discard_sectors % granularity; 1342 if (unlikely(!max_discard_sectors)) 1343 goto zero_out; 1344 1345 if (nr_sectors < granularity) 1346 goto zero_out; 1347 1348 tmp = start; 1349 if (sector_div(tmp, granularity) != alignment) { 1350 if (nr_sectors < 2*granularity) 1351 goto zero_out; 1352 /* start + gran - (start + gran - align) % gran */ 1353 tmp = start + granularity - alignment; 1354 tmp = start + granularity - sector_div(tmp, granularity); 1355 1356 nr = tmp - start; 1357 /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many 1358 * layers are below us, some may have smaller granularity */ 1359 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0); 1360 nr_sectors -= nr; 1361 start = tmp; 1362 } 1363 while (nr_sectors >= max_discard_sectors) { 1364 err |= blkdev_issue_discard(bdev, start, max_discard_sectors, 1365 GFP_NOIO); 1366 nr_sectors -= max_discard_sectors; 1367 start += max_discard_sectors; 1368 } 1369 if (nr_sectors) { 1370 /* max_discard_sectors is unsigned int (and a multiple of 1371 * granularity, we made sure of that above already); 1372 * nr is < max_discard_sectors; 1373 * I don't need sector_div here, even though nr is sector_t */ 1374 nr = nr_sectors; 1375 nr -= (unsigned int)nr % granularity; 1376 if (nr) { 1377 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO); 1378 nr_sectors -= nr; 1379 start += nr; 1380 } 1381 } 1382 zero_out: 1383 if (nr_sectors) { 1384 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 1385 (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP); 1386 } 1387 return err != 0; 1388 } 1389 1390 static bool can_do_reliable_discards(struct drbd_device *device) 1391 { 1392 struct disk_conf *dc; 1393 bool can_do; 1394 1395 if (!bdev_max_discard_sectors(device->ldev->backing_bdev)) 1396 return false; 1397 1398 rcu_read_lock(); 1399 dc = rcu_dereference(device->ldev->disk_conf); 1400 can_do = dc->discard_zeroes_if_aligned; 1401 rcu_read_unlock(); 1402 return can_do; 1403 } 1404 1405 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req) 1406 { 1407 /* If the backend cannot discard, or does not guarantee 1408 * read-back zeroes in discarded ranges, we fall back to 1409 * zero-out. Unless configuration specifically requested 1410 * otherwise. */ 1411 if (!can_do_reliable_discards(device)) 1412 peer_req->flags |= EE_ZEROOUT; 1413 1414 if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector, 1415 peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM))) 1416 peer_req->flags |= EE_WAS_ERROR; 1417 drbd_endio_write_sec_final(peer_req); 1418 } 1419 1420 static int peer_request_fault_type(struct drbd_peer_request *peer_req) 1421 { 1422 if (peer_req_op(peer_req) == REQ_OP_READ) { 1423 return peer_req->flags & EE_APPLICATION ? 1424 DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD; 1425 } else { 1426 return peer_req->flags & EE_APPLICATION ? 1427 DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR; 1428 } 1429 } 1430 1431 /** 1432 * drbd_submit_peer_request() 1433 * @peer_req: peer request 1434 * 1435 * May spread the pages to multiple bios, 1436 * depending on bio_add_page restrictions. 1437 * 1438 * Returns 0 if all bios have been submitted, 1439 * -ENOMEM if we could not allocate enough bios, 1440 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a 1441 * single page to an empty bio (which should never happen and likely indicates 1442 * that the lower level IO stack is in some way broken). This has been observed 1443 * on certain Xen deployments. 1444 */ 1445 /* TODO allocate from our own bio_set. */ 1446 int drbd_submit_peer_request(struct drbd_peer_request *peer_req) 1447 { 1448 struct drbd_device *device = peer_req->peer_device->device; 1449 struct bio *bios = NULL; 1450 struct bio *bio; 1451 struct page *page = peer_req->pages; 1452 sector_t sector = peer_req->i.sector; 1453 unsigned int data_size = peer_req->i.size; 1454 unsigned int n_bios = 0; 1455 unsigned int nr_pages = PFN_UP(data_size); 1456 1457 /* TRIM/DISCARD: for now, always use the helper function 1458 * blkdev_issue_zeroout(..., discard=true). 1459 * It's synchronous, but it does the right thing wrt. bio splitting. 1460 * Correctness first, performance later. Next step is to code an 1461 * asynchronous variant of the same. 1462 */ 1463 if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) { 1464 /* wait for all pending IO completions, before we start 1465 * zeroing things out. */ 1466 conn_wait_active_ee_empty(peer_req->peer_device->connection); 1467 /* add it to the active list now, 1468 * so we can find it to present it in debugfs */ 1469 peer_req->submit_jif = jiffies; 1470 peer_req->flags |= EE_SUBMITTED; 1471 1472 /* If this was a resync request from receive_rs_deallocated(), 1473 * it is already on the sync_ee list */ 1474 if (list_empty(&peer_req->w.list)) { 1475 spin_lock_irq(&device->resource->req_lock); 1476 list_add_tail(&peer_req->w.list, &device->active_ee); 1477 spin_unlock_irq(&device->resource->req_lock); 1478 } 1479 1480 drbd_issue_peer_discard_or_zero_out(device, peer_req); 1481 return 0; 1482 } 1483 1484 /* In most cases, we will only need one bio. But in case the lower 1485 * level restrictions happen to be different at this offset on this 1486 * side than those of the sending peer, we may need to submit the 1487 * request in more than one bio. 1488 * 1489 * Plain bio_alloc is good enough here, this is no DRBD internally 1490 * generated bio, but a bio allocated on behalf of the peer. 1491 */ 1492 next_bio: 1493 /* _DISCARD, _WRITE_ZEROES handled above. 1494 * REQ_OP_FLUSH (empty flush) not expected, 1495 * should have been mapped to a "drbd protocol barrier". 1496 * REQ_OP_SECURE_ERASE: I don't see how we could ever support that. 1497 */ 1498 if (!(peer_req_op(peer_req) == REQ_OP_WRITE || 1499 peer_req_op(peer_req) == REQ_OP_READ)) { 1500 drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf); 1501 return -EINVAL; 1502 } 1503 1504 bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO); 1505 /* > peer_req->i.sector, unless this is the first bio */ 1506 bio->bi_iter.bi_sector = sector; 1507 bio->bi_private = peer_req; 1508 bio->bi_end_io = drbd_peer_request_endio; 1509 1510 bio->bi_next = bios; 1511 bios = bio; 1512 ++n_bios; 1513 1514 page_chain_for_each(page) { 1515 unsigned len = min_t(unsigned, data_size, PAGE_SIZE); 1516 if (!bio_add_page(bio, page, len, 0)) 1517 goto next_bio; 1518 data_size -= len; 1519 sector += len >> 9; 1520 --nr_pages; 1521 } 1522 D_ASSERT(device, data_size == 0); 1523 D_ASSERT(device, page == NULL); 1524 1525 atomic_set(&peer_req->pending_bios, n_bios); 1526 /* for debugfs: update timestamp, mark as submitted */ 1527 peer_req->submit_jif = jiffies; 1528 peer_req->flags |= EE_SUBMITTED; 1529 do { 1530 bio = bios; 1531 bios = bios->bi_next; 1532 bio->bi_next = NULL; 1533 1534 drbd_submit_bio_noacct(device, peer_request_fault_type(peer_req), bio); 1535 } while (bios); 1536 return 0; 1537 } 1538 1539 static void drbd_remove_epoch_entry_interval(struct drbd_device *device, 1540 struct drbd_peer_request *peer_req) 1541 { 1542 struct drbd_interval *i = &peer_req->i; 1543 1544 drbd_remove_interval(&device->write_requests, i); 1545 drbd_clear_interval(i); 1546 1547 /* Wake up any processes waiting for this peer request to complete. */ 1548 if (i->waiting) 1549 wake_up(&device->misc_wait); 1550 } 1551 1552 static void conn_wait_active_ee_empty(struct drbd_connection *connection) 1553 { 1554 struct drbd_peer_device *peer_device; 1555 int vnr; 1556 1557 rcu_read_lock(); 1558 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1559 struct drbd_device *device = peer_device->device; 1560 1561 kref_get(&device->kref); 1562 rcu_read_unlock(); 1563 drbd_wait_ee_list_empty(device, &device->active_ee); 1564 kref_put(&device->kref, drbd_destroy_device); 1565 rcu_read_lock(); 1566 } 1567 rcu_read_unlock(); 1568 } 1569 1570 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi) 1571 { 1572 int rv; 1573 struct p_barrier *p = pi->data; 1574 struct drbd_epoch *epoch; 1575 1576 /* FIXME these are unacked on connection, 1577 * not a specific (peer)device. 1578 */ 1579 connection->current_epoch->barrier_nr = p->barrier; 1580 connection->current_epoch->connection = connection; 1581 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR); 1582 1583 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from 1584 * the activity log, which means it would not be resynced in case the 1585 * R_PRIMARY crashes now. 1586 * Therefore we must send the barrier_ack after the barrier request was 1587 * completed. */ 1588 switch (connection->resource->write_ordering) { 1589 case WO_NONE: 1590 if (rv == FE_RECYCLED) 1591 return 0; 1592 1593 /* receiver context, in the writeout path of the other node. 1594 * avoid potential distributed deadlock */ 1595 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); 1596 if (epoch) 1597 break; 1598 else 1599 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n"); 1600 fallthrough; 1601 1602 case WO_BDEV_FLUSH: 1603 case WO_DRAIN_IO: 1604 conn_wait_active_ee_empty(connection); 1605 drbd_flush(connection); 1606 1607 if (atomic_read(&connection->current_epoch->epoch_size)) { 1608 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO); 1609 if (epoch) 1610 break; 1611 } 1612 1613 return 0; 1614 default: 1615 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", 1616 connection->resource->write_ordering); 1617 return -EIO; 1618 } 1619 1620 epoch->flags = 0; 1621 atomic_set(&epoch->epoch_size, 0); 1622 atomic_set(&epoch->active, 0); 1623 1624 spin_lock(&connection->epoch_lock); 1625 if (atomic_read(&connection->current_epoch->epoch_size)) { 1626 list_add(&epoch->list, &connection->current_epoch->list); 1627 connection->current_epoch = epoch; 1628 connection->epochs++; 1629 } else { 1630 /* The current_epoch got recycled while we allocated this one... */ 1631 kfree(epoch); 1632 } 1633 spin_unlock(&connection->epoch_lock); 1634 1635 return 0; 1636 } 1637 1638 /* quick wrapper in case payload size != request_size (write same) */ 1639 static void drbd_csum_ee_size(struct crypto_shash *h, 1640 struct drbd_peer_request *r, void *d, 1641 unsigned int payload_size) 1642 { 1643 unsigned int tmp = r->i.size; 1644 r->i.size = payload_size; 1645 drbd_csum_ee(h, r, d); 1646 r->i.size = tmp; 1647 } 1648 1649 /* used from receive_RSDataReply (recv_resync_read) 1650 * and from receive_Data. 1651 * data_size: actual payload ("data in") 1652 * for normal writes that is bi_size. 1653 * for discards, that is zero. 1654 * for write same, it is logical_block_size. 1655 * both trim and write same have the bi_size ("data len to be affected") 1656 * as extra argument in the packet header. 1657 */ 1658 static struct drbd_peer_request * 1659 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 1660 struct packet_info *pi) __must_hold(local) 1661 { 1662 struct drbd_device *device = peer_device->device; 1663 const sector_t capacity = get_capacity(device->vdisk); 1664 struct drbd_peer_request *peer_req; 1665 struct page *page; 1666 int digest_size, err; 1667 unsigned int data_size = pi->size, ds; 1668 void *dig_in = peer_device->connection->int_dig_in; 1669 void *dig_vv = peer_device->connection->int_dig_vv; 1670 unsigned long *data; 1671 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL; 1672 struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL; 1673 1674 digest_size = 0; 1675 if (!trim && peer_device->connection->peer_integrity_tfm) { 1676 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm); 1677 /* 1678 * FIXME: Receive the incoming digest into the receive buffer 1679 * here, together with its struct p_data? 1680 */ 1681 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); 1682 if (err) 1683 return NULL; 1684 data_size -= digest_size; 1685 } 1686 1687 /* assume request_size == data_size, but special case trim. */ 1688 ds = data_size; 1689 if (trim) { 1690 if (!expect(peer_device, data_size == 0)) 1691 return NULL; 1692 ds = be32_to_cpu(trim->size); 1693 } else if (zeroes) { 1694 if (!expect(peer_device, data_size == 0)) 1695 return NULL; 1696 ds = be32_to_cpu(zeroes->size); 1697 } 1698 1699 if (!expect(peer_device, IS_ALIGNED(ds, 512))) 1700 return NULL; 1701 if (trim || zeroes) { 1702 if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9))) 1703 return NULL; 1704 } else if (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE)) 1705 return NULL; 1706 1707 /* even though we trust out peer, 1708 * we sometimes have to double check. */ 1709 if (sector + (ds>>9) > capacity) { 1710 drbd_err(device, "request from peer beyond end of local disk: " 1711 "capacity: %llus < sector: %llus + size: %u\n", 1712 (unsigned long long)capacity, 1713 (unsigned long long)sector, ds); 1714 return NULL; 1715 } 1716 1717 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 1718 * "criss-cross" setup, that might cause write-out on some other DRBD, 1719 * which in turn might block on the other node at this very place. */ 1720 peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO); 1721 if (!peer_req) 1722 return NULL; 1723 1724 peer_req->flags |= EE_WRITE; 1725 if (trim) { 1726 peer_req->flags |= EE_TRIM; 1727 return peer_req; 1728 } 1729 if (zeroes) { 1730 peer_req->flags |= EE_ZEROOUT; 1731 return peer_req; 1732 } 1733 1734 /* receive payload size bytes into page chain */ 1735 ds = data_size; 1736 page = peer_req->pages; 1737 page_chain_for_each(page) { 1738 unsigned len = min_t(int, ds, PAGE_SIZE); 1739 data = kmap(page); 1740 err = drbd_recv_all_warn(peer_device->connection, data, len); 1741 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) { 1742 drbd_err(device, "Fault injection: Corrupting data on receive\n"); 1743 data[0] = data[0] ^ (unsigned long)-1; 1744 } 1745 kunmap(page); 1746 if (err) { 1747 drbd_free_peer_req(device, peer_req); 1748 return NULL; 1749 } 1750 ds -= len; 1751 } 1752 1753 if (digest_size) { 1754 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size); 1755 if (memcmp(dig_in, dig_vv, digest_size)) { 1756 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n", 1757 (unsigned long long)sector, data_size); 1758 drbd_free_peer_req(device, peer_req); 1759 return NULL; 1760 } 1761 } 1762 device->recv_cnt += data_size >> 9; 1763 return peer_req; 1764 } 1765 1766 /* drbd_drain_block() just takes a data block 1767 * out of the socket input buffer, and discards it. 1768 */ 1769 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size) 1770 { 1771 struct page *page; 1772 int err = 0; 1773 void *data; 1774 1775 if (!data_size) 1776 return 0; 1777 1778 page = drbd_alloc_pages(peer_device, 1, 1); 1779 1780 data = kmap(page); 1781 while (data_size) { 1782 unsigned int len = min_t(int, data_size, PAGE_SIZE); 1783 1784 err = drbd_recv_all_warn(peer_device->connection, data, len); 1785 if (err) 1786 break; 1787 data_size -= len; 1788 } 1789 kunmap(page); 1790 drbd_free_pages(peer_device->device, page); 1791 return err; 1792 } 1793 1794 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req, 1795 sector_t sector, int data_size) 1796 { 1797 struct bio_vec bvec; 1798 struct bvec_iter iter; 1799 struct bio *bio; 1800 int digest_size, err, expect; 1801 void *dig_in = peer_device->connection->int_dig_in; 1802 void *dig_vv = peer_device->connection->int_dig_vv; 1803 1804 digest_size = 0; 1805 if (peer_device->connection->peer_integrity_tfm) { 1806 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm); 1807 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); 1808 if (err) 1809 return err; 1810 data_size -= digest_size; 1811 } 1812 1813 /* optimistically update recv_cnt. if receiving fails below, 1814 * we disconnect anyways, and counters will be reset. */ 1815 peer_device->device->recv_cnt += data_size>>9; 1816 1817 bio = req->master_bio; 1818 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector); 1819 1820 bio_for_each_segment(bvec, bio, iter) { 1821 void *mapped = bvec_kmap_local(&bvec); 1822 expect = min_t(int, data_size, bvec.bv_len); 1823 err = drbd_recv_all_warn(peer_device->connection, mapped, expect); 1824 kunmap_local(mapped); 1825 if (err) 1826 return err; 1827 data_size -= expect; 1828 } 1829 1830 if (digest_size) { 1831 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv); 1832 if (memcmp(dig_in, dig_vv, digest_size)) { 1833 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n"); 1834 return -EINVAL; 1835 } 1836 } 1837 1838 D_ASSERT(peer_device->device, data_size == 0); 1839 return 0; 1840 } 1841 1842 /* 1843 * e_end_resync_block() is called in ack_sender context via 1844 * drbd_finish_peer_reqs(). 1845 */ 1846 static int e_end_resync_block(struct drbd_work *w, int unused) 1847 { 1848 struct drbd_peer_request *peer_req = 1849 container_of(w, struct drbd_peer_request, w); 1850 struct drbd_peer_device *peer_device = peer_req->peer_device; 1851 struct drbd_device *device = peer_device->device; 1852 sector_t sector = peer_req->i.sector; 1853 int err; 1854 1855 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1856 1857 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1858 drbd_set_in_sync(peer_device, sector, peer_req->i.size); 1859 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req); 1860 } else { 1861 /* Record failure to sync */ 1862 drbd_rs_failed_io(peer_device, sector, peer_req->i.size); 1863 1864 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 1865 } 1866 dec_unacked(device); 1867 1868 return err; 1869 } 1870 1871 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector, 1872 struct packet_info *pi) __releases(local) 1873 { 1874 struct drbd_device *device = peer_device->device; 1875 struct drbd_peer_request *peer_req; 1876 1877 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi); 1878 if (!peer_req) 1879 goto fail; 1880 1881 dec_rs_pending(peer_device); 1882 1883 inc_unacked(device); 1884 /* corresponding dec_unacked() in e_end_resync_block() 1885 * respective _drbd_clear_done_ee */ 1886 1887 peer_req->w.cb = e_end_resync_block; 1888 peer_req->opf = REQ_OP_WRITE; 1889 peer_req->submit_jif = jiffies; 1890 1891 spin_lock_irq(&device->resource->req_lock); 1892 list_add_tail(&peer_req->w.list, &device->sync_ee); 1893 spin_unlock_irq(&device->resource->req_lock); 1894 1895 atomic_add(pi->size >> 9, &device->rs_sect_ev); 1896 if (drbd_submit_peer_request(peer_req) == 0) 1897 return 0; 1898 1899 /* don't care for the reason here */ 1900 drbd_err(device, "submit failed, triggering re-connect\n"); 1901 spin_lock_irq(&device->resource->req_lock); 1902 list_del(&peer_req->w.list); 1903 spin_unlock_irq(&device->resource->req_lock); 1904 1905 drbd_free_peer_req(device, peer_req); 1906 fail: 1907 put_ldev(device); 1908 return -EIO; 1909 } 1910 1911 static struct drbd_request * 1912 find_request(struct drbd_device *device, struct rb_root *root, u64 id, 1913 sector_t sector, bool missing_ok, const char *func) 1914 { 1915 struct drbd_request *req; 1916 1917 /* Request object according to our peer */ 1918 req = (struct drbd_request *)(unsigned long)id; 1919 if (drbd_contains_interval(root, sector, &req->i) && req->i.local) 1920 return req; 1921 if (!missing_ok) { 1922 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func, 1923 (unsigned long)id, (unsigned long long)sector); 1924 } 1925 return NULL; 1926 } 1927 1928 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi) 1929 { 1930 struct drbd_peer_device *peer_device; 1931 struct drbd_device *device; 1932 struct drbd_request *req; 1933 sector_t sector; 1934 int err; 1935 struct p_data *p = pi->data; 1936 1937 peer_device = conn_peer_device(connection, pi->vnr); 1938 if (!peer_device) 1939 return -EIO; 1940 device = peer_device->device; 1941 1942 sector = be64_to_cpu(p->sector); 1943 1944 spin_lock_irq(&device->resource->req_lock); 1945 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__); 1946 spin_unlock_irq(&device->resource->req_lock); 1947 if (unlikely(!req)) 1948 return -EIO; 1949 1950 err = recv_dless_read(peer_device, req, sector, pi->size); 1951 if (!err) 1952 req_mod(req, DATA_RECEIVED, peer_device); 1953 /* else: nothing. handled from drbd_disconnect... 1954 * I don't think we may complete this just yet 1955 * in case we are "on-disconnect: freeze" */ 1956 1957 return err; 1958 } 1959 1960 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi) 1961 { 1962 struct drbd_peer_device *peer_device; 1963 struct drbd_device *device; 1964 sector_t sector; 1965 int err; 1966 struct p_data *p = pi->data; 1967 1968 peer_device = conn_peer_device(connection, pi->vnr); 1969 if (!peer_device) 1970 return -EIO; 1971 device = peer_device->device; 1972 1973 sector = be64_to_cpu(p->sector); 1974 D_ASSERT(device, p->block_id == ID_SYNCER); 1975 1976 if (get_ldev(device)) { 1977 /* data is submitted to disk within recv_resync_read. 1978 * corresponding put_ldev done below on error, 1979 * or in drbd_peer_request_endio. */ 1980 err = recv_resync_read(peer_device, sector, pi); 1981 } else { 1982 if (drbd_ratelimit()) 1983 drbd_err(device, "Can not write resync data to local disk.\n"); 1984 1985 err = drbd_drain_block(peer_device, pi->size); 1986 1987 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 1988 } 1989 1990 atomic_add(pi->size >> 9, &device->rs_sect_in); 1991 1992 return err; 1993 } 1994 1995 static void restart_conflicting_writes(struct drbd_device *device, 1996 sector_t sector, int size) 1997 { 1998 struct drbd_interval *i; 1999 struct drbd_request *req; 2000 2001 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2002 if (!i->local) 2003 continue; 2004 req = container_of(i, struct drbd_request, i); 2005 if (req->rq_state & RQ_LOCAL_PENDING || 2006 !(req->rq_state & RQ_POSTPONED)) 2007 continue; 2008 /* as it is RQ_POSTPONED, this will cause it to 2009 * be queued on the retry workqueue. */ 2010 __req_mod(req, CONFLICT_RESOLVED, NULL, NULL); 2011 } 2012 } 2013 2014 /* 2015 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs(). 2016 */ 2017 static int e_end_block(struct drbd_work *w, int cancel) 2018 { 2019 struct drbd_peer_request *peer_req = 2020 container_of(w, struct drbd_peer_request, w); 2021 struct drbd_peer_device *peer_device = peer_req->peer_device; 2022 struct drbd_device *device = peer_device->device; 2023 sector_t sector = peer_req->i.sector; 2024 int err = 0, pcmd; 2025 2026 if (peer_req->flags & EE_SEND_WRITE_ACK) { 2027 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 2028 pcmd = (device->state.conn >= C_SYNC_SOURCE && 2029 device->state.conn <= C_PAUSED_SYNC_T && 2030 peer_req->flags & EE_MAY_SET_IN_SYNC) ? 2031 P_RS_WRITE_ACK : P_WRITE_ACK; 2032 err = drbd_send_ack(peer_device, pcmd, peer_req); 2033 if (pcmd == P_RS_WRITE_ACK) 2034 drbd_set_in_sync(peer_device, sector, peer_req->i.size); 2035 } else { 2036 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 2037 /* we expect it to be marked out of sync anyways... 2038 * maybe assert this? */ 2039 } 2040 dec_unacked(device); 2041 } 2042 2043 /* we delete from the conflict detection hash _after_ we sent out the 2044 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */ 2045 if (peer_req->flags & EE_IN_INTERVAL_TREE) { 2046 spin_lock_irq(&device->resource->req_lock); 2047 D_ASSERT(device, !drbd_interval_empty(&peer_req->i)); 2048 drbd_remove_epoch_entry_interval(device, peer_req); 2049 if (peer_req->flags & EE_RESTART_REQUESTS) 2050 restart_conflicting_writes(device, sector, peer_req->i.size); 2051 spin_unlock_irq(&device->resource->req_lock); 2052 } else 2053 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 2054 2055 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0)); 2056 2057 return err; 2058 } 2059 2060 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack) 2061 { 2062 struct drbd_peer_request *peer_req = 2063 container_of(w, struct drbd_peer_request, w); 2064 struct drbd_peer_device *peer_device = peer_req->peer_device; 2065 int err; 2066 2067 err = drbd_send_ack(peer_device, ack, peer_req); 2068 dec_unacked(peer_device->device); 2069 2070 return err; 2071 } 2072 2073 static int e_send_superseded(struct drbd_work *w, int unused) 2074 { 2075 return e_send_ack(w, P_SUPERSEDED); 2076 } 2077 2078 static int e_send_retry_write(struct drbd_work *w, int unused) 2079 { 2080 struct drbd_peer_request *peer_req = 2081 container_of(w, struct drbd_peer_request, w); 2082 struct drbd_connection *connection = peer_req->peer_device->connection; 2083 2084 return e_send_ack(w, connection->agreed_pro_version >= 100 ? 2085 P_RETRY_WRITE : P_SUPERSEDED); 2086 } 2087 2088 static bool seq_greater(u32 a, u32 b) 2089 { 2090 /* 2091 * We assume 32-bit wrap-around here. 2092 * For 24-bit wrap-around, we would have to shift: 2093 * a <<= 8; b <<= 8; 2094 */ 2095 return (s32)a - (s32)b > 0; 2096 } 2097 2098 static u32 seq_max(u32 a, u32 b) 2099 { 2100 return seq_greater(a, b) ? a : b; 2101 } 2102 2103 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq) 2104 { 2105 struct drbd_device *device = peer_device->device; 2106 unsigned int newest_peer_seq; 2107 2108 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) { 2109 spin_lock(&device->peer_seq_lock); 2110 newest_peer_seq = seq_max(device->peer_seq, peer_seq); 2111 device->peer_seq = newest_peer_seq; 2112 spin_unlock(&device->peer_seq_lock); 2113 /* wake up only if we actually changed device->peer_seq */ 2114 if (peer_seq == newest_peer_seq) 2115 wake_up(&device->seq_wait); 2116 } 2117 } 2118 2119 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2) 2120 { 2121 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9))); 2122 } 2123 2124 /* maybe change sync_ee into interval trees as well? */ 2125 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req) 2126 { 2127 struct drbd_peer_request *rs_req; 2128 bool rv = false; 2129 2130 spin_lock_irq(&device->resource->req_lock); 2131 list_for_each_entry(rs_req, &device->sync_ee, w.list) { 2132 if (overlaps(peer_req->i.sector, peer_req->i.size, 2133 rs_req->i.sector, rs_req->i.size)) { 2134 rv = true; 2135 break; 2136 } 2137 } 2138 spin_unlock_irq(&device->resource->req_lock); 2139 2140 return rv; 2141 } 2142 2143 /* Called from receive_Data. 2144 * Synchronize packets on sock with packets on msock. 2145 * 2146 * This is here so even when a P_DATA packet traveling via sock overtook an Ack 2147 * packet traveling on msock, they are still processed in the order they have 2148 * been sent. 2149 * 2150 * Note: we don't care for Ack packets overtaking P_DATA packets. 2151 * 2152 * In case packet_seq is larger than device->peer_seq number, there are 2153 * outstanding packets on the msock. We wait for them to arrive. 2154 * In case we are the logically next packet, we update device->peer_seq 2155 * ourselves. Correctly handles 32bit wrap around. 2156 * 2157 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second, 2158 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds 2159 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have 2160 * 1<<9 == 512 seconds aka ages for the 32bit wrap around... 2161 * 2162 * returns 0 if we may process the packet, 2163 * -ERESTARTSYS if we were interrupted (by disconnect signal). */ 2164 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq) 2165 { 2166 struct drbd_device *device = peer_device->device; 2167 DEFINE_WAIT(wait); 2168 long timeout; 2169 int ret = 0, tp; 2170 2171 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) 2172 return 0; 2173 2174 spin_lock(&device->peer_seq_lock); 2175 for (;;) { 2176 if (!seq_greater(peer_seq - 1, device->peer_seq)) { 2177 device->peer_seq = seq_max(device->peer_seq, peer_seq); 2178 break; 2179 } 2180 2181 if (signal_pending(current)) { 2182 ret = -ERESTARTSYS; 2183 break; 2184 } 2185 2186 rcu_read_lock(); 2187 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries; 2188 rcu_read_unlock(); 2189 2190 if (!tp) 2191 break; 2192 2193 /* Only need to wait if two_primaries is enabled */ 2194 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE); 2195 spin_unlock(&device->peer_seq_lock); 2196 rcu_read_lock(); 2197 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10; 2198 rcu_read_unlock(); 2199 timeout = schedule_timeout(timeout); 2200 spin_lock(&device->peer_seq_lock); 2201 if (!timeout) { 2202 ret = -ETIMEDOUT; 2203 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n"); 2204 break; 2205 } 2206 } 2207 spin_unlock(&device->peer_seq_lock); 2208 finish_wait(&device->seq_wait, &wait); 2209 return ret; 2210 } 2211 2212 static enum req_op wire_flags_to_bio_op(u32 dpf) 2213 { 2214 if (dpf & DP_ZEROES) 2215 return REQ_OP_WRITE_ZEROES; 2216 if (dpf & DP_DISCARD) 2217 return REQ_OP_DISCARD; 2218 else 2219 return REQ_OP_WRITE; 2220 } 2221 2222 /* see also bio_flags_to_wire() */ 2223 static blk_opf_t wire_flags_to_bio(struct drbd_connection *connection, u32 dpf) 2224 { 2225 return wire_flags_to_bio_op(dpf) | 2226 (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | 2227 (dpf & DP_FUA ? REQ_FUA : 0) | 2228 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0); 2229 } 2230 2231 static void fail_postponed_requests(struct drbd_device *device, sector_t sector, 2232 unsigned int size) 2233 { 2234 struct drbd_peer_device *peer_device = first_peer_device(device); 2235 struct drbd_interval *i; 2236 2237 repeat: 2238 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2239 struct drbd_request *req; 2240 struct bio_and_error m; 2241 2242 if (!i->local) 2243 continue; 2244 req = container_of(i, struct drbd_request, i); 2245 if (!(req->rq_state & RQ_POSTPONED)) 2246 continue; 2247 req->rq_state &= ~RQ_POSTPONED; 2248 __req_mod(req, NEG_ACKED, peer_device, &m); 2249 spin_unlock_irq(&device->resource->req_lock); 2250 if (m.bio) 2251 complete_master_bio(device, &m); 2252 spin_lock_irq(&device->resource->req_lock); 2253 goto repeat; 2254 } 2255 } 2256 2257 static int handle_write_conflicts(struct drbd_device *device, 2258 struct drbd_peer_request *peer_req) 2259 { 2260 struct drbd_connection *connection = peer_req->peer_device->connection; 2261 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags); 2262 sector_t sector = peer_req->i.sector; 2263 const unsigned int size = peer_req->i.size; 2264 struct drbd_interval *i; 2265 bool equal; 2266 int err; 2267 2268 /* 2269 * Inserting the peer request into the write_requests tree will prevent 2270 * new conflicting local requests from being added. 2271 */ 2272 drbd_insert_interval(&device->write_requests, &peer_req->i); 2273 2274 repeat: 2275 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2276 if (i == &peer_req->i) 2277 continue; 2278 if (i->completed) 2279 continue; 2280 2281 if (!i->local) { 2282 /* 2283 * Our peer has sent a conflicting remote request; this 2284 * should not happen in a two-node setup. Wait for the 2285 * earlier peer request to complete. 2286 */ 2287 err = drbd_wait_misc(device, i); 2288 if (err) 2289 goto out; 2290 goto repeat; 2291 } 2292 2293 equal = i->sector == sector && i->size == size; 2294 if (resolve_conflicts) { 2295 /* 2296 * If the peer request is fully contained within the 2297 * overlapping request, it can be considered overwritten 2298 * and thus superseded; otherwise, it will be retried 2299 * once all overlapping requests have completed. 2300 */ 2301 bool superseded = i->sector <= sector && i->sector + 2302 (i->size >> 9) >= sector + (size >> 9); 2303 2304 if (!equal) 2305 drbd_alert(device, "Concurrent writes detected: " 2306 "local=%llus +%u, remote=%llus +%u, " 2307 "assuming %s came first\n", 2308 (unsigned long long)i->sector, i->size, 2309 (unsigned long long)sector, size, 2310 superseded ? "local" : "remote"); 2311 2312 peer_req->w.cb = superseded ? e_send_superseded : 2313 e_send_retry_write; 2314 list_add_tail(&peer_req->w.list, &device->done_ee); 2315 /* put is in drbd_send_acks_wf() */ 2316 kref_get(&device->kref); 2317 if (!queue_work(connection->ack_sender, 2318 &peer_req->peer_device->send_acks_work)) 2319 kref_put(&device->kref, drbd_destroy_device); 2320 2321 err = -ENOENT; 2322 goto out; 2323 } else { 2324 struct drbd_request *req = 2325 container_of(i, struct drbd_request, i); 2326 2327 if (!equal) 2328 drbd_alert(device, "Concurrent writes detected: " 2329 "local=%llus +%u, remote=%llus +%u\n", 2330 (unsigned long long)i->sector, i->size, 2331 (unsigned long long)sector, size); 2332 2333 if (req->rq_state & RQ_LOCAL_PENDING || 2334 !(req->rq_state & RQ_POSTPONED)) { 2335 /* 2336 * Wait for the node with the discard flag to 2337 * decide if this request has been superseded 2338 * or needs to be retried. 2339 * Requests that have been superseded will 2340 * disappear from the write_requests tree. 2341 * 2342 * In addition, wait for the conflicting 2343 * request to finish locally before submitting 2344 * the conflicting peer request. 2345 */ 2346 err = drbd_wait_misc(device, &req->i); 2347 if (err) { 2348 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD); 2349 fail_postponed_requests(device, sector, size); 2350 goto out; 2351 } 2352 goto repeat; 2353 } 2354 /* 2355 * Remember to restart the conflicting requests after 2356 * the new peer request has completed. 2357 */ 2358 peer_req->flags |= EE_RESTART_REQUESTS; 2359 } 2360 } 2361 err = 0; 2362 2363 out: 2364 if (err) 2365 drbd_remove_epoch_entry_interval(device, peer_req); 2366 return err; 2367 } 2368 2369 /* mirrored write */ 2370 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi) 2371 { 2372 struct drbd_peer_device *peer_device; 2373 struct drbd_device *device; 2374 struct net_conf *nc; 2375 sector_t sector; 2376 struct drbd_peer_request *peer_req; 2377 struct p_data *p = pi->data; 2378 u32 peer_seq = be32_to_cpu(p->seq_num); 2379 u32 dp_flags; 2380 int err, tp; 2381 2382 peer_device = conn_peer_device(connection, pi->vnr); 2383 if (!peer_device) 2384 return -EIO; 2385 device = peer_device->device; 2386 2387 if (!get_ldev(device)) { 2388 int err2; 2389 2390 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2391 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 2392 atomic_inc(&connection->current_epoch->epoch_size); 2393 err2 = drbd_drain_block(peer_device, pi->size); 2394 if (!err) 2395 err = err2; 2396 return err; 2397 } 2398 2399 /* 2400 * Corresponding put_ldev done either below (on various errors), or in 2401 * drbd_peer_request_endio, if we successfully submit the data at the 2402 * end of this function. 2403 */ 2404 2405 sector = be64_to_cpu(p->sector); 2406 peer_req = read_in_block(peer_device, p->block_id, sector, pi); 2407 if (!peer_req) { 2408 put_ldev(device); 2409 return -EIO; 2410 } 2411 2412 peer_req->w.cb = e_end_block; 2413 peer_req->submit_jif = jiffies; 2414 peer_req->flags |= EE_APPLICATION; 2415 2416 dp_flags = be32_to_cpu(p->dp_flags); 2417 peer_req->opf = wire_flags_to_bio(connection, dp_flags); 2418 if (pi->cmd == P_TRIM) { 2419 D_ASSERT(peer_device, peer_req->i.size > 0); 2420 D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD); 2421 D_ASSERT(peer_device, peer_req->pages == NULL); 2422 /* need to play safe: an older DRBD sender 2423 * may mean zero-out while sending P_TRIM. */ 2424 if (0 == (connection->agreed_features & DRBD_FF_WZEROES)) 2425 peer_req->flags |= EE_ZEROOUT; 2426 } else if (pi->cmd == P_ZEROES) { 2427 D_ASSERT(peer_device, peer_req->i.size > 0); 2428 D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES); 2429 D_ASSERT(peer_device, peer_req->pages == NULL); 2430 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */ 2431 if (dp_flags & DP_DISCARD) 2432 peer_req->flags |= EE_TRIM; 2433 } else if (peer_req->pages == NULL) { 2434 D_ASSERT(device, peer_req->i.size == 0); 2435 D_ASSERT(device, dp_flags & DP_FLUSH); 2436 } 2437 2438 if (dp_flags & DP_MAY_SET_IN_SYNC) 2439 peer_req->flags |= EE_MAY_SET_IN_SYNC; 2440 2441 spin_lock(&connection->epoch_lock); 2442 peer_req->epoch = connection->current_epoch; 2443 atomic_inc(&peer_req->epoch->epoch_size); 2444 atomic_inc(&peer_req->epoch->active); 2445 spin_unlock(&connection->epoch_lock); 2446 2447 rcu_read_lock(); 2448 nc = rcu_dereference(peer_device->connection->net_conf); 2449 tp = nc->two_primaries; 2450 if (peer_device->connection->agreed_pro_version < 100) { 2451 switch (nc->wire_protocol) { 2452 case DRBD_PROT_C: 2453 dp_flags |= DP_SEND_WRITE_ACK; 2454 break; 2455 case DRBD_PROT_B: 2456 dp_flags |= DP_SEND_RECEIVE_ACK; 2457 break; 2458 } 2459 } 2460 rcu_read_unlock(); 2461 2462 if (dp_flags & DP_SEND_WRITE_ACK) { 2463 peer_req->flags |= EE_SEND_WRITE_ACK; 2464 inc_unacked(device); 2465 /* corresponding dec_unacked() in e_end_block() 2466 * respective _drbd_clear_done_ee */ 2467 } 2468 2469 if (dp_flags & DP_SEND_RECEIVE_ACK) { 2470 /* I really don't like it that the receiver thread 2471 * sends on the msock, but anyways */ 2472 drbd_send_ack(peer_device, P_RECV_ACK, peer_req); 2473 } 2474 2475 if (tp) { 2476 /* two primaries implies protocol C */ 2477 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK); 2478 peer_req->flags |= EE_IN_INTERVAL_TREE; 2479 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2480 if (err) 2481 goto out_interrupted; 2482 spin_lock_irq(&device->resource->req_lock); 2483 err = handle_write_conflicts(device, peer_req); 2484 if (err) { 2485 spin_unlock_irq(&device->resource->req_lock); 2486 if (err == -ENOENT) { 2487 put_ldev(device); 2488 return 0; 2489 } 2490 goto out_interrupted; 2491 } 2492 } else { 2493 update_peer_seq(peer_device, peer_seq); 2494 spin_lock_irq(&device->resource->req_lock); 2495 } 2496 /* TRIM and is processed synchronously, 2497 * we wait for all pending requests, respectively wait for 2498 * active_ee to become empty in drbd_submit_peer_request(); 2499 * better not add ourselves here. */ 2500 if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0) 2501 list_add_tail(&peer_req->w.list, &device->active_ee); 2502 spin_unlock_irq(&device->resource->req_lock); 2503 2504 if (device->state.conn == C_SYNC_TARGET) 2505 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req)); 2506 2507 if (device->state.pdsk < D_INCONSISTENT) { 2508 /* In case we have the only disk of the cluster, */ 2509 drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size); 2510 peer_req->flags &= ~EE_MAY_SET_IN_SYNC; 2511 drbd_al_begin_io(device, &peer_req->i); 2512 peer_req->flags |= EE_CALL_AL_COMPLETE_IO; 2513 } 2514 2515 err = drbd_submit_peer_request(peer_req); 2516 if (!err) 2517 return 0; 2518 2519 /* don't care for the reason here */ 2520 drbd_err(device, "submit failed, triggering re-connect\n"); 2521 spin_lock_irq(&device->resource->req_lock); 2522 list_del(&peer_req->w.list); 2523 drbd_remove_epoch_entry_interval(device, peer_req); 2524 spin_unlock_irq(&device->resource->req_lock); 2525 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) { 2526 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 2527 drbd_al_complete_io(device, &peer_req->i); 2528 } 2529 2530 out_interrupted: 2531 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP); 2532 put_ldev(device); 2533 drbd_free_peer_req(device, peer_req); 2534 return err; 2535 } 2536 2537 /* We may throttle resync, if the lower device seems to be busy, 2538 * and current sync rate is above c_min_rate. 2539 * 2540 * To decide whether or not the lower device is busy, we use a scheme similar 2541 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" 2542 * (more than 64 sectors) of activity we cannot account for with our own resync 2543 * activity, it obviously is "busy". 2544 * 2545 * The current sync rate used here uses only the most recent two step marks, 2546 * to have a short time average so we can react faster. 2547 */ 2548 bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector, 2549 bool throttle_if_app_is_waiting) 2550 { 2551 struct drbd_device *device = peer_device->device; 2552 struct lc_element *tmp; 2553 bool throttle = drbd_rs_c_min_rate_throttle(device); 2554 2555 if (!throttle || throttle_if_app_is_waiting) 2556 return throttle; 2557 2558 spin_lock_irq(&device->al_lock); 2559 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector)); 2560 if (tmp) { 2561 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 2562 if (test_bit(BME_PRIORITY, &bm_ext->flags)) 2563 throttle = false; 2564 /* Do not slow down if app IO is already waiting for this extent, 2565 * and our progress is necessary for application IO to complete. */ 2566 } 2567 spin_unlock_irq(&device->al_lock); 2568 2569 return throttle; 2570 } 2571 2572 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device) 2573 { 2574 struct gendisk *disk = device->ldev->backing_bdev->bd_disk; 2575 unsigned long db, dt, dbdt; 2576 unsigned int c_min_rate; 2577 int curr_events; 2578 2579 rcu_read_lock(); 2580 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate; 2581 rcu_read_unlock(); 2582 2583 /* feature disabled? */ 2584 if (c_min_rate == 0) 2585 return false; 2586 2587 curr_events = (int)part_stat_read_accum(disk->part0, sectors) - 2588 atomic_read(&device->rs_sect_ev); 2589 2590 if (atomic_read(&device->ap_actlog_cnt) 2591 || curr_events - device->rs_last_events > 64) { 2592 unsigned long rs_left; 2593 int i; 2594 2595 device->rs_last_events = curr_events; 2596 2597 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, 2598 * approx. */ 2599 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS; 2600 2601 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 2602 rs_left = device->ov_left; 2603 else 2604 rs_left = drbd_bm_total_weight(device) - device->rs_failed; 2605 2606 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ; 2607 if (!dt) 2608 dt++; 2609 db = device->rs_mark_left[i] - rs_left; 2610 dbdt = Bit2KB(db/dt); 2611 2612 if (dbdt > c_min_rate) 2613 return true; 2614 } 2615 return false; 2616 } 2617 2618 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi) 2619 { 2620 struct drbd_peer_device *peer_device; 2621 struct drbd_device *device; 2622 sector_t sector; 2623 sector_t capacity; 2624 struct drbd_peer_request *peer_req; 2625 struct digest_info *di = NULL; 2626 int size, verb; 2627 struct p_block_req *p = pi->data; 2628 2629 peer_device = conn_peer_device(connection, pi->vnr); 2630 if (!peer_device) 2631 return -EIO; 2632 device = peer_device->device; 2633 capacity = get_capacity(device->vdisk); 2634 2635 sector = be64_to_cpu(p->sector); 2636 size = be32_to_cpu(p->blksize); 2637 2638 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) { 2639 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2640 (unsigned long long)sector, size); 2641 return -EINVAL; 2642 } 2643 if (sector + (size>>9) > capacity) { 2644 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2645 (unsigned long long)sector, size); 2646 return -EINVAL; 2647 } 2648 2649 if (!get_ldev_if_state(device, D_UP_TO_DATE)) { 2650 verb = 1; 2651 switch (pi->cmd) { 2652 case P_DATA_REQUEST: 2653 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p); 2654 break; 2655 case P_RS_THIN_REQ: 2656 case P_RS_DATA_REQUEST: 2657 case P_CSUM_RS_REQUEST: 2658 case P_OV_REQUEST: 2659 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p); 2660 break; 2661 case P_OV_REPLY: 2662 verb = 0; 2663 dec_rs_pending(peer_device); 2664 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC); 2665 break; 2666 default: 2667 BUG(); 2668 } 2669 if (verb && drbd_ratelimit()) 2670 drbd_err(device, "Can not satisfy peer's read request, " 2671 "no local data.\n"); 2672 2673 /* drain possibly payload */ 2674 return drbd_drain_block(peer_device, pi->size); 2675 } 2676 2677 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 2678 * "criss-cross" setup, that might cause write-out on some other DRBD, 2679 * which in turn might block on the other node at this very place. */ 2680 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, 2681 size, GFP_NOIO); 2682 if (!peer_req) { 2683 put_ldev(device); 2684 return -ENOMEM; 2685 } 2686 peer_req->opf = REQ_OP_READ; 2687 2688 switch (pi->cmd) { 2689 case P_DATA_REQUEST: 2690 peer_req->w.cb = w_e_end_data_req; 2691 /* application IO, don't drbd_rs_begin_io */ 2692 peer_req->flags |= EE_APPLICATION; 2693 goto submit; 2694 2695 case P_RS_THIN_REQ: 2696 /* If at some point in the future we have a smart way to 2697 find out if this data block is completely deallocated, 2698 then we would do something smarter here than reading 2699 the block... */ 2700 peer_req->flags |= EE_RS_THIN_REQ; 2701 fallthrough; 2702 case P_RS_DATA_REQUEST: 2703 peer_req->w.cb = w_e_end_rsdata_req; 2704 /* used in the sector offset progress display */ 2705 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2706 break; 2707 2708 case P_OV_REPLY: 2709 case P_CSUM_RS_REQUEST: 2710 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO); 2711 if (!di) 2712 goto out_free_e; 2713 2714 di->digest_size = pi->size; 2715 di->digest = (((char *)di)+sizeof(struct digest_info)); 2716 2717 peer_req->digest = di; 2718 peer_req->flags |= EE_HAS_DIGEST; 2719 2720 if (drbd_recv_all(peer_device->connection, di->digest, pi->size)) 2721 goto out_free_e; 2722 2723 if (pi->cmd == P_CSUM_RS_REQUEST) { 2724 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 2725 peer_req->w.cb = w_e_end_csum_rs_req; 2726 /* used in the sector offset progress display */ 2727 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2728 /* remember to report stats in drbd_resync_finished */ 2729 device->use_csums = true; 2730 } else if (pi->cmd == P_OV_REPLY) { 2731 /* track progress, we may need to throttle */ 2732 atomic_add(size >> 9, &device->rs_sect_in); 2733 peer_req->w.cb = w_e_end_ov_reply; 2734 dec_rs_pending(peer_device); 2735 /* drbd_rs_begin_io done when we sent this request, 2736 * but accounting still needs to be done. */ 2737 goto submit_for_resync; 2738 } 2739 break; 2740 2741 case P_OV_REQUEST: 2742 if (device->ov_start_sector == ~(sector_t)0 && 2743 peer_device->connection->agreed_pro_version >= 90) { 2744 unsigned long now = jiffies; 2745 int i; 2746 device->ov_start_sector = sector; 2747 device->ov_position = sector; 2748 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector); 2749 device->rs_total = device->ov_left; 2750 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 2751 device->rs_mark_left[i] = device->ov_left; 2752 device->rs_mark_time[i] = now; 2753 } 2754 drbd_info(device, "Online Verify start sector: %llu\n", 2755 (unsigned long long)sector); 2756 } 2757 peer_req->w.cb = w_e_end_ov_req; 2758 break; 2759 2760 default: 2761 BUG(); 2762 } 2763 2764 /* Throttle, drbd_rs_begin_io and submit should become asynchronous 2765 * wrt the receiver, but it is not as straightforward as it may seem. 2766 * Various places in the resync start and stop logic assume resync 2767 * requests are processed in order, requeuing this on the worker thread 2768 * introduces a bunch of new code for synchronization between threads. 2769 * 2770 * Unlimited throttling before drbd_rs_begin_io may stall the resync 2771 * "forever", throttling after drbd_rs_begin_io will lock that extent 2772 * for application writes for the same time. For now, just throttle 2773 * here, where the rest of the code expects the receiver to sleep for 2774 * a while, anyways. 2775 */ 2776 2777 /* Throttle before drbd_rs_begin_io, as that locks out application IO; 2778 * this defers syncer requests for some time, before letting at least 2779 * on request through. The resync controller on the receiving side 2780 * will adapt to the incoming rate accordingly. 2781 * 2782 * We cannot throttle here if remote is Primary/SyncTarget: 2783 * we would also throttle its application reads. 2784 * In that case, throttling is done on the SyncTarget only. 2785 */ 2786 2787 /* Even though this may be a resync request, we do add to "read_ee"; 2788 * "sync_ee" is only used for resync WRITEs. 2789 * Add to list early, so debugfs can find this request 2790 * even if we have to sleep below. */ 2791 spin_lock_irq(&device->resource->req_lock); 2792 list_add_tail(&peer_req->w.list, &device->read_ee); 2793 spin_unlock_irq(&device->resource->req_lock); 2794 2795 update_receiver_timing_details(connection, drbd_rs_should_slow_down); 2796 if (device->state.peer != R_PRIMARY 2797 && drbd_rs_should_slow_down(peer_device, sector, false)) 2798 schedule_timeout_uninterruptible(HZ/10); 2799 update_receiver_timing_details(connection, drbd_rs_begin_io); 2800 if (drbd_rs_begin_io(device, sector)) 2801 goto out_free_e; 2802 2803 submit_for_resync: 2804 atomic_add(size >> 9, &device->rs_sect_ev); 2805 2806 submit: 2807 update_receiver_timing_details(connection, drbd_submit_peer_request); 2808 inc_unacked(device); 2809 if (drbd_submit_peer_request(peer_req) == 0) 2810 return 0; 2811 2812 /* don't care for the reason here */ 2813 drbd_err(device, "submit failed, triggering re-connect\n"); 2814 2815 out_free_e: 2816 spin_lock_irq(&device->resource->req_lock); 2817 list_del(&peer_req->w.list); 2818 spin_unlock_irq(&device->resource->req_lock); 2819 /* no drbd_rs_complete_io(), we are dropping the connection anyways */ 2820 2821 put_ldev(device); 2822 drbd_free_peer_req(device, peer_req); 2823 return -EIO; 2824 } 2825 2826 /* 2827 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries 2828 */ 2829 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local) 2830 { 2831 struct drbd_device *device = peer_device->device; 2832 int self, peer, rv = -100; 2833 unsigned long ch_self, ch_peer; 2834 enum drbd_after_sb_p after_sb_0p; 2835 2836 self = device->ldev->md.uuid[UI_BITMAP] & 1; 2837 peer = device->p_uuid[UI_BITMAP] & 1; 2838 2839 ch_peer = device->p_uuid[UI_SIZE]; 2840 ch_self = device->comm_bm_set; 2841 2842 rcu_read_lock(); 2843 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p; 2844 rcu_read_unlock(); 2845 switch (after_sb_0p) { 2846 case ASB_CONSENSUS: 2847 case ASB_DISCARD_SECONDARY: 2848 case ASB_CALL_HELPER: 2849 case ASB_VIOLENTLY: 2850 drbd_err(device, "Configuration error.\n"); 2851 break; 2852 case ASB_DISCONNECT: 2853 break; 2854 case ASB_DISCARD_YOUNGER_PRI: 2855 if (self == 0 && peer == 1) { 2856 rv = -1; 2857 break; 2858 } 2859 if (self == 1 && peer == 0) { 2860 rv = 1; 2861 break; 2862 } 2863 fallthrough; /* to one of the other strategies */ 2864 case ASB_DISCARD_OLDER_PRI: 2865 if (self == 0 && peer == 1) { 2866 rv = 1; 2867 break; 2868 } 2869 if (self == 1 && peer == 0) { 2870 rv = -1; 2871 break; 2872 } 2873 /* Else fall through to one of the other strategies... */ 2874 drbd_warn(device, "Discard younger/older primary did not find a decision\n" 2875 "Using discard-least-changes instead\n"); 2876 fallthrough; 2877 case ASB_DISCARD_ZERO_CHG: 2878 if (ch_peer == 0 && ch_self == 0) { 2879 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2880 ? -1 : 1; 2881 break; 2882 } else { 2883 if (ch_peer == 0) { rv = 1; break; } 2884 if (ch_self == 0) { rv = -1; break; } 2885 } 2886 if (after_sb_0p == ASB_DISCARD_ZERO_CHG) 2887 break; 2888 fallthrough; 2889 case ASB_DISCARD_LEAST_CHG: 2890 if (ch_self < ch_peer) 2891 rv = -1; 2892 else if (ch_self > ch_peer) 2893 rv = 1; 2894 else /* ( ch_self == ch_peer ) */ 2895 /* Well, then use something else. */ 2896 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2897 ? -1 : 1; 2898 break; 2899 case ASB_DISCARD_LOCAL: 2900 rv = -1; 2901 break; 2902 case ASB_DISCARD_REMOTE: 2903 rv = 1; 2904 } 2905 2906 return rv; 2907 } 2908 2909 /* 2910 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary 2911 */ 2912 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local) 2913 { 2914 struct drbd_device *device = peer_device->device; 2915 int hg, rv = -100; 2916 enum drbd_after_sb_p after_sb_1p; 2917 2918 rcu_read_lock(); 2919 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p; 2920 rcu_read_unlock(); 2921 switch (after_sb_1p) { 2922 case ASB_DISCARD_YOUNGER_PRI: 2923 case ASB_DISCARD_OLDER_PRI: 2924 case ASB_DISCARD_LEAST_CHG: 2925 case ASB_DISCARD_LOCAL: 2926 case ASB_DISCARD_REMOTE: 2927 case ASB_DISCARD_ZERO_CHG: 2928 drbd_err(device, "Configuration error.\n"); 2929 break; 2930 case ASB_DISCONNECT: 2931 break; 2932 case ASB_CONSENSUS: 2933 hg = drbd_asb_recover_0p(peer_device); 2934 if (hg == -1 && device->state.role == R_SECONDARY) 2935 rv = hg; 2936 if (hg == 1 && device->state.role == R_PRIMARY) 2937 rv = hg; 2938 break; 2939 case ASB_VIOLENTLY: 2940 rv = drbd_asb_recover_0p(peer_device); 2941 break; 2942 case ASB_DISCARD_SECONDARY: 2943 return device->state.role == R_PRIMARY ? 1 : -1; 2944 case ASB_CALL_HELPER: 2945 hg = drbd_asb_recover_0p(peer_device); 2946 if (hg == -1 && device->state.role == R_PRIMARY) { 2947 enum drbd_state_rv rv2; 2948 2949 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2950 * we might be here in C_WF_REPORT_PARAMS which is transient. 2951 * we do not need to wait for the after state change work either. */ 2952 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2953 if (rv2 != SS_SUCCESS) { 2954 drbd_khelper(device, "pri-lost-after-sb"); 2955 } else { 2956 drbd_warn(device, "Successfully gave up primary role.\n"); 2957 rv = hg; 2958 } 2959 } else 2960 rv = hg; 2961 } 2962 2963 return rv; 2964 } 2965 2966 /* 2967 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries 2968 */ 2969 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local) 2970 { 2971 struct drbd_device *device = peer_device->device; 2972 int hg, rv = -100; 2973 enum drbd_after_sb_p after_sb_2p; 2974 2975 rcu_read_lock(); 2976 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p; 2977 rcu_read_unlock(); 2978 switch (after_sb_2p) { 2979 case ASB_DISCARD_YOUNGER_PRI: 2980 case ASB_DISCARD_OLDER_PRI: 2981 case ASB_DISCARD_LEAST_CHG: 2982 case ASB_DISCARD_LOCAL: 2983 case ASB_DISCARD_REMOTE: 2984 case ASB_CONSENSUS: 2985 case ASB_DISCARD_SECONDARY: 2986 case ASB_DISCARD_ZERO_CHG: 2987 drbd_err(device, "Configuration error.\n"); 2988 break; 2989 case ASB_VIOLENTLY: 2990 rv = drbd_asb_recover_0p(peer_device); 2991 break; 2992 case ASB_DISCONNECT: 2993 break; 2994 case ASB_CALL_HELPER: 2995 hg = drbd_asb_recover_0p(peer_device); 2996 if (hg == -1) { 2997 enum drbd_state_rv rv2; 2998 2999 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 3000 * we might be here in C_WF_REPORT_PARAMS which is transient. 3001 * we do not need to wait for the after state change work either. */ 3002 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 3003 if (rv2 != SS_SUCCESS) { 3004 drbd_khelper(device, "pri-lost-after-sb"); 3005 } else { 3006 drbd_warn(device, "Successfully gave up primary role.\n"); 3007 rv = hg; 3008 } 3009 } else 3010 rv = hg; 3011 } 3012 3013 return rv; 3014 } 3015 3016 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, 3017 u64 bits, u64 flags) 3018 { 3019 if (!uuid) { 3020 drbd_info(device, "%s uuid info vanished while I was looking!\n", text); 3021 return; 3022 } 3023 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 3024 text, 3025 (unsigned long long)uuid[UI_CURRENT], 3026 (unsigned long long)uuid[UI_BITMAP], 3027 (unsigned long long)uuid[UI_HISTORY_START], 3028 (unsigned long long)uuid[UI_HISTORY_END], 3029 (unsigned long long)bits, 3030 (unsigned long long)flags); 3031 } 3032 3033 /* 3034 100 after split brain try auto recover 3035 2 C_SYNC_SOURCE set BitMap 3036 1 C_SYNC_SOURCE use BitMap 3037 0 no Sync 3038 -1 C_SYNC_TARGET use BitMap 3039 -2 C_SYNC_TARGET set BitMap 3040 -100 after split brain, disconnect 3041 -1000 unrelated data 3042 -1091 requires proto 91 3043 -1096 requires proto 96 3044 */ 3045 3046 static int drbd_uuid_compare(struct drbd_peer_device *const peer_device, 3047 enum drbd_role const peer_role, int *rule_nr) __must_hold(local) 3048 { 3049 struct drbd_connection *const connection = peer_device->connection; 3050 struct drbd_device *device = peer_device->device; 3051 u64 self, peer; 3052 int i, j; 3053 3054 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 3055 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3056 3057 *rule_nr = 10; 3058 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 3059 return 0; 3060 3061 *rule_nr = 20; 3062 if ((self == UUID_JUST_CREATED || self == (u64)0) && 3063 peer != UUID_JUST_CREATED) 3064 return -2; 3065 3066 *rule_nr = 30; 3067 if (self != UUID_JUST_CREATED && 3068 (peer == UUID_JUST_CREATED || peer == (u64)0)) 3069 return 2; 3070 3071 if (self == peer) { 3072 int rct, dc; /* roles at crash time */ 3073 3074 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) { 3075 3076 if (connection->agreed_pro_version < 91) 3077 return -1091; 3078 3079 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 3080 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 3081 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n"); 3082 drbd_uuid_move_history(device); 3083 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP]; 3084 device->ldev->md.uuid[UI_BITMAP] = 0; 3085 3086 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3087 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 3088 *rule_nr = 34; 3089 } else { 3090 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n"); 3091 *rule_nr = 36; 3092 } 3093 3094 return 1; 3095 } 3096 3097 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { 3098 3099 if (connection->agreed_pro_version < 91) 3100 return -1091; 3101 3102 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && 3103 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 3104 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 3105 3106 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START]; 3107 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP]; 3108 device->p_uuid[UI_BITMAP] = 0UL; 3109 3110 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3111 *rule_nr = 35; 3112 } else { 3113 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n"); 3114 *rule_nr = 37; 3115 } 3116 3117 return -1; 3118 } 3119 3120 /* Common power [off|failure] */ 3121 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) + 3122 (device->p_uuid[UI_FLAGS] & 2); 3123 /* lowest bit is set when we were primary, 3124 * next bit (weight 2) is set when peer was primary */ 3125 *rule_nr = 40; 3126 3127 /* Neither has the "crashed primary" flag set, 3128 * only a replication link hickup. */ 3129 if (rct == 0) 3130 return 0; 3131 3132 /* Current UUID equal and no bitmap uuid; does not necessarily 3133 * mean this was a "simultaneous hard crash", maybe IO was 3134 * frozen, so no UUID-bump happened. 3135 * This is a protocol change, overload DRBD_FF_WSAME as flag 3136 * for "new-enough" peer DRBD version. */ 3137 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) { 3138 *rule_nr = 41; 3139 if (!(connection->agreed_features & DRBD_FF_WSAME)) { 3140 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n"); 3141 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8)); 3142 } 3143 if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) { 3144 /* At least one has the "crashed primary" bit set, 3145 * both are primary now, but neither has rotated its UUIDs? 3146 * "Can not happen." */ 3147 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n"); 3148 return -100; 3149 } 3150 if (device->state.role == R_PRIMARY) 3151 return 1; 3152 return -1; 3153 } 3154 3155 /* Both are secondary. 3156 * Really looks like recovery from simultaneous hard crash. 3157 * Check which had been primary before, and arbitrate. */ 3158 switch (rct) { 3159 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */ 3160 case 1: /* self_pri && !peer_pri */ return 1; 3161 case 2: /* !self_pri && peer_pri */ return -1; 3162 case 3: /* self_pri && peer_pri */ 3163 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags); 3164 return dc ? -1 : 1; 3165 } 3166 } 3167 3168 *rule_nr = 50; 3169 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3170 if (self == peer) 3171 return -1; 3172 3173 *rule_nr = 51; 3174 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); 3175 if (self == peer) { 3176 if (connection->agreed_pro_version < 96 ? 3177 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 3178 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 3179 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { 3180 /* The last P_SYNC_UUID did not get though. Undo the last start of 3181 resync as sync source modifications of the peer's UUIDs. */ 3182 3183 if (connection->agreed_pro_version < 91) 3184 return -1091; 3185 3186 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; 3187 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1]; 3188 3189 drbd_info(device, "Lost last syncUUID packet, corrected:\n"); 3190 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3191 3192 return -1; 3193 } 3194 } 3195 3196 *rule_nr = 60; 3197 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 3198 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3199 peer = device->p_uuid[i] & ~((u64)1); 3200 if (self == peer) 3201 return -2; 3202 } 3203 3204 *rule_nr = 70; 3205 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3206 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3207 if (self == peer) 3208 return 1; 3209 3210 *rule_nr = 71; 3211 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 3212 if (self == peer) { 3213 if (connection->agreed_pro_version < 96 ? 3214 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 3215 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 3216 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 3217 /* The last P_SYNC_UUID did not get though. Undo the last start of 3218 resync as sync source modifications of our UUIDs. */ 3219 3220 if (connection->agreed_pro_version < 91) 3221 return -1091; 3222 3223 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); 3224 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]); 3225 3226 drbd_info(device, "Last syncUUID did not get through, corrected:\n"); 3227 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3228 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 3229 3230 return 1; 3231 } 3232 } 3233 3234 3235 *rule_nr = 80; 3236 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3237 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3238 self = device->ldev->md.uuid[i] & ~((u64)1); 3239 if (self == peer) 3240 return 2; 3241 } 3242 3243 *rule_nr = 90; 3244 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3245 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3246 if (self == peer && self != ((u64)0)) 3247 return 100; 3248 3249 *rule_nr = 100; 3250 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3251 self = device->ldev->md.uuid[i] & ~((u64)1); 3252 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 3253 peer = device->p_uuid[j] & ~((u64)1); 3254 if (self == peer) 3255 return -100; 3256 } 3257 } 3258 3259 return -1000; 3260 } 3261 3262 /* drbd_sync_handshake() returns the new conn state on success, or 3263 CONN_MASK (-1) on failure. 3264 */ 3265 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device, 3266 enum drbd_role peer_role, 3267 enum drbd_disk_state peer_disk) __must_hold(local) 3268 { 3269 struct drbd_device *device = peer_device->device; 3270 enum drbd_conns rv = C_MASK; 3271 enum drbd_disk_state mydisk; 3272 struct net_conf *nc; 3273 int hg, rule_nr, rr_conflict, tentative, always_asbp; 3274 3275 mydisk = device->state.disk; 3276 if (mydisk == D_NEGOTIATING) 3277 mydisk = device->new_state_tmp.disk; 3278 3279 drbd_info(device, "drbd_sync_handshake:\n"); 3280 3281 spin_lock_irq(&device->ldev->md.uuid_lock); 3282 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0); 3283 drbd_uuid_dump(device, "peer", device->p_uuid, 3284 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3285 3286 hg = drbd_uuid_compare(peer_device, peer_role, &rule_nr); 3287 spin_unlock_irq(&device->ldev->md.uuid_lock); 3288 3289 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr); 3290 3291 if (hg == -1000) { 3292 drbd_alert(device, "Unrelated data, aborting!\n"); 3293 return C_MASK; 3294 } 3295 if (hg < -0x10000) { 3296 int proto, fflags; 3297 hg = -hg; 3298 proto = hg & 0xff; 3299 fflags = (hg >> 8) & 0xff; 3300 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n", 3301 proto, fflags); 3302 return C_MASK; 3303 } 3304 if (hg < -1000) { 3305 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000); 3306 return C_MASK; 3307 } 3308 3309 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) || 3310 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) { 3311 int f = (hg == -100) || abs(hg) == 2; 3312 hg = mydisk > D_INCONSISTENT ? 1 : -1; 3313 if (f) 3314 hg = hg*2; 3315 drbd_info(device, "Becoming sync %s due to disk states.\n", 3316 hg > 0 ? "source" : "target"); 3317 } 3318 3319 if (abs(hg) == 100) 3320 drbd_khelper(device, "initial-split-brain"); 3321 3322 rcu_read_lock(); 3323 nc = rcu_dereference(peer_device->connection->net_conf); 3324 always_asbp = nc->always_asbp; 3325 rr_conflict = nc->rr_conflict; 3326 tentative = nc->tentative; 3327 rcu_read_unlock(); 3328 3329 if (hg == 100 || (hg == -100 && always_asbp)) { 3330 int pcount = (device->state.role == R_PRIMARY) 3331 + (peer_role == R_PRIMARY); 3332 int forced = (hg == -100); 3333 3334 switch (pcount) { 3335 case 0: 3336 hg = drbd_asb_recover_0p(peer_device); 3337 break; 3338 case 1: 3339 hg = drbd_asb_recover_1p(peer_device); 3340 break; 3341 case 2: 3342 hg = drbd_asb_recover_2p(peer_device); 3343 break; 3344 } 3345 if (abs(hg) < 100) { 3346 drbd_warn(device, "Split-Brain detected, %d primaries, " 3347 "automatically solved. Sync from %s node\n", 3348 pcount, (hg < 0) ? "peer" : "this"); 3349 if (forced) { 3350 drbd_warn(device, "Doing a full sync, since" 3351 " UUIDs where ambiguous.\n"); 3352 hg = hg*2; 3353 } 3354 } 3355 } 3356 3357 if (hg == -100) { 3358 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1)) 3359 hg = -1; 3360 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1)) 3361 hg = 1; 3362 3363 if (abs(hg) < 100) 3364 drbd_warn(device, "Split-Brain detected, manually solved. " 3365 "Sync from %s node\n", 3366 (hg < 0) ? "peer" : "this"); 3367 } 3368 3369 if (hg == -100) { 3370 /* FIXME this log message is not correct if we end up here 3371 * after an attempted attach on a diskless node. 3372 * We just refuse to attach -- well, we drop the "connection" 3373 * to that disk, in a way... */ 3374 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n"); 3375 drbd_khelper(device, "split-brain"); 3376 return C_MASK; 3377 } 3378 3379 if (hg > 0 && mydisk <= D_INCONSISTENT) { 3380 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n"); 3381 return C_MASK; 3382 } 3383 3384 if (hg < 0 && /* by intention we do not use mydisk here. */ 3385 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) { 3386 switch (rr_conflict) { 3387 case ASB_CALL_HELPER: 3388 drbd_khelper(device, "pri-lost"); 3389 fallthrough; 3390 case ASB_DISCONNECT: 3391 drbd_err(device, "I shall become SyncTarget, but I am primary!\n"); 3392 return C_MASK; 3393 case ASB_VIOLENTLY: 3394 drbd_warn(device, "Becoming SyncTarget, violating the stable-data" 3395 "assumption\n"); 3396 } 3397 } 3398 3399 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) { 3400 if (hg == 0) 3401 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n"); 3402 else 3403 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.", 3404 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET), 3405 abs(hg) >= 2 ? "full" : "bit-map based"); 3406 return C_MASK; 3407 } 3408 3409 if (abs(hg) >= 2) { 3410 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n"); 3411 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake", 3412 BM_LOCKED_SET_ALLOWED, NULL)) 3413 return C_MASK; 3414 } 3415 3416 if (hg > 0) { /* become sync source. */ 3417 rv = C_WF_BITMAP_S; 3418 } else if (hg < 0) { /* become sync target */ 3419 rv = C_WF_BITMAP_T; 3420 } else { 3421 rv = C_CONNECTED; 3422 if (drbd_bm_total_weight(device)) { 3423 drbd_info(device, "No resync, but %lu bits in bitmap!\n", 3424 drbd_bm_total_weight(device)); 3425 } 3426 } 3427 3428 return rv; 3429 } 3430 3431 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer) 3432 { 3433 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */ 3434 if (peer == ASB_DISCARD_REMOTE) 3435 return ASB_DISCARD_LOCAL; 3436 3437 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */ 3438 if (peer == ASB_DISCARD_LOCAL) 3439 return ASB_DISCARD_REMOTE; 3440 3441 /* everything else is valid if they are equal on both sides. */ 3442 return peer; 3443 } 3444 3445 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi) 3446 { 3447 struct p_protocol *p = pi->data; 3448 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 3449 int p_proto, p_discard_my_data, p_two_primaries, cf; 3450 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL; 3451 char integrity_alg[SHARED_SECRET_MAX] = ""; 3452 struct crypto_shash *peer_integrity_tfm = NULL; 3453 void *int_dig_in = NULL, *int_dig_vv = NULL; 3454 3455 p_proto = be32_to_cpu(p->protocol); 3456 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 3457 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 3458 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 3459 p_two_primaries = be32_to_cpu(p->two_primaries); 3460 cf = be32_to_cpu(p->conn_flags); 3461 p_discard_my_data = cf & CF_DISCARD_MY_DATA; 3462 3463 if (connection->agreed_pro_version >= 87) { 3464 int err; 3465 3466 if (pi->size > sizeof(integrity_alg)) 3467 return -EIO; 3468 err = drbd_recv_all(connection, integrity_alg, pi->size); 3469 if (err) 3470 return err; 3471 integrity_alg[SHARED_SECRET_MAX - 1] = 0; 3472 } 3473 3474 if (pi->cmd != P_PROTOCOL_UPDATE) { 3475 clear_bit(CONN_DRY_RUN, &connection->flags); 3476 3477 if (cf & CF_DRY_RUN) 3478 set_bit(CONN_DRY_RUN, &connection->flags); 3479 3480 rcu_read_lock(); 3481 nc = rcu_dereference(connection->net_conf); 3482 3483 if (p_proto != nc->wire_protocol) { 3484 drbd_err(connection, "incompatible %s settings\n", "protocol"); 3485 goto disconnect_rcu_unlock; 3486 } 3487 3488 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) { 3489 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri"); 3490 goto disconnect_rcu_unlock; 3491 } 3492 3493 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) { 3494 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri"); 3495 goto disconnect_rcu_unlock; 3496 } 3497 3498 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) { 3499 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri"); 3500 goto disconnect_rcu_unlock; 3501 } 3502 3503 if (p_discard_my_data && nc->discard_my_data) { 3504 drbd_err(connection, "incompatible %s settings\n", "discard-my-data"); 3505 goto disconnect_rcu_unlock; 3506 } 3507 3508 if (p_two_primaries != nc->two_primaries) { 3509 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries"); 3510 goto disconnect_rcu_unlock; 3511 } 3512 3513 if (strcmp(integrity_alg, nc->integrity_alg)) { 3514 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg"); 3515 goto disconnect_rcu_unlock; 3516 } 3517 3518 rcu_read_unlock(); 3519 } 3520 3521 if (integrity_alg[0]) { 3522 int hash_size; 3523 3524 /* 3525 * We can only change the peer data integrity algorithm 3526 * here. Changing our own data integrity algorithm 3527 * requires that we send a P_PROTOCOL_UPDATE packet at 3528 * the same time; otherwise, the peer has no way to 3529 * tell between which packets the algorithm should 3530 * change. 3531 */ 3532 3533 peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0); 3534 if (IS_ERR(peer_integrity_tfm)) { 3535 peer_integrity_tfm = NULL; 3536 drbd_err(connection, "peer data-integrity-alg %s not supported\n", 3537 integrity_alg); 3538 goto disconnect; 3539 } 3540 3541 hash_size = crypto_shash_digestsize(peer_integrity_tfm); 3542 int_dig_in = kmalloc(hash_size, GFP_KERNEL); 3543 int_dig_vv = kmalloc(hash_size, GFP_KERNEL); 3544 if (!(int_dig_in && int_dig_vv)) { 3545 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n"); 3546 goto disconnect; 3547 } 3548 } 3549 3550 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL); 3551 if (!new_net_conf) 3552 goto disconnect; 3553 3554 mutex_lock(&connection->data.mutex); 3555 mutex_lock(&connection->resource->conf_update); 3556 old_net_conf = connection->net_conf; 3557 *new_net_conf = *old_net_conf; 3558 3559 new_net_conf->wire_protocol = p_proto; 3560 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p); 3561 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p); 3562 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p); 3563 new_net_conf->two_primaries = p_two_primaries; 3564 3565 rcu_assign_pointer(connection->net_conf, new_net_conf); 3566 mutex_unlock(&connection->resource->conf_update); 3567 mutex_unlock(&connection->data.mutex); 3568 3569 crypto_free_shash(connection->peer_integrity_tfm); 3570 kfree(connection->int_dig_in); 3571 kfree(connection->int_dig_vv); 3572 connection->peer_integrity_tfm = peer_integrity_tfm; 3573 connection->int_dig_in = int_dig_in; 3574 connection->int_dig_vv = int_dig_vv; 3575 3576 if (strcmp(old_net_conf->integrity_alg, integrity_alg)) 3577 drbd_info(connection, "peer data-integrity-alg: %s\n", 3578 integrity_alg[0] ? integrity_alg : "(none)"); 3579 3580 kvfree_rcu_mightsleep(old_net_conf); 3581 return 0; 3582 3583 disconnect_rcu_unlock: 3584 rcu_read_unlock(); 3585 disconnect: 3586 crypto_free_shash(peer_integrity_tfm); 3587 kfree(int_dig_in); 3588 kfree(int_dig_vv); 3589 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 3590 return -EIO; 3591 } 3592 3593 /* helper function 3594 * input: alg name, feature name 3595 * return: NULL (alg name was "") 3596 * ERR_PTR(error) if something goes wrong 3597 * or the crypto hash ptr, if it worked out ok. */ 3598 static struct crypto_shash *drbd_crypto_alloc_digest_safe( 3599 const struct drbd_device *device, 3600 const char *alg, const char *name) 3601 { 3602 struct crypto_shash *tfm; 3603 3604 if (!alg[0]) 3605 return NULL; 3606 3607 tfm = crypto_alloc_shash(alg, 0, 0); 3608 if (IS_ERR(tfm)) { 3609 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n", 3610 alg, name, PTR_ERR(tfm)); 3611 return tfm; 3612 } 3613 return tfm; 3614 } 3615 3616 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi) 3617 { 3618 void *buffer = connection->data.rbuf; 3619 int size = pi->size; 3620 3621 while (size) { 3622 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE); 3623 s = drbd_recv(connection, buffer, s); 3624 if (s <= 0) { 3625 if (s < 0) 3626 return s; 3627 break; 3628 } 3629 size -= s; 3630 } 3631 if (size) 3632 return -EIO; 3633 return 0; 3634 } 3635 3636 /* 3637 * config_unknown_volume - device configuration command for unknown volume 3638 * 3639 * When a device is added to an existing connection, the node on which the 3640 * device is added first will send configuration commands to its peer but the 3641 * peer will not know about the device yet. It will warn and ignore these 3642 * commands. Once the device is added on the second node, the second node will 3643 * send the same device configuration commands, but in the other direction. 3644 * 3645 * (We can also end up here if drbd is misconfigured.) 3646 */ 3647 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi) 3648 { 3649 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n", 3650 cmdname(pi->cmd), pi->vnr); 3651 return ignore_remaining_packet(connection, pi); 3652 } 3653 3654 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi) 3655 { 3656 struct drbd_peer_device *peer_device; 3657 struct drbd_device *device; 3658 struct p_rs_param_95 *p; 3659 unsigned int header_size, data_size, exp_max_sz; 3660 struct crypto_shash *verify_tfm = NULL; 3661 struct crypto_shash *csums_tfm = NULL; 3662 struct net_conf *old_net_conf, *new_net_conf = NULL; 3663 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL; 3664 const int apv = connection->agreed_pro_version; 3665 struct fifo_buffer *old_plan = NULL, *new_plan = NULL; 3666 unsigned int fifo_size = 0; 3667 int err; 3668 3669 peer_device = conn_peer_device(connection, pi->vnr); 3670 if (!peer_device) 3671 return config_unknown_volume(connection, pi); 3672 device = peer_device->device; 3673 3674 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 3675 : apv == 88 ? sizeof(struct p_rs_param) 3676 + SHARED_SECRET_MAX 3677 : apv <= 94 ? sizeof(struct p_rs_param_89) 3678 : /* apv >= 95 */ sizeof(struct p_rs_param_95); 3679 3680 if (pi->size > exp_max_sz) { 3681 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n", 3682 pi->size, exp_max_sz); 3683 return -EIO; 3684 } 3685 3686 if (apv <= 88) { 3687 header_size = sizeof(struct p_rs_param); 3688 data_size = pi->size - header_size; 3689 } else if (apv <= 94) { 3690 header_size = sizeof(struct p_rs_param_89); 3691 data_size = pi->size - header_size; 3692 D_ASSERT(device, data_size == 0); 3693 } else { 3694 header_size = sizeof(struct p_rs_param_95); 3695 data_size = pi->size - header_size; 3696 D_ASSERT(device, data_size == 0); 3697 } 3698 3699 /* initialize verify_alg and csums_alg */ 3700 p = pi->data; 3701 BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX); 3702 memset(&p->algs, 0, sizeof(p->algs)); 3703 3704 err = drbd_recv_all(peer_device->connection, p, header_size); 3705 if (err) 3706 return err; 3707 3708 mutex_lock(&connection->resource->conf_update); 3709 old_net_conf = peer_device->connection->net_conf; 3710 if (get_ldev(device)) { 3711 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3712 if (!new_disk_conf) { 3713 put_ldev(device); 3714 mutex_unlock(&connection->resource->conf_update); 3715 drbd_err(device, "Allocation of new disk_conf failed\n"); 3716 return -ENOMEM; 3717 } 3718 3719 old_disk_conf = device->ldev->disk_conf; 3720 *new_disk_conf = *old_disk_conf; 3721 3722 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate); 3723 } 3724 3725 if (apv >= 88) { 3726 if (apv == 88) { 3727 if (data_size > SHARED_SECRET_MAX || data_size == 0) { 3728 drbd_err(device, "verify-alg of wrong size, " 3729 "peer wants %u, accepting only up to %u byte\n", 3730 data_size, SHARED_SECRET_MAX); 3731 goto reconnect; 3732 } 3733 3734 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size); 3735 if (err) 3736 goto reconnect; 3737 /* we expect NUL terminated string */ 3738 /* but just in case someone tries to be evil */ 3739 D_ASSERT(device, p->verify_alg[data_size-1] == 0); 3740 p->verify_alg[data_size-1] = 0; 3741 3742 } else /* apv >= 89 */ { 3743 /* we still expect NUL terminated strings */ 3744 /* but just in case someone tries to be evil */ 3745 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0); 3746 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0); 3747 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 3748 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 3749 } 3750 3751 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) { 3752 if (device->state.conn == C_WF_REPORT_PARAMS) { 3753 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n", 3754 old_net_conf->verify_alg, p->verify_alg); 3755 goto disconnect; 3756 } 3757 verify_tfm = drbd_crypto_alloc_digest_safe(device, 3758 p->verify_alg, "verify-alg"); 3759 if (IS_ERR(verify_tfm)) { 3760 verify_tfm = NULL; 3761 goto disconnect; 3762 } 3763 } 3764 3765 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) { 3766 if (device->state.conn == C_WF_REPORT_PARAMS) { 3767 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n", 3768 old_net_conf->csums_alg, p->csums_alg); 3769 goto disconnect; 3770 } 3771 csums_tfm = drbd_crypto_alloc_digest_safe(device, 3772 p->csums_alg, "csums-alg"); 3773 if (IS_ERR(csums_tfm)) { 3774 csums_tfm = NULL; 3775 goto disconnect; 3776 } 3777 } 3778 3779 if (apv > 94 && new_disk_conf) { 3780 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead); 3781 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target); 3782 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target); 3783 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate); 3784 3785 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ; 3786 if (fifo_size != device->rs_plan_s->size) { 3787 new_plan = fifo_alloc(fifo_size); 3788 if (!new_plan) { 3789 drbd_err(device, "kmalloc of fifo_buffer failed"); 3790 put_ldev(device); 3791 goto disconnect; 3792 } 3793 } 3794 } 3795 3796 if (verify_tfm || csums_tfm) { 3797 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL); 3798 if (!new_net_conf) 3799 goto disconnect; 3800 3801 *new_net_conf = *old_net_conf; 3802 3803 if (verify_tfm) { 3804 strcpy(new_net_conf->verify_alg, p->verify_alg); 3805 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1; 3806 crypto_free_shash(peer_device->connection->verify_tfm); 3807 peer_device->connection->verify_tfm = verify_tfm; 3808 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg); 3809 } 3810 if (csums_tfm) { 3811 strcpy(new_net_conf->csums_alg, p->csums_alg); 3812 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1; 3813 crypto_free_shash(peer_device->connection->csums_tfm); 3814 peer_device->connection->csums_tfm = csums_tfm; 3815 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg); 3816 } 3817 rcu_assign_pointer(connection->net_conf, new_net_conf); 3818 } 3819 } 3820 3821 if (new_disk_conf) { 3822 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3823 put_ldev(device); 3824 } 3825 3826 if (new_plan) { 3827 old_plan = device->rs_plan_s; 3828 rcu_assign_pointer(device->rs_plan_s, new_plan); 3829 } 3830 3831 mutex_unlock(&connection->resource->conf_update); 3832 synchronize_rcu(); 3833 if (new_net_conf) 3834 kfree(old_net_conf); 3835 kfree(old_disk_conf); 3836 kfree(old_plan); 3837 3838 return 0; 3839 3840 reconnect: 3841 if (new_disk_conf) { 3842 put_ldev(device); 3843 kfree(new_disk_conf); 3844 } 3845 mutex_unlock(&connection->resource->conf_update); 3846 return -EIO; 3847 3848 disconnect: 3849 kfree(new_plan); 3850 if (new_disk_conf) { 3851 put_ldev(device); 3852 kfree(new_disk_conf); 3853 } 3854 mutex_unlock(&connection->resource->conf_update); 3855 /* just for completeness: actually not needed, 3856 * as this is not reached if csums_tfm was ok. */ 3857 crypto_free_shash(csums_tfm); 3858 /* but free the verify_tfm again, if csums_tfm did not work out */ 3859 crypto_free_shash(verify_tfm); 3860 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3861 return -EIO; 3862 } 3863 3864 /* warn if the arguments differ by more than 12.5% */ 3865 static void warn_if_differ_considerably(struct drbd_device *device, 3866 const char *s, sector_t a, sector_t b) 3867 { 3868 sector_t d; 3869 if (a == 0 || b == 0) 3870 return; 3871 d = (a > b) ? (a - b) : (b - a); 3872 if (d > (a>>3) || d > (b>>3)) 3873 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s, 3874 (unsigned long long)a, (unsigned long long)b); 3875 } 3876 3877 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi) 3878 { 3879 struct drbd_peer_device *peer_device; 3880 struct drbd_device *device; 3881 struct p_sizes *p = pi->data; 3882 struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL; 3883 enum determine_dev_size dd = DS_UNCHANGED; 3884 sector_t p_size, p_usize, p_csize, my_usize; 3885 sector_t new_size, cur_size; 3886 int ldsc = 0; /* local disk size changed */ 3887 enum dds_flags ddsf; 3888 3889 peer_device = conn_peer_device(connection, pi->vnr); 3890 if (!peer_device) 3891 return config_unknown_volume(connection, pi); 3892 device = peer_device->device; 3893 cur_size = get_capacity(device->vdisk); 3894 3895 p_size = be64_to_cpu(p->d_size); 3896 p_usize = be64_to_cpu(p->u_size); 3897 p_csize = be64_to_cpu(p->c_size); 3898 3899 /* just store the peer's disk size for now. 3900 * we still need to figure out whether we accept that. */ 3901 device->p_size = p_size; 3902 3903 if (get_ldev(device)) { 3904 rcu_read_lock(); 3905 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size; 3906 rcu_read_unlock(); 3907 3908 warn_if_differ_considerably(device, "lower level device sizes", 3909 p_size, drbd_get_max_capacity(device->ldev)); 3910 warn_if_differ_considerably(device, "user requested size", 3911 p_usize, my_usize); 3912 3913 /* if this is the first connect, or an otherwise expected 3914 * param exchange, choose the minimum */ 3915 if (device->state.conn == C_WF_REPORT_PARAMS) 3916 p_usize = min_not_zero(my_usize, p_usize); 3917 3918 /* Never shrink a device with usable data during connect, 3919 * or "attach" on the peer. 3920 * But allow online shrinking if we are connected. */ 3921 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0); 3922 if (new_size < cur_size && 3923 device->state.disk >= D_OUTDATED && 3924 (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) { 3925 drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n", 3926 (unsigned long long)new_size, (unsigned long long)cur_size); 3927 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3928 put_ldev(device); 3929 return -EIO; 3930 } 3931 3932 if (my_usize != p_usize) { 3933 struct disk_conf *old_disk_conf, *new_disk_conf = NULL; 3934 3935 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 3936 if (!new_disk_conf) { 3937 put_ldev(device); 3938 return -ENOMEM; 3939 } 3940 3941 mutex_lock(&connection->resource->conf_update); 3942 old_disk_conf = device->ldev->disk_conf; 3943 *new_disk_conf = *old_disk_conf; 3944 new_disk_conf->disk_size = p_usize; 3945 3946 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3947 mutex_unlock(&connection->resource->conf_update); 3948 kvfree_rcu_mightsleep(old_disk_conf); 3949 3950 drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n", 3951 (unsigned long)p_usize, (unsigned long)my_usize); 3952 } 3953 3954 put_ldev(device); 3955 } 3956 3957 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 3958 /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size(). 3959 In case we cleared the QUEUE_FLAG_DISCARD from our queue in 3960 drbd_reconsider_queue_parameters(), we can be sure that after 3961 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */ 3962 3963 ddsf = be16_to_cpu(p->dds_flags); 3964 if (get_ldev(device)) { 3965 drbd_reconsider_queue_parameters(device, device->ldev, o); 3966 dd = drbd_determine_dev_size(device, ddsf, NULL); 3967 put_ldev(device); 3968 if (dd == DS_ERROR) 3969 return -EIO; 3970 drbd_md_sync(device); 3971 } else { 3972 /* 3973 * I am diskless, need to accept the peer's *current* size. 3974 * I must NOT accept the peers backing disk size, 3975 * it may have been larger than mine all along... 3976 * 3977 * At this point, the peer knows more about my disk, or at 3978 * least about what we last agreed upon, than myself. 3979 * So if his c_size is less than his d_size, the most likely 3980 * reason is that *my* d_size was smaller last time we checked. 3981 * 3982 * However, if he sends a zero current size, 3983 * take his (user-capped or) backing disk size anyways. 3984 * 3985 * Unless of course he does not have a disk himself. 3986 * In which case we ignore this completely. 3987 */ 3988 sector_t new_size = p_csize ?: p_usize ?: p_size; 3989 drbd_reconsider_queue_parameters(device, NULL, o); 3990 if (new_size == 0) { 3991 /* Ignore, peer does not know nothing. */ 3992 } else if (new_size == cur_size) { 3993 /* nothing to do */ 3994 } else if (cur_size != 0 && p_size == 0) { 3995 drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n", 3996 (unsigned long long)new_size, (unsigned long long)cur_size); 3997 } else if (new_size < cur_size && device->state.role == R_PRIMARY) { 3998 drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n", 3999 (unsigned long long)new_size, (unsigned long long)cur_size); 4000 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4001 return -EIO; 4002 } else { 4003 /* I believe the peer, if 4004 * - I don't have a current size myself 4005 * - we agree on the size anyways 4006 * - I do have a current size, am Secondary, 4007 * and he has the only disk 4008 * - I do have a current size, am Primary, 4009 * and he has the only disk, 4010 * which is larger than my current size 4011 */ 4012 drbd_set_my_capacity(device, new_size); 4013 } 4014 } 4015 4016 if (get_ldev(device)) { 4017 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) { 4018 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev); 4019 ldsc = 1; 4020 } 4021 4022 put_ldev(device); 4023 } 4024 4025 if (device->state.conn > C_WF_REPORT_PARAMS) { 4026 if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) || 4027 ldsc) { 4028 /* we have different sizes, probably peer 4029 * needs to know my new size... */ 4030 drbd_send_sizes(peer_device, 0, ddsf); 4031 } 4032 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) || 4033 (dd == DS_GREW && device->state.conn == C_CONNECTED)) { 4034 if (device->state.pdsk >= D_INCONSISTENT && 4035 device->state.disk >= D_INCONSISTENT) { 4036 if (ddsf & DDSF_NO_RESYNC) 4037 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n"); 4038 else 4039 resync_after_online_grow(device); 4040 } else 4041 set_bit(RESYNC_AFTER_NEG, &device->flags); 4042 } 4043 } 4044 4045 return 0; 4046 } 4047 4048 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi) 4049 { 4050 struct drbd_peer_device *peer_device; 4051 struct drbd_device *device; 4052 struct p_uuids *p = pi->data; 4053 u64 *p_uuid; 4054 int i, updated_uuids = 0; 4055 4056 peer_device = conn_peer_device(connection, pi->vnr); 4057 if (!peer_device) 4058 return config_unknown_volume(connection, pi); 4059 device = peer_device->device; 4060 4061 p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO); 4062 if (!p_uuid) 4063 return false; 4064 4065 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 4066 p_uuid[i] = be64_to_cpu(p->uuid[i]); 4067 4068 kfree(device->p_uuid); 4069 device->p_uuid = p_uuid; 4070 4071 if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) && 4072 device->state.disk < D_INCONSISTENT && 4073 device->state.role == R_PRIMARY && 4074 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 4075 drbd_err(device, "Can only connect to data with current UUID=%016llX\n", 4076 (unsigned long long)device->ed_uuid); 4077 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4078 return -EIO; 4079 } 4080 4081 if (get_ldev(device)) { 4082 int skip_initial_sync = 4083 device->state.conn == C_CONNECTED && 4084 peer_device->connection->agreed_pro_version >= 90 && 4085 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 4086 (p_uuid[UI_FLAGS] & 8); 4087 if (skip_initial_sync) { 4088 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n"); 4089 drbd_bitmap_io(device, &drbd_bmio_clear_n_write, 4090 "clear_n_write from receive_uuids", 4091 BM_LOCKED_TEST_ALLOWED, NULL); 4092 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]); 4093 _drbd_uuid_set(device, UI_BITMAP, 0); 4094 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 4095 CS_VERBOSE, NULL); 4096 drbd_md_sync(device); 4097 updated_uuids = 1; 4098 } 4099 put_ldev(device); 4100 } else if (device->state.disk < D_INCONSISTENT && 4101 device->state.role == R_PRIMARY) { 4102 /* I am a diskless primary, the peer just created a new current UUID 4103 for me. */ 4104 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 4105 } 4106 4107 /* Before we test for the disk state, we should wait until an eventually 4108 ongoing cluster wide state change is finished. That is important if 4109 we are primary and are detaching from our disk. We need to see the 4110 new disk state... */ 4111 mutex_lock(device->state_mutex); 4112 mutex_unlock(device->state_mutex); 4113 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT) 4114 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 4115 4116 if (updated_uuids) 4117 drbd_print_uuids(device, "receiver updated UUIDs to"); 4118 4119 return 0; 4120 } 4121 4122 /** 4123 * convert_state() - Converts the peer's view of the cluster state to our point of view 4124 * @ps: The state as seen by the peer. 4125 */ 4126 static union drbd_state convert_state(union drbd_state ps) 4127 { 4128 union drbd_state ms; 4129 4130 static enum drbd_conns c_tab[] = { 4131 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS, 4132 [C_CONNECTED] = C_CONNECTED, 4133 4134 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 4135 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 4136 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 4137 [C_VERIFY_S] = C_VERIFY_T, 4138 [C_MASK] = C_MASK, 4139 }; 4140 4141 ms.i = ps.i; 4142 4143 ms.conn = c_tab[ps.conn]; 4144 ms.peer = ps.role; 4145 ms.role = ps.peer; 4146 ms.pdsk = ps.disk; 4147 ms.disk = ps.pdsk; 4148 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 4149 4150 return ms; 4151 } 4152 4153 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi) 4154 { 4155 struct drbd_peer_device *peer_device; 4156 struct drbd_device *device; 4157 struct p_req_state *p = pi->data; 4158 union drbd_state mask, val; 4159 enum drbd_state_rv rv; 4160 4161 peer_device = conn_peer_device(connection, pi->vnr); 4162 if (!peer_device) 4163 return -EIO; 4164 device = peer_device->device; 4165 4166 mask.i = be32_to_cpu(p->mask); 4167 val.i = be32_to_cpu(p->val); 4168 4169 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) && 4170 mutex_is_locked(device->state_mutex)) { 4171 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG); 4172 return 0; 4173 } 4174 4175 mask = convert_state(mask); 4176 val = convert_state(val); 4177 4178 rv = drbd_change_state(device, CS_VERBOSE, mask, val); 4179 drbd_send_sr_reply(peer_device, rv); 4180 4181 drbd_md_sync(device); 4182 4183 return 0; 4184 } 4185 4186 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi) 4187 { 4188 struct p_req_state *p = pi->data; 4189 union drbd_state mask, val; 4190 enum drbd_state_rv rv; 4191 4192 mask.i = be32_to_cpu(p->mask); 4193 val.i = be32_to_cpu(p->val); 4194 4195 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) && 4196 mutex_is_locked(&connection->cstate_mutex)) { 4197 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG); 4198 return 0; 4199 } 4200 4201 mask = convert_state(mask); 4202 val = convert_state(val); 4203 4204 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL); 4205 conn_send_sr_reply(connection, rv); 4206 4207 return 0; 4208 } 4209 4210 static int receive_state(struct drbd_connection *connection, struct packet_info *pi) 4211 { 4212 struct drbd_peer_device *peer_device; 4213 struct drbd_device *device; 4214 struct p_state *p = pi->data; 4215 union drbd_state os, ns, peer_state; 4216 enum drbd_disk_state real_peer_disk; 4217 enum chg_state_flags cs_flags; 4218 int rv; 4219 4220 peer_device = conn_peer_device(connection, pi->vnr); 4221 if (!peer_device) 4222 return config_unknown_volume(connection, pi); 4223 device = peer_device->device; 4224 4225 peer_state.i = be32_to_cpu(p->state); 4226 4227 real_peer_disk = peer_state.disk; 4228 if (peer_state.disk == D_NEGOTIATING) { 4229 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT; 4230 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 4231 } 4232 4233 spin_lock_irq(&device->resource->req_lock); 4234 retry: 4235 os = ns = drbd_read_state(device); 4236 spin_unlock_irq(&device->resource->req_lock); 4237 4238 /* If some other part of the code (ack_receiver thread, timeout) 4239 * already decided to close the connection again, 4240 * we must not "re-establish" it here. */ 4241 if (os.conn <= C_TEAR_DOWN) 4242 return -ECONNRESET; 4243 4244 /* If this is the "end of sync" confirmation, usually the peer disk 4245 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits 4246 * set) resync started in PausedSyncT, or if the timing of pause-/ 4247 * unpause-sync events has been "just right", the peer disk may 4248 * transition from D_CONSISTENT to D_UP_TO_DATE as well. 4249 */ 4250 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) && 4251 real_peer_disk == D_UP_TO_DATE && 4252 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { 4253 /* If we are (becoming) SyncSource, but peer is still in sync 4254 * preparation, ignore its uptodate-ness to avoid flapping, it 4255 * will change to inconsistent once the peer reaches active 4256 * syncing states. 4257 * It may have changed syncer-paused flags, however, so we 4258 * cannot ignore this completely. */ 4259 if (peer_state.conn > C_CONNECTED && 4260 peer_state.conn < C_SYNC_SOURCE) 4261 real_peer_disk = D_INCONSISTENT; 4262 4263 /* if peer_state changes to connected at the same time, 4264 * it explicitly notifies us that it finished resync. 4265 * Maybe we should finish it up, too? */ 4266 else if (os.conn >= C_SYNC_SOURCE && 4267 peer_state.conn == C_CONNECTED) { 4268 if (drbd_bm_total_weight(device) <= device->rs_failed) 4269 drbd_resync_finished(peer_device); 4270 return 0; 4271 } 4272 } 4273 4274 /* explicit verify finished notification, stop sector reached. */ 4275 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE && 4276 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) { 4277 ov_out_of_sync_print(peer_device); 4278 drbd_resync_finished(peer_device); 4279 return 0; 4280 } 4281 4282 /* peer says his disk is inconsistent, while we think it is uptodate, 4283 * and this happens while the peer still thinks we have a sync going on, 4284 * but we think we are already done with the sync. 4285 * We ignore this to avoid flapping pdsk. 4286 * This should not happen, if the peer is a recent version of drbd. */ 4287 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && 4288 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) 4289 real_peer_disk = D_UP_TO_DATE; 4290 4291 if (ns.conn == C_WF_REPORT_PARAMS) 4292 ns.conn = C_CONNECTED; 4293 4294 if (peer_state.conn == C_AHEAD) 4295 ns.conn = C_BEHIND; 4296 4297 /* TODO: 4298 * if (primary and diskless and peer uuid != effective uuid) 4299 * abort attach on peer; 4300 * 4301 * If this node does not have good data, was already connected, but 4302 * the peer did a late attach only now, trying to "negotiate" with me, 4303 * AND I am currently Primary, possibly frozen, with some specific 4304 * "effective" uuid, this should never be reached, really, because 4305 * we first send the uuids, then the current state. 4306 * 4307 * In this scenario, we already dropped the connection hard 4308 * when we received the unsuitable uuids (receive_uuids(). 4309 * 4310 * Should we want to change this, that is: not drop the connection in 4311 * receive_uuids() already, then we would need to add a branch here 4312 * that aborts the attach of "unsuitable uuids" on the peer in case 4313 * this node is currently Diskless Primary. 4314 */ 4315 4316 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING && 4317 get_ldev_if_state(device, D_NEGOTIATING)) { 4318 int cr; /* consider resync */ 4319 4320 /* if we established a new connection */ 4321 cr = (os.conn < C_CONNECTED); 4322 /* if we had an established connection 4323 * and one of the nodes newly attaches a disk */ 4324 cr |= (os.conn == C_CONNECTED && 4325 (peer_state.disk == D_NEGOTIATING || 4326 os.disk == D_NEGOTIATING)); 4327 /* if we have both been inconsistent, and the peer has been 4328 * forced to be UpToDate with --force */ 4329 cr |= test_bit(CONSIDER_RESYNC, &device->flags); 4330 /* if we had been plain connected, and the admin requested to 4331 * start a sync by "invalidate" or "invalidate-remote" */ 4332 cr |= (os.conn == C_CONNECTED && 4333 (peer_state.conn >= C_STARTING_SYNC_S && 4334 peer_state.conn <= C_WF_BITMAP_T)); 4335 4336 if (cr) 4337 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk); 4338 4339 put_ldev(device); 4340 if (ns.conn == C_MASK) { 4341 ns.conn = C_CONNECTED; 4342 if (device->state.disk == D_NEGOTIATING) { 4343 drbd_force_state(device, NS(disk, D_FAILED)); 4344 } else if (peer_state.disk == D_NEGOTIATING) { 4345 drbd_err(device, "Disk attach process on the peer node was aborted.\n"); 4346 peer_state.disk = D_DISKLESS; 4347 real_peer_disk = D_DISKLESS; 4348 } else { 4349 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags)) 4350 return -EIO; 4351 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS); 4352 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4353 return -EIO; 4354 } 4355 } 4356 } 4357 4358 spin_lock_irq(&device->resource->req_lock); 4359 if (os.i != drbd_read_state(device).i) 4360 goto retry; 4361 clear_bit(CONSIDER_RESYNC, &device->flags); 4362 ns.peer = peer_state.role; 4363 ns.pdsk = real_peer_disk; 4364 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 4365 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 4366 ns.disk = device->new_state_tmp.disk; 4367 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); 4368 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && 4369 test_bit(NEW_CUR_UUID, &device->flags)) { 4370 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this 4371 for temporal network outages! */ 4372 spin_unlock_irq(&device->resource->req_lock); 4373 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); 4374 tl_clear(peer_device->connection); 4375 drbd_uuid_new_current(device); 4376 clear_bit(NEW_CUR_UUID, &device->flags); 4377 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD); 4378 return -EIO; 4379 } 4380 rv = _drbd_set_state(device, ns, cs_flags, NULL); 4381 ns = drbd_read_state(device); 4382 spin_unlock_irq(&device->resource->req_lock); 4383 4384 if (rv < SS_SUCCESS) { 4385 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4386 return -EIO; 4387 } 4388 4389 if (os.conn > C_WF_REPORT_PARAMS) { 4390 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 4391 peer_state.disk != D_NEGOTIATING ) { 4392 /* we want resync, peer has not yet decided to sync... */ 4393 /* Nowadays only used when forcing a node into primary role and 4394 setting its disk to UpToDate with that */ 4395 drbd_send_uuids(peer_device); 4396 drbd_send_current_state(peer_device); 4397 } 4398 } 4399 4400 clear_bit(DISCARD_MY_DATA, &device->flags); 4401 4402 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */ 4403 4404 return 0; 4405 } 4406 4407 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi) 4408 { 4409 struct drbd_peer_device *peer_device; 4410 struct drbd_device *device; 4411 struct p_rs_uuid *p = pi->data; 4412 4413 peer_device = conn_peer_device(connection, pi->vnr); 4414 if (!peer_device) 4415 return -EIO; 4416 device = peer_device->device; 4417 4418 wait_event(device->misc_wait, 4419 device->state.conn == C_WF_SYNC_UUID || 4420 device->state.conn == C_BEHIND || 4421 device->state.conn < C_CONNECTED || 4422 device->state.disk < D_NEGOTIATING); 4423 4424 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */ 4425 4426 /* Here the _drbd_uuid_ functions are right, current should 4427 _not_ be rotated into the history */ 4428 if (get_ldev_if_state(device, D_NEGOTIATING)) { 4429 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid)); 4430 _drbd_uuid_set(device, UI_BITMAP, 0UL); 4431 4432 drbd_print_uuids(device, "updated sync uuid"); 4433 drbd_start_resync(device, C_SYNC_TARGET); 4434 4435 put_ldev(device); 4436 } else 4437 drbd_err(device, "Ignoring SyncUUID packet!\n"); 4438 4439 return 0; 4440 } 4441 4442 /* 4443 * receive_bitmap_plain 4444 * 4445 * Return 0 when done, 1 when another iteration is needed, and a negative error 4446 * code upon failure. 4447 */ 4448 static int 4449 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size, 4450 unsigned long *p, struct bm_xfer_ctx *c) 4451 { 4452 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - 4453 drbd_header_size(peer_device->connection); 4454 unsigned int num_words = min_t(size_t, data_size / sizeof(*p), 4455 c->bm_words - c->word_offset); 4456 unsigned int want = num_words * sizeof(*p); 4457 int err; 4458 4459 if (want != size) { 4460 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size); 4461 return -EIO; 4462 } 4463 if (want == 0) 4464 return 0; 4465 err = drbd_recv_all(peer_device->connection, p, want); 4466 if (err) 4467 return err; 4468 4469 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p); 4470 4471 c->word_offset += num_words; 4472 c->bit_offset = c->word_offset * BITS_PER_LONG; 4473 if (c->bit_offset > c->bm_bits) 4474 c->bit_offset = c->bm_bits; 4475 4476 return 1; 4477 } 4478 4479 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p) 4480 { 4481 return (enum drbd_bitmap_code)(p->encoding & 0x0f); 4482 } 4483 4484 static int dcbp_get_start(struct p_compressed_bm *p) 4485 { 4486 return (p->encoding & 0x80) != 0; 4487 } 4488 4489 static int dcbp_get_pad_bits(struct p_compressed_bm *p) 4490 { 4491 return (p->encoding >> 4) & 0x7; 4492 } 4493 4494 /* 4495 * recv_bm_rle_bits 4496 * 4497 * Return 0 when done, 1 when another iteration is needed, and a negative error 4498 * code upon failure. 4499 */ 4500 static int 4501 recv_bm_rle_bits(struct drbd_peer_device *peer_device, 4502 struct p_compressed_bm *p, 4503 struct bm_xfer_ctx *c, 4504 unsigned int len) 4505 { 4506 struct bitstream bs; 4507 u64 look_ahead; 4508 u64 rl; 4509 u64 tmp; 4510 unsigned long s = c->bit_offset; 4511 unsigned long e; 4512 int toggle = dcbp_get_start(p); 4513 int have; 4514 int bits; 4515 4516 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p)); 4517 4518 bits = bitstream_get_bits(&bs, &look_ahead, 64); 4519 if (bits < 0) 4520 return -EIO; 4521 4522 for (have = bits; have > 0; s += rl, toggle = !toggle) { 4523 bits = vli_decode_bits(&rl, look_ahead); 4524 if (bits <= 0) 4525 return -EIO; 4526 4527 if (toggle) { 4528 e = s + rl -1; 4529 if (e >= c->bm_bits) { 4530 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 4531 return -EIO; 4532 } 4533 _drbd_bm_set_bits(peer_device->device, s, e); 4534 } 4535 4536 if (have < bits) { 4537 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 4538 have, bits, look_ahead, 4539 (unsigned int)(bs.cur.b - p->code), 4540 (unsigned int)bs.buf_len); 4541 return -EIO; 4542 } 4543 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */ 4544 if (likely(bits < 64)) 4545 look_ahead >>= bits; 4546 else 4547 look_ahead = 0; 4548 have -= bits; 4549 4550 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 4551 if (bits < 0) 4552 return -EIO; 4553 look_ahead |= tmp << have; 4554 have += bits; 4555 } 4556 4557 c->bit_offset = s; 4558 bm_xfer_ctx_bit_to_word_offset(c); 4559 4560 return (s != c->bm_bits); 4561 } 4562 4563 /* 4564 * decode_bitmap_c 4565 * 4566 * Return 0 when done, 1 when another iteration is needed, and a negative error 4567 * code upon failure. 4568 */ 4569 static int 4570 decode_bitmap_c(struct drbd_peer_device *peer_device, 4571 struct p_compressed_bm *p, 4572 struct bm_xfer_ctx *c, 4573 unsigned int len) 4574 { 4575 if (dcbp_get_code(p) == RLE_VLI_Bits) 4576 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p)); 4577 4578 /* other variants had been implemented for evaluation, 4579 * but have been dropped as this one turned out to be "best" 4580 * during all our tests. */ 4581 4582 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding); 4583 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4584 return -EIO; 4585 } 4586 4587 void INFO_bm_xfer_stats(struct drbd_peer_device *peer_device, 4588 const char *direction, struct bm_xfer_ctx *c) 4589 { 4590 /* what would it take to transfer it "plaintext" */ 4591 unsigned int header_size = drbd_header_size(peer_device->connection); 4592 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size; 4593 unsigned int plain = 4594 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) + 4595 c->bm_words * sizeof(unsigned long); 4596 unsigned int total = c->bytes[0] + c->bytes[1]; 4597 unsigned int r; 4598 4599 /* total can not be zero. but just in case: */ 4600 if (total == 0) 4601 return; 4602 4603 /* don't report if not compressed */ 4604 if (total >= plain) 4605 return; 4606 4607 /* total < plain. check for overflow, still */ 4608 r = (total > UINT_MAX/1000) ? (total / (plain/1000)) 4609 : (1000 * total / plain); 4610 4611 if (r > 1000) 4612 r = 1000; 4613 4614 r = 1000 - r; 4615 drbd_info(peer_device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), " 4616 "total %u; compression: %u.%u%%\n", 4617 direction, 4618 c->bytes[1], c->packets[1], 4619 c->bytes[0], c->packets[0], 4620 total, r/10, r % 10); 4621 } 4622 4623 /* Since we are processing the bitfield from lower addresses to higher, 4624 it does not matter if the process it in 32 bit chunks or 64 bit 4625 chunks as long as it is little endian. (Understand it as byte stream, 4626 beginning with the lowest byte...) If we would use big endian 4627 we would need to process it from the highest address to the lowest, 4628 in order to be agnostic to the 32 vs 64 bits issue. 4629 4630 returns 0 on failure, 1 if we successfully received it. */ 4631 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi) 4632 { 4633 struct drbd_peer_device *peer_device; 4634 struct drbd_device *device; 4635 struct bm_xfer_ctx c; 4636 int err; 4637 4638 peer_device = conn_peer_device(connection, pi->vnr); 4639 if (!peer_device) 4640 return -EIO; 4641 device = peer_device->device; 4642 4643 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED); 4644 /* you are supposed to send additional out-of-sync information 4645 * if you actually set bits during this phase */ 4646 4647 c = (struct bm_xfer_ctx) { 4648 .bm_bits = drbd_bm_bits(device), 4649 .bm_words = drbd_bm_words(device), 4650 }; 4651 4652 for(;;) { 4653 if (pi->cmd == P_BITMAP) 4654 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c); 4655 else if (pi->cmd == P_COMPRESSED_BITMAP) { 4656 /* MAYBE: sanity check that we speak proto >= 90, 4657 * and the feature is enabled! */ 4658 struct p_compressed_bm *p = pi->data; 4659 4660 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) { 4661 drbd_err(device, "ReportCBitmap packet too large\n"); 4662 err = -EIO; 4663 goto out; 4664 } 4665 if (pi->size <= sizeof(*p)) { 4666 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size); 4667 err = -EIO; 4668 goto out; 4669 } 4670 err = drbd_recv_all(peer_device->connection, p, pi->size); 4671 if (err) 4672 goto out; 4673 err = decode_bitmap_c(peer_device, p, &c, pi->size); 4674 } else { 4675 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd); 4676 err = -EIO; 4677 goto out; 4678 } 4679 4680 c.packets[pi->cmd == P_BITMAP]++; 4681 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size; 4682 4683 if (err <= 0) { 4684 if (err < 0) 4685 goto out; 4686 break; 4687 } 4688 err = drbd_recv_header(peer_device->connection, pi); 4689 if (err) 4690 goto out; 4691 } 4692 4693 INFO_bm_xfer_stats(peer_device, "receive", &c); 4694 4695 if (device->state.conn == C_WF_BITMAP_T) { 4696 enum drbd_state_rv rv; 4697 4698 err = drbd_send_bitmap(device, peer_device); 4699 if (err) 4700 goto out; 4701 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */ 4702 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 4703 D_ASSERT(device, rv == SS_SUCCESS); 4704 } else if (device->state.conn != C_WF_BITMAP_S) { 4705 /* admin may have requested C_DISCONNECTING, 4706 * other threads may have noticed network errors */ 4707 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n", 4708 drbd_conn_str(device->state.conn)); 4709 } 4710 err = 0; 4711 4712 out: 4713 drbd_bm_unlock(device); 4714 if (!err && device->state.conn == C_WF_BITMAP_S) 4715 drbd_start_resync(device, C_SYNC_SOURCE); 4716 return err; 4717 } 4718 4719 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi) 4720 { 4721 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n", 4722 pi->cmd, pi->size); 4723 4724 return ignore_remaining_packet(connection, pi); 4725 } 4726 4727 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi) 4728 { 4729 /* Make sure we've acked all the TCP data associated 4730 * with the data requests being unplugged */ 4731 tcp_sock_set_quickack(connection->data.socket->sk, 2); 4732 return 0; 4733 } 4734 4735 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi) 4736 { 4737 struct drbd_peer_device *peer_device; 4738 struct drbd_device *device; 4739 struct p_block_desc *p = pi->data; 4740 4741 peer_device = conn_peer_device(connection, pi->vnr); 4742 if (!peer_device) 4743 return -EIO; 4744 device = peer_device->device; 4745 4746 switch (device->state.conn) { 4747 case C_WF_SYNC_UUID: 4748 case C_WF_BITMAP_T: 4749 case C_BEHIND: 4750 break; 4751 default: 4752 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n", 4753 drbd_conn_str(device->state.conn)); 4754 } 4755 4756 drbd_set_out_of_sync(peer_device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize)); 4757 4758 return 0; 4759 } 4760 4761 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi) 4762 { 4763 struct drbd_peer_device *peer_device; 4764 struct p_block_desc *p = pi->data; 4765 struct drbd_device *device; 4766 sector_t sector; 4767 int size, err = 0; 4768 4769 peer_device = conn_peer_device(connection, pi->vnr); 4770 if (!peer_device) 4771 return -EIO; 4772 device = peer_device->device; 4773 4774 sector = be64_to_cpu(p->sector); 4775 size = be32_to_cpu(p->blksize); 4776 4777 dec_rs_pending(peer_device); 4778 4779 if (get_ldev(device)) { 4780 struct drbd_peer_request *peer_req; 4781 4782 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector, 4783 size, 0, GFP_NOIO); 4784 if (!peer_req) { 4785 put_ldev(device); 4786 return -ENOMEM; 4787 } 4788 4789 peer_req->w.cb = e_end_resync_block; 4790 peer_req->opf = REQ_OP_DISCARD; 4791 peer_req->submit_jif = jiffies; 4792 peer_req->flags |= EE_TRIM; 4793 4794 spin_lock_irq(&device->resource->req_lock); 4795 list_add_tail(&peer_req->w.list, &device->sync_ee); 4796 spin_unlock_irq(&device->resource->req_lock); 4797 4798 atomic_add(pi->size >> 9, &device->rs_sect_ev); 4799 err = drbd_submit_peer_request(peer_req); 4800 4801 if (err) { 4802 spin_lock_irq(&device->resource->req_lock); 4803 list_del(&peer_req->w.list); 4804 spin_unlock_irq(&device->resource->req_lock); 4805 4806 drbd_free_peer_req(device, peer_req); 4807 put_ldev(device); 4808 err = 0; 4809 goto fail; 4810 } 4811 4812 inc_unacked(device); 4813 4814 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(), 4815 as well as drbd_rs_complete_io() */ 4816 } else { 4817 fail: 4818 drbd_rs_complete_io(device, sector); 4819 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER); 4820 } 4821 4822 atomic_add(size >> 9, &device->rs_sect_in); 4823 4824 return err; 4825 } 4826 4827 struct data_cmd { 4828 int expect_payload; 4829 unsigned int pkt_size; 4830 int (*fn)(struct drbd_connection *, struct packet_info *); 4831 }; 4832 4833 static struct data_cmd drbd_cmd_handler[] = { 4834 [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, 4835 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, 4836 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } , 4837 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , 4838 [P_BITMAP] = { 1, 0, receive_bitmap } , 4839 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } , 4840 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote }, 4841 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4842 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4843 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam }, 4844 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam }, 4845 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, 4846 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, 4847 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, 4848 [P_STATE] = { 0, sizeof(struct p_state), receive_state }, 4849 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, 4850 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, 4851 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4852 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4853 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4854 [P_RS_THIN_REQ] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4855 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, 4856 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync }, 4857 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state }, 4858 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol }, 4859 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data }, 4860 [P_ZEROES] = { 0, sizeof(struct p_trim), receive_Data }, 4861 [P_RS_DEALLOCATED] = { 0, sizeof(struct p_block_desc), receive_rs_deallocated }, 4862 }; 4863 4864 static void drbdd(struct drbd_connection *connection) 4865 { 4866 struct packet_info pi; 4867 size_t shs; /* sub header size */ 4868 int err; 4869 4870 while (get_t_state(&connection->receiver) == RUNNING) { 4871 struct data_cmd const *cmd; 4872 4873 drbd_thread_current_set_cpu(&connection->receiver); 4874 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug); 4875 if (drbd_recv_header_maybe_unplug(connection, &pi)) 4876 goto err_out; 4877 4878 cmd = &drbd_cmd_handler[pi.cmd]; 4879 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) { 4880 drbd_err(connection, "Unexpected data packet %s (0x%04x)", 4881 cmdname(pi.cmd), pi.cmd); 4882 goto err_out; 4883 } 4884 4885 shs = cmd->pkt_size; 4886 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME) 4887 shs += sizeof(struct o_qlim); 4888 if (pi.size > shs && !cmd->expect_payload) { 4889 drbd_err(connection, "No payload expected %s l:%d\n", 4890 cmdname(pi.cmd), pi.size); 4891 goto err_out; 4892 } 4893 if (pi.size < shs) { 4894 drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n", 4895 cmdname(pi.cmd), (int)shs, pi.size); 4896 goto err_out; 4897 } 4898 4899 if (shs) { 4900 update_receiver_timing_details(connection, drbd_recv_all_warn); 4901 err = drbd_recv_all_warn(connection, pi.data, shs); 4902 if (err) 4903 goto err_out; 4904 pi.size -= shs; 4905 } 4906 4907 update_receiver_timing_details(connection, cmd->fn); 4908 err = cmd->fn(connection, &pi); 4909 if (err) { 4910 drbd_err(connection, "error receiving %s, e: %d l: %d!\n", 4911 cmdname(pi.cmd), err, pi.size); 4912 goto err_out; 4913 } 4914 } 4915 return; 4916 4917 err_out: 4918 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4919 } 4920 4921 static void conn_disconnect(struct drbd_connection *connection) 4922 { 4923 struct drbd_peer_device *peer_device; 4924 enum drbd_conns oc; 4925 int vnr; 4926 4927 if (connection->cstate == C_STANDALONE) 4928 return; 4929 4930 /* We are about to start the cleanup after connection loss. 4931 * Make sure drbd_make_request knows about that. 4932 * Usually we should be in some network failure state already, 4933 * but just in case we are not, we fix it up here. 4934 */ 4935 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 4936 4937 /* ack_receiver does not clean up anything. it must not interfere, either */ 4938 drbd_thread_stop(&connection->ack_receiver); 4939 if (connection->ack_sender) { 4940 destroy_workqueue(connection->ack_sender); 4941 connection->ack_sender = NULL; 4942 } 4943 drbd_free_sock(connection); 4944 4945 rcu_read_lock(); 4946 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 4947 struct drbd_device *device = peer_device->device; 4948 kref_get(&device->kref); 4949 rcu_read_unlock(); 4950 drbd_disconnected(peer_device); 4951 kref_put(&device->kref, drbd_destroy_device); 4952 rcu_read_lock(); 4953 } 4954 rcu_read_unlock(); 4955 4956 if (!list_empty(&connection->current_epoch->list)) 4957 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n"); 4958 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 4959 atomic_set(&connection->current_epoch->epoch_size, 0); 4960 connection->send.seen_any_write_yet = false; 4961 4962 drbd_info(connection, "Connection closed\n"); 4963 4964 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN) 4965 conn_try_outdate_peer_async(connection); 4966 4967 spin_lock_irq(&connection->resource->req_lock); 4968 oc = connection->cstate; 4969 if (oc >= C_UNCONNECTED) 4970 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE); 4971 4972 spin_unlock_irq(&connection->resource->req_lock); 4973 4974 if (oc == C_DISCONNECTING) 4975 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD); 4976 } 4977 4978 static int drbd_disconnected(struct drbd_peer_device *peer_device) 4979 { 4980 struct drbd_device *device = peer_device->device; 4981 unsigned int i; 4982 4983 /* wait for current activity to cease. */ 4984 spin_lock_irq(&device->resource->req_lock); 4985 _drbd_wait_ee_list_empty(device, &device->active_ee); 4986 _drbd_wait_ee_list_empty(device, &device->sync_ee); 4987 _drbd_wait_ee_list_empty(device, &device->read_ee); 4988 spin_unlock_irq(&device->resource->req_lock); 4989 4990 /* We do not have data structures that would allow us to 4991 * get the rs_pending_cnt down to 0 again. 4992 * * On C_SYNC_TARGET we do not have any data structures describing 4993 * the pending RSDataRequest's we have sent. 4994 * * On C_SYNC_SOURCE there is no data structure that tracks 4995 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget. 4996 * And no, it is not the sum of the reference counts in the 4997 * resync_LRU. The resync_LRU tracks the whole operation including 4998 * the disk-IO, while the rs_pending_cnt only tracks the blocks 4999 * on the fly. */ 5000 drbd_rs_cancel_all(device); 5001 device->rs_total = 0; 5002 device->rs_failed = 0; 5003 atomic_set(&device->rs_pending_cnt, 0); 5004 wake_up(&device->misc_wait); 5005 5006 timer_delete_sync(&device->resync_timer); 5007 resync_timer_fn(&device->resync_timer); 5008 5009 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, 5010 * w_make_resync_request etc. which may still be on the worker queue 5011 * to be "canceled" */ 5012 drbd_flush_workqueue(&peer_device->connection->sender_work); 5013 5014 drbd_finish_peer_reqs(device); 5015 5016 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs() 5017 might have issued a work again. The one before drbd_finish_peer_reqs() is 5018 necessary to reclain net_ee in drbd_finish_peer_reqs(). */ 5019 drbd_flush_workqueue(&peer_device->connection->sender_work); 5020 5021 /* need to do it again, drbd_finish_peer_reqs() may have populated it 5022 * again via drbd_try_clear_on_disk_bm(). */ 5023 drbd_rs_cancel_all(device); 5024 5025 kfree(device->p_uuid); 5026 device->p_uuid = NULL; 5027 5028 if (!drbd_suspended(device)) 5029 tl_clear(peer_device->connection); 5030 5031 drbd_md_sync(device); 5032 5033 if (get_ldev(device)) { 5034 drbd_bitmap_io(device, &drbd_bm_write_copy_pages, 5035 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED, NULL); 5036 put_ldev(device); 5037 } 5038 5039 i = atomic_read(&device->pp_in_use_by_net); 5040 if (i) 5041 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i); 5042 i = atomic_read(&device->pp_in_use); 5043 if (i) 5044 drbd_info(device, "pp_in_use = %d, expected 0\n", i); 5045 5046 D_ASSERT(device, list_empty(&device->read_ee)); 5047 D_ASSERT(device, list_empty(&device->active_ee)); 5048 D_ASSERT(device, list_empty(&device->sync_ee)); 5049 D_ASSERT(device, list_empty(&device->done_ee)); 5050 5051 return 0; 5052 } 5053 5054 /* 5055 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 5056 * we can agree on is stored in agreed_pro_version. 5057 * 5058 * feature flags and the reserved array should be enough room for future 5059 * enhancements of the handshake protocol, and possible plugins... 5060 * 5061 * for now, they are expected to be zero, but ignored. 5062 */ 5063 static int drbd_send_features(struct drbd_connection *connection) 5064 { 5065 struct drbd_socket *sock; 5066 struct p_connection_features *p; 5067 5068 sock = &connection->data; 5069 p = conn_prepare_command(connection, sock); 5070 if (!p) 5071 return -EIO; 5072 memset(p, 0, sizeof(*p)); 5073 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); 5074 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); 5075 p->feature_flags = cpu_to_be32(PRO_FEATURES); 5076 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0); 5077 } 5078 5079 /* 5080 * return values: 5081 * 1 yes, we have a valid connection 5082 * 0 oops, did not work out, please try again 5083 * -1 peer talks different language, 5084 * no point in trying again, please go standalone. 5085 */ 5086 static int drbd_do_features(struct drbd_connection *connection) 5087 { 5088 /* ASSERT current == connection->receiver ... */ 5089 struct p_connection_features *p; 5090 const int expect = sizeof(struct p_connection_features); 5091 struct packet_info pi; 5092 int err; 5093 5094 err = drbd_send_features(connection); 5095 if (err) 5096 return 0; 5097 5098 err = drbd_recv_header(connection, &pi); 5099 if (err) 5100 return 0; 5101 5102 if (pi.cmd != P_CONNECTION_FEATURES) { 5103 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n", 5104 cmdname(pi.cmd), pi.cmd); 5105 return -1; 5106 } 5107 5108 if (pi.size != expect) { 5109 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n", 5110 expect, pi.size); 5111 return -1; 5112 } 5113 5114 p = pi.data; 5115 err = drbd_recv_all_warn(connection, p, expect); 5116 if (err) 5117 return 0; 5118 5119 p->protocol_min = be32_to_cpu(p->protocol_min); 5120 p->protocol_max = be32_to_cpu(p->protocol_max); 5121 if (p->protocol_max == 0) 5122 p->protocol_max = p->protocol_min; 5123 5124 if (PRO_VERSION_MAX < p->protocol_min || 5125 PRO_VERSION_MIN > p->protocol_max) 5126 goto incompat; 5127 5128 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max); 5129 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags); 5130 5131 drbd_info(connection, "Handshake successful: " 5132 "Agreed network protocol version %d\n", connection->agreed_pro_version); 5133 5134 drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n", 5135 connection->agreed_features, 5136 connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "", 5137 connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "", 5138 connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "", 5139 connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" : 5140 connection->agreed_features ? "" : " none"); 5141 5142 return 1; 5143 5144 incompat: 5145 drbd_err(connection, "incompatible DRBD dialects: " 5146 "I support %d-%d, peer supports %d-%d\n", 5147 PRO_VERSION_MIN, PRO_VERSION_MAX, 5148 p->protocol_min, p->protocol_max); 5149 return -1; 5150 } 5151 5152 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE) 5153 static int drbd_do_auth(struct drbd_connection *connection) 5154 { 5155 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n"); 5156 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n"); 5157 return -1; 5158 } 5159 #else 5160 #define CHALLENGE_LEN 64 5161 5162 /* Return value: 5163 1 - auth succeeded, 5164 0 - failed, try again (network error), 5165 -1 - auth failed, don't try again. 5166 */ 5167 5168 static int drbd_do_auth(struct drbd_connection *connection) 5169 { 5170 struct drbd_socket *sock; 5171 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */ 5172 char *response = NULL; 5173 char *right_response = NULL; 5174 char *peers_ch = NULL; 5175 unsigned int key_len; 5176 char secret[SHARED_SECRET_MAX]; /* 64 byte */ 5177 unsigned int resp_size; 5178 struct shash_desc *desc; 5179 struct packet_info pi; 5180 struct net_conf *nc; 5181 int err, rv; 5182 5183 /* FIXME: Put the challenge/response into the preallocated socket buffer. */ 5184 5185 rcu_read_lock(); 5186 nc = rcu_dereference(connection->net_conf); 5187 key_len = strlen(nc->shared_secret); 5188 memcpy(secret, nc->shared_secret, key_len); 5189 rcu_read_unlock(); 5190 5191 desc = kmalloc(sizeof(struct shash_desc) + 5192 crypto_shash_descsize(connection->cram_hmac_tfm), 5193 GFP_KERNEL); 5194 if (!desc) { 5195 rv = -1; 5196 goto fail; 5197 } 5198 desc->tfm = connection->cram_hmac_tfm; 5199 5200 rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len); 5201 if (rv) { 5202 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv); 5203 rv = -1; 5204 goto fail; 5205 } 5206 5207 get_random_bytes(my_challenge, CHALLENGE_LEN); 5208 5209 sock = &connection->data; 5210 if (!conn_prepare_command(connection, sock)) { 5211 rv = 0; 5212 goto fail; 5213 } 5214 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0, 5215 my_challenge, CHALLENGE_LEN); 5216 if (!rv) 5217 goto fail; 5218 5219 err = drbd_recv_header(connection, &pi); 5220 if (err) { 5221 rv = 0; 5222 goto fail; 5223 } 5224 5225 if (pi.cmd != P_AUTH_CHALLENGE) { 5226 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n", 5227 cmdname(pi.cmd), pi.cmd); 5228 rv = -1; 5229 goto fail; 5230 } 5231 5232 if (pi.size > CHALLENGE_LEN * 2) { 5233 drbd_err(connection, "expected AuthChallenge payload too big.\n"); 5234 rv = -1; 5235 goto fail; 5236 } 5237 5238 if (pi.size < CHALLENGE_LEN) { 5239 drbd_err(connection, "AuthChallenge payload too small.\n"); 5240 rv = -1; 5241 goto fail; 5242 } 5243 5244 peers_ch = kmalloc(pi.size, GFP_NOIO); 5245 if (!peers_ch) { 5246 rv = -1; 5247 goto fail; 5248 } 5249 5250 err = drbd_recv_all_warn(connection, peers_ch, pi.size); 5251 if (err) { 5252 rv = 0; 5253 goto fail; 5254 } 5255 5256 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) { 5257 drbd_err(connection, "Peer presented the same challenge!\n"); 5258 rv = -1; 5259 goto fail; 5260 } 5261 5262 resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm); 5263 response = kmalloc(resp_size, GFP_NOIO); 5264 if (!response) { 5265 rv = -1; 5266 goto fail; 5267 } 5268 5269 rv = crypto_shash_digest(desc, peers_ch, pi.size, response); 5270 if (rv) { 5271 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5272 rv = -1; 5273 goto fail; 5274 } 5275 5276 if (!conn_prepare_command(connection, sock)) { 5277 rv = 0; 5278 goto fail; 5279 } 5280 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0, 5281 response, resp_size); 5282 if (!rv) 5283 goto fail; 5284 5285 err = drbd_recv_header(connection, &pi); 5286 if (err) { 5287 rv = 0; 5288 goto fail; 5289 } 5290 5291 if (pi.cmd != P_AUTH_RESPONSE) { 5292 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n", 5293 cmdname(pi.cmd), pi.cmd); 5294 rv = 0; 5295 goto fail; 5296 } 5297 5298 if (pi.size != resp_size) { 5299 drbd_err(connection, "expected AuthResponse payload of wrong size\n"); 5300 rv = 0; 5301 goto fail; 5302 } 5303 5304 err = drbd_recv_all_warn(connection, response , resp_size); 5305 if (err) { 5306 rv = 0; 5307 goto fail; 5308 } 5309 5310 right_response = kmalloc(resp_size, GFP_NOIO); 5311 if (!right_response) { 5312 rv = -1; 5313 goto fail; 5314 } 5315 5316 rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN, 5317 right_response); 5318 if (rv) { 5319 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5320 rv = -1; 5321 goto fail; 5322 } 5323 5324 rv = !memcmp(response, right_response, resp_size); 5325 5326 if (rv) 5327 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n", 5328 resp_size); 5329 else 5330 rv = -1; 5331 5332 fail: 5333 kfree(peers_ch); 5334 kfree(response); 5335 kfree(right_response); 5336 if (desc) { 5337 shash_desc_zero(desc); 5338 kfree(desc); 5339 } 5340 5341 return rv; 5342 } 5343 #endif 5344 5345 int drbd_receiver(struct drbd_thread *thi) 5346 { 5347 struct drbd_connection *connection = thi->connection; 5348 int h; 5349 5350 drbd_info(connection, "receiver (re)started\n"); 5351 5352 do { 5353 h = conn_connect(connection); 5354 if (h == 0) { 5355 conn_disconnect(connection); 5356 schedule_timeout_interruptible(HZ); 5357 } 5358 if (h == -1) { 5359 drbd_warn(connection, "Discarding network configuration.\n"); 5360 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 5361 } 5362 } while (h == 0); 5363 5364 if (h > 0) { 5365 blk_start_plug(&connection->receiver_plug); 5366 drbdd(connection); 5367 blk_finish_plug(&connection->receiver_plug); 5368 } 5369 5370 conn_disconnect(connection); 5371 5372 drbd_info(connection, "receiver terminated\n"); 5373 return 0; 5374 } 5375 5376 /* ********* acknowledge sender ******** */ 5377 5378 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5379 { 5380 struct p_req_state_reply *p = pi->data; 5381 int retcode = be32_to_cpu(p->retcode); 5382 5383 if (retcode >= SS_SUCCESS) { 5384 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags); 5385 } else { 5386 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags); 5387 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n", 5388 drbd_set_st_err_str(retcode), retcode); 5389 } 5390 wake_up(&connection->ping_wait); 5391 5392 return 0; 5393 } 5394 5395 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5396 { 5397 struct drbd_peer_device *peer_device; 5398 struct drbd_device *device; 5399 struct p_req_state_reply *p = pi->data; 5400 int retcode = be32_to_cpu(p->retcode); 5401 5402 peer_device = conn_peer_device(connection, pi->vnr); 5403 if (!peer_device) 5404 return -EIO; 5405 device = peer_device->device; 5406 5407 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) { 5408 D_ASSERT(device, connection->agreed_pro_version < 100); 5409 return got_conn_RqSReply(connection, pi); 5410 } 5411 5412 if (retcode >= SS_SUCCESS) { 5413 set_bit(CL_ST_CHG_SUCCESS, &device->flags); 5414 } else { 5415 set_bit(CL_ST_CHG_FAIL, &device->flags); 5416 drbd_err(device, "Requested state change failed by peer: %s (%d)\n", 5417 drbd_set_st_err_str(retcode), retcode); 5418 } 5419 wake_up(&device->state_wait); 5420 5421 return 0; 5422 } 5423 5424 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi) 5425 { 5426 return drbd_send_ping_ack(connection); 5427 5428 } 5429 5430 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi) 5431 { 5432 /* restore idle timeout */ 5433 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ; 5434 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags)) 5435 wake_up(&connection->ping_wait); 5436 5437 return 0; 5438 } 5439 5440 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi) 5441 { 5442 struct drbd_peer_device *peer_device; 5443 struct drbd_device *device; 5444 struct p_block_ack *p = pi->data; 5445 sector_t sector = be64_to_cpu(p->sector); 5446 int blksize = be32_to_cpu(p->blksize); 5447 5448 peer_device = conn_peer_device(connection, pi->vnr); 5449 if (!peer_device) 5450 return -EIO; 5451 device = peer_device->device; 5452 5453 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 5454 5455 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5456 5457 if (get_ldev(device)) { 5458 drbd_rs_complete_io(device, sector); 5459 drbd_set_in_sync(peer_device, sector, blksize); 5460 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ 5461 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); 5462 put_ldev(device); 5463 } 5464 dec_rs_pending(peer_device); 5465 atomic_add(blksize >> 9, &device->rs_sect_in); 5466 5467 return 0; 5468 } 5469 5470 static int 5471 validate_req_change_req_state(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 5472 struct rb_root *root, const char *func, 5473 enum drbd_req_event what, bool missing_ok) 5474 { 5475 struct drbd_device *device = peer_device->device; 5476 struct drbd_request *req; 5477 struct bio_and_error m; 5478 5479 spin_lock_irq(&device->resource->req_lock); 5480 req = find_request(device, root, id, sector, missing_ok, func); 5481 if (unlikely(!req)) { 5482 spin_unlock_irq(&device->resource->req_lock); 5483 return -EIO; 5484 } 5485 __req_mod(req, what, peer_device, &m); 5486 spin_unlock_irq(&device->resource->req_lock); 5487 5488 if (m.bio) 5489 complete_master_bio(device, &m); 5490 return 0; 5491 } 5492 5493 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi) 5494 { 5495 struct drbd_peer_device *peer_device; 5496 struct drbd_device *device; 5497 struct p_block_ack *p = pi->data; 5498 sector_t sector = be64_to_cpu(p->sector); 5499 int blksize = be32_to_cpu(p->blksize); 5500 enum drbd_req_event what; 5501 5502 peer_device = conn_peer_device(connection, pi->vnr); 5503 if (!peer_device) 5504 return -EIO; 5505 device = peer_device->device; 5506 5507 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5508 5509 if (p->block_id == ID_SYNCER) { 5510 drbd_set_in_sync(peer_device, sector, blksize); 5511 dec_rs_pending(peer_device); 5512 return 0; 5513 } 5514 switch (pi->cmd) { 5515 case P_RS_WRITE_ACK: 5516 what = WRITE_ACKED_BY_PEER_AND_SIS; 5517 break; 5518 case P_WRITE_ACK: 5519 what = WRITE_ACKED_BY_PEER; 5520 break; 5521 case P_RECV_ACK: 5522 what = RECV_ACKED_BY_PEER; 5523 break; 5524 case P_SUPERSEDED: 5525 what = CONFLICT_RESOLVED; 5526 break; 5527 case P_RETRY_WRITE: 5528 what = POSTPONE_WRITE; 5529 break; 5530 default: 5531 BUG(); 5532 } 5533 5534 return validate_req_change_req_state(peer_device, p->block_id, sector, 5535 &device->write_requests, __func__, 5536 what, false); 5537 } 5538 5539 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi) 5540 { 5541 struct drbd_peer_device *peer_device; 5542 struct drbd_device *device; 5543 struct p_block_ack *p = pi->data; 5544 sector_t sector = be64_to_cpu(p->sector); 5545 int size = be32_to_cpu(p->blksize); 5546 int err; 5547 5548 peer_device = conn_peer_device(connection, pi->vnr); 5549 if (!peer_device) 5550 return -EIO; 5551 device = peer_device->device; 5552 5553 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5554 5555 if (p->block_id == ID_SYNCER) { 5556 dec_rs_pending(peer_device); 5557 drbd_rs_failed_io(peer_device, sector, size); 5558 return 0; 5559 } 5560 5561 err = validate_req_change_req_state(peer_device, p->block_id, sector, 5562 &device->write_requests, __func__, 5563 NEG_ACKED, true); 5564 if (err) { 5565 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs. 5566 The master bio might already be completed, therefore the 5567 request is no longer in the collision hash. */ 5568 /* In Protocol B we might already have got a P_RECV_ACK 5569 but then get a P_NEG_ACK afterwards. */ 5570 drbd_set_out_of_sync(peer_device, sector, size); 5571 } 5572 return 0; 5573 } 5574 5575 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi) 5576 { 5577 struct drbd_peer_device *peer_device; 5578 struct drbd_device *device; 5579 struct p_block_ack *p = pi->data; 5580 sector_t sector = be64_to_cpu(p->sector); 5581 5582 peer_device = conn_peer_device(connection, pi->vnr); 5583 if (!peer_device) 5584 return -EIO; 5585 device = peer_device->device; 5586 5587 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5588 5589 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n", 5590 (unsigned long long)sector, be32_to_cpu(p->blksize)); 5591 5592 return validate_req_change_req_state(peer_device, p->block_id, sector, 5593 &device->read_requests, __func__, 5594 NEG_ACKED, false); 5595 } 5596 5597 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi) 5598 { 5599 struct drbd_peer_device *peer_device; 5600 struct drbd_device *device; 5601 sector_t sector; 5602 int size; 5603 struct p_block_ack *p = pi->data; 5604 5605 peer_device = conn_peer_device(connection, pi->vnr); 5606 if (!peer_device) 5607 return -EIO; 5608 device = peer_device->device; 5609 5610 sector = be64_to_cpu(p->sector); 5611 size = be32_to_cpu(p->blksize); 5612 5613 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5614 5615 dec_rs_pending(peer_device); 5616 5617 if (get_ldev_if_state(device, D_FAILED)) { 5618 drbd_rs_complete_io(device, sector); 5619 switch (pi->cmd) { 5620 case P_NEG_RS_DREPLY: 5621 drbd_rs_failed_io(peer_device, sector, size); 5622 break; 5623 case P_RS_CANCEL: 5624 break; 5625 default: 5626 BUG(); 5627 } 5628 put_ldev(device); 5629 } 5630 5631 return 0; 5632 } 5633 5634 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi) 5635 { 5636 struct p_barrier_ack *p = pi->data; 5637 struct drbd_peer_device *peer_device; 5638 int vnr; 5639 5640 tl_release(connection, p->barrier, be32_to_cpu(p->set_size)); 5641 5642 rcu_read_lock(); 5643 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5644 struct drbd_device *device = peer_device->device; 5645 5646 if (device->state.conn == C_AHEAD && 5647 atomic_read(&device->ap_in_flight) == 0 && 5648 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) { 5649 device->start_resync_timer.expires = jiffies + HZ; 5650 add_timer(&device->start_resync_timer); 5651 } 5652 } 5653 rcu_read_unlock(); 5654 5655 return 0; 5656 } 5657 5658 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi) 5659 { 5660 struct drbd_peer_device *peer_device; 5661 struct drbd_device *device; 5662 struct p_block_ack *p = pi->data; 5663 struct drbd_device_work *dw; 5664 sector_t sector; 5665 int size; 5666 5667 peer_device = conn_peer_device(connection, pi->vnr); 5668 if (!peer_device) 5669 return -EIO; 5670 device = peer_device->device; 5671 5672 sector = be64_to_cpu(p->sector); 5673 size = be32_to_cpu(p->blksize); 5674 5675 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5676 5677 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC) 5678 drbd_ov_out_of_sync_found(peer_device, sector, size); 5679 else 5680 ov_out_of_sync_print(peer_device); 5681 5682 if (!get_ldev(device)) 5683 return 0; 5684 5685 drbd_rs_complete_io(device, sector); 5686 dec_rs_pending(peer_device); 5687 5688 --device->ov_left; 5689 5690 /* let's advance progress step marks only for every other megabyte */ 5691 if ((device->ov_left & 0x200) == 0x200) 5692 drbd_advance_rs_marks(peer_device, device->ov_left); 5693 5694 if (device->ov_left == 0) { 5695 dw = kmalloc(sizeof(*dw), GFP_NOIO); 5696 if (dw) { 5697 dw->w.cb = w_ov_finished; 5698 dw->device = device; 5699 drbd_queue_work(&peer_device->connection->sender_work, &dw->w); 5700 } else { 5701 drbd_err(device, "kmalloc(dw) failed."); 5702 ov_out_of_sync_print(peer_device); 5703 drbd_resync_finished(peer_device); 5704 } 5705 } 5706 put_ldev(device); 5707 return 0; 5708 } 5709 5710 static int got_skip(struct drbd_connection *connection, struct packet_info *pi) 5711 { 5712 return 0; 5713 } 5714 5715 struct meta_sock_cmd { 5716 size_t pkt_size; 5717 int (*fn)(struct drbd_connection *connection, struct packet_info *); 5718 }; 5719 5720 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout) 5721 { 5722 long t; 5723 struct net_conf *nc; 5724 5725 rcu_read_lock(); 5726 nc = rcu_dereference(connection->net_conf); 5727 t = ping_timeout ? nc->ping_timeo : nc->ping_int; 5728 rcu_read_unlock(); 5729 5730 t *= HZ; 5731 if (ping_timeout) 5732 t /= 10; 5733 5734 connection->meta.socket->sk->sk_rcvtimeo = t; 5735 } 5736 5737 static void set_ping_timeout(struct drbd_connection *connection) 5738 { 5739 set_rcvtimeo(connection, 1); 5740 } 5741 5742 static void set_idle_timeout(struct drbd_connection *connection) 5743 { 5744 set_rcvtimeo(connection, 0); 5745 } 5746 5747 static struct meta_sock_cmd ack_receiver_tbl[] = { 5748 [P_PING] = { 0, got_Ping }, 5749 [P_PING_ACK] = { 0, got_PingAck }, 5750 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5751 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5752 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5753 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck }, 5754 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck }, 5755 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply }, 5756 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply }, 5757 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult }, 5758 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, 5759 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, 5760 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, 5761 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip }, 5762 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply }, 5763 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply }, 5764 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck }, 5765 }; 5766 5767 int drbd_ack_receiver(struct drbd_thread *thi) 5768 { 5769 struct drbd_connection *connection = thi->connection; 5770 struct meta_sock_cmd *cmd = NULL; 5771 struct packet_info pi; 5772 unsigned long pre_recv_jif; 5773 int rv; 5774 void *buf = connection->meta.rbuf; 5775 int received = 0; 5776 unsigned int header_size = drbd_header_size(connection); 5777 int expect = header_size; 5778 bool ping_timeout_active = false; 5779 5780 sched_set_fifo_low(current); 5781 5782 while (get_t_state(thi) == RUNNING) { 5783 drbd_thread_current_set_cpu(thi); 5784 5785 if (test_and_clear_bit(SEND_PING, &connection->flags)) { 5786 if (drbd_send_ping(connection)) { 5787 drbd_err(connection, "drbd_send_ping has failed\n"); 5788 goto reconnect; 5789 } 5790 set_ping_timeout(connection); 5791 ping_timeout_active = true; 5792 } 5793 5794 pre_recv_jif = jiffies; 5795 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0); 5796 5797 /* Note: 5798 * -EINTR (on meta) we got a signal 5799 * -EAGAIN (on meta) rcvtimeo expired 5800 * -ECONNRESET other side closed the connection 5801 * -ERESTARTSYS (on data) we got a signal 5802 * rv < 0 other than above: unexpected error! 5803 * rv == expected: full header or command 5804 * rv < expected: "woken" by signal during receive 5805 * rv == 0 : "connection shut down by peer" 5806 */ 5807 if (likely(rv > 0)) { 5808 received += rv; 5809 buf += rv; 5810 } else if (rv == 0) { 5811 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 5812 long t; 5813 rcu_read_lock(); 5814 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 5815 rcu_read_unlock(); 5816 5817 t = wait_event_timeout(connection->ping_wait, 5818 connection->cstate < C_WF_REPORT_PARAMS, 5819 t); 5820 if (t) 5821 break; 5822 } 5823 drbd_err(connection, "meta connection shut down by peer.\n"); 5824 goto reconnect; 5825 } else if (rv == -EAGAIN) { 5826 /* If the data socket received something meanwhile, 5827 * that is good enough: peer is still alive. */ 5828 if (time_after(connection->last_received, pre_recv_jif)) 5829 continue; 5830 if (ping_timeout_active) { 5831 drbd_err(connection, "PingAck did not arrive in time.\n"); 5832 goto reconnect; 5833 } 5834 set_bit(SEND_PING, &connection->flags); 5835 continue; 5836 } else if (rv == -EINTR) { 5837 /* maybe drbd_thread_stop(): the while condition will notice. 5838 * maybe woken for send_ping: we'll send a ping above, 5839 * and change the rcvtimeo */ 5840 flush_signals(current); 5841 continue; 5842 } else { 5843 drbd_err(connection, "sock_recvmsg returned %d\n", rv); 5844 goto reconnect; 5845 } 5846 5847 if (received == expect && cmd == NULL) { 5848 if (decode_header(connection, connection->meta.rbuf, &pi)) 5849 goto reconnect; 5850 cmd = &ack_receiver_tbl[pi.cmd]; 5851 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) { 5852 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n", 5853 cmdname(pi.cmd), pi.cmd); 5854 goto disconnect; 5855 } 5856 expect = header_size + cmd->pkt_size; 5857 if (pi.size != expect - header_size) { 5858 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n", 5859 pi.cmd, pi.size); 5860 goto reconnect; 5861 } 5862 } 5863 if (received == expect) { 5864 bool err; 5865 5866 err = cmd->fn(connection, &pi); 5867 if (err) { 5868 drbd_err(connection, "%ps failed\n", cmd->fn); 5869 goto reconnect; 5870 } 5871 5872 connection->last_received = jiffies; 5873 5874 if (cmd == &ack_receiver_tbl[P_PING_ACK]) { 5875 set_idle_timeout(connection); 5876 ping_timeout_active = false; 5877 } 5878 5879 buf = connection->meta.rbuf; 5880 received = 0; 5881 expect = header_size; 5882 cmd = NULL; 5883 } 5884 } 5885 5886 if (0) { 5887 reconnect: 5888 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 5889 conn_md_sync(connection); 5890 } 5891 if (0) { 5892 disconnect: 5893 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 5894 } 5895 5896 drbd_info(connection, "ack_receiver terminated\n"); 5897 5898 return 0; 5899 } 5900 5901 void drbd_send_acks_wf(struct work_struct *ws) 5902 { 5903 struct drbd_peer_device *peer_device = 5904 container_of(ws, struct drbd_peer_device, send_acks_work); 5905 struct drbd_connection *connection = peer_device->connection; 5906 struct drbd_device *device = peer_device->device; 5907 struct net_conf *nc; 5908 int tcp_cork, err; 5909 5910 rcu_read_lock(); 5911 nc = rcu_dereference(connection->net_conf); 5912 tcp_cork = nc->tcp_cork; 5913 rcu_read_unlock(); 5914 5915 if (tcp_cork) 5916 tcp_sock_set_cork(connection->meta.socket->sk, true); 5917 5918 err = drbd_finish_peer_reqs(device); 5919 kref_put(&device->kref, drbd_destroy_device); 5920 /* get is in drbd_endio_write_sec_final(). That is necessary to keep the 5921 struct work_struct send_acks_work alive, which is in the peer_device object */ 5922 5923 if (err) { 5924 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 5925 return; 5926 } 5927 5928 if (tcp_cork) 5929 tcp_sock_set_cork(connection->meta.socket->sk, false); 5930 5931 return; 5932 } 5933