/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
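
/* Illustrative trace (editor's sketch, not from the original source):
   a first-time dlm_lock() request on a locally mastered resource moves
   through the four stages as:

     dlm_lock()            validates args, creates the lkb
       request_lock()      find_rsb() finds/creates and locks the rsb
         _request_lock()   set_master() resolves the nodeid; rsb is local
           do_request()    grants now or queues on res_waitqueue

   Had the rsb been mastered remotely, _request_lock() would instead
   call send_request() and the matching do_request() would run on the
   master node, replying via send_request_reply(). */
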
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
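
/* Worked example (editor's note): with a granted PR lock, a new PR
   request sees __dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_PR + 1]
   == 1, so the two protected-read locks coexist, while a new EX
   request sees a 0 entry and must wait.  dlm_modes_compat() below
   wraps exactly this lookup:

     dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) -> 1
     dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) -> 0
*/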

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
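
/* Worked example (editor's note): converting PR -> EX hits entry 1 in
   the PR row, so the resource's current LVB is copied back to the
   caller; converting EX -> NL hits entry 0, so the caller's LVB is
   written to the resource; any operation whose requested mode is UN/IV
   hits -1 and the LVB is untouched.  set_lvb_lock() below acts on
   exactly these values. */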

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
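
/* Editor's sketch (not part of the original file) of the reference
   contract around find_rsb(): the rsb is returned with a reference
   held, whether it was found on the normal list, revived from the toss
   list, or freshly created, and the caller drops it with put_rsb()
   when done.  request_lock() further down follows this same pattern. */
#if 0
static int example_find_rsb_usage(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;
	int error;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		return error;

	lock_rsb(r);
	/* ... manipulate r's queues under res_mutex ... */
	unlock_rsb(r);
	put_rsb(r);	/* last ref would move r to the toss list */
	return 0;
}
#endif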

int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
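
/* Editor's note on the lkid layout built above: the low 16 bits hold
   the randomly chosen lkbtbl bucket and the high 16 bits hold that
   bucket's rolling counter:

     lkid   = bucket | (counter << 16)
     bucket = lkid & 0xFFFF    (how find_lkb()/__find_lkb() recover it)

   so a lookup only ever touches one bucket's list and rwlock, and the
   in-use scan above is needed because the 16-bit counter wraps. */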

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	uint16_t bucket = lkid & 0xFFFF;
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = lkid & 0xFFFF;

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = lkb->lkb_id & 0xFFFF;

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}
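
/* Editor's note on the ordering convention: add_lkb() below relies on
   lkb_add_ordered() to keep the grant queue sorted from the highest
   mode down, linking the new entry just ahead of the first entry whose
   mode compares lower.  Under that convention a queue holding EX, PW,
   NL gains a new PR entry as EX, PW, PR, NL.  (Note the comparison
   reads lkb_rqmode from the queued entries while add_lkb() passes the
   new lkb's grmode, so the invariant depends on how rqmode is left on
   granted lkbs.) */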

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_waiters_mutex);
	if (lkb->lkb_wait_type) {
		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
		goto out;
	}
	lkb->lkb_wait_type = mstype;
	kref_get(&lkb->lkb_ref);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	mutex_unlock(&ls->ls_waiters_mutex);
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb)
{
	int error = 0;

	if (!lkb->lkb_wait_type) {
		log_print("remove_from_waiters error");
		error = -EINVAL;
		goto out;
	}
	lkb->lkb_wait_type = 0;
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	list_del(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
 out:
	return error;
}

static int remove_from_waiters(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
}

static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing the first lkb in the
 * convert queue from being granted, then demote lkb (set grmode to NL).
 * This second form requires that we check for conv-deadlk even when
 * now == 0 in _can_be_granted().
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We demote the granted mode of the second lock (the lkb passed to this
 * function).
 *
 * After the resolution, the "grant pending" function needs to go back and try
 * to grant locks on the convert queue again since the first lock can now be
 * granted.
 */

static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}
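
/* Editor's worked example of the rules above: with one PR lock granted
   and empty convert/wait queues, a new PR request (now=1, conv=0) is
   granted immediately by rule 6-4.  A new EX request in the same state
   conflicts in queue_conflict(&r->res_grantqueue, lkb), falls through
   to "out" and returns 0, so do_request() below queues it on
   res_waitqueue instead (or fails it with -EAGAIN under NOQUEUE). */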

/*
 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
 * simple way to provide a big optimization to applications that can use them.
 */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
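
/* Editor's example: a CW request carrying DLM_LKF_ALTPR that cannot be
   granted as CW is retried once with rqmode PR; if the alternative
   succeeds, the lock is granted in PR mode and DLM_SBF_ALTMODE is set
   in sb_flags so the caller can detect the substitution. */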

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0))
			grant_lock_pending(r, lkb);
		else
			high = max_t(int, lkb->lkb_rqmode, high);
	}

	return high;
}

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}

static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
			    struct dlm_lkb *lkb)
{
	struct dlm_lkb *gr;

	list_for_each_entry(gr, head, lkb_statequeue) {
		if (gr->lkb_bastaddr &&
		    gr->lkb_highbast < lkb->lkb_rqmode &&
		    !modes_compat(gr, lkb)) {
			queue_bast(r, gr, lkb->lkb_rqmode);
			gr->lkb_highbast = lkb->lkb_rqmode;
		}
	}
}

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}

/* set_master(r, lkb) -- set the master nodeid of a resource

   The purpose of this function is to set the nodeid field in the given
   lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
   known, it can just be copied to the lkb and the function will return
   0.  If the rsb's nodeid is _not_ known, it needs to be looked up
   before it can be copied to the lkb.

   When the rsb nodeid is being looked up remotely, the initial lkb
   causing the lookup is kept on the ls_waiters list waiting for the
   lookup reply.  Other lkb's waiting for the same rsb lookup are kept
   on the rsb's res_lookup list until the master is verified.

   Return values:
   0: nodeid is set in rsb/lkb and the caller should go ahead and use it
   1: the rsb master is not available and the lkb has been placed on
      a wait queue
*/

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
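
/* Editor's note on the caller contract: _request_lock() below maps the
   return values as

     0 -> lkb_nodeid is usable now; proceed with do_request() locally
          or send_request() to the master
     1 -> a master lookup is pending; the lkb sits on ls_waiters or
          res_lookup and the operation resumes later via
          confirm_master()/process_lookup_list(). */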
1496 
1497 static void process_lookup_list(struct dlm_rsb *r)
1498 {
1499 	struct dlm_lkb *lkb, *safe;
1500 
1501 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1502 		list_del(&lkb->lkb_rsb_lookup);
1503 		_request_lock(r, lkb);
1504 		schedule();
1505 	}
1506 }
1507 
1508 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1509 
1510 static void confirm_master(struct dlm_rsb *r, int error)
1511 {
1512 	struct dlm_lkb *lkb;
1513 
1514 	if (!r->res_first_lkid)
1515 		return;
1516 
1517 	switch (error) {
1518 	case 0:
1519 	case -EINPROGRESS:
1520 		r->res_first_lkid = 0;
1521 		process_lookup_list(r);
1522 		break;
1523 
1524 	case -EAGAIN:
1525 		/* the remote master didn't queue our NOQUEUE request;
1526 		   make a waiting lkb the first_lkid */
1527 
1528 		r->res_first_lkid = 0;
1529 
1530 		if (!list_empty(&r->res_lookup)) {
1531 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1532 					 lkb_rsb_lookup);
1533 			list_del(&lkb->lkb_rsb_lookup);
1534 			r->res_first_lkid = lkb->lkb_id;
1535 			_request_lock(r, lkb);
1536 		} else
1537 			r->res_nodeid = -1;
1538 		break;
1539 
1540 	default:
1541 		log_error(r->res_ls, "confirm_master unknown error %d", error);
1542 	}
1543 }
1544 
1545 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1546 			 int namelen, uint32_t parent_lkid, void *ast,
1547 			 void *astarg, void *bast, struct dlm_args *args)
1548 {
1549 	int rv = -EINVAL;
1550 
1551 	/* check for invalid arg usage */
1552 
1553 	if (mode < 0 || mode > DLM_LOCK_EX)
1554 		goto out;
1555 
1556 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1557 		goto out;
1558 
1559 	if (flags & DLM_LKF_CANCEL)
1560 		goto out;
1561 
1562 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1563 		goto out;
1564 
1565 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1566 		goto out;
1567 
1568 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1569 		goto out;
1570 
1571 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1572 		goto out;
1573 
1574 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1575 		goto out;
1576 
1577 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1578 		goto out;
1579 
1580 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1581 		goto out;
1582 
1583 	if (!ast || !lksb)
1584 		goto out;
1585 
1586 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1587 		goto out;
1588 
1589 	/* parent/child locks not yet supported */
1590 	if (parent_lkid)
1591 		goto out;
1592 
1593 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1594 		goto out;
1595 
1596 	/* these args will be copied to the lkb in validate_lock_args,
1597 	   it cannot be done now because when converting locks, fields in
1598 	   an active lkb cannot be modified before locking the rsb */
1599 
1600 	args->flags = flags;
1601 	args->astaddr = ast;
1602 	args->astparam = (long) astarg;
1603 	args->bastaddr = bast;
1604 	args->mode = mode;
1605 	args->lksb = lksb;
1606 	rv = 0;
1607  out:
1608 	return rv;
1609 }
1610 
1611 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1612 {
1613 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1614  		      DLM_LKF_FORCEUNLOCK))
1615 		return -EINVAL;
1616 
1617 	args->flags = flags;
1618 	args->astparam = (long) astarg;
1619 	return 0;
1620 }
1621 
1622 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1623 			      struct dlm_args *args)
1624 {
1625 	int rv = -EINVAL;
1626 
1627 	if (args->flags & DLM_LKF_CONVERT) {
1628 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1629 			goto out;
1630 
1631 		if (args->flags & DLM_LKF_QUECVT &&
1632 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1633 			goto out;
1634 
1635 		rv = -EBUSY;
1636 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1637 			goto out;
1638 
1639 		if (lkb->lkb_wait_type)
1640 			goto out;
1641 	}
1642 
1643 	lkb->lkb_exflags = args->flags;
1644 	lkb->lkb_sbflags = 0;
1645 	lkb->lkb_astaddr = args->astaddr;
1646 	lkb->lkb_astparam = args->astparam;
1647 	lkb->lkb_bastaddr = args->bastaddr;
1648 	lkb->lkb_rqmode = args->mode;
1649 	lkb->lkb_lksb = args->lksb;
1650 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1651 	lkb->lkb_ownpid = (int) current->pid;
1652 	rv = 0;
1653  out:
1654 	return rv;
1655 }
1656 
1657 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1658 {
1659 	int rv = -EINVAL;
1660 
1661 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1662 		goto out;
1663 
1664 	if (args->flags & DLM_LKF_FORCEUNLOCK)
1665 		goto out_ok;
1666 
1667 	if (args->flags & DLM_LKF_CANCEL &&
1668 	    lkb->lkb_status == DLM_LKSTS_GRANTED)
1669 		goto out;
1670 
1671 	if (!(args->flags & DLM_LKF_CANCEL) &&
1672 	    lkb->lkb_status != DLM_LKSTS_GRANTED)
1673 		goto out;
1674 
1675 	rv = -EBUSY;
1676 	if (lkb->lkb_wait_type)
1677 		goto out;
1678 
1679  out_ok:
1680 	lkb->lkb_exflags = args->flags;
1681 	lkb->lkb_sbflags = 0;
1682 	lkb->lkb_astparam = args->astparam;
1683 
1684 	rv = 0;
1685  out:
1686 	return rv;
1687 }
1688 
1689 /*
1690  * Four stage 4 varieties:
1691  * do_request(), do_convert(), do_unlock(), do_cancel()
1692  * These are called on the master node for the given lock and
1693  * from the central locking logic.
1694  */
1695 
1696 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1697 {
1698 	int error = 0;
1699 
1700 	if (can_be_granted(r, lkb, 1)) {
1701 		grant_lock(r, lkb);
1702 		queue_cast(r, lkb, 0);
1703 		goto out;
1704 	}
1705 
1706 	if (can_be_queued(lkb)) {
1707 		error = -EINPROGRESS;
1708 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
1709 		send_blocking_asts(r, lkb);
1710 		goto out;
1711 	}
1712 
1713 	error = -EAGAIN;
1714 	if (force_blocking_asts(lkb))
1715 		send_blocking_asts_all(r, lkb);
1716 	queue_cast(r, lkb, -EAGAIN);
1717 
1718  out:
1719 	return error;
1720 }
1721 
1722 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723 {
1724 	int error = 0;
1725 
1726 	/* changing an existing lock may allow others to be granted */
1727 
1728 	if (can_be_granted(r, lkb, 1)) {
1729 		grant_lock(r, lkb);
1730 		queue_cast(r, lkb, 0);
1731 		grant_pending_locks(r);
1732 		goto out;
1733 	}
1734 
1735 	if (can_be_queued(lkb)) {
1736 		if (is_demoted(lkb))
1737 			grant_pending_locks(r);
1738 		error = -EINPROGRESS;
1739 		del_lkb(r, lkb);
1740 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1741 		send_blocking_asts(r, lkb);
1742 		goto out;
1743 	}
1744 
1745 	error = -EAGAIN;
1746 	if (force_blocking_asts(lkb))
1747 		send_blocking_asts_all(r, lkb);
1748 	queue_cast(r, lkb, -EAGAIN);
1749 
1750  out:
1751 	return error;
1752 }
1753 
1754 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1755 {
1756 	remove_lock(r, lkb);
1757 	queue_cast(r, lkb, -DLM_EUNLOCK);
1758 	grant_pending_locks(r);
1759 	return -DLM_EUNLOCK;
1760 }
1761 
1762 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1763    skip the queue_cast(ECANCEL).  It indicates that the request/convert
1764    completed (and queued a normal ast) just before the cancel; we don't
1765    want to clobber the sb_result for the normal ast with ECANCEL. */
1766 
1767 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1768 {
1769 	revert_lock(r, lkb);
1770 	queue_cast(r, lkb, -DLM_ECANCEL);
1771 	grant_pending_locks(r);
1772 	return -DLM_ECANCEL;
1773 }
1774 
1775 /*
1776  * Four stage 3 varieties:
1777  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1778  */
1779 
1780 /* add a new lkb to a possibly new rsb, called by requesting process */
1781 
1782 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1783 {
1784 	int error;
1785 
1786 	/* set_master: sets lkb nodeid from r */
1787 
1788 	error = set_master(r, lkb);
1789 	if (error < 0)
1790 		goto out;
1791 	if (error) {
1792 		error = 0;
1793 		goto out;
1794 	}
1795 
1796 	if (is_remote(r))
1797 		/* receive_request() calls do_request() on remote node */
1798 		error = send_request(r, lkb);
1799 	else
1800 		error = do_request(r, lkb);
1801  out:
1802 	return error;
1803 }
1804 
1805 /* change some property of an existing lkb, e.g. mode */
1806 
1807 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1808 {
1809 	int error;
1810 
1811 	if (is_remote(r))
1812 		/* receive_convert() calls do_convert() on remote node */
1813 		error = send_convert(r, lkb);
1814 	else
1815 		error = do_convert(r, lkb);
1816 
1817 	return error;
1818 }
1819 
1820 /* remove an existing lkb from the granted queue */
1821 
1822 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1823 {
1824 	int error;
1825 
1826 	if (is_remote(r))
1827 		/* receive_unlock() calls do_unlock() on remote node */
1828 		error = send_unlock(r, lkb);
1829 	else
1830 		error = do_unlock(r, lkb);
1831 
1832 	return error;
1833 }
1834 
1835 /* remove an existing lkb from the convert or wait queue */
1836 
1837 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1838 {
1839 	int error;
1840 
1841 	if (is_remote(r))
1842 		/* receive_cancel() calls do_cancel() on remote node */
1843 		error = send_cancel(r, lkb);
1844 	else
1845 		error = do_cancel(r, lkb);
1846 
1847 	return error;
1848 }
1849 
1850 /*
1851  * Four stage 2 varieties:
1852  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1853  */
1854 
1855 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1856 			int len, struct dlm_args *args)
1857 {
1858 	struct dlm_rsb *r;
1859 	int error;
1860 
1861 	error = validate_lock_args(ls, lkb, args);
1862 	if (error)
1863 		goto out;
1864 
1865 	error = find_rsb(ls, name, len, R_CREATE, &r);
1866 	if (error)
1867 		goto out;
1868 
1869 	lock_rsb(r);
1870 
1871 	attach_lkb(r, lkb);
1872 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1873 
1874 	error = _request_lock(r, lkb);
1875 
1876 	unlock_rsb(r);
1877 	put_rsb(r);
1878 
1879  out:
1880 	return error;
1881 }
1882 
1883 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1884 			struct dlm_args *args)
1885 {
1886 	struct dlm_rsb *r;
1887 	int error;
1888 
1889 	r = lkb->lkb_resource;
1890 
1891 	hold_rsb(r);
1892 	lock_rsb(r);
1893 
1894 	error = validate_lock_args(ls, lkb, args);
1895 	if (error)
1896 		goto out;
1897 
1898 	error = _convert_lock(r, lkb);
1899  out:
1900 	unlock_rsb(r);
1901 	put_rsb(r);
1902 	return error;
1903 }
1904 
1905 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1906 		       struct dlm_args *args)
1907 {
1908 	struct dlm_rsb *r;
1909 	int error;
1910 
1911 	r = lkb->lkb_resource;
1912 
1913 	hold_rsb(r);
1914 	lock_rsb(r);
1915 
1916 	error = validate_unlock_args(lkb, args);
1917 	if (error)
1918 		goto out;
1919 
1920 	error = _unlock_lock(r, lkb);
1921  out:
1922 	unlock_rsb(r);
1923 	put_rsb(r);
1924 	return error;
1925 }
1926 
1927 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1928 		       struct dlm_args *args)
1929 {
1930 	struct dlm_rsb *r;
1931 	int error;
1932 
1933 	r = lkb->lkb_resource;
1934 
1935 	hold_rsb(r);
1936 	lock_rsb(r);
1937 
1938 	error = validate_unlock_args(lkb, args);
1939 	if (error)
1940 		goto out;
1941 
1942 	error = _cancel_lock(r, lkb);
1943  out:
1944 	unlock_rsb(r);
1945 	put_rsb(r);
1946 	return error;
1947 }
1948 
1949 /*
1950  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1951  */
1952 
1953 int dlm_lock(dlm_lockspace_t *lockspace,
1954 	     int mode,
1955 	     struct dlm_lksb *lksb,
1956 	     uint32_t flags,
1957 	     void *name,
1958 	     unsigned int namelen,
1959 	     uint32_t parent_lkid,
1960 	     void (*ast) (void *astarg),
1961 	     void *astarg,
1962 	     void (*bast) (void *astarg, int mode))
1963 {
1964 	struct dlm_ls *ls;
1965 	struct dlm_lkb *lkb;
1966 	struct dlm_args args;
1967 	int error, convert = flags & DLM_LKF_CONVERT;
1968 
1969 	ls = dlm_find_lockspace_local(lockspace);
1970 	if (!ls)
1971 		return -EINVAL;
1972 
1973 	lock_recovery(ls);
1974 
1975 	if (convert)
1976 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
1977 	else
1978 		error = create_lkb(ls, &lkb);
1979 
1980 	if (error)
1981 		goto out;
1982 
1983 	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1984 			      astarg, bast, &args);
1985 	if (error)
1986 		goto out_put;
1987 
1988 	if (convert)
1989 		error = convert_lock(ls, lkb, &args);
1990 	else
1991 		error = request_lock(ls, lkb, name, namelen, &args);
1992 
1993 	if (error == -EINPROGRESS)
1994 		error = 0;
1995  out_put:
1996 	if (convert || error)
1997 		__put_lkb(ls, lkb);
1998 	if (error == -EAGAIN)
1999 		error = 0;
2000  out:
2001 	unlock_recovery(ls);
2002 	dlm_put_lockspace(ls);
2003 	return error;
2004 }
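
/* Example (not part of the original file): a minimal sketch of an
 * in-kernel dlm_lock() caller, assuming a lockspace handle obtained
 * from dlm_new_lockspace().  The resource name, mode and completion
 * scheme are illustrative only.  A zero return just means the request
 * was accepted; the final status arrives in lksb.sb_status when the
 * completion ast runs.
 *
 *	static struct dlm_lksb my_lksb;
 *	static DECLARE_COMPLETION(my_done);
 *
 *	static void my_ast(void *astarg)
 *	{
 *		complete(&my_done);
 *	}
 *
 *	static int grab_lock(dlm_lockspace_t *ls)
 *	{
 *		int error;
 *
 *		error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0,
 *				 "my_resource", 11, 0, my_ast, &my_lksb,
 *				 NULL);
 *		if (error)
 *			return error;
 *		wait_for_completion(&my_done);
 *		return my_lksb.sb_status;
 *	}
 */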
2005 
2006 int dlm_unlock(dlm_lockspace_t *lockspace,
2007 	       uint32_t lkid,
2008 	       uint32_t flags,
2009 	       struct dlm_lksb *lksb,
2010 	       void *astarg)
2011 {
2012 	struct dlm_ls *ls;
2013 	struct dlm_lkb *lkb;
2014 	struct dlm_args args;
2015 	int error;
2016 
2017 	ls = dlm_find_lockspace_local(lockspace);
2018 	if (!ls)
2019 		return -EINVAL;
2020 
2021 	lock_recovery(ls);
2022 
2023 	error = find_lkb(ls, lkid, &lkb);
2024 	if (error)
2025 		goto out;
2026 
2027 	error = set_unlock_args(flags, astarg, &args);
2028 	if (error)
2029 		goto out_put;
2030 
2031 	if (flags & DLM_LKF_CANCEL)
2032 		error = cancel_lock(ls, lkb, &args);
2033 	else
2034 		error = unlock_lock(ls, lkb, &args);
2035 
2036 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2037 		error = 0;
2038  out_put:
2039 	dlm_put_lkb(lkb);
2040  out:
2041 	unlock_recovery(ls);
2042 	dlm_put_lockspace(ls);
2043 	return error;
2044 }
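
/* Example (not part of the original file): the matching unlock for the
 * sketch after dlm_lock() above.  A successful dlm_unlock() is also
 * asynchronous: the completion ast fires with sb_status set to
 * -DLM_EUNLOCK (or -DLM_ECANCEL when DLM_LKF_CANCEL is used to cancel
 * an in-progress request or convert).
 *
 *	error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, &my_lksb);
 *	if (!error)
 *		wait_for_completion(&my_done);
 */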
2045 
2046 /*
2047  * send/receive routines for remote operations and replies
2048  *
2049  * send_args
2050  * send_common
2051  * send_request			receive_request
2052  * send_convert			receive_convert
2053  * send_unlock			receive_unlock
2054  * send_cancel			receive_cancel
2055  * send_grant			receive_grant
2056  * send_bast			receive_bast
2057  * send_lookup			receive_lookup
2058  * send_remove			receive_remove
2059  *
2060  * 				send_common_reply
2061  * receive_request_reply	send_request_reply
2062  * receive_convert_reply	send_convert_reply
2063  * receive_unlock_reply		send_unlock_reply
2064  * receive_cancel_reply		send_cancel_reply
2065  * receive_lookup_reply		send_lookup_reply
2066  */
2067 
2068 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2069 			  int to_nodeid, int mstype,
2070 			  struct dlm_message **ms_ret,
2071 			  struct dlm_mhandle **mh_ret)
2072 {
2073 	struct dlm_message *ms;
2074 	struct dlm_mhandle *mh;
2075 	char *mb;
2076 	int mb_len = sizeof(struct dlm_message);
2077 
2078 	switch (mstype) {
2079 	case DLM_MSG_REQUEST:
2080 	case DLM_MSG_LOOKUP:
2081 	case DLM_MSG_REMOVE:
2082 		mb_len += r->res_length;
2083 		break;
2084 	case DLM_MSG_CONVERT:
2085 	case DLM_MSG_UNLOCK:
2086 	case DLM_MSG_REQUEST_REPLY:
2087 	case DLM_MSG_CONVERT_REPLY:
2088 	case DLM_MSG_GRANT:
2089 		if (lkb && lkb->lkb_lvbptr)
2090 			mb_len += r->res_ls->ls_lvblen;
2091 		break;
2092 	}
2093 
2094 	/* get_buffer gives us a message handle (mh) that we need to
2095 	   pass into lowcomms_commit and a message buffer (mb) that we
2096 	   write our data into */
2097 
2098 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2099 	if (!mh)
2100 		return -ENOBUFS;
2101 
2102 	memset(mb, 0, mb_len);
2103 
2104 	ms = (struct dlm_message *) mb;
2105 
2106 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2107 	ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2108 	ms->m_header.h_nodeid = dlm_our_nodeid();
2109 	ms->m_header.h_length = mb_len;
2110 	ms->m_header.h_cmd = DLM_MSG;
2111 
2112 	ms->m_type = mstype;
2113 
2114 	*mh_ret = mh;
2115 	*ms_ret = ms;
2116 	return 0;
2117 }
2118 
2119 /* further lowcomms enhancements or alternate implementations may make
2120    the return value from this function useful at some point */
2121 
2122 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2123 {
2124 	dlm_message_out(ms);
2125 	dlm_lowcomms_commit_buffer(mh);
2126 	return 0;
2127 }
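
/* the typical send path, as used by send_common() and the one-off
 * senders below:
 *
 *	error = create_message(r, lkb, to_nodeid, type, &ms, &mh);
 *	if (error)
 *		return error;
 *	send_args(r, lkb, ms);
 *	... set any type-specific fields (m_result, m_bastmode, ...) ...
 *	error = send_message(mh, ms);
 */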
2128 
2129 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2130 		      struct dlm_message *ms)
2131 {
2132 	ms->m_nodeid   = lkb->lkb_nodeid;
2133 	ms->m_pid      = lkb->lkb_ownpid;
2134 	ms->m_lkid     = lkb->lkb_id;
2135 	ms->m_remid    = lkb->lkb_remid;
2136 	ms->m_exflags  = lkb->lkb_exflags;
2137 	ms->m_sbflags  = lkb->lkb_sbflags;
2138 	ms->m_flags    = lkb->lkb_flags;
2139 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2140 	ms->m_status   = lkb->lkb_status;
2141 	ms->m_grmode   = lkb->lkb_grmode;
2142 	ms->m_rqmode   = lkb->lkb_rqmode;
2143 	ms->m_hash     = r->res_hash;
2144 
2145 	/* m_result and m_bastmode are set from function args,
2146 	   not from lkb fields */
2147 
2148 	if (lkb->lkb_bastaddr)
2149 		ms->m_asts |= AST_BAST;
2150 	if (lkb->lkb_astaddr)
2151 		ms->m_asts |= AST_COMP;
2152 
2153 	/* compare with switch in create_message; send_remove() doesn't
2154 	   use send_args() */
2155 
2156 	switch (ms->m_type) {
2157 	case DLM_MSG_REQUEST:
2158 	case DLM_MSG_LOOKUP:
2159 		memcpy(ms->m_extra, r->res_name, r->res_length);
2160 		break;
2161 	case DLM_MSG_CONVERT:
2162 	case DLM_MSG_UNLOCK:
2163 	case DLM_MSG_REQUEST_REPLY:
2164 	case DLM_MSG_CONVERT_REPLY:
2165 	case DLM_MSG_GRANT:
2166 		if (!lkb->lkb_lvbptr)
2167 			break;
2168 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2169 		break;
2170 	}
2171 }
2172 
2173 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2174 {
2175 	struct dlm_message *ms;
2176 	struct dlm_mhandle *mh;
2177 	int to_nodeid, error;
2178 
2179 	add_to_waiters(lkb, mstype);
2180 
2181 	to_nodeid = r->res_nodeid;
2182 
2183 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2184 	if (error)
2185 		goto fail;
2186 
2187 	send_args(r, lkb, ms);
2188 
2189 	error = send_message(mh, ms);
2190 	if (error)
2191 		goto fail;
2192 	return 0;
2193 
2194  fail:
2195 	remove_from_waiters(lkb);
2196 	return error;
2197 }
2198 
2199 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2200 {
2201 	return send_common(r, lkb, DLM_MSG_REQUEST);
2202 }
2203 
2204 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2205 {
2206 	int error;
2207 
2208 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2209 
2210 	/* down conversions go without a reply from the master */
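	/* we fake the reply locally: fill the lockspace's stub message
	   with the result do_convert() on the master would produce (0)
	   and feed it to __receive_convert_reply() as if the master had
	   actually replied */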
2211 	if (!error && down_conversion(lkb)) {
2212 		remove_from_waiters(lkb);
2213 		r->res_ls->ls_stub_ms.m_result = 0;
2214 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2215 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2216 	}
2217 
2218 	return error;
2219 }
2220 
2221 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2222    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2223    that the master is still correct. */
2224 
2225 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2226 {
2227 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2228 }
2229 
2230 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2231 {
2232 	return send_common(r, lkb, DLM_MSG_CANCEL);
2233 }
2234 
2235 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2236 {
2237 	struct dlm_message *ms;
2238 	struct dlm_mhandle *mh;
2239 	int to_nodeid, error;
2240 
2241 	to_nodeid = lkb->lkb_nodeid;
2242 
2243 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2244 	if (error)
2245 		goto out;
2246 
2247 	send_args(r, lkb, ms);
2248 
2249 	ms->m_result = 0;
2250 
2251 	error = send_message(mh, ms);
2252  out:
2253 	return error;
2254 }
2255 
2256 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2257 {
2258 	struct dlm_message *ms;
2259 	struct dlm_mhandle *mh;
2260 	int to_nodeid, error;
2261 
2262 	to_nodeid = lkb->lkb_nodeid;
2263 
2264 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2265 	if (error)
2266 		goto out;
2267 
2268 	send_args(r, lkb, ms);
2269 
2270 	ms->m_bastmode = mode;
2271 
2272 	error = send_message(mh, ms);
2273  out:
2274 	return error;
2275 }
2276 
2277 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2278 {
2279 	struct dlm_message *ms;
2280 	struct dlm_mhandle *mh;
2281 	int to_nodeid, error;
2282 
2283 	add_to_waiters(lkb, DLM_MSG_LOOKUP);
2284 
2285 	to_nodeid = dlm_dir_nodeid(r);
2286 
2287 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2288 	if (error)
2289 		goto fail;
2290 
2291 	send_args(r, lkb, ms);
2292 
2293 	error = send_message(mh, ms);
2294 	if (error)
2295 		goto fail;
2296 	return 0;
2297 
2298  fail:
2299 	remove_from_waiters(lkb);
2300 	return error;
2301 }
2302 
2303 static int send_remove(struct dlm_rsb *r)
2304 {
2305 	struct dlm_message *ms;
2306 	struct dlm_mhandle *mh;
2307 	int to_nodeid, error;
2308 
2309 	to_nodeid = dlm_dir_nodeid(r);
2310 
2311 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2312 	if (error)
2313 		goto out;
2314 
2315 	memcpy(ms->m_extra, r->res_name, r->res_length);
2316 	ms->m_hash = r->res_hash;
2317 
2318 	error = send_message(mh, ms);
2319  out:
2320 	return error;
2321 }
2322 
2323 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2324 			     int mstype, int rv)
2325 {
2326 	struct dlm_message *ms;
2327 	struct dlm_mhandle *mh;
2328 	int to_nodeid, error;
2329 
2330 	to_nodeid = lkb->lkb_nodeid;
2331 
2332 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2333 	if (error)
2334 		goto out;
2335 
2336 	send_args(r, lkb, ms);
2337 
2338 	ms->m_result = rv;
2339 
2340 	error = send_message(mh, ms);
2341  out:
2342 	return error;
2343 }
2344 
2345 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2346 {
2347 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2348 }
2349 
2350 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2351 {
2352 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2353 }
2354 
2355 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2356 {
2357 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2358 }
2359 
2360 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2361 {
2362 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2363 }
2364 
2365 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2366 			     int ret_nodeid, int rv)
2367 {
2368 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2369 	struct dlm_message *ms;
2370 	struct dlm_mhandle *mh;
2371 	int error, nodeid = ms_in->m_header.h_nodeid;
2372 
2373 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2374 	if (error)
2375 		goto out;
2376 
2377 	ms->m_lkid = ms_in->m_lkid;
2378 	ms->m_result = rv;
2379 	ms->m_nodeid = ret_nodeid;
2380 
2381 	error = send_message(mh, ms);
2382  out:
2383 	return error;
2384 }
2385 
2386 /* which args we save from a received message depends heavily on the type
2387    of message, unlike the send side where we can safely send everything about
2388    the lkb for any type of message */
2389 
2390 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
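/* only the low 16 bits of lkb_flags are shared between nodes; the upper
   bits hold node-local state (e.g. DLM_IFL_MSTCPY, set in
   receive_request() and receive_rcom_lock_args()) so the merges below
   keep our high bits and take only the sender's low bits */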
2391 {
2392 	lkb->lkb_exflags = ms->m_exflags;
2393 	lkb->lkb_sbflags = ms->m_sbflags;
2394 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2395 		         (ms->m_flags & 0x0000FFFF);
2396 }
2397 
2398 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2399 {
2400 	lkb->lkb_sbflags = ms->m_sbflags;
2401 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2402 		         (ms->m_flags & 0x0000FFFF);
2403 }
2404 
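/* number of bytes in m_extra beyond the fixed-size message; e.g. for a
   DLM_MSG_REQUEST this is the resource name length that create_message()
   added to mb_len */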
2405 static int receive_extralen(struct dlm_message *ms)
2406 {
2407 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2408 }
2409 
2410 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2411 		       struct dlm_message *ms)
2412 {
2413 	int len;
2414 
2415 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2416 		if (!lkb->lkb_lvbptr)
2417 			lkb->lkb_lvbptr = allocate_lvb(ls);
2418 		if (!lkb->lkb_lvbptr)
2419 			return -ENOMEM;
2420 		len = receive_extralen(ms);
2421 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2422 	}
2423 	return 0;
2424 }
2425 
2426 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2427 				struct dlm_message *ms)
2428 {
2429 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2430 	lkb->lkb_ownpid = ms->m_pid;
2431 	lkb->lkb_remid = ms->m_lkid;
2432 	lkb->lkb_grmode = DLM_LOCK_IV;
2433 	lkb->lkb_rqmode = ms->m_rqmode;
2434 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2435 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2436 
2437 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2438 
2439 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2440 		/* lkb was just created so there won't be an lvb yet */
2441 		lkb->lkb_lvbptr = allocate_lvb(ls);
2442 		if (!lkb->lkb_lvbptr)
2443 			return -ENOMEM;
2444 	}
2445 
2446 	return 0;
2447 }
2448 
2449 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2450 				struct dlm_message *ms)
2451 {
2452 	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2453 		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2454 			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2455 			  lkb->lkb_id, lkb->lkb_remid);
2456 		return -EINVAL;
2457 	}
2458 
2459 	if (!is_master_copy(lkb))
2460 		return -EINVAL;
2461 
2462 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2463 		return -EBUSY;
2464 
2465 	if (receive_lvb(ls, lkb, ms))
2466 		return -ENOMEM;
2467 
2468 	lkb->lkb_rqmode = ms->m_rqmode;
2469 	lkb->lkb_lvbseq = ms->m_lvbseq;
2470 
2471 	return 0;
2472 }
2473 
2474 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2475 			       struct dlm_message *ms)
2476 {
2477 	if (!is_master_copy(lkb))
2478 		return -EINVAL;
2479 	if (receive_lvb(ls, lkb, ms))
2480 		return -ENOMEM;
2481 	return 0;
2482 }
2483 
2484 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2485    uses to send a reply and that the remote end uses to process the reply. */
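/* (used by the receive_xxxx() fail paths below, where no usable lkb
   exists but an error reply must still reach the sender) */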
2486 
2487 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2488 {
2489 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2490 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2491 	lkb->lkb_remid = ms->m_lkid;
2492 }
2493 
2494 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2495 {
2496 	struct dlm_lkb *lkb;
2497 	struct dlm_rsb *r;
2498 	int error, namelen;
2499 
2500 	error = create_lkb(ls, &lkb);
2501 	if (error)
2502 		goto fail;
2503 
2504 	receive_flags(lkb, ms);
2505 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
2506 	error = receive_request_args(ls, lkb, ms);
2507 	if (error) {
2508 		__put_lkb(ls, lkb);
2509 		goto fail;
2510 	}
2511 
2512 	namelen = receive_extralen(ms);
2513 
2514 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2515 	if (error) {
2516 		__put_lkb(ls, lkb);
2517 		goto fail;
2518 	}
2519 
2520 	lock_rsb(r);
2521 
2522 	attach_lkb(r, lkb);
2523 	error = do_request(r, lkb);
2524 	send_request_reply(r, lkb, error);
2525 
2526 	unlock_rsb(r);
2527 	put_rsb(r);
2528 
2529 	if (error == -EINPROGRESS)
2530 		error = 0;
2531 	if (error)
2532 		dlm_put_lkb(lkb);
2533 	return;
2534 
2535  fail:
2536 	setup_stub_lkb(ls, ms);
2537 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2538 }
2539 
2540 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2541 {
2542 	struct dlm_lkb *lkb;
2543 	struct dlm_rsb *r;
2544 	int error, reply = 1;
2545 
2546 	error = find_lkb(ls, ms->m_remid, &lkb);
2547 	if (error)
2548 		goto fail;
2549 
2550 	r = lkb->lkb_resource;
2551 
2552 	hold_rsb(r);
2553 	lock_rsb(r);
2554 
2555 	receive_flags(lkb, ms);
2556 	error = receive_convert_args(ls, lkb, ms);
2557 	if (error)
2558 		goto out;
2559 	reply = !down_conversion(lkb);
2560 
2561 	error = do_convert(r, lkb);
2562  out:
2563 	if (reply)
2564 		send_convert_reply(r, lkb, error);
2565 
2566 	unlock_rsb(r);
2567 	put_rsb(r);
2568 	dlm_put_lkb(lkb);
2569 	return;
2570 
2571  fail:
2572 	setup_stub_lkb(ls, ms);
2573 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2574 }
2575 
2576 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2577 {
2578 	struct dlm_lkb *lkb;
2579 	struct dlm_rsb *r;
2580 	int error;
2581 
2582 	error = find_lkb(ls, ms->m_remid, &lkb);
2583 	if (error)
2584 		goto fail;
2585 
2586 	r = lkb->lkb_resource;
2587 
2588 	hold_rsb(r);
2589 	lock_rsb(r);
2590 
2591 	receive_flags(lkb, ms);
2592 	error = receive_unlock_args(ls, lkb, ms);
2593 	if (error)
2594 		goto out;
2595 
2596 	error = do_unlock(r, lkb);
2597  out:
2598 	send_unlock_reply(r, lkb, error);
2599 
2600 	unlock_rsb(r);
2601 	put_rsb(r);
2602 	dlm_put_lkb(lkb);
2603 	return;
2604 
2605  fail:
2606 	setup_stub_lkb(ls, ms);
2607 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2608 }
2609 
2610 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2611 {
2612 	struct dlm_lkb *lkb;
2613 	struct dlm_rsb *r;
2614 	int error;
2615 
2616 	error = find_lkb(ls, ms->m_remid, &lkb);
2617 	if (error)
2618 		goto fail;
2619 
2620 	receive_flags(lkb, ms);
2621 
2622 	r = lkb->lkb_resource;
2623 
2624 	hold_rsb(r);
2625 	lock_rsb(r);
2626 
2627 	error = do_cancel(r, lkb);
2628 	send_cancel_reply(r, lkb, error);
2629 
2630 	unlock_rsb(r);
2631 	put_rsb(r);
2632 	dlm_put_lkb(lkb);
2633 	return;
2634 
2635  fail:
2636 	setup_stub_lkb(ls, ms);
2637 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2638 }
2639 
2640 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2641 {
2642 	struct dlm_lkb *lkb;
2643 	struct dlm_rsb *r;
2644 	int error;
2645 
2646 	error = find_lkb(ls, ms->m_remid, &lkb);
2647 	if (error) {
2648 		log_error(ls, "receive_grant no lkb");
2649 		return;
2650 	}
2651 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2652 
2653 	r = lkb->lkb_resource;
2654 
2655 	hold_rsb(r);
2656 	lock_rsb(r);
2657 
2658 	receive_flags_reply(lkb, ms);
2659 	grant_lock_pc(r, lkb, ms);
2660 	queue_cast(r, lkb, 0);
2661 
2662 	unlock_rsb(r);
2663 	put_rsb(r);
2664 	dlm_put_lkb(lkb);
2665 }
2666 
2667 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2668 {
2669 	struct dlm_lkb *lkb;
2670 	struct dlm_rsb *r;
2671 	int error;
2672 
2673 	error = find_lkb(ls, ms->m_remid, &lkb);
2674 	if (error) {
2675 		log_error(ls, "receive_bast no lkb");
2676 		return;
2677 	}
2678 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2679 
2680 	r = lkb->lkb_resource;
2681 
2682 	hold_rsb(r);
2683 	lock_rsb(r);
2684 
2685 	queue_bast(r, lkb, ms->m_bastmode);
2686 
2687 	unlock_rsb(r);
2688 	put_rsb(r);
2689 	dlm_put_lkb(lkb);
2690 }
2691 
2692 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2693 {
2694 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2695 
2696 	from_nodeid = ms->m_header.h_nodeid;
2697 	our_nodeid = dlm_our_nodeid();
2698 
2699 	len = receive_extralen(ms);
2700 
2701 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2702 	if (dir_nodeid != our_nodeid) {
2703 		log_error(ls, "lookup dir_nodeid %d from %d",
2704 			  dir_nodeid, from_nodeid);
2705 		error = -EINVAL;
2706 		ret_nodeid = -1;
2707 		goto out;
2708 	}
2709 
2710 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2711 
2712 	/* Optimization: we're master so treat lookup as a request */
2713 	if (!error && ret_nodeid == our_nodeid) {
2714 		receive_request(ls, ms);
2715 		return;
2716 	}
2717  out:
2718 	send_lookup_reply(ls, ms, ret_nodeid, error);
2719 }
2720 
2721 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2722 {
2723 	int len, dir_nodeid, from_nodeid;
2724 
2725 	from_nodeid = ms->m_header.h_nodeid;
2726 
2727 	len = receive_extralen(ms);
2728 
2729 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2730 	if (dir_nodeid != dlm_our_nodeid()) {
2731 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
2732 			  dir_nodeid, from_nodeid);
2733 		return;
2734 	}
2735 
2736 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2737 }
2738 
2739 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2740 {
2741 	struct dlm_lkb *lkb;
2742 	struct dlm_rsb *r;
2743 	int error, mstype;
2744 
2745 	error = find_lkb(ls, ms->m_remid, &lkb);
2746 	if (error) {
2747 		log_error(ls, "receive_request_reply no lkb");
2748 		return;
2749 	}
2750 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2751 
2752 	mstype = lkb->lkb_wait_type;
2753 	error = remove_from_waiters(lkb);
2754 	if (error) {
2755 		log_error(ls, "receive_request_reply not on waiters");
2756 		goto out;
2757 	}
2758 
2759 	/* this is the value returned from do_request() on the master */
2760 	error = ms->m_result;
2761 
2762 	r = lkb->lkb_resource;
2763 	hold_rsb(r);
2764 	lock_rsb(r);
2765 
2766 	/* Optimization: the dir node was also the master, so it took our
2767 	   lookup as a request and sent request reply instead of lookup reply */
2768 	if (mstype == DLM_MSG_LOOKUP) {
2769 		r->res_nodeid = ms->m_header.h_nodeid;
2770 		lkb->lkb_nodeid = r->res_nodeid;
2771 	}
2772 
2773 	switch (error) {
2774 	case -EAGAIN:
2775 		/* request would block (be queued) on remote master;
2776 		   the unhold undoes the original ref from create_lkb()
2777 		   so it leads to the lkb being freed */
2778 		queue_cast(r, lkb, -EAGAIN);
2779 		confirm_master(r, -EAGAIN);
2780 		unhold_lkb(lkb);
2781 		break;
2782 
2783 	case -EINPROGRESS:
2784 	case 0:
2785 		/* request was queued or granted on remote master */
2786 		receive_flags_reply(lkb, ms);
2787 		lkb->lkb_remid = ms->m_lkid;
2788 		if (error)
2789 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
2790 		else {
2791 			grant_lock_pc(r, lkb, ms);
2792 			queue_cast(r, lkb, 0);
2793 		}
2794 		confirm_master(r, error);
2795 		break;
2796 
2797 	case -EBADR:
2798 	case -ENOTBLK:
2799 		/* find_rsb failed to find rsb or rsb wasn't master */
2800 		r->res_nodeid = -1;
2801 		lkb->lkb_nodeid = -1;
2802 		_request_lock(r, lkb);
2803 		break;
2804 
2805 	default:
2806 		log_error(ls, "receive_request_reply error %d", error);
2807 	}
2808 
2809 	unlock_rsb(r);
2810 	put_rsb(r);
2811  out:
2812 	dlm_put_lkb(lkb);
2813 }
2814 
2815 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2816 				    struct dlm_message *ms)
2817 {
2818 	int error = ms->m_result;
2819 
2820 	/* this is the value returned from do_convert() on the master */
2821 
2822 	switch (error) {
2823 	case -EAGAIN:
2824 		/* convert would block (be queued) on remote master */
2825 		queue_cast(r, lkb, -EAGAIN);
2826 		break;
2827 
2828 	case -EINPROGRESS:
2829 		/* convert was queued on remote master */
2830 		del_lkb(r, lkb);
2831 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2832 		break;
2833 
2834 	case 0:
2835 		/* convert was granted on remote master */
2836 		receive_flags_reply(lkb, ms);
2837 		grant_lock_pc(r, lkb, ms);
2838 		queue_cast(r, lkb, 0);
2839 		break;
2840 
2841 	default:
2842 		log_error(r->res_ls, "receive_convert_reply error %d", error);
2843 	}
2844 }
2845 
2846 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2847 {
2848 	struct dlm_rsb *r = lkb->lkb_resource;
2849 
2850 	hold_rsb(r);
2851 	lock_rsb(r);
2852 
2853 	__receive_convert_reply(r, lkb, ms);
2854 
2855 	unlock_rsb(r);
2856 	put_rsb(r);
2857 }
2858 
2859 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2860 {
2861 	struct dlm_lkb *lkb;
2862 	int error;
2863 
2864 	error = find_lkb(ls, ms->m_remid, &lkb);
2865 	if (error) {
2866 		log_error(ls, "receive_convert_reply no lkb");
2867 		return;
2868 	}
2869 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2870 
2871 	error = remove_from_waiters(lkb);
2872 	if (error) {
2873 		log_error(ls, "receive_convert_reply not on waiters");
2874 		goto out;
2875 	}
2876 
2877 	_receive_convert_reply(lkb, ms);
2878  out:
2879 	dlm_put_lkb(lkb);
2880 }
2881 
2882 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2883 {
2884 	struct dlm_rsb *r = lkb->lkb_resource;
2885 	int error = ms->m_result;
2886 
2887 	hold_rsb(r);
2888 	lock_rsb(r);
2889 
2890 	/* this is the value returned from do_unlock() on the master */
2891 
2892 	switch (error) {
2893 	case -DLM_EUNLOCK:
2894 		receive_flags_reply(lkb, ms);
2895 		remove_lock_pc(r, lkb);
2896 		queue_cast(r, lkb, -DLM_EUNLOCK);
2897 		break;
2898 	default:
2899 		log_error(r->res_ls, "receive_unlock_reply error %d", error);
2900 	}
2901 
2902 	unlock_rsb(r);
2903 	put_rsb(r);
2904 }
2905 
2906 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2907 {
2908 	struct dlm_lkb *lkb;
2909 	int error;
2910 
2911 	error = find_lkb(ls, ms->m_remid, &lkb);
2912 	if (error) {
2913 		log_error(ls, "receive_unlock_reply no lkb");
2914 		return;
2915 	}
2916 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2917 
2918 	error = remove_from_waiters(lkb);
2919 	if (error) {
2920 		log_error(ls, "receive_unlock_reply not on waiters");
2921 		goto out;
2922 	}
2923 
2924 	_receive_unlock_reply(lkb, ms);
2925  out:
2926 	dlm_put_lkb(lkb);
2927 }
2928 
2929 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2930 {
2931 	struct dlm_rsb *r = lkb->lkb_resource;
2932 	int error = ms->m_result;
2933 
2934 	hold_rsb(r);
2935 	lock_rsb(r);
2936 
2937 	/* this is the value returned from do_cancel() on the master */
2938 
2939 	switch (error) {
2940 	case -DLM_ECANCEL:
2941 		receive_flags_reply(lkb, ms);
2942 		revert_lock_pc(r, lkb);
2943 		queue_cast(r, lkb, -DLM_ECANCEL);
2944 		break;
2945 	default:
2946 		log_error(r->res_ls, "receive_cancel_reply error %d", error);
2947 	}
2948 
2949 	unlock_rsb(r);
2950 	put_rsb(r);
2951 }
2952 
2953 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2954 {
2955 	struct dlm_lkb *lkb;
2956 	int error;
2957 
2958 	error = find_lkb(ls, ms->m_remid, &lkb);
2959 	if (error) {
2960 		log_error(ls, "receive_cancel_reply no lkb");
2961 		return;
2962 	}
2963 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2964 
2965 	error = remove_from_waiters(lkb);
2966 	if (error) {
2967 		log_error(ls, "receive_cancel_reply not on waiters");
2968 		goto out;
2969 	}
2970 
2971 	_receive_cancel_reply(lkb, ms);
2972  out:
2973 	dlm_put_lkb(lkb);
2974 }
2975 
2976 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2977 {
2978 	struct dlm_lkb *lkb;
2979 	struct dlm_rsb *r;
2980 	int error, ret_nodeid;
2981 
2982 	error = find_lkb(ls, ms->m_lkid, &lkb);
2983 	if (error) {
2984 		log_error(ls, "receive_lookup_reply no lkb");
2985 		return;
2986 	}
2987 
2988 	error = remove_from_waiters(lkb);
2989 	if (error) {
2990 		log_error(ls, "receive_lookup_reply not on waiters");
2991 		goto out;
2992 	}
2993 
2994 	/* this is the value returned by dlm_dir_lookup on dir node
2995 	   FIXME: will a non-zero error ever be returned? */
2996 	error = ms->m_result;
2997 
2998 	r = lkb->lkb_resource;
2999 	hold_rsb(r);
3000 	lock_rsb(r);
3001 
3002 	ret_nodeid = ms->m_nodeid;
3003 	if (ret_nodeid == dlm_our_nodeid()) {
3004 		r->res_nodeid = 0;
3005 		ret_nodeid = 0;
3006 		r->res_first_lkid = 0;
3007 	} else {
3008 		/* set_master() will copy res_nodeid to lkb_nodeid */
3009 		r->res_nodeid = ret_nodeid;
3010 	}
3011 
3012 	_request_lock(r, lkb);
3013 
3014 	if (!ret_nodeid)
3015 		process_lookup_list(r);
3016 
3017 	unlock_rsb(r);
3018 	put_rsb(r);
3019  out:
3020 	dlm_put_lkb(lkb);
3021 }
3022 
3023 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3024 {
3025 	struct dlm_message *ms = (struct dlm_message *) hd;
3026 	struct dlm_ls *ls;
3027 	int error = 0;
3028 
3029 	if (!recovery)
3030 		dlm_message_in(ms);
3031 
3032 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3033 	if (!ls) {
3034 		log_print("drop message %d from %d for unknown lockspace %d",
3035 			  ms->m_type, nodeid, hd->h_lockspace);
3036 		return -EINVAL;
3037 	}
3038 
3039 	/* recovery may have just ended leaving a bunch of backed-up requests
3040 	   in the requestqueue; wait while dlm_recoverd clears them */
3041 
3042 	if (!recovery)
3043 		dlm_wait_requestqueue(ls);
3044 
3045 	/* recovery may have just started while there were a bunch of
3046 	   in-flight requests -- save them in requestqueue to be processed
3047 	   after recovery.  we can't let dlm_recvd block on the recovery
3048 	   lock.  if dlm_recoverd is calling this function to clear the
3049 	   requestqueue, it needs to be interrupted (-EINTR) if another
3050 	   recovery operation is starting. */
3051 
3052 	while (1) {
3053 		if (dlm_locking_stopped(ls)) {
3054 			if (recovery) {
3055 				error = -EINTR;
3056 				goto out;
3057 			}
3058 			error = dlm_add_requestqueue(ls, nodeid, hd);
3059 			if (error == -EAGAIN)
3060 				continue;
3062 			error = -EINTR;
3063 			goto out;
3065 		}
3066 
3067 		if (lock_recovery_try(ls))
3068 			break;
3069 		schedule();
3070 	}
3071 
3072 	switch (ms->m_type) {
3073 
3074 	/* messages sent to a master node */
3075 
3076 	case DLM_MSG_REQUEST:
3077 		receive_request(ls, ms);
3078 		break;
3079 
3080 	case DLM_MSG_CONVERT:
3081 		receive_convert(ls, ms);
3082 		break;
3083 
3084 	case DLM_MSG_UNLOCK:
3085 		receive_unlock(ls, ms);
3086 		break;
3087 
3088 	case DLM_MSG_CANCEL:
3089 		receive_cancel(ls, ms);
3090 		break;
3091 
3092 	/* messages sent from a master node (replies to above) */
3093 
3094 	case DLM_MSG_REQUEST_REPLY:
3095 		receive_request_reply(ls, ms);
3096 		break;
3097 
3098 	case DLM_MSG_CONVERT_REPLY:
3099 		receive_convert_reply(ls, ms);
3100 		break;
3101 
3102 	case DLM_MSG_UNLOCK_REPLY:
3103 		receive_unlock_reply(ls, ms);
3104 		break;
3105 
3106 	case DLM_MSG_CANCEL_REPLY:
3107 		receive_cancel_reply(ls, ms);
3108 		break;
3109 
3110 	/* messages sent from a master node (only two types of async msg) */
3111 
3112 	case DLM_MSG_GRANT:
3113 		receive_grant(ls, ms);
3114 		break;
3115 
3116 	case DLM_MSG_BAST:
3117 		receive_bast(ls, ms);
3118 		break;
3119 
3120 	/* messages sent to a dir node */
3121 
3122 	case DLM_MSG_LOOKUP:
3123 		receive_lookup(ls, ms);
3124 		break;
3125 
3126 	case DLM_MSG_REMOVE:
3127 		receive_remove(ls, ms);
3128 		break;
3129 
3130 	/* messages sent from a dir node (remove has no reply) */
3131 
3132 	case DLM_MSG_LOOKUP_REPLY:
3133 		receive_lookup_reply(ls, ms);
3134 		break;
3135 
3136 	default:
3137 		log_error(ls, "unknown message type %d", ms->m_type);
3138 	}
3139 
3140 	unlock_recovery(ls);
3141  out:
3142 	dlm_put_lockspace(ls);
3143 	dlm_astd_wake();
3144 	return error;
3145 }
3146 
3147 
3148 /*
3149  * Recovery related
3150  */
3151 
3152 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3153 {
3154 	if (middle_conversion(lkb)) {
3155 		hold_lkb(lkb);
3156 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3157 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3158 		_remove_from_waiters(lkb);
3159 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3160 
3161 		/* Same special case as in receive_rcom_lock_args() */
3162 		lkb->lkb_grmode = DLM_LOCK_IV;
3163 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3164 		unhold_lkb(lkb);
3165 
3166 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3167 		lkb->lkb_flags |= DLM_IFL_RESEND;
3168 	}
3169 
3170 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3171 	   conversions are async; there's no reply from the remote master */
3172 }
3173 
3174 /* A waiting lkb needs recovery if the master node has failed, or
3175    the master node is changing (only when no directory is used) */
3176 
3177 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3178 {
3179 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3180 		return 1;
3181 
3182 	if (!dlm_no_directory(ls))
3183 		return 0;
3184 
3185 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3186 		return 1;
3187 
3188 	return 0;
3189 }
3190 
3191 /* Recovery for locks that are waiting for replies from nodes that are now
3192    gone.  We can just complete unlocks and cancels by faking a reply from the
3193    dead node.  Requests and up-conversions we flag to be resent after
3194    recovery.  Down-conversions can just be completed with a fake reply like
3195    unlocks.  Conversions between PR and CW need special attention. */
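
/* for example, an lkb waiting on an unlock reply from a failed node is
   completed below by filling ls_stub_ms with m_result = -DLM_EUNLOCK and
   handing it to _receive_unlock_reply(), as if the dead master had
   replied */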
3196 
3197 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3198 {
3199 	struct dlm_lkb *lkb, *safe;
3200 
3201 	mutex_lock(&ls->ls_waiters_mutex);
3202 
3203 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3204 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3205 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3206 
3207 		/* all outstanding lookups, regardless of destination, will be
3208 		   resent after recovery is done */
3209 
3210 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3211 			lkb->lkb_flags |= DLM_IFL_RESEND;
3212 			continue;
3213 		}
3214 
3215 		if (!waiter_needs_recovery(ls, lkb))
3216 			continue;
3217 
3218 		switch (lkb->lkb_wait_type) {
3219 
3220 		case DLM_MSG_REQUEST:
3221 			lkb->lkb_flags |= DLM_IFL_RESEND;
3222 			break;
3223 
3224 		case DLM_MSG_CONVERT:
3225 			recover_convert_waiter(ls, lkb);
3226 			break;
3227 
3228 		case DLM_MSG_UNLOCK:
3229 			hold_lkb(lkb);
3230 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3231 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3232 			_remove_from_waiters(lkb);
3233 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3234 			dlm_put_lkb(lkb);
3235 			break;
3236 
3237 		case DLM_MSG_CANCEL:
3238 			hold_lkb(lkb);
3239 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3240 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3241 			_remove_from_waiters(lkb);
3242 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3243 			dlm_put_lkb(lkb);
3244 			break;
3245 
3246 		default:
3247 			log_error(ls, "invalid lkb wait_type %d",
3248 				  lkb->lkb_wait_type);
3249 		}
3250 		schedule();
3251 	}
3252 	mutex_unlock(&ls->ls_waiters_mutex);
3253 }
3254 
3255 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3256 {
3257 	struct dlm_lkb *lkb;
3258 	int rv = 0;
3259 
3260 	mutex_lock(&ls->ls_waiters_mutex);
3261 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3262 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3263 			rv = lkb->lkb_wait_type;
3264 			_remove_from_waiters(lkb);
3265 			lkb->lkb_flags &= ~DLM_IFL_RESEND;
3266 			break;
3267 		}
3268 	}
3269 	mutex_unlock(&ls->ls_waiters_mutex);
3270 
3271 	if (!rv)
3272 		lkb = NULL;
3273 	*lkb_ret = lkb;
3274 	return rv;
3275 }
3276 
3277 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3278    master or dir-node for r.  Processing the lkb may result in it being placed
3279    back on waiters. */
3280 
3281 int dlm_recover_waiters_post(struct dlm_ls *ls)
3282 {
3283 	struct dlm_lkb *lkb;
3284 	struct dlm_rsb *r;
3285 	int error = 0, mstype;
3286 
3287 	while (1) {
3288 		if (dlm_locking_stopped(ls)) {
3289 			log_debug(ls, "recover_waiters_post aborted");
3290 			error = -EINTR;
3291 			break;
3292 		}
3293 
3294 		mstype = remove_resend_waiter(ls, &lkb);
3295 		if (!mstype)
3296 			break;
3297 
3298 		r = lkb->lkb_resource;
3299 
3300 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3301 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3302 
3303 		switch (mstype) {
3304 
3305 		case DLM_MSG_LOOKUP:
3315 		case DLM_MSG_REQUEST:
3316 			hold_rsb(r);
3317 			lock_rsb(r);
3318 			_request_lock(r, lkb);
3319 			if (is_master(r))
3320 				confirm_master(r, 0);
3321 			unlock_rsb(r);
3322 			put_rsb(r);
3323 			break;
3324 
3325 		case DLM_MSG_CONVERT:
3326 			hold_rsb(r);
3327 			lock_rsb(r);
3328 			_convert_lock(r, lkb);
3329 			unlock_rsb(r);
3330 			put_rsb(r);
3331 			break;
3332 
3333 		default:
3334 			log_error(ls, "recover_waiters_post type %d", mstype);
3335 		}
3336 	}
3337 
3338 	return error;
3339 }
3340 
3341 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3342 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3343 {
3344 	struct dlm_ls *ls = r->res_ls;
3345 	struct dlm_lkb *lkb, *safe;
3346 
3347 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3348 		if (test(ls, lkb)) {
3349 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3350 			del_lkb(r, lkb);
3351 			/* this put should free the lkb */
3352 			if (!dlm_put_lkb(lkb))
3353 				log_error(ls, "purged lkb not released");
3354 		}
3355 	}
3356 }
3357 
3358 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3359 {
3360 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3361 }
3362 
3363 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3364 {
3365 	return is_master_copy(lkb);
3366 }
3367 
3368 static void purge_dead_locks(struct dlm_rsb *r)
3369 {
3370 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3371 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3372 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3373 }
3374 
3375 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3376 {
3377 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3378 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3379 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3380 }
3381 
3382 /* Get rid of locks held by nodes that are gone. */
3383 
3384 int dlm_purge_locks(struct dlm_ls *ls)
3385 {
3386 	struct dlm_rsb *r;
3387 
3388 	log_debug(ls, "dlm_purge_locks");
3389 
3390 	down_write(&ls->ls_root_sem);
3391 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3392 		hold_rsb(r);
3393 		lock_rsb(r);
3394 		if (is_master(r))
3395 			purge_dead_locks(r);
3396 		unlock_rsb(r);
3397 		unhold_rsb(r);
3398 
3399 		schedule();
3400 	}
3401 	up_write(&ls->ls_root_sem);
3402 
3403 	return 0;
3404 }
3405 
3406 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3407 {
3408 	struct dlm_rsb *r, *r_ret = NULL;
3409 
3410 	read_lock(&ls->ls_rsbtbl[bucket].lock);
3411 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3412 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
3413 			continue;
3414 		hold_rsb(r);
3415 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3416 		r_ret = r;
3417 		break;
3418 	}
3419 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
3420 	return r_ret;
3421 }
3422 
3423 void dlm_grant_after_purge(struct dlm_ls *ls)
3424 {
3425 	struct dlm_rsb *r;
3426 	int bucket = 0;
3427 
3428 	while (1) {
3429 		r = find_purged_rsb(ls, bucket);
3430 		if (!r) {
3431 			if (bucket == ls->ls_rsbtbl_size - 1)
3432 				break;
3433 			bucket++;
3434 			continue;
3435 		}
3436 		lock_rsb(r);
3437 		if (is_master(r)) {
3438 			grant_pending_locks(r);
3439 			confirm_master(r, 0);
3440 		}
3441 		unlock_rsb(r);
3442 		put_rsb(r);
3443 		schedule();
3444 	}
3445 }
3446 
3447 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3448 					 uint32_t remid)
3449 {
3450 	struct dlm_lkb *lkb;
3451 
3452 	list_for_each_entry(lkb, head, lkb_statequeue) {
3453 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3454 			return lkb;
3455 	}
3456 	return NULL;
3457 }
3458 
3459 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3460 				    uint32_t remid)
3461 {
3462 	struct dlm_lkb *lkb;
3463 
3464 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3465 	if (lkb)
3466 		return lkb;
3467 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3468 	if (lkb)
3469 		return lkb;
3470 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3471 	if (lkb)
3472 		return lkb;
3473 	return NULL;
3474 }
3475 
3476 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3477 				  struct dlm_rsb *r, struct dlm_rcom *rc)
3478 {
3479 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3480 	int lvblen;
3481 
3482 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3483 	lkb->lkb_ownpid = rl->rl_ownpid;
3484 	lkb->lkb_remid = rl->rl_lkid;
3485 	lkb->lkb_exflags = rl->rl_exflags;
3486 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3487 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3488 	lkb->lkb_lvbseq = rl->rl_lvbseq;
3489 	lkb->lkb_rqmode = rl->rl_rqmode;
3490 	lkb->lkb_grmode = rl->rl_grmode;
3491 	/* don't set lkb_status because add_lkb wants to set it itself */
3492 
3493 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3494 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3495 
3496 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3497 		lkb->lkb_lvbptr = allocate_lvb(ls);
3498 		if (!lkb->lkb_lvbptr)
3499 			return -ENOMEM;
3500 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3501 			 sizeof(struct rcom_lock);
3502 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3503 	}
3504 
3505 	/* Conversions between PR and CW (middle modes) need special handling.
3506 	   The real granted mode of these converting locks cannot be determined
3507 	   until all locks have been rebuilt on the rsb (recover_conversion) */
3508 
3509 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3510 		rl->rl_status = DLM_LKSTS_CONVERT;
3511 		lkb->lkb_grmode = DLM_LOCK_IV;
3512 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
3513 	}
3514 
3515 	return 0;
3516 }
3517 
3518 /* This lkb may have been recovered in a previous aborted recovery so we need
3519    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3520    If so we just send back a standard reply.  If not, we create a new lkb with
3521    the given values and send back our lkid.  We send back our lkid by sending
3522    back the rcom_lock struct we got but with the remid field filled in. */
3523 
3524 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3525 {
3526 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3527 	struct dlm_rsb *r;
3528 	struct dlm_lkb *lkb;
3529 	int error;
3530 
3531 	if (rl->rl_parent_lkid) {
3532 		error = -EOPNOTSUPP;
3533 		goto out;
3534 	}
3535 
3536 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3537 	if (error)
3538 		goto out;
3539 
3540 	lock_rsb(r);
3541 
3542 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3543 	if (lkb) {
3544 		error = -EEXIST;
3545 		goto out_remid;
3546 	}
3547 
3548 	error = create_lkb(ls, &lkb);
3549 	if (error)
3550 		goto out_unlock;
3551 
3552 	error = receive_rcom_lock_args(ls, lkb, r, rc);
3553 	if (error) {
3554 		__put_lkb(ls, lkb);
3555 		goto out_unlock;
3556 	}
3557 
3558 	attach_lkb(r, lkb);
3559 	add_lkb(r, lkb, rl->rl_status);
3560 	error = 0;
3561 
3562  out_remid:
3563 	/* this is the new value returned to the lock holder for
3564 	   saving in its process-copy lkb */
3565 	rl->rl_remid = lkb->lkb_id;
3566 
3567  out_unlock:
3568 	unlock_rsb(r);
3569 	put_rsb(r);
3570  out:
3571 	if (error)
3572 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3573 	rl->rl_result = error;
3574 	return error;
3575 }
3576 
3577 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3578 {
3579 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3580 	struct dlm_rsb *r;
3581 	struct dlm_lkb *lkb;
3582 	int error;
3583 
3584 	error = find_lkb(ls, rl->rl_lkid, &lkb);
3585 	if (error) {
3586 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3587 		return error;
3588 	}
3589 
3590 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3591 
3592 	error = rl->rl_result;
3593 
3594 	r = lkb->lkb_resource;
3595 	hold_rsb(r);
3596 	lock_rsb(r);
3597 
3598 	switch (error) {
3599 	case -EBADR:
3600 		/* There's a chance the new master received our lock before
3601 		   dlm_recover_master_reply(); this wouldn't happen if we did
3602 		   a barrier between recover_masters and recover_locks. */
3603 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3604 			  (unsigned long)r, r->res_name);
3605 		dlm_send_rcom_lock(r, lkb);
3606 		goto out;
3607 	case -EEXIST:
3608 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
3609 		/* fall through */
3610 	case 0:
3611 		lkb->lkb_remid = rl->rl_remid;
3612 		break;
3613 	default:
3614 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3615 			  error, lkb->lkb_id);
3616 	}
3617 
3618 	/* an ack for dlm_recover_locks() which waits for replies from
3619 	   all the locks it sends to new masters */
3620 	dlm_recovered_lock(r);
3621  out:
3622 	unlock_rsb(r);
3623 	put_rsb(r);
3624 	dlm_put_lkb(lkb);
3625 
3626 	return 0;
3627 }
3628 
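/* The dlm_user_* functions below back the userspace lock interface,
   driven by writes to the dlm character device (see user.c).  Passing
   DLM_FAKE_USER_AST makes the ast machinery queue results to the
   process (see dlm_user_add_ast()) instead of calling a kernel ast
   function. */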
3629 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3630 		     int mode, uint32_t flags, void *name, unsigned int namelen,
3631 		     uint32_t parent_lkid)
3632 {
3633 	struct dlm_lkb *lkb;
3634 	struct dlm_args args;
3635 	int error;
3636 
3637 	lock_recovery(ls);
3638 
3639 	error = create_lkb(ls, &lkb);
3640 	if (error) {
3641 		kfree(ua);
3642 		goto out;
3643 	}
3644 
3645 	if (flags & DLM_LKF_VALBLK) {
3646 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3647 		if (!ua->lksb.sb_lvbptr) {
3648 			kfree(ua);
3649 			__put_lkb(ls, lkb);
3650 			error = -ENOMEM;
3651 			goto out;
3652 		}
3653 	}
3654 
3655 	/* After ua is attached to lkb it will be freed by free_lkb().
3656 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
3657 	   lock and that lkb_astparam is the dlm_user_args structure. */
3658 
3659 	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3660 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3661 	lkb->lkb_flags |= DLM_IFL_USER;
3662 	ua->old_mode = DLM_LOCK_IV;
3663 
3664 	if (error) {
3665 		__put_lkb(ls, lkb);
3666 		goto out;
3667 	}
3668 
3669 	error = request_lock(ls, lkb, name, namelen, &args);
3670 
3671 	switch (error) {
3672 	case 0:
3673 		break;
3674 	case -EINPROGRESS:
3675 		error = 0;
3676 		break;
3677 	case -EAGAIN:
3678 		error = 0;
3679 		/* fall through */
3680 	default:
3681 		__put_lkb(ls, lkb);
3682 		goto out;
3683 	}
3684 
3685 	/* add this new lkb to the per-process list of locks */
3686 	spin_lock(&ua->proc->locks_spin);
3687 	kref_get(&lkb->lkb_ref);
3688 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3689 	spin_unlock(&ua->proc->locks_spin);
3690  out:
3691 	unlock_recovery(ls);
3692 	return error;
3693 }
3694 
3695 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3696 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3697 {
3698 	struct dlm_lkb *lkb;
3699 	struct dlm_args args;
3700 	struct dlm_user_args *ua;
3701 	int error;
3702 
3703 	lock_recovery(ls);
3704 
3705 	error = find_lkb(ls, lkid, &lkb);
3706 	if (error)
3707 		goto out;
3708 
3709 	/* user can change the params on its lock when it converts it, or
3710 	   add an lvb that didn't exist before */
3711 
3712 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3713 
3714 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3715 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3716 		if (!ua->lksb.sb_lvbptr) {
3717 			error = -ENOMEM;
3718 			goto out_put;
3719 		}
3720 	}
3721 	if (lvb_in && ua->lksb.sb_lvbptr)
3722 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3723 
3724 	ua->castparam = ua_tmp->castparam;
3725 	ua->castaddr = ua_tmp->castaddr;
3726 	ua->bastparam = ua_tmp->bastparam;
3727 	ua->bastaddr = ua_tmp->bastaddr;
3728 	ua->user_lksb = ua_tmp->user_lksb;
3729 	ua->old_mode = lkb->lkb_grmode;
3730 
3731 	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
3732 			      ua, DLM_FAKE_USER_AST, &args);
3733 	if (error)
3734 		goto out_put;
3735 
3736 	error = convert_lock(ls, lkb, &args);
3737 
3738 	if (error == -EINPROGRESS || error == -EAGAIN)
3739 		error = 0;
3740  out_put:
3741 	dlm_put_lkb(lkb);
3742  out:
3743 	unlock_recovery(ls);
3744 	kfree(ua_tmp);
3745 	return error;
3746 }
3747 
3748 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3749 		    uint32_t flags, uint32_t lkid, char *lvb_in)
3750 {
3751 	struct dlm_lkb *lkb;
3752 	struct dlm_args args;
3753 	struct dlm_user_args *ua;
3754 	int error;
3755 
3756 	lock_recovery(ls);
3757 
3758 	error = find_lkb(ls, lkid, &lkb);
3759 	if (error)
3760 		goto out;
3761 
3762 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3763 
3764 	if (lvb_in && ua->lksb.sb_lvbptr)
3765 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3766 	ua->castparam = ua_tmp->castparam;
3767 	ua->user_lksb = ua_tmp->user_lksb;
3768 
3769 	error = set_unlock_args(flags, ua, &args);
3770 	if (error)
3771 		goto out_put;
3772 
3773 	error = unlock_lock(ls, lkb, &args);
3774 
3775 	if (error == -DLM_EUNLOCK)
3776 		error = 0;
3777 	if (error)
3778 		goto out_put;
3779 
3780 	spin_lock(&ua->proc->locks_spin);
3781 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
3782 	if (!list_empty(&lkb->lkb_ownqueue))
3783 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3784 	spin_unlock(&ua->proc->locks_spin);
3785  out_put:
3786 	dlm_put_lkb(lkb);
3787  out:
3788 	unlock_recovery(ls);
3789 	return error;
3790 }
3791 
3792 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3793 		    uint32_t flags, uint32_t lkid)
3794 {
3795 	struct dlm_lkb *lkb;
3796 	struct dlm_args args;
3797 	struct dlm_user_args *ua;
3798 	int error;
3799 
3800 	lock_recovery(ls);
3801 
3802 	error = find_lkb(ls, lkid, &lkb);
3803 	if (error)
3804 		goto out;
3805 
3806 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3807 	ua->castparam = ua_tmp->castparam;
3808 	ua->user_lksb = ua_tmp->user_lksb;
3809 
3810 	error = set_unlock_args(flags, ua, &args);
3811 	if (error)
3812 		goto out_put;
3813 
3814 	error = cancel_lock(ls, lkb, &args);
3815 
3816 	if (error == -DLM_ECANCEL)
3817 		error = 0;
3818 	if (error)
3819 		goto out_put;
3820 
3821 	/* this lkb was removed from the WAITING queue */
3822 	if (lkb->lkb_grmode == DLM_LOCK_IV) {
3823 		spin_lock(&ua->proc->locks_spin);
3824 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3825 		spin_unlock(&ua->proc->locks_spin);
3826 	}
3827  out_put:
3828 	dlm_put_lkb(lkb);
3829  out:
3830 	unlock_recovery(ls);
3831 	return error;
3832 }
3833 
3834 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3835 {
3836 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3837 
3839 	kfree(ua->lksb.sb_lvbptr);
3840 	kfree(ua);
3841 	lkb->lkb_astparam = (long)NULL;
3842 
3843 	/* TODO: propagate to master if needed */
3844 	return 0;
3845 }
3846 
3847 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3848    Regardless of what rsb queue the lock is on, it's removed and freed. */
3849 
3850 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3851 {
3852 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3853 	struct dlm_args args;
3854 	int error;
3855 
3856 	/* FIXME: we need to handle the case where the lkb is in limbo
3857 	   while the rsb is being looked up, currently we assert in
3858 	   _unlock_lock/is_remote because rsb nodeid is -1. */
3859 
3860 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3861 
3862 	error = unlock_lock(ls, lkb, &args);
3863 	if (error == -DLM_EUNLOCK)
3864 		error = 0;
3865 	return error;
3866 }
3867 
3868 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3869    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3870    which we clear here. */
3871 
3872 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3873    list, and no more device_writes should add lkb's to proc->locks list; so we
3874    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3875    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3876    them ourselves. */
3877 
3878 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3879 {
3880 	struct dlm_lkb *lkb, *safe;
3881 
3882 	lock_recovery(ls);
3883 	mutex_lock(&ls->ls_clear_proc_locks);
3884 
3885 	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3886 		list_del_init(&lkb->lkb_ownqueue);
3887 
3888 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3889 			lkb->lkb_flags |= DLM_IFL_ORPHAN;
3890 			orphan_proc_lock(ls, lkb);
3891 		} else {
3892 			lkb->lkb_flags |= DLM_IFL_DEAD;
3893 			unlock_proc_lock(ls, lkb);
3894 		}
3895 
3896 		/* this removes the reference for the proc->locks list
3897 		   added by dlm_user_request, it may result in the lkb
3898 		   being freed */
3899 
3900 		dlm_put_lkb(lkb);
3901 	}
3902 
3903 	/* in-progress unlocks */
3904 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3905 		list_del_init(&lkb->lkb_ownqueue);
3906 		lkb->lkb_flags |= DLM_IFL_DEAD;
3907 		dlm_put_lkb(lkb);
3908 	}
3909 
3910 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
3911 		list_del(&lkb->lkb_astqueue);
3912 		dlm_put_lkb(lkb);
3913 	}
3914 
3915 	mutex_unlock(&ls->ls_clear_proc_locks);
3916 	unlock_recovery(ls);
3917 }
3918 
3919