1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include "dlm_internal.h" 13 #include "lockspace.h" 14 #include "member.h" 15 #include "lowcomms.h" 16 #include "midcomms.h" 17 #include "rcom.h" 18 #include "recover.h" 19 #include "dir.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "util.h" 24 25 static int rcom_response(struct dlm_ls *ls) 26 { 27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags); 28 } 29 30 static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, 31 struct dlm_rcom **rc_ret, char *mb, int mb_len) 32 { 33 struct dlm_rcom *rc; 34 35 rc = (struct dlm_rcom *) mb; 36 37 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 38 rc->rc_header.u.h_lockspace = ls->ls_global_id; 39 rc->rc_header.h_nodeid = dlm_our_nodeid(); 40 rc->rc_header.h_length = mb_len; 41 rc->rc_header.h_cmd = DLM_RCOM; 42 43 rc->rc_type = type; 44 45 spin_lock(&ls->ls_recover_lock); 46 rc->rc_seq = ls->ls_recover_seq; 47 spin_unlock(&ls->ls_recover_lock); 48 49 *rc_ret = rc; 50 } 51 52 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, 53 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret) 54 { 55 int mb_len = sizeof(struct dlm_rcom) + len; 56 struct dlm_mhandle *mh; 57 char *mb; 58 59 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb); 60 if (!mh) { 61 log_print("%s to %d type %d len %d ENOBUFS", 62 __func__, to_nodeid, type, len); 63 return -ENOBUFS; 64 } 65 66 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); 67 *mh_ret = mh; 68 return 0; 69 } 70 71 static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, 72 int len, struct dlm_rcom **rc_ret, 73 struct dlm_msg **msg_ret) 74 { 75 int mb_len = sizeof(struct dlm_rcom) + len; 76 struct dlm_msg *msg; 77 char *mb; 78 79 msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, GFP_NOFS, &mb, 80 NULL, NULL); 81 if (!msg) { 82 log_print("create_rcom to %d type %d len %d ENOBUFS", 83 to_nodeid, type, len); 84 return -ENOBUFS; 85 } 86 87 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len); 88 *msg_ret = msg; 89 return 0; 90 } 91 92 static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc) 93 { 94 dlm_rcom_out(rc); 95 dlm_midcomms_commit_mhandle(mh); 96 } 97 98 static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc) 99 { 100 dlm_rcom_out(rc); 101 dlm_lowcomms_commit_msg(msg); 102 dlm_lowcomms_put_msg(msg); 103 } 104 105 static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs, 106 uint32_t flags) 107 { 108 rs->rs_flags = cpu_to_le32(flags); 109 } 110 111 /* When replying to a status request, a node also sends back its 112 configuration values. The requesting node then checks that the remote 113 node is configured the same way as itself. */ 114 115 static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf, 116 uint32_t num_slots) 117 { 118 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen); 119 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags); 120 121 rf->rf_our_slot = cpu_to_le16(ls->ls_slot); 122 rf->rf_num_slots = cpu_to_le16(num_slots); 123 rf->rf_generation = cpu_to_le32(ls->ls_generation); 124 } 125 126 static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 127 { 128 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 129 130 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) { 131 log_error(ls, "version mismatch: %x nodeid %d: %x", 132 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid, 133 rc->rc_header.h_version); 134 return -EPROTO; 135 } 136 137 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen || 138 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) { 139 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 140 ls->ls_lvblen, ls->ls_exflags, nodeid, 141 le32_to_cpu(rf->rf_lvblen), 142 le32_to_cpu(rf->rf_lsflags)); 143 return -EPROTO; 144 } 145 return 0; 146 } 147 148 static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq) 149 { 150 spin_lock(&ls->ls_rcom_spin); 151 *new_seq = ++ls->ls_rcom_seq; 152 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 153 spin_unlock(&ls->ls_rcom_spin); 154 } 155 156 static void disallow_sync_reply(struct dlm_ls *ls) 157 { 158 spin_lock(&ls->ls_rcom_spin); 159 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 160 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 161 spin_unlock(&ls->ls_rcom_spin); 162 } 163 164 /* 165 * low nodeid gathers one slot value at a time from each node. 166 * it sets need_slots=0, and saves rf_our_slot returned from each 167 * rcom_config. 168 * 169 * other nodes gather all slot values at once from the low nodeid. 170 * they set need_slots=1, and ignore the rf_our_slot returned from each 171 * rcom_config. they use the rf_num_slots returned from the low 172 * node's rcom_config. 173 */ 174 175 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags) 176 { 177 struct dlm_rcom *rc; 178 struct dlm_msg *msg; 179 int error = 0; 180 181 ls->ls_recover_nodeid = nodeid; 182 183 if (nodeid == dlm_our_nodeid()) { 184 rc = ls->ls_recover_buf; 185 rc->rc_result = dlm_recover_status(ls); 186 goto out; 187 } 188 189 retry: 190 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS, 191 sizeof(struct rcom_status), &rc, &msg); 192 if (error) 193 goto out; 194 195 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags); 196 197 allow_sync_reply(ls, &rc->rc_id); 198 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); 199 200 send_rcom_stateless(msg, rc); 201 202 error = dlm_wait_function(ls, &rcom_response); 203 disallow_sync_reply(ls); 204 if (error == -ETIMEDOUT) 205 goto retry; 206 if (error) 207 goto out; 208 209 rc = ls->ls_recover_buf; 210 211 if (rc->rc_result == -ESRCH) { 212 /* we pretend the remote lockspace exists with 0 status */ 213 log_debug(ls, "remote node %d not ready", nodeid); 214 rc->rc_result = 0; 215 error = 0; 216 } else { 217 error = check_rcom_config(ls, rc, nodeid); 218 } 219 220 /* the caller looks at rc_result for the remote recovery status */ 221 out: 222 return error; 223 } 224 225 static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) 226 { 227 struct dlm_rcom *rc; 228 struct rcom_status *rs; 229 uint32_t status; 230 int nodeid = rc_in->rc_header.h_nodeid; 231 int len = sizeof(struct rcom_config); 232 struct dlm_msg *msg; 233 int num_slots = 0; 234 int error; 235 236 if (!dlm_slots_version(&rc_in->rc_header)) { 237 status = dlm_recover_status(ls); 238 goto do_create; 239 } 240 241 rs = (struct rcom_status *)rc_in->rc_buf; 242 243 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) { 244 status = dlm_recover_status(ls); 245 goto do_create; 246 } 247 248 spin_lock(&ls->ls_recover_lock); 249 status = ls->ls_recover_status; 250 num_slots = ls->ls_num_slots; 251 spin_unlock(&ls->ls_recover_lock); 252 len += num_slots * sizeof(struct rcom_slot); 253 254 do_create: 255 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY, 256 len, &rc, &msg); 257 if (error) 258 return; 259 260 rc->rc_id = rc_in->rc_id; 261 rc->rc_seq_reply = rc_in->rc_seq; 262 rc->rc_result = status; 263 264 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots); 265 266 if (!num_slots) 267 goto do_send; 268 269 spin_lock(&ls->ls_recover_lock); 270 if (ls->ls_num_slots != num_slots) { 271 spin_unlock(&ls->ls_recover_lock); 272 log_debug(ls, "receive_rcom_status num_slots %d to %d", 273 num_slots, ls->ls_num_slots); 274 rc->rc_result = 0; 275 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0); 276 goto do_send; 277 } 278 279 dlm_slots_copy_out(ls, rc); 280 spin_unlock(&ls->ls_recover_lock); 281 282 do_send: 283 send_rcom_stateless(msg, rc); 284 } 285 286 static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 287 { 288 spin_lock(&ls->ls_rcom_spin); 289 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || 290 rc_in->rc_id != ls->ls_rcom_seq) { 291 log_debug(ls, "reject reply %d from %d seq %llx expect %llx", 292 rc_in->rc_type, rc_in->rc_header.h_nodeid, 293 (unsigned long long)rc_in->rc_id, 294 (unsigned long long)ls->ls_rcom_seq); 295 goto out; 296 } 297 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); 298 set_bit(LSFL_RCOM_READY, &ls->ls_flags); 299 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 300 wake_up(&ls->ls_wait_general); 301 out: 302 spin_unlock(&ls->ls_rcom_spin); 303 } 304 305 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) 306 { 307 struct dlm_rcom *rc; 308 struct dlm_msg *msg; 309 int error = 0; 310 311 ls->ls_recover_nodeid = nodeid; 312 313 retry: 314 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES, last_len, 315 &rc, &msg); 316 if (error) 317 goto out; 318 memcpy(rc->rc_buf, last_name, last_len); 319 320 allow_sync_reply(ls, &rc->rc_id); 321 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); 322 323 send_rcom_stateless(msg, rc); 324 325 error = dlm_wait_function(ls, &rcom_response); 326 disallow_sync_reply(ls); 327 if (error == -ETIMEDOUT) 328 goto retry; 329 out: 330 return error; 331 } 332 333 static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) 334 { 335 struct dlm_rcom *rc; 336 int error, inlen, outlen, nodeid; 337 struct dlm_msg *msg; 338 339 nodeid = rc_in->rc_header.h_nodeid; 340 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 341 outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom); 342 343 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, 344 &rc, &msg); 345 if (error) 346 return; 347 rc->rc_id = rc_in->rc_id; 348 rc->rc_seq_reply = rc_in->rc_seq; 349 350 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 351 nodeid); 352 send_rcom_stateless(msg, rc); 353 } 354 355 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) 356 { 357 struct dlm_rcom *rc; 358 struct dlm_mhandle *mh; 359 struct dlm_ls *ls = r->res_ls; 360 int error; 361 362 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, 363 &rc, &mh); 364 if (error) 365 goto out; 366 memcpy(rc->rc_buf, r->res_name, r->res_length); 367 rc->rc_id = (unsigned long) r->res_id; 368 369 send_rcom(mh, rc); 370 out: 371 return error; 372 } 373 374 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) 375 { 376 struct dlm_rcom *rc; 377 struct dlm_mhandle *mh; 378 int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid; 379 int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 380 381 /* Old code would send this special id to trigger a debug dump. */ 382 if (rc_in->rc_id == 0xFFFFFFFF) { 383 log_error(ls, "receive_rcom_lookup dump from %d", nodeid); 384 dlm_dump_rsb_name(ls, rc_in->rc_buf, len); 385 return; 386 } 387 388 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh); 389 if (error) 390 return; 391 392 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len, 393 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL); 394 if (error) 395 ret_nodeid = error; 396 rc->rc_result = ret_nodeid; 397 rc->rc_id = rc_in->rc_id; 398 rc->rc_seq_reply = rc_in->rc_seq; 399 400 send_rcom(mh, rc); 401 } 402 403 static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 404 { 405 dlm_recover_master_reply(ls, rc_in); 406 } 407 408 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, 409 struct rcom_lock *rl) 410 { 411 memset(rl, 0, sizeof(*rl)); 412 413 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid); 414 rl->rl_lkid = cpu_to_le32(lkb->lkb_id); 415 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags); 416 rl->rl_flags = cpu_to_le32(lkb->lkb_flags); 417 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); 418 rl->rl_rqmode = lkb->lkb_rqmode; 419 rl->rl_grmode = lkb->lkb_grmode; 420 rl->rl_status = lkb->lkb_status; 421 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type); 422 423 if (lkb->lkb_bastfn) 424 rl->rl_asts |= DLM_CB_BAST; 425 if (lkb->lkb_astfn) 426 rl->rl_asts |= DLM_CB_CAST; 427 428 rl->rl_namelen = cpu_to_le16(r->res_length); 429 memcpy(rl->rl_name, r->res_name, r->res_length); 430 431 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ? 432 If so, receive_rcom_lock_args() won't take this copy. */ 433 434 if (lkb->lkb_lvbptr) 435 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 436 } 437 438 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 439 { 440 struct dlm_ls *ls = r->res_ls; 441 struct dlm_rcom *rc; 442 struct dlm_mhandle *mh; 443 struct rcom_lock *rl; 444 int error, len = sizeof(struct rcom_lock); 445 446 if (lkb->lkb_lvbptr) 447 len += ls->ls_lvblen; 448 449 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh); 450 if (error) 451 goto out; 452 453 rl = (struct rcom_lock *) rc->rc_buf; 454 pack_rcom_lock(r, lkb, rl); 455 rc->rc_id = (unsigned long) r; 456 457 send_rcom(mh, rc); 458 out: 459 return error; 460 } 461 462 /* needs at least dlm_rcom + rcom_lock */ 463 static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) 464 { 465 struct dlm_rcom *rc; 466 struct dlm_mhandle *mh; 467 int error, nodeid = rc_in->rc_header.h_nodeid; 468 469 dlm_recover_master_copy(ls, rc_in); 470 471 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, 472 sizeof(struct rcom_lock), &rc, &mh); 473 if (error) 474 return; 475 476 /* We send back the same rcom_lock struct we received, but 477 dlm_recover_master_copy() has filled in rl_remid and rl_result */ 478 479 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 480 rc->rc_id = rc_in->rc_id; 481 rc->rc_seq_reply = rc_in->rc_seq; 482 483 send_rcom(mh, rc); 484 } 485 486 /* If the lockspace doesn't exist then still send a status message 487 back; it's possible that it just doesn't have its global_id yet. */ 488 489 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) 490 { 491 struct dlm_rcom *rc; 492 struct rcom_config *rf; 493 struct dlm_mhandle *mh; 494 char *mb; 495 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config); 496 497 mh = dlm_midcomms_get_mhandle(nodeid, mb_len, GFP_NOFS, &mb); 498 if (!mh) 499 return -ENOBUFS; 500 501 rc = (struct dlm_rcom *) mb; 502 503 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 504 rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace; 505 rc->rc_header.h_nodeid = dlm_our_nodeid(); 506 rc->rc_header.h_length = mb_len; 507 rc->rc_header.h_cmd = DLM_RCOM; 508 509 rc->rc_type = DLM_RCOM_STATUS_REPLY; 510 rc->rc_id = rc_in->rc_id; 511 rc->rc_seq_reply = rc_in->rc_seq; 512 rc->rc_result = -ESRCH; 513 514 rf = (struct rcom_config *) rc->rc_buf; 515 rf->rf_lvblen = cpu_to_le32(~0U); 516 517 dlm_rcom_out(rc); 518 dlm_midcomms_commit_mhandle(mh); 519 520 return 0; 521 } 522 523 /* 524 * Ignore messages for stage Y before we set 525 * recover_status bit for stage X: 526 * 527 * recover_status = 0 528 * 529 * dlm_recover_members() 530 * - send nothing 531 * - recv nothing 532 * - ignore NAMES, NAMES_REPLY 533 * - ignore LOOKUP, LOOKUP_REPLY 534 * - ignore LOCK, LOCK_REPLY 535 * 536 * recover_status |= NODES 537 * 538 * dlm_recover_members_wait() 539 * 540 * dlm_recover_directory() 541 * - send NAMES 542 * - recv NAMES_REPLY 543 * - ignore LOOKUP, LOOKUP_REPLY 544 * - ignore LOCK, LOCK_REPLY 545 * 546 * recover_status |= DIR 547 * 548 * dlm_recover_directory_wait() 549 * 550 * dlm_recover_masters() 551 * - send LOOKUP 552 * - recv LOOKUP_REPLY 553 * 554 * dlm_recover_locks() 555 * - send LOCKS 556 * - recv LOCKS_REPLY 557 * 558 * recover_status |= LOCKS 559 * 560 * dlm_recover_locks_wait() 561 * 562 * recover_status |= DONE 563 */ 564 565 /* Called by dlm_recv; corresponds to dlm_receive_message() but special 566 recovery-only comms are sent through here. */ 567 568 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 569 { 570 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 571 int stop, reply = 0, names = 0, lookup = 0, lock = 0; 572 uint32_t status; 573 uint64_t seq; 574 575 switch (rc->rc_type) { 576 case DLM_RCOM_STATUS_REPLY: 577 reply = 1; 578 break; 579 case DLM_RCOM_NAMES: 580 names = 1; 581 break; 582 case DLM_RCOM_NAMES_REPLY: 583 names = 1; 584 reply = 1; 585 break; 586 case DLM_RCOM_LOOKUP: 587 lookup = 1; 588 break; 589 case DLM_RCOM_LOOKUP_REPLY: 590 lookup = 1; 591 reply = 1; 592 break; 593 case DLM_RCOM_LOCK: 594 lock = 1; 595 break; 596 case DLM_RCOM_LOCK_REPLY: 597 lock = 1; 598 reply = 1; 599 break; 600 } 601 602 spin_lock(&ls->ls_recover_lock); 603 status = ls->ls_recover_status; 604 stop = dlm_recovery_stopped(ls); 605 seq = ls->ls_recover_seq; 606 spin_unlock(&ls->ls_recover_lock); 607 608 if (stop && (rc->rc_type != DLM_RCOM_STATUS)) 609 goto ignore; 610 611 if (reply && (rc->rc_seq_reply != seq)) 612 goto ignore; 613 614 if (!(status & DLM_RS_NODES) && (names || lookup || lock)) 615 goto ignore; 616 617 if (!(status & DLM_RS_DIR) && (lookup || lock)) 618 goto ignore; 619 620 switch (rc->rc_type) { 621 case DLM_RCOM_STATUS: 622 receive_rcom_status(ls, rc); 623 break; 624 625 case DLM_RCOM_NAMES: 626 receive_rcom_names(ls, rc); 627 break; 628 629 case DLM_RCOM_LOOKUP: 630 receive_rcom_lookup(ls, rc); 631 break; 632 633 case DLM_RCOM_LOCK: 634 if (rc->rc_header.h_length < lock_size) 635 goto Eshort; 636 receive_rcom_lock(ls, rc); 637 break; 638 639 case DLM_RCOM_STATUS_REPLY: 640 receive_sync_reply(ls, rc); 641 break; 642 643 case DLM_RCOM_NAMES_REPLY: 644 receive_sync_reply(ls, rc); 645 break; 646 647 case DLM_RCOM_LOOKUP_REPLY: 648 receive_rcom_lookup_reply(ls, rc); 649 break; 650 651 case DLM_RCOM_LOCK_REPLY: 652 if (rc->rc_header.h_length < lock_size) 653 goto Eshort; 654 dlm_recover_process_copy(ls, rc); 655 break; 656 657 default: 658 log_error(ls, "receive_rcom bad type %d", rc->rc_type); 659 } 660 return; 661 662 ignore: 663 log_limit(ls, "dlm_receive_rcom ignore msg %d " 664 "from %d %llu %llu recover seq %llu sts %x gen %u", 665 rc->rc_type, 666 nodeid, 667 (unsigned long long)rc->rc_seq, 668 (unsigned long long)rc->rc_seq_reply, 669 (unsigned long long)seq, 670 status, ls->ls_generation); 671 return; 672 Eshort: 673 log_error(ls, "recovery message %d from %d is too short", 674 rc->rc_type, nodeid); 675 } 676 677