/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

   dlm_lock          = request_lock
   dlm_lock+CONVERT  = convert_lock
   dlm_unlock        = unlock_lock
   dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/

#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
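/*
 * Example of the staging described above (a sketch of the call chain, not
 * additional code): a new request made with dlm_lock() and no special flags
 * travels
 *
 *   dlm_lock() -> request_lock() -> _request_lock() -> do_request()
 *
 * when this node masters the resource, or stops at send_request() when the
 * master is remote, in which case the remote node runs receive_request() ->
 * do_request() -> send_request_reply() and this node finishes the operation
 * in receive_request_reply().
 */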
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* UN */
	{1, 1, 1, 1, 1, 1, 1, 0},	/* NL */
	{1, 1, 1, 1, 1, 1, 0, 0},	/* CR */
	{1, 1, 1, 1, 0, 0, 0, 0},	/* CW */
	{1, 1, 1, 0, 1, 0, 0, 0},	/* PR */
	{1, 1, 1, 0, 0, 0, 0, 0},	/* PW */
	{1, 1, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/* UN   NL  CR  CW  PR  PW  EX  PD*/
	{  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
	{  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
	{  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
	{  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
	{  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
	{  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
	{  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* UN */
	{0, 0, 1, 1, 1, 1, 1, 0},	/* NL */
	{0, 0, 0, 1, 1, 1, 1, 0},	/* CR */
	{0, 0, 0, 0, 1, 1, 1, 0},	/* CW */
	{0, 0, 0, 1, 0, 1, 1, 0},	/* PR */
	{0, 0, 0, 0, 0, 0, 1, 0},	/* PW */
	{0, 0, 0, 0, 0, 0, 0, 0},	/* EX */
	{0, 0, 0, 0, 0, 0, 0, 0}	/* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
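/*
 * For example, reading __dlm_compat_matrix above: two PR locks are
 * compatible (dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) is 1), while PR
 * and EX are not (dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) is 0).
 */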
/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}
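/*
 * A note on the res_nodeid conventions used below (inferred from how the
 * value is tested here and in set_master()): 0 means this node masters the
 * rsb, a positive value names a remote master, and -1 means the master is
 * not yet known and must be looked up in the directory.
 */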
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss,
				name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
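/*
 * Note the search-create-research pattern in find_rsb() above: the first
 * search drops the bucket lock before create_rsb() allocates (allocation
 * can sleep), so a second _search_rsb() under the lock is needed before
 * inserting; if another thread added the same rsb in the meantime, the
 * freshly created one is freed and the existing one used instead.
 */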
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
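/*
 * Putting the pieces above together, the rsb life cycle: the last put_rsb()
 * runs toss_rsb(), which resets the refcount to 1 and parks the rsb on the
 * bucket's toss list; a later lookup can revive it via _search_rsb(),
 * otherwise shrink_bucket() (further below) drops the final reference
 * through kill_rsb() once ci_toss_secs have elapsed and frees it.
 */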
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	uint16_t bucket = lkid & 0xFFFF;
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = lkid & 0xFFFF;

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = lkb->lkb_id & 0xFFFF;

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
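/*
 * To make the lkid scheme above concrete: the low 16 bits of an lkid are
 * the (randomly chosen) lkbtbl bucket and the high 16 bits are that
 * bucket's counter, so counter 3 in bucket 0x12 yields lkid 0x30012, and
 * find_lkb() recovers the bucket with "lkid & 0xFFFF".
 */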
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_waiters_mutex);
	if (lkb->lkb_wait_type) {
		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
		goto out;
	}
	lkb->lkb_wait_type = mstype;
	kref_get(&lkb->lkb_ref);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	mutex_unlock(&ls->ls_waiters_mutex);
}
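/*
 * A typical remote-operation round trip in terms of the waiters list: for
 * example, send_request() (via send_common(), below) first calls
 * add_to_waiters(lkb, DLM_MSG_REQUEST); when the matching reply arrives,
 * the reply path calls remove_from_waiters(), clearing lkb_wait_type and
 * dropping the extra reference taken here.
 */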
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb)
{
	int error = 0;

	if (!lkb->lkb_wait_type) {
		log_print("remove_from_waiters error");
		error = -EINVAL;
		goto out;
	}
	lkb->lkb_wait_type = 0;
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	list_del(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
 out:
	return error;
}

static int remove_from_waiters(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
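/*
 * Two worked examples of the dlm_lvb_operations lookup in set_lvb_lock():
 * a brand new PR request has grmode IV (row UN after the +1 offset) and
 * rqmode PR, giving 1, so the resource's LVB is copied back to the caller;
 * a down conversion EX->NL gives 0, so the caller's LVB is written into
 * the resource and res_lvbseq is bumped.
 */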
static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
}

static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
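/*
 * For instance, when a remote master grants our request, the master node
 * runs grant_lock() on its master-copy lkb (updating the authoritative
 * LVB on the rsb), while this node runs grant_lock_pc() on its
 * process-copy lkb, taking the LVB contents from the reply message
 * instead of the rsb.
 */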
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to
 * this function).
 *
 * After the resolution, the "grant pending" function needs to go back and
 * try to grant locks on the convert queue again since the first lock can
 * now be granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}
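/*
 * A small worked example of the rules above: with one PR lock granted and
 * nothing queued, a new PR request is granted at once (all three 6-4
 * conditions hold), while a new EX request fails queue_conflict() against
 * the grant queue and, without NOQUEUE, ends up on the wait queue; an
 * in-place conversion without QUECVT is granted immediately only when it
 * conflicts with nothing already granted or converting.
 */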
/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}
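/*
 * Example of the alternate-mode optimization: a CW request made with
 * DLM_LKF_ALTPR that cannot be granted is retried once as PR; if the PR
 * mode can be granted, the lock is granted in that mode and the caller
 * sees DLM_SBF_ALTMODE in the lksb flags to learn which mode it actually
 * received.
 */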
static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
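/*
 * Blocking-AST flow in practice: if a holder has PR with a bast callback
 * registered and another node requests EX, the EX request is queued and
 * send_blocking_asts() delivers a bast with mode EX to the PR holder,
 * locally via queue_bast()/dlm_add_ast() or via send_bast() when the lkb
 * here is a master copy for a remote holder, prompting it to release or
 * demote the lock.
 */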
/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
     0: nodeid is set in rsb/lkb and the caller should go ahead and use it
     1: the rsb master is not available and the lkb has been placed on
        a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}

/* confirm_master -- confirm (or deny) an rsb's master nodeid */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	if (flags & DLM_LKF_CANCEL)
		goto out;

	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
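/*
 * Summarizing the flag rules enforced above with examples: DLM_LKF_EXPEDITE
 * is only accepted on a new DLM_LOCK_NL request (it may not be combined
 * with CONVERT, QUECVT or NOQUEUE), DLM_LKF_QUECVT and DLM_LKF_CONVDEADLK
 * require DLM_LKF_CONVERT, and DLM_LKF_VALBLK requires the caller to have
 * supplied an LVB buffer in lksb->sb_lvbptr.
 */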
static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
{
	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
		      DLM_LKF_FORCEUNLOCK))
		return -EINVAL;

	args->flags = flags;
	args->astparam = (long) astarg;
	return 0;
}

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		goto out;

	if (args->flags & DLM_LKF_FORCEUNLOCK)
		goto out_ok;

	if (args->flags & DLM_LKF_CANCEL &&
	    lkb->lkb_status == DLM_LKSTS_GRANTED)
		goto out;

	if (!(args->flags & DLM_LKF_CANCEL) &&
	    lkb->lkb_status != DLM_LKSTS_GRANTED)
		goto out;

	rv = -EBUSY;
	if (lkb->lkb_wait_type)
		goto out;

 out_ok:
	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;

	rv = 0;
 out:
	return rv;
}
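/*
 * The cases validate_unlock_args() distinguishes, by example: a plain
 * unlock is only valid for a granted lkb, DLM_LKF_CANCEL is only valid for
 * one still waiting or converting, an lkb already awaiting a remote reply
 * returns -EBUSY, and DLM_LKF_FORCEUNLOCK bypasses all of these checks.
 */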
1694 */ 1695 1696 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 1697 { 1698 int error = 0; 1699 1700 if (can_be_granted(r, lkb, 1)) { 1701 grant_lock(r, lkb); 1702 queue_cast(r, lkb, 0); 1703 goto out; 1704 } 1705 1706 if (can_be_queued(lkb)) { 1707 error = -EINPROGRESS; 1708 add_lkb(r, lkb, DLM_LKSTS_WAITING); 1709 send_blocking_asts(r, lkb); 1710 goto out; 1711 } 1712 1713 error = -EAGAIN; 1714 if (force_blocking_asts(lkb)) 1715 send_blocking_asts_all(r, lkb); 1716 queue_cast(r, lkb, -EAGAIN); 1717 1718 out: 1719 return error; 1720 } 1721 1722 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 1723 { 1724 int error = 0; 1725 1726 /* changing an existing lock may allow others to be granted */ 1727 1728 if (can_be_granted(r, lkb, 1)) { 1729 grant_lock(r, lkb); 1730 queue_cast(r, lkb, 0); 1731 grant_pending_locks(r); 1732 goto out; 1733 } 1734 1735 if (can_be_queued(lkb)) { 1736 if (is_demoted(lkb)) 1737 grant_pending_locks(r); 1738 error = -EINPROGRESS; 1739 del_lkb(r, lkb); 1740 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 1741 send_blocking_asts(r, lkb); 1742 goto out; 1743 } 1744 1745 error = -EAGAIN; 1746 if (force_blocking_asts(lkb)) 1747 send_blocking_asts_all(r, lkb); 1748 queue_cast(r, lkb, -EAGAIN); 1749 1750 out: 1751 return error; 1752 } 1753 1754 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1755 { 1756 remove_lock(r, lkb); 1757 queue_cast(r, lkb, -DLM_EUNLOCK); 1758 grant_pending_locks(r); 1759 return -DLM_EUNLOCK; 1760 } 1761 1762 /* FIXME: if revert_lock() finds that the lkb is granted, we should 1763 skip the queue_cast(ECANCEL). It indicates that the request/convert 1764 completed (and queued a normal ast) just before the cancel; we don't 1765 want to clobber the sb_result for the normal ast with ECANCEL. */ 1766 1767 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 1768 { 1769 revert_lock(r, lkb); 1770 queue_cast(r, lkb, -DLM_ECANCEL); 1771 grant_pending_locks(r); 1772 return -DLM_ECANCEL; 1773 } 1774 1775 /* 1776 * Four stage 3 varieties: 1777 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() 1778 */ 1779 1780 /* add a new lkb to a possibly new rsb, called by requesting process */ 1781 1782 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 1783 { 1784 int error; 1785 1786 /* set_master: sets lkb nodeid from r */ 1787 1788 error = set_master(r, lkb); 1789 if (error < 0) 1790 goto out; 1791 if (error) { 1792 error = 0; 1793 goto out; 1794 } 1795 1796 if (is_remote(r)) 1797 /* receive_request() calls do_request() on remote node */ 1798 error = send_request(r, lkb); 1799 else 1800 error = do_request(r, lkb); 1801 out: 1802 return error; 1803 } 1804 1805 /* change some property of an existing lkb, e.g. 
/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_convert() calls do_convert() on remote node */
		error = send_convert(r, lkb);
	else
		error = do_convert(r, lkb);

	return error;
}

/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_unlock() calls do_unlock() on remote node */
		error = send_unlock(r, lkb);
	else
		error = do_unlock(r, lkb);

	return error;
}

/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	if (is_remote(r))
		/* receive_cancel() calls do_cancel() on remote node */
		error = send_cancel(r, lkb);
	else
		error = do_cancel(r, lkb);

	return error;
}

/*
 * Four stage 2 varieties:
 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
 */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
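/*
 * A sketch of a typical in-kernel caller of the stage 1 entry points
 * below (the lockspace handle "ls", the callbacks and the error handling
 * are hypothetical, not part of this file):
 *
 *	static struct dlm_lksb my_lksb;
 *
 *	static void my_ast(void *astarg)
 *	{
 *		// my_lksb.sb_status now holds the final result,
 *		// e.g. 0 (granted) or -EAGAIN (NOQUEUE failure)
 *	}
 *
 *	static void my_bast(void *astarg, int mode)
 *	{
 *		// another node wants "mode"; release or demote our lock
 *	}
 *
 *	error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0, "my resource", 11,
 *			 0, my_ast, NULL, my_bast);
 *	...
 *	error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, NULL);
 */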
/*
 * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
 */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}

/*
 * send/receive routines for remote operations and replies
 *
 * send_args
 * send_common
 * send_request			receive_request
 * send_convert			receive_convert
 * send_unlock			receive_unlock
 * send_cancel			receive_cancel
 * send_grant			receive_grant
 * send_bast			receive_bast
 * send_lookup			receive_lookup
 * send_remove			receive_remove
 *
 * send_common_reply
 * receive_request_reply	send_request_reply
 * receive_convert_reply	send_convert_reply
 * receive_unlock_reply		send_unlock_reply
 * receive_cancel_reply		send_cancel_reply
 * receive_lookup_reply		send_lookup_reply
 */
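/*
 * Reading the table above as a timeline for a remote request: the
 * requesting node's send_request() produces a DLM_MSG_REQUEST, the master
 * runs receive_request() -> do_request() and answers with
 * send_request_reply(), and the requester completes the operation in
 * receive_request_reply().
 */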
= mh; 2115 *ms_ret = ms; 2116 return 0; 2117 } 2118 2119 /* further lowcomms enhancements or alternate implementations may make 2120 the return value from this function useful at some point */ 2121 2122 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms) 2123 { 2124 dlm_message_out(ms); 2125 dlm_lowcomms_commit_buffer(mh); 2126 return 0; 2127 } 2128 2129 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb, 2130 struct dlm_message *ms) 2131 { 2132 ms->m_nodeid = lkb->lkb_nodeid; 2133 ms->m_pid = lkb->lkb_ownpid; 2134 ms->m_lkid = lkb->lkb_id; 2135 ms->m_remid = lkb->lkb_remid; 2136 ms->m_exflags = lkb->lkb_exflags; 2137 ms->m_sbflags = lkb->lkb_sbflags; 2138 ms->m_flags = lkb->lkb_flags; 2139 ms->m_lvbseq = lkb->lkb_lvbseq; 2140 ms->m_status = lkb->lkb_status; 2141 ms->m_grmode = lkb->lkb_grmode; 2142 ms->m_rqmode = lkb->lkb_rqmode; 2143 ms->m_hash = r->res_hash; 2144 2145 /* m_result and m_bastmode are set from function args, 2146 not from lkb fields */ 2147 2148 if (lkb->lkb_bastaddr) 2149 ms->m_asts |= AST_BAST; 2150 if (lkb->lkb_astaddr) 2151 ms->m_asts |= AST_COMP; 2152 2153 /* compare with switch in create_message; send_remove() doesn't 2154 use send_args() */ 2155 2156 switch (ms->m_type) { 2157 case DLM_MSG_REQUEST: 2158 case DLM_MSG_LOOKUP: 2159 memcpy(ms->m_extra, r->res_name, r->res_length); 2160 break; 2161 case DLM_MSG_CONVERT: 2162 case DLM_MSG_UNLOCK: 2163 case DLM_MSG_REQUEST_REPLY: 2164 case DLM_MSG_CONVERT_REPLY: 2165 case DLM_MSG_GRANT: 2166 if (!lkb->lkb_lvbptr) 2167 break; 2168 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 2169 break; 2170 } 2171 } 2172 2173 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype) 2174 { 2175 struct dlm_message *ms; 2176 struct dlm_mhandle *mh; 2177 int to_nodeid, error; 2178 2179 add_to_waiters(lkb, mstype); 2180 2181 to_nodeid = r->res_nodeid; 2182 2183 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 2184 if (error) 2185 goto fail; 2186 2187 send_args(r, lkb, ms); 2188 2189 error = send_message(mh, ms); 2190 if (error) 2191 goto fail; 2192 return 0; 2193 2194 fail: 2195 remove_from_waiters(lkb); 2196 return error; 2197 } 2198 2199 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb) 2200 { 2201 return send_common(r, lkb, DLM_MSG_REQUEST); 2202 } 2203 2204 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) 2205 { 2206 int error; 2207 2208 error = send_common(r, lkb, DLM_MSG_CONVERT); 2209 2210 /* down conversions go without a reply from the master */ 2211 if (!error && down_conversion(lkb)) { 2212 remove_from_waiters(lkb); 2213 r->res_ls->ls_stub_ms.m_result = 0; 2214 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags; 2215 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms); 2216 } 2217 2218 return error; 2219 } 2220 2221 /* FIXME: if this lkb is the only lock we hold on the rsb, then set 2222 MASTER_UNCERTAIN to force the next request on the rsb to confirm 2223 that the master is still correct. 
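
   A rough sketch of the idea (hypothetical -- "last_lkb_on_rsb" is an
   assumed helper, not something this file provides, and the flag name
   is taken from the note above):

	if (last_lkb_on_rsb(r, lkb))
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);

   done before sending the unlock, so the next request on this rsb
   would re-verify the master via the directory.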
*/ 2224 2225 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) 2226 { 2227 return send_common(r, lkb, DLM_MSG_UNLOCK); 2228 } 2229 2230 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb) 2231 { 2232 return send_common(r, lkb, DLM_MSG_CANCEL); 2233 } 2234 2235 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb) 2236 { 2237 struct dlm_message *ms; 2238 struct dlm_mhandle *mh; 2239 int to_nodeid, error; 2240 2241 to_nodeid = lkb->lkb_nodeid; 2242 2243 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh); 2244 if (error) 2245 goto out; 2246 2247 send_args(r, lkb, ms); 2248 2249 ms->m_result = 0; 2250 2251 error = send_message(mh, ms); 2252 out: 2253 return error; 2254 } 2255 2256 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode) 2257 { 2258 struct dlm_message *ms; 2259 struct dlm_mhandle *mh; 2260 int to_nodeid, error; 2261 2262 to_nodeid = lkb->lkb_nodeid; 2263 2264 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh); 2265 if (error) 2266 goto out; 2267 2268 send_args(r, lkb, ms); 2269 2270 ms->m_bastmode = mode; 2271 2272 error = send_message(mh, ms); 2273 out: 2274 return error; 2275 } 2276 2277 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb) 2278 { 2279 struct dlm_message *ms; 2280 struct dlm_mhandle *mh; 2281 int to_nodeid, error; 2282 2283 add_to_waiters(lkb, DLM_MSG_LOOKUP); 2284 2285 to_nodeid = dlm_dir_nodeid(r); 2286 2287 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh); 2288 if (error) 2289 goto fail; 2290 2291 send_args(r, lkb, ms); 2292 2293 error = send_message(mh, ms); 2294 if (error) 2295 goto fail; 2296 return 0; 2297 2298 fail: 2299 remove_from_waiters(lkb); 2300 return error; 2301 } 2302 2303 static int send_remove(struct dlm_rsb *r) 2304 { 2305 struct dlm_message *ms; 2306 struct dlm_mhandle *mh; 2307 int to_nodeid, error; 2308 2309 to_nodeid = dlm_dir_nodeid(r); 2310 2311 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh); 2312 if (error) 2313 goto out; 2314 2315 memcpy(ms->m_extra, r->res_name, r->res_length); 2316 ms->m_hash = r->res_hash; 2317 2318 error = send_message(mh, ms); 2319 out: 2320 return error; 2321 } 2322 2323 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 2324 int mstype, int rv) 2325 { 2326 struct dlm_message *ms; 2327 struct dlm_mhandle *mh; 2328 int to_nodeid, error; 2329 2330 to_nodeid = lkb->lkb_nodeid; 2331 2332 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh); 2333 if (error) 2334 goto out; 2335 2336 send_args(r, lkb, ms); 2337 2338 ms->m_result = rv; 2339 2340 error = send_message(mh, ms); 2341 out: 2342 return error; 2343 } 2344 2345 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2346 { 2347 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv); 2348 } 2349 2350 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2351 { 2352 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv); 2353 } 2354 2355 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2356 { 2357 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv); 2358 } 2359 2360 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv) 2361 { 2362 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv); 2363 } 2364 2365 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in, 2366 int ret_nodeid, int rv) 2367 { 2368 struct dlm_rsb *r = &ls->ls_stub_rsb; 2369 struct dlm_message *ms; 2370 struct 
dlm_mhandle *mh; 2371 int error, nodeid = ms_in->m_header.h_nodeid; 2372 2373 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh); 2374 if (error) 2375 goto out; 2376 2377 ms->m_lkid = ms_in->m_lkid; 2378 ms->m_result = rv; 2379 ms->m_nodeid = ret_nodeid; 2380 2381 error = send_message(mh, ms); 2382 out: 2383 return error; 2384 } 2385 2386 /* which args we save from a received message depends heavily on the type 2387 of message, unlike the send side where we can safely send everything about 2388 the lkb for any type of message */ 2389 2390 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms) 2391 { 2392 lkb->lkb_exflags = ms->m_exflags; 2393 lkb->lkb_sbflags = ms->m_sbflags; 2394 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 2395 (ms->m_flags & 0x0000FFFF); 2396 } 2397 2398 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 2399 { 2400 lkb->lkb_sbflags = ms->m_sbflags; 2401 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) | 2402 (ms->m_flags & 0x0000FFFF); 2403 } 2404 2405 static int receive_extralen(struct dlm_message *ms) 2406 { 2407 return (ms->m_header.h_length - sizeof(struct dlm_message)); 2408 } 2409 2410 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb, 2411 struct dlm_message *ms) 2412 { 2413 int len; 2414 2415 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 2416 if (!lkb->lkb_lvbptr) 2417 lkb->lkb_lvbptr = allocate_lvb(ls); 2418 if (!lkb->lkb_lvbptr) 2419 return -ENOMEM; 2420 len = receive_extralen(ms); 2421 memcpy(lkb->lkb_lvbptr, ms->m_extra, len); 2422 } 2423 return 0; 2424 } 2425 2426 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2427 struct dlm_message *ms) 2428 { 2429 lkb->lkb_nodeid = ms->m_header.h_nodeid; 2430 lkb->lkb_ownpid = ms->m_pid; 2431 lkb->lkb_remid = ms->m_lkid; 2432 lkb->lkb_grmode = DLM_LOCK_IV; 2433 lkb->lkb_rqmode = ms->m_rqmode; 2434 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST); 2435 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP); 2436 2437 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb);); 2438 2439 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 2440 /* lkb was just created so there won't be an lvb yet */ 2441 lkb->lkb_lvbptr = allocate_lvb(ls); 2442 if (!lkb->lkb_lvbptr) 2443 return -ENOMEM; 2444 } 2445 2446 return 0; 2447 } 2448 2449 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2450 struct dlm_message *ms) 2451 { 2452 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) { 2453 log_error(ls, "convert_args nodeid %d %d lkid %x %x", 2454 lkb->lkb_nodeid, ms->m_header.h_nodeid, 2455 lkb->lkb_id, lkb->lkb_remid); 2456 return -EINVAL; 2457 } 2458 2459 if (!is_master_copy(lkb)) 2460 return -EINVAL; 2461 2462 if (lkb->lkb_status != DLM_LKSTS_GRANTED) 2463 return -EBUSY; 2464 2465 if (receive_lvb(ls, lkb, ms)) 2466 return -ENOMEM; 2467 2468 lkb->lkb_rqmode = ms->m_rqmode; 2469 lkb->lkb_lvbseq = ms->m_lvbseq; 2470 2471 return 0; 2472 } 2473 2474 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 2475 struct dlm_message *ms) 2476 { 2477 if (!is_master_copy(lkb)) 2478 return -EINVAL; 2479 if (receive_lvb(ls, lkb, ms)) 2480 return -ENOMEM; 2481 return 0; 2482 } 2483 2484 /* We fill in the stub-lkb fields with the info that send_xxxx_reply() 2485 uses to send a reply and that the remote end uses to process the reply. 
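
   This matters on the receive_xxxx() failure paths below: when
   find_lkb()/create_lkb() fails there is no real lkb to reply from,
   but the sender still needs an error reply that names its own lkid.
   Copying m_header.h_nodeid and m_lkid into the stub is just enough
   for that, e.g.:

	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);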
*/ 2486 2487 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms) 2488 { 2489 struct dlm_lkb *lkb = &ls->ls_stub_lkb; 2490 lkb->lkb_nodeid = ms->m_header.h_nodeid; 2491 lkb->lkb_remid = ms->m_lkid; 2492 } 2493 2494 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms) 2495 { 2496 struct dlm_lkb *lkb; 2497 struct dlm_rsb *r; 2498 int error, namelen; 2499 2500 error = create_lkb(ls, &lkb); 2501 if (error) 2502 goto fail; 2503 2504 receive_flags(lkb, ms); 2505 lkb->lkb_flags |= DLM_IFL_MSTCPY; 2506 error = receive_request_args(ls, lkb, ms); 2507 if (error) { 2508 __put_lkb(ls, lkb); 2509 goto fail; 2510 } 2511 2512 namelen = receive_extralen(ms); 2513 2514 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r); 2515 if (error) { 2516 __put_lkb(ls, lkb); 2517 goto fail; 2518 } 2519 2520 lock_rsb(r); 2521 2522 attach_lkb(r, lkb); 2523 error = do_request(r, lkb); 2524 send_request_reply(r, lkb, error); 2525 2526 unlock_rsb(r); 2527 put_rsb(r); 2528 2529 if (error == -EINPROGRESS) 2530 error = 0; 2531 if (error) 2532 dlm_put_lkb(lkb); 2533 return; 2534 2535 fail: 2536 setup_stub_lkb(ls, ms); 2537 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2538 } 2539 2540 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms) 2541 { 2542 struct dlm_lkb *lkb; 2543 struct dlm_rsb *r; 2544 int error, reply = 1; 2545 2546 error = find_lkb(ls, ms->m_remid, &lkb); 2547 if (error) 2548 goto fail; 2549 2550 r = lkb->lkb_resource; 2551 2552 hold_rsb(r); 2553 lock_rsb(r); 2554 2555 receive_flags(lkb, ms); 2556 error = receive_convert_args(ls, lkb, ms); 2557 if (error) 2558 goto out; 2559 reply = !down_conversion(lkb); 2560 2561 error = do_convert(r, lkb); 2562 out: 2563 if (reply) 2564 send_convert_reply(r, lkb, error); 2565 2566 unlock_rsb(r); 2567 put_rsb(r); 2568 dlm_put_lkb(lkb); 2569 return; 2570 2571 fail: 2572 setup_stub_lkb(ls, ms); 2573 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2574 } 2575 2576 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms) 2577 { 2578 struct dlm_lkb *lkb; 2579 struct dlm_rsb *r; 2580 int error; 2581 2582 error = find_lkb(ls, ms->m_remid, &lkb); 2583 if (error) 2584 goto fail; 2585 2586 r = lkb->lkb_resource; 2587 2588 hold_rsb(r); 2589 lock_rsb(r); 2590 2591 receive_flags(lkb, ms); 2592 error = receive_unlock_args(ls, lkb, ms); 2593 if (error) 2594 goto out; 2595 2596 error = do_unlock(r, lkb); 2597 out: 2598 send_unlock_reply(r, lkb, error); 2599 2600 unlock_rsb(r); 2601 put_rsb(r); 2602 dlm_put_lkb(lkb); 2603 return; 2604 2605 fail: 2606 setup_stub_lkb(ls, ms); 2607 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2608 } 2609 2610 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms) 2611 { 2612 struct dlm_lkb *lkb; 2613 struct dlm_rsb *r; 2614 int error; 2615 2616 error = find_lkb(ls, ms->m_remid, &lkb); 2617 if (error) 2618 goto fail; 2619 2620 receive_flags(lkb, ms); 2621 2622 r = lkb->lkb_resource; 2623 2624 hold_rsb(r); 2625 lock_rsb(r); 2626 2627 error = do_cancel(r, lkb); 2628 send_cancel_reply(r, lkb, error); 2629 2630 unlock_rsb(r); 2631 put_rsb(r); 2632 dlm_put_lkb(lkb); 2633 return; 2634 2635 fail: 2636 setup_stub_lkb(ls, ms); 2637 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error); 2638 } 2639 2640 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms) 2641 { 2642 struct dlm_lkb *lkb; 2643 struct dlm_rsb *r; 2644 int error; 2645 2646 error = find_lkb(ls, ms->m_remid, &lkb); 2647 if (error) { 2648 log_error(ls, 
"receive_grant no lkb"); 2649 return; 2650 } 2651 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2652 2653 r = lkb->lkb_resource; 2654 2655 hold_rsb(r); 2656 lock_rsb(r); 2657 2658 receive_flags_reply(lkb, ms); 2659 grant_lock_pc(r, lkb, ms); 2660 queue_cast(r, lkb, 0); 2661 2662 unlock_rsb(r); 2663 put_rsb(r); 2664 dlm_put_lkb(lkb); 2665 } 2666 2667 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms) 2668 { 2669 struct dlm_lkb *lkb; 2670 struct dlm_rsb *r; 2671 int error; 2672 2673 error = find_lkb(ls, ms->m_remid, &lkb); 2674 if (error) { 2675 log_error(ls, "receive_bast no lkb"); 2676 return; 2677 } 2678 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2679 2680 r = lkb->lkb_resource; 2681 2682 hold_rsb(r); 2683 lock_rsb(r); 2684 2685 queue_bast(r, lkb, ms->m_bastmode); 2686 2687 unlock_rsb(r); 2688 put_rsb(r); 2689 dlm_put_lkb(lkb); 2690 } 2691 2692 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms) 2693 { 2694 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid; 2695 2696 from_nodeid = ms->m_header.h_nodeid; 2697 our_nodeid = dlm_our_nodeid(); 2698 2699 len = receive_extralen(ms); 2700 2701 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 2702 if (dir_nodeid != our_nodeid) { 2703 log_error(ls, "lookup dir_nodeid %d from %d", 2704 dir_nodeid, from_nodeid); 2705 error = -EINVAL; 2706 ret_nodeid = -1; 2707 goto out; 2708 } 2709 2710 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid); 2711 2712 /* Optimization: we're master so treat lookup as a request */ 2713 if (!error && ret_nodeid == our_nodeid) { 2714 receive_request(ls, ms); 2715 return; 2716 } 2717 out: 2718 send_lookup_reply(ls, ms, ret_nodeid, error); 2719 } 2720 2721 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms) 2722 { 2723 int len, dir_nodeid, from_nodeid; 2724 2725 from_nodeid = ms->m_header.h_nodeid; 2726 2727 len = receive_extralen(ms); 2728 2729 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash); 2730 if (dir_nodeid != dlm_our_nodeid()) { 2731 log_error(ls, "remove dir entry dir_nodeid %d from %d", 2732 dir_nodeid, from_nodeid); 2733 return; 2734 } 2735 2736 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len); 2737 } 2738 2739 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) 2740 { 2741 struct dlm_lkb *lkb; 2742 struct dlm_rsb *r; 2743 int error, mstype; 2744 2745 error = find_lkb(ls, ms->m_remid, &lkb); 2746 if (error) { 2747 log_error(ls, "receive_request_reply no lkb"); 2748 return; 2749 } 2750 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2751 2752 mstype = lkb->lkb_wait_type; 2753 error = remove_from_waiters(lkb); 2754 if (error) { 2755 log_error(ls, "receive_request_reply not on waiters"); 2756 goto out; 2757 } 2758 2759 /* this is the value returned from do_request() on the master */ 2760 error = ms->m_result; 2761 2762 r = lkb->lkb_resource; 2763 hold_rsb(r); 2764 lock_rsb(r); 2765 2766 /* Optimization: the dir node was also the master, so it took our 2767 lookup as a request and sent request reply instead of lookup reply */ 2768 if (mstype == DLM_MSG_LOOKUP) { 2769 r->res_nodeid = ms->m_header.h_nodeid; 2770 lkb->lkb_nodeid = r->res_nodeid; 2771 } 2772 2773 switch (error) { 2774 case -EAGAIN: 2775 /* request would block (be queued) on remote master; 2776 the unhold undoes the original ref from create_lkb() 2777 so it leads to the lkb being freed */ 2778 queue_cast(r, lkb, -EAGAIN); 2779 confirm_master(r, -EAGAIN); 2780 unhold_lkb(lkb); 2781 break; 2782 2783 case -EINPROGRESS: 2784 
case 0: 2785 /* request was queued or granted on remote master */ 2786 receive_flags_reply(lkb, ms); 2787 lkb->lkb_remid = ms->m_lkid; 2788 if (error) 2789 add_lkb(r, lkb, DLM_LKSTS_WAITING); 2790 else { 2791 grant_lock_pc(r, lkb, ms); 2792 queue_cast(r, lkb, 0); 2793 } 2794 confirm_master(r, error); 2795 break; 2796 2797 case -EBADR: 2798 case -ENOTBLK: 2799 /* find_rsb failed to find rsb or rsb wasn't master */ 2800 r->res_nodeid = -1; 2801 lkb->lkb_nodeid = -1; 2802 _request_lock(r, lkb); 2803 break; 2804 2805 default: 2806 log_error(ls, "receive_request_reply error %d", error); 2807 } 2808 2809 unlock_rsb(r); 2810 put_rsb(r); 2811 out: 2812 dlm_put_lkb(lkb); 2813 } 2814 2815 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, 2816 struct dlm_message *ms) 2817 { 2818 int error = ms->m_result; 2819 2820 /* this is the value returned from do_convert() on the master */ 2821 2822 switch (error) { 2823 case -EAGAIN: 2824 /* convert would block (be queued) on remote master */ 2825 queue_cast(r, lkb, -EAGAIN); 2826 break; 2827 2828 case -EINPROGRESS: 2829 /* convert was queued on remote master */ 2830 del_lkb(r, lkb); 2831 add_lkb(r, lkb, DLM_LKSTS_CONVERT); 2832 break; 2833 2834 case 0: 2835 /* convert was granted on remote master */ 2836 receive_flags_reply(lkb, ms); 2837 grant_lock_pc(r, lkb, ms); 2838 queue_cast(r, lkb, 0); 2839 break; 2840 2841 default: 2842 log_error(r->res_ls, "receive_convert_reply error %d", error); 2843 } 2844 } 2845 2846 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 2847 { 2848 struct dlm_rsb *r = lkb->lkb_resource; 2849 2850 hold_rsb(r); 2851 lock_rsb(r); 2852 2853 __receive_convert_reply(r, lkb, ms); 2854 2855 unlock_rsb(r); 2856 put_rsb(r); 2857 } 2858 2859 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms) 2860 { 2861 struct dlm_lkb *lkb; 2862 int error; 2863 2864 error = find_lkb(ls, ms->m_remid, &lkb); 2865 if (error) { 2866 log_error(ls, "receive_convert_reply no lkb"); 2867 return; 2868 } 2869 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2870 2871 error = remove_from_waiters(lkb); 2872 if (error) { 2873 log_error(ls, "receive_convert_reply not on waiters"); 2874 goto out; 2875 } 2876 2877 _receive_convert_reply(lkb, ms); 2878 out: 2879 dlm_put_lkb(lkb); 2880 } 2881 2882 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms) 2883 { 2884 struct dlm_rsb *r = lkb->lkb_resource; 2885 int error = ms->m_result; 2886 2887 hold_rsb(r); 2888 lock_rsb(r); 2889 2890 /* this is the value returned from do_unlock() on the master */ 2891 2892 switch (error) { 2893 case -DLM_EUNLOCK: 2894 receive_flags_reply(lkb, ms); 2895 remove_lock_pc(r, lkb); 2896 queue_cast(r, lkb, -DLM_EUNLOCK); 2897 break; 2898 default: 2899 log_error(r->res_ls, "receive_unlock_reply error %d", error); 2900 } 2901 2902 unlock_rsb(r); 2903 put_rsb(r); 2904 } 2905 2906 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms) 2907 { 2908 struct dlm_lkb *lkb; 2909 int error; 2910 2911 error = find_lkb(ls, ms->m_remid, &lkb); 2912 if (error) { 2913 log_error(ls, "receive_unlock_reply no lkb"); 2914 return; 2915 } 2916 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2917 2918 error = remove_from_waiters(lkb); 2919 if (error) { 2920 log_error(ls, "receive_unlock_reply not on waiters"); 2921 goto out; 2922 } 2923 2924 _receive_unlock_reply(lkb, ms); 2925 out: 2926 dlm_put_lkb(lkb); 2927 } 2928 2929 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct 
dlm_message *ms) 2930 { 2931 struct dlm_rsb *r = lkb->lkb_resource; 2932 int error = ms->m_result; 2933 2934 hold_rsb(r); 2935 lock_rsb(r); 2936 2937 /* this is the value returned from do_cancel() on the master */ 2938 2939 switch (error) { 2940 case -DLM_ECANCEL: 2941 receive_flags_reply(lkb, ms); 2942 revert_lock_pc(r, lkb); 2943 queue_cast(r, lkb, -DLM_ECANCEL); 2944 break; 2945 default: 2946 log_error(r->res_ls, "receive_cancel_reply error %d", error); 2947 } 2948 2949 unlock_rsb(r); 2950 put_rsb(r); 2951 } 2952 2953 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms) 2954 { 2955 struct dlm_lkb *lkb; 2956 int error; 2957 2958 error = find_lkb(ls, ms->m_remid, &lkb); 2959 if (error) { 2960 log_error(ls, "receive_cancel_reply no lkb"); 2961 return; 2962 } 2963 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 2964 2965 error = remove_from_waiters(lkb); 2966 if (error) { 2967 log_error(ls, "receive_cancel_reply not on waiters"); 2968 goto out; 2969 } 2970 2971 _receive_cancel_reply(lkb, ms); 2972 out: 2973 dlm_put_lkb(lkb); 2974 } 2975 2976 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms) 2977 { 2978 struct dlm_lkb *lkb; 2979 struct dlm_rsb *r; 2980 int error, ret_nodeid; 2981 2982 error = find_lkb(ls, ms->m_lkid, &lkb); 2983 if (error) { 2984 log_error(ls, "receive_lookup_reply no lkb"); 2985 return; 2986 } 2987 2988 error = remove_from_waiters(lkb); 2989 if (error) { 2990 log_error(ls, "receive_lookup_reply not on waiters"); 2991 goto out; 2992 } 2993 2994 /* this is the value returned by dlm_dir_lookup on dir node 2995 FIXME: will a non-zero error ever be returned? */ 2996 error = ms->m_result; 2997 2998 r = lkb->lkb_resource; 2999 hold_rsb(r); 3000 lock_rsb(r); 3001 3002 ret_nodeid = ms->m_nodeid; 3003 if (ret_nodeid == dlm_our_nodeid()) { 3004 r->res_nodeid = 0; 3005 ret_nodeid = 0; 3006 r->res_first_lkid = 0; 3007 } else { 3008 /* set_master() will copy res_nodeid to lkb_nodeid */ 3009 r->res_nodeid = ret_nodeid; 3010 } 3011 3012 _request_lock(r, lkb); 3013 3014 if (!ret_nodeid) 3015 process_lookup_list(r); 3016 3017 unlock_rsb(r); 3018 put_rsb(r); 3019 out: 3020 dlm_put_lkb(lkb); 3021 } 3022 3023 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery) 3024 { 3025 struct dlm_message *ms = (struct dlm_message *) hd; 3026 struct dlm_ls *ls; 3027 int error = 0; 3028 3029 if (!recovery) 3030 dlm_message_in(ms); 3031 3032 ls = dlm_find_lockspace_global(hd->h_lockspace); 3033 if (!ls) { 3034 log_print("drop message %d from %d for unknown lockspace %d", 3035 ms->m_type, nodeid, hd->h_lockspace); 3036 return -EINVAL; 3037 } 3038 3039 /* recovery may have just ended leaving a bunch of backed-up requests 3040 in the requestqueue; wait while dlm_recoverd clears them */ 3041 3042 if (!recovery) 3043 dlm_wait_requestqueue(ls); 3044 3045 /* recovery may have just started while there were a bunch of 3046 in-flight requests -- save them in requestqueue to be processed 3047 after recovery. we can't let dlm_recvd block on the recovery 3048 lock. if dlm_recoverd is calling this function to clear the 3049 requestqueue, it needs to be interrupted (-EINTR) if another 3050 recovery operation is starting. 
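
   Concretely, the loop below ends in one of three ways:

	locking stopped, recovery caller:  return -EINTR immediately
	locking stopped, normal caller:    add the message to the
					   requestqueue (retrying on
					   -EAGAIN) and return -EINTR
	locking running:		   lock_recovery_try() succeeds
					   and the message is processed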
*/ 3051 3052 while (1) { 3053 if (dlm_locking_stopped(ls)) { 3054 if (recovery) { 3055 error = -EINTR; 3056 goto out; 3057 } 3058 error = dlm_add_requestqueue(ls, nodeid, hd); 3059 if (error == -EAGAIN) 3060 continue; 3061 else { 3062 error = -EINTR; 3063 goto out; 3064 } 3065 } 3066 3067 if (lock_recovery_try(ls)) 3068 break; 3069 schedule(); 3070 } 3071 3072 switch (ms->m_type) { 3073 3074 /* messages sent to a master node */ 3075 3076 case DLM_MSG_REQUEST: 3077 receive_request(ls, ms); 3078 break; 3079 3080 case DLM_MSG_CONVERT: 3081 receive_convert(ls, ms); 3082 break; 3083 3084 case DLM_MSG_UNLOCK: 3085 receive_unlock(ls, ms); 3086 break; 3087 3088 case DLM_MSG_CANCEL: 3089 receive_cancel(ls, ms); 3090 break; 3091 3092 /* messages sent from a master node (replies to above) */ 3093 3094 case DLM_MSG_REQUEST_REPLY: 3095 receive_request_reply(ls, ms); 3096 break; 3097 3098 case DLM_MSG_CONVERT_REPLY: 3099 receive_convert_reply(ls, ms); 3100 break; 3101 3102 case DLM_MSG_UNLOCK_REPLY: 3103 receive_unlock_reply(ls, ms); 3104 break; 3105 3106 case DLM_MSG_CANCEL_REPLY: 3107 receive_cancel_reply(ls, ms); 3108 break; 3109 3110 /* messages sent from a master node (only two types of async msg) */ 3111 3112 case DLM_MSG_GRANT: 3113 receive_grant(ls, ms); 3114 break; 3115 3116 case DLM_MSG_BAST: 3117 receive_bast(ls, ms); 3118 break; 3119 3120 /* messages sent to a dir node */ 3121 3122 case DLM_MSG_LOOKUP: 3123 receive_lookup(ls, ms); 3124 break; 3125 3126 case DLM_MSG_REMOVE: 3127 receive_remove(ls, ms); 3128 break; 3129 3130 /* messages sent from a dir node (remove has no reply) */ 3131 3132 case DLM_MSG_LOOKUP_REPLY: 3133 receive_lookup_reply(ls, ms); 3134 break; 3135 3136 default: 3137 log_error(ls, "unknown message type %d", ms->m_type); 3138 } 3139 3140 unlock_recovery(ls); 3141 out: 3142 dlm_put_lockspace(ls); 3143 dlm_astd_wake(); 3144 return error; 3145 } 3146 3147 3148 /* 3149 * Recovery related 3150 */ 3151 3152 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb) 3153 { 3154 if (middle_conversion(lkb)) { 3155 hold_lkb(lkb); 3156 ls->ls_stub_ms.m_result = -EINPROGRESS; 3157 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3158 _remove_from_waiters(lkb); 3159 _receive_convert_reply(lkb, &ls->ls_stub_ms); 3160 3161 /* Same special case as in receive_rcom_lock_args() */ 3162 lkb->lkb_grmode = DLM_LOCK_IV; 3163 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT); 3164 unhold_lkb(lkb); 3165 3166 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) { 3167 lkb->lkb_flags |= DLM_IFL_RESEND; 3168 } 3169 3170 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down 3171 conversions are async; there's no reply from the remote master */ 3172 } 3173 3174 /* A waiting lkb needs recovery if the master node has failed, or 3175 the master node is changing (only when no directory is used) */ 3176 3177 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb) 3178 { 3179 if (dlm_is_removed(ls, lkb->lkb_nodeid)) 3180 return 1; 3181 3182 if (!dlm_no_directory(ls)) 3183 return 0; 3184 3185 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid) 3186 return 1; 3187 3188 return 0; 3189 } 3190 3191 /* Recovery for locks that are waiting for replies from nodes that are now 3192 gone. We can just complete unlocks and cancels by faking a reply from the 3193 dead node. Requests and up-conversions we flag to be resent after 3194 recovery. Down-conversions can just be completed with a fake reply like 3195 unlocks. Conversions between PR and CW need special attention. 
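
   Summary of the actions taken below, by wait type:

	DLM_MSG_LOOKUP			flag RESEND (any destination)
	DLM_MSG_REQUEST			flag RESEND
	DLM_MSG_CONVERT, up		flag RESEND
	DLM_MSG_CONVERT, PR<->CW	fake -EINPROGRESS reply; grmode
					reset to IV, RSB_RECOVER_CONVERT
	DLM_MSG_UNLOCK			fake -DLM_EUNLOCK reply
	DLM_MSG_CANCEL			fake -DLM_ECANCEL reply

   PR<->CW is the awkward one because PR and CW are incompatible with
   each other while both are compatible with the lower modes, so a lock
   caught mid-convert between them has an ambiguous granted mode until
   all of the rsb's locks have been rebuilt.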
*/ 3196 3197 void dlm_recover_waiters_pre(struct dlm_ls *ls) 3198 { 3199 struct dlm_lkb *lkb, *safe; 3200 3201 mutex_lock(&ls->ls_waiters_mutex); 3202 3203 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) { 3204 log_debug(ls, "pre recover waiter lkid %x type %d flags %x", 3205 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags); 3206 3207 /* all outstanding lookups, regardless of destination will be 3208 resent after recovery is done */ 3209 3210 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) { 3211 lkb->lkb_flags |= DLM_IFL_RESEND; 3212 continue; 3213 } 3214 3215 if (!waiter_needs_recovery(ls, lkb)) 3216 continue; 3217 3218 switch (lkb->lkb_wait_type) { 3219 3220 case DLM_MSG_REQUEST: 3221 lkb->lkb_flags |= DLM_IFL_RESEND; 3222 break; 3223 3224 case DLM_MSG_CONVERT: 3225 recover_convert_waiter(ls, lkb); 3226 break; 3227 3228 case DLM_MSG_UNLOCK: 3229 hold_lkb(lkb); 3230 ls->ls_stub_ms.m_result = -DLM_EUNLOCK; 3231 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3232 _remove_from_waiters(lkb); 3233 _receive_unlock_reply(lkb, &ls->ls_stub_ms); 3234 dlm_put_lkb(lkb); 3235 break; 3236 3237 case DLM_MSG_CANCEL: 3238 hold_lkb(lkb); 3239 ls->ls_stub_ms.m_result = -DLM_ECANCEL; 3240 ls->ls_stub_ms.m_flags = lkb->lkb_flags; 3241 _remove_from_waiters(lkb); 3242 _receive_cancel_reply(lkb, &ls->ls_stub_ms); 3243 dlm_put_lkb(lkb); 3244 break; 3245 3246 default: 3247 log_error(ls, "invalid lkb wait_type %d", 3248 lkb->lkb_wait_type); 3249 } 3250 schedule(); 3251 } 3252 mutex_unlock(&ls->ls_waiters_mutex); 3253 } 3254 3255 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret) 3256 { 3257 struct dlm_lkb *lkb; 3258 int rv = 0; 3259 3260 mutex_lock(&ls->ls_waiters_mutex); 3261 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) { 3262 if (lkb->lkb_flags & DLM_IFL_RESEND) { 3263 rv = lkb->lkb_wait_type; 3264 _remove_from_waiters(lkb); 3265 lkb->lkb_flags &= ~DLM_IFL_RESEND; 3266 break; 3267 } 3268 } 3269 mutex_unlock(&ls->ls_waiters_mutex); 3270 3271 if (!rv) 3272 lkb = NULL; 3273 *lkb_ret = lkb; 3274 return rv; 3275 } 3276 3277 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the 3278 master or dir-node for r. Processing the lkb may result in it being placed 3279 back on waiters. 
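
   Re-running _request_lock()/_convert_lock() re-decides whether the
   operation is local or remote now that the master may have moved
   (possibly to this node); confirm_master() below covers the case
   where the resent request found that we are the master ourselves.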
*/ 3280 3281 int dlm_recover_waiters_post(struct dlm_ls *ls) 3282 { 3283 struct dlm_lkb *lkb; 3284 struct dlm_rsb *r; 3285 int error = 0, mstype; 3286 3287 while (1) { 3288 if (dlm_locking_stopped(ls)) { 3289 log_debug(ls, "recover_waiters_post aborted"); 3290 error = -EINTR; 3291 break; 3292 } 3293 3294 mstype = remove_resend_waiter(ls, &lkb); 3295 if (!mstype) 3296 break; 3297 3298 r = lkb->lkb_resource; 3299 3300 log_debug(ls, "recover_waiters_post %x type %d flags %x %s", 3301 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name); 3302 3303 switch (mstype) { 3304 3305 case DLM_MSG_LOOKUP: 3306 hold_rsb(r); 3307 lock_rsb(r); 3308 _request_lock(r, lkb); 3309 if (is_master(r)) 3310 confirm_master(r, 0); 3311 unlock_rsb(r); 3312 put_rsb(r); 3313 break; 3314 3315 case DLM_MSG_REQUEST: 3316 hold_rsb(r); 3317 lock_rsb(r); 3318 _request_lock(r, lkb); 3319 if (is_master(r)) 3320 confirm_master(r, 0); 3321 unlock_rsb(r); 3322 put_rsb(r); 3323 break; 3324 3325 case DLM_MSG_CONVERT: 3326 hold_rsb(r); 3327 lock_rsb(r); 3328 _convert_lock(r, lkb); 3329 unlock_rsb(r); 3330 put_rsb(r); 3331 break; 3332 3333 default: 3334 log_error(ls, "recover_waiters_post type %d", mstype); 3335 } 3336 } 3337 3338 return error; 3339 } 3340 3341 static void purge_queue(struct dlm_rsb *r, struct list_head *queue, 3342 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) 3343 { 3344 struct dlm_ls *ls = r->res_ls; 3345 struct dlm_lkb *lkb, *safe; 3346 3347 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { 3348 if (test(ls, lkb)) { 3349 rsb_set_flag(r, RSB_LOCKS_PURGED); 3350 del_lkb(r, lkb); 3351 /* this put should free the lkb */ 3352 if (!dlm_put_lkb(lkb)) 3353 log_error(ls, "purged lkb not released"); 3354 } 3355 } 3356 } 3357 3358 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 3359 { 3360 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); 3361 } 3362 3363 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) 3364 { 3365 return is_master_copy(lkb); 3366 } 3367 3368 static void purge_dead_locks(struct dlm_rsb *r) 3369 { 3370 purge_queue(r, &r->res_grantqueue, &purge_dead_test); 3371 purge_queue(r, &r->res_convertqueue, &purge_dead_test); 3372 purge_queue(r, &r->res_waitqueue, &purge_dead_test); 3373 } 3374 3375 void dlm_purge_mstcpy_locks(struct dlm_rsb *r) 3376 { 3377 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); 3378 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test); 3379 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); 3380 } 3381 3382 /* Get rid of locks held by nodes that are gone. 
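   Only rsbs mastered here are walked (the is_master() check below);
   purging marks an affected rsb RSB_LOCKS_PURGED so that
   dlm_grant_after_purge() can find it afterwards and grant whatever
   the departed holders were blocking.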
*/ 3383 3384 int dlm_purge_locks(struct dlm_ls *ls) 3385 { 3386 struct dlm_rsb *r; 3387 3388 log_debug(ls, "dlm_purge_locks"); 3389 3390 down_write(&ls->ls_root_sem); 3391 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 3392 hold_rsb(r); 3393 lock_rsb(r); 3394 if (is_master(r)) 3395 purge_dead_locks(r); 3396 unlock_rsb(r); 3397 unhold_rsb(r); 3398 3399 schedule(); 3400 } 3401 up_write(&ls->ls_root_sem); 3402 3403 return 0; 3404 } 3405 3406 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) 3407 { 3408 struct dlm_rsb *r, *r_ret = NULL; 3409 3410 read_lock(&ls->ls_rsbtbl[bucket].lock); 3411 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) { 3412 if (!rsb_flag(r, RSB_LOCKS_PURGED)) 3413 continue; 3414 hold_rsb(r); 3415 rsb_clear_flag(r, RSB_LOCKS_PURGED); 3416 r_ret = r; 3417 break; 3418 } 3419 read_unlock(&ls->ls_rsbtbl[bucket].lock); 3420 return r_ret; 3421 } 3422 3423 void dlm_grant_after_purge(struct dlm_ls *ls) 3424 { 3425 struct dlm_rsb *r; 3426 int bucket = 0; 3427 3428 while (1) { 3429 r = find_purged_rsb(ls, bucket); 3430 if (!r) { 3431 if (bucket == ls->ls_rsbtbl_size - 1) 3432 break; 3433 bucket++; 3434 continue; 3435 } 3436 lock_rsb(r); 3437 if (is_master(r)) { 3438 grant_pending_locks(r); 3439 confirm_master(r, 0); 3440 } 3441 unlock_rsb(r); 3442 put_rsb(r); 3443 schedule(); 3444 } 3445 } 3446 3447 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, 3448 uint32_t remid) 3449 { 3450 struct dlm_lkb *lkb; 3451 3452 list_for_each_entry(lkb, head, lkb_statequeue) { 3453 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid) 3454 return lkb; 3455 } 3456 return NULL; 3457 } 3458 3459 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid, 3460 uint32_t remid) 3461 { 3462 struct dlm_lkb *lkb; 3463 3464 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid); 3465 if (lkb) 3466 return lkb; 3467 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid); 3468 if (lkb) 3469 return lkb; 3470 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid); 3471 if (lkb) 3472 return lkb; 3473 return NULL; 3474 } 3475 3476 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb, 3477 struct dlm_rsb *r, struct dlm_rcom *rc) 3478 { 3479 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 3480 int lvblen; 3481 3482 lkb->lkb_nodeid = rc->rc_header.h_nodeid; 3483 lkb->lkb_ownpid = rl->rl_ownpid; 3484 lkb->lkb_remid = rl->rl_lkid; 3485 lkb->lkb_exflags = rl->rl_exflags; 3486 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF; 3487 lkb->lkb_flags |= DLM_IFL_MSTCPY; 3488 lkb->lkb_lvbseq = rl->rl_lvbseq; 3489 lkb->lkb_rqmode = rl->rl_rqmode; 3490 lkb->lkb_grmode = rl->rl_grmode; 3491 /* don't set lkb_status because add_lkb wants to itself */ 3492 3493 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST); 3494 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP); 3495 3496 if (lkb->lkb_exflags & DLM_LKF_VALBLK) { 3497 lkb->lkb_lvbptr = allocate_lvb(ls); 3498 if (!lkb->lkb_lvbptr) 3499 return -ENOMEM; 3500 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) - 3501 sizeof(struct rcom_lock); 3502 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen); 3503 } 3504 3505 /* Conversions between PR and CW (middle modes) need special handling. 
3506 The real granted mode of these converting locks cannot be determined 3507 until all locks have been rebuilt on the rsb (recover_conversion) */ 3508 3509 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) { 3510 rl->rl_status = DLM_LKSTS_CONVERT; 3511 lkb->lkb_grmode = DLM_LOCK_IV; 3512 rsb_set_flag(r, RSB_RECOVER_CONVERT); 3513 } 3514 3515 return 0; 3516 } 3517 3518 /* This lkb may have been recovered in a previous aborted recovery so we need 3519 to check if the rsb already has an lkb with the given remote nodeid/lkid. 3520 If so we just send back a standard reply. If not, we create a new lkb with 3521 the given values and send back our lkid. We send back our lkid by sending 3522 back the rcom_lock struct we got but with the remid field filled in. */ 3523 3524 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 3525 { 3526 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 3527 struct dlm_rsb *r; 3528 struct dlm_lkb *lkb; 3529 int error; 3530 3531 if (rl->rl_parent_lkid) { 3532 error = -EOPNOTSUPP; 3533 goto out; 3534 } 3535 3536 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r); 3537 if (error) 3538 goto out; 3539 3540 lock_rsb(r); 3541 3542 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid); 3543 if (lkb) { 3544 error = -EEXIST; 3545 goto out_remid; 3546 } 3547 3548 error = create_lkb(ls, &lkb); 3549 if (error) 3550 goto out_unlock; 3551 3552 error = receive_rcom_lock_args(ls, lkb, r, rc); 3553 if (error) { 3554 __put_lkb(ls, lkb); 3555 goto out_unlock; 3556 } 3557 3558 attach_lkb(r, lkb); 3559 add_lkb(r, lkb, rl->rl_status); 3560 error = 0; 3561 3562 out_remid: 3563 /* this is the new value returned to the lock holder for 3564 saving in its process-copy lkb */ 3565 rl->rl_remid = lkb->lkb_id; 3566 3567 out_unlock: 3568 unlock_rsb(r); 3569 put_rsb(r); 3570 out: 3571 if (error) 3572 log_print("recover_master_copy %d %x", error, rl->rl_lkid); 3573 rl->rl_result = error; 3574 return error; 3575 } 3576 3577 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) 3578 { 3579 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf; 3580 struct dlm_rsb *r; 3581 struct dlm_lkb *lkb; 3582 int error; 3583 3584 error = find_lkb(ls, rl->rl_lkid, &lkb); 3585 if (error) { 3586 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid); 3587 return error; 3588 } 3589 3590 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb);); 3591 3592 error = rl->rl_result; 3593 3594 r = lkb->lkb_resource; 3595 hold_rsb(r); 3596 lock_rsb(r); 3597 3598 switch (error) { 3599 case -EBADR: 3600 /* There's a chance the new master received our lock before 3601 dlm_recover_master_reply(), this wouldn't happen if we did 3602 a barrier between recover_masters and recover_locks. 
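
   Rather than adding such a barrier, the not-ready case is handled by
   resending the lock (dlm_send_rcom_lock() below) until the new
   master is ready for it; -EBADR here means "try again", not a hard
   failure, which is also why the dlm_recovered_lock() ack is skipped
   for this case.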
*/ 3603 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id, 3604 (unsigned long)r, r->res_name); 3605 dlm_send_rcom_lock(r, lkb); 3606 goto out; 3607 case -EEXIST: 3608 log_debug(ls, "master copy exists %x", lkb->lkb_id); 3609 /* fall through */ 3610 case 0: 3611 lkb->lkb_remid = rl->rl_remid; 3612 break; 3613 default: 3614 log_error(ls, "dlm_recover_process_copy unknown error %d %x", 3615 error, lkb->lkb_id); 3616 } 3617 3618 /* an ack for dlm_recover_locks() which waits for replies from 3619 all the locks it sends to new masters */ 3620 dlm_recovered_lock(r); 3621 out: 3622 unlock_rsb(r); 3623 put_rsb(r); 3624 dlm_put_lkb(lkb); 3625 3626 return 0; 3627 } 3628 3629 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, 3630 int mode, uint32_t flags, void *name, unsigned int namelen, 3631 uint32_t parent_lkid) 3632 { 3633 struct dlm_lkb *lkb; 3634 struct dlm_args args; 3635 int error; 3636 3637 lock_recovery(ls); 3638 3639 error = create_lkb(ls, &lkb); 3640 if (error) { 3641 kfree(ua); 3642 goto out; 3643 } 3644 3645 if (flags & DLM_LKF_VALBLK) { 3646 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); 3647 if (!ua->lksb.sb_lvbptr) { 3648 kfree(ua); 3649 __put_lkb(ls, lkb); 3650 error = -ENOMEM; 3651 goto out; 3652 } 3653 } 3654 3655 /* After ua is attached to lkb it will be freed by free_lkb(). 3656 When DLM_IFL_USER is set, the dlm knows that this is a userspace 3657 lock and that lkb_astparam is the dlm_user_args structure. */ 3658 3659 error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid, 3660 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args); 3661 lkb->lkb_flags |= DLM_IFL_USER; 3662 ua->old_mode = DLM_LOCK_IV; 3663 3664 if (error) { 3665 __put_lkb(ls, lkb); 3666 goto out; 3667 } 3668 3669 error = request_lock(ls, lkb, name, namelen, &args); 3670 3671 switch (error) { 3672 case 0: 3673 break; 3674 case -EINPROGRESS: 3675 error = 0; 3676 break; 3677 case -EAGAIN: 3678 error = 0; 3679 /* fall through */ 3680 default: 3681 __put_lkb(ls, lkb); 3682 goto out; 3683 } 3684 3685 /* add this new lkb to the per-process list of locks */ 3686 spin_lock(&ua->proc->locks_spin); 3687 kref_get(&lkb->lkb_ref); 3688 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks); 3689 spin_unlock(&ua->proc->locks_spin); 3690 out: 3691 unlock_recovery(ls); 3692 return error; 3693 } 3694 3695 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp, 3696 int mode, uint32_t flags, uint32_t lkid, char *lvb_in) 3697 { 3698 struct dlm_lkb *lkb; 3699 struct dlm_args args; 3700 struct dlm_user_args *ua; 3701 int error; 3702 3703 lock_recovery(ls); 3704 3705 error = find_lkb(ls, lkid, &lkb); 3706 if (error) 3707 goto out; 3708 3709 /* user can change the params on its lock when it converts it, or 3710 add an lvb that didn't exist before */ 3711 3712 ua = (struct dlm_user_args *)lkb->lkb_astparam; 3713 3714 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) { 3715 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL); 3716 if (!ua->lksb.sb_lvbptr) { 3717 error = -ENOMEM; 3718 goto out_put; 3719 } 3720 } 3721 if (lvb_in && ua->lksb.sb_lvbptr) 3722 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN); 3723 3724 ua->castparam = ua_tmp->castparam; 3725 ua->castaddr = ua_tmp->castaddr; 3726 ua->bastparam = ua_tmp->bastparam; 3727 ua->bastaddr = ua_tmp->bastaddr; 3728 ua->user_lksb = ua_tmp->user_lksb; 3729 ua->old_mode = lkb->lkb_grmode; 3730 3731 error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST, 3732 ua, DLM_FAKE_USER_AST, &args); 3733 if (error) 
3734 		goto out_put;
3735 
3736 	error = convert_lock(ls, lkb, &args);
3737 
3738 	if (error == -EINPROGRESS || error == -EAGAIN)
3739 		error = 0;
3740  out_put:
3741 	dlm_put_lkb(lkb);
3742  out:
3743 	unlock_recovery(ls);
3744 	kfree(ua_tmp);
3745 	return error;
3746 }
3747 
3748 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3749 		    uint32_t flags, uint32_t lkid, char *lvb_in)
3750 {
3751 	struct dlm_lkb *lkb;
3752 	struct dlm_args args;
3753 	struct dlm_user_args *ua;
3754 	int error;
3755 
3756 	lock_recovery(ls);
3757 
3758 	error = find_lkb(ls, lkid, &lkb);
3759 	if (error)
3760 		goto out;
3761 
3762 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3763 
3764 	if (lvb_in && ua->lksb.sb_lvbptr)
3765 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3766 	ua->castparam = ua_tmp->castparam;
3767 	ua->user_lksb = ua_tmp->user_lksb;
3768 
3769 	error = set_unlock_args(flags, ua, &args);
3770 	if (error)
3771 		goto out_put;
3772 
3773 	error = unlock_lock(ls, lkb, &args);
3774 
3775 	if (error == -DLM_EUNLOCK)
3776 		error = 0;
3777 	if (error)
3778 		goto out_put;
3779 
3780 	spin_lock(&ua->proc->locks_spin);
3781 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
3782 	if (!list_empty(&lkb->lkb_ownqueue))
3783 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3784 	spin_unlock(&ua->proc->locks_spin);
3785  out_put:
3786 	dlm_put_lkb(lkb);
3787  out:
3788 	unlock_recovery(ls);
3789 	return error;
3790 }
3791 
3792 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3793 		    uint32_t flags, uint32_t lkid)
3794 {
3795 	struct dlm_lkb *lkb;
3796 	struct dlm_args args;
3797 	struct dlm_user_args *ua;
3798 	int error;
3799 
3800 	lock_recovery(ls);
3801 
3802 	error = find_lkb(ls, lkid, &lkb);
3803 	if (error)
3804 		goto out;
3805 
3806 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3807 	ua->castparam = ua_tmp->castparam;
3808 	ua->user_lksb = ua_tmp->user_lksb;
3809 
3810 	error = set_unlock_args(flags, ua, &args);
3811 	if (error)
3812 		goto out_put;
3813 
3814 	error = cancel_lock(ls, lkb, &args);
3815 
3816 	if (error == -DLM_ECANCEL)
3817 		error = 0;
3818 	if (error)
3819 		goto out_put;
3820 
3821 	/* this lkb was removed from the WAITING queue */
3822 	if (lkb->lkb_grmode == DLM_LOCK_IV) {
3823 		spin_lock(&ua->proc->locks_spin);
3824 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3825 		spin_unlock(&ua->proc->locks_spin);
3826 	}
3827  out_put:
3828 	dlm_put_lkb(lkb);
3829  out:
3830 	unlock_recovery(ls);
3831 	return error;
3832 }
3833 
3834 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3835 {
3836 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3837 
3838 	if (ua->lksb.sb_lvbptr)
3839 		kfree(ua->lksb.sb_lvbptr);
3840 	kfree(ua);
3841 	lkb->lkb_astparam = (long)NULL;
3842 
3843 	/* TODO: propagate to master if needed */
3844 	return 0;
3845 }
3846 
3847 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3848    Regardless of what rsb queue the lock is on, it's removed and freed. */
3849 
3850 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3851 {
3852 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3853 	struct dlm_args args;
3854 	int error;
3855 
3856 	/* FIXME: we need to handle the case where the lkb is in limbo
3857 	   while the rsb is being looked up; currently we assert in
3858 	   _unlock_lock/is_remote because rsb nodeid is -1.
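
	   One possible shape for that fix (a sketch only, nothing here
	   is implemented): if r->res_nodeid is still -1, defer the
	   force-unlock until the lookup reply has settled the master,
	   instead of asserting in the remote/local decision.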
	*/
3859 
3860 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3861 
3862 	error = unlock_lock(ls, lkb, &args);
3863 	if (error == -DLM_EUNLOCK)
3864 		error = 0;
3865 	return error;
3866 }
3867 
3868 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3869    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3870    which we clear here. */
3871 
3872 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3873    list, and no more device_writes should add lkbs to proc->locks list; so we
3874    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3875    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3876    them ourselves. */
3877 
3878 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3879 {
3880 	struct dlm_lkb *lkb, *safe;
3881 
3882 	lock_recovery(ls);
3883 	mutex_lock(&ls->ls_clear_proc_locks);
3884 
3885 	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3886 		list_del_init(&lkb->lkb_ownqueue);
3887 
3888 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3889 			lkb->lkb_flags |= DLM_IFL_ORPHAN;
3890 			orphan_proc_lock(ls, lkb);
3891 		} else {
3892 			lkb->lkb_flags |= DLM_IFL_DEAD;
3893 			unlock_proc_lock(ls, lkb);
3894 		}
3895 
3896 		/* this removes the reference for the proc->locks list
3897 		   added by dlm_user_request, it may result in the lkb
3898 		   being freed */
3899 
3900 		dlm_put_lkb(lkb);
3901 	}
3902 
3903 	/* in-progress unlocks */
3904 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3905 		list_del_init(&lkb->lkb_ownqueue);
3906 		lkb->lkb_flags |= DLM_IFL_DEAD;
3907 		dlm_put_lkb(lkb);
3908 	}
3909 
3910 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
3911 		list_del(&lkb->lkb_astqueue);
3912 		dlm_put_lkb(lkb);
3913 	}
3914 
3915 	mutex_unlock(&ls->ls_clear_proc_locks);
3916 	unlock_recovery(ls);
3917 }
3918 
3919 
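/* Example (illustrative only, not part of this file): how a kernel
 * caller might drive the request and unlock paths above.  The exact
 * dlm_lock() prototype is assumed here from the way its arguments are
 * passed to set_lock_args(); my_ast(), my_bast() and the lockspace
 * handle (e.g. from dlm_new_lockspace()) are the caller's own.
 *
 *	static void my_ast(void *astarg)
 *	{
 *		struct dlm_lksb *lksb = astarg;
 *		// lksb->sb_status holds 0, -EAGAIN, -DLM_EUNLOCK, ...
 *	}
 *
 *	static void my_bast(void *astarg, int mode)
 *	{
 *		// another node wants a mode incompatible with our grant
 *	}
 *
 *	struct dlm_lksb lksb;
 *	int error;
 *
 *	error = dlm_lock(lockspace, DLM_LOCK_EX, &lksb, 0, "myres", 5,
 *			 0, my_ast, &lksb, my_bast);
 *	// 0 here only means the request was accepted; the grant (or
 *	// -EAGAIN etc.) arrives later through my_ast()
 *
 *	error = dlm_unlock(lockspace, lksb.sb_lkid, 0, &lksb, &lksb);
 *	// likewise, -DLM_EUNLOCK is delivered via the ast, and
 *	// dlm_unlock() maps it to 0 for the synchronous return value
 */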