1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. 5 ** 6 ** This copyrighted material is made available to anyone wishing to use, 7 ** modify, copy, or redistribute it subject to the terms and conditions 8 ** of the GNU General Public License v.2. 9 ** 10 ******************************************************************************* 11 ******************************************************************************/ 12 13 /* Central locking logic has four stages: 14 15 dlm_lock() 16 dlm_unlock() 17 18 request_lock(ls, lkb) 19 convert_lock(ls, lkb) 20 unlock_lock(ls, lkb) 21 cancel_lock(ls, lkb) 22 23 _request_lock(r, lkb) 24 _convert_lock(r, lkb) 25 _unlock_lock(r, lkb) 26 _cancel_lock(r, lkb) 27 28 do_request(r, lkb) 29 do_convert(r, lkb) 30 do_unlock(r, lkb) 31 do_cancel(r, lkb) 32 33 Stage 1 (lock, unlock) is mainly about checking input args and 34 splitting into one of the four main operations: 35 36 dlm_lock = request_lock 37 dlm_lock+CONVERT = convert_lock 38 dlm_unlock = unlock_lock 39 dlm_unlock+CANCEL = cancel_lock 40 41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is 42 provided to the next stage. 43 44 Stage 3, _xxxx_lock(), determines if the operation is local or remote. 45 When remote, it calls send_xxxx(), when local it calls do_xxxx(). 46 47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the 48 given rsb and lkb and queues callbacks. 49 50 For remote operations, send_xxxx() results in the corresponding do_xxxx() 51 function being executed on the remote node. The connecting send/receive 52 calls on local (L) and remote (R) nodes: 53 54 L: send_xxxx() -> R: receive_xxxx() 55 R: do_xxxx() 56 L: receive_xxxx_reply() <- R: send_xxxx_reply() 57 */ 58 #include <linux/types.h> 59 #include <linux/rbtree.h> 60 #include <linux/slab.h> 61 #include "dlm_internal.h" 62 #include <linux/dlm_device.h> 63 #include "memory.h" 64 #include "lowcomms.h" 65 #include "requestqueue.h" 66 #include "util.h" 67 #include "dir.h" 68 #include "member.h" 69 #include "lockspace.h" 70 #include "ast.h" 71 #include "lock.h" 72 #include "rcom.h" 73 #include "recover.h" 74 #include "lvb_table.h" 75 #include "user.h" 76 #include "config.h" 77 78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb); 79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb); 80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb); 81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb); 82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb); 83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode); 84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb); 85 static int send_remove(struct dlm_rsb *r); 86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb); 88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 89 struct dlm_message *ms); 90 static int receive_extralen(struct dlm_message *ms); 91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid); 92 static void del_timeout(struct dlm_lkb *lkb); 93 static void toss_rsb(struct kref *kref); 94 95 /* 96 * Lock compatibilty matrix - thanks Steve 97 * UN = Unlocked state. Not really a state, used as a flag 98 * PD = Padding. Used to make the matrix a nice power of two in size 99 * Other states are the same as the VMS DLM. 100 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same) 101 */ 102 103 static const int __dlm_compat_matrix[8][8] = { 104 /* UN NL CR CW PR PW EX PD */ 105 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */ 106 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */ 107 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */ 108 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */ 109 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */ 110 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */ 111 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */ 112 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 113 }; 114 115 /* 116 * This defines the direction of transfer of LVB data. 117 * Granted mode is the row; requested mode is the column. 118 * Usage: matrix[grmode+1][rqmode+1] 119 * 1 = LVB is returned to the caller 120 * 0 = LVB is written to the resource 121 * -1 = nothing happens to the LVB 122 */ 123 124 const int dlm_lvb_operations[8][8] = { 125 /* UN NL CR CW PR PW EX PD*/ 126 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */ 127 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */ 128 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */ 129 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */ 130 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */ 131 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */ 132 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */ 133 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */ 134 }; 135 136 #define modes_compat(gr, rq) \ 137 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1] 138 139 int dlm_modes_compat(int mode1, int mode2) 140 { 141 return __dlm_compat_matrix[mode1 + 1][mode2 + 1]; 142 } 143 144 /* 145 * Compatibility matrix for conversions with QUECVT set. 146 * Granted mode is the row; requested mode is the column. 147 * Usage: matrix[grmode+1][rqmode+1] 148 */ 149 150 static const int __quecvt_compat_matrix[8][8] = { 151 /* UN NL CR CW PR PW EX PD */ 152 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */ 153 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */ 154 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */ 155 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */ 156 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */ 157 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */ 158 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */ 159 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */ 160 }; 161 162 void dlm_print_lkb(struct dlm_lkb *lkb) 163 { 164 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " 165 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n", 166 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, 167 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, 168 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid, 169 (unsigned long long)lkb->lkb_recover_seq); 170 } 171 172 static void dlm_print_rsb(struct dlm_rsb *r) 173 { 174 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x " 175 "rlc %d name %s\n", 176 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid, 177 r->res_flags, r->res_first_lkid, r->res_recover_locks_count, 178 r->res_name); 179 } 180 181 void dlm_dump_rsb(struct dlm_rsb *r) 182 { 183 struct dlm_lkb *lkb; 184 185 dlm_print_rsb(r); 186 187 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n", 188 list_empty(&r->res_root_list), list_empty(&r->res_recover_list)); 189 printk(KERN_ERR "rsb lookup list\n"); 190 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup) 191 dlm_print_lkb(lkb); 192 printk(KERN_ERR "rsb grant queue:\n"); 193 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) 194 dlm_print_lkb(lkb); 195 printk(KERN_ERR "rsb convert queue:\n"); 196 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) 197 dlm_print_lkb(lkb); 198 printk(KERN_ERR "rsb wait queue:\n"); 199 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue) 200 dlm_print_lkb(lkb); 201 } 202 203 /* Threads cannot use the lockspace while it's being recovered */ 204 205 static inline void dlm_lock_recovery(struct dlm_ls *ls) 206 { 207 down_read(&ls->ls_in_recovery); 208 } 209 210 void dlm_unlock_recovery(struct dlm_ls *ls) 211 { 212 up_read(&ls->ls_in_recovery); 213 } 214 215 int dlm_lock_recovery_try(struct dlm_ls *ls) 216 { 217 return down_read_trylock(&ls->ls_in_recovery); 218 } 219 220 static inline int can_be_queued(struct dlm_lkb *lkb) 221 { 222 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE); 223 } 224 225 static inline int force_blocking_asts(struct dlm_lkb *lkb) 226 { 227 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST); 228 } 229 230 static inline int is_demoted(struct dlm_lkb *lkb) 231 { 232 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED); 233 } 234 235 static inline int is_altmode(struct dlm_lkb *lkb) 236 { 237 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE); 238 } 239 240 static inline int is_granted(struct dlm_lkb *lkb) 241 { 242 return (lkb->lkb_status == DLM_LKSTS_GRANTED); 243 } 244 245 static inline int is_remote(struct dlm_rsb *r) 246 { 247 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r);); 248 return !!r->res_nodeid; 249 } 250 251 static inline int is_process_copy(struct dlm_lkb *lkb) 252 { 253 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY)); 254 } 255 256 static inline int is_master_copy(struct dlm_lkb *lkb) 257 { 258 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; 259 } 260 261 static inline int middle_conversion(struct dlm_lkb *lkb) 262 { 263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) || 264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW)) 265 return 1; 266 return 0; 267 } 268 269 static inline int down_conversion(struct dlm_lkb *lkb) 270 { 271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode); 272 } 273 274 static inline int is_overlap_unlock(struct dlm_lkb *lkb) 275 { 276 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK; 277 } 278 279 static inline int is_overlap_cancel(struct dlm_lkb *lkb) 280 { 281 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL; 282 } 283 284 static inline int is_overlap(struct dlm_lkb *lkb) 285 { 286 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK | 287 DLM_IFL_OVERLAP_CANCEL)); 288 } 289 290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 291 { 292 if (is_master_copy(lkb)) 293 return; 294 295 del_timeout(lkb); 296 297 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb);); 298 299 /* if the operation was a cancel, then return -DLM_ECANCEL, if a 300 timeout caused the cancel then return -ETIMEDOUT */ 301 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) { 302 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL; 303 rv = -ETIMEDOUT; 304 } 305 306 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) { 307 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL; 308 rv = -EDEADLK; 309 } 310 311 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags); 312 } 313 314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb) 315 { 316 queue_cast(r, lkb, 317 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL); 318 } 319 320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) 321 { 322 if (is_master_copy(lkb)) { 323 send_bast(r, lkb, rqmode); 324 } else { 325 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0); 326 } 327 } 328 329 /* 330 * Basic operations on rsb's and lkb's 331 */ 332 333 /* This is only called to add a reference when the code already holds 334 a valid reference to the rsb, so there's no need for locking. */ 335 336 static inline void hold_rsb(struct dlm_rsb *r) 337 { 338 kref_get(&r->res_ref); 339 } 340 341 void dlm_hold_rsb(struct dlm_rsb *r) 342 { 343 hold_rsb(r); 344 } 345 346 /* When all references to the rsb are gone it's transferred to 347 the tossed list for later disposal. */ 348 349 static void put_rsb(struct dlm_rsb *r) 350 { 351 struct dlm_ls *ls = r->res_ls; 352 uint32_t bucket = r->res_bucket; 353 354 spin_lock(&ls->ls_rsbtbl[bucket].lock); 355 kref_put(&r->res_ref, toss_rsb); 356 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 357 } 358 359 void dlm_put_rsb(struct dlm_rsb *r) 360 { 361 put_rsb(r); 362 } 363 364 static int pre_rsb_struct(struct dlm_ls *ls) 365 { 366 struct dlm_rsb *r1, *r2; 367 int count = 0; 368 369 spin_lock(&ls->ls_new_rsb_spin); 370 if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) { 371 spin_unlock(&ls->ls_new_rsb_spin); 372 return 0; 373 } 374 spin_unlock(&ls->ls_new_rsb_spin); 375 376 r1 = dlm_allocate_rsb(ls); 377 r2 = dlm_allocate_rsb(ls); 378 379 spin_lock(&ls->ls_new_rsb_spin); 380 if (r1) { 381 list_add(&r1->res_hashchain, &ls->ls_new_rsb); 382 ls->ls_new_rsb_count++; 383 } 384 if (r2) { 385 list_add(&r2->res_hashchain, &ls->ls_new_rsb); 386 ls->ls_new_rsb_count++; 387 } 388 count = ls->ls_new_rsb_count; 389 spin_unlock(&ls->ls_new_rsb_spin); 390 391 if (!count) 392 return -ENOMEM; 393 return 0; 394 } 395 396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can 397 unlock any spinlocks, go back and call pre_rsb_struct again. 398 Otherwise, take an rsb off the list and return it. */ 399 400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len, 401 struct dlm_rsb **r_ret) 402 { 403 struct dlm_rsb *r; 404 int count; 405 406 spin_lock(&ls->ls_new_rsb_spin); 407 if (list_empty(&ls->ls_new_rsb)) { 408 count = ls->ls_new_rsb_count; 409 spin_unlock(&ls->ls_new_rsb_spin); 410 log_debug(ls, "find_rsb retry %d %d %s", 411 count, dlm_config.ci_new_rsb_count, name); 412 return -EAGAIN; 413 } 414 415 r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); 416 list_del(&r->res_hashchain); 417 /* Convert the empty list_head to a NULL rb_node for tree usage: */ 418 memset(&r->res_hashnode, 0, sizeof(struct rb_node)); 419 ls->ls_new_rsb_count--; 420 spin_unlock(&ls->ls_new_rsb_spin); 421 422 r->res_ls = ls; 423 r->res_length = len; 424 memcpy(r->res_name, name, len); 425 mutex_init(&r->res_mutex); 426 427 INIT_LIST_HEAD(&r->res_lookup); 428 INIT_LIST_HEAD(&r->res_grantqueue); 429 INIT_LIST_HEAD(&r->res_convertqueue); 430 INIT_LIST_HEAD(&r->res_waitqueue); 431 INIT_LIST_HEAD(&r->res_root_list); 432 INIT_LIST_HEAD(&r->res_recover_list); 433 434 *r_ret = r; 435 return 0; 436 } 437 438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen) 439 { 440 char maxname[DLM_RESNAME_MAXLEN]; 441 442 memset(maxname, 0, DLM_RESNAME_MAXLEN); 443 memcpy(maxname, name, nlen); 444 return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN); 445 } 446 447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, 448 struct dlm_rsb **r_ret) 449 { 450 struct rb_node *node = tree->rb_node; 451 struct dlm_rsb *r; 452 int rc; 453 454 while (node) { 455 r = rb_entry(node, struct dlm_rsb, res_hashnode); 456 rc = rsb_cmp(r, name, len); 457 if (rc < 0) 458 node = node->rb_left; 459 else if (rc > 0) 460 node = node->rb_right; 461 else 462 goto found; 463 } 464 *r_ret = NULL; 465 return -EBADR; 466 467 found: 468 *r_ret = r; 469 return 0; 470 } 471 472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree) 473 { 474 struct rb_node **newn = &tree->rb_node; 475 struct rb_node *parent = NULL; 476 int rc; 477 478 while (*newn) { 479 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb, 480 res_hashnode); 481 482 parent = *newn; 483 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length); 484 if (rc < 0) 485 newn = &parent->rb_left; 486 else if (rc > 0) 487 newn = &parent->rb_right; 488 else { 489 log_print("rsb_insert match"); 490 dlm_dump_rsb(rsb); 491 dlm_dump_rsb(cur); 492 return -EEXIST; 493 } 494 } 495 496 rb_link_node(&rsb->res_hashnode, parent, newn); 497 rb_insert_color(&rsb->res_hashnode, tree); 498 return 0; 499 } 500 501 /* 502 * Find rsb in rsbtbl and potentially create/add one 503 * 504 * Delaying the release of rsb's has a similar benefit to applications keeping 505 * NL locks on an rsb, but without the guarantee that the cached master value 506 * will still be valid when the rsb is reused. Apps aren't always smart enough 507 * to keep NL locks on an rsb that they may lock again shortly; this can lead 508 * to excessive master lookups and removals if we don't delay the release. 509 * 510 * Searching for an rsb means looking through both the normal list and toss 511 * list. When found on the toss list the rsb is moved to the normal list with 512 * ref count of 1; when found on normal list the ref count is incremented. 513 * 514 * rsb's on the keep list are being used locally and refcounted. 515 * rsb's on the toss list are not being used locally, and are not refcounted. 516 * 517 * The toss list rsb's were either 518 * - previously used locally but not any more (were on keep list, then 519 * moved to toss list when last refcount dropped) 520 * - created and put on toss list as a directory record for a lookup 521 * (we are the dir node for the res, but are not using the res right now, 522 * but some other node is) 523 * 524 * The purpose of find_rsb() is to return a refcounted rsb for local use. 525 * So, if the given rsb is on the toss list, it is moved to the keep list 526 * before being returned. 527 * 528 * toss_rsb() happens when all local usage of the rsb is done, i.e. no 529 * more refcounts exist, so the rsb is moved from the keep list to the 530 * toss list. 531 * 532 * rsb's on both keep and toss lists are used for doing a name to master 533 * lookups. rsb's that are in use locally (and being refcounted) are on 534 * the keep list, rsb's that are not in use locally (not refcounted) and 535 * only exist for name/master lookups are on the toss list. 536 * 537 * rsb's on the toss list who's dir_nodeid is not local can have stale 538 * name/master mappings. So, remote requests on such rsb's can potentially 539 * return with an error, which means the mapping is stale and needs to 540 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and 541 * first_lkid is to keep only a single outstanding request on an rsb 542 * while that rsb has a potentially stale master.) 543 */ 544 545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len, 546 uint32_t hash, uint32_t b, 547 int dir_nodeid, int from_nodeid, 548 unsigned int flags, struct dlm_rsb **r_ret) 549 { 550 struct dlm_rsb *r = NULL; 551 int our_nodeid = dlm_our_nodeid(); 552 int from_local = 0; 553 int from_other = 0; 554 int from_dir = 0; 555 int create = 0; 556 int error; 557 558 if (flags & R_RECEIVE_REQUEST) { 559 if (from_nodeid == dir_nodeid) 560 from_dir = 1; 561 else 562 from_other = 1; 563 } else if (flags & R_REQUEST) { 564 from_local = 1; 565 } 566 567 /* 568 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so 569 * from_nodeid has sent us a lock in dlm_recover_locks, believing 570 * we're the new master. Our local recovery may not have set 571 * res_master_nodeid to our_nodeid yet, so allow either. Don't 572 * create the rsb; dlm_recover_process_copy() will handle EBADR 573 * by resending. 574 * 575 * If someone sends us a request, we are the dir node, and we do 576 * not find the rsb anywhere, then recreate it. This happens if 577 * someone sends us a request after we have removed/freed an rsb 578 * from our toss list. (They sent a request instead of lookup 579 * because they are using an rsb from their toss list.) 580 */ 581 582 if (from_local || from_dir || 583 (from_other && (dir_nodeid == our_nodeid))) { 584 create = 1; 585 } 586 587 retry: 588 if (create) { 589 error = pre_rsb_struct(ls); 590 if (error < 0) 591 goto out; 592 } 593 594 spin_lock(&ls->ls_rsbtbl[b].lock); 595 596 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 597 if (error) 598 goto do_toss; 599 600 /* 601 * rsb is active, so we can't check master_nodeid without lock_rsb. 602 */ 603 604 kref_get(&r->res_ref); 605 error = 0; 606 goto out_unlock; 607 608 609 do_toss: 610 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 611 if (error) 612 goto do_new; 613 614 /* 615 * rsb found inactive (master_nodeid may be out of date unless 616 * we are the dir_nodeid or were the master) No other thread 617 * is using this rsb because it's on the toss list, so we can 618 * look at or update res_master_nodeid without lock_rsb. 619 */ 620 621 if ((r->res_master_nodeid != our_nodeid) && from_other) { 622 /* our rsb was not master, and another node (not the dir node) 623 has sent us a request */ 624 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s", 625 from_nodeid, r->res_master_nodeid, dir_nodeid, 626 r->res_name); 627 error = -ENOTBLK; 628 goto out_unlock; 629 } 630 631 if ((r->res_master_nodeid != our_nodeid) && from_dir) { 632 /* don't think this should ever happen */ 633 log_error(ls, "find_rsb toss from_dir %d master %d", 634 from_nodeid, r->res_master_nodeid); 635 dlm_print_rsb(r); 636 /* fix it and go on */ 637 r->res_master_nodeid = our_nodeid; 638 r->res_nodeid = 0; 639 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 640 r->res_first_lkid = 0; 641 } 642 643 if (from_local && (r->res_master_nodeid != our_nodeid)) { 644 /* Because we have held no locks on this rsb, 645 res_master_nodeid could have become stale. */ 646 rsb_set_flag(r, RSB_MASTER_UNCERTAIN); 647 r->res_first_lkid = 0; 648 } 649 650 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 651 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 652 goto out_unlock; 653 654 655 do_new: 656 /* 657 * rsb not found 658 */ 659 660 if (error == -EBADR && !create) 661 goto out_unlock; 662 663 error = get_rsb_struct(ls, name, len, &r); 664 if (error == -EAGAIN) { 665 spin_unlock(&ls->ls_rsbtbl[b].lock); 666 goto retry; 667 } 668 if (error) 669 goto out_unlock; 670 671 r->res_hash = hash; 672 r->res_bucket = b; 673 r->res_dir_nodeid = dir_nodeid; 674 kref_init(&r->res_ref); 675 676 if (from_dir) { 677 /* want to see how often this happens */ 678 log_debug(ls, "find_rsb new from_dir %d recreate %s", 679 from_nodeid, r->res_name); 680 r->res_master_nodeid = our_nodeid; 681 r->res_nodeid = 0; 682 goto out_add; 683 } 684 685 if (from_other && (dir_nodeid != our_nodeid)) { 686 /* should never happen */ 687 log_error(ls, "find_rsb new from_other %d dir %d our %d %s", 688 from_nodeid, dir_nodeid, our_nodeid, r->res_name); 689 dlm_free_rsb(r); 690 error = -ENOTBLK; 691 goto out_unlock; 692 } 693 694 if (from_other) { 695 log_debug(ls, "find_rsb new from_other %d dir %d %s", 696 from_nodeid, dir_nodeid, r->res_name); 697 } 698 699 if (dir_nodeid == our_nodeid) { 700 /* When we are the dir nodeid, we can set the master 701 node immediately */ 702 r->res_master_nodeid = our_nodeid; 703 r->res_nodeid = 0; 704 } else { 705 /* set_master will send_lookup to dir_nodeid */ 706 r->res_master_nodeid = 0; 707 r->res_nodeid = -1; 708 } 709 710 out_add: 711 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 712 out_unlock: 713 spin_unlock(&ls->ls_rsbtbl[b].lock); 714 out: 715 *r_ret = r; 716 return error; 717 } 718 719 /* During recovery, other nodes can send us new MSTCPY locks (from 720 dlm_recover_locks) before we've made ourself master (in 721 dlm_recover_masters). */ 722 723 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len, 724 uint32_t hash, uint32_t b, 725 int dir_nodeid, int from_nodeid, 726 unsigned int flags, struct dlm_rsb **r_ret) 727 { 728 struct dlm_rsb *r = NULL; 729 int our_nodeid = dlm_our_nodeid(); 730 int recover = (flags & R_RECEIVE_RECOVER); 731 int error; 732 733 retry: 734 error = pre_rsb_struct(ls); 735 if (error < 0) 736 goto out; 737 738 spin_lock(&ls->ls_rsbtbl[b].lock); 739 740 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 741 if (error) 742 goto do_toss; 743 744 /* 745 * rsb is active, so we can't check master_nodeid without lock_rsb. 746 */ 747 748 kref_get(&r->res_ref); 749 goto out_unlock; 750 751 752 do_toss: 753 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 754 if (error) 755 goto do_new; 756 757 /* 758 * rsb found inactive. No other thread is using this rsb because 759 * it's on the toss list, so we can look at or update 760 * res_master_nodeid without lock_rsb. 761 */ 762 763 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { 764 /* our rsb is not master, and another node has sent us a 765 request; this should never happen */ 766 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d", 767 from_nodeid, r->res_master_nodeid, dir_nodeid); 768 dlm_print_rsb(r); 769 error = -ENOTBLK; 770 goto out_unlock; 771 } 772 773 if (!recover && (r->res_master_nodeid != our_nodeid) && 774 (dir_nodeid == our_nodeid)) { 775 /* our rsb is not master, and we are dir; may as well fix it; 776 this should never happen */ 777 log_error(ls, "find_rsb toss our %d master %d dir %d", 778 our_nodeid, r->res_master_nodeid, dir_nodeid); 779 dlm_print_rsb(r); 780 r->res_master_nodeid = our_nodeid; 781 r->res_nodeid = 0; 782 } 783 784 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 785 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 786 goto out_unlock; 787 788 789 do_new: 790 /* 791 * rsb not found 792 */ 793 794 error = get_rsb_struct(ls, name, len, &r); 795 if (error == -EAGAIN) { 796 spin_unlock(&ls->ls_rsbtbl[b].lock); 797 goto retry; 798 } 799 if (error) 800 goto out_unlock; 801 802 r->res_hash = hash; 803 r->res_bucket = b; 804 r->res_dir_nodeid = dir_nodeid; 805 r->res_master_nodeid = dir_nodeid; 806 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid; 807 kref_init(&r->res_ref); 808 809 error = rsb_insert(r, &ls->ls_rsbtbl[b].keep); 810 out_unlock: 811 spin_unlock(&ls->ls_rsbtbl[b].lock); 812 out: 813 *r_ret = r; 814 return error; 815 } 816 817 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid, 818 unsigned int flags, struct dlm_rsb **r_ret) 819 { 820 uint32_t hash, b; 821 int dir_nodeid; 822 823 if (len > DLM_RESNAME_MAXLEN) 824 return -EINVAL; 825 826 hash = jhash(name, len, 0); 827 b = hash & (ls->ls_rsbtbl_size - 1); 828 829 dir_nodeid = dlm_hash2nodeid(ls, hash); 830 831 if (dlm_no_directory(ls)) 832 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid, 833 from_nodeid, flags, r_ret); 834 else 835 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid, 836 from_nodeid, flags, r_ret); 837 } 838 839 /* we have received a request and found that res_master_nodeid != our_nodeid, 840 so we need to return an error or make ourself the master */ 841 842 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, 843 int from_nodeid) 844 { 845 if (dlm_no_directory(ls)) { 846 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d", 847 from_nodeid, r->res_master_nodeid, 848 r->res_dir_nodeid); 849 dlm_print_rsb(r); 850 return -ENOTBLK; 851 } 852 853 if (from_nodeid != r->res_dir_nodeid) { 854 /* our rsb is not master, and another node (not the dir node) 855 has sent us a request. this is much more common when our 856 master_nodeid is zero, so limit debug to non-zero. */ 857 858 if (r->res_master_nodeid) { 859 log_debug(ls, "validate master from_other %d master %d " 860 "dir %d first %x %s", from_nodeid, 861 r->res_master_nodeid, r->res_dir_nodeid, 862 r->res_first_lkid, r->res_name); 863 } 864 return -ENOTBLK; 865 } else { 866 /* our rsb is not master, but the dir nodeid has sent us a 867 request; this could happen with master 0 / res_nodeid -1 */ 868 869 if (r->res_master_nodeid) { 870 log_error(ls, "validate master from_dir %d master %d " 871 "first %x %s", 872 from_nodeid, r->res_master_nodeid, 873 r->res_first_lkid, r->res_name); 874 } 875 876 r->res_master_nodeid = dlm_our_nodeid(); 877 r->res_nodeid = 0; 878 return 0; 879 } 880 } 881 882 /* 883 * We're the dir node for this res and another node wants to know the 884 * master nodeid. During normal operation (non recovery) this is only 885 * called from receive_lookup(); master lookups when the local node is 886 * the dir node are done by find_rsb(). 887 * 888 * normal operation, we are the dir node for a resource 889 * . _request_lock 890 * . set_master 891 * . send_lookup 892 * . receive_lookup 893 * . dlm_master_lookup flags 0 894 * 895 * recover directory, we are rebuilding dir for all resources 896 * . dlm_recover_directory 897 * . dlm_rcom_names 898 * remote node sends back the rsb names it is master of and we are dir of 899 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) 900 * we either create new rsb setting remote node as master, or find existing 901 * rsb and set master to be the remote node. 902 * 903 * recover masters, we are finding the new master for resources 904 * . dlm_recover_masters 905 * . recover_master 906 * . dlm_send_rcom_lookup 907 * . receive_rcom_lookup 908 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0) 909 */ 910 911 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len, 912 unsigned int flags, int *r_nodeid, int *result) 913 { 914 struct dlm_rsb *r = NULL; 915 uint32_t hash, b; 916 int from_master = (flags & DLM_LU_RECOVER_DIR); 917 int fix_master = (flags & DLM_LU_RECOVER_MASTER); 918 int our_nodeid = dlm_our_nodeid(); 919 int dir_nodeid, error, toss_list = 0; 920 921 if (len > DLM_RESNAME_MAXLEN) 922 return -EINVAL; 923 924 if (from_nodeid == our_nodeid) { 925 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x", 926 our_nodeid, flags); 927 return -EINVAL; 928 } 929 930 hash = jhash(name, len, 0); 931 b = hash & (ls->ls_rsbtbl_size - 1); 932 933 dir_nodeid = dlm_hash2nodeid(ls, hash); 934 if (dir_nodeid != our_nodeid) { 935 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d", 936 from_nodeid, dir_nodeid, our_nodeid, hash, 937 ls->ls_num_nodes); 938 *r_nodeid = -1; 939 return -EINVAL; 940 } 941 942 retry: 943 error = pre_rsb_struct(ls); 944 if (error < 0) 945 return error; 946 947 spin_lock(&ls->ls_rsbtbl[b].lock); 948 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 949 if (!error) { 950 /* because the rsb is active, we need to lock_rsb before 951 checking/changing re_master_nodeid */ 952 953 hold_rsb(r); 954 spin_unlock(&ls->ls_rsbtbl[b].lock); 955 lock_rsb(r); 956 goto found; 957 } 958 959 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 960 if (error) 961 goto not_found; 962 963 /* because the rsb is inactive (on toss list), it's not refcounted 964 and lock_rsb is not used, but is protected by the rsbtbl lock */ 965 966 toss_list = 1; 967 found: 968 if (r->res_dir_nodeid != our_nodeid) { 969 /* should not happen, but may as well fix it and carry on */ 970 log_error(ls, "dlm_master_lookup res_dir %d our %d %s", 971 r->res_dir_nodeid, our_nodeid, r->res_name); 972 r->res_dir_nodeid = our_nodeid; 973 } 974 975 if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) { 976 /* Recovery uses this function to set a new master when 977 the previous master failed. Setting NEW_MASTER will 978 force dlm_recover_masters to call recover_master on this 979 rsb even though the res_nodeid is no longer removed. */ 980 981 r->res_master_nodeid = from_nodeid; 982 r->res_nodeid = from_nodeid; 983 rsb_set_flag(r, RSB_NEW_MASTER); 984 985 if (toss_list) { 986 /* I don't think we should ever find it on toss list. */ 987 log_error(ls, "dlm_master_lookup fix_master on toss"); 988 dlm_dump_rsb(r); 989 } 990 } 991 992 if (from_master && (r->res_master_nodeid != from_nodeid)) { 993 /* this will happen if from_nodeid became master during 994 a previous recovery cycle, and we aborted the previous 995 cycle before recovering this master value */ 996 997 log_limit(ls, "dlm_master_lookup from_master %d " 998 "master_nodeid %d res_nodeid %d first %x %s", 999 from_nodeid, r->res_master_nodeid, r->res_nodeid, 1000 r->res_first_lkid, r->res_name); 1001 1002 if (r->res_master_nodeid == our_nodeid) { 1003 log_error(ls, "from_master %d our_master", from_nodeid); 1004 dlm_dump_rsb(r); 1005 dlm_send_rcom_lookup_dump(r, from_nodeid); 1006 goto out_found; 1007 } 1008 1009 r->res_master_nodeid = from_nodeid; 1010 r->res_nodeid = from_nodeid; 1011 rsb_set_flag(r, RSB_NEW_MASTER); 1012 } 1013 1014 if (!r->res_master_nodeid) { 1015 /* this will happen if recovery happens while we're looking 1016 up the master for this rsb */ 1017 1018 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s", 1019 from_nodeid, r->res_first_lkid, r->res_name); 1020 r->res_master_nodeid = from_nodeid; 1021 r->res_nodeid = from_nodeid; 1022 } 1023 1024 if (!from_master && !fix_master && 1025 (r->res_master_nodeid == from_nodeid)) { 1026 /* this can happen when the master sends remove, the dir node 1027 finds the rsb on the keep list and ignores the remove, 1028 and the former master sends a lookup */ 1029 1030 log_limit(ls, "dlm_master_lookup from master %d flags %x " 1031 "first %x %s", from_nodeid, flags, 1032 r->res_first_lkid, r->res_name); 1033 } 1034 1035 out_found: 1036 *r_nodeid = r->res_master_nodeid; 1037 if (result) 1038 *result = DLM_LU_MATCH; 1039 1040 if (toss_list) { 1041 r->res_toss_time = jiffies; 1042 /* the rsb was inactive (on toss list) */ 1043 spin_unlock(&ls->ls_rsbtbl[b].lock); 1044 } else { 1045 /* the rsb was active */ 1046 unlock_rsb(r); 1047 put_rsb(r); 1048 } 1049 return 0; 1050 1051 not_found: 1052 error = get_rsb_struct(ls, name, len, &r); 1053 if (error == -EAGAIN) { 1054 spin_unlock(&ls->ls_rsbtbl[b].lock); 1055 goto retry; 1056 } 1057 if (error) 1058 goto out_unlock; 1059 1060 r->res_hash = hash; 1061 r->res_bucket = b; 1062 r->res_dir_nodeid = our_nodeid; 1063 r->res_master_nodeid = from_nodeid; 1064 r->res_nodeid = from_nodeid; 1065 kref_init(&r->res_ref); 1066 r->res_toss_time = jiffies; 1067 1068 error = rsb_insert(r, &ls->ls_rsbtbl[b].toss); 1069 if (error) { 1070 /* should never happen */ 1071 dlm_free_rsb(r); 1072 spin_unlock(&ls->ls_rsbtbl[b].lock); 1073 goto retry; 1074 } 1075 1076 if (result) 1077 *result = DLM_LU_ADD; 1078 *r_nodeid = from_nodeid; 1079 error = 0; 1080 out_unlock: 1081 spin_unlock(&ls->ls_rsbtbl[b].lock); 1082 return error; 1083 } 1084 1085 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash) 1086 { 1087 struct rb_node *n; 1088 struct dlm_rsb *r; 1089 int i; 1090 1091 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1092 spin_lock(&ls->ls_rsbtbl[i].lock); 1093 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) { 1094 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1095 if (r->res_hash == hash) 1096 dlm_dump_rsb(r); 1097 } 1098 spin_unlock(&ls->ls_rsbtbl[i].lock); 1099 } 1100 } 1101 1102 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len) 1103 { 1104 struct dlm_rsb *r = NULL; 1105 uint32_t hash, b; 1106 int error; 1107 1108 hash = jhash(name, len, 0); 1109 b = hash & (ls->ls_rsbtbl_size - 1); 1110 1111 spin_lock(&ls->ls_rsbtbl[b].lock); 1112 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 1113 if (!error) 1114 goto out_dump; 1115 1116 error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1117 if (error) 1118 goto out; 1119 out_dump: 1120 dlm_dump_rsb(r); 1121 out: 1122 spin_unlock(&ls->ls_rsbtbl[b].lock); 1123 } 1124 1125 static void toss_rsb(struct kref *kref) 1126 { 1127 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1128 struct dlm_ls *ls = r->res_ls; 1129 1130 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); 1131 kref_init(&r->res_ref); 1132 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep); 1133 rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss); 1134 r->res_toss_time = jiffies; 1135 ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK; 1136 if (r->res_lvbptr) { 1137 dlm_free_lvb(r->res_lvbptr); 1138 r->res_lvbptr = NULL; 1139 } 1140 } 1141 1142 /* See comment for unhold_lkb */ 1143 1144 static void unhold_rsb(struct dlm_rsb *r) 1145 { 1146 int rv; 1147 rv = kref_put(&r->res_ref, toss_rsb); 1148 DLM_ASSERT(!rv, dlm_dump_rsb(r);); 1149 } 1150 1151 static void kill_rsb(struct kref *kref) 1152 { 1153 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref); 1154 1155 /* All work is done after the return from kref_put() so we 1156 can release the write_lock before the remove and free. */ 1157 1158 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); 1159 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); 1160 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); 1161 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); 1162 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); 1163 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); 1164 } 1165 1166 /* Attaching/detaching lkb's from rsb's is for rsb reference counting. 1167 The rsb must exist as long as any lkb's for it do. */ 1168 1169 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1170 { 1171 hold_rsb(r); 1172 lkb->lkb_resource = r; 1173 } 1174 1175 static void detach_lkb(struct dlm_lkb *lkb) 1176 { 1177 if (lkb->lkb_resource) { 1178 put_rsb(lkb->lkb_resource); 1179 lkb->lkb_resource = NULL; 1180 } 1181 } 1182 1183 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 1184 { 1185 struct dlm_lkb *lkb; 1186 int rv, id; 1187 1188 lkb = dlm_allocate_lkb(ls); 1189 if (!lkb) 1190 return -ENOMEM; 1191 1192 lkb->lkb_nodeid = -1; 1193 lkb->lkb_grmode = DLM_LOCK_IV; 1194 kref_init(&lkb->lkb_ref); 1195 INIT_LIST_HEAD(&lkb->lkb_ownqueue); 1196 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup); 1197 INIT_LIST_HEAD(&lkb->lkb_time_list); 1198 INIT_LIST_HEAD(&lkb->lkb_cb_list); 1199 mutex_init(&lkb->lkb_cb_mutex); 1200 INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work); 1201 1202 retry: 1203 rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS); 1204 if (!rv) 1205 return -ENOMEM; 1206 1207 spin_lock(&ls->ls_lkbidr_spin); 1208 rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id); 1209 if (!rv) 1210 lkb->lkb_id = id; 1211 spin_unlock(&ls->ls_lkbidr_spin); 1212 1213 if (rv == -EAGAIN) 1214 goto retry; 1215 1216 if (rv < 0) { 1217 log_error(ls, "create_lkb idr error %d", rv); 1218 return rv; 1219 } 1220 1221 *lkb_ret = lkb; 1222 return 0; 1223 } 1224 1225 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret) 1226 { 1227 struct dlm_lkb *lkb; 1228 1229 spin_lock(&ls->ls_lkbidr_spin); 1230 lkb = idr_find(&ls->ls_lkbidr, lkid); 1231 if (lkb) 1232 kref_get(&lkb->lkb_ref); 1233 spin_unlock(&ls->ls_lkbidr_spin); 1234 1235 *lkb_ret = lkb; 1236 return lkb ? 0 : -ENOENT; 1237 } 1238 1239 static void kill_lkb(struct kref *kref) 1240 { 1241 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref); 1242 1243 /* All work is done after the return from kref_put() so we 1244 can release the write_lock before the detach_lkb */ 1245 1246 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1247 } 1248 1249 /* __put_lkb() is used when an lkb may not have an rsb attached to 1250 it so we need to provide the lockspace explicitly */ 1251 1252 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb) 1253 { 1254 uint32_t lkid = lkb->lkb_id; 1255 1256 spin_lock(&ls->ls_lkbidr_spin); 1257 if (kref_put(&lkb->lkb_ref, kill_lkb)) { 1258 idr_remove(&ls->ls_lkbidr, lkid); 1259 spin_unlock(&ls->ls_lkbidr_spin); 1260 1261 detach_lkb(lkb); 1262 1263 /* for local/process lkbs, lvbptr points to caller's lksb */ 1264 if (lkb->lkb_lvbptr && is_master_copy(lkb)) 1265 dlm_free_lvb(lkb->lkb_lvbptr); 1266 dlm_free_lkb(lkb); 1267 return 1; 1268 } else { 1269 spin_unlock(&ls->ls_lkbidr_spin); 1270 return 0; 1271 } 1272 } 1273 1274 int dlm_put_lkb(struct dlm_lkb *lkb) 1275 { 1276 struct dlm_ls *ls; 1277 1278 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb);); 1279 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb);); 1280 1281 ls = lkb->lkb_resource->res_ls; 1282 return __put_lkb(ls, lkb); 1283 } 1284 1285 /* This is only called to add a reference when the code already holds 1286 a valid reference to the lkb, so there's no need for locking. */ 1287 1288 static inline void hold_lkb(struct dlm_lkb *lkb) 1289 { 1290 kref_get(&lkb->lkb_ref); 1291 } 1292 1293 /* This is called when we need to remove a reference and are certain 1294 it's not the last ref. e.g. del_lkb is always called between a 1295 find_lkb/put_lkb and is always the inverse of a previous add_lkb. 1296 put_lkb would work fine, but would involve unnecessary locking */ 1297 1298 static inline void unhold_lkb(struct dlm_lkb *lkb) 1299 { 1300 int rv; 1301 rv = kref_put(&lkb->lkb_ref, kill_lkb); 1302 DLM_ASSERT(!rv, dlm_print_lkb(lkb);); 1303 } 1304 1305 static void lkb_add_ordered(struct list_head *new, struct list_head *head, 1306 int mode) 1307 { 1308 struct dlm_lkb *lkb = NULL; 1309 1310 list_for_each_entry(lkb, head, lkb_statequeue) 1311 if (lkb->lkb_rqmode < mode) 1312 break; 1313 1314 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue); 1315 } 1316 1317 /* add/remove lkb to rsb's grant/convert/wait queue */ 1318 1319 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status) 1320 { 1321 kref_get(&lkb->lkb_ref); 1322 1323 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb);); 1324 1325 lkb->lkb_timestamp = ktime_get(); 1326 1327 lkb->lkb_status = status; 1328 1329 switch (status) { 1330 case DLM_LKSTS_WAITING: 1331 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1332 list_add(&lkb->lkb_statequeue, &r->res_waitqueue); 1333 else 1334 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue); 1335 break; 1336 case DLM_LKSTS_GRANTED: 1337 /* convention says granted locks kept in order of grmode */ 1338 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue, 1339 lkb->lkb_grmode); 1340 break; 1341 case DLM_LKSTS_CONVERT: 1342 if (lkb->lkb_exflags & DLM_LKF_HEADQUE) 1343 list_add(&lkb->lkb_statequeue, &r->res_convertqueue); 1344 else 1345 list_add_tail(&lkb->lkb_statequeue, 1346 &r->res_convertqueue); 1347 break; 1348 default: 1349 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status);); 1350 } 1351 } 1352 1353 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb) 1354 { 1355 lkb->lkb_status = 0; 1356 list_del(&lkb->lkb_statequeue); 1357 unhold_lkb(lkb); 1358 } 1359 1360 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts) 1361 { 1362 hold_lkb(lkb); 1363 del_lkb(r, lkb); 1364 add_lkb(r, lkb, sts); 1365 unhold_lkb(lkb); 1366 } 1367 1368 static int msg_reply_type(int mstype) 1369 { 1370 switch (mstype) { 1371 case DLM_MSG_REQUEST: 1372 return DLM_MSG_REQUEST_REPLY; 1373 case DLM_MSG_CONVERT: 1374 return DLM_MSG_CONVERT_REPLY; 1375 case DLM_MSG_UNLOCK: 1376 return DLM_MSG_UNLOCK_REPLY; 1377 case DLM_MSG_CANCEL: 1378 return DLM_MSG_CANCEL_REPLY; 1379 case DLM_MSG_LOOKUP: 1380 return DLM_MSG_LOOKUP_REPLY; 1381 } 1382 return -1; 1383 } 1384 1385 static int nodeid_warned(int nodeid, int num_nodes, int *warned) 1386 { 1387 int i; 1388 1389 for (i = 0; i < num_nodes; i++) { 1390 if (!warned[i]) { 1391 warned[i] = nodeid; 1392 return 0; 1393 } 1394 if (warned[i] == nodeid) 1395 return 1; 1396 } 1397 return 0; 1398 } 1399 1400 void dlm_scan_waiters(struct dlm_ls *ls) 1401 { 1402 struct dlm_lkb *lkb; 1403 ktime_t zero = ktime_set(0, 0); 1404 s64 us; 1405 s64 debug_maxus = 0; 1406 u32 debug_scanned = 0; 1407 u32 debug_expired = 0; 1408 int num_nodes = 0; 1409 int *warned = NULL; 1410 1411 if (!dlm_config.ci_waitwarn_us) 1412 return; 1413 1414 mutex_lock(&ls->ls_waiters_mutex); 1415 1416 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 1417 if (ktime_equal(lkb->lkb_wait_time, zero)) 1418 continue; 1419 1420 debug_scanned++; 1421 1422 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time)); 1423 1424 if (us < dlm_config.ci_waitwarn_us) 1425 continue; 1426 1427 lkb->lkb_wait_time = zero; 1428 1429 debug_expired++; 1430 if (us > debug_maxus) 1431 debug_maxus = us; 1432 1433 if (!num_nodes) { 1434 num_nodes = ls->ls_num_nodes; 1435 warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL); 1436 } 1437 if (!warned) 1438 continue; 1439 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned)) 1440 continue; 1441 1442 log_error(ls, "waitwarn %x %lld %d us check connection to " 1443 "node %d", lkb->lkb_id, (long long)us, 1444 dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid); 1445 } 1446 mutex_unlock(&ls->ls_waiters_mutex); 1447 kfree(warned); 1448 1449 if (debug_expired) 1450 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us", 1451 debug_scanned, debug_expired, 1452 dlm_config.ci_waitwarn_us, (long long)debug_maxus); 1453 } 1454 1455 /* add/remove lkb from global waiters list of lkb's waiting for 1456 a reply from a remote node */ 1457 1458 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid) 1459 { 1460 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1461 int error = 0; 1462 1463 mutex_lock(&ls->ls_waiters_mutex); 1464 1465 if (is_overlap_unlock(lkb) || 1466 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) { 1467 error = -EINVAL; 1468 goto out; 1469 } 1470 1471 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { 1472 switch (mstype) { 1473 case DLM_MSG_UNLOCK: 1474 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 1475 break; 1476 case DLM_MSG_CANCEL: 1477 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 1478 break; 1479 default: 1480 error = -EBUSY; 1481 goto out; 1482 } 1483 lkb->lkb_wait_count++; 1484 hold_lkb(lkb); 1485 1486 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x", 1487 lkb->lkb_id, lkb->lkb_wait_type, mstype, 1488 lkb->lkb_wait_count, lkb->lkb_flags); 1489 goto out; 1490 } 1491 1492 DLM_ASSERT(!lkb->lkb_wait_count, 1493 dlm_print_lkb(lkb); 1494 printk("wait_count %d\n", lkb->lkb_wait_count);); 1495 1496 lkb->lkb_wait_count++; 1497 lkb->lkb_wait_type = mstype; 1498 lkb->lkb_wait_time = ktime_get(); 1499 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */ 1500 hold_lkb(lkb); 1501 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters); 1502 out: 1503 if (error) 1504 log_error(ls, "addwait error %x %d flags %x %d %d %s", 1505 lkb->lkb_id, error, lkb->lkb_flags, mstype, 1506 lkb->lkb_wait_type, lkb->lkb_resource->res_name); 1507 mutex_unlock(&ls->ls_waiters_mutex); 1508 return error; 1509 } 1510 1511 /* We clear the RESEND flag because we might be taking an lkb off the waiters 1512 list as part of process_requestqueue (e.g. a lookup that has an optimized 1513 request reply on the requestqueue) between dlm_recover_waiters_pre() which 1514 set RESEND and dlm_recover_waiters_post() */ 1515 1516 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype, 1517 struct dlm_message *ms) 1518 { 1519 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1520 int overlap_done = 0; 1521 1522 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) { 1523 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id); 1524 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 1525 overlap_done = 1; 1526 goto out_del; 1527 } 1528 1529 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) { 1530 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id); 1531 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 1532 overlap_done = 1; 1533 goto out_del; 1534 } 1535 1536 /* Cancel state was preemptively cleared by a successful convert, 1537 see next comment, nothing to do. */ 1538 1539 if ((mstype == DLM_MSG_CANCEL_REPLY) && 1540 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) { 1541 log_debug(ls, "remwait %x cancel_reply wait_type %d", 1542 lkb->lkb_id, lkb->lkb_wait_type); 1543 return -1; 1544 } 1545 1546 /* Remove for the convert reply, and premptively remove for the 1547 cancel reply. A convert has been granted while there's still 1548 an outstanding cancel on it (the cancel is moot and the result 1549 in the cancel reply should be 0). We preempt the cancel reply 1550 because the app gets the convert result and then can follow up 1551 with another op, like convert. This subsequent op would see the 1552 lingering state of the cancel and fail with -EBUSY. */ 1553 1554 if ((mstype == DLM_MSG_CONVERT_REPLY) && 1555 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && 1556 is_overlap_cancel(lkb) && ms && !ms->m_result) { 1557 log_debug(ls, "remwait %x convert_reply zap overlap_cancel", 1558 lkb->lkb_id); 1559 lkb->lkb_wait_type = 0; 1560 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 1561 lkb->lkb_wait_count--; 1562 goto out_del; 1563 } 1564 1565 /* N.B. type of reply may not always correspond to type of original 1566 msg due to lookup->request optimization, verify others? */ 1567 1568 if (lkb->lkb_wait_type) { 1569 lkb->lkb_wait_type = 0; 1570 goto out_del; 1571 } 1572 1573 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait", 1574 lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid, 1575 mstype, lkb->lkb_flags); 1576 return -1; 1577 1578 out_del: 1579 /* the force-unlock/cancel has completed and we haven't recvd a reply 1580 to the op that was in progress prior to the unlock/cancel; we 1581 give up on any reply to the earlier op. FIXME: not sure when/how 1582 this would happen */ 1583 1584 if (overlap_done && lkb->lkb_wait_type) { 1585 log_error(ls, "remwait error %x reply %d wait_type %d overlap", 1586 lkb->lkb_id, mstype, lkb->lkb_wait_type); 1587 lkb->lkb_wait_count--; 1588 lkb->lkb_wait_type = 0; 1589 } 1590 1591 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb);); 1592 1593 lkb->lkb_flags &= ~DLM_IFL_RESEND; 1594 lkb->lkb_wait_count--; 1595 if (!lkb->lkb_wait_count) 1596 list_del_init(&lkb->lkb_wait_reply); 1597 unhold_lkb(lkb); 1598 return 0; 1599 } 1600 1601 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype) 1602 { 1603 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1604 int error; 1605 1606 mutex_lock(&ls->ls_waiters_mutex); 1607 error = _remove_from_waiters(lkb, mstype, NULL); 1608 mutex_unlock(&ls->ls_waiters_mutex); 1609 return error; 1610 } 1611 1612 /* Handles situations where we might be processing a "fake" or "stub" reply in 1613 which we can't try to take waiters_mutex again. */ 1614 1615 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms) 1616 { 1617 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1618 int error; 1619 1620 if (ms->m_flags != DLM_IFL_STUB_MS) 1621 mutex_lock(&ls->ls_waiters_mutex); 1622 error = _remove_from_waiters(lkb, ms->m_type, ms); 1623 if (ms->m_flags != DLM_IFL_STUB_MS) 1624 mutex_unlock(&ls->ls_waiters_mutex); 1625 return error; 1626 } 1627 1628 /* If there's an rsb for the same resource being removed, ensure 1629 that the remove message is sent before the new lookup message. 1630 It should be rare to need a delay here, but if not, then it may 1631 be worthwhile to add a proper wait mechanism rather than a delay. */ 1632 1633 static void wait_pending_remove(struct dlm_rsb *r) 1634 { 1635 struct dlm_ls *ls = r->res_ls; 1636 restart: 1637 spin_lock(&ls->ls_remove_spin); 1638 if (ls->ls_remove_len && 1639 !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) { 1640 log_debug(ls, "delay lookup for remove dir %d %s", 1641 r->res_dir_nodeid, r->res_name); 1642 spin_unlock(&ls->ls_remove_spin); 1643 msleep(1); 1644 goto restart; 1645 } 1646 spin_unlock(&ls->ls_remove_spin); 1647 } 1648 1649 /* 1650 * ls_remove_spin protects ls_remove_name and ls_remove_len which are 1651 * read by other threads in wait_pending_remove. ls_remove_names 1652 * and ls_remove_lens are only used by the scan thread, so they do 1653 * not need protection. 1654 */ 1655 1656 static void shrink_bucket(struct dlm_ls *ls, int b) 1657 { 1658 struct rb_node *n, *next; 1659 struct dlm_rsb *r; 1660 char *name; 1661 int our_nodeid = dlm_our_nodeid(); 1662 int remote_count = 0; 1663 int need_shrink = 0; 1664 int i, len, rv; 1665 1666 memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); 1667 1668 spin_lock(&ls->ls_rsbtbl[b].lock); 1669 1670 if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) { 1671 spin_unlock(&ls->ls_rsbtbl[b].lock); 1672 return; 1673 } 1674 1675 for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) { 1676 next = rb_next(n); 1677 r = rb_entry(n, struct dlm_rsb, res_hashnode); 1678 1679 /* If we're the directory record for this rsb, and 1680 we're not the master of it, then we need to wait 1681 for the master node to send us a dir remove for 1682 before removing the dir record. */ 1683 1684 if (!dlm_no_directory(ls) && 1685 (r->res_master_nodeid != our_nodeid) && 1686 (dlm_dir_nodeid(r) == our_nodeid)) { 1687 continue; 1688 } 1689 1690 need_shrink = 1; 1691 1692 if (!time_after_eq(jiffies, r->res_toss_time + 1693 dlm_config.ci_toss_secs * HZ)) { 1694 continue; 1695 } 1696 1697 if (!dlm_no_directory(ls) && 1698 (r->res_master_nodeid == our_nodeid) && 1699 (dlm_dir_nodeid(r) != our_nodeid)) { 1700 1701 /* We're the master of this rsb but we're not 1702 the directory record, so we need to tell the 1703 dir node to remove the dir record. */ 1704 1705 ls->ls_remove_lens[remote_count] = r->res_length; 1706 memcpy(ls->ls_remove_names[remote_count], r->res_name, 1707 DLM_RESNAME_MAXLEN); 1708 remote_count++; 1709 1710 if (remote_count >= DLM_REMOVE_NAMES_MAX) 1711 break; 1712 continue; 1713 } 1714 1715 if (!kref_put(&r->res_ref, kill_rsb)) { 1716 log_error(ls, "tossed rsb in use %s", r->res_name); 1717 continue; 1718 } 1719 1720 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1721 dlm_free_rsb(r); 1722 } 1723 1724 if (need_shrink) 1725 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK; 1726 else 1727 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK; 1728 spin_unlock(&ls->ls_rsbtbl[b].lock); 1729 1730 /* 1731 * While searching for rsb's to free, we found some that require 1732 * remote removal. We leave them in place and find them again here 1733 * so there is a very small gap between removing them from the toss 1734 * list and sending the removal. Keeping this gap small is 1735 * important to keep us (the master node) from being out of sync 1736 * with the remote dir node for very long. 1737 * 1738 * From the time the rsb is removed from toss until just after 1739 * send_remove, the rsb name is saved in ls_remove_name. A new 1740 * lookup checks this to ensure that a new lookup message for the 1741 * same resource name is not sent just before the remove message. 1742 */ 1743 1744 for (i = 0; i < remote_count; i++) { 1745 name = ls->ls_remove_names[i]; 1746 len = ls->ls_remove_lens[i]; 1747 1748 spin_lock(&ls->ls_rsbtbl[b].lock); 1749 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 1750 if (rv) { 1751 spin_unlock(&ls->ls_rsbtbl[b].lock); 1752 log_debug(ls, "remove_name not toss %s", name); 1753 continue; 1754 } 1755 1756 if (r->res_master_nodeid != our_nodeid) { 1757 spin_unlock(&ls->ls_rsbtbl[b].lock); 1758 log_debug(ls, "remove_name master %d dir %d our %d %s", 1759 r->res_master_nodeid, r->res_dir_nodeid, 1760 our_nodeid, name); 1761 continue; 1762 } 1763 1764 if (r->res_dir_nodeid == our_nodeid) { 1765 /* should never happen */ 1766 spin_unlock(&ls->ls_rsbtbl[b].lock); 1767 log_error(ls, "remove_name dir %d master %d our %d %s", 1768 r->res_dir_nodeid, r->res_master_nodeid, 1769 our_nodeid, name); 1770 continue; 1771 } 1772 1773 if (!time_after_eq(jiffies, r->res_toss_time + 1774 dlm_config.ci_toss_secs * HZ)) { 1775 spin_unlock(&ls->ls_rsbtbl[b].lock); 1776 log_debug(ls, "remove_name toss_time %lu now %lu %s", 1777 r->res_toss_time, jiffies, name); 1778 continue; 1779 } 1780 1781 if (!kref_put(&r->res_ref, kill_rsb)) { 1782 spin_unlock(&ls->ls_rsbtbl[b].lock); 1783 log_error(ls, "remove_name in use %s", name); 1784 continue; 1785 } 1786 1787 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 1788 1789 /* block lookup of same name until we've sent remove */ 1790 spin_lock(&ls->ls_remove_spin); 1791 ls->ls_remove_len = len; 1792 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); 1793 spin_unlock(&ls->ls_remove_spin); 1794 spin_unlock(&ls->ls_rsbtbl[b].lock); 1795 1796 send_remove(r); 1797 1798 /* allow lookup of name again */ 1799 spin_lock(&ls->ls_remove_spin); 1800 ls->ls_remove_len = 0; 1801 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); 1802 spin_unlock(&ls->ls_remove_spin); 1803 1804 dlm_free_rsb(r); 1805 } 1806 } 1807 1808 void dlm_scan_rsbs(struct dlm_ls *ls) 1809 { 1810 int i; 1811 1812 for (i = 0; i < ls->ls_rsbtbl_size; i++) { 1813 shrink_bucket(ls, i); 1814 if (dlm_locking_stopped(ls)) 1815 break; 1816 cond_resched(); 1817 } 1818 } 1819 1820 static void add_timeout(struct dlm_lkb *lkb) 1821 { 1822 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1823 1824 if (is_master_copy(lkb)) 1825 return; 1826 1827 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) && 1828 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 1829 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN; 1830 goto add_it; 1831 } 1832 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT) 1833 goto add_it; 1834 return; 1835 1836 add_it: 1837 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb);); 1838 mutex_lock(&ls->ls_timeout_mutex); 1839 hold_lkb(lkb); 1840 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout); 1841 mutex_unlock(&ls->ls_timeout_mutex); 1842 } 1843 1844 static void del_timeout(struct dlm_lkb *lkb) 1845 { 1846 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 1847 1848 mutex_lock(&ls->ls_timeout_mutex); 1849 if (!list_empty(&lkb->lkb_time_list)) { 1850 list_del_init(&lkb->lkb_time_list); 1851 unhold_lkb(lkb); 1852 } 1853 mutex_unlock(&ls->ls_timeout_mutex); 1854 } 1855 1856 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and 1857 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex 1858 and then lock rsb because of lock ordering in add_timeout. We may need 1859 to specify some special timeout-related bits in the lkb that are just to 1860 be accessed under the timeout_mutex. */ 1861 1862 void dlm_scan_timeout(struct dlm_ls *ls) 1863 { 1864 struct dlm_rsb *r; 1865 struct dlm_lkb *lkb; 1866 int do_cancel, do_warn; 1867 s64 wait_us; 1868 1869 for (;;) { 1870 if (dlm_locking_stopped(ls)) 1871 break; 1872 1873 do_cancel = 0; 1874 do_warn = 0; 1875 mutex_lock(&ls->ls_timeout_mutex); 1876 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) { 1877 1878 wait_us = ktime_to_us(ktime_sub(ktime_get(), 1879 lkb->lkb_timestamp)); 1880 1881 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) && 1882 wait_us >= (lkb->lkb_timeout_cs * 10000)) 1883 do_cancel = 1; 1884 1885 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) && 1886 wait_us >= dlm_config.ci_timewarn_cs * 10000) 1887 do_warn = 1; 1888 1889 if (!do_cancel && !do_warn) 1890 continue; 1891 hold_lkb(lkb); 1892 break; 1893 } 1894 mutex_unlock(&ls->ls_timeout_mutex); 1895 1896 if (!do_cancel && !do_warn) 1897 break; 1898 1899 r = lkb->lkb_resource; 1900 hold_rsb(r); 1901 lock_rsb(r); 1902 1903 if (do_warn) { 1904 /* clear flag so we only warn once */ 1905 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; 1906 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT)) 1907 del_timeout(lkb); 1908 dlm_timeout_warn(lkb); 1909 } 1910 1911 if (do_cancel) { 1912 log_debug(ls, "timeout cancel %x node %d %s", 1913 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 1914 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN; 1915 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL; 1916 del_timeout(lkb); 1917 _cancel_lock(r, lkb); 1918 } 1919 1920 unlock_rsb(r); 1921 unhold_rsb(r); 1922 dlm_put_lkb(lkb); 1923 } 1924 } 1925 1926 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping 1927 dlm_recoverd before checking/setting ls_recover_begin. */ 1928 1929 void dlm_adjust_timeouts(struct dlm_ls *ls) 1930 { 1931 struct dlm_lkb *lkb; 1932 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin); 1933 1934 ls->ls_recover_begin = 0; 1935 mutex_lock(&ls->ls_timeout_mutex); 1936 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) 1937 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); 1938 mutex_unlock(&ls->ls_timeout_mutex); 1939 1940 if (!dlm_config.ci_waitwarn_us) 1941 return; 1942 1943 mutex_lock(&ls->ls_waiters_mutex); 1944 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 1945 if (ktime_to_us(lkb->lkb_wait_time)) 1946 lkb->lkb_wait_time = ktime_get(); 1947 } 1948 mutex_unlock(&ls->ls_waiters_mutex); 1949 } 1950 1951 /* lkb is master or local copy */ 1952 1953 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1954 { 1955 int b, len = r->res_ls->ls_lvblen; 1956 1957 /* b=1 lvb returned to caller 1958 b=0 lvb written to rsb or invalidated 1959 b=-1 do nothing */ 1960 1961 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1962 1963 if (b == 1) { 1964 if (!lkb->lkb_lvbptr) 1965 return; 1966 1967 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1968 return; 1969 1970 if (!r->res_lvbptr) 1971 return; 1972 1973 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len); 1974 lkb->lkb_lvbseq = r->res_lvbseq; 1975 1976 } else if (b == 0) { 1977 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1978 rsb_set_flag(r, RSB_VALNOTVALID); 1979 return; 1980 } 1981 1982 if (!lkb->lkb_lvbptr) 1983 return; 1984 1985 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1986 return; 1987 1988 if (!r->res_lvbptr) 1989 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1990 1991 if (!r->res_lvbptr) 1992 return; 1993 1994 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len); 1995 r->res_lvbseq++; 1996 lkb->lkb_lvbseq = r->res_lvbseq; 1997 rsb_clear_flag(r, RSB_VALNOTVALID); 1998 } 1999 2000 if (rsb_flag(r, RSB_VALNOTVALID)) 2001 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID; 2002 } 2003 2004 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2005 { 2006 if (lkb->lkb_grmode < DLM_LOCK_PW) 2007 return; 2008 2009 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 2010 rsb_set_flag(r, RSB_VALNOTVALID); 2011 return; 2012 } 2013 2014 if (!lkb->lkb_lvbptr) 2015 return; 2016 2017 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2018 return; 2019 2020 if (!r->res_lvbptr) 2021 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 2022 2023 if (!r->res_lvbptr) 2024 return; 2025 2026 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 2027 r->res_lvbseq++; 2028 rsb_clear_flag(r, RSB_VALNOTVALID); 2029 } 2030 2031 /* lkb is process copy (pc) */ 2032 2033 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2034 struct dlm_message *ms) 2035 { 2036 int b; 2037 2038 if (!lkb->lkb_lvbptr) 2039 return; 2040 2041 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2042 return; 2043 2044 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 2045 if (b == 1) { 2046 int len = receive_extralen(ms); 2047 if (len > DLM_RESNAME_MAXLEN) 2048 len = DLM_RESNAME_MAXLEN; 2049 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 2050 lkb->lkb_lvbseq = ms->m_lvbseq; 2051 } 2052 } 2053 2054 /* Manipulate lkb's on rsb's convert/granted/waiting queues 2055 remove_lock -- used for unlock, removes lkb from granted 2056 revert_lock -- used for cancel, moves lkb from convert to granted 2057 grant_lock -- used for request and convert, adds lkb to granted or 2058 moves lkb from convert or waiting to granted 2059 2060 Each of these is used for master or local copy lkb's. There is 2061 also a _pc() variation used to make the corresponding change on 2062 a process copy (pc) lkb. */ 2063 2064 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2065 { 2066 del_lkb(r, lkb); 2067 lkb->lkb_grmode = DLM_LOCK_IV; 2068 /* this unhold undoes the original ref from create_lkb() 2069 so this leads to the lkb being freed */ 2070 unhold_lkb(lkb); 2071 } 2072 2073 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2074 { 2075 set_lvb_unlock(r, lkb); 2076 _remove_lock(r, lkb); 2077 } 2078 2079 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2080 { 2081 _remove_lock(r, lkb); 2082 } 2083 2084 /* returns: 0 did nothing 2085 1 moved lock to granted 2086 -1 removed lock */ 2087 2088 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2089 { 2090 int rv = 0; 2091 2092 lkb->lkb_rqmode = DLM_LOCK_IV; 2093 2094 switch (lkb->lkb_status) { 2095 case DLM_LKSTS_GRANTED: 2096 break; 2097 case DLM_LKSTS_CONVERT: 2098 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2099 rv = 1; 2100 break; 2101 case DLM_LKSTS_WAITING: 2102 del_lkb(r, lkb); 2103 lkb->lkb_grmode = DLM_LOCK_IV; 2104 /* this unhold undoes the original ref from create_lkb() 2105 so this leads to the lkb being freed */ 2106 unhold_lkb(lkb); 2107 rv = -1; 2108 break; 2109 default: 2110 log_print("invalid status for revert %d", lkb->lkb_status); 2111 } 2112 return rv; 2113 } 2114 2115 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2116 { 2117 return revert_lock(r, lkb); 2118 } 2119 2120 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2121 { 2122 if (lkb->lkb_grmode != lkb->lkb_rqmode) { 2123 lkb->lkb_grmode = lkb->lkb_rqmode; 2124 if (lkb->lkb_status) 2125 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2126 else 2127 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 2128 } 2129 2130 lkb->lkb_rqmode = DLM_LOCK_IV; 2131 lkb->lkb_highbast = 0; 2132 } 2133 2134 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2135 { 2136 set_lvb_lock(r, lkb); 2137 _grant_lock(r, lkb); 2138 } 2139 2140 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2141 struct dlm_message *ms) 2142 { 2143 set_lvb_lock_pc(r, lkb, ms); 2144 _grant_lock(r, lkb); 2145 } 2146 2147 /* called by grant_pending_locks() which means an async grant message must 2148 be sent to the requesting node in addition to granting the lock if the 2149 lkb belongs to a remote node. */ 2150 2151 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) 2152 { 2153 grant_lock(r, lkb); 2154 if (is_master_copy(lkb)) 2155 send_grant(r, lkb); 2156 else 2157 queue_cast(r, lkb, 0); 2158 } 2159 2160 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to 2161 change the granted/requested modes. We're munging things accordingly in 2162 the process copy. 2163 CONVDEADLK: our grmode may have been forced down to NL to resolve a 2164 conversion deadlock 2165 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 2166 compatible with other granted locks */ 2167 2168 static void munge_demoted(struct dlm_lkb *lkb) 2169 { 2170 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 2171 log_print("munge_demoted %x invalid modes gr %d rq %d", 2172 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 2173 return; 2174 } 2175 2176 lkb->lkb_grmode = DLM_LOCK_NL; 2177 } 2178 2179 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) 2180 { 2181 if (ms->m_type != DLM_MSG_REQUEST_REPLY && 2182 ms->m_type != DLM_MSG_GRANT) { 2183 log_print("munge_altmode %x invalid reply type %d", 2184 lkb->lkb_id, ms->m_type); 2185 return; 2186 } 2187 2188 if (lkb->lkb_exflags & DLM_LKF_ALTPR) 2189 lkb->lkb_rqmode = DLM_LOCK_PR; 2190 else if (lkb->lkb_exflags & DLM_LKF_ALTCW) 2191 lkb->lkb_rqmode = DLM_LOCK_CW; 2192 else { 2193 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); 2194 dlm_print_lkb(lkb); 2195 } 2196 } 2197 2198 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 2199 { 2200 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 2201 lkb_statequeue); 2202 if (lkb->lkb_id == first->lkb_id) 2203 return 1; 2204 2205 return 0; 2206 } 2207 2208 /* Check if the given lkb conflicts with another lkb on the queue. */ 2209 2210 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) 2211 { 2212 struct dlm_lkb *this; 2213 2214 list_for_each_entry(this, head, lkb_statequeue) { 2215 if (this == lkb) 2216 continue; 2217 if (!modes_compat(this, lkb)) 2218 return 1; 2219 } 2220 return 0; 2221 } 2222 2223 /* 2224 * "A conversion deadlock arises with a pair of lock requests in the converting 2225 * queue for one resource. The granted mode of each lock blocks the requested 2226 * mode of the other lock." 2227 * 2228 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the 2229 * convert queue from being granted, then deadlk/demote lkb. 2230 * 2231 * Example: 2232 * Granted Queue: empty 2233 * Convert Queue: NL->EX (first lock) 2234 * PR->EX (second lock) 2235 * 2236 * The first lock can't be granted because of the granted mode of the second 2237 * lock and the second lock can't be granted because it's not first in the 2238 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we 2239 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK 2240 * flag set and return DEMOTED in the lksb flags. 2241 * 2242 * Originally, this function detected conv-deadlk in a more limited scope: 2243 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or 2244 * - if lkb1 was the first entry in the queue (not just earlier), and was 2245 * blocked by the granted mode of lkb2, and there was nothing on the 2246 * granted queue preventing lkb1 from being granted immediately, i.e. 2247 * lkb2 was the only thing preventing lkb1 from being granted. 2248 * 2249 * That second condition meant we'd only say there was conv-deadlk if 2250 * resolving it (by demotion) would lead to the first lock on the convert 2251 * queue being granted right away. It allowed conversion deadlocks to exist 2252 * between locks on the convert queue while they couldn't be granted anyway. 2253 * 2254 * Now, we detect and take action on conversion deadlocks immediately when 2255 * they're created, even if they may not be immediately consequential. If 2256 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted 2257 * mode that would prevent lkb1's conversion from being granted, we do a 2258 * deadlk/demote on lkb2 right away and don't let it onto the convert queue. 2259 * I think this means that the lkb_is_ahead condition below should always 2260 * be zero, i.e. there will never be conv-deadlk between two locks that are 2261 * both already on the convert queue. 2262 */ 2263 2264 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) 2265 { 2266 struct dlm_lkb *lkb1; 2267 int lkb_is_ahead = 0; 2268 2269 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { 2270 if (lkb1 == lkb2) { 2271 lkb_is_ahead = 1; 2272 continue; 2273 } 2274 2275 if (!lkb_is_ahead) { 2276 if (!modes_compat(lkb2, lkb1)) 2277 return 1; 2278 } else { 2279 if (!modes_compat(lkb2, lkb1) && 2280 !modes_compat(lkb1, lkb2)) 2281 return 1; 2282 } 2283 } 2284 return 0; 2285 } 2286 2287 /* 2288 * Return 1 if the lock can be granted, 0 otherwise. 2289 * Also detect and resolve conversion deadlocks. 2290 * 2291 * lkb is the lock to be granted 2292 * 2293 * now is 1 if the function is being called in the context of the 2294 * immediate request, it is 0 if called later, after the lock has been 2295 * queued. 2296 * 2297 * recover is 1 if dlm_recover_grant() is trying to grant conversions 2298 * after recovery. 2299 * 2300 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 2301 */ 2302 2303 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2304 int recover) 2305 { 2306 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 2307 2308 /* 2309 * 6-10: Version 5.4 introduced an option to address the phenomenon of 2310 * a new request for a NL mode lock being blocked. 2311 * 2312 * 6-11: If the optional EXPEDITE flag is used with the new NL mode 2313 * request, then it would be granted. In essence, the use of this flag 2314 * tells the Lock Manager to expedite theis request by not considering 2315 * what may be in the CONVERTING or WAITING queues... As of this 2316 * writing, the EXPEDITE flag can be used only with new requests for NL 2317 * mode locks. This flag is not valid for conversion requests. 2318 * 2319 * A shortcut. Earlier checks return an error if EXPEDITE is used in a 2320 * conversion or used with a non-NL requested mode. We also know an 2321 * EXPEDITE request is always granted immediately, so now must always 2322 * be 1. The full condition to grant an expedite request: (now && 2323 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can 2324 * therefore be shortened to just checking the flag. 2325 */ 2326 2327 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) 2328 return 1; 2329 2330 /* 2331 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be 2332 * added to the remaining conditions. 2333 */ 2334 2335 if (queue_conflict(&r->res_grantqueue, lkb)) 2336 return 0; 2337 2338 /* 2339 * 6-3: By default, a conversion request is immediately granted if the 2340 * requested mode is compatible with the modes of all other granted 2341 * locks 2342 */ 2343 2344 if (queue_conflict(&r->res_convertqueue, lkb)) 2345 return 0; 2346 2347 /* 2348 * The RECOVER_GRANT flag means dlm_recover_grant() is granting 2349 * locks for a recovered rsb, on which lkb's have been rebuilt. 2350 * The lkb's may have been rebuilt on the queues in a different 2351 * order than they were in on the previous master. So, granting 2352 * queued conversions in order after recovery doesn't make sense 2353 * since the order hasn't been preserved anyway. The new order 2354 * could also have created a new "in place" conversion deadlock. 2355 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. 2356 * After recovery, there would be no granted locks, and possibly 2357 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after 2358 * recovery, grant conversions without considering order. 2359 */ 2360 2361 if (conv && recover) 2362 return 1; 2363 2364 /* 2365 * 6-5: But the default algorithm for deciding whether to grant or 2366 * queue conversion requests does not by itself guarantee that such 2367 * requests are serviced on a "first come first serve" basis. This, in 2368 * turn, can lead to a phenomenon known as "indefinate postponement". 2369 * 2370 * 6-7: This issue is dealt with by using the optional QUECVT flag with 2371 * the system service employed to request a lock conversion. This flag 2372 * forces certain conversion requests to be queued, even if they are 2373 * compatible with the granted modes of other locks on the same 2374 * resource. Thus, the use of this flag results in conversion requests 2375 * being ordered on a "first come first servce" basis. 2376 * 2377 * DCT: This condition is all about new conversions being able to occur 2378 * "in place" while the lock remains on the granted queue (assuming 2379 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion 2380 * doesn't _have_ to go onto the convert queue where it's processed in 2381 * order. The "now" variable is necessary to distinguish converts 2382 * being received and processed for the first time now, because once a 2383 * convert is moved to the conversion queue the condition below applies 2384 * requiring fifo granting. 2385 */ 2386 2387 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 2388 return 1; 2389 2390 /* 2391 * Even if the convert is compat with all granted locks, 2392 * QUECVT forces it behind other locks on the convert queue. 2393 */ 2394 2395 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) { 2396 if (list_empty(&r->res_convertqueue)) 2397 return 1; 2398 else 2399 return 0; 2400 } 2401 2402 /* 2403 * The NOORDER flag is set to avoid the standard vms rules on grant 2404 * order. 2405 */ 2406 2407 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 2408 return 1; 2409 2410 /* 2411 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 2412 * granted until all other conversion requests ahead of it are granted 2413 * and/or canceled. 2414 */ 2415 2416 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 2417 return 1; 2418 2419 /* 2420 * 6-4: By default, a new request is immediately granted only if all 2421 * three of the following conditions are satisfied when the request is 2422 * issued: 2423 * - The queue of ungranted conversion requests for the resource is 2424 * empty. 2425 * - The queue of ungranted new requests for the resource is empty. 2426 * - The mode of the new request is compatible with the most 2427 * restrictive mode of all granted locks on the resource. 2428 */ 2429 2430 if (now && !conv && list_empty(&r->res_convertqueue) && 2431 list_empty(&r->res_waitqueue)) 2432 return 1; 2433 2434 /* 2435 * 6-4: Once a lock request is in the queue of ungranted new requests, 2436 * it cannot be granted until the queue of ungranted conversion 2437 * requests is empty, all ungranted new requests ahead of it are 2438 * granted and/or canceled, and it is compatible with the granted mode 2439 * of the most restrictive lock granted on the resource. 2440 */ 2441 2442 if (!now && !conv && list_empty(&r->res_convertqueue) && 2443 first_in_list(lkb, &r->res_waitqueue)) 2444 return 1; 2445 2446 return 0; 2447 } 2448 2449 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2450 int recover, int *err) 2451 { 2452 int rv; 2453 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 2454 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); 2455 2456 if (err) 2457 *err = 0; 2458 2459 rv = _can_be_granted(r, lkb, now, recover); 2460 if (rv) 2461 goto out; 2462 2463 /* 2464 * The CONVDEADLK flag is non-standard and tells the dlm to resolve 2465 * conversion deadlocks by demoting grmode to NL, otherwise the dlm 2466 * cancels one of the locks. 2467 */ 2468 2469 if (is_convert && can_be_queued(lkb) && 2470 conversion_deadlock_detect(r, lkb)) { 2471 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { 2472 lkb->lkb_grmode = DLM_LOCK_NL; 2473 lkb->lkb_sbflags |= DLM_SBF_DEMOTED; 2474 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 2475 if (err) 2476 *err = -EDEADLK; 2477 else { 2478 log_print("can_be_granted deadlock %x now %d", 2479 lkb->lkb_id, now); 2480 dlm_dump_rsb(r); 2481 } 2482 } 2483 goto out; 2484 } 2485 2486 /* 2487 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try 2488 * to grant a request in a mode other than the normal rqmode. It's a 2489 * simple way to provide a big optimization to applications that can 2490 * use them. 2491 */ 2492 2493 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) 2494 alt = DLM_LOCK_PR; 2495 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) 2496 alt = DLM_LOCK_CW; 2497 2498 if (alt) { 2499 lkb->lkb_rqmode = alt; 2500 rv = _can_be_granted(r, lkb, now, 0); 2501 if (rv) 2502 lkb->lkb_sbflags |= DLM_SBF_ALTMODE; 2503 else 2504 lkb->lkb_rqmode = rqmode; 2505 } 2506 out: 2507 return rv; 2508 } 2509 2510 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock 2511 for locks pending on the convert list. Once verified (watch for these 2512 log_prints), we should be able to just call _can_be_granted() and not 2513 bother with the demote/deadlk cases here (and there's no easy way to deal 2514 with a deadlk here, we'd have to generate something like grant_lock with 2515 the deadlk error.) */ 2516 2517 /* Returns the highest requested mode of all blocked conversions; sets 2518 cw if there's a blocked conversion to DLM_LOCK_CW. */ 2519 2520 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, 2521 unsigned int *count) 2522 { 2523 struct dlm_lkb *lkb, *s; 2524 int recover = rsb_flag(r, RSB_RECOVER_GRANT); 2525 int hi, demoted, quit, grant_restart, demote_restart; 2526 int deadlk; 2527 2528 quit = 0; 2529 restart: 2530 grant_restart = 0; 2531 demote_restart = 0; 2532 hi = DLM_LOCK_IV; 2533 2534 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 2535 demoted = is_demoted(lkb); 2536 deadlk = 0; 2537 2538 if (can_be_granted(r, lkb, 0, recover, &deadlk)) { 2539 grant_lock_pending(r, lkb); 2540 grant_restart = 1; 2541 if (count) 2542 (*count)++; 2543 continue; 2544 } 2545 2546 if (!demoted && is_demoted(lkb)) { 2547 log_print("WARN: pending demoted %x node %d %s", 2548 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 2549 demote_restart = 1; 2550 continue; 2551 } 2552 2553 if (deadlk) { 2554 log_print("WARN: pending deadlock %x node %d %s", 2555 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 2556 dlm_dump_rsb(r); 2557 continue; 2558 } 2559 2560 hi = max_t(int, lkb->lkb_rqmode, hi); 2561 2562 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) 2563 *cw = 1; 2564 } 2565 2566 if (grant_restart) 2567 goto restart; 2568 if (demote_restart && !quit) { 2569 quit = 1; 2570 goto restart; 2571 } 2572 2573 return max_t(int, high, hi); 2574 } 2575 2576 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, 2577 unsigned int *count) 2578 { 2579 struct dlm_lkb *lkb, *s; 2580 2581 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 2582 if (can_be_granted(r, lkb, 0, 0, NULL)) { 2583 grant_lock_pending(r, lkb); 2584 if (count) 2585 (*count)++; 2586 } else { 2587 high = max_t(int, lkb->lkb_rqmode, high); 2588 if (lkb->lkb_rqmode == DLM_LOCK_CW) 2589 *cw = 1; 2590 } 2591 } 2592 2593 return high; 2594 } 2595 2596 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked 2597 on either the convert or waiting queue. 2598 high is the largest rqmode of all locks blocked on the convert or 2599 waiting queue. */ 2600 2601 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) 2602 { 2603 if (gr->lkb_grmode == DLM_LOCK_PR && cw) { 2604 if (gr->lkb_highbast < DLM_LOCK_EX) 2605 return 1; 2606 return 0; 2607 } 2608 2609 if (gr->lkb_highbast < high && 2610 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 2611 return 1; 2612 return 0; 2613 } 2614 2615 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) 2616 { 2617 struct dlm_lkb *lkb, *s; 2618 int high = DLM_LOCK_IV; 2619 int cw = 0; 2620 2621 if (!is_master(r)) { 2622 log_print("grant_pending_locks r nodeid %d", r->res_nodeid); 2623 dlm_dump_rsb(r); 2624 return; 2625 } 2626 2627 high = grant_pending_convert(r, high, &cw, count); 2628 high = grant_pending_wait(r, high, &cw, count); 2629 2630 if (high == DLM_LOCK_IV) 2631 return; 2632 2633 /* 2634 * If there are locks left on the wait/convert queue then send blocking 2635 * ASTs to granted locks based on the largest requested mode (high) 2636 * found above. 2637 */ 2638 2639 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { 2640 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { 2641 if (cw && high == DLM_LOCK_PR && 2642 lkb->lkb_grmode == DLM_LOCK_PR) 2643 queue_bast(r, lkb, DLM_LOCK_CW); 2644 else 2645 queue_bast(r, lkb, high); 2646 lkb->lkb_highbast = high; 2647 } 2648 } 2649 } 2650 2651 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) 2652 { 2653 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || 2654 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { 2655 if (gr->lkb_highbast < DLM_LOCK_EX) 2656 return 1; 2657 return 0; 2658 } 2659 2660 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) 2661 return 1; 2662 return 0; 2663 } 2664 2665 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, 2666 struct dlm_lkb *lkb) 2667 { 2668 struct dlm_lkb *gr; 2669 2670 list_for_each_entry(gr, head, lkb_statequeue) { 2671 /* skip self when sending basts to convertqueue */ 2672 if (gr == lkb) 2673 continue; 2674 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { 2675 queue_bast(r, gr, lkb->lkb_rqmode); 2676 gr->lkb_highbast = lkb->lkb_rqmode; 2677 } 2678 } 2679 } 2680 2681 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb) 2682 { 2683 send_bast_queue(r, &r->res_grantqueue, lkb); 2684 } 2685 2686 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) 2687 { 2688 send_bast_queue(r, &r->res_grantqueue, lkb); 2689 send_bast_queue(r, &r->res_convertqueue, lkb); 2690 } 2691 2692 /* set_master(r, lkb) -- set the master nodeid of a resource 2693 2694 The purpose of this function is to set the nodeid field in the given 2695 lkb using the nodeid field in the given rsb. If the rsb's nodeid is 2696 known, it can just be copied to the lkb and the function will return 2697 0. If the rsb's nodeid is _not_ known, it needs to be looked up 2698 before it can be copied to the lkb. 2699 2700 When the rsb nodeid is being looked up remotely, the initial lkb 2701 causing the lookup is kept on the ls_waiters list waiting for the 2702 lookup reply. Other lkb's waiting for the same rsb lookup are kept 2703 on the rsb's res_lookup list until the master is verified. 2704 2705 Return values: 2706 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 2707 1: the rsb master is not available and the lkb has been placed on 2708 a wait queue 2709 */ 2710 2711 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2712 { 2713 int our_nodeid = dlm_our_nodeid(); 2714 2715 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2716 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2717 r->res_first_lkid = lkb->lkb_id; 2718 lkb->lkb_nodeid = r->res_nodeid; 2719 return 0; 2720 } 2721 2722 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { 2723 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); 2724 return 1; 2725 } 2726 2727 if (r->res_master_nodeid == our_nodeid) { 2728 lkb->lkb_nodeid = 0; 2729 return 0; 2730 } 2731 2732 if (r->res_master_nodeid) { 2733 lkb->lkb_nodeid = r->res_master_nodeid; 2734 return 0; 2735 } 2736 2737 if (dlm_dir_nodeid(r) == our_nodeid) { 2738 /* This is a somewhat unusual case; find_rsb will usually 2739 have set res_master_nodeid when dir nodeid is local, but 2740 there are cases where we become the dir node after we've 2741 past find_rsb and go through _request_lock again. 2742 confirm_master() or process_lookup_list() needs to be 2743 called after this. */ 2744 log_debug(r->res_ls, "set_master %x self master %d dir %d %s", 2745 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, 2746 r->res_name); 2747 r->res_master_nodeid = our_nodeid; 2748 r->res_nodeid = 0; 2749 lkb->lkb_nodeid = 0; 2750 return 0; 2751 } 2752 2753 wait_pending_remove(r); 2754 2755 r->res_first_lkid = lkb->lkb_id; 2756 send_lookup(r, lkb); 2757 return 1; 2758 } 2759 2760 static void process_lookup_list(struct dlm_rsb *r) 2761 { 2762 struct dlm_lkb *lkb, *safe; 2763 2764 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 2765 list_del_init(&lkb->lkb_rsb_lookup); 2766 _request_lock(r, lkb); 2767 schedule(); 2768 } 2769 } 2770 2771 /* confirm_master -- confirm (or deny) an rsb's master nodeid */ 2772 2773 static void confirm_master(struct dlm_rsb *r, int error) 2774 { 2775 struct dlm_lkb *lkb; 2776 2777 if (!r->res_first_lkid) 2778 return; 2779 2780 switch (error) { 2781 case 0: 2782 case -EINPROGRESS: 2783 r->res_first_lkid = 0; 2784 process_lookup_list(r); 2785 break; 2786 2787 case -EAGAIN: 2788 case -EBADR: 2789 case -ENOTBLK: 2790 /* the remote request failed and won't be retried (it was 2791 a NOQUEUE, or has been canceled/unlocked); make a waiting 2792 lkb the first_lkid */ 2793 2794 r->res_first_lkid = 0; 2795 2796 if (!list_empty(&r->res_lookup)) { 2797 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 2798 lkb_rsb_lookup); 2799 list_del_init(&lkb->lkb_rsb_lookup); 2800 r->res_first_lkid = lkb->lkb_id; 2801 _request_lock(r, lkb); 2802 } 2803 break; 2804 2805 default: 2806 log_error(r->res_ls, "confirm_master unknown error %d", error); 2807 } 2808 } 2809 2810 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 2811 int namelen, unsigned long timeout_cs, 2812 void (*ast) (void *astparam), 2813 void *astparam, 2814 void (*bast) (void *astparam, int mode), 2815 struct dlm_args *args) 2816 { 2817 int rv = -EINVAL; 2818 2819 /* check for invalid arg usage */ 2820 2821 if (mode < 0 || mode > DLM_LOCK_EX) 2822 goto out; 2823 2824 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 2825 goto out; 2826 2827 if (flags & DLM_LKF_CANCEL) 2828 goto out; 2829 2830 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 2831 goto out; 2832 2833 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 2834 goto out; 2835 2836 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 2837 goto out; 2838 2839 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 2840 goto out; 2841 2842 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 2843 goto out; 2844 2845 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 2846 goto out; 2847 2848 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 2849 goto out; 2850 2851 if (!ast || !lksb) 2852 goto out; 2853 2854 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 2855 goto out; 2856 2857 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 2858 goto out; 2859 2860 /* these args will be copied to the lkb in validate_lock_args, 2861 it cannot be done now because when converting locks, fields in 2862 an active lkb cannot be modified before locking the rsb */ 2863 2864 args->flags = flags; 2865 args->astfn = ast; 2866 args->astparam = astparam; 2867 args->bastfn = bast; 2868 args->timeout = timeout_cs; 2869 args->mode = mode; 2870 args->lksb = lksb; 2871 rv = 0; 2872 out: 2873 return rv; 2874 } 2875 2876 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 2877 { 2878 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK | 2879 DLM_LKF_FORCEUNLOCK)) 2880 return -EINVAL; 2881 2882 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) 2883 return -EINVAL; 2884 2885 args->flags = flags; 2886 args->astparam = astarg; 2887 return 0; 2888 } 2889 2890 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2891 struct dlm_args *args) 2892 { 2893 int rv = -EINVAL; 2894 2895 if (args->flags & DLM_LKF_CONVERT) { 2896 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 2897 goto out; 2898 2899 if (args->flags & DLM_LKF_QUECVT && 2900 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) 2901 goto out; 2902 2903 rv = -EBUSY; 2904 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2905 goto out; 2906 2907 if (lkb->lkb_wait_type) 2908 goto out; 2909 2910 if (is_overlap(lkb)) 2911 goto out; 2912 } 2913 2914 lkb->lkb_exflags = args->flags; 2915 lkb->lkb_sbflags = 0; 2916 lkb->lkb_astfn = args->astfn; 2917 lkb->lkb_astparam = args->astparam; 2918 lkb->lkb_bastfn = args->bastfn; 2919 lkb->lkb_rqmode = args->mode; 2920 lkb->lkb_lksb = args->lksb; 2921 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 2922 lkb->lkb_ownpid = (int) current->pid; 2923 lkb->lkb_timeout_cs = args->timeout; 2924 rv = 0; 2925 out: 2926 if (rv) 2927 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s", 2928 rv, lkb->lkb_id, lkb->lkb_flags, args->flags, 2929 lkb->lkb_status, lkb->lkb_wait_type, 2930 lkb->lkb_resource->res_name); 2931 return rv; 2932 } 2933 2934 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 2935 for success */ 2936 2937 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here 2938 because there may be a lookup in progress and it's valid to do 2939 cancel/unlockf on it */ 2940 2941 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 2942 { 2943 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 2944 int rv = -EINVAL; 2945 2946 if (lkb->lkb_flags & DLM_IFL_MSTCPY) { 2947 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); 2948 dlm_print_lkb(lkb); 2949 goto out; 2950 } 2951 2952 /* an lkb may still exist even though the lock is EOL'ed due to a 2953 cancel, unlock or failed noqueue request; an app can't use these 2954 locks; return same error as if the lkid had not been found at all */ 2955 2956 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { 2957 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); 2958 rv = -ENOENT; 2959 goto out; 2960 } 2961 2962 /* an lkb may be waiting for an rsb lookup to complete where the 2963 lookup was initiated by another lock */ 2964 2965 if (!list_empty(&lkb->lkb_rsb_lookup)) { 2966 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { 2967 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); 2968 list_del_init(&lkb->lkb_rsb_lookup); 2969 queue_cast(lkb->lkb_resource, lkb, 2970 args->flags & DLM_LKF_CANCEL ? 2971 -DLM_ECANCEL : -DLM_EUNLOCK); 2972 unhold_lkb(lkb); /* undoes create_lkb() */ 2973 } 2974 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ 2975 rv = -EBUSY; 2976 goto out; 2977 } 2978 2979 /* cancel not allowed with another cancel/unlock in progress */ 2980 2981 if (args->flags & DLM_LKF_CANCEL) { 2982 if (lkb->lkb_exflags & DLM_LKF_CANCEL) 2983 goto out; 2984 2985 if (is_overlap(lkb)) 2986 goto out; 2987 2988 /* don't let scand try to do a cancel */ 2989 del_timeout(lkb); 2990 2991 if (lkb->lkb_flags & DLM_IFL_RESEND) { 2992 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 2993 rv = -EBUSY; 2994 goto out; 2995 } 2996 2997 /* there's nothing to cancel */ 2998 if (lkb->lkb_status == DLM_LKSTS_GRANTED && 2999 !lkb->lkb_wait_type) { 3000 rv = -EBUSY; 3001 goto out; 3002 } 3003 3004 switch (lkb->lkb_wait_type) { 3005 case DLM_MSG_LOOKUP: 3006 case DLM_MSG_REQUEST: 3007 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 3008 rv = -EBUSY; 3009 goto out; 3010 case DLM_MSG_UNLOCK: 3011 case DLM_MSG_CANCEL: 3012 goto out; 3013 } 3014 /* add_to_waiters() will set OVERLAP_CANCEL */ 3015 goto out_ok; 3016 } 3017 3018 /* do we need to allow a force-unlock if there's a normal unlock 3019 already in progress? in what conditions could the normal unlock 3020 fail such that we'd want to send a force-unlock to be sure? */ 3021 3022 if (args->flags & DLM_LKF_FORCEUNLOCK) { 3023 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) 3024 goto out; 3025 3026 if (is_overlap_unlock(lkb)) 3027 goto out; 3028 3029 /* don't let scand try to do a cancel */ 3030 del_timeout(lkb); 3031 3032 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3033 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3034 rv = -EBUSY; 3035 goto out; 3036 } 3037 3038 switch (lkb->lkb_wait_type) { 3039 case DLM_MSG_LOOKUP: 3040 case DLM_MSG_REQUEST: 3041 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3042 rv = -EBUSY; 3043 goto out; 3044 case DLM_MSG_UNLOCK: 3045 goto out; 3046 } 3047 /* add_to_waiters() will set OVERLAP_UNLOCK */ 3048 goto out_ok; 3049 } 3050 3051 /* normal unlock not allowed if there's any op in progress */ 3052 rv = -EBUSY; 3053 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 3054 goto out; 3055 3056 out_ok: 3057 /* an overlapping op shouldn't blow away exflags from other op */ 3058 lkb->lkb_exflags |= args->flags; 3059 lkb->lkb_sbflags = 0; 3060 lkb->lkb_astparam = args->astparam; 3061 rv = 0; 3062 out: 3063 if (rv) 3064 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, 3065 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, 3066 args->flags, lkb->lkb_wait_type, 3067 lkb->lkb_resource->res_name); 3068 return rv; 3069 } 3070 3071 /* 3072 * Four stage 4 varieties: 3073 * do_request(), do_convert(), do_unlock(), do_cancel() 3074 * These are called on the master node for the given lock and 3075 * from the central locking logic. 3076 */ 3077 3078 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3079 { 3080 int error = 0; 3081 3082 if (can_be_granted(r, lkb, 1, 0, NULL)) { 3083 grant_lock(r, lkb); 3084 queue_cast(r, lkb, 0); 3085 goto out; 3086 } 3087 3088 if (can_be_queued(lkb)) { 3089 error = -EINPROGRESS; 3090 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3091 add_timeout(lkb); 3092 goto out; 3093 } 3094 3095 error = -EAGAIN; 3096 queue_cast(r, lkb, -EAGAIN); 3097 out: 3098 return error; 3099 } 3100 3101 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3102 int error) 3103 { 3104 switch (error) { 3105 case -EAGAIN: 3106 if (force_blocking_asts(lkb)) 3107 send_blocking_asts_all(r, lkb); 3108 break; 3109 case -EINPROGRESS: 3110 send_blocking_asts(r, lkb); 3111 break; 3112 } 3113 } 3114 3115 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3116 { 3117 int error = 0; 3118 int deadlk = 0; 3119 3120 /* changing an existing lock may allow others to be granted */ 3121 3122 if (can_be_granted(r, lkb, 1, 0, &deadlk)) { 3123 grant_lock(r, lkb); 3124 queue_cast(r, lkb, 0); 3125 goto out; 3126 } 3127 3128 /* can_be_granted() detected that this lock would block in a conversion 3129 deadlock, so we leave it on the granted queue and return EDEADLK in 3130 the ast for the convert. */ 3131 3132 if (deadlk) { 3133 /* it's left on the granted queue */ 3134 revert_lock(r, lkb); 3135 queue_cast(r, lkb, -EDEADLK); 3136 error = -EDEADLK; 3137 goto out; 3138 } 3139 3140 /* is_demoted() means the can_be_granted() above set the grmode 3141 to NL, and left us on the granted queue. This auto-demotion 3142 (due to CONVDEADLK) might mean other locks, and/or this lock, are 3143 now grantable. We have to try to grant other converting locks 3144 before we try again to grant this one. */ 3145 3146 if (is_demoted(lkb)) { 3147 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); 3148 if (_can_be_granted(r, lkb, 1, 0)) { 3149 grant_lock(r, lkb); 3150 queue_cast(r, lkb, 0); 3151 goto out; 3152 } 3153 /* else fall through and move to convert queue */ 3154 } 3155 3156 if (can_be_queued(lkb)) { 3157 error = -EINPROGRESS; 3158 del_lkb(r, lkb); 3159 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3160 add_timeout(lkb); 3161 goto out; 3162 } 3163 3164 error = -EAGAIN; 3165 queue_cast(r, lkb, -EAGAIN); 3166 out: 3167 return error; 3168 } 3169 3170 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3171 int error) 3172 { 3173 switch (error) { 3174 case 0: 3175 grant_pending_locks(r, NULL); 3176 /* grant_pending_locks also sends basts */ 3177 break; 3178 case -EAGAIN: 3179 if (force_blocking_asts(lkb)) 3180 send_blocking_asts_all(r, lkb); 3181 break; 3182 case -EINPROGRESS: 3183 send_blocking_asts(r, lkb); 3184 break; 3185 } 3186 } 3187 3188 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3189 { 3190 remove_lock(r, lkb); 3191 queue_cast(r, lkb, -DLM_EUNLOCK); 3192 return -DLM_EUNLOCK; 3193 } 3194 3195 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3196 int error) 3197 { 3198 grant_pending_locks(r, NULL); 3199 } 3200 3201 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 3202 3203 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3204 { 3205 int error; 3206 3207 error = revert_lock(r, lkb); 3208 if (error) { 3209 queue_cast(r, lkb, -DLM_ECANCEL); 3210 return -DLM_ECANCEL; 3211 } 3212 return 0; 3213 } 3214 3215 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3216 int error) 3217 { 3218 if (error) 3219 grant_pending_locks(r, NULL); 3220 } 3221 3222 /* 3223 * Four stage 3 varieties: 3224 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 3225 */ 3226 3227 /* add a new lkb to a possibly new rsb, called by requesting process */ 3228 3229 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3230 { 3231 int error; 3232 3233 /* set_master: sets lkb nodeid from r */ 3234 3235 error = set_master(r, lkb); 3236 if (error < 0) 3237 goto out; 3238 if (error) { 3239 error = 0; 3240 goto out; 3241 } 3242 3243 if (is_remote(r)) { 3244 /* receive_request() calls do_request() on remote node */ 3245 error = send_request(r, lkb); 3246 } else { 3247 error = do_request(r, lkb); 3248 /* for remote locks the request_reply is sent 3249 between do_request and do_request_effects */ 3250 do_request_effects(r, lkb, error); 3251 } 3252 out: 3253 return error; 3254 } 3255 3256 /* change some property of an existing lkb, e.g. mode */ 3257 3258 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3259 { 3260 int error; 3261 3262 if (is_remote(r)) { 3263 /* receive_convert() calls do_convert() on remote node */ 3264 error = send_convert(r, lkb); 3265 } else { 3266 error = do_convert(r, lkb); 3267 /* for remote locks the convert_reply is sent 3268 between do_convert and do_convert_effects */ 3269 do_convert_effects(r, lkb, error); 3270 } 3271 3272 return error; 3273 } 3274 3275 /* remove an existing lkb from the granted queue */ 3276 3277 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3278 { 3279 int error; 3280 3281 if (is_remote(r)) { 3282 /* receive_unlock() calls do_unlock() on remote node */ 3283 error = send_unlock(r, lkb); 3284 } else { 3285 error = do_unlock(r, lkb); 3286 /* for remote locks the unlock_reply is sent 3287 between do_unlock and do_unlock_effects */ 3288 do_unlock_effects(r, lkb, error); 3289 } 3290 3291 return error; 3292 } 3293 3294 /* remove an existing lkb from the convert or wait queue */ 3295 3296 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3297 { 3298 int error; 3299 3300 if (is_remote(r)) { 3301 /* receive_cancel() calls do_cancel() on remote node */ 3302 error = send_cancel(r, lkb); 3303 } else { 3304 error = do_cancel(r, lkb); 3305 /* for remote locks the cancel_reply is sent 3306 between do_cancel and do_cancel_effects */ 3307 do_cancel_effects(r, lkb, error); 3308 } 3309 3310 return error; 3311 } 3312 3313 /* 3314 * Four stage 2 varieties: 3315 * request_lock(), convert_lock(), unlock_lock(), cancel_lock() 3316 */ 3317 3318 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, 3319 int len, struct dlm_args *args) 3320 { 3321 struct dlm_rsb *r; 3322 int error; 3323 3324 error = validate_lock_args(ls, lkb, args); 3325 if (error) 3326 return error; 3327 3328 error = find_rsb(ls, name, len, 0, R_REQUEST, &r); 3329 if (error) 3330 return error; 3331 3332 lock_rsb(r); 3333 3334 attach_lkb(r, lkb); 3335 lkb->lkb_lksb->sb_lkid = lkb->lkb_id; 3336 3337 error = _request_lock(r, lkb); 3338 3339 unlock_rsb(r); 3340 put_rsb(r); 3341 return error; 3342 } 3343 3344 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3345 struct dlm_args *args) 3346 { 3347 struct dlm_rsb *r; 3348 int error; 3349 3350 r = lkb->lkb_resource; 3351 3352 hold_rsb(r); 3353 lock_rsb(r); 3354 3355 error = validate_lock_args(ls, lkb, args); 3356 if (error) 3357 goto out; 3358 3359 error = _convert_lock(r, lkb); 3360 out: 3361 unlock_rsb(r); 3362 put_rsb(r); 3363 return error; 3364 } 3365 3366 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3367 struct dlm_args *args) 3368 { 3369 struct dlm_rsb *r; 3370 int error; 3371 3372 r = lkb->lkb_resource; 3373 3374 hold_rsb(r); 3375 lock_rsb(r); 3376 3377 error = validate_unlock_args(lkb, args); 3378 if (error) 3379 goto out; 3380 3381 error = _unlock_lock(r, lkb); 3382 out: 3383 unlock_rsb(r); 3384 put_rsb(r); 3385 return error; 3386 } 3387 3388 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3389 struct dlm_args *args) 3390 { 3391 struct dlm_rsb *r; 3392 int error; 3393 3394 r = lkb->lkb_resource; 3395 3396 hold_rsb(r); 3397 lock_rsb(r); 3398 3399 error = validate_unlock_args(lkb, args); 3400 if (error) 3401 goto out; 3402 3403 error = _cancel_lock(r, lkb); 3404 out: 3405 unlock_rsb(r); 3406 put_rsb(r); 3407 return error; 3408 } 3409 3410 /* 3411 * Two stage 1 varieties: dlm_lock() and dlm_unlock() 3412 */ 3413 3414 int dlm_lock(dlm_lockspace_t *lockspace, 3415 int mode, 3416 struct dlm_lksb *lksb, 3417 uint32_t flags, 3418 void *name, 3419 unsigned int namelen, 3420 uint32_t parent_lkid, 3421 void (*ast) (void *astarg), 3422 void *astarg, 3423 void (*bast) (void *astarg, int mode)) 3424 { 3425 struct dlm_ls *ls; 3426 struct dlm_lkb *lkb; 3427 struct dlm_args args; 3428 int error, convert = flags & DLM_LKF_CONVERT; 3429 3430 ls = dlm_find_lockspace_local(lockspace); 3431 if (!ls) 3432 return -EINVAL; 3433 3434 dlm_lock_recovery(ls); 3435 3436 if (convert) 3437 error = find_lkb(ls, lksb->sb_lkid, &lkb); 3438 else 3439 error = create_lkb(ls, &lkb); 3440 3441 if (error) 3442 goto out; 3443 3444 error = set_lock_args(mode, lksb, flags, namelen, 0, ast, 3445 astarg, bast, &args); 3446 if (error) 3447 goto out_put; 3448 3449 if (convert) 3450 error = convert_lock(ls, lkb, &args); 3451 else 3452 error = request_lock(ls, lkb, name, namelen, &args); 3453 3454 if (error == -EINPROGRESS) 3455 error = 0; 3456 out_put: 3457 if (convert || error) 3458 __put_lkb(ls, lkb); 3459 if (error == -EAGAIN || error == -EDEADLK) 3460 error = 0; 3461 out: 3462 dlm_unlock_recovery(ls); 3463 dlm_put_lockspace(ls); 3464 return error; 3465 } 3466 3467 int dlm_unlock(dlm_lockspace_t *lockspace, 3468 uint32_t lkid, 3469 uint32_t flags, 3470 struct dlm_lksb *lksb, 3471 void *astarg) 3472 { 3473 struct dlm_ls *ls; 3474 struct dlm_lkb *lkb; 3475 struct dlm_args args; 3476 int error; 3477 3478 ls = dlm_find_lockspace_local(lockspace); 3479 if (!ls) 3480 return -EINVAL; 3481 3482 dlm_lock_recovery(ls); 3483 3484 error = find_lkb(ls, lkid, &lkb); 3485 if (error) 3486 goto out; 3487 3488 error = set_unlock_args(flags, astarg, &args); 3489 if (error) 3490 goto out_put; 3491 3492 if (flags & DLM_LKF_CANCEL) 3493 error = cancel_lock(ls, lkb, &args); 3494 else 3495 error = unlock_lock(ls, lkb, &args); 3496 3497 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 3498 error = 0; 3499 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) 3500 error = 0; 3501 out_put: 3502 dlm_put_lkb(lkb); 3503 out: 3504 dlm_unlock_recovery(ls); 3505 dlm_put_lockspace(ls); 3506 return error; 3507 } 3508 3509 /* 3510 * send/receive routines for remote operations and replies 3511 * 3512 * send_args 3513 * send_common 3514 * send_request receive_request 3515 * send_convert receive_convert 3516 * send_unlock receive_unlock 3517 * send_cancel receive_cancel 3518 * send_grant receive_grant 3519 * send_bast receive_bast 3520 * send_lookup receive_lookup 3521 * send_remove receive_remove 3522 * 3523 * send_common_reply 3524 * receive_request_reply send_request_reply 3525 * receive_convert_reply send_convert_reply 3526 * receive_unlock_reply send_unlock_reply 3527 * receive_cancel_reply send_cancel_reply 3528 * receive_lookup_reply send_lookup_reply 3529 */ 3530 3531 static int _create_message(struct dlm_ls *ls, int mb_len, 3532 int to_nodeid, int mstype, 3533 struct dlm_message **ms_ret, 3534 struct dlm_mhandle **mh_ret) 3535 { 3536 struct dlm_message *ms; 3537 struct dlm_mhandle *mh; 3538 char *mb; 3539 3540 /* get_buffer gives us a message handle (mh) that we need to 3541 pass into lowcomms_commit and a message buffer (mb) that we 3542 write our data into */ 3543 3544 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb); 3545 if (!mh) 3546 return -ENOBUFS; 3547 3548 memset(mb, 0, mb_len); 3549 3550 ms = (struct dlm_message *) mb; 3551 3552 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 3553 ms->m_header.h_lockspace = ls->ls_global_id; 3554 ms->m_header.h_nodeid = dlm_our_nodeid(); 3555 ms->m_header.h_length = mb_len; 3556 ms->m_header.h_cmd = DLM_MSG; 3557 3558 ms->m_type = mstype; 3559 3560 *mh_ret = mh; 3561 *ms_ret = ms; 3562 return 0; 3563 } 3564 3565 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 3566 int to_nodeid, int mstype, 3567 struct dlm_message **ms_ret, 3568 struct dlm_mhandle **mh_ret) 3569 { 3570 int mb_len = sizeof(struct dlm_message); 3571 3572 switch (mstype) { 3573 case DLM_MSG_REQUEST: 3574 case DLM_MSG_LOOKUP: 3575 case DLM_MSG_REMOVE: 3576 mb_len += r->res_length; 3577 break; 3578 case DLM_MSG_CONVERT: 3579 case DLM_MSG_UNLOCK: 3580 case DLM_MSG_REQUEST_REPLY: 3581 case DLM_MSG_CONVERT_REPLY: 3582 case DLM_MSG_GRANT: 3583 if (lkb && lkb->lkb_lvbptr) 3584 mb_len += r->res_ls->ls_lvblen; 3585 break; 3586 } 3587 3588 return _create_message(r->res_ls, mb_len, to_nodeid, mstype, 3589 ms_ret, mh_ret); 3590 } 3591 3592 /* further lowcomms enhancements or alternate implementations may make 3593 the return value from this function useful at some point */ 3594 3595 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) 3596 { 3597 dlm_message_out(ms); 3598 dlm_lowcomms_commit_buffer(mh); 3599 return 0; 3600 } 3601 3602 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 3603 struct dlm_message *ms) 3604 { 3605 ms->m_nodeid = lkb->lkb_nodeid; 3606 ms->m_pid = lkb->lkb_ownpid; 3607 ms->m_lkid = lkb->lkb_id; 3608 ms->m_remid = lkb->lkb_remid; 3609 ms->m_exflags = lkb->lkb_exflags; 3610 ms->m_sbflags = lkb->lkb_sbflags; 3611 ms->m_flags = lkb->lkb_flags; 3612 ms->m_lvbseq = lkb->lkb_lvbseq; 3613 ms->m_status = lkb->lkb_status; 3614 ms->m_grmode = lkb->lkb_grmode; 3615 ms->m_rqmode = lkb->lkb_rqmode; 3616 ms->m_hash = r->res_hash; 3617 3618 /* m_result and m_bastmode are set from function args, 3619 not from lkb fields */ 3620 3621 if (lkb->lkb_bastfn) 3622 ms->m_asts |= DLM_CB_BAST; 3623 if (lkb->lkb_astfn) 3624 ms->m_asts |= DLM_CB_CAST; 3625 3626 /* compare with switch in create_message; send_remove() doesn't 3627 use send_args() */ 3628 3629 switch (ms->m_type) { 3630 case DLM_MSG_REQUEST: 3631 case DLM_MSG_LOOKUP: 3632 memcpy(ms->m_extra, r->res_name, r->res_length); 3633 break; 3634 case DLM_MSG_CONVERT: 3635 case DLM_MSG_UNLOCK: 3636 case DLM_MSG_REQUEST_REPLY: 3637 case DLM_MSG_CONVERT_REPLY: 3638 case DLM_MSG_GRANT: 3639 if (!lkb->lkb_lvbptr) 3640 break; 3641 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 3642 break; 3643 } 3644 } 3645 3646 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 3647 { 3648 struct dlm_message *ms; 3649 struct dlm_mhandle *mh; 3650 int to_nodeid, error; 3651 3652 to_nodeid = r->res_nodeid; 3653 3654 error = add_to_waiters(lkb, mstype, to_nodeid); 3655 if (error) 3656 return error; 3657 3658 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3659 if (error) 3660 goto fail; 3661 3662 send_args(r, lkb, ms); 3663 3664 error = send_message(mh, ms); 3665 if (error) 3666 goto fail; 3667 return 0; 3668 3669 fail: 3670 remove_from_waiters(lkb, msg_reply_type(mstype)); 3671 return error; 3672 } 3673 3674 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3675 { 3676 return send_common(r, lkb, DLM_MSG_REQUEST); 3677 } 3678 3679 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3680 { 3681 int error; 3682 3683 error = send_common(r, lkb, DLM_MSG_CONVERT); 3684 3685 /* down conversions go without a reply from the master */ 3686 if (!error && down_conversion(lkb)) { 3687 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 3688 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS; 3689 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 3690 r->res_ls->ls_stub_ms.m_result = 0; 3691 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 3692 } 3693 3694 return error; 3695 } 3696 3697 /* FIXME: if this lkb is the only lock we hold on the rsb, then set 3698 MASTER_UNCERTAIN to force the next request on the rsb to confirm 3699 that the master is still correct. */ 3700 3701 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3702 { 3703 return send_common(r, lkb, DLM_MSG_UNLOCK); 3704 } 3705 3706 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3707 { 3708 return send_common(r, lkb, DLM_MSG_CANCEL); 3709 } 3710 3711 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 3712 { 3713 struct dlm_message *ms; 3714 struct dlm_mhandle *mh; 3715 int to_nodeid, error; 3716 3717 to_nodeid = lkb->lkb_nodeid; 3718 3719 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); 3720 if (error) 3721 goto out; 3722 3723 send_args(r, lkb, ms); 3724 3725 ms->m_result = 0; 3726 3727 error = send_message(mh, ms); 3728 out: 3729 return error; 3730 } 3731 3732 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 3733 { 3734 struct dlm_message *ms; 3735 struct dlm_mhandle *mh; 3736 int to_nodeid, error; 3737 3738 to_nodeid = lkb->lkb_nodeid; 3739 3740 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); 3741 if (error) 3742 goto out; 3743 3744 send_args(r, lkb, ms); 3745 3746 ms->m_bastmode = mode; 3747 3748 error = send_message(mh, ms); 3749 out: 3750 return error; 3751 } 3752 3753 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 3754 { 3755 struct dlm_message *ms; 3756 struct dlm_mhandle *mh; 3757 int to_nodeid, error; 3758 3759 to_nodeid = dlm_dir_nodeid(r); 3760 3761 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); 3762 if (error) 3763 return error; 3764 3765 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 3766 if (error) 3767 goto fail; 3768 3769 send_args(r, lkb, ms); 3770 3771 error = send_message(mh, ms); 3772 if (error) 3773 goto fail; 3774 return 0; 3775 3776 fail: 3777 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 3778 return error; 3779 } 3780 3781 static int send_remove(struct dlm_rsb *r) 3782 { 3783 struct dlm_message *ms; 3784 struct dlm_mhandle *mh; 3785 int to_nodeid, error; 3786 3787 to_nodeid = dlm_dir_nodeid(r); 3788 3789 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); 3790 if (error) 3791 goto out; 3792 3793 memcpy(ms->m_extra, r->res_name, r->res_length); 3794 ms->m_hash = r->res_hash; 3795 3796 error = send_message(mh, ms); 3797 out: 3798 return error; 3799 } 3800 3801 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3802 int mstype, int rv) 3803 { 3804 struct dlm_message *ms; 3805 struct dlm_mhandle *mh; 3806 int to_nodeid, error; 3807 3808 to_nodeid = lkb->lkb_nodeid; 3809 3810 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3811 if (error) 3812 goto out; 3813 3814 send_args(r, lkb, ms); 3815 3816 ms->m_result = rv; 3817 3818 error = send_message(mh, ms); 3819 out: 3820 return error; 3821 } 3822 3823 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3824 { 3825 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv); 3826 } 3827 3828 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3829 { 3830 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 3831 } 3832 3833 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3834 { 3835 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 3836 } 3837 3838 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3839 { 3840 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 3841 } 3842 3843 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, 3844 int ret_nodeid, int rv) 3845 { 3846 struct dlm_rsb *r = &ls->ls_stub_rsb; 3847 struct dlm_message *ms; 3848 struct dlm_mhandle *mh; 3849 int error, nodeid = ms_in->m_header.h_nodeid; 3850 3851 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); 3852 if (error) 3853 goto out; 3854 3855 ms->m_lkid = ms_in->m_lkid; 3856 ms->m_result = rv; 3857 ms->m_nodeid = ret_nodeid; 3858 3859 error = send_message(mh, ms); 3860 out: 3861 return error; 3862 } 3863 3864 /* which args we save from a received message depends heavily on the type 3865 of message, unlike the send side where we can safely send everything about 3866 the lkb for any type of message */ 3867 3868 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 3869 { 3870 lkb->lkb_exflags = ms->m_exflags; 3871 lkb->lkb_sbflags = ms->m_sbflags; 3872 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3873 (ms->m_flags & 0x0000FFFF); 3874 } 3875 3876 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3877 { 3878 if (ms->m_flags == DLM_IFL_STUB_MS) 3879 return; 3880 3881 lkb->lkb_sbflags = ms->m_sbflags; 3882 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3883 (ms->m_flags & 0x0000FFFF); 3884 } 3885 3886 static int receive_extralen(struct dlm_message *ms) 3887 { 3888 return (ms->m_header.h_length - sizeof(struct dlm_message)); 3889 } 3890 3891 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 3892 struct dlm_message *ms) 3893 { 3894 int len; 3895 3896 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3897 if (!lkb->lkb_lvbptr) 3898 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3899 if (!lkb->lkb_lvbptr) 3900 return -ENOMEM; 3901 len = receive_extralen(ms); 3902 if (len > DLM_RESNAME_MAXLEN) 3903 len = DLM_RESNAME_MAXLEN; 3904 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 3905 } 3906 return 0; 3907 } 3908 3909 static void fake_bastfn(void *astparam, int mode) 3910 { 3911 log_print("fake_bastfn should not be called"); 3912 } 3913 3914 static void fake_astfn(void *astparam) 3915 { 3916 log_print("fake_astfn should not be called"); 3917 } 3918 3919 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3920 struct dlm_message *ms) 3921 { 3922 lkb->lkb_nodeid = ms->m_header.h_nodeid; 3923 lkb->lkb_ownpid = ms->m_pid; 3924 lkb->lkb_remid = ms->m_lkid; 3925 lkb->lkb_grmode = DLM_LOCK_IV; 3926 lkb->lkb_rqmode = ms->m_rqmode; 3927 3928 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 3929 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL; 3930 3931 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3932 /* lkb was just created so there won't be an lvb yet */ 3933 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3934 if (!lkb->lkb_lvbptr) 3935 return -ENOMEM; 3936 } 3937 3938 return 0; 3939 } 3940 3941 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3942 struct dlm_message *ms) 3943 { 3944 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3945 return -EBUSY; 3946 3947 if (receive_lvb(ls, lkb, ms)) 3948 return -ENOMEM; 3949 3950 lkb->lkb_rqmode = ms->m_rqmode; 3951 lkb->lkb_lvbseq = ms->m_lvbseq; 3952 3953 return 0; 3954 } 3955 3956 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3957 struct dlm_message *ms) 3958 { 3959 if (receive_lvb(ls, lkb, ms)) 3960 return -ENOMEM; 3961 return 0; 3962 } 3963 3964 /* We fill in the stub-lkb fields with the info that send_xxxx_reply() 3965 uses to send a reply and that the remote end uses to process the reply. */ 3966 3967 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) 3968 { 3969 struct dlm_lkb *lkb = &ls->ls_stub_lkb; 3970 lkb->lkb_nodeid = ms->m_header.h_nodeid; 3971 lkb->lkb_remid = ms->m_lkid; 3972 } 3973 3974 /* This is called after the rsb is locked so that we can safely inspect 3975 fields in the lkb. */ 3976 3977 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) 3978 { 3979 int from = ms->m_header.h_nodeid; 3980 int error = 0; 3981 3982 switch (ms->m_type) { 3983 case DLM_MSG_CONVERT: 3984 case DLM_MSG_UNLOCK: 3985 case DLM_MSG_CANCEL: 3986 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) 3987 error = -EINVAL; 3988 break; 3989 3990 case DLM_MSG_CONVERT_REPLY: 3991 case DLM_MSG_UNLOCK_REPLY: 3992 case DLM_MSG_CANCEL_REPLY: 3993 case DLM_MSG_GRANT: 3994 case DLM_MSG_BAST: 3995 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) 3996 error = -EINVAL; 3997 break; 3998 3999 case DLM_MSG_REQUEST_REPLY: 4000 if (!is_process_copy(lkb)) 4001 error = -EINVAL; 4002 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) 4003 error = -EINVAL; 4004 break; 4005 4006 default: 4007 error = -EINVAL; 4008 } 4009 4010 if (error) 4011 log_error(lkb->lkb_resource->res_ls, 4012 "ignore invalid message %d from %d %x %x %x %d", 4013 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, 4014 lkb->lkb_flags, lkb->lkb_nodeid); 4015 return error; 4016 } 4017 4018 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) 4019 { 4020 char name[DLM_RESNAME_MAXLEN + 1]; 4021 struct dlm_message *ms; 4022 struct dlm_mhandle *mh; 4023 struct dlm_rsb *r; 4024 uint32_t hash, b; 4025 int rv, dir_nodeid; 4026 4027 memset(name, 0, sizeof(name)); 4028 memcpy(name, ms_name, len); 4029 4030 hash = jhash(name, len, 0); 4031 b = hash & (ls->ls_rsbtbl_size - 1); 4032 4033 dir_nodeid = dlm_hash2nodeid(ls, hash); 4034 4035 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name); 4036 4037 spin_lock(&ls->ls_rsbtbl[b].lock); 4038 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4039 if (!rv) { 4040 spin_unlock(&ls->ls_rsbtbl[b].lock); 4041 log_error(ls, "repeat_remove on keep %s", name); 4042 return; 4043 } 4044 4045 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4046 if (!rv) { 4047 spin_unlock(&ls->ls_rsbtbl[b].lock); 4048 log_error(ls, "repeat_remove on toss %s", name); 4049 return; 4050 } 4051 4052 /* use ls->remove_name2 to avoid conflict with shrink? */ 4053 4054 spin_lock(&ls->ls_remove_spin); 4055 ls->ls_remove_len = len; 4056 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); 4057 spin_unlock(&ls->ls_remove_spin); 4058 spin_unlock(&ls->ls_rsbtbl[b].lock); 4059 4060 rv = _create_message(ls, sizeof(struct dlm_message) + len, 4061 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); 4062 if (rv) 4063 return; 4064 4065 memcpy(ms->m_extra, name, len); 4066 ms->m_hash = hash; 4067 4068 send_message(mh, ms); 4069 4070 spin_lock(&ls->ls_remove_spin); 4071 ls->ls_remove_len = 0; 4072 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); 4073 spin_unlock(&ls->ls_remove_spin); 4074 } 4075 4076 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) 4077 { 4078 struct dlm_lkb *lkb; 4079 struct dlm_rsb *r; 4080 int from_nodeid; 4081 int error, namelen = 0; 4082 4083 from_nodeid = ms->m_header.h_nodeid; 4084 4085 error = create_lkb(ls, &lkb); 4086 if (error) 4087 goto fail; 4088 4089 receive_flags(lkb, ms); 4090 lkb->lkb_flags |= DLM_IFL_MSTCPY; 4091 error = receive_request_args(ls, lkb, ms); 4092 if (error) { 4093 __put_lkb(ls, lkb); 4094 goto fail; 4095 } 4096 4097 /* The dir node is the authority on whether we are the master 4098 for this rsb or not, so if the master sends us a request, we should 4099 recreate the rsb if we've destroyed it. This race happens when we 4100 send a remove message to the dir node at the same time that the dir 4101 node sends us a request for the rsb. */ 4102 4103 namelen = receive_extralen(ms); 4104 4105 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid, 4106 R_RECEIVE_REQUEST, &r); 4107 if (error) { 4108 __put_lkb(ls, lkb); 4109 goto fail; 4110 } 4111 4112 lock_rsb(r); 4113 4114 if (r->res_master_nodeid != dlm_our_nodeid()) { 4115 error = validate_master_nodeid(ls, r, from_nodeid); 4116 if (error) { 4117 unlock_rsb(r); 4118 put_rsb(r); 4119 __put_lkb(ls, lkb); 4120 goto fail; 4121 } 4122 } 4123 4124 attach_lkb(r, lkb); 4125 error = do_request(r, lkb); 4126 send_request_reply(r, lkb, error); 4127 do_request_effects(r, lkb, error); 4128 4129 unlock_rsb(r); 4130 put_rsb(r); 4131 4132 if (error == -EINPROGRESS) 4133 error = 0; 4134 if (error) 4135 dlm_put_lkb(lkb); 4136 return 0; 4137 4138 fail: 4139 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup 4140 and do this receive_request again from process_lookup_list once 4141 we get the lookup reply. This would avoid a many repeated 4142 ENOTBLK request failures when the lookup reply designating us 4143 as master is delayed. */ 4144 4145 /* We could repeatedly return -EBADR here if our send_remove() is 4146 delayed in being sent/arriving/being processed on the dir node. 4147 Another node would repeatedly lookup up the master, and the dir 4148 node would continue returning our nodeid until our send_remove 4149 took effect. 4150 4151 We send another remove message in case our previous send_remove 4152 was lost/ignored/missed somehow. */ 4153 4154 if (error != -ENOTBLK) { 4155 log_limit(ls, "receive_request %x from %d %d", 4156 ms->m_lkid, from_nodeid, error); 4157 } 4158 4159 if (namelen && error == -EBADR) { 4160 send_repeat_remove(ls, ms->m_extra, namelen); 4161 msleep(1000); 4162 } 4163 4164 setup_stub_lkb(ls, ms); 4165 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4166 return error; 4167 } 4168 4169 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 4170 { 4171 struct dlm_lkb *lkb; 4172 struct dlm_rsb *r; 4173 int error, reply = 1; 4174 4175 error = find_lkb(ls, ms->m_remid, &lkb); 4176 if (error) 4177 goto fail; 4178 4179 if (lkb->lkb_remid != ms->m_lkid) { 4180 log_error(ls, "receive_convert %x remid %x recover_seq %llu " 4181 "remote %d %x", lkb->lkb_id, lkb->lkb_remid, 4182 (unsigned long long)lkb->lkb_recover_seq, 4183 ms->m_header.h_nodeid, ms->m_lkid); 4184 error = -ENOENT; 4185 goto fail; 4186 } 4187 4188 r = lkb->lkb_resource; 4189 4190 hold_rsb(r); 4191 lock_rsb(r); 4192 4193 error = validate_message(lkb, ms); 4194 if (error) 4195 goto out; 4196 4197 receive_flags(lkb, ms); 4198 4199 error = receive_convert_args(ls, lkb, ms); 4200 if (error) { 4201 send_convert_reply(r, lkb, error); 4202 goto out; 4203 } 4204 4205 reply = !down_conversion(lkb); 4206 4207 error = do_convert(r, lkb); 4208 if (reply) 4209 send_convert_reply(r, lkb, error); 4210 do_convert_effects(r, lkb, error); 4211 out: 4212 unlock_rsb(r); 4213 put_rsb(r); 4214 dlm_put_lkb(lkb); 4215 return 0; 4216 4217 fail: 4218 setup_stub_lkb(ls, ms); 4219 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4220 return error; 4221 } 4222 4223 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 4224 { 4225 struct dlm_lkb *lkb; 4226 struct dlm_rsb *r; 4227 int error; 4228 4229 error = find_lkb(ls, ms->m_remid, &lkb); 4230 if (error) 4231 goto fail; 4232 4233 if (lkb->lkb_remid != ms->m_lkid) { 4234 log_error(ls, "receive_unlock %x remid %x remote %d %x", 4235 lkb->lkb_id, lkb->lkb_remid, 4236 ms->m_header.h_nodeid, ms->m_lkid); 4237 error = -ENOENT; 4238 goto fail; 4239 } 4240 4241 r = lkb->lkb_resource; 4242 4243 hold_rsb(r); 4244 lock_rsb(r); 4245 4246 error = validate_message(lkb, ms); 4247 if (error) 4248 goto out; 4249 4250 receive_flags(lkb, ms); 4251 4252 error = receive_unlock_args(ls, lkb, ms); 4253 if (error) { 4254 send_unlock_reply(r, lkb, error); 4255 goto out; 4256 } 4257 4258 error = do_unlock(r, lkb); 4259 send_unlock_reply(r, lkb, error); 4260 do_unlock_effects(r, lkb, error); 4261 out: 4262 unlock_rsb(r); 4263 put_rsb(r); 4264 dlm_put_lkb(lkb); 4265 return 0; 4266 4267 fail: 4268 setup_stub_lkb(ls, ms); 4269 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4270 return error; 4271 } 4272 4273 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 4274 { 4275 struct dlm_lkb *lkb; 4276 struct dlm_rsb *r; 4277 int error; 4278 4279 error = find_lkb(ls, ms->m_remid, &lkb); 4280 if (error) 4281 goto fail; 4282 4283 receive_flags(lkb, ms); 4284 4285 r = lkb->lkb_resource; 4286 4287 hold_rsb(r); 4288 lock_rsb(r); 4289 4290 error = validate_message(lkb, ms); 4291 if (error) 4292 goto out; 4293 4294 error = do_cancel(r, lkb); 4295 send_cancel_reply(r, lkb, error); 4296 do_cancel_effects(r, lkb, error); 4297 out: 4298 unlock_rsb(r); 4299 put_rsb(r); 4300 dlm_put_lkb(lkb); 4301 return 0; 4302 4303 fail: 4304 setup_stub_lkb(ls, ms); 4305 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4306 return error; 4307 } 4308 4309 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 4310 { 4311 struct dlm_lkb *lkb; 4312 struct dlm_rsb *r; 4313 int error; 4314 4315 error = find_lkb(ls, ms->m_remid, &lkb); 4316 if (error) 4317 return error; 4318 4319 r = lkb->lkb_resource; 4320 4321 hold_rsb(r); 4322 lock_rsb(r); 4323 4324 error = validate_message(lkb, ms); 4325 if (error) 4326 goto out; 4327 4328 receive_flags_reply(lkb, ms); 4329 if (is_altmode(lkb)) 4330 munge_altmode(lkb, ms); 4331 grant_lock_pc(r, lkb, ms); 4332 queue_cast(r, lkb, 0); 4333 out: 4334 unlock_rsb(r); 4335 put_rsb(r); 4336 dlm_put_lkb(lkb); 4337 return 0; 4338 } 4339 4340 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 4341 { 4342 struct dlm_lkb *lkb; 4343 struct dlm_rsb *r; 4344 int error; 4345 4346 error = find_lkb(ls, ms->m_remid, &lkb); 4347 if (error) 4348 return error; 4349 4350 r = lkb->lkb_resource; 4351 4352 hold_rsb(r); 4353 lock_rsb(r); 4354 4355 error = validate_message(lkb, ms); 4356 if (error) 4357 goto out; 4358 4359 queue_bast(r, lkb, ms->m_bastmode); 4360 lkb->lkb_highbast = ms->m_bastmode; 4361 out: 4362 unlock_rsb(r); 4363 put_rsb(r); 4364 dlm_put_lkb(lkb); 4365 return 0; 4366 } 4367 4368 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 4369 { 4370 int len, error, ret_nodeid, from_nodeid, our_nodeid; 4371 4372 from_nodeid = ms->m_header.h_nodeid; 4373 our_nodeid = dlm_our_nodeid(); 4374 4375 len = receive_extralen(ms); 4376 4377 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, 4378 &ret_nodeid, NULL); 4379 4380 /* Optimization: we're master so treat lookup as a request */ 4381 if (!error && ret_nodeid == our_nodeid) { 4382 receive_request(ls, ms); 4383 return; 4384 } 4385 send_lookup_reply(ls, ms, ret_nodeid, error); 4386 } 4387 4388 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 4389 { 4390 char name[DLM_RESNAME_MAXLEN+1]; 4391 struct dlm_rsb *r; 4392 uint32_t hash, b; 4393 int rv, len, dir_nodeid, from_nodeid; 4394 4395 from_nodeid = ms->m_header.h_nodeid; 4396 4397 len = receive_extralen(ms); 4398 4399 if (len > DLM_RESNAME_MAXLEN) { 4400 log_error(ls, "receive_remove from %d bad len %d", 4401 from_nodeid, len); 4402 return; 4403 } 4404 4405 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 4406 if (dir_nodeid != dlm_our_nodeid()) { 4407 log_error(ls, "receive_remove from %d bad nodeid %d", 4408 from_nodeid, dir_nodeid); 4409 return; 4410 } 4411 4412 /* Look for name on rsbtbl.toss, if it's there, kill it. 4413 If it's on rsbtbl.keep, it's being used, and we should ignore this 4414 message. This is an expected race between the dir node sending a 4415 request to the master node at the same time as the master node sends 4416 a remove to the dir node. The resolution to that race is for the 4417 dir node to ignore the remove message, and the master node to 4418 recreate the master rsb when it gets a request from the dir node for 4419 an rsb it doesn't have. */ 4420 4421 memset(name, 0, sizeof(name)); 4422 memcpy(name, ms->m_extra, len); 4423 4424 hash = jhash(name, len, 0); 4425 b = hash & (ls->ls_rsbtbl_size - 1); 4426 4427 spin_lock(&ls->ls_rsbtbl[b].lock); 4428 4429 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4430 if (rv) { 4431 /* verify the rsb is on keep list per comment above */ 4432 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4433 if (rv) { 4434 /* should not happen */ 4435 log_error(ls, "receive_remove from %d not found %s", 4436 from_nodeid, name); 4437 spin_unlock(&ls->ls_rsbtbl[b].lock); 4438 return; 4439 } 4440 if (r->res_master_nodeid != from_nodeid) { 4441 /* should not happen */ 4442 log_error(ls, "receive_remove keep from %d master %d", 4443 from_nodeid, r->res_master_nodeid); 4444 dlm_print_rsb(r); 4445 spin_unlock(&ls->ls_rsbtbl[b].lock); 4446 return; 4447 } 4448 4449 log_debug(ls, "receive_remove from %d master %d first %x %s", 4450 from_nodeid, r->res_master_nodeid, r->res_first_lkid, 4451 name); 4452 spin_unlock(&ls->ls_rsbtbl[b].lock); 4453 return; 4454 } 4455 4456 if (r->res_master_nodeid != from_nodeid) { 4457 log_error(ls, "receive_remove toss from %d master %d", 4458 from_nodeid, r->res_master_nodeid); 4459 dlm_print_rsb(r); 4460 spin_unlock(&ls->ls_rsbtbl[b].lock); 4461 return; 4462 } 4463 4464 if (kref_put(&r->res_ref, kill_rsb)) { 4465 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 4466 spin_unlock(&ls->ls_rsbtbl[b].lock); 4467 dlm_free_rsb(r); 4468 } else { 4469 log_error(ls, "receive_remove from %d rsb ref error", 4470 from_nodeid); 4471 dlm_print_rsb(r); 4472 spin_unlock(&ls->ls_rsbtbl[b].lock); 4473 } 4474 } 4475 4476 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 4477 { 4478 do_purge(ls, ms->m_nodeid, ms->m_pid); 4479 } 4480 4481 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 4482 { 4483 struct dlm_lkb *lkb; 4484 struct dlm_rsb *r; 4485 int error, mstype, result; 4486 int from_nodeid = ms->m_header.h_nodeid; 4487 4488 error = find_lkb(ls, ms->m_remid, &lkb); 4489 if (error) 4490 return error; 4491 4492 r = lkb->lkb_resource; 4493 hold_rsb(r); 4494 lock_rsb(r); 4495 4496 error = validate_message(lkb, ms); 4497 if (error) 4498 goto out; 4499 4500 mstype = lkb->lkb_wait_type; 4501 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4502 if (error) { 4503 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4504 lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result); 4505 dlm_dump_rsb(r); 4506 goto out; 4507 } 4508 4509 /* Optimization: the dir node was also the master, so it took our 4510 lookup as a request and sent request reply instead of lookup reply */ 4511 if (mstype == DLM_MSG_LOOKUP) { 4512 r->res_master_nodeid = from_nodeid; 4513 r->res_nodeid = from_nodeid; 4514 lkb->lkb_nodeid = from_nodeid; 4515 } 4516 4517 /* this is the value returned from do_request() on the master */ 4518 result = ms->m_result; 4519 4520 switch (result) { 4521 case -EAGAIN: 4522 /* request would block (be queued) on remote master */ 4523 queue_cast(r, lkb, -EAGAIN); 4524 confirm_master(r, -EAGAIN); 4525 unhold_lkb(lkb); /* undoes create_lkb() */ 4526 break; 4527 4528 case -EINPROGRESS: 4529 case 0: 4530 /* request was queued or granted on remote master */ 4531 receive_flags_reply(lkb, ms); 4532 lkb->lkb_remid = ms->m_lkid; 4533 if (is_altmode(lkb)) 4534 munge_altmode(lkb, ms); 4535 if (result) { 4536 add_lkb(r, lkb, DLM_LKSTS_WAITING); 4537 add_timeout(lkb); 4538 } else { 4539 grant_lock_pc(r, lkb, ms); 4540 queue_cast(r, lkb, 0); 4541 } 4542 confirm_master(r, result); 4543 break; 4544 4545 case -EBADR: 4546 case -ENOTBLK: 4547 /* find_rsb failed to find rsb or rsb wasn't master */ 4548 log_limit(ls, "receive_request_reply %x from %d %d " 4549 "master %d dir %d first %x %s", lkb->lkb_id, 4550 from_nodeid, result, r->res_master_nodeid, 4551 r->res_dir_nodeid, r->res_first_lkid, r->res_name); 4552 4553 if (r->res_dir_nodeid != dlm_our_nodeid() && 4554 r->res_master_nodeid != dlm_our_nodeid()) { 4555 /* cause _request_lock->set_master->send_lookup */ 4556 r->res_master_nodeid = 0; 4557 r->res_nodeid = -1; 4558 lkb->lkb_nodeid = -1; 4559 } 4560 4561 if (is_overlap(lkb)) { 4562 /* we'll ignore error in cancel/unlock reply */ 4563 queue_cast_overlap(r, lkb); 4564 confirm_master(r, result); 4565 unhold_lkb(lkb); /* undoes create_lkb() */ 4566 } else { 4567 _request_lock(r, lkb); 4568 4569 if (r->res_master_nodeid == dlm_our_nodeid()) 4570 confirm_master(r, 0); 4571 } 4572 break; 4573 4574 default: 4575 log_error(ls, "receive_request_reply %x error %d", 4576 lkb->lkb_id, result); 4577 } 4578 4579 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { 4580 log_debug(ls, "receive_request_reply %x result %d unlock", 4581 lkb->lkb_id, result); 4582 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4583 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4584 send_unlock(r, lkb); 4585 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { 4586 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); 4587 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4588 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4589 send_cancel(r, lkb); 4590 } else { 4591 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4592 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4593 } 4594 out: 4595 unlock_rsb(r); 4596 put_rsb(r); 4597 dlm_put_lkb(lkb); 4598 return 0; 4599 } 4600 4601 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 4602 struct dlm_message *ms) 4603 { 4604 /* this is the value returned from do_convert() on the master */ 4605 switch (ms->m_result) { 4606 case -EAGAIN: 4607 /* convert would block (be queued) on remote master */ 4608 queue_cast(r, lkb, -EAGAIN); 4609 break; 4610 4611 case -EDEADLK: 4612 receive_flags_reply(lkb, ms); 4613 revert_lock_pc(r, lkb); 4614 queue_cast(r, lkb, -EDEADLK); 4615 break; 4616 4617 case -EINPROGRESS: 4618 /* convert was queued on remote master */ 4619 receive_flags_reply(lkb, ms); 4620 if (is_demoted(lkb)) 4621 munge_demoted(lkb); 4622 del_lkb(r, lkb); 4623 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 4624 add_timeout(lkb); 4625 break; 4626 4627 case 0: 4628 /* convert was granted on remote master */ 4629 receive_flags_reply(lkb, ms); 4630 if (is_demoted(lkb)) 4631 munge_demoted(lkb); 4632 grant_lock_pc(r, lkb, ms); 4633 queue_cast(r, lkb, 0); 4634 break; 4635 4636 default: 4637 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", 4638 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, 4639 ms->m_result); 4640 dlm_print_rsb(r); 4641 dlm_print_lkb(lkb); 4642 } 4643 } 4644 4645 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4646 { 4647 struct dlm_rsb *r = lkb->lkb_resource; 4648 int error; 4649 4650 hold_rsb(r); 4651 lock_rsb(r); 4652 4653 error = validate_message(lkb, ms); 4654 if (error) 4655 goto out; 4656 4657 /* stub reply can happen with waiters_mutex held */ 4658 error = remove_from_waiters_ms(lkb, ms); 4659 if (error) 4660 goto out; 4661 4662 __receive_convert_reply(r, lkb, ms); 4663 out: 4664 unlock_rsb(r); 4665 put_rsb(r); 4666 } 4667 4668 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 4669 { 4670 struct dlm_lkb *lkb; 4671 int error; 4672 4673 error = find_lkb(ls, ms->m_remid, &lkb); 4674 if (error) 4675 return error; 4676 4677 _receive_convert_reply(lkb, ms); 4678 dlm_put_lkb(lkb); 4679 return 0; 4680 } 4681 4682 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4683 { 4684 struct dlm_rsb *r = lkb->lkb_resource; 4685 int error; 4686 4687 hold_rsb(r); 4688 lock_rsb(r); 4689 4690 error = validate_message(lkb, ms); 4691 if (error) 4692 goto out; 4693 4694 /* stub reply can happen with waiters_mutex held */ 4695 error = remove_from_waiters_ms(lkb, ms); 4696 if (error) 4697 goto out; 4698 4699 /* this is the value returned from do_unlock() on the master */ 4700 4701 switch (ms->m_result) { 4702 case -DLM_EUNLOCK: 4703 receive_flags_reply(lkb, ms); 4704 remove_lock_pc(r, lkb); 4705 queue_cast(r, lkb, -DLM_EUNLOCK); 4706 break; 4707 case -ENOENT: 4708 break; 4709 default: 4710 log_error(r->res_ls, "receive_unlock_reply %x error %d", 4711 lkb->lkb_id, ms->m_result); 4712 } 4713 out: 4714 unlock_rsb(r); 4715 put_rsb(r); 4716 } 4717 4718 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 4719 { 4720 struct dlm_lkb *lkb; 4721 int error; 4722 4723 error = find_lkb(ls, ms->m_remid, &lkb); 4724 if (error) 4725 return error; 4726 4727 _receive_unlock_reply(lkb, ms); 4728 dlm_put_lkb(lkb); 4729 return 0; 4730 } 4731 4732 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4733 { 4734 struct dlm_rsb *r = lkb->lkb_resource; 4735 int error; 4736 4737 hold_rsb(r); 4738 lock_rsb(r); 4739 4740 error = validate_message(lkb, ms); 4741 if (error) 4742 goto out; 4743 4744 /* stub reply can happen with waiters_mutex held */ 4745 error = remove_from_waiters_ms(lkb, ms); 4746 if (error) 4747 goto out; 4748 4749 /* this is the value returned from do_cancel() on the master */ 4750 4751 switch (ms->m_result) { 4752 case -DLM_ECANCEL: 4753 receive_flags_reply(lkb, ms); 4754 revert_lock_pc(r, lkb); 4755 queue_cast(r, lkb, -DLM_ECANCEL); 4756 break; 4757 case 0: 4758 break; 4759 default: 4760 log_error(r->res_ls, "receive_cancel_reply %x error %d", 4761 lkb->lkb_id, ms->m_result); 4762 } 4763 out: 4764 unlock_rsb(r); 4765 put_rsb(r); 4766 } 4767 4768 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 4769 { 4770 struct dlm_lkb *lkb; 4771 int error; 4772 4773 error = find_lkb(ls, ms->m_remid, &lkb); 4774 if (error) 4775 return error; 4776 4777 _receive_cancel_reply(lkb, ms); 4778 dlm_put_lkb(lkb); 4779 return 0; 4780 } 4781 4782 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 4783 { 4784 struct dlm_lkb *lkb; 4785 struct dlm_rsb *r; 4786 int error, ret_nodeid; 4787 int do_lookup_list = 0; 4788 4789 error = find_lkb(ls, ms->m_lkid, &lkb); 4790 if (error) { 4791 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid); 4792 return; 4793 } 4794 4795 /* ms->m_result is the value returned by dlm_master_lookup on dir node 4796 FIXME: will a non-zero error ever be returned? */ 4797 4798 r = lkb->lkb_resource; 4799 hold_rsb(r); 4800 lock_rsb(r); 4801 4802 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 4803 if (error) 4804 goto out; 4805 4806 ret_nodeid = ms->m_nodeid; 4807 4808 /* We sometimes receive a request from the dir node for this 4809 rsb before we've received the dir node's loookup_reply for it. 4810 The request from the dir node implies we're the master, so we set 4811 ourself as master in receive_request_reply, and verify here that 4812 we are indeed the master. */ 4813 4814 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { 4815 /* This should never happen */ 4816 log_error(ls, "receive_lookup_reply %x from %d ret %d " 4817 "master %d dir %d our %d first %x %s", 4818 lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid, 4819 r->res_master_nodeid, r->res_dir_nodeid, 4820 dlm_our_nodeid(), r->res_first_lkid, r->res_name); 4821 } 4822 4823 if (ret_nodeid == dlm_our_nodeid()) { 4824 r->res_master_nodeid = ret_nodeid; 4825 r->res_nodeid = 0; 4826 do_lookup_list = 1; 4827 r->res_first_lkid = 0; 4828 } else if (ret_nodeid == -1) { 4829 /* the remote node doesn't believe it's the dir node */ 4830 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", 4831 lkb->lkb_id, ms->m_header.h_nodeid); 4832 r->res_master_nodeid = 0; 4833 r->res_nodeid = -1; 4834 lkb->lkb_nodeid = -1; 4835 } else { 4836 /* set_master() will set lkb_nodeid from r */ 4837 r->res_master_nodeid = ret_nodeid; 4838 r->res_nodeid = ret_nodeid; 4839 } 4840 4841 if (is_overlap(lkb)) { 4842 log_debug(ls, "receive_lookup_reply %x unlock %x", 4843 lkb->lkb_id, lkb->lkb_flags); 4844 queue_cast_overlap(r, lkb); 4845 unhold_lkb(lkb); /* undoes create_lkb() */ 4846 goto out_list; 4847 } 4848 4849 _request_lock(r, lkb); 4850 4851 out_list: 4852 if (do_lookup_list) 4853 process_lookup_list(r); 4854 out: 4855 unlock_rsb(r); 4856 put_rsb(r); 4857 dlm_put_lkb(lkb); 4858 } 4859 4860 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4861 uint32_t saved_seq) 4862 { 4863 int error = 0, noent = 0; 4864 4865 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 4866 log_limit(ls, "receive %d from non-member %d %x %x %d", 4867 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4868 ms->m_remid, ms->m_result); 4869 return; 4870 } 4871 4872 switch (ms->m_type) { 4873 4874 /* messages sent to a master node */ 4875 4876 case DLM_MSG_REQUEST: 4877 error = receive_request(ls, ms); 4878 break; 4879 4880 case DLM_MSG_CONVERT: 4881 error = receive_convert(ls, ms); 4882 break; 4883 4884 case DLM_MSG_UNLOCK: 4885 error = receive_unlock(ls, ms); 4886 break; 4887 4888 case DLM_MSG_CANCEL: 4889 noent = 1; 4890 error = receive_cancel(ls, ms); 4891 break; 4892 4893 /* messages sent from a master node (replies to above) */ 4894 4895 case DLM_MSG_REQUEST_REPLY: 4896 error = receive_request_reply(ls, ms); 4897 break; 4898 4899 case DLM_MSG_CONVERT_REPLY: 4900 error = receive_convert_reply(ls, ms); 4901 break; 4902 4903 case DLM_MSG_UNLOCK_REPLY: 4904 error = receive_unlock_reply(ls, ms); 4905 break; 4906 4907 case DLM_MSG_CANCEL_REPLY: 4908 error = receive_cancel_reply(ls, ms); 4909 break; 4910 4911 /* messages sent from a master node (only two types of async msg) */ 4912 4913 case DLM_MSG_GRANT: 4914 noent = 1; 4915 error = receive_grant(ls, ms); 4916 break; 4917 4918 case DLM_MSG_BAST: 4919 noent = 1; 4920 error = receive_bast(ls, ms); 4921 break; 4922 4923 /* messages sent to a dir node */ 4924 4925 case DLM_MSG_LOOKUP: 4926 receive_lookup(ls, ms); 4927 break; 4928 4929 case DLM_MSG_REMOVE: 4930 receive_remove(ls, ms); 4931 break; 4932 4933 /* messages sent from a dir node (remove has no reply) */ 4934 4935 case DLM_MSG_LOOKUP_REPLY: 4936 receive_lookup_reply(ls, ms); 4937 break; 4938 4939 /* other messages */ 4940 4941 case DLM_MSG_PURGE: 4942 receive_purge(ls, ms); 4943 break; 4944 4945 default: 4946 log_error(ls, "unknown message type %d", ms->m_type); 4947 } 4948 4949 /* 4950 * When checking for ENOENT, we're checking the result of 4951 * find_lkb(m_remid): 4952 * 4953 * The lock id referenced in the message wasn't found. This may 4954 * happen in normal usage for the async messages and cancel, so 4955 * only use log_debug for them. 4956 * 4957 * Some errors are expected and normal. 4958 */ 4959 4960 if (error == -ENOENT && noent) { 4961 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", 4962 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4963 ms->m_lkid, saved_seq); 4964 } else if (error == -ENOENT) { 4965 log_error(ls, "receive %d no %x remote %d %x saved_seq %u", 4966 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4967 ms->m_lkid, saved_seq); 4968 4969 if (ms->m_type == DLM_MSG_CONVERT) 4970 dlm_dump_rsb_hash(ls, ms->m_hash); 4971 } 4972 4973 if (error == -EINVAL) { 4974 log_error(ls, "receive %d inval from %d lkid %x remid %x " 4975 "saved_seq %u", 4976 ms->m_type, ms->m_header.h_nodeid, 4977 ms->m_lkid, ms->m_remid, saved_seq); 4978 } 4979 } 4980 4981 /* If the lockspace is in recovery mode (locking stopped), then normal 4982 messages are saved on the requestqueue for processing after recovery is 4983 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 4984 messages off the requestqueue before we process new ones. This occurs right 4985 after recovery completes when we transition from saving all messages on 4986 requestqueue, to processing all the saved messages, to processing new 4987 messages as they arrive. */ 4988 4989 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4990 int nodeid) 4991 { 4992 if (dlm_locking_stopped(ls)) { 4993 /* If we were a member of this lockspace, left, and rejoined, 4994 other nodes may still be sending us messages from the 4995 lockspace generation before we left. */ 4996 if (!ls->ls_generation) { 4997 log_limit(ls, "receive %d from %d ignore old gen", 4998 ms->m_type, nodeid); 4999 return; 5000 } 5001 5002 dlm_add_requestqueue(ls, nodeid, ms); 5003 } else { 5004 dlm_wait_requestqueue(ls); 5005 _receive_message(ls, ms, 0); 5006 } 5007 } 5008 5009 /* This is called by dlm_recoverd to process messages that were saved on 5010 the requestqueue. */ 5011 5012 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 5013 uint32_t saved_seq) 5014 { 5015 _receive_message(ls, ms, saved_seq); 5016 } 5017 5018 /* This is called by the midcomms layer when something is received for 5019 the lockspace. It could be either a MSG (normal message sent as part of 5020 standard locking activity) or an RCOM (recovery message sent as part of 5021 lockspace recovery). */ 5022 5023 void dlm_receive_buffer(union dlm_packet *p, int nodeid) 5024 { 5025 struct dlm_header *hd = &p->header; 5026 struct dlm_ls *ls; 5027 int type = 0; 5028 5029 switch (hd->h_cmd) { 5030 case DLM_MSG: 5031 dlm_message_in(&p->message); 5032 type = p->message.m_type; 5033 break; 5034 case DLM_RCOM: 5035 dlm_rcom_in(&p->rcom); 5036 type = p->rcom.rc_type; 5037 break; 5038 default: 5039 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); 5040 return; 5041 } 5042 5043 if (hd->h_nodeid != nodeid) { 5044 log_print("invalid h_nodeid %d from %d lockspace %x", 5045 hd->h_nodeid, nodeid, hd->h_lockspace); 5046 return; 5047 } 5048 5049 ls = dlm_find_lockspace_global(hd->h_lockspace); 5050 if (!ls) { 5051 if (dlm_config.ci_log_debug) { 5052 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " 5053 "%u from %d cmd %d type %d\n", 5054 hd->h_lockspace, nodeid, hd->h_cmd, type); 5055 } 5056 5057 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 5058 dlm_send_ls_not_ready(nodeid, &p->rcom); 5059 return; 5060 } 5061 5062 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 5063 be inactive (in this ls) before transitioning to recovery mode */ 5064 5065 down_read(&ls->ls_recv_active); 5066 if (hd->h_cmd == DLM_MSG) 5067 dlm_receive_message(ls, &p->message, nodeid); 5068 else 5069 dlm_receive_rcom(ls, &p->rcom, nodeid); 5070 up_read(&ls->ls_recv_active); 5071 5072 dlm_put_lockspace(ls); 5073 } 5074 5075 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, 5076 struct dlm_message *ms_stub) 5077 { 5078 if (middle_conversion(lkb)) { 5079 hold_lkb(lkb); 5080 memset(ms_stub, 0, sizeof(struct dlm_message)); 5081 ms_stub->m_flags = DLM_IFL_STUB_MS; 5082 ms_stub->m_type = DLM_MSG_CONVERT_REPLY; 5083 ms_stub->m_result = -EINPROGRESS; 5084 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; 5085 _receive_convert_reply(lkb, ms_stub); 5086 5087 /* Same special case as in receive_rcom_lock_args() */ 5088 lkb->lkb_grmode = DLM_LOCK_IV; 5089 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 5090 unhold_lkb(lkb); 5091 5092 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 5093 lkb->lkb_flags |= DLM_IFL_RESEND; 5094 } 5095 5096 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 5097 conversions are async; there's no reply from the remote master */ 5098 } 5099 5100 /* A waiting lkb needs recovery if the master node has failed, or 5101 the master node is changing (only when no directory is used) */ 5102 5103 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 5104 int dir_nodeid) 5105 { 5106 if (dlm_no_directory(ls)) 5107 return 1; 5108 5109 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 5110 return 1; 5111 5112 return 0; 5113 } 5114 5115 /* Recovery for locks that are waiting for replies from nodes that are now 5116 gone. We can just complete unlocks and cancels by faking a reply from the 5117 dead node. Requests and up-conversions we flag to be resent after 5118 recovery. Down-conversions can just be completed with a fake reply like 5119 unlocks. Conversions between PR and CW need special attention. */ 5120 5121 void dlm_recover_waiters_pre(struct dlm_ls *ls) 5122 { 5123 struct dlm_lkb *lkb, *safe; 5124 struct dlm_message *ms_stub; 5125 int wait_type, stub_unlock_result, stub_cancel_result; 5126 int dir_nodeid; 5127 5128 ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL); 5129 if (!ms_stub) { 5130 log_error(ls, "dlm_recover_waiters_pre no mem"); 5131 return; 5132 } 5133 5134 mutex_lock(&ls->ls_waiters_mutex); 5135 5136 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 5137 5138 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); 5139 5140 /* exclude debug messages about unlocks because there can be so 5141 many and they aren't very interesting */ 5142 5143 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { 5144 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5145 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", 5146 lkb->lkb_id, 5147 lkb->lkb_remid, 5148 lkb->lkb_wait_type, 5149 lkb->lkb_resource->res_nodeid, 5150 lkb->lkb_nodeid, 5151 lkb->lkb_wait_nodeid, 5152 dir_nodeid); 5153 } 5154 5155 /* all outstanding lookups, regardless of destination will be 5156 resent after recovery is done */ 5157 5158 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 5159 lkb->lkb_flags |= DLM_IFL_RESEND; 5160 continue; 5161 } 5162 5163 if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) 5164 continue; 5165 5166 wait_type = lkb->lkb_wait_type; 5167 stub_unlock_result = -DLM_EUNLOCK; 5168 stub_cancel_result = -DLM_ECANCEL; 5169 5170 /* Main reply may have been received leaving a zero wait_type, 5171 but a reply for the overlapping op may not have been 5172 received. In that case we need to fake the appropriate 5173 reply for the overlap op. */ 5174 5175 if (!wait_type) { 5176 if (is_overlap_cancel(lkb)) { 5177 wait_type = DLM_MSG_CANCEL; 5178 if (lkb->lkb_grmode == DLM_LOCK_IV) 5179 stub_cancel_result = 0; 5180 } 5181 if (is_overlap_unlock(lkb)) { 5182 wait_type = DLM_MSG_UNLOCK; 5183 if (lkb->lkb_grmode == DLM_LOCK_IV) 5184 stub_unlock_result = -ENOENT; 5185 } 5186 5187 log_debug(ls, "rwpre overlap %x %x %d %d %d", 5188 lkb->lkb_id, lkb->lkb_flags, wait_type, 5189 stub_cancel_result, stub_unlock_result); 5190 } 5191 5192 switch (wait_type) { 5193 5194 case DLM_MSG_REQUEST: 5195 lkb->lkb_flags |= DLM_IFL_RESEND; 5196 break; 5197 5198 case DLM_MSG_CONVERT: 5199 recover_convert_waiter(ls, lkb, ms_stub); 5200 break; 5201 5202 case DLM_MSG_UNLOCK: 5203 hold_lkb(lkb); 5204 memset(ms_stub, 0, sizeof(struct dlm_message)); 5205 ms_stub->m_flags = DLM_IFL_STUB_MS; 5206 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY; 5207 ms_stub->m_result = stub_unlock_result; 5208 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; 5209 _receive_unlock_reply(lkb, ms_stub); 5210 dlm_put_lkb(lkb); 5211 break; 5212 5213 case DLM_MSG_CANCEL: 5214 hold_lkb(lkb); 5215 memset(ms_stub, 0, sizeof(struct dlm_message)); 5216 ms_stub->m_flags = DLM_IFL_STUB_MS; 5217 ms_stub->m_type = DLM_MSG_CANCEL_REPLY; 5218 ms_stub->m_result = stub_cancel_result; 5219 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; 5220 _receive_cancel_reply(lkb, ms_stub); 5221 dlm_put_lkb(lkb); 5222 break; 5223 5224 default: 5225 log_error(ls, "invalid lkb wait_type %d %d", 5226 lkb->lkb_wait_type, wait_type); 5227 } 5228 schedule(); 5229 } 5230 mutex_unlock(&ls->ls_waiters_mutex); 5231 kfree(ms_stub); 5232 } 5233 5234 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 5235 { 5236 struct dlm_lkb *lkb; 5237 int found = 0; 5238 5239 mutex_lock(&ls->ls_waiters_mutex); 5240 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 5241 if (lkb->lkb_flags & DLM_IFL_RESEND) { 5242 hold_lkb(lkb); 5243 found = 1; 5244 break; 5245 } 5246 } 5247 mutex_unlock(&ls->ls_waiters_mutex); 5248 5249 if (!found) 5250 lkb = NULL; 5251 return lkb; 5252 } 5253 5254 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 5255 master or dir-node for r. Processing the lkb may result in it being placed 5256 back on waiters. */ 5257 5258 /* We do this after normal locking has been enabled and any saved messages 5259 (in requestqueue) have been processed. We should be confident that at 5260 this point we won't get or process a reply to any of these waiting 5261 operations. But, new ops may be coming in on the rsbs/locks here from 5262 userspace or remotely. */ 5263 5264 /* there may have been an overlap unlock/cancel prior to recovery or after 5265 recovery. if before, the lkb may still have a pos wait_count; if after, the 5266 overlap flag would just have been set and nothing new sent. we can be 5267 confident here than any replies to either the initial op or overlap ops 5268 prior to recovery have been received. */ 5269 5270 int dlm_recover_waiters_post(struct dlm_ls *ls) 5271 { 5272 struct dlm_lkb *lkb; 5273 struct dlm_rsb *r; 5274 int error = 0, mstype, err, oc, ou; 5275 5276 while (1) { 5277 if (dlm_locking_stopped(ls)) { 5278 log_debug(ls, "recover_waiters_post aborted"); 5279 error = -EINTR; 5280 break; 5281 } 5282 5283 lkb = find_resend_waiter(ls); 5284 if (!lkb) 5285 break; 5286 5287 r = lkb->lkb_resource; 5288 hold_rsb(r); 5289 lock_rsb(r); 5290 5291 mstype = lkb->lkb_wait_type; 5292 oc = is_overlap_cancel(lkb); 5293 ou = is_overlap_unlock(lkb); 5294 err = 0; 5295 5296 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5297 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 5298 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, 5299 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, 5300 dlm_dir_nodeid(r), oc, ou); 5301 5302 /* At this point we assume that we won't get a reply to any 5303 previous op or overlap op on this lock. First, do a big 5304 remove_from_waiters() for all previous ops. */ 5305 5306 lkb->lkb_flags &= ~DLM_IFL_RESEND; 5307 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 5308 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 5309 lkb->lkb_wait_type = 0; 5310 lkb->lkb_wait_count = 0; 5311 mutex_lock(&ls->ls_waiters_mutex); 5312 list_del_init(&lkb->lkb_wait_reply); 5313 mutex_unlock(&ls->ls_waiters_mutex); 5314 unhold_lkb(lkb); /* for waiters list */ 5315 5316 if (oc || ou) { 5317 /* do an unlock or cancel instead of resending */ 5318 switch (mstype) { 5319 case DLM_MSG_LOOKUP: 5320 case DLM_MSG_REQUEST: 5321 queue_cast(r, lkb, ou ? -DLM_EUNLOCK : 5322 -DLM_ECANCEL); 5323 unhold_lkb(lkb); /* undoes create_lkb() */ 5324 break; 5325 case DLM_MSG_CONVERT: 5326 if (oc) { 5327 queue_cast(r, lkb, -DLM_ECANCEL); 5328 } else { 5329 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; 5330 _unlock_lock(r, lkb); 5331 } 5332 break; 5333 default: 5334 err = 1; 5335 } 5336 } else { 5337 switch (mstype) { 5338 case DLM_MSG_LOOKUP: 5339 case DLM_MSG_REQUEST: 5340 _request_lock(r, lkb); 5341 if (is_master(r)) 5342 confirm_master(r, 0); 5343 break; 5344 case DLM_MSG_CONVERT: 5345 _convert_lock(r, lkb); 5346 break; 5347 default: 5348 err = 1; 5349 } 5350 } 5351 5352 if (err) { 5353 log_error(ls, "waiter %x msg %d r_nodeid %d " 5354 "dir_nodeid %d overlap %d %d", 5355 lkb->lkb_id, mstype, r->res_nodeid, 5356 dlm_dir_nodeid(r), oc, ou); 5357 } 5358 unlock_rsb(r); 5359 put_rsb(r); 5360 dlm_put_lkb(lkb); 5361 } 5362 5363 return error; 5364 } 5365 5366 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, 5367 struct list_head *list) 5368 { 5369 struct dlm_lkb *lkb, *safe; 5370 5371 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5372 if (!is_master_copy(lkb)) 5373 continue; 5374 5375 /* don't purge lkbs we've added in recover_master_copy for 5376 the current recovery seq */ 5377 5378 if (lkb->lkb_recover_seq == ls->ls_recover_seq) 5379 continue; 5380 5381 del_lkb(r, lkb); 5382 5383 /* this put should free the lkb */ 5384 if (!dlm_put_lkb(lkb)) 5385 log_error(ls, "purged mstcpy lkb not released"); 5386 } 5387 } 5388 5389 void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 5390 { 5391 struct dlm_ls *ls = r->res_ls; 5392 5393 purge_mstcpy_list(ls, r, &r->res_grantqueue); 5394 purge_mstcpy_list(ls, r, &r->res_convertqueue); 5395 purge_mstcpy_list(ls, r, &r->res_waitqueue); 5396 } 5397 5398 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, 5399 struct list_head *list, 5400 int nodeid_gone, unsigned int *count) 5401 { 5402 struct dlm_lkb *lkb, *safe; 5403 5404 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5405 if (!is_master_copy(lkb)) 5406 continue; 5407 5408 if ((lkb->lkb_nodeid == nodeid_gone) || 5409 dlm_is_removed(ls, lkb->lkb_nodeid)) { 5410 5411 /* tell recover_lvb to invalidate the lvb 5412 because a node holding EX/PW failed */ 5413 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && 5414 (lkb->lkb_grmode >= DLM_LOCK_PW)) { 5415 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); 5416 } 5417 5418 del_lkb(r, lkb); 5419 5420 /* this put should free the lkb */ 5421 if (!dlm_put_lkb(lkb)) 5422 log_error(ls, "purged dead lkb not released"); 5423 5424 rsb_set_flag(r, RSB_RECOVER_GRANT); 5425 5426 (*count)++; 5427 } 5428 } 5429 } 5430 5431 /* Get rid of locks held by nodes that are gone. */ 5432 5433 void dlm_recover_purge(struct dlm_ls *ls) 5434 { 5435 struct dlm_rsb *r; 5436 struct dlm_member *memb; 5437 int nodes_count = 0; 5438 int nodeid_gone = 0; 5439 unsigned int lkb_count = 0; 5440 5441 /* cache one removed nodeid to optimize the common 5442 case of a single node removed */ 5443 5444 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 5445 nodes_count++; 5446 nodeid_gone = memb->nodeid; 5447 } 5448 5449 if (!nodes_count) 5450 return; 5451 5452 down_write(&ls->ls_root_sem); 5453 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 5454 hold_rsb(r); 5455 lock_rsb(r); 5456 if (is_master(r)) { 5457 purge_dead_list(ls, r, &r->res_grantqueue, 5458 nodeid_gone, &lkb_count); 5459 purge_dead_list(ls, r, &r->res_convertqueue, 5460 nodeid_gone, &lkb_count); 5461 purge_dead_list(ls, r, &r->res_waitqueue, 5462 nodeid_gone, &lkb_count); 5463 } 5464 unlock_rsb(r); 5465 unhold_rsb(r); 5466 cond_resched(); 5467 } 5468 up_write(&ls->ls_root_sem); 5469 5470 if (lkb_count) 5471 log_debug(ls, "dlm_recover_purge %u locks for %u nodes", 5472 lkb_count, nodes_count); 5473 } 5474 5475 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) 5476 { 5477 struct rb_node *n; 5478 struct dlm_rsb *r; 5479 5480 spin_lock(&ls->ls_rsbtbl[bucket].lock); 5481 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 5482 r = rb_entry(n, struct dlm_rsb, res_hashnode); 5483 5484 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5485 continue; 5486 if (!is_master(r)) { 5487 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5488 continue; 5489 } 5490 hold_rsb(r); 5491 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5492 return r; 5493 } 5494 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5495 return NULL; 5496 } 5497 5498 /* 5499 * Attempt to grant locks on resources that we are the master of. 5500 * Locks may have become grantable during recovery because locks 5501 * from departed nodes have been purged (or not rebuilt), allowing 5502 * previously blocked locks to now be granted. The subset of rsb's 5503 * we are interested in are those with lkb's on either the convert or 5504 * waiting queues. 5505 * 5506 * Simplest would be to go through each master rsb and check for non-empty 5507 * convert or waiting queues, and attempt to grant on those rsbs. 5508 * Checking the queues requires lock_rsb, though, for which we'd need 5509 * to release the rsbtbl lock. This would make iterating through all 5510 * rsb's very inefficient. So, we rely on earlier recovery routines 5511 * to set RECOVER_GRANT on any rsb's that we should attempt to grant 5512 * locks for. 5513 */ 5514 5515 void dlm_recover_grant(struct dlm_ls *ls) 5516 { 5517 struct dlm_rsb *r; 5518 int bucket = 0; 5519 unsigned int count = 0; 5520 unsigned int rsb_count = 0; 5521 unsigned int lkb_count = 0; 5522 5523 while (1) { 5524 r = find_grant_rsb(ls, bucket); 5525 if (!r) { 5526 if (bucket == ls->ls_rsbtbl_size - 1) 5527 break; 5528 bucket++; 5529 continue; 5530 } 5531 rsb_count++; 5532 count = 0; 5533 lock_rsb(r); 5534 /* the RECOVER_GRANT flag is checked in the grant path */ 5535 grant_pending_locks(r, &count); 5536 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5537 lkb_count += count; 5538 confirm_master(r, 0); 5539 unlock_rsb(r); 5540 put_rsb(r); 5541 cond_resched(); 5542 } 5543 5544 if (lkb_count) 5545 log_debug(ls, "dlm_recover_grant %u locks on %u resources", 5546 lkb_count, rsb_count); 5547 } 5548 5549 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 5550 uint32_t remid) 5551 { 5552 struct dlm_lkb *lkb; 5553 5554 list_for_each_entry(lkb, head, lkb_statequeue) { 5555 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 5556 return lkb; 5557 } 5558 return NULL; 5559 } 5560 5561 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 5562 uint32_t remid) 5563 { 5564 struct dlm_lkb *lkb; 5565 5566 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 5567 if (lkb) 5568 return lkb; 5569 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 5570 if (lkb) 5571 return lkb; 5572 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 5573 if (lkb) 5574 return lkb; 5575 return NULL; 5576 } 5577 5578 /* needs at least dlm_rcom + rcom_lock */ 5579 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 5580 struct dlm_rsb *r, struct dlm_rcom *rc) 5581 { 5582 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5583 5584 lkb->lkb_nodeid = rc->rc_header.h_nodeid; 5585 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); 5586 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); 5587 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); 5588 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF; 5589 lkb->lkb_flags |= DLM_IFL_MSTCPY; 5590 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); 5591 lkb->lkb_rqmode = rl->rl_rqmode; 5592 lkb->lkb_grmode = rl->rl_grmode; 5593 /* don't set lkb_status because add_lkb wants to itself */ 5594 5595 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 5596 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; 5597 5598 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 5599 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - 5600 sizeof(struct rcom_lock); 5601 if (lvblen > ls->ls_lvblen) 5602 return -EINVAL; 5603 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 5604 if (!lkb->lkb_lvbptr) 5605 return -ENOMEM; 5606 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 5607 } 5608 5609 /* Conversions between PR and CW (middle modes) need special handling. 5610 The real granted mode of these converting locks cannot be determined 5611 until all locks have been rebuilt on the rsb (recover_conversion) */ 5612 5613 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && 5614 middle_conversion(lkb)) { 5615 rl->rl_status = DLM_LKSTS_CONVERT; 5616 lkb->lkb_grmode = DLM_LOCK_IV; 5617 rsb_set_flag(r, RSB_RECOVER_CONVERT); 5618 } 5619 5620 return 0; 5621 } 5622 5623 /* This lkb may have been recovered in a previous aborted recovery so we need 5624 to check if the rsb already has an lkb with the given remote nodeid/lkid. 5625 If so we just send back a standard reply. If not, we create a new lkb with 5626 the given values and send back our lkid. We send back our lkid by sending 5627 back the rcom_lock struct we got but with the remid field filled in. */ 5628 5629 /* needs at least dlm_rcom + rcom_lock */ 5630 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5631 { 5632 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5633 struct dlm_rsb *r; 5634 struct dlm_lkb *lkb; 5635 uint32_t remid = 0; 5636 int from_nodeid = rc->rc_header.h_nodeid; 5637 int error; 5638 5639 if (rl->rl_parent_lkid) { 5640 error = -EOPNOTSUPP; 5641 goto out; 5642 } 5643 5644 remid = le32_to_cpu(rl->rl_lkid); 5645 5646 /* In general we expect the rsb returned to be R_MASTER, but we don't 5647 have to require it. Recovery of masters on one node can overlap 5648 recovery of locks on another node, so one node can send us MSTCPY 5649 locks before we've made ourselves master of this rsb. We can still 5650 add new MSTCPY locks that we receive here without any harm; when 5651 we make ourselves master, dlm_recover_masters() won't touch the 5652 MSTCPY locks we've received early. */ 5653 5654 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 5655 from_nodeid, R_RECEIVE_RECOVER, &r); 5656 if (error) 5657 goto out; 5658 5659 lock_rsb(r); 5660 5661 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5662 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5663 from_nodeid, remid); 5664 error = -EBADR; 5665 goto out_unlock; 5666 } 5667 5668 lkb = search_remid(r, from_nodeid, remid); 5669 if (lkb) { 5670 error = -EEXIST; 5671 goto out_remid; 5672 } 5673 5674 error = create_lkb(ls, &lkb); 5675 if (error) 5676 goto out_unlock; 5677 5678 error = receive_rcom_lock_args(ls, lkb, r, rc); 5679 if (error) { 5680 __put_lkb(ls, lkb); 5681 goto out_unlock; 5682 } 5683 5684 attach_lkb(r, lkb); 5685 add_lkb(r, lkb, rl->rl_status); 5686 error = 0; 5687 ls->ls_recover_locks_in++; 5688 5689 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 5690 rsb_set_flag(r, RSB_RECOVER_GRANT); 5691 5692 out_remid: 5693 /* this is the new value returned to the lock holder for 5694 saving in its process-copy lkb */ 5695 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 5696 5697 lkb->lkb_recover_seq = ls->ls_recover_seq; 5698 5699 out_unlock: 5700 unlock_rsb(r); 5701 put_rsb(r); 5702 out: 5703 if (error && error != -EEXIST) 5704 log_debug(ls, "dlm_recover_master_copy remote %d %x error %d", 5705 from_nodeid, remid, error); 5706 rl->rl_result = cpu_to_le32(error); 5707 return error; 5708 } 5709 5710 /* needs at least dlm_rcom + rcom_lock */ 5711 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5712 { 5713 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5714 struct dlm_rsb *r; 5715 struct dlm_lkb *lkb; 5716 uint32_t lkid, remid; 5717 int error, result; 5718 5719 lkid = le32_to_cpu(rl->rl_lkid); 5720 remid = le32_to_cpu(rl->rl_remid); 5721 result = le32_to_cpu(rl->rl_result); 5722 5723 error = find_lkb(ls, lkid, &lkb); 5724 if (error) { 5725 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", 5726 lkid, rc->rc_header.h_nodeid, remid, result); 5727 return error; 5728 } 5729 5730 r = lkb->lkb_resource; 5731 hold_rsb(r); 5732 lock_rsb(r); 5733 5734 if (!is_process_copy(lkb)) { 5735 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 5736 lkid, rc->rc_header.h_nodeid, remid, result); 5737 dlm_dump_rsb(r); 5738 unlock_rsb(r); 5739 put_rsb(r); 5740 dlm_put_lkb(lkb); 5741 return -EINVAL; 5742 } 5743 5744 switch (result) { 5745 case -EBADR: 5746 /* There's a chance the new master received our lock before 5747 dlm_recover_master_reply(), this wouldn't happen if we did 5748 a barrier between recover_masters and recover_locks. */ 5749 5750 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", 5751 lkid, rc->rc_header.h_nodeid, remid, result); 5752 5753 dlm_send_rcom_lock(r, lkb); 5754 goto out; 5755 case -EEXIST: 5756 case 0: 5757 lkb->lkb_remid = remid; 5758 break; 5759 default: 5760 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", 5761 lkid, rc->rc_header.h_nodeid, remid, result); 5762 } 5763 5764 /* an ack for dlm_recover_locks() which waits for replies from 5765 all the locks it sends to new masters */ 5766 dlm_recovered_lock(r); 5767 out: 5768 unlock_rsb(r); 5769 put_rsb(r); 5770 dlm_put_lkb(lkb); 5771 5772 return 0; 5773 } 5774 5775 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 5776 int mode, uint32_t flags, void *name, unsigned int namelen, 5777 unsigned long timeout_cs) 5778 { 5779 struct dlm_lkb *lkb; 5780 struct dlm_args args; 5781 int error; 5782 5783 dlm_lock_recovery(ls); 5784 5785 error = create_lkb(ls, &lkb); 5786 if (error) { 5787 kfree(ua); 5788 goto out; 5789 } 5790 5791 if (flags & DLM_LKF_VALBLK) { 5792 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5793 if (!ua->lksb.sb_lvbptr) { 5794 kfree(ua); 5795 __put_lkb(ls, lkb); 5796 error = -ENOMEM; 5797 goto out; 5798 } 5799 } 5800 5801 /* After ua is attached to lkb it will be freed by dlm_free_lkb(). 5802 When DLM_IFL_USER is set, the dlm knows that this is a userspace 5803 lock and that lkb_astparam is the dlm_user_args structure. */ 5804 5805 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, 5806 fake_astfn, ua, fake_bastfn, &args); 5807 lkb->lkb_flags |= DLM_IFL_USER; 5808 5809 if (error) { 5810 __put_lkb(ls, lkb); 5811 goto out; 5812 } 5813 5814 error = request_lock(ls, lkb, name, namelen, &args); 5815 5816 switch (error) { 5817 case 0: 5818 break; 5819 case -EINPROGRESS: 5820 error = 0; 5821 break; 5822 case -EAGAIN: 5823 error = 0; 5824 /* fall through */ 5825 default: 5826 __put_lkb(ls, lkb); 5827 goto out; 5828 } 5829 5830 /* add this new lkb to the per-process list of locks */ 5831 spin_lock(&ua->proc->locks_spin); 5832 hold_lkb(lkb); 5833 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5834 spin_unlock(&ua->proc->locks_spin); 5835 out: 5836 dlm_unlock_recovery(ls); 5837 return error; 5838 } 5839 5840 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5841 int mode, uint32_t flags, uint32_t lkid, char *lvb_in, 5842 unsigned long timeout_cs) 5843 { 5844 struct dlm_lkb *lkb; 5845 struct dlm_args args; 5846 struct dlm_user_args *ua; 5847 int error; 5848 5849 dlm_lock_recovery(ls); 5850 5851 error = find_lkb(ls, lkid, &lkb); 5852 if (error) 5853 goto out; 5854 5855 /* user can change the params on its lock when it converts it, or 5856 add an lvb that didn't exist before */ 5857 5858 ua = lkb->lkb_ua; 5859 5860 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 5861 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5862 if (!ua->lksb.sb_lvbptr) { 5863 error = -ENOMEM; 5864 goto out_put; 5865 } 5866 } 5867 if (lvb_in && ua->lksb.sb_lvbptr) 5868 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5869 5870 ua->xid = ua_tmp->xid; 5871 ua->castparam = ua_tmp->castparam; 5872 ua->castaddr = ua_tmp->castaddr; 5873 ua->bastparam = ua_tmp->bastparam; 5874 ua->bastaddr = ua_tmp->bastaddr; 5875 ua->user_lksb = ua_tmp->user_lksb; 5876 5877 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, 5878 fake_astfn, ua, fake_bastfn, &args); 5879 if (error) 5880 goto out_put; 5881 5882 error = convert_lock(ls, lkb, &args); 5883 5884 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) 5885 error = 0; 5886 out_put: 5887 dlm_put_lkb(lkb); 5888 out: 5889 dlm_unlock_recovery(ls); 5890 kfree(ua_tmp); 5891 return error; 5892 } 5893 5894 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5895 uint32_t flags, uint32_t lkid, char *lvb_in) 5896 { 5897 struct dlm_lkb *lkb; 5898 struct dlm_args args; 5899 struct dlm_user_args *ua; 5900 int error; 5901 5902 dlm_lock_recovery(ls); 5903 5904 error = find_lkb(ls, lkid, &lkb); 5905 if (error) 5906 goto out; 5907 5908 ua = lkb->lkb_ua; 5909 5910 if (lvb_in && ua->lksb.sb_lvbptr) 5911 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5912 if (ua_tmp->castparam) 5913 ua->castparam = ua_tmp->castparam; 5914 ua->user_lksb = ua_tmp->user_lksb; 5915 5916 error = set_unlock_args(flags, ua, &args); 5917 if (error) 5918 goto out_put; 5919 5920 error = unlock_lock(ls, lkb, &args); 5921 5922 if (error == -DLM_EUNLOCK) 5923 error = 0; 5924 /* from validate_unlock_args() */ 5925 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) 5926 error = 0; 5927 if (error) 5928 goto out_put; 5929 5930 spin_lock(&ua->proc->locks_spin); 5931 /* dlm_user_add_cb() may have already taken lkb off the proc list */ 5932 if (!list_empty(&lkb->lkb_ownqueue)) 5933 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 5934 spin_unlock(&ua->proc->locks_spin); 5935 out_put: 5936 dlm_put_lkb(lkb); 5937 out: 5938 dlm_unlock_recovery(ls); 5939 kfree(ua_tmp); 5940 return error; 5941 } 5942 5943 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5944 uint32_t flags, uint32_t lkid) 5945 { 5946 struct dlm_lkb *lkb; 5947 struct dlm_args args; 5948 struct dlm_user_args *ua; 5949 int error; 5950 5951 dlm_lock_recovery(ls); 5952 5953 error = find_lkb(ls, lkid, &lkb); 5954 if (error) 5955 goto out; 5956 5957 ua = lkb->lkb_ua; 5958 if (ua_tmp->castparam) 5959 ua->castparam = ua_tmp->castparam; 5960 ua->user_lksb = ua_tmp->user_lksb; 5961 5962 error = set_unlock_args(flags, ua, &args); 5963 if (error) 5964 goto out_put; 5965 5966 error = cancel_lock(ls, lkb, &args); 5967 5968 if (error == -DLM_ECANCEL) 5969 error = 0; 5970 /* from validate_unlock_args() */ 5971 if (error == -EBUSY) 5972 error = 0; 5973 out_put: 5974 dlm_put_lkb(lkb); 5975 out: 5976 dlm_unlock_recovery(ls); 5977 kfree(ua_tmp); 5978 return error; 5979 } 5980 5981 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) 5982 { 5983 struct dlm_lkb *lkb; 5984 struct dlm_args args; 5985 struct dlm_user_args *ua; 5986 struct dlm_rsb *r; 5987 int error; 5988 5989 dlm_lock_recovery(ls); 5990 5991 error = find_lkb(ls, lkid, &lkb); 5992 if (error) 5993 goto out; 5994 5995 ua = lkb->lkb_ua; 5996 5997 error = set_unlock_args(flags, ua, &args); 5998 if (error) 5999 goto out_put; 6000 6001 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */ 6002 6003 r = lkb->lkb_resource; 6004 hold_rsb(r); 6005 lock_rsb(r); 6006 6007 error = validate_unlock_args(lkb, &args); 6008 if (error) 6009 goto out_r; 6010 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL; 6011 6012 error = _cancel_lock(r, lkb); 6013 out_r: 6014 unlock_rsb(r); 6015 put_rsb(r); 6016 6017 if (error == -DLM_ECANCEL) 6018 error = 0; 6019 /* from validate_unlock_args() */ 6020 if (error == -EBUSY) 6021 error = 0; 6022 out_put: 6023 dlm_put_lkb(lkb); 6024 out: 6025 dlm_unlock_recovery(ls); 6026 return error; 6027 } 6028 6029 /* lkb's that are removed from the waiters list by revert are just left on the 6030 orphans list with the granted orphan locks, to be freed by purge */ 6031 6032 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 6033 { 6034 struct dlm_args args; 6035 int error; 6036 6037 hold_lkb(lkb); 6038 mutex_lock(&ls->ls_orphans_mutex); 6039 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); 6040 mutex_unlock(&ls->ls_orphans_mutex); 6041 6042 set_unlock_args(0, lkb->lkb_ua, &args); 6043 6044 error = cancel_lock(ls, lkb, &args); 6045 if (error == -DLM_ECANCEL) 6046 error = 0; 6047 return error; 6048 } 6049 6050 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't 6051 granted. Regardless of what rsb queue the lock is on, it's removed and 6052 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated 6053 if our lock is PW/EX (it's ignored if our granted mode is smaller.) */ 6054 6055 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 6056 { 6057 struct dlm_args args; 6058 int error; 6059 6060 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK, 6061 lkb->lkb_ua, &args); 6062 6063 error = unlock_lock(ls, lkb, &args); 6064 if (error == -DLM_EUNLOCK) 6065 error = 0; 6066 return error; 6067 } 6068 6069 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock() 6070 (which does lock_rsb) due to deadlock with receiving a message that does 6071 lock_rsb followed by dlm_user_add_cb() */ 6072 6073 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls, 6074 struct dlm_user_proc *proc) 6075 { 6076 struct dlm_lkb *lkb = NULL; 6077 6078 mutex_lock(&ls->ls_clear_proc_locks); 6079 if (list_empty(&proc->locks)) 6080 goto out; 6081 6082 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue); 6083 list_del_init(&lkb->lkb_ownqueue); 6084 6085 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) 6086 lkb->lkb_flags |= DLM_IFL_ORPHAN; 6087 else 6088 lkb->lkb_flags |= DLM_IFL_DEAD; 6089 out: 6090 mutex_unlock(&ls->ls_clear_proc_locks); 6091 return lkb; 6092 } 6093 6094 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which 6095 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts, 6096 which we clear here. */ 6097 6098 /* proc CLOSING flag is set so no more device_reads should look at proc->asts 6099 list, and no more device_writes should add lkb's to proc->locks list; so we 6100 shouldn't need to take asts_spin or locks_spin here. this assumes that 6101 device reads/writes/closes are serialized -- FIXME: we may need to serialize 6102 them ourself. */ 6103 6104 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) 6105 { 6106 struct dlm_lkb *lkb, *safe; 6107 6108 dlm_lock_recovery(ls); 6109 6110 while (1) { 6111 lkb = del_proc_lock(ls, proc); 6112 if (!lkb) 6113 break; 6114 del_timeout(lkb); 6115 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) 6116 orphan_proc_lock(ls, lkb); 6117 else 6118 unlock_proc_lock(ls, lkb); 6119 6120 /* this removes the reference for the proc->locks list 6121 added by dlm_user_request, it may result in the lkb 6122 being freed */ 6123 6124 dlm_put_lkb(lkb); 6125 } 6126 6127 mutex_lock(&ls->ls_clear_proc_locks); 6128 6129 /* in-progress unlocks */ 6130 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { 6131 list_del_init(&lkb->lkb_ownqueue); 6132 lkb->lkb_flags |= DLM_IFL_DEAD; 6133 dlm_put_lkb(lkb); 6134 } 6135 6136 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { 6137 memset(&lkb->lkb_callbacks, 0, 6138 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 6139 list_del_init(&lkb->lkb_cb_list); 6140 dlm_put_lkb(lkb); 6141 } 6142 6143 mutex_unlock(&ls->ls_clear_proc_locks); 6144 dlm_unlock_recovery(ls); 6145 } 6146 6147 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc) 6148 { 6149 struct dlm_lkb *lkb, *safe; 6150 6151 while (1) { 6152 lkb = NULL; 6153 spin_lock(&proc->locks_spin); 6154 if (!list_empty(&proc->locks)) { 6155 lkb = list_entry(proc->locks.next, struct dlm_lkb, 6156 lkb_ownqueue); 6157 list_del_init(&lkb->lkb_ownqueue); 6158 } 6159 spin_unlock(&proc->locks_spin); 6160 6161 if (!lkb) 6162 break; 6163 6164 lkb->lkb_flags |= DLM_IFL_DEAD; 6165 unlock_proc_lock(ls, lkb); 6166 dlm_put_lkb(lkb); /* ref from proc->locks list */ 6167 } 6168 6169 spin_lock(&proc->locks_spin); 6170 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) { 6171 list_del_init(&lkb->lkb_ownqueue); 6172 lkb->lkb_flags |= DLM_IFL_DEAD; 6173 dlm_put_lkb(lkb); 6174 } 6175 spin_unlock(&proc->locks_spin); 6176 6177 spin_lock(&proc->asts_spin); 6178 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) { 6179 memset(&lkb->lkb_callbacks, 0, 6180 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE); 6181 list_del_init(&lkb->lkb_cb_list); 6182 dlm_put_lkb(lkb); 6183 } 6184 spin_unlock(&proc->asts_spin); 6185 } 6186 6187 /* pid of 0 means purge all orphans */ 6188 6189 static void do_purge(struct dlm_ls *ls, int nodeid, int pid) 6190 { 6191 struct dlm_lkb *lkb, *safe; 6192 6193 mutex_lock(&ls->ls_orphans_mutex); 6194 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) { 6195 if (pid && lkb->lkb_ownpid != pid) 6196 continue; 6197 unlock_proc_lock(ls, lkb); 6198 list_del_init(&lkb->lkb_ownqueue); 6199 dlm_put_lkb(lkb); 6200 } 6201 mutex_unlock(&ls->ls_orphans_mutex); 6202 } 6203 6204 static int send_purge(struct dlm_ls *ls, int nodeid, int pid) 6205 { 6206 struct dlm_message *ms; 6207 struct dlm_mhandle *mh; 6208 int error; 6209 6210 error = _create_message(ls, sizeof(struct dlm_message), nodeid, 6211 DLM_MSG_PURGE, &ms, &mh); 6212 if (error) 6213 return error; 6214 ms->m_nodeid = nodeid; 6215 ms->m_pid = pid; 6216 6217 return send_message(mh, ms); 6218 } 6219 6220 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc, 6221 int nodeid, int pid) 6222 { 6223 int error = 0; 6224 6225 if (nodeid != dlm_our_nodeid()) { 6226 error = send_purge(ls, nodeid, pid); 6227 } else { 6228 dlm_lock_recovery(ls); 6229 if (pid == current->pid) 6230 purge_proc_locks(ls, proc); 6231 else 6232 do_purge(ls, nodeid, pid); 6233 dlm_unlock_recovery(ls); 6234 } 6235 return error; 6236 } 6237 6238