1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 /* 15 * lowcomms.c 16 * 17 * This is the "low-level" comms layer. 18 * 19 * It is responsible for sending/receiving messages 20 * from other nodes in the cluster. 21 * 22 * Cluster nodes are referred to by their nodeids. nodeids are 23 * simply 32 bit numbers to the locking module - if they need to 24 * be expanded for the cluster infrastructure then that is its 25 * responsibility. It is this layer's 26 * responsibility to resolve these into IP address or 27 * whatever it needs for inter-node communication. 28 * 29 * The comms level is two kernel threads that deal mainly with 30 * the receiving of messages from other nodes and passing them 31 * up to the mid-level comms layer (which understands the 32 * message format) for execution by the locking core, and 33 * a send thread which does all the setting up of connections 34 * to remote nodes and the sending of data. Threads are not allowed 35 * to send their own data because it may cause them to wait in times 36 * of high load. Also, this way, the sending thread can collect together 37 * messages bound for one node and send them in one block. 38 * 39 * lowcomms will choose to use either TCP or SCTP as its transport layer 40 * depending on the configuration variable 'protocol'. This should be set 41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 42 * cluster-wide mechanism as it must be the same on all nodes of the cluster 43 * for the DLM to function. 44 * 45 */ 46 47 #include <asm/ioctls.h> 48 #include <net/sock.h> 49 #include <net/tcp.h> 50 #include <linux/pagemap.h> 51 #include <linux/file.h> 52 #include <linux/mutex.h> 53 #include <linux/sctp.h> 54 #include <net/sctp/user.h> 55 #include <net/ipv6.h> 56 57 #include "dlm_internal.h" 58 #include "lowcomms.h" 59 #include "midcomms.h" 60 #include "config.h" 61 62 #define NEEDED_RMEM (4*1024*1024) 63 #define CONN_HASH_SIZE 32 64 65 struct cbuf { 66 unsigned int base; 67 unsigned int len; 68 unsigned int mask; 69 }; 70 71 static void cbuf_add(struct cbuf *cb, int n) 72 { 73 cb->len += n; 74 } 75 76 static int cbuf_data(struct cbuf *cb) 77 { 78 return ((cb->base + cb->len) & cb->mask); 79 } 80 81 static void cbuf_init(struct cbuf *cb, int size) 82 { 83 cb->base = cb->len = 0; 84 cb->mask = size-1; 85 } 86 87 static void cbuf_eat(struct cbuf *cb, int n) 88 { 89 cb->len -= n; 90 cb->base += n; 91 cb->base &= cb->mask; 92 } 93 94 static bool cbuf_empty(struct cbuf *cb) 95 { 96 return cb->len == 0; 97 } 98 99 struct connection { 100 struct socket *sock; /* NULL if not connected */ 101 uint32_t nodeid; /* So we know who we are in the list */ 102 struct mutex sock_mutex; 103 unsigned long flags; 104 #define CF_READ_PENDING 1 105 #define CF_WRITE_PENDING 2 106 #define CF_CONNECT_PENDING 3 107 #define CF_INIT_PENDING 4 108 #define CF_IS_OTHERCON 5 109 struct list_head writequeue; /* List of outgoing writequeue_entries */ 110 spinlock_t writequeue_lock; 111 int (*rx_action) (struct connection *); /* What to do when active */ 112 void (*connect_action) (struct connection *); /* What to do to connect */ 113 struct page *rx_page; 114 struct cbuf cb; 115 int retries; 116 #define MAX_CONNECT_RETRIES 3 117 int sctp_assoc; 118 struct hlist_node list; 119 struct connection *othercon; 120 struct work_struct rwork; /* Receive workqueue */ 121 struct work_struct swork; /* Send workqueue */ 122 }; 123 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 124 125 /* An entry waiting to be sent */ 126 struct writequeue_entry { 127 struct list_head list; 128 struct page *page; 129 int offset; 130 int len; 131 int end; 132 int users; 133 struct connection *con; 134 }; 135 136 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 137 static int dlm_local_count; 138 139 /* Work queues */ 140 static struct workqueue_struct *recv_workqueue; 141 static struct workqueue_struct *send_workqueue; 142 143 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 144 static DEFINE_MUTEX(connections_lock); 145 static struct kmem_cache *con_cache; 146 147 static void process_recv_sockets(struct work_struct *work); 148 static void process_send_sockets(struct work_struct *work); 149 150 151 /* This is deliberately very simple because most clusters have simple 152 sequential nodeids, so we should be able to go straight to a connection 153 struct in the array */ 154 static inline int nodeid_hash(int nodeid) 155 { 156 return nodeid & (CONN_HASH_SIZE-1); 157 } 158 159 static struct connection *__find_con(int nodeid) 160 { 161 int r; 162 struct hlist_node *h; 163 struct connection *con; 164 165 r = nodeid_hash(nodeid); 166 167 hlist_for_each_entry(con, h, &connection_hash[r], list) { 168 if (con->nodeid == nodeid) 169 return con; 170 } 171 return NULL; 172 } 173 174 /* 175 * If 'allocation' is zero then we don't attempt to create a new 176 * connection structure for this node. 177 */ 178 static struct connection *__nodeid2con(int nodeid, gfp_t alloc) 179 { 180 struct connection *con = NULL; 181 int r; 182 183 con = __find_con(nodeid); 184 if (con || !alloc) 185 return con; 186 187 con = kmem_cache_zalloc(con_cache, alloc); 188 if (!con) 189 return NULL; 190 191 r = nodeid_hash(nodeid); 192 hlist_add_head(&con->list, &connection_hash[r]); 193 194 con->nodeid = nodeid; 195 mutex_init(&con->sock_mutex); 196 INIT_LIST_HEAD(&con->writequeue); 197 spin_lock_init(&con->writequeue_lock); 198 INIT_WORK(&con->swork, process_send_sockets); 199 INIT_WORK(&con->rwork, process_recv_sockets); 200 201 /* Setup action pointers for child sockets */ 202 if (con->nodeid) { 203 struct connection *zerocon = __find_con(0); 204 205 con->connect_action = zerocon->connect_action; 206 if (!con->rx_action) 207 con->rx_action = zerocon->rx_action; 208 } 209 210 return con; 211 } 212 213 /* Loop round all connections */ 214 static void foreach_conn(void (*conn_func)(struct connection *c)) 215 { 216 int i; 217 struct hlist_node *h, *n; 218 struct connection *con; 219 220 for (i = 0; i < CONN_HASH_SIZE; i++) { 221 hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){ 222 conn_func(con); 223 } 224 } 225 } 226 227 static struct connection *nodeid2con(int nodeid, gfp_t allocation) 228 { 229 struct connection *con; 230 231 mutex_lock(&connections_lock); 232 con = __nodeid2con(nodeid, allocation); 233 mutex_unlock(&connections_lock); 234 235 return con; 236 } 237 238 /* This is a bit drastic, but only called when things go wrong */ 239 static struct connection *assoc2con(int assoc_id) 240 { 241 int i; 242 struct hlist_node *h; 243 struct connection *con; 244 245 mutex_lock(&connections_lock); 246 247 for (i = 0 ; i < CONN_HASH_SIZE; i++) { 248 hlist_for_each_entry(con, h, &connection_hash[i], list) { 249 if (con && con->sctp_assoc == assoc_id) { 250 mutex_unlock(&connections_lock); 251 return con; 252 } 253 } 254 } 255 mutex_unlock(&connections_lock); 256 return NULL; 257 } 258 259 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) 260 { 261 struct sockaddr_storage addr; 262 int error; 263 264 if (!dlm_local_count) 265 return -1; 266 267 error = dlm_nodeid_to_addr(nodeid, &addr); 268 if (error) 269 return error; 270 271 if (dlm_local_addr[0]->ss_family == AF_INET) { 272 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; 273 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; 274 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 275 } else { 276 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; 277 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; 278 ipv6_addr_copy(&ret6->sin6_addr, &in6->sin6_addr); 279 } 280 281 return 0; 282 } 283 284 /* Data available on socket or listen socket received a connect */ 285 static void lowcomms_data_ready(struct sock *sk, int count_unused) 286 { 287 struct connection *con = sock2con(sk); 288 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 289 queue_work(recv_workqueue, &con->rwork); 290 } 291 292 static void lowcomms_write_space(struct sock *sk) 293 { 294 struct connection *con = sock2con(sk); 295 296 if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 297 queue_work(send_workqueue, &con->swork); 298 } 299 300 static inline void lowcomms_connect_sock(struct connection *con) 301 { 302 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 303 queue_work(send_workqueue, &con->swork); 304 } 305 306 static void lowcomms_state_change(struct sock *sk) 307 { 308 if (sk->sk_state == TCP_ESTABLISHED) 309 lowcomms_write_space(sk); 310 } 311 312 /* Make a socket active */ 313 static int add_sock(struct socket *sock, struct connection *con) 314 { 315 con->sock = sock; 316 317 /* Install a data_ready callback */ 318 con->sock->sk->sk_data_ready = lowcomms_data_ready; 319 con->sock->sk->sk_write_space = lowcomms_write_space; 320 con->sock->sk->sk_state_change = lowcomms_state_change; 321 con->sock->sk->sk_user_data = con; 322 con->sock->sk->sk_allocation = GFP_NOFS; 323 return 0; 324 } 325 326 /* Add the port number to an IPv6 or 4 sockaddr and return the address 327 length */ 328 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 329 int *addr_len) 330 { 331 saddr->ss_family = dlm_local_addr[0]->ss_family; 332 if (saddr->ss_family == AF_INET) { 333 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 334 in4_addr->sin_port = cpu_to_be16(port); 335 *addr_len = sizeof(struct sockaddr_in); 336 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 337 } else { 338 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 339 in6_addr->sin6_port = cpu_to_be16(port); 340 *addr_len = sizeof(struct sockaddr_in6); 341 } 342 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 343 } 344 345 /* Close a remote connection and tidy up */ 346 static void close_connection(struct connection *con, bool and_other) 347 { 348 mutex_lock(&con->sock_mutex); 349 350 if (con->sock) { 351 sock_release(con->sock); 352 con->sock = NULL; 353 } 354 if (con->othercon && and_other) { 355 /* Will only re-enter once. */ 356 close_connection(con->othercon, false); 357 } 358 if (con->rx_page) { 359 __free_page(con->rx_page); 360 con->rx_page = NULL; 361 } 362 363 con->retries = 0; 364 mutex_unlock(&con->sock_mutex); 365 } 366 367 /* We only send shutdown messages to nodes that are not part of the cluster */ 368 static void sctp_send_shutdown(sctp_assoc_t associd) 369 { 370 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 371 struct msghdr outmessage; 372 struct cmsghdr *cmsg; 373 struct sctp_sndrcvinfo *sinfo; 374 int ret; 375 struct connection *con; 376 377 con = nodeid2con(0,0); 378 BUG_ON(con == NULL); 379 380 outmessage.msg_name = NULL; 381 outmessage.msg_namelen = 0; 382 outmessage.msg_control = outcmsg; 383 outmessage.msg_controllen = sizeof(outcmsg); 384 outmessage.msg_flags = MSG_EOR; 385 386 cmsg = CMSG_FIRSTHDR(&outmessage); 387 cmsg->cmsg_level = IPPROTO_SCTP; 388 cmsg->cmsg_type = SCTP_SNDRCV; 389 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 390 outmessage.msg_controllen = cmsg->cmsg_len; 391 sinfo = CMSG_DATA(cmsg); 392 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 393 394 sinfo->sinfo_flags |= MSG_EOF; 395 sinfo->sinfo_assoc_id = associd; 396 397 ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0); 398 399 if (ret != 0) 400 log_print("send EOF to node failed: %d", ret); 401 } 402 403 static void sctp_init_failed_foreach(struct connection *con) 404 { 405 con->sctp_assoc = 0; 406 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 407 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 408 queue_work(send_workqueue, &con->swork); 409 } 410 } 411 412 /* INIT failed but we don't know which node... 413 restart INIT on all pending nodes */ 414 static void sctp_init_failed(void) 415 { 416 mutex_lock(&connections_lock); 417 418 foreach_conn(sctp_init_failed_foreach); 419 420 mutex_unlock(&connections_lock); 421 } 422 423 /* Something happened to an association */ 424 static void process_sctp_notification(struct connection *con, 425 struct msghdr *msg, char *buf) 426 { 427 union sctp_notification *sn = (union sctp_notification *)buf; 428 429 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { 430 switch (sn->sn_assoc_change.sac_state) { 431 432 case SCTP_COMM_UP: 433 case SCTP_RESTART: 434 { 435 /* Check that the new node is in the lockspace */ 436 struct sctp_prim prim; 437 int nodeid; 438 int prim_len, ret; 439 int addr_len; 440 struct connection *new_con; 441 struct file *file; 442 sctp_peeloff_arg_t parg; 443 int parglen = sizeof(parg); 444 445 /* 446 * We get this before any data for an association. 447 * We verify that the node is in the cluster and 448 * then peel off a socket for it. 449 */ 450 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 451 log_print("COMM_UP for invalid assoc ID %d", 452 (int)sn->sn_assoc_change.sac_assoc_id); 453 sctp_init_failed(); 454 return; 455 } 456 memset(&prim, 0, sizeof(struct sctp_prim)); 457 prim_len = sizeof(struct sctp_prim); 458 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; 459 460 ret = kernel_getsockopt(con->sock, 461 IPPROTO_SCTP, 462 SCTP_PRIMARY_ADDR, 463 (char*)&prim, 464 &prim_len); 465 if (ret < 0) { 466 log_print("getsockopt/sctp_primary_addr on " 467 "new assoc %d failed : %d", 468 (int)sn->sn_assoc_change.sac_assoc_id, 469 ret); 470 471 /* Retry INIT later */ 472 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 473 if (new_con) 474 clear_bit(CF_CONNECT_PENDING, &con->flags); 475 return; 476 } 477 make_sockaddr(&prim.ssp_addr, 0, &addr_len); 478 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { 479 int i; 480 unsigned char *b=(unsigned char *)&prim.ssp_addr; 481 log_print("reject connect from unknown addr"); 482 for (i=0; i<sizeof(struct sockaddr_storage);i++) 483 printk("%02x ", b[i]); 484 printk("\n"); 485 sctp_send_shutdown(prim.ssp_assoc_id); 486 return; 487 } 488 489 new_con = nodeid2con(nodeid, GFP_KERNEL); 490 if (!new_con) 491 return; 492 493 /* Peel off a new sock */ 494 parg.associd = sn->sn_assoc_change.sac_assoc_id; 495 ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, 496 SCTP_SOCKOPT_PEELOFF, 497 (void *)&parg, &parglen); 498 if (ret) { 499 log_print("Can't peel off a socket for " 500 "connection %d to node %d: err=%d\n", 501 parg.associd, nodeid, ret); 502 } 503 file = fget(parg.sd); 504 new_con->sock = SOCKET_I(file->f_dentry->d_inode); 505 add_sock(new_con->sock, new_con); 506 fput(file); 507 put_unused_fd(parg.sd); 508 509 log_print("got new/restarted association %d nodeid %d", 510 (int)sn->sn_assoc_change.sac_assoc_id, nodeid); 511 512 /* Send any pending writes */ 513 clear_bit(CF_CONNECT_PENDING, &new_con->flags); 514 clear_bit(CF_INIT_PENDING, &con->flags); 515 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { 516 queue_work(send_workqueue, &new_con->swork); 517 } 518 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) 519 queue_work(recv_workqueue, &new_con->rwork); 520 } 521 break; 522 523 case SCTP_COMM_LOST: 524 case SCTP_SHUTDOWN_COMP: 525 { 526 con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 527 if (con) { 528 con->sctp_assoc = 0; 529 } 530 } 531 break; 532 533 /* We don't know which INIT failed, so clear the PENDING flags 534 * on them all. if assoc_id is zero then it will then try 535 * again */ 536 537 case SCTP_CANT_STR_ASSOC: 538 { 539 log_print("Can't start SCTP association - retrying"); 540 sctp_init_failed(); 541 } 542 break; 543 544 default: 545 log_print("unexpected SCTP assoc change id=%d state=%d", 546 (int)sn->sn_assoc_change.sac_assoc_id, 547 sn->sn_assoc_change.sac_state); 548 } 549 } 550 } 551 552 /* Data received from remote end */ 553 static int receive_from_sock(struct connection *con) 554 { 555 int ret = 0; 556 struct msghdr msg = {}; 557 struct kvec iov[2]; 558 unsigned len; 559 int r; 560 int call_again_soon = 0; 561 int nvec; 562 char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 563 564 mutex_lock(&con->sock_mutex); 565 566 if (con->sock == NULL) { 567 ret = -EAGAIN; 568 goto out_close; 569 } 570 571 if (con->rx_page == NULL) { 572 /* 573 * This doesn't need to be atomic, but I think it should 574 * improve performance if it is. 575 */ 576 con->rx_page = alloc_page(GFP_ATOMIC); 577 if (con->rx_page == NULL) 578 goto out_resched; 579 cbuf_init(&con->cb, PAGE_CACHE_SIZE); 580 } 581 582 /* Only SCTP needs these really */ 583 memset(&incmsg, 0, sizeof(incmsg)); 584 msg.msg_control = incmsg; 585 msg.msg_controllen = sizeof(incmsg); 586 587 /* 588 * iov[0] is the bit of the circular buffer between the current end 589 * point (cb.base + cb.len) and the end of the buffer. 590 */ 591 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); 592 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); 593 iov[1].iov_len = 0; 594 nvec = 1; 595 596 /* 597 * iov[1] is the bit of the circular buffer between the start of the 598 * buffer and the start of the currently used section (cb.base) 599 */ 600 if (cbuf_data(&con->cb) >= con->cb.base) { 601 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb); 602 iov[1].iov_len = con->cb.base; 603 iov[1].iov_base = page_address(con->rx_page); 604 nvec = 2; 605 } 606 len = iov[0].iov_len + iov[1].iov_len; 607 608 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, 609 MSG_DONTWAIT | MSG_NOSIGNAL); 610 if (ret <= 0) 611 goto out_close; 612 613 /* Process SCTP notifications */ 614 if (msg.msg_flags & MSG_NOTIFICATION) { 615 msg.msg_control = incmsg; 616 msg.msg_controllen = sizeof(incmsg); 617 618 process_sctp_notification(con, &msg, 619 page_address(con->rx_page) + con->cb.base); 620 mutex_unlock(&con->sock_mutex); 621 return 0; 622 } 623 BUG_ON(con->nodeid == 0); 624 625 if (ret == len) 626 call_again_soon = 1; 627 cbuf_add(&con->cb, ret); 628 ret = dlm_process_incoming_buffer(con->nodeid, 629 page_address(con->rx_page), 630 con->cb.base, con->cb.len, 631 PAGE_CACHE_SIZE); 632 if (ret == -EBADMSG) { 633 log_print("lowcomms: addr=%p, base=%u, len=%u, " 634 "iov_len=%u, iov_base[0]=%p, read=%d", 635 page_address(con->rx_page), con->cb.base, con->cb.len, 636 len, iov[0].iov_base, r); 637 } 638 if (ret < 0) 639 goto out_close; 640 cbuf_eat(&con->cb, ret); 641 642 if (cbuf_empty(&con->cb) && !call_again_soon) { 643 __free_page(con->rx_page); 644 con->rx_page = NULL; 645 } 646 647 if (call_again_soon) 648 goto out_resched; 649 mutex_unlock(&con->sock_mutex); 650 return 0; 651 652 out_resched: 653 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 654 queue_work(recv_workqueue, &con->rwork); 655 mutex_unlock(&con->sock_mutex); 656 return -EAGAIN; 657 658 out_close: 659 mutex_unlock(&con->sock_mutex); 660 if (ret != -EAGAIN) { 661 close_connection(con, false); 662 /* Reconnect when there is something to send */ 663 } 664 /* Don't return success if we really got EOF */ 665 if (ret == 0) 666 ret = -EAGAIN; 667 668 return ret; 669 } 670 671 /* Listening socket is busy, accept a connection */ 672 static int tcp_accept_from_sock(struct connection *con) 673 { 674 int result; 675 struct sockaddr_storage peeraddr; 676 struct socket *newsock; 677 int len; 678 int nodeid; 679 struct connection *newcon; 680 struct connection *addcon; 681 682 memset(&peeraddr, 0, sizeof(peeraddr)); 683 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 684 IPPROTO_TCP, &newsock); 685 if (result < 0) 686 return -ENOMEM; 687 688 mutex_lock_nested(&con->sock_mutex, 0); 689 690 result = -ENOTCONN; 691 if (con->sock == NULL) 692 goto accept_err; 693 694 newsock->type = con->sock->type; 695 newsock->ops = con->sock->ops; 696 697 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); 698 if (result < 0) 699 goto accept_err; 700 701 /* Get the connected socket's peer */ 702 memset(&peeraddr, 0, sizeof(peeraddr)); 703 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 704 &len, 2)) { 705 result = -ECONNABORTED; 706 goto accept_err; 707 } 708 709 /* Get the new node's NODEID */ 710 make_sockaddr(&peeraddr, 0, &len); 711 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 712 log_print("connect from non cluster node"); 713 sock_release(newsock); 714 mutex_unlock(&con->sock_mutex); 715 return -1; 716 } 717 718 log_print("got connection from %d", nodeid); 719 720 /* Check to see if we already have a connection to this node. This 721 * could happen if the two nodes initiate a connection at roughly 722 * the same time and the connections cross on the wire. 723 * In this case we store the incoming one in "othercon" 724 */ 725 newcon = nodeid2con(nodeid, GFP_KERNEL); 726 if (!newcon) { 727 result = -ENOMEM; 728 goto accept_err; 729 } 730 mutex_lock_nested(&newcon->sock_mutex, 1); 731 if (newcon->sock) { 732 struct connection *othercon = newcon->othercon; 733 734 if (!othercon) { 735 othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL); 736 if (!othercon) { 737 log_print("failed to allocate incoming socket"); 738 mutex_unlock(&newcon->sock_mutex); 739 result = -ENOMEM; 740 goto accept_err; 741 } 742 othercon->nodeid = nodeid; 743 othercon->rx_action = receive_from_sock; 744 mutex_init(&othercon->sock_mutex); 745 INIT_WORK(&othercon->swork, process_send_sockets); 746 INIT_WORK(&othercon->rwork, process_recv_sockets); 747 set_bit(CF_IS_OTHERCON, &othercon->flags); 748 } 749 if (!othercon->sock) { 750 newcon->othercon = othercon; 751 othercon->sock = newsock; 752 newsock->sk->sk_user_data = othercon; 753 add_sock(newsock, othercon); 754 addcon = othercon; 755 } 756 else { 757 printk("Extra connection from node %d attempted\n", nodeid); 758 result = -EAGAIN; 759 mutex_unlock(&newcon->sock_mutex); 760 goto accept_err; 761 } 762 } 763 else { 764 newsock->sk->sk_user_data = newcon; 765 newcon->rx_action = receive_from_sock; 766 add_sock(newsock, newcon); 767 addcon = newcon; 768 } 769 770 mutex_unlock(&newcon->sock_mutex); 771 772 /* 773 * Add it to the active queue in case we got data 774 * beween processing the accept adding the socket 775 * to the read_sockets list 776 */ 777 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 778 queue_work(recv_workqueue, &addcon->rwork); 779 mutex_unlock(&con->sock_mutex); 780 781 return 0; 782 783 accept_err: 784 mutex_unlock(&con->sock_mutex); 785 sock_release(newsock); 786 787 if (result != -EAGAIN) 788 log_print("error accepting connection from node: %d", result); 789 return result; 790 } 791 792 static void free_entry(struct writequeue_entry *e) 793 { 794 __free_page(e->page); 795 kfree(e); 796 } 797 798 /* Initiate an SCTP association. 799 This is a special case of send_to_sock() in that we don't yet have a 800 peeled-off socket for this association, so we use the listening socket 801 and add the primary IP address of the remote node. 802 */ 803 static void sctp_init_assoc(struct connection *con) 804 { 805 struct sockaddr_storage rem_addr; 806 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 807 struct msghdr outmessage; 808 struct cmsghdr *cmsg; 809 struct sctp_sndrcvinfo *sinfo; 810 struct connection *base_con; 811 struct writequeue_entry *e; 812 int len, offset; 813 int ret; 814 int addrlen; 815 struct kvec iov[1]; 816 817 if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) 818 return; 819 820 if (con->retries++ > MAX_CONNECT_RETRIES) 821 return; 822 823 log_print("Initiating association with node %d", con->nodeid); 824 825 if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { 826 log_print("no address for nodeid %d", con->nodeid); 827 return; 828 } 829 base_con = nodeid2con(0, 0); 830 BUG_ON(base_con == NULL); 831 832 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); 833 834 outmessage.msg_name = &rem_addr; 835 outmessage.msg_namelen = addrlen; 836 outmessage.msg_control = outcmsg; 837 outmessage.msg_controllen = sizeof(outcmsg); 838 outmessage.msg_flags = MSG_EOR; 839 840 spin_lock(&con->writequeue_lock); 841 e = list_entry(con->writequeue.next, struct writequeue_entry, 842 list); 843 844 BUG_ON((struct list_head *) e == &con->writequeue); 845 846 len = e->len; 847 offset = e->offset; 848 spin_unlock(&con->writequeue_lock); 849 850 /* Send the first block off the write queue */ 851 iov[0].iov_base = page_address(e->page)+offset; 852 iov[0].iov_len = len; 853 854 cmsg = CMSG_FIRSTHDR(&outmessage); 855 cmsg->cmsg_level = IPPROTO_SCTP; 856 cmsg->cmsg_type = SCTP_SNDRCV; 857 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 858 sinfo = CMSG_DATA(cmsg); 859 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 860 sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); 861 outmessage.msg_controllen = cmsg->cmsg_len; 862 863 ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); 864 if (ret < 0) { 865 log_print("Send first packet to node %d failed: %d", 866 con->nodeid, ret); 867 868 /* Try again later */ 869 clear_bit(CF_CONNECT_PENDING, &con->flags); 870 clear_bit(CF_INIT_PENDING, &con->flags); 871 } 872 else { 873 spin_lock(&con->writequeue_lock); 874 e->offset += ret; 875 e->len -= ret; 876 877 if (e->len == 0 && e->users == 0) { 878 list_del(&e->list); 879 free_entry(e); 880 } 881 spin_unlock(&con->writequeue_lock); 882 } 883 } 884 885 /* Connect a new socket to its peer */ 886 static void tcp_connect_to_sock(struct connection *con) 887 { 888 int result = -EHOSTUNREACH; 889 struct sockaddr_storage saddr, src_addr; 890 int addr_len; 891 struct socket *sock; 892 893 if (con->nodeid == 0) { 894 log_print("attempt to connect sock 0 foiled"); 895 return; 896 } 897 898 mutex_lock(&con->sock_mutex); 899 if (con->retries++ > MAX_CONNECT_RETRIES) 900 goto out; 901 902 /* Some odd races can cause double-connects, ignore them */ 903 if (con->sock) { 904 result = 0; 905 goto out; 906 } 907 908 /* Create a socket to communicate with */ 909 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 910 IPPROTO_TCP, &sock); 911 if (result < 0) 912 goto out_err; 913 914 memset(&saddr, 0, sizeof(saddr)); 915 if (dlm_nodeid_to_addr(con->nodeid, &saddr)) { 916 sock_release(sock); 917 goto out_err; 918 } 919 920 sock->sk->sk_user_data = con; 921 con->rx_action = receive_from_sock; 922 con->connect_action = tcp_connect_to_sock; 923 add_sock(sock, con); 924 925 /* Bind to our cluster-known address connecting to avoid 926 routing problems */ 927 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); 928 make_sockaddr(&src_addr, 0, &addr_len); 929 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, 930 addr_len); 931 if (result < 0) { 932 log_print("could not bind for connect: %d", result); 933 /* This *may* not indicate a critical error */ 934 } 935 936 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 937 938 log_print("connecting to %d", con->nodeid); 939 result = 940 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 941 O_NONBLOCK); 942 if (result == -EINPROGRESS) 943 result = 0; 944 if (result == 0) 945 goto out; 946 947 out_err: 948 if (con->sock) { 949 sock_release(con->sock); 950 con->sock = NULL; 951 } 952 /* 953 * Some errors are fatal and this list might need adjusting. For other 954 * errors we try again until the max number of retries is reached. 955 */ 956 if (result != -EHOSTUNREACH && result != -ENETUNREACH && 957 result != -ENETDOWN && result != -EINVAL 958 && result != -EPROTONOSUPPORT) { 959 lowcomms_connect_sock(con); 960 result = 0; 961 } 962 out: 963 mutex_unlock(&con->sock_mutex); 964 return; 965 } 966 967 static struct socket *tcp_create_listen_sock(struct connection *con, 968 struct sockaddr_storage *saddr) 969 { 970 struct socket *sock = NULL; 971 int result = 0; 972 int one = 1; 973 int addr_len; 974 975 if (dlm_local_addr[0]->ss_family == AF_INET) 976 addr_len = sizeof(struct sockaddr_in); 977 else 978 addr_len = sizeof(struct sockaddr_in6); 979 980 /* Create a socket to communicate with */ 981 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 982 IPPROTO_TCP, &sock); 983 if (result < 0) { 984 log_print("Can't create listening comms socket"); 985 goto create_out; 986 } 987 988 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 989 (char *)&one, sizeof(one)); 990 991 if (result < 0) { 992 log_print("Failed to set SO_REUSEADDR on socket: %d", result); 993 } 994 sock->sk->sk_user_data = con; 995 con->rx_action = tcp_accept_from_sock; 996 con->connect_action = tcp_connect_to_sock; 997 con->sock = sock; 998 999 /* Bind to our port */ 1000 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1001 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1002 if (result < 0) { 1003 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1004 sock_release(sock); 1005 sock = NULL; 1006 con->sock = NULL; 1007 goto create_out; 1008 } 1009 result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, 1010 (char *)&one, sizeof(one)); 1011 if (result < 0) { 1012 log_print("Set keepalive failed: %d", result); 1013 } 1014 1015 result = sock->ops->listen(sock, 5); 1016 if (result < 0) { 1017 log_print("Can't listen on port %d", dlm_config.ci_tcp_port); 1018 sock_release(sock); 1019 sock = NULL; 1020 goto create_out; 1021 } 1022 1023 create_out: 1024 return sock; 1025 } 1026 1027 /* Get local addresses */ 1028 static void init_local(void) 1029 { 1030 struct sockaddr_storage sas, *addr; 1031 int i; 1032 1033 dlm_local_count = 0; 1034 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { 1035 if (dlm_our_addr(&sas, i)) 1036 break; 1037 1038 addr = kmalloc(sizeof(*addr), GFP_KERNEL); 1039 if (!addr) 1040 break; 1041 memcpy(addr, &sas, sizeof(*addr)); 1042 dlm_local_addr[dlm_local_count++] = addr; 1043 } 1044 } 1045 1046 /* Bind to an IP address. SCTP allows multiple address so it can do 1047 multi-homing */ 1048 static int add_sctp_bind_addr(struct connection *sctp_con, 1049 struct sockaddr_storage *addr, 1050 int addr_len, int num) 1051 { 1052 int result = 0; 1053 1054 if (num == 1) 1055 result = kernel_bind(sctp_con->sock, 1056 (struct sockaddr *) addr, 1057 addr_len); 1058 else 1059 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, 1060 SCTP_SOCKOPT_BINDX_ADD, 1061 (char *)addr, addr_len); 1062 1063 if (result < 0) 1064 log_print("Can't bind to port %d addr number %d", 1065 dlm_config.ci_tcp_port, num); 1066 1067 return result; 1068 } 1069 1070 /* Initialise SCTP socket and bind to all interfaces */ 1071 static int sctp_listen_for_all(void) 1072 { 1073 struct socket *sock = NULL; 1074 struct sockaddr_storage localaddr; 1075 struct sctp_event_subscribe subscribe; 1076 int result = -EINVAL, num = 1, i, addr_len; 1077 struct connection *con = nodeid2con(0, GFP_KERNEL); 1078 int bufsize = NEEDED_RMEM; 1079 1080 if (!con) 1081 return -ENOMEM; 1082 1083 log_print("Using SCTP for communications"); 1084 1085 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, 1086 IPPROTO_SCTP, &sock); 1087 if (result < 0) { 1088 log_print("Can't create comms socket, check SCTP is loaded"); 1089 goto out; 1090 } 1091 1092 /* Listen for events */ 1093 memset(&subscribe, 0, sizeof(subscribe)); 1094 subscribe.sctp_data_io_event = 1; 1095 subscribe.sctp_association_event = 1; 1096 subscribe.sctp_send_failure_event = 1; 1097 subscribe.sctp_shutdown_event = 1; 1098 subscribe.sctp_partial_delivery_event = 1; 1099 1100 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, 1101 (char *)&bufsize, sizeof(bufsize)); 1102 if (result) 1103 log_print("Error increasing buffer space on socket %d", result); 1104 1105 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, 1106 (char *)&subscribe, sizeof(subscribe)); 1107 if (result < 0) { 1108 log_print("Failed to set SCTP_EVENTS on socket: result=%d", 1109 result); 1110 goto create_delsock; 1111 } 1112 1113 /* Init con struct */ 1114 sock->sk->sk_user_data = con; 1115 con->sock = sock; 1116 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1117 con->rx_action = receive_from_sock; 1118 con->connect_action = sctp_init_assoc; 1119 1120 /* Bind to all interfaces. */ 1121 for (i = 0; i < dlm_local_count; i++) { 1122 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 1123 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); 1124 1125 result = add_sctp_bind_addr(con, &localaddr, addr_len, num); 1126 if (result) 1127 goto create_delsock; 1128 ++num; 1129 } 1130 1131 result = sock->ops->listen(sock, 5); 1132 if (result < 0) { 1133 log_print("Can't set socket listening"); 1134 goto create_delsock; 1135 } 1136 1137 return 0; 1138 1139 create_delsock: 1140 sock_release(sock); 1141 con->sock = NULL; 1142 out: 1143 return result; 1144 } 1145 1146 static int tcp_listen_for_all(void) 1147 { 1148 struct socket *sock = NULL; 1149 struct connection *con = nodeid2con(0, GFP_KERNEL); 1150 int result = -EINVAL; 1151 1152 if (!con) 1153 return -ENOMEM; 1154 1155 /* We don't support multi-homed hosts */ 1156 if (dlm_local_addr[1] != NULL) { 1157 log_print("TCP protocol can't handle multi-homed hosts, " 1158 "try SCTP"); 1159 return -EINVAL; 1160 } 1161 1162 log_print("Using TCP for communications"); 1163 1164 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); 1165 if (sock) { 1166 add_sock(sock, con); 1167 result = 0; 1168 } 1169 else { 1170 result = -EADDRINUSE; 1171 } 1172 1173 return result; 1174 } 1175 1176 1177 1178 static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1179 gfp_t allocation) 1180 { 1181 struct writequeue_entry *entry; 1182 1183 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1184 if (!entry) 1185 return NULL; 1186 1187 entry->page = alloc_page(allocation); 1188 if (!entry->page) { 1189 kfree(entry); 1190 return NULL; 1191 } 1192 1193 entry->offset = 0; 1194 entry->len = 0; 1195 entry->end = 0; 1196 entry->users = 0; 1197 entry->con = con; 1198 1199 return entry; 1200 } 1201 1202 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 1203 { 1204 struct connection *con; 1205 struct writequeue_entry *e; 1206 int offset = 0; 1207 int users = 0; 1208 1209 con = nodeid2con(nodeid, allocation); 1210 if (!con) 1211 return NULL; 1212 1213 spin_lock(&con->writequeue_lock); 1214 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1215 if ((&e->list == &con->writequeue) || 1216 (PAGE_CACHE_SIZE - e->end < len)) { 1217 e = NULL; 1218 } else { 1219 offset = e->end; 1220 e->end += len; 1221 users = e->users++; 1222 } 1223 spin_unlock(&con->writequeue_lock); 1224 1225 if (e) { 1226 got_one: 1227 *ppc = page_address(e->page) + offset; 1228 return e; 1229 } 1230 1231 e = new_writequeue_entry(con, allocation); 1232 if (e) { 1233 spin_lock(&con->writequeue_lock); 1234 offset = e->end; 1235 e->end += len; 1236 users = e->users++; 1237 list_add_tail(&e->list, &con->writequeue); 1238 spin_unlock(&con->writequeue_lock); 1239 goto got_one; 1240 } 1241 return NULL; 1242 } 1243 1244 void dlm_lowcomms_commit_buffer(void *mh) 1245 { 1246 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1247 struct connection *con = e->con; 1248 int users; 1249 1250 spin_lock(&con->writequeue_lock); 1251 users = --e->users; 1252 if (users) 1253 goto out; 1254 e->len = e->end - e->offset; 1255 spin_unlock(&con->writequeue_lock); 1256 1257 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { 1258 queue_work(send_workqueue, &con->swork); 1259 } 1260 return; 1261 1262 out: 1263 spin_unlock(&con->writequeue_lock); 1264 return; 1265 } 1266 1267 /* Send a message */ 1268 static void send_to_sock(struct connection *con) 1269 { 1270 int ret = 0; 1271 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int); 1272 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1273 struct writequeue_entry *e; 1274 int len, offset; 1275 1276 mutex_lock(&con->sock_mutex); 1277 if (con->sock == NULL) 1278 goto out_connect; 1279 1280 sendpage = con->sock->ops->sendpage; 1281 1282 spin_lock(&con->writequeue_lock); 1283 for (;;) { 1284 e = list_entry(con->writequeue.next, struct writequeue_entry, 1285 list); 1286 if ((struct list_head *) e == &con->writequeue) 1287 break; 1288 1289 len = e->len; 1290 offset = e->offset; 1291 BUG_ON(len == 0 && e->users == 0); 1292 spin_unlock(&con->writequeue_lock); 1293 1294 ret = 0; 1295 if (len) { 1296 ret = sendpage(con->sock, e->page, offset, len, 1297 msg_flags); 1298 if (ret == -EAGAIN || ret == 0) { 1299 cond_resched(); 1300 goto out; 1301 } 1302 if (ret <= 0) 1303 goto send_error; 1304 } 1305 /* Don't starve people filling buffers */ 1306 cond_resched(); 1307 1308 spin_lock(&con->writequeue_lock); 1309 e->offset += ret; 1310 e->len -= ret; 1311 1312 if (e->len == 0 && e->users == 0) { 1313 list_del(&e->list); 1314 free_entry(e); 1315 continue; 1316 } 1317 } 1318 spin_unlock(&con->writequeue_lock); 1319 out: 1320 mutex_unlock(&con->sock_mutex); 1321 return; 1322 1323 send_error: 1324 mutex_unlock(&con->sock_mutex); 1325 close_connection(con, false); 1326 lowcomms_connect_sock(con); 1327 return; 1328 1329 out_connect: 1330 mutex_unlock(&con->sock_mutex); 1331 if (!test_bit(CF_INIT_PENDING, &con->flags)) 1332 lowcomms_connect_sock(con); 1333 return; 1334 } 1335 1336 static void clean_one_writequeue(struct connection *con) 1337 { 1338 struct writequeue_entry *e, *safe; 1339 1340 spin_lock(&con->writequeue_lock); 1341 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1342 list_del(&e->list); 1343 free_entry(e); 1344 } 1345 spin_unlock(&con->writequeue_lock); 1346 } 1347 1348 /* Called from recovery when it knows that a node has 1349 left the cluster */ 1350 int dlm_lowcomms_close(int nodeid) 1351 { 1352 struct connection *con; 1353 1354 log_print("closing connection to node %d", nodeid); 1355 con = nodeid2con(nodeid, 0); 1356 if (con) { 1357 clean_one_writequeue(con); 1358 close_connection(con, true); 1359 } 1360 return 0; 1361 } 1362 1363 /* Receive workqueue function */ 1364 static void process_recv_sockets(struct work_struct *work) 1365 { 1366 struct connection *con = container_of(work, struct connection, rwork); 1367 int err; 1368 1369 clear_bit(CF_READ_PENDING, &con->flags); 1370 do { 1371 err = con->rx_action(con); 1372 } while (!err); 1373 } 1374 1375 /* Send workqueue function */ 1376 static void process_send_sockets(struct work_struct *work) 1377 { 1378 struct connection *con = container_of(work, struct connection, swork); 1379 1380 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 1381 con->connect_action(con); 1382 } 1383 clear_bit(CF_WRITE_PENDING, &con->flags); 1384 send_to_sock(con); 1385 } 1386 1387 1388 /* Discard all entries on the write queues */ 1389 static void clean_writequeues(void) 1390 { 1391 foreach_conn(clean_one_writequeue); 1392 } 1393 1394 static void work_stop(void) 1395 { 1396 destroy_workqueue(recv_workqueue); 1397 destroy_workqueue(send_workqueue); 1398 } 1399 1400 static int work_start(void) 1401 { 1402 int error; 1403 recv_workqueue = create_workqueue("dlm_recv"); 1404 error = IS_ERR(recv_workqueue); 1405 if (error) { 1406 log_print("can't start dlm_recv %d", error); 1407 return error; 1408 } 1409 1410 send_workqueue = create_singlethread_workqueue("dlm_send"); 1411 error = IS_ERR(send_workqueue); 1412 if (error) { 1413 log_print("can't start dlm_send %d", error); 1414 destroy_workqueue(recv_workqueue); 1415 return error; 1416 } 1417 1418 return 0; 1419 } 1420 1421 static void stop_conn(struct connection *con) 1422 { 1423 con->flags |= 0x0F; 1424 if (con->sock) 1425 con->sock->sk->sk_user_data = NULL; 1426 } 1427 1428 static void free_conn(struct connection *con) 1429 { 1430 close_connection(con, true); 1431 if (con->othercon) 1432 kmem_cache_free(con_cache, con->othercon); 1433 hlist_del(&con->list); 1434 kmem_cache_free(con_cache, con); 1435 } 1436 1437 void dlm_lowcomms_stop(void) 1438 { 1439 /* Set all the flags to prevent any 1440 socket activity. 1441 */ 1442 mutex_lock(&connections_lock); 1443 foreach_conn(stop_conn); 1444 mutex_unlock(&connections_lock); 1445 1446 work_stop(); 1447 1448 mutex_lock(&connections_lock); 1449 clean_writequeues(); 1450 1451 foreach_conn(free_conn); 1452 1453 mutex_unlock(&connections_lock); 1454 kmem_cache_destroy(con_cache); 1455 } 1456 1457 int dlm_lowcomms_start(void) 1458 { 1459 int error = -EINVAL; 1460 struct connection *con; 1461 int i; 1462 1463 for (i = 0; i < CONN_HASH_SIZE; i++) 1464 INIT_HLIST_HEAD(&connection_hash[i]); 1465 1466 init_local(); 1467 if (!dlm_local_count) { 1468 error = -ENOTCONN; 1469 log_print("no local IP address has been set"); 1470 goto out; 1471 } 1472 1473 error = -ENOMEM; 1474 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1475 __alignof__(struct connection), 0, 1476 NULL); 1477 if (!con_cache) 1478 goto out; 1479 1480 /* Start listening */ 1481 if (dlm_config.ci_protocol == 0) 1482 error = tcp_listen_for_all(); 1483 else 1484 error = sctp_listen_for_all(); 1485 if (error) 1486 goto fail_unlisten; 1487 1488 error = work_start(); 1489 if (error) 1490 goto fail_unlisten; 1491 1492 return 0; 1493 1494 fail_unlisten: 1495 con = nodeid2con(0,0); 1496 if (con) { 1497 close_connection(con, false); 1498 kmem_cache_free(con_cache, con); 1499 } 1500 kmem_cache_destroy(con_cache); 1501 1502 out: 1503 return error; 1504 } 1505