1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include "dlm_internal.h" 13 #include "lockspace.h" 14 #include "member.h" 15 #include "lowcomms.h" 16 #include "midcomms.h" 17 #include "rcom.h" 18 #include "recover.h" 19 #include "dir.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "util.h" 24 25 static int rcom_response(struct dlm_ls *ls) 26 { 27 return test_bit(LSFL_RCOM_READY, &ls->ls_flags); 28 } 29 30 static void _create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, 31 struct dlm_rcom **rc_ret, char *mb, int mb_len, 32 uint64_t seq) 33 { 34 struct dlm_rcom *rc; 35 36 rc = (struct dlm_rcom *) mb; 37 38 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 39 rc->rc_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); 40 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 41 rc->rc_header.h_length = cpu_to_le16(mb_len); 42 rc->rc_header.h_cmd = DLM_RCOM; 43 44 rc->rc_type = cpu_to_le32(type); 45 rc->rc_seq = cpu_to_le64(seq); 46 47 *rc_ret = rc; 48 } 49 50 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, 51 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret, 52 uint64_t seq) 53 { 54 int mb_len = sizeof(struct dlm_rcom) + len; 55 struct dlm_mhandle *mh; 56 char *mb; 57 58 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb); 59 if (!mh) { 60 log_print("%s to %d type %d len %d ENOBUFS", 61 __func__, to_nodeid, type, len); 62 return -ENOBUFS; 63 } 64 65 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq); 66 *mh_ret = mh; 67 return 0; 68 } 69 70 static int create_rcom_stateless(struct dlm_ls *ls, int to_nodeid, int type, 71 int len, struct dlm_rcom **rc_ret, 72 struct dlm_msg **msg_ret, uint64_t seq) 73 { 74 int mb_len = sizeof(struct dlm_rcom) + len; 75 struct dlm_msg *msg; 76 char *mb; 77 78 msg = dlm_lowcomms_new_msg(to_nodeid, mb_len, &mb, NULL, NULL); 79 if (!msg) { 80 log_print("create_rcom to %d type %d len %d ENOBUFS", 81 to_nodeid, type, len); 82 return -ENOBUFS; 83 } 84 85 _create_rcom(ls, to_nodeid, type, len, rc_ret, mb, mb_len, seq); 86 *msg_ret = msg; 87 return 0; 88 } 89 90 static void send_rcom(struct dlm_mhandle *mh, struct dlm_rcom *rc) 91 { 92 dlm_midcomms_commit_mhandle(mh, NULL, 0); 93 } 94 95 static void send_rcom_stateless(struct dlm_msg *msg, struct dlm_rcom *rc) 96 { 97 dlm_lowcomms_commit_msg(msg); 98 dlm_lowcomms_put_msg(msg); 99 } 100 101 static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs, 102 uint32_t flags) 103 { 104 rs->rs_flags = cpu_to_le32(flags); 105 } 106 107 /* When replying to a status request, a node also sends back its 108 configuration values. The requesting node then checks that the remote 109 node is configured the same way as itself. */ 110 111 static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf, 112 uint32_t num_slots) 113 { 114 rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen); 115 rf->rf_lsflags = cpu_to_le32(ls->ls_exflags); 116 117 rf->rf_our_slot = cpu_to_le16(ls->ls_slot); 118 rf->rf_num_slots = cpu_to_le16(num_slots); 119 rf->rf_generation = cpu_to_le32(ls->ls_generation); 120 } 121 122 static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 123 { 124 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 125 126 if ((le32_to_cpu(rc->rc_header.h_version) & 0xFFFF0000) != DLM_HEADER_MAJOR) { 127 log_error(ls, "version mismatch: %x nodeid %d: %x", 128 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid, 129 le32_to_cpu(rc->rc_header.h_version)); 130 return -EPROTO; 131 } 132 133 if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen || 134 le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) { 135 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 136 ls->ls_lvblen, ls->ls_exflags, nodeid, 137 le32_to_cpu(rf->rf_lvblen), 138 le32_to_cpu(rf->rf_lsflags)); 139 return -EPROTO; 140 } 141 return 0; 142 } 143 144 static void allow_sync_reply(struct dlm_ls *ls, __le64 *new_seq) 145 { 146 spin_lock_bh(&ls->ls_rcom_spin); 147 *new_seq = cpu_to_le64(++ls->ls_rcom_seq); 148 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 149 spin_unlock_bh(&ls->ls_rcom_spin); 150 } 151 152 static void disallow_sync_reply(struct dlm_ls *ls) 153 { 154 spin_lock_bh(&ls->ls_rcom_spin); 155 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 156 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 157 spin_unlock_bh(&ls->ls_rcom_spin); 158 } 159 160 /* 161 * low nodeid gathers one slot value at a time from each node. 162 * it sets need_slots=0, and saves rf_our_slot returned from each 163 * rcom_config. 164 * 165 * other nodes gather all slot values at once from the low nodeid. 166 * they set need_slots=1, and ignore the rf_our_slot returned from each 167 * rcom_config. they use the rf_num_slots returned from the low 168 * node's rcom_config. 169 */ 170 171 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags, 172 uint64_t seq) 173 { 174 struct dlm_rcom *rc; 175 struct dlm_msg *msg; 176 int error = 0; 177 178 ls->ls_recover_nodeid = nodeid; 179 180 if (nodeid == dlm_our_nodeid()) { 181 rc = ls->ls_recover_buf; 182 rc->rc_result = cpu_to_le32(dlm_recover_status(ls)); 183 goto out; 184 } 185 186 retry: 187 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS, 188 sizeof(struct rcom_status), &rc, &msg, 189 seq); 190 if (error) 191 goto out; 192 193 set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags); 194 195 allow_sync_reply(ls, &rc->rc_id); 196 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); 197 198 send_rcom_stateless(msg, rc); 199 200 error = dlm_wait_function(ls, &rcom_response); 201 disallow_sync_reply(ls); 202 if (error == -ETIMEDOUT) 203 goto retry; 204 if (error) 205 goto out; 206 207 rc = ls->ls_recover_buf; 208 209 if (rc->rc_result == cpu_to_le32(-ESRCH)) { 210 /* we pretend the remote lockspace exists with 0 status */ 211 log_debug(ls, "remote node %d not ready", nodeid); 212 rc->rc_result = 0; 213 error = 0; 214 } else { 215 error = check_rcom_config(ls, rc, nodeid); 216 } 217 218 /* the caller looks at rc_result for the remote recovery status */ 219 out: 220 return error; 221 } 222 223 static void receive_rcom_status(struct dlm_ls *ls, 224 const struct dlm_rcom *rc_in, 225 uint64_t seq) 226 { 227 struct dlm_rcom *rc; 228 struct rcom_status *rs; 229 uint32_t status; 230 int nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 231 int len = sizeof(struct rcom_config); 232 struct dlm_msg *msg; 233 int num_slots = 0; 234 int error; 235 236 if (!dlm_slots_version(&rc_in->rc_header)) { 237 status = dlm_recover_status(ls); 238 goto do_create; 239 } 240 241 rs = (struct rcom_status *)rc_in->rc_buf; 242 243 if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) { 244 status = dlm_recover_status(ls); 245 goto do_create; 246 } 247 248 spin_lock_bh(&ls->ls_recover_lock); 249 status = ls->ls_recover_status; 250 num_slots = ls->ls_num_slots; 251 spin_unlock_bh(&ls->ls_recover_lock); 252 len += num_slots * sizeof(struct rcom_slot); 253 254 do_create: 255 error = create_rcom_stateless(ls, nodeid, DLM_RCOM_STATUS_REPLY, 256 len, &rc, &msg, seq); 257 if (error) 258 return; 259 260 rc->rc_id = rc_in->rc_id; 261 rc->rc_seq_reply = rc_in->rc_seq; 262 rc->rc_result = cpu_to_le32(status); 263 264 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots); 265 266 if (!num_slots) 267 goto do_send; 268 269 spin_lock_bh(&ls->ls_recover_lock); 270 if (ls->ls_num_slots != num_slots) { 271 spin_unlock_bh(&ls->ls_recover_lock); 272 log_debug(ls, "receive_rcom_status num_slots %d to %d", 273 num_slots, ls->ls_num_slots); 274 rc->rc_result = 0; 275 set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0); 276 goto do_send; 277 } 278 279 dlm_slots_copy_out(ls, rc); 280 spin_unlock_bh(&ls->ls_recover_lock); 281 282 do_send: 283 send_rcom_stateless(msg, rc); 284 } 285 286 static void receive_sync_reply(struct dlm_ls *ls, const struct dlm_rcom *rc_in) 287 { 288 spin_lock_bh(&ls->ls_rcom_spin); 289 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || 290 le64_to_cpu(rc_in->rc_id) != ls->ls_rcom_seq) { 291 log_debug(ls, "reject reply %d from %d seq %llx expect %llx", 292 le32_to_cpu(rc_in->rc_type), 293 le32_to_cpu(rc_in->rc_header.h_nodeid), 294 (unsigned long long)le64_to_cpu(rc_in->rc_id), 295 (unsigned long long)ls->ls_rcom_seq); 296 goto out; 297 } 298 memcpy(ls->ls_recover_buf, rc_in, 299 le16_to_cpu(rc_in->rc_header.h_length)); 300 set_bit(LSFL_RCOM_READY, &ls->ls_flags); 301 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 302 wake_up(&ls->ls_wait_general); 303 out: 304 spin_unlock_bh(&ls->ls_rcom_spin); 305 } 306 307 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, 308 int last_len, uint64_t seq) 309 { 310 struct dlm_mhandle *mh; 311 struct dlm_rcom *rc; 312 int error = 0; 313 314 ls->ls_recover_nodeid = nodeid; 315 316 retry: 317 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, 318 &rc, &mh, seq); 319 if (error) 320 goto out; 321 memcpy(rc->rc_buf, last_name, last_len); 322 323 allow_sync_reply(ls, &rc->rc_id); 324 memset(ls->ls_recover_buf, 0, DLM_MAX_SOCKET_BUFSIZE); 325 326 send_rcom(mh, rc); 327 328 error = dlm_wait_function(ls, &rcom_response); 329 disallow_sync_reply(ls); 330 if (error == -ETIMEDOUT) 331 goto retry; 332 out: 333 return error; 334 } 335 336 static void receive_rcom_names(struct dlm_ls *ls, const struct dlm_rcom *rc_in, 337 uint64_t seq) 338 { 339 struct dlm_mhandle *mh; 340 struct dlm_rcom *rc; 341 int error, inlen, outlen, nodeid; 342 343 nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 344 inlen = le16_to_cpu(rc_in->rc_header.h_length) - 345 sizeof(struct dlm_rcom); 346 outlen = DLM_MAX_APP_BUFSIZE - sizeof(struct dlm_rcom); 347 348 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, 349 &rc, &mh, seq); 350 if (error) 351 return; 352 rc->rc_id = rc_in->rc_id; 353 rc->rc_seq_reply = rc_in->rc_seq; 354 355 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 356 nodeid); 357 send_rcom(mh, rc); 358 } 359 360 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid, uint64_t seq) 361 { 362 struct dlm_rcom *rc; 363 struct dlm_mhandle *mh; 364 struct dlm_ls *ls = r->res_ls; 365 int error; 366 367 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, 368 &rc, &mh, seq); 369 if (error) 370 goto out; 371 memcpy(rc->rc_buf, r->res_name, r->res_length); 372 rc->rc_id = cpu_to_le64(r->res_id); 373 374 send_rcom(mh, rc); 375 out: 376 return error; 377 } 378 379 static void receive_rcom_lookup(struct dlm_ls *ls, 380 const struct dlm_rcom *rc_in, uint64_t seq) 381 { 382 struct dlm_rcom *rc; 383 struct dlm_mhandle *mh; 384 int error, ret_nodeid, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 385 int len = le16_to_cpu(rc_in->rc_header.h_length) - 386 sizeof(struct dlm_rcom); 387 388 /* Old code would send this special id to trigger a debug dump. */ 389 if (rc_in->rc_id == cpu_to_le64(0xFFFFFFFF)) { 390 log_error(ls, "receive_rcom_lookup dump from %d", nodeid); 391 dlm_dump_rsb_name(ls, rc_in->rc_buf, len); 392 return; 393 } 394 395 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh, 396 seq); 397 if (error) 398 return; 399 400 error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len, 401 DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL); 402 if (error) 403 ret_nodeid = error; 404 rc->rc_result = cpu_to_le32(ret_nodeid); 405 rc->rc_id = rc_in->rc_id; 406 rc->rc_seq_reply = rc_in->rc_seq; 407 408 send_rcom(mh, rc); 409 } 410 411 static void receive_rcom_lookup_reply(struct dlm_ls *ls, 412 const struct dlm_rcom *rc_in) 413 { 414 dlm_recover_master_reply(ls, rc_in); 415 } 416 417 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, 418 struct rcom_lock *rl) 419 { 420 memset(rl, 0, sizeof(*rl)); 421 422 rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid); 423 rl->rl_lkid = cpu_to_le32(lkb->lkb_id); 424 rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags); 425 rl->rl_flags = cpu_to_le32(dlm_dflags_val(lkb)); 426 rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); 427 rl->rl_rqmode = lkb->lkb_rqmode; 428 rl->rl_grmode = lkb->lkb_grmode; 429 rl->rl_status = lkb->lkb_status; 430 rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type); 431 432 if (lkb->lkb_bastfn) 433 rl->rl_asts |= DLM_CB_BAST; 434 if (lkb->lkb_astfn) 435 rl->rl_asts |= DLM_CB_CAST; 436 437 rl->rl_namelen = cpu_to_le16(r->res_length); 438 memcpy(rl->rl_name, r->res_name, r->res_length); 439 440 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ? 441 If so, receive_rcom_lock_args() won't take this copy. */ 442 443 if (lkb->lkb_lvbptr) 444 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 445 } 446 447 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, uint64_t seq) 448 { 449 struct dlm_ls *ls = r->res_ls; 450 struct dlm_rcom *rc; 451 struct dlm_mhandle *mh; 452 struct rcom_lock *rl; 453 int error, len = sizeof(struct rcom_lock); 454 455 if (lkb->lkb_lvbptr) 456 len += ls->ls_lvblen; 457 458 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh, 459 seq); 460 if (error) 461 goto out; 462 463 rl = (struct rcom_lock *) rc->rc_buf; 464 pack_rcom_lock(r, lkb, rl); 465 rc->rc_id = cpu_to_le64((uintptr_t)r); 466 467 send_rcom(mh, rc); 468 out: 469 return error; 470 } 471 472 /* needs at least dlm_rcom + rcom_lock */ 473 static void receive_rcom_lock(struct dlm_ls *ls, const struct dlm_rcom *rc_in, 474 uint64_t seq) 475 { 476 __le32 rl_remid, rl_result; 477 struct rcom_lock *rl; 478 struct dlm_rcom *rc; 479 struct dlm_mhandle *mh; 480 int error, nodeid = le32_to_cpu(rc_in->rc_header.h_nodeid); 481 482 dlm_recover_master_copy(ls, rc_in, &rl_remid, &rl_result); 483 484 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, 485 sizeof(struct rcom_lock), &rc, &mh, seq); 486 if (error) 487 return; 488 489 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 490 rl = (struct rcom_lock *)rc->rc_buf; 491 /* set rl_remid and rl_result from dlm_recover_master_copy() */ 492 rl->rl_remid = rl_remid; 493 rl->rl_result = rl_result; 494 495 rc->rc_id = rc_in->rc_id; 496 rc->rc_seq_reply = rc_in->rc_seq; 497 498 send_rcom(mh, rc); 499 } 500 501 /* If the lockspace doesn't exist then still send a status message 502 back; it's possible that it just doesn't have its global_id yet. */ 503 504 int dlm_send_ls_not_ready(int nodeid, const struct dlm_rcom *rc_in) 505 { 506 struct dlm_rcom *rc; 507 struct rcom_config *rf; 508 struct dlm_mhandle *mh; 509 char *mb; 510 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config); 511 512 mh = dlm_midcomms_get_mhandle(nodeid, mb_len, &mb); 513 if (!mh) 514 return -ENOBUFS; 515 516 rc = (struct dlm_rcom *) mb; 517 518 rc->rc_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 519 rc->rc_header.u.h_lockspace = rc_in->rc_header.u.h_lockspace; 520 rc->rc_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 521 rc->rc_header.h_length = cpu_to_le16(mb_len); 522 rc->rc_header.h_cmd = DLM_RCOM; 523 524 rc->rc_type = cpu_to_le32(DLM_RCOM_STATUS_REPLY); 525 rc->rc_id = rc_in->rc_id; 526 rc->rc_seq_reply = rc_in->rc_seq; 527 rc->rc_result = cpu_to_le32(-ESRCH); 528 529 rf = (struct rcom_config *) rc->rc_buf; 530 rf->rf_lvblen = cpu_to_le32(~0U); 531 532 dlm_midcomms_commit_mhandle(mh, NULL, 0); 533 534 return 0; 535 } 536 537 /* 538 * Ignore messages for stage Y before we set 539 * recover_status bit for stage X: 540 * 541 * recover_status = 0 542 * 543 * dlm_recover_members() 544 * - send nothing 545 * - recv nothing 546 * - ignore NAMES, NAMES_REPLY 547 * - ignore LOOKUP, LOOKUP_REPLY 548 * - ignore LOCK, LOCK_REPLY 549 * 550 * recover_status |= NODES 551 * 552 * dlm_recover_members_wait() 553 * 554 * dlm_recover_directory() 555 * - send NAMES 556 * - recv NAMES_REPLY 557 * - ignore LOOKUP, LOOKUP_REPLY 558 * - ignore LOCK, LOCK_REPLY 559 * 560 * recover_status |= DIR 561 * 562 * dlm_recover_directory_wait() 563 * 564 * dlm_recover_masters() 565 * - send LOOKUP 566 * - recv LOOKUP_REPLY 567 * 568 * dlm_recover_locks() 569 * - send LOCKS 570 * - recv LOCKS_REPLY 571 * 572 * recover_status |= LOCKS 573 * 574 * dlm_recover_locks_wait() 575 * 576 * recover_status |= DONE 577 */ 578 579 /* Called by dlm_recv; corresponds to dlm_receive_message() but special 580 recovery-only comms are sent through here. */ 581 582 void dlm_receive_rcom(struct dlm_ls *ls, const struct dlm_rcom *rc, int nodeid) 583 { 584 int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); 585 int stop, reply = 0, names = 0, lookup = 0, lock = 0; 586 uint32_t status; 587 uint64_t seq; 588 589 switch (rc->rc_type) { 590 case cpu_to_le32(DLM_RCOM_STATUS_REPLY): 591 reply = 1; 592 break; 593 case cpu_to_le32(DLM_RCOM_NAMES): 594 names = 1; 595 break; 596 case cpu_to_le32(DLM_RCOM_NAMES_REPLY): 597 names = 1; 598 reply = 1; 599 break; 600 case cpu_to_le32(DLM_RCOM_LOOKUP): 601 lookup = 1; 602 break; 603 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY): 604 lookup = 1; 605 reply = 1; 606 break; 607 case cpu_to_le32(DLM_RCOM_LOCK): 608 lock = 1; 609 break; 610 case cpu_to_le32(DLM_RCOM_LOCK_REPLY): 611 lock = 1; 612 reply = 1; 613 break; 614 } 615 616 spin_lock_bh(&ls->ls_recover_lock); 617 status = ls->ls_recover_status; 618 stop = dlm_recovery_stopped(ls); 619 seq = ls->ls_recover_seq; 620 spin_unlock_bh(&ls->ls_recover_lock); 621 622 if (stop && (rc->rc_type != cpu_to_le32(DLM_RCOM_STATUS))) 623 goto ignore; 624 625 if (reply && (le64_to_cpu(rc->rc_seq_reply) != seq)) 626 goto ignore; 627 628 if (!(status & DLM_RS_NODES) && (names || lookup || lock)) 629 goto ignore; 630 631 if (!(status & DLM_RS_DIR) && (lookup || lock)) 632 goto ignore; 633 634 switch (rc->rc_type) { 635 case cpu_to_le32(DLM_RCOM_STATUS): 636 receive_rcom_status(ls, rc, seq); 637 break; 638 639 case cpu_to_le32(DLM_RCOM_NAMES): 640 receive_rcom_names(ls, rc, seq); 641 break; 642 643 case cpu_to_le32(DLM_RCOM_LOOKUP): 644 receive_rcom_lookup(ls, rc, seq); 645 break; 646 647 case cpu_to_le32(DLM_RCOM_LOCK): 648 if (le16_to_cpu(rc->rc_header.h_length) < lock_size) 649 goto Eshort; 650 receive_rcom_lock(ls, rc, seq); 651 break; 652 653 case cpu_to_le32(DLM_RCOM_STATUS_REPLY): 654 receive_sync_reply(ls, rc); 655 break; 656 657 case cpu_to_le32(DLM_RCOM_NAMES_REPLY): 658 receive_sync_reply(ls, rc); 659 break; 660 661 case cpu_to_le32(DLM_RCOM_LOOKUP_REPLY): 662 receive_rcom_lookup_reply(ls, rc); 663 break; 664 665 case cpu_to_le32(DLM_RCOM_LOCK_REPLY): 666 if (le16_to_cpu(rc->rc_header.h_length) < lock_size) 667 goto Eshort; 668 dlm_recover_process_copy(ls, rc, seq); 669 break; 670 671 default: 672 log_error(ls, "receive_rcom bad type %d", 673 le32_to_cpu(rc->rc_type)); 674 } 675 return; 676 677 ignore: 678 log_limit(ls, "dlm_receive_rcom ignore msg %d " 679 "from %d %llu %llu recover seq %llu sts %x gen %u", 680 le32_to_cpu(rc->rc_type), 681 nodeid, 682 (unsigned long long)le64_to_cpu(rc->rc_seq), 683 (unsigned long long)le64_to_cpu(rc->rc_seq_reply), 684 (unsigned long long)seq, 685 status, ls->ls_generation); 686 return; 687 Eshort: 688 log_error(ls, "recovery message %d from %d is too short", 689 le32_to_cpu(rc->rc_type), nodeid); 690 } 691 692