xref: /linux/fs/ocfs2/dlmglue.c (revision c537b994505099b7197e7d3125b942ecbcc51eb6)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  */
25 
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/smp_lock.h>
31 #include <linux/crc32.h>
32 #include <linux/kthread.h>
33 #include <linux/pagemap.h>
34 #include <linux/debugfs.h>
35 #include <linux/seq_file.h>
36 
37 #include <cluster/heartbeat.h>
38 #include <cluster/nodemanager.h>
39 #include <cluster/tcp.h>
40 
41 #include <dlm/dlmapi.h>
42 
43 #define MLOG_MASK_PREFIX ML_DLM_GLUE
44 #include <cluster/masklog.h>
45 
46 #include "ocfs2.h"
47 
48 #include "alloc.h"
49 #include "dcache.h"
50 #include "dlmglue.h"
51 #include "extent_map.h"
52 #include "file.h"
53 #include "heartbeat.h"
54 #include "inode.h"
55 #include "journal.h"
56 #include "slot_map.h"
57 #include "super.h"
58 #include "uptodate.h"
59 #include "vote.h"
60 
61 #include "buffer_head_io.h"
62 
63 struct ocfs2_mask_waiter {
64 	struct list_head	mw_item;
65 	int			mw_status;
66 	struct completion	mw_complete;
67 	unsigned long		mw_mask;
68 	unsigned long		mw_goal;
69 };
70 
71 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
72 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
73 
74 /*
75  * Return value from ->downconvert_worker functions.
76  *
77  * These control the precise actions of ocfs2_unblock_lock()
78  * and ocfs2_process_blocked_lock()
79  *
80  */
81 enum ocfs2_unblock_action {
82 	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
83 	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
84 				      * ->post_unlock callback */
85 	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
86 				      * ->post_unlock() callback. */
87 };
88 
89 struct ocfs2_unblock_ctl {
90 	int requeue;
91 	enum ocfs2_unblock_action unblock_action;
92 };
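
/*
 * For example, a minimal ->downconvert_worker() that always lets the
 * downconvert proceed could look like the sketch below (illustrative
 * only; real workers such as ocfs2_data_convert_worker() also do
 * real work first, e.g. syncing out cached data):
 *
 *	static int example_convert_worker(struct ocfs2_lock_res *lockres,
 *					  int blocking)
 *	{
 *		return UNBLOCK_CONTINUE;
 *	}
 */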
93 
94 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
95 					int new_level);
96 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
97 
98 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
99 				     int blocking);
100 
101 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
102 				       int blocking);
103 
104 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
105 				     struct ocfs2_lock_res *lockres);
106 
107 /*
108  * OCFS2 Lock Resource Operations
109  *
110  * These fine tune the behavior of the generic dlmglue locking infrastructure.
111  *
112  * The most basic of lock types can point ->l_priv to their respective
113  * struct ocfs2_super and allow the default actions to manage things.
114  *
115  * Right now, each lock type also needs to implement an init function,
116  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
117  * should be called when the lock is no longer needed (i.e., object
118  * destruction time).
119  */
120 struct ocfs2_lock_res_ops {
121 	/*
122 	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
123 	 * this callback if ->l_priv is not an ocfs2_super pointer
124 	 */
125 	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
126 
127 	/*
128 	 * Optionally called in the downconvert (or "vote") thread
129 	 * after a successful downconvert. The lockres will not be
130 	 * referenced after this callback is called, so it is safe to
131 	 * free memory, etc.
132 	 *
133 	 * The exact semantics of when this is called are controlled
134 	 * by ->downconvert_worker()
135 	 */
136 	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
137 
138 	/*
139 	 * Allow a lock type to add checks to determine whether it is
140 	 * safe to downconvert a lock. Return 0 to re-queue the
141 	 * downconvert at a later time, nonzero to continue.
142 	 *
143 	 * For most locks, the default checks that there are no
144 	 * incompatible holders are sufficient.
145 	 *
146 	 * Called with the lockres spinlock held.
147 	 */
148 	int (*check_downconvert)(struct ocfs2_lock_res *, int);
149 
150 	/*
151 	 * Allows a lock type to populate the lock value block. This
152 	 * is called on downconvert, and when we drop a lock.
153 	 *
154 	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
155 	 * in the flags field.
156 	 *
157 	 * Called with the lockres spinlock held.
158 	 */
159 	void (*set_lvb)(struct ocfs2_lock_res *);
160 
161 	/*
162 	 * Called from the downconvert thread when it is determined
163 	 * that a lock will be downconverted. This is called without
164 	 * any locks held so the function can do work that might
165 	 * schedule (syncing out data, etc).
166 	 *
167 	 * This should return any one of the ocfs2_unblock_action
168 	 * values, depending on what it wants the thread to do.
169 	 */
170 	int (*downconvert_worker)(struct ocfs2_lock_res *, int);
171 
172 	/*
173 	 * LOCK_TYPE_* flags which describe the specific requirements
174 	 * of a lock type. Descriptions of each individual flag follow.
175 	 */
176 	int flags;
177 };
178 
179 /*
180  * Some locks want to "refresh" potentially stale data when a
181  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
182  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
183  * individual lockres l_flags member from the ast function. It is
184  * expected that the locking wrapper will clear the
185  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
186  */
187 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
188 
189 /*
190  * Indicate that a lock type makes use of the lock value block. The
191  * ->set_lvb lock type callback must be defined.
192  */
193 #define LOCK_TYPE_USES_LVB		0x2
194 
195 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
196 	.get_osb	= ocfs2_get_inode_osb,
197 	.flags		= 0,
198 };
199 
200 static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
201 	.get_osb	= ocfs2_get_inode_osb,
202 	.check_downconvert = ocfs2_check_meta_downconvert,
203 	.set_lvb	= ocfs2_set_meta_lvb,
204 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
205 };
206 
207 static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
208 	.get_osb	= ocfs2_get_inode_osb,
209 	.downconvert_worker = ocfs2_data_convert_worker,
210 	.flags		= 0,
211 };
212 
213 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
214 	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
215 };
216 
217 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
218 	.flags		= 0,
219 };
220 
221 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
222 	.get_osb	= ocfs2_get_dentry_osb,
223 	.post_unlock	= ocfs2_dentry_post_unlock,
224 	.downconvert_worker = ocfs2_dentry_convert_worker,
225 	.flags		= 0,
226 };
227 
228 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
229 {
230 	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
231 		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
232 		lockres->l_type == OCFS2_LOCK_TYPE_RW;
233 }
234 
235 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
236 {
237 	BUG_ON(!ocfs2_is_inode_lock(lockres));
238 
239 	return (struct inode *) lockres->l_priv;
240 }
241 
242 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
243 {
244 	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
245 
246 	return (struct ocfs2_dentry_lock *)lockres->l_priv;
247 }
248 
249 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
250 {
251 	if (lockres->l_ops->get_osb)
252 		return lockres->l_ops->get_osb(lockres);
253 
254 	return (struct ocfs2_super *)lockres->l_priv;
255 }
256 
257 static int ocfs2_lock_create(struct ocfs2_super *osb,
258 			     struct ocfs2_lock_res *lockres,
259 			     int level,
260 			     int dlm_flags);
261 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
262 						     int wanted);
263 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
264 				 struct ocfs2_lock_res *lockres,
265 				 int level);
266 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
267 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
268 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
269 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
270 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
271 					struct ocfs2_lock_res *lockres);
272 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
273 						int convert);
274 #define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
275 	mlog(ML_ERROR, "DLM error \"%s\" while calling %s on "	\
276 		"resource %s: %s\n", dlm_errname(_stat), _func,	\
277 		_lockres->l_name, dlm_errmsg(_stat));		\
278 } while (0)
279 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
280 				 struct ocfs2_lock_res *lockres);
281 static int ocfs2_meta_lock_update(struct inode *inode,
282 				  struct buffer_head **bh);
283 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
284 static inline int ocfs2_highest_compat_lock_level(int level);
285 
286 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
287 				  u64 blkno,
288 				  u32 generation,
289 				  char *name)
290 {
291 	int len;
292 
293 	mlog_entry_void();
294 
295 	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
296 
297 	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
298 		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
299 		       (long long)blkno, generation);
300 
301 	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
302 
303 	mlog(0, "built lock resource with name: %s\n", name);
304 
305 	mlog_exit_void();
306 }
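
/*
 * For example, a meta lock on block 5 with generation 0x075c4e2b
 * would be named (assuming the usual all-zero OCFS2_LOCK_ID_PAD):
 *
 *	M0000000000000000000005075c4e2b
 *
 * i.e. the type character, the pad, 16 hex digits of block number
 * and 8 hex digits of generation -- OCFS2_LOCK_ID_MAX_LEN - 1
 * characters in all.
 */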
307 
308 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
309 
310 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
311 				       struct ocfs2_dlm_debug *dlm_debug)
312 {
313 	mlog(0, "Add tracking for lockres %s\n", res->l_name);
314 
315 	spin_lock(&ocfs2_dlm_tracking_lock);
316 	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
317 	spin_unlock(&ocfs2_dlm_tracking_lock);
318 }
319 
320 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
321 {
322 	spin_lock(&ocfs2_dlm_tracking_lock);
323 	if (!list_empty(&res->l_debug_list))
324 		list_del_init(&res->l_debug_list);
325 	spin_unlock(&ocfs2_dlm_tracking_lock);
326 }
327 
328 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
329 				       struct ocfs2_lock_res *res,
330 				       enum ocfs2_lock_type type,
331 				       struct ocfs2_lock_res_ops *ops,
332 				       void *priv)
333 {
334 	res->l_type          = type;
335 	res->l_ops           = ops;
336 	res->l_priv          = priv;
337 
338 	res->l_level         = LKM_IVMODE;
339 	res->l_requested     = LKM_IVMODE;
340 	res->l_blocking      = LKM_IVMODE;
341 	res->l_action        = OCFS2_AST_INVALID;
342 	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
343 
344 	res->l_flags         = OCFS2_LOCK_INITIALIZED;
345 
346 	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
347 }
348 
349 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
350 {
351 	/* This also clears out the lock status block */
352 	memset(res, 0, sizeof(struct ocfs2_lock_res));
353 	spin_lock_init(&res->l_lock);
354 	init_waitqueue_head(&res->l_event);
355 	INIT_LIST_HEAD(&res->l_blocked_list);
356 	INIT_LIST_HEAD(&res->l_mask_waiters);
357 }
358 
359 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
360 			       enum ocfs2_lock_type type,
361 			       unsigned int generation,
362 			       struct inode *inode)
363 {
364 	struct ocfs2_lock_res_ops *ops;
365 
366 	switch(type) {
367 		case OCFS2_LOCK_TYPE_RW:
368 			ops = &ocfs2_inode_rw_lops;
369 			break;
370 		case OCFS2_LOCK_TYPE_META:
371 			ops = &ocfs2_inode_meta_lops;
372 			break;
373 		case OCFS2_LOCK_TYPE_DATA:
374 			ops = &ocfs2_inode_data_lops;
375 			break;
376 		default:
377 			mlog_bug_on_msg(1, "type: %d\n", type);
378 			ops = NULL; /* thanks, gcc */
379 			break;
380 	}
381 
382 	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
383 			      generation, res->l_name);
384 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
385 }
386 
387 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
388 {
389 	struct inode *inode = ocfs2_lock_res_inode(lockres);
390 
391 	return OCFS2_SB(inode->i_sb);
392 }
393 
394 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
395 {
396 	__be64 inode_blkno_be;
397 
398 	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
399 	       sizeof(__be64));
400 
401 	return be64_to_cpu(inode_blkno_be);
402 }
403 
404 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
405 {
406 	struct ocfs2_dentry_lock *dl = lockres->l_priv;
407 
408 	return OCFS2_SB(dl->dl_inode->i_sb);
409 }
410 
411 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
412 				u64 parent, struct inode *inode)
413 {
414 	int len;
415 	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
416 	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
417 	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
418 
419 	ocfs2_lock_res_init_once(lockres);
420 
421 	/*
422 	 * Unfortunately, the standard lock naming scheme won't work
423 	 * here because we have two 16 byte values to use. Instead,
424 	 * we'll stuff the inode number as a binary value. We still
425 	 * want error prints to show something without garbling the
426 	 * display, so drop a null byte in there before the inode
427 	 * number. A future version of OCFS2 will likely use all
428 	 * binary lock names. The stringified names have been a
429 	 * tremendous aid in debugging, but now that the debugfs
430 	 * interface exists, we can mangle things there if need be.
431 	 *
432 	 * NOTE: We also drop the standard "pad" value (the total lock
433 	 * name size stays the same though - the last part is all
434 	 * zeros due to the memset in ocfs2_lock_res_init_once()).
435 	 */
436 	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
437 		       "%c%016llx",
438 		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
439 		       (long long)parent);
440 
441 	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
442 
443 	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
444 	       sizeof(__be64));
445 
446 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
447 				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
448 				   dl);
449 }
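
/*
 * The resulting name layout (assuming OCFS2_DENTRY_LOCK_INO_START is
 * 18, which the BUG_ON above implies):
 *
 *	byte  0      : dentry lock type character
 *	bytes 1-16   : parent directory blkno as ASCII hex
 *	byte  17     : '\0', which keeps error prints readable
 *	bytes 18-25  : inode blkno as a big-endian binary u64
 *	remainder    : zeros from ocfs2_lock_res_init_once()
 */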
450 
451 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
452 				      struct ocfs2_super *osb)
453 {
454 	/* Superblock lockres doesn't come from a slab so we call init
455 	 * once on it manually.  */
456 	ocfs2_lock_res_init_once(res);
457 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
458 			      0, res->l_name);
459 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
460 				   &ocfs2_super_lops, osb);
461 }
462 
463 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
464 				       struct ocfs2_super *osb)
465 {
466 	/* Rename lockres doesn't come from a slab so we call init
467 	 * once on it manually.  */
468 	ocfs2_lock_res_init_once(res);
469 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
470 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
471 				   &ocfs2_rename_lops, osb);
472 }
473 
474 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
475 {
476 	mlog_entry_void();
477 
478 	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
479 		return;
480 
481 	ocfs2_remove_lockres_tracking(res);
482 
483 	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
484 			"Lockres %s is on the blocked list\n",
485 			res->l_name);
486 	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
487 			"Lockres %s has mask waiters pending\n",
488 			res->l_name);
489 	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
490 			"Lockres %s is locked\n",
491 			res->l_name);
492 	mlog_bug_on_msg(res->l_ro_holders,
493 			"Lockres %s has %u ro holders\n",
494 			res->l_name, res->l_ro_holders);
495 	mlog_bug_on_msg(res->l_ex_holders,
496 			"Lockres %s has %u ex holders\n",
497 			res->l_name, res->l_ex_holders);
498 
499 	/* Need to clear out the lock status block for the dlm */
500 	memset(&res->l_lksb, 0, sizeof(res->l_lksb));
501 
502 	res->l_flags = 0UL;
503 	mlog_exit_void();
504 }
505 
506 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
507 				     int level)
508 {
509 	mlog_entry_void();
510 
511 	BUG_ON(!lockres);
512 
513 	switch(level) {
514 	case LKM_EXMODE:
515 		lockres->l_ex_holders++;
516 		break;
517 	case LKM_PRMODE:
518 		lockres->l_ro_holders++;
519 		break;
520 	default:
521 		BUG();
522 	}
523 
524 	mlog_exit_void();
525 }
526 
527 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
528 				     int level)
529 {
530 	mlog_entry_void();
531 
532 	BUG_ON(!lockres);
533 
534 	switch(level) {
535 	case LKM_EXMODE:
536 		BUG_ON(!lockres->l_ex_holders);
537 		lockres->l_ex_holders--;
538 		break;
539 	case LKM_PRMODE:
540 		BUG_ON(!lockres->l_ro_holders);
541 		lockres->l_ro_holders--;
542 		break;
543 	default:
544 		BUG();
545 	}
546 	mlog_exit_void();
547 }
548 
549 /* WARNING: This function lives in a world where the only three lock
550  * levels are EX, PR, and NL. It *will* have to be adjusted when more
551  * lock types are added. */
552 static inline int ocfs2_highest_compat_lock_level(int level)
553 {
554 	int new_level = LKM_EXMODE;
555 
556 	if (level == LKM_EXMODE)
557 		new_level = LKM_NLMODE;
558 	else if (level == LKM_PRMODE)
559 		new_level = LKM_PRMODE;
560 	return new_level;
561 }
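
/*
 * In table form: an EX holder is compatible only with NL, a PR
 * holder is compatible with other readers, and an NL holder blocks
 * nothing:
 *
 *	blocking level		highest compatible level
 *	LKM_EXMODE		LKM_NLMODE
 *	LKM_PRMODE		LKM_PRMODE
 *	LKM_NLMODE		LKM_EXMODE
 */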
562 
563 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
564 			      unsigned long newflags)
565 {
566 	struct list_head *pos, *tmp;
567 	struct ocfs2_mask_waiter *mw;
568 
569  	assert_spin_locked(&lockres->l_lock);
570 
571 	lockres->l_flags = newflags;
572 
573 	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
574 		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
575 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
576 			continue;
577 
578 		list_del_init(&mw->mw_item);
579 		mw->mw_status = 0;
580 		complete(&mw->mw_complete);
581 	}
582 }

583 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
584 {
585 	lockres_set_flags(lockres, lockres->l_flags | or);
586 }

587 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
588 				unsigned long clear)
589 {
590 	lockres_set_flags(lockres, lockres->l_flags & ~clear);
591 }
592 
593 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
594 {
595 	mlog_entry_void();
596 
597 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
598 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
599 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
600 	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
601 
602 	lockres->l_level = lockres->l_requested;
603 	if (lockres->l_level <=
604 	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
605 		lockres->l_blocking = LKM_NLMODE;
606 		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
607 	}
608 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
609 
610 	mlog_exit_void();
611 }
612 
613 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
614 {
615 	mlog_entry_void();
616 
617 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
618 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
619 
620 	/* Converting from RO to EX doesn't really need anything as our
621 	 * information is already up to date. Converting from NL to
622 	 * *anything*, however, should mark us as needing an
623 	 * update. */
624 	if (lockres->l_level == LKM_NLMODE &&
625 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
626 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
627 
628 	lockres->l_level = lockres->l_requested;
629 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
630 
631 	mlog_exit_void();
632 }
633 
634 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
635 {
636 	mlog_entry_void();
637 
638 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
639 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
640 
641 	if (lockres->l_requested > LKM_NLMODE &&
642 	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
643 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
644 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
645 
646 	lockres->l_level = lockres->l_requested;
647 	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
648 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
649 
650 	mlog_exit_void();
651 }
652 
653 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
654 				     int level)
655 {
656 	int needs_downconvert = 0;
657 	mlog_entry_void();
658 
659 	assert_spin_locked(&lockres->l_lock);
660 
661 	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
662 
663 	if (level > lockres->l_blocking) {
664 		/* only schedule a downconvert if we haven't already scheduled
665 		 * one that goes low enough to satisfy the level we're
666 		 * blocking.  this also catches the case where we get
667 		 * duplicate BASTs */
668 		if (ocfs2_highest_compat_lock_level(level) <
669 		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
670 			needs_downconvert = 1;
671 
672 		lockres->l_blocking = level;
673 	}
674 
675 	mlog_exit(needs_downconvert);
676 	return needs_downconvert;
677 }
678 
679 static void ocfs2_blocking_ast(void *opaque, int level)
680 {
681 	struct ocfs2_lock_res *lockres = opaque;
682 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
683 	int needs_downconvert;
684 	unsigned long flags;
685 
686 	BUG_ON(level <= LKM_NLMODE);
687 
688 	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
689 	     lockres->l_name, level, lockres->l_level,
690 	     ocfs2_lock_type_string(lockres->l_type));
691 
692 	spin_lock_irqsave(&lockres->l_lock, flags);
693 	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
694 	if (needs_downconvert)
695 		ocfs2_schedule_blocked_lock(osb, lockres);
696 	spin_unlock_irqrestore(&lockres->l_lock, flags);
697 
698 	wake_up(&lockres->l_event);
699 
700 	ocfs2_kick_vote_thread(osb);
701 }
702 
703 static void ocfs2_locking_ast(void *opaque)
704 {
705 	struct ocfs2_lock_res *lockres = opaque;
706 	struct dlm_lockstatus *lksb = &lockres->l_lksb;
707 	unsigned long flags;
708 
709 	spin_lock_irqsave(&lockres->l_lock, flags);
710 
711 	if (lksb->status != DLM_NORMAL) {
712 		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
713 		     lockres->l_name, lksb->status);
714 		spin_unlock_irqrestore(&lockres->l_lock, flags);
715 		return;
716 	}
717 
718 	switch(lockres->l_action) {
719 	case OCFS2_AST_ATTACH:
720 		ocfs2_generic_handle_attach_action(lockres);
721 		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
722 		break;
723 	case OCFS2_AST_CONVERT:
724 		ocfs2_generic_handle_convert_action(lockres);
725 		break;
726 	case OCFS2_AST_DOWNCONVERT:
727 		ocfs2_generic_handle_downconvert_action(lockres);
728 		break;
729 	default:
730 		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
731 		     "lockres flags = 0x%lx, unlock action: %u\n",
732 		     lockres->l_name, lockres->l_action, lockres->l_flags,
733 		     lockres->l_unlock_action);
734 		BUG();
735 	}
736 
737 	/* set it to something invalid so if we get called again we
738 	 * can catch it. */
739 	lockres->l_action = OCFS2_AST_INVALID;
740 
741 	wake_up(&lockres->l_event);
742 	spin_unlock_irqrestore(&lockres->l_lock, flags);
743 }
744 
745 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
746 						int convert)
747 {
748 	unsigned long flags;
749 
750 	mlog_entry_void();
751 	spin_lock_irqsave(&lockres->l_lock, flags);
752 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
753 	if (convert)
754 		lockres->l_action = OCFS2_AST_INVALID;
755 	else
756 		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
757 	spin_unlock_irqrestore(&lockres->l_lock, flags);
758 
759 	wake_up(&lockres->l_event);
760 	mlog_exit_void();
761 }
762 
763 /* Note: If we detect another process working on the lock (i.e.,
764  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
765  * to do the right thing in that case.
766  */
767 static int ocfs2_lock_create(struct ocfs2_super *osb,
768 			     struct ocfs2_lock_res *lockres,
769 			     int level,
770 			     int dlm_flags)
771 {
772 	int ret = 0;
773 	enum dlm_status status = DLM_NORMAL;
774 	unsigned long flags;
775 
776 	mlog_entry_void();
777 
778 	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
779 	     dlm_flags);
780 
781 	spin_lock_irqsave(&lockres->l_lock, flags);
782 	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
783 	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
784 		spin_unlock_irqrestore(&lockres->l_lock, flags);
785 		goto bail;
786 	}
787 
788 	lockres->l_action = OCFS2_AST_ATTACH;
789 	lockres->l_requested = level;
790 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
791 	spin_unlock_irqrestore(&lockres->l_lock, flags);
792 
793 	status = dlmlock(osb->dlm,
794 			 level,
795 			 &lockres->l_lksb,
796 			 dlm_flags,
797 			 lockres->l_name,
798 			 OCFS2_LOCK_ID_MAX_LEN - 1,
799 			 ocfs2_locking_ast,
800 			 lockres,
801 			 ocfs2_blocking_ast);
802 	if (status != DLM_NORMAL) {
803 		ocfs2_log_dlm_error("dlmlock", status, lockres);
804 		ret = -EINVAL;
805 		ocfs2_recover_from_dlm_error(lockres, 1);
806 	}
807 
808 	mlog(0, "lock %s, successful return from dlmlock\n", lockres->l_name);
809 
810 bail:
811 	mlog_exit(ret);
812 	return ret;
813 }
814 
815 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
816 					int flag)
817 {
818 	unsigned long flags;
819 	int ret;
820 
821 	spin_lock_irqsave(&lockres->l_lock, flags);
822 	ret = lockres->l_flags & flag;
823 	spin_unlock_irqrestore(&lockres->l_lock, flags);
824 
825 	return ret;
826 }
827 
828 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
830 {
831 	wait_event(lockres->l_event,
832 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
833 }
834 
835 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
837 {
838 	wait_event(lockres->l_event,
839 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
840 }
841 
842 /* predict what lock level we'll be dropping down to on behalf
843  * of another node, and return true if the currently wanted
844  * level will be compatible with it. */
845 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
846 						     int wanted)
847 {
848 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
849 
850 	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
851 }
852 
853 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
854 {
855 	INIT_LIST_HEAD(&mw->mw_item);
856 	init_completion(&mw->mw_complete);
857 }
858 
859 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
860 {
861 	wait_for_completion(&mw->mw_complete);
862 	/* Re-arm the completion in case we want to wait on it again */
863 	INIT_COMPLETION(mw->mw_complete);
864 	return mw->mw_status;
865 }
866 
867 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
868 				    struct ocfs2_mask_waiter *mw,
869 				    unsigned long mask,
870 				    unsigned long goal)
871 {
872 	BUG_ON(!list_empty(&mw->mw_item));
873 
874 	assert_spin_locked(&lockres->l_lock);
875 
876 	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
877 	mw->mw_mask = mask;
878 	mw->mw_goal = goal;
879 }
880 
881 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
882  * if the mask still hadn't reached its goal */
883 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
884 				      struct ocfs2_mask_waiter *mw)
885 {
886 	unsigned long flags;
887 	int ret = 0;
888 
889 	spin_lock_irqsave(&lockres->l_lock, flags);
890 	if (!list_empty(&mw->mw_item)) {
891 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
892 			ret = -EBUSY;
893 
894 		list_del_init(&mw->mw_item);
895 		init_completion(&mw->mw_complete);
896 	}
897 	spin_unlock_irqrestore(&lockres->l_lock, flags);
898 
899 	return ret;
901 }
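
/*
 * Together these implement the wait pattern used by
 * ocfs2_cluster_lock() below: queue a waiter under the lockres
 * spinlock, drop the lock, then block until lockres_set_flags()
 * sees (l_flags & mw_mask) == mw_goal and completes us. Roughly:
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *	ret = ocfs2_wait_for_mask(&mw);
 */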
902 
903 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
904 			      struct ocfs2_lock_res *lockres,
905 			      int level,
906 			      int lkm_flags,
907 			      int arg_flags)
908 {
909 	struct ocfs2_mask_waiter mw;
910 	enum dlm_status status;
911 	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
912 	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
913 	unsigned long flags;
914 
915 	mlog_entry_void();
916 
917 	ocfs2_init_mask_waiter(&mw);
918 
919 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
920 		lkm_flags |= LKM_VALBLK;
921 
922 again:
923 	wait = 0;
924 
925 	if (catch_signals && signal_pending(current)) {
926 		ret = -ERESTARTSYS;
927 		goto out;
928 	}
929 
930 	spin_lock_irqsave(&lockres->l_lock, flags);
931 
932 	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
933 			"Cluster lock called on freeing lockres %s! flags "
934 			"0x%lx\n", lockres->l_name, lockres->l_flags);
935 
936 	/* We only compare against the currently granted level
937 	 * here. If the lock is blocked waiting on a downconvert,
938 	 * we'll get caught below. */
939 	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
940 	    level > lockres->l_level) {
941 		/* is someone sitting in dlm_lock? If so, wait on
942 		 * them. */
943 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
944 		wait = 1;
945 		goto unlock;
946 	}
947 
948 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
949 		/* lock has not been created yet. */
950 		spin_unlock_irqrestore(&lockres->l_lock, flags);
951 
952 		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
953 		if (ret < 0) {
954 			mlog_errno(ret);
955 			goto out;
956 		}
957 		goto again;
958 	}
959 
960 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
961 	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
962 		/* the lock is currently blocked on behalf of
963 		 * another node */
964 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
965 		wait = 1;
966 		goto unlock;
967 	}
968 
969 	if (level > lockres->l_level) {
970 		if (lockres->l_action != OCFS2_AST_INVALID)
971 			mlog(ML_ERROR, "lockres %s has action %u pending\n",
972 			     lockres->l_name, lockres->l_action);
973 
974 		lockres->l_action = OCFS2_AST_CONVERT;
975 		lockres->l_requested = level;
976 		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
977 		spin_unlock_irqrestore(&lockres->l_lock, flags);
978 
979 		BUG_ON(level == LKM_IVMODE);
980 		BUG_ON(level == LKM_NLMODE);
981 
982 		mlog(0, "lock %s, convert from %d to level = %d\n",
983 		     lockres->l_name, lockres->l_level, level);
984 
985 		/* call dlm_lock to upgrade lock now */
986 		status = dlmlock(osb->dlm,
987 				 level,
988 				 &lockres->l_lksb,
989 				 lkm_flags|LKM_CONVERT,
990 				 lockres->l_name,
991 				 OCFS2_LOCK_ID_MAX_LEN - 1,
992 				 ocfs2_locking_ast,
993 				 lockres,
994 				 ocfs2_blocking_ast);
995 		if (status != DLM_NORMAL) {
996 			if ((lkm_flags & LKM_NOQUEUE) &&
997 			    (status == DLM_NOTQUEUED))
998 				ret = -EAGAIN;
999 			else {
1000 				ocfs2_log_dlm_error("dlmlock", status,
1001 						    lockres);
1002 				ret = -EINVAL;
1003 			}
1004 			ocfs2_recover_from_dlm_error(lockres, 1);
1005 			goto out;
1006 		}
1007 
1008 		mlog(0, "lock %s, successful return from dlmlock\n",
1009 		     lockres->l_name);
1010 
1011 		/* At this point we've gone inside the dlm and need to
1012 		 * complete our work regardless. */
1013 		catch_signals = 0;
1014 
1015 		/* wait for busy to clear and carry on */
1016 		goto again;
1017 	}
1018 
1019 	/* Ok, if we get here then we're good to go. */
1020 	ocfs2_inc_holders(lockres, level);
1021 
1022 	ret = 0;
1023 unlock:
1024 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1025 out:
1026 	/*
1027 	 * This is helping work around a lock inversion between the page lock
1028 	 * and dlm locks.  One path holds the page lock while calling aops
1029 	 * which block acquiring dlm locks.  The voting thread holds dlm
1030 	 * locks while acquiring page locks while downconverting data locks.
1031 	 * This block is helping an aop path notice the inversion and back
1032 	 * off to unlock its page lock before trying the dlm lock again.
1033 	 */
1034 	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1035 	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1036 		wait = 0;
1037 		if (lockres_remove_mask_waiter(lockres, &mw))
1038 			ret = -EAGAIN;
1039 		else
1040 			goto again;
1041 	}
1042 	if (wait) {
1043 		ret = ocfs2_wait_for_mask(&mw);
1044 		if (ret == 0)
1045 			goto again;
1046 		mlog_errno(ret);
1047 	}
1048 
1049 	mlog_exit(ret);
1050 	return ret;
1051 }
1052 
1053 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1054 				 struct ocfs2_lock_res *lockres,
1055 				 int level)
1056 {
1057 	unsigned long flags;
1058 
1059 	mlog_entry_void();
1060 	spin_lock_irqsave(&lockres->l_lock, flags);
1061 	ocfs2_dec_holders(lockres, level);
1062 	ocfs2_vote_on_unlock(osb, lockres);
1063 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1064 	mlog_exit_void();
1065 }
1066 
1067 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1068 				 struct ocfs2_lock_res *lockres,
1069 				 int ex,
1070 				 int local)
1071 {
1072 	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1073 	unsigned long flags;
1074 	int lkm_flags = local ? LKM_LOCAL : 0;
1075 
1076 	spin_lock_irqsave(&lockres->l_lock, flags);
1077 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1078 	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1079 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1080 
1081 	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1082 }
1083 
1084 /* Grants us an EX lock on the data and metadata resources, skipping
1085  * the normal cluster directory lookup. Use this ONLY on newly created
1086  * inodes which other nodes can't possibly see, and which haven't been
1087  * hashed in the inode hash yet. This can give us a good performance
1088  * increase as it'll skip the network broadcast normally associated
1089  * with creating a new lock resource. */
1090 int ocfs2_create_new_inode_locks(struct inode *inode)
1091 {
1092 	int ret;
1093 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1094 
1095 	BUG_ON(!inode);
1096 	BUG_ON(!ocfs2_inode_is_new(inode));
1097 
1098 	mlog_entry_void();
1099 
1100 	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1101 
1102 	/* NOTE: we don't increment any of the holder counts, nor
1103 	 * do we add anything to a journal handle. Since this is
1104 	 * supposed to be a new inode which the cluster doesn't know
1105 	 * about yet, there is no need to.  As far as the LVB handling
1106 	 * is concerned, this is basically like acquiring an EX lock
1107 	 * on a resource which has an invalid one -- we'll set it
1108 	 * valid when we release the EX. */
1109 
1110 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1111 	if (ret) {
1112 		mlog_errno(ret);
1113 		goto bail;
1114 	}
1115 
1116 	/*
1117 	 * We don't want to use LKM_LOCAL on a metadata lock, as metadata
1118 	 * locks don't use a generation in their lock names.
1119 	 */
1120 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1121 	if (ret) {
1122 		mlog_errno(ret);
1123 		goto bail;
1124 	}
1125 
1126 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1127 	if (ret) {
1128 		mlog_errno(ret);
1129 		goto bail;
1130 	}
1131 
1132 bail:
1133 	mlog_exit(ret);
1134 	return ret;
1135 }
1136 
1137 int ocfs2_rw_lock(struct inode *inode, int write)
1138 {
1139 	int status, level;
1140 	struct ocfs2_lock_res *lockres;
1141 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1142 
1143 	BUG_ON(!inode);
1144 
1145 	mlog_entry_void();
1146 
1147 	mlog(0, "inode %llu take %s RW lock\n",
1148 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1149 	     write ? "EXMODE" : "PRMODE");
1150 
1151 	if (ocfs2_mount_local(osb))
1152 		return 0;
1153 
1154 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1155 
1156 	level = write ? LKM_EXMODE : LKM_PRMODE;
1157 
1158 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1159 				    0);
1160 	if (status < 0)
1161 		mlog_errno(status);
1162 
1163 	mlog_exit(status);
1164 	return status;
1165 }
1166 
1167 void ocfs2_rw_unlock(struct inode *inode, int write)
1168 {
1169 	int level = write ? LKM_EXMODE : LKM_PRMODE;
1170 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1171 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1172 
1173 	mlog_entry_void();
1174 
1175 	mlog(0, "inode %llu drop %s RW lock\n",
1176 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1177 	     write ? "EXMODE" : "PRMODE");
1178 
1179 	if (!ocfs2_mount_local(osb))
1180 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1181 
1182 	mlog_exit_void();
1183 }
1184 
1185 int ocfs2_data_lock_full(struct inode *inode,
1186 			 int write,
1187 			 int arg_flags)
1188 {
1189 	int status = 0, level;
1190 	struct ocfs2_lock_res *lockres;
1191 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1192 
1193 	BUG_ON(!inode);
1194 
1195 	mlog_entry_void();
1196 
1197 	mlog(0, "inode %llu take %s DATA lock\n",
1198 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1199 	     write ? "EXMODE" : "PRMODE");
1200 
1201 	/* We'll allow faking a readonly data lock for
1202 	 * rodevices. */
1203 	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1204 		if (write) {
1205 			status = -EROFS;
1206 			mlog_errno(status);
1207 		}
1208 		goto out;
1209 	}
1210 
1211 	if (ocfs2_mount_local(osb))
1212 		goto out;
1213 
1214 	lockres = &OCFS2_I(inode)->ip_data_lockres;
1215 
1216 	level = write ? LKM_EXMODE : LKM_PRMODE;
1217 
1218 	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1219 				    0, arg_flags);
1220 	if (status < 0 && status != -EAGAIN)
1221 		mlog_errno(status);
1222 
1223 out:
1224 	mlog_exit(status);
1225 	return status;
1226 }
1227 
1228 /* see ocfs2_meta_lock_with_page() */
1229 int ocfs2_data_lock_with_page(struct inode *inode,
1230 			      int write,
1231 			      struct page *page)
1232 {
1233 	int ret;
1234 
1235 	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1236 	if (ret == -EAGAIN) {
1237 		unlock_page(page);
1238 		if (ocfs2_data_lock(inode, write) == 0)
1239 			ocfs2_data_unlock(inode, write);
1240 		ret = AOP_TRUNCATED_PAGE;
1241 	}
1242 
1243 	return ret;
1244 }
1245 
1246 static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1247 				 struct ocfs2_lock_res *lockres)
1248 {
1249 	int kick = 0;
1250 
1251 	mlog_entry_void();
1252 
1253 	/* If we know that another node is waiting on our lock, kick
1254 	 * the vote thread pre-emptively when we reach a release
1255 	 * condition. */
1256 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1257 		switch(lockres->l_blocking) {
1258 		case LKM_EXMODE:
1259 			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1260 				kick = 1;
1261 			break;
1262 		case LKM_PRMODE:
1263 			if (!lockres->l_ex_holders)
1264 				kick = 1;
1265 			break;
1266 		default:
1267 			BUG();
1268 		}
1269 	}
1270 
1271 	if (kick)
1272 		ocfs2_kick_vote_thread(osb);
1273 
1274 	mlog_exit_void();
1275 }
1276 
1277 void ocfs2_data_unlock(struct inode *inode,
1278 		       int write)
1279 {
1280 	int level = write ? LKM_EXMODE : LKM_PRMODE;
1281 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1282 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1283 
1284 	mlog_entry_void();
1285 
1286 	mlog(0, "inode %llu drop %s DATA lock\n",
1287 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1288 	     write ? "EXMODE" : "PRMODE");
1289 
1290 	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
1291 	    !ocfs2_mount_local(osb))
1292 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1293 
1294 	mlog_exit_void();
1295 }
1296 
1297 #define OCFS2_SEC_BITS   34
1298 #define OCFS2_SEC_SHIFT  (64 - 34)
1299 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1300 
1301 /* LVB only has room for 64 bits of time here so we pack it for
1302  * now. */
1303 static u64 ocfs2_pack_timespec(struct timespec *spec)
1304 {
1305 	u64 res;
1306 	u64 sec = spec->tv_sec;
1307 	u32 nsec = spec->tv_nsec;
1308 
1309 	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1310 
1311 	return res;
1312 }
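
/*
 * A worked example with made-up values: tv_sec = 2 and tv_nsec = 3
 * pack to (2 << 30) | 3 == 0x80000003, since OCFS2_SEC_SHIFT is
 * 64 - 34 = 30. ocfs2_unpack_timespec() below reverses this with a
 * shift and a mask against OCFS2_NSEC_MASK.
 */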
1313 
1314 /* Call this with the lockres locked. I am reasonably sure we don't
1315  * need ip_lock in this function as anyone who would be changing those
1316  * values is supposed to be blocked in ocfs2_meta_lock right now. */
1317 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1318 {
1319 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1320 	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1321 	struct ocfs2_meta_lvb *lvb;
1322 
1323 	mlog_entry_void();
1324 
1325 	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1326 
1327 	/*
1328 	 * Invalidate the LVB of a deleted inode - this way other
1329 	 * nodes are forced to go to disk and discover the new inode
1330 	 * status.
1331 	 */
1332 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1333 		lvb->lvb_version = 0;
1334 		goto out;
1335 	}
1336 
1337 	lvb->lvb_version   = OCFS2_LVB_VERSION;
1338 	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
1339 	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1340 	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1341 	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1342 	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1343 	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1344 	lvb->lvb_iatime_packed  =
1345 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1346 	lvb->lvb_ictime_packed =
1347 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1348 	lvb->lvb_imtime_packed =
1349 		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1350 	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1351 	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1352 
1353 out:
1354 	mlog_meta_lvb(0, lockres);
1355 
1356 	mlog_exit_void();
1357 }
1358 
1359 static void ocfs2_unpack_timespec(struct timespec *spec,
1360 				  u64 packed_time)
1361 {
1362 	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1363 	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1364 }
1365 
1366 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1367 {
1368 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1369 	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1370 	struct ocfs2_meta_lvb *lvb;
1371 
1372 	mlog_entry_void();
1373 
1374 	mlog_meta_lvb(0, lockres);
1375 
1376 	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1377 
1378 	/* We're safe here without the lockres lock... */
1379 	spin_lock(&oi->ip_lock);
1380 	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1381 	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1382 
1383 	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1384 	ocfs2_set_inode_flags(inode);
1385 
1386 	/* fast-symlinks are a special case */
1387 	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1388 		inode->i_blocks = 0;
1389 	else
1390 		inode->i_blocks =
1391 			ocfs2_align_bytes_to_sectors(i_size_read(inode));
1392 
1393 	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1394 	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1395 	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1396 	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1397 	ocfs2_unpack_timespec(&inode->i_atime,
1398 			      be64_to_cpu(lvb->lvb_iatime_packed));
1399 	ocfs2_unpack_timespec(&inode->i_mtime,
1400 			      be64_to_cpu(lvb->lvb_imtime_packed));
1401 	ocfs2_unpack_timespec(&inode->i_ctime,
1402 			      be64_to_cpu(lvb->lvb_ictime_packed));
1403 	spin_unlock(&oi->ip_lock);
1404 
1405 	mlog_exit_void();
1406 }
1407 
1408 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1409 					      struct ocfs2_lock_res *lockres)
1410 {
1411 	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1412 
1413 	if (lvb->lvb_version == OCFS2_LVB_VERSION
1414 	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1415 		return 1;
1416 	return 0;
1417 }
1418 
1419 /* Determine whether a lock resource needs to be refreshed, and
1420  * arbitrate who gets to refresh it.
1421  *
1422  *   0 means no refresh needed.
1423  *
1424  *   > 0 means you need to refresh this and you MUST call
1425  *   ocfs2_complete_lock_res_refresh afterwards. */
1426 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1427 {
1428 	unsigned long flags;
1429 	int status = 0;
1430 
1431 	mlog_entry_void();
1432 
1433 refresh_check:
1434 	spin_lock_irqsave(&lockres->l_lock, flags);
1435 	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1436 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1437 		goto bail;
1438 	}
1439 
1440 	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1441 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1442 
1443 		ocfs2_wait_on_refreshing_lock(lockres);
1444 		goto refresh_check;
1445 	}
1446 
1447 	/* Ok, I'll be the one to refresh this lock. */
1448 	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1449 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1450 
1451 	status = 1;
1452 bail:
1453 	mlog_exit(status);
1454 	return status;
1455 }
1456 
1457 /* If status is nonzero, I'll mark it as not being in refresh
1458  * anymore, but I won't clear the needs refresh flag. */
1459 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1460 						   int status)
1461 {
1462 	unsigned long flags;
1463 	mlog_entry_void();
1464 
1465 	spin_lock_irqsave(&lockres->l_lock, flags);
1466 	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1467 	if (!status)
1468 		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1469 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1470 
1471 	wake_up(&lockres->l_event);
1472 
1473 	mlog_exit_void();
1474 }
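
/*
 * Callers pair the two functions above the way ocfs2_super_lock()
 * does below -- roughly:
 *
 *	status = ocfs2_should_refresh_lock_res(lockres);
 *	if (status) {
 *		status = ...re-read the protected data from disk...;
 *		ocfs2_complete_lock_res_refresh(lockres, status);
 *	}
 */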
1475 
1476 /* may or may not return a bh if it went to disk. */
1477 static int ocfs2_meta_lock_update(struct inode *inode,
1478 				  struct buffer_head **bh)
1479 {
1480 	int status = 0;
1481 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1482 	struct ocfs2_lock_res *lockres = NULL;
1483 	struct ocfs2_dinode *fe;
1484 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1485 
1486 	mlog_entry_void();
1487 
1488 	spin_lock(&oi->ip_lock);
1489 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1490 		mlog(0, "Orphaned inode %llu was deleted while we "
1491 		     "were waiting on a lock. ip_flags = 0x%x\n",
1492 		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
1493 		spin_unlock(&oi->ip_lock);
1494 		status = -ENOENT;
1495 		goto bail;
1496 	}
1497 	spin_unlock(&oi->ip_lock);
1498 
1499 	if (!ocfs2_mount_local(osb)) {
1500 		lockres = &oi->ip_meta_lockres;
1501 
1502 		if (!ocfs2_should_refresh_lock_res(lockres))
1503 			goto bail;
1504 	}
1505 
1506 	/* This will discard any caching information we might have had
1507 	 * for the inode metadata. */
1508 	ocfs2_metadata_cache_purge(inode);
1509 
1510 	/* will do nothing for inode types that don't use the extent
1511 	 * map (directories, bitmap files, etc) */
1512 	ocfs2_extent_map_trunc(inode, 0);
1513 
1514 	if (lockres && ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1515 		mlog(0, "Trusting LVB on inode %llu\n",
1516 		     (unsigned long long)oi->ip_blkno);
1517 		ocfs2_refresh_inode_from_lvb(inode);
1518 	} else {
1519 		/* Boo, we have to go to disk. */
1520 		/* read bh, cast, ocfs2_refresh_inode */
1521 		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1522 					  bh, OCFS2_BH_CACHED, inode);
1523 		if (status < 0) {
1524 			mlog_errno(status);
1525 			goto bail_refresh;
1526 		}
1527 		fe = (struct ocfs2_dinode *) (*bh)->b_data;
1528 
1529 		/* This is a good chance to make sure we're not
1530 		 * locking an invalid object.
1531 		 *
1532 		 * We bug on a stale inode here because we checked
1533 		 * above whether it was wiped from disk. The wiping
1534 		 * node provides a guarantee that we receive that
1535 		 * message and can mark the inode before dropping any
1536 		 * locks associated with it. */
1537 		if (!OCFS2_IS_VALID_DINODE(fe)) {
1538 			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1539 			status = -EIO;
1540 			goto bail_refresh;
1541 		}
1542 		mlog_bug_on_msg(inode->i_generation !=
1543 				le32_to_cpu(fe->i_generation),
1544 				"Invalid dinode %llu disk generation: %u "
1545 				"inode->i_generation: %u\n",
1546 				(unsigned long long)oi->ip_blkno,
1547 				le32_to_cpu(fe->i_generation),
1548 				inode->i_generation);
1549 		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1550 				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1551 				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
1552 				(unsigned long long)oi->ip_blkno,
1553 				(unsigned long long)le64_to_cpu(fe->i_dtime),
1554 				le32_to_cpu(fe->i_flags));
1555 
1556 		ocfs2_refresh_inode(inode, fe);
1557 	}
1558 
1559 	status = 0;
1560 bail_refresh:
1561 	if (lockres)
1562 		ocfs2_complete_lock_res_refresh(lockres, status);
1563 bail:
1564 	mlog_exit(status);
1565 	return status;
1566 }
1567 
1568 static int ocfs2_assign_bh(struct inode *inode,
1569 			   struct buffer_head **ret_bh,
1570 			   struct buffer_head *passed_bh)
1571 {
1572 	int status;
1573 
1574 	if (passed_bh) {
1575 		/* Ok, the update went to disk for us, use the
1576 		 * returned bh. */
1577 		*ret_bh = passed_bh;
1578 		get_bh(*ret_bh);
1579 
1580 		return 0;
1581 	}
1582 
1583 	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1584 				  OCFS2_I(inode)->ip_blkno,
1585 				  ret_bh,
1586 				  OCFS2_BH_CACHED,
1587 				  inode);
1588 	if (status < 0)
1589 		mlog_errno(status);
1590 
1591 	return status;
1592 }
1593 
1594 /*
1595  * returns < 0 error if the callback will never be called, otherwise
1596  * the result of the lock will be communicated via the callback.
1597  */
1598 int ocfs2_meta_lock_full(struct inode *inode,
1599 			 struct buffer_head **ret_bh,
1600 			 int ex,
1601 			 int arg_flags)
1602 {
1603 	int status, level, dlm_flags, acquired;
1604 	struct ocfs2_lock_res *lockres = NULL;
1605 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1606 	struct buffer_head *local_bh = NULL;
1607 
1608 	BUG_ON(!inode);
1609 
1610 	mlog_entry_void();
1611 
1612 	mlog(0, "inode %llu, take %s META lock\n",
1613 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1614 	     ex ? "EXMODE" : "PRMODE");
1615 
1616 	status = 0;
1617 	acquired = 0;
1618 	/* We'll allow faking a readonly metadata lock for
1619 	 * rodevices. */
1620 	if (ocfs2_is_hard_readonly(osb)) {
1621 		if (ex)
1622 			status = -EROFS;
1623 		goto bail;
1624 	}
1625 
1626 	if (ocfs2_mount_local(osb))
1627 		goto local;
1628 
1629 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1630 		wait_event(osb->recovery_event,
1631 			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1632 
1634 	lockres = &OCFS2_I(inode)->ip_meta_lockres;
1635 	level = ex ? LKM_EXMODE : LKM_PRMODE;
1636 	dlm_flags = 0;
1637 	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1638 		dlm_flags |= LKM_NOQUEUE;
1639 
1640 	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1641 	if (status < 0) {
1642 		if (status != -EAGAIN && status != -EIOCBRETRY)
1643 			mlog_errno(status);
1644 		goto bail;
1645 	}
1646 
1647 	/* Notify the error cleanup path to drop the cluster lock. */
1648 	acquired = 1;
1649 
1650 	/* We wait twice because a node may have died while we were in
1651 	 * the lower dlm layers. The second time though, we've
1652 	 * committed to owning this lock so we don't allow signals to
1653 	 * abort the operation. */
1654 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1655 		wait_event(osb->recovery_event,
1656 			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1657 
1658 local:
1659 	/*
1660 	 * We only see this flag if we're being called from
1661 	 * ocfs2_read_locked_inode(). It means we're locking an inode
1662 	 * which hasn't been populated yet, so clear the refresh flag
1663 	 * and let the caller handle it.
1664 	 */
1665 	if (inode->i_state & I_NEW) {
1666 		status = 0;
1667 		if (lockres)
1668 			ocfs2_complete_lock_res_refresh(lockres, 0);
1669 		goto bail;
1670 	}
1671 
1672 	/* This is fun. The caller may want a bh back, or it may
1673 	 * not. ocfs2_meta_lock_update definitely wants one in, but
1674 	 * may or may not read one, depending on what's in the
1675 	 * LVB. The result of all of this is that we've *only* gone to
1676 	 * disk if we have to, so the complexity is worthwhile. */
1677 	status = ocfs2_meta_lock_update(inode, &local_bh);
1678 	if (status < 0) {
1679 		if (status != -ENOENT)
1680 			mlog_errno(status);
1681 		goto bail;
1682 	}
1683 
1684 	if (ret_bh) {
1685 		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1686 		if (status < 0) {
1687 			mlog_errno(status);
1688 			goto bail;
1689 		}
1690 	}
1691 
1692 bail:
1693 	if (status < 0) {
1694 		if (ret_bh && (*ret_bh)) {
1695 			brelse(*ret_bh);
1696 			*ret_bh = NULL;
1697 		}
1698 		if (acquired)
1699 			ocfs2_meta_unlock(inode, ex);
1700 	}
1701 
1702 	if (local_bh)
1703 		brelse(local_bh);
1704 
1705 	mlog_exit(status);
1706 	return status;
1707 }
1708 
1709 /*
1710  * This is working around a lock inversion between tasks acquiring DLM locks
1711  * while holding a page lock and the vote thread which blocks dlm lock acquiry
1712  * while acquiring page locks.
1713  *
1714  * ** These _with_page variants are only intended to be called from aop
1715  * methods that hold page locks and return a very specific *positive* error
1716  * code that aop methods pass up to the VFS -- test for errors with != 0. **
1717  *
1718  * The DLM is called such that it returns -EAGAIN if it would have blocked
1719  * waiting for the vote thread.  In that case we unlock our page so the vote
1720  * thread can make progress.  Once we've done this we have to return
1721  * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1722  * into the VFS who will then immediately retry the aop call.
1723  *
1724  * We do a blocking lock and immediate unlock before returning, though, so that
1725  * the lock has a great chance of being cached on this node by the time the VFS
1726  * calls back to retry the aop. This has the potential to livelock as nodes
1727  * ping locks back and forth, but that's a risk we're willing to take in
1728  * exchange for a simple fix to the lock inversion.
1729  */
1730 int ocfs2_meta_lock_with_page(struct inode *inode,
1731 			      struct buffer_head **ret_bh,
1732 			      int ex,
1733 			      struct page *page)
1734 {
1735 	int ret;
1736 
1737 	ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
1738 	if (ret == -EAGAIN) {
1739 		unlock_page(page);
1740 		if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
1741 			ocfs2_meta_unlock(inode, ex);
1742 		ret = AOP_TRUNCATED_PAGE;
1743 	}
1744 
1745 	return ret;
1746 }
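
/*
 * A sketch of the intended caller (hypothetical; the real callers
 * are aop methods which enter with the page already locked):
 *
 *	ret = ocfs2_meta_lock_with_page(inode, NULL, 0, page);
 *	if (ret != 0)
 *		return ret;	note: on AOP_TRUNCATED_PAGE the page
 *				has already been unlocked for us
 */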
1747 
1748 int ocfs2_meta_lock_atime(struct inode *inode,
1749 			  struct vfsmount *vfsmnt,
1750 			  int *level)
1751 {
1752 	int ret;
1753 
1754 	mlog_entry_void();
1755 	ret = ocfs2_meta_lock(inode, NULL, 0);
1756 	if (ret < 0) {
1757 		mlog_errno(ret);
1758 		return ret;
1759 	}
1760 
1761 	/*
1762 	 * If we should update atime, we will get EX lock,
1763 	 * otherwise we just get PR lock.
1764 	 */
1765 	if (ocfs2_should_update_atime(inode, vfsmnt)) {
1766 		struct buffer_head *bh = NULL;
1767 
1768 		ocfs2_meta_unlock(inode, 0);
1769 		ret = ocfs2_meta_lock(inode, &bh, 1);
1770 		if (ret < 0) {
1771 			mlog_errno(ret);
1772 			return ret;
1773 		}
1774 		*level = 1;
1775 		if (ocfs2_should_update_atime(inode, vfsmnt))
1776 			ocfs2_update_inode_atime(inode, bh);
1777 		if (bh)
1778 			brelse(bh);
1779 	} else
1780 		*level = 0;
1781 
1782 	mlog_exit(ret);
1783 	return ret;
1784 }
1785 
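/*
 * Editor's sketch (not in the original source): callers must hand the
 * level written to *level back to ocfs2_meta_unlock(), since the helper
 * above may have traded the initial PR lock for an EX one. The
 * surrounding read-path code is hypothetical:
 *
 *	int ret, level;
 *
 *	ret = ocfs2_meta_lock_atime(inode, filp->f_vfsmnt, &level);
 *	if (ret < 0) {
 *		mlog_errno(ret);
 *		return ret;
 *	}
 *
 *	// ... read-path work; atime has been updated if it needed it ...
 *
 *	ocfs2_meta_unlock(inode, level);
 */
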
1786 void ocfs2_meta_unlock(struct inode *inode,
1787 		       int ex)
1788 {
1789 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1790 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1791 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1792 
1793 	mlog_entry_void();
1794 
1795 	mlog(0, "inode %llu drop %s META lock\n",
1796 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1797 	     ex ? "EXMODE" : "PRMODE");
1798 
1799 	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
1800 	    !ocfs2_mount_local(osb))
1801 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1802 
1803 	mlog_exit_void();
1804 }
1805 
1806 int ocfs2_super_lock(struct ocfs2_super *osb,
1807 		     int ex)
1808 {
1809 	int status = 0;
1810 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1811 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1812 	struct buffer_head *bh;
1813 	struct ocfs2_slot_info *si = osb->slot_info;
1814 
1815 	mlog_entry_void();
1816 
1817 	if (ocfs2_is_hard_readonly(osb))
1818 		return -EROFS;
1819 
1820 	if (ocfs2_mount_local(osb))
1821 		goto bail;
1822 
1823 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1824 	if (status < 0) {
1825 		mlog_errno(status);
1826 		goto bail;
1827 	}
1828 
1829 	/* The super block lock path is really in the best position to
1830 	 * know when resources covered by the lock need to be
1831 	 * refreshed, so we do it here. Of course, making sense of
1832 	 * everything is up to the caller :) */
1833 	status = ocfs2_should_refresh_lock_res(lockres);
1834 	if (status < 0) {
1835 		mlog_errno(status);
1836 		goto bail;
1837 	}
1838 	if (status) {
1839 		bh = si->si_bh;
1840 		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1841 					  si->si_inode);
1842 		if (status == 0)
1843 			ocfs2_update_slot_info(si);
1844 
1845 		ocfs2_complete_lock_res_refresh(lockres, status);
1846 
1847 		if (status < 0)
1848 			mlog_errno(status);
1849 	}
1850 bail:
1851 	mlog_exit(status);
1852 	return status;
1853 }
1854 
1855 void ocfs2_super_unlock(struct ocfs2_super *osb,
1856 			int ex)
1857 {
1858 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1859 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1860 
1861 	if (!ocfs2_mount_local(osb))
1862 		ocfs2_cluster_unlock(osb, lockres, level);
1863 }
1864 
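/*
 * Editor's sketch (not in the original source): a typical pairing of
 * the two functions above, taking the superblock lock shared around
 * work that relies on a current slot map (which ocfs2_super_lock()
 * refreshes for us). The label and surrounding flow are hypothetical:
 *
 *	status = ocfs2_super_lock(osb, 0);
 *	if (status < 0) {
 *		mlog_errno(status);
 *		goto out;
 *	}
 *
 *	// ... query the slot map, etc. ...
 *
 *	ocfs2_super_unlock(osb, 0);
 */
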
1865 int ocfs2_rename_lock(struct ocfs2_super *osb)
1866 {
1867 	int status;
1868 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1869 
1870 	if (ocfs2_is_hard_readonly(osb))
1871 		return -EROFS;
1872 
1873 	if (ocfs2_mount_local(osb))
1874 		return 0;
1875 
1876 	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1877 	if (status < 0)
1878 		mlog_errno(status);
1879 
1880 	return status;
1881 }
1882 
1883 void ocfs2_rename_unlock(struct ocfs2_super *osb)
1884 {
1885 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1886 
1887 	if (!ocfs2_mount_local(osb))
1888 		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1889 }
1890 
1891 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1892 {
1893 	int ret;
1894 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1895 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1896 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1897 
1898 	BUG_ON(!dl);
1899 
1900 	if (ocfs2_is_hard_readonly(osb))
1901 		return -EROFS;
1902 
1903 	if (ocfs2_mount_local(osb))
1904 		return 0;
1905 
1906 	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1907 	if (ret < 0)
1908 		mlog_errno(ret);
1909 
1910 	return ret;
1911 }
1912 
1913 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1914 {
1915 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1916 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1917 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1918 
1919 	if (!ocfs2_mount_local(osb))
1920 		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1921 }
1922 
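/*
 * Editor's sketch (not in the original source): ocfs2_dentry_lock()
 * requires that dentry->d_fsdata already carry an ocfs2_dentry_lock
 * (ocfs2 attaches one when the dentry is set up), hence the BUG_ON()
 * above. A hypothetical caller pairs the two helpers like so:
 *
 *	ret = ocfs2_dentry_lock(dentry, 0);
 *	if (ret < 0) {
 *		mlog_errno(ret);
 *		return ret;
 *	}
 *
 *	// ... work done under the dentry cluster lock ...
 *
 *	ocfs2_dentry_unlock(dentry, 0);
 */
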
1923 /* Reference counting of the dlm debug structure. We want this because
1924  * open references on the debug inodes can outlive the mount, so
1925  * we can't rely on the ocfs2_super to always exist. */
1926 static void ocfs2_dlm_debug_free(struct kref *kref)
1927 {
1928 	struct ocfs2_dlm_debug *dlm_debug;
1929 
1930 	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1931 
1932 	kfree(dlm_debug);
1933 }
1934 
1935 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1936 {
1937 	if (dlm_debug)
1938 		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1939 }
1940 
1941 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
1942 {
1943 	kref_get(&debug->d_refcnt);
1944 }
1945 
1946 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1947 {
1948 	struct ocfs2_dlm_debug *dlm_debug;
1949 
1950 	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1951 	if (!dlm_debug) {
1952 		mlog_errno(-ENOMEM);
1953 		goto out;
1954 	}
1955 
1956 	kref_init(&dlm_debug->d_refcnt);
1957 	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1958 	dlm_debug->d_locking_state = NULL;
1959 out:
1960 	return dlm_debug;
1961 }
1962 
1963 /* Access to this is arbitrated for us via seq_file->sem. */
1964 struct ocfs2_dlm_seq_priv {
1965 	struct ocfs2_dlm_debug *p_dlm_debug;
1966 	struct ocfs2_lock_res p_iter_res;
1967 	struct ocfs2_lock_res p_tmp_res;
1968 };
1969 
1970 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
1971 						 struct ocfs2_dlm_seq_priv *priv)
1972 {
1973 	struct ocfs2_lock_res *iter, *ret = NULL;
1974 	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
1975 
1976 	assert_spin_locked(&ocfs2_dlm_tracking_lock);
1977 
1978 	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
1979 		/* discover the head of the list */
1980 		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
1981 			mlog(0, "End of list found, %p\n", ret);
1982 			break;
1983 		}
1984 
1985 		/* We track our "dummy" iteration lockres' by a NULL
1986 		 * l_ops field. */
1987 		if (iter->l_ops != NULL) {
1988 			ret = iter;
1989 			break;
1990 		}
1991 	}
1992 
1993 	return ret;
1994 }
1995 
1996 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
1997 {
1998 	struct ocfs2_dlm_seq_priv *priv = m->private;
1999 	struct ocfs2_lock_res *iter;
2000 
2001 	spin_lock(&ocfs2_dlm_tracking_lock);
2002 	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2003 	if (iter) {
2004 		/* Since lockres' have the lifetime of their container
2005 		 * (which can be inodes, ocfs2_supers, etc) we want to
2006 		 * copy this out to a temporary lockres while still
2007 		 * under the spinlock. Obviously after this we can't
2008 		 * trust any pointers on the copy returned, but that's
2009 		 * ok as the information we want isn't typically held
2010 		 * in them. */
2011 		priv->p_tmp_res = *iter;
2012 		iter = &priv->p_tmp_res;
2013 	}
2014 	spin_unlock(&ocfs2_dlm_tracking_lock);
2015 
2016 	return iter;
2017 }
2018 
2019 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2020 {
2021 }
2022 
2023 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2024 {
2025 	struct ocfs2_dlm_seq_priv *priv = m->private;
2026 	struct ocfs2_lock_res *iter = v;
2027 	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2028 
2029 	spin_lock(&ocfs2_dlm_tracking_lock);
2030 	iter = ocfs2_dlm_next_res(iter, priv);
2031 	list_del_init(&dummy->l_debug_list);
2032 	if (iter) {
2033 		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2034 		priv->p_tmp_res = *iter;
2035 		iter = &priv->p_tmp_res;
2036 	}
2037 	spin_unlock(&ocfs2_dlm_tracking_lock);
2038 
2039 	return iter;
2040 }
2041 
2042 /* So that debugfs.ocfs2 can determine which format is being used */
2043 #define OCFS2_DLM_DEBUG_STR_VERSION 1
2044 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2045 {
2046 	int i;
2047 	char *lvb;
2048 	struct ocfs2_lock_res *lockres = v;
2049 
2050 	if (!lockres)
2051 		return -EINVAL;
2052 
2053 	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2054 
2055 	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2056 		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2057 			   lockres->l_name,
2058 			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2059 	else
2060 		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2061 
2062 	seq_printf(m, "%d\t"
2063 		   "0x%lx\t"
2064 		   "0x%x\t"
2065 		   "0x%x\t"
2066 		   "%u\t"
2067 		   "%u\t"
2068 		   "%d\t"
2069 		   "%d\t",
2070 		   lockres->l_level,
2071 		   lockres->l_flags,
2072 		   lockres->l_action,
2073 		   lockres->l_unlock_action,
2074 		   lockres->l_ro_holders,
2075 		   lockres->l_ex_holders,
2076 		   lockres->l_requested,
2077 		   lockres->l_blocking);
2078 
2079 	/* Dump the raw LVB */
2080 	lvb = lockres->l_lksb.lvb;
2081 	for (i = 0; i < DLM_LVB_LEN; i++)
2082 		seq_printf(m, "0x%x\t", lvb[i]);
2083 
2084 	/* End the line */
2085 	seq_printf(m, "\n");
2086 	return 0;
2087 }
2088 
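/*
 * Editor's note (not in the original source): with the seq_printf()
 * calls above, each lockres becomes one tab-separated record in the
 * "locking_state" file created below. A record would look roughly like
 * this (all field values here are made up):
 *
 *	0x1	<lock name>	3	0x41	0x0	0x0	1	0	3	-1	0x0	...
 *
 * i.e. version, name, level, flags, action, unlock_action, ro_holders,
 * ex_holders, requested, blocking, then the DLM_LVB_LEN raw LVB bytes.
 */
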
2089 static struct seq_operations ocfs2_dlm_seq_ops = {
2090 	.start =	ocfs2_dlm_seq_start,
2091 	.stop =		ocfs2_dlm_seq_stop,
2092 	.next =		ocfs2_dlm_seq_next,
2093 	.show =		ocfs2_dlm_seq_show,
2094 };
2095 
2096 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2097 {
2098 	struct seq_file *seq = (struct seq_file *) file->private_data;
2099 	struct ocfs2_dlm_seq_priv *priv = seq->private;
2100 	struct ocfs2_lock_res *res = &priv->p_iter_res;
2101 
2102 	ocfs2_remove_lockres_tracking(res);
2103 	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2104 	return seq_release_private(inode, file);
2105 }
2106 
2107 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2108 {
2109 	int ret;
2110 	struct ocfs2_dlm_seq_priv *priv;
2111 	struct seq_file *seq;
2112 	struct ocfs2_super *osb;
2113 
2114 	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2115 	if (!priv) {
2116 		ret = -ENOMEM;
2117 		mlog_errno(ret);
2118 		goto out;
2119 	}
2120 	osb = inode->i_private;
2121 	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2122 	priv->p_dlm_debug = osb->osb_dlm_debug;
2123 	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2124 
2125 	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2126 	if (ret) {
2127 		kfree(priv);
2128 		mlog_errno(ret);
2129 		goto out;
2130 	}
2131 
2132 	seq = (struct seq_file *) file->private_data;
2133 	seq->private = priv;
2134 
2135 	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2136 				   priv->p_dlm_debug);
2137 
2138 out:
2139 	return ret;
2140 }
2141 
2142 static const struct file_operations ocfs2_dlm_debug_fops = {
2143 	.open =		ocfs2_dlm_debug_open,
2144 	.release =	ocfs2_dlm_debug_release,
2145 	.read =		seq_read,
2146 	.llseek =	seq_lseek,
2147 };
2148 
2149 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2150 {
2151 	int ret = 0;
2152 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2153 
2154 	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2155 							 S_IFREG|S_IRUSR,
2156 							 osb->osb_debug_root,
2157 							 osb,
2158 							 &ocfs2_dlm_debug_fops);
2159 	if (!dlm_debug->d_locking_state) {
2160 		ret = -EINVAL;
2161 		mlog(ML_ERROR,
2162 		     "Unable to create locking state debugfs file.\n");
2163 		goto out;
2164 	}
2165 
2166 	ocfs2_get_dlm_debug(dlm_debug);
2167 out:
2168 	return ret;
2169 }
2170 
2171 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2172 {
2173 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2174 
2175 	if (dlm_debug) {
2176 		debugfs_remove(dlm_debug->d_locking_state);
2177 		ocfs2_put_dlm_debug(dlm_debug);
2178 	}
2179 }
2180 
2181 int ocfs2_dlm_init(struct ocfs2_super *osb)
2182 {
2183 	int status = 0;
2184 	u32 dlm_key;
2185 	struct dlm_ctxt *dlm = NULL;
2186 
2187 	mlog_entry_void();
2188 
2189 	if (ocfs2_mount_local(osb))
2190 		goto local;
2191 
2192 	status = ocfs2_dlm_init_debug(osb);
2193 	if (status < 0) {
2194 		mlog_errno(status);
2195 		goto bail;
2196 	}
2197 
2198 	/* launch vote thread */
2199 	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2200 	if (IS_ERR(osb->vote_task)) {
2201 		status = PTR_ERR(osb->vote_task);
2202 		osb->vote_task = NULL;
2203 		mlog_errno(status);
2204 		goto bail;
2205 	}
2206 
2207 	/* used by the dlm code to make message headers unique; each
2208 	 * node in this domain must agree on this. */
2209 	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2210 
2211 	/* for now, uuid == domain */
2212 	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2213 	if (IS_ERR(dlm)) {
2214 		status = PTR_ERR(dlm);
2215 		mlog_errno(status);
2216 		goto bail;
2217 	}
2218 
2219 	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2220 
2221 local:
2222 	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2223 	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2224 
2225 	osb->dlm = dlm;
2226 
2227 	status = 0;
2228 bail:
2229 	if (status < 0) {
2230 		ocfs2_dlm_shutdown_debug(osb);
2231 		if (osb->vote_task)
2232 			kthread_stop(osb->vote_task);
2233 	}
2234 
2235 	mlog_exit(status);
2236 	return status;
2237 }
2238 
2239 void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2240 {
2241 	mlog_entry_void();
2242 
2243 	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2244 
2245 	ocfs2_drop_osb_locks(osb);
2246 
2247 	if (osb->vote_task) {
2248 		kthread_stop(osb->vote_task);
2249 		osb->vote_task = NULL;
2250 	}
2251 
2252 	ocfs2_lock_res_free(&osb->osb_super_lockres);
2253 	ocfs2_lock_res_free(&osb->osb_rename_lockres);
2254 
2255 	dlm_unregister_domain(osb->dlm);
2256 	osb->dlm = NULL;
2257 
2258 	ocfs2_dlm_shutdown_debug(osb);
2259 
2260 	mlog_exit_void();
2261 }
2262 
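/*
 * Editor's sketch (not in the original source): the intended lifecycle
 * of the two functions above, simplified from the mount and unmount
 * paths. Local mounts skip the vote thread and domain registration but
 * still get their osb lock resources initialized:
 *
 *	status = ocfs2_dlm_init(osb);		// at mount time
 *	if (status < 0)
 *		goto out;
 *	// ... filesystem mounted and running ...
 *	ocfs2_dlm_shutdown(osb);		// at unmount time
 */
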
2263 static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2264 {
2265 	struct ocfs2_lock_res *lockres = opaque;
2266 	unsigned long flags;
2267 
2268 	mlog_entry_void();
2269 
2270 	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2271 	     lockres->l_unlock_action);
2272 
2273 	spin_lock_irqsave(&lockres->l_lock, flags);
2274 	/* We tried to cancel a convert request, but it was already
2275 	 * granted. All we want to do here is clear our unlock
2276 	 * state. The wake_up call done at the bottom is redundant
2277 	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2278 	 * hurt anything anyway */
2279 	if (status == DLM_CANCELGRANT &&
2280 	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2281 		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2282 
2283 		/* We don't clear the busy flag in this case as it
2284 		 * should have been cleared by the ast which the dlm
2285 		 * has called. */
2286 		goto complete_unlock;
2287 	}
2288 
2289 	if (status != DLM_NORMAL) {
2290 		mlog(ML_ERROR, "DLM returned status %d for lock %s, "
2291 		     "unlock_action %d\n", status, lockres->l_name,
2292 		     lockres->l_unlock_action);
2293 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2294 		return;
2295 	}
2296 
2297 	switch(lockres->l_unlock_action) {
2298 	case OCFS2_UNLOCK_CANCEL_CONVERT:
2299 		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2300 		lockres->l_action = OCFS2_AST_INVALID;
2301 		break;
2302 	case OCFS2_UNLOCK_DROP_LOCK:
2303 		lockres->l_level = LKM_IVMODE;
2304 		break;
2305 	default:
2306 		BUG();
2307 	}
2308 
2309 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2310 complete_unlock:
2311 	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2312 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2313 
2314 	wake_up(&lockres->l_event);
2315 
2316 	mlog_exit_void();
2317 }
2318 
2319 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2320 			   struct ocfs2_lock_res *lockres)
2321 {
2322 	enum dlm_status status;
2323 	unsigned long flags;
2324 	int lkm_flags = 0;
2325 
2326 	/* We didn't get anywhere near actually using this lockres. */
2327 	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2328 		goto out;
2329 
2330 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2331 		lkm_flags |= LKM_VALBLK;
2332 
2333 	spin_lock_irqsave(&lockres->l_lock, flags);
2334 
2335 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2336 			"lockres %s, flags 0x%lx\n",
2337 			lockres->l_name, lockres->l_flags);
2338 
2339 	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2340 		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2341 		     "%u, unlock_action = %u\n",
2342 		     lockres->l_name, lockres->l_flags, lockres->l_action,
2343 		     lockres->l_unlock_action);
2344 
2345 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2346 
2347 		/* XXX: Today we just wait on any busy
2348 		 * locks... Perhaps we need to cancel converts in the
2349 		 * future? */
2350 		ocfs2_wait_on_busy_lock(lockres);
2351 
2352 		spin_lock_irqsave(&lockres->l_lock, flags);
2353 	}
2354 
2355 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2356 		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2357 		    lockres->l_level == LKM_EXMODE &&
2358 		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2359 			lockres->l_ops->set_lvb(lockres);
2360 	}
2361 
2362 	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2363 		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2364 		     lockres->l_name);
2365 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2366 		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2367 
2368 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2369 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2370 		goto out;
2371 	}
2372 
2373 	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2374 
2375 	/* make sure we never get here while waiting for an ast to
2376 	 * fire. */
2377 	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2378 
2379 	/* is this necessary? */
2380 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2381 	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2382 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2383 
2384 	mlog(0, "lock %s\n", lockres->l_name);
2385 
2386 	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2387 			   ocfs2_unlock_ast, lockres);
2388 	if (status != DLM_NORMAL) {
2389 		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2390 		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2391 		dlm_print_one_lock(lockres->l_lksb.lockid);
2392 		BUG();
2393 	}
2394 	mlog(0, "lock %s, successful return from dlmunlock\n",
2395 	     lockres->l_name);
2396 
2397 	ocfs2_wait_on_busy_lock(lockres);
2398 out:
2399 	mlog_exit(0);
2400 	return 0;
2401 }
2402 
2403 /* Mark the lockres as being dropped. It will no longer be
2404  * queued if blocking, but we still may have to wait on it
2405  * being dequeued from the vote thread before we can consider
2406  * it safe to drop.
2407  *
2408  * You can *not* attempt to call cluster_lock on this lockres anymore. */
2409 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2410 {
2411 	int status;
2412 	struct ocfs2_mask_waiter mw;
2413 	unsigned long flags;
2414 
2415 	ocfs2_init_mask_waiter(&mw);
2416 
2417 	spin_lock_irqsave(&lockres->l_lock, flags);
2418 	lockres->l_flags |= OCFS2_LOCK_FREEING;
2419 	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2420 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2421 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2422 
2423 		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2424 
2425 		status = ocfs2_wait_for_mask(&mw);
2426 		if (status)
2427 			mlog_errno(status);
2428 
2429 		spin_lock_irqsave(&lockres->l_lock, flags);
2430 	}
2431 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2432 }
2433 
2434 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2435 			       struct ocfs2_lock_res *lockres)
2436 {
2437 	int ret;
2438 
2439 	ocfs2_mark_lockres_freeing(lockres);
2440 	ret = ocfs2_drop_lock(osb, lockres);
2441 	if (ret)
2442 		mlog_errno(ret);
2443 }
2444 
2445 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2446 {
2447 	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2448 	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2449 }
2450 
2451 int ocfs2_drop_inode_locks(struct inode *inode)
2452 {
2453 	int status, err;
2454 
2455 	mlog_entry_void();
2456 
2457 	/* No need to call ocfs2_mark_lockres_freeing here -
2458 	 * ocfs2_clear_inode has done it for us. */
2459 
2460 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2461 			      &OCFS2_I(inode)->ip_data_lockres);
2462 	if (err < 0)
2463 		mlog_errno(err);
2464 
2465 	status = err;
2466 
2467 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2468 			      &OCFS2_I(inode)->ip_meta_lockres);
2469 	if (err < 0)
2470 		mlog_errno(err);
2471 	if (err < 0 && !status)
2472 		status = err;
2473 
2474 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2475 			      &OCFS2_I(inode)->ip_rw_lockres);
2476 	if (err < 0)
2477 		mlog_errno(err);
2478 	if (err < 0 && !status)
2479 		status = err;
2480 
2481 	mlog_exit(status);
2482 	return status;
2483 }
2484 
2485 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2486 				      int new_level)
2487 {
2488 	assert_spin_locked(&lockres->l_lock);
2489 
2490 	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2491 
2492 	if (lockres->l_level <= new_level) {
2493 		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2494 		     lockres->l_level, new_level);
2495 		BUG();
2496 	}
2497 
2498 	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2499 	     lockres->l_name, new_level, lockres->l_blocking);
2500 
2501 	lockres->l_action = OCFS2_AST_DOWNCONVERT;
2502 	lockres->l_requested = new_level;
2503 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2504 }
2505 
2506 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2507 				  struct ocfs2_lock_res *lockres,
2508 				  int new_level,
2509 				  int lvb)
2510 {
2511 	int ret, dlm_flags = LKM_CONVERT;
2512 	enum dlm_status status;
2513 
2514 	mlog_entry_void();
2515 
2516 	if (lvb)
2517 		dlm_flags |= LKM_VALBLK;
2518 
2519 	status = dlmlock(osb->dlm,
2520 			 new_level,
2521 			 &lockres->l_lksb,
2522 			 dlm_flags,
2523 			 lockres->l_name,
2524 			 OCFS2_LOCK_ID_MAX_LEN - 1,
2525 			 ocfs2_locking_ast,
2526 			 lockres,
2527 			 ocfs2_blocking_ast);
2528 	if (status != DLM_NORMAL) {
2529 		ocfs2_log_dlm_error("dlmlock", status, lockres);
2530 		ret = -EINVAL;
2531 		ocfs2_recover_from_dlm_error(lockres, 1);
2532 		goto bail;
2533 	}
2534 
2535 	ret = 0;
2536 bail:
2537 	mlog_exit(ret);
2538 	return ret;
2539 }
2540 
2541 /* returns 1 when the caller should unlock and call dlmunlock */
2542 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2543 				        struct ocfs2_lock_res *lockres)
2544 {
2545 	assert_spin_locked(&lockres->l_lock);
2546 
2547 	mlog_entry_void();
2548 	mlog(0, "lock %s\n", lockres->l_name);
2549 
2550 	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2551 		/* If we're already trying to cancel a lock conversion
2552 		 * then just drop the spinlock and allow the caller to
2553 		 * requeue this lock. */
2554 
2555 		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2556 		return 0;
2557 	}
2558 
2559 	/* were we in a convert when we got the bast fire? */
2560 	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2561 	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
2562 	/* set things up so that the unlock AST knows to just
2563 	 * clear out the ast_action and unset busy, etc. */
2564 	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2565 
2566 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2567 			"lock %s, invalid flags: 0x%lx\n",
2568 			lockres->l_name, lockres->l_flags);
2569 
2570 	return 1;
2571 }
2572 
2573 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2574 				struct ocfs2_lock_res *lockres)
2575 {
2576 	int ret;
2577 	enum dlm_status status;
2578 
2579 	mlog_entry_void();
2580 	mlog(0, "lock %s\n", lockres->l_name);
2581 
2582 	ret = 0;
2583 	status = dlmunlock(osb->dlm,
2584 			   &lockres->l_lksb,
2585 			   LKM_CANCEL,
2586 			   ocfs2_unlock_ast,
2587 			   lockres);
2588 	if (status != DLM_NORMAL) {
2589 		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2590 		ret = -EINVAL;
2591 		ocfs2_recover_from_dlm_error(lockres, 0);
2592 	}
2593 
2594 	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2595 
2596 	mlog_exit(ret);
2597 	return ret;
2598 }
2599 
2600 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2601 			      struct ocfs2_lock_res *lockres,
2602 			      struct ocfs2_unblock_ctl *ctl)
2603 {
2604 	unsigned long flags;
2605 	int blocking;
2606 	int new_level;
2607 	int ret = 0;
2608 	int set_lvb = 0;
2609 
2610 	mlog_entry_void();
2611 
2612 	spin_lock_irqsave(&lockres->l_lock, flags);
2613 
2614 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2615 
2616 recheck:
2617 	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2618 		ctl->requeue = 1;
2619 		ret = ocfs2_prepare_cancel_convert(osb, lockres);
2620 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2621 		if (ret) {
2622 			ret = ocfs2_cancel_convert(osb, lockres);
2623 			if (ret < 0)
2624 				mlog_errno(ret);
2625 		}
2626 		goto leave;
2627 	}
2628 
2629 	/* if we're blocking an exclusive and we have *any* holders,
2630 	 * then requeue. */
2631 	if ((lockres->l_blocking == LKM_EXMODE)
2632 	    && (lockres->l_ex_holders || lockres->l_ro_holders))
2633 		goto leave_requeue;
2634 
2635 	/* If it's a PR we're blocking, then only
2636 	 * requeue if we've got any EX holders */
2637 	if (lockres->l_blocking == LKM_PRMODE &&
2638 	    lockres->l_ex_holders)
2639 		goto leave_requeue;
2640 
2641 	/*
2642 	 * Can we get a lock in this state if the holder counts are
2643 	 * zero? The meta data unblock code used to check this.
2644 	 */
2645 	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2646 	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2647 		goto leave_requeue;
2648 
2649 	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2650 
2651 	if (lockres->l_ops->check_downconvert
2652 	    && !lockres->l_ops->check_downconvert(lockres, new_level))
2653 		goto leave_requeue;
2654 
2655 	/* If we get here, then we know that there are no more
2656 	 * incompatible holders (and anyone asking for an incompatible
2657 	 * lock is blocked). We can now downconvert the lock */
2658 	if (!lockres->l_ops->downconvert_worker)
2659 		goto downconvert;
2660 
2661 	/* Some lockres types want to do a bit of work before
2662 	 * downconverting a lock. Allow that here. The worker function
2663 	 * may sleep, so we save off a copy of what we're blocking as
2664 	 * it may change while we're not holding the spin lock. */
2665 	blocking = lockres->l_blocking;
2666 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2667 
2668 	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2669 
2670 	if (ctl->unblock_action == UNBLOCK_STOP_POST)
2671 		goto leave;
2672 
2673 	spin_lock_irqsave(&lockres->l_lock, flags);
2674 	if (blocking != lockres->l_blocking) {
2675 		/* If this changed underneath us, then we can't drop
2676 		 * it just yet. */
2677 		goto recheck;
2678 	}
2679 
2680 downconvert:
2681 	ctl->requeue = 0;
2682 
2683 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2684 		if (lockres->l_level == LKM_EXMODE)
2685 			set_lvb = 1;
2686 
2687 		/*
2688 		 * We only set the lvb if the lock has been fully
2689 		 * refreshed - otherwise we risk writing stale
2690 		 * data. If it hasn't been refreshed, there's no need
2691 		 * to clear out the lvb here as its value is still valid.
2692 		 */
2693 		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2694 			lockres->l_ops->set_lvb(lockres);
2695 	}
2696 
2697 	ocfs2_prepare_downconvert(lockres, new_level);
2698 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2699 	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2700 leave:
2701 	mlog_exit(ret);
2702 	return ret;
2703 
2704 leave_requeue:
2705 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2706 	ctl->requeue = 1;
2707 
2708 	mlog_exit(0);
2709 	return 0;
2710 }
2711 
2712 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2713 				     int blocking)
2714 {
2715 	struct inode *inode;
2716 	struct address_space *mapping;
2717 
2718 	inode = ocfs2_lock_res_inode(lockres);
2719 	mapping = inode->i_mapping;
2720 
2721 	/*
2722 	 * We need this before the filemap_fdatawrite() so that it can
2723 	 * transfer the dirty bit from the PTE to the
2724 	 * page. Unfortunately this means that even for EX->PR
2725 	 * downconverts, we'll lose our mappings and have to build
2726 	 * them up again.
2727 	 */
2728 	unmap_mapping_range(mapping, 0, 0, 0);
2729 
2730 	if (filemap_fdatawrite(mapping)) {
2731 		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2732 		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2733 	}
2734 	sync_mapping_buffers(mapping);
2735 	if (blocking == LKM_EXMODE) {
2736 		truncate_inode_pages(mapping, 0);
2737 	} else {
2738 		/* We only need to wait on the I/O if we're not also
2739 		 * truncating pages because truncate_inode_pages waits
2740 		 * for us above. We don't truncate pages if we're
2741 		 * blocking anything < EXMODE because we want to keep
2742 		 * them around in that case. */
2743 		filemap_fdatawait(mapping);
2744 	}
2745 
2746 	return UNBLOCK_CONTINUE;
2747 }
2748 
2749 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2750 					int new_level)
2751 {
2752 	struct inode *inode = ocfs2_lock_res_inode(lockres);
2753 	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2754 
2755 	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2756 	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2757 
2758 	if (checkpointed)
2759 		return 1;
2760 
2761 	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2762 	return 0;
2763 }
2764 
2765 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
2766 {
2767 	struct inode *inode = ocfs2_lock_res_inode(lockres);
2768 
2769 	__ocfs2_stuff_meta_lvb(inode);
2770 }
2771 
2772 /*
2773  * Does the final reference drop on our dentry lock. Right now this
2774  * happens in the vote thread, but we could choose to simplify the
2775  * dlmglue API and push these off to the ocfs2_wq in the future.
2776  */
2777 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2778 				     struct ocfs2_lock_res *lockres)
2779 {
2780 	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2781 	ocfs2_dentry_lock_put(osb, dl);
2782 }
2783 
2784 /*
2785  * d_delete() matching dentries before the lock downconvert.
2786  *
2787  * At this point, any process waiting to destroy the
2788  * dentry_lock on the final reference drop is stopped by the
2789  * OCFS2_LOCK_QUEUED flag.
2790  *
2791  * We have two potential problems:
2792  *
2793  * 1) If we do the last reference drop on our dentry_lock (via dput)
2794  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2795  *    the downconvert to finish. Instead we take an elevated
2796  *    reference and push the drop until after we've completed our
2797  *    unblock processing.
2798  *
2799  * 2) There might be another process with a final reference,
2800  *    waiting on us to finish processing. If this is the case, we
2801  *    detect it and exit out - there are no more dentries anyway.
2802  */
2803 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2804 				       int blocking)
2805 {
2806 	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2807 	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2808 	struct dentry *dentry;
2809 	unsigned long flags;
2810 	int extra_ref = 0;
2811 
2812 	/*
2813 	 * This node is blocking another node from getting a read
2814 	 * lock. This happens when we've renamed within a
2815 	 * directory. We've forced the other nodes to d_delete(), but
2816 	 * we never actually dropped our lock because it's still
2817 	 * valid. The downconvert code will retain a PR for this node,
2818 	 * so there's no further work to do.
2819 	 */
2820 	if (blocking == LKM_PRMODE)
2821 		return UNBLOCK_CONTINUE;
2822 
2823 	/*
2824 	 * Mark this inode as potentially orphaned. The code in
2825 	 * ocfs2_delete_inode() will figure out whether it actually
2826 	 * needs to be freed or not.
2827 	 */
2828 	spin_lock(&oi->ip_lock);
2829 	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2830 	spin_unlock(&oi->ip_lock);
2831 
2832 	/*
2833 	 * Yuck. We need to make sure, however, that the check of
2834 	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
2835 	 * respect to a reference decrement or the setting of that
2836 	 * flag.
2837 	 */
2838 	spin_lock_irqsave(&lockres->l_lock, flags);
2839 	spin_lock(&dentry_attach_lock);
2840 	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2841 	    && dl->dl_count) {
2842 		dl->dl_count++;
2843 		extra_ref = 1;
2844 	}
2845 	spin_unlock(&dentry_attach_lock);
2846 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2847 
2848 	mlog(0, "extra_ref = %d\n", extra_ref);
2849 
2850 	/*
2851 	 * We have a process waiting on us in ocfs2_dentry_iput(),
2852 	 * which means we can't have any more outstanding
2853 	 * aliases. There's no need to do any more work.
2854 	 */
2855 	if (!extra_ref)
2856 		return UNBLOCK_CONTINUE;
2857 
2858 	spin_lock(&dentry_attach_lock);
2859 	while (1) {
2860 		dentry = ocfs2_find_local_alias(dl->dl_inode,
2861 						dl->dl_parent_blkno, 1);
2862 		if (!dentry)
2863 			break;
2864 		spin_unlock(&dentry_attach_lock);
2865 
2866 		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2867 		     dentry->d_name.name);
2868 
2869 		/*
2870 		 * The following dcache calls may do an
2871 		 * iput(). Normally we don't want that from the
2872 		 * downconverting thread, but in this case it's ok
2873 		 * because the requesting node already has an
2874 		 * exclusive lock on the inode, so it can't be queued
2875 		 * for a downconvert.
2876 		 */
2877 		d_delete(dentry);
2878 		dput(dentry);
2879 
2880 		spin_lock(&dentry_attach_lock);
2881 	}
2882 	spin_unlock(&dentry_attach_lock);
2883 
2884 	/*
2885 	 * If we are the last holder of this dentry lock, there is no
2886 	 * reason to downconvert so skip straight to the unlock.
2887 	 */
2888 	if (dl->dl_count == 1)
2889 		return UNBLOCK_STOP_POST;
2890 
2891 	return UNBLOCK_CONTINUE_POST;
2892 }
2893 
2894 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
2895 				struct ocfs2_lock_res *lockres)
2896 {
2897 	int status;
2898 	struct ocfs2_unblock_ctl ctl = {0, 0,};
2899 	unsigned long flags;
2900 
2901 	/* Our reference to the lockres in this function can be
2902 	 * considered valid until we remove the OCFS2_LOCK_QUEUED
2903 	 * flag. */
2904 
2905 	mlog_entry_void();
2906 
2907 	BUG_ON(!lockres);
2908 	BUG_ON(!lockres->l_ops);
2909 
2910 	mlog(0, "lockres %s blocked.\n", lockres->l_name);
2911 
2912 	/* Detect whether a lock has been marked as going away while
2913 	 * the vote thread was processing other things. A lock can
2914 	 * still be marked with OCFS2_LOCK_FREEING after this check,
2915 	 * but short-circuiting here still saves us some
2916 	 * work. */
2917 	spin_lock_irqsave(&lockres->l_lock, flags);
2918 	if (lockres->l_flags & OCFS2_LOCK_FREEING)
2919 		goto unqueue;
2920 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2921 
2922 	status = ocfs2_unblock_lock(osb, lockres, &ctl);
2923 	if (status < 0)
2924 		mlog_errno(status);
2925 
2926 	spin_lock_irqsave(&lockres->l_lock, flags);
2927 unqueue:
2928 	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
2929 		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
2930 	} else
2931 		ocfs2_schedule_blocked_lock(osb, lockres);
2932 
2933 	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
2934 	     ctl.requeue ? "yes" : "no");
2935 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2936 
2937 	if (ctl.unblock_action != UNBLOCK_CONTINUE
2938 	    && lockres->l_ops->post_unlock)
2939 		lockres->l_ops->post_unlock(osb, lockres);
2940 
2941 	mlog_exit_void();
2942 }
2943 
2944 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
2945 					struct ocfs2_lock_res *lockres)
2946 {
2947 	mlog_entry_void();
2948 
2949 	assert_spin_locked(&lockres->l_lock);
2950 
2951 	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
2952 		/* Do not schedule a lock for downconvert when it's on
2953 		 * the way to destruction - any nodes wanting access
2954 		 * to the resource will get it soon. */
2955 		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
2956 		     lockres->l_name, lockres->l_flags);
2957 		return;
2958 	}
2959 
2960 	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
2961 
2962 	spin_lock(&osb->vote_task_lock);
2963 	if (list_empty(&lockres->l_blocked_list)) {
2964 		list_add_tail(&lockres->l_blocked_list,
2965 			      &osb->blocked_lock_list);
2966 		osb->blocked_lock_count++;
2967 	}
2968 	spin_unlock(&osb->vote_task_lock);
2969 
2970 	mlog_exit_void();
2971 }
2972 
2973 /* This aids in debugging situations where a bad LVB might be involved. */
2974 void ocfs2_dump_meta_lvb_info(u64 level,
2975 			      const char *function,
2976 			      unsigned int line,
2977 			      struct ocfs2_lock_res *lockres)
2978 {
2979 	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
2980 
2981 	mlog(level, "LVB information for %s (called from %s:%u):\n",
2982 	     lockres->l_name, function, line);
2983 	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
2984 	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
2985 	     be32_to_cpu(lvb->lvb_igeneration));
2986 	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
2987 	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
2988 	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
2989 	     be16_to_cpu(lvb->lvb_imode));
2990 	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
2991 	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
2992 	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
2993 	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
2994 	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
2995 	     be32_to_cpu(lvb->lvb_iattr));
2996 }
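
/*
 * Editor's note (not in the original source): callers pass the call
 * site in by hand, so invocations typically go through a small wrapper
 * macro along these lines (the macro name is an assumption, presumed
 * to live in dlmglue.h):
 *
 *	#define mlog_meta_lvb(__level, __lockres)			\
 *		ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__,	\
 *					 __LINE__, __lockres)
 */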
2997