1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP address or 25 * whatever it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 42 * 43 */ 44 45 #include <asm/ioctls.h> 46 #include <net/sock.h> 47 #include <net/tcp.h> 48 #include <linux/pagemap.h> 49 #include <linux/file.h> 50 #include <linux/mutex.h> 51 #include <linux/sctp.h> 52 #include <linux/slab.h> 53 #include <net/sctp/sctp.h> 54 #include <net/ipv6.h> 55 56 #include <trace/events/dlm.h> 57 #include <trace/events/sock.h> 58 59 #include "dlm_internal.h" 60 #include "lowcomms.h" 61 #include "midcomms.h" 62 #include "memory.h" 63 #include "config.h" 64 65 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000) 66 #define DLM_MAX_PROCESS_BUFFERS 24 67 #define NEEDED_RMEM (4*1024*1024) 68 69 struct connection { 70 struct socket *sock; /* NULL if not connected */ 71 uint32_t nodeid; /* So we know who we are in the list */ 72 /* this semaphore is used to allow parallel recv/send in read 73 * lock mode. When we release a sock we need to held the write lock. 74 * 75 * However this is locking code and not nice. When we remove the 76 * othercon handling we can look into other mechanism to synchronize 77 * io handling to call sock_release() at the right time. 78 */ 79 struct rw_semaphore sock_lock; 80 unsigned long flags; 81 #define CF_APP_LIMITED 0 82 #define CF_RECV_PENDING 1 83 #define CF_SEND_PENDING 2 84 #define CF_RECV_INTR 3 85 #define CF_IO_STOP 4 86 #define CF_IS_OTHERCON 5 87 struct list_head writequeue; /* List of outgoing writequeue_entries */ 88 spinlock_t writequeue_lock; 89 int retries; 90 struct hlist_node list; 91 /* due some connect()/accept() races we currently have this cross over 92 * connection attempt second connection for one node. 93 * 94 * There is a solution to avoid the race by introducing a connect 95 * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to 96 * connect. Otherside can connect but will only be considered that 97 * the other side wants to have a reconnect. 98 * 99 * However changing to this behaviour will break backwards compatible. 100 * In a DLM protocol major version upgrade we should remove this! 101 */ 102 struct connection *othercon; 103 struct work_struct rwork; /* receive worker */ 104 struct work_struct swork; /* send worker */ 105 wait_queue_head_t shutdown_wait; 106 unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE]; 107 int rx_leftover; 108 int mark; 109 int addr_count; 110 int curr_addr_index; 111 struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT]; 112 spinlock_t addrs_lock; 113 struct rcu_head rcu; 114 }; 115 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 116 117 struct listen_connection { 118 struct socket *sock; 119 struct work_struct rwork; 120 }; 121 122 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end) 123 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset) 124 125 /* An entry waiting to be sent */ 126 struct writequeue_entry { 127 struct list_head list; 128 struct page *page; 129 int offset; 130 int len; 131 int end; 132 int users; 133 bool dirty; 134 struct connection *con; 135 struct list_head msgs; 136 struct kref ref; 137 }; 138 139 struct dlm_msg { 140 struct writequeue_entry *entry; 141 struct dlm_msg *orig_msg; 142 bool retransmit; 143 void *ppc; 144 int len; 145 int idx; /* new()/commit() idx exchange */ 146 147 struct list_head list; 148 struct kref ref; 149 }; 150 151 struct processqueue_entry { 152 unsigned char *buf; 153 int nodeid; 154 int buflen; 155 156 struct list_head list; 157 }; 158 159 struct dlm_proto_ops { 160 bool try_new_addr; 161 const char *name; 162 int proto; 163 int how; 164 165 void (*sockopts)(struct socket *sock); 166 int (*bind)(struct socket *sock); 167 int (*listen_validate)(void); 168 void (*listen_sockopts)(struct socket *sock); 169 int (*listen_bind)(struct socket *sock); 170 }; 171 172 static struct listen_sock_callbacks { 173 void (*sk_error_report)(struct sock *); 174 void (*sk_data_ready)(struct sock *); 175 void (*sk_state_change)(struct sock *); 176 void (*sk_write_space)(struct sock *); 177 } listen_sock; 178 179 static struct listen_connection listen_con; 180 static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT]; 181 static int dlm_local_count; 182 183 /* Work queues */ 184 static struct workqueue_struct *io_workqueue; 185 static struct workqueue_struct *process_workqueue; 186 187 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 188 static DEFINE_SPINLOCK(connections_lock); 189 DEFINE_STATIC_SRCU(connections_srcu); 190 191 static const struct dlm_proto_ops *dlm_proto_ops; 192 193 #define DLM_IO_SUCCESS 0 194 #define DLM_IO_END 1 195 #define DLM_IO_EOF 2 196 #define DLM_IO_RESCHED 3 197 #define DLM_IO_FLUSH 4 198 199 static void process_recv_sockets(struct work_struct *work); 200 static void process_send_sockets(struct work_struct *work); 201 static void process_dlm_messages(struct work_struct *work); 202 203 static DECLARE_WORK(process_work, process_dlm_messages); 204 static DEFINE_SPINLOCK(processqueue_lock); 205 static bool process_dlm_messages_pending; 206 static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq); 207 static atomic_t processqueue_count; 208 static LIST_HEAD(processqueue); 209 210 bool dlm_lowcomms_is_running(void) 211 { 212 return !!listen_con.sock; 213 } 214 215 static void lowcomms_queue_swork(struct connection *con) 216 { 217 assert_spin_locked(&con->writequeue_lock); 218 219 if (!test_bit(CF_IO_STOP, &con->flags) && 220 !test_bit(CF_APP_LIMITED, &con->flags) && 221 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) 222 queue_work(io_workqueue, &con->swork); 223 } 224 225 static void lowcomms_queue_rwork(struct connection *con) 226 { 227 #ifdef CONFIG_LOCKDEP 228 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); 229 #endif 230 231 if (!test_bit(CF_IO_STOP, &con->flags) && 232 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) 233 queue_work(io_workqueue, &con->rwork); 234 } 235 236 static void writequeue_entry_ctor(void *data) 237 { 238 struct writequeue_entry *entry = data; 239 240 INIT_LIST_HEAD(&entry->msgs); 241 } 242 243 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void) 244 { 245 return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry), 246 0, 0, writequeue_entry_ctor); 247 } 248 249 struct kmem_cache *dlm_lowcomms_msg_cache_create(void) 250 { 251 return KMEM_CACHE(dlm_msg, 0); 252 } 253 254 /* need to held writequeue_lock */ 255 static struct writequeue_entry *con_next_wq(struct connection *con) 256 { 257 struct writequeue_entry *e; 258 259 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, 260 list); 261 /* if len is zero nothing is to send, if there are users filling 262 * buffers we wait until the users are done so we can send more. 263 */ 264 if (!e || e->users || e->len == 0) 265 return NULL; 266 267 return e; 268 } 269 270 static struct connection *__find_con(int nodeid, int r) 271 { 272 struct connection *con; 273 274 hlist_for_each_entry_srcu(con, &connection_hash[r], list, 275 srcu_read_lock_held(&connections_srcu)) { 276 if (con->nodeid == nodeid) 277 return con; 278 } 279 280 return NULL; 281 } 282 283 static void dlm_con_init(struct connection *con, int nodeid) 284 { 285 con->nodeid = nodeid; 286 init_rwsem(&con->sock_lock); 287 INIT_LIST_HEAD(&con->writequeue); 288 spin_lock_init(&con->writequeue_lock); 289 INIT_WORK(&con->swork, process_send_sockets); 290 INIT_WORK(&con->rwork, process_recv_sockets); 291 spin_lock_init(&con->addrs_lock); 292 init_waitqueue_head(&con->shutdown_wait); 293 } 294 295 /* 296 * If 'allocation' is zero then we don't attempt to create a new 297 * connection structure for this node. 298 */ 299 static struct connection *nodeid2con(int nodeid, gfp_t alloc) 300 { 301 struct connection *con, *tmp; 302 int r; 303 304 r = nodeid_hash(nodeid); 305 con = __find_con(nodeid, r); 306 if (con || !alloc) 307 return con; 308 309 con = kzalloc_obj(*con, alloc); 310 if (!con) 311 return NULL; 312 313 dlm_con_init(con, nodeid); 314 315 spin_lock(&connections_lock); 316 /* Because multiple workqueues/threads calls this function it can 317 * race on multiple cpu's. Instead of locking hot path __find_con() 318 * we just check in rare cases of recently added nodes again 319 * under protection of connections_lock. If this is the case we 320 * abort our connection creation and return the existing connection. 321 */ 322 tmp = __find_con(nodeid, r); 323 if (tmp) { 324 spin_unlock(&connections_lock); 325 kfree(con); 326 return tmp; 327 } 328 329 hlist_add_head_rcu(&con->list, &connection_hash[r]); 330 spin_unlock(&connections_lock); 331 332 return con; 333 } 334 335 static int addr_compare(const struct sockaddr_storage *x, 336 const struct sockaddr_storage *y) 337 { 338 switch (x->ss_family) { 339 case AF_INET: { 340 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 341 struct sockaddr_in *siny = (struct sockaddr_in *)y; 342 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 343 return 0; 344 if (sinx->sin_port != siny->sin_port) 345 return 0; 346 break; 347 } 348 case AF_INET6: { 349 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 350 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 351 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 352 return 0; 353 if (sinx->sin6_port != siny->sin6_port) 354 return 0; 355 break; 356 } 357 default: 358 return 0; 359 } 360 return 1; 361 } 362 363 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 364 struct sockaddr *sa_out, bool try_new_addr, 365 unsigned int *mark) 366 { 367 struct sockaddr_storage sas; 368 struct connection *con; 369 int idx; 370 371 if (!dlm_local_count) 372 return -1; 373 374 idx = srcu_read_lock(&connections_srcu); 375 con = nodeid2con(nodeid, 0); 376 if (!con) { 377 srcu_read_unlock(&connections_srcu, idx); 378 return -ENOENT; 379 } 380 381 spin_lock(&con->addrs_lock); 382 if (!con->addr_count) { 383 spin_unlock(&con->addrs_lock); 384 srcu_read_unlock(&connections_srcu, idx); 385 return -ENOENT; 386 } 387 388 memcpy(&sas, &con->addr[con->curr_addr_index], 389 sizeof(struct sockaddr_storage)); 390 391 if (try_new_addr) { 392 con->curr_addr_index++; 393 if (con->curr_addr_index == con->addr_count) 394 con->curr_addr_index = 0; 395 } 396 397 *mark = con->mark; 398 spin_unlock(&con->addrs_lock); 399 400 if (sas_out) 401 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 402 403 if (!sa_out) { 404 srcu_read_unlock(&connections_srcu, idx); 405 return 0; 406 } 407 408 if (dlm_local_addr[0].ss_family == AF_INET) { 409 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 410 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 411 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 412 } else { 413 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 414 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 415 ret6->sin6_addr = in6->sin6_addr; 416 } 417 418 srcu_read_unlock(&connections_srcu, idx); 419 return 0; 420 } 421 422 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, 423 unsigned int *mark) 424 { 425 struct connection *con; 426 int i, idx, addr_i; 427 428 idx = srcu_read_lock(&connections_srcu); 429 for (i = 0; i < CONN_HASH_SIZE; i++) { 430 hlist_for_each_entry_srcu(con, &connection_hash[i], list, 431 srcu_read_lock_held(&connections_srcu)) { 432 WARN_ON_ONCE(!con->addr_count); 433 434 spin_lock(&con->addrs_lock); 435 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { 436 if (addr_compare(&con->addr[addr_i], addr)) { 437 *nodeid = con->nodeid; 438 *mark = con->mark; 439 spin_unlock(&con->addrs_lock); 440 srcu_read_unlock(&connections_srcu, idx); 441 return 0; 442 } 443 } 444 spin_unlock(&con->addrs_lock); 445 } 446 } 447 srcu_read_unlock(&connections_srcu, idx); 448 449 return -ENOENT; 450 } 451 452 static bool dlm_lowcomms_con_has_addr(const struct connection *con, 453 const struct sockaddr_storage *addr) 454 { 455 int i; 456 457 for (i = 0; i < con->addr_count; i++) { 458 if (addr_compare(&con->addr[i], addr)) 459 return true; 460 } 461 462 return false; 463 } 464 465 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr) 466 { 467 struct connection *con; 468 bool ret; 469 int idx; 470 471 idx = srcu_read_lock(&connections_srcu); 472 con = nodeid2con(nodeid, GFP_NOFS); 473 if (!con) { 474 srcu_read_unlock(&connections_srcu, idx); 475 return -ENOMEM; 476 } 477 478 spin_lock(&con->addrs_lock); 479 if (!con->addr_count) { 480 memcpy(&con->addr[0], addr, sizeof(*addr)); 481 con->addr_count = 1; 482 con->mark = dlm_config.ci_mark; 483 spin_unlock(&con->addrs_lock); 484 srcu_read_unlock(&connections_srcu, idx); 485 return 0; 486 } 487 488 ret = dlm_lowcomms_con_has_addr(con, addr); 489 if (ret) { 490 spin_unlock(&con->addrs_lock); 491 srcu_read_unlock(&connections_srcu, idx); 492 return -EEXIST; 493 } 494 495 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { 496 spin_unlock(&con->addrs_lock); 497 srcu_read_unlock(&connections_srcu, idx); 498 return -ENOSPC; 499 } 500 501 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); 502 srcu_read_unlock(&connections_srcu, idx); 503 spin_unlock(&con->addrs_lock); 504 return 0; 505 } 506 507 /* Data available on socket or listen socket received a connect */ 508 static void lowcomms_data_ready(struct sock *sk) 509 { 510 struct connection *con = sock2con(sk); 511 512 trace_sk_data_ready(sk); 513 514 set_bit(CF_RECV_INTR, &con->flags); 515 lowcomms_queue_rwork(con); 516 } 517 518 static void lowcomms_write_space(struct sock *sk) 519 { 520 struct connection *con = sock2con(sk); 521 522 clear_bit(SOCK_NOSPACE, &con->sock->flags); 523 524 spin_lock_bh(&con->writequeue_lock); 525 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 526 con->sock->sk->sk_write_pending--; 527 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 528 } 529 530 lowcomms_queue_swork(con); 531 spin_unlock_bh(&con->writequeue_lock); 532 } 533 534 static void lowcomms_state_change(struct sock *sk) 535 { 536 /* SCTP layer is not calling sk_data_ready when the connection 537 * is done, so we catch the signal through here. 538 */ 539 if (sk->sk_shutdown & RCV_SHUTDOWN) 540 lowcomms_data_ready(sk); 541 } 542 543 static void lowcomms_listen_data_ready(struct sock *sk) 544 { 545 trace_sk_data_ready(sk); 546 547 queue_work(io_workqueue, &listen_con.rwork); 548 } 549 550 int dlm_lowcomms_connect_node(int nodeid) 551 { 552 struct connection *con; 553 int idx; 554 555 idx = srcu_read_lock(&connections_srcu); 556 con = nodeid2con(nodeid, 0); 557 if (WARN_ON_ONCE(!con)) { 558 srcu_read_unlock(&connections_srcu, idx); 559 return -ENOENT; 560 } 561 562 down_read(&con->sock_lock); 563 if (!con->sock) { 564 spin_lock_bh(&con->writequeue_lock); 565 lowcomms_queue_swork(con); 566 spin_unlock_bh(&con->writequeue_lock); 567 } 568 up_read(&con->sock_lock); 569 srcu_read_unlock(&connections_srcu, idx); 570 571 cond_resched(); 572 return 0; 573 } 574 575 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) 576 { 577 struct connection *con; 578 int idx; 579 580 idx = srcu_read_lock(&connections_srcu); 581 con = nodeid2con(nodeid, 0); 582 if (!con) { 583 srcu_read_unlock(&connections_srcu, idx); 584 return -ENOENT; 585 } 586 587 spin_lock(&con->addrs_lock); 588 con->mark = mark; 589 spin_unlock(&con->addrs_lock); 590 srcu_read_unlock(&connections_srcu, idx); 591 return 0; 592 } 593 594 static void lowcomms_error_report(struct sock *sk) 595 { 596 struct connection *con = sock2con(sk); 597 struct inet_sock *inet; 598 599 inet = inet_sk(sk); 600 switch (sk->sk_family) { 601 case AF_INET: 602 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 603 "sending to node %d at %pI4, dport %d, " 604 "sk_err=%d/%d\n", dlm_our_nodeid(), 605 con->nodeid, &inet->inet_daddr, 606 ntohs(inet->inet_dport), sk->sk_err, 607 READ_ONCE(sk->sk_err_soft)); 608 break; 609 #if IS_ENABLED(CONFIG_IPV6) 610 case AF_INET6: 611 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 612 "sending to node %d at %pI6c, " 613 "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(), 614 con->nodeid, &sk->sk_v6_daddr, 615 ntohs(inet->inet_dport), sk->sk_err, 616 READ_ONCE(sk->sk_err_soft)); 617 break; 618 #endif 619 default: 620 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 621 "invalid socket family %d set, " 622 "sk_err=%d/%d\n", dlm_our_nodeid(), 623 sk->sk_family, sk->sk_err, 624 READ_ONCE(sk->sk_err_soft)); 625 break; 626 } 627 628 dlm_midcomms_unack_msg_resend(con->nodeid); 629 630 listen_sock.sk_error_report(sk); 631 } 632 633 static void restore_callbacks(struct sock *sk) 634 { 635 #ifdef CONFIG_LOCKDEP 636 WARN_ON_ONCE(!lockdep_sock_is_held(sk)); 637 #endif 638 639 sk->sk_user_data = NULL; 640 sk->sk_data_ready = listen_sock.sk_data_ready; 641 sk->sk_state_change = listen_sock.sk_state_change; 642 sk->sk_write_space = listen_sock.sk_write_space; 643 sk->sk_error_report = listen_sock.sk_error_report; 644 } 645 646 /* Make a socket active */ 647 static void add_sock(struct socket *sock, struct connection *con) 648 { 649 struct sock *sk = sock->sk; 650 651 lock_sock(sk); 652 con->sock = sock; 653 654 sk->sk_user_data = con; 655 sk->sk_data_ready = lowcomms_data_ready; 656 sk->sk_write_space = lowcomms_write_space; 657 if (dlm_config.ci_protocol == DLM_PROTO_SCTP) 658 sk->sk_state_change = lowcomms_state_change; 659 sk->sk_allocation = GFP_NOFS; 660 sk->sk_use_task_frag = false; 661 sk->sk_error_report = lowcomms_error_report; 662 release_sock(sk); 663 } 664 665 /* Add the port number to an IPv6 or 4 sockaddr and return the address 666 length */ 667 static void make_sockaddr(struct sockaddr_storage *saddr, __be16 port, 668 int *addr_len) 669 { 670 saddr->ss_family = dlm_local_addr[0].ss_family; 671 if (saddr->ss_family == AF_INET) { 672 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 673 in4_addr->sin_port = port; 674 *addr_len = sizeof(struct sockaddr_in); 675 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 676 } else { 677 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 678 in6_addr->sin6_port = port; 679 *addr_len = sizeof(struct sockaddr_in6); 680 } 681 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 682 } 683 684 static void dlm_page_release(struct kref *kref) 685 { 686 struct writequeue_entry *e = container_of(kref, struct writequeue_entry, 687 ref); 688 689 __free_page(e->page); 690 dlm_free_writequeue(e); 691 } 692 693 static void dlm_msg_release(struct kref *kref) 694 { 695 struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref); 696 697 kref_put(&msg->entry->ref, dlm_page_release); 698 dlm_free_msg(msg); 699 } 700 701 static void free_entry(struct writequeue_entry *e) 702 { 703 struct dlm_msg *msg, *tmp; 704 705 list_for_each_entry_safe(msg, tmp, &e->msgs, list) { 706 if (msg->orig_msg) { 707 msg->orig_msg->retransmit = false; 708 kref_put(&msg->orig_msg->ref, dlm_msg_release); 709 } 710 711 list_del(&msg->list); 712 kref_put(&msg->ref, dlm_msg_release); 713 } 714 715 list_del(&e->list); 716 kref_put(&e->ref, dlm_page_release); 717 } 718 719 static void dlm_close_sock(struct socket **sock) 720 { 721 lock_sock((*sock)->sk); 722 restore_callbacks((*sock)->sk); 723 release_sock((*sock)->sk); 724 725 sock_release(*sock); 726 *sock = NULL; 727 } 728 729 static void allow_connection_io(struct connection *con) 730 { 731 if (con->othercon) 732 clear_bit(CF_IO_STOP, &con->othercon->flags); 733 clear_bit(CF_IO_STOP, &con->flags); 734 } 735 736 static void stop_connection_io(struct connection *con) 737 { 738 if (con->othercon) 739 stop_connection_io(con->othercon); 740 741 spin_lock_bh(&con->writequeue_lock); 742 set_bit(CF_IO_STOP, &con->flags); 743 spin_unlock_bh(&con->writequeue_lock); 744 745 down_write(&con->sock_lock); 746 if (con->sock) { 747 lock_sock(con->sock->sk); 748 restore_callbacks(con->sock->sk); 749 release_sock(con->sock->sk); 750 } 751 up_write(&con->sock_lock); 752 753 cancel_work_sync(&con->swork); 754 cancel_work_sync(&con->rwork); 755 } 756 757 /* Close a remote connection and tidy up */ 758 static void close_connection(struct connection *con, bool and_other) 759 { 760 struct writequeue_entry *e; 761 762 if (con->othercon && and_other) 763 close_connection(con->othercon, false); 764 765 down_write(&con->sock_lock); 766 if (!con->sock) { 767 up_write(&con->sock_lock); 768 return; 769 } 770 771 dlm_close_sock(&con->sock); 772 773 /* if we send a writequeue entry only a half way, we drop the 774 * whole entry because reconnection and that we not start of the 775 * middle of a msg which will confuse the other end. 776 * 777 * we can always drop messages because retransmits, but what we 778 * cannot allow is to transmit half messages which may be processed 779 * at the other side. 780 * 781 * our policy is to start on a clean state when disconnects, we don't 782 * know what's send/received on transport layer in this case. 783 */ 784 spin_lock_bh(&con->writequeue_lock); 785 if (!list_empty(&con->writequeue)) { 786 e = list_first_entry(&con->writequeue, struct writequeue_entry, 787 list); 788 if (e->dirty) 789 free_entry(e); 790 } 791 spin_unlock_bh(&con->writequeue_lock); 792 793 con->rx_leftover = 0; 794 con->retries = 0; 795 clear_bit(CF_APP_LIMITED, &con->flags); 796 clear_bit(CF_RECV_PENDING, &con->flags); 797 clear_bit(CF_SEND_PENDING, &con->flags); 798 up_write(&con->sock_lock); 799 } 800 801 static void shutdown_connection(struct connection *con, bool and_other) 802 { 803 int ret; 804 805 if (con->othercon && and_other) 806 shutdown_connection(con->othercon, false); 807 808 flush_workqueue(io_workqueue); 809 down_read(&con->sock_lock); 810 /* nothing to shutdown */ 811 if (!con->sock) { 812 up_read(&con->sock_lock); 813 return; 814 } 815 816 ret = kernel_sock_shutdown(con->sock, dlm_proto_ops->how); 817 up_read(&con->sock_lock); 818 if (ret) { 819 log_print("Connection %p failed to shutdown: %d will force close", 820 con, ret); 821 goto force_close; 822 } else { 823 ret = wait_event_timeout(con->shutdown_wait, !con->sock, 824 DLM_SHUTDOWN_WAIT_TIMEOUT); 825 if (ret == 0) { 826 log_print("Connection %p shutdown timed out, will force close", 827 con); 828 goto force_close; 829 } 830 } 831 832 return; 833 834 force_close: 835 close_connection(con, false); 836 } 837 838 static struct processqueue_entry *new_processqueue_entry(int nodeid, 839 int buflen) 840 { 841 struct processqueue_entry *pentry; 842 843 pentry = kmalloc_obj(*pentry, GFP_NOFS); 844 if (!pentry) 845 return NULL; 846 847 pentry->buf = kmalloc(buflen, GFP_NOFS); 848 if (!pentry->buf) { 849 kfree(pentry); 850 return NULL; 851 } 852 853 pentry->nodeid = nodeid; 854 return pentry; 855 } 856 857 static void free_processqueue_entry(struct processqueue_entry *pentry) 858 { 859 kfree(pentry->buf); 860 kfree(pentry); 861 } 862 863 static void process_dlm_messages(struct work_struct *work) 864 { 865 struct processqueue_entry *pentry; 866 867 spin_lock_bh(&processqueue_lock); 868 pentry = list_first_entry_or_null(&processqueue, 869 struct processqueue_entry, list); 870 if (WARN_ON_ONCE(!pentry)) { 871 process_dlm_messages_pending = false; 872 spin_unlock_bh(&processqueue_lock); 873 return; 874 } 875 876 list_del(&pentry->list); 877 if (atomic_dec_and_test(&processqueue_count)) 878 wake_up(&processqueue_wq); 879 spin_unlock_bh(&processqueue_lock); 880 881 for (;;) { 882 dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, 883 pentry->buflen); 884 free_processqueue_entry(pentry); 885 886 spin_lock_bh(&processqueue_lock); 887 pentry = list_first_entry_or_null(&processqueue, 888 struct processqueue_entry, list); 889 if (!pentry) { 890 process_dlm_messages_pending = false; 891 spin_unlock_bh(&processqueue_lock); 892 break; 893 } 894 895 list_del(&pentry->list); 896 if (atomic_dec_and_test(&processqueue_count)) 897 wake_up(&processqueue_wq); 898 spin_unlock_bh(&processqueue_lock); 899 } 900 } 901 902 /* Data received from remote end */ 903 static int receive_from_sock(struct connection *con, int buflen) 904 { 905 struct processqueue_entry *pentry; 906 int ret, buflen_real; 907 struct msghdr msg; 908 struct kvec iov; 909 910 pentry = new_processqueue_entry(con->nodeid, buflen); 911 if (!pentry) 912 return DLM_IO_RESCHED; 913 914 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); 915 916 /* calculate new buffer parameter regarding last receive and 917 * possible leftover bytes 918 */ 919 iov.iov_base = pentry->buf + con->rx_leftover; 920 iov.iov_len = buflen - con->rx_leftover; 921 922 memset(&msg, 0, sizeof(msg)); 923 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 924 clear_bit(CF_RECV_INTR, &con->flags); 925 again: 926 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 927 msg.msg_flags); 928 trace_dlm_recv(con->nodeid, ret); 929 if (ret == -EAGAIN) { 930 lock_sock(con->sock->sk); 931 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { 932 release_sock(con->sock->sk); 933 goto again; 934 } 935 936 clear_bit(CF_RECV_PENDING, &con->flags); 937 release_sock(con->sock->sk); 938 free_processqueue_entry(pentry); 939 return DLM_IO_END; 940 } else if (ret == 0) { 941 /* close will clear CF_RECV_PENDING */ 942 free_processqueue_entry(pentry); 943 return DLM_IO_EOF; 944 } else if (ret < 0) { 945 free_processqueue_entry(pentry); 946 return ret; 947 } 948 949 /* new buflen according readed bytes and leftover from last receive */ 950 buflen_real = ret + con->rx_leftover; 951 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, 952 buflen_real); 953 if (ret < 0) { 954 free_processqueue_entry(pentry); 955 return ret; 956 } 957 958 pentry->buflen = ret; 959 960 /* calculate leftover bytes from process and put it into begin of 961 * the receive buffer, so next receive we have the full message 962 * at the start address of the receive buffer. 963 */ 964 con->rx_leftover = buflen_real - ret; 965 memmove(con->rx_leftover_buf, pentry->buf + ret, 966 con->rx_leftover); 967 968 spin_lock_bh(&processqueue_lock); 969 ret = atomic_inc_return(&processqueue_count); 970 list_add_tail(&pentry->list, &processqueue); 971 if (!process_dlm_messages_pending) { 972 process_dlm_messages_pending = true; 973 queue_work(process_workqueue, &process_work); 974 } 975 spin_unlock_bh(&processqueue_lock); 976 977 if (ret > DLM_MAX_PROCESS_BUFFERS) 978 return DLM_IO_FLUSH; 979 980 return DLM_IO_SUCCESS; 981 } 982 983 /* Listening socket is busy, accept a connection */ 984 static int accept_from_sock(void) 985 { 986 struct sockaddr_storage peeraddr; 987 int len, idx, result, nodeid; 988 struct connection *newcon; 989 struct socket *newsock; 990 unsigned int mark; 991 992 result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK); 993 if (result == -EAGAIN) 994 return DLM_IO_END; 995 else if (result < 0) 996 goto accept_err; 997 998 /* Get the connected socket's peer */ 999 memset(&peeraddr, 0, sizeof(peeraddr)); 1000 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 1001 if (len < 0) { 1002 result = -ECONNABORTED; 1003 goto accept_err; 1004 } 1005 1006 /* Get the new node's NODEID */ 1007 make_sockaddr(&peeraddr, 0, &len); 1008 if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) { 1009 switch (peeraddr.ss_family) { 1010 case AF_INET: { 1011 struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr; 1012 1013 log_print("connect from non cluster IPv4 node %pI4", 1014 &sin->sin_addr); 1015 break; 1016 } 1017 #if IS_ENABLED(CONFIG_IPV6) 1018 case AF_INET6: { 1019 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr; 1020 1021 log_print("connect from non cluster IPv6 node %pI6c", 1022 &sin6->sin6_addr); 1023 break; 1024 } 1025 #endif 1026 default: 1027 log_print("invalid family from non cluster node"); 1028 break; 1029 } 1030 1031 sock_release(newsock); 1032 return -1; 1033 } 1034 1035 log_print("got connection from %d", nodeid); 1036 1037 /* Check to see if we already have a connection to this node. This 1038 * could happen if the two nodes initiate a connection at roughly 1039 * the same time and the connections cross on the wire. 1040 * In this case we store the incoming one in "othercon" 1041 */ 1042 idx = srcu_read_lock(&connections_srcu); 1043 newcon = nodeid2con(nodeid, 0); 1044 if (WARN_ON_ONCE(!newcon)) { 1045 srcu_read_unlock(&connections_srcu, idx); 1046 result = -ENOENT; 1047 goto accept_err; 1048 } 1049 1050 sock_set_mark(newsock->sk, mark); 1051 1052 down_write(&newcon->sock_lock); 1053 if (newcon->sock) { 1054 struct connection *othercon = newcon->othercon; 1055 1056 if (!othercon) { 1057 othercon = kzalloc_obj(*othercon, GFP_NOFS); 1058 if (!othercon) { 1059 log_print("failed to allocate incoming socket"); 1060 up_write(&newcon->sock_lock); 1061 srcu_read_unlock(&connections_srcu, idx); 1062 result = -ENOMEM; 1063 goto accept_err; 1064 } 1065 1066 dlm_con_init(othercon, nodeid); 1067 lockdep_set_subclass(&othercon->sock_lock, 1); 1068 newcon->othercon = othercon; 1069 set_bit(CF_IS_OTHERCON, &othercon->flags); 1070 } else { 1071 /* close other sock con if we have something new */ 1072 close_connection(othercon, false); 1073 } 1074 1075 down_write(&othercon->sock_lock); 1076 add_sock(newsock, othercon); 1077 1078 /* check if we receved something while adding */ 1079 lock_sock(othercon->sock->sk); 1080 lowcomms_queue_rwork(othercon); 1081 release_sock(othercon->sock->sk); 1082 up_write(&othercon->sock_lock); 1083 } 1084 else { 1085 /* accept copies the sk after we've saved the callbacks, so we 1086 don't want to save them a second time or comm errors will 1087 result in calling sk_error_report recursively. */ 1088 add_sock(newsock, newcon); 1089 1090 /* check if we receved something while adding */ 1091 lock_sock(newcon->sock->sk); 1092 lowcomms_queue_rwork(newcon); 1093 release_sock(newcon->sock->sk); 1094 } 1095 up_write(&newcon->sock_lock); 1096 srcu_read_unlock(&connections_srcu, idx); 1097 1098 return DLM_IO_SUCCESS; 1099 1100 accept_err: 1101 if (newsock) 1102 sock_release(newsock); 1103 1104 return result; 1105 } 1106 1107 /* 1108 * writequeue_entry_complete - try to delete and free write queue entry 1109 * @e: write queue entry to try to delete 1110 * @completed: bytes completed 1111 * 1112 * writequeue_lock must be held. 1113 */ 1114 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 1115 { 1116 e->offset += completed; 1117 e->len -= completed; 1118 /* signal that page was half way transmitted */ 1119 e->dirty = true; 1120 1121 if (e->len == 0 && e->users == 0) 1122 free_entry(e); 1123 } 1124 1125 /* 1126 * sctp_bind_addrs - bind a SCTP socket to all our addresses 1127 */ 1128 static int sctp_bind_addrs(struct socket *sock, __be16 port) 1129 { 1130 struct sockaddr_storage localaddr; 1131 struct sockaddr_unsized *addr = (struct sockaddr_unsized *)&localaddr; 1132 int i, addr_len, result = 0; 1133 1134 for (i = 0; i < dlm_local_count; i++) { 1135 memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr)); 1136 make_sockaddr(&localaddr, port, &addr_len); 1137 1138 if (!i) 1139 result = kernel_bind(sock, addr, addr_len); 1140 else 1141 result = sock_bind_add(sock->sk, addr, addr_len); 1142 1143 if (result < 0) { 1144 log_print("Can't bind to %d addr number %d, %d.\n", 1145 port, i + 1, result); 1146 break; 1147 } 1148 } 1149 return result; 1150 } 1151 1152 /* Get local addresses */ 1153 static void init_local(void) 1154 { 1155 struct sockaddr_storage sas; 1156 int i; 1157 1158 dlm_local_count = 0; 1159 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1160 if (dlm_our_addr(&sas, i)) 1161 break; 1162 1163 memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas)); 1164 } 1165 } 1166 1167 static struct writequeue_entry *new_writequeue_entry(struct connection *con) 1168 { 1169 struct writequeue_entry *entry; 1170 1171 entry = dlm_allocate_writequeue(); 1172 if (!entry) 1173 return NULL; 1174 1175 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); 1176 if (!entry->page) { 1177 dlm_free_writequeue(entry); 1178 return NULL; 1179 } 1180 1181 entry->offset = 0; 1182 entry->len = 0; 1183 entry->end = 0; 1184 entry->dirty = false; 1185 entry->con = con; 1186 entry->users = 1; 1187 kref_init(&entry->ref); 1188 return entry; 1189 } 1190 1191 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, 1192 char **ppc, void (*cb)(void *data), 1193 void *data) 1194 { 1195 struct writequeue_entry *e; 1196 1197 spin_lock_bh(&con->writequeue_lock); 1198 if (!list_empty(&con->writequeue)) { 1199 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); 1200 if (DLM_WQ_REMAIN_BYTES(e) >= len) { 1201 kref_get(&e->ref); 1202 1203 *ppc = page_address(e->page) + e->end; 1204 if (cb) 1205 cb(data); 1206 1207 e->end += len; 1208 e->users++; 1209 goto out; 1210 } 1211 } 1212 1213 e = new_writequeue_entry(con); 1214 if (!e) 1215 goto out; 1216 1217 kref_get(&e->ref); 1218 *ppc = page_address(e->page); 1219 e->end += len; 1220 if (cb) 1221 cb(data); 1222 1223 list_add_tail(&e->list, &con->writequeue); 1224 1225 out: 1226 spin_unlock_bh(&con->writequeue_lock); 1227 return e; 1228 }; 1229 1230 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, 1231 char **ppc, void (*cb)(void *data), 1232 void *data) 1233 { 1234 struct writequeue_entry *e; 1235 struct dlm_msg *msg; 1236 1237 msg = dlm_allocate_msg(); 1238 if (!msg) 1239 return NULL; 1240 1241 kref_init(&msg->ref); 1242 1243 e = new_wq_entry(con, len, ppc, cb, data); 1244 if (!e) { 1245 dlm_free_msg(msg); 1246 return NULL; 1247 } 1248 1249 msg->retransmit = false; 1250 msg->orig_msg = NULL; 1251 msg->ppc = *ppc; 1252 msg->len = len; 1253 msg->entry = e; 1254 1255 return msg; 1256 } 1257 1258 /* avoid false positive for nodes_srcu, unlock happens in 1259 * dlm_lowcomms_commit_msg which is a must call if success 1260 */ 1261 #ifndef __CHECKER__ 1262 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc, 1263 void (*cb)(void *data), void *data) 1264 { 1265 struct connection *con; 1266 struct dlm_msg *msg; 1267 int idx; 1268 1269 if (len > DLM_MAX_SOCKET_BUFSIZE || 1270 len < sizeof(struct dlm_header)) { 1271 BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE); 1272 log_print("failed to allocate a buffer of size %d", len); 1273 WARN_ON_ONCE(1); 1274 return NULL; 1275 } 1276 1277 idx = srcu_read_lock(&connections_srcu); 1278 con = nodeid2con(nodeid, 0); 1279 if (WARN_ON_ONCE(!con)) { 1280 srcu_read_unlock(&connections_srcu, idx); 1281 return NULL; 1282 } 1283 1284 msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data); 1285 if (!msg) { 1286 srcu_read_unlock(&connections_srcu, idx); 1287 return NULL; 1288 } 1289 1290 /* for dlm_lowcomms_commit_msg() */ 1291 kref_get(&msg->ref); 1292 /* we assume if successful commit must called */ 1293 msg->idx = idx; 1294 return msg; 1295 } 1296 #endif 1297 1298 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1299 { 1300 struct writequeue_entry *e = msg->entry; 1301 struct connection *con = e->con; 1302 int users; 1303 1304 spin_lock_bh(&con->writequeue_lock); 1305 kref_get(&msg->ref); 1306 list_add(&msg->list, &e->msgs); 1307 1308 users = --e->users; 1309 if (users) 1310 goto out; 1311 1312 e->len = DLM_WQ_LENGTH_BYTES(e); 1313 1314 lowcomms_queue_swork(con); 1315 1316 out: 1317 spin_unlock_bh(&con->writequeue_lock); 1318 return; 1319 } 1320 1321 /* avoid false positive for nodes_srcu, lock was happen in 1322 * dlm_lowcomms_new_msg 1323 */ 1324 #ifndef __CHECKER__ 1325 void dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1326 { 1327 _dlm_lowcomms_commit_msg(msg); 1328 srcu_read_unlock(&connections_srcu, msg->idx); 1329 /* because dlm_lowcomms_new_msg() */ 1330 kref_put(&msg->ref, dlm_msg_release); 1331 } 1332 #endif 1333 1334 void dlm_lowcomms_put_msg(struct dlm_msg *msg) 1335 { 1336 kref_put(&msg->ref, dlm_msg_release); 1337 } 1338 1339 /* does not held connections_srcu, usage lowcomms_error_report only */ 1340 int dlm_lowcomms_resend_msg(struct dlm_msg *msg) 1341 { 1342 struct dlm_msg *msg_resend; 1343 char *ppc; 1344 1345 if (msg->retransmit) 1346 return 1; 1347 1348 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc, 1349 NULL, NULL); 1350 if (!msg_resend) 1351 return -ENOMEM; 1352 1353 msg->retransmit = true; 1354 kref_get(&msg->ref); 1355 msg_resend->orig_msg = msg; 1356 1357 memcpy(ppc, msg->ppc, msg->len); 1358 _dlm_lowcomms_commit_msg(msg_resend); 1359 dlm_lowcomms_put_msg(msg_resend); 1360 1361 return 0; 1362 } 1363 1364 /* Send a message */ 1365 static int send_to_sock(struct connection *con) 1366 { 1367 struct writequeue_entry *e; 1368 struct bio_vec bvec; 1369 struct msghdr msg = { 1370 .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL, 1371 }; 1372 int len, offset, ret; 1373 1374 spin_lock_bh(&con->writequeue_lock); 1375 e = con_next_wq(con); 1376 if (!e) { 1377 clear_bit(CF_SEND_PENDING, &con->flags); 1378 spin_unlock_bh(&con->writequeue_lock); 1379 return DLM_IO_END; 1380 } 1381 1382 len = e->len; 1383 offset = e->offset; 1384 WARN_ON_ONCE(len == 0 && e->users == 0); 1385 spin_unlock_bh(&con->writequeue_lock); 1386 1387 bvec_set_page(&bvec, e->page, len, offset); 1388 iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len); 1389 ret = sock_sendmsg(con->sock, &msg); 1390 trace_dlm_send(con->nodeid, ret); 1391 if (ret == -EAGAIN || ret == 0) { 1392 lock_sock(con->sock->sk); 1393 spin_lock_bh(&con->writequeue_lock); 1394 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1395 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1396 /* Notify TCP that we're limited by the 1397 * application window size. 1398 */ 1399 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); 1400 con->sock->sk->sk_write_pending++; 1401 1402 clear_bit(CF_SEND_PENDING, &con->flags); 1403 spin_unlock_bh(&con->writequeue_lock); 1404 release_sock(con->sock->sk); 1405 1406 /* wait for write_space() event */ 1407 return DLM_IO_END; 1408 } 1409 spin_unlock_bh(&con->writequeue_lock); 1410 release_sock(con->sock->sk); 1411 1412 return DLM_IO_RESCHED; 1413 } else if (ret < 0) { 1414 return ret; 1415 } 1416 1417 spin_lock_bh(&con->writequeue_lock); 1418 writequeue_entry_complete(e, ret); 1419 spin_unlock_bh(&con->writequeue_lock); 1420 1421 return DLM_IO_SUCCESS; 1422 } 1423 1424 static void clean_one_writequeue(struct connection *con) 1425 { 1426 struct writequeue_entry *e, *safe; 1427 1428 spin_lock_bh(&con->writequeue_lock); 1429 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1430 free_entry(e); 1431 } 1432 spin_unlock_bh(&con->writequeue_lock); 1433 } 1434 1435 static void connection_release(struct rcu_head *rcu) 1436 { 1437 struct connection *con = container_of(rcu, struct connection, rcu); 1438 1439 WARN_ON_ONCE(!list_empty(&con->writequeue)); 1440 WARN_ON_ONCE(con->sock); 1441 kfree(con); 1442 } 1443 1444 /* Called from recovery when it knows that a node has 1445 left the cluster */ 1446 int dlm_lowcomms_close(int nodeid) 1447 { 1448 struct connection *con; 1449 int idx; 1450 1451 log_print("closing connection to node %d", nodeid); 1452 1453 idx = srcu_read_lock(&connections_srcu); 1454 con = nodeid2con(nodeid, 0); 1455 if (WARN_ON_ONCE(!con)) { 1456 srcu_read_unlock(&connections_srcu, idx); 1457 return -ENOENT; 1458 } 1459 1460 stop_connection_io(con); 1461 log_print("io handling for node: %d stopped", nodeid); 1462 close_connection(con, true); 1463 1464 spin_lock(&connections_lock); 1465 hlist_del_rcu(&con->list); 1466 spin_unlock(&connections_lock); 1467 1468 clean_one_writequeue(con); 1469 call_srcu(&connections_srcu, &con->rcu, connection_release); 1470 if (con->othercon) { 1471 clean_one_writequeue(con->othercon); 1472 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); 1473 } 1474 srcu_read_unlock(&connections_srcu, idx); 1475 1476 /* for debugging we print when we are done to compare with other 1477 * messages in between. This function need to be correctly synchronized 1478 * with io handling 1479 */ 1480 log_print("closing connection to node %d done", nodeid); 1481 1482 return 0; 1483 } 1484 1485 /* Receive worker function */ 1486 static void process_recv_sockets(struct work_struct *work) 1487 { 1488 struct connection *con = container_of(work, struct connection, rwork); 1489 int ret, buflen; 1490 1491 down_read(&con->sock_lock); 1492 if (!con->sock) { 1493 up_read(&con->sock_lock); 1494 return; 1495 } 1496 1497 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1498 do { 1499 ret = receive_from_sock(con, buflen); 1500 } while (ret == DLM_IO_SUCCESS); 1501 up_read(&con->sock_lock); 1502 1503 switch (ret) { 1504 case DLM_IO_END: 1505 /* CF_RECV_PENDING cleared */ 1506 break; 1507 case DLM_IO_EOF: 1508 close_connection(con, false); 1509 wake_up(&con->shutdown_wait); 1510 /* CF_RECV_PENDING cleared */ 1511 break; 1512 case DLM_IO_FLUSH: 1513 /* we can't flush the process_workqueue here because a 1514 * WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non 1515 * WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead 1516 * we have a waitqueue to wait until all messages are 1517 * processed. 1518 * 1519 * This handling is only necessary to backoff the sender and 1520 * not queue all messages from the socket layer into DLM 1521 * processqueue. When DLM is capable to parse multiple messages 1522 * on an e.g. per socket basis this handling can might be 1523 * removed. Especially in a message burst we are too slow to 1524 * process messages and the queue will fill up memory. 1525 */ 1526 wait_event(processqueue_wq, !atomic_read(&processqueue_count)); 1527 fallthrough; 1528 case DLM_IO_RESCHED: 1529 cond_resched(); 1530 queue_work(io_workqueue, &con->rwork); 1531 /* CF_RECV_PENDING not cleared */ 1532 break; 1533 default: 1534 if (ret < 0) { 1535 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1536 close_connection(con, false); 1537 } else { 1538 spin_lock_bh(&con->writequeue_lock); 1539 lowcomms_queue_swork(con); 1540 spin_unlock_bh(&con->writequeue_lock); 1541 } 1542 1543 /* CF_RECV_PENDING cleared for othercon 1544 * we trigger send queue if not already done 1545 * and process_send_sockets will handle it 1546 */ 1547 break; 1548 } 1549 1550 WARN_ON_ONCE(1); 1551 break; 1552 } 1553 } 1554 1555 static void process_listen_recv_socket(struct work_struct *work) 1556 { 1557 int ret; 1558 1559 if (WARN_ON_ONCE(!listen_con.sock)) 1560 return; 1561 1562 do { 1563 ret = accept_from_sock(); 1564 } while (ret == DLM_IO_SUCCESS); 1565 1566 if (ret < 0) 1567 log_print("critical error accepting connection: %d", ret); 1568 } 1569 1570 static int dlm_connect(struct connection *con) 1571 { 1572 struct sockaddr_storage addr; 1573 int result, addr_len; 1574 struct socket *sock; 1575 unsigned int mark; 1576 1577 memset(&addr, 0, sizeof(addr)); 1578 result = nodeid_to_addr(con->nodeid, &addr, NULL, 1579 dlm_proto_ops->try_new_addr, &mark); 1580 if (result < 0) { 1581 log_print("no address for nodeid %d", con->nodeid); 1582 return result; 1583 } 1584 1585 /* Create a socket to communicate with */ 1586 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1587 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1588 if (result < 0) 1589 return result; 1590 1591 sock_set_mark(sock->sk, mark); 1592 dlm_proto_ops->sockopts(sock); 1593 1594 result = dlm_proto_ops->bind(sock); 1595 if (result < 0) { 1596 sock_release(sock); 1597 return result; 1598 } 1599 1600 add_sock(sock, con); 1601 1602 log_print_ratelimited("connecting to %d", con->nodeid); 1603 make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); 1604 result = kernel_connect(sock, (struct sockaddr_unsized *)&addr, addr_len, 0); 1605 switch (result) { 1606 case -EINPROGRESS: 1607 /* not an error */ 1608 fallthrough; 1609 case 0: 1610 break; 1611 default: 1612 if (result < 0) 1613 dlm_close_sock(&con->sock); 1614 1615 break; 1616 } 1617 1618 return result; 1619 } 1620 1621 /* Send worker function */ 1622 static void process_send_sockets(struct work_struct *work) 1623 { 1624 struct connection *con = container_of(work, struct connection, swork); 1625 int ret; 1626 1627 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); 1628 1629 down_read(&con->sock_lock); 1630 if (!con->sock) { 1631 up_read(&con->sock_lock); 1632 down_write(&con->sock_lock); 1633 if (!con->sock) { 1634 ret = dlm_connect(con); 1635 switch (ret) { 1636 case 0: 1637 break; 1638 default: 1639 /* CF_SEND_PENDING not cleared */ 1640 up_write(&con->sock_lock); 1641 log_print("connect to node %d try %d error %d", 1642 con->nodeid, con->retries++, ret); 1643 msleep(1000); 1644 /* For now we try forever to reconnect. In 1645 * future we should send a event to cluster 1646 * manager to fence itself after certain amount 1647 * of retries. 1648 */ 1649 queue_work(io_workqueue, &con->swork); 1650 return; 1651 } 1652 } 1653 downgrade_write(&con->sock_lock); 1654 } 1655 1656 do { 1657 ret = send_to_sock(con); 1658 } while (ret == DLM_IO_SUCCESS); 1659 up_read(&con->sock_lock); 1660 1661 switch (ret) { 1662 case DLM_IO_END: 1663 /* CF_SEND_PENDING cleared */ 1664 break; 1665 case DLM_IO_RESCHED: 1666 /* CF_SEND_PENDING not cleared */ 1667 cond_resched(); 1668 queue_work(io_workqueue, &con->swork); 1669 break; 1670 default: 1671 if (ret < 0) { 1672 close_connection(con, false); 1673 1674 /* CF_SEND_PENDING cleared */ 1675 spin_lock_bh(&con->writequeue_lock); 1676 lowcomms_queue_swork(con); 1677 spin_unlock_bh(&con->writequeue_lock); 1678 break; 1679 } 1680 1681 WARN_ON_ONCE(1); 1682 break; 1683 } 1684 } 1685 1686 static void work_stop(void) 1687 { 1688 if (io_workqueue) { 1689 destroy_workqueue(io_workqueue); 1690 io_workqueue = NULL; 1691 } 1692 1693 if (process_workqueue) { 1694 destroy_workqueue(process_workqueue); 1695 process_workqueue = NULL; 1696 } 1697 } 1698 1699 static int work_start(void) 1700 { 1701 io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM | 1702 WQ_UNBOUND, 0); 1703 if (!io_workqueue) { 1704 log_print("can't start dlm_io"); 1705 return -ENOMEM; 1706 } 1707 1708 process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH | WQ_PERCPU, 0); 1709 if (!process_workqueue) { 1710 log_print("can't start dlm_process"); 1711 destroy_workqueue(io_workqueue); 1712 io_workqueue = NULL; 1713 return -ENOMEM; 1714 } 1715 1716 return 0; 1717 } 1718 1719 void dlm_lowcomms_shutdown(void) 1720 { 1721 struct connection *con; 1722 int i, idx; 1723 1724 /* stop lowcomms_listen_data_ready calls */ 1725 lock_sock(listen_con.sock->sk); 1726 listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; 1727 release_sock(listen_con.sock->sk); 1728 1729 cancel_work_sync(&listen_con.rwork); 1730 dlm_close_sock(&listen_con.sock); 1731 1732 idx = srcu_read_lock(&connections_srcu); 1733 for (i = 0; i < CONN_HASH_SIZE; i++) { 1734 hlist_for_each_entry_srcu(con, &connection_hash[i], list, 1735 srcu_read_lock_held(&connections_srcu)) { 1736 shutdown_connection(con, true); 1737 stop_connection_io(con); 1738 flush_workqueue(process_workqueue); 1739 close_connection(con, true); 1740 1741 clean_one_writequeue(con); 1742 if (con->othercon) 1743 clean_one_writequeue(con->othercon); 1744 allow_connection_io(con); 1745 } 1746 } 1747 srcu_read_unlock(&connections_srcu, idx); 1748 } 1749 1750 void dlm_lowcomms_stop(void) 1751 { 1752 work_stop(); 1753 dlm_proto_ops = NULL; 1754 } 1755 1756 static int dlm_listen_for_all(void) 1757 { 1758 struct socket *sock; 1759 int result; 1760 1761 log_print("Using %s for communications", 1762 dlm_proto_ops->name); 1763 1764 result = dlm_proto_ops->listen_validate(); 1765 if (result < 0) 1766 return result; 1767 1768 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1769 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1770 if (result < 0) { 1771 log_print("Can't create comms socket: %d", result); 1772 return result; 1773 } 1774 1775 sock_set_mark(sock->sk, dlm_config.ci_mark); 1776 dlm_proto_ops->listen_sockopts(sock); 1777 1778 result = dlm_proto_ops->listen_bind(sock); 1779 if (result < 0) 1780 goto out; 1781 1782 lock_sock(sock->sk); 1783 listen_sock.sk_data_ready = sock->sk->sk_data_ready; 1784 listen_sock.sk_write_space = sock->sk->sk_write_space; 1785 listen_sock.sk_error_report = sock->sk->sk_error_report; 1786 listen_sock.sk_state_change = sock->sk->sk_state_change; 1787 1788 listen_con.sock = sock; 1789 1790 sock->sk->sk_allocation = GFP_NOFS; 1791 sock->sk->sk_use_task_frag = false; 1792 sock->sk->sk_data_ready = lowcomms_listen_data_ready; 1793 release_sock(sock->sk); 1794 1795 result = sock->ops->listen(sock, 128); 1796 if (result < 0) { 1797 dlm_close_sock(&listen_con.sock); 1798 return result; 1799 } 1800 1801 return 0; 1802 1803 out: 1804 sock_release(sock); 1805 return result; 1806 } 1807 1808 static int dlm_tcp_bind(struct socket *sock) 1809 { 1810 struct sockaddr_storage src_addr; 1811 int result, addr_len; 1812 1813 /* Bind to our cluster-known address connecting to avoid 1814 * routing problems. 1815 */ 1816 memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr)); 1817 make_sockaddr(&src_addr, 0, &addr_len); 1818 1819 result = kernel_bind(sock, (struct sockaddr_unsized *)&src_addr, 1820 addr_len); 1821 if (result < 0) { 1822 /* This *may* not indicate a critical error */ 1823 log_print("could not bind for connect: %d", result); 1824 } 1825 1826 return 0; 1827 } 1828 1829 static int dlm_tcp_listen_validate(void) 1830 { 1831 /* We don't support multi-homed hosts */ 1832 if (dlm_local_count > 1) { 1833 log_print("Detect multi-homed hosts but use only the first IP address."); 1834 log_print("Try SCTP, if you want to enable multi-link."); 1835 } 1836 1837 return 0; 1838 } 1839 1840 static void dlm_tcp_sockopts(struct socket *sock) 1841 { 1842 /* Turn off Nagle's algorithm */ 1843 tcp_sock_set_nodelay(sock->sk); 1844 } 1845 1846 static void dlm_tcp_listen_sockopts(struct socket *sock) 1847 { 1848 dlm_tcp_sockopts(sock); 1849 sock_set_reuseaddr(sock->sk); 1850 } 1851 1852 static int dlm_tcp_listen_bind(struct socket *sock) 1853 { 1854 int addr_len; 1855 1856 /* Bind to our port */ 1857 make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len); 1858 return kernel_bind(sock, (struct sockaddr_unsized *)&dlm_local_addr[0], 1859 addr_len); 1860 } 1861 1862 static const struct dlm_proto_ops dlm_tcp_ops = { 1863 .name = "TCP", 1864 .proto = IPPROTO_TCP, 1865 .how = SHUT_WR, 1866 .sockopts = dlm_tcp_sockopts, 1867 .bind = dlm_tcp_bind, 1868 .listen_validate = dlm_tcp_listen_validate, 1869 .listen_sockopts = dlm_tcp_listen_sockopts, 1870 .listen_bind = dlm_tcp_listen_bind, 1871 }; 1872 1873 static int dlm_sctp_bind(struct socket *sock) 1874 { 1875 return sctp_bind_addrs(sock, 0); 1876 } 1877 1878 static int dlm_sctp_listen_validate(void) 1879 { 1880 if (!IS_ENABLED(CONFIG_IP_SCTP)) { 1881 log_print("SCTP is not enabled by this kernel"); 1882 return -EOPNOTSUPP; 1883 } 1884 1885 request_module("sctp"); 1886 return 0; 1887 } 1888 1889 static int dlm_sctp_bind_listen(struct socket *sock) 1890 { 1891 return sctp_bind_addrs(sock, dlm_config.ci_tcp_port); 1892 } 1893 1894 static void dlm_sctp_sockopts(struct socket *sock) 1895 { 1896 /* Turn off Nagle's algorithm */ 1897 sctp_sock_set_nodelay(sock->sk); 1898 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1899 } 1900 1901 static const struct dlm_proto_ops dlm_sctp_ops = { 1902 .name = "SCTP", 1903 .proto = IPPROTO_SCTP, 1904 .how = SHUT_RDWR, 1905 .try_new_addr = true, 1906 .sockopts = dlm_sctp_sockopts, 1907 .bind = dlm_sctp_bind, 1908 .listen_validate = dlm_sctp_listen_validate, 1909 .listen_sockopts = dlm_sctp_sockopts, 1910 .listen_bind = dlm_sctp_bind_listen, 1911 }; 1912 1913 int dlm_lowcomms_start(void) 1914 { 1915 int error; 1916 1917 init_local(); 1918 if (!dlm_local_count) { 1919 error = -ENOTCONN; 1920 log_print("no local IP address has been set"); 1921 goto fail; 1922 } 1923 1924 error = work_start(); 1925 if (error) 1926 goto fail; 1927 1928 /* Start listening */ 1929 switch (dlm_config.ci_protocol) { 1930 case DLM_PROTO_TCP: 1931 dlm_proto_ops = &dlm_tcp_ops; 1932 break; 1933 case DLM_PROTO_SCTP: 1934 dlm_proto_ops = &dlm_sctp_ops; 1935 break; 1936 default: 1937 log_print("Invalid protocol identifier %d set", 1938 dlm_config.ci_protocol); 1939 error = -EINVAL; 1940 goto fail_proto_ops; 1941 } 1942 1943 error = dlm_listen_for_all(); 1944 if (error) 1945 goto fail_listen; 1946 1947 return 0; 1948 1949 fail_listen: 1950 dlm_proto_ops = NULL; 1951 fail_proto_ops: 1952 work_stop(); 1953 fail: 1954 return error; 1955 } 1956 1957 void dlm_lowcomms_init(void) 1958 { 1959 int i; 1960 1961 for (i = 0; i < CONN_HASH_SIZE; i++) 1962 INIT_HLIST_HEAD(&connection_hash[i]); 1963 1964 INIT_WORK(&listen_con.rwork, process_listen_recv_socket); 1965 } 1966 1967 void dlm_lowcomms_exit(void) 1968 { 1969 struct connection *con; 1970 int i, idx; 1971 1972 idx = srcu_read_lock(&connections_srcu); 1973 for (i = 0; i < CONN_HASH_SIZE; i++) { 1974 hlist_for_each_entry_srcu(con, &connection_hash[i], list, 1975 srcu_read_lock_held(&connections_srcu)) { 1976 spin_lock(&connections_lock); 1977 hlist_del_rcu(&con->list); 1978 spin_unlock(&connections_lock); 1979 1980 if (con->othercon) 1981 call_srcu(&connections_srcu, &con->othercon->rcu, 1982 connection_release); 1983 call_srcu(&connections_srcu, &con->rcu, connection_release); 1984 } 1985 } 1986 srcu_read_unlock(&connections_srcu, idx); 1987 } 1988