xref: /linux/fs/ocfs2/dlm/dlmthread.c (revision 14b42963f64b98ab61fa9723c03d71aa5ef4f862)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmthread.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/timer.h>
41 #include <linux/kthread.h>
42 #include <linux/delay.h>
43 
44 
45 #include "cluster/heartbeat.h"
46 #include "cluster/nodemanager.h"
47 #include "cluster/tcp.h"
48 
49 #include "dlmapi.h"
50 #include "dlmcommon.h"
51 #include "dlmdomain.h"
52 
53 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
54 #include "cluster/masklog.h"
55 
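/*
 * Overview: dlm_thread is the per-domain worker kthread.  Each pass it
 * purges unused lock resources that have aged out, re-shuffles the
 * grant/convert/blocked queues of every dirty lockres, and then flushes
 * any ASTs and BASTs that the shuffling queued up.
 */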
56 static int dlm_thread(void *data);
57 static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
58 				  struct dlm_lock_resource *lockres);
59 
60 static void dlm_flush_asts(struct dlm_ctxt *dlm);
61 
62 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
63 
64 /* will exit holding res->spinlock, but may drop it while sleeping */
65 /* waits until flags are cleared on res->state */
66 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
67 {
68 	DECLARE_WAITQUEUE(wait, current);
69 
70 	assert_spin_locked(&res->spinlock);
71 
72 	add_wait_queue(&res->wq, &wait);
73 repeat:
74 	set_current_state(TASK_UNINTERRUPTIBLE);
75 	if (res->state & flags) {
76 		spin_unlock(&res->spinlock);
77 		schedule();
78 		spin_lock(&res->spinlock);
79 		goto repeat;
80 	}
81 	remove_wait_queue(&res->wq, &wait);
82 	__set_current_state(TASK_RUNNING);
83 }
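/*
 * Illustrative caller pattern (a sketch, not taken from this file): the
 * caller already holds res->spinlock and waits for any transient state
 * bits to clear before proceeding, e.g.:
 *
 *	spin_lock(&res->spinlock);
 *	__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_IN_PROGRESS);
 *	... operate on res, still holding res->spinlock ...
 *	spin_unlock(&res->spinlock);
 */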
84 
85 
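/* A lockres with nothing on its granted, converting, blocked or dirty
 * queues is "unused" and is a candidate for the purge list below. */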
86 int __dlm_lockres_unused(struct dlm_lock_resource *res)
87 {
88 	if (list_empty(&res->granted) &&
89 	    list_empty(&res->converting) &&
90 	    list_empty(&res->blocked) &&
91 	    list_empty(&res->dirty))
92 		return 1;
93 	return 0;
94 }
95 
96 
97 /* Call whenever you may have added or deleted something from one of
98  * the lockres queues. This will figure out whether it belongs on the
99  * purge list or not and does the appropriate thing. */
100 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
101 			      struct dlm_lock_resource *res)
102 {
103 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
104 
105 	assert_spin_locked(&dlm->spinlock);
106 	assert_spin_locked(&res->spinlock);
107 
108 	if (__dlm_lockres_unused(res)){
109 		/* For now, just keep any resource we master */
110 		if (res->owner == dlm->node_num)
111 		{
112 			if (!list_empty(&res->purge)) {
113 				mlog(0, "we master %s:%.*s, but it is on "
114 				     "the purge list.  Removing\n",
115 				     dlm->name, res->lockname.len,
116 				     res->lockname.name);
117 				list_del_init(&res->purge);
118 				dlm->purge_count--;
119 			}
120 			return;
121 		}
122 
123 		if (list_empty(&res->purge)) {
124 			mlog(0, "putting lockres %.*s on purge list\n",
125 			     res->lockname.len, res->lockname.name);
126 
127 			res->last_used = jiffies;
128 			list_add_tail(&res->purge, &dlm->purge_list);
129 			dlm->purge_count++;
130 
131 			/* if this node is not the owner, there is
132 			 * no way to keep track of who the owner could be.
133 			 * unhash it to avoid serious problems. */
134 			if (res->owner != dlm->node_num) {
135 				mlog(0, "%s:%.*s: doing immediate "
136 				     "purge of lockres owned by %u\n",
137 				     dlm->name, res->lockname.len,
138 				     res->lockname.name, res->owner);
139 
140 				dlm_purge_lockres_now(dlm, res);
141 			}
142 		}
143 	} else if (!list_empty(&res->purge)) {
144 		mlog(0, "removing lockres %.*s from purge list, "
145 		     "owner=%u\n", res->lockname.len, res->lockname.name,
146 		     res->owner);
147 
148 		list_del_init(&res->purge);
149 		dlm->purge_count--;
150 	}
151 }
152 
153 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
154 			    struct dlm_lock_resource *res)
155 {
156 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
157 	spin_lock(&dlm->spinlock);
158 	spin_lock(&res->spinlock);
159 
160 	__dlm_lockres_calc_usage(dlm, res);
161 
162 	spin_unlock(&res->spinlock);
163 	spin_unlock(&dlm->spinlock);
164 }
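/*
 * Hypothetical call site (for illustration only; the real callers live
 * in the lock, unlock and convert paths outside this file): whenever a
 * lock is added to or removed from one of the lockres queues, the
 * purge-list membership is recomputed afterwards:
 *
 *	list_del_init(&lock->list);
 *	dlm_lockres_calc_usage(dlm, res);
 */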
165 
166 /* TODO: Eventual API: Called with the dlm spinlock held, may drop it
167  * to do migration, but will re-acquire before exit. */
168 void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
169 {
170 	int master;
171 	int ret;
172 
173 	spin_lock(&lockres->spinlock);
174 	master = lockres->owner == dlm->node_num;
175 	spin_unlock(&lockres->spinlock);
176 
177 	mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
178 	     lockres->lockname.name, master);
179 
180 	/* Non master is the easy case -- no migration required, just
181 	 * quit. */
182 	if (!master)
183 		goto finish;
184 
185 	/* Wheee! Migrate lockres here! */
186 	spin_unlock(&dlm->spinlock);
187 again:
188 
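	/* Passing O2NM_MAX_NODES as the target presumably tells
	 * dlm_migrate_lockres() to pick any other live node itself
	 * (an assumption about its API, which lives elsewhere). */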
189 	ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
190 	if (ret == -ENOTEMPTY) {
191 		mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
192 		     lockres->lockname.len, lockres->lockname.name);
193 
194 		BUG();
195 	} else if (ret < 0) {
196 		mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
197 		     lockres->lockname.len, lockres->lockname.name);
198 		msleep(100);
199 		goto again;
200 	}
201 
202 	spin_lock(&dlm->spinlock);
203 
204 finish:
205 	if (!list_empty(&lockres->purge)) {
206 		list_del_init(&lockres->purge);
207 		dlm->purge_count--;
208 	}
209 	__dlm_unhash_lockres(lockres);
210 }
211 
212 /* make an unused lockres go away immediately.
213  * as soon as the dlm spinlock is dropped, this lockres
214  * will not be found. kfree still happens on last put. */
215 static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
216 				  struct dlm_lock_resource *lockres)
217 {
218 	assert_spin_locked(&dlm->spinlock);
219 	assert_spin_locked(&lockres->spinlock);
220 
221 	BUG_ON(!__dlm_lockres_unused(lockres));
222 
223 	if (!list_empty(&lockres->purge)) {
224 		list_del_init(&lockres->purge);
225 		dlm->purge_count--;
226 	}
227 	__dlm_unhash_lockres(lockres);
228 }
229 
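/*
 * Walk dlm->purge_list and purge every unused lockres that has aged
 * past DLM_PURGE_INTERVAL_MS since last_used; with purge_now set (as at
 * shutdown) the age check is skipped.  Because resources are appended
 * in last_used order, the scan can stop at the first entry that is
 * still too young.
 */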
230 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
231 			       int purge_now)
232 {
233 	unsigned int run_max, unused;
234 	unsigned long purge_jiffies;
235 	struct dlm_lock_resource *lockres;
236 
237 	spin_lock(&dlm->spinlock);
238 	run_max = dlm->purge_count;
239 
240 	while(run_max && !list_empty(&dlm->purge_list)) {
241 		run_max--;
242 
243 		lockres = list_entry(dlm->purge_list.next,
244 				     struct dlm_lock_resource, purge);
245 
246 		/* Status of the lockres *might* change so double
247 		 * check. If the lockres is unused, holding the dlm
248 		 * spinlock will prevent people from getting any more
249 		 * refs on it -- there's no need to keep the lockres
250 		 * spinlock. */
251 		spin_lock(&lockres->spinlock);
252 		unused = __dlm_lockres_unused(lockres);
253 		spin_unlock(&lockres->spinlock);
254 
255 		if (!unused)
256 			continue;
257 
258 		purge_jiffies = lockres->last_used +
259 			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
260 
261 		/* Make sure this lockres has aged enough that we
262 		 * actually want to purge it now. */
263 		if (!purge_now && time_after(purge_jiffies, jiffies)) {
264 			/* Since resources are added to the purge list
265 			 * in tail order, we can stop at the first
266 			 * unpurgeable resource -- anything added after
267 			 * it will have a greater last_used value */
268 			break;
269 		}
270 
271 		list_del_init(&lockres->purge);
272 		dlm->purge_count--;
273 
274 		/* This may drop and reacquire the dlm spinlock if it
275 		 * has to do migration. */
276 		mlog(0, "calling dlm_purge_lockres!\n");
277 		dlm_purge_lockres(dlm, lockres);
278 		mlog(0, "DONE calling dlm_purge_lockres!\n");
279 
280 		/* Avoid adding any scheduling latencies */
281 		cond_resched_lock(&dlm->spinlock);
282 	}
283 
284 	spin_unlock(&dlm->spinlock);
285 }
286 
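/*
 * Core grant logic.  Try to satisfy the lock at the head of the
 * converting queue first, then the head of the blocked queue.  A
 * convert or grant only proceeds if the requested mode is compatible
 * with every other lock on the granted and converting queues; each
 * incompatible holder gets a BAST queued and its highest_blocked level
 * raised.  For example, a PR->EX convert cannot be granted while some
 * other lock still holds PR, so that holder is sent a BAST for EX.
 */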
287 static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
288 			      struct dlm_lock_resource *res)
289 {
290 	struct dlm_lock *lock, *target;
291 	struct list_head *iter;
292 	struct list_head *head;
293 	int can_grant = 1;
294 
295 	//mlog(0, "res->lockname.len=%d\n", res->lockname.len);
296 	//mlog(0, "res->lockname.name=%p\n", res->lockname.name);
297 	//mlog(0, "shuffle res %.*s\n", res->lockname.len,
298 	//	  res->lockname.name);
299 
300 	/* because this function is called with the lockres
301 	 * spinlock, and because we know that it is not migrating/
302 	 * recovering/in-progress, it is fine to reserve asts and
303 	 * basts right before queueing them all throughout */
304 	assert_spin_locked(&res->spinlock);
305 	BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
306 			      DLM_LOCK_RES_RECOVERING|
307 			      DLM_LOCK_RES_IN_PROGRESS)));
308 
309 converting:
310 	if (list_empty(&res->converting))
311 		goto blocked;
312 	mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
313 	     res->lockname.name);
314 
315 	target = list_entry(res->converting.next, struct dlm_lock, list);
316 	if (target->ml.convert_type == LKM_IVMODE) {
317 		mlog(ML_ERROR, "%.*s: converting a lock with no "
318 		     "convert_type!\n", res->lockname.len, res->lockname.name);
319 		BUG();
320 	}
321 	head = &res->granted;
322 	list_for_each(iter, head) {
323 		lock = list_entry(iter, struct dlm_lock, list);
324 		if (lock==target)
325 			continue;
326 		if (!dlm_lock_compatible(lock->ml.type,
327 					 target->ml.convert_type)) {
328 			can_grant = 0;
329 			/* queue the BAST if not already */
330 			if (lock->ml.highest_blocked == LKM_IVMODE) {
331 				__dlm_lockres_reserve_ast(res);
332 				dlm_queue_bast(dlm, lock);
333 			}
334 			/* update the highest_blocked if needed */
335 			if (lock->ml.highest_blocked < target->ml.convert_type)
336 				lock->ml.highest_blocked =
337 					target->ml.convert_type;
338 		}
339 	}
340 	head = &res->converting;
341 	list_for_each(iter, head) {
342 		lock = list_entry(iter, struct dlm_lock, list);
343 		if (lock==target)
344 			continue;
345 		if (!dlm_lock_compatible(lock->ml.type,
346 					 target->ml.convert_type)) {
347 			can_grant = 0;
348 			if (lock->ml.highest_blocked == LKM_IVMODE) {
349 				__dlm_lockres_reserve_ast(res);
350 				dlm_queue_bast(dlm, lock);
351 			}
352 			if (lock->ml.highest_blocked < target->ml.convert_type)
353 				lock->ml.highest_blocked =
354 					target->ml.convert_type;
355 		}
356 	}
357 
358 	/* we can convert the lock */
359 	if (can_grant) {
360 		spin_lock(&target->spinlock);
361 		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
362 
363 		mlog(0, "calling ast for converting lock: %.*s, have: %d, "
364 		     "granting: %d, node: %u\n", res->lockname.len,
365 		     res->lockname.name, target->ml.type,
366 		     target->ml.convert_type, target->ml.node);
367 
368 		target->ml.type = target->ml.convert_type;
369 		target->ml.convert_type = LKM_IVMODE;
370 		list_move_tail(&target->list, &res->granted);
371 
372 		BUG_ON(!target->lksb);
373 		target->lksb->status = DLM_NORMAL;
374 
375 		spin_unlock(&target->spinlock);
376 
377 		__dlm_lockres_reserve_ast(res);
378 		dlm_queue_ast(dlm, target);
379 		/* go back and check for more */
380 		goto converting;
381 	}
382 
383 blocked:
384 	if (list_empty(&res->blocked))
385 		goto leave;
386 	target = list_entry(res->blocked.next, struct dlm_lock, list);
387 
388 	head = &res->granted;
389 	list_for_each(iter, head) {
390 		lock = list_entry(iter, struct dlm_lock, list);
391 		if (lock==target)
392 			continue;
393 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
394 			can_grant = 0;
395 			if (lock->ml.highest_blocked == LKM_IVMODE) {
396 				__dlm_lockres_reserve_ast(res);
397 				dlm_queue_bast(dlm, lock);
398 			}
399 			if (lock->ml.highest_blocked < target->ml.type)
400 				lock->ml.highest_blocked = target->ml.type;
401 		}
402 	}
403 
404 	head = &res->converting;
405 	list_for_each(iter, head) {
406 		lock = list_entry(iter, struct dlm_lock, list);
407 		if (lock==target)
408 			continue;
409 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
410 			can_grant = 0;
411 			if (lock->ml.highest_blocked == LKM_IVMODE) {
412 				__dlm_lockres_reserve_ast(res);
413 				dlm_queue_bast(dlm, lock);
414 			}
415 			if (lock->ml.highest_blocked < target->ml.type)
416 				lock->ml.highest_blocked = target->ml.type;
417 		}
418 	}
419 
420 	/* we can grant the blocked lock (only
421 	 * possible if converting list empty) */
422 	if (can_grant) {
423 		spin_lock(&target->spinlock);
424 		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
425 
426 		mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
427 		     "node: %u\n", res->lockname.len, res->lockname.name,
428 		     target->ml.type, target->ml.node);
429 
430 		// target->ml.type is already correct
431 		list_move_tail(&target->list, &res->granted);
432 
433 		BUG_ON(!target->lksb);
434 		target->lksb->status = DLM_NORMAL;
435 
436 		spin_unlock(&target->spinlock);
437 
438 		__dlm_lockres_reserve_ast(res);
439 		dlm_queue_ast(dlm, target);
440 		/* go back and check for more */
441 		goto converting;
442 	}
443 
444 leave:
445 	return;
446 }
447 
448 /* must have NO locks when calling this with res != NULL */
449 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
450 {
451 	mlog_entry("dlm=%p, res=%p\n", dlm, res);
452 	if (res) {
453 		spin_lock(&dlm->spinlock);
454 		spin_lock(&res->spinlock);
455 		__dlm_dirty_lockres(dlm, res);
456 		spin_unlock(&res->spinlock);
457 		spin_unlock(&dlm->spinlock);
458 	}
459 	wake_up(&dlm->dlm_thread_wq);
460 }
461 
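/*
 * Put a locally mastered lockres on dlm->dirty_list so dlm_thread will
 * shuffle its queues; takes a reference that dlm_thread drops once the
 * resource has been pulled back off the list.  Non-master copies are
 * never queued.
 */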
462 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
463 {
464 	mlog_entry("dlm=%p, res=%p\n", dlm, res);
465 
466 	assert_spin_locked(&dlm->spinlock);
467 	assert_spin_locked(&res->spinlock);
468 
469 	/* don't shuffle secondary queues */
470 	if ((res->owner == dlm->node_num) &&
471 	    !(res->state & DLM_LOCK_RES_DIRTY)) {
472 		/* ref for dirty_list */
473 		dlm_lockres_get(res);
474 		list_add_tail(&res->dirty, &dlm->dirty_list);
475 		res->state |= DLM_LOCK_RES_DIRTY;
476 	}
477 }
478 
479 
480 /* Launch the DLM thread for this domain */
481 int dlm_launch_thread(struct dlm_ctxt *dlm)
482 {
483 	mlog(0, "starting dlm thread...\n");
484 
485 	dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
486 	if (IS_ERR(dlm->dlm_thread_task)) {
487 		mlog_errno(PTR_ERR(dlm->dlm_thread_task));
488 		dlm->dlm_thread_task = NULL;
489 		return -EINVAL;
490 	}
491 
492 	return 0;
493 }
494 
495 void dlm_complete_thread(struct dlm_ctxt *dlm)
496 {
497 	if (dlm->dlm_thread_task) {
498 		mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
499 		kthread_stop(dlm->dlm_thread_task);
500 		dlm->dlm_thread_task = NULL;
501 	}
502 }
503 
504 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
505 {
506 	int empty;
507 
508 	spin_lock(&dlm->spinlock);
509 	empty = list_empty(&dlm->dirty_list);
510 	spin_unlock(&dlm->spinlock);
511 
512 	return empty;
513 }
514 
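/*
 * Deliver every pending AST, then every pending BAST.  Each lock is
 * pinned with an extra reference while it is off its list so delivery
 * can happen without dlm->ast_lock held; dlm_lockres_release_ast()
 * then drops the reservation taken back in dlm_shuffle_lists().
 */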
515 static void dlm_flush_asts(struct dlm_ctxt *dlm)
516 {
517 	int ret;
518 	struct dlm_lock *lock;
519 	struct dlm_lock_resource *res;
520 	u8 hi;
521 
522 	spin_lock(&dlm->ast_lock);
523 	while (!list_empty(&dlm->pending_asts)) {
524 		lock = list_entry(dlm->pending_asts.next,
525 				  struct dlm_lock, ast_list);
526 		/* get an extra ref on lock */
527 		dlm_lock_get(lock);
528 		res = lock->lockres;
529 		mlog(0, "delivering an ast for this lockres\n");
530 
531 		BUG_ON(!lock->ast_pending);
532 
533 		/* remove from list (including ref) */
534 		list_del_init(&lock->ast_list);
535 		dlm_lock_put(lock);
536 		spin_unlock(&dlm->ast_lock);
537 
538 		if (lock->ml.node != dlm->node_num) {
539 			ret = dlm_do_remote_ast(dlm, res, lock);
540 			if (ret < 0)
541 				mlog_errno(ret);
542 		} else
543 			dlm_do_local_ast(dlm, res, lock);
544 
545 		spin_lock(&dlm->ast_lock);
546 
547 		/* possible that another ast was queued while
548 		 * we were delivering the last one */
549 		if (!list_empty(&lock->ast_list)) {
550 			mlog(0, "aha another ast got queued while "
551 			     "we were finishing the last one.  will "
552 			     "keep the ast_pending flag set.\n");
553 		} else
554 			lock->ast_pending = 0;
555 
556 		/* drop the extra ref.
557 		 * this may drop it completely. */
558 		dlm_lock_put(lock);
559 		dlm_lockres_release_ast(dlm, res);
560 	}
561 
562 	while (!list_empty(&dlm->pending_basts)) {
563 		lock = list_entry(dlm->pending_basts.next,
564 				  struct dlm_lock, bast_list);
565 		/* get an extra ref on lock */
566 		dlm_lock_get(lock);
567 		res = lock->lockres;
568 
569 		BUG_ON(!lock->bast_pending);
570 
571 		/* get the highest blocked lock, and reset */
572 		spin_lock(&lock->spinlock);
573 		BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
574 		hi = lock->ml.highest_blocked;
575 		lock->ml.highest_blocked = LKM_IVMODE;
576 		spin_unlock(&lock->spinlock);
577 
578 		/* remove from list (including ref) */
579 		list_del_init(&lock->bast_list);
580 		dlm_lock_put(lock);
581 		spin_unlock(&dlm->ast_lock);
582 
583 		mlog(0, "delivering a bast for this lockres "
584 		     "(blocked = %d)\n", hi);
585 
586 		if (lock->ml.node != dlm->node_num) {
587 			ret = dlm_send_proxy_bast(dlm, res, lock, hi);
588 			if (ret < 0)
589 				mlog_errno(ret);
590 		} else
591 			dlm_do_local_bast(dlm, res, lock, hi);
592 
593 		spin_lock(&dlm->ast_lock);
594 
595 		/* possible that another bast was queued while
596 		 * we were delivering the last one */
597 		if (!list_empty(&lock->bast_list)) {
598 			mlog(0, "aha another bast got queued while "
599 			     "we were finishing the last one.  will "
600 			     "keep the bast_pending flag set.\n");
601 		} else
602 			lock->bast_pending = 0;
603 
604 		/* drop the extra ref.
605 		 * this may drop it completely. */
606 		dlm_lock_put(lock);
607 		dlm_lockres_release_ast(dlm, res);
608 	}
609 	wake_up(&dlm->ast_wq);
610 	spin_unlock(&dlm->ast_lock);
611 }
612 
613 
614 #define DLM_THREAD_TIMEOUT_MS (4 * 1000)
615 #define DLM_THREAD_MAX_DIRTY  100
616 #define DLM_THREAD_MAX_ASTS   10
617 
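/*
 * Main loop: purge aged lockreses, shuffle up to DLM_THREAD_MAX_DIRTY
 * dirty resources per pass, flush the pending ASTs/BASTs, then sleep
 * for up to DLM_THREAD_TIMEOUT_MS or until dlm_kick_thread() wakes us.
 */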
618 static int dlm_thread(void *data)
619 {
620 	struct dlm_lock_resource *res;
621 	struct dlm_ctxt *dlm = data;
622 	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
623 
624 	mlog(0, "dlm thread running for %s...\n", dlm->name);
625 
626 	while (!kthread_should_stop()) {
627 		int n = DLM_THREAD_MAX_DIRTY;
628 
629 		/* dlm_shutting_down is only a point-in-time check, but
630 		 * that doesn't matter as we'll just loop back around if
631 		 * we get false on the leading edge of a state
632 		 * transition. */
633 		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
634 
635 		/* We really don't want to hold dlm->spinlock while
636 		 * calling dlm_shuffle_lists on each lockres that
637 		 * needs to have its queues adjusted and AST/BASTs
638 		 * run.  So let's pull each entry off the dirty_list
639 		 * and drop dlm->spinlock ASAP.  Once off the list,
640 		 * res->spinlock needs to be taken again to protect
641 		 * the queues while calling dlm_shuffle_lists.  */
642 		spin_lock(&dlm->spinlock);
643 		while (!list_empty(&dlm->dirty_list)) {
644 			int delay = 0;
645 			res = list_entry(dlm->dirty_list.next,
646 					 struct dlm_lock_resource, dirty);
647 
648 			/* peel a lockres off, remove it from the list,
649 			 * unset the dirty flag and drop the dlm lock */
650 			BUG_ON(!res);
651 			dlm_lockres_get(res);
652 
653 			spin_lock(&res->spinlock);
654 			res->state &= ~DLM_LOCK_RES_DIRTY;
655 			list_del_init(&res->dirty);
656 			spin_unlock(&res->spinlock);
657 			spin_unlock(&dlm->spinlock);
658 			/* Drop dirty_list ref */
659 			dlm_lockres_put(res);
660 
661 		 	/* lockres can be re-dirtied/re-added to the
662 			 * dirty_list in this gap, but that is ok */
663 
664 			spin_lock(&res->spinlock);
665 			if (res->owner != dlm->node_num) {
666 				__dlm_print_one_lock_resource(res);
667 				mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
668 				     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
669 				     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
670 				     res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
671 				     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
672 			}
673 			BUG_ON(res->owner != dlm->node_num);
674 
675 			/* it is now ok to move lockreses in these states
676 			 * to the dirty list, assuming that they will only be
677 			 * dirty for a short while. */
678 			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
679 					  DLM_LOCK_RES_MIGRATING |
680 					  DLM_LOCK_RES_RECOVERING)) {
681 				/* move it to the tail and keep going */
682 				spin_unlock(&res->spinlock);
683 				mlog(0, "delaying list shuffling for in-"
684 				     "progress lockres %.*s, state=%d\n",
685 				     res->lockname.len, res->lockname.name,
686 				     res->state);
687 				delay = 1;
688 				goto in_progress;
689 			}
690 
691 			/* at this point the lockres is not migrating/
692 			 * recovering/in-progress.  we have the lockres
693 			 * spinlock and do NOT have the dlm lock.
694 			 * safe to reserve/queue asts and run the lists. */
695 
696 			mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
697 			     "res=%.*s\n", dlm->name,
698 			     res->lockname.len, res->lockname.name);
699 
700 			/* called while holding lockres lock */
701 			dlm_shuffle_lists(dlm, res);
702 			spin_unlock(&res->spinlock);
703 
704 			dlm_lockres_calc_usage(dlm, res);
705 
706 in_progress:
707 
708 			spin_lock(&dlm->spinlock);
709 			/* if the lock was in-progress, stick
710 			 * it on the back of the list */
711 			if (delay) {
712 				/* ref for dirty_list */
713 				dlm_lockres_get(res);
714 				spin_lock(&res->spinlock);
715 				list_add_tail(&res->dirty, &dlm->dirty_list);
716 				res->state |= DLM_LOCK_RES_DIRTY;
717 				spin_unlock(&res->spinlock);
718 			}
719 			dlm_lockres_put(res);
720 
721 			/* unlikely, but we may need to give time to
722 			 * other tasks */
723 			if (!--n) {
724 				mlog(0, "throttling dlm_thread\n");
725 				break;
726 			}
727 		}
728 
729 		spin_unlock(&dlm->spinlock);
730 		dlm_flush_asts(dlm);
731 
732 		/* yield and continue right away if there is more work to do */
733 		if (!n) {
734 			cond_resched();
735 			continue;
736 		}
737 
738 		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
739 						 !dlm_dirty_list_empty(dlm) ||
740 						 kthread_should_stop(),
741 						 timeout);
742 	}
743 
744 	mlog(0, "quitting DLM thread\n");
745 	return 0;
746 }
747