xref: /linux/fs/ocfs2/dlm/dlmunlock.c (revision 2697b79a469b68e3ad3640f55284359c1396278d)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3  * dlmunlock.c
4  *
5  * underlying calls for unlocking locks
6  *
7  * Copyright (C) 2004 Oracle.  All rights reserved.
8  */
9 
10 
11 #include <linux/module.h>
12 #include <linux/fs.h>
13 #include <linux/types.h>
14 #include <linux/highmem.h>
15 #include <linux/init.h>
16 #include <linux/sysctl.h>
17 #include <linux/random.h>
18 #include <linux/blkdev.h>
19 #include <linux/socket.h>
20 #include <linux/inet.h>
21 #include <linux/spinlock.h>
22 #include <linux/delay.h>
23 
24 #include "../cluster/heartbeat.h"
25 #include "../cluster/nodemanager.h"
26 #include "../cluster/tcp.h"
27 
28 #include "dlmapi.h"
29 #include "dlmcommon.h"
30 
31 #define MLOG_MASK_PREFIX ML_DLM
32 #include "../cluster/masklog.h"
33 
34 #define DLM_UNLOCK_FREE_LOCK           0x00000001
35 #define DLM_UNLOCK_CALL_AST            0x00000002
36 #define DLM_UNLOCK_REMOVE_LOCK         0x00000004
37 #define DLM_UNLOCK_REGRANT_LOCK        0x00000008
38 #define DLM_UNLOCK_CLEAR_CONVERT_TYPE  0x00000010
39 
40 
41 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
42 					      struct dlm_lock_resource *res,
43 					      struct dlm_lock *lock,
44 					      struct dlm_lockstatus *lksb,
45 					      int *actions);
46 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
47 					      struct dlm_lock_resource *res,
48 					      struct dlm_lock *lock,
49 					      struct dlm_lockstatus *lksb,
50 					      int *actions);
51 
52 static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
53 						 struct dlm_lock_resource *res,
54 						 struct dlm_lock *lock,
55 						 struct dlm_lockstatus *lksb,
56 						 int flags,
57 						 u8 owner);
58 
59 
60 /*
61  * according to the spec:
62  * http://opendlm.sourceforge.net/cvsmirror/opendlm/docs/dlmbook_final.pdf
63  *
64  *  flags & LKM_CANCEL != 0: must be converting or blocked
65  *  flags & LKM_CANCEL == 0: must be granted
66  *
67  * So to unlock a converting lock, you must first cancel the
68  * convert (passing LKM_CANCEL in flags), then call the unlock
69  * again (with no LKM_CANCEL in flags).
70  */
71 
72 
73 /*
74  * locking:
75  *   caller needs:  none
76  *   taken:         res->spinlock and lock->spinlock taken and dropped
77  *   held on exit:  none
78  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
79  * all callers should have taken an extra ref on lock coming in
80  */
81 static enum dlm_status dlmunlock_common(struct dlm_ctxt *dlm,
82 					struct dlm_lock_resource *res,
83 					struct dlm_lock *lock,
84 					struct dlm_lockstatus *lksb,
85 					int flags, int *call_ast,
86 					int master_node)
87 {
88 	enum dlm_status status;
89 	int actions = 0;
90 	int in_use;
91 	u8 owner;
92 	int recovery_wait = 0;
93 
94 	mlog(0, "master_node = %d, valblk = %d\n", master_node,
95 	     flags & LKM_VALBLK);
96 
97 	if (master_node)
98 		BUG_ON(res->owner != dlm->node_num);
99 	else
100 		BUG_ON(res->owner == dlm->node_num);
101 
102 	spin_lock(&dlm->ast_lock);
103 	/* We want to be sure that we're not freeing a lock
104 	 * that still has AST's pending... */
105 	in_use = !list_empty(&lock->ast_list);
106 	spin_unlock(&dlm->ast_lock);
107 	if (in_use && !(flags & LKM_CANCEL)) {
108 	       mlog(ML_ERROR, "lockres %.*s: Someone is calling dlmunlock "
109 		    "while waiting for an ast!", res->lockname.len,
110 		    res->lockname.name);
111 		return DLM_BADPARAM;
112 	}
113 
114 	spin_lock(&res->spinlock);
115 	if (res->state & DLM_LOCK_RES_IN_PROGRESS) {
116 		if (master_node && !(flags & LKM_CANCEL)) {
117 			mlog(ML_ERROR, "lockres in progress!\n");
118 			spin_unlock(&res->spinlock);
119 			return DLM_FORWARD;
120 		}
121 		/* ok for this to sleep if not in a network handler */
122 		__dlm_wait_on_lockres(res);
123 		res->state |= DLM_LOCK_RES_IN_PROGRESS;
124 	}
125 	spin_lock(&lock->spinlock);
126 
127 	if (res->state & DLM_LOCK_RES_RECOVERING) {
128 		status = DLM_RECOVERING;
129 		goto leave;
130 	}
131 
132 	if (res->state & DLM_LOCK_RES_MIGRATING) {
133 		status = DLM_MIGRATING;
134 		goto leave;
135 	}
136 
137 	/* see above for what the spec says about
138 	 * LKM_CANCEL and the lock queue state */
139 	if (flags & LKM_CANCEL)
140 		status = dlm_get_cancel_actions(dlm, res, lock, lksb, &actions);
141 	else
142 		status = dlm_get_unlock_actions(dlm, res, lock, lksb, &actions);
143 
144 	if (status != DLM_NORMAL && (status != DLM_CANCELGRANT || !master_node))
145 		goto leave;
146 
147 	/* By now this has been masked out of cancel requests. */
148 	if (flags & LKM_VALBLK) {
149 		/* make the final update to the lvb */
150 		if (master_node)
151 			memcpy(res->lvb, lksb->lvb, DLM_LVB_LEN);
152 		else
153 			flags |= LKM_PUT_LVB; /* let the send function
154 					       * handle it. */
155 	}
156 
157 	if (!master_node) {
158 		owner = res->owner;
159 		/* drop locks and send message */
160 		if (flags & LKM_CANCEL)
161 			lock->cancel_pending = 1;
162 		else
163 			lock->unlock_pending = 1;
164 		spin_unlock(&lock->spinlock);
165 		spin_unlock(&res->spinlock);
166 		status = dlm_send_remote_unlock_request(dlm, res, lock, lksb,
167 							flags, owner);
168 		spin_lock(&res->spinlock);
169 		spin_lock(&lock->spinlock);
170 		/* if the master told us the lock was already granted,
171 		 * let the ast handle all of these actions */
172 		if (status == DLM_CANCELGRANT) {
173 			actions &= ~(DLM_UNLOCK_REMOVE_LOCK|
174 				     DLM_UNLOCK_REGRANT_LOCK|
175 				     DLM_UNLOCK_CLEAR_CONVERT_TYPE);
176 		} else if (status == DLM_RECOVERING ||
177 			   status == DLM_MIGRATING ||
178 			   status == DLM_FORWARD ||
179 			   status == DLM_NOLOCKMGR
180 			   ) {
181 			/* must clear the actions because this unlock
182 			 * is about to be retried.  cannot free or do
183 			 * any list manipulation. */
184 			mlog(0, "%s:%.*s: clearing actions, %s\n",
185 			     dlm->name, res->lockname.len,
186 			     res->lockname.name,
187 			     status==DLM_RECOVERING?"recovering":
188 			     (status==DLM_MIGRATING?"migrating":
189 				(status == DLM_FORWARD ? "forward" :
190 						"nolockmanager")));
191 			actions = 0;
192 		}
193 		if (flags & LKM_CANCEL)
194 			lock->cancel_pending = 0;
195 		else {
196 			if (!lock->unlock_pending)
197 				recovery_wait = 1;
198 			else
199 				lock->unlock_pending = 0;
200 		}
201 	}
202 
203 	/* get an extra ref on lock.  if we are just switching
204 	 * lists here, we dont want the lock to go away. */
205 	dlm_lock_get(lock);
206 
207 	if (actions & DLM_UNLOCK_REMOVE_LOCK) {
208 		list_del_init(&lock->list);
209 		dlm_lock_put(lock);
210 	}
211 	if (actions & DLM_UNLOCK_REGRANT_LOCK) {
212 		dlm_lock_get(lock);
213 		list_add_tail(&lock->list, &res->granted);
214 	}
215 	if (actions & DLM_UNLOCK_CLEAR_CONVERT_TYPE) {
216 		mlog(0, "clearing convert_type at %smaster node\n",
217 		     master_node ? "" : "non-");
218 		lock->ml.convert_type = LKM_IVMODE;
219 	}
220 
221 	/* remove the extra ref on lock */
222 	dlm_lock_put(lock);
223 
224 leave:
225 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
226 	if (!dlm_lock_on_list(&res->converting, lock))
227 		BUG_ON(lock->ml.convert_type != LKM_IVMODE);
228 	else
229 		BUG_ON(lock->ml.convert_type == LKM_IVMODE);
230 	spin_unlock(&lock->spinlock);
231 	spin_unlock(&res->spinlock);
232 	wake_up(&res->wq);
233 
234 	if (recovery_wait) {
235 		spin_lock(&res->spinlock);
236 		/* Unlock request will directly succeed after owner dies,
237 		 * and the lock is already removed from grant list. We have to
238 		 * wait for RECOVERING done or we miss the chance to purge it
239 		 * since the removement is much faster than RECOVERING proc.
240 		 */
241 		__dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_RECOVERING);
242 		spin_unlock(&res->spinlock);
243 	}
244 
245 	/* let the caller's final dlm_lock_put handle the actual kfree */
246 	if (actions & DLM_UNLOCK_FREE_LOCK) {
247 		/* this should always be coupled with list removal */
248 		BUG_ON(!(actions & DLM_UNLOCK_REMOVE_LOCK));
249 		mlog(0, "lock %u:%llu should be gone now! refs=%d\n",
250 		     dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
251 		     dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
252 		     kref_read(&lock->lock_refs)-1);
253 		dlm_lock_put(lock);
254 	}
255 	if (actions & DLM_UNLOCK_CALL_AST)
256 		*call_ast = 1;
257 
258 	/* if cancel or unlock succeeded, lvb work is done */
259 	if (status == DLM_NORMAL)
260 		lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
261 
262 	return status;
263 }
264 
265 void dlm_commit_pending_unlock(struct dlm_lock_resource *res,
266 			       struct dlm_lock *lock)
267 {
268 	/* leave DLM_LKSB_PUT_LVB on the lksb so any final
269 	 * update of the lvb will be sent to the new master */
270 	list_del_init(&lock->list);
271 }
272 
273 void dlm_commit_pending_cancel(struct dlm_lock_resource *res,
274 			       struct dlm_lock *lock)
275 {
276 	list_move_tail(&lock->list, &res->granted);
277 	lock->ml.convert_type = LKM_IVMODE;
278 }
279 
280 
281 static inline enum dlm_status dlmunlock_master(struct dlm_ctxt *dlm,
282 					  struct dlm_lock_resource *res,
283 					  struct dlm_lock *lock,
284 					  struct dlm_lockstatus *lksb,
285 					  int flags,
286 					  int *call_ast)
287 {
288 	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 1);
289 }
290 
291 static inline enum dlm_status dlmunlock_remote(struct dlm_ctxt *dlm,
292 					  struct dlm_lock_resource *res,
293 					  struct dlm_lock *lock,
294 					  struct dlm_lockstatus *lksb,
295 					  int flags, int *call_ast)
296 {
297 	return dlmunlock_common(dlm, res, lock, lksb, flags, call_ast, 0);
298 }
299 
300 /*
301  * locking:
302  *   caller needs:  none
303  *   taken:         none
304  *   held on exit:  none
305  * returns: DLM_NORMAL, DLM_NOLOCKMGR, status from network
306  */
307 static enum dlm_status dlm_send_remote_unlock_request(struct dlm_ctxt *dlm,
308 						 struct dlm_lock_resource *res,
309 						 struct dlm_lock *lock,
310 						 struct dlm_lockstatus *lksb,
311 						 int flags,
312 						 u8 owner)
313 {
314 	struct dlm_unlock_lock unlock;
315 	int tmpret;
316 	enum dlm_status ret;
317 	int status = 0;
318 	struct kvec vec[2];
319 	size_t veclen = 1;
320 
321 	mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);
322 
323 	if (owner == dlm->node_num) {
324 		/* ended up trying to contact ourself.  this means
325 		 * that the lockres had been remote but became local
326 		 * via a migration.  just retry it, now as local */
327 		mlog(0, "%s:%.*s: this node became the master due to a "
328 		     "migration, re-evaluate now\n", dlm->name,
329 		     res->lockname.len, res->lockname.name);
330 		return DLM_FORWARD;
331 	}
332 
333 	memset(&unlock, 0, sizeof(unlock));
334 	unlock.node_idx = dlm->node_num;
335 	unlock.flags = cpu_to_be32(flags);
336 	unlock.cookie = lock->ml.cookie;
337 	unlock.namelen = res->lockname.len;
338 	memcpy(unlock.name, res->lockname.name, unlock.namelen);
339 
340 	vec[0].iov_len = sizeof(struct dlm_unlock_lock);
341 	vec[0].iov_base = &unlock;
342 
343 	if (flags & LKM_PUT_LVB) {
344 		/* extra data to send if we are updating lvb */
345 		vec[1].iov_len = DLM_LVB_LEN;
346 		vec[1].iov_base = lock->lksb->lvb;
347 		veclen++;
348 	}
349 
350 	tmpret = o2net_send_message_vec(DLM_UNLOCK_LOCK_MSG, dlm->key,
351 					vec, veclen, owner, &status);
352 	if (tmpret >= 0) {
353 		// successfully sent and received
354 		if (status == DLM_FORWARD)
355 			mlog(0, "master was in-progress.  retry\n");
356 		ret = status;
357 	} else {
358 		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
359 		     "node %u\n", tmpret, DLM_UNLOCK_LOCK_MSG, dlm->key, owner);
360 		if (dlm_is_host_down(tmpret)) {
361 			/* NOTE: this seems strange, but it is what we want.
362 			 * when the master goes down during a cancel or
363 			 * unlock, the recovery code completes the operation
364 			 * as if the master had not died, then passes the
365 			 * updated state to the recovery master.  this thread
366 			 * just needs to finish out the operation and call
367 			 * the unlockast. */
368 			if (dlm_is_node_dead(dlm, owner))
369 				ret = DLM_NORMAL;
370 			else
371 				ret = DLM_NOLOCKMGR;
372 		} else {
373 			/* something bad.  this will BUG in ocfs2 */
374 			ret = dlm_err_to_dlm_status(tmpret);
375 		}
376 	}
377 
378 	return ret;
379 }
380 
381 /*
382  * locking:
383  *   caller needs:  none
384  *   taken:         takes and drops res->spinlock
385  *   held on exit:  none
386  * returns: DLM_NORMAL, DLM_BADARGS, DLM_IVLOCKID,
387  *          return value from dlmunlock_master
388  */
389 int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data,
390 			    void **ret_data)
391 {
392 	struct dlm_ctxt *dlm = data;
393 	struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
394 	struct dlm_lock_resource *res = NULL;
395 	struct dlm_lock *lock = NULL, *iter;
396 	enum dlm_status status = DLM_NORMAL;
397 	int i;
398 	struct dlm_lockstatus *lksb = NULL;
399 	int ignore;
400 	u32 flags;
401 	struct list_head *queue;
402 
403 	flags = be32_to_cpu(unlock->flags);
404 
405 	if (flags & LKM_GET_LVB) {
406 		mlog(ML_ERROR, "bad args!  GET_LVB specified on unlock!\n");
407 		return DLM_BADARGS;
408 	}
409 
410 	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == (LKM_PUT_LVB|LKM_CANCEL)) {
411 		mlog(ML_ERROR, "bad args!  cannot modify lvb on a CANCEL "
412 		     "request!\n");
413 		return DLM_BADARGS;
414 	}
415 
416 	if (unlock->namelen > DLM_LOCKID_NAME_MAX) {
417 		mlog(ML_ERROR, "Invalid name length in unlock handler!\n");
418 		return DLM_IVBUFLEN;
419 	}
420 
421 	if (!dlm_grab(dlm))
422 		return DLM_FORWARD;
423 
424 	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
425 			"Domain %s not fully joined!\n", dlm->name);
426 
427 	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");
428 
429 	res = dlm_lookup_lockres(dlm, unlock->name, unlock->namelen);
430 	if (!res) {
431 		/* We assume here that a no lock resource simply means
432 		 * it was migrated away and destroyed before the other
433 		 * node could detect it. */
434 		mlog(0, "returning DLM_FORWARD -- res no longer exists\n");
435 		status = DLM_FORWARD;
436 		goto not_found;
437 	}
438 
439 	queue=&res->granted;
440 	spin_lock(&res->spinlock);
441 	if (res->state & DLM_LOCK_RES_RECOVERING) {
442 		spin_unlock(&res->spinlock);
443 		mlog(0, "returning DLM_RECOVERING\n");
444 		status = DLM_RECOVERING;
445 		goto leave;
446 	}
447 
448 	if (res->state & DLM_LOCK_RES_MIGRATING) {
449 		spin_unlock(&res->spinlock);
450 		mlog(0, "returning DLM_MIGRATING\n");
451 		status = DLM_MIGRATING;
452 		goto leave;
453 	}
454 
455 	if (res->owner != dlm->node_num) {
456 		spin_unlock(&res->spinlock);
457 		mlog(0, "returning DLM_FORWARD -- not master\n");
458 		status = DLM_FORWARD;
459 		goto leave;
460 	}
461 
462 	for (i=0; i<3; i++) {
463 		list_for_each_entry(iter, queue, list) {
464 			if (iter->ml.cookie == unlock->cookie &&
465 			    iter->ml.node == unlock->node_idx) {
466 				dlm_lock_get(iter);
467 				lock = iter;
468 				break;
469 			}
470 		}
471 		if (lock)
472 			break;
473 		/* scan granted -> converting -> blocked queues */
474 		queue++;
475 	}
476 	spin_unlock(&res->spinlock);
477 	if (!lock) {
478 		status = DLM_IVLOCKID;
479 		goto not_found;
480 	}
481 
482 	/* lock was found on queue */
483 	lksb = lock->lksb;
484 	if (flags & (LKM_VALBLK|LKM_PUT_LVB) &&
485 	    lock->ml.type != LKM_EXMODE)
486 		flags &= ~(LKM_VALBLK|LKM_PUT_LVB);
487 
488 	/* unlockast only called on originating node */
489 	if (flags & LKM_PUT_LVB) {
490 		lksb->flags |= DLM_LKSB_PUT_LVB;
491 		memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
492 	}
493 
494 	/* if this is in-progress, propagate the DLM_FORWARD
495 	 * all the way back out */
496 	status = dlmunlock_master(dlm, res, lock, lksb, flags, &ignore);
497 	if (status == DLM_FORWARD)
498 		mlog(0, "lockres is in progress\n");
499 
500 	if (flags & LKM_PUT_LVB)
501 		lksb->flags &= ~DLM_LKSB_PUT_LVB;
502 
503 	dlm_lockres_calc_usage(dlm, res);
504 	dlm_kick_thread(dlm, res);
505 
506 not_found:
507 	if (!lock)
508 		mlog(ML_ERROR, "failed to find lock to unlock! "
509 			       "cookie=%u:%llu\n",
510 		     dlm_get_lock_cookie_node(be64_to_cpu(unlock->cookie)),
511 		     dlm_get_lock_cookie_seq(be64_to_cpu(unlock->cookie)));
512 	else
513 		dlm_lock_put(lock);
514 
515 leave:
516 	if (res)
517 		dlm_lockres_put(res);
518 
519 	dlm_put(dlm);
520 
521 	return status;
522 }
523 
524 
525 static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
526 					      struct dlm_lock_resource *res,
527 					      struct dlm_lock *lock,
528 					      struct dlm_lockstatus *lksb,
529 					      int *actions)
530 {
531 	enum dlm_status status;
532 
533 	if (dlm_lock_on_list(&res->blocked, lock)) {
534 		/* cancel this outright */
535 		status = DLM_NORMAL;
536 		*actions = (DLM_UNLOCK_CALL_AST |
537 			    DLM_UNLOCK_REMOVE_LOCK);
538 	} else if (dlm_lock_on_list(&res->converting, lock)) {
539 		/* cancel the request, put back on granted */
540 		status = DLM_NORMAL;
541 		*actions = (DLM_UNLOCK_CALL_AST |
542 			    DLM_UNLOCK_REMOVE_LOCK |
543 			    DLM_UNLOCK_REGRANT_LOCK |
544 			    DLM_UNLOCK_CLEAR_CONVERT_TYPE);
545 	} else if (dlm_lock_on_list(&res->granted, lock)) {
546 		/* too late, already granted. */
547 		status = DLM_CANCELGRANT;
548 		*actions = DLM_UNLOCK_CALL_AST;
549 	} else {
550 		mlog(ML_ERROR, "lock to cancel is not on any list!\n");
551 		status = DLM_IVLOCKID;
552 		*actions = 0;
553 	}
554 	return status;
555 }
556 
557 static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
558 					      struct dlm_lock_resource *res,
559 					      struct dlm_lock *lock,
560 					      struct dlm_lockstatus *lksb,
561 					      int *actions)
562 {
563 	enum dlm_status status;
564 
565 	/* unlock request */
566 	if (!dlm_lock_on_list(&res->granted, lock)) {
567 		status = DLM_DENIED;
568 		dlm_error(status);
569 		*actions = 0;
570 	} else {
571 		/* unlock granted lock */
572 		status = DLM_NORMAL;
573 		*actions = (DLM_UNLOCK_FREE_LOCK |
574 			    DLM_UNLOCK_CALL_AST |
575 			    DLM_UNLOCK_REMOVE_LOCK);
576 	}
577 	return status;
578 }
579 
580 /* there seems to be no point in doing this async
581  * since (even for the remote case) there is really
582  * no work to queue up... so just do it and fire the
583  * unlockast by hand when done... */
584 enum dlm_status dlmunlock(struct dlm_ctxt *dlm, struct dlm_lockstatus *lksb,
585 			  int flags, dlm_astunlockfunc_t *unlockast, void *data)
586 {
587 	enum dlm_status status;
588 	struct dlm_lock_resource *res;
589 	struct dlm_lock *lock = NULL;
590 	int call_ast, is_master;
591 
592 	if (!lksb) {
593 		dlm_error(DLM_BADARGS);
594 		return DLM_BADARGS;
595 	}
596 
597 	if (flags & ~(LKM_CANCEL | LKM_VALBLK | LKM_INVVALBLK)) {
598 		dlm_error(DLM_BADPARAM);
599 		return DLM_BADPARAM;
600 	}
601 
602 	if ((flags & (LKM_VALBLK | LKM_CANCEL)) == (LKM_VALBLK | LKM_CANCEL)) {
603 		mlog(0, "VALBLK given with CANCEL: ignoring VALBLK\n");
604 		flags &= ~LKM_VALBLK;
605 	}
606 
607 	if (!lksb->lockid || !lksb->lockid->lockres) {
608 		dlm_error(DLM_BADPARAM);
609 		return DLM_BADPARAM;
610 	}
611 
612 	lock = lksb->lockid;
613 	BUG_ON(!lock);
614 	dlm_lock_get(lock);
615 
616 	res = lock->lockres;
617 	BUG_ON(!res);
618 	dlm_lockres_get(res);
619 retry:
620 	call_ast = 0;
621 	/* need to retry up here because owner may have changed */
622 	mlog(0, "lock=%p res=%p\n", lock, res);
623 
624 	spin_lock(&res->spinlock);
625 	is_master = (res->owner == dlm->node_num);
626 	if (flags & LKM_VALBLK && lock->ml.type != LKM_EXMODE)
627 		flags &= ~LKM_VALBLK;
628 	spin_unlock(&res->spinlock);
629 
630 	if (is_master) {
631 		status = dlmunlock_master(dlm, res, lock, lksb, flags,
632 					  &call_ast);
633 		mlog(0, "done calling dlmunlock_master: returned %d, "
634 		     "call_ast is %d\n", status, call_ast);
635 	} else {
636 		status = dlmunlock_remote(dlm, res, lock, lksb, flags,
637 					  &call_ast);
638 		mlog(0, "done calling dlmunlock_remote: returned %d, "
639 		     "call_ast is %d\n", status, call_ast);
640 	}
641 
642 	if (status == DLM_RECOVERING ||
643 	    status == DLM_MIGRATING ||
644 	    status == DLM_FORWARD ||
645 	    status == DLM_NOLOCKMGR) {
646 
647 		/* We want to go away for a tiny bit to allow recovery
648 		 * / migration to complete on this resource. I don't
649 		 * know of any wait queue we could sleep on as this
650 		 * may be happening on another node. Perhaps the
651 		 * proper solution is to queue up requests on the
652 		 * other end? */
653 
654 		/* do we want to yield(); ?? */
655 		msleep(50);
656 
657 		mlog(0, "retrying unlock due to pending recovery/"
658 		     "migration/in-progress/reconnect\n");
659 		goto retry;
660 	}
661 
662 	if (call_ast) {
663 		mlog(0, "calling unlockast(%p, %d)\n", data, status);
664 		if (is_master) {
665 			/* it is possible that there is one last bast
666 			 * pending.  make sure it is flushed, then
667 			 * call the unlockast.
668 			 * not an issue if this is a mastered remotely,
669 			 * since this lock has been removed from the
670 			 * lockres queues and cannot be found. */
671 			dlm_kick_thread(dlm, NULL);
672 			wait_event(dlm->ast_wq,
673 				   dlm_lock_basts_flushed(dlm, lock));
674 		}
675 		(*unlockast)(data, status);
676 	}
677 
678 	if (status == DLM_CANCELGRANT)
679 		status = DLM_NORMAL;
680 
681 	if (status == DLM_NORMAL) {
682 		mlog(0, "kicking the thread\n");
683 		dlm_kick_thread(dlm, res);
684 	} else
685 		dlm_error(status);
686 
687 	dlm_lockres_calc_usage(dlm, res);
688 	dlm_lockres_put(res);
689 	dlm_lock_put(lock);
690 
691 	mlog(0, "returning status=%d!\n", status);
692 	return status;
693 }
694 EXPORT_SYMBOL_GPL(dlmunlock);
695 
696