1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmthread.c 5 * 6 * standalone DLM module 7 * 8 * Copyright (C) 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 * 25 */ 26 27 28 #include <linux/module.h> 29 #include <linux/fs.h> 30 #include <linux/types.h> 31 #include <linux/slab.h> 32 #include <linux/highmem.h> 33 #include <linux/utsname.h> 34 #include <linux/init.h> 35 #include <linux/sysctl.h> 36 #include <linux/random.h> 37 #include <linux/blkdev.h> 38 #include <linux/socket.h> 39 #include <linux/inet.h> 40 #include <linux/timer.h> 41 #include <linux/kthread.h> 42 #include <linux/delay.h> 43 44 45 #include "cluster/heartbeat.h" 46 #include "cluster/nodemanager.h" 47 #include "cluster/tcp.h" 48 49 #include "dlmapi.h" 50 #include "dlmcommon.h" 51 #include "dlmdomain.h" 52 53 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD) 54 #include "cluster/masklog.h" 55 56 static int dlm_thread(void *data); 57 static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, 58 struct dlm_lock_resource *lockres); 59 60 static void dlm_flush_asts(struct dlm_ctxt *dlm); 61 62 #define dlm_lock_is_remote(dlm, lock) ((lock)->ml.node != (dlm)->node_num) 63 64 /* will exit holding res->spinlock, but may drop in function */ 65 /* waits until flags are cleared on res->state */ 66 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags) 67 { 68 DECLARE_WAITQUEUE(wait, current); 69 70 assert_spin_locked(&res->spinlock); 71 72 add_wait_queue(&res->wq, &wait); 73 repeat: 74 set_current_state(TASK_UNINTERRUPTIBLE); 75 if (res->state & flags) { 76 spin_unlock(&res->spinlock); 77 schedule(); 78 spin_lock(&res->spinlock); 79 goto repeat; 80 } 81 remove_wait_queue(&res->wq, &wait); 82 current->state = TASK_RUNNING; 83 } 84 85 86 int __dlm_lockres_unused(struct dlm_lock_resource *res) 87 { 88 if (list_empty(&res->granted) && 89 list_empty(&res->converting) && 90 list_empty(&res->blocked) && 91 list_empty(&res->dirty)) 92 return 1; 93 return 0; 94 } 95 96 97 /* Call whenever you may have added or deleted something from one of 98 * the lockres queue's. This will figure out whether it belongs on the 99 * unused list or not and does the appropriate thing. */ 100 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 101 struct dlm_lock_resource *res) 102 { 103 mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); 104 105 assert_spin_locked(&dlm->spinlock); 106 assert_spin_locked(&res->spinlock); 107 108 if (__dlm_lockres_unused(res)){ 109 /* For now, just keep any resource we master */ 110 if (res->owner == dlm->node_num) 111 { 112 if (!list_empty(&res->purge)) { 113 mlog(0, "we master %s:%.*s, but it is on " 114 "the purge list. Removing\n", 115 dlm->name, res->lockname.len, 116 res->lockname.name); 117 list_del_init(&res->purge); 118 dlm->purge_count--; 119 } 120 return; 121 } 122 123 if (list_empty(&res->purge)) { 124 mlog(0, "putting lockres %.*s from purge list\n", 125 res->lockname.len, res->lockname.name); 126 127 res->last_used = jiffies; 128 list_add_tail(&res->purge, &dlm->purge_list); 129 dlm->purge_count++; 130 131 /* if this node is not the owner, there is 132 * no way to keep track of who the owner could be. 133 * unhash it to avoid serious problems. */ 134 if (res->owner != dlm->node_num) { 135 mlog(0, "%s:%.*s: doing immediate " 136 "purge of lockres owned by %u\n", 137 dlm->name, res->lockname.len, 138 res->lockname.name, res->owner); 139 140 dlm_purge_lockres_now(dlm, res); 141 } 142 } 143 } else if (!list_empty(&res->purge)) { 144 mlog(0, "removing lockres %.*s from purge list, " 145 "owner=%u\n", res->lockname.len, res->lockname.name, 146 res->owner); 147 148 list_del_init(&res->purge); 149 dlm->purge_count--; 150 } 151 } 152 153 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm, 154 struct dlm_lock_resource *res) 155 { 156 mlog_entry("%.*s\n", res->lockname.len, res->lockname.name); 157 spin_lock(&dlm->spinlock); 158 spin_lock(&res->spinlock); 159 160 __dlm_lockres_calc_usage(dlm, res); 161 162 spin_unlock(&res->spinlock); 163 spin_unlock(&dlm->spinlock); 164 } 165 166 /* TODO: Eventual API: Called with the dlm spinlock held, may drop it 167 * to do migration, but will re-acquire before exit. */ 168 void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres) 169 { 170 int master; 171 int ret; 172 173 spin_lock(&lockres->spinlock); 174 master = lockres->owner == dlm->node_num; 175 spin_unlock(&lockres->spinlock); 176 177 mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len, 178 lockres->lockname.name, master); 179 180 /* Non master is the easy case -- no migration required, just 181 * quit. */ 182 if (!master) 183 goto finish; 184 185 /* Wheee! Migrate lockres here! */ 186 spin_unlock(&dlm->spinlock); 187 again: 188 189 ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES); 190 if (ret == -ENOTEMPTY) { 191 mlog(ML_ERROR, "lockres %.*s still has local locks!\n", 192 lockres->lockname.len, lockres->lockname.name); 193 194 BUG(); 195 } else if (ret < 0) { 196 mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n", 197 lockres->lockname.len, lockres->lockname.name); 198 msleep(100); 199 goto again; 200 } 201 202 spin_lock(&dlm->spinlock); 203 204 finish: 205 if (!list_empty(&lockres->purge)) { 206 list_del_init(&lockres->purge); 207 dlm->purge_count--; 208 } 209 __dlm_unhash_lockres(lockres); 210 } 211 212 /* make an unused lockres go away immediately. 213 * as soon as the dlm spinlock is dropped, this lockres 214 * will not be found. kfree still happens on last put. */ 215 static void dlm_purge_lockres_now(struct dlm_ctxt *dlm, 216 struct dlm_lock_resource *lockres) 217 { 218 assert_spin_locked(&dlm->spinlock); 219 assert_spin_locked(&lockres->spinlock); 220 221 BUG_ON(!__dlm_lockres_unused(lockres)); 222 223 if (!list_empty(&lockres->purge)) { 224 list_del_init(&lockres->purge); 225 dlm->purge_count--; 226 } 227 __dlm_unhash_lockres(lockres); 228 } 229 230 static void dlm_run_purge_list(struct dlm_ctxt *dlm, 231 int purge_now) 232 { 233 unsigned int run_max, unused; 234 unsigned long purge_jiffies; 235 struct dlm_lock_resource *lockres; 236 237 spin_lock(&dlm->spinlock); 238 run_max = dlm->purge_count; 239 240 while(run_max && !list_empty(&dlm->purge_list)) { 241 run_max--; 242 243 lockres = list_entry(dlm->purge_list.next, 244 struct dlm_lock_resource, purge); 245 246 /* Status of the lockres *might* change so double 247 * check. If the lockres is unused, holding the dlm 248 * spinlock will prevent people from getting and more 249 * refs on it -- there's no need to keep the lockres 250 * spinlock. */ 251 spin_lock(&lockres->spinlock); 252 unused = __dlm_lockres_unused(lockres); 253 spin_unlock(&lockres->spinlock); 254 255 if (!unused) 256 continue; 257 258 purge_jiffies = lockres->last_used + 259 msecs_to_jiffies(DLM_PURGE_INTERVAL_MS); 260 261 /* Make sure that we want to be processing this guy at 262 * this time. */ 263 if (!purge_now && time_after(purge_jiffies, jiffies)) { 264 /* Since resources are added to the purge list 265 * in tail order, we can stop at the first 266 * unpurgable resource -- anyone added after 267 * him will have a greater last_used value */ 268 break; 269 } 270 271 list_del_init(&lockres->purge); 272 dlm->purge_count--; 273 274 /* This may drop and reacquire the dlm spinlock if it 275 * has to do migration. */ 276 mlog(0, "calling dlm_purge_lockres!\n"); 277 dlm_purge_lockres(dlm, lockres); 278 mlog(0, "DONE calling dlm_purge_lockres!\n"); 279 280 /* Avoid adding any scheduling latencies */ 281 cond_resched_lock(&dlm->spinlock); 282 } 283 284 spin_unlock(&dlm->spinlock); 285 } 286 287 static void dlm_shuffle_lists(struct dlm_ctxt *dlm, 288 struct dlm_lock_resource *res) 289 { 290 struct dlm_lock *lock, *target; 291 struct list_head *iter; 292 struct list_head *head; 293 int can_grant = 1; 294 295 //mlog(0, "res->lockname.len=%d\n", res->lockname.len); 296 //mlog(0, "res->lockname.name=%p\n", res->lockname.name); 297 //mlog(0, "shuffle res %.*s\n", res->lockname.len, 298 // res->lockname.name); 299 300 /* because this function is called with the lockres 301 * spinlock, and because we know that it is not migrating/ 302 * recovering/in-progress, it is fine to reserve asts and 303 * basts right before queueing them all throughout */ 304 assert_spin_locked(&res->spinlock); 305 BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING| 306 DLM_LOCK_RES_RECOVERING| 307 DLM_LOCK_RES_IN_PROGRESS))); 308 309 converting: 310 if (list_empty(&res->converting)) 311 goto blocked; 312 mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len, 313 res->lockname.name); 314 315 target = list_entry(res->converting.next, struct dlm_lock, list); 316 if (target->ml.convert_type == LKM_IVMODE) { 317 mlog(ML_ERROR, "%.*s: converting a lock with no " 318 "convert_type!\n", res->lockname.len, res->lockname.name); 319 BUG(); 320 } 321 head = &res->granted; 322 list_for_each(iter, head) { 323 lock = list_entry(iter, struct dlm_lock, list); 324 if (lock==target) 325 continue; 326 if (!dlm_lock_compatible(lock->ml.type, 327 target->ml.convert_type)) { 328 can_grant = 0; 329 /* queue the BAST if not already */ 330 if (lock->ml.highest_blocked == LKM_IVMODE) { 331 __dlm_lockres_reserve_ast(res); 332 dlm_queue_bast(dlm, lock); 333 } 334 /* update the highest_blocked if needed */ 335 if (lock->ml.highest_blocked < target->ml.convert_type) 336 lock->ml.highest_blocked = 337 target->ml.convert_type; 338 } 339 } 340 head = &res->converting; 341 list_for_each(iter, head) { 342 lock = list_entry(iter, struct dlm_lock, list); 343 if (lock==target) 344 continue; 345 if (!dlm_lock_compatible(lock->ml.type, 346 target->ml.convert_type)) { 347 can_grant = 0; 348 if (lock->ml.highest_blocked == LKM_IVMODE) { 349 __dlm_lockres_reserve_ast(res); 350 dlm_queue_bast(dlm, lock); 351 } 352 if (lock->ml.highest_blocked < target->ml.convert_type) 353 lock->ml.highest_blocked = 354 target->ml.convert_type; 355 } 356 } 357 358 /* we can convert the lock */ 359 if (can_grant) { 360 spin_lock(&target->spinlock); 361 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 362 363 mlog(0, "calling ast for converting lock: %.*s, have: %d, " 364 "granting: %d, node: %u\n", res->lockname.len, 365 res->lockname.name, target->ml.type, 366 target->ml.convert_type, target->ml.node); 367 368 target->ml.type = target->ml.convert_type; 369 target->ml.convert_type = LKM_IVMODE; 370 list_move_tail(&target->list, &res->granted); 371 372 BUG_ON(!target->lksb); 373 target->lksb->status = DLM_NORMAL; 374 375 spin_unlock(&target->spinlock); 376 377 __dlm_lockres_reserve_ast(res); 378 dlm_queue_ast(dlm, target); 379 /* go back and check for more */ 380 goto converting; 381 } 382 383 blocked: 384 if (list_empty(&res->blocked)) 385 goto leave; 386 target = list_entry(res->blocked.next, struct dlm_lock, list); 387 388 head = &res->granted; 389 list_for_each(iter, head) { 390 lock = list_entry(iter, struct dlm_lock, list); 391 if (lock==target) 392 continue; 393 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 394 can_grant = 0; 395 if (lock->ml.highest_blocked == LKM_IVMODE) { 396 __dlm_lockres_reserve_ast(res); 397 dlm_queue_bast(dlm, lock); 398 } 399 if (lock->ml.highest_blocked < target->ml.type) 400 lock->ml.highest_blocked = target->ml.type; 401 } 402 } 403 404 head = &res->converting; 405 list_for_each(iter, head) { 406 lock = list_entry(iter, struct dlm_lock, list); 407 if (lock==target) 408 continue; 409 if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) { 410 can_grant = 0; 411 if (lock->ml.highest_blocked == LKM_IVMODE) { 412 __dlm_lockres_reserve_ast(res); 413 dlm_queue_bast(dlm, lock); 414 } 415 if (lock->ml.highest_blocked < target->ml.type) 416 lock->ml.highest_blocked = target->ml.type; 417 } 418 } 419 420 /* we can grant the blocked lock (only 421 * possible if converting list empty) */ 422 if (can_grant) { 423 spin_lock(&target->spinlock); 424 BUG_ON(target->ml.highest_blocked != LKM_IVMODE); 425 426 mlog(0, "calling ast for blocked lock: %.*s, granting: %d, " 427 "node: %u\n", res->lockname.len, res->lockname.name, 428 target->ml.type, target->ml.node); 429 430 // target->ml.type is already correct 431 list_move_tail(&target->list, &res->granted); 432 433 BUG_ON(!target->lksb); 434 target->lksb->status = DLM_NORMAL; 435 436 spin_unlock(&target->spinlock); 437 438 __dlm_lockres_reserve_ast(res); 439 dlm_queue_ast(dlm, target); 440 /* go back and check for more */ 441 goto converting; 442 } 443 444 leave: 445 return; 446 } 447 448 /* must have NO locks when calling this with res !=NULL * */ 449 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 450 { 451 mlog_entry("dlm=%p, res=%p\n", dlm, res); 452 if (res) { 453 spin_lock(&dlm->spinlock); 454 spin_lock(&res->spinlock); 455 __dlm_dirty_lockres(dlm, res); 456 spin_unlock(&res->spinlock); 457 spin_unlock(&dlm->spinlock); 458 } 459 wake_up(&dlm->dlm_thread_wq); 460 } 461 462 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res) 463 { 464 mlog_entry("dlm=%p, res=%p\n", dlm, res); 465 466 assert_spin_locked(&dlm->spinlock); 467 assert_spin_locked(&res->spinlock); 468 469 /* don't shuffle secondary queues */ 470 if ((res->owner == dlm->node_num) && 471 !(res->state & DLM_LOCK_RES_DIRTY)) { 472 /* ref for dirty_list */ 473 dlm_lockres_get(res); 474 list_add_tail(&res->dirty, &dlm->dirty_list); 475 res->state |= DLM_LOCK_RES_DIRTY; 476 } 477 } 478 479 480 /* Launch the NM thread for the mounted volume */ 481 int dlm_launch_thread(struct dlm_ctxt *dlm) 482 { 483 mlog(0, "starting dlm thread...\n"); 484 485 dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread"); 486 if (IS_ERR(dlm->dlm_thread_task)) { 487 mlog_errno(PTR_ERR(dlm->dlm_thread_task)); 488 dlm->dlm_thread_task = NULL; 489 return -EINVAL; 490 } 491 492 return 0; 493 } 494 495 void dlm_complete_thread(struct dlm_ctxt *dlm) 496 { 497 if (dlm->dlm_thread_task) { 498 mlog(ML_KTHREAD, "waiting for dlm thread to exit\n"); 499 kthread_stop(dlm->dlm_thread_task); 500 dlm->dlm_thread_task = NULL; 501 } 502 } 503 504 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm) 505 { 506 int empty; 507 508 spin_lock(&dlm->spinlock); 509 empty = list_empty(&dlm->dirty_list); 510 spin_unlock(&dlm->spinlock); 511 512 return empty; 513 } 514 515 static void dlm_flush_asts(struct dlm_ctxt *dlm) 516 { 517 int ret; 518 struct dlm_lock *lock; 519 struct dlm_lock_resource *res; 520 u8 hi; 521 522 spin_lock(&dlm->ast_lock); 523 while (!list_empty(&dlm->pending_asts)) { 524 lock = list_entry(dlm->pending_asts.next, 525 struct dlm_lock, ast_list); 526 /* get an extra ref on lock */ 527 dlm_lock_get(lock); 528 res = lock->lockres; 529 mlog(0, "delivering an ast for this lockres\n"); 530 531 BUG_ON(!lock->ast_pending); 532 533 /* remove from list (including ref) */ 534 list_del_init(&lock->ast_list); 535 dlm_lock_put(lock); 536 spin_unlock(&dlm->ast_lock); 537 538 if (lock->ml.node != dlm->node_num) { 539 ret = dlm_do_remote_ast(dlm, res, lock); 540 if (ret < 0) 541 mlog_errno(ret); 542 } else 543 dlm_do_local_ast(dlm, res, lock); 544 545 spin_lock(&dlm->ast_lock); 546 547 /* possible that another ast was queued while 548 * we were delivering the last one */ 549 if (!list_empty(&lock->ast_list)) { 550 mlog(0, "aha another ast got queued while " 551 "we were finishing the last one. will " 552 "keep the ast_pending flag set.\n"); 553 } else 554 lock->ast_pending = 0; 555 556 /* drop the extra ref. 557 * this may drop it completely. */ 558 dlm_lock_put(lock); 559 dlm_lockres_release_ast(dlm, res); 560 } 561 562 while (!list_empty(&dlm->pending_basts)) { 563 lock = list_entry(dlm->pending_basts.next, 564 struct dlm_lock, bast_list); 565 /* get an extra ref on lock */ 566 dlm_lock_get(lock); 567 res = lock->lockres; 568 569 BUG_ON(!lock->bast_pending); 570 571 /* get the highest blocked lock, and reset */ 572 spin_lock(&lock->spinlock); 573 BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE); 574 hi = lock->ml.highest_blocked; 575 lock->ml.highest_blocked = LKM_IVMODE; 576 spin_unlock(&lock->spinlock); 577 578 /* remove from list (including ref) */ 579 list_del_init(&lock->bast_list); 580 dlm_lock_put(lock); 581 spin_unlock(&dlm->ast_lock); 582 583 mlog(0, "delivering a bast for this lockres " 584 "(blocked = %d\n", hi); 585 586 if (lock->ml.node != dlm->node_num) { 587 ret = dlm_send_proxy_bast(dlm, res, lock, hi); 588 if (ret < 0) 589 mlog_errno(ret); 590 } else 591 dlm_do_local_bast(dlm, res, lock, hi); 592 593 spin_lock(&dlm->ast_lock); 594 595 /* possible that another bast was queued while 596 * we were delivering the last one */ 597 if (!list_empty(&lock->bast_list)) { 598 mlog(0, "aha another bast got queued while " 599 "we were finishing the last one. will " 600 "keep the bast_pending flag set.\n"); 601 } else 602 lock->bast_pending = 0; 603 604 /* drop the extra ref. 605 * this may drop it completely. */ 606 dlm_lock_put(lock); 607 dlm_lockres_release_ast(dlm, res); 608 } 609 wake_up(&dlm->ast_wq); 610 spin_unlock(&dlm->ast_lock); 611 } 612 613 614 #define DLM_THREAD_TIMEOUT_MS (4 * 1000) 615 #define DLM_THREAD_MAX_DIRTY 100 616 #define DLM_THREAD_MAX_ASTS 10 617 618 static int dlm_thread(void *data) 619 { 620 struct dlm_lock_resource *res; 621 struct dlm_ctxt *dlm = data; 622 unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS); 623 624 mlog(0, "dlm thread running for %s...\n", dlm->name); 625 626 while (!kthread_should_stop()) { 627 int n = DLM_THREAD_MAX_DIRTY; 628 629 /* dlm_shutting_down is very point-in-time, but that 630 * doesn't matter as we'll just loop back around if we 631 * get false on the leading edge of a state 632 * transition. */ 633 dlm_run_purge_list(dlm, dlm_shutting_down(dlm)); 634 635 /* We really don't want to hold dlm->spinlock while 636 * calling dlm_shuffle_lists on each lockres that 637 * needs to have its queues adjusted and AST/BASTs 638 * run. So let's pull each entry off the dirty_list 639 * and drop dlm->spinlock ASAP. Once off the list, 640 * res->spinlock needs to be taken again to protect 641 * the queues while calling dlm_shuffle_lists. */ 642 spin_lock(&dlm->spinlock); 643 while (!list_empty(&dlm->dirty_list)) { 644 int delay = 0; 645 res = list_entry(dlm->dirty_list.next, 646 struct dlm_lock_resource, dirty); 647 648 /* peel a lockres off, remove it from the list, 649 * unset the dirty flag and drop the dlm lock */ 650 BUG_ON(!res); 651 dlm_lockres_get(res); 652 653 spin_lock(&res->spinlock); 654 res->state &= ~DLM_LOCK_RES_DIRTY; 655 list_del_init(&res->dirty); 656 spin_unlock(&res->spinlock); 657 spin_unlock(&dlm->spinlock); 658 /* Drop dirty_list ref */ 659 dlm_lockres_put(res); 660 661 /* lockres can be re-dirtied/re-added to the 662 * dirty_list in this gap, but that is ok */ 663 664 spin_lock(&res->spinlock); 665 if (res->owner != dlm->node_num) { 666 __dlm_print_one_lock_resource(res); 667 mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n", 668 res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no", 669 res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no", 670 res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no", 671 res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no"); 672 } 673 BUG_ON(res->owner != dlm->node_num); 674 675 /* it is now ok to move lockreses in these states 676 * to the dirty list, assuming that they will only be 677 * dirty for a short while. */ 678 if (res->state & (DLM_LOCK_RES_IN_PROGRESS | 679 DLM_LOCK_RES_MIGRATING | 680 DLM_LOCK_RES_RECOVERING)) { 681 /* move it to the tail and keep going */ 682 spin_unlock(&res->spinlock); 683 mlog(0, "delaying list shuffling for in-" 684 "progress lockres %.*s, state=%d\n", 685 res->lockname.len, res->lockname.name, 686 res->state); 687 delay = 1; 688 goto in_progress; 689 } 690 691 /* at this point the lockres is not migrating/ 692 * recovering/in-progress. we have the lockres 693 * spinlock and do NOT have the dlm lock. 694 * safe to reserve/queue asts and run the lists. */ 695 696 mlog(0, "calling dlm_shuffle_lists with dlm=%s, " 697 "res=%.*s\n", dlm->name, 698 res->lockname.len, res->lockname.name); 699 700 /* called while holding lockres lock */ 701 dlm_shuffle_lists(dlm, res); 702 spin_unlock(&res->spinlock); 703 704 dlm_lockres_calc_usage(dlm, res); 705 706 in_progress: 707 708 spin_lock(&dlm->spinlock); 709 /* if the lock was in-progress, stick 710 * it on the back of the list */ 711 if (delay) { 712 /* ref for dirty_list */ 713 dlm_lockres_get(res); 714 spin_lock(&res->spinlock); 715 list_add_tail(&res->dirty, &dlm->dirty_list); 716 res->state |= DLM_LOCK_RES_DIRTY; 717 spin_unlock(&res->spinlock); 718 } 719 dlm_lockres_put(res); 720 721 /* unlikely, but we may need to give time to 722 * other tasks */ 723 if (!--n) { 724 mlog(0, "throttling dlm_thread\n"); 725 break; 726 } 727 } 728 729 spin_unlock(&dlm->spinlock); 730 dlm_flush_asts(dlm); 731 732 /* yield and continue right away if there is more work to do */ 733 if (!n) { 734 cond_resched(); 735 continue; 736 } 737 738 wait_event_interruptible_timeout(dlm->dlm_thread_wq, 739 !dlm_dirty_list_empty(dlm) || 740 kthread_should_stop(), 741 timeout); 742 } 743 744 mlog(0, "quitting DLM thread\n"); 745 return 0; 746 } 747