1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP addresses or 25 * whatever else it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 42 * 43 */ 44 45 #include <asm/ioctls.h> 46 #include <net/sock.h> 47 #include <net/tcp.h> 48 #include <linux/pagemap.h> 49 #include <linux/file.h> 50 #include <linux/mutex.h> 51 #include <linux/sctp.h> 52 #include <linux/slab.h> 53 #include <net/sctp/sctp.h> 54 #include <net/ipv6.h> 55 56 #include <trace/events/dlm.h> 57 #include <trace/events/sock.h> 58 59 #include "dlm_internal.h" 60 #include "lowcomms.h" 61 #include "midcomms.h" 62 #include "memory.h" 63 #include "config.h" 64 65 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000) 66 #define DLM_MAX_PROCESS_BUFFERS 24 67 #define NEEDED_RMEM (4*1024*1024) 68 69 struct connection { 70 struct socket *sock; /* NULL if not connected */ 71 uint32_t nodeid; /* So we know who we are in the list */ 72 /* this semaphore is used to allow parallel recv/send in read 73 * lock mode. When we release a sock we need to hold the write lock. 74 * 75 * However this locking code is not nice. When we remove the 76 * othercon handling we can look into other mechanisms to synchronize 77 * io handling and call sock_release() at the right time. 
78 */ 79 struct rw_semaphore sock_lock; 80 unsigned long flags; 81 #define CF_APP_LIMITED 0 82 #define CF_RECV_PENDING 1 83 #define CF_SEND_PENDING 2 84 #define CF_RECV_INTR 3 85 #define CF_IO_STOP 4 86 #define CF_IS_OTHERCON 5 87 struct list_head writequeue; /* List of outgoing writequeue_entries */ 88 spinlock_t writequeue_lock; 89 int retries; 90 struct hlist_node list; 91 /* due to connect()/accept() races we can currently end up with a 92 * crossed-over second connection attempt for one node. 93 * 94 * There is a way to avoid the race by introducing a connect 95 * rule, e.g. only the side with our_nodeid > nodeid_to_connect is 96 * allowed to connect. The other side can still connect, but that is 97 * only taken to mean the other side wants a reconnect. 98 * 99 * However, changing to this behaviour would break backwards compatibility. 100 * In a DLM protocol major version upgrade we should remove this! 101 */ 102 struct connection *othercon; 103 struct work_struct rwork; /* receive worker */ 104 struct work_struct swork; /* send worker */ 105 wait_queue_head_t shutdown_wait; 106 unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE]; 107 int rx_leftover; 108 int mark; 109 int addr_count; 110 int curr_addr_index; 111 struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT]; 112 spinlock_t addrs_lock; 113 struct rcu_head rcu; 114 }; 115 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 116 117 struct listen_connection { 118 struct socket *sock; 119 struct work_struct rwork; 120 }; 121 122 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end) 123 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset) 124 125 /* An entry waiting to be sent */ 126 struct writequeue_entry { 127 struct list_head list; 128 struct page *page; 129 int offset; 130 int len; 131 int end; 132 int users; 133 bool dirty; 134 struct connection *con; 135 struct list_head msgs; 136 struct kref ref; 137 }; 138 139 struct dlm_msg { 140 struct writequeue_entry *entry; 141 struct dlm_msg *orig_msg; 142 bool retransmit; 143 void *ppc; 144 int len; 145 int idx; /* new()/commit() idx exchange */ 146 147 struct list_head list; 148 struct kref ref; 149 }; 150 151 struct processqueue_entry { 152 unsigned char *buf; 153 int nodeid; 154 int buflen; 155 156 struct list_head list; 157 }; 158 159 struct dlm_proto_ops { 160 bool try_new_addr; 161 const char *name; 162 int proto; 163 164 int (*connect)(struct connection *con, struct socket *sock, 165 struct sockaddr *addr, int addr_len); 166 void (*sockopts)(struct socket *sock); 167 int (*bind)(struct socket *sock); 168 int (*listen_validate)(void); 169 void (*listen_sockopts)(struct socket *sock); 170 int (*listen_bind)(struct socket *sock); 171 }; 172 173 static struct listen_sock_callbacks { 174 void (*sk_error_report)(struct sock *); 175 void (*sk_data_ready)(struct sock *); 176 void (*sk_state_change)(struct sock *); 177 void (*sk_write_space)(struct sock *); 178 } listen_sock; 179 180 static struct listen_connection listen_con; 181 static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT]; 182 static int dlm_local_count; 183 184 /* Work queues */ 185 static struct workqueue_struct *io_workqueue; 186 static struct workqueue_struct *process_workqueue; 187 188 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 189 static DEFINE_SPINLOCK(connections_lock); 190 DEFINE_STATIC_SRCU(connections_srcu); 191 192 static const struct dlm_proto_ops *dlm_proto_ops; 193 194 #define DLM_IO_SUCCESS 0 195 #define DLM_IO_END 1 196 #define DLM_IO_EOF 2 197 #define DLM_IO_RESCHED 3 198 #define 
DLM_IO_FLUSH 4 199 200 static void process_recv_sockets(struct work_struct *work); 201 static void process_send_sockets(struct work_struct *work); 202 static void process_dlm_messages(struct work_struct *work); 203 204 static DECLARE_WORK(process_work, process_dlm_messages); 205 static DEFINE_SPINLOCK(processqueue_lock); 206 static bool process_dlm_messages_pending; 207 static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq); 208 static atomic_t processqueue_count; 209 static LIST_HEAD(processqueue); 210 211 bool dlm_lowcomms_is_running(void) 212 { 213 return !!listen_con.sock; 214 } 215 216 static void lowcomms_queue_swork(struct connection *con) 217 { 218 assert_spin_locked(&con->writequeue_lock); 219 220 if (!test_bit(CF_IO_STOP, &con->flags) && 221 !test_bit(CF_APP_LIMITED, &con->flags) && 222 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) 223 queue_work(io_workqueue, &con->swork); 224 } 225 226 static void lowcomms_queue_rwork(struct connection *con) 227 { 228 #ifdef CONFIG_LOCKDEP 229 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); 230 #endif 231 232 if (!test_bit(CF_IO_STOP, &con->flags) && 233 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) 234 queue_work(io_workqueue, &con->rwork); 235 } 236 237 static void writequeue_entry_ctor(void *data) 238 { 239 struct writequeue_entry *entry = data; 240 241 INIT_LIST_HEAD(&entry->msgs); 242 } 243 244 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void) 245 { 246 return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry), 247 0, 0, writequeue_entry_ctor); 248 } 249 250 struct kmem_cache *dlm_lowcomms_msg_cache_create(void) 251 { 252 return KMEM_CACHE(dlm_msg, 0); 253 } 254 255 /* needs to be called with writequeue_lock held */ 256 static struct writequeue_entry *con_next_wq(struct connection *con) 257 { 258 struct writequeue_entry *e; 259 260 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, 261 list); 262 /* if len is zero there is nothing to send; if there are users still filling 263 * buffers we wait until they are done so we can send more. 264 */ 265 if (!e || e->users || e->len == 0) 266 return NULL; 267 268 return e; 269 } 270 271 static struct connection *__find_con(int nodeid, int r) 272 { 273 struct connection *con; 274 275 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { 276 if (con->nodeid == nodeid) 277 return con; 278 } 279 280 return NULL; 281 } 282 283 static void dlm_con_init(struct connection *con, int nodeid) 284 { 285 con->nodeid = nodeid; 286 init_rwsem(&con->sock_lock); 287 INIT_LIST_HEAD(&con->writequeue); 288 spin_lock_init(&con->writequeue_lock); 289 INIT_WORK(&con->swork, process_send_sockets); 290 INIT_WORK(&con->rwork, process_recv_sockets); 291 spin_lock_init(&con->addrs_lock); 292 init_waitqueue_head(&con->shutdown_wait); 293 } 294 295 /* 296 * If 'alloc' is zero then we don't attempt to create a new 297 * connection structure for this node. 298 */ 299 static struct connection *nodeid2con(int nodeid, gfp_t alloc) 300 { 301 struct connection *con, *tmp; 302 int r; 303 304 r = nodeid_hash(nodeid); 305 con = __find_con(nodeid, r); 306 if (con || !alloc) 307 return con; 308 309 con = kzalloc(sizeof(*con), alloc); 310 if (!con) 311 return NULL; 312 313 dlm_con_init(con, nodeid); 314 315 spin_lock(&connections_lock); 316 /* Because multiple workqueues/threads call this function it can 317 * race on multiple CPUs. Instead of locking the hot path __find_con() 318 * we just check again, in the rare case of recently added nodes, 319 * under protection of connections_lock. 
If this is the case we 320 * abort our connection creation and return the existing connection. 321 */ 322 tmp = __find_con(nodeid, r); 323 if (tmp) { 324 spin_unlock(&connections_lock); 325 kfree(con); 326 return tmp; 327 } 328 329 hlist_add_head_rcu(&con->list, &connection_hash[r]); 330 spin_unlock(&connections_lock); 331 332 return con; 333 } 334 335 static int addr_compare(const struct sockaddr_storage *x, 336 const struct sockaddr_storage *y) 337 { 338 switch (x->ss_family) { 339 case AF_INET: { 340 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 341 struct sockaddr_in *siny = (struct sockaddr_in *)y; 342 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 343 return 0; 344 if (sinx->sin_port != siny->sin_port) 345 return 0; 346 break; 347 } 348 case AF_INET6: { 349 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 350 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 351 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 352 return 0; 353 if (sinx->sin6_port != siny->sin6_port) 354 return 0; 355 break; 356 } 357 default: 358 return 0; 359 } 360 return 1; 361 } 362 363 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 364 struct sockaddr *sa_out, bool try_new_addr, 365 unsigned int *mark) 366 { 367 struct sockaddr_storage sas; 368 struct connection *con; 369 int idx; 370 371 if (!dlm_local_count) 372 return -1; 373 374 idx = srcu_read_lock(&connections_srcu); 375 con = nodeid2con(nodeid, 0); 376 if (!con) { 377 srcu_read_unlock(&connections_srcu, idx); 378 return -ENOENT; 379 } 380 381 spin_lock(&con->addrs_lock); 382 if (!con->addr_count) { 383 spin_unlock(&con->addrs_lock); 384 srcu_read_unlock(&connections_srcu, idx); 385 return -ENOENT; 386 } 387 388 memcpy(&sas, &con->addr[con->curr_addr_index], 389 sizeof(struct sockaddr_storage)); 390 391 if (try_new_addr) { 392 con->curr_addr_index++; 393 if (con->curr_addr_index == con->addr_count) 394 con->curr_addr_index = 0; 395 } 396 397 *mark = con->mark; 398 spin_unlock(&con->addrs_lock); 399 400 if (sas_out) 401 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 402 403 if (!sa_out) { 404 srcu_read_unlock(&connections_srcu, idx); 405 return 0; 406 } 407 408 if (dlm_local_addr[0].ss_family == AF_INET) { 409 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 410 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 411 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 412 } else { 413 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 414 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 415 ret6->sin6_addr = in6->sin6_addr; 416 } 417 418 srcu_read_unlock(&connections_srcu, idx); 419 return 0; 420 } 421 422 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, 423 unsigned int *mark) 424 { 425 struct connection *con; 426 int i, idx, addr_i; 427 428 idx = srcu_read_lock(&connections_srcu); 429 for (i = 0; i < CONN_HASH_SIZE; i++) { 430 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 431 WARN_ON_ONCE(!con->addr_count); 432 433 spin_lock(&con->addrs_lock); 434 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { 435 if (addr_compare(&con->addr[addr_i], addr)) { 436 *nodeid = con->nodeid; 437 *mark = con->mark; 438 spin_unlock(&con->addrs_lock); 439 srcu_read_unlock(&connections_srcu, idx); 440 return 0; 441 } 442 } 443 spin_unlock(&con->addrs_lock); 444 } 445 } 446 srcu_read_unlock(&connections_srcu, idx); 447 448 return -ENOENT; 449 } 450 451 static bool dlm_lowcomms_con_has_addr(const struct connection *con, 452 const struct sockaddr_storage 
*addr) 453 { 454 int i; 455 456 for (i = 0; i < con->addr_count; i++) { 457 if (addr_compare(&con->addr[i], addr)) 458 return true; 459 } 460 461 return false; 462 } 463 464 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 465 { 466 struct connection *con; 467 bool ret, idx; 468 469 idx = srcu_read_lock(&connections_srcu); 470 con = nodeid2con(nodeid, GFP_NOFS); 471 if (!con) { 472 srcu_read_unlock(&connections_srcu, idx); 473 return -ENOMEM; 474 } 475 476 spin_lock(&con->addrs_lock); 477 if (!con->addr_count) { 478 memcpy(&con->addr[0], addr, sizeof(*addr)); 479 con->addr_count = 1; 480 con->mark = dlm_config.ci_mark; 481 spin_unlock(&con->addrs_lock); 482 srcu_read_unlock(&connections_srcu, idx); 483 return 0; 484 } 485 486 ret = dlm_lowcomms_con_has_addr(con, addr); 487 if (ret) { 488 spin_unlock(&con->addrs_lock); 489 srcu_read_unlock(&connections_srcu, idx); 490 return -EEXIST; 491 } 492 493 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { 494 spin_unlock(&con->addrs_lock); 495 srcu_read_unlock(&connections_srcu, idx); 496 return -ENOSPC; 497 } 498 499 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); 500 srcu_read_unlock(&connections_srcu, idx); 501 spin_unlock(&con->addrs_lock); 502 return 0; 503 } 504 505 /* Data available on socket or listen socket received a connect */ 506 static void lowcomms_data_ready(struct sock *sk) 507 { 508 struct connection *con = sock2con(sk); 509 510 trace_sk_data_ready(sk); 511 512 set_bit(CF_RECV_INTR, &con->flags); 513 lowcomms_queue_rwork(con); 514 } 515 516 static void lowcomms_write_space(struct sock *sk) 517 { 518 struct connection *con = sock2con(sk); 519 520 clear_bit(SOCK_NOSPACE, &con->sock->flags); 521 522 spin_lock_bh(&con->writequeue_lock); 523 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 524 con->sock->sk->sk_write_pending--; 525 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 526 } 527 528 lowcomms_queue_swork(con); 529 spin_unlock_bh(&con->writequeue_lock); 530 } 531 532 static void lowcomms_state_change(struct sock *sk) 533 { 534 /* SCTP layer is not calling sk_data_ready when the connection 535 * is done, so we catch the signal through here. 
536 */ 537 if (sk->sk_shutdown == RCV_SHUTDOWN) 538 lowcomms_data_ready(sk); 539 } 540 541 static void lowcomms_listen_data_ready(struct sock *sk) 542 { 543 trace_sk_data_ready(sk); 544 545 queue_work(io_workqueue, &listen_con.rwork); 546 } 547 548 int dlm_lowcomms_connect_node(int nodeid) 549 { 550 struct connection *con; 551 int idx; 552 553 idx = srcu_read_lock(&connections_srcu); 554 con = nodeid2con(nodeid, 0); 555 if (WARN_ON_ONCE(!con)) { 556 srcu_read_unlock(&connections_srcu, idx); 557 return -ENOENT; 558 } 559 560 down_read(&con->sock_lock); 561 if (!con->sock) { 562 spin_lock_bh(&con->writequeue_lock); 563 lowcomms_queue_swork(con); 564 spin_unlock_bh(&con->writequeue_lock); 565 } 566 up_read(&con->sock_lock); 567 srcu_read_unlock(&connections_srcu, idx); 568 569 cond_resched(); 570 return 0; 571 } 572 573 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) 574 { 575 struct connection *con; 576 int idx; 577 578 idx = srcu_read_lock(&connections_srcu); 579 con = nodeid2con(nodeid, 0); 580 if (!con) { 581 srcu_read_unlock(&connections_srcu, idx); 582 return -ENOENT; 583 } 584 585 spin_lock(&con->addrs_lock); 586 con->mark = mark; 587 spin_unlock(&con->addrs_lock); 588 srcu_read_unlock(&connections_srcu, idx); 589 return 0; 590 } 591 592 static void lowcomms_error_report(struct sock *sk) 593 { 594 struct connection *con = sock2con(sk); 595 struct inet_sock *inet; 596 597 inet = inet_sk(sk); 598 switch (sk->sk_family) { 599 case AF_INET: 600 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 601 "sending to node %d at %pI4, dport %d, " 602 "sk_err=%d/%d\n", dlm_our_nodeid(), 603 con->nodeid, &inet->inet_daddr, 604 ntohs(inet->inet_dport), sk->sk_err, 605 READ_ONCE(sk->sk_err_soft)); 606 break; 607 #if IS_ENABLED(CONFIG_IPV6) 608 case AF_INET6: 609 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 610 "sending to node %d at %pI6c, " 611 "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(), 612 con->nodeid, &sk->sk_v6_daddr, 613 ntohs(inet->inet_dport), sk->sk_err, 614 READ_ONCE(sk->sk_err_soft)); 615 break; 616 #endif 617 default: 618 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 619 "invalid socket family %d set, " 620 "sk_err=%d/%d\n", dlm_our_nodeid(), 621 sk->sk_family, sk->sk_err, 622 READ_ONCE(sk->sk_err_soft)); 623 break; 624 } 625 626 dlm_midcomms_unack_msg_resend(con->nodeid); 627 628 listen_sock.sk_error_report(sk); 629 } 630 631 static void restore_callbacks(struct sock *sk) 632 { 633 #ifdef CONFIG_LOCKDEP 634 WARN_ON_ONCE(!lockdep_sock_is_held(sk)); 635 #endif 636 637 sk->sk_user_data = NULL; 638 sk->sk_data_ready = listen_sock.sk_data_ready; 639 sk->sk_state_change = listen_sock.sk_state_change; 640 sk->sk_write_space = listen_sock.sk_write_space; 641 sk->sk_error_report = listen_sock.sk_error_report; 642 } 643 644 /* Make a socket active */ 645 static void add_sock(struct socket *sock, struct connection *con) 646 { 647 struct sock *sk = sock->sk; 648 649 lock_sock(sk); 650 con->sock = sock; 651 652 sk->sk_user_data = con; 653 sk->sk_data_ready = lowcomms_data_ready; 654 sk->sk_write_space = lowcomms_write_space; 655 if (dlm_config.ci_protocol == DLM_PROTO_SCTP) 656 sk->sk_state_change = lowcomms_state_change; 657 sk->sk_allocation = GFP_NOFS; 658 sk->sk_use_task_frag = false; 659 sk->sk_error_report = lowcomms_error_report; 660 release_sock(sk); 661 } 662 663 /* Add the port number to an IPv6 or 4 sockaddr and return the address 664 length */ 665 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 666 int *addr_len) 667 
{ 668 saddr->ss_family = dlm_local_addr[0].ss_family; 669 if (saddr->ss_family == AF_INET) { 670 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 671 in4_addr->sin_port = cpu_to_be16(port); 672 *addr_len = sizeof(struct sockaddr_in); 673 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 674 } else { 675 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 676 in6_addr->sin6_port = cpu_to_be16(port); 677 *addr_len = sizeof(struct sockaddr_in6); 678 } 679 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 680 } 681 682 static void dlm_page_release(struct kref *kref) 683 { 684 struct writequeue_entry *e = container_of(kref, struct writequeue_entry, 685 ref); 686 687 __free_page(e->page); 688 dlm_free_writequeue(e); 689 } 690 691 static void dlm_msg_release(struct kref *kref) 692 { 693 struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref); 694 695 kref_put(&msg->entry->ref, dlm_page_release); 696 dlm_free_msg(msg); 697 } 698 699 static void free_entry(struct writequeue_entry *e) 700 { 701 struct dlm_msg *msg, *tmp; 702 703 list_for_each_entry_safe(msg, tmp, &e->msgs, list) { 704 if (msg->orig_msg) { 705 msg->orig_msg->retransmit = false; 706 kref_put(&msg->orig_msg->ref, dlm_msg_release); 707 } 708 709 list_del(&msg->list); 710 kref_put(&msg->ref, dlm_msg_release); 711 } 712 713 list_del(&e->list); 714 kref_put(&e->ref, dlm_page_release); 715 } 716 717 static void dlm_close_sock(struct socket **sock) 718 { 719 lock_sock((*sock)->sk); 720 restore_callbacks((*sock)->sk); 721 release_sock((*sock)->sk); 722 723 sock_release(*sock); 724 *sock = NULL; 725 } 726 727 static void allow_connection_io(struct connection *con) 728 { 729 if (con->othercon) 730 clear_bit(CF_IO_STOP, &con->othercon->flags); 731 clear_bit(CF_IO_STOP, &con->flags); 732 } 733 734 static void stop_connection_io(struct connection *con) 735 { 736 if (con->othercon) 737 stop_connection_io(con->othercon); 738 739 spin_lock_bh(&con->writequeue_lock); 740 set_bit(CF_IO_STOP, &con->flags); 741 spin_unlock_bh(&con->writequeue_lock); 742 743 down_write(&con->sock_lock); 744 if (con->sock) { 745 lock_sock(con->sock->sk); 746 restore_callbacks(con->sock->sk); 747 release_sock(con->sock->sk); 748 } 749 up_write(&con->sock_lock); 750 751 cancel_work_sync(&con->swork); 752 cancel_work_sync(&con->rwork); 753 } 754 755 /* Close a remote connection and tidy up */ 756 static void close_connection(struct connection *con, bool and_other) 757 { 758 struct writequeue_entry *e; 759 760 if (con->othercon && and_other) 761 close_connection(con->othercon, false); 762 763 down_write(&con->sock_lock); 764 if (!con->sock) { 765 up_write(&con->sock_lock); 766 return; 767 } 768 769 dlm_close_sock(&con->sock); 770 771 /* if we sent a writequeue entry only half way, we drop the 772 * whole entry: after a reconnection we must not resume in the 773 * middle of a msg, which would confuse the other end. 774 * 775 * we can always drop messages because they will be retransmitted, but what we 776 * cannot allow is to transmit half messages which may be processed 777 * at the other side. 778 * 779 * our policy is to start from a clean state after a disconnect; we don't 780 * know what has been sent/received on the transport layer in this case. 
781 */ 782 spin_lock_bh(&con->writequeue_lock); 783 if (!list_empty(&con->writequeue)) { 784 e = list_first_entry(&con->writequeue, struct writequeue_entry, 785 list); 786 if (e->dirty) 787 free_entry(e); 788 } 789 spin_unlock_bh(&con->writequeue_lock); 790 791 con->rx_leftover = 0; 792 con->retries = 0; 793 clear_bit(CF_APP_LIMITED, &con->flags); 794 clear_bit(CF_RECV_PENDING, &con->flags); 795 clear_bit(CF_SEND_PENDING, &con->flags); 796 up_write(&con->sock_lock); 797 } 798 799 static void shutdown_connection(struct connection *con, bool and_other) 800 { 801 int ret; 802 803 if (con->othercon && and_other) 804 shutdown_connection(con->othercon, false); 805 806 flush_workqueue(io_workqueue); 807 down_read(&con->sock_lock); 808 /* nothing to shutdown */ 809 if (!con->sock) { 810 up_read(&con->sock_lock); 811 return; 812 } 813 814 ret = kernel_sock_shutdown(con->sock, SHUT_WR); 815 up_read(&con->sock_lock); 816 if (ret) { 817 log_print("Connection %p failed to shutdown: %d will force close", 818 con, ret); 819 goto force_close; 820 } else { 821 ret = wait_event_timeout(con->shutdown_wait, !con->sock, 822 DLM_SHUTDOWN_WAIT_TIMEOUT); 823 if (ret == 0) { 824 log_print("Connection %p shutdown timed out, will force close", 825 con); 826 goto force_close; 827 } 828 } 829 830 return; 831 832 force_close: 833 close_connection(con, false); 834 } 835 836 static struct processqueue_entry *new_processqueue_entry(int nodeid, 837 int buflen) 838 { 839 struct processqueue_entry *pentry; 840 841 pentry = kmalloc(sizeof(*pentry), GFP_NOFS); 842 if (!pentry) 843 return NULL; 844 845 pentry->buf = kmalloc(buflen, GFP_NOFS); 846 if (!pentry->buf) { 847 kfree(pentry); 848 return NULL; 849 } 850 851 pentry->nodeid = nodeid; 852 return pentry; 853 } 854 855 static void free_processqueue_entry(struct processqueue_entry *pentry) 856 { 857 kfree(pentry->buf); 858 kfree(pentry); 859 } 860 861 struct dlm_processed_nodes { 862 int nodeid; 863 864 struct list_head list; 865 }; 866 867 static void process_dlm_messages(struct work_struct *work) 868 { 869 struct processqueue_entry *pentry; 870 871 spin_lock_bh(&processqueue_lock); 872 pentry = list_first_entry_or_null(&processqueue, 873 struct processqueue_entry, list); 874 if (WARN_ON_ONCE(!pentry)) { 875 process_dlm_messages_pending = false; 876 spin_unlock_bh(&processqueue_lock); 877 return; 878 } 879 880 list_del(&pentry->list); 881 if (atomic_dec_and_test(&processqueue_count)) 882 wake_up(&processqueue_wq); 883 spin_unlock_bh(&processqueue_lock); 884 885 for (;;) { 886 dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, 887 pentry->buflen); 888 free_processqueue_entry(pentry); 889 890 spin_lock_bh(&processqueue_lock); 891 pentry = list_first_entry_or_null(&processqueue, 892 struct processqueue_entry, list); 893 if (!pentry) { 894 process_dlm_messages_pending = false; 895 spin_unlock_bh(&processqueue_lock); 896 break; 897 } 898 899 list_del(&pentry->list); 900 if (atomic_dec_and_test(&processqueue_count)) 901 wake_up(&processqueue_wq); 902 spin_unlock_bh(&processqueue_lock); 903 } 904 } 905 906 /* Data received from remote end */ 907 static int receive_from_sock(struct connection *con, int buflen) 908 { 909 struct processqueue_entry *pentry; 910 int ret, buflen_real; 911 struct msghdr msg; 912 struct kvec iov; 913 914 pentry = new_processqueue_entry(con->nodeid, buflen); 915 if (!pentry) 916 return DLM_IO_RESCHED; 917 918 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); 919 920 /* calculate new buffer parameter regarding last receive and 921 * possible 
leftover bytes 922 */ 923 iov.iov_base = pentry->buf + con->rx_leftover; 924 iov.iov_len = buflen - con->rx_leftover; 925 926 memset(&msg, 0, sizeof(msg)); 927 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 928 clear_bit(CF_RECV_INTR, &con->flags); 929 again: 930 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 931 msg.msg_flags); 932 trace_dlm_recv(con->nodeid, ret); 933 if (ret == -EAGAIN) { 934 lock_sock(con->sock->sk); 935 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { 936 release_sock(con->sock->sk); 937 goto again; 938 } 939 940 clear_bit(CF_RECV_PENDING, &con->flags); 941 release_sock(con->sock->sk); 942 free_processqueue_entry(pentry); 943 return DLM_IO_END; 944 } else if (ret == 0) { 945 /* close will clear CF_RECV_PENDING */ 946 free_processqueue_entry(pentry); 947 return DLM_IO_EOF; 948 } else if (ret < 0) { 949 free_processqueue_entry(pentry); 950 return ret; 951 } 952 953 /* new buflen according to the bytes read and the leftover from the last receive */ 954 buflen_real = ret + con->rx_leftover; 955 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, 956 buflen_real); 957 if (ret < 0) { 958 free_processqueue_entry(pentry); 959 return ret; 960 } 961 962 pentry->buflen = ret; 963 964 /* calculate the leftover bytes after processing and move them to the beginning of 965 * the receive buffer, so on the next receive we have the full message 966 * at the start address of the receive buffer. 967 */ 968 con->rx_leftover = buflen_real - ret; 969 memmove(con->rx_leftover_buf, pentry->buf + ret, 970 con->rx_leftover); 971 972 spin_lock_bh(&processqueue_lock); 973 ret = atomic_inc_return(&processqueue_count); 974 list_add_tail(&pentry->list, &processqueue); 975 if (!process_dlm_messages_pending) { 976 process_dlm_messages_pending = true; 977 queue_work(process_workqueue, &process_work); 978 } 979 spin_unlock_bh(&processqueue_lock); 980 981 if (ret > DLM_MAX_PROCESS_BUFFERS) 982 return DLM_IO_FLUSH; 983 984 return DLM_IO_SUCCESS; 985 } 986 987 /* Listening socket is busy, accept a connection */ 988 static int accept_from_sock(void) 989 { 990 struct sockaddr_storage peeraddr; 991 int len, idx, result, nodeid; 992 struct connection *newcon; 993 struct socket *newsock; 994 unsigned int mark; 995 996 result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK); 997 if (result == -EAGAIN) 998 return DLM_IO_END; 999 else if (result < 0) 1000 goto accept_err; 1001 1002 /* Get the connected socket's peer */ 1003 memset(&peeraddr, 0, sizeof(peeraddr)); 1004 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 1005 if (len < 0) { 1006 result = -ECONNABORTED; 1007 goto accept_err; 1008 } 1009 1010 /* Get the new node's NODEID */ 1011 make_sockaddr(&peeraddr, 0, &len); 1012 if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) { 1013 switch (peeraddr.ss_family) { 1014 case AF_INET: { 1015 struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr; 1016 1017 log_print("connect from non cluster IPv4 node %pI4", 1018 &sin->sin_addr); 1019 break; 1020 } 1021 #if IS_ENABLED(CONFIG_IPV6) 1022 case AF_INET6: { 1023 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr; 1024 1025 log_print("connect from non cluster IPv6 node %pI6c", 1026 &sin6->sin6_addr); 1027 break; 1028 } 1029 #endif 1030 default: 1031 log_print("invalid family from non cluster node"); 1032 break; 1033 } 1034 1035 sock_release(newsock); 1036 return -1; 1037 } 1038 1039 log_print("got connection from %d", nodeid); 1040 1041 /* Check to see if we already have a connection to this node. 
This 1042 * could happen if the two nodes initiate a connection at roughly 1043 * the same time and the connections cross on the wire. 1044 * In this case we store the incoming one in "othercon" 1045 */ 1046 idx = srcu_read_lock(&connections_srcu); 1047 newcon = nodeid2con(nodeid, 0); 1048 if (WARN_ON_ONCE(!newcon)) { 1049 srcu_read_unlock(&connections_srcu, idx); 1050 result = -ENOENT; 1051 goto accept_err; 1052 } 1053 1054 sock_set_mark(newsock->sk, mark); 1055 1056 down_write(&newcon->sock_lock); 1057 if (newcon->sock) { 1058 struct connection *othercon = newcon->othercon; 1059 1060 if (!othercon) { 1061 othercon = kzalloc(sizeof(*othercon), GFP_NOFS); 1062 if (!othercon) { 1063 log_print("failed to allocate incoming socket"); 1064 up_write(&newcon->sock_lock); 1065 srcu_read_unlock(&connections_srcu, idx); 1066 result = -ENOMEM; 1067 goto accept_err; 1068 } 1069 1070 dlm_con_init(othercon, nodeid); 1071 lockdep_set_subclass(&othercon->sock_lock, 1); 1072 newcon->othercon = othercon; 1073 set_bit(CF_IS_OTHERCON, &othercon->flags); 1074 } else { 1075 /* close other sock con if we have something new */ 1076 close_connection(othercon, false); 1077 } 1078 1079 down_write(&othercon->sock_lock); 1080 add_sock(newsock, othercon); 1081 1082 /* check if we received something while adding */ 1083 lock_sock(othercon->sock->sk); 1084 lowcomms_queue_rwork(othercon); 1085 release_sock(othercon->sock->sk); 1086 up_write(&othercon->sock_lock); 1087 } 1088 else { 1089 /* accept copies the sk after we've saved the callbacks, so we 1090 don't want to save them a second time or comm errors will 1091 result in calling sk_error_report recursively. */ 1092 add_sock(newsock, newcon); 1093 1094 /* check if we received something while adding */ 1095 lock_sock(newcon->sock->sk); 1096 lowcomms_queue_rwork(newcon); 1097 release_sock(newcon->sock->sk); 1098 } 1099 up_write(&newcon->sock_lock); 1100 srcu_read_unlock(&connections_srcu, idx); 1101 1102 return DLM_IO_SUCCESS; 1103 1104 accept_err: 1105 if (newsock) 1106 sock_release(newsock); 1107 1108 return result; 1109 } 1110 1111 /* 1112 * writequeue_entry_complete - try to delete and free write queue entry 1113 * @e: write queue entry to try to delete 1114 * @completed: bytes completed 1115 * 1116 * writequeue_lock must be held. 
1117 */ 1118 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 1119 { 1120 e->offset += completed; 1121 e->len -= completed; 1122 /* signal that page was half way transmitted */ 1123 e->dirty = true; 1124 1125 if (e->len == 0 && e->users == 0) 1126 free_entry(e); 1127 } 1128 1129 /* 1130 * sctp_bind_addrs - bind a SCTP socket to all our addresses 1131 */ 1132 static int sctp_bind_addrs(struct socket *sock, uint16_t port) 1133 { 1134 struct sockaddr_storage localaddr; 1135 struct sockaddr *addr = (struct sockaddr *)&localaddr; 1136 int i, addr_len, result = 0; 1137 1138 for (i = 0; i < dlm_local_count; i++) { 1139 memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr)); 1140 make_sockaddr(&localaddr, port, &addr_len); 1141 1142 if (!i) 1143 result = kernel_bind(sock, addr, addr_len); 1144 else 1145 result = sock_bind_add(sock->sk, addr, addr_len); 1146 1147 if (result < 0) { 1148 log_print("Can't bind to %d addr number %d, %d.\n", 1149 port, i + 1, result); 1150 break; 1151 } 1152 } 1153 return result; 1154 } 1155 1156 /* Get local addresses */ 1157 static void init_local(void) 1158 { 1159 struct sockaddr_storage sas; 1160 int i; 1161 1162 dlm_local_count = 0; 1163 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1164 if (dlm_our_addr(&sas, i)) 1165 break; 1166 1167 memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas)); 1168 } 1169 } 1170 1171 static struct writequeue_entry *new_writequeue_entry(struct connection *con) 1172 { 1173 struct writequeue_entry *entry; 1174 1175 entry = dlm_allocate_writequeue(); 1176 if (!entry) 1177 return NULL; 1178 1179 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); 1180 if (!entry->page) { 1181 dlm_free_writequeue(entry); 1182 return NULL; 1183 } 1184 1185 entry->offset = 0; 1186 entry->len = 0; 1187 entry->end = 0; 1188 entry->dirty = false; 1189 entry->con = con; 1190 entry->users = 1; 1191 kref_init(&entry->ref); 1192 return entry; 1193 } 1194 1195 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, 1196 char **ppc, void (*cb)(void *data), 1197 void *data) 1198 { 1199 struct writequeue_entry *e; 1200 1201 spin_lock_bh(&con->writequeue_lock); 1202 if (!list_empty(&con->writequeue)) { 1203 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); 1204 if (DLM_WQ_REMAIN_BYTES(e) >= len) { 1205 kref_get(&e->ref); 1206 1207 *ppc = page_address(e->page) + e->end; 1208 if (cb) 1209 cb(data); 1210 1211 e->end += len; 1212 e->users++; 1213 goto out; 1214 } 1215 } 1216 1217 e = new_writequeue_entry(con); 1218 if (!e) 1219 goto out; 1220 1221 kref_get(&e->ref); 1222 *ppc = page_address(e->page); 1223 e->end += len; 1224 if (cb) 1225 cb(data); 1226 1227 list_add_tail(&e->list, &con->writequeue); 1228 1229 out: 1230 spin_unlock_bh(&con->writequeue_lock); 1231 return e; 1232 }; 1233 1234 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, 1235 char **ppc, void (*cb)(void *data), 1236 void *data) 1237 { 1238 struct writequeue_entry *e; 1239 struct dlm_msg *msg; 1240 1241 msg = dlm_allocate_msg(); 1242 if (!msg) 1243 return NULL; 1244 1245 kref_init(&msg->ref); 1246 1247 e = new_wq_entry(con, len, ppc, cb, data); 1248 if (!e) { 1249 dlm_free_msg(msg); 1250 return NULL; 1251 } 1252 1253 msg->retransmit = false; 1254 msg->orig_msg = NULL; 1255 msg->ppc = *ppc; 1256 msg->len = len; 1257 msg->entry = e; 1258 1259 return msg; 1260 } 1261 1262 /* avoid false positive for nodes_srcu, unlock happens in 1263 * dlm_lowcomms_commit_msg which is a must call if success 1264 */ 
1265 #ifndef __CHECKER__ 1266 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, char **ppc, 1267 void (*cb)(void *data), void *data) 1268 { 1269 struct connection *con; 1270 struct dlm_msg *msg; 1271 int idx; 1272 1273 if (len > DLM_MAX_SOCKET_BUFSIZE || 1274 len < sizeof(struct dlm_header)) { 1275 BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE); 1276 log_print("failed to allocate a buffer of size %d", len); 1277 WARN_ON_ONCE(1); 1278 return NULL; 1279 } 1280 1281 idx = srcu_read_lock(&connections_srcu); 1282 con = nodeid2con(nodeid, 0); 1283 if (WARN_ON_ONCE(!con)) { 1284 srcu_read_unlock(&connections_srcu, idx); 1285 return NULL; 1286 } 1287 1288 msg = dlm_lowcomms_new_msg_con(con, len, ppc, cb, data); 1289 if (!msg) { 1290 srcu_read_unlock(&connections_srcu, idx); 1291 return NULL; 1292 } 1293 1294 /* for dlm_lowcomms_commit_msg() */ 1295 kref_get(&msg->ref); 1296 /* we assume that on success, commit must be called */ 1297 msg->idx = idx; 1298 return msg; 1299 } 1300 #endif 1301 1302 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1303 { 1304 struct writequeue_entry *e = msg->entry; 1305 struct connection *con = e->con; 1306 int users; 1307 1308 spin_lock_bh(&con->writequeue_lock); 1309 kref_get(&msg->ref); 1310 list_add(&msg->list, &e->msgs); 1311 1312 users = --e->users; 1313 if (users) 1314 goto out; 1315 1316 e->len = DLM_WQ_LENGTH_BYTES(e); 1317 1318 lowcomms_queue_swork(con); 1319 1320 out: 1321 spin_unlock_bh(&con->writequeue_lock); 1322 return; 1323 } 1324 1325 /* avoid false positive for nodes_srcu; the lock was taken in 1326 * dlm_lowcomms_new_msg 1327 */ 1328 #ifndef __CHECKER__ 1329 void dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1330 { 1331 _dlm_lowcomms_commit_msg(msg); 1332 srcu_read_unlock(&connections_srcu, msg->idx); 1333 /* because dlm_lowcomms_new_msg() */ 1334 kref_put(&msg->ref, dlm_msg_release); 1335 } 1336 #endif 1337 1338 void dlm_lowcomms_put_msg(struct dlm_msg *msg) 1339 { 1340 kref_put(&msg->ref, dlm_msg_release); 1341 } 1342 1343 /* does not hold connections_srcu; used by lowcomms_error_report only */ 1344 int dlm_lowcomms_resend_msg(struct dlm_msg *msg) 1345 { 1346 struct dlm_msg *msg_resend; 1347 char *ppc; 1348 1349 if (msg->retransmit) 1350 return 1; 1351 1352 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc, 1353 NULL, NULL); 1354 if (!msg_resend) 1355 return -ENOMEM; 1356 1357 msg->retransmit = true; 1358 kref_get(&msg->ref); 1359 msg_resend->orig_msg = msg; 1360 1361 memcpy(ppc, msg->ppc, msg->len); 1362 _dlm_lowcomms_commit_msg(msg_resend); 1363 dlm_lowcomms_put_msg(msg_resend); 1364 1365 return 0; 1366 } 1367 1368 /* Send a message */ 1369 static int send_to_sock(struct connection *con) 1370 { 1371 struct writequeue_entry *e; 1372 struct bio_vec bvec; 1373 struct msghdr msg = { 1374 .msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL, 1375 }; 1376 int len, offset, ret; 1377 1378 spin_lock_bh(&con->writequeue_lock); 1379 e = con_next_wq(con); 1380 if (!e) { 1381 clear_bit(CF_SEND_PENDING, &con->flags); 1382 spin_unlock_bh(&con->writequeue_lock); 1383 return DLM_IO_END; 1384 } 1385 1386 len = e->len; 1387 offset = e->offset; 1388 WARN_ON_ONCE(len == 0 && e->users == 0); 1389 spin_unlock_bh(&con->writequeue_lock); 1390 1391 bvec_set_page(&bvec, e->page, len, offset); 1392 iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len); 1393 ret = sock_sendmsg(con->sock, &msg); 1394 trace_dlm_send(con->nodeid, ret); 1395 if (ret == -EAGAIN || ret == 0) { 1396 lock_sock(con->sock->sk); 1397 
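/* descriptive note on the -EAGAIN/0 path below: with the socket locked, decide under writequeue_lock whether to park the send worker until a write_space() callback arrives (socket buffer full) or to simply reschedule and retry */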
spin_lock_bh(&con->writequeue_lock); 1398 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1399 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1400 /* Notify TCP that we're limited by the 1401 * application window size. 1402 */ 1403 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); 1404 con->sock->sk->sk_write_pending++; 1405 1406 clear_bit(CF_SEND_PENDING, &con->flags); 1407 spin_unlock_bh(&con->writequeue_lock); 1408 release_sock(con->sock->sk); 1409 1410 /* wait for write_space() event */ 1411 return DLM_IO_END; 1412 } 1413 spin_unlock_bh(&con->writequeue_lock); 1414 release_sock(con->sock->sk); 1415 1416 return DLM_IO_RESCHED; 1417 } else if (ret < 0) { 1418 return ret; 1419 } 1420 1421 spin_lock_bh(&con->writequeue_lock); 1422 writequeue_entry_complete(e, ret); 1423 spin_unlock_bh(&con->writequeue_lock); 1424 1425 return DLM_IO_SUCCESS; 1426 } 1427 1428 static void clean_one_writequeue(struct connection *con) 1429 { 1430 struct writequeue_entry *e, *safe; 1431 1432 spin_lock_bh(&con->writequeue_lock); 1433 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1434 free_entry(e); 1435 } 1436 spin_unlock_bh(&con->writequeue_lock); 1437 } 1438 1439 static void connection_release(struct rcu_head *rcu) 1440 { 1441 struct connection *con = container_of(rcu, struct connection, rcu); 1442 1443 WARN_ON_ONCE(!list_empty(&con->writequeue)); 1444 WARN_ON_ONCE(con->sock); 1445 kfree(con); 1446 } 1447 1448 /* Called from recovery when it knows that a node has 1449 left the cluster */ 1450 int dlm_lowcomms_close(int nodeid) 1451 { 1452 struct connection *con; 1453 int idx; 1454 1455 log_print("closing connection to node %d", nodeid); 1456 1457 idx = srcu_read_lock(&connections_srcu); 1458 con = nodeid2con(nodeid, 0); 1459 if (WARN_ON_ONCE(!con)) { 1460 srcu_read_unlock(&connections_srcu, idx); 1461 return -ENOENT; 1462 } 1463 1464 stop_connection_io(con); 1465 log_print("io handling for node: %d stopped", nodeid); 1466 close_connection(con, true); 1467 1468 spin_lock(&connections_lock); 1469 hlist_del_rcu(&con->list); 1470 spin_unlock(&connections_lock); 1471 1472 clean_one_writequeue(con); 1473 call_srcu(&connections_srcu, &con->rcu, connection_release); 1474 if (con->othercon) { 1475 clean_one_writequeue(con->othercon); 1476 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); 1477 } 1478 srcu_read_unlock(&connections_srcu, idx); 1479 1480 /* for debugging we print when we are done to compare with other 1481 * messages in between. 
This function needs to be correctly synchronized 1482 * with io handling 1483 */ 1484 log_print("closing connection to node %d done", nodeid); 1485 1486 return 0; 1487 } 1488 1489 /* Receive worker function */ 1490 static void process_recv_sockets(struct work_struct *work) 1491 { 1492 struct connection *con = container_of(work, struct connection, rwork); 1493 int ret, buflen; 1494 1495 down_read(&con->sock_lock); 1496 if (!con->sock) { 1497 up_read(&con->sock_lock); 1498 return; 1499 } 1500 1501 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1502 do { 1503 ret = receive_from_sock(con, buflen); 1504 } while (ret == DLM_IO_SUCCESS); 1505 up_read(&con->sock_lock); 1506 1507 switch (ret) { 1508 case DLM_IO_END: 1509 /* CF_RECV_PENDING cleared */ 1510 break; 1511 case DLM_IO_EOF: 1512 close_connection(con, false); 1513 wake_up(&con->shutdown_wait); 1514 /* CF_RECV_PENDING cleared */ 1515 break; 1516 case DLM_IO_FLUSH: 1517 /* we can't flush the process_workqueue here, because flushing a 1518 * non WQ_MEM_RECLAIM workqueue such as process_workqueue from a 1519 * WQ_MEM_RECLAIM workqueue can cause a deadlock. Instead 1520 * we use a waitqueue to wait until all messages are 1521 * processed. 1522 * 1523 * This handling is only necessary to back off the sender and 1524 * not queue all messages from the socket layer into the DLM 1525 * processqueue. Once DLM is capable of parsing multiple messages 1526 * on an e.g. per socket basis this handling might be 1527 * removed. Especially in a message burst we are too slow to 1528 * process messages and the queue would fill up memory. 1529 */ 1530 wait_event(processqueue_wq, !atomic_read(&processqueue_count)); 1531 fallthrough; 1532 case DLM_IO_RESCHED: 1533 cond_resched(); 1534 queue_work(io_workqueue, &con->rwork); 1535 /* CF_RECV_PENDING not cleared */ 1536 break; 1537 default: 1538 if (ret < 0) { 1539 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1540 close_connection(con, false); 1541 } else { 1542 spin_lock_bh(&con->writequeue_lock); 1543 lowcomms_queue_swork(con); 1544 spin_unlock_bh(&con->writequeue_lock); 1545 } 1546 1547 /* CF_RECV_PENDING cleared for othercon 1548 * we trigger send queue if not already done 1549 * and process_send_sockets will handle it 1550 */ 1551 break; 1552 } 1553 1554 WARN_ON_ONCE(1); 1555 break; 1556 } 1557 } 1558 1559 static void process_listen_recv_socket(struct work_struct *work) 1560 { 1561 int ret; 1562 1563 if (WARN_ON_ONCE(!listen_con.sock)) 1564 return; 1565 1566 do { 1567 ret = accept_from_sock(); 1568 } while (ret == DLM_IO_SUCCESS); 1569 1570 if (ret < 0) 1571 log_print("critical error accepting connection: %d", ret); 1572 } 1573 1574 static int dlm_connect(struct connection *con) 1575 { 1576 struct sockaddr_storage addr; 1577 int result, addr_len; 1578 struct socket *sock; 1579 unsigned int mark; 1580 1581 memset(&addr, 0, sizeof(addr)); 1582 result = nodeid_to_addr(con->nodeid, &addr, NULL, 1583 dlm_proto_ops->try_new_addr, &mark); 1584 if (result < 0) { 1585 log_print("no address for nodeid %d", con->nodeid); 1586 return result; 1587 } 1588 1589 /* Create a socket to communicate with */ 1590 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1591 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1592 if (result < 0) 1593 return result; 1594 1595 sock_set_mark(sock->sk, mark); 1596 dlm_proto_ops->sockopts(sock); 1597 1598 result = dlm_proto_ops->bind(sock); 1599 if (result < 0) { 1600 sock_release(sock); 1601 return result; 1602 } 1603 1604 add_sock(sock, con); 1605 1606 log_print_ratelimited("connecting to %d", 
con->nodeid); 1607 make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); 1608 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, 1609 addr_len); 1610 switch (result) { 1611 case -EINPROGRESS: 1612 /* not an error */ 1613 fallthrough; 1614 case 0: 1615 break; 1616 default: 1617 if (result < 0) 1618 dlm_close_sock(&con->sock); 1619 1620 break; 1621 } 1622 1623 return result; 1624 } 1625 1626 /* Send worker function */ 1627 static void process_send_sockets(struct work_struct *work) 1628 { 1629 struct connection *con = container_of(work, struct connection, swork); 1630 int ret; 1631 1632 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); 1633 1634 down_read(&con->sock_lock); 1635 if (!con->sock) { 1636 up_read(&con->sock_lock); 1637 down_write(&con->sock_lock); 1638 if (!con->sock) { 1639 ret = dlm_connect(con); 1640 switch (ret) { 1641 case 0: 1642 break; 1643 case -EINPROGRESS: 1644 /* avoid spamming resched on this connection; 1645 * we might switch to a state_change 1646 * event based mechanism once established 1647 */ 1648 msleep(100); 1649 break; 1650 default: 1651 /* CF_SEND_PENDING not cleared */ 1652 up_write(&con->sock_lock); 1653 log_print("connect to node %d try %d error %d", 1654 con->nodeid, con->retries++, ret); 1655 msleep(1000); 1656 /* For now we try forever to reconnect. In the 1657 * future we should send an event to the cluster 1658 * manager to fence itself after a certain number 1659 * of retries. 1660 */ 1661 queue_work(io_workqueue, &con->swork); 1662 return; 1663 } 1664 } 1665 downgrade_write(&con->sock_lock); 1666 } 1667 1668 do { 1669 ret = send_to_sock(con); 1670 } while (ret == DLM_IO_SUCCESS); 1671 up_read(&con->sock_lock); 1672 1673 switch (ret) { 1674 case DLM_IO_END: 1675 /* CF_SEND_PENDING cleared */ 1676 break; 1677 case DLM_IO_RESCHED: 1678 /* CF_SEND_PENDING not cleared */ 1679 cond_resched(); 1680 queue_work(io_workqueue, &con->swork); 1681 break; 1682 default: 1683 if (ret < 0) { 1684 close_connection(con, false); 1685 1686 /* CF_SEND_PENDING cleared */ 1687 spin_lock_bh(&con->writequeue_lock); 1688 lowcomms_queue_swork(con); 1689 spin_unlock_bh(&con->writequeue_lock); 1690 break; 1691 } 1692 1693 WARN_ON_ONCE(1); 1694 break; 1695 } 1696 } 1697 1698 static void work_stop(void) 1699 { 1700 if (io_workqueue) { 1701 destroy_workqueue(io_workqueue); 1702 io_workqueue = NULL; 1703 } 1704 1705 if (process_workqueue) { 1706 destroy_workqueue(process_workqueue); 1707 process_workqueue = NULL; 1708 } 1709 } 1710 1711 static int work_start(void) 1712 { 1713 io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM | 1714 WQ_UNBOUND, 0); 1715 if (!io_workqueue) { 1716 log_print("can't start dlm_io"); 1717 return -ENOMEM; 1718 } 1719 1720 process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0); 1721 if (!process_workqueue) { 1722 log_print("can't start dlm_process"); 1723 destroy_workqueue(io_workqueue); 1724 io_workqueue = NULL; 1725 return -ENOMEM; 1726 } 1727 1728 return 0; 1729 } 1730 1731 void dlm_lowcomms_shutdown(void) 1732 { 1733 struct connection *con; 1734 int i, idx; 1735 1736 /* stop lowcomms_listen_data_ready calls */ 1737 lock_sock(listen_con.sock->sk); 1738 listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; 1739 release_sock(listen_con.sock->sk); 1740 1741 cancel_work_sync(&listen_con.rwork); 1742 dlm_close_sock(&listen_con.sock); 1743 1744 idx = srcu_read_lock(&connections_srcu); 1745 for (i = 0; i < CONN_HASH_SIZE; i++) { 1746 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 1747 
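/* descriptive note on the loop body below: per-connection teardown runs as half-close and wait for the peer, stop the io workers, drain message processing, then close the sockets and free any queued writes */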
shutdown_connection(con, true); 1748 stop_connection_io(con); 1749 flush_workqueue(process_workqueue); 1750 close_connection(con, true); 1751 1752 clean_one_writequeue(con); 1753 if (con->othercon) 1754 clean_one_writequeue(con->othercon); 1755 allow_connection_io(con); 1756 } 1757 } 1758 srcu_read_unlock(&connections_srcu, idx); 1759 } 1760 1761 void dlm_lowcomms_stop(void) 1762 { 1763 work_stop(); 1764 dlm_proto_ops = NULL; 1765 } 1766 1767 static int dlm_listen_for_all(void) 1768 { 1769 struct socket *sock; 1770 int result; 1771 1772 log_print("Using %s for communications", 1773 dlm_proto_ops->name); 1774 1775 result = dlm_proto_ops->listen_validate(); 1776 if (result < 0) 1777 return result; 1778 1779 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1780 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1781 if (result < 0) { 1782 log_print("Can't create comms socket: %d", result); 1783 return result; 1784 } 1785 1786 sock_set_mark(sock->sk, dlm_config.ci_mark); 1787 dlm_proto_ops->listen_sockopts(sock); 1788 1789 result = dlm_proto_ops->listen_bind(sock); 1790 if (result < 0) 1791 goto out; 1792 1793 lock_sock(sock->sk); 1794 listen_sock.sk_data_ready = sock->sk->sk_data_ready; 1795 listen_sock.sk_write_space = sock->sk->sk_write_space; 1796 listen_sock.sk_error_report = sock->sk->sk_error_report; 1797 listen_sock.sk_state_change = sock->sk->sk_state_change; 1798 1799 listen_con.sock = sock; 1800 1801 sock->sk->sk_allocation = GFP_NOFS; 1802 sock->sk->sk_use_task_frag = false; 1803 sock->sk->sk_data_ready = lowcomms_listen_data_ready; 1804 release_sock(sock->sk); 1805 1806 result = sock->ops->listen(sock, 128); 1807 if (result < 0) { 1808 dlm_close_sock(&listen_con.sock); 1809 return result; 1810 } 1811 1812 return 0; 1813 1814 out: 1815 sock_release(sock); 1816 return result; 1817 } 1818 1819 static int dlm_tcp_bind(struct socket *sock) 1820 { 1821 struct sockaddr_storage src_addr; 1822 int result, addr_len; 1823 1824 /* Bind to our cluster-known address connecting to avoid 1825 * routing problems. 
1826 */ 1827 memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr)); 1828 make_sockaddr(&src_addr, 0, &addr_len); 1829 1830 result = kernel_bind(sock, (struct sockaddr *)&src_addr, 1831 addr_len); 1832 if (result < 0) { 1833 /* This *may* not indicate a critical error */ 1834 log_print("could not bind for connect: %d", result); 1835 } 1836 1837 return 0; 1838 } 1839 1840 static int dlm_tcp_connect(struct connection *con, struct socket *sock, 1841 struct sockaddr *addr, int addr_len) 1842 { 1843 return kernel_connect(sock, addr, addr_len, O_NONBLOCK); 1844 } 1845 1846 static int dlm_tcp_listen_validate(void) 1847 { 1848 /* We don't support multi-homed hosts */ 1849 if (dlm_local_count > 1) { 1850 log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); 1851 return -EINVAL; 1852 } 1853 1854 return 0; 1855 } 1856 1857 static void dlm_tcp_sockopts(struct socket *sock) 1858 { 1859 /* Turn off Nagle's algorithm */ 1860 tcp_sock_set_nodelay(sock->sk); 1861 } 1862 1863 static void dlm_tcp_listen_sockopts(struct socket *sock) 1864 { 1865 dlm_tcp_sockopts(sock); 1866 sock_set_reuseaddr(sock->sk); 1867 } 1868 1869 static int dlm_tcp_listen_bind(struct socket *sock) 1870 { 1871 int addr_len; 1872 1873 /* Bind to our port */ 1874 make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len); 1875 return kernel_bind(sock, (struct sockaddr *)&dlm_local_addr[0], 1876 addr_len); 1877 } 1878 1879 static const struct dlm_proto_ops dlm_tcp_ops = { 1880 .name = "TCP", 1881 .proto = IPPROTO_TCP, 1882 .connect = dlm_tcp_connect, 1883 .sockopts = dlm_tcp_sockopts, 1884 .bind = dlm_tcp_bind, 1885 .listen_validate = dlm_tcp_listen_validate, 1886 .listen_sockopts = dlm_tcp_listen_sockopts, 1887 .listen_bind = dlm_tcp_listen_bind, 1888 }; 1889 1890 static int dlm_sctp_bind(struct socket *sock) 1891 { 1892 return sctp_bind_addrs(sock, 0); 1893 } 1894 1895 static int dlm_sctp_connect(struct connection *con, struct socket *sock, 1896 struct sockaddr *addr, int addr_len) 1897 { 1898 int ret; 1899 1900 /* 1901 * Make kernel_connect() function return in specified time, 1902 * since O_NONBLOCK argument in connect() function does not work here, 1903 * then, we should restore the default value of this attribute. 
1904 */ 1905 sock_set_sndtimeo(sock->sk, 5); 1906 ret = kernel_connect(sock, addr, addr_len, 0); 1907 sock_set_sndtimeo(sock->sk, 0); 1908 return ret; 1909 } 1910 1911 static int dlm_sctp_listen_validate(void) 1912 { 1913 if (!IS_ENABLED(CONFIG_IP_SCTP)) { 1914 log_print("SCTP is not enabled by this kernel"); 1915 return -EOPNOTSUPP; 1916 } 1917 1918 request_module("sctp"); 1919 return 0; 1920 } 1921 1922 static int dlm_sctp_bind_listen(struct socket *sock) 1923 { 1924 return sctp_bind_addrs(sock, dlm_config.ci_tcp_port); 1925 } 1926 1927 static void dlm_sctp_sockopts(struct socket *sock) 1928 { 1929 /* Turn off Nagle's algorithm */ 1930 sctp_sock_set_nodelay(sock->sk); 1931 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1932 } 1933 1934 static const struct dlm_proto_ops dlm_sctp_ops = { 1935 .name = "SCTP", 1936 .proto = IPPROTO_SCTP, 1937 .try_new_addr = true, 1938 .connect = dlm_sctp_connect, 1939 .sockopts = dlm_sctp_sockopts, 1940 .bind = dlm_sctp_bind, 1941 .listen_validate = dlm_sctp_listen_validate, 1942 .listen_sockopts = dlm_sctp_sockopts, 1943 .listen_bind = dlm_sctp_bind_listen, 1944 }; 1945 1946 int dlm_lowcomms_start(void) 1947 { 1948 int error; 1949 1950 init_local(); 1951 if (!dlm_local_count) { 1952 error = -ENOTCONN; 1953 log_print("no local IP address has been set"); 1954 goto fail; 1955 } 1956 1957 error = work_start(); 1958 if (error) 1959 goto fail; 1960 1961 /* Start listening */ 1962 switch (dlm_config.ci_protocol) { 1963 case DLM_PROTO_TCP: 1964 dlm_proto_ops = &dlm_tcp_ops; 1965 break; 1966 case DLM_PROTO_SCTP: 1967 dlm_proto_ops = &dlm_sctp_ops; 1968 break; 1969 default: 1970 log_print("Invalid protocol identifier %d set", 1971 dlm_config.ci_protocol); 1972 error = -EINVAL; 1973 goto fail_proto_ops; 1974 } 1975 1976 error = dlm_listen_for_all(); 1977 if (error) 1978 goto fail_listen; 1979 1980 return 0; 1981 1982 fail_listen: 1983 dlm_proto_ops = NULL; 1984 fail_proto_ops: 1985 work_stop(); 1986 fail: 1987 return error; 1988 } 1989 1990 void dlm_lowcomms_init(void) 1991 { 1992 int i; 1993 1994 for (i = 0; i < CONN_HASH_SIZE; i++) 1995 INIT_HLIST_HEAD(&connection_hash[i]); 1996 1997 INIT_WORK(&listen_con.rwork, process_listen_recv_socket); 1998 } 1999 2000 void dlm_lowcomms_exit(void) 2001 { 2002 struct connection *con; 2003 int i, idx; 2004 2005 idx = srcu_read_lock(&connections_srcu); 2006 for (i = 0; i < CONN_HASH_SIZE; i++) { 2007 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 2008 spin_lock(&connections_lock); 2009 hlist_del_rcu(&con->list); 2010 spin_unlock(&connections_lock); 2011 2012 if (con->othercon) 2013 call_srcu(&connections_srcu, &con->othercon->rcu, 2014 connection_release); 2015 call_srcu(&connections_srcu, &con->rcu, connection_release); 2016 } 2017 } 2018 srcu_read_unlock(&connections_srcu, idx); 2019 } 2020