xref: /linux/fs/ocfs2/dlm/dlmlock.c (revision 37744feebc086908fd89760650f458ab19071750)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* -*- mode: c; c-basic-offset: 8; -*-
3  * vim: noexpandtab sw=8 ts=8 sts=0:
4  *
5  * dlmlock.c
6  *
7  * underlying calls for lock creation
8  *
9  * Copyright (C) 2004 Oracle.  All rights reserved.
10  */
11 
12 
13 #include <linux/module.h>
14 #include <linux/fs.h>
15 #include <linux/types.h>
16 #include <linux/slab.h>
17 #include <linux/highmem.h>
18 #include <linux/init.h>
19 #include <linux/sysctl.h>
20 #include <linux/random.h>
21 #include <linux/blkdev.h>
22 #include <linux/socket.h>
23 #include <linux/inet.h>
24 #include <linux/spinlock.h>
25 #include <linux/delay.h>
26 
27 
28 #include "../cluster/heartbeat.h"
29 #include "../cluster/nodemanager.h"
30 #include "../cluster/tcp.h"
31 
32 #include "dlmapi.h"
33 #include "dlmcommon.h"
34 
35 #include "dlmconvert.h"
36 
37 #define MLOG_MASK_PREFIX ML_DLM
38 #include "../cluster/masklog.h"
39 
40 static struct kmem_cache *dlm_lock_cache;
41 
42 static DEFINE_SPINLOCK(dlm_cookie_lock);
43 static u64 dlm_next_cookie = 1;
44 
45 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
46 					       struct dlm_lock_resource *res,
47 					       struct dlm_lock *lock, int flags);
48 static void dlm_init_lock(struct dlm_lock *newlock, int type,
49 			  u8 node, u64 cookie);
50 static void dlm_lock_release(struct kref *kref);
51 static void dlm_lock_detach_lockres(struct dlm_lock *lock);
52 
53 int dlm_init_lock_cache(void)
54 {
55 	dlm_lock_cache = kmem_cache_create("o2dlm_lock",
56 					   sizeof(struct dlm_lock),
57 					   0, SLAB_HWCACHE_ALIGN, NULL);
58 	if (dlm_lock_cache == NULL)
59 		return -ENOMEM;
60 	return 0;
61 }
62 
63 void dlm_destroy_lock_cache(void)
64 {
65 	kmem_cache_destroy(dlm_lock_cache);
66 }
67 
68 /* Tell us whether we can grant a new lock request.
69  * locking:
70  *   caller needs:  res->spinlock
71  *   taken:         none
72  *   held on exit:  none
73  * returns: 1 if the lock can be granted, 0 otherwise.
74  */
75 static int dlm_can_grant_new_lock(struct dlm_lock_resource *res,
76 				  struct dlm_lock *lock)
77 {
78 	struct dlm_lock *tmplock;
79 
80 	list_for_each_entry(tmplock, &res->granted, list) {
81 		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
82 			return 0;
83 	}
84 
85 	list_for_each_entry(tmplock, &res->converting, list) {
86 		if (!dlm_lock_compatible(tmplock->ml.type, lock->ml.type))
87 			return 0;
88 		if (!dlm_lock_compatible(tmplock->ml.convert_type,
89 					 lock->ml.type))
90 			return 0;
91 	}
92 
93 	return 1;
94 }
95 
96 /* performs lock creation at the lockres master site
97  * locking:
98  *   caller needs:  none
99  *   taken:         takes and drops res->spinlock
100  *   held on exit:  none
101  * returns: DLM_NORMAL, DLM_NOTQUEUED
102  */
103 static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
104 				      struct dlm_lock_resource *res,
105 				      struct dlm_lock *lock, int flags)
106 {
107 	int call_ast = 0, kick_thread = 0;
108 	enum dlm_status status = DLM_NORMAL;
109 
110 	mlog(0, "type=%d\n", lock->ml.type);
111 
112 	spin_lock(&res->spinlock);
113 	/* if called from dlm_create_lock_handler, need to
114 	 * ensure it will not sleep in dlm_wait_on_lockres */
115 	status = __dlm_lockres_state_to_status(res);
116 	if (status != DLM_NORMAL &&
117 	    lock->ml.node != dlm->node_num) {
118 		/* erf.  state changed after lock was dropped. */
119 		spin_unlock(&res->spinlock);
120 		dlm_error(status);
121 		return status;
122 	}
123 	__dlm_wait_on_lockres(res);
124 	__dlm_lockres_reserve_ast(res);
125 
126 	if (dlm_can_grant_new_lock(res, lock)) {
127 		mlog(0, "I can grant this lock right away\n");
128 		/* got it right away */
129 		lock->lksb->status = DLM_NORMAL;
130 		status = DLM_NORMAL;
131 		dlm_lock_get(lock);
132 		list_add_tail(&lock->list, &res->granted);
133 
134 		/* for the recovery lock, we can't allow the ast
135 		 * to be queued since the dlmthread is already
136 		 * frozen.  but the recovery lock is always locked
137 		 * with LKM_NOQUEUE so we do not need the ast in
138 		 * this special case */
139 		if (!dlm_is_recovery_lock(res->lockname.name,
140 					  res->lockname.len)) {
141 			kick_thread = 1;
142 			call_ast = 1;
143 		} else {
144 			mlog(0, "%s: returning DLM_NORMAL to "
145 			     "node %u for reco lock\n", dlm->name,
146 			     lock->ml.node);
147 		}
148 	} else {
149 		/* for NOQUEUE request, unless we get the
150 		 * lock right away, return DLM_NOTQUEUED */
151 		if (flags & LKM_NOQUEUE) {
152 			status = DLM_NOTQUEUED;
153 			if (dlm_is_recovery_lock(res->lockname.name,
154 						 res->lockname.len)) {
155 				mlog(0, "%s: returning NOTQUEUED to "
156 				     "node %u for reco lock\n", dlm->name,
157 				     lock->ml.node);
158 			}
159 		} else {
160 			status = DLM_NORMAL;
161 			dlm_lock_get(lock);
162 			list_add_tail(&lock->list, &res->blocked);
163 			kick_thread = 1;
164 		}
165 	}
166 
167 	spin_unlock(&res->spinlock);
168 	wake_up(&res->wq);
169 
170 	/* either queue the ast or release it */
171 	if (call_ast)
172 		dlm_queue_ast(dlm, lock);
173 	else
174 		dlm_lockres_release_ast(dlm, res);
175 
176 	dlm_lockres_calc_usage(dlm, res);
177 	if (kick_thread)
178 		dlm_kick_thread(dlm, res);
179 
180 	return status;
181 }
182 
183 void dlm_revert_pending_lock(struct dlm_lock_resource *res,
184 			     struct dlm_lock *lock)
185 {
186 	/* remove from local queue if it failed */
187 	list_del_init(&lock->list);
188 	lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
189 }
190 
191 
192 /*
193  * locking:
194  *   caller needs:  none
195  *   taken:         takes and drops res->spinlock
196  *   held on exit:  none
197  * returns: DLM_DENIED, DLM_RECOVERING, or net status
198  */
199 static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
200 				      struct dlm_lock_resource *res,
201 				      struct dlm_lock *lock, int flags)
202 {
203 	enum dlm_status status = DLM_DENIED;
204 	int lockres_changed = 1;
205 
206 	mlog(0, "type=%d, lockres %.*s, flags = 0x%x\n",
207 	     lock->ml.type, res->lockname.len,
208 	     res->lockname.name, flags);
209 
210 	/*
211 	 * Wait if resource is getting recovered, remastered, etc.
212 	 * If the resource was remastered and new owner is self, then exit.
213 	 */
214 	spin_lock(&res->spinlock);
215 	__dlm_wait_on_lockres(res);
216 	if (res->owner == dlm->node_num) {
217 		spin_unlock(&res->spinlock);
218 		return DLM_RECOVERING;
219 	}
220 	res->state |= DLM_LOCK_RES_IN_PROGRESS;
221 
222 	/* add lock to local (secondary) queue */
223 	dlm_lock_get(lock);
224 	list_add_tail(&lock->list, &res->blocked);
225 	lock->lock_pending = 1;
226 	spin_unlock(&res->spinlock);
227 
228 	/* spec seems to say that you will get DLM_NORMAL when the lock
229 	 * has been queued, meaning we need to wait for a reply here. */
230 	status = dlm_send_remote_lock_request(dlm, res, lock, flags);
231 
232 	spin_lock(&res->spinlock);
233 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
234 	lock->lock_pending = 0;
235 	if (status != DLM_NORMAL) {
236 		if (status == DLM_RECOVERING &&
237 		    dlm_is_recovery_lock(res->lockname.name,
238 					 res->lockname.len)) {
239 			/* recovery lock was mastered by dead node.
240 			 * we need to have calc_usage shoot down this
241 			 * lockres and completely remaster it. */
242 			mlog(0, "%s: recovery lock was owned by "
243 			     "dead node %u, remaster it now.\n",
244 			     dlm->name, res->owner);
245 		} else if (status != DLM_NOTQUEUED) {
246 			/*
247 			 * DO NOT call calc_usage, as this would unhash
248 			 * the remote lockres before we ever get to use
249 			 * it.  treat as if we never made any change to
250 			 * the lockres.
251 			 */
252 			lockres_changed = 0;
253 			dlm_error(status);
254 		}
255 		dlm_revert_pending_lock(res, lock);
256 		dlm_lock_put(lock);
257 	} else if (dlm_is_recovery_lock(res->lockname.name,
258 					res->lockname.len)) {
259 		/* special case for the $RECOVERY lock.
260 		 * there will never be an AST delivered to put
261 		 * this lock on the proper secondary queue
262 		 * (granted), so do it manually. */
263 		mlog(0, "%s: $RECOVERY lock for this node (%u) is "
264 		     "mastered by %u; got lock, manually granting (no ast)\n",
265 		     dlm->name, dlm->node_num, res->owner);
266 		list_move_tail(&lock->list, &res->granted);
267 	}
268 	spin_unlock(&res->spinlock);
269 
270 	if (lockres_changed)
271 		dlm_lockres_calc_usage(dlm, res);
272 
273 	wake_up(&res->wq);
274 	return status;
275 }
276 
277 
278 /* for remote lock creation.
279  * locking:
280  *   caller needs:  none, but need res->state & DLM_LOCK_RES_IN_PROGRESS
281  *   taken:         none
282  *   held on exit:  none
283  * returns: DLM_NOLOCKMGR, or net status
284  */
285 static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm,
286 					       struct dlm_lock_resource *res,
287 					       struct dlm_lock *lock, int flags)
288 {
289 	struct dlm_create_lock create;
290 	int tmpret, status = 0;
291 	enum dlm_status ret;
292 
293 	memset(&create, 0, sizeof(create));
294 	create.node_idx = dlm->node_num;
295 	create.requested_type = lock->ml.type;
296 	create.cookie = lock->ml.cookie;
297 	create.namelen = res->lockname.len;
298 	create.flags = cpu_to_be32(flags);
299 	memcpy(create.name, res->lockname.name, create.namelen);
300 
301 	tmpret = o2net_send_message(DLM_CREATE_LOCK_MSG, dlm->key, &create,
302 				    sizeof(create), res->owner, &status);
303 	if (tmpret >= 0) {
304 		ret = status;
305 		if (ret == DLM_REJECTED) {
306 			mlog(ML_ERROR, "%s: res %.*s, Stale lockres no longer "
307 			     "owned by node %u. That node is coming back up "
308 			     "currently.\n", dlm->name, create.namelen,
309 			     create.name, res->owner);
310 			dlm_print_one_lock_resource(res);
311 			BUG();
312 		}
313 	} else {
314 		mlog(ML_ERROR, "%s: res %.*s, Error %d send CREATE LOCK to "
315 		     "node %u\n", dlm->name, create.namelen, create.name,
316 		     tmpret, res->owner);
317 		if (dlm_is_host_down(tmpret))
318 			ret = DLM_RECOVERING;
319 		else
320 			ret = dlm_err_to_dlm_status(tmpret);
321 	}
322 
323 	return ret;
324 }
325 
326 void dlm_lock_get(struct dlm_lock *lock)
327 {
328 	kref_get(&lock->lock_refs);
329 }
330 
331 void dlm_lock_put(struct dlm_lock *lock)
332 {
333 	kref_put(&lock->lock_refs, dlm_lock_release);
334 }
335 
336 static void dlm_lock_release(struct kref *kref)
337 {
338 	struct dlm_lock *lock;
339 
340 	lock = container_of(kref, struct dlm_lock, lock_refs);
341 
342 	BUG_ON(!list_empty(&lock->list));
343 	BUG_ON(!list_empty(&lock->ast_list));
344 	BUG_ON(!list_empty(&lock->bast_list));
345 	BUG_ON(lock->ast_pending);
346 	BUG_ON(lock->bast_pending);
347 
348 	dlm_lock_detach_lockres(lock);
349 
350 	if (lock->lksb_kernel_allocated) {
351 		mlog(0, "freeing kernel-allocated lksb\n");
352 		kfree(lock->lksb);
353 	}
354 	kmem_cache_free(dlm_lock_cache, lock);
355 }
356 
357 /* associate a lock with it's lockres, getting a ref on the lockres */
358 void dlm_lock_attach_lockres(struct dlm_lock *lock,
359 			     struct dlm_lock_resource *res)
360 {
361 	dlm_lockres_get(res);
362 	lock->lockres = res;
363 }
364 
365 /* drop ref on lockres, if there is still one associated with lock */
366 static void dlm_lock_detach_lockres(struct dlm_lock *lock)
367 {
368 	struct dlm_lock_resource *res;
369 
370 	res = lock->lockres;
371 	if (res) {
372 		lock->lockres = NULL;
373 		mlog(0, "removing lock's lockres reference\n");
374 		dlm_lockres_put(res);
375 	}
376 }
377 
378 static void dlm_init_lock(struct dlm_lock *newlock, int type,
379 			  u8 node, u64 cookie)
380 {
381 	INIT_LIST_HEAD(&newlock->list);
382 	INIT_LIST_HEAD(&newlock->ast_list);
383 	INIT_LIST_HEAD(&newlock->bast_list);
384 	spin_lock_init(&newlock->spinlock);
385 	newlock->ml.type = type;
386 	newlock->ml.convert_type = LKM_IVMODE;
387 	newlock->ml.highest_blocked = LKM_IVMODE;
388 	newlock->ml.node = node;
389 	newlock->ml.pad1 = 0;
390 	newlock->ml.list = 0;
391 	newlock->ml.flags = 0;
392 	newlock->ast = NULL;
393 	newlock->bast = NULL;
394 	newlock->astdata = NULL;
395 	newlock->ml.cookie = cpu_to_be64(cookie);
396 	newlock->ast_pending = 0;
397 	newlock->bast_pending = 0;
398 	newlock->convert_pending = 0;
399 	newlock->lock_pending = 0;
400 	newlock->unlock_pending = 0;
401 	newlock->cancel_pending = 0;
402 	newlock->lksb_kernel_allocated = 0;
403 
404 	kref_init(&newlock->lock_refs);
405 }
406 
407 struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
408 			       struct dlm_lockstatus *lksb)
409 {
410 	struct dlm_lock *lock;
411 	int kernel_allocated = 0;
412 
413 	lock = kmem_cache_zalloc(dlm_lock_cache, GFP_NOFS);
414 	if (!lock)
415 		return NULL;
416 
417 	if (!lksb) {
418 		/* zero memory only if kernel-allocated */
419 		lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
420 		if (!lksb) {
421 			kmem_cache_free(dlm_lock_cache, lock);
422 			return NULL;
423 		}
424 		kernel_allocated = 1;
425 	}
426 
427 	dlm_init_lock(lock, type, node, cookie);
428 	if (kernel_allocated)
429 		lock->lksb_kernel_allocated = 1;
430 	lock->lksb = lksb;
431 	lksb->lockid = lock;
432 	return lock;
433 }
434 
435 /* handler for lock creation net message
436  * locking:
437  *   caller needs:  none
438  *   taken:         takes and drops res->spinlock
439  *   held on exit:  none
440  * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED
441  */
442 int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data,
443 			    void **ret_data)
444 {
445 	struct dlm_ctxt *dlm = data;
446 	struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf;
447 	struct dlm_lock_resource *res = NULL;
448 	struct dlm_lock *newlock = NULL;
449 	struct dlm_lockstatus *lksb = NULL;
450 	enum dlm_status status = DLM_NORMAL;
451 	char *name;
452 	unsigned int namelen;
453 
454 	BUG_ON(!dlm);
455 
456 	if (!dlm_grab(dlm))
457 		return DLM_REJECTED;
458 
459 	name = create->name;
460 	namelen = create->namelen;
461 	status = DLM_REJECTED;
462 	if (!dlm_domain_fully_joined(dlm)) {
463 		mlog(ML_ERROR, "Domain %s not fully joined, but node %u is "
464 		     "sending a create_lock message for lock %.*s!\n",
465 		     dlm->name, create->node_idx, namelen, name);
466 		dlm_error(status);
467 		goto leave;
468 	}
469 
470 	status = DLM_IVBUFLEN;
471 	if (namelen > DLM_LOCKID_NAME_MAX) {
472 		dlm_error(status);
473 		goto leave;
474 	}
475 
476 	status = DLM_SYSERR;
477 	newlock = dlm_new_lock(create->requested_type,
478 			       create->node_idx,
479 			       be64_to_cpu(create->cookie), NULL);
480 	if (!newlock) {
481 		dlm_error(status);
482 		goto leave;
483 	}
484 
485 	lksb = newlock->lksb;
486 
487 	if (be32_to_cpu(create->flags) & LKM_GET_LVB) {
488 		lksb->flags |= DLM_LKSB_GET_LVB;
489 		mlog(0, "set DLM_LKSB_GET_LVB flag\n");
490 	}
491 
492 	status = DLM_IVLOCKID;
493 	res = dlm_lookup_lockres(dlm, name, namelen);
494 	if (!res) {
495 		dlm_error(status);
496 		goto leave;
497 	}
498 
499 	spin_lock(&res->spinlock);
500 	status = __dlm_lockres_state_to_status(res);
501 	spin_unlock(&res->spinlock);
502 
503 	if (status != DLM_NORMAL) {
504 		mlog(0, "lockres recovering/migrating/in-progress\n");
505 		goto leave;
506 	}
507 
508 	dlm_lock_attach_lockres(newlock, res);
509 
510 	status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));
511 leave:
512 	if (status != DLM_NORMAL)
513 		if (newlock)
514 			dlm_lock_put(newlock);
515 
516 	if (res)
517 		dlm_lockres_put(res);
518 
519 	dlm_put(dlm);
520 
521 	return status;
522 }
523 
524 
525 /* fetch next node-local (u8 nodenum + u56 cookie) into u64 */
526 static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie)
527 {
528 	u64 tmpnode = node_num;
529 
530 	/* shift single byte of node num into top 8 bits */
531 	tmpnode <<= 56;
532 
533 	spin_lock(&dlm_cookie_lock);
534 	*cookie = (dlm_next_cookie | tmpnode);
535 	if (++dlm_next_cookie & 0xff00000000000000ull) {
536 		mlog(0, "This node's cookie will now wrap!\n");
537 		dlm_next_cookie = 1;
538 	}
539 	spin_unlock(&dlm_cookie_lock);
540 }
541 
542 enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
543 			struct dlm_lockstatus *lksb, int flags,
544 			const char *name, int namelen, dlm_astlockfunc_t *ast,
545 			void *data, dlm_bastlockfunc_t *bast)
546 {
547 	enum dlm_status status;
548 	struct dlm_lock_resource *res = NULL;
549 	struct dlm_lock *lock = NULL;
550 	int convert = 0, recovery = 0;
551 
552 	/* yes this function is a mess.
553 	 * TODO: clean this up.  lots of common code in the
554 	 *       lock and convert paths, especially in the retry blocks */
555 	if (!lksb) {
556 		dlm_error(DLM_BADARGS);
557 		return DLM_BADARGS;
558 	}
559 
560 	status = DLM_BADPARAM;
561 	if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) {
562 		dlm_error(status);
563 		goto error;
564 	}
565 
566 	if (flags & ~LKM_VALID_FLAGS) {
567 		dlm_error(status);
568 		goto error;
569 	}
570 
571 	convert = (flags & LKM_CONVERT);
572 	recovery = (flags & LKM_RECOVERY);
573 
574 	if (recovery &&
575 	    (!dlm_is_recovery_lock(name, namelen) || convert) ) {
576 		dlm_error(status);
577 		goto error;
578 	}
579 	if (convert && (flags & LKM_LOCAL)) {
580 		mlog(ML_ERROR, "strange LOCAL convert request!\n");
581 		goto error;
582 	}
583 
584 	if (convert) {
585 		/* CONVERT request */
586 
587 		/* if converting, must pass in a valid dlm_lock */
588 		lock = lksb->lockid;
589 		if (!lock) {
590 			mlog(ML_ERROR, "NULL lock pointer in convert "
591 			     "request\n");
592 			goto error;
593 		}
594 
595 		res = lock->lockres;
596 		if (!res) {
597 			mlog(ML_ERROR, "NULL lockres pointer in convert "
598 			     "request\n");
599 			goto error;
600 		}
601 		dlm_lockres_get(res);
602 
603 		/* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are
604 	 	 * static after the original lock call.  convert requests will
605 		 * ensure that everything is the same, or return DLM_BADARGS.
606 	 	 * this means that DLM_DENIED_NOASTS will never be returned.
607 	 	 */
608 		if (lock->lksb != lksb || lock->ast != ast ||
609 		    lock->bast != bast || lock->astdata != data) {
610 			status = DLM_BADARGS;
611 			mlog(ML_ERROR, "new args:  lksb=%p, ast=%p, bast=%p, "
612 			     "astdata=%p\n", lksb, ast, bast, data);
613 			mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, "
614 			     "astdata=%p\n", lock->lksb, lock->ast,
615 			     lock->bast, lock->astdata);
616 			goto error;
617 		}
618 retry_convert:
619 		dlm_wait_for_recovery(dlm);
620 
621 		if (res->owner == dlm->node_num)
622 			status = dlmconvert_master(dlm, res, lock, flags, mode);
623 		else
624 			status = dlmconvert_remote(dlm, res, lock, flags, mode);
625 		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
626 		    status == DLM_FORWARD) {
627 			/* for now, see how this works without sleeping
628 			 * and just retry right away.  I suspect the reco
629 			 * or migration will complete fast enough that
630 			 * no waiting will be necessary */
631 			mlog(0, "retrying convert with migration/recovery/"
632 			     "in-progress\n");
633 			msleep(100);
634 			goto retry_convert;
635 		}
636 	} else {
637 		u64 tmpcookie;
638 
639 		/* LOCK request */
640 		status = DLM_BADARGS;
641 		if (!name) {
642 			dlm_error(status);
643 			goto error;
644 		}
645 
646 		status = DLM_IVBUFLEN;
647 		if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) {
648 			dlm_error(status);
649 			goto error;
650 		}
651 
652 		dlm_get_next_cookie(dlm->node_num, &tmpcookie);
653 		lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb);
654 		if (!lock) {
655 			dlm_error(status);
656 			goto error;
657 		}
658 
659 		if (!recovery)
660 			dlm_wait_for_recovery(dlm);
661 
662 		/* find or create the lock resource */
663 		res = dlm_get_lock_resource(dlm, name, namelen, flags);
664 		if (!res) {
665 			status = DLM_IVLOCKID;
666 			dlm_error(status);
667 			goto error;
668 		}
669 
670 		mlog(0, "type=%d, flags = 0x%x\n", mode, flags);
671 		mlog(0, "creating lock: lock=%p res=%p\n", lock, res);
672 
673 		dlm_lock_attach_lockres(lock, res);
674 		lock->ast = ast;
675 		lock->bast = bast;
676 		lock->astdata = data;
677 
678 retry_lock:
679 		if (flags & LKM_VALBLK) {
680 			mlog(0, "LKM_VALBLK passed by caller\n");
681 
682 			/* LVB requests for non PR, PW or EX locks are
683 			 * ignored. */
684 			if (mode < LKM_PRMODE)
685 				flags &= ~LKM_VALBLK;
686 			else {
687 				flags |= LKM_GET_LVB;
688 				lock->lksb->flags |= DLM_LKSB_GET_LVB;
689 			}
690 		}
691 
692 		if (res->owner == dlm->node_num)
693 			status = dlmlock_master(dlm, res, lock, flags);
694 		else
695 			status = dlmlock_remote(dlm, res, lock, flags);
696 
697 		if (status == DLM_RECOVERING || status == DLM_MIGRATING ||
698 		    status == DLM_FORWARD) {
699 			msleep(100);
700 			if (recovery) {
701 				if (status != DLM_RECOVERING)
702 					goto retry_lock;
703 				/* wait to see the node go down, then
704 				 * drop down and allow the lockres to
705 				 * get cleaned up.  need to remaster. */
706 				dlm_wait_for_node_death(dlm, res->owner,
707 						DLM_NODE_DEATH_WAIT_MAX);
708 			} else {
709 				dlm_wait_for_recovery(dlm);
710 				goto retry_lock;
711 			}
712 		}
713 
714 		/* Inflight taken in dlm_get_lock_resource() is dropped here */
715 		spin_lock(&res->spinlock);
716 		dlm_lockres_drop_inflight_ref(dlm, res);
717 		spin_unlock(&res->spinlock);
718 
719 		dlm_lockres_calc_usage(dlm, res);
720 		dlm_kick_thread(dlm, res);
721 
722 		if (status != DLM_NORMAL) {
723 			lock->lksb->flags &= ~DLM_LKSB_GET_LVB;
724 			if (status != DLM_NOTQUEUED)
725 				dlm_error(status);
726 			goto error;
727 		}
728 	}
729 
730 error:
731 	if (status != DLM_NORMAL) {
732 		if (lock && !convert)
733 			dlm_lock_put(lock);
734 		// this is kind of unnecessary
735 		lksb->status = status;
736 	}
737 
738 	/* put lockres ref from the convert path
739 	 * or from dlm_get_lock_resource */
740 	if (res)
741 		dlm_lockres_put(res);
742 
743 	return status;
744 }
745 EXPORT_SYMBOL_GPL(dlmlock);
746