/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/* Central locking logic has four stages:

   dlm_lock()
   dlm_unlock()

   request_lock(ls, lkb)
   convert_lock(ls, lkb)
   unlock_lock(ls, lkb)
   cancel_lock(ls, lkb)

   _request_lock(r, lkb)
   _convert_lock(r, lkb)
   _unlock_lock(r, lkb)
   _cancel_lock(r, lkb)

   do_request(r, lkb)
   do_convert(r, lkb)
   do_unlock(r, lkb)
   do_cancel(r, lkb)

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

       dlm_lock          = request_lock
       dlm_lock+CONVERT  = convert_lock
       dlm_unlock        = unlock_lock
       dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
                                   R: do_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
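
/* Editor's sketch (illustrative, not part of this file): a caller drives the
   stages above through the two public entry points.  The lockspace handle
   ls, the callbacks and the resource name here are hypothetical.

	struct dlm_lksb lksb;
	int error;

	// new request -> request_lock()
	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
			 my_ast, my_arg, my_bast);

	// convert the lock just acquired -> convert_lock()
	error = dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, "myres", 5,
			 0, my_ast, my_arg, my_bast);

	// unlock -> unlock_lock(); adding DLM_LKF_CANCEL -> cancel_lock()
	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, my_arg);
*/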
#include <linux/types.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
#include "util.h"
#include "dir.h"
#include "member.h"
#include "lockspace.h"
#include "ast.h"
#include "lock.h"
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
#include "user.h"
#include "config.h"

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);

/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
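
/* Editor's note (illustrative): with PR granted on a resource, a new CW
   request conflicts while another PR does not:

	__dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_CW + 1] == 0
	__dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_PR + 1] == 1
*/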

/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
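
/* Editor's note (illustrative): converting down from PW to NL writes the
   caller's LVB into the resource, while converting up from NL to EX returns
   the resource's LVB to the caller:

	dlm_lvb_operations[DLM_LOCK_PW + 1][DLM_LOCK_NL + 1] == 0
	dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] == 1
*/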

#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}

/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};

void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type);
}

static void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}

void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}

/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}

static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
		return 1;
	return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}

static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}

static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL; if a
	   timeout caused the cancel, then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
		rv = -EDEADLK;
	}

	dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}

static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}

static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb)) {
		send_bast(r, lkb, rqmode);
	} else {
		dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
	}
}

/*
 * Basic operations on rsb's and lkb's
 */

static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = dlm_allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}

static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	*r_ret = NULL;
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}

static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}

static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	spin_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	spin_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}

/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 */

static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL, *tmp;
	uint32_t hash, bucket;
	int error = -EINVAL;

	if (namelen > DLM_RESNAME_MAXLEN)
		goto out;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	error = 0;
	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
		dlm_free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
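
/* Editor's sketch (illustrative, not called anywhere): typical find_rsb()
   usage pairs each successful lookup with a put_rsb() once the reference
   is no longer needed.  The resource name here is hypothetical.

	struct dlm_rsb *r;
	int error;

	error = find_rsb(ls, "myres", 5, R_CREATE, &r);
	if (!error) {
		// ... use r under lock_rsb(r)/unlock_rsb(r) ...
		put_rsb(r);
	}
*/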

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}

static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}

/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	spin_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}

/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}

static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}

/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = dlm_allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);
	INIT_LIST_HEAD(&lkb->lkb_astqueue);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}

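/* Editor's note: an lkid encodes its hash bucket in the top 16 bits and
   the per-bucket counter value in the low 16 bits, e.g. (illustrative)
   lkid 0x0003002a means bucket 3, counter 0x2a.  __find_lkb() below relies
   on this when it recovers the bucket with (lkid >> 16). */
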
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}

static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}

/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			dlm_free_lvb(lkb->lkb_lvbptr);
		dlm_free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}

/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}

static void lkb_add_ordered(struct list_head *new, struct list_head *head,
			    int mode)
{
	struct dlm_lkb *lkb = NULL;

	list_for_each_entry(lkb, head, lkb_statequeue)
		if (lkb->lkb_rqmode < mode)
			break;

	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
}

/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}

static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}
	return -1;
}

static int nodeid_warned(int nodeid, int num_nodes, int *warned)
{
	int i;

	for (i = 0; i < num_nodes; i++) {
		if (!warned[i]) {
			warned[i] = nodeid;
			return 0;
		}
		if (warned[i] == nodeid)
			return 1;
	}
	return 0;
}

void dlm_scan_waiters(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	ktime_t zero = ktime_set(0, 0);
	s64 us;
	s64 debug_maxus = 0;
	u32 debug_scanned = 0;
	u32 debug_expired = 0;
	int num_nodes = 0;
	int *warned = NULL;

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_equal(lkb->lkb_wait_time, zero))
			continue;

		debug_scanned++;

		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));

		if (us < dlm_config.ci_waitwarn_us)
			continue;

		lkb->lkb_wait_time = zero;

		debug_expired++;
		if (us > debug_maxus)
			debug_maxus = us;

		if (!num_nodes) {
			num_nodes = ls->ls_num_nodes;
			warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
		}
		if (!warned)
			continue;
		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
			continue;

		log_error(ls, "waitwarn %x %lld %d us check connection to "
			  "node %d", lkb->lkb_id, (long long)us,
			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
	}
	mutex_unlock(&ls->ls_waiters_mutex);

	kfree(warned);

	if (debug_expired)
		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
			  debug_scanned, debug_expired,
			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
}

/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_time = ktime_get();
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_count--;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
		  lkb->lkb_id, mstype, lkb->lkb_flags);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't received a
	   reply to the op that was in progress prior to the unlock/cancel;
	   we give up on any reply to the earlier op.  FIXME: not sure
	   when/how this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}

static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype, NULL);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type, ms);
	if (ms->m_flags != DLM_IFL_STUB_MS)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}

static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}

/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		spin_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			spin_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			dlm_free_rsb(r);
			count++;
		} else {
			spin_unlock(&ls->ls_rsbtbl[b].lock);
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}

void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		if (dlm_locking_stopped(ls))
			break;
		cond_resched();
	}
}

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	if (is_master_copy(lkb))
		return;

	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	mutex_lock(&ls->ls_timeout_mutex);
	if (!list_empty(&lkb->lkb_time_list)) {
		list_del_init(&lkb->lkb_time_list);
		unhold_lkb(lkb);
	}
	mutex_unlock(&ls->ls_timeout_mutex);
}

/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;
	s64 wait_us;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			wait_us = ktime_to_us(ktime_sub(ktime_get(),
							lkb->lkb_timestamp));

			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    wait_us >= (lkb->lkb_timeout_cs * 10000))
				do_cancel = 1;

			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}

/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

	ls->ls_recover_begin = 0;
	mutex_lock(&ls->ls_timeout_mutex);
	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
	mutex_unlock(&ls->ls_timeout_mutex);

	if (!dlm_config.ci_waitwarn_us)
		return;

	mutex_lock(&ls->ls_waiters_mutex);
	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
		if (ktime_to_us(lkb->lkb_wait_time))
			lkb->lkb_wait_time = ktime_get();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}

/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}

/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		if (len > DLM_RESNAME_MAXLEN)
			len = DLM_RESNAME_MAXLEN;
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}

/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

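/* Editor's note (illustrative): on the master node a successful unlock ends
   with remove_lock(); when the reply reaches the node that owns the lock,
   the same change is applied to its process copy with remove_lock_pc(),
   which skips the lvb update that only the master's rsb can perform. */
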
static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}

/* returns: 0 did nothing
	    1 moved lock to granted
	   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		break;
	case DLM_LKSTS_CONVERT:
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}

/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (is_master_copy(lkb))
		send_grant(r, lkb);
	else
		queue_cast(r, lkb, 0);
}

/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */

static void munge_demoted(struct dlm_lkb *lkb)
{
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}

static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
					   lkb_statequeue);
	if (lkb->lkb_id == first->lkb_id)
		return 1;

	return 0;
}

/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this;

	list_for_each_entry(this, head, lkb_statequeue) {
		if (this == lkb)
			continue;
		if (!modes_compat(this, lkb))
			return 1;
	}
	return 0;
}

/*
 * "A conversion deadlock arises with a pair of lock requests in the converting
 * queue for one resource.  The granted mode of each lock blocks the requested
 * mode of the other lock."
 *
 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
 * convert queue from being granted, then deadlk/demote lkb.
 *
 * Example:
 * Granted Queue: empty
 * Convert Queue: NL->EX (first lock)
 *                PR->EX (second lock)
 *
 * The first lock can't be granted because of the granted mode of the second
 * lock and the second lock can't be granted because it's not first in the
 * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
 * flag set and return DEMOTED in the lksb flags.
 *
 * Originally, this function detected conv-deadlk in a more limited scope:
 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
 * - if lkb1 was the first entry in the queue (not just earlier), and was
 *   blocked by the granted mode of lkb2, and there was nothing on the
 *   granted queue preventing lkb1 from being granted immediately, i.e.
 *   lkb2 was the only thing preventing lkb1 from being granted.
 *
 * That second condition meant we'd only say there was conv-deadlk if
 * resolving it (by demotion) would lead to the first lock on the convert
 * queue being granted right away.  It allowed conversion deadlocks to exist
 * between locks on the convert queue while they couldn't be granted anyway.
 *
 * Now, we detect and take action on conversion deadlocks immediately when
 * they're created, even if they may not be immediately consequential.  If
 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
 * mode that would prevent lkb1's conversion from being granted, we do a
 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
 * I think this means that the lkb_is_ahead condition below should always
 * be zero, i.e. there will never be conv-deadlk between two locks that are
 * both already on the convert queue.
 */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}

/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted.  In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues...  As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks.  This flag is not valid for conversion requests.
	 *
	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode.  We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1.  The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis.  This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion.  This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource.  Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order.  The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}

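/* Editor's note (worked example, illustrative): with EX granted to node A
   and a new EX request arriving from node B, _can_be_granted() fails the
   queue_conflict(&r->res_grantqueue, ...) check and the request is queued;
   when A unlocks, grant_pending_locks() finds B's request grantable via the
   6-4 rules above (the convert and wait queues ahead of it are empty). */
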
1713 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1714 			  int *err)
1715 {
1716 	int rv;
1717 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1718 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1719 
1720 	if (err)
1721 		*err = 0;
1722 
1723 	rv = _can_be_granted(r, lkb, now);
1724 	if (rv)
1725 		goto out;
1726 
1727 	/*
1728 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1729 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1730 	 * cancels one of the locks.
1731 	 */
1732 
1733 	if (is_convert && can_be_queued(lkb) &&
1734 	    conversion_deadlock_detect(r, lkb)) {
1735 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1736 			lkb->lkb_grmode = DLM_LOCK_NL;
1737 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1738 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1739 			if (err)
1740 				*err = -EDEADLK;
1741 			else {
1742 				log_print("can_be_granted deadlock %x now %d",
1743 					  lkb->lkb_id, now);
1744 				dlm_dump_rsb(r);
1745 			}
1746 		}
1747 		goto out;
1748 	}
1749 
1750 	/*
1751 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1752 	 * to grant a request in a mode other than the normal rqmode.  It's a
1753 	 * simple way to provide a big optimization to applications that can
1754 	 * use them.
1755 	 */
1756 
1757 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1758 		alt = DLM_LOCK_PR;
1759 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1760 		alt = DLM_LOCK_CW;
1761 
1762 	if (alt) {
1763 		lkb->lkb_rqmode = alt;
1764 		rv = _can_be_granted(r, lkb, now);
1765 		if (rv)
1766 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1767 		else
1768 			lkb->lkb_rqmode = rqmode;
1769 	}
1770  out:
1771 	return rv;
1772 }
1773 
1774 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1775    for locks pending on the convert list.  Once verified (watch for these
1776    log_prints), we should be able to just call _can_be_granted() and not
1777    bother with the demote/deadlk cases here (and there's no easy way to deal
1778    with a deadlk here, we'd have to generate something like grant_lock with
1779    the deadlk error.) */
1780 
1781 /* Returns the highest requested mode of all blocked conversions; sets
1782    cw if there's a blocked conversion to DLM_LOCK_CW. */
1783 
1784 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1785 {
1786 	struct dlm_lkb *lkb, *s;
1787 	int hi, demoted, quit, grant_restart, demote_restart;
1788 	int deadlk;
1789 
1790 	quit = 0;
1791  restart:
1792 	grant_restart = 0;
1793 	demote_restart = 0;
1794 	hi = DLM_LOCK_IV;
1795 
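	/* Granting one conversion may unblock others behind it, so
	   rescan the queue after any grant.  A CONVDEADLK demotion can
	   likewise make earlier entries grantable; "quit" limits
	   demote-driven rescans to one extra pass. */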
1796 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1797 		demoted = is_demoted(lkb);
1798 		deadlk = 0;
1799 
1800 		if (can_be_granted(r, lkb, 0, &deadlk)) {
1801 			grant_lock_pending(r, lkb);
1802 			grant_restart = 1;
1803 			continue;
1804 		}
1805 
1806 		if (!demoted && is_demoted(lkb)) {
1807 			log_print("WARN: pending demoted %x node %d %s",
1808 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1809 			demote_restart = 1;
1810 			continue;
1811 		}
1812 
1813 		if (deadlk) {
1814 			log_print("WARN: pending deadlock %x node %d %s",
1815 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1816 			dlm_dump_rsb(r);
1817 			continue;
1818 		}
1819 
1820 		hi = max_t(int, lkb->lkb_rqmode, hi);
1821 
1822 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1823 			*cw = 1;
1824 	}
1825 
1826 	if (grant_restart)
1827 		goto restart;
1828 	if (demote_restart && !quit) {
1829 		quit = 1;
1830 		goto restart;
1831 	}
1832 
1833 	return max_t(int, high, hi);
1834 }
1835 
1836 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1837 {
1838 	struct dlm_lkb *lkb, *s;
1839 
1840 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1841 		if (can_be_granted(r, lkb, 0, NULL))
1842 			grant_lock_pending(r, lkb);
1843 		else {
1844 			high = max_t(int, lkb->lkb_rqmode, high);
1845 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1846 				*cw = 1;
1847 		}
1848 	}
1849 
1850 	return high;
1851 }
1852 
1853 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1854    on either the convert or waiting queue.
1855    high is the largest rqmode of all locks blocked on the convert or
1856    waiting queue. */
1857 
1858 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1859 {
1860 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1861 		if (gr->lkb_highbast < DLM_LOCK_EX)
1862 			return 1;
1863 		return 0;
1864 	}
1865 
1866 	if (gr->lkb_highbast < high &&
1867 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1868 		return 1;
1869 	return 0;
1870 }
1871 
1872 static void grant_pending_locks(struct dlm_rsb *r)
1873 {
1874 	struct dlm_lkb *lkb, *s;
1875 	int high = DLM_LOCK_IV;
1876 	int cw = 0;
1877 
1878 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1879 
1880 	high = grant_pending_convert(r, high, &cw);
1881 	high = grant_pending_wait(r, high, &cw);
1882 
1883 	if (high == DLM_LOCK_IV)
1884 		return;
1885 
1886 	/*
1887 	 * If there are locks left on the wait/convert queue then send blocking
1888 	 * ASTs to granted locks based on the largest requested mode (high)
1889 	 * found above.
1890 	 */
1891 
1892 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1893 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1894 			if (cw && high == DLM_LOCK_PR &&
1895 			    lkb->lkb_grmode == DLM_LOCK_PR)
1896 				queue_bast(r, lkb, DLM_LOCK_CW);
1897 			else
1898 				queue_bast(r, lkb, high);
1899 			lkb->lkb_highbast = high;
1900 		}
1901 	}
1902 }
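
/*
 * Worked example (illustrative): a granted PR lock does not conflict
 * with a blocked PR request, but it does conflict with a blocked CW
 * request, even though CW ranks below PR in the mode ordering.  So
 * when high is PR and the cw flag is set, a granted PR holder is sent
 * a bast with mode CW (the mode that actually conflicts with it)
 * rather than a bast with mode PR, which it could ignore.
 */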
1903 
1904 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1905 {
1906 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1907 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1908 		if (gr->lkb_highbast < DLM_LOCK_EX)
1909 			return 1;
1910 		return 0;
1911 	}
1912 
1913 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1914 		return 1;
1915 	return 0;
1916 }
1917 
1918 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1919 			    struct dlm_lkb *lkb)
1920 {
1921 	struct dlm_lkb *gr;
1922 
1923 	list_for_each_entry(gr, head, lkb_statequeue) {
1924 		/* skip self when sending basts to convertqueue */
1925 		if (gr == lkb)
1926 			continue;
1927 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1928 			queue_bast(r, gr, lkb->lkb_rqmode);
1929 			gr->lkb_highbast = lkb->lkb_rqmode;
1930 		}
1931 	}
1932 }
1933 
1934 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1935 {
1936 	send_bast_queue(r, &r->res_grantqueue, lkb);
1937 }
1938 
1939 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1940 {
1941 	send_bast_queue(r, &r->res_grantqueue, lkb);
1942 	send_bast_queue(r, &r->res_convertqueue, lkb);
1943 }
1944 
1945 /* set_master(r, lkb) -- set the master nodeid of a resource
1946 
1947    The purpose of this function is to set the nodeid field in the given
1948    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1949    known, it can just be copied to the lkb and the function will return
1950    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1951    before it can be copied to the lkb.
1952 
1953    When the rsb nodeid is being looked up remotely, the initial lkb
1954    causing the lookup is kept on the ls_waiters list waiting for the
1955    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1956    on the rsb's res_lookup list until the master is verified.
1957 
1958    Return values:
1959    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1960    1: the rsb master is not available and the lkb has been placed on
1961       a wait queue
1962 */
1963 
1964 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1965 {
1966 	struct dlm_ls *ls = r->res_ls;
1967 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1968 
1969 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1970 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1971 		r->res_first_lkid = lkb->lkb_id;
1972 		lkb->lkb_nodeid = r->res_nodeid;
1973 		return 0;
1974 	}
1975 
1976 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1977 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1978 		return 1;
1979 	}
1980 
1981 	if (r->res_nodeid == 0) {
1982 		lkb->lkb_nodeid = 0;
1983 		return 0;
1984 	}
1985 
1986 	if (r->res_nodeid > 0) {
1987 		lkb->lkb_nodeid = r->res_nodeid;
1988 		return 0;
1989 	}
1990 
1991 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1992 
1993 	dir_nodeid = dlm_dir_nodeid(r);
1994 
1995 	if (dir_nodeid != our_nodeid) {
1996 		r->res_first_lkid = lkb->lkb_id;
1997 		send_lookup(r, lkb);
1998 		return 1;
1999 	}
2000 
2001 	for (i = 0; i < 2; i++) {
2002 		/* It's possible for dlm_scand to remove an old rsb for
2003 		   this same resource from the toss list, for us to create
2004 		   a new one, look up the master locally, and find that it
2005 		   already exists just before dlm_scand does the
2006 		   dir_remove() on the previous rsb. */
2007 
2008 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2009 				       r->res_length, &ret_nodeid);
2010 		if (!error)
2011 			break;
2012 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2013 		schedule();
2014 	}
2015 	if (error && error != -EEXIST)
2016 		return error;
2017 
2018 	if (ret_nodeid == our_nodeid) {
2019 		r->res_first_lkid = 0;
2020 		r->res_nodeid = 0;
2021 		lkb->lkb_nodeid = 0;
2022 	} else {
2023 		r->res_first_lkid = lkb->lkb_id;
2024 		r->res_nodeid = ret_nodeid;
2025 		lkb->lkb_nodeid = ret_nodeid;
2026 	}
2027 	return 0;
2028 }
2029 
2030 static void process_lookup_list(struct dlm_rsb *r)
2031 {
2032 	struct dlm_lkb *lkb, *safe;
2033 
2034 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2035 		list_del_init(&lkb->lkb_rsb_lookup);
2036 		_request_lock(r, lkb);
2037 		schedule();
2038 	}
2039 }
2040 
2041 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2042 
2043 static void confirm_master(struct dlm_rsb *r, int error)
2044 {
2045 	struct dlm_lkb *lkb;
2046 
2047 	if (!r->res_first_lkid)
2048 		return;
2049 
2050 	switch (error) {
2051 	case 0:
2052 	case -EINPROGRESS:
2053 		r->res_first_lkid = 0;
2054 		process_lookup_list(r);
2055 		break;
2056 
2057 	case -EAGAIN:
2058 	case -EBADR:
2059 	case -ENOTBLK:
2060 		/* the remote request failed and won't be retried (it was
2061 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2062 		   lkb the first_lkid */
2063 
2064 		r->res_first_lkid = 0;
2065 
2066 		if (!list_empty(&r->res_lookup)) {
2067 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2068 					 lkb_rsb_lookup);
2069 			list_del_init(&lkb->lkb_rsb_lookup);
2070 			r->res_first_lkid = lkb->lkb_id;
2071 			_request_lock(r, lkb);
2072 		}
2073 		break;
2074 
2075 	default:
2076 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2077 	}
2078 }
2079 
2080 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2081 			 int namelen, unsigned long timeout_cs,
2082 			 void (*ast) (void *astparam),
2083 			 void *astparam,
2084 			 void (*bast) (void *astparam, int mode),
2085 			 struct dlm_args *args)
2086 {
2087 	int rv = -EINVAL;
2088 
2089 	/* check for invalid arg usage */
2090 
2091 	if (mode < 0 || mode > DLM_LOCK_EX)
2092 		goto out;
2093 
2094 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2095 		goto out;
2096 
2097 	if (flags & DLM_LKF_CANCEL)
2098 		goto out;
2099 
2100 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2101 		goto out;
2102 
2103 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2104 		goto out;
2105 
2106 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2107 		goto out;
2108 
2109 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2110 		goto out;
2111 
2112 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2113 		goto out;
2114 
2115 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2116 		goto out;
2117 
2118 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2119 		goto out;
2120 
2121 	if (!ast || !lksb)
2122 		goto out;
2123 
2124 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2125 		goto out;
2126 
2127 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2128 		goto out;
2129 
2130 	/* these args will be copied to the lkb in validate_lock_args;
2131 	   this cannot be done now because, when converting locks, fields in
2132 	   an active lkb cannot be modified before locking the rsb */
2133 
2134 	args->flags = flags;
2135 	args->astfn = ast;
2136 	args->astparam = astparam;
2137 	args->bastfn = bast;
2138 	args->timeout = timeout_cs;
2139 	args->mode = mode;
2140 	args->lksb = lksb;
2141 	rv = 0;
2142  out:
2143 	return rv;
2144 }
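
/*
 * For example (illustrative), both of these calls would fail the
 * checks above with -EINVAL: EXPEDITE is only valid for new NL
 * requests, and QUECVT only makes sense on a conversion (my_ast is
 * hypothetical):
 *
 *	dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_EXPEDITE,
 *		 "res", 3, 0, my_ast, NULL, NULL);
 *	dlm_lock(ls, DLM_LOCK_CW, &lksb, DLM_LKF_QUECVT,
 *		 "res", 3, 0, my_ast, NULL, NULL);
 */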
2145 
2146 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2147 {
2148 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2149 		      DLM_LKF_FORCEUNLOCK))
2150 		return -EINVAL;
2151 
2152 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2153 		return -EINVAL;
2154 
2155 	args->flags = flags;
2156 	args->astparam = astarg;
2157 	return 0;
2158 }
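
/*
 * For example (illustrative), a single dlm_unlock() call cannot both
 * cancel and force-unlock; the second check above rejects it:
 *
 *	error = dlm_unlock(ls, lkid,
 *			   DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK,
 *			   &lksb, NULL);
 *
 * and error comes back as -EINVAL.
 */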
2159 
2160 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2161 			      struct dlm_args *args)
2162 {
2163 	int rv = -EINVAL;
2164 
2165 	if (args->flags & DLM_LKF_CONVERT) {
2166 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2167 			goto out;
2168 
2169 		if (args->flags & DLM_LKF_QUECVT &&
2170 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2171 			goto out;
2172 
2173 		rv = -EBUSY;
2174 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2175 			goto out;
2176 
2177 		if (lkb->lkb_wait_type)
2178 			goto out;
2179 
2180 		if (is_overlap(lkb))
2181 			goto out;
2182 	}
2183 
2184 	lkb->lkb_exflags = args->flags;
2185 	lkb->lkb_sbflags = 0;
2186 	lkb->lkb_astfn = args->astfn;
2187 	lkb->lkb_astparam = args->astparam;
2188 	lkb->lkb_bastfn = args->bastfn;
2189 	lkb->lkb_rqmode = args->mode;
2190 	lkb->lkb_lksb = args->lksb;
2191 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2192 	lkb->lkb_ownpid = (int) current->pid;
2193 	lkb->lkb_timeout_cs = args->timeout;
2194 	rv = 0;
2195  out:
2196 	if (rv)
2197 		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2198 			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2199 			  lkb->lkb_status, lkb->lkb_wait_type,
2200 			  lkb->lkb_resource->res_name);
2201 	return rv;
2202 }
2203 
2204 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2205    for success */
2206 
2207 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2208    because there may be a lookup in progress and it's valid to do a
2209    cancel or force-unlock on it */
2210 
2211 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2212 {
2213 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2214 	int rv = -EINVAL;
2215 
2216 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2217 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2218 		dlm_print_lkb(lkb);
2219 		goto out;
2220 	}
2221 
2222 	/* an lkb may still exist even though the lock is EOL'ed due to a
2223 	   cancel, unlock or failed noqueue request; an app can't use these
2224 	   locks; return the same error as if the lkid had not been found */
2225 
2226 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2227 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2228 		rv = -ENOENT;
2229 		goto out;
2230 	}
2231 
2232 	/* an lkb may be waiting for an rsb lookup to complete where the
2233 	   lookup was initiated by another lock */
2234 
2235 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2236 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2237 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2238 			list_del_init(&lkb->lkb_rsb_lookup);
2239 			queue_cast(lkb->lkb_resource, lkb,
2240 				   args->flags & DLM_LKF_CANCEL ?
2241 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2242 			unhold_lkb(lkb); /* undoes create_lkb() */
2243 		}
2244 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2245 		rv = -EBUSY;
2246 		goto out;
2247 	}
2248 
2249 	/* cancel not allowed with another cancel/unlock in progress */
2250 
2251 	if (args->flags & DLM_LKF_CANCEL) {
2252 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2253 			goto out;
2254 
2255 		if (is_overlap(lkb))
2256 			goto out;
2257 
2258 		/* don't let scand try to do a cancel */
2259 		del_timeout(lkb);
2260 
2261 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2262 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2263 			rv = -EBUSY;
2264 			goto out;
2265 		}
2266 
2267 		/* there's nothing to cancel */
2268 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2269 		    !lkb->lkb_wait_type) {
2270 			rv = -EBUSY;
2271 			goto out;
2272 		}
2273 
2274 		switch (lkb->lkb_wait_type) {
2275 		case DLM_MSG_LOOKUP:
2276 		case DLM_MSG_REQUEST:
2277 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2278 			rv = -EBUSY;
2279 			goto out;
2280 		case DLM_MSG_UNLOCK:
2281 		case DLM_MSG_CANCEL:
2282 			goto out;
2283 		}
2284 		/* add_to_waiters() will set OVERLAP_CANCEL */
2285 		goto out_ok;
2286 	}
2287 
2288 	/* do we need to allow a force-unlock if there's a normal unlock
2289 	   already in progress?  in what conditions could the normal unlock
2290 	   fail such that we'd want to send a force-unlock to be sure? */
2291 
2292 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2293 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2294 			goto out;
2295 
2296 		if (is_overlap_unlock(lkb))
2297 			goto out;
2298 
2299 		/* don't let scand try to do a cancel */
2300 		del_timeout(lkb);
2301 
2302 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2303 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2304 			rv = -EBUSY;
2305 			goto out;
2306 		}
2307 
2308 		switch (lkb->lkb_wait_type) {
2309 		case DLM_MSG_LOOKUP:
2310 		case DLM_MSG_REQUEST:
2311 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2312 			rv = -EBUSY;
2313 			goto out;
2314 		case DLM_MSG_UNLOCK:
2315 			goto out;
2316 		}
2317 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2318 		goto out_ok;
2319 	}
2320 
2321 	/* normal unlock not allowed if there's any op in progress */
2322 	rv = -EBUSY;
2323 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2324 		goto out;
2325 
2326  out_ok:
2327 	/* an overlapping op shouldn't blow away exflags from other op */
2328 	lkb->lkb_exflags |= args->flags;
2329 	lkb->lkb_sbflags = 0;
2330 	lkb->lkb_astparam = args->astparam;
2331 	rv = 0;
2332  out:
2333 	if (rv)
2334 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2335 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2336 			  args->flags, lkb->lkb_wait_type,
2337 			  lkb->lkb_resource->res_name);
2338 	return rv;
2339 }
2340 
2341 /*
2342  * Four stage 4 varieties:
2343  * do_request(), do_convert(), do_unlock(), do_cancel()
2344  * These are called on the master node for the given lock and
2345  * from the central locking logic.
2346  */
2347 
2348 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2349 {
2350 	int error = 0;
2351 
2352 	if (can_be_granted(r, lkb, 1, NULL)) {
2353 		grant_lock(r, lkb);
2354 		queue_cast(r, lkb, 0);
2355 		goto out;
2356 	}
2357 
2358 	if (can_be_queued(lkb)) {
2359 		error = -EINPROGRESS;
2360 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2361 		add_timeout(lkb);
2362 		goto out;
2363 	}
2364 
2365 	error = -EAGAIN;
2366 	queue_cast(r, lkb, -EAGAIN);
2367  out:
2368 	return error;
2369 }
2370 
2371 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2372 			       int error)
2373 {
2374 	switch (error) {
2375 	case -EAGAIN:
2376 		if (force_blocking_asts(lkb))
2377 			send_blocking_asts_all(r, lkb);
2378 		break;
2379 	case -EINPROGRESS:
2380 		send_blocking_asts(r, lkb);
2381 		break;
2382 	}
2383 }
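
/* In the do_xxxx() return convention above, 0 means the lock was
   granted, -EINPROGRESS means it was queued, and -EAGAIN means a
   request that could not be queued (NOQUEUE) was refused;
   do_convert() below adds -EDEADLK for conversion deadlock. */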
2384 
2385 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2386 {
2387 	int error = 0;
2388 	int deadlk = 0;
2389 
2390 	/* changing an existing lock may allow others to be granted */
2391 
2392 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2393 		grant_lock(r, lkb);
2394 		queue_cast(r, lkb, 0);
2395 		goto out;
2396 	}
2397 
2398 	/* can_be_granted() detected that this lock would block in a conversion
2399 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2400 	   the ast for the convert. */
2401 
2402 	if (deadlk) {
2403 		/* it's left on the granted queue */
2404 		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2405 			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2406 			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2407 		revert_lock(r, lkb);
2408 		queue_cast(r, lkb, -EDEADLK);
2409 		error = -EDEADLK;
2410 		goto out;
2411 	}
2412 
2413 	/* is_demoted() means the can_be_granted() above set the grmode
2414 	   to NL, and left us on the granted queue.  This auto-demotion
2415 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2416 	   now grantable.  We have to try to grant other converting locks
2417 	   before we try again to grant this one. */
2418 
2419 	if (is_demoted(lkb)) {
2420 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2421 		if (_can_be_granted(r, lkb, 1)) {
2422 			grant_lock(r, lkb);
2423 			queue_cast(r, lkb, 0);
2424 			goto out;
2425 		}
2426 		/* else fall through and move to convert queue */
2427 	}
2428 
2429 	if (can_be_queued(lkb)) {
2430 		error = -EINPROGRESS;
2431 		del_lkb(r, lkb);
2432 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2433 		add_timeout(lkb);
2434 		goto out;
2435 	}
2436 
2437 	error = -EAGAIN;
2438 	queue_cast(r, lkb, -EAGAIN);
2439  out:
2440 	return error;
2441 }
2442 
2443 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2444 			       int error)
2445 {
2446 	switch (error) {
2447 	case 0:
2448 		grant_pending_locks(r);
2449 		/* grant_pending_locks also sends basts */
2450 		break;
2451 	case -EAGAIN:
2452 		if (force_blocking_asts(lkb))
2453 			send_blocking_asts_all(r, lkb);
2454 		break;
2455 	case -EINPROGRESS:
2456 		send_blocking_asts(r, lkb);
2457 		break;
2458 	}
2459 }
2460 
2461 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2462 {
2463 	remove_lock(r, lkb);
2464 	queue_cast(r, lkb, -DLM_EUNLOCK);
2465 	return -DLM_EUNLOCK;
2466 }
2467 
2468 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2469 			      int error)
2470 {
2471 	grant_pending_locks(r);
2472 }
2473 
2474 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2475 
2476 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2477 {
2478 	int error;
2479 
2480 	error = revert_lock(r, lkb);
2481 	if (error) {
2482 		queue_cast(r, lkb, -DLM_ECANCEL);
2483 		return -DLM_ECANCEL;
2484 	}
2485 	return 0;
2486 }
2487 
2488 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2489 			      int error)
2490 {
2491 	if (error)
2492 		grant_pending_locks(r);
2493 }
2494 
2495 /*
2496  * Four stage 3 varieties:
2497  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2498  */
2499 
2500 /* add a new lkb to a possibly new rsb, called by requesting process */
2501 
2502 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2503 {
2504 	int error;
2505 
2506 	/* set_master: sets lkb nodeid from r */
2507 
2508 	error = set_master(r, lkb);
2509 	if (error < 0)
2510 		goto out;
2511 	if (error) {
2512 		error = 0;
2513 		goto out;
2514 	}
2515 
2516 	if (is_remote(r)) {
2517 		/* receive_request() calls do_request() on remote node */
2518 		error = send_request(r, lkb);
2519 	} else {
2520 		error = do_request(r, lkb);
2521 		/* for remote locks the request_reply is sent
2522 		   between do_request and do_request_effects */
2523 		do_request_effects(r, lkb, error);
2524 	}
2525  out:
2526 	return error;
2527 }
2528 
2529 /* change some property of an existing lkb, e.g. mode */
2530 
2531 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2532 {
2533 	int error;
2534 
2535 	if (is_remote(r)) {
2536 		/* receive_convert() calls do_convert() on remote node */
2537 		error = send_convert(r, lkb);
2538 	} else {
2539 		error = do_convert(r, lkb);
2540 		/* for remote locks the convert_reply is sent
2541 		   between do_convert and do_convert_effects */
2542 		do_convert_effects(r, lkb, error);
2543 	}
2544 
2545 	return error;
2546 }
2547 
2548 /* remove an existing lkb from the granted queue */
2549 
2550 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2551 {
2552 	int error;
2553 
2554 	if (is_remote(r)) {
2555 		/* receive_unlock() calls do_unlock() on remote node */
2556 		error = send_unlock(r, lkb);
2557 	} else {
2558 		error = do_unlock(r, lkb);
2559 		/* for remote locks the unlock_reply is sent
2560 		   between do_unlock and do_unlock_effects */
2561 		do_unlock_effects(r, lkb, error);
2562 	}
2563 
2564 	return error;
2565 }
2566 
2567 /* remove an existing lkb from the convert or wait queue */
2568 
2569 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2570 {
2571 	int error;
2572 
2573 	if (is_remote(r)) {
2574 		/* receive_cancel() calls do_cancel() on remote node */
2575 		error = send_cancel(r, lkb);
2576 	} else {
2577 		error = do_cancel(r, lkb);
2578 		/* for remote locks the cancel_reply is sent
2579 		   between do_cancel and do_cancel_effects */
2580 		do_cancel_effects(r, lkb, error);
2581 	}
2582 
2583 	return error;
2584 }
2585 
2586 /*
2587  * Four stage 2 varieties:
2588  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2589  */
2590 
2591 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2592 			int len, struct dlm_args *args)
2593 {
2594 	struct dlm_rsb *r;
2595 	int error;
2596 
2597 	error = validate_lock_args(ls, lkb, args);
2598 	if (error)
2599 		goto out;
2600 
2601 	error = find_rsb(ls, name, len, R_CREATE, &r);
2602 	if (error)
2603 		goto out;
2604 
2605 	lock_rsb(r);
2606 
2607 	attach_lkb(r, lkb);
2608 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2609 
2610 	error = _request_lock(r, lkb);
2611 
2612 	unlock_rsb(r);
2613 	put_rsb(r);
2614 
2615  out:
2616 	return error;
2617 }
2618 
2619 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2620 			struct dlm_args *args)
2621 {
2622 	struct dlm_rsb *r;
2623 	int error;
2624 
2625 	r = lkb->lkb_resource;
2626 
2627 	hold_rsb(r);
2628 	lock_rsb(r);
2629 
2630 	error = validate_lock_args(ls, lkb, args);
2631 	if (error)
2632 		goto out;
2633 
2634 	error = _convert_lock(r, lkb);
2635  out:
2636 	unlock_rsb(r);
2637 	put_rsb(r);
2638 	return error;
2639 }
2640 
2641 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2642 		       struct dlm_args *args)
2643 {
2644 	struct dlm_rsb *r;
2645 	int error;
2646 
2647 	r = lkb->lkb_resource;
2648 
2649 	hold_rsb(r);
2650 	lock_rsb(r);
2651 
2652 	error = validate_unlock_args(lkb, args);
2653 	if (error)
2654 		goto out;
2655 
2656 	error = _unlock_lock(r, lkb);
2657  out:
2658 	unlock_rsb(r);
2659 	put_rsb(r);
2660 	return error;
2661 }
2662 
2663 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2664 		       struct dlm_args *args)
2665 {
2666 	struct dlm_rsb *r;
2667 	int error;
2668 
2669 	r = lkb->lkb_resource;
2670 
2671 	hold_rsb(r);
2672 	lock_rsb(r);
2673 
2674 	error = validate_unlock_args(lkb, args);
2675 	if (error)
2676 		goto out;
2677 
2678 	error = _cancel_lock(r, lkb);
2679  out:
2680 	unlock_rsb(r);
2681 	put_rsb(r);
2682 	return error;
2683 }
2684 
2685 /*
2686  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2687  */
2688 
2689 int dlm_lock(dlm_lockspace_t *lockspace,
2690 	     int mode,
2691 	     struct dlm_lksb *lksb,
2692 	     uint32_t flags,
2693 	     void *name,
2694 	     unsigned int namelen,
2695 	     uint32_t parent_lkid,
2696 	     void (*ast) (void *astarg),
2697 	     void *astarg,
2698 	     void (*bast) (void *astarg, int mode))
2699 {
2700 	struct dlm_ls *ls;
2701 	struct dlm_lkb *lkb;
2702 	struct dlm_args args;
2703 	int error, convert = flags & DLM_LKF_CONVERT;
2704 
2705 	ls = dlm_find_lockspace_local(lockspace);
2706 	if (!ls)
2707 		return -EINVAL;
2708 
2709 	dlm_lock_recovery(ls);
2710 
2711 	if (convert)
2712 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2713 	else
2714 		error = create_lkb(ls, &lkb);
2715 
2716 	if (error)
2717 		goto out;
2718 
2719 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2720 			      astarg, bast, &args);
2721 	if (error)
2722 		goto out_put;
2723 
2724 	if (convert)
2725 		error = convert_lock(ls, lkb, &args);
2726 	else
2727 		error = request_lock(ls, lkb, name, namelen, &args);
2728 
2729 	if (error == -EINPROGRESS)
2730 		error = 0;
2731  out_put:
2732 	if (convert || error)
2733 		__put_lkb(ls, lkb);
2734 	if (error == -EAGAIN || error == -EDEADLK)
2735 		error = 0;
2736  out:
2737 	dlm_unlock_recovery(ls);
2738 	dlm_put_lockspace(ls);
2739 	return error;
2740 }
2741 
2742 int dlm_unlock(dlm_lockspace_t *lockspace,
2743 	       uint32_t lkid,
2744 	       uint32_t flags,
2745 	       struct dlm_lksb *lksb,
2746 	       void *astarg)
2747 {
2748 	struct dlm_ls *ls;
2749 	struct dlm_lkb *lkb;
2750 	struct dlm_args args;
2751 	int error;
2752 
2753 	ls = dlm_find_lockspace_local(lockspace);
2754 	if (!ls)
2755 		return -EINVAL;
2756 
2757 	dlm_lock_recovery(ls);
2758 
2759 	error = find_lkb(ls, lkid, &lkb);
2760 	if (error)
2761 		goto out;
2762 
2763 	error = set_unlock_args(flags, astarg, &args);
2764 	if (error)
2765 		goto out_put;
2766 
2767 	if (flags & DLM_LKF_CANCEL)
2768 		error = cancel_lock(ls, lkb, &args);
2769 	else
2770 		error = unlock_lock(ls, lkb, &args);
2771 
2772 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2773 		error = 0;
2774 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2775 		error = 0;
2776  out_put:
2777 	dlm_put_lkb(lkb);
2778  out:
2779 	dlm_unlock_recovery(ls);
2780 	dlm_put_lockspace(ls);
2781 	return error;
2782 }
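
/*
 * Illustrative sketch (not from the original source): a minimal
 * in-kernel caller, assuming "ls" was obtained from
 * dlm_new_lockspace().  The ex_* names are hypothetical; a completion
 * is used to wait for the asts that deliver the results.
 *
 *	static struct dlm_lksb ex_lksb;
 *	static DECLARE_COMPLETION(ex_done);
 *
 *	static void ex_ast(void *astarg)
 *	{
 *		complete(&ex_done);
 *	}
 *
 *	static int ex_lock_then_unlock(dlm_lockspace_t *ls)
 *	{
 *		int error;
 *
 *		error = dlm_lock(ls, DLM_LOCK_EX, &ex_lksb, 0,
 *				 "ex_res", 6, 0, ex_ast, NULL, NULL);
 *		if (error)
 *			return error;
 *		wait_for_completion(&ex_done);
 *		if (ex_lksb.sb_status)
 *			return ex_lksb.sb_status;
 *
 *		INIT_COMPLETION(ex_done);
 *		error = dlm_unlock(ls, ex_lksb.sb_lkid, 0, &ex_lksb, NULL);
 *		if (error)
 *			return error;
 *		wait_for_completion(&ex_done);
 *		return 0;
 *	}
 *
 * The unlock's ast reports -DLM_EUNLOCK in sb_status; dlm_unlock()
 * itself translates that case to 0, as above.
 */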
2783 
2784 /*
2785  * send/receive routines for remote operations and replies
2786  *
2787  * send_args
2788  * send_common
2789  * send_request			receive_request
2790  * send_convert			receive_convert
2791  * send_unlock			receive_unlock
2792  * send_cancel			receive_cancel
2793  * send_grant			receive_grant
2794  * send_bast			receive_bast
2795  * send_lookup			receive_lookup
2796  * send_remove			receive_remove
2797  *
2798  * 				send_common_reply
2799  * receive_request_reply	send_request_reply
2800  * receive_convert_reply	send_convert_reply
2801  * receive_unlock_reply		send_unlock_reply
2802  * receive_cancel_reply		send_cancel_reply
2803  * receive_lookup_reply		send_lookup_reply
2804  */
2805 
2806 static int _create_message(struct dlm_ls *ls, int mb_len,
2807 			   int to_nodeid, int mstype,
2808 			   struct dlm_message **ms_ret,
2809 			   struct dlm_mhandle **mh_ret)
2810 {
2811 	struct dlm_message *ms;
2812 	struct dlm_mhandle *mh;
2813 	char *mb;
2814 
2815 	/* get_buffer gives us a message handle (mh) that we need to
2816 	   pass into lowcomms_commit and a message buffer (mb) that we
2817 	   write our data into */
2818 
2819 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2820 	if (!mh)
2821 		return -ENOBUFS;
2822 
2823 	memset(mb, 0, mb_len);
2824 
2825 	ms = (struct dlm_message *) mb;
2826 
2827 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2828 	ms->m_header.h_lockspace = ls->ls_global_id;
2829 	ms->m_header.h_nodeid = dlm_our_nodeid();
2830 	ms->m_header.h_length = mb_len;
2831 	ms->m_header.h_cmd = DLM_MSG;
2832 
2833 	ms->m_type = mstype;
2834 
2835 	*mh_ret = mh;
2836 	*ms_ret = ms;
2837 	return 0;
2838 }
2839 
2840 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2841 			  int to_nodeid, int mstype,
2842 			  struct dlm_message **ms_ret,
2843 			  struct dlm_mhandle **mh_ret)
2844 {
2845 	int mb_len = sizeof(struct dlm_message);
2846 
2847 	switch (mstype) {
2848 	case DLM_MSG_REQUEST:
2849 	case DLM_MSG_LOOKUP:
2850 	case DLM_MSG_REMOVE:
2851 		mb_len += r->res_length;
2852 		break;
2853 	case DLM_MSG_CONVERT:
2854 	case DLM_MSG_UNLOCK:
2855 	case DLM_MSG_REQUEST_REPLY:
2856 	case DLM_MSG_CONVERT_REPLY:
2857 	case DLM_MSG_GRANT:
2858 		if (lkb && lkb->lkb_lvbptr)
2859 			mb_len += r->res_ls->ls_lvblen;
2860 		break;
2861 	}
2862 
2863 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2864 			       ms_ret, mh_ret);
2865 }
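
/* The variable-length payload (resource name or LVB) travels in
   ms->m_extra at the end of the message; mb_len above accounts for
   it, and receive_extralen() later recovers its length from
   h_length minus the fixed struct size. */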
2866 
2867 /* further lowcomms enhancements or alternate implementations may make
2868    the return value from this function useful at some point */
2869 
2870 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2871 {
2872 	dlm_message_out(ms);
2873 	dlm_lowcomms_commit_buffer(mh);
2874 	return 0;
2875 }
2876 
2877 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2878 		      struct dlm_message *ms)
2879 {
2880 	ms->m_nodeid   = lkb->lkb_nodeid;
2881 	ms->m_pid      = lkb->lkb_ownpid;
2882 	ms->m_lkid     = lkb->lkb_id;
2883 	ms->m_remid    = lkb->lkb_remid;
2884 	ms->m_exflags  = lkb->lkb_exflags;
2885 	ms->m_sbflags  = lkb->lkb_sbflags;
2886 	ms->m_flags    = lkb->lkb_flags;
2887 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2888 	ms->m_status   = lkb->lkb_status;
2889 	ms->m_grmode   = lkb->lkb_grmode;
2890 	ms->m_rqmode   = lkb->lkb_rqmode;
2891 	ms->m_hash     = r->res_hash;
2892 
2893 	/* m_result and m_bastmode are set from function args,
2894 	   not from lkb fields */
2895 
2896 	if (lkb->lkb_bastfn)
2897 		ms->m_asts |= DLM_CB_BAST;
2898 	if (lkb->lkb_astfn)
2899 		ms->m_asts |= DLM_CB_CAST;
2900 
2901 	/* compare with switch in create_message; send_remove() doesn't
2902 	   use send_args() */
2903 
2904 	switch (ms->m_type) {
2905 	case DLM_MSG_REQUEST:
2906 	case DLM_MSG_LOOKUP:
2907 		memcpy(ms->m_extra, r->res_name, r->res_length);
2908 		break;
2909 	case DLM_MSG_CONVERT:
2910 	case DLM_MSG_UNLOCK:
2911 	case DLM_MSG_REQUEST_REPLY:
2912 	case DLM_MSG_CONVERT_REPLY:
2913 	case DLM_MSG_GRANT:
2914 		if (!lkb->lkb_lvbptr)
2915 			break;
2916 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2917 		break;
2918 	}
2919 }
2920 
2921 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2922 {
2923 	struct dlm_message *ms;
2924 	struct dlm_mhandle *mh;
2925 	int to_nodeid, error;
2926 
2927 	to_nodeid = r->res_nodeid;
2928 
2929 	error = add_to_waiters(lkb, mstype, to_nodeid);
2930 	if (error)
2931 		return error;
2932 
2933 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2934 	if (error)
2935 		goto fail;
2936 
2937 	send_args(r, lkb, ms);
2938 
2939 	error = send_message(mh, ms);
2940 	if (error)
2941 		goto fail;
2942 	return 0;
2943 
2944  fail:
2945 	remove_from_waiters(lkb, msg_reply_type(mstype));
2946 	return error;
2947 }
2948 
2949 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2950 {
2951 	return send_common(r, lkb, DLM_MSG_REQUEST);
2952 }
2953 
2954 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2955 {
2956 	int error;
2957 
2958 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2959 
2960 	/* down conversions go without a reply from the master */
2961 	if (!error && down_conversion(lkb)) {
2962 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2963 		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2964 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2965 		r->res_ls->ls_stub_ms.m_result = 0;
2966 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2967 	}
2968 
2969 	return error;
2970 }
2971 
2972 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2973    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2974    that the master is still correct. */
2975 
2976 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2977 {
2978 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2979 }
2980 
2981 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2982 {
2983 	return send_common(r, lkb, DLM_MSG_CANCEL);
2984 }
2985 
2986 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2987 {
2988 	struct dlm_message *ms;
2989 	struct dlm_mhandle *mh;
2990 	int to_nodeid, error;
2991 
2992 	to_nodeid = lkb->lkb_nodeid;
2993 
2994 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2995 	if (error)
2996 		goto out;
2997 
2998 	send_args(r, lkb, ms);
2999 
3000 	ms->m_result = 0;
3001 
3002 	error = send_message(mh, ms);
3003  out:
3004 	return error;
3005 }
3006 
3007 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3008 {
3009 	struct dlm_message *ms;
3010 	struct dlm_mhandle *mh;
3011 	int to_nodeid, error;
3012 
3013 	to_nodeid = lkb->lkb_nodeid;
3014 
3015 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3016 	if (error)
3017 		goto out;
3018 
3019 	send_args(r, lkb, ms);
3020 
3021 	ms->m_bastmode = mode;
3022 
3023 	error = send_message(mh, ms);
3024  out:
3025 	return error;
3026 }
3027 
3028 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3029 {
3030 	struct dlm_message *ms;
3031 	struct dlm_mhandle *mh;
3032 	int to_nodeid, error;
3033 
3034 	to_nodeid = dlm_dir_nodeid(r);
3035 
3036 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3037 	if (error)
3038 		return error;
3039 
3040 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3041 	if (error)
3042 		goto fail;
3043 
3044 	send_args(r, lkb, ms);
3045 
3046 	error = send_message(mh, ms);
3047 	if (error)
3048 		goto fail;
3049 	return 0;
3050 
3051  fail:
3052 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3053 	return error;
3054 }
3055 
3056 static int send_remove(struct dlm_rsb *r)
3057 {
3058 	struct dlm_message *ms;
3059 	struct dlm_mhandle *mh;
3060 	int to_nodeid, error;
3061 
3062 	to_nodeid = dlm_dir_nodeid(r);
3063 
3064 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3065 	if (error)
3066 		goto out;
3067 
3068 	memcpy(ms->m_extra, r->res_name, r->res_length);
3069 	ms->m_hash = r->res_hash;
3070 
3071 	error = send_message(mh, ms);
3072  out:
3073 	return error;
3074 }
3075 
3076 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3077 			     int mstype, int rv)
3078 {
3079 	struct dlm_message *ms;
3080 	struct dlm_mhandle *mh;
3081 	int to_nodeid, error;
3082 
3083 	to_nodeid = lkb->lkb_nodeid;
3084 
3085 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3086 	if (error)
3087 		goto out;
3088 
3089 	send_args(r, lkb, ms);
3090 
3091 	ms->m_result = rv;
3092 
3093 	error = send_message(mh, ms);
3094  out:
3095 	return error;
3096 }
3097 
3098 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3099 {
3100 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3101 }
3102 
3103 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3104 {
3105 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3106 }
3107 
3108 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3109 {
3110 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3111 }
3112 
3113 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3114 {
3115 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3116 }
3117 
3118 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3119 			     int ret_nodeid, int rv)
3120 {
3121 	struct dlm_rsb *r = &ls->ls_stub_rsb;
3122 	struct dlm_message *ms;
3123 	struct dlm_mhandle *mh;
3124 	int error, nodeid = ms_in->m_header.h_nodeid;
3125 
3126 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3127 	if (error)
3128 		goto out;
3129 
3130 	ms->m_lkid = ms_in->m_lkid;
3131 	ms->m_result = rv;
3132 	ms->m_nodeid = ret_nodeid;
3133 
3134 	error = send_message(mh, ms);
3135  out:
3136 	return error;
3137 }
3138 
3139 /* which args we save from a received message depends heavily on the type
3140    of message, unlike the send side where we can safely send everything about
3141    the lkb for any type of message */
3142 
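/* Only the low 16 bits of lkb_flags are carried in a message; the
   upper 16 bits (e.g. DLM_IFL_MSTCPY) are node-local state and are
   preserved across the copies below. */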
3143 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3144 {
3145 	lkb->lkb_exflags = ms->m_exflags;
3146 	lkb->lkb_sbflags = ms->m_sbflags;
3147 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3148 		         (ms->m_flags & 0x0000FFFF);
3149 }
3150 
3151 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3152 {
3153 	if (ms->m_flags == DLM_IFL_STUB_MS)
3154 		return;
3155 
3156 	lkb->lkb_sbflags = ms->m_sbflags;
3157 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3158 		         (ms->m_flags & 0x0000FFFF);
3159 }
3160 
3161 static int receive_extralen(struct dlm_message *ms)
3162 {
3163 	return (ms->m_header.h_length - sizeof(struct dlm_message));
3164 }
3165 
3166 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3167 		       struct dlm_message *ms)
3168 {
3169 	int len;
3170 
3171 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3172 		if (!lkb->lkb_lvbptr)
3173 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3174 		if (!lkb->lkb_lvbptr)
3175 			return -ENOMEM;
3176 		len = receive_extralen(ms);
3177 		if (len > DLM_RESNAME_MAXLEN)
3178 			len = DLM_RESNAME_MAXLEN;
3179 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3180 	}
3181 	return 0;
3182 }
3183 
3184 static void fake_bastfn(void *astparam, int mode)
3185 {
3186 	log_print("fake_bastfn should not be called");
3187 }
3188 
3189 static void fake_astfn(void *astparam)
3190 {
3191 	log_print("fake_astfn should not be called");
3192 }
3193 
3194 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3195 				struct dlm_message *ms)
3196 {
3197 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3198 	lkb->lkb_ownpid = ms->m_pid;
3199 	lkb->lkb_remid = ms->m_lkid;
3200 	lkb->lkb_grmode = DLM_LOCK_IV;
3201 	lkb->lkb_rqmode = ms->m_rqmode;
3202 
3203 	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3204 	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3205 
3206 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3207 		/* lkb was just created so there won't be an lvb yet */
3208 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3209 		if (!lkb->lkb_lvbptr)
3210 			return -ENOMEM;
3211 	}
3212 
3213 	return 0;
3214 }
3215 
3216 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3217 				struct dlm_message *ms)
3218 {
3219 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3220 		return -EBUSY;
3221 
3222 	if (receive_lvb(ls, lkb, ms))
3223 		return -ENOMEM;
3224 
3225 	lkb->lkb_rqmode = ms->m_rqmode;
3226 	lkb->lkb_lvbseq = ms->m_lvbseq;
3227 
3228 	return 0;
3229 }
3230 
3231 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3232 			       struct dlm_message *ms)
3233 {
3234 	if (receive_lvb(ls, lkb, ms))
3235 		return -ENOMEM;
3236 	return 0;
3237 }
3238 
3239 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3240    uses to send a reply and that the remote end uses to process the reply. */
3241 
3242 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3243 {
3244 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3245 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3246 	lkb->lkb_remid = ms->m_lkid;
3247 }
3248 
3249 /* This is called after the rsb is locked so that we can safely inspect
3250    fields in the lkb. */
3251 
3252 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3253 {
3254 	int from = ms->m_header.h_nodeid;
3255 	int error = 0;
3256 
3257 	switch (ms->m_type) {
3258 	case DLM_MSG_CONVERT:
3259 	case DLM_MSG_UNLOCK:
3260 	case DLM_MSG_CANCEL:
3261 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3262 			error = -EINVAL;
3263 		break;
3264 
3265 	case DLM_MSG_CONVERT_REPLY:
3266 	case DLM_MSG_UNLOCK_REPLY:
3267 	case DLM_MSG_CANCEL_REPLY:
3268 	case DLM_MSG_GRANT:
3269 	case DLM_MSG_BAST:
3270 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3271 			error = -EINVAL;
3272 		break;
3273 
3274 	case DLM_MSG_REQUEST_REPLY:
3275 		if (!is_process_copy(lkb))
3276 			error = -EINVAL;
3277 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3278 			error = -EINVAL;
3279 		break;
3280 
3281 	default:
3282 		error = -EINVAL;
3283 	}
3284 
3285 	if (error)
3286 		log_error(lkb->lkb_resource->res_ls,
3287 			  "ignore invalid message %d from %d %x %x %x %d",
3288 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3289 			  lkb->lkb_flags, lkb->lkb_nodeid);
3290 	return error;
3291 }
3292 
3293 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3294 {
3295 	struct dlm_lkb *lkb;
3296 	struct dlm_rsb *r;
3297 	int error, namelen;
3298 
3299 	error = create_lkb(ls, &lkb);
3300 	if (error)
3301 		goto fail;
3302 
3303 	receive_flags(lkb, ms);
3304 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3305 	error = receive_request_args(ls, lkb, ms);
3306 	if (error) {
3307 		__put_lkb(ls, lkb);
3308 		goto fail;
3309 	}
3310 
3311 	namelen = receive_extralen(ms);
3312 
3313 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3314 	if (error) {
3315 		__put_lkb(ls, lkb);
3316 		goto fail;
3317 	}
3318 
3319 	lock_rsb(r);
3320 
3321 	attach_lkb(r, lkb);
3322 	error = do_request(r, lkb);
3323 	send_request_reply(r, lkb, error);
3324 	do_request_effects(r, lkb, error);
3325 
3326 	unlock_rsb(r);
3327 	put_rsb(r);
3328 
3329 	if (error == -EINPROGRESS)
3330 		error = 0;
3331 	if (error)
3332 		dlm_put_lkb(lkb);
3333 	return;
3334 
3335  fail:
3336 	setup_stub_lkb(ls, ms);
3337 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3338 }
3339 
3340 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3341 {
3342 	struct dlm_lkb *lkb;
3343 	struct dlm_rsb *r;
3344 	int error, reply = 1;
3345 
3346 	error = find_lkb(ls, ms->m_remid, &lkb);
3347 	if (error)
3348 		goto fail;
3349 
3350 	r = lkb->lkb_resource;
3351 
3352 	hold_rsb(r);
3353 	lock_rsb(r);
3354 
3355 	error = validate_message(lkb, ms);
3356 	if (error)
3357 		goto out;
3358 
3359 	receive_flags(lkb, ms);
3360 
3361 	error = receive_convert_args(ls, lkb, ms);
3362 	if (error) {
3363 		send_convert_reply(r, lkb, error);
3364 		goto out;
3365 	}
3366 
3367 	reply = !down_conversion(lkb);
3368 
3369 	error = do_convert(r, lkb);
3370 	if (reply)
3371 		send_convert_reply(r, lkb, error);
3372 	do_convert_effects(r, lkb, error);
3373  out:
3374 	unlock_rsb(r);
3375 	put_rsb(r);
3376 	dlm_put_lkb(lkb);
3377 	return;
3378 
3379  fail:
3380 	setup_stub_lkb(ls, ms);
3381 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3382 }
3383 
3384 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3385 {
3386 	struct dlm_lkb *lkb;
3387 	struct dlm_rsb *r;
3388 	int error;
3389 
3390 	error = find_lkb(ls, ms->m_remid, &lkb);
3391 	if (error)
3392 		goto fail;
3393 
3394 	r = lkb->lkb_resource;
3395 
3396 	hold_rsb(r);
3397 	lock_rsb(r);
3398 
3399 	error = validate_message(lkb, ms);
3400 	if (error)
3401 		goto out;
3402 
3403 	receive_flags(lkb, ms);
3404 
3405 	error = receive_unlock_args(ls, lkb, ms);
3406 	if (error) {
3407 		send_unlock_reply(r, lkb, error);
3408 		goto out;
3409 	}
3410 
3411 	error = do_unlock(r, lkb);
3412 	send_unlock_reply(r, lkb, error);
3413 	do_unlock_effects(r, lkb, error);
3414  out:
3415 	unlock_rsb(r);
3416 	put_rsb(r);
3417 	dlm_put_lkb(lkb);
3418 	return;
3419 
3420  fail:
3421 	setup_stub_lkb(ls, ms);
3422 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3423 }
3424 
3425 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3426 {
3427 	struct dlm_lkb *lkb;
3428 	struct dlm_rsb *r;
3429 	int error;
3430 
3431 	error = find_lkb(ls, ms->m_remid, &lkb);
3432 	if (error)
3433 		goto fail;
3434 
3435 	receive_flags(lkb, ms);
3436 
3437 	r = lkb->lkb_resource;
3438 
3439 	hold_rsb(r);
3440 	lock_rsb(r);
3441 
3442 	error = validate_message(lkb, ms);
3443 	if (error)
3444 		goto out;
3445 
3446 	error = do_cancel(r, lkb);
3447 	send_cancel_reply(r, lkb, error);
3448 	do_cancel_effects(r, lkb, error);
3449  out:
3450 	unlock_rsb(r);
3451 	put_rsb(r);
3452 	dlm_put_lkb(lkb);
3453 	return;
3454 
3455  fail:
3456 	setup_stub_lkb(ls, ms);
3457 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3458 }
3459 
3460 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3461 {
3462 	struct dlm_lkb *lkb;
3463 	struct dlm_rsb *r;
3464 	int error;
3465 
3466 	error = find_lkb(ls, ms->m_remid, &lkb);
3467 	if (error) {
3468 		log_debug(ls, "receive_grant from %d no lkb %x",
3469 			  ms->m_header.h_nodeid, ms->m_remid);
3470 		return;
3471 	}
3472 
3473 	r = lkb->lkb_resource;
3474 
3475 	hold_rsb(r);
3476 	lock_rsb(r);
3477 
3478 	error = validate_message(lkb, ms);
3479 	if (error)
3480 		goto out;
3481 
3482 	receive_flags_reply(lkb, ms);
3483 	if (is_altmode(lkb))
3484 		munge_altmode(lkb, ms);
3485 	grant_lock_pc(r, lkb, ms);
3486 	queue_cast(r, lkb, 0);
3487  out:
3488 	unlock_rsb(r);
3489 	put_rsb(r);
3490 	dlm_put_lkb(lkb);
3491 }
3492 
3493 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3494 {
3495 	struct dlm_lkb *lkb;
3496 	struct dlm_rsb *r;
3497 	int error;
3498 
3499 	error = find_lkb(ls, ms->m_remid, &lkb);
3500 	if (error) {
3501 		log_debug(ls, "receive_bast from %d no lkb %x",
3502 			  ms->m_header.h_nodeid, ms->m_remid);
3503 		return;
3504 	}
3505 
3506 	r = lkb->lkb_resource;
3507 
3508 	hold_rsb(r);
3509 	lock_rsb(r);
3510 
3511 	error = validate_message(lkb, ms);
3512 	if (error)
3513 		goto out;
3514 
3515 	queue_bast(r, lkb, ms->m_bastmode);
3516  out:
3517 	unlock_rsb(r);
3518 	put_rsb(r);
3519 	dlm_put_lkb(lkb);
3520 }
3521 
3522 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3523 {
3524 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3525 
3526 	from_nodeid = ms->m_header.h_nodeid;
3527 	our_nodeid = dlm_our_nodeid();
3528 
3529 	len = receive_extralen(ms);
3530 
3531 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3532 	if (dir_nodeid != our_nodeid) {
3533 		log_error(ls, "lookup dir_nodeid %d from %d",
3534 			  dir_nodeid, from_nodeid);
3535 		error = -EINVAL;
3536 		ret_nodeid = -1;
3537 		goto out;
3538 	}
3539 
3540 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3541 
3542 	/* Optimization: we're master so treat lookup as a request */
3543 	if (!error && ret_nodeid == our_nodeid) {
3544 		receive_request(ls, ms);
3545 		return;
3546 	}
3547  out:
3548 	send_lookup_reply(ls, ms, ret_nodeid, error);
3549 }
3550 
3551 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3552 {
3553 	int len, dir_nodeid, from_nodeid;
3554 
3555 	from_nodeid = ms->m_header.h_nodeid;
3556 
3557 	len = receive_extralen(ms);
3558 
3559 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3560 	if (dir_nodeid != dlm_our_nodeid()) {
3561 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3562 			  dir_nodeid, from_nodeid);
3563 		return;
3564 	}
3565 
3566 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3567 }
3568 
3569 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3570 {
3571 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3572 }
3573 
3574 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3575 {
3576 	struct dlm_lkb *lkb;
3577 	struct dlm_rsb *r;
3578 	int error, mstype, result;
3579 
3580 	error = find_lkb(ls, ms->m_remid, &lkb);
3581 	if (error) {
3582 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3583 			  ms->m_header.h_nodeid, ms->m_remid);
3584 		return;
3585 	}
3586 
3587 	r = lkb->lkb_resource;
3588 	hold_rsb(r);
3589 	lock_rsb(r);
3590 
3591 	error = validate_message(lkb, ms);
3592 	if (error)
3593 		goto out;
3594 
3595 	mstype = lkb->lkb_wait_type;
3596 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3597 	if (error)
3598 		goto out;
3599 
3600 	/* Optimization: the dir node was also the master, so it took our
3601 	   lookup as a request and sent a request reply, not a lookup reply */
3602 	if (mstype == DLM_MSG_LOOKUP) {
3603 		r->res_nodeid = ms->m_header.h_nodeid;
3604 		lkb->lkb_nodeid = r->res_nodeid;
3605 	}
3606 
3607 	/* this is the value returned from do_request() on the master */
3608 	result = ms->m_result;
3609 
3610 	switch (result) {
3611 	case -EAGAIN:
3612 		/* request would block (be queued) on remote master */
3613 		queue_cast(r, lkb, -EAGAIN);
3614 		confirm_master(r, -EAGAIN);
3615 		unhold_lkb(lkb); /* undoes create_lkb() */
3616 		break;
3617 
3618 	case -EINPROGRESS:
3619 	case 0:
3620 		/* request was queued or granted on remote master */
3621 		receive_flags_reply(lkb, ms);
3622 		lkb->lkb_remid = ms->m_lkid;
3623 		if (is_altmode(lkb))
3624 			munge_altmode(lkb, ms);
3625 		if (result) {
3626 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3627 			add_timeout(lkb);
3628 		} else {
3629 			grant_lock_pc(r, lkb, ms);
3630 			queue_cast(r, lkb, 0);
3631 		}
3632 		confirm_master(r, result);
3633 		break;
3634 
3635 	case -EBADR:
3636 	case -ENOTBLK:
3637 		/* find_rsb failed to find rsb or rsb wasn't master */
3638 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3639 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3640 		r->res_nodeid = -1;
3641 		lkb->lkb_nodeid = -1;
3642 
3643 		if (is_overlap(lkb)) {
3644 			/* we'll ignore error in cancel/unlock reply */
3645 			queue_cast_overlap(r, lkb);
3646 			confirm_master(r, result);
3647 			unhold_lkb(lkb); /* undoes create_lkb() */
3648 		} else
3649 			_request_lock(r, lkb);
3650 		break;
3651 
3652 	default:
3653 		log_error(ls, "receive_request_reply %x error %d",
3654 			  lkb->lkb_id, result);
3655 	}
3656 
3657 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3658 		log_debug(ls, "receive_request_reply %x result %d unlock",
3659 			  lkb->lkb_id, result);
3660 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3661 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3662 		send_unlock(r, lkb);
3663 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3664 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3665 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3666 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3667 		send_cancel(r, lkb);
3668 	} else {
3669 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3670 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3671 	}
3672  out:
3673 	unlock_rsb(r);
3674 	put_rsb(r);
3675 	dlm_put_lkb(lkb);
3676 }
3677 
3678 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3679 				    struct dlm_message *ms)
3680 {
3681 	/* this is the value returned from do_convert() on the master */
3682 	switch (ms->m_result) {
3683 	case -EAGAIN:
3684 		/* convert would block (be queued) on remote master */
3685 		queue_cast(r, lkb, -EAGAIN);
3686 		break;
3687 
3688 	case -EDEADLK:
3689 		receive_flags_reply(lkb, ms);
3690 		revert_lock_pc(r, lkb);
3691 		queue_cast(r, lkb, -EDEADLK);
3692 		break;
3693 
3694 	case -EINPROGRESS:
3695 		/* convert was queued on remote master */
3696 		receive_flags_reply(lkb, ms);
3697 		if (is_demoted(lkb))
3698 			munge_demoted(lkb);
3699 		del_lkb(r, lkb);
3700 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3701 		add_timeout(lkb);
3702 		break;
3703 
3704 	case 0:
3705 		/* convert was granted on remote master */
3706 		receive_flags_reply(lkb, ms);
3707 		if (is_demoted(lkb))
3708 			munge_demoted(lkb);
3709 		grant_lock_pc(r, lkb, ms);
3710 		queue_cast(r, lkb, 0);
3711 		break;
3712 
3713 	default:
3714 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3715 			  lkb->lkb_id, ms->m_result);
3716 	}
3717 }
3718 
3719 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3720 {
3721 	struct dlm_rsb *r = lkb->lkb_resource;
3722 	int error;
3723 
3724 	hold_rsb(r);
3725 	lock_rsb(r);
3726 
3727 	error = validate_message(lkb, ms);
3728 	if (error)
3729 		goto out;
3730 
3731 	/* stub reply can happen with waiters_mutex held */
3732 	error = remove_from_waiters_ms(lkb, ms);
3733 	if (error)
3734 		goto out;
3735 
3736 	__receive_convert_reply(r, lkb, ms);
3737  out:
3738 	unlock_rsb(r);
3739 	put_rsb(r);
3740 }
3741 
3742 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3743 {
3744 	struct dlm_lkb *lkb;
3745 	int error;
3746 
3747 	error = find_lkb(ls, ms->m_remid, &lkb);
3748 	if (error) {
3749 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3750 			  ms->m_header.h_nodeid, ms->m_remid);
3751 		return;
3752 	}
3753 
3754 	_receive_convert_reply(lkb, ms);
3755 	dlm_put_lkb(lkb);
3756 }
3757 
3758 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3759 {
3760 	struct dlm_rsb *r = lkb->lkb_resource;
3761 	int error;
3762 
3763 	hold_rsb(r);
3764 	lock_rsb(r);
3765 
3766 	error = validate_message(lkb, ms);
3767 	if (error)
3768 		goto out;
3769 
3770 	/* stub reply can happen with waiters_mutex held */
3771 	error = remove_from_waiters_ms(lkb, ms);
3772 	if (error)
3773 		goto out;
3774 
3775 	/* this is the value returned from do_unlock() on the master */
3776 
3777 	switch (ms->m_result) {
3778 	case -DLM_EUNLOCK:
3779 		receive_flags_reply(lkb, ms);
3780 		remove_lock_pc(r, lkb);
3781 		queue_cast(r, lkb, -DLM_EUNLOCK);
3782 		break;
3783 	case -ENOENT:
3784 		break;
3785 	default:
3786 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3787 			  lkb->lkb_id, ms->m_result);
3788 	}
3789  out:
3790 	unlock_rsb(r);
3791 	put_rsb(r);
3792 }
3793 
3794 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3795 {
3796 	struct dlm_lkb *lkb;
3797 	int error;
3798 
3799 	error = find_lkb(ls, ms->m_remid, &lkb);
3800 	if (error) {
3801 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3802 			  ms->m_header.h_nodeid, ms->m_remid);
3803 		return;
3804 	}
3805 
3806 	_receive_unlock_reply(lkb, ms);
3807 	dlm_put_lkb(lkb);
3808 }
3809 
3810 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3811 {
3812 	struct dlm_rsb *r = lkb->lkb_resource;
3813 	int error;
3814 
3815 	hold_rsb(r);
3816 	lock_rsb(r);
3817 
3818 	error = validate_message(lkb, ms);
3819 	if (error)
3820 		goto out;
3821 
3822 	/* stub reply can happen with waiters_mutex held */
3823 	error = remove_from_waiters_ms(lkb, ms);
3824 	if (error)
3825 		goto out;
3826 
3827 	/* this is the value returned from do_cancel() on the master */
3828 
3829 	switch (ms->m_result) {
3830 	case -DLM_ECANCEL:
3831 		receive_flags_reply(lkb, ms);
3832 		revert_lock_pc(r, lkb);
3833 		queue_cast(r, lkb, -DLM_ECANCEL);
3834 		break;
3835 	case 0:
3836 		break;
3837 	default:
3838 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3839 			  lkb->lkb_id, ms->m_result);
3840 	}
3841  out:
3842 	unlock_rsb(r);
3843 	put_rsb(r);
3844 }
3845 
3846 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3847 {
3848 	struct dlm_lkb *lkb;
3849 	int error;
3850 
3851 	error = find_lkb(ls, ms->m_remid, &lkb);
3852 	if (error) {
3853 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3854 			  ms->m_header.h_nodeid, ms->m_remid);
3855 		return;
3856 	}
3857 
3858 	_receive_cancel_reply(lkb, ms);
3859 	dlm_put_lkb(lkb);
3860 }
3861 
3862 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3863 {
3864 	struct dlm_lkb *lkb;
3865 	struct dlm_rsb *r;
3866 	int error, ret_nodeid;
3867 
3868 	error = find_lkb(ls, ms->m_lkid, &lkb);
3869 	if (error) {
3870 		log_error(ls, "receive_lookup_reply no lkb");
3871 		return;
3872 	}
3873 
3874 	/* ms->m_result is the value returned by dlm_dir_lookup on the dir node.
3875 	   FIXME: will a non-zero error ever be returned? */
3876 
3877 	r = lkb->lkb_resource;
3878 	hold_rsb(r);
3879 	lock_rsb(r);
3880 
3881 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3882 	if (error)
3883 		goto out;
3884 
3885 	ret_nodeid = ms->m_nodeid;
3886 	if (ret_nodeid == dlm_our_nodeid()) {
3887 		r->res_nodeid = 0;
3888 		ret_nodeid = 0;
3889 		r->res_first_lkid = 0;
3890 	} else {
3891 		/* set_master() will copy res_nodeid to lkb_nodeid */
3892 		r->res_nodeid = ret_nodeid;
3893 	}
3894 
3895 	if (is_overlap(lkb)) {
3896 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3897 			  lkb->lkb_id, lkb->lkb_flags);
3898 		queue_cast_overlap(r, lkb);
3899 		unhold_lkb(lkb); /* undoes create_lkb() */
3900 		goto out_list;
3901 	}
3902 
3903 	_request_lock(r, lkb);
3904 
3905  out_list:
3906 	if (!ret_nodeid)
3907 		process_lookup_list(r);
3908  out:
3909 	unlock_rsb(r);
3910 	put_rsb(r);
3911 	dlm_put_lkb(lkb);
3912 }
3913 
3914 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3915 {
3916 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3917 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3918 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3919 			  ms->m_remid, ms->m_result);
3920 		return;
3921 	}
3922 
3923 	switch (ms->m_type) {
3924 
3925 	/* messages sent to a master node */
3926 
3927 	case DLM_MSG_REQUEST:
3928 		receive_request(ls, ms);
3929 		break;
3930 
3931 	case DLM_MSG_CONVERT:
3932 		receive_convert(ls, ms);
3933 		break;
3934 
3935 	case DLM_MSG_UNLOCK:
3936 		receive_unlock(ls, ms);
3937 		break;
3938 
3939 	case DLM_MSG_CANCEL:
3940 		receive_cancel(ls, ms);
3941 		break;
3942 
3943 	/* messages sent from a master node (replies to above) */
3944 
3945 	case DLM_MSG_REQUEST_REPLY:
3946 		receive_request_reply(ls, ms);
3947 		break;
3948 
3949 	case DLM_MSG_CONVERT_REPLY:
3950 		receive_convert_reply(ls, ms);
3951 		break;
3952 
3953 	case DLM_MSG_UNLOCK_REPLY:
3954 		receive_unlock_reply(ls, ms);
3955 		break;
3956 
3957 	case DLM_MSG_CANCEL_REPLY:
3958 		receive_cancel_reply(ls, ms);
3959 		break;
3960 
3961 	/* messages sent from a master node (only two types of async msg) */
3962 
3963 	case DLM_MSG_GRANT:
3964 		receive_grant(ls, ms);
3965 		break;
3966 
3967 	case DLM_MSG_BAST:
3968 		receive_bast(ls, ms);
3969 		break;
3970 
3971 	/* messages sent to a dir node */
3972 
3973 	case DLM_MSG_LOOKUP:
3974 		receive_lookup(ls, ms);
3975 		break;
3976 
3977 	case DLM_MSG_REMOVE:
3978 		receive_remove(ls, ms);
3979 		break;
3980 
3981 	/* messages sent from a dir node (remove has no reply) */
3982 
3983 	case DLM_MSG_LOOKUP_REPLY:
3984 		receive_lookup_reply(ls, ms);
3985 		break;
3986 
3987 	/* other messages */
3988 
3989 	case DLM_MSG_PURGE:
3990 		receive_purge(ls, ms);
3991 		break;
3992 
3993 	default:
3994 		log_error(ls, "unknown message type %d", ms->m_type);
3995 	}
3996 
3997 	dlm_astd_wake();
3998 }
3999 
4000 /* If the lockspace is in recovery mode (locking stopped), then normal
4001    messages are saved on the requestqueue for processing after recovery is
4002    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4003    messages off the requestqueue before we process new ones. This occurs right
4004    after recovery completes when we transition from saving all messages on
4005    the requestqueue, to processing all the saved messages, to processing new
4006    messages as they arrive. */
4007 
4008 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4009 				int nodeid)
4010 {
4011 	if (dlm_locking_stopped(ls)) {
4012 		dlm_add_requestqueue(ls, nodeid, ms);
4013 	} else {
4014 		dlm_wait_requestqueue(ls);
4015 		_receive_message(ls, ms);
4016 	}
4017 }
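
/* Illustrative timeline (a sketch, not code) of the transition described
   above:

   locking stopped  ->  dlm_add_requestqueue()       message saved
   recovery done    ->  dlm_receive_message_saved()  dlm_recoverd drains
                                                     the saved messages
   steady state     ->  dlm_wait_requestqueue() returns immediately,
                        _receive_message()           new messages processed
*/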
4018 
4019 /* This is called by dlm_recoverd to process messages that were saved on
4020    the requestqueue. */
4021 
4022 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4023 {
4024 	_receive_message(ls, ms);
4025 }
4026 
4027 /* This is called by the midcomms layer when something is received for
4028    the lockspace.  It could be either a MSG (normal message sent as part of
4029    standard locking activity) or an RCOM (recovery message sent as part of
4030    lockspace recovery). */
4031 
4032 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4033 {
4034 	struct dlm_header *hd = &p->header;
4035 	struct dlm_ls *ls;
4036 	int type = 0;
4037 
4038 	switch (hd->h_cmd) {
4039 	case DLM_MSG:
4040 		dlm_message_in(&p->message);
4041 		type = p->message.m_type;
4042 		break;
4043 	case DLM_RCOM:
4044 		dlm_rcom_in(&p->rcom);
4045 		type = p->rcom.rc_type;
4046 		break;
4047 	default:
4048 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4049 		return;
4050 	}
4051 
4052 	if (hd->h_nodeid != nodeid) {
4053 		log_print("invalid h_nodeid %d from %d lockspace %x",
4054 			  hd->h_nodeid, nodeid, hd->h_lockspace);
4055 		return;
4056 	}
4057 
4058 	ls = dlm_find_lockspace_global(hd->h_lockspace);
4059 	if (!ls) {
4060 		if (dlm_config.ci_log_debug)
4061 			log_print("invalid lockspace %x from %d cmd %d type %d",
4062 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
4063 
4064 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4065 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4066 		return;
4067 	}
4068 
4069 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4070 	   be inactive (in this ls) before transitioning to recovery mode */
4071 
4072 	down_read(&ls->ls_recv_active);
4073 	if (hd->h_cmd == DLM_MSG)
4074 		dlm_receive_message(ls, &p->message, nodeid);
4075 	else
4076 		dlm_receive_rcom(ls, &p->rcom, nodeid);
4077 	up_read(&ls->ls_recv_active);
4078 
4079 	dlm_put_lockspace(ls);
4080 }
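
/* A minimal sketch of the assumed union layout behind the dispatch above;
   every packet starts with a common dlm_header, so h_cmd can be read before
   deciding how to interpret the payload (the real definition is in
   dlm_internal.h):

   union dlm_packet {
           struct dlm_header  header;    h_cmd, h_nodeid, h_lockspace
           struct dlm_message message;   DLM_MSG: normal locking traffic
           struct dlm_rcom    rcom;      DLM_RCOM: recovery traffic
   };
*/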
4081 
4082 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4083 				   struct dlm_message *ms_stub)
4084 {
4085 	if (middle_conversion(lkb)) {
4086 		hold_lkb(lkb);
4087 		memset(ms_stub, 0, sizeof(struct dlm_message));
4088 		ms_stub->m_flags = DLM_IFL_STUB_MS;
4089 		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4090 		ms_stub->m_result = -EINPROGRESS;
4091 		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4092 		_receive_convert_reply(lkb, ms_stub);
4093 
4094 		/* Same special case as in receive_rcom_lock_args() */
4095 		lkb->lkb_grmode = DLM_LOCK_IV;
4096 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4097 		unhold_lkb(lkb);
4098 
4099 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4100 		lkb->lkb_flags |= DLM_IFL_RESEND;
4101 	}
4102 
4103 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4104 	   conversions are async; there's no reply from the remote master */
4105 }
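
/* Worked example (illustrative): a PR -> CW conversion was in flight when
   the master died.  PR and CW are middle modes that are incompatible with
   each other, so the lost reply could have been either a grant or a queueing
   on the convert queue.  Faking -EINPROGRESS above keeps the lkb converting,
   and grmode IV plus RSB_RECOVER_CONVERT lets the real granted mode be
   recomputed once all locks are rebuilt on the rsb. */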
4106 
4107 /* A waiting lkb needs recovery if the master node has failed, or
4108    the master node is changing (only when no directory is used) */
4109 
4110 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4111 {
4112 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
4113 		return 1;
4114 
4115 	if (!dlm_no_directory(ls))
4116 		return 0;
4117 
4118 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4119 		return 1;
4120 
4121 	return 0;
4122 }
4123 
4124 /* Recovery for locks that are waiting for replies from nodes that are now
4125    gone.  We can just complete unlocks and cancels by faking a reply from the
4126    dead node.  Requests and up-conversions we flag to be resent after
4127    recovery.  Down-conversions can just be completed with a fake reply like
4128    unlocks.  Conversions between PR and CW need special attention. */
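
/* Summary of the pre-recovery action per wait_type (a condensed view of the
   switch in dlm_recover_waiters_pre below):

   DLM_MSG_LOOKUP    mark DLM_IFL_RESEND, resend after recovery
   DLM_MSG_REQUEST   mark DLM_IFL_RESEND, resend after recovery
   DLM_MSG_CONVERT   middle (PR/CW) conversion: fake -EINPROGRESS reply;
                     up-conversion: mark DLM_IFL_RESEND
   DLM_MSG_UNLOCK    fake a -DLM_EUNLOCK (or -ENOENT) reply
   DLM_MSG_CANCEL    fake a -DLM_ECANCEL (or 0) reply
*/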
4129 
4130 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4131 {
4132 	struct dlm_lkb *lkb, *safe;
4133 	struct dlm_message *ms_stub;
4134 	int wait_type, stub_unlock_result, stub_cancel_result;
4135 
4136 	ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4137 	if (!ms_stub) {
4138 		log_error(ls, "dlm_recover_waiters_pre no mem");
4139 		return;
4140 	}
4141 
4142 	mutex_lock(&ls->ls_waiters_mutex);
4143 
4144 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4145 
4146 		/* exclude debug messages about unlocks because there can be so
4147 		   many and they aren't very interesting */
4148 
4149 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4150 			log_debug(ls, "recover_waiter %x nodeid %d "
4151 				  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4152 				  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4153 		}
4154 
4155 		/* all outstanding lookups, regardless of destination, will be
4156 		   resent after recovery is done */
4157 
4158 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4159 			lkb->lkb_flags |= DLM_IFL_RESEND;
4160 			continue;
4161 		}
4162 
4163 		if (!waiter_needs_recovery(ls, lkb))
4164 			continue;
4165 
4166 		wait_type = lkb->lkb_wait_type;
4167 		stub_unlock_result = -DLM_EUNLOCK;
4168 		stub_cancel_result = -DLM_ECANCEL;
4169 
4170 		/* Main reply may have been received leaving a zero wait_type,
4171 		   but a reply for the overlapping op may not have been
4172 		   received.  In that case we need to fake the appropriate
4173 		   reply for the overlap op. */
4174 
4175 		if (!wait_type) {
4176 			if (is_overlap_cancel(lkb)) {
4177 				wait_type = DLM_MSG_CANCEL;
4178 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4179 					stub_cancel_result = 0;
4180 			}
4181 			if (is_overlap_unlock(lkb)) {
4182 				wait_type = DLM_MSG_UNLOCK;
4183 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4184 					stub_unlock_result = -ENOENT;
4185 			}
4186 
4187 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
4188 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
4189 				  stub_cancel_result, stub_unlock_result);
4190 		}
4191 
4192 		switch (wait_type) {
4193 
4194 		case DLM_MSG_REQUEST:
4195 			lkb->lkb_flags |= DLM_IFL_RESEND;
4196 			break;
4197 
4198 		case DLM_MSG_CONVERT:
4199 			recover_convert_waiter(ls, lkb, ms_stub);
4200 			break;
4201 
4202 		case DLM_MSG_UNLOCK:
4203 			hold_lkb(lkb);
4204 			memset(ms_stub, 0, sizeof(struct dlm_message));
4205 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4206 			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4207 			ms_stub->m_result = stub_unlock_result;
4208 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4209 			_receive_unlock_reply(lkb, ms_stub);
4210 			dlm_put_lkb(lkb);
4211 			break;
4212 
4213 		case DLM_MSG_CANCEL:
4214 			hold_lkb(lkb);
4215 			memset(ms_stub, 0, sizeof(struct dlm_message));
4216 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4217 			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4218 			ms_stub->m_result = stub_cancel_result;
4219 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4220 			_receive_cancel_reply(lkb, ms_stub);
4221 			dlm_put_lkb(lkb);
4222 			break;
4223 
4224 		default:
4225 			log_error(ls, "invalid lkb wait_type %d %d",
4226 				  lkb->lkb_wait_type, wait_type);
4227 		}
4228 		schedule();
4229 	}
4230 	mutex_unlock(&ls->ls_waiters_mutex);
4231 	kfree(ms_stub);
4232 }
4233 
4234 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4235 {
4236 	struct dlm_lkb *lkb;
4237 	int found = 0;
4238 
4239 	mutex_lock(&ls->ls_waiters_mutex);
4240 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4241 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4242 			hold_lkb(lkb);
4243 			found = 1;
4244 			break;
4245 		}
4246 	}
4247 	mutex_unlock(&ls->ls_waiters_mutex);
4248 
4249 	if (!found)
4250 		lkb = NULL;
4251 	return lkb;
4252 }
4253 
4254 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4255    master or dir-node for r.  Processing the lkb may result in it being placed
4256    back on waiters. */
4257 
4258 /* We do this after normal locking has been enabled and any saved messages
4259    (in requestqueue) have been processed.  We should be confident that at
4260    this point we won't get or process a reply to any of these waiting
4261    operations.  But, new ops may be coming in on the rsbs/locks here from
4262    userspace or remotely. */
4263 
4264 /* there may have been an overlap unlock/cancel prior to recovery or after
4265    recovery.  if before, the lkb may still have a positive wait_count; if
4266    after, the overlap flag would just have been set and nothing new sent.
4267    we can be confident here that any replies to either the initial op or
4268    overlap ops prior to recovery have been received. */
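
/* Example: an lkb was waiting for a request reply when the application
   unlocked it, recorded as DLM_IFL_OVERLAP_UNLOCK.  Post-recovery the
   request is not resent; the oc/ou branch below completes the lock with a
   -DLM_EUNLOCK cast instead. */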
4269 
4270 int dlm_recover_waiters_post(struct dlm_ls *ls)
4271 {
4272 	struct dlm_lkb *lkb;
4273 	struct dlm_rsb *r;
4274 	int error = 0, mstype, err, oc, ou;
4275 
4276 	while (1) {
4277 		if (dlm_locking_stopped(ls)) {
4278 			log_debug(ls, "recover_waiters_post aborted");
4279 			error = -EINTR;
4280 			break;
4281 		}
4282 
4283 		lkb = find_resend_waiter(ls);
4284 		if (!lkb)
4285 			break;
4286 
4287 		r = lkb->lkb_resource;
4288 		hold_rsb(r);
4289 		lock_rsb(r);
4290 
4291 		mstype = lkb->lkb_wait_type;
4292 		oc = is_overlap_cancel(lkb);
4293 		ou = is_overlap_unlock(lkb);
4294 		err = 0;
4295 
4296 		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4297 			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4298 
4299 		/* At this point we assume that we won't get a reply to any
4300 		   previous op or overlap op on this lock.  First, do a big
4301 		   remove_from_waiters() for all previous ops. */
4302 
4303 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4304 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4305 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4306 		lkb->lkb_wait_type = 0;
4307 		lkb->lkb_wait_count = 0;
4308 		mutex_lock(&ls->ls_waiters_mutex);
4309 		list_del_init(&lkb->lkb_wait_reply);
4310 		mutex_unlock(&ls->ls_waiters_mutex);
4311 		unhold_lkb(lkb); /* for waiters list */
4312 
4313 		if (oc || ou) {
4314 			/* do an unlock or cancel instead of resending */
4315 			switch (mstype) {
4316 			case DLM_MSG_LOOKUP:
4317 			case DLM_MSG_REQUEST:
4318 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4319 							-DLM_ECANCEL);
4320 				unhold_lkb(lkb); /* undoes create_lkb() */
4321 				break;
4322 			case DLM_MSG_CONVERT:
4323 				if (oc) {
4324 					queue_cast(r, lkb, -DLM_ECANCEL);
4325 				} else {
4326 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4327 					_unlock_lock(r, lkb);
4328 				}
4329 				break;
4330 			default:
4331 				err = 1;
4332 			}
4333 		} else {
4334 			switch (mstype) {
4335 			case DLM_MSG_LOOKUP:
4336 			case DLM_MSG_REQUEST:
4337 				_request_lock(r, lkb);
4338 				if (is_master(r))
4339 					confirm_master(r, 0);
4340 				break;
4341 			case DLM_MSG_CONVERT:
4342 				_convert_lock(r, lkb);
4343 				break;
4344 			default:
4345 				err = 1;
4346 			}
4347 		}
4348 
4349 		if (err)
4350 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4351 			  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4352 		unlock_rsb(r);
4353 		put_rsb(r);
4354 		dlm_put_lkb(lkb);
4355 	}
4356 
4357 	return error;
4358 }
4359 
4360 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4361 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4362 {
4363 	struct dlm_ls *ls = r->res_ls;
4364 	struct dlm_lkb *lkb, *safe;
4365 
4366 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4367 		if (test(ls, lkb)) {
4368 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4369 			del_lkb(r, lkb);
4370 			/* this put should free the lkb */
4371 			if (!dlm_put_lkb(lkb))
4372 				log_error(ls, "purged lkb not released");
4373 		}
4374 	}
4375 }
4376 
4377 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4378 {
4379 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4380 }
4381 
4382 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4383 {
4384 	return is_master_copy(lkb);
4385 }
4386 
4387 static void purge_dead_locks(struct dlm_rsb *r)
4388 {
4389 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4390 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4391 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4392 }
4393 
4394 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4395 {
4396 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4397 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4398 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4399 }
4400 
4401 /* Get rid of locks held by nodes that are gone. */
4402 
4403 int dlm_purge_locks(struct dlm_ls *ls)
4404 {
4405 	struct dlm_rsb *r;
4406 
4407 	log_debug(ls, "dlm_purge_locks");
4408 
4409 	down_write(&ls->ls_root_sem);
4410 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4411 		hold_rsb(r);
4412 		lock_rsb(r);
4413 		if (is_master(r))
4414 			purge_dead_locks(r);
4415 		unlock_rsb(r);
4416 		unhold_rsb(r);
4417 
4418 		schedule();
4419 	}
4420 	up_write(&ls->ls_root_sem);
4421 
4422 	return 0;
4423 }
4424 
4425 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4426 {
4427 	struct dlm_rsb *r, *r_ret = NULL;
4428 
4429 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4430 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4431 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4432 			continue;
4433 		hold_rsb(r);
4434 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4435 		r_ret = r;
4436 		break;
4437 	}
4438 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4439 	return r_ret;
4440 }
4441 
4442 void dlm_grant_after_purge(struct dlm_ls *ls)
4443 {
4444 	struct dlm_rsb *r;
4445 	int bucket = 0;
4446 
4447 	while (1) {
4448 		r = find_purged_rsb(ls, bucket);
4449 		if (!r) {
4450 			if (bucket == ls->ls_rsbtbl_size - 1)
4451 				break;
4452 			bucket++;
4453 			continue;
4454 		}
4455 		lock_rsb(r);
4456 		if (is_master(r)) {
4457 			grant_pending_locks(r);
4458 			confirm_master(r, 0);
4459 		}
4460 		unlock_rsb(r);
4461 		put_rsb(r);
4462 		schedule();
4463 	}
4464 }
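
/* Note on the loop above: after each purged rsb is processed, the bucket is
   rescanned from the head; the scan terminates because find_purged_rsb()
   clears RSB_LOCKS_PURGED under the rsbtbl lock before returning an rsb. */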
4465 
4466 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4467 					 uint32_t remid)
4468 {
4469 	struct dlm_lkb *lkb;
4470 
4471 	list_for_each_entry(lkb, head, lkb_statequeue) {
4472 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4473 			return lkb;
4474 	}
4475 	return NULL;
4476 }
4477 
4478 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4479 				    uint32_t remid)
4480 {
4481 	struct dlm_lkb *lkb;
4482 
4483 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4484 	if (lkb)
4485 		return lkb;
4486 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4487 	if (lkb)
4488 		return lkb;
4489 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4490 	if (lkb)
4491 		return lkb;
4492 	return NULL;
4493 }
4494 
4495 /* needs at least dlm_rcom + rcom_lock */
4496 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4497 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4498 {
4499 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4500 
4501 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4502 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4503 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4504 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4505 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4506 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4507 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4508 	lkb->lkb_rqmode = rl->rl_rqmode;
4509 	lkb->lkb_grmode = rl->rl_grmode;
4510 	/* don't set lkb_status because add_lkb wants to set it itself */
4511 
4512 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4513 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4514 
4515 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4516 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4517 			 sizeof(struct rcom_lock);
4518 		if (lvblen > ls->ls_lvblen)
4519 			return -EINVAL;
4520 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4521 		if (!lkb->lkb_lvbptr)
4522 			return -ENOMEM;
4523 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4524 	}
4525 
4526 	/* Conversions between PR and CW (middle modes) need special handling.
4527 	   The real granted mode of these converting locks cannot be determined
4528 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4529 
4530 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4531 	    middle_conversion(lkb)) {
4532 		rl->rl_status = DLM_LKSTS_CONVERT;
4533 		lkb->lkb_grmode = DLM_LOCK_IV;
4534 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4535 	}
4536 
4537 	return 0;
4538 }
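
/* Wire layout assumed by the lvblen arithmetic above (a sketch):

   |<- struct dlm_rcom ->|<- struct rcom_lock ->|<- lvb ->|
   |<------------------- h_length ----------------------->|

   The lvb length is implied by h_length rather than carried explicitly,
   and anything longer than ls_lvblen is rejected with -EINVAL. */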
4539 
4540 /* This lkb may have been recovered in a previous aborted recovery so we need
4541    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4542    If so we just send back a standard reply.  If not, we create a new lkb with
4543    the given values and send back our lkid.  We send back our lkid by sending
4544    back the rcom_lock struct we got but with the remid field filled in. */
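
/* The exchange, with L the node resending its process-copy lock and M the
   new master (a sketch based on the two functions below):

   L: dlm_send_rcom_lock()        ->  M: dlm_recover_master_copy()
   L: dlm_recover_process_copy()  <-  M: reply (rl_remid, rl_result)
*/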
4545 
4546 /* needs at least dlm_rcom + rcom_lock */
4547 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4548 {
4549 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4550 	struct dlm_rsb *r;
4551 	struct dlm_lkb *lkb;
4552 	int error;
4553 
4554 	if (rl->rl_parent_lkid) {
4555 		error = -EOPNOTSUPP;
4556 		goto out;
4557 	}
4558 
4559 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4560 			 R_MASTER, &r);
4561 	if (error)
4562 		goto out;
4563 
4564 	lock_rsb(r);
4565 
4566 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4567 	if (lkb) {
4568 		error = -EEXIST;
4569 		goto out_remid;
4570 	}
4571 
4572 	error = create_lkb(ls, &lkb);
4573 	if (error)
4574 		goto out_unlock;
4575 
4576 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4577 	if (error) {
4578 		__put_lkb(ls, lkb);
4579 		goto out_unlock;
4580 	}
4581 
4582 	attach_lkb(r, lkb);
4583 	add_lkb(r, lkb, rl->rl_status);
4584 	error = 0;
4585 
4586  out_remid:
4587 	/* this is the new value returned to the lock holder for
4588 	   saving in its process-copy lkb */
4589 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4590 
4591  out_unlock:
4592 	unlock_rsb(r);
4593 	put_rsb(r);
4594  out:
4595 	if (error)
4596 		log_debug(ls, "recover_master_copy %d %x", error,
4597 			  le32_to_cpu(rl->rl_lkid));
4598 	rl->rl_result = cpu_to_le32(error);
4599 	return error;
4600 }
4601 
4602 /* needs at least dlm_rcom + rcom_lock */
4603 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4604 {
4605 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4606 	struct dlm_rsb *r;
4607 	struct dlm_lkb *lkb;
4608 	int error;
4609 
4610 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4611 	if (error) {
4612 		log_error(ls, "recover_process_copy no lkid %x",
4613 				le32_to_cpu(rl->rl_lkid));
4614 		return error;
4615 	}
4616 
4617 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4618 
4619 	error = le32_to_cpu(rl->rl_result);
4620 
4621 	r = lkb->lkb_resource;
4622 	hold_rsb(r);
4623 	lock_rsb(r);
4624 
4625 	switch (error) {
4626 	case -EBADR:
4627 		/* There's a chance the new master received our lock before
4628 		   dlm_recover_master_reply(); this wouldn't happen if we did
4629 		   a barrier between recover_masters and recover_locks. */
4630 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4631 			  (unsigned long)r, r->res_name);
4632 		dlm_send_rcom_lock(r, lkb);
4633 		goto out;
4634 	case -EEXIST:
4635 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4636 		/* fall through */
4637 	case 0:
4638 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4639 		break;
4640 	default:
4641 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4642 			  error, lkb->lkb_id);
4643 	}
4644 
4645 	/* an ack for dlm_recover_locks() which waits for replies from
4646 	   all the locks it sends to new masters */
4647 	dlm_recovered_lock(r);
4648  out:
4649 	unlock_rsb(r);
4650 	put_rsb(r);
4651 	dlm_put_lkb(lkb);
4652 
4653 	return 0;
4654 }
4655 
4656 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4657 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4658 		     unsigned long timeout_cs)
4659 {
4660 	struct dlm_lkb *lkb;
4661 	struct dlm_args args;
4662 	int error;
4663 
4664 	dlm_lock_recovery(ls);
4665 
4666 	error = create_lkb(ls, &lkb);
4667 	if (error) {
4668 		kfree(ua);
4669 		goto out;
4670 	}
4671 
4672 	if (flags & DLM_LKF_VALBLK) {
4673 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4674 		if (!ua->lksb.sb_lvbptr) {
4675 			kfree(ua);
4676 			__put_lkb(ls, lkb);
4677 			error = -ENOMEM;
4678 			goto out;
4679 		}
4680 	}
4681 
4682 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4683 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4684 	   lock and that lkb_astparam is the dlm_user_args structure. */
4685 
4686 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4687 			      fake_astfn, ua, fake_bastfn, &args);
4688 	lkb->lkb_flags |= DLM_IFL_USER;
4689 
4690 	if (error) {
4691 		__put_lkb(ls, lkb);
4692 		goto out;
4693 	}
4694 
4695 	error = request_lock(ls, lkb, name, namelen, &args);
4696 
4697 	switch (error) {
4698 	case 0:
4699 		break;
4700 	case -EINPROGRESS:
4701 		error = 0;
4702 		break;
4703 	case -EAGAIN:
4704 		error = 0;
4705 		/* fall through */
4706 	default:
4707 		__put_lkb(ls, lkb);
4708 		goto out;
4709 	}
4710 
4711 	/* add this new lkb to the per-process list of locks */
4712 	spin_lock(&ua->proc->locks_spin);
4713 	hold_lkb(lkb);
4714 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4715 	spin_unlock(&ua->proc->locks_spin);
4716  out:
4717 	dlm_unlock_recovery(ls);
4718 	return error;
4719 }
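
/* A minimal usage sketch (hypothetical caller; the real callers live in the
   dlm device code).  ua must be heap-allocated since the lkb takes ownership
   of it: it is freed on the error paths above or later by dlm_free_lkb().
   Disabled example, not part of the build: */
#if 0
	error = dlm_user_request(ls, ua, DLM_LOCK_PR, DLM_LKF_VALBLK,
				 "example", 7, 0);
#endif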
4720 
4721 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4722 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4723 		     unsigned long timeout_cs)
4724 {
4725 	struct dlm_lkb *lkb;
4726 	struct dlm_args args;
4727 	struct dlm_user_args *ua;
4728 	int error;
4729 
4730 	dlm_lock_recovery(ls);
4731 
4732 	error = find_lkb(ls, lkid, &lkb);
4733 	if (error)
4734 		goto out;
4735 
4736 	/* user can change the params on its lock when it converts it, or
4737 	   add an lvb that didn't exist before */
4738 
4739 	ua = lkb->lkb_ua;
4740 
4741 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4742 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4743 		if (!ua->lksb.sb_lvbptr) {
4744 			error = -ENOMEM;
4745 			goto out_put;
4746 		}
4747 	}
4748 	if (lvb_in && ua->lksb.sb_lvbptr)
4749 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4750 
4751 	ua->xid = ua_tmp->xid;
4752 	ua->castparam = ua_tmp->castparam;
4753 	ua->castaddr = ua_tmp->castaddr;
4754 	ua->bastparam = ua_tmp->bastparam;
4755 	ua->bastaddr = ua_tmp->bastaddr;
4756 	ua->user_lksb = ua_tmp->user_lksb;
4757 
4758 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4759 			      fake_astfn, ua, fake_bastfn, &args);
4760 	if (error)
4761 		goto out_put;
4762 
4763 	error = convert_lock(ls, lkb, &args);
4764 
4765 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4766 		error = 0;
4767  out_put:
4768 	dlm_put_lkb(lkb);
4769  out:
4770 	dlm_unlock_recovery(ls);
4771 	kfree(ua_tmp);
4772 	return error;
4773 }
4774 
4775 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4776 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4777 {
4778 	struct dlm_lkb *lkb;
4779 	struct dlm_args args;
4780 	struct dlm_user_args *ua;
4781 	int error;
4782 
4783 	dlm_lock_recovery(ls);
4784 
4785 	error = find_lkb(ls, lkid, &lkb);
4786 	if (error)
4787 		goto out;
4788 
4789 	ua = lkb->lkb_ua;
4790 
4791 	if (lvb_in && ua->lksb.sb_lvbptr)
4792 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4793 	if (ua_tmp->castparam)
4794 		ua->castparam = ua_tmp->castparam;
4795 	ua->user_lksb = ua_tmp->user_lksb;
4796 
4797 	error = set_unlock_args(flags, ua, &args);
4798 	if (error)
4799 		goto out_put;
4800 
4801 	error = unlock_lock(ls, lkb, &args);
4802 
4803 	if (error == -DLM_EUNLOCK)
4804 		error = 0;
4805 	/* from validate_unlock_args() */
4806 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4807 		error = 0;
4808 	if (error)
4809 		goto out_put;
4810 
4811 	spin_lock(&ua->proc->locks_spin);
4812 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4813 	if (!list_empty(&lkb->lkb_ownqueue))
4814 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4815 	spin_unlock(&ua->proc->locks_spin);
4816  out_put:
4817 	dlm_put_lkb(lkb);
4818  out:
4819 	dlm_unlock_recovery(ls);
4820 	kfree(ua_tmp);
4821 	return error;
4822 }
4823 
4824 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4825 		    uint32_t flags, uint32_t lkid)
4826 {
4827 	struct dlm_lkb *lkb;
4828 	struct dlm_args args;
4829 	struct dlm_user_args *ua;
4830 	int error;
4831 
4832 	dlm_lock_recovery(ls);
4833 
4834 	error = find_lkb(ls, lkid, &lkb);
4835 	if (error)
4836 		goto out;
4837 
4838 	ua = lkb->lkb_ua;
4839 	if (ua_tmp->castparam)
4840 		ua->castparam = ua_tmp->castparam;
4841 	ua->user_lksb = ua_tmp->user_lksb;
4842 
4843 	error = set_unlock_args(flags, ua, &args);
4844 	if (error)
4845 		goto out_put;
4846 
4847 	error = cancel_lock(ls, lkb, &args);
4848 
4849 	if (error == -DLM_ECANCEL)
4850 		error = 0;
4851 	/* from validate_unlock_args() */
4852 	if (error == -EBUSY)
4853 		error = 0;
4854  out_put:
4855 	dlm_put_lkb(lkb);
4856  out:
4857 	dlm_unlock_recovery(ls);
4858 	kfree(ua_tmp);
4859 	return error;
4860 }
4861 
4862 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4863 {
4864 	struct dlm_lkb *lkb;
4865 	struct dlm_args args;
4866 	struct dlm_user_args *ua;
4867 	struct dlm_rsb *r;
4868 	int error;
4869 
4870 	dlm_lock_recovery(ls);
4871 
4872 	error = find_lkb(ls, lkid, &lkb);
4873 	if (error)
4874 		goto out;
4875 
4876 	ua = lkb->lkb_ua;
4877 
4878 	error = set_unlock_args(flags, ua, &args);
4879 	if (error)
4880 		goto out_put;
4881 
4882 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4883 
4884 	r = lkb->lkb_resource;
4885 	hold_rsb(r);
4886 	lock_rsb(r);
4887 
4888 	error = validate_unlock_args(lkb, &args);
4889 	if (error)
4890 		goto out_r;
4891 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4892 
4893 	error = _cancel_lock(r, lkb);
4894  out_r:
4895 	unlock_rsb(r);
4896 	put_rsb(r);
4897 
4898 	if (error == -DLM_ECANCEL)
4899 		error = 0;
4900 	/* from validate_unlock_args() */
4901 	if (error == -EBUSY)
4902 		error = 0;
4903  out_put:
4904 	dlm_put_lkb(lkb);
4905  out:
4906 	dlm_unlock_recovery(ls);
4907 	return error;
4908 }
4909 
4910 /* lkb's that are removed from the waiters list by revert are just left on the
4911    orphans list with the granted orphan locks, to be freed by purge */
4912 
4913 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4914 {
4915 	struct dlm_args args;
4916 	int error;
4917 
4918 	hold_lkb(lkb);
4919 	mutex_lock(&ls->ls_orphans_mutex);
4920 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4921 	mutex_unlock(&ls->ls_orphans_mutex);
4922 
4923 	set_unlock_args(0, lkb->lkb_ua, &args);
4924 
4925 	error = cancel_lock(ls, lkb, &args);
4926 	if (error == -DLM_ECANCEL)
4927 		error = 0;
4928 	return error;
4929 }
4930 
4931 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4932    Regardless of what rsb queue the lock is on, it's removed and freed. */
4933 
4934 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4935 {
4936 	struct dlm_args args;
4937 	int error;
4938 
4939 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4940 
4941 	error = unlock_lock(ls, lkb, &args);
4942 	if (error == -DLM_EUNLOCK)
4943 		error = 0;
4944 	return error;
4945 }
4946 
4947 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4948    (which does lock_rsb) due to deadlock with receiving a message that does
4949    lock_rsb followed by dlm_user_add_ast() */
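
/* Lock ordering illustration (an assumption drawn from the comment above):

   dlm_clear_proc_locks:  ls_clear_proc_locks, then lock_rsb
   receive path:          lock_rsb, then dlm_user_add_ast() which takes
                          ls_clear_proc_locks

   Taking the two in opposite orders is a classic AB-BA deadlock, so
   del_proc_lock() drops the mutex before unlock_proc_lock() runs. */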
4950 
4951 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4952 				     struct dlm_user_proc *proc)
4953 {
4954 	struct dlm_lkb *lkb = NULL;
4955 
4956 	mutex_lock(&ls->ls_clear_proc_locks);
4957 	if (list_empty(&proc->locks))
4958 		goto out;
4959 
4960 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4961 	list_del_init(&lkb->lkb_ownqueue);
4962 
4963 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4964 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4965 	else
4966 		lkb->lkb_flags |= DLM_IFL_DEAD;
4967  out:
4968 	mutex_unlock(&ls->ls_clear_proc_locks);
4969 	return lkb;
4970 }
4971 
4972 /* The ls_clear_proc_locks mutex protects against dlm_user_add_ast() which
4973    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4974    which we clear here. */
4975 
4976 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4977    list, and no more device_writes should add lkb's to proc->locks list; so we
4978    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4979    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4980    them ourselves. */
4981 
4982 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4983 {
4984 	struct dlm_lkb *lkb, *safe;
4985 
4986 	dlm_lock_recovery(ls);
4987 
4988 	while (1) {
4989 		lkb = del_proc_lock(ls, proc);
4990 		if (!lkb)
4991 			break;
4992 		del_timeout(lkb);
4993 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4994 			orphan_proc_lock(ls, lkb);
4995 		else
4996 			unlock_proc_lock(ls, lkb);
4997 
4998 		/* this removes the reference for the proc->locks list
4999 		   added by dlm_user_request, it may result in the lkb
5000 		   being freed */
5001 
5002 		dlm_put_lkb(lkb);
5003 	}
5004 
5005 	mutex_lock(&ls->ls_clear_proc_locks);
5006 
5007 	/* in-progress unlocks */
5008 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5009 		list_del_init(&lkb->lkb_ownqueue);
5010 		lkb->lkb_flags |= DLM_IFL_DEAD;
5011 		dlm_put_lkb(lkb);
5012 	}
5013 
5014 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
5015 		memset(&lkb->lkb_callbacks, 0,
5016 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5017 		list_del_init(&lkb->lkb_astqueue);
5018 		dlm_put_lkb(lkb);
5019 	}
5020 
5021 	mutex_unlock(&ls->ls_clear_proc_locks);
5022 	dlm_unlock_recovery(ls);
5023 }
5024 
5025 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5026 {
5027 	struct dlm_lkb *lkb, *safe;
5028 
5029 	while (1) {
5030 		lkb = NULL;
5031 		spin_lock(&proc->locks_spin);
5032 		if (!list_empty(&proc->locks)) {
5033 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
5034 					 lkb_ownqueue);
5035 			list_del_init(&lkb->lkb_ownqueue);
5036 		}
5037 		spin_unlock(&proc->locks_spin);
5038 
5039 		if (!lkb)
5040 			break;
5041 
5042 		lkb->lkb_flags |= DLM_IFL_DEAD;
5043 		unlock_proc_lock(ls, lkb);
5044 		dlm_put_lkb(lkb); /* ref from proc->locks list */
5045 	}
5046 
5047 	spin_lock(&proc->locks_spin);
5048 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5049 		list_del_init(&lkb->lkb_ownqueue);
5050 		lkb->lkb_flags |= DLM_IFL_DEAD;
5051 		dlm_put_lkb(lkb);
5052 	}
5053 	spin_unlock(&proc->locks_spin);
5054 
5055 	spin_lock(&proc->asts_spin);
5056 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
5057 		memset(&lkb->lkb_callbacks, 0,
5058 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5059 		list_del_init(&lkb->lkb_astqueue);
5060 		dlm_put_lkb(lkb);
5061 	}
5062 	spin_unlock(&proc->asts_spin);
5063 }
5064 
5065 /* pid of 0 means purge all orphans */
5066 
5067 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5068 {
5069 	struct dlm_lkb *lkb, *safe;
5070 
5071 	mutex_lock(&ls->ls_orphans_mutex);
5072 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5073 		if (pid && lkb->lkb_ownpid != pid)
5074 			continue;
5075 		unlock_proc_lock(ls, lkb);
5076 		list_del_init(&lkb->lkb_ownqueue);
5077 		dlm_put_lkb(lkb);
5078 	}
5079 	mutex_unlock(&ls->ls_orphans_mutex);
5080 }
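
/* e.g. do_purge(ls, nodeid, 0) releases every orphan regardless of owner
   pid, per the comment above */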
5081 
5082 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5083 {
5084 	struct dlm_message *ms;
5085 	struct dlm_mhandle *mh;
5086 	int error;
5087 
5088 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5089 				DLM_MSG_PURGE, &ms, &mh);
5090 	if (error)
5091 		return error;
5092 	ms->m_nodeid = nodeid;
5093 	ms->m_pid = pid;
5094 
5095 	return send_message(mh, ms);
5096 }
5097 
5098 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5099 		   int nodeid, int pid)
5100 {
5101 	int error = 0;
5102 
5103 	if (nodeid != dlm_our_nodeid()) {
5104 		error = send_purge(ls, nodeid, pid);
5105 	} else {
5106 		dlm_lock_recovery(ls);
5107 		if (pid == current->pid)
5108 			purge_proc_locks(ls, proc);
5109 		else
5110 			do_purge(ls, nodeid, pid);
5111 		dlm_unlock_recovery(ls);
5112 	}
5113 	return error;
5114 }
5115 
5116