/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
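
/* Illustrative usage sketch (hypothetical caller, not part of this file):
   the public entry points declared in linux/dlm.h drive the four stages
   above.  Completion status is delivered asynchronously in lksb.sb_status
   via the completion ast.

	static void my_ast(void *astarg);		completion ast
	static void my_bast(void *astarg, int mode);	blocking ast

	struct dlm_lksb lksb;
	int error;

	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "res1", 4, 0,
			 my_ast, NULL, my_bast);	-> request_lock()

	error = dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, NULL, 0,
			 0, my_ast, NULL, my_bast);	-> convert_lock()

	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, NULL);
							-> unlock_lock()
	error = dlm_unlock(ls, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, NULL);
							-> cancel_lock()
*/
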
#include <linux/types.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
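
/* Example reads of the table above: converting a lock up from PR to EX
   gives dlm_lvb_operations[PR+1][EX+1] == 1, so the resource's LVB is
   copied back to the caller; converting down from EX to NL gives
   dlm_lvb_operations[EX+1][NL+1] == 0, so the caller's LVB is written to
   the resource (see set_lvb_lock below). */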

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
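
/* Examples from the matrix above: dlm_modes_compat(DLM_LOCK_PR,
   DLM_LOCK_PR) is 1, shared readers coexist; dlm_modes_compat(DLM_LOCK_EX,
   DLM_LOCK_PR) is 0, an exclusive lock is compatible only with NL. */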

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

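/* CW and PR are adjacent in numeric mode order but mutually incompatible
   (see __dlm_compat_matrix), so a conversion between them is neither a
   pure up- nor down-conversion; both directions count as "middle". */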
static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL; if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	lkb->lkb_time_bast = ktime_get();

	if (is_master_copy(lkb)) {
		lkb->lkb_bastmode = rqmode; /* printed by debugfs */
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_ast(lkb, AST_BAST, rqmode);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = dlm_allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */
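
/* Sketch of the resulting reference lifecycle for one uncontended
   lookup/use/release cycle:

	find_rsb()	ref = 1 (newly created) or ref + 1 (cached)
	...use r...
	put_rsb()	ref - 1; on the final put, toss_rsb() moves r to
			the toss list and stamps res_toss_time
	shrink_bucket()	frees r once it has sat unused on the toss list
			for ci_toss_secs */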

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL, *tmp;
	uint32_t hash, bucket;
	int error = -EINVAL;

	if (namelen > DLM_RESNAME_MAXLEN)
		goto out;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	error = 0;
	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		dlm_free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
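
/* The lkid encodes the hash bucket in its upper 16 bits and the per-bucket
   counter in the lower 16, e.g. bucket 0x0003 with counter 0x0042 yields
   lkid 0x00030042; __find_lkb() and find_lkb() below recover the bucket
   with (lkid >> 16). */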

static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	if (!lkb)
		list_add_tail(new, head);
	else
		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
		  lkb->lkb_id, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb))
		return;

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

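			/* lkb_timeout_cs and ci_timewarn_cs are in
			   centiseconds; 1 cs = 10000 us */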
			wait_us = ktime_to_us(ktime_sub(ktime_get(),
					      lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */
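
/* Worked example, assuming a resource holding a single granted PR lock:
   a new PR request (now=1, conv=0) conflicts with nothing on the grant or
   convert queue, and the convert and wait queues are empty, so rule 6-4
   grants it immediately.  A new EX request in the same state fails
   queue_conflict() against the granted PR, falls through to "out" and
   returns 0, so it gets queued. */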

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode.  It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
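
/* Example of the ALT path, assuming a caller requests EX with
   DLM_LKF_ALTPR: if EX can't be granted, the request is retried as if it
   were for PR; when that succeeds, DLM_SBF_ALTMODE is returned in the lksb
   flags and munge_altmode() makes the matching change on the process
   copy. */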

/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list.  Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* Returns the highest requested mode of all blocked conversions; sets
   cw if there's a blocked conversion to DLM_LOCK_CW. */

static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);

		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
			*cw = 1;
	}

	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}

static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
{
	struct dlm_lkb *lkb, *s;

	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
		if (can_be_granted(r, lkb, 0, NULL))
			grant_lock_pending(r, lkb);
		else {
			high = max_t(int, lkb->lkb_rqmode, high);
			if (lkb->lkb_rqmode == DLM_LOCK_CW)
				*cw = 1;
		}
	}

	return high;
}
1779 
1780 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1781    on either the convert or waiting queue.
1782    high is the largest rqmode of all locks blocked on the convert or
1783    waiting queue. */
1784 
1785 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1786 {
1787 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1788 		if (gr->lkb_highbast < DLM_LOCK_EX)
1789 			return 1;
1790 		return 0;
1791 	}
1792 
1793 	if (gr->lkb_highbast < high &&
1794 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1795 		return 1;
1796 	return 0;
1797 }
1798 
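/* The +1 offsets above exist because DLM_LOCK_IV is -1, so mode+1 indexes
   the compatibility matrix.  E.g. with a granted PR lock and high ==
   DLM_LOCK_EX, __dlm_compat_matrix[DLM_LOCK_PR+1][DLM_LOCK_EX+1] is 0
   (PR and EX are incompatible), so the PR holder needs a bast if its
   lkb_highbast is still below high. */
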
1799 static void grant_pending_locks(struct dlm_rsb *r)
1800 {
1801 	struct dlm_lkb *lkb, *s;
1802 	int high = DLM_LOCK_IV;
1803 	int cw = 0;
1804 
1805 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1806 
1807 	high = grant_pending_convert(r, high, &cw);
1808 	high = grant_pending_wait(r, high, &cw);
1809 
1810 	if (high == DLM_LOCK_IV)
1811 		return;
1812 
1813 	/*
1814 	 * If there are locks left on the wait/convert queue then send blocking
1815 	 * ASTs to granted locks based on the largest requested mode (high)
1816 	 * found above.
1817 	 */
1818 
1819 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1820 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1821 			if (cw && high == DLM_LOCK_PR &&
1822 			    lkb->lkb_grmode == DLM_LOCK_PR)
1823 				queue_bast(r, lkb, DLM_LOCK_CW);
1824 			else
1825 				queue_bast(r, lkb, high);
1826 			lkb->lkb_highbast = high;
1827 		}
1828 	}
1829 }
1830 
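/* The special case above: a granted PR lock is compatible with high ==
   DLM_LOCK_PR, but a blocked CW request (cw) is not compatible with PR,
   so the PR holder is sent a bast for DLM_LOCK_CW rather than for high. */
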
1831 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1832 {
1833 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1834 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1835 		if (gr->lkb_highbast < DLM_LOCK_EX)
1836 			return 1;
1837 		return 0;
1838 	}
1839 
1840 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1841 		return 1;
1842 	return 0;
1843 }
1844 
1845 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1846 			    struct dlm_lkb *lkb)
1847 {
1848 	struct dlm_lkb *gr;
1849 
1850 	list_for_each_entry(gr, head, lkb_statequeue) {
1851 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1852 			queue_bast(r, gr, lkb->lkb_rqmode);
1853 			gr->lkb_highbast = lkb->lkb_rqmode;
1854 		}
1855 	}
1856 }
1857 
1858 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1859 {
1860 	send_bast_queue(r, &r->res_grantqueue, lkb);
1861 }
1862 
1863 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1864 {
1865 	send_bast_queue(r, &r->res_grantqueue, lkb);
1866 	send_bast_queue(r, &r->res_convertqueue, lkb);
1867 }
1868 
1869 /* set_master(r, lkb) -- set the master nodeid of a resource
1870 
1871    The purpose of this function is to set the nodeid field in the given
1872    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1873    known, it can just be copied to the lkb and the function will return
1874    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1875    before it can be copied to the lkb.
1876 
1877    When the rsb nodeid is being looked up remotely, the initial lkb
1878    causing the lookup is kept on the ls_waiters list waiting for the
1879    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1880    on the rsb's res_lookup list until the master is verified.
1881 
1882    Return values:
1883    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1884    1: the rsb master is not available and the lkb has been placed on
1885       a wait queue
1886 */
1887 
1888 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1889 {
1890 	struct dlm_ls *ls = r->res_ls;
1891 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1892 
1893 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1894 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1895 		r->res_first_lkid = lkb->lkb_id;
1896 		lkb->lkb_nodeid = r->res_nodeid;
1897 		return 0;
1898 	}
1899 
1900 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1901 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1902 		return 1;
1903 	}
1904 
1905 	if (r->res_nodeid == 0) {
1906 		lkb->lkb_nodeid = 0;
1907 		return 0;
1908 	}
1909 
1910 	if (r->res_nodeid > 0) {
1911 		lkb->lkb_nodeid = r->res_nodeid;
1912 		return 0;
1913 	}
1914 
1915 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1916 
1917 	dir_nodeid = dlm_dir_nodeid(r);
1918 
1919 	if (dir_nodeid != our_nodeid) {
1920 		r->res_first_lkid = lkb->lkb_id;
1921 		send_lookup(r, lkb);
1922 		return 1;
1923 	}
1924 
1925 	for (i = 0; i < 2; i++) {
1926 		/* It's possible for dlm_scand to remove an old rsb for
1927 		   this same resource from the toss list while we create
1928 		   a new one, look up the master locally, and find the
1929 		   dir entry still exists because dlm_scand hasn't yet
1930 		   done the dir_remove() on the previous rsb. */
1931 
1932 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1933 				       r->res_length, &ret_nodeid);
1934 		if (!error)
1935 			break;
1936 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1937 		schedule();
1938 	}
1939 	if (error && error != -EEXIST)
1940 		return error;
1941 
1942 	if (ret_nodeid == our_nodeid) {
1943 		r->res_first_lkid = 0;
1944 		r->res_nodeid = 0;
1945 		lkb->lkb_nodeid = 0;
1946 	} else {
1947 		r->res_first_lkid = lkb->lkb_id;
1948 		r->res_nodeid = ret_nodeid;
1949 		lkb->lkb_nodeid = ret_nodeid;
1950 	}
1951 	return 0;
1952 }
1953 
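/* Caller's view of set_master(), as in _request_lock() below: on 0 the
   lkb_nodeid is usable immediately and the operation proceeds; on 1 the
   lkb is parked (on ls_waiters for the lookup initiator, on res_lookup
   otherwise) and is retried when the lookup reply arrives (see
   receive_lookup_reply() and process_lookup_list()). */
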
1954 static void process_lookup_list(struct dlm_rsb *r)
1955 {
1956 	struct dlm_lkb *lkb, *safe;
1957 
1958 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1959 		list_del_init(&lkb->lkb_rsb_lookup);
1960 		_request_lock(r, lkb);
1961 		schedule();
1962 	}
1963 }
1964 
1965 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1966 
1967 static void confirm_master(struct dlm_rsb *r, int error)
1968 {
1969 	struct dlm_lkb *lkb;
1970 
1971 	if (!r->res_first_lkid)
1972 		return;
1973 
1974 	switch (error) {
1975 	case 0:
1976 	case -EINPROGRESS:
1977 		r->res_first_lkid = 0;
1978 		process_lookup_list(r);
1979 		break;
1980 
1981 	case -EAGAIN:
1982 	case -EBADR:
1983 	case -ENOTBLK:
1984 		/* the remote request failed and won't be retried (it was
1985 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
1986 		   lkb the first_lkid */
1987 
1988 		r->res_first_lkid = 0;
1989 
1990 		if (!list_empty(&r->res_lookup)) {
1991 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1992 					 lkb_rsb_lookup);
1993 			list_del_init(&lkb->lkb_rsb_lookup);
1994 			r->res_first_lkid = lkb->lkb_id;
1995 			_request_lock(r, lkb);
1996 		}
1997 		break;
1998 
1999 	default:
2000 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2001 	}
2002 }
2003 
2004 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2005 			 int namelen, unsigned long timeout_cs,
2006 			 void (*ast) (void *astparam),
2007 			 void *astparam,
2008 			 void (*bast) (void *astparam, int mode),
2009 			 struct dlm_args *args)
2010 {
2011 	int rv = -EINVAL;
2012 
2013 	/* check for invalid arg usage */
2014 
2015 	if (mode < 0 || mode > DLM_LOCK_EX)
2016 		goto out;
2017 
2018 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2019 		goto out;
2020 
2021 	if (flags & DLM_LKF_CANCEL)
2022 		goto out;
2023 
2024 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2025 		goto out;
2026 
2027 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2028 		goto out;
2029 
2030 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2031 		goto out;
2032 
2033 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2034 		goto out;
2035 
2036 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2037 		goto out;
2038 
2039 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2040 		goto out;
2041 
2042 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2043 		goto out;
2044 
2045 	if (!ast || !lksb)
2046 		goto out;
2047 
2048 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2049 		goto out;
2050 
2051 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2052 		goto out;
2053 
2054 	/* these args will be copied to the lkb in validate_lock_args;
2055 	   it cannot be done now because, when converting locks, fields in
2056 	   an active lkb cannot be modified before the rsb is locked */
2057 
2058 	args->flags = flags;
2059 	args->astfn = ast;
2060 	args->astparam = astparam;
2061 	args->bastfn = bast;
2062 	args->timeout = timeout_cs;
2063 	args->mode = mode;
2064 	args->lksb = lksb;
2065 	rv = 0;
2066  out:
2067 	return rv;
2068 }
2069 
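/* For example, dlm_lock() called with DLM_LKF_EXPEDITE and a mode other
   than DLM_LOCK_NL, or with DLM_LKF_VALBLK but no sb_lvbptr, fails the
   checks above with -EINVAL before anything in the lkb is modified. */
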
2070 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2071 {
2072 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2073 		      DLM_LKF_FORCEUNLOCK))
2074 		return -EINVAL;
2075 
2076 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2077 		return -EINVAL;
2078 
2079 	args->flags = flags;
2080 	args->astparam = astarg;
2081 	return 0;
2082 }
2083 
2084 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2085 			      struct dlm_args *args)
2086 {
2087 	int rv = -EINVAL;
2088 
2089 	if (args->flags & DLM_LKF_CONVERT) {
2090 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2091 			goto out;
2092 
2093 		if (args->flags & DLM_LKF_QUECVT &&
2094 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2095 			goto out;
2096 
2097 		rv = -EBUSY;
2098 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2099 			goto out;
2100 
2101 		if (lkb->lkb_wait_type)
2102 			goto out;
2103 
2104 		if (is_overlap(lkb))
2105 			goto out;
2106 	}
2107 
2108 	lkb->lkb_exflags = args->flags;
2109 	lkb->lkb_sbflags = 0;
2110 	lkb->lkb_astfn = args->astfn;
2111 	lkb->lkb_astparam = args->astparam;
2112 	lkb->lkb_bastfn = args->bastfn;
2113 	lkb->lkb_rqmode = args->mode;
2114 	lkb->lkb_lksb = args->lksb;
2115 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2116 	lkb->lkb_ownpid = (int) current->pid;
2117 	lkb->lkb_timeout_cs = args->timeout;
2118 	rv = 0;
2119  out:
2120 	if (rv)
2121 		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2122 			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2123 			  lkb->lkb_status, lkb->lkb_wait_type,
2124 			  lkb->lkb_resource->res_name);
2125 	return rv;
2126 }
2127 
2128 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2129    for success */
2130 
2131 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2132    because there may be a lookup in progress and it's valid to do a
2133    cancel/force-unlock on it */
2134 
2135 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2136 {
2137 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2138 	int rv = -EINVAL;
2139 
2140 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2141 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2142 		dlm_print_lkb(lkb);
2143 		goto out;
2144 	}
2145 
2146 	/* an lkb may still exist even though the lock is EOL'ed due to a
2147 	   cancel, unlock or failed noqueue request; an app can't use these
2148 	   locks; return same error as if the lkid had not been found at all */
2149 
2150 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2151 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2152 		rv = -ENOENT;
2153 		goto out;
2154 	}
2155 
2156 	/* an lkb may be waiting for an rsb lookup to complete where the
2157 	   lookup was initiated by another lock */
2158 
2159 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2160 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2161 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2162 			list_del_init(&lkb->lkb_rsb_lookup);
2163 			queue_cast(lkb->lkb_resource, lkb,
2164 				   args->flags & DLM_LKF_CANCEL ?
2165 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2166 			unhold_lkb(lkb); /* undoes create_lkb() */
2167 		}
2168 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2169 		rv = -EBUSY;
2170 		goto out;
2171 	}
2172 
2173 	/* cancel not allowed with another cancel/unlock in progress */
2174 
2175 	if (args->flags & DLM_LKF_CANCEL) {
2176 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2177 			goto out;
2178 
2179 		if (is_overlap(lkb))
2180 			goto out;
2181 
2182 		/* don't let scand try to do a cancel */
2183 		del_timeout(lkb);
2184 
2185 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2186 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2187 			rv = -EBUSY;
2188 			goto out;
2189 		}
2190 
2191 		/* there's nothing to cancel */
2192 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2193 		    !lkb->lkb_wait_type) {
2194 			rv = -EBUSY;
2195 			goto out;
2196 		}
2197 
2198 		switch (lkb->lkb_wait_type) {
2199 		case DLM_MSG_LOOKUP:
2200 		case DLM_MSG_REQUEST:
2201 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2202 			rv = -EBUSY;
2203 			goto out;
2204 		case DLM_MSG_UNLOCK:
2205 		case DLM_MSG_CANCEL:
2206 			goto out;
2207 		}
2208 		/* add_to_waiters() will set OVERLAP_CANCEL */
2209 		goto out_ok;
2210 	}
2211 
2212 	/* do we need to allow a force-unlock if there's a normal unlock
2213 	   already in progress?  in what conditions could the normal unlock
2214 	   fail such that we'd want to send a force-unlock to be sure? */
2215 
2216 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2217 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2218 			goto out;
2219 
2220 		if (is_overlap_unlock(lkb))
2221 			goto out;
2222 
2223 		/* don't let scand try to do a cancel */
2224 		del_timeout(lkb);
2225 
2226 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2227 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2228 			rv = -EBUSY;
2229 			goto out;
2230 		}
2231 
2232 		switch (lkb->lkb_wait_type) {
2233 		case DLM_MSG_LOOKUP:
2234 		case DLM_MSG_REQUEST:
2235 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2236 			rv = -EBUSY;
2237 			goto out;
2238 		case DLM_MSG_UNLOCK:
2239 			goto out;
2240 		}
2241 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2242 		goto out_ok;
2243 	}
2244 
2245 	/* normal unlock not allowed if there's any op in progress */
2246 	rv = -EBUSY;
2247 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2248 		goto out;
2249 
2250  out_ok:
2251 	/* an overlapping op shouldn't blow away exflags from other op */
2252 	lkb->lkb_exflags |= args->flags;
2253 	lkb->lkb_sbflags = 0;
2254 	lkb->lkb_astparam = args->astparam;
2255 	rv = 0;
2256  out:
2257 	if (rv)
2258 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2259 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2260 			  args->flags, lkb->lkb_wait_type,
2261 			  lkb->lkb_resource->res_name);
2262 	return rv;
2263 }
2264 
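/* One overlap case from above, spelled out: a DLM_LKF_CANCEL that arrives
   while the original request is still on the waiters list (wait_type is
   DLM_MSG_REQUEST) sets DLM_IFL_OVERLAP_CANCEL and returns -EBUSY, which
   dlm_unlock() then converts to 0 for CANCEL/FORCEUNLOCK callers. */
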
2265 /*
2266  * Four stage 4 varieties:
2267  * do_request(), do_convert(), do_unlock(), do_cancel()
2268  * These are called on the master node for the given lock and
2269  * from the central locking logic.
2270  */
2271 
2272 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2273 {
2274 	int error = 0;
2275 
2276 	if (can_be_granted(r, lkb, 1, NULL)) {
2277 		grant_lock(r, lkb);
2278 		queue_cast(r, lkb, 0);
2279 		goto out;
2280 	}
2281 
2282 	if (can_be_queued(lkb)) {
2283 		error = -EINPROGRESS;
2284 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2285 		add_timeout(lkb);
2286 		goto out;
2287 	}
2288 
2289 	error = -EAGAIN;
2290 	queue_cast(r, lkb, -EAGAIN);
2291  out:
2292 	return error;
2293 }
2294 
2295 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2296 			       int error)
2297 {
2298 	switch (error) {
2299 	case -EAGAIN:
2300 		if (force_blocking_asts(lkb))
2301 			send_blocking_asts_all(r, lkb);
2302 		break;
2303 	case -EINPROGRESS:
2304 		send_blocking_asts(r, lkb);
2305 		break;
2306 	}
2307 }
2308 
2309 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2310 {
2311 	int error = 0;
2312 	int deadlk = 0;
2313 
2314 	/* changing an existing lock may allow others to be granted */
2315 
2316 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2317 		grant_lock(r, lkb);
2318 		queue_cast(r, lkb, 0);
2319 		goto out;
2320 	}
2321 
2322 	/* can_be_granted() detected that this lock would block in a conversion
2323 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2324 	   the ast for the convert. */
2325 
2326 	if (deadlk) {
2327 		/* it's left on the granted queue */
2328 		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2329 			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2330 			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2331 		revert_lock(r, lkb);
2332 		queue_cast(r, lkb, -EDEADLK);
2333 		error = -EDEADLK;
2334 		goto out;
2335 	}
2336 
2337 	/* is_demoted() means the can_be_granted() above set the grmode
2338 	   to NL, and left us on the granted queue.  This auto-demotion
2339 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2340 	   now grantable.  We have to try to grant other converting locks
2341 	   before we try again to grant this one. */
2342 
2343 	if (is_demoted(lkb)) {
2344 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2345 		if (_can_be_granted(r, lkb, 1)) {
2346 			grant_lock(r, lkb);
2347 			queue_cast(r, lkb, 0);
2348 			goto out;
2349 		}
2350 		/* else fall through and move to convert queue */
2351 	}
2352 
2353 	if (can_be_queued(lkb)) {
2354 		error = -EINPROGRESS;
2355 		del_lkb(r, lkb);
2356 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2357 		add_timeout(lkb);
2358 		goto out;
2359 	}
2360 
2361 	error = -EAGAIN;
2362 	queue_cast(r, lkb, -EAGAIN);
2363  out:
2364 	return error;
2365 }
2366 
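/* The classic conversion deadlock handled above: two holders are granted
   PR and both convert to EX; neither convert is compatible with the
   other's granted mode, so neither could ever be granted.  With
   DLM_LKF_CONVDEADLK one lock is demoted to NL (the is_demoted() case);
   otherwise the convert fails with -EDEADLK. */
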
2367 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2368 			       int error)
2369 {
2370 	switch (error) {
2371 	case 0:
2372 		grant_pending_locks(r);
2373 		/* grant_pending_locks also sends basts */
2374 		break;
2375 	case -EAGAIN:
2376 		if (force_blocking_asts(lkb))
2377 			send_blocking_asts_all(r, lkb);
2378 		break;
2379 	case -EINPROGRESS:
2380 		send_blocking_asts(r, lkb);
2381 		break;
2382 	}
2383 }
2384 
2385 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2386 {
2387 	remove_lock(r, lkb);
2388 	queue_cast(r, lkb, -DLM_EUNLOCK);
2389 	return -DLM_EUNLOCK;
2390 }
2391 
2392 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2393 			      int error)
2394 {
2395 	grant_pending_locks(r);
2396 }
2397 
2398 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2399 
2400 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2401 {
2402 	int error;
2403 
2404 	error = revert_lock(r, lkb);
2405 	if (error) {
2406 		queue_cast(r, lkb, -DLM_ECANCEL);
2407 		return -DLM_ECANCEL;
2408 	}
2409 	return 0;
2410 }
2411 
2412 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2413 			      int error)
2414 {
2415 	if (error)
2416 		grant_pending_locks(r);
2417 }
2418 
2419 /*
2420  * Four stage 3 varieties:
2421  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2422  */
2423 
2424 /* add a new lkb to a possibly new rsb, called by requesting process */
2425 
2426 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2427 {
2428 	int error;
2429 
2430 	/* set_master: sets lkb nodeid from r */
2431 
2432 	error = set_master(r, lkb);
2433 	if (error < 0)
2434 		goto out;
2435 	if (error) {
2436 		error = 0;
2437 		goto out;
2438 	}
2439 
2440 	if (is_remote(r)) {
2441 		/* receive_request() calls do_request() on remote node */
2442 		error = send_request(r, lkb);
2443 	} else {
2444 		error = do_request(r, lkb);
2445 		/* for remote locks the request_reply is sent
2446 		   between do_request and do_request_effects */
2447 		do_request_effects(r, lkb, error);
2448 	}
2449  out:
2450 	return error;
2451 }
2452 
2453 /* change some property of an existing lkb, e.g. mode */
2454 
2455 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2456 {
2457 	int error;
2458 
2459 	if (is_remote(r)) {
2460 		/* receive_convert() calls do_convert() on remote node */
2461 		error = send_convert(r, lkb);
2462 	} else {
2463 		error = do_convert(r, lkb);
2464 		/* for remote locks the convert_reply is sent
2465 		   between do_convert and do_convert_effects */
2466 		do_convert_effects(r, lkb, error);
2467 	}
2468 
2469 	return error;
2470 }
2471 
2472 /* remove an existing lkb from the granted queue */
2473 
2474 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2475 {
2476 	int error;
2477 
2478 	if (is_remote(r)) {
2479 		/* receive_unlock() calls do_unlock() on remote node */
2480 		error = send_unlock(r, lkb);
2481 	} else {
2482 		error = do_unlock(r, lkb);
2483 		/* for remote locks the unlock_reply is sent
2484 		   between do_unlock and do_unlock_effects */
2485 		do_unlock_effects(r, lkb, error);
2486 	}
2487 
2488 	return error;
2489 }
2490 
2491 /* remove an existing lkb from the convert or wait queue */
2492 
2493 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2494 {
2495 	int error;
2496 
2497 	if (is_remote(r)) {
2498 		/* receive_cancel() calls do_cancel() on remote node */
2499 		error = send_cancel(r, lkb);
2500 	} else {
2501 		error = do_cancel(r, lkb);
2502 		/* for remote locks the cancel_reply is sent
2503 		   between do_cancel and do_cancel_effects */
2504 		do_cancel_effects(r, lkb, error);
2505 	}
2506 
2507 	return error;
2508 }
2509 
2510 /*
2511  * Four stage 2 varieties:
2512  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2513  */
2514 
2515 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2516 			int len, struct dlm_args *args)
2517 {
2518 	struct dlm_rsb *r;
2519 	int error;
2520 
2521 	error = validate_lock_args(ls, lkb, args);
2522 	if (error)
2523 		goto out;
2524 
2525 	error = find_rsb(ls, name, len, R_CREATE, &r);
2526 	if (error)
2527 		goto out;
2528 
2529 	lock_rsb(r);
2530 
2531 	attach_lkb(r, lkb);
2532 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2533 
2534 	error = _request_lock(r, lkb);
2535 
2536 	unlock_rsb(r);
2537 	put_rsb(r);
2538 
2539  out:
2540 	return error;
2541 }
2542 
2543 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2544 			struct dlm_args *args)
2545 {
2546 	struct dlm_rsb *r;
2547 	int error;
2548 
2549 	r = lkb->lkb_resource;
2550 
2551 	hold_rsb(r);
2552 	lock_rsb(r);
2553 
2554 	error = validate_lock_args(ls, lkb, args);
2555 	if (error)
2556 		goto out;
2557 
2558 	error = _convert_lock(r, lkb);
2559  out:
2560 	unlock_rsb(r);
2561 	put_rsb(r);
2562 	return error;
2563 }
2564 
2565 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2566 		       struct dlm_args *args)
2567 {
2568 	struct dlm_rsb *r;
2569 	int error;
2570 
2571 	r = lkb->lkb_resource;
2572 
2573 	hold_rsb(r);
2574 	lock_rsb(r);
2575 
2576 	error = validate_unlock_args(lkb, args);
2577 	if (error)
2578 		goto out;
2579 
2580 	error = _unlock_lock(r, lkb);
2581  out:
2582 	unlock_rsb(r);
2583 	put_rsb(r);
2584 	return error;
2585 }
2586 
2587 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2588 		       struct dlm_args *args)
2589 {
2590 	struct dlm_rsb *r;
2591 	int error;
2592 
2593 	r = lkb->lkb_resource;
2594 
2595 	hold_rsb(r);
2596 	lock_rsb(r);
2597 
2598 	error = validate_unlock_args(lkb, args);
2599 	if (error)
2600 		goto out;
2601 
2602 	error = _cancel_lock(r, lkb);
2603  out:
2604 	unlock_rsb(r);
2605 	put_rsb(r);
2606 	return error;
2607 }
2608 
2609 /*
2610  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2611  */
2612 
2613 int dlm_lock(dlm_lockspace_t *lockspace,
2614 	     int mode,
2615 	     struct dlm_lksb *lksb,
2616 	     uint32_t flags,
2617 	     void *name,
2618 	     unsigned int namelen,
2619 	     uint32_t parent_lkid,
2620 	     void (*ast) (void *astarg),
2621 	     void *astarg,
2622 	     void (*bast) (void *astarg, int mode))
2623 {
2624 	struct dlm_ls *ls;
2625 	struct dlm_lkb *lkb;
2626 	struct dlm_args args;
2627 	int error, convert = flags & DLM_LKF_CONVERT;
2628 
2629 	ls = dlm_find_lockspace_local(lockspace);
2630 	if (!ls)
2631 		return -EINVAL;
2632 
2633 	dlm_lock_recovery(ls);
2634 
2635 	if (convert)
2636 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2637 	else
2638 		error = create_lkb(ls, &lkb);
2639 
2640 	if (error)
2641 		goto out;
2642 
2643 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2644 			      astarg, bast, &args);
2645 	if (error)
2646 		goto out_put;
2647 
2648 	if (convert)
2649 		error = convert_lock(ls, lkb, &args);
2650 	else
2651 		error = request_lock(ls, lkb, name, namelen, &args);
2652 
2653 	if (error == -EINPROGRESS)
2654 		error = 0;
2655  out_put:
2656 	if (convert || error)
2657 		__put_lkb(ls, lkb);
2658 	if (error == -EAGAIN || error == -EDEADLK)
2659 		error = 0;
2660  out:
2661 	dlm_unlock_recovery(ls);
2662 	dlm_put_lockspace(ls);
2663 	return error;
2664 }
2665 
2666 int dlm_unlock(dlm_lockspace_t *lockspace,
2667 	       uint32_t lkid,
2668 	       uint32_t flags,
2669 	       struct dlm_lksb *lksb,
2670 	       void *astarg)
2671 {
2672 	struct dlm_ls *ls;
2673 	struct dlm_lkb *lkb;
2674 	struct dlm_args args;
2675 	int error;
2676 
2677 	ls = dlm_find_lockspace_local(lockspace);
2678 	if (!ls)
2679 		return -EINVAL;
2680 
2681 	dlm_lock_recovery(ls);
2682 
2683 	error = find_lkb(ls, lkid, &lkb);
2684 	if (error)
2685 		goto out;
2686 
2687 	error = set_unlock_args(flags, astarg, &args);
2688 	if (error)
2689 		goto out_put;
2690 
2691 	if (flags & DLM_LKF_CANCEL)
2692 		error = cancel_lock(ls, lkb, &args);
2693 	else
2694 		error = unlock_lock(ls, lkb, &args);
2695 
2696 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2697 		error = 0;
2698 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2699 		error = 0;
2700  out_put:
2701 	dlm_put_lkb(lkb);
2702  out:
2703 	dlm_unlock_recovery(ls);
2704 	dlm_put_lockspace(ls);
2705 	return error;
2706 }
2707 
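/* A minimal usage sketch of the two stage-1 entry points above.  This is
   illustrative only, hence not built: the example_* names and the
   completion-based waiting (linux/completion.h) are this sketch's
   assumptions, not part of the dlm API. */
#if 0
static void example_ast(void *astarg)
{
	/* runs in ast context once the operation completes;
	   the lksb's sb_status holds the result */
	complete(astarg);
}

static int example_lock_unlock(dlm_lockspace_t *ls)
{
	struct dlm_lksb lksb = {};
	struct completion done;
	int error;

	init_completion(&done);

	/* request a new EX lock on resource "example"; 0 from dlm_lock()
	   means the request was accepted and example_ast will report the
	   final status */
	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "example", 7, 0,
			 example_ast, &done, NULL);
	if (error)
		return error;
	wait_for_completion(&done);
	if (lksb.sb_status)
		return lksb.sb_status;

	/* release it; the completion is signalled with sb_status set to
	   -DLM_EUNLOCK */
	init_completion(&done);
	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &done);
	if (error)
		return error;
	wait_for_completion(&done);
	return 0;
}
#endif
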
2708 /*
2709  * send/receive routines for remote operations and replies
2710  *
2711  * send_args
2712  * send_common
2713  * send_request			receive_request
2714  * send_convert			receive_convert
2715  * send_unlock			receive_unlock
2716  * send_cancel			receive_cancel
2717  * send_grant			receive_grant
2718  * send_bast			receive_bast
2719  * send_lookup			receive_lookup
2720  * send_remove			receive_remove
2721  *
2722  * 				send_common_reply
2723  * receive_request_reply	send_request_reply
2724  * receive_convert_reply	send_convert_reply
2725  * receive_unlock_reply		send_unlock_reply
2726  * receive_cancel_reply		send_cancel_reply
2727  * receive_lookup_reply		send_lookup_reply
2728  */
2729 
2730 static int _create_message(struct dlm_ls *ls, int mb_len,
2731 			   int to_nodeid, int mstype,
2732 			   struct dlm_message **ms_ret,
2733 			   struct dlm_mhandle **mh_ret)
2734 {
2735 	struct dlm_message *ms;
2736 	struct dlm_mhandle *mh;
2737 	char *mb;
2738 
2739 	/* get_buffer gives us a message handle (mh) that we need to
2740 	   pass into lowcomms_commit and a message buffer (mb) that we
2741 	   write our data into */
2742 
2743 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2744 	if (!mh)
2745 		return -ENOBUFS;
2746 
2747 	memset(mb, 0, mb_len);
2748 
2749 	ms = (struct dlm_message *) mb;
2750 
2751 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2752 	ms->m_header.h_lockspace = ls->ls_global_id;
2753 	ms->m_header.h_nodeid = dlm_our_nodeid();
2754 	ms->m_header.h_length = mb_len;
2755 	ms->m_header.h_cmd = DLM_MSG;
2756 
2757 	ms->m_type = mstype;
2758 
2759 	*mh_ret = mh;
2760 	*ms_ret = ms;
2761 	return 0;
2762 }
2763 
2764 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2765 			  int to_nodeid, int mstype,
2766 			  struct dlm_message **ms_ret,
2767 			  struct dlm_mhandle **mh_ret)
2768 {
2769 	int mb_len = sizeof(struct dlm_message);
2770 
2771 	switch (mstype) {
2772 	case DLM_MSG_REQUEST:
2773 	case DLM_MSG_LOOKUP:
2774 	case DLM_MSG_REMOVE:
2775 		mb_len += r->res_length;
2776 		break;
2777 	case DLM_MSG_CONVERT:
2778 	case DLM_MSG_UNLOCK:
2779 	case DLM_MSG_REQUEST_REPLY:
2780 	case DLM_MSG_CONVERT_REPLY:
2781 	case DLM_MSG_GRANT:
2782 		if (lkb && lkb->lkb_lvbptr)
2783 			mb_len += r->res_ls->ls_lvblen;
2784 		break;
2785 	}
2786 
2787 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2788 			       ms_ret, mh_ret);
2789 }
2790 
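/* Sizing example for the switch above: a DLM_MSG_REQUEST for a resource
   allocates sizeof(struct dlm_message) + r->res_length bytes, send_args()
   copies the name into m_extra, and the receiver recovers the name length
   as h_length - sizeof(struct dlm_message) via receive_extralen(). */
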
2791 /* further lowcomms enhancements or alternate implementations may make
2792    the return value from this function useful at some point */
2793 
2794 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2795 {
2796 	dlm_message_out(ms);
2797 	dlm_lowcomms_commit_buffer(mh);
2798 	return 0;
2799 }
2800 
2801 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2802 		      struct dlm_message *ms)
2803 {
2804 	ms->m_nodeid   = lkb->lkb_nodeid;
2805 	ms->m_pid      = lkb->lkb_ownpid;
2806 	ms->m_lkid     = lkb->lkb_id;
2807 	ms->m_remid    = lkb->lkb_remid;
2808 	ms->m_exflags  = lkb->lkb_exflags;
2809 	ms->m_sbflags  = lkb->lkb_sbflags;
2810 	ms->m_flags    = lkb->lkb_flags;
2811 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2812 	ms->m_status   = lkb->lkb_status;
2813 	ms->m_grmode   = lkb->lkb_grmode;
2814 	ms->m_rqmode   = lkb->lkb_rqmode;
2815 	ms->m_hash     = r->res_hash;
2816 
2817 	/* m_result and m_bastmode are set from function args,
2818 	   not from lkb fields */
2819 
2820 	if (lkb->lkb_bastfn)
2821 		ms->m_asts |= AST_BAST;
2822 	if (lkb->lkb_astfn)
2823 		ms->m_asts |= AST_COMP;
2824 
2825 	/* compare with switch in create_message; send_remove() doesn't
2826 	   use send_args() */
2827 
2828 	switch (ms->m_type) {
2829 	case DLM_MSG_REQUEST:
2830 	case DLM_MSG_LOOKUP:
2831 		memcpy(ms->m_extra, r->res_name, r->res_length);
2832 		break;
2833 	case DLM_MSG_CONVERT:
2834 	case DLM_MSG_UNLOCK:
2835 	case DLM_MSG_REQUEST_REPLY:
2836 	case DLM_MSG_CONVERT_REPLY:
2837 	case DLM_MSG_GRANT:
2838 		if (!lkb->lkb_lvbptr)
2839 			break;
2840 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2841 		break;
2842 	}
2843 }
2844 
2845 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2846 {
2847 	struct dlm_message *ms;
2848 	struct dlm_mhandle *mh;
2849 	int to_nodeid, error;
2850 
2851 	error = add_to_waiters(lkb, mstype);
2852 	if (error)
2853 		return error;
2854 
2855 	to_nodeid = r->res_nodeid;
2856 
2857 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2858 	if (error)
2859 		goto fail;
2860 
2861 	send_args(r, lkb, ms);
2862 
2863 	error = send_message(mh, ms);
2864 	if (error)
2865 		goto fail;
2866 	return 0;
2867 
2868  fail:
2869 	remove_from_waiters(lkb, msg_reply_type(mstype));
2870 	return error;
2871 }
2872 
2873 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2874 {
2875 	return send_common(r, lkb, DLM_MSG_REQUEST);
2876 }
2877 
2878 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2879 {
2880 	int error;
2881 
2882 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2883 
2884 	/* down conversions go without a reply from the master */
2885 	if (!error && down_conversion(lkb)) {
2886 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2887 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2888 		r->res_ls->ls_stub_ms.m_result = 0;
2889 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2890 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2891 	}
2892 
2893 	return error;
2894 }
2895 
2896 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2897    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2898    that the master is still correct. */
2899 
2900 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2901 {
2902 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2903 }
2904 
2905 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2906 {
2907 	return send_common(r, lkb, DLM_MSG_CANCEL);
2908 }
2909 
2910 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2911 {
2912 	struct dlm_message *ms;
2913 	struct dlm_mhandle *mh;
2914 	int to_nodeid, error;
2915 
2916 	to_nodeid = lkb->lkb_nodeid;
2917 
2918 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2919 	if (error)
2920 		goto out;
2921 
2922 	send_args(r, lkb, ms);
2923 
2924 	ms->m_result = 0;
2925 
2926 	error = send_message(mh, ms);
2927  out:
2928 	return error;
2929 }
2930 
2931 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2932 {
2933 	struct dlm_message *ms;
2934 	struct dlm_mhandle *mh;
2935 	int to_nodeid, error;
2936 
2937 	to_nodeid = lkb->lkb_nodeid;
2938 
2939 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2940 	if (error)
2941 		goto out;
2942 
2943 	send_args(r, lkb, ms);
2944 
2945 	ms->m_bastmode = mode;
2946 
2947 	error = send_message(mh, ms);
2948  out:
2949 	return error;
2950 }
2951 
2952 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2953 {
2954 	struct dlm_message *ms;
2955 	struct dlm_mhandle *mh;
2956 	int to_nodeid, error;
2957 
2958 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2959 	if (error)
2960 		return error;
2961 
2962 	to_nodeid = dlm_dir_nodeid(r);
2963 
2964 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2965 	if (error)
2966 		goto fail;
2967 
2968 	send_args(r, lkb, ms);
2969 
2970 	error = send_message(mh, ms);
2971 	if (error)
2972 		goto fail;
2973 	return 0;
2974 
2975  fail:
2976 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2977 	return error;
2978 }
2979 
2980 static int send_remove(struct dlm_rsb *r)
2981 {
2982 	struct dlm_message *ms;
2983 	struct dlm_mhandle *mh;
2984 	int to_nodeid, error;
2985 
2986 	to_nodeid = dlm_dir_nodeid(r);
2987 
2988 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2989 	if (error)
2990 		goto out;
2991 
2992 	memcpy(ms->m_extra, r->res_name, r->res_length);
2993 	ms->m_hash = r->res_hash;
2994 
2995 	error = send_message(mh, ms);
2996  out:
2997 	return error;
2998 }
2999 
3000 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3001 			     int mstype, int rv)
3002 {
3003 	struct dlm_message *ms;
3004 	struct dlm_mhandle *mh;
3005 	int to_nodeid, error;
3006 
3007 	to_nodeid = lkb->lkb_nodeid;
3008 
3009 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3010 	if (error)
3011 		goto out;
3012 
3013 	send_args(r, lkb, ms);
3014 
3015 	ms->m_result = rv;
3016 
3017 	error = send_message(mh, ms);
3018  out:
3019 	return error;
3020 }
3021 
3022 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3023 {
3024 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3025 }
3026 
3027 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3028 {
3029 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3030 }
3031 
3032 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3033 {
3034 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3035 }
3036 
3037 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3038 {
3039 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3040 }
3041 
3042 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3043 			     int ret_nodeid, int rv)
3044 {
3045 	struct dlm_rsb *r = &ls->ls_stub_rsb;
3046 	struct dlm_message *ms;
3047 	struct dlm_mhandle *mh;
3048 	int error, nodeid = ms_in->m_header.h_nodeid;
3049 
3050 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3051 	if (error)
3052 		goto out;
3053 
3054 	ms->m_lkid = ms_in->m_lkid;
3055 	ms->m_result = rv;
3056 	ms->m_nodeid = ret_nodeid;
3057 
3058 	error = send_message(mh, ms);
3059  out:
3060 	return error;
3061 }
3062 
3063 /* which args we save from a received message depends heavily on the type
3064    of message, unlike the send side where we can safely send everything about
3065    the lkb for any type of message */
3066 
3067 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3068 {
3069 	lkb->lkb_exflags = ms->m_exflags;
3070 	lkb->lkb_sbflags = ms->m_sbflags;
3071 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3072 		         (ms->m_flags & 0x0000FFFF);
3073 }
3074 
3075 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3076 {
3077 	lkb->lkb_sbflags = ms->m_sbflags;
3078 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3079 		         (ms->m_flags & 0x0000FFFF);
3080 }
3081 
3082 static int receive_extralen(struct dlm_message *ms)
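/* The masks above split lkb_flags in two: the lower 16 bits travel with
   messages in m_flags, while the upper 16 bits are local to this node and
   are preserved across both functions. */
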
3083 {
3084 	return (ms->m_header.h_length - sizeof(struct dlm_message));
3085 }
3086 
3087 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3088 		       struct dlm_message *ms)
3089 {
3090 	int len;
3091 
3092 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3093 		if (!lkb->lkb_lvbptr)
3094 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3095 		if (!lkb->lkb_lvbptr)
3096 			return -ENOMEM;
3097 		len = receive_extralen(ms);
3098 		if (len > DLM_RESNAME_MAXLEN)
3099 			len = DLM_RESNAME_MAXLEN;
3100 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3101 	}
3102 	return 0;
3103 }
3104 
3105 static void fake_bastfn(void *astparam, int mode)
3106 {
3107 	log_print("fake_bastfn should not be called");
3108 }
3109 
3110 static void fake_astfn(void *astparam)
3111 {
3112 	log_print("fake_astfn should not be called");
3113 }
3114 
3115 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3116 				struct dlm_message *ms)
3117 {
3118 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3119 	lkb->lkb_ownpid = ms->m_pid;
3120 	lkb->lkb_remid = ms->m_lkid;
3121 	lkb->lkb_grmode = DLM_LOCK_IV;
3122 	lkb->lkb_rqmode = ms->m_rqmode;
3123 
3124 	lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3125 	lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3126 
3127 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3128 		/* lkb was just created so there won't be an lvb yet */
3129 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3130 		if (!lkb->lkb_lvbptr)
3131 			return -ENOMEM;
3132 	}
3133 
3134 	return 0;
3135 }
3136 
3137 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3138 				struct dlm_message *ms)
3139 {
3140 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3141 		return -EBUSY;
3142 
3143 	if (receive_lvb(ls, lkb, ms))
3144 		return -ENOMEM;
3145 
3146 	lkb->lkb_rqmode = ms->m_rqmode;
3147 	lkb->lkb_lvbseq = ms->m_lvbseq;
3148 
3149 	return 0;
3150 }
3151 
3152 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3153 			       struct dlm_message *ms)
3154 {
3155 	if (receive_lvb(ls, lkb, ms))
3156 		return -ENOMEM;
3157 	return 0;
3158 }
3159 
3160 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3161    uses to send a reply and that the remote end uses to process the reply. */
3162 
3163 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3164 {
3165 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3166 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3167 	lkb->lkb_remid = ms->m_lkid;
3168 }
3169 
3170 /* This is called after the rsb is locked so that we can safely inspect
3171    fields in the lkb. */
3172 
3173 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3174 {
3175 	int from = ms->m_header.h_nodeid;
3176 	int error = 0;
3177 
3178 	switch (ms->m_type) {
3179 	case DLM_MSG_CONVERT:
3180 	case DLM_MSG_UNLOCK:
3181 	case DLM_MSG_CANCEL:
3182 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3183 			error = -EINVAL;
3184 		break;
3185 
3186 	case DLM_MSG_CONVERT_REPLY:
3187 	case DLM_MSG_UNLOCK_REPLY:
3188 	case DLM_MSG_CANCEL_REPLY:
3189 	case DLM_MSG_GRANT:
3190 	case DLM_MSG_BAST:
3191 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3192 			error = -EINVAL;
3193 		break;
3194 
3195 	case DLM_MSG_REQUEST_REPLY:
3196 		if (!is_process_copy(lkb))
3197 			error = -EINVAL;
3198 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3199 			error = -EINVAL;
3200 		break;
3201 
3202 	default:
3203 		error = -EINVAL;
3204 	}
3205 
3206 	if (error)
3207 		log_error(lkb->lkb_resource->res_ls,
3208 			  "ignore invalid message %d from %d %x %x %x %d",
3209 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3210 			  lkb->lkb_flags, lkb->lkb_nodeid);
3211 	return error;
3212 }
3213 
3214 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3215 {
3216 	struct dlm_lkb *lkb;
3217 	struct dlm_rsb *r;
3218 	int error, namelen;
3219 
3220 	error = create_lkb(ls, &lkb);
3221 	if (error)
3222 		goto fail;
3223 
3224 	receive_flags(lkb, ms);
3225 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3226 	error = receive_request_args(ls, lkb, ms);
3227 	if (error) {
3228 		__put_lkb(ls, lkb);
3229 		goto fail;
3230 	}
3231 
3232 	namelen = receive_extralen(ms);
3233 
3234 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3235 	if (error) {
3236 		__put_lkb(ls, lkb);
3237 		goto fail;
3238 	}
3239 
3240 	lock_rsb(r);
3241 
3242 	attach_lkb(r, lkb);
3243 	error = do_request(r, lkb);
3244 	send_request_reply(r, lkb, error);
3245 	do_request_effects(r, lkb, error);
3246 
3247 	unlock_rsb(r);
3248 	put_rsb(r);
3249 
3250 	if (error == -EINPROGRESS)
3251 		error = 0;
3252 	if (error)
3253 		dlm_put_lkb(lkb);
3254 	return;
3255 
3256  fail:
3257 	setup_stub_lkb(ls, ms);
3258 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3259 }
3260 
3261 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3262 {
3263 	struct dlm_lkb *lkb;
3264 	struct dlm_rsb *r;
3265 	int error, reply = 1;
3266 
3267 	error = find_lkb(ls, ms->m_remid, &lkb);
3268 	if (error)
3269 		goto fail;
3270 
3271 	r = lkb->lkb_resource;
3272 
3273 	hold_rsb(r);
3274 	lock_rsb(r);
3275 
3276 	error = validate_message(lkb, ms);
3277 	if (error)
3278 		goto out;
3279 
3280 	receive_flags(lkb, ms);
3281 
3282 	error = receive_convert_args(ls, lkb, ms);
3283 	if (error) {
3284 		send_convert_reply(r, lkb, error);
3285 		goto out;
3286 	}
3287 
3288 	reply = !down_conversion(lkb);
3289 
3290 	error = do_convert(r, lkb);
3291 	if (reply)
3292 		send_convert_reply(r, lkb, error);
3293 	do_convert_effects(r, lkb, error);
3294  out:
3295 	unlock_rsb(r);
3296 	put_rsb(r);
3297 	dlm_put_lkb(lkb);
3298 	return;
3299 
3300  fail:
3301 	setup_stub_lkb(ls, ms);
3302 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3303 }
3304 
3305 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3306 {
3307 	struct dlm_lkb *lkb;
3308 	struct dlm_rsb *r;
3309 	int error;
3310 
3311 	error = find_lkb(ls, ms->m_remid, &lkb);
3312 	if (error)
3313 		goto fail;
3314 
3315 	r = lkb->lkb_resource;
3316 
3317 	hold_rsb(r);
3318 	lock_rsb(r);
3319 
3320 	error = validate_message(lkb, ms);
3321 	if (error)
3322 		goto out;
3323 
3324 	receive_flags(lkb, ms);
3325 
3326 	error = receive_unlock_args(ls, lkb, ms);
3327 	if (error) {
3328 		send_unlock_reply(r, lkb, error);
3329 		goto out;
3330 	}
3331 
3332 	error = do_unlock(r, lkb);
3333 	send_unlock_reply(r, lkb, error);
3334 	do_unlock_effects(r, lkb, error);
3335  out:
3336 	unlock_rsb(r);
3337 	put_rsb(r);
3338 	dlm_put_lkb(lkb);
3339 	return;
3340 
3341  fail:
3342 	setup_stub_lkb(ls, ms);
3343 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3344 }
3345 
3346 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3347 {
3348 	struct dlm_lkb *lkb;
3349 	struct dlm_rsb *r;
3350 	int error;
3351 
3352 	error = find_lkb(ls, ms->m_remid, &lkb);
3353 	if (error)
3354 		goto fail;
3355 
3356 	receive_flags(lkb, ms);
3357 
3358 	r = lkb->lkb_resource;
3359 
3360 	hold_rsb(r);
3361 	lock_rsb(r);
3362 
3363 	error = validate_message(lkb, ms);
3364 	if (error)
3365 		goto out;
3366 
3367 	error = do_cancel(r, lkb);
3368 	send_cancel_reply(r, lkb, error);
3369 	do_cancel_effects(r, lkb, error);
3370  out:
3371 	unlock_rsb(r);
3372 	put_rsb(r);
3373 	dlm_put_lkb(lkb);
3374 	return;
3375 
3376  fail:
3377 	setup_stub_lkb(ls, ms);
3378 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3379 }
3380 
3381 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3382 {
3383 	struct dlm_lkb *lkb;
3384 	struct dlm_rsb *r;
3385 	int error;
3386 
3387 	error = find_lkb(ls, ms->m_remid, &lkb);
3388 	if (error) {
3389 		log_debug(ls, "receive_grant from %d no lkb %x",
3390 			  ms->m_header.h_nodeid, ms->m_remid);
3391 		return;
3392 	}
3393 
3394 	r = lkb->lkb_resource;
3395 
3396 	hold_rsb(r);
3397 	lock_rsb(r);
3398 
3399 	error = validate_message(lkb, ms);
3400 	if (error)
3401 		goto out;
3402 
3403 	receive_flags_reply(lkb, ms);
3404 	if (is_altmode(lkb))
3405 		munge_altmode(lkb, ms);
3406 	grant_lock_pc(r, lkb, ms);
3407 	queue_cast(r, lkb, 0);
3408  out:
3409 	unlock_rsb(r);
3410 	put_rsb(r);
3411 	dlm_put_lkb(lkb);
3412 }
3413 
3414 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3415 {
3416 	struct dlm_lkb *lkb;
3417 	struct dlm_rsb *r;
3418 	int error;
3419 
3420 	error = find_lkb(ls, ms->m_remid, &lkb);
3421 	if (error) {
3422 		log_debug(ls, "receive_bast from %d no lkb %x",
3423 			  ms->m_header.h_nodeid, ms->m_remid);
3424 		return;
3425 	}
3426 
3427 	r = lkb->lkb_resource;
3428 
3429 	hold_rsb(r);
3430 	lock_rsb(r);
3431 
3432 	error = validate_message(lkb, ms);
3433 	if (error)
3434 		goto out;
3435 
3436 	queue_bast(r, lkb, ms->m_bastmode);
3437  out:
3438 	unlock_rsb(r);
3439 	put_rsb(r);
3440 	dlm_put_lkb(lkb);
3441 }
3442 
3443 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3444 {
3445 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3446 
3447 	from_nodeid = ms->m_header.h_nodeid;
3448 	our_nodeid = dlm_our_nodeid();
3449 
3450 	len = receive_extralen(ms);
3451 
3452 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3453 	if (dir_nodeid != our_nodeid) {
3454 		log_error(ls, "lookup dir_nodeid %d from %d",
3455 			  dir_nodeid, from_nodeid);
3456 		error = -EINVAL;
3457 		ret_nodeid = -1;
3458 		goto out;
3459 	}
3460 
3461 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3462 
3463 	/* Optimization: we're the master, so treat the lookup as a request */
3464 	if (!error && ret_nodeid == our_nodeid) {
3465 		receive_request(ls, ms);
3466 		return;
3467 	}
3468  out:
3469 	send_lookup_reply(ls, ms, ret_nodeid, error);
3470 }
3471 
3472 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3473 {
3474 	int len, dir_nodeid, from_nodeid;
3475 
3476 	from_nodeid = ms->m_header.h_nodeid;
3477 
3478 	len = receive_extralen(ms);
3479 
3480 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3481 	if (dir_nodeid != dlm_our_nodeid()) {
3482 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3483 			  dir_nodeid, from_nodeid);
3484 		return;
3485 	}
3486 
3487 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3488 }
3489 
3490 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3491 {
3492 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3493 }
3494 
3495 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3496 {
3497 	struct dlm_lkb *lkb;
3498 	struct dlm_rsb *r;
3499 	int error, mstype, result;
3500 
3501 	error = find_lkb(ls, ms->m_remid, &lkb);
3502 	if (error) {
3503 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3504 			  ms->m_header.h_nodeid, ms->m_remid);
3505 		return;
3506 	}
3507 
3508 	r = lkb->lkb_resource;
3509 	hold_rsb(r);
3510 	lock_rsb(r);
3511 
3512 	error = validate_message(lkb, ms);
3513 	if (error)
3514 		goto out;
3515 
3516 	mstype = lkb->lkb_wait_type;
3517 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3518 	if (error)
3519 		goto out;
3520 
3521 	/* Optimization: the dir node was also the master, so it took our
3522 	   lookup as a request and sent a request reply instead of a lookup reply */
3523 	if (mstype == DLM_MSG_LOOKUP) {
3524 		r->res_nodeid = ms->m_header.h_nodeid;
3525 		lkb->lkb_nodeid = r->res_nodeid;
3526 	}
3527 
3528 	/* this is the value returned from do_request() on the master */
3529 	result = ms->m_result;
3530 
3531 	switch (result) {
3532 	case -EAGAIN:
3533 		/* request would block (be queued) on remote master */
3534 		queue_cast(r, lkb, -EAGAIN);
3535 		confirm_master(r, -EAGAIN);
3536 		unhold_lkb(lkb); /* undoes create_lkb() */
3537 		break;
3538 
3539 	case -EINPROGRESS:
3540 	case 0:
3541 		/* request was queued or granted on remote master */
3542 		receive_flags_reply(lkb, ms);
3543 		lkb->lkb_remid = ms->m_lkid;
3544 		if (is_altmode(lkb))
3545 			munge_altmode(lkb, ms);
3546 		if (result) {
3547 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3548 			add_timeout(lkb);
3549 		} else {
3550 			grant_lock_pc(r, lkb, ms);
3551 			queue_cast(r, lkb, 0);
3552 		}
3553 		confirm_master(r, result);
3554 		break;
3555 
3556 	case -EBADR:
3557 	case -ENOTBLK:
3558 		/* find_rsb failed to find rsb or rsb wasn't master */
3559 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3560 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3561 		r->res_nodeid = -1;
3562 		lkb->lkb_nodeid = -1;
3563 
3564 		if (is_overlap(lkb)) {
3565 			/* we'll ignore error in cancel/unlock reply */
3566 			queue_cast_overlap(r, lkb);
3567 			confirm_master(r, result);
3568 			unhold_lkb(lkb); /* undoes create_lkb() */
3569 		} else
3570 			_request_lock(r, lkb);
3571 		break;
3572 
3573 	default:
3574 		log_error(ls, "receive_request_reply %x error %d",
3575 			  lkb->lkb_id, result);
3576 	}
3577 
3578 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3579 		log_debug(ls, "receive_request_reply %x result %d unlock",
3580 			  lkb->lkb_id, result);
3581 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3582 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3583 		send_unlock(r, lkb);
3584 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3585 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3586 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3587 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3588 		send_cancel(r, lkb);
3589 	} else {
3590 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3591 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3592 	}
3593  out:
3594 	unlock_rsb(r);
3595 	put_rsb(r);
3596 	dlm_put_lkb(lkb);
3597 }
3598 
3599 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3600 				    struct dlm_message *ms)
3601 {
3602 	/* this is the value returned from do_convert() on the master */
3603 	switch (ms->m_result) {
3604 	case -EAGAIN:
3605 		/* convert would block (be queued) on remote master */
3606 		queue_cast(r, lkb, -EAGAIN);
3607 		break;
3608 
3609 	case -EDEADLK:
3610 		receive_flags_reply(lkb, ms);
3611 		revert_lock_pc(r, lkb);
3612 		queue_cast(r, lkb, -EDEADLK);
3613 		break;
3614 
3615 	case -EINPROGRESS:
3616 		/* convert was queued on remote master */
3617 		receive_flags_reply(lkb, ms);
3618 		if (is_demoted(lkb))
3619 			munge_demoted(lkb, ms);
3620 		del_lkb(r, lkb);
3621 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3622 		add_timeout(lkb);
3623 		break;
3624 
3625 	case 0:
3626 		/* convert was granted on remote master */
3627 		receive_flags_reply(lkb, ms);
3628 		if (is_demoted(lkb))
3629 			munge_demoted(lkb, ms);
3630 		grant_lock_pc(r, lkb, ms);
3631 		queue_cast(r, lkb, 0);
3632 		break;
3633 
3634 	default:
3635 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3636 			  lkb->lkb_id, ms->m_result);
3637 	}
3638 }
3639 
3640 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3641 {
3642 	struct dlm_rsb *r = lkb->lkb_resource;
3643 	int error;
3644 
3645 	hold_rsb(r);
3646 	lock_rsb(r);
3647 
3648 	error = validate_message(lkb, ms);
3649 	if (error)
3650 		goto out;
3651 
3652 	/* stub reply can happen with waiters_mutex held */
3653 	error = remove_from_waiters_ms(lkb, ms);
3654 	if (error)
3655 		goto out;
3656 
3657 	__receive_convert_reply(r, lkb, ms);
3658  out:
3659 	unlock_rsb(r);
3660 	put_rsb(r);
3661 }
3662 
3663 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3664 {
3665 	struct dlm_lkb *lkb;
3666 	int error;
3667 
3668 	error = find_lkb(ls, ms->m_remid, &lkb);
3669 	if (error) {
3670 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3671 			  ms->m_header.h_nodeid, ms->m_remid);
3672 		return;
3673 	}
3674 
3675 	_receive_convert_reply(lkb, ms);
3676 	dlm_put_lkb(lkb);
3677 }
3678 
3679 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3680 {
3681 	struct dlm_rsb *r = lkb->lkb_resource;
3682 	int error;
3683 
3684 	hold_rsb(r);
3685 	lock_rsb(r);
3686 
3687 	error = validate_message(lkb, ms);
3688 	if (error)
3689 		goto out;
3690 
3691 	/* stub reply can happen with waiters_mutex held */
3692 	error = remove_from_waiters_ms(lkb, ms);
3693 	if (error)
3694 		goto out;
3695 
3696 	/* this is the value returned from do_unlock() on the master */
3697 
3698 	switch (ms->m_result) {
3699 	case -DLM_EUNLOCK:
3700 		receive_flags_reply(lkb, ms);
3701 		remove_lock_pc(r, lkb);
3702 		queue_cast(r, lkb, -DLM_EUNLOCK);
3703 		break;
3704 	case -ENOENT:
3705 		break;
3706 	default:
3707 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3708 			  lkb->lkb_id, ms->m_result);
3709 	}
3710  out:
3711 	unlock_rsb(r);
3712 	put_rsb(r);
3713 }
3714 
3715 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3716 {
3717 	struct dlm_lkb *lkb;
3718 	int error;
3719 
3720 	error = find_lkb(ls, ms->m_remid, &lkb);
3721 	if (error) {
3722 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3723 			  ms->m_header.h_nodeid, ms->m_remid);
3724 		return;
3725 	}
3726 
3727 	_receive_unlock_reply(lkb, ms);
3728 	dlm_put_lkb(lkb);
3729 }
3730 
3731 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3732 {
3733 	struct dlm_rsb *r = lkb->lkb_resource;
3734 	int error;
3735 
3736 	hold_rsb(r);
3737 	lock_rsb(r);
3738 
3739 	error = validate_message(lkb, ms);
3740 	if (error)
3741 		goto out;
3742 
3743 	/* stub reply can happen with waiters_mutex held */
3744 	error = remove_from_waiters_ms(lkb, ms);
3745 	if (error)
3746 		goto out;
3747 
3748 	/* this is the value returned from do_cancel() on the master */
3749 
3750 	switch (ms->m_result) {
3751 	case -DLM_ECANCEL:
3752 		receive_flags_reply(lkb, ms);
3753 		revert_lock_pc(r, lkb);
3754 		queue_cast(r, lkb, -DLM_ECANCEL);
3755 		break;
3756 	case 0:
3757 		break;
3758 	default:
3759 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3760 			  lkb->lkb_id, ms->m_result);
3761 	}
3762  out:
3763 	unlock_rsb(r);
3764 	put_rsb(r);
3765 }
3766 
3767 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3768 {
3769 	struct dlm_lkb *lkb;
3770 	int error;
3771 
3772 	error = find_lkb(ls, ms->m_remid, &lkb);
3773 	if (error) {
3774 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3775 			  ms->m_header.h_nodeid, ms->m_remid);
3776 		return;
3777 	}
3778 
3779 	_receive_cancel_reply(lkb, ms);
3780 	dlm_put_lkb(lkb);
3781 }
3782 
3783 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3784 {
3785 	struct dlm_lkb *lkb;
3786 	struct dlm_rsb *r;
3787 	int error, ret_nodeid;
3788 
3789 	error = find_lkb(ls, ms->m_lkid, &lkb);
3790 	if (error) {
3791 		log_error(ls, "receive_lookup_reply no lkb");
3792 		return;
3793 	}
3794 
3795 	/* ms->m_result is the value returned by dlm_dir_lookup on the dir node.
3796 	   FIXME: will a non-zero error ever be returned? */
3797 
3798 	r = lkb->lkb_resource;
3799 	hold_rsb(r);
3800 	lock_rsb(r);
3801 
3802 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3803 	if (error)
3804 		goto out;
3805 
3806 	ret_nodeid = ms->m_nodeid;
3807 	if (ret_nodeid == dlm_our_nodeid()) {
3808 		r->res_nodeid = 0;
3809 		ret_nodeid = 0;
3810 		r->res_first_lkid = 0;
3811 	} else {
3812 		/* set_master() will copy res_nodeid to lkb_nodeid */
3813 		r->res_nodeid = ret_nodeid;
3814 	}
3815 
3816 	if (is_overlap(lkb)) {
3817 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3818 			  lkb->lkb_id, lkb->lkb_flags);
3819 		queue_cast_overlap(r, lkb);
3820 		unhold_lkb(lkb); /* undoes create_lkb() */
3821 		goto out_list;
3822 	}
3823 
3824 	_request_lock(r, lkb);
3825 
3826  out_list:
3827 	if (!ret_nodeid)
3828 		process_lookup_list(r);
3829  out:
3830 	unlock_rsb(r);
3831 	put_rsb(r);
3832 	dlm_put_lkb(lkb);
3833 }
3834 
3835 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3836 {
3837 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3838 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3839 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3840 			  ms->m_remid, ms->m_result);
3841 		return;
3842 	}
3843 
3844 	switch (ms->m_type) {
3845 
3846 	/* messages sent to a master node */
3847 
3848 	case DLM_MSG_REQUEST:
3849 		receive_request(ls, ms);
3850 		break;
3851 
3852 	case DLM_MSG_CONVERT:
3853 		receive_convert(ls, ms);
3854 		break;
3855 
3856 	case DLM_MSG_UNLOCK:
3857 		receive_unlock(ls, ms);
3858 		break;
3859 
3860 	case DLM_MSG_CANCEL:
3861 		receive_cancel(ls, ms);
3862 		break;
3863 
3864 	/* messages sent from a master node (replies to above) */
3865 
3866 	case DLM_MSG_REQUEST_REPLY:
3867 		receive_request_reply(ls, ms);
3868 		break;
3869 
3870 	case DLM_MSG_CONVERT_REPLY:
3871 		receive_convert_reply(ls, ms);
3872 		break;
3873 
3874 	case DLM_MSG_UNLOCK_REPLY:
3875 		receive_unlock_reply(ls, ms);
3876 		break;
3877 
3878 	case DLM_MSG_CANCEL_REPLY:
3879 		receive_cancel_reply(ls, ms);
3880 		break;
3881 
3882 	/* messages sent from a master node (only two types of async msg) */
3883 
3884 	case DLM_MSG_GRANT:
3885 		receive_grant(ls, ms);
3886 		break;
3887 
3888 	case DLM_MSG_BAST:
3889 		receive_bast(ls, ms);
3890 		break;
3891 
3892 	/* messages sent to a dir node */
3893 
3894 	case DLM_MSG_LOOKUP:
3895 		receive_lookup(ls, ms);
3896 		break;
3897 
3898 	case DLM_MSG_REMOVE:
3899 		receive_remove(ls, ms);
3900 		break;
3901 
3902 	/* messages sent from a dir node (remove has no reply) */
3903 
3904 	case DLM_MSG_LOOKUP_REPLY:
3905 		receive_lookup_reply(ls, ms);
3906 		break;
3907 
3908 	/* other messages */
3909 
3910 	case DLM_MSG_PURGE:
3911 		receive_purge(ls, ms);
3912 		break;
3913 
3914 	default:
3915 		log_error(ls, "unknown message type %d", ms->m_type);
3916 	}
3917 
3918 	dlm_astd_wake();
3919 }
3920 
3921 /* If the lockspace is in recovery mode (locking stopped), then normal
3922    messages are saved on the requestqueue for processing after recovery is
3923    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3924    messages off the requestqueue before we process new ones.  This occurs
3925    right after recovery completes, when we transition from saving all
3926    messages on the requestqueue, to processing all the saved messages, to
3927    processing new messages as they arrive. */
3928 
3929 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3930 				int nodeid)
3931 {
3932 	if (dlm_locking_stopped(ls)) {
3933 		dlm_add_requestqueue(ls, nodeid, ms);
3934 	} else {
3935 		dlm_wait_requestqueue(ls);
3936 		_receive_message(ls, ms);
3937 	}
3938 }
3939 
3940 /* This is called by dlm_recoverd to process messages that were saved on
3941    the requestqueue. */
3942 
3943 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3944 {
3945 	_receive_message(ls, ms);
3946 }
3947 
3948 /* This is called by the midcomms layer when something is received for
3949    the lockspace.  It could be either a MSG (normal message sent as part of
3950    standard locking activity) or an RCOM (recovery message sent as part of
3951    lockspace recovery). */
3952 
3953 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3954 {
3955 	struct dlm_header *hd = &p->header;
3956 	struct dlm_ls *ls;
3957 	int type = 0;
3958 
3959 	switch (hd->h_cmd) {
3960 	case DLM_MSG:
3961 		dlm_message_in(&p->message);
3962 		type = p->message.m_type;
3963 		break;
3964 	case DLM_RCOM:
3965 		dlm_rcom_in(&p->rcom);
3966 		type = p->rcom.rc_type;
3967 		break;
3968 	default:
3969 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3970 		return;
3971 	}
3972 
3973 	if (hd->h_nodeid != nodeid) {
3974 		log_print("invalid h_nodeid %d from %d lockspace %x",
3975 			  hd->h_nodeid, nodeid, hd->h_lockspace);
3976 		return;
3977 	}
3978 
3979 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3980 	if (!ls) {
3981 		if (dlm_config.ci_log_debug)
3982 			log_print("invalid lockspace %x from %d cmd %d type %d",
3983 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
3984 
3985 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3986 			dlm_send_ls_not_ready(nodeid, &p->rcom);
3987 		return;
3988 	}
3989 
3990 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3991 	   be inactive (in this ls) before transitioning to recovery mode */
3992 
3993 	down_read(&ls->ls_recv_active);
3994 	if (hd->h_cmd == DLM_MSG)
3995 		dlm_receive_message(ls, &p->message, nodeid);
3996 	else
3997 		dlm_receive_rcom(ls, &p->rcom, nodeid);
3998 	up_read(&ls->ls_recv_active);
3999 
4000 	dlm_put_lockspace(ls);
4001 }
4002 
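/* A conversion in progress on a failed master can't simply be resent, since
   we don't know whether the master granted it before dying.  For a "middle"
   conversion, fake an -EINPROGRESS convert reply from the dead node using
   the lockspace's stub message and mark the rsb RECOVER_CONVERT so the real
   granted mode can be sorted out once all locks are rebuilt.  Up-conversions
   are just flagged to be resent. */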
4003 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4004 {
4005 	if (middle_conversion(lkb)) {
4006 		hold_lkb(lkb);
4007 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4008 		ls->ls_stub_ms.m_result = -EINPROGRESS;
4009 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4010 		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4011 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
4012 
4013 		/* Same special case as in receive_rcom_lock_args() */
4014 		lkb->lkb_grmode = DLM_LOCK_IV;
4015 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4016 		unhold_lkb(lkb);
4017 
4018 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4019 		lkb->lkb_flags |= DLM_IFL_RESEND;
4020 	}
4021 
4022 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4023 	   conversions are async; there's no reply from the remote master */
4024 }
4025 
4026 /* A waiting lkb needs recovery if the master node has failed, or
4027    the master node is changing (only when no directory is used) */
4028 
4029 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4030 {
4031 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
4032 		return 1;
4033 
4034 	if (!dlm_no_directory(ls))
4035 		return 0;
4036 
4037 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4038 		return 1;
4039 
4040 	return 0;
4041 }
4042 
4043 /* Recovery for locks that are waiting for replies from nodes that are now
4044    gone.  We can just complete unlocks and cancels by faking a reply from the
4045    dead node.  Requests and up-conversions we flag to be resent after
4046    recovery.  Down-conversions can just be completed with a fake reply like
4047    unlocks.  Conversions between PR and CW need special attention. */
4048 
4049 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4050 {
4051 	struct dlm_lkb *lkb, *safe;
4052 	int wait_type, stub_unlock_result, stub_cancel_result;
4053 
4054 	mutex_lock(&ls->ls_waiters_mutex);
4055 
4056 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4057 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4058 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4059 
4060 		/* all outstanding lookups, regardless of destination, will be
4061 		   resent after recovery is done */
4062 
4063 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4064 			lkb->lkb_flags |= DLM_IFL_RESEND;
4065 			continue;
4066 		}
4067 
4068 		if (!waiter_needs_recovery(ls, lkb))
4069 			continue;
4070 
4071 		wait_type = lkb->lkb_wait_type;
4072 		stub_unlock_result = -DLM_EUNLOCK;
4073 		stub_cancel_result = -DLM_ECANCEL;
4074 
4075 		/* The main reply may have been received, leaving a zero
4076 		   wait_type, but a reply for the overlapping op may not have
4077 		   been received.  In that case we need to fake the appropriate
4078 		   reply for the overlap op. */
4079 
4080 		if (!wait_type) {
4081 			if (is_overlap_cancel(lkb)) {
4082 				wait_type = DLM_MSG_CANCEL;
4083 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4084 					stub_cancel_result = 0;
4085 			}
4086 			if (is_overlap_unlock(lkb)) {
4087 				wait_type = DLM_MSG_UNLOCK;
4088 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4089 					stub_unlock_result = -ENOENT;
4090 			}
4091 
4092 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
4093 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
4094 				  stub_cancel_result, stub_unlock_result);
4095 		}
4096 
4097 		switch (wait_type) {
4098 
4099 		case DLM_MSG_REQUEST:
4100 			lkb->lkb_flags |= DLM_IFL_RESEND;
4101 			break;
4102 
4103 		case DLM_MSG_CONVERT:
4104 			recover_convert_waiter(ls, lkb);
4105 			break;
4106 
4107 		case DLM_MSG_UNLOCK:
4108 			hold_lkb(lkb);
4109 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4110 			ls->ls_stub_ms.m_result = stub_unlock_result;
4111 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4112 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4113 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
4114 			dlm_put_lkb(lkb);
4115 			break;
4116 
4117 		case DLM_MSG_CANCEL:
4118 			hold_lkb(lkb);
4119 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4120 			ls->ls_stub_ms.m_result = stub_cancel_result;
4121 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4122 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4123 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
4124 			dlm_put_lkb(lkb);
4125 			break;
4126 
4127 		default:
4128 			log_error(ls, "invalid lkb wait_type %d %d",
4129 				  lkb->lkb_wait_type, wait_type);
4130 		}
4131 		schedule();
4132 	}
4133 	mutex_unlock(&ls->ls_waiters_mutex);
4134 }
4135 
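/* Find the first lkb on the waiters list that _pre marked RESEND, taking a
   reference on it under ls_waiters_mutex.  Returns NULL when none remain. */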
4136 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4137 {
4138 	struct dlm_lkb *lkb;
4139 	int found = 0;
4140 
4141 	mutex_lock(&ls->ls_waiters_mutex);
4142 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4143 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4144 			hold_lkb(lkb);
4145 			found = 1;
4146 			break;
4147 		}
4148 	}
4149 	mutex_unlock(&ls->ls_waiters_mutex);
4150 
4151 	if (!found)
4152 		lkb = NULL;
4153 	return lkb;
4154 }
4155 
4156 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4157    master or dir-node for r.  Processing the lkb may result in it being placed
4158    back on the waiters list. */
4159 
4160 /* We do this after normal locking has been enabled and any saved messages
4161    (in requestqueue) have been processed.  We should be confident that at
4162    this point we won't get or process a reply to any of these waiting
4163    operations.  But, new ops may be coming in on the rsbs/locks here from
4164    userspace or remotely. */
4165 
4166 /* There may have been an overlap unlock/cancel prior to recovery or after
4167    recovery.  If before, the lkb may still have a positive wait_count; if
4168    after, the overlap flag would just have been set and nothing new sent.
4169    We can be confident here that any replies to either the initial op or
4170    overlap ops prior to recovery have been received. */
4171 
4172 int dlm_recover_waiters_post(struct dlm_ls *ls)
4173 {
4174 	struct dlm_lkb *lkb;
4175 	struct dlm_rsb *r;
4176 	int error = 0, mstype, err, oc, ou;
4177 
4178 	while (1) {
4179 		if (dlm_locking_stopped(ls)) {
4180 			log_debug(ls, "recover_waiters_post aborted");
4181 			error = -EINTR;
4182 			break;
4183 		}
4184 
4185 		lkb = find_resend_waiter(ls);
4186 		if (!lkb)
4187 			break;
4188 
4189 		r = lkb->lkb_resource;
4190 		hold_rsb(r);
4191 		lock_rsb(r);
4192 
4193 		mstype = lkb->lkb_wait_type;
4194 		oc = is_overlap_cancel(lkb);
4195 		ou = is_overlap_unlock(lkb);
4196 		err = 0;
4197 
4198 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4199 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4200 
4201 		/* At this point we assume that we won't get a reply to any
4202 		   previous op or overlap op on this lock.  First, do a big
4203 		   remove_from_waiters() for all previous ops. */
4204 
4205 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4206 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4207 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4208 		lkb->lkb_wait_type = 0;
4209 		lkb->lkb_wait_count = 0;
4210 		mutex_lock(&ls->ls_waiters_mutex);
4211 		list_del_init(&lkb->lkb_wait_reply);
4212 		mutex_unlock(&ls->ls_waiters_mutex);
4213 		unhold_lkb(lkb); /* for waiters list */
4214 
4215 		if (oc || ou) {
4216 			/* do an unlock or cancel instead of resending */
4217 			switch (mstype) {
4218 			case DLM_MSG_LOOKUP:
4219 			case DLM_MSG_REQUEST:
4220 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4221 							-DLM_ECANCEL);
4222 				unhold_lkb(lkb); /* undoes create_lkb() */
4223 				break;
4224 			case DLM_MSG_CONVERT:
4225 				if (oc) {
4226 					queue_cast(r, lkb, -DLM_ECANCEL);
4227 				} else {
4228 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4229 					_unlock_lock(r, lkb);
4230 				}
4231 				break;
4232 			default:
4233 				err = 1;
4234 			}
4235 		} else {
4236 			switch (mstype) {
4237 			case DLM_MSG_LOOKUP:
4238 			case DLM_MSG_REQUEST:
4239 				_request_lock(r, lkb);
4240 				if (is_master(r))
4241 					confirm_master(r, 0);
4242 				break;
4243 			case DLM_MSG_CONVERT:
4244 				_convert_lock(r, lkb);
4245 				break;
4246 			default:
4247 				err = 1;
4248 			}
4249 		}
4250 
4251 		if (err)
4252 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4253 			  	  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4254 		unlock_rsb(r);
4255 		put_rsb(r);
4256 		dlm_put_lkb(lkb);
4257 	}
4258 
4259 	return error;
4260 }
4261 
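/* Remove from one rsb queue every lkb for which test() returns true,
   dropping the queue's reference on each.  RSB_LOCKS_PURGED is set on the
   rsb so dlm_grant_after_purge() will revisit it. */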
4262 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4263 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4264 {
4265 	struct dlm_ls *ls = r->res_ls;
4266 	struct dlm_lkb *lkb, *safe;
4267 
4268 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4269 		if (test(ls, lkb)) {
4270 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4271 			del_lkb(r, lkb);
4272 			/* this put should free the lkb */
4273 			if (!dlm_put_lkb(lkb))
4274 				log_error(ls, "purged lkb not released");
4275 		}
4276 	}
4277 }
4278 
4279 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4280 {
4281 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4282 }
4283 
4284 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4285 {
4286 	return is_master_copy(lkb);
4287 }
4288 
4289 static void purge_dead_locks(struct dlm_rsb *r)
4290 {
4291 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4292 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4293 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4294 }
4295 
4296 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4297 {
4298 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4299 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4300 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4301 }
4302 
4303 /* Get rid of locks held by nodes that are gone. */
4304 
4305 int dlm_purge_locks(struct dlm_ls *ls)
4306 {
4307 	struct dlm_rsb *r;
4308 
4309 	log_debug(ls, "dlm_purge_locks");
4310 
4311 	down_write(&ls->ls_root_sem);
4312 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4313 		hold_rsb(r);
4314 		lock_rsb(r);
4315 		if (is_master(r))
4316 			purge_dead_locks(r);
4317 		unlock_rsb(r);
4318 		unhold_rsb(r);
4319 
4320 		schedule();
4321 	}
4322 	up_write(&ls->ls_root_sem);
4323 
4324 	return 0;
4325 }
4326 
4327 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4328 {
4329 	struct dlm_rsb *r, *r_ret = NULL;
4330 
4331 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4332 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4333 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4334 			continue;
4335 		hold_rsb(r);
4336 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4337 		r_ret = r;
4338 		break;
4339 	}
4340 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4341 	return r_ret;
4342 }
4343 
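/* Scan every hash bucket for rsb's marked LOCKS_PURGED and, where we are
   the master, grant any locks that the purged locks were blocking. */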
4344 void dlm_grant_after_purge(struct dlm_ls *ls)
4345 {
4346 	struct dlm_rsb *r;
4347 	int bucket = 0;
4348 
4349 	while (1) {
4350 		r = find_purged_rsb(ls, bucket);
4351 		if (!r) {
4352 			if (bucket == ls->ls_rsbtbl_size - 1)
4353 				break;
4354 			bucket++;
4355 			continue;
4356 		}
4357 		lock_rsb(r);
4358 		if (is_master(r)) {
4359 			grant_pending_locks(r);
4360 			confirm_master(r, 0);
4361 		}
4362 		unlock_rsb(r);
4363 		put_rsb(r);
4364 		schedule();
4365 	}
4366 }
4367 
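/* Match an existing lkb by the owning node's id and that node's lkid,
   checking the grant, convert and wait queues in turn. */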
4368 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4369 					 uint32_t remid)
4370 {
4371 	struct dlm_lkb *lkb;
4372 
4373 	list_for_each_entry(lkb, head, lkb_statequeue) {
4374 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4375 			return lkb;
4376 	}
4377 	return NULL;
4378 }
4379 
4380 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4381 				    uint32_t remid)
4382 {
4383 	struct dlm_lkb *lkb;
4384 
4385 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4386 	if (lkb)
4387 		return lkb;
4388 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4389 	if (lkb)
4390 		return lkb;
4391 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4392 	if (lkb)
4393 		return lkb;
4394 	return NULL;
4395 }
4396 
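/* Fill in a new master-copy lkb from the rcom_lock that the lock holder
   sent during recovery; the on-wire rcom_lock fields are little endian. */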
4397 /* needs at least dlm_rcom + rcom_lock */
4398 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4399 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4400 {
4401 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4402 
4403 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4404 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4405 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4406 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4407 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4408 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4409 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4410 	lkb->lkb_rqmode = rl->rl_rqmode;
4411 	lkb->lkb_grmode = rl->rl_grmode;
4412 	/* don't set lkb_status because add_lkb wants to itself */
4413 	/* don't set lkb_status because add_lkb wants to set it itself */
4414 	lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4415 	lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4416 
4417 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4418 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4419 			 sizeof(struct rcom_lock);
4420 		if (lvblen > ls->ls_lvblen)
4421 			return -EINVAL;
4422 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4423 		if (!lkb->lkb_lvbptr)
4424 			return -ENOMEM;
4425 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4426 	}
4427 
4428 	/* Conversions between PR and CW (middle modes) need special handling.
4429 	   The real granted mode of these converting locks cannot be determined
4430 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4431 
4432 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4433 	    middle_conversion(lkb)) {
4434 		rl->rl_status = DLM_LKSTS_CONVERT;
4435 		lkb->lkb_grmode = DLM_LOCK_IV;
4436 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4437 	}
4438 
4439 	return 0;
4440 }
4441 
4442 /* This lkb may have been recovered in a previously aborted recovery, so we
4443    need to check if the rsb already has an lkb with the given remote
4444    nodeid/lkid.  If so we just send back a standard reply.  If not, we create
4445    a new lkb with the given values and send back our lkid by returning the
4446    rcom_lock struct we got, but with the remid field filled in. */
4447 
4448 /* needs at least dlm_rcom + rcom_lock */
4449 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4450 {
4451 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4452 	struct dlm_rsb *r;
4453 	struct dlm_lkb *lkb;
4454 	int error;
4455 
4456 	if (rl->rl_parent_lkid) {
4457 		error = -EOPNOTSUPP;
4458 		goto out;
4459 	}
4460 
4461 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4462 			 R_MASTER, &r);
4463 	if (error)
4464 		goto out;
4465 
4466 	lock_rsb(r);
4467 
4468 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4469 	if (lkb) {
4470 		error = -EEXIST;
4471 		goto out_remid;
4472 	}
4473 
4474 	error = create_lkb(ls, &lkb);
4475 	if (error)
4476 		goto out_unlock;
4477 
4478 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4479 	if (error) {
4480 		__put_lkb(ls, lkb);
4481 		goto out_unlock;
4482 	}
4483 
4484 	attach_lkb(r, lkb);
4485 	add_lkb(r, lkb, rl->rl_status);
4486 	error = 0;
4487 
4488  out_remid:
4489 	/* this is the new value returned to the lock holder for
4490 	   saving in its process-copy lkb */
4491 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4492 
4493  out_unlock:
4494 	unlock_rsb(r);
4495 	put_rsb(r);
4496  out:
4497 	if (error)
4498 		log_debug(ls, "recover_master_copy %d %x", error,
4499 			  le32_to_cpu(rl->rl_lkid));
4500 	rl->rl_result = cpu_to_le32(error);
4501 	return error;
4502 }
4503 
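/* Process the master's reply to the rcom lock we sent in dlm_recover_locks():
   save the master's lkid (remid) in our process-copy lkb, resend the lock if
   the new master wasn't ready for it, and ack the lock as recovered. */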
4504 /* needs at least dlm_rcom + rcom_lock */
4505 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4506 {
4507 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4508 	struct dlm_rsb *r;
4509 	struct dlm_lkb *lkb;
4510 	int error;
4511 
4512 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4513 	if (error) {
4514 		log_error(ls, "recover_process_copy no lkid %x",
4515 				le32_to_cpu(rl->rl_lkid));
4516 		return error;
4517 	}
4518 
4519 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4520 
4521 	error = le32_to_cpu(rl->rl_result);
4522 
4523 	r = lkb->lkb_resource;
4524 	hold_rsb(r);
4525 	lock_rsb(r);
4526 
4527 	switch (error) {
4528 	case -EBADR:
4529 		/* There's a chance the new master received our lock before
4530 		   dlm_recover_master_reply(); this wouldn't happen if we did
4531 		   a barrier between recover_masters and recover_locks. */
4532 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4533 			  (unsigned long)r, r->res_name);
4534 		dlm_send_rcom_lock(r, lkb);
4535 		goto out;
4536 	case -EEXIST:
4537 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4538 		/* fall through */
4539 	case 0:
4540 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4541 		break;
4542 	default:
4543 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4544 			  error, lkb->lkb_id);
4545 	}
4546 
4547 	/* an ack for dlm_recover_locks(), which waits for replies for
4548 	   all the locks it sends to new masters */
4549 	dlm_recovered_lock(r);
4550  out:
4551 	unlock_rsb(r);
4552 	put_rsb(r);
4553 	dlm_put_lkb(lkb);
4554 
4555 	return 0;
4556 }
4557 
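/* The dlm_user_* functions below implement the lock operations requested
   from userspace through the dlm device (see user.c). */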
4558 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4559 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4560 		     unsigned long timeout_cs)
4561 {
4562 	struct dlm_lkb *lkb;
4563 	struct dlm_args args;
4564 	int error;
4565 
4566 	dlm_lock_recovery(ls);
4567 
4568 	error = create_lkb(ls, &lkb);
4569 	if (error) {
4570 		kfree(ua);
4571 		goto out;
4572 	}
4573 
4574 	if (flags & DLM_LKF_VALBLK) {
4575 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4576 		if (!ua->lksb.sb_lvbptr) {
4577 			kfree(ua);
4578 			__put_lkb(ls, lkb);
4579 			error = -ENOMEM;
4580 			goto out;
4581 		}
4582 	}
4583 
4584 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4585 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4586 	   lock and that lkb_astparam is the dlm_user_args structure. */
4587 
4588 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4589 			      fake_astfn, ua, fake_bastfn, &args);
4590 	lkb->lkb_flags |= DLM_IFL_USER;
4591 	ua->old_mode = DLM_LOCK_IV;
4592 
4593 	if (error) {
4594 		__put_lkb(ls, lkb);
4595 		goto out;
4596 	}
4597 
4598 	error = request_lock(ls, lkb, name, namelen, &args);
4599 
4600 	switch (error) {
4601 	case 0:
4602 		break;
4603 	case -EINPROGRESS:
4604 		error = 0;
4605 		break;
4606 	case -EAGAIN:
4607 		error = 0;
4608 		/* fall through */
4609 	default:
4610 		__put_lkb(ls, lkb);
4611 		goto out;
4612 	}
4613 
4614 	/* add this new lkb to the per-process list of locks */
4615 	spin_lock(&ua->proc->locks_spin);
4616 	hold_lkb(lkb);
4617 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4618 	spin_unlock(&ua->proc->locks_spin);
4619  out:
4620 	dlm_unlock_recovery(ls);
4621 	return error;
4622 }
4623 
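/* Convert an existing userspace lock to a new mode, refreshing the caller's
   callback parameters, lksb and lvb from the new request. */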
4624 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4625 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4626 		     unsigned long timeout_cs)
4627 {
4628 	struct dlm_lkb *lkb;
4629 	struct dlm_args args;
4630 	struct dlm_user_args *ua;
4631 	int error;
4632 
4633 	dlm_lock_recovery(ls);
4634 
4635 	error = find_lkb(ls, lkid, &lkb);
4636 	if (error)
4637 		goto out;
4638 
4639 	/* The user can change the params on its lock when converting it, or
4640 	   add an lvb that didn't exist before */
4641 
4642 	ua = lkb->lkb_ua;
4643 
4644 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4645 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4646 		if (!ua->lksb.sb_lvbptr) {
4647 			error = -ENOMEM;
4648 			goto out_put;
4649 		}
4650 	}
4651 	if (lvb_in && ua->lksb.sb_lvbptr)
4652 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4653 
4654 	ua->xid = ua_tmp->xid;
4655 	ua->castparam = ua_tmp->castparam;
4656 	ua->castaddr = ua_tmp->castaddr;
4657 	ua->bastparam = ua_tmp->bastparam;
4658 	ua->bastaddr = ua_tmp->bastaddr;
4659 	ua->user_lksb = ua_tmp->user_lksb;
4660 	ua->old_mode = lkb->lkb_grmode;
4661 
4662 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4663 			      fake_astfn, ua, fake_bastfn, &args);
4664 	if (error)
4665 		goto out_put;
4666 
4667 	error = convert_lock(ls, lkb, &args);
4668 
4669 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4670 		error = 0;
4671  out_put:
4672 	dlm_put_lkb(lkb);
4673  out:
4674 	dlm_unlock_recovery(ls);
4675 	kfree(ua_tmp);
4676 	return error;
4677 }
4678 
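/* Unlock a userspace lock.  On success the lkb is moved from the per-process
   locks list to the unlocking list until its final ast is delivered. */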
4679 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4680 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4681 {
4682 	struct dlm_lkb *lkb;
4683 	struct dlm_args args;
4684 	struct dlm_user_args *ua;
4685 	int error;
4686 
4687 	dlm_lock_recovery(ls);
4688 
4689 	error = find_lkb(ls, lkid, &lkb);
4690 	if (error)
4691 		goto out;
4692 
4693 	ua = lkb->lkb_ua;
4694 
4695 	if (lvb_in && ua->lksb.sb_lvbptr)
4696 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4697 	if (ua_tmp->castparam)
4698 		ua->castparam = ua_tmp->castparam;
4699 	ua->user_lksb = ua_tmp->user_lksb;
4700 
4701 	error = set_unlock_args(flags, ua, &args);
4702 	if (error)
4703 		goto out_put;
4704 
4705 	error = unlock_lock(ls, lkb, &args);
4706 
4707 	if (error == -DLM_EUNLOCK)
4708 		error = 0;
4709 	/* from validate_unlock_args() */
4710 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4711 		error = 0;
4712 	if (error)
4713 		goto out_put;
4714 
4715 	spin_lock(&ua->proc->locks_spin);
4716 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4717 	if (!list_empty(&lkb->lkb_ownqueue))
4718 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4719 	spin_unlock(&ua->proc->locks_spin);
4720  out_put:
4721 	dlm_put_lkb(lkb);
4722  out:
4723 	dlm_unlock_recovery(ls);
4724 	kfree(ua_tmp);
4725 	return error;
4726 }
4727 
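/* Cancel an in-progress userspace request or conversion.  -DLM_ECANCEL and
   -EBUSY (from validate_unlock_args) are normal outcomes, not errors. */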
4728 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4729 		    uint32_t flags, uint32_t lkid)
4730 {
4731 	struct dlm_lkb *lkb;
4732 	struct dlm_args args;
4733 	struct dlm_user_args *ua;
4734 	int error;
4735 
4736 	dlm_lock_recovery(ls);
4737 
4738 	error = find_lkb(ls, lkid, &lkb);
4739 	if (error)
4740 		goto out;
4741 
4742 	ua = lkb->lkb_ua;
4743 	if (ua_tmp->castparam)
4744 		ua->castparam = ua_tmp->castparam;
4745 	ua->user_lksb = ua_tmp->user_lksb;
4746 
4747 	error = set_unlock_args(flags, ua, &args);
4748 	if (error)
4749 		goto out_put;
4750 
4751 	error = cancel_lock(ls, lkb, &args);
4752 
4753 	if (error == -DLM_ECANCEL)
4754 		error = 0;
4755 	/* from validate_unlock_args() */
4756 	if (error == -EBUSY)
4757 		error = 0;
4758  out_put:
4759 	dlm_put_lkb(lkb);
4760  out:
4761 	dlm_unlock_recovery(ls);
4762 	kfree(ua_tmp);
4763 	return error;
4764 }
4765 
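/* Cancel a lock to resolve a deadlock detected in userspace.  Same as
   dlm_user_cancel() except DLM_IFL_DEADLOCK_CANCEL is set once the rsb is
   locked, marking the cancel as deadlock-induced. */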
4766 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4767 {
4768 	struct dlm_lkb *lkb;
4769 	struct dlm_args args;
4770 	struct dlm_user_args *ua;
4771 	struct dlm_rsb *r;
4772 	int error;
4773 
4774 	dlm_lock_recovery(ls);
4775 
4776 	error = find_lkb(ls, lkid, &lkb);
4777 	if (error)
4778 		goto out;
4779 
4780 	ua = lkb->lkb_ua;
4781 
4782 	error = set_unlock_args(flags, ua, &args);
4783 	if (error)
4784 		goto out_put;
4785 
4786 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4787 
4788 	r = lkb->lkb_resource;
4789 	hold_rsb(r);
4790 	lock_rsb(r);
4791 
4792 	error = validate_unlock_args(lkb, &args);
4793 	if (error)
4794 		goto out_r;
4795 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4796 
4797 	error = _cancel_lock(r, lkb);
4798  out_r:
4799 	unlock_rsb(r);
4800 	put_rsb(r);
4801 
4802 	if (error == -DLM_ECANCEL)
4803 		error = 0;
4804 	/* from validate_unlock_args() */
4805 	if (error == -EBUSY)
4806 		error = 0;
4807  out_put:
4808 	dlm_put_lkb(lkb);
4809  out:
4810 	dlm_unlock_recovery(ls);
4811 	return error;
4812 }
4813 
4814 /* lkb's that are removed from the waiters list by revert are just left on the
4815    orphans list with the granted orphan locks, to be freed by purge */
4816 
4817 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4818 {
4819 	struct dlm_args args;
4820 	int error;
4821 
4822 	hold_lkb(lkb);
4823 	mutex_lock(&ls->ls_orphans_mutex);
4824 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4825 	mutex_unlock(&ls->ls_orphans_mutex);
4826 
4827 	set_unlock_args(0, lkb->lkb_ua, &args);
4828 
4829 	error = cancel_lock(ls, lkb, &args);
4830 	if (error == -DLM_ECANCEL)
4831 		error = 0;
4832 	return error;
4833 }
4834 
4835 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4836    Regardless of what rsb queue the lock is on, it's removed and freed. */
4837 
4838 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4839 {
4840 	struct dlm_args args;
4841 	int error;
4842 
4843 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4844 
4845 	error = unlock_lock(ls, lkb, &args);
4846 	if (error == -DLM_EUNLOCK)
4847 		error = 0;
4848 	return error;
4849 }
4850 
4851 /* We have to release the clear_proc_locks mutex before calling
4852    unlock_proc_lock() (which does lock_rsb), to avoid deadlock with receiving
4853    a message that does lock_rsb followed by dlm_user_add_ast(). */
4854 
4855 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4856 				     struct dlm_user_proc *proc)
4857 {
4858 	struct dlm_lkb *lkb = NULL;
4859 
4860 	mutex_lock(&ls->ls_clear_proc_locks);
4861 	if (list_empty(&proc->locks))
4862 		goto out;
4863 
4864 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4865 	list_del_init(&lkb->lkb_ownqueue);
4866 
4867 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4868 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4869 	else
4870 		lkb->lkb_flags |= DLM_IFL_DEAD;
4871  out:
4872 	mutex_unlock(&ls->ls_clear_proc_locks);
4873 	return lkb;
4874 }
4875 
4876 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts(), which
4877    1) references lkb->ua, which we free here, and 2) adds lkbs to proc->asts,
4878    which we clear here. */
4879 
4880 /* The proc CLOSING flag is set, so no more device_reads should look at the
4881    proc->asts list, and no more device_writes should add lkb's to the
4882    proc->locks list; so we shouldn't need to take asts_spin or locks_spin
4883    here.  This assumes that device reads/writes/closes are serialized.
4884    FIXME: we may need to serialize them ourselves. */
4885 
4886 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4887 {
4888 	struct dlm_lkb *lkb, *safe;
4889 
4890 	dlm_lock_recovery(ls);
4891 
4892 	while (1) {
4893 		lkb = del_proc_lock(ls, proc);
4894 		if (!lkb)
4895 			break;
4896 		del_timeout(lkb);
4897 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4898 			orphan_proc_lock(ls, lkb);
4899 		else
4900 			unlock_proc_lock(ls, lkb);
4901 
4902 		/* this removes the reference for the proc->locks list
4903 		   added by dlm_user_request; it may result in the lkb
4904 		   being freed */
4905 
4906 		dlm_put_lkb(lkb);
4907 	}
4908 
4909 	mutex_lock(&ls->ls_clear_proc_locks);
4910 
4911 	/* in-progress unlocks */
4912 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4913 		list_del_init(&lkb->lkb_ownqueue);
4914 		lkb->lkb_flags |= DLM_IFL_DEAD;
4915 		dlm_put_lkb(lkb);
4916 	}
4917 
4918 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4919 		lkb->lkb_ast_type = 0;
4920 		list_del(&lkb->lkb_astqueue);
4921 		dlm_put_lkb(lkb);
4922 	}
4923 
4924 	mutex_unlock(&ls->ls_clear_proc_locks);
4925 	dlm_unlock_recovery(ls);
4926 }
4927 
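/* Like dlm_clear_proc_locks() but for a process purging its own locks while
   still running: each lock is force-unlocked and the per-proc lists are
   emptied under the proc spinlocks. */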
4928 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4929 {
4930 	struct dlm_lkb *lkb, *safe;
4931 
4932 	while (1) {
4933 		lkb = NULL;
4934 		spin_lock(&proc->locks_spin);
4935 		if (!list_empty(&proc->locks)) {
4936 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4937 					 lkb_ownqueue);
4938 			list_del_init(&lkb->lkb_ownqueue);
4939 		}
4940 		spin_unlock(&proc->locks_spin);
4941 
4942 		if (!lkb)
4943 			break;
4944 
4945 		lkb->lkb_flags |= DLM_IFL_DEAD;
4946 		unlock_proc_lock(ls, lkb);
4947 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4948 	}
4949 
4950 	spin_lock(&proc->locks_spin);
4951 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4952 		list_del_init(&lkb->lkb_ownqueue);
4953 		lkb->lkb_flags |= DLM_IFL_DEAD;
4954 		dlm_put_lkb(lkb);
4955 	}
4956 	spin_unlock(&proc->locks_spin);
4957 
4958 	spin_lock(&proc->asts_spin);
4959 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4960 		list_del(&lkb->lkb_astqueue);
4961 		dlm_put_lkb(lkb);
4962 	}
4963 	spin_unlock(&proc->asts_spin);
4964 }
4965 
4966 /* pid of 0 means purge all orphans */
4967 
4968 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4969 {
4970 	struct dlm_lkb *lkb, *safe;
4971 
4972 	mutex_lock(&ls->ls_orphans_mutex);
4973 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4974 		if (pid && lkb->lkb_ownpid != pid)
4975 			continue;
4976 		unlock_proc_lock(ls, lkb);
4977 		list_del_init(&lkb->lkb_ownqueue);
4978 		dlm_put_lkb(lkb);
4979 	}
4980 	mutex_unlock(&ls->ls_orphans_mutex);
4981 }
4982 
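/* Ask the node where orphans are held to purge them for the given pid; the
   message is received as DLM_MSG_PURGE and handled there by do_purge(). */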
4983 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4984 {
4985 	struct dlm_message *ms;
4986 	struct dlm_mhandle *mh;
4987 	int error;
4988 
4989 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4990 				DLM_MSG_PURGE, &ms, &mh);
4991 	if (error)
4992 		return error;
4993 	ms->m_nodeid = nodeid;
4994 	ms->m_pid = pid;
4995 
4996 	return send_message(mh, ms);
4997 }
4998 
4999 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5000 		   int nodeid, int pid)
5001 {
5002 	int error = 0;
5003 
5004 	if (nodeid != dlm_our_nodeid()) {
5005 		error = send_purge(ls, nodeid, pid);
5006 	} else {
5007 		dlm_lock_recovery(ls);
5008 		if (pid == current->pid)
5009 			purge_proc_locks(ls, proc);
5010 		else
5011 			do_purge(ls, nodeid, pid);
5012 		dlm_unlock_recovery(ls);
5013 	}
5014 	return error;
5015 }
5016 
5017