1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 drbd_receiver.c 4 5 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 6 7 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 8 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 9 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 10 11 */ 12 13 14 #include <linux/module.h> 15 16 #include <linux/uaccess.h> 17 #include <net/sock.h> 18 19 #include <linux/drbd.h> 20 #include <linux/fs.h> 21 #include <linux/file.h> 22 #include <linux/in.h> 23 #include <linux/mm.h> 24 #include <linux/memcontrol.h> 25 #include <linux/mm_inline.h> 26 #include <linux/slab.h> 27 #include <uapi/linux/sched/types.h> 28 #include <linux/sched/signal.h> 29 #include <linux/pkt_sched.h> 30 #include <linux/unistd.h> 31 #include <linux/vmalloc.h> 32 #include <linux/random.h> 33 #include <linux/string.h> 34 #include <linux/scatterlist.h> 35 #include <linux/part_stat.h> 36 #include <linux/mempool.h> 37 #include "drbd_int.h" 38 #include "drbd_protocol.h" 39 #include "drbd_req.h" 40 #include "drbd_vli.h" 41 42 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES) 43 44 struct packet_info { 45 enum drbd_packet cmd; 46 unsigned int size; 47 unsigned int vnr; 48 void *data; 49 }; 50 51 enum finish_epoch { 52 FE_STILL_LIVE, 53 FE_DESTROYED, 54 FE_RECYCLED, 55 }; 56 57 static int drbd_do_features(struct drbd_connection *connection); 58 static int drbd_do_auth(struct drbd_connection *connection); 59 static int drbd_disconnected(struct drbd_peer_device *); 60 static void conn_wait_active_ee_empty(struct drbd_connection *connection); 61 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event); 62 static int e_end_block(struct drbd_work *, int); 63 64 65 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 66 67 static struct page *__drbd_alloc_pages(unsigned int number) 68 { 69 struct page *page = NULL; 70 struct page *tmp = NULL; 71 unsigned int i = 0; 72 73 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD 74 * "criss-cross" setup, that might cause write-out on some other DRBD, 75 * which in turn might block on the other node at this very place. */ 76 for (i = 0; i < number; i++) { 77 tmp = mempool_alloc(&drbd_buffer_page_pool, GFP_TRY); 78 if (!tmp) 79 goto fail; 80 set_page_private(tmp, (unsigned long)page); 81 page = tmp; 82 } 83 return page; 84 fail: 85 page_chain_for_each_safe(page, tmp) { 86 set_page_private(page, 0); 87 mempool_free(page, &drbd_buffer_page_pool); 88 } 89 return NULL; 90 } 91 92 /** 93 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled) 94 * @peer_device: DRBD device. 95 * @number: number of pages requested 96 * @retry: whether to retry, if not enough pages are available right now 97 * 98 * Tries to allocate number pages, first from our own page pool, then from 99 * the kernel. 100 * Possibly retry until DRBD frees sufficient pages somewhere else. 101 * 102 * If this allocation would exceed the max_buffers setting, we throttle 103 * allocation (schedule_timeout) to give the system some room to breathe. 104 * 105 * We do not use max-buffers as hard limit, because it could lead to 106 * congestion and further to a distributed deadlock during online-verify or 107 * (checksum based) resync, if the max-buffers, socket buffer sizes and 108 * resync-rate settings are mis-configured. 109 * 110 * Returns a page chain linked via page->private. 
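 *
 * As a rough illustration only (not how callers are expected to write it),
 * the returned chain is singly linked through page->private and could be
 * walked by hand as sketched below; real users go through the
 * page_chain_for_each*() helpers and hand the chain back via
 * drbd_free_pages().  "chain" here stands for the pointer returned above:
 *
 *	struct page *p, *next;
 *	for (p = chain; p; p = next) {
 *		next = (struct page *)page_private(p);
 *		... use one PAGE_SIZE worth of buffer ...
 *	}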
111 */ 112 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number, 113 bool retry) 114 { 115 struct drbd_device *device = peer_device->device; 116 struct page *page; 117 struct net_conf *nc; 118 unsigned int mxb; 119 120 rcu_read_lock(); 121 nc = rcu_dereference(peer_device->connection->net_conf); 122 mxb = nc ? nc->max_buffers : 1000000; 123 rcu_read_unlock(); 124 125 if (atomic_read(&device->pp_in_use) >= mxb) 126 schedule_timeout_interruptible(HZ / 10); 127 page = __drbd_alloc_pages(number); 128 129 if (page) 130 atomic_add(number, &device->pp_in_use); 131 return page; 132 } 133 134 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages. 135 * Is also used from inside an other spin_lock_irq(&resource->req_lock); 136 * Either links the page chain back to the global pool, 137 * or returns all pages to the system. */ 138 static void drbd_free_pages(struct drbd_device *device, struct page *page) 139 { 140 struct page *tmp; 141 int i = 0; 142 143 if (page == NULL) 144 return; 145 146 page_chain_for_each_safe(page, tmp) { 147 set_page_private(page, 0); 148 if (page_count(page) == 1) 149 mempool_free(page, &drbd_buffer_page_pool); 150 else 151 put_page(page); 152 i++; 153 } 154 i = atomic_sub_return(i, &device->pp_in_use); 155 if (i < 0) 156 drbd_warn(device, "ASSERTION FAILED: pp_in_use: %d < 0\n", i); 157 } 158 159 /* 160 You need to hold the req_lock: 161 _drbd_wait_ee_list_empty() 162 163 You must not have the req_lock: 164 drbd_free_peer_req() 165 drbd_alloc_peer_req() 166 drbd_free_peer_reqs() 167 drbd_ee_fix_bhs() 168 drbd_finish_peer_reqs() 169 drbd_clear_done_ee() 170 drbd_wait_ee_list_empty() 171 */ 172 173 /* normal: payload_size == request size (bi_size) 174 * w_same: payload_size == logical_block_size 175 * trim: payload_size == 0 */ 176 struct drbd_peer_request * 177 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 178 unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local) 179 { 180 struct drbd_device *device = peer_device->device; 181 struct drbd_peer_request *peer_req; 182 struct page *page = NULL; 183 unsigned int nr_pages = PFN_UP(payload_size); 184 185 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE)) 186 return NULL; 187 188 peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM); 189 if (!peer_req) { 190 if (!(gfp_mask & __GFP_NOWARN)) 191 drbd_err(device, "%s: allocation failed\n", __func__); 192 return NULL; 193 } 194 195 if (nr_pages) { 196 page = drbd_alloc_pages(peer_device, nr_pages, 197 gfpflags_allow_blocking(gfp_mask)); 198 if (!page) 199 goto fail; 200 if (!mempool_is_saturated(&drbd_buffer_page_pool)) 201 peer_req->flags |= EE_RELEASE_TO_MEMPOOL; 202 } 203 204 memset(peer_req, 0, sizeof(*peer_req)); 205 INIT_LIST_HEAD(&peer_req->w.list); 206 drbd_clear_interval(&peer_req->i); 207 peer_req->i.size = request_size; 208 peer_req->i.sector = sector; 209 peer_req->submit_jif = jiffies; 210 peer_req->peer_device = peer_device; 211 peer_req->pages = page; 212 /* 213 * The block_id is opaque to the receiver. It is not endianness 214 * converted, and sent back to the sender unchanged. 
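 * (On the submitting side this usually is the pointer value of the
 * originating request, which is why find_request() further down can cast
 * it straight back to a struct drbd_request; resync requests use
 * ID_SYNCER instead.)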
215 */ 216 peer_req->block_id = id; 217 218 return peer_req; 219 220 fail: 221 mempool_free(peer_req, &drbd_ee_mempool); 222 return NULL; 223 } 224 225 void drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req) 226 { 227 might_sleep(); 228 if (peer_req->flags & EE_HAS_DIGEST) 229 kfree(peer_req->digest); 230 drbd_free_pages(device, peer_req->pages); 231 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0); 232 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 233 if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) { 234 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 235 drbd_al_complete_io(device, &peer_req->i); 236 } 237 mempool_free(peer_req, &drbd_ee_mempool); 238 } 239 240 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list) 241 { 242 LIST_HEAD(work_list); 243 struct drbd_peer_request *peer_req, *t; 244 int count = 0; 245 246 spin_lock_irq(&device->resource->req_lock); 247 list_splice_init(list, &work_list); 248 spin_unlock_irq(&device->resource->req_lock); 249 250 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 251 drbd_free_peer_req(device, peer_req); 252 count++; 253 } 254 return count; 255 } 256 257 /* 258 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier. 259 */ 260 static int drbd_finish_peer_reqs(struct drbd_device *device) 261 { 262 LIST_HEAD(work_list); 263 struct drbd_peer_request *peer_req, *t; 264 int err = 0; 265 266 spin_lock_irq(&device->resource->req_lock); 267 list_splice_init(&device->done_ee, &work_list); 268 spin_unlock_irq(&device->resource->req_lock); 269 270 /* possible callbacks here: 271 * e_end_block, and e_end_resync_block, e_send_superseded. 272 * all ignore the last argument. 273 */ 274 list_for_each_entry_safe(peer_req, t, &work_list, w.list) { 275 int err2; 276 277 /* list_del not necessary, next/prev members not touched */ 278 err2 = peer_req->w.cb(&peer_req->w, !!err); 279 if (!err) 280 err = err2; 281 drbd_free_peer_req(device, peer_req); 282 } 283 wake_up(&device->ee_wait); 284 285 return err; 286 } 287 288 static void _drbd_wait_ee_list_empty(struct drbd_device *device, 289 struct list_head *head) 290 { 291 DEFINE_WAIT(wait); 292 293 /* avoids spin_lock/unlock 294 * and calling prepare_to_wait in the fast path */ 295 while (!list_empty(head)) { 296 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE); 297 spin_unlock_irq(&device->resource->req_lock); 298 io_schedule(); 299 finish_wait(&device->ee_wait, &wait); 300 spin_lock_irq(&device->resource->req_lock); 301 } 302 } 303 304 static void drbd_wait_ee_list_empty(struct drbd_device *device, 305 struct list_head *head) 306 { 307 spin_lock_irq(&device->resource->req_lock); 308 _drbd_wait_ee_list_empty(device, head); 309 spin_unlock_irq(&device->resource->req_lock); 310 } 311 312 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags) 313 { 314 struct kvec iov = { 315 .iov_base = buf, 316 .iov_len = size, 317 }; 318 struct msghdr msg = { 319 .msg_flags = (flags ? 
flags : MSG_WAITALL | MSG_NOSIGNAL) 320 }; 321 iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, size); 322 return sock_recvmsg(sock, &msg, msg.msg_flags); 323 } 324 325 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size) 326 { 327 int rv; 328 329 rv = drbd_recv_short(connection->data.socket, buf, size, 0); 330 331 if (rv < 0) { 332 if (rv == -ECONNRESET) 333 drbd_info(connection, "sock was reset by peer\n"); 334 else if (rv != -ERESTARTSYS) 335 drbd_err(connection, "sock_recvmsg returned %d\n", rv); 336 } else if (rv == 0) { 337 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 338 long t; 339 rcu_read_lock(); 340 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 341 rcu_read_unlock(); 342 343 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t); 344 345 if (t) 346 goto out; 347 } 348 drbd_info(connection, "sock was shut down by peer\n"); 349 } 350 351 if (rv != size) 352 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD); 353 354 out: 355 return rv; 356 } 357 358 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size) 359 { 360 int err; 361 362 err = drbd_recv(connection, buf, size); 363 if (err != size) { 364 if (err >= 0) 365 err = -EIO; 366 } else 367 err = 0; 368 return err; 369 } 370 371 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size) 372 { 373 int err; 374 375 err = drbd_recv_all(connection, buf, size); 376 if (err && !signal_pending(current)) 377 drbd_warn(connection, "short read (expected size %d)\n", (int)size); 378 return err; 379 } 380 381 /* quoting tcp(7): 382 * On individual connections, the socket buffer size must be set prior to the 383 * listen(2) or connect(2) calls in order to have it take effect. 384 * This is our wrapper to do so. 
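 *
 * Roughly the in-kernel counterpart of what an application would do with
 * setsockopt() before connect()/listen(); illustrative only, fd and size
 * are made up:
 *
 *	int sz = 131072;
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
 *	setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sz, sizeof(sz));
 *
 * Setting SOCK_SNDBUF_LOCK / SOCK_RCVBUF_LOCK below additionally keeps the
 * TCP buffer auto-tuning from overriding the configured sizes.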
385 */ 386 static void drbd_setbufsize(struct socket *sock, unsigned int snd, 387 unsigned int rcv) 388 { 389 /* open coded SO_SNDBUF, SO_RCVBUF */ 390 if (snd) { 391 sock->sk->sk_sndbuf = snd; 392 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 393 } 394 if (rcv) { 395 sock->sk->sk_rcvbuf = rcv; 396 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 397 } 398 } 399 400 static struct socket *drbd_try_connect(struct drbd_connection *connection) 401 { 402 const char *what; 403 struct socket *sock; 404 struct sockaddr_in6 src_in6; 405 struct sockaddr_in6 peer_in6; 406 struct net_conf *nc; 407 int err, peer_addr_len, my_addr_len; 408 int sndbuf_size, rcvbuf_size, connect_int; 409 int disconnect_on_error = 1; 410 411 rcu_read_lock(); 412 nc = rcu_dereference(connection->net_conf); 413 if (!nc) { 414 rcu_read_unlock(); 415 return NULL; 416 } 417 sndbuf_size = nc->sndbuf_size; 418 rcvbuf_size = nc->rcvbuf_size; 419 connect_int = nc->connect_int; 420 rcu_read_unlock(); 421 422 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6)); 423 memcpy(&src_in6, &connection->my_addr, my_addr_len); 424 425 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6) 426 src_in6.sin6_port = 0; 427 else 428 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */ 429 430 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6)); 431 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len); 432 433 what = "sock_create_kern"; 434 err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family, 435 SOCK_STREAM, IPPROTO_TCP, &sock); 436 if (err < 0) { 437 sock = NULL; 438 goto out; 439 } 440 441 sock->sk->sk_rcvtimeo = 442 sock->sk->sk_sndtimeo = connect_int * HZ; 443 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size); 444 445 /* explicitly bind to the configured IP as source IP 446 * for the outgoing connections. 447 * This is needed for multihomed hosts and to be 448 * able to use lo: interfaces for drbd. 449 * Make sure to use 0 as port number, so linux selects 450 * a free one dynamically. 451 */ 452 what = "bind before connect"; 453 err = sock->ops->bind(sock, (struct sockaddr_unsized *) &src_in6, my_addr_len); 454 if (err < 0) 455 goto out; 456 457 /* connect may fail, peer not yet available. 458 * stay C_WF_CONNECTION, don't go Disconnecting! 
*/ 459 disconnect_on_error = 0; 460 what = "connect"; 461 err = sock->ops->connect(sock, (struct sockaddr_unsized *) &peer_in6, peer_addr_len, 0); 462 463 out: 464 if (err < 0) { 465 if (sock) { 466 sock_release(sock); 467 sock = NULL; 468 } 469 switch (-err) { 470 /* timeout, busy, signal pending */ 471 case ETIMEDOUT: case EAGAIN: case EINPROGRESS: 472 case EINTR: case ERESTARTSYS: 473 /* peer not (yet) available, network problem */ 474 case ECONNREFUSED: case ENETUNREACH: 475 case EHOSTDOWN: case EHOSTUNREACH: 476 disconnect_on_error = 0; 477 break; 478 default: 479 drbd_err(connection, "%s failed, err = %d\n", what, err); 480 } 481 if (disconnect_on_error) 482 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 483 } 484 485 return sock; 486 } 487 488 struct accept_wait_data { 489 struct drbd_connection *connection; 490 struct socket *s_listen; 491 struct completion door_bell; 492 void (*original_sk_state_change)(struct sock *sk); 493 494 }; 495 496 static void drbd_incoming_connection(struct sock *sk) 497 { 498 struct accept_wait_data *ad = sk->sk_user_data; 499 void (*state_change)(struct sock *sk); 500 501 state_change = ad->original_sk_state_change; 502 if (sk->sk_state == TCP_ESTABLISHED) 503 complete(&ad->door_bell); 504 state_change(sk); 505 } 506 507 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad) 508 { 509 int err, sndbuf_size, rcvbuf_size, my_addr_len; 510 struct sockaddr_in6 my_addr; 511 struct socket *s_listen; 512 struct net_conf *nc; 513 const char *what; 514 515 rcu_read_lock(); 516 nc = rcu_dereference(connection->net_conf); 517 if (!nc) { 518 rcu_read_unlock(); 519 return -EIO; 520 } 521 sndbuf_size = nc->sndbuf_size; 522 rcvbuf_size = nc->rcvbuf_size; 523 rcu_read_unlock(); 524 525 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6)); 526 memcpy(&my_addr, &connection->my_addr, my_addr_len); 527 528 what = "sock_create_kern"; 529 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family, 530 SOCK_STREAM, IPPROTO_TCP, &s_listen); 531 if (err) { 532 s_listen = NULL; 533 goto out; 534 } 535 536 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 537 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size); 538 539 what = "bind before listen"; 540 err = s_listen->ops->bind(s_listen, (struct sockaddr_unsized *)&my_addr, my_addr_len); 541 if (err < 0) 542 goto out; 543 544 ad->s_listen = s_listen; 545 write_lock_bh(&s_listen->sk->sk_callback_lock); 546 ad->original_sk_state_change = s_listen->sk->sk_state_change; 547 s_listen->sk->sk_state_change = drbd_incoming_connection; 548 s_listen->sk->sk_user_data = ad; 549 write_unlock_bh(&s_listen->sk->sk_callback_lock); 550 551 what = "listen"; 552 err = s_listen->ops->listen(s_listen, 5); 553 if (err < 0) 554 goto out; 555 556 return 0; 557 out: 558 if (s_listen) 559 sock_release(s_listen); 560 if (err < 0) { 561 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 562 drbd_err(connection, "%s failed, err = %d\n", what, err); 563 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 564 } 565 } 566 567 return -EIO; 568 } 569 570 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad) 571 { 572 write_lock_bh(&sk->sk_callback_lock); 573 sk->sk_state_change = ad->original_sk_state_change; 574 sk->sk_user_data = NULL; 575 write_unlock_bh(&sk->sk_callback_lock); 576 } 577 578 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data 
*ad) 579 { 580 int timeo, connect_int, err = 0; 581 struct socket *s_estab = NULL; 582 struct net_conf *nc; 583 584 rcu_read_lock(); 585 nc = rcu_dereference(connection->net_conf); 586 if (!nc) { 587 rcu_read_unlock(); 588 return NULL; 589 } 590 connect_int = nc->connect_int; 591 rcu_read_unlock(); 592 593 timeo = connect_int * HZ; 594 /* 28.5% random jitter */ 595 timeo += get_random_u32_below(2) ? timeo / 7 : -timeo / 7; 596 597 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo); 598 if (err <= 0) 599 return NULL; 600 601 err = kernel_accept(ad->s_listen, &s_estab, 0); 602 if (err < 0) { 603 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) { 604 drbd_err(connection, "accept failed, err = %d\n", err); 605 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 606 } 607 } 608 609 if (s_estab) 610 unregister_state_change(s_estab->sk, ad); 611 612 return s_estab; 613 } 614 615 static int decode_header(struct drbd_connection *, void *, struct packet_info *); 616 617 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock, 618 enum drbd_packet cmd) 619 { 620 if (!conn_prepare_command(connection, sock)) 621 return -EIO; 622 return conn_send_command(connection, sock, cmd, 0, NULL, 0); 623 } 624 625 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock) 626 { 627 unsigned int header_size = drbd_header_size(connection); 628 struct packet_info pi; 629 struct net_conf *nc; 630 int err; 631 632 rcu_read_lock(); 633 nc = rcu_dereference(connection->net_conf); 634 if (!nc) { 635 rcu_read_unlock(); 636 return -EIO; 637 } 638 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10; 639 rcu_read_unlock(); 640 641 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0); 642 if (err != header_size) { 643 if (err >= 0) 644 err = -EIO; 645 return err; 646 } 647 err = decode_header(connection, connection->data.rbuf, &pi); 648 if (err) 649 return err; 650 return pi.cmd; 651 } 652 653 /** 654 * drbd_socket_okay() - Free the socket if its connection is not okay 655 * @sock: pointer to the pointer to the socket. 656 */ 657 static bool drbd_socket_okay(struct socket **sock) 658 { 659 int rr; 660 char tb[4]; 661 662 if (!*sock) 663 return false; 664 665 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK); 666 667 if (rr > 0 || rr == -EAGAIN) { 668 return true; 669 } else { 670 sock_release(*sock); 671 *sock = NULL; 672 return false; 673 } 674 } 675 676 static bool connection_established(struct drbd_connection *connection, 677 struct socket **sock1, 678 struct socket **sock2) 679 { 680 struct net_conf *nc; 681 int timeout; 682 bool ok; 683 684 if (!*sock1 || !*sock2) 685 return false; 686 687 rcu_read_lock(); 688 nc = rcu_dereference(connection->net_conf); 689 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10; 690 rcu_read_unlock(); 691 schedule_timeout_interruptible(timeout); 692 693 ok = drbd_socket_okay(sock1); 694 ok = drbd_socket_okay(sock2) && ok; 695 696 return ok; 697 } 698 699 /* Gets called if a connection is established, or if a new minor gets created 700 in a connection */ 701 int drbd_connected(struct drbd_peer_device *peer_device) 702 { 703 struct drbd_device *device = peer_device->device; 704 int err; 705 706 atomic_set(&device->packet_seq, 0); 707 device->peer_seq = 0; 708 709 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ? 
710 &peer_device->connection->cstate_mutex : 711 &device->own_state_mutex; 712 713 err = drbd_send_sync_param(peer_device); 714 if (!err) 715 err = drbd_send_sizes(peer_device, 0, 0); 716 if (!err) 717 err = drbd_send_uuids(peer_device); 718 if (!err) 719 err = drbd_send_current_state(peer_device); 720 clear_bit(USE_DEGR_WFC_T, &device->flags); 721 clear_bit(RESIZE_PENDING, &device->flags); 722 atomic_set(&device->ap_in_flight, 0); 723 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */ 724 return err; 725 } 726 727 /* 728 * return values: 729 * 1 yes, we have a valid connection 730 * 0 oops, did not work out, please try again 731 * -1 peer talks different language, 732 * no point in trying again, please go standalone. 733 * -2 We do not have a network config... 734 */ 735 static int conn_connect(struct drbd_connection *connection) 736 { 737 struct drbd_socket sock, msock; 738 struct drbd_peer_device *peer_device; 739 struct net_conf *nc; 740 int vnr, timeout, h; 741 bool discard_my_data, ok; 742 enum drbd_state_rv rv; 743 struct accept_wait_data ad = { 744 .connection = connection, 745 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell), 746 }; 747 748 clear_bit(DISCONNECT_SENT, &connection->flags); 749 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS) 750 return -2; 751 752 mutex_init(&sock.mutex); 753 sock.sbuf = connection->data.sbuf; 754 sock.rbuf = connection->data.rbuf; 755 sock.socket = NULL; 756 mutex_init(&msock.mutex); 757 msock.sbuf = connection->meta.sbuf; 758 msock.rbuf = connection->meta.rbuf; 759 msock.socket = NULL; 760 761 /* Assume that the peer only understands protocol 80 until we know better. */ 762 connection->agreed_pro_version = 80; 763 764 if (prepare_listen_socket(connection, &ad)) 765 return 0; 766 767 do { 768 struct socket *s; 769 770 s = drbd_try_connect(connection); 771 if (s) { 772 if (!sock.socket) { 773 sock.socket = s; 774 send_first_packet(connection, &sock, P_INITIAL_DATA); 775 } else if (!msock.socket) { 776 clear_bit(RESOLVE_CONFLICTS, &connection->flags); 777 msock.socket = s; 778 send_first_packet(connection, &msock, P_INITIAL_META); 779 } else { 780 drbd_err(connection, "Logic error in conn_connect()\n"); 781 goto out_release_sockets; 782 } 783 } 784 785 if (connection_established(connection, &sock.socket, &msock.socket)) 786 break; 787 788 retry: 789 s = drbd_wait_for_connect(connection, &ad); 790 if (s) { 791 int fp = receive_first_packet(connection, s); 792 drbd_socket_okay(&sock.socket); 793 drbd_socket_okay(&msock.socket); 794 switch (fp) { 795 case P_INITIAL_DATA: 796 if (sock.socket) { 797 drbd_warn(connection, "initial packet S crossed\n"); 798 sock_release(sock.socket); 799 sock.socket = s; 800 goto randomize; 801 } 802 sock.socket = s; 803 break; 804 case P_INITIAL_META: 805 set_bit(RESOLVE_CONFLICTS, &connection->flags); 806 if (msock.socket) { 807 drbd_warn(connection, "initial packet M crossed\n"); 808 sock_release(msock.socket); 809 msock.socket = s; 810 goto randomize; 811 } 812 msock.socket = s; 813 break; 814 default: 815 drbd_warn(connection, "Error receiving initial packet\n"); 816 sock_release(s); 817 randomize: 818 if (get_random_u32_below(2)) 819 goto retry; 820 } 821 } 822 823 if (connection->cstate <= C_DISCONNECTING) 824 goto out_release_sockets; 825 if (signal_pending(current)) { 826 flush_signals(current); 827 smp_rmb(); 828 if (get_t_state(&connection->receiver) == EXITING) 829 goto out_release_sockets; 830 } 831 832 ok = 
connection_established(connection, &sock.socket, &msock.socket); 833 } while (!ok); 834 835 if (ad.s_listen) 836 sock_release(ad.s_listen); 837 838 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 839 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */ 840 841 sock.socket->sk->sk_allocation = GFP_NOIO; 842 msock.socket->sk->sk_allocation = GFP_NOIO; 843 844 sock.socket->sk->sk_use_task_frag = false; 845 msock.socket->sk->sk_use_task_frag = false; 846 847 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK; 848 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE; 849 850 /* NOT YET ... 851 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10; 852 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 853 * first set it to the P_CONNECTION_FEATURES timeout, 854 * which we set to 4x the configured ping_timeout. */ 855 rcu_read_lock(); 856 nc = rcu_dereference(connection->net_conf); 857 858 sock.socket->sk->sk_sndtimeo = 859 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10; 860 861 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ; 862 timeout = nc->timeout * HZ / 10; 863 discard_my_data = nc->discard_my_data; 864 rcu_read_unlock(); 865 866 msock.socket->sk->sk_sndtimeo = timeout; 867 868 /* we don't want delays. 869 * we use TCP_CORK where appropriate, though */ 870 tcp_sock_set_nodelay(sock.socket->sk); 871 tcp_sock_set_nodelay(msock.socket->sk); 872 873 connection->data.socket = sock.socket; 874 connection->meta.socket = msock.socket; 875 connection->last_received = jiffies; 876 877 h = drbd_do_features(connection); 878 if (h <= 0) 879 return h; 880 881 if (connection->cram_hmac_tfm) { 882 /* drbd_request_state(device, NS(conn, WFAuth)); */ 883 switch (drbd_do_auth(connection)) { 884 case -1: 885 drbd_err(connection, "Authentication of peer failed\n"); 886 return -1; 887 case 0: 888 drbd_err(connection, "Authentication of peer failed, trying again.\n"); 889 return 0; 890 } 891 } 892 893 connection->data.socket->sk->sk_sndtimeo = timeout; 894 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT; 895 896 if (drbd_send_protocol(connection) == -EOPNOTSUPP) 897 return -1; 898 899 /* Prevent a race between resync-handshake and 900 * being promoted to Primary. 901 * 902 * Grab and release the state mutex, so we know that any current 903 * drbd_set_role() is finished, and any incoming drbd_set_role 904 * will see the STATE_SENT flag, and wait for it to be cleared. 
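 *
 * In short, the sequence implemented right below is:
 *
 *	for each volume:	mutex_lock(device->state_mutex)
 *	under req_lock:		set_bit(STATE_SENT, &connection->flags)
 *	for each volume:	mutex_unlock(device->state_mutex)
 *
 * Whoever held the mutex before us has finished its state change by now;
 * whoever takes it after us already observes STATE_SENT.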
905 */ 906 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) 907 mutex_lock(peer_device->device->state_mutex); 908 909 /* avoid a race with conn_request_state( C_DISCONNECTING ) */ 910 spin_lock_irq(&connection->resource->req_lock); 911 set_bit(STATE_SENT, &connection->flags); 912 spin_unlock_irq(&connection->resource->req_lock); 913 914 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) 915 mutex_unlock(peer_device->device->state_mutex); 916 917 rcu_read_lock(); 918 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 919 struct drbd_device *device = peer_device->device; 920 kref_get(&device->kref); 921 rcu_read_unlock(); 922 923 if (discard_my_data) 924 set_bit(DISCARD_MY_DATA, &device->flags); 925 else 926 clear_bit(DISCARD_MY_DATA, &device->flags); 927 928 drbd_connected(peer_device); 929 kref_put(&device->kref, drbd_destroy_device); 930 rcu_read_lock(); 931 } 932 rcu_read_unlock(); 933 934 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE); 935 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) { 936 clear_bit(STATE_SENT, &connection->flags); 937 return 0; 938 } 939 940 drbd_thread_start(&connection->ack_receiver); 941 /* opencoded create_singlethread_workqueue(), 942 * to be able to use format string arguments */ 943 connection->ack_sender = 944 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name); 945 if (!connection->ack_sender) { 946 drbd_err(connection, "Failed to create workqueue ack_sender\n"); 947 return 0; 948 } 949 950 mutex_lock(&connection->resource->conf_update); 951 /* The discard_my_data flag is a single-shot modifier to the next 952 * connection attempt, the handshake of which is now well underway. 953 * No need for rcu style copying of the whole struct 954 * just to clear a single value. 
*/ 955 connection->net_conf->discard_my_data = 0; 956 mutex_unlock(&connection->resource->conf_update); 957 958 return h; 959 960 out_release_sockets: 961 if (ad.s_listen) 962 sock_release(ad.s_listen); 963 if (sock.socket) 964 sock_release(sock.socket); 965 if (msock.socket) 966 sock_release(msock.socket); 967 return -1; 968 } 969 970 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi) 971 { 972 unsigned int header_size = drbd_header_size(connection); 973 974 if (header_size == sizeof(struct p_header100) && 975 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) { 976 struct p_header100 *h = header; 977 if (h->pad != 0) { 978 drbd_err(connection, "Header padding is not zero\n"); 979 return -EINVAL; 980 } 981 pi->vnr = be16_to_cpu(h->volume); 982 pi->cmd = be16_to_cpu(h->command); 983 pi->size = be32_to_cpu(h->length); 984 } else if (header_size == sizeof(struct p_header95) && 985 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) { 986 struct p_header95 *h = header; 987 pi->cmd = be16_to_cpu(h->command); 988 pi->size = be32_to_cpu(h->length); 989 pi->vnr = 0; 990 } else if (header_size == sizeof(struct p_header80) && 991 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) { 992 struct p_header80 *h = header; 993 pi->cmd = be16_to_cpu(h->command); 994 pi->size = be16_to_cpu(h->length); 995 pi->vnr = 0; 996 } else { 997 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n", 998 be32_to_cpu(*(__be32 *)header), 999 connection->agreed_pro_version); 1000 return -EINVAL; 1001 } 1002 pi->data = header + header_size; 1003 return 0; 1004 } 1005 1006 static void drbd_unplug_all_devices(struct drbd_connection *connection) 1007 { 1008 if (current->plug == &connection->receiver_plug) { 1009 blk_finish_plug(&connection->receiver_plug); 1010 blk_start_plug(&connection->receiver_plug); 1011 } /* else: maybe just schedule() ?? */ 1012 } 1013 1014 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi) 1015 { 1016 void *buffer = connection->data.rbuf; 1017 int err; 1018 1019 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection)); 1020 if (err) 1021 return err; 1022 1023 err = decode_header(connection, buffer, pi); 1024 connection->last_received = jiffies; 1025 1026 return err; 1027 } 1028 1029 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi) 1030 { 1031 void *buffer = connection->data.rbuf; 1032 unsigned int size = drbd_header_size(connection); 1033 int err; 1034 1035 err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT); 1036 if (err != size) { 1037 /* If we have nothing in the receive buffer now, to reduce 1038 * application latency, try to drain the backend queues as 1039 * quickly as possible, and let remote TCP know what we have 1040 * received so far. */ 1041 if (err == -EAGAIN) { 1042 tcp_sock_set_quickack(connection->data.socket->sk, 2); 1043 drbd_unplug_all_devices(connection); 1044 } 1045 if (err > 0) { 1046 buffer += err; 1047 size -= err; 1048 } 1049 err = drbd_recv_all_warn(connection, buffer, size); 1050 if (err) 1051 return err; 1052 } 1053 1054 err = decode_header(connection, connection->data.rbuf, pi); 1055 connection->last_received = jiffies; 1056 1057 return err; 1058 } 1059 /* This is blkdev_issue_flush, but asynchronous. 1060 * We want to submit to all component volumes in parallel, 1061 * then wait for all completions. 
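 *
 * The counted-completion scheme used by drbd_flush() below: ctx.pending
 * starts at 1, every submitted flush bio adds one, every one_flush_endio()
 * drops one, and the submitter drops the initial reference after submitting
 * everything, so it only sleeps if at least one flush is still in flight:
 *
 *	atomic_set(&ctx.pending, 1);
 *	init_completion(&ctx.done);
 *	... one submit_one_flush() per attached volume ...
 *	if (!atomic_dec_and_test(&ctx.pending))
 *		wait_for_completion(&ctx.done);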
1062 */ 1063 struct issue_flush_context { 1064 atomic_t pending; 1065 int error; 1066 struct completion done; 1067 }; 1068 struct one_flush_context { 1069 struct drbd_device *device; 1070 struct issue_flush_context *ctx; 1071 }; 1072 1073 static void one_flush_endio(struct bio *bio) 1074 { 1075 struct one_flush_context *octx = bio->bi_private; 1076 struct drbd_device *device = octx->device; 1077 struct issue_flush_context *ctx = octx->ctx; 1078 1079 if (bio->bi_status) { 1080 ctx->error = blk_status_to_errno(bio->bi_status); 1081 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status); 1082 } 1083 kfree(octx); 1084 bio_put(bio); 1085 1086 clear_bit(FLUSH_PENDING, &device->flags); 1087 put_ldev(device); 1088 kref_put(&device->kref, drbd_destroy_device); 1089 1090 if (atomic_dec_and_test(&ctx->pending)) 1091 complete(&ctx->done); 1092 } 1093 1094 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx) 1095 { 1096 struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0, 1097 REQ_OP_WRITE | REQ_PREFLUSH, GFP_NOIO); 1098 struct one_flush_context *octx = kmalloc_obj(*octx, GFP_NOIO); 1099 1100 if (!octx) { 1101 drbd_warn(device, "Could not allocate a octx, CANNOT ISSUE FLUSH\n"); 1102 /* FIXME: what else can I do now? disconnecting or detaching 1103 * really does not help to improve the state of the world, either. 1104 */ 1105 bio_put(bio); 1106 1107 ctx->error = -ENOMEM; 1108 put_ldev(device); 1109 kref_put(&device->kref, drbd_destroy_device); 1110 return; 1111 } 1112 1113 octx->device = device; 1114 octx->ctx = ctx; 1115 bio->bi_private = octx; 1116 bio->bi_end_io = one_flush_endio; 1117 1118 device->flush_jif = jiffies; 1119 set_bit(FLUSH_PENDING, &device->flags); 1120 atomic_inc(&ctx->pending); 1121 submit_bio(bio); 1122 } 1123 1124 static void drbd_flush(struct drbd_connection *connection) 1125 { 1126 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) { 1127 struct drbd_peer_device *peer_device; 1128 struct issue_flush_context ctx; 1129 int vnr; 1130 1131 atomic_set(&ctx.pending, 1); 1132 ctx.error = 0; 1133 init_completion(&ctx.done); 1134 1135 rcu_read_lock(); 1136 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1137 struct drbd_device *device = peer_device->device; 1138 1139 if (!get_ldev(device)) 1140 continue; 1141 kref_get(&device->kref); 1142 rcu_read_unlock(); 1143 1144 submit_one_flush(device, &ctx); 1145 1146 rcu_read_lock(); 1147 } 1148 rcu_read_unlock(); 1149 1150 /* Do we want to add a timeout, 1151 * if disk-timeout is set? */ 1152 if (!atomic_dec_and_test(&ctx.pending)) 1153 wait_for_completion(&ctx.done); 1154 1155 if (ctx.error) { 1156 /* would rather check on EOPNOTSUPP, but that is not reliable. 1157 * don't try again for ANY return value != 0 1158 * if (rv == -EOPNOTSUPP) */ 1159 /* Any error is already reported by bio_endio callback. */ 1160 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO); 1161 } 1162 } 1163 } 1164 1165 /** 1166 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it. 1167 * @connection: DRBD connection. 1168 * @epoch: Epoch object. 1169 * @ev: Epoch event. 
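 *
 * Return: FE_STILL_LIVE if the epoch is not finished yet, FE_RECYCLED if
 * the current epoch was reset for reuse, FE_DESTROYED if a completed older
 * epoch was unlinked from the list and freed.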
1170 */ 1171 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection, 1172 struct drbd_epoch *epoch, 1173 enum epoch_event ev) 1174 { 1175 int epoch_size; 1176 struct drbd_epoch *next_epoch; 1177 enum finish_epoch rv = FE_STILL_LIVE; 1178 1179 spin_lock(&connection->epoch_lock); 1180 do { 1181 next_epoch = NULL; 1182 1183 epoch_size = atomic_read(&epoch->epoch_size); 1184 1185 switch (ev & ~EV_CLEANUP) { 1186 case EV_PUT: 1187 atomic_dec(&epoch->active); 1188 break; 1189 case EV_GOT_BARRIER_NR: 1190 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags); 1191 break; 1192 case EV_BECAME_LAST: 1193 /* nothing to do*/ 1194 break; 1195 } 1196 1197 if (epoch_size != 0 && 1198 atomic_read(&epoch->active) == 0 && 1199 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) { 1200 if (!(ev & EV_CLEANUP)) { 1201 spin_unlock(&connection->epoch_lock); 1202 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size); 1203 spin_lock(&connection->epoch_lock); 1204 } 1205 #if 0 1206 /* FIXME: dec unacked on connection, once we have 1207 * something to count pending connection packets in. */ 1208 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) 1209 dec_unacked(epoch->connection); 1210 #endif 1211 1212 if (connection->current_epoch != epoch) { 1213 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list); 1214 list_del(&epoch->list); 1215 ev = EV_BECAME_LAST | (ev & EV_CLEANUP); 1216 connection->epochs--; 1217 kfree(epoch); 1218 1219 if (rv == FE_STILL_LIVE) 1220 rv = FE_DESTROYED; 1221 } else { 1222 epoch->flags = 0; 1223 atomic_set(&epoch->epoch_size, 0); 1224 /* atomic_set(&epoch->active, 0); is already zero */ 1225 if (rv == FE_STILL_LIVE) 1226 rv = FE_RECYCLED; 1227 } 1228 } 1229 1230 if (!next_epoch) 1231 break; 1232 1233 epoch = next_epoch; 1234 } while (1); 1235 1236 spin_unlock(&connection->epoch_lock); 1237 1238 return rv; 1239 } 1240 1241 static enum write_ordering_e 1242 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo) 1243 { 1244 struct disk_conf *dc; 1245 1246 dc = rcu_dereference(bdev->disk_conf); 1247 1248 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes) 1249 wo = WO_DRAIN_IO; 1250 if (wo == WO_DRAIN_IO && !dc->disk_drain) 1251 wo = WO_NONE; 1252 1253 return wo; 1254 } 1255 1256 /* 1257 * drbd_bump_write_ordering() - Fall back to an other write ordering method 1258 * @wo: Write ordering method to try. 
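 *
 * The requested method is only an upper bound: each backing device may
 * degrade it further via max_allowed_wo() above, depending on its
 * disk_flushes / disk_drain settings.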
1259 */ 1260 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev, 1261 enum write_ordering_e wo) 1262 { 1263 struct drbd_device *device; 1264 enum write_ordering_e pwo; 1265 int vnr; 1266 static char *write_ordering_str[] = { 1267 [WO_NONE] = "none", 1268 [WO_DRAIN_IO] = "drain", 1269 [WO_BDEV_FLUSH] = "flush", 1270 }; 1271 1272 pwo = resource->write_ordering; 1273 if (wo != WO_BDEV_FLUSH) 1274 wo = min(pwo, wo); 1275 rcu_read_lock(); 1276 idr_for_each_entry(&resource->devices, device, vnr) { 1277 if (get_ldev(device)) { 1278 wo = max_allowed_wo(device->ldev, wo); 1279 if (device->ldev == bdev) 1280 bdev = NULL; 1281 put_ldev(device); 1282 } 1283 } 1284 1285 if (bdev) 1286 wo = max_allowed_wo(bdev, wo); 1287 1288 rcu_read_unlock(); 1289 1290 resource->write_ordering = wo; 1291 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH) 1292 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]); 1293 } 1294 1295 /* 1296 * Mapping "discard" to ZEROOUT with UNMAP does not work for us: 1297 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it 1298 * will directly go to fallback mode, submitting normal writes, and 1299 * never even try to UNMAP. 1300 * 1301 * And dm-thin does not do this (yet), mostly because in general it has 1302 * to assume that "skip_block_zeroing" is set. See also: 1303 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html 1304 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html 1305 * 1306 * We *may* ignore the discard-zeroes-data setting, if so configured. 1307 * 1308 * Assumption is that this "discard_zeroes_data=0" is only because the backend 1309 * may ignore partial unaligned discards. 1310 * 1311 * LVM/DM thin as of at least 1312 * LVM version: 2.02.115(2)-RHEL7 (2015-01-28) 1313 * Library version: 1.02.93-RHEL7 (2015-01-28) 1314 * Driver version: 4.29.0 1315 * still behaves this way. 1316 * 1317 * For unaligned (wrt. alignment and granularity) or too small discards, 1318 * we zero-out the initial (and/or) trailing unaligned partial chunks, 1319 * but discard all the aligned full chunks. 1320 * 1321 * At least for LVM/DM thin, with skip_block_zeroing=false, 1322 * the result is effectively "discard_zeroes_data=1". 1323 */ 1324 /* flags: EE_TRIM|EE_ZEROOUT */ 1325 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags) 1326 { 1327 struct block_device *bdev = device->ldev->backing_bdev; 1328 sector_t tmp, nr; 1329 unsigned int max_discard_sectors, granularity; 1330 int alignment; 1331 int err = 0; 1332 1333 if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM)) 1334 goto zero_out; 1335 1336 /* Zero-sector (unknown) and one-sector granularities are the same. 
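 *
 * A worked example of the alignment handling below, with made-up numbers:
 * granularity = 8 sectors, alignment = 0, start = 13, nr_sectors = 30.
 * The unaligned head (sectors 13..15) is zeroed out, the aligned middle
 * (sectors 16..39, three full chunks) is discarded, and the short tail
 * (sectors 40..42) is zeroed out again.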
*/ 1337 granularity = max(bdev_discard_granularity(bdev) >> 9, 1U); 1338 alignment = (bdev_discard_alignment(bdev) >> 9) % granularity; 1339 1340 max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22)); 1341 max_discard_sectors -= max_discard_sectors % granularity; 1342 if (unlikely(!max_discard_sectors)) 1343 goto zero_out; 1344 1345 if (nr_sectors < granularity) 1346 goto zero_out; 1347 1348 tmp = start; 1349 if (sector_div(tmp, granularity) != alignment) { 1350 if (nr_sectors < 2*granularity) 1351 goto zero_out; 1352 /* start + gran - (start + gran - align) % gran */ 1353 tmp = start + granularity - alignment; 1354 tmp = start + granularity - sector_div(tmp, granularity); 1355 1356 nr = tmp - start; 1357 /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many 1358 * layers are below us, some may have smaller granularity */ 1359 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0); 1360 nr_sectors -= nr; 1361 start = tmp; 1362 } 1363 while (nr_sectors >= max_discard_sectors) { 1364 err |= blkdev_issue_discard(bdev, start, max_discard_sectors, 1365 GFP_NOIO); 1366 nr_sectors -= max_discard_sectors; 1367 start += max_discard_sectors; 1368 } 1369 if (nr_sectors) { 1370 /* max_discard_sectors is unsigned int (and a multiple of 1371 * granularity, we made sure of that above already); 1372 * nr is < max_discard_sectors; 1373 * I don't need sector_div here, even though nr is sector_t */ 1374 nr = nr_sectors; 1375 nr -= (unsigned int)nr % granularity; 1376 if (nr) { 1377 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO); 1378 nr_sectors -= nr; 1379 start += nr; 1380 } 1381 } 1382 zero_out: 1383 if (nr_sectors) { 1384 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 1385 (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP); 1386 } 1387 return err != 0; 1388 } 1389 1390 static bool can_do_reliable_discards(struct drbd_device *device) 1391 { 1392 struct disk_conf *dc; 1393 bool can_do; 1394 1395 if (!bdev_max_discard_sectors(device->ldev->backing_bdev)) 1396 return false; 1397 1398 rcu_read_lock(); 1399 dc = rcu_dereference(device->ldev->disk_conf); 1400 can_do = dc->discard_zeroes_if_aligned; 1401 rcu_read_unlock(); 1402 return can_do; 1403 } 1404 1405 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req) 1406 { 1407 /* If the backend cannot discard, or does not guarantee 1408 * read-back zeroes in discarded ranges, we fall back to 1409 * zero-out. Unless configuration specifically requested 1410 * otherwise. */ 1411 if (!can_do_reliable_discards(device)) 1412 peer_req->flags |= EE_ZEROOUT; 1413 1414 if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector, 1415 peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM))) 1416 peer_req->flags |= EE_WAS_ERROR; 1417 drbd_endio_write_sec_final(peer_req); 1418 } 1419 1420 static int peer_request_fault_type(struct drbd_peer_request *peer_req) 1421 { 1422 if (peer_req_op(peer_req) == REQ_OP_READ) { 1423 return peer_req->flags & EE_APPLICATION ? 1424 DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD; 1425 } else { 1426 return peer_req->flags & EE_APPLICATION ? 1427 DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR; 1428 } 1429 } 1430 1431 /** 1432 * drbd_submit_peer_request() 1433 * @peer_req: peer request 1434 * 1435 * May spread the pages to multiple bios, 1436 * depending on bio_add_page restrictions. 1437 * 1438 * Returns 0 if all bios have been submitted, 1439 * -ENOMEM if we could not allocate enough bios, 1440 * -ENOSPC (any better suggestion?) 
if we have not been able to bio_add_page a 1441 * single page to an empty bio (which should never happen and likely indicates 1442 * that the lower level IO stack is in some way broken). This has been observed 1443 * on certain Xen deployments. 1444 */ 1445 /* TODO allocate from our own bio_set. */ 1446 int drbd_submit_peer_request(struct drbd_peer_request *peer_req) 1447 { 1448 struct drbd_device *device = peer_req->peer_device->device; 1449 struct bio *bios = NULL; 1450 struct bio *bio; 1451 struct page *page = peer_req->pages; 1452 sector_t sector = peer_req->i.sector; 1453 unsigned int data_size = peer_req->i.size; 1454 unsigned int n_bios = 0; 1455 unsigned int nr_pages = PFN_UP(data_size); 1456 1457 /* TRIM/DISCARD: for now, always use the helper function 1458 * blkdev_issue_zeroout(..., discard=true). 1459 * It's synchronous, but it does the right thing wrt. bio splitting. 1460 * Correctness first, performance later. Next step is to code an 1461 * asynchronous variant of the same. 1462 */ 1463 if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) { 1464 /* wait for all pending IO completions, before we start 1465 * zeroing things out. */ 1466 conn_wait_active_ee_empty(peer_req->peer_device->connection); 1467 /* add it to the active list now, 1468 * so we can find it to present it in debugfs */ 1469 peer_req->submit_jif = jiffies; 1470 peer_req->flags |= EE_SUBMITTED; 1471 1472 /* If this was a resync request from receive_rs_deallocated(), 1473 * it is already on the sync_ee list */ 1474 if (list_empty(&peer_req->w.list)) { 1475 spin_lock_irq(&device->resource->req_lock); 1476 list_add_tail(&peer_req->w.list, &device->active_ee); 1477 spin_unlock_irq(&device->resource->req_lock); 1478 } 1479 1480 drbd_issue_peer_discard_or_zero_out(device, peer_req); 1481 return 0; 1482 } 1483 1484 /* In most cases, we will only need one bio. But in case the lower 1485 * level restrictions happen to be different at this offset on this 1486 * side than those of the sending peer, we may need to submit the 1487 * request in more than one bio. 1488 * 1489 * Plain bio_alloc is good enough here, this is no DRBD internally 1490 * generated bio, but a bio allocated on behalf of the peer. 1491 */ 1492 next_bio: 1493 /* _DISCARD, _WRITE_ZEROES handled above. 1494 * REQ_OP_FLUSH (empty flush) not expected, 1495 * should have been mapped to a "drbd protocol barrier". 1496 * REQ_OP_SECURE_ERASE: I don't see how we could ever support that. 
1497 */ 1498 if (!(peer_req_op(peer_req) == REQ_OP_WRITE || 1499 peer_req_op(peer_req) == REQ_OP_READ)) { 1500 drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf); 1501 return -EINVAL; 1502 } 1503 1504 bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO); 1505 /* > peer_req->i.sector, unless this is the first bio */ 1506 bio->bi_iter.bi_sector = sector; 1507 bio->bi_private = peer_req; 1508 bio->bi_end_io = drbd_peer_request_endio; 1509 1510 bio->bi_next = bios; 1511 bios = bio; 1512 ++n_bios; 1513 1514 page_chain_for_each(page) { 1515 unsigned len = min_t(unsigned, data_size, PAGE_SIZE); 1516 if (!bio_add_page(bio, page, len, 0)) 1517 goto next_bio; 1518 data_size -= len; 1519 sector += len >> 9; 1520 --nr_pages; 1521 } 1522 D_ASSERT(device, data_size == 0); 1523 D_ASSERT(device, page == NULL); 1524 1525 atomic_set(&peer_req->pending_bios, n_bios); 1526 /* for debugfs: update timestamp, mark as submitted */ 1527 peer_req->submit_jif = jiffies; 1528 peer_req->flags |= EE_SUBMITTED; 1529 do { 1530 bio = bios; 1531 bios = bios->bi_next; 1532 bio->bi_next = NULL; 1533 1534 drbd_submit_bio_noacct(device, peer_request_fault_type(peer_req), bio); 1535 } while (bios); 1536 return 0; 1537 } 1538 1539 static void drbd_remove_epoch_entry_interval(struct drbd_device *device, 1540 struct drbd_peer_request *peer_req) 1541 { 1542 struct drbd_interval *i = &peer_req->i; 1543 1544 drbd_remove_interval(&device->write_requests, i); 1545 drbd_clear_interval(i); 1546 1547 /* Wake up any processes waiting for this peer request to complete. */ 1548 if (i->waiting) 1549 wake_up(&device->misc_wait); 1550 } 1551 1552 static void conn_wait_active_ee_empty(struct drbd_connection *connection) 1553 { 1554 struct drbd_peer_device *peer_device; 1555 int vnr; 1556 1557 rcu_read_lock(); 1558 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1559 struct drbd_device *device = peer_device->device; 1560 1561 kref_get(&device->kref); 1562 rcu_read_unlock(); 1563 drbd_wait_ee_list_empty(device, &device->active_ee); 1564 kref_put(&device->kref, drbd_destroy_device); 1565 rcu_read_lock(); 1566 } 1567 rcu_read_unlock(); 1568 } 1569 1570 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi) 1571 { 1572 int rv; 1573 struct p_barrier *p = pi->data; 1574 struct drbd_epoch *epoch; 1575 1576 /* FIXME these are unacked on connection, 1577 * not a specific (peer)device. 1578 */ 1579 connection->current_epoch->barrier_nr = p->barrier; 1580 connection->current_epoch->connection = connection; 1581 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR); 1582 1583 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from 1584 * the activity log, which means it would not be resynced in case the 1585 * R_PRIMARY crashes now. 1586 * Therefore we must send the barrier_ack after the barrier request was 1587 * completed. */ 1588 switch (connection->resource->write_ordering) { 1589 case WO_NONE: 1590 if (rv == FE_RECYCLED) 1591 return 0; 1592 1593 /* receiver context, in the writeout path of the other node. 
1594 * avoid potential distributed deadlock */ 1595 epoch = kmalloc_obj(struct drbd_epoch, GFP_NOIO); 1596 if (epoch) 1597 break; 1598 else 1599 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n"); 1600 fallthrough; 1601 1602 case WO_BDEV_FLUSH: 1603 case WO_DRAIN_IO: 1604 conn_wait_active_ee_empty(connection); 1605 drbd_flush(connection); 1606 1607 if (atomic_read(&connection->current_epoch->epoch_size)) { 1608 epoch = kmalloc_obj(struct drbd_epoch, GFP_NOIO); 1609 if (epoch) 1610 break; 1611 } 1612 1613 return 0; 1614 default: 1615 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", 1616 connection->resource->write_ordering); 1617 return -EIO; 1618 } 1619 1620 epoch->flags = 0; 1621 atomic_set(&epoch->epoch_size, 0); 1622 atomic_set(&epoch->active, 0); 1623 1624 spin_lock(&connection->epoch_lock); 1625 if (atomic_read(&connection->current_epoch->epoch_size)) { 1626 list_add(&epoch->list, &connection->current_epoch->list); 1627 connection->current_epoch = epoch; 1628 connection->epochs++; 1629 } else { 1630 /* The current_epoch got recycled while we allocated this one... */ 1631 kfree(epoch); 1632 } 1633 spin_unlock(&connection->epoch_lock); 1634 1635 return 0; 1636 } 1637 1638 /* quick wrapper in case payload size != request_size (write same) */ 1639 static void drbd_csum_ee_size(struct crypto_shash *h, 1640 struct drbd_peer_request *r, void *d, 1641 unsigned int payload_size) 1642 { 1643 unsigned int tmp = r->i.size; 1644 r->i.size = payload_size; 1645 drbd_csum_ee(h, r, d); 1646 r->i.size = tmp; 1647 } 1648 1649 /* used from receive_RSDataReply (recv_resync_read) 1650 * and from receive_Data. 1651 * data_size: actual payload ("data in") 1652 * for normal writes that is bi_size. 1653 * for discards, that is zero. 1654 * for write same, it is logical_block_size. 1655 * both trim and write same have the bi_size ("data len to be affected") 1656 * as extra argument in the packet header. 1657 */ 1658 static struct drbd_peer_request * 1659 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector, 1660 struct packet_info *pi) __must_hold(local) 1661 { 1662 struct drbd_device *device = peer_device->device; 1663 const sector_t capacity = get_capacity(device->vdisk); 1664 struct drbd_peer_request *peer_req; 1665 struct page *page; 1666 int digest_size, err; 1667 unsigned int data_size = pi->size, ds; 1668 void *dig_in = peer_device->connection->int_dig_in; 1669 void *dig_vv = peer_device->connection->int_dig_vv; 1670 unsigned long *data; 1671 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL; 1672 struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL; 1673 1674 digest_size = 0; 1675 if (!trim && peer_device->connection->peer_integrity_tfm) { 1676 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm); 1677 /* 1678 * FIXME: Receive the incoming digest into the receive buffer 1679 * here, together with its struct p_data? 1680 */ 1681 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); 1682 if (err) 1683 return NULL; 1684 data_size -= digest_size; 1685 } 1686 1687 /* assume request_size == data_size, but special case trim. 
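 * For P_TRIM and P_ZEROES the payload on the wire is empty (data_size is
 * expected to be 0) and the size of the affected area is carried in the
 * packet header instead, so ds below always ends up as the on-disk size
 * of the request.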
*/ 1688 ds = data_size; 1689 if (trim) { 1690 if (!expect(peer_device, data_size == 0)) 1691 return NULL; 1692 ds = be32_to_cpu(trim->size); 1693 } else if (zeroes) { 1694 if (!expect(peer_device, data_size == 0)) 1695 return NULL; 1696 ds = be32_to_cpu(zeroes->size); 1697 } 1698 1699 if (!expect(peer_device, IS_ALIGNED(ds, 512))) 1700 return NULL; 1701 if (trim || zeroes) { 1702 if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9))) 1703 return NULL; 1704 } else if (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE)) 1705 return NULL; 1706 1707 /* even though we trust out peer, 1708 * we sometimes have to double check. */ 1709 if (sector + (ds>>9) > capacity) { 1710 drbd_err(device, "request from peer beyond end of local disk: " 1711 "capacity: %llus < sector: %llus + size: %u\n", 1712 (unsigned long long)capacity, 1713 (unsigned long long)sector, ds); 1714 return NULL; 1715 } 1716 1717 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 1718 * "criss-cross" setup, that might cause write-out on some other DRBD, 1719 * which in turn might block on the other node at this very place. */ 1720 peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO); 1721 if (!peer_req) 1722 return NULL; 1723 1724 peer_req->flags |= EE_WRITE; 1725 if (trim) { 1726 peer_req->flags |= EE_TRIM; 1727 return peer_req; 1728 } 1729 if (zeroes) { 1730 peer_req->flags |= EE_ZEROOUT; 1731 return peer_req; 1732 } 1733 1734 /* receive payload size bytes into page chain */ 1735 ds = data_size; 1736 page = peer_req->pages; 1737 page_chain_for_each(page) { 1738 unsigned len = min_t(int, ds, PAGE_SIZE); 1739 data = kmap_local_page(page); 1740 err = drbd_recv_all_warn(peer_device->connection, data, len); 1741 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) { 1742 drbd_err(device, "Fault injection: Corrupting data on receive\n"); 1743 data[0] = data[0] ^ (unsigned long)-1; 1744 } 1745 kunmap_local(data); 1746 if (err) { 1747 drbd_free_peer_req(device, peer_req); 1748 return NULL; 1749 } 1750 ds -= len; 1751 } 1752 1753 if (digest_size) { 1754 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size); 1755 if (memcmp(dig_in, dig_vv, digest_size)) { 1756 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n", 1757 (unsigned long long)sector, data_size); 1758 drbd_free_peer_req(device, peer_req); 1759 return NULL; 1760 } 1761 } 1762 device->recv_cnt += data_size >> 9; 1763 return peer_req; 1764 } 1765 1766 /* drbd_drain_block() just takes a data block 1767 * out of the socket input buffer, and discards it. 
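 * Used when the payload cannot be stored locally (for example, no attached
 * disk in receive_RSDataReply), but the receive stream still has to stay
 * in sync with the peer.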
1768 */ 1769 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size) 1770 { 1771 struct page *page; 1772 int err = 0; 1773 void *data; 1774 1775 if (!data_size) 1776 return 0; 1777 1778 page = drbd_alloc_pages(peer_device, 1, 1); 1779 1780 data = kmap_local_page(page); 1781 while (data_size) { 1782 unsigned int len = min_t(int, data_size, PAGE_SIZE); 1783 1784 err = drbd_recv_all_warn(peer_device->connection, data, len); 1785 if (err) 1786 break; 1787 data_size -= len; 1788 } 1789 kunmap_local(data); 1790 drbd_free_pages(peer_device->device, page); 1791 return err; 1792 } 1793 1794 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req, 1795 sector_t sector, int data_size) 1796 { 1797 struct bio_vec bvec; 1798 struct bvec_iter iter; 1799 struct bio *bio; 1800 int digest_size, err, expect; 1801 void *dig_in = peer_device->connection->int_dig_in; 1802 void *dig_vv = peer_device->connection->int_dig_vv; 1803 1804 digest_size = 0; 1805 if (peer_device->connection->peer_integrity_tfm) { 1806 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm); 1807 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); 1808 if (err) 1809 return err; 1810 data_size -= digest_size; 1811 } 1812 1813 /* optimistically update recv_cnt. if receiving fails below, 1814 * we disconnect anyways, and counters will be reset. */ 1815 peer_device->device->recv_cnt += data_size>>9; 1816 1817 bio = req->master_bio; 1818 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector); 1819 1820 bio_for_each_segment(bvec, bio, iter) { 1821 void *mapped = bvec_kmap_local(&bvec); 1822 expect = min_t(int, data_size, bvec.bv_len); 1823 err = drbd_recv_all_warn(peer_device->connection, mapped, expect); 1824 kunmap_local(mapped); 1825 if (err) 1826 return err; 1827 data_size -= expect; 1828 } 1829 1830 if (digest_size) { 1831 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv); 1832 if (memcmp(dig_in, dig_vv, digest_size)) { 1833 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n"); 1834 return -EINVAL; 1835 } 1836 } 1837 1838 D_ASSERT(peer_device->device, data_size == 0); 1839 return 0; 1840 } 1841 1842 /* 1843 * e_end_resync_block() is called in ack_sender context via 1844 * drbd_finish_peer_reqs(). 
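 * On success it marks the area in sync and sends P_RS_WRITE_ACK; on error
 * it records the failed resync I/O and sends P_NEG_ACK.  Either way it
 * drops the unacked count taken in recv_resync_read().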
1845 */ 1846 static int e_end_resync_block(struct drbd_work *w, int unused) 1847 { 1848 struct drbd_peer_request *peer_req = 1849 container_of(w, struct drbd_peer_request, w); 1850 struct drbd_peer_device *peer_device = peer_req->peer_device; 1851 struct drbd_device *device = peer_device->device; 1852 sector_t sector = peer_req->i.sector; 1853 int err; 1854 1855 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 1856 1857 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1858 drbd_set_in_sync(peer_device, sector, peer_req->i.size); 1859 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req); 1860 } else { 1861 /* Record failure to sync */ 1862 drbd_rs_failed_io(peer_device, sector, peer_req->i.size); 1863 1864 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 1865 } 1866 dec_unacked(device); 1867 1868 return err; 1869 } 1870 1871 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector, 1872 struct packet_info *pi) __releases(local) 1873 { 1874 struct drbd_device *device = peer_device->device; 1875 struct drbd_peer_request *peer_req; 1876 1877 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi); 1878 if (!peer_req) 1879 goto fail; 1880 1881 dec_rs_pending(peer_device); 1882 1883 inc_unacked(device); 1884 /* corresponding dec_unacked() in e_end_resync_block() 1885 * respective _drbd_clear_done_ee */ 1886 1887 peer_req->w.cb = e_end_resync_block; 1888 peer_req->opf = REQ_OP_WRITE; 1889 peer_req->submit_jif = jiffies; 1890 1891 spin_lock_irq(&device->resource->req_lock); 1892 list_add_tail(&peer_req->w.list, &device->sync_ee); 1893 spin_unlock_irq(&device->resource->req_lock); 1894 1895 atomic_add(pi->size >> 9, &device->rs_sect_ev); 1896 if (drbd_submit_peer_request(peer_req) == 0) 1897 return 0; 1898 1899 /* don't care for the reason here */ 1900 drbd_err(device, "submit failed, triggering re-connect\n"); 1901 spin_lock_irq(&device->resource->req_lock); 1902 list_del(&peer_req->w.list); 1903 spin_unlock_irq(&device->resource->req_lock); 1904 1905 drbd_free_peer_req(device, peer_req); 1906 fail: 1907 put_ldev(device); 1908 return -EIO; 1909 } 1910 1911 static struct drbd_request * 1912 find_request(struct drbd_device *device, struct rb_root *root, u64 id, 1913 sector_t sector, bool missing_ok, const char *func) 1914 { 1915 struct drbd_request *req; 1916 1917 /* Request object according to our peer */ 1918 req = (struct drbd_request *)(unsigned long)id; 1919 if (drbd_contains_interval(root, sector, &req->i) && req->i.local) 1920 return req; 1921 if (!missing_ok) { 1922 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func, 1923 (unsigned long)id, (unsigned long long)sector); 1924 } 1925 return NULL; 1926 } 1927 1928 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi) 1929 { 1930 struct drbd_peer_device *peer_device; 1931 struct drbd_device *device; 1932 struct drbd_request *req; 1933 sector_t sector; 1934 int err; 1935 struct p_data *p = pi->data; 1936 1937 peer_device = conn_peer_device(connection, pi->vnr); 1938 if (!peer_device) 1939 return -EIO; 1940 device = peer_device->device; 1941 1942 sector = be64_to_cpu(p->sector); 1943 1944 spin_lock_irq(&device->resource->req_lock); 1945 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__); 1946 spin_unlock_irq(&device->resource->req_lock); 1947 if (unlikely(!req)) 1948 return -EIO; 1949 1950 err = recv_dless_read(peer_device, req, sector, pi->size); 1951 if (!err) 1952 req_mod(req, DATA_RECEIVED, 
peer_device); 1953 /* else: nothing. handled from drbd_disconnect... 1954 * I don't think we may complete this just yet 1955 * in case we are "on-disconnect: freeze" */ 1956 1957 return err; 1958 } 1959 1960 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi) 1961 { 1962 struct drbd_peer_device *peer_device; 1963 struct drbd_device *device; 1964 sector_t sector; 1965 int err; 1966 struct p_data *p = pi->data; 1967 1968 peer_device = conn_peer_device(connection, pi->vnr); 1969 if (!peer_device) 1970 return -EIO; 1971 device = peer_device->device; 1972 1973 sector = be64_to_cpu(p->sector); 1974 D_ASSERT(device, p->block_id == ID_SYNCER); 1975 1976 if (get_ldev(device)) { 1977 /* data is submitted to disk within recv_resync_read. 1978 * corresponding put_ldev done below on error, 1979 * or in drbd_peer_request_endio. */ 1980 err = recv_resync_read(peer_device, sector, pi); 1981 } else { 1982 if (drbd_ratelimit()) 1983 drbd_err(device, "Can not write resync data to local disk.\n"); 1984 1985 err = drbd_drain_block(peer_device, pi->size); 1986 1987 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 1988 } 1989 1990 atomic_add(pi->size >> 9, &device->rs_sect_in); 1991 1992 return err; 1993 } 1994 1995 static void restart_conflicting_writes(struct drbd_device *device, 1996 sector_t sector, int size) 1997 { 1998 struct drbd_interval *i; 1999 struct drbd_request *req; 2000 2001 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2002 if (!i->local) 2003 continue; 2004 req = container_of(i, struct drbd_request, i); 2005 if (req->rq_state & RQ_LOCAL_PENDING || 2006 !(req->rq_state & RQ_POSTPONED)) 2007 continue; 2008 /* as it is RQ_POSTPONED, this will cause it to 2009 * be queued on the retry workqueue. */ 2010 __req_mod(req, CONFLICT_RESOLVED, NULL, NULL); 2011 } 2012 } 2013 2014 /* 2015 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs(). 2016 */ 2017 static int e_end_block(struct drbd_work *w, int cancel) 2018 { 2019 struct drbd_peer_request *peer_req = 2020 container_of(w, struct drbd_peer_request, w); 2021 struct drbd_peer_device *peer_device = peer_req->peer_device; 2022 struct drbd_device *device = peer_device->device; 2023 sector_t sector = peer_req->i.sector; 2024 int err = 0, pcmd; 2025 2026 if (peer_req->flags & EE_SEND_WRITE_ACK) { 2027 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 2028 pcmd = (device->state.conn >= C_SYNC_SOURCE && 2029 device->state.conn <= C_PAUSED_SYNC_T && 2030 peer_req->flags & EE_MAY_SET_IN_SYNC) ? 2031 P_RS_WRITE_ACK : P_WRITE_ACK; 2032 err = drbd_send_ack(peer_device, pcmd, peer_req); 2033 if (pcmd == P_RS_WRITE_ACK) 2034 drbd_set_in_sync(peer_device, sector, peer_req->i.size); 2035 } else { 2036 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 2037 /* we expect it to be marked out of sync anyways... 2038 * maybe assert this? */ 2039 } 2040 dec_unacked(device); 2041 } 2042 2043 /* we delete from the conflict detection hash _after_ we sent out the 2044 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. 
*/ 2045 if (peer_req->flags & EE_IN_INTERVAL_TREE) { 2046 spin_lock_irq(&device->resource->req_lock); 2047 D_ASSERT(device, !drbd_interval_empty(&peer_req->i)); 2048 drbd_remove_epoch_entry_interval(device, peer_req); 2049 if (peer_req->flags & EE_RESTART_REQUESTS) 2050 restart_conflicting_writes(device, sector, peer_req->i.size); 2051 spin_unlock_irq(&device->resource->req_lock); 2052 } else 2053 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 2054 2055 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0)); 2056 2057 return err; 2058 } 2059 2060 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack) 2061 { 2062 struct drbd_peer_request *peer_req = 2063 container_of(w, struct drbd_peer_request, w); 2064 struct drbd_peer_device *peer_device = peer_req->peer_device; 2065 int err; 2066 2067 err = drbd_send_ack(peer_device, ack, peer_req); 2068 dec_unacked(peer_device->device); 2069 2070 return err; 2071 } 2072 2073 static int e_send_superseded(struct drbd_work *w, int unused) 2074 { 2075 return e_send_ack(w, P_SUPERSEDED); 2076 } 2077 2078 static int e_send_retry_write(struct drbd_work *w, int unused) 2079 { 2080 struct drbd_peer_request *peer_req = 2081 container_of(w, struct drbd_peer_request, w); 2082 struct drbd_connection *connection = peer_req->peer_device->connection; 2083 2084 return e_send_ack(w, connection->agreed_pro_version >= 100 ? 2085 P_RETRY_WRITE : P_SUPERSEDED); 2086 } 2087 2088 static bool seq_greater(u32 a, u32 b) 2089 { 2090 /* 2091 * We assume 32-bit wrap-around here. 2092 * For 24-bit wrap-around, we would have to shift: 2093 * a <<= 8; b <<= 8; 2094 */ 2095 return (s32)a - (s32)b > 0; 2096 } 2097 2098 static u32 seq_max(u32 a, u32 b) 2099 { 2100 return seq_greater(a, b) ? a : b; 2101 } 2102 2103 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq) 2104 { 2105 struct drbd_device *device = peer_device->device; 2106 unsigned int newest_peer_seq; 2107 2108 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) { 2109 spin_lock(&device->peer_seq_lock); 2110 newest_peer_seq = seq_max(device->peer_seq, peer_seq); 2111 device->peer_seq = newest_peer_seq; 2112 spin_unlock(&device->peer_seq_lock); 2113 /* wake up only if we actually changed device->peer_seq */ 2114 if (peer_seq == newest_peer_seq) 2115 wake_up(&device->seq_wait); 2116 } 2117 } 2118 2119 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2) 2120 { 2121 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9))); 2122 } 2123 2124 /* maybe change sync_ee into interval trees as well? */ 2125 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req) 2126 { 2127 struct drbd_peer_request *rs_req; 2128 bool rv = false; 2129 2130 spin_lock_irq(&device->resource->req_lock); 2131 list_for_each_entry(rs_req, &device->sync_ee, w.list) { 2132 if (overlaps(peer_req->i.sector, peer_req->i.size, 2133 rs_req->i.sector, rs_req->i.size)) { 2134 rv = true; 2135 break; 2136 } 2137 } 2138 spin_unlock_irq(&device->resource->req_lock); 2139 2140 return rv; 2141 } 2142 2143 /* Called from receive_Data. 2144 * Synchronize packets on sock with packets on msock. 2145 * 2146 * This is here so even when a P_DATA packet traveling via sock overtook an Ack 2147 * packet traveling on msock, they are still processed in the order they have 2148 * been sent. 2149 * 2150 * Note: we don't care for Ack packets overtaking P_DATA packets. 
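* (Worked example of the 32 bit wrap-around in seq_greater(): seq_greater(0, 0xffffffff) computes (s32)0 - (s32)0xffffffff == 0 - (-1) == 1 > 0, so sequence number 0 counts as newer than 0xffffffff.)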
2151 * 2152 * In case packet_seq is larger than device->peer_seq number, there are 2153 * outstanding packets on the msock. We wait for them to arrive. 2154 * In case we are the logically next packet, we update device->peer_seq 2155 * ourselves. Correctly handles 32bit wrap around. 2156 * 2157 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second, 2158 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds 2159 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have 2160 * 1<<9 == 512 seconds aka ages for the 32bit wrap around... 2161 * 2162 * returns 0 if we may process the packet, 2163 * -ERESTARTSYS if we were interrupted (by disconnect signal). */ 2164 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq) 2165 { 2166 struct drbd_device *device = peer_device->device; 2167 DEFINE_WAIT(wait); 2168 long timeout; 2169 int ret = 0, tp; 2170 2171 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) 2172 return 0; 2173 2174 spin_lock(&device->peer_seq_lock); 2175 for (;;) { 2176 if (!seq_greater(peer_seq - 1, device->peer_seq)) { 2177 device->peer_seq = seq_max(device->peer_seq, peer_seq); 2178 break; 2179 } 2180 2181 if (signal_pending(current)) { 2182 ret = -ERESTARTSYS; 2183 break; 2184 } 2185 2186 rcu_read_lock(); 2187 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries; 2188 rcu_read_unlock(); 2189 2190 if (!tp) 2191 break; 2192 2193 /* Only need to wait if two_primaries is enabled */ 2194 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE); 2195 spin_unlock(&device->peer_seq_lock); 2196 rcu_read_lock(); 2197 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10; 2198 rcu_read_unlock(); 2199 timeout = schedule_timeout(timeout); 2200 spin_lock(&device->peer_seq_lock); 2201 if (!timeout) { 2202 ret = -ETIMEDOUT; 2203 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n"); 2204 break; 2205 } 2206 } 2207 spin_unlock(&device->peer_seq_lock); 2208 finish_wait(&device->seq_wait, &wait); 2209 return ret; 2210 } 2211 2212 static enum req_op wire_flags_to_bio_op(u32 dpf) 2213 { 2214 if (dpf & DP_ZEROES) 2215 return REQ_OP_WRITE_ZEROES; 2216 if (dpf & DP_DISCARD) 2217 return REQ_OP_DISCARD; 2218 else 2219 return REQ_OP_WRITE; 2220 } 2221 2222 /* see also bio_flags_to_wire() */ 2223 static blk_opf_t wire_flags_to_bio(struct drbd_connection *connection, u32 dpf) 2224 { 2225 return wire_flags_to_bio_op(dpf) | 2226 (dpf & DP_RW_SYNC ? REQ_SYNC : 0) | 2227 (dpf & DP_FUA ? REQ_FUA : 0) | 2228 (dpf & DP_FLUSH ? 
REQ_PREFLUSH : 0); 2229 } 2230 2231 static void fail_postponed_requests(struct drbd_device *device, sector_t sector, 2232 unsigned int size) 2233 { 2234 struct drbd_peer_device *peer_device = first_peer_device(device); 2235 struct drbd_interval *i; 2236 2237 repeat: 2238 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2239 struct drbd_request *req; 2240 struct bio_and_error m; 2241 2242 if (!i->local) 2243 continue; 2244 req = container_of(i, struct drbd_request, i); 2245 if (!(req->rq_state & RQ_POSTPONED)) 2246 continue; 2247 req->rq_state &= ~RQ_POSTPONED; 2248 __req_mod(req, NEG_ACKED, peer_device, &m); 2249 spin_unlock_irq(&device->resource->req_lock); 2250 if (m.bio) 2251 complete_master_bio(device, &m); 2252 spin_lock_irq(&device->resource->req_lock); 2253 goto repeat; 2254 } 2255 } 2256 2257 static int handle_write_conflicts(struct drbd_device *device, 2258 struct drbd_peer_request *peer_req) 2259 { 2260 struct drbd_connection *connection = peer_req->peer_device->connection; 2261 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags); 2262 sector_t sector = peer_req->i.sector; 2263 const unsigned int size = peer_req->i.size; 2264 struct drbd_interval *i; 2265 bool equal; 2266 int err; 2267 2268 /* 2269 * Inserting the peer request into the write_requests tree will prevent 2270 * new conflicting local requests from being added. 2271 */ 2272 drbd_insert_interval(&device->write_requests, &peer_req->i); 2273 2274 repeat: 2275 drbd_for_each_overlap(i, &device->write_requests, sector, size) { 2276 if (i == &peer_req->i) 2277 continue; 2278 if (i->completed) 2279 continue; 2280 2281 if (!i->local) { 2282 /* 2283 * Our peer has sent a conflicting remote request; this 2284 * should not happen in a two-node setup. Wait for the 2285 * earlier peer request to complete. 2286 */ 2287 err = drbd_wait_misc(device, i); 2288 if (err) 2289 goto out; 2290 goto repeat; 2291 } 2292 2293 equal = i->sector == sector && i->size == size; 2294 if (resolve_conflicts) { 2295 /* 2296 * If the peer request is fully contained within the 2297 * overlapping request, it can be considered overwritten 2298 * and thus superseded; otherwise, it will be retried 2299 * once all overlapping requests have completed. 2300 */ 2301 bool superseded = i->sector <= sector && i->sector + 2302 (i->size >> 9) >= sector + (size >> 9); 2303 2304 if (!equal) 2305 drbd_alert(device, "Concurrent writes detected: " 2306 "local=%llus +%u, remote=%llus +%u, " 2307 "assuming %s came first\n", 2308 (unsigned long long)i->sector, i->size, 2309 (unsigned long long)sector, size, 2310 superseded ? "local" : "remote"); 2311 2312 peer_req->w.cb = superseded ? 
e_send_superseded : 2313 e_send_retry_write; 2314 list_add_tail(&peer_req->w.list, &device->done_ee); 2315 /* put is in drbd_send_acks_wf() */ 2316 kref_get(&device->kref); 2317 if (!queue_work(connection->ack_sender, 2318 &peer_req->peer_device->send_acks_work)) 2319 kref_put(&device->kref, drbd_destroy_device); 2320 2321 err = -ENOENT; 2322 goto out; 2323 } else { 2324 struct drbd_request *req = 2325 container_of(i, struct drbd_request, i); 2326 2327 if (!equal) 2328 drbd_alert(device, "Concurrent writes detected: " 2329 "local=%llus +%u, remote=%llus +%u\n", 2330 (unsigned long long)i->sector, i->size, 2331 (unsigned long long)sector, size); 2332 2333 if (req->rq_state & RQ_LOCAL_PENDING || 2334 !(req->rq_state & RQ_POSTPONED)) { 2335 /* 2336 * Wait for the node with the discard flag to 2337 * decide if this request has been superseded 2338 * or needs to be retried. 2339 * Requests that have been superseded will 2340 * disappear from the write_requests tree. 2341 * 2342 * In addition, wait for the conflicting 2343 * request to finish locally before submitting 2344 * the conflicting peer request. 2345 */ 2346 err = drbd_wait_misc(device, &req->i); 2347 if (err) { 2348 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD); 2349 fail_postponed_requests(device, sector, size); 2350 goto out; 2351 } 2352 goto repeat; 2353 } 2354 /* 2355 * Remember to restart the conflicting requests after 2356 * the new peer request has completed. 2357 */ 2358 peer_req->flags |= EE_RESTART_REQUESTS; 2359 } 2360 } 2361 err = 0; 2362 2363 out: 2364 if (err) 2365 drbd_remove_epoch_entry_interval(device, peer_req); 2366 return err; 2367 } 2368 2369 /* mirrored write */ 2370 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi) 2371 { 2372 struct drbd_peer_device *peer_device; 2373 struct drbd_device *device; 2374 struct net_conf *nc; 2375 sector_t sector; 2376 struct drbd_peer_request *peer_req; 2377 struct p_data *p = pi->data; 2378 u32 peer_seq = be32_to_cpu(p->seq_num); 2379 u32 dp_flags; 2380 int err, tp; 2381 2382 peer_device = conn_peer_device(connection, pi->vnr); 2383 if (!peer_device) 2384 return -EIO; 2385 device = peer_device->device; 2386 2387 if (!get_ldev(device)) { 2388 int err2; 2389 2390 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2391 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size); 2392 atomic_inc(&connection->current_epoch->epoch_size); 2393 err2 = drbd_drain_block(peer_device, pi->size); 2394 if (!err) 2395 err = err2; 2396 return err; 2397 } 2398 2399 /* 2400 * Corresponding put_ldev done either below (on various errors), or in 2401 * drbd_peer_request_endio, if we successfully submit the data at the 2402 * end of this function. 2403 */ 2404 2405 sector = be64_to_cpu(p->sector); 2406 peer_req = read_in_block(peer_device, p->block_id, sector, pi); 2407 if (!peer_req) { 2408 put_ldev(device); 2409 return -EIO; 2410 } 2411 2412 peer_req->w.cb = e_end_block; 2413 peer_req->submit_jif = jiffies; 2414 peer_req->flags |= EE_APPLICATION; 2415 2416 dp_flags = be32_to_cpu(p->dp_flags); 2417 peer_req->opf = wire_flags_to_bio(connection, dp_flags); 2418 if (pi->cmd == P_TRIM) { 2419 D_ASSERT(peer_device, peer_req->i.size > 0); 2420 D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD); 2421 D_ASSERT(peer_device, peer_req->pages == NULL); 2422 /* need to play safe: an older DRBD sender 2423 * may mean zero-out while sending P_TRIM. 
*/ 2424 if (0 == (connection->agreed_features & DRBD_FF_WZEROES)) 2425 peer_req->flags |= EE_ZEROOUT; 2426 } else if (pi->cmd == P_ZEROES) { 2427 D_ASSERT(peer_device, peer_req->i.size > 0); 2428 D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES); 2429 D_ASSERT(peer_device, peer_req->pages == NULL); 2430 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */ 2431 if (dp_flags & DP_DISCARD) 2432 peer_req->flags |= EE_TRIM; 2433 } else if (peer_req->pages == NULL) { 2434 D_ASSERT(device, peer_req->i.size == 0); 2435 D_ASSERT(device, dp_flags & DP_FLUSH); 2436 } 2437 2438 if (dp_flags & DP_MAY_SET_IN_SYNC) 2439 peer_req->flags |= EE_MAY_SET_IN_SYNC; 2440 2441 spin_lock(&connection->epoch_lock); 2442 peer_req->epoch = connection->current_epoch; 2443 atomic_inc(&peer_req->epoch->epoch_size); 2444 atomic_inc(&peer_req->epoch->active); 2445 spin_unlock(&connection->epoch_lock); 2446 2447 rcu_read_lock(); 2448 nc = rcu_dereference(peer_device->connection->net_conf); 2449 tp = nc->two_primaries; 2450 if (peer_device->connection->agreed_pro_version < 100) { 2451 switch (nc->wire_protocol) { 2452 case DRBD_PROT_C: 2453 dp_flags |= DP_SEND_WRITE_ACK; 2454 break; 2455 case DRBD_PROT_B: 2456 dp_flags |= DP_SEND_RECEIVE_ACK; 2457 break; 2458 } 2459 } 2460 rcu_read_unlock(); 2461 2462 if (dp_flags & DP_SEND_WRITE_ACK) { 2463 peer_req->flags |= EE_SEND_WRITE_ACK; 2464 inc_unacked(device); 2465 /* corresponding dec_unacked() in e_end_block() 2466 * respective _drbd_clear_done_ee */ 2467 } 2468 2469 if (dp_flags & DP_SEND_RECEIVE_ACK) { 2470 /* I really don't like it that the receiver thread 2471 * sends on the msock, but anyways */ 2472 drbd_send_ack(peer_device, P_RECV_ACK, peer_req); 2473 } 2474 2475 if (tp) { 2476 /* two primaries implies protocol C */ 2477 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK); 2478 peer_req->flags |= EE_IN_INTERVAL_TREE; 2479 err = wait_for_and_update_peer_seq(peer_device, peer_seq); 2480 if (err) 2481 goto out_interrupted; 2482 spin_lock_irq(&device->resource->req_lock); 2483 err = handle_write_conflicts(device, peer_req); 2484 if (err) { 2485 spin_unlock_irq(&device->resource->req_lock); 2486 if (err == -ENOENT) { 2487 put_ldev(device); 2488 return 0; 2489 } 2490 goto out_interrupted; 2491 } 2492 } else { 2493 update_peer_seq(peer_device, peer_seq); 2494 spin_lock_irq(&device->resource->req_lock); 2495 } 2496 /* TRIM and ZEROOUT requests are processed synchronously, 2497 * we wait for all pending requests, respectively wait for 2498 * active_ee to become empty in drbd_submit_peer_request(); 2499 * better not add ourselves here.
*/ 2500 if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0) 2501 list_add_tail(&peer_req->w.list, &device->active_ee); 2502 spin_unlock_irq(&device->resource->req_lock); 2503 2504 if (device->state.conn == C_SYNC_TARGET) 2505 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req)); 2506 2507 if (device->state.pdsk < D_INCONSISTENT) { 2508 /* In case we have the only disk of the cluster, */ 2509 drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size); 2510 peer_req->flags &= ~EE_MAY_SET_IN_SYNC; 2511 drbd_al_begin_io(device, &peer_req->i); 2512 peer_req->flags |= EE_CALL_AL_COMPLETE_IO; 2513 } 2514 2515 err = drbd_submit_peer_request(peer_req); 2516 if (!err) 2517 return 0; 2518 2519 /* don't care for the reason here */ 2520 drbd_err(device, "submit failed, triggering re-connect\n"); 2521 spin_lock_irq(&device->resource->req_lock); 2522 list_del(&peer_req->w.list); 2523 drbd_remove_epoch_entry_interval(device, peer_req); 2524 spin_unlock_irq(&device->resource->req_lock); 2525 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) { 2526 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 2527 drbd_al_complete_io(device, &peer_req->i); 2528 } 2529 2530 out_interrupted: 2531 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP); 2532 put_ldev(device); 2533 drbd_free_peer_req(device, peer_req); 2534 return err; 2535 } 2536 2537 /* We may throttle resync, if the lower device seems to be busy, 2538 * and current sync rate is above c_min_rate. 2539 * 2540 * To decide whether or not the lower device is busy, we use a scheme similar 2541 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant" 2542 * (more than 64 sectors) of activity we cannot account for with our own resync 2543 * activity, it obviously is "busy". 2544 * 2545 * The current sync rate used here uses only the most recent two step marks, 2546 * to have a short time average so we can react faster. 2547 */ 2548 bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector, 2549 bool throttle_if_app_is_waiting) 2550 { 2551 struct drbd_device *device = peer_device->device; 2552 struct lc_element *tmp; 2553 bool throttle = drbd_rs_c_min_rate_throttle(device); 2554 2555 if (!throttle || throttle_if_app_is_waiting) 2556 return throttle; 2557 2558 spin_lock_irq(&device->al_lock); 2559 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector)); 2560 if (tmp) { 2561 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 2562 if (test_bit(BME_PRIORITY, &bm_ext->flags)) 2563 throttle = false; 2564 /* Do not slow down if app IO is already waiting for this extent, 2565 * and our progress is necessary for application IO to complete. */ 2566 } 2567 spin_unlock_irq(&device->al_lock); 2568 2569 return throttle; 2570 } 2571 2572 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device) 2573 { 2574 struct gendisk *disk = device->ldev->backing_bdev->bd_disk; 2575 unsigned long db, dt, dbdt; 2576 unsigned int c_min_rate; 2577 int curr_events; 2578 2579 rcu_read_lock(); 2580 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate; 2581 rcu_read_unlock(); 2582 2583 /* feature disabled? 
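A c_min_rate of 0 turns this activity-based resync throttling off completely.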
*/ 2584 if (c_min_rate == 0) 2585 return false; 2586 2587 curr_events = (int)part_stat_read_accum(disk->part0, sectors) - 2588 atomic_read(&device->rs_sect_ev); 2589 2590 if (atomic_read(&device->ap_actlog_cnt) 2591 || curr_events - device->rs_last_events > 64) { 2592 unsigned long rs_left; 2593 int i; 2594 2595 device->rs_last_events = curr_events; 2596 2597 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP, 2598 * approx. */ 2599 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS; 2600 2601 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 2602 rs_left = device->ov_left; 2603 else 2604 rs_left = drbd_bm_total_weight(device) - device->rs_failed; 2605 2606 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ; 2607 if (!dt) 2608 dt++; 2609 db = device->rs_mark_left[i] - rs_left; 2610 dbdt = Bit2KB(db/dt); 2611 2612 if (dbdt > c_min_rate) 2613 return true; 2614 } 2615 return false; 2616 } 2617 2618 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi) 2619 { 2620 struct drbd_peer_device *peer_device; 2621 struct drbd_device *device; 2622 sector_t sector; 2623 sector_t capacity; 2624 struct drbd_peer_request *peer_req; 2625 struct digest_info *di = NULL; 2626 int size, verb; 2627 struct p_block_req *p = pi->data; 2628 2629 peer_device = conn_peer_device(connection, pi->vnr); 2630 if (!peer_device) 2631 return -EIO; 2632 device = peer_device->device; 2633 capacity = get_capacity(device->vdisk); 2634 2635 sector = be64_to_cpu(p->sector); 2636 size = be32_to_cpu(p->blksize); 2637 2638 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) { 2639 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2640 (unsigned long long)sector, size); 2641 return -EINVAL; 2642 } 2643 if (sector + (size>>9) > capacity) { 2644 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__, 2645 (unsigned long long)sector, size); 2646 return -EINVAL; 2647 } 2648 2649 if (!get_ldev_if_state(device, D_UP_TO_DATE)) { 2650 verb = 1; 2651 switch (pi->cmd) { 2652 case P_DATA_REQUEST: 2653 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p); 2654 break; 2655 case P_RS_THIN_REQ: 2656 case P_RS_DATA_REQUEST: 2657 case P_CSUM_RS_REQUEST: 2658 case P_OV_REQUEST: 2659 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p); 2660 break; 2661 case P_OV_REPLY: 2662 verb = 0; 2663 dec_rs_pending(peer_device); 2664 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC); 2665 break; 2666 default: 2667 BUG(); 2668 } 2669 if (verb && drbd_ratelimit()) 2670 drbd_err(device, "Can not satisfy peer's read request, " 2671 "no local data.\n"); 2672 2673 /* drain possibly payload */ 2674 return drbd_drain_block(peer_device, pi->size); 2675 } 2676 2677 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD 2678 * "criss-cross" setup, that might cause write-out on some other DRBD, 2679 * which in turn might block on the other node at this very place. 
*/ 2680 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, 2681 size, GFP_NOIO); 2682 if (!peer_req) { 2683 put_ldev(device); 2684 return -ENOMEM; 2685 } 2686 peer_req->opf = REQ_OP_READ; 2687 2688 switch (pi->cmd) { 2689 case P_DATA_REQUEST: 2690 peer_req->w.cb = w_e_end_data_req; 2691 /* application IO, don't drbd_rs_begin_io */ 2692 peer_req->flags |= EE_APPLICATION; 2693 goto submit; 2694 2695 case P_RS_THIN_REQ: 2696 /* If at some point in the future we have a smart way to 2697 find out if this data block is completely deallocated, 2698 then we would do something smarter here than reading 2699 the block... */ 2700 peer_req->flags |= EE_RS_THIN_REQ; 2701 fallthrough; 2702 case P_RS_DATA_REQUEST: 2703 peer_req->w.cb = w_e_end_rsdata_req; 2704 /* used in the sector offset progress display */ 2705 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2706 break; 2707 2708 case P_OV_REPLY: 2709 case P_CSUM_RS_REQUEST: 2710 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO); 2711 if (!di) 2712 goto out_free_e; 2713 2714 di->digest_size = pi->size; 2715 di->digest = (((char *)di)+sizeof(struct digest_info)); 2716 2717 peer_req->digest = di; 2718 peer_req->flags |= EE_HAS_DIGEST; 2719 2720 if (drbd_recv_all(peer_device->connection, di->digest, pi->size)) 2721 goto out_free_e; 2722 2723 if (pi->cmd == P_CSUM_RS_REQUEST) { 2724 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 2725 peer_req->w.cb = w_e_end_csum_rs_req; 2726 /* used in the sector offset progress display */ 2727 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 2728 /* remember to report stats in drbd_resync_finished */ 2729 device->use_csums = true; 2730 } else if (pi->cmd == P_OV_REPLY) { 2731 /* track progress, we may need to throttle */ 2732 atomic_add(size >> 9, &device->rs_sect_in); 2733 peer_req->w.cb = w_e_end_ov_reply; 2734 dec_rs_pending(peer_device); 2735 /* drbd_rs_begin_io done when we sent this request, 2736 * but accounting still needs to be done. */ 2737 goto submit_for_resync; 2738 } 2739 break; 2740 2741 case P_OV_REQUEST: 2742 if (device->ov_start_sector == ~(sector_t)0 && 2743 peer_device->connection->agreed_pro_version >= 90) { 2744 unsigned long now = jiffies; 2745 int i; 2746 device->ov_start_sector = sector; 2747 device->ov_position = sector; 2748 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector); 2749 device->rs_total = device->ov_left; 2750 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 2751 device->rs_mark_left[i] = device->ov_left; 2752 device->rs_mark_time[i] = now; 2753 } 2754 drbd_info(device, "Online Verify start sector: %llu\n", 2755 (unsigned long long)sector); 2756 } 2757 peer_req->w.cb = w_e_end_ov_req; 2758 break; 2759 2760 default: 2761 BUG(); 2762 } 2763 2764 /* Throttle, drbd_rs_begin_io and submit should become asynchronous 2765 * wrt the receiver, but it is not as straightforward as it may seem. 2766 * Various places in the resync start and stop logic assume resync 2767 * requests are processed in order, requeuing this on the worker thread 2768 * introduces a bunch of new code for synchronization between threads. 2769 * 2770 * Unlimited throttling before drbd_rs_begin_io may stall the resync 2771 * "forever", throttling after drbd_rs_begin_io will lock that extent 2772 * for application writes for the same time. For now, just throttle 2773 * here, where the rest of the code expects the receiver to sleep for 2774 * a while, anyways. 
2775 */ 2776 2777 /* Throttle before drbd_rs_begin_io, as that locks out application IO; 2778 * this defers syncer requests for some time, before letting at least 2779 * one request through. The resync controller on the receiving side 2780 * will adapt to the incoming rate accordingly. 2781 * 2782 * We cannot throttle here if remote is Primary/SyncTarget: 2783 * we would also throttle its application reads. 2784 * In that case, throttling is done on the SyncTarget only. 2785 */ 2786 2787 /* Even though this may be a resync request, we do add to "read_ee"; 2788 * "sync_ee" is only used for resync WRITEs. 2789 * Add to list early, so debugfs can find this request 2790 * even if we have to sleep below. */ 2791 spin_lock_irq(&device->resource->req_lock); 2792 list_add_tail(&peer_req->w.list, &device->read_ee); 2793 spin_unlock_irq(&device->resource->req_lock); 2794 2795 update_receiver_timing_details(connection, drbd_rs_should_slow_down); 2796 if (device->state.peer != R_PRIMARY 2797 && drbd_rs_should_slow_down(peer_device, sector, false)) 2798 schedule_timeout_uninterruptible(HZ/10); 2799 update_receiver_timing_details(connection, drbd_rs_begin_io); 2800 if (drbd_rs_begin_io(device, sector)) 2801 goto out_free_e; 2802 2803 submit_for_resync: 2804 atomic_add(size >> 9, &device->rs_sect_ev); 2805 2806 submit: 2807 update_receiver_timing_details(connection, drbd_submit_peer_request); 2808 inc_unacked(device); 2809 if (drbd_submit_peer_request(peer_req) == 0) 2810 return 0; 2811 2812 /* don't care for the reason here */ 2813 drbd_err(device, "submit failed, triggering re-connect\n"); 2814 2815 out_free_e: 2816 spin_lock_irq(&device->resource->req_lock); 2817 list_del(&peer_req->w.list); 2818 spin_unlock_irq(&device->resource->req_lock); 2819 /* no drbd_rs_complete_io(), we are dropping the connection anyways */ 2820 2821 put_ldev(device); 2822 drbd_free_peer_req(device, peer_req); 2823 return -EIO; 2824 } 2825 2826 /* 2827 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries 2828 */ 2829 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local) 2830 { 2831 struct drbd_device *device = peer_device->device; 2832 int self, peer, rv = -100; 2833 unsigned long ch_self, ch_peer; 2834 enum drbd_after_sb_p after_sb_0p; 2835 2836 self = device->ldev->md.uuid[UI_BITMAP] & 1; 2837 peer = device->p_uuid[UI_BITMAP] & 1; 2838 2839 ch_peer = device->p_uuid[UI_SIZE]; 2840 ch_self = device->comm_bm_set; 2841 2842 rcu_read_lock(); 2843 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p; 2844 rcu_read_unlock(); 2845 switch (after_sb_0p) { 2846 case ASB_CONSENSUS: 2847 case ASB_DISCARD_SECONDARY: 2848 case ASB_CALL_HELPER: 2849 case ASB_VIOLENTLY: 2850 drbd_err(device, "Configuration error.\n"); 2851 break; 2852 case ASB_DISCONNECT: 2853 break; 2854 case ASB_DISCARD_YOUNGER_PRI: 2855 if (self == 0 && peer == 1) { 2856 rv = -1; 2857 break; 2858 } 2859 if (self == 1 && peer == 0) { 2860 rv = 1; 2861 break; 2862 } 2863 fallthrough; /* to one of the other strategies */ 2864 case ASB_DISCARD_OLDER_PRI: 2865 if (self == 0 && peer == 1) { 2866 rv = 1; 2867 break; 2868 } 2869 if (self == 1 && peer == 0) { 2870 rv = -1; 2871 break; 2872 } 2873 /* Else fall through to one of the other strategies...
*/ 2874 drbd_warn(device, "Discard younger/older primary did not find a decision\n" 2875 "Using discard-least-changes instead\n"); 2876 fallthrough; 2877 case ASB_DISCARD_ZERO_CHG: 2878 if (ch_peer == 0 && ch_self == 0) { 2879 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2880 ? -1 : 1; 2881 break; 2882 } else { 2883 if (ch_peer == 0) { rv = 1; break; } 2884 if (ch_self == 0) { rv = -1; break; } 2885 } 2886 if (after_sb_0p == ASB_DISCARD_ZERO_CHG) 2887 break; 2888 fallthrough; 2889 case ASB_DISCARD_LEAST_CHG: 2890 if (ch_self < ch_peer) 2891 rv = -1; 2892 else if (ch_self > ch_peer) 2893 rv = 1; 2894 else /* ( ch_self == ch_peer ) */ 2895 /* Well, then use something else. */ 2896 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) 2897 ? -1 : 1; 2898 break; 2899 case ASB_DISCARD_LOCAL: 2900 rv = -1; 2901 break; 2902 case ASB_DISCARD_REMOTE: 2903 rv = 1; 2904 } 2905 2906 return rv; 2907 } 2908 2909 /* 2910 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary 2911 */ 2912 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local) 2913 { 2914 struct drbd_device *device = peer_device->device; 2915 int hg, rv = -100; 2916 enum drbd_after_sb_p after_sb_1p; 2917 2918 rcu_read_lock(); 2919 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p; 2920 rcu_read_unlock(); 2921 switch (after_sb_1p) { 2922 case ASB_DISCARD_YOUNGER_PRI: 2923 case ASB_DISCARD_OLDER_PRI: 2924 case ASB_DISCARD_LEAST_CHG: 2925 case ASB_DISCARD_LOCAL: 2926 case ASB_DISCARD_REMOTE: 2927 case ASB_DISCARD_ZERO_CHG: 2928 drbd_err(device, "Configuration error.\n"); 2929 break; 2930 case ASB_DISCONNECT: 2931 break; 2932 case ASB_CONSENSUS: 2933 hg = drbd_asb_recover_0p(peer_device); 2934 if (hg == -1 && device->state.role == R_SECONDARY) 2935 rv = hg; 2936 if (hg == 1 && device->state.role == R_PRIMARY) 2937 rv = hg; 2938 break; 2939 case ASB_VIOLENTLY: 2940 rv = drbd_asb_recover_0p(peer_device); 2941 break; 2942 case ASB_DISCARD_SECONDARY: 2943 return device->state.role == R_PRIMARY ? 1 : -1; 2944 case ASB_CALL_HELPER: 2945 hg = drbd_asb_recover_0p(peer_device); 2946 if (hg == -1 && device->state.role == R_PRIMARY) { 2947 enum drbd_state_rv rv2; 2948 2949 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 2950 * we might be here in C_WF_REPORT_PARAMS which is transient. 2951 * we do not need to wait for the after state change work either. 
*/ 2952 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 2953 if (rv2 != SS_SUCCESS) { 2954 drbd_khelper(device, "pri-lost-after-sb"); 2955 } else { 2956 drbd_warn(device, "Successfully gave up primary role.\n"); 2957 rv = hg; 2958 } 2959 } else 2960 rv = hg; 2961 } 2962 2963 return rv; 2964 } 2965 2966 /* 2967 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries 2968 */ 2969 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local) 2970 { 2971 struct drbd_device *device = peer_device->device; 2972 int hg, rv = -100; 2973 enum drbd_after_sb_p after_sb_2p; 2974 2975 rcu_read_lock(); 2976 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p; 2977 rcu_read_unlock(); 2978 switch (after_sb_2p) { 2979 case ASB_DISCARD_YOUNGER_PRI: 2980 case ASB_DISCARD_OLDER_PRI: 2981 case ASB_DISCARD_LEAST_CHG: 2982 case ASB_DISCARD_LOCAL: 2983 case ASB_DISCARD_REMOTE: 2984 case ASB_CONSENSUS: 2985 case ASB_DISCARD_SECONDARY: 2986 case ASB_DISCARD_ZERO_CHG: 2987 drbd_err(device, "Configuration error.\n"); 2988 break; 2989 case ASB_VIOLENTLY: 2990 rv = drbd_asb_recover_0p(peer_device); 2991 break; 2992 case ASB_DISCONNECT: 2993 break; 2994 case ASB_CALL_HELPER: 2995 hg = drbd_asb_recover_0p(peer_device); 2996 if (hg == -1) { 2997 enum drbd_state_rv rv2; 2998 2999 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 3000 * we might be here in C_WF_REPORT_PARAMS which is transient. 3001 * we do not need to wait for the after state change work either. */ 3002 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 3003 if (rv2 != SS_SUCCESS) { 3004 drbd_khelper(device, "pri-lost-after-sb"); 3005 } else { 3006 drbd_warn(device, "Successfully gave up primary role.\n"); 3007 rv = hg; 3008 } 3009 } else 3010 rv = hg; 3011 } 3012 3013 return rv; 3014 } 3015 3016 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, 3017 u64 bits, u64 flags) 3018 { 3019 if (!uuid) { 3020 drbd_info(device, "%s uuid info vanished while I was looking!\n", text); 3021 return; 3022 } 3023 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 3024 text, 3025 (unsigned long long)uuid[UI_CURRENT], 3026 (unsigned long long)uuid[UI_BITMAP], 3027 (unsigned long long)uuid[UI_HISTORY_START], 3028 (unsigned long long)uuid[UI_HISTORY_END], 3029 (unsigned long long)bits, 3030 (unsigned long long)flags); 3031 } 3032 3033 /* 3034 100 after split brain try auto recover 3035 2 C_SYNC_SOURCE set BitMap 3036 1 C_SYNC_SOURCE use BitMap 3037 0 no Sync 3038 -1 C_SYNC_TARGET use BitMap 3039 -2 C_SYNC_TARGET set BitMap 3040 -100 after split brain, disconnect 3041 -1000 unrelated data 3042 -1091 requires proto 91 3043 -1096 requires proto 96 3044 */ 3045 3046 static int drbd_uuid_compare(struct drbd_peer_device *const peer_device, 3047 enum drbd_role const peer_role, int *rule_nr) __must_hold(local) 3048 { 3049 struct drbd_connection *const connection = peer_device->connection; 3050 struct drbd_device *device = peer_device->device; 3051 u64 self, peer; 3052 int i, j; 3053 3054 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 3055 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3056 3057 *rule_nr = 10; 3058 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 3059 return 0; 3060 3061 *rule_nr = 20; 3062 if ((self == UUID_JUST_CREATED || self == (u64)0) && 3063 peer != UUID_JUST_CREATED) 3064 return -2; 3065 3066 *rule_nr = 30; 3067 if (self != UUID_JUST_CREATED && 3068 (peer == 
UUID_JUST_CREATED || peer == (u64)0)) 3069 return 2; 3070 3071 if (self == peer) { 3072 int rct, dc; /* roles at crash time */ 3073 3074 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) { 3075 3076 if (connection->agreed_pro_version < 91) 3077 return -1091; 3078 3079 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 3080 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 3081 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n"); 3082 drbd_uuid_move_history(device); 3083 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP]; 3084 device->ldev->md.uuid[UI_BITMAP] = 0; 3085 3086 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3087 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0); 3088 *rule_nr = 34; 3089 } else { 3090 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n"); 3091 *rule_nr = 36; 3092 } 3093 3094 return 1; 3095 } 3096 3097 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { 3098 3099 if (connection->agreed_pro_version < 91) 3100 return -1091; 3101 3102 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && 3103 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 3104 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 3105 3106 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START]; 3107 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP]; 3108 device->p_uuid[UI_BITMAP] = 0UL; 3109 3110 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3111 *rule_nr = 35; 3112 } else { 3113 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n"); 3114 *rule_nr = 37; 3115 } 3116 3117 return -1; 3118 } 3119 3120 /* Common power [off|failure] */ 3121 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) + 3122 (device->p_uuid[UI_FLAGS] & 2); 3123 /* lowest bit is set when we were primary, 3124 * next bit (weight 2) is set when peer was primary */ 3125 *rule_nr = 40; 3126 3127 /* Neither has the "crashed primary" flag set, 3128 * only a replication link hickup. */ 3129 if (rct == 0) 3130 return 0; 3131 3132 /* Current UUID equal and no bitmap uuid; does not necessarily 3133 * mean this was a "simultaneous hard crash", maybe IO was 3134 * frozen, so no UUID-bump happened. 3135 * This is a protocol change, overload DRBD_FF_WSAME as flag 3136 * for "new-enough" peer DRBD version. */ 3137 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) { 3138 *rule_nr = 41; 3139 if (!(connection->agreed_features & DRBD_FF_WSAME)) { 3140 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n"); 3141 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8)); 3142 } 3143 if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) { 3144 /* At least one has the "crashed primary" bit set, 3145 * both are primary now, but neither has rotated its UUIDs? 3146 * "Can not happen." */ 3147 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n"); 3148 return -100; 3149 } 3150 if (device->state.role == R_PRIMARY) 3151 return 1; 3152 return -1; 3153 } 3154 3155 /* Both are secondary. 
3156 * Really looks like recovery from simultaneous hard crash. 3157 * Check which had been primary before, and arbitrate. */ 3158 switch (rct) { 3159 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */ 3160 case 1: /* self_pri && !peer_pri */ return 1; 3161 case 2: /* !self_pri && peer_pri */ return -1; 3162 case 3: /* self_pri && peer_pri */ 3163 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags); 3164 return dc ? -1 : 1; 3165 } 3166 } 3167 3168 *rule_nr = 50; 3169 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3170 if (self == peer) 3171 return -1; 3172 3173 *rule_nr = 51; 3174 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); 3175 if (self == peer) { 3176 if (connection->agreed_pro_version < 96 ? 3177 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 3178 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 3179 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { 3180 /* The last P_SYNC_UUID did not get through. Undo the modifications the peer 3181 made to its UUIDs when it last started a resync as sync source. */ 3182 3183 if (connection->agreed_pro_version < 91) 3184 return -1091; 3185 3186 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; 3187 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1]; 3188 3189 drbd_info(device, "Lost last syncUUID packet, corrected:\n"); 3190 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3191 3192 return -1; 3193 } 3194 } 3195 3196 *rule_nr = 60; 3197 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 3198 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3199 peer = device->p_uuid[i] & ~((u64)1); 3200 if (self == peer) 3201 return -2; 3202 } 3203 3204 *rule_nr = 70; 3205 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3206 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3207 if (self == peer) 3208 return 1; 3209 3210 *rule_nr = 71; 3211 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 3212 if (self == peer) { 3213 if (connection->agreed_pro_version < 96 ? 3214 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 3215 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 3216 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 3217 /* The last P_SYNC_UUID did not get through. Undo the modifications we made 3218 to our UUIDs when we last started a resync as sync source. */ 3219 3220 if (connection->agreed_pro_version < 91) 3221 return -1091; 3222 3223 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); 3224 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]); 3225 3226 drbd_info(device, "Last syncUUID did not get through, corrected:\n"); 3227 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3228 device->state.disk >= D_NEGOTIATING ?
drbd_bm_total_weight(device) : 0, 0); 3229 3230 return 1; 3231 } 3232 } 3233 3234 3235 *rule_nr = 80; 3236 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3237 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3238 self = device->ldev->md.uuid[i] & ~((u64)1); 3239 if (self == peer) 3240 return 2; 3241 } 3242 3243 *rule_nr = 90; 3244 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3245 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3246 if (self == peer && self != ((u64)0)) 3247 return 100; 3248 3249 *rule_nr = 100; 3250 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3251 self = device->ldev->md.uuid[i] & ~((u64)1); 3252 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 3253 peer = device->p_uuid[j] & ~((u64)1); 3254 if (self == peer) 3255 return -100; 3256 } 3257 } 3258 3259 return -1000; 3260 } 3261 3262 /* drbd_sync_handshake() returns the new conn state on success, or 3263 CONN_MASK (-1) on failure. 3264 */ 3265 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device, 3266 enum drbd_role peer_role, 3267 enum drbd_disk_state peer_disk) __must_hold(local) 3268 { 3269 struct drbd_device *device = peer_device->device; 3270 enum drbd_conns rv = C_MASK; 3271 enum drbd_disk_state mydisk; 3272 struct net_conf *nc; 3273 int hg, rule_nr, rr_conflict, tentative, always_asbp; 3274 3275 mydisk = device->state.disk; 3276 if (mydisk == D_NEGOTIATING) 3277 mydisk = device->new_state_tmp.disk; 3278 3279 drbd_info(device, "drbd_sync_handshake:\n"); 3280 3281 spin_lock_irq(&device->ldev->md.uuid_lock); 3282 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0); 3283 drbd_uuid_dump(device, "peer", device->p_uuid, 3284 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3285 3286 hg = drbd_uuid_compare(peer_device, peer_role, &rule_nr); 3287 spin_unlock_irq(&device->ldev->md.uuid_lock); 3288 3289 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr); 3290 3291 if (hg == -1000) { 3292 drbd_alert(device, "Unrelated data, aborting!\n"); 3293 return C_MASK; 3294 } 3295 if (hg < -0x10000) { 3296 int proto, fflags; 3297 hg = -hg; 3298 proto = hg & 0xff; 3299 fflags = (hg >> 8) & 0xff; 3300 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n", 3301 proto, fflags); 3302 return C_MASK; 3303 } 3304 if (hg < -1000) { 3305 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000); 3306 return C_MASK; 3307 } 3308 3309 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) || 3310 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) { 3311 int f = (hg == -100) || abs(hg) == 2; 3312 hg = mydisk > D_INCONSISTENT ? 1 : -1; 3313 if (f) 3314 hg = hg*2; 3315 drbd_info(device, "Becoming sync %s due to disk states.\n", 3316 hg > 0 ? 
"source" : "target"); 3317 } 3318 3319 if (abs(hg) == 100) 3320 drbd_khelper(device, "initial-split-brain"); 3321 3322 rcu_read_lock(); 3323 nc = rcu_dereference(peer_device->connection->net_conf); 3324 always_asbp = nc->always_asbp; 3325 rr_conflict = nc->rr_conflict; 3326 tentative = nc->tentative; 3327 rcu_read_unlock(); 3328 3329 if (hg == 100 || (hg == -100 && always_asbp)) { 3330 int pcount = (device->state.role == R_PRIMARY) 3331 + (peer_role == R_PRIMARY); 3332 int forced = (hg == -100); 3333 3334 switch (pcount) { 3335 case 0: 3336 hg = drbd_asb_recover_0p(peer_device); 3337 break; 3338 case 1: 3339 hg = drbd_asb_recover_1p(peer_device); 3340 break; 3341 case 2: 3342 hg = drbd_asb_recover_2p(peer_device); 3343 break; 3344 } 3345 if (abs(hg) < 100) { 3346 drbd_warn(device, "Split-Brain detected, %d primaries, " 3347 "automatically solved. Sync from %s node\n", 3348 pcount, (hg < 0) ? "peer" : "this"); 3349 if (forced) { 3350 drbd_warn(device, "Doing a full sync, since" 3351 " UUIDs where ambiguous.\n"); 3352 hg = hg*2; 3353 } 3354 } 3355 } 3356 3357 if (hg == -100) { 3358 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1)) 3359 hg = -1; 3360 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1)) 3361 hg = 1; 3362 3363 if (abs(hg) < 100) 3364 drbd_warn(device, "Split-Brain detected, manually solved. " 3365 "Sync from %s node\n", 3366 (hg < 0) ? "peer" : "this"); 3367 } 3368 3369 if (hg == -100) { 3370 /* FIXME this log message is not correct if we end up here 3371 * after an attempted attach on a diskless node. 3372 * We just refuse to attach -- well, we drop the "connection" 3373 * to that disk, in a way... */ 3374 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n"); 3375 drbd_khelper(device, "split-brain"); 3376 return C_MASK; 3377 } 3378 3379 if (hg > 0 && mydisk <= D_INCONSISTENT) { 3380 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n"); 3381 return C_MASK; 3382 } 3383 3384 if (hg < 0 && /* by intention we do not use mydisk here. */ 3385 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) { 3386 switch (rr_conflict) { 3387 case ASB_CALL_HELPER: 3388 drbd_khelper(device, "pri-lost"); 3389 fallthrough; 3390 case ASB_DISCONNECT: 3391 drbd_err(device, "I shall become SyncTarget, but I am primary!\n"); 3392 return C_MASK; 3393 case ASB_VIOLENTLY: 3394 drbd_warn(device, "Becoming SyncTarget, violating the stable-data" 3395 "assumption\n"); 3396 } 3397 } 3398 3399 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) { 3400 if (hg == 0) 3401 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n"); 3402 else 3403 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.", 3404 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET), 3405 abs(hg) >= 2 ? "full" : "bit-map based"); 3406 return C_MASK; 3407 } 3408 3409 if (abs(hg) >= 2) { 3410 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n"); 3411 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake", 3412 BM_LOCKED_SET_ALLOWED, NULL)) 3413 return C_MASK; 3414 } 3415 3416 if (hg > 0) { /* become sync source. 
*/ 3417 rv = C_WF_BITMAP_S; 3418 } else if (hg < 0) { /* become sync target */ 3419 rv = C_WF_BITMAP_T; 3420 } else { 3421 rv = C_CONNECTED; 3422 if (drbd_bm_total_weight(device)) { 3423 drbd_info(device, "No resync, but %lu bits in bitmap!\n", 3424 drbd_bm_total_weight(device)); 3425 } 3426 } 3427 3428 return rv; 3429 } 3430 3431 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer) 3432 { 3433 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */ 3434 if (peer == ASB_DISCARD_REMOTE) 3435 return ASB_DISCARD_LOCAL; 3436 3437 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */ 3438 if (peer == ASB_DISCARD_LOCAL) 3439 return ASB_DISCARD_REMOTE; 3440 3441 /* everything else is valid if they are equal on both sides. */ 3442 return peer; 3443 } 3444 3445 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi) 3446 { 3447 struct p_protocol *p = pi->data; 3448 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p; 3449 int p_proto, p_discard_my_data, p_two_primaries, cf; 3450 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL; 3451 char integrity_alg[SHARED_SECRET_MAX] = ""; 3452 struct crypto_shash *peer_integrity_tfm = NULL; 3453 void *int_dig_in = NULL, *int_dig_vv = NULL; 3454 3455 p_proto = be32_to_cpu(p->protocol); 3456 p_after_sb_0p = be32_to_cpu(p->after_sb_0p); 3457 p_after_sb_1p = be32_to_cpu(p->after_sb_1p); 3458 p_after_sb_2p = be32_to_cpu(p->after_sb_2p); 3459 p_two_primaries = be32_to_cpu(p->two_primaries); 3460 cf = be32_to_cpu(p->conn_flags); 3461 p_discard_my_data = cf & CF_DISCARD_MY_DATA; 3462 3463 if (connection->agreed_pro_version >= 87) { 3464 int err; 3465 3466 if (pi->size > sizeof(integrity_alg)) 3467 return -EIO; 3468 err = drbd_recv_all(connection, integrity_alg, pi->size); 3469 if (err) 3470 return err; 3471 integrity_alg[SHARED_SECRET_MAX - 1] = 0; 3472 } 3473 3474 if (pi->cmd != P_PROTOCOL_UPDATE) { 3475 clear_bit(CONN_DRY_RUN, &connection->flags); 3476 3477 if (cf & CF_DRY_RUN) 3478 set_bit(CONN_DRY_RUN, &connection->flags); 3479 3480 rcu_read_lock(); 3481 nc = rcu_dereference(connection->net_conf); 3482 3483 if (p_proto != nc->wire_protocol) { 3484 drbd_err(connection, "incompatible %s settings\n", "protocol"); 3485 goto disconnect_rcu_unlock; 3486 } 3487 3488 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) { 3489 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri"); 3490 goto disconnect_rcu_unlock; 3491 } 3492 3493 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) { 3494 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri"); 3495 goto disconnect_rcu_unlock; 3496 } 3497 3498 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) { 3499 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri"); 3500 goto disconnect_rcu_unlock; 3501 } 3502 3503 if (p_discard_my_data && nc->discard_my_data) { 3504 drbd_err(connection, "incompatible %s settings\n", "discard-my-data"); 3505 goto disconnect_rcu_unlock; 3506 } 3507 3508 if (p_two_primaries != nc->two_primaries) { 3509 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries"); 3510 goto disconnect_rcu_unlock; 3511 } 3512 3513 if (strcmp(integrity_alg, nc->integrity_alg)) { 3514 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg"); 3515 goto disconnect_rcu_unlock; 3516 } 3517 3518 rcu_read_unlock(); 3519 } 3520 3521 if (integrity_alg[0]) { 3522 int hash_size; 3523 3524 /* 3525 * We can only change the peer data integrity algorithm 3526 * 
here. Changing our own data integrity algorithm 3527 * requires that we send a P_PROTOCOL_UPDATE packet at 3528 * the same time; otherwise, the peer has no way to 3529 * tell between which packets the algorithm should 3530 * change. 3531 */ 3532 3533 peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0); 3534 if (IS_ERR(peer_integrity_tfm)) { 3535 peer_integrity_tfm = NULL; 3536 drbd_err(connection, "peer data-integrity-alg %s not supported\n", 3537 integrity_alg); 3538 goto disconnect; 3539 } 3540 3541 hash_size = crypto_shash_digestsize(peer_integrity_tfm); 3542 int_dig_in = kmalloc(hash_size, GFP_KERNEL); 3543 int_dig_vv = kmalloc(hash_size, GFP_KERNEL); 3544 if (!(int_dig_in && int_dig_vv)) { 3545 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n"); 3546 goto disconnect; 3547 } 3548 } 3549 3550 new_net_conf = kmalloc_obj(struct net_conf, GFP_KERNEL); 3551 if (!new_net_conf) 3552 goto disconnect; 3553 3554 mutex_lock(&connection->data.mutex); 3555 mutex_lock(&connection->resource->conf_update); 3556 old_net_conf = connection->net_conf; 3557 *new_net_conf = *old_net_conf; 3558 3559 new_net_conf->wire_protocol = p_proto; 3560 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p); 3561 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p); 3562 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p); 3563 new_net_conf->two_primaries = p_two_primaries; 3564 3565 rcu_assign_pointer(connection->net_conf, new_net_conf); 3566 mutex_unlock(&connection->resource->conf_update); 3567 mutex_unlock(&connection->data.mutex); 3568 3569 crypto_free_shash(connection->peer_integrity_tfm); 3570 kfree(connection->int_dig_in); 3571 kfree(connection->int_dig_vv); 3572 connection->peer_integrity_tfm = peer_integrity_tfm; 3573 connection->int_dig_in = int_dig_in; 3574 connection->int_dig_vv = int_dig_vv; 3575 3576 if (strcmp(old_net_conf->integrity_alg, integrity_alg)) 3577 drbd_info(connection, "peer data-integrity-alg: %s\n", 3578 integrity_alg[0] ? integrity_alg : "(none)"); 3579 3580 kvfree_rcu_mightsleep(old_net_conf); 3581 return 0; 3582 3583 disconnect_rcu_unlock: 3584 rcu_read_unlock(); 3585 disconnect: 3586 crypto_free_shash(peer_integrity_tfm); 3587 kfree(int_dig_in); 3588 kfree(int_dig_vv); 3589 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 3590 return -EIO; 3591 } 3592 3593 /* helper function 3594 * input: alg name, feature name 3595 * return: NULL (alg name was "") 3596 * ERR_PTR(error) if something goes wrong 3597 * or the crypto hash ptr, if it worked out ok. 
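* Callers have to check the result with IS_ERR() and may treat NULL as "no algorithm configured", as receive_SyncParam() does below for verify-alg and csums-alg.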
*/ 3598 static struct crypto_shash *drbd_crypto_alloc_digest_safe( 3599 const struct drbd_device *device, 3600 const char *alg, const char *name) 3601 { 3602 struct crypto_shash *tfm; 3603 3604 if (!alg[0]) 3605 return NULL; 3606 3607 tfm = crypto_alloc_shash(alg, 0, 0); 3608 if (IS_ERR(tfm)) { 3609 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n", 3610 alg, name, PTR_ERR(tfm)); 3611 return tfm; 3612 } 3613 return tfm; 3614 } 3615 3616 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi) 3617 { 3618 void *buffer = connection->data.rbuf; 3619 int size = pi->size; 3620 3621 while (size) { 3622 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE); 3623 s = drbd_recv(connection, buffer, s); 3624 if (s <= 0) { 3625 if (s < 0) 3626 return s; 3627 break; 3628 } 3629 size -= s; 3630 } 3631 if (size) 3632 return -EIO; 3633 return 0; 3634 } 3635 3636 /* 3637 * config_unknown_volume - device configuration command for unknown volume 3638 * 3639 * When a device is added to an existing connection, the node on which the 3640 * device is added first will send configuration commands to its peer but the 3641 * peer will not know about the device yet. It will warn and ignore these 3642 * commands. Once the device is added on the second node, the second node will 3643 * send the same device configuration commands, but in the other direction. 3644 * 3645 * (We can also end up here if drbd is misconfigured.) 3646 */ 3647 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi) 3648 { 3649 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n", 3650 cmdname(pi->cmd), pi->vnr); 3651 return ignore_remaining_packet(connection, pi); 3652 } 3653 3654 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi) 3655 { 3656 struct drbd_peer_device *peer_device; 3657 struct drbd_device *device; 3658 struct p_rs_param_95 *p; 3659 unsigned int header_size, data_size, exp_max_sz; 3660 struct crypto_shash *verify_tfm = NULL; 3661 struct crypto_shash *csums_tfm = NULL; 3662 struct net_conf *old_net_conf, *new_net_conf = NULL; 3663 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL; 3664 const int apv = connection->agreed_pro_version; 3665 struct fifo_buffer *old_plan = NULL, *new_plan = NULL; 3666 unsigned int fifo_size = 0; 3667 int err; 3668 3669 peer_device = conn_peer_device(connection, pi->vnr); 3670 if (!peer_device) 3671 return config_unknown_volume(connection, pi); 3672 device = peer_device->device; 3673 3674 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param) 3675 : apv == 88 ? sizeof(struct p_rs_param) 3676 + SHARED_SECRET_MAX 3677 : apv <= 94 ? 
sizeof(struct p_rs_param_89) 3678 : /* apv >= 95 */ sizeof(struct p_rs_param_95); 3679 3680 if (pi->size > exp_max_sz) { 3681 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n", 3682 pi->size, exp_max_sz); 3683 return -EIO; 3684 } 3685 3686 if (apv <= 88) { 3687 header_size = sizeof(struct p_rs_param); 3688 data_size = pi->size - header_size; 3689 } else if (apv <= 94) { 3690 header_size = sizeof(struct p_rs_param_89); 3691 data_size = pi->size - header_size; 3692 D_ASSERT(device, data_size == 0); 3693 } else { 3694 header_size = sizeof(struct p_rs_param_95); 3695 data_size = pi->size - header_size; 3696 D_ASSERT(device, data_size == 0); 3697 } 3698 3699 /* initialize verify_alg and csums_alg */ 3700 p = pi->data; 3701 BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX); 3702 memset(&p->algs, 0, sizeof(p->algs)); 3703 3704 err = drbd_recv_all(peer_device->connection, p, header_size); 3705 if (err) 3706 return err; 3707 3708 mutex_lock(&connection->resource->conf_update); 3709 old_net_conf = peer_device->connection->net_conf; 3710 if (get_ldev(device)) { 3711 new_disk_conf = kzalloc_obj(struct disk_conf, GFP_KERNEL); 3712 if (!new_disk_conf) { 3713 put_ldev(device); 3714 mutex_unlock(&connection->resource->conf_update); 3715 drbd_err(device, "Allocation of new disk_conf failed\n"); 3716 return -ENOMEM; 3717 } 3718 3719 old_disk_conf = device->ldev->disk_conf; 3720 *new_disk_conf = *old_disk_conf; 3721 3722 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate); 3723 } 3724 3725 if (apv >= 88) { 3726 if (apv == 88) { 3727 if (data_size > SHARED_SECRET_MAX || data_size == 0) { 3728 drbd_err(device, "verify-alg of wrong size, " 3729 "peer wants %u, accepting only up to %u byte\n", 3730 data_size, SHARED_SECRET_MAX); 3731 goto reconnect; 3732 } 3733 3734 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size); 3735 if (err) 3736 goto reconnect; 3737 /* we expect NUL terminated string */ 3738 /* but just in case someone tries to be evil */ 3739 D_ASSERT(device, p->verify_alg[data_size-1] == 0); 3740 p->verify_alg[data_size-1] = 0; 3741 3742 } else /* apv >= 89 */ { 3743 /* we still expect NUL terminated strings */ 3744 /* but just in case someone tries to be evil */ 3745 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0); 3746 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0); 3747 p->verify_alg[SHARED_SECRET_MAX-1] = 0; 3748 p->csums_alg[SHARED_SECRET_MAX-1] = 0; 3749 } 3750 3751 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) { 3752 if (device->state.conn == C_WF_REPORT_PARAMS) { 3753 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n", 3754 old_net_conf->verify_alg, p->verify_alg); 3755 goto disconnect; 3756 } 3757 verify_tfm = drbd_crypto_alloc_digest_safe(device, 3758 p->verify_alg, "verify-alg"); 3759 if (IS_ERR(verify_tfm)) { 3760 verify_tfm = NULL; 3761 goto disconnect; 3762 } 3763 } 3764 3765 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) { 3766 if (device->state.conn == C_WF_REPORT_PARAMS) { 3767 drbd_err(device, "Different csums-alg settings. 
me=\"%s\" peer=\"%s\"\n", 3768 old_net_conf->csums_alg, p->csums_alg); 3769 goto disconnect; 3770 } 3771 csums_tfm = drbd_crypto_alloc_digest_safe(device, 3772 p->csums_alg, "csums-alg"); 3773 if (IS_ERR(csums_tfm)) { 3774 csums_tfm = NULL; 3775 goto disconnect; 3776 } 3777 } 3778 3779 if (apv > 94 && new_disk_conf) { 3780 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead); 3781 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target); 3782 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target); 3783 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate); 3784 3785 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ; 3786 if (fifo_size != device->rs_plan_s->size) { 3787 new_plan = fifo_alloc(fifo_size); 3788 if (!new_plan) { 3789 drbd_err(device, "kmalloc of fifo_buffer failed"); 3790 put_ldev(device); 3791 goto disconnect; 3792 } 3793 } 3794 } 3795 3796 if (verify_tfm || csums_tfm) { 3797 new_net_conf = kzalloc_obj(struct net_conf, GFP_KERNEL); 3798 if (!new_net_conf) 3799 goto disconnect; 3800 3801 *new_net_conf = *old_net_conf; 3802 3803 if (verify_tfm) { 3804 strcpy(new_net_conf->verify_alg, p->verify_alg); 3805 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1; 3806 crypto_free_shash(peer_device->connection->verify_tfm); 3807 peer_device->connection->verify_tfm = verify_tfm; 3808 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg); 3809 } 3810 if (csums_tfm) { 3811 strcpy(new_net_conf->csums_alg, p->csums_alg); 3812 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1; 3813 crypto_free_shash(peer_device->connection->csums_tfm); 3814 peer_device->connection->csums_tfm = csums_tfm; 3815 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg); 3816 } 3817 rcu_assign_pointer(connection->net_conf, new_net_conf); 3818 } 3819 } 3820 3821 if (new_disk_conf) { 3822 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3823 put_ldev(device); 3824 } 3825 3826 if (new_plan) { 3827 old_plan = device->rs_plan_s; 3828 rcu_assign_pointer(device->rs_plan_s, new_plan); 3829 } 3830 3831 mutex_unlock(&connection->resource->conf_update); 3832 synchronize_rcu(); 3833 if (new_net_conf) 3834 kfree(old_net_conf); 3835 kfree(old_disk_conf); 3836 kfree(old_plan); 3837 3838 return 0; 3839 3840 reconnect: 3841 if (new_disk_conf) { 3842 put_ldev(device); 3843 kfree(new_disk_conf); 3844 } 3845 mutex_unlock(&connection->resource->conf_update); 3846 return -EIO; 3847 3848 disconnect: 3849 kfree(new_plan); 3850 if (new_disk_conf) { 3851 put_ldev(device); 3852 kfree(new_disk_conf); 3853 } 3854 mutex_unlock(&connection->resource->conf_update); 3855 /* just for completeness: actually not needed, 3856 * as this is not reached if csums_tfm was ok. */ 3857 crypto_free_shash(csums_tfm); 3858 /* but free the verify_tfm again, if csums_tfm did not work out */ 3859 crypto_free_shash(verify_tfm); 3860 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3861 return -EIO; 3862 } 3863 3864 /* warn if the arguments differ by more than 12.5% */ 3865 static void warn_if_differ_considerably(struct drbd_device *device, 3866 const char *s, sector_t a, sector_t b) 3867 { 3868 sector_t d; 3869 if (a == 0 || b == 0) 3870 return; 3871 d = (a > b) ? (a - b) : (b - a); 3872 if (d > (a>>3) || d > (b>>3)) 3873 drbd_warn(device, "Considerable difference in %s: %llus vs. 
%llus\n", s, 3874 (unsigned long long)a, (unsigned long long)b); 3875 } 3876 3877 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi) 3878 { 3879 struct drbd_peer_device *peer_device; 3880 struct drbd_device *device; 3881 struct p_sizes *p = pi->data; 3882 struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL; 3883 enum determine_dev_size dd = DS_UNCHANGED; 3884 sector_t p_size, p_usize, p_csize, my_usize; 3885 sector_t new_size, cur_size; 3886 int ldsc = 0; /* local disk size changed */ 3887 enum dds_flags ddsf; 3888 3889 peer_device = conn_peer_device(connection, pi->vnr); 3890 if (!peer_device) 3891 return config_unknown_volume(connection, pi); 3892 device = peer_device->device; 3893 cur_size = get_capacity(device->vdisk); 3894 3895 p_size = be64_to_cpu(p->d_size); 3896 p_usize = be64_to_cpu(p->u_size); 3897 p_csize = be64_to_cpu(p->c_size); 3898 3899 /* just store the peer's disk size for now. 3900 * we still need to figure out whether we accept that. */ 3901 device->p_size = p_size; 3902 3903 if (get_ldev(device)) { 3904 rcu_read_lock(); 3905 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size; 3906 rcu_read_unlock(); 3907 3908 warn_if_differ_considerably(device, "lower level device sizes", 3909 p_size, drbd_get_max_capacity(device->ldev)); 3910 warn_if_differ_considerably(device, "user requested size", 3911 p_usize, my_usize); 3912 3913 /* if this is the first connect, or an otherwise expected 3914 * param exchange, choose the minimum */ 3915 if (device->state.conn == C_WF_REPORT_PARAMS) 3916 p_usize = min_not_zero(my_usize, p_usize); 3917 3918 /* Never shrink a device with usable data during connect, 3919 * or "attach" on the peer. 3920 * But allow online shrinking if we are connected. */ 3921 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0); 3922 if (new_size < cur_size && 3923 device->state.disk >= D_OUTDATED && 3924 (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) { 3925 drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n", 3926 (unsigned long long)new_size, (unsigned long long)cur_size); 3927 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 3928 put_ldev(device); 3929 return -EIO; 3930 } 3931 3932 if (my_usize != p_usize) { 3933 struct disk_conf *old_disk_conf, *new_disk_conf = NULL; 3934 3935 new_disk_conf = kzalloc_obj(struct disk_conf, 3936 GFP_KERNEL); 3937 if (!new_disk_conf) { 3938 put_ldev(device); 3939 return -ENOMEM; 3940 } 3941 3942 mutex_lock(&connection->resource->conf_update); 3943 old_disk_conf = device->ldev->disk_conf; 3944 *new_disk_conf = *old_disk_conf; 3945 new_disk_conf->disk_size = p_usize; 3946 3947 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 3948 mutex_unlock(&connection->resource->conf_update); 3949 kvfree_rcu_mightsleep(old_disk_conf); 3950 3951 drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n", 3952 (unsigned long)p_usize, (unsigned long)my_usize); 3953 } 3954 3955 put_ldev(device); 3956 } 3957 3958 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 3959 /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size(). 3960 In case we cleared the QUEUE_FLAG_DISCARD from our queue in 3961 drbd_reconsider_queue_parameters(), we can be sure that after 3962 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. 
 */

	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(device)) {
		drbd_reconsider_queue_parameters(device, device->ldev, o);
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		put_ldev(device);
		if (dd == DS_ERROR)
			return -EIO;
		drbd_md_sync(device);
	} else {
		/*
		 * I am diskless, need to accept the peer's *current* size.
		 * I must NOT accept the peer's backing disk size,
		 * it may have been larger than mine all along...
		 *
		 * At this point, the peer knows more about my disk, or at
		 * least about what we last agreed upon, than myself.
		 * So if his c_size is less than his d_size, the most likely
		 * reason is that *my* d_size was smaller last time we checked.
		 *
		 * However, if he sends a zero current size,
		 * take his (user-capped or) backing disk size anyway.
		 *
		 * Unless of course he does not have a disk himself.
		 * In which case we ignore this completely.
		 */
		sector_t new_size = p_csize ?: p_usize ?: p_size;
		drbd_reconsider_queue_parameters(device, NULL, o);
		if (new_size == 0) {
			/* Ignore, the peer does not know anything about its size. */
		} else if (new_size == cur_size) {
			/* nothing to do */
		} else if (cur_size != 0 && p_size == 0) {
			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
					(unsigned long long)new_size, (unsigned long long)cur_size);
		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
					(unsigned long long)new_size, (unsigned long long)cur_size);
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
			return -EIO;
		} else {
			/* I believe the peer, if
			 *  - I don't have a current size myself
			 *  - we agree on the size anyway
			 *  - I do have a current size, am Secondary,
			 *    and he has the only disk
			 *  - I do have a current size, am Primary,
			 *    and he has the only disk,
			 *    which is larger than my current size
			 */
			drbd_set_my_capacity(device, new_size);
		}
	}

	if (get_ldev(device)) {
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(device);
	}

	if (device->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
		    ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size...
*/ 4031 drbd_send_sizes(peer_device, 0, ddsf); 4032 } 4033 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) || 4034 (dd == DS_GREW && device->state.conn == C_CONNECTED)) { 4035 if (device->state.pdsk >= D_INCONSISTENT && 4036 device->state.disk >= D_INCONSISTENT) { 4037 if (ddsf & DDSF_NO_RESYNC) 4038 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n"); 4039 else 4040 resync_after_online_grow(device); 4041 } else 4042 set_bit(RESYNC_AFTER_NEG, &device->flags); 4043 } 4044 } 4045 4046 return 0; 4047 } 4048 4049 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi) 4050 { 4051 struct drbd_peer_device *peer_device; 4052 struct drbd_device *device; 4053 struct p_uuids *p = pi->data; 4054 u64 *p_uuid; 4055 int i, updated_uuids = 0; 4056 4057 peer_device = conn_peer_device(connection, pi->vnr); 4058 if (!peer_device) 4059 return config_unknown_volume(connection, pi); 4060 device = peer_device->device; 4061 4062 p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO); 4063 if (!p_uuid) 4064 return false; 4065 4066 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 4067 p_uuid[i] = be64_to_cpu(p->uuid[i]); 4068 4069 kfree(device->p_uuid); 4070 device->p_uuid = p_uuid; 4071 4072 if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) && 4073 device->state.disk < D_INCONSISTENT && 4074 device->state.role == R_PRIMARY && 4075 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 4076 drbd_err(device, "Can only connect to data with current UUID=%016llX\n", 4077 (unsigned long long)device->ed_uuid); 4078 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4079 return -EIO; 4080 } 4081 4082 if (get_ldev(device)) { 4083 int skip_initial_sync = 4084 device->state.conn == C_CONNECTED && 4085 peer_device->connection->agreed_pro_version >= 90 && 4086 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 4087 (p_uuid[UI_FLAGS] & 8); 4088 if (skip_initial_sync) { 4089 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n"); 4090 drbd_bitmap_io(device, &drbd_bmio_clear_n_write, 4091 "clear_n_write from receive_uuids", 4092 BM_LOCKED_TEST_ALLOWED, NULL); 4093 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]); 4094 _drbd_uuid_set(device, UI_BITMAP, 0); 4095 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 4096 CS_VERBOSE, NULL); 4097 drbd_md_sync(device); 4098 updated_uuids = 1; 4099 } 4100 put_ldev(device); 4101 } else if (device->state.disk < D_INCONSISTENT && 4102 device->state.role == R_PRIMARY) { 4103 /* I am a diskless primary, the peer just created a new current UUID 4104 for me. */ 4105 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 4106 } 4107 4108 /* Before we test for the disk state, we should wait until an eventually 4109 ongoing cluster wide state change is finished. That is important if 4110 we are primary and are detaching from our disk. We need to see the 4111 new disk state... */ 4112 mutex_lock(device->state_mutex); 4113 mutex_unlock(device->state_mutex); 4114 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT) 4115 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 4116 4117 if (updated_uuids) 4118 drbd_print_uuids(device, "receiver updated UUIDs to"); 4119 4120 return 0; 4121 } 4122 4123 /** 4124 * convert_state() - Converts the peer's view of the cluster state to our point of view 4125 * @ps: The state as seen by the peer. 
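 *
 * Return: @ps translated into our point of view: local and peer roles and
 * disk states are swapped, and directional connection states are mirrored
 * (e.g. a peer reporting C_STARTING_SYNC_S is recorded as C_STARTING_SYNC_T).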
4126 */ 4127 static union drbd_state convert_state(union drbd_state ps) 4128 { 4129 union drbd_state ms; 4130 4131 static enum drbd_conns c_tab[] = { 4132 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS, 4133 [C_CONNECTED] = C_CONNECTED, 4134 4135 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 4136 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 4137 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 4138 [C_VERIFY_S] = C_VERIFY_T, 4139 [C_MASK] = C_MASK, 4140 }; 4141 4142 ms.i = ps.i; 4143 4144 ms.conn = c_tab[ps.conn]; 4145 ms.peer = ps.role; 4146 ms.role = ps.peer; 4147 ms.pdsk = ps.disk; 4148 ms.disk = ps.pdsk; 4149 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 4150 4151 return ms; 4152 } 4153 4154 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi) 4155 { 4156 struct drbd_peer_device *peer_device; 4157 struct drbd_device *device; 4158 struct p_req_state *p = pi->data; 4159 union drbd_state mask, val; 4160 enum drbd_state_rv rv; 4161 4162 peer_device = conn_peer_device(connection, pi->vnr); 4163 if (!peer_device) 4164 return -EIO; 4165 device = peer_device->device; 4166 4167 mask.i = be32_to_cpu(p->mask); 4168 val.i = be32_to_cpu(p->val); 4169 4170 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) && 4171 mutex_is_locked(device->state_mutex)) { 4172 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG); 4173 return 0; 4174 } 4175 4176 mask = convert_state(mask); 4177 val = convert_state(val); 4178 4179 rv = drbd_change_state(device, CS_VERBOSE, mask, val); 4180 drbd_send_sr_reply(peer_device, rv); 4181 4182 drbd_md_sync(device); 4183 4184 return 0; 4185 } 4186 4187 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi) 4188 { 4189 struct p_req_state *p = pi->data; 4190 union drbd_state mask, val; 4191 enum drbd_state_rv rv; 4192 4193 mask.i = be32_to_cpu(p->mask); 4194 val.i = be32_to_cpu(p->val); 4195 4196 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) && 4197 mutex_is_locked(&connection->cstate_mutex)) { 4198 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG); 4199 return 0; 4200 } 4201 4202 mask = convert_state(mask); 4203 val = convert_state(val); 4204 4205 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL); 4206 conn_send_sr_reply(connection, rv); 4207 4208 return 0; 4209 } 4210 4211 static int receive_state(struct drbd_connection *connection, struct packet_info *pi) 4212 { 4213 struct drbd_peer_device *peer_device; 4214 struct drbd_device *device; 4215 struct p_state *p = pi->data; 4216 union drbd_state os, ns, peer_state; 4217 enum drbd_disk_state real_peer_disk; 4218 enum chg_state_flags cs_flags; 4219 int rv; 4220 4221 peer_device = conn_peer_device(connection, pi->vnr); 4222 if (!peer_device) 4223 return config_unknown_volume(connection, pi); 4224 device = peer_device->device; 4225 4226 peer_state.i = be32_to_cpu(p->state); 4227 4228 real_peer_disk = peer_state.disk; 4229 if (peer_state.disk == D_NEGOTIATING) { 4230 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT; 4231 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk)); 4232 } 4233 4234 spin_lock_irq(&device->resource->req_lock); 4235 retry: 4236 os = ns = drbd_read_state(device); 4237 spin_unlock_irq(&device->resource->req_lock); 4238 4239 /* If some other part of the code (ack_receiver thread, timeout) 4240 * already decided to close the connection again, 4241 * we must not "re-establish" it here. 
*/ 4242 if (os.conn <= C_TEAR_DOWN) 4243 return -ECONNRESET; 4244 4245 /* If this is the "end of sync" confirmation, usually the peer disk 4246 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits 4247 * set) resync started in PausedSyncT, or if the timing of pause-/ 4248 * unpause-sync events has been "just right", the peer disk may 4249 * transition from D_CONSISTENT to D_UP_TO_DATE as well. 4250 */ 4251 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) && 4252 real_peer_disk == D_UP_TO_DATE && 4253 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) { 4254 /* If we are (becoming) SyncSource, but peer is still in sync 4255 * preparation, ignore its uptodate-ness to avoid flapping, it 4256 * will change to inconsistent once the peer reaches active 4257 * syncing states. 4258 * It may have changed syncer-paused flags, however, so we 4259 * cannot ignore this completely. */ 4260 if (peer_state.conn > C_CONNECTED && 4261 peer_state.conn < C_SYNC_SOURCE) 4262 real_peer_disk = D_INCONSISTENT; 4263 4264 /* if peer_state changes to connected at the same time, 4265 * it explicitly notifies us that it finished resync. 4266 * Maybe we should finish it up, too? */ 4267 else if (os.conn >= C_SYNC_SOURCE && 4268 peer_state.conn == C_CONNECTED) { 4269 if (drbd_bm_total_weight(device) <= device->rs_failed) 4270 drbd_resync_finished(peer_device); 4271 return 0; 4272 } 4273 } 4274 4275 /* explicit verify finished notification, stop sector reached. */ 4276 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE && 4277 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) { 4278 ov_out_of_sync_print(peer_device); 4279 drbd_resync_finished(peer_device); 4280 return 0; 4281 } 4282 4283 /* peer says his disk is inconsistent, while we think it is uptodate, 4284 * and this happens while the peer still thinks we have a sync going on, 4285 * but we think we are already done with the sync. 4286 * We ignore this to avoid flapping pdsk. 4287 * This should not happen, if the peer is a recent version of drbd. */ 4288 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT && 4289 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE) 4290 real_peer_disk = D_UP_TO_DATE; 4291 4292 if (ns.conn == C_WF_REPORT_PARAMS) 4293 ns.conn = C_CONNECTED; 4294 4295 if (peer_state.conn == C_AHEAD) 4296 ns.conn = C_BEHIND; 4297 4298 /* TODO: 4299 * if (primary and diskless and peer uuid != effective uuid) 4300 * abort attach on peer; 4301 * 4302 * If this node does not have good data, was already connected, but 4303 * the peer did a late attach only now, trying to "negotiate" with me, 4304 * AND I am currently Primary, possibly frozen, with some specific 4305 * "effective" uuid, this should never be reached, really, because 4306 * we first send the uuids, then the current state. 4307 * 4308 * In this scenario, we already dropped the connection hard 4309 * when we received the unsuitable uuids (receive_uuids(). 4310 * 4311 * Should we want to change this, that is: not drop the connection in 4312 * receive_uuids() already, then we would need to add a branch here 4313 * that aborts the attach of "unsuitable uuids" on the peer in case 4314 * this node is currently Diskless Primary. 
4315 */ 4316 4317 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING && 4318 get_ldev_if_state(device, D_NEGOTIATING)) { 4319 int cr; /* consider resync */ 4320 4321 /* if we established a new connection */ 4322 cr = (os.conn < C_CONNECTED); 4323 /* if we had an established connection 4324 * and one of the nodes newly attaches a disk */ 4325 cr |= (os.conn == C_CONNECTED && 4326 (peer_state.disk == D_NEGOTIATING || 4327 os.disk == D_NEGOTIATING)); 4328 /* if we have both been inconsistent, and the peer has been 4329 * forced to be UpToDate with --force */ 4330 cr |= test_bit(CONSIDER_RESYNC, &device->flags); 4331 /* if we had been plain connected, and the admin requested to 4332 * start a sync by "invalidate" or "invalidate-remote" */ 4333 cr |= (os.conn == C_CONNECTED && 4334 (peer_state.conn >= C_STARTING_SYNC_S && 4335 peer_state.conn <= C_WF_BITMAP_T)); 4336 4337 if (cr) 4338 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk); 4339 4340 put_ldev(device); 4341 if (ns.conn == C_MASK) { 4342 ns.conn = C_CONNECTED; 4343 if (device->state.disk == D_NEGOTIATING) { 4344 drbd_force_state(device, NS(disk, D_FAILED)); 4345 } else if (peer_state.disk == D_NEGOTIATING) { 4346 drbd_err(device, "Disk attach process on the peer node was aborted.\n"); 4347 peer_state.disk = D_DISKLESS; 4348 real_peer_disk = D_DISKLESS; 4349 } else { 4350 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags)) 4351 return -EIO; 4352 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS); 4353 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4354 return -EIO; 4355 } 4356 } 4357 } 4358 4359 spin_lock_irq(&device->resource->req_lock); 4360 if (os.i != drbd_read_state(device).i) 4361 goto retry; 4362 clear_bit(CONSIDER_RESYNC, &device->flags); 4363 ns.peer = peer_state.role; 4364 ns.pdsk = real_peer_disk; 4365 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp); 4366 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING) 4367 ns.disk = device->new_state_tmp.disk; 4368 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD); 4369 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED && 4370 test_bit(NEW_CUR_UUID, &device->flags)) { 4371 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this 4372 for temporal network outages! */ 4373 spin_unlock_irq(&device->resource->req_lock); 4374 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n"); 4375 tl_clear(peer_device->connection); 4376 drbd_uuid_new_current(device); 4377 clear_bit(NEW_CUR_UUID, &device->flags); 4378 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD); 4379 return -EIO; 4380 } 4381 rv = _drbd_set_state(device, ns, cs_flags, NULL); 4382 ns = drbd_read_state(device); 4383 spin_unlock_irq(&device->resource->req_lock); 4384 4385 if (rv < SS_SUCCESS) { 4386 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4387 return -EIO; 4388 } 4389 4390 if (os.conn > C_WF_REPORT_PARAMS) { 4391 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED && 4392 peer_state.disk != D_NEGOTIATING ) { 4393 /* we want resync, peer has not yet decided to sync... 
*/ 4394 /* Nowadays only used when forcing a node into primary role and 4395 setting its disk to UpToDate with that */ 4396 drbd_send_uuids(peer_device); 4397 drbd_send_current_state(peer_device); 4398 } 4399 } 4400 4401 clear_bit(DISCARD_MY_DATA, &device->flags); 4402 4403 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */ 4404 4405 return 0; 4406 } 4407 4408 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi) 4409 { 4410 struct drbd_peer_device *peer_device; 4411 struct drbd_device *device; 4412 struct p_rs_uuid *p = pi->data; 4413 4414 peer_device = conn_peer_device(connection, pi->vnr); 4415 if (!peer_device) 4416 return -EIO; 4417 device = peer_device->device; 4418 4419 wait_event(device->misc_wait, 4420 device->state.conn == C_WF_SYNC_UUID || 4421 device->state.conn == C_BEHIND || 4422 device->state.conn < C_CONNECTED || 4423 device->state.disk < D_NEGOTIATING); 4424 4425 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */ 4426 4427 /* Here the _drbd_uuid_ functions are right, current should 4428 _not_ be rotated into the history */ 4429 if (get_ldev_if_state(device, D_NEGOTIATING)) { 4430 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid)); 4431 _drbd_uuid_set(device, UI_BITMAP, 0UL); 4432 4433 drbd_print_uuids(device, "updated sync uuid"); 4434 drbd_start_resync(device, C_SYNC_TARGET); 4435 4436 put_ldev(device); 4437 } else 4438 drbd_err(device, "Ignoring SyncUUID packet!\n"); 4439 4440 return 0; 4441 } 4442 4443 /* 4444 * receive_bitmap_plain 4445 * 4446 * Return 0 when done, 1 when another iteration is needed, and a negative error 4447 * code upon failure. 4448 */ 4449 static int 4450 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size, 4451 unsigned long *p, struct bm_xfer_ctx *c) 4452 { 4453 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - 4454 drbd_header_size(peer_device->connection); 4455 unsigned int num_words = min_t(size_t, data_size / sizeof(*p), 4456 c->bm_words - c->word_offset); 4457 unsigned int want = num_words * sizeof(*p); 4458 int err; 4459 4460 if (want != size) { 4461 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size); 4462 return -EIO; 4463 } 4464 if (want == 0) 4465 return 0; 4466 err = drbd_recv_all(peer_device->connection, p, want); 4467 if (err) 4468 return err; 4469 4470 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p); 4471 4472 c->word_offset += num_words; 4473 c->bit_offset = c->word_offset * BITS_PER_LONG; 4474 if (c->bit_offset > c->bm_bits) 4475 c->bit_offset = c->bm_bits; 4476 4477 return 1; 4478 } 4479 4480 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p) 4481 { 4482 return (enum drbd_bitmap_code)(p->encoding & 0x0f); 4483 } 4484 4485 static int dcbp_get_start(struct p_compressed_bm *p) 4486 { 4487 return (p->encoding & 0x80) != 0; 4488 } 4489 4490 static int dcbp_get_pad_bits(struct p_compressed_bm *p) 4491 { 4492 return (p->encoding >> 4) & 0x7; 4493 } 4494 4495 /* 4496 * recv_bm_rle_bits 4497 * 4498 * Return 0 when done, 1 when another iteration is needed, and a negative error 4499 * code upon failure. 
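 *
 * The payload is a bitstream of VLI-encoded run lengths.  Runs alternate
 * between clear and set bits; dcbp_get_start() gives the polarity of the
 * first run, and only the "set" runs are applied to the bitmap via
 * _drbd_bm_set_bits().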
4500 */ 4501 static int 4502 recv_bm_rle_bits(struct drbd_peer_device *peer_device, 4503 struct p_compressed_bm *p, 4504 struct bm_xfer_ctx *c, 4505 unsigned int len) 4506 { 4507 struct bitstream bs; 4508 u64 look_ahead; 4509 u64 rl; 4510 u64 tmp; 4511 unsigned long s = c->bit_offset; 4512 unsigned long e; 4513 int toggle = dcbp_get_start(p); 4514 int have; 4515 int bits; 4516 4517 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p)); 4518 4519 bits = bitstream_get_bits(&bs, &look_ahead, 64); 4520 if (bits < 0) 4521 return -EIO; 4522 4523 for (have = bits; have > 0; s += rl, toggle = !toggle) { 4524 bits = vli_decode_bits(&rl, look_ahead); 4525 if (bits <= 0) 4526 return -EIO; 4527 4528 if (toggle) { 4529 e = s + rl -1; 4530 if (e >= c->bm_bits) { 4531 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 4532 return -EIO; 4533 } 4534 _drbd_bm_set_bits(peer_device->device, s, e); 4535 } 4536 4537 if (have < bits) { 4538 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 4539 have, bits, look_ahead, 4540 (unsigned int)(bs.cur.b - p->code), 4541 (unsigned int)bs.buf_len); 4542 return -EIO; 4543 } 4544 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */ 4545 if (likely(bits < 64)) 4546 look_ahead >>= bits; 4547 else 4548 look_ahead = 0; 4549 have -= bits; 4550 4551 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 4552 if (bits < 0) 4553 return -EIO; 4554 look_ahead |= tmp << have; 4555 have += bits; 4556 } 4557 4558 c->bit_offset = s; 4559 bm_xfer_ctx_bit_to_word_offset(c); 4560 4561 return (s != c->bm_bits); 4562 } 4563 4564 /* 4565 * decode_bitmap_c 4566 * 4567 * Return 0 when done, 1 when another iteration is needed, and a negative error 4568 * code upon failure. 4569 */ 4570 static int 4571 decode_bitmap_c(struct drbd_peer_device *peer_device, 4572 struct p_compressed_bm *p, 4573 struct bm_xfer_ctx *c, 4574 unsigned int len) 4575 { 4576 if (dcbp_get_code(p) == RLE_VLI_Bits) 4577 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p)); 4578 4579 /* other variants had been implemented for evaluation, 4580 * but have been dropped as this one turned out to be "best" 4581 * during all our tests. */ 4582 4583 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding); 4584 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4585 return -EIO; 4586 } 4587 4588 void INFO_bm_xfer_stats(struct drbd_peer_device *peer_device, 4589 const char *direction, struct bm_xfer_ctx *c) 4590 { 4591 /* what would it take to transfer it "plaintext" */ 4592 unsigned int header_size = drbd_header_size(peer_device->connection); 4593 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size; 4594 unsigned int plain = 4595 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) + 4596 c->bm_words * sizeof(unsigned long); 4597 unsigned int total = c->bytes[0] + c->bytes[1]; 4598 unsigned int r; 4599 4600 /* total can not be zero. but just in case: */ 4601 if (total == 0) 4602 return; 4603 4604 /* don't report if not compressed */ 4605 if (total >= plain) 4606 return; 4607 4608 /* total < plain. check for overflow, still */ 4609 r = (total > UINT_MAX/1000) ? 
(total / (plain/1000))
		: (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	r = 1000 - r;
	drbd_info(peer_device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
		  "total %u; compression: %u.%u%%\n",
		  direction,
		  c->bytes[1], c->packets[1],
		  c->bytes[0], c->packets[0],
		  total, r/10, r % 10);
}

/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we used big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bit issue.

   Returns 0 on success, or a negative error code on failure. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	for (;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p = pi->data;

			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
				drbd_err(device, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			if (pi->size <= sizeof(*p)) {
				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
				err = -EIO;
				goto out;
			}
			err = drbd_recv_all(peer_device->connection, p, pi->size);
			if (err)
				goto out;
			err = decode_bitmap_c(peer_device, p, &c, pi->size);
		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
			err = -EIO;
			goto out;
		}

		c.packets[pi->cmd == P_BITMAP]++;
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}
		err = drbd_recv_header(peer_device->connection, pi);
		if (err)
			goto out;
	}

	INFO_bm_xfer_stats(peer_device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		err = drbd_send_bitmap(device, peer_device);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks.
*/ 4703 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE); 4704 D_ASSERT(device, rv == SS_SUCCESS); 4705 } else if (device->state.conn != C_WF_BITMAP_S) { 4706 /* admin may have requested C_DISCONNECTING, 4707 * other threads may have noticed network errors */ 4708 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n", 4709 drbd_conn_str(device->state.conn)); 4710 } 4711 err = 0; 4712 4713 out: 4714 drbd_bm_unlock(device); 4715 if (!err && device->state.conn == C_WF_BITMAP_S) 4716 drbd_start_resync(device, C_SYNC_SOURCE); 4717 return err; 4718 } 4719 4720 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi) 4721 { 4722 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n", 4723 pi->cmd, pi->size); 4724 4725 return ignore_remaining_packet(connection, pi); 4726 } 4727 4728 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi) 4729 { 4730 /* Make sure we've acked all the TCP data associated 4731 * with the data requests being unplugged */ 4732 tcp_sock_set_quickack(connection->data.socket->sk, 2); 4733 return 0; 4734 } 4735 4736 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi) 4737 { 4738 struct drbd_peer_device *peer_device; 4739 struct drbd_device *device; 4740 struct p_block_desc *p = pi->data; 4741 4742 peer_device = conn_peer_device(connection, pi->vnr); 4743 if (!peer_device) 4744 return -EIO; 4745 device = peer_device->device; 4746 4747 switch (device->state.conn) { 4748 case C_WF_SYNC_UUID: 4749 case C_WF_BITMAP_T: 4750 case C_BEHIND: 4751 break; 4752 default: 4753 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n", 4754 drbd_conn_str(device->state.conn)); 4755 } 4756 4757 drbd_set_out_of_sync(peer_device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize)); 4758 4759 return 0; 4760 } 4761 4762 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi) 4763 { 4764 struct drbd_peer_device *peer_device; 4765 struct p_block_desc *p = pi->data; 4766 struct drbd_device *device; 4767 sector_t sector; 4768 int size, err = 0; 4769 4770 peer_device = conn_peer_device(connection, pi->vnr); 4771 if (!peer_device) 4772 return -EIO; 4773 device = peer_device->device; 4774 4775 sector = be64_to_cpu(p->sector); 4776 size = be32_to_cpu(p->blksize); 4777 4778 dec_rs_pending(peer_device); 4779 4780 if (get_ldev(device)) { 4781 struct drbd_peer_request *peer_req; 4782 4783 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector, 4784 size, 0, GFP_NOIO); 4785 if (!peer_req) { 4786 put_ldev(device); 4787 return -ENOMEM; 4788 } 4789 4790 peer_req->w.cb = e_end_resync_block; 4791 peer_req->opf = REQ_OP_DISCARD; 4792 peer_req->submit_jif = jiffies; 4793 peer_req->flags |= EE_TRIM; 4794 4795 spin_lock_irq(&device->resource->req_lock); 4796 list_add_tail(&peer_req->w.list, &device->sync_ee); 4797 spin_unlock_irq(&device->resource->req_lock); 4798 4799 atomic_add(pi->size >> 9, &device->rs_sect_ev); 4800 err = drbd_submit_peer_request(peer_req); 4801 4802 if (err) { 4803 spin_lock_irq(&device->resource->req_lock); 4804 list_del(&peer_req->w.list); 4805 spin_unlock_irq(&device->resource->req_lock); 4806 4807 drbd_free_peer_req(device, peer_req); 4808 put_ldev(device); 4809 err = 0; 4810 goto fail; 4811 } 4812 4813 inc_unacked(device); 4814 4815 /* No put_ldev() here. 
Gets called in drbd_endio_write_sec_final(), 4816 as well as drbd_rs_complete_io() */ 4817 } else { 4818 fail: 4819 drbd_rs_complete_io(device, sector); 4820 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER); 4821 } 4822 4823 atomic_add(size >> 9, &device->rs_sect_in); 4824 4825 return err; 4826 } 4827 4828 struct data_cmd { 4829 int expect_payload; 4830 unsigned int pkt_size; 4831 int (*fn)(struct drbd_connection *, struct packet_info *); 4832 }; 4833 4834 static struct data_cmd drbd_cmd_handler[] = { 4835 [P_DATA] = { 1, sizeof(struct p_data), receive_Data }, 4836 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply }, 4837 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } , 4838 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } , 4839 [P_BITMAP] = { 1, 0, receive_bitmap } , 4840 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } , 4841 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote }, 4842 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4843 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4844 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam }, 4845 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam }, 4846 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol }, 4847 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids }, 4848 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes }, 4849 [P_STATE] = { 0, sizeof(struct p_state), receive_state }, 4850 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state }, 4851 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid }, 4852 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4853 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4854 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest }, 4855 [P_RS_THIN_REQ] = { 0, sizeof(struct p_block_req), receive_DataRequest }, 4856 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip }, 4857 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync }, 4858 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state }, 4859 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol }, 4860 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data }, 4861 [P_ZEROES] = { 0, sizeof(struct p_trim), receive_Data }, 4862 [P_RS_DEALLOCATED] = { 0, sizeof(struct p_block_desc), receive_rs_deallocated }, 4863 }; 4864 4865 static void drbdd(struct drbd_connection *connection) 4866 { 4867 struct packet_info pi; 4868 size_t shs; /* sub header size */ 4869 int err; 4870 4871 while (get_t_state(&connection->receiver) == RUNNING) { 4872 struct data_cmd const *cmd; 4873 4874 drbd_thread_current_set_cpu(&connection->receiver); 4875 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug); 4876 if (drbd_recv_header_maybe_unplug(connection, &pi)) 4877 goto err_out; 4878 4879 cmd = &drbd_cmd_handler[pi.cmd]; 4880 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) { 4881 drbd_err(connection, "Unexpected data packet %s (0x%04x)", 4882 cmdname(pi.cmd), pi.cmd); 4883 goto err_out; 4884 } 4885 4886 shs = cmd->pkt_size; 4887 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME) 4888 shs += sizeof(struct o_qlim); 4889 if (pi.size > shs && !cmd->expect_payload) { 4890 drbd_err(connection, "No payload expected %s l:%d\n", 4891 cmdname(pi.cmd), pi.size); 4892 goto err_out; 4893 } 4894 if (pi.size < shs) { 4895 
drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n", 4896 cmdname(pi.cmd), (int)shs, pi.size); 4897 goto err_out; 4898 } 4899 4900 if (shs) { 4901 update_receiver_timing_details(connection, drbd_recv_all_warn); 4902 err = drbd_recv_all_warn(connection, pi.data, shs); 4903 if (err) 4904 goto err_out; 4905 pi.size -= shs; 4906 } 4907 4908 update_receiver_timing_details(connection, cmd->fn); 4909 err = cmd->fn(connection, &pi); 4910 if (err) { 4911 drbd_err(connection, "error receiving %s, e: %d l: %d!\n", 4912 cmdname(pi.cmd), err, pi.size); 4913 goto err_out; 4914 } 4915 } 4916 return; 4917 4918 err_out: 4919 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4920 } 4921 4922 static void conn_disconnect(struct drbd_connection *connection) 4923 { 4924 struct drbd_peer_device *peer_device; 4925 enum drbd_conns oc; 4926 int vnr; 4927 4928 if (connection->cstate == C_STANDALONE) 4929 return; 4930 4931 /* We are about to start the cleanup after connection loss. 4932 * Make sure drbd_make_request knows about that. 4933 * Usually we should be in some network failure state already, 4934 * but just in case we are not, we fix it up here. 4935 */ 4936 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 4937 4938 /* ack_receiver does not clean up anything. it must not interfere, either */ 4939 drbd_thread_stop(&connection->ack_receiver); 4940 if (connection->ack_sender) { 4941 destroy_workqueue(connection->ack_sender); 4942 connection->ack_sender = NULL; 4943 } 4944 drbd_free_sock(connection); 4945 4946 rcu_read_lock(); 4947 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 4948 struct drbd_device *device = peer_device->device; 4949 kref_get(&device->kref); 4950 rcu_read_unlock(); 4951 drbd_disconnected(peer_device); 4952 kref_put(&device->kref, drbd_destroy_device); 4953 rcu_read_lock(); 4954 } 4955 rcu_read_unlock(); 4956 4957 if (!list_empty(&connection->current_epoch->list)) 4958 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n"); 4959 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 4960 atomic_set(&connection->current_epoch->epoch_size, 0); 4961 connection->send.seen_any_write_yet = false; 4962 4963 drbd_info(connection, "Connection closed\n"); 4964 4965 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN) 4966 conn_try_outdate_peer_async(connection); 4967 4968 spin_lock_irq(&connection->resource->req_lock); 4969 oc = connection->cstate; 4970 if (oc >= C_UNCONNECTED) 4971 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE); 4972 4973 spin_unlock_irq(&connection->resource->req_lock); 4974 4975 if (oc == C_DISCONNECTING) 4976 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD); 4977 } 4978 4979 static int drbd_disconnected(struct drbd_peer_device *peer_device) 4980 { 4981 struct drbd_device *device = peer_device->device; 4982 unsigned int i; 4983 4984 /* wait for current activity to cease. */ 4985 spin_lock_irq(&device->resource->req_lock); 4986 _drbd_wait_ee_list_empty(device, &device->active_ee); 4987 _drbd_wait_ee_list_empty(device, &device->sync_ee); 4988 _drbd_wait_ee_list_empty(device, &device->read_ee); 4989 spin_unlock_irq(&device->resource->req_lock); 4990 4991 /* We do not have data structures that would allow us to 4992 * get the rs_pending_cnt down to 0 again. 
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	timer_delete_sync(&device->resync_timer);
	resync_timer_fn(&device->resync_timer);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	if (get_ldev(device)) {
		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
			       "write from disconnected", BM_LOCKED_CHANGE_ALLOWED, NULL);
		put_ldev(device);
	}

	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));

	return 0;
}

/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
static int drbd_send_features(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	struct p_connection_features *p;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	p->feature_flags = cpu_to_be32(PRO_FEATURES);
	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
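 *
 * On success, agreed_pro_version becomes min(PRO_VERSION_MAX, peer's
 * protocol_max), and agreed_features is the intersection of our
 * PRO_FEATURES with the peer's feature flags.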
 */
static int drbd_do_features(struct drbd_connection *connection)
{
	/* ASSERT current == connection->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;

	err = drbd_send_features(connection);
	if (err)
		return 0;

	err = drbd_recv_header(connection, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_CONNECTION_FEATURES) {
		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
			 expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(connection, p, expect);
	if (err)
		return 0;

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);

	drbd_info(connection, "Handshake successful: "
		  "Agreed network protocol version %d\n", connection->agreed_pro_version);

	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
		  connection->agreed_features,
		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
		  connection->agreed_features ? "" : " none");

	return 1;

 incompat:
	drbd_err(connection, "incompatible DRBD dialects: "
		 "I support %d-%d, peer supports %d-%d\n",
		 PRO_VERSION_MIN, PRO_VERSION_MAX,
		 p->protocol_min, p->protocol_max);
	return -1;
}

#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	struct shash_desc *desc;
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.
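	 *
	 * Exchange overview: each side sends a CHALLENGE_LEN byte random
	 * challenge (P_AUTH_CHALLENGE), answers the peer's challenge with an
	 * HMAC keyed by the shared secret (P_AUTH_RESPONSE), and finally
	 * compares the peer's response against the HMAC of its own challenge.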
*/ 5185 5186 rcu_read_lock(); 5187 nc = rcu_dereference(connection->net_conf); 5188 key_len = strlen(nc->shared_secret); 5189 memcpy(secret, nc->shared_secret, key_len); 5190 rcu_read_unlock(); 5191 5192 desc = kmalloc(sizeof(struct shash_desc) + 5193 crypto_shash_descsize(connection->cram_hmac_tfm), 5194 GFP_KERNEL); 5195 if (!desc) { 5196 rv = -1; 5197 goto fail; 5198 } 5199 desc->tfm = connection->cram_hmac_tfm; 5200 5201 rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len); 5202 if (rv) { 5203 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv); 5204 rv = -1; 5205 goto fail; 5206 } 5207 5208 get_random_bytes(my_challenge, CHALLENGE_LEN); 5209 5210 sock = &connection->data; 5211 if (!conn_prepare_command(connection, sock)) { 5212 rv = 0; 5213 goto fail; 5214 } 5215 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0, 5216 my_challenge, CHALLENGE_LEN); 5217 if (!rv) 5218 goto fail; 5219 5220 err = drbd_recv_header(connection, &pi); 5221 if (err) { 5222 rv = 0; 5223 goto fail; 5224 } 5225 5226 if (pi.cmd != P_AUTH_CHALLENGE) { 5227 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n", 5228 cmdname(pi.cmd), pi.cmd); 5229 rv = -1; 5230 goto fail; 5231 } 5232 5233 if (pi.size > CHALLENGE_LEN * 2) { 5234 drbd_err(connection, "expected AuthChallenge payload too big.\n"); 5235 rv = -1; 5236 goto fail; 5237 } 5238 5239 if (pi.size < CHALLENGE_LEN) { 5240 drbd_err(connection, "AuthChallenge payload too small.\n"); 5241 rv = -1; 5242 goto fail; 5243 } 5244 5245 peers_ch = kmalloc(pi.size, GFP_NOIO); 5246 if (!peers_ch) { 5247 rv = -1; 5248 goto fail; 5249 } 5250 5251 err = drbd_recv_all_warn(connection, peers_ch, pi.size); 5252 if (err) { 5253 rv = 0; 5254 goto fail; 5255 } 5256 5257 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) { 5258 drbd_err(connection, "Peer presented the same challenge!\n"); 5259 rv = -1; 5260 goto fail; 5261 } 5262 5263 resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm); 5264 response = kmalloc(resp_size, GFP_NOIO); 5265 if (!response) { 5266 rv = -1; 5267 goto fail; 5268 } 5269 5270 rv = crypto_shash_digest(desc, peers_ch, pi.size, response); 5271 if (rv) { 5272 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5273 rv = -1; 5274 goto fail; 5275 } 5276 5277 if (!conn_prepare_command(connection, sock)) { 5278 rv = 0; 5279 goto fail; 5280 } 5281 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0, 5282 response, resp_size); 5283 if (!rv) 5284 goto fail; 5285 5286 err = drbd_recv_header(connection, &pi); 5287 if (err) { 5288 rv = 0; 5289 goto fail; 5290 } 5291 5292 if (pi.cmd != P_AUTH_RESPONSE) { 5293 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n", 5294 cmdname(pi.cmd), pi.cmd); 5295 rv = 0; 5296 goto fail; 5297 } 5298 5299 if (pi.size != resp_size) { 5300 drbd_err(connection, "expected AuthResponse payload of wrong size\n"); 5301 rv = 0; 5302 goto fail; 5303 } 5304 5305 err = drbd_recv_all_warn(connection, response , resp_size); 5306 if (err) { 5307 rv = 0; 5308 goto fail; 5309 } 5310 5311 right_response = kmalloc(resp_size, GFP_NOIO); 5312 if (!right_response) { 5313 rv = -1; 5314 goto fail; 5315 } 5316 5317 rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN, 5318 right_response); 5319 if (rv) { 5320 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5321 rv = -1; 5322 goto fail; 5323 } 5324 5325 rv = !memcmp(response, right_response, resp_size); 5326 5327 if (rv) 5328 drbd_info(connection, "Peer 
	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
			  resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	if (desc) {
		shash_desc_zero(desc);
		kfree(desc);
	}

	return rv;
}
#endif

int drbd_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	int h;

	drbd_info(connection, "receiver (re)started\n");

	do {
		h = conn_connect(connection);
		if (h == 0) {
			conn_disconnect(connection);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			drbd_warn(connection, "Discarding network configuration.\n");
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0) {
		blk_start_plug(&connection->receiver_plug);
		drbdd(connection);
		blk_finish_plug(&connection->receiver_plug);
	}

	conn_disconnect(connection);

	drbd_info(connection, "receiver terminated\n");
	return 0;
}

/* ********* acknowledge sender ******** */

static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&connection->ping_wait);

	return 0;
}

static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
		D_ASSERT(device, connection->agreed_pro_version < 100);
		return got_conn_RqSReply(connection, pi);
	}

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &device->flags);
		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&device->state_wait);

	return 0;
}

static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}

static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}

static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

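	/* P_RS_IS_IN_SYNC is only sent during checksum-based resync, which
	 * requires protocol version 89 or newer; hence the assert below. */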
	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(peer_device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(peer_device);
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}

static int
validate_req_change_req_state(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&device->resource->req_lock);
		return -EIO;
	}
	__req_mod(req, what, peer_device, &m);
	spin_unlock_irq(&device->resource->req_lock);

	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}

static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(peer_device, sector, blksize);
		dec_rs_pending(peer_device);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_SUPERSEDED:
		what = CONFLICT_RESOLVED;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(peer_device, p->block_id, sector,
					     &device->write_requests, __func__,
					     what, false);
}

static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(peer_device);
		drbd_rs_failed_io(peer_device, sector, size);
		return 0;
	}

	err = validate_req_change_req_state(peer_device, p->block_id, sector,
					    &device->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(peer_device, sector, size);
	}
	return 0;
}

static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
		 (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(peer_device, p->block_id, sector,
					     &device->read_requests, __func__,
					     NEG_ACKED, false);
}

static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	dec_rs_pending(peer_device);

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(peer_device, sector, size);
			break;
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}

static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}

static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	struct drbd_device_work *dw;
	sector_t sector;
	int size;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(peer_device, sector, size);
	else
		ov_out_of_sync_print(peer_device);

	if (!get_ldev(device))
		return 0;

	drbd_rs_complete_io(device, sector);
	dec_rs_pending(peer_device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
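	/* ov_left is counted in bitmap-block (BM_BLOCK_SIZE, 4 KiB) units, so
	 * bit 9 toggles once per 512 blocks, i.e. per 2 MiB of verified data,
	 * which is what the check below keys on. */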
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(peer_device, device->ov_left);

	if (device->ov_left == 0) {
		dw = kmalloc_obj(*dw, GFP_NOIO);
		if (dw) {
			dw->w.cb = w_ov_finished;
			dw->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
		} else {
			drbd_err(device, "kmalloc(dw) failed.");
			ov_out_of_sync_print(peer_device);
			drbd_resync_finished(peer_device);
		}
	}
	put_ldev(device);
	return 0;
}

static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

struct meta_sock_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
{
	long t;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
	rcu_read_unlock();

	t *= HZ;
	if (ping_timeout)
		t /= 10;

	connection->meta.socket->sk->sk_rcvtimeo = t;
}

static void set_ping_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 1);
}

static void set_idle_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 0);
}

static struct meta_sock_cmd ack_receiver_tbl[] = {
	[P_PING]	    = { 0, got_Ping },
	[P_PING_ACK]	    = { 0, got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]	    = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};

int drbd_ack_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct meta_sock_cmd *cmd = NULL;
	struct packet_info pi;
	unsigned long pre_recv_jif;
	int rv;
	void *buf = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect = header_size;
	bool ping_timeout_active = false;

	sched_set_fifo_low(current);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			set_ping_timeout(connection);
			ping_timeout_active = true;
		}

		pre_recv_jif = jiffies;
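		/* Remember when we started waiting: if the recv below times
		 * out but the data socket saw traffic since pre_recv_jif, the
		 * peer is still alive (see the -EAGAIN handling). */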
		rv = drbd_recv_short(connection->meta.socket, buf, expect - received, 0);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;

				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received, pre_recv_jif))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			/* maybe drbd_thread_stop(): the while condition will notice.
			 * maybe woken for send_ping: we'll send a ping above,
			 * and change the rcvtimeo */
			flush_signals(current);
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) ||
			    !ack_receiver_tbl[pi.cmd].fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &ack_receiver_tbl[pi.cmd];
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%ps failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
				set_idle_timeout(connection);
				ping_timeout_active = false;
			}

			buf = connection->meta.rbuf;
			received = 0;
			expect = header_size;
			cmd = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	drbd_info(connection, "ack_receiver terminated\n");

	return 0;
}

void drbd_send_acks_wf(struct work_struct *ws)
{
	struct drbd_peer_device *peer_device =
		container_of(ws, struct drbd_peer_device, send_acks_work);
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_device *device = peer_device->device;
	struct net_conf *nc;
	int tcp_cork, err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
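	/* net_conf is RCU-protected and may be replaced concurrently;
	 * copy the one field we need before dropping the read-side lock. */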
	tcp_cork = nc->tcp_cork;
	rcu_read_unlock();

	if (tcp_cork)
		tcp_sock_set_cork(connection->meta.socket->sk, true);

	err = drbd_finish_peer_reqs(device);
	kref_put(&device->kref, drbd_destroy_device);
	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
	   struct work_struct send_acks_work alive, which is in the peer_device object */

	if (err) {
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		return;
	}

	if (tcp_cork)
		tcp_sock_set_cork(connection->meta.socket->sk, false);

	return;
}