// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>
#include <trace/events/sock.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
#define DLM_MAX_PROCESS_BUFFERS 24
#define NEEDED_RMEM (4*1024*1024)

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* so we know who we are in the list */
	/* this semaphore is used to allow parallel recv/send in read
	 * lock mode. When we release a sock we need to hold the write lock.
	 *
	 * However this is locking code and not nice. When we remove the
	 * othercon handling we can look into other mechanisms to synchronize
	 * io handling to call sock_release() at the right time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;
#define CF_APP_LIMITED 0
#define CF_RECV_PENDING 1
#define CF_SEND_PENDING 2
#define CF_RECV_INTR 3
#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;
	/* due to connect()/accept() races we can currently end up with a
	 * crossed-over connection attempt, i.e. a second connection for
	 * one node.
	 *
	 * There is a solution to avoid the race by introducing a connect
	 * rule, e.g. only the side with our_nodeid > nodeid_to_connect is
	 * allowed to connect. The other side can still connect, but that
	 * is then only taken as a signal that it wants a reconnect.
	 *
	 * However, changing to this behaviour would break backwards
	 * compatibility. In a DLM protocol major version upgrade we
	 * should remove this!
	 */
	struct connection *othercon;
	struct work_struct rwork;	/* receive worker */
	struct work_struct swork;	/* send worker */
	wait_queue_head_t shutdown_wait;
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	int mark;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx;	/* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};

struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};

struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;
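/* Return codes used by the socket I/O workers below. These summaries
 * follow from how process_recv_sockets() and process_send_sockets()
 * handle them:
 *
 *  DLM_IO_SUCCESS - progress was made, the worker loops again
 *  DLM_IO_END     - nothing (more) to do, the pending flag was cleared
 *  DLM_IO_EOF     - the peer closed the connection (recv returned 0)
 *  DLM_IO_RESCHED - give up the CPU and requeue the work item
 *  DLM_IO_FLUSH   - too many buffers queued; flush the process
 *                   workqueue before requeueing (receive path only)
 */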
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3
#define DLM_IO_FLUSH 4

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);

bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

static void lowcomms_queue_swork(struct connection *con)
{
	assert_spin_locked(&con->writequeue_lock);

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}

static void lowcomms_queue_rwork(struct connection *con)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
#endif

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}

static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}

/* writequeue_lock must be held */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero there is nothing to send; if there are users still
	 * filling buffers we wait until the users are done so we can send
	 * more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}

static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

static void dlm_con_init(struct connection *con, int nodeid)
{
	con->nodeid = nodeid;
	init_rwsem(&con->sock_lock);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	spin_lock_init(&con->addrs_lock);
	init_waitqueue_head(&con->shutdown_wait);
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	dlm_con_init(con, nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple cpus. Instead of locking the hot path of
	 * __find_con() we just check again, under protection of
	 * connections_lock, for the rare case of recently added nodes.
	 * If that is the case we abort our connection creation and
	 * return the existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
		con->curr_addr_index++;
		if (con->curr_addr_index == con->addr_count)
			con->curr_addr_index = 0;
	}

	*mark = con->mark;
	spin_unlock(&con->addrs_lock);

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out) {
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	if (dlm_local_addr[0].ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *)&sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *)sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *)sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{
	struct connection *con;
	int i, idx, addr_i;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			WARN_ON_ONCE(!con->addr_count);

			spin_lock(&con->addrs_lock);
			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
				if (addr_compare(&con->addr[addr_i], addr)) {
					*nodeid = con->nodeid;
					*mark = con->mark;
					spin_unlock(&con->addrs_lock);
					srcu_read_unlock(&connections_srcu, idx);
					return 0;
				}
			}
			spin_unlock(&con->addrs_lock);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return -ENOENT;
}

static bool dlm_lowcomms_con_has_addr(const struct connection *con,
				      const struct sockaddr_storage *addr)
{
	int i;

	for (i = 0; i < con->addr_count; i++) {
		if (addr_compare(&con->addr[i], addr))
			return true;
	}

	return false;
}
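/*
 * dlm_lowcomms_addr - associate a comms address with a nodeid
 * @nodeid: node the address belongs to
 * @addr: address to add
 * @len: size of @addr
 *
 * Creates the connection structure for @nodeid if it does not exist yet.
 * Returns 0 on success, -ENOMEM on allocation failure, -EEXIST if the
 * address is already known and -ENOSPC if DLM_MAX_ADDR_COUNT addresses
 * are already set for this node.
 */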
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct connection *con;
	bool ret;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		memcpy(&con->addr[0], addr, sizeof(*addr));
		con->addr_count = 1;
		con->mark = dlm_config.ci_mark;
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	ret = dlm_lowcomms_con_has_addr(con, addr);
	if (ret) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -EEXIST;
	}

	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOSPC;
	}

	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
	srcu_read_unlock(&connections_srcu, idx);
	spin_unlock(&con->addrs_lock);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	trace_sk_data_ready(sk);

	set_bit(CF_RECV_INTR, &con->flags);
	lowcomms_queue_rwork(con);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	spin_lock_bh(&con->writequeue_lock);
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	lowcomms_queue_swork(con);
	spin_unlock_bh(&con->writequeue_lock);
}

static void lowcomms_state_change(struct sock *sk)
{
	/* The SCTP layer does not call sk_data_ready when the connection
	 * is done, so we catch that signal through here.
	 */
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	trace_sk_data_ready(sk);

	queue_work(io_workqueue, &listen_con.rwork);
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}

int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err,
				   READ_ONCE(sk->sk_err_soft));
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}

static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_use_task_frag = false;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}
/* Add the port number to an IPv6 or IPv4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	spin_lock_bh(&con->writequeue_lock);
	set_bit(CF_IO_STOP, &con->flags);
	spin_unlock_bh(&con->writequeue_lock);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);
		release_sock(con->sock->sk);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* if we sent a writequeue entry only partially, we drop the whole
	 * entry on reconnect so that we don't restart in the middle of a
	 * message, which would confuse the other end.
	 *
	 * We can always drop whole messages because of retransmits, but
	 * what we must never do is transmit half a message that may get
	 * processed at the other side.
	 *
	 * Our policy is to start from a clean state after a disconnect;
	 * we don't know what has been sent/received at the transport
	 * layer in this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}

static void shutdown_connection(struct connection *con, bool and_other)
{
	int ret;

	if (con->othercon && and_other)
		shutdown_connection(con->othercon, false);

	flush_workqueue(io_workqueue);
	down_read(&con->sock_lock);
	/* nothing to shutdown */
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	up_read(&con->sock_lock);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait, !con->sock,
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	close_connection(con, false);
}

static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}

struct dlm_processed_nodes {
	int nodeid;

	struct list_head list;
};

static void process_dlm_messages(struct work_struct *work)
{
	struct processqueue_entry *pentry;

	spin_lock(&processqueue_lock);
	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
	if (WARN_ON_ONCE(!pentry)) {
		process_dlm_messages_pending = false;
		spin_unlock(&processqueue_lock);
		return;
	}

	list_del(&pentry->list);
	atomic_dec(&processqueue_count);
	spin_unlock(&processqueue_lock);

	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		free_processqueue_entry(pentry);

		spin_lock(&processqueue_lock);
		pentry = list_first_entry_or_null(&processqueue,
						  struct processqueue_entry, list);
		if (!pentry) {
			process_dlm_messages_pending = false;
			spin_unlock(&processqueue_lock);
			break;
		}

		list_del(&pentry->list);
		atomic_dec(&processqueue_count);
		spin_unlock(&processqueue_lock);
	}
}
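/* Receives are buffer-oriented: a trailing partial message is kept in
 * con->rx_leftover_buf and prepended to the next receive. For example,
 * if a receive ends with 200 bytes of an incomplete message,
 * dlm_validate_incoming_buffer() returns the length of the valid part,
 * rx_leftover becomes 200 and those bytes are moved to the front of the
 * buffer used for the next receive.
 */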
/* Data received from remote end */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* calculate new buffer parameters regarding the last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		lock_sock(con->sock->sk);
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* new buflen according to bytes read plus leftover from the last
	 * receive
	 */
	buflen_real = ret + con->rx_leftover;
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* move the leftover bytes to the beginning of the receive buffer,
	 * so that on the next receive the full message sits at the start
	 * address of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock(&processqueue_lock);
	ret = atomic_inc_return(&processqueue_count);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock(&processqueue_lock);

	if (ret > DLM_MAX_PROCESS_BUFFERS)
		return DLM_IO_FLUSH;

	return DLM_IO_SUCCESS;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we received something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	} else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);

		/* check if we received something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
	}
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
	struct writequeue_entry *entry;

	entry = dlm_allocate_writequeue();
	if (!entry)
		return NULL;

	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
	if (!entry->page) {
		dlm_free_writequeue(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->dirty = false;
	entry->con = con;
	entry->users = 1;
	kref_init(&entry->ref);
	return entry;
}

static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			e->users++;
			goto out;
		}
	}

	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return e;
}

static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						gfp_t allocation, char **ppc,
						void (*cb)(void *data),
						void *data)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = dlm_allocate_msg(allocation);
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, ppc, cb, data);
	if (!e) {
		dlm_free_msg(msg);
		return NULL;
	}

	msg->retransmit = false;
	msg->orig_msg = NULL;
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}
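/* A minimal sketch of the intended caller pattern for the dlm_msg API
 * below (midcomms is the real user); error handling elided, "nodeid",
 * "data" and "len" are placeholders:
 *
 *	char *ppc;
 *	struct dlm_msg *msg;
 *
 *	msg = dlm_lowcomms_new_msg(nodeid, len, GFP_NOFS, &ppc, NULL, NULL);
 *	memcpy(ppc, data, len);
 *	dlm_lowcomms_commit_msg(msg);
 *	dlm_lowcomms_put_msg(msg);
 */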
/* avoid a sparse false positive for connections_srcu: the srcu read lock
 * taken here is released in dlm_lowcomms_commit_msg(), which must be
 * called on success
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON_ONCE(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume that on success commit must be called */
	msg->idx = idx;
	return msg;
}
#endif

static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock_bh(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);

	lowcomms_queue_swork(con);

out:
	spin_unlock_bh(&con->writequeue_lock);
}

/* avoid a sparse false positive for connections_srcu: the lock was
 * taken in dlm_lowcomms_new_msg()
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* because dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif

void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu; only used via lowcomms_error_report */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}

/* Send a message */
static int send_to_sock(struct connection *con)
{
	struct writequeue_entry *e;
	struct bio_vec bvec;
	struct msghdr msg = {
		.msg_flags = MSG_SPLICE_PAGES | MSG_DONTWAIT | MSG_NOSIGNAL,
	};
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	spin_unlock_bh(&con->writequeue_lock);

	bvec_set_page(&bvec, e->page, len, offset);
	iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
	ret = sock_sendmsg(con->sock, &msg);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
			 */
			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
			con->sock->sk->sk_write_pending++;

			clear_bit(CF_SEND_PENDING, &con->flags);
			spin_unlock_bh(&con->writequeue_lock);
			release_sock(con->sock->sk);

			/* wait for write_space() event */
			return DLM_IO_END;
		}
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);

		return DLM_IO_RESCHED;
	} else if (ret < 0) {
		return ret;
	}

	spin_lock_bh(&con->writequeue_lock);
	writequeue_entry_complete(e, ret);
	spin_unlock_bh(&con->writequeue_lock);

	return DLM_IO_SUCCESS;
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock_bh(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);
}

static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	WARN_ON_ONCE(!list_empty(&con->writequeue));
	WARN_ON_ONCE(con->sock);
	kfree(con);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	int idx;

	log_print("closing connection to node %d", nodeid);

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	stop_connection_io(con);
	log_print("io handling for node: %d stopped", nodeid);
	close_connection(con, true);

	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
	}
	srcu_read_unlock(&connections_srcu, idx);

	/* for debugging we print when we are done, to compare with other
	 * messages in between. This function needs to be correctly
	 * synchronized with io handling.
	 */
	log_print("closing connection to node %d done", nodeid);

	return 0;
}

/* Receive worker function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int ret, buflen;

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	buflen = READ_ONCE(dlm_config.ci_buffer_size);
	do {
		ret = receive_from_sock(con, buflen);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_EOF:
		close_connection(con, false);
		wake_up(&con->shutdown_wait);
		/* CF_RECV_PENDING cleared */
		break;
	case DLM_IO_FLUSH:
		flush_workqueue(process_workqueue);
		fallthrough;
	case DLM_IO_RESCHED:
		cond_resched();
		queue_work(io_workqueue, &con->rwork);
		/* CF_RECV_PENDING not cleared */
		break;
	default:
		if (ret < 0) {
			if (test_bit(CF_IS_OTHERCON, &con->flags)) {
				close_connection(con, false);
			} else {
				spin_lock_bh(&con->writequeue_lock);
				lowcomms_queue_swork(con);
				spin_unlock_bh(&con->writequeue_lock);
			}

			/* CF_RECV_PENDING cleared for othercon;
			 * we trigger the send queue if not already done
			 * and process_send_sockets will handle it
			 */
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}

static void process_listen_recv_socket(struct work_struct *work)
{
	int ret;

	if (WARN_ON_ONCE(!listen_con.sock))
		return;

	do {
		ret = accept_from_sock();
	} while (ret == DLM_IO_SUCCESS);

	if (ret < 0)
		log_print("critical error accepting connection: %d", ret);
}

static int dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return result;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		return result;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	result = dlm_proto_ops->bind(sock);
	if (result < 0) {
		sock_release(sock);
		return result;
	}

	add_sock(sock, con);

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
					addr_len);
	switch (result) {
	case -EINPROGRESS:
		/* not an error */
		fallthrough;
	case 0:
		break;
	default:
		if (result < 0)
			dlm_close_sock(&con->sock);

		break;
	}

	return result;
}
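/* A note on locking in the send worker below: sock_lock is taken for
 * write only while a new socket is attached via dlm_connect() and is
 * downgraded to a read lock for the actual I/O, so the receive worker
 * can run in parallel on the same connection.
 */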
/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);
	int ret;

	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		down_write(&con->sock_lock);
		if (!con->sock) {
			ret = dlm_connect(con);
			switch (ret) {
			case 0:
				break;
			case -EINPROGRESS:
				/* avoid busy rescheduling while connecting;
				 * we could switch to a state_change event
				 * based mechanism once established
				 */
				msleep(100);
				break;
			default:
				/* CF_SEND_PENDING not cleared */
				up_write(&con->sock_lock);
				log_print("connect to node %d try %d error %d",
					  con->nodeid, con->retries++, ret);
				msleep(1000);
				/* For now we try forever to reconnect. In the
				 * future we should send an event to the
				 * cluster manager to fence this node after a
				 * certain number of retries.
				 */
				queue_work(io_workqueue, &con->swork);
				return;
			}
		}
		downgrade_write(&con->sock_lock);
	}

	do {
		ret = send_to_sock(con);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_SEND_PENDING cleared */
		break;
	case DLM_IO_RESCHED:
		/* CF_SEND_PENDING not cleared */
		cond_resched();
		queue_work(io_workqueue, &con->swork);
		break;
	default:
		if (ret < 0) {
			close_connection(con, false);

			/* CF_SEND_PENDING cleared */
			spin_lock_bh(&con->writequeue_lock);
			lowcomms_queue_swork(con);
			spin_unlock_bh(&con->writequeue_lock);
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}

static void work_stop(void)
{
	if (io_workqueue) {
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
	}

	if (process_workqueue) {
		destroy_workqueue(process_workqueue);
		process_workqueue = NULL;
	}
}

static int work_start(void)
{
	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM |
				       WQ_UNBOUND, 0);
	if (!io_workqueue) {
		log_print("can't start dlm_io");
		return -ENOMEM;
	}

	/* ordered dlm message processing queue;
	 * should be converted to a tasklet
	 */
	process_workqueue = alloc_ordered_workqueue("dlm_process",
						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
	if (!process_workqueue) {
		log_print("can't start dlm_process");
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}

void dlm_lowcomms_shutdown(void)
{
	struct connection *con;
	int i, idx;

	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			shutdown_connection(con, true);
			stop_connection_io(con);
			flush_workqueue(process_workqueue);
			close_connection(con, true);

			clean_one_writequeue(con);
			if (con->othercon)
				clean_one_writequeue(con->othercon);
			allow_connection_io(con);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}

void dlm_lowcomms_stop(void)
{
	work_stop();
	dlm_proto_ops = NULL;
}
static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_use_task_frag = false;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 128);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}

static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address when connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}

static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}

static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			       addr_len);
}

static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};

static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;
	/*
	 * Make sock->ops->connect() return within a specified time, since
	 * the O_NONBLOCK argument to connect() does not work here.
	 * Afterwards, restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	sock_set_sndtimeo(sock->sk, 0);
	return ret;
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	request_module("sctp");
	return 0;
}

static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};

int dlm_lowcomms_start(void)
{
	int error;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}

void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}