// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2021 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * midcomms.c
 *
 * This is the appallingly named "mid-level" comms layer. It takes care of
 * delivering an application layer "reliable" communication on top of the
 * lowcomms transport layer in use.
 *
 * How it works:
 *
 * Each node keeps track of all sent DLM messages in send_queue with a sequence
 * number. The receiver will send a DLM_ACK message back for every DLM message
 * received at the other side. If a reconnect happens in lowcomms we will send
 * all unacknowledged dlm messages again. The receiving side might drop any
 * already received message by comparing sequence numbers.
 *
 * How version detection works:
 *
 * Due to the fact that dlm has pre-configured node addresses on every side,
 * it is in its nature that every side connects at start to transmit
 * dlm messages, which ends in a race. However DLM_RCOM_NAMES, DLM_RCOM_STATUS
 * and their replies are the first messages which are exchanged. Due to
 * backwards compatibility these messages are not covered by the midcomms
 * re-transmission layer. These messages have their own re-transmission
 * handling in the dlm application layer. The version field of every node will
 * be set on these RCOM messages as soon as they arrived and the node isn't yet
 * part of the nodes hash. There also exists logic to detect version mismatches
 * if something weird is going on or the first message isn't an expected one.
 *
 * Termination:
 *
 * The midcomms layer does a 4 way handshake for termination on DLM protocol
 * like TCP supports it with half-closed socket support. SCTP doesn't support
 * half-closed sockets, so we do it on the DLM layer. Also socket shutdown()
 * can be interrupted by e.g. a tcp reset itself. Additionally there exists
 * the othercon paradigm in lowcomms which cannot easily be removed without
 * breaking backwards compatibility. A node cannot send anything to another
 * node when a DLM_FIN message was sent. There exists additional logic to
 * print a warning if DLM wants to do it. There exists a state handling like
 * RFC 793 but reduced to termination only. The event "member removal event"
 * describes that the cluster manager removed the node from internal lists;
 * at this point DLM does not send any message to the other node. There exist
 * two cases:
 *
 * 1. The cluster member was removed and we received a FIN
 * OR
 * 2. We received a FIN but the member was not removed yet
 *
 * One of these cases will do the CLOSE_WAIT to LAST_ACK change.
 *
 *
 *                              +---------+
 *                              | CLOSED  |
 *                              +---------+
 *                                   | add member/receive RCOM version
 *                                   |            detection msg
 *                                   V
 *                              +---------+
 *                              |  ESTAB  |
 *                              +---------+
 *                       CLOSE    |     |    rcv FIN
 *                      -------   |     |    -------
 * +---------+          snd FIN  /       \   snd ACK          +---------+
 * |  FIN    |<-----------------           ------------------>|  CLOSE  |
 * | WAIT-1  |------------------                              |   WAIT  |
 * +---------+          rcv FIN  \                            +---------+
 *   | rcv ACK of FIN   -------   |                            CLOSE  | member
 *   | --------------   snd ACK   |                           ------- | removal
 *   V        x                   V                           snd FIN V event
 * +---------+                  +---------+                   +---------+
 * |FINWAIT-2|                  | CLOSING |                   | LAST-ACK|
 * +---------+                  +---------+                   +---------+
 *   |                rcv ACK of FIN |                 rcv ACK of FIN |
 *   |  rcv FIN       -------------- |                 -------------- |
 *   |  -------              x       V                        x       V
 *    \ snd ACK                 +---------+                   +---------+
 *     ------------------------>| CLOSED  |                   | CLOSED  |
 *                              +---------+                   +---------+
 *
 * NOTE: any state can be interrupted by midcomms_close() and the state will
 * be switched to CLOSED in case of fencing. There also exists some timeout
 * handling when we receive the version detection RCOM messages which is
 * made by observation.
 *
 * Future improvements:
 *
 * There exist some known issues/improvements of the dlm handling. Some
 * of them should be done in a next major dlm version bump which makes
 * it incompatible with previous versions.
 *
 * Unaligned memory access:
 *
 * There exist cases when the dlm message buffer length is not aligned
 * to 8 byte. However, it seems nobody detected any problem with it. This
 * can be fixed in the next major version bump of dlm.
 *
 * Version detection:
 *
 * The version detection and how it's done is related to backwards
 * compatibility. There exist better ways to handle this.
 * However this should be changed in the next major version bump of dlm.
 *
 * Tail Size checking:
 *
 * There exists a message tail payload in e.g.
 * DLM_MSG, however we don't
 * check it against the message length yet with regard to the receive buffer
 * length. That needs to be validated.
 *
 * Fencing bad nodes:
 *
 * At timeout places or weird sequence number behaviours we should send
 * a fencing request to the cluster manager.
 */

/* Debug switch to enable a 5 second sleep while waiting for a termination.
 * This can be useful to test fencing while termination is running.
 * This requires a setup with only gfs2 as dlm user, so that the
 * last umount will terminate the connection.
 *
 * It turned out to be useful for testing: while umount blocks for the
 * 5 seconds, just press the reset button. With a lot of dropping, the
 * termination process could take several seconds.
 */
#define DLM_DEBUG_FENCE_TERMINATION	0

#include <trace/events/dlm.h>
#include <net/tcp.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "util.h"
#include "midcomms.h"

/* init value for sequence numbers for testing purpose only e.g. overflows */
#define DLM_SEQ_INIT		0
/* 5 seconds wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT	msecs_to_jiffies(5000)
/* value of midcomms_node::version as long as no version was detected */
#define DLM_VERSION_NOT_SET	0
/* Thresholds for midcomms_node::ulp_delivered, see dlm_send_ack_threshold().
 * The SEND variant is checked when a new message is allocated for sending,
 * the RECV variant after a received message was delivered.
 */
#define DLM_SEND_ACK_BACK_MSG_THRESHOLD 32
#define DLM_RECV_ACK_BACK_MSG_THRESHOLD (DLM_SEND_ACK_BACK_MSG_THRESHOLD * 8)

/* Per remote node state of the midcomms layer. Nodes are kept in node_hash
 * and are looked up by their nodeid.
 */
struct midcomms_node {
	int nodeid;
	/* detected protocol version or DLM_VERSION_NOT_SET */
	uint32_t version;
	/* sequence number handed out to the next queued message,
	 * see midcomms_new_msg_cb()
	 */
	atomic_t seq_send;
	/* sequence number we expect next from this node */
	atomic_t seq_next;
	/* These queues are unbounded because we cannot drop any message in dlm.
	 * We could send a fence signal for a specific node to the cluster
	 * manager if the queues hit some maximum value, however this handling
	 * is not supported yet.
	 */
	struct list_head send_queue;
	spinlock_t send_queue_lock;
	/* number of entries in send_queue */
	atomic_t send_queue_cnt;
/* bit numbers used with set_bit()/test_bit() on flags */
#define DLM_NODE_FLAG_CLOSE	1
#define DLM_NODE_FLAG_STOP_TX	2
#define DLM_NODE_FLAG_STOP_RX	3
	/* messages handed to dlm_receive_buffer() since the counter was
	 * last reset by dlm_send_ack_threshold()
	 */
	atomic_t ulp_delivered;
	unsigned long flags;
	/* woken up on node reset and when a version was detected */
	wait_queue_head_t shutdown_wait;

	/* dlm tcp termination state */
#define DLM_CLOSED	1
#define DLM_ESTABLISHED	2
#define DLM_FIN_WAIT1	3
#define DLM_FIN_WAIT2	4
#define DLM_CLOSE_WAIT	5
#define DLM_LAST_ACK	6
#define DLM_CLOSING	7
	int state;
	/* taken around the state transitions in the receive paths */
	spinlock_t state_lock;

	/* counts how many lockspaces are using this node
	 * this refcount is necessary to determine if the
	 * node wants to disconnect.
	 */
	int users;

	/* not protected by srcu, node_hash lifetime */
	void *debugfs;

	struct hlist_node hlist;
	struct rcu_head rcu;
};

/* Handle for one outgoing midcomms message. For DLM_VERSION_3_2 nodes the
 * handle stays on node->send_queue until the message got acknowledged.
 */
struct dlm_mhandle {
	/* start of the inner dlm packet behind the opts header */
	const union dlm_packet *inner_p;
	struct midcomms_node *node;
	/* DLM_OPTS header at the start of the message buffer */
	struct dlm_opts *opts;
	struct dlm_msg *msg;
	/* set on commit, only committed messages are retransmitted */
	bool committed;
	uint32_t seq;

	/* optional callback, run by dlm_receive_ack() once acknowledged */
	void (*ack_rcv)(struct midcomms_node *node);

	/* get_mhandle/commit srcu idx exchange */
	int idx;

	struct list_head list;
	struct rcu_head rcu;
};

/* hash of all known nodes, buckets are selected by nodeid_hash() */
static struct hlist_head node_hash[CONN_HASH_SIZE];
/* serializes adding new nodes to node_hash, see nodeid2node() */
static DEFINE_SPINLOCK(nodes_lock);
DEFINE_STATIC_SRCU(nodes_srcu);

/* This mutex prevents midcomms_close() from running concurrently with
 * stop() or remove(). I experienced invalid memory accesses when
 * DLM_DEBUG_FENCE_TERMINATION is enabled and machines are being reset.
 * Without it we would end up in some double deletion in the nodes
 * datastructure.
 */
static DEFINE_MUTEX(close_lock);

/* Create the slab cache named "dlm_mhandle" with objects of the size of
 * struct dlm_mhandle. Returns whatever kmem_cache_create() returns.
 */
struct kmem_cache *dlm_midcomms_cache_create(void)
{
	return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
				 0, 0, NULL);
}

/* Map a DLM_* termination state to a printable name, "UNKNOWN" for any
 * value which is not one of the defined states.
 */
static inline const char *dlm_state_str(int state)
{
	switch (state) {
	case DLM_CLOSED:
		return "CLOSED";
	case DLM_ESTABLISHED:
		return "ESTABLISHED";
	case DLM_FIN_WAIT1:
		return "FIN_WAIT1";
	case DLM_FIN_WAIT2:
		return "FIN_WAIT2";
	case DLM_CLOSE_WAIT:
		return "CLOSE_WAIT";
	case DLM_LAST_ACK:
		return "LAST_ACK";
	case DLM_CLOSING:
		return "CLOSING";
	default:
		return "UNKNOWN";
	}
}

/* Accessor: printable name of the node's current termination state. The
 * state is read without taking node->state_lock.
 */
const char *dlm_midcomms_state(struct midcomms_node *node)
{
	return dlm_state_str(node->state);
}

/* Accessor: raw DLM_NODE_FLAG_* bitmask of the node. */
unsigned long dlm_midcomms_flags(struct midcomms_node *node)
{
	return node->flags;
}

/* Accessor: current number of messages on the node's send queue. */
int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
{
	return atomic_read(&node->send_queue_cnt);
}

/* Accessor: detected protocol version, DLM_VERSION_NOT_SET if none yet. */
uint32_t dlm_midcomms_version(struct midcomms_node *node)
{
	return node->version;
}

/* Search hash bucket @r for a node with the given @nodeid. Returns the node
 * or NULL if there is none. The bucket is walked with the RCU list iterator.
 * NOTE(review): callers appear to hold nodes_srcu or nodes_lock - confirm.
 */
static struct midcomms_node *__find_node(int nodeid, int r)
{
	struct midcomms_node *node;

	hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
		if (node->nodeid == nodeid)
			return node;
	}

	return NULL;
}

/* RCU callback of dlm_mhandle_delete(): drop the reference on the lowcomms
 * message and free the mhandle itself.
 */
static void dlm_mhandle_release(struct rcu_head *rcu)
{
	struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);

	dlm_lowcomms_put_msg(mh->msg);
	dlm_free_mhandle(mh);
}

/* Unlink @mh from the send queue of @node, adjust the queue counter and
 * defer the release via call_rcu() so that concurrent RCU readers of the
 * queue can still access the entry. Both callers in this file hold
 * node->send_queue_lock.
 */
static void dlm_mhandle_delete(struct midcomms_node *node,
			       struct dlm_mhandle *mh)
{
	list_del_rcu(&mh->list);
	atomic_dec(&node->send_queue_cnt);
	call_rcu(&mh->rcu, dlm_mhandle_release);
}

/* Remove every message from the send queue of @node, acknowledged or not. */
static void dlm_send_queue_flush(struct midcomms_node *node)
{
	struct dlm_mhandle *mh;

	pr_debug("flush midcomms send queue of node %d\n", node->nodeid);

	rcu_read_lock();
	spin_lock_bh(&node->send_queue_lock);
	/* list_del_rcu() keeps the forward pointer of the deleted entry
	 * intact, so the walk can continue after deleting the current entry
	 */
	list_for_each_entry_rcu(mh, &node->send_queue, list) {
		dlm_mhandle_delete(node, mh);
	}
	spin_unlock_bh(&node->send_queue_lock);
	rcu_read_unlock();
}

/* Put @node back into its initial state: sequence numbers, the ack counter,
 * version and flags are reset, the send queue is flushed, the state becomes
 * DLM_CLOSED and everybody sleeping on shutdown_wait is woken up.
 */
static void midcomms_node_reset(struct midcomms_node *node)
{
	pr_debug("reset node %d\n", node->nodeid);

	atomic_set(&node->seq_next, DLM_SEQ_INIT);
	atomic_set(&node->seq_send, DLM_SEQ_INIT);
	atomic_set(&node->ulp_delivered, 0);
	node->version = DLM_VERSION_NOT_SET;
	node->flags = 0;

	dlm_send_queue_flush(node);
	node->state = DLM_CLOSED;
	wake_up(&node->shutdown_wait);
}

/* Look up the node for @nodeid. If there is none and @alloc is non-zero a new
 * node is allocated with these gfp flags, initialized and added to node_hash.
 * With @alloc == 0 this is a pure lookup.
 *
 * Returns the node, or NULL if it does not exist (pure lookup) or the
 * allocation failed.
 */
static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
{
	struct midcomms_node *node, *tmp;
	int r = nodeid_hash(nodeid);

	node = __find_node(nodeid, r);
	if (node || !alloc)
		return node;

	node = kmalloc(sizeof(*node), alloc);
	if (!node)
		return NULL;

	/* kmalloc() does not zero the memory; the lock, queue and waitqueue
	 * must be set up before midcomms_node_reset() makes use of them
	 */
	node->nodeid = nodeid;
	spin_lock_init(&node->state_lock);
	spin_lock_init(&node->send_queue_lock);
	atomic_set(&node->send_queue_cnt, 0);
	INIT_LIST_HEAD(&node->send_queue);
	init_waitqueue_head(&node->shutdown_wait);
	node->users = 0;
	midcomms_node_reset(node);

	spin_lock(&nodes_lock);
	/* check again if there was somebody else
	 * earlier here to add the node
	 */
	tmp = __find_node(nodeid, r);
	if (tmp) {
		spin_unlock(&nodes_lock);
		/* our node was never published, a plain kfree() is fine */
		kfree(node);
		return tmp;
	}

	hlist_add_head_rcu(&node->hlist, &node_hash[r]);
	spin_unlock(&nodes_lock);

	node->debugfs = dlm_create_debug_comms_file(nodeid, node);
	return node;
}

/* Send a DLM_ACK message carrying @seq to @nodeid. The message consists of a
 * bare struct dlm_header and is handed directly to lowcomms, it is not put
 * on a send queue.
 *
 * Returns 0 on success or -ENOMEM if no lowcomms message could be allocated.
 */
static int dlm_send_ack(int nodeid, uint32_t seq)
{
	int mb_len = sizeof(struct dlm_header);
	struct dlm_header *m_header;
	struct dlm_msg *msg;
	char *ppc;

	msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_ATOMIC, &ppc,
				   NULL, NULL);
	if (!msg)
		return -ENOMEM;

	m_header = (struct dlm_header *)ppc;

	m_header->h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	m_header->h_nodeid = cpu_to_le32(dlm_our_nodeid());
	m_header->h_length = cpu_to_le16(mb_len);
	m_header->h_cmd = DLM_ACK;
	m_header->u.h_seq = cpu_to_le32(seq);

	dlm_lowcomms_commit_msg(msg);
	dlm_lowcomms_put_msg(msg);

	return 0;
}

/* Send an ack with the current node->seq_next if more than @threshold
 * messages were delivered since the delivery counter was reset the last time.
 */
static void dlm_send_ack_threshold(struct midcomms_node *node,
				   uint32_t threshold)
{
	uint32_t oval, nval;
	bool send_ack;

	/* let only one user trigger the threshold to send an ack back */
	do {
		oval = atomic_read(&node->ulp_delivered);
		send_ack = (oval > threshold);
		/* abort if threshold is not reached */
		if (!send_ack)
			break;

		nval = 0;
		/* try to reset ulp_delivered counter */
	} while (atomic_cmpxchg(&node->ulp_delivered, oval, nval) != oval);

	if (send_ack)
		dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
}

/* Queue a DLM_FIN message for @node. @ack_rcv is stored in the mhandle and is
 * called by dlm_receive_ack() once the FIN got acknowledged.
 *
 * Returns 0 on success or -ENOMEM if no mhandle could be obtained.
 */
static int dlm_send_fin(struct midcomms_node *node,
			void (*ack_rcv)(struct midcomms_node *node))
{
	int mb_len = sizeof(struct dlm_header);
	struct dlm_header *m_header;
	struct dlm_mhandle *mh;
	char *ppc;

	mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_ATOMIC, &ppc);
	if (!mh)
		return -ENOMEM;

	/* set only after the mhandle was obtained, dlm_midcomms_get_mhandle()
	 * warns if the flag is already set
	 */
	set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
	mh->ack_rcv = ack_rcv;

	m_header = (struct dlm_header *)ppc;

	m_header->h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	m_header->h_nodeid = cpu_to_le32(dlm_our_nodeid());
	m_header->h_length = cpu_to_le16(mb_len);
	m_header->h_cmd = DLM_FIN;

	pr_debug("sending fin msg to node %d\n", node->nodeid);
	dlm_midcomms_commit_mhandle(mh, NULL, 0);

	return 0;
}

/* Handle an ack for @seq received from @node: every queued message with a
 * sequence number before @seq is acknowledged. The first pass runs the
 * ack_rcv callbacks without holding send_queue_lock, the second pass removes
 * the acknowledged messages from the send queue under the lock.
 */
static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
{
	struct dlm_mhandle *mh;

	rcu_read_lock();
	list_for_each_entry_rcu(mh, &node->send_queue, list) {
		if (before(mh->seq, seq)) {
			if (mh->ack_rcv)
				mh->ack_rcv(node);
		} else {
			/* send queue should be ordered */
			break;
		}
	}

	spin_lock_bh(&node->send_queue_lock);
	list_for_each_entry_rcu(mh, &node->send_queue, list) {
		if (before(mh->seq, seq)) {
			dlm_mhandle_delete(node, mh);
		} else {
			/* send queue should be ordered */
			break;
		}
	}
	spin_unlock_bh(&node->send_queue_lock);
	rcu_read_unlock();
}

/* ack_rcv callback for the FIN which is sent in the passive close case: in
 * DLM_LAST_ACK the acknowledged FIN completes the termination and the node is
 * reset to DLM_CLOSED.
 */
static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
{
	spin_lock(&node->state_lock);
	pr_debug("receive passive fin ack from node %d with state %s\n",
		 node->nodeid, dlm_state_str(node->state));

	switch (node->state) {
	case DLM_LAST_ACK:
		/* DLM_CLOSED */
		midcomms_node_reset(node);
		break;
	case DLM_CLOSED:
		/* not valid but somehow we got what we want */
		wake_up(&node->shutdown_wait);
		break;
	default:
		spin_unlock(&node->state_lock);
		log_print("%s: unexpected state: %d",
			  __func__, node->state);
		WARN_ON_ONCE(1);
		return;
	}
	spin_unlock(&node->state_lock);
}

/* Emit the receive tracepoint matching the command of packet @p, nothing is
 * traced for commands other than DLM_MSG and DLM_RCOM.
 */
static void dlm_receive_buffer_3_2_trace(uint32_t seq, union dlm_packet *p)
{
	switch (p->header.h_cmd) {
	case DLM_MSG:
		trace_dlm_recv_message(dlm_our_nodeid(), seq, &p->message);
		break;
	case DLM_RCOM:
		trace_dlm_recv_rcom(dlm_our_nodeid(), seq, &p->rcom);
		break;
	default:
		break;
	}
}

/* Process the inner packet @p which was received from @node with sequence
 * number @seq. Only the packet with the expected sequence number is
 * processed: a DLM_FIN drives the termination state machine, everything else
 * is delivered to dlm_receive_buffer(). Packets with any other sequence
 * number are ignored.
 */
static void dlm_midcomms_receive_buffer(union dlm_packet *p,
					struct midcomms_node *node,
					uint32_t seq)
{
	bool is_expected_seq;
	uint32_t oval, nval;

	/* advance seq_next only if seq is the expected one; the cmpxchg
	 * loop makes sure that only one receiver wins for a given seq
	 */
	do {
		oval = atomic_read(&node->seq_next);
		is_expected_seq = (oval == seq);
		if (!is_expected_seq)
			break;

		nval = oval + 1;
	} while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);

	if (is_expected_seq) {
		switch (p->header.h_cmd) {
		case DLM_FIN:
			spin_lock(&node->state_lock);
			pr_debug("receive fin msg from node %d with state %s\n",
				 node->nodeid, dlm_state_str(node->state));

			switch (node->state) {
			case DLM_ESTABLISHED:
				dlm_send_ack(node->nodeid, nval);

				/* passive shutdown DLM_LAST_ACK case 1
				 * additionally we check if the node is used by
				 * cluster manager events at all.
				 */
				if (node->users == 0) {
					node->state = DLM_LAST_ACK;
					pr_debug("switch node %d to state %s case 1\n",
						 node->nodeid, dlm_state_str(node->state));
					set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
					dlm_send_fin(node, dlm_pas_fin_ack_rcv);
				} else {
					node->state = DLM_CLOSE_WAIT;
					pr_debug("switch node %d to state %s\n",
						 node->nodeid, dlm_state_str(node->state));
				}
				break;
			case DLM_FIN_WAIT1:
				dlm_send_ack(node->nodeid, nval);
				node->state = DLM_CLOSING;
				set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
				pr_debug("switch node %d to state %s\n",
					 node->nodeid, dlm_state_str(node->state));
				break;
			case DLM_FIN_WAIT2:
				dlm_send_ack(node->nodeid, nval);
				midcomms_node_reset(node);
				pr_debug("switch node %d to state %s\n",
					 node->nodeid, dlm_state_str(node->state));
				break;
			case DLM_LAST_ACK:
				/* probably remove_member caught it, do nothing */
				break;
			default:
				spin_unlock(&node->state_lock);
				log_print("%s: unexpected state: %d",
					  __func__, node->state);
				WARN_ON_ONCE(1);
				return;
			}
			spin_unlock(&node->state_lock);
			break;
		default:
			WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
			dlm_receive_buffer_3_2_trace(seq, p);
			dlm_receive_buffer(p, node->nodeid);
			atomic_inc(&node->ulp_delivered);
			/* unlikely case to send ack back when we don't transmit */
			dlm_send_ack_threshold(node, DLM_RECV_ACK_BACK_MSG_THRESHOLD);
			break;
		}
	} else {
		/* retry to ack message which we already have by sending back
		 * current node->seq_next number as ack.
596 */ 597 if (seq < oval) 598 dlm_send_ack(node->nodeid, oval); 599 600 log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d", 601 seq, oval, node->nodeid); 602 } 603 } 604 605 static struct midcomms_node * 606 dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p, 607 uint16_t msglen, int (*cb)(struct midcomms_node *node)) 608 { 609 struct midcomms_node *node = NULL; 610 gfp_t allocation = 0; 611 int ret; 612 613 switch (p->header.h_cmd) { 614 case DLM_RCOM: 615 if (msglen < sizeof(struct dlm_rcom)) { 616 log_print("rcom msg too small: %u, will skip this message from node %d", 617 msglen, nodeid); 618 return NULL; 619 } 620 621 switch (p->rcom.rc_type) { 622 case cpu_to_le32(DLM_RCOM_NAMES): 623 fallthrough; 624 case cpu_to_le32(DLM_RCOM_NAMES_REPLY): 625 fallthrough; 626 case cpu_to_le32(DLM_RCOM_STATUS): 627 fallthrough; 628 case cpu_to_le32(DLM_RCOM_STATUS_REPLY): 629 node = nodeid2node(nodeid, 0); 630 if (node) { 631 spin_lock(&node->state_lock); 632 if (node->state != DLM_ESTABLISHED) 633 pr_debug("receive begin RCOM msg from node %d with state %s\n", 634 node->nodeid, dlm_state_str(node->state)); 635 636 switch (node->state) { 637 case DLM_CLOSED: 638 node->state = DLM_ESTABLISHED; 639 pr_debug("switch node %d to state %s\n", 640 node->nodeid, dlm_state_str(node->state)); 641 break; 642 case DLM_ESTABLISHED: 643 break; 644 default: 645 spin_unlock(&node->state_lock); 646 return NULL; 647 } 648 spin_unlock(&node->state_lock); 649 } 650 651 allocation = GFP_NOFS; 652 break; 653 default: 654 break; 655 } 656 657 break; 658 default: 659 break; 660 } 661 662 node = nodeid2node(nodeid, allocation); 663 if (!node) { 664 switch (p->header.h_cmd) { 665 case DLM_OPTS: 666 if (msglen < sizeof(struct dlm_opts)) { 667 log_print("opts msg too small: %u, will skip this message from node %d", 668 msglen, nodeid); 669 return NULL; 670 } 671 672 log_print_ratelimited("received dlm opts message nextcmd %d from node %d in 
an invalid sequence", 673 p->opts.o_nextcmd, nodeid); 674 break; 675 default: 676 log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence", 677 p->header.h_cmd, nodeid); 678 break; 679 } 680 681 return NULL; 682 } 683 684 ret = cb(node); 685 if (ret < 0) 686 return NULL; 687 688 return node; 689 } 690 691 static int dlm_midcomms_version_check_3_2(struct midcomms_node *node) 692 { 693 switch (node->version) { 694 case DLM_VERSION_NOT_SET: 695 node->version = DLM_VERSION_3_2; 696 wake_up(&node->shutdown_wait); 697 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2, 698 node->nodeid); 699 break; 700 case DLM_VERSION_3_2: 701 break; 702 default: 703 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", 704 DLM_VERSION_3_2, node->nodeid, node->version); 705 return -1; 706 } 707 708 return 0; 709 } 710 711 static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid) 712 { 713 int len = msglen; 714 715 /* we only trust outer header msglen because 716 * it's checked against receive buffer length. 
717 */ 718 if (len < sizeof(struct dlm_opts)) 719 return -1; 720 len -= sizeof(struct dlm_opts); 721 722 if (len < le16_to_cpu(p->opts.o_optlen)) 723 return -1; 724 len -= le16_to_cpu(p->opts.o_optlen); 725 726 switch (p->opts.o_nextcmd) { 727 case DLM_FIN: 728 if (len < sizeof(struct dlm_header)) { 729 log_print("fin too small: %d, will skip this message from node %d", 730 len, nodeid); 731 return -1; 732 } 733 734 break; 735 case DLM_MSG: 736 if (len < sizeof(struct dlm_message)) { 737 log_print("msg too small: %d, will skip this message from node %d", 738 msglen, nodeid); 739 return -1; 740 } 741 742 break; 743 case DLM_RCOM: 744 if (len < sizeof(struct dlm_rcom)) { 745 log_print("rcom msg too small: %d, will skip this message from node %d", 746 len, nodeid); 747 return -1; 748 } 749 750 break; 751 default: 752 log_print("unsupported o_nextcmd received: %u, will skip this message from node %d", 753 p->opts.o_nextcmd, nodeid); 754 return -1; 755 } 756 757 return 0; 758 } 759 760 static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) 761 { 762 uint16_t msglen = le16_to_cpu(p->header.h_length); 763 struct midcomms_node *node; 764 uint32_t seq; 765 int ret, idx; 766 767 idx = srcu_read_lock(&nodes_srcu); 768 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, 769 dlm_midcomms_version_check_3_2); 770 if (!node) 771 goto out; 772 773 switch (p->header.h_cmd) { 774 case DLM_RCOM: 775 /* these rcom message we use to determine version. 776 * they have their own retransmission handling and 777 * are the first messages of dlm. 778 * 779 * length already checked. 
780 */ 781 switch (p->rcom.rc_type) { 782 case cpu_to_le32(DLM_RCOM_NAMES): 783 fallthrough; 784 case cpu_to_le32(DLM_RCOM_NAMES_REPLY): 785 fallthrough; 786 case cpu_to_le32(DLM_RCOM_STATUS): 787 fallthrough; 788 case cpu_to_le32(DLM_RCOM_STATUS_REPLY): 789 break; 790 default: 791 log_print("unsupported rcom type received: %u, will skip this message from node %d", 792 le32_to_cpu(p->rcom.rc_type), nodeid); 793 goto out; 794 } 795 796 WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); 797 dlm_receive_buffer(p, nodeid); 798 break; 799 case DLM_OPTS: 800 seq = le32_to_cpu(p->header.u.h_seq); 801 802 ret = dlm_opts_check_msglen(p, msglen, nodeid); 803 if (ret < 0) { 804 log_print("opts msg too small: %u, will skip this message from node %d", 805 msglen, nodeid); 806 goto out; 807 } 808 809 p = (union dlm_packet *)((unsigned char *)p->opts.o_opts + 810 le16_to_cpu(p->opts.o_optlen)); 811 812 /* recheck inner msglen just if it's not garbage */ 813 msglen = le16_to_cpu(p->header.h_length); 814 switch (p->header.h_cmd) { 815 case DLM_RCOM: 816 if (msglen < sizeof(struct dlm_rcom)) { 817 log_print("inner rcom msg too small: %u, will skip this message from node %d", 818 msglen, nodeid); 819 goto out; 820 } 821 822 break; 823 case DLM_MSG: 824 if (msglen < sizeof(struct dlm_message)) { 825 log_print("inner msg too small: %u, will skip this message from node %d", 826 msglen, nodeid); 827 goto out; 828 } 829 830 break; 831 case DLM_FIN: 832 if (msglen < sizeof(struct dlm_header)) { 833 log_print("inner fin too small: %u, will skip this message from node %d", 834 msglen, nodeid); 835 goto out; 836 } 837 838 break; 839 default: 840 log_print("unsupported inner h_cmd received: %u, will skip this message from node %d", 841 msglen, nodeid); 842 goto out; 843 } 844 845 dlm_midcomms_receive_buffer(p, node, seq); 846 break; 847 case DLM_ACK: 848 seq = le32_to_cpu(p->header.u.h_seq); 849 dlm_receive_ack(node, seq); 850 break; 851 default: 852 log_print("unsupported h_cmd 
received: %u, will skip this message from node %d", 853 p->header.h_cmd, nodeid); 854 break; 855 } 856 857 out: 858 srcu_read_unlock(&nodes_srcu, idx); 859 } 860 861 static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) 862 { 863 switch (node->version) { 864 case DLM_VERSION_NOT_SET: 865 node->version = DLM_VERSION_3_1; 866 wake_up(&node->shutdown_wait); 867 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1, 868 node->nodeid); 869 break; 870 case DLM_VERSION_3_1: 871 break; 872 default: 873 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", 874 DLM_VERSION_3_1, node->nodeid, node->version); 875 return -1; 876 } 877 878 return 0; 879 } 880 881 static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid) 882 { 883 uint16_t msglen = le16_to_cpu(p->header.h_length); 884 struct midcomms_node *node; 885 int idx; 886 887 idx = srcu_read_lock(&nodes_srcu); 888 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, 889 dlm_midcomms_version_check_3_1); 890 if (!node) { 891 srcu_read_unlock(&nodes_srcu, idx); 892 return; 893 } 894 srcu_read_unlock(&nodes_srcu, idx); 895 896 switch (p->header.h_cmd) { 897 case DLM_RCOM: 898 /* length already checked */ 899 break; 900 case DLM_MSG: 901 if (msglen < sizeof(struct dlm_message)) { 902 log_print("msg too small: %u, will skip this message from node %d", 903 msglen, nodeid); 904 return; 905 } 906 907 break; 908 default: 909 log_print("unsupported h_cmd received: %u, will skip this message from node %d", 910 p->header.h_cmd, nodeid); 911 return; 912 } 913 914 dlm_receive_buffer(p, nodeid); 915 } 916 917 int dlm_validate_incoming_buffer(int nodeid, unsigned char *buf, int len) 918 { 919 const unsigned char *ptr = buf; 920 const struct dlm_header *hd; 921 uint16_t msglen; 922 int ret = 0; 923 924 while (len >= sizeof(struct dlm_header)) { 925 hd = (struct dlm_header *)ptr; 926 927 /* no message should be more than DLM_MAX_SOCKET_BUFSIZE or 928 * less 
than dlm_header size. 929 * 930 * Some messages does not have a 8 byte length boundary yet 931 * which can occur in a unaligned memory access of some dlm 932 * messages. However this problem need to be fixed at the 933 * sending side, for now it seems nobody run into architecture 934 * related issues yet but it slows down some processing. 935 * Fixing this issue should be scheduled in future by doing 936 * the next major version bump. 937 */ 938 msglen = le16_to_cpu(hd->h_length); 939 if (msglen > DLM_MAX_SOCKET_BUFSIZE || 940 msglen < sizeof(struct dlm_header)) { 941 log_print("received invalid length header: %u from node %d, will abort message parsing", 942 msglen, nodeid); 943 return -EBADMSG; 944 } 945 946 /* caller will take care that leftover 947 * will be parsed next call with more data 948 */ 949 if (msglen > len) 950 break; 951 952 ret += msglen; 953 len -= msglen; 954 ptr += msglen; 955 } 956 957 return ret; 958 } 959 960 /* 961 * Called from the low-level comms layer to process a buffer of 962 * commands. 
 *
 * Every complete message in the buffer is dispatched according to the
 * version in its header. Returns the number of bytes which were consumed,
 * an incomplete message at the end of the buffer is left unprocessed.
 *
 * NOTE(review): msglen is not checked against sizeof(struct dlm_header)
 * here, presumably the buffer went through dlm_validate_incoming_buffer()
 * before - confirm against the caller.
 */
int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
	const unsigned char *ptr = buf;
	const struct dlm_header *hd;
	uint16_t msglen;
	int ret = 0;

	while (len >= sizeof(struct dlm_header)) {
		hd = (struct dlm_header *)ptr;

		msglen = le16_to_cpu(hd->h_length);
		if (msglen > len)
			break;

		switch (hd->h_version) {
		case cpu_to_le32(DLM_VERSION_3_1):
			dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
			break;
		case cpu_to_le32(DLM_VERSION_3_2):
			dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
			break;
		default:
			log_print("received invalid version header: %u from node %d, will skip this message",
				  le32_to_cpu(hd->h_version), nodeid);
			break;
		}

		ret += msglen;
		len -= msglen;
		ptr += msglen;
	}

	return ret;
}

/* Hand every committed message which is still on the send queue of @nodeid,
 * i.e. which was not acknowledged yet, to lowcomms again. Does nothing if the
 * node is unknown or does not speak DLM_VERSION_3_2.
 */
void dlm_midcomms_unack_msg_resend(int nodeid)
{
	struct midcomms_node *node;
	struct dlm_mhandle *mh;
	int idx, ret;

	idx = srcu_read_lock(&nodes_srcu);
	node = nodeid2node(nodeid, 0);
	if (!node) {
		srcu_read_unlock(&nodes_srcu, idx);
		return;
	}

	/* old protocol, we don't support to retransmit on failure */
	switch (node->version) {
	case DLM_VERSION_3_2:
		break;
	default:
		srcu_read_unlock(&nodes_srcu, idx);
		return;
	}

	rcu_read_lock();
	list_for_each_entry_rcu(mh, &node->send_queue, list) {
		/* queued by get_mhandle but not committed yet */
		if (!mh->committed)
			continue;

		ret = dlm_lowcomms_resend_msg(mh->msg);
		if (!ret)
			log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d",
					      mh->seq, node->nodeid);
	}
	rcu_read_unlock();
	srcu_read_unlock(&nodes_srcu, idx);
}

/* Fill the outer DLM_OPTS header of a message. The header length covers the
 * opts header (DLM_MIDCOMMS_OPT_LEN) plus @inner_len bytes of inner message,
 * @seq is the sequence number of the message.
 */
static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
				 uint32_t seq)
{
	opts->o_header.h_cmd = DLM_OPTS;
	opts->o_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	opts->o_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
	opts->o_header.h_length = cpu_to_le16(DLM_MIDCOMMS_OPT_LEN + inner_len);
	opts->o_header.u.h_seq = cpu_to_le32(seq);
}

/* Callback which is handed to dlm_lowcomms_new_msg() together with the
 * mhandle as @data: puts the mhandle at the tail of the node's send queue and
 * assigns the next send sequence number to it.
 */
static void midcomms_new_msg_cb(void *data)
{
	struct dlm_mhandle *mh = data;

	atomic_inc(&mh->node->send_queue_cnt);

	spin_lock_bh(&mh->node->send_queue_lock);
	list_add_tail_rcu(&mh->list, &mh->node->send_queue);
	spin_unlock_bh(&mh->node->send_queue_lock);

	mh->seq = atomic_fetch_inc(&mh->node->seq_send);
}

/* Allocate a lowcomms message for a DLM_VERSION_3_2 node with room for the
 * opts header in front of @len bytes of inner message. The opts header is
 * filled and *@ppc is advanced so that it points to the inner message.
 *
 * Returns the lowcomms message or NULL if it could not be allocated.
 */
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
						int len, gfp_t allocation, char **ppc)
{
	struct dlm_opts *opts;
	struct dlm_msg *msg;

	msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
				   allocation, ppc, midcomms_new_msg_cb, mh);
	if (!msg)
		return NULL;

	opts = (struct dlm_opts *)*ppc;
	mh->opts = opts;

	/* add possible options here */
	dlm_fill_opts_header(opts, len, mh->seq);

	*ppc += sizeof(*opts);
	mh->inner_p = (const union dlm_packet *)*ppc;
	return msg;
}

/* avoid false positive for nodes_srcu, unlock happens in
 * dlm_midcomms_commit_mhandle which is a must call if success
 */
#ifndef __CHECKER__
/* Get a message handle for a message of @len bytes to @nodeid. On success
 * *@ppc points to the buffer the caller has to fill with the message.
 *
 * Returns the handle or NULL if the node is unknown, has no supported version
 * set or an allocation failed. On success nodes_srcu is left locked until
 * dlm_midcomms_commit_mhandle() is called for the handle.
 */
struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
					     gfp_t allocation, char **ppc)
{
	struct midcomms_node *node;
	struct dlm_mhandle *mh;
	struct dlm_msg *msg;
	int idx;

	idx = srcu_read_lock(&nodes_srcu);
	node = nodeid2node(nodeid, 0);
	if (!node) {
		WARN_ON_ONCE(1);
		goto err;
	}

	/* this is a bug, however we go on and hope it will be resolved */
	WARN_ON_ONCE(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));

	mh = dlm_allocate_mhandle(allocation);
	if (!mh)
		goto err;

	mh->committed = false;
	mh->ack_rcv = NULL;
	mh->idx = idx;
	mh->node = node;

	switch (node->version) {
	case DLM_VERSION_3_1:
		msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
					   NULL, NULL);
		if (!msg) {
			dlm_free_mhandle(mh);
			goto err;
		}

		break;
	case DLM_VERSION_3_2:
		msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
					       ppc);
		if (!msg) {
			dlm_free_mhandle(mh);
			goto err;
		}

		/* send ack back if necessary */
		dlm_send_ack_threshold(node, DLM_SEND_ACK_BACK_MSG_THRESHOLD);
		break;
	default:
		dlm_free_mhandle(mh);
		WARN_ON_ONCE(1);
		goto err;
	}

	mh->msg = msg;

	/* keep in mind that it is a must to call
	 * dlm_midcomms_commit_mhandle() which releases
	 * nodes_srcu using mh->idx, it is assumed
	 * here that the application will call it.
	 */
	return mh;

err:
	srcu_read_unlock(&nodes_srcu, idx);
	return NULL;
}
#endif

/* Emit the send tracepoint matching the command of the inner message of
 * @mh, nothing is traced for commands other than DLM_MSG and DLM_RCOM.
 */
static void dlm_midcomms_commit_msg_3_2_trace(const struct dlm_mhandle *mh,
					      const void *name, int namelen)
{
	switch (mh->inner_p->header.h_cmd) {
	case DLM_MSG:
		trace_dlm_send_message(mh->node->nodeid, mh->seq,
				       &mh->inner_p->message,
				       name, namelen);
		break;
	case DLM_RCOM:
		trace_dlm_send_rcom(mh->node->nodeid, mh->seq,
				    &mh->inner_p->rcom);
		break;
	default:
		/* nothing to trace */
		break;
	}
}

/* Commit a DLM_VERSION_3_2 message: the command of the inner message is
 * copied into the opts header, the handle is marked as committed and the
 * message is handed to lowcomms.
 */
static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh,
					const void *name, int namelen)
{
	/* nexthdr chain for fast lookup */
	mh->opts->o_nextcmd = mh->inner_p->header.h_cmd;
	mh->committed = true;
	dlm_midcomms_commit_msg_3_2_trace(mh, name, namelen);
	dlm_lowcomms_commit_msg(mh->msg);
}

/* avoid false positive for nodes_srcu, lock was happen in
 * dlm_midcomms_get_mhandle
 */
#ifndef __CHECKER__
/* Commit a handle which was obtained by dlm_midcomms_get_mhandle() and
 * release nodes_srcu which was taken there. @name and @namelen are only
 * passed on to the send tracepoint.
 */
void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh,
				 const void *name, int namelen)
{

	switch (mh->node->version) {
	case DLM_VERSION_3_1:
		srcu_read_unlock(&nodes_srcu, mh->idx);

		dlm_lowcomms_commit_msg(mh->msg);
		dlm_lowcomms_put_msg(mh->msg);
		/* mh is not part of rcu list in this case */
		dlm_free_mhandle(mh);
		break;
	case DLM_VERSION_3_2:
		/* hold the rcu read lock here, because we are sending the
		 * dlm message out, when we do that we could receive
		 * an ack back which releases the mhandle and we
		 * get a use after free.
		 */
		rcu_read_lock();
		dlm_midcomms_commit_msg_3_2(mh, name, namelen);
		srcu_read_unlock(&nodes_srcu, mh->idx);
		rcu_read_unlock();
		break;
	default:
		srcu_read_unlock(&nodes_srcu, mh->idx);
		WARN_ON_ONCE(1);
		break;
	}
}
#endif

/* Start the midcomms layer, this only starts lowcomms. */
int dlm_midcomms_start(void)
{
	return dlm_lowcomms_start();
}

/* Stop the midcomms layer, this only stops lowcomms. */
void dlm_midcomms_stop(void)
{
	dlm_lowcomms_stop();
}

/* Initialize the node hash buckets and lowcomms. */
void dlm_midcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&node_hash[i]);

	dlm_lowcomms_init();
}

/* Tear down the midcomms layer, this only calls the lowcomms exit handler. */
void dlm_midcomms_exit(void)
{
	dlm_lowcomms_exit();
}

static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
{
	spin_lock(&node->state_lock);
	pr_debug("receive active fin ack from node %d with state %s\n",
		 node->nodeid, dlm_state_str(node->state));

	switch (node->state) {
	case DLM_FIN_WAIT1:
		node->state = DLM_FIN_WAIT2;
		pr_debug("switch node %d to state %s\n",
			 node->nodeid, dlm_state_str(node->state));
		break;
	case DLM_CLOSING:
		midcomms_node_reset(node);
		pr_debug("switch node %d to state %s\n",
			 node->nodeid, dlm_state_str(node->state));
		break;
	case DLM_CLOSED:
		/* not valid but somehow we got what we want */
		wake_up(&node->shutdown_wait);
		break;
	default:
1265 spin_unlock(&node->state_lock); 1266 log_print("%s: unexpected state: %d", 1267 __func__, node->state); 1268 WARN_ON_ONCE(1); 1269 return; 1270 } 1271 spin_unlock(&node->state_lock); 1272 } 1273 1274 void dlm_midcomms_add_member(int nodeid) 1275 { 1276 struct midcomms_node *node; 1277 int idx; 1278 1279 idx = srcu_read_lock(&nodes_srcu); 1280 node = nodeid2node(nodeid, GFP_NOFS); 1281 if (!node) { 1282 srcu_read_unlock(&nodes_srcu, idx); 1283 return; 1284 } 1285 1286 spin_lock(&node->state_lock); 1287 if (!node->users) { 1288 pr_debug("receive add member from node %d with state %s\n", 1289 node->nodeid, dlm_state_str(node->state)); 1290 switch (node->state) { 1291 case DLM_ESTABLISHED: 1292 break; 1293 case DLM_CLOSED: 1294 node->state = DLM_ESTABLISHED; 1295 pr_debug("switch node %d to state %s\n", 1296 node->nodeid, dlm_state_str(node->state)); 1297 break; 1298 default: 1299 /* some invalid state passive shutdown 1300 * was failed, we try to reset and 1301 * hope it will go on. 1302 */ 1303 log_print("reset node %d because shutdown stuck", 1304 node->nodeid); 1305 1306 midcomms_node_reset(node); 1307 node->state = DLM_ESTABLISHED; 1308 break; 1309 } 1310 } 1311 1312 node->users++; 1313 pr_debug("node %d users inc count %d\n", nodeid, node->users); 1314 spin_unlock(&node->state_lock); 1315 1316 srcu_read_unlock(&nodes_srcu, idx); 1317 } 1318 1319 void dlm_midcomms_remove_member(int nodeid) 1320 { 1321 struct midcomms_node *node; 1322 int idx; 1323 1324 idx = srcu_read_lock(&nodes_srcu); 1325 node = nodeid2node(nodeid, 0); 1326 if (!node) { 1327 srcu_read_unlock(&nodes_srcu, idx); 1328 return; 1329 } 1330 1331 spin_lock(&node->state_lock); 1332 node->users--; 1333 pr_debug("node %d users dec count %d\n", nodeid, node->users); 1334 1335 /* hitting users count to zero means the 1336 * other side is running dlm_midcomms_stop() 1337 * we meet us to have a clean disconnect. 
1338 */ 1339 if (node->users == 0) { 1340 pr_debug("receive remove member from node %d with state %s\n", 1341 node->nodeid, dlm_state_str(node->state)); 1342 switch (node->state) { 1343 case DLM_ESTABLISHED: 1344 break; 1345 case DLM_CLOSE_WAIT: 1346 /* passive shutdown DLM_LAST_ACK case 2 */ 1347 node->state = DLM_LAST_ACK; 1348 pr_debug("switch node %d to state %s case 2\n", 1349 node->nodeid, dlm_state_str(node->state)); 1350 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); 1351 dlm_send_fin(node, dlm_pas_fin_ack_rcv); 1352 break; 1353 case DLM_LAST_ACK: 1354 /* probably receive fin caught it, do nothing */ 1355 break; 1356 case DLM_CLOSED: 1357 /* already gone, do nothing */ 1358 break; 1359 default: 1360 log_print("%s: unexpected state: %d", 1361 __func__, node->state); 1362 break; 1363 } 1364 } 1365 spin_unlock(&node->state_lock); 1366 1367 srcu_read_unlock(&nodes_srcu, idx); 1368 } 1369 1370 static void midcomms_node_release(struct rcu_head *rcu) 1371 { 1372 struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu); 1373 1374 WARN_ON_ONCE(atomic_read(&node->send_queue_cnt)); 1375 dlm_send_queue_flush(node); 1376 kfree(node); 1377 } 1378 1379 void dlm_midcomms_version_wait(void) 1380 { 1381 struct midcomms_node *node; 1382 int i, idx, ret; 1383 1384 idx = srcu_read_lock(&nodes_srcu); 1385 for (i = 0; i < CONN_HASH_SIZE; i++) { 1386 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { 1387 ret = wait_event_timeout(node->shutdown_wait, 1388 node->version != DLM_VERSION_NOT_SET || 1389 node->state == DLM_CLOSED || 1390 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags), 1391 DLM_SHUTDOWN_TIMEOUT); 1392 if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) 1393 pr_debug("version wait timed out for node %d with state %s\n", 1394 node->nodeid, dlm_state_str(node->state)); 1395 } 1396 } 1397 srcu_read_unlock(&nodes_srcu, idx); 1398 } 1399 1400 static void midcomms_shutdown(struct midcomms_node *node) 1401 { 1402 int ret; 1403 1404 /* old protocol, we 
don't wait for pending operations */ 1405 switch (node->version) { 1406 case DLM_VERSION_3_2: 1407 break; 1408 default: 1409 return; 1410 } 1411 1412 spin_lock(&node->state_lock); 1413 pr_debug("receive active shutdown for node %d with state %s\n", 1414 node->nodeid, dlm_state_str(node->state)); 1415 switch (node->state) { 1416 case DLM_ESTABLISHED: 1417 node->state = DLM_FIN_WAIT1; 1418 pr_debug("switch node %d to state %s case 2\n", 1419 node->nodeid, dlm_state_str(node->state)); 1420 dlm_send_fin(node, dlm_act_fin_ack_rcv); 1421 break; 1422 case DLM_CLOSED: 1423 /* we have what we want */ 1424 break; 1425 default: 1426 /* busy to enter DLM_FIN_WAIT1, wait until passive 1427 * done in shutdown_wait to enter DLM_CLOSED. 1428 */ 1429 break; 1430 } 1431 spin_unlock(&node->state_lock); 1432 1433 if (DLM_DEBUG_FENCE_TERMINATION) 1434 msleep(5000); 1435 1436 /* wait for other side dlm + fin */ 1437 ret = wait_event_timeout(node->shutdown_wait, 1438 node->state == DLM_CLOSED || 1439 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags), 1440 DLM_SHUTDOWN_TIMEOUT); 1441 if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) 1442 pr_debug("active shutdown timed out for node %d with state %s\n", 1443 node->nodeid, dlm_state_str(node->state)); 1444 else 1445 pr_debug("active shutdown done for node %d with state %s\n", 1446 node->nodeid, dlm_state_str(node->state)); 1447 } 1448 1449 void dlm_midcomms_shutdown(void) 1450 { 1451 struct midcomms_node *node; 1452 int i, idx; 1453 1454 mutex_lock(&close_lock); 1455 idx = srcu_read_lock(&nodes_srcu); 1456 for (i = 0; i < CONN_HASH_SIZE; i++) { 1457 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { 1458 midcomms_shutdown(node); 1459 1460 dlm_delete_debug_comms_file(node->debugfs); 1461 1462 spin_lock(&nodes_lock); 1463 hlist_del_rcu(&node->hlist); 1464 spin_unlock(&nodes_lock); 1465 1466 call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); 1467 } 1468 } 1469 srcu_read_unlock(&nodes_srcu, idx); 1470 mutex_unlock(&close_lock); 
1471 1472 dlm_lowcomms_shutdown(); 1473 } 1474 1475 int dlm_midcomms_close(int nodeid) 1476 { 1477 struct midcomms_node *node; 1478 int idx, ret; 1479 1480 idx = srcu_read_lock(&nodes_srcu); 1481 /* Abort pending close/remove operation */ 1482 node = nodeid2node(nodeid, 0); 1483 if (node) { 1484 /* let shutdown waiters leave */ 1485 set_bit(DLM_NODE_FLAG_CLOSE, &node->flags); 1486 wake_up(&node->shutdown_wait); 1487 } 1488 srcu_read_unlock(&nodes_srcu, idx); 1489 1490 synchronize_srcu(&nodes_srcu); 1491 1492 idx = srcu_read_lock(&nodes_srcu); 1493 mutex_lock(&close_lock); 1494 node = nodeid2node(nodeid, 0); 1495 if (!node) { 1496 mutex_unlock(&close_lock); 1497 srcu_read_unlock(&nodes_srcu, idx); 1498 return dlm_lowcomms_close(nodeid); 1499 } 1500 1501 ret = dlm_lowcomms_close(nodeid); 1502 spin_lock(&node->state_lock); 1503 midcomms_node_reset(node); 1504 spin_unlock(&node->state_lock); 1505 srcu_read_unlock(&nodes_srcu, idx); 1506 mutex_unlock(&close_lock); 1507 1508 return ret; 1509 } 1510 1511 /* debug functionality to send raw dlm msg from user space */ 1512 struct dlm_rawmsg_data { 1513 struct midcomms_node *node; 1514 void *buf; 1515 }; 1516 1517 static void midcomms_new_rawmsg_cb(void *data) 1518 { 1519 struct dlm_rawmsg_data *rd = data; 1520 struct dlm_header *h = rd->buf; 1521 1522 switch (h->h_version) { 1523 case cpu_to_le32(DLM_VERSION_3_1): 1524 break; 1525 default: 1526 switch (h->h_cmd) { 1527 case DLM_OPTS: 1528 if (!h->u.h_seq) 1529 h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send)); 1530 break; 1531 default: 1532 break; 1533 } 1534 break; 1535 } 1536 } 1537 1538 int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf, 1539 int buflen) 1540 { 1541 struct dlm_rawmsg_data rd; 1542 struct dlm_msg *msg; 1543 char *msgbuf; 1544 1545 rd.node = node; 1546 rd.buf = buf; 1547 1548 msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS, 1549 &msgbuf, midcomms_new_rawmsg_cb, &rd); 1550 if (!msg) 1551 return -ENOMEM; 1552 1553 
memcpy(msgbuf, buf, buflen); 1554 dlm_lowcomms_commit_msg(msg); 1555 return 0; 1556 } 1557 1558