// SPDX-License-Identifier: GPL-2.0-or-later
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * userdlm.c
 *
 * Code which implements the kernel side of a minimal userspace
 * interface to our DLM.
 *
 * Many of the functions here are pared down versions of dlmglue.c
 * functions.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 */

#include <linux/signal.h>
#include <linux/sched/signal.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/crc32.h>

#include "../ocfs2_lockingver.h"
#include "../stackglue.h"
#include "userdlm.h"

#define MLOG_MASK_PREFIX ML_DLMFS
#include "../cluster/masklog.h"
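/*
 * Rough usage sketch for the exported API below.  This is illustrative
 * only: the variable names and calling context are hypothetical and
 * error handling is omitted.
 *
 *	conn = user_dlm_register(&cluster_name);	(a struct qstr)
 *	user_dlm_lock_res_init(lockres, dentry);
 *
 *	status = user_dlm_cluster_lock(lockres, DLM_LOCK_EX, 0);
 *	if (!status) {
 *		user_dlm_write_lvb(inode, buf, len);
 *		user_dlm_cluster_unlock(lockres, DLM_LOCK_EX);
 *	}
 *
 *	user_dlm_destroy_lock(lockres);
 *	user_dlm_unregister(conn);
 */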

static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
	return container_of(lksb, struct user_lock_res, l_lksb);
}

static inline int user_check_wait_flag(struct user_lock_res *lockres,
					int flag)
{
	int ret;

	spin_lock(&lockres->l_lock);
	ret = lockres->l_flags & flag;
	spin_unlock(&lockres->l_lock);

	return ret;
}

static inline void user_wait_on_busy_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BUSY));
}

static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !user_check_wait_flag(lockres, USER_LOCK_BLOCKED));
}

/* I heart container_of... */
static inline struct ocfs2_cluster_connection *
cluster_connection_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return ip->ip_conn;
}

static struct inode *
user_dlm_inode_from_user_lockres(struct user_lock_res *lockres)
{
	struct dlmfs_inode_private *ip;

	ip = container_of(lockres,
			  struct dlmfs_inode_private,
			  ip_lockres);
	return &ip->ip_vfs_inode;
}

static inline void user_recover_from_dlm_error(struct user_lock_res *lockres)
{
	spin_lock(&lockres->l_lock);
	lockres->l_flags &= ~USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);
}

#define user_log_dlm_error(_func, _stat, _lockres) do {		\
	mlog(ML_ERROR, "Dlm error %d while calling %s on "	\
		"resource %.*s\n", _stat, _func,		\
		_lockres->l_namelen, _lockres->l_name);		\
} while (0)

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int user_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void user_ast(struct ocfs2_dlm_lksb *lksb)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);
	int status;

	mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level,
	     lockres->l_requested);

	spin_lock(&lockres->l_lock);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
	if (status) {
		mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n",
		     status, lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		return;
	}

	mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV,
			"Lockres %.*s, requested ivmode. flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

	/* we're downconverting. */
	if (lockres->l_requested < lockres->l_level) {
		if (lockres->l_requested <=
		    user_highest_compat_lock_level(lockres->l_blocking)) {
			lockres->l_blocking = DLM_LOCK_NL;
			lockres->l_flags &= ~USER_LOCK_BLOCKED;
		}
	}

	lockres->l_level = lockres->l_requested;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_flags |= USER_LOCK_ATTACHED;
	lockres->l_flags &= ~USER_LOCK_BUSY;

	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	if (!igrab(inode))
		BUG();
}

static void user_dlm_unblock_lock(struct work_struct *work);

static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
{
	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
		user_dlm_grab_inode_ref(lockres);

		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);

		queue_work(user_dlm_worker, &lockres->l_work);
		lockres->l_flags |= USER_LOCK_QUEUED;
	}
}

static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres)
{
	int queue = 0;

	if (!(lockres->l_flags & USER_LOCK_BLOCKED))
		return;

	switch (lockres->l_blocking) {
	case DLM_LOCK_EX:
		if (!lockres->l_ex_holders && !lockres->l_ro_holders)
			queue = 1;
		break;
	case DLM_LOCK_PR:
		if (!lockres->l_ex_holders)
			queue = 1;
		break;
	default:
		BUG();
	}

	if (queue)
		__user_dlm_queue_lockres(lockres);
}

static void user_bast(struct ocfs2_dlm_lksb *lksb, int level)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n",
	     lockres->l_namelen, lockres->l_name, level, lockres->l_level);

	spin_lock(&lockres->l_lock);
	lockres->l_flags |= USER_LOCK_BLOCKED;
	if (level > lockres->l_blocking)
		lockres->l_blocking = level;

	__user_dlm_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

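/*
 * Summary of the unlock AST below: it runs when an unlock or cancel
 * request completes.  A teardown unlock drops l_level back to
 * DLM_LOCK_IV; a cancel that raced with a grant (DLM_CANCELGRANT) only
 * clears USER_LOCK_IN_CANCEL and leaves the busy flag for user_ast; a
 * cancel that succeeded resets l_requested and may re-queue the unblock
 * worker.
 */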
static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status)
{
	struct user_lock_res *lockres = user_lksb_to_lock_res(lksb);

	mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_flags);

	if (status)
		mlog(ML_ERROR, "dlm returns status %d\n", status);

	spin_lock(&lockres->l_lock);
	/* The teardown flag gets set early during the unlock process,
	 * so test the cancel flag to make sure that this ast isn't
	 * for a concurrent cancel. */
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN
	    && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) {
		lockres->l_level = DLM_LOCK_IV;
	} else if (status == DLM_CANCELGRANT) {
		/* We tried to cancel a convert request, but it was
		 * already granted. Don't clear the busy flag - the
		 * ast should've done this already. */
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		goto out_noclear;
	} else {
		BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL));
		/* Cancel succeeded, we want to re-queue */
		lockres->l_requested = DLM_LOCK_IV; /* cancel an
						     * upconvert
						     * request. */
		lockres->l_flags &= ~USER_LOCK_IN_CANCEL;
		/* we want the unblock thread to look at it again
		 * now. */
		if (lockres->l_flags & USER_LOCK_BLOCKED)
			__user_dlm_queue_lockres(lockres);
	}

	lockres->l_flags &= ~USER_LOCK_BUSY;
out_noclear:
	spin_unlock(&lockres->l_lock);

	wake_up(&lockres->l_event);
}

/*
 * This is the userdlmfs locking protocol version.
 *
 * See fs/ocfs2/dlmglue.c for more details on locking versions.
 */
static struct ocfs2_locking_protocol user_dlm_lproto = {
	.lp_max_version = {
		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
	},
	.lp_lock_ast		= user_ast,
	.lp_blocking_ast	= user_bast,
	.lp_unlock_ast		= user_unlock_ast,
};

static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
{
	struct inode *inode;
	inode = user_dlm_inode_from_user_lockres(lockres);
	iput(inode);
}

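/*
 * Worker run from user_dlm_worker when a lock has been queued by
 * __user_dlm_queue_lockres.  Depending on the state it finds, it either
 * bails out (no longer blocked, in teardown, or still held at an
 * incompatible level), cancels an in-flight convert with DLM_LKF_CANCEL,
 * or issues a downconvert to the highest level compatible with the
 * blocking request.  The inode reference taken at queue time is dropped
 * on exit.
 */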
static void user_dlm_unblock_lock(struct work_struct *work)
{
	int new_level, status;
	struct user_lock_res *lockres =
		container_of(work, struct user_lock_res, l_work);
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);

	mlog_bug_on_msg(!(lockres->l_flags & USER_LOCK_QUEUED),
			"Lockres %.*s, flags 0x%x\n",
			lockres->l_namelen, lockres->l_name, lockres->l_flags);

	/* notice that we don't clear USER_LOCK_BLOCKED here. If it's
	 * set, we want user_ast to clear it. */
	lockres->l_flags &= ~USER_LOCK_QUEUED;

	/* It's valid to get here and no longer be blocked - if we get
	 * several basts in a row, we might be queued by the first
	 * one, the unblock thread might run and clear the queued
	 * flag, and finally we might get another bast which re-queues
	 * us before our ast for the downconvert is called. */
	if (!(lockres->l_flags & USER_LOCK_BLOCKED)) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n",
		     lockres->l_namelen, lockres->l_name);
		spin_unlock(&lockres->l_lock);
		goto drop_ref;
	}

	if (lockres->l_flags & USER_LOCK_BUSY) {
		if (lockres->l_flags & USER_LOCK_IN_CANCEL) {
			mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n",
			     lockres->l_namelen, lockres->l_name);
			spin_unlock(&lockres->l_lock);
			goto drop_ref;
		}

		lockres->l_flags |= USER_LOCK_IN_CANCEL;
		spin_unlock(&lockres->l_lock);

		status = ocfs2_dlm_unlock(conn, &lockres->l_lksb,
					  DLM_LKF_CANCEL);
		if (status)
			user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto drop_ref;
	}

	/* If there are still incompatible holders, we can exit safely
	 * without worrying about re-queueing this lock as that will
	 * happen on the last call to user_cluster_unlock. */
	if ((lockres->l_blocking == DLM_LOCK_EX)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders, lockres->l_ro_holders);
		goto drop_ref;
	}

	if ((lockres->l_blocking == DLM_LOCK_PR)
	    && lockres->l_ex_holders) {
		spin_unlock(&lockres->l_lock);
		mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n",
		     lockres->l_namelen, lockres->l_name,
		     lockres->l_ex_holders);
		goto drop_ref;
	}

	/* yay, we can downconvert now. */
	new_level = user_highest_compat_lock_level(lockres->l_blocking);
	lockres->l_requested = new_level;
	lockres->l_flags |= USER_LOCK_BUSY;
	mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n",
	     lockres->l_namelen, lockres->l_name, lockres->l_level, new_level);
	spin_unlock(&lockres->l_lock);

	/* need lock downconvert request now... */
	status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb,
				DLM_LKF_CONVERT|DLM_LKF_VALBLK,
				lockres->l_name,
				lockres->l_namelen);
	if (status) {
		user_log_dlm_error("ocfs2_dlm_lock", status, lockres);
		user_recover_from_dlm_error(lockres);
	}

drop_ref:
	user_dlm_drop_inode_ref(lockres);
}

static inline void user_dlm_inc_holders(struct user_lock_res *lockres,
					int level)
{
	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int
user_may_continue_on_blocked_lock(struct user_lock_res *lockres,
				  int wanted)
{
	BUG_ON(!(lockres->l_flags & USER_LOCK_BLOCKED));

	return wanted <= user_highest_compat_lock_level(lockres->l_blocking);
}

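/*
 * Take a cluster lock at the requested level (DLM_LOCK_EX or DLM_LOCK_PR
 * only).  The caller may sleep: we wait for any in-flight DLM request to
 * finish and for a blocked lock to be downconverted before (re)trying
 * the upconvert via ocfs2_dlm_lock.  Returns 0 on success, -EINVAL for a
 * bad level, -ERESTARTSYS if interrupted by a signal, or the DLM error
 * (e.g. -EAGAIN with DLM_LKF_NOQUEUE) on failure.
 */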
int user_dlm_cluster_lock(struct user_lock_res *lockres,
			  int level,
			  int lkm_flags)
{
	int status, local_flags;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		status = -EINVAL;
		goto bail;
	}

	mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n",
	     lockres->l_namelen, lockres->l_name, level, lkm_flags);

again:
	if (signal_pending(current)) {
		status = -ERESTARTSYS;
		goto bail;
	}

	spin_lock(&lockres->l_lock);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if ((lockres->l_flags & USER_LOCK_BUSY) &&
	    (level > lockres->l_level)) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	if ((lockres->l_flags & USER_LOCK_BLOCKED) &&
	    (!user_may_continue_on_blocked_lock(lockres, level))) {
		/* is the lock currently blocked on behalf of
		 * another node? */
		spin_unlock(&lockres->l_lock);

		user_wait_on_blocked_lock(lockres);
		goto again;
	}

	if (level > lockres->l_level) {
		local_flags = lkm_flags | DLM_LKF_VALBLK;
		if (lockres->l_level != DLM_LOCK_IV)
			local_flags |= DLM_LKF_CONVERT;

		lockres->l_requested = level;
		lockres->l_flags |= USER_LOCK_BUSY;
		spin_unlock(&lockres->l_lock);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		/* call dlm_lock to upgrade lock now */
		status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb,
					local_flags, lockres->l_name,
					lockres->l_namelen);
		if (status) {
			if ((lkm_flags & DLM_LKF_NOQUEUE) &&
			    (status != -EAGAIN))
				user_log_dlm_error("ocfs2_dlm_lock",
						   status, lockres);
			user_recover_from_dlm_error(lockres);
			goto bail;
		}

		user_wait_on_busy_lock(lockres);
		goto again;
	}

	user_dlm_inc_holders(lockres, level);
	spin_unlock(&lockres->l_lock);

	status = 0;
bail:
	return status;
}

static inline void user_dlm_dec_holders(struct user_lock_res *lockres,
					int level)
{
	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
}

void user_dlm_cluster_unlock(struct user_lock_res *lockres,
			     int level)
{
	if (level != DLM_LOCK_EX &&
	    level != DLM_LOCK_PR) {
		mlog(ML_ERROR, "lockres %.*s: invalid request!\n",
		     lockres->l_namelen, lockres->l_name);
		return;
	}

	spin_lock(&lockres->l_lock);
	user_dlm_dec_holders(lockres, level);
	__user_dlm_cond_queue_lockres(lockres);
	spin_unlock(&lockres->l_lock);
}

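/*
 * LVB access helpers for dlmfs.  The caller must already hold the
 * cluster lock: at least DLM_LOCK_EX to write the lock value block and
 * at least DLM_LOCK_PR to read it (both are asserted below), with len
 * capped at DLM_LVB_LEN.  Reads return 0 bytes if the DLM reports the
 * LVB invalid.
 */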
void user_dlm_write_lvb(struct inode *inode,
			const char *val,
			unsigned int len)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;

	BUG_ON(len > DLM_LVB_LEN);

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_EX);
	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
	memcpy(lvb, val, len);

	spin_unlock(&lockres->l_lock);
}

ssize_t user_dlm_read_lvb(struct inode *inode,
			  char *val,
			  unsigned int len)
{
	struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres;
	char *lvb;
	ssize_t ret = len;

	BUG_ON(len > DLM_LVB_LEN);

	spin_lock(&lockres->l_lock);

	BUG_ON(lockres->l_level < DLM_LOCK_PR);
	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) {
		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
		memcpy(val, lvb, len);
	} else
		ret = 0;

	spin_unlock(&lockres->l_lock);
	return ret;
}

void user_dlm_lock_res_init(struct user_lock_res *lockres,
			    struct dentry *dentry)
{
	memset(lockres, 0, sizeof(*lockres));

	spin_lock_init(&lockres->l_lock);
	init_waitqueue_head(&lockres->l_event);
	lockres->l_level = DLM_LOCK_IV;
	lockres->l_requested = DLM_LOCK_IV;
	lockres->l_blocking = DLM_LOCK_IV;

	/* should have been checked before getting here. */
	BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN);

	memcpy(lockres->l_name,
	       dentry->d_name.name,
	       dentry->d_name.len);
	lockres->l_namelen = dentry->d_name.len;
}

int user_dlm_destroy_lock(struct user_lock_res *lockres)
{
	int status = -EBUSY;
	struct ocfs2_cluster_connection *conn =
		cluster_connection_from_user_lockres(lockres);

	mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name);

	spin_lock(&lockres->l_lock);
	if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) {
		spin_unlock(&lockres->l_lock);
		return 0;
	}

	lockres->l_flags |= USER_LOCK_IN_TEARDOWN;

	while (lockres->l_flags & USER_LOCK_BUSY) {
		spin_unlock(&lockres->l_lock);

		user_wait_on_busy_lock(lockres);

		spin_lock(&lockres->l_lock);
	}

	if (lockres->l_ro_holders || lockres->l_ex_holders) {
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	status = 0;
	if (!(lockres->l_flags & USER_LOCK_ATTACHED)) {
		spin_unlock(&lockres->l_lock);
		goto bail;
	}

	lockres->l_flags &= ~USER_LOCK_ATTACHED;
	lockres->l_flags |= USER_LOCK_BUSY;
	spin_unlock(&lockres->l_lock);

	status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK);
	if (status) {
		user_log_dlm_error("ocfs2_dlm_unlock", status, lockres);
		goto bail;
	}

	user_wait_on_busy_lock(lockres);

	status = 0;
bail:
	return status;
}

static void user_dlm_recovery_handler_noop(int node_num,
					   void *recovery_data)
{
	/* We ignore recovery events */
	return;
}

void user_dlm_set_locking_protocol(void)
{
	ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version);
}

struct ocfs2_cluster_connection *user_dlm_register(const struct qstr *name)
{
	int rc;
	struct ocfs2_cluster_connection *conn;

	rc = ocfs2_cluster_connect_agnostic(name->name, name->len,
					    &user_dlm_lproto,
					    user_dlm_recovery_handler_noop,
					    NULL, &conn);
	if (rc)
		mlog_errno(rc);

	return rc ? ERR_PTR(rc) : conn;
}

void user_dlm_unregister(struct ocfs2_cluster_connection *conn)
{
	ocfs2_cluster_disconnect(conn, 0);
}