/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <linux/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <uapi/linux/sched/types.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

/* DRBD_FF_* protocol feature bits supported by this side.
 * NOTE(review): presumably exchanged during the feature handshake
 * (drbd_do_features() below) — confirm against drbd_protocol.h. */
#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)

/* One decoded on-the-wire packet header, filled in by decode_header(). */
struct packet_info {
	enum drbd_packet cmd;	/* command code taken from the wire header */
	unsigned int size;	/* payload length in bytes */
	unsigned int vnr;	/* volume number; 0 for header formats that lack one */
	void *data;		/* points just behind the header into the receive buffer */
};

/* Tri-state result of drbd_may_finish_epoch() (declared below). */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

/* Forward declarations for the connection handshake and epoch helpers. */
static int drbd_do_features(struct drbd_connection
*connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


/* allocation flags for the page pool: highmem is fine, never warn on failure */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	/* walk forward to the n-th page; after the loop, tmp points to the
	 * first page NOT taken (the new head), which may be NULL */
	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break;	/* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock.
*/
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	/* walk to the last page; count the chain length in i */
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	/* optionally report the chain length back to the caller */
	if (len)
		*len = i;
	return page;
}

/* Release every page of the chain back to the system.
 * Returns the number of pages freed. */
static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

/* Prepend the chain [chain_first .. chain_last] to *head.
 * Per the comment above, callers do this under a spinlock
 * (drbd_pp_lock in this file). */
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	/* paranoia: verify chain_last really is the tail of chain_first */
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

/* Try to grab @number pages: first from the global drbd_pp_pool,
 * then freshly from the kernel.  All-or-nothing; on partial success
 * the partial chain is given back to the pool and NULL is returned. */
static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		/* link the new page in front of the chain built so far */
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon".
*/
	if (page) {
		/* give the partially built chain back to the global pool */
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

/* Move all finished entries from device->net_ee over to @to_be_freed.
 * Called with resource->req_lock held (see callers below). */
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}

/* Collect finished net_ee entries under the lock, free them without it. */
static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);
	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

/* Run drbd_reclaim_net_peer_reqs() on every volume of the connection. */
static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		if (!atomic_read(&device->pp_in_use_by_net))
			continue;

		/* hold a reference so the device survives while we drop rcu */
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_reclaim_net_peer_reqs(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD device (via its peer device).
* @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;	/* no net_conf: effectively unthrottled */
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
		drbd_reclaim_net_peer_reqs(device);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_reclaim_net_peer_reqs(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		/* Slept the whole HZ/10 without being woken: stop throttling
		 * on max_buffers (see the comment above — it is no hard limit). */
		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	/* which in-use counter this chain was accounted against */
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	/* pool already large enough: hand the pages back to the system */
	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ?
"pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

/* normal: payload_size == request size (bi_size)
 * w_same: payload_size == logical_block_size
 * trim: payload_size == 0 */
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	/* pages needed for the payload; 0 for trim (no payload) */
	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	/* the request struct itself must not come from highmem */
	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (nr_pages) {
		/* only retry (sleep) if the given gfp mask allows blocking */
		page = drbd_alloc_pages(peer_device, nr_pages,
					gfpflags_allow_blocking(gfp_mask));
		if (!page)
			goto fail;
	}

	memset(peer_req, 0, sizeof(*peer_req));
	INIT_LIST_HEAD(&peer_req->w.list);
	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = request_size;
	peer_req->i.sector = sector;
	peer_req->submit_jif = jiffies;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
		       int is_net)
{
	might_sleep();
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	/* EE_CALL_AL_COMPLETE_IO is expected to be already handled here;
	 * if it unexpectedly is not, complete the activity-log I/O now. */
	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}
	mempool_free(peer_req, drbd_ee_mempool);
}

/* Free all peer requests on @list; returns how many were freed. */
int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	/* detach the whole list under the lock, then free without it */
	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
* all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		/* remember the first error, but still run all callbacks */
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}

/* Sleep until @head is empty.  Caller holds resource->req_lock; it is
 * dropped around the actual sleep and re-acquired before returning. */
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}

/* Lock-taking wrapper around _drbd_wait_ee_list_empty(). */
static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}

/* Receive into @buf; returns bytes received or a negative errno
 * (kernel_recvmsg semantics).  With flags == 0, block until the
 * full @size arrived (MSG_WAITALL). */
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		/* Orderly shutdown.  If we sent the disconnect ourselves,
		 * give the state machine a chance to drop below
		 * C_WF_REPORT_PARAMS before declaring the pipe broken. */
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	/* any error or short read takes the connection down */
	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

/* Like drbd_recv(), but maps any short read onto -EIO.
 * Returns 0 on success, negative errno otherwise. */
static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

/* drbd_recv_all() plus a warning on failure (unless we were signalled). */
static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
*/
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF; a value of 0 means "leave alone" */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

/*
 * Actively establish one TCP connection to the peer.
 * Returns the connected socket, or NULL on failure.  Transient failures
 * (timeout, peer not yet reachable, signal) keep the connection state
 * untouched so the caller can simply retry; anything else forces
 * C_DISCONNECTING.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* Clamp by the destination buffer we actually copy into.
	 * (This used to say sizeof(src_in6); identical size since both are
	 * struct sockaddr_in6, but say what we mean.) */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

/* Context handed to the listen socket's sk_state_change callback, so an
 * incoming connection can ring the door_bell completion. */
struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);

};

/* sk_state_change hook on the listen socket: once a connection is fully
 * established, wake the waiter in drbd_wait_for_connect(), then chain to
 * the original callback. */
static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}

/* Create, configure and bind the listen socket for incoming connections,
 * hooking up @ad for the door_bell notification.  Returns 0 or -EIO. */
static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	/* hook our sk_state_change callback, so drbd_wait_for_connect() gets
	 * woken as soon as an incoming connection is established */
	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		/* transient errors are not worth a state change */
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

/* Undo the sk_state_change/sk_user_data hookup from prepare_listen_socket(). */
static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Wait (with jittered timeout) for the peer to connect to us.
 * Returns the accepted socket, or NULL on timeout/signal/error. */
static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter: +/- timeo/7, so both peers don't retry in lockstep */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

/* Send an empty packet (P_INITIAL_DATA or P_INITIAL_META) announcing which
 * role this freshly connected socket is meant to play. */
static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

/* Read and decode the peer's first packet on @sock.
 * Returns the command (>= 0), or a negative error code. */
static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	struct net_conf *nc;
	int err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
	rcu_read_unlock();

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:
pointer to the pointer to the socket.
 */
static bool drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	/* non-blocking peek: a live but idle socket yields -EAGAIN */
	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}

/* Give both sockets a moment to settle, then verify they are still alive. */
static bool connection_established(struct drbd_connection *connection,
				   struct socket **sock1,
				   struct socket **sock2)
{
	struct net_conf *nc;
	int timeout;
	bool ok;

	if (!*sock1 || !*sock2)
		return false;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	/* fall back to ping_timeo if no dedicated sock_check_timeo is set */
	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
	rcu_read_unlock();
	schedule_timeout_interruptible(timeout);

	ok = drbd_socket_okay(sock1);
	ok = drbd_socket_okay(sock2) && ok;

	return ok;
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	/* peers older than protocol 100 only know one connection-wide state
	 * mutex; newer ones get a per-device mutex */
	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	/* initial handshake: parameters, sizes, uuids, current state;
	 * stop at the first error */
	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here.
*/
	return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h;
	bool discard_my_data, ok;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	/* two sockets are established: "sock" for data, "msock" for meta/acks */
	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better.  */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			/* the first outgoing connection becomes the data socket,
			 * the second one the meta socket */
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				drbd_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (connection_established(connection, &sock.socket, &msock.socket))
			break;

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					/* both sides connected at the same time:
					 * prefer the incoming socket */
					drbd_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					drbd_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				drbd_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				/* randomized retry, to break up repeated cross-connects */
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = connection_established(connection, &sock.socket, &msock.socket);
	} while (!ok);

	/* the listen socket has served its purpose */
	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			drbd_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	/* Prevent a race between resync-handshake and
	 * being promoted to Primary.
	 *
	 * Grab and release the state mutex, so we know that any current
	 * drbd_set_role() is finished, and any incoming drbd_set_role
	 * will see the STATE_SENT flag, and wait for it to be cleared.
	 */
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_lock(peer_device->device->state_mutex);

	set_bit(STATE_SENT, &connection->flags);

	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_unlock(peer_device->device->state_mutex);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		/* hold a reference so the device survives dropping the rcu lock */
		kref_get(&device->kref);
		rcu_read_unlock();

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->ack_receiver);
	/* opencoded create_singlethread_workqueue(),
	 * to be able to use format string arguments */
	connection->ack_sender =
		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
	if (!connection->ack_sender) {
		drbd_err(connection, "Failed to create workqueue ack_sender\n");
		return 0;
	}

	mutex_lock(&connection->resource->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value.
*/
	/* discard_my_data is a single-shot modifier; consume it here so the
	 * next connection attempt starts clean (see comment above). */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->resource->conf_update);

	return h;

out_release_sockets:
	/* Error path of the connect attempt: release listen and data/meta
	 * sockets, whichever of them were actually set up. */
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}

/*
 * decode_header() - parse one received packet header into @pi.
 *
 * Three on-the-wire header layouts exist (p_header80, p_header95,
 * p_header100); which one the peer uses is fixed by the agreed protocol
 * version, reflected in drbd_header_size().  The expected layout must
 * both match that size and carry the matching magic value.
 *
 * On success, returns 0 and fills pi->vnr / pi->cmd / pi->size;
 * pi->data is set to point just past the header within @header.
 * Returns -EINVAL on wrong magic or non-zero padding.
 */
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			drbd_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		/* h100 is the only header format that carries a volume number */
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		/* h80 length field is only 16 bit */
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}

/*
 * drbd_recv_header() - receive and decode one packet header from the
 * data socket.  Note that last_received is updated even if decoding
 * failed: bytes did arrive on the wire.
 */
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int err;

	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
	if (err)
		return err;

	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;

	return err;
}

/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
struct issue_flush_context {
	atomic_t pending;	/* in-flight flushes + 1 for the submitter */
	int error;		/* first/any error seen by a flush bio */
	struct completion done;
};
struct one_flush_context {
	struct drbd_device *device;
	struct issue_flush_context *ctx;
};

/*
 * one_flush_endio() - completion of one per-volume flush bio.
 * Records any error in the shared context, drops the device/ldev
 * references taken in drbd_flush()/submit_one_flush(), and completes
 * the context when the last pending flush finishes.
 */
void one_flush_endio(struct bio *bio)
{
	struct one_flush_context *octx = bio->bi_private;
	struct drbd_device *device = octx->device;
	struct issue_flush_context *ctx = octx->ctx;

	if (bio->bi_error) {
		ctx->error = bio->bi_error;
		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
	}
	kfree(octx);
	bio_put(bio);

	clear_bit(FLUSH_PENDING, &device->flags);
	put_ldev(device);
	kref_put(&device->kref, drbd_destroy_device);

	if (atomic_dec_and_test(&ctx->pending))
		complete(&ctx->done);
}

/*
 * submit_one_flush() - submit an empty REQ_PREFLUSH bio to one backing
 * device.  On allocation failure, records -ENOMEM in @ctx and drops the
 * references the caller took on our behalf.
 */
static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
{
	struct bio *bio = bio_alloc(GFP_NOIO, 0);
	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
	if (!bio || !octx) {
		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
		/* FIXME: what else can I do now? disconnecting or detaching
		 * really does not help to improve the state of the world, either.
		 */
		kfree(octx);	/* kfree(NULL) is fine */
		if (bio)
			bio_put(bio);

		ctx->error = -ENOMEM;
		put_ldev(device);
		kref_put(&device->kref, drbd_destroy_device);
		return;
	}

	octx->device = device;
	octx->ctx = ctx;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_private = octx;
	bio->bi_end_io = one_flush_endio;
	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;

	device->flush_jif = jiffies;
	set_bit(FLUSH_PENDING, &device->flags);
	atomic_inc(&ctx->pending);
	submit_bio(bio);
}

/*
 * drbd_flush() - flush all attached backing devices of this connection
 * in parallel, then wait for all completions.  Only does anything when
 * the current write ordering policy is WO_BDEV_FLUSH.  On any flush
 * error, downgrades the write ordering to WO_DRAIN_IO.
 */
static void drbd_flush(struct drbd_connection *connection)
{
	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
		struct drbd_peer_device *peer_device;
		struct issue_flush_context ctx;
		int vnr;

		/* start at 1: the extra count is dropped below, so a
		 * completion cannot fire before all flushes are submitted */
		atomic_set(&ctx.pending, 1);
		ctx.error = 0;
		init_completion(&ctx.done);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			/* must drop rcu around the (sleeping) submission;
			 * the kref keeps the device alive meanwhile */
			rcu_read_unlock();

			submit_one_flush(device, &ctx);

			rcu_read_lock();
		}
		rcu_read_unlock();

		/* Do we want to add a timeout,
		 * if disk-timeout is set? */
		if (!atomic_dec_and_test(&ctx.pending))
			wait_for_completion(&ctx.done);

		if (ctx.error) {
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			/* Any error is already reported by bio_endio callback. */
			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
		}
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		/* EV_CLEANUP may be OR'ed into any event; strip it for the
		 * switch, but keep testing it below. */
		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		/* An epoch may be finished once it has seen writes
		 * (epoch_size != 0), has no more active requests, and either
		 * its barrier number arrived or we are cleaning up. */
		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				/* drop the lock while sending on the network */
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in.
			 */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				/* older epoch: unlink and free it, then loop
				 * again with its successor, which may have
				 * become finishable as EV_BECAME_LAST */
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				/* the current epoch is recycled in place */
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}

/*
 * max_allowed_wo() - clamp the requested write ordering to what this
 * backing device's configuration allows (disk_flushes / disk_drain).
 * Caller must hold the RCU read lock (rcu_dereference of disk_conf).
 */
static enum write_ordering_e
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
	struct disk_conf *dc;

	dc = rcu_dereference(bdev->disk_conf);

	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
		wo = WO_DRAIN_IO;
	if (wo == WO_DRAIN_IO && !dc->disk_drain)
		wo = WO_NONE;

	return wo;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:	DRBD resource.
 * @bdev:	backing device whose configuration must also be taken into
 *		account even if it is not (yet) reachable via the device
 *		list; may be NULL.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
			      enum write_ordering_e wo)
{
	struct drbd_device *device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_NONE] = "none",
		[WO_DRAIN_IO] = "drain",
		[WO_BDEV_FLUSH] = "flush",
	};

	pwo = resource->write_ordering;
	/* only WO_BDEV_FLUSH may upgrade; everything else can only lower
	 * the current method */
	if (wo != WO_BDEV_FLUSH)
		wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&resource->devices, device, vnr) {
		if (get_ldev(device)) {
			wo = max_allowed_wo(device->ldev, wo);
			/* @bdev was reachable through the device list,
			 * no need to check it separately below */
			if (device->ldev == bdev)
				bdev = NULL;
			put_ldev(device);
		}
	}

	if (bdev)
		wo = max_allowed_wo(bdev, wo);

	rcu_read_unlock();

	resource->write_ordering = wo;
	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}

/*
 * We *may* ignore the discard-zeroes-data setting, if so configured.
 *
 * Assumption is that it "discard_zeroes_data=0" is only because the backend
 * may ignore partial unaligned discards.
 *
 * LVM/DM thin as of at least
 *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
 *   Library version: 1.02.93-RHEL7 (2015-01-28)
 *   Driver version:  4.29.0
 * still behaves this way.
 *
 * For unaligned (wrt. alignment and granularity) or too small discards,
 * we zero-out the initial (and/or) trailing unaligned partial chunks,
 * but discard all the aligned full chunks.
 *
 * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
 */
int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
{
	struct block_device *bdev = device->ldev->backing_bdev;
	struct request_queue *q = bdev_get_queue(bdev);
	sector_t tmp, nr;
	unsigned int max_discard_sectors, granularity;
	int alignment;
	int err = 0;

	if (!discard)
		goto zero_out;

	/* Zero-sector (unknown) and one-sector granularities are the same. */
	granularity = max(q->limits.discard_granularity >> 9, 1U);
	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;

	/* cap one discard request at 2 MiB worth of sectors, and round it
	 * down to a multiple of the granularity */
	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
	max_discard_sectors -= max_discard_sectors % granularity;
	if (unlikely(!max_discard_sectors))
		goto zero_out;

	if (nr_sectors < granularity)
		goto zero_out;

	tmp = start;
	if (sector_div(tmp, granularity) != alignment) {
		/* not aligned: the range must still leave at least one full
		 * chunk after zeroing the leading partial chunk */
		if (nr_sectors < 2*granularity)
			goto zero_out;
		/* start + gran - (start + gran - align) % gran */
		tmp = start + granularity - alignment;
		tmp = start + granularity - sector_div(tmp, granularity);

		nr = tmp - start;
		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
		nr_sectors -= nr;
		start = tmp;
	}
	while (nr_sectors >= granularity) {
		nr = min_t(sector_t, nr_sectors, max_discard_sectors);
		err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
		nr_sectors -= nr;
		start += nr;
	}
 zero_out:
	/* zero the (possibly whole) remainder that could not be discarded */
	if (nr_sectors) {
		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
	}
	return err != 0;
}

/*
 * can_do_reliable_discards() - true if discarded ranges on the backing
 * device read back as zeroes, either guaranteed by the queue limits or
 * explicitly assumed by configuration (discard_zeroes_if_aligned).
 */
static bool can_do_reliable_discards(struct drbd_device *device)
{
	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
	struct disk_conf *dc;
	bool can_do;

	if (!blk_queue_discard(q))
		return false;

	if (q->limits.discard_zeroes_data)
		return true;

	rcu_read_lock();
	dc = rcu_dereference(device->ldev->disk_conf);
	can_do = dc->discard_zeroes_if_aligned;
	rcu_read_unlock();
	return can_do;
}

/*
 * drbd_issue_peer_discard() - carry out a discard requested by the peer,
 * synchronously, then complete the peer request.
 */
static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	/* If the backend cannot discard, or does not guarantee
	 * read-back zeroes in discarded ranges, we fall back to
	 * zero-out.  Unless configuration specifically requested
	 * otherwise. */
	if (!can_do_reliable_discards(device))
		peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;

	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
	    peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
		peer_req->flags |= EE_WAS_ERROR;
	drbd_endio_write_sec_final(peer_req);
}

/*
 * drbd_issue_peer_wsame() - carry out a WRITE SAME requested by the peer,
 * synchronously, then complete the peer request.
 */
static void drbd_issue_peer_wsame(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct block_device *bdev = device->ldev->backing_bdev;
	sector_t s = peer_req->i.sector;
	sector_t nr = peer_req->i.size >> 9;
	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
		peer_req->flags |= EE_WAS_ERROR;
	drbd_endio_write_sec_final(peer_req);
}


/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @op:		REQ_OP_* operation to perform
 * @op_flags:	additional bio op flags, see bio->bi_opf
 * @fault_type:	fault injection class, see drbd_insert_fault()
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 * single page to an empty bio (which should never happen and likely indicates
 * that the lower level IO stack is in some way broken). This has been observed
 * on certain Xen deployments.
 */
/* TODO allocate from our own bio_set.
 */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned op, const unsigned op_flags,
			     const int fault_type)
{
	struct bio *bios = NULL;	/* single-linked via bi_next */
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned data_size = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* TRIM/DISCARD: for now, always use the helper function
	 * blkdev_issue_zeroout(..., discard=true).
	 * It's synchronous, but it does the right thing wrt. bio splitting.
	 * Correctness first, performance later.  Next step is to code an
	 * asynchronous variant of the same.
	 */
	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
		/* wait for all pending IO completions, before we start
		 * zeroing things out. */
		conn_wait_active_ee_empty(peer_req->peer_device->connection);
		/* add it to the active list now,
		 * so we can find it to present it in debugfs */
		peer_req->submit_jif = jiffies;
		peer_req->flags |= EE_SUBMITTED;

		/* If this was a resync request from receive_rs_deallocated(),
		 * it is already on the sync_ee list */
		if (list_empty(&peer_req->w.list)) {
			spin_lock_irq(&device->resource->req_lock);
			list_add_tail(&peer_req->w.list, &device->active_ee);
			spin_unlock_irq(&device->resource->req_lock);
		}

		if (peer_req->flags & EE_IS_TRIM)
			drbd_issue_peer_discard(device, peer_req);
		else /* EE_WRITE_SAME */
			drbd_issue_peer_wsame(device, peer_req);
		return 0;
	}

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio_set_op_attrs(bio, op, op_flags);
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	/* fill this bio with pages; when bio_add_page refuses a page,
	 * jump back up and continue the page chain in a fresh bio */
	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0))
			goto next_bio;
		data_size -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(device, data_size == 0);
	D_ASSERT(device, page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
	/* for debugfs: update timestamp, mark as submitted */
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_SUBMITTED;
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	/* undo: put all bios built so far, none were submitted yet */
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}

/*
 * drbd_remove_epoch_entry_interval() - take a peer write out of the
 * write_requests interval tree and wake up anyone waiting on it.
 * Caller is expected to hold the appropriate lock for the tree
 * (NOTE(review): presumably req_lock — confirm at call sites).
 */
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&device->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&device->misc_wait);
}

/*
 * conn_wait_active_ee_empty() - wait until every volume of this
 * connection has no active peer write requests left.  Takes a kref per
 * device so it may drop the RCU read lock while sleeping.
 */
static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

/*
 * receive_Barrier() - handle P_BARRIER: close the current write epoch.
 * Depending on the write ordering policy, either just allocate the next
 * epoch (WO_NONE), or drain/flush all volumes first.
 */
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->resource->write_ordering) {
	case WO_NONE:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_BDEV_FLUSH:
	case WO_DRAIN_IO:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		/* current epoch is empty: it can be recycled, no new one needed */
		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
			 connection->resource->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}

/* quick wrapper in case payload size != request_size (write same) */
static void drbd_csum_ee_size(struct crypto_ahash *h,
			      struct drbd_peer_request *r, void *d,
			      unsigned int payload_size)
{
	/* temporarily pretend the request covers only the payload,
	 * so drbd_csum_ee checksums the right number of bytes */
	unsigned int tmp = r->i.size;
	r->i.size = payload_size;
	drbd_csum_ee(h, r, d);
	r->i.size = tmp;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data.
 * data_size: actual payload ("data in")
 * 	for normal writes that is bi_size.
 * 	for discards, that is zero.
 * 	for write same, it is logical_block_size.
 * both trim and write same have the bi_size ("data len to be affected")
 * as extra argument in the packet header.
 */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int digest_size, err;
	unsigned int data_size = pi->size, ds;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;

	/* trim packets carry no payload, hence no integrity digest either */
	digest_size = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return NULL;
		data_size -= digest_size;
	}

	/* assume request_size == data_size, but special case trim and wsame. */
	ds = data_size;
	if (trim) {
		if (!expect(data_size == 0))
			return NULL;
		ds = be32_to_cpu(trim->size);
	} else if (wsame) {
		/* write-same payload is exactly one logical block; it must
		 * match both our queue and the backend block size */
		if (data_size != queue_logical_block_size(device->rq_queue)) {
			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
				data_size, queue_logical_block_size(device->rq_queue));
			return NULL;
		}
		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
			return NULL;
		}
		ds = be32_to_cpu(wsame->size);
	}

	/* sanity-check the affected size against protocol limits */
	if (!expect(IS_ALIGNED(ds, 512)))
		return NULL;
	if (trim || wsame) {
		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
			return NULL;
	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (ds>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, ds);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	peer_req->flags |= EE_WRITE;
	if (trim) {
		peer_req->flags |= EE_IS_TRIM;
		return peer_req;
	}
	if (wsame)
		peer_req->flags |= EE_WRITE_SAME;

	/* receive payload size bytes into page chain */
	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (digest_size) {
		/* verify against the payload size only; for write-same the
		 * request size (i.size) differs from the payload size */
		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size >> 9;
	return peer_req;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
1918 */ 1919 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size) 1920 { 1921 struct page *page; 1922 int err = 0; 1923 void *data; 1924 1925 if (!data_size) 1926 return 0; 1927 1928 page = drbd_alloc_pages(peer_device, 1, 1); 1929 1930 data = kmap(page); 1931 while (data_size) { 1932 unsigned int len = min_t(int, data_size, PAGE_SIZE); 1933 1934 err = drbd_recv_all_warn(peer_device->connection, data, len); 1935 if (err) 1936 break; 1937 data_size -= len; 1938 } 1939 kunmap(page); 1940 drbd_free_pages(peer_device->device, page, 0); 1941 return err; 1942 } 1943 1944 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req, 1945 sector_t sector, int data_size) 1946 { 1947 struct bio_vec bvec; 1948 struct bvec_iter iter; 1949 struct bio *bio; 1950 int digest_size, err, expect; 1951 void *dig_in = peer_device->connection->int_dig_in; 1952 void *dig_vv = peer_device->connection->int_dig_vv; 1953 1954 digest_size = 0; 1955 if (peer_device->connection->peer_integrity_tfm) { 1956 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm); 1957 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size); 1958 if (err) 1959 return err; 1960 data_size -= digest_size; 1961 } 1962 1963 /* optimistically update recv_cnt. if receiving fails below, 1964 * we disconnect anyways, and counters will be reset. 
*/ 1965 peer_device->device->recv_cnt += data_size>>9; 1966 1967 bio = req->master_bio; 1968 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector); 1969 1970 bio_for_each_segment(bvec, bio, iter) { 1971 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset; 1972 expect = min_t(int, data_size, bvec.bv_len); 1973 err = drbd_recv_all_warn(peer_device->connection, mapped, expect); 1974 kunmap(bvec.bv_page); 1975 if (err) 1976 return err; 1977 data_size -= expect; 1978 } 1979 1980 if (digest_size) { 1981 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv); 1982 if (memcmp(dig_in, dig_vv, digest_size)) { 1983 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n"); 1984 return -EINVAL; 1985 } 1986 } 1987 1988 D_ASSERT(peer_device->device, data_size == 0); 1989 return 0; 1990 } 1991 1992 /* 1993 * e_end_resync_block() is called in ack_sender context via 1994 * drbd_finish_peer_reqs(). 1995 */ 1996 static int e_end_resync_block(struct drbd_work *w, int unused) 1997 { 1998 struct drbd_peer_request *peer_req = 1999 container_of(w, struct drbd_peer_request, w); 2000 struct drbd_peer_device *peer_device = peer_req->peer_device; 2001 struct drbd_device *device = peer_device->device; 2002 sector_t sector = peer_req->i.sector; 2003 int err; 2004 2005 D_ASSERT(device, drbd_interval_empty(&peer_req->i)); 2006 2007 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 2008 drbd_set_in_sync(device, sector, peer_req->i.size); 2009 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req); 2010 } else { 2011 /* Record failure to sync */ 2012 drbd_rs_failed_io(device, sector, peer_req->i.size); 2013 2014 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req); 2015 } 2016 dec_unacked(device); 2017 2018 return err; 2019 } 2020 2021 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector, 2022 struct packet_info *pi) __releases(local) 2023 { 2024 struct drbd_device *device = peer_device->device; 2025 struct 
drbd_peer_request *peer_req; 2026 2027 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi); 2028 if (!peer_req) 2029 goto fail; 2030 2031 dec_rs_pending(device); 2032 2033 inc_unacked(device); 2034 /* corresponding dec_unacked() in e_end_resync_block() 2035 * respective _drbd_clear_done_ee */ 2036 2037 peer_req->w.cb = e_end_resync_block; 2038 peer_req->submit_jif = jiffies; 2039 2040 spin_lock_irq(&device->resource->req_lock); 2041 list_add_tail(&peer_req->w.list, &device->sync_ee); 2042 spin_unlock_irq(&device->resource->req_lock); 2043 2044 atomic_add(pi->size >> 9, &device->rs_sect_ev); 2045 if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0, 2046 DRBD_FAULT_RS_WR) == 0) 2047 return 0; 2048 2049 /* don't care for the reason here */ 2050 drbd_err(device, "submit failed, triggering re-connect\n"); 2051 spin_lock_irq(&device->resource->req_lock); 2052 list_del(&peer_req->w.list); 2053 spin_unlock_irq(&device->resource->req_lock); 2054 2055 drbd_free_peer_req(device, peer_req); 2056 fail: 2057 put_ldev(device); 2058 return -EIO; 2059 } 2060 2061 static struct drbd_request * 2062 find_request(struct drbd_device *device, struct rb_root *root, u64 id, 2063 sector_t sector, bool missing_ok, const char *func) 2064 { 2065 struct drbd_request *req; 2066 2067 /* Request object according to our peer */ 2068 req = (struct drbd_request *)(unsigned long)id; 2069 if (drbd_contains_interval(root, sector, &req->i) && req->i.local) 2070 return req; 2071 if (!missing_ok) { 2072 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func, 2073 (unsigned long)id, (unsigned long long)sector); 2074 } 2075 return NULL; 2076 } 2077 2078 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi) 2079 { 2080 struct drbd_peer_device *peer_device; 2081 struct drbd_device *device; 2082 struct drbd_request *req; 2083 sector_t sector; 2084 int err; 2085 struct p_data *p = pi->data; 2086 2087 peer_device = 
		conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&device->resource->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(peer_device, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}

/* Resync data from the peer: write it to our local disk (or drain and
 * negatively ack it if we have no usable local disk). */
static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(device, p->block_id == ID_SYNCER);

	if (get_ldev(device)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio.
		 */
		err = recv_resync_read(peer_device, sector, pi);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(peer_device, pi->size);

		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
	}

	/* account the received resync traffic either way */
	atomic_add(pi->size >> 9, &device->rs_sect_in);

	return err;
}

/* Requeue postponed local writes that conflicted with a now-completed peer
 * request; called with req_lock held (see e_end_block). */
static void restart_conflicting_writes(struct drbd_device *device,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, CONFLICT_RESOLVED, NULL);
	}
}

/*
 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (peer_req->flags & EE_SEND_WRITE_ACK) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			/* during resync states, a successful write may also
			 * mark the block in sync */
			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
				device->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(peer_device, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(device, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(device);
	}

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
		spin_lock_irq(&device->resource->req_lock);
		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(device, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(device, sector, peer_req->i.size);
		spin_unlock_irq(&device->resource->req_lock);
	} else
		D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return err;
}

/* Send the given ack for a peer request and drop the unacked count. */
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	int err;

	err = drbd_send_ack(peer_device, ack, peer_req);
	dec_unacked(peer_device->device);

	return err;
}

static int e_send_superseded(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_SUPERSEDED);
}

static int e_send_retry_write(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_connection *connection = peer_req->peer_device->connection;

	/* peers before protocol version 100 do not know P_RETRY_WRITE */
	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_SUPERSEDED);
}

/* true iff sequence number a is logically after b, modulo 2^32 */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 * a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}

static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}

/* Record the newest sequence number seen from the peer. */
static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
{
	struct drbd_device *device = peer_device->device;
	unsigned int newest_peer_seq;

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
		spin_lock(&device->peer_seq_lock);
		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
		device->peer_seq = newest_peer_seq;
		spin_unlock(&device->peer_seq_lock);
		/* wake up only if we actually changed device->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&device->seq_wait);
	}
}

/* do the sector ranges [s1, s1+l1) and [s2, s2+l2) intersect?
 * lengths l1/l2 are in bytes, sectors are 512 bytes */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
}

/* maybe change sync_ee into interval trees as well? */
static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	struct drbd_peer_request *rs_req;
	bool rv = false;

	spin_lock_irq(&device->resource->req_lock);
	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
		if (overlaps(peer_req->i.sector, peer_req->i.size,
			     rs_req->i.sector, rs_req->i.size)) {
			rv = true;
			break;
		}
	}
	spin_unlock_irq(&device->resource->req_lock);

	return rv;
}

/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than device->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update device->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{
	struct drbd_device *device = peer_device->device;
	DEFINE_WAIT(wait);
	long timeout;
	int ret = 0, tp;

	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
		return 0;

	spin_lock(&device->peer_seq_lock);
	for (;;) {
		/* Are we the logically next packet?  Then take over the
		 * sequence number and proceed. */
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
			break;
		}

		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}

		rcu_read_lock();
		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
		rcu_read_unlock();

		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
		rcu_read_lock();
		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
		rcu_read_unlock();
		timeout = schedule_timeout(timeout);
		spin_lock(&device->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
	return ret;
}

/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio_flags(u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
}

/* map the wire DP_DISCARD flag to the corresponding bio op */
static unsigned long wire_flags_to_bio_op(u32 dpf)
{
	if (dpf & DP_DISCARD)
		return REQ_OP_DISCARD;
	else
		return REQ_OP_WRITE;
}

/* Fail all postponed local writes overlapping [sector, sector+size).
 * Called with req_lock held; it is dropped and re-acquired around the
 * completion of the master bio, hence the restart from "repeat". */
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&device->resource->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&device->resource->req_lock);
		goto repeat;
	}
}

/* Resolve conflicts between this peer write and overlapping requests.
 * Called with req_lock held. */
static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will
	   prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;
		if (i->completed)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup. Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			/* answer on done_ee via the ack sender */
			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

out:
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}

/* mirrored write */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct net_conf *nc;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int op, op_flags;
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		/* no usable local disk: keep sequence numbers moving,
		 * negatively ack and drain the payload */
		int err2;

		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_APPLICATION;

	dp_flags = be32_to_cpu(p->dp_flags);
	op = wire_flags_to_bio_op(dp_flags);
	op_flags = wire_flags_to_bio_flags(dp_flags);
	if (pi->cmd == P_TRIM) {
		/* a trim carries a size but no payload pages */
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
		D_ASSERT(peer_device, peer_req->pages == NULL);
	} else if (peer_req->pages == NULL) {
		/* zero-sized request: must be an explicit flush */
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/* join the current write epoch */
	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	tp = nc->two_primaries;
	if (peer_device->connection->agreed_pro_version < 100) {
		/* old peers do not send the ack mode in dp_flags;
		 * derive it from the configured wire protocol */
		switch (nc->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}
	rcu_read_unlock();

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
	}

	if (tp) {
		/* two primaries implies protocol C */
		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			if (err == -ENOENT) {
				/* request was superseded or will be retried;
				 * ack already queued by handle_write_conflicts */
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* TRIM and WRITE_SAME are processed synchronously,
	 * we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
		list_add_tail(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
	}

	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
				       DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}

out_interrupted:
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}

/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
		bool throttle_if_app_is_waiting)
{
	struct lc_element *tmp;
	bool throttle = drbd_rs_c_min_rate_throttle(device);

	if (!throttle || throttle_if_app_is_waiting)
		return throttle;

	spin_lock_irq(&device->al_lock);
	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags))
			throttle = false;
		/* Do not slow down if app IO is already waiting for this extent,
		 * and our progress is necessary for application IO to complete. */
	}
	spin_unlock_irq(&device->al_lock);

	return throttle;
}

/* Is the configured minimum sync rate currently exceeded? */
bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
{
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	unsigned int c_min_rate;
	int curr_events;

	rcu_read_lock();
	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
	rcu_read_unlock();

	/* feature disabled?
	 */
	if (c_min_rate == 0)
		return false;

	/* read+write sectors on the backing disk minus what our own resync
	 * submitted: the remainder is "other" activity */
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
		      atomic_read(&device->rs_sect_ev);

	if (atomic_read(&device->ap_actlog_cnt)
	    || curr_events - device->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		device->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
			rs_left = device->ov_left;
		else
			rs_left = drbd_bm_total_weight(device) - device->rs_failed;

		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = device->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > c_min_rate)
			return true;
	}
	return false;
}

/* Serve a read request from the peer: P_DATA_REQUEST (application read),
 * resync reads, checksum-based resync, or online verify. */
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;
	capacity = drbd_get_capacity(device->this_bdev);

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	/* validate the peer-supplied size and range before use */
	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
		/* no up-to-date local data: negatively ack per request type */
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
			break;
		case P_RS_THIN_REQ:
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(device);
			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(peer_device, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
			size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(device);
		return -ENOMEM;
	}

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		peer_req->flags |= EE_APPLICATION;
		goto submit;

	case P_RS_THIN_REQ:
		/* If at some point in the future we have a smart way to
		   find out if this data block is completely deallocated,
		   then we would do something smarter here than reading
		   the block...
		   the block... */
		peer_req->flags |= EE_RS_THIN_REQ;
		/* fall through */
	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		/* the packet payload is the digest; receive it into di */
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
			/* remember to report stats in drbd_resync_finished */
			device->use_csums = true;
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &device->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(device);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (device->ov_start_sector == ~(sector_t)0 &&
		    peer_device->connection->agreed_pro_version >= 90) {
			/* first verify request: initialize progress marks */
			unsigned long now = jiffies;
			int i;
			device->ov_start_sector = sector;
			device->ov_position = sector;
			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
			device->rs_total = device->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				device->rs_mark_left[i] = device->ov_left;
				device->rs_mark_time[i] = now;
			}
			drbd_info(device, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		BUG();
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * on request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */

	/* Even though this may be a resync request, we do add to "read_ee";
	 * "sync_ee" is only used for resync WRITEs.
	 * Add to list early, so debugfs can find this request
	 * even if we have to sleep below. */
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
	if (device->state.peer != R_PRIMARY
	    && drbd_rs_should_slow_down(device, sector, false))
		schedule_timeout_uninterruptible(HZ/10);
	update_receiver_timing_details(connection, drbd_rs_begin_io);
	if (drbd_rs_begin_io(device, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &device->rs_sect_ev);

submit:
	update_receiver_timing_details(connection, drbd_submit_peer_request);
	inc_unacked(device);
	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
				     fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");

out_free_e:
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return -EIO;
}

/**
 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
 *
 * Returns 1 (keep local data), -1 (discard local data), or -100 (disconnect)
 * according to the configured after-sb-0pri policy.
 */
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;
	enum drbd_after_sb_p after_sb_0p;

	/* the low bit of the bitmap UUID tracks who was primary last */
	self = device->ldev->md.uuid[UI_BITMAP] & 1;
	peer = device->p_uuid[UI_BITMAP] & 1;

	ch_peer =
		  device->p_uuid[UI_SIZE];
	ch_self = device->comm_bm_set;

	rcu_read_lock();
	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
	rcu_read_unlock();
	switch (after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
	case ASB_VIOLENTLY:
		/* these policies are only valid with remaining primaries */
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = 1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
			  "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			/* nothing changed on either side: break the tie by
			 * the resolve-conflicts flag */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv = 1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		/* else: fall through to discard-least-changes */
	case ASB_DISCARD_LEAST_CHG:
		if (ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv = 1;
		else /* ( ch_self == ch_peer ) */
			/* Well, then use something else. */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv = 1;
	}

	return rv;
}

/**
 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
 */
static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int hg, rv = -100;
	enum drbd_after_sb_p after_sb_1p;

	rcu_read_lock();
	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
	rcu_read_unlock();
	switch (after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_DISCARD_ZERO_CHG:
		/* these policies only make sense with zero primaries */
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
		/* only follow the 0p decision if it keeps the primary's data */
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_SECONDARY)
			rv = hg;
		if (hg == 1 && device->state.role == R_PRIMARY)
			rv = hg;
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(peer_device);
		break;
	case ASB_DISCARD_SECONDARY:
		return device->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either.
*/ 3093 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 3094 if (rv2 != SS_SUCCESS) { 3095 drbd_khelper(device, "pri-lost-after-sb"); 3096 } else { 3097 drbd_warn(device, "Successfully gave up primary role.\n"); 3098 rv = hg; 3099 } 3100 } else 3101 rv = hg; 3102 } 3103 3104 return rv; 3105 } 3106 3107 /** 3108 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries 3109 */ 3110 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local) 3111 { 3112 struct drbd_device *device = peer_device->device; 3113 int hg, rv = -100; 3114 enum drbd_after_sb_p after_sb_2p; 3115 3116 rcu_read_lock(); 3117 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p; 3118 rcu_read_unlock(); 3119 switch (after_sb_2p) { 3120 case ASB_DISCARD_YOUNGER_PRI: 3121 case ASB_DISCARD_OLDER_PRI: 3122 case ASB_DISCARD_LEAST_CHG: 3123 case ASB_DISCARD_LOCAL: 3124 case ASB_DISCARD_REMOTE: 3125 case ASB_CONSENSUS: 3126 case ASB_DISCARD_SECONDARY: 3127 case ASB_DISCARD_ZERO_CHG: 3128 drbd_err(device, "Configuration error.\n"); 3129 break; 3130 case ASB_VIOLENTLY: 3131 rv = drbd_asb_recover_0p(peer_device); 3132 break; 3133 case ASB_DISCONNECT: 3134 break; 3135 case ASB_CALL_HELPER: 3136 hg = drbd_asb_recover_0p(peer_device); 3137 if (hg == -1) { 3138 enum drbd_state_rv rv2; 3139 3140 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE, 3141 * we might be here in C_WF_REPORT_PARAMS which is transient. 3142 * we do not need to wait for the after state change work either. 
*/ 3143 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY)); 3144 if (rv2 != SS_SUCCESS) { 3145 drbd_khelper(device, "pri-lost-after-sb"); 3146 } else { 3147 drbd_warn(device, "Successfully gave up primary role.\n"); 3148 rv = hg; 3149 } 3150 } else 3151 rv = hg; 3152 } 3153 3154 return rv; 3155 } 3156 3157 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid, 3158 u64 bits, u64 flags) 3159 { 3160 if (!uuid) { 3161 drbd_info(device, "%s uuid info vanished while I was looking!\n", text); 3162 return; 3163 } 3164 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n", 3165 text, 3166 (unsigned long long)uuid[UI_CURRENT], 3167 (unsigned long long)uuid[UI_BITMAP], 3168 (unsigned long long)uuid[UI_HISTORY_START], 3169 (unsigned long long)uuid[UI_HISTORY_END], 3170 (unsigned long long)bits, 3171 (unsigned long long)flags); 3172 } 3173 3174 /* 3175 100 after split brain try auto recover 3176 2 C_SYNC_SOURCE set BitMap 3177 1 C_SYNC_SOURCE use BitMap 3178 0 no Sync 3179 -1 C_SYNC_TARGET use BitMap 3180 -2 C_SYNC_TARGET set BitMap 3181 -100 after split brain, disconnect 3182 -1000 unrelated data 3183 -1091 requires proto 91 3184 -1096 requires proto 96 3185 */ 3186 3187 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local) 3188 { 3189 struct drbd_peer_device *const peer_device = first_peer_device(device); 3190 struct drbd_connection *const connection = peer_device ? 
peer_device->connection : NULL; 3191 u64 self, peer; 3192 int i, j; 3193 3194 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 3195 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3196 3197 *rule_nr = 10; 3198 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED) 3199 return 0; 3200 3201 *rule_nr = 20; 3202 if ((self == UUID_JUST_CREATED || self == (u64)0) && 3203 peer != UUID_JUST_CREATED) 3204 return -2; 3205 3206 *rule_nr = 30; 3207 if (self != UUID_JUST_CREATED && 3208 (peer == UUID_JUST_CREATED || peer == (u64)0)) 3209 return 2; 3210 3211 if (self == peer) { 3212 int rct, dc; /* roles at crash time */ 3213 3214 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) { 3215 3216 if (connection->agreed_pro_version < 91) 3217 return -1091; 3218 3219 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) && 3220 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) { 3221 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n"); 3222 drbd_uuid_move_history(device); 3223 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP]; 3224 device->ldev->md.uuid[UI_BITMAP] = 0; 3225 3226 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3227 device->state.disk >= D_NEGOTIATING ? 
drbd_bm_total_weight(device) : 0, 0); 3228 *rule_nr = 34; 3229 } else { 3230 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n"); 3231 *rule_nr = 36; 3232 } 3233 3234 return 1; 3235 } 3236 3237 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) { 3238 3239 if (connection->agreed_pro_version < 91) 3240 return -1091; 3241 3242 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) && 3243 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) { 3244 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n"); 3245 3246 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START]; 3247 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP]; 3248 device->p_uuid[UI_BITMAP] = 0UL; 3249 3250 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3251 *rule_nr = 35; 3252 } else { 3253 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n"); 3254 *rule_nr = 37; 3255 } 3256 3257 return -1; 3258 } 3259 3260 /* Common power [off|failure] */ 3261 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) + 3262 (device->p_uuid[UI_FLAGS] & 2); 3263 /* lowest bit is set when we were primary, 3264 * next bit (weight 2) is set when peer was primary */ 3265 *rule_nr = 40; 3266 3267 /* Neither has the "crashed primary" flag set, 3268 * only a replication link hickup. */ 3269 if (rct == 0) 3270 return 0; 3271 3272 /* Current UUID equal and no bitmap uuid; does not necessarily 3273 * mean this was a "simultaneous hard crash", maybe IO was 3274 * frozen, so no UUID-bump happened. 3275 * This is a protocol change, overload DRBD_FF_WSAME as flag 3276 * for "new-enough" peer DRBD version. 
*/ 3277 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) { 3278 *rule_nr = 41; 3279 if (!(connection->agreed_features & DRBD_FF_WSAME)) { 3280 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n"); 3281 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8)); 3282 } 3283 if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) { 3284 /* At least one has the "crashed primary" bit set, 3285 * both are primary now, but neither has rotated its UUIDs? 3286 * "Can not happen." */ 3287 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n"); 3288 return -100; 3289 } 3290 if (device->state.role == R_PRIMARY) 3291 return 1; 3292 return -1; 3293 } 3294 3295 /* Both are secondary. 3296 * Really looks like recovery from simultaneous hard crash. 3297 * Check which had been primary before, and arbitrate. */ 3298 switch (rct) { 3299 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */ 3300 case 1: /* self_pri && !peer_pri */ return 1; 3301 case 2: /* !self_pri && peer_pri */ return -1; 3302 case 3: /* self_pri && peer_pri */ 3303 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags); 3304 return dc ? -1 : 1; 3305 } 3306 } 3307 3308 *rule_nr = 50; 3309 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3310 if (self == peer) 3311 return -1; 3312 3313 *rule_nr = 51; 3314 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1); 3315 if (self == peer) { 3316 if (connection->agreed_pro_version < 96 ? 3317 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == 3318 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) : 3319 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) { 3320 /* The last P_SYNC_UUID did not get though. Undo the last start of 3321 resync as sync source modifications of the peer's UUIDs. 
*/ 3322 3323 if (connection->agreed_pro_version < 91) 3324 return -1091; 3325 3326 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START]; 3327 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1]; 3328 3329 drbd_info(device, "Lost last syncUUID packet, corrected:\n"); 3330 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3331 3332 return -1; 3333 } 3334 } 3335 3336 *rule_nr = 60; 3337 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1); 3338 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3339 peer = device->p_uuid[i] & ~((u64)1); 3340 if (self == peer) 3341 return -2; 3342 } 3343 3344 *rule_nr = 70; 3345 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3346 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3347 if (self == peer) 3348 return 1; 3349 3350 *rule_nr = 71; 3351 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1); 3352 if (self == peer) { 3353 if (connection->agreed_pro_version < 96 ? 3354 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == 3355 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) : 3356 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) { 3357 /* The last P_SYNC_UUID did not get though. Undo the last start of 3358 resync as sync source modifications of our UUIDs. */ 3359 3360 if (connection->agreed_pro_version < 91) 3361 return -1091; 3362 3363 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]); 3364 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]); 3365 3366 drbd_info(device, "Last syncUUID did not get through, corrected:\n"); 3367 drbd_uuid_dump(device, "self", device->ldev->md.uuid, 3368 device->state.disk >= D_NEGOTIATING ? 
drbd_bm_total_weight(device) : 0, 0); 3369 3370 return 1; 3371 } 3372 } 3373 3374 3375 *rule_nr = 80; 3376 peer = device->p_uuid[UI_CURRENT] & ~((u64)1); 3377 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3378 self = device->ldev->md.uuid[i] & ~((u64)1); 3379 if (self == peer) 3380 return 2; 3381 } 3382 3383 *rule_nr = 90; 3384 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1); 3385 peer = device->p_uuid[UI_BITMAP] & ~((u64)1); 3386 if (self == peer && self != ((u64)0)) 3387 return 100; 3388 3389 *rule_nr = 100; 3390 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) { 3391 self = device->ldev->md.uuid[i] & ~((u64)1); 3392 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) { 3393 peer = device->p_uuid[j] & ~((u64)1); 3394 if (self == peer) 3395 return -100; 3396 } 3397 } 3398 3399 return -1000; 3400 } 3401 3402 /* drbd_sync_handshake() returns the new conn state on success, or 3403 CONN_MASK (-1) on failure. 3404 */ 3405 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device, 3406 enum drbd_role peer_role, 3407 enum drbd_disk_state peer_disk) __must_hold(local) 3408 { 3409 struct drbd_device *device = peer_device->device; 3410 enum drbd_conns rv = C_MASK; 3411 enum drbd_disk_state mydisk; 3412 struct net_conf *nc; 3413 int hg, rule_nr, rr_conflict, tentative; 3414 3415 mydisk = device->state.disk; 3416 if (mydisk == D_NEGOTIATING) 3417 mydisk = device->new_state_tmp.disk; 3418 3419 drbd_info(device, "drbd_sync_handshake:\n"); 3420 3421 spin_lock_irq(&device->ldev->md.uuid_lock); 3422 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0); 3423 drbd_uuid_dump(device, "peer", device->p_uuid, 3424 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]); 3425 3426 hg = drbd_uuid_compare(device, peer_role, &rule_nr); 3427 spin_unlock_irq(&device->ldev->md.uuid_lock); 3428 3429 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr); 3430 3431 if (hg == -1000) { 3432 drbd_alert(device, "Unrelated data, 
aborting!\n"); 3433 return C_MASK; 3434 } 3435 if (hg < -0x10000) { 3436 int proto, fflags; 3437 hg = -hg; 3438 proto = hg & 0xff; 3439 fflags = (hg >> 8) & 0xff; 3440 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n", 3441 proto, fflags); 3442 return C_MASK; 3443 } 3444 if (hg < -1000) { 3445 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000); 3446 return C_MASK; 3447 } 3448 3449 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) || 3450 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) { 3451 int f = (hg == -100) || abs(hg) == 2; 3452 hg = mydisk > D_INCONSISTENT ? 1 : -1; 3453 if (f) 3454 hg = hg*2; 3455 drbd_info(device, "Becoming sync %s due to disk states.\n", 3456 hg > 0 ? "source" : "target"); 3457 } 3458 3459 if (abs(hg) == 100) 3460 drbd_khelper(device, "initial-split-brain"); 3461 3462 rcu_read_lock(); 3463 nc = rcu_dereference(peer_device->connection->net_conf); 3464 3465 if (hg == 100 || (hg == -100 && nc->always_asbp)) { 3466 int pcount = (device->state.role == R_PRIMARY) 3467 + (peer_role == R_PRIMARY); 3468 int forced = (hg == -100); 3469 3470 switch (pcount) { 3471 case 0: 3472 hg = drbd_asb_recover_0p(peer_device); 3473 break; 3474 case 1: 3475 hg = drbd_asb_recover_1p(peer_device); 3476 break; 3477 case 2: 3478 hg = drbd_asb_recover_2p(peer_device); 3479 break; 3480 } 3481 if (abs(hg) < 100) { 3482 drbd_warn(device, "Split-Brain detected, %d primaries, " 3483 "automatically solved. Sync from %s node\n", 3484 pcount, (hg < 0) ? 
"peer" : "this"); 3485 if (forced) { 3486 drbd_warn(device, "Doing a full sync, since" 3487 " UUIDs where ambiguous.\n"); 3488 hg = hg*2; 3489 } 3490 } 3491 } 3492 3493 if (hg == -100) { 3494 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1)) 3495 hg = -1; 3496 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1)) 3497 hg = 1; 3498 3499 if (abs(hg) < 100) 3500 drbd_warn(device, "Split-Brain detected, manually solved. " 3501 "Sync from %s node\n", 3502 (hg < 0) ? "peer" : "this"); 3503 } 3504 rr_conflict = nc->rr_conflict; 3505 tentative = nc->tentative; 3506 rcu_read_unlock(); 3507 3508 if (hg == -100) { 3509 /* FIXME this log message is not correct if we end up here 3510 * after an attempted attach on a diskless node. 3511 * We just refuse to attach -- well, we drop the "connection" 3512 * to that disk, in a way... */ 3513 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n"); 3514 drbd_khelper(device, "split-brain"); 3515 return C_MASK; 3516 } 3517 3518 if (hg > 0 && mydisk <= D_INCONSISTENT) { 3519 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n"); 3520 return C_MASK; 3521 } 3522 3523 if (hg < 0 && /* by intention we do not use mydisk here. 
*/ 3524 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) { 3525 switch (rr_conflict) { 3526 case ASB_CALL_HELPER: 3527 drbd_khelper(device, "pri-lost"); 3528 /* fall through */ 3529 case ASB_DISCONNECT: 3530 drbd_err(device, "I shall become SyncTarget, but I am primary!\n"); 3531 return C_MASK; 3532 case ASB_VIOLENTLY: 3533 drbd_warn(device, "Becoming SyncTarget, violating the stable-data" 3534 "assumption\n"); 3535 } 3536 } 3537 3538 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) { 3539 if (hg == 0) 3540 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n"); 3541 else 3542 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.", 3543 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET), 3544 abs(hg) >= 2 ? "full" : "bit-map based"); 3545 return C_MASK; 3546 } 3547 3548 if (abs(hg) >= 2) { 3549 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n"); 3550 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake", 3551 BM_LOCKED_SET_ALLOWED)) 3552 return C_MASK; 3553 } 3554 3555 if (hg > 0) { /* become sync source. */ 3556 rv = C_WF_BITMAP_S; 3557 } else if (hg < 0) { /* become sync target */ 3558 rv = C_WF_BITMAP_T; 3559 } else { 3560 rv = C_CONNECTED; 3561 if (drbd_bm_total_weight(device)) { 3562 drbd_info(device, "No resync, but %lu bits in bitmap!\n", 3563 drbd_bm_total_weight(device)); 3564 } 3565 } 3566 3567 return rv; 3568 } 3569 3570 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer) 3571 { 3572 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */ 3573 if (peer == ASB_DISCARD_REMOTE) 3574 return ASB_DISCARD_LOCAL; 3575 3576 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */ 3577 if (peer == ASB_DISCARD_LOCAL) 3578 return ASB_DISCARD_REMOTE; 3579 3580 /* everything else is valid if they are equal on both sides. 
 */
	return peer;
}

/* Handle a P_PROTOCOL / P_PROTOCOL_UPDATE packet: verify that the peer's
 * protocol settings are compatible with ours (unless this is an update),
 * then install the peer-supplied values and the peer's data-integrity
 * digest into connection->net_conf under RCU. */
static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_protocol *p = pi->data;
	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_proto, p_discard_my_data, p_two_primaries, cf;
	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
	char integrity_alg[SHARED_SECRET_MAX] = "";
	struct crypto_ahash *peer_integrity_tfm = NULL;
	void *int_dig_in = NULL, *int_dig_vv = NULL;

	p_proto = be32_to_cpu(p->protocol);
	p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf = be32_to_cpu(p->conn_flags);
	p_discard_my_data = cf & CF_DISCARD_MY_DATA;

	if (connection->agreed_pro_version >= 87) {
		int err;

		/* protocol >= 87 appends the integrity-alg name; bound it
		 * by our buffer before receiving */
		if (pi->size > sizeof(integrity_alg))
			return -EIO;
		err = drbd_recv_all(connection, integrity_alg, pi->size);
		if (err)
			return err;
		/* force NUL termination even for an evil peer */
		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
	}

	if (pi->cmd != P_PROTOCOL_UPDATE) {
		clear_bit(CONN_DRY_RUN, &connection->flags);

		if (cf & CF_DRY_RUN)
			set_bit(CONN_DRY_RUN, &connection->flags);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);

		if (p_proto != nc->wire_protocol) {
			drbd_err(connection, "incompatible %s settings\n", "protocol");
			goto disconnect_rcu_unlock;
		}

		/* after-sb policies are compared through convert_after_sb()
		 * because discard-local/remote swap meaning across nodes */
		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
			goto disconnect_rcu_unlock;
		}

		/* discard-my-data on both sides would leave no survivor */
		if (p_discard_my_data && nc->discard_my_data) {
			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
			goto disconnect_rcu_unlock;
		}

		if (p_two_primaries != nc->two_primaries) {
			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
			goto disconnect_rcu_unlock;
		}

		if (strcmp(integrity_alg, nc->integrity_alg)) {
			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
			goto disconnect_rcu_unlock;
		}

		rcu_read_unlock();
	}

	if (integrity_alg[0]) {
		int hash_size;

		/*
		 * We can only change the peer data integrity algorithm
		 * here.  Changing our own data integrity algorithm
		 * requires that we send a P_PROTOCOL_UPDATE packet at
		 * the same time; otherwise, the peer has no way to
		 * tell between which packets the algorithm should
		 * change.
		 */

		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
		if (IS_ERR(peer_integrity_tfm)) {
			peer_integrity_tfm = NULL;
			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
				 integrity_alg);
			goto disconnect;
		}

		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
		if (!(int_dig_in && int_dig_vv)) {
			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
			goto disconnect;
		}
	}

	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
	if (!new_net_conf) {
		drbd_err(connection, "Allocation of new net_conf failed\n");
		goto disconnect;
	}

	/* data.mutex serializes against the sender; conf_update protects
	 * net_conf replacement */
	mutex_lock(&connection->data.mutex);
	mutex_lock(&connection->resource->conf_update);
	old_net_conf = connection->net_conf;
	*new_net_conf = *old_net_conf;

	new_net_conf->wire_protocol = p_proto;
	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
	new_net_conf->two_primaries = p_two_primaries;

	rcu_assign_pointer(connection->net_conf, new_net_conf);
	mutex_unlock(&connection->resource->conf_update);
	mutex_unlock(&connection->data.mutex);

	crypto_free_ahash(connection->peer_integrity_tfm);
	kfree(connection->int_dig_in);
	kfree(connection->int_dig_vv);
	connection->peer_integrity_tfm = peer_integrity_tfm;
	connection->int_dig_in = int_dig_in;
	connection->int_dig_vv = int_dig_vv;

	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
		drbd_info(connection, "peer data-integrity-alg: %s\n",
			  integrity_alg[0] ? integrity_alg : "(none)");

	/* wait for readers of old_net_conf before freeing it */
	synchronize_rcu();
	kfree(old_net_conf);
	return 0;

disconnect_rcu_unlock:
	rcu_read_unlock();
disconnect:
	crypto_free_ahash(peer_integrity_tfm);
	kfree(int_dig_in);
	kfree(int_dig_vv);
	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}

/* helper function
 * input: alg name, feature name
 * return: NULL (alg name was "")
 *         ERR_PTR(error) if something goes wrong
 *         or the crypto hash ptr, if it worked out ok. */
static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
		const char *alg, const char *name)
{
	struct crypto_ahash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			alg, name, PTR_ERR(tfm));
		return tfm;
	}
	return tfm;
}

/* Drain and discard pi->size bytes of payload from the data socket,
 * in DRBD_SOCKET_BUFFER_SIZE chunks.  Returns 0, a negative recv error,
 * or -EIO if the stream ended early. */
static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int size = pi->size;

	while (size) {
		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
		s = drbd_recv(connection, buffer, s);
		if (s <= 0) {
			if (s < 0)
				return s;
			break;
		}
		size -= s;
	}
	if (size)
		return -EIO;
	return 0;
}

/*
 * config_unknown_volume  -  device configuration command for unknown volume
 *
 * When a device is added to an existing connection, the node on which the
 * device is added first will send configuration commands to its peer but the
 * peer will not know about the device yet. It will warn and ignore these
 * commands. Once the device is added on the second node, the second node will
 * send the same device configuration commands, but in the other direction.
 *
 * (We can also end up here if drbd is misconfigured.)
 */
static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
		  cmdname(pi->cmd), pi->vnr);
	return ignore_remaining_packet(connection, pi);
}

/* Handle a P_SYNC_PARAM* packet: adopt the peer's resync parameters
 * (resync rate, c_* controller settings, verify/csums algorithms),
 * updating disk_conf/net_conf via RCU replacement.  The packet layout
 * depends on the agreed protocol version (apv). */
static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_param_95 *p;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_ahash *verify_tfm = NULL;
	struct crypto_ahash *csums_tfm = NULL;
	struct net_conf *old_net_conf, *new_net_conf = NULL;
	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
	const int apv = connection->agreed_pro_version;
	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
	int fifo_size = 0;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	/* maximum acceptable packet size for this protocol version */
	exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (pi->size > exp_max_sz) {
		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
			 pi->size, exp_max_sz);
		return -EIO;
	}

	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param);
		data_size = pi->size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	p = pi->data;
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	err = drbd_recv_all(peer_device->connection, p, header_size);
	if (err)
		return err;

	/* conf_update is held until success or one of the error labels */
	mutex_lock(&connection->resource->conf_update);
	old_net_conf = peer_device->connection->net_conf;
	if (get_ldev(device)) {
		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
		if (!new_disk_conf) {
			put_ldev(device);
			mutex_unlock(&connection->resource->conf_update);
			drbd_err(device, "Allocation of new disk_conf failed\n");
			return -ENOMEM;
		}

		old_disk_conf = device->ldev->disk_conf;
		*new_disk_conf = *old_disk_conf;

		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
	}

	if (apv >= 88) {
		if (apv == 88) {
			/* apv 88 sends the verify-alg name as trailing data */
			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
				drbd_err(device, "verify-alg of wrong size, "
					 "peer wants %u, accepting only up to %u byte\n",
					 data_size, SHARED_SECRET_MAX);
				err = -EIO;
				goto reconnect;
			}

			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
			if (err)
				goto reconnect;
			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
			/* the algorithm may only change while not connected */
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
				    old_net_conf->verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(device,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
				    old_net_conf->csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(device,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv > 94 && new_disk_conf) {
			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);

			/* resize the resync-controller plan fifo if the
			 * plan-ahead window changed */
			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != device->rs_plan_s->size) {
				new_plan = fifo_alloc(fifo_size);
				if (!new_plan) {
					drbd_err(device, "kmalloc of fifo_buffer failed");
					put_ldev(device);
					goto disconnect;
				}
			}
		}

		if (verify_tfm || csums_tfm) {
			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
			if (!new_net_conf) {
				drbd_err(device, "Allocation of new net_conf failed\n");
				goto disconnect;
			}

			*new_net_conf = *old_net_conf;

			if (verify_tfm) {
				strcpy(new_net_conf->verify_alg, p->verify_alg);
				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
				crypto_free_ahash(peer_device->connection->verify_tfm);
				peer_device->connection->verify_tfm = verify_tfm;
				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
			}
			if (csums_tfm) {
				strcpy(new_net_conf->csums_alg, p->csums_alg);
				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
				crypto_free_ahash(peer_device->connection->csums_tfm);
				peer_device->connection->csums_tfm = csums_tfm;
				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
			}
			rcu_assign_pointer(connection->net_conf, new_net_conf);
		}
	}

	if (new_disk_conf) {
		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
		put_ldev(device);
	}

	if (new_plan) {
		old_plan = device->rs_plan_s;
		rcu_assign_pointer(device->rs_plan_s, new_plan);
	}

	mutex_unlock(&connection->resource->conf_update);
	/* wait for RCU readers of the replaced structures before freeing */
	synchronize_rcu();
	if (new_net_conf)
		kfree(old_net_conf);
	kfree(old_disk_conf);
	kfree(old_plan);

	return 0;

reconnect:
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	return -EIO;

disconnect:
	kfree(new_plan);
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_ahash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_ahash(verify_tfm);
	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}

/* warn if the arguments differ by more than 12.5% */
static void warn_if_differ_considerably(struct drbd_device *device,
	const char *s, sector_t a, sector_t b)
{
	sector_t d;
	if (a == 0 || b == 0)
		return;
	d = (a > b) ? (a - b) : (b - a);
	if (d > (a>>3) || d > (b>>3))
		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
			  (unsigned long long)a, (unsigned long long)b);
}

/* Handle a P_SIZES packet: negotiate the device size with the peer and
 * trigger a resize/resync as needed. */
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_sizes *p = pi->data;
	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ?
p->qlim : NULL; 4026 enum determine_dev_size dd = DS_UNCHANGED; 4027 sector_t p_size, p_usize, p_csize, my_usize; 4028 int ldsc = 0; /* local disk size changed */ 4029 enum dds_flags ddsf; 4030 4031 peer_device = conn_peer_device(connection, pi->vnr); 4032 if (!peer_device) 4033 return config_unknown_volume(connection, pi); 4034 device = peer_device->device; 4035 4036 p_size = be64_to_cpu(p->d_size); 4037 p_usize = be64_to_cpu(p->u_size); 4038 p_csize = be64_to_cpu(p->c_size); 4039 4040 /* just store the peer's disk size for now. 4041 * we still need to figure out whether we accept that. */ 4042 device->p_size = p_size; 4043 4044 if (get_ldev(device)) { 4045 sector_t new_size, cur_size; 4046 rcu_read_lock(); 4047 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size; 4048 rcu_read_unlock(); 4049 4050 warn_if_differ_considerably(device, "lower level device sizes", 4051 p_size, drbd_get_max_capacity(device->ldev)); 4052 warn_if_differ_considerably(device, "user requested size", 4053 p_usize, my_usize); 4054 4055 /* if this is the first connect, or an otherwise expected 4056 * param exchange, choose the minimum */ 4057 if (device->state.conn == C_WF_REPORT_PARAMS) 4058 p_usize = min_not_zero(my_usize, p_usize); 4059 4060 /* Never shrink a device with usable data during connect. 4061 But allow online shrinking if we are connected. */ 4062 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0); 4063 cur_size = drbd_get_capacity(device->this_bdev); 4064 if (new_size < cur_size && 4065 device->state.disk >= D_OUTDATED && 4066 device->state.conn < C_CONNECTED) { 4067 drbd_err(device, "The peer's disk size is too small! 
(%llu < %llu sectors)\n", 4068 (unsigned long long)new_size, (unsigned long long)cur_size); 4069 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4070 put_ldev(device); 4071 return -EIO; 4072 } 4073 4074 if (my_usize != p_usize) { 4075 struct disk_conf *old_disk_conf, *new_disk_conf = NULL; 4076 4077 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL); 4078 if (!new_disk_conf) { 4079 drbd_err(device, "Allocation of new disk_conf failed\n"); 4080 put_ldev(device); 4081 return -ENOMEM; 4082 } 4083 4084 mutex_lock(&connection->resource->conf_update); 4085 old_disk_conf = device->ldev->disk_conf; 4086 *new_disk_conf = *old_disk_conf; 4087 new_disk_conf->disk_size = p_usize; 4088 4089 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf); 4090 mutex_unlock(&connection->resource->conf_update); 4091 synchronize_rcu(); 4092 kfree(old_disk_conf); 4093 4094 drbd_info(device, "Peer sets u_size to %lu sectors\n", 4095 (unsigned long)my_usize); 4096 } 4097 4098 put_ldev(device); 4099 } 4100 4101 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size); 4102 /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size(). 4103 In case we cleared the QUEUE_FLAG_DISCARD from our queue in 4104 drbd_reconsider_queue_parameters(), we can be sure that after 4105 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */ 4106 4107 ddsf = be16_to_cpu(p->dds_flags); 4108 if (get_ldev(device)) { 4109 drbd_reconsider_queue_parameters(device, device->ldev, o); 4110 dd = drbd_determine_dev_size(device, ddsf, NULL); 4111 put_ldev(device); 4112 if (dd == DS_ERROR) 4113 return -EIO; 4114 drbd_md_sync(device); 4115 } else { 4116 /* 4117 * I am diskless, need to accept the peer's *current* size. 4118 * I must NOT accept the peers backing disk size, 4119 * it may have been larger than mine all along... 4120 * 4121 * At this point, the peer knows more about my disk, or at 4122 * least about what we last agreed upon, than myself. 
4123 * So if his c_size is less than his d_size, the most likely 4124 * reason is that *my* d_size was smaller last time we checked. 4125 * 4126 * However, if he sends a zero current size, 4127 * take his (user-capped or) backing disk size anyways. 4128 */ 4129 drbd_reconsider_queue_parameters(device, NULL, o); 4130 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size); 4131 } 4132 4133 if (get_ldev(device)) { 4134 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) { 4135 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev); 4136 ldsc = 1; 4137 } 4138 4139 put_ldev(device); 4140 } 4141 4142 if (device->state.conn > C_WF_REPORT_PARAMS) { 4143 if (be64_to_cpu(p->c_size) != 4144 drbd_get_capacity(device->this_bdev) || ldsc) { 4145 /* we have different sizes, probably peer 4146 * needs to know my new size... */ 4147 drbd_send_sizes(peer_device, 0, ddsf); 4148 } 4149 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) || 4150 (dd == DS_GREW && device->state.conn == C_CONNECTED)) { 4151 if (device->state.pdsk >= D_INCONSISTENT && 4152 device->state.disk >= D_INCONSISTENT) { 4153 if (ddsf & DDSF_NO_RESYNC) 4154 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n"); 4155 else 4156 resync_after_online_grow(device); 4157 } else 4158 set_bit(RESYNC_AFTER_NEG, &device->flags); 4159 } 4160 } 4161 4162 return 0; 4163 } 4164 4165 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi) 4166 { 4167 struct drbd_peer_device *peer_device; 4168 struct drbd_device *device; 4169 struct p_uuids *p = pi->data; 4170 u64 *p_uuid; 4171 int i, updated_uuids = 0; 4172 4173 peer_device = conn_peer_device(connection, pi->vnr); 4174 if (!peer_device) 4175 return config_unknown_volume(connection, pi); 4176 device = peer_device->device; 4177 4178 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO); 4179 if (!p_uuid) { 4180 drbd_err(device, "kmalloc of p_uuid failed\n"); 4181 return false; 
4182 } 4183 4184 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++) 4185 p_uuid[i] = be64_to_cpu(p->uuid[i]); 4186 4187 kfree(device->p_uuid); 4188 device->p_uuid = p_uuid; 4189 4190 if (device->state.conn < C_CONNECTED && 4191 device->state.disk < D_INCONSISTENT && 4192 device->state.role == R_PRIMARY && 4193 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) { 4194 drbd_err(device, "Can only connect to data with current UUID=%016llX\n", 4195 (unsigned long long)device->ed_uuid); 4196 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD); 4197 return -EIO; 4198 } 4199 4200 if (get_ldev(device)) { 4201 int skip_initial_sync = 4202 device->state.conn == C_CONNECTED && 4203 peer_device->connection->agreed_pro_version >= 90 && 4204 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED && 4205 (p_uuid[UI_FLAGS] & 8); 4206 if (skip_initial_sync) { 4207 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n"); 4208 drbd_bitmap_io(device, &drbd_bmio_clear_n_write, 4209 "clear_n_write from receive_uuids", 4210 BM_LOCKED_TEST_ALLOWED); 4211 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]); 4212 _drbd_uuid_set(device, UI_BITMAP, 0); 4213 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE), 4214 CS_VERBOSE, NULL); 4215 drbd_md_sync(device); 4216 updated_uuids = 1; 4217 } 4218 put_ldev(device); 4219 } else if (device->state.disk < D_INCONSISTENT && 4220 device->state.role == R_PRIMARY) { 4221 /* I am a diskless primary, the peer just created a new current UUID 4222 for me. */ 4223 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 4224 } 4225 4226 /* Before we test for the disk state, we should wait until an eventually 4227 ongoing cluster wide state change is finished. That is important if 4228 we are primary and are detaching from our disk. We need to see the 4229 new disk state... 
*/ 4230 mutex_lock(device->state_mutex); 4231 mutex_unlock(device->state_mutex); 4232 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT) 4233 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]); 4234 4235 if (updated_uuids) 4236 drbd_print_uuids(device, "receiver updated UUIDs to"); 4237 4238 return 0; 4239 } 4240 4241 /** 4242 * convert_state() - Converts the peer's view of the cluster state to our point of view 4243 * @ps: The state as seen by the peer. 4244 */ 4245 static union drbd_state convert_state(union drbd_state ps) 4246 { 4247 union drbd_state ms; 4248 4249 static enum drbd_conns c_tab[] = { 4250 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS, 4251 [C_CONNECTED] = C_CONNECTED, 4252 4253 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T, 4254 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S, 4255 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */ 4256 [C_VERIFY_S] = C_VERIFY_T, 4257 [C_MASK] = C_MASK, 4258 }; 4259 4260 ms.i = ps.i; 4261 4262 ms.conn = c_tab[ps.conn]; 4263 ms.peer = ps.role; 4264 ms.role = ps.peer; 4265 ms.pdsk = ps.disk; 4266 ms.disk = ps.pdsk; 4267 ms.peer_isp = (ps.aftr_isp | ps.user_isp); 4268 4269 return ms; 4270 } 4271 4272 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi) 4273 { 4274 struct drbd_peer_device *peer_device; 4275 struct drbd_device *device; 4276 struct p_req_state *p = pi->data; 4277 union drbd_state mask, val; 4278 enum drbd_state_rv rv; 4279 4280 peer_device = conn_peer_device(connection, pi->vnr); 4281 if (!peer_device) 4282 return -EIO; 4283 device = peer_device->device; 4284 4285 mask.i = be32_to_cpu(p->mask); 4286 val.i = be32_to_cpu(p->val); 4287 4288 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) && 4289 mutex_is_locked(device->state_mutex)) { 4290 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG); 4291 return 0; 4292 } 4293 4294 mask = convert_state(mask); 4295 val = convert_state(val); 4296 4297 rv = drbd_change_state(device, 
CS_VERBOSE, mask, val); 4298 drbd_send_sr_reply(peer_device, rv); 4299 4300 drbd_md_sync(device); 4301 4302 return 0; 4303 } 4304 4305 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi) 4306 { 4307 struct p_req_state *p = pi->data; 4308 union drbd_state mask, val; 4309 enum drbd_state_rv rv; 4310 4311 mask.i = be32_to_cpu(p->mask); 4312 val.i = be32_to_cpu(p->val); 4313 4314 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) && 4315 mutex_is_locked(&connection->cstate_mutex)) { 4316 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG); 4317 return 0; 4318 } 4319 4320 mask = convert_state(mask); 4321 val = convert_state(val); 4322 4323 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL); 4324 conn_send_sr_reply(connection, rv); 4325 4326 return 0; 4327 } 4328 4329 static int receive_state(struct drbd_connection *connection, struct packet_info *pi) 4330 { 4331 struct drbd_peer_device *peer_device; 4332 struct drbd_device *device; 4333 struct p_state *p = pi->data; 4334 union drbd_state os, ns, peer_state; 4335 enum drbd_disk_state real_peer_disk; 4336 enum chg_state_flags cs_flags; 4337 int rv; 4338 4339 peer_device = conn_peer_device(connection, pi->vnr); 4340 if (!peer_device) 4341 return config_unknown_volume(connection, pi); 4342 device = peer_device->device; 4343 4344 peer_state.i = be32_to_cpu(p->state); 4345 4346 real_peer_disk = peer_state.disk; 4347 if (peer_state.disk == D_NEGOTIATING) { 4348 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? 
				D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&device->resource->req_lock);
 retry:
	/* snapshot our state; we re-check below and retry if it changed
	 * while we were working without the lock */
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (ack_receiver thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
		return -ECONNRESET;

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
			return 0;
		}
	}

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
		return 0;
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

		put_ldev(device);
		/* C_MASK signals the handshake could not agree on a direction */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
					return -EIO;
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&device->resource->req_lock);
	if (os.i != drbd_read_state(device).i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
		}
	}

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

	return 0;
}

/* Handle P_SYNC_UUID: adopt the peer's sync UUID and start resync as target.
 * Returns 0 (informational packet; ignored when not in a fitting state). */
static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_uuid *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	/* wait until we reached a state where acting on the packet makes
	 * sense (or it obviously no longer can) */
	wait_event(device->misc_wait,
		   device->state.conn == C_WF_SYNC_UUID ||
		   device->state.conn == C_BEHIND ||
		   device->state.conn < C_CONNECTED ||
		   device->state.disk < D_NEGOTIATING);

	/* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(device, D_NEGOTIATING)) {
		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(device, UI_BITMAP, 0UL);

		drbd_print_uuids(device, "updated sync uuid");
		drbd_start_resync(device, C_SYNC_TARGET);

		put_ldev(device);
	} else
		drbd_err(device, "Ignoring SyncUUID packet!\n");

	return 0;
}

/**
 * receive_bitmap_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
4547 */ 4548 static int 4549 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size, 4550 unsigned long *p, struct bm_xfer_ctx *c) 4551 { 4552 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - 4553 drbd_header_size(peer_device->connection); 4554 unsigned int num_words = min_t(size_t, data_size / sizeof(*p), 4555 c->bm_words - c->word_offset); 4556 unsigned int want = num_words * sizeof(*p); 4557 int err; 4558 4559 if (want != size) { 4560 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size); 4561 return -EIO; 4562 } 4563 if (want == 0) 4564 return 0; 4565 err = drbd_recv_all(peer_device->connection, p, want); 4566 if (err) 4567 return err; 4568 4569 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p); 4570 4571 c->word_offset += num_words; 4572 c->bit_offset = c->word_offset * BITS_PER_LONG; 4573 if (c->bit_offset > c->bm_bits) 4574 c->bit_offset = c->bm_bits; 4575 4576 return 1; 4577 } 4578 4579 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p) 4580 { 4581 return (enum drbd_bitmap_code)(p->encoding & 0x0f); 4582 } 4583 4584 static int dcbp_get_start(struct p_compressed_bm *p) 4585 { 4586 return (p->encoding & 0x80) != 0; 4587 } 4588 4589 static int dcbp_get_pad_bits(struct p_compressed_bm *p) 4590 { 4591 return (p->encoding >> 4) & 0x7; 4592 } 4593 4594 /** 4595 * recv_bm_rle_bits 4596 * 4597 * Return 0 when done, 1 when another iteration is needed, and a negative error 4598 * code upon failure. 
4599 */ 4600 static int 4601 recv_bm_rle_bits(struct drbd_peer_device *peer_device, 4602 struct p_compressed_bm *p, 4603 struct bm_xfer_ctx *c, 4604 unsigned int len) 4605 { 4606 struct bitstream bs; 4607 u64 look_ahead; 4608 u64 rl; 4609 u64 tmp; 4610 unsigned long s = c->bit_offset; 4611 unsigned long e; 4612 int toggle = dcbp_get_start(p); 4613 int have; 4614 int bits; 4615 4616 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p)); 4617 4618 bits = bitstream_get_bits(&bs, &look_ahead, 64); 4619 if (bits < 0) 4620 return -EIO; 4621 4622 for (have = bits; have > 0; s += rl, toggle = !toggle) { 4623 bits = vli_decode_bits(&rl, look_ahead); 4624 if (bits <= 0) 4625 return -EIO; 4626 4627 if (toggle) { 4628 e = s + rl -1; 4629 if (e >= c->bm_bits) { 4630 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e); 4631 return -EIO; 4632 } 4633 _drbd_bm_set_bits(peer_device->device, s, e); 4634 } 4635 4636 if (have < bits) { 4637 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n", 4638 have, bits, look_ahead, 4639 (unsigned int)(bs.cur.b - p->code), 4640 (unsigned int)bs.buf_len); 4641 return -EIO; 4642 } 4643 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */ 4644 if (likely(bits < 64)) 4645 look_ahead >>= bits; 4646 else 4647 look_ahead = 0; 4648 have -= bits; 4649 4650 bits = bitstream_get_bits(&bs, &tmp, 64 - have); 4651 if (bits < 0) 4652 return -EIO; 4653 look_ahead |= tmp << have; 4654 have += bits; 4655 } 4656 4657 c->bit_offset = s; 4658 bm_xfer_ctx_bit_to_word_offset(c); 4659 4660 return (s != c->bm_bits); 4661 } 4662 4663 /** 4664 * decode_bitmap_c 4665 * 4666 * Return 0 when done, 1 when another iteration is needed, and a negative error 4667 * code upon failure. 
4668 */ 4669 static int 4670 decode_bitmap_c(struct drbd_peer_device *peer_device, 4671 struct p_compressed_bm *p, 4672 struct bm_xfer_ctx *c, 4673 unsigned int len) 4674 { 4675 if (dcbp_get_code(p) == RLE_VLI_Bits) 4676 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p)); 4677 4678 /* other variants had been implemented for evaluation, 4679 * but have been dropped as this one turned out to be "best" 4680 * during all our tests. */ 4681 4682 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding); 4683 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 4684 return -EIO; 4685 } 4686 4687 void INFO_bm_xfer_stats(struct drbd_device *device, 4688 const char *direction, struct bm_xfer_ctx *c) 4689 { 4690 /* what would it take to transfer it "plaintext" */ 4691 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection); 4692 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size; 4693 unsigned int plain = 4694 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) + 4695 c->bm_words * sizeof(unsigned long); 4696 unsigned int total = c->bytes[0] + c->bytes[1]; 4697 unsigned int r; 4698 4699 /* total can not be zero. but just in case: */ 4700 if (total == 0) 4701 return; 4702 4703 /* don't report if not compressed */ 4704 if (total >= plain) 4705 return; 4706 4707 /* total < plain. check for overflow, still */ 4708 r = (total > UINT_MAX/1000) ? 
		(total / (plain/1000))
	      : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	/* express savings as permille, printed as a percentage with one decimal */
	r = 1000 - r;
	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
	     "total %u; compression: %u.%u%%\n",
			direction,
			c->bytes[1], c->packets[1],
			c->bytes[0], c->packets[0],
			total, r/10, r % 10);
}

/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if the process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   Returns 0 on success, negative error code on failure. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	/* loop over bitmap packets until the decoder reports "done" (0) */
	for(;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p = pi->data;

			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
				drbd_err(device, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			if (pi->size <= sizeof(*p)) {
				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
				err = -EIO;
				goto out;
			}
			err = drbd_recv_all(peer_device->connection, p, pi->size);
			if (err)
			       goto out;
			err = decode_bitmap_c(peer_device, p, &c, pi->size);
		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
			err = -EIO;
			goto out;
		}

		/* index 1 counts plain packets, index 0 compressed ones */
		c.packets[pi->cmd == P_BITMAP]++;
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}
		err = drbd_recv_header(peer_device->connection, pi);
		if (err)
			goto out;
	}

	INFO_bm_xfer_stats(device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		/* as sync target, reply with our own bitmap */
		err = drbd_send_bitmap(device);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(device, rv == SS_SUCCESS);
	} else if (device->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(device->state.conn));
	}
	err = 0;

 out:
	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
	return err;
}

/* Drain and ignore a packet type we know to be optional. */
static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
		 pi->cmd, pi->size);

	return ignore_remaining_packet(connection, pi);
}

static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(connection->data.socket);

	return 0;
}

/* Handle P_OUT_OF_SYNC: mark the given range out-of-sync in our bitmap. */
static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_desc *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	/* only expected in these connection states; complain otherwise
	 * but still record the out-of-sync range */
	switch (device->state.conn) {
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
		break;
	default:
		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
				drbd_conn_str(device->state.conn));
	}

	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));

	return 0;
}

/* Handle P_RS_DEALLOCATED: the resync source reports a range as deallocated
 * (thinly provisioned); discard it locally instead of writing zeroes. */
static int receive_rs_deallocated(struct drbd_connection *connection, struct
				  packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct p_block_desc *p = pi->data;
	struct drbd_device *device;
	sector_t sector;
	int size, err = 0;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	dec_rs_pending(device);

	if (get_ldev(device)) {
		struct drbd_peer_request *peer_req;
		const int op = REQ_OP_DISCARD;

		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
					       size, 0, GFP_NOIO);
		if (!peer_req) {
			put_ldev(device);
			return -ENOMEM;
		}

		peer_req->w.cb = e_end_resync_block;
		peer_req->submit_jif = jiffies;
		peer_req->flags |= EE_IS_TRIM;

		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->sync_ee);
		spin_unlock_irq(&device->resource->req_lock);

		atomic_add(pi->size >> 9, &device->rs_sect_ev);
		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);

		if (err) {
			/* submission failed: unhook the request and fall back
			 * to negatively acknowledging the range */
			spin_lock_irq(&device->resource->req_lock);
			list_del(&peer_req->w.list);
			spin_unlock_irq(&device->resource->req_lock);

			drbd_free_peer_req(device, peer_req);
			put_ldev(device);
			err = 0;
			goto fail;
		}

		inc_unacked(device);

		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
		   as well as drbd_rs_complete_io() */
	} else {
	fail:
		drbd_rs_complete_io(device, sector);
		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
	}

	atomic_add(size >> 9, &device->rs_sect_in);

	return err;
}

/* dispatch table entry for one packet type on the data socket */
struct data_cmd {
	int expect_payload;		/* may carry payload beyond pkt_size */
	unsigned int pkt_size;		/* fixed sub-header size to receive first */
	int (*fn)(struct drbd_connection *, struct packet_info *);
};

static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
};

/* Receiver main loop: read packet headers off the data socket and dispatch
 * to the handler table above until told to stop or a protocol error occurs. */
static void drbdd(struct drbd_connection *connection)
{
	struct packet_info pi;
	size_t shs; /* sub header size */
	int err;

	while (get_t_state(&connection->receiver) == RUNNING) {
		struct data_cmd const *cmd;

		drbd_thread_current_set_cpu(&connection->receiver);
		update_receiver_timing_details(connection, drbd_recv_header);
		if (drbd_recv_header(connection, &pi))
			goto err_out;

		/* NOTE(review): the element address is computed before the
		 * bounds check; cmd->fn is only read after the check
		 * short-circuits, so no out-of-bounds access occurs */
		cmd = &drbd_cmd_handler[pi.cmd];
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
				 cmdname(pi.cmd), pi.cmd);
			goto err_out;
		}

		shs = cmd->pkt_size;
		/* P_SIZES grows by an o_qlim when WSAME was negotiated */
		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
			shs += sizeof(struct o_qlim);
		if (pi.size > shs && !cmd->expect_payload) {
			drbd_err(connection, "No payload expected %s l:%d\n",
				 cmdname(pi.cmd), pi.size);
			goto err_out;
		}
		if (pi.size < shs) {
			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
				 cmdname(pi.cmd), (int)shs, pi.size);
			goto err_out;
		}

		if (shs) {
			update_receiver_timing_details(connection, drbd_recv_all_warn);
			err = drbd_recv_all_warn(connection, pi.data, shs);
			if (err)
				goto err_out;
			/* handlers see only the remaining (payload) size */
			pi.size -= shs;
		}

		update_receiver_timing_details(connection, cmd->fn);
		err = cmd->fn(connection, &pi);
		if (err) {
drbd_err(connection, "error receiving %s, e: %d l: %d!\n", 5012 cmdname(pi.cmd), err, pi.size); 5013 goto err_out; 5014 } 5015 } 5016 return; 5017 5018 err_out: 5019 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD); 5020 } 5021 5022 static void conn_disconnect(struct drbd_connection *connection) 5023 { 5024 struct drbd_peer_device *peer_device; 5025 enum drbd_conns oc; 5026 int vnr; 5027 5028 if (connection->cstate == C_STANDALONE) 5029 return; 5030 5031 /* We are about to start the cleanup after connection loss. 5032 * Make sure drbd_make_request knows about that. 5033 * Usually we should be in some network failure state already, 5034 * but just in case we are not, we fix it up here. 5035 */ 5036 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 5037 5038 /* ack_receiver does not clean up anything. it must not interfere, either */ 5039 drbd_thread_stop(&connection->ack_receiver); 5040 if (connection->ack_sender) { 5041 destroy_workqueue(connection->ack_sender); 5042 connection->ack_sender = NULL; 5043 } 5044 drbd_free_sock(connection); 5045 5046 rcu_read_lock(); 5047 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5048 struct drbd_device *device = peer_device->device; 5049 kref_get(&device->kref); 5050 rcu_read_unlock(); 5051 drbd_disconnected(peer_device); 5052 kref_put(&device->kref, drbd_destroy_device); 5053 rcu_read_lock(); 5054 } 5055 rcu_read_unlock(); 5056 5057 if (!list_empty(&connection->current_epoch->list)) 5058 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n"); 5059 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */ 5060 atomic_set(&connection->current_epoch->epoch_size, 0); 5061 connection->send.seen_any_write_yet = false; 5062 5063 drbd_info(connection, "Connection closed\n"); 5064 5065 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN) 5066 conn_try_outdate_peer_async(connection); 5067 5068 
spin_lock_irq(&connection->resource->req_lock); 5069 oc = connection->cstate; 5070 if (oc >= C_UNCONNECTED) 5071 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE); 5072 5073 spin_unlock_irq(&connection->resource->req_lock); 5074 5075 if (oc == C_DISCONNECTING) 5076 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD); 5077 } 5078 5079 static int drbd_disconnected(struct drbd_peer_device *peer_device) 5080 { 5081 struct drbd_device *device = peer_device->device; 5082 unsigned int i; 5083 5084 /* wait for current activity to cease. */ 5085 spin_lock_irq(&device->resource->req_lock); 5086 _drbd_wait_ee_list_empty(device, &device->active_ee); 5087 _drbd_wait_ee_list_empty(device, &device->sync_ee); 5088 _drbd_wait_ee_list_empty(device, &device->read_ee); 5089 spin_unlock_irq(&device->resource->req_lock); 5090 5091 /* We do not have data structures that would allow us to 5092 * get the rs_pending_cnt down to 0 again. 5093 * * On C_SYNC_TARGET we do not have any data structures describing 5094 * the pending RSDataRequest's we have sent. 5095 * * On C_SYNC_SOURCE there is no data structure that tracks 5096 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget. 5097 * And no, it is not the sum of the reference counts in the 5098 * resync_LRU. The resync_LRU tracks the whole operation including 5099 * the disk-IO, while the rs_pending_cnt only tracks the blocks 5100 * on the fly. */ 5101 drbd_rs_cancel_all(device); 5102 device->rs_total = 0; 5103 device->rs_failed = 0; 5104 atomic_set(&device->rs_pending_cnt, 0); 5105 wake_up(&device->misc_wait); 5106 5107 del_timer_sync(&device->resync_timer); 5108 resync_timer_fn((unsigned long)device); 5109 5110 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier, 5111 * w_make_resync_request etc. 
which may still be on the worker queue 5112 * to be "canceled" */ 5113 drbd_flush_workqueue(&peer_device->connection->sender_work); 5114 5115 drbd_finish_peer_reqs(device); 5116 5117 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs() 5118 might have issued a work again. The one before drbd_finish_peer_reqs() is 5119 necessary to reclain net_ee in drbd_finish_peer_reqs(). */ 5120 drbd_flush_workqueue(&peer_device->connection->sender_work); 5121 5122 /* need to do it again, drbd_finish_peer_reqs() may have populated it 5123 * again via drbd_try_clear_on_disk_bm(). */ 5124 drbd_rs_cancel_all(device); 5125 5126 kfree(device->p_uuid); 5127 device->p_uuid = NULL; 5128 5129 if (!drbd_suspended(device)) 5130 tl_clear(peer_device->connection); 5131 5132 drbd_md_sync(device); 5133 5134 if (get_ldev(device)) { 5135 drbd_bitmap_io(device, &drbd_bm_write_copy_pages, 5136 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED); 5137 put_ldev(device); 5138 } 5139 5140 /* tcp_close and release of sendpage pages can be deferred. I don't 5141 * want to use SO_LINGER, because apparently it can be deferred for 5142 * more than 20 seconds (longest time I checked). 5143 * 5144 * Actually we don't care for exactly when the network stack does its 5145 * put_page(), but release our reference on these pages right here. 
5146 */ 5147 i = drbd_free_peer_reqs(device, &device->net_ee); 5148 if (i) 5149 drbd_info(device, "net_ee not empty, killed %u entries\n", i); 5150 i = atomic_read(&device->pp_in_use_by_net); 5151 if (i) 5152 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i); 5153 i = atomic_read(&device->pp_in_use); 5154 if (i) 5155 drbd_info(device, "pp_in_use = %d, expected 0\n", i); 5156 5157 D_ASSERT(device, list_empty(&device->read_ee)); 5158 D_ASSERT(device, list_empty(&device->active_ee)); 5159 D_ASSERT(device, list_empty(&device->sync_ee)); 5160 D_ASSERT(device, list_empty(&device->done_ee)); 5161 5162 return 0; 5163 } 5164 5165 /* 5166 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version 5167 * we can agree on is stored in agreed_pro_version. 5168 * 5169 * feature flags and the reserved array should be enough room for future 5170 * enhancements of the handshake protocol, and possible plugins... 5171 * 5172 * for now, they are expected to be zero, but ignored. 5173 */ 5174 static int drbd_send_features(struct drbd_connection *connection) 5175 { 5176 struct drbd_socket *sock; 5177 struct p_connection_features *p; 5178 5179 sock = &connection->data; 5180 p = conn_prepare_command(connection, sock); 5181 if (!p) 5182 return -EIO; 5183 memset(p, 0, sizeof(*p)); 5184 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN); 5185 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX); 5186 p->feature_flags = cpu_to_be32(PRO_FEATURES); 5187 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0); 5188 } 5189 5190 /* 5191 * return values: 5192 * 1 yes, we have a valid connection 5193 * 0 oops, did not work out, please try again 5194 * -1 peer talks different language, 5195 * no point in trying again, please go standalone. 5196 */ 5197 static int drbd_do_features(struct drbd_connection *connection) 5198 { 5199 /* ASSERT current == connection->receiver ... 
*/ 5200 struct p_connection_features *p; 5201 const int expect = sizeof(struct p_connection_features); 5202 struct packet_info pi; 5203 int err; 5204 5205 err = drbd_send_features(connection); 5206 if (err) 5207 return 0; 5208 5209 err = drbd_recv_header(connection, &pi); 5210 if (err) 5211 return 0; 5212 5213 if (pi.cmd != P_CONNECTION_FEATURES) { 5214 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n", 5215 cmdname(pi.cmd), pi.cmd); 5216 return -1; 5217 } 5218 5219 if (pi.size != expect) { 5220 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n", 5221 expect, pi.size); 5222 return -1; 5223 } 5224 5225 p = pi.data; 5226 err = drbd_recv_all_warn(connection, p, expect); 5227 if (err) 5228 return 0; 5229 5230 p->protocol_min = be32_to_cpu(p->protocol_min); 5231 p->protocol_max = be32_to_cpu(p->protocol_max); 5232 if (p->protocol_max == 0) 5233 p->protocol_max = p->protocol_min; 5234 5235 if (PRO_VERSION_MAX < p->protocol_min || 5236 PRO_VERSION_MIN > p->protocol_max) 5237 goto incompat; 5238 5239 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max); 5240 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags); 5241 5242 drbd_info(connection, "Handshake successful: " 5243 "Agreed network protocol version %d\n", connection->agreed_pro_version); 5244 5245 drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n", 5246 connection->agreed_features, 5247 connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "", 5248 connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "", 5249 connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : 5250 connection->agreed_features ? 
"" : " none"); 5251 5252 return 1; 5253 5254 incompat: 5255 drbd_err(connection, "incompatible DRBD dialects: " 5256 "I support %d-%d, peer supports %d-%d\n", 5257 PRO_VERSION_MIN, PRO_VERSION_MAX, 5258 p->protocol_min, p->protocol_max); 5259 return -1; 5260 } 5261 5262 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE) 5263 static int drbd_do_auth(struct drbd_connection *connection) 5264 { 5265 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n"); 5266 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n"); 5267 return -1; 5268 } 5269 #else 5270 #define CHALLENGE_LEN 64 5271 5272 /* Return value: 5273 1 - auth succeeded, 5274 0 - failed, try again (network error), 5275 -1 - auth failed, don't try again. 5276 */ 5277 5278 static int drbd_do_auth(struct drbd_connection *connection) 5279 { 5280 struct drbd_socket *sock; 5281 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */ 5282 char *response = NULL; 5283 char *right_response = NULL; 5284 char *peers_ch = NULL; 5285 unsigned int key_len; 5286 char secret[SHARED_SECRET_MAX]; /* 64 byte */ 5287 unsigned int resp_size; 5288 SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm); 5289 struct packet_info pi; 5290 struct net_conf *nc; 5291 int err, rv; 5292 5293 /* FIXME: Put the challenge/response into the preallocated socket buffer. 
*/ 5294 5295 rcu_read_lock(); 5296 nc = rcu_dereference(connection->net_conf); 5297 key_len = strlen(nc->shared_secret); 5298 memcpy(secret, nc->shared_secret, key_len); 5299 rcu_read_unlock(); 5300 5301 desc->tfm = connection->cram_hmac_tfm; 5302 desc->flags = 0; 5303 5304 rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len); 5305 if (rv) { 5306 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv); 5307 rv = -1; 5308 goto fail; 5309 } 5310 5311 get_random_bytes(my_challenge, CHALLENGE_LEN); 5312 5313 sock = &connection->data; 5314 if (!conn_prepare_command(connection, sock)) { 5315 rv = 0; 5316 goto fail; 5317 } 5318 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0, 5319 my_challenge, CHALLENGE_LEN); 5320 if (!rv) 5321 goto fail; 5322 5323 err = drbd_recv_header(connection, &pi); 5324 if (err) { 5325 rv = 0; 5326 goto fail; 5327 } 5328 5329 if (pi.cmd != P_AUTH_CHALLENGE) { 5330 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n", 5331 cmdname(pi.cmd), pi.cmd); 5332 rv = 0; 5333 goto fail; 5334 } 5335 5336 if (pi.size > CHALLENGE_LEN * 2) { 5337 drbd_err(connection, "expected AuthChallenge payload too big.\n"); 5338 rv = -1; 5339 goto fail; 5340 } 5341 5342 if (pi.size < CHALLENGE_LEN) { 5343 drbd_err(connection, "AuthChallenge payload too small.\n"); 5344 rv = -1; 5345 goto fail; 5346 } 5347 5348 peers_ch = kmalloc(pi.size, GFP_NOIO); 5349 if (peers_ch == NULL) { 5350 drbd_err(connection, "kmalloc of peers_ch failed\n"); 5351 rv = -1; 5352 goto fail; 5353 } 5354 5355 err = drbd_recv_all_warn(connection, peers_ch, pi.size); 5356 if (err) { 5357 rv = 0; 5358 goto fail; 5359 } 5360 5361 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) { 5362 drbd_err(connection, "Peer presented the same challenge!\n"); 5363 rv = -1; 5364 goto fail; 5365 } 5366 5367 resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm); 5368 response = kmalloc(resp_size, GFP_NOIO); 5369 if (response == NULL) { 5370 
drbd_err(connection, "kmalloc of response failed\n"); 5371 rv = -1; 5372 goto fail; 5373 } 5374 5375 rv = crypto_shash_digest(desc, peers_ch, pi.size, response); 5376 if (rv) { 5377 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5378 rv = -1; 5379 goto fail; 5380 } 5381 5382 if (!conn_prepare_command(connection, sock)) { 5383 rv = 0; 5384 goto fail; 5385 } 5386 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0, 5387 response, resp_size); 5388 if (!rv) 5389 goto fail; 5390 5391 err = drbd_recv_header(connection, &pi); 5392 if (err) { 5393 rv = 0; 5394 goto fail; 5395 } 5396 5397 if (pi.cmd != P_AUTH_RESPONSE) { 5398 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n", 5399 cmdname(pi.cmd), pi.cmd); 5400 rv = 0; 5401 goto fail; 5402 } 5403 5404 if (pi.size != resp_size) { 5405 drbd_err(connection, "expected AuthResponse payload of wrong size\n"); 5406 rv = 0; 5407 goto fail; 5408 } 5409 5410 err = drbd_recv_all_warn(connection, response , resp_size); 5411 if (err) { 5412 rv = 0; 5413 goto fail; 5414 } 5415 5416 right_response = kmalloc(resp_size, GFP_NOIO); 5417 if (right_response == NULL) { 5418 drbd_err(connection, "kmalloc of right_response failed\n"); 5419 rv = -1; 5420 goto fail; 5421 } 5422 5423 rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN, 5424 right_response); 5425 if (rv) { 5426 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv); 5427 rv = -1; 5428 goto fail; 5429 } 5430 5431 rv = !memcmp(response, right_response, resp_size); 5432 5433 if (rv) 5434 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n", 5435 resp_size); 5436 else 5437 rv = -1; 5438 5439 fail: 5440 kfree(peers_ch); 5441 kfree(response); 5442 kfree(right_response); 5443 shash_desc_zero(desc); 5444 5445 return rv; 5446 } 5447 #endif 5448 5449 int drbd_receiver(struct drbd_thread *thi) 5450 { 5451 struct drbd_connection *connection = thi->connection; 5452 int h; 5453 5454 drbd_info(connection, 
"receiver (re)started\n"); 5455 5456 do { 5457 h = conn_connect(connection); 5458 if (h == 0) { 5459 conn_disconnect(connection); 5460 schedule_timeout_interruptible(HZ); 5461 } 5462 if (h == -1) { 5463 drbd_warn(connection, "Discarding network configuration.\n"); 5464 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 5465 } 5466 } while (h == 0); 5467 5468 if (h > 0) 5469 drbdd(connection); 5470 5471 conn_disconnect(connection); 5472 5473 drbd_info(connection, "receiver terminated\n"); 5474 return 0; 5475 } 5476 5477 /* ********* acknowledge sender ******** */ 5478 5479 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5480 { 5481 struct p_req_state_reply *p = pi->data; 5482 int retcode = be32_to_cpu(p->retcode); 5483 5484 if (retcode >= SS_SUCCESS) { 5485 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags); 5486 } else { 5487 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags); 5488 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n", 5489 drbd_set_st_err_str(retcode), retcode); 5490 } 5491 wake_up(&connection->ping_wait); 5492 5493 return 0; 5494 } 5495 5496 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi) 5497 { 5498 struct drbd_peer_device *peer_device; 5499 struct drbd_device *device; 5500 struct p_req_state_reply *p = pi->data; 5501 int retcode = be32_to_cpu(p->retcode); 5502 5503 peer_device = conn_peer_device(connection, pi->vnr); 5504 if (!peer_device) 5505 return -EIO; 5506 device = peer_device->device; 5507 5508 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) { 5509 D_ASSERT(device, connection->agreed_pro_version < 100); 5510 return got_conn_RqSReply(connection, pi); 5511 } 5512 5513 if (retcode >= SS_SUCCESS) { 5514 set_bit(CL_ST_CHG_SUCCESS, &device->flags); 5515 } else { 5516 set_bit(CL_ST_CHG_FAIL, &device->flags); 5517 drbd_err(device, "Requested state change failed by peer: %s (%d)\n", 5518 drbd_set_st_err_str(retcode), retcode); 5519 } 
5520 wake_up(&device->state_wait); 5521 5522 return 0; 5523 } 5524 5525 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi) 5526 { 5527 return drbd_send_ping_ack(connection); 5528 5529 } 5530 5531 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi) 5532 { 5533 /* restore idle timeout */ 5534 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ; 5535 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags)) 5536 wake_up(&connection->ping_wait); 5537 5538 return 0; 5539 } 5540 5541 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi) 5542 { 5543 struct drbd_peer_device *peer_device; 5544 struct drbd_device *device; 5545 struct p_block_ack *p = pi->data; 5546 sector_t sector = be64_to_cpu(p->sector); 5547 int blksize = be32_to_cpu(p->blksize); 5548 5549 peer_device = conn_peer_device(connection, pi->vnr); 5550 if (!peer_device) 5551 return -EIO; 5552 device = peer_device->device; 5553 5554 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89); 5555 5556 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5557 5558 if (get_ldev(device)) { 5559 drbd_rs_complete_io(device, sector); 5560 drbd_set_in_sync(device, sector, blksize); 5561 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */ 5562 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT); 5563 put_ldev(device); 5564 } 5565 dec_rs_pending(device); 5566 atomic_add(blksize >> 9, &device->rs_sect_in); 5567 5568 return 0; 5569 } 5570 5571 static int 5572 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector, 5573 struct rb_root *root, const char *func, 5574 enum drbd_req_event what, bool missing_ok) 5575 { 5576 struct drbd_request *req; 5577 struct bio_and_error m; 5578 5579 spin_lock_irq(&device->resource->req_lock); 5580 req = find_request(device, root, id, sector, missing_ok, func); 5581 if (unlikely(!req)) { 5582 
spin_unlock_irq(&device->resource->req_lock); 5583 return -EIO; 5584 } 5585 __req_mod(req, what, &m); 5586 spin_unlock_irq(&device->resource->req_lock); 5587 5588 if (m.bio) 5589 complete_master_bio(device, &m); 5590 return 0; 5591 } 5592 5593 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi) 5594 { 5595 struct drbd_peer_device *peer_device; 5596 struct drbd_device *device; 5597 struct p_block_ack *p = pi->data; 5598 sector_t sector = be64_to_cpu(p->sector); 5599 int blksize = be32_to_cpu(p->blksize); 5600 enum drbd_req_event what; 5601 5602 peer_device = conn_peer_device(connection, pi->vnr); 5603 if (!peer_device) 5604 return -EIO; 5605 device = peer_device->device; 5606 5607 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5608 5609 if (p->block_id == ID_SYNCER) { 5610 drbd_set_in_sync(device, sector, blksize); 5611 dec_rs_pending(device); 5612 return 0; 5613 } 5614 switch (pi->cmd) { 5615 case P_RS_WRITE_ACK: 5616 what = WRITE_ACKED_BY_PEER_AND_SIS; 5617 break; 5618 case P_WRITE_ACK: 5619 what = WRITE_ACKED_BY_PEER; 5620 break; 5621 case P_RECV_ACK: 5622 what = RECV_ACKED_BY_PEER; 5623 break; 5624 case P_SUPERSEDED: 5625 what = CONFLICT_RESOLVED; 5626 break; 5627 case P_RETRY_WRITE: 5628 what = POSTPONE_WRITE; 5629 break; 5630 default: 5631 BUG(); 5632 } 5633 5634 return validate_req_change_req_state(device, p->block_id, sector, 5635 &device->write_requests, __func__, 5636 what, false); 5637 } 5638 5639 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi) 5640 { 5641 struct drbd_peer_device *peer_device; 5642 struct drbd_device *device; 5643 struct p_block_ack *p = pi->data; 5644 sector_t sector = be64_to_cpu(p->sector); 5645 int size = be32_to_cpu(p->blksize); 5646 int err; 5647 5648 peer_device = conn_peer_device(connection, pi->vnr); 5649 if (!peer_device) 5650 return -EIO; 5651 device = peer_device->device; 5652 5653 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5654 5655 if 
(p->block_id == ID_SYNCER) { 5656 dec_rs_pending(device); 5657 drbd_rs_failed_io(device, sector, size); 5658 return 0; 5659 } 5660 5661 err = validate_req_change_req_state(device, p->block_id, sector, 5662 &device->write_requests, __func__, 5663 NEG_ACKED, true); 5664 if (err) { 5665 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs. 5666 The master bio might already be completed, therefore the 5667 request is no longer in the collision hash. */ 5668 /* In Protocol B we might already have got a P_RECV_ACK 5669 but then get a P_NEG_ACK afterwards. */ 5670 drbd_set_out_of_sync(device, sector, size); 5671 } 5672 return 0; 5673 } 5674 5675 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi) 5676 { 5677 struct drbd_peer_device *peer_device; 5678 struct drbd_device *device; 5679 struct p_block_ack *p = pi->data; 5680 sector_t sector = be64_to_cpu(p->sector); 5681 5682 peer_device = conn_peer_device(connection, pi->vnr); 5683 if (!peer_device) 5684 return -EIO; 5685 device = peer_device->device; 5686 5687 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5688 5689 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n", 5690 (unsigned long long)sector, be32_to_cpu(p->blksize)); 5691 5692 return validate_req_change_req_state(device, p->block_id, sector, 5693 &device->read_requests, __func__, 5694 NEG_ACKED, false); 5695 } 5696 5697 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi) 5698 { 5699 struct drbd_peer_device *peer_device; 5700 struct drbd_device *device; 5701 sector_t sector; 5702 int size; 5703 struct p_block_ack *p = pi->data; 5704 5705 peer_device = conn_peer_device(connection, pi->vnr); 5706 if (!peer_device) 5707 return -EIO; 5708 device = peer_device->device; 5709 5710 sector = be64_to_cpu(p->sector); 5711 size = be32_to_cpu(p->blksize); 5712 5713 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5714 5715 dec_rs_pending(device); 5716 5717 if (get_ldev_if_state(device, 
D_FAILED)) { 5718 drbd_rs_complete_io(device, sector); 5719 switch (pi->cmd) { 5720 case P_NEG_RS_DREPLY: 5721 drbd_rs_failed_io(device, sector, size); 5722 case P_RS_CANCEL: 5723 break; 5724 default: 5725 BUG(); 5726 } 5727 put_ldev(device); 5728 } 5729 5730 return 0; 5731 } 5732 5733 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi) 5734 { 5735 struct p_barrier_ack *p = pi->data; 5736 struct drbd_peer_device *peer_device; 5737 int vnr; 5738 5739 tl_release(connection, p->barrier, be32_to_cpu(p->set_size)); 5740 5741 rcu_read_lock(); 5742 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 5743 struct drbd_device *device = peer_device->device; 5744 5745 if (device->state.conn == C_AHEAD && 5746 atomic_read(&device->ap_in_flight) == 0 && 5747 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) { 5748 device->start_resync_timer.expires = jiffies + HZ; 5749 add_timer(&device->start_resync_timer); 5750 } 5751 } 5752 rcu_read_unlock(); 5753 5754 return 0; 5755 } 5756 5757 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi) 5758 { 5759 struct drbd_peer_device *peer_device; 5760 struct drbd_device *device; 5761 struct p_block_ack *p = pi->data; 5762 struct drbd_device_work *dw; 5763 sector_t sector; 5764 int size; 5765 5766 peer_device = conn_peer_device(connection, pi->vnr); 5767 if (!peer_device) 5768 return -EIO; 5769 device = peer_device->device; 5770 5771 sector = be64_to_cpu(p->sector); 5772 size = be32_to_cpu(p->blksize); 5773 5774 update_peer_seq(peer_device, be32_to_cpu(p->seq_num)); 5775 5776 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC) 5777 drbd_ov_out_of_sync_found(device, sector, size); 5778 else 5779 ov_out_of_sync_print(device); 5780 5781 if (!get_ldev(device)) 5782 return 0; 5783 5784 drbd_rs_complete_io(device, sector); 5785 dec_rs_pending(device); 5786 5787 --device->ov_left; 5788 5789 /* let's advance progress step marks only for every other megabyte */ 5790 if 
((device->ov_left & 0x200) == 0x200) 5791 drbd_advance_rs_marks(device, device->ov_left); 5792 5793 if (device->ov_left == 0) { 5794 dw = kmalloc(sizeof(*dw), GFP_NOIO); 5795 if (dw) { 5796 dw->w.cb = w_ov_finished; 5797 dw->device = device; 5798 drbd_queue_work(&peer_device->connection->sender_work, &dw->w); 5799 } else { 5800 drbd_err(device, "kmalloc(dw) failed."); 5801 ov_out_of_sync_print(device); 5802 drbd_resync_finished(device); 5803 } 5804 } 5805 put_ldev(device); 5806 return 0; 5807 } 5808 5809 static int got_skip(struct drbd_connection *connection, struct packet_info *pi) 5810 { 5811 return 0; 5812 } 5813 5814 struct meta_sock_cmd { 5815 size_t pkt_size; 5816 int (*fn)(struct drbd_connection *connection, struct packet_info *); 5817 }; 5818 5819 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout) 5820 { 5821 long t; 5822 struct net_conf *nc; 5823 5824 rcu_read_lock(); 5825 nc = rcu_dereference(connection->net_conf); 5826 t = ping_timeout ? nc->ping_timeo : nc->ping_int; 5827 rcu_read_unlock(); 5828 5829 t *= HZ; 5830 if (ping_timeout) 5831 t /= 10; 5832 5833 connection->meta.socket->sk->sk_rcvtimeo = t; 5834 } 5835 5836 static void set_ping_timeout(struct drbd_connection *connection) 5837 { 5838 set_rcvtimeo(connection, 1); 5839 } 5840 5841 static void set_idle_timeout(struct drbd_connection *connection) 5842 { 5843 set_rcvtimeo(connection, 0); 5844 } 5845 5846 static struct meta_sock_cmd ack_receiver_tbl[] = { 5847 [P_PING] = { 0, got_Ping }, 5848 [P_PING_ACK] = { 0, got_PingAck }, 5849 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5850 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5851 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck }, 5852 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck }, 5853 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck }, 5854 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply }, 5855 [P_NEG_RS_DREPLY] = { sizeof(struct 
p_block_ack), got_NegRSDReply }, 5856 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult }, 5857 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck }, 5858 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply }, 5859 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync }, 5860 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip }, 5861 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply }, 5862 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply }, 5863 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck }, 5864 }; 5865 5866 int drbd_ack_receiver(struct drbd_thread *thi) 5867 { 5868 struct drbd_connection *connection = thi->connection; 5869 struct meta_sock_cmd *cmd = NULL; 5870 struct packet_info pi; 5871 unsigned long pre_recv_jif; 5872 int rv; 5873 void *buf = connection->meta.rbuf; 5874 int received = 0; 5875 unsigned int header_size = drbd_header_size(connection); 5876 int expect = header_size; 5877 bool ping_timeout_active = false; 5878 struct sched_param param = { .sched_priority = 2 }; 5879 5880 rv = sched_setscheduler(current, SCHED_RR, ¶m); 5881 if (rv < 0) 5882 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv); 5883 5884 while (get_t_state(thi) == RUNNING) { 5885 drbd_thread_current_set_cpu(thi); 5886 5887 conn_reclaim_net_peer_reqs(connection); 5888 5889 if (test_and_clear_bit(SEND_PING, &connection->flags)) { 5890 if (drbd_send_ping(connection)) { 5891 drbd_err(connection, "drbd_send_ping has failed\n"); 5892 goto reconnect; 5893 } 5894 set_ping_timeout(connection); 5895 ping_timeout_active = true; 5896 } 5897 5898 pre_recv_jif = jiffies; 5899 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0); 5900 5901 /* Note: 5902 * -EINTR (on meta) we got a signal 5903 * -EAGAIN (on meta) rcvtimeo expired 5904 * -ECONNRESET other side closed the connection 5905 * -ERESTARTSYS (on data) we got a signal 5906 * rv < 0 
other than above: unexpected error! 5907 * rv == expected: full header or command 5908 * rv < expected: "woken" by signal during receive 5909 * rv == 0 : "connection shut down by peer" 5910 */ 5911 if (likely(rv > 0)) { 5912 received += rv; 5913 buf += rv; 5914 } else if (rv == 0) { 5915 if (test_bit(DISCONNECT_SENT, &connection->flags)) { 5916 long t; 5917 rcu_read_lock(); 5918 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10; 5919 rcu_read_unlock(); 5920 5921 t = wait_event_timeout(connection->ping_wait, 5922 connection->cstate < C_WF_REPORT_PARAMS, 5923 t); 5924 if (t) 5925 break; 5926 } 5927 drbd_err(connection, "meta connection shut down by peer.\n"); 5928 goto reconnect; 5929 } else if (rv == -EAGAIN) { 5930 /* If the data socket received something meanwhile, 5931 * that is good enough: peer is still alive. */ 5932 if (time_after(connection->last_received, pre_recv_jif)) 5933 continue; 5934 if (ping_timeout_active) { 5935 drbd_err(connection, "PingAck did not arrive in time.\n"); 5936 goto reconnect; 5937 } 5938 set_bit(SEND_PING, &connection->flags); 5939 continue; 5940 } else if (rv == -EINTR) { 5941 /* maybe drbd_thread_stop(): the while condition will notice. 
5942 * maybe woken for send_ping: we'll send a ping above, 5943 * and change the rcvtimeo */ 5944 flush_signals(current); 5945 continue; 5946 } else { 5947 drbd_err(connection, "sock_recvmsg returned %d\n", rv); 5948 goto reconnect; 5949 } 5950 5951 if (received == expect && cmd == NULL) { 5952 if (decode_header(connection, connection->meta.rbuf, &pi)) 5953 goto reconnect; 5954 cmd = &ack_receiver_tbl[pi.cmd]; 5955 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) { 5956 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n", 5957 cmdname(pi.cmd), pi.cmd); 5958 goto disconnect; 5959 } 5960 expect = header_size + cmd->pkt_size; 5961 if (pi.size != expect - header_size) { 5962 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n", 5963 pi.cmd, pi.size); 5964 goto reconnect; 5965 } 5966 } 5967 if (received == expect) { 5968 bool err; 5969 5970 err = cmd->fn(connection, &pi); 5971 if (err) { 5972 drbd_err(connection, "%pf failed\n", cmd->fn); 5973 goto reconnect; 5974 } 5975 5976 connection->last_received = jiffies; 5977 5978 if (cmd == &ack_receiver_tbl[P_PING_ACK]) { 5979 set_idle_timeout(connection); 5980 ping_timeout_active = false; 5981 } 5982 5983 buf = connection->meta.rbuf; 5984 received = 0; 5985 expect = header_size; 5986 cmd = NULL; 5987 } 5988 } 5989 5990 if (0) { 5991 reconnect: 5992 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 5993 conn_md_sync(connection); 5994 } 5995 if (0) { 5996 disconnect: 5997 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 5998 } 5999 6000 drbd_info(connection, "ack_receiver terminated\n"); 6001 6002 return 0; 6003 } 6004 6005 void drbd_send_acks_wf(struct work_struct *ws) 6006 { 6007 struct drbd_peer_device *peer_device = 6008 container_of(ws, struct drbd_peer_device, send_acks_work); 6009 struct drbd_connection *connection = peer_device->connection; 6010 struct drbd_device *device = peer_device->device; 6011 struct net_conf *nc; 6012 int tcp_cork, err; 6013 6014 
rcu_read_lock(); 6015 nc = rcu_dereference(connection->net_conf); 6016 tcp_cork = nc->tcp_cork; 6017 rcu_read_unlock(); 6018 6019 if (tcp_cork) 6020 drbd_tcp_cork(connection->meta.socket); 6021 6022 err = drbd_finish_peer_reqs(device); 6023 kref_put(&device->kref, drbd_destroy_device); 6024 /* get is in drbd_endio_write_sec_final(). That is necessary to keep the 6025 struct work_struct send_acks_work alive, which is in the peer_device object */ 6026 6027 if (err) { 6028 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 6029 return; 6030 } 6031 6032 if (tcp_cork) 6033 drbd_tcp_uncork(connection->meta.socket); 6034 6035 return; 6036 } 6037