xref: /linux/fs/ocfs2/dlm/dlmthread.c (revision 123656d4cc8c946f578ebd18c2050f5251720428)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmthread.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/timer.h>
41 #include <linux/kthread.h>
42 
43 
44 #include "cluster/heartbeat.h"
45 #include "cluster/nodemanager.h"
46 #include "cluster/tcp.h"
47 
48 #include "dlmapi.h"
49 #include "dlmcommon.h"
50 #include "dlmdomain.h"
51 
52 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
53 #include "cluster/masklog.h"
54 
55 static int dlm_thread(void *data);
56 
57 static void dlm_flush_asts(struct dlm_ctxt *dlm);
58 
59 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
60 
61 /* Will exit holding res->spinlock, but may drop and reacquire it while
62  * waiting.  Waits until all of the given flags are cleared in res->state. */
63 void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
64 {
65 	DECLARE_WAITQUEUE(wait, current);
66 
67 	assert_spin_locked(&res->spinlock);
68 
69 	add_wait_queue(&res->wq, &wait);
70 repeat:
71 	set_current_state(TASK_UNINTERRUPTIBLE);
72 	if (res->state & flags) {
73 		spin_unlock(&res->spinlock);
74 		schedule();
75 		spin_lock(&res->spinlock);
76 		goto repeat;
77 	}
78 	remove_wait_queue(&res->wq, &wait);
79 	current->state = TASK_RUNNING;
80 }
81 
82 
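/* A lockres is considered unused -- and hence a candidate for purging --
 * only when its granted, converting and blocked queues are all empty and
 * it is not currently linked on the dirty list.  Callers hold
 * res->spinlock. */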
83 static int __dlm_lockres_unused(struct dlm_lock_resource *res)
84 {
85 	if (list_empty(&res->granted) &&
86 	    list_empty(&res->converting) &&
87 	    list_empty(&res->blocked) &&
88 	    list_empty(&res->dirty))
89 		return 1;
90 	return 0;
91 }
92 
93 
94 /* Call whenever you may have added or deleted something from one of
95  * the lockres queues. This will figure out whether the lockres belongs
96  * on the purge list or not and do the appropriate thing. */
97 void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
98 			      struct dlm_lock_resource *res)
99 {
100 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
101 
102 	assert_spin_locked(&dlm->spinlock);
103 	assert_spin_locked(&res->spinlock);
104 
105 	if (__dlm_lockres_unused(res)) {
106 		if (list_empty(&res->purge)) {
107 			mlog(0, "adding lockres %.*s to purge list\n",
108 			     res->lockname.len, res->lockname.name);
109 
110 			res->last_used = jiffies;
111 			list_add_tail(&res->purge, &dlm->purge_list);
112 			dlm->purge_count++;
113 		}
114 	} else if (!list_empty(&res->purge)) {
115 		mlog(0, "removing lockres %.*s from purge list\n",
116 		     res->lockname.len, res->lockname.name);
117 
118 		list_del_init(&res->purge);
119 		dlm->purge_count--;
120 	}
121 }
122 
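/* Locking wrapper around __dlm_lockres_calc_usage(): takes dlm->spinlock
 * and res->spinlock on behalf of the caller. */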
123 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
124 			    struct dlm_lock_resource *res)
125 {
126 	mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
127 	spin_lock(&dlm->spinlock);
128 	spin_lock(&res->spinlock);
129 
130 	__dlm_lockres_calc_usage(dlm, res);
131 
132 	spin_unlock(&res->spinlock);
133 	spin_unlock(&dlm->spinlock);
134 }
135 
136 /* TODO: Eventual API: Called with the dlm spinlock held, may drop it
137  * to do migration, but will re-acquire before exit. */
138 void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
139 {
140 	int master;
141 	int ret;
142 
143 	spin_lock(&lockres->spinlock);
144 	master = lockres->owner == dlm->node_num;
145 	spin_unlock(&lockres->spinlock);
146 
147 	mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
148 	     lockres->lockname.name, master);
149 
150 	/* Non-master is the easy case -- no migration required, just
151 	 * quit. */
152 	if (!master)
153 		goto finish;
154 
155 	/* Wheee! Migrate lockres here! */
156 	spin_unlock(&dlm->spinlock);
157 again:
158 
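	/* Passing O2NM_MAX_NODES as the target presumably lets
	 * dlm_migrate_lockres() pick a destination node itself; any
	 * failure other than -ENOTEMPTY is simply retried. */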
159 	ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
160 	if (ret == -ENOTEMPTY) {
161 		mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
162 		     lockres->lockname.len, lockres->lockname.name);
163 
164 		BUG();
165 	} else if (ret < 0) {
166 		mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
167 		     lockres->lockname.len, lockres->lockname.name);
168 		goto again;
169 	}
170 
171 	spin_lock(&dlm->spinlock);
172 
173 finish:
174 	if (!list_empty(&lockres->purge)) {
175 		list_del_init(&lockres->purge);
176 		dlm->purge_count--;
177 	}
178 	__dlm_unhash_lockres(lockres);
179 }
180 
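/* Walk the purge list and purge lockres entries that are still unused and
 * have sat idle for longer than DLM_PURGE_INTERVAL_MS.  When purge_now is
 * set (e.g. while the domain is shutting down), age is ignored and every
 * unused lockres is purged.  The scan is bounded by the purge_count
 * sampled at entry. */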
181 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
182 			       int purge_now)
183 {
184 	unsigned int run_max, unused;
185 	unsigned long purge_jiffies;
186 	struct dlm_lock_resource *lockres;
187 
188 	spin_lock(&dlm->spinlock);
189 	run_max = dlm->purge_count;
190 
191 	while (run_max && !list_empty(&dlm->purge_list)) {
192 		run_max--;
193 
194 		lockres = list_entry(dlm->purge_list.next,
195 				     struct dlm_lock_resource, purge);
196 
197 		/* Status of the lockres *might* change so double
198 		 * check. If the lockres is unused, holding the dlm
199 		 * spinlock will prevent people from getting any more
200 		 * refs on it -- there's no need to keep holding the
201 		 * lockres spinlock. */
202 		spin_lock(&lockres->spinlock);
203 		unused = __dlm_lockres_unused(lockres);
204 		spin_unlock(&lockres->spinlock);
205 
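		/* Still in use: leave it on the purge list.  It stays at the
		 * head and will be re-checked on the next iteration; the
		 * run_max counter keeps this loop bounded. */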
206 		if (!unused)
207 			continue;
208 
209 		purge_jiffies = lockres->last_used +
210 			msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);
211 
212 		/* Make sure that we actually want to be processing this
213 		 * lockres at this time. */
214 		if (!purge_now && time_after(purge_jiffies, jiffies)) {
215 			/* Since resources are added to the tail of the
216 			 * purge list, we can stop at the first
217 			 * unpurgeable resource -- anything added after
218 			 * it will have a greater last_used value. */
219 			break;
220 		}
221 
222 		list_del_init(&lockres->purge);
223 		dlm->purge_count--;
224 
225 		/* This may drop and reacquire the dlm spinlock if it
226 		 * has to do migration. */
227 		mlog(0, "calling dlm_purge_lockres!\n");
228 		dlm_purge_lockres(dlm, lockres);
229 		mlog(0, "DONE calling dlm_purge_lockres!\n");
230 
231 		/* Avoid adding any scheduling latencies */
232 		cond_resched_lock(&dlm->spinlock);
233 	}
234 
235 	spin_unlock(&dlm->spinlock);
236 }
237 
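/* Grant whatever can be granted on this lockres: first walk the converting
 * queue, then the blocked queue.  A waiting lock is granted only if its
 * requested mode is compatible with every other lock on the granted and
 * converting queues; otherwise BASTs are queued against the incompatible
 * holders and their highest_blocked is raised.  Each grant queues an AST
 * and the scan restarts from the converting queue. */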
238 static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
239 			      struct dlm_lock_resource *res)
240 {
241 	struct dlm_lock *lock, *target;
242 	struct list_head *iter;
243 	struct list_head *head;
244 	int can_grant = 1;
245 
246 	//mlog(0, "res->lockname.len=%d\n", res->lockname.len);
247 	//mlog(0, "res->lockname.name=%p\n", res->lockname.name);
248 	//mlog(0, "shuffle res %.*s\n", res->lockname.len,
249 	//	  res->lockname.name);
250 
251 	/* because this function is called with the lockres
252 	 * spinlock held, and because we know that it is not migrating/
253 	 * recovering/in-progress, it is fine to reserve asts and
254 	 * basts right before queueing them throughout this function */
255 	assert_spin_locked(&res->spinlock);
256 	BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
257 			      DLM_LOCK_RES_RECOVERING|
258 			      DLM_LOCK_RES_IN_PROGRESS)));
259 
260 converting:
261 	if (list_empty(&res->converting))
262 		goto blocked;
263 	mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
264 	     res->lockname.name);
265 
266 	target = list_entry(res->converting.next, struct dlm_lock, list);
267 	if (target->ml.convert_type == LKM_IVMODE) {
268 		mlog(ML_ERROR, "%.*s: converting a lock with no "
269 		     "convert_type!\n", res->lockname.len, res->lockname.name);
270 		BUG();
271 	}
272 	head = &res->granted;
273 	list_for_each(iter, head) {
274 		lock = list_entry(iter, struct dlm_lock, list);
275 		if (lock == target)
276 			continue;
277 		if (!dlm_lock_compatible(lock->ml.type,
278 					 target->ml.convert_type)) {
279 			can_grant = 0;
280 			/* queue the BAST if not already */
281 			if (lock->ml.highest_blocked == LKM_IVMODE) {
282 				__dlm_lockres_reserve_ast(res);
283 				dlm_queue_bast(dlm, lock);
284 			}
285 			/* update the highest_blocked if needed */
286 			if (lock->ml.highest_blocked < target->ml.convert_type)
287 				lock->ml.highest_blocked =
288 					target->ml.convert_type;
289 		}
290 	}
291 	head = &res->converting;
292 	list_for_each(iter, head) {
293 		lock = list_entry(iter, struct dlm_lock, list);
294 		if (lock == target)
295 			continue;
296 		if (!dlm_lock_compatible(lock->ml.type,
297 					 target->ml.convert_type)) {
298 			can_grant = 0;
299 			if (lock->ml.highest_blocked == LKM_IVMODE) {
300 				__dlm_lockres_reserve_ast(res);
301 				dlm_queue_bast(dlm, lock);
302 			}
303 			if (lock->ml.highest_blocked < target->ml.convert_type)
304 				lock->ml.highest_blocked =
305 					target->ml.convert_type;
306 		}
307 	}
308 
309 	/* we can convert the lock */
310 	if (can_grant) {
311 		spin_lock(&target->spinlock);
312 		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
313 
314 		mlog(0, "calling ast for converting lock: %.*s, have: %d, "
315 		     "granting: %d, node: %u\n", res->lockname.len,
316 		     res->lockname.name, target->ml.type,
317 		     target->ml.convert_type, target->ml.node);
318 
319 		target->ml.type = target->ml.convert_type;
320 		target->ml.convert_type = LKM_IVMODE;
321 		list_del_init(&target->list);
322 		list_add_tail(&target->list, &res->granted);
323 
324 		BUG_ON(!target->lksb);
325 		target->lksb->status = DLM_NORMAL;
326 
327 		spin_unlock(&target->spinlock);
328 
329 		__dlm_lockres_reserve_ast(res);
330 		dlm_queue_ast(dlm, target);
331 		/* go back and check for more */
332 		goto converting;
333 	}
334 
335 blocked:
336 	if (list_empty(&res->blocked))
337 		goto leave;
338 	target = list_entry(res->blocked.next, struct dlm_lock, list);
339 
340 	head = &res->granted;
341 	list_for_each(iter, head) {
342 		lock = list_entry(iter, struct dlm_lock, list);
343 		if (lock == target)
344 			continue;
345 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
346 			can_grant = 0;
347 			if (lock->ml.highest_blocked == LKM_IVMODE) {
348 				__dlm_lockres_reserve_ast(res);
349 				dlm_queue_bast(dlm, lock);
350 			}
351 			if (lock->ml.highest_blocked < target->ml.type)
352 				lock->ml.highest_blocked = target->ml.type;
353 		}
354 	}
355 
356 	head = &res->converting;
357 	list_for_each(iter, head) {
358 		lock = list_entry(iter, struct dlm_lock, list);
359 		if (lock == target)
360 			continue;
361 		if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
362 			can_grant = 0;
363 			if (lock->ml.highest_blocked == LKM_IVMODE) {
364 				__dlm_lockres_reserve_ast(res);
365 				dlm_queue_bast(dlm, lock);
366 			}
367 			if (lock->ml.highest_blocked < target->ml.type)
368 				lock->ml.highest_blocked = target->ml.type;
369 		}
370 	}
371 
372 	/* we can grant the blocked lock (only
373 	 * possible if converting list empty) */
374 	if (can_grant) {
375 		spin_lock(&target->spinlock);
376 		BUG_ON(target->ml.highest_blocked != LKM_IVMODE);
377 
378 		mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
379 		     "node: %u\n", res->lockname.len, res->lockname.name,
380 		     target->ml.type, target->ml.node);
381 
382 		// target->ml.type is already correct
383 		list_del_init(&target->list);
384 		list_add_tail(&target->list, &res->granted);
385 
386 		BUG_ON(!target->lksb);
387 		target->lksb->status = DLM_NORMAL;
388 
389 		spin_unlock(&target->spinlock);
390 
391 		__dlm_lockres_reserve_ast(res);
392 		dlm_queue_ast(dlm, target);
393 		/* go back and check for more */
394 		goto converting;
395 	}
396 
397 leave:
398 	return;
399 }
400 
401 /* must not hold any spinlocks when calling this with res != NULL */
402 void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
403 {
404 	mlog_entry("dlm=%p, res=%p\n", dlm, res);
405 	if (res) {
406 		spin_lock(&dlm->spinlock);
407 		spin_lock(&res->spinlock);
408 		__dlm_dirty_lockres(dlm, res);
409 		spin_unlock(&res->spinlock);
410 		spin_unlock(&dlm->spinlock);
411 	}
412 	wake_up(&dlm->dlm_thread_wq);
413 }
414 
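/* Put the lockres on the dirty list so the dlm thread will shuffle its
 * queues.  Only resources mastered locally are ever dirtied; callers hold
 * both dlm->spinlock and res->spinlock. */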
415 void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
416 {
417 	mlog_entry("dlm=%p, res=%p\n", dlm, res);
418 
419 	assert_spin_locked(&dlm->spinlock);
420 	assert_spin_locked(&res->spinlock);
421 
422 	/* don't shuffle secondary queues */
423 	if ((res->owner == dlm->node_num) &&
424 	    !(res->state & DLM_LOCK_RES_DIRTY)) {
425 		list_add_tail(&res->dirty, &dlm->dirty_list);
426 		res->state |= DLM_LOCK_RES_DIRTY;
427 	}
428 }
429 
430 
431 /* Launch the dlm thread for this domain */
432 int dlm_launch_thread(struct dlm_ctxt *dlm)
433 {
434 	mlog(0, "starting dlm thread...\n");
435 
436 	dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
437 	if (IS_ERR(dlm->dlm_thread_task)) {
438 		mlog_errno(PTR_ERR(dlm->dlm_thread_task));
439 		dlm->dlm_thread_task = NULL;
440 		return -EINVAL;
441 	}
442 
443 	return 0;
444 }
445 
446 void dlm_complete_thread(struct dlm_ctxt *dlm)
447 {
448 	if (dlm->dlm_thread_task) {
449 		mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
450 		kthread_stop(dlm->dlm_thread_task);
451 		dlm->dlm_thread_task = NULL;
452 	}
453 }
454 
455 static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
456 {
457 	int empty;
458 
459 	spin_lock(&dlm->spinlock);
460 	empty = list_empty(&dlm->dirty_list);
461 	spin_unlock(&dlm->spinlock);
462 
463 	return empty;
464 }
465 
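/* Drain the pending AST and BAST queues.  Each callback is delivered
 * locally or sent over the network depending on which node owns the lock;
 * dlm->ast_lock is dropped around delivery, so the pending flag is only
 * cleared if no new callback was queued in the meantime. */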
466 static void dlm_flush_asts(struct dlm_ctxt *dlm)
467 {
468 	int ret;
469 	struct dlm_lock *lock;
470 	struct dlm_lock_resource *res;
471 	u8 hi;
472 
473 	spin_lock(&dlm->ast_lock);
474 	while (!list_empty(&dlm->pending_asts)) {
475 		lock = list_entry(dlm->pending_asts.next,
476 				  struct dlm_lock, ast_list);
477 		/* get an extra ref on lock */
478 		dlm_lock_get(lock);
479 		res = lock->lockres;
480 		mlog(0, "delivering an ast for this lockres\n");
481 
482 		BUG_ON(!lock->ast_pending);
483 
484 		/* remove from list (including ref) */
485 		list_del_init(&lock->ast_list);
486 		dlm_lock_put(lock);
487 		spin_unlock(&dlm->ast_lock);
488 
489 		if (lock->ml.node != dlm->node_num) {
490 			ret = dlm_do_remote_ast(dlm, res, lock);
491 			if (ret < 0)
492 				mlog_errno(ret);
493 		} else
494 			dlm_do_local_ast(dlm, res, lock);
495 
496 		spin_lock(&dlm->ast_lock);
497 
498 		/* possible that another ast was queued while
499 		 * we were delivering the last one */
500 		if (!list_empty(&lock->ast_list)) {
501 			mlog(0, "aha another ast got queued while "
502 			     "we were finishing the last one.  will "
503 			     "keep the ast_pending flag set.\n");
504 		} else
505 			lock->ast_pending = 0;
506 
507 		/* drop the extra ref.
508 		 * this may drop it completely. */
509 		dlm_lock_put(lock);
510 		dlm_lockres_release_ast(dlm, res);
511 	}
512 
513 	while (!list_empty(&dlm->pending_basts)) {
514 		lock = list_entry(dlm->pending_basts.next,
515 				  struct dlm_lock, bast_list);
516 		/* get an extra ref on lock */
517 		dlm_lock_get(lock);
518 		res = lock->lockres;
519 
520 		BUG_ON(!lock->bast_pending);
521 
522 		/* get the highest blocked lock, and reset */
523 		spin_lock(&lock->spinlock);
524 		BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
525 		hi = lock->ml.highest_blocked;
526 		lock->ml.highest_blocked = LKM_IVMODE;
527 		spin_unlock(&lock->spinlock);
528 
529 		/* remove from list (including ref) */
530 		list_del_init(&lock->bast_list);
531 		dlm_lock_put(lock);
532 		spin_unlock(&dlm->ast_lock);
533 
534 		mlog(0, "delivering a bast for this lockres "
535 		     "(blocked = %d)\n", hi);
536 
537 		if (lock->ml.node != dlm->node_num) {
538 			ret = dlm_send_proxy_bast(dlm, res, lock, hi);
539 			if (ret < 0)
540 				mlog_errno(ret);
541 		} else
542 			dlm_do_local_bast(dlm, res, lock, hi);
543 
544 		spin_lock(&dlm->ast_lock);
545 
546 		/* possible that another bast was queued while
547 		 * we were delivering the last one */
548 		if (!list_empty(&lock->bast_list)) {
549 			mlog(0, "aha another bast got queued while "
550 			     "we were finishing the last one.  will "
551 			     "keep the bast_pending flag set.\n");
552 		} else
553 			lock->bast_pending = 0;
554 
555 		/* drop the extra ref.
556 		 * this may drop it completely. */
557 		dlm_lock_put(lock);
558 		dlm_lockres_release_ast(dlm, res);
559 	}
560 	wake_up(&dlm->ast_wq);
561 	spin_unlock(&dlm->ast_lock);
562 }
563 
564 
565 #define DLM_THREAD_TIMEOUT_MS (4 * 1000)
566 #define DLM_THREAD_MAX_DIRTY  100
567 #define DLM_THREAD_MAX_ASTS   10
568 
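/* Main loop of the per-domain dlm thread: run the purge list, pull
 * lockres entries off the dirty list and shuffle their queues, flush any
 * queued ASTs/BASTs, then sleep for up to DLM_THREAD_TIMEOUT_MS or until
 * woken.  At most DLM_THREAD_MAX_DIRTY dirty lockres entries are handled
 * per pass before the thread yields. */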
569 static int dlm_thread(void *data)
570 {
571 	struct dlm_lock_resource *res;
572 	struct dlm_ctxt *dlm = data;
573 	unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);
574 
575 	mlog(0, "dlm thread running for %s...\n", dlm->name);
576 
577 	while (!kthread_should_stop()) {
578 		int n = DLM_THREAD_MAX_DIRTY;
579 
580 		/* dlm_shutting_down() is only a point-in-time check, but
581 		 * that doesn't matter: if we see false on the leading edge
582 		 * of a state transition we'll simply loop back around and
583 		 * catch it on the next pass. */
584 		dlm_run_purge_list(dlm, dlm_shutting_down(dlm));
585 
586 		/* We really don't want to hold dlm->spinlock while
587 		 * calling dlm_shuffle_lists on each lockres that
588 		 * needs to have its queues adjusted and AST/BASTs
589 		 * run.  So let's pull each entry off the dirty_list
590 		 * and drop dlm->spinlock ASAP.  Once off the list,
591 		 * res->spinlock needs to be taken again to protect
592 		 * the queues while calling dlm_shuffle_lists.  */
593 		spin_lock(&dlm->spinlock);
594 		while (!list_empty(&dlm->dirty_list)) {
595 			int delay = 0;
596 			res = list_entry(dlm->dirty_list.next,
597 					 struct dlm_lock_resource, dirty);
598 
599 			/* peel a lockres off, remove it from the list,
600 			 * unset the dirty flag and drop the dlm lock */
601 			BUG_ON(!res);
602 			dlm_lockres_get(res);
603 
604 			spin_lock(&res->spinlock);
605 			res->state &= ~DLM_LOCK_RES_DIRTY;
606 			list_del_init(&res->dirty);
607 			spin_unlock(&res->spinlock);
608 			spin_unlock(&dlm->spinlock);
609 
610 			/* lockres can be re-dirtied/re-added to the
611 			 * dirty_list in this gap, but that is ok */
612 
613 			spin_lock(&res->spinlock);
614 			if (res->owner != dlm->node_num) {
615 				__dlm_print_one_lock_resource(res);
616 				mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
617 				     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
618 				     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
619 				     res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
620 				     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
621 			}
622 			BUG_ON(res->owner != dlm->node_num);
623 
624 			/* it is now ok to move lockreses in these states
625 			 * to the dirty list, assuming that they will only be
626 			 * dirty for a short while. */
627 			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
628 					  DLM_LOCK_RES_MIGRATING |
629 					  DLM_LOCK_RES_RECOVERING)) {
630 				/* move it to the tail and keep going */
631 				spin_unlock(&res->spinlock);
632 				mlog(0, "delaying list shuffling for in-"
633 				     "progress lockres %.*s, state=%d\n",
634 				     res->lockname.len, res->lockname.name,
635 				     res->state);
636 				delay = 1;
637 				goto in_progress;
638 			}
639 
640 			/* at this point the lockres is not migrating/
641 			 * recovering/in-progress.  we have the lockres
642 			 * spinlock and do NOT have the dlm lock.
643 			 * safe to reserve/queue asts and run the lists. */
644 
645 			mlog(0, "calling dlm_shuffle_lists with dlm=%p, "
646 			     "res=%p\n", dlm, res);
647 
648 			/* called while holding lockres lock */
649 			dlm_shuffle_lists(dlm, res);
650 			spin_unlock(&res->spinlock);
651 
652 			dlm_lockres_calc_usage(dlm, res);
653 
654 in_progress:
655 
656 			spin_lock(&dlm->spinlock);
657 			/* if the lock was in-progress, stick
658 			 * it on the back of the list */
659 			if (delay) {
660 				spin_lock(&res->spinlock);
661 				list_add_tail(&res->dirty, &dlm->dirty_list);
662 				res->state |= DLM_LOCK_RES_DIRTY;
663 				spin_unlock(&res->spinlock);
664 			}
665 			dlm_lockres_put(res);
666 
667 			/* unlikely, but we may need to give time to
668 			 * other tasks */
669 			if (!--n) {
670 				mlog(0, "throttling dlm_thread\n");
671 				break;
672 			}
673 		}
674 
675 		spin_unlock(&dlm->spinlock);
676 		dlm_flush_asts(dlm);
677 
678 		/* yield and continue right away if there is more work to do */
679 		if (!n) {
680 			yield();
681 			continue;
682 		}
683 
684 		wait_event_interruptible_timeout(dlm->dlm_thread_wq,
685 						 !dlm_dirty_list_empty(dlm) ||
686 						 kthread_should_stop(),
687 						 timeout);
688 	}
689 
690 	mlog(0, "quitting DLM thread\n");
691 	return 0;
692 }
693