// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two workqueues that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send workqueue which does all the setting up of connections
 * to remote nodes and the sending of data. Receive work is not
 * allowed to send its own data because it may cause it to wait in
 * times of high load. Also, this way, the sending work can collect
 * together messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */
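/*
 * Rough data flow through this file:
 *
 *   send: dlm_lowcomms_get_buffer() reserves space on a connection's
 *   writequeue, dlm_lowcomms_commit_buffer() publishes it and kicks
 *   the send workqueue, and send_to_sock() pushes the queued pages
 *   out of the socket.
 *
 *   receive: the socket's sk_data_ready callback queues work on the
 *   receive workqueue, process_recv_sockets() calls the connection's
 *   rx_action (receive_from_sock() or accept_from_sock()), and the
 *   received data is handed to dlm_process_incoming_buffer() in
 *   midcomms.
 */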
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)

struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
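/*
 * The cbuf above describes a power-of-two circular buffer: 'base' is
 * the index of the first unconsumed byte, 'len' the number of
 * unconsumed bytes, and 'mask' is size-1 so cbuf_data() can wrap with
 * a single AND.  For example, with a 4096-byte buffer, base = 4000
 * and len = 200, cbuf_data() returns (4000 + 200) & 4095 = 104: the
 * unread data wraps past the end of the page and the free space is
 * the single region from offset 104 up to 4000.  When the data has
 * not wrapped, the free space itself is split in two, which is why
 * receive_from_sock() may need two iovecs.
 */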
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);


/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}

static struct connection *__find_con(int nodeid)
{
	int r;
	struct connection *con;

	r = nodeid_hash(nodeid);

	hlist_for_each_entry(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}
	return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[r]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	init_waitqueue_head(&con->shutdown_wait);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct hlist_node *n;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
			conn_func(con);
	}
}

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con;

	mutex_lock(&connections_lock);
	con = __nodeid2con(nodeid, allocation);
	mutex_unlock(&connections_lock);

	return con;
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}
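/*
 * Record an address for a node.  Called from the config code as each
 * node address is set up: the first call creates the dlm_node_addr
 * entry, later calls append further addresses (up to
 * DLM_MAX_ADDR_COUNT), which SCTP can use for multihoming.
 */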
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	read_unlock_bh(&sk->sk_callback_lock);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (!con)
		goto out;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}
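/*
 * The sk_* callbacks above are typically invoked from softirq context
 * (hence the _bh locking) and must not block; all they do is map the
 * socket back to its connection via sk_user_data and queue work on
 * the appropriate workqueue.
 */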
static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con;
	struct sockaddr_storage saddr;
	void (*orig_report)(struct sock *) = NULL;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con == NULL)
		goto out;

	orig_report = listen_sock.sk_error_report;
	if (con->sock == NULL ||
	    kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, dlm_config.ci_tcp_port,
				   sk->sk_err, sk->sk_err_soft);
	} else if (saddr.ss_family == AF_INET) {
		struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin4->sin_addr.s_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin6->sin6_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	}
out:
	read_unlock_bh(&sk->sk_callback_lock);
	if (orig_report)
		orig_report(sk);
}
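/*
 * The stock callbacks of the listening socket are saved once, below,
 * so that lowcomms_error_report() can chain to the original handler
 * and so that restore_callbacks() can put them back when a socket is
 * closed; otherwise our hooks could be invoked on a socket whose
 * connection has already been torn down.
 */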
/* Note: sk_callback_lock must be locked before calling this function. */
static void save_listen_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	listen_sock.sk_data_ready = sk->sk_data_ready;
	listen_sock.sk_state_change = sk->sk_state_change;
	listen_sock.sk_write_space = sk->sk_write_space;
	listen_sock.sk_error_report = sk->sk_error_report;
}

static void restore_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	con->sock = sock;

	sk->sk_user_data = con;
	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
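/*
 * Example use of make_sockaddr() above: in an IPv4 cluster,
 * make_sockaddr(&addr, dlm_config.ci_tcp_port, &len) sets ss_family
 * to AF_INET, sin_port to the port in network byte order (the
 * ci_tcp_port default is normally 21064), returns
 * sizeof(struct sockaddr_in) in len, and zeroes the remainder of the
 * sockaddr_storage.
 */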
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	if (con->sock) {
		restore_callbacks(con->sock);
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, true, true);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}

static void shutdown_connection(struct connection *con)
{
	int ret;

	if (cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	/* nothing to shutdown */
	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return;
	}

	set_bit(CF_SHUTDOWN, &con->flags);
	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait,
					 !test_bit(CF_SHUTDOWN, &con->flags),
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}

static void dlm_tcp_shutdown(struct connection *con)
{
	if (con->othercon)
		shutdown_connection(con->othercon);
	shutdown_connection(con);
}
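/*
 * Graceful shutdown handshake: shutdown_connection() half-closes the
 * socket with SHUT_WR and waits on shutdown_wait; when the peer
 * closes its side, receive_from_sock() below reads EOF, clears
 * CF_SHUTDOWN and does the wake_up().  If the peer never answers,
 * the DLM_SHUTDOWN_WAIT_TIMEOUT expiry forces a hard close.
 */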
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}
	if (con->nodeid == 0) {
		ret = -EINVAL;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_SIZE);
	}

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;
	iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len);

	r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;
	else if (ret == len)
		call_again_soon = 1;

	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_SIZE);
	if (ret < 0) {
		log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
			  ret, page_address(con->rx_page), con->cb.base,
			  con->cb.len, r);
		cbuf_eat(&con->cb, r);
	} else {
		cbuf_eat(&con->cb, ret);
	}

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		/* Reconnect when there is something to send */
		close_connection(con, false, true, false);
		if (ret == 0) {
			log_print("connection %p got EOF from %d",
				  con, con->nodeid);
			/* handling for tcp shutdown */
			clear_bit(CF_SHUTDOWN, &con->flags);
			wake_up(&con->shutdown_wait);
			/* signal to break out of the receive worker loop */
			ret = -1;
		}
	}
	return ret;
}
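/*
 * rx_action contract: process_recv_sockets() keeps calling the
 * rx_action (this function, or accept_from_sock() on the listening
 * connection) until it returns non-zero.  A drained socket shows up
 * as -EAGAIN and simply ends the loop; out_resched additionally
 * requeues the work so the remainder is picked up promptly.
 */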
/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock = NULL;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	mutex_lock_nested(&con->sock_mutex, 0);

	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return -ENOTCONN;
	}

	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;
		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_LIST_HEAD(&othercon->writequeue);
			spin_lock_init(&othercon->writequeue_lock);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			init_waitqueue_head(&othercon->shutdown_wait);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false, true, false);
		}

		mutex_lock_nested(&othercon->sock_mutex, 2);
		newcon->othercon = othercon;
		add_sock(newsock, othercon);
		addcon = othercon;
		mutex_unlock(&othercon->sock_mutex);
	}
	else {
		newcon->rx_action = receive_from_sock;
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	if (newsock)
		sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}
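/*
 * Note on othercon: when two nodes connect to each other
 * simultaneously, the socket accepted above is parked on
 * newcon->othercon and is only ever used for receiving
 * (CF_IS_OTHERCON set, rx_action installed, no connect_action);
 * outgoing data always travels over the primary connection.
 */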
static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;

	if (e->len == 0 && e->users == 0) {
		list_del(&e->list);
		free_entry(e);
	}
}

/*
 * sctp_bind_addrs - bind an SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(con->sock, addr, addr_len);
		else
			result = sock_bind_add(con->sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Initiate an SCTP association.
   We create a dedicated SOCK_STREAM SCTP socket for the association,
   bind it to all of our local addresses and connect it to the remote
   node's current address.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int result;
	int addr_len;
	struct socket *sock;
	unsigned int mark;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	/* set skb mark */
	result = dlm_comm_mark(con->nodeid, &mark);
	if (result < 0)
		goto bind_err;

	sock_set_mark(sock->sk, mark);

	con->rx_action = receive_from_sock;
	con->connect_action = sctp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);

	/*
	 * Make sock->ops->connect() return within the given timeout,
	 * since the O_NONBLOCK argument to connect() does not work for
	 * SCTP, and restore the default (blocking) value of the
	 * attribute afterwards.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				    0);
	sock_set_sndtimeo(sock->sk, 0);

	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}
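/*
 * Retry policy shared by both connect paths: any error not in the
 * fatal list above sleeps a second and requeues the connect work,
 * while con->retries (checked against MAX_CONNECT_RETRIES on entry)
 * bounds how often a connection is re-attempted.
 */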
/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	unsigned int mark;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	/* set skb mark */
	result = dlm_comm_mark(con->nodeid, &mark);
	if (result < 0)
		goto out_err;

	sock_set_mark(sock->sk, mark);

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	con->shutdown_action = dlm_tcp_shutdown;
	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
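/*
 * The listening side: connection 0 owns the single listening socket.
 * Its rx_action is accept_from_sock(), and __nodeid2con() copies its
 * connect_action (and rx_action, if unset) onto newly created
 * per-node connections.
 */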
static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	sock_set_reuseaddr(sock->sk);

	write_lock_bh(&sock->sk->sk_callback_lock);
	sock->sk->sk_user_data = con;
	save_listen_callbacks(sock);
	con->rx_action = accept_from_sock;
	con->connect_action = tcp_connect_to_sock;
	write_unlock_bh(&sock->sk->sk_callback_lock);

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	sock_set_keepalive(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		dlm_local_addr[dlm_local_count++] = addr;
	}
}
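/*
 * init_local() above copies the node's own addresses, as provided by
 * dlm_our_addr(), into dlm_local_addr[].  TCP only ever uses the
 * first entry (tcp_listen_for_all() below rejects multi-homed
 * setups); SCTP binds to every entry for multihoming.
 */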
/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	int result = -EINVAL;
	struct connection *con = nodeid2con(0, GFP_NOFS);

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
	sock_set_mark(sock->sk, dlm_config.ci_mark);
	sctp_sock_set_nodelay(sock->sk);

	write_lock_bh(&sock->sk->sk_callback_lock);
	/* Init con struct */
	sock->sk->sk_user_data = con;
	save_listen_callbacks(sock);
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = accept_from_sock;
	con->connect_action = sctp_connect_to_sock;

	write_unlock_bh(&sock->sk->sk_callback_lock);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
		goto create_delsock;

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else {
		result = -EADDRINUSE;
	}

	return result;
}



static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	queue_work(send_workqueue, &con->swork);
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
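/*
 * Writequeue lifecycle: dlm_lowcomms_get_buffer() reserves 'len'
 * bytes on the last page of the queue (or a new page) and bumps
 * e->users; the caller fills the returned pointer and commits with
 * dlm_lowcomms_commit_buffer(), which drops e->users and, when it
 * hits zero, exposes the data via e->len and kicks the send work.
 * send_to_sock() below then transmits, and
 * writequeue_entry_complete() frees a fully sent, unreferenced entry.
 */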
/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	/* Requeue the send work. When the work daemon runs again, it will try
	   a new connection, then call this function again. */
	queue_work(send_workqueue, &con->swork);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	clear_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock == NULL) /* not mutex protected so check it inside too */
		con->connect_action(con);
	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}


/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	foreach_conn(clean_one_writequeue);
}
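/*
 * The workqueues created below use WQ_MEM_RECLAIM, which gives each
 * queue a rescuer thread so the DLM can make forward progress when
 * used under memory pressure (e.g. by a filesystem writing back),
 * and a max_active of 1 to limit concurrency.
 */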
static void work_stop(void)
{
	if (recv_workqueue)
		destroy_workqueue(recv_workqueue);
	if (send_workqueue)
		destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}

static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	set_bit(CF_CLOSE, &con->flags);
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		write_lock_bh(&con->sock->sk->sk_callback_lock);
		con->sock->sk->sk_user_data = NULL;
		write_unlock_bh(&con->sock->sk->sk_callback_lock);
	}
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}

static void stop_conn(struct connection *con)
{
	_stop_conn(con, true);
}

static void shutdown_conn(struct connection *con)
{
	if (con->shutdown_action)
		con->shutdown_action(con);
}

static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	if (con->othercon)
		kmem_cache_free(con_cache, con->othercon);
	hlist_del(&con->list);
	kmem_cache_free(con_cache, con);
}

static void work_flush(void)
{
	int ok;
	int i;
	struct hlist_node *n;
	struct connection *con;

	if (recv_workqueue)
		flush_workqueue(recv_workqueue);
	if (send_workqueue)
		flush_workqueue(send_workqueue);
	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_safe(con, n,
						  &connection_hash[i], list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
	} while (!ok);
}
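/*
 * Teardown order in dlm_lowcomms_stop() below: first refuse new
 * connections (dlm_allow_conn = 0), then run each connection's
 * shutdown_action for a graceful close, then work_flush() loops
 * until the pending bits set by stop_conn() survive a full flush
 * (meaning no work ran and cleared them), and only then are the
 * writequeues purged and the connections freed.
 */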
void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	 * socket activity.
	 */
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	mutex_unlock(&connections_lock);
	foreach_conn(shutdown_conn);
	work_flush();
	clean_writequeues();
	foreach_conn(free_conn);
	work_stop();

	kmem_cache_destroy(con_cache);
}

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	struct connection *con;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = -ENOMEM;
	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL);
	if (!con_cache)
		goto fail;

	error = work_start();
	if (error)
		goto fail_destroy;

	dlm_allow_conn = 1;

	/* Start listening */
	if (dlm_config.ci_protocol == 0)
		error = tcp_listen_for_all();
	else
		error = sctp_listen_for_all();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	con = nodeid2con(0, 0);
	if (con) {
		close_connection(con, false, true, true);
		kmem_cache_free(con_cache, con);
	}
fail_destroy:
	kmem_cache_destroy(con_cache);
fail:
	return error;
}

void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}