1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* -*- mode: c; c-basic-offset: 8; -*- 3 * vim: noexpandtab sw=8 ts=8 sts=0: 4 * 5 * dlmthread.c 6 * 7 * standalone DLM module 8 * 9 * Copyright (C) 2004 Oracle. All rights reserved. 10 */ 11 12 13 #include <linux/module.h> 14 #include <linux/fs.h> 15 #include <linux/types.h> 16 #include <linux/highmem.h> 17 #include <linux/init.h> 18 #include <linux/sysctl.h> 19 #include <linux/random.h> 20 #include <linux/blkdev.h> 21 #include <linux/socket.h> 22 #include <linux/inet.h> 23 #include <linux/timer.h> 24 #include <linux/kthread.h> 25 #include <linux/delay.h> 26 27 28 #include "cluster/heartbeat.h" 29 #include "cluster/nodemanager.h" 30 #include "cluster/tcp.h" 31 32 #include "dlmapi.h" 33 #include "dlmcommon.h" 34 #include "dlmdomain.h" 35 36 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD) 37 #include "cluster/masklog.h" 38 39 static int dlm_thread(void *data); 40 static void dlm_flush_asts(struct dlm_ctxt *dlm); 41 42 #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 43 44 /* will exit holding res->spinlock, but may drop in function */ 45 /* waits until flags are cleared on res->state */ 46 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags) 47 { 48 DECLARE_WAITQUEUE(wait, current); 49 50 assert_spin_locked(&res->spinlock); 51 52 add_wait_queue(&res->wq, &wait); 53 repeat: 54 set_current_state(TASK_UNINTERRUPTIBLE); 55 if (res->state & flags) { 56 spin_unlock(&res->spinlock); 57 schedule(); 58 spin_lock(&res->spinlock); 59 goto repeat; 60 } 61 remove_wait_queue(&res->wq, &wait); 62 __set_current_state(TASK_RUNNING); 63 } 64 65 int __dlm_lockres_has_locks(struct dlm_lock_resource *res) 66 { 67 if (list_empty(&res->granted) && 68 list_empty(&res->converting) && 69 list_empty(&res->blocked)) 70 return 0; 71 return 1; 72 } 73 74 /* "unused": the lockres has no locks, is not on the dirty list, 75 * has no inflight locks (in the gap between mastery and acquiring 76 * the first lock), and has no bits in its refmap. 77 * truly ready to be freed. */ 78 int __dlm_lockres_unused(struct dlm_lock_resource *res) 79 { 80 int bit; 81 82 assert_spin_locked(&res->spinlock); 83 84 if (__dlm_lockres_has_locks(res)) 85 return 0; 86 87 /* Locks are in the process of being created */ 88 if (res->inflight_locks) 89 return 0; 90 91 if (!list_empty(&res->dirty) || res->state & DLM_LOCK_RES_DIRTY) 92 return 0; 93 94 if (res->state & (DLM_LOCK_RES_RECOVERING| 95 DLM_LOCK_RES_RECOVERY_WAITING)) 96 return 0; 97 98 /* Another node has this resource with this node as the master */ 99 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0); 100 if (bit < O2NM_MAX_NODES) 101 return 0; 102 103 return 1; 104 } 105 106 107 /* Call whenever you may have added or deleted something from one of 108 * the lockres queue's. This will figure out whether it belongs on the 109 * unused list or not and does the appropriate thing. */ 110 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 111 struct dlm_lock_resource *res) 112 { 113 assert_spin_locked(&dlm->spinlock); 114 assert_spin_locked(&res->spinlock); 115 116 if (__dlm_lockres_unused(res)){ 117 if (list_empty(&res->purge)) { 118 mlog(0, "%s: Adding res %.*s to purge list\n", 119 dlm->name, res->lockname.len, res->lockname.name); 120 121 res->last_used = jiffies; 122 dlm_lockres_get(res); 123 list_add_tail(&res->purge, &dlm->purge_list); 124 dlm->purge_count++; 125 } 126 } else if (!list_empty(&res->purge)) { 127 mlog(0, "%s: Removing res %.*s from purge list\n", 128 dlm->name, res->lockname.len, res->lockname.name); 129 130 list_del_init(&res->purge); 131 dlm_lockres_put(res); 132 dlm->purge_count--; 133 } 134 } 135 136 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 137 struct dlm_lock_resource *res) 138 { 139 spin_lock(&dlm->spinlock); 140 spin_lock(&res->spinlock); 141 142 __dlm_lockres_calc_usage(dlm, res); 143 144 spin_unlock(&res->spinlock); 145 spin_unlock(&dlm->spinlock); 146 } 147 148 /* 149 * Do the real purge work: 150 * unhash the lockres, and 151 * clear flag DLM_LOCK_RES_DROPPING_REF. 152 * It requires dlm and lockres spinlock to be taken. 153 */ 154 void __dlm_do_purge_lockres(struct dlm_ctxt *dlm, 155 struct dlm_lock_resource *res) 156 { 157 assert_spin_locked(&dlm->spinlock); 158 assert_spin_locked(&res->spinlock); 159 160 if (!list_empty(&res->purge)) { 161 mlog(0, "%s: Removing res %.*s from purgelist\n", 162 dlm->name, res->lockname.len, res->lockname.name); 163 list_del_init(&res->purge); 164 dlm_lockres_put(res); 165 dlm->purge_count--; 166 } 167 168 if (!__dlm_lockres_unused(res)) { 169 mlog(ML_ERROR, "%s: res %.*s in use after deref\n", 170 dlm->name, res->lockname.len, res->lockname.name); 171 __dlm_print_one_lock_resource(res); 172 BUG(); 173 } 174 175 __dlm_unhash_lockres(dlm, res); 176 177 spin_lock(&dlm->track_lock); 178 if (!list_empty(&res->tracking)) 179 list_del_init(&res->tracking); 180 else { 181 mlog(ML_ERROR, "%s: Resource %.*s not on the Tracking list\n", 182 dlm->name, res->lockname.len, res->lockname.name); 183 __dlm_print_one_lock_resource(res); 184 } 185 spin_unlock(&dlm->track_lock); 186 187 /* 188 * lockres is not in the hash now. drop the flag and wake up 189 * any processes waiting in dlm_get_lock_resource. 190 */ 191 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 192 } 193 194 static void dlm_purge_lockres(struct dlm_ctxt *dlm, 195 struct dlm_lock_resource *res) 196 { 197 int master; 198 int ret = 0; 199 200 assert_spin_locked(&dlm->spinlock); 201 assert_spin_locked(&res->spinlock); 202 203 master = (res->owner == dlm->node_num); 204 205 mlog(0, "%s: Purging res %.*s, master %d\n", dlm->name, 206 res->lockname.len, res->lockname.name, master); 207 208 if (!master) { 209 if (res->state & DLM_LOCK_RES_DROPPING_REF) { 210 mlog(ML_NOTICE, "%s: res %.*s already in DLM_LOCK_RES_DROPPING_REF state\n", 211 dlm->name, res->lockname.len, res->lockname.name); 212 spin_unlock(&res->spinlock); 213 return; 214 } 215 216 res->state |= DLM_LOCK_RES_DROPPING_REF; 217 /* drop spinlock... retake below */ 218 spin_unlock(&res->spinlock); 219 spin_unlock(&dlm->spinlock); 220 221 spin_lock(&res->spinlock); 222 /* This ensures that clear refmap is sent after the set */ 223 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG); 224 spin_unlock(&res->spinlock); 225 226 /* clear our bit from the master's refmap, ignore errors */ 227 ret = dlm_drop_lockres_ref(dlm, res); 228 if (ret < 0) { 229 if (!dlm_is_host_down(ret)) 230 BUG(); 231 } 232 spin_lock(&dlm->spinlock); 233 spin_lock(&res->spinlock); 234 } 235 236 if (!list_empty(&res->purge)) { 237 mlog(0, "%s: Removing res %.*s from purgelist, master %d\n", 238 dlm->name, res->lockname.len, res->lockname.name, master); 239 list_del_init(&res->purge); 240 dlm_lockres_put(res); 241 dlm->purge_count--; 242 } 243 244 if (!master && ret == DLM_DEREF_RESPONSE_INPROG) { 245 mlog(0, "%s: deref %.*s in progress\n", 246 dlm->name, res->lockname.len, res->lockname.name); 247 spin_unlock(&res->spinlock); 248 return; 249 } 250 251 if (!__dlm_lockres_unused(res)) { 252 mlog(ML_ERROR, "%s: res %.*s in use after deref\n", 253 dlm->name, res->lockname.len, res->lockname.name); 254 __dlm_print_one_lock_resource(res); 255 BUG(); 256 } 257 258 __dlm_unhash_lockres(dlm, res); 259 260 spin_lock(&dlm->track_lock); 261 if (!list_empty(&res->tracking)) 262 list_del_init(&res->tracking); 263 else { 264 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n", 265 res->lockname.len, res->lockname.name); 266 __dlm_print_one_lock_resource(res); 267 } 268 spin_unlock(&dlm->track_lock); 269 270 /* lockres is not in the hash now. drop the flag and wake up 271 * any processes waiting in dlm_get_lock_resource. */ 272 if (!master) { 273 res->state &= ~DLM_LOCK_RES_DROPPING_REF; 274 spin_unlock(&res->spinlock); 275 wake_up(&res->wq); 276 } else 277 spin_unlock(&res->spinlock); 278 } 279 280 static void dlm_run_purge_list(struct dlm_ctxt *dlm, 281 int purge_now) 282 { 283 unsigned int run_max, unused; 284 unsigned long purge_jiffies; 285 struct dlm_lock_resource *lockres; 286 287 spin_lock(&dlm->spinlock); 288 run_max = dlm->purge_count; 289 290 while(run_max && !list_empty(&dlm->purge_list)) { 291 run_max--; 292 293 lockres = list_entry(dlm->purge_list.next, 294 struct dlm_lock_resource, purge); 295 296 spin_lock(&lockres->spinlock); 297 298 purge_jiffies = lockres->last_used + 299 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); 300 301 /* Make sure that we want to be processing this guy at 302 * this time. */ 303 if (!purge_now && time_after(purge_jiffies, jiffies)) { 304 /* Since resources are added to the purge list 305 * in tail order, we can stop at the first 306 * unpurgable resource -- anyone added after 307 * him will have a greater last_used value */ 308 spin_unlock(&lockres->spinlock); 309 break; 310 } 311 312 /* Status of the lockres *might* change so double 313 * check. If the lockres is unused, holding the dlm 314 * spinlock will prevent people from getting and more 315 * refs on it. */ 316 unused = __dlm_lockres_unused(lockres); 317 if (!unused || 318 (lockres->state & DLM_LOCK_RES_MIGRATING) || 319 (lockres->inflight_assert_workers != 0)) { 320 mlog(0, "%s: res %.*s is in use or being remastered, " 321 "used %d, state %d, assert master workers %u\n", 322 dlm->name, lockres->lockname.len, 323 lockres->lockname.name, 324 !unused, lockres->state, 325 lockres->inflight_assert_workers); 326 list_move_tail(&lockres->purge, &dlm->purge_list); 327 spin_unlock(&lockres->spinlock); 328 continue; 329 } 330 331 dlm_lockres_get(lockres); 332 333 dlm_purge_lockres(dlm, lockres); 334 335 dlm_lockres_put(lockres); 336 337 /* Avoid adding any scheduling latencies */ 338 cond_resched_lock(&dlm->spinlock); 339 } 340 341 spin_unlock(&dlm->spinlock); 342 } 343 344 static void dlm_shuffle_lists(struct dlm_ctxt *dlm, 345 struct dlm_lock_resource *res) 346 { 347 struct dlm_lock *lock, *target; 348 int can_grant = 1; 349 350 /* 351 * Because this function is called with the lockres 352 * spinlock, and because we know that it is not migrating/ 353 * recovering/in-progress, it is fine to reserve asts and 354 * basts right before queueing them all throughout 355 */ 356 assert_spin_locked(&dlm->ast_lock); 357 assert_spin_locked(&res->spinlock); 358 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING| 359 DLM_LOCK_RES_RECOVERING| 360 DLM_LOCK_RES_IN_PROGRESS))); 361 362 converting: 363 if (list_empty(&res->converting)) 364 goto blocked; 365 mlog(0, "%s: res %.*s has locks on the convert queue\n", dlm->name, 366 res->lockname.len, res->lockname.name); 367 368 target = list_entry(res->converting.next, struct dlm_lock, list); 369 if (target->ml.convert_type == LKM_IVMODE) { 370 mlog(ML_ERROR, "%s: res %.*s converting lock to invalid mode\n", 371 dlm->name, res->lockname.len, res->lockname.name); 372 BUG(); 373 } 374 list_for_each_entry(lock, &res->granted, list) { 375 if (lock==target) 376 continue; 377 if (!dlm_lock_compatible(lock->ml.type, 378 target->ml.convert_type)) { 379 can_grant = 0; 380 /* queue the BAST if not already */ 381 if (lock->ml.highest_blocked == LKM_IVMODE) { 382 __dlm_lockres_reserve_ast(res); 383 __dlm_queue_bast(dlm, lock); 384 } 385 /* update the highest_blocked if needed */ 386 if (lock->ml.highest_blocked < target->ml.convert_type) 387 lock->ml.highest_blocked = 388 target->ml.convert_type; 389 } 390 } 391 392 list_for_each_entry(lock, &res->converting, list) { 393 if (lock==target) 394 continue; 395 if (!dlm_lock_compatible(lock->ml.type, 396 target->ml.convert_type)) { 397 can_grant = 0; 398 if (lock->ml.highest_blocked == LKM_IVMODE) { 399 __dlm_lockres_reserve_ast(res); 400 __dlm_queue_bast(dlm, lock); 401 } 402 if (lock->ml.highest_blocked < target->ml.convert_type) 403 lock->ml.highest_blocked = 404 target->ml.convert_type; 405 } 406 } 407 408 /* we can convert the lock */ 409 if (can_grant) { 410 spin_lock(&target->spinlock); 411 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 412 413 mlog(0, "%s: res %.*s, AST for Converting lock %u:%llu, type " 414 "%d => %d, node %u\n", dlm->name, res->lockname.len, 415 res->lockname.name, 416 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 417 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 418 target->ml.type, 419 target->ml.convert_type, target->ml.node); 420 421 target->ml.type = target->ml.convert_type; 422 target->ml.convert_type = LKM_IVMODE; 423 list_move_tail(&target->list, &res->granted); 424 425 BUG_ON(!target->lksb); 426 target->lksb->status = DLM_NORMAL; 427 428 spin_unlock(&target->spinlock); 429 430 __dlm_lockres_reserve_ast(res); 431 __dlm_queue_ast(dlm, target); 432 /* go back and check for more */ 433 goto converting; 434 } 435 436 blocked: 437 if (list_empty(&res->blocked)) 438 goto leave; 439 target = list_entry(res->blocked.next, struct dlm_lock, list); 440 441 list_for_each_entry(lock, &res->granted, list) { 442 if (lock==target) 443 continue; 444 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 445 can_grant = 0; 446 if (lock->ml.highest_blocked == LKM_IVMODE) { 447 __dlm_lockres_reserve_ast(res); 448 __dlm_queue_bast(dlm, lock); 449 } 450 if (lock->ml.highest_blocked < target->ml.type) 451 lock->ml.highest_blocked = target->ml.type; 452 } 453 } 454 455 list_for_each_entry(lock, &res->converting, list) { 456 if (lock==target) 457 continue; 458 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 459 can_grant = 0; 460 if (lock->ml.highest_blocked == LKM_IVMODE) { 461 __dlm_lockres_reserve_ast(res); 462 __dlm_queue_bast(dlm, lock); 463 } 464 if (lock->ml.highest_blocked < target->ml.type) 465 lock->ml.highest_blocked = target->ml.type; 466 } 467 } 468 469 /* we can grant the blocked lock (only 470 * possible if converting list empty) */ 471 if (can_grant) { 472 spin_lock(&target->spinlock); 473 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 474 475 mlog(0, "%s: res %.*s, AST for Blocked lock %u:%llu, type %d, " 476 "node %u\n", dlm->name, res->lockname.len, 477 res->lockname.name, 478 dlm_get_lock_cookie_node(be64_to_cpu(target->ml.cookie)), 479 dlm_get_lock_cookie_seq(be64_to_cpu(target->ml.cookie)), 480 target->ml.type, target->ml.node); 481 482 /* target->ml.type is already correct */ 483 list_move_tail(&target->list, &res->granted); 484 485 BUG_ON(!target->lksb); 486 target->lksb->status = DLM_NORMAL; 487 488 spin_unlock(&target->spinlock); 489 490 __dlm_lockres_reserve_ast(res); 491 __dlm_queue_ast(dlm, target); 492 /* go back and check for more */ 493 goto converting; 494 } 495 496 leave: 497 return; 498 } 499 500 /* must have NO locks when calling this with res !=NULL * */ 501 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 502 { 503 if (res) { 504 spin_lock(&dlm->spinlock); 505 spin_lock(&res->spinlock); 506 __dlm_dirty_lockres(dlm, res); 507 spin_unlock(&res->spinlock); 508 spin_unlock(&dlm->spinlock); 509 } 510 wake_up(&dlm->dlm_thread_wq); 511 } 512 513 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 514 { 515 assert_spin_locked(&dlm->spinlock); 516 assert_spin_locked(&res->spinlock); 517 518 /* don't shuffle secondary queues */ 519 if (res->owner == dlm->node_num) { 520 if (res->state & (DLM_LOCK_RES_MIGRATING | 521 DLM_LOCK_RES_BLOCK_DIRTY)) 522 return; 523 524 if (list_empty(&res->dirty)) { 525 /* ref for dirty_list */ 526 dlm_lockres_get(res); 527 list_add_tail(&res->dirty, &dlm->dirty_list); 528 res->state |= DLM_LOCK_RES_DIRTY; 529 } 530 } 531 532 mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len, 533 res->lockname.name); 534 } 535 536 537 /* Launch the NM thread for the mounted volume */ 538 int dlm_launch_thread(struct dlm_ctxt *dlm) 539 { 540 mlog(0, "Starting dlm_thread...\n"); 541 542 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm-%s", 543 dlm->name); 544 if (IS_ERR(dlm->dlm_thread_task)) { 545 mlog_errno(PTR_ERR(dlm->dlm_thread_task)); 546 dlm->dlm_thread_task = NULL; 547 return -EINVAL; 548 } 549 550 return 0; 551 } 552 553 void dlm_complete_thread(struct dlm_ctxt *dlm) 554 { 555 if (dlm->dlm_thread_task) { 556 mlog(ML_KTHREAD, "Waiting for dlm thread to exit\n"); 557 kthread_stop(dlm->dlm_thread_task); 558 dlm->dlm_thread_task = NULL; 559 } 560 } 561 562 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm) 563 { 564 int empty; 565 566 spin_lock(&dlm->spinlock); 567 empty = list_empty(&dlm->dirty_list); 568 spin_unlock(&dlm->spinlock); 569 570 return empty; 571 } 572 573 static void dlm_flush_asts(struct dlm_ctxt *dlm) 574 { 575 int ret; 576 struct dlm_lock *lock; 577 struct dlm_lock_resource *res; 578 u8 hi; 579 580 spin_lock(&dlm->ast_lock); 581 while (!list_empty(&dlm->pending_asts)) { 582 lock = list_entry(dlm->pending_asts.next, 583 struct dlm_lock, ast_list); 584 /* get an extra ref on lock */ 585 dlm_lock_get(lock); 586 res = lock->lockres; 587 mlog(0, "%s: res %.*s, Flush AST for lock %u:%llu, type %d, " 588 "node %u\n", dlm->name, res->lockname.len, 589 res->lockname.name, 590 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 591 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 592 lock->ml.type, lock->ml.node); 593 594 BUG_ON(!lock->ast_pending); 595 596 /* remove from list (including ref) */ 597 list_del_init(&lock->ast_list); 598 dlm_lock_put(lock); 599 spin_unlock(&dlm->ast_lock); 600 601 if (lock->ml.node != dlm->node_num) { 602 ret = dlm_do_remote_ast(dlm, res, lock); 603 if (ret < 0) 604 mlog_errno(ret); 605 } else 606 dlm_do_local_ast(dlm, res, lock); 607 608 spin_lock(&dlm->ast_lock); 609 610 /* possible that another ast was queued while 611 * we were delivering the last one */ 612 if (!list_empty(&lock->ast_list)) { 613 mlog(0, "%s: res %.*s, AST queued while flushing last " 614 "one\n", dlm->name, res->lockname.len, 615 res->lockname.name); 616 } else 617 lock->ast_pending = 0; 618 619 /* drop the extra ref. 620 * this may drop it completely. */ 621 dlm_lock_put(lock); 622 dlm_lockres_release_ast(dlm, res); 623 } 624 625 while (!list_empty(&dlm->pending_basts)) { 626 lock = list_entry(dlm->pending_basts.next, 627 struct dlm_lock, bast_list); 628 /* get an extra ref on lock */ 629 dlm_lock_get(lock); 630 res = lock->lockres; 631 632 BUG_ON(!lock->bast_pending); 633 634 /* get the highest blocked lock, and reset */ 635 spin_lock(&lock->spinlock); 636 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE); 637 hi = lock->ml.highest_blocked; 638 lock->ml.highest_blocked = LKM_IVMODE; 639 spin_unlock(&lock->spinlock); 640 641 /* remove from list (including ref) */ 642 list_del_init(&lock->bast_list); 643 dlm_lock_put(lock); 644 spin_unlock(&dlm->ast_lock); 645 646 mlog(0, "%s: res %.*s, Flush BAST for lock %u:%llu, " 647 "blocked %d, node %u\n", 648 dlm->name, res->lockname.len, res->lockname.name, 649 dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)), 650 dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)), 651 hi, lock->ml.node); 652 653 if (lock->ml.node != dlm->node_num) { 654 ret = dlm_send_proxy_bast(dlm, res, lock, hi); 655 if (ret < 0) 656 mlog_errno(ret); 657 } else 658 dlm_do_local_bast(dlm, res, lock, hi); 659 660 spin_lock(&dlm->ast_lock); 661 662 /* possible that another bast was queued while 663 * we were delivering the last one */ 664 if (!list_empty(&lock->bast_list)) { 665 mlog(0, "%s: res %.*s, BAST queued while flushing last " 666 "one\n", dlm->name, res->lockname.len, 667 res->lockname.name); 668 } else 669 lock->bast_pending = 0; 670 671 /* drop the extra ref. 672 * this may drop it completely. */ 673 dlm_lock_put(lock); 674 dlm_lockres_release_ast(dlm, res); 675 } 676 wake_up(&dlm->ast_wq); 677 spin_unlock(&dlm->ast_lock); 678 } 679 680 681 #define DLM_THREAD_TIMEOUT_MS (4 * 1000) 682 #define DLM_THREAD_MAX_DIRTY 100 683 #define DLM_THREAD_MAX_ASTS 10 684 685 static int dlm_thread(void *data) 686 { 687 struct dlm_lock_resource *res; 688 struct dlm_ctxt *dlm = data; 689 unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS); 690 691 mlog(0, "dlm thread running for %s...\n", dlm->name); 692 693 while (!kthread_should_stop()) { 694 int n = DLM_THREAD_MAX_DIRTY; 695 696 /* dlm_shutting_down is very point-in-time, but that 697 * doesn't matter as we'll just loop back around if we 698 * get false on the leading edge of a state 699 * transition. */ 700 dlm_run_purge_list(dlm, dlm_shutting_down(dlm)); 701 702 /* We really don't want to hold dlm->spinlock while 703 * calling dlm_shuffle_lists on each lockres that 704 * needs to have its queues adjusted and AST/BASTs 705 * run. So let's pull each entry off the dirty_list 706 * and drop dlm->spinlock ASAP. Once off the list, 707 * res->spinlock needs to be taken again to protect 708 * the queues while calling dlm_shuffle_lists. */ 709 spin_lock(&dlm->spinlock); 710 while (!list_empty(&dlm->dirty_list)) { 711 int delay = 0; 712 res = list_entry(dlm->dirty_list.next, 713 struct dlm_lock_resource, dirty); 714 715 /* peel a lockres off, remove it from the list, 716 * unset the dirty flag and drop the dlm lock */ 717 BUG_ON(!res); 718 dlm_lockres_get(res); 719 720 spin_lock(&res->spinlock); 721 /* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */ 722 list_del_init(&res->dirty); 723 spin_unlock(&res->spinlock); 724 spin_unlock(&dlm->spinlock); 725 /* Drop dirty_list ref */ 726 dlm_lockres_put(res); 727 728 /* lockres can be re-dirtied/re-added to the 729 * dirty_list in this gap, but that is ok */ 730 731 spin_lock(&dlm->ast_lock); 732 spin_lock(&res->spinlock); 733 if (res->owner != dlm->node_num) { 734 __dlm_print_one_lock_resource(res); 735 mlog(ML_ERROR, "%s: inprog %d, mig %d, reco %d," 736 " dirty %d\n", dlm->name, 737 !!(res->state & DLM_LOCK_RES_IN_PROGRESS), 738 !!(res->state & DLM_LOCK_RES_MIGRATING), 739 !!(res->state & DLM_LOCK_RES_RECOVERING), 740 !!(res->state & DLM_LOCK_RES_DIRTY)); 741 } 742 BUG_ON(res->owner != dlm->node_num); 743 744 /* it is now ok to move lockreses in these states 745 * to the dirty list, assuming that they will only be 746 * dirty for a short while. */ 747 BUG_ON(res->state & DLM_LOCK_RES_MIGRATING); 748 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 749 DLM_LOCK_RES_RECOVERING | 750 DLM_LOCK_RES_RECOVERY_WAITING)) { 751 /* move it to the tail and keep going */ 752 res->state &= ~DLM_LOCK_RES_DIRTY; 753 spin_unlock(&res->spinlock); 754 spin_unlock(&dlm->ast_lock); 755 mlog(0, "%s: res %.*s, inprogress, delay list " 756 "shuffle, state %d\n", dlm->name, 757 res->lockname.len, res->lockname.name, 758 res->state); 759 delay = 1; 760 goto in_progress; 761 } 762 763 /* at this point the lockres is not migrating/ 764 * recovering/in-progress. we have the lockres 765 * spinlock and do NOT have the dlm lock. 766 * safe to reserve/queue asts and run the lists. */ 767 768 /* called while holding lockres lock */ 769 dlm_shuffle_lists(dlm, res); 770 res->state &= ~DLM_LOCK_RES_DIRTY; 771 spin_unlock(&res->spinlock); 772 spin_unlock(&dlm->ast_lock); 773 774 dlm_lockres_calc_usage(dlm, res); 775 776 in_progress: 777 778 spin_lock(&dlm->spinlock); 779 /* if the lock was in-progress, stick 780 * it on the back of the list */ 781 if (delay) { 782 spin_lock(&res->spinlock); 783 __dlm_dirty_lockres(dlm, res); 784 spin_unlock(&res->spinlock); 785 } 786 dlm_lockres_put(res); 787 788 /* unlikely, but we may need to give time to 789 * other tasks */ 790 if (!--n) { 791 mlog(0, "%s: Throttling dlm thread\n", 792 dlm->name); 793 break; 794 } 795 } 796 797 spin_unlock(&dlm->spinlock); 798 dlm_flush_asts(dlm); 799 800 /* yield and continue right away if there is more work to do */ 801 if (!n) { 802 cond_resched(); 803 continue; 804 } 805 806 wait_event_interruptible_timeout(dlm->dlm_thread_wq, 807 !dlm_dirty_list_empty(dlm) || 808 kthread_should_stop(), 809 timeout); 810 } 811 812 mlog(0, "quitting DLM thread\n"); 813 return 0; 814 } 815