1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * dlmthread.c 4 * 5 * standalone DLM module 6 * 7 * Copyright (C) 2004 Oracle. All rights reserved. 8 */ 9 10 11 #include <linux/module.h> 12 #include <linux/fs.h> 13 #include <linux/types.h> 14 #include <linux/highmem.h> 15 #include <linux/init.h> 16 #include <linux/sysctl.h> 17 #include <linux/random.h> 18 #include <linux/blkdev.h> 19 #include <linux/socket.h> 20 #include <linux/inet.h> 21 #include <linux/timer.h> 22 #include <linux/kthread.h> 23 #include <linux/delay.h> 24 25 26 #include "../cluster/heartbeat.h" 27 #include "../cluster/nodemanager.h" 28 #include "../cluster/tcp.h" 29 30 #include "dlmapi.h" 31 #include "dlmcommon.h" 32 #include "dlmdomain.h" 33 34 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD) 35 #include "../cluster/masklog.h" 36 37 static int dlm_thread(void *data); 38 static void dlm_flush_asts(struct dlm_ctxt *dlm); 39 40 /* will exit holding res->spinlock, but may drop in function */ 41 /* waits until flags are cleared on res->state */ 42 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags) 43 { 44 DECLARE_WAITQUEUE(wait, current); 45 46 assert_spin_locked(&res->spinlock); 47 48 add_wait_queue(&res->wq, &wait); 49 repeat: 50 set_current_state(TASK_UNINTERRUPTIBLE); 51 if (res->state & flags) { 52 spin_unlock(&res->spinlock); 53 schedule(); 54 spin_lock(&res->spinlock); 55 goto repeat; 56 } 57 remove_wait_queue(&res->wq, &wait); 58 __set_current_state(TASK_RUNNING); 59 } 60 61 int __dlm_lockres_has_locks(struct dlm_lock_resource *res) 62 { 63 if (list_empty(&res->granted) && 64 list_empty(&res->converting) && 65 list_empty(&res->blocked)) 66 return 0; 67 return 1; 68 } 69 70 /* "unused": the lockres has no locks, is not on the dirty list, 71 * has no inflight locks (in the gap between mastery and acquiring 72 * the first lock), and has no bits in its refmap. 73 * truly ready to be freed. */ 74 int __dlm_lockres_unused(struct dlm_lock_resource *res) 75 { 76 int bit; 77 78 assert_spin_locked(&res->spinlock); 79 80 if (__dlm_lockres_has_locks(res)) 81 return 0; 82 83 /* Locks are in the process of being created */ 84 if (res->inflight_locks) 85 return 0; 86 87 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) 88 return 0; 89 90 if (res->state & (DLM_LOCK_RES_RECOVERING| 91 DLM_LOCK_RES_RECOVERY_WAITING)) 92 return 0; 93 94 /* Another node has this resource with this node as the master */ 95 bit = find_first_bit(res->refmap, O2NM_MAX_NODES); 96 if (bit < O2NM_MAX_NODES) 97 return 0; 98 99 return 1; 100 } 101 102 103 /* Call whenever you may have added or deleted something from one of 104 * the lockres queue's. This will figure out whether it belongs on the 105 * unused list or not and does the appropriate thing. */ 106 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 107 struct dlm_lock_resource *res) 108 { 109 assert_spin_locked(&dlm->spinlock); 110 assert_spin_locked(&res->spinlock); 111 112 if (__dlm_lockres_unused(res)){ 113 if (list_empty(&res->purge)) { 114 mlog(0, "%s: Adding res %.*s to purge list\n", 115 dlm->name, res->lockname.len, res->lockname.name); 116 117 res->last_used = jiffies; 118 dlm_lockres_get(res); 119 list_add_tail(&res->purge, &dlm->purge_list); 120 dlm->purge_count++; 121 } 122 } else if (!list_empty(&res->purge)) { 123 mlog(0, "%s: Removing res %.*s from purge list\n", 124 dlm->name, res->lockname.len, res->lockname.name); 125 126 list_del_init(&res->purge); 127 dlm_lockres_put(res); 128 dlm->purge_count--; 129 } 130 } 131 132 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 133 struct dlm_lock_resource *res) 134 { 135 spin_lock(&dlm->spinlock); 136 spin_lock(&res->spinlock); 137 138 __dlm_lockres_calc_usage(dlm, res); 139 140 spin_unlock(&res->spinlock); 141 spin_unlock(&dlm->spinlock); 142 } 143 144 /* 145 * Do the real purge work: 146 * unhash the lockres, and 147 * clear flag DLM_LOCK_RES_DROPPING_REF. 148 * It requires dlm and lockres spinlock to be taken. 149 */ 150 void __dlm_do_purge_lockres(struct dlm_ctxt *dlm, 151 struct dlm_lock_resource *res) 152 { 153 assert_spin_locked(&dlm->spinlock); 154 assert_spin_locked(&res->spinlock); 155 156 if (!list_empty(&res->purge)) { 157 mlog(0, "%s: Removing res %.*s from purgelist\n", 158 dlm->name, res->lockname.len, res->lockname.name); 159 list_del_init(&res->purge); 160 dlm_lockres_put(res); 161 dlm->purge_count--; 162 } 163 164 if (!__dlm_lockres_unused(res)) { 165 mlog(ML_ERROR, "%s: res %.*s in use after deref\n", 166 dlm->name, res->lockname.len, res->lockname.name); 167 __dlm_print_one_lock_resource(res); 168 BUG(); 169 } 170 171 __dlm_unhash_lockres(dlm, res); 172 173 spin_lock(&dlm->track_lock); 174 if (!list_empty(&res->tracking)) 175 list_del_init(&res->tracking); 176 else { 177 mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n", 178 dlm->name, res->lockname.len, res->lockname.name); 179 __dlm_print_one_lock_resource(res); 180 } 181 spin_unlock(&dlm->track_lock); 182 183 /* 184 * lockres is not in the hash now. drop the flag and wake up 185 * any processes waiting in dlm_get_lock_resource. 186 */ 187 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 188 } 189 190 static void dlm_purge_lockres(struct dlm_ctxt *dlm, 191 struct dlm_lock_resource *res) 192 { 193 int master; 194 int ret = 0; 195 196 assert_spin_locked(&dlm->spinlock); 197 assert_spin_locked(&res->spinlock); 198 199 master = (res->owner == dlm->node_num); 200 201 mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name, 202 res->lockname.len, res->lockname.name, master); 203 204 if (!master) { 205 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 206 mlog(ML_NOTICE, "%s: res %.*s already in DLM_LOCK_RES_DROPPING_REF state\n", 207 dlm->name, res->lockname.len, res->lockname.name); 208 spin_unlock(&res->spinlock); 209 return; 210 } 211 212 res->state |= DLM_LOCK_RES_DROPPING_REF; 213 /* drop spinlock... retake below */ 214 spin_unlock(&res->spinlock); 215 spin_unlock(&dlm->spinlock); 216 217 spin_lock(&res->spinlock); 218 /* This ensures that clear refmap is sent after the set */ 219 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); 220 spin_unlock(&res->spinlock); 221 222 /* clear our bit from the master's refmap, ignore errors */ 223 ret = dlm_drop_lockres_ref(dlm, res); 224 if (ret < 0) { 225 if (!dlm_is_host_down(ret)) 226 BUG(); 227 } 228 spin_lock(&dlm->spinlock); 229 spin_lock(&res->spinlock); 230 } 231 232 if (!list_empty(&res->purge)) { 233 mlog(0, "%s: Removing res %.*s from purgelist, master %d\n", 234 dlm->name, res->lockname.len, res->lockname.name, master); 235 list_del_init(&res->purge); 236 dlm_lockres_put(res); 237 dlm->purge_count--; 238 } 239 240 if (!master && ret == DLM_DEREF_RESPONSE_INPROG) { 241 mlog(0, "%s: deref %.*s in progress\n", 242 dlm->name, res->lockname.len, res->lockname.name); 243 spin_unlock(&res->spinlock); 244 return; 245 } 246 247 if (!__dlm_lockres_unused(res)) { 248 mlog(ML_ERROR, "%s: res %.*s in use after deref\n", 249 dlm->name, res->lockname.len, res->lockname.name); 250 __dlm_print_one_lock_resource(res); 251 BUG(); 252 } 253 254 __dlm_unhash_lockres(dlm, res); 255 256 spin_lock(&dlm->track_lock); 257 if (!list_empty(&res->tracking)) 258 list_del_init(&res->tracking); 259 else { 260 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n", 261 res->lockname.len, res->lockname.name); 262 __dlm_print_one_lock_resource(res); 263 } 264 spin_unlock(&dlm->track_lock); 265 266 /* lockres is not in the hash now. drop the flag and wake up 267 * any processes waiting in dlm_get_lock_resource. */ 268 if (!master) { 269 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 270 spin_unlock(&res->spinlock); 271 wake_up(&res->wq); 272 } else 273 spin_unlock(&res->spinlock); 274 } 275 276 static void dlm_run_purge_list(struct dlm_ctxt *dlm, 277 int purge_now) 278 { 279 unsigned int run_max, unused; 280 unsigned long purge_jiffies; 281 struct dlm_lock_resource *lockres; 282 283 spin_lock(&dlm->spinlock); 284 run_max = dlm->purge_count; 285 286 while(run_max && !list_empty(&dlm->purge_list)) { 287 run_max--; 288 289 lockres = list_entry(dlm->purge_list.next, 290 struct dlm_lock_resource, purge); 291 292 spin_lock(&lockres->spinlock); 293 294 purge_jiffies = lockres->last_used + 295 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); 296 297 /* Make sure that we want to be processing this guy at 298 * this time. */ 299 if (!purge_now && time_after(purge_jiffies, jiffies)) { 300 /* Since resources are added to the purge list 301 * in tail order, we can stop at the first 302 * unpurgable resource -- anyone added after 303 * him will have a greater last_used value */ 304 spin_unlock(&lockres->spinlock); 305 break; 306 } 307 308 /* Status of the lockres *might* change so double 309 * check. If the lockres is unused, holding the dlm 310 * spinlock will prevent people from getting and more 311 * refs on it. */ 312 unused = __dlm_lockres_unused(lockres); 313 if (!unused || 314 (lockres->state & DLM_LOCK_RES_MIGRATING) || 315 (lockres->inflight_assert_workers != 0)) { 316 mlog(0, "%s: res %.*s is in use or being remastered, " 317 "used %d, state %d, assert master workers %u\n", 318 dlm->name, lockres->lockname.len, 319 lockres->lockname.name, 320 !unused, lockres->state, 321 lockres->inflight_assert_workers); 322 list_move_tail(&lockres->purge, &dlm->purge_list); 323 spin_unlock(&lockres->spinlock); 324 continue; 325 } 326 327 dlm_lockres_get(lockres); 328 329 dlm_purge_lockres(dlm, lockres); 330 331 dlm_lockres_put(lockres); 332 333 /* Avoid adding any scheduling latencies */ 334 cond_resched_lock(&dlm->spinlock); 335 } 336 337 spin_unlock(&dlm->spinlock); 338 } 339 340 static void dlm_shuffle_lists(struct dlm_ctxt *dlm, 341 struct dlm_lock_resource *res) 342 { 343 struct dlm_lock *lock, *target; 344 int can_grant = 1; 345 346 /* 347 * Because this function is called with the lockres 348 * spinlock, and because we know that it is not migrating/ 349 * recovering/in-progress, it is fine to reserve asts and 350 * basts right before queueing them all throughout 351 */ 352 assert_spin_locked(&dlm->ast_lock); 353 assert_spin_locked(&res->spinlock); 354 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING| 355 DLM_LOCK_RES_RECOVERING| 356 DLM_LOCK_RES_IN_PROGRESS))); 357 358 converting: 359 if (list_empty(&res->converting)) 360 goto blocked; 361 mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name, 362 res->lockname.len, res->lockname.name); 363 364 target = list_entry(res->converting.next, struct dlm_lock, list); 365 if (target->ml.convert_type == LKM_IVMODE) { 366 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n", 367 dlm->name, res->lockname.len, res->lockname.name); 368 BUG(); 369 } 370 list_for_each_entry(lock, &res->granted, list) { 371 if (lock==target) 372 continue; 373 if (!dlm_lock_compatible(lock->ml.type, 374 target->ml.convert_type)) { 375 can_grant = 0; 376 /* queue the BAST if not already */ 377 if (lock->ml.highest_blocked == LKM_IVMODE) { 378 __dlm_lockres_reserve_ast(res); 379 __dlm_queue_bast(dlm, lock); 380 } 381 /* update the highest_blocked if needed */ 382 if (lock->ml.highest_blocked < target->ml.convert_type) 383 lock->ml.highest_blocked = 384 target->ml.convert_type; 385 } 386 } 387 388 list_for_each_entry(lock, &res->converting, list) { 389 if (lock==target) 390 continue; 391 if (!dlm_lock_compatible(lock->ml.type, 392 target->ml.convert_type)) { 393 can_grant = 0; 394 if (lock->ml.highest_blocked == LKM_IVMODE) { 395 __dlm_lockres_reserve_ast(res); 396 __dlm_queue_bast(dlm, lock); 397 } 398 if (lock->ml.highest_blocked < target->ml.convert_type) 399 lock->ml.highest_blocked = 400 target->ml.convert_type; 401 } 402 } 403 404 /* we can convert the lock */ 405 if (can_grant) { 406 spin_lock(&target->spinlock); 407 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 408 409 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type " 410 "%d => %d, node %u\n", dlm->name, res->lockname.len, 411 res->lockname.name, 412 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 413 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 414 target->ml.type, 415 target->ml.convert_type, target->ml.node); 416 417 target->ml.type = target->ml.convert_type; 418 target->ml.convert_type = LKM_IVMODE; 419 list_move_tail(&target->list, &res->granted); 420 421 BUG_ON(!target->lksb); 422 target->lksb->status = DLM_NORMAL; 423 424 spin_unlock(&target->spinlock); 425 426 __dlm_lockres_reserve_ast(res); 427 __dlm_queue_ast(dlm, target); 428 /* go back and check for more */ 429 goto converting; 430 } 431 432 blocked: 433 if (list_empty(&res->blocked)) 434 goto leave; 435 target = list_entry(res->blocked.next, struct dlm_lock, list); 436 437 list_for_each_entry(lock, &res->granted, list) { 438 if (lock==target) 439 continue; 440 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 441 can_grant = 0; 442 if (lock->ml.highest_blocked == LKM_IVMODE) { 443 __dlm_lockres_reserve_ast(res); 444 __dlm_queue_bast(dlm, lock); 445 } 446 if (lock->ml.highest_blocked < target->ml.type) 447 lock->ml.highest_blocked = target->ml.type; 448 } 449 } 450 451 list_for_each_entry(lock, &res->converting, list) { 452 if (lock==target) 453 continue; 454 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 455 can_grant = 0; 456 if (lock->ml.highest_blocked == LKM_IVMODE) { 457 __dlm_lockres_reserve_ast(res); 458 __dlm_queue_bast(dlm, lock); 459 } 460 if (lock->ml.highest_blocked < target->ml.type) 461 lock->ml.highest_blocked = target->ml.type; 462 } 463 } 464 465 /* we can grant the blocked lock (only 466 * possible if converting list empty) */ 467 if (can_grant) { 468 spin_lock(&target->spinlock); 469 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 470 471 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, " 472 "node %u\n", dlm->name, res->lockname.len, 473 res->lockname.name, 474 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 475 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 476 target->ml.type, target->ml.node); 477 478 /* target->ml.type is already correct */ 479 list_move_tail(&target->list, &res->granted); 480 481 BUG_ON(!target->lksb); 482 target->lksb->status = DLM_NORMAL; 483 484 spin_unlock(&target->spinlock); 485 486 __dlm_lockres_reserve_ast(res); 487 __dlm_queue_ast(dlm, target); 488 /* go back and check for more */ 489 goto converting; 490 } 491 492 leave: 493 return; 494 } 495 496 /* must have NO locks when calling this with res !=NULL * */ 497 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 498 { 499 if (res) { 500 spin_lock(&dlm->spinlock); 501 spin_lock(&res->spinlock); 502 __dlm_dirty_lockres(dlm, res); 503 spin_unlock(&res->spinlock); 504 spin_unlock(&dlm->spinlock); 505 } 506 wake_up(&dlm->dlm_thread_wq); 507 } 508 509 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 510 { 511 assert_spin_locked(&dlm->spinlock); 512 assert_spin_locked(&res->spinlock); 513 514 /* don't shuffle secondary queues */ 515 if (res->owner == dlm->node_num) { 516 if (res->state & (DLM_LOCK_RES_MIGRATING | 517 DLM_LOCK_RES_BLOCK_DIRTY)) 518 return; 519 520 if (list_empty(&res->dirty)) { 521 /* ref for dirty_list */ 522 dlm_lockres_get(res); 523 list_add_tail(&res->dirty, &dlm->dirty_list); 524 res->state |= DLM_LOCK_RES_DIRTY; 525 } 526 } 527 528 mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len, 529 res->lockname.name); 530 } 531 532 533 /* Launch the NM thread for the mounted volume */ 534 int dlm_launch_thread(struct dlm_ctxt *dlm) 535 { 536 mlog(0, "Starting dlm_thread...\n"); 537 538 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s", 539 dlm->name); 540 if (IS_ERR(dlm->dlm_thread_task)) { 541 mlog_errno(PTR_ERR(dlm->dlm_thread_task)); 542 dlm->dlm_thread_task = NULL; 543 return -EINVAL; 544 } 545 546 return 0; 547 } 548 549 void dlm_complete_thread(struct dlm_ctxt *dlm) 550 { 551 if (dlm->dlm_thread_task) { 552 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n"); 553 kthread_stop(dlm->dlm_thread_task); 554 dlm->dlm_thread_task = NULL; 555 } 556 } 557 558 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm) 559 { 560 int empty; 561 562 spin_lock(&dlm->spinlock); 563 empty = list_empty(&dlm->dirty_list); 564 spin_unlock(&dlm->spinlock); 565 566 return empty; 567 } 568 569 static void dlm_flush_asts(struct dlm_ctxt *dlm) 570 { 571 int ret; 572 struct dlm_lock *lock; 573 struct dlm_lock_resource *res; 574 u8 hi; 575 576 spin_lock(&dlm->ast_lock); 577 while (!list_empty(&dlm->pending_asts)) { 578 lock = list_entry(dlm->pending_asts.next, 579 struct dlm_lock, ast_list); 580 /* get an extra ref on lock */ 581 dlm_lock_get(lock); 582 res = lock->lockres; 583 mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, " 584 "node %u\n", dlm->name, res->lockname.len, 585 res->lockname.name, 586 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 587 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 588 lock->ml.type, lock->ml.node); 589 590 BUG_ON(!lock->ast_pending); 591 592 /* remove from list (including ref) */ 593 list_del_init(&lock->ast_list); 594 dlm_lock_put(lock); 595 spin_unlock(&dlm->ast_lock); 596 597 if (lock->ml.node != dlm->node_num) { 598 ret = dlm_do_remote_ast(dlm, res, lock); 599 if (ret < 0) 600 mlog_errno(ret); 601 } else 602 dlm_do_local_ast(dlm, res, lock); 603 604 spin_lock(&dlm->ast_lock); 605 606 /* possible that another ast was queued while 607 * we were delivering the last one */ 608 if (!list_empty(&lock->ast_list)) { 609 mlog(0, "%s: res %.*s, AST queued while flushing last " 610 "one\n", dlm->name, res->lockname.len, 611 res->lockname.name); 612 } else 613 lock->ast_pending = 0; 614 615 /* drop the extra ref. 616 * this may drop it completely. */ 617 dlm_lock_put(lock); 618 dlm_lockres_release_ast(dlm, res); 619 } 620 621 while (!list_empty(&dlm->pending_basts)) { 622 lock = list_entry(dlm->pending_basts.next, 623 struct dlm_lock, bast_list); 624 /* get an extra ref on lock */ 625 dlm_lock_get(lock); 626 res = lock->lockres; 627 628 BUG_ON(!lock->bast_pending); 629 630 /* get the highest blocked lock, and reset */ 631 spin_lock(&lock->spinlock); 632 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE); 633 hi = lock->ml.highest_blocked; 634 lock->ml.highest_blocked = LKM_IVMODE; 635 spin_unlock(&lock->spinlock); 636 637 /* remove from list (including ref) */ 638 list_del_init(&lock->bast_list); 639 dlm_lock_put(lock); 640 spin_unlock(&dlm->ast_lock); 641 642 mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, " 643 "blocked %d, node %u\n", 644 dlm->name, res->lockname.len, res->lockname.name, 645 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 646 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 647 hi, lock->ml.node); 648 649 if (lock->ml.node != dlm->node_num) { 650 ret = dlm_send_proxy_bast(dlm, res, lock, hi); 651 if (ret < 0) 652 mlog_errno(ret); 653 } else 654 dlm_do_local_bast(dlm, res, lock, hi); 655 656 spin_lock(&dlm->ast_lock); 657 658 /* possible that another bast was queued while 659 * we were delivering the last one */ 660 if (!list_empty(&lock->bast_list)) { 661 mlog(0, "%s: res %.*s, BAST queued while flushing last " 662 "one\n", dlm->name, res->lockname.len, 663 res->lockname.name); 664 } else 665 lock->bast_pending = 0; 666 667 /* drop the extra ref. 668 * this may drop it completely. */ 669 dlm_lock_put(lock); 670 dlm_lockres_release_ast(dlm, res); 671 } 672 wake_up(&dlm->ast_wq); 673 spin_unlock(&dlm->ast_lock); 674 } 675 676 677 #define DLM_THREAD_TIMEOUT_MS (4 * 1000) 678 #define DLM_THREAD_MAX_DIRTY 100 679 680 static int dlm_thread(void *data) 681 { 682 struct dlm_lock_resource *res; 683 struct dlm_ctxt *dlm = data; 684 unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS); 685 686 mlog(0, "dlm thread running for %s...\n", dlm->name); 687 688 while (!kthread_should_stop()) { 689 int n = DLM_THREAD_MAX_DIRTY; 690 691 /* dlm_shutting_down is very point-in-time, but that 692 * doesn't matter as we'll just loop back around if we 693 * get false on the leading edge of a state 694 * transition. */ 695 dlm_run_purge_list(dlm, dlm_shutting_down(dlm)); 696 697 /* We really don't want to hold dlm->spinlock while 698 * calling dlm_shuffle_lists on each lockres that 699 * needs to have its queues adjusted and AST/BASTs 700 * run. So let's pull each entry off the dirty_list 701 * and drop dlm->spinlock ASAP. Once off the list, 702 * res->spinlock needs to be taken again to protect 703 * the queues while calling dlm_shuffle_lists. */ 704 spin_lock(&dlm->spinlock); 705 while (!list_empty(&dlm->dirty_list)) { 706 int delay = 0; 707 res = list_entry(dlm->dirty_list.next, 708 struct dlm_lock_resource, dirty); 709 710 /* peel a lockres off, remove it from the list, 711 * unset the dirty flag and drop the dlm lock */ 712 BUG_ON(!res); 713 dlm_lockres_get(res); 714 715 spin_lock(&res->spinlock); 716 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */ 717 list_del_init(&res->dirty); 718 spin_unlock(&res->spinlock); 719 spin_unlock(&dlm->spinlock); 720 /* Drop dirty_list ref */ 721 dlm_lockres_put(res); 722 723 /* lockres can be re-dirtied/re-added to the 724 * dirty_list in this gap, but that is ok */ 725 726 spin_lock(&dlm->ast_lock); 727 spin_lock(&res->spinlock); 728 if (res->owner != dlm->node_num) { 729 __dlm_print_one_lock_resource(res); 730 mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d," 731 " dirty %d\n", dlm->name, 732 !!(res->state & DLM_LOCK_RES_IN_PROGRESS), 733 !!(res->state & DLM_LOCK_RES_MIGRATING), 734 !!(res->state & DLM_LOCK_RES_RECOVERING), 735 !!(res->state & DLM_LOCK_RES_DIRTY)); 736 } 737 BUG_ON(res->owner != dlm->node_num); 738 739 /* it is now ok to move lockreses in these states 740 * to the dirty list, assuming that they will only be 741 * dirty for a short while. */ 742 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 743 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 744 DLM_LOCK_RES_RECOVERING | 745 DLM_LOCK_RES_RECOVERY_WAITING)) { 746 /* move it to the tail and keep going */ 747 res->state &= ~DLM_LOCK_RES_DIRTY; 748 spin_unlock(&res->spinlock); 749 spin_unlock(&dlm->ast_lock); 750 mlog(0, "%s: res %.*s, inprogress, delay list " 751 "shuffle, state %d\n", dlm->name, 752 res->lockname.len, res->lockname.name, 753 res->state); 754 delay = 1; 755 goto in_progress; 756 } 757 758 /* at this point the lockres is not migrating/ 759 * recovering/in-progress. we have the lockres 760 * spinlock and do NOT have the dlm lock. 761 * safe to reserve/queue asts and run the lists. */ 762 763 /* called while holding lockres lock */ 764 dlm_shuffle_lists(dlm, res); 765 res->state &= ~DLM_LOCK_RES_DIRTY; 766 spin_unlock(&res->spinlock); 767 spin_unlock(&dlm->ast_lock); 768 769 dlm_lockres_calc_usage(dlm, res); 770 771 in_progress: 772 773 spin_lock(&dlm->spinlock); 774 /* if the lock was in-progress, stick 775 * it on the back of the list */ 776 if (delay) { 777 spin_lock(&res->spinlock); 778 __dlm_dirty_lockres(dlm, res); 779 spin_unlock(&res->spinlock); 780 } 781 dlm_lockres_put(res); 782 783 /* unlikely, but we may need to give time to 784 * other tasks */ 785 if (!--n) { 786 mlog(0, "%s: Throttling dlm thread\n", 787 dlm->name); 788 break; 789 } 790 } 791 792 spin_unlock(&dlm->spinlock); 793 dlm_flush_asts(dlm); 794 795 /* yield and continue right away if there is more work to do */ 796 if (!n) { 797 cond_resched(); 798 continue; 799 } 800 801 wait_event_interruptible_timeout(dlm->dlm_thread_wq, 802 !dlm_dirty_list_empty(dlm) || 803 kthread_should_stop(), 804 timeout); 805 } 806 807 mlog(0, "quitting DLM thread\n"); 808 return 0; 809 } 810