1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 /* 15 * lowcomms.c 16 * 17 * This is the "low-level" comms layer. 18 * 19 * It is responsible for sending/receiving messages 20 * from other nodes in the cluster. 21 * 22 * Cluster nodes are referred to by their nodeids. nodeids are 23 * simply 32 bit numbers to the locking module - if they need to 24 * be expanded for the cluster infrastructure then that is its 25 * responsibility. It is this layer's 26 * responsibility to resolve these into IP address or 27 * whatever it needs for inter-node communication. 28 * 29 * The comms level is two kernel threads that deal mainly with 30 * the receiving of messages from other nodes and passing them 31 * up to the mid-level comms layer (which understands the 32 * message format) for execution by the locking core, and 33 * a send thread which does all the setting up of connections 34 * to remote nodes and the sending of data. Threads are not allowed 35 * to send their own data because it may cause them to wait in times 36 * of high load. Also, this way, the sending thread can collect together 37 * messages bound for one node and send them in one block. 38 * 39 * lowcomms will choose to use either TCP or SCTP as its transport layer 40 * depending on the configuration variable 'protocol'. This should be set 41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 42 * cluster-wide mechanism as it must be the same on all nodes of the cluster 43 * for the DLM to function. 44 * 45 */ 46 47 #include <asm/ioctls.h> 48 #include <net/sock.h> 49 #include <net/tcp.h> 50 #include <linux/pagemap.h> 51 #include <linux/file.h> 52 #include <linux/mutex.h> 53 #include <linux/sctp.h> 54 #include <linux/slab.h> 55 #include <net/sctp/sctp.h> 56 #include <net/ipv6.h> 57 58 #include "dlm_internal.h" 59 #include "lowcomms.h" 60 #include "midcomms.h" 61 #include "config.h" 62 63 #define NEEDED_RMEM (4*1024*1024) 64 #define CONN_HASH_SIZE 32 65 66 /* Number of messages to send before rescheduling */ 67 #define MAX_SEND_MSG_COUNT 25 68 69 struct cbuf { 70 unsigned int base; 71 unsigned int len; 72 unsigned int mask; 73 }; 74 75 static void cbuf_add(struct cbuf *cb, int n) 76 { 77 cb->len += n; 78 } 79 80 static int cbuf_data(struct cbuf *cb) 81 { 82 return ((cb->base + cb->len) & cb->mask); 83 } 84 85 static void cbuf_init(struct cbuf *cb, int size) 86 { 87 cb->base = cb->len = 0; 88 cb->mask = size-1; 89 } 90 91 static void cbuf_eat(struct cbuf *cb, int n) 92 { 93 cb->len -= n; 94 cb->base += n; 95 cb->base &= cb->mask; 96 } 97 98 static bool cbuf_empty(struct cbuf *cb) 99 { 100 return cb->len == 0; 101 } 102 103 struct connection { 104 struct socket *sock; /* NULL if not connected */ 105 uint32_t nodeid; /* So we know who we are in the list */ 106 struct mutex sock_mutex; 107 unsigned long flags; 108 #define CF_READ_PENDING 1 109 #define CF_WRITE_PENDING 2 110 #define CF_CONNECT_PENDING 3 111 #define CF_INIT_PENDING 4 112 #define CF_IS_OTHERCON 5 113 #define CF_CLOSE 6 114 #define CF_APP_LIMITED 7 115 struct list_head writequeue; /* List of outgoing writequeue_entries */ 116 spinlock_t writequeue_lock; 117 int (*rx_action) (struct connection *); /* What to do when active */ 118 void (*connect_action) (struct connection *); /* What to do to connect */ 119 struct page *rx_page; 120 struct cbuf cb; 121 int retries; 122 #define MAX_CONNECT_RETRIES 3 123 int sctp_assoc; 124 struct hlist_node list; 125 struct connection *othercon; 126 struct work_struct rwork; /* Receive workqueue */ 127 struct work_struct swork; /* Send workqueue */ 128 bool try_new_addr; 129 }; 130 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 131 132 /* An entry waiting to be sent */ 133 struct writequeue_entry { 134 struct list_head list; 135 struct page *page; 136 int offset; 137 int len; 138 int end; 139 int users; 140 struct connection *con; 141 }; 142 143 struct dlm_node_addr { 144 struct list_head list; 145 int nodeid; 146 int addr_count; 147 int curr_addr_index; 148 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; 149 }; 150 151 static LIST_HEAD(dlm_node_addrs); 152 static DEFINE_SPINLOCK(dlm_node_addrs_spin); 153 154 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 155 static int dlm_local_count; 156 static int dlm_allow_conn; 157 158 /* Work queues */ 159 static struct workqueue_struct *recv_workqueue; 160 static struct workqueue_struct *send_workqueue; 161 162 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 163 static DEFINE_MUTEX(connections_lock); 164 static struct kmem_cache *con_cache; 165 166 static void process_recv_sockets(struct work_struct *work); 167 static void process_send_sockets(struct work_struct *work); 168 169 170 /* This is deliberately very simple because most clusters have simple 171 sequential nodeids, so we should be able to go straight to a connection 172 struct in the array */ 173 static inline int nodeid_hash(int nodeid) 174 { 175 return nodeid & (CONN_HASH_SIZE-1); 176 } 177 178 static struct connection *__find_con(int nodeid) 179 { 180 int r; 181 struct connection *con; 182 183 r = nodeid_hash(nodeid); 184 185 hlist_for_each_entry(con, &connection_hash[r], list) { 186 if (con->nodeid == nodeid) 187 return con; 188 } 189 return NULL; 190 } 191 192 /* 193 * If 'allocation' is zero then we don't attempt to create a new 194 * connection structure for this node. 195 */ 196 static struct connection *__nodeid2con(int nodeid, gfp_t alloc) 197 { 198 struct connection *con = NULL; 199 int r; 200 201 con = __find_con(nodeid); 202 if (con || !alloc) 203 return con; 204 205 con = kmem_cache_zalloc(con_cache, alloc); 206 if (!con) 207 return NULL; 208 209 r = nodeid_hash(nodeid); 210 hlist_add_head(&con->list, &connection_hash[r]); 211 212 con->nodeid = nodeid; 213 mutex_init(&con->sock_mutex); 214 INIT_LIST_HEAD(&con->writequeue); 215 spin_lock_init(&con->writequeue_lock); 216 INIT_WORK(&con->swork, process_send_sockets); 217 INIT_WORK(&con->rwork, process_recv_sockets); 218 219 /* Setup action pointers for child sockets */ 220 if (con->nodeid) { 221 struct connection *zerocon = __find_con(0); 222 223 con->connect_action = zerocon->connect_action; 224 if (!con->rx_action) 225 con->rx_action = zerocon->rx_action; 226 } 227 228 return con; 229 } 230 231 /* Loop round all connections */ 232 static void foreach_conn(void (*conn_func)(struct connection *c)) 233 { 234 int i; 235 struct hlist_node *n; 236 struct connection *con; 237 238 for (i = 0; i < CONN_HASH_SIZE; i++) { 239 hlist_for_each_entry_safe(con, n, &connection_hash[i], list) 240 conn_func(con); 241 } 242 } 243 244 static struct connection *nodeid2con(int nodeid, gfp_t allocation) 245 { 246 struct connection *con; 247 248 mutex_lock(&connections_lock); 249 con = __nodeid2con(nodeid, allocation); 250 mutex_unlock(&connections_lock); 251 252 return con; 253 } 254 255 /* This is a bit drastic, but only called when things go wrong */ 256 static struct connection *assoc2con(int assoc_id) 257 { 258 int i; 259 struct connection *con; 260 261 mutex_lock(&connections_lock); 262 263 for (i = 0 ; i < CONN_HASH_SIZE; i++) { 264 hlist_for_each_entry(con, &connection_hash[i], list) { 265 if (con->sctp_assoc == assoc_id) { 266 mutex_unlock(&connections_lock); 267 return con; 268 } 269 } 270 } 271 mutex_unlock(&connections_lock); 272 return NULL; 273 } 274 275 static struct dlm_node_addr *find_node_addr(int nodeid) 276 { 277 struct dlm_node_addr *na; 278 279 list_for_each_entry(na, &dlm_node_addrs, list) { 280 if (na->nodeid == nodeid) 281 return na; 282 } 283 return NULL; 284 } 285 286 static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) 287 { 288 switch (x->ss_family) { 289 case AF_INET: { 290 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 291 struct sockaddr_in *siny = (struct sockaddr_in *)y; 292 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 293 return 0; 294 if (sinx->sin_port != siny->sin_port) 295 return 0; 296 break; 297 } 298 case AF_INET6: { 299 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 300 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 301 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 302 return 0; 303 if (sinx->sin6_port != siny->sin6_port) 304 return 0; 305 break; 306 } 307 default: 308 return 0; 309 } 310 return 1; 311 } 312 313 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 314 struct sockaddr *sa_out, bool try_new_addr) 315 { 316 struct sockaddr_storage sas; 317 struct dlm_node_addr *na; 318 319 if (!dlm_local_count) 320 return -1; 321 322 spin_lock(&dlm_node_addrs_spin); 323 na = find_node_addr(nodeid); 324 if (na && na->addr_count) { 325 if (try_new_addr) { 326 na->curr_addr_index++; 327 if (na->curr_addr_index == na->addr_count) 328 na->curr_addr_index = 0; 329 } 330 331 memcpy(&sas, na->addr[na->curr_addr_index ], 332 sizeof(struct sockaddr_storage)); 333 } 334 spin_unlock(&dlm_node_addrs_spin); 335 336 if (!na) 337 return -EEXIST; 338 339 if (!na->addr_count) 340 return -ENOENT; 341 342 if (sas_out) 343 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 344 345 if (!sa_out) 346 return 0; 347 348 if (dlm_local_addr[0]->ss_family == AF_INET) { 349 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 350 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 351 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 352 } else { 353 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 354 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 355 ret6->sin6_addr = in6->sin6_addr; 356 } 357 358 return 0; 359 } 360 361 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) 362 { 363 struct dlm_node_addr *na; 364 int rv = -EEXIST; 365 int addr_i; 366 367 spin_lock(&dlm_node_addrs_spin); 368 list_for_each_entry(na, &dlm_node_addrs, list) { 369 if (!na->addr_count) 370 continue; 371 372 for (addr_i = 0; addr_i < na->addr_count; addr_i++) { 373 if (addr_compare(na->addr[addr_i], addr)) { 374 *nodeid = na->nodeid; 375 rv = 0; 376 goto unlock; 377 } 378 } 379 } 380 unlock: 381 spin_unlock(&dlm_node_addrs_spin); 382 return rv; 383 } 384 385 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 386 { 387 struct sockaddr_storage *new_addr; 388 struct dlm_node_addr *new_node, *na; 389 390 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); 391 if (!new_node) 392 return -ENOMEM; 393 394 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); 395 if (!new_addr) { 396 kfree(new_node); 397 return -ENOMEM; 398 } 399 400 memcpy(new_addr, addr, len); 401 402 spin_lock(&dlm_node_addrs_spin); 403 na = find_node_addr(nodeid); 404 if (!na) { 405 new_node->nodeid = nodeid; 406 new_node->addr[0] = new_addr; 407 new_node->addr_count = 1; 408 list_add(&new_node->list, &dlm_node_addrs); 409 spin_unlock(&dlm_node_addrs_spin); 410 return 0; 411 } 412 413 if (na->addr_count >= DLM_MAX_ADDR_COUNT) { 414 spin_unlock(&dlm_node_addrs_spin); 415 kfree(new_addr); 416 kfree(new_node); 417 return -ENOSPC; 418 } 419 420 na->addr[na->addr_count++] = new_addr; 421 spin_unlock(&dlm_node_addrs_spin); 422 kfree(new_node); 423 return 0; 424 } 425 426 /* Data available on socket or listen socket received a connect */ 427 static void lowcomms_data_ready(struct sock *sk) 428 { 429 struct connection *con = sock2con(sk); 430 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 431 queue_work(recv_workqueue, &con->rwork); 432 } 433 434 static void lowcomms_write_space(struct sock *sk) 435 { 436 struct connection *con = sock2con(sk); 437 438 if (!con) 439 return; 440 441 clear_bit(SOCK_NOSPACE, &con->sock->flags); 442 443 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 444 con->sock->sk->sk_write_pending--; 445 clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags); 446 } 447 448 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 449 queue_work(send_workqueue, &con->swork); 450 } 451 452 static inline void lowcomms_connect_sock(struct connection *con) 453 { 454 if (test_bit(CF_CLOSE, &con->flags)) 455 return; 456 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 457 queue_work(send_workqueue, &con->swork); 458 } 459 460 static void lowcomms_state_change(struct sock *sk) 461 { 462 if (sk->sk_state == TCP_ESTABLISHED) 463 lowcomms_write_space(sk); 464 } 465 466 int dlm_lowcomms_connect_node(int nodeid) 467 { 468 struct connection *con; 469 470 /* with sctp there's no connecting without sending */ 471 if (dlm_config.ci_protocol != 0) 472 return 0; 473 474 if (nodeid == dlm_our_nodeid()) 475 return 0; 476 477 con = nodeid2con(nodeid, GFP_NOFS); 478 if (!con) 479 return -ENOMEM; 480 lowcomms_connect_sock(con); 481 return 0; 482 } 483 484 /* Make a socket active */ 485 static void add_sock(struct socket *sock, struct connection *con) 486 { 487 con->sock = sock; 488 489 /* Install a data_ready callback */ 490 con->sock->sk->sk_data_ready = lowcomms_data_ready; 491 con->sock->sk->sk_write_space = lowcomms_write_space; 492 con->sock->sk->sk_state_change = lowcomms_state_change; 493 con->sock->sk->sk_user_data = con; 494 con->sock->sk->sk_allocation = GFP_NOFS; 495 } 496 497 /* Add the port number to an IPv6 or 4 sockaddr and return the address 498 length */ 499 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 500 int *addr_len) 501 { 502 saddr->ss_family = dlm_local_addr[0]->ss_family; 503 if (saddr->ss_family == AF_INET) { 504 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 505 in4_addr->sin_port = cpu_to_be16(port); 506 *addr_len = sizeof(struct sockaddr_in); 507 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 508 } else { 509 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 510 in6_addr->sin6_port = cpu_to_be16(port); 511 *addr_len = sizeof(struct sockaddr_in6); 512 } 513 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 514 } 515 516 /* Close a remote connection and tidy up */ 517 static void close_connection(struct connection *con, bool and_other) 518 { 519 mutex_lock(&con->sock_mutex); 520 521 if (con->sock) { 522 sock_release(con->sock); 523 con->sock = NULL; 524 } 525 if (con->othercon && and_other) { 526 /* Will only re-enter once. */ 527 close_connection(con->othercon, false); 528 } 529 if (con->rx_page) { 530 __free_page(con->rx_page); 531 con->rx_page = NULL; 532 } 533 534 con->retries = 0; 535 mutex_unlock(&con->sock_mutex); 536 } 537 538 /* We only send shutdown messages to nodes that are not part of the cluster */ 539 static void sctp_send_shutdown(sctp_assoc_t associd) 540 { 541 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 542 struct msghdr outmessage; 543 struct cmsghdr *cmsg; 544 struct sctp_sndrcvinfo *sinfo; 545 int ret; 546 struct connection *con; 547 548 con = nodeid2con(0,0); 549 BUG_ON(con == NULL); 550 551 outmessage.msg_name = NULL; 552 outmessage.msg_namelen = 0; 553 outmessage.msg_control = outcmsg; 554 outmessage.msg_controllen = sizeof(outcmsg); 555 outmessage.msg_flags = MSG_EOR; 556 557 cmsg = CMSG_FIRSTHDR(&outmessage); 558 cmsg->cmsg_level = IPPROTO_SCTP; 559 cmsg->cmsg_type = SCTP_SNDRCV; 560 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 561 outmessage.msg_controllen = cmsg->cmsg_len; 562 sinfo = CMSG_DATA(cmsg); 563 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 564 565 sinfo->sinfo_flags |= MSG_EOF; 566 sinfo->sinfo_assoc_id = associd; 567 568 ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0); 569 570 if (ret != 0) 571 log_print("send EOF to node failed: %d", ret); 572 } 573 574 static void sctp_init_failed_foreach(struct connection *con) 575 { 576 577 /* 578 * Don't try to recover base con and handle race where the 579 * other node's assoc init creates a assoc and we get that 580 * notification, then we get a notification that our attempt 581 * failed due. This happens when we are still trying the primary 582 * address, but the other node has already tried secondary addrs 583 * and found one that worked. 584 */ 585 if (!con->nodeid || con->sctp_assoc) 586 return; 587 588 log_print("Retrying SCTP association init for node %d\n", con->nodeid); 589 590 con->try_new_addr = true; 591 con->sctp_assoc = 0; 592 if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) { 593 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 594 queue_work(send_workqueue, &con->swork); 595 } 596 } 597 598 /* INIT failed but we don't know which node... 599 restart INIT on all pending nodes */ 600 static void sctp_init_failed(void) 601 { 602 mutex_lock(&connections_lock); 603 604 foreach_conn(sctp_init_failed_foreach); 605 606 mutex_unlock(&connections_lock); 607 } 608 609 static void retry_failed_sctp_send(struct connection *recv_con, 610 struct sctp_send_failed *sn_send_failed, 611 char *buf) 612 { 613 int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed); 614 struct dlm_mhandle *mh; 615 struct connection *con; 616 char *retry_buf; 617 int nodeid = sn_send_failed->ssf_info.sinfo_ppid; 618 619 log_print("Retry sending %d bytes to node id %d", len, nodeid); 620 621 if (!nodeid) { 622 log_print("Shouldn't resend data via listening connection."); 623 return; 624 } 625 626 con = nodeid2con(nodeid, 0); 627 if (!con) { 628 log_print("Could not look up con for nodeid %d\n", 629 nodeid); 630 return; 631 } 632 633 mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf); 634 if (!mh) { 635 log_print("Could not allocate buf for retry."); 636 return; 637 } 638 memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len); 639 dlm_lowcomms_commit_buffer(mh); 640 641 /* 642 * If we got a assoc changed event before the send failed event then 643 * we only need to retry the send. 644 */ 645 if (con->sctp_assoc) { 646 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 647 queue_work(send_workqueue, &con->swork); 648 } else 649 sctp_init_failed_foreach(con); 650 } 651 652 /* Something happened to an association */ 653 static void process_sctp_notification(struct connection *con, 654 struct msghdr *msg, char *buf) 655 { 656 union sctp_notification *sn = (union sctp_notification *)buf; 657 struct linger linger; 658 659 switch (sn->sn_header.sn_type) { 660 case SCTP_SEND_FAILED: 661 retry_failed_sctp_send(con, &sn->sn_send_failed, buf); 662 break; 663 case SCTP_ASSOC_CHANGE: 664 switch (sn->sn_assoc_change.sac_state) { 665 case SCTP_COMM_UP: 666 case SCTP_RESTART: 667 { 668 /* Check that the new node is in the lockspace */ 669 struct sctp_prim prim; 670 int nodeid; 671 int prim_len, ret; 672 int addr_len; 673 struct connection *new_con; 674 675 /* 676 * We get this before any data for an association. 677 * We verify that the node is in the cluster and 678 * then peel off a socket for it. 679 */ 680 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 681 log_print("COMM_UP for invalid assoc ID %d", 682 (int)sn->sn_assoc_change.sac_assoc_id); 683 sctp_init_failed(); 684 return; 685 } 686 memset(&prim, 0, sizeof(struct sctp_prim)); 687 prim_len = sizeof(struct sctp_prim); 688 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; 689 690 ret = kernel_getsockopt(con->sock, 691 IPPROTO_SCTP, 692 SCTP_PRIMARY_ADDR, 693 (char*)&prim, 694 &prim_len); 695 if (ret < 0) { 696 log_print("getsockopt/sctp_primary_addr on " 697 "new assoc %d failed : %d", 698 (int)sn->sn_assoc_change.sac_assoc_id, 699 ret); 700 701 /* Retry INIT later */ 702 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 703 if (new_con) 704 clear_bit(CF_CONNECT_PENDING, &con->flags); 705 return; 706 } 707 make_sockaddr(&prim.ssp_addr, 0, &addr_len); 708 if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) { 709 unsigned char *b=(unsigned char *)&prim.ssp_addr; 710 log_print("reject connect from unknown addr"); 711 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 712 b, sizeof(struct sockaddr_storage)); 713 sctp_send_shutdown(prim.ssp_assoc_id); 714 return; 715 } 716 717 new_con = nodeid2con(nodeid, GFP_NOFS); 718 if (!new_con) 719 return; 720 721 /* Peel off a new sock */ 722 lock_sock(con->sock->sk); 723 ret = sctp_do_peeloff(con->sock->sk, 724 sn->sn_assoc_change.sac_assoc_id, 725 &new_con->sock); 726 release_sock(con->sock->sk); 727 if (ret < 0) { 728 log_print("Can't peel off a socket for " 729 "connection %d to node %d: err=%d", 730 (int)sn->sn_assoc_change.sac_assoc_id, 731 nodeid, ret); 732 return; 733 } 734 add_sock(new_con->sock, new_con); 735 736 linger.l_onoff = 1; 737 linger.l_linger = 0; 738 ret = kernel_setsockopt(new_con->sock, SOL_SOCKET, SO_LINGER, 739 (char *)&linger, sizeof(linger)); 740 if (ret < 0) 741 log_print("set socket option SO_LINGER failed"); 742 743 log_print("connecting to %d sctp association %d", 744 nodeid, (int)sn->sn_assoc_change.sac_assoc_id); 745 746 new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id; 747 new_con->try_new_addr = false; 748 /* Send any pending writes */ 749 clear_bit(CF_CONNECT_PENDING, &new_con->flags); 750 clear_bit(CF_INIT_PENDING, &new_con->flags); 751 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { 752 queue_work(send_workqueue, &new_con->swork); 753 } 754 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) 755 queue_work(recv_workqueue, &new_con->rwork); 756 } 757 break; 758 759 case SCTP_COMM_LOST: 760 case SCTP_SHUTDOWN_COMP: 761 { 762 con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 763 if (con) { 764 con->sctp_assoc = 0; 765 } 766 } 767 break; 768 769 case SCTP_CANT_STR_ASSOC: 770 { 771 /* Will retry init when we get the send failed notification */ 772 log_print("Can't start SCTP association - retrying"); 773 } 774 break; 775 776 default: 777 log_print("unexpected SCTP assoc change id=%d state=%d", 778 (int)sn->sn_assoc_change.sac_assoc_id, 779 sn->sn_assoc_change.sac_state); 780 } 781 default: 782 ; /* fall through */ 783 } 784 } 785 786 /* Data received from remote end */ 787 static int receive_from_sock(struct connection *con) 788 { 789 int ret = 0; 790 struct msghdr msg = {}; 791 struct kvec iov[2]; 792 unsigned len; 793 int r; 794 int call_again_soon = 0; 795 int nvec; 796 char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 797 798 mutex_lock(&con->sock_mutex); 799 800 if (con->sock == NULL) { 801 ret = -EAGAIN; 802 goto out_close; 803 } 804 805 if (con->rx_page == NULL) { 806 /* 807 * This doesn't need to be atomic, but I think it should 808 * improve performance if it is. 809 */ 810 con->rx_page = alloc_page(GFP_ATOMIC); 811 if (con->rx_page == NULL) 812 goto out_resched; 813 cbuf_init(&con->cb, PAGE_CACHE_SIZE); 814 } 815 816 /* Only SCTP needs these really */ 817 memset(&incmsg, 0, sizeof(incmsg)); 818 msg.msg_control = incmsg; 819 msg.msg_controllen = sizeof(incmsg); 820 821 /* 822 * iov[0] is the bit of the circular buffer between the current end 823 * point (cb.base + cb.len) and the end of the buffer. 824 */ 825 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); 826 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); 827 iov[1].iov_len = 0; 828 nvec = 1; 829 830 /* 831 * iov[1] is the bit of the circular buffer between the start of the 832 * buffer and the start of the currently used section (cb.base) 833 */ 834 if (cbuf_data(&con->cb) >= con->cb.base) { 835 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb); 836 iov[1].iov_len = con->cb.base; 837 iov[1].iov_base = page_address(con->rx_page); 838 nvec = 2; 839 } 840 len = iov[0].iov_len + iov[1].iov_len; 841 842 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, 843 MSG_DONTWAIT | MSG_NOSIGNAL); 844 if (ret <= 0) 845 goto out_close; 846 847 /* Process SCTP notifications */ 848 if (msg.msg_flags & MSG_NOTIFICATION) { 849 msg.msg_control = incmsg; 850 msg.msg_controllen = sizeof(incmsg); 851 852 process_sctp_notification(con, &msg, 853 page_address(con->rx_page) + con->cb.base); 854 mutex_unlock(&con->sock_mutex); 855 return 0; 856 } 857 BUG_ON(con->nodeid == 0); 858 859 if (ret == len) 860 call_again_soon = 1; 861 cbuf_add(&con->cb, ret); 862 ret = dlm_process_incoming_buffer(con->nodeid, 863 page_address(con->rx_page), 864 con->cb.base, con->cb.len, 865 PAGE_CACHE_SIZE); 866 if (ret == -EBADMSG) { 867 log_print("lowcomms: addr=%p, base=%u, len=%u, " 868 "iov_len=%u, iov_base[0]=%p, read=%d", 869 page_address(con->rx_page), con->cb.base, con->cb.len, 870 len, iov[0].iov_base, r); 871 } 872 if (ret < 0) 873 goto out_close; 874 cbuf_eat(&con->cb, ret); 875 876 if (cbuf_empty(&con->cb) && !call_again_soon) { 877 __free_page(con->rx_page); 878 con->rx_page = NULL; 879 } 880 881 if (call_again_soon) 882 goto out_resched; 883 mutex_unlock(&con->sock_mutex); 884 return 0; 885 886 out_resched: 887 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 888 queue_work(recv_workqueue, &con->rwork); 889 mutex_unlock(&con->sock_mutex); 890 return -EAGAIN; 891 892 out_close: 893 mutex_unlock(&con->sock_mutex); 894 if (ret != -EAGAIN) { 895 close_connection(con, false); 896 /* Reconnect when there is something to send */ 897 } 898 /* Don't return success if we really got EOF */ 899 if (ret == 0) 900 ret = -EAGAIN; 901 902 return ret; 903 } 904 905 /* Listening socket is busy, accept a connection */ 906 static int tcp_accept_from_sock(struct connection *con) 907 { 908 int result; 909 struct sockaddr_storage peeraddr; 910 struct socket *newsock; 911 int len; 912 int nodeid; 913 struct connection *newcon; 914 struct connection *addcon; 915 916 mutex_lock(&connections_lock); 917 if (!dlm_allow_conn) { 918 mutex_unlock(&connections_lock); 919 return -1; 920 } 921 mutex_unlock(&connections_lock); 922 923 memset(&peeraddr, 0, sizeof(peeraddr)); 924 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 925 IPPROTO_TCP, &newsock); 926 if (result < 0) 927 return -ENOMEM; 928 929 mutex_lock_nested(&con->sock_mutex, 0); 930 931 result = -ENOTCONN; 932 if (con->sock == NULL) 933 goto accept_err; 934 935 newsock->type = con->sock->type; 936 newsock->ops = con->sock->ops; 937 938 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); 939 if (result < 0) 940 goto accept_err; 941 942 /* Get the connected socket's peer */ 943 memset(&peeraddr, 0, sizeof(peeraddr)); 944 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 945 &len, 2)) { 946 result = -ECONNABORTED; 947 goto accept_err; 948 } 949 950 /* Get the new node's NODEID */ 951 make_sockaddr(&peeraddr, 0, &len); 952 if (addr_to_nodeid(&peeraddr, &nodeid)) { 953 unsigned char *b=(unsigned char *)&peeraddr; 954 log_print("connect from non cluster node"); 955 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 956 b, sizeof(struct sockaddr_storage)); 957 sock_release(newsock); 958 mutex_unlock(&con->sock_mutex); 959 return -1; 960 } 961 962 log_print("got connection from %d", nodeid); 963 964 /* Check to see if we already have a connection to this node. This 965 * could happen if the two nodes initiate a connection at roughly 966 * the same time and the connections cross on the wire. 967 * In this case we store the incoming one in "othercon" 968 */ 969 newcon = nodeid2con(nodeid, GFP_NOFS); 970 if (!newcon) { 971 result = -ENOMEM; 972 goto accept_err; 973 } 974 mutex_lock_nested(&newcon->sock_mutex, 1); 975 if (newcon->sock) { 976 struct connection *othercon = newcon->othercon; 977 978 if (!othercon) { 979 othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); 980 if (!othercon) { 981 log_print("failed to allocate incoming socket"); 982 mutex_unlock(&newcon->sock_mutex); 983 result = -ENOMEM; 984 goto accept_err; 985 } 986 othercon->nodeid = nodeid; 987 othercon->rx_action = receive_from_sock; 988 mutex_init(&othercon->sock_mutex); 989 INIT_WORK(&othercon->swork, process_send_sockets); 990 INIT_WORK(&othercon->rwork, process_recv_sockets); 991 set_bit(CF_IS_OTHERCON, &othercon->flags); 992 } 993 if (!othercon->sock) { 994 newcon->othercon = othercon; 995 othercon->sock = newsock; 996 newsock->sk->sk_user_data = othercon; 997 add_sock(newsock, othercon); 998 addcon = othercon; 999 } 1000 else { 1001 printk("Extra connection from node %d attempted\n", nodeid); 1002 result = -EAGAIN; 1003 mutex_unlock(&newcon->sock_mutex); 1004 goto accept_err; 1005 } 1006 } 1007 else { 1008 newsock->sk->sk_user_data = newcon; 1009 newcon->rx_action = receive_from_sock; 1010 add_sock(newsock, newcon); 1011 addcon = newcon; 1012 } 1013 1014 mutex_unlock(&newcon->sock_mutex); 1015 1016 /* 1017 * Add it to the active queue in case we got data 1018 * between processing the accept adding the socket 1019 * to the read_sockets list 1020 */ 1021 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 1022 queue_work(recv_workqueue, &addcon->rwork); 1023 mutex_unlock(&con->sock_mutex); 1024 1025 return 0; 1026 1027 accept_err: 1028 mutex_unlock(&con->sock_mutex); 1029 sock_release(newsock); 1030 1031 if (result != -EAGAIN) 1032 log_print("error accepting connection from node: %d", result); 1033 return result; 1034 } 1035 1036 static void free_entry(struct writequeue_entry *e) 1037 { 1038 __free_page(e->page); 1039 kfree(e); 1040 } 1041 1042 /* 1043 * writequeue_entry_complete - try to delete and free write queue entry 1044 * @e: write queue entry to try to delete 1045 * @completed: bytes completed 1046 * 1047 * writequeue_lock must be held. 1048 */ 1049 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 1050 { 1051 e->offset += completed; 1052 e->len -= completed; 1053 1054 if (e->len == 0 && e->users == 0) { 1055 list_del(&e->list); 1056 free_entry(e); 1057 } 1058 } 1059 1060 /* Initiate an SCTP association. 1061 This is a special case of send_to_sock() in that we don't yet have a 1062 peeled-off socket for this association, so we use the listening socket 1063 and add the primary IP address of the remote node. 1064 */ 1065 static void sctp_init_assoc(struct connection *con) 1066 { 1067 struct sockaddr_storage rem_addr; 1068 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 1069 struct msghdr outmessage; 1070 struct cmsghdr *cmsg; 1071 struct sctp_sndrcvinfo *sinfo; 1072 struct connection *base_con; 1073 struct writequeue_entry *e; 1074 int len, offset; 1075 int ret; 1076 int addrlen; 1077 struct kvec iov[1]; 1078 1079 mutex_lock(&con->sock_mutex); 1080 if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) 1081 goto unlock; 1082 1083 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr, 1084 con->try_new_addr)) { 1085 log_print("no address for nodeid %d", con->nodeid); 1086 goto unlock; 1087 } 1088 base_con = nodeid2con(0, 0); 1089 BUG_ON(base_con == NULL); 1090 1091 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); 1092 1093 outmessage.msg_name = &rem_addr; 1094 outmessage.msg_namelen = addrlen; 1095 outmessage.msg_control = outcmsg; 1096 outmessage.msg_controllen = sizeof(outcmsg); 1097 outmessage.msg_flags = MSG_EOR; 1098 1099 spin_lock(&con->writequeue_lock); 1100 1101 if (list_empty(&con->writequeue)) { 1102 spin_unlock(&con->writequeue_lock); 1103 log_print("writequeue empty for nodeid %d", con->nodeid); 1104 goto unlock; 1105 } 1106 1107 e = list_first_entry(&con->writequeue, struct writequeue_entry, list); 1108 len = e->len; 1109 offset = e->offset; 1110 1111 /* Send the first block off the write queue */ 1112 iov[0].iov_base = page_address(e->page)+offset; 1113 iov[0].iov_len = len; 1114 spin_unlock(&con->writequeue_lock); 1115 1116 if (rem_addr.ss_family == AF_INET) { 1117 struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr; 1118 log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr); 1119 } else { 1120 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr; 1121 log_print("Trying to connect to %pI6", &sin6->sin6_addr); 1122 } 1123 1124 cmsg = CMSG_FIRSTHDR(&outmessage); 1125 cmsg->cmsg_level = IPPROTO_SCTP; 1126 cmsg->cmsg_type = SCTP_SNDRCV; 1127 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 1128 sinfo = CMSG_DATA(cmsg); 1129 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 1130 sinfo->sinfo_ppid = cpu_to_le32(con->nodeid); 1131 outmessage.msg_controllen = cmsg->cmsg_len; 1132 sinfo->sinfo_flags |= SCTP_ADDR_OVER; 1133 1134 ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); 1135 if (ret < 0) { 1136 log_print("Send first packet to node %d failed: %d", 1137 con->nodeid, ret); 1138 1139 /* Try again later */ 1140 clear_bit(CF_CONNECT_PENDING, &con->flags); 1141 clear_bit(CF_INIT_PENDING, &con->flags); 1142 } 1143 else { 1144 spin_lock(&con->writequeue_lock); 1145 writequeue_entry_complete(e, ret); 1146 spin_unlock(&con->writequeue_lock); 1147 } 1148 1149 unlock: 1150 mutex_unlock(&con->sock_mutex); 1151 } 1152 1153 /* Connect a new socket to its peer */ 1154 static void tcp_connect_to_sock(struct connection *con) 1155 { 1156 struct sockaddr_storage saddr, src_addr; 1157 int addr_len; 1158 struct socket *sock = NULL; 1159 int one = 1; 1160 int result; 1161 1162 if (con->nodeid == 0) { 1163 log_print("attempt to connect sock 0 foiled"); 1164 return; 1165 } 1166 1167 mutex_lock(&con->sock_mutex); 1168 if (con->retries++ > MAX_CONNECT_RETRIES) 1169 goto out; 1170 1171 /* Some odd races can cause double-connects, ignore them */ 1172 if (con->sock) 1173 goto out; 1174 1175 /* Create a socket to communicate with */ 1176 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 1177 IPPROTO_TCP, &sock); 1178 if (result < 0) 1179 goto out_err; 1180 1181 memset(&saddr, 0, sizeof(saddr)); 1182 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); 1183 if (result < 0) { 1184 log_print("no address for nodeid %d", con->nodeid); 1185 goto out_err; 1186 } 1187 1188 sock->sk->sk_user_data = con; 1189 con->rx_action = receive_from_sock; 1190 con->connect_action = tcp_connect_to_sock; 1191 add_sock(sock, con); 1192 1193 /* Bind to our cluster-known address connecting to avoid 1194 routing problems */ 1195 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); 1196 make_sockaddr(&src_addr, 0, &addr_len); 1197 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, 1198 addr_len); 1199 if (result < 0) { 1200 log_print("could not bind for connect: %d", result); 1201 /* This *may* not indicate a critical error */ 1202 } 1203 1204 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 1205 1206 log_print("connecting to %d", con->nodeid); 1207 1208 /* Turn off Nagle's algorithm */ 1209 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, 1210 sizeof(one)); 1211 1212 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 1213 O_NONBLOCK); 1214 if (result == -EINPROGRESS) 1215 result = 0; 1216 if (result == 0) 1217 goto out; 1218 1219 out_err: 1220 if (con->sock) { 1221 sock_release(con->sock); 1222 con->sock = NULL; 1223 } else if (sock) { 1224 sock_release(sock); 1225 } 1226 /* 1227 * Some errors are fatal and this list might need adjusting. For other 1228 * errors we try again until the max number of retries is reached. 1229 */ 1230 if (result != -EHOSTUNREACH && 1231 result != -ENETUNREACH && 1232 result != -ENETDOWN && 1233 result != -EINVAL && 1234 result != -EPROTONOSUPPORT) { 1235 log_print("connect %d try %d error %d", con->nodeid, 1236 con->retries, result); 1237 mutex_unlock(&con->sock_mutex); 1238 msleep(1000); 1239 lowcomms_connect_sock(con); 1240 return; 1241 } 1242 out: 1243 mutex_unlock(&con->sock_mutex); 1244 return; 1245 } 1246 1247 static struct socket *tcp_create_listen_sock(struct connection *con, 1248 struct sockaddr_storage *saddr) 1249 { 1250 struct socket *sock = NULL; 1251 int result = 0; 1252 int one = 1; 1253 int addr_len; 1254 1255 if (dlm_local_addr[0]->ss_family == AF_INET) 1256 addr_len = sizeof(struct sockaddr_in); 1257 else 1258 addr_len = sizeof(struct sockaddr_in6); 1259 1260 /* Create a socket to communicate with */ 1261 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 1262 IPPROTO_TCP, &sock); 1263 if (result < 0) { 1264 log_print("Can't create listening comms socket"); 1265 goto create_out; 1266 } 1267 1268 /* Turn off Nagle's algorithm */ 1269 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, 1270 sizeof(one)); 1271 1272 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1273 (char *)&one, sizeof(one)); 1274 1275 if (result < 0) { 1276 log_print("Failed to set SO_REUSEADDR on socket: %d", result); 1277 } 1278 con->rx_action = tcp_accept_from_sock; 1279 con->connect_action = tcp_connect_to_sock; 1280 1281 /* Bind to our port */ 1282 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1283 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1284 if (result < 0) { 1285 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1286 sock_release(sock); 1287 sock = NULL; 1288 con->sock = NULL; 1289 goto create_out; 1290 } 1291 result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, 1292 (char *)&one, sizeof(one)); 1293 if (result < 0) { 1294 log_print("Set keepalive failed: %d", result); 1295 } 1296 1297 result = sock->ops->listen(sock, 5); 1298 if (result < 0) { 1299 log_print("Can't listen on port %d", dlm_config.ci_tcp_port); 1300 sock_release(sock); 1301 sock = NULL; 1302 goto create_out; 1303 } 1304 1305 create_out: 1306 return sock; 1307 } 1308 1309 /* Get local addresses */ 1310 static void init_local(void) 1311 { 1312 struct sockaddr_storage sas, *addr; 1313 int i; 1314 1315 dlm_local_count = 0; 1316 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1317 if (dlm_our_addr(&sas, i)) 1318 break; 1319 1320 addr = kmalloc(sizeof(*addr), GFP_NOFS); 1321 if (!addr) 1322 break; 1323 memcpy(addr, &sas, sizeof(*addr)); 1324 dlm_local_addr[dlm_local_count++] = addr; 1325 } 1326 } 1327 1328 /* Bind to an IP address. SCTP allows multiple address so it can do 1329 multi-homing */ 1330 static int add_sctp_bind_addr(struct connection *sctp_con, 1331 struct sockaddr_storage *addr, 1332 int addr_len, int num) 1333 { 1334 int result = 0; 1335 1336 if (num == 1) 1337 result = kernel_bind(sctp_con->sock, 1338 (struct sockaddr *) addr, 1339 addr_len); 1340 else 1341 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, 1342 SCTP_SOCKOPT_BINDX_ADD, 1343 (char *)addr, addr_len); 1344 1345 if (result < 0) 1346 log_print("Can't bind to port %d addr number %d", 1347 dlm_config.ci_tcp_port, num); 1348 1349 return result; 1350 } 1351 1352 /* Initialise SCTP socket and bind to all interfaces */ 1353 static int sctp_listen_for_all(void) 1354 { 1355 struct socket *sock = NULL; 1356 struct sockaddr_storage localaddr; 1357 struct sctp_event_subscribe subscribe; 1358 int result = -EINVAL, num = 1, i, addr_len; 1359 struct connection *con = nodeid2con(0, GFP_NOFS); 1360 int bufsize = NEEDED_RMEM; 1361 int one = 1; 1362 1363 if (!con) 1364 return -ENOMEM; 1365 1366 log_print("Using SCTP for communications"); 1367 1368 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, 1369 IPPROTO_SCTP, &sock); 1370 if (result < 0) { 1371 log_print("Can't create comms socket, check SCTP is loaded"); 1372 goto out; 1373 } 1374 1375 /* Listen for events */ 1376 memset(&subscribe, 0, sizeof(subscribe)); 1377 subscribe.sctp_data_io_event = 1; 1378 subscribe.sctp_association_event = 1; 1379 subscribe.sctp_send_failure_event = 1; 1380 subscribe.sctp_shutdown_event = 1; 1381 subscribe.sctp_partial_delivery_event = 1; 1382 1383 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, 1384 (char *)&bufsize, sizeof(bufsize)); 1385 if (result) 1386 log_print("Error increasing buffer space on socket %d", result); 1387 1388 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, 1389 (char *)&subscribe, sizeof(subscribe)); 1390 if (result < 0) { 1391 log_print("Failed to set SCTP_EVENTS on socket: result=%d", 1392 result); 1393 goto create_delsock; 1394 } 1395 1396 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one, 1397 sizeof(one)); 1398 if (result < 0) 1399 log_print("Could not set SCTP NODELAY error %d\n", result); 1400 1401 /* Init con struct */ 1402 sock->sk->sk_user_data = con; 1403 con->sock = sock; 1404 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1405 con->rx_action = receive_from_sock; 1406 con->connect_action = sctp_init_assoc; 1407 1408 /* Bind to all interfaces. */ 1409 for (i = 0; i < dlm_local_count; i++) { 1410 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 1411 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); 1412 1413 result = add_sctp_bind_addr(con, &localaddr, addr_len, num); 1414 if (result) 1415 goto create_delsock; 1416 ++num; 1417 } 1418 1419 result = sock->ops->listen(sock, 5); 1420 if (result < 0) { 1421 log_print("Can't set socket listening"); 1422 goto create_delsock; 1423 } 1424 1425 return 0; 1426 1427 create_delsock: 1428 sock_release(sock); 1429 con->sock = NULL; 1430 out: 1431 return result; 1432 } 1433 1434 static int tcp_listen_for_all(void) 1435 { 1436 struct socket *sock = NULL; 1437 struct connection *con = nodeid2con(0, GFP_NOFS); 1438 int result = -EINVAL; 1439 1440 if (!con) 1441 return -ENOMEM; 1442 1443 /* We don't support multi-homed hosts */ 1444 if (dlm_local_addr[1] != NULL) { 1445 log_print("TCP protocol can't handle multi-homed hosts, " 1446 "try SCTP"); 1447 return -EINVAL; 1448 } 1449 1450 log_print("Using TCP for communications"); 1451 1452 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); 1453 if (sock) { 1454 add_sock(sock, con); 1455 result = 0; 1456 } 1457 else { 1458 result = -EADDRINUSE; 1459 } 1460 1461 return result; 1462 } 1463 1464 1465 1466 static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1467 gfp_t allocation) 1468 { 1469 struct writequeue_entry *entry; 1470 1471 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1472 if (!entry) 1473 return NULL; 1474 1475 entry->page = alloc_page(allocation); 1476 if (!entry->page) { 1477 kfree(entry); 1478 return NULL; 1479 } 1480 1481 entry->offset = 0; 1482 entry->len = 0; 1483 entry->end = 0; 1484 entry->users = 0; 1485 entry->con = con; 1486 1487 return entry; 1488 } 1489 1490 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 1491 { 1492 struct connection *con; 1493 struct writequeue_entry *e; 1494 int offset = 0; 1495 1496 con = nodeid2con(nodeid, allocation); 1497 if (!con) 1498 return NULL; 1499 1500 spin_lock(&con->writequeue_lock); 1501 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1502 if ((&e->list == &con->writequeue) || 1503 (PAGE_CACHE_SIZE - e->end < len)) { 1504 e = NULL; 1505 } else { 1506 offset = e->end; 1507 e->end += len; 1508 e->users++; 1509 } 1510 spin_unlock(&con->writequeue_lock); 1511 1512 if (e) { 1513 got_one: 1514 *ppc = page_address(e->page) + offset; 1515 return e; 1516 } 1517 1518 e = new_writequeue_entry(con, allocation); 1519 if (e) { 1520 spin_lock(&con->writequeue_lock); 1521 offset = e->end; 1522 e->end += len; 1523 e->users++; 1524 list_add_tail(&e->list, &con->writequeue); 1525 spin_unlock(&con->writequeue_lock); 1526 goto got_one; 1527 } 1528 return NULL; 1529 } 1530 1531 void dlm_lowcomms_commit_buffer(void *mh) 1532 { 1533 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1534 struct connection *con = e->con; 1535 int users; 1536 1537 spin_lock(&con->writequeue_lock); 1538 users = --e->users; 1539 if (users) 1540 goto out; 1541 e->len = e->end - e->offset; 1542 spin_unlock(&con->writequeue_lock); 1543 1544 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { 1545 queue_work(send_workqueue, &con->swork); 1546 } 1547 return; 1548 1549 out: 1550 spin_unlock(&con->writequeue_lock); 1551 return; 1552 } 1553 1554 /* Send a message */ 1555 static void send_to_sock(struct connection *con) 1556 { 1557 int ret = 0; 1558 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1559 struct writequeue_entry *e; 1560 int len, offset; 1561 int count = 0; 1562 1563 mutex_lock(&con->sock_mutex); 1564 if (con->sock == NULL) 1565 goto out_connect; 1566 1567 spin_lock(&con->writequeue_lock); 1568 for (;;) { 1569 e = list_entry(con->writequeue.next, struct writequeue_entry, 1570 list); 1571 if ((struct list_head *) e == &con->writequeue) 1572 break; 1573 1574 len = e->len; 1575 offset = e->offset; 1576 BUG_ON(len == 0 && e->users == 0); 1577 spin_unlock(&con->writequeue_lock); 1578 1579 ret = 0; 1580 if (len) { 1581 ret = kernel_sendpage(con->sock, e->page, offset, len, 1582 msg_flags); 1583 if (ret == -EAGAIN || ret == 0) { 1584 if (ret == -EAGAIN && 1585 test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) && 1586 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1587 /* Notify TCP that we're limited by the 1588 * application window size. 1589 */ 1590 set_bit(SOCK_NOSPACE, &con->sock->flags); 1591 con->sock->sk->sk_write_pending++; 1592 } 1593 cond_resched(); 1594 goto out; 1595 } else if (ret < 0) 1596 goto send_error; 1597 } 1598 1599 /* Don't starve people filling buffers */ 1600 if (++count >= MAX_SEND_MSG_COUNT) { 1601 cond_resched(); 1602 count = 0; 1603 } 1604 1605 spin_lock(&con->writequeue_lock); 1606 writequeue_entry_complete(e, ret); 1607 } 1608 spin_unlock(&con->writequeue_lock); 1609 out: 1610 mutex_unlock(&con->sock_mutex); 1611 return; 1612 1613 send_error: 1614 mutex_unlock(&con->sock_mutex); 1615 close_connection(con, false); 1616 lowcomms_connect_sock(con); 1617 return; 1618 1619 out_connect: 1620 mutex_unlock(&con->sock_mutex); 1621 if (!test_bit(CF_INIT_PENDING, &con->flags)) 1622 lowcomms_connect_sock(con); 1623 } 1624 1625 static void clean_one_writequeue(struct connection *con) 1626 { 1627 struct writequeue_entry *e, *safe; 1628 1629 spin_lock(&con->writequeue_lock); 1630 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1631 list_del(&e->list); 1632 free_entry(e); 1633 } 1634 spin_unlock(&con->writequeue_lock); 1635 } 1636 1637 /* Called from recovery when it knows that a node has 1638 left the cluster */ 1639 int dlm_lowcomms_close(int nodeid) 1640 { 1641 struct connection *con; 1642 struct dlm_node_addr *na; 1643 1644 log_print("closing connection to node %d", nodeid); 1645 con = nodeid2con(nodeid, 0); 1646 if (con) { 1647 clear_bit(CF_CONNECT_PENDING, &con->flags); 1648 clear_bit(CF_WRITE_PENDING, &con->flags); 1649 set_bit(CF_CLOSE, &con->flags); 1650 if (cancel_work_sync(&con->swork)) 1651 log_print("canceled swork for node %d", nodeid); 1652 if (cancel_work_sync(&con->rwork)) 1653 log_print("canceled rwork for node %d", nodeid); 1654 clean_one_writequeue(con); 1655 close_connection(con, true); 1656 } 1657 1658 spin_lock(&dlm_node_addrs_spin); 1659 na = find_node_addr(nodeid); 1660 if (na) { 1661 list_del(&na->list); 1662 while (na->addr_count--) 1663 kfree(na->addr[na->addr_count]); 1664 kfree(na); 1665 } 1666 spin_unlock(&dlm_node_addrs_spin); 1667 1668 return 0; 1669 } 1670 1671 /* Receive workqueue function */ 1672 static void process_recv_sockets(struct work_struct *work) 1673 { 1674 struct connection *con = container_of(work, struct connection, rwork); 1675 int err; 1676 1677 clear_bit(CF_READ_PENDING, &con->flags); 1678 do { 1679 err = con->rx_action(con); 1680 } while (!err); 1681 } 1682 1683 /* Send workqueue function */ 1684 static void process_send_sockets(struct work_struct *work) 1685 { 1686 struct connection *con = container_of(work, struct connection, swork); 1687 1688 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 1689 con->connect_action(con); 1690 set_bit(CF_WRITE_PENDING, &con->flags); 1691 } 1692 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) 1693 send_to_sock(con); 1694 } 1695 1696 1697 /* Discard all entries on the write queues */ 1698 static void clean_writequeues(void) 1699 { 1700 foreach_conn(clean_one_writequeue); 1701 } 1702 1703 static void work_stop(void) 1704 { 1705 destroy_workqueue(recv_workqueue); 1706 destroy_workqueue(send_workqueue); 1707 } 1708 1709 static int work_start(void) 1710 { 1711 recv_workqueue = alloc_workqueue("dlm_recv", 1712 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1713 if (!recv_workqueue) { 1714 log_print("can't start dlm_recv"); 1715 return -ENOMEM; 1716 } 1717 1718 send_workqueue = alloc_workqueue("dlm_send", 1719 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1720 if (!send_workqueue) { 1721 log_print("can't start dlm_send"); 1722 destroy_workqueue(recv_workqueue); 1723 return -ENOMEM; 1724 } 1725 1726 return 0; 1727 } 1728 1729 static void stop_conn(struct connection *con) 1730 { 1731 con->flags |= 0x0F; 1732 if (con->sock && con->sock->sk) 1733 con->sock->sk->sk_user_data = NULL; 1734 } 1735 1736 static void free_conn(struct connection *con) 1737 { 1738 close_connection(con, true); 1739 if (con->othercon) 1740 kmem_cache_free(con_cache, con->othercon); 1741 hlist_del(&con->list); 1742 kmem_cache_free(con_cache, con); 1743 } 1744 1745 void dlm_lowcomms_stop(void) 1746 { 1747 /* Set all the flags to prevent any 1748 socket activity. 1749 */ 1750 mutex_lock(&connections_lock); 1751 dlm_allow_conn = 0; 1752 foreach_conn(stop_conn); 1753 mutex_unlock(&connections_lock); 1754 1755 work_stop(); 1756 1757 mutex_lock(&connections_lock); 1758 clean_writequeues(); 1759 1760 foreach_conn(free_conn); 1761 1762 mutex_unlock(&connections_lock); 1763 kmem_cache_destroy(con_cache); 1764 } 1765 1766 int dlm_lowcomms_start(void) 1767 { 1768 int error = -EINVAL; 1769 struct connection *con; 1770 int i; 1771 1772 for (i = 0; i < CONN_HASH_SIZE; i++) 1773 INIT_HLIST_HEAD(&connection_hash[i]); 1774 1775 init_local(); 1776 if (!dlm_local_count) { 1777 error = -ENOTCONN; 1778 log_print("no local IP address has been set"); 1779 goto fail; 1780 } 1781 1782 error = -ENOMEM; 1783 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1784 __alignof__(struct connection), 0, 1785 NULL); 1786 if (!con_cache) 1787 goto fail; 1788 1789 error = work_start(); 1790 if (error) 1791 goto fail_destroy; 1792 1793 dlm_allow_conn = 1; 1794 1795 /* Start listening */ 1796 if (dlm_config.ci_protocol == 0) 1797 error = tcp_listen_for_all(); 1798 else 1799 error = sctp_listen_for_all(); 1800 if (error) 1801 goto fail_unlisten; 1802 1803 return 0; 1804 1805 fail_unlisten: 1806 dlm_allow_conn = 0; 1807 con = nodeid2con(0,0); 1808 if (con) { 1809 close_connection(con, false); 1810 kmem_cache_free(con_cache, con); 1811 } 1812 fail_destroy: 1813 kmem_cache_destroy(con_cache); 1814 fail: 1815 return error; 1816 } 1817 1818 void dlm_lowcomms_exit(void) 1819 { 1820 struct dlm_node_addr *na, *safe; 1821 1822 spin_lock(&dlm_node_addrs_spin); 1823 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { 1824 list_del(&na->list); 1825 while (na->addr_count--) 1826 kfree(na->addr[na->addr_count]); 1827 kfree(na); 1828 } 1829 spin_unlock(&dlm_node_addrs_spin); 1830 } 1831