1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. 6 ** 7 ** 8 ******************************************************************************* 9 ******************************************************************************/ 10 11 /* Central locking logic has four stages: 12 13 dlm_lock() 14 dlm_unlock() 15 16 request_lock(ls, lkb) 17 convert_lock(ls, lkb) 18 unlock_lock(ls, lkb) 19 cancel_lock(ls, lkb) 20 21 _request_lock(r, lkb) 22 _convert_lock(r, lkb) 23 _unlock_lock(r, lkb) 24 _cancel_lock(r, lkb) 25 26 do_request(r, lkb) 27 do_convert(r, lkb) 28 do_unlock(r, lkb) 29 do_cancel(r, lkb) 30 31 Stage 1 (lock, unlock) is mainly about checking input args and 32 splitting into one of the four main operations: 33 34 dlm_lock = request_lock 35 dlm_lock+CONVERT = convert_lock 36 dlm_unlock = unlock_lock 37 dlm_unlock+CANCEL = cancel_lock 38 39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is 40 provided to the next stage. 41 42 Stage 3, _xxxx_lock(), determines if the operation is local or remote. 43 When remote, it calls send_xxxx(), when local it calls do_xxxx(). 44 45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the 46 given rsb and lkb and queues callbacks. 47 48 For remote operations, send_xxxx() results in the corresponding do_xxxx() 49 function being executed on the remote node. The connecting send/receive 50 calls on local (L) and remote (R) nodes: 51 52 L: send_xxxx() -> R: receive_xxxx() 53 R: do_xxxx() 54 L: receive_xxxx_reply() <- R: send_xxxx_reply() 55 */ 56 #include <trace/events/dlm.h> 57 58 #include <linux/types.h> 59 #include <linux/rbtree.h> 60 #include <linux/slab.h> 61 #include "dlm_internal.h" 62 #include <linux/dlm_device.h> 63 #include "memory.h" 64 #include "midcomms.h" 65 #include "requestqueue.h" 66 #include "util.h" 67 #include "dir.h" 68 #include "member.h" 69 #include "lockspace.h" 70 #include "ast.h" 71 #include "lock.h" 72 #include "rcom.h" 73 #include "recover.h" 74 #include "lvb_table.h" 75 #include "user.h" 76 #include "config.h" 77 78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb); 79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb); 80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb); 81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb); 82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb); 83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode); 84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); 85 static int send_remove(struct dlm_rsb *r); 86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 89 const struct dlm_message *ms, bool local); 90 static int receive_extralen(const struct dlm_message *ms); 91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 92 static void toss_rsb(struct kref *kref); 93 94 /* 95 * Lock compatibilty matrix - thanks Steve 96 * UN = Unlocked state. Not really a state, used as a flag 97 * PD = Padding. Used to make the matrix a nice power of two in size 98 * Other states are the same as the VMS DLM. 
99 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same) 100 */ 101 102 static const int __dlm_compat_matrix[8][8] = { 103 /* UN NL CR CW PR PW EX PD */ 104 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */ 105 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */ 106 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */ 107 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */ 108 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */ 109 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */ 110 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */ 111 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 112 }; 113 114 /* 115 * This defines the direction of transfer of LVB data. 116 * Granted mode is the row; requested mode is the column. 117 * Usage: matrix[grmode+1][rqmode+1] 118 * 1 = LVB is returned to the caller 119 * 0 = LVB is written to the resource 120 * -1 = nothing happens to the LVB 121 */ 122 123 const int dlm_lvb_operations[8][8] = { 124 /* UN NL CR CW PR PW EX PD*/ 125 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */ 126 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */ 127 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */ 128 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */ 129 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */ 130 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */ 131 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */ 132 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */ 133 }; 134 135 #define modes_compat(gr, rq) \ 136 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1] 137 138 int dlm_modes_compat(int mode1, int mode2) 139 { 140 return __dlm_compat_matrix[mode1 + 1][mode2 + 1]; 141 } 142 143 /* 144 * Compatibility matrix for conversions with QUECVT set. 145 * Granted mode is the row; requested mode is the column. 146 * Usage: matrix[grmode+1][rqmode+1] 147 */ 148 149 static const int __quecvt_compat_matrix[8][8] = { 150 /* UN NL CR CW PR PW EX PD */ 151 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */ 152 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */ 153 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */ 154 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */ 155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */ 156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */ 157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */ 158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 159 }; 160 161 void dlm_print_lkb(struct dlm_lkb *lkb) 162 { 163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", 165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 166 dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode, 167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, 168 (unsigned long long)lkb->lkb_recover_seq); 169 } 170 171 static void dlm_print_rsb(struct dlm_rsb *r) 172 { 173 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x " 174 "rlc %d name %s\n", 175 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 176 r->res_flags, r->res_first_lkid, r->res_recover_locks_count, 177 r->res_name); 178 } 179 180 void dlm_dump_rsb(struct dlm_rsb *r) 181 { 182 struct dlm_lkb *lkb; 183 184 dlm_print_rsb(r); 185 186 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n", 187 list_empty(&r->res_root_list), list_empty(&r->res_recover_list)); 188 printk(KERN_ERR "rsb lookup list\n"); 189 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup) 190 dlm_print_lkb(lkb); 191 printk(KERN_ERR "rsb grant queue:\n"); 192 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) 193 dlm_print_lkb(lkb); 194 printk(KERN_ERR "rsb convert queue:\n"); 195 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) 196 dlm_print_lkb(lkb); 197 printk(KERN_ERR "rsb wait queue:\n"); 198 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) 199 dlm_print_lkb(lkb); 200 } 201 202 
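/*
 * Illustrative sketch of how the matrices above are consulted (gr_lkb and
 * rq_lkb are hypothetical lkb pointers, not names used in this file).
 * Modes are offset by one so that DLM_LOCK_IV (-1) lands on the UN
 * row/column:
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);	returns 1: shared read
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);	returns 0: conflict
 *
 *	if (modes_compat(gr_lkb, rq_lkb))
 *		... the granted and requested locks can coexist ...
 */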
/* Threads cannot use the lockspace while it's being recovered */ 203 204 void dlm_lock_recovery(struct dlm_ls *ls) 205 { 206 down_read(&ls->ls_in_recovery); 207 } 208 209 void dlm_unlock_recovery(struct dlm_ls *ls) 210 { 211 up_read(&ls->ls_in_recovery); 212 } 213 214 int dlm_lock_recovery_try(struct dlm_ls *ls) 215 { 216 return down_read_trylock(&ls->ls_in_recovery); 217 } 218 219 static inline int can_be_queued(struct dlm_lkb *lkb) 220 { 221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE); 222 } 223 224 static inline int force_blocking_asts(struct dlm_lkb *lkb) 225 { 226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST); 227 } 228 229 static inline int is_demoted(struct dlm_lkb *lkb) 230 { 231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags); 232 } 233 234 static inline int is_altmode(struct dlm_lkb *lkb) 235 { 236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags); 237 } 238 239 static inline int is_granted(struct dlm_lkb *lkb) 240 { 241 return (lkb->lkb_status == DLM_LKSTS_GRANTED); 242 } 243 244 static inline int is_remote(struct dlm_rsb *r) 245 { 246 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 247 return !!r->res_nodeid; 248 } 249 250 static inline int is_process_copy(struct dlm_lkb *lkb) 251 { 252 return lkb->lkb_nodeid && 253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 254 } 255 256 static inline int is_master_copy(struct dlm_lkb *lkb) 257 { 258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 259 } 260 261 static inline int middle_conversion(struct dlm_lkb *lkb) 262 { 263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 265 return 1; 266 return 0; 267 } 268 269 static inline int down_conversion(struct dlm_lkb *lkb) 270 { 271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 272 } 273 274 static inline int is_overlap_unlock(struct dlm_lkb *lkb) 275 { 276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 277 } 278 279 static inline int is_overlap_cancel(struct dlm_lkb *lkb) 280 { 281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 282 } 283 284 static inline int is_overlap(struct dlm_lkb *lkb) 285 { 286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) || 287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 288 } 289 290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 291 { 292 if (is_master_copy(lkb)) 293 return; 294 295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 296 297 if (rv == -DLM_ECANCEL && 298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags)) 299 rv = -EDEADLK; 300 301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb)); 302 } 303 304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 305 { 306 queue_cast(r, lkb, 307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL); 308 } 309 310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 311 { 312 if (is_master_copy(lkb)) { 313 send_bast(r, lkb, rqmode); 314 } else { 315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0); 316 } 317 } 318 319 /* 320 * Basic operations on rsb's and lkb's 321 */ 322 323 static inline unsigned long rsb_toss_jiffies(void) 324 { 325 return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ); 326 } 327 328 /* This is only called to add a reference when the code already holds 329 a valid reference to the rsb, so there's no need for locking. 
*/ 330 331 static inline void hold_rsb(struct dlm_rsb *r) 332 { 333 /* rsbs in toss state never get referenced */ 334 WARN_ON(rsb_flag(r, RSB_TOSS)); 335 kref_get(&r->res_ref); 336 } 337 338 void dlm_hold_rsb(struct dlm_rsb *r) 339 { 340 hold_rsb(r); 341 } 342 343 /* TODO move this to lib/refcount.c */ 344 static __must_check bool 345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock) 346 __cond_acquires(lock) 347 { 348 if (refcount_dec_not_one(r)) 349 return false; 350 351 write_lock_bh(lock); 352 if (!refcount_dec_and_test(r)) { 353 write_unlock_bh(lock); 354 return false; 355 } 356 357 return true; 358 } 359 360 /* TODO move this to include/linux/kref.h */ 361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref, 362 void (*release)(struct kref *kref), 363 rwlock_t *lock) 364 { 365 if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) { 366 release(kref); 367 return 1; 368 } 369 370 return 0; 371 } 372 373 /* When all references to the rsb are gone it's transferred to 374 the tossed list for later disposal. */ 375 376 static void put_rsb(struct dlm_rsb *r) 377 { 378 struct dlm_ls *ls = r->res_ls; 379 int rv; 380 381 rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb, 382 &ls->ls_rsbtbl_lock); 383 if (rv) 384 write_unlock_bh(&ls->ls_rsbtbl_lock); 385 } 386 387 void dlm_put_rsb(struct dlm_rsb *r) 388 { 389 put_rsb(r); 390 } 391 392 static int pre_rsb_struct(struct dlm_ls *ls) 393 { 394 struct dlm_rsb *r1, *r2; 395 int count = 0; 396 397 spin_lock_bh(&ls->ls_new_rsb_spin); 398 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { 399 spin_unlock_bh(&ls->ls_new_rsb_spin); 400 return 0; 401 } 402 spin_unlock_bh(&ls->ls_new_rsb_spin); 403 404 r1 = dlm_allocate_rsb(ls); 405 r2 = dlm_allocate_rsb(ls); 406 407 spin_lock_bh(&ls->ls_new_rsb_spin); 408 if (r1) { 409 list_add(&r1->res_hashchain, &ls->ls_new_rsb); 410 ls->ls_new_rsb_count++; 411 } 412 if (r2) { 413 list_add(&r2->res_hashchain, &ls->ls_new_rsb); 414 ls->ls_new_rsb_count++; 415 } 416 count = ls->ls_new_rsb_count; 417 spin_unlock_bh(&ls->ls_new_rsb_spin); 418 419 if (!count) 420 return -ENOMEM; 421 return 0; 422 } 423 424 /* connected with timer_delete_sync() in dlm_ls_stop() to stop 425 * new timers when recovery is triggered and don't run them 426 * again until a dlm_timer_resume() tries it again. 427 */ 428 static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) 429 { 430 if (!dlm_locking_stopped(ls)) 431 mod_timer(&ls->ls_timer, jiffies); 432 } 433 434 /* This function tries to resume the timer callback if a rsb 435 * is on the toss list and no timer is pending. It might that 436 * the first entry is on currently executed as timer callback 437 * but we don't care if a timer queued up again and does 438 * nothing. Should be a rare case. 
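 *
 * A rough sketch of the stop/resume pairing described above (illustrative
 * only; the exact recovery call sites are not shown in this file):
 *
 *	dlm_ls_stop(ls);	stops locking, timer_delete_sync(&ls->ls_timer)
 *	... recovery empties the toss queue ...
 *	dlm_timer_resume(ls);	re-arms ls_timer from the head of ls_toss_q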
439 */ 440 void dlm_timer_resume(struct dlm_ls *ls) 441 { 442 struct dlm_rsb *r; 443 444 spin_lock_bh(&ls->ls_toss_q_lock); 445 r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 446 res_toss_q_list); 447 if (r && !timer_pending(&ls->ls_timer)) 448 __rsb_mod_timer(ls, r->res_toss_time); 449 spin_unlock_bh(&ls->ls_toss_q_lock); 450 } 451 452 /* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ 453 static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) 454 { 455 struct dlm_rsb *first; 456 457 spin_lock_bh(&ls->ls_toss_q_lock); 458 r->res_toss_time = 0; 459 460 /* if the rsb is not queued do nothing */ 461 if (list_empty(&r->res_toss_q_list)) 462 goto out; 463 464 /* get the first element before delete */ 465 first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, 466 res_toss_q_list); 467 list_del_init(&r->res_toss_q_list); 468 /* check if the first element was the rsb we deleted */ 469 if (first == r) { 470 /* try to get the new first element, if the list 471 * is empty now try to delete the timer, if we are 472 * too late we don't care. 473 * 474 * if the list isn't empty and a new first element got 475 * in place, set the new timer expire time. 476 */ 477 first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 478 res_toss_q_list); 479 if (!first) 480 timer_delete(&ls->ls_timer); 481 else 482 __rsb_mod_timer(ls, first->res_toss_time); 483 } 484 485 out: 486 spin_unlock_bh(&ls->ls_toss_q_lock); 487 } 488 489 /* Caller must held ls_rsbtbl_lock and need to be called every time 490 * when either the rsb enters toss state or the toss state changes 491 * the dir/master nodeid. 492 */ 493 static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) 494 { 495 int our_nodeid = dlm_our_nodeid(); 496 struct dlm_rsb *first; 497 498 /* If we're the directory record for this rsb, and 499 * we're not the master of it, then we need to wait 500 * for the master node to send us a dir remove for 501 * before removing the dir record. 502 */ 503 if (!dlm_no_directory(ls) && 504 (r->res_master_nodeid != our_nodeid) && 505 (dlm_dir_nodeid(r) == our_nodeid)) { 506 rsb_delete_toss_timer(ls, r); 507 return; 508 } 509 510 spin_lock_bh(&ls->ls_toss_q_lock); 511 /* set the new rsb absolute expire time in the rsb */ 512 r->res_toss_time = rsb_toss_jiffies(); 513 if (list_empty(&ls->ls_toss_q)) { 514 /* if the queue is empty add the element and it's 515 * our new expire time 516 */ 517 list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); 518 __rsb_mod_timer(ls, r->res_toss_time); 519 } else { 520 /* check if the rsb was already queued, if so delete 521 * it from the toss queue 522 */ 523 if (!list_empty(&r->res_toss_q_list)) 524 list_del(&r->res_toss_q_list); 525 526 /* try to get the maybe new first element and then add 527 * to this rsb with the oldest expire time to the end 528 * of the queue. If the list was empty before this 529 * rsb expire time is our next expiration if it wasn't 530 * the now new first elemet is our new expiration time 531 */ 532 first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 533 res_toss_q_list); 534 list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); 535 if (!first) 536 __rsb_mod_timer(ls, r->res_toss_time); 537 else 538 __rsb_mod_timer(ls, first->res_toss_time); 539 } 540 spin_unlock_bh(&ls->ls_toss_q_lock); 541 } 542 543 /* if we hit contention we do in 250 ms a retry to trylock. 
544 * if there is any other mod_timer in between we don't care 545 * about that it expires earlier again this is only for the 546 * unlikely case nothing happened in this time. 547 */ 548 #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) 549 550 void dlm_rsb_toss_timer(struct timer_list *timer) 551 { 552 struct dlm_ls *ls = from_timer(ls, timer, ls_timer); 553 int our_nodeid = dlm_our_nodeid(); 554 struct dlm_rsb *r; 555 int rv; 556 557 while (1) { 558 /* interrupting point to leave iteration when 559 * recovery waits for timer_delete_sync(), recovery 560 * will take care to delete everything in toss queue. 561 */ 562 if (dlm_locking_stopped(ls)) 563 break; 564 565 rv = spin_trylock(&ls->ls_toss_q_lock); 566 if (!rv) { 567 /* rearm again try timer */ 568 __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); 569 break; 570 } 571 572 r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, 573 res_toss_q_list); 574 if (!r) { 575 /* nothing to do anymore next rsb queue will 576 * set next mod_timer() expire. 577 */ 578 spin_unlock(&ls->ls_toss_q_lock); 579 break; 580 } 581 582 /* test if the first rsb isn't expired yet, if 583 * so we stop freeing rsb from toss queue as 584 * the order in queue is ascending to the 585 * absolute res_toss_time jiffies 586 */ 587 if (time_before(jiffies, r->res_toss_time)) { 588 /* rearm with the next rsb to expire in the future */ 589 __rsb_mod_timer(ls, r->res_toss_time); 590 spin_unlock(&ls->ls_toss_q_lock); 591 break; 592 } 593 594 /* in find_rsb_dir/nodir there is a reverse order of this 595 * lock, however this is only a trylock if we hit some 596 * possible contention we try it again. 597 * 598 * This lock synchronized while holding ls_toss_q_lock 599 * synchronize everything that rsb_delete_toss_timer() 600 * or rsb_mod_timer() can't run after this timer callback 601 * deletes the rsb from the ls_toss_q. Whereas the other 602 * holders have always a priority to run as this is only 603 * a caching handling and the other holders might to put 604 * this rsb out of the toss state. 605 */ 606 rv = write_trylock(&ls->ls_rsbtbl_lock); 607 if (!rv) { 608 spin_unlock(&ls->ls_toss_q_lock); 609 /* rearm again try timer */ 610 __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); 611 break; 612 } 613 614 list_del(&r->res_rsbs_list); 615 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, 616 dlm_rhash_rsb_params); 617 618 /* not necessary to held the ls_rsbtbl_lock when 619 * calling send_remove() 620 */ 621 write_unlock(&ls->ls_rsbtbl_lock); 622 623 /* remove the rsb out of the toss queue its gone 624 * drom DLM now 625 */ 626 list_del_init(&r->res_toss_q_list); 627 spin_unlock(&ls->ls_toss_q_lock); 628 629 /* no rsb in this state should ever run a timer */ 630 WARN_ON(!dlm_no_directory(ls) && 631 (r->res_master_nodeid != our_nodeid) && 632 (dlm_dir_nodeid(r) == our_nodeid)); 633 634 /* We're the master of this rsb but we're not 635 * the directory record, so we need to tell the 636 * dir node to remove the dir record 637 */ 638 if (!dlm_no_directory(ls) && 639 (r->res_master_nodeid == our_nodeid) && 640 (dlm_dir_nodeid(r) != our_nodeid)) 641 send_remove(r); 642 643 free_toss_rsb(r); 644 } 645 } 646 647 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can 648 unlock any spinlocks, go back and call pre_rsb_struct again. 649 Otherwise, take an rsb off the list and return it. 
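
   A minimal caller-side sketch of this retry contract (illustrative only,
   mirroring what find_rsb_dir() below does):

   retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;
	... search ls_rsbtbl, drop the lock if nothing was found ...
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN)
		goto retry;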
*/ 650 651 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, 652 struct dlm_rsb **r_ret) 653 { 654 struct dlm_rsb *r; 655 int count; 656 657 spin_lock_bh(&ls->ls_new_rsb_spin); 658 if (list_empty(&ls->ls_new_rsb)) { 659 count = ls->ls_new_rsb_count; 660 spin_unlock_bh(&ls->ls_new_rsb_spin); 661 log_debug(ls, "find_rsb retry %d %d %s", 662 count, dlm_config.ci_new_rsb_count, 663 (const char *)name); 664 return -EAGAIN; 665 } 666 667 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); 668 list_del(&r->res_hashchain); 669 ls->ls_new_rsb_count--; 670 spin_unlock_bh(&ls->ls_new_rsb_spin); 671 672 r->res_ls = ls; 673 r->res_length = len; 674 memcpy(r->res_name, name, len); 675 spin_lock_init(&r->res_lock); 676 677 INIT_LIST_HEAD(&r->res_lookup); 678 INIT_LIST_HEAD(&r->res_grantqueue); 679 INIT_LIST_HEAD(&r->res_convertqueue); 680 INIT_LIST_HEAD(&r->res_waitqueue); 681 INIT_LIST_HEAD(&r->res_root_list); 682 INIT_LIST_HEAD(&r->res_toss_q_list); 683 INIT_LIST_HEAD(&r->res_recover_list); 684 INIT_LIST_HEAD(&r->res_masters_list); 685 686 *r_ret = r; 687 return 0; 688 } 689 690 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len, 691 struct dlm_rsb **r_ret) 692 { 693 char key[DLM_RESNAME_MAXLEN] = {}; 694 695 memcpy(key, name, len); 696 *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params); 697 if (*r_ret) 698 return 0; 699 700 return -EBADR; 701 } 702 703 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash) 704 { 705 return rhashtable_insert_fast(rhash, &rsb->res_node, 706 dlm_rhash_rsb_params); 707 } 708 709 /* 710 * Find rsb in rsbtbl and potentially create/add one 711 * 712 * Delaying the release of rsb's has a similar benefit to applications keeping 713 * NL locks on an rsb, but without the guarantee that the cached master value 714 * will still be valid when the rsb is reused. Apps aren't always smart enough 715 * to keep NL locks on an rsb that they may lock again shortly; this can lead 716 * to excessive master lookups and removals if we don't delay the release. 717 * 718 * Searching for an rsb means looking through both the normal list and toss 719 * list. When found on the toss list the rsb is moved to the normal list with 720 * ref count of 1; when found on normal list the ref count is incremented. 721 * 722 * rsb's on the keep list are being used locally and refcounted. 723 * rsb's on the toss list are not being used locally, and are not refcounted. 724 * 725 * The toss list rsb's were either 726 * - previously used locally but not any more (were on keep list, then 727 * moved to toss list when last refcount dropped) 728 * - created and put on toss list as a directory record for a lookup 729 * (we are the dir node for the res, but are not using the res right now, 730 * but some other node is) 731 * 732 * The purpose of find_rsb() is to return a refcounted rsb for local use. 733 * So, if the given rsb is on the toss list, it is moved to the keep list 734 * before being returned. 735 * 736 * toss_rsb() happens when all local usage of the rsb is done, i.e. no 737 * more refcounts exist, so the rsb is moved from the keep list to the 738 * toss list. 739 * 740 * rsb's on both keep and toss lists are used for doing a name to master 741 * lookups. rsb's that are in use locally (and being refcounted) are on 742 * the keep list, rsb's that are not in use locally (not refcounted) and 743 * only exist for name/master lookups are on the toss list. 
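 *
 * A rough sketch of the intended calling pattern (illustrative only;
 * roughly the shape used by the request path in this file):
 *
 *	error = find_rsb(ls, name, len, from_nodeid, flags, &r);
 *	if (!error) {
 *		lock_rsb(r);
 *		... queue, convert or grant lkbs on r ...
 *		unlock_rsb(r);
 *		put_rsb(r);
 *	}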
744 * 745 * rsb's on the toss list who's dir_nodeid is not local can have stale 746 * name/master mappings. So, remote requests on such rsb's can potentially 747 * return with an error, which means the mapping is stale and needs to 748 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and 749 * first_lkid is to keep only a single outstanding request on an rsb 750 * while that rsb has a potentially stale master.) 751 */ 752 753 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, 754 uint32_t hash, int dir_nodeid, int from_nodeid, 755 unsigned int flags, struct dlm_rsb **r_ret) 756 { 757 struct dlm_rsb *r = NULL; 758 int our_nodeid = dlm_our_nodeid(); 759 int from_local = 0; 760 int from_other = 0; 761 int from_dir = 0; 762 int create = 0; 763 int error; 764 765 if (flags & R_RECEIVE_REQUEST) { 766 if (from_nodeid == dir_nodeid) 767 from_dir = 1; 768 else 769 from_other = 1; 770 } else if (flags & R_REQUEST) { 771 from_local = 1; 772 } 773 774 /* 775 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so 776 * from_nodeid has sent us a lock in dlm_recover_locks, believing 777 * we're the new master. Our local recovery may not have set 778 * res_master_nodeid to our_nodeid yet, so allow either. Don't 779 * create the rsb; dlm_recover_process_copy() will handle EBADR 780 * by resending. 781 * 782 * If someone sends us a request, we are the dir node, and we do 783 * not find the rsb anywhere, then recreate it. This happens if 784 * someone sends us a request after we have removed/freed an rsb 785 * from our toss list. (They sent a request instead of lookup 786 * because they are using an rsb from their toss list.) 787 */ 788 789 if (from_local || from_dir || 790 (from_other && (dir_nodeid == our_nodeid))) { 791 create = 1; 792 } 793 794 retry: 795 if (create) { 796 error = pre_rsb_struct(ls); 797 if (error < 0) 798 goto out; 799 } 800 801 retry_lookup: 802 803 /* check if the rsb is in keep state under read lock - likely path */ 804 read_lock_bh(&ls->ls_rsbtbl_lock); 805 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 806 if (error) { 807 read_unlock_bh(&ls->ls_rsbtbl_lock); 808 goto do_new; 809 } 810 811 /* 812 * rsb is active, so we can't check master_nodeid without lock_rsb. 813 */ 814 815 if (rsb_flag(r, RSB_TOSS)) { 816 read_unlock_bh(&ls->ls_rsbtbl_lock); 817 goto do_toss; 818 } 819 820 kref_get(&r->res_ref); 821 read_unlock_bh(&ls->ls_rsbtbl_lock); 822 goto out; 823 824 825 do_toss: 826 write_lock_bh(&ls->ls_rsbtbl_lock); 827 828 /* retry lookup under write lock to see if its still in toss state 829 * if not it's in keep state and we relookup - unlikely path. 830 */ 831 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 832 if (!error) { 833 if (!rsb_flag(r, RSB_TOSS)) { 834 write_unlock_bh(&ls->ls_rsbtbl_lock); 835 goto retry_lookup; 836 } 837 } else { 838 write_unlock_bh(&ls->ls_rsbtbl_lock); 839 goto do_new; 840 } 841 842 /* 843 * rsb found inactive (master_nodeid may be out of date unless 844 * we are the dir_nodeid or were the master) No other thread 845 * is using this rsb because it's on the toss list, so we can 846 * look at or update res_master_nodeid without lock_rsb. 
847 */ 848 849 if ((r->res_master_nodeid != our_nodeid) && from_other) { 850 /* our rsb was not master, and another node (not the dir node) 851 has sent us a request */ 852 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", 853 from_nodeid, r->res_master_nodeid, dir_nodeid, 854 r->res_name); 855 write_unlock_bh(&ls->ls_rsbtbl_lock); 856 error = -ENOTBLK; 857 goto out; 858 } 859 860 if ((r->res_master_nodeid != our_nodeid) && from_dir) { 861 /* don't think this should ever happen */ 862 log_error(ls, "find_rsb toss from_dir %d master %d", 863 from_nodeid, r->res_master_nodeid); 864 dlm_print_rsb(r); 865 /* fix it and go on */ 866 r->res_master_nodeid = our_nodeid; 867 r->res_nodeid = 0; 868 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 869 r->res_first_lkid = 0; 870 } 871 872 if (from_local && (r->res_master_nodeid != our_nodeid)) { 873 /* Because we have held no locks on this rsb, 874 res_master_nodeid could have become stale. */ 875 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 876 r->res_first_lkid = 0; 877 } 878 879 list_move(&r->res_rsbs_list, &ls->ls_keep); 880 rsb_clear_flag(r, RSB_TOSS); 881 /* rsb got out of toss state, it becomes alive again 882 * and we reinit the reference counter that is only 883 * valid for keep state rsbs 884 */ 885 kref_init(&r->res_ref); 886 rsb_delete_toss_timer(ls, r); 887 write_unlock_bh(&ls->ls_rsbtbl_lock); 888 889 goto out; 890 891 892 do_new: 893 /* 894 * rsb not found 895 */ 896 897 if (error == -EBADR && !create) 898 goto out; 899 900 error = get_rsb_struct(ls, name, len, &r); 901 if (error == -EAGAIN) 902 goto retry; 903 if (error) 904 goto out; 905 906 r->res_hash = hash; 907 r->res_dir_nodeid = dir_nodeid; 908 kref_init(&r->res_ref); 909 910 if (from_dir) { 911 /* want to see how often this happens */ 912 log_debug(ls, "find_rsb new from_dir %d recreate %s", 913 from_nodeid, r->res_name); 914 r->res_master_nodeid = our_nodeid; 915 r->res_nodeid = 0; 916 goto out_add; 917 } 918 919 if (from_other && (dir_nodeid != our_nodeid)) { 920 /* should never happen */ 921 log_error(ls, "find_rsb new from_other %d dir %d our %d %s", 922 from_nodeid, dir_nodeid, our_nodeid, r->res_name); 923 dlm_free_rsb(r); 924 r = NULL; 925 error = -ENOTBLK; 926 goto out; 927 } 928 929 if (from_other) { 930 log_debug(ls, "find_rsb new from_other %d dir %d %s", 931 from_nodeid, dir_nodeid, r->res_name); 932 } 933 934 if (dir_nodeid == our_nodeid) { 935 /* When we are the dir nodeid, we can set the master 936 node immediately */ 937 r->res_master_nodeid = our_nodeid; 938 r->res_nodeid = 0; 939 } else { 940 /* set_master will send_lookup to dir_nodeid */ 941 r->res_master_nodeid = 0; 942 r->res_nodeid = -1; 943 } 944 945 out_add: 946 947 write_lock_bh(&ls->ls_rsbtbl_lock); 948 error = rsb_insert(r, &ls->ls_rsbtbl); 949 if (error == -EEXIST) { 950 /* somebody else was faster and it seems the 951 * rsb exists now, we do a whole relookup 952 */ 953 write_unlock_bh(&ls->ls_rsbtbl_lock); 954 dlm_free_rsb(r); 955 goto retry_lookup; 956 } else if (!error) { 957 list_add(&r->res_rsbs_list, &ls->ls_keep); 958 } 959 write_unlock_bh(&ls->ls_rsbtbl_lock); 960 out: 961 *r_ret = r; 962 return error; 963 } 964 965 /* During recovery, other nodes can send us new MSTCPY locks (from 966 dlm_recover_locks) before we've made ourself master (in 967 dlm_recover_masters). 
*/ 968 969 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, 970 uint32_t hash, int dir_nodeid, int from_nodeid, 971 unsigned int flags, struct dlm_rsb **r_ret) 972 { 973 struct dlm_rsb *r = NULL; 974 int our_nodeid = dlm_our_nodeid(); 975 int recover = (flags & R_RECEIVE_RECOVER); 976 int error; 977 978 retry: 979 error = pre_rsb_struct(ls); 980 if (error < 0) 981 goto out; 982 983 retry_lookup: 984 985 /* check if the rsb is in keep state under read lock - likely path */ 986 read_lock_bh(&ls->ls_rsbtbl_lock); 987 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 988 if (error) { 989 read_unlock_bh(&ls->ls_rsbtbl_lock); 990 goto do_new; 991 } 992 993 if (rsb_flag(r, RSB_TOSS)) { 994 read_unlock_bh(&ls->ls_rsbtbl_lock); 995 goto do_toss; 996 } 997 998 /* 999 * rsb is active, so we can't check master_nodeid without lock_rsb. 1000 */ 1001 1002 kref_get(&r->res_ref); 1003 read_unlock_bh(&ls->ls_rsbtbl_lock); 1004 1005 goto out; 1006 1007 1008 do_toss: 1009 write_lock_bh(&ls->ls_rsbtbl_lock); 1010 1011 /* retry lookup under write lock to see if its still in toss state 1012 * if not it's in keep state and we relookup - unlikely path. 1013 */ 1014 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 1015 if (!error) { 1016 if (!rsb_flag(r, RSB_TOSS)) { 1017 write_unlock_bh(&ls->ls_rsbtbl_lock); 1018 goto retry_lookup; 1019 } 1020 } else { 1021 write_unlock_bh(&ls->ls_rsbtbl_lock); 1022 goto do_new; 1023 } 1024 1025 1026 /* 1027 * rsb found inactive. No other thread is using this rsb because 1028 * it's on the toss list, so we can look at or update 1029 * res_master_nodeid without lock_rsb. 1030 */ 1031 1032 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { 1033 /* our rsb is not master, and another node has sent us a 1034 request; this should never happen */ 1035 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", 1036 from_nodeid, r->res_master_nodeid, dir_nodeid); 1037 dlm_print_rsb(r); 1038 write_unlock_bh(&ls->ls_rsbtbl_lock); 1039 error = -ENOTBLK; 1040 goto out; 1041 } 1042 1043 if (!recover && (r->res_master_nodeid != our_nodeid) && 1044 (dir_nodeid == our_nodeid)) { 1045 /* our rsb is not master, and we are dir; may as well fix it; 1046 this should never happen */ 1047 log_error(ls, "find_rsb toss our %d master %d dir %d", 1048 our_nodeid, r->res_master_nodeid, dir_nodeid); 1049 dlm_print_rsb(r); 1050 r->res_master_nodeid = our_nodeid; 1051 r->res_nodeid = 0; 1052 } 1053 1054 list_move(&r->res_rsbs_list, &ls->ls_keep); 1055 rsb_clear_flag(r, RSB_TOSS); 1056 /* rsb got out of toss state, it becomes alive again 1057 * and we reinit the reference counter that is only 1058 * valid for keep state rsbs 1059 */ 1060 kref_init(&r->res_ref); 1061 rsb_delete_toss_timer(ls, r); 1062 write_unlock_bh(&ls->ls_rsbtbl_lock); 1063 1064 goto out; 1065 1066 1067 do_new: 1068 /* 1069 * rsb not found 1070 */ 1071 1072 error = get_rsb_struct(ls, name, len, &r); 1073 if (error == -EAGAIN) { 1074 goto retry; 1075 } 1076 if (error) 1077 goto out; 1078 1079 r->res_hash = hash; 1080 r->res_dir_nodeid = dir_nodeid; 1081 r->res_master_nodeid = dir_nodeid; 1082 r->res_nodeid = (dir_nodeid == our_nodeid) ? 
0 : dir_nodeid; 1083 kref_init(&r->res_ref); 1084 1085 write_lock_bh(&ls->ls_rsbtbl_lock); 1086 error = rsb_insert(r, &ls->ls_rsbtbl); 1087 if (error == -EEXIST) { 1088 /* somebody else was faster and it seems the 1089 * rsb exists now, we do a whole relookup 1090 */ 1091 write_unlock_bh(&ls->ls_rsbtbl_lock); 1092 dlm_free_rsb(r); 1093 goto retry_lookup; 1094 } else if (!error) { 1095 list_add(&r->res_rsbs_list, &ls->ls_keep); 1096 } 1097 write_unlock_bh(&ls->ls_rsbtbl_lock); 1098 1099 out: 1100 *r_ret = r; 1101 return error; 1102 } 1103 1104 static int find_rsb(struct dlm_ls *ls, const void *name, int len, 1105 int from_nodeid, unsigned int flags, 1106 struct dlm_rsb **r_ret) 1107 { 1108 int dir_nodeid; 1109 uint32_t hash; 1110 1111 if (len > DLM_RESNAME_MAXLEN) 1112 return -EINVAL; 1113 1114 hash = jhash(name, len, 0); 1115 dir_nodeid = dlm_hash2nodeid(ls, hash); 1116 1117 if (dlm_no_directory(ls)) 1118 return find_rsb_nodir(ls, name, len, hash, dir_nodeid, 1119 from_nodeid, flags, r_ret); 1120 else 1121 return find_rsb_dir(ls, name, len, hash, dir_nodeid, 1122 from_nodeid, flags, r_ret); 1123 } 1124 1125 /* we have received a request and found that res_master_nodeid != our_nodeid, 1126 so we need to return an error or make ourself the master */ 1127 1128 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, 1129 int from_nodeid) 1130 { 1131 if (dlm_no_directory(ls)) { 1132 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d", 1133 from_nodeid, r->res_master_nodeid, 1134 r->res_dir_nodeid); 1135 dlm_print_rsb(r); 1136 return -ENOTBLK; 1137 } 1138 1139 if (from_nodeid != r->res_dir_nodeid) { 1140 /* our rsb is not master, and another node (not the dir node) 1141 has sent us a request. this is much more common when our 1142 master_nodeid is zero, so limit debug to non-zero. */ 1143 1144 if (r->res_master_nodeid) { 1145 log_debug(ls, "validate master from_other %d master %d " 1146 "dir %d first %x %s", from_nodeid, 1147 r->res_master_nodeid, r->res_dir_nodeid, 1148 r->res_first_lkid, r->res_name); 1149 } 1150 return -ENOTBLK; 1151 } else { 1152 /* our rsb is not master, but the dir nodeid has sent us a 1153 request; this could happen with master 0 / res_nodeid -1 */ 1154 1155 if (r->res_master_nodeid) { 1156 log_error(ls, "validate master from_dir %d master %d " 1157 "first %x %s", 1158 from_nodeid, r->res_master_nodeid, 1159 r->res_first_lkid, r->res_name); 1160 } 1161 1162 r->res_master_nodeid = dlm_our_nodeid(); 1163 r->res_nodeid = 0; 1164 return 0; 1165 } 1166 } 1167 1168 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, 1169 int from_nodeid, bool toss_list, unsigned int flags, 1170 int *r_nodeid, int *result) 1171 { 1172 int fix_master = (flags & DLM_LU_RECOVER_MASTER); 1173 int from_master = (flags & DLM_LU_RECOVER_DIR); 1174 1175 if (r->res_dir_nodeid != our_nodeid) { 1176 /* should not happen, but may as well fix it and carry on */ 1177 log_error(ls, "%s res_dir %d our %d %s", __func__, 1178 r->res_dir_nodeid, our_nodeid, r->res_name); 1179 r->res_dir_nodeid = our_nodeid; 1180 } 1181 1182 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { 1183 /* Recovery uses this function to set a new master when 1184 * the previous master failed. Setting NEW_MASTER will 1185 * force dlm_recover_masters to call recover_master on this 1186 * rsb even though the res_nodeid is no longer removed. 
1187 */ 1188 1189 r->res_master_nodeid = from_nodeid; 1190 r->res_nodeid = from_nodeid; 1191 rsb_set_flag(r, RSB_NEW_MASTER); 1192 1193 if (toss_list) { 1194 /* I don't think we should ever find it on toss list. */ 1195 log_error(ls, "%s fix_master on toss", __func__); 1196 dlm_dump_rsb(r); 1197 } 1198 } 1199 1200 if (from_master && (r->res_master_nodeid != from_nodeid)) { 1201 /* this will happen if from_nodeid became master during 1202 * a previous recovery cycle, and we aborted the previous 1203 * cycle before recovering this master value 1204 */ 1205 1206 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s", 1207 __func__, from_nodeid, r->res_master_nodeid, 1208 r->res_nodeid, r->res_first_lkid, r->res_name); 1209 1210 if (r->res_master_nodeid == our_nodeid) { 1211 log_error(ls, "from_master %d our_master", from_nodeid); 1212 dlm_dump_rsb(r); 1213 goto ret_assign; 1214 } 1215 1216 r->res_master_nodeid = from_nodeid; 1217 r->res_nodeid = from_nodeid; 1218 rsb_set_flag(r, RSB_NEW_MASTER); 1219 } 1220 1221 if (!r->res_master_nodeid) { 1222 /* this will happen if recovery happens while we're looking 1223 * up the master for this rsb 1224 */ 1225 1226 log_debug(ls, "%s master 0 to %d first %x %s", __func__, 1227 from_nodeid, r->res_first_lkid, r->res_name); 1228 r->res_master_nodeid = from_nodeid; 1229 r->res_nodeid = from_nodeid; 1230 } 1231 1232 if (!from_master && !fix_master && 1233 (r->res_master_nodeid == from_nodeid)) { 1234 /* this can happen when the master sends remove, the dir node 1235 * finds the rsb on the keep list and ignores the remove, 1236 * and the former master sends a lookup 1237 */ 1238 1239 log_limit(ls, "%s from master %d flags %x first %x %s", 1240 __func__, from_nodeid, flags, r->res_first_lkid, 1241 r->res_name); 1242 } 1243 1244 ret_assign: 1245 *r_nodeid = r->res_master_nodeid; 1246 if (result) 1247 *result = DLM_LU_MATCH; 1248 } 1249 1250 /* 1251 * We're the dir node for this res and another node wants to know the 1252 * master nodeid. During normal operation (non recovery) this is only 1253 * called from receive_lookup(); master lookups when the local node is 1254 * the dir node are done by find_rsb(). 1255 * 1256 * normal operation, we are the dir node for a resource 1257 * . _request_lock 1258 * . set_master 1259 * . send_lookup 1260 * . receive_lookup 1261 * . dlm_master_lookup flags 0 1262 * 1263 * recover directory, we are rebuilding dir for all resources 1264 * . dlm_recover_directory 1265 * . dlm_rcom_names 1266 * remote node sends back the rsb names it is master of and we are dir of 1267 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) 1268 * we either create new rsb setting remote node as master, or find existing 1269 * rsb and set master to be the remote node. 1270 * 1271 * recover masters, we are finding the new master for resources 1272 * . dlm_recover_masters 1273 * . recover_master 1274 * . dlm_send_rcom_lookup 1275 * . receive_rcom_lookup 1276 * . 
dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) 1277 */ 1278 1279 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, 1280 int len, unsigned int flags, int *r_nodeid, int *result) 1281 { 1282 struct dlm_rsb *r = NULL; 1283 uint32_t hash; 1284 int our_nodeid = dlm_our_nodeid(); 1285 int dir_nodeid, error; 1286 1287 if (len > DLM_RESNAME_MAXLEN) 1288 return -EINVAL; 1289 1290 if (from_nodeid == our_nodeid) { 1291 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x", 1292 our_nodeid, flags); 1293 return -EINVAL; 1294 } 1295 1296 hash = jhash(name, len, 0); 1297 dir_nodeid = dlm_hash2nodeid(ls, hash); 1298 if (dir_nodeid != our_nodeid) { 1299 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", 1300 from_nodeid, dir_nodeid, our_nodeid, hash, 1301 ls->ls_num_nodes); 1302 *r_nodeid = -1; 1303 return -EINVAL; 1304 } 1305 1306 retry: 1307 error = pre_rsb_struct(ls); 1308 if (error < 0) 1309 return error; 1310 1311 retry_lookup: 1312 1313 /* check if the rsb is in keep state under read lock - likely path */ 1314 read_lock_bh(&ls->ls_rsbtbl_lock); 1315 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 1316 if (!error) { 1317 if (rsb_flag(r, RSB_TOSS)) { 1318 read_unlock_bh(&ls->ls_rsbtbl_lock); 1319 goto do_toss; 1320 } 1321 1322 /* because the rsb is active, we need to lock_rsb before 1323 * checking/changing re_master_nodeid 1324 */ 1325 1326 hold_rsb(r); 1327 read_unlock_bh(&ls->ls_rsbtbl_lock); 1328 lock_rsb(r); 1329 1330 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false, 1331 flags, r_nodeid, result); 1332 1333 /* the rsb was active */ 1334 unlock_rsb(r); 1335 put_rsb(r); 1336 1337 return 0; 1338 } else { 1339 read_unlock_bh(&ls->ls_rsbtbl_lock); 1340 goto not_found; 1341 } 1342 1343 do_toss: 1344 /* unlikely path - relookup under write */ 1345 write_lock_bh(&ls->ls_rsbtbl_lock); 1346 1347 /* rsb_mod_timer() requires to held ls_rsbtbl_lock in write lock 1348 * check if the rsb is still in toss state, if not relookup 1349 */ 1350 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 1351 if (!error) { 1352 if (!rsb_flag(r, RSB_TOSS)) { 1353 write_unlock_bh(&ls->ls_rsbtbl_lock); 1354 /* something as changed, very unlikely but 1355 * try again 1356 */ 1357 goto retry_lookup; 1358 } 1359 } else { 1360 write_unlock_bh(&ls->ls_rsbtbl_lock); 1361 goto not_found; 1362 } 1363 1364 /* because the rsb is inactive (on toss list), it's not refcounted 1365 * and lock_rsb is not used, but is protected by the rsbtbl lock 1366 */ 1367 1368 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, 1369 r_nodeid, result); 1370 1371 rsb_mod_timer(ls, r); 1372 /* the rsb was inactive (on toss list) */ 1373 write_unlock_bh(&ls->ls_rsbtbl_lock); 1374 1375 return 0; 1376 1377 not_found: 1378 error = get_rsb_struct(ls, name, len, &r); 1379 if (error == -EAGAIN) 1380 goto retry; 1381 if (error) 1382 goto out; 1383 1384 r->res_hash = hash; 1385 r->res_dir_nodeid = our_nodeid; 1386 r->res_master_nodeid = from_nodeid; 1387 r->res_nodeid = from_nodeid; 1388 kref_init(&r->res_ref); 1389 rsb_set_flag(r, RSB_TOSS); 1390 1391 write_lock_bh(&ls->ls_rsbtbl_lock); 1392 error = rsb_insert(r, &ls->ls_rsbtbl); 1393 if (error == -EEXIST) { 1394 /* somebody else was faster and it seems the 1395 * rsb exists now, we do a whole relookup 1396 */ 1397 write_unlock_bh(&ls->ls_rsbtbl_lock); 1398 dlm_free_rsb(r); 1399 goto retry_lookup; 1400 } else if (error) { 1401 write_unlock_bh(&ls->ls_rsbtbl_lock); 1402 /* should never happen */ 1403 
dlm_free_rsb(r); 1404 goto retry; 1405 } 1406 1407 list_add(&r->res_rsbs_list, &ls->ls_toss); 1408 rsb_mod_timer(ls, r); 1409 write_unlock_bh(&ls->ls_rsbtbl_lock); 1410 1411 if (result) 1412 *result = DLM_LU_ADD; 1413 *r_nodeid = from_nodeid; 1414 out: 1415 return error; 1416 } 1417 1418 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) 1419 { 1420 struct dlm_rsb *r; 1421 1422 read_lock_bh(&ls->ls_rsbtbl_lock); 1423 list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { 1424 if (r->res_hash == hash) 1425 dlm_dump_rsb(r); 1426 } 1427 read_unlock_bh(&ls->ls_rsbtbl_lock); 1428 } 1429 1430 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len) 1431 { 1432 struct dlm_rsb *r = NULL; 1433 int error; 1434 1435 read_lock_bh(&ls->ls_rsbtbl_lock); 1436 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 1437 if (!error) 1438 goto out; 1439 1440 dlm_dump_rsb(r); 1441 out: 1442 read_unlock_bh(&ls->ls_rsbtbl_lock); 1443 } 1444 1445 static void toss_rsb(struct kref *kref) 1446 { 1447 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1448 struct dlm_ls *ls = r->res_ls; 1449 1450 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 1451 rsb_set_flag(r, RSB_TOSS); 1452 list_move(&r->res_rsbs_list, &ls->ls_toss); 1453 rsb_mod_timer(ls, r); 1454 1455 if (r->res_lvbptr) { 1456 dlm_free_lvb(r->res_lvbptr); 1457 r->res_lvbptr = NULL; 1458 } 1459 } 1460 1461 /* See comment for unhold_lkb */ 1462 1463 static void unhold_rsb(struct dlm_rsb *r) 1464 { 1465 int rv; 1466 1467 /* rsbs in toss state never get referenced */ 1468 WARN_ON(rsb_flag(r, RSB_TOSS)); 1469 rv = kref_put(&r->res_ref, toss_rsb); 1470 DLM_ASSERT(!rv, dlm_dump_rsb(r);); 1471 } 1472 1473 void free_toss_rsb(struct dlm_rsb *r) 1474 { 1475 WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS)); 1476 1477 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); 1478 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); 1479 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); 1480 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 1481 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 1482 DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r);); 1483 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 1484 DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); 1485 1486 dlm_free_rsb(r); 1487 } 1488 1489 /* Attaching/detaching lkb's from rsb's is for rsb reference counting. 1490 The rsb must exist as long as any lkb's for it do. 
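
   Illustrative pairing (a sketch of the rule above, not a quote of any
   particular caller):

	attach_lkb(r, lkb);	takes an rsb reference via hold_rsb()
	...
	detach_lkb(lkb);	drops it via put_rsb(); the final put moves
				the rsb to the toss list (toss_rsb)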
*/ 1491 1492 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1493 { 1494 hold_rsb(r); 1495 lkb->lkb_resource = r; 1496 } 1497 1498 static void detach_lkb(struct dlm_lkb *lkb) 1499 { 1500 if (lkb->lkb_resource) { 1501 put_rsb(lkb->lkb_resource); 1502 lkb->lkb_resource = NULL; 1503 } 1504 } 1505 1506 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret, 1507 int start, int end) 1508 { 1509 struct dlm_lkb *lkb; 1510 int rv; 1511 1512 lkb = dlm_allocate_lkb(ls); 1513 if (!lkb) 1514 return -ENOMEM; 1515 1516 lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV; 1517 lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV; 1518 lkb->lkb_last_cb_mode = DLM_LOCK_IV; 1519 lkb->lkb_nodeid = -1; 1520 lkb->lkb_grmode = DLM_LOCK_IV; 1521 kref_init(&lkb->lkb_ref); 1522 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 1523 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 1524 1525 write_lock_bh(&ls->ls_lkbidr_lock); 1526 rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT); 1527 if (rv >= 0) 1528 lkb->lkb_id = rv; 1529 write_unlock_bh(&ls->ls_lkbidr_lock); 1530 1531 if (rv < 0) { 1532 log_error(ls, "create_lkb idr error %d", rv); 1533 dlm_free_lkb(lkb); 1534 return rv; 1535 } 1536 1537 *lkb_ret = lkb; 1538 return 0; 1539 } 1540 1541 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 1542 { 1543 return _create_lkb(ls, lkb_ret, 1, 0); 1544 } 1545 1546 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 1547 { 1548 struct dlm_lkb *lkb; 1549 1550 read_lock_bh(&ls->ls_lkbidr_lock); 1551 lkb = idr_find(&ls->ls_lkbidr, lkid); 1552 if (lkb) 1553 kref_get(&lkb->lkb_ref); 1554 read_unlock_bh(&ls->ls_lkbidr_lock); 1555 1556 *lkb_ret = lkb; 1557 return lkb ? 0 : -ENOENT; 1558 } 1559 1560 static void kill_lkb(struct kref *kref) 1561 { 1562 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1563 1564 /* All work is done after the return from kref_put() so we 1565 can release the write_lock before the detach_lkb */ 1566 1567 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1568 } 1569 1570 /* __put_lkb() is used when an lkb may not have an rsb attached to 1571 it so we need to provide the lockspace explicitly */ 1572 1573 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 1574 { 1575 uint32_t lkid = lkb->lkb_id; 1576 int rv; 1577 1578 rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb, 1579 &ls->ls_lkbidr_lock); 1580 if (rv) { 1581 idr_remove(&ls->ls_lkbidr, lkid); 1582 write_unlock_bh(&ls->ls_lkbidr_lock); 1583 1584 detach_lkb(lkb); 1585 1586 /* for local/process lkbs, lvbptr points to caller's lksb */ 1587 if (lkb->lkb_lvbptr && is_master_copy(lkb)) 1588 dlm_free_lvb(lkb->lkb_lvbptr); 1589 dlm_free_lkb(lkb); 1590 } 1591 1592 return rv; 1593 } 1594 1595 int dlm_put_lkb(struct dlm_lkb *lkb) 1596 { 1597 struct dlm_ls *ls; 1598 1599 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb);); 1600 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb);); 1601 1602 ls = lkb->lkb_resource->res_ls; 1603 return __put_lkb(ls, lkb); 1604 } 1605 1606 /* This is only called to add a reference when the code already holds 1607 a valid reference to the lkb, so there's no need for locking. 
*/ 1608 1609 static inline void hold_lkb(struct dlm_lkb *lkb) 1610 { 1611 kref_get(&lkb->lkb_ref); 1612 } 1613 1614 static void unhold_lkb_assert(struct kref *kref) 1615 { 1616 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1617 1618 DLM_ASSERT(false, dlm_print_lkb(lkb);); 1619 } 1620 1621 /* This is called when we need to remove a reference and are certain 1622 it's not the last ref. e.g. del_lkb is always called between a 1623 find_lkb/put_lkb and is always the inverse of a previous add_lkb. 1624 put_lkb would work fine, but would involve unnecessary locking */ 1625 1626 static inline void unhold_lkb(struct dlm_lkb *lkb) 1627 { 1628 kref_put(&lkb->lkb_ref, unhold_lkb_assert); 1629 } 1630 1631 static void lkb_add_ordered(struct list_head *new, struct list_head *head, 1632 int mode) 1633 { 1634 struct dlm_lkb *lkb = NULL, *iter; 1635 1636 list_for_each_entry(iter, head, lkb_statequeue) 1637 if (iter->lkb_rqmode < mode) { 1638 lkb = iter; 1639 list_add_tail(new, &iter->lkb_statequeue); 1640 break; 1641 } 1642 1643 if (!lkb) 1644 list_add_tail(new, head); 1645 } 1646 1647 /* add/remove lkb to rsb's grant/convert/wait queue */ 1648 1649 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status) 1650 { 1651 kref_get(&lkb->lkb_ref); 1652 1653 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1654 1655 lkb->lkb_timestamp = ktime_get(); 1656 1657 lkb->lkb_status = status; 1658 1659 switch (status) { 1660 case DLM_LKSTS_WAITING: 1661 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1662 list_add(&lkb->lkb_statequeue, &r->res_waitqueue); 1663 else 1664 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue); 1665 break; 1666 case DLM_LKSTS_GRANTED: 1667 /* convention says granted locks kept in order of grmode */ 1668 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue, 1669 lkb->lkb_grmode); 1670 break; 1671 case DLM_LKSTS_CONVERT: 1672 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1673 list_add(&lkb->lkb_statequeue, &r->res_convertqueue); 1674 else 1675 list_add_tail(&lkb->lkb_statequeue, 1676 &r->res_convertqueue); 1677 break; 1678 default: 1679 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status);); 1680 } 1681 } 1682 1683 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1684 { 1685 lkb->lkb_status = 0; 1686 list_del(&lkb->lkb_statequeue); 1687 unhold_lkb(lkb); 1688 } 1689 1690 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) 1691 { 1692 hold_lkb(lkb); 1693 del_lkb(r, lkb); 1694 add_lkb(r, lkb, sts); 1695 unhold_lkb(lkb); 1696 } 1697 1698 static int msg_reply_type(int mstype) 1699 { 1700 switch (mstype) { 1701 case DLM_MSG_REQUEST: 1702 return DLM_MSG_REQUEST_REPLY; 1703 case DLM_MSG_CONVERT: 1704 return DLM_MSG_CONVERT_REPLY; 1705 case DLM_MSG_UNLOCK: 1706 return DLM_MSG_UNLOCK_REPLY; 1707 case DLM_MSG_CANCEL: 1708 return DLM_MSG_CANCEL_REPLY; 1709 case DLM_MSG_LOOKUP: 1710 return DLM_MSG_LOOKUP_REPLY; 1711 } 1712 return -1; 1713 } 1714 1715 /* add/remove lkb from global waiters list of lkb's waiting for 1716 a reply from a remote node */ 1717 1718 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) 1719 { 1720 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1721 int error = 0; 1722 1723 spin_lock_bh(&ls->ls_waiters_lock); 1724 1725 if (is_overlap_unlock(lkb) || 1726 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { 1727 error = -EINVAL; 1728 goto out; 1729 } 1730 1731 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { 1732 switch (mstype) { 1733 case DLM_MSG_UNLOCK: 1734 
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 1735 break; 1736 case DLM_MSG_CANCEL: 1737 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 1738 break; 1739 default: 1740 error = -EBUSY; 1741 goto out; 1742 } 1743 lkb->lkb_wait_count++; 1744 hold_lkb(lkb); 1745 1746 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", 1747 lkb->lkb_id, lkb->lkb_wait_type, mstype, 1748 lkb->lkb_wait_count, dlm_iflags_val(lkb)); 1749 goto out; 1750 } 1751 1752 DLM_ASSERT(!lkb->lkb_wait_count, 1753 dlm_print_lkb(lkb); 1754 printk("wait_count %d\n", lkb->lkb_wait_count);); 1755 1756 lkb->lkb_wait_count++; 1757 lkb->lkb_wait_type = mstype; 1758 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ 1759 hold_lkb(lkb); 1760 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 1761 out: 1762 if (error) 1763 log_error(ls, "addwait error %x %d flags %x %d %d %s", 1764 lkb->lkb_id, error, dlm_iflags_val(lkb), mstype, 1765 lkb->lkb_wait_type, lkb->lkb_resource->res_name); 1766 spin_unlock_bh(&ls->ls_waiters_lock); 1767 return error; 1768 } 1769 1770 /* We clear the RESEND flag because we might be taking an lkb off the waiters 1771 list as part of process_requestqueue (e.g. a lookup that has an optimized 1772 request reply on the requestqueue) between dlm_recover_waiters_pre() which 1773 set RESEND and dlm_recover_waiters_post() */ 1774 1775 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, 1776 const struct dlm_message *ms) 1777 { 1778 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1779 int overlap_done = 0; 1780 1781 if (mstype == DLM_MSG_UNLOCK_REPLY && 1782 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) { 1783 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id); 1784 overlap_done = 1; 1785 goto out_del; 1786 } 1787 1788 if (mstype == DLM_MSG_CANCEL_REPLY && 1789 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) { 1790 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id); 1791 overlap_done = 1; 1792 goto out_del; 1793 } 1794 1795 /* Cancel state was preemptively cleared by a successful convert, 1796 see next comment, nothing to do. */ 1797 1798 if ((mstype == DLM_MSG_CANCEL_REPLY) && 1799 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) { 1800 log_debug(ls, "remwait %x cancel_reply wait_type %d", 1801 lkb->lkb_id, lkb->lkb_wait_type); 1802 return -1; 1803 } 1804 1805 /* Remove for the convert reply, and premptively remove for the 1806 cancel reply. A convert has been granted while there's still 1807 an outstanding cancel on it (the cancel is moot and the result 1808 in the cancel reply should be 0). We preempt the cancel reply 1809 because the app gets the convert result and then can follow up 1810 with another op, like convert. This subsequent op would see the 1811 lingering state of the cancel and fail with -EBUSY. */ 1812 1813 if ((mstype == DLM_MSG_CONVERT_REPLY) && 1814 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result && 1815 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) { 1816 log_debug(ls, "remwait %x convert_reply zap overlap_cancel", 1817 lkb->lkb_id); 1818 lkb->lkb_wait_type = 0; 1819 lkb->lkb_wait_count--; 1820 unhold_lkb(lkb); 1821 goto out_del; 1822 } 1823 1824 /* N.B. type of reply may not always correspond to type of original 1825 msg due to lookup->request optimization, verify others? */ 1826 1827 if (lkb->lkb_wait_type) { 1828 lkb->lkb_wait_type = 0; 1829 goto out_del; 1830 } 1831 1832 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", 1833 lkb->lkb_id, ms ? 
le32_to_cpu(ms->m_header.h_nodeid) : 0, 1834 lkb->lkb_remid, mstype, dlm_iflags_val(lkb)); 1835 return -1; 1836 1837 out_del: 1838 /* the force-unlock/cancel has completed and we haven't recvd a reply 1839 to the op that was in progress prior to the unlock/cancel; we 1840 give up on any reply to the earlier op. FIXME: not sure when/how 1841 this would happen */ 1842 1843 if (overlap_done && lkb->lkb_wait_type) { 1844 log_error(ls, "remwait error %x reply %d wait_type %d overlap", 1845 lkb->lkb_id, mstype, lkb->lkb_wait_type); 1846 lkb->lkb_wait_count--; 1847 unhold_lkb(lkb); 1848 lkb->lkb_wait_type = 0; 1849 } 1850 1851 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); 1852 1853 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 1854 lkb->lkb_wait_count--; 1855 if (!lkb->lkb_wait_count) 1856 list_del_init(&lkb->lkb_wait_reply); 1857 unhold_lkb(lkb); 1858 return 0; 1859 } 1860 1861 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) 1862 { 1863 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1864 int error; 1865 1866 spin_lock_bh(&ls->ls_waiters_lock); 1867 error = _remove_from_waiters(lkb, mstype, NULL); 1868 spin_unlock_bh(&ls->ls_waiters_lock); 1869 return error; 1870 } 1871 1872 /* Handles situations where we might be processing a "fake" or "local" reply in 1873 * the recovery context which stops any locking activity. Only debugfs might 1874 * change the lockspace waiters but they will held the recovery lock to ensure 1875 * remove_from_waiters_ms() in local case will be the only user manipulating the 1876 * lockspace waiters in recovery context. 1877 */ 1878 1879 static int remove_from_waiters_ms(struct dlm_lkb *lkb, 1880 const struct dlm_message *ms, bool local) 1881 { 1882 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1883 int error; 1884 1885 if (!local) 1886 spin_lock_bh(&ls->ls_waiters_lock); 1887 else 1888 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) || 1889 !dlm_locking_stopped(ls)); 1890 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms); 1891 if (!local) 1892 spin_unlock_bh(&ls->ls_waiters_lock); 1893 return error; 1894 } 1895 1896 /* lkb is master or local copy */ 1897 1898 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1899 { 1900 int b, len = r->res_ls->ls_lvblen; 1901 1902 /* b=1 lvb returned to caller 1903 b=0 lvb written to rsb or invalidated 1904 b=-1 do nothing */ 1905 1906 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1907 1908 if (b == 1) { 1909 if (!lkb->lkb_lvbptr) 1910 return; 1911 1912 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1913 return; 1914 1915 if (!r->res_lvbptr) 1916 return; 1917 1918 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len); 1919 lkb->lkb_lvbseq = r->res_lvbseq; 1920 1921 } else if (b == 0) { 1922 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1923 rsb_set_flag(r, RSB_VALNOTVALID); 1924 return; 1925 } 1926 1927 if (!lkb->lkb_lvbptr) 1928 return; 1929 1930 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1931 return; 1932 1933 if (!r->res_lvbptr) 1934 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1935 1936 if (!r->res_lvbptr) 1937 return; 1938 1939 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len); 1940 r->res_lvbseq++; 1941 lkb->lkb_lvbseq = r->res_lvbseq; 1942 rsb_clear_flag(r, RSB_VALNOTVALID); 1943 } 1944 1945 if (rsb_flag(r, RSB_VALNOTVALID)) 1946 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags); 1947 } 1948 1949 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1950 { 1951 if (lkb->lkb_grmode < DLM_LOCK_PW) 1952 return; 1953 1954 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1955 
rsb_set_flag(r, RSB_VALNOTVALID); 1956 return; 1957 } 1958 1959 if (!lkb->lkb_lvbptr) 1960 return; 1961 1962 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1963 return; 1964 1965 if (!r->res_lvbptr) 1966 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1967 1968 if (!r->res_lvbptr) 1969 return; 1970 1971 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 1972 r->res_lvbseq++; 1973 rsb_clear_flag(r, RSB_VALNOTVALID); 1974 } 1975 1976 /* lkb is process copy (pc) */ 1977 1978 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 1979 const struct dlm_message *ms) 1980 { 1981 int b; 1982 1983 if (!lkb->lkb_lvbptr) 1984 return; 1985 1986 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1987 return; 1988 1989 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1990 if (b == 1) { 1991 int len = receive_extralen(ms); 1992 if (len > r->res_ls->ls_lvblen) 1993 len = r->res_ls->ls_lvblen; 1994 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 1995 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 1996 } 1997 } 1998 1999 /* Manipulate lkb's on rsb's convert/granted/waiting queues 2000 remove_lock -- used for unlock, removes lkb from granted 2001 revert_lock -- used for cancel, moves lkb from convert to granted 2002 grant_lock -- used for request and convert, adds lkb to granted or 2003 moves lkb from convert or waiting to granted 2004 2005 Each of these is used for master or local copy lkb's. There is 2006 also a _pc() variation used to make the corresponding change on 2007 a process copy (pc) lkb. */ 2008 2009 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2010 { 2011 del_lkb(r, lkb); 2012 lkb->lkb_grmode = DLM_LOCK_IV; 2013 /* this unhold undoes the original ref from create_lkb() 2014 so this leads to the lkb being freed */ 2015 unhold_lkb(lkb); 2016 } 2017 2018 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2019 { 2020 set_lvb_unlock(r, lkb); 2021 _remove_lock(r, lkb); 2022 } 2023 2024 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2025 { 2026 _remove_lock(r, lkb); 2027 } 2028 2029 /* returns: 0 did nothing 2030 1 moved lock to granted 2031 -1 removed lock */ 2032 2033 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2034 { 2035 int rv = 0; 2036 2037 lkb->lkb_rqmode = DLM_LOCK_IV; 2038 2039 switch (lkb->lkb_status) { 2040 case DLM_LKSTS_GRANTED: 2041 break; 2042 case DLM_LKSTS_CONVERT: 2043 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2044 rv = 1; 2045 break; 2046 case DLM_LKSTS_WAITING: 2047 del_lkb(r, lkb); 2048 lkb->lkb_grmode = DLM_LOCK_IV; 2049 /* this unhold undoes the original ref from create_lkb() 2050 so this leads to the lkb being freed */ 2051 unhold_lkb(lkb); 2052 rv = -1; 2053 break; 2054 default: 2055 log_print("invalid status for revert %d", lkb->lkb_status); 2056 } 2057 return rv; 2058 } 2059 2060 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2061 { 2062 return revert_lock(r, lkb); 2063 } 2064 2065 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2066 { 2067 if (lkb->lkb_grmode != lkb->lkb_rqmode) { 2068 lkb->lkb_grmode = lkb->lkb_rqmode; 2069 if (lkb->lkb_status) 2070 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2071 else 2072 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 2073 } 2074 2075 lkb->lkb_rqmode = DLM_LOCK_IV; 2076 lkb->lkb_highbast = 0; 2077 } 2078 2079 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2080 { 2081 set_lvb_lock(r, lkb); 2082 _grant_lock(r, lkb); 2083 } 2084 2085 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2086 const struct dlm_message 
*ms) 2087 { 2088 set_lvb_lock_pc(r, lkb, ms); 2089 _grant_lock(r, lkb); 2090 } 2091 2092 /* called by grant_pending_locks() which means an async grant message must 2093 be sent to the requesting node in addition to granting the lock if the 2094 lkb belongs to a remote node. */ 2095 2096 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) 2097 { 2098 grant_lock(r, lkb); 2099 if (is_master_copy(lkb)) 2100 send_grant(r, lkb); 2101 else 2102 queue_cast(r, lkb, 0); 2103 } 2104 2105 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to 2106 change the granted/requested modes. We're munging things accordingly in 2107 the process copy. 2108 CONVDEADLK: our grmode may have been forced down to NL to resolve a 2109 conversion deadlock 2110 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 2111 compatible with other granted locks */ 2112 2113 static void munge_demoted(struct dlm_lkb *lkb) 2114 { 2115 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 2116 log_print("munge_demoted %x invalid modes gr %d rq %d", 2117 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 2118 return; 2119 } 2120 2121 lkb->lkb_grmode = DLM_LOCK_NL; 2122 } 2123 2124 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms) 2125 { 2126 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) && 2127 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) { 2128 log_print("munge_altmode %x invalid reply type %d", 2129 lkb->lkb_id, le32_to_cpu(ms->m_type)); 2130 return; 2131 } 2132 2133 if (lkb->lkb_exflags & DLM_LKF_ALTPR) 2134 lkb->lkb_rqmode = DLM_LOCK_PR; 2135 else if (lkb->lkb_exflags & DLM_LKF_ALTCW) 2136 lkb->lkb_rqmode = DLM_LOCK_CW; 2137 else { 2138 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); 2139 dlm_print_lkb(lkb); 2140 } 2141 } 2142 2143 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 2144 { 2145 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 2146 lkb_statequeue); 2147 if (lkb->lkb_id == first->lkb_id) 2148 return 1; 2149 2150 return 0; 2151 } 2152 2153 /* Check if the given lkb conflicts with another lkb on the queue. */ 2154 2155 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) 2156 { 2157 struct dlm_lkb *this; 2158 2159 list_for_each_entry(this, head, lkb_statequeue) { 2160 if (this == lkb) 2161 continue; 2162 if (!modes_compat(this, lkb)) 2163 return 1; 2164 } 2165 return 0; 2166 } 2167 2168 /* 2169 * "A conversion deadlock arises with a pair of lock requests in the converting 2170 * queue for one resource. The granted mode of each lock blocks the requested 2171 * mode of the other lock." 2172 * 2173 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the 2174 * convert queue from being granted, then deadlk/demote lkb. 2175 * 2176 * Example: 2177 * Granted Queue: empty 2178 * Convert Queue: NL->EX (first lock) 2179 * PR->EX (second lock) 2180 * 2181 * The first lock can't be granted because of the granted mode of the second 2182 * lock and the second lock can't be granted because it's not first in the 2183 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we 2184 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK 2185 * flag set and return DEMOTED in the lksb flags. 
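 *
 * Mapping that example onto the helpers used below: the first lock's
 * conversion fails queue_conflict() on the convert queue because the
 * second lock's granted PR mode is incompatible with the requested EX,
 * while the second lock's conversion fails first_in_list() on that
 * same queue, so neither can ever be granted until one of them is
 * cancelled or demoted.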
2186 * 2187 * Originally, this function detected conv-deadlk in a more limited scope: 2188 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or 2189 * - if lkb1 was the first entry in the queue (not just earlier), and was 2190 * blocked by the granted mode of lkb2, and there was nothing on the 2191 * granted queue preventing lkb1 from being granted immediately, i.e. 2192 * lkb2 was the only thing preventing lkb1 from being granted. 2193 * 2194 * That second condition meant we'd only say there was conv-deadlk if 2195 * resolving it (by demotion) would lead to the first lock on the convert 2196 * queue being granted right away. It allowed conversion deadlocks to exist 2197 * between locks on the convert queue while they couldn't be granted anyway. 2198 * 2199 * Now, we detect and take action on conversion deadlocks immediately when 2200 * they're created, even if they may not be immediately consequential. If 2201 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted 2202 * mode that would prevent lkb1's conversion from being granted, we do a 2203 * deadlk/demote on lkb2 right away and don't let it onto the convert queue. 2204 * I think this means that the lkb_is_ahead condition below should always 2205 * be zero, i.e. there will never be conv-deadlk between two locks that are 2206 * both already on the convert queue. 2207 */ 2208 2209 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) 2210 { 2211 struct dlm_lkb *lkb1; 2212 int lkb_is_ahead = 0; 2213 2214 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { 2215 if (lkb1 == lkb2) { 2216 lkb_is_ahead = 1; 2217 continue; 2218 } 2219 2220 if (!lkb_is_ahead) { 2221 if (!modes_compat(lkb2, lkb1)) 2222 return 1; 2223 } else { 2224 if (!modes_compat(lkb2, lkb1) && 2225 !modes_compat(lkb1, lkb2)) 2226 return 1; 2227 } 2228 } 2229 return 0; 2230 } 2231 2232 /* 2233 * Return 1 if the lock can be granted, 0 otherwise. 2234 * Also detect and resolve conversion deadlocks. 2235 * 2236 * lkb is the lock to be granted 2237 * 2238 * now is 1 if the function is being called in the context of the 2239 * immediate request, it is 0 if called later, after the lock has been 2240 * queued. 2241 * 2242 * recover is 1 if dlm_recover_grant() is trying to grant conversions 2243 * after recovery. 2244 * 2245 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 2246 */ 2247 2248 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2249 int recover) 2250 { 2251 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 2252 2253 /* 2254 * 6-10: Version 5.4 introduced an option to address the phenomenon of 2255 * a new request for a NL mode lock being blocked. 2256 * 2257 * 6-11: If the optional EXPEDITE flag is used with the new NL mode 2258 * request, then it would be granted. In essence, the use of this flag 2259 * tells the Lock Manager to expedite theis request by not considering 2260 * what may be in the CONVERTING or WAITING queues... As of this 2261 * writing, the EXPEDITE flag can be used only with new requests for NL 2262 * mode locks. This flag is not valid for conversion requests. 2263 * 2264 * A shortcut. Earlier checks return an error if EXPEDITE is used in a 2265 * conversion or used with a non-NL requested mode. We also know an 2266 * EXPEDITE request is always granted immediately, so now must always 2267 * be 1. 
The full condition to grant an expedite request: (now && 2268 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can 2269 * therefore be shortened to just checking the flag. 2270 */ 2271
2272 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) 2273 return 1; 2274
2275 /* 2276 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be 2277 * added to the remaining conditions. 2278 */ 2279
2280 if (queue_conflict(&r->res_grantqueue, lkb)) 2281 return 0; 2282
2283 /* 2284 * 6-3: By default, a conversion request is immediately granted if the 2285 * requested mode is compatible with the modes of all other granted 2286 * locks 2287 */ 2288
2289 if (queue_conflict(&r->res_convertqueue, lkb)) 2290 return 0; 2291
2292 /* 2293 * The RECOVER_GRANT flag means dlm_recover_grant() is granting 2294 * locks for a recovered rsb, on which lkb's have been rebuilt. 2295 * The lkb's may have been rebuilt on the queues in a different 2296 * order than they were in on the previous master. So, granting 2297 * queued conversions in order after recovery doesn't make sense 2298 * since the order hasn't been preserved anyway. The new order 2299 * could also have created a new "in place" conversion deadlock. 2300 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. 2301 * After recovery, there would be no granted locks, and possibly 2302 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after 2303 * recovery, grant conversions without considering order. 2304 */ 2305
2306 if (conv && recover) 2307 return 1; 2308
2309 /* 2310 * 6-5: But the default algorithm for deciding whether to grant or 2311 * queue conversion requests does not by itself guarantee that such 2312 * requests are serviced on a "first come first serve" basis. This, in 2313 * turn, can lead to a phenomenon known as "indefinite postponement". 2314 * 2315 * 6-7: This issue is dealt with by using the optional QUECVT flag with 2316 * the system service employed to request a lock conversion. This flag 2317 * forces certain conversion requests to be queued, even if they are 2318 * compatible with the granted modes of other locks on the same 2319 * resource. Thus, the use of this flag results in conversion requests 2320 * being ordered on a "first come first serve" basis. 2321 * 2322 * DCT: This condition is all about new conversions being able to occur 2323 * "in place" while the lock remains on the granted queue (assuming 2324 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion 2325 * doesn't _have_ to go onto the convert queue where it's processed in 2326 * order. The "now" variable is necessary to distinguish converts 2327 * being received and processed for the first time now, because once a 2328 * convert is moved to the conversion queue the condition below applies 2329 * requiring fifo granting. 2330 */ 2331
2332 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 2333 return 1; 2334
2335 /* 2336 * Even if the convert is compat with all granted locks, 2337 * QUECVT forces it behind other locks on the convert queue. 2338 */ 2339
2340 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) { 2341 if (list_empty(&r->res_convertqueue)) 2342 return 1; 2343 else 2344 return 0; 2345 } 2346
2347 /* 2348 * The NOORDER flag is set to avoid the standard vms rules on grant 2349 * order.
2350 */ 2351 2352 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 2353 return 1; 2354 2355 /* 2356 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 2357 * granted until all other conversion requests ahead of it are granted 2358 * and/or canceled. 2359 */ 2360 2361 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 2362 return 1; 2363 2364 /* 2365 * 6-4: By default, a new request is immediately granted only if all 2366 * three of the following conditions are satisfied when the request is 2367 * issued: 2368 * - The queue of ungranted conversion requests for the resource is 2369 * empty. 2370 * - The queue of ungranted new requests for the resource is empty. 2371 * - The mode of the new request is compatible with the most 2372 * restrictive mode of all granted locks on the resource. 2373 */ 2374 2375 if (now && !conv && list_empty(&r->res_convertqueue) && 2376 list_empty(&r->res_waitqueue)) 2377 return 1; 2378 2379 /* 2380 * 6-4: Once a lock request is in the queue of ungranted new requests, 2381 * it cannot be granted until the queue of ungranted conversion 2382 * requests is empty, all ungranted new requests ahead of it are 2383 * granted and/or canceled, and it is compatible with the granted mode 2384 * of the most restrictive lock granted on the resource. 2385 */ 2386 2387 if (!now && !conv && list_empty(&r->res_convertqueue) && 2388 first_in_list(lkb, &r->res_waitqueue)) 2389 return 1; 2390 2391 return 0; 2392 } 2393 2394 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2395 int recover, int *err) 2396 { 2397 int rv; 2398 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 2399 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); 2400 2401 if (err) 2402 *err = 0; 2403 2404 rv = _can_be_granted(r, lkb, now, recover); 2405 if (rv) 2406 goto out; 2407 2408 /* 2409 * The CONVDEADLK flag is non-standard and tells the dlm to resolve 2410 * conversion deadlocks by demoting grmode to NL, otherwise the dlm 2411 * cancels one of the locks. 2412 */ 2413 2414 if (is_convert && can_be_queued(lkb) && 2415 conversion_deadlock_detect(r, lkb)) { 2416 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { 2417 lkb->lkb_grmode = DLM_LOCK_NL; 2418 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags); 2419 } else if (err) { 2420 *err = -EDEADLK; 2421 } else { 2422 log_print("can_be_granted deadlock %x now %d", 2423 lkb->lkb_id, now); 2424 dlm_dump_rsb(r); 2425 } 2426 goto out; 2427 } 2428 2429 /* 2430 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try 2431 * to grant a request in a mode other than the normal rqmode. It's a 2432 * simple way to provide a big optimization to applications that can 2433 * use them. 2434 */ 2435 2436 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) 2437 alt = DLM_LOCK_PR; 2438 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) 2439 alt = DLM_LOCK_CW; 2440 2441 if (alt) { 2442 lkb->lkb_rqmode = alt; 2443 rv = _can_be_granted(r, lkb, now, 0); 2444 if (rv) 2445 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags); 2446 else 2447 lkb->lkb_rqmode = rqmode; 2448 } 2449 out: 2450 return rv; 2451 } 2452 2453 /* Returns the highest requested mode of all blocked conversions; sets 2454 cw if there's a blocked conversion to DLM_LOCK_CW. 
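   Granting anything restarts the scan, and a CONVDEADLK demotion
   restarts it once more, since either event may make other queued
   conversions grantable.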
*/ 2455 2456 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, 2457 unsigned int *count) 2458 { 2459 struct dlm_lkb *lkb, *s; 2460 int recover = rsb_flag(r, RSB_RECOVER_GRANT); 2461 int hi, demoted, quit, grant_restart, demote_restart; 2462 int deadlk; 2463 2464 quit = 0; 2465 restart: 2466 grant_restart = 0; 2467 demote_restart = 0; 2468 hi = DLM_LOCK_IV; 2469 2470 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 2471 demoted = is_demoted(lkb); 2472 deadlk = 0; 2473 2474 if (can_be_granted(r, lkb, 0, recover, &deadlk)) { 2475 grant_lock_pending(r, lkb); 2476 grant_restart = 1; 2477 if (count) 2478 (*count)++; 2479 continue; 2480 } 2481 2482 if (!demoted && is_demoted(lkb)) { 2483 log_print("WARN: pending demoted %x node %d %s", 2484 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 2485 demote_restart = 1; 2486 continue; 2487 } 2488 2489 if (deadlk) { 2490 /* 2491 * If DLM_LKB_NODLKWT flag is set and conversion 2492 * deadlock is detected, we request blocking AST and 2493 * down (or cancel) conversion. 2494 */ 2495 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) { 2496 if (lkb->lkb_highbast < lkb->lkb_rqmode) { 2497 queue_bast(r, lkb, lkb->lkb_rqmode); 2498 lkb->lkb_highbast = lkb->lkb_rqmode; 2499 } 2500 } else { 2501 log_print("WARN: pending deadlock %x node %d %s", 2502 lkb->lkb_id, lkb->lkb_nodeid, 2503 r->res_name); 2504 dlm_dump_rsb(r); 2505 } 2506 continue; 2507 } 2508 2509 hi = max_t(int, lkb->lkb_rqmode, hi); 2510 2511 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) 2512 *cw = 1; 2513 } 2514 2515 if (grant_restart) 2516 goto restart; 2517 if (demote_restart && !quit) { 2518 quit = 1; 2519 goto restart; 2520 } 2521 2522 return max_t(int, high, hi); 2523 } 2524 2525 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, 2526 unsigned int *count) 2527 { 2528 struct dlm_lkb *lkb, *s; 2529 2530 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 2531 if (can_be_granted(r, lkb, 0, 0, NULL)) { 2532 grant_lock_pending(r, lkb); 2533 if (count) 2534 (*count)++; 2535 } else { 2536 high = max_t(int, lkb->lkb_rqmode, high); 2537 if (lkb->lkb_rqmode == DLM_LOCK_CW) 2538 *cw = 1; 2539 } 2540 } 2541 2542 return high; 2543 } 2544 2545 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked 2546 on either the convert or waiting queue. 2547 high is the largest rqmode of all locks blocked on the convert or 2548 waiting queue. */ 2549 2550 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) 2551 { 2552 if (gr->lkb_grmode == DLM_LOCK_PR && cw) { 2553 if (gr->lkb_highbast < DLM_LOCK_EX) 2554 return 1; 2555 return 0; 2556 } 2557 2558 if (gr->lkb_highbast < high && 2559 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 2560 return 1; 2561 return 0; 2562 } 2563 2564 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) 2565 { 2566 struct dlm_lkb *lkb, *s; 2567 int high = DLM_LOCK_IV; 2568 int cw = 0; 2569 2570 if (!is_master(r)) { 2571 log_print("grant_pending_locks r nodeid %d", r->res_nodeid); 2572 dlm_dump_rsb(r); 2573 return; 2574 } 2575 2576 high = grant_pending_convert(r, high, &cw, count); 2577 high = grant_pending_wait(r, high, &cw, count); 2578 2579 if (high == DLM_LOCK_IV) 2580 return; 2581 2582 /* 2583 * If there are locks left on the wait/convert queue then send blocking 2584 * ASTs to granted locks based on the largest requested mode (high) 2585 * found above. 
2586 */ 2587 2588 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { 2589 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { 2590 if (cw && high == DLM_LOCK_PR && 2591 lkb->lkb_grmode == DLM_LOCK_PR) 2592 queue_bast(r, lkb, DLM_LOCK_CW); 2593 else 2594 queue_bast(r, lkb, high); 2595 lkb->lkb_highbast = high; 2596 } 2597 } 2598 } 2599 2600 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) 2601 { 2602 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || 2603 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { 2604 if (gr->lkb_highbast < DLM_LOCK_EX) 2605 return 1; 2606 return 0; 2607 } 2608 2609 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) 2610 return 1; 2611 return 0; 2612 } 2613 2614 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, 2615 struct dlm_lkb *lkb) 2616 { 2617 struct dlm_lkb *gr; 2618 2619 list_for_each_entry(gr, head, lkb_statequeue) { 2620 /* skip self when sending basts to convertqueue */ 2621 if (gr == lkb) 2622 continue; 2623 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { 2624 queue_bast(r, gr, lkb->lkb_rqmode); 2625 gr->lkb_highbast = lkb->lkb_rqmode; 2626 } 2627 } 2628 } 2629 2630 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb) 2631 { 2632 send_bast_queue(r, &r->res_grantqueue, lkb); 2633 } 2634 2635 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) 2636 { 2637 send_bast_queue(r, &r->res_grantqueue, lkb); 2638 send_bast_queue(r, &r->res_convertqueue, lkb); 2639 } 2640 2641 /* set_master(r, lkb) -- set the master nodeid of a resource 2642 2643 The purpose of this function is to set the nodeid field in the given 2644 lkb using the nodeid field in the given rsb. If the rsb's nodeid is 2645 known, it can just be copied to the lkb and the function will return 2646 0. If the rsb's nodeid is _not_ known, it needs to be looked up 2647 before it can be copied to the lkb. 2648 2649 When the rsb nodeid is being looked up remotely, the initial lkb 2650 causing the lookup is kept on the ls_waiters list waiting for the 2651 lookup reply. Other lkb's waiting for the same rsb lookup are kept 2652 on the rsb's res_lookup list until the master is verified. 2653 2654 Return values: 2655 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 2656 1: the rsb master is not available and the lkb has been placed on 2657 a wait queue 2658 */ 2659 2660 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2661 { 2662 int our_nodeid = dlm_our_nodeid(); 2663 2664 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2665 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2666 r->res_first_lkid = lkb->lkb_id; 2667 lkb->lkb_nodeid = r->res_nodeid; 2668 return 0; 2669 } 2670 2671 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { 2672 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); 2673 return 1; 2674 } 2675 2676 if (r->res_master_nodeid == our_nodeid) { 2677 lkb->lkb_nodeid = 0; 2678 return 0; 2679 } 2680 2681 if (r->res_master_nodeid) { 2682 lkb->lkb_nodeid = r->res_master_nodeid; 2683 return 0; 2684 } 2685 2686 if (dlm_dir_nodeid(r) == our_nodeid) { 2687 /* This is a somewhat unusual case; find_rsb will usually 2688 have set res_master_nodeid when dir nodeid is local, but 2689 there are cases where we become the dir node after we've 2690 past find_rsb and go through _request_lock again. 2691 confirm_master() or process_lookup_list() needs to be 2692 called after this. 
*/ 2693 log_debug(r->res_ls, "set_master %x self master %d dir %d %s", 2694 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, 2695 r->res_name); 2696 r->res_master_nodeid = our_nodeid; 2697 r->res_nodeid = 0; 2698 lkb->lkb_nodeid = 0; 2699 return 0; 2700 } 2701 2702 r->res_first_lkid = lkb->lkb_id; 2703 send_lookup(r, lkb); 2704 return 1; 2705 } 2706 2707 static void process_lookup_list(struct dlm_rsb *r) 2708 { 2709 struct dlm_lkb *lkb, *safe; 2710 2711 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 2712 list_del_init(&lkb->lkb_rsb_lookup); 2713 _request_lock(r, lkb); 2714 } 2715 } 2716 2717 /* confirm_master -- confirm (or deny) an rsb's master nodeid */ 2718 2719 static void confirm_master(struct dlm_rsb *r, int error) 2720 { 2721 struct dlm_lkb *lkb; 2722 2723 if (!r->res_first_lkid) 2724 return; 2725 2726 switch (error) { 2727 case 0: 2728 case -EINPROGRESS: 2729 r->res_first_lkid = 0; 2730 process_lookup_list(r); 2731 break; 2732 2733 case -EAGAIN: 2734 case -EBADR: 2735 case -ENOTBLK: 2736 /* the remote request failed and won't be retried (it was 2737 a NOQUEUE, or has been canceled/unlocked); make a waiting 2738 lkb the first_lkid */ 2739 2740 r->res_first_lkid = 0; 2741 2742 if (!list_empty(&r->res_lookup)) { 2743 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 2744 lkb_rsb_lookup); 2745 list_del_init(&lkb->lkb_rsb_lookup); 2746 r->res_first_lkid = lkb->lkb_id; 2747 _request_lock(r, lkb); 2748 } 2749 break; 2750 2751 default: 2752 log_error(r->res_ls, "confirm_master unknown error %d", error); 2753 } 2754 } 2755 2756 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 2757 int namelen, void (*ast)(void *astparam), 2758 void *astparam, 2759 void (*bast)(void *astparam, int mode), 2760 struct dlm_args *args) 2761 { 2762 int rv = -EINVAL; 2763 2764 /* check for invalid arg usage */ 2765 2766 if (mode < 0 || mode > DLM_LOCK_EX) 2767 goto out; 2768 2769 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 2770 goto out; 2771 2772 if (flags & DLM_LKF_CANCEL) 2773 goto out; 2774 2775 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 2776 goto out; 2777 2778 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 2779 goto out; 2780 2781 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 2782 goto out; 2783 2784 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 2785 goto out; 2786 2787 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 2788 goto out; 2789 2790 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 2791 goto out; 2792 2793 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 2794 goto out; 2795 2796 if (!ast || !lksb) 2797 goto out; 2798 2799 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 2800 goto out; 2801 2802 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 2803 goto out; 2804 2805 /* these args will be copied to the lkb in validate_lock_args, 2806 it cannot be done now because when converting locks, fields in 2807 an active lkb cannot be modified before locking the rsb */ 2808 2809 args->flags = flags; 2810 args->astfn = ast; 2811 args->astparam = astparam; 2812 args->bastfn = bast; 2813 args->mode = mode; 2814 args->lksb = lksb; 2815 rv = 0; 2816 out: 2817 return rv; 2818 } 2819 2820 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 2821 { 2822 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK | 2823 DLM_LKF_FORCEUNLOCK)) 2824 return -EINVAL; 2825 2826 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) 2827 return 
-EINVAL; 2828 2829 args->flags = flags; 2830 args->astparam = astarg; 2831 return 0; 2832 } 2833 2834 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2835 struct dlm_args *args) 2836 { 2837 int rv = -EBUSY; 2838 2839 if (args->flags & DLM_LKF_CONVERT) { 2840 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2841 goto out; 2842 2843 /* lock not allowed if there's any op in progress */ 2844 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 2845 goto out; 2846 2847 if (is_overlap(lkb)) 2848 goto out; 2849 2850 rv = -EINVAL; 2851 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) 2852 goto out; 2853 2854 if (args->flags & DLM_LKF_QUECVT && 2855 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) 2856 goto out; 2857 } 2858 2859 lkb->lkb_exflags = args->flags; 2860 dlm_set_sbflags_val(lkb, 0); 2861 lkb->lkb_astfn = args->astfn; 2862 lkb->lkb_astparam = args->astparam; 2863 lkb->lkb_bastfn = args->bastfn; 2864 lkb->lkb_rqmode = args->mode; 2865 lkb->lkb_lksb = args->lksb; 2866 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 2867 lkb->lkb_ownpid = (int) current->pid; 2868 rv = 0; 2869 out: 2870 switch (rv) { 2871 case 0: 2872 break; 2873 case -EINVAL: 2874 /* annoy the user because dlm usage is wrong */ 2875 WARN_ON(1); 2876 log_error(ls, "%s %d %x %x %x %d %d %s", __func__, 2877 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, 2878 lkb->lkb_status, lkb->lkb_wait_type, 2879 lkb->lkb_resource->res_name); 2880 break; 2881 default: 2882 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__, 2883 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags, 2884 lkb->lkb_status, lkb->lkb_wait_type, 2885 lkb->lkb_resource->res_name); 2886 break; 2887 } 2888 2889 return rv; 2890 } 2891 2892 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 2893 for success */ 2894 2895 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here 2896 because there may be a lookup in progress and it's valid to do 2897 cancel/unlockf on it */ 2898 2899 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 2900 { 2901 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 2902 int rv = -EBUSY; 2903 2904 /* normal unlock not allowed if there's any op in progress */ 2905 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) && 2906 (lkb->lkb_wait_type || lkb->lkb_wait_count)) 2907 goto out; 2908 2909 /* an lkb may be waiting for an rsb lookup to complete where the 2910 lookup was initiated by another lock */ 2911 2912 if (!list_empty(&lkb->lkb_rsb_lookup)) { 2913 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { 2914 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); 2915 list_del_init(&lkb->lkb_rsb_lookup); 2916 queue_cast(lkb->lkb_resource, lkb, 2917 args->flags & DLM_LKF_CANCEL ? 
2918 -DLM_ECANCEL : -DLM_EUNLOCK); 2919 unhold_lkb(lkb); /* undoes create_lkb() */ 2920 } 2921 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ 2922 goto out; 2923 } 2924 2925 rv = -EINVAL; 2926 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) { 2927 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); 2928 dlm_print_lkb(lkb); 2929 goto out; 2930 } 2931 2932 /* an lkb may still exist even though the lock is EOL'ed due to a 2933 * cancel, unlock or failed noqueue request; an app can't use these 2934 * locks; return same error as if the lkid had not been found at all 2935 */ 2936 2937 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) { 2938 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); 2939 rv = -ENOENT; 2940 goto out; 2941 } 2942 2943 /* cancel not allowed with another cancel/unlock in progress */ 2944 2945 if (args->flags & DLM_LKF_CANCEL) { 2946 if (lkb->lkb_exflags & DLM_LKF_CANCEL) 2947 goto out; 2948 2949 if (is_overlap(lkb)) 2950 goto out; 2951 2952 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { 2953 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 2954 rv = -EBUSY; 2955 goto out; 2956 } 2957 2958 /* there's nothing to cancel */ 2959 if (lkb->lkb_status == DLM_LKSTS_GRANTED && 2960 !lkb->lkb_wait_type) { 2961 rv = -EBUSY; 2962 goto out; 2963 } 2964 2965 switch (lkb->lkb_wait_type) { 2966 case DLM_MSG_LOOKUP: 2967 case DLM_MSG_REQUEST: 2968 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 2969 rv = -EBUSY; 2970 goto out; 2971 case DLM_MSG_UNLOCK: 2972 case DLM_MSG_CANCEL: 2973 goto out; 2974 } 2975 /* add_to_waiters() will set OVERLAP_CANCEL */ 2976 goto out_ok; 2977 } 2978 2979 /* do we need to allow a force-unlock if there's a normal unlock 2980 already in progress? in what conditions could the normal unlock 2981 fail such that we'd want to send a force-unlock to be sure? */ 2982 2983 if (args->flags & DLM_LKF_FORCEUNLOCK) { 2984 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) 2985 goto out; 2986 2987 if (is_overlap_unlock(lkb)) 2988 goto out; 2989 2990 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) { 2991 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 2992 rv = -EBUSY; 2993 goto out; 2994 } 2995 2996 switch (lkb->lkb_wait_type) { 2997 case DLM_MSG_LOOKUP: 2998 case DLM_MSG_REQUEST: 2999 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 3000 rv = -EBUSY; 3001 goto out; 3002 case DLM_MSG_UNLOCK: 3003 goto out; 3004 } 3005 /* add_to_waiters() will set OVERLAP_UNLOCK */ 3006 } 3007 3008 out_ok: 3009 /* an overlapping op shouldn't blow away exflags from other op */ 3010 lkb->lkb_exflags |= args->flags; 3011 dlm_set_sbflags_val(lkb, 0); 3012 lkb->lkb_astparam = args->astparam; 3013 rv = 0; 3014 out: 3015 switch (rv) { 3016 case 0: 3017 break; 3018 case -EINVAL: 3019 /* annoy the user because dlm usage is wrong */ 3020 WARN_ON(1); 3021 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv, 3022 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags, 3023 args->flags, lkb->lkb_wait_type, 3024 lkb->lkb_resource->res_name); 3025 break; 3026 default: 3027 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv, 3028 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags, 3029 args->flags, lkb->lkb_wait_type, 3030 lkb->lkb_resource->res_name); 3031 break; 3032 } 3033 3034 return rv; 3035 } 3036 3037 /* 3038 * Four stage 4 varieties: 3039 * do_request(), do_convert(), do_unlock(), do_cancel() 3040 * These are called on the master node for the given lock and 3041 * from the central locking logic. 
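 *
 * Their return values drive both the reply sent back to the requesting
 * node and the do_xxxx_effects() calls that follow: 0 means the
 * operation completed (the lock was granted), -EINPROGRESS means the
 * lkb was queued, -EAGAIN means it could not be granted or queued
 * (do_convert() can also return -EDEADLK), and do_unlock()/do_cancel()
 * report success as -DLM_EUNLOCK and -DLM_ECANCEL.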
3042 */ 3043 3044 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3045 { 3046 int error = 0; 3047 3048 if (can_be_granted(r, lkb, 1, 0, NULL)) { 3049 grant_lock(r, lkb); 3050 queue_cast(r, lkb, 0); 3051 goto out; 3052 } 3053 3054 if (can_be_queued(lkb)) { 3055 error = -EINPROGRESS; 3056 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3057 goto out; 3058 } 3059 3060 error = -EAGAIN; 3061 queue_cast(r, lkb, -EAGAIN); 3062 out: 3063 return error; 3064 } 3065 3066 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3067 int error) 3068 { 3069 switch (error) { 3070 case -EAGAIN: 3071 if (force_blocking_asts(lkb)) 3072 send_blocking_asts_all(r, lkb); 3073 break; 3074 case -EINPROGRESS: 3075 send_blocking_asts(r, lkb); 3076 break; 3077 } 3078 } 3079 3080 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3081 { 3082 int error = 0; 3083 int deadlk = 0; 3084 3085 /* changing an existing lock may allow others to be granted */ 3086 3087 if (can_be_granted(r, lkb, 1, 0, &deadlk)) { 3088 grant_lock(r, lkb); 3089 queue_cast(r, lkb, 0); 3090 goto out; 3091 } 3092 3093 /* can_be_granted() detected that this lock would block in a conversion 3094 deadlock, so we leave it on the granted queue and return EDEADLK in 3095 the ast for the convert. */ 3096 3097 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 3098 /* it's left on the granted queue */ 3099 revert_lock(r, lkb); 3100 queue_cast(r, lkb, -EDEADLK); 3101 error = -EDEADLK; 3102 goto out; 3103 } 3104 3105 /* is_demoted() means the can_be_granted() above set the grmode 3106 to NL, and left us on the granted queue. This auto-demotion 3107 (due to CONVDEADLK) might mean other locks, and/or this lock, are 3108 now grantable. We have to try to grant other converting locks 3109 before we try again to grant this one. 
*/ 3110 3111 if (is_demoted(lkb)) { 3112 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); 3113 if (_can_be_granted(r, lkb, 1, 0)) { 3114 grant_lock(r, lkb); 3115 queue_cast(r, lkb, 0); 3116 goto out; 3117 } 3118 /* else fall through and move to convert queue */ 3119 } 3120 3121 if (can_be_queued(lkb)) { 3122 error = -EINPROGRESS; 3123 del_lkb(r, lkb); 3124 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3125 goto out; 3126 } 3127 3128 error = -EAGAIN; 3129 queue_cast(r, lkb, -EAGAIN); 3130 out: 3131 return error; 3132 } 3133 3134 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3135 int error) 3136 { 3137 switch (error) { 3138 case 0: 3139 grant_pending_locks(r, NULL); 3140 /* grant_pending_locks also sends basts */ 3141 break; 3142 case -EAGAIN: 3143 if (force_blocking_asts(lkb)) 3144 send_blocking_asts_all(r, lkb); 3145 break; 3146 case -EINPROGRESS: 3147 send_blocking_asts(r, lkb); 3148 break; 3149 } 3150 } 3151 3152 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3153 { 3154 remove_lock(r, lkb); 3155 queue_cast(r, lkb, -DLM_EUNLOCK); 3156 return -DLM_EUNLOCK; 3157 } 3158 3159 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3160 int error) 3161 { 3162 grant_pending_locks(r, NULL); 3163 } 3164 3165 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 3166 3167 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3168 { 3169 int error; 3170 3171 error = revert_lock(r, lkb); 3172 if (error) { 3173 queue_cast(r, lkb, -DLM_ECANCEL); 3174 return -DLM_ECANCEL; 3175 } 3176 return 0; 3177 } 3178 3179 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3180 int error) 3181 { 3182 if (error) 3183 grant_pending_locks(r, NULL); 3184 } 3185 3186 /* 3187 * Four stage 3 varieties: 3188 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 3189 */ 3190 3191 /* add a new lkb to a possibly new rsb, called by requesting process */ 3192 3193 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3194 { 3195 int error; 3196 3197 /* set_master: sets lkb nodeid from r */ 3198 3199 error = set_master(r, lkb); 3200 if (error < 0) 3201 goto out; 3202 if (error) { 3203 error = 0; 3204 goto out; 3205 } 3206 3207 if (is_remote(r)) { 3208 /* receive_request() calls do_request() on remote node */ 3209 error = send_request(r, lkb); 3210 } else { 3211 error = do_request(r, lkb); 3212 /* for remote locks the request_reply is sent 3213 between do_request and do_request_effects */ 3214 do_request_effects(r, lkb, error); 3215 } 3216 out: 3217 return error; 3218 } 3219 3220 /* change some property of an existing lkb, e.g. 
mode */ 3221 3222 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3223 { 3224 int error; 3225 3226 if (is_remote(r)) { 3227 /* receive_convert() calls do_convert() on remote node */ 3228 error = send_convert(r, lkb); 3229 } else { 3230 error = do_convert(r, lkb); 3231 /* for remote locks the convert_reply is sent 3232 between do_convert and do_convert_effects */ 3233 do_convert_effects(r, lkb, error); 3234 } 3235 3236 return error; 3237 } 3238 3239 /* remove an existing lkb from the granted queue */ 3240 3241 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3242 { 3243 int error; 3244 3245 if (is_remote(r)) { 3246 /* receive_unlock() calls do_unlock() on remote node */ 3247 error = send_unlock(r, lkb); 3248 } else { 3249 error = do_unlock(r, lkb); 3250 /* for remote locks the unlock_reply is sent 3251 between do_unlock and do_unlock_effects */ 3252 do_unlock_effects(r, lkb, error); 3253 } 3254 3255 return error; 3256 } 3257 3258 /* remove an existing lkb from the convert or wait queue */ 3259 3260 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3261 { 3262 int error; 3263 3264 if (is_remote(r)) { 3265 /* receive_cancel() calls do_cancel() on remote node */ 3266 error = send_cancel(r, lkb); 3267 } else { 3268 error = do_cancel(r, lkb); 3269 /* for remote locks the cancel_reply is sent 3270 between do_cancel and do_cancel_effects */ 3271 do_cancel_effects(r, lkb, error); 3272 } 3273 3274 return error; 3275 } 3276 3277 /* 3278 * Four stage 2 varieties: 3279 * request_lock(), convert_lock(), unlock_lock(), cancel_lock() 3280 */ 3281 3282 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3283 const void *name, int len, 3284 struct dlm_args *args) 3285 { 3286 struct dlm_rsb *r; 3287 int error; 3288 3289 error = validate_lock_args(ls, lkb, args); 3290 if (error) 3291 return error; 3292 3293 error = find_rsb(ls, name, len, 0, R_REQUEST, &r); 3294 if (error) 3295 return error; 3296 3297 lock_rsb(r); 3298 3299 attach_lkb(r, lkb); 3300 lkb->lkb_lksb->sb_lkid = lkb->lkb_id; 3301 3302 error = _request_lock(r, lkb); 3303 3304 unlock_rsb(r); 3305 put_rsb(r); 3306 return error; 3307 } 3308 3309 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3310 struct dlm_args *args) 3311 { 3312 struct dlm_rsb *r; 3313 int error; 3314 3315 r = lkb->lkb_resource; 3316 3317 hold_rsb(r); 3318 lock_rsb(r); 3319 3320 error = validate_lock_args(ls, lkb, args); 3321 if (error) 3322 goto out; 3323 3324 error = _convert_lock(r, lkb); 3325 out: 3326 unlock_rsb(r); 3327 put_rsb(r); 3328 return error; 3329 } 3330 3331 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3332 struct dlm_args *args) 3333 { 3334 struct dlm_rsb *r; 3335 int error; 3336 3337 r = lkb->lkb_resource; 3338 3339 hold_rsb(r); 3340 lock_rsb(r); 3341 3342 error = validate_unlock_args(lkb, args); 3343 if (error) 3344 goto out; 3345 3346 error = _unlock_lock(r, lkb); 3347 out: 3348 unlock_rsb(r); 3349 put_rsb(r); 3350 return error; 3351 } 3352 3353 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3354 struct dlm_args *args) 3355 { 3356 struct dlm_rsb *r; 3357 int error; 3358 3359 r = lkb->lkb_resource; 3360 3361 hold_rsb(r); 3362 lock_rsb(r); 3363 3364 error = validate_unlock_args(lkb, args); 3365 if (error) 3366 goto out; 3367 3368 error = _cancel_lock(r, lkb); 3369 out: 3370 unlock_rsb(r); 3371 put_rsb(r); 3372 return error; 3373 } 3374 3375 /* 3376 * Two stage 1 varieties: dlm_lock() and dlm_unlock() 3377 */ 3378 3379 int dlm_lock(dlm_lockspace_t *lockspace, 
3380 int mode, 3381 struct dlm_lksb *lksb, 3382 uint32_t flags, 3383 const void *name, 3384 unsigned int namelen, 3385 uint32_t parent_lkid, 3386 void (*ast) (void *astarg), 3387 void *astarg, 3388 void (*bast) (void *astarg, int mode)) 3389 { 3390 struct dlm_ls *ls; 3391 struct dlm_lkb *lkb; 3392 struct dlm_args args; 3393 int error, convert = flags & DLM_LKF_CONVERT; 3394 3395 ls = dlm_find_lockspace_local(lockspace); 3396 if (!ls) 3397 return -EINVAL; 3398 3399 dlm_lock_recovery(ls); 3400 3401 if (convert) 3402 error = find_lkb(ls, lksb->sb_lkid, &lkb); 3403 else 3404 error = create_lkb(ls, &lkb); 3405 3406 if (error) 3407 goto out; 3408 3409 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); 3410 3411 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast, 3412 &args); 3413 if (error) 3414 goto out_put; 3415 3416 if (convert) 3417 error = convert_lock(ls, lkb, &args); 3418 else 3419 error = request_lock(ls, lkb, name, namelen, &args); 3420 3421 if (error == -EINPROGRESS) 3422 error = 0; 3423 out_put: 3424 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true); 3425 3426 if (convert || error) 3427 __put_lkb(ls, lkb); 3428 if (error == -EAGAIN || error == -EDEADLK) 3429 error = 0; 3430 out: 3431 dlm_unlock_recovery(ls); 3432 dlm_put_lockspace(ls); 3433 return error; 3434 } 3435 3436 int dlm_unlock(dlm_lockspace_t *lockspace, 3437 uint32_t lkid, 3438 uint32_t flags, 3439 struct dlm_lksb *lksb, 3440 void *astarg) 3441 { 3442 struct dlm_ls *ls; 3443 struct dlm_lkb *lkb; 3444 struct dlm_args args; 3445 int error; 3446 3447 ls = dlm_find_lockspace_local(lockspace); 3448 if (!ls) 3449 return -EINVAL; 3450 3451 dlm_lock_recovery(ls); 3452 3453 error = find_lkb(ls, lkid, &lkb); 3454 if (error) 3455 goto out; 3456 3457 trace_dlm_unlock_start(ls, lkb, flags); 3458 3459 error = set_unlock_args(flags, astarg, &args); 3460 if (error) 3461 goto out_put; 3462 3463 if (flags & DLM_LKF_CANCEL) 3464 error = cancel_lock(ls, lkb, &args); 3465 else 3466 error = unlock_lock(ls, lkb, &args); 3467 3468 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 3469 error = 0; 3470 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) 3471 error = 0; 3472 out_put: 3473 trace_dlm_unlock_end(ls, lkb, flags, error); 3474 3475 dlm_put_lkb(lkb); 3476 out: 3477 dlm_unlock_recovery(ls); 3478 dlm_put_lockspace(ls); 3479 return error; 3480 } 3481 3482 /* 3483 * send/receive routines for remote operations and replies 3484 * 3485 * send_args 3486 * send_common 3487 * send_request receive_request 3488 * send_convert receive_convert 3489 * send_unlock receive_unlock 3490 * send_cancel receive_cancel 3491 * send_grant receive_grant 3492 * send_bast receive_bast 3493 * send_lookup receive_lookup 3494 * send_remove receive_remove 3495 * 3496 * send_common_reply 3497 * receive_request_reply send_request_reply 3498 * receive_convert_reply send_convert_reply 3499 * receive_unlock_reply send_unlock_reply 3500 * receive_cancel_reply send_cancel_reply 3501 * receive_lookup_reply send_lookup_reply 3502 */ 3503 3504 static int _create_message(struct dlm_ls *ls, int mb_len, 3505 int to_nodeid, int mstype, 3506 struct dlm_message **ms_ret, 3507 struct dlm_mhandle **mh_ret) 3508 { 3509 struct dlm_message *ms; 3510 struct dlm_mhandle *mh; 3511 char *mb; 3512 3513 /* get_buffer gives us a message handle (mh) that we need to 3514 pass into midcomms_commit and a message buffer (mb) that we 3515 write our data into */ 3516 3517 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb); 3518 if (!mh) 
3519 return -ENOBUFS; 3520 3521 ms = (struct dlm_message *) mb; 3522 3523 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 3524 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id); 3525 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid()); 3526 ms->m_header.h_length = cpu_to_le16(mb_len); 3527 ms->m_header.h_cmd = DLM_MSG; 3528 3529 ms->m_type = cpu_to_le32(mstype); 3530 3531 *mh_ret = mh; 3532 *ms_ret = ms; 3533 return 0; 3534 } 3535 3536 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 3537 int to_nodeid, int mstype, 3538 struct dlm_message **ms_ret, 3539 struct dlm_mhandle **mh_ret) 3540 { 3541 int mb_len = sizeof(struct dlm_message); 3542 3543 switch (mstype) { 3544 case DLM_MSG_REQUEST: 3545 case DLM_MSG_LOOKUP: 3546 case DLM_MSG_REMOVE: 3547 mb_len += r->res_length; 3548 break; 3549 case DLM_MSG_CONVERT: 3550 case DLM_MSG_UNLOCK: 3551 case DLM_MSG_REQUEST_REPLY: 3552 case DLM_MSG_CONVERT_REPLY: 3553 case DLM_MSG_GRANT: 3554 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK)) 3555 mb_len += r->res_ls->ls_lvblen; 3556 break; 3557 } 3558 3559 return _create_message(r->res_ls, mb_len, to_nodeid, mstype, 3560 ms_ret, mh_ret); 3561 } 3562 3563 /* further lowcomms enhancements or alternate implementations may make 3564 the return value from this function useful at some point */ 3565 3566 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms, 3567 const void *name, int namelen) 3568 { 3569 dlm_midcomms_commit_mhandle(mh, name, namelen); 3570 return 0; 3571 } 3572 3573 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 3574 struct dlm_message *ms) 3575 { 3576 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid); 3577 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid); 3578 ms->m_lkid = cpu_to_le32(lkb->lkb_id); 3579 ms->m_remid = cpu_to_le32(lkb->lkb_remid); 3580 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags); 3581 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb)); 3582 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb)); 3583 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq); 3584 ms->m_status = cpu_to_le32(lkb->lkb_status); 3585 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode); 3586 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode); 3587 ms->m_hash = cpu_to_le32(r->res_hash); 3588 3589 /* m_result and m_bastmode are set from function args, 3590 not from lkb fields */ 3591 3592 if (lkb->lkb_bastfn) 3593 ms->m_asts |= cpu_to_le32(DLM_CB_BAST); 3594 if (lkb->lkb_astfn) 3595 ms->m_asts |= cpu_to_le32(DLM_CB_CAST); 3596 3597 /* compare with switch in create_message; send_remove() doesn't 3598 use send_args() */ 3599 3600 switch (ms->m_type) { 3601 case cpu_to_le32(DLM_MSG_REQUEST): 3602 case cpu_to_le32(DLM_MSG_LOOKUP): 3603 memcpy(ms->m_extra, r->res_name, r->res_length); 3604 break; 3605 case cpu_to_le32(DLM_MSG_CONVERT): 3606 case cpu_to_le32(DLM_MSG_UNLOCK): 3607 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 3608 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 3609 case cpu_to_le32(DLM_MSG_GRANT): 3610 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK)) 3611 break; 3612 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 3613 break; 3614 } 3615 } 3616 3617 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 3618 { 3619 struct dlm_message *ms; 3620 struct dlm_mhandle *mh; 3621 int to_nodeid, error; 3622 3623 to_nodeid = r->res_nodeid; 3624 3625 error = add_to_waiters(lkb, mstype, to_nodeid); 3626 if (error) 3627 return error; 3628 3629 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3630 if (error) 
3631 goto fail; 3632 3633 send_args(r, lkb, ms); 3634 3635 error = send_message(mh, ms, r->res_name, r->res_length); 3636 if (error) 3637 goto fail; 3638 return 0; 3639 3640 fail: 3641 remove_from_waiters(lkb, msg_reply_type(mstype)); 3642 return error; 3643 } 3644 3645 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3646 { 3647 return send_common(r, lkb, DLM_MSG_REQUEST); 3648 } 3649 3650 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3651 { 3652 int error; 3653 3654 error = send_common(r, lkb, DLM_MSG_CONVERT); 3655 3656 /* down conversions go without a reply from the master */ 3657 if (!error && down_conversion(lkb)) { 3658 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 3659 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 3660 r->res_ls->ls_local_ms.m_result = 0; 3661 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true); 3662 } 3663 3664 return error; 3665 } 3666 3667 /* FIXME: if this lkb is the only lock we hold on the rsb, then set 3668 MASTER_UNCERTAIN to force the next request on the rsb to confirm 3669 that the master is still correct. */ 3670 3671 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3672 { 3673 return send_common(r, lkb, DLM_MSG_UNLOCK); 3674 } 3675 3676 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3677 { 3678 return send_common(r, lkb, DLM_MSG_CANCEL); 3679 } 3680 3681 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 3682 { 3683 struct dlm_message *ms; 3684 struct dlm_mhandle *mh; 3685 int to_nodeid, error; 3686 3687 to_nodeid = lkb->lkb_nodeid; 3688 3689 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); 3690 if (error) 3691 goto out; 3692 3693 send_args(r, lkb, ms); 3694 3695 ms->m_result = 0; 3696 3697 error = send_message(mh, ms, r->res_name, r->res_length); 3698 out: 3699 return error; 3700 } 3701 3702 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 3703 { 3704 struct dlm_message *ms; 3705 struct dlm_mhandle *mh; 3706 int to_nodeid, error; 3707 3708 to_nodeid = lkb->lkb_nodeid; 3709 3710 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); 3711 if (error) 3712 goto out; 3713 3714 send_args(r, lkb, ms); 3715 3716 ms->m_bastmode = cpu_to_le32(mode); 3717 3718 error = send_message(mh, ms, r->res_name, r->res_length); 3719 out: 3720 return error; 3721 } 3722 3723 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 3724 { 3725 struct dlm_message *ms; 3726 struct dlm_mhandle *mh; 3727 int to_nodeid, error; 3728 3729 to_nodeid = dlm_dir_nodeid(r); 3730 3731 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); 3732 if (error) 3733 return error; 3734 3735 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 3736 if (error) 3737 goto fail; 3738 3739 send_args(r, lkb, ms); 3740 3741 error = send_message(mh, ms, r->res_name, r->res_length); 3742 if (error) 3743 goto fail; 3744 return 0; 3745 3746 fail: 3747 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 3748 return error; 3749 } 3750 3751 static int send_remove(struct dlm_rsb *r) 3752 { 3753 struct dlm_message *ms; 3754 struct dlm_mhandle *mh; 3755 int to_nodeid, error; 3756 3757 to_nodeid = dlm_dir_nodeid(r); 3758 3759 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); 3760 if (error) 3761 goto out; 3762 3763 memcpy(ms->m_extra, r->res_name, r->res_length); 3764 ms->m_hash = cpu_to_le32(r->res_hash); 3765 3766 error = send_message(mh, ms, r->res_name, r->res_length); 3767 out: 3768 return error; 3769 } 3770 
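/*
 * Illustrative sketch only, not part of the DLM itself: one way a
 * kernel caller might drive the dlm_lock()/dlm_unlock() entry points
 * defined above, assuming <linux/dlm.h> and <linux/completion.h> and a
 * lockspace handle obtained elsewhere (e.g. from dlm_new_lockspace()).
 * The resource name, flags and error handling are deliberately
 * minimal.  The ast fires once the operation completes and the result
 * (0, -EAGAIN, -EDEADLK, -DLM_EUNLOCK, ...) is found in sb_status.
 *
 *	static struct dlm_lksb ex_lksb;
 *	static DECLARE_COMPLETION(ex_done);
 *
 *	static void ex_ast(void *arg)
 *	{
 *		complete(&ex_done);
 *	}
 *
 *	static int ex_lock_ex(dlm_lockspace_t *ls)
 *	{
 *		int error;
 *
 *		error = dlm_lock(ls, DLM_LOCK_EX, &ex_lksb, 0,
 *				 "ex_resource", strlen("ex_resource"), 0,
 *				 ex_ast, NULL, NULL);
 *		if (error)
 *			return error;
 *		wait_for_completion(&ex_done);
 *		return ex_lksb.sb_status;
 *	}
 *
 *	static int ex_unlock(dlm_lockspace_t *ls)
 *	{
 *		int error;
 *
 *		reinit_completion(&ex_done);
 *		error = dlm_unlock(ls, ex_lksb.sb_lkid, 0, &ex_lksb, NULL);
 *		if (error)
 *			return error;
 *		wait_for_completion(&ex_done);
 *		return ex_lksb.sb_status;
 *	}
 */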
3771 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3772 int mstype, int rv) 3773 { 3774 struct dlm_message *ms; 3775 struct dlm_mhandle *mh; 3776 int to_nodeid, error; 3777 3778 to_nodeid = lkb->lkb_nodeid; 3779 3780 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3781 if (error) 3782 goto out; 3783 3784 send_args(r, lkb, ms); 3785 3786 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3787 3788 error = send_message(mh, ms, r->res_name, r->res_length); 3789 out: 3790 return error; 3791 } 3792 3793 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3794 { 3795 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv); 3796 } 3797 3798 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3799 { 3800 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 3801 } 3802 3803 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3804 { 3805 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 3806 } 3807 3808 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3809 { 3810 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 3811 } 3812 3813 static int send_lookup_reply(struct dlm_ls *ls, 3814 const struct dlm_message *ms_in, int ret_nodeid, 3815 int rv) 3816 { 3817 struct dlm_rsb *r = &ls->ls_local_rsb; 3818 struct dlm_message *ms; 3819 struct dlm_mhandle *mh; 3820 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid); 3821 3822 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); 3823 if (error) 3824 goto out; 3825 3826 ms->m_lkid = ms_in->m_lkid; 3827 ms->m_result = cpu_to_le32(to_dlm_errno(rv)); 3828 ms->m_nodeid = cpu_to_le32(ret_nodeid); 3829 3830 error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in)); 3831 out: 3832 return error; 3833 } 3834 3835 /* which args we save from a received message depends heavily on the type 3836 of message, unlike the send side where we can safely send everything about 3837 the lkb for any type of message */ 3838 3839 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms) 3840 { 3841 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags); 3842 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); 3843 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); 3844 } 3845 3846 static void receive_flags_reply(struct dlm_lkb *lkb, 3847 const struct dlm_message *ms, 3848 bool local) 3849 { 3850 if (local) 3851 return; 3852 3853 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags)); 3854 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags)); 3855 } 3856 3857 static int receive_extralen(const struct dlm_message *ms) 3858 { 3859 return (le16_to_cpu(ms->m_header.h_length) - 3860 sizeof(struct dlm_message)); 3861 } 3862 3863 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 3864 const struct dlm_message *ms) 3865 { 3866 int len; 3867 3868 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3869 if (!lkb->lkb_lvbptr) 3870 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3871 if (!lkb->lkb_lvbptr) 3872 return -ENOMEM; 3873 len = receive_extralen(ms); 3874 if (len > ls->ls_lvblen) 3875 len = ls->ls_lvblen; 3876 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 3877 } 3878 return 0; 3879 } 3880 3881 static void fake_bastfn(void *astparam, int mode) 3882 { 3883 log_print("fake_bastfn should not be called"); 3884 } 3885 3886 static void fake_astfn(void *astparam) 3887 { 3888 log_print("fake_astfn should not be called"); 3889 } 3890 3891 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb 
*lkb, 3892 const struct dlm_message *ms) 3893 { 3894 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3895 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid); 3896 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 3897 lkb->lkb_grmode = DLM_LOCK_IV; 3898 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3899 3900 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL; 3901 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL; 3902 3903 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3904 /* lkb was just created so there won't be an lvb yet */ 3905 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3906 if (!lkb->lkb_lvbptr) 3907 return -ENOMEM; 3908 } 3909 3910 return 0; 3911 } 3912 3913 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3914 const struct dlm_message *ms) 3915 { 3916 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3917 return -EBUSY; 3918 3919 if (receive_lvb(ls, lkb, ms)) 3920 return -ENOMEM; 3921 3922 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode); 3923 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq); 3924 3925 return 0; 3926 } 3927 3928 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3929 const struct dlm_message *ms) 3930 { 3931 if (receive_lvb(ls, lkb, ms)) 3932 return -ENOMEM; 3933 return 0; 3934 } 3935 3936 /* We fill in the local-lkb fields with the info that send_xxxx_reply() 3937 uses to send a reply and that the remote end uses to process the reply. */ 3938 3939 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms) 3940 { 3941 struct dlm_lkb *lkb = &ls->ls_local_lkb; 3942 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 3943 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 3944 } 3945 3946 /* This is called after the rsb is locked so that we can safely inspect 3947 fields in the lkb. 
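   Besides rejecting a mix of user and kernel locks, the checks below
   verify that the sender is the node this lkb actually points at:
   master copies only accept convert/unlock/cancel from the lock's
   owner, and process copies only accept replies, grants and basts from
   their recorded master (a request reply is also allowed while the
   master is still unknown, i.e. lkb_nodeid is -1).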
*/ 3948 3949 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms) 3950 { 3951 int from = le32_to_cpu(ms->m_header.h_nodeid); 3952 int error = 0; 3953 3954 /* currently mixing of user/kernel locks is not supported */ 3955 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) && 3956 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) { 3957 log_error(lkb->lkb_resource->res_ls, 3958 "got user dlm message for a kernel lock"); 3959 error = -EINVAL; 3960 goto out; 3961 } 3962 3963 switch (ms->m_type) { 3964 case cpu_to_le32(DLM_MSG_CONVERT): 3965 case cpu_to_le32(DLM_MSG_UNLOCK): 3966 case cpu_to_le32(DLM_MSG_CANCEL): 3967 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) 3968 error = -EINVAL; 3969 break; 3970 3971 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 3972 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): 3973 case cpu_to_le32(DLM_MSG_CANCEL_REPLY): 3974 case cpu_to_le32(DLM_MSG_GRANT): 3975 case cpu_to_le32(DLM_MSG_BAST): 3976 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) 3977 error = -EINVAL; 3978 break; 3979 3980 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 3981 if (!is_process_copy(lkb)) 3982 error = -EINVAL; 3983 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) 3984 error = -EINVAL; 3985 break; 3986 3987 default: 3988 error = -EINVAL; 3989 } 3990 3991 out: 3992 if (error) 3993 log_error(lkb->lkb_resource->res_ls, 3994 "ignore invalid message %d from %d %x %x %x %d", 3995 le32_to_cpu(ms->m_type), from, lkb->lkb_id, 3996 lkb->lkb_remid, dlm_iflags_val(lkb), 3997 lkb->lkb_nodeid); 3998 return error; 3999 } 4000 4001 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms) 4002 { 4003 struct dlm_lkb *lkb; 4004 struct dlm_rsb *r; 4005 int from_nodeid; 4006 int error, namelen = 0; 4007 4008 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4009 4010 error = create_lkb(ls, &lkb); 4011 if (error) 4012 goto fail; 4013 4014 receive_flags(lkb, ms); 4015 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 4016 error = receive_request_args(ls, lkb, ms); 4017 if (error) { 4018 __put_lkb(ls, lkb); 4019 goto fail; 4020 } 4021 4022 /* The dir node is the authority on whether we are the master 4023 for this rsb or not, so if the master sends us a request, we should 4024 recreate the rsb if we've destroyed it. This race happens when we 4025 send a remove message to the dir node at the same time that the dir 4026 node sends us a request for the rsb. */ 4027 4028 namelen = receive_extralen(ms); 4029 4030 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid, 4031 R_RECEIVE_REQUEST, &r); 4032 if (error) { 4033 __put_lkb(ls, lkb); 4034 goto fail; 4035 } 4036 4037 lock_rsb(r); 4038 4039 if (r->res_master_nodeid != dlm_our_nodeid()) { 4040 error = validate_master_nodeid(ls, r, from_nodeid); 4041 if (error) { 4042 unlock_rsb(r); 4043 put_rsb(r); 4044 __put_lkb(ls, lkb); 4045 goto fail; 4046 } 4047 } 4048 4049 attach_lkb(r, lkb); 4050 error = do_request(r, lkb); 4051 send_request_reply(r, lkb, error); 4052 do_request_effects(r, lkb, error); 4053 4054 unlock_rsb(r); 4055 put_rsb(r); 4056 4057 if (error == -EINPROGRESS) 4058 error = 0; 4059 if (error) 4060 dlm_put_lkb(lkb); 4061 return 0; 4062 4063 fail: 4064 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup 4065 and do this receive_request again from process_lookup_list once 4066 we get the lookup reply. This would avoid many repeated 4067 ENOTBLK request failures when the lookup reply designating us 4068 as master is delayed.
*/ 4069 4070 if (error != -ENOTBLK) { 4071 log_limit(ls, "receive_request %x from %d %d", 4072 le32_to_cpu(ms->m_lkid), from_nodeid, error); 4073 } 4074 4075 setup_local_lkb(ls, ms); 4076 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); 4077 return error; 4078 } 4079 4080 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms) 4081 { 4082 struct dlm_lkb *lkb; 4083 struct dlm_rsb *r; 4084 int error, reply = 1; 4085 4086 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4087 if (error) 4088 goto fail; 4089 4090 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 4091 log_error(ls, "receive_convert %x remid %x recover_seq %llu " 4092 "remote %d %x", lkb->lkb_id, lkb->lkb_remid, 4093 (unsigned long long)lkb->lkb_recover_seq, 4094 le32_to_cpu(ms->m_header.h_nodeid), 4095 le32_to_cpu(ms->m_lkid)); 4096 error = -ENOENT; 4097 dlm_put_lkb(lkb); 4098 goto fail; 4099 } 4100 4101 r = lkb->lkb_resource; 4102 4103 hold_rsb(r); 4104 lock_rsb(r); 4105 4106 error = validate_message(lkb, ms); 4107 if (error) 4108 goto out; 4109 4110 receive_flags(lkb, ms); 4111 4112 error = receive_convert_args(ls, lkb, ms); 4113 if (error) { 4114 send_convert_reply(r, lkb, error); 4115 goto out; 4116 } 4117 4118 reply = !down_conversion(lkb); 4119 4120 error = do_convert(r, lkb); 4121 if (reply) 4122 send_convert_reply(r, lkb, error); 4123 do_convert_effects(r, lkb, error); 4124 out: 4125 unlock_rsb(r); 4126 put_rsb(r); 4127 dlm_put_lkb(lkb); 4128 return 0; 4129 4130 fail: 4131 setup_local_lkb(ls, ms); 4132 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); 4133 return error; 4134 } 4135 4136 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms) 4137 { 4138 struct dlm_lkb *lkb; 4139 struct dlm_rsb *r; 4140 int error; 4141 4142 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4143 if (error) 4144 goto fail; 4145 4146 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) { 4147 log_error(ls, "receive_unlock %x remid %x remote %d %x", 4148 lkb->lkb_id, lkb->lkb_remid, 4149 le32_to_cpu(ms->m_header.h_nodeid), 4150 le32_to_cpu(ms->m_lkid)); 4151 error = -ENOENT; 4152 dlm_put_lkb(lkb); 4153 goto fail; 4154 } 4155 4156 r = lkb->lkb_resource; 4157 4158 hold_rsb(r); 4159 lock_rsb(r); 4160 4161 error = validate_message(lkb, ms); 4162 if (error) 4163 goto out; 4164 4165 receive_flags(lkb, ms); 4166 4167 error = receive_unlock_args(ls, lkb, ms); 4168 if (error) { 4169 send_unlock_reply(r, lkb, error); 4170 goto out; 4171 } 4172 4173 error = do_unlock(r, lkb); 4174 send_unlock_reply(r, lkb, error); 4175 do_unlock_effects(r, lkb, error); 4176 out: 4177 unlock_rsb(r); 4178 put_rsb(r); 4179 dlm_put_lkb(lkb); 4180 return 0; 4181 4182 fail: 4183 setup_local_lkb(ls, ms); 4184 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); 4185 return error; 4186 } 4187 4188 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms) 4189 { 4190 struct dlm_lkb *lkb; 4191 struct dlm_rsb *r; 4192 int error; 4193 4194 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4195 if (error) 4196 goto fail; 4197 4198 receive_flags(lkb, ms); 4199 4200 r = lkb->lkb_resource; 4201 4202 hold_rsb(r); 4203 lock_rsb(r); 4204 4205 error = validate_message(lkb, ms); 4206 if (error) 4207 goto out; 4208 4209 error = do_cancel(r, lkb); 4210 send_cancel_reply(r, lkb, error); 4211 do_cancel_effects(r, lkb, error); 4212 out: 4213 unlock_rsb(r); 4214 put_rsb(r); 4215 dlm_put_lkb(lkb); 4216 return 0; 4217 4218 fail: 4219 setup_local_lkb(ls, ms); 4220 
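/* no lkb was found for this message; reply with the error using the lockspace's ls_local_lkb and ls_local_rsb */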
send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error); 4221 return error; 4222 } 4223 4224 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms) 4225 { 4226 struct dlm_lkb *lkb; 4227 struct dlm_rsb *r; 4228 int error; 4229 4230 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4231 if (error) 4232 return error; 4233 4234 r = lkb->lkb_resource; 4235 4236 hold_rsb(r); 4237 lock_rsb(r); 4238 4239 error = validate_message(lkb, ms); 4240 if (error) 4241 goto out; 4242 4243 receive_flags_reply(lkb, ms, false); 4244 if (is_altmode(lkb)) 4245 munge_altmode(lkb, ms); 4246 grant_lock_pc(r, lkb, ms); 4247 queue_cast(r, lkb, 0); 4248 out: 4249 unlock_rsb(r); 4250 put_rsb(r); 4251 dlm_put_lkb(lkb); 4252 return 0; 4253 } 4254 4255 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms) 4256 { 4257 struct dlm_lkb *lkb; 4258 struct dlm_rsb *r; 4259 int error; 4260 4261 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4262 if (error) 4263 return error; 4264 4265 r = lkb->lkb_resource; 4266 4267 hold_rsb(r); 4268 lock_rsb(r); 4269 4270 error = validate_message(lkb, ms); 4271 if (error) 4272 goto out; 4273 4274 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode)); 4275 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode); 4276 out: 4277 unlock_rsb(r); 4278 put_rsb(r); 4279 dlm_put_lkb(lkb); 4280 return 0; 4281 } 4282 4283 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms) 4284 { 4285 int len, error, ret_nodeid, from_nodeid, our_nodeid; 4286 4287 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4288 our_nodeid = dlm_our_nodeid(); 4289 4290 len = receive_extralen(ms); 4291 4292 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, 4293 &ret_nodeid, NULL); 4294 4295 /* Optimization: we're master so treat lookup as a request */ 4296 if (!error && ret_nodeid == our_nodeid) { 4297 receive_request(ls, ms); 4298 return; 4299 } 4300 send_lookup_reply(ls, ms, ret_nodeid, error); 4301 } 4302 4303 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms) 4304 { 4305 char name[DLM_RESNAME_MAXLEN+1]; 4306 struct dlm_rsb *r; 4307 int rv, len, dir_nodeid, from_nodeid; 4308 4309 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4310 4311 len = receive_extralen(ms); 4312 4313 if (len > DLM_RESNAME_MAXLEN) { 4314 log_error(ls, "receive_remove from %d bad len %d", 4315 from_nodeid, len); 4316 return; 4317 } 4318 4319 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash)); 4320 if (dir_nodeid != dlm_our_nodeid()) { 4321 log_error(ls, "receive_remove from %d bad nodeid %d", 4322 from_nodeid, dir_nodeid); 4323 return; 4324 } 4325 4326 /* Look for name in rsb toss state, if it's there, kill it. 4327 * If it's in non toss state, it's being used, and we should ignore this 4328 * message. This is an expected race between the dir node sending a 4329 * request to the master node at the same time as the master node sends 4330 * a remove to the dir node. The resolution to that race is for the 4331 * dir node to ignore the remove message, and the master node to 4332 * recreate the master rsb when it gets a request from the dir node for 4333 * an rsb it doesn't have. 
4334 */ 4335 4336 memset(name, 0, sizeof(name)); 4337 memcpy(name, ms->m_extra, len); 4338 4339 write_lock_bh(&ls->ls_rsbtbl_lock); 4340 4341 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); 4342 if (rv) { 4343 /* should not happen */ 4344 log_error(ls, "%s from %d not found %s", __func__, 4345 from_nodeid, name); 4346 write_unlock_bh(&ls->ls_rsbtbl_lock); 4347 return; 4348 } 4349 4350 if (!rsb_flag(r, RSB_TOSS)) { 4351 if (r->res_master_nodeid != from_nodeid) { 4352 /* should not happen */ 4353 log_error(ls, "receive_remove keep from %d master %d", 4354 from_nodeid, r->res_master_nodeid); 4355 dlm_print_rsb(r); 4356 write_unlock_bh(&ls->ls_rsbtbl_lock); 4357 return; 4358 } 4359 4360 log_debug(ls, "receive_remove from %d master %d first %x %s", 4361 from_nodeid, r->res_master_nodeid, r->res_first_lkid, 4362 name); 4363 write_unlock_bh(&ls->ls_rsbtbl_lock); 4364 return; 4365 } 4366 4367 if (r->res_master_nodeid != from_nodeid) { 4368 log_error(ls, "receive_remove toss from %d master %d", 4369 from_nodeid, r->res_master_nodeid); 4370 dlm_print_rsb(r); 4371 write_unlock_bh(&ls->ls_rsbtbl_lock); 4372 return; 4373 } 4374 4375 list_del(&r->res_rsbs_list); 4376 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, 4377 dlm_rhash_rsb_params); 4378 write_unlock_bh(&ls->ls_rsbtbl_lock); 4379 4380 free_toss_rsb(r); 4381 } 4382 4383 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms) 4384 { 4385 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid)); 4386 } 4387 4388 static int receive_request_reply(struct dlm_ls *ls, 4389 const struct dlm_message *ms) 4390 { 4391 struct dlm_lkb *lkb; 4392 struct dlm_rsb *r; 4393 int error, mstype, result; 4394 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid); 4395 4396 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4397 if (error) 4398 return error; 4399 4400 r = lkb->lkb_resource; 4401 hold_rsb(r); 4402 lock_rsb(r); 4403 4404 error = validate_message(lkb, ms); 4405 if (error) 4406 goto out; 4407 4408 mstype = lkb->lkb_wait_type; 4409 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4410 if (error) { 4411 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4412 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid), 4413 from_dlm_errno(le32_to_cpu(ms->m_result))); 4414 dlm_dump_rsb(r); 4415 goto out; 4416 } 4417 4418 /* Optimization: the dir node was also the master, so it took our 4419 lookup as a request and sent request reply instead of lookup reply */ 4420 if (mstype == DLM_MSG_LOOKUP) { 4421 r->res_master_nodeid = from_nodeid; 4422 r->res_nodeid = from_nodeid; 4423 lkb->lkb_nodeid = from_nodeid; 4424 } 4425 4426 /* this is the value returned from do_request() on the master */ 4427 result = from_dlm_errno(le32_to_cpu(ms->m_result)); 4428 4429 switch (result) { 4430 case -EAGAIN: 4431 /* request would block (be queued) on remote master */ 4432 queue_cast(r, lkb, -EAGAIN); 4433 confirm_master(r, -EAGAIN); 4434 unhold_lkb(lkb); /* undoes create_lkb() */ 4435 break; 4436 4437 case -EINPROGRESS: 4438 case 0: 4439 /* request was queued or granted on remote master */ 4440 receive_flags_reply(lkb, ms, false); 4441 lkb->lkb_remid = le32_to_cpu(ms->m_lkid); 4442 if (is_altmode(lkb)) 4443 munge_altmode(lkb, ms); 4444 if (result) { 4445 add_lkb(r, lkb, DLM_LKSTS_WAITING); 4446 } else { 4447 grant_lock_pc(r, lkb, ms); 4448 queue_cast(r, lkb, 0); 4449 } 4450 confirm_master(r, result); 4451 break; 4452 4453 case -EBADR: 4454 case -ENOTBLK: 4455 /* find_rsb failed to find rsb or rsb wasn't master */ 4456 
log_limit(ls, "receive_request_reply %x from %d %d " 4457 "master %d dir %d first %x %s", lkb->lkb_id, 4458 from_nodeid, result, r->res_master_nodeid, 4459 r->res_dir_nodeid, r->res_first_lkid, r->res_name); 4460 4461 if (r->res_dir_nodeid != dlm_our_nodeid() && 4462 r->res_master_nodeid != dlm_our_nodeid()) { 4463 /* cause _request_lock->set_master->send_lookup */ 4464 r->res_master_nodeid = 0; 4465 r->res_nodeid = -1; 4466 lkb->lkb_nodeid = -1; 4467 } 4468 4469 if (is_overlap(lkb)) { 4470 /* we'll ignore error in cancel/unlock reply */ 4471 queue_cast_overlap(r, lkb); 4472 confirm_master(r, result); 4473 unhold_lkb(lkb); /* undoes create_lkb() */ 4474 } else { 4475 _request_lock(r, lkb); 4476 4477 if (r->res_master_nodeid == dlm_our_nodeid()) 4478 confirm_master(r, 0); 4479 } 4480 break; 4481 4482 default: 4483 log_error(ls, "receive_request_reply %x error %d", 4484 lkb->lkb_id, result); 4485 } 4486 4487 if ((result == 0 || result == -EINPROGRESS) && 4488 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) { 4489 log_debug(ls, "receive_request_reply %x result %d unlock", 4490 lkb->lkb_id, result); 4491 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 4492 send_unlock(r, lkb); 4493 } else if ((result == -EINPROGRESS) && 4494 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, 4495 &lkb->lkb_iflags)) { 4496 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); 4497 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 4498 send_cancel(r, lkb); 4499 } else { 4500 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); 4501 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); 4502 } 4503 out: 4504 unlock_rsb(r); 4505 put_rsb(r); 4506 dlm_put_lkb(lkb); 4507 return 0; 4508 } 4509 4510 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 4511 const struct dlm_message *ms, bool local) 4512 { 4513 /* this is the value returned from do_convert() on the master */ 4514 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4515 case -EAGAIN: 4516 /* convert would block (be queued) on remote master */ 4517 queue_cast(r, lkb, -EAGAIN); 4518 break; 4519 4520 case -EDEADLK: 4521 receive_flags_reply(lkb, ms, local); 4522 revert_lock_pc(r, lkb); 4523 queue_cast(r, lkb, -EDEADLK); 4524 break; 4525 4526 case -EINPROGRESS: 4527 /* convert was queued on remote master */ 4528 receive_flags_reply(lkb, ms, local); 4529 if (is_demoted(lkb)) 4530 munge_demoted(lkb); 4531 del_lkb(r, lkb); 4532 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 4533 break; 4534 4535 case 0: 4536 /* convert was granted on remote master */ 4537 receive_flags_reply(lkb, ms, local); 4538 if (is_demoted(lkb)) 4539 munge_demoted(lkb); 4540 grant_lock_pc(r, lkb, ms); 4541 queue_cast(r, lkb, 0); 4542 break; 4543 4544 default: 4545 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", 4546 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4547 le32_to_cpu(ms->m_lkid), 4548 from_dlm_errno(le32_to_cpu(ms->m_result))); 4549 dlm_print_rsb(r); 4550 dlm_print_lkb(lkb); 4551 } 4552 } 4553 4554 static void _receive_convert_reply(struct dlm_lkb *lkb, 4555 const struct dlm_message *ms, bool local) 4556 { 4557 struct dlm_rsb *r = lkb->lkb_resource; 4558 int error; 4559 4560 hold_rsb(r); 4561 lock_rsb(r); 4562 4563 error = validate_message(lkb, ms); 4564 if (error) 4565 goto out; 4566 4567 error = remove_from_waiters_ms(lkb, ms, local); 4568 if (error) 4569 goto out; 4570 4571 __receive_convert_reply(r, lkb, ms, local); 4572 out: 4573 unlock_rsb(r); 4574 put_rsb(r); 4575 } 4576 4577 static int 
receive_convert_reply(struct dlm_ls *ls, 4578 const struct dlm_message *ms) 4579 { 4580 struct dlm_lkb *lkb; 4581 int error; 4582 4583 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4584 if (error) 4585 return error; 4586 4587 _receive_convert_reply(lkb, ms, false); 4588 dlm_put_lkb(lkb); 4589 return 0; 4590 } 4591 4592 static void _receive_unlock_reply(struct dlm_lkb *lkb, 4593 const struct dlm_message *ms, bool local) 4594 { 4595 struct dlm_rsb *r = lkb->lkb_resource; 4596 int error; 4597 4598 hold_rsb(r); 4599 lock_rsb(r); 4600 4601 error = validate_message(lkb, ms); 4602 if (error) 4603 goto out; 4604 4605 error = remove_from_waiters_ms(lkb, ms, local); 4606 if (error) 4607 goto out; 4608 4609 /* this is the value returned from do_unlock() on the master */ 4610 4611 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4612 case -DLM_EUNLOCK: 4613 receive_flags_reply(lkb, ms, local); 4614 remove_lock_pc(r, lkb); 4615 queue_cast(r, lkb, -DLM_EUNLOCK); 4616 break; 4617 case -ENOENT: 4618 break; 4619 default: 4620 log_error(r->res_ls, "receive_unlock_reply %x error %d", 4621 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result))); 4622 } 4623 out: 4624 unlock_rsb(r); 4625 put_rsb(r); 4626 } 4627 4628 static int receive_unlock_reply(struct dlm_ls *ls, 4629 const struct dlm_message *ms) 4630 { 4631 struct dlm_lkb *lkb; 4632 int error; 4633 4634 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4635 if (error) 4636 return error; 4637 4638 _receive_unlock_reply(lkb, ms, false); 4639 dlm_put_lkb(lkb); 4640 return 0; 4641 } 4642 4643 static void _receive_cancel_reply(struct dlm_lkb *lkb, 4644 const struct dlm_message *ms, bool local) 4645 { 4646 struct dlm_rsb *r = lkb->lkb_resource; 4647 int error; 4648 4649 hold_rsb(r); 4650 lock_rsb(r); 4651 4652 error = validate_message(lkb, ms); 4653 if (error) 4654 goto out; 4655 4656 error = remove_from_waiters_ms(lkb, ms, local); 4657 if (error) 4658 goto out; 4659 4660 /* this is the value returned from do_cancel() on the master */ 4661 4662 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) { 4663 case -DLM_ECANCEL: 4664 receive_flags_reply(lkb, ms, local); 4665 revert_lock_pc(r, lkb); 4666 queue_cast(r, lkb, -DLM_ECANCEL); 4667 break; 4668 case 0: 4669 break; 4670 default: 4671 log_error(r->res_ls, "receive_cancel_reply %x error %d", 4672 lkb->lkb_id, 4673 from_dlm_errno(le32_to_cpu(ms->m_result))); 4674 } 4675 out: 4676 unlock_rsb(r); 4677 put_rsb(r); 4678 } 4679 4680 static int receive_cancel_reply(struct dlm_ls *ls, 4681 const struct dlm_message *ms) 4682 { 4683 struct dlm_lkb *lkb; 4684 int error; 4685 4686 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb); 4687 if (error) 4688 return error; 4689 4690 _receive_cancel_reply(lkb, ms, false); 4691 dlm_put_lkb(lkb); 4692 return 0; 4693 } 4694 4695 static void receive_lookup_reply(struct dlm_ls *ls, 4696 const struct dlm_message *ms) 4697 { 4698 struct dlm_lkb *lkb; 4699 struct dlm_rsb *r; 4700 int error, ret_nodeid; 4701 int do_lookup_list = 0; 4702 4703 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb); 4704 if (error) { 4705 log_error(ls, "%s no lkid %x", __func__, 4706 le32_to_cpu(ms->m_lkid)); 4707 return; 4708 } 4709 4710 /* ms->m_result is the value returned by dlm_master_lookup on dir node 4711 FIXME: will a non-zero error ever be returned? 
*/ 4712 4713 r = lkb->lkb_resource; 4714 hold_rsb(r); 4715 lock_rsb(r); 4716 4717 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 4718 if (error) 4719 goto out; 4720 4721 ret_nodeid = le32_to_cpu(ms->m_nodeid); 4722 4723 /* We sometimes receive a request from the dir node for this 4724 rsb before we've received the dir node's lookup_reply for it. 4725 The request from the dir node implies we're the master, so we set 4726 ourself as master in receive_request_reply, and verify here that 4727 we are indeed the master. */ 4728 4729 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { 4730 /* This should never happen */ 4731 log_error(ls, "receive_lookup_reply %x from %d ret %d " 4732 "master %d dir %d our %d first %x %s", 4733 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid), 4734 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 4735 dlm_our_nodeid(), r->res_first_lkid, r->res_name); 4736 } 4737 4738 if (ret_nodeid == dlm_our_nodeid()) { 4739 r->res_master_nodeid = ret_nodeid; 4740 r->res_nodeid = 0; 4741 do_lookup_list = 1; 4742 r->res_first_lkid = 0; 4743 } else if (ret_nodeid == -1) { 4744 /* the remote node doesn't believe it's the dir node */ 4745 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", 4746 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid)); 4747 r->res_master_nodeid = 0; 4748 r->res_nodeid = -1; 4749 lkb->lkb_nodeid = -1; 4750 } else { 4751 /* set_master() will set lkb_nodeid from r */ 4752 r->res_master_nodeid = ret_nodeid; 4753 r->res_nodeid = ret_nodeid; 4754 } 4755 4756 if (is_overlap(lkb)) { 4757 log_debug(ls, "receive_lookup_reply %x unlock %x", 4758 lkb->lkb_id, dlm_iflags_val(lkb)); 4759 queue_cast_overlap(r, lkb); 4760 unhold_lkb(lkb); /* undoes create_lkb() */ 4761 goto out_list; 4762 } 4763 4764 _request_lock(r, lkb); 4765 4766 out_list: 4767 if (do_lookup_list) 4768 process_lookup_list(r); 4769 out: 4770 unlock_rsb(r); 4771 put_rsb(r); 4772 dlm_put_lkb(lkb); 4773 } 4774 4775 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms, 4776 uint32_t saved_seq) 4777 { 4778 int error = 0, noent = 0; 4779 4780 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) { 4781 log_limit(ls, "receive %d from non-member %d %x %x %d", 4782 le32_to_cpu(ms->m_type), 4783 le32_to_cpu(ms->m_header.h_nodeid), 4784 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 4785 from_dlm_errno(le32_to_cpu(ms->m_result))); 4786 return; 4787 } 4788 4789 switch (ms->m_type) { 4790 4791 /* messages sent to a master node */ 4792 4793 case cpu_to_le32(DLM_MSG_REQUEST): 4794 error = receive_request(ls, ms); 4795 break; 4796 4797 case cpu_to_le32(DLM_MSG_CONVERT): 4798 error = receive_convert(ls, ms); 4799 break; 4800 4801 case cpu_to_le32(DLM_MSG_UNLOCK): 4802 error = receive_unlock(ls, ms); 4803 break; 4804 4805 case cpu_to_le32(DLM_MSG_CANCEL): 4806 noent = 1; 4807 error = receive_cancel(ls, ms); 4808 break; 4809 4810 /* messages sent from a master node (replies to above) */ 4811 4812 case cpu_to_le32(DLM_MSG_REQUEST_REPLY): 4813 error = receive_request_reply(ls, ms); 4814 break; 4815 4816 case cpu_to_le32(DLM_MSG_CONVERT_REPLY): 4817 error = receive_convert_reply(ls, ms); 4818 break; 4819 4820 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY): 4821 error = receive_unlock_reply(ls, ms); 4822 break; 4823 4824 case cpu_to_le32(DLM_MSG_CANCEL_REPLY): 4825 error = receive_cancel_reply(ls, ms); 4826 break; 4827 4828 /* messages sent from a master node (only two types of async msg) */ 4829 4830 case cpu_to_le32(DLM_MSG_GRANT): 4831 noent =
1; 4832 error = receive_grant(ls, ms); 4833 break; 4834 4835 case cpu_to_le32(DLM_MSG_BAST): 4836 noent = 1; 4837 error = receive_bast(ls, ms); 4838 break; 4839 4840 /* messages sent to a dir node */ 4841 4842 case cpu_to_le32(DLM_MSG_LOOKUP): 4843 receive_lookup(ls, ms); 4844 break; 4845 4846 case cpu_to_le32(DLM_MSG_REMOVE): 4847 receive_remove(ls, ms); 4848 break; 4849 4850 /* messages sent from a dir node (remove has no reply) */ 4851 4852 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY): 4853 receive_lookup_reply(ls, ms); 4854 break; 4855 4856 /* other messages */ 4857 4858 case cpu_to_le32(DLM_MSG_PURGE): 4859 receive_purge(ls, ms); 4860 break; 4861 4862 default: 4863 log_error(ls, "unknown message type %d", 4864 le32_to_cpu(ms->m_type)); 4865 } 4866 4867 /* 4868 * When checking for ENOENT, we're checking the result of 4869 * find_lkb(m_remid): 4870 * 4871 * The lock id referenced in the message wasn't found. This may 4872 * happen in normal usage for the async messages and cancel, so 4873 * only use log_debug for them. 4874 * 4875 * Some errors are expected and normal. 4876 */ 4877 4878 if (error == -ENOENT && noent) { 4879 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", 4880 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 4881 le32_to_cpu(ms->m_header.h_nodeid), 4882 le32_to_cpu(ms->m_lkid), saved_seq); 4883 } else if (error == -ENOENT) { 4884 log_error(ls, "receive %d no %x remote %d %x saved_seq %u", 4885 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid), 4886 le32_to_cpu(ms->m_header.h_nodeid), 4887 le32_to_cpu(ms->m_lkid), saved_seq); 4888 4889 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT)) 4890 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash)); 4891 } 4892 4893 if (error == -EINVAL) { 4894 log_error(ls, "receive %d inval from %d lkid %x remid %x " 4895 "saved_seq %u", 4896 le32_to_cpu(ms->m_type), 4897 le32_to_cpu(ms->m_header.h_nodeid), 4898 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid), 4899 saved_seq); 4900 } 4901 } 4902 4903 /* If the lockspace is in recovery mode (locking stopped), then normal 4904 messages are saved on the requestqueue for processing after recovery is 4905 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 4906 messages off the requestqueue before we process new ones. This occurs right 4907 after recovery completes when we transition from saving all messages on 4908 requestqueue, to processing all the saved messages, to processing new 4909 messages as they arrive. */ 4910 4911 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms, 4912 int nodeid) 4913 { 4914 try_again: 4915 read_lock_bh(&ls->ls_requestqueue_lock); 4916 if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { 4917 /* If we were a member of this lockspace, left, and rejoined, 4918 other nodes may still be sending us messages from the 4919 lockspace generation before we left. 
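While ls_generation is still zero, such old-generation messages are simply ignored.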
*/ 4920 if (WARN_ON_ONCE(!ls->ls_generation)) { 4921 read_unlock_bh(&ls->ls_requestqueue_lock); 4922 log_limit(ls, "receive %d from %d ignore old gen", 4923 le32_to_cpu(ms->m_type), nodeid); 4924 return; 4925 } 4926 4927 read_unlock_bh(&ls->ls_requestqueue_lock); 4928 write_lock_bh(&ls->ls_requestqueue_lock); 4929 /* recheck because we hold writelock now */ 4930 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) { 4931 write_unlock_bh(&ls->ls_requestqueue_lock); 4932 goto try_again; 4933 } 4934 4935 dlm_add_requestqueue(ls, nodeid, ms); 4936 write_unlock_bh(&ls->ls_requestqueue_lock); 4937 } else { 4938 _receive_message(ls, ms, 0); 4939 read_unlock_bh(&ls->ls_requestqueue_lock); 4940 } 4941 } 4942 4943 /* This is called by dlm_recoverd to process messages that were saved on 4944 the requestqueue. */ 4945 4946 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms, 4947 uint32_t saved_seq) 4948 { 4949 _receive_message(ls, ms, saved_seq); 4950 } 4951 4952 /* This is called by the midcomms layer when something is received for 4953 the lockspace. It could be either a MSG (normal message sent as part of 4954 standard locking activity) or an RCOM (recovery message sent as part of 4955 lockspace recovery). */ 4956 4957 void dlm_receive_buffer(const union dlm_packet *p, int nodeid) 4958 { 4959 const struct dlm_header *hd = &p->header; 4960 struct dlm_ls *ls; 4961 int type = 0; 4962 4963 switch (hd->h_cmd) { 4964 case DLM_MSG: 4965 type = le32_to_cpu(p->message.m_type); 4966 break; 4967 case DLM_RCOM: 4968 type = le32_to_cpu(p->rcom.rc_type); 4969 break; 4970 default: 4971 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); 4972 return; 4973 } 4974 4975 if (le32_to_cpu(hd->h_nodeid) != nodeid) { 4976 log_print("invalid h_nodeid %d from %d lockspace %x", 4977 le32_to_cpu(hd->h_nodeid), nodeid, 4978 le32_to_cpu(hd->u.h_lockspace)); 4979 return; 4980 } 4981 4982 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace)); 4983 if (!ls) { 4984 if (dlm_config.ci_log_debug) { 4985 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " 4986 "%u from %d cmd %d type %d\n", 4987 le32_to_cpu(hd->u.h_lockspace), nodeid, 4988 hd->h_cmd, type); 4989 } 4990 4991 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 4992 dlm_send_ls_not_ready(nodeid, &p->rcom); 4993 return; 4994 } 4995 4996 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 4997 be inactive (in this ls) before transitioning to recovery mode */ 4998 4999 read_lock_bh(&ls->ls_recv_active); 5000 if (hd->h_cmd == DLM_MSG) 5001 dlm_receive_message(ls, &p->message, nodeid); 5002 else if (hd->h_cmd == DLM_RCOM) 5003 dlm_receive_rcom(ls, &p->rcom, nodeid); 5004 else 5005 log_error(ls, "invalid h_cmd %d from %d lockspace %x", 5006 hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace)); 5007 read_unlock_bh(&ls->ls_recv_active); 5008 5009 dlm_put_lockspace(ls); 5010 } 5011 5012 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, 5013 struct dlm_message *ms_local) 5014 { 5015 if (middle_conversion(lkb)) { 5016 hold_lkb(lkb); 5017 memset(ms_local, 0, sizeof(struct dlm_message)); 5018 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY); 5019 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS)); 5020 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5021 _receive_convert_reply(lkb, ms_local, true); 5022 5023 /* Same special case as in receive_rcom_lock_args() */ 5024 lkb->lkb_grmode = DLM_LOCK_IV; 5025 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 5026 
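/* drop the reference taken by hold_lkb() above */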
unhold_lkb(lkb); 5027 5028 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 5029 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 5030 } 5031 5032 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 5033 conversions are async; there's no reply from the remote master */ 5034 } 5035 5036 /* A waiting lkb needs recovery if the master node has failed, or 5037 the master node is changing (only when no directory is used) */ 5038 5039 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 5040 int dir_nodeid) 5041 { 5042 if (dlm_no_directory(ls)) 5043 return 1; 5044 5045 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 5046 return 1; 5047 5048 return 0; 5049 } 5050 5051 /* Recovery for locks that are waiting for replies from nodes that are now 5052 gone. We can just complete unlocks and cancels by faking a reply from the 5053 dead node. Requests and up-conversions we flag to be resent after 5054 recovery. Down-conversions can just be completed with a fake reply like 5055 unlocks. Conversions between PR and CW need special attention. */ 5056 5057 void dlm_recover_waiters_pre(struct dlm_ls *ls) 5058 { 5059 struct dlm_lkb *lkb, *safe; 5060 struct dlm_message *ms_local; 5061 int wait_type, local_unlock_result, local_cancel_result; 5062 int dir_nodeid; 5063 5064 ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL); 5065 if (!ms_local) 5066 return; 5067 5068 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 5069 5070 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); 5071 5072 /* exclude debug messages about unlocks because there can be so 5073 many and they aren't very interesting */ 5074 5075 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { 5076 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5077 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", 5078 lkb->lkb_id, 5079 lkb->lkb_remid, 5080 lkb->lkb_wait_type, 5081 lkb->lkb_resource->res_nodeid, 5082 lkb->lkb_nodeid, 5083 lkb->lkb_wait_nodeid, 5084 dir_nodeid); 5085 } 5086 5087 /* all outstanding lookups, regardless of destination will be 5088 resent after recovery is done */ 5089 5090 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 5091 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 5092 continue; 5093 } 5094 5095 if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) 5096 continue; 5097 5098 wait_type = lkb->lkb_wait_type; 5099 local_unlock_result = -DLM_EUNLOCK; 5100 local_cancel_result = -DLM_ECANCEL; 5101 5102 /* Main reply may have been received leaving a zero wait_type, 5103 but a reply for the overlapping op may not have been 5104 received. In that case we need to fake the appropriate 5105 reply for the overlap op. 
*/ 5106 5107 if (!wait_type) { 5108 if (is_overlap_cancel(lkb)) { 5109 wait_type = DLM_MSG_CANCEL; 5110 if (lkb->lkb_grmode == DLM_LOCK_IV) 5111 local_cancel_result = 0; 5112 } 5113 if (is_overlap_unlock(lkb)) { 5114 wait_type = DLM_MSG_UNLOCK; 5115 if (lkb->lkb_grmode == DLM_LOCK_IV) 5116 local_unlock_result = -ENOENT; 5117 } 5118 5119 log_debug(ls, "rwpre overlap %x %x %d %d %d", 5120 lkb->lkb_id, dlm_iflags_val(lkb), wait_type, 5121 local_cancel_result, local_unlock_result); 5122 } 5123 5124 switch (wait_type) { 5125 5126 case DLM_MSG_REQUEST: 5127 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 5128 break; 5129 5130 case DLM_MSG_CONVERT: 5131 recover_convert_waiter(ls, lkb, ms_local); 5132 break; 5133 5134 case DLM_MSG_UNLOCK: 5135 hold_lkb(lkb); 5136 memset(ms_local, 0, sizeof(struct dlm_message)); 5137 ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY); 5138 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result)); 5139 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5140 _receive_unlock_reply(lkb, ms_local, true); 5141 dlm_put_lkb(lkb); 5142 break; 5143 5144 case DLM_MSG_CANCEL: 5145 hold_lkb(lkb); 5146 memset(ms_local, 0, sizeof(struct dlm_message)); 5147 ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY); 5148 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result)); 5149 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid); 5150 _receive_cancel_reply(lkb, ms_local, true); 5151 dlm_put_lkb(lkb); 5152 break; 5153 5154 default: 5155 log_error(ls, "invalid lkb wait_type %d %d", 5156 lkb->lkb_wait_type, wait_type); 5157 } 5158 schedule(); 5159 } 5160 kfree(ms_local); 5161 } 5162 5163 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 5164 { 5165 struct dlm_lkb *lkb = NULL, *iter; 5166 5167 spin_lock_bh(&ls->ls_waiters_lock); 5168 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) { 5169 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) { 5170 hold_lkb(iter); 5171 lkb = iter; 5172 break; 5173 } 5174 } 5175 spin_unlock_bh(&ls->ls_waiters_lock); 5176 5177 return lkb; 5178 } 5179 5180 /* 5181 * Forced state reset for locks that were in the middle of remote operations 5182 * when recovery happened (i.e. lkbs that were on the waiters list, waiting 5183 * for a reply from a remote operation.) The lkbs remaining on the waiters 5184 * list need to be reevaluated; some may need resending to a different node 5185 * than previously, and some may now need local handling rather than remote. 5186 * 5187 * First, the lkb state for the voided remote operation is forcibly reset, 5188 * equivalent to what remove_from_waiters() would normally do: 5189 * . lkb removed from ls_waiters list 5190 * . lkb wait_type cleared 5191 * . lkb waiters_count cleared 5192 * . lkb ref count decremented for each waiters_count (almost always 1, 5193 * but possibly 2 in case of cancel/unlock overlapping, which means 5194 * two remote replies were being expected for the lkb.) 5195 * 5196 * Second, the lkb is reprocessed like an original operation would be, 5197 * by passing it to _request_lock or _convert_lock, which will either 5198 * process the lkb operation locally, or send it to a remote node again 5199 * and put the lkb back onto the waiters list. 5200 * 5201 * When reprocessing the lkb, we may find that it's flagged for an overlapping 5202 * force-unlock or cancel, either from before recovery began, or after recovery 5203 * finished. 
If this is the case, the unlock/cancel is done directly, and the 5204 * original operation is not initiated again (no _request_lock/_convert_lock.) 5205 */ 5206 5207 int dlm_recover_waiters_post(struct dlm_ls *ls) 5208 { 5209 struct dlm_lkb *lkb; 5210 struct dlm_rsb *r; 5211 int error = 0, mstype, err, oc, ou; 5212 5213 while (1) { 5214 if (dlm_locking_stopped(ls)) { 5215 log_debug(ls, "recover_waiters_post aborted"); 5216 error = -EINTR; 5217 break; 5218 } 5219 5220 /* 5221 * Find an lkb from the waiters list that's been affected by 5222 * recovery node changes, and needs to be reprocessed. Does 5223 * hold_lkb(), adding a refcount. 5224 */ 5225 lkb = find_resend_waiter(ls); 5226 if (!lkb) 5227 break; 5228 5229 r = lkb->lkb_resource; 5230 hold_rsb(r); 5231 lock_rsb(r); 5232 5233 /* 5234 * If the lkb has been flagged for a force unlock or cancel, 5235 * then the reprocessing below will be replaced by just doing 5236 * the unlock/cancel directly. 5237 */ 5238 mstype = lkb->lkb_wait_type; 5239 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, 5240 &lkb->lkb_iflags); 5241 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, 5242 &lkb->lkb_iflags); 5243 err = 0; 5244 5245 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5246 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 5247 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, 5248 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, 5249 dlm_dir_nodeid(r), oc, ou); 5250 5251 /* 5252 * No reply to the pre-recovery operation will now be received, 5253 * so a forced equivalent of remove_from_waiters() is needed to 5254 * reset the waiters state that was in place before recovery. 5255 */ 5256 5257 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags); 5258 5259 /* Forcibly clear wait_type */ 5260 lkb->lkb_wait_type = 0; 5261 5262 /* 5263 * Forcibly reset wait_count and associated refcount. The 5264 * wait_count will almost always be 1, but in case of an 5265 * overlapping unlock/cancel it could be 2: see where 5266 * add_to_waiters() finds the lkb is already on the waiters 5267 * list and does lkb_wait_count++; hold_lkb(). 5268 */ 5269 while (lkb->lkb_wait_count) { 5270 lkb->lkb_wait_count--; 5271 unhold_lkb(lkb); 5272 } 5273 5274 /* Forcibly remove from waiters list */ 5275 spin_lock_bh(&ls->ls_waiters_lock); 5276 list_del_init(&lkb->lkb_wait_reply); 5277 spin_unlock_bh(&ls->ls_waiters_lock); 5278 5279 /* 5280 * The lkb is now clear of all prior waiters state and can be 5281 * processed locally, or sent to remote node again, or directly 5282 * cancelled/unlocked. 5283 */ 5284 5285 if (oc || ou) { 5286 /* do an unlock or cancel instead of resending */ 5287 switch (mstype) { 5288 case DLM_MSG_LOOKUP: 5289 case DLM_MSG_REQUEST: 5290 queue_cast(r, lkb, ou ? 
-DLM_EUNLOCK : 5291 -DLM_ECANCEL); 5292 unhold_lkb(lkb); /* undoes create_lkb() */ 5293 break; 5294 case DLM_MSG_CONVERT: 5295 if (oc) { 5296 queue_cast(r, lkb, -DLM_ECANCEL); 5297 } else { 5298 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; 5299 _unlock_lock(r, lkb); 5300 } 5301 break; 5302 default: 5303 err = 1; 5304 } 5305 } else { 5306 switch (mstype) { 5307 case DLM_MSG_LOOKUP: 5308 case DLM_MSG_REQUEST: 5309 _request_lock(r, lkb); 5310 if (is_master(r)) 5311 confirm_master(r, 0); 5312 break; 5313 case DLM_MSG_CONVERT: 5314 _convert_lock(r, lkb); 5315 break; 5316 default: 5317 err = 1; 5318 } 5319 } 5320 5321 if (err) { 5322 log_error(ls, "waiter %x msg %d r_nodeid %d " 5323 "dir_nodeid %d overlap %d %d", 5324 lkb->lkb_id, mstype, r->res_nodeid, 5325 dlm_dir_nodeid(r), oc, ou); 5326 } 5327 unlock_rsb(r); 5328 put_rsb(r); 5329 dlm_put_lkb(lkb); 5330 } 5331 5332 return error; 5333 } 5334 5335 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, 5336 struct list_head *list) 5337 { 5338 struct dlm_lkb *lkb, *safe; 5339 5340 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5341 if (!is_master_copy(lkb)) 5342 continue; 5343 5344 /* don't purge lkbs we've added in recover_master_copy for 5345 the current recovery seq */ 5346 5347 if (lkb->lkb_recover_seq == ls->ls_recover_seq) 5348 continue; 5349 5350 del_lkb(r, lkb); 5351 5352 /* this put should free the lkb */ 5353 if (!dlm_put_lkb(lkb)) 5354 log_error(ls, "purged mstcpy lkb not released"); 5355 } 5356 } 5357 5358 void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 5359 { 5360 struct dlm_ls *ls = r->res_ls; 5361 5362 purge_mstcpy_list(ls, r, &r->res_grantqueue); 5363 purge_mstcpy_list(ls, r, &r->res_convertqueue); 5364 purge_mstcpy_list(ls, r, &r->res_waitqueue); 5365 } 5366 5367 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, 5368 struct list_head *list, 5369 int nodeid_gone, unsigned int *count) 5370 { 5371 struct dlm_lkb *lkb, *safe; 5372 5373 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5374 if (!is_master_copy(lkb)) 5375 continue; 5376 5377 if ((lkb->lkb_nodeid == nodeid_gone) || 5378 dlm_is_removed(ls, lkb->lkb_nodeid)) { 5379 5380 /* tell recover_lvb to invalidate the lvb 5381 because a node holding EX/PW failed */ 5382 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && 5383 (lkb->lkb_grmode >= DLM_LOCK_PW)) { 5384 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); 5385 } 5386 5387 del_lkb(r, lkb); 5388 5389 /* this put should free the lkb */ 5390 if (!dlm_put_lkb(lkb)) 5391 log_error(ls, "purged dead lkb not released"); 5392 5393 rsb_set_flag(r, RSB_RECOVER_GRANT); 5394 5395 (*count)++; 5396 } 5397 } 5398 } 5399 5400 /* Get rid of locks held by nodes that are gone. 
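Only rsbs we master are scanned, and master-copy lkbs held by departed nodes are removed from the grant, convert and wait queues.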
*/ 5401 5402 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list) 5403 { 5404 struct dlm_rsb *r; 5405 struct dlm_member *memb; 5406 int nodes_count = 0; 5407 int nodeid_gone = 0; 5408 unsigned int lkb_count = 0; 5409 5410 /* cache one removed nodeid to optimize the common 5411 case of a single node removed */ 5412 5413 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 5414 nodes_count++; 5415 nodeid_gone = memb->nodeid; 5416 } 5417 5418 if (!nodes_count) 5419 return; 5420 5421 list_for_each_entry(r, root_list, res_root_list) { 5422 hold_rsb(r); 5423 lock_rsb(r); 5424 if (is_master(r)) { 5425 purge_dead_list(ls, r, &r->res_grantqueue, 5426 nodeid_gone, &lkb_count); 5427 purge_dead_list(ls, r, &r->res_convertqueue, 5428 nodeid_gone, &lkb_count); 5429 purge_dead_list(ls, r, &r->res_waitqueue, 5430 nodeid_gone, &lkb_count); 5431 } 5432 unlock_rsb(r); 5433 unhold_rsb(r); 5434 cond_resched(); 5435 } 5436 5437 if (lkb_count) 5438 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", 5439 lkb_count, nodes_count); 5440 } 5441 5442 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls) 5443 { 5444 struct dlm_rsb *r; 5445 5446 read_lock_bh(&ls->ls_rsbtbl_lock); 5447 list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) { 5448 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5449 continue; 5450 if (!is_master(r)) { 5451 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5452 continue; 5453 } 5454 hold_rsb(r); 5455 read_unlock_bh(&ls->ls_rsbtbl_lock); 5456 return r; 5457 } 5458 read_unlock_bh(&ls->ls_rsbtbl_lock); 5459 return NULL; 5460 } 5461 5462 /* 5463 * Attempt to grant locks on resources that we are the master of. 5464 * Locks may have become grantable during recovery because locks 5465 * from departed nodes have been purged (or not rebuilt), allowing 5466 * previously blocked locks to now be granted. The subset of rsb's 5467 * we are interested in are those with lkb's on either the convert or 5468 * waiting queues. 5469 * 5470 * Simplest would be to go through each master rsb and check for non-empty 5471 * convert or waiting queues, and attempt to grant on those rsbs. 5472 * Checking the queues requires lock_rsb, though, for which we'd need 5473 * to release the rsbtbl lock. This would make iterating through all 5474 * rsb's very inefficient. So, we rely on earlier recovery routines 5475 * to set RECOVER_GRANT on any rsb's that we should attempt to grant 5476 * locks for. 
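* find_grant_rsb() below then picks those flagged rsbs off one at a time, so grant_pending_locks() can run without the rsbtbl lock held.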
5477 */ 5478 5479 void dlm_recover_grant(struct dlm_ls *ls) 5480 { 5481 struct dlm_rsb *r; 5482 unsigned int count = 0; 5483 unsigned int rsb_count = 0; 5484 unsigned int lkb_count = 0; 5485 5486 while (1) { 5487 r = find_grant_rsb(ls); 5488 if (!r) 5489 break; 5490 5491 rsb_count++; 5492 count = 0; 5493 lock_rsb(r); 5494 /* the RECOVER_GRANT flag is checked in the grant path */ 5495 grant_pending_locks(r, &count); 5496 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5497 lkb_count += count; 5498 confirm_master(r, 0); 5499 unlock_rsb(r); 5500 put_rsb(r); 5501 cond_resched(); 5502 } 5503 5504 if (lkb_count) 5505 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", 5506 lkb_count, rsb_count); 5507 } 5508 5509 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 5510 uint32_t remid) 5511 { 5512 struct dlm_lkb *lkb; 5513 5514 list_for_each_entry(lkb, head, lkb_statequeue) { 5515 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 5516 return lkb; 5517 } 5518 return NULL; 5519 } 5520 5521 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 5522 uint32_t remid) 5523 { 5524 struct dlm_lkb *lkb; 5525 5526 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 5527 if (lkb) 5528 return lkb; 5529 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 5530 if (lkb) 5531 return lkb; 5532 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 5533 if (lkb) 5534 return lkb; 5535 return NULL; 5536 } 5537 5538 /* needs at least dlm_rcom + rcom_lock */ 5539 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 5540 struct dlm_rsb *r, const struct dlm_rcom *rc) 5541 { 5542 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5543 5544 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5545 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); 5546 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); 5547 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); 5548 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags)); 5549 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags); 5550 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); 5551 lkb->lkb_rqmode = rl->rl_rqmode; 5552 lkb->lkb_grmode = rl->rl_grmode; 5553 /* don't set lkb_status because add_lkb wants to itself */ 5554 5555 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 5556 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; 5557 5558 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 5559 int lvblen = le16_to_cpu(rc->rc_header.h_length) - 5560 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock); 5561 if (lvblen > ls->ls_lvblen) 5562 return -EINVAL; 5563 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 5564 if (!lkb->lkb_lvbptr) 5565 return -ENOMEM; 5566 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 5567 } 5568 5569 /* Conversions between PR and CW (middle modes) need special handling. 5570 The real granted mode of these converting locks cannot be determined 5571 until all locks have been rebuilt on the rsb (recover_conversion) */ 5572 5573 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && 5574 middle_conversion(lkb)) { 5575 rl->rl_status = DLM_LKSTS_CONVERT; 5576 lkb->lkb_grmode = DLM_LOCK_IV; 5577 rsb_set_flag(r, RSB_RECOVER_CONVERT); 5578 } 5579 5580 return 0; 5581 } 5582 5583 /* This lkb may have been recovered in a previous aborted recovery so we need 5584 to check if the rsb already has an lkb with the given remote nodeid/lkid. 5585 If so we just send back a standard reply. If not, we create a new lkb with 5586 the given values and send back our lkid. 
We send back our lkid by sending 5587 back the rcom_lock struct we got but with the remid field filled in. */ 5588 5589 /* needs at least dlm_rcom + rcom_lock */ 5590 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, 5591 __le32 *rl_remid, __le32 *rl_result) 5592 { 5593 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5594 struct dlm_rsb *r; 5595 struct dlm_lkb *lkb; 5596 uint32_t remid = 0; 5597 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid); 5598 int error; 5599 5600 /* init rl_remid with rcom lock rl_remid */ 5601 *rl_remid = rl->rl_remid; 5602 5603 if (rl->rl_parent_lkid) { 5604 error = -EOPNOTSUPP; 5605 goto out; 5606 } 5607 5608 remid = le32_to_cpu(rl->rl_lkid); 5609 5610 /* In general we expect the rsb returned to be R_MASTER, but we don't 5611 have to require it. Recovery of masters on one node can overlap 5612 recovery of locks on another node, so one node can send us MSTCPY 5613 locks before we've made ourselves master of this rsb. We can still 5614 add new MSTCPY locks that we receive here without any harm; when 5615 we make ourselves master, dlm_recover_masters() won't touch the 5616 MSTCPY locks we've received early. */ 5617 5618 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 5619 from_nodeid, R_RECEIVE_RECOVER, &r); 5620 if (error) 5621 goto out; 5622 5623 lock_rsb(r); 5624 5625 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5626 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5627 from_nodeid, remid); 5628 error = -EBADR; 5629 goto out_unlock; 5630 } 5631 5632 lkb = search_remid(r, from_nodeid, remid); 5633 if (lkb) { 5634 error = -EEXIST; 5635 goto out_remid; 5636 } 5637 5638 error = create_lkb(ls, &lkb); 5639 if (error) 5640 goto out_unlock; 5641 5642 error = receive_rcom_lock_args(ls, lkb, r, rc); 5643 if (error) { 5644 __put_lkb(ls, lkb); 5645 goto out_unlock; 5646 } 5647 5648 attach_lkb(r, lkb); 5649 add_lkb(r, lkb, rl->rl_status); 5650 ls->ls_recover_locks_in++; 5651 5652 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 5653 rsb_set_flag(r, RSB_RECOVER_GRANT); 5654 5655 out_remid: 5656 /* this is the new value returned to the lock holder for 5657 saving in its process-copy lkb */ 5658 *rl_remid = cpu_to_le32(lkb->lkb_id); 5659 5660 lkb->lkb_recover_seq = ls->ls_recover_seq; 5661 5662 out_unlock: 5663 unlock_rsb(r); 5664 put_rsb(r); 5665 out: 5666 if (error && error != -EEXIST) 5667 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", 5668 from_nodeid, remid, error); 5669 *rl_result = cpu_to_le32(error); 5670 return error; 5671 } 5672 5673 /* needs at least dlm_rcom + rcom_lock */ 5674 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc, 5675 uint64_t seq) 5676 { 5677 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5678 struct dlm_rsb *r; 5679 struct dlm_lkb *lkb; 5680 uint32_t lkid, remid; 5681 int error, result; 5682 5683 lkid = le32_to_cpu(rl->rl_lkid); 5684 remid = le32_to_cpu(rl->rl_remid); 5685 result = le32_to_cpu(rl->rl_result); 5686 5687 error = find_lkb(ls, lkid, &lkb); 5688 if (error) { 5689 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", 5690 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5691 result); 5692 return error; 5693 } 5694 5695 r = lkb->lkb_resource; 5696 hold_rsb(r); 5697 lock_rsb(r); 5698 5699 if (!is_process_copy(lkb)) { 5700 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 5701 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5702 result); 5703 
dlm_dump_rsb(r); 5704 unlock_rsb(r); 5705 put_rsb(r); 5706 dlm_put_lkb(lkb); 5707 return -EINVAL; 5708 } 5709 5710 switch (result) { 5711 case -EBADR: 5712 /* There's a chance the new master received our lock before 5713 dlm_recover_master_reply(), this wouldn't happen if we did 5714 a barrier between recover_masters and recover_locks. */ 5715 5716 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", 5717 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5718 result); 5719 5720 dlm_send_rcom_lock(r, lkb, seq); 5721 goto out; 5722 case -EEXIST: 5723 case 0: 5724 lkb->lkb_remid = remid; 5725 break; 5726 default: 5727 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", 5728 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid, 5729 result); 5730 } 5731 5732 /* an ack for dlm_recover_locks() which waits for replies from 5733 all the locks it sends to new masters */ 5734 dlm_recovered_lock(r); 5735 out: 5736 unlock_rsb(r); 5737 put_rsb(r); 5738 dlm_put_lkb(lkb); 5739 5740 return 0; 5741 } 5742 5743 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 5744 int mode, uint32_t flags, void *name, unsigned int namelen) 5745 { 5746 struct dlm_lkb *lkb; 5747 struct dlm_args args; 5748 bool do_put = true; 5749 int error; 5750 5751 dlm_lock_recovery(ls); 5752 5753 error = create_lkb(ls, &lkb); 5754 if (error) { 5755 kfree(ua); 5756 goto out; 5757 } 5758 5759 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags); 5760 5761 if (flags & DLM_LKF_VALBLK) { 5762 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5763 if (!ua->lksb.sb_lvbptr) { 5764 kfree(ua); 5765 error = -ENOMEM; 5766 goto out_put; 5767 } 5768 } 5769 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua, 5770 fake_bastfn, &args); 5771 if (error) { 5772 kfree(ua->lksb.sb_lvbptr); 5773 ua->lksb.sb_lvbptr = NULL; 5774 kfree(ua); 5775 goto out_put; 5776 } 5777 5778 /* After ua is attached to lkb it will be freed by dlm_free_lkb(). 5779 When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace 5780 lock and that lkb_astparam is the dlm_user_args structure. 
*/ 5781 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags); 5782 error = request_lock(ls, lkb, name, namelen, &args); 5783 5784 switch (error) { 5785 case 0: 5786 break; 5787 case -EINPROGRESS: 5788 error = 0; 5789 break; 5790 case -EAGAIN: 5791 error = 0; 5792 fallthrough; 5793 default: 5794 goto out_put; 5795 } 5796 5797 /* add this new lkb to the per-process list of locks */ 5798 spin_lock_bh(&ua->proc->locks_spin); 5799 hold_lkb(lkb); 5800 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5801 spin_unlock_bh(&ua->proc->locks_spin); 5802 do_put = false; 5803 out_put: 5804 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false); 5805 if (do_put) 5806 __put_lkb(ls, lkb); 5807 out: 5808 dlm_unlock_recovery(ls); 5809 return error; 5810 } 5811 5812 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5813 int mode, uint32_t flags, uint32_t lkid, char *lvb_in) 5814 { 5815 struct dlm_lkb *lkb; 5816 struct dlm_args args; 5817 struct dlm_user_args *ua; 5818 int error; 5819 5820 dlm_lock_recovery(ls); 5821 5822 error = find_lkb(ls, lkid, &lkb); 5823 if (error) 5824 goto out; 5825 5826 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags); 5827 5828 /* user can change the params on its lock when it converts it, or 5829 add an lvb that didn't exist before */ 5830 5831 ua = lkb->lkb_ua; 5832 5833 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 5834 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5835 if (!ua->lksb.sb_lvbptr) { 5836 error = -ENOMEM; 5837 goto out_put; 5838 } 5839 } 5840 if (lvb_in && ua->lksb.sb_lvbptr) 5841 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5842 5843 ua->xid = ua_tmp->xid; 5844 ua->castparam = ua_tmp->castparam; 5845 ua->castaddr = ua_tmp->castaddr; 5846 ua->bastparam = ua_tmp->bastparam; 5847 ua->bastaddr = ua_tmp->bastaddr; 5848 ua->user_lksb = ua_tmp->user_lksb; 5849 5850 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua, 5851 fake_bastfn, &args); 5852 if (error) 5853 goto out_put; 5854 5855 error = convert_lock(ls, lkb, &args); 5856 5857 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) 5858 error = 0; 5859 out_put: 5860 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false); 5861 dlm_put_lkb(lkb); 5862 out: 5863 dlm_unlock_recovery(ls); 5864 kfree(ua_tmp); 5865 return error; 5866 } 5867 5868 /* 5869 * The caller asks for an orphan lock on a given resource with a given mode. 5870 * If a matching lock exists, it's moved to the owner's list of locks and 5871 * the lkid is returned. 
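* If orphans exist for the resource only with other modes, -EAGAIN is returned; if no orphan is found at all, -ENOENT.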
5872 */ 5873 5874 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5875 int mode, uint32_t flags, void *name, unsigned int namelen, 5876 uint32_t *lkid) 5877 { 5878 struct dlm_lkb *lkb = NULL, *iter; 5879 struct dlm_user_args *ua; 5880 int found_other_mode = 0; 5881 int rv = 0; 5882 5883 spin_lock_bh(&ls->ls_orphans_lock); 5884 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) { 5885 if (iter->lkb_resource->res_length != namelen) 5886 continue; 5887 if (memcmp(iter->lkb_resource->res_name, name, namelen)) 5888 continue; 5889 if (iter->lkb_grmode != mode) { 5890 found_other_mode = 1; 5891 continue; 5892 } 5893 5894 lkb = iter; 5895 list_del_init(&iter->lkb_ownqueue); 5896 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags); 5897 *lkid = iter->lkb_id; 5898 break; 5899 } 5900 spin_unlock_bh(&ls->ls_orphans_lock); 5901 5902 if (!lkb && found_other_mode) { 5903 rv = -EAGAIN; 5904 goto out; 5905 } 5906 5907 if (!lkb) { 5908 rv = -ENOENT; 5909 goto out; 5910 } 5911 5912 lkb->lkb_exflags = flags; 5913 lkb->lkb_ownpid = (int) current->pid; 5914 5915 ua = lkb->lkb_ua; 5916 5917 ua->proc = ua_tmp->proc; 5918 ua->xid = ua_tmp->xid; 5919 ua->castparam = ua_tmp->castparam; 5920 ua->castaddr = ua_tmp->castaddr; 5921 ua->bastparam = ua_tmp->bastparam; 5922 ua->bastaddr = ua_tmp->bastaddr; 5923 ua->user_lksb = ua_tmp->user_lksb; 5924 5925 /* 5926 * The lkb reference from the ls_orphans list was not 5927 * removed above, and is now considered the reference 5928 * for the proc locks list. 5929 */ 5930 5931 spin_lock_bh(&ua->proc->locks_spin); 5932 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5933 spin_unlock_bh(&ua->proc->locks_spin); 5934 out: 5935 kfree(ua_tmp); 5936 return rv; 5937 } 5938 5939 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5940 uint32_t flags, uint32_t lkid, char *lvb_in) 5941 { 5942 struct dlm_lkb *lkb; 5943 struct dlm_args args; 5944 struct dlm_user_args *ua; 5945 int error; 5946 5947 dlm_lock_recovery(ls); 5948 5949 error = find_lkb(ls, lkid, &lkb); 5950 if (error) 5951 goto out; 5952 5953 trace_dlm_unlock_start(ls, lkb, flags); 5954 5955 ua = lkb->lkb_ua; 5956 5957 if (lvb_in && ua->lksb.sb_lvbptr) 5958 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5959 if (ua_tmp->castparam) 5960 ua->castparam = ua_tmp->castparam; 5961 ua->user_lksb = ua_tmp->user_lksb; 5962 5963 error = set_unlock_args(flags, ua, &args); 5964 if (error) 5965 goto out_put; 5966 5967 error = unlock_lock(ls, lkb, &args); 5968 5969 if (error == -DLM_EUNLOCK) 5970 error = 0; 5971 /* from validate_unlock_args() */ 5972 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) 5973 error = 0; 5974 if (error) 5975 goto out_put; 5976 5977 spin_lock_bh(&ua->proc->locks_spin); 5978 /* dlm_user_add_cb() may have already taken lkb off the proc list */ 5979 if (!list_empty(&lkb->lkb_ownqueue)) 5980 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 5981 spin_unlock_bh(&ua->proc->locks_spin); 5982 out_put: 5983 trace_dlm_unlock_end(ls, lkb, flags, error); 5984 dlm_put_lkb(lkb); 5985 out: 5986 dlm_unlock_recovery(ls); 5987 kfree(ua_tmp); 5988 return error; 5989 } 5990 5991 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5992 uint32_t flags, uint32_t lkid) 5993 { 5994 struct dlm_lkb *lkb; 5995 struct dlm_args args; 5996 struct dlm_user_args *ua; 5997 int error; 5998 5999 dlm_lock_recovery(ls); 6000 6001 error = find_lkb(ls, lkid, &lkb); 6002 if (error) 6003 goto out; 6004 6005 trace_dlm_unlock_start(ls, lkb, flags); 6006 6007 ua = lkb->lkb_ua; 
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);
	dlm_put_lkb(lkb);
out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}

int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_unlock_start(ls, lkb, flags);

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);

	error = _cancel_lock(r, lkb);
out_r:
	unlock_rsb(r);
	put_rsb(r);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);
	dlm_put_lkb(lkb);
out:
	dlm_unlock_recovery(ls);
	return error;
}

/* lkb's that are removed from the waiters list by revert are just left on the
   orphans list with the granted orphan locks, to be freed by purge */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	hold_lkb(lkb); /* reference for the ls_orphans list */
	spin_lock_bh(&ls->ls_orphans_lock);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	spin_unlock_bh(&ls->ls_orphans_lock);

	set_unlock_args(0, lkb->lkb_ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}

/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
   granted.  Regardless of what rsb queue the lock is on, it's removed and
   freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
   if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
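
/* unlock_proc_lock() is the cleanup path used by dlm_clear_proc_locks(),
   purge_proc_locks() and do_purge() below. */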

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
			lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_cb() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	spin_lock_bh(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
	else
		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
out:
	spin_unlock_bh(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at proc->asts
   list, and no more device_writes should add lkb's to proc->locks list; so we
   shouldn't need to take asts_spin or locks_spin here.  this assumes that
   device reads/writes/closes are serialized -- FIXME: we may need to serialize
   them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_callback *cb, *cb_safe;
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	spin_lock_bh(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
		list_del(&cb->list);
		dlm_free_cb(cb);
	}

	spin_unlock_bh(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_callback *cb, *cb_safe;
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock_bh(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock_bh(&proc->locks_spin);

		if (!lkb)
			break;

		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock_bh(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
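		/* in-progress unlocks, handled the same way as in
		   dlm_clear_proc_locks() above */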
		list_del_init(&lkb->lkb_ownqueue);
		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
		dlm_put_lkb(lkb);
	}
	spin_unlock_bh(&proc->locks_spin);

	spin_lock_bh(&proc->asts_spin);
	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
		list_del(&cb->list);
		dlm_free_cb(cb);
	}
	spin_unlock_bh(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	spin_lock_bh(&ls->ls_orphans_lock);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock_bh(&ls->ls_orphans_lock);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = cpu_to_le32(nodeid);
	ms->m_pid = cpu_to_le32(pid);

	return send_message(mh, ms, NULL, 0);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid && (nodeid != dlm_our_nodeid())) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}

/* debug functionality */
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
{
	struct dlm_lksb *lksb;
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	/* we currently can't set a valid user lock */
	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
		return -EOPNOTSUPP;

	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
	if (!lksb)
		return -ENOMEM;

	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
	if (error) {
		kfree(lksb);
		return error;
	}

	dlm_set_dflags_val(lkb, lkb_dflags);
	lkb->lkb_nodeid = lkb_nodeid;
	lkb->lkb_lksb = lksb;
	/* user specific pointer, just don't have it NULL for kernel locks */
	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
		lkb->lkb_astparam = (void *)0xDEADBEEF;

	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
	if (error) {
		kfree(lksb);
		__put_lkb(ls, lkb);
		return error;
	}

	lock_rsb(r);
	attach_lkb(r, lkb);
	add_lkb(r, lkb, lkb_status);
	unlock_rsb(r);
	put_rsb(r);

	return 0;
}

int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
				 int mstype, int to_nodeid)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, lkb_id, &lkb);
	if (error)
		return error;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	dlm_put_lkb(lkb);
	return error;
}
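
/*
 * Example (hypothetical values, for illustration only): fabricate an lkb on
 * resource "test" owned by node 2, add it to the resource's granted queue,
 * then make it look as if a convert message is outstanding to that node:
 *
 *	dlm_debug_add_lkb(ls, 0x100, "test", 4, 2, 0, DLM_LKSTS_GRANTED);
 *	dlm_debug_add_lkb_to_waiters(ls, 0x100, DLM_MSG_CONVERT, 2);
 */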