xref: /linux/fs/ocfs2/dlm/dlmlock.c (revision eb01fe7abbe2d0b38824d2a93fdb4cc3eaf2ccc1)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * dlmlock.c
4  *
5  * underlying calls for lock creation
6  *
7  * Copyright (C) 2004 Oracle.  All rights reserved.
8  */
9 
10 
11 #include <linux/module.h>
12 #include <linux/fs.h>
13 #include <linux/types.h>
14 #include <linux/slab.h>
15 #include <linux/highmem.h>
16 #include <linux/init.h>
17 #include <linux/sysctl.h>
18 #include <linux/random.h>
19 #include <linux/blkdev.h>
20 #include <linux/socket.h>
21 #include <linux/inet.h>
22 #include <linux/spinlock.h>
23 #include <linux/delay.h>
24 
25 
26 #include "../cluster/heartbeat.h"
27 #include "../cluster/nodemanager.h"
28 #include "../cluster/tcp.h"
29 
30 #include "dlmapi.h"
31 #include "dlmcommon.h"
32 
33 #include "dlmconvert.h"
34 
35 #define MLOG_MASK_PREFIX ML_DLM
36 #include "../cluster/masklog.h"
37 
38 static struct kmem_cache *dlm_lock_cache;
39 
40 static DEFINE_SPINLOCK(dlm_cookie_lock);
41 static u64 dlm_next_cookie = 1;
42 
43 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
44 					       struct dlm_lock_resource *res,
45 					       struct dlm_lock *lock, int flags);
46 static void dlm_init_lock(struct dlm_lock *newlock, int type,
47 			  u8 node, u64 cookie);
48 static void dlm_lock_release(struct kref *kref);
49 static void dlm_lock_detach_lockres(struct dlm_lock *lock);
50 
51 int dlm_init_lock_cache(void)
52 {
53 	dlm_lock_cache = kmem_cache_create("o2dlm_lock",
54 					   sizeof(struct dlm_lock),
55 					   0, SLAB_HWCACHE_ALIGN, NULL);
56 	if (dlm_lock_cache == NULL)
57 		return -ENOMEM;
58 	return 0;
59 }
60 
61 void dlm_destroy_lock_cache(void)
62 {
63 	kmem_cache_destroy(dlm_lock_cache);
64 }
65 
66 /* Tell us whether we can grant a new lock request.
67  * locking:
68  *   caller needs:  res->spinlock
69  *   taken:         none
70  *   held on exit:  none
71  * returns: 1 if the lock can be granted, 0 otherwise.
72  */
73 static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
74 				  struct dlm_lock *lock)
75 {
76 	struct dlm_lock *tmplock;
77 
78 	list_for_each_entry(tmplock, &res->granted, list) {
79 		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
80 			return 0;
81 	}
82 
83 	list_for_each_entry(tmplock, &res->converting, list) {
84 		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
85 			return 0;
86 		if (!dlm_lock_compatible(tmplock->ml.convert_type,
87 					 lock->ml.type))
88 			return 0;
89 	}
90 
91 	return 1;
92 }
93 
94 /* performs lock creation at the lockres master site
95  * locking:
96  *   caller needs:  none
97  *   taken:         takes and drops res->spinlock
98  *   held on exit:  none
99  * returns: DLM_NORMAL, DLM_NOTQUEUED
100  */
101 static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
102 				      struct dlm_lock_resource *res,
103 				      struct dlm_lock *lock, int flags)
104 {
105 	int call_ast = 0, kick_thread = 0;
106 	enum dlm_status status = DLM_NORMAL;
107 
108 	mlog(0, "type=%d\n", lock->ml.type);
109 
110 	spin_lock(&res->spinlock);
111 	/* if called from dlm_create_lock_handler, need to
112 	 * ensure it will not sleep in dlm_wait_on_lockres */
113 	status = __dlm_lockres_state_to_status(res);
114 	if (status != DLM_NORMAL &&
115 	    lock->ml.node != dlm->node_num) {
116 		/* erf.  state changed after lock was dropped. */
117 		spin_unlock(&res->spinlock);
118 		dlm_error(status);
119 		return status;
120 	}
121 	__dlm_wait_on_lockres(res);
122 	__dlm_lockres_reserve_ast(res);
123 
124 	if (dlm_can_grant_new_lock(res, lock)) {
125 		mlog(0, "I can grant this lock right away\n");
126 		/* got it right away */
127 		lock->lksb->status = DLM_NORMAL;
128 		status = DLM_NORMAL;
129 		dlm_lock_get(lock);
130 		list_add_tail(&lock->list, &res->granted);
131 
132 		/* for the recovery lock, we can't allow the ast
133 		 * to be queued since the dlmthread is already
134 		 * frozen.  but the recovery lock is always locked
135 		 * with LKM_NOQUEUE so we do not need the ast in
136 		 * this special case */
137 		if (!dlm_is_recovery_lock(res->lockname.name,
138 					  res->lockname.len)) {
139 			kick_thread = 1;
140 			call_ast = 1;
141 		} else {
142 			mlog(0, "%s: returning DLM_NORMAL to "
143 			     "node %u for reco lock\n", dlm->name,
144 			     lock->ml.node);
145 		}
146 	} else {
147 		/* for NOQUEUE request, unless we get the
148 		 * lock right away, return DLM_NOTQUEUED */
149 		if (flags & LKM_NOQUEUE) {
150 			status = DLM_NOTQUEUED;
151 			if (dlm_is_recovery_lock(res->lockname.name,
152 						 res->lockname.len)) {
153 				mlog(0, "%s: returning NOTQUEUED to "
154 				     "node %u for reco lock\n", dlm->name,
155 				     lock->ml.node);
156 			}
157 		} else {
158 			status = DLM_NORMAL;
159 			dlm_lock_get(lock);
160 			list_add_tail(&lock->list, &res->blocked);
161 			kick_thread = 1;
162 		}
163 	}
164 
165 	spin_unlock(&res->spinlock);
166 	wake_up(&res->wq);
167 
168 	/* either queue the ast or release it */
169 	if (call_ast)
170 		dlm_queue_ast(dlm, lock);
171 	else
172 		dlm_lockres_release_ast(dlm, res);
173 
174 	dlm_lockres_calc_usage(dlm, res);
175 	if (kick_thread)
176 		dlm_kick_thread(dlm, res);
177 
178 	return status;
179 }
180 
181 void dlm_revert_pending_lock(struct dlm_lock_resource *res,
182 			     struct dlm_lock *lock)
183 {
184 	/* remove from local queue if it failed */
185 	list_del_init(&lock->list);
186 	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
187 }
188 
189 
190 /*
191  * locking:
192  *   caller needs:  none
193  *   taken:         takes and drops res->spinlock
194  *   held on exit:  none
195  * returns: DLM_DENIED, DLM_RECOVERING, or net status
196  */
197 static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
198 				      struct dlm_lock_resource *res,
199 				      struct dlm_lock *lock, int flags)
200 {
201 	enum dlm_status status = DLM_DENIED;
202 	int lockres_changed = 1;
203 
204 	mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n",
205 	     lock->ml.type, res->lockname.len,
206 	     res->lockname.name, flags);
207 
208 	/*
209 	 * Wait if resource is getting recovered, remastered, etc.
210 	 * If the resource was remastered and new owner is self, then exit.
211 	 */
212 	spin_lock(&res->spinlock);
213 	__dlm_wait_on_lockres(res);
214 	if (res->owner == dlm->node_num) {
215 		spin_unlock(&res->spinlock);
216 		return DLM_RECOVERING;
217 	}
218 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
219 
220 	/* add lock to local (secondary) queue */
221 	dlm_lock_get(lock);
222 	list_add_tail(&lock->list, &res->blocked);
223 	lock->lock_pending = 1;
224 	spin_unlock(&res->spinlock);
225 
226 	/* spec seems to say that you will get DLM_NORMAL when the lock
227 	 * has been queued, meaning we need to wait for a reply here. */
228 	status = dlm_send_remote_lock_request(dlm, res, lock, flags);
229 
230 	spin_lock(&res->spinlock);
231 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
232 	lock->lock_pending = 0;
233 	if (status != DLM_NORMAL) {
234 		if (status == DLM_RECOVERING &&
235 		    dlm_is_recovery_lock(res->lockname.name,
236 					 res->lockname.len)) {
237 			/* recovery lock was mastered by dead node.
238 			 * we need to have calc_usage shoot down this
239 			 * lockres and completely remaster it. */
240 			mlog(0, "%s: recovery lock was owned by "
241 			     "dead node %u, remaster it now.\n",
242 			     dlm->name, res->owner);
243 		} else if (status != DLM_NOTQUEUED) {
244 			/*
245 			 * DO NOT call calc_usage, as this would unhash
246 			 * the remote lockres before we ever get to use
247 			 * it.  treat as if we never made any change to
248 			 * the lockres.
249 			 */
250 			lockres_changed = 0;
251 			dlm_error(status);
252 		}
253 		dlm_revert_pending_lock(res, lock);
254 		dlm_lock_put(lock);
255 	} else if (dlm_is_recovery_lock(res->lockname.name,
256 					res->lockname.len)) {
257 		/* special case for the $RECOVERY lock.
258 		 * there will never be an AST delivered to put
259 		 * this lock on the proper secondary queue
260 		 * (granted), so do it manually. */
261 		mlog(0, "%s: $RECOVERY lock for this node (%u) is "
262 		     "mastered by %u; got lock, manually granting (no ast)\n",
263 		     dlm->name, dlm->node_num, res->owner);
264 		list_move_tail(&lock->list, &res->granted);
265 	}
266 	spin_unlock(&res->spinlock);
267 
268 	if (lockres_changed)
269 		dlm_lockres_calc_usage(dlm, res);
270 
271 	wake_up(&res->wq);
272 	return status;
273 }
274 
275 
276 /* for remote lock creation.
277  * locking:
278  *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
279  *   taken:         none
280  *   held on exit:  none
281  * returns: DLM_NOLOCKMGR, or net status
282  */
283 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
284 					       struct dlm_lock_resource *res,
285 					       struct dlm_lock *lock, int flags)
286 {
287 	struct dlm_create_lock create;
288 	int tmpret, status = 0;
289 	enum dlm_status ret;
290 
291 	memset(&create, 0, sizeof(create));
292 	create.node_idx = dlm->node_num;
293 	create.requested_type = lock->ml.type;
294 	create.cookie = lock->ml.cookie;
295 	create.namelen = res->lockname.len;
296 	create.flags = cpu_to_be32(flags);
297 	memcpy(create.name, res->lockname.name, create.namelen);
298 
299 	tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
300 				    sizeof(create), res->owner, &status);
301 	if (tmpret >= 0) {
302 		ret = status;
303 		if (ret == DLM_REJECTED) {
304 			mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
305 			     "owned by node %u. That node is coming back up "
306 			     "currently.\n", dlm->name, create.namelen,
307 			     create.name, res->owner);
308 			dlm_print_one_lock_resource(res);
309 			BUG();
310 		}
311 	} else {
312 		mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
313 		     "node %u\n", dlm->name, create.namelen, create.name,
314 		     tmpret, res->owner);
315 		if (dlm_is_host_down(tmpret))
316 			ret = DLM_RECOVERING;
317 		else
318 			ret = dlm_err_to_dlm_status(tmpret);
319 	}
320 
321 	return ret;
322 }
323 
324 void dlm_lock_get(struct dlm_lock *lock)
325 {
326 	kref_get(&lock->lock_refs);
327 }
328 
329 void dlm_lock_put(struct dlm_lock *lock)
330 {
331 	kref_put(&lock->lock_refs, dlm_lock_release);
332 }
333 
334 static void dlm_lock_release(struct kref *kref)
335 {
336 	struct dlm_lock *lock;
337 
338 	lock = container_of(kref, struct dlm_lock, lock_refs);
339 
340 	BUG_ON(!list_empty(&lock->list));
341 	BUG_ON(!list_empty(&lock->ast_list));
342 	BUG_ON(!list_empty(&lock->bast_list));
343 	BUG_ON(lock->ast_pending);
344 	BUG_ON(lock->bast_pending);
345 
346 	dlm_lock_detach_lockres(lock);
347 
348 	if (lock->lksb_kernel_allocated) {
349 		mlog(0, "freeing kernel-allocated lksb\n");
350 		kfree(lock->lksb);
351 	}
352 	kmem_cache_free(dlm_lock_cache, lock);
353 }
354 
355 /* associate a lock with it's lockres, getting a ref on the lockres */
356 void dlm_lock_attach_lockres(struct dlm_lock *lock,
357 			     struct dlm_lock_resource *res)
358 {
359 	dlm_lockres_get(res);
360 	lock->lockres = res;
361 }
362 
363 /* drop ref on lockres, if there is still one associated with lock */
364 static void dlm_lock_detach_lockres(struct dlm_lock *lock)
365 {
366 	struct dlm_lock_resource *res;
367 
368 	res = lock->lockres;
369 	if (res) {
370 		lock->lockres = NULL;
371 		mlog(0, "removing lock's lockres reference\n");
372 		dlm_lockres_put(res);
373 	}
374 }
375 
376 static void dlm_init_lock(struct dlm_lock *newlock, int type,
377 			  u8 node, u64 cookie)
378 {
379 	INIT_LIST_HEAD(&newlock->list);
380 	INIT_LIST_HEAD(&newlock->ast_list);
381 	INIT_LIST_HEAD(&newlock->bast_list);
382 	spin_lock_init(&newlock->spinlock);
383 	newlock->ml.type = type;
384 	newlock->ml.convert_type = LKM_IVMODE;
385 	newlock->ml.highest_blocked = LKM_IVMODE;
386 	newlock->ml.node = node;
387 	newlock->ml.pad1 = 0;
388 	newlock->ml.list = 0;
389 	newlock->ml.flags = 0;
390 	newlock->ast = NULL;
391 	newlock->bast = NULL;
392 	newlock->astdata = NULL;
393 	newlock->ml.cookie = cpu_to_be64(cookie);
394 	newlock->ast_pending = 0;
395 	newlock->bast_pending = 0;
396 	newlock->convert_pending = 0;
397 	newlock->lock_pending = 0;
398 	newlock->unlock_pending = 0;
399 	newlock->cancel_pending = 0;
400 	newlock->lksb_kernel_allocated = 0;
401 
402 	kref_init(&newlock->lock_refs);
403 }
404 
405 struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
406 			       struct dlm_lockstatus *lksb)
407 {
408 	struct dlm_lock *lock;
409 	int kernel_allocated = 0;
410 
411 	lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
412 	if (!lock)
413 		return NULL;
414 
415 	if (!lksb) {
416 		/* zero memory only if kernel-allocated */
417 		lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
418 		if (!lksb) {
419 			kmem_cache_free(dlm_lock_cache, lock);
420 			return NULL;
421 		}
422 		kernel_allocated = 1;
423 	}
424 
425 	dlm_init_lock(lock, type, node, cookie);
426 	if (kernel_allocated)
427 		lock->lksb_kernel_allocated = 1;
428 	lock->lksb = lksb;
429 	lksb->lockid = lock;
430 	return lock;
431 }
432 
433 /* handler for lock creation net message
434  * locking:
435  *   caller needs:  none
436  *   taken:         takes and drops res->spinlock
437  *   held on exit:  none
438  * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
439  */
440 int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
441 			    void **ret_data)
442 {
443 	struct dlm_ctxt *dlm = data;
444 	struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
445 	struct dlm_lock_resource *res = NULL;
446 	struct dlm_lock *newlock = NULL;
447 	struct dlm_lockstatus *lksb = NULL;
448 	enum dlm_status status = DLM_NORMAL;
449 	char *name;
450 	unsigned int namelen;
451 
452 	BUG_ON(!dlm);
453 
454 	if (!dlm_grab(dlm))
455 		return DLM_REJECTED;
456 
457 	name = create->name;
458 	namelen = create->namelen;
459 	status = DLM_REJECTED;
460 	if (!dlm_domain_fully_joined(dlm)) {
461 		mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
462 		     "sending a create_lock message for lock %.*s!\n",
463 		     dlm->name, create->node_idx, namelen, name);
464 		dlm_error(status);
465 		goto leave;
466 	}
467 
468 	status = DLM_IVBUFLEN;
469 	if (namelen > DLM_LOCKID_NAME_MAX) {
470 		dlm_error(status);
471 		goto leave;
472 	}
473 
474 	status = DLM_SYSERR;
475 	newlock = dlm_new_lock(create->requested_type,
476 			       create->node_idx,
477 			       be64_to_cpu(create->cookie), NULL);
478 	if (!newlock) {
479 		dlm_error(status);
480 		goto leave;
481 	}
482 
483 	lksb = newlock->lksb;
484 
485 	if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
486 		lksb->flags |= DLM_LKSB_GET_LVB;
487 		mlog(0, "set DLM_LKSB_GET_LVB flag\n");
488 	}
489 
490 	status = DLM_IVLOCKID;
491 	res = dlm_lookup_lockres(dlm, name, namelen);
492 	if (!res) {
493 		dlm_error(status);
494 		goto leave;
495 	}
496 
497 	spin_lock(&res->spinlock);
498 	status = __dlm_lockres_state_to_status(res);
499 	spin_unlock(&res->spinlock);
500 
501 	if (status != DLM_NORMAL) {
502 		mlog(0, "lockres recovering/migrating/in-progress\n");
503 		goto leave;
504 	}
505 
506 	dlm_lock_attach_lockres(newlock, res);
507 
508 	status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
509 leave:
510 	if (status != DLM_NORMAL)
511 		if (newlock)
512 			dlm_lock_put(newlock);
513 
514 	if (res)
515 		dlm_lockres_put(res);
516 
517 	dlm_put(dlm);
518 
519 	return status;
520 }
521 
522 
523 /* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
524 static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
525 {
526 	u64 tmpnode = node_num;
527 
528 	/* shift single byte of node num into top 8 bits */
529 	tmpnode <<= 56;
530 
531 	spin_lock(&dlm_cookie_lock);
532 	*cookie = (dlm_next_cookie | tmpnode);
533 	if (++dlm_next_cookie & 0xff00000000000000ull) {
534 		mlog(0, "This node's cookie will now wrap!\n");
535 		dlm_next_cookie = 1;
536 	}
537 	spin_unlock(&dlm_cookie_lock);
538 }
539 
540 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
541 			struct dlm_lockstatus *lksb, int flags,
542 			const char *name, int namelen, dlm_astlockfunc_t *ast,
543 			void *data, dlm_bastlockfunc_t *bast)
544 {
545 	enum dlm_status status;
546 	struct dlm_lock_resource *res = NULL;
547 	struct dlm_lock *lock = NULL;
548 	int convert = 0, recovery = 0;
549 
550 	/* yes this function is a mess.
551 	 * TODO: clean this up.  lots of common code in the
552 	 *       lock and convert paths, especially in the retry blocks */
553 	if (!lksb) {
554 		dlm_error(DLM_BADARGS);
555 		return DLM_BADARGS;
556 	}
557 
558 	status = DLM_BADPARAM;
559 	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
560 		dlm_error(status);
561 		goto error;
562 	}
563 
564 	if (flags & ~LKM_VALID_FLAGS) {
565 		dlm_error(status);
566 		goto error;
567 	}
568 
569 	convert = (flags & LKM_CONVERT);
570 	recovery = (flags & LKM_RECOVERY);
571 
572 	if (recovery &&
573 	    (!dlm_is_recovery_lock(name, namelen) || convert) ) {
574 		dlm_error(status);
575 		goto error;
576 	}
577 	if (convert && (flags & LKM_LOCAL)) {
578 		mlog(ML_ERROR, "strange LOCAL convert request!\n");
579 		goto error;
580 	}
581 
582 	if (convert) {
583 		/* CONVERT request */
584 
585 		/* if converting, must pass in a valid dlm_lock */
586 		lock = lksb->lockid;
587 		if (!lock) {
588 			mlog(ML_ERROR, "NULL lock pointer in convert "
589 			     "request\n");
590 			goto error;
591 		}
592 
593 		res = lock->lockres;
594 		if (!res) {
595 			mlog(ML_ERROR, "NULL lockres pointer in convert "
596 			     "request\n");
597 			goto error;
598 		}
599 		dlm_lockres_get(res);
600 
601 		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
602 	 	 * static after the original lock call.  convert requests will
603 		 * ensure that everything is the same, or return DLM_BADARGS.
604 	 	 * this means that DLM_DENIED_NOASTS will never be returned.
605 	 	 */
606 		if (lock->lksb != lksb || lock->ast != ast ||
607 		    lock->bast != bast || lock->astdata != data) {
608 			status = DLM_BADARGS;
609 			mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
610 			     "astdata=%p\n", lksb, ast, bast, data);
611 			mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
612 			     "astdata=%p\n", lock->lksb, lock->ast,
613 			     lock->bast, lock->astdata);
614 			goto error;
615 		}
616 retry_convert:
617 		dlm_wait_for_recovery(dlm);
618 
619 		if (res->owner == dlm->node_num)
620 			status = dlmconvert_master(dlm, res, lock, flags, mode);
621 		else
622 			status = dlmconvert_remote(dlm, res, lock, flags, mode);
623 		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
624 		    status == DLM_FORWARD) {
625 			/* for now, see how this works without sleeping
626 			 * and just retry right away.  I suspect the reco
627 			 * or migration will complete fast enough that
628 			 * no waiting will be necessary */
629 			mlog(0, "retrying convert with migration/recovery/"
630 			     "in-progress\n");
631 			msleep(100);
632 			goto retry_convert;
633 		}
634 	} else {
635 		u64 tmpcookie;
636 
637 		/* LOCK request */
638 		status = DLM_BADARGS;
639 		if (!name) {
640 			dlm_error(status);
641 			goto error;
642 		}
643 
644 		status = DLM_IVBUFLEN;
645 		if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
646 			dlm_error(status);
647 			goto error;
648 		}
649 
650 		dlm_get_next_cookie(dlm->node_num, &tmpcookie);
651 		lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
652 		if (!lock) {
653 			dlm_error(status);
654 			goto error;
655 		}
656 
657 		if (!recovery)
658 			dlm_wait_for_recovery(dlm);
659 
660 		/* find or create the lock resource */
661 		res = dlm_get_lock_resource(dlm, name, namelen, flags);
662 		if (!res) {
663 			status = DLM_IVLOCKID;
664 			dlm_error(status);
665 			goto error;
666 		}
667 
668 		mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
669 		mlog(0, "creating lock: lock=%p res=%p\n", lock, res);
670 
671 		dlm_lock_attach_lockres(lock, res);
672 		lock->ast = ast;
673 		lock->bast = bast;
674 		lock->astdata = data;
675 
676 retry_lock:
677 		if (flags & LKM_VALBLK) {
678 			mlog(0, "LKM_VALBLK passed by caller\n");
679 
680 			/* LVB requests for non PR, PW or EX locks are
681 			 * ignored. */
682 			if (mode < LKM_PRMODE)
683 				flags &= ~LKM_VALBLK;
684 			else {
685 				flags |= LKM_GET_LVB;
686 				lock->lksb->flags |= DLM_LKSB_GET_LVB;
687 			}
688 		}
689 
690 		if (res->owner == dlm->node_num)
691 			status = dlmlock_master(dlm, res, lock, flags);
692 		else
693 			status = dlmlock_remote(dlm, res, lock, flags);
694 
695 		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
696 		    status == DLM_FORWARD) {
697 			msleep(100);
698 			if (recovery) {
699 				if (status != DLM_RECOVERING)
700 					goto retry_lock;
701 				/* wait to see the node go down, then
702 				 * drop down and allow the lockres to
703 				 * get cleaned up.  need to remaster. */
704 				dlm_wait_for_node_death(dlm, res->owner,
705 						DLM_NODE_DEATH_WAIT_MAX);
706 			} else {
707 				dlm_wait_for_recovery(dlm);
708 				goto retry_lock;
709 			}
710 		}
711 
712 		/* Inflight taken in dlm_get_lock_resource() is dropped here */
713 		spin_lock(&res->spinlock);
714 		dlm_lockres_drop_inflight_ref(dlm, res);
715 		spin_unlock(&res->spinlock);
716 
717 		dlm_lockres_calc_usage(dlm, res);
718 		dlm_kick_thread(dlm, res);
719 
720 		if (status != DLM_NORMAL) {
721 			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
722 			if (status != DLM_NOTQUEUED)
723 				dlm_error(status);
724 			goto error;
725 		}
726 	}
727 
728 error:
729 	if (status != DLM_NORMAL) {
730 		if (lock && !convert)
731 			dlm_lock_put(lock);
732 		// this is kind of unnecessary
733 		lksb->status = status;
734 	}
735 
736 	/* put lockres ref from the convert path
737 	 * or from dlm_get_lock_resource */
738 	if (res)
739 		dlm_lockres_put(res);
740 
741 	return status;
742 }
743 EXPORT_SYMBOL_GPL(dlmlock);
744