// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
#include <trace/events/dlm.h>

#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "midcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
static void toss_rsb(struct kref *kref);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *  1 = LVB is returned to the caller
 *  0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
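/*
 * Illustrative use of the matrices above (a sketch, not part of the DLM
 * code paths): mode constants run from DLM_LOCK_IV (-1) to DLM_LOCK_EX (5),
 * hence the +1 indexing, with index 0 acting as the "UN" row/column.
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);  // 1, PR is shareable
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);  // 0, conflicts
 */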
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
	       (unsigned long long)lkb->lkb_recover_seq);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
	       "rlc %d name %s\n",
	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
	       r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
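/*
 * Sketch of how the helpers above are used (assumed caller shape; the real
 * entry points are dlm_lock()/dlm_unlock() later in this file): every
 * normal lock operation runs under the read side of ls_in_recovery, while
 * dlm_recoverd holds the write side for the duration of recovery.
 *
 *	dlm_lock_recovery(ls);
 *	error = request_lock(ls, lkb, ...);
 *	dlm_unlock_recovery(ls);
 */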
static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode == DLM_LOCK_PR && lkb->lkb_rqmode == DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode == DLM_LOCK_PR && lkb->lkb_grmode == DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL; if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

static int pre_rsb_struct(struct dlm_ls *ls)
{
	struct dlm_rsb *r1, *r2;
	int count = 0;

	spin_lock(&ls->ls_new_rsb_spin);
	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
		spin_unlock(&ls->ls_new_rsb_spin);
		return 0;
	}
	spin_unlock(&ls->ls_new_rsb_spin);

	r1 = dlm_allocate_rsb(ls);
	r2 = dlm_allocate_rsb(ls);

	spin_lock(&ls->ls_new_rsb_spin);
	if (r1) {
		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	if (r2) {
		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
		ls->ls_new_rsb_count++;
	}
	count = ls->ls_new_rsb_count;
	spin_unlock(&ls->ls_new_rsb_spin);

	if (!count)
		return -ENOMEM;
	return 0;
}
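/*
 * Sketch of the preallocation protocol around pre_rsb_struct() above and
 * get_rsb_struct() below (see find_rsb_dir() for a real caller): rsb memory
 * is allocated outside the bucket spinlock, and the whole sequence retries
 * if the prealloc list went empty while the lock was held.
 *
 *	retry:
 *		error = pre_rsb_struct(ls);          // may sleep, can -ENOMEM
 *		spin_lock(&ls->ls_rsbtbl[b].lock);
 *		...
 *		error = get_rsb_struct(ls, name, len, &r);
 *		if (error == -EAGAIN) {              // prealloc list ran dry
 *			spin_unlock(&ls->ls_rsbtbl[b].lock);
 *			goto retry;
 *		}
 */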
/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
   unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */

static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
			  struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int count;

	spin_lock(&ls->ls_new_rsb_spin);
	if (list_empty(&ls->ls_new_rsb)) {
		count = ls->ls_new_rsb_count;
		spin_unlock(&ls->ls_new_rsb_spin);
		log_debug(ls, "find_rsb retry %d %d %s",
			  count, dlm_config.ci_new_rsb_count, name);
		return -EAGAIN;
	}

	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
	list_del(&r->res_hashchain);
	/* Convert the empty list_head to a NULL rb_node for tree usage: */
	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
	ls->ls_new_rsb_count--;
	spin_unlock(&ls->ls_new_rsb_spin);

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	*r_ret = r;
	return 0;
}

static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
	char maxname[DLM_RESNAME_MAXLEN];

	memset(maxname, 0, DLM_RESNAME_MAXLEN);
	memcpy(maxname, name, nlen);
	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}

int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
			struct dlm_rsb **r_ret)
{
	struct rb_node *node = tree->rb_node;
	struct dlm_rsb *r;
	int rc;

	while (node) {
		r = rb_entry(node, struct dlm_rsb, res_hashnode);
		rc = rsb_cmp(r, name, len);
		if (rc < 0)
			node = node->rb_left;
		else if (rc > 0)
			node = node->rb_right;
		else
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	*r_ret = r;
	return 0;
}

static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
	struct rb_node **newn = &tree->rb_node;
	struct rb_node *parent = NULL;
	int rc;

	while (*newn) {
		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
					       res_hashnode);

		parent = *newn;
		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
		if (rc < 0)
			newn = &parent->rb_left;
		else if (rc > 0)
			newn = &parent->rb_right;
		else {
			log_print("rsb_insert match");
			dlm_dump_rsb(rsb);
			dlm_dump_rsb(cur);
			return -EEXIST;
		}
	}

	rb_link_node(&rsb->res_hashnode, parent, newn);
	rb_insert_color(&rsb->res_hashnode, tree);
	return 0;
}
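/*
 * Usage sketch for the two helpers above (mirrors find_rsb_dir() below).
 * Note that rsb_cmp() zero-pads names to DLM_RESNAME_MAXLEN, so the trees
 * are ordered by the full fixed-width name.  The bucket lock must be held:
 *
 *	spin_lock(&ls->ls_rsbtbl[b].lock);
 *	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 *	if (!error)
 *		kref_get(&r->res_ref);          // take a reference
 *	spin_unlock(&ls->ls_rsbtbl[b].lock);
 */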
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 *
 * rsb's on the keep list are being used locally and refcounted.
 * rsb's on the toss list are not being used locally, and are not refcounted.
 *
 * The toss list rsb's were either
 * - previously used locally but not any more (were on keep list, then
 *   moved to toss list when last refcount dropped)
 * - created and put on toss list as a directory record for a lookup
 *   (we are the dir node for the res, but are not using the res right now,
 *   but some other node is)
 *
 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 * So, if the given rsb is on the toss list, it is moved to the keep list
 * before being returned.
 *
 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 * more refcounts exist, so the rsb is moved from the keep list to the
 * toss list.
 *
 * rsb's on both keep and toss lists are used for doing name-to-master
 * lookups.  rsb's that are in use locally (and being refcounted) are on
 * the keep list, rsb's that are not in use locally (not refcounted) and
 * only exist for name/master lookups are on the toss list.
 *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
 * name/master mappings.  So, remote requests on such rsb's can potentially
 * return with an error, which means the mapping is stale and needs to
 * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
 * first_lkid is to keep only a single outstanding request on an rsb
 * while that rsb has a potentially stale master.)
 */

static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
			uint32_t hash, uint32_t b,
			int dir_nodeid, int from_nodeid,
			unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int from_local = 0;
	int from_other = 0;
	int from_dir = 0;
	int create = 0;
	int error;

	if (flags & R_RECEIVE_REQUEST) {
		if (from_nodeid == dir_nodeid)
			from_dir = 1;
		else
			from_other = 1;
	} else if (flags & R_REQUEST) {
		from_local = 1;
	}

	/*
	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
	 * we're the new master.  Our local recovery may not have set
	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
	 * create the rsb; dlm_recover_process_copy() will handle EBADR
	 * by resending.
	 *
	 * If someone sends us a request, we are the dir node, and we do
	 * not find the rsb anywhere, then recreate it.  This happens if
	 * someone sends us a request after we have removed/freed an rsb
	 * from our toss list.  (They sent a request instead of lookup
	 * because they are using an rsb from their toss list.)
	 */

	if (from_local || from_dir ||
	    (from_other && (dir_nodeid == our_nodeid))) {
		create = 1;
	}

 retry:
	if (create) {
		error = pre_rsb_struct(ls);
		if (error < 0)
			goto out;
	}

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */
	kref_get(&r->res_ref);
	error = 0;
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive (master_nodeid may be out of date unless
	 * we are the dir_nodeid or were the master)  No other thread
	 * is using this rsb because it's on the toss list, so we can
	 * look at or update res_master_nodeid without lock_rsb.
	 */

	if ((r->res_master_nodeid != our_nodeid) && from_other) {
		/* our rsb was not master, and another node (not the dir node)
		   has sent us a request */
		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
			  from_nodeid, r->res_master_nodeid, dir_nodeid,
			  r->res_name);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
		/* don't think this should ever happen */
		log_error(ls, "find_rsb toss from_dir %d master %d",
			  from_nodeid, r->res_master_nodeid);
		dlm_print_rsb(r);
		/* fix it and go on */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	if (from_local && (r->res_master_nodeid != our_nodeid)) {
		/* Because we have held no locks on this rsb,
		   res_master_nodeid could have become stale. */
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	if (error == -EBADR && !create)
		goto out_unlock;

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	kref_init(&r->res_ref);

	if (from_dir) {
		/* want to see how often this happens */
		log_debug(ls, "find_rsb new from_dir %d recreate %s",
			  from_nodeid, r->res_name);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
		goto out_add;
	}

	if (from_other && (dir_nodeid != our_nodeid)) {
		/* should never happen */
		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
		dlm_free_rsb(r);
		r = NULL;
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (from_other) {
		log_debug(ls, "find_rsb new from_other %d dir %d %s",
			  from_nodeid, dir_nodeid, r->res_name);
	}

	if (dir_nodeid == our_nodeid) {
		/* When we are the dir nodeid, we can set the master
		   node immediately */
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	} else {
		/* set_master will send_lookup to dir_nodeid */
		r->res_master_nodeid = 0;
		r->res_nodeid = -1;
	}

 out_add:
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}
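/*
 * Quick reference (added note) for the two master fields set above:
 *	res_master_nodeid: nodeid of the master, or 0 when not yet known
 *	res_nodeid:        0 when this node is the master, -1 when the
 *	                   master is unknown, otherwise the master's nodeid
 */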
/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourselves master (in
   dlm_recover_masters). */

static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
			  uint32_t hash, uint32_t b,
			  int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;

	spin_lock(&ls->ls_rsbtbl[b].lock);

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (error)
		goto do_toss;

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	goto out_unlock;


 do_toss:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto do_new;

	/*
	 * rsb found inactive.  No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
	 * res_master_nodeid without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		error = -ENOTBLK;
		goto out_unlock;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb toss our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
	goto out_unlock;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
	*r_ret = r;
	return error;
}

static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	uint32_t hash, b;
	int dir_nodeid;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	dir_nodeid = dlm_hash2nodeid(ls, hash);

	if (dlm_no_directory(ls))
		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
				      from_nodeid, flags, r_ret);
	else
		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
				    from_nodeid, flags, r_ret);
}
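/*
 * Sketch of a typical caller (assumed shape; the real one is request_lock()
 * later in this file): a new local request passes R_REQUEST and gets back
 * a refcounted rsb, moved from the toss tree to the keep tree if needed.
 *
 *	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
 *	if (error)
 *		return error;
 */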
/* we have received a request and found that res_master_nodeid != our_nodeid,
   so we need to return an error or make ourselves the master */

static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
				  int from_nodeid)
{
	if (dlm_no_directory(ls)) {
		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid,
			  r->res_dir_nodeid);
		dlm_print_rsb(r);
		return -ENOTBLK;
	}

	if (from_nodeid != r->res_dir_nodeid) {
		/* our rsb is not master, and another node (not the dir node)
		   has sent us a request.  this is much more common when our
		   master_nodeid is zero, so limit debug to non-zero. */

		if (r->res_master_nodeid) {
			log_debug(ls, "validate master from_other %d master %d "
				  "dir %d first %x %s", from_nodeid,
				  r->res_master_nodeid, r->res_dir_nodeid,
				  r->res_first_lkid, r->res_name);
		}
		return -ENOTBLK;
	} else {
		/* our rsb is not master, but the dir nodeid has sent us a
		   request; this could happen with master 0 / res_nodeid -1 */

		if (r->res_master_nodeid) {
			log_error(ls, "validate master from_dir %d master %d "
				  "first %x %s",
				  from_nodeid, r->res_master_nodeid,
				  r->res_first_lkid, r->res_name);
		}

		r->res_master_nodeid = dlm_our_nodeid();
		r->res_nodeid = 0;
		return 0;
	}
}

/*
 * We're the dir node for this res and another node wants to know the
 * master nodeid.  During normal operation (non recovery) this is only
 * called from receive_lookup(); master lookups when the local node is
 * the dir node are done by find_rsb().
 *
 * normal operation, we are the dir node for a resource
 * . _request_lock
 * . set_master
 * . send_lookup
 * . receive_lookup
 * . dlm_master_lookup flags 0
 *
 * recover directory, we are rebuilding dir for all resources
 * . dlm_recover_directory
 * . dlm_rcom_names
 *   remote node sends back the rsb names it is master of and we are dir of
 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
 *   we either create new rsb setting remote node as master, or find existing
 *   rsb and set master to be the remote node.
 *
 * recover masters, we are finding the new master for resources
 * . dlm_recover_masters
 * . recover_master
 * . dlm_send_rcom_lookup
 * . receive_rcom_lookup
 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
 */

int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
		      unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int from_master = (flags & DLM_LU_RECOVER_DIR);
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error, toss_list = 0;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		return error;

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error) {
		/* because the rsb is active, we need to lock_rsb before
		   checking/changing res_master_nodeid */

		hold_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		lock_rsb(r);
		goto found;
	}

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto not_found;

	/* because the rsb is inactive (on toss list), it's not refcounted
	   and lock_rsb is not used, but is protected by the rsbtbl lock */

	toss_list = 1;
 found:
	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		   the previous master failed.  Setting NEW_MASTER will
		   force dlm_recover_masters to call recover_master on this
		   rsb even though the new res_master_nodeid no longer
		   refers to a removed node. */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (toss_list) {
			/* I don't think we should ever find it on toss list. */
			log_error(ls, "dlm_master_lookup fix_master on toss");
			dlm_dump_rsb(r);
		}
	}

	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		   a previous recovery cycle, and we aborted the previous
		   cycle before recovering this master value */

		log_limit(ls, "dlm_master_lookup from_master %d "
			  "master_nodeid %d res_nodeid %d first %x %s",
			  from_nodeid, r->res_master_nodeid, r->res_nodeid,
			  r->res_first_lkid, r->res_name);

		if (r->res_master_nodeid == our_nodeid) {
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			goto out_found;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		   up the master for this rsb */

		log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		   finds the rsb on the keep list and ignores the remove,
		   and the former master sends a lookup */

		log_limit(ls, "dlm_master_lookup from master %d flags %x "
			  "first %x %s", from_nodeid, flags,
			  r->res_first_lkid, r->res_name);
	}

 out_found:
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;

	if (toss_list) {
		r->res_toss_time = jiffies;
		/* the rsb was inactive (on toss list) */
		spin_unlock(&ls->ls_rsbtbl[b].lock);
	} else {
		/* the rsb was active */
		unlock_rsb(r);
		put_rsb(r);
	}
	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}
	if (error)
		goto out_unlock;

	r->res_hash = hash;
	r->res_bucket = b;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	kref_init(&r->res_ref);
	r->res_toss_time = jiffies;

	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
	if (error) {
		/* should never happen */
		dlm_free_rsb(r);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		goto retry;
	}

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
	error = 0;
 out_unlock:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
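/*
 * Sketch of the normal (non-recovery) caller, per the comment above
 * (assumed shape; the real code is receive_lookup() in this file):
 *
 *	error = dlm_master_lookup(ls, from_nodeid, name, len, 0,
 *				  &ret_nodeid, NULL);
 *	// the lookup reply sent back to from_nodeid carries ret_nodeid;
 *	// the DLM_LU_MATCH/DLM_LU_ADD result only matters to recovery
 */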
static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			if (r->res_hash == hash)
				dlm_dump_rsb(r);
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
}

void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash, b;
	int error;

	hash = jhash(name, len, 0);
	b = hash & (ls->ls_rsbtbl_size - 1);

	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
	if (!error)
		goto out_dump;

	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
	if (error)
		goto out;
 out_dump:
	dlm_dump_rsb(r);
 out:
	spin_unlock(&ls->ls_rsbtbl[b].lock);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;

	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
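/*
 * Added summary of the rsb lifecycle implemented above: an rsb lives in
 * the bucket's keep tree while refcounted; the last put_rsb() triggers
 * toss_rsb(), which resets the kref, moves it to the toss tree and
 * timestamps it; shrink_bucket() later frees it via kill_rsb() once
 * res_toss_time is more than ci_toss_secs old.
 */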
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
		       int start, int end)
{
	struct dlm_lkb *lkb;
	int rv;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_cb_list);
	mutex_init(&lkb->lkb_cb_mutex);
	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

	idr_preload(GFP_NOFS);
	spin_lock(&ls->ls_lkbidr_spin);
	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
	if (rv >= 0)
		lkb->lkb_id = rv;
	spin_unlock(&ls->ls_lkbidr_spin);
	idr_preload_end();

	if (rv < 0) {
		log_error(ls, "create_lkb idr error %d", rv);
		dlm_free_lkb(lkb);
		return rv;
	}

	*lkb_ret = lkb;
	return 0;
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	return _create_lkb(ls, lkb_ret, 1, 0);
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;

	spin_lock(&ls->ls_lkbidr_spin);
	lkb = idr_find(&ls->ls_lkbidr, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	spin_unlock(&ls->ls_lkbidr_spin);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint32_t lkid = lkb->lkb_id;

	spin_lock(&ls->ls_lkbidr_spin);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		idr_remove(&ls->ls_lkbidr, lkid);
		spin_unlock(&ls->ls_lkbidr_spin);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		spin_unlock(&ls->ls_lkbidr_spin);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
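/*
 * Illustrative lkb reference/id lifecycle for the helpers above (a sketch
 * of an assumed caller, not real code):
 *
 *	create_lkb(ls, &lkb);      // ref = 1, lkb_id allocated in ls_lkbidr
 *	...
 *	find_lkb(ls, lkid, &lkb);  // idr lookup by id, takes a reference
 *	...
 *	dlm_put_lkb(lkb);          // drops a ref; on the last one the id
 *	                           // is removed and the lkb freed
 */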
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;

	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
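/*
 * Added note on queue ordering: modes compare numerically, DLM_LOCK_NL (0)
 * up to DLM_LOCK_EX (5), so lkb_add_ordered() above keeps the grant queue
 * with the highest modes at the head; e.g. a grant queue holding EX, PR
 * and NL locks stays in that order.  The wait and convert queues are FIFO
 * unless DLM_LKF_HEADQUE asks for head insertion.
 */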
static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (!lkb->lkb_wait_time)
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = 0;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);
	kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */
	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization; verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
		  mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't received a
	   reply to the op that was in progress prior to the unlock/cancel;
	   we give up on any reply to the earlier op.  FIXME: not sure
	   when/how this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
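/*
 * Sketch (assumed shape; see dlm_recover_waiters_pre() for the real code)
 * of the stub-reply case handled above: during recovery a reply is faked
 * locally while ls_waiters_mutex is already held, so the stub flag tells
 * remove_from_waiters_ms() not to take the mutex again:
 *
 *	stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
 *	stub_ms.m_flags = DLM_IFL_STUB_MS;
 *	_receive_unlock_reply(lkb, &stub_ms);
 */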
/* If there's an rsb for the same resource being removed, ensure
 * that the remove message is sent before the new lookup message.
 */

#define DLM_WAIT_PENDING_COND(ls, r)		\
	(ls->ls_remove_len &&			\
	 !rsb_cmp(r, ls->ls_remove_name,	\
		  ls->ls_remove_len))

static void wait_pending_remove(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
 restart:
	spin_lock(&ls->ls_remove_spin);
	if (DLM_WAIT_PENDING_COND(ls, r)) {
		log_debug(ls, "delay lookup for remove dir %d %s",
			  r->res_dir_nodeid, r->res_name);
		spin_unlock(&ls->ls_remove_spin);
		wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
		goto restart;
	}
	spin_unlock(&ls->ls_remove_spin);
}

/*
 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
 * read by other threads in wait_pending_remove.  ls_remove_names
 * and ls_remove_lens are only used by the scan thread, so they do
 * not need protection.
 */

static void shrink_bucket(struct dlm_ls *ls, int b)
{
	struct rb_node *n, *next;
	struct dlm_rsb *r;
	char *name;
	int our_nodeid = dlm_our_nodeid();
	int remote_count = 0;
	int need_shrink = 0;
	int i, len, rv;

	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);

	spin_lock(&ls->ls_rsbtbl[b].lock);

	if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		return;
	}

	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
		next = rb_next(n);
		r = rb_entry(n, struct dlm_rsb, res_hashnode);

		/* If we're the directory record for this rsb, and
		   we're not the master of it, then we need to wait
		   for the master node to send us a dir remove for it
		   before removing the dir record. */

		if (!dlm_no_directory(ls) &&
		    (r->res_master_nodeid != our_nodeid) &&
		    (dlm_dir_nodeid(r) == our_nodeid)) {
			continue;
		}

		need_shrink = 1;

		if (!time_after_eq(jiffies, r->res_toss_time +
				   dlm_config.ci_toss_secs * HZ)) {
			continue;
		}

		if (!dlm_no_directory(ls) &&
		    (r->res_master_nodeid == our_nodeid) &&
		    (dlm_dir_nodeid(r) != our_nodeid)) {

			/* We're the master of this rsb but we're not
			   the directory record, so we need to tell the
			   dir node to remove the dir record. */

			ls->ls_remove_lens[remote_count] = r->res_length;
			memcpy(ls->ls_remove_names[remote_count], r->res_name,
			       DLM_RESNAME_MAXLEN);
			remote_count++;

			if (remote_count >= DLM_REMOVE_NAMES_MAX)
				break;
			continue;
		}

		if (!kref_put(&r->res_ref, kill_rsb)) {
			log_error(ls, "tossed rsb in use %s", r->res_name);
			continue;
		}

		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
		dlm_free_rsb(r);
	}

	if (need_shrink)
		ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
	else
		ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
	spin_unlock(&ls->ls_rsbtbl[b].lock);

	/*
	 * While searching for rsb's to free, we found some that require
	 * remote removal.  We leave them in place and find them again here
	 * so there is a very small gap between removing them from the toss
	 * list and sending the removal.  Keeping this gap small is
	 * important to keep us (the master node) from being out of sync
	 * with the remote dir node for very long.
	 *
	 * From the time the rsb is removed from toss until just after
	 * send_remove, the rsb name is saved in ls_remove_name.  A new
	 * lookup checks this to ensure that a new lookup message for the
	 * same resource name is not sent just before the remove message.
	 */
	for (i = 0; i < remote_count; i++) {
		name = ls->ls_remove_names[i];
		len = ls->ls_remove_lens[i];

		spin_lock(&ls->ls_rsbtbl[b].lock);
		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
		if (rv) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name not toss %s", name);
			continue;
		}

		if (r->res_master_nodeid != our_nodeid) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name master %d dir %d our %d %s",
				  r->res_master_nodeid, r->res_dir_nodeid,
				  our_nodeid, name);
			continue;
		}

		if (r->res_dir_nodeid == our_nodeid) {
			/* should never happen */
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "remove_name dir %d master %d our %d %s",
				  r->res_dir_nodeid, r->res_master_nodeid,
				  our_nodeid, name);
			continue;
		}

		if (!time_after_eq(jiffies, r->res_toss_time +
				   dlm_config.ci_toss_secs * HZ)) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_debug(ls, "remove_name toss_time %lu now %lu %s",
				  r->res_toss_time, jiffies, name);
			continue;
		}

		if (!kref_put(&r->res_ref, kill_rsb)) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "remove_name in use %s", name);
			continue;
		}

		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);

		/* block lookup of same name until we've sent remove */
		spin_lock(&ls->ls_remove_spin);
		ls->ls_remove_len = len;
		memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
		spin_unlock(&ls->ls_remove_spin);
		spin_unlock(&ls->ls_rsbtbl[b].lock);
		wake_up(&ls->ls_remove_wait);

		send_remove(r);

		/* allow lookup of name again */
		spin_lock(&ls->ls_remove_spin);
		ls->ls_remove_len = 0;
		memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
		spin_unlock(&ls->ls_remove_spin);

		dlm_free_rsb(r);
	}
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb))
		return;

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}
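/*
 * Added note on the timeout machinery above: a request made with
 * DLM_LKF_TIMEOUT (timeout value in lkb_timeout_cs, hundredths of a
 * second) is put on ls_timeout by add_timeout(); dlm_scan_timeout()
 * below cancels it once it has waited too long, and queue_cast() turns
 * that cancel into an -ETIMEDOUT completion via DLM_IFL_TIMEOUT_CANCEL.
 */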
/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
							lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}
*/ 1932 1933 void dlm_adjust_timeouts(struct dlm_ls *ls) 1934 { 1935 struct dlm_lkb *lkb; 1936 u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin); 1937 1938 ls->ls_recover_begin = 0; 1939 mutex_lock(&ls->ls_timeout_mutex); 1940 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) 1941 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us); 1942 mutex_unlock(&ls->ls_timeout_mutex); 1943 1944 if (!dlm_config.ci_waitwarn_us) 1945 return; 1946 1947 mutex_lock(&ls->ls_waiters_mutex); 1948 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 1949 if (ktime_to_us(lkb->lkb_wait_time)) 1950 lkb->lkb_wait_time = ktime_get(); 1951 } 1952 mutex_unlock(&ls->ls_waiters_mutex); 1953 } 1954 1955 /* lkb is master or local copy */ 1956 1957 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1958 { 1959 int b, len = r->res_ls->ls_lvblen; 1960 1961 /* b=1 lvb returned to caller 1962 b=0 lvb written to rsb or invalidated 1963 b=-1 do nothing */ 1964 1965 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 1966 1967 if (b == 1) { 1968 if (!lkb->lkb_lvbptr) 1969 return; 1970 1971 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1972 return; 1973 1974 if (!r->res_lvbptr) 1975 return; 1976 1977 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len); 1978 lkb->lkb_lvbseq = r->res_lvbseq; 1979 1980 } else if (b == 0) { 1981 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 1982 rsb_set_flag(r, RSB_VALNOTVALID); 1983 return; 1984 } 1985 1986 if (!lkb->lkb_lvbptr) 1987 return; 1988 1989 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 1990 return; 1991 1992 if (!r->res_lvbptr) 1993 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 1994 1995 if (!r->res_lvbptr) 1996 return; 1997 1998 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len); 1999 r->res_lvbseq++; 2000 lkb->lkb_lvbseq = r->res_lvbseq; 2001 rsb_clear_flag(r, RSB_VALNOTVALID); 2002 } 2003 2004 if (rsb_flag(r, RSB_VALNOTVALID)) 2005 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID; 2006 } 2007 2008 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2009 { 2010 if (lkb->lkb_grmode < DLM_LOCK_PW) 2011 return; 2012 2013 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) { 2014 rsb_set_flag(r, RSB_VALNOTVALID); 2015 return; 2016 } 2017 2018 if (!lkb->lkb_lvbptr) 2019 return; 2020 2021 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2022 return; 2023 2024 if (!r->res_lvbptr) 2025 r->res_lvbptr = dlm_allocate_lvb(r->res_ls); 2026 2027 if (!r->res_lvbptr) 2028 return; 2029 2030 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 2031 r->res_lvbseq++; 2032 rsb_clear_flag(r, RSB_VALNOTVALID); 2033 } 2034 2035 /* lkb is process copy (pc) */ 2036 2037 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2038 struct dlm_message *ms) 2039 { 2040 int b; 2041 2042 if (!lkb->lkb_lvbptr) 2043 return; 2044 2045 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK)) 2046 return; 2047 2048 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; 2049 if (b == 1) { 2050 int len = receive_extralen(ms); 2051 if (len > r->res_ls->ls_lvblen) 2052 len = r->res_ls->ls_lvblen; 2053 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 2054 lkb->lkb_lvbseq = ms->m_lvbseq; 2055 } 2056 } 2057 2058 /* Manipulate lkb's on rsb's convert/granted/waiting queues 2059 remove_lock -- used for unlock, removes lkb from granted 2060 revert_lock -- used for cancel, moves lkb from convert to granted 2061 grant_lock -- used for request and convert, adds lkb to granted or 2062 moves lkb from convert or waiting to granted 2063 2064 Each of these is used for master or local copy lkb's. 
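(For example, a successful conversion ends in grant_lock(), which moves the lkb from the convert queue back to the grant queue and sets lkb_grmode = lkb_rqmode.)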
There is 2065 also a _pc() variation used to make the corresponding change on 2066 a process copy (pc) lkb. */ 2067 2068 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2069 { 2070 del_lkb(r, lkb); 2071 lkb->lkb_grmode = DLM_LOCK_IV; 2072 /* this unhold undoes the original ref from create_lkb() 2073 so this leads to the lkb being freed */ 2074 unhold_lkb(lkb); 2075 } 2076 2077 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2078 { 2079 set_lvb_unlock(r, lkb); 2080 _remove_lock(r, lkb); 2081 } 2082 2083 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2084 { 2085 _remove_lock(r, lkb); 2086 } 2087 2088 /* returns: 0 did nothing 2089 1 moved lock to granted 2090 -1 removed lock */ 2091 2092 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2093 { 2094 int rv = 0; 2095 2096 lkb->lkb_rqmode = DLM_LOCK_IV; 2097 2098 switch (lkb->lkb_status) { 2099 case DLM_LKSTS_GRANTED: 2100 break; 2101 case DLM_LKSTS_CONVERT: 2102 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2103 rv = 1; 2104 break; 2105 case DLM_LKSTS_WAITING: 2106 del_lkb(r, lkb); 2107 lkb->lkb_grmode = DLM_LOCK_IV; 2108 /* this unhold undoes the original ref from create_lkb() 2109 so this leads to the lkb being freed */ 2110 unhold_lkb(lkb); 2111 rv = -1; 2112 break; 2113 default: 2114 log_print("invalid status for revert %d", lkb->lkb_status); 2115 } 2116 return rv; 2117 } 2118 2119 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb) 2120 { 2121 return revert_lock(r, lkb); 2122 } 2123 2124 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2125 { 2126 if (lkb->lkb_grmode != lkb->lkb_rqmode) { 2127 lkb->lkb_grmode = lkb->lkb_rqmode; 2128 if (lkb->lkb_status) 2129 move_lkb(r, lkb, DLM_LKSTS_GRANTED); 2130 else 2131 add_lkb(r, lkb, DLM_LKSTS_GRANTED); 2132 } 2133 2134 lkb->lkb_rqmode = DLM_LOCK_IV; 2135 lkb->lkb_highbast = 0; 2136 } 2137 2138 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2139 { 2140 set_lvb_lock(r, lkb); 2141 _grant_lock(r, lkb); 2142 } 2143 2144 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, 2145 struct dlm_message *ms) 2146 { 2147 set_lvb_lock_pc(r, lkb, ms); 2148 _grant_lock(r, lkb); 2149 } 2150 2151 /* called by grant_pending_locks() which means an async grant message must 2152 be sent to the requesting node in addition to granting the lock if the 2153 lkb belongs to a remote node. */ 2154 2155 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb) 2156 { 2157 grant_lock(r, lkb); 2158 if (is_master_copy(lkb)) 2159 send_grant(r, lkb); 2160 else 2161 queue_cast(r, lkb, 0); 2162 } 2163 2164 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to 2165 change the granted/requested modes. We're munging things accordingly in 2166 the process copy. 
2167 CONVDEADLK: our grmode may have been forced down to NL to resolve a 2168 conversion deadlock 2169 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become 2170 compatible with other granted locks */ 2171 2172 static void munge_demoted(struct dlm_lkb *lkb) 2173 { 2174 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) { 2175 log_print("munge_demoted %x invalid modes gr %d rq %d", 2176 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode); 2177 return; 2178 } 2179 2180 lkb->lkb_grmode = DLM_LOCK_NL; 2181 } 2182 2183 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms) 2184 { 2185 if (ms->m_type != DLM_MSG_REQUEST_REPLY && 2186 ms->m_type != DLM_MSG_GRANT) { 2187 log_print("munge_altmode %x invalid reply type %d", 2188 lkb->lkb_id, ms->m_type); 2189 return; 2190 } 2191 2192 if (lkb->lkb_exflags & DLM_LKF_ALTPR) 2193 lkb->lkb_rqmode = DLM_LOCK_PR; 2194 else if (lkb->lkb_exflags & DLM_LKF_ALTCW) 2195 lkb->lkb_rqmode = DLM_LOCK_CW; 2196 else { 2197 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags); 2198 dlm_print_lkb(lkb); 2199 } 2200 } 2201 2202 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head) 2203 { 2204 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb, 2205 lkb_statequeue); 2206 if (lkb->lkb_id == first->lkb_id) 2207 return 1; 2208 2209 return 0; 2210 } 2211 2212 /* Check if the given lkb conflicts with another lkb on the queue. */ 2213 2214 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb) 2215 { 2216 struct dlm_lkb *this; 2217 2218 list_for_each_entry(this, head, lkb_statequeue) { 2219 if (this == lkb) 2220 continue; 2221 if (!modes_compat(this, lkb)) 2222 return 1; 2223 } 2224 return 0; 2225 } 2226 2227 /* 2228 * "A conversion deadlock arises with a pair of lock requests in the converting 2229 * queue for one resource. The granted mode of each lock blocks the requested 2230 * mode of the other lock." 2231 * 2232 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the 2233 * convert queue from being granted, then deadlk/demote lkb. 2234 * 2235 * Example: 2236 * Granted Queue: empty 2237 * Convert Queue: NL->EX (first lock) 2238 * PR->EX (second lock) 2239 * 2240 * The first lock can't be granted because of the granted mode of the second 2241 * lock and the second lock can't be granted because it's not first in the 2242 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we 2243 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK 2244 * flag set and return DEMOTED in the lksb flags. 2245 * 2246 * Originally, this function detected conv-deadlk in a more limited scope: 2247 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or 2248 * - if lkb1 was the first entry in the queue (not just earlier), and was 2249 * blocked by the granted mode of lkb2, and there was nothing on the 2250 * granted queue preventing lkb1 from being granted immediately, i.e. 2251 * lkb2 was the only thing preventing lkb1 from being granted. 2252 * 2253 * That second condition meant we'd only say there was conv-deadlk if 2254 * resolving it (by demotion) would lead to the first lock on the convert 2255 * queue being granted right away. It allowed conversion deadlocks to exist 2256 * between locks on the convert queue while they couldn't be granted anyway. 2257 * 2258 * Now, we detect and take action on conversion deadlocks immediately when 2259 * they're created, even if they may not be immediately consequential. 
If 2260 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted 2261 * mode that would prevent lkb1's conversion from being granted, we do a 2262 * deadlk/demote on lkb2 right away and don't let it onto the convert queue. 2263 * I think this means that the lkb_is_ahead condition below should always 2264 * be zero, i.e. there will never be conv-deadlk between two locks that are 2265 * both already on the convert queue. 2266 */ 2267 2268 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2) 2269 { 2270 struct dlm_lkb *lkb1; 2271 int lkb_is_ahead = 0; 2272 2273 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) { 2274 if (lkb1 == lkb2) { 2275 lkb_is_ahead = 1; 2276 continue; 2277 } 2278 2279 if (!lkb_is_ahead) { 2280 if (!modes_compat(lkb2, lkb1)) 2281 return 1; 2282 } else { 2283 if (!modes_compat(lkb2, lkb1) && 2284 !modes_compat(lkb1, lkb2)) 2285 return 1; 2286 } 2287 } 2288 return 0; 2289 } 2290 2291 /* 2292 * Return 1 if the lock can be granted, 0 otherwise. 2293 * Also detect and resolve conversion deadlocks. 2294 * 2295 * lkb is the lock to be granted 2296 * 2297 * now is 1 if the function is being called in the context of the 2298 * immediate request; it is 0 if called later, after the lock has been 2299 * queued. 2300 * 2301 * recover is 1 if dlm_recover_grant() is trying to grant conversions 2302 * after recovery. 2303 * 2304 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis 2305 */ 2306 2307 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2308 int recover) 2309 { 2310 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV); 2311 2312 /* 2313 * 6-10: Version 5.4 introduced an option to address the phenomenon of 2314 * a new request for a NL mode lock being blocked. 2315 * 2316 * 6-11: If the optional EXPEDITE flag is used with the new NL mode 2317 * request, then it would be granted. In essence, the use of this flag 2318 * tells the Lock Manager to expedite this request by not considering 2319 * what may be in the CONVERTING or WAITING queues... As of this 2320 * writing, the EXPEDITE flag can be used only with new requests for NL 2321 * mode locks. This flag is not valid for conversion requests. 2322 * 2323 * A shortcut. Earlier checks return an error if EXPEDITE is used in a 2324 * conversion or used with a non-NL requested mode. We also know an 2325 * EXPEDITE request is always granted immediately, so now must always 2326 * be 1. The full condition to grant an expedite request: (now && 2327 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can 2328 * therefore be shortened to just checking the flag. 2329 */ 2330 2331 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) 2332 return 1; 2333 2334 /* 2335 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be 2336 * added to the remaining conditions. 2337 */ 2338 2339 if (queue_conflict(&r->res_grantqueue, lkb)) 2340 return 0; 2341 2342 /* 2343 * 6-3: By default, a conversion request is immediately granted if the 2344 * requested mode is compatible with the modes of all other granted 2345 * locks 2346 */ 2347 2348 if (queue_conflict(&r->res_convertqueue, lkb)) 2349 return 0; 2350 2351 /* 2352 * The RECOVER_GRANT flag means dlm_recover_grant() is granting 2353 * locks for a recovered rsb, on which lkb's have been rebuilt. 2354 * The lkb's may have been rebuilt on the queues in a different 2355 * order than they were in on the previous master.
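 * (The queues on the new master are rebuilt from lkb's resent by the surviving nodes, so their order can depend on message arrival.)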
So, granting 2356 * queued conversions in order after recovery doesn't make sense 2357 * since the order hasn't been preserved anyway. The new order 2358 * could also have created a new "in place" conversion deadlock. 2359 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX. 2360 * After recovery, there would be no granted locks, and possibly 2361 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after 2362 * recovery, grant conversions without considering order. 2363 */ 2364 2365 if (conv && recover) 2366 return 1; 2367 2368 /* 2369 * 6-5: But the default algorithm for deciding whether to grant or 2370 * queue conversion requests does not by itself guarantee that such 2371 * requests are serviced on a "first come first serve" basis. This, in 2372 * turn, can lead to a phenomenon known as "indefinite postponement". 2373 * 2374 * 6-7: This issue is dealt with by using the optional QUECVT flag with 2375 * the system service employed to request a lock conversion. This flag 2376 * forces certain conversion requests to be queued, even if they are 2377 * compatible with the granted modes of other locks on the same 2378 * resource. Thus, the use of this flag results in conversion requests 2379 * being ordered on a "first come first serve" basis. 2380 * 2381 * DCT: This condition is all about new conversions being able to occur 2382 * "in place" while the lock remains on the granted queue (assuming 2383 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion 2384 * doesn't _have_ to go onto the convert queue where it's processed in 2385 * order. The "now" variable is necessary to distinguish converts 2386 * being received and processed for the first time now, because once a 2387 * convert is moved to the conversion queue the condition below applies 2388 * requiring FIFO granting. 2389 */ 2390 2391 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT)) 2392 return 1; 2393 2394 /* 2395 * Even if the convert is compat with all granted locks, 2396 * QUECVT forces it behind other locks on the convert queue. 2397 */ 2398 2399 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) { 2400 if (list_empty(&r->res_convertqueue)) 2401 return 1; 2402 else 2403 return 0; 2404 } 2405 2406 /* 2407 * The NOORDER flag is set to avoid the standard VMS rules on grant 2408 * order. 2409 */ 2410 2411 if (lkb->lkb_exflags & DLM_LKF_NOORDER) 2412 return 1; 2413 2414 /* 2415 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be 2416 * granted until all other conversion requests ahead of it are granted 2417 * and/or canceled. 2418 */ 2419 2420 if (!now && conv && first_in_list(lkb, &r->res_convertqueue)) 2421 return 1; 2422 2423 /* 2424 * 6-4: By default, a new request is immediately granted only if all 2425 * three of the following conditions are satisfied when the request is 2426 * issued: 2427 * - The queue of ungranted conversion requests for the resource is 2428 * empty. 2429 * - The queue of ungranted new requests for the resource is empty. 2430 * - The mode of the new request is compatible with the most 2431 * restrictive mode of all granted locks on the resource.
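 * (e.g. a new PR request against a granted EX lock is not granted here even when both queues are empty; the queue_conflict() check on the grant queue above has already failed it.)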
2432 */ 2433 2434 if (now && !conv && list_empty(&r->res_convertqueue) && 2435 list_empty(&r->res_waitqueue)) 2436 return 1; 2437 2438 /* 2439 * 6-4: Once a lock request is in the queue of ungranted new requests, 2440 * it cannot be granted until the queue of ungranted conversion 2441 * requests is empty, all ungranted new requests ahead of it are 2442 * granted and/or canceled, and it is compatible with the granted mode 2443 * of the most restrictive lock granted on the resource. 2444 */ 2445 2446 if (!now && !conv && list_empty(&r->res_convertqueue) && 2447 first_in_list(lkb, &r->res_waitqueue)) 2448 return 1; 2449 2450 return 0; 2451 } 2452 2453 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, 2454 int recover, int *err) 2455 { 2456 int rv; 2457 int8_t alt = 0, rqmode = lkb->lkb_rqmode; 2458 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV); 2459 2460 if (err) 2461 *err = 0; 2462 2463 rv = _can_be_granted(r, lkb, now, recover); 2464 if (rv) 2465 goto out; 2466 2467 /* 2468 * The CONVDEADLK flag is non-standard and tells the dlm to resolve 2469 * conversion deadlocks by demoting grmode to NL; otherwise the dlm 2470 * cancels one of the locks. 2471 */ 2472 2473 if (is_convert && can_be_queued(lkb) && 2474 conversion_deadlock_detect(r, lkb)) { 2475 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) { 2476 lkb->lkb_grmode = DLM_LOCK_NL; 2477 lkb->lkb_sbflags |= DLM_SBF_DEMOTED; 2478 } else if (err) { 2479 *err = -EDEADLK; 2480 } else { 2481 log_print("can_be_granted deadlock %x now %d", 2482 lkb->lkb_id, now); 2483 dlm_dump_rsb(r); 2484 } 2485 goto out; 2486 } 2487 2488 /* 2489 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try 2490 * to grant a request in a mode other than the normal rqmode. It's a 2491 * simple way to provide a big optimization to applications that can 2492 * use them. 2493 */ 2494 2495 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR)) 2496 alt = DLM_LOCK_PR; 2497 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW)) 2498 alt = DLM_LOCK_CW; 2499 2500 if (alt) { 2501 lkb->lkb_rqmode = alt; 2502 rv = _can_be_granted(r, lkb, now, 0); 2503 if (rv) 2504 lkb->lkb_sbflags |= DLM_SBF_ALTMODE; 2505 else 2506 lkb->lkb_rqmode = rqmode; 2507 } 2508 out: 2509 return rv; 2510 } 2511 2512 /* Returns the highest requested mode of all blocked conversions; sets 2513 cw if there's a blocked conversion to DLM_LOCK_CW. */ 2514 2515 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw, 2516 unsigned int *count) 2517 { 2518 struct dlm_lkb *lkb, *s; 2519 int recover = rsb_flag(r, RSB_RECOVER_GRANT); 2520 int hi, demoted, quit, grant_restart, demote_restart; 2521 int deadlk; 2522 2523 quit = 0; 2524 restart: 2525 grant_restart = 0; 2526 demote_restart = 0; 2527 hi = DLM_LOCK_IV; 2528 2529 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) { 2530 demoted = is_demoted(lkb); 2531 deadlk = 0; 2532 2533 if (can_be_granted(r, lkb, 0, recover, &deadlk)) { 2534 grant_lock_pending(r, lkb); 2535 grant_restart = 1; 2536 if (count) 2537 (*count)++; 2538 continue; 2539 } 2540 2541 if (!demoted && is_demoted(lkb)) { 2542 log_print("WARN: pending demoted %x node %d %s", 2543 lkb->lkb_id, lkb->lkb_nodeid, r->res_name); 2544 demote_restart = 1; 2545 continue; 2546 } 2547 2548 if (deadlk) { 2549 /* 2550 * If the DLM_LKF_NODLCKWT flag is set and a conversion 2551 * deadlock is detected, we request a blocking AST and 2552 * expect the holder to down-convert (or cancel) the conversion.
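 * (No -EDEADLK completion is queued in this case; the holder is expected to react to the blocking AST by down-converting or cancelling itself.)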
2553 */ 2554 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) { 2555 if (lkb->lkb_highbast < lkb->lkb_rqmode) { 2556 queue_bast(r, lkb, lkb->lkb_rqmode); 2557 lkb->lkb_highbast = lkb->lkb_rqmode; 2558 } 2559 } else { 2560 log_print("WARN: pending deadlock %x node %d %s", 2561 lkb->lkb_id, lkb->lkb_nodeid, 2562 r->res_name); 2563 dlm_dump_rsb(r); 2564 } 2565 continue; 2566 } 2567 2568 hi = max_t(int, lkb->lkb_rqmode, hi); 2569 2570 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW) 2571 *cw = 1; 2572 } 2573 2574 if (grant_restart) 2575 goto restart; 2576 if (demote_restart && !quit) { 2577 quit = 1; 2578 goto restart; 2579 } 2580 2581 return max_t(int, high, hi); 2582 } 2583 2584 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw, 2585 unsigned int *count) 2586 { 2587 struct dlm_lkb *lkb, *s; 2588 2589 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { 2590 if (can_be_granted(r, lkb, 0, 0, NULL)) { 2591 grant_lock_pending(r, lkb); 2592 if (count) 2593 (*count)++; 2594 } else { 2595 high = max_t(int, lkb->lkb_rqmode, high); 2596 if (lkb->lkb_rqmode == DLM_LOCK_CW) 2597 *cw = 1; 2598 } 2599 } 2600 2601 return high; 2602 } 2603 2604 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked 2605 on either the convert or waiting queue. 2606 high is the largest rqmode of all locks blocked on the convert or 2607 waiting queue. */ 2608 2609 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) 2610 { 2611 if (gr->lkb_grmode == DLM_LOCK_PR && cw) { 2612 if (gr->lkb_highbast < DLM_LOCK_EX) 2613 return 1; 2614 return 0; 2615 } 2616 2617 if (gr->lkb_highbast < high && 2618 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1]) 2619 return 1; 2620 return 0; 2621 } 2622 2623 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count) 2624 { 2625 struct dlm_lkb *lkb, *s; 2626 int high = DLM_LOCK_IV; 2627 int cw = 0; 2628 2629 if (!is_master(r)) { 2630 log_print("grant_pending_locks r nodeid %d", r->res_nodeid); 2631 dlm_dump_rsb(r); 2632 return; 2633 } 2634 2635 high = grant_pending_convert(r, high, &cw, count); 2636 high = grant_pending_wait(r, high, &cw, count); 2637 2638 if (high == DLM_LOCK_IV) 2639 return; 2640 2641 /* 2642 * If there are locks left on the wait/convert queue then send blocking 2643 * ASTs to granted locks based on the largest requested mode (high) 2644 * found above. 
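 * (Note the special case below: when high is only PR but a CW request is also blocked, granted PR locks are sent a bast for CW, since PR does not block PR but does block CW.)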
2645 */ 2646 2647 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) { 2648 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) { 2649 if (cw && high == DLM_LOCK_PR && 2650 lkb->lkb_grmode == DLM_LOCK_PR) 2651 queue_bast(r, lkb, DLM_LOCK_CW); 2652 else 2653 queue_bast(r, lkb, high); 2654 lkb->lkb_highbast = high; 2655 } 2656 } 2657 } 2658 2659 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq) 2660 { 2661 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) || 2662 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) { 2663 if (gr->lkb_highbast < DLM_LOCK_EX) 2664 return 1; 2665 return 0; 2666 } 2667 2668 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq)) 2669 return 1; 2670 return 0; 2671 } 2672 2673 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head, 2674 struct dlm_lkb *lkb) 2675 { 2676 struct dlm_lkb *gr; 2677 2678 list_for_each_entry(gr, head, lkb_statequeue) { 2679 /* skip self when sending basts to convertqueue */ 2680 if (gr == lkb) 2681 continue; 2682 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) { 2683 queue_bast(r, gr, lkb->lkb_rqmode); 2684 gr->lkb_highbast = lkb->lkb_rqmode; 2685 } 2686 } 2687 } 2688 2689 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb) 2690 { 2691 send_bast_queue(r, &r->res_grantqueue, lkb); 2692 } 2693 2694 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb) 2695 { 2696 send_bast_queue(r, &r->res_grantqueue, lkb); 2697 send_bast_queue(r, &r->res_convertqueue, lkb); 2698 } 2699 2700 /* set_master(r, lkb) -- set the master nodeid of a resource 2701 2702 The purpose of this function is to set the nodeid field in the given 2703 lkb using the nodeid field in the given rsb. If the rsb's nodeid is 2704 known, it can just be copied to the lkb and the function will return 2705 0. If the rsb's nodeid is _not_ known, it needs to be looked up 2706 before it can be copied to the lkb. 2707 2708 When the rsb nodeid is being looked up remotely, the initial lkb 2709 causing the lookup is kept on the ls_waiters list waiting for the 2710 lookup reply. Other lkb's waiting for the same rsb lookup are kept 2711 on the rsb's res_lookup list until the master is verified. 2712 2713 Return values: 2714 0: nodeid is set in rsb/lkb and the caller should go ahead and use it 2715 1: the rsb master is not available and the lkb has been placed on 2716 a wait queue 2717 */ 2718 2719 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb) 2720 { 2721 int our_nodeid = dlm_our_nodeid(); 2722 2723 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) { 2724 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN); 2725 r->res_first_lkid = lkb->lkb_id; 2726 lkb->lkb_nodeid = r->res_nodeid; 2727 return 0; 2728 } 2729 2730 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) { 2731 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup); 2732 return 1; 2733 } 2734 2735 if (r->res_master_nodeid == our_nodeid) { 2736 lkb->lkb_nodeid = 0; 2737 return 0; 2738 } 2739 2740 if (r->res_master_nodeid) { 2741 lkb->lkb_nodeid = r->res_master_nodeid; 2742 return 0; 2743 } 2744 2745 if (dlm_dir_nodeid(r) == our_nodeid) { 2746 /* This is a somewhat unusual case; find_rsb will usually 2747 have set res_master_nodeid when dir nodeid is local, but 2748 there are cases where we become the dir node after we've 2749 passed find_rsb and go through _request_lock again. 2750 confirm_master() or process_lookup_list() needs to be 2751 called after this.
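(In that case we claim mastership below, setting res_nodeid and lkb_nodeid to 0 for a local master, and return 0 so the request proceeds locally.)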
*/ 2752 log_debug(r->res_ls, "set_master %x self master %d dir %d %s", 2753 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid, 2754 r->res_name); 2755 r->res_master_nodeid = our_nodeid; 2756 r->res_nodeid = 0; 2757 lkb->lkb_nodeid = 0; 2758 return 0; 2759 } 2760 2761 wait_pending_remove(r); 2762 2763 r->res_first_lkid = lkb->lkb_id; 2764 send_lookup(r, lkb); 2765 return 1; 2766 } 2767 2768 static void process_lookup_list(struct dlm_rsb *r) 2769 { 2770 struct dlm_lkb *lkb, *safe; 2771 2772 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) { 2773 list_del_init(&lkb->lkb_rsb_lookup); 2774 _request_lock(r, lkb); 2775 schedule(); 2776 } 2777 } 2778 2779 /* confirm_master -- confirm (or deny) an rsb's master nodeid */ 2780 2781 static void confirm_master(struct dlm_rsb *r, int error) 2782 { 2783 struct dlm_lkb *lkb; 2784 2785 if (!r->res_first_lkid) 2786 return; 2787 2788 switch (error) { 2789 case 0: 2790 case -EINPROGRESS: 2791 r->res_first_lkid = 0; 2792 process_lookup_list(r); 2793 break; 2794 2795 case -EAGAIN: 2796 case -EBADR: 2797 case -ENOTBLK: 2798 /* the remote request failed and won't be retried (it was 2799 a NOQUEUE, or has been canceled/unlocked); make a waiting 2800 lkb the first_lkid */ 2801 2802 r->res_first_lkid = 0; 2803 2804 if (!list_empty(&r->res_lookup)) { 2805 lkb = list_entry(r->res_lookup.next, struct dlm_lkb, 2806 lkb_rsb_lookup); 2807 list_del_init(&lkb->lkb_rsb_lookup); 2808 r->res_first_lkid = lkb->lkb_id; 2809 _request_lock(r, lkb); 2810 } 2811 break; 2812 2813 default: 2814 log_error(r->res_ls, "confirm_master unknown error %d", error); 2815 } 2816 } 2817 2818 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags, 2819 int namelen, unsigned long timeout_cs, 2820 void (*ast) (void *astparam), 2821 void *astparam, 2822 void (*bast) (void *astparam, int mode), 2823 struct dlm_args *args) 2824 { 2825 int rv = -EINVAL; 2826 2827 /* check for invalid arg usage */ 2828 2829 if (mode < 0 || mode > DLM_LOCK_EX) 2830 goto out; 2831 2832 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN)) 2833 goto out; 2834 2835 if (flags & DLM_LKF_CANCEL) 2836 goto out; 2837 2838 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT)) 2839 goto out; 2840 2841 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT)) 2842 goto out; 2843 2844 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE) 2845 goto out; 2846 2847 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT) 2848 goto out; 2849 2850 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT) 2851 goto out; 2852 2853 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE) 2854 goto out; 2855 2856 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL) 2857 goto out; 2858 2859 if (!ast || !lksb) 2860 goto out; 2861 2862 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr) 2863 goto out; 2864 2865 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid) 2866 goto out; 2867 2868 /* these args will be copied to the lkb in validate_lock_args, 2869 it cannot be done now because when converting locks, fields in 2870 an active lkb cannot be modified before locking the rsb */ 2871 2872 args->flags = flags; 2873 args->astfn = ast; 2874 args->astparam = astparam; 2875 args->bastfn = bast; 2876 args->timeout = timeout_cs; 2877 args->mode = mode; 2878 args->lksb = lksb; 2879 rv = 0; 2880 out: 2881 return rv; 2882 } 2883 2884 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args) 2885 { 2886 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK | 2887 
DLM_LKF_FORCEUNLOCK)) 2888 return -EINVAL; 2889 2890 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK) 2891 return -EINVAL; 2892 2893 args->flags = flags; 2894 args->astparam = astarg; 2895 return 0; 2896 } 2897 2898 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2899 struct dlm_args *args) 2900 { 2901 int rv = -EINVAL; 2902 2903 if (args->flags & DLM_LKF_CONVERT) { 2904 if (lkb->lkb_flags & DLM_IFL_MSTCPY) 2905 goto out; 2906 2907 if (args->flags & DLM_LKF_QUECVT && 2908 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1]) 2909 goto out; 2910 2911 rv = -EBUSY; 2912 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2913 goto out; 2914 2915 if (lkb->lkb_wait_type) 2916 goto out; 2917 2918 if (is_overlap(lkb)) 2919 goto out; 2920 } 2921 2922 lkb->lkb_exflags = args->flags; 2923 lkb->lkb_sbflags = 0; 2924 lkb->lkb_astfn = args->astfn; 2925 lkb->lkb_astparam = args->astparam; 2926 lkb->lkb_bastfn = args->bastfn; 2927 lkb->lkb_rqmode = args->mode; 2928 lkb->lkb_lksb = args->lksb; 2929 lkb->lkb_lvbptr = args->lksb->sb_lvbptr; 2930 lkb->lkb_ownpid = (int) current->pid; 2931 lkb->lkb_timeout_cs = args->timeout; 2932 rv = 0; 2933 out: 2934 if (rv) 2935 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s", 2936 rv, lkb->lkb_id, lkb->lkb_flags, args->flags, 2937 lkb->lkb_status, lkb->lkb_wait_type, 2938 lkb->lkb_resource->res_name); 2939 return rv; 2940 } 2941 2942 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0 2943 for success */ 2944 2945 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here 2946 because there may be a lookup in progress and it's valid to do 2947 cancel/unlockf on it */ 2948 2949 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args) 2950 { 2951 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 2952 int rv = -EINVAL; 2953 2954 if (lkb->lkb_flags & DLM_IFL_MSTCPY) { 2955 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id); 2956 dlm_print_lkb(lkb); 2957 goto out; 2958 } 2959 2960 /* an lkb may still exist even though the lock is EOL'ed due to a 2961 cancel, unlock or failed noqueue request; an app can't use these 2962 locks; return same error as if the lkid had not been found at all */ 2963 2964 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) { 2965 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id); 2966 rv = -ENOENT; 2967 goto out; 2968 } 2969 2970 /* an lkb may be waiting for an rsb lookup to complete where the 2971 lookup was initiated by another lock */ 2972 2973 if (!list_empty(&lkb->lkb_rsb_lookup)) { 2974 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) { 2975 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id); 2976 list_del_init(&lkb->lkb_rsb_lookup); 2977 queue_cast(lkb->lkb_resource, lkb, 2978 args->flags & DLM_LKF_CANCEL ? 
2979 -DLM_ECANCEL : -DLM_EUNLOCK); 2980 unhold_lkb(lkb); /* undoes create_lkb() */ 2981 } 2982 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */ 2983 rv = -EBUSY; 2984 goto out; 2985 } 2986 2987 /* cancel not allowed with another cancel/unlock in progress */ 2988 2989 if (args->flags & DLM_LKF_CANCEL) { 2990 if (lkb->lkb_exflags & DLM_LKF_CANCEL) 2991 goto out; 2992 2993 if (is_overlap(lkb)) 2994 goto out; 2995 2996 /* don't let scand try to do a cancel */ 2997 del_timeout(lkb); 2998 2999 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3000 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 3001 rv = -EBUSY; 3002 goto out; 3003 } 3004 3005 /* there's nothing to cancel */ 3006 if (lkb->lkb_status == DLM_LKSTS_GRANTED && 3007 !lkb->lkb_wait_type) { 3008 rv = -EBUSY; 3009 goto out; 3010 } 3011 3012 switch (lkb->lkb_wait_type) { 3013 case DLM_MSG_LOOKUP: 3014 case DLM_MSG_REQUEST: 3015 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL; 3016 rv = -EBUSY; 3017 goto out; 3018 case DLM_MSG_UNLOCK: 3019 case DLM_MSG_CANCEL: 3020 goto out; 3021 } 3022 /* add_to_waiters() will set OVERLAP_CANCEL */ 3023 goto out_ok; 3024 } 3025 3026 /* do we need to allow a force-unlock if there's a normal unlock 3027 already in progress? in what conditions could the normal unlock 3028 fail such that we'd want to send a force-unlock to be sure? */ 3029 3030 if (args->flags & DLM_LKF_FORCEUNLOCK) { 3031 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK) 3032 goto out; 3033 3034 if (is_overlap_unlock(lkb)) 3035 goto out; 3036 3037 /* don't let scand try to do a cancel */ 3038 del_timeout(lkb); 3039 3040 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3041 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3042 rv = -EBUSY; 3043 goto out; 3044 } 3045 3046 switch (lkb->lkb_wait_type) { 3047 case DLM_MSG_LOOKUP: 3048 case DLM_MSG_REQUEST: 3049 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK; 3050 rv = -EBUSY; 3051 goto out; 3052 case DLM_MSG_UNLOCK: 3053 goto out; 3054 } 3055 /* add_to_waiters() will set OVERLAP_UNLOCK */ 3056 goto out_ok; 3057 } 3058 3059 /* normal unlock not allowed if there's any op in progress */ 3060 rv = -EBUSY; 3061 if (lkb->lkb_wait_type || lkb->lkb_wait_count) 3062 goto out; 3063 3064 out_ok: 3065 /* an overlapping op shouldn't blow away exflags from other op */ 3066 lkb->lkb_exflags |= args->flags; 3067 lkb->lkb_sbflags = 0; 3068 lkb->lkb_astparam = args->astparam; 3069 rv = 0; 3070 out: 3071 if (rv) 3072 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv, 3073 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags, 3074 args->flags, lkb->lkb_wait_type, 3075 lkb->lkb_resource->res_name); 3076 return rv; 3077 } 3078 3079 /* 3080 * Four stage 4 varieties: 3081 * do_request(), do_convert(), do_unlock(), do_cancel() 3082 * These are called on the master node for the given lock and 3083 * from the central locking logic. 
3084 */ 3085 3086 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3087 { 3088 int error = 0; 3089 3090 if (can_be_granted(r, lkb, 1, 0, NULL)) { 3091 grant_lock(r, lkb); 3092 queue_cast(r, lkb, 0); 3093 goto out; 3094 } 3095 3096 if (can_be_queued(lkb)) { 3097 error = -EINPROGRESS; 3098 add_lkb(r, lkb, DLM_LKSTS_WAITING); 3099 add_timeout(lkb); 3100 goto out; 3101 } 3102 3103 error = -EAGAIN; 3104 queue_cast(r, lkb, -EAGAIN); 3105 out: 3106 return error; 3107 } 3108 3109 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3110 int error) 3111 { 3112 switch (error) { 3113 case -EAGAIN: 3114 if (force_blocking_asts(lkb)) 3115 send_blocking_asts_all(r, lkb); 3116 break; 3117 case -EINPROGRESS: 3118 send_blocking_asts(r, lkb); 3119 break; 3120 } 3121 } 3122 3123 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3124 { 3125 int error = 0; 3126 int deadlk = 0; 3127 3128 /* changing an existing lock may allow others to be granted */ 3129 3130 if (can_be_granted(r, lkb, 1, 0, &deadlk)) { 3131 grant_lock(r, lkb); 3132 queue_cast(r, lkb, 0); 3133 goto out; 3134 } 3135 3136 /* can_be_granted() detected that this lock would block in a conversion 3137 deadlock, so we leave it on the granted queue and return EDEADLK in 3138 the ast for the convert. */ 3139 3140 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) { 3141 /* it's left on the granted queue */ 3142 revert_lock(r, lkb); 3143 queue_cast(r, lkb, -EDEADLK); 3144 error = -EDEADLK; 3145 goto out; 3146 } 3147 3148 /* is_demoted() means the can_be_granted() above set the grmode 3149 to NL, and left us on the granted queue. This auto-demotion 3150 (due to CONVDEADLK) might mean other locks, and/or this lock, are 3151 now grantable. We have to try to grant other converting locks 3152 before we try again to grant this one. 
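(For example, if our EX->PR conversion was demoted to NL, an earlier NL->EX conversion may now be grantable; the grant_pending_convert() call below grants it before this lkb is re-checked.)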
*/ 3153 3154 if (is_demoted(lkb)) { 3155 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL); 3156 if (_can_be_granted(r, lkb, 1, 0)) { 3157 grant_lock(r, lkb); 3158 queue_cast(r, lkb, 0); 3159 goto out; 3160 } 3161 /* else fall through and move to convert queue */ 3162 } 3163 3164 if (can_be_queued(lkb)) { 3165 error = -EINPROGRESS; 3166 del_lkb(r, lkb); 3167 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 3168 add_timeout(lkb); 3169 goto out; 3170 } 3171 3172 error = -EAGAIN; 3173 queue_cast(r, lkb, -EAGAIN); 3174 out: 3175 return error; 3176 } 3177 3178 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3179 int error) 3180 { 3181 switch (error) { 3182 case 0: 3183 grant_pending_locks(r, NULL); 3184 /* grant_pending_locks also sends basts */ 3185 break; 3186 case -EAGAIN: 3187 if (force_blocking_asts(lkb)) 3188 send_blocking_asts_all(r, lkb); 3189 break; 3190 case -EINPROGRESS: 3191 send_blocking_asts(r, lkb); 3192 break; 3193 } 3194 } 3195 3196 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3197 { 3198 remove_lock(r, lkb); 3199 queue_cast(r, lkb, -DLM_EUNLOCK); 3200 return -DLM_EUNLOCK; 3201 } 3202 3203 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3204 int error) 3205 { 3206 grant_pending_locks(r, NULL); 3207 } 3208 3209 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ 3210 3211 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3212 { 3213 int error; 3214 3215 error = revert_lock(r, lkb); 3216 if (error) { 3217 queue_cast(r, lkb, -DLM_ECANCEL); 3218 return -DLM_ECANCEL; 3219 } 3220 return 0; 3221 } 3222 3223 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, 3224 int error) 3225 { 3226 if (error) 3227 grant_pending_locks(r, NULL); 3228 } 3229 3230 /* 3231 * Four stage 3 varieties: 3232 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 3233 */ 3234 3235 /* add a new lkb to a possibly new rsb, called by requesting process */ 3236 3237 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3238 { 3239 int error; 3240 3241 /* set_master: sets lkb nodeid from r */ 3242 3243 error = set_master(r, lkb); 3244 if (error < 0) 3245 goto out; 3246 if (error) { 3247 error = 0; 3248 goto out; 3249 } 3250 3251 if (is_remote(r)) { 3252 /* receive_request() calls do_request() on remote node */ 3253 error = send_request(r, lkb); 3254 } else { 3255 error = do_request(r, lkb); 3256 /* for remote locks the request_reply is sent 3257 between do_request and do_request_effects */ 3258 do_request_effects(r, lkb, error); 3259 } 3260 out: 3261 return error; 3262 } 3263 3264 /* change some property of an existing lkb, e.g. 
mode */ 3265 3266 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3267 { 3268 int error; 3269 3270 if (is_remote(r)) { 3271 /* receive_convert() calls do_convert() on remote node */ 3272 error = send_convert(r, lkb); 3273 } else { 3274 error = do_convert(r, lkb); 3275 /* for remote locks the convert_reply is sent 3276 between do_convert and do_convert_effects */ 3277 do_convert_effects(r, lkb, error); 3278 } 3279 3280 return error; 3281 } 3282 3283 /* remove an existing lkb from the granted queue */ 3284 3285 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3286 { 3287 int error; 3288 3289 if (is_remote(r)) { 3290 /* receive_unlock() calls do_unlock() on remote node */ 3291 error = send_unlock(r, lkb); 3292 } else { 3293 error = do_unlock(r, lkb); 3294 /* for remote locks the unlock_reply is sent 3295 between do_unlock and do_unlock_effects */ 3296 do_unlock_effects(r, lkb, error); 3297 } 3298 3299 return error; 3300 } 3301 3302 /* remove an existing lkb from the convert or wait queue */ 3303 3304 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3305 { 3306 int error; 3307 3308 if (is_remote(r)) { 3309 /* receive_cancel() calls do_cancel() on remote node */ 3310 error = send_cancel(r, lkb); 3311 } else { 3312 error = do_cancel(r, lkb); 3313 /* for remote locks the cancel_reply is sent 3314 between do_cancel and do_cancel_effects */ 3315 do_cancel_effects(r, lkb, error); 3316 } 3317 3318 return error; 3319 } 3320 3321 /* 3322 * Four stage 2 varieties: 3323 * request_lock(), convert_lock(), unlock_lock(), cancel_lock() 3324 */ 3325 3326 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, 3327 int len, struct dlm_args *args) 3328 { 3329 struct dlm_rsb *r; 3330 int error; 3331 3332 error = validate_lock_args(ls, lkb, args); 3333 if (error) 3334 return error; 3335 3336 error = find_rsb(ls, name, len, 0, R_REQUEST, &r); 3337 if (error) 3338 return error; 3339 3340 lock_rsb(r); 3341 3342 attach_lkb(r, lkb); 3343 lkb->lkb_lksb->sb_lkid = lkb->lkb_id; 3344 3345 error = _request_lock(r, lkb); 3346 3347 unlock_rsb(r); 3348 put_rsb(r); 3349 return error; 3350 } 3351 3352 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3353 struct dlm_args *args) 3354 { 3355 struct dlm_rsb *r; 3356 int error; 3357 3358 r = lkb->lkb_resource; 3359 3360 hold_rsb(r); 3361 lock_rsb(r); 3362 3363 error = validate_lock_args(ls, lkb, args); 3364 if (error) 3365 goto out; 3366 3367 error = _convert_lock(r, lkb); 3368 out: 3369 unlock_rsb(r); 3370 put_rsb(r); 3371 return error; 3372 } 3373 3374 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3375 struct dlm_args *args) 3376 { 3377 struct dlm_rsb *r; 3378 int error; 3379 3380 r = lkb->lkb_resource; 3381 3382 hold_rsb(r); 3383 lock_rsb(r); 3384 3385 error = validate_unlock_args(lkb, args); 3386 if (error) 3387 goto out; 3388 3389 error = _unlock_lock(r, lkb); 3390 out: 3391 unlock_rsb(r); 3392 put_rsb(r); 3393 return error; 3394 } 3395 3396 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, 3397 struct dlm_args *args) 3398 { 3399 struct dlm_rsb *r; 3400 int error; 3401 3402 r = lkb->lkb_resource; 3403 3404 hold_rsb(r); 3405 lock_rsb(r); 3406 3407 error = validate_unlock_args(lkb, args); 3408 if (error) 3409 goto out; 3410 3411 error = _cancel_lock(r, lkb); 3412 out: 3413 unlock_rsb(r); 3414 put_rsb(r); 3415 return error; 3416 } 3417 3418 /* 3419 * Two stage 1 varieties: dlm_lock() and dlm_unlock() 3420 */ 3421 3422 int dlm_lock(dlm_lockspace_t *lockspace, 3423 int 
mode, 3424 struct dlm_lksb *lksb, 3425 uint32_t flags, 3426 void *name, 3427 unsigned int namelen, 3428 uint32_t parent_lkid, 3429 void (*ast) (void *astarg), 3430 void *astarg, 3431 void (*bast) (void *astarg, int mode)) 3432 { 3433 struct dlm_ls *ls; 3434 struct dlm_lkb *lkb; 3435 struct dlm_args args; 3436 int error, convert = flags & DLM_LKF_CONVERT; 3437 3438 ls = dlm_find_lockspace_local(lockspace); 3439 if (!ls) 3440 return -EINVAL; 3441 3442 dlm_lock_recovery(ls); 3443 3444 if (convert) 3445 error = find_lkb(ls, lksb->sb_lkid, &lkb); 3446 else 3447 error = create_lkb(ls, &lkb); 3448 3449 if (error) 3450 goto out; 3451 3452 trace_dlm_lock_start(ls, lkb, mode, flags); 3453 3454 error = set_lock_args(mode, lksb, flags, namelen, 0, ast, 3455 astarg, bast, &args); 3456 if (error) 3457 goto out_put; 3458 3459 if (convert) 3460 error = convert_lock(ls, lkb, &args); 3461 else 3462 error = request_lock(ls, lkb, name, namelen, &args); 3463 3464 if (error == -EINPROGRESS) 3465 error = 0; 3466 out_put: 3467 trace_dlm_lock_end(ls, lkb, mode, flags, error); 3468 3469 if (convert || error) 3470 __put_lkb(ls, lkb); 3471 if (error == -EAGAIN || error == -EDEADLK) 3472 error = 0; 3473 out: 3474 dlm_unlock_recovery(ls); 3475 dlm_put_lockspace(ls); 3476 return error; 3477 } 3478 3479 int dlm_unlock(dlm_lockspace_t *lockspace, 3480 uint32_t lkid, 3481 uint32_t flags, 3482 struct dlm_lksb *lksb, 3483 void *astarg) 3484 { 3485 struct dlm_ls *ls; 3486 struct dlm_lkb *lkb; 3487 struct dlm_args args; 3488 int error; 3489 3490 ls = dlm_find_lockspace_local(lockspace); 3491 if (!ls) 3492 return -EINVAL; 3493 3494 dlm_lock_recovery(ls); 3495 3496 error = find_lkb(ls, lkid, &lkb); 3497 if (error) 3498 goto out; 3499 3500 trace_dlm_unlock_start(ls, lkb, flags); 3501 3502 error = set_unlock_args(flags, astarg, &args); 3503 if (error) 3504 goto out_put; 3505 3506 if (flags & DLM_LKF_CANCEL) 3507 error = cancel_lock(ls, lkb, &args); 3508 else 3509 error = unlock_lock(ls, lkb, &args); 3510 3511 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL) 3512 error = 0; 3513 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK))) 3514 error = 0; 3515 out_put: 3516 trace_dlm_unlock_end(ls, lkb, flags, error); 3517 3518 dlm_put_lkb(lkb); 3519 out: 3520 dlm_unlock_recovery(ls); 3521 dlm_put_lockspace(ls); 3522 return error; 3523 } 3524 3525 /* 3526 * send/receive routines for remote operations and replies 3527 * 3528 * send_args 3529 * send_common 3530 * send_request receive_request 3531 * send_convert receive_convert 3532 * send_unlock receive_unlock 3533 * send_cancel receive_cancel 3534 * send_grant receive_grant 3535 * send_bast receive_bast 3536 * send_lookup receive_lookup 3537 * send_remove receive_remove 3538 * 3539 * send_common_reply 3540 * receive_request_reply send_request_reply 3541 * receive_convert_reply send_convert_reply 3542 * receive_unlock_reply send_unlock_reply 3543 * receive_cancel_reply send_cancel_reply 3544 * receive_lookup_reply send_lookup_reply 3545 */ 3546 3547 static int _create_message(struct dlm_ls *ls, int mb_len, 3548 int to_nodeid, int mstype, 3549 struct dlm_message **ms_ret, 3550 struct dlm_mhandle **mh_ret) 3551 { 3552 struct dlm_message *ms; 3553 struct dlm_mhandle *mh; 3554 char *mb; 3555 3556 /* get_buffer gives us a message handle (mh) that we need to 3557 pass into midcomms_commit and a message buffer (mb) that we 3558 write our data into */ 3559 3560 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb); 3561 if (!mh) 3562 return -ENOBUFS; 3563 3564 ms = 
(struct dlm_message *) mb; 3565 3566 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 3567 ms->m_header.u.h_lockspace = ls->ls_global_id; 3568 ms->m_header.h_nodeid = dlm_our_nodeid(); 3569 ms->m_header.h_length = mb_len; 3570 ms->m_header.h_cmd = DLM_MSG; 3571 3572 ms->m_type = mstype; 3573 3574 *mh_ret = mh; 3575 *ms_ret = ms; 3576 return 0; 3577 } 3578 3579 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb, 3580 int to_nodeid, int mstype, 3581 struct dlm_message **ms_ret, 3582 struct dlm_mhandle **mh_ret) 3583 { 3584 int mb_len = sizeof(struct dlm_message); 3585 3586 switch (mstype) { 3587 case DLM_MSG_REQUEST: 3588 case DLM_MSG_LOOKUP: 3589 case DLM_MSG_REMOVE: 3590 mb_len += r->res_length; 3591 break; 3592 case DLM_MSG_CONVERT: 3593 case DLM_MSG_UNLOCK: 3594 case DLM_MSG_REQUEST_REPLY: 3595 case DLM_MSG_CONVERT_REPLY: 3596 case DLM_MSG_GRANT: 3597 if (lkb && lkb->lkb_lvbptr) 3598 mb_len += r->res_ls->ls_lvblen; 3599 break; 3600 } 3601 3602 return _create_message(r->res_ls, mb_len, to_nodeid, mstype, 3603 ms_ret, mh_ret); 3604 } 3605 3606 /* further lowcomms enhancements or alternate implementations may make 3607 the return value from this function useful at some point */ 3608 3609 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) 3610 { 3611 dlm_message_out(ms); 3612 dlm_midcomms_commit_mhandle(mh); 3613 return 0; 3614 } 3615 3616 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 3617 struct dlm_message *ms) 3618 { 3619 ms->m_nodeid = lkb->lkb_nodeid; 3620 ms->m_pid = lkb->lkb_ownpid; 3621 ms->m_lkid = lkb->lkb_id; 3622 ms->m_remid = lkb->lkb_remid; 3623 ms->m_exflags = lkb->lkb_exflags; 3624 ms->m_sbflags = lkb->lkb_sbflags; 3625 ms->m_flags = lkb->lkb_flags; 3626 ms->m_lvbseq = lkb->lkb_lvbseq; 3627 ms->m_status = lkb->lkb_status; 3628 ms->m_grmode = lkb->lkb_grmode; 3629 ms->m_rqmode = lkb->lkb_rqmode; 3630 ms->m_hash = r->res_hash; 3631 3632 /* m_result and m_bastmode are set from function args, 3633 not from lkb fields */ 3634 3635 if (lkb->lkb_bastfn) 3636 ms->m_asts |= DLM_CB_BAST; 3637 if (lkb->lkb_astfn) 3638 ms->m_asts |= DLM_CB_CAST; 3639 3640 /* compare with switch in create_message; send_remove() doesn't 3641 use send_args() */ 3642 3643 switch (ms->m_type) { 3644 case DLM_MSG_REQUEST: 3645 case DLM_MSG_LOOKUP: 3646 memcpy(ms->m_extra, r->res_name, r->res_length); 3647 break; 3648 case DLM_MSG_CONVERT: 3649 case DLM_MSG_UNLOCK: 3650 case DLM_MSG_REQUEST_REPLY: 3651 case DLM_MSG_CONVERT_REPLY: 3652 case DLM_MSG_GRANT: 3653 if (!lkb->lkb_lvbptr) 3654 break; 3655 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 3656 break; 3657 } 3658 } 3659 3660 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 3661 { 3662 struct dlm_message *ms; 3663 struct dlm_mhandle *mh; 3664 int to_nodeid, error; 3665 3666 to_nodeid = r->res_nodeid; 3667 3668 error = add_to_waiters(lkb, mstype, to_nodeid); 3669 if (error) 3670 return error; 3671 3672 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3673 if (error) 3674 goto fail; 3675 3676 send_args(r, lkb, ms); 3677 3678 error = send_message(mh, ms); 3679 if (error) 3680 goto fail; 3681 return 0; 3682 3683 fail: 3684 remove_from_waiters(lkb, msg_reply_type(mstype)); 3685 return error; 3686 } 3687 3688 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 3689 { 3690 return send_common(r, lkb, DLM_MSG_REQUEST); 3691 } 3692 3693 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 3694 { 3695 int error; 3696 3697 error = 
send_common(r, lkb, DLM_MSG_CONVERT); 3698 3699 /* down conversions go without a reply from the master */ 3700 if (!error && down_conversion(lkb)) { 3701 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY); 3702 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS; 3703 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY; 3704 r->res_ls->ls_stub_ms.m_result = 0; 3705 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 3706 } 3707 3708 return error; 3709 } 3710 3711 /* FIXME: if this lkb is the only lock we hold on the rsb, then set 3712 MASTER_UNCERTAIN to force the next request on the rsb to confirm 3713 that the master is still correct. */ 3714 3715 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 3716 { 3717 return send_common(r, lkb, DLM_MSG_UNLOCK); 3718 } 3719 3720 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 3721 { 3722 return send_common(r, lkb, DLM_MSG_CANCEL); 3723 } 3724 3725 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 3726 { 3727 struct dlm_message *ms; 3728 struct dlm_mhandle *mh; 3729 int to_nodeid, error; 3730 3731 to_nodeid = lkb->lkb_nodeid; 3732 3733 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); 3734 if (error) 3735 goto out; 3736 3737 send_args(r, lkb, ms); 3738 3739 ms->m_result = 0; 3740 3741 error = send_message(mh, ms); 3742 out: 3743 return error; 3744 } 3745 3746 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 3747 { 3748 struct dlm_message *ms; 3749 struct dlm_mhandle *mh; 3750 int to_nodeid, error; 3751 3752 to_nodeid = lkb->lkb_nodeid; 3753 3754 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); 3755 if (error) 3756 goto out; 3757 3758 send_args(r, lkb, ms); 3759 3760 ms->m_bastmode = mode; 3761 3762 error = send_message(mh, ms); 3763 out: 3764 return error; 3765 } 3766 3767 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 3768 { 3769 struct dlm_message *ms; 3770 struct dlm_mhandle *mh; 3771 int to_nodeid, error; 3772 3773 to_nodeid = dlm_dir_nodeid(r); 3774 3775 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid); 3776 if (error) 3777 return error; 3778 3779 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 3780 if (error) 3781 goto fail; 3782 3783 send_args(r, lkb, ms); 3784 3785 error = send_message(mh, ms); 3786 if (error) 3787 goto fail; 3788 return 0; 3789 3790 fail: 3791 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 3792 return error; 3793 } 3794 3795 static int send_remove(struct dlm_rsb *r) 3796 { 3797 struct dlm_message *ms; 3798 struct dlm_mhandle *mh; 3799 int to_nodeid, error; 3800 3801 to_nodeid = dlm_dir_nodeid(r); 3802 3803 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); 3804 if (error) 3805 goto out; 3806 3807 memcpy(ms->m_extra, r->res_name, r->res_length); 3808 ms->m_hash = r->res_hash; 3809 3810 error = send_message(mh, ms); 3811 out: 3812 return error; 3813 } 3814 3815 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 3816 int mstype, int rv) 3817 { 3818 struct dlm_message *ms; 3819 struct dlm_mhandle *mh; 3820 int to_nodeid, error; 3821 3822 to_nodeid = lkb->lkb_nodeid; 3823 3824 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 3825 if (error) 3826 goto out; 3827 3828 send_args(r, lkb, ms); 3829 3830 ms->m_result = rv; 3831 3832 error = send_message(mh, ms); 3833 out: 3834 return error; 3835 } 3836 3837 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3838 { 3839 return send_common_reply(r, lkb, 
DLM_MSG_REQUEST_REPLY, rv); 3840 } 3841 3842 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3843 { 3844 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 3845 } 3846 3847 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3848 { 3849 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 3850 } 3851 3852 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 3853 { 3854 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 3855 } 3856 3857 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, 3858 int ret_nodeid, int rv) 3859 { 3860 struct dlm_rsb *r = &ls->ls_stub_rsb; 3861 struct dlm_message *ms; 3862 struct dlm_mhandle *mh; 3863 int error, nodeid = ms_in->m_header.h_nodeid; 3864 3865 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); 3866 if (error) 3867 goto out; 3868 3869 ms->m_lkid = ms_in->m_lkid; 3870 ms->m_result = rv; 3871 ms->m_nodeid = ret_nodeid; 3872 3873 error = send_message(mh, ms); 3874 out: 3875 return error; 3876 } 3877 3878 /* which args we save from a received message depends heavily on the type 3879 of message, unlike the send side where we can safely send everything about 3880 the lkb for any type of message */ 3881 3882 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 3883 { 3884 lkb->lkb_exflags = ms->m_exflags; 3885 lkb->lkb_sbflags = ms->m_sbflags; 3886 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3887 (ms->m_flags & 0x0000FFFF); 3888 } 3889 3890 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 3891 { 3892 if (ms->m_flags == DLM_IFL_STUB_MS) 3893 return; 3894 3895 lkb->lkb_sbflags = ms->m_sbflags; 3896 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 3897 (ms->m_flags & 0x0000FFFF); 3898 } 3899 3900 static int receive_extralen(struct dlm_message *ms) 3901 { 3902 return (ms->m_header.h_length - sizeof(struct dlm_message)); 3903 } 3904 3905 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 3906 struct dlm_message *ms) 3907 { 3908 int len; 3909 3910 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3911 if (!lkb->lkb_lvbptr) 3912 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3913 if (!lkb->lkb_lvbptr) 3914 return -ENOMEM; 3915 len = receive_extralen(ms); 3916 if (len > ls->ls_lvblen) 3917 len = ls->ls_lvblen; 3918 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 3919 } 3920 return 0; 3921 } 3922 3923 static void fake_bastfn(void *astparam, int mode) 3924 { 3925 log_print("fake_bastfn should not be called"); 3926 } 3927 3928 static void fake_astfn(void *astparam) 3929 { 3930 log_print("fake_astfn should not be called"); 3931 } 3932 3933 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3934 struct dlm_message *ms) 3935 { 3936 lkb->lkb_nodeid = ms->m_header.h_nodeid; 3937 lkb->lkb_ownpid = ms->m_pid; 3938 lkb->lkb_remid = ms->m_lkid; 3939 lkb->lkb_grmode = DLM_LOCK_IV; 3940 lkb->lkb_rqmode = ms->m_rqmode; 3941 3942 lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 3943 lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? 
&fake_astfn : NULL; 3944 3945 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3946 /* lkb was just created so there won't be an lvb yet */ 3947 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 3948 if (!lkb->lkb_lvbptr) 3949 return -ENOMEM; 3950 } 3951 3952 return 0; 3953 } 3954 3955 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3956 struct dlm_message *ms) 3957 { 3958 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 3959 return -EBUSY; 3960 3961 if (receive_lvb(ls, lkb, ms)) 3962 return -ENOMEM; 3963 3964 lkb->lkb_rqmode = ms->m_rqmode; 3965 lkb->lkb_lvbseq = ms->m_lvbseq; 3966 3967 return 0; 3968 } 3969 3970 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3971 struct dlm_message *ms) 3972 { 3973 if (receive_lvb(ls, lkb, ms)) 3974 return -ENOMEM; 3975 return 0; 3976 } 3977 3978 /* We fill in the stub-lkb fields with the info that send_xxxx_reply() 3979 uses to send a reply and that the remote end uses to process the reply. */ 3980 3981 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) 3982 { 3983 struct dlm_lkb *lkb = &ls->ls_stub_lkb; 3984 lkb->lkb_nodeid = ms->m_header.h_nodeid; 3985 lkb->lkb_remid = ms->m_lkid; 3986 } 3987 3988 /* This is called after the rsb is locked so that we can safely inspect 3989 fields in the lkb. */ 3990 3991 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms) 3992 { 3993 int from = ms->m_header.h_nodeid; 3994 int error = 0; 3995 3996 /* currently mixing of user/kernel locks is not supported */ 3997 if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) { 3998 log_error(lkb->lkb_resource->res_ls, 3999 "got user dlm message for a kernel lock"); 4000 error = -EINVAL; 4001 goto out; 4002 } 4003 4004 switch (ms->m_type) { 4005 case DLM_MSG_CONVERT: 4006 case DLM_MSG_UNLOCK: 4007 case DLM_MSG_CANCEL: 4008 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from) 4009 error = -EINVAL; 4010 break; 4011 4012 case DLM_MSG_CONVERT_REPLY: 4013 case DLM_MSG_UNLOCK_REPLY: 4014 case DLM_MSG_CANCEL_REPLY: 4015 case DLM_MSG_GRANT: 4016 case DLM_MSG_BAST: 4017 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from) 4018 error = -EINVAL; 4019 break; 4020 4021 case DLM_MSG_REQUEST_REPLY: 4022 if (!is_process_copy(lkb)) 4023 error = -EINVAL; 4024 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from) 4025 error = -EINVAL; 4026 break; 4027 4028 default: 4029 error = -EINVAL; 4030 } 4031 4032 out: 4033 if (error) 4034 log_error(lkb->lkb_resource->res_ls, 4035 "ignore invalid message %d from %d %x %x %x %d", 4036 ms->m_type, from, lkb->lkb_id, lkb->lkb_remid, 4037 lkb->lkb_flags, lkb->lkb_nodeid); 4038 return error; 4039 } 4040 4041 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len) 4042 { 4043 char name[DLM_RESNAME_MAXLEN + 1]; 4044 struct dlm_message *ms; 4045 struct dlm_mhandle *mh; 4046 struct dlm_rsb *r; 4047 uint32_t hash, b; 4048 int rv, dir_nodeid; 4049 4050 memset(name, 0, sizeof(name)); 4051 memcpy(name, ms_name, len); 4052 4053 hash = jhash(name, len, 0); 4054 b = hash & (ls->ls_rsbtbl_size - 1); 4055 4056 dir_nodeid = dlm_hash2nodeid(ls, hash); 4057 4058 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name); 4059 4060 spin_lock(&ls->ls_rsbtbl[b].lock); 4061 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4062 if (!rv) { 4063 spin_unlock(&ls->ls_rsbtbl[b].lock); 4064 log_error(ls, "repeat_remove on keep %s", name); 4065 return; 4066 } 4067 4068 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4069 if (!rv) { 4070
spin_unlock(&ls->ls_rsbtbl[b].lock); 4071 log_error(ls, "repeat_remove on toss %s", name); 4072 return; 4073 } 4074 4075 /* use ls->remove_name2 to avoid conflict with shrink? */ 4076 4077 spin_lock(&ls->ls_remove_spin); 4078 ls->ls_remove_len = len; 4079 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN); 4080 spin_unlock(&ls->ls_remove_spin); 4081 spin_unlock(&ls->ls_rsbtbl[b].lock); 4082 wake_up(&ls->ls_remove_wait); 4083 4084 rv = _create_message(ls, sizeof(struct dlm_message) + len, 4085 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh); 4086 if (rv) 4087 return; 4088 4089 memcpy(ms->m_extra, name, len); 4090 ms->m_hash = hash; 4091 4092 send_message(mh, ms); 4093 4094 spin_lock(&ls->ls_remove_spin); 4095 ls->ls_remove_len = 0; 4096 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN); 4097 spin_unlock(&ls->ls_remove_spin); 4098 } 4099 4100 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms) 4101 { 4102 struct dlm_lkb *lkb; 4103 struct dlm_rsb *r; 4104 int from_nodeid; 4105 int error, namelen = 0; 4106 4107 from_nodeid = ms->m_header.h_nodeid; 4108 4109 error = create_lkb(ls, &lkb); 4110 if (error) 4111 goto fail; 4112 4113 receive_flags(lkb, ms); 4114 lkb->lkb_flags |= DLM_IFL_MSTCPY; 4115 error = receive_request_args(ls, lkb, ms); 4116 if (error) { 4117 __put_lkb(ls, lkb); 4118 goto fail; 4119 } 4120 4121 /* The dir node is the authority on whether we are the master 4122 for this rsb or not, so if the master sends us a request, we should 4123 recreate the rsb if we've destroyed it. This race happens when we 4124 send a remove message to the dir node at the same time that the dir 4125 node sends us a request for the rsb. */ 4126 4127 namelen = receive_extralen(ms); 4128 4129 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid, 4130 R_RECEIVE_REQUEST, &r); 4131 if (error) { 4132 __put_lkb(ls, lkb); 4133 goto fail; 4134 } 4135 4136 lock_rsb(r); 4137 4138 if (r->res_master_nodeid != dlm_our_nodeid()) { 4139 error = validate_master_nodeid(ls, r, from_nodeid); 4140 if (error) { 4141 unlock_rsb(r); 4142 put_rsb(r); 4143 __put_lkb(ls, lkb); 4144 goto fail; 4145 } 4146 } 4147 4148 attach_lkb(r, lkb); 4149 error = do_request(r, lkb); 4150 send_request_reply(r, lkb, error); 4151 do_request_effects(r, lkb, error); 4152 4153 unlock_rsb(r); 4154 put_rsb(r); 4155 4156 if (error == -EINPROGRESS) 4157 error = 0; 4158 if (error) 4159 dlm_put_lkb(lkb); 4160 return 0; 4161 4162 fail: 4163 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup 4164 and do this receive_request again from process_lookup_list once 4165 we get the lookup reply. This would avoid many repeated 4166 ENOTBLK request failures when the lookup reply designating us 4167 as master is delayed. */ 4168 4169 /* We could repeatedly return -EBADR here if our send_remove() is 4170 delayed in being sent/arriving/being processed on the dir node. 4171 Another node would repeatedly look up the master, and the dir 4172 node would continue returning our nodeid until our send_remove 4173 took effect. 4174 4175 We send another remove message in case our previous send_remove 4176 was lost/ignored/missed somehow.
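send_repeat_remove() below implements that resend; the msleep() after it paces the retry loop while the remove takes effect.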
*/ 4177 4178 if (error != -ENOTBLK) { 4179 log_limit(ls, "receive_request %x from %d %d", 4180 ms->m_lkid, from_nodeid, error); 4181 } 4182 4183 if (namelen && error == -EBADR) { 4184 send_repeat_remove(ls, ms->m_extra, namelen); 4185 msleep(1000); 4186 } 4187 4188 setup_stub_lkb(ls, ms); 4189 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4190 return error; 4191 } 4192 4193 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 4194 { 4195 struct dlm_lkb *lkb; 4196 struct dlm_rsb *r; 4197 int error, reply = 1; 4198 4199 error = find_lkb(ls, ms->m_remid, &lkb); 4200 if (error) 4201 goto fail; 4202 4203 if (lkb->lkb_remid != ms->m_lkid) { 4204 log_error(ls, "receive_convert %x remid %x recover_seq %llu " 4205 "remote %d %x", lkb->lkb_id, lkb->lkb_remid, 4206 (unsigned long long)lkb->lkb_recover_seq, 4207 ms->m_header.h_nodeid, ms->m_lkid); 4208 error = -ENOENT; 4209 dlm_put_lkb(lkb); 4210 goto fail; 4211 } 4212 4213 r = lkb->lkb_resource; 4214 4215 hold_rsb(r); 4216 lock_rsb(r); 4217 4218 error = validate_message(lkb, ms); 4219 if (error) 4220 goto out; 4221 4222 receive_flags(lkb, ms); 4223 4224 error = receive_convert_args(ls, lkb, ms); 4225 if (error) { 4226 send_convert_reply(r, lkb, error); 4227 goto out; 4228 } 4229 4230 reply = !down_conversion(lkb); 4231 4232 error = do_convert(r, lkb); 4233 if (reply) 4234 send_convert_reply(r, lkb, error); 4235 do_convert_effects(r, lkb, error); 4236 out: 4237 unlock_rsb(r); 4238 put_rsb(r); 4239 dlm_put_lkb(lkb); 4240 return 0; 4241 4242 fail: 4243 setup_stub_lkb(ls, ms); 4244 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4245 return error; 4246 } 4247 4248 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 4249 { 4250 struct dlm_lkb *lkb; 4251 struct dlm_rsb *r; 4252 int error; 4253 4254 error = find_lkb(ls, ms->m_remid, &lkb); 4255 if (error) 4256 goto fail; 4257 4258 if (lkb->lkb_remid != ms->m_lkid) { 4259 log_error(ls, "receive_unlock %x remid %x remote %d %x", 4260 lkb->lkb_id, lkb->lkb_remid, 4261 ms->m_header.h_nodeid, ms->m_lkid); 4262 error = -ENOENT; 4263 dlm_put_lkb(lkb); 4264 goto fail; 4265 } 4266 4267 r = lkb->lkb_resource; 4268 4269 hold_rsb(r); 4270 lock_rsb(r); 4271 4272 error = validate_message(lkb, ms); 4273 if (error) 4274 goto out; 4275 4276 receive_flags(lkb, ms); 4277 4278 error = receive_unlock_args(ls, lkb, ms); 4279 if (error) { 4280 send_unlock_reply(r, lkb, error); 4281 goto out; 4282 } 4283 4284 error = do_unlock(r, lkb); 4285 send_unlock_reply(r, lkb, error); 4286 do_unlock_effects(r, lkb, error); 4287 out: 4288 unlock_rsb(r); 4289 put_rsb(r); 4290 dlm_put_lkb(lkb); 4291 return 0; 4292 4293 fail: 4294 setup_stub_lkb(ls, ms); 4295 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4296 return error; 4297 } 4298 4299 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 4300 { 4301 struct dlm_lkb *lkb; 4302 struct dlm_rsb *r; 4303 int error; 4304 4305 error = find_lkb(ls, ms->m_remid, &lkb); 4306 if (error) 4307 goto fail; 4308 4309 receive_flags(lkb, ms); 4310 4311 r = lkb->lkb_resource; 4312 4313 hold_rsb(r); 4314 lock_rsb(r); 4315 4316 error = validate_message(lkb, ms); 4317 if (error) 4318 goto out; 4319 4320 error = do_cancel(r, lkb); 4321 send_cancel_reply(r, lkb, error); 4322 do_cancel_effects(r, lkb, error); 4323 out: 4324 unlock_rsb(r); 4325 put_rsb(r); 4326 dlm_put_lkb(lkb); 4327 return 0; 4328 4329 fail: 4330 setup_stub_lkb(ls, ms); 4331 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 4332 return error; 
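/* granted immediately: queue the completion ast with status 0 */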
4333 } 4334 4335 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 4336 { 4337 struct dlm_lkb *lkb; 4338 struct dlm_rsb *r; 4339 int error; 4340 4341 error = find_lkb(ls, ms->m_remid, &lkb); 4342 if (error) 4343 return error; 4344 4345 r = lkb->lkb_resource; 4346 4347 hold_rsb(r); 4348 lock_rsb(r); 4349 4350 error = validate_message(lkb, ms); 4351 if (error) 4352 goto out; 4353 4354 receive_flags_reply(lkb, ms); 4355 if (is_altmode(lkb)) 4356 munge_altmode(lkb, ms); 4357 grant_lock_pc(r, lkb, ms); 4358 queue_cast(r, lkb, 0); 4359 out: 4360 unlock_rsb(r); 4361 put_rsb(r); 4362 dlm_put_lkb(lkb); 4363 return 0; 4364 } 4365 4366 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 4367 { 4368 struct dlm_lkb *lkb; 4369 struct dlm_rsb *r; 4370 int error; 4371 4372 error = find_lkb(ls, ms->m_remid, &lkb); 4373 if (error) 4374 return error; 4375 4376 r = lkb->lkb_resource; 4377 4378 hold_rsb(r); 4379 lock_rsb(r); 4380 4381 error = validate_message(lkb, ms); 4382 if (error) 4383 goto out; 4384 4385 queue_bast(r, lkb, ms->m_bastmode); 4386 lkb->lkb_highbast = ms->m_bastmode; 4387 out: 4388 unlock_rsb(r); 4389 put_rsb(r); 4390 dlm_put_lkb(lkb); 4391 return 0; 4392 } 4393 4394 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 4395 { 4396 int len, error, ret_nodeid, from_nodeid, our_nodeid; 4397 4398 from_nodeid = ms->m_header.h_nodeid; 4399 our_nodeid = dlm_our_nodeid(); 4400 4401 len = receive_extralen(ms); 4402 4403 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0, 4404 &ret_nodeid, NULL); 4405 4406 /* Optimization: we're master so treat lookup as a request */ 4407 if (!error && ret_nodeid == our_nodeid) { 4408 receive_request(ls, ms); 4409 return; 4410 } 4411 send_lookup_reply(ls, ms, ret_nodeid, error); 4412 } 4413 4414 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 4415 { 4416 char name[DLM_RESNAME_MAXLEN+1]; 4417 struct dlm_rsb *r; 4418 uint32_t hash, b; 4419 int rv, len, dir_nodeid, from_nodeid; 4420 4421 from_nodeid = ms->m_header.h_nodeid; 4422 4423 len = receive_extralen(ms); 4424 4425 if (len > DLM_RESNAME_MAXLEN) { 4426 log_error(ls, "receive_remove from %d bad len %d", 4427 from_nodeid, len); 4428 return; 4429 } 4430 4431 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 4432 if (dir_nodeid != dlm_our_nodeid()) { 4433 log_error(ls, "receive_remove from %d bad nodeid %d", 4434 from_nodeid, dir_nodeid); 4435 return; 4436 } 4437 4438 /* Look for name on rsbtbl.toss, if it's there, kill it. 4439 If it's on rsbtbl.keep, it's being used, and we should ignore this 4440 message. This is an expected race between the dir node sending a 4441 request to the master node at the same time as the master node sends 4442 a remove to the dir node. The resolution to that race is for the 4443 dir node to ignore the remove message, and the master node to 4444 recreate the master rsb when it gets a request from the dir node for 4445 an rsb it doesn't have. 
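The checks below follow that rule: an rsb found on the keep list is logged and left alone, and a toss rsb is freed only when its master matches the sending node.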
*/ 4446 4447 memset(name, 0, sizeof(name)); 4448 memcpy(name, ms->m_extra, len); 4449 4450 hash = jhash(name, len, 0); 4451 b = hash & (ls->ls_rsbtbl_size - 1); 4452 4453 spin_lock(&ls->ls_rsbtbl[b].lock); 4454 4455 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r); 4456 if (rv) { 4457 /* verify the rsb is on keep list per comment above */ 4458 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r); 4459 if (rv) { 4460 /* should not happen */ 4461 log_error(ls, "receive_remove from %d not found %s", 4462 from_nodeid, name); 4463 spin_unlock(&ls->ls_rsbtbl[b].lock); 4464 return; 4465 } 4466 if (r->res_master_nodeid != from_nodeid) { 4467 /* should not happen */ 4468 log_error(ls, "receive_remove keep from %d master %d", 4469 from_nodeid, r->res_master_nodeid); 4470 dlm_print_rsb(r); 4471 spin_unlock(&ls->ls_rsbtbl[b].lock); 4472 return; 4473 } 4474 4475 log_debug(ls, "receive_remove from %d master %d first %x %s", 4476 from_nodeid, r->res_master_nodeid, r->res_first_lkid, 4477 name); 4478 spin_unlock(&ls->ls_rsbtbl[b].lock); 4479 return; 4480 } 4481 4482 if (r->res_master_nodeid != from_nodeid) { 4483 log_error(ls, "receive_remove toss from %d master %d", 4484 from_nodeid, r->res_master_nodeid); 4485 dlm_print_rsb(r); 4486 spin_unlock(&ls->ls_rsbtbl[b].lock); 4487 return; 4488 } 4489 4490 if (kref_put(&r->res_ref, kill_rsb)) { 4491 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss); 4492 spin_unlock(&ls->ls_rsbtbl[b].lock); 4493 dlm_free_rsb(r); 4494 } else { 4495 log_error(ls, "receive_remove from %d rsb ref error", 4496 from_nodeid); 4497 dlm_print_rsb(r); 4498 spin_unlock(&ls->ls_rsbtbl[b].lock); 4499 } 4500 } 4501 4502 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms) 4503 { 4504 do_purge(ls, ms->m_nodeid, ms->m_pid); 4505 } 4506 4507 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 4508 { 4509 struct dlm_lkb *lkb; 4510 struct dlm_rsb *r; 4511 int error, mstype, result; 4512 int from_nodeid = ms->m_header.h_nodeid; 4513 4514 error = find_lkb(ls, ms->m_remid, &lkb); 4515 if (error) 4516 return error; 4517 4518 r = lkb->lkb_resource; 4519 hold_rsb(r); 4520 lock_rsb(r); 4521 4522 error = validate_message(lkb, ms); 4523 if (error) 4524 goto out; 4525 4526 mstype = lkb->lkb_wait_type; 4527 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); 4528 if (error) { 4529 log_error(ls, "receive_request_reply %x remote %d %x result %d", 4530 lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result); 4531 dlm_dump_rsb(r); 4532 goto out; 4533 } 4534 4535 /* Optimization: the dir node was also the master, so it took our 4536 lookup as a request and sent request reply instead of lookup reply */ 4537 if (mstype == DLM_MSG_LOOKUP) { 4538 r->res_master_nodeid = from_nodeid; 4539 r->res_nodeid = from_nodeid; 4540 lkb->lkb_nodeid = from_nodeid; 4541 } 4542 4543 /* this is the value returned from do_request() on the master */ 4544 result = ms->m_result; 4545 4546 switch (result) { 4547 case -EAGAIN: 4548 /* request would block (be queued) on remote master */ 4549 queue_cast(r, lkb, -EAGAIN); 4550 confirm_master(r, -EAGAIN); 4551 unhold_lkb(lkb); /* undoes create_lkb() */ 4552 break; 4553 4554 case -EINPROGRESS: 4555 case 0: 4556 /* request was queued or granted on remote master */ 4557 receive_flags_reply(lkb, ms); 4558 lkb->lkb_remid = ms->m_lkid; 4559 if (is_altmode(lkb)) 4560 munge_altmode(lkb, ms); 4561 if (result) { 4562 add_lkb(r, lkb, DLM_LKSTS_WAITING); 4563 add_timeout(lkb); 4564 } else { 4565 grant_lock_pc(r, lkb, ms); 4566 
queue_cast(r, lkb, 0); 4567 } 4568 confirm_master(r, result); 4569 break; 4570 4571 case -EBADR: 4572 case -ENOTBLK: 4573 /* find_rsb failed to find rsb or rsb wasn't master */ 4574 log_limit(ls, "receive_request_reply %x from %d %d " 4575 "master %d dir %d first %x %s", lkb->lkb_id, 4576 from_nodeid, result, r->res_master_nodeid, 4577 r->res_dir_nodeid, r->res_first_lkid, r->res_name); 4578 4579 if (r->res_dir_nodeid != dlm_our_nodeid() && 4580 r->res_master_nodeid != dlm_our_nodeid()) { 4581 /* cause _request_lock->set_master->send_lookup */ 4582 r->res_master_nodeid = 0; 4583 r->res_nodeid = -1; 4584 lkb->lkb_nodeid = -1; 4585 } 4586 4587 if (is_overlap(lkb)) { 4588 /* we'll ignore error in cancel/unlock reply */ 4589 queue_cast_overlap(r, lkb); 4590 confirm_master(r, result); 4591 unhold_lkb(lkb); /* undoes create_lkb() */ 4592 } else { 4593 _request_lock(r, lkb); 4594 4595 if (r->res_master_nodeid == dlm_our_nodeid()) 4596 confirm_master(r, 0); 4597 } 4598 break; 4599 4600 default: 4601 log_error(ls, "receive_request_reply %x error %d", 4602 lkb->lkb_id, result); 4603 } 4604 4605 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) { 4606 log_debug(ls, "receive_request_reply %x result %d unlock", 4607 lkb->lkb_id, result); 4608 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4609 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4610 send_unlock(r, lkb); 4611 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) { 4612 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id); 4613 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4614 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4615 send_cancel(r, lkb); 4616 } else { 4617 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 4618 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 4619 } 4620 out: 4621 unlock_rsb(r); 4622 put_rsb(r); 4623 dlm_put_lkb(lkb); 4624 return 0; 4625 } 4626 4627 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 4628 struct dlm_message *ms) 4629 { 4630 /* this is the value returned from do_convert() on the master */ 4631 switch (ms->m_result) { 4632 case -EAGAIN: 4633 /* convert would block (be queued) on remote master */ 4634 queue_cast(r, lkb, -EAGAIN); 4635 break; 4636 4637 case -EDEADLK: 4638 receive_flags_reply(lkb, ms); 4639 revert_lock_pc(r, lkb); 4640 queue_cast(r, lkb, -EDEADLK); 4641 break; 4642 4643 case -EINPROGRESS: 4644 /* convert was queued on remote master */ 4645 receive_flags_reply(lkb, ms); 4646 if (is_demoted(lkb)) 4647 munge_demoted(lkb); 4648 del_lkb(r, lkb); 4649 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 4650 add_timeout(lkb); 4651 break; 4652 4653 case 0: 4654 /* convert was granted on remote master */ 4655 receive_flags_reply(lkb, ms); 4656 if (is_demoted(lkb)) 4657 munge_demoted(lkb); 4658 grant_lock_pc(r, lkb, ms); 4659 queue_cast(r, lkb, 0); 4660 break; 4661 4662 default: 4663 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d", 4664 lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid, 4665 ms->m_result); 4666 dlm_print_rsb(r); 4667 dlm_print_lkb(lkb); 4668 } 4669 } 4670 4671 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4672 { 4673 struct dlm_rsb *r = lkb->lkb_resource; 4674 int error; 4675 4676 hold_rsb(r); 4677 lock_rsb(r); 4678 4679 error = validate_message(lkb, ms); 4680 if (error) 4681 goto out; 4682 4683 /* stub reply can happen with waiters_mutex held */ 4684 error = remove_from_waiters_ms(lkb, ms); 4685 if (error) 4686 goto out; 4687 4688 __receive_convert_reply(r, lkb, ms); 4689 out: 4690 unlock_rsb(r); 4691 put_rsb(r); 
4692 } 4693 4694 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 4695 { 4696 struct dlm_lkb *lkb; 4697 int error; 4698 4699 error = find_lkb(ls, ms->m_remid, &lkb); 4700 if (error) 4701 return error; 4702 4703 _receive_convert_reply(lkb, ms); 4704 dlm_put_lkb(lkb); 4705 return 0; 4706 } 4707 4708 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4709 { 4710 struct dlm_rsb *r = lkb->lkb_resource; 4711 int error; 4712 4713 hold_rsb(r); 4714 lock_rsb(r); 4715 4716 error = validate_message(lkb, ms); 4717 if (error) 4718 goto out; 4719 4720 /* stub reply can happen with waiters_mutex held */ 4721 error = remove_from_waiters_ms(lkb, ms); 4722 if (error) 4723 goto out; 4724 4725 /* this is the value returned from do_unlock() on the master */ 4726 4727 switch (ms->m_result) { 4728 case -DLM_EUNLOCK: 4729 receive_flags_reply(lkb, ms); 4730 remove_lock_pc(r, lkb); 4731 queue_cast(r, lkb, -DLM_EUNLOCK); 4732 break; 4733 case -ENOENT: 4734 break; 4735 default: 4736 log_error(r->res_ls, "receive_unlock_reply %x error %d", 4737 lkb->lkb_id, ms->m_result); 4738 } 4739 out: 4740 unlock_rsb(r); 4741 put_rsb(r); 4742 } 4743 4744 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 4745 { 4746 struct dlm_lkb *lkb; 4747 int error; 4748 4749 error = find_lkb(ls, ms->m_remid, &lkb); 4750 if (error) 4751 return error; 4752 4753 _receive_unlock_reply(lkb, ms); 4754 dlm_put_lkb(lkb); 4755 return 0; 4756 } 4757 4758 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 4759 { 4760 struct dlm_rsb *r = lkb->lkb_resource; 4761 int error; 4762 4763 hold_rsb(r); 4764 lock_rsb(r); 4765 4766 error = validate_message(lkb, ms); 4767 if (error) 4768 goto out; 4769 4770 /* stub reply can happen with waiters_mutex held */ 4771 error = remove_from_waiters_ms(lkb, ms); 4772 if (error) 4773 goto out; 4774 4775 /* this is the value returned from do_cancel() on the master */ 4776 4777 switch (ms->m_result) { 4778 case -DLM_ECANCEL: 4779 receive_flags_reply(lkb, ms); 4780 revert_lock_pc(r, lkb); 4781 queue_cast(r, lkb, -DLM_ECANCEL); 4782 break; 4783 case 0: 4784 break; 4785 default: 4786 log_error(r->res_ls, "receive_cancel_reply %x error %d", 4787 lkb->lkb_id, ms->m_result); 4788 } 4789 out: 4790 unlock_rsb(r); 4791 put_rsb(r); 4792 } 4793 4794 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 4795 { 4796 struct dlm_lkb *lkb; 4797 int error; 4798 4799 error = find_lkb(ls, ms->m_remid, &lkb); 4800 if (error) 4801 return error; 4802 4803 _receive_cancel_reply(lkb, ms); 4804 dlm_put_lkb(lkb); 4805 return 0; 4806 } 4807 4808 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 4809 { 4810 struct dlm_lkb *lkb; 4811 struct dlm_rsb *r; 4812 int error, ret_nodeid; 4813 int do_lookup_list = 0; 4814 4815 error = find_lkb(ls, ms->m_lkid, &lkb); 4816 if (error) { 4817 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid); 4818 return; 4819 } 4820 4821 /* ms->m_result is the value returned by dlm_master_lookup on dir node 4822 FIXME: will a non-zero error ever be returned? */ 4823 4824 r = lkb->lkb_resource; 4825 hold_rsb(r); 4826 lock_rsb(r); 4827 4828 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY); 4829 if (error) 4830 goto out; 4831 4832 ret_nodeid = ms->m_nodeid; 4833 4834 /* We sometimes receive a request from the dir node for this 4835 rsb before we've received the dir node's lookup_reply for it.
4836 The request from the dir node implies we're the master, so we set 4837 ourself as master in receive_request_reply, and verify here that 4838 we are indeed the master. */ 4839 4840 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) { 4841 /* This should never happen */ 4842 log_error(ls, "receive_lookup_reply %x from %d ret %d " 4843 "master %d dir %d our %d first %x %s", 4844 lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid, 4845 r->res_master_nodeid, r->res_dir_nodeid, 4846 dlm_our_nodeid(), r->res_first_lkid, r->res_name); 4847 } 4848 4849 if (ret_nodeid == dlm_our_nodeid()) { 4850 r->res_master_nodeid = ret_nodeid; 4851 r->res_nodeid = 0; 4852 do_lookup_list = 1; 4853 r->res_first_lkid = 0; 4854 } else if (ret_nodeid == -1) { 4855 /* the remote node doesn't believe it's the dir node */ 4856 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid", 4857 lkb->lkb_id, ms->m_header.h_nodeid); 4858 r->res_master_nodeid = 0; 4859 r->res_nodeid = -1; 4860 lkb->lkb_nodeid = -1; 4861 } else { 4862 /* set_master() will set lkb_nodeid from r */ 4863 r->res_master_nodeid = ret_nodeid; 4864 r->res_nodeid = ret_nodeid; 4865 } 4866 4867 if (is_overlap(lkb)) { 4868 log_debug(ls, "receive_lookup_reply %x unlock %x", 4869 lkb->lkb_id, lkb->lkb_flags); 4870 queue_cast_overlap(r, lkb); 4871 unhold_lkb(lkb); /* undoes create_lkb() */ 4872 goto out_list; 4873 } 4874 4875 _request_lock(r, lkb); 4876 4877 out_list: 4878 if (do_lookup_list) 4879 process_lookup_list(r); 4880 out: 4881 unlock_rsb(r); 4882 put_rsb(r); 4883 dlm_put_lkb(lkb); 4884 } 4885 4886 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, 4887 uint32_t saved_seq) 4888 { 4889 int error = 0, noent = 0; 4890 4891 if (!dlm_is_member(ls, ms->m_header.h_nodeid)) { 4892 log_limit(ls, "receive %d from non-member %d %x %x %d", 4893 ms->m_type, ms->m_header.h_nodeid, ms->m_lkid, 4894 ms->m_remid, ms->m_result); 4895 return; 4896 } 4897 4898 switch (ms->m_type) { 4899 4900 /* messages sent to a master node */ 4901 4902 case DLM_MSG_REQUEST: 4903 error = receive_request(ls, ms); 4904 break; 4905 4906 case DLM_MSG_CONVERT: 4907 error = receive_convert(ls, ms); 4908 break; 4909 4910 case DLM_MSG_UNLOCK: 4911 error = receive_unlock(ls, ms); 4912 break; 4913 4914 case DLM_MSG_CANCEL: 4915 noent = 1; 4916 error = receive_cancel(ls, ms); 4917 break; 4918 4919 /* messages sent from a master node (replies to above) */ 4920 4921 case DLM_MSG_REQUEST_REPLY: 4922 error = receive_request_reply(ls, ms); 4923 break; 4924 4925 case DLM_MSG_CONVERT_REPLY: 4926 error = receive_convert_reply(ls, ms); 4927 break; 4928 4929 case DLM_MSG_UNLOCK_REPLY: 4930 error = receive_unlock_reply(ls, ms); 4931 break; 4932 4933 case DLM_MSG_CANCEL_REPLY: 4934 error = receive_cancel_reply(ls, ms); 4935 break; 4936 4937 /* messages sent from a master node (only two types of async msg) */ 4938 4939 case DLM_MSG_GRANT: 4940 noent = 1; 4941 error = receive_grant(ls, ms); 4942 break; 4943 4944 case DLM_MSG_BAST: 4945 noent = 1; 4946 error = receive_bast(ls, ms); 4947 break; 4948 4949 /* messages sent to a dir node */ 4950 4951 case DLM_MSG_LOOKUP: 4952 receive_lookup(ls, ms); 4953 break; 4954 4955 case DLM_MSG_REMOVE: 4956 receive_remove(ls, ms); 4957 break; 4958 4959 /* messages sent from a dir node (remove has no reply) */ 4960 4961 case DLM_MSG_LOOKUP_REPLY: 4962 receive_lookup_reply(ls, ms); 4963 break; 4964 4965 /* other messages */ 4966 4967 case DLM_MSG_PURGE: 4968 receive_purge(ls, ms); 4969 break; 4970 4971 default: 4972 log_error(ls, "unknown 
message type %d", ms->m_type); 4973 } 4974 4975 /* 4976 * When checking for ENOENT, we're checking the result of 4977 * find_lkb(m_remid): 4978 * 4979 * The lock id referenced in the message wasn't found. This may 4980 * happen in normal usage for the async messages and cancel, so 4981 * only use log_debug for them. 4982 * 4983 * Some errors are expected and normal. 4984 */ 4985 4986 if (error == -ENOENT && noent) { 4987 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u", 4988 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4989 ms->m_lkid, saved_seq); 4990 } else if (error == -ENOENT) { 4991 log_error(ls, "receive %d no %x remote %d %x saved_seq %u", 4992 ms->m_type, ms->m_remid, ms->m_header.h_nodeid, 4993 ms->m_lkid, saved_seq); 4994 4995 if (ms->m_type == DLM_MSG_CONVERT) 4996 dlm_dump_rsb_hash(ls, ms->m_hash); 4997 } 4998 4999 if (error == -EINVAL) { 5000 log_error(ls, "receive %d inval from %d lkid %x remid %x " 5001 "saved_seq %u", 5002 ms->m_type, ms->m_header.h_nodeid, 5003 ms->m_lkid, ms->m_remid, saved_seq); 5004 } 5005 } 5006 5007 /* If the lockspace is in recovery mode (locking stopped), then normal 5008 messages are saved on the requestqueue for processing after recovery is 5009 done. When not in recovery mode, we wait for dlm_recoverd to drain saved 5010 messages off the requestqueue before we process new ones. This occurs right 5011 after recovery completes when we transition from saving all messages on 5012 requestqueue, to processing all the saved messages, to processing new 5013 messages as they arrive. */ 5014 5015 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms, 5016 int nodeid) 5017 { 5018 if (dlm_locking_stopped(ls)) { 5019 /* If we were a member of this lockspace, left, and rejoined, 5020 other nodes may still be sending us messages from the 5021 lockspace generation before we left. */ 5022 if (!ls->ls_generation) { 5023 log_limit(ls, "receive %d from %d ignore old gen", 5024 ms->m_type, nodeid); 5025 return; 5026 } 5027 5028 dlm_add_requestqueue(ls, nodeid, ms); 5029 } else { 5030 dlm_wait_requestqueue(ls); 5031 _receive_message(ls, ms, 0); 5032 } 5033 } 5034 5035 /* This is called by dlm_recoverd to process messages that were saved on 5036 the requestqueue. */ 5037 5038 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms, 5039 uint32_t saved_seq) 5040 { 5041 _receive_message(ls, ms, saved_seq); 5042 } 5043 5044 /* This is called by the midcomms layer when something is received for 5045 the lockspace. It could be either a MSG (normal message sent as part of 5046 standard locking activity) or an RCOM (recovery message sent as part of 5047 lockspace recovery). 
*/ 5048 5049 void dlm_receive_buffer(union dlm_packet *p, int nodeid) 5050 { 5051 struct dlm_header *hd = &p->header; 5052 struct dlm_ls *ls; 5053 int type = 0; 5054 5055 switch (hd->h_cmd) { 5056 case DLM_MSG: 5057 dlm_message_in(&p->message); 5058 type = p->message.m_type; 5059 break; 5060 case DLM_RCOM: 5061 dlm_rcom_in(&p->rcom); 5062 type = p->rcom.rc_type; 5063 break; 5064 default: 5065 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid); 5066 return; 5067 } 5068 5069 if (hd->h_nodeid != nodeid) { 5070 log_print("invalid h_nodeid %d from %d lockspace %x", 5071 hd->h_nodeid, nodeid, hd->u.h_lockspace); 5072 return; 5073 } 5074 5075 ls = dlm_find_lockspace_global(hd->u.h_lockspace); 5076 if (!ls) { 5077 if (dlm_config.ci_log_debug) { 5078 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace " 5079 "%u from %d cmd %d type %d\n", 5080 hd->u.h_lockspace, nodeid, hd->h_cmd, type); 5081 } 5082 5083 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) 5084 dlm_send_ls_not_ready(nodeid, &p->rcom); 5085 return; 5086 } 5087 5088 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to 5089 be inactive (in this ls) before transitioning to recovery mode */ 5090 5091 down_read(&ls->ls_recv_active); 5092 if (hd->h_cmd == DLM_MSG) 5093 dlm_receive_message(ls, &p->message, nodeid); 5094 else 5095 dlm_receive_rcom(ls, &p->rcom, nodeid); 5096 up_read(&ls->ls_recv_active); 5097 5098 dlm_put_lockspace(ls); 5099 } 5100 5101 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, 5102 struct dlm_message *ms_stub) 5103 { 5104 if (middle_conversion(lkb)) { 5105 hold_lkb(lkb); 5106 memset(ms_stub, 0, sizeof(struct dlm_message)); 5107 ms_stub->m_flags = DLM_IFL_STUB_MS; 5108 ms_stub->m_type = DLM_MSG_CONVERT_REPLY; 5109 ms_stub->m_result = -EINPROGRESS; 5110 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; 5111 _receive_convert_reply(lkb, ms_stub); 5112 5113 /* Same special case as in receive_rcom_lock_args() */ 5114 lkb->lkb_grmode = DLM_LOCK_IV; 5115 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 5116 unhold_lkb(lkb); 5117 5118 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 5119 lkb->lkb_flags |= DLM_IFL_RESEND; 5120 } 5121 5122 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 5123 conversions are async; there's no reply from the remote master */ 5124 } 5125 5126 /* A waiting lkb needs recovery if the master node has failed, or 5127 the master node is changing (only when no directory is used) */ 5128 5129 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, 5130 int dir_nodeid) 5131 { 5132 if (dlm_no_directory(ls)) 5133 return 1; 5134 5135 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) 5136 return 1; 5137 5138 return 0; 5139 } 5140 5141 /* Recovery for locks that are waiting for replies from nodes that are now 5142 gone. We can just complete unlocks and cancels by faking a reply from the 5143 dead node. Requests and up-conversions we flag to be resent after 5144 recovery. Down-conversions can just be completed with a fake reply like 5145 unlocks. Conversions between PR and CW need special attention. 
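PR and CW are mutually incompatible, so the real granted mode of a lock converting between them cannot be decided until all locks have been rebuilt on the rsb; see receive_rcom_lock_args() and recover_conversion().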
*/ 5146 5147 void dlm_recover_waiters_pre(struct dlm_ls *ls) 5148 { 5149 struct dlm_lkb *lkb, *safe; 5150 struct dlm_message *ms_stub; 5151 int wait_type, stub_unlock_result, stub_cancel_result; 5152 int dir_nodeid; 5153 5154 ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL); 5155 if (!ms_stub) 5156 return; 5157 5158 mutex_lock(&ls->ls_waiters_mutex); 5159 5160 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 5161 5162 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource); 5163 5164 /* exclude debug messages about unlocks because there can be so 5165 many and they aren't very interesting */ 5166 5167 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) { 5168 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5169 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d", 5170 lkb->lkb_id, 5171 lkb->lkb_remid, 5172 lkb->lkb_wait_type, 5173 lkb->lkb_resource->res_nodeid, 5174 lkb->lkb_nodeid, 5175 lkb->lkb_wait_nodeid, 5176 dir_nodeid); 5177 } 5178 5179 /* all outstanding lookups, regardless of destination will be 5180 resent after recovery is done */ 5181 5182 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 5183 lkb->lkb_flags |= DLM_IFL_RESEND; 5184 continue; 5185 } 5186 5187 if (!waiter_needs_recovery(ls, lkb, dir_nodeid)) 5188 continue; 5189 5190 wait_type = lkb->lkb_wait_type; 5191 stub_unlock_result = -DLM_EUNLOCK; 5192 stub_cancel_result = -DLM_ECANCEL; 5193 5194 /* Main reply may have been received leaving a zero wait_type, 5195 but a reply for the overlapping op may not have been 5196 received. In that case we need to fake the appropriate 5197 reply for the overlap op. */ 5198 5199 if (!wait_type) { 5200 if (is_overlap_cancel(lkb)) { 5201 wait_type = DLM_MSG_CANCEL; 5202 if (lkb->lkb_grmode == DLM_LOCK_IV) 5203 stub_cancel_result = 0; 5204 } 5205 if (is_overlap_unlock(lkb)) { 5206 wait_type = DLM_MSG_UNLOCK; 5207 if (lkb->lkb_grmode == DLM_LOCK_IV) 5208 stub_unlock_result = -ENOENT; 5209 } 5210 5211 log_debug(ls, "rwpre overlap %x %x %d %d %d", 5212 lkb->lkb_id, lkb->lkb_flags, wait_type, 5213 stub_cancel_result, stub_unlock_result); 5214 } 5215 5216 switch (wait_type) { 5217 5218 case DLM_MSG_REQUEST: 5219 lkb->lkb_flags |= DLM_IFL_RESEND; 5220 break; 5221 5222 case DLM_MSG_CONVERT: 5223 recover_convert_waiter(ls, lkb, ms_stub); 5224 break; 5225 5226 case DLM_MSG_UNLOCK: 5227 hold_lkb(lkb); 5228 memset(ms_stub, 0, sizeof(struct dlm_message)); 5229 ms_stub->m_flags = DLM_IFL_STUB_MS; 5230 ms_stub->m_type = DLM_MSG_UNLOCK_REPLY; 5231 ms_stub->m_result = stub_unlock_result; 5232 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; 5233 _receive_unlock_reply(lkb, ms_stub); 5234 dlm_put_lkb(lkb); 5235 break; 5236 5237 case DLM_MSG_CANCEL: 5238 hold_lkb(lkb); 5239 memset(ms_stub, 0, sizeof(struct dlm_message)); 5240 ms_stub->m_flags = DLM_IFL_STUB_MS; 5241 ms_stub->m_type = DLM_MSG_CANCEL_REPLY; 5242 ms_stub->m_result = stub_cancel_result; 5243 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid; 5244 _receive_cancel_reply(lkb, ms_stub); 5245 dlm_put_lkb(lkb); 5246 break; 5247 5248 default: 5249 log_error(ls, "invalid lkb wait_type %d %d", 5250 lkb->lkb_wait_type, wait_type); 5251 } 5252 schedule(); 5253 } 5254 mutex_unlock(&ls->ls_waiters_mutex); 5255 kfree(ms_stub); 5256 } 5257 5258 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls) 5259 { 5260 struct dlm_lkb *lkb; 5261 int found = 0; 5262 5263 mutex_lock(&ls->ls_waiters_mutex); 5264 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 5265 if (lkb->lkb_flags & DLM_IFL_RESEND) { 5266 hold_lkb(lkb); 5267 found = 1; 5268 break; 5269 } 5270 } 
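/* the hold_lkb() above keeps the lkb valid after the mutex is dropped; the caller releases it with dlm_put_lkb() */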
5271 mutex_unlock(&ls->ls_waiters_mutex); 5272 5273 if (!found) 5274 lkb = NULL; 5275 return lkb; 5276 } 5277 5278 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 5279 master or dir-node for r. Processing the lkb may result in it being placed 5280 back on waiters. */ 5281 5282 /* We do this after normal locking has been enabled and any saved messages 5283 (in requestqueue) have been processed. We should be confident that at 5284 this point we won't get or process a reply to any of these waiting 5285 operations. But, new ops may be coming in on the rsbs/locks here from 5286 userspace or remotely. */ 5287 5288 /* there may have been an overlap unlock/cancel prior to recovery or after 5289 recovery. if before, the lkb may still have a positive wait_count; if after, the 5290 overlap flag would just have been set and nothing new sent. we can be 5291 confident here that any replies to either the initial op or overlap ops 5292 prior to recovery have been received. */ 5293 5294 int dlm_recover_waiters_post(struct dlm_ls *ls) 5295 { 5296 struct dlm_lkb *lkb; 5297 struct dlm_rsb *r; 5298 int error = 0, mstype, err, oc, ou; 5299 5300 while (1) { 5301 if (dlm_locking_stopped(ls)) { 5302 log_debug(ls, "recover_waiters_post aborted"); 5303 error = -EINTR; 5304 break; 5305 } 5306 5307 lkb = find_resend_waiter(ls); 5308 if (!lkb) 5309 break; 5310 5311 r = lkb->lkb_resource; 5312 hold_rsb(r); 5313 lock_rsb(r); 5314 5315 mstype = lkb->lkb_wait_type; 5316 oc = is_overlap_cancel(lkb); 5317 ou = is_overlap_unlock(lkb); 5318 err = 0; 5319 5320 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d " 5321 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d " 5322 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype, 5323 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid, 5324 dlm_dir_nodeid(r), oc, ou); 5325 5326 /* At this point we assume that we won't get a reply to any 5327 previous op or overlap op on this lock. First, do a big 5328 remove_from_waiters() for all previous ops. */ 5329 5330 lkb->lkb_flags &= ~DLM_IFL_RESEND; 5331 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK; 5332 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL; 5333 lkb->lkb_wait_type = 0; 5334 lkb->lkb_wait_count = 0; 5335 mutex_lock(&ls->ls_waiters_mutex); 5336 list_del_init(&lkb->lkb_wait_reply); 5337 mutex_unlock(&ls->ls_waiters_mutex); 5338 unhold_lkb(lkb); /* for waiters list */ 5339 5340 if (oc || ou) { 5341 /* do an unlock or cancel instead of resending */ 5342 switch (mstype) { 5343 case DLM_MSG_LOOKUP: 5344 case DLM_MSG_REQUEST: 5345 queue_cast(r, lkb, ou ?
-DLM_EUNLOCK : 5346 -DLM_ECANCEL); 5347 unhold_lkb(lkb); /* undoes create_lkb() */ 5348 break; 5349 case DLM_MSG_CONVERT: 5350 if (oc) { 5351 queue_cast(r, lkb, -DLM_ECANCEL); 5352 } else { 5353 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK; 5354 _unlock_lock(r, lkb); 5355 } 5356 break; 5357 default: 5358 err = 1; 5359 } 5360 } else { 5361 switch (mstype) { 5362 case DLM_MSG_LOOKUP: 5363 case DLM_MSG_REQUEST: 5364 _request_lock(r, lkb); 5365 if (is_master(r)) 5366 confirm_master(r, 0); 5367 break; 5368 case DLM_MSG_CONVERT: 5369 _convert_lock(r, lkb); 5370 break; 5371 default: 5372 err = 1; 5373 } 5374 } 5375 5376 if (err) { 5377 log_error(ls, "waiter %x msg %d r_nodeid %d " 5378 "dir_nodeid %d overlap %d %d", 5379 lkb->lkb_id, mstype, r->res_nodeid, 5380 dlm_dir_nodeid(r), oc, ou); 5381 } 5382 unlock_rsb(r); 5383 put_rsb(r); 5384 dlm_put_lkb(lkb); 5385 } 5386 5387 return error; 5388 } 5389 5390 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r, 5391 struct list_head *list) 5392 { 5393 struct dlm_lkb *lkb, *safe; 5394 5395 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5396 if (!is_master_copy(lkb)) 5397 continue; 5398 5399 /* don't purge lkbs we've added in recover_master_copy for 5400 the current recovery seq */ 5401 5402 if (lkb->lkb_recover_seq == ls->ls_recover_seq) 5403 continue; 5404 5405 del_lkb(r, lkb); 5406 5407 /* this put should free the lkb */ 5408 if (!dlm_put_lkb(lkb)) 5409 log_error(ls, "purged mstcpy lkb not released"); 5410 } 5411 } 5412 5413 void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 5414 { 5415 struct dlm_ls *ls = r->res_ls; 5416 5417 purge_mstcpy_list(ls, r, &r->res_grantqueue); 5418 purge_mstcpy_list(ls, r, &r->res_convertqueue); 5419 purge_mstcpy_list(ls, r, &r->res_waitqueue); 5420 } 5421 5422 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r, 5423 struct list_head *list, 5424 int nodeid_gone, unsigned int *count) 5425 { 5426 struct dlm_lkb *lkb, *safe; 5427 5428 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) { 5429 if (!is_master_copy(lkb)) 5430 continue; 5431 5432 if ((lkb->lkb_nodeid == nodeid_gone) || 5433 dlm_is_removed(ls, lkb->lkb_nodeid)) { 5434 5435 /* tell recover_lvb to invalidate the lvb 5436 because a node holding EX/PW failed */ 5437 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) && 5438 (lkb->lkb_grmode >= DLM_LOCK_PW)) { 5439 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL); 5440 } 5441 5442 del_lkb(r, lkb); 5443 5444 /* this put should free the lkb */ 5445 if (!dlm_put_lkb(lkb)) 5446 log_error(ls, "purged dead lkb not released"); 5447 5448 rsb_set_flag(r, RSB_RECOVER_GRANT); 5449 5450 (*count)++; 5451 } 5452 } 5453 } 5454 5455 /* Get rid of locks held by nodes that are gone. 
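Master copies held by departed nodes are removed from the rsb queues and freed, and each affected rsb is flagged RSB_RECOVER_GRANT so dlm_recover_grant() can grant whatever those locks were blocking.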
*/ 5456 5457 void dlm_recover_purge(struct dlm_ls *ls) 5458 { 5459 struct dlm_rsb *r; 5460 struct dlm_member *memb; 5461 int nodes_count = 0; 5462 int nodeid_gone = 0; 5463 unsigned int lkb_count = 0; 5464 5465 /* cache one removed nodeid to optimize the common 5466 case of a single node removed */ 5467 5468 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 5469 nodes_count++; 5470 nodeid_gone = memb->nodeid; 5471 } 5472 5473 if (!nodes_count) 5474 return; 5475 5476 down_write(&ls->ls_root_sem); 5477 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 5478 hold_rsb(r); 5479 lock_rsb(r); 5480 if (is_master(r)) { 5481 purge_dead_list(ls, r, &r->res_grantqueue, 5482 nodeid_gone, &lkb_count); 5483 purge_dead_list(ls, r, &r->res_convertqueue, 5484 nodeid_gone, &lkb_count); 5485 purge_dead_list(ls, r, &r->res_waitqueue, 5486 nodeid_gone, &lkb_count); 5487 } 5488 unlock_rsb(r); 5489 unhold_rsb(r); 5490 cond_resched(); 5491 } 5492 up_write(&ls->ls_root_sem); 5493 5494 if (lkb_count) 5495 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes", 5496 lkb_count, nodes_count); 5497 } 5498 5499 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket) 5500 { 5501 struct rb_node *n; 5502 struct dlm_rsb *r; 5503 5504 spin_lock(&ls->ls_rsbtbl[bucket].lock); 5505 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { 5506 r = rb_entry(n, struct dlm_rsb, res_hashnode); 5507 5508 if (!rsb_flag(r, RSB_RECOVER_GRANT)) 5509 continue; 5510 if (!is_master(r)) { 5511 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5512 continue; 5513 } 5514 hold_rsb(r); 5515 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5516 return r; 5517 } 5518 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 5519 return NULL; 5520 } 5521 5522 /* 5523 * Attempt to grant locks on resources that we are the master of. 5524 * Locks may have become grantable during recovery because locks 5525 * from departed nodes have been purged (or not rebuilt), allowing 5526 * previously blocked locks to now be granted. The subset of rsb's 5527 * we are interested in are those with lkb's on either the convert or 5528 * waiting queues. 5529 * 5530 * Simplest would be to go through each master rsb and check for non-empty 5531 * convert or waiting queues, and attempt to grant on those rsbs. 5532 * Checking the queues requires lock_rsb, though, for which we'd need 5533 * to release the rsbtbl lock. This would make iterating through all 5534 * rsb's very inefficient. So, we rely on earlier recovery routines 5535 * to set RECOVER_GRANT on any rsb's that we should attempt to grant 5536 * locks for. 
5537 */ 5538 5539 void dlm_recover_grant(struct dlm_ls *ls) 5540 { 5541 struct dlm_rsb *r; 5542 int bucket = 0; 5543 unsigned int count = 0; 5544 unsigned int rsb_count = 0; 5545 unsigned int lkb_count = 0; 5546 5547 while (1) { 5548 r = find_grant_rsb(ls, bucket); 5549 if (!r) { 5550 if (bucket == ls->ls_rsbtbl_size - 1) 5551 break; 5552 bucket++; 5553 continue; 5554 } 5555 rsb_count++; 5556 count = 0; 5557 lock_rsb(r); 5558 /* the RECOVER_GRANT flag is checked in the grant path */ 5559 grant_pending_locks(r, &count); 5560 rsb_clear_flag(r, RSB_RECOVER_GRANT); 5561 lkb_count += count; 5562 confirm_master(r, 0); 5563 unlock_rsb(r); 5564 put_rsb(r); 5565 cond_resched(); 5566 } 5567 5568 if (lkb_count) 5569 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources", 5570 lkb_count, rsb_count); 5571 } 5572 5573 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 5574 uint32_t remid) 5575 { 5576 struct dlm_lkb *lkb; 5577 5578 list_for_each_entry(lkb, head, lkb_statequeue) { 5579 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 5580 return lkb; 5581 } 5582 return NULL; 5583 } 5584 5585 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 5586 uint32_t remid) 5587 { 5588 struct dlm_lkb *lkb; 5589 5590 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 5591 if (lkb) 5592 return lkb; 5593 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 5594 if (lkb) 5595 return lkb; 5596 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 5597 if (lkb) 5598 return lkb; 5599 return NULL; 5600 } 5601 5602 /* needs at least dlm_rcom + rcom_lock */ 5603 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 5604 struct dlm_rsb *r, struct dlm_rcom *rc) 5605 { 5606 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5607 5608 lkb->lkb_nodeid = rc->rc_header.h_nodeid; 5609 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid); 5610 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid); 5611 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags); 5612 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF; 5613 lkb->lkb_flags |= DLM_IFL_MSTCPY; 5614 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq); 5615 lkb->lkb_rqmode = rl->rl_rqmode; 5616 lkb->lkb_grmode = rl->rl_grmode; 5617 /* don't set lkb_status because add_lkb wants to set it itself */ 5618 5619 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL; 5620 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL; 5621 5622 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 5623 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - 5624 sizeof(struct rcom_lock); 5625 if (lvblen > ls->ls_lvblen) 5626 return -EINVAL; 5627 lkb->lkb_lvbptr = dlm_allocate_lvb(ls); 5628 if (!lkb->lkb_lvbptr) 5629 return -ENOMEM; 5630 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 5631 } 5632 5633 /* Conversions between PR and CW (middle modes) need special handling. 5634 The real granted mode of these converting locks cannot be determined 5635 until all locks have been rebuilt on the rsb (recover_conversion) */ 5636 5637 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) && 5638 middle_conversion(lkb)) { 5639 rl->rl_status = DLM_LKSTS_CONVERT; 5640 lkb->lkb_grmode = DLM_LOCK_IV; 5641 rsb_set_flag(r, RSB_RECOVER_CONVERT); 5642 } 5643 5644 return 0; 5645 } 5646 5647 /* This lkb may have been recovered in a previous aborted recovery so we need 5648 to check if the rsb already has an lkb with the given remote nodeid/lkid. 5649 If so we just send back a standard reply.
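(The standard reply carries -EEXIST, which dlm_recover_process_copy() on the lock holder treats the same as success.)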
If not, we create a new lkb with 5650 the given values and send back our lkid. We send back our lkid by sending 5651 back the rcom_lock struct we got but with the remid field filled in. */ 5652 5653 /* needs at least dlm_rcom + rcom_lock */ 5654 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5655 { 5656 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5657 struct dlm_rsb *r; 5658 struct dlm_lkb *lkb; 5659 uint32_t remid = 0; 5660 int from_nodeid = rc->rc_header.h_nodeid; 5661 int error; 5662 5663 if (rl->rl_parent_lkid) { 5664 error = -EOPNOTSUPP; 5665 goto out; 5666 } 5667 5668 remid = le32_to_cpu(rl->rl_lkid); 5669 5670 /* In general we expect the rsb returned to be R_MASTER, but we don't 5671 have to require it. Recovery of masters on one node can overlap 5672 recovery of locks on another node, so one node can send us MSTCPY 5673 locks before we've made ourselves master of this rsb. We can still 5674 add new MSTCPY locks that we receive here without any harm; when 5675 we make ourselves master, dlm_recover_masters() won't touch the 5676 MSTCPY locks we've received early. */ 5677 5678 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 5679 from_nodeid, R_RECEIVE_RECOVER, &r); 5680 if (error) 5681 goto out; 5682 5683 lock_rsb(r); 5684 5685 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) { 5686 log_error(ls, "dlm_recover_master_copy remote %d %x not dir", 5687 from_nodeid, remid); 5688 error = -EBADR; 5689 goto out_unlock; 5690 } 5691 5692 lkb = search_remid(r, from_nodeid, remid); 5693 if (lkb) { 5694 error = -EEXIST; 5695 goto out_remid; 5696 } 5697 5698 error = create_lkb(ls, &lkb); 5699 if (error) 5700 goto out_unlock; 5701 5702 error = receive_rcom_lock_args(ls, lkb, r, rc); 5703 if (error) { 5704 __put_lkb(ls, lkb); 5705 goto out_unlock; 5706 } 5707 5708 attach_lkb(r, lkb); 5709 add_lkb(r, lkb, rl->rl_status); 5710 error = 0; 5711 ls->ls_recover_locks_in++; 5712 5713 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) 5714 rsb_set_flag(r, RSB_RECOVER_GRANT); 5715 5716 out_remid: 5717 /* this is the new value returned to the lock holder for 5718 saving in its process-copy lkb */ 5719 rl->rl_remid = cpu_to_le32(lkb->lkb_id); 5720 5721 lkb->lkb_recover_seq = ls->ls_recover_seq; 5722 5723 out_unlock: 5724 unlock_rsb(r); 5725 put_rsb(r); 5726 out: 5727 if (error && error != -EEXIST) 5728 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d", 5729 from_nodeid, remid, error); 5730 rl->rl_result = cpu_to_le32(error); 5731 return error; 5732 } 5733 5734 /* needs at least dlm_rcom + rcom_lock */ 5735 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 5736 { 5737 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 5738 struct dlm_rsb *r; 5739 struct dlm_lkb *lkb; 5740 uint32_t lkid, remid; 5741 int error, result; 5742 5743 lkid = le32_to_cpu(rl->rl_lkid); 5744 remid = le32_to_cpu(rl->rl_remid); 5745 result = le32_to_cpu(rl->rl_result); 5746 5747 error = find_lkb(ls, lkid, &lkb); 5748 if (error) { 5749 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d", 5750 lkid, rc->rc_header.h_nodeid, remid, result); 5751 return error; 5752 } 5753 5754 r = lkb->lkb_resource; 5755 hold_rsb(r); 5756 lock_rsb(r); 5757 5758 if (!is_process_copy(lkb)) { 5759 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", 5760 lkid, rc->rc_header.h_nodeid, remid, result); 5761 dlm_dump_rsb(r); 5762 unlock_rsb(r); 5763 put_rsb(r); 5764 dlm_put_lkb(lkb); 5765 return -EINVAL; 5766 } 5767 5768 
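/* result is the rl_result status that dlm_recover_master_copy() set on the new master */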
switch (result) { 5769 case -EBADR: 5770 /* There's a chance the new master received our lock before 5771 dlm_recover_master_reply(), this wouldn't happen if we did 5772 a barrier between recover_masters and recover_locks. */ 5773 5774 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d", 5775 lkid, rc->rc_header.h_nodeid, remid, result); 5776 5777 dlm_send_rcom_lock(r, lkb); 5778 goto out; 5779 case -EEXIST: 5780 case 0: 5781 lkb->lkb_remid = remid; 5782 break; 5783 default: 5784 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk", 5785 lkid, rc->rc_header.h_nodeid, remid, result); 5786 } 5787 5788 /* an ack for dlm_recover_locks() which waits for replies from 5789 all the locks it sends to new masters */ 5790 dlm_recovered_lock(r); 5791 out: 5792 unlock_rsb(r); 5793 put_rsb(r); 5794 dlm_put_lkb(lkb); 5795 5796 return 0; 5797 } 5798 5799 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 5800 int mode, uint32_t flags, void *name, unsigned int namelen, 5801 unsigned long timeout_cs) 5802 { 5803 struct dlm_lkb *lkb; 5804 struct dlm_args args; 5805 int error; 5806 5807 dlm_lock_recovery(ls); 5808 5809 error = create_lkb(ls, &lkb); 5810 if (error) { 5811 kfree(ua); 5812 goto out; 5813 } 5814 5815 if (flags & DLM_LKF_VALBLK) { 5816 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5817 if (!ua->lksb.sb_lvbptr) { 5818 kfree(ua); 5819 __put_lkb(ls, lkb); 5820 error = -ENOMEM; 5821 goto out; 5822 } 5823 } 5824 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs, 5825 fake_astfn, ua, fake_bastfn, &args); 5826 if (error) { 5827 kfree(ua->lksb.sb_lvbptr); 5828 ua->lksb.sb_lvbptr = NULL; 5829 kfree(ua); 5830 __put_lkb(ls, lkb); 5831 goto out; 5832 } 5833 5834 /* After ua is attached to lkb it will be freed by dlm_free_lkb(). 5835 When DLM_IFL_USER is set, the dlm knows that this is a userspace 5836 lock and that lkb_astparam is the dlm_user_args structure. 
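fake_astfn and fake_bastfn are placeholders only (they log an error if ever called); user callbacks are delivered through the proc->asts list instead.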
*/ 5837 lkb->lkb_flags |= DLM_IFL_USER; 5838 error = request_lock(ls, lkb, name, namelen, &args); 5839 5840 switch (error) { 5841 case 0: 5842 break; 5843 case -EINPROGRESS: 5844 error = 0; 5845 break; 5846 case -EAGAIN: 5847 error = 0; 5848 fallthrough; 5849 default: 5850 __put_lkb(ls, lkb); 5851 goto out; 5852 } 5853 5854 /* add this new lkb to the per-process list of locks */ 5855 spin_lock(&ua->proc->locks_spin); 5856 hold_lkb(lkb); 5857 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5858 spin_unlock(&ua->proc->locks_spin); 5859 out: 5860 dlm_unlock_recovery(ls); 5861 return error; 5862 } 5863 5864 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5865 int mode, uint32_t flags, uint32_t lkid, char *lvb_in, 5866 unsigned long timeout_cs) 5867 { 5868 struct dlm_lkb *lkb; 5869 struct dlm_args args; 5870 struct dlm_user_args *ua; 5871 int error; 5872 5873 dlm_lock_recovery(ls); 5874 5875 error = find_lkb(ls, lkid, &lkb); 5876 if (error) 5877 goto out; 5878 5879 /* user can change the params on its lock when it converts it, or 5880 add an lvb that didn't exist before */ 5881 5882 ua = lkb->lkb_ua; 5883 5884 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 5885 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS); 5886 if (!ua->lksb.sb_lvbptr) { 5887 error = -ENOMEM; 5888 goto out_put; 5889 } 5890 } 5891 if (lvb_in && ua->lksb.sb_lvbptr) 5892 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 5893 5894 ua->xid = ua_tmp->xid; 5895 ua->castparam = ua_tmp->castparam; 5896 ua->castaddr = ua_tmp->castaddr; 5897 ua->bastparam = ua_tmp->bastparam; 5898 ua->bastaddr = ua_tmp->bastaddr; 5899 ua->user_lksb = ua_tmp->user_lksb; 5900 5901 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs, 5902 fake_astfn, ua, fake_bastfn, &args); 5903 if (error) 5904 goto out_put; 5905 5906 error = convert_lock(ls, lkb, &args); 5907 5908 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK) 5909 error = 0; 5910 out_put: 5911 dlm_put_lkb(lkb); 5912 out: 5913 dlm_unlock_recovery(ls); 5914 kfree(ua_tmp); 5915 return error; 5916 } 5917 5918 /* 5919 * The caller asks for an orphan lock on a given resource with a given mode. 5920 * If a matching lock exists, it's moved to the owner's list of locks and 5921 * the lkid is returned. 
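If an orphan with the right name exists only in a different mode, -EAGAIN is returned; if no orphan matches at all, -ENOENT.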
5922 */ 5923 5924 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5925 int mode, uint32_t flags, void *name, unsigned int namelen, 5926 unsigned long timeout_cs, uint32_t *lkid) 5927 { 5928 struct dlm_lkb *lkb; 5929 struct dlm_user_args *ua; 5930 int found_other_mode = 0; 5931 int found = 0; 5932 int rv = 0; 5933 5934 mutex_lock(&ls->ls_orphans_mutex); 5935 list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) { 5936 if (lkb->lkb_resource->res_length != namelen) 5937 continue; 5938 if (memcmp(lkb->lkb_resource->res_name, name, namelen)) 5939 continue; 5940 if (lkb->lkb_grmode != mode) { 5941 found_other_mode = 1; 5942 continue; 5943 } 5944 5945 found = 1; 5946 list_del_init(&lkb->lkb_ownqueue); 5947 lkb->lkb_flags &= ~DLM_IFL_ORPHAN; 5948 *lkid = lkb->lkb_id; 5949 break; 5950 } 5951 mutex_unlock(&ls->ls_orphans_mutex); 5952 5953 if (!found && found_other_mode) { 5954 rv = -EAGAIN; 5955 goto out; 5956 } 5957 5958 if (!found) { 5959 rv = -ENOENT; 5960 goto out; 5961 } 5962 5963 lkb->lkb_exflags = flags; 5964 lkb->lkb_ownpid = (int) current->pid; 5965 5966 ua = lkb->lkb_ua; 5967 5968 ua->proc = ua_tmp->proc; 5969 ua->xid = ua_tmp->xid; 5970 ua->castparam = ua_tmp->castparam; 5971 ua->castaddr = ua_tmp->castaddr; 5972 ua->bastparam = ua_tmp->bastparam; 5973 ua->bastaddr = ua_tmp->bastaddr; 5974 ua->user_lksb = ua_tmp->user_lksb; 5975 5976 /* 5977 * The lkb reference from the ls_orphans list was not 5978 * removed above, and is now considered the reference 5979 * for the proc locks list. 5980 */ 5981 5982 spin_lock(&ua->proc->locks_spin); 5983 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 5984 spin_unlock(&ua->proc->locks_spin); 5985 out: 5986 kfree(ua_tmp); 5987 return rv; 5988 } 5989 5990 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 5991 uint32_t flags, uint32_t lkid, char *lvb_in) 5992 { 5993 struct dlm_lkb *lkb; 5994 struct dlm_args args; 5995 struct dlm_user_args *ua; 5996 int error; 5997 5998 dlm_lock_recovery(ls); 5999 6000 error = find_lkb(ls, lkid, &lkb); 6001 if (error) 6002 goto out; 6003 6004 ua = lkb->lkb_ua; 6005 6006 if (lvb_in && ua->lksb.sb_lvbptr) 6007 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 6008 if (ua_tmp->castparam) 6009 ua->castparam = ua_tmp->castparam; 6010 ua->user_lksb = ua_tmp->user_lksb; 6011 6012 error = set_unlock_args(flags, ua, &args); 6013 if (error) 6014 goto out_put; 6015 6016 error = unlock_lock(ls, lkb, &args); 6017 6018 if (error == -DLM_EUNLOCK) 6019 error = 0; 6020 /* from validate_unlock_args() */ 6021 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK)) 6022 error = 0; 6023 if (error) 6024 goto out_put; 6025 6026 spin_lock(&ua->proc->locks_spin); 6027 /* dlm_user_add_cb() may have already taken lkb off the proc list */ 6028 if (!list_empty(&lkb->lkb_ownqueue)) 6029 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking); 6030 spin_unlock(&ua->proc->locks_spin); 6031 out_put: 6032 dlm_put_lkb(lkb); 6033 out: 6034 dlm_unlock_recovery(ls); 6035 kfree(ua_tmp); 6036 return error; 6037 } 6038 6039 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 6040 uint32_t flags, uint32_t lkid) 6041 { 6042 struct dlm_lkb *lkb; 6043 struct dlm_args args; 6044 struct dlm_user_args *ua; 6045 int error; 6046 6047 dlm_lock_recovery(ls); 6048 6049 error = find_lkb(ls, lkid, &lkb); 6050 if (error) 6051 goto out; 6052 6053 ua = lkb->lkb_ua; 6054 if (ua_tmp->castparam) 6055 ua->castparam = ua_tmp->castparam; 6056 ua->user_lksb = ua_tmp->user_lksb; 6057 6058 error = set_unlock_args(flags, ua, 
&args); 6059 if (error) 6060 goto out_put; 6061 6062 error = cancel_lock(ls, lkb, &args); 6063 6064 if (error == -DLM_ECANCEL) 6065 error = 0; 6066 /* from validate_unlock_args() */ 6067 if (error == -EBUSY) 6068 error = 0; 6069 out_put: 6070 dlm_put_lkb(lkb); 6071 out: 6072 dlm_unlock_recovery(ls); 6073 kfree(ua_tmp); 6074 return error; 6075 } 6076 6077 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid) 6078 { 6079 struct dlm_lkb *lkb; 6080 struct dlm_args args; 6081 struct dlm_user_args *ua; 6082 struct dlm_rsb *r; 6083 int error; 6084 6085 dlm_lock_recovery(ls); 6086 6087 error = find_lkb(ls, lkid, &lkb); 6088 if (error) 6089 goto out; 6090 6091 ua = lkb->lkb_ua; 6092 6093 error = set_unlock_args(flags, ua, &args); 6094 if (error) 6095 goto out_put; 6096 6097 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */ 6098 6099 r = lkb->lkb_resource; 6100 hold_rsb(r); 6101 lock_rsb(r); 6102 6103 error = validate_unlock_args(lkb, &args); 6104 if (error) 6105 goto out_r; 6106 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL; 6107 6108 error = _cancel_lock(r, lkb); 6109 out_r: 6110 unlock_rsb(r); 6111 put_rsb(r); 6112 6113 if (error == -DLM_ECANCEL) 6114 error = 0; 6115 /* from validate_unlock_args() */ 6116 if (error == -EBUSY) 6117 error = 0; 6118 out_put: 6119 dlm_put_lkb(lkb); 6120 out: 6121 dlm_unlock_recovery(ls); 6122 return error; 6123 } 6124 6125 /* lkb's that are removed from the waiters list by revert are just left on the 6126 orphans list with the granted orphan locks, to be freed by purge */ 6127 6128 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb) 6129 { 6130 struct dlm_args args; 6131 int error; 6132 6133 hold_lkb(lkb); /* reference for the ls_orphans list */ 6134 mutex_lock(&ls->ls_orphans_mutex); 6135 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans); 6136 mutex_unlock(&ls->ls_orphans_mutex); 6137 6138 set_unlock_args(0, lkb->lkb_ua, &args); 6139 6140 error = cancel_lock(ls, lkb, &args); 6141 if (error == -DLM_ECANCEL) 6142 error = 0; 6143 return error; 6144 } 6145 6146 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't 6147 granted. Regardless of what rsb queue the lock is on, it's removed and 6148 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated 6149 if our lock is PW/EX (it's ignored if our granted mode is smaller.) 

/* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
   granted.  Regardless of what rsb queue the lock is on, it's removed and
   freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
   if our lock is PW/EX (it's ignored if our granted mode is smaller.) */

static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_args args;
	int error;

	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
			lkb->lkb_ua, &args);

	error = unlock_lock(ls, lkb, &args);
	if (error == -DLM_EUNLOCK)
		error = 0;
	return error;
}

/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
   (which does lock_rsb) due to deadlock with receiving a message that does
   lock_rsb followed by dlm_user_add_cb() */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}

/* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */

/* proc CLOSING flag is set so no more device_reads should look at the
   proc->asts list, and no more device_writes should add lkbs to the
   proc->locks list; so we shouldn't need to take asts_spin or locks_spin
   here.  This assumes that device reads/writes/closes are serialized --
   FIXME: we may need to serialize them ourselves. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
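
/*
 * Illustrative note, not from the original source, on the ordering that
 * del_proc_lock() preserves.  If dlm_clear_proc_locks() called
 * unlock_proc_lock() (which takes the rsb lock) while still holding
 * ls_clear_proc_locks, it could deadlock AB-BA style against the
 * message-receive path described above:
 *
 *	this path:                    receive path:
 *	  ls_clear_proc_locks           lock_rsb
 *	  lock_rsb                      ls_clear_proc_locks
 *	  (via unlock_lock)             (via dlm_user_add_cb)
 *
 * Dropping the mutex before unlock_proc_lock() means the two locks are
 * never taken in opposite orders.
 */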

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
		memset(&lkb->lkb_callbacks, 0,
		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
		list_del_init(&lkb->lkb_cb_list);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}

/* pid of 0 means purge all orphans */

static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_orphans_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
		if (pid && lkb->lkb_ownpid != pid)
			continue;
		unlock_proc_lock(ls, lkb);
		list_del_init(&lkb->lkb_ownqueue);
		dlm_put_lkb(lkb);
	}
	mutex_unlock(&ls->ls_orphans_mutex);
}

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}

int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
		   int nodeid, int pid)
{
	int error = 0;

	if (nodeid && (nodeid != dlm_our_nodeid())) {
		error = send_purge(ls, nodeid, pid);
	} else {
		dlm_lock_recovery(ls);
		if (pid == current->pid)
			purge_proc_locks(ls, proc);
		else
			do_purge(ls, nodeid, pid);
		dlm_unlock_recovery(ls);
	}
	return error;
}

/* debug functionality */
int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
		      int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
{
	struct dlm_lksb *lksb;
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	/* we currently can't set a valid user lock */
	if (lkb_flags & DLM_IFL_USER)
		return -EOPNOTSUPP;

	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
	if (!lksb)
		return -ENOMEM;

	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
	if (error) {
		kfree(lksb);
		return error;
	}

	lkb->lkb_flags = lkb_flags;
	lkb->lkb_nodeid = lkb_nodeid;
	lkb->lkb_lksb = lksb;
	/* user specific pointer, just don't have it NULL for kernel locks */
	if (~lkb_flags & DLM_IFL_USER)
		lkb->lkb_astparam = (void *)0xDEADBEEF;

	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
	if (error) {
		kfree(lksb);
		__put_lkb(ls, lkb);
		return error;
	}

	lock_rsb(r);
	attach_lkb(r, lkb);
	add_lkb(r, lkb, lkb_status);
	unlock_rsb(r);
	put_rsb(r);

	return 0;
}

int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
				 int mstype, int to_nodeid)
{
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, lkb_id, &lkb);
	if (error)
		return error;

	error = add_to_waiters(lkb, mstype, to_nodeid);
	dlm_put_lkb(lkb);
	return error;
}
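
/*
 * Illustrative sketch, not part of the original source: how the two
 * debug hooks above might be driven together from a test harness to
 * fabricate a lock that appears to have an outstanding remote request.
 * "fake_waiting_lock" is a hypothetical name and error handling is
 * condensed.
 *
 *	static int fake_waiting_lock(struct dlm_ls *ls)
 *	{
 *		uint32_t lkb_id = 1;
 *		int error;
 *
 *		// lkb on the wait queue of resource "testres" (len 7),
 *		// notionally mastered on node 2
 *		error = dlm_debug_add_lkb(ls, lkb_id, "testres", 7,
 *					  2, 0, DLM_LKSTS_WAITING);
 *		if (error)
 *			return error;
 *
 *		// pretend a request message for it is outstanding
 *		return dlm_debug_add_lkb_to_waiters(ls, lkb_id,
 *						    DLM_MSG_REQUEST, 2);
 *	}
 */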