1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmlock.c 5 * 6 * underlying calls for lock creation 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27 28 #include <linux/module.h> 29 #include <linux/fs.h> 30 #include <linux/types.h> 31 #include <linux/slab.h> 32 #include <linux/highmem.h> 33 #include <linux/utsname.h> 34 #include <linux/init.h> 35 #include <linux/sysctl.h> 36 #include <linux/random.h> 37 #include <linux/blkdev.h> 38 #include <linux/socket.h> 39 #include <linux/inet.h> 40 #include <linux/spinlock.h> 41 #include <linux/delay.h> 42 43 44 #include "cluster/heartbeat.h" 45 #include "cluster/nodemanager.h" 46 #include "cluster/tcp.h" 47 48 #include "dlmapi.h" 49 #include "dlmcommon.h" 50 51 #include "dlmconvert.h" 52 53 #define MLOG_MASK_PREFIX ML_DLM 54 #include "cluster/masklog.h" 55 56 static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED; 57 static u64 dlm_next_cookie = 1; 58 59 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, 60 struct dlm_lock_resource *res, 61 struct dlm_lock *lock, int flags); 62 static void dlm_init_lock(struct dlm_lock *newlock, int type, 63 u8 node, u64 cookie); 64 static void dlm_lock_release(struct kref 
			     *kref);
static void dlm_lock_detach_lockres(struct dlm_lock *lock);

/* Tell us whether we can grant a new lock request.
 * locking:
 *   caller needs:  res->spinlock
 *   taken:         none
 *   held on exit:  none
 * returns: 1 if the lock can be granted, 0 otherwise.
 *
 * a new lock is grantable only if its mode is compatible with every
 * lock currently on the granted and converting queues; the blocked
 * queue is not consulted. */
static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
				  struct dlm_lock *lock)
{
	struct list_head *iter;
	struct dlm_lock *tmplock;

	list_for_each(iter, &res->granted) {
		tmplock = list_entry(iter, struct dlm_lock, list);

		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
			return 0;
	}

	list_for_each(iter, &res->converting) {
		tmplock = list_entry(iter, struct dlm_lock, list);

		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
			return 0;
	}

	return 1;
}

/* performs lock creation at the lockres master site
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_NORMAL, DLM_NOTQUEUED
 */
static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      struct dlm_lock *lock, int flags)
{
	int call_ast = 0, kick_thread = 0;
	enum dlm_status status = DLM_NORMAL;

	mlog_entry("type=%d\n", lock->ml.type);

	spin_lock(&res->spinlock);
	/* if called from dlm_create_lock_handler, need to
	 * ensure it will not sleep in dlm_wait_on_lockres */
	status = __dlm_lockres_state_to_status(res);
	if (status != DLM_NORMAL &&
	    lock->ml.node != dlm->node_num) {
		/* erf.  state changed after lock was dropped. */
		spin_unlock(&res->spinlock);
		dlm_error(status);
		return status;
	}
	__dlm_wait_on_lockres(res);
	/* an ast slot is reserved up front; if no ast ends up queued
	 * it is released again below via dlm_lockres_release_ast */
	__dlm_lockres_reserve_ast(res);

	if (dlm_can_grant_new_lock(res, lock)) {
		mlog(0, "I can grant this lock right away\n");
		/* got it right away */
		lock->lksb->status = DLM_NORMAL;
		status = DLM_NORMAL;
		dlm_lock_get(lock);
		list_add_tail(&lock->list, &res->granted);

		/* for the recovery lock, we can't allow the ast
		 * to be queued since the dlmthread is already
		 * frozen.  but the recovery lock is always locked
		 * with LKM_NOQUEUE so we do not need the ast in
		 * this special case */
		if (!dlm_is_recovery_lock(res->lockname.name,
					  res->lockname.len)) {
			kick_thread = 1;
			call_ast = 1;
		} else {
			mlog(0, "%s: returning DLM_NORMAL to "
			     "node %u for reco lock\n", dlm->name,
			     lock->ml.node);
		}
	} else {
		/* for NOQUEUE request, unless we get the
		 * lock right away, return DLM_NOTQUEUED */
		if (flags & LKM_NOQUEUE) {
			status = DLM_NOTQUEUED;
			if (dlm_is_recovery_lock(res->lockname.name,
						 res->lockname.len)) {
				mlog(0, "%s: returning NOTQUEUED to "
				     "node %u for reco lock\n", dlm->name,
				     lock->ml.node);
			}
		} else {
			/* park the lock on the blocked queue; the dlm
			 * thread will move it once it becomes grantable */
			dlm_lock_get(lock);
			list_add_tail(&lock->list, &res->blocked);
			kick_thread = 1;
		}
	}

	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

	/* either queue the ast or release it */
	if (call_ast)
		dlm_queue_ast(dlm, lock);
	else
		dlm_lockres_release_ast(dlm, res);

	dlm_lockres_calc_usage(dlm, res);
	if (kick_thread)
		dlm_kick_thread(dlm, res);

	return status;
}

/* undo the provisional queueing of a lock whose remote create failed;
 * called with res->spinlock held by the callers in this file. */
void dlm_revert_pending_lock(struct dlm_lock_resource *res,
			     struct dlm_lock *lock)
{
	/* remove from local queue if it failed */
	list_del_init(&lock->list);
	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
}


/* performs lock creation when another node masters the lockres
 * locking:
 *   caller needs:  none
 *   taken:         takes and drops res->spinlock
 *   held on exit:  none
 * returns: DLM_DENIED, DLM_RECOVERING, or net status
 */
static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      struct dlm_lock *lock, int flags)
{
	enum dlm_status status = DLM_DENIED;

	mlog_entry("type=%d\n", lock->ml.type);
	mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len,
	     res->lockname.name, flags);

	spin_lock(&res->spinlock);

	/* will exit this call with spinlock held */
	__dlm_wait_on_lockres(res);
	res->state |= DLM_LOCK_RES_IN_PROGRESS;

	/* add lock to local (secondary) queue */
	dlm_lock_get(lock);
	list_add_tail(&lock->list, &res->blocked);
	lock->lock_pending = 1;
	spin_unlock(&res->spinlock);

	/* spec seems to say that you will get DLM_NORMAL when the lock
	 * has been queued, meaning we need to wait for a reply here. */
	status = dlm_send_remote_lock_request(dlm, res, lock, flags);

	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	lock->lock_pending = 0;
	if (status != DLM_NORMAL) {
		/* the master refused (or the message failed): undo the
		 * provisional queueing and drop the queue's reference */
		if (status != DLM_NOTQUEUED)
			dlm_error(status);
		dlm_revert_pending_lock(res, lock);
		dlm_lock_put(lock);
	} else if (dlm_is_recovery_lock(res->lockname.name,
					res->lockname.len)) {
		/* special case for the $RECOVERY lock.
		 * there will never be an AST delivered to put
		 * this lock on the proper secondary queue
		 * (granted), so do it manually. */
		mlog(0, "%s: $RECOVERY lock for this node (%u) is "
		     "mastered by %u; got lock, manually granting (no ast)\n",
		     dlm->name, dlm->node_num, res->owner);
		list_del_init(&lock->list);
		list_add_tail(&lock->list, &res->granted);
	}
	spin_unlock(&res->spinlock);

	dlm_lockres_calc_usage(dlm, res);

	wake_up(&res->wq);
	return status;
}


/* for remote lock creation.
 * locking:
 *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
 *   taken:         none
 *   held on exit:  none
 * returns: DLM_NOLOCKMGR, or net status
 */
static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
					       struct dlm_lock_resource *res,
					       struct dlm_lock *lock, int flags)
{
	struct dlm_create_lock create;
	int tmpret, status = 0;
	enum dlm_status ret;

	mlog_entry_void();

	/* build the wire message; only the flags field is byte-swapped
	 * here, the cookie was already stored big-endian at init time */
	memset(&create, 0, sizeof(create));
	create.node_idx = dlm->node_num;
	create.requested_type = lock->ml.type;
	create.cookie = lock->ml.cookie;
	create.namelen = res->lockname.len;
	create.flags = cpu_to_be32(flags);
	memcpy(create.name, res->lockname.name, create.namelen);

	tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
				    sizeof(create), res->owner, &status);
	if (tmpret >= 0) {
		// successfully sent and received
		ret = status;  // this is already a dlm_status
	} else {
		mlog_errno(tmpret);
		if (dlm_is_host_down(tmpret)) {
			/* the master died; recovery will re-master the
			 * lockres and the caller can retry */
			ret = DLM_RECOVERING;
			mlog(0, "node %u died so returning DLM_RECOVERING "
			     "from lock message!\n", res->owner);
		} else {
			ret = dlm_err_to_dlm_status(tmpret);
		}
	}

	return ret;
}

/* take a reference on a lock */
void dlm_lock_get(struct dlm_lock *lock)
{
	kref_get(&lock->lock_refs);
}

/* drop a reference on a lock; the last put frees it via
 * dlm_lock_release */
void dlm_lock_put(struct dlm_lock *lock)
{
	kref_put(&lock->lock_refs, dlm_lock_release);
}

/* kref release callback: by the time the last reference drops, the
 * lock must be off every queue and have no ast/bast pending */
static void dlm_lock_release(struct kref *kref)
{
	struct dlm_lock *lock;

	lock = container_of(kref, struct dlm_lock, lock_refs);

	BUG_ON(!list_empty(&lock->list));
	BUG_ON(!list_empty(&lock->ast_list));
	BUG_ON(!list_empty(&lock->bast_list));
	BUG_ON(lock->ast_pending);
	BUG_ON(lock->bast_pending);

	dlm_lock_detach_lockres(lock);

	/* only free the lksb if dlm_new_lock allocated it for us */
	if (lock->lksb_kernel_allocated) {
		mlog(0, "freeing kernel-allocated lksb\n");
		kfree(lock->lksb);
	}
	kfree(lock);
}
328 329 /* associate a lock with it's lockres, getting a ref on the lockres */ 330 void dlm_lock_attach_lockres(struct dlm_lock *lock, 331 struct dlm_lock_resource *res) 332 { 333 dlm_lockres_get(res); 334 lock->lockres = res; 335 } 336 337 /* drop ref on lockres, if there is still one associated with lock */ 338 static void dlm_lock_detach_lockres(struct dlm_lock *lock) 339 { 340 struct dlm_lock_resource *res; 341 342 res = lock->lockres; 343 if (res) { 344 lock->lockres = NULL; 345 mlog(0, "removing lock's lockres reference\n"); 346 dlm_lockres_put(res); 347 } 348 } 349 350 static void dlm_init_lock(struct dlm_lock *newlock, int type, 351 u8 node, u64 cookie) 352 { 353 INIT_LIST_HEAD(&newlock->list); 354 INIT_LIST_HEAD(&newlock->ast_list); 355 INIT_LIST_HEAD(&newlock->bast_list); 356 spin_lock_init(&newlock->spinlock); 357 newlock->ml.type = type; 358 newlock->ml.convert_type = LKM_IVMODE; 359 newlock->ml.highest_blocked = LKM_IVMODE; 360 newlock->ml.node = node; 361 newlock->ml.pad1 = 0; 362 newlock->ml.list = 0; 363 newlock->ml.flags = 0; 364 newlock->ast = NULL; 365 newlock->bast = NULL; 366 newlock->astdata = NULL; 367 newlock->ml.cookie = cpu_to_be64(cookie); 368 newlock->ast_pending = 0; 369 newlock->bast_pending = 0; 370 newlock->convert_pending = 0; 371 newlock->lock_pending = 0; 372 newlock->unlock_pending = 0; 373 newlock->cancel_pending = 0; 374 newlock->lksb_kernel_allocated = 0; 375 376 kref_init(&newlock->lock_refs); 377 } 378 379 struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, 380 struct dlm_lockstatus *lksb) 381 { 382 struct dlm_lock *lock; 383 int kernel_allocated = 0; 384 385 lock = kcalloc(1, sizeof(*lock), GFP_KERNEL); 386 if (!lock) 387 return NULL; 388 389 if (!lksb) { 390 /* zero memory only if kernel-allocated */ 391 lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL); 392 if (!lksb) { 393 kfree(lock); 394 return NULL; 395 } 396 kernel_allocated = 1; 397 } 398 399 dlm_init_lock(lock, type, node, cookie); 400 if (kernel_allocated) 
401 lock->lksb_kernel_allocated = 1; 402 lock->lksb = lksb; 403 lksb->lockid = lock; 404 return lock; 405 } 406 407 /* handler for lock creation net message 408 * locking: 409 * caller needs: none 410 * taken: takes and drops res->spinlock 411 * held on exit: none 412 * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED 413 */ 414 int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) 415 { 416 struct dlm_ctxt *dlm = data; 417 struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf; 418 struct dlm_lock_resource *res = NULL; 419 struct dlm_lock *newlock = NULL; 420 struct dlm_lockstatus *lksb = NULL; 421 enum dlm_status status = DLM_NORMAL; 422 char *name; 423 unsigned int namelen; 424 425 BUG_ON(!dlm); 426 427 mlog_entry_void(); 428 429 if (!dlm_grab(dlm)) 430 return DLM_REJECTED; 431 432 mlog_bug_on_msg(!dlm_domain_fully_joined(dlm), 433 "Domain %s not fully joined!\n", dlm->name); 434 435 name = create->name; 436 namelen = create->namelen; 437 438 status = DLM_IVBUFLEN; 439 if (namelen > DLM_LOCKID_NAME_MAX) { 440 dlm_error(status); 441 goto leave; 442 } 443 444 status = DLM_SYSERR; 445 newlock = dlm_new_lock(create->requested_type, 446 create->node_idx, 447 be64_to_cpu(create->cookie), NULL); 448 if (!newlock) { 449 dlm_error(status); 450 goto leave; 451 } 452 453 lksb = newlock->lksb; 454 455 if (be32_to_cpu(create->flags) & LKM_GET_LVB) { 456 lksb->flags |= DLM_LKSB_GET_LVB; 457 mlog(0, "set DLM_LKSB_GET_LVB flag\n"); 458 } 459 460 status = DLM_IVLOCKID; 461 res = dlm_lookup_lockres(dlm, name, namelen); 462 if (!res) { 463 dlm_error(status); 464 goto leave; 465 } 466 467 spin_lock(&res->spinlock); 468 status = __dlm_lockres_state_to_status(res); 469 spin_unlock(&res->spinlock); 470 471 if (status != DLM_NORMAL) { 472 mlog(0, "lockres recovering/migrating/in-progress\n"); 473 goto leave; 474 } 475 476 dlm_lock_attach_lockres(newlock, res); 477 478 status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags)); 479 
leave: 480 if (status != DLM_NORMAL) 481 if (newlock) 482 dlm_lock_put(newlock); 483 484 if (res) 485 dlm_lockres_put(res); 486 487 dlm_put(dlm); 488 489 return status; 490 } 491 492 493 /* fetch next node-local (u8 nodenum + u56 cookie) into u64 */ 494 static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie) 495 { 496 u64 tmpnode = node_num; 497 498 /* shift single byte of node num into top 8 bits */ 499 tmpnode <<= 56; 500 501 spin_lock(&dlm_cookie_lock); 502 *cookie = (dlm_next_cookie | tmpnode); 503 if (++dlm_next_cookie & 0xff00000000000000ull) { 504 mlog(0, "This node's cookie will now wrap!\n"); 505 dlm_next_cookie = 1; 506 } 507 spin_unlock(&dlm_cookie_lock); 508 } 509 510 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode, 511 struct dlm_lockstatus *lksb, int flags, 512 const char *name, dlm_astlockfunc_t *ast, void *data, 513 dlm_bastlockfunc_t *bast) 514 { 515 enum dlm_status status; 516 struct dlm_lock_resource *res = NULL; 517 struct dlm_lock *lock = NULL; 518 int convert = 0, recovery = 0; 519 520 /* yes this function is a mess. 521 * TODO: clean this up. 
lots of common code in the 522 * lock and convert paths, especially in the retry blocks */ 523 if (!lksb) { 524 dlm_error(DLM_BADARGS); 525 return DLM_BADARGS; 526 } 527 528 status = DLM_BADPARAM; 529 if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) { 530 dlm_error(status); 531 goto error; 532 } 533 534 if (flags & ~LKM_VALID_FLAGS) { 535 dlm_error(status); 536 goto error; 537 } 538 539 convert = (flags & LKM_CONVERT); 540 recovery = (flags & LKM_RECOVERY); 541 542 if (recovery && 543 (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) { 544 dlm_error(status); 545 goto error; 546 } 547 if (convert && (flags & LKM_LOCAL)) { 548 mlog(ML_ERROR, "strange LOCAL convert request!\n"); 549 goto error; 550 } 551 552 if (convert) { 553 /* CONVERT request */ 554 555 /* if converting, must pass in a valid dlm_lock */ 556 lock = lksb->lockid; 557 if (!lock) { 558 mlog(ML_ERROR, "NULL lock pointer in convert " 559 "request\n"); 560 goto error; 561 } 562 563 res = lock->lockres; 564 if (!res) { 565 mlog(ML_ERROR, "NULL lockres pointer in convert " 566 "request\n"); 567 goto error; 568 } 569 dlm_lockres_get(res); 570 571 /* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are 572 * static after the original lock call. convert requests will 573 * ensure that everything is the same, or return DLM_BADARGS. 574 * this means that DLM_DENIED_NOASTS will never be returned. 
575 */ 576 if (lock->lksb != lksb || lock->ast != ast || 577 lock->bast != bast || lock->astdata != data) { 578 status = DLM_BADARGS; 579 mlog(ML_ERROR, "new args: lksb=%p, ast=%p, bast=%p, " 580 "astdata=%p\n", lksb, ast, bast, data); 581 mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, " 582 "astdata=%p\n", lock->lksb, lock->ast, 583 lock->bast, lock->astdata); 584 goto error; 585 } 586 retry_convert: 587 dlm_wait_for_recovery(dlm); 588 589 if (res->owner == dlm->node_num) 590 status = dlmconvert_master(dlm, res, lock, flags, mode); 591 else 592 status = dlmconvert_remote(dlm, res, lock, flags, mode); 593 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 594 status == DLM_FORWARD) { 595 /* for now, see how this works without sleeping 596 * and just retry right away. I suspect the reco 597 * or migration will complete fast enough that 598 * no waiting will be necessary */ 599 mlog(0, "retrying convert with migration/recovery/" 600 "in-progress\n"); 601 msleep(100); 602 goto retry_convert; 603 } 604 } else { 605 u64 tmpcookie; 606 607 /* LOCK request */ 608 status = DLM_BADARGS; 609 if (!name) { 610 dlm_error(status); 611 goto error; 612 } 613 614 status = DLM_IVBUFLEN; 615 if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) { 616 dlm_error(status); 617 goto error; 618 } 619 620 dlm_get_next_cookie(dlm->node_num, &tmpcookie); 621 lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb); 622 if (!lock) { 623 dlm_error(status); 624 goto error; 625 } 626 627 if (!recovery) 628 dlm_wait_for_recovery(dlm); 629 630 /* find or create the lock resource */ 631 res = dlm_get_lock_resource(dlm, name, flags); 632 if (!res) { 633 status = DLM_IVLOCKID; 634 dlm_error(status); 635 goto error; 636 } 637 638 mlog(0, "type=%d, flags = 0x%x\n", mode, flags); 639 mlog(0, "creating lock: lock=%p res=%p\n", lock, res); 640 641 dlm_lock_attach_lockres(lock, res); 642 lock->ast = ast; 643 lock->bast = bast; 644 lock->astdata = data; 645 646 retry_lock: 647 if (flags 
& LKM_VALBLK) { 648 mlog(0, "LKM_VALBLK passed by caller\n"); 649 650 /* LVB requests for non PR, PW or EX locks are 651 * ignored. */ 652 if (mode < LKM_PRMODE) 653 flags &= ~LKM_VALBLK; 654 else { 655 flags |= LKM_GET_LVB; 656 lock->lksb->flags |= DLM_LKSB_GET_LVB; 657 } 658 } 659 660 if (res->owner == dlm->node_num) 661 status = dlmlock_master(dlm, res, lock, flags); 662 else 663 status = dlmlock_remote(dlm, res, lock, flags); 664 665 if (status == DLM_RECOVERING || status == DLM_MIGRATING || 666 status == DLM_FORWARD) { 667 mlog(0, "retrying lock with migration/" 668 "recovery/in progress\n"); 669 msleep(100); 670 /* no waiting for dlm_reco_thread */ 671 if (recovery) { 672 if (status == DLM_RECOVERING) { 673 mlog(0, "%s: got RECOVERING " 674 "for $REOCVERY lock, master " 675 "was %u\n", dlm->name, 676 res->owner); 677 dlm_wait_for_node_death(dlm, res->owner, 678 DLM_NODE_DEATH_WAIT_MAX); 679 } 680 } else { 681 dlm_wait_for_recovery(dlm); 682 } 683 goto retry_lock; 684 } 685 686 if (status != DLM_NORMAL) { 687 lock->lksb->flags &= ~DLM_LKSB_GET_LVB; 688 if (status != DLM_NOTQUEUED) 689 dlm_error(status); 690 goto error; 691 } 692 } 693 694 error: 695 if (status != DLM_NORMAL) { 696 if (lock && !convert) 697 dlm_lock_put(lock); 698 // this is kind of unnecessary 699 lksb->status = status; 700 } 701 702 /* put lockres ref from the convert path 703 * or from dlm_get_lock_resource */ 704 if (res) 705 dlm_lockres_put(res); 706 707 return status; 708 } 709 EXPORT_SYMBOL_GPL(dlmlock); 710