1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP address or 25 * whatever it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 
42 * 43 */ 44 45 #include <asm/ioctls.h> 46 #include <net/sock.h> 47 #include <net/tcp.h> 48 #include <linux/pagemap.h> 49 #include <linux/file.h> 50 #include <linux/mutex.h> 51 #include <linux/sctp.h> 52 #include <linux/slab.h> 53 #include <net/sctp/sctp.h> 54 #include <net/ipv6.h> 55 56 #include <trace/events/dlm.h> 57 58 #include "dlm_internal.h" 59 #include "lowcomms.h" 60 #include "midcomms.h" 61 #include "memory.h" 62 #include "config.h" 63 64 #define NEEDED_RMEM (4*1024*1024) 65 66 struct connection { 67 struct socket *sock; /* NULL if not connected */ 68 uint32_t nodeid; /* So we know who we are in the list */ 69 /* this semaphore is used to allow parallel recv/send in read 70 * lock mode. When we release a sock we need to held the write lock. 71 * 72 * However this is locking code and not nice. When we remove the 73 * othercon handling we can look into other mechanism to synchronize 74 * io handling to call sock_release() at the right time. 75 */ 76 struct rw_semaphore sock_lock; 77 unsigned long flags; 78 #define CF_APP_LIMITED 0 79 #define CF_RECV_PENDING 1 80 #define CF_SEND_PENDING 2 81 #define CF_RECV_INTR 3 82 #define CF_IO_STOP 4 83 #define CF_IS_OTHERCON 5 84 struct list_head writequeue; /* List of outgoing writequeue_entries */ 85 spinlock_t writequeue_lock; 86 int retries; 87 struct hlist_node list; 88 /* due some connect()/accept() races we currently have this cross over 89 * connection attempt second connection for one node. 90 * 91 * There is a solution to avoid the race by introducing a connect 92 * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to 93 * connect. Otherside can connect but will only be considered that 94 * the other side wants to have a reconnect. 95 * 96 * However changing to this behaviour will break backwards compatible. 97 * In a DLM protocol major version upgrade we should remove this! 
98 */ 99 struct connection *othercon; 100 struct work_struct rwork; /* receive worker */ 101 struct work_struct swork; /* send worker */ 102 unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE]; 103 int rx_leftover; 104 int mark; 105 int addr_count; 106 int curr_addr_index; 107 struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT]; 108 spinlock_t addrs_lock; 109 struct rcu_head rcu; 110 }; 111 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 112 113 struct listen_connection { 114 struct socket *sock; 115 struct work_struct rwork; 116 }; 117 118 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end) 119 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset) 120 121 /* An entry waiting to be sent */ 122 struct writequeue_entry { 123 struct list_head list; 124 struct page *page; 125 int offset; 126 int len; 127 int end; 128 int users; 129 bool dirty; 130 struct connection *con; 131 struct list_head msgs; 132 struct kref ref; 133 }; 134 135 struct dlm_msg { 136 struct writequeue_entry *entry; 137 struct dlm_msg *orig_msg; 138 bool retransmit; 139 void *ppc; 140 int len; 141 int idx; /* new()/commit() idx exchange */ 142 143 struct list_head list; 144 struct kref ref; 145 }; 146 147 struct processqueue_entry { 148 unsigned char *buf; 149 int nodeid; 150 int buflen; 151 152 struct list_head list; 153 }; 154 155 struct dlm_proto_ops { 156 bool try_new_addr; 157 const char *name; 158 int proto; 159 160 int (*connect)(struct connection *con, struct socket *sock, 161 struct sockaddr *addr, int addr_len); 162 void (*sockopts)(struct socket *sock); 163 int (*bind)(struct socket *sock); 164 int (*listen_validate)(void); 165 void (*listen_sockopts)(struct socket *sock); 166 int (*listen_bind)(struct socket *sock); 167 }; 168 169 static struct listen_sock_callbacks { 170 void (*sk_error_report)(struct sock *); 171 void (*sk_data_ready)(struct sock *); 172 void (*sk_state_change)(struct sock *); 173 void (*sk_write_space)(struct sock *); 174 } listen_sock; 175 176 static struct 
listen_connection listen_con;
/* Local addresses filled in by init_local(); dlm_local_count are valid. */
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

/* nodeid -> connection hash; additions are serialized by connections_lock,
 * lookups in this file run inside a connections_srcu read section.
 */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

/* Result codes of the socket IO helpers (e.g. receive_from_sock()). */
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

/* Received buffers wait on 'processqueue' (under processqueue_lock) until
 * process_work handles them; process_dlm_messages_pending is true while
 * that work is queued or running.
 */
static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static LIST_HEAD(processqueue);

/* Returns true while a listen socket is set up. */
bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

/* Queue the send worker of @con unless IO is stopped, the connection is
 * application limited or send work is already pending.
 *
 * The caller must hold con->writequeue_lock (asserted below).
 */
static void lowcomms_queue_swork(struct connection *con)
{
	assert_spin_locked(&con->writequeue_lock);

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}

/* Queue the receive worker of @con unless IO is stopped or receive work
 * is already pending.
 *
 * With lockdep enabled this warns if the socket lock of con->sock is not
 * held by the caller.
 */
static void lowcomms_queue_rwork(struct connection *con)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
#endif

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}

/* Slab constructor for writequeue entries: only the msgs list head is
 * initialized here, all other fields are set in new_writequeue_entry().
 */
static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

/* Create the slab cache for struct writequeue_entry objects. */
struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

/* Create the slab cache for struct dlm_msg objects. */
struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}

/* Return the first writequeue entry of @con if it is ready to be sent,
 * otherwise NULL.
 *
 * The caller needs to hold writequeue_lock.
 */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero nothing is to send, if there are users filling
	 * buffers we wait until the users are done so we can send more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}

/* Look up @nodeid in hash bucket @r; returns NULL when it is not present. */
static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

/* Initialize the locks, the writequeue and the work items of @con.
 * Fields not touched here keep whatever value the caller gave them;
 * the callers in this file allocate @con with kzalloc().
 */
static void dlm_con_init(struct connection *con, int nodeid)
{
	con->nodeid = nodeid;
	init_rwsem(&con->sock_lock);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	spin_lock_init(&con->addrs_lock);
}

/*
 * Find the connection for @nodeid.
 *
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node and NULL is returned when none
 * exists. Otherwise 'alloc' is used as the allocation mask for a new
 * connection; NULL is then only returned if that allocation fails.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	dlm_con_init(con, nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads calls this function it can
	 * race on multiple cpu's.
Instead of locking hot path __find_con() 310 * we just check in rare cases of recently added nodes again 311 * under protection of connections_lock. If this is the case we 312 * abort our connection creation and return the existing connection. 313 */ 314 tmp = __find_con(nodeid, r); 315 if (tmp) { 316 spin_unlock(&connections_lock); 317 kfree(con); 318 return tmp; 319 } 320 321 hlist_add_head_rcu(&con->list, &connection_hash[r]); 322 spin_unlock(&connections_lock); 323 324 return con; 325 } 326 327 static int addr_compare(const struct sockaddr_storage *x, 328 const struct sockaddr_storage *y) 329 { 330 switch (x->ss_family) { 331 case AF_INET: { 332 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 333 struct sockaddr_in *siny = (struct sockaddr_in *)y; 334 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 335 return 0; 336 if (sinx->sin_port != siny->sin_port) 337 return 0; 338 break; 339 } 340 case AF_INET6: { 341 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 342 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 343 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 344 return 0; 345 if (sinx->sin6_port != siny->sin6_port) 346 return 0; 347 break; 348 } 349 default: 350 return 0; 351 } 352 return 1; 353 } 354 355 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 356 struct sockaddr *sa_out, bool try_new_addr, 357 unsigned int *mark) 358 { 359 struct sockaddr_storage sas; 360 struct connection *con; 361 int idx; 362 363 if (!dlm_local_count) 364 return -1; 365 366 idx = srcu_read_lock(&connections_srcu); 367 con = nodeid2con(nodeid, 0); 368 if (!con) { 369 srcu_read_unlock(&connections_srcu, idx); 370 return -ENOENT; 371 } 372 373 spin_lock(&con->addrs_lock); 374 if (!con->addr_count) { 375 spin_unlock(&con->addrs_lock); 376 srcu_read_unlock(&connections_srcu, idx); 377 return -ENOENT; 378 } 379 380 memcpy(&sas, &con->addr[con->curr_addr_index], 381 sizeof(struct sockaddr_storage)); 382 383 if (try_new_addr) { 384 
con->curr_addr_index++; 385 if (con->curr_addr_index == con->addr_count) 386 con->curr_addr_index = 0; 387 } 388 389 *mark = con->mark; 390 spin_unlock(&con->addrs_lock); 391 392 if (sas_out) 393 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 394 395 if (!sa_out) { 396 srcu_read_unlock(&connections_srcu, idx); 397 return 0; 398 } 399 400 if (dlm_local_addr[0].ss_family == AF_INET) { 401 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 402 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 403 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 404 } else { 405 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 406 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 407 ret6->sin6_addr = in6->sin6_addr; 408 } 409 410 srcu_read_unlock(&connections_srcu, idx); 411 return 0; 412 } 413 414 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, 415 unsigned int *mark) 416 { 417 struct connection *con; 418 int i, idx, addr_i; 419 420 idx = srcu_read_lock(&connections_srcu); 421 for (i = 0; i < CONN_HASH_SIZE; i++) { 422 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 423 WARN_ON_ONCE(!con->addr_count); 424 425 spin_lock(&con->addrs_lock); 426 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { 427 if (addr_compare(&con->addr[addr_i], addr)) { 428 *nodeid = con->nodeid; 429 *mark = con->mark; 430 spin_unlock(&con->addrs_lock); 431 srcu_read_unlock(&connections_srcu, idx); 432 return 0; 433 } 434 } 435 spin_unlock(&con->addrs_lock); 436 } 437 } 438 srcu_read_unlock(&connections_srcu, idx); 439 440 return -ENOENT; 441 } 442 443 static bool dlm_lowcomms_con_has_addr(const struct connection *con, 444 const struct sockaddr_storage *addr) 445 { 446 int i; 447 448 for (i = 0; i < con->addr_count; i++) { 449 if (addr_compare(&con->addr[i], addr)) 450 return true; 451 } 452 453 return false; 454 } 455 456 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 457 { 458 struct connection *con; 459 bool 
ret, idx; 460 461 idx = srcu_read_lock(&connections_srcu); 462 con = nodeid2con(nodeid, GFP_NOFS); 463 if (!con) { 464 srcu_read_unlock(&connections_srcu, idx); 465 return -ENOMEM; 466 } 467 468 spin_lock(&con->addrs_lock); 469 if (!con->addr_count) { 470 memcpy(&con->addr[0], addr, sizeof(*addr)); 471 con->addr_count = 1; 472 con->mark = dlm_config.ci_mark; 473 spin_unlock(&con->addrs_lock); 474 srcu_read_unlock(&connections_srcu, idx); 475 return 0; 476 } 477 478 ret = dlm_lowcomms_con_has_addr(con, addr); 479 if (ret) { 480 spin_unlock(&con->addrs_lock); 481 srcu_read_unlock(&connections_srcu, idx); 482 return -EEXIST; 483 } 484 485 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { 486 spin_unlock(&con->addrs_lock); 487 srcu_read_unlock(&connections_srcu, idx); 488 return -ENOSPC; 489 } 490 491 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); 492 srcu_read_unlock(&connections_srcu, idx); 493 spin_unlock(&con->addrs_lock); 494 return 0; 495 } 496 497 /* Data available on socket or listen socket received a connect */ 498 static void lowcomms_data_ready(struct sock *sk) 499 { 500 struct connection *con = sock2con(sk); 501 502 set_bit(CF_RECV_INTR, &con->flags); 503 lowcomms_queue_rwork(con); 504 } 505 506 static void lowcomms_write_space(struct sock *sk) 507 { 508 struct connection *con = sock2con(sk); 509 510 clear_bit(SOCK_NOSPACE, &con->sock->flags); 511 512 spin_lock_bh(&con->writequeue_lock); 513 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 514 con->sock->sk->sk_write_pending--; 515 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 516 } 517 518 lowcomms_queue_swork(con); 519 spin_unlock_bh(&con->writequeue_lock); 520 } 521 522 static void lowcomms_state_change(struct sock *sk) 523 { 524 /* SCTP layer is not calling sk_data_ready when the connection 525 * is done, so we catch the signal through here. 
*/
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}

/* Data-ready callback of the listen socket: queue the accept work. */
static void lowcomms_listen_data_ready(struct sock *sk)
{
	queue_work(io_workqueue, &listen_con.rwork);
}

/* Kick off connecting to @nodeid by queueing its send worker if the
 * connection has no socket yet.
 *
 * Returns 0 (also when @nodeid is our own node) or -ENOENT, with a
 * one-time warning, if no connection structure exists for @nodeid.
 */
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}

/* Set the mark stored for @nodeid.
 *
 * Returns 0 on success or -ENOENT if the node has no connection structure.
 */
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

/* Socket error callback: log the error (rate limited), ask midcomms to
 * resend unacked messages for the node and then call the error report
 * callback saved in listen_sock.
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}

/* Detach @sk from its connection and put back the callbacks saved in
 * listen_sock. With lockdep enabled this warns if the socket lock is
 * not held by the caller.
 */
static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	/* the state change callback is only replaced for SCTP */
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_use_task_frag = false;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}

/* Add the port number to an IPv6 or 4 sockaddr and store the address
 * length in *addr_len.
 *
 * The family is taken from the first local address. Everything in @saddr
 * behind the family specific structure is zeroed, for IPv4 sin_zero is
 * cleared as well.
 */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

/* kref release function of a writequeue entry: frees page and entry. */
static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

/* kref release function of a dlm_msg: drops the message's reference on
 * its writequeue entry and frees the message.
 */
static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

/* Unlink @e from its writequeue and drop one reference on it and on each
 * message attached to it. For a message that has an orig_msg, the
 * original's retransmit flag is cleared and its reference dropped too.
 */
static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

/* Restore the saved callbacks on *sock, release the socket and set the
 * caller's pointer to NULL.
 */
static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

/* Clear CF_IO_STOP on @con and its othercon (if any) so work can be
 * queued again.
 */
static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

/* Stop all IO on @con and, first, on its othercon: restore the socket
 * callbacks if a socket exists, set CF_IO_STOP so nothing gets queued
 * anymore and wait for already queued send/receive work to finish.
 */
static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);

		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);
	} else {
		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a remote connection and tidy up.
 *
 * If @and_other is true the othercon of @con is closed first. Nothing
 * else is done when @con has no socket.
 */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* if we send a writequeue entry only a half way, we drop the
	 * whole entry because reconnection and that we not start of the
	 * middle of a msg which will confuse the other end.
	 *
	 * we can always drop messages because retransmits, but what we
	 * cannot allow is to transmit half messages which may be processed
	 * at the other side.
	 *
	 * our policy is to start on a clean state when disconnects, we don't
	 * know what's send/received on transport layer in this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	/* reset the per-socket state for the next connection */
	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}

/* Allocate a processqueue entry together with a receive buffer of
 * @buflen bytes. Only ->buf and ->nodeid are set here; ->buflen and
 * ->list are left for the caller to fill in.
 *
 * Returns NULL if one of the allocations fails.
 */
static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

/* Free a processqueue entry and its buffer. */
static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}

struct
dlm_processed_nodes {
	int nodeid;

	struct list_head list;
};

/* Remember @nodeid on @processed_nodes unless it is already on the list. */
static void add_processed_node(int nodeid, struct list_head *processed_nodes)
{
	struct dlm_processed_nodes *n;

	list_for_each_entry(n, processed_nodes, list) {
		/* we already remembered this node */
		if (n->nodeid == nodeid)
			return;
	}

	/* if this fails, in the worst case we simply don't send an ack back.
	 * We try it next time.
	 */
	n = kmalloc(sizeof(*n), GFP_NOFS);
	if (!n)
		return;

	n->nodeid = nodeid;
	list_add(&n->list, processed_nodes);
}

/* Work function of process_work: hand every queued receive buffer to
 * dlm_process_incoming_buffer() and afterwards call
 * dlm_midcomms_receive_done() once for each node that buffers were
 * processed for.
 *
 * process_dlm_messages_pending is cleared under processqueue_lock at the
 * moment the queue is found empty.
 */
static void process_dlm_messages(struct work_struct *work)
{
	struct dlm_processed_nodes *n, *n_tmp;
	struct processqueue_entry *pentry;
	LIST_HEAD(processed_nodes);

	spin_lock(&processqueue_lock);
	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
	/* the work is only queued after an entry was added */
	if (WARN_ON_ONCE(!pentry)) {
		spin_unlock(&processqueue_lock);
		return;
	}

	list_del(&pentry->list);
	spin_unlock(&processqueue_lock);

	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		add_processed_node(pentry->nodeid, &processed_nodes);
		free_processqueue_entry(pentry);

		spin_lock(&processqueue_lock);
		pentry = list_first_entry_or_null(&processqueue,
						  struct processqueue_entry, list);
		if (!pentry) {
			process_dlm_messages_pending = false;
			spin_unlock(&processqueue_lock);
			break;
		}

		list_del(&pentry->list);
		spin_unlock(&processqueue_lock);
	}

	/* send ack back after we processed couple of messages */
	list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
		list_del(&n->list);
		dlm_midcomms_receive_done(n->nodeid);
		kfree(n);
	}
}

/* Data received from remote end.
 *
 * Reads up to @buflen bytes (including bytes left over from the previous
 * receive) into a new processqueue entry, validates them and queues the
 * validated part for process_dlm_messages().
 *
 * Returns DLM_IO_SUCCESS if data was queued, DLM_IO_RESCHED if the entry
 * could not be allocated, DLM_IO_END if no more data is available,
 * DLM_IO_EOF if kernel_recvmsg() returned 0, or the negative error of
 * kernel_recvmsg() or dlm_validate_incoming_buffer().
 */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* calculate new buffer parameter regarding last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		lock_sock(con->sock->sk);
		/* the data-ready callback ran since CF_RECV_INTR was
		 * cleared above, so try to receive once more
		 */
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* new buflen according to read bytes and leftover from last receive */
	buflen_real = ret + con->rx_leftover;
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* calculate leftover bytes from process and put it into begin of
	 * the receive buffer, so next receive we have the full message
	 * at the start address of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock(&processqueue_lock);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock(&processqueue_lock);

	return DLM_IO_SUCCESS;
}

/* Listening socket is busy, accept a connection.
 *
 * Returns DLM_IO_SUCCESS if the new socket was attached to a connection,
 * DLM_IO_END if there is nothing to accept, -1 if the peer address does
 * not belong to a known node, or a negative error code.
 */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID; the port is set to 0 for the lookup */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we received something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	}
	else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);

		/* check if we received something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * The entry is only freed once nothing is left to send and no user is
 * still filling it.
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 *
 * The first local address is bound with kernel_bind(), every further one
 * is added with sock_bind_add(). Stops at the first failure and returns
 * its error code, otherwise returns the result of the last bind.
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses: copy them from dlm_our_addr() into dlm_local_addr[]
 * until it reports a non-zero result or DLM_MAX_ADDR_COUNT is reached.
 */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas,
sizeof(sas)); 1147 } 1148 } 1149 1150 static struct writequeue_entry *new_writequeue_entry(struct connection *con) 1151 { 1152 struct writequeue_entry *entry; 1153 1154 entry = dlm_allocate_writequeue(); 1155 if (!entry) 1156 return NULL; 1157 1158 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); 1159 if (!entry->page) { 1160 dlm_free_writequeue(entry); 1161 return NULL; 1162 } 1163 1164 entry->offset = 0; 1165 entry->len = 0; 1166 entry->end = 0; 1167 entry->dirty = false; 1168 entry->con = con; 1169 entry->users = 1; 1170 kref_init(&entry->ref); 1171 return entry; 1172 } 1173 1174 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, 1175 char **ppc, void (*cb)(void *data), 1176 void *data) 1177 { 1178 struct writequeue_entry *e; 1179 1180 spin_lock_bh(&con->writequeue_lock); 1181 if (!list_empty(&con->writequeue)) { 1182 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); 1183 if (DLM_WQ_REMAIN_BYTES(e) >= len) { 1184 kref_get(&e->ref); 1185 1186 *ppc = page_address(e->page) + e->end; 1187 if (cb) 1188 cb(data); 1189 1190 e->end += len; 1191 e->users++; 1192 goto out; 1193 } 1194 } 1195 1196 e = new_writequeue_entry(con); 1197 if (!e) 1198 goto out; 1199 1200 kref_get(&e->ref); 1201 *ppc = page_address(e->page); 1202 e->end += len; 1203 if (cb) 1204 cb(data); 1205 1206 list_add_tail(&e->list, &con->writequeue); 1207 1208 out: 1209 spin_unlock_bh(&con->writequeue_lock); 1210 return e; 1211 }; 1212 1213 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, 1214 gfp_t allocation, char **ppc, 1215 void (*cb)(void *data), 1216 void *data) 1217 { 1218 struct writequeue_entry *e; 1219 struct dlm_msg *msg; 1220 1221 msg = dlm_allocate_msg(allocation); 1222 if (!msg) 1223 return NULL; 1224 1225 kref_init(&msg->ref); 1226 1227 e = new_wq_entry(con, len, ppc, cb, data); 1228 if (!e) { 1229 dlm_free_msg(msg); 1230 return NULL; 1231 } 1232 1233 msg->retransmit = false; 1234 msg->orig_msg = NULL; 1235 
msg->ppc = *ppc; 1236 msg->len = len; 1237 msg->entry = e; 1238 1239 return msg; 1240 } 1241 1242 /* avoid false positive for nodes_srcu, unlock happens in 1243 * dlm_lowcomms_commit_msg which is a must call if success 1244 */ 1245 #ifndef __CHECKER__ 1246 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, 1247 char **ppc, void (*cb)(void *data), 1248 void *data) 1249 { 1250 struct connection *con; 1251 struct dlm_msg *msg; 1252 int idx; 1253 1254 if (len > DLM_MAX_SOCKET_BUFSIZE || 1255 len < sizeof(struct dlm_header)) { 1256 BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE); 1257 log_print("failed to allocate a buffer of size %d", len); 1258 WARN_ON_ONCE(1); 1259 return NULL; 1260 } 1261 1262 idx = srcu_read_lock(&connections_srcu); 1263 con = nodeid2con(nodeid, 0); 1264 if (WARN_ON_ONCE(!con)) { 1265 srcu_read_unlock(&connections_srcu, idx); 1266 return NULL; 1267 } 1268 1269 msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data); 1270 if (!msg) { 1271 srcu_read_unlock(&connections_srcu, idx); 1272 return NULL; 1273 } 1274 1275 /* for dlm_lowcomms_commit_msg() */ 1276 kref_get(&msg->ref); 1277 /* we assume if successful commit must called */ 1278 msg->idx = idx; 1279 return msg; 1280 } 1281 #endif 1282 1283 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1284 { 1285 struct writequeue_entry *e = msg->entry; 1286 struct connection *con = e->con; 1287 int users; 1288 1289 spin_lock_bh(&con->writequeue_lock); 1290 kref_get(&msg->ref); 1291 list_add(&msg->list, &e->msgs); 1292 1293 users = --e->users; 1294 if (users) 1295 goto out; 1296 1297 e->len = DLM_WQ_LENGTH_BYTES(e); 1298 1299 lowcomms_queue_swork(con); 1300 1301 out: 1302 spin_unlock_bh(&con->writequeue_lock); 1303 return; 1304 } 1305 1306 /* avoid false positive for nodes_srcu, lock was happen in 1307 * dlm_lowcomms_new_msg 1308 */ 1309 #ifndef __CHECKER__ 1310 void dlm_lowcomms_commit_msg(struct dlm_msg *msg) 1311 { 1312 _dlm_lowcomms_commit_msg(msg); 1313 
srcu_read_unlock(&connections_srcu, msg->idx); 1314 /* because dlm_lowcomms_new_msg() */ 1315 kref_put(&msg->ref, dlm_msg_release); 1316 } 1317 #endif 1318 1319 void dlm_lowcomms_put_msg(struct dlm_msg *msg) 1320 { 1321 kref_put(&msg->ref, dlm_msg_release); 1322 } 1323 1324 /* does not held connections_srcu, usage lowcomms_error_report only */ 1325 int dlm_lowcomms_resend_msg(struct dlm_msg *msg) 1326 { 1327 struct dlm_msg *msg_resend; 1328 char *ppc; 1329 1330 if (msg->retransmit) 1331 return 1; 1332 1333 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, 1334 GFP_ATOMIC, &ppc, NULL, NULL); 1335 if (!msg_resend) 1336 return -ENOMEM; 1337 1338 msg->retransmit = true; 1339 kref_get(&msg->ref); 1340 msg_resend->orig_msg = msg; 1341 1342 memcpy(ppc, msg->ppc, msg->len); 1343 _dlm_lowcomms_commit_msg(msg_resend); 1344 dlm_lowcomms_put_msg(msg_resend); 1345 1346 return 0; 1347 } 1348 1349 /* Send a message */ 1350 static int send_to_sock(struct connection *con) 1351 { 1352 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1353 struct writequeue_entry *e; 1354 int len, offset, ret; 1355 1356 spin_lock_bh(&con->writequeue_lock); 1357 e = con_next_wq(con); 1358 if (!e) { 1359 clear_bit(CF_SEND_PENDING, &con->flags); 1360 spin_unlock_bh(&con->writequeue_lock); 1361 return DLM_IO_END; 1362 } 1363 1364 len = e->len; 1365 offset = e->offset; 1366 WARN_ON_ONCE(len == 0 && e->users == 0); 1367 spin_unlock_bh(&con->writequeue_lock); 1368 1369 ret = kernel_sendpage(con->sock, e->page, offset, len, 1370 msg_flags); 1371 trace_dlm_send(con->nodeid, ret); 1372 if (ret == -EAGAIN || ret == 0) { 1373 lock_sock(con->sock->sk); 1374 spin_lock_bh(&con->writequeue_lock); 1375 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1376 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1377 /* Notify TCP that we're limited by the 1378 * application window size. 
1379 */ 1380 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); 1381 con->sock->sk->sk_write_pending++; 1382 1383 clear_bit(CF_SEND_PENDING, &con->flags); 1384 spin_unlock_bh(&con->writequeue_lock); 1385 release_sock(con->sock->sk); 1386 1387 /* wait for write_space() event */ 1388 return DLM_IO_END; 1389 } 1390 spin_unlock_bh(&con->writequeue_lock); 1391 release_sock(con->sock->sk); 1392 1393 return DLM_IO_RESCHED; 1394 } else if (ret < 0) { 1395 return ret; 1396 } 1397 1398 spin_lock_bh(&con->writequeue_lock); 1399 writequeue_entry_complete(e, ret); 1400 spin_unlock_bh(&con->writequeue_lock); 1401 1402 return DLM_IO_SUCCESS; 1403 } 1404 1405 static void clean_one_writequeue(struct connection *con) 1406 { 1407 struct writequeue_entry *e, *safe; 1408 1409 spin_lock_bh(&con->writequeue_lock); 1410 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1411 free_entry(e); 1412 } 1413 spin_unlock_bh(&con->writequeue_lock); 1414 } 1415 1416 static void connection_release(struct rcu_head *rcu) 1417 { 1418 struct connection *con = container_of(rcu, struct connection, rcu); 1419 1420 WARN_ON_ONCE(!list_empty(&con->writequeue)); 1421 WARN_ON_ONCE(con->sock); 1422 kfree(con); 1423 } 1424 1425 /* Called from recovery when it knows that a node has 1426 left the cluster */ 1427 int dlm_lowcomms_close(int nodeid) 1428 { 1429 struct connection *con; 1430 int idx; 1431 1432 log_print("closing connection to node %d", nodeid); 1433 1434 idx = srcu_read_lock(&connections_srcu); 1435 con = nodeid2con(nodeid, 0); 1436 if (WARN_ON_ONCE(!con)) { 1437 srcu_read_unlock(&connections_srcu, idx); 1438 return -ENOENT; 1439 } 1440 1441 stop_connection_io(con); 1442 log_print("io handling for node: %d stopped", nodeid); 1443 close_connection(con, true); 1444 1445 spin_lock(&connections_lock); 1446 hlist_del_rcu(&con->list); 1447 spin_unlock(&connections_lock); 1448 1449 clean_one_writequeue(con); 1450 call_srcu(&connections_srcu, &con->rcu, connection_release); 1451 if 
(con->othercon) { 1452 clean_one_writequeue(con->othercon); 1453 if (con->othercon) 1454 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); 1455 } 1456 srcu_read_unlock(&connections_srcu, idx); 1457 1458 /* for debugging we print when we are done to compare with other 1459 * messages in between. This function need to be correctly synchronized 1460 * with io handling 1461 */ 1462 log_print("closing connection to node %d done", nodeid); 1463 1464 return 0; 1465 } 1466 1467 /* Receive worker function */ 1468 static void process_recv_sockets(struct work_struct *work) 1469 { 1470 struct connection *con = container_of(work, struct connection, rwork); 1471 int ret, buflen; 1472 1473 down_read(&con->sock_lock); 1474 if (!con->sock) { 1475 up_read(&con->sock_lock); 1476 return; 1477 } 1478 1479 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1480 do { 1481 ret = receive_from_sock(con, buflen); 1482 } while (ret == DLM_IO_SUCCESS); 1483 up_read(&con->sock_lock); 1484 1485 switch (ret) { 1486 case DLM_IO_END: 1487 /* CF_RECV_PENDING cleared */ 1488 break; 1489 case DLM_IO_EOF: 1490 close_connection(con, false); 1491 /* CF_RECV_PENDING cleared */ 1492 break; 1493 case DLM_IO_RESCHED: 1494 cond_resched(); 1495 queue_work(io_workqueue, &con->rwork); 1496 /* CF_RECV_PENDING not cleared */ 1497 break; 1498 default: 1499 if (ret < 0) { 1500 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1501 close_connection(con, false); 1502 } else { 1503 spin_lock_bh(&con->writequeue_lock); 1504 lowcomms_queue_swork(con); 1505 spin_unlock_bh(&con->writequeue_lock); 1506 } 1507 1508 /* CF_RECV_PENDING cleared for othercon 1509 * we trigger send queue if not already done 1510 * and process_send_sockets will handle it 1511 */ 1512 break; 1513 } 1514 1515 WARN_ON_ONCE(1); 1516 break; 1517 } 1518 } 1519 1520 static void process_listen_recv_socket(struct work_struct *work) 1521 { 1522 int ret; 1523 1524 if (WARN_ON_ONCE(!listen_con.sock)) 1525 return; 1526 1527 do { 1528 ret = 
accept_from_sock(); 1529 } while (ret == DLM_IO_SUCCESS); 1530 1531 if (ret < 0) 1532 log_print("critical error accepting connection: %d", ret); 1533 } 1534 1535 static int dlm_connect(struct connection *con) 1536 { 1537 struct sockaddr_storage addr; 1538 int result, addr_len; 1539 struct socket *sock; 1540 unsigned int mark; 1541 1542 memset(&addr, 0, sizeof(addr)); 1543 result = nodeid_to_addr(con->nodeid, &addr, NULL, 1544 dlm_proto_ops->try_new_addr, &mark); 1545 if (result < 0) { 1546 log_print("no address for nodeid %d", con->nodeid); 1547 return result; 1548 } 1549 1550 /* Create a socket to communicate with */ 1551 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1552 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1553 if (result < 0) 1554 return result; 1555 1556 sock_set_mark(sock->sk, mark); 1557 dlm_proto_ops->sockopts(sock); 1558 1559 result = dlm_proto_ops->bind(sock); 1560 if (result < 0) { 1561 sock_release(sock); 1562 return result; 1563 } 1564 1565 add_sock(sock, con); 1566 1567 log_print_ratelimited("connecting to %d", con->nodeid); 1568 make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); 1569 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, 1570 addr_len); 1571 switch (result) { 1572 case -EINPROGRESS: 1573 /* not an error */ 1574 fallthrough; 1575 case 0: 1576 break; 1577 default: 1578 if (result < 0) 1579 dlm_close_sock(&con->sock); 1580 1581 break; 1582 } 1583 1584 return result; 1585 } 1586 1587 /* Send worker function */ 1588 static void process_send_sockets(struct work_struct *work) 1589 { 1590 struct connection *con = container_of(work, struct connection, swork); 1591 int ret; 1592 1593 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); 1594 1595 down_read(&con->sock_lock); 1596 if (!con->sock) { 1597 up_read(&con->sock_lock); 1598 down_write(&con->sock_lock); 1599 if (!con->sock) { 1600 ret = dlm_connect(con); 1601 switch (ret) { 1602 case 0: 1603 break; 1604 case -EINPROGRESS: 1605 /* avoid 
spamming resched on connection 1606 * we might can switch to a state_change 1607 * event based mechanism if established 1608 */ 1609 msleep(100); 1610 break; 1611 default: 1612 /* CF_SEND_PENDING not cleared */ 1613 up_write(&con->sock_lock); 1614 log_print("connect to node %d try %d error %d", 1615 con->nodeid, con->retries++, ret); 1616 msleep(1000); 1617 /* For now we try forever to reconnect. In 1618 * future we should send a event to cluster 1619 * manager to fence itself after certain amount 1620 * of retries. 1621 */ 1622 queue_work(io_workqueue, &con->swork); 1623 return; 1624 } 1625 } 1626 downgrade_write(&con->sock_lock); 1627 } 1628 1629 do { 1630 ret = send_to_sock(con); 1631 } while (ret == DLM_IO_SUCCESS); 1632 up_read(&con->sock_lock); 1633 1634 switch (ret) { 1635 case DLM_IO_END: 1636 /* CF_SEND_PENDING cleared */ 1637 break; 1638 case DLM_IO_RESCHED: 1639 /* CF_SEND_PENDING not cleared */ 1640 cond_resched(); 1641 queue_work(io_workqueue, &con->swork); 1642 break; 1643 default: 1644 if (ret < 0) { 1645 close_connection(con, false); 1646 1647 /* CF_SEND_PENDING cleared */ 1648 spin_lock_bh(&con->writequeue_lock); 1649 lowcomms_queue_swork(con); 1650 spin_unlock_bh(&con->writequeue_lock); 1651 break; 1652 } 1653 1654 WARN_ON_ONCE(1); 1655 break; 1656 } 1657 } 1658 1659 static void work_stop(void) 1660 { 1661 if (io_workqueue) { 1662 destroy_workqueue(io_workqueue); 1663 io_workqueue = NULL; 1664 } 1665 1666 if (process_workqueue) { 1667 destroy_workqueue(process_workqueue); 1668 process_workqueue = NULL; 1669 } 1670 } 1671 1672 static int work_start(void) 1673 { 1674 io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM, 1675 0); 1676 if (!io_workqueue) { 1677 log_print("can't start dlm_io"); 1678 return -ENOMEM; 1679 } 1680 1681 /* ordered dlm message process queue, 1682 * should be converted to a tasklet 1683 */ 1684 process_workqueue = alloc_ordered_workqueue("dlm_process", 1685 WQ_HIGHPRI | WQ_MEM_RECLAIM); 1686 if 
(!process_workqueue) { 1687 log_print("can't start dlm_process"); 1688 destroy_workqueue(io_workqueue); 1689 io_workqueue = NULL; 1690 return -ENOMEM; 1691 } 1692 1693 return 0; 1694 } 1695 1696 void dlm_lowcomms_shutdown(void) 1697 { 1698 /* stop lowcomms_listen_data_ready calls */ 1699 lock_sock(listen_con.sock->sk); 1700 listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; 1701 release_sock(listen_con.sock->sk); 1702 1703 cancel_work_sync(&listen_con.rwork); 1704 dlm_close_sock(&listen_con.sock); 1705 1706 flush_workqueue(process_workqueue); 1707 } 1708 1709 void dlm_lowcomms_shutdown_node(int nodeid, bool force) 1710 { 1711 struct connection *con; 1712 int idx; 1713 1714 idx = srcu_read_lock(&connections_srcu); 1715 con = nodeid2con(nodeid, 0); 1716 if (WARN_ON_ONCE(!con)) { 1717 srcu_read_unlock(&connections_srcu, idx); 1718 return; 1719 } 1720 1721 flush_work(&con->swork); 1722 stop_connection_io(con); 1723 WARN_ON_ONCE(!force && !list_empty(&con->writequeue)); 1724 close_connection(con, true); 1725 clean_one_writequeue(con); 1726 if (con->othercon) 1727 clean_one_writequeue(con->othercon); 1728 allow_connection_io(con); 1729 srcu_read_unlock(&connections_srcu, idx); 1730 } 1731 1732 void dlm_lowcomms_stop(void) 1733 { 1734 work_stop(); 1735 dlm_proto_ops = NULL; 1736 } 1737 1738 static int dlm_listen_for_all(void) 1739 { 1740 struct socket *sock; 1741 int result; 1742 1743 log_print("Using %s for communications", 1744 dlm_proto_ops->name); 1745 1746 result = dlm_proto_ops->listen_validate(); 1747 if (result < 0) 1748 return result; 1749 1750 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1751 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1752 if (result < 0) { 1753 log_print("Can't create comms socket: %d", result); 1754 return result; 1755 } 1756 1757 sock_set_mark(sock->sk, dlm_config.ci_mark); 1758 dlm_proto_ops->listen_sockopts(sock); 1759 1760 result = dlm_proto_ops->listen_bind(sock); 1761 if (result < 0) 1762 goto out; 1763 
1764 lock_sock(sock->sk); 1765 listen_sock.sk_data_ready = sock->sk->sk_data_ready; 1766 listen_sock.sk_write_space = sock->sk->sk_write_space; 1767 listen_sock.sk_error_report = sock->sk->sk_error_report; 1768 listen_sock.sk_state_change = sock->sk->sk_state_change; 1769 1770 listen_con.sock = sock; 1771 1772 sock->sk->sk_allocation = GFP_NOFS; 1773 sock->sk->sk_use_task_frag = false; 1774 sock->sk->sk_data_ready = lowcomms_listen_data_ready; 1775 release_sock(sock->sk); 1776 1777 result = sock->ops->listen(sock, 5); 1778 if (result < 0) { 1779 dlm_close_sock(&listen_con.sock); 1780 return result; 1781 } 1782 1783 return 0; 1784 1785 out: 1786 sock_release(sock); 1787 return result; 1788 } 1789 1790 static int dlm_tcp_bind(struct socket *sock) 1791 { 1792 struct sockaddr_storage src_addr; 1793 int result, addr_len; 1794 1795 /* Bind to our cluster-known address connecting to avoid 1796 * routing problems. 1797 */ 1798 memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr)); 1799 make_sockaddr(&src_addr, 0, &addr_len); 1800 1801 result = sock->ops->bind(sock, (struct sockaddr *)&src_addr, 1802 addr_len); 1803 if (result < 0) { 1804 /* This *may* not indicate a critical error */ 1805 log_print("could not bind for connect: %d", result); 1806 } 1807 1808 return 0; 1809 } 1810 1811 static int dlm_tcp_connect(struct connection *con, struct socket *sock, 1812 struct sockaddr *addr, int addr_len) 1813 { 1814 return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK); 1815 } 1816 1817 static int dlm_tcp_listen_validate(void) 1818 { 1819 /* We don't support multi-homed hosts */ 1820 if (dlm_local_count > 1) { 1821 log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); 1822 return -EINVAL; 1823 } 1824 1825 return 0; 1826 } 1827 1828 static void dlm_tcp_sockopts(struct socket *sock) 1829 { 1830 /* Turn off Nagle's algorithm */ 1831 tcp_sock_set_nodelay(sock->sk); 1832 } 1833 1834 static void dlm_tcp_listen_sockopts(struct socket *sock) 1835 { 1836 
dlm_tcp_sockopts(sock); 1837 sock_set_reuseaddr(sock->sk); 1838 } 1839 1840 static int dlm_tcp_listen_bind(struct socket *sock) 1841 { 1842 int addr_len; 1843 1844 /* Bind to our port */ 1845 make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len); 1846 return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0], 1847 addr_len); 1848 } 1849 1850 static const struct dlm_proto_ops dlm_tcp_ops = { 1851 .name = "TCP", 1852 .proto = IPPROTO_TCP, 1853 .connect = dlm_tcp_connect, 1854 .sockopts = dlm_tcp_sockopts, 1855 .bind = dlm_tcp_bind, 1856 .listen_validate = dlm_tcp_listen_validate, 1857 .listen_sockopts = dlm_tcp_listen_sockopts, 1858 .listen_bind = dlm_tcp_listen_bind, 1859 }; 1860 1861 static int dlm_sctp_bind(struct socket *sock) 1862 { 1863 return sctp_bind_addrs(sock, 0); 1864 } 1865 1866 static int dlm_sctp_connect(struct connection *con, struct socket *sock, 1867 struct sockaddr *addr, int addr_len) 1868 { 1869 int ret; 1870 1871 /* 1872 * Make sock->ops->connect() function return in specified time, 1873 * since O_NONBLOCK argument in connect() function does not work here, 1874 * then, we should restore the default value of this attribute. 
1875 */ 1876 sock_set_sndtimeo(sock->sk, 5); 1877 ret = sock->ops->connect(sock, addr, addr_len, 0); 1878 sock_set_sndtimeo(sock->sk, 0); 1879 return ret; 1880 } 1881 1882 static int dlm_sctp_listen_validate(void) 1883 { 1884 if (!IS_ENABLED(CONFIG_IP_SCTP)) { 1885 log_print("SCTP is not enabled by this kernel"); 1886 return -EOPNOTSUPP; 1887 } 1888 1889 request_module("sctp"); 1890 return 0; 1891 } 1892 1893 static int dlm_sctp_bind_listen(struct socket *sock) 1894 { 1895 return sctp_bind_addrs(sock, dlm_config.ci_tcp_port); 1896 } 1897 1898 static void dlm_sctp_sockopts(struct socket *sock) 1899 { 1900 /* Turn off Nagle's algorithm */ 1901 sctp_sock_set_nodelay(sock->sk); 1902 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1903 } 1904 1905 static const struct dlm_proto_ops dlm_sctp_ops = { 1906 .name = "SCTP", 1907 .proto = IPPROTO_SCTP, 1908 .try_new_addr = true, 1909 .connect = dlm_sctp_connect, 1910 .sockopts = dlm_sctp_sockopts, 1911 .bind = dlm_sctp_bind, 1912 .listen_validate = dlm_sctp_listen_validate, 1913 .listen_sockopts = dlm_sctp_sockopts, 1914 .listen_bind = dlm_sctp_bind_listen, 1915 }; 1916 1917 int dlm_lowcomms_start(void) 1918 { 1919 int error; 1920 1921 init_local(); 1922 if (!dlm_local_count) { 1923 error = -ENOTCONN; 1924 log_print("no local IP address has been set"); 1925 goto fail; 1926 } 1927 1928 error = work_start(); 1929 if (error) 1930 goto fail; 1931 1932 /* Start listening */ 1933 switch (dlm_config.ci_protocol) { 1934 case DLM_PROTO_TCP: 1935 dlm_proto_ops = &dlm_tcp_ops; 1936 break; 1937 case DLM_PROTO_SCTP: 1938 dlm_proto_ops = &dlm_sctp_ops; 1939 break; 1940 default: 1941 log_print("Invalid protocol identifier %d set", 1942 dlm_config.ci_protocol); 1943 error = -EINVAL; 1944 goto fail_proto_ops; 1945 } 1946 1947 error = dlm_listen_for_all(); 1948 if (error) 1949 goto fail_listen; 1950 1951 return 0; 1952 1953 fail_listen: 1954 dlm_proto_ops = NULL; 1955 fail_proto_ops: 1956 work_stop(); 1957 fail: 1958 return error; 1959 } 1960 1961 void 
dlm_lowcomms_init(void) 1962 { 1963 int i; 1964 1965 for (i = 0; i < CONN_HASH_SIZE; i++) 1966 INIT_HLIST_HEAD(&connection_hash[i]); 1967 1968 INIT_WORK(&listen_con.rwork, process_listen_recv_socket); 1969 } 1970 1971 void dlm_lowcomms_exit(void) 1972 { 1973 struct connection *con; 1974 int i, idx; 1975 1976 idx = srcu_read_lock(&connections_srcu); 1977 for (i = 0; i < CONN_HASH_SIZE; i++) { 1978 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 1979 spin_lock(&connections_lock); 1980 hlist_del_rcu(&con->list); 1981 spin_unlock(&connections_lock); 1982 1983 if (con->othercon) 1984 call_srcu(&connections_srcu, &con->othercon->rcu, 1985 connection_release); 1986 call_srcu(&connections_srcu, &con->rcu, connection_release); 1987 } 1988 } 1989 srcu_read_unlock(&connections_srcu, idx); 1990 } 1991