xref: /linux/fs/ocfs2/dlmglue.c (revision 643d1f7fe3aa12c8bdea6fa5b4ba874ff6dd601d)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25 
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/crc32.h>
31 #include <linux/kthread.h>
32 #include <linux/pagemap.h>
33 #include <linux/debugfs.h>
34 #include <linux/seq_file.h>
35 
36 #include <cluster/heartbeat.h>
37 #include <cluster/nodemanager.h>
38 #include <cluster/tcp.h>
39 
40 #include <dlm/dlmapi.h>
41 
42 #define MLOG_MASK_PREFIX ML_DLM_GLUE
43 #include <cluster/masklog.h>
44 
45 #include "ocfs2.h"
46 
47 #include "alloc.h"
48 #include "dcache.h"
49 #include "dlmglue.h"
50 #include "extent_map.h"
51 #include "file.h"
52 #include "heartbeat.h"
53 #include "inode.h"
54 #include "journal.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58 
59 #include "buffer_head_io.h"
60 
/*
 * A waiter sleeping until a lockres' l_flags satisfies (l_flags & mw_mask)
 * == mw_goal.  Queued on ocfs2_lock_res.l_mask_waiters and completed from
 * lockres_set_flags() when the condition becomes true.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* entry on l_mask_waiters */
	int			mw_status;	/* status handed back to the waiter */
	struct completion	mw_complete;	/* signalled when goal is reached */
	unsigned long		mw_mask;	/* which l_flags bits to test */
	unsigned long		mw_goal;	/* required value of masked bits */
};
68 
69 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
70 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
71 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
72 
73 /*
74  * Return value from ->downconvert_worker functions.
75  *
76  * These control the precise actions of ocfs2_unblock_lock()
77  * and ocfs2_process_blocked_lock()
78  *
79  */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/*
 * Result of a downconvert attempt: whether the lockres should be requeued
 * for another pass, and which unblock action the worker chose.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
92 
93 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
94 					int new_level);
95 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
96 
97 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
98 				     int blocking);
99 
100 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
101 				       int blocking);
102 
103 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
104 				     struct ocfs2_lock_res *lockres);
105 
106 
107 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
108 
109 /* This aids in debugging situations where a bad LVB might be involved. */
/*
 * Dump the contents of a meta lock's LVB via mlog.  'level' is an mlog
 * mask; 'function' and 'line' identify the caller and are filled in by
 * the mlog_meta_lvb() wrapper macro above.
 */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	/* LVB fields are stored big-endian on disk/in the DLM; convert
	 * each on the way out. */
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}
133 
134 
135 /*
136  * OCFS2 Lock Resource Operations
137  *
138  * These fine tune the behavior of the generic dlmglue locking infrastructure.
139  *
140  * The most basic of lock types can point ->l_priv to their respective
141  * struct ocfs2_super and allow the default actions to manage things.
142  *
143  * Right now, each lock type also needs to implement an init function,
144  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
145  * should be called when the lock is no longer needed (i.e., object
146  * destruction time).
147  */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow
	 * (see LOCK_TYPE_REQUIRES_REFRESH and LOCK_TYPE_USES_LVB below).
	 */
	int flags;
};
206 
207 /*
208  * Some locks want to "refresh" potentially stale data when a
209  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
210  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
211  * individual lockres l_flags member from the ast function. It is
212  * expected that the locking wrapper will clear the
213  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
214  */
215 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
216 
217 /*
218  * Indicate that a lock type makes use of the lock value block. The
219  * ->set_lvb lock type callback must be defined.
220  */
221 #define LOCK_TYPE_USES_LVB		0x2
222 
/* Inode RW lock: plain lock; osb is reached through the inode in ->l_priv. */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

/* Inode meta lock: refreshes inode data from the LVB and syncs pages
 * out on downconvert. */
static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

/* Superblock lock: ->l_priv is the ocfs2_super itself, so no get_osb. */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

/* Rename lock: ->l_priv is the ocfs2_super itself, so no get_osb. */
static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

/* Dentry lock: ->l_priv is an ocfs2_dentry_lock. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

/* Inode open lock: plain lock; osb reached through the inode in ->l_priv. */
static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

/* File (flock) lock: ->l_priv is an ocfs2_file_private. */
static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};
260 
261 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
262 {
263 	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
264 		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
265 		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
266 }
267 
/* Return the inode backing an inode-type lockres (->l_priv). */
static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	/* Only valid for META/RW/OPEN locks. */
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}
274 
/* Return the ocfs2_dentry_lock backing a dentry-type lockres (->l_priv). */
static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}
281 
282 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
283 {
284 	if (lockres->l_ops->get_osb)
285 		return lockres->l_ops->get_osb(lockres);
286 
287 	return (struct ocfs2_super *)lockres->l_priv;
288 }
289 
290 static int ocfs2_lock_create(struct ocfs2_super *osb,
291 			     struct ocfs2_lock_res *lockres,
292 			     int level,
293 			     int dlm_flags);
294 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
295 						     int wanted);
296 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
297 				 struct ocfs2_lock_res *lockres,
298 				 int level);
299 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
300 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
301 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
302 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
303 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
304 					struct ocfs2_lock_res *lockres);
305 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
306 						int convert);
307 #define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
308 	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
309 		"resource %s: %s\n", dlm_errname(_stat), _func,	\
310 		_lockres->l_name, dlm_errmsg(_stat));		\
311 } while (0)
312 static int ocfs2_downconvert_thread(void *arg);
313 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
314 					struct ocfs2_lock_res *lockres);
315 static int ocfs2_inode_lock_update(struct inode *inode,
316 				  struct buffer_head **bh);
317 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
318 static inline int ocfs2_highest_compat_lock_level(int level);
319 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
320 				      int new_level);
321 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
322 				  struct ocfs2_lock_res *lockres,
323 				  int new_level,
324 				  int lvb);
325 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
326 				        struct ocfs2_lock_res *lockres);
327 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
328 				struct ocfs2_lock_res *lockres);
329 
330 
331 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
332 				  u64 blkno,
333 				  u32 generation,
334 				  char *name)
335 {
336 	int len;
337 
338 	mlog_entry_void();
339 
340 	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
341 
342 	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
343 		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
344 		       (long long)blkno, generation);
345 
346 	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
347 
348 	mlog(0, "built lock resource with name: %s\n", name);
349 
350 	mlog_exit_void();
351 }
352 
353 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
354 
/* Add 'res' to the per-superblock debugfs tracking list. */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	/* ocfs2_dlm_tracking_lock serializes all tracking-list updates. */
	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
364 
/* Remove 'res' from the debugfs tracking list.  Safe to call more than
 * once: list_del_init() leaves the entry empty, so repeat calls skip it. */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
372 
/*
 * Common tail of lockres initialization: record type/ops/priv, reset all
 * DLM state to "no lock held, no operation pending", mark the lockres
 * initialized and hook it up for debugfs tracking.
 */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	/* No level granted, requested or blocking yet. */
	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}
393 
/* One-time (slab-constructor style) init: zero everything and set up the
 * embedded lock, waitqueue and list heads. */
void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}
403 
404 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
405 			       enum ocfs2_lock_type type,
406 			       unsigned int generation,
407 			       struct inode *inode)
408 {
409 	struct ocfs2_lock_res_ops *ops;
410 
411 	switch(type) {
412 		case OCFS2_LOCK_TYPE_RW:
413 			ops = &ocfs2_inode_rw_lops;
414 			break;
415 		case OCFS2_LOCK_TYPE_META:
416 			ops = &ocfs2_inode_inode_lops;
417 			break;
418 		case OCFS2_LOCK_TYPE_OPEN:
419 			ops = &ocfs2_inode_open_lops;
420 			break;
421 		default:
422 			mlog_bug_on_msg(1, "type: %d\n", type);
423 			ops = NULL; /* thanks, gcc */
424 			break;
425 	};
426 
427 	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
428 			      generation, res->l_name);
429 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
430 }
431 
432 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
433 {
434 	struct inode *inode = ocfs2_lock_res_inode(lockres);
435 
436 	return OCFS2_SB(inode->i_sb);
437 }
438 
439 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
440 {
441 	struct ocfs2_file_private *fp = lockres->l_priv;
442 
443 	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
444 }
445 
446 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
447 {
448 	__be64 inode_blkno_be;
449 
450 	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
451 	       sizeof(__be64));
452 
453 	return be64_to_cpu(inode_blkno_be);
454 }
455 
456 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
457 {
458 	struct ocfs2_dentry_lock *dl = lockres->l_priv;
459 
460 	return OCFS2_SB(dl->dl_inode->i_sb);
461 }
462 
463 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
464 				u64 parent, struct inode *inode)
465 {
466 	int len;
467 	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
468 	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
469 	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
470 
471 	ocfs2_lock_res_init_once(lockres);
472 
473 	/*
474 	 * Unfortunately, the standard lock naming scheme won't work
475 	 * here because we have two 16 byte values to use. Instead,
476 	 * we'll stuff the inode number as a binary value. We still
477 	 * want error prints to show something without garbling the
478 	 * display, so drop a null byte in there before the inode
479 	 * number. A future version of OCFS2 will likely use all
480 	 * binary lock names. The stringified names have been a
481 	 * tremendous aid in debugging, but now that the debugfs
482 	 * interface exists, we can mangle things there if need be.
483 	 *
484 	 * NOTE: We also drop the standard "pad" value (the total lock
485 	 * name size stays the same though - the last part is all
486 	 * zeros due to the memset in ocfs2_lock_res_init_once()
487 	 */
488 	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
489 		       "%c%016llx",
490 		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
491 		       (long long)parent);
492 
493 	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
494 
495 	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
496 	       sizeof(__be64));
497 
498 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
499 				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
500 				   dl);
501 }
502 
/* Initialize the per-superblock "super" lockres, named after the fixed
 * superblock block number (generation 0). */
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}
514 
/* Initialize the per-superblock rename lockres (block number and
 * generation are both 0 - there is only one such lock). */
static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}
525 
/* Initialize the flock lockres attached to a file's private data. */
void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	/* flock locks are dropped as soon as possible; NOCACHE lets
	 * the blocking ast skip them (see ocfs2_blocking_ast()). */
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}
540 
/*
 * Tear down a lockres at object-destruction time.  BUGs if the lockres
 * still has holders, waiters, or is queued for downconvert - the caller
 * must have fully dropped the lock first.  A no-op for lockres that were
 * never initialized.
 */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
572 
573 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
574 				     int level)
575 {
576 	mlog_entry_void();
577 
578 	BUG_ON(!lockres);
579 
580 	switch(level) {
581 	case LKM_EXMODE:
582 		lockres->l_ex_holders++;
583 		break;
584 	case LKM_PRMODE:
585 		lockres->l_ro_holders++;
586 		break;
587 	default:
588 		BUG();
589 	}
590 
591 	mlog_exit_void();
592 }
593 
594 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
595 				     int level)
596 {
597 	mlog_entry_void();
598 
599 	BUG_ON(!lockres);
600 
601 	switch(level) {
602 	case LKM_EXMODE:
603 		BUG_ON(!lockres->l_ex_holders);
604 		lockres->l_ex_holders--;
605 		break;
606 	case LKM_PRMODE:
607 		BUG_ON(!lockres->l_ro_holders);
608 		lockres->l_ro_holders--;
609 		break;
610 	default:
611 		BUG();
612 	}
613 	mlog_exit_void();
614 }
615 
616 /* WARNING: This function lives in a world where the only three lock
617  * levels are EX, PR, and NL. It *will* have to be adjusted when more
618  * lock types are added. */
619 static inline int ocfs2_highest_compat_lock_level(int level)
620 {
621 	int new_level = LKM_EXMODE;
622 
623 	if (level == LKM_EXMODE)
624 		new_level = LKM_NLMODE;
625 	else if (level == LKM_PRMODE)
626 		new_level = LKM_PRMODE;
627 	return new_level;
628 }
629 
/*
 * Replace l_flags wholesale and wake any mask waiters whose
 * (mask, goal) condition is now satisfied.  Caller holds l_lock.
 */
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

 	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		/* Condition met: dequeue and complete this waiter. */
		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
/* Set flag bits; goes through lockres_set_flags() so waiters are woken. */
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
/* Clear flag bits; goes through lockres_set_flags() so waiters are woken. */
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}
657 
/*
 * AST handler body for a completed downconvert: commit the requested
 * level and, if the new level no longer conflicts with whatever was
 * blocking us, clear the BLOCKED state.  Caller holds l_lock.
 */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
677 
/*
 * AST handler body for a completed upconvert: commit the requested level
 * and flag a refresh when coming up from NL.  Caller holds l_lock.
 */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
698 
/*
 * AST handler body for an initial lock attach: commit the requested
 * level, mark the lockres ATTACHED, and flag a refresh when the new
 * level is meaningful and the lock was not created locally.
 * Caller holds l_lock.
 */
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > LKM_NLMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
717 
/*
 * Record that another node wants a lock at 'level'.  Marks the lockres
 * BLOCKED and returns nonzero if a (new) downconvert must be scheduled.
 * Caller holds l_lock.
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
743 
/*
 * o2dlm blocking AST callback: another node wants this lock at 'level'.
 * Queues the lockres for the downconvert thread when needed.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	/* A BAST for NL would be meaningless - nothing conflicts with NL. */
	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}
774 
/*
 * o2dlm AST callback: a lock request (attach, convert or downconvert)
 * completed.  Dispatches on the l_action recorded when the request was
 * issued, then wakes anyone waiting on the lockres.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	/* The AST can fire with an error status; log and bail without
	 * touching l_action so the failure is observable. */
	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
816 
/*
 * Undo the in-flight state set up before a dlmlock/dlmunlock call that
 * failed synchronously: clear BUSY and invalidate the pending action
 * ('convert' nonzero for lock/convert paths, zero for unlock paths),
 * then wake waiters.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}
834 
835 /* Note: If we detect another process working on the lock (i.e.,
836  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
837  * to do the right thing in that case.
838  */
839 static int ocfs2_lock_create(struct ocfs2_super *osb,
840 			     struct ocfs2_lock_res *lockres,
841 			     int level,
842 			     int dlm_flags)
843 {
844 	int ret = 0;
845 	enum dlm_status status = DLM_NORMAL;
846 	unsigned long flags;
847 
848 	mlog_entry_void();
849 
850 	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
851 	     dlm_flags);
852 
853 	spin_lock_irqsave(&lockres->l_lock, flags);
854 	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
855 	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
856 		spin_unlock_irqrestore(&lockres->l_lock, flags);
857 		goto bail;
858 	}
859 
860 	lockres->l_action = OCFS2_AST_ATTACH;
861 	lockres->l_requested = level;
862 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
863 	spin_unlock_irqrestore(&lockres->l_lock, flags);
864 
865 	status = dlmlock(osb->dlm,
866 			 level,
867 			 &lockres->l_lksb,
868 			 dlm_flags,
869 			 lockres->l_name,
870 			 OCFS2_LOCK_ID_MAX_LEN - 1,
871 			 ocfs2_locking_ast,
872 			 lockres,
873 			 ocfs2_blocking_ast);
874 	if (status != DLM_NORMAL) {
875 		ocfs2_log_dlm_error("dlmlock", status, lockres);
876 		ret = -EINVAL;
877 		ocfs2_recover_from_dlm_error(lockres, 1);
878 	}
879 
880 	mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
881 
882 bail:
883 	mlog_exit(ret);
884 	return ret;
885 }
886 
887 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
888 					int flag)
889 {
890 	unsigned long flags;
891 	int ret;
892 
893 	spin_lock_irqsave(&lockres->l_lock, flags);
894 	ret = lockres->l_flags & flag;
895 	spin_unlock_irqrestore(&lockres->l_lock, flags);
896 
897 	return ret;
898 }
899 
/* Sleep until the in-flight dlm operation completes (BUSY cleared). */
static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}
906 
/* Sleep until another thread finishes refreshing the lock's data. */
static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}
913 
914 /* predict what lock level we'll be dropping down to on behalf
915  * of another node, and return true if the currently wanted
916  * level will be compatible with it. */
/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	/* Only meaningful while a downconvert is pending. */
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
924 
/* Prepare a (typically stack-allocated) mask waiter for use. */
static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}
930 
/* Block (uninterruptibly) until the waiter is completed; returns the
 * status recorded by the completer. */
static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}
938 
/*
 * Queue 'mw' on the lockres so it is completed once
 * (l_flags & mask) == goal.  Caller holds l_lock; the waiter must not
 * already be queued.
 */
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}
952 
953 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
954  * if the mask still hadn't reached its goal */
/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* An empty mw_item means lockres_set_flags() already dequeued
	 * (and completed) this waiter - nothing to do then. */
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}
974 
975 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
976 					     struct ocfs2_lock_res *lockres)
977 {
978 	int ret;
979 
980 	ret = wait_for_completion_interruptible(&mw->mw_complete);
981 	if (ret)
982 		lockres_remove_mask_waiter(lockres, mw);
983 	else
984 		ret = mw->mw_status;
985 	/* Re-arm the completion in case we want to wait on it again */
986 	INIT_COMPLETION(mw->mw_complete);
987 	return ret;
988 }
989 
990 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
991 			      struct ocfs2_lock_res *lockres,
992 			      int level,
993 			      int lkm_flags,
994 			      int arg_flags)
995 {
996 	struct ocfs2_mask_waiter mw;
997 	enum dlm_status status;
998 	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
999 	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1000 	unsigned long flags;
1001 
1002 	mlog_entry_void();
1003 
1004 	ocfs2_init_mask_waiter(&mw);
1005 
1006 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1007 		lkm_flags |= LKM_VALBLK;
1008 
1009 again:
1010 	wait = 0;
1011 
1012 	if (catch_signals && signal_pending(current)) {
1013 		ret = -ERESTARTSYS;
1014 		goto out;
1015 	}
1016 
1017 	spin_lock_irqsave(&lockres->l_lock, flags);
1018 
1019 	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1020 			"Cluster lock called on freeing lockres %s! flags "
1021 			"0x%lx\n", lockres->l_name, lockres->l_flags);
1022 
1023 	/* We only compare against the currently granted level
1024 	 * here. If the lock is blocked waiting on a downconvert,
1025 	 * we'll get caught below. */
1026 	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1027 	    level > lockres->l_level) {
1028 		/* is someone sitting in dlm_lock? If so, wait on
1029 		 * them. */
1030 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1031 		wait = 1;
1032 		goto unlock;
1033 	}
1034 
1035 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1036 	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1037 		/* is the lock is currently blocked on behalf of
1038 		 * another node */
1039 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1040 		wait = 1;
1041 		goto unlock;
1042 	}
1043 
1044 	if (level > lockres->l_level) {
1045 		if (lockres->l_action != OCFS2_AST_INVALID)
1046 			mlog(ML_ERROR, "lockres %s has action %u pending\n",
1047 			     lockres->l_name, lockres->l_action);
1048 
1049 		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1050 			lockres->l_action = OCFS2_AST_ATTACH;
1051 			lkm_flags &= ~LKM_CONVERT;
1052 		} else {
1053 			lockres->l_action = OCFS2_AST_CONVERT;
1054 			lkm_flags |= LKM_CONVERT;
1055 		}
1056 
1057 		lockres->l_requested = level;
1058 		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1059 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1060 
1061 		BUG_ON(level == LKM_IVMODE);
1062 		BUG_ON(level == LKM_NLMODE);
1063 
1064 		mlog(0, "lock %s, convert from %d to level = %d\n",
1065 		     lockres->l_name, lockres->l_level, level);
1066 
1067 		/* call dlm_lock to upgrade lock now */
1068 		status = dlmlock(osb->dlm,
1069 				 level,
1070 				 &lockres->l_lksb,
1071 				 lkm_flags,
1072 				 lockres->l_name,
1073 				 OCFS2_LOCK_ID_MAX_LEN - 1,
1074 				 ocfs2_locking_ast,
1075 				 lockres,
1076 				 ocfs2_blocking_ast);
1077 		if (status != DLM_NORMAL) {
1078 			if ((lkm_flags & LKM_NOQUEUE) &&
1079 			    (status == DLM_NOTQUEUED))
1080 				ret = -EAGAIN;
1081 			else {
1082 				ocfs2_log_dlm_error("dlmlock", status,
1083 						    lockres);
1084 				ret = -EINVAL;
1085 			}
1086 			ocfs2_recover_from_dlm_error(lockres, 1);
1087 			goto out;
1088 		}
1089 
1090 		mlog(0, "lock %s, successfull return from dlmlock\n",
1091 		     lockres->l_name);
1092 
1093 		/* At this point we've gone inside the dlm and need to
1094 		 * complete our work regardless. */
1095 		catch_signals = 0;
1096 
1097 		/* wait for busy to clear and carry on */
1098 		goto again;
1099 	}
1100 
1101 	/* Ok, if we get here then we're good to go. */
1102 	ocfs2_inc_holders(lockres, level);
1103 
1104 	ret = 0;
1105 unlock:
1106 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1107 out:
1108 	/*
1109 	 * This is helping work around a lock inversion between the page lock
1110 	 * and dlm locks.  One path holds the page lock while calling aops
1111 	 * which block acquiring dlm locks.  The voting thread holds dlm
1112 	 * locks while acquiring page locks while down converting data locks.
1113 	 * This block is helping an aop path notice the inversion and back
1114 	 * off to unlock its page lock before trying the dlm lock again.
1115 	 */
1116 	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1117 	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1118 		wait = 0;
1119 		if (lockres_remove_mask_waiter(lockres, &mw))
1120 			ret = -EAGAIN;
1121 		else
1122 			goto again;
1123 	}
1124 	if (wait) {
1125 		ret = ocfs2_wait_for_mask(&mw);
1126 		if (ret == 0)
1127 			goto again;
1128 		mlog_errno(ret);
1129 	}
1130 
1131 	mlog_exit(ret);
1132 	return ret;
1133 }
1134 
1135 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1136 				 struct ocfs2_lock_res *lockres,
1137 				 int level)
1138 {
1139 	unsigned long flags;
1140 
1141 	mlog_entry_void();
1142 	spin_lock_irqsave(&lockres->l_lock, flags);
1143 	ocfs2_dec_holders(lockres, level);
1144 	ocfs2_downconvert_on_unlock(osb, lockres);
1145 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1146 	mlog_exit_void();
1147 }
1148 
1149 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1150 				 struct ocfs2_lock_res *lockres,
1151 				 int ex,
1152 				 int local)
1153 {
1154 	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1155 	unsigned long flags;
1156 	int lkm_flags = local ? LKM_LOCAL : 0;
1157 
1158 	spin_lock_irqsave(&lockres->l_lock, flags);
1159 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1160 	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1161 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1162 
1163 	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1164 }
1165 
1166 /* Grants us an EX lock on the data and metadata resources, skipping
1167  * the normal cluster directory lookup. Use this ONLY on newly created
1168  * inodes which other nodes can't possibly see, and which haven't been
1169  * hashed in the inode hash yet. This can give us a good performance
1170  * increase as it'll skip the network broadcast normally associated
1171  * with creating a new lock resource. */
1172 int ocfs2_create_new_inode_locks(struct inode *inode)
1173 {
1174 	int ret;
1175 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1176 
1177 	BUG_ON(!inode);
1178 	BUG_ON(!ocfs2_inode_is_new(inode));
1179 
1180 	mlog_entry_void();
1181 
1182 	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1183 
1184 	/* NOTE: That we don't increment any of the holder counts, nor
1185 	 * do we add anything to a journal handle. Since this is
1186 	 * supposed to be a new inode which the cluster doesn't know
1187 	 * about yet, there is no need to.  As far as the LVB handling
1188 	 * is concerned, this is basically like acquiring an EX lock
1189 	 * on a resource which has an invalid one -- we'll set it
1190 	 * valid when we release the EX. */
1191 
1192 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1193 	if (ret) {
1194 		mlog_errno(ret);
1195 		goto bail;
1196 	}
1197 
1198 	/*
1199 	 * We don't want to use LKM_LOCAL on a meta data lock as they
1200 	 * don't use a generation in their lock names.
1201 	 */
1202 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1203 	if (ret) {
1204 		mlog_errno(ret);
1205 		goto bail;
1206 	}
1207 
1208 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1209 	if (ret) {
1210 		mlog_errno(ret);
1211 		goto bail;
1212 	}
1213 
1214 bail:
1215 	mlog_exit(ret);
1216 	return ret;
1217 }
1218 
1219 int ocfs2_rw_lock(struct inode *inode, int write)
1220 {
1221 	int status, level;
1222 	struct ocfs2_lock_res *lockres;
1223 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1224 
1225 	BUG_ON(!inode);
1226 
1227 	mlog_entry_void();
1228 
1229 	mlog(0, "inode %llu take %s RW lock\n",
1230 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1231 	     write ? "EXMODE" : "PRMODE");
1232 
1233 	if (ocfs2_mount_local(osb))
1234 		return 0;
1235 
1236 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1237 
1238 	level = write ? LKM_EXMODE : LKM_PRMODE;
1239 
1240 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1241 				    0);
1242 	if (status < 0)
1243 		mlog_errno(status);
1244 
1245 	mlog_exit(status);
1246 	return status;
1247 }
1248 
1249 void ocfs2_rw_unlock(struct inode *inode, int write)
1250 {
1251 	int level = write ? LKM_EXMODE : LKM_PRMODE;
1252 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1253 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1254 
1255 	mlog_entry_void();
1256 
1257 	mlog(0, "inode %llu drop %s RW lock\n",
1258 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1259 	     write ? "EXMODE" : "PRMODE");
1260 
1261 	if (!ocfs2_mount_local(osb))
1262 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1263 
1264 	mlog_exit_void();
1265 }
1266 
1267 /*
1268  * ocfs2_open_lock always get PR mode lock.
1269  */
1270 int ocfs2_open_lock(struct inode *inode)
1271 {
1272 	int status = 0;
1273 	struct ocfs2_lock_res *lockres;
1274 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1275 
1276 	BUG_ON(!inode);
1277 
1278 	mlog_entry_void();
1279 
1280 	mlog(0, "inode %llu take PRMODE open lock\n",
1281 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1282 
1283 	if (ocfs2_mount_local(osb))
1284 		goto out;
1285 
1286 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1287 
1288 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1289 				    LKM_PRMODE, 0, 0);
1290 	if (status < 0)
1291 		mlog_errno(status);
1292 
1293 out:
1294 	mlog_exit(status);
1295 	return status;
1296 }
1297 
1298 int ocfs2_try_open_lock(struct inode *inode, int write)
1299 {
1300 	int status = 0, level;
1301 	struct ocfs2_lock_res *lockres;
1302 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1303 
1304 	BUG_ON(!inode);
1305 
1306 	mlog_entry_void();
1307 
1308 	mlog(0, "inode %llu try to take %s open lock\n",
1309 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1310 	     write ? "EXMODE" : "PRMODE");
1311 
1312 	if (ocfs2_mount_local(osb))
1313 		goto out;
1314 
1315 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1316 
1317 	level = write ? LKM_EXMODE : LKM_PRMODE;
1318 
1319 	/*
1320 	 * The file system may already holding a PRMODE/EXMODE open lock.
1321 	 * Since we pass LKM_NOQUEUE, the request won't block waiting on
1322 	 * other nodes and the -EAGAIN will indicate to the caller that
1323 	 * this inode is still in use.
1324 	 */
1325 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1326 				    level, LKM_NOQUEUE, 0);
1327 
1328 out:
1329 	mlog_exit(status);
1330 	return status;
1331 }
1332 
1333 /*
1334  * ocfs2_open_unlock unlock PR and EX mode open locks.
1335  */
1336 void ocfs2_open_unlock(struct inode *inode)
1337 {
1338 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1339 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1340 
1341 	mlog_entry_void();
1342 
1343 	mlog(0, "inode %llu drop open lock\n",
1344 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1345 
1346 	if (ocfs2_mount_local(osb))
1347 		goto out;
1348 
1349 	if(lockres->l_ro_holders)
1350 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1351 				     LKM_PRMODE);
1352 	if(lockres->l_ex_holders)
1353 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1354 				     LKM_EXMODE);
1355 
1356 out:
1357 	mlog_exit_void();
1358 }
1359 
/*
 * Called when a signal interrupted a pending flock() request. We must
 * not leave here with an outstanding DLM operation, so try to cancel
 * the in-flight convert (or wait it out if cancellation is no longer
 * possible). Returns 0 if the lock ended up granted at @level anyway
 * (no point restarting the syscall), -ERESTARTSYS otherwise, or a
 * negative error from the cancel itself.
 */
static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
				     int level)
{
	int ret;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

retry_cancel:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		if (ret) {
			/* A cancel was prepared - fire it off and then
			 * loop back to re-check the lockres state. */
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0) {
				mlog_errno(ret);
				goto out;
			}
			goto retry_cancel;
		}
		/* No cancel possible right now - wait for the pending
		 * operation to clear BUSY and re-evaluate. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_for_mask(&mw);
		goto retry_cancel;
	}

	ret = -ERESTARTSYS;
	/*
	 * We may still have gotten the lock, in which case there's no
	 * point to restarting the syscall.
	 */
	if (lockres->l_level == level)
		ret = 0;

	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
	     lockres->l_flags, lockres->l_level, lockres->l_action);

	spin_unlock_irqrestore(&lockres->l_lock, flags);

out:
	return ret;
}
1406 
1407 /*
1408  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1409  * flock() calls. The locking approach this requires is sufficiently
1410  * different from all other cluster lock types that we implement a
1411  * seperate path to the "low-level" dlm calls. In particular:
1412  *
1413  * - No optimization of lock levels is done - we take at exactly
1414  *   what's been requested.
1415  *
1416  * - No lock caching is employed. We immediately downconvert to
1417  *   no-lock at unlock time. This also means flock locks never go on
1418  *   the blocking list).
1419  *
1420  * - Since userspace can trivially deadlock itself with flock, we make
1421  *   sure to allow cancellation of a misbehaving applications flock()
1422  *   request.
1423  *
1424  * - Access to any flock lockres doesn't require concurrency, so we
1425  *   can simplify the code by requiring the caller to guarantee
1426  *   serialization of dlmglue flock calls.
1427  */
1428 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1429 {
1430 	int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
1431 	unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
1432 	unsigned long flags;
1433 	struct ocfs2_file_private *fp = file->private_data;
1434 	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1435 	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1436 	struct ocfs2_mask_waiter mw;
1437 
1438 	ocfs2_init_mask_waiter(&mw);
1439 
1440 	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1441 	    (lockres->l_level > LKM_NLMODE)) {
1442 		mlog(ML_ERROR,
1443 		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1444 		     "level: %u\n", lockres->l_name, lockres->l_flags,
1445 		     lockres->l_level);
1446 		return -EINVAL;
1447 	}
1448 
1449 	spin_lock_irqsave(&lockres->l_lock, flags);
1450 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1451 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1452 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1453 
1454 		/*
1455 		 * Get the lock at NLMODE to start - that way we
1456 		 * can cancel the upconvert request if need be.
1457 		 */
1458 		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
1459 		if (ret < 0) {
1460 			mlog_errno(ret);
1461 			goto out;
1462 		}
1463 
1464 		ret = ocfs2_wait_for_mask(&mw);
1465 		if (ret) {
1466 			mlog_errno(ret);
1467 			goto out;
1468 		}
1469 		spin_lock_irqsave(&lockres->l_lock, flags);
1470 	}
1471 
1472 	lockres->l_action = OCFS2_AST_CONVERT;
1473 	lkm_flags |= LKM_CONVERT;
1474 	lockres->l_requested = level;
1475 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1476 
1477 	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1478 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1479 
1480 	ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
1481 		      lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1482 		      ocfs2_locking_ast, lockres, ocfs2_blocking_ast);
1483 	if (ret != DLM_NORMAL) {
1484 		if (trylock && ret == DLM_NOTQUEUED)
1485 			ret = -EAGAIN;
1486 		else {
1487 			ocfs2_log_dlm_error("dlmlock", ret, lockres);
1488 			ret = -EINVAL;
1489 		}
1490 
1491 		ocfs2_recover_from_dlm_error(lockres, 1);
1492 		lockres_remove_mask_waiter(lockres, &mw);
1493 		goto out;
1494 	}
1495 
1496 	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1497 	if (ret == -ERESTARTSYS) {
1498 		/*
1499 		 * Userspace can cause deadlock itself with
1500 		 * flock(). Current behavior locally is to allow the
1501 		 * deadlock, but abort the system call if a signal is
1502 		 * received. We follow this example, otherwise a
1503 		 * poorly written program could sit in kernel until
1504 		 * reboot.
1505 		 *
1506 		 * Handling this is a bit more complicated for Ocfs2
1507 		 * though. We can't exit this function with an
1508 		 * outstanding lock request, so a cancel convert is
1509 		 * required. We intentionally overwrite 'ret' - if the
1510 		 * cancel fails and the lock was granted, it's easier
1511 		 * to just bubble sucess back up to the user.
1512 		 */
1513 		ret = ocfs2_flock_handle_signal(lockres, level);
1514 	}
1515 
1516 out:
1517 
1518 	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1519 	     lockres->l_name, ex, trylock, ret);
1520 	return ret;
1521 }
1522 
/*
 * Drop an flock lock back to NLMODE. We fake a blocking ast so the
 * normal downconvert machinery does the work; if the lock was never
 * attached or is already at NLMODE there is nothing to do.
 */
void ocfs2_file_unlock(struct file *file)
{
	int ret;
	unsigned long flags;
	struct ocfs2_file_private *fp = file->private_data;
	struct ocfs2_lock_res *lockres = &fp->fp_flock;
	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

	/* Lockless checks - the caller serializes flock calls on this
	 * lockres (see the comment above ocfs2_file_lock()). */
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
		return;

	if (lockres->l_level == LKM_NLMODE)
		return;

	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
	     lockres->l_name, lockres->l_flags, lockres->l_level,
	     lockres->l_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/*
	 * Fake a blocking ast for the downconvert code.
	 */
	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
	lockres->l_blocking = LKM_EXMODE;

	ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
	if (ret) {
		mlog_errno(ret);
		return;
	}

	/* Wait for the downconvert to finish before returning. */
	ret = ocfs2_wait_for_mask(&mw);
	if (ret)
		mlog_errno(ret);
}
1565 
1566 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1567 					struct ocfs2_lock_res *lockres)
1568 {
1569 	int kick = 0;
1570 
1571 	mlog_entry_void();
1572 
1573 	/* If we know that another node is waiting on our lock, kick
1574 	 * the downconvert thread * pre-emptively when we reach a release
1575 	 * condition. */
1576 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1577 		switch(lockres->l_blocking) {
1578 		case LKM_EXMODE:
1579 			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1580 				kick = 1;
1581 			break;
1582 		case LKM_PRMODE:
1583 			if (!lockres->l_ex_holders)
1584 				kick = 1;
1585 			break;
1586 		default:
1587 			BUG();
1588 		}
1589 	}
1590 
1591 	if (kick)
1592 		ocfs2_wake_downconvert_thread(osb);
1593 
1594 	mlog_exit_void();
1595 }
1596 
1597 #define OCFS2_SEC_BITS   34
1598 #define OCFS2_SEC_SHIFT  (64 - 34)
1599 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1600 
1601 /* LVB only has room for 64 bits of time here so we pack it for
1602  * now. */
1603 static u64 ocfs2_pack_timespec(struct timespec *spec)
1604 {
1605 	u64 res;
1606 	u64 sec = spec->tv_sec;
1607 	u32 nsec = spec->tv_nsec;
1608 
1609 	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1610 
1611 	return res;
1612 }
1613 
/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_inode_lock right now.
 *
 * Packs the inode's metadata into the lock's LVB so other nodes can
 * refresh from it without hitting disk. All multi-byte LVB fields are
 * stored big-endian. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	/* Timestamps are packed to fit in 64 bits each - see
	 * ocfs2_pack_timespec(). */
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}
1659 
1660 static void ocfs2_unpack_timespec(struct timespec *spec,
1661 				  u64 packed_time)
1662 {
1663 	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1664 	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1665 }
1666 
/*
 * Populate the in-memory inode from the big-endian values packed into
 * the meta LVB. Only called after ocfs2_meta_lvb_is_trustable() has
 * confirmed the LVB matches this inode's generation.
 */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks = ocfs2_inode_sector_count(inode);

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}
1708 
1709 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1710 					      struct ocfs2_lock_res *lockres)
1711 {
1712 	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1713 
1714 	if (lvb->lvb_version == OCFS2_LVB_VERSION
1715 	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1716 		return 1;
1717 	return 0;
1718 }
1719 
/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 *   0 means no refresh needed.
 *
 *   > 0 means you need to refresh this and you MUST call
 *   ocfs2_complete_lock_res_refresh afterwards. */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* Someone else is refreshing - wait for them to finish
		 * and then re-check whether a refresh is still needed. */
		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}
1757 
1758 /* If status is non zero, I'll mark it as not being in refresh
1759  * anymroe, but i won't clear the needs refresh flag. */
1760 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1761 						   int status)
1762 {
1763 	unsigned long flags;
1764 	mlog_entry_void();
1765 
1766 	spin_lock_irqsave(&lockres->l_lock, flags);
1767 	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1768 	if (!status)
1769 		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1770 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1771 
1772 	wake_up(&lockres->l_event);
1773 
1774 	mlog_exit_void();
1775 }
1776 
/* Refresh the in-memory inode after a cluster lock was granted.
 * May or may not return a bh in *@bh, depending on whether it had to
 * go to disk (i.e. the LVB wasn't trustable). Returns 0 on success,
 * -ENOENT if the inode was deleted while we waited on the lock, or a
 * read error. */
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
	struct ocfs2_dinode *fe;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	/* Local mounts never go stale - nothing to refresh. */
	if (ocfs2_mount_local(osb))
		goto bail;

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	/* Arbitrates so only one task does the refresh; if we win we
	 * must call ocfs2_complete_lock_res_refresh() below. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}
1864 
1865 static int ocfs2_assign_bh(struct inode *inode,
1866 			   struct buffer_head **ret_bh,
1867 			   struct buffer_head *passed_bh)
1868 {
1869 	int status;
1870 
1871 	if (passed_bh) {
1872 		/* Ok, the update went to disk for us, use the
1873 		 * returned bh. */
1874 		*ret_bh = passed_bh;
1875 		get_bh(*ret_bh);
1876 
1877 		return 0;
1878 	}
1879 
1880 	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1881 				  OCFS2_I(inode)->ip_blkno,
1882 				  ret_bh,
1883 				  OCFS2_BH_CACHED,
1884 				  inode);
1885 	if (status < 0)
1886 		mlog_errno(status);
1887 
1888 	return status;
1889 }
1890 
1891 /*
1892  * returns < 0 error if the callback will never be called, otherwise
1893  * the result of the lock will be communicated via the callback.
1894  */
1895 int ocfs2_inode_lock_full(struct inode *inode,
1896 			 struct buffer_head **ret_bh,
1897 			 int ex,
1898 			 int arg_flags)
1899 {
1900 	int status, level, dlm_flags, acquired;
1901 	struct ocfs2_lock_res *lockres = NULL;
1902 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1903 	struct buffer_head *local_bh = NULL;
1904 
1905 	BUG_ON(!inode);
1906 
1907 	mlog_entry_void();
1908 
1909 	mlog(0, "inode %llu, take %s META lock\n",
1910 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1911 	     ex ? "EXMODE" : "PRMODE");
1912 
1913 	status = 0;
1914 	acquired = 0;
1915 	/* We'll allow faking a readonly metadata lock for
1916 	 * rodevices. */
1917 	if (ocfs2_is_hard_readonly(osb)) {
1918 		if (ex)
1919 			status = -EROFS;
1920 		goto bail;
1921 	}
1922 
1923 	if (ocfs2_mount_local(osb))
1924 		goto local;
1925 
1926 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1927 		wait_event(osb->recovery_event,
1928 			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1929 
1930 	lockres = &OCFS2_I(inode)->ip_inode_lockres;
1931 	level = ex ? LKM_EXMODE : LKM_PRMODE;
1932 	dlm_flags = 0;
1933 	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1934 		dlm_flags |= LKM_NOQUEUE;
1935 
1936 	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1937 	if (status < 0) {
1938 		if (status != -EAGAIN && status != -EIOCBRETRY)
1939 			mlog_errno(status);
1940 		goto bail;
1941 	}
1942 
1943 	/* Notify the error cleanup path to drop the cluster lock. */
1944 	acquired = 1;
1945 
1946 	/* We wait twice because a node may have died while we were in
1947 	 * the lower dlm layers. The second time though, we've
1948 	 * committed to owning this lock so we don't allow signals to
1949 	 * abort the operation. */
1950 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1951 		wait_event(osb->recovery_event,
1952 			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1953 
1954 local:
1955 	/*
1956 	 * We only see this flag if we're being called from
1957 	 * ocfs2_read_locked_inode(). It means we're locking an inode
1958 	 * which hasn't been populated yet, so clear the refresh flag
1959 	 * and let the caller handle it.
1960 	 */
1961 	if (inode->i_state & I_NEW) {
1962 		status = 0;
1963 		if (lockres)
1964 			ocfs2_complete_lock_res_refresh(lockres, 0);
1965 		goto bail;
1966 	}
1967 
1968 	/* This is fun. The caller may want a bh back, or it may
1969 	 * not. ocfs2_inode_lock_update definitely wants one in, but
1970 	 * may or may not read one, depending on what's in the
1971 	 * LVB. The result of all of this is that we've *only* gone to
1972 	 * disk if we have to, so the complexity is worthwhile. */
1973 	status = ocfs2_inode_lock_update(inode, &local_bh);
1974 	if (status < 0) {
1975 		if (status != -ENOENT)
1976 			mlog_errno(status);
1977 		goto bail;
1978 	}
1979 
1980 	if (ret_bh) {
1981 		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1982 		if (status < 0) {
1983 			mlog_errno(status);
1984 			goto bail;
1985 		}
1986 	}
1987 
1988 bail:
1989 	if (status < 0) {
1990 		if (ret_bh && (*ret_bh)) {
1991 			brelse(*ret_bh);
1992 			*ret_bh = NULL;
1993 		}
1994 		if (acquired)
1995 			ocfs2_inode_unlock(inode, ex);
1996 	}
1997 
1998 	if (local_bh)
1999 		brelse(local_bh);
2000 
2001 	mlog_exit(status);
2002 	return status;
2003 }
2004 
2005 /*
2006  * This is working around a lock inversion between tasks acquiring DLM
2007  * locks while holding a page lock and the downconvert thread which
2008  * blocks dlm lock acquiry while acquiring page locks.
2009  *
 * ** These _with_page variants are only intended to be called from aop
2011  * methods that hold page locks and return a very specific *positive* error
2012  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2013  *
2014  * The DLM is called such that it returns -EAGAIN if it would have
2015  * blocked waiting for the downconvert thread.  In that case we unlock
2016  * our page so the downconvert thread can make progress.  Once we've
2017  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2018  * that called us can bubble that back up into the VFS who will then
2019  * immediately retry the aop call.
2020  *
2021  * We do a blocking lock and immediate unlock before returning, though, so that
2022  * the lock has a great chance of being cached on this node by the time the VFS
2023  * calls back to retry the aop.    This has a potential to livelock as nodes
2024  * ping locks back and forth, but that's a risk we're willing to take to avoid
2025  * the lock inversion simply.
2026  */
2027 int ocfs2_inode_lock_with_page(struct inode *inode,
2028 			      struct buffer_head **ret_bh,
2029 			      int ex,
2030 			      struct page *page)
2031 {
2032 	int ret;
2033 
2034 	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2035 	if (ret == -EAGAIN) {
2036 		unlock_page(page);
2037 		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2038 			ocfs2_inode_unlock(inode, ex);
2039 		ret = AOP_TRUNCATED_PAGE;
2040 	}
2041 
2042 	return ret;
2043 }
2044 
2045 int ocfs2_inode_lock_atime(struct inode *inode,
2046 			  struct vfsmount *vfsmnt,
2047 			  int *level)
2048 {
2049 	int ret;
2050 
2051 	mlog_entry_void();
2052 	ret = ocfs2_inode_lock(inode, NULL, 0);
2053 	if (ret < 0) {
2054 		mlog_errno(ret);
2055 		return ret;
2056 	}
2057 
2058 	/*
2059 	 * If we should update atime, we will get EX lock,
2060 	 * otherwise we just get PR lock.
2061 	 */
2062 	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2063 		struct buffer_head *bh = NULL;
2064 
2065 		ocfs2_inode_unlock(inode, 0);
2066 		ret = ocfs2_inode_lock(inode, &bh, 1);
2067 		if (ret < 0) {
2068 			mlog_errno(ret);
2069 			return ret;
2070 		}
2071 		*level = 1;
2072 		if (ocfs2_should_update_atime(inode, vfsmnt))
2073 			ocfs2_update_inode_atime(inode, bh);
2074 		if (bh)
2075 			brelse(bh);
2076 	} else
2077 		*level = 0;
2078 
2079 	mlog_exit(ret);
2080 	return ret;
2081 }
2082 
2083 void ocfs2_inode_unlock(struct inode *inode,
2084 		       int ex)
2085 {
2086 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2087 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2088 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2089 
2090 	mlog_entry_void();
2091 
2092 	mlog(0, "inode %llu drop %s META lock\n",
2093 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2094 	     ex ? "EXMODE" : "PRMODE");
2095 
2096 	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2097 	    !ocfs2_mount_local(osb))
2098 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2099 
2100 	mlog_exit_void();
2101 }
2102 
/*
 * Take the global superblock cluster lock at EX (ex != 0) or PR.
 *
 * If this acquire needs a refresh (another node may have changed
 * on-disk state since we last held it), we re-read the slot map while
 * still holding the lock.  Returns 0 on success, -EROFS on a
 * hard-readonly mount, or a negative error.
 */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status = 0;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	/* Local mounts have no cluster to arbitrate with. */
	if (ocfs2_mount_local(osb))
		goto bail;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* Positive return: we own the refresh.  Re-read the
		 * slot map block and update the in-memory slot info. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		/* Complete the refresh whether or not the read worked;
		 * status carries any failure for waiters to see. */
		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}
2151 
2152 void ocfs2_super_unlock(struct ocfs2_super *osb,
2153 			int ex)
2154 {
2155 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2156 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2157 
2158 	if (!ocfs2_mount_local(osb))
2159 		ocfs2_cluster_unlock(osb, lockres, level);
2160 }
2161 
2162 int ocfs2_rename_lock(struct ocfs2_super *osb)
2163 {
2164 	int status;
2165 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2166 
2167 	if (ocfs2_is_hard_readonly(osb))
2168 		return -EROFS;
2169 
2170 	if (ocfs2_mount_local(osb))
2171 		return 0;
2172 
2173 	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
2174 	if (status < 0)
2175 		mlog_errno(status);
2176 
2177 	return status;
2178 }
2179 
2180 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2181 {
2182 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2183 
2184 	if (!ocfs2_mount_local(osb))
2185 		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
2186 }
2187 
2188 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2189 {
2190 	int ret;
2191 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2192 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2193 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2194 
2195 	BUG_ON(!dl);
2196 
2197 	if (ocfs2_is_hard_readonly(osb))
2198 		return -EROFS;
2199 
2200 	if (ocfs2_mount_local(osb))
2201 		return 0;
2202 
2203 	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2204 	if (ret < 0)
2205 		mlog_errno(ret);
2206 
2207 	return ret;
2208 }
2209 
2210 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2211 {
2212 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2213 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2214 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2215 
2216 	if (!ocfs2_mount_local(osb))
2217 		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2218 }
2219 
2220 /* Reference counting of the dlm debug structure. We want this because
2221  * open references on the debug inodes can live on after a mount, so
2222  * we can't rely on the ocfs2_super to always exist. */
2223 static void ocfs2_dlm_debug_free(struct kref *kref)
2224 {
2225 	struct ocfs2_dlm_debug *dlm_debug;
2226 
2227 	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2228 
2229 	kfree(dlm_debug);
2230 }
2231 
2232 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2233 {
2234 	if (dlm_debug)
2235 		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2236 }
2237 
2238 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2239 {
2240 	kref_get(&debug->d_refcnt);
2241 }
2242 
2243 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2244 {
2245 	struct ocfs2_dlm_debug *dlm_debug;
2246 
2247 	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2248 	if (!dlm_debug) {
2249 		mlog_errno(-ENOMEM);
2250 		goto out;
2251 	}
2252 
2253 	kref_init(&dlm_debug->d_refcnt);
2254 	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2255 	dlm_debug->d_locking_state = NULL;
2256 out:
2257 	return dlm_debug;
2258 }
2259 
/* Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;	/* ref held for the open's lifetime */
	struct ocfs2_lock_res p_iter_res;	/* dummy cursor on the tracking list */
	struct ocfs2_lock_res p_tmp_res;	/* snapshot copy handed to ->show */
};
2266 
/*
 * Walk forward from 'start' to the next real lockres on the debug
 * tracking list, skipping dummy iteration cursors.  Returns NULL at
 * the end of the list.  Caller must hold ocfs2_dlm_tracking_lock.
 */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}
2292 
/*
 * seq_file ->start: return the first real lockres after our dummy
 * cursor, as a stable snapshot.
 *
 * NOTE(review): *pos is ignored; positioning relies entirely on where
 * the dummy cursor sits in the tracking list.
 */
static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
2315 
/* seq_file ->stop: nothing to release — ->start dropped its lock already. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
2319 
2320 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2321 {
2322 	struct ocfs2_dlm_seq_priv *priv = m->private;
2323 	struct ocfs2_lock_res *iter = v;
2324 	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2325 
2326 	spin_lock(&ocfs2_dlm_tracking_lock);
2327 	iter = ocfs2_dlm_next_res(iter, priv);
2328 	list_del_init(&dummy->l_debug_list);
2329 	if (iter) {
2330 		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2331 		priv->p_tmp_res = *iter;
2332 		iter = &priv->p_tmp_res;
2333 	}
2334 	spin_unlock(&ocfs2_dlm_tracking_lock);
2335 
2336 	return iter;
2337 }
2338 
2339 /* So that debugfs.ocfs2 can determine which format is being used */
2340 #define OCFS2_DLM_DEBUG_STR_VERSION 1
2341 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2342 {
2343 	int i;
2344 	char *lvb;
2345 	struct ocfs2_lock_res *lockres = v;
2346 
2347 	if (!lockres)
2348 		return -EINVAL;
2349 
2350 	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2351 
2352 	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2353 		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2354 			   lockres->l_name,
2355 			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2356 	else
2357 		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2358 
2359 	seq_printf(m, "%d\t"
2360 		   "0x%lx\t"
2361 		   "0x%x\t"
2362 		   "0x%x\t"
2363 		   "%u\t"
2364 		   "%u\t"
2365 		   "%d\t"
2366 		   "%d\t",
2367 		   lockres->l_level,
2368 		   lockres->l_flags,
2369 		   lockres->l_action,
2370 		   lockres->l_unlock_action,
2371 		   lockres->l_ro_holders,
2372 		   lockres->l_ex_holders,
2373 		   lockres->l_requested,
2374 		   lockres->l_blocking);
2375 
2376 	/* Dump the raw LVB */
2377 	lvb = lockres->l_lksb.lvb;
2378 	for(i = 0; i < DLM_LVB_LEN; i++)
2379 		seq_printf(m, "0x%x\t", lvb[i]);
2380 
2381 	/* End the line */
2382 	seq_printf(m, "\n");
2383 	return 0;
2384 }
2385 
2386 static struct seq_operations ocfs2_dlm_seq_ops = {
2387 	.start =	ocfs2_dlm_seq_start,
2388 	.stop =		ocfs2_dlm_seq_stop,
2389 	.next =		ocfs2_dlm_seq_next,
2390 	.show =		ocfs2_dlm_seq_show,
2391 };
2392 
2393 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2394 {
2395 	struct seq_file *seq = (struct seq_file *) file->private_data;
2396 	struct ocfs2_dlm_seq_priv *priv = seq->private;
2397 	struct ocfs2_lock_res *res = &priv->p_iter_res;
2398 
2399 	ocfs2_remove_lockres_tracking(res);
2400 	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2401 	return seq_release_private(inode, file);
2402 }
2403 
2404 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2405 {
2406 	int ret;
2407 	struct ocfs2_dlm_seq_priv *priv;
2408 	struct seq_file *seq;
2409 	struct ocfs2_super *osb;
2410 
2411 	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2412 	if (!priv) {
2413 		ret = -ENOMEM;
2414 		mlog_errno(ret);
2415 		goto out;
2416 	}
2417 	osb = inode->i_private;
2418 	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2419 	priv->p_dlm_debug = osb->osb_dlm_debug;
2420 	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2421 
2422 	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2423 	if (ret) {
2424 		kfree(priv);
2425 		mlog_errno(ret);
2426 		goto out;
2427 	}
2428 
2429 	seq = (struct seq_file *) file->private_data;
2430 	seq->private = priv;
2431 
2432 	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2433 				   priv->p_dlm_debug);
2434 
2435 out:
2436 	return ret;
2437 }
2438 
/* File operations for the "locking_state" debugfs file. */
static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =		ocfs2_dlm_debug_open,
	.release =	ocfs2_dlm_debug_release,
	.read =		seq_read,
	.llseek =	seq_lseek,
};
2445 
2446 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2447 {
2448 	int ret = 0;
2449 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2450 
2451 	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2452 							 S_IFREG|S_IRUSR,
2453 							 osb->osb_debug_root,
2454 							 osb,
2455 							 &ocfs2_dlm_debug_fops);
2456 	if (!dlm_debug->d_locking_state) {
2457 		ret = -EINVAL;
2458 		mlog(ML_ERROR,
2459 		     "Unable to create locking state debugfs file.\n");
2460 		goto out;
2461 	}
2462 
2463 	ocfs2_get_dlm_debug(dlm_debug);
2464 out:
2465 	return ret;
2466 }
2467 
2468 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2469 {
2470 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2471 
2472 	if (dlm_debug) {
2473 		debugfs_remove(dlm_debug->d_locking_state);
2474 		ocfs2_put_dlm_debug(dlm_debug);
2475 	}
2476 }
2477 
/*
 * Bring up this mount's dlm state: debugfs file, downconvert thread,
 * and dlm domain registration (domain name == mount uuid).  Local
 * mounts skip all of that and only initialize the osb lock resources.
 *
 * On failure the partial setup (debugfs, downconvert thread) is torn
 * down before returning the error.
 */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status = 0;
	u32 dlm_key;
	struct dlm_ctxt *dlm = NULL;

	mlog_entry_void();

	/* Local mounts need no dlm domain or downconvert thread. */
	if (ocfs2_mount_local(osb))
		goto local;

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch downconvert thread */
	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
	if (IS_ERR(osb->dc_task)) {
		status = PTR_ERR(osb->dc_task);
		osb->dc_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* used by the dlm code to make message headers unique, each
	 * node in this domain must agree on this. */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

local:
	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	/* NULL for local mounts — callers must not assume a domain exists. */
	osb->dlm = dlm;

	status = 0;
bail:
	if (status < 0) {
		/* Undo whatever partial setup succeeded above. */
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->dc_task)
			kthread_stop(osb->dc_task);
	}

	mlog_exit(status);
	return status;
}
2535 
/*
 * Tear down this mount's dlm state in the reverse of init order:
 * eviction callback, osb-global locks, downconvert thread, lock
 * resources, then the dlm domain and debugfs state.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	/* Drop the super and rename cluster locks before stopping the
	 * thread that services their downconverts. */
	ocfs2_drop_osb_locks(osb);

	if (osb->dc_task) {
		kthread_stop(osb->dc_task);
		osb->dc_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2559 
/*
 * dlm unlock AST.  Runs when a dlmunlock() (either a full unlock or a
 * convert cancel) completes.  Updates the lockres state machine under
 * l_lock and wakes anyone waiting on l_event.
 */
static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	/* Any other abnormal status: log it and leave the lockres
	 * state (including BUSY) untouched. */
	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		/* The in-flight convert is dead; forget its action. */
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}
2615 
2616 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2617 			   struct ocfs2_lock_res *lockres)
2618 {
2619 	enum dlm_status status;
2620 	unsigned long flags;
2621 	int lkm_flags = 0;
2622 
2623 	/* We didn't get anywhere near actually using this lockres. */
2624 	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2625 		goto out;
2626 
2627 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2628 		lkm_flags |= LKM_VALBLK;
2629 
2630 	spin_lock_irqsave(&lockres->l_lock, flags);
2631 
2632 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2633 			"lockres %s, flags 0x%lx\n",
2634 			lockres->l_name, lockres->l_flags);
2635 
2636 	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2637 		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2638 		     "%u, unlock_action = %u\n",
2639 		     lockres->l_name, lockres->l_flags, lockres->l_action,
2640 		     lockres->l_unlock_action);
2641 
2642 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2643 
2644 		/* XXX: Today we just wait on any busy
2645 		 * locks... Perhaps we need to cancel converts in the
2646 		 * future? */
2647 		ocfs2_wait_on_busy_lock(lockres);
2648 
2649 		spin_lock_irqsave(&lockres->l_lock, flags);
2650 	}
2651 
2652 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2653 		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2654 		    lockres->l_level == LKM_EXMODE &&
2655 		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2656 			lockres->l_ops->set_lvb(lockres);
2657 	}
2658 
2659 	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2660 		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2661 		     lockres->l_name);
2662 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2663 		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2664 
2665 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2666 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2667 		goto out;
2668 	}
2669 
2670 	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2671 
2672 	/* make sure we never get here while waiting for an ast to
2673 	 * fire. */
2674 	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2675 
2676 	/* is this necessary? */
2677 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2678 	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2679 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2680 
2681 	mlog(0, "lock %s\n", lockres->l_name);
2682 
2683 	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2684 			   ocfs2_unlock_ast, lockres);
2685 	if (status != DLM_NORMAL) {
2686 		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2687 		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2688 		dlm_print_one_lock(lockres->l_lksb.lockid);
2689 		BUG();
2690 	}
2691 	mlog(0, "lock %s, successfull return from dlmunlock\n",
2692 	     lockres->l_name);
2693 
2694 	ocfs2_wait_on_busy_lock(lockres);
2695 out:
2696 	mlog_exit(0);
2697 	return 0;
2698 }
2699 
2700 /* Mark the lockres as being dropped. It will no longer be
2701  * queued if blocking, but we still may have to wait on it
2702  * being dequeued from the downconvert thread before we can consider
2703  * it safe to drop.
2704  *
2705  * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* From here on, the downconvert thread must not requeue this
	 * lockres (see the comment above this function). */
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		/* Wait (dropping the spinlock) for QUEUED to clear,
		 * i.e. for the downconvert thread to dequeue us; then
		 * re-check under the lock in case of a race. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
2730 
/* Mark a lockres as going away, then drop its dlm lock, logging any
 * error from the drop. */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int status;

	ocfs2_mark_lockres_freeing(lockres);
	status = ocfs2_drop_lock(osb, lockres);
	if (status)
		mlog_errno(status);
}
2741 
/* Drop the two osb-global lock resources (super and rename). */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}
2747 
2748 int ocfs2_drop_inode_locks(struct inode *inode)
2749 {
2750 	int status, err;
2751 
2752 	mlog_entry_void();
2753 
2754 	/* No need to call ocfs2_mark_lockres_freeing here -
2755 	 * ocfs2_clear_inode has done it for us. */
2756 
2757 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2758 			      &OCFS2_I(inode)->ip_open_lockres);
2759 	if (err < 0)
2760 		mlog_errno(err);
2761 
2762 	status = err;
2763 
2764 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2765 			      &OCFS2_I(inode)->ip_inode_lockres);
2766 	if (err < 0)
2767 		mlog_errno(err);
2768 	if (err < 0 && !status)
2769 		status = err;
2770 
2771 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2772 			      &OCFS2_I(inode)->ip_rw_lockres);
2773 	if (err < 0)
2774 		mlog_errno(err);
2775 	if (err < 0 && !status)
2776 		status = err;
2777 
2778 	mlog_exit(status);
2779 	return status;
2780 }
2781 
/*
 * Flag a lockres as downconverting to new_level.  Caller holds l_lock.
 * The target level must be strictly below the current level, and there
 * must be a real blocking request (above NL) driving the downconvert.
 */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}
2802 
2803 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2804 				  struct ocfs2_lock_res *lockres,
2805 				  int new_level,
2806 				  int lvb)
2807 {
2808 	int ret, dlm_flags = LKM_CONVERT;
2809 	enum dlm_status status;
2810 
2811 	mlog_entry_void();
2812 
2813 	if (lvb)
2814 		dlm_flags |= LKM_VALBLK;
2815 
2816 	status = dlmlock(osb->dlm,
2817 			 new_level,
2818 			 &lockres->l_lksb,
2819 			 dlm_flags,
2820 			 lockres->l_name,
2821 			 OCFS2_LOCK_ID_MAX_LEN - 1,
2822 			 ocfs2_locking_ast,
2823 			 lockres,
2824 			 ocfs2_blocking_ast);
2825 	if (status != DLM_NORMAL) {
2826 		ocfs2_log_dlm_error("dlmlock", status, lockres);
2827 		ret = -EINVAL;
2828 		ocfs2_recover_from_dlm_error(lockres, 1);
2829 		goto bail;
2830 	}
2831 
2832 	ret = 0;
2833 bail:
2834 	mlog_exit(ret);
2835 	return ret;
2836 }
2837 
/* Decide, under l_lock, whether a busy convert should be cancelled.
 * Returns 1 when the caller should unlock and call dlmunlock(LKM_CANCEL);
 * returns 0 when a cancel is already in flight and the caller should
 * just requeue. */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
				        struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}
2869 
2870 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2871 				struct ocfs2_lock_res *lockres)
2872 {
2873 	int ret;
2874 	enum dlm_status status;
2875 
2876 	mlog_entry_void();
2877 	mlog(0, "lock %s\n", lockres->l_name);
2878 
2879 	ret = 0;
2880 	status = dlmunlock(osb->dlm,
2881 			   &lockres->l_lksb,
2882 			   LKM_CANCEL,
2883 			   ocfs2_unlock_ast,
2884 			   lockres);
2885 	if (status != DLM_NORMAL) {
2886 		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2887 		ret = -EINVAL;
2888 		ocfs2_recover_from_dlm_error(lockres, 0);
2889 	}
2890 
2891 	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2892 
2893 	mlog_exit(ret);
2894 	return ret;
2895 }
2896 
/*
 * Core of the downconvert thread's work on a blocked lockres: decide
 * whether the lock can be downconverted now, and if so do it.  On
 * return, ctl->requeue tells the caller whether to put the lockres
 * back on the thread's queue, and ctl->unblock_action carries any
 * post-processing request from the per-type worker.
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;
	int set_lvb = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	/* An operation (e.g. an upconvert) is in flight: try to cancel
	 * it and requeue either way. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders))
		goto leave_requeue;

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders)
		goto leave_requeue;

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
		goto leave_requeue;

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Per-type veto (e.g. meta locks refuse until checkpointed). */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level))
		goto leave_requeue;

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as it's value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}
3008 
3009 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3010 				     int blocking)
3011 {
3012 	struct inode *inode;
3013 	struct address_space *mapping;
3014 
3015        	inode = ocfs2_lock_res_inode(lockres);
3016 	mapping = inode->i_mapping;
3017 
3018 	if (S_ISREG(inode->i_mode))
3019 		goto out;
3020 
3021 	/*
3022 	 * We need this before the filemap_fdatawrite() so that it can
3023 	 * transfer the dirty bit from the PTE to the
3024 	 * page. Unfortunately this means that even for EX->PR
3025 	 * downconverts, we'll lose our mappings and have to build
3026 	 * them up again.
3027 	 */
3028 	unmap_mapping_range(mapping, 0, 0, 0);
3029 
3030 	if (filemap_fdatawrite(mapping)) {
3031 		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3032 		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3033 	}
3034 	sync_mapping_buffers(mapping);
3035 	if (blocking == LKM_EXMODE) {
3036 		truncate_inode_pages(mapping, 0);
3037 	} else {
3038 		/* We only need to wait on the I/O if we're not also
3039 		 * truncating pages because truncate_inode_pages waits
3040 		 * for us above. We don't truncate pages if we're
3041 		 * blocking anything < EXMODE because we want to keep
3042 		 * them around in that case. */
3043 		filemap_fdatawait(mapping);
3044 	}
3045 
3046 out:
3047 	return UNBLOCK_CONTINUE;
3048 }
3049 
3050 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3051 					int new_level)
3052 {
3053 	struct inode *inode = ocfs2_lock_res_inode(lockres);
3054 	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3055 
3056 	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
3057 	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
3058 
3059 	if (checkpointed)
3060 		return 1;
3061 
3062 	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
3063 	return 0;
3064 }
3065 
/* set_lvb callback for the meta lock: stuff the inode's metadata
 * into the lock value block before downconverting. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
3072 
3073 /*
3074  * Does the final reference drop on our dentry lock. Right now this
3075  * happens in the downconvert thread, but we could choose to simplify the
3076  * dlmglue API and push these off to the ocfs2_wq in the future.
3077  */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	/* Drop the final/deferred reference on the dentry lock now
	 * that unblock processing has completed. */
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
3084 
3085 /*
3086  * d_delete() matching dentries before the lock downconvert.
3087  *
3088  * At this point, any process waiting to destroy the
3089  * dentry_lock due to last ref count is stopped by the
3090  * OCFS2_LOCK_QUEUED flag.
3091  *
3092  * We have two potential problems
3093  *
3094  * 1) If we do the last reference drop on our dentry_lock (via dput)
3095  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3096  *    the downconvert to finish. Instead we take an elevated
3097  *    reference and push the drop until after we've completed our
3098  *    unblock processing.
3099  *
3100  * 2) There might be another process with a final reference,
3101  *    waiting on us to finish processing. If this is the case, we
3102  *    detect it and exit out - there's no more dentries anyway.
3103  */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == LKM_PRMODE)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	/* Lock order here is l_lock outside dentry_attach_lock; both
	 * must be held to bump dl_count safely against a concurrent
	 * final put or FREEING transition. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/* Walk and d_delete() every local alias of this inode under
	 * the parent directory recorded in the dentry lock. The
	 * attach lock is dropped and retaken around each alias since
	 * d_delete()/dput() may sleep. */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	/* The extra reference taken above is released by the
	 * post_unlock callback (ocfs2_dentry_post_unlock) in either
	 * of the *_POST return paths. */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
3194 
/*
 * Run the unblock machinery for one queued lock resource: try the
 * downconvert, then either clear OCFS2_LOCK_QUEUED or requeue the
 * lockres, and finally run any post_unlock callback the lock type
 * defines.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the downconvert thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = ocfs2_unblock_lock(osb, lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	/* Reached with l_lock held on both the goto and fall-through
	 * paths; ocfs2_schedule_blocked_lock() asserts this. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Lock types such as the dentry lock defer work (e.g. the
	 * final reference drop) to a post_unlock callback. */
	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}
3244 
3245 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3246 					struct ocfs2_lock_res *lockres)
3247 {
3248 	mlog_entry_void();
3249 
3250 	assert_spin_locked(&lockres->l_lock);
3251 
3252 	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3253 		/* Do not schedule a lock for downconvert when it's on
3254 		 * the way to destruction - any nodes wanting access
3255 		 * to the resource will get it soon. */
3256 		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3257 		     lockres->l_name, lockres->l_flags);
3258 		return;
3259 	}
3260 
3261 	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3262 
3263 	spin_lock(&osb->dc_task_lock);
3264 	if (list_empty(&lockres->l_blocked_list)) {
3265 		list_add_tail(&lockres->l_blocked_list,
3266 			      &osb->blocked_lock_list);
3267 		osb->blocked_lock_count++;
3268 	}
3269 	spin_unlock(&osb->dc_task_lock);
3270 
3271 	mlog_exit_void();
3272 }
3273 
/*
 * Drain (a snapshot of) the blocked-lock list, processing each
 * lockres in turn. dc_task_lock is dropped around the actual
 * processing and retaken to fetch the next entry.
 */
static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
{
	unsigned long processed;
	struct ocfs2_lock_res *lockres;

	mlog_entry_void();

	spin_lock(&osb->dc_task_lock);
	/* grab this early so we know to try again if a state change and
	 * wake happens part-way through our work  */
	osb->dc_work_sequence = osb->dc_wake_sequence;

	/* Snapshot the count: locks requeued while we work are left
	 * for the next pass rather than looping forever here. */
	processed = osb->blocked_lock_count;
	while (processed) {
		BUG_ON(list_empty(&osb->blocked_lock_list));

		lockres = list_entry(osb->blocked_lock_list.next,
				     struct ocfs2_lock_res, l_blocked_list);
		list_del_init(&lockres->l_blocked_list);
		osb->blocked_lock_count--;
		/* Drop the list lock - ocfs2_process_blocked_lock()
		 * may block and takes other locks. */
		spin_unlock(&osb->dc_task_lock);

		BUG_ON(!processed);
		processed--;

		ocfs2_process_blocked_lock(osb, lockres);

		spin_lock(&osb->dc_task_lock);
	}
	spin_unlock(&osb->dc_task_lock);

	mlog_exit_void();
}
3307 
3308 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3309 {
3310 	int empty = 0;
3311 
3312 	spin_lock(&osb->dc_task_lock);
3313 	if (list_empty(&osb->blocked_lock_list))
3314 		empty = 1;
3315 
3316 	spin_unlock(&osb->dc_task_lock);
3317 	return empty;
3318 }
3319 
3320 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3321 {
3322 	int should_wake = 0;
3323 
3324 	spin_lock(&osb->dc_task_lock);
3325 	if (osb->dc_work_sequence != osb->dc_wake_sequence)
3326 		should_wake = 1;
3327 	spin_unlock(&osb->dc_task_lock);
3328 
3329 	return should_wake;
3330 }
3331 
3332 int ocfs2_downconvert_thread(void *arg)
3333 {
3334 	int status = 0;
3335 	struct ocfs2_super *osb = arg;
3336 
3337 	/* only quit once we've been asked to stop and there is no more
3338 	 * work available */
3339 	while (!(kthread_should_stop() &&
3340 		ocfs2_downconvert_thread_lists_empty(osb))) {
3341 
3342 		wait_event_interruptible(osb->dc_event,
3343 					 ocfs2_downconvert_thread_should_wake(osb) ||
3344 					 kthread_should_stop());
3345 
3346 		mlog(0, "downconvert_thread: awoken\n");
3347 
3348 		ocfs2_downconvert_thread_do_work(osb);
3349 	}
3350 
3351 	osb->dc_task = NULL;
3352 	return status;
3353 }
3354 
/* Bump the wake sequence and wake the downconvert thread so it
 * notices newly queued work. */
void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
{
	spin_lock(&osb->dc_task_lock);
	/* make sure the downconvert thread gets a swipe at whatever
	 * changes the caller may have made to the blocked lock state;
	 * the sequence bump is what makes should_wake() return true */
	osb->dc_wake_sequence++;
	spin_unlock(&osb->dc_task_lock);
	wake_up(&osb->dc_event);
}
3364