// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>
#include <trace/events/sock.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
#define DLM_MAX_PROCESS_BUFFERS 24
#define NEEDED_RMEM (4*1024*1024)

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* This rw_semaphore allows parallel recv/send handling while held
	 * in read mode; releasing a socket requires taking it in write mode.
	 *
	 * This locking is not pretty. Once the othercon handling is removed
	 * we can look into another mechanism to synchronize the io handling
	 * so that sock_release() is called at the right time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;
#define CF_APP_LIMITED 0
#define CF_RECV_PENDING 1
#define CF_SEND_PENDING 2
#define CF_RECV_INTR 3
#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;
	/* Due to connect()/accept() races we can currently end up with a
	 * crossed-over second connection attempt for one node.
	 *
	 * The race could be avoided by introducing a connect rule, e.g.
	 * only the side with our_nodeid > nodeid_to_connect is allowed to
	 * connect; a connect from the other side would then only be taken
	 * as a request for a reconnect.
	 *
	 * However, changing to that behaviour would break backwards
	 * compatibility. This should be removed in a DLM protocol major
	 * version upgrade!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	wait_queue_head_t shutdown_wait;
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	int mark;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};
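/*
 * Illustrative sketch (not part of this file's logic): the typical way a
 * caller such as midcomms uses the message API built on top of the two
 * structures above.  "nodeid", "buf" and "len" are made-up names for the
 * example.
 *
 *	char *ppc;
 *	struct dlm_msg *msg;
 *
 *	msg = dlm_lowcomms_new_msg(nodeid, len, &ppc, NULL, NULL);
 *	if (!msg)
 *		return -ENOMEM;
 *
 *	memcpy(ppc, buf, len);
 *	dlm_lowcomms_commit_msg(msg);
 *	dlm_lowcomms_put_msg(msg);
 *
 * dlm_lowcomms_new_msg() reserves space in a writequeue_entry page and
 * returns a dlm_msg that pins it, dlm_lowcomms_commit_msg() hands the data
 * over to the send worker, and dlm_lowcomms_put_msg() drops the caller's
 * reference once the message is no longer needed (e.g. for a resend).
 */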

struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};

struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3
#define DLM_IO_FLUSH 4

static void process_recv_sockets(struct work_struct *work);
static void
process_send_sockets(struct work_struct *work); 200 static void process_dlm_messages(struct work_struct *work); 201 202 static DECLARE_WORK(process_work, process_dlm_messages); 203 static DEFINE_SPINLOCK(processqueue_lock); 204 static bool process_dlm_messages_pending; 205 static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq); 206 static atomic_t processqueue_count; 207 static LIST_HEAD(processqueue); 208 209 bool dlm_lowcomms_is_running(void) 210 { 211 return !!listen_con.sock; 212 } 213 214 static void lowcomms_queue_swork(struct connection *con) 215 { 216 assert_spin_locked(&con->writequeue_lock); 217 218 if (!test_bit(CF_IO_STOP, &con->flags) && 219 !test_bit(CF_APP_LIMITED, &con->flags) && 220 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) 221 queue_work(io_workqueue, &con->swork); 222 } 223 224 static void lowcomms_queue_rwork(struct connection *con) 225 { 226 #ifdef CONFIG_LOCKDEP 227 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); 228 #endif 229 230 if (!test_bit(CF_IO_STOP, &con->flags) && 231 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) 232 queue_work(io_workqueue, &con->rwork); 233 } 234 235 static void writequeue_entry_ctor(void *data) 236 { 237 struct writequeue_entry *entry = data; 238 239 INIT_LIST_HEAD(&entry->msgs); 240 } 241 242 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void) 243 { 244 return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry), 245 0, 0, writequeue_entry_ctor); 246 } 247 248 struct kmem_cache *dlm_lowcomms_msg_cache_create(void) 249 { 250 return KMEM_CACHE(dlm_msg, 0); 251 } 252 253 /* need to held writequeue_lock */ 254 static struct writequeue_entry *con_next_wq(struct connection *con) 255 { 256 struct writequeue_entry *e; 257 258 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, 259 list); 260 /* if len is zero nothing is to send, if there are users filling 261 * buffers we wait until the users are done so we can send more. 262 */ 263 if (!e || e->users || e->len == 0) 264 return NULL; 265 266 return e; 267 } 268 269 static struct connection *__find_con(int nodeid, int r) 270 { 271 struct connection *con; 272 273 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { 274 if (con->nodeid == nodeid) 275 return con; 276 } 277 278 return NULL; 279 } 280 281 static void dlm_con_init(struct connection *con, int nodeid) 282 { 283 con->nodeid = nodeid; 284 init_rwsem(&con->sock_lock); 285 INIT_LIST_HEAD(&con->writequeue); 286 spin_lock_init(&con->writequeue_lock); 287 INIT_WORK(&con->swork, process_send_sockets); 288 INIT_WORK(&con->rwork, process_recv_sockets); 289 spin_lock_init(&con->addrs_lock); 290 init_waitqueue_head(&con->shutdown_wait); 291 } 292 293 /* 294 * If 'allocation' is zero then we don't attempt to create a new 295 * connection structure for this node. 296 */ 297 static struct connection *nodeid2con(int nodeid, gfp_t alloc) 298 { 299 struct connection *con, *tmp; 300 int r; 301 302 r = nodeid_hash(nodeid); 303 con = __find_con(nodeid, r); 304 if (con || !alloc) 305 return con; 306 307 con = kzalloc(sizeof(*con), alloc); 308 if (!con) 309 return NULL; 310 311 dlm_con_init(con, nodeid); 312 313 spin_lock(&connections_lock); 314 /* Because multiple workqueues/threads calls this function it can 315 * race on multiple cpu's. Instead of locking hot path __find_con() 316 * we just check in rare cases of recently added nodes again 317 * under protection of connections_lock. If this is the case we 318 * abort our connection creation and return the existing connection. 
319 */ 320 tmp = __find_con(nodeid, r); 321 if (tmp) { 322 spin_unlock(&connections_lock); 323 kfree(con); 324 return tmp; 325 } 326 327 hlist_add_head_rcu(&con->list, &connection_hash[r]); 328 spin_unlock(&connections_lock); 329 330 return con; 331 } 332 333 static int addr_compare(const struct sockaddr_storage *x, 334 const struct sockaddr_storage *y) 335 { 336 switch (x->ss_family) { 337 case AF_INET: { 338 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 339 struct sockaddr_in *siny = (struct sockaddr_in *)y; 340 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 341 return 0; 342 if (sinx->sin_port != siny->sin_port) 343 return 0; 344 break; 345 } 346 case AF_INET6: { 347 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 348 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 349 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 350 return 0; 351 if (sinx->sin6_port != siny->sin6_port) 352 return 0; 353 break; 354 } 355 default: 356 return 0; 357 } 358 return 1; 359 } 360 361 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 362 struct sockaddr *sa_out, bool try_new_addr, 363 unsigned int *mark) 364 { 365 struct sockaddr_storage sas; 366 struct connection *con; 367 int idx; 368 369 if (!dlm_local_count) 370 return -1; 371 372 idx = srcu_read_lock(&connections_srcu); 373 con = nodeid2con(nodeid, 0); 374 if (!con) { 375 srcu_read_unlock(&connections_srcu, idx); 376 return -ENOENT; 377 } 378 379 spin_lock(&con->addrs_lock); 380 if (!con->addr_count) { 381 spin_unlock(&con->addrs_lock); 382 srcu_read_unlock(&connections_srcu, idx); 383 return -ENOENT; 384 } 385 386 memcpy(&sas, &con->addr[con->curr_addr_index], 387 sizeof(struct sockaddr_storage)); 388 389 if (try_new_addr) { 390 con->curr_addr_index++; 391 if (con->curr_addr_index == con->addr_count) 392 con->curr_addr_index = 0; 393 } 394 395 *mark = con->mark; 396 spin_unlock(&con->addrs_lock); 397 398 if (sas_out) 399 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 400 401 if (!sa_out) { 402 srcu_read_unlock(&connections_srcu, idx); 403 return 0; 404 } 405 406 if (dlm_local_addr[0].ss_family == AF_INET) { 407 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 408 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 409 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 410 } else { 411 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 412 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 413 ret6->sin6_addr = in6->sin6_addr; 414 } 415 416 srcu_read_unlock(&connections_srcu, idx); 417 return 0; 418 } 419 420 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, 421 unsigned int *mark) 422 { 423 struct connection *con; 424 int i, idx, addr_i; 425 426 idx = srcu_read_lock(&connections_srcu); 427 for (i = 0; i < CONN_HASH_SIZE; i++) { 428 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 429 WARN_ON_ONCE(!con->addr_count); 430 431 spin_lock(&con->addrs_lock); 432 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { 433 if (addr_compare(&con->addr[addr_i], addr)) { 434 *nodeid = con->nodeid; 435 *mark = con->mark; 436 spin_unlock(&con->addrs_lock); 437 srcu_read_unlock(&connections_srcu, idx); 438 return 0; 439 } 440 } 441 spin_unlock(&con->addrs_lock); 442 } 443 } 444 srcu_read_unlock(&connections_srcu, idx); 445 446 return -ENOENT; 447 } 448 449 static bool dlm_lowcomms_con_has_addr(const struct connection *con, 450 const struct sockaddr_storage *addr) 451 { 452 int i; 453 454 for (i = 0; i < con->addr_count; i++) { 455 if 
(addr_compare(&con->addr[i], addr))
			return true;
	}

	return false;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr)
{
	struct connection *con;
	bool ret;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		memcpy(&con->addr[0], addr, sizeof(*addr));
		con->addr_count = 1;
		con->mark = dlm_config.ci_mark;
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	ret = dlm_lowcomms_con_has_addr(con, addr);
	if (ret) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -EEXIST;
	}

	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOSPC;
	}

	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
	srcu_read_unlock(&connections_srcu, idx);
	spin_unlock(&con->addrs_lock);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	trace_sk_data_ready(sk);

	set_bit(CF_RECV_INTR, &con->flags);
	lowcomms_queue_rwork(con);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	spin_lock_bh(&con->writequeue_lock);
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	lowcomms_queue_swork(con);
	spin_unlock_bh(&con->writequeue_lock);
}

static void lowcomms_state_change(struct sock *sk)
{
	/* The SCTP layer does not call sk_data_ready when the connection
	 * is shut down, so we catch that case here.
534 */ 535 if (sk->sk_shutdown == RCV_SHUTDOWN) 536 lowcomms_data_ready(sk); 537 } 538 539 static void lowcomms_listen_data_ready(struct sock *sk) 540 { 541 trace_sk_data_ready(sk); 542 543 queue_work(io_workqueue, &listen_con.rwork); 544 } 545 546 int dlm_lowcomms_connect_node(int nodeid) 547 { 548 struct connection *con; 549 int idx; 550 551 idx = srcu_read_lock(&connections_srcu); 552 con = nodeid2con(nodeid, 0); 553 if (WARN_ON_ONCE(!con)) { 554 srcu_read_unlock(&connections_srcu, idx); 555 return -ENOENT; 556 } 557 558 down_read(&con->sock_lock); 559 if (!con->sock) { 560 spin_lock_bh(&con->writequeue_lock); 561 lowcomms_queue_swork(con); 562 spin_unlock_bh(&con->writequeue_lock); 563 } 564 up_read(&con->sock_lock); 565 srcu_read_unlock(&connections_srcu, idx); 566 567 cond_resched(); 568 return 0; 569 } 570 571 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) 572 { 573 struct connection *con; 574 int idx; 575 576 idx = srcu_read_lock(&connections_srcu); 577 con = nodeid2con(nodeid, 0); 578 if (!con) { 579 srcu_read_unlock(&connections_srcu, idx); 580 return -ENOENT; 581 } 582 583 spin_lock(&con->addrs_lock); 584 con->mark = mark; 585 spin_unlock(&con->addrs_lock); 586 srcu_read_unlock(&connections_srcu, idx); 587 return 0; 588 } 589 590 static void lowcomms_error_report(struct sock *sk) 591 { 592 struct connection *con = sock2con(sk); 593 struct inet_sock *inet; 594 595 inet = inet_sk(sk); 596 switch (sk->sk_family) { 597 case AF_INET: 598 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 599 "sending to node %d at %pI4, dport %d, " 600 "sk_err=%d/%d\n", dlm_our_nodeid(), 601 con->nodeid, &inet->inet_daddr, 602 ntohs(inet->inet_dport), sk->sk_err, 603 READ_ONCE(sk->sk_err_soft)); 604 break; 605 #if IS_ENABLED(CONFIG_IPV6) 606 case AF_INET6: 607 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 608 "sending to node %d at %pI6c, " 609 "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(), 610 con->nodeid, &sk->sk_v6_daddr, 611 ntohs(inet->inet_dport), sk->sk_err, 612 READ_ONCE(sk->sk_err_soft)); 613 break; 614 #endif 615 default: 616 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 617 "invalid socket family %d set, " 618 "sk_err=%d/%d\n", dlm_our_nodeid(), 619 sk->sk_family, sk->sk_err, 620 READ_ONCE(sk->sk_err_soft)); 621 break; 622 } 623 624 dlm_midcomms_unack_msg_resend(con->nodeid); 625 626 listen_sock.sk_error_report(sk); 627 } 628 629 static void restore_callbacks(struct sock *sk) 630 { 631 #ifdef CONFIG_LOCKDEP 632 WARN_ON_ONCE(!lockdep_sock_is_held(sk)); 633 #endif 634 635 sk->sk_user_data = NULL; 636 sk->sk_data_ready = listen_sock.sk_data_ready; 637 sk->sk_state_change = listen_sock.sk_state_change; 638 sk->sk_write_space = listen_sock.sk_write_space; 639 sk->sk_error_report = listen_sock.sk_error_report; 640 } 641 642 /* Make a socket active */ 643 static void add_sock(struct socket *sock, struct connection *con) 644 { 645 struct sock *sk = sock->sk; 646 647 lock_sock(sk); 648 con->sock = sock; 649 650 sk->sk_user_data = con; 651 sk->sk_data_ready = lowcomms_data_ready; 652 sk->sk_write_space = lowcomms_write_space; 653 if (dlm_config.ci_protocol == DLM_PROTO_SCTP) 654 sk->sk_state_change = lowcomms_state_change; 655 sk->sk_allocation = GFP_NOFS; 656 sk->sk_use_task_frag = false; 657 sk->sk_error_report = lowcomms_error_report; 658 release_sock(sk); 659 } 660 661 /* Add the port number to an IPv6 or 4 sockaddr and return the address 662 length */ 663 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 664 int *addr_len) 665 
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	spin_lock_bh(&con->writequeue_lock);
	set_bit(CF_IO_STOP, &con->flags);
	spin_unlock_bh(&con->writequeue_lock);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);
		release_sock(con->sock->sk);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* If a writequeue entry was only partially sent, drop the whole
	 * entry on reconnection so that we never resume in the middle of
	 * a message, which would confuse the other end.
	 *
	 * Dropping whole messages is always safe because of retransmits;
	 * what we must never do is transmit half a message that might get
	 * processed on the other side.
	 *
	 * Our policy is to start from a clean state after a disconnect,
	 * since we cannot know what was sent or received at the transport
	 * layer in that case.
779 */ 780 spin_lock_bh(&con->writequeue_lock); 781 if (!list_empty(&con->writequeue)) { 782 e = list_first_entry(&con->writequeue, struct writequeue_entry, 783 list); 784 if (e->dirty) 785 free_entry(e); 786 } 787 spin_unlock_bh(&con->writequeue_lock); 788 789 con->rx_leftover = 0; 790 con->retries = 0; 791 clear_bit(CF_APP_LIMITED, &con->flags); 792 clear_bit(CF_RECV_PENDING, &con->flags); 793 clear_bit(CF_SEND_PENDING, &con->flags); 794 up_write(&con->sock_lock); 795 } 796 797 static void shutdown_connection(struct connection *con, bool and_other) 798 { 799 int ret; 800 801 if (con->othercon && and_other) 802 shutdown_connection(con->othercon, false); 803 804 flush_workqueue(io_workqueue); 805 down_read(&con->sock_lock); 806 /* nothing to shutdown */ 807 if (!con->sock) { 808 up_read(&con->sock_lock); 809 return; 810 } 811 812 ret = kernel_sock_shutdown(con->sock, SHUT_WR); 813 up_read(&con->sock_lock); 814 if (ret) { 815 log_print("Connection %p failed to shutdown: %d will force close", 816 con, ret); 817 goto force_close; 818 } else { 819 ret = wait_event_timeout(con->shutdown_wait, !con->sock, 820 DLM_SHUTDOWN_WAIT_TIMEOUT); 821 if (ret == 0) { 822 log_print("Connection %p shutdown timed out, will force close", 823 con); 824 goto force_close; 825 } 826 } 827 828 return; 829 830 force_close: 831 close_connection(con, false); 832 } 833 834 static struct processqueue_entry *new_processqueue_entry(int nodeid, 835 int buflen) 836 { 837 struct processqueue_entry *pentry; 838 839 pentry = kmalloc(sizeof(*pentry), GFP_NOFS); 840 if (!pentry) 841 return NULL; 842 843 pentry->buf = kmalloc(buflen, GFP_NOFS); 844 if (!pentry->buf) { 845 kfree(pentry); 846 return NULL; 847 } 848 849 pentry->nodeid = nodeid; 850 return pentry; 851 } 852 853 static void free_processqueue_entry(struct processqueue_entry *pentry) 854 { 855 kfree(pentry->buf); 856 kfree(pentry); 857 } 858 859 static void process_dlm_messages(struct work_struct *work) 860 { 861 struct processqueue_entry *pentry; 862 863 spin_lock_bh(&processqueue_lock); 864 pentry = list_first_entry_or_null(&processqueue, 865 struct processqueue_entry, list); 866 if (WARN_ON_ONCE(!pentry)) { 867 process_dlm_messages_pending = false; 868 spin_unlock_bh(&processqueue_lock); 869 return; 870 } 871 872 list_del(&pentry->list); 873 if (atomic_dec_and_test(&processqueue_count)) 874 wake_up(&processqueue_wq); 875 spin_unlock_bh(&processqueue_lock); 876 877 for (;;) { 878 dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, 879 pentry->buflen); 880 free_processqueue_entry(pentry); 881 882 spin_lock_bh(&processqueue_lock); 883 pentry = list_first_entry_or_null(&processqueue, 884 struct processqueue_entry, list); 885 if (!pentry) { 886 process_dlm_messages_pending = false; 887 spin_unlock_bh(&processqueue_lock); 888 break; 889 } 890 891 list_del(&pentry->list); 892 if (atomic_dec_and_test(&processqueue_count)) 893 wake_up(&processqueue_wq); 894 spin_unlock_bh(&processqueue_lock); 895 } 896 } 897 898 /* Data received from remote end */ 899 static int receive_from_sock(struct connection *con, int buflen) 900 { 901 struct processqueue_entry *pentry; 902 int ret, buflen_real; 903 struct msghdr msg; 904 struct kvec iov; 905 906 pentry = new_processqueue_entry(con->nodeid, buflen); 907 if (!pentry) 908 return DLM_IO_RESCHED; 909 910 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); 911 912 /* calculate new buffer parameter regarding last receive and 913 * possible leftover bytes 914 */ 915 iov.iov_base = pentry->buf + con->rx_leftover; 916 iov.iov_len = 
buflen - con->rx_leftover; 917 918 memset(&msg, 0, sizeof(msg)); 919 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 920 clear_bit(CF_RECV_INTR, &con->flags); 921 again: 922 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 923 msg.msg_flags); 924 trace_dlm_recv(con->nodeid, ret); 925 if (ret == -EAGAIN) { 926 lock_sock(con->sock->sk); 927 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { 928 release_sock(con->sock->sk); 929 goto again; 930 } 931 932 clear_bit(CF_RECV_PENDING, &con->flags); 933 release_sock(con->sock->sk); 934 free_processqueue_entry(pentry); 935 return DLM_IO_END; 936 } else if (ret == 0) { 937 /* close will clear CF_RECV_PENDING */ 938 free_processqueue_entry(pentry); 939 return DLM_IO_EOF; 940 } else if (ret < 0) { 941 free_processqueue_entry(pentry); 942 return ret; 943 } 944 945 /* new buflen according readed bytes and leftover from last receive */ 946 buflen_real = ret + con->rx_leftover; 947 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, 948 buflen_real); 949 if (ret < 0) { 950 free_processqueue_entry(pentry); 951 return ret; 952 } 953 954 pentry->buflen = ret; 955 956 /* calculate leftover bytes from process and put it into begin of 957 * the receive buffer, so next receive we have the full message 958 * at the start address of the receive buffer. 959 */ 960 con->rx_leftover = buflen_real - ret; 961 memmove(con->rx_leftover_buf, pentry->buf + ret, 962 con->rx_leftover); 963 964 spin_lock_bh(&processqueue_lock); 965 ret = atomic_inc_return(&processqueue_count); 966 list_add_tail(&pentry->list, &processqueue); 967 if (!process_dlm_messages_pending) { 968 process_dlm_messages_pending = true; 969 queue_work(process_workqueue, &process_work); 970 } 971 spin_unlock_bh(&processqueue_lock); 972 973 if (ret > DLM_MAX_PROCESS_BUFFERS) 974 return DLM_IO_FLUSH; 975 976 return DLM_IO_SUCCESS; 977 } 978 979 /* Listening socket is busy, accept a connection */ 980 static int accept_from_sock(void) 981 { 982 struct sockaddr_storage peeraddr; 983 int len, idx, result, nodeid; 984 struct connection *newcon; 985 struct socket *newsock; 986 unsigned int mark; 987 988 result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK); 989 if (result == -EAGAIN) 990 return DLM_IO_END; 991 else if (result < 0) 992 goto accept_err; 993 994 /* Get the connected socket's peer */ 995 memset(&peeraddr, 0, sizeof(peeraddr)); 996 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 997 if (len < 0) { 998 result = -ECONNABORTED; 999 goto accept_err; 1000 } 1001 1002 /* Get the new node's NODEID */ 1003 make_sockaddr(&peeraddr, 0, &len); 1004 if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) { 1005 switch (peeraddr.ss_family) { 1006 case AF_INET: { 1007 struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr; 1008 1009 log_print("connect from non cluster IPv4 node %pI4", 1010 &sin->sin_addr); 1011 break; 1012 } 1013 #if IS_ENABLED(CONFIG_IPV6) 1014 case AF_INET6: { 1015 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr; 1016 1017 log_print("connect from non cluster IPv6 node %pI6c", 1018 &sin6->sin6_addr); 1019 break; 1020 } 1021 #endif 1022 default: 1023 log_print("invalid family from non cluster node"); 1024 break; 1025 } 1026 1027 sock_release(newsock); 1028 return -1; 1029 } 1030 1031 log_print("got connection from %d", nodeid); 1032 1033 /* Check to see if we already have a connection to this node. This 1034 * could happen if the two nodes initiate a connection at roughly 1035 * the same time and the connections cross on the wire. 
1036 * In this case we store the incoming one in "othercon" 1037 */ 1038 idx = srcu_read_lock(&connections_srcu); 1039 newcon = nodeid2con(nodeid, 0); 1040 if (WARN_ON_ONCE(!newcon)) { 1041 srcu_read_unlock(&connections_srcu, idx); 1042 result = -ENOENT; 1043 goto accept_err; 1044 } 1045 1046 sock_set_mark(newsock->sk, mark); 1047 1048 down_write(&newcon->sock_lock); 1049 if (newcon->sock) { 1050 struct connection *othercon = newcon->othercon; 1051 1052 if (!othercon) { 1053 othercon = kzalloc(sizeof(*othercon), GFP_NOFS); 1054 if (!othercon) { 1055 log_print("failed to allocate incoming socket"); 1056 up_write(&newcon->sock_lock); 1057 srcu_read_unlock(&connections_srcu, idx); 1058 result = -ENOMEM; 1059 goto accept_err; 1060 } 1061 1062 dlm_con_init(othercon, nodeid); 1063 lockdep_set_subclass(&othercon->sock_lock, 1); 1064 newcon->othercon = othercon; 1065 set_bit(CF_IS_OTHERCON, &othercon->flags); 1066 } else { 1067 /* close other sock con if we have something new */ 1068 close_connection(othercon, false); 1069 } 1070 1071 down_write(&othercon->sock_lock); 1072 add_sock(newsock, othercon); 1073 1074 /* check if we receved something while adding */ 1075 lock_sock(othercon->sock->sk); 1076 lowcomms_queue_rwork(othercon); 1077 release_sock(othercon->sock->sk); 1078 up_write(&othercon->sock_lock); 1079 } 1080 else { 1081 /* accept copies the sk after we've saved the callbacks, so we 1082 don't want to save them a second time or comm errors will 1083 result in calling sk_error_report recursively. */ 1084 add_sock(newsock, newcon); 1085 1086 /* check if we receved something while adding */ 1087 lock_sock(newcon->sock->sk); 1088 lowcomms_queue_rwork(newcon); 1089 release_sock(newcon->sock->sk); 1090 } 1091 up_write(&newcon->sock_lock); 1092 srcu_read_unlock(&connections_srcu, idx); 1093 1094 return DLM_IO_SUCCESS; 1095 1096 accept_err: 1097 if (newsock) 1098 sock_release(newsock); 1099 1100 return result; 1101 } 1102 1103 /* 1104 * writequeue_entry_complete - try to delete and free write queue entry 1105 * @e: write queue entry to try to delete 1106 * @completed: bytes completed 1107 * 1108 * writequeue_lock must be held. 
1109 */ 1110 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 1111 { 1112 e->offset += completed; 1113 e->len -= completed; 1114 /* signal that page was half way transmitted */ 1115 e->dirty = true; 1116 1117 if (e->len == 0 && e->users == 0) 1118 free_entry(e); 1119 } 1120 1121 /* 1122 * sctp_bind_addrs - bind a SCTP socket to all our addresses 1123 */ 1124 static int sctp_bind_addrs(struct socket *sock, uint16_t port) 1125 { 1126 struct sockaddr_storage localaddr; 1127 struct sockaddr *addr = (struct sockaddr *)&localaddr; 1128 int i, addr_len, result = 0; 1129 1130 for (i = 0; i < dlm_local_count; i++) { 1131 memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr)); 1132 make_sockaddr(&localaddr, port, &addr_len); 1133 1134 if (!i) 1135 result = kernel_bind(sock, addr, addr_len); 1136 else 1137 result = sock_bind_add(sock->sk, addr, addr_len); 1138 1139 if (result < 0) { 1140 log_print("Can't bind to %d addr number %d, %d.\n", 1141 port, i + 1, result); 1142 break; 1143 } 1144 } 1145 return result; 1146 } 1147 1148 /* Get local addresses */ 1149 static void init_local(void) 1150 { 1151 struct sockaddr_storage sas; 1152 int i; 1153 1154 dlm_local_count = 0; 1155 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1156 if (dlm_our_addr(&sas, i)) 1157 break; 1158 1159 memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas)); 1160 } 1161 } 1162 1163 static struct writequeue_entry *new_writequeue_entry(struct connection *con) 1164 { 1165 struct writequeue_entry *entry; 1166 1167 entry = dlm_allocate_writequeue(); 1168 if (!entry) 1169 return NULL; 1170 1171 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); 1172 if (!entry->page) { 1173 dlm_free_writequeue(entry); 1174 return NULL; 1175 } 1176 1177 entry->offset = 0; 1178 entry->len = 0; 1179 entry->end = 0; 1180 entry->dirty = false; 1181 entry->con = con; 1182 entry->users = 1; 1183 kref_init(&entry->ref); 1184 return entry; 1185 } 1186 1187 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, 1188 char **ppc, void (*cb)(void *data), 1189 void *data) 1190 { 1191 struct writequeue_entry *e; 1192 1193 spin_lock_bh(&con->writequeue_lock); 1194 if (!list_empty(&con->writequeue)) { 1195 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); 1196 if (DLM_WQ_REMAIN_BYTES(e) >= len) { 1197 kref_get(&e->ref); 1198 1199 *ppc = page_address(e->page) + e->end; 1200 if (cb) 1201 cb(data); 1202 1203 e->end += len; 1204 e->users++; 1205 goto out; 1206 } 1207 } 1208 1209 e = new_writequeue_entry(con); 1210 if (!e) 1211 goto out; 1212 1213 kref_get(&e->ref); 1214 *ppc = page_address(e->page); 1215 e->end += len; 1216 if (cb) 1217 cb(data); 1218 1219 list_add_tail(&e->list, &con->writequeue); 1220 1221 out: 1222 spin_unlock_bh(&con->writequeue_lock); 1223 return e; 1224 }; 1225 1226 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, 1227 char **ppc, void (*cb)(void *data), 1228 void *data) 1229 { 1230 struct writequeue_entry *e; 1231 struct dlm_msg *msg; 1232 1233 msg = dlm_allocate_msg(); 1234 if (!msg) 1235 return NULL; 1236 1237 kref_init(&msg->ref); 1238 1239 e = new_wq_entry(con, len, ppc, cb, data); 1240 if (!e) { 1241 dlm_free_msg(msg); 1242 return NULL; 1243 } 1244 1245 msg->retransmit = false; 1246 msg->orig_msg = NULL; 1247 msg->ppc = *ppc; 1248 msg->len = len; 1249 msg->entry = e; 1250 1251 return msg; 1252 } 1253 1254 /* avoid false positive for nodes_srcu, unlock happens in 1255 * dlm_lowcomms_commit_msg which is a must call if success 1256 */ 
1257 #ifndef __CHECKER__ 1258 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc, 1259 void (*cb)(void *data), void *data) 1260 { 1261 struct connection *con; 1262 struct dlm_msg *msg; 1263 int idx; 1264 1265 if (len > DLM_MAX_SOCKET_BUFSIZE || 1266 len < sizeof(struct dlm_header)) { 1267 BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE); 1268 log_print("failed to allocate a buffer of size %d", len); 1269 WARN_ON_ONCE(1); 1270 return NULL; 1271 } 1272 1273 idx = srcu_read_lock(&connections_srcu); 1274 con = nodeid2con(nodeid, 0); 1275 if (WARN_ON_ONCE(!con)) { 1276 srcu_read_unlock(&connections_srcu, idx); 1277 return NULL; 1278 } 1279 1280 msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data); 1281 if (!msg) { 1282 srcu_read_unlock(&connections_srcu, idx); 1283 return NULL; 1284 } 1285 1286 /* for dlm_lowcomms_commit_msg() */ 1287 kref_get(&msg->ref); 1288 /* we assume if successful commit must called */ 1289 msg->idx = idx; 1290 return msg; 1291 } 1292 #endif 1293 1294 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1295 { 1296 struct writequeue_entry *e = msg->entry; 1297 struct connection *con = e->con; 1298 int users; 1299 1300 spin_lock_bh(&con->writequeue_lock); 1301 kref_get(&msg->ref); 1302 list_add(&msg->list, &e->msgs); 1303 1304 users = --e->users; 1305 if (users) 1306 goto out; 1307 1308 e->len = DLM_WQ_LENGTH_BYTES(e); 1309 1310 lowcomms_queue_swork(con); 1311 1312 out: 1313 spin_unlock_bh(&con->writequeue_lock); 1314 return; 1315 } 1316 1317 /* avoid false positive for nodes_srcu, lock was happen in 1318 * dlm_lowcomms_new_msg 1319 */ 1320 #ifndef __CHECKER__ 1321 void dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1322 { 1323 _dlm_lowcomms_commit_msg(msg); 1324 srcu_read_unlock(&connections_srcu, msg->idx); 1325 /* because dlm_lowcomms_new_msg() */ 1326 kref_put(&msg->ref, dlm_msg_release); 1327 } 1328 #endif 1329 1330 void dlm_lowcomms_put_msg(struct dlm_msg *msg) 1331 { 1332 kref_put(&msg->ref, dlm_msg_release); 1333 } 1334 1335 /* does not held connections_srcu, usage lowcomms_error_report only */ 1336 int dlm_lowcomms_resend_msg(struct dlm_msg *msg) 1337 { 1338 struct dlm_msg *msg_resend; 1339 char *ppc; 1340 1341 if (msg->retransmit) 1342 return 1; 1343 1344 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc, 1345 NULL, NULL); 1346 if (!msg_resend) 1347 return -ENOMEM; 1348 1349 msg->retransmit = true; 1350 kref_get(&msg->ref); 1351 msg_resend->orig_msg = msg; 1352 1353 memcpy(ppc, msg->ppc, msg->len); 1354 _dlm_lowcomms_commit_msg(msg_resend); 1355 dlm_lowcomms_put_msg(msg_resend); 1356 1357 return 0; 1358 } 1359 1360 /* Send a message */ 1361 static int send_to_sock(struct connection *con) 1362 { 1363 struct writequeue_entry *e; 1364 struct bio_vec bvec; 1365 struct msghdr msg = { 1366 .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL, 1367 }; 1368 int len, offset, ret; 1369 1370 spin_lock_bh(&con->writequeue_lock); 1371 e = con_next_wq(con); 1372 if (!e) { 1373 clear_bit(CF_SEND_PENDING, &con->flags); 1374 spin_unlock_bh(&con->writequeue_lock); 1375 return DLM_IO_END; 1376 } 1377 1378 len = e->len; 1379 offset = e->offset; 1380 WARN_ON_ONCE(len == 0 && e->users == 0); 1381 spin_unlock_bh(&con->writequeue_lock); 1382 1383 bvec_set_page(&bvec, e->page, len, offset); 1384 iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len); 1385 ret = sock_sendmsg(con->sock, &msg); 1386 trace_dlm_send(con->nodeid, ret); 1387 if (ret == -EAGAIN || ret == 0) { 1388 lock_sock(con->sock->sk); 1389 
spin_lock_bh(&con->writequeue_lock); 1390 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1391 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1392 /* Notify TCP that we're limited by the 1393 * application window size. 1394 */ 1395 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); 1396 con->sock->sk->sk_write_pending++; 1397 1398 clear_bit(CF_SEND_PENDING, &con->flags); 1399 spin_unlock_bh(&con->writequeue_lock); 1400 release_sock(con->sock->sk); 1401 1402 /* wait for write_space() event */ 1403 return DLM_IO_END; 1404 } 1405 spin_unlock_bh(&con->writequeue_lock); 1406 release_sock(con->sock->sk); 1407 1408 return DLM_IO_RESCHED; 1409 } else if (ret < 0) { 1410 return ret; 1411 } 1412 1413 spin_lock_bh(&con->writequeue_lock); 1414 writequeue_entry_complete(e, ret); 1415 spin_unlock_bh(&con->writequeue_lock); 1416 1417 return DLM_IO_SUCCESS; 1418 } 1419 1420 static void clean_one_writequeue(struct connection *con) 1421 { 1422 struct writequeue_entry *e, *safe; 1423 1424 spin_lock_bh(&con->writequeue_lock); 1425 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1426 free_entry(e); 1427 } 1428 spin_unlock_bh(&con->writequeue_lock); 1429 } 1430 1431 static void connection_release(struct rcu_head *rcu) 1432 { 1433 struct connection *con = container_of(rcu, struct connection, rcu); 1434 1435 WARN_ON_ONCE(!list_empty(&con->writequeue)); 1436 WARN_ON_ONCE(con->sock); 1437 kfree(con); 1438 } 1439 1440 /* Called from recovery when it knows that a node has 1441 left the cluster */ 1442 int dlm_lowcomms_close(int nodeid) 1443 { 1444 struct connection *con; 1445 int idx; 1446 1447 log_print("closing connection to node %d", nodeid); 1448 1449 idx = srcu_read_lock(&connections_srcu); 1450 con = nodeid2con(nodeid, 0); 1451 if (WARN_ON_ONCE(!con)) { 1452 srcu_read_unlock(&connections_srcu, idx); 1453 return -ENOENT; 1454 } 1455 1456 stop_connection_io(con); 1457 log_print("io handling for node: %d stopped", nodeid); 1458 close_connection(con, true); 1459 1460 spin_lock(&connections_lock); 1461 hlist_del_rcu(&con->list); 1462 spin_unlock(&connections_lock); 1463 1464 clean_one_writequeue(con); 1465 call_srcu(&connections_srcu, &con->rcu, connection_release); 1466 if (con->othercon) { 1467 clean_one_writequeue(con->othercon); 1468 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); 1469 } 1470 srcu_read_unlock(&connections_srcu, idx); 1471 1472 /* for debugging we print when we are done to compare with other 1473 * messages in between. 
This function need to be correctly synchronized 1474 * with io handling 1475 */ 1476 log_print("closing connection to node %d done", nodeid); 1477 1478 return 0; 1479 } 1480 1481 /* Receive worker function */ 1482 static void process_recv_sockets(struct work_struct *work) 1483 { 1484 struct connection *con = container_of(work, struct connection, rwork); 1485 int ret, buflen; 1486 1487 down_read(&con->sock_lock); 1488 if (!con->sock) { 1489 up_read(&con->sock_lock); 1490 return; 1491 } 1492 1493 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1494 do { 1495 ret = receive_from_sock(con, buflen); 1496 } while (ret == DLM_IO_SUCCESS); 1497 up_read(&con->sock_lock); 1498 1499 switch (ret) { 1500 case DLM_IO_END: 1501 /* CF_RECV_PENDING cleared */ 1502 break; 1503 case DLM_IO_EOF: 1504 close_connection(con, false); 1505 wake_up(&con->shutdown_wait); 1506 /* CF_RECV_PENDING cleared */ 1507 break; 1508 case DLM_IO_FLUSH: 1509 /* we can't flush the process_workqueue here because a 1510 * WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non 1511 * WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead 1512 * we have a waitqueue to wait until all messages are 1513 * processed. 1514 * 1515 * This handling is only necessary to backoff the sender and 1516 * not queue all messages from the socket layer into DLM 1517 * processqueue. When DLM is capable to parse multiple messages 1518 * on an e.g. per socket basis this handling can might be 1519 * removed. Especially in a message burst we are too slow to 1520 * process messages and the queue will fill up memory. 1521 */ 1522 wait_event(processqueue_wq, !atomic_read(&processqueue_count)); 1523 fallthrough; 1524 case DLM_IO_RESCHED: 1525 cond_resched(); 1526 queue_work(io_workqueue, &con->rwork); 1527 /* CF_RECV_PENDING not cleared */ 1528 break; 1529 default: 1530 if (ret < 0) { 1531 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1532 close_connection(con, false); 1533 } else { 1534 spin_lock_bh(&con->writequeue_lock); 1535 lowcomms_queue_swork(con); 1536 spin_unlock_bh(&con->writequeue_lock); 1537 } 1538 1539 /* CF_RECV_PENDING cleared for othercon 1540 * we trigger send queue if not already done 1541 * and process_send_sockets will handle it 1542 */ 1543 break; 1544 } 1545 1546 WARN_ON_ONCE(1); 1547 break; 1548 } 1549 } 1550 1551 static void process_listen_recv_socket(struct work_struct *work) 1552 { 1553 int ret; 1554 1555 if (WARN_ON_ONCE(!listen_con.sock)) 1556 return; 1557 1558 do { 1559 ret = accept_from_sock(); 1560 } while (ret == DLM_IO_SUCCESS); 1561 1562 if (ret < 0) 1563 log_print("critical error accepting connection: %d", ret); 1564 } 1565 1566 static int dlm_connect(struct connection *con) 1567 { 1568 struct sockaddr_storage addr; 1569 int result, addr_len; 1570 struct socket *sock; 1571 unsigned int mark; 1572 1573 memset(&addr, 0, sizeof(addr)); 1574 result = nodeid_to_addr(con->nodeid, &addr, NULL, 1575 dlm_proto_ops->try_new_addr, &mark); 1576 if (result < 0) { 1577 log_print("no address for nodeid %d", con->nodeid); 1578 return result; 1579 } 1580 1581 /* Create a socket to communicate with */ 1582 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1583 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1584 if (result < 0) 1585 return result; 1586 1587 sock_set_mark(sock->sk, mark); 1588 dlm_proto_ops->sockopts(sock); 1589 1590 result = dlm_proto_ops->bind(sock); 1591 if (result < 0) { 1592 sock_release(sock); 1593 return result; 1594 } 1595 1596 add_sock(sock, con); 1597 1598 log_print_ratelimited("connecting to %d", 
con->nodeid); 1599 make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); 1600 result = kernel_connect(sock, (struct sockaddr *)&addr, addr_len, 0); 1601 switch (result) { 1602 case -EINPROGRESS: 1603 /* not an error */ 1604 fallthrough; 1605 case 0: 1606 break; 1607 default: 1608 if (result < 0) 1609 dlm_close_sock(&con->sock); 1610 1611 break; 1612 } 1613 1614 return result; 1615 } 1616 1617 /* Send worker function */ 1618 static void process_send_sockets(struct work_struct *work) 1619 { 1620 struct connection *con = container_of(work, struct connection, swork); 1621 int ret; 1622 1623 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); 1624 1625 down_read(&con->sock_lock); 1626 if (!con->sock) { 1627 up_read(&con->sock_lock); 1628 down_write(&con->sock_lock); 1629 if (!con->sock) { 1630 ret = dlm_connect(con); 1631 switch (ret) { 1632 case 0: 1633 break; 1634 default: 1635 /* CF_SEND_PENDING not cleared */ 1636 up_write(&con->sock_lock); 1637 log_print("connect to node %d try %d error %d", 1638 con->nodeid, con->retries++, ret); 1639 msleep(1000); 1640 /* For now we try forever to reconnect. In 1641 * future we should send a event to cluster 1642 * manager to fence itself after certain amount 1643 * of retries. 1644 */ 1645 queue_work(io_workqueue, &con->swork); 1646 return; 1647 } 1648 } 1649 downgrade_write(&con->sock_lock); 1650 } 1651 1652 do { 1653 ret = send_to_sock(con); 1654 } while (ret == DLM_IO_SUCCESS); 1655 up_read(&con->sock_lock); 1656 1657 switch (ret) { 1658 case DLM_IO_END: 1659 /* CF_SEND_PENDING cleared */ 1660 break; 1661 case DLM_IO_RESCHED: 1662 /* CF_SEND_PENDING not cleared */ 1663 cond_resched(); 1664 queue_work(io_workqueue, &con->swork); 1665 break; 1666 default: 1667 if (ret < 0) { 1668 close_connection(con, false); 1669 1670 /* CF_SEND_PENDING cleared */ 1671 spin_lock_bh(&con->writequeue_lock); 1672 lowcomms_queue_swork(con); 1673 spin_unlock_bh(&con->writequeue_lock); 1674 break; 1675 } 1676 1677 WARN_ON_ONCE(1); 1678 break; 1679 } 1680 } 1681 1682 static void work_stop(void) 1683 { 1684 if (io_workqueue) { 1685 destroy_workqueue(io_workqueue); 1686 io_workqueue = NULL; 1687 } 1688 1689 if (process_workqueue) { 1690 destroy_workqueue(process_workqueue); 1691 process_workqueue = NULL; 1692 } 1693 } 1694 1695 static int work_start(void) 1696 { 1697 io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM | 1698 WQ_UNBOUND, 0); 1699 if (!io_workqueue) { 1700 log_print("can't start dlm_io"); 1701 return -ENOMEM; 1702 } 1703 1704 process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0); 1705 if (!process_workqueue) { 1706 log_print("can't start dlm_process"); 1707 destroy_workqueue(io_workqueue); 1708 io_workqueue = NULL; 1709 return -ENOMEM; 1710 } 1711 1712 return 0; 1713 } 1714 1715 void dlm_lowcomms_shutdown(void) 1716 { 1717 struct connection *con; 1718 int i, idx; 1719 1720 /* stop lowcomms_listen_data_ready calls */ 1721 lock_sock(listen_con.sock->sk); 1722 listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; 1723 release_sock(listen_con.sock->sk); 1724 1725 cancel_work_sync(&listen_con.rwork); 1726 dlm_close_sock(&listen_con.sock); 1727 1728 idx = srcu_read_lock(&connections_srcu); 1729 for (i = 0; i < CONN_HASH_SIZE; i++) { 1730 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 1731 shutdown_connection(con, true); 1732 stop_connection_io(con); 1733 flush_workqueue(process_workqueue); 1734 close_connection(con, true); 1735 1736 clean_one_writequeue(con); 1737 if (con->othercon) 1738 
clean_one_writequeue(con->othercon); 1739 allow_connection_io(con); 1740 } 1741 } 1742 srcu_read_unlock(&connections_srcu, idx); 1743 } 1744 1745 void dlm_lowcomms_stop(void) 1746 { 1747 work_stop(); 1748 dlm_proto_ops = NULL; 1749 } 1750 1751 static int dlm_listen_for_all(void) 1752 { 1753 struct socket *sock; 1754 int result; 1755 1756 log_print("Using %s for communications", 1757 dlm_proto_ops->name); 1758 1759 result = dlm_proto_ops->listen_validate(); 1760 if (result < 0) 1761 return result; 1762 1763 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1764 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1765 if (result < 0) { 1766 log_print("Can't create comms socket: %d", result); 1767 return result; 1768 } 1769 1770 sock_set_mark(sock->sk, dlm_config.ci_mark); 1771 dlm_proto_ops->listen_sockopts(sock); 1772 1773 result = dlm_proto_ops->listen_bind(sock); 1774 if (result < 0) 1775 goto out; 1776 1777 lock_sock(sock->sk); 1778 listen_sock.sk_data_ready = sock->sk->sk_data_ready; 1779 listen_sock.sk_write_space = sock->sk->sk_write_space; 1780 listen_sock.sk_error_report = sock->sk->sk_error_report; 1781 listen_sock.sk_state_change = sock->sk->sk_state_change; 1782 1783 listen_con.sock = sock; 1784 1785 sock->sk->sk_allocation = GFP_NOFS; 1786 sock->sk->sk_use_task_frag = false; 1787 sock->sk->sk_data_ready = lowcomms_listen_data_ready; 1788 release_sock(sock->sk); 1789 1790 result = sock->ops->listen(sock, 128); 1791 if (result < 0) { 1792 dlm_close_sock(&listen_con.sock); 1793 return result; 1794 } 1795 1796 return 0; 1797 1798 out: 1799 sock_release(sock); 1800 return result; 1801 } 1802 1803 static int dlm_tcp_bind(struct socket *sock) 1804 { 1805 struct sockaddr_storage src_addr; 1806 int result, addr_len; 1807 1808 /* Bind to our cluster-known address connecting to avoid 1809 * routing problems. 
1810 */ 1811 memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr)); 1812 make_sockaddr(&src_addr, 0, &addr_len); 1813 1814 result = kernel_bind(sock, (struct sockaddr *)&src_addr, 1815 addr_len); 1816 if (result < 0) { 1817 /* This *may* not indicate a critical error */ 1818 log_print("could not bind for connect: %d", result); 1819 } 1820 1821 return 0; 1822 } 1823 1824 static int dlm_tcp_listen_validate(void) 1825 { 1826 /* We don't support multi-homed hosts */ 1827 if (dlm_local_count > 1) { 1828 log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); 1829 return -EINVAL; 1830 } 1831 1832 return 0; 1833 } 1834 1835 static void dlm_tcp_sockopts(struct socket *sock) 1836 { 1837 /* Turn off Nagle's algorithm */ 1838 tcp_sock_set_nodelay(sock->sk); 1839 } 1840 1841 static void dlm_tcp_listen_sockopts(struct socket *sock) 1842 { 1843 dlm_tcp_sockopts(sock); 1844 sock_set_reuseaddr(sock->sk); 1845 } 1846 1847 static int dlm_tcp_listen_bind(struct socket *sock) 1848 { 1849 int addr_len; 1850 1851 /* Bind to our port */ 1852 make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len); 1853 return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0], 1854 addr_len); 1855 } 1856 1857 static const struct dlm_proto_ops dlm_tcp_ops = { 1858 .name = "TCP", 1859 .proto = IPPROTO_TCP, 1860 .sockopts = dlm_tcp_sockopts, 1861 .bind = dlm_tcp_bind, 1862 .listen_validate = dlm_tcp_listen_validate, 1863 .listen_sockopts = dlm_tcp_listen_sockopts, 1864 .listen_bind = dlm_tcp_listen_bind, 1865 }; 1866 1867 static int dlm_sctp_bind(struct socket *sock) 1868 { 1869 return sctp_bind_addrs(sock, 0); 1870 } 1871 1872 static int dlm_sctp_listen_validate(void) 1873 { 1874 if (!IS_ENABLED(CONFIG_IP_SCTP)) { 1875 log_print("SCTP is not enabled by this kernel"); 1876 return -EOPNOTSUPP; 1877 } 1878 1879 request_module("sctp"); 1880 return 0; 1881 } 1882 1883 static int dlm_sctp_bind_listen(struct socket *sock) 1884 { 1885 return sctp_bind_addrs(sock, dlm_config.ci_tcp_port); 1886 } 1887 1888 static void dlm_sctp_sockopts(struct socket *sock) 1889 { 1890 /* Turn off Nagle's algorithm */ 1891 sctp_sock_set_nodelay(sock->sk); 1892 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1893 } 1894 1895 static const struct dlm_proto_ops dlm_sctp_ops = { 1896 .name = "SCTP", 1897 .proto = IPPROTO_SCTP, 1898 .try_new_addr = true, 1899 .sockopts = dlm_sctp_sockopts, 1900 .bind = dlm_sctp_bind, 1901 .listen_validate = dlm_sctp_listen_validate, 1902 .listen_sockopts = dlm_sctp_sockopts, 1903 .listen_bind = dlm_sctp_bind_listen, 1904 }; 1905 1906 int dlm_lowcomms_start(void) 1907 { 1908 int error; 1909 1910 init_local(); 1911 if (!dlm_local_count) { 1912 error = -ENOTCONN; 1913 log_print("no local IP address has been set"); 1914 goto fail; 1915 } 1916 1917 error = work_start(); 1918 if (error) 1919 goto fail; 1920 1921 /* Start listening */ 1922 switch (dlm_config.ci_protocol) { 1923 case DLM_PROTO_TCP: 1924 dlm_proto_ops = &dlm_tcp_ops; 1925 break; 1926 case DLM_PROTO_SCTP: 1927 dlm_proto_ops = &dlm_sctp_ops; 1928 break; 1929 default: 1930 log_print("Invalid protocol identifier %d set", 1931 dlm_config.ci_protocol); 1932 error = -EINVAL; 1933 goto fail_proto_ops; 1934 } 1935 1936 error = dlm_listen_for_all(); 1937 if (error) 1938 goto fail_listen; 1939 1940 return 0; 1941 1942 fail_listen: 1943 dlm_proto_ops = NULL; 1944 fail_proto_ops: 1945 work_stop(); 1946 fail: 1947 return error; 1948 } 1949 1950 void dlm_lowcomms_init(void) 1951 { 1952 int i; 1953 1954 for (i = 0; i < CONN_HASH_SIZE; i++) 1955 
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}
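
/*
 * Illustrative sketch (an assumption about typical call order, not code
 * that runs here): roughly how the rest of DLM drives this layer using
 * the functions exported above.
 *
 *	dlm_lowcomms_init();			set up connection hash and listen work
 *	dlm_lowcomms_start();			workqueues plus listening socket
 *	dlm_lowcomms_addr(nodeid, &addr);	register each cluster node address
 *	dlm_lowcomms_connect_node(nodeid);	kick off an outgoing connection
 *	...
 *	dlm_lowcomms_shutdown();		shut down all node connections
 *	dlm_lowcomms_stop();			tear down the workqueues
 *	dlm_lowcomms_exit();			free the remaining connection structs
 *
 * The exact ordering is decided by the configuration and midcomms code,
 * so treat the sequence above as a sketch rather than a contract.
 */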